In [1]:
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import dayofmonth, to_date, count_distinct, hour,minute
from pyspark.sql import functions as F
from pyspark.sql.types import *
from pyspark.sql import Window
from pprint import pprint

In [2]:
spark = SparkSession.builder.master("local").appName("SparkSQL").getOrCreate()

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
21/11/10 12:57:19 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
spark.sparkContext.setLogLevel("ERROR")

In [4]:
schema = StructType(
    [
        StructField('event_time', StringType()),
        StructField('event_type', StringType()),
        StructField('product_id', StringType()),
        StructField('category_id', StringType()),
        StructField('category_code', StringType()),
        StructField('brand', StringType()),
        StructField('price', StringType()),
        StructField('user_id', StringType()),
        StructField('user_session', StringType()),
    ]
)

In [5]:
df = spark.read.option("header",True).schema(schema).csv("./2019-Nov.csv")#.limit(100000)

In [6]:
df.show(truncate=False)

+-----------------------+----------+----------+-------------------+--------------------------------+--------+------+---------+------------------------------------+
|event_time             |event_type|product_id|category_id        |category_code                   |brand   |price |user_id  |user_session                        |
+-----------------------+----------+----------+-------------------+--------------------------------+--------+------+---------+------------------------------------+
|2019-11-01 00:00:00 UTC|view      |1003461   |2053013555631882655|electronics.smartphone          |xiaomi  |489.07|520088904|4d3b30da-a5e4-49df-b1a8-ba5943f1dd33|
|2019-11-01 00:00:00 UTC|view      |5000088   |2053013566100866035|appliances.sewing_machine       |janome  |293.65|530496790|8e5f4f83-366c-4f70-860e-ca7417414283|
|2019-11-01 00:00:01 UTC|view      |17302664  |2053013553853497655|null                            |creed   |28.31 |561587266|755422e7-9040-477b-9bd2-6a6e8fd97387|
|2019-11-01 00:0

In [7]:
df.printSchema()

root
 |-- event_time: string (nullable = true)
 |-- event_type: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- category_id: string (nullable = true)
 |-- category_code: string (nullable = true)
 |-- brand: string (nullable = true)
 |-- price: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- user_session: string (nullable = true)



In [8]:
df.head(5)

