## ** Building a large scale Movie Recommendation Engine using pyspark **

In [81]:
import os
import math
import datetime
import pyspark.sql.functions as sf
from pyspark.sql.functions import desc
from pyspark.sql.window import Window
from pyspark import SparkConf, SparkContext
import itertools
from math import sqrt
from operator import add
from os.path import join, isfile, dirname
from pyspark import SparkContext, SparkConf, SQLContext
from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating
from pyspark.sql.types import TimestampType

In [2]:
# conf = SparkConf().setAppName("MovieRecommendationEngine")
# sc = SparkContext(conf=conf)
sc

<pyspark.context.SparkContext at 0x7f9621678550>

In [3]:
# load Ratings file into df
datasets_path=os.getcwd() + "/RE_data"
ratings_file = os.path.join(datasets_path, 'ratings.csv')
ratings_raw_data = sc.textFile("file:///" + ratings_file)
ratings_raw_data_header = ratings_raw_data.take(1)[0]
ratings_data = ratings_raw_data.filter(lambda line: line != ratings_raw_data_header)\
    .map(lambda line: line.split(",")).map(lambda tokens: (int(tokens[0]),int(tokens[1]),int(float(tokens[2])))).cache()

print "train size is ", ratings_data.count()

train size is  100004


In [5]:
# load courses file into df
movies_file = os.path.join(datasets_path, 'movies.csv')
movies_raw_data = sc.textFile("file:///" + movies_file)
movies_raw_data_header = movies_raw_data.take(1)[0]

movies_data = movies_raw_data.filter(lambda line: line!=movies_raw_data_header)\
    .map(lambda line: line.split(",")).map(lambda tokens: (int(tokens[0]),tokens[1])).cache()
movies_data.take(3)

[(1, u'Toy Story (1995)'),
 (2, u'Jumanji (1995)'),
 (3, u'Grumpier Old Men (1995)')]

### Split data into train, validation and test datasets

In [15]:
rddTraining, rddValidating, rddTesting = ratings_data.randomSplit([6,2,2], seed=1001)

#Add user ratings in the training model
nbValidating = rddValidating.count()
nbTesting    = rddTesting.count()

print("Training: %d, validation: %d, test: %d" % (rddTraining.count(), nbValidating, rddTesting.count()))

Training: 59824, validation: 20190, test: 19990


### Model Training

In [16]:
#[START how_far]
def howFarAreWe(model, against, sizeAgainst):
  # Ignore the rating column  
    againstNoRatings = against.map(lambda x: (int(x[0]), int(x[1])) )

  # Keep the rating to compare against
    againstWiRatings = against.map(lambda x: ((int(x[0]),int(x[1])), int(x[2])) )

  # Make a prediction and map it for later comparison
  # The map has to be ((user,product), rating) not ((product,user), rating)
    predictions = model.predictAll(againstNoRatings).map(lambda p: ( (p[0],p[1]), p[2]) )

  # Returns the pairs (prediction, rating)
    predictionsAndRatings = predictions.join(againstWiRatings).values()    
  # Returns the variance
    return sqrt(predictionsAndRatings.map(lambda s: (s[0] - s[1]) ** 2).reduce(add) / float(sizeAgainst))
#[END how_far]

In [17]:
# Best results are not commented
ranks  = [5,10,15,20]
reguls = [0.1, 1,10]
iters  = [5,10,20]

finalModel = None
finalRank  = 0
finalRegul = float(0)
finalIter  = -1
finalDist   = float(100)

#[START train_model]
for cRank, cRegul, cIter in itertools.product(ranks, reguls, iters):
    model = ALS.train(rddTraining, cRank, cIter, float(cRegul))
    dist = howFarAreWe(model, rddValidating, nbValidating)
    if dist < finalDist:
        print("Best so far:%f" % dist)
        finalModel = model
        finalRank  = cRank
        finalRegul = cRegul
        finalIter  = cIter
        finalDist  = dist
        break
#[END train_model]

