### Import required libraries and functions

In [0]:
from pyspark.sql import functions as f
from pyspark.sql.window import Window

### Load the Datafiles (CSV)

In [0]:
# Load Ecommerce Sample data
events = spark.read.csv("/Volumes/workspace/ecommerce/ecommerce_data/2019-Nov.csv", header=True, inferSchema=True)

### Prints the dataset schema

In [0]:
# check dataset schema
events.printSchema()
events.show(5)

root
 |-- event_time: timestamp (nullable = true)
 |-- event_type: string (nullable = true)
 |-- product_id: integer (nullable = true)
 |-- category_id: long (nullable = true)
 |-- category_code: string (nullable = true)
 |-- brand: string (nullable = true)
 |-- price: double (nullable = true)
 |-- user_id: integer (nullable = true)
 |-- user_session: string (nullable = true)

+-------------------+----------+----------+-------------------+--------------------+------+------+---------+--------------------+
|         event_time|event_type|product_id|        category_id|       category_code| brand| price|  user_id|        user_session|
+-------------------+----------+----------+-------------------+--------------------+------+------+---------+--------------------+
|2019-11-01 00:00:00|      view|   1003461|2053013555631882655|electronics.smart...|xiaomi|489.07|520088904|4d3b30da-a5e4-49d...|
|2019-11-01 00:00:00|      view|   5000088|2053013566100866035|appliances.sewing...|janome|293.65|53

### Ensure Correct Data Types for Column

In [0]:
# spark = SparkSession.builder.appName("ECommerceAnalytics").getOrCreate()

# Ensure numeric types and timestamp
events = events\
.withColumn("price", f.col("price").cast("double")) \
.withColumn("event_time", f.to_timestamp("event_time"))

### Extract product_name from category code and filter data

In [0]:
# Extract product_name from category_code
events = events\
.withColumn(
"product_name",
f.when(f.col("category_code").contains("."), f.split(f.col("category_code"), r"\.")[1])
.otherwise(None)
)

# Filter out rows with no product_name
events_filtered = events.filter(f.col("product_name").isNotNull())

### Deduplicate Product Information after Extraction

In [0]:
products = events_filtered\
.select("product_id","brand") \
.dropDuplicates(["product_id"]) \
.withColumnRenamed("brand", "product_brand")


### Perform Joins on Ecommerce Data

In [0]:
# Join events with product details
events_df = events_filtered.join(products, on="product_id", how="left")
display(events_df.limit(6))

product_id,event_time,event_type,category_id,category_code,brand,price,user_id,user_session,product_name,product_brand
1005159,2019-11-28T04:25:36.000Z,view,2053013555631882655,electronics.smartphone,xiaomi,196.27,577989784,076fe4e3-6470-4ee8-9818-591dfdbd4d67,smartphone,xiaomi
4900092,2019-11-09T18:15:37.000Z,view,2053013555220840837,appliances.kitchen.juicer,zelmer,32.03,550957324,85370975-0300-4192-9e69-f44e9062e8fa,kitchen,zelmer
14701857,2019-11-08T08:12:45.000Z,view,2053013557133443581,furniture.living_room.cabinet,,116.58,556645933,eb8da37b-a581-4bf2-a191-5c4acda8434d,living_room,
3800548,2019-11-20T01:53:04.000Z,view,2053013566176363511,appliances.iron,vitek,28.29,550655616,80c75b9b-5e3c-47b9-a19d-769ad318d78f,iron,vitek
28720794,2019-11-10T10:23:36.000Z,view,2053013565228450757,apparel.shoes,baratto,93.95,540514959,76e4342c-8ff0-494b-b36f-07ea85a94643,shoes,baratto
2401349,2019-11-15T19:54:53.000Z,view,2053013563743667055,appliances.kitchen.hood,elikor,169.37,523893924,b1b710fc-bd7d-44ff-aff4-c7124fd65fe6,kitchen,elikor


### Calculate Running Totals with Window Functions

In [0]:
win = Window.partitionBy("user_id").orderBy("event_time") \
.rowsBetween(Window.unboundedPreceding, Window.currentRow)

events_with_cumsum = events_df\
.withColumn(
    "cumulative_events", f.count("*").over(win))\
.withColumn(
    "cumulative_revenue", f.sum(f.when(f.col("event_type")=="purchase", f.col("price")).otherwise(0)).over(win))

