In [0]:
events = spark.read.csv("/Volumes/workspace/ecommerce/ecommerce_data/2019-Nov.csv", header=True, inferSchema=True)

In [0]:
events.show(10)

+-------------------+----------+----------+-------------------+--------------------+--------+------+---------+--------------------+
|         event_time|event_type|product_id|        category_id|       category_code|   brand| price|  user_id|        user_session|
+-------------------+----------+----------+-------------------+--------------------+--------+------+---------+--------------------+
|2019-11-01 00:00:00|      view|   1003461|2053013555631882655|electronics.smart...|  xiaomi|489.07|520088904|4d3b30da-a5e4-49d...|
|2019-11-01 00:00:00|      view|   5000088|2053013566100866035|appliances.sewing...|  janome|293.65|530496790|8e5f4f83-366c-4f7...|
|2019-11-01 00:00:01|      view|  17302664|2053013553853497655|                NULL|   creed| 28.31|561587266|755422e7-9040-477...|
|2019-11-01 00:00:01|      view|   3601530|2053013563810775923|appliances.kitche...|      lg|712.87|518085591|3bfb58cd-7892-48c...|
|2019-11-01 00:00:01|      view|   1004775|2053013555631882655|electronics.s

In [0]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Top 5 products by revenue
revenue = events.filter(F.col("event_type") == "purchase") \
    .groupBy("product_id", "brand") \
    .agg(F.sum("price").alias("revenue")) \
    .orderBy(F.desc("revenue")).limit(5)

revenue.show(10)    

+----------+-----+--------------------+
|product_id|brand|             revenue|
+----------+-----+--------------------+
|   1005115|apple|2.0625574319999937E7|
|   1005105|apple|1.1445354690000003E7|
|   1005135|apple|   7086522.129999991|
|   1004249|apple|          6815294.62|
|   1002544|apple|   5603193.590000003|
+----------+-----+--------------------+



In [0]:
# Running total per user
window = Window.partitionBy("user_id").orderBy("event_time")
events.withColumn("cumulative_events", F.count("*").over(window))

DataFrame[event_time: timestamp, event_type: string, product_id: int, category_id: bigint, category_code: string, brand: string, price: double, user_id: int, user_session: string, cumulative_events: bigint]

In [0]:
# Conversion rate by category
from pyspark.sql import functions as F

# Aggregate views and purchases per category
views = events.filter(F.col("event_type") == "view") \
    .groupBy("category_code") \
    .count() \
    .withColumnRenamed("count", "view_count")
purchases = events.filter(F.col("event_type") == "purchase") \
    .groupBy("category_code") \
    .count() \
    .withColumnRenamed("count", "purchase_count")

# Join and calculate conversion rate
conversion = views.join(purchases, "category_code", "left") \
    .withColumn("conversion_rate", (F.col("purchase_count")/F.col("view_count")*100))
conversion.show(10)

+--------------------+----------+--------------+------------------+
|       category_code|view_count|purchase_count|   conversion_rate|
+--------------------+----------+--------------+------------------+
| stationery.cartrige|     11943|           191|1.5992631667085322|
|electronics.video.tv|   2071305|         30274| 1.461590639717473|
|  accessories.wallet|     68909|           366|0.5311352653499543|
|appliances.kitche...|     71894|           733|1.0195565693938298|
|                NULL|  20837460|          NULL|              NULL|
|construction.tool...|    141960|          1119|0.7882502113271344|
|appliances.enviro...|    269283|          3583|1.3305704407630634|
|country_yard.furn...|      1567|             4|0.2552648372686663|
|       apparel.shoes|   1836676|         10140|0.5520843088274687|
|electronics.audio...|     44645|           489|1.0953074252435884|
+--------------------+----------+--------------+------------------+
only showing top 10 rows
