In [1]:
import findspark
findspark.init()
findspark.find()

'/Users/calvinyu/miniforge3/envs/spark/lib/python3.11/site-packages/pyspark'

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import desc, split, udf, lit, sum, avg, max, count
import matplotlib.pyplot as plt
import plotly.express as px
import functools
from pyspark.sql import DataFrame
import pandas as pd

In [3]:
import plotly.io as pio
pio.renderers.default = "iframe"

In [4]:
spark = SparkSession.builder.appName('test').getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/06/03 19:46:38 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [5]:
mapping = {10: "October", 11: "November"}
def process_dfs(_spark, _file_names):
    dfs = []
    for file_name in _file_names:
        df = _spark.read.option("header",True).csv(file_name)
        df = df.withColumn("price", df.price.cast('float'))
        dt_col = split(df["event_time"], " ")
        df = df.withColumn("date", dt_col.getItem(0))
        df = df.withColumn("time", dt_col.getItem(1))
        date_col = split(df["date"], "-")
        df = df.withColumn("year", date_col.getItem(0))
        df = df.withColumn("year", df.year.cast('int'))
        df = df.withColumn("month", date_col.getItem(1))
        df = df.withColumn("month", df.month.cast('int'))
        df = df.withColumn("day", date_col.getItem(2))
        df = df.withColumn("day", df.day.cast('int'))

        df = df.withColumn("month", udf(lambda x : mapping[x])("month"))
        df = df.drop("event_time")
        # df = df.drop("date")
        dfs.append(df)

    df_output = functools.reduce(DataFrame.union, dfs)
    return df_output

df = process_dfs(spark, ["data/2019-Nov.csv", "data/2019-Oct.csv"])

                                                                                

In [130]:
df.count()

                                                                                

109950743

In [131]:
df[["brand"]].distinct().count()

                                                                                

4304

In [129]:
df.show(5, truncate=False)

+----------+----------+-------------------+-------------------------+------+------+---------+------------------------------------+--------+----+--------+---+
|event_type|product_id|category_id        |category_code            |brand |price |user_id  |user_session                        |time    |year|month   |day|
+----------+----------+-------------------+-------------------------+------+------+---------+------------------------------------+--------+----+--------+---+
|view      |1003461   |2053013555631882655|electronics.smartphone   |xiaomi|489.07|520088904|4d3b30da-a5e4-49df-b1a8-ba5943f1dd33|00:00:00|2019|November|1  |
|view      |5000088   |2053013566100866035|appliances.sewing_machine|janome|293.65|530496790|8e5f4f83-366c-4f70-860e-ca7417414283|00:00:00|2019|November|1  |
|view      |17302664  |2053013553853497655|null                     |creed |28.31 |561587266|755422e7-9040-477b-9bd2-6a6e8fd97387|00:00:01|2019|November|1  |
|view      |3601530   |2053013563810775923|appliance

In [12]:
nov_df = spark.read.option("header",True).csv("data/2019-Nov.csv")

In [51]:
nov_df = nov_df.withColumn("price", nov_df.price.cast('float'))

In [13]:
oct_df = spark.read.option("header",True).csv("data/2019-Oct.csv")

In [7]:
nov_df.count()

                                                                                

67501980

In [8]:
oct_df.count()

                                                                                

42448765

In [14]:
nov_df.show(5, truncate = False)

+-----------------------+----------+----------+-------------------+-------------------------+------+------+---------+------------------------------------+
|event_time             |event_type|product_id|category_id        |category_code            |brand |price |user_id  |user_session                        |
+-----------------------+----------+----------+-------------------+-------------------------+------+------+---------+------------------------------------+
|2019-11-01 00:00:00 UTC|view      |1003461   |2053013555631882655|electronics.smartphone   |xiaomi|489.07|520088904|4d3b30da-a5e4-49df-b1a8-ba5943f1dd33|
|2019-11-01 00:00:00 UTC|view      |5000088   |2053013566100866035|appliances.sewing_machine|janome|293.65|530496790|8e5f4f83-366c-4f70-860e-ca7417414283|
|2019-11-01 00:00:01 UTC|view      |17302664  |2053013553853497655|null                     |creed |28.31 |561587266|755422e7-9040-477b-9bd2-6a6e8fd97387|
|2019-11-01 00:00:01 UTC|view      |3601530   |2053013563810775923|app