display(events_with_cumsum.limit(5))

product_id,event_time,event_type,category_id,category_code,brand,price,user_id,user_session,product_name,product_brand,cumulative_events,cumulative_revenue
4804056,2019-11-14T08:03:01.000Z,view,2053013554658804075,electronics.audio.headphone,apple,159.57,138365902,39cfa4d2-4120-4b4e-aafa-8bc1d6ac1eb5,audio,apple,1,0.0
1201216,2019-11-11T15:35:53.000Z,view,2172371436436455782,electronics.tablet,lenovo,244.25,153489867,68c119a9-824d-4fc3-98ad-f86438a8a87c,tablet,lenovo,1,0.0
1201471,2019-11-11T15:54:19.000Z,view,2172371436436455782,electronics.tablet,samsung,468.36,153489867,4d7ab95d-d685-476b-81bf-1458edbcd083,tablet,samsung,2,0.0
1201407,2019-11-11T15:54:50.000Z,view,2172371436436455782,electronics.tablet,huawei,257.12,153489867,00c7feef-1944-41d2-a74c-a35c85bdde6f,tablet,huawei,3,0.0
1201407,2019-11-13T14:38:22.000Z,view,2172371436436455782,electronics.tablet,huawei,257.12,153489867,b8f72e1e-2b88-42ea-948e-32fcfa1690f1,tablet,huawei,4,0.0


### Create Derived Features

#### Top Five Products by Revenue

In [0]:
revenue = events_with_cumsum \
    .filter(f.col("event_type") == "purchase") \
    .groupBy("product_id", "product_name") \
    .agg(f.round(f.sum("price"), 3).alias("revenue")) \
    .orderBy(f.desc("revenue")) \
    .limit(5)
display(revenue) 

product_id,product_name,revenue
1005115,smartphone,20625574.32
1005105,smartphone,11445354.69
1005135,smartphone,7086522.13
1004249,smartphone,6815294.62
1002544,smartphone,5603193.59


#### Running Total per _User_

In [0]:
win = Window.partitionBy("user_id").orderBy("event_time") \
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)

events_with_cumsum = events_df\
.withColumn(
"cumulative_events", f.count("*").over(win)
)

display(events_with_cumsum.limit(5))


product_id,event_time,event_type,category_id,category_code,brand,price,user_id,user_session,product_name,product_brand,cumulative_events
4804056,2019-11-14T08:03:01.000Z,view,2053013554658804075,electronics.audio.headphone,apple,159.57,138365902,39cfa4d2-4120-4b4e-aafa-8bc1d6ac1eb5,audio,apple,1
1201216,2019-11-11T15:35:53.000Z,view,2172371436436455782,electronics.tablet,lenovo,244.25,153489867,68c119a9-824d-4fc3-98ad-f86438a8a87c,tablet,lenovo,1
1201471,2019-11-11T15:54:19.000Z,view,2172371436436455782,electronics.tablet,samsung,468.36,153489867,4d7ab95d-d685-476b-81bf-1458edbcd083,tablet,samsung,2
1201407,2019-11-11T15:54:50.000Z,view,2172371436436455782,electronics.tablet,huawei,257.12,153489867,00c7feef-1944-41d2-a74c-a35c85bdde6f,tablet,huawei,3
1201407,2019-11-13T14:38:22.000Z,view,2172371436436455782,electronics.tablet,huawei,257.12,153489867,b8f72e1e-2b88-42ea-948e-32fcfa1690f1,tablet,huawei,4


#### Conversion Rate by Category

In [0]:
conversion = events_df\
.groupBy("category_code") \
.pivot("event_type", ["view", "purchase"]) \
.count() \
.withColumn(
    "conversion_rate",
    f.round((f.col("purchase") / f.col("view")) * 100, 3)
    )

display(conversion)


category_code,view,purchase,conversion_rate
furniture.living_room.sofa,417428,1562.0,0.374
apparel.jumper,31269,82.0,0.262
stationery.cartrige,11943,191.0,1.599
sport.bicycle,106037,536.0,0.505
apparel.sock,3455,19.0,0.55
appliances.environment.fan,3316,32.0,0.965
kids.swing,57430,482.0,0.839
auto.accessories.anti_freeze,3397,18.0,0.53
auto.accessories.radar,47145,544.0,1.154
electronics.audio.microphone,44645,489.0,1.095
