# Day 3: PySpark Transformations Deep Dive

In [0]:
from pyspark.sql import functions as F

# Locations of the October and November 2019 CSV files.
path_oct = "/Volumes/workspace/ecommerce/ecommerce_data/2019-Oct.csv"
path_nov = "/Volumes/workspace/ecommerce/ecommerce_data/2019-Nov.csv"

# Both months are read the same way: the first row is the header and column
# types are inferred from the data.
csv_options = {"header": True, "inferSchema": True}
df_oct, df_nov = (
    spark.read.csv(month_path, **csv_options)
    for month_path in (path_oct, path_nov)
)

# Stack the two months into one DataFrame, matching columns by name.
full_data = df_oct.unionByName(df_nov)

# count() is an action, so this line triggers a pass over both files.
print(f"Total Rows: {full_data.count()}")

Total Rows: 109950743


In [0]:
# Distinct brands that have at least one row priced above 1000, each tagged
# with a literal True flag.
premium_brands = (
    full_data
    .where(F.col("price") > 1000)
    .select("brand")
    .dropDuplicates()
    .withColumn("is_premium_brand", F.lit(True))
)

# Left-join the broadcast-hinted brand list back onto every row. Rows whose
# brand has no match come back with a null flag, which is replaced by False.
enriched_df = (
    full_data
    .join(F.broadcast(premium_brands), on="brand", how="left")
    .fillna(False, subset=["is_premium_brand"])
)

# Preview a handful of rows with the new flag.
preview_columns = ["event_time", "brand", "price", "is_premium_brand"]
display(enriched_df.select(*preview_columns).limit(10))

event_time,brand,price,is_premium_brand
2019-10-01T00:00:00.000Z,shiseido,35.79,False
2019-10-01T00:00:00.000Z,aqua,33.2,False
2019-10-01T00:00:01.000Z,,543.1,False
2019-10-01T00:00:01.000Z,lenovo,251.74,True
2019-10-01T00:00:04.000Z,apple,1081.98,True
2019-10-01T00:00:05.000Z,pulser,908.62,True
2019-10-01T00:00:08.000Z,creed,380.96,False
2019-10-01T00:00:08.000Z,luminarc,41.16,False
2019-10-01T00:00:10.000Z,baden,102.71,False
2019-10-01T00:00:11.000Z,huawei,566.01,True


In [0]:
from pyspark.sql.window import Window

# Per-user window in chronological order; used as-is for ranking.
user_window = Window.partitionBy("user_id").orderBy("event_time")

# An ordered window without an explicit frame defaults to a RANGE frame
# (unbounded preceding .. current row). RANGE treats rows that share the same
# event_time as peers, so every tied row would receive the sum over the whole
# tie group and the running total would disagree with event_rank. An explicit
# ROWS frame makes the total advance exactly one row at a time.
# The relative order of rows with identical event_time is still arbitrary,
# because the window has no further tie-breaking column.
running_window = user_window.rowsBetween(Window.unboundedPreceding, Window.currentRow)

# NOTE(review): price is summed over every row regardless of event_type
# (the displayed sample accumulates "view" rows) — confirm this is the
# intended meaning of "spend".
final_df = (
    enriched_df
    .withColumn("running_total_spend", F.sum("price").over(running_window))
    .withColumn("event_rank", F.row_number().over(user_window))
)

display(final_df.select("user_id", "event_time", "event_type", "price", "running_total_spend", "event_rank").limit(20))

user_id,event_time,event_type,price,running_total_spend,event_rank
65800726,2019-11-27T04:33:16.000Z,view,81.8,81.8,1
65800726,2019-11-27T04:35:24.000Z,view,81.8,163.6,2
81255481,2019-11-08T07:44:45.000Z,view,66.35,66.35,1
81255481,2019-11-21T14:11:26.000Z,view,66.14,132.49,2
106416780,2019-11-28T05:43:46.000Z,view,256.53,256.53,1
106416780,2019-11-28T05:47:29.000Z,view,476.12,732.65,2
106416780,2019-11-28T05:48:32.000Z,view,450.44,1183.09,3
106416780,2019-11-28T05:49:14.000Z,view,450.44,1633.53,4
117019800,2019-11-19T05:09:49.000Z,view,19.56,19.56,1
138365902,2019-11-14T08:03:01.000Z,view,159.57,159.57,1


In [0]:
# Per-user chronological window used to look one event back.
prev_time_window = Window.partitionBy("user_id").orderBy("event_time")

# Timestamp of the same user's preceding row; null for a user's first row.
previous_event_time = F.lag("event_time", 1).over(prev_time_window)

# Difference between the two timestamps in epoch seconds. It is null whenever
# prev_event_time is null. The column is referenced by name and resolved once
# it has been added below.
gap_in_seconds = F.unix_timestamp("event_time") - F.unix_timestamp("prev_event_time")

feature_df = (
    final_df
    .withColumn("prev_event_time", previous_event_time)
    .withColumn("seconds_since_last_event", gap_in_seconds)
)

preview_columns = ["user_id", "event_time", "event_type", "seconds_since_last_event"]
display(feature_df.select(*preview_columns).limit(10))

user_id,event_time,event_type,seconds_since_last_event
65800726,2019-11-27T04:33:16.000Z,view,
65800726,2019-11-27T04:35:24.000Z,view,128.0
81255481,2019-11-08T07:44:45.000Z,view,
81255481,2019-11-21T14:11:26.000Z,view,1146401.0
82079354,2019-11-28T04:58:01.000Z,view,
82079354,2019-11-28T04:58:22.000Z,view,21.0
82079354,2019-11-28T04:59:29.000Z,view,67.0
82079354,2019-11-28T04:59:54.000Z,view,25.0
82079354,2019-11-28T05:00:22.000Z,view,28.0
82079354,2019-11-28T05:00:47.000Z,view,25.0
