## Recommender System with PySpark

### Data Info
The dataset used in this chapter is a subset of the well-known open-source MovieLens dataset. It contains 100,000 records with three columns (userId, title, rating). We will train the recommender model on 75% of the ratings and test it on the remaining 25%.

In [46]:
#import and create sparksession object
from pyspark.sql import SparkSession 
spark=SparkSession.builder.appName('rc').getOrCreate()

In [47]:
#import the Spark SQL functions used below (note: this abs shadows Python's built-in abs)
from pyspark.sql.functions import abs, col, lit, rand

#### Read in the data

In [20]:
#load the dataset and create spark dataframe
df=spark.read.csv('movie_ratings_df.csv',inferSchema=True,header=True)

In [21]:
#validate the shape of the data 
print((df.count(),len(df.columns)))

(100000, 3)


In [22]:
#check columns in dataframe
df.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- rating: integer (nullable = true)



The dataframe has three columns: userId and rating are numerical, and title is categorical. The critical thing when
building a recommender system (RS) with PySpark is that the user and item columns must be in numerical form. Hence, we will
convert the movie titles to numerical values later.

In [23]:
#view a few rows of the dataframe in random order
df.orderBy(rand()).show(10,False)

+------+--------------------------------------------+------+
|userId|title                                       |rating|
+------+--------------------------------------------+------+
|291   |Frighteners, The (1996)                     |4     |
|13    |Boogie Nights (1997)                        |2     |
|872   |Willy Wonka and the Chocolate Factory (1971)|2     |
|828   |Blood & Wine (1997)                         |3     |
|234   |Singin' in the Rain (1952)                  |5     |
|886   |Serial Mom (1994)                           |2     |
|918   |Mediterraneo (1991)                         |4     |
|150   |Leaving Las Vegas (1995)                    |5     |
|716   |Raging Bull (1980)                          |3     |
|706   |Liar Liar (1997)                            |4     |
+------+--------------------------------------------+------+
only showing top 10 rows



In [24]:
# The users who have rated the most movies
df.groupBy('userId').count().orderBy('count',ascending=False).show(10,False)

+------+-----+
|userId|count|
+------+-----+
|405   |737  |
|655   |685  |
|13    |636  |
|450   |540  |
|276   |518  |
|416   |493  |
|537   |490  |
|303   |484  |
|234   |480  |
|393   |448  |
+------+-----+
only showing top 10 rows



In [25]:
#The users who have rated the fewest movies
df.groupBy('userId').count().orderBy('count',ascending=True).show(10,False)

+------+-----+
|userId|count|
+------+-----+
|732   |20   |
|636   |20   |
|631   |20   |
|685   |20   |
|93    |20   |
|596   |20   |
|572   |20   |
|34    |20   |
|926   |20   |
|300   |20   |
+------+-----+
only showing top 10 rows



In [48]:
# Movies rated the most number of times 
df.groupBy('title').count().orderBy('count',ascending=False).show(10,False)

+-----------------------------+-----+
|title                        |count|
+-----------------------------+-----+
|Star Wars (1977)             |583  |
|Contact (1997)               |509  |
|Fargo (1996)                 |508  |
|Return of the Jedi (1983)    |507  |
|Liar Liar (1997)             |485  |
|English Patient, The (1996)  |481  |
|Scream (1996)                |478  |
|Toy Story (1995)             |452  |
|Air Force One (1997)         |431  |
|Independence Day (ID4) (1996)|429  |
+-----------------------------+-----+
only showing top 10 rows



In [49]:
# Movies rated the least number of times 
df.groupBy('title').count().orderBy('count',ascending=True).show(10,False)

+-----------------------------------------+-----+
|title                                    |count|
+-----------------------------------------+-----+
|Aiqing wansui (1994)                     |1    |
|Leopard Son, The (1996)                  |1    |
|Mad Dog Time (1996)                      |1    |
|Fear, The (1995)                         |1    |
|Lashou shentan (1992)                    |1    |
|Target (1995)                            |1    |
|JLG/JLG - autoportrait de décembre (1994)|1    |
|Vie est belle, La (Life is Rosey) (1987) |1    |
|Modern Affair, A (1995)                  |1    |
|Next Step, The (1995)                    |1    |
+-----------------------------------------+-----+
only showing top 10 rows



#### Feature Engineering: Transform movie title to numeric

In [50]:
# import String indexer to convert string values to numeric values
from pyspark.ml.feature import StringIndexer,IndexToString

In [29]:
#creating string indexer to convert the movie title column values into numerical values
stringIndexer = StringIndexer(inputCol="title", outputCol="title_new")

In [30]:
#applying stringindexer object on dataframe movie title column
model = stringIndexer.fit(df)

In [31]:
#creating new dataframe with transformed values
indexed = model.transform(df)

In [32]:
#validate the numerical title values
indexed.show(10)

+------+------------+------+---------+
|userId|       title|rating|title_new|
+------+------------+------+---------+
|   196|Kolya (1996)|     3|    287.0|
|    63|Kolya (1996)|     3|    287.0|
|   226|Kolya (1996)|     5|    287.0|
|   154|Kolya (1996)|     3|    287.0|
|   306|Kolya (1996)|     5|    287.0|
|   296|Kolya (1996)|     4|    287.0|
|    34|Kolya (1996)|     5|    287.0|
|   271|Kolya (1996)|     4|    287.0|
|   201|Kolya (1996)|     4|    287.0|
|   209|Kolya (1996)|     4|    287.0|
+------+------------+------+---------+
only showing top 10 rows



In [33]:
#number of times each numerical movie title has been rated 
indexed.groupBy('title_new').count().orderBy('count',ascending=False).show(10,False)

+---------+-----+
|title_new|count|
+---------+-----+
|0.0      |583  |
|1.0      |509  |
|2.0      |508  |
|3.0      |507  |
|4.0      |485  |
|5.0      |481  |
|6.0      |478  |
|7.0      |452  |
|8.0      |431  |
|9.0      |429  |
+---------+-----+
only showing top 10 rows
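
The counts above match the earlier per-title counts (583, 509, 508, ...) because StringIndexer, by default, orders labels by descending frequency, so the most-rated title receives index 0.0. The following is a minimal plain-Python sketch of that idea, not Spark's implementation. The function names `fit_labels`, `to_index`, and `to_title` are hypothetical, the toy titles list is made up for illustration, and the alphabetical tie-break is an assumption that may not match Spark.

```python
from collections import Counter

def fit_labels(titles):
    """Return labels ordered by descending frequency (ties broken alphabetically here)."""
    counts = Counter(titles)
    return sorted(counts, key=lambda t: (-counts[t], t))

def to_index(labels, title):
    """Forward mapping: title -> numeric index as a float, like the title_new column."""
    return float(labels.index(title))

def to_title(labels, index):
    """Inverse mapping: numeric index -> title, the role IndexToString plays later."""
    return labels[int(index)]

# Toy data (made up): "Star Wars (1977)" appears most often, so it gets index 0.0
titles = ["Star Wars (1977)", "Fargo (1996)", "Star Wars (1977)",
          "Contact (1997)", "Star Wars (1977)", "Fargo (1996)"]
labels = fit_labels(titles)
print(labels)
print(to_index(labels, "Fargo (1996)"), to_title(labels, 2.0))
```

Keeping the ordered label list around is what makes the inverse mapping possible once recommendations have been computed on the numeric indices.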



#### Split data into train and test:

In [34]:
#split the data into training and test datasets
train,test=indexed.randomSplit([0.75,0.25])

In [35]:
#count number of records in train set
train.count()

74945

In [36]:
#count number of records in test set
test.count()

25055
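
The split sizes (74,945 and 25,055) are close to, but not exactly, 75,000 and 25,000. That is expected, because a random split assigns each row to a split at random according to the weights, so the sizes are only approximately proportional. The sketch below illustrates this in plain Python, assuming independent per-row sampling. The function name `random_split` and the seed value are hypothetical choices for illustration.

```python
import random

def random_split(rows, train_weight=0.75, seed=42):
    """Assign each row independently to train or test with the given probability."""
    rng = random.Random(seed)
    train, test = [], []
    for row in rows:
        (train if rng.random() < train_weight else test).append(row)
    return train, test

rows = list(range(100_000))
train_rows, test_rows = random_split(rows)
# Sizes always sum to the total, but the train share is only approximately 75%
print(len(train_rows), len(test_rows))
```

Fixing the seed makes the split repeatable from run to run. PySpark's `randomSplit` also accepts a `seed` argument for the same purpose.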

#### Training the model:

In [51]:
#import ALS recommender function from pyspark ml library
from pyspark.ml.recommendation import ALS

In [52]:
#create the ALS recommender (estimator) with its hyperparameters; it is fitted on the train dataset in the next cell
rec=ALS(maxIter=10,regParam=0.01,userCol='userId',itemCol='title_new',ratingCol='rating',nonnegative=True,coldStartStrategy="drop")

In [53]:
#fit the model on train set
rec_model=rec.fit(train)

In [54]:
#making predictions on test set 
predicted_ratings=rec_model.transform(test)

In [55]:
#columns in predicted ratings dataframe
predicted_ratings.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- rating: integer (nullable = true)
 |-- title_new: double (nullable = false)
 |-- prediction: float (nullable = false)
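
With a random split, some movies or users in the test set may never appear in the training set (for example, the titles that have only one rating). ALS has no learned factors for them, and `coldStartStrategy="drop"` removes those rows from the output of `transform` instead of returning NaN predictions. The sketch below illustrates that behavior in plain Python, assuming a prediction is the dot product of a user factor vector and an item factor vector. The factor values are invented and the names `predict` and `transform` are hypothetical stand-ins, not the Spark API.

```python
import math

# Invented rank-2 factors for illustration; a fitted ALS model learns these from data
user_factors = {1: [0.9, 1.2], 2: [1.5, 0.3]}
item_factors = {10.0: [1.0, 2.0], 11.0: [2.0, 0.5]}

def predict(user_id, item_id):
    """Dot product of the two factor vectors, or NaN if either id was never seen."""
    u = user_factors.get(user_id)
    v = item_factors.get(item_id)
    if u is None or v is None:
        return math.nan
    return sum(a * b for a, b in zip(u, v))

def transform(pairs, cold_start_strategy="drop"):
    """Score (user, item) pairs; with "drop", remove rows whose prediction is NaN."""
    scored = [(u, i, predict(u, i)) for u, i in pairs]
    if cold_start_strategy == "drop":
        scored = [row for row in scored if not math.isnan(row[2])]
    return scored

# User 3 and item 99.0 were not in the (invented) training data
test_pairs = [(1, 10.0), (2, 11.0), (3, 10.0), (1, 99.0)]
kept = transform(test_pairs)
print(kept)
```

One consequence is that any evaluation metric computed afterwards covers only the rows the model could actually score.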



In [56]:
#predicted vs actual ratings for test set 
predicted_ratings.orderBy(rand()).show(10)

+------+--------------------+------+---------+----------+
|userId|               title|rating|title_new|prediction|
+------+--------------------+------+---------+----------+
|   640|Apocalypse Now (1...|     5|     91.0|  4.938202|
|    23|Mars Attacks! (1996)|     1|    100.0|  3.057589|
|   880|Sleepless in Seat...|     3|    103.0| 3.4481475|
|   303|      Alien 3 (1992)|     4|    337.0| 2.0629964|
|    26|        Nixon (1995)|     3|    372.0| 2.6245914|
|   503|    Chinatown (1974)|     5|    212.0| 4.1231394|
|    11|Michael Collins (...|     4|    362.0|  3.506594|
|   741|  French Kiss (1995)|     4|    399.0| 2.7031903|
|   416|Higher Learning (...|     3|    795.0|  4.323633|
|   339|Wizard of Oz, The...|     5|     70.0|  4.281655|
+------+--------------------+------+---------+----------+
only showing top 10 rows



#### Measuring Model Performance:

In [77]:
#add the absolute error between predicted and actual rating as a new column
predicted_ratings_witherr=predicted_ratings.withColumn('err',abs(predicted_ratings.prediction - predicted_ratings.rating))


predicted_ratings_witherr.show()


+------+--------------------+------+---------+----------+-----------+
|userId|               title|rating|title_new|prediction|        err|
+------+--------------------+------+---------+----------+-----------+
|   593|That Thing You Do...|     3|    148.0| 4.2289877|  1.2289877|
|   193|That Thing You Do...|     4|    148.0| 2.8238962|  1.1761038|
|   642|That Thing You Do...|     5|    148.0| 3.3591642|  1.6408358|
|   101|That Thing You Do...|     3|    148.0| 3.7438922|  0.7438922|
|   406|That Thing You Do...|     3|    148.0| 3.1977828| 0.19778275|
|   332|That Thing You Do...|     3|    148.0| 4.4566007|  1.4566007|
|   271|That Thing You Do...|     1|    148.0| 3.4159834|  2.4159834|
|   388|That Thing You Do...|     4|    148.0|  4.574866|  0.5748658|
|   360|That Thing You Do...|     3|    148.0| 3.0426192| 0.04261923|
|   416|That Thing You Do...|     4|    148.0|   4.31872| 0.31871986|
|   665|That Thing You Do...|     4|    148.0|  3.318143|  0.6818571|
|   435|That Thing Y

In [83]:
#number of records for each rating value in the full dataset
df.groupBy('rating').count().orderBy('rating',ascending=True).show()


+------+-----+
|rating|count|
+------+-----+
|     1| 6110|
|     2|11370|
|     3|27145|
|     4|34174|
|     5|21201|
+------+-----+



In [78]:
#mean absolute error for each actual rating value
predicted_ratings_witherr.groupBy('rating').agg({'err':'mean'}).orderBy('rating',ascending=True).show()

+------+------------------+
|rating|          avg(err)|
+------+------------------+
|     1|1.5874335425510455|
|     2|1.0766927925360656|
|     3|0.6339440821210108|
|     4| 0.571613173802319|
|     5|0.9554858117940743|
+------+------------------+
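
The average error is smallest for ratings 3 and 4, which are also the most frequent ratings in the dataset, and largest for rating 1, the least frequent. The grouping itself can be sketched in plain Python. The (rating, prediction) pairs below are copied from the first eleven sample rows of `predicted_ratings_witherr` shown earlier, so the resulting averages describe only that small sample, not the full test set. The function name `mean_abs_error_by_rating` is hypothetical.

```python
from collections import defaultdict

# (rating, prediction) pairs copied from the sample output shown earlier
rows = [
    (3, 4.2289877), (4, 2.8238962), (5, 3.3591642), (3, 3.7438922),
    (3, 3.1977828), (3, 4.4566007), (1, 3.4159834), (4, 4.574866),
    (3, 3.0426192), (4, 4.31872), (4, 3.318143),
]

def mean_abs_error_by_rating(rows):
    """Group absolute errors by the actual rating and average each group."""
    errors = defaultdict(list)
    for rating, prediction in rows:
        errors[rating].append(abs(prediction - rating))
    return {rating: sum(errs) / len(errs) for rating, errs in sorted(errors.items())}

mae_by_rating = mean_abs_error_by_rating(rows)
for rating, err in mae_by_rating.items():
    print(rating, round(err, 4))
```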



In [57]:
#importing Regression Evaluator to measure RMSE
from pyspark.ml.evaluation import RegressionEvaluator

In [58]:
#create RegressionEvaluator object for measuring RMSE
evaluator=RegressionEvaluator(metricName='rmse',predictionCol='prediction',labelCol='rating')

In [59]:
#apply the RE on predictions dataframe to calculate RMSE
rmse=evaluator.evaluate(predicted_ratings)

In [60]:
#print RMSE error
print(rmse)

1.028274688597927
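
An RMSE of about 1.03 means the predictions miss the actual rating by roughly one star in the root-mean-square sense. RMSE squares each error before averaging, so large misses weigh more than they do in the mean absolute error. The sketch below shows the difference on a handful of invented (label, prediction) pairs. The helper names `rmse` and `mae` are hypothetical and are not the evaluator's API.

```python
import math

def rmse(pairs):
    """Root mean squared error over (label, prediction) pairs."""
    return math.sqrt(sum((p - y) ** 2 for y, p in pairs) / len(pairs))

def mae(pairs):
    """Mean absolute error over the same pairs."""
    return sum(abs(p - y) for y, p in pairs) / len(pairs)

# Invented pairs: three small misses of 0.5 and one large miss of 2.5
pairs = [(4, 4.5), (3, 2.5), (5, 4.5), (1, 3.5)]
print(rmse(pairs), mae(pairs))
```

The single large miss pulls the RMSE above the mean absolute error, which is why RMSE is a stricter metric when a few predictions are far off.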


#### Recommending movies: 

In [91]:
#create dataset of all distinct movies 
unique_movies=indexed.select('title_new').distinct()

In [92]:
#number of unique movies
unique_movies.count()

1664

In [93]:
#assigning alias name 'a' to unique movies df
a = unique_movies.alias('a')

In [110]:
user_id=100

In [111]:
#creating another dataframe which contains the movies already rated by the active user 
watched_movies=indexed.filter(indexed['userId'] == user_id).select('title_new').distinct()

In [112]:
#number of movies already rated 
watched_movies.count()

59

In [114]:
#assigning alias name 'b' to watched movies df
b=watched_movies.alias('b')

In [115]:
#left join all movies with the movies the active user has rated
total_movies = a.join(b, a.title_new == b.title_new,how='left')


In [116]:
total_movies.show(10,False)

+---------+---------+
|title_new|title_new|
+---------+---------+
|558.0    |null     |
|305.0    |null     |
|299.0    |null     |
|596.0    |null     |
|769.0    |null     |
|934.0    |null     |
|496.0    |null     |
|1051.0   |null     |
|692.0    |null     |
|810.0    |null     |
+---------+---------+
only showing top 10 rows



In [117]:
#selecting movies which active user is yet to rate or watch
remaining_movies=total_movies.where(col("b.title_new").isNull()).select(a.title_new).distinct()

In [118]:
#number of movies user is yet to rate 
remaining_movies.count()

1605
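
The count follows from the earlier numbers: 1664 distinct movies minus the 59 that user 100 has rated leaves 1605. The left join followed by the null filter is, in effect, a set difference. The sketch below shows that logic in plain Python. Which 59 movies the user rated is not reproduced here, so the `watched` set is a placeholder built from the first 59 indices.

```python
# Indices 0..1663 stand in for the 1664 distinct title_new values
all_movies = {float(i) for i in range(1664)}

# Placeholder: the actual 59 movies rated by user 100 are not reproduced here
watched = {float(i) for i in range(59)}

# Equivalent of "left join, then keep rows where the right side is null"
remaining = all_movies - watched
print(len(remaining))
```

In PySpark, the `left_anti` join type expresses the same operation in a single step, without the separate null filter.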

In [119]:
#adding new column of userId of active user to remaining movies df 
remaining_movies=remaining_movies.withColumn("userId",lit(int(user_id)))


In [120]:
remaining_movies.show(10,False)

+---------+------+
|title_new|userId|
+---------+------+
|558.0    |100   |
|305.0    |100   |
|299.0    |100   |
|596.0    |100   |
|769.0    |100   |
|934.0    |100   |
|496.0    |100   |
|1051.0   |100   |
|692.0    |100   |
|810.0    |100   |
+---------+------+
only showing top 10 rows



In [121]:
#predicting ratings for the remaining movies with the ALS recommender model, highest predictions first
recommendations=rec_model.transform(remaining_movies).orderBy('prediction',ascending=False)

In [122]:
recommendations.show(5,False)

+---------+------+----------+
|title_new|userId|prediction|
+---------+------+----------+
|1141.0   |100   |5.3275824 |
|1372.0   |100   |5.1144414 |
|1266.0   |100   |5.0234303 |
|1233.0   |100   |4.813241  |
|1178.0   |100   |4.803429  |
+---------+------+----------+
only showing top 5 rows



In [123]:
#converting title_new values back to movie titles
movie_title = IndexToString(inputCol="title_new", outputCol="title",labels=model.labels)

final_recommendations=movie_title.transform(recommendations)


In [124]:
final_recommendations.show(10,False)

+---------+------+----------+------------------------------+
|title_new|userId|prediction|title                         |
+---------+------+----------+------------------------------+
|1141.0   |100   |5.3275824 |Unhook the Stars (1996)       |
|1372.0   |100   |5.1144414 |Schizopolis (1996)            |
|1266.0   |100   |5.0234303 |Hollow Reed (1996)            |
|1233.0   |100   |4.813241  |Man of No Importance, A (1994)|
|1178.0   |100   |4.803429  |Little Princess, The (1939)   |
|82.0     |100   |4.7329516 |It's a Wonderful Life (1946)  |
|10.0     |100   |4.703681  |Raiders of the Lost Ark (1981)|
|266.0    |100   |4.6893234 |Great Escape, The (1963)      |
|1433.0   |100   |4.636163  |Boys, Les (1997)              |
|329.0    |100   |4.6015935 |Miracle on 34th Street (1994) |
+---------+------+----------+------------------------------+
only showing top 10 rows



#### Wrapping everything into a function for reuse:

In [125]:
#create function to recommend top 'n' movies to any particular user
def top_movies(user_id,n):
    """
    Show the top 'n' movies that the user has not rated yet but might like,
    ranked by the rating predicted by the ALS model.
    """
    #assigning alias name 'a' to unique movies df
    a = unique_movies.alias('a')
    
    #creating another dataframe which contains the movies already rated by the active user 
    watched_movies=indexed.filter(indexed['userId'] == user_id).select('title_new')
    
    #assigning alias name 'b' to watched movies df
    b=watched_movies.alias('b')
    
    #left join all movies with the movies the active user has rated
    total_movies = a.join(b, a.title_new == b.title_new,how='left')
    
    #selecting movies which active user is yet to rate or watch
    remaining_movies=total_movies.where(col("b.title_new").isNull()).select(a.title_new).distinct()
    
    #adding new column of userId of active user to remaining movies df 
    remaining_movies=remaining_movies.withColumn("userId",lit(int(user_id)))
    
    #making recommendations using ALS recommender model and selecting only top 'n' movies
    recommendations=rec_model.transform(remaining_movies).orderBy('prediction',ascending=False).limit(n)
    
    #adding column of movie titles to recommendations
    movie_title = IndexToString(inputCol="title_new", outputCol="title",labels=model.labels)
    final_recommendations=movie_title.transform(recommendations)
    
    #display the recommendations for the active user (show() prints the table and returns None)
    final_recommendations.show(n,False)

In [126]:
top_movies(100,10)

+---------+------+----------+------------------------------+
|title_new|userId|prediction|title                         |
+---------+------+----------+------------------------------+
|1141.0   |100   |5.3275824 |Unhook the Stars (1996)       |
|1372.0   |100   |5.1144414 |Schizopolis (1996)            |
|1266.0   |100   |5.0234303 |Hollow Reed (1996)            |
|1233.0   |100   |4.813241  |Man of No Importance, A (1994)|
|1178.0   |100   |4.803429  |Little Princess, The (1939)   |
|82.0     |100   |4.7329516 |It's a Wonderful Life (1946)  |
|10.0     |100   |4.703681  |Raiders of the Lost Ark (1981)|
|266.0    |100   |4.6893234 |Great Escape, The (1963)      |
|1433.0   |100   |4.636163  |Boys, Les (1997)              |
|329.0    |100   |4.6015935 |Miracle on 34th Street (1994) |
+---------+------+----------+------------------------------+
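
The `orderBy('prediction',ascending=False).limit(n)` step inside `top_movies` is a top-n selection over the predicted ratings, followed by a lookup of each index in the label list. A plain-Python sketch of those two steps is shown below. The scores and movie names are invented, `labels` plays the role of `model.labels`, and the function name `top_n` is hypothetical.

```python
import heapq

def top_n(predictions, labels, n):
    """Return the n highest-scoring (title, score) pairs, best first."""
    best = heapq.nlargest(n, predictions.items(), key=lambda kv: kv[1])
    return [(labels[int(idx)], score) for idx, score in best]

# Invented scores keyed by title index
labels = ["Movie A", "Movie B", "Movie C", "Movie D"]
predictions = {0.0: 3.2, 1.0: 4.8, 2.0: 4.1, 3.0: 2.5}
print(top_n(predictions, labels, 2))
```

Returning the ranked list instead of only printing it lets the caller reuse the recommendations, for example to filter or store them.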



In [127]:
top_movies(405,10)

+---------+------+----------+-------------------------------------------------+
|title_new|userId|prediction|title                                            |
+---------+------+----------+-------------------------------------------------+
|1166.0   |405   |4.228817  |In the Realm of the Senses (Ai no corrida) (1976)|
|1005.0   |405   |4.2038856 |Haunted World of Edward D. Wood Jr., The (1995)  |
|1113.0   |405   |4.1378055 |Wonderland (1997)                                |
|698.0    |405   |3.897505  |Charade (1963)                                   |
|946.0    |405   |3.8461444 |Gridlock'd (1997)                                |
|478.0    |405   |3.806411  |Bringing Up Baby (1938)                          |
|768.0    |405   |3.6552014 |Fast, Cheap & Out of Control (1997)              |
|297.0    |405   |3.6219516 |Rosewood (1997)                                  |
|1321.0   |405   |3.6044958 |Chasers (1994)                                   |
|1233.0   |405   |3.5489368 |Man of No I