# This experiment was performed using the Cornac module, a very popular recommender library used for research work in the recommender-systems field that is quickly gaining popularity. Please check out the official documentation at the links below:  

**Link to Cornac Official Site:** https://cornac.readthedocs.io/en/latest/  
**Link to Cornac Documentation:** https://cornac.readthedocs.io/_/downloads/en/latest/pdf/ 

In [1]:
!pip install recommenders==1.1.0

Collecting recommenders==1.1.0
  Downloading recommenders-1.1.0-py3-none-manylinux1_x86_64.whl (335 kB)
     |████████████████████████████████| 335 kB 4.5 MB/s            
Collecting pandera[strategies]>=0.6.5
  Downloading pandera-0.9.0-py3-none-any.whl (197 kB)
     |████████████████████████████████| 197 kB 64.9 MB/s            
Collecting pyyaml<6,>=5.4.1
  Downloading PyYAML-5.4.1-cp37-cp37m-manylinux1_x86_64.whl (636 kB)
     |████████████████████████████████| 636 kB 68.6 MB/s            
Collecting category-encoders<2,>=1.3.0
  Downloading category_encoders-1.3.0-py2.py3-none-any.whl (61 kB)
     |████████████████████████████████| 61 kB 5.0 MB/s             
Collecting cornac<2,>=1.1.2
  Downloading cornac-1.14.2-cp37-cp37m-manylinux1_x86_64.whl (12.4 MB)
     |████████████████████████████████| 12.4 MB 61.3 MB/s            
Collecting nltk<4,>=3.4
  Downloading nltk-3.7-py3-none-any.whl (1.5 MB)
     |████████████████████████████████| 1.5 MB 54.5 MB/s            
Collecting power

In [2]:
!pip install pyspark==3.2.1

Collecting pyspark==3.2.1
  Downloading pyspark-3.2.1.tar.gz (281.4 MB)
     |████████████████████████████████| 281.4 MB 33 kB/s              
