# <p style="background-color:#f7e9ec;font-family:newtimeroman;color:#d90b1c;font-size:150%;text-align:center;border-radius:10px 10px;border-style:solid;border-color:#d90b1c;">Recommendation system for H and M Fashion</p>

<h1 style="background-color:#f7e9ec;font-family:newtimeroman;color:#d90b1c;">Terminologies</h1>

A few terms need to be understood before moving forward.

**Apache Spark:** Apache Spark is an open-source, general-purpose framework for distributed cluster computing. It can also be used with Hadoop.

**Collaborative filtering:** Collaborative filtering is a method of making automatic predictions (filtering) about a user's interests by collecting preference or taste information from many users. For example, if person A likes items 1, 2, and 3 and person B likes items 2, 3, and 4, they have similar interests, so A is likely to enjoy item 4 and B is likely to enjoy item 1.
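
The A and B example can be made concrete with a few lines of plain Python. This is only a toy sketch: the Jaccard similarity and the `recommend` helper are my own illustrative choices, not what this notebook uses later (that is ALS).

```python
# Toy data from the example above: the items each person likes.
likes = {
    "A": {1, 2, 3},
    "B": {2, 3, 4},
}

def jaccard(a, b):
    """Overlap between two sets: size of the intersection over size of the union."""
    return len(a & b) / len(a | b)

def recommend(user, likes):
    """Suggest items liked by the most similar other user but not yet by `user`."""
    others = [u for u in likes if u != user]
    nearest = max(others, key=lambda u: jaccard(likes[user], likes[u]))
    return sorted(likes[nearest] - likes[user])

similarity = jaccard(likes["A"], likes["B"])  # 2 shared items out of 4 distinct items
print(similarity, recommend("A", likes), recommend("B", likes))
```

A and B share two of four distinct items, so A is offered item 4 and B is offered item 1.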

**Alternating least squares (ALS) matrix factorization:** The idea is to take a large (potentially huge) user-item matrix and factor it into two much smaller matrices, one of user factors and one of item factors. ALS fits them by alternating: it holds one matrix fixed, solves a least-squares problem for the other, and then swaps. The product of the two lower-dimensional matrices approximates the original matrix, and its values in the empty cells serve as predictions. ALS is built into Apache Spark.
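
To show the alternating step, here is a minimal rank-1 ALS in plain Python. It is a sketch under simplifying assumptions (one latent factor, a plain ridge penalty `reg`, made-up ratings), not Spark's implementation, which handles higher ranks and scales the regularization differently. The function name `als_rank1` is hypothetical.

```python
def als_rank1(ratings, n_users, n_items, reg=0.01, n_iters=50):
    """Rank-1 ALS fitted on the observed entries only.

    ratings: dict mapping (user, item) -> observed value.
    Returns user factors u and item factors v so that
    ratings[(i, j)] is approximated by u[i] * v[j].
    """
    u = [1.0] * n_users
    v = [1.0] * n_items
    for _ in range(n_iters):
        # Hold v fixed: each user factor has a closed-form ridge solution.
        for i in range(n_users):
            obs = [(j, r) for (a, j), r in ratings.items() if a == i]
            u[i] = sum(r * v[j] for j, r in obs) / (sum(v[j] ** 2 for j, _ in obs) + reg)
        # Hold u fixed: solve for each item factor the same way.
        for j in range(n_items):
            obs = [(i, r) for (i, b), r in ratings.items() if b == j]
            v[j] = sum(r * u[i] for i, r in obs) / (sum(u[i] ** 2 for i, _ in obs) + reg)
    return u, v

# Observed entries of the rank-1 matrix [[1, 2, 3], [2, 4, 6]]; cell (1, 2) is held out.
ratings = {(0, 0): 1, (0, 1): 2, (0, 2): 3, (1, 0): 2, (1, 1): 4}
u, v = als_rank1(ratings, n_users=2, n_items=3)
predicted = u[1] * v[2]  # estimate for the held-out cell, whose true value is 6
print(round(predicted, 2))
```

The held-out cell is never seen during fitting, yet the product of the two factors lands close to its true value, slightly shrunk by the regularization.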

**PySpark:** PySpark is the Python API for Apache Spark. It lets you drive Spark from Python code.

<h1 style="background-color:#f7e9ec;font-family:newtimeroman;color:#d90b1c;">1-Install PySpark</h1>

In [1]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.4.0.tar.gz (310.8 MB)
  Preparing metadata (setup.py) ... done
Collecting py4j==0.10.9.7
  Downloading py4j-0.10.9.7-py2.py3-none-any.whl (200 kB)
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.4.0-py2.py3-none-any.whl size=311317143 sha256=1c6ec3dd91c997b6a8091cba738aed3153194ec170e8bf6e6a4edd36ba7f0adc
  Stored in directory: /root/.cache/pip/wheels/06/51/98/f7a41aad64c08302d6c26c90650e713c3dfeb5cdec4946db00
Successfully built pyspark
Installing collected packages: py4j, pyspark
  Attempting uninstall: py4j
    Found existing installation: py4j 0.10.9.3
    Uninstalling py4j-0.10.9.3:
      Successfully uninstalled py4j-0.10.9.3
Successfully installed py4j-0.10.9.7 pyspark-


<h1 style="background-color:#f7e9ec;font-family:newtimeroman;color:#d90b1c;">2-Load libraries</h1>

In [2]:
import numpy as np
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, date_format, from_unixtime, month, unix_timestamp, year
from pyspark.ml.recommendation import ALS

# Build (or reuse) a single SparkSession. The builder already returns a session,
# so it does not need to be wrapped in SparkSession(...) again.
spark = SparkSession.builder \
    .appName("Recommendations") \
    .config("spark.sql.files.maxPartitionBytes", 5000000) \
    .getOrCreate()



Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/05/07 01:18:32 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable



<h1 style="background-color:#f7e9ec;font-family:newtimeroman;color:#d90b1c;">3-Load Dataset in Apache Spark</h1>

In [3]:
transaction = spark.read.option("header",True) \
              .csv("../input/h-and-m-personalized-fashion-recommendations/transactions_train.csv")
transaction.printSchema()

root
 |-- t_dat: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- article_id: string (nullable = true)
 |-- price: string (nullable = true)
 |-- sales_channel_id: string (nullable = true)



In [4]:
from pyspark.sql.functions import min, max
from pyspark.sql.functions import unix_timestamp, lit
min_date, max_date = transaction.select(min("t_dat"), max("t_dat")).first()
min_date, max_date

                                                                                

('2018-09-20', '2020-09-22')

<h1 style="background-color:#f7e9ec;font-family:newtimeroman;color:#d90b1c;">4-Select data for recommendation</h1>

This transaction dataset has 31,788,324 rows and 5 columns. Let's first capture the most recently bought articles. For the recommendations, I am selecting only 2020-09-22, which is the last transaction date.

In [5]:

# Keep only the transactions from the last day in the dataset.
# t_dat is a 'yyyy-MM-dd' string, so an exact string match selects that day.
hm = transaction.filter(transaction['t_dat'] == '2020-09-22')

# Prepare the dataset: purchase count per (customer, article) pair
hm = hm.groupby('customer_id', 'article_id').count()
hm.show(5)



+--------------------+----------+-----+
|         customer_id|article_id|count|
+--------------------+----------+-----+
|00f7bc5c0df4c615b...|0780418013|    1|
|02094817e46f3b692...|0791587001|    1|
|0333e5dda0257e9f4...|0839332002|    2|
|07c7a1172caf8fb97...|0573085043|    1|
|081373184e601470c...|0714790020|    1|
+--------------------+----------+-----+
only showing top 5 rows



                                                                                

In [6]:
print((hm.count(), len(hm.columns)))



(29486, 3)


                                                                                

In [None]:
# Count the observed (customer, article) pairs, i.e. the filled cells of the matrix
numerator = hm.select("count").count()

# Count the number of distinct customers and distinct articles
num_users = hm.select("customer_id").distinct().count()
num_articles = hm.select("article_id").distinct().count()

# Total number of cells in the customer x article matrix
denominator = num_users * num_articles

# Sparsity is the percentage of cells that are empty
sparsity = (1.0 - (numerator * 1.0) / denominator) * 100
print("Sparsity: ", "%.2f" % sparsity + "%.")
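
To see what the sparsity figure measures, the same arithmetic can be run by hand on a tiny example. The purchase pairs below are hypothetical, not taken from the dataset, and plain Python stands in for the Spark calls.

```python
# Hypothetical (customer, article) purchase pairs, not taken from the dataset.
pairs = [("c1", "a1"), ("c1", "a2"), ("c2", "a2"), ("c3", "a3")]

num_users = len({customer for customer, _ in pairs})   # 3 distinct customers
num_articles = len({article for _, article in pairs})  # 3 distinct articles
filled = len(set(pairs))                               # 4 observed cells

# 3 x 3 = 9 possible cells and 4 are filled, so 5 of 9 are empty.
sparsity = (1.0 - filled / (num_users * num_articles)) * 100
print("Sparsity: %.2f%%" % sparsity)
```

Here 5 of the 9 cells are empty, giving about 55.56%. A real purchase matrix with thousands of customers and articles is typically far sparser.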



In [None]:
userId_count = hm.groupBy("customer_id").count().orderBy('count', ascending=False)
userId_count.show()

In [None]:
articleId_count = hm.groupBy("article_id").count().orderBy('count', ascending=False)
articleId_count.show()

<h1 style="background-color:#f7e9ec;font-family:newtimeroman;color:#d90b1c;">5-Importing important modules</h1>

In [None]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS


<h1 style="background-color:#f7e9ec;font-family:newtimeroman;color:#d90b1c;">6-Converting String to index</h1>

Before building the ALS model, note that Spark's ALS requires numeric user and item IDs whose values fit in the integer range. The customer_id and article_id columns are strings, so we need to convert them to numeric indices first.

In [None]:
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline
from pyspark.sql.functions import col
indexer = [StringIndexer(inputCol=column, outputCol=column+"_index") for column in list(set(hm.columns)-set(['count'])) ]
pipeline = Pipeline(stages=indexer)
transformed = pipeline.fit(hm).transform(hm)
transformed.show()
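
For intuition, here is roughly what the indexing step does, written in plain Python. This is a sketch assuming StringIndexer's default ordering (most frequent label gets index 0, ties broken alphabetically, as I understand recent Spark versions to behave). The helper `fit_index` and the sample ids are hypothetical.

```python
from collections import Counter

def fit_index(values):
    """Map each distinct string to an index: most frequent first, ties alphabetical."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}

# Hypothetical article ids, not taken from the dataset.
article_ids = ["0780418013", "0839332002", "0839332002", "0573085043"]
to_index = fit_index(article_ids)
to_label = {index: label for label, index in to_index.items()}  # inverse lookup

print([to_index[a] for a in article_ids])
```

The inverse dictionary `to_label` is the same idea used later to turn the recommended indices back into the original ids.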


<h1 style="background-color:#f7e9ec;font-family:newtimeroman;color:#d90b1c;">7-Creating training and test data</h1>

In [None]:
# A fixed seed makes the 80/20 split reproducible across runs
(training, test) = transformed.randomSplit([0.8, 0.2], seed=42)


<h1 style="background-color:#f7e9ec;font-family:newtimeroman;color:#d90b1c;">8-Creating ALS model and fitting data</h1>

To build the model, specify the user, item, and rating columns explicitly. Set nonnegative to ‘True’, since the purchase counts are always greater than 0. ALS also offers an option to treat the ratings as implicit feedback (implicitPrefs). Since we are treating the counts as explicit ratings, leave it at ‘False’, which is the default.

When using simple random splits, as in Spark’s CrossValidator or TrainValidationSplit, it is very common to encounter users and/or items in the evaluation set that are not in the training set. By default, Spark assigns NaN predictions during ALSModel.transform when a user and/or item factor is not present in the model. We set the cold start strategy to ‘drop’ to ensure we don’t get NaN evaluation metrics.
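
The effect of a single NaN prediction on RMSE is easy to reproduce in plain Python. The rows below are hypothetical, and the filtering only emulates what the ‘drop’ strategy does to the prediction rows. It is not Spark code.

```python
import math

def rmse(pairs):
    """Root mean squared error over (label, prediction) pairs."""
    return math.sqrt(sum((y - p) ** 2 for y, p in pairs) / len(pairs))

# Hypothetical (count, prediction) rows; the last one mimics a cold-start customer.
rows = [(1.0, 0.8), (2.0, 1.5), (1.0, float("nan"))]

with_nan = rmse(rows)  # one NaN prediction makes the whole metric NaN
dropped = rmse([(y, p) for y, p in rows if not math.isnan(p)])  # NaN rows removed first
print(with_nan, round(dropped, 4))
```

Without the filter the metric is NaN and cannot be compared across parameter settings. With it, the RMSE is computed only over the rows the model can actually score.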

In [None]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder


#create ALS model
als=ALS(userCol="customer_id_index",itemCol="article_id_index",ratingCol="count",coldStartStrategy="drop",nonnegative=True)

#tune model using ParamGridBuilder
param_grid = ParamGridBuilder()\
            .addGrid(als.rank, [15,20,25])\
            .addGrid(als.maxIter,[5,10,15])\
            .addGrid(als.regParam,[0.09,0.14,0.19])\
            .build()
#define evaluator as RMSE
evaluator = RegressionEvaluator(metricName = "rmse",labelCol = 'count', predictionCol = 'prediction')

#Build cross validation using CrossValidator
cv = CrossValidator(estimator=als,estimatorParamMaps=param_grid, evaluator=evaluator,numFolds=3)


#Fit ALS model to training data
model = cv.fit(training)
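
Before running the cell above, it helps to count how many models it will train. The sketch below enumerates the same grid with itertools. It only counts combinations and does not touch Spark.

```python
from itertools import product

# Same candidate values as the parameter grid above.
ranks = [15, 20, 25]
max_iters = [5, 10, 15]
reg_params = [0.09, 0.14, 0.19]
num_folds = 3

grid = list(product(ranks, max_iters, reg_params))
total_fits = len(grid) * num_folds  # the final refit of the best model is not counted

print(len(grid), total_fits)
```

That is 27 parameter combinations and 81 ALS fits across the 3 folds, so the cross-validation can take a while even on a single day of data.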

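In [None]:
# Alternative (commented out): fit a single ALS model with fixed hyperparameters
# instead of running the cross-validation above.
# als=ALS(maxIter=5,regParam=0.09,rank=25,userCol="customer_id_index",itemCol="article_id_index",ratingCol="count",coldStartStrategy="drop",nonnegative=True)
# model=als.fit(training)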


<h1 style="background-color:#f7e9ec;font-family:newtimeroman;color:#d90b1c;">9-Evaluate rmse</h1>

In [None]:
#Extract best model from the tuning exercise using ParamGridBuilder
best_model = model.bestModel

#Generate predictions and evaluate using RMSE
predictions = best_model.transform(test)
rmse = evaluator.evaluate(predictions)

In [None]:
#print evaluation metrics and model parameters
print("RMSE =" + str(rmse))
print("**Best Model**")
print("Rank : {}".format(best_model.rank))
print("MaxIter: {}".format(best_model._java_obj.parent().getMaxIter()))
print("RegParam: {}".format(best_model._java_obj.parent().getRegParam()))

<h1 style="background-color:#f7e9ec;font-family:newtimeroman;color:#d90b1c;">10-Providing Recommendations by Article id</h1>

In [None]:
# Top 10 customers for each article. show() returns None, so keep the DataFrame first.
user_recs = best_model.recommendForAllItems(10)
user_recs.show(10)


<h1 style="background-color:#f7e9ec;font-family:newtimeroman;color:#d90b1c;">11-Providing Recommendations by Customer id</h1>

In [None]:
df_recom = best_model.recommendForAllUsers(10)
df_recom.show(10)

In [None]:
df_recom = df_recom.select("customer_id_index","recommendations.article_id_index")
df_recom.show(10)
df_recom = df_recom.toPandas()

In [None]:
df_recom.sort_values('customer_id_index')

<h1 style="background-color:#f7e9ec;font-family:newtimeroman;color:#d90b1c;">12-Converting back to string form</h1>

As the output above shows, the results are in index form, so we need to convert them back to the original IDs. The code is a little longer because of the several conversions involved.

In [None]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row
import pandas as pd
md=transformed.select(transformed['article_id'],transformed['article_id_index'],transformed['customer_id'],transformed['customer_id_index'])
md=md.toPandas()
md

In [None]:
dict1 =dict(zip(md['article_id_index'],md['article_id']))
dict2=dict(zip(md['customer_id_index'],md['customer_id']))
df_recom['article_id'] = df_recom['article_id_index'].map(lambda x: [dict1[y] for y in x if y in dict1])
df_recom['customer_id']=df_recom['customer_id_index'].map(dict2)
df_recom

In [None]:
recom_final = df_recom.drop(['customer_id_index','article_id_index'], axis = 1)
finalpre=recom_final[['customer_id','article_id']]
finalpre

<h1 style="background-color:#f7e9ec;font-family:newtimeroman;color:#d90b1c;">13-Export the prediction</h1>

In [None]:
# finalpre is already a pandas DataFrame, so it can be written to CSV directly
my_pred = finalpre
my_pred.to_csv('my_pred.csv', index=False)