# Movie Recommendation System on Movie Lens Dataset 



MovieLens data sets were collected by the GroupLens Research Project
at the University of Minnesota.
 
This data set consists of:
	* 100,000 ratings (1-5) from 943 users on 1682 movies. 
	* Each user has rated at least 20 movies. 

The data was collected through the MovieLens web site
(movielens.umn.edu) during the seven-month period from September 19th, 
1997 through April 22nd, 1998. This data has been cleaned up - users
who had less than 20 ratings or did not have complete demographic
information were removed from this data set.


CITATION
==============================================

To acknowledge use of the dataset in publications, please cite the
following paper:

F. Maxwell Harper and Joseph A. Konstan. 2015. The MovieLens Datasets:
History and Context. ACM Transactions on Interactive Intelligent
Systems (TiiS) 5, 4, Article 19 (December 2015), 19 pages.
DOI=http://dx.doi.org/10.1145/2827872

ACKNOWLEDGEMENTS
==============================================

Thanks to Al Borchers for cleaning up this data and writing the
accompanying scripts.

dataset can also be downloaded at : (https://www.kaggle.com/rajmehra03/movielens100k?select=ratings.csv)

In [0]:
#import the required functions and libraries
from pyspark.sql.functions import *
#import String indexer to convert string values to numeric values
from pyspark.ml.feature import StringIndexer,IndexToString
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

## Preparing Dataset

In [0]:
#load the dataset and create sprk dataframe
movies_df = spark.read.csv("dbfs:/FileStore/shared_uploads/movies.csv",inferSchema=True,header=True)

In [0]:
#validate the shape of the data 
print((movies_df.count(),len(movies_df.columns)))

(9125, 3)


In [0]:
#check columns in dataframe
movies_df.printSchema()

root
 |-- movieId: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- genres: string (nullable = true)



In [0]:
#validate few rows of dataframe in random order
movies_df.orderBy(rand()).show(10, False)

+-------+------------------------------------------------+----------------------------+
|movieId|title                                           |genres                      |
+-------+------------------------------------------------+----------------------------+
|3239   |Isn't She Great? (2000)                         |Comedy                      |
|7062   |Birdman of Alcatraz (1962)                      |Drama                       |
|5481   |Austin Powers in Goldmember (2002)              |Comedy                      |
|104129 |Man of Tai Chi (2013)                           |Action|IMAX                 |
|8989   |Damn Yankees! (1958)                            |Comedy|Musical              |
|26007  |Unknown Soldier, The (Tuntematon sotilas) (1955)|Drama|War                   |
|7160   |Monster (2003)                                  |Crime|Drama                 |
|7938   |Winter Light (Nattvardsgästerna) (1963)         |Drama                       |
|92535  |Louis C.K.: Live at the

In [0]:
ratings_df = spark.read.csv("dbfs:/FileStore/shared_uploads/ratings.csv", inferSchema = True, header = True)

In [0]:
#validate the shape of the data 
print((ratings_df.count(),len(ratings_df.columns)))

(100004, 4)


In [0]:
#check columns in dataframe
ratings_df.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)
 |-- timestamp: integer (nullable = true)



In [0]:
#validate few rows of dataframe in random order
ratings_df.orderBy(rand()).show(10, False)

+------+-------+------+----------+
|userId|movieId|rating|timestamp |
+------+-------+------+----------+
|282   |3578   |4.0   |1111494384|
|230   |48774  |3.0   |1424846995|
|22    |5400   |2.5   |1131664567|
|181   |7153   |5.0   |1154542709|
|262   |2901   |3.0   |1433901228|
|56    |33794  |4.0   |1470350738|
|250   |1517   |4.0   |1469807395|
|434   |457    |5.0   |886374515 |
|506   |141    |3.0   |865395298 |
|15    |55247  |1.0   |1416120041|
+------+-------+------+----------+
only showing top 10 rows



### Join movies_df and ratings_df into df

In [0]:
# Create a movies temp from movies_df
movies_df.createOrReplaceTempView("movies") 

In [0]:
# Create a ratings temp from ratings_df
ratings_df.createOrReplaceTempView("ratings")