print("Rank %i" % finalRank) 
print("Regul %f" % finalRegul) 
print("Iter %i" % finalIter)  
print("Dist %f" % finalDist) 

"""Rank 5
Regul 0.100000
Iter 20
Dist 0.959326 """

Best so far:0.956083
Rank 5
Regul 0.100000
Iter 5
Dist 0.956083


'Rank 5\nRegul 0.100000\nIter 20\nDist 0.959326 '

In [18]:
# Calculate all predictions
rddTesting_withoutRating = rddTesting.map(lambda r: ((r[0], r[1])))
predictions = model.predictAll(rddTesting_withoutRating).map(lambda r: ((r[0], r[1]), (r[2])))
predictions.take(3)
# user id, node_id, actual ratings,pred ratings -> df below
rates_and_preds = rddTesting.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions) 
rates_and_preds.take(3)
error = math.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())
    
print 'For testing data the RMSE is %s' % (error)

For testing data the RMSE is 0.990409526588


In [19]:
from pyspark.sql import SparkSession
x = rates_and_preds.map(lambda x : (x[0][0],x[0][1],x[1][0],x[1][1]))
hasattr(x, "toDF")
x.toDF().show(4)

+---+-----+---+------------------+
| _1|   _2| _3|                _4|
+---+-----+---+------------------+
|652| 3466|4.0|3.0382546769711913|
|563|44191|4.0|3.9195343187988336|
|239| 2805|2.0|3.8174192794860202|
|468|55830|1.0| 2.272269418311961|
+---+-----+---+------------------+
only showing top 4 rows



### Get total rating and average rating given to each movie by different users

In [31]:
def get_counts_and_averages(ID_and_ratings_tuple):    
    nratings = len(ID_and_ratings_tuple[1]) 
    return ID_and_ratings_tuple[0], (nratings, sum([float(val) for val in ID_and_ratings_tuple[1]])/nratings)

In [63]:
movie_ID_with_ratings_RDD = (ratings_data.map(lambda x: (x[1], x[2])).groupByKey())
movie_ID_with_ratings_RDD_updated = movie_ID_with_ratings_RDD.map(lambda x : (x[0], list(x[1])))
movie_ID_with_avg_ratings_RDD = movie_ID_with_ratings_RDD_updated.map(get_counts_and_averages)  # count and average rating
movie_rating_counts_RDD = movie_ID_with_avg_ratings_RDD.map(lambda x: (int(x[0]), x[1][0]))    # rating count per movie
movie_rating_counts_RDD.cache()
movie_rating_counts_RDD.take(3)

[(2, 107), (97328, 1), (4, 13)]

In [61]:
# get user-wise movies watched
all_users_ratings_RDD = ratings_data.map(lambda x: (x[0], x[1])).groupByKey()
all_users_ratings_RDD = all_users_ratings_RDD.map(lambda x : (x[0], list(x[1])))    # movies watched by each user

### finding unrated movies by each user- we will use this set for model's prediction/recommendations
movie_ids = set(movies_data.map(lambda x : x[0]).toLocalIterator()) # list of all movie ids
unrated_movies_RDD = all_users_df_ratings_RDD.map(lambda x: (x[0], list((movie_ids) - set(x[1]))))

# #create user_id and unrated course id pairs
unrated_usermovies_RDD = unrated_movies_RDD.flatMap(lambda x : [(x[0],i) for i in x[1]])

# # #model predictions for each user and unrated course pairs
print "model is ", model
recommendations_RDD = model.predictAll(unrated_usermovies_RDD)
recommended_movies_rating_RDD = recommendations_RDD.map(lambda x: (x.product,(x.user, x.rating)))
recommended_movies_rating_RDD.cache()
print recommended_movies_rating_RDD.take(10)

model is  <pyspark.mllib.recommendation.MatrixFactorizationModel object at 0x7f95e16625d0>
[(1084, (68, 3.2472970943697335)), (1084, (384, 3.267616431660904)), (1084, (440, 3.6721395703361432)), (1084, (4, 4.7272411413689905)), (1084, (600, 3.487342021216037)), (1084, (324, 3.456354607922227)), (1084, (180, 3.4447698460790717)), (1084, (340, 3.609319500741269)), (1084, (320, 3.3474256182772333)), (1084, (412, 2.812540921278155))]


