In [0]:
# Load the October 2019 CSV: the first line is treated as the header and
# column types are inferred from the data rather than declared up front.
df_o = (
    spark.read.format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .load("/Volumes/workspace/ecommerce/ecommerce_data/2019-Oct.csv")
)

In [0]:
# Load the November 2019 CSV with the same reader settings as the October
# file: header row present, column types inferred from the data.
df_n = (
    spark.read.format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .load("/Volumes/workspace/ecommerce/ecommerce_data/2019-Nov.csv")
)

In [0]:
# Product lookup table: one row per product_id with its descriptive columns.
# NOTE(review): dropDuplicates on a key subset does not specify which of a
# product's rows survives, so price/brand here are "some observed value",
# not a canonical one -- confirm that is acceptable downstream.
df_o_products = (
    df_o
    .select("product_id", "category_id", "category_code", "brand", "price")
    .dropDuplicates(["product_id"])
)
# Number of distinct products seen in October.
df_o_products.count()

166794

In [0]:
# User lookup table: the distinct user_id values present in the October data.
# With a single projected column, distinct() and de-duplicating on that
# column produce the same set of rows.
df_o_users = df_o.select("user_id").distinct()
# Number of distinct users seen in October.
df_o_users.count()

3022290

In [0]:
# Event table: keep only the per-event columns plus the product/user keys;
# product attributes are split out into their own lookup frame.
df_o_events = df_o.select(
    "event_time",
    "event_type",
    "product_id",
    "user_id",
    "user_session",
)

In [0]:
# Row count of the event projection. select() only narrows columns, so this
# is also the number of rows in the raw October frame.
df_o_events.count()

42448764

In [0]:
# Re-attach product attributes and the user key to every event.
# Both lookup frames were de-duplicated on their join key, so the left joins
# should neither drop nor multiply event rows; the count below checks that.
df_o_join = (
    df_o_events
    .join(df_o_products, on="product_id", how="left_outer")
    .join(df_o_users, on="user_id", how="left_outer")
)
df_o_join.count()

42448764

In [0]:
# Preview the 10 earliest October events, ordering by user_id within the
# same event_time.
df_o.sort("event_time", "user_id").show(10)

+-------------------+----------+----------+-------------------+--------------------+--------+-------+---------+--------------------+
|         event_time|event_type|product_id|        category_id|       category_code|   brand|  price|  user_id|        user_session|
+-------------------+----------+----------+-------------------+--------------------+--------+-------+---------+--------------------+
|2019-10-01 00:00:00|      view|  44600062|2103807459595387724|                NULL|shiseido|  35.79|541312140|72d76fde-8bb3-4e0...|
|2019-10-01 00:00:00|      view|   3900821|2053013552326770905|appliances.enviro...|    aqua|   33.2|554748717|9333dfbd-b87a-470...|
|2019-10-01 00:00:01|      view|  17200506|2053013559792632471|furniture.living_...|    NULL|  543.1|519107250|566511c2-e2e3-422...|
|2019-10-01 00:00:01|      view|   1307067|2053013558920217191|  computers.notebook|  lenovo| 251.74|550050854|7c90fc70-0e80-459...|
|2019-10-01 00:00:04|      view|   1004237|2053013555631882655|electr

In [0]:
# Preview 10 rows of the November frame (no explicit ordering is applied).
df_n.show(10)

+-------------------+----------+----------+-------------------+--------------------+--------+------+---------+--------------------+
|         event_time|event_type|product_id|        category_id|       category_code|   brand| price|  user_id|        user_session|
+-------------------+----------+----------+-------------------+--------------------+--------+------+---------+--------------------+
|2019-11-01 00:00:00|      view|   1003461|2053013555631882655|electronics.smart...|  xiaomi|489.07|520088904|4d3b30da-a5e4-49d...|
|2019-11-01 00:00:00|      view|   5000088|2053013566100866035|appliances.sewing...|  janome|293.65|530496790|8e5f4f83-366c-4f7...|
|2019-11-01 00:00:01|      view|  17302664|2053013553853497655|                NULL|   creed| 28.31|561587266|755422e7-9040-477...|
|2019-11-01 00:00:01|      view|   3601530|2053013563810775923|appliances.kitche...|      lg|712.87|518085591|3bfb58cd-7892-48c...|
|2019-11-01 00:00:01|      view|   1004775|2053013555631882655|electronics.s

