<h1> ALS Recommendation model using pyspark </h1>
<br/>
ALS attempts to estimate the ratings matrix R as the product of two lower-rank matrices, X and Y, i.e. X * Yt = R. Typically these approximations are called ‘factor’ matrices. The general approach is iterative. During each iteration, one of the factor matrices is held constant, while the other is solved for using least squares. The newly-solved factor matrix is then held constant while solving for the other factor matrix. <br/>
<h3>1. Import Dataset and libraries</h3>

In [0]:
# File location and type
file_location = "/FileStore/tables/interactions_train.csv"
file_type = "csv"

# The applied options are for CSV files. For other file types, these will be ignored.
df =spark.read.csv(file_location,header=True,inferSchema=True)
df.show()

+-------+---------+-------------------+------+-----+------+
|user_id|recipe_id|               date|rating|    u|     i|
+-------+---------+-------------------+------+-----+------+
|   2046|     4684|2000-02-25 00:00:00|   5.0|22095| 44367|
|   2046|      517|2000-02-25 00:00:00|   5.0|22095| 87844|
|   1773|     7435|2000-03-13 00:00:00|   5.0|24732|138181|
|   1773|      278|2000-03-13 00:00:00|   4.0|24732| 93054|
|   2046|     3431|2000-04-07 00:00:00|   5.0|22095|101723|
|   2046|    13307|2000-05-21 00:00:00|   5.0|22095|134551|
|   2312|      780|2000-09-12 00:00:00|   5.0| 1674|127175|
|   2312|    51964|2000-09-26 00:00:00|   5.0| 1674|151793|
|   2312|     1232|2000-10-17 00:00:00|   4.0| 1674| 15498|
|   2312|     4397|2000-10-17 00:00:00|   5.0| 1674| 14380|
|   2625|      471|2000-10-18 00:00:00|   3.0|20667| 35144|
|   2312|      164|2000-10-20 00:00:00|   5.0| 1674| 96573|
|   2999|     3567|2000-10-23 00:00:00|   5.0|19047|118466|
|   2178|     3704|2000-10-30 00:00:00| 

In [0]:
df=df.drop('date','u','i')
df.show()

+-------+---------+------+
|user_id|recipe_id|rating|
+-------+---------+------+
|   2046|     4684|   5.0|
|   2046|      517|   5.0|
|   1773|     7435|   5.0|
|   1773|      278|   4.0|
|   2046|     3431|   5.0|
|   2046|    13307|   5.0|
|   2312|      780|   5.0|
|   2312|    51964|   5.0|
|   2312|     1232|   4.0|
|   2312|     4397|   5.0|
|   2625|      471|   3.0|
|   2312|      164|   5.0|
|   2999|     3567|   5.0|
|   2178|     3704|   3.0|
|   2178|     4366|   5.0|
|   3794|     7508|   4.0|
|   3794|      191|   5.0|
|   3794|     3525|   5.0|
|   2312|     3651|   5.0|
|   2695|      350|   1.0|
+-------+---------+------+
only showing top 20 rows



In [0]:
from pyspark.sql.types import *
from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating
import requests, json, os, sys, time, re
from sklearn.metrics.pairwise import linear_kernel,cosine_similarity
import pandas as pd
import math
import datetime
import pyspark.sql.functions as sf
from pyspark.sql.functions import desc
from pyspark.sql.window import Window
from pyspark import SparkConf, SparkContext
import itertools
from math import sqrt
from operator import add
from os.path import join, isfile, dirname
from pyspark import SparkContext, SparkConf, SQLContext
from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating
from pyspark.sql.types import TimestampType
import hashlib


In [0]:
df=df.withColumn("rating",df.rating.cast('int'))
df.show()

+-------+---------+------+
|user_id|recipe_id|rating|
+-------+---------+------+
|   2046|     4684|     5|
|   2046|      517|     5|
|   1773|     7435|     5|
|   1773|      278|     4|
|   2046|     3431|     5|
|   2046|    13307|     5|
|   2312|      780|     5|
|   2312|    51964|     5|
|   2312|     1232|     4|
|   2312|     4397|     5|
|   2625|      471|     3|
|   2312|      164|     5|
|   2999|     3567|     5|
|   2178|     3704|     3|
|   2178|     4366|     5|
|   3794|     7508|     4|
|   3794|      191|     5|
|   3794|     3525|     5|
|   2312|     3651|     5|
|   2695|      350|     1|
+-------+---------+------+
only showing top 20 rows



In [0]:
df=df.filter(~(df['rating']==0))
df.show()

+-------+---------+------+
|user_id|recipe_id|rating|
+-------+---------+------+
|   2046|     4684|     5|
|   2046|      517|     5|
|   1773|     7435|     5|
|   1773|      278|     4|
|   2046|     3431|     5|
|   2046|    13307|     5|
|   2312|      780|     5|
|   2312|    51964|     5|
|   2312|     1232|     4|
|   2312|     4397|     5|
|   2625|      471|     3|
|   2312|      164|     5|
|   2999|     3567|     5|
|   2178|     3704|     3|
|   2178|     4366|     5|
|   3794|     7508|     4|
|   3794|      191|     5|
|   3794|     3525|     5|
|   2312|     3651|     5|
|   2695|      350|     1|
+-------+---------+------+
only showing top 20 rows