In [15]:
nov_df.columns

['event_time',
 'event_type',
 'product_id',
 'category_id',
 'category_code',
 'brand',
 'price',
 'user_id',
 'user_session']

### Aggregations for Brands

In [16]:
agg_by_brand = df.filter(df.event_type == "purchase") \
      .groupby("brand") \
      .agg(count("*").alias("count"), \
         sum("price").alias("revenue")) \
      .orderBy("count", ascending=False).toPandas()

                                                                                

In [None]:
print(len(agg_by_brand))

In [18]:
agg_by_brand.head(5)

Unnamed: 0,brand,count,revenue
0,samsung,372923,101277400.0
1,apple,308937,238721800.0
2,,131487,19566320.0
3,xiaomi,124908,20453900.0
4,huawei,47204,9664104.0


In [19]:
agg_by_brand.to_csv("agg_by_brand.csv", index=False)

### Popular Brands

In [7]:
top_categories = ['samsung', 'apple', 'xiaomi', 'huawei', 'cordiant', 'lucente', 'oppo', 'lg', 'sony', 'artel', 'bosch', 'acer', 'lenovo', 'nokian']
df = df.withColumn("adjusted_brand", udf(lambda x : x if x in top_categories else "other")("brand"))

brand_counts = df.filter(df.event_type == "purchase") \
      .groupby("adjusted_brand") \
      .count()

In [123]:
brand_counts = brand_counts.toPandas()

                                                                                

In [124]:
brand_counts

Unnamed: 0,adjusted_brand,count
0,cordiant,27534
1,apple,308937
2,other,623127
3,oppo,25971
4,sony,17038
5,lg,21606
6,acer,13284
7,artel,15391
8,samsung,372923
9,lucente,26137


In [129]:
brand_counts.to_csv("brand_counts.csv", index=False)

In [112]:
temp_counts = brand_counts.iloc[:30]

In [128]:
fig = px.pie(brand_counts, values='count', names='adjusted_brand', hole=0.3)
fig.show()

In [None]:
### Dai

In [21]:
temp_table = df.createOrReplaceTempView("temp_table")

In [22]:
highest_thru_nov = spark.sql("WITH CTE AS (SELECT month, day, brand, COUNT(brand) as count FROM temp_table WHERE event_type = 'purchase' GROUP BY month, day, brand ORDER BY count DESC), \
                            CTE_2 AS (SELECT month, day, MAX(count) as max_count FROM CTE WHERE month = 'November' GROUP BY month, day ORDER BY day) \
                            SELECT c1.month, c1.day, c1.brand, max_count FROM CTE c1 JOIN CTE_2 c2 ON c1.month = c2.month and c1.day = c2.day and c1.count = c2.max_count ORDER BY c1.day").toPandas()

                                                                                

In [23]:
highest_thru_oct = spark.sql("WITH CTE AS (SELECT month, day, brand, COUNT(brand) as count FROM temp_table WHERE event_type = 'purchase' GROUP BY month, day, brand ORDER BY count DESC), \
                            CTE_2 AS (SELECT month, day, MAX(count) as max_count FROM CTE WHERE month = 'October' GROUP BY month, day ORDER BY day) \
                            SELECT c1.month, c1.day, c1.brand, max_count FROM CTE c1 JOIN CTE_2 c2 ON c1.month = c2.month and c1.day = c2.day and c1.count = c2.max_count ORDER BY c1.day").toPandas()

                                                                                

In [37]:
highest_thru_nov_rev = spark.sql("WITH CTE AS (SELECT month, day, brand, SUM(price) as revenue FROM temp_table WHERE event_type = 'purchase' GROUP BY month, day, brand ORDER BY revenue DESC), \
                            CTE_2 AS (SELECT month, day, MAX(revenue) as max_revenue FROM CTE WHERE month = 'November' GROUP BY month, day ORDER BY day) \
                            SELECT c1.month, c1.day, c1.brand, max_revenue FROM CTE c1 JOIN CTE_2 c2 ON c1.month = c2.month and c1.day = c2.day and c1.revenue = c2.max_revenue ORDER BY c1.day").toPandas()

                                                                                