[Row(event_time='2019-11-01 00:00:00 UTC', event_type='view', product_id='1003461', category_id='2053013555631882655', category_code='electronics.smartphone', brand='xiaomi', price='489.07', user_id='520088904', user_session='4d3b30da-a5e4-49df-b1a8-ba5943f1dd33'),
 Row(event_time='2019-11-01 00:00:00 UTC', event_type='view', product_id='5000088', category_id='2053013566100866035', category_code='appliances.sewing_machine', brand='janome', price='293.65', user_id='530496790', user_session='8e5f4f83-366c-4f70-860e-ca7417414283'),
 Row(event_time='2019-11-01 00:00:01 UTC', event_type='view', product_id='17302664', category_id='2053013553853497655', category_code=None, brand='creed', price='28.31', user_id='561587266', user_session='755422e7-9040-477b-9bd2-6a6e8fd97387'),
 Row(event_time='2019-11-01 00:00:01 UTC', event_type='view', product_id='3601530', category_id='2053013563810775923', category_code='appliances.kitchen.washer', brand='lg', price='712.87', user_id='518085591', user_sess

In [None]:
df.tail(5)



# 1. 해당 전체 기간에서, KST 기준으로 active user 수가 제일 큰 날짜를 구하세요

In [6]:
max_active = df.groupby(dayofmonth('event_time').alias('date')).agg(count_distinct('user_id').alias('u_count')).orderBy('u_count', ascending=False)

In [77]:
max_user = max_active.collect()

                                                                                

In [78]:
max_user

[Row(date=1, u_count=20778)]

In [79]:
target_date = max_user[0].date

In [10]:
target_date

17

---

# 2. 1의 날짜에서, 세션이 가장 긴 사용자 10명에 대해 "user_id, session_id, 세션시간"를 구하세요

In [8]:
tmp = df.filter(dayofmonth(df.event_time) == target_date).orderBy('user_session')

In [9]:
tmp.printSchema()

root
 |-- event_time: string (nullable = true)
 |-- event_type: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- category_id: string (nullable = true)
 |-- category_code: string (nullable = true)
 |-- brand: string (nullable = true)
 |-- price: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- user_session: string (nullable = true)



In [11]:
spark.catalog.dropTempView('tmp')
tmp.createTempView('tmp')

spark.sql("""
        select first(t.user_id) as user_id, last(user_id) as last
        from tmp t
        group by t.user_session
        having last != user_id
        """).show(truncate=False)
#세션아이디가 같으면서 유저아이디가 다른 경우가 있는지 확인


spark.sql("""
        select t.user_session, min(t.event_time) as start, max(t.event_time) as end, bigint(to_timestamp(max(t.event_time)))-bigint(to_timestamp(min(t.event_time))) as keep, first(t.user_id) as user_id, last(user_id) as last
        from tmp t
        group by t.user_session
        having last != user_id
        order by keep desc
        
        """).show(truncate=False)

In [34]:
spark.sql("""
        select first(user_id) as user_id, 
                user_session as session_id, 
                round(
                    (extract(hours from to_timestamp(max(event_time))-to_timestamp(min(event_time)))*60 
                    + extract(mins from to_timestamp(max(event_time))-to_timestamp(min(event_time)))
                    + extract(seconds from to_timestamp(max(event_time))-to_timestamp(min(event_time)))/60), 1
                ) as session_time
        from tmp
        group by session_id
        order by session_time desc
        """).show(truncate=False)



+---------+------------------------------------+------------+
|user_id  |session_id                          |session_time|
+---------+------------------------------------+------------+
|512677696|87c256b2-a9a9-4c55-8867-4a3ab116aee7|132.8       |
|513151632|15ff5731-fe42-4628-b4bc-5f362b06432f|122.7       |
|566263856|93476518-09d0-4430-a224-d37a28d30488|120.0       |
|522957619|a88c923f-d3d1-4d24-9525-21c7ffbab9e6|108.6       |
|563875620|20fdf02c-5922-4662-9360-d876ac380157|102.5       |
|530187571|390a9a5f-a7b8-49be-b3f1-6037eeb181fa|98.8        |
|566284963|15bff1fb-de2a-4185-bea9-293ff2c9d5de|94.2        |
|517702267|42599d98-752a-4fd8-a485-d8d03a753dbb|93.2        |
|562824468|c0f72b7d-4b57-43b1-913e-d277649b1388|92.1        |
|536475561|a65d7633-2e46-4e5a-9b89-e3f1603dc63a|89.2        |
|512432743|a69bb438-d8ea-416b-8274-475a431823a3|81.6        |
|519155085|084a39d2-894f-436e-82f6-dcce242b389f|81.3        |
|566190908|d8c04b8e-fb2f-4990-a662-8e7b2c80bf38|77.8        |
|5662839

                                                                                

# 3. 1의 날짜의 15분단위로 active user 수를 구하세요

spark.sql("""
        with RECURSIVE time as (
            select 0 MINUTE
            union
            select MINUTE+15
            from time
        )
        SELECT *, count(*) as 'COUNT'
        FROM (
            select minute(to_timestamp(event_time)) as MINUTE from tmp
            union all
            select MINUTE from time
        ) as f
        group by MINUTE
        order by MINUTE
        """).show(truncate=False)

In [28]:
spark.sql("""
        SELECT floor((hour(event_time)*60 + minute(event_time))/15)*15 as m, count(distinct(user_id))
        from tmp
        group by floor((hour(event_time)*60 + minute(event_time))/15)
        """).show(truncate=False)



+---+-----------------------+
|m  |count(DISTINCT user_id)|
+---+-----------------------+
|0  |542                    |
|15 |628                    |
|30 |768                    |
|45 |669                    |
+---+-----------------------+



                                                                                

query = """concat( string( floor((floor((hour(event_time)*60 + minute(event_time))/15)*15)/60) ), 
                    ":",
                    string( mod( floor((hour(event_time)*60 + minute(event_time))/15)*15, 60)))"""

query2 = """concat( string( floor((floor((hour(event_time)*60 + minute(event_time))/15)*15+15)/60) ), 
                    ":",
                    string( mod( floor((hour(event_time)*60 + minute(event_time))/15)*15+15, 60)))"""

---

In [93]:
start1 = "string(floor((floor((hour(event_time)*60 + minute(event_time))/15)*15)/60))"

In [94]:
start2 = "string(mod(floor((hour(event_time)*60 + minute(event_time))/15)*15, 60))"

In [95]:
query = f"""concat( if(length({start1})=2, 
                    {start1}, 
                    concat(0,{start1})), 
                    
                    ":",
                    
                    if(length({start2})=2,
                    {start2},
                    concat(0, {start2}))
                )"""

In [96]:
end1 = "string(floor((floor((hour(event_time)*60 + minute(event_time))/15)*15+15)/60))"

In [97]:
end2 = "string(mod(floor((hour(event_time)*60 + minute(event_time))/15)*15+15, 60))"

In [98]:
query2 = f"""concat( if(length({end1})=2, 
                    {end1}, 
                    concat(0,{end1})), 
                    
                    ":",
                    
                    if(length({end2})=2,
                    {end2},
                    concat(0, {end2})
                    )
                )"""

In [100]:
spark.sql(f"""
        SELECT {query} as start, {query2} as end, count(distinct(user_id)) as active_users
        from tmp
        group by floor((hour(event_time)*60 + minute(event_time))/15)
        order by start
        """).show(200, truncate=False)



+-----+-----+------------+
|start|end  |active_users|
+-----+-----+------------+
|00:00|00:15|542         |
|00:15|00:30|628         |
|00:30|00:45|768         |
|00:45|01:00|870         |
|01:00|01:15|698         |
|01:15|01:30|411         |
|01:30|01:45|1309        |
|01:45|02:00|1549        |
|02:00|02:15|1796        |
|02:15|02:30|2072        |
|02:30|02:45|2261        |
|02:45|03:00|2578        |
|03:00|03:15|2996        |
|03:15|03:30|3411        |
|03:30|03:45|3463        |
|03:45|04:00|2244        |
+-----+-----+------------+



                                                                                

In [334]:
spark.sql("""
        SELECT floor(bigint(to_timestamp(event_time))/60/15)*15, floor(bigint(to_timestamp(event_time))/60/15)*15-15, count(distinct(user_id))
        from tmp
        group by floor(bigint(to_timestamp(event_time))/60/15)*15
        """).show(truncate=False)



+----------------------------------------------------+-----------------------------------------------------------+-----------------------+
|(FLOOR(((to_timestamp(event_time) / 60) / 15)) * 15)|((FLOOR(((to_timestamp(event_time) / 60) / 15)) * 15) - 15)|count(DISTINCT user_id)|
+----------------------------------------------------+-----------------------------------------------------------+-----------------------+
|26209440                                            |26209425                                                   |542                    |
|26209455                                            |26209440                                                   |628                    |
|26209470                                            |26209455                                                   |768                    |
|26209485                                            |26209470                                                   |669                    |
+--------------------------

                                                                                

# 4. 1의 날짜에서 view → cart → purchase 이벤트 진행에 따른 funnel 수치를 구하세요

In [38]:
spark.sql("""
        select event_type, count(*)
        from tmp
        group by event_type
""").show(truncate=False)



+----------+--------+
|event_type|count(1)|
+----------+--------+
|view      |48964   |
|cart      |468     |
|purchase  |568     |
+----------+--------+



                                                                                

In [107]:
tmp.groupby('event_type').agg(F.count('user_id').alias('activated_user')).show()



+----------+--------------+
|event_type|activated_user|
+----------+--------------+
|      view|         97489|
|      cart|          1089|
|  purchase|          1422|
+----------+--------------+



                                                                                

In [13]:
window = Window.orderBy(F.desc(F.substring('event_type', -1, 1)))

In [22]:
tmp.groupby('event_type').agg(F.count_distinct('user_id').alias('activated_user')
                             ).withColumn('lag', F.lag('activated_user', 1, 0).over(window)
                                         ).withColumn('Bounce_Rate', F.when(F.col('lag') == 0, 0).otherwise(F.round(1 - F.col('activated_user')/F.col('lag'), 2))
                                                     ).show()

[Stage 42:>                                                         (0 + 1) / 1]

+----------+--------------+------+-----------+
|event_type|activated_user|   lag|Bounce_Rate|
+----------+--------------+------+-----------+
|      view|        486485|     0|        0.0|
|      cart|        156741|486485|       0.68|
|  purchase|        113889|156741|       0.27|
+----------+--------------+------+-----------+



                                                                                

In [62]:
tmp.groupby('event_type').agg(F.count_distinct('user_id').alias('activated_user')
                             ).withColumn('lag', F.lag('activated_user', 1, 0).over(window)
                                         ).withColumn('Bounce_Rate', F.when(F.col('lag') == 0, 0).otherwise(F.round(1 - F.col('activated_user')/F.col('lag'), 3))
                                                     ).withColumn('Residual_rate', F.round(F.col('activated_user')/F.max("activated_user").over(window), 3)
                                                                 ).withColumn('Drop_rate', F.lag(F.round(F.col('Residual_rate')-F.lag('Residual_rate', -1, 0).over(window),3), 1, 0).over(window)
                                                                             ).show()



+----------+--------------+------+-----------+-------------+---------+
|event_type|activated_user|   lag|Bounce_Rate|Residual_rate|Drop_rate|
+----------+--------------+------+-----------+-------------+---------+
|      view|        486485|     0|        0.0|          1.0|      0.0|
|      cart|        156741|486485|      0.678|        0.322|    0.678|
|  purchase|        113889|156741|      0.273|        0.234|    0.088|
+----------+--------------+------+-----------+-------------+---------+



                                                                                