

## MOVIE RECCOMENDER SYSTEM



In [0]:
#We are building a movie reccomendation system for the MovieLens dataset. 
#First, install spark and other necessary packages

In [0]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

In [0]:
!wget -q http://apache.mirrors.pair.com/spark/spark-2.4.4/spark-2.4.4-bin-hadoop2.7.tgz

In [52]:
!ls

sample_data		   spark-2.4.4-bin-hadoop2.7.tgz
smallMovies.csv		   spark-2.4.4-bin-hadoop2.7.tgz.1
smallRatings.csv	   spark-2.4.4-bin-hadoop2.7.tgz.2
spark-2.4.4-bin-hadoop2.7


In [53]:
!tar -xvf spark-2.4.4-bin-hadoop2.7.tgz

spark-2.4.4-bin-hadoop2.7/
spark-2.4.4-bin-hadoop2.7/R/
spark-2.4.4-bin-hadoop2.7/R/lib/
spark-2.4.4-bin-hadoop2.7/R/lib/sparkr.zip
spark-2.4.4-bin-hadoop2.7/R/lib/SparkR/
spark-2.4.4-bin-hadoop2.7/R/lib/SparkR/INDEX
spark-2.4.4-bin-hadoop2.7/R/lib/SparkR/html/
spark-2.4.4-bin-hadoop2.7/R/lib/SparkR/html/R.css
spark-2.4.4-bin-hadoop2.7/R/lib/SparkR/html/00Index.html
spark-2.4.4-bin-hadoop2.7/R/lib/SparkR/help/
spark-2.4.4-bin-hadoop2.7/R/lib/SparkR/help/aliases.rds
spark-2.4.4-bin-hadoop2.7/R/lib/SparkR/help/AnIndex
spark-2.4.4-bin-hadoop2.7/R/lib/SparkR/help/SparkR.rdx
spark-2.4.4-bin-hadoop2.7/R/lib/SparkR/help/SparkR.rdb
spark-2.4.4-bin-hadoop2.7/R/lib/SparkR/help/paths.rds
spark-2.4.4-bin-hadoop2.7/R/lib/SparkR/worker/
spark-2.4.4-bin-hadoop2.7/R/lib/SparkR/worker/worker.R
spark-2.4.4-bin-hadoop2.7/R/lib/SparkR/worker/daemon.R
spark-2.4.4-bin-hadoop2.7/R/lib/SparkR/tests/
spark-2.4.4-bin-hadoop2.7/R/lib/SparkR/tests/testthat/
spark-2.4.4-bin-hadoop2.7/R/lib/SparkR/tests/testthat/te

In [54]:
!ls 

sample_data		   spark-2.4.4-bin-hadoop2.7.tgz
smallMovies.csv		   spark-2.4.4-bin-hadoop2.7.tgz.1
smallRatings.csv	   spark-2.4.4-bin-hadoop2.7.tgz.2
spark-2.4.4-bin-hadoop2.7


In [0]:
!pip install -q findspark

In [0]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.4-bin-hadoop2.7"

In [0]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

In [58]:
df = spark.createDataFrame([{"Google": "Colab","Spark": "Scala"} ,{"Google": "Dataproc","Spark":"Python"}])
df.show()



+--------+------+
|  Google| Spark|
+--------+------+
|   Colab| Scala|
|Dataproc|Python|
+--------+------+



**IMPORTING SPARK CONTEXT AND NECESSARY PACKAGES**

In [0]:
from pyspark import SparkConf
from pyspark import SparkContext
from pyspark import HiveContext
sc = SparkContext.getOrCreate(SparkConf().setMaster("local[*]"))

In [0]:
from pyspark.mllib.recommendation import ALS
import math
import pandas as pd
from time import time
from pyspark.mllib.recommendation import MatrixFactorizationModel

# **LOADING THE DATA**





In [0]:
#We will be using Alternating Least Squares algorithm for collaborative filtering, therefore, first we need to learn the parameters of the model.
#The files have been uploaded directly to this Google Colab notebook rather than Google Drive for ease of use.
#For this, we first load the small data set.

smallRatings_raw = sc.textFile('smallRatings.csv')

#Filter the header out
smallRatings_raw_header = smallRatings_raw.take(1)[0]                 

In [0]:
#Now we convert this data into a RDD using a mapper function which we learned in class

