The dataset 'yoochoose-clicks' is a set of click events collected from the website of an online retailer. Each record in the dataset has four fields:

Session ID - the ID of the session. A session contains one or more clicks.  
Timestamp - the time when the click occurred.  
Item ID - the unique identifier of the item.  
Category - the category of the item.  

The value "S" indicates a special offer, "0" indicates a missing value, a number between 1 and 12 indicates a
real category identifier, and any other number indicates a brand. If an item was clicked in the context of
a promotion or special offer, the value is "S". If the category is a brand (e.g., BOSCH), the value is an
8-10 digit number.

The objective is to **compute the average time** that users stay on items in each category.

For analysis purposes in this task, the following definitions will be used:  
(i) There are 15 item categories in the dataset: S, 0, 1 to 12, and B (for any 8-10 digit number).  
(ii) In each session, the time that a user stays on an item is the difference between the timestamp of the click
on this item and the timestamp of the click on the next item (if there is a next item).
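Definition (ii) can be illustrated with a minimal pure-Python sketch that does not depend on Spark. The function name `dwell_times` and the sample clicks are hypothetical and chosen only for illustration. The last click of a session gets `None` because there is no next item to measure against.

```python
from datetime import datetime

def dwell_times(clicks):
    """Return (item_id, seconds) per click of one session; None for the last click.

    `clicks` is a list of (timestamp, item_id) tuples belonging to a single session.
    """
    ordered = sorted(clicks, key=lambda c: c[0])  # chronological order
    result = []
    for i, (ts, item) in enumerate(ordered):
        if i + 1 < len(ordered):
            # Time on this item = next click's timestamp minus this click's timestamp
            result.append((item, (ordered[i + 1][0] - ts).total_seconds()))
        else:
            # No next click, so the time on the last item is unknown
            result.append((item, None))
    return result

# Hypothetical session with three clicks
session = [
    (datetime(2014, 4, 1, 10, 0, 0), "A"),
    (datetime(2014, 4, 1, 10, 0, 30), "B"),
    (datetime(2014, 4, 1, 10, 2, 0), "C"),
]
print(dwell_times(session))
```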

In [1]:
import findspark
findspark.init()
import pyspark # only run after findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").getOrCreate()  # "spark-master" is not a Spark config key; set the master explicitly

In [2]:
from pyspark.sql.types import StructType, StructField, IntegerType, TimestampType, StringType

In [3]:
!head -n 3 data/yoochoose-clicks.dat

1,2014-04-07T10:51:09.277Z,214536502,0
1,2014-04-07T10:54:09.868Z,214536500,0
1,2014-04-07T10:54:46.998Z,214536506,0


In [4]:
myManualSchema = StructType([
    StructField("Session_ID", IntegerType(), nullable=False),
    StructField("Timestamp", TimestampType(), nullable=False),
    StructField("Item_ID", IntegerType(), nullable=False),
    StructField("Category", StringType(), nullable=False)])

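The field types in the schema were chosen by inspecting the first rows of the dataset. The CSV reader does not enforce `nullable=False`, which is why the printed schema reports `nullable = true` for every column.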

In [5]:
df_RD = spark.read.format("csv").load("data/yoochoose-clicks.dat", schema=myManualSchema)
df_RD.printSchema()

root
 |-- Session_ID: integer (nullable = true)
 |-- Timestamp: timestamp (nullable = true)
 |-- Item_ID: integer (nullable = true)
 |-- Category: string (nullable = true)



In [6]:
df_RD.show(3,False)

+----------+-----------------------+---------+--------+
|Session_ID|Timestamp              |Item_ID  |Category|
+----------+-----------------------+---------+--------+
|1         |2014-04-07 18:51:09.277|214536502|0       |
|1         |2014-04-07 18:54:09.868|214536500|0       |
|1         |2014-04-07 18:54:46.998|214536506|0       |
+----------+-----------------------+---------+--------+
only showing top 3 rows
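The displayed times are eight hours later than the raw values in the file (18:51 versus 10:51). The raw timestamps end in `Z` (UTC) and Spark renders timestamps in the session time zone, so the output is consistent with a session running at UTC+8. That offset is inferred from the output and is not stated in the notebook. Differences between timestamps are unaffected by the shift. A small standard-library check of the conversion, under that assumption:

```python
from datetime import datetime, timedelta, timezone

raw = "2014-04-07T10:51:09.277Z"  # first record of the raw file

# Parse the value as UTC (the trailing Z), then convert to the assumed UTC+8 zone
utc = datetime.strptime(raw, "%Y-%m-%dT%H:%M:%S.%fZ").replace(tzinfo=timezone.utc)
local = utc.astimezone(timezone(timedelta(hours=8)))

# Format with millisecond precision, like the Spark output
shown = local.strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
print(shown)
```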



In [7]:
from pyspark.sql.functions import when, col

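# Cast explicitly to long so that 10-digit brand identifiers cannot overflow a 32-bit integer;
# "S" casts to null, so the comparison is null and the original value is kept.
df_RD = df_RD.withColumn("Category", when(col("Category").cast("long") > 12, "B").otherwise(col("Category")))

Map every category outside S, 0 and 1-12 to B (brand).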
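The same mapping rule can be written as a plain-Python function, which makes the edge cases easy to check by hand. This is a sketch of the rule only, not of the Spark expression itself; the name `normalize_category` and the sample brand number are hypothetical.

```python
def normalize_category(raw):
    """Map a raw Category string to one of the 15 labels: S, 0, 1-12 or B."""
    # Any numeric value above 12 is a brand identifier
    if raw.isdigit() and int(raw) > 12:
        return "B"
    # "S", "0" and "1".."12" are kept unchanged
    return raw

# Hypothetical sample values; the 10-digit number stands in for a brand identifier
samples = ["S", "0", "1", "12", "2089046330"]
print([normalize_category(s) for s in samples])
```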

In [8]:
df_RD.where(col("Session_ID") == 6157803).show()

+----------+--------------------+---------+--------+
|Session_ID|           Timestamp|  Item_ID|Category|
+----------+--------------------+---------+--------+
|   6157803|2014-07-12 02:37:...|214845956|       B|
|   6157803|2014-07-12 02:38:...|214845956|       B|
+----------+--------------------+---------+--------+



In [9]:
from pyspark.sql.window import Window
from pyspark.sql.functions import lag

w = Window.partitionBy("Session_ID").orderBy(col("Timestamp").desc())
df_RD = df_RD.withColumn("next_Timestamp", lag(df_RD.Timestamp).over(w))

Use a window with `lag` over timestamps sorted in descending order within each session. In that ordering the previous row is the chronologically next click, so its timestamp becomes next_Timestamp.
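Taking the previous row in descending order gives the same result as taking the next row in ascending order. The plain-Python sketch below checks that equivalence on the time-of-day values of session 5274901 from this notebook. The helper names are hypothetical, and the sketch assumes distinct timestamps within a session.

```python
def lag_over_descending(timestamps):
    """Pair each timestamp with the previous row's value after sorting descending."""
    desc_sorted = sorted(timestamps, reverse=True)
    previous = [None] + desc_sorted[:-1]  # first row has no previous row
    return dict(zip(desc_sorted, previous))

def lead_over_ascending(timestamps):
    """Pair each timestamp with the next row's value after sorting ascending."""
    asc_sorted = sorted(timestamps)
    following = asc_sorted[1:] + [None]  # last row has no next row
    return dict(zip(asc_sorted, following))

# Same-format strings sort chronologically, so no parsing is needed here
clicks = ["22:34:12.711", "22:34:45.691", "22:37:45.792", "22:39:06.767"]
print(lag_over_descending(clicks) == lead_over_ascending(clicks))
```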

In [10]:
df_RD.where(df_RD.Session_ID == 5274901).show(5, False)

+----------+-----------------------+---------+--------+-----------------------+
|Session_ID|Timestamp              |Item_ID  |Category|next_Timestamp         |
+----------+-----------------------+---------+--------+-----------------------+
|5274901   |2014-06-30 22:39:06.767|214691525|2       |null                   |
|5274901   |2014-06-30 22:37:45.792|214850822|2       |2014-06-30 22:39:06.767|
|5274901   |2014-06-30 22:34:45.691|214691587|S       |2014-06-30 22:37:45.792|
|5274901   |2014-06-30 22:34:12.711|214840907|2       |2014-06-30 22:34:45.691|
+----------+-----------------------+---------+--------+-----------------------+



In [11]:
from pyspark.sql.functions import isnull, unix_timestamp

# Difference in whole seconds between the next click and this click (null for the last click)
time_spent = unix_timestamp("next_Timestamp") - unix_timestamp("Timestamp")
df_RD = df_RD.withColumn("Time_Spent (s)", when(isnull(time_spent), 0).otherwise(time_spent))

next_Timestamp minus Timestamp is the time spent on the item. A null value marks the last item clicked in a session. Since the dataset offers no way to determine when the user left, the value is set to 0 here and the row is excluded later when calculating the average.
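Because `unix_timestamp` returns whole seconds, the subtraction operates on timestamps whose milliseconds have been dropped, so each value can differ from the exact gap by up to one second. The sketch below reproduces this in plain Python for the first two clicks of session 5274901: the exact gap is 32.98 seconds, while the difference of the truncated values is 33.

```python
from datetime import datetime

fmt = "%Y-%m-%d %H:%M:%S.%f"
click = datetime.strptime("2014-06-30 22:34:12.711", fmt)
next_click = datetime.strptime("2014-06-30 22:34:45.691", fmt)

# Exact gap, keeping milliseconds
exact = (next_click - click).total_seconds()

# Gap after dropping the fractional seconds of both timestamps first
truncated = (
    next_click.replace(microsecond=0) - click.replace(microsecond=0)
).total_seconds()

print(exact, truncated)
```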

In [12]:
df_RD = df_RD.sort(col("Session_ID"), col("Timestamp"))

In [13]:
df_RD.where(df_RD.Session_ID == 5274901).show(5, False)

+----------+-----------------------+---------+--------+-----------------------+--------------+
|Session_ID|Timestamp              |Item_ID  |Category|next_Timestamp         |Time_Spent (s)|
+----------+-----------------------+---------+--------+-----------------------+--------------+
|5274901   |2014-06-30 22:34:12.711|214840907|2       |2014-06-30 22:34:45.691|33            |
|5274901   |2014-06-30 22:34:45.691|214691587|S       |2014-06-30 22:37:45.792|180           |
|5274901   |2014-06-30 22:37:45.792|214850822|2       |2014-06-30 22:39:06.767|81            |
|5274901   |2014-06-30 22:39:06.767|214691525|2       |null                   |0             |
+----------+-----------------------+---------+--------+-----------------------+--------------+



In [14]:
df_RD_no_zeroes = df_RD.where(col("Time_Spent (s)") != 0)
df_RD_no_zeroes.where(df_RD_no_zeroes.Session_ID == 5274901).show(5, False)

+----------+-----------------------+---------+--------+-----------------------+--------------+
|Session_ID|Timestamp              |Item_ID  |Category|next_Timestamp         |Time_Spent (s)|
+----------+-----------------------+---------+--------+-----------------------+--------------+
|5274901   |2014-06-30 22:34:12.711|214840907|2       |2014-06-30 22:34:45.691|33            |
|5274901   |2014-06-30 22:34:45.691|214691587|S       |2014-06-30 22:37:45.792|180           |
|5274901   |2014-06-30 22:37:45.792|214850822|2       |2014-06-30 22:39:06.767|81            |
+----------+-----------------------+---------+--------+-----------------------+--------------+



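Filter out the zeros in the Time_Spent (s) column before calculating the average. This removes the last click of every session, and it also removes any click that was followed by another click within the same whole second.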

In [15]:
from pyspark.sql.functions import avg

df_RD_no_zeroes.groupBy("Category").agg(avg("Time_Spent (s)")).show()

+--------+-------------------+
|Category|avg(Time_Spent (s))|
+--------+-------------------+
|       7|  176.2787246127175|
|      11| 136.95635630224143|
|       3| 115.27989871062425|
|       8|  159.6770950770387|
|       0| 145.48937745235582|
|       5| 197.63863904487755|
|       B| 173.52147913561848|
|       6| 196.89169795888017|
|       S| 147.20889172922796|
|       9| 175.07878404053199|
|       1| 166.26310856804616|
|      10| 163.99088694946215|
|       4|  165.0959717712642|
|      12| 233.70240973901443|
|       2|  177.1940657677661|
+--------+-------------------+



Average time spent (in seconds) on items in each category.
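The notebook marks the last click of a session with 0 and then filters out zeros. An alternative, sketched here in plain Python rather than Spark, is to keep the missing value as `None` and skip it while averaging, which retains genuine zero-second gaps. The function name `average_by_category` is hypothetical; the sample rows are the Category and Time_Spent (s) values of session 5274901.

```python
from collections import defaultdict

def average_by_category(rows):
    """Average dwell time per category, skipping rows whose dwell time is None."""
    totals = defaultdict(lambda: [0.0, 0])  # category -> [sum of seconds, row count]
    for category, seconds in rows:
        if seconds is None:
            continue  # last click of a session: dwell time unknown
        totals[category][0] += seconds
        totals[category][1] += 1
    return {category: total / count for category, (total, count) in totals.items()}

# (Category, Time_Spent) for session 5274901, with None for the last click
rows = [("2", 33), ("S", 180), ("2", 81), ("2", None)]
print(average_by_category(rows))
```

With this representation a real gap of 0 seconds still counts toward the average, while the unknown dwell time of the last click does not.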