In [2]:
#Importing pyspark through findspark
import findspark
findspark.init()
findspark.find()
import pyspark
findspark.find()

'C:\\Spark\\spark-3.0.0-bin-hadoop2.7'

In [3]:
#Loading the required libraries
import numpy as np
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark.ml.recommendation import ALS, ALSModel
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.functions import udf,col,when
from IPython.display import Image
from IPython.display import display
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

In [4]:
#Initializing SPark Session
spark = SparkSession.builder.appName('MovieRecommender').getOrCreate()

In [5]:
sc = spark.sparkContext
sqlContext = SQLContext(sc)

In [6]:
#Loading the data containing moview ratings into ratings dataframe
ratingsDf = spark.read.csv('ratings.csv', inferSchema=True, header=True)
ratingsDf.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)
 |-- timestamp: integer (nullable = true)



In [7]:
#To use select querries
ratingsDf.registerTempTable("RatingsTable")
spark.sql("SELECT * FROM RatingsTable").show(5)

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|      1|   4.0|964982703|
|     1|      3|   4.0|964981247|
|     1|      6|   4.0|964982224|
|     1|     47|   5.0|964983815|
|     1|     50|   5.0|964982931|
+------+-------+------+---------+
only showing top 5 rows



In [8]:
#Moading the movies data into the movies dataframe
moviesDf = spark.read.csv('movies.csv', inferSchema=True, header=True)
moviesDf.printSchema()

root
 |-- movieId: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- genres: string (nullable = true)



In [9]:
moviesDf.registerTempTable("MoviesTable")
spark.sql("select * from MoviesTable WHERE movieId IN (33649, 4429, 3379, 102217, 92494)").show(5, False)

+-------+------------------------------+--------------------+
|movieId|title                         |genres              |
+-------+------------------------------+--------------------+
|3379   |On the Beach (1959)           |Drama               |
|4429   |Moby Dick (1956)              |Drama               |
|33649  |Saving Face (2004)            |Comedy|Drama|Romance|
|92494  |Dylan Moran: Monster (2004)   |Comedy|Documentary  |
|102217 |Bill Hicks: Revelations (1993)|Comedy              |
+-------+------------------------------+--------------------+



In [10]:
#Loading the data linking the movie ids to the imbdId and tmdbId into links dataframe
linksDf = spark.read.csv("links.csv", inferSchema=True, header=True)
linksDf.printSchema()

root
 |-- movieId: integer (nullable = true)
 |-- imdbId: integer (nullable = true)
 |-- tmdbId: integer (nullable = true)



In [11]:
linksDf.registerTempTable("LinksTable")
spark.sql("SELECT * FROM LinksTable").show(5)

+-------+------+------+
|movieId|imdbId|tmdbId|
+-------+------+------+
|      1|114709|   862|
|      2|113497|  8844|
|      3|113228| 15602|
|      4|114885| 31357|
|      5|113041| 11862|
+-------+------+------+
only showing top 5 rows



In [12]:
trainDf, validationDF = ratingsDf.randomSplit([0.8,0.2])

**ALS Algorithm:**
Here we are using the alternate least square algorithm which is a matrix factorization algorithm. Here in the original ratings matrix, the userID is taken on the Y-axis and the MovieID is taken on the X-axis and the corresponding ratings are populated in the matrix. In the ALS algorithm, a ratings matrix of this dimensions is randomly initialized and broken down into two smaller matrices namely the user matrix and the movie matrix, based on the parameter "rank". Further, based on the L2 regularization parameter called "regParam", the algorithm alternatively adjusts the values in the user matrix and the movie matrix using gradient descent on each iteration defined by the parameter "maxIter" in order to reduce the error between the expected values and the predicted values of the ratings. 

In [13]:
#Defining a parameter grid to train the model in order to pick the best parameters reducing the RMSE. Lesser the RMSE, better the model. 
errors = []
err = 0
als = ALS(maxIter=10,userCol="userId", itemCol="movieId", ratingCol="rating")
paramGrid = ParamGridBuilder() \
    .addGrid(als.regParam, [0.1, 0.01, 0.18]) \
    .addGrid(als.rank, range(4,10)) \
    .build()
        
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")
crossVal = CrossValidator(estimator=als,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=5)

cvModel = crossVal.fit(trainDf)

