In [None]:
from pyspark.sql import SparkSession

# Local Spark session (two worker threads, 2 GB driver/executor memory)
# shared by every cell in this notebook.
spark = (
    SparkSession.builder
    .appName("Python Spark Windowing")
    .master("local[2]")
    .config("spark.driver.memory", "2g")
    .config("spark.executor.memory", "2g")
    .getOrCreate()
)

In [4]:
# Five sample rows: `value` runs 1..5 and `name` is the owner of each value.
rows_test = [
    {'value': value, 'name': name}
    for value, name in enumerate(['Luka', 'Luka', 'Dirk', 'Dirk', 'Luka'], start=1)
]

# Build a Spark DataFrame from the list of dicts; no schema is passed, so Spark
# infers the column names and types from the dict keys and values.
df = spark.createDataFrame(rows_test)

In [8]:
# Register the DataFrame under the name "rows_test" so spark.sql() queries can
# reference it as a table.
df.createOrReplaceTempView("rows_test")

In [None]:
# Centered rolling sum: with rows ordered by `value`, each row sums itself plus
# up to 2 rows before and up to 2 rows after. The frame is clipped at the edges
# of the result set, so the first and last rows sum fewer than five values.
spark.sql("""
SELECT 
    value,
    SUM(value) OVER ( 
        order by value 
        rows between 2 preceding and 2 following
    ) AS rolling_sum
FROM rows_test""").show()

+-----+-----------+
|value|rolling_sum|
+-----+-----------+
|    1|          6|
|    2|         10|
|    3|         15|
|    4|         14|
|    5|         12|
+-----+-----------+



In [10]:
# Running sum with look-ahead: with rows ordered by `value`, each row sums every
# row from the start of the result set through up to 2 rows after itself.
spark.sql("""
SELECT 
    value,
    SUM(value) OVER (
        order by value 
        rows between unbounded preceding and 2 following
    ) AS rolling_sum
FROM rows_test""").show()

+-----+-----------+
|value|rolling_sum|
+-----+-----------+
|    1|          6|
|    2|         10|
|    3|         15|
|    4|         15|
|    5|         15|
+-----+-----------+



In [12]:
# Per-name smallest and largest value, attached to every row of that name.
# Each partition is ordered by `value`, so FIRST_VALUE is the minimum and
# LAST_VALUE is the maximum. The explicit "unbounded preceding and unbounded
# following" frame is what makes LAST_VALUE see the whole partition: with an
# ORDER BY and no frame clause, the default frame ends at the current row and
# LAST_VALUE would just return the current row's value.
spark.sql("""
SELECT 
*,
FIRST_VALUE(value) OVER (
    partition by name
    order by value 
    rows between unbounded preceding and unbounded following
) AS min_value,
LAST_VALUE(value) OVER (
    partition by name
    order by value 
    rows between unbounded preceding and unbounded following
) AS max_value    
FROM rows_test""").show()

+----+-----+---------+---------+
|name|value|min_value|max_value|
+----+-----+---------+---------+
|Dirk|    3|        3|        4|
|Dirk|    4|        3|        4|
|Luka|    1|        1|        5|
|Luka|    2|        1|        5|
|Luka|    5|        1|        5|
+----+-----+---------+---------+



In [2]:
# Snowflake connection settings
from dotenv import load_dotenv
import os

# Credentials live in a .env file. Its location can be overridden with the
# DOTENV_PATH environment variable; the default is the original local path.
load_dotenv(os.environ.get("DOTENV_PATH", "C:/Users/Yeojun/airflow/airflow-dag/.env"))

user = os.environ.get("SNOWFLAKE_USER")
password = os.environ.get("SNOWFLAKE_PASS")
account = os.environ.get("SNOWFLAKE_ACCOUNT")

# Fail fast when a setting is absent. Without this check a missing variable
# yields None, which would be interpolated into the JDBC URL as the literal
# text "None" and only surface later as an obscure connection error.
_missing = [
    var_name
    for var_name, var_value in (
        ("SNOWFLAKE_USER", user),
        ("SNOWFLAKE_PASS", password),
        ("SNOWFLAKE_ACCOUNT", account),
    )
    if not var_value
]
if _missing:
    raise RuntimeError(
        "Missing Snowflake environment variable(s): " + ", ".join(_missing)
    )

jdbc_url = f"jdbc:snowflake://{account}.snowflakecomputing.com/?warehouse=COMPUTE_WH"