In [38]:
highest_thru_oct_rev = spark.sql("WITH CTE AS (SELECT month, day, brand, SUM(price) as revenue FROM temp_table WHERE event_type = 'purchase' GROUP BY month, day, brand ORDER BY revenue DESC), \
                            CTE_2 AS (SELECT month, day, MAX(revenue) as max_revenue FROM CTE WHERE month = 'October' GROUP BY month, day ORDER BY day) \
                            SELECT c1.month, c1.day, c1.brand, max_revenue FROM CTE c1 JOIN CTE_2 c2 ON c1.month = c2.month and c1.day = c2.day and c1.revenue = c2.max_revenue ORDER BY c1.day").toPandas()

                                                                                

In [105]:
merged = pd.merge(highest_thru_oct, highest_thru_nov, how='outer', on='day')
merged = pd.merge(merged, highest_thru_nov_rev, how='outer', on='day')[["day", "brand_x", "brand_y", "max_count_x", "max_count_y", "max_revenue"]].rename({"max_revenue":"max_revenue_x"})
merged = pd.merge(merged, highest_thru_oct_rev, how='outer', on='day').rename({"brand_x":"oct_brand", "brand_y":"nov_brand", "max_count_x":"oct_count", "max_count_y":"nov_count","max_revenue_x":"oct_rev", "max_revenue_y":"nov_rev"}, axis=1)

In [107]:
merged.to_csv("day_stats.csv", index=False)

In [106]:
merged.head(5)

Unnamed: 0,day,oct_brand,nov_brand,oct_count,nov_count,oct_rev,month,brand,nov_rev
0,1,samsung,samsung,4427,5291.0,3454626.0,October,apple,3175375.0
1,2,samsung,samsung,4618,5179.0,2979444.0,October,apple,3092671.0
2,3,samsung,samsung,4590,5332.0,3199490.0,October,apple,3081512.0
3,4,samsung,apple,6368,5548.0,4186447.0,October,apple,4165188.0
4,5,samsung,samsung,5323,5125.0,3643563.0,October,apple,3488695.0


In [103]:
fig = px.bar(merged, y="day", x=["oct_count", "nov_count"], orientation='h')
fig.update_layout(width=750, height=750,
legend=dict(
        x=0.8,
        y=0.8,
        traceorder="normal",
        font=dict(
            family="sans-serif",
            size=12,
            color="black"
        ),
    ))

In [104]:
fig = px.bar(merged, y="day", x=["oct_rev", "nov_rev"], orientation='h')
fig.update_layout(xaxis=dict(autorange="reversed"), width=750, height=750,
legend=dict(
        x=0.1,
        y=0.8,
        traceorder="normal",
        font=dict(
            family="sans-serif",
            size=12,
            color="black"
        ),
    ))
fig.update_yaxes(visible=False)


In [8]:
temp_table = df.filter(df["brand"].isin(top_categories[:5])).limit(3000000).createOrReplaceTempView("small_table")

In [8]:
view_to_purchase = spark.sql(
    "WITH CTE AS (SELECT \
    v.brand, \
    v.product_id,\
    v.user_session,\
    v.time AS view_time,\
    p.time AS purchase_time,\
    TIMESTAMPDIFF(SECOND, v.time, p.time) AS time_difference FROM small_table v JOIN \
    small_table p ON v.product_id = p.product_id\
          AND v.user_session = p.user_session\
          AND v.event_type = 'view'\
          AND p.event_type = 'purchase'\
          AND v.time < p.time) \
    SELECT brand, AVG(time_difference) as avg_vtp_time, COUNT(*) as num_purchases FROM CTE GROUP BY brand HAVING num_purchases > 100 ORDER BY avg_vtp_time ASC LIMIT 30")


In [140]:
view_to_purchase.show(20)

[Stage 134:>                                                        (0 + 1) / 1]