In [15]:
bestModel = cvModel.bestModel
predictions = bestModel.transform(validationDF)
filteredPredictions = predictions.filter(col('prediction') != np.nan)
rmse = evaluator.evaluate(filteredPredictions)
print("Root mean Squared Error is", str(rmse))

Root mean Squared Error is 0.8819449456636101


In [16]:
filteredPredictions.show(10)

+------+-------+------+----------+----------+
|userId|movieId|rating| timestamp|prediction|
+------+-------+------+----------+----------+
|   603|    471|   4.0| 954482443| 3.1469011|
|   500|    471|   1.0|1005528017| 3.4345999|
|    57|    471|   3.0| 969753604| 3.7028093|
|   217|    471|   2.0| 955943727| 3.3162818|
|   448|    471|   4.0|1178980875| 3.5546632|
|   216|    471|   3.0| 975212641| 2.9665217|
|    32|    471|   3.0| 856737165|  4.231817|
|   469|    471|   5.0| 965425364| 3.6266844|
|   260|    471|   4.5|1109409455|   3.66671|
|   492|    833|   4.0| 863976674| 1.5273117|
+------+-------+------+----------+----------+
only showing top 10 rows



In [17]:
filteredPredictions.registerTempTable("PredictionsTable")

In [29]:
#Generating top 5 recommendation for a user with user ID 498
spark.sql("SELECT userID, p.movieId, rating, title, genres, tmdbId, prediction FROM ((PredictionsTable p INNER JOIN MoviesTable m on p.movieId == m.movieId) INNER JOIN LinksTable l on p.movieId == l.movieId) WHERE userID = 498").show(5, False)

+------+-------+------+-----------------------------+----------------------------------------+------+----------+
|userID|movieId|rating|title                        |genres                                  |tmdbId|prediction|
+------+-------+------+-----------------------------+----------------------------------------+------+----------+
|498   |296    |4.0   |Pulp Fiction (1994)          |Comedy|Crime|Drama|Thriller             |680   |4.043855  |
|498   |161    |5.0   |Crimson Tide (1995)          |Drama|Thriller|War                      |8963  |4.3655796 |
|498   |380    |3.0   |True Lies (1994)             |Action|Adventure|Comedy|Romance|Thriller|36955 |3.9908948 |
|498   |434    |3.0   |Cliffhanger (1993)           |Action|Adventure|Thriller               |9350  |3.8969991 |
|498   |585    |4.0   |Brady Bunch Movie, The (1995)|Comedy                                  |9066  |3.1676679 |
+------+-------+------+-----------------------------+----------------------------------------+--

We can observe that the the user with ID 498 tends to like Action/Thriller movies and the top 5 recommended movies for him/her are Pulp Fiction, Crimson Tide, True Lies, CLiffhanger and THe Brady Bunch Movie.

In [19]:
#Generating top 5 recommendations for each user.
userRecommendations = bestModel.recommendForAllUsers(5)
userRecommendations.registerTempTable("RecommendationTable")
spark.sql("SELECT userID, recommendations.movieID FROM RecommendationTable").show(10, False)

+------+-----------------------------------+
|userID|movieID                            |
+------+-----------------------------------+
|471   |[8477, 6818, 3379, 102217, 92494]  |
|463   |[5075, 33649, 84847, 3379, 104875] |
|496   |[3379, 25947, 8477, 2070, 26326]   |
|148   |[104875, 33649, 67618, 5075, 84273]|
|540   |[3379, 136469, 59018, 33649, 32892]|
|392   |[59018, 3379, 141718, 32892, 69524]|
|243   |[5075, 84847, 72171, 59018, 131724]|
|31    |[104875, 33649, 3379, 8477, 92494] |
|516   |[51931, 6201, 4495, 25906, 77846]  |
|580   |[5075, 32892, 59018, 141718, 1696] |
+------+-----------------------------------+
only showing top 10 rows



In [21]:
#Ratings predictions of a specific movie for specific users
movieIDs = [1580, 2366, 1590, 3175]
userIDs = [543, 543, 543, 543]
newUserPreds = sqlContext.createDataFrame(zip(movieIDs,userIDs), schema=['movieId','userId'])
newPredictions = bestModel.transform(newUserPreds)
newPredictions.show()

+-------+------+----------+
|movieId|userId|prediction|
+-------+------+----------+
|   1580|   543|  4.545098|
|   2366|   543| 3.5906284|
|   3175|   543| 4.3188796|
|   1590|   543|  2.389799|
+-------+------+----------+