smallRatings_data = smallRatings_raw.filter(lambda line: line!=smallRatings_raw_header)\
    .map(lambda line: line.split(",")).map(lambda tokens: (tokens[0],tokens[1],tokens[2])).cache()

In [63]:
#Let's look at how this data looks. 
#Print the first 10 lines of this small ratings data
smallRatings_data.take(10)

[('1', '1', '4.0'),
 ('1', '3', '4.0'),
 ('1', '6', '4.0'),
 ('1', '47', '5.0'),
 ('1', '50', '5.0'),
 ('1', '70', '3.0'),
 ('1', '101', '5.0'),
 ('1', '110', '4.0'),
 ('1', '151', '5.0'),
 ('1', '157', '5.0')]

In [0]:
#Now we can perform the same steps with the movie dataset
smallMovies_raw = sc.textFile('smallMovies.csv')
#Filter the header out
smallMovies_raw_header = smallMovies_raw.take(1)[0]

In [65]:
#Now we convert this data into a RDD using a mapper function which we learned in class
smallMovies_data = smallMovies_raw.filter(lambda line: line!=smallMovies_raw_header)\
    .map(lambda line: line.split(",")).map(lambda tokens: (tokens[0],tokens[1])).cache()
 #Viewing the first 10 lines of this small movies data set   
smallMovies_data.take(10)

[('1', 'Toy Story (1995)'),
 ('2', 'Jumanji (1995)'),
 ('3', 'Grumpier Old Men (1995)'),
 ('4', 'Waiting to Exhale (1995)'),
 ('5', 'Father of the Bride Part II (1995)'),
 ('6', 'Heat (1995)'),
 ('7', 'Sabrina (1995)'),
 ('8', 'Tom and Huck (1995)'),
 ('9', 'Sudden Death (1995)'),
 ('10', 'GoldenEye (1995)')]

## **SPLITTING DATA INTO TRAIN , VALIDATION AND TEST**

In [0]:

#We split the data into 70% training data, 15% validation data, 15% test data
trainRDD_small, valRDD_small, testRDD_small = smallRatings_data.randomSplit([7, 1.5, 1.5], seed=123)
validatePredict_small = valRDD_small.map(lambda x: (x[0], x[1]))
testPredict_small = testRDD_small.map(lambda x: (x[0], x[1]))

# **TRAINING THE MODEL**

**LEARN THE BEST ALS PARAMETERS**

In [0]:
seed = 123
iterations = 13
regularization_parameter = 0.025
ranks = [6, 8, 10]
errors = [0, 0, 0]
err = 0
tolerance = 0.01


minError = float('inf')
bestRank = -1
bestIteration = -1


**Collaborative Filtering**

In [68]:
#Here, we perform the iterative Alternative Least Square Procedure to predict the ratings of the movies
#We find the Root Mean Squared Error of the Model


for rank in ranks:
    model1 = ALS.train(trainRDD_small, rank, seed=seed, iterations=iterations,
                      lambda_=regularization_parameter)
    preds = model1.predictAll(validatePredict_small).map(lambda r: ((r[0], r[1]), r[2]))    #predicting the ratings
    ratings_predictions = valRDD_small.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(preds)  #joining the actual ratings and predicted in the validation set
    error = math.sqrt(ratings_predictions.map(lambda r: (r[1][0] - r[1][1])**2).mean()) #Obtaining RMSE
    errors[err] = error
    err += 1
    print('For rank %s the RMSE is %s' % (rank, error))
    if error < minError:
        minError = error
        bestRank = rank

print('The best model was trained with rank %s' % bestRank)

For rank 6 the RMSE is 1.0111251560019323
For rank 8 the RMSE is 1.044171049363525
For rank 10 the RMSE is 1.060494299914958
The best model was trained with rank 6


In [69]:
#The ratings dataset had the columns: UserID | MovieID | Ratings 
#In the above loop, the preds variable holds the predictions for these 'Ratings' in the dataset
#Let us look at first 5 predictions for these ratings predicted by the ALS model

preds.take(5)   #Note that the ratings of the movie is out of 5

[((315, 1084), 2.9636507944147628),
 ((452, 3702), 4.058619884396285),
 ((18, 3702), 3.4063590681850293),
 ((290, 3702), 3.3933133154139217),
 ((380, 3702), 4.369506612892415)]

In [70]:
#In the above loop, we also joined the ratings and the predictions in our Validation dataset
#Let us look at this together.

ratings_predictions.take(5)