+--------+------------------+-------------+
|   brand|      avg_vtp_time|num_purchases|
+--------+------------------+-------------+
|   apple|249.96059957173446|        14010|
|  huawei| 317.2927802153969|         2507|
| samsung|  328.220799664183|        19058|
|cordiant| 388.9107573564882|         2073|
|  xiaomi|401.90677091016374|         6779|
+--------+------------------+-------------+



                                                                                

In [9]:
vtp_df = view_to_purchase.toPandas()

23/06/03 19:18:52 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
                                                                                

In [11]:
vtp_df.to_csv("vtp.csv", index=False)

In [9]:
view_to_cart = spark.sql(
    "WITH CTE AS (SELECT \
    v.brand, \
    v.product_id,\
    v.user_session,\
    v.time AS view_time,\
    p.time AS cart_time,\
    TIMESTAMPDIFF(SECOND, v.time, p.time) AS time_difference FROM small_table v JOIN \
    small_table p ON v.product_id = p.product_id\
          AND v.user_session = p.user_session\
          AND v.event_type = 'view'\
          AND p.event_type = 'cart'\
          AND v.time < p.time) \
    SELECT brand, AVG(time_difference) as avg_vtc_time, COUNT(*) as num_carts FROM CTE GROUP BY brand HAVING num_carts > 100 ORDER BY avg_vtc_time ASC LIMIT 30")


In [10]:
vtc_df = view_to_cart.toPandas()

23/06/03 19:49:12 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
                                                                                

In [11]:
len(vtc_df)

5

In [12]:
print(vtc_df)

      brand  avg_vtc_time  num_carts
0     apple    215.145040      49552
1  cordiant    224.126629       4375
2   samsung    250.490482      66665
3    huawei    278.584385      10541
4    xiaomi    463.716211      28641


In [13]:
vtp_df = pd.read_csv("vtp.csv")

In [14]:
v_times = vtp_df.merge(vtc_df, on="brand")

In [15]:
v_times

Unnamed: 0,brand,avg_vtp_time,num_purchases,avg_vtc_time,num_carts
0,apple,243.120771,43032,215.14504,49552
1,samsung,315.013421,52903,250.490482,66665
2,huawei,321.108403,6974,278.584385,10541
3,xiaomi,367.188554,19745,463.716211,28641
4,cordiant,380.54293,9364,224.126629,4375


In [16]:
v_times.to_csv("v_times.csv", index=False)

In [82]:
weekly_purchases = spark.sql(
"SELECT product_id, COUNT(*) as purchase_count,\
    CASE WHEN day BETWEEN 1 AND 7 THEN 1 \
         WHEN day BETWEEN 8 AND 15 THEN 2\
         WHEN day BETWEEN 16 AND 23 THEN 3\
         ELSE 4 END as purchase_week \
    FROM small_table\
    WHERE event_type = 'purchase'\
    GROUP BY product_id, purchase_week\
    ORDER BY product_id, purchase_count DESC")

In [80]:
weekly_purchases.show(20)



+----------+--------------+-------------+
|product_id|purchase_count|purchase_week|
+----------+--------------+-------------+
|   1002524|             4|            1|
|   1002528|             1|            1|
|   1002532|             3|            1|
|   1002536|             1|            1|
|   1002540|             1|            1|
|   1002544|            24|            1|
|   1002547|             2|            1|
|   1002629|             6|            1|
|   1002633|             8|            1|
|   1003304|             4|            1|
|   1003306|             7|            1|
|   1003309|             1|            1|
|   1003316|             2|            1|
|   1003317|             6|            1|
|   1003441|             1|            1|
|   1003533|             1|            1|
|   1003535|             1|            1|
|   1003772|             2|            1|
|   1003894|             1|            1|
|   1003983|             1|            1|
+----------+--------------+-------

                                                                                

### Sankey Test

In [141]:
temp_spark = df[["category_code"]].filter(df.event_type == "purchase")
temp_spark = temp_spark.na.fill("unknown")

In [142]:
temp_spark = temp_spark.withColumn("test", udf(lambda x : x.replace(".", "-") if not None else "empty")("category_code"))
cat_col = split(temp_spark["test"], "-")

