## Load the data from Volume ecommerce_data from ecommerce DB

In [0]:
# Load data
events = spark.read.csv("/Volumes/workspace/ecommerce/ecommerce_data/2019-Nov.csv", header=True, inferSchema=True)

## Practice

In [0]:
from pyspark.sql import functions as F
from pyspark.sql import Window

In [0]:
#Top 5 Product by revenue
revenue = events.filter(F.col('event_type') == 'purchase').groupBy('product_id').agg(F.sum('price').alias(
    'revenue')).orderBy(F.desc('revenue')).limit(5)
revenue.show()

+----------+--------------------+
|product_id|             revenue|
+----------+--------------------+
|   1005115|2.0625574319999967E7|
|   1005105|1.1445354689999992E7|
|   1005135|  7086522.1299999915|
|   1004249|   6815294.620000016|
|   1002544|   5603193.590000012|
+----------+--------------------+



In [0]:
# Running total per user
window = Window.partitionBy("user_id").orderBy("event_time")
events.withColumn("cummulative_events",F.count('*').over(window))

DataFrame[event_time: timestamp, event_type: string, product_id: int, category_id: bigint, category_code: string, brand: string, price: double, user_id: int, user_session: string, cummulative_events: bigint]

In [0]:
events.limit(5).show()

+-------------------+----------+----------+-------------------+--------------------+------+------+---------+--------------------+
|         event_time|event_type|product_id|        category_id|       category_code| brand| price|  user_id|        user_session|
+-------------------+----------+----------+-------------------+--------------------+------+------+---------+--------------------+
|2019-11-01 00:00:00|      view|   1003461|2053013555631882655|electronics.smart...|xiaomi|489.07|520088904|4d3b30da-a5e4-49d...|
|2019-11-01 00:00:00|      view|   5000088|2053013566100866035|appliances.sewing...|janome|293.65|530496790|8e5f4f83-366c-4f7...|
|2019-11-01 00:00:01|      view|  17302664|2053013553853497655|                NULL| creed| 28.31|561587266|755422e7-9040-477...|
|2019-11-01 00:00:01|      view|   3601530|2053013563810775923|appliances.kitche...|    lg|712.87|518085591|3bfb58cd-7892-48c...|
|2019-11-01 00:00:01|      view|   1004775|2053013555631882655|electronics.smart...|xiaomi

In [0]:
from pyspark.sql import functions as F

conversion_df = (
    events
    .groupBy("category_code")
    .pivot("event_type", ["view", "purchase"])
    .count()
    .withColumn(
        "conversion_rate",
        F.col("purchase") / F.col("view") * 100
    )
)

conversion_df.show()


+--------------------+-------+--------+-------------------+
|       category_code|   view|purchase|    conversion_rate|
+--------------------+-------+--------+-------------------+
|furniture.living_...| 417428|    1562| 0.3741962685780542|
|      apparel.jumper|  31269|      82|0.26224055774089355|
| stationery.cartrige|  11943|     191| 1.5992631667085322|
|       sport.bicycle| 106037|     536| 0.5054839348529288|
|        apparel.sock|   3455|      19| 0.5499276410998553|
|appliances.enviro...|   3316|      32| 0.9650180940892641|
|          kids.swing|  57430|     482| 0.8392826049103257|
|auto.accessories....|   3397|      18| 0.5298793052693553|
|auto.accessories....|  47145|     544| 1.1538869445328244|
|electronics.audio...|  44645|     489| 1.0953074252435884|
|  electronics.clocks|1994440|   23237| 1.1650889472734203|
|electronics.audio...|  60363|     696|  1.153024203568411|
|appliances.kitche...| 191699|    1484| 0.7741302771532454|
|appliances.kitche...| 242604|    3010| 