[((1, 733), (4.0, 4.5331440557202285)),
 ((1, 1377), (3.0, 3.865324057067811)),
 ((1, 2529), (5.0, 4.450967640137541)),
 ((1, 2641), (5.0, 4.053473053499157)),
 ((1, 2985), (4.0, 3.99912376414666))]

## **TESTING OUR MODEL**

In [71]:
#Here we will test the model that we selected based on lowest RMSE
model1_test = ALS.train(trainRDD_small, rank, seed=seed, iterations=iterations, 
                        lambda_=regularization_parameter)
preds_test = model1_test.predictAll(testPredict_small).map(lambda r: ((r[0], r[1]), r[2]))
rates_predictions_test = testRDD_small.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(preds_test)
error = math.sqrt(rates_predictions_test.map(lambda r: (r[1][0] - r[1][1])**2).mean())

print('For the Test RDD the RMSE is %s' % (error))


For the Test RDD the RMSE is 1.0630334164757704


In [0]:
#The performance is not that great but it is not bad either, so we carry on to the next step.

## **BUILDING RECOMMENDER SYSTEM MODEL**

In [0]:
#We have trained and chosen the best model with the partial dataset
#Now to build our final recommender system model, we will work with the complete dataset.



**Load the complete dataset**

In [0]:
#Load the complete movies dataset
completeMovies_raw = sc.textFile('fullMovies.csv')
#Remove header of this data
completeMovies_raw_header = completeMovies_raw.take(1)[0]


In [0]:
#Convert the data into RDD
completeMovies_data = completeMovies_raw.filter(lambda line: line!=completeMovies_raw_header)\
    .map(lambda line: line.split(",")).map(lambda tokens: (tokens[0],tokens[1])).cache()


In [80]:
#We will not use this RDD unitl later but lets look at the first 15 lines
completeMovies_data.take(15)

[('1', 'Toy Story (1995)'),
 ('2', 'Jumanji (1995)'),
 ('3', 'Grumpier Old Men (1995)'),
 ('4', 'Waiting to Exhale (1995)'),
 ('5', 'Father of the Bride Part II (1995)'),
 ('6', 'Heat (1995)'),
 ('7', 'Sabrina (1995)'),
 ('8', 'Tom and Huck (1995)'),
 ('9', 'Sudden Death (1995)'),
 ('10', 'GoldenEye (1995)'),
 ('11', '"American President'),
 ('12', 'Dracula: Dead and Loving It (1995)'),
 ('13', 'Balto (1995)'),
 ('14', 'Nixon (1995)'),
 ('15', 'Cutthroat Island (1995)')]

In [117]:
completeMovies_titles = completeMovies_data.map(lambda x: (int(x[0]),x[1]))
completeMovies_titles.take(2)


[(1, 'Toy Story (1995)'), (2, 'Jumanji (1995)')]

In [103]:
#Getting a count of the number of movies in this dataset.
print('The number of ratings observations in the complete dataset is: %s'%(completeMovies_titles.count()))

The number of ratings observations in the complete dataset is: 58098


In [0]:
#load the complete ratings dataset which is used for building the model
completeRatings_raw =sc.textFile('fullRatings.csv')

In [0]:
#Remove the header in this data
completeRatings_raw_header = completeRatings_raw.take(1)[0]

In [0]:
#Convert into RDD

completeRatings_data = completeRatings_raw.filter(lambda line: line!=completeRatings_raw_header)\
    .map(lambda line: line.split(",")).map(lambda tokens: (tokens[0],tokens[1],tokens[2])).cache()


In [84]:
#Lets look at first 15 lines of this data
completeRatings_data.take(15)

[('1', '307', '3.5'),
 ('1', '481', '3.5'),
 ('1', '1091', '1.5'),
 ('1', '1257', '4.5'),
 ('1', '1449', '4.5'),
 ('1', '1590', '2.5'),
 ('1', '1591', '1.5'),
 ('1', '2134', '4.5'),
 ('1', '2478', '4.0'),
 ('1', '2840', '3.0'),
 ('1', '2986', '2.5'),
 ('1', '3020', '4.0'),
 ('1', '3424', '4.5'),
 ('1', '3698', '3.5'),
 ('1', '3826', '2.0')]

In [85]:
#This data is huge so let us get the count of ratings dataset just for some information
print('The number of ratings observations in the complete dataset is: %s'%(completeRatings_data.count()))

The number of ratings observations in the complete dataset is: 27753444


 **TRAIN THE RECOMMENDER SYSTEM MODEL**

