In [0]:
# create (or reuse) the SparkSession that every later cell works through
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName('RecSystem_ALS_Pyspark_app1')
    .getOrCreate()
)

In [0]:
# importing the required functions and libraries
from pyspark.sql.functions import *

In [0]:
# read the ratings CSV into a Spark DataFrame (prefix s_df = "spark dataframe");
# the first row supplies the column names and column types are inferred from the data
s_df = spark.read.csv(
    '/FileStore/tables/movie_ratings_df.csv',
    header=True,
    inferSchema=True,
)

In [0]:
# sanity-check the size of the data: (row count, column count)
print((s_df.count(), len(s_df.columns)))

(100000, 3)


In [0]:
# inspect the column names and the types inferred while reading the CSV
s_df.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- rating: integer (nullable = true)



In [0]:
# peek at a few rows in random order (truncate=False keeps long titles intact)
s_df.orderBy(rand()).show(n=7, truncate=False)

+------+---------------------------------------------+------+
|userId|title                                        |rating|
+------+---------------------------------------------+------+
|806   |Heat (1995)                                  |4     |
|886   |Star Trek VI: The Undiscovered Country (1991)|3     |
|327   |Sleepers (1996)                              |2     |
|308   |Searching for Bobby Fischer (1993)           |4     |
|507   |Deconstructing Harry (1997)                  |5     |
|299   |Star Trek III: The Search for Spock (1984)   |3     |
|601   |Little Princess, A (1995)                    |3     |
+------+---------------------------------------------+------+
only showing top 7 rows



In [0]:
# users with the most ratings: count ratings per user, highest first
(
    s_df.groupBy('userId')
    .count()
    .orderBy('count', ascending=False)
    .show(n=7, truncate=False)
)

+------+-----+
|userId|count|
+------+-----+
|405   |737  |
|655   |685  |
|13    |636  |
|450   |540  |
|276   |518  |
|416   |493  |
|537   |490  |
+------+-----+
only showing top 7 rows



In [0]:
# users with the fewest ratings: count ratings per user, lowest first.
# The smallest count is 20, i.e. every user has rated at least 20 movies.
(
    s_df.groupBy('userId')
    .count()
    .orderBy('count', ascending=True)
    .show(n=10, truncate=False)
)

+------+-----+
|userId|count|
+------+-----+
|19    |20   |
|143   |20   |
|309   |20   |
|34    |20   |
|202   |20   |
|732   |20   |
|441   |20   |
|685   |20   |
|824   |20   |
|631   |20   |
+------+-----+
only showing top 10 rows



In [0]:
# most-rated movies: number of ratings each title has received, highest first
(
    s_df.groupBy('title')
    .count()
    .orderBy('count', ascending=False)
    .show(n=7, truncate=False)
)

+---------------------------+-----+
|title                      |count|
+---------------------------+-----+
|Star Wars (1977)           |583  |
|Contact (1997)             |509  |
|Fargo (1996)               |508  |
|Return of the Jedi (1983)  |507  |
|Liar Liar (1997)           |485  |
|English Patient, The (1996)|481  |
|Scream (1996)              |478  |
+---------------------------+-----+
only showing top 7 rows



In [0]:
# least-rated movies: number of ratings per title, lowest first (many titles have a single rating)
(
    s_df.groupBy('title')
    .count()
    .orderBy('count', ascending=True)
    .show(n=7, truncate=False)
)

+------------------------------------------+-----+
|title                                     |count|
+------------------------------------------+-----+
|Next Step, The (1995)                     |1    |
|Marlene Dietrich: Shadow and Light (1996) |1    |
|Mad Dog Time (1996)                       |1    |
|Target (1995)                             |1    |
|Leopard Son, The (1996)                   |1    |
|Modern Affair, A (1995)                   |1    |
|Lashou shentan (1992)                     |1    |
+------------------------------------------+-----+
only showing top 7 rows



In [0]:
# importing String indexer to convert string values to numeric values
from pyspark.ml.feature import StringIndexer,IndexToString

In [0]:
# StringIndexer maps each distinct movie title to a numeric index stored in a new 'title_new' column;
# ALS needs numeric item ids
si_object = StringIndexer(
    inputCol="title",
    outputCol="title_new",
)

In [0]:
# learn the title -> index mapping from every title present in s_df
si_model = si_object.fit(s_df)

In [0]:
# add the numeric 'title_new' column to the ratings using the fitted mapping
s_df_indexed = si_model.transform(s_df)

In [0]:
# spot-check the indexed titles on a random selection of rows
s_df_indexed.orderBy(rand()).show(n=7, truncate=False)