In [0]:
# Pair every October event with every November event for the same product.
#
# show() prints and returns None, so the DataFrame is assigned first and
# displayed in a separate statement; otherwise joined_df would be None.
#
# Apart from the join key, both sides carry identical column names. The
# "oct"/"nov" aliases leave the displayed output unchanged but let later code
# disambiguate them, e.g. F.col("oct.price") vs F.col("nov.price").
#
# NOTE(review): this is a many-to-many join of two raw event tables, so the
# row count grows with (Oct events x Nov events) per product. Only preview it
# with show()/limit(); aggregate each month first before any full action.
joined_df = df_o.alias("oct").join(df_n.alias("nov"), on="product_id", how="inner")
joined_df.show(5)

+----------+-------------------+----------+-------------------+--------------------+------+-----+---------+--------------------+-------------------+----------+-------------------+--------------------+------+-----+---------+--------------------+
|product_id|         event_time|event_type|        category_id|       category_code| brand|price|  user_id|        user_session|         event_time|event_type|        category_id|       category_code| brand|price|  user_id|        user_session|
+----------+-------------------+----------+-------------------+--------------------+------+-----+---------+--------------------+-------------------+----------+-------------------+--------------------+------+-----+---------+--------------------+
|   1005159|2019-10-31 23:59:27|      view|2053013555631882655|electronics.smart...|xiaomi|212.1|566143627|aa610ab3-5c60-455...|2019-11-01 00:13:45|      view|2053013555631882655|electronics.smart...|xiaomi|212.1|564280011|78fcfe93-d480-4b5...|
|   1005159|2019-10-

In [0]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Per-user running event count, ordered by event time.
# The frame is written out explicitly; it is the same frame Spark applies by
# default when a window has an ordering. Because it is a RANGE frame, events
# of one user that share the same event_time receive the same count.
window = (
    Window.partitionBy("user_id")
    .orderBy("event_time")
    .rangeBetween(Window.unboundedPreceding, Window.currentRow)
)
df_o.withColumn("cumulative_events", F.count("*").over(window)).show(20)

+-------------------+----------+----------+-------------------+--------------------+--------+------+---------+--------------------+-----------------+
|         event_time|event_type|product_id|        category_id|       category_code|   brand| price|  user_id|        user_session|cumulative_events|
+-------------------+----------+----------+-------------------+--------------------+--------+------+---------+--------------------+-----------------+
|2019-10-09 10:30:19|      view|  17301541|2053013553853497655|                NULL|    NULL|162.17|205053188|e1eadbc6-aef5-4cf...|                1|
|2019-10-09 10:30:44|      view|  17301541|2053013553853497655|                NULL|    NULL|162.17|205053188|e1eadbc6-aef5-4cf...|                2|
|2019-10-07 06:23:01|      view|  16200119|2053013556344914381|   kids.fmcg.diapers|   moony| 18.47|222907508|cb653adc-46a2-4d9...|                1|
|2019-10-07 06:26:23|      view|  16200162|2053013556344914381|   kids.fmcg.diapers|   moony| 18.47|

In [0]:
# Top 5 products by October purchase revenue (sum of price over purchase events).
#
# The data has no product_name column (columns are event_time, event_type,
# product_id, category_id, category_code, brand, price, user_id,
# user_session), so grouping by it fails at analysis time. Group by
# product_id alone and carry brand along as the human-readable label;
# first(..., ignorenulls=True) picks a non-null brand when one exists, so a
# product with missing brand on some rows is not split into several groups.
revenue = (
    df_o
    .filter(F.col("event_type") == "purchase")
    .groupBy("product_id")
    .agg(
        F.first("brand", ignorenulls=True).alias("brand"),
        F.sum("price").alias("revenue"),
    )
    .orderBy(F.desc("revenue"))
    .limit(5)
)