In [27]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline
from pyspark.sql import functions as F
from pyspark.ml.recommendation import ALS
from pyspark.sql.functions import explode

In [2]:
spark = SparkSession.builder \
    .appName("HM-EDA") \
    .config("spark.driver.memory", "8g") \
    .master("local[*]") \
    .getOrCreate()

print("Spark is ready")

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/12/29 19:03:34 WARN Utils: Your hostname, DESKTOP-FVKCA6T, resolves to a loopback address: 127.0.1.1; using 10.255.255.254 instead (on interface lo)
25/12/29 19:03:34 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/12/29 19:03:34 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Spark is ready


In [3]:
data_path = "file:///home/bartek/real-time-recommendation/data"

articles_path = f"{data_path}/articles.csv"
customers_path = f"{data_path}/customers.csv"
transactions_path = f"{data_path}/transactions_train.csv"

In [4]:
articles = spark.read.csv(articles_path, header=True, inferSchema=True)
customers = spark.read.csv(customers_path, header=True, inferSchema=True)
transactions = spark.read.csv(transactions_path, header=True, inferSchema=True)

                                                                                

In [5]:
articles.write.parquet(f"{data_path}/articles.parquet")
customers.write.parquet(f"{data_path}/customers.parquet")
transactions.write.parquet(f"{data_path}/transactions.parquet")

                                                                                

In [6]:
articles = spark.read.parquet(f"{data_path}/articles.parquet")
customers = spark.read.parquet(f"{data_path}/customers.parquet")
transactions = spark.read.parquet(f"{data_path}/transactions.parquet")

In [7]:
transactions.printSchema()

transactions.show(5)

