In [1]:
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("experiments") \
    .getOrCreate()

## Load data (sample)

In [2]:
from pathlib import Path
path = Path("../data")

In [3]:
items_paths = []
for p in path.glob("items/**/*.json"):
    if not p.stem.endswith("_available"):
        items_paths.append(str(p))

len(items_paths)

43

In [4]:
items = spark.read.json(items_paths)

In [5]:
items.count()

14200

In [6]:
items.show()

+--------------------+--------+--------------------+--------------------+-----+-----------+
|                desc|discount|                  id|                name|price|       type|
+--------------------+--------+--------------------+--------------------+-----+-----------+
|Young development...|    61.0|f667cd19-b954-4de...|              Degree| 1.03|       game|
|Son age indicate ...|     0.0|2f217373-be7c-49c...|Better security r...| 0.05|     office|
|Practice although...|    11.0|00970c97-f2a2-4b6...|              Itself|14.36|    fitness|
|American his meet...|     0.0|c8cecdc1-7a96-465...|Result young qual...| 0.16|    fitness|
|Operation still c...|    55.0|de3da53f-0527-4a8...|  Agree many compare| 2.76|       game|
|Almost guy home b...|    12.0|391f21a5-4c36-417...|Whole responsibil...| 0.83|    fitness|
|Research which ba...|     5.0|7cbd51cf-2117-432...|               Owner| 0.51|    fitness|
|Sound recognize c...|    73.0|2ed44006-6f39-4ea...|              Social| 7.66| 

In [7]:
user_actions_paths = [str(p) for p in path.glob("user_actions/**/*.json")]

len(user_actions_paths)

13

In [8]:
user_actions = spark.read.json(user_actions_paths)

In [9]:
user_actions.count()

195128

In [10]:
user_actions.show()

+--------------------+----------------+-------------------+--------------------+-----------+--------------------+
|       action_result|     action_type|         event_time|          session_id|status_code|             user_id|
+--------------------+----------------+-------------------+--------------------+-----------+--------------------+
|User a2f3af6d-419...|          log_in|2022-02-15 21:37:19|e415c2f4-bc65-490...|        200|a2f3af6d-4198-4f3...|
|User a2f3af6d-419...|      open_store|2022-02-15 21:51:00|e415c2f4-bc65-490...|        200|a2f3af6d-4198-4f3...|
|                  []|       view_cart|2022-02-15 21:54:06|e415c2f4-bc65-490...|        204|a2f3af6d-4198-4f3...|
|      Cart is empty.|remove_from_cart|2022-02-15 22:03:11|e415c2f4-bc65-490...|        405|a2f3af6d-4198-4f3...|
|User a2f3af6d-419...|      open_store|2022-02-15 22:10:19|e415c2f4-bc65-490...|        100|a2f3af6d-4198-4f3...|
|Item ea362642-a33...|     search_item|2022-02-15 22:14:39|e415c2f4-bc65-490...|        

## Create Purchase dataset

In [11]:
from pyspark.sql import Window
import pyspark.sql.functions as F

w = Window.partitionBy("session_id").orderBy("event_time")
condition = (F.lead("action_type", 1).over(w) == "pay") & (F.lead("status_code", 1).over(w) == 200)

In [12]:
successful_payments = user_actions \
    .where(F.col("action_type").isin("view_cart", "pay")) \
    .withColumn("payment_time", F.when(condition, F.lead("event_time", 1).over(w))) \
    .where(F.col("payment_time").isNotNull())

In [13]:
successful_payments.show()

