# Movie Recommendation with PySpark

In [1]:
pip install pyspark

Note: you may need to restart the kernel to use updated packages.


In [3]:
import numpy as np
import pandas as pd
import random 
import os

from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.sql import SQLContext
from pyspark.sql.functions import mean,col,split,col,regexp_extract,when,lit
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import QuantileDiscretizer


In [4]:
# Start a SparkSession for this notebook (getOrCreate reuses one if it already exists)

spark = (
    SparkSession.builder
    .appName('recommendater_system')
    .getOrCreate()
)



In [5]:
# Location of the ratings CSV. It can be overridden with the MOVIE_RATINGS_CSV
# environment variable so the notebook runs on other machines; the fallback is
# the original local path, so existing runs behave exactly as before.
ratings_csv_path = os.environ.get(
    "MOVIE_RATINGS_CSV",
    r"C:\Users\FATHIMA SHEMEEMA\Desktop\pyspark\movie_ratings_df.csv",
)

# Read the ratings with a header row and let Spark infer the column types.
df = spark.read.csv(ratings_csv_path, inferSchema=True, header=True)
df.limit(5).toPandas()

Unnamed: 0,userId,title,rating
0,196,Kolya (1996),3
1,63,Kolya (1996),3
2,226,Kolya (1996),5
3,154,Kolya (1996),3
4,306,Kolya (1996),5


In [6]:
# Use printSchema() for a quick overview of each column's name, data type and nullability

df.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- rating: integer (nullable = true)



In [8]:
# As the schema shows, the title column is stored as a string. To use it with the PySpark ML library,
# we convert each title to a numeric index (new column: title_new).

from pyspark.ml.feature import StringIndexer, IndexToString

stringIndexer=StringIndexer(inputCol="title", outputCol="title_new")

# Fit the indexer on the ratings DataFrame to learn the title -> index mapping
model=stringIndexer.fit(df)
# Create a new DataFrame that carries the extra numeric title_new column
indexed=model.transform(df)
# Preview a few rows to validate the numeric title values
indexed.limit(5).toPandas()



Unnamed: 0,userId,title,rating,title_new
0,196,Kolya (1996),3,287.0
1,63,Kolya (1996),3,287.0
2,226,Kolya (1996),5,287.0
3,154,Kolya (1996),3,287.0
4,306,Kolya (1996),5,287.0


In [None]:
# We use the Alternating Least Squares (ALS) algorithm from the PySpark ML library to build the recommender.
# To read more, see:
# https://spark.apache.org/docs/2.2.0/ml-collaborative-filtering.html



In [10]:
# Split the data into training and test datasets and configure the recommender.
from pyspark.ml.recommendation import ALS

# A fixed seed is passed to both the random split and ALS so that re-running
# the notebook on the same data does not silently change the split, the model
# or the reported metrics.
SEED = 42

# 75% of the ratings go to training, 25% are held out for testing
train, test = indexed.randomSplit([0.75, 0.25], seed=SEED)

# Configure the ALS recommender (it is fitted on the training set in the next cell).
# coldStartStrategy="drop" is kept from the original configuration; see the
# Spark ALS documentation for its effect on rows that cannot be scored.
rec = ALS(maxIter=10,
          regParam=0.01, userCol='userId', itemCol='title_new', ratingCol='rating',
          nonnegative=True, coldStartStrategy="drop", seed=SEED)

In [11]:
# Fit the ALS model on the training set
rec_model=rec.fit(train)



In [15]:
# Make predictions on the held-out test set and preview a few rows
predicted_rating=rec_model.transform(test)
predicted_rating.limit(5).toPandas()

Unnamed: 0,userId,title,rating,title_new,prediction
0,148,"Deer Hunter, The (1978)",1,280.0,5.172204
1,148,Fantasia (1940),5,153.0,5.202687
2,148,Free Willy (1993),1,761.0,3.955402
3,148,Gone with the Wind (1939),5,162.0,4.380488
4,148,"Good, The Bad and The Ugly, The (1966)",2,223.0,4.963204


# Evaluate the model on the test set



In [22]:
# RegressionEvaluator is used here to measure the root-mean-square error (RMSE)

from pyspark.ml.evaluation import RegressionEvaluator

# Compare the model's 'prediction' column against the true 'rating' column
evaluator = RegressionEvaluator(
    labelCol='rating',
    predictionCol='prediction',
    metricName='rmse',
)

# Score the test-set predictions and print the resulting RMSE
rmse = evaluator.evaluate(predicted_rating)
print(rmse)
                              
                              

1.0173988274136665


In [24]:
# Recommend the top movies that a user might like


# First, build a DataFrame of all distinct movie indices (title_new values)
unique_movies=indexed.select('title_new').distinct()

#create function to recommend top 'n' movies to any particular user
def top_movies(user_id,n):
    """
    Print the top 'n' movies that the user has not rated yet but might like.

    The function reads the notebook-level objects `unique_movies`, `indexed`,
    `rec_model` (fitted ALS model) and `model` (fitted StringIndexer model).

    Parameters
    ----------
    user_id : value accepted by int()
        Id of the user to recommend for.
    n : int
        Maximum number of recommendations to display.

    Returns
    -------
    None
        The recommendations are printed with DataFrame.show(); nothing is
        returned to the caller.
    """
    # movies the active user has already rated
    watched_movies = indexed.filter(indexed['userId'] == user_id).select('title_new')

    # An anti join keeps only the movies that have no match in watched_movies,
    # i.e. the ones this user has not rated yet. unique_movies is already
    # distinct, so no further de-duplication is needed.
    remaining_movies = unique_movies.join(watched_movies, on='title_new', how='left_anti')

    # attach the active user's id so the ALS model can score each (user, movie) pair
    remaining_movies = remaining_movies.withColumn("userId", lit(int(user_id)))

    # score the candidates with the ALS model and keep the 'n' highest predictions
    recommendations = (
        rec_model.transform(remaining_movies)
        .orderBy('prediction', ascending=False)
        .limit(n)
    )

    # map the numeric movie index back to the original title
    movie_title = IndexToString(inputCol="title_new", outputCol="title", labels=model.labels)
    final_recommendations = movie_title.transform(recommendations)

    # display up to 'n' rows without truncating long titles
    final_recommendations.show(n, False)

In [27]:
# Test: recommend 20 movies for the user with id=6
top_movies(6,20)

+---------+------+----------+----------------------------------------------------------+
|title_new|userId|prediction|title                                                     |
+---------+------+----------+----------------------------------------------------------+
|1347.0   |6     |6.728834  |Angel Baby (1995)                                         |
|917.0    |6     |5.899893  |Top Hat (1935)                                            |
|1198.0   |6     |5.5329914 |Pather Panchali (1955)                                    |
|428.0    |6     |5.4884787 |Shadowlands (1993)                                        |
|1252.0   |6     |5.4807067 |Year of the Horse (1997)                                  |
|663.0    |6     |5.259537  |When We Were Kings (1996)                                 |
|1122.0   |6     |5.1895137 |Faithful (1996)                                           |
|1155.0   |6     |5.176357  |Horseman on the Roof, The (Hussard sur le toit, Le) (1995)|
|465.0    |6     |4.9