## Recommender System with PySpark

### Data Info
The dataset used in this chapter is a subset of the well-known, publicly available MovieLens dataset. It contains 100,000 ratings in three columns (`userId`, `title`, `rating`). We will train the recommender model on 75% of the ratings and test it on the remaining 25%.

In [1]:
#import SparkSession and create a session object
from pyspark.sql import SparkSession 
spark=SparkSession.builder.appName('rc').getOrCreate()


In [2]:
#import the Spark SQL functions used in this notebook (note: this abs shadows Python's built-in abs)
from pyspark.sql.functions import abs, col, lit, rand

#### Read in the data

In [3]:
#load the dataset and create a Spark DataFrame
df=spark.read.csv('movie_ratings_df.csv',inferSchema=True,header=True)

In [4]:
#validate the shape of the data 
print((df.count(),len(df.columns)))

(100000, 3)


In [5]:
#check columns in dataframe
df.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- rating: integer (nullable = true)



The DataFrame has three columns: `userId` and `rating` are numeric, while `title` is categorical. PySpark's ALS recommender requires numeric, integer-valued user and item ID columns, so we will convert the movie titles to numeric indices in the feature-engineering step.

In [6]:
#inspect a few rows of the dataframe in random order
df.orderBy(rand()).show(10,False)

+------+---------------------------------+------+
|userId|title                            |rating|
+------+---------------------------------+------+
|213   |Rear Window (1954)               |5     |
|604   |Abyss, The (1989)                |4     |
|375   |Mute Witness (1994)              |3     |
|697   |Time to Kill, A (1996)           |4     |
|711   |Heathers (1989)                  |4     |
|447   |Aliens (1986)                    |4     |
|394   |Terminator 2: Judgment Day (1991)|5     |
|267   |Bram Stoker's Dracula (1992)     |4     |
|676   |Scream (1996)                    |1     |
|816   |Liar Liar (1997)                 |5     |
+------+---------------------------------+------+
only showing top 10 rows



In [7]:
# users who have rated the most movies
df.groupBy('userId').count().orderBy('count',ascending=False).show(10,False)

+------+-----+
|userId|count|
+------+-----+
|405   |737  |
|655   |685  |
|13    |636  |
|450   |540  |
|276   |518  |
|416   |493  |
|537   |490  |
|303   |484  |
|234   |480  |
|393   |448  |
+------+-----+
only showing top 10 rows



In [8]:
# users who have rated the fewest movies
df.groupBy('userId').count().orderBy('count',ascending=True).show(10,False)

+------+-----+
|userId|count|
+------+-----+
|19    |20   |
|143   |20   |
|309   |20   |
|34    |20   |
|202   |20   |
|732   |20   |
|441   |20   |
|685   |20   |
|824   |20   |
|631   |20   |
+------+-----+
only showing top 10 rows



In [9]:
# Movies rated the most number of times 
df.groupBy('title').count().orderBy('count',ascending=False).show(10,False)

+-----------------------------+-----+
|title                        |count|
+-----------------------------+-----+
|Star Wars (1977)             |583  |
|Contact (1997)               |509  |
|Fargo (1996)                 |508  |
|Return of the Jedi (1983)    |507  |
|Liar Liar (1997)             |485  |
|English Patient, The (1996)  |481  |
|Scream (1996)                |478  |
|Toy Story (1995)             |452  |
|Air Force One (1997)         |431  |
|Independence Day (ID4) (1996)|429  |
+-----------------------------+-----+
only showing top 10 rows



In [10]:
# Movies rated the least number of times 
df.groupBy('title').count().orderBy('count',ascending=True).show(10,False)

+------------------------------------------+-----+
|title                                     |count|
+------------------------------------------+-----+
|Modern Affair, A (1995)                   |1    |
|Tigrero: A Film That Was Never Made (1994)|1    |
|JLG/JLG - autoportrait de décembre (1994) |1    |
|Target (1995)                             |1    |
|Vie est belle, La (Life is Rosey) (1987)  |1    |
|Fear, The (1995)                          |1    |
|Next Step, The (1995)                     |1    |
|Mad Dog Time (1996)                       |1    |
|Leopard Son, The (1996)                   |1    |
|Lashou shentan (1992)                     |1    |
+------------------------------------------+-----+
only showing top 10 rows



#### Feature Engineering: Transform movie titles to numeric

In [11]:
# import String indexer to convert string values to numeric values
from pyspark.ml.feature import StringIndexer,IndexToString

In [12]:
#creating string indexer to convert the movie title column values into numerical values
stringIndexer = StringIndexer(inputCol="title", outputCol="title_new")

In [13]:
#fitting the StringIndexer on the title column (learns the title-to-index mapping)
model = stringIndexer.fit(df)

In [14]:
#creating new dataframe with transformed values
indexed = model.transform(df)

In [15]:
#validate the numerical title values
indexed.show(10)

+------+------------+------+---------+
|userId|       title|rating|title_new|
+------+------------+------+---------+
|   196|Kolya (1996)|     3|    287.0|
|    63|Kolya (1996)|     3|    287.0|
|   226|Kolya (1996)|     5|    287.0|
|   154|Kolya (1996)|     3|    287.0|
|   306|Kolya (1996)|     5|    287.0|
|   296|Kolya (1996)|     4|    287.0|
|    34|Kolya (1996)|     5|    287.0|
|   271|Kolya (1996)|     4|    287.0|
|   201|Kolya (1996)|     4|    287.0|
|   209|Kolya (1996)|     4|    287.0|
+------+------------+------+---------+
only showing top 10 rows



In [16]:
#number of times each numerical movie title has been rated 
indexed.groupBy('title_new').count().orderBy('count',ascending=False).show(10,False)

+---------+-----+
|title_new|count|
+---------+-----+
|0.0      |583  |
|1.0      |509  |
|2.0      |508  |
|3.0      |507  |
|4.0      |485  |
|5.0      |481  |
|6.0      |478  |
|7.0      |452  |
|8.0      |431  |
|9.0      |429  |
+---------+-----+
only showing top 10 rows
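These counts match the earlier per-title counts: index 0.0 has 583 ratings, exactly like Star Wars (1977). That is because `StringIndexer` orders labels by descending frequency by default, so the most-rated title receives index 0. The sketch below reproduces the idea in plain Python on a toy list of titles, together with the reverse lookup that `IndexToString` performs later. It is an illustration, not Spark's implementation; the helper names `fit_index` and `index_to_string` are hypothetical, and the alphabetical tie-breaking is an assumption modelled on what recent Spark versions document for equal frequencies.

```python
from collections import Counter

def fit_index(values):
    """Map each distinct string to a float index, most frequent first (ties broken alphabetically)."""
    counts = Counter(values)
    labels = sorted(counts, key=lambda s: (-counts[s], s))  # position in this list is the index
    mapping = {s: float(i) for i, s in enumerate(labels)}
    return mapping, labels

def index_to_string(index, labels):
    """Reverse lookup, analogous to IndexToString with labels=model.labels."""
    return labels[int(index)]

# Toy titles (not the real data): Star Wars appears 3 times, Fargo 2, Contact 1
titles = ["Star Wars (1977)", "Fargo (1996)", "Star Wars (1977)",
          "Contact (1997)", "Fargo (1996)", "Star Wars (1977)"]
mapping, labels = fit_index(titles)
encoded = [mapping[t] for t in titles]
print(encoded)
print(index_to_string(1.0, labels))
```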



#### Split data into train and test:

In [17]:
#split the data into training and test datasets
train,test=indexed.randomSplit([0.75,0.25])

In [18]:
#count number of records in train set
train.count()

75106

In [19]:
#count number of records in test set
test.count()

24894
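The two counts (75,106 and 24,894) do not split the 100,000 rows exactly 75/25. `randomSplit` assigns rows to splits at random according to the weights, so split sizes are only approximately proportional, and without its `seed` argument they change from run to run. The plain-Python sketch below mimics that behaviour with an independent random draw per row; `bernoulli_split` is a hypothetical helper and is not how Spark implements `randomSplit` internally.

```python
import random

def bernoulli_split(rows, weights, seed=None):
    """Assign each row to one split with probability proportional to its weight."""
    rng = random.Random(seed)
    total = sum(weights)
    # Cumulative boundaries, e.g. [0.75, 1.0] for weights [0.75, 0.25]
    bounds, acc = [], 0.0
    for w in weights:
        acc += w / total
        bounds.append(acc)
    splits = [[] for _ in weights]
    for row in rows:
        x = rng.random()
        for i, b in enumerate(bounds):
            if x < b:
                splits[i].append(row)
                break
        else:
            splits[-1].append(row)  # guard against floating-point round-off in the last bound
    return splits

train_rows, test_rows = bernoulli_split(range(100_000), [0.75, 0.25], seed=42)
# Sizes are close to, but usually not exactly, 75000 and 25000
print(len(train_rows), len(test_rows))
```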

#### Training the model:

In [20]:
#import ALS recommender function from pyspark ml library
from pyspark.ml.recommendation import ALS

In [21]:
#configuring the ALS recommender estimator (the model is trained on the train dataset in the next cell)
rec=ALS(maxIter=10,regParam=0.01,userCol='userId',itemCol='title_new',ratingCol='rating',nonnegative=True,coldStartStrategy="drop")

In [22]:
#fit the model on train set
rec_model=rec.fit(train)
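`rec.fit(train)` learns a low-rank factorization: every user and every movie gets a vector of latent factors, and a predicted rating is the dot product of the two vectors. ALS alternates between solving for all user vectors with the movie vectors held fixed and vice versa, and each half-step is a regularized least-squares problem. The stdlib-only sketch below shows that loop with rank 2 on a toy ratings list. The data, the helper names and the plain, unscaled regularization weight `lam` are assumptions for illustration; it omits the `nonnegative` constraint, data blocking and other details of Spark's implementation.

```python
import math
import random

RANK = 2  # Spark's ALS defaults to rank 10; 2 keeps this sketch's algebra small

def dot(x, y):
    return sum(p * q for p, q in zip(x, y))

def solve_2x2(a, b):
    """Solve a @ x = b for a 2x2 system using Cramer's rule."""
    det = a[0][0] * a[1][1] - a[0][1] * a[1][0]
    return [(b[0] * a[1][1] - a[0][1] * b[1]) / det,
            (a[0][0] * b[1] - b[0] * a[1][0]) / det]

def least_squares_update(fixed, observed, lam):
    """Minimize sum (r - x.v)^2 + lam*|x|^2 over x, with the other side's vectors fixed."""
    a = [[lam, 0.0], [0.0, lam]]  # lam * I keeps the normal equations positive definite
    b = [0.0, 0.0]
    for other_id, r in observed:
        v = fixed[other_id]
        for p in range(RANK):
            b[p] += r * v[p]
            for q in range(RANK):
                a[p][q] += v[p] * v[q]
    return solve_2x2(a, b)

def objective(ratings, U, V, lam):
    sq_err = sum((r - dot(U[u], V[i])) ** 2 for u, i, r in ratings)
    reg = sum(dot(x, x) for x in U.values()) + sum(dot(y, y) for y in V.values())
    return sq_err + lam * reg

def als(ratings, n_iter=10, lam=0.01, seed=0):
    rng = random.Random(seed)
    by_user, by_item = {}, {}
    for u, i, r in ratings:
        by_user.setdefault(u, []).append((i, r))
        by_item.setdefault(i, []).append((u, r))
    U = {u: [rng.random() for _ in range(RANK)] for u in by_user}
    V = {i: [rng.random() for _ in range(RANK)] for i in by_item}
    history = [objective(ratings, U, V, lam)]
    for _ in range(n_iter):
        for u, obs in by_user.items():  # user step: movie vectors fixed
            U[u] = least_squares_update(V, obs, lam)
        for i, obs in by_item.items():  # movie step: user vectors fixed
            V[i] = least_squares_update(U, obs, lam)
        history.append(objective(ratings, U, V, lam))
    return U, V, history

# Toy (userId, movieId, rating) triples, not taken from the MovieLens data
ratings = [(1, "x", 2.0), (1, "y", 1.5), (1, "z", 2.0),
           (2, "x", 4.0), (2, "y", 3.0), (2, "z", 4.0),
           (3, "x", 3.0), (3, "y", 2.25),
           (4, "x", 5.0), (4, "z", 5.0)]
U, V, history = als(ratings, n_iter=200)
train_rmse = math.sqrt(sum((r - dot(U[u], V[i])) ** 2 for u, i, r in ratings) / len(ratings))
print(round(train_rmse, 3))
print(round(dot(U[3], V["z"]), 2))  # prediction for a (user, movie) pair with no rating
```

Because each half-step solves its least-squares problem exactly, the regularized objective never increases from one iteration to the next, which is the property that makes the alternating scheme converge.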

In [23]:
#making predictions on test set 
predicted_ratings=rec_model.transform(test)
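Because the split is random, some movies (for example titles rated only once) can end up only in the test set, and the model has no factors for them. With `coldStartStrategy="drop"`, `transform` drops the rows whose prediction would otherwise be NaN, so `predicted_ratings` may contain fewer rows than `test` and the RMSE computed later is not NaN. The sketch below imitates that filtering in plain Python; `predict_or_nan` and the toy factor dictionaries are hypothetical.

```python
import math

# Hypothetical learned factors: movie 3.0 and user 999 never appeared in training
user_factors = {100: [1.2, 0.4], 148: [0.9, 1.1]}
item_factors = {0.0: [2.0, 1.5], 1.0: [1.0, 2.5]}

def predict_or_nan(user_id, item_id):
    """Dot product of the factors, or NaN if either side was unseen during training."""
    u = user_factors.get(user_id)
    v = item_factors.get(item_id)
    if u is None or v is None:
        return float("nan")
    return sum(a * b for a, b in zip(u, v))

test_rows = [(100, 0.0), (148, 1.0), (100, 3.0), (999, 0.0)]
scored = [(u, i, predict_or_nan(u, i)) for u, i in test_rows]
# coldStartStrategy="drop": keep only rows that received a real prediction
kept = [row for row in scored if not math.isnan(row[2])]
print(len(scored), len(kept))
```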

In [24]:
#columns in predicted ratings dataframe
predicted_ratings.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- rating: integer (nullable = true)
 |-- title_new: double (nullable = false)
 |-- prediction: float (nullable = false)



In [25]:
#predicted vs actual ratings for test set 
predicted_ratings.orderBy(rand()).show(10)

+------+--------------------+------+---------+----------+
|userId|               title|rating|title_new|prediction|
+------+--------------------+------+---------+----------+
|   468|      Amadeus (1984)|     4|     50.0| 4.9573684|
|   807|Return of the Jed...|     5|      3.0| 4.6208773|
|   141|      Kingpin (1996)|     4|    182.0|  4.202782|
|   419|Return of the Jed...|     4|      3.0| 4.1473756|
|   497|Event Horizon (1997)|     4|    254.0| 2.5687022|
|   703|      Twister (1996)|     5|     43.0|  3.632878|
|   178|Graduate, The (1967)|     2|     78.0| 3.6445382|
|   336|       Grease (1978)|     2|    168.0| 2.8548625|
|   429|Strictly Ballroom...|     4|    320.0| 3.8368697|
|    18|Schindler's List ...|     5|     36.0|  4.466964|
+------+--------------------+------+---------+----------+
only showing top 10 rows



#### Measuring Model Performance:

In [26]:
#absolute error between the predicted and the actual rating for each test row
predicted_ratings_witherr=predicted_ratings.withColumn('err',abs(predicted_ratings.prediction - predicted_ratings.rating))

predicted_ratings_witherr.show()


+------+--------------------+------+---------+----------+----------+
|userId|               title|rating|title_new|prediction|       err|
+------+--------------------+------+---------+----------+----------+
|   148|Beauty and the Be...|     4|    114.0|  4.666982| 0.6669822|
|   148|  Being There (1979)|     5|    290.0|  4.806283|  0.193717|
|   148| Blade Runner (1982)|     5|     52.0|  4.684685|0.31531477|
|   148|     Fantasia (1940)|     5|    153.0| 2.7503147| 2.2496853|
|   148|   Free Willy (1993)|     1|    761.0| 5.2160954| 4.2160954|
|   148|Grand Day Out, A ...|     4|    494.0|  4.606607|0.60660696|
|   148|       Hamlet (1996)|     2|    373.0| 4.2836447| 2.2836447|
|   148|Homeward Bound: T...|     1|    533.0| 4.6918488| 3.6918488|
|   148|Lion King, The (1...|     5|     93.0|  4.766523|0.23347712|
|   148|      Othello (1995)|     3|    453.0|  1.981492|  1.018508|
|   148|Pink Floyd - The ...|     5|    298.0|  3.150952| 1.8490479|
|   148| Pulp Fiction (1994)|     

In [27]:
df.groupBy('rating').count().orderBy('rating',ascending=True).show()


+------+-----+
|rating|count|
+------+-----+
|     1| 6110|
|     2|11370|
|     3|27145|
|     4|34174|
|     5|21201|
+------+-----+
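Ratings are concentrated at 3 and 4. These five counts are enough to compute the global mean rating and the RMSE of a naive baseline that predicts that mean for every rating, which gives a point of comparison for the model's RMSE. The sketch computes both over all 100,000 ratings rather than over the test set only, so the baseline is approximate.

```python
import math

# Rating counts copied from the groupBy('rating') output above
counts = {1: 6110, 2: 11370, 3: 27145, 4: 34174, 5: 21201}

n = sum(counts.values())
mean = sum(r * c for r, c in counts.items()) / n
# RMSE of always predicting the mean = population standard deviation of the ratings
variance = sum(c * (r - mean) ** 2 for r, c in counts.items()) / n
baseline_rmse = math.sqrt(variance)
print(n, round(mean, 5), round(baseline_rmse, 3))
```

On these counts the mean rating is about 3.53 and the constant-mean baseline RMSE is about 1.13, so an ALS test RMSE of roughly 1.03 is an improvement over the baseline, but a modest one.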



In [28]:
predicted_ratings_witherr.groupBy('rating').agg({'err':'mean'}).orderBy('rating',ascending=True).show()

+------+------------------+
|rating|          avg(err)|
+------+------------------+
|     1|1.6221745321625158|
|     2|1.0483186148732349|
|     3|0.6372452690720227|
|     4|0.5694578637062283|
|     5|0.9588863216076843|
+------+------------------+
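The mean absolute error is largest for 1-star ratings (about 1.62) and smallest for 4-star ratings (about 0.57), so in this test set the model misses low ratings by far more than the common 3- and 4-star ones. The `groupBy('rating').agg({'err':'mean'})` call is an ordinary group-by average; the plain-Python equivalent below runs it over the eleven fully visible rows for user 148 in the error table above. It is a small sample, so its per-rating averages differ from the full test set.

```python
from collections import defaultdict

# (rating, prediction) pairs for user 148, copied from the error table above
rows = [(4, 4.666982), (5, 4.806283), (5, 4.684685), (5, 2.7503147),
        (1, 5.2160954), (4, 4.606607), (2, 4.2836447), (1, 4.6918488),
        (5, 4.766523), (3, 1.981492), (5, 3.150952)]

errors_by_rating = defaultdict(list)
for rating, prediction in rows:
    errors_by_rating[rating].append(abs(prediction - rating))  # same as the 'err' column

# Mean absolute error per rating value, ordered by rating
mean_err = {r: sum(e) / len(e) for r, e in sorted(errors_by_rating.items())}
for r, m in mean_err.items():
    print(r, round(m, 4))
```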



In [29]:
#importing Regression Evaluator to measure RMSE
from pyspark.ml.evaluation import RegressionEvaluator

In [30]:
#create a RegressionEvaluator object to measure RMSE (root mean squared error)
evaluator=RegressionEvaluator(metricName='rmse',predictionCol='prediction',labelCol='rating')

In [31]:
#apply the evaluator to the predictions dataframe to calculate RMSE
rmse=evaluator.evaluate(predicted_ratings)

In [32]:
#print RMSE error
print(rmse)

1.0312237329706802
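With `metricName='rmse'`, `RegressionEvaluator` returns the square root of the mean squared difference between `prediction` and `rating`, so an RMSE of about 1.03 means predictions are typically off by roughly one star, with large misses weighted more heavily. Because errors are squared before averaging, RMSE is never smaller than the mean absolute error (which the evaluator also offers as `metricName='mae'`). The plain-Python sketch below uses two hypothetical error lists with the same MAE to show how a single large miss raises RMSE.

```python
import math

def rmse(errors):
    """Root mean squared error of a list of (prediction - rating) differences."""
    return math.sqrt(sum(e * e for e in errors) / len(errors))

def mae(errors):
    """Mean absolute error of the same differences."""
    return sum(abs(e) for e in errors) / len(errors)

# Hypothetical prediction errors (prediction - rating) for four ratings each
even_misses = [1.0, -1.0, 1.0, -1.0]
one_big_miss = [0.0, 0.0, 0.0, 4.0]

for name, errs in [("even", even_misses), ("one big", one_big_miss)]:
    print(name, "MAE =", mae(errs), "RMSE =", rmse(errs))
```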


#### Recommending movies: 

In [33]:
#create dataset of all distinct movies 
unique_movies=indexed.select('title_new').distinct()

In [34]:
#number of unique movies
unique_movies.count()

1664

In [35]:
#assigning alias name 'a' to unique movies df
a = unique_movies.alias('a')

In [36]:
user_id=100

In [37]:
#creating another dataframe with the movies already rated by the active user
watched_movies=indexed.filter(indexed['userId'] == user_id).select('title_new').distinct()

In [38]:
#number of movies already rated 
watched_movies.count()

59

In [39]:
#assigning alias name 'b' to watched movies df
b=watched_movies.alias('b')

In [40]:
#left join: movies the active user has not rated get null in the 'b' column
total_movies = a.join(b, a.title_new == b.title_new,how='left')


In [41]:
total_movies.show(10,False)

+---------+---------+
|title_new|title_new|
+---------+---------+
|305.0    |null     |
|596.0    |null     |
|299.0    |299.0    |
|769.0    |null     |
|692.0    |692.0    |
|934.0    |null     |
|1051.0   |null     |
|496.0    |null     |
|558.0    |null     |
|170.0    |170.0    |
+---------+---------+
only showing top 10 rows



In [42]:
#selecting movies which active user is yet to rate or watch
remaining_movies=total_movies.where(col("b.title_new").isNull()).select(a.title_new).distinct()

In [43]:
#number of movies user is yet to rate 
remaining_movies.count()

1605
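The count is consistent with the earlier cells: 1,664 distinct movies minus the 59 that user 100 has rated leaves 1,605. The left join followed by the `isNull()` filter is an anti-join: it keeps the movies with no match among the user's rated titles (PySpark's `DataFrame.join` can also express this directly with `how='left_anti'`). The plain-Python version below shows the same logic as a set difference; the ratings and movie indices are hypothetical, and `unrated_movies` is an illustrative helper.

```python
# Hypothetical (userId, title_new) ratings and catalogue of movie indices
ratings = [(100, 1.0), (100, 4.0), (148, 0.0), (148, 1.0)]
all_movies = {0.0, 1.0, 2.0, 3.0, 4.0, 5.0}

def unrated_movies(user_id):
    """Anti-join: movies in the catalogue with no rating from this user."""
    rated = {movie for user, movie in ratings if user == user_id}
    return sorted(all_movies - rated)

print(unrated_movies(100))
print(unrated_movies(148))
```

Since every rated movie is also in the catalogue, the result always has `len(all_movies)` minus the number of distinct movies the user rated, which is exactly the 1,664 − 59 = 1,605 relationship seen above.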

In [44]:
#adding a userId column holding the active user's id to the remaining movies df
remaining_movies=remaining_movies.withColumn("userId",lit(int(user_id)))


In [45]:
remaining_movies.show(10,False)

+---------+------+
|title_new|userId|
+---------+------+
|305.0    |100   |
|596.0    |100   |
|769.0    |100   |
|934.0    |100   |
|1051.0   |100   |
|496.0    |100   |
|558.0    |100   |
|184.0    |100   |
|576.0    |100   |
|147.0    |100   |
+---------+------+
only showing top 10 rows



In [46]:
#scoring all remaining movies with the ALS model and sorting by predicted rating (no top-'n' cut here)
recommendations=rec_model.transform(remaining_movies).orderBy('prediction',ascending=False)

In [47]:
recommendations.show(5,False)

+---------+------+----------+
|title_new|userId|prediction|
+---------+------+----------+
|1103.0   |100   |5.8623085 |
|1120.0   |100   |5.596802  |
|1057.0   |100   |5.3697386 |
|1191.0   |100   |5.3175635 |
|905.0    |100   |5.1367264 |
+---------+------+----------+
only showing top 5 rows
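Two details of this output are worth noting. The cell sorts every remaining movie but does not cut the list: `show(5)` only displays the first five rows, and it is `.limit(n)` in the `top_movies` function that actually keeps the top n. Also, predictions such as 5.86 exceed the 1 to 5 rating scale because ALS predicts an unbounded dot product of latent factors. The sketch below picks the top n with `heapq.nlargest` and, as an optional display choice that is not part of the original notebook, clips the scores to the rating scale; the ranking itself still uses the raw scores. The `top_n` helper is hypothetical, and the predictions are copied from the output above.

```python
import heapq

# (title_new, prediction) pairs copied from the output above for user 100
scored = [(1103.0, 5.8623085), (1120.0, 5.596802), (1057.0, 5.3697386),
          (1191.0, 5.3175635), (905.0, 5.1367264)]

def top_n(scored, n, low=1.0, high=5.0):
    """Return the n highest-scoring items (ranked on raw scores), with scores clipped to [low, high]."""
    best = heapq.nlargest(n, scored, key=lambda pair: pair[1])
    return [(item, min(max(score, low), high)) for item, score in best]

print(top_n(scored, 3))
```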



In [48]:
#converting title_new values back to movie titles
movie_title = IndexToString(inputCol="title_new", outputCol="title",labels=model.labels)

final_recommendations=movie_title.transform(recommendations)


In [49]:
final_recommendations.show(10,False)

+---------+------+----------+------------------------------------------------------------------+
|title_new|userId|prediction|title                                                             |
+---------+------+----------+------------------------------------------------------------------+
|1103.0   |100   |5.8623085 |Stalker (1979)                                                    |
|1120.0   |100   |5.596802  |Crooklyn (1994)                                                   |
|1057.0   |100   |5.3697386 |Safe (1995)                                                       |
|1191.0   |100   |5.3175635 |Jason's Lyric (1994)                                              |
|905.0    |100   |5.1367264 |Welcome To Sarajevo (1997)                                        |
|892.0    |100   |5.076649  |Double vie de Véronique, La (Double Life of Veronique, The) (1991)|
|1385.0   |100   |4.9750447 |Maya Lin: A Strong Clear Vision (1994)                            |
|884.0    |100   |4.971722  |R

#### Wrapping everything into a function for reuse:

In [50]:
#create function to recommend top 'n' movies to any particular user
def top_movies(user_id, n):
    """
    Show and return the top 'n' movies that the user has not rated yet, ranked by predicted rating.

    Relies on unique_movies, indexed, rec_model and model (the fitted StringIndexerModel)
    defined in the cells above.
    """
    #assigning alias name 'a' to unique movies df
    a = unique_movies.alias('a')

    #creating another dataframe with the movies already rated by the active user
    watched_movies = indexed.filter(indexed['userId'] == user_id).select('title_new')

    #assigning alias name 'b' to watched movies df
    b = watched_movies.alias('b')

    #left join: movies the active user has not rated get null in the 'b' column
    total_movies = a.join(b, a.title_new == b.title_new, how='left')

    #selecting movies which the active user has yet to rate
    remaining_movies = total_movies.where(col("b.title_new").isNull()).select(a.title_new).distinct()

    #adding a userId column holding the active user's id
    remaining_movies = remaining_movies.withColumn("userId", lit(int(user_id)))

    #scoring the remaining movies with the ALS model and keeping only the top 'n'
    recommendations = rec_model.transform(remaining_movies).orderBy('prediction', ascending=False).limit(n)

    #adding a column with the original movie titles
    movie_title = IndexToString(inputCol="title_new", outputCol="title", labels=model.labels)
    final_recommendations = movie_title.transform(recommendations)

    #display the recommendations, then return them as a DataFrame for further use
    #(show() itself returns None, so it should not be the return value)
    final_recommendations.show(n, False)
    return final_recommendations

In [51]:
top_movies(100,10)

+---------+------+----------+------------------------------------------------------------------+
|title_new|userId|prediction|title                                                             |
+---------+------+----------+------------------------------------------------------------------+
|1103.0   |100   |5.8623085 |Stalker (1979)                                                    |
|1120.0   |100   |5.596802  |Crooklyn (1994)                                                   |
|1057.0   |100   |5.3697386 |Safe (1995)                                                       |
|1191.0   |100   |5.3175635 |Jason's Lyric (1994)                                              |
|905.0    |100   |5.1367264 |Welcome To Sarajevo (1997)                                        |
|892.0    |100   |5.076649  |Double vie de Véronique, La (Double Life of Veronique, The) (1991)|
|1385.0   |100   |4.9750447 |Maya Lin: A Strong Clear Vision (1994)                            |
|884.0    |100   |4.971722  |R

In [52]:
top_movies(405,10)

+---------+------+----------+---------------------------------------+
|title_new|userId|prediction|title                                  |
+---------+------+----------+---------------------------------------+
|691.0    |405   |4.64808   |Some Folks Call It a Sling Blade (1993)|
|1208.0   |405   |4.633911  |Ayn Rand: A Sense of Life (1997)       |
|998.0    |405   |4.2558103 |Selena (1997)                          |
|1205.0   |405   |4.1107774 |Walking and Talking (1996)             |
|1335.0   |405   |4.109911  |Stripes (1981)                         |
|558.0    |405   |4.005119  |Bananas (1971)                         |
|5.0      |405   |3.9411144 |English Patient, The (1996)            |
|1483.0   |405   |3.9395828 |Everest (1998)                         |
|157.0    |405   |3.8434207 |My Best Friend's Wedding (1997)        |
|120.0    |405   |3.8082979 |Good Will Hunting (1997)               |
+---------+------+----------+---------------------------------------+