temp_spark = temp_spark.withColumn("main_category", cat_col.getItem(0))
temp_spark = temp_spark.withColumn("subcategory_1", cat_col.getItem(1))
temp_spark = temp_spark.withColumn("subcategory_2", cat_col.getItem(2))
# temp_spark = temp_spark.withColumn("subcategory_3", cat_col.getItem(3))

temp_spark = temp_spark.drop("category_code")
temp_spark = temp_spark.drop("test")

In [143]:
grouped_spark = temp_spark.groupby("main_category", "subcategory_1").count().orderBy("count", ascending=False)
grouped_spark[["main_category"]].distinct().count()

                                                                                

14

In [None]:
top_categories = list(temp_spark.groupby("main_category").count().orderBy("count", ascending=False).limit(15).toPandas()["main_category"])

In [25]:
grouped_spark = temp_spark.filter(temp_spark["main_category"].isin(top_categories)).groupby("main_category", "subcategory_1").count().orderBy("count", ascending=False)
grouped_spark = grouped_spark.na.drop("any")

In [13]:
grouped_spark.show()



+-------------+-------------+-------+
|main_category|subcategory_1|  count|
+-------------+-------------+-------+
|  accessories|     umbrella|   8912|
|  accessories|       wallet| 113986|
|  accessories|          bag| 505378|
|      apparel|       jacket|    437|
|      apparel|       shorts|   1284|
|      apparel|        glove|   3824|
|      apparel|        scarf|   6141|
|      apparel|         sock|   6224|
|      apparel|         belt|   7167|
|      apparel|        skirt|   9866|
|      apparel|       jumper|  34339|
|      apparel|       tshirt|  40522|
|      apparel|     trousers|  41422|
|      apparel|        shirt|  64609|
|      apparel|    underwear|  92183|
|      apparel|        jeans| 109707|
|      apparel|        dress| 133886|
|      apparel|      costume| 380291|
|      apparel|        shoes|3622123|
|   appliances|ironing_board|  99103|
+-------------+-------------+-------+
only showing top 20 rows



                                                                                

In [26]:
grouped_spark_2 = temp_spark.groupby("main_category", "subcategory_1", "subcategory_2").count()
grouped_spark_2 = grouped_spark_2.na.drop("any")

In [365]:
grouped_spark_2.show()



+-------------+-------------+---------------+-----+
|main_category|subcategory_1|  subcategory_2|count|
+-------------+-------------+---------------+-----+
|      apparel|        shoes|           keds|    4|
|      apparel|        shoes|      moccasins|    1|
|   appliances|      kitchen|         washer|   12|
|   appliances|      kitchen|  refrigerators|   22|
|   appliances|      kitchen|     dishwasher|   10|
|   appliances|  environment|         vacuum|    4|
|   appliances|  environment|   water_heater|   11|
|   appliances|     personal|       massager|   19|
|   appliances|      kitchen|           hood|    3|
|   appliances|  environment|air_conditioner|    1|
|   appliances|      kitchen|        blender|    2|
|         auto|  accessories|     compressor|    2|
|         auto|  accessories|         player|   12|
|         auto|  accessories|  videoregister|    1|
|    computers|   components|    motherboard|   13|
|    computers|  peripherals|        printer|    2|
|    compute

                                                                                

In [27]:
from pyspark.ml.feature import StringIndexer

indexer = StringIndexer(inputCol="main_category", outputCol="main_cat_index") 
indexed_model_1 = indexer.fit(grouped_spark)
indexed_spark = indexed_model_1.transform(grouped_spark) 

indexer = StringIndexer(inputCol="subcategory_1", outputCol="subcat_1_index") 
indexed_model_2 = indexer.fit(indexed_spark)
indexed_spark = indexed_model_2.transform(indexed_spark) 

                                                                                

In [28]:
max_val = indexed_spark.agg({"main_cat_index": "max"}).collect()[0][0]
indexed_spark = indexed_spark.withColumn("subcat_1_index_adjusted", indexed_spark["subcat_1_index"] + max_val + 1)

                                                                                