In [0]:
#First we split the data in train set and test set 
#Splot: Training 75% and Test 25%

train_complete , test_complete = completeRatings_data.randomSplit([7.5,2.5], seed=1)

#ALS model train
model2 = ALS.train(train_complete, bestRank, seed=seed, iterations=iterations, 
                   lambda_=regularization_parameter)


**TEST THE MODEL**

In [88]:
testPredict_complete = test_complete.map(lambda x: (x[0], x[1]))
preds2 = model2.predictAll(testPredict_complete).map(lambda r: ((r[0], r[1]), r[2]))   #predicting the ratings
rates_predictions_test2 = test_complete.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(preds2) #combining ratings and predicted ratings
error2 = math.sqrt(rates_predictions_test2.map(lambda r: (r[1][0] - r[1][1])**2).mean())  #Calculating RMSE

print ('Test Data RMSE = %s' % (error2))

Test Data RMSE = 0.8243178639739481


In [0]:
#Also consider the case when there is a new user
#We will first assign some general rating for this new user on our own

newUser_ID = 0
#The ratings dataset is of the form UserID | MovieID | Rating
#Randomly assign new ratings for movies
newUser_ratings = [(0,3,3), (0,1,5), (0,6,4), (0,47,3), (0,50,4), (0,70,5), (0,101,4), (0,110,5), (0,151,2) , (0,260,5) ]

newUser_ratings_RDD = sc.parallelize(newUser_ratings)

In [143]:
#Printing the first 10 lines of this RDD

newUser_ratings_RDD.take(10)

[(0, 3, 3),
 (0, 1, 5),
 (0, 6, 4),
 (0, 47, 3),
 (0, 50, 4),
 (0, 70, 5),
 (0, 101, 4),
 (0, 110, 5),
 (0, 151, 2),
 (0, 260, 5)]

In [0]:
CompleteData = completeRatings_data.union(newUser_ratings_RDD) #Adding new user ratings to originial data


In [0]:
#train the model for every new user
model3 = ALS.train(CompleteData, bestRank, seed = seed, iterations  = iterations, 
                   lambda_ = regularization_parameter)

## **GENERATE THE TOP RECOMMENDATION FOR USER**

In [0]:
newUser_ratingsID = map(lambda x: x[1], newUser_ratings )

#Now looking at the movies the new user has not rated
newUser_unratedMovies = (completeMovies_data.filter(lambda x: x[0] not in newUser_ratingsID ).map(lambda x: (newUser_ID, x[0])))


In [169]:
#Now let us focus on predicting these ratings for unrated movies
newUserRecommendations = model3.predictAll(newUser_unratedMovies)

newUserRecommendations.take(2)

[Rating(user=0, product=116688, rating=1.152886852958059),
 Rating(user=0, product=32196, rating=3.8005802511204148)]

In [0]:
#As we can see the format of the output isnt quite clear and right
#We can transform this into pairs of (MovieId , PredictedRating) instead of product and rating
newUser_recommendationsRating = newUserRecommendations.map(lambda x: (x.product, x.rating))


In [0]:
newUser_recommendationsTating_titleCount= newUser_recommendationsRating.join(completeMovies_titles)


In [172]:
#Generating top 10 recommended movies for the user
newUser_recommendationsTating_titleCount.take(10)

[(138348, (1.724403424193519, 'Niente paura (2010)')),
 (158088, (3.577841262469132, 'The Circle (1985)')),
 (82068, (1.573676378940314, 'Police (1985)')),
 (160104, (0.5469756473073182, 'Ribbit (2014)')),
 (1596, (2.1183117658983366, 'Career Girls (1997)')),
 (127176, (1.2999547807012686, 'The First Movie (2009)')),
 (145992, (1.9330663788785003, 'The House That Swift Built (1982)')),
 (1344, (3.7975602402937167, 'Cape Fear (1962)')),
 (7560, (3.608453253838011, 'Fail-Safe (1964)')),
 (50064, (2.1945220324476686, '"Good German'))]

In [173]:
#Extending this to generate individual ratings for one particular movie
#Say we want to get the prediction of MovieID 10

#We can do this now

movie = sc.parallelize([(0,10)])
movie_rating = model3.predictAll(newUser_unratedMovies)
movie_rating.take(1)

[Rating(user=0, product=116688, rating=1.152886852958059)]

In [0]:
#That movie rating has been predicted to have a rating of 1.152