In [1]:
import findspark
findspark.init()

In [2]:
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql import functions as F
from pyspark.sql import types as T

import pandas as pd
pd.set_option('display.max_columns', None)

In [3]:
spark = SparkSession.builder \
    .appName("Fashionable")\
    .master("local[*]")\
    .config("spark.driver.memory","25G")\
    .config("spark.kryoserializer.buffer.max", "512m")\
    .getOrCreate()

In [4]:
src_trans_pq = r'D:\data\h&m\transactions_train.parquet'
# src_article_pq = r'D:\data\h&m\articles.parquet'
# src_cust_pq = r'D:\data\h&m\customers.parquet'

In [5]:
df_trans = spark.read.parquet(src_trans_pq)

In [6]:
df_trans.limit(5).toPandas()

Unnamed: 0,t_dat,customer_id,article_id,price,sales_channel_id
0,2019-11-29,aaa78c87aacba903d16f393da3edeca27d62e642b1a639...,706016003,0.025288,2
1,2019-11-29,aaa78c87aacba903d16f393da3edeca27d62e642b1a639...,706016001,0.025288,2
2,2019-11-29,aaa78c87aacba903d16f393da3edeca27d62e642b1a639...,682236013,0.018983,2
3,2019-11-29,aaa78c87aacba903d16f393da3edeca27d62e642b1a639...,706016016,0.025288,2
4,2019-11-29,aaa7a0483dd5b9e395d95324dcbfeb617af9800f39487d...,783335003,0.020322,2


In [7]:
df_trans.count()

31788324

In [8]:
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline
from pyspark.sql.functions import col

In [9]:
# indexer = [StringIndexer(inputCol=column, outputCol=column+"_idx") for column in ['customer_id', 'article_id']]
# pipeline = Pipeline(stages=indexer)
# df_trans = pipeline.fit(df_trans).transform(df_trans)

custIndexer = StringIndexer().setInputCol('customer_id').setOutputCol('customer_id_idx')
articleIndexer = StringIndexer().setInputCol('article_id').setOutputCol('article_id_idx')
pipeline = Pipeline(stages=[custIndexer, articleIndexer])

df_trans = pipeline.fit(df_trans).transform(df_trans)

In [10]:
cust_map = df_trans.select('customer_id_idx', 'customer_id')
article_map = df_trans.select('article_id_idx', 'article_id')

In [11]:
df_trans.limit(5).toPandas()

Unnamed: 0,t_dat,customer_id,article_id,price,sales_channel_id,customer_id_idx,article_id_idx
0,2019-11-29,aaa78c87aacba903d16f393da3edeca27d62e642b1a639...,706016003,0.025288,2,340255.0,9.0
1,2019-11-29,aaa78c87aacba903d16f393da3edeca27d62e642b1a639...,706016001,0.025288,2,340255.0,0.0
2,2019-11-29,aaa78c87aacba903d16f393da3edeca27d62e642b1a639...,682236013,0.018983,2,340255.0,17656.0
3,2019-11-29,aaa78c87aacba903d16f393da3edeca27d62e642b1a639...,706016016,0.025288,2,340255.0,6159.0
4,2019-11-29,aaa7a0483dd5b9e395d95324dcbfeb617af9800f39487d...,783335003,0.020322,2,1318380.0,44019.0


In [12]:
df_trans.printSchema()