In [0]:
df.groupBy("rating").count().show(100)
df.registerTempTable("df");
spark.sql("select count(*) , rating from df group by rating order by rating").show(100)

+------+------+
|rating| count|
+------+------+
|     1|  3341|
|     3| 25781|
|     5|518568|
|     4|127402|
|     2|  6852|
+------+------+

+--------+------+
|count(1)|rating|
+--------+------+
|    3341|     1|
|    6852|     2|
|   25781|     3|
|  127402|     4|
|  518568|     5|
+--------+------+



In [0]:
rddTraining, rddValidating, rddTesting = df.randomSplit([6.0,2.0,2.0], seed=1001)

In [0]:
df.take(10)

Out[52]: [Row(user_id=2046, recipe_id=4684, rating=5),
 Row(user_id=2046, recipe_id=517, rating=5),
 Row(user_id=1773, recipe_id=7435, rating=5),
 Row(user_id=1773, recipe_id=278, rating=4),
 Row(user_id=2046, recipe_id=3431, rating=5),
 Row(user_id=2046, recipe_id=13307, rating=5),
 Row(user_id=2312, recipe_id=780, rating=5),
 Row(user_id=2312, recipe_id=51964, rating=5),
 Row(user_id=2312, recipe_id=1232, rating=4),
 Row(user_id=2312, recipe_id=4397, rating=5)]

In [0]:


# reference: https://github.com/Akxay/recommendation_engine/blob/master/Jobs_RE_spark.ipynb
#Using function to compare the best possible parameters
def howFarAreWe(model, against, sizeAgainst):
    againstNoRatings = against.rdd.map(lambda x: (int(x[0]), int(x[1])) )
    againstWiRatings = against.rdd.map(lambda x: ((int(x[0]),int(x[1])), int(x[2])) )
    predictions = model.predictAll(againstNoRatings).map(lambda p: ( (p[0],p[1]), p[2]) )
    predictionsAndRatings = predictions.join(againstWiRatings).values()    
    return sqrt(predictionsAndRatings.map(lambda s: (s[0] - s[1]) ** 2).reduce(add) / float(sizeAgainst))

<h3> 2. Finding the best parameters </h3>

In [0]:
#finding best set of parameters
ranks  = [10,20]
iters  = [10,20]
alpha = [0.02,0.03]

finalModel = None
finalRank  = 0
finalIter  = -1
finalDist   = float(300)
finalAlpha = float(0)

#[START train_model]
for cRank, cIter, cAlpha in itertools.product(ranks, iters, alpha):
    model = ALS.trainImplicit(rddTraining, cRank, cIter, alpha=float(cAlpha))
    dist = howFarAreWe(model, rddValidating, rddValidating.count())
    if dist < finalDist:
        print(cIter, cRank,cAlpha)
        print("Best so far:%f" % dist)
        finalModel = model
        finalRank  = cRank
        finalIter  = cIter
        finalDist  = dist
        finalAlpha  = cAlpha 
#[END train_model]

print("Rank %i" % finalRank)  
print("Iter %i" % finalIter)  
print("Dist %f" % finalDist) 
print("Alpha %f" % finalAlpha)

10 10 0.02
Best so far:4.340819
10 10 0.03
Best so far:4.340572
20 10 0.03
Best so far:4.340524
10 20 0.03
Best so far:4.340310
Rank 20
Iter 10
Dist 4.340310
Alpha 0.030000


<h3> Training the model </h3>

In [0]:


rank = 20   
# The size of the feature vector used; the minimum value is 10, the minimum value of the feature vector, 
#the better the model produced, but it also costs more calculation cost
numIterations = 10
#Iteration numbers
alpha=0.03
#model = ALS.trainImplicit(rddTraining, 10, 10,alpha=0.01) #18.99407447555247
#model = ALS.trainImplicit(rddTraining, 10, 10,alpha=0.5) #18.88809845032351
#model = ALS.train(rddTraining, 10, 10) #29.13044908080498
model = ALS.trainImplicit(rddTraining, rank, numIterations, alpha) #18.96886550953952




<h3> Model Evaluation </h3>

In [0]:

rddTesting_withoutrating = rddTesting.rdd. map(lambda r: ((r[0], r[1])))
predictions = model.predictAll(rddTesting_withoutrating).map(lambda r: ((r[0], r[1]), (r[2])))
predictions.take(3)

Out[56]: [((503238, 408472), 7.603174417487346e-05),
 ((37449, 296296), 0.000269961991145495),
 ((27643, 15392), 0.00017068636375966264)]

In [0]:
rates_and_preds = rddTesting.rdd.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions) 

In [0]:


error = math.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean()) 
print ('For testing data the RMSE is %s' % (error))



For testing data the RMSE is 4.738276611676369