In [3]:
def _read_snowflake_table(dbtable):
    """Load one Snowflake table as a Spark DataFrame over JDBC.

    Uses the notebook-level `spark`, `jdbc_url`, `user` and `password`
    that are defined in earlier cells.

    Args:
        dbtable: Fully qualified table name, e.g. "dev.raw_data.session_timestamp".

    Returns:
        The DataFrame returned by the JDBC reader's `load()`.
    """
    return (
        spark.read
        .format("jdbc")
        .option("url", jdbc_url)
        .option("dbtable", dbtable)
        .option("user", user)
        .option("password", password)
        .option("driver", "net.snowflake.client.jdbc.SnowflakeDriver")
        .load()
    )


# One DataFrame per raw table; all three share the same connection options.
df_user_session_channel = _read_snowflake_table("dev.raw_data.user_session_channel")
df_session_timestamp = _read_snowflake_table("dev.raw_data.session_timestamp")
df_session_transaction = _read_snowflake_table("dev.raw_data.session_transaction")

# Expose the tables to spark.sql() under short names.
df_user_session_channel.createOrReplaceTempView("user_session_channel")
df_session_timestamp.createOrReplaceTempView("session_timestamp")
df_session_transaction.createOrReplaceTempView("session_transaction")

In [13]:
# First and last acquisition channel per user, via ROW_NUMBER.
# The RECORD CTE numbers each user's sessions by timestamp in both directions
# (seq_first ascending, seq_last descending). The self-join then pairs the row
# with seq_first = 1 and the row with seq_last = 1 for the same user.
#
# The query result is bound to the variable first and displayed separately:
# DataFrame.show() returns None, so chaining .show() onto the assignment would
# leave first_last_channel_df set to None instead of the DataFrame.
first_last_channel_df = spark.sql("""
WITH RECORD AS (
  SELECT /*사용자의 유입에 따른, 채널 순서 매기는 쿼리*/
      userid,
      channel, 
      ROW_NUMBER() OVER (PARTITION BY userid ORDER BY ts ASC) AS seq_first,
      ROW_NUMBER() OVER (PARTITION BY userid ORDER BY ts DESC) AS seq_last
  FROM user_session_channel u
  LEFT JOIN session_timestamp t
    ON u.sessionid = t.sessionid
)
SELECT /*유저의 첫번째 유입채널, 마지막 유입 채널 구하기*/
      f.userid,
      f.channel first_channel,
      l.channel last_channel
FROM RECORD f
INNER JOIN RECORD l ON f.userid = l.userid
WHERE f.seq_first = 1 and l.seq_last = 1
ORDER BY userid
""")
first_last_channel_df.show(10)

+------+-------------+------------+
|userid|first_channel|last_channel|
+------+-------------+------------+
|    27|      Youtube|   Instagram|
|    29|        Naver|       Naver|
|    33|       Google|     Youtube|
|    34|      Youtube|       Naver|
|    36|        Naver|     Youtube|
|    40|      Youtube|      Google|
|    41|     Facebook|     Youtube|
|    44|        Naver|   Instagram|
|    45|      Youtube|   Instagram|
|    59|    Instagram|   Instagram|
+------+-------------+------------+
only showing top 10 rows



In [14]:
# First and last acquisition channel per user, via FIRST_VALUE / LAST_VALUE.
# Both window functions run over each user's sessions ordered by timestamp with
# a frame spanning the whole partition, so every row of a user carries the same
# pair of values; DISTINCT then collapses them to one row per user.
#
# The query result is bound to the variable first and displayed separately:
# DataFrame.show() returns None, so chaining .show() onto the assignment would
# leave first_last_channel_df2 set to None instead of the DataFrame.
first_last_channel_df2 = spark.sql("""
SELECT DISTINCT A.userid,
    FIRST_VALUE(A.channel) over(partition by A.userid order by B.ts
rows between unbounded preceding and unbounded following) AS First_Channel,
    LAST_VALUE(A.channel) over(partition by A.userid order by B.ts
rows between unbounded preceding and unbounded following) AS Last_Channel
FROM user_session_channel A
LEFT JOIN session_timestamp B
ON A.sessionid = B.sessionid""")
first_last_channel_df2.show(10)

+------+-------------+------------+
|userid|First_Channel|Last_Channel|
+------+-------------+------------+
|    27|      Youtube|   Instagram|
|    29|        Naver|       Naver|
|    33|       Google|     Youtube|
|    34|      Youtube|       Naver|
|    36|        Naver|     Youtube|
|    40|      Youtube|      Google|
|    41|     Facebook|     Youtube|
|    44|        Naver|   Instagram|
|    45|      Youtube|   Instagram|
|    59|    Instagram|   Instagram|
+------+-------------+------------+
only showing top 10 rows

