In [0]:
### Tasks:

#1. Load full e-commerce dataset
#2. Perform complex joins
#3. Calculate running totals with window functions
#4. Create derived features

In [0]:
# Load Full E-commerce Dataset
events = spark.read.csv(
    "dbfs:/Volumes/workspace/ecommerce/ecommerce_data/2019-Oct.csv",
    header=True,
    inferSchema=True
)

print(f"Total events: {events.count():,}")
events.printSchema()
display(events.limit(5))

Total events: 42,448,764
root
 |-- event_time: timestamp (nullable = true)
 |-- event_type: string (nullable = true)
 |-- product_id: integer (nullable = true)
 |-- category_id: long (nullable = true)
 |-- category_code: string (nullable = true)
 |-- brand: string (nullable = true)
 |-- price: double (nullable = true)
 |-- user_id: integer (nullable = true)
 |-- user_session: string (nullable = true)



event_time,event_type,product_id,category_id,category_code,brand,price,user_id,user_session
2019-10-01T00:00:00.000Z,view,44600062,2103807459595387724,,shiseido,35.79,541312140,72d76fde-8bb3-4e00-8c23-a032dfed738c
2019-10-01T00:00:00.000Z,view,3900821,2053013552326770905,appliances.environment.water_heater,aqua,33.2,554748717,9333dfbd-b87a-4708-9857-6336556b0fcc
2019-10-01T00:00:01.000Z,view,17200506,2053013559792632471,furniture.living_room.sofa,,543.1,519107250,566511c2-e2e3-422b-b695-cf8e6e792ca8
2019-10-01T00:00:01.000Z,view,1307067,2053013558920217191,computers.notebook,lenovo,251.74,550050854,7c90fc70-0e80-4590-96f3-13c02c18c713
2019-10-01T00:00:04.000Z,view,1004237,2053013555631882655,electronics.smartphone,apple,1081.98,535871217,c6bd7419-2748-4c56-95b4-8cec9ff8b80d


In [0]:
# Perform Complex Joins

# Create product dimension
products = events.select("product_id", "brand").dropDuplicates()

# Inner Join
inner_join_df = events.join(products, on="product_id", how="inner")

# Left Join
left_join_df = events.join(products, on="product_id", how="left")

display(inner_join_df.limit(5))
display(left_join_df.limit(5))

product_id,event_time,event_type,category_id,category_code,brand,price,user_id,user_session,brand.1
44600062,2019-10-01T00:00:00.000Z,view,2103807459595387724,,shiseido,35.79,541312140,72d76fde-8bb3-4e00-8c23-a032dfed738c,shiseido
3900821,2019-10-01T00:00:00.000Z,view,2053013552326770905,appliances.environment.water_heater,aqua,33.2,554748717,9333dfbd-b87a-4708-9857-6336556b0fcc,aqua
17200506,2019-10-01T00:00:01.000Z,view,2053013559792632471,furniture.living_room.sofa,,543.1,519107250,566511c2-e2e3-422b-b695-cf8e6e792ca8,
1307067,2019-10-01T00:00:01.000Z,view,2053013558920217191,computers.notebook,lenovo,251.74,550050854,7c90fc70-0e80-4590-96f3-13c02c18c713,lenovo
1004237,2019-10-01T00:00:04.000Z,view,2053013555631882655,electronics.smartphone,apple,1081.98,535871217,c6bd7419-2748-4c56-95b4-8cec9ff8b80d,apple


product_id,event_time,event_type,category_id,category_code,brand,price,user_id,user_session,brand.1
3900821,2019-10-01T00:00:00.000Z,view,2053013552326770905,appliances.environment.water_heater,aqua,33.2,554748717,9333dfbd-b87a-4708-9857-6336556b0fcc,aqua
1307067,2019-10-01T00:00:01.000Z,view,2053013558920217191,computers.notebook,lenovo,251.74,550050854,7c90fc70-0e80-4590-96f3-13c02c18c713,lenovo
17200506,2019-10-01T00:00:01.000Z,view,2053013559792632471,furniture.living_room.sofa,,543.1,519107250,566511c2-e2e3-422b-b695-cf8e6e792ca8,
44600062,2019-10-01T00:00:00.000Z,view,2103807459595387724,,shiseido,35.79,541312140,72d76fde-8bb3-4e00-8c23-a032dfed738c,shiseido
1004237,2019-10-01T00:00:04.000Z,view,2053013555631882655,electronics.smartphone,apple,1081.98,535871217,c6bd7419-2748-4c56-95b4-8cec9ff8b80d,apple


In [0]:
# Running Totals Using Window Functions
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Filter purchases
purchases = events.filter(F.col("event_type") == "purchase")

# Window definition
window_spec = Window.partitionBy("brand").orderBy("event_time")

# Running revenue per brand
running_totals = purchases.withColumn("running_revenue",F.sum("price").over(window_spec))

display(
    running_totals.select("event_time", "brand", "price", "running_revenue").limit(10))


event_time,brand,price,running_revenue
2019-10-01T02:19:59.000Z,,153.16,153.16
2019-10-01T02:20:28.000Z,,91.12,244.28
2019-10-01T02:21:45.000Z,,60.49,304.77
2019-10-01T02:23:03.000Z,,120.47,425.24
2019-10-01T02:26:02.000Z,,33.46,458.7
2019-10-01T02:30:49.000Z,,14.16,472.86
2019-10-01T02:34:27.000Z,,226.78,699.64
2019-10-01T02:35:33.000Z,,431.67,1131.31
2019-10-01T02:35:34.000Z,,189.71,1321.02
2019-10-01T02:36:06.000Z,,204.33,1525.35


In [0]:
# Create Derived Features

from pyspark.sql import functions as F

events_simple = events.withColumn(
    "revenue",
    F.when(F.col("event_type") == "purchase", F.col("price"))
     .otherwise(0)
)

display(events_simple.select(
    "event_type", "price", "revenue"
).limit(5))

event_type,price,revenue
view,35.79,0.0
view,33.2,0.0
view,543.1,0.0
view,251.74,0.0
view,1081.98,0.0


In [0]:
# Conversion rate by category
conversion_df = events.groupBy("category_code") \
    .pivot("event_type") \
    .count() \
    .withColumn(
        "conversion_rate",
        F.col("purchase") / F.col("view") * 100
    )
display(conversion_df.limit(5))

category_code,cart,purchase,view,conversion_rate
auto.accessories.parktronic,,46,12305,0.3738317757009346
furniture.living_room.sofa,,1084,215471,0.5030839416905292
stationery.cartrige,106.0,134,7380,1.815718157181572
sport.bicycle,693.0,838,128759,0.6508282916145668
apparel.sock,7.0,21,2621,0.8012209080503624
