In [340]:
#import and create sparksession object
from pyspark.sql import SparkSession 
# Get the active SparkSession, or create one with application name 'rc'
spark = (
    SparkSession.builder
    .appName('rc')
    .getOrCreate()
)

In [341]:
#import the required functions and libraries
from pyspark.sql.functions import *

In [342]:
# Load the ratings CSV into a Spark DataFrame; the first row is the header and
# column types are inferred from the data.
# NOTE(review): one title in the output further down shows a garbled accented
# character, which suggests the file may not be UTF-8 - confirm the file's
# encoding and pass `encoding=` here if needed.
df = spark.read.csv(
    'movie_ratings_df.csv',
    header=True,
    inferSchema=True,
)

In [343]:
# Sanity-check the dataset size: (number of rows, number of columns)
print((
    df.count(),
    len(df.columns),
))

(100000, 3)


In [344]:
# Print every column's name, inferred data type and nullability
df.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- rating: integer (nullable = true)



In [346]:
# Eyeball 10 rows in random order (rows are sorted on a random key);
# truncate=False prints full column values
df.orderBy(rand()).show(n=10, truncate=False)

+------+-----------------------------+------+
|userId|title                        |rating|
+------+-----------------------------+------+
|840   |Amistad (1997)               |4     |
|711   |Grand Day Out, A (1992)      |5     |
|311   |Casper (1995)                |2     |
|717   |Ulee's Gold (1997)           |4     |
|389   |Miracle on 34th Street (1994)|5     |
|416   |Cool Runnings (1993)         |3     |
|449   |Withnail and I (1987)        |5     |
|796   |To Kill a Mockingbird (1962) |4     |
|658   |Toy Story (1995)             |4     |
|345   |Shining, The (1980)          |4     |
+------+-----------------------------+------+
only showing top 10 rows



In [347]:
# Most active users: number of ratings per user, largest count first
(
    df.groupBy('userId')
    .count()
    .orderBy(col('count').desc())
    .show(n=10, truncate=False)
)

+------+-----+
|userId|count|
+------+-----+
|405   |737  |
|655   |685  |
|13    |636  |
|450   |540  |
|276   |518  |
|416   |493  |
|537   |490  |
|303   |484  |
|234   |480  |
|393   |448  |
+------+-----+
only showing top 10 rows



In [348]:
# Least active users: number of ratings per user, smallest count first
(
    df.groupBy('userId')
    .count()
    .orderBy('count')
    .show(n=10, truncate=False)
)

+------+-----+
|userId|count|
+------+-----+
|732   |20   |
|636   |20   |
|631   |20   |
|926   |20   |
|93    |20   |
|596   |20   |
|572   |20   |
|34    |20   |
|685   |20   |
|300   |20   |
+------+-----+
only showing top 10 rows



In [349]:
# Most rated movies: number of ratings per title, largest count first
(
    df.groupBy('title')
    .count()
    .orderBy(col('count').desc())
    .show(n=10, truncate=False)
)

+-----------------------------+-----+
|title                        |count|
+-----------------------------+-----+
|Star Wars (1977)             |583  |
|Contact (1997)               |509  |
|Fargo (1996)                 |508  |
|Return of the Jedi (1983)    |507  |
|Liar Liar (1997)             |485  |
|English Patient, The (1996)  |481  |
|Scream (1996)                |478  |
|Toy Story (1995)             |452  |
|Air Force One (1997)         |431  |
|Independence Day (ID4) (1996)|429  |
+-----------------------------+-----+
only showing top 10 rows



In [350]:
# Least rated movies: number of ratings per title, smallest count first
(
    df.groupBy('title')
    .count()
    .orderBy('count')
    .show(n=10, truncate=False)
)

+-----------------------------------------+-----+
|title                                    |count|
+-----------------------------------------+-----+
|Lashou shentan (1992)                    |1    |
|Fear, The (1995)                         |1    |
|Aiqing wansui (1994)                     |1    |
|Mad Dog Time (1996)                      |1    |
|Leopard Son, The (1996)                  |1    |
|Next Step, The (1995)                    |1    |
|Target (1995)                            |1    |
|Vie est belle, La (Life is Rosey) (1987) |1    |
|Modern Affair, A (1995)                  |1    |
|JLG/JLG - autoportrait de d�cembre (1994)|1    |
+-----------------------------------------+-----+
only showing top 10 rows