root
 |-- t_dat: date (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- article_id: string (nullable = true)
 |-- price: double (nullable = true)
 |-- sales_channel_id: integer (nullable = true)
 |-- customer_id_idx: double (nullable = false)
 |-- article_id_idx: double (nullable = false)



### Alternating Least Squares (ALS)

In [13]:
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row

In [14]:
df_trans = df_trans.withColumn('rating', F.lit(1))

In [15]:
training, test = df_trans.randomSplit([0.8, 0.2])

In [16]:
als = ALS(nonnegative=True)\
    .setMaxIter(5)\
    .setRegParam(0.01)\
    .setUserCol('customer_id_idx')\
    .setItemCol('article_id_idx')\
    .setRatingCol('rating')\
    .setColdStartStrategy('drop')

In [17]:
print(als.explainParams())

alpha: alpha for implicit preference (default: 1.0)
blockSize: block size for stacking input data in matrices. Data is stacked within partitions. If block size is more than remaining data in a partition then it is adjusted to the size of this data. (default: 4096)
checkpointInterval: set checkpoint interval (>= 1) or disable checkpoint (-1). E.g. 10 means that the cache will get checkpointed every 10 iterations. Note: this setting will be ignored if the checkpoint directory is not set in the SparkContext. (default: 10)
coldStartStrategy: strategy for dealing with unknown or new users/items at prediction time. This may be useful in cross-validation or production scenarios, for handling user/item ids the model has not seen in the training data. Supported values: 'nan', 'drop'. (default: nan, current: drop)
finalStorageLevel: StorageLevel for ALS model factors. (default: MEMORY_AND_DISK)
implicitPrefs: whether to use implicit preference (default: False)
intermediateStorageLevel: StorageLe

In [18]:
import time

In [19]:
start = time.time()
alsModel = als.fit(training)
end = time.time()
print(end - start)

177.25513100624084


In [20]:
start = time.time()
predictions = alsModel.transform(test)
end = time.time()
print(end - start)

0.053290367126464844


In [21]:
# user_rec = alsModel.recommendForAllUsers(12).selectExpr("customer_id_idx", "explode(recommendations)")

In [22]:
# start = time.time()
# alsModel.recommendForAllItems(10).selectExpr("article_id_idx", "explode(recommendations)").show()
# end = time.time()
# print(end - start)

In [31]:
predictions.printSchema()

root
 |-- t_dat: date (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- article_id: string (nullable = true)
 |-- price: double (nullable = true)
 |-- sales_channel_id: integer (nullable = true)
 |-- customer_id_idx: double (nullable = false)
 |-- article_id_idx: double (nullable = false)
 |-- rating: integer (nullable = false)
 |-- prediction: float (nullable = false)



In [None]:
alsModel.recommendForAllUsers(12).write.mode('overwrite').parquet(r'D:\data\h&m\predictions_001.parquet')

In [25]:
from pyspark.ml.evaluation import RankingEvaluator

In [28]:
evaluator = RankingEvaluator()\
    .setPredictionCol('prediction')\
    .setLabelCol('rating')

In [None]:
group_pred = predictions\
    .groupBy('customer_id')\
    .agg(F.collect_set('prediction').alias('prediction'))

In [29]:
evaluator.evaluate(predictions, {evaluator.metricName: "precisionAtK", evaluator.k: 12})

IllegalArgumentException: requirement failed: Column prediction must be of type equal to one of the following types: [array<double>, array<double>] but was actually of type float.

In [None]:
metrics = RankingMetrics(predictionAndLabels)

In [None]:
evaluator = RankingEvaluator()\
    .setMetricName('rmse')\
    .setLabelCol('rating')\
    .setPredictionCol('prediction')

In [None]:
from pyspark.ml.evaluation import RegressionEvaluator

In [None]:
evaluator = RegressionEvaluator()\
    .setMetricName('rmse')\
    .setLabelCol('rating')\
    .setPredictionCol('prediction')

rmse = evaluator.evaluate(predictions)
print(f'RMSE: {rmse}')

In [None]:
alsModel.recommendForAllUsers(12).printSchema()

Get user recommendations

In [None]:
# user_rec = alsModel.recommendForAllUsers(12).select #.selectExpr("customer_id_idx", "explode(recommendations)")
user_rec = alsModel.recommendForAllUsers(12)\
    .select("customer_id_idx", F.explode("recommendations"))\
    .select(F.col('customer_id_idx'), F.col('col.article_id_idx'))\
    .join(cust_map, on='customer_id_idx')\
    .join(article_map, on='article_id_idx')\
    .select('customer_id', 'article_id')

In [None]:
user_rec.printSchema()

In [None]:
# from pyspark.ml.feature import IndexToString
# labelReverse = IndexToString().setInputCol('customer_id_idx')
# lr = labelReverse.transform(user_rec)

In [None]:
group_cust = user_rec\
    .groupBy('customer_id')\
    .agg(F.collect_set('article_id').alias('prediction'))

In [None]:
group_cust.write.parquet(r'D:\data\h&m\predictions_001.parquet')