### Joining movie title and total number of ratings received by each movie for further filtering recommendations

In [66]:
# #     # converting id into int for course_ratings_count RDD to perform join
# movie_rating_counts_RDD_updated = movie_rating_counts_RDD.map(lambda x: (int(x[0]), x[1]))

# join movie name with movie id, predicted rating for movie and total number of ratings received by each movie
recommendations_rating_title_and_count_RDD = recommended_movies_rating_RDD.join(movies_data).join(movie_rating_counts_RDD)
recommendations_rating_title_and_count_RDD = recommendations_rating_title_and_count_RDD.map(lambda r: (r[1][0][1], r[1][0][0][0],round(r[1][0][0][1],2),r[1][1]))
recommendations_rating_title_and_count_RDD = recommendations_rating_title_and_count_RDD.map(lambda x: (x[1],(x[0],x[2],x[3])))
recommendations_rating_title_and_count_RDD.take(3)

[(68, (u'"Great Mouse Detective', 2.9, 4)),
 (384, (u'"Great Mouse Detective', 2.83, 4)),
 (440, (u'"Great Mouse Detective', 3.13, 4))]

## ** Top 5 recommendations **

In [83]:
# filter only those courses which have been rated by atleast 20 users
# take only top5 courses by sorting based on predicted ratings
top_movies = recommendations_rating_title_and_count_RDD.groupBy(lambda x : x[0])\
                               .map(lambda x : list(x[1]))\
                               .map(lambda r: [i for i in r if i[1][2] > 20])\
                               .map(lambda a: [i for i in sorted(a, key=lambda x: -x[1][1])[:5]])   

#preparing dataframe to insert in Database
rec_movies_df = top_movies.map(lambda x: [(i[0],i[1][0],i[1][1],i[1][2]) for i in x]).flatMap(lambda x: x).toDF()\
                                .withColumnRenamed("_1", "user_id")\
                                .withColumnRenamed("_2", 'recommendations')\
                                .withColumnRenamed("_3", 'predicted_ratings')\
                                .withColumnRenamed("_4", "total_ratings")
                
#final recommendation engine dataframe to be saved in Database
final_df_rec_eng = rec_movies_df.withColumn("rec_date", sf.lit(datetime.datetime.now()).cast(TimestampType()))   
final_df_rec_eng = final_df_rec_eng.withColumn("rec_number", sf.row_number().over(Window.partitionBy("user_id").orderBy(desc("predicted_ratings"))))    

final_df_rec_eng.show(10)

+-------+--------------------+-----------------+-------------+--------------------+----------+
|user_id|     recommendations|predicted_ratings|total_ratings|            rec_date|rec_number|
+-------+--------------------+-----------------+-------------+--------------------+----------+
|     26|Midnight in Paris...|             4.46|           21|2018-02-25 15:34:...|         1|
|     26| Moulin Rouge (2001)|             4.09|           55|2018-02-25 15:34:...|         2|
|     26|      "African Queen|             4.08|           50|2018-02-25 15:34:...|         3|
|     26|      Serpico (1973)|             4.06|           27|2018-02-25 15:34:...|         4|
|     26|      Tangled (2010)|              4.0|           21|2018-02-25 15:34:...|         5|
|     29|   "Name of the Rose|             4.35|           23|2018-02-25 15:34:...|         1|
|     29|Midnight in Paris...|             4.31|           21|2018-02-25 15:34:...|         2|
|     29|"Lord of the Ring...|             4.29|  

### Saving the model

In [None]:
# from pyspark.mllib.recommendation import MatrixFactorizationModel

# model_path = os.path.join('..', 'models', 'movie_lens_als')

# # Save and load model
# model.save(sc, model_path)
# same_model = MatrixFactorizationModel.load(sc, model_path)