[?25h  Preparing metadata (setup.py) ... [?25ldone
[?25hCollecting py4j==0.10.9.3
  Downloading py4j-0.10.9.3-py2.py3-none-any.whl (198 kB)
     |████████████████████████████████| 198 kB 41.7 MB/s            
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25ldone
[?25h  Created wheel for pyspark: filename=pyspark-3.2.1-py2.py3-none-any.whl size=281853642 sha256=57e07e26889e89b3f4af7887ec5d3e7873add4678f35adab6e42a9022782b3ad
  Stored in directory: /root/.cache/pip/wheels/9f/f5/07/7cd8017084dce4e93e84e92efd1e1d5334db05f2e83bcef74f
Successfully built pyspark
Installing collected packages: py4j, pyspark
  Attempting uninstall: py4j
    Found existing installation: py4j 0.10.9.4
    Uninstalling py4j-0.10.9.4:
      Successfully uninstalled py4j-0.10.9.4
Successfully installed py4j-0.10.9.3 p

In [3]:
import warnings
warnings.simplefilter(action='ignore', category=FutureWarning)

import sys
import pandas as pd
import numpy as np
import seaborn as sns
from matplotlib import pyplot as plt

import pyspark
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer
from pyspark.ml.recommendation import ALS
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import FloatType, IntegerType, LongType, StringType, TimestampType

from recommenders.evaluation.spark_evaluation import SparkRankingEvaluation, SparkRatingEvaluation
from recommenders.tuning.parameter_sweep import generate_param_grid
from recommenders.datasets.spark_splitters import spark_random_split

# Record the interpreter and library versions this run was executed with.
print(f"System version: {sys.version}")
print(f"Pandas version: {pd.__version__}")
print(f"PySpark version: {pyspark.__version__}")

System version: 3.7.12 | packaged by conda-forge | (default, Oct 26 2021, 06:08:53) 
[GCC 9.4.0]
Pandas version: 1.3.5
PySpark version: 3.2.1


In [4]:
# Directory shared by all four input CSV files.
_INPUT_DIR = r'../input/h-and-m-personalized-fashion-recommendations'

# Transactions table; this is the file loaded into Spark further down.
data_path = _INPUT_DIR + r'/transactions_train.csv'
# The remaining paths are defined for completeness; the cells visible in this
# notebook do not read them.
customer_data_path = _INPUT_DIR + r'/customers.csv'
article_data_path = _INPUT_DIR + r'/articles.csv'
submission_data_path = _INPUT_DIR + r'/sample_submission.csv'

In [5]:
# Build (or reuse) the Spark session for this notebook.
# `builder.getOrCreate()` already returns a SparkSession, so it is bound to
# `spark` directly instead of being wrapped in a second `SparkSession(...)`
# call (whose constructor expects a SparkContext, not a session).
spark = (
    SparkSession.builder
    .appName("H&M_ALS_Recommender")
    # Sets spark.sql.files.maxPartitionBytes to 5,000,000 for file reads.
    .config("spark.sql.files.maxPartitionBytes", 5000000)
    .getOrCreate()
)
# `sc` historically held this same SparkSession (not a SparkContext); the name
# is kept as an alias so any code that still refers to it keeps working.
sc = spark

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/04/06 22:52:23 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [6]:
# Load the transactions CSV, taking column names from the header row.
# No schema is supplied or inferred, so every column arrives as a string
# (see the printed schema).
transaction_sp_df = spark.read.csv(data_path, header=True)
transaction_sp_df.printSchema()

root
 |-- t_dat: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- article_id: string (nullable = true)
 |-- price: string (nullable = true)
 |-- sales_channel_id: string (nullable = true)



In [7]:
# Smallest and largest `t_dat` value in the data; `t_dat` is still a string
# column at this point (see the schema printed when the file was loaded).
date_range_row = transaction_sp_df.select(
    F.min("t_dat"),
    F.max("t_dat"),
).first()
min_date, max_date = date_range_row[0], date_range_row[1]
(min_date, max_date)

                                                                                

('2018-09-20', '2020-09-22')

In [8]:
# Cut-off used below: only rows whose `date` is strictly greater than it are kept.
date_to_filter = F.to_date(F.lit('2020-08-21')).cast(TimestampType())

# NOTE: this rebinds `transaction_sp_df`, so the cell is not idempotent with
# respect to the originally loaded frame.
transaction_sp_df = (
    transaction_sp_df
    .withColumn('t_dat', F.col('t_dat').cast('string'))
    # Parse `t_dat` as 'yyyy-MM-dd' and store the result in a new `date` column.
    .withColumn('date', F.from_unixtime(F.unix_timestamp('t_dat', 'yyyy-MM-dd')))
    .filter(F.col('date') > date_to_filter)
)
transaction_sp_df.count()

                                                                                

1190911

In [11]:
# Collapse the transactions to one row per (customer, article) pair:
#   purchase_amount - sum of `price` (cast to float first, as it was read as a string)
#   purchase_count  - number of `t_dat` values for the pair
# NOTE: this rebinds `transaction_sp_df`; after this cell only the two keys and
# the two aggregates remain.
transaction_sp_df = (
    transaction_sp_df
    .withColumn('price', F.col('price').cast('float'))
    .groupby('customer_id', 'article_id')
    .agg(
        F.sum('price').alias('purchase_amount'),
        F.count('t_dat').alias('purchase_count'),
    )
)
transaction_sp_df.show()



+--------------------+----------+--------------------+--------------+
|         customer_id|article_id|     purchase_amount|purchase_count|
+--------------------+----------+--------------------+--------------+
|009f4e304a83016f8...|0903276002| 0.04083050787448883|             1|
|036e6dd5bf47b97e2...|0806388003|0.013542372733354568|             1|
|03dd3e86d9e9b3191...|0715624001| 0.02540677972137928|             1|
|0509509190fd57e3f...|0815004005|0.022016949951648712|             1|
|056729b03521f2526...|0852374013|0.033881355077028275|             1|
|05d5fd6625b0521f7...|0909924004|0.029999999329447746|             1|
|06ccd9c0f6b4a33f4...|0817401001|0.043389830738306046|             2|
|07752786feb0296c6...|0865932001|0.023152543231844902|             1|
|080f4e977c1bdf5a1...|0629758005|0.005067796446382999|             1|
|0b1d12e082618d454...|0682236001| 0.02540677972137928|             1|
|0c5da9abf14fe5b85...|0890197001|0.033881355077028275|             1|
|0ddd5f545d26b7d34..

                                                                                

In [12]:
# Every column except the two aggregated measures is a string id that must be
# turned into a numeric index before it can be fed to ALS.
# The columns are taken in DataFrame order (not via set arithmetic) so that the
# pipeline stages and the order of the generated `*_index` columns are the same
# on every run.
measure_columns = {'purchase_amount', 'purchase_count'}
id_columns = [column for column in transaction_sp_df.columns if column not in measure_columns]

# One StringIndexer per id column, writing to `<column>_index`.
indexer = [StringIndexer(inputCol=column, outputCol=column + "_index") for column in id_columns]
pipeline = Pipeline(stages=indexer)

# Fit and apply on the same frame, so every id seen here receives an index.
transaction_indexed_sp_df = pipeline.fit(transaction_sp_df).transform(transaction_sp_df)
transaction_indexed_sp_df.show()

22/04/06 22:59:32 WARN DAGScheduler: Broadcasting large task binary with size 23.0 MiB
[Stage 30:>                                                         (0 + 1) / 1]

+--------------------+----------+--------------------+--------------+----------------+-----------------+
|         customer_id|article_id|     purchase_amount|purchase_count|article_id_index|customer_id_index|
+--------------------+----------+--------------------+--------------+----------------+-----------------+
|009f4e304a83016f8...|0903276002| 0.04083050787448883|             1|          1832.0|          10737.0|
|036e6dd5bf47b97e2...|0806388003|0.013542372733354568|             1|           401.0|         142309.0|
|03dd3e86d9e9b3191...|0715624001| 0.02540677972137928|             1|            17.0|          77338.0|
|0509509190fd57e3f...|0815004005|0.022016949951648712|             1|         13955.0|          16642.0|
|056729b03521f2526...|0852374013|0.033881355077028275|             1|          2446.0|         104661.0|
|05d5fd6625b0521f7...|0909924004|0.029999999329447746|             1|          1996.0|         142815.0|
|06ccd9c0f6b4a33f4...|0817401001|0.043389830738306046| 

                                                                                

In [13]:
# Columns handed to the recommender: indexed user, indexed item, purchase count.
implicit_feedback_columns = ['customer_id_index', 'article_id_index', 'purchase_count']

# Random 75% / remainder split of the interaction rows, seeded for repeatability.
interactions_sp_df = transaction_indexed_sp_df.select(*implicit_feedback_columns)
train_sp_df, test_sp_df = spark_random_split(
    interactions_sp_df,
    ratio=0.75,
    seed=42,
)

In [14]:
# ALS hyper-parameters (passed to the ALS estimator below).
RANK = 10          # `rank` argument
MAX_ITER = 15      # `maxIter` argument
REG_PARAM = 0.05   # `regParam` argument

# Cut-off `k` passed to the ranking evaluation.
K = 12

In [15]:
# Configure the ALS estimator on the indexed user/item columns, using the
# per-pair purchase count as the rating signal.
als = ALS(
    maxIter=MAX_ITER,
    rank=RANK,
    regParam=REG_PARAM,
    userCol='customer_id_index',
    itemCol='article_id_index',
    ratingCol='purchase_count',
    # Drop rows whose prediction cannot be computed instead of returning NaN.
    coldStartStrategy="drop",
    nonnegative=True,
    # Fixed seed (same value as the train/test split) so the fitted factors
    # are reproducible across kernel restarts.
    # NOTE(review): without it PySpark falls back to a default seed that, per
    # the shared HasSeed param, is derived from Python's hash() of the class
    # name and is therefore not stable between interpreter processes -
    # confirm for the installed version.
    seed=42,
)
# NOTE(review): `implicitPrefs` is left at its default, so purchase counts are
# fitted as explicit ratings even though the input columns are named
# "implicit feedback" - confirm this is intended.

model = als.fit(train_sp_df)

22/04/06 23:01:37 WARN DAGScheduler: Broadcasting large task binary with size 23.0 MiB
22/04/06 23:01:40 WARN DAGScheduler: Broadcasting large task binary with size 23.0 MiB
22/04/06 23:01:43 WARN DAGScheduler: Broadcasting large task binary with size 23.0 MiB
22/04/06 23:01:46 WARN DAGScheduler: Broadcasting large task binary with size 23.0 MiB
22/04/06 23:01:51 WARN DAGScheduler: Broadcasting large task binary with size 23.0 MiB
22/04/06 23:01:53 WARN DAGScheduler: Broadcasting large task binary with size 23.0 MiB
22/04/06 23:01:59 WARN DAGScheduler: Broadcasting large task binary with size 23.0 MiB
22/04/06 23:02:02 WARN DAGScheduler: Broadcasting large task binary with size 23.0 MiB
22/04/06 23:02:06 WARN DAGScheduler: Broadcasting large task binary with size 23.0 MiB
22/04/06 23:02:11 WARN DAGScheduler: Broadcasting large task binary with size 23.0 MiB
22/04/06 23:02:14 WARN DAGScheduler: Broadcasting large task binary with size 23.0 MiB
22/04/06 23:02:20 WARN DAGScheduler: Broadc

In [20]:
# Get the cross join of all user-item pairs and score them.
# Only users and items present in the training split are considered.
users = train_sp_df.select('customer_id_index').distinct()
items = train_sp_df.select('article_id_index').distinct()
user_item = users.crossJoin(items)

preds_sp_df = model.transform(user_item)

# Remove seen items.
# A left-anti join keeps exactly the prediction rows that have no matching
# (user, item) row in the training data. It replaces the former full outer
# join + "train.purchase_count IS NULL" filter, and joining on column *names*
# avoids comparing `preds_sp_df[...]` with `train_sp_df[...]` - both frames
# descend from `train_sp_df`, so such column references can be ambiguous to
# Spark's analyzer.
preds_sp_df_exclude_train = preds_sp_df.join(
    train_sp_df,
    on=['customer_id_index', 'article_id_index'],
    how='left_anti'
)

# Keep only the columns the ranking evaluation needs.
preds_final_sp_df = preds_sp_df_exclude_train.select(
    'customer_id_index', 'article_id_index', 'prediction'
)

preds_final_sp_df.show(10)

In [None]:
# Column mapping and cut-off shared by all ranking metrics.
ranking_eval_options = dict(
    col_user='customer_id_index',
    col_item='article_id_index',
    col_rating='purchase_count',
    col_prediction='prediction',
    k=K,
)

# Compare the held-out interactions with the predictions for unseen items.
evaluations = SparkRankingEvaluation(test_sp_df, preds_final_sp_df, **ranking_eval_options)

# Compute the metrics in a fixed order and print one per line.
metric_lines = [
    "Precision@k = {}".format(evaluations.precision_at_k()),
    "Recall@k = {}".format(evaluations.recall_at_k()),
    "NDCG@k = {}".format(evaluations.ndcg_at_k()),
    "Mean average precision = {}".format(evaluations.map_at_k()),
]
print("\n".join(metric_lines))