In [None]:
indexed_spark_2 = indexed_model_2.transform(grouped_spark_2)
indexer = StringIndexer(inputCol="subcategory_2", outputCol="subcat_2_index") 
indexed_model_3 = indexer.fit(indexed_spark_2)
indexed_spark_2 = indexed_model_3.transform(indexed_spark_2) 

In [354]:
indexed_spark_2 = indexed_spark_2.withColumn("subcat_1_index_adjusted", indexed_spark_2["subcat_1_index"] + max_val + 1)
max_val_2 = indexed_spark_2.agg({"subcat_1_index_adjusted": "max"}).collect()[0][0]
indexed_spark_2 = indexed_spark_2.withColumn("subcat_2_index_adjusted", indexed_spark_2["subcat_2_index"] + max_val_2 + 1)

                                                                                

In [29]:
indexed_spark.show(1000)

                                                                                

+-------------+--------------+------+--------------+--------------+-----------------------+
|main_category| subcategory_1| count|main_cat_index|subcat_1_index|subcat_1_index_adjusted|
+-------------+--------------+------+--------------+--------------+-----------------------+
|  electronics|    smartphone|720665|           1.0|          42.0|                   55.0|
|   appliances|       kitchen|109465|           2.0|           1.0|                   14.0|
|  electronics|         audio| 81894|           1.0|           4.0|                   17.0|
|  electronics|         video| 52020|           1.0|          56.0|                   69.0|
|   appliances|   environment| 44049|           2.0|          20.0|                   33.0|
|  electronics|        clocks| 41143|           1.0|          13.0|                   26.0|
|    computers|      notebook| 34023|           4.0|          31.0|                   44.0|
|         auto|   accessories| 21339|          10.0|           3.0|             

In [None]:
indexed_spark_2.show(1000)

In [30]:
sankey_df = indexed_spark.limit(1000).toPandas()

                                                                                

In [327]:
sankey_df_2 = indexed_spark_2.limit(1000).toPandas()

                                                                                

In [38]:
labels = indexed_model_1.labels + indexed_model_2.labels

In [352]:
labels

['kitchen',
 'accessories',
 'audio',
 'bag',
 'bathroom',
 'bedroom',
 'bicycle',
 'camera',
 'carriage',
 'clocks',
 'components',
 'costume',
 'desktop',
 'dolls',
 'environment',
 'jeans',
 'living_room',
 'notebook',
 'peripherals',
 'personal',
 'sewing_machine',
 'shirt',
 'shoes',
 'smartphone',
 'tablet',
 'telephone',
 'tools',
 'toys',
 'trousers',
 'tshirt',
 'underwear',
 'video',
 'air_conditioner',
 'bath',
 'bed',
 'blender',
 'cabinet',
 'chair',
 'compressor',
 'cpu',
 'dishwasher',
 'drill',
 'hdd',
 'headphone',
 'hood',
 'keds',
 'keyboard',
 'light',
 'massager',
 'memory',
 'moccasins',
 'motherboard',
 'music_tools',
 'painting',
 'player',
 'printer',
 'refrigerators',
 'saw',
 'sofa',
 'subwoofer',
 'table',
 'tv',
 'vacuum',
 'video',
 'videocards',
 'videoregister',
 'washer',
 'water_heater',
 'welding']

In [196]:
print(sankey_df["main_cat_index"])

0    1.0
1    2.0
2    2.0
3    3.0
4    4.0
5    4.0
6    1.0
7    2.0
8    1.0
9    1.0
Name: main_cat_index, dtype: float64


In [34]:
import plotly.graph_objects as go

source = sankey_df["main_cat_index"] 
target = sankey_df["subcat_1_index_adjusted"]
value = sankey_df["count"]
# data to dict, dict to sankey
link = dict(source = source, target = target, value = value)
node = dict(label = labels, pad=50, thickness=5)
data = go.Sankey(link = link, node = node)
# plot
fig = go.Figure(data)
fig.update_layout(
    width=1000,
    height=1000
)

fig.show()

In [36]:
sankey_df.to_csv("sankey.csv", index=False)

In [40]:
import pickle

with open("sankey_labels.txt", "wb") as f:
    pickle.dump(labels, f)