In [0]:
df = spark.sql("select ratings.userId, movies.title, ratings.rating from ratings inner join movies on ratings.movieId == movies.movieId")


In [0]:
#validate few rows of dataframe
df.show(10)

+------+--------------------+------+
|userId|               title|rating|
+------+--------------------+------+
|     1|Dangerous Minds (...|   2.5|
|     1|        Dumbo (1941)|   3.0|
|     1|     Sleepers (1996)|   3.0|
|     1|Escape from New Y...|   2.0|
|     1|Cinema Paradiso (...|   4.0|
|     1|Deer Hunter, The ...|   2.0|
|     1|      Ben-Hur (1959)|   2.0|
|     1|       Gandhi (1982)|   2.0|
|     1|Dracula (Bram Sto...|   3.5|
|     1|    Cape Fear (1991)|   2.0|
+------+--------------------+------+
only showing top 10 rows



In [0]:
#validate few rows of dataframe in random order
df.orderBy(rand()).show(10, False)

+------+------------------------------+------+
|userId|title                         |rating|
+------+------------------------------+------+
|15    |Lara Croft: Tomb Raider (2001)|1.0   |
|664   |Insomnia (2002)               |4.0   |
|306   |Muse, The (1999)              |3.0   |
|607   |Aliens (1986)                 |4.0   |
|509   |Time Code (2000)              |3.0   |
|313   |Suspect Zero (2004)           |2.5   |
|114   |True Lies (1994)              |5.0   |
|587   |Charade (1963)                |5.0   |
|475   |Lucky Number Slevin (2006)    |3.0   |
|615   |Serious Man, A (2009)         |4.0   |
+------+------------------------------+------+
only showing top 10 rows



In [0]:
#validate the shape of the data 
print((df.count(),len(df.columns)))

(100004, 3)


In [0]:
#check columns in dataframe
df.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- rating: double (nullable = true)



## Data Exploration

In [0]:
#check number of ratings by each user
df.groupBy('userId').count().orderBy('count',ascending=False).show(10,False)

+------+-----+
|userId|count|
+------+-----+
|547   |2391 |
|564   |1868 |
|624   |1735 |
|15    |1700 |
|73    |1610 |
|452   |1340 |
|468   |1291 |
|380   |1063 |
|311   |1019 |
|30    |1011 |
+------+-----+
only showing top 10 rows



The user with the highest number of records has rated **2391** movies.

In [0]:
#check number of ratings by each user
df.groupBy('userId').count().orderBy('count',ascending=True).show(10,False)

+------+-----+
|userId|count|
+------+-----+
|325   |20   |
|498   |20   |
|1     |20   |
|296   |20   |
|209   |20   |
|604   |20   |
|319   |20   |
|444   |20   |
|540   |20   |
|76    |20   |
+------+-----+
only showing top 10 rows



Each user has rated atleast **20** movies.

In [0]:
df.groupBy('title').count().orderBy('count',ascending=False).show(10,False)

+-----------------------------------------+-----+
|title                                    |count|
+-----------------------------------------+-----+
|Forrest Gump (1994)                      |341  |
|Pulp Fiction (1994)                      |324  |
|Shawshank Redemption, The (1994)         |311  |
|Silence of the Lambs, The (1991)         |304  |
|Star Wars: Episode IV - A New Hope (1977)|291  |
|Jurassic Park (1993)                     |274  |
|Matrix, The (1999)                       |259  |
|Toy Story (1995)                         |247  |
|Schindler's List (1993)                  |244  |
|Terminator 2: Judgment Day (1991)        |237  |
+-----------------------------------------+-----+
only showing top 10 rows



The Movie with the highest number of ratings is **Forrest Gump (1994)** and has been rated **341** times.

In [0]:
df.groupBy('title').count().orderBy('count',ascending=True).show(10,False)

+-----------------------+-----+
|title                  |count|
+-----------------------+-----+
|Morgan! (1966)         |1    |
|Dolphin Tale (2011)    |1    |
|Wicked Blood (2014)    |1    |
|Winnebago Man (2009)   |1    |
|Just Before I Go (2014)|1    |
|The Boy (2016)         |1    |
|10 Attitudes (2001)    |1    |
|Prom (2011)            |1    |
|Grudge 3, The (2009)   |1    |
|Paris, France (1993)   |1    |
+-----------------------+-----+
only showing top 10 rows



Each movie has been rated by atleast one user.

## Feature Engineering

In [0]:
#creating string indexer to convert the movie title column values into numerical values
stringIndexer = StringIndexer(inputCol="title", outputCol="title_new")

In [0]:
#applying stringindexer object on dataframe movie title column
model = stringIndexer.fit(df)

In [0]:
#creating new dataframe with transformed values
transformed_df = model.transform(df)

In [0]:
#validate the numerical title values
transformed_df.show(10)

+------+--------------------+------+---------+
|userId|               title|rating|title_new|
+------+--------------------+------+---------+
|     1|Dangerous Minds (...|   2.5|    570.0|
|     1|        Dumbo (1941)|   3.0|    571.0|
|     1|     Sleepers (1996)|   3.0|    761.0|
|     1|Escape from New Y...|   2.0|    478.0|
|     1|Cinema Paradiso (...|   4.0|    502.0|
|     1|Deer Hunter, The ...|   2.0|    476.0|
|     1|      Ben-Hur (1959)|   2.0|    501.0|
|     1|       Gandhi (1982)|   2.0|    506.0|
|     1|Dracula (Bram Sto...|   3.5|    412.0|
|     1|    Cape Fear (1991)|   2.0|    617.0|
+------+--------------------+------+---------+
only showing top 10 rows



In [0]:
#number of times each numerical movie title has been rated 
transformed_df.groupBy('title_new').count().orderBy('count',ascending=False).show(10,False)

+---------+-----+
|title_new|count|
+---------+-----+
|0.0      |341  |
|1.0      |324  |
|2.0      |311  |
|3.0      |304  |
|4.0      |291  |
|5.0      |274  |
|6.0      |259  |
|7.0      |247  |
|8.0      |244  |
|9.0      |237  |
+---------+-----+
only showing top 10 rows



In [0]:
transformed_df.select(mean("rating"), stddev("rating")).first()

Out[142]: Row(avg(rating)=3.543608255669773, stddev_samp(rating)=1.0580641091070326)

In [0]:
transformed_df.select(max("rating"), min("rating")).first()

Out[143]: Row(max(rating)=5.0, min(rating)=0.5)

In [0]:
mean_rating, sttdev_rating = transformed_df.select(mean("rating"), stddev("rating")).first()

max_rating,min_rating = transformed_df.select(max("rating"), min("rating")).first()


Norm_df=transformed_df.withColumn("rating_Normalized", (col("rating") - mean_rating) / sttdev_rating)

dff = Norm_df.withColumn("rating_minmax", (col("rating") - min_rating) /(max_rating-min_rating))



dff.show(5)

+------+--------------------+------+---------+-------------------+------------------+
|userId|               title|rating|title_new|  rating_Normalized|     rating_minmax|
+------+--------------------+------+---------+-------------------+------------------+
|     1|Dangerous Minds (...|   2.5|    570.0|-0.9863374503370503|0.4444444444444444|
|     1|        Dumbo (1941)|   3.0|    571.0|-0.5137762929399039|0.5555555555555556|
|     1|     Sleepers (1996)|   3.0|    761.0|-0.5137762929399039|0.5555555555555556|
|     1|Escape from New Y...|   2.0|    478.0|-1.4588986077341968|0.3333333333333333|
|     1|Cinema Paradiso (...|   4.0|    502.0|0.43134602185438903|0.7777777777777778|
+------+--------------------+------+---------+-------------------+------------------+
only showing top 5 rows



In [0]:
dff.select(max("rating")).first()[0]

Out[147]: 5.0

## Train Test Split For Modeling

In [0]:
#split the data into training and test datatset
train, test = dff.randomSplit([0.8, 0.2], seed=12345)


In [0]:
#count number of records in train set
train.count()

Out[149]: 79916

In [0]:
#count number of records in test set
test.count()

Out[150]: 20088

## Modeling - ALS

In [0]:
#import ALS recommender function from pyspark ml library
from pyspark.ml.recommendation import ALS

#Training the recommender model using train datatset
rec=ALS(maxIter=10,regParam=0.01,userCol='userId',itemCol='title_new',ratingCol='rating',nonnegative=True,coldStartStrategy="drop")

In [0]:
ALSExplicit = ALS( implicitPrefs=False, userCol="userId", itemCol="title_new", ratingCol="rating_minmax",
          coldStartStrategy="drop")

defaultModel = ALSExplicit.fit(train)

In [0]:
paramMapExplicit = ParamGridBuilder() \
                    .addGrid(ALSExplicit.rank, [ 8, 12]) \
                    .addGrid(ALSExplicit.maxIter, [5,10]) \
                    .addGrid(ALSExplicit.regParam, [0.01,0.001]) \
                    .addGrid(ALSExplicit.alpha, [2.0,3.0]) \
                    .build()

evaluatorR = RegressionEvaluator(metricName="rmse", predictionCol='prediction', labelCol="rating_minmax")


# Run cross-validation, and choose the best set of parameters.
CVALSExplicit = CrossValidator(estimator=ALSExplicit,
                            estimatorParamMaps=paramMapExplicit,
                            evaluator=evaluatorR,
                           numFolds=5)


CVModelEXplicit = CVALSExplicit.fit(train)

In [0]:
# Make predictions on test documents. cvModel uses the best model found (cvModelInplicit).
predsExplicit = CVModelEXplicit.bestModel.transform(test)


predictions=predsExplicit.withColumn("predictnew", ((max_rating-min_rating)*col("prediction") + min_rating) )


predictions =predictions.select([c for c in predictions.columns if c not in {'rating_Normalized','rating_minmax','prediction'}])

predictions.show(5)

+------+--------------------+------+---------+------------------+
|userId|               title|rating|title_new|        predictnew|
+------+--------------------+------+---------+------------------+
|   148|Almost Famous (2000)|   4.5|    151.0|  4.38981568813324|
|   148|American Beauty (...|   5.0|     14.0| 4.458598256111145|
|   148|Apocalypse Now (1...|   5.0|    113.0| 4.251522064208984|
|   148|Arsenic and Old L...|   5.0|    656.0|4.4873751401901245|
|   148|Being John Malkov...|   4.5|     80.0|4.2262991070747375|
+------+--------------------+------+---------+------------------+
only showing top 5 rows



In [0]:
#importing Regression Evaluator to measure RMSE
from pyspark.ml.evaluation import RegressionEvaluator
#create Regressor evaluator object for measuring accuracy
evaluator=RegressionEvaluator(metricName='rmse',predictionCol='predictnew',labelCol='rating')
#apply the RE on predictions dataframe to calculate RMSE
rmse=evaluator.evaluate(predictions)
#print RMSE error
print(rmse)

0.9756148981707347


The RMSE is not very high. We are making an error less than one point in actual rating and predicted rating.

In [0]:
#predicted vs actual ratings for test set 
predictions.orderBy(rand()).show(10)

+------+--------------------+------+---------+------------------+
|userId|               title|rating|title_new|        predictnew|
+------+--------------------+------+---------+------------------+
|   478|Saving Private Ry...|   4.5|     32.0|  4.41192165017128|
|   564|Desperately Seeki...|   4.0|    698.0|2.6751151382923126|
|   391|       Gandhi (1982)|   4.0|    506.0|3.3374635875225067|
|   468|Lars and the Real...|   3.0|   1608.0| 4.051849037408829|
|   285|My Blue Heaven (1...|   4.0|   1803.0|3.2457392811775208|
|   452|    King Kong (2005)|   4.0|   1088.0| 2.792784720659256|
|    61|William Shakespea...|   3.0|    363.0| 3.354631334543228|
|    48|Captain America: ...|   2.5|   1135.0|3.5060508847236633|
|    92|Mission: Impossib...|   5.0|     40.0|2.9821840822696686|
|    73| Midnight Run (1988)|   4.0|    863.0|  3.80600768327713|
+------+--------------------+------+---------+------------------+
only showing top 10 rows



After checking the performance of the model and tuning hyperparameters, we can move ahead to recommend top movies to users, which they have not seen and might like.

## Recommend top movies  which user might like

In [0]:
#create dataset of all distinct movies 
unique_movies=dff.select('title_new').distinct()
#number of unique movies
unique_movies.count()

Out[160]: 9064

We have **9064** distinct movies in the dataframe.

In [0]:
#assigning alias name 'a' to unique movies df
a = unique_movies.alias('a')
user_id=85
#creating another dataframe which contains already watched movie by active user 
watched_movies=dff.filter(dff['userId'] == user_id).select('title_new').distinct()
#number of movies already rated 
watched_movies.count()

Out[161]: 107

There are a total **107** unique movies out of **9064** movies that this active user has already rated.

In [0]:
#assigning alias name 'b' to watched movies df
b=watched_movies.alias('b')
#joining both tables on left join 
total_movies = a.join(b, a.title_new == b.title_new,how='left')
total_movies.show(10,False)

+---------+---------+
|title_new|title_new|
+---------+---------+
|496.0    |null     |
|934.0    |null     |
|299.0    |null     |
|692.0    |null     |
|305.0    |null     |
|769.0    |null     |
|596.0    |null     |
|558.0    |null     |
|5858.0   |null     |
|1761.0   |null     |
+---------+---------+
only showing top 10 rows



In [0]:
#selecting movies which active user is yet to rate or watch
remaining_movies=total_movies.where(col("b.title_new").isNull()).select(a.title_new).distinct()
#number of movies user is yet to rate 
remaining_movies.count()

Out[164]: 8957

So we will make recommendation on movies from the remaining **8957** movies.

In [0]:
#adding new column of user_Id of active useer to remaining movies df 
remaining_movies=remaining_movies.withColumn("userId",lit(int(user_id)))
remaining_movies.show(10,False)

+---------+------+
|title_new|userId|
+---------+------+
|496.0    |85    |
|934.0    |85    |
|299.0    |85    |
|692.0    |85    |
|305.0    |85    |
|769.0    |85    |
|596.0    |85    |
|558.0    |85    |
|5858.0   |85    |
|1761.0   |85    |
+---------+------+
only showing top 10 rows



In [0]:
#making recommendations using ALS recommender model and selecting only top 'n' movies
recommendations=CVModelEXplicit.bestModel.transform(remaining_movies).orderBy('prediction',ascending=False)

rec_df=recommendations.withColumn("predictnew", (sttdev_rating*col("prediction") + mean_rating) )

rec_df =rec_df.select([c for c in rec_df.columns if c not in {'rating_Normalized','rating_minmax','prediction'}])

rec_df.show(5, False)


+---------+------+-----------------+
|title_new|userId|predictnew       |
+---------+------+-----------------+
|2134.0   |85    |5.011948232291001|
|2541.0   |85    |4.971384984463425|
|1984.0   |85    |4.88924136785378 |
|2842.0   |85    |4.884972083371174|
|2080.0   |85    |4.881926144143733|
+---------+------+-----------------+
only showing top 5 rows



So the titles **2134** and **2541** have the highest predicted rating for this active user **85**.

In [0]:
#converting title_new values back to movie titles
movie_title = IndexToString(inputCol="title_new", outputCol="title",labels=model.labels)

final_recommendations=movie_title.transform(rec_df)
final_recommendations.show(10,False)

+---------+------+-----------------+--------------------------------------+
|title_new|userId|predictnew       |title                                 |
+---------+------+-----------------+--------------------------------------+
|2134.0   |85    |5.011948232291001|Half Baked (1998)                     |
|2541.0   |85    |4.971384984463425|Pink Flamingos (1972)                 |
|1984.0   |85    |4.88924136785378 |Dead Man (1995)                       |
|2842.0   |85    |4.884972083371174|Clockwatchers (1997)                  |
|2080.0   |85    |4.881926144143733|White Christmas (1954)                |
|2884.0   |85    |4.879958625571169|Fitzcarraldo (1982)                   |
|3006.0   |85    |4.860971610967517|Pillow Book, The (1996)               |
|2120.0   |85    |4.849197401644464|Endless Summer, The (1966)            |
|2688.0   |85    |4.838729153427914|It's a Mad, Mad, Mad, Mad World (1963)|
|4558.0   |85    |4.821137652990993|Outside Providence (1999)             |
+---------+-

So the recommendations for the userId **85** are **Half Baked (1998)** and **Pink Flamingos (1972)**

## Predictions Functions for future recommendations

In [0]:
#create function to recommend top 'n' movies to any particular user
def top_movies(user_id,n):
    """
    This function returns the top 'n' movies that user has not seen yet but might like 
    
    """
    #assigning alias name 'a' to unique movies df
    a = unique_movies.alias('a')
    
    #creating another dataframe which contains already watched movie by active user 
    watched_movies=dff.filter(dff['userId'] == user_id).select('title_new')
    
    #assigning alias name 'b' to watched movies df
    b=watched_movies.alias('b')
    
    #joining both tables on left join 
    total_movies = a.join(b, a.title_new == b.title_new,how='left')
    
    #selecting movies which active user is yet to rate or watch
    remaining_movies=total_movies.where(col("b.title_new").isNull()).select(a.title_new).distinct()
    
    
    #adding new column of user_Id of active useer to remaining movies df 
    remaining_movies=remaining_movies.withColumn("userId",lit(int(user_id)))
    
    
    #making recommendations using ALS recommender model and selecting only top 'n' movies
    recommendations=rec_model.transform(remaining_movies).orderBy('prediction',ascending=False).limit(n)
    
    
    #adding columns of movie titles in recommendations
    movie_title = IndexToString(inputCol="title_new", outputCol="title",labels=model.labels)
    final_recommendations=movie_title.transform(rec_df)
    
    #return the recommendations to active user
    return final_recommendations.show(n,False)
top_movies(85,10)

+---------+------+-----------------+--------------------------------------+
|title_new|userId|predictnew       |title                                 |
+---------+------+-----------------+--------------------------------------+
|2134.0   |85    |5.011948232291001|Half Baked (1998)                     |
|2541.0   |85    |4.971384984463425|Pink Flamingos (1972)                 |
|1984.0   |85    |4.88924136785378 |Dead Man (1995)                       |
|2842.0   |85    |4.884972083371174|Clockwatchers (1997)                  |
|2080.0   |85    |4.881926144143733|White Christmas (1954)                |
|2884.0   |85    |4.879958625571169|Fitzcarraldo (1982)                   |
|3006.0   |85    |4.860971610967517|Pillow Book, The (1996)               |
|2120.0   |85    |4.849197401644464|Endless Summer, The (1966)            |
|2688.0   |85    |4.838729153427914|It's a Mad, Mad, Mad, Mad World (1963)|
|4558.0   |85    |4.821137652990993|Outside Providence (1999)             |
+---------+-

In [0]:
import pandas as pd

### Single Prediction
A prediction for a single userId and Title can be generated as follows

In [0]:
def movie_rating(userId, title_numeric):
    
    d = {'userId': [userId], 'title_new': [title_numeric]}
    df1 = pd.DataFrame(data=d)
    df2=spark.createDataFrame(df1)

    single_pred=CVModelEXplicit.bestModel.transform(df2)

    return single_pred.withColumn("prediction", (sttdev_rating*col("prediction") + mean_rating) ).show()

movie_rating(85, 2134.0)

+------+---------+-----------------+
|userId|title_new|       prediction|
+------+---------+-----------------+
|    85|   2134.0|5.011948232291001|
+------+---------+-----------------+



### Saving the model

In [0]:
#save to folder models which is created in the writing process
path="/models"


model = CVModelEXplicit
model.write().overwrite().save(path)


#load the model 

sameModel = CVModelEXplicit.load(path)
sameModel

Out[195]: CrossValidatorModel_b6fa93307b05