In [291]:
#import String indexer to convert string values to numeric values
from pyspark.ml.feature import StringIndexer,IndexToString

In [292]:
# StringIndexer that will encode each movie title string as a numeric index,
# written to a new `title_new` column
stringIndexer = StringIndexer(
    inputCol="title",
    outputCol="title_new",
)

In [293]:
# Fit the indexer on the ratings dataframe so it learns the title -> index mapping
# (the fitted model's labels are reused later to map indices back to titles)
model = stringIndexer.fit(df)

In [294]:
# New dataframe: the original columns plus the numeric `title_new` column
indexed = model.transform(df)

In [295]:
# Preview the first 10 rows to check the numeric title values
indexed.show(10)

+------+--------------------+------+---------+
|userId|               title|rating|title_new|
+------+--------------------+------+---------+
|   932|    Cape Fear (1991)|     3|    161.0|
|   721|   Piano, The (1993)|     3|    173.0|
|   642|Low Down Dirty Sh...|     2|   1115.0|
|   798|That Darn Cat! (1...|     4|    686.0|
|   535|African Queen, Th...|     4|    199.0|
|   765|Stealing Beauty (...|     5|    521.0|
|   927|Poison Ivy II (1995)|     3|   1041.0|
|   544|    G.I. Jane (1997)|     3|    152.0|
|   788|Godfather: Part I...|     4|    108.0|
|   706|Birdcage, The (1996)|     4|     43.0|
+------+--------------------+------+---------+
only showing top 10 rows



In [296]:
# Number of ratings per numeric title index, largest count first
(
    indexed.groupBy('title_new')
    .count()
    .orderBy(col('count').desc())
    .show(n=10, truncate=False)
)

+---------+-----+
|title_new|count|
+---------+-----+
|0.0      |583  |
|1.0      |509  |
|2.0      |508  |
|3.0      |507  |
|4.0      |485  |
|5.0      |481  |
|6.0      |478  |
|7.0      |452  |
|8.0      |431  |
|9.0      |429  |
+---------+-----+
only showing top 10 rows



In [297]:
# Split the indexed ratings into roughly 75% training / 25% test rows.
# A fixed seed makes the split repeatable across runs; without it the rows in
# each split (and every number computed from them) change on every execution.
train, test = indexed.randomSplit([0.75, 0.25], seed=42)

In [298]:
# Number of rows in the training split (weight 0.75 in the split above)
train.count()

75104

In [299]:
# Number of rows in the test split (weight 0.25 in the split above)
test.count()

24876

In [300]:
#import ALS recommender function from pyspark ml library
from pyspark.ml.recommendation import ALS

In [301]:
# Configure the ALS recommender (training itself happens when .fit() is called):
#   - users come from `userId`, items from the numeric `title_new`, labels from `rating`
#   - nonnegative=True constrains the learned factors to be non-negative
#   - coldStartStrategy="drop" removes rows for which no prediction can be made
#     (user or item unseen during training) instead of emitting NaN
#   - a fixed seed makes the random factor initialisation repeatable across runs
rec = ALS(
    maxIter=10,
    regParam=0.01,
    userCol='userId',
    itemCol='title_new',
    ratingCol='rating',
    nonnegative=True,
    coldStartStrategy="drop",
    seed=42,
)

In [302]:
# Train the ALS recommender on the training split; the fitted model is used
# below for both evaluation and recommendations
rec_model=rec.fit(train)

In [303]:
# Score the test split: adds a `prediction` column next to the actual `rating`
predicted_ratings=rec_model.transform(test)

In [337]:
# Inspect the columns (names, types, nullability) of the predictions dataframe
predicted_ratings.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- rating: integer (nullable = true)
 |-- title_new: double (nullable = false)
 |-- prediction: float (nullable = false)



In [304]:
# Compare predicted and actual ratings on 10 test rows in random order
predicted_ratings.orderBy(rand()).show(n=10)

+------+--------------------+------+---------+----------+
|userId|               title|rating|title_new|prediction|
+------+--------------------+------+---------+----------+
|    92|Tie Me Up! Tie Me...|     4|    766.0| 3.1512196|
|   222|       Batman (1989)|     3|    116.0|  3.503284|
|   178|Beauty and the Be...|     4|    114.0| 4.1487904|
|   303|Jerry Maguire (1996)|     5|     15.0|  4.348913|
|   134|      Flubber (1997)|     2|    579.0| 2.5635276|
|   295|      Henry V (1989)|     4|    268.0| 4.2598643|
|   889|Adventures of Pri...|     2|    305.0| 2.9040515|
|   374| Men in Black (1997)|     3|     31.0|  3.602631|
|   559|Killing Fields, T...|     4|    276.0|   4.55797|
|   290|Star Trek: The Mo...|     1|    286.0| 3.2992659|
+------+--------------------+------+---------+----------+
only showing top 10 rows



In [305]:
#importing Regression Evaluator to measure RMSE
from pyspark.ml.evaluation import RegressionEvaluator

In [306]:
# Evaluator that computes RMSE between the `prediction` column and the true `rating`
evaluator = RegressionEvaluator(
    labelCol='rating',
    predictionCol='prediction',
    metricName='rmse',
)

In [307]:
# Run the evaluator on the test-set predictions to obtain the RMSE
rmse=evaluator.evaluate(predicted_ratings)

In [308]:
# Print the RMSE on the test set (root-mean-squared error; lower is better)
print(rmse)

1.0293574739493354


In [309]:
#Recommend the top movies that a user might like

In [310]:
# One row per distinct numeric movie index found in the ratings data
unique_movies = (
    indexed
    .select('title_new')
    .distinct()
)

In [311]:
# Number of distinct movies in the dataset
unique_movies.count()

1664

In [312]:
# Alias the distinct-movies dataframe as 'a' so its columns can be told apart
# from the watched-movies side in the join below
a = unique_movies.alias('a')

In [336]:
# Active user for whom the step-by-step recommendations below are produced
user_id=85

In [321]:
# Distinct movie indices that the active user has already rated
watched_movies = (
    indexed
    .where(indexed['userId'] == user_id)
    .select('title_new')
    .distinct()
)

In [322]:
# Number of distinct movies the active user has already rated
watched_movies.count()

287

In [323]:
# Alias the watched-movies dataframe as 'b' (right-hand side of the join below)
b=watched_movies.alias('b')

In [324]:
# Left-join every movie ('a') against the movies this user has rated ('b');
# movies the user has not rated end up with null on the 'b' side.
# Both dataframes derive from `indexed`, so the join columns are referenced
# through their aliases to state explicitly which side each one belongs to.
total_movies = a.join(b, col('a.title_new') == col('b.title_new'), how='left')


In [325]:
# Preview the join: left column is every movie, right column is null when the
# active user has not rated that movie
total_movies.show(10,False)

+---------+---------+
|title_new|title_new|
+---------+---------+
|299.0    |null     |
|558.0    |null     |
|305.0    |305.0    |
|596.0    |null     |
|1051.0   |null     |
|934.0    |null     |
|496.0    |496.0    |
|769.0    |null     |
|692.0    |null     |
|720.0    |null     |
+---------+---------+
only showing top 10 rows



In [326]:
# Keep only movies with no match on the watched ('b') side, i.e. the ones the
# active user has not rated yet
remaining_movies = (
    total_movies
    .filter(col("b.title_new").isNull())
    .select(a.title_new)
    .distinct()
)

In [327]:
# Number of movies the active user has not rated yet
remaining_movies.count()

1377

In [328]:
# Attach the active user's id as a constant `userId` column so that every row
# is a (movie, user) pair the ALS model can score
remaining_movies = remaining_movies.withColumn(
    "userId",
    lit(int(user_id)),
)


In [329]:
# Preview the (movie index, user id) pairs that will be scored
remaining_movies.show(10,False)

+---------+------+
|title_new|userId|
+---------+------+
|299.0    |85    |
|558.0    |85    |
|596.0    |85    |
|1051.0   |85    |
|934.0    |85    |
|769.0    |85    |
|692.0    |85    |
|720.0    |85    |
|576.0    |85    |
|810.0    |85    |
+---------+------+
only showing top 10 rows



In [333]:
# Score every not-yet-rated movie for the active user with the ALS model and
# sort by predicted rating, highest first (no top-'n' cut is applied here)
recommendations = (
    rec_model.transform(remaining_movies)
    .orderBy(col('prediction').desc())
)

In [332]:
# Show the 5 highest predicted ratings
recommendations.show(5,False)

+---------+------+----------+
|title_new|userId|prediction|
+---------+------+----------+
|1433.0   |85    |4.9689837 |
|1322.0   |85    |4.6927013 |
|1271.0   |85    |4.605163  |
|1470.0   |85    |4.5409293 |
|705.0    |85    |4.532007  |
+---------+------+----------+



In [334]:
# Map the numeric `title_new` index back to the movie title, using the labels
# learned by the fitted StringIndexer model
movie_title = IndexToString(
    inputCol="title_new",
    outputCol="title",
    labels=model.labels,
)

# Recommendations with a human-readable `title` column added
final_recommendations = movie_title.transform(recommendations)


In [335]:
# Show the 10 best recommendations together with their movie titles
final_recommendations.show(10,False)

+---------+------+----------+----------------------------+
|title_new|userId|prediction|title                       |
+---------+------+----------+----------------------------+
|1433.0   |85    |4.9689837 |Boys, Les (1997)            |
|1322.0   |85    |4.6927013 |Faust (1994)                |
|1271.0   |85    |4.605163  |Whole Wide World, The (1996)|
|1470.0   |85    |4.5409293 |Some Mother's Son (1996)    |
|705.0    |85    |4.532007  |Laura (1944)                |
|303.0    |85    |4.5236835 |Close Shave, A (1995)       |
|1121.0   |85    |4.4936523 |Crooklyn (1994)             |
|1195.0   |85    |4.4636283 |Pather Panchali (1955)      |
|285.0    |85    |4.456875  |Wrong Trousers, The (1993)  |
|638.0    |85    |4.4495435 |Shall We Dance? (1996)      |
+---------+------+----------+----------------------------+
only showing top 10 rows



In [338]:
#create function to recommend top 'n' movies to any particular user
def top_movies(user_id,n):
    """
    Print the top 'n' movies that the user has not rated yet, ranked by the
    rating the ALS model predicts for them.

    Relies on objects created earlier in the notebook: `unique_movies`,
    `indexed`, `rec_model` (fitted ALS model) and `model` (fitted
    StringIndexer model).

    Parameters
    ----------
    user_id : value accepted by int()
        Id of the user to recommend for.
    n : int
        Maximum number of recommendations to print.

    Returns
    -------
    None
        The recommendations are printed with DataFrame.show(); nothing is
        returned.
    """
    #movie indices the active user has already rated
    watched_movies = indexed.filter(indexed['userId'] == user_id).select('title_new')

    #anti join keeps only the movies that have no match among the watched ones.
    #Joining on the column name avoids having to disambiguate two `title_new`
    #columns that both originate from `indexed`.
    remaining_movies = unique_movies.join(watched_movies, on='title_new', how='left_anti')

    #adding the active user's id so every row is a (movie, user) pair to score
    remaining_movies = remaining_movies.withColumn("userId", lit(int(user_id)))

    #scoring with the ALS model and keeping only the 'n' highest predictions
    recommendations = (
        rec_model.transform(remaining_movies)
        .orderBy('prediction', ascending=False)
        .limit(n)
    )

    #adding a column with the movie titles to the recommendations
    movie_title = IndexToString(inputCol="title_new", outputCol="title", labels=model.labels)
    final_recommendations = movie_title.transform(recommendations)

    #print the recommendations for the active user (show() returns None)
    return final_recommendations.show(n, False)

In [339]:
# Print the 10 highest-predicted, not-yet-rated movies for user 85
top_movies(85,10)

+---------+------+----------+----------------------------+
|title_new|userId|prediction|title                       |
+---------+------+----------+----------------------------+
|1433.0   |85    |4.9689837 |Boys, Les (1997)            |
|1322.0   |85    |4.6927013 |Faust (1994)                |
|1271.0   |85    |4.605163  |Whole Wide World, The (1996)|
|1470.0   |85    |4.5409293 |Some Mother's Son (1996)    |
|705.0    |85    |4.532007  |Laura (1944)                |
|303.0    |85    |4.5236835 |Close Shave, A (1995)       |
|1121.0   |85    |4.4936523 |Crooklyn (1994)             |
|1195.0   |85    |4.4636283 |Pather Panchali (1955)      |
|285.0    |85    |4.456875  |Wrong Trousers, The (1993)  |
|638.0    |85    |4.4495435 |Shall We Dance? (1996)      |
+---------+------+----------+----------------------------+

