In [1]:
from pyspark.sql import types as T
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window

In [2]:
# Obtain the SparkSession used by every cell below; getOrCreate() hands back
# an already-active session when there is one instead of building a new one.
spark = SparkSession.builder.getOrCreate()
# Bare expression so the notebook renders the session object as the cell output.
spark

:: loading settings :: url = jar:file:/usr/local/lib/python3.8/site-packages/pyspark/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
io.delta#delta-core_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-05b2e292-fb8c-411f-9272-d2cd105223c7;1.0
	confs: [default]
	found io.delta#delta-core_2.12;0.8.0 in central
	found org.antlr#antlr4;4.7 in central
	found org.antlr#antlr4-runtime;4.7 in central
	found org.antlr#antlr-runtime;3.5.2 in central
	found org.antlr#ST4;4.0.8 in central
	found org.abego.treelayout#org.abego.treelayout.core;1.0.3 in central
	found org.glassfish#javax.json;1.0.4 in central
	found com.ibm.icu#icu4j;58.2 in central
:: resolution report :: resolve 918ms :: artifacts dl 40ms
	:: modules in use:
	com.ibm.icu#icu4j;58.2 from central in [default]
	io.delta#delta-core_2.12;0.8.0 from central in [default]
	org.abego.treelayout#org.abego.treelayout.core;1.0.3 from central in [default]
	org.antlr#ST4;4.0.8 from central in [default]
	org.antlr#antlr-runtime;3.5.2 fro

In [3]:
# Load the raw event log (first CSV row is the header) and give the two
# non-string columns their proper types; event_name and event_params stay
# as the strings the CSV reader produced.
df = (
    spark.read
    .csv('data.csv', header=True)
    .select(
        F.col('timestamp').cast('timestamp'),
        F.col('user_id').cast('int'),
        'event_name',
        'event_params',
    )
)

# Preview the first rows without truncating the event_params payload.
df.show(5, truncate=False)

                                                                                

+-------------------+-------+-----------------+---------------------------------------------------------------+
|timestamp          |user_id|event_name       |event_params                                                   |
+-------------------+-------+-----------------+---------------------------------------------------------------+
|2021-08-01 11:00:00|1      |login            |null                                                           |
|2021-08-01 11:00:10|1      |home_screenview  |null                                                           |
|2021-08-01 11:01:00|1      |restaurant_choose|{'id':10,'brand':'Pizza Hut'}                                  |
|2021-08-01 11:02:00|1      |view_item        |{'item_id':22,'price':10.0,'category':'Pizza','promotion':'No'}|
|2021-08-01 11:04:00|1      |add_to_cart      |{'item_id':22,'qnt':1}                                         |
+-------------------+-------+-----------------+---------------------------------------------------------

In [4]:
# Print df's column names and types to confirm the timestamp/integer casts
# applied when the data was loaded.
df.printSchema()

root
 |-- timestamp: timestamp (nullable = true)
 |-- user_id: integer (nullable = true)
 |-- event_name: string (nullable = true)
 |-- event_params: string (nullable = true)



## Sessionization

In [8]:
# Inactivity threshold in seconds: a gap below this value keeps the event in
# the user's current session.
max_session_duration = 60 * 5

# Every per-user calculation below walks that user's events in time order.
user_timeline = Window.partitionBy('user_id').orderBy('timestamp')

df_session = (
    df
    # Timestamp of the same user's previous event (null for their first event).
    .withColumn('lag', F.lag('timestamp', 1).over(user_timeline))
    # Seconds elapsed since that previous event (null when there is none).
    .withColumn(
        'diff',
        F.col('timestamp').cast(T.LongType()) - F.col('lag').cast(T.LongType()),
    )
    # Flag = 1 on rows that open a new session: the user's first event (null
    # diff falls through to otherwise) or a gap that is not below the threshold.
    .withColumn(
        'session',
        F.when(F.col('diff') < max_session_duration, F.lit(0)).otherwise(F.lit(1)),
    )
    # Running total of the flags gives a per-user session number.
    .withColumn('session_id', F.sum('session').over(user_timeline))
    # Restore the column order: lag sits right after timestamp.
    .select(
        'timestamp',
        'lag',
        'user_id',
        'event_name',
        'event_params',
        'diff',
        'session',
        'session_id',
    )
    .orderBy(['user_id', 'session_id', 'timestamp'])
)

df_session.show(10, truncate=30)



+-------------------+-------------------+-------+-----------------+------------------------------+----+-------+----------+
|          timestamp|                lag|user_id|       event_name|                  event_params|diff|session|session_id|
+-------------------+-------------------+-------+-----------------+------------------------------+----+-------+----------+
|2021-08-01 11:00:00|               null|      1|            login|                          null|null|      1|         1|
|2021-08-01 11:00:10|2021-08-01 11:00:00|      1|  home_screenview|                          null|  10|      0|         1|
|2021-08-01 11:01:00|2021-08-01 11:00:10|      1|restaurant_choose| {'id':10,'brand':'Pizza Hut'}|  50|      0|         1|
|2021-08-01 11:02:00|2021-08-01 11:01:00|      1|        view_item|{'item_id':22,'price':10.0,...|  60|      0|         1|
|2021-08-01 11:04:00|2021-08-01 11:02:00|      1|      add_to_cart|        {'item_id':22,'qnt':1}| 120|      0|         1|
|2021-08-01 11:0

                                                                                

In [9]:
# Reader-friendly view of the sessionized events: drop the helper columns
# (lag, diff, session) and list each user's sessions in chronological order.
session_events = df_session.select(
    'timestamp', 'user_id', 'session_id', 'event_name', 'event_params'
)
session_events.orderBy('user_id', 'session_id', 'timestamp').show(50, truncate=50)

                                                                                

+-------------------+-------+----------+-----------------+--------------------------------------------------+
|          timestamp|user_id|session_id|       event_name|                                      event_params|
+-------------------+-------+----------+-----------------+--------------------------------------------------+
|2021-08-01 11:00:00|      1|         1|            login|                                              null|
|2021-08-01 11:00:10|      1|         1|  home_screenview|                                              null|
|2021-08-01 11:01:00|      1|         1|restaurant_choose|                     {'id':10,'brand':'Pizza Hut'}|
|2021-08-01 11:02:00|      1|         1|        view_item|{'item_id':22,'price':10.0,'category':'Pizza','...|
|2021-08-01 11:04:00|      1|         1|      add_to_cart|                            {'item_id':22,'qnt':1}|
|2021-08-01 11:05:00|      1|         1|         purchase|   {'order':100,'value':10.0,'delivery_value':2.0}|
|2021-08-0

In [14]:
# A session is identified by the (user_id, session_id) pair, so the number of
# sessions is the number of distinct pairs.
total_sessions = (
    df_session
    .select('user_id', 'session_id')
    .distinct()
    .count()
)
total_sessions

                                                                                

5

In [17]:
# Number of distinct (user_id, session_id) pairs that contain at least one
# 'purchase' event.
# The filter runs before the projection because event_name is not one of the
# selected columns; filtering after the select only works through Spark's
# implicit resolution of a column that has already been dropped.
total_sessions_with_purchase = (
    df_session
    .filter(F.col('event_name') == 'purchase')
    .select('user_id', 'session_id')
    .distinct()
    .count()
)
total_sessions_with_purchase

                                                                                

2

In [19]:
# Conversion rate: share of sessions that include at least one purchase.
# Evaluates to 0.0 when there are no sessions instead of raising
# ZeroDivisionError.
total_sessions_with_purchase / total_sessions if total_sessions else 0.0

0.4