root
 |-- t_dat: date (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- article_id: integer (nullable = true)
 |-- price: double (nullable = true)
 |-- sales_channel_id: integer (nullable = true)

+----------+--------------------+----------+--------------------+----------------+
|     t_dat|         customer_id|article_id|               price|sales_channel_id|
+----------+--------------------+----------+--------------------+----------------+
|2019-11-29|aaa78c87aacba903d...| 706016003|0.025288135593220337|               2|
|2019-11-29|aaa78c87aacba903d...| 706016001|0.025288135593220337|               2|
|2019-11-29|aaa78c87aacba903d...| 682236013|0.018983050847457626|               2|
|2019-11-29|aaa78c87aacba903d...| 706016016|0.025288135593220337|               2|
|2019-11-29|aaa7a0483dd5b9e39...| 783335003|0.020322033898305086|               2|
+----------+--------------------+----------+--------------------+----------------+
only showing top 5 rows


In [8]:
total_rows = transactions.count()
print(f"Number of transactions: {total_rows}")

unique_customers = transactions.select("customer_id").distinct().count()
print(f"Number of unique customers: {unique_customers}")

Number of transactions: 31788324




Number of unique customers: 1362281


                                                                                

In [9]:
transactions2 = spark.read.csv(transactions_path, header=True, inferSchema=True)

                                                                                

In [10]:
total_rows2 = transactions2.count()
print(f"Number of transactions: {total_rows2}")

unique_customers2 = transactions2.select("customer_id").distinct().count()
print(f"Number of unique customers: {unique_customers2}")

                                                                                

Number of transactions: 31788324




Number of unique customers: 1362281


                                                                                

well... quite of a time difference between parquet and csv

In [15]:
user_indexer = StringIndexer(inputCol="customer_id", outputCol="user_idx").setHandleInvalid("keep")
item_indexer = StringIndexer(inputCol="article_id", outputCol="item_idx").setHandleInvalid("keep")

pipeline = Pipeline(stages=[user_indexer, item_indexer])
indexer_model = pipeline.fit(transactions)
transactions_indexed = indexer_model.transform(transactions)

transactions_indexed.select("customer_id", "user_idx", "article_id", "item_idx").show(5)

25/12/29 19:16:08 WARN DAGScheduler: Broadcasting large task binary with size 114.4 MiB
[Stage 48:>                                                         (0 + 1) / 1]

+--------------------+---------+----------+--------+
|         customer_id| user_idx|article_id|item_idx|
+--------------------+---------+----------+--------+
|aaa78c87aacba903d...| 340255.0| 706016003|     9.0|
|aaa78c87aacba903d...| 340255.0| 706016001|     0.0|
|aaa78c87aacba903d...| 340255.0| 682236013| 17656.0|
|aaa78c87aacba903d...| 340255.0| 706016016|  6159.0|
|aaa7a0483dd5b9e39...|1318380.0| 783335003| 44019.0|
+--------------------+---------+----------+--------+
only showing top 5 rows


                                                                                

In [16]:
popular_products = transactions_indexed.groupBy("article_id").count().orderBy(F.desc("count")).limit(20)

popular_products_with_names = popular_products.join(articles, "article_id").select("article_id", "prod_name", "product_type_name", "count") \
    .orderBy(F.desc("count"))

popular_products_with_names.show(truncate=False)

+----------+-------------------------+-----------------+-----+
|article_id|prod_name                |product_type_name|count|
+----------+-------------------------+-----------------+-----+
|706016001 |Jade HW Skinny Denim TRS |Trousers         |50287|
|706016002 |Jade HW Skinny Denim TRS |Trousers         |35043|
|372860001 |7p Basic Shaftless       |Socks            |31718|
|610776002 |Tilly (1)                |T-shirt          |30199|
|759871002 |Tilda tank               |Vest top         |26329|
|464297007 |Greta Thong Mynta Low 3p |Underwear bottom |25025|
|372860002 |7p Basic Shaftless       |Socks            |24458|
|610776001 |Tilly (1)                |T-shirt          |22451|
|399223001 |Curvy Jeggings HW Ankle  |Trousers         |22236|
|706016003 |Jade HW Skinny Denim TRS |Trousers         |21241|
|720125001 |SUPREME RW tights        |Leggings/Tights  |21063|
|156231001 |Box 4p Tights            |Underwear Tights |21013|
|562245046 |Luna skinny RW           |Trousers         

In [18]:
numerator = transactions_indexed.select("customer_id", "article_id").distinct().count()
num_users = transactions_indexed.select("user_idx").distinct().count()
num_items = transactions_indexed.select("item_idx").distinct().count()

denominator = num_users * num_items
sparsity = (1.0 - (numerator * 1.0 / denominator)) * 100

print('User-item matrix sparsity:')
print(f"Matrix is: {100 - sparsity:.6f}% full")
print(f"Sparsity: {sparsity:.6f}%")

25/12/29 19:20:57 WARN DAGScheduler: Broadcasting large task binary with size 117.0 MiB
25/12/29 19:21:04 WARN DAGScheduler: Broadcasting large task binary with size 117.0 MiB
25/12/29 19:21:07 WARN DAGScheduler: Broadcasting large task binary with size 4.4 MiB

User-item matrix sparsity:
Matrix is: 0.019173% full
Sparsity: 99.980827%


25/12/29 19:21:09 WARN DAGScheduler: Broadcasting large task binary with size 4.4 MiB
                                                                                

In [22]:
(train, test) = transactions_indexed.randomSplit([0.8, 0.2], seed=42)
# we need to add a rating column with constant value of 1.0 for implicit feedback
train = train.withColumn("rating", F.lit(1.0))
test = test.withColumn("rating", F.lit(1.0))

In [24]:
als = ALS(
    maxIter=10,
    regParam=0.1,
    userCol="user_idx", 
    itemCol="item_idx", 
    ratingCol="rating", 
    implicitPrefs=True,
    coldStartStrategy="drop",
    nonnegative=True,
)

model = als.fit(train)
print("Model trained successfully")

25/12/29 19:32:29 WARN DAGScheduler: Broadcasting large task binary with size 121.4 MiB
25/12/29 19:32:33 WARN DAGScheduler: Broadcasting large task binary with size 121.4 MiB
25/12/29 19:32:48 WARN DAGScheduler: Broadcasting large task binary with size 121.4 MiB
25/12/29 19:32:54 WARN DAGScheduler: Broadcasting large task binary with size 121.4 MiB
25/12/29 19:33:00 WARN DAGScheduler: Broadcasting large task binary with size 121.4 MiB
25/12/29 19:33:04 WARN DAGScheduler: Broadcasting large task binary with size 121.4 MiB
25/12/29 19:33:09 WARN DAGScheduler: Broadcasting large task binary with size 121.4 MiB
25/12/29 19:33:11 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
25/12/29 19:33:12 WARN DAGScheduler: Broadcasting large task binary with size 121.4 MiB
25/12/29 19:33:15 WARN DAGScheduler: Broadcasting large task binary with size 121.4 MiB
25/12/29 19:33:19 WARN DAGScheduler: Broadcasting large task binary with size 121.4 MiB
25/12/29 19:3

Model trained successfully


In [25]:
user_recs = model.recommendForAllUsers(5)
user_recs.show(5, truncate=False)

25/12/29 19:38:03 WARN DAGScheduler: Broadcasting large task binary with size 121.5 MiB
25/12/29 19:43:03 WARN DAGScheduler: Broadcasting large task binary with size 121.5 MiB
[Stage 461:>                                                        (0 + 1) / 1]

+--------+--------------------------------------------------------------------------------------+
|user_idx|recommendations                                                                       |
+--------+--------------------------------------------------------------------------------------+
|28      |[{3, 0.38629615}, {7, 0.3730123}, {49, 0.3720497}, {45, 0.37145802}, {57, 0.32911897}]|
|31      |[{0, 0.5641865}, {4, 0.4799269}, {1, 0.460655}, {18, 0.4452774}, {8, 0.40456653}]     |
|34      |[{4, 0.440464}, {18, 0.39387068}, {22, 0.35760227}, {33, 0.34230834}, {8, 0.2984539}] |
|53      |[{3, 3.7996624}, {7, 3.2492824}, {40, 2.2350845}, {42, 2.0833542}, {32, 2.0828722}]   |
|65      |[{4, 0.72568905}, {18, 0.6584402}, {0, 0.5223078}, {8, 0.5037251}, {50, 0.46586284}]  |
+--------+--------------------------------------------------------------------------------------+
only showing top 5 rows


                                                                                

In [26]:
articles_indexed = indexer_model.stages[1].transform(articles)

In [28]:
recs_exploded = user_recs.withColumn("rec", explode("recommendations")).select("user_idx", "rec.item_idx", "rec.rating")

recs_with_info = recs_exploded.join(articles_indexed, "item_idx").select("user_idx", "article_id", "prod_name", "rating")

recs_with_info.show(25, truncate=False)

25/12/29 19:46:59 WARN DAGScheduler: Broadcasting large task binary with size 121.5 MiB
25/12/29 19:47:00 WARN DAGScheduler: Broadcasting large task binary with size 3.9 MiB
25/12/29 19:52:05 WARN DAGScheduler: Broadcasting large task binary with size 121.5 MiB
[Stage 511:>                                                        (0 + 1) / 1]

+--------+----------+----------------------------+----------+
|user_idx|article_id|prod_name                   |rating    |
+--------+----------+----------------------------+----------+
|28      |610776002 |Tilly (1)                   |0.38629615|
|28      |610776001 |Tilly (1)                   |0.3730123 |
|28      |678942001 |Harrison short sleeve top CN|0.3720497 |
|28      |749699002 |Chestnut strap top          |0.37145802|
|28      |749699001 |Chestnut strap top          |0.32911897|
|31      |706016001 |Jade HW Skinny Denim TRS    |0.5641865 |
|31      |759871002 |Tilda tank                  |0.4799269 |
|31      |706016002 |Jade HW Skinny Denim TRS    |0.460655  |
|31      |448509014 |Perrie Slim Mom Denim TRS   |0.4452774 |
|31      |399223001 |Curvy Jeggings HW Ankle     |0.40456653|
|34      |759871002 |Tilda tank                  |0.440464  |
|34      |448509014 |Perrie Slim Mom Denim TRS   |0.39387068|
|34      |160442007 |3p Sneaker Socks            |0.35760227|
|34     

                                                                                

In [29]:
model.save("file:///home/bartek/real-time-recommendation/models/als_model")
indexer_model.save("file:///home/bartek/real-time-recommendation/models/indexer_model")

25/12/29 19:55:19 WARN DAGScheduler: Broadcasting large task binary with size 121.7 MiB
25/12/29 19:55:24 WARN DAGScheduler: Broadcasting large task binary with size 121.7 MiB
25/12/29 19:55:27 WARN TaskSetManager: Stage 562 contains a task of very large size (96429 KiB). The maximum recommended task size is 1000 KiB.
25/12/29 19:55:28 WARN TaskSetManager: Stage 564 contains a task of very large size (2484 KiB). The maximum recommended task size is 1000 KiB.


In [30]:
item_factors = model.itemFactors

item_vectors_to_export = item_factors.join(articles_indexed, item_factors.id == articles_indexed.item_idx) \
    .select("article_id", "prod_name", "features")

item_vectors_to_export.write.mode("overwrite").parquet("file:///home/bartek/real-time-recommendation/data/processed/item_vectors.parquet")

25/12/29 19:59:51 WARN DAGScheduler: Broadcasting large task binary with size 3.9 MiB
25/12/29 19:59:52 WARN DAGScheduler: Broadcasting large task binary with size 121.7 MiB
                                                                                

In [32]:
item_factors.show(5, truncate=False)

25/12/29 20:00:33 WARN DAGScheduler: Broadcasting large task binary with size 121.5 MiB
[Stage 634:>                                                        (0 + 1) / 1]

+---+----------------------------------------------------------------------------------------------------------------------------+
|id |features                                                                                                                    |
+---+----------------------------------------------------------------------------------------------------------------------------+
|0  |[0.18760663, 0.05007008, 0.038590755, 0.03752847, 0.04380038, 0.046046425, 1.8137769, 0.061762534, 0.016788663, 0.091892295]|
|10 |[0.6114376, 0.04562196, 0.038101897, 0.03688076, 0.040458187, 0.24343899, 0.04603544, 0.33030567, 0.031573664, 0.096841425] |
|20 |[0.088714376, 0.009996864, 0.0033170704, 0.70789546, 0.015505002, 0.26316994, 0.0, 0.024749385, 0.018888019, 0.3161598]     |
|30 |[0.0015739873, 0.008382113, 0.0, 0.036186825, 0.966362, 0.062153075, 0.02325028, 0.0, 0.0, 0.041700616]                     |
|40 |[0.0, 0.9354742, 0.012773889, 0.021644885, 0.0, 0.0, 0.0, 0.0398406, 0.0, 0.09

                                                                                

In [33]:
recs_with_info.write.mode("overwrite").parquet("file:///home/bartek/real-time-recommendation/data/processed/batch_recommendations.parquet")

25/12/29 20:02:46 WARN DAGScheduler: Broadcasting large task binary with size 121.5 MiB
25/12/29 20:02:46 WARN DAGScheduler: Broadcasting large task binary with size 3.9 MiB
25/12/29 20:07:35 WARN DAGScheduler: Broadcasting large task binary with size 121.8 MiB
                                                                                

In [34]:
user_factors = model.userFactors

user_map = transactions_indexed.select("customer_id", "user_idx").distinct()

user_vectors_to_export = user_factors.join(user_map, user_factors.id == user_map.user_idx).select("customer_id", "features")

user_vectors_to_export.write.mode("overwrite").parquet("file:///home/bartek/real-time-recommendation/data/processed/user_vectors.parquet")

25/12/29 20:09:53 WARN DAGScheduler: Broadcasting large task binary with size 121.5 MiB
25/12/29 20:09:55 WARN DAGScheduler: Broadcasting large task binary with size 117.0 MiB
25/12/29 20:10:04 WARN DAGScheduler: Broadcasting large task binary with size 117.0 MiB
25/12/29 20:10:10 WARN DAGScheduler: Broadcasting large task binary with size 121.7 MiB
                                                                                