+------+--------------------------------------------+------+---------+
|userId|title                                       |rating|title_new|
+------+--------------------------------------------+------+---------+
|701   |Game, The (1997)                            |3     |67.0     |
|707   |Mary Poppins (1964)                         |3     |145.0    |
|843   |Escape from New York (1981)                 |4     |369.0    |
|70    |Young Frankenstein (1974)                   |4     |117.0    |
|526   |Devil's Advocate, The (1997)                |2     |130.0    |
|291   |Halloween: The Curse of Michael Myers (1995)|3     |855.0    |
|751   |New Age, The (1994)                         |1     |1606.0   |
+------+--------------------------------------------+------+---------+
only showing top 7 rows



In [0]:
# the same check via take(): the first 7 rows as a list of Row objects
s_df_indexed.take(7)

Out[111]: [Row(userId=196, title='Kolya (1996)', rating=3, title_new=287.0),
 Row(userId=63, title='Kolya (1996)', rating=3, title_new=287.0),
 Row(userId=226, title='Kolya (1996)', rating=5, title_new=287.0),
 Row(userId=154, title='Kolya (1996)', rating=3, title_new=287.0),
 Row(userId=306, title='Kolya (1996)', rating=5, title_new=287.0),
 Row(userId=296, title='Kolya (1996)', rating=4, title_new=287.0),
 Row(userId=34, title='Kolya (1996)', rating=5, title_new=287.0)]

In [0]:
# number of ratings per numeric title index, highest first
(
    s_df_indexed.groupBy('title_new')
    .count()
    .orderBy('count', ascending=False)
    .show(n=7, truncate=False)
)

+---------+-----+
|title_new|count|
+---------+-----+
|0.0      |583  |
|1.0      |509  |
|2.0      |508  |
|3.0      |507  |
|4.0      |485  |
|5.0      |481  |
|6.0      |478  |
+---------+-----+
only showing top 7 rows



In [0]:
# split the data into training (~75%) and test (~25%) sets.
# randomSplit assigns each row to a split independently at random: the 3:1 ratio holds for
# the dataset as a whole, NOT per user, so one user's ratings can be split unevenly and a
# user or movie may appear only in the test set.
# A fixed seed makes the split (and therefore the counts and RMSE below) reproducible.
s_df_train, s_df_test = s_df_indexed.randomSplit([0.75, 0.25], seed=42)

In [0]:
# how many ratings landed in the training split
s_df_train.count()

Out[114]: 74759

In [0]:
# how many ratings landed in the test split
s_df_test.count()

Out[115]: 25241

In [0]:
# importingg ALS recommender function from pyspark ml library
from pyspark.ml.recommendation import ALS

In [0]:
# configure the ALS (alternating least squares) estimator using the column names of s_df_train.
# - nonnegative=True constrains the learned factors to be non-negative.
# - coldStartStrategy="drop" removes rows whose prediction is NaN (user or movie not seen in
#   training), so evaluation metrics stay defined.
# - seed fixes the random initialisation so the fitted model and RMSE are reproducible.
# NOTE(review): predictions are not clipped to the 1-5 rating scale; with regParam=0.01 some
# scores land well above 5, so regParam may be worth tuning.
rec_object = ALS(
    maxIter=10,
    regParam=0.01,
    userCol='userId',
    itemCol='title_new',
    ratingCol='rating',
    nonnegative=True,
    coldStartStrategy="drop",
    seed=42,
)

In [0]:
# fit the ALS model on the training split only; the test split is held out for evaluation
rec_model1 = rec_object.fit(s_df_train)

In [0]:
# score the held-out test ratings with the trained model (adds a 'prediction' column)
s_df_predicted_ratings = rec_model1.transform(s_df_test)

In [0]:
# columns of the predictions DataFrame (the test columns plus 'prediction')
s_df_predicted_ratings.printSchema()

root
 |-- userId: integer (nullable = false)
 |-- title: string (nullable = true)
 |-- rating: integer (nullable = true)
 |-- title_new: integer (nullable = false)
 |-- prediction: float (nullable = false)



In [0]:
# compare predicted vs. actual ratings on a random selection of test rows.
# ALS produces a real-valued score rather than a class, so predictions are not restricted
# to the 1-5 rating scale and some can exceed 5.
s_df_predicted_ratings.orderBy(rand()).show(n=7)