+--------------------+-----------+-------------------+--------------------+-----------+--------------------+-------------------+
|       action_result|action_type|         event_time|          session_id|status_code|             user_id|       payment_time|
+--------------------+-----------+-------------------+--------------------+-----------+--------------------+-------------------+
|['dfcad75d-df4c-4...|  view_cart|2022-02-15 21:35:19|0015dca6-e825-49d...|        200|1330243f-19da-410...|2022-02-15 21:41:23|
|['5d2629ec-ff58-4...|  view_cart|2022-02-15 21:33:43|0027de64-9141-468...|        200|833a53b1-7882-4ad...|2022-02-15 21:38:51|
|['17eb311a-e0be-4...|  view_cart|2022-02-16 12:19:56|005c1d09-82f4-4b0...|        200|1ed21a96-93cd-413...|2022-02-16 12:27:07|
|['1306934e-0290-4...|  view_cart|2022-02-16 04:43:41|00623a00-ff6a-40a...|        200|33e8b30d-7032-4cf...|2022-02-16 04:57:28|
|['2b5d6816-368f-4...|  view_cart|2022-02-15 21:49:35|00665a96-ef2b-4d5...|        200|baad42cd-5

In [14]:
total_price = items \
    .select(F.expr("price - (price * discount / 100)") \
            .alias("without_round"),
            F.round(F.expr("price - (price * discount / 100)"), 2) \
            .alias("with_round")
           ) \
    .agg({"without_round": "sum", "with_round": "sum"}) \
    .collect()[0]

# Difference between the rounded and non-rounded total prices
total_price[0] - total_price[1]

2.788700000201061

In [15]:
items_prices = items \
    .select(F.col("id").alias("product_id"), "price",
            F.round(F.expr("price - (price * discount / 100)"), 2) \
            .alias("discount_price")
           )

In [16]:
items_prices.show()

+--------------------+-----+--------------+
|          product_id|price|discount_price|
+--------------------+-----+--------------+
|f667cd19-b954-4de...| 1.03|           0.4|
|2f217373-be7c-49c...| 0.05|          0.05|
|00970c97-f2a2-4b6...|14.36|         12.78|
|c8cecdc1-7a96-465...| 0.16|          0.16|
|de3da53f-0527-4a8...| 2.76|          1.24|
|391f21a5-4c36-417...| 0.83|          0.73|
|7cbd51cf-2117-432...| 0.51|          0.48|
|2ed44006-6f39-4ea...| 7.66|          2.07|
|4464ddbc-60a2-42a...|11.55|         11.55|
|aeeb36f9-0df5-492...|15.72|         15.72|
|a03e7bf8-ab80-4b9...| 7.04|          7.04|
|34511cfd-46f0-403...|  0.2|          0.18|
|17e01ef8-3a43-497...|26.04|         23.18|
|6c663615-2ffe-4b5...| 0.14|          0.09|
|df383eb3-280b-4c4...| 7.39|          5.91|
|d3f7553b-5161-4fb...| 6.73|          6.46|
|968d8bff-7c89-4c8...| 0.73|          0.72|
|e5c679b0-40df-460...|12.28|         12.28|
|30f3ca15-fa18-4d8...| 1.78|          1.76|
|205536af-4c0b-407...| 0.36|    

In [17]:
purchases_tmp = successful_payments \
    .select(F.col("session_id").alias("purchase_id"),
            F.col("payment_time").alias("event_time"),
            "user_id",
            F.from_json("action_result", "ARRAY<STRING>").alias("purchased_items")
           )

purchases_tmp.show(3)

+--------------------+-------------------+--------------------+--------------------+
|         purchase_id|         event_time|             user_id|     purchased_items|
+--------------------+-------------------+--------------------+--------------------+
|0015dca6-e825-49d...|2022-02-15 21:41:23|1330243f-19da-410...|[dfcad75d-df4c-41...|
|0027de64-9141-468...|2022-02-15 21:38:51|833a53b1-7882-4ad...|[5d2629ec-ff58-48...|
|005c1d09-82f4-4b0...|2022-02-16 12:27:07|1ed21a96-93cd-413...|[17eb311a-e0be-44...|
+--------------------+-------------------+--------------------+--------------------+
only showing top 3 rows



In [18]:
# TODO: .isNotNull() temporary; check whether user_id has a discount 

purchases_cost = purchases_tmp \
    .select("purchase_id", "user_id", F.explode("purchased_items").alias("product_id")) \
    .join(items_prices, on="product_id") \
    .groupBy("purchase_id", "user_id") \
    .agg(
        F.round(F.when(F.col("user_id").isNotNull(), F.sum("price")) \
                .otherwise(F.sum("discount_price")), 2)
        .alias("amount_paid")
    )

In [19]:
purchases_cost.show()

+--------------------+--------------------+-----------+
|         purchase_id|             user_id|amount_paid|
+--------------------+--------------------+-----------+
|0015dca6-e825-49d...|1330243f-19da-410...|       8.07|
|0027de64-9141-468...|833a53b1-7882-4ad...|      21.84|
|005c1d09-82f4-4b0...|1ed21a96-93cd-413...|       0.99|
|00623a00-ff6a-40a...|33e8b30d-7032-4cf...|       2.49|
|00665a96-ef2b-4d5...|baad42cd-523b-4c8...|       4.14|
|0078712e-ad26-497...|cd25d012-15ba-420...|       5.11|
|009de783-d808-4e6...|3e9ebfd9-31a6-48d...|      11.62|
|00aed027-7d56-4c1...|3acc1f03-5b14-46f...|      24.91|
|00d6581f-399f-4e1...|5493f403-bbc8-449...|       1.91|
|00e6a40c-c561-49d...|ab172e8c-452d-420...|       3.39|
|00ee42b9-9103-41d...|7937ae45-d741-4da...|       3.35|
|00efe752-c269-40e...|f031c94b-b66e-4d0...|       1.41|
|011efb28-3380-488...|615e0436-9626-43d...|       3.94|
|01203c0d-8073-4ca...|304e9bc4-a4cc-432...|      43.59|
|01274da7-9e31-43c...|536c19f5-2b41-4e0...|     

In [20]:
purchases = purchases_tmp \
    .join(purchases_cost, on="purchase_id") \
    .drop(purchases_cost.user_id)

In [21]:
purchases.show()

+--------------------+-------------------+--------------------+--------------------+-----------+
|         purchase_id|         event_time|     purchased_items|             user_id|amount_paid|
+--------------------+-------------------+--------------------+--------------------+-----------+
|0015dca6-e825-49d...|2022-02-15 21:41:23|[dfcad75d-df4c-41...|1330243f-19da-410...|       8.07|
|0027de64-9141-468...|2022-02-15 21:38:51|[5d2629ec-ff58-48...|833a53b1-7882-4ad...|      21.84|
|005c1d09-82f4-4b0...|2022-02-16 12:27:07|[17eb311a-e0be-44...|1ed21a96-93cd-413...|       0.99|
|00623a00-ff6a-40a...|2022-02-16 04:57:28|[1306934e-0290-40...|33e8b30d-7032-4cf...|       2.49|
|00665a96-ef2b-4d5...|2022-02-15 22:01:25|[2b5d6816-368f-41...|baad42cd-523b-4c8...|       4.14|
|0078712e-ad26-497...|2022-02-16 13:31:46|[12f9deac-3260-45...|cd25d012-15ba-420...|       5.11|
|009de783-d808-4e6...|2022-02-15 23:30:33|[095834a7-3e9f-41...|3e9ebfd9-31a6-48d...|      11.62|
|00aed027-7d56-4c1...|2022-02-

## Aggregations

### Delete
Return 500 "worst" items that should be deleted

In [22]:
sold_items = purchases \
    .select(F.explode("purchased_items").alias("product_id")) \
    .groupBy("product_id") \
    .count()

In [23]:
sold_items.show()

+--------------------+-----+
|          product_id|count|
+--------------------+-----+
|c5bd6ad3-ac60-4c0...|    2|
|d44a56da-bb34-4d8...|    2|
|7c849a15-c74d-4a6...|    3|
|85696f4b-da15-42c...|    2|
|72157635-b624-4ff...|    3|
|d90c35fc-5e13-4aa...|    2|
|80f2aef9-2c26-448...|    1|
|f3b118bf-f82e-4be...|    3|
|8aaaaa73-1a08-448...|    2|
|ee3a788a-36f3-4db...|    3|
|28306fc3-f8af-45a...|    3|
|1375c117-be7a-401...|    2|
|97ddf2a4-9763-4b0...|    1|
|974e8213-0de9-4be...|    1|
|dd0610ad-90e8-496...|    2|
|be63a878-fdf2-4c0...|    4|
|45a19dcc-f542-454...|    3|
|edfd038a-d28f-45f...|    2|
|5a8e07fd-d7de-45a...|    3|
|047f7154-7260-495...|    3|
+--------------------+-----+
only showing top 20 rows



In [24]:
delete_items = items \
    .join(sold_items, items.id==sold_items.product_id, how="left") \
    .orderBy("count") \
    .limit(500) \
    .select("id")

In [25]:
delete_items.show()

+--------------------+
|                  id|
+--------------------+
|cd8b23d3-2db3-4f2...|
|d3ee0cf0-6fa7-4ac...|
|694eeed6-e82f-4e6...|
|361b7292-3da0-4b0...|
|6a37119c-c1bb-4a4...|
|c5d3dd9e-64f7-4df...|
|13547d25-6fcc-4fb...|
|5538e02e-d640-42f...|
|6cec5cb3-cc76-45c...|
|672ed299-3238-435...|
|c3d720bf-d635-4f9...|
|74a9ed95-bc0a-427...|
|cb2c3489-0338-48f...|
|040ed5dd-c74f-4e9...|
|20e895d5-ccac-47d...|
|4983ffe0-bf21-4bd...|
|e04e2398-a3dd-44d...|
|72ee81a2-6a8b-4b1...|
|8d4c70cf-d128-4cc...|
|85f79e2c-a74e-40f...|
+--------------------+
only showing top 20 rows



### Discount
Return top 10% of users that should have a discount (a user is counted as top based on money spent)

In [26]:
n_users = purchases \
    .groupBy("user_id") \
    .count() \
    .count()

n_users

1000

In [27]:
n_top_users = int(n_users * 0.1)

n_top_users

100

In [28]:
top_users = purchases \
    .groupBy("user_id") \
    .agg(F.sum("amount_paid").alias("money_spent")) \
    .orderBy(F.col("money_spent").desc()) \
    .limit(n_top_users) \
    .select("user_id")

In [29]:
top_users.show()

+--------------------+
|             user_id|
+--------------------+
|22bcbed0-41ec-47a...|
|a0cfa21b-dde0-486...|
|c3db58ba-427d-49a...|
|221c4529-bb6e-48e...|
|5ef9dc9a-5330-4c3...|
|7290914b-05cb-4de...|
|d9441dcd-989b-4da...|
|9652a53d-25f2-40a...|
|292bdbb4-5963-470...|
|d1564cb6-a3df-4d7...|
|6341d982-b442-455...|
|3ef52fdf-ca7c-4c4...|
|6842deba-b35d-4d7...|
|ddf3963f-e51f-41c...|
|80b2f316-decf-488...|
|43a7d1a3-a8b3-496...|
|678c1c9e-7939-46d...|
|6f72c1f0-408d-4d5...|
|8b8614fb-217b-464...|
|7da35886-1b17-466...|
+--------------------+
only showing top 20 rows



### Recommendation (Report)
- Top 10 users by number of purchases
- Top 10 users by money spent
- Top 10 frequent items (The most frequent bought)
- Top 10 valuable items (The biggest amount of money spent on them)

In [30]:
# Top 10 users by number of purchases

top_10_users_by_purchases = purchases \
    .groupBy("user_id") \
    .count() \
    .orderBy(F.col("count").desc()) \
    .limit(10)

top_10_users_by_purchases.show()

+--------------------+-----+
|             user_id|count|
+--------------------+-----+
|41e1763e-288a-470...|   19|
|caa164ed-4784-49d...|   19|
|e339f2fe-7da3-44c...|   19|
|29b45029-b90a-4a7...|   18|
|0fb359a8-13a6-444...|   18|
|855d1fc0-948c-460...|   17|
|01fb7739-72f1-4b1...|   17|
|234f892e-e483-46a...|   16|
|d63a1178-4fe3-4df...|   16|
|004f4b3b-c570-434...|   16|
+--------------------+-----+



In [31]:
# Top 10 users by money spent

top_10_users_by_money = purchases \
    .groupBy("user_id") \
    .agg(F.sum("amount_paid").alias("money_spent")) \
    .orderBy(F.col("money_spent").desc()) \
    .limit(10) \
    .select("user_id")

top_10_users_by_money.show()

+--------------------+
|             user_id|
+--------------------+
|22bcbed0-41ec-47a...|
|a0cfa21b-dde0-486...|
|c3db58ba-427d-49a...|
|221c4529-bb6e-48e...|
|5ef9dc9a-5330-4c3...|
|7290914b-05cb-4de...|
|d9441dcd-989b-4da...|
|9652a53d-25f2-40a...|
|292bdbb4-5963-470...|
|d1564cb6-a3df-4d7...|
+--------------------+



In [32]:
# Top 10 frequent items (The most frequently bought)

top_10_frequent_items = purchases \
    .select(F.explode("purchased_items").alias("product_id")) \
    .groupBy("product_id") \
    .count() \
    .orderBy(F.col("count").desc()) \
    .limit(10) \
    .select("product_id")

top_10_frequent_items.show()

+--------------------+
|          product_id|
+--------------------+
|4dc196ad-c937-4ca...|
|d3a34b45-b4ba-484...|
|ad533934-3ad4-425...|
|c4dc4070-0304-40f...|
|a9087ac0-f367-48e...|
|eb069b69-4582-4cf...|
|07efa244-73e6-471...|
|9d9fc35f-5a6f-4cf...|
|5798fc03-578e-4b2...|
|6b677752-c71c-4ef...|
+--------------------+



In [33]:
# Top 10 valuable items (The biggest amount of money spent on them)
# TODO: .isNotNull() temporary; check whether user_id has a discount 

top_10_valuable_items = purchases \
    .select(F.explode("purchased_items").alias("product_id"),
            "user_id") \
    .join(items_prices, on="product_id") \
    .groupBy("product_id", "user_id") \
    .agg(
        F.when(F.col("user_id").isNotNull(), F.sum("price")) \
        .otherwise(F.sum("discount_price")) \
        .alias("money_spent")
    ) \
    .groupBy("product_id") \
    .agg(
        F.round(F.sum("money_spent"), 2) \
        .alias("total_money_spent")) \
    .orderBy(F.col("total_money_spent").desc()) \
    .limit(10)

top_10_valuable_items.show()

+--------------------+-----------------+
|          product_id|total_money_spent|
+--------------------+-----------------+
|1a91bd3d-9b3b-468...|           296.16|
|6aae9d38-a8ab-4fe...|            289.8|
|f6264131-7da1-450...|           198.78|
|ce7f618f-d045-427...|           192.45|
|f9c9f33d-51ea-497...|           186.88|
|e8be9918-38a0-4b6...|           161.68|
|38c689bc-2e2e-4f0...|           158.58|
|54cce3eb-c5d5-45a...|            157.6|
|ade624b8-12da-4ce...|           156.84|
|2a926d40-8090-4a4...|           154.16|
+--------------------+-----------------+