+------+--------------------+------+---------+----------+
|userId|               title|rating|title_new|prediction|
+------+--------------------+------+---------+----------+
|     5|Fish Called Wanda...|     5|       69|   2.77064|
|   102|     Hercules (1997)|     2|      495|   2.68609|
|   928|      Amadeus (1984)|     5|       50| 4.4491286|
|   369|Dead Poets Societ...|     5|       65| 4.9465523|
|   682|  Clean Slate (1994)|     2|     1118| 2.8859372|
|    26|     Basquiat (1996)|     2|      648|  3.430551|
|   586|River Wild, The (...|     3|      213| 3.0809932|
+------+--------------------+------+---------+----------+
only showing top 7 rows



In [0]:
# importing Regression Evaluator to measure RMSE
from pyspark.ml.evaluation import RegressionEvaluator

In [0]:
# evaluator computing RMSE between 'prediction' and the true 'rating' (an error measure: lower is better)
re_object = RegressionEvaluator(
    metricName='rmse',
    predictionCol='prediction',
    labelCol='rating',
)

In [0]:
# RMSE of the test-set predictions (predictionCol vs. labelCol)
rmse1 = re_object.evaluate(s_df_predicted_ratings)

In [0]:
# display the test-set RMSE
print(str(rmse1))

1.0200979709845475


In [0]:
# Recommend the top movies a user might like, as shown in the next few cells

In [0]:
# every distinct numeric movie index in the dataset: the candidate pool for recommendations
s_df_unique_movies = (
    s_df_indexed
    .select('title_new')
    .distinct()
)

In [0]:
# how many distinct movies exist
s_df_unique_movies.count()

Out[128]: 1664

In [0]:
# alias the candidate-movie DataFrame as 's_df_a' so its columns can be referenced unambiguously after the join
s_df_a = s_df_unique_movies.alias('s_df_a')

In [0]:
# the active user for whom recommendations are produced below
user_id = 93

In [0]:
# distinct movies the active user (user_id) has already rated
s_df_watched_movies = (
    s_df_indexed
    .filter(s_df_indexed['userId'] == user_id)
    .select('title_new')
    .distinct()
)

In [0]:
# how many movies the active user has already rated
s_df_watched_movies.count()

Out[132]: 20

In [0]:
# alias the watched-movie DataFrame as 's_df_b' so its 'title_new' can be told apart from s_df_a's after the join
s_df_b = s_df_watched_movies.alias('s_df_b')

In [0]:
# left join every candidate movie to the watched set; movies the user has not rated get a null s_df_b.title_new
s_df_total_movies = s_df_a.join(
    s_df_b,
    s_df_a.title_new == s_df_b.title_new,
    how='left',
)


In [0]:
# inspect the joined rows (left column: candidate movie, right column: watched match or null)
s_df_total_movies.show(n=7, truncate=False)

+---------+---------+
|title_new|title_new|
+---------+---------+
|413.0    |null     |
|287.0    |null     |
|85.0     |null     |
|123.0    |null     |
|259.0    |null     |
|1045.0   |null     |
|38.0     |null     |
+---------+---------+
only showing top 7 rows



In [0]:
# keep only candidates with no watched match (null right side), i.e. movies the active user has not rated yet
s_df_remaining_movies = (
    s_df_total_movies
    .where(col("s_df_b.title_new").isNull())
    .select(s_df_a.title_new)
    .distinct()
)

In [0]:
# how many movies the active user has not rated yet
s_df_remaining_movies.count()

Out[137]: 1644

In [0]:
# attach the active user's id to every unrated movie so ALS can score each (user, movie) pair
s_df_remaining_movies = s_df_remaining_movies.withColumn(
    "userId",
    lit(int(user_id)),
)


In [0]:
# inspect the (movie, user) pairs that will be scored
s_df_remaining_movies.show(n=7, truncate=False)

+---------+------+
|title_new|userId|
+---------+------+
|305.0    |93    |
|596.0    |93    |
|299.0    |93    |
|769.0    |93    |
|692.0    |93    |
|934.0    |93    |
|1051.0   |93    |
+---------+------+
only showing top 7 rows



In [0]:
# score every unrated movie for the active user and sort by predicted rating, best first.
# No limit is applied here; the cells below display only the top rows.
s_df_recommendations = (
    rec_model1
    .transform(s_df_remaining_movies)
    .orderBy('prediction', ascending=False)
)

In [0]:
# top predicted ratings for the active user. The trained ALS model combines this user's learned
# factor vector with each unseen movie's factor vector; the score is not bounded to the 1-5
# rating scale, so some predictions exceed 5.
s_df_recommendations.show(n=10, truncate=False)

+---------+------+----------+
|title_new|userId|prediction|
+---------+------+----------+
|1289     |93    |14.980225 |
|1103     |93    |12.038351 |
|974      |93    |9.184722  |
|1290     |93    |9.08625   |
|1092     |93    |8.703954  |
|1114     |93    |8.375067  |
|1323     |93    |8.291331  |
|953      |93    |8.284655  |
|971      |93    |8.244253  |
|708      |93    |8.160689  |
+---------+------+----------+
only showing top 10 rows



In [0]:
# map numeric 'title_new' indices back to movie titles using the labels learned by the StringIndexer
s_df_movie_title = IndexToString(
    inputCol="title_new",
    outputCol="title",
    labels=si_model.labels,
)
s_df_final_recommendations = s_df_movie_title.transform(s_df_recommendations)


In [0]:
# top recommendations with human-readable titles
s_df_final_recommendations.show(n=7, truncate=False)

+---------+------+----------+--------------------------------------+
|title_new|userId|prediction|title                                 |
+---------+------+----------+--------------------------------------+
|1289     |93    |14.980225 |World of Apu, The (Apur Sansar) (1959)|
|1103     |93    |12.038351 |Stalker (1979)                        |
|974      |93    |9.184722  |Farinelli: il castrato (1994)         |
|1290     |93    |9.08625   |Zeus and Roxanne (1997)               |
|1092     |93    |8.703954  |American Buffalo (1996)               |
|1114     |93    |8.375067  |Bitter Moon (1992)                    |
|1323     |93    |8.291331  |Panther (1995)                        |
+---------+------+----------+--------------------------------------+
only showing top 7 rows



In [0]:
# The function below relies on these notebook globals from earlier cells: s_df_unique_movies (distinct movies), s_df_indexed (indexed ratings), rec_model1 (fitted ALS model) and si_model (fitted StringIndexer)

In [0]:
# creating function to recommend top 'n' movies to any particular user
def top_movies_rec(user_id, n):
    """
    Recommend the top `n` movies a user has not rated yet, ranked by the ALS model.

    Relies on notebook globals created in earlier cells:
    `s_df_unique_movies` (distinct `title_new` values), `s_df_indexed` (ratings with
    `title_new`), `rec_model1` (fitted ALS model) and `si_model` (fitted StringIndexer).

    Parameters
    ----------
    user_id : int
        Id of the active user (value of the `userId` column).
    n : int
        Number of recommendations to return and display.

    Returns
    -------
    pyspark.sql.DataFrame
        Columns `title_new`, `userId`, `prediction`, `title`, ordered by descending
        `prediction` and limited to `n` rows. The table is also printed with
        `show(n, False)`, as before.
    """
    # movies the active user has already rated
    s_df_watched_movies = s_df_indexed.filter(s_df_indexed['userId'] == user_id).select('title_new')

    # left-anti join keeps only candidate movies with no match in the watched set, i.e. movies
    # the user has not rated. This is equivalent to a left join + null filter, without aliases.
    s_df_remaining_movies = s_df_unique_movies.join(s_df_watched_movies, on='title_new', how='left_anti')

    # attach the active user's id so the ALS model can score every (user, movie) pair
    s_df_remaining_movies = s_df_remaining_movies.withColumn("userId", lit(int(user_id)))

    # score with ALS and keep the best n. NOTE: the model was built with coldStartStrategy="drop",
    # so if this user (or a movie) never appeared in the training split, those rows are dropped
    # and fewer than n (possibly zero) recommendations come back.
    s_df_recommendations = (
        rec_model1.transform(s_df_remaining_movies)
        .orderBy('prediction', ascending=False)
        .limit(n)
    )

    # map numeric title indices back to the original movie titles
    s_df_movie_title = IndexToString(inputCol="title_new", outputCol="title", labels=si_model.labels)
    s_df_final_recommendations = s_df_movie_title.transform(s_df_recommendations)

    # keep the original display behaviour, but return the DataFrame (show() returns None)
    s_df_final_recommendations.show(n, False)
    return s_df_final_recommendations

In [0]:
# top 7 recommendations for user_id = 93 via top_movies_rec; matches the step-by-step result above.
# The trailing ';' suppresses display of any return value, so only the printed table appears.
top_movies_rec(93, 7);

+---------+------+----------+--------------------------------------+
|title_new|userId|prediction|title                                 |
+---------+------+----------+--------------------------------------+
|1289     |93    |14.980225 |World of Apu, The (Apur Sansar) (1959)|
|1103     |93    |12.038351 |Stalker (1979)                        |
|974      |93    |9.184722  |Farinelli: il castrato (1994)         |
|1290     |93    |9.08625   |Zeus and Roxanne (1997)               |
|1092     |93    |8.703954  |American Buffalo (1996)               |
|1114     |93    |8.375067  |Bitter Moon (1992)                    |
|1323     |93    |8.291331  |Panther (1995)                        |
+---------+------+----------+--------------------------------------+

