# Recommender System

## Step 1: Create the SparkSession Object

We start a Jupyter Notebook, import `SparkSession`, and create a new 
`SparkSession` object to use Spark:

In [1]:
import findspark
findspark.init()

In [2]:
from pyspark.sql import SparkSession

spark=SparkSession.builder.appName('recom').getOrCreate()

## Step 2: Read the Dataset

We then load the dataset into a Spark dataframe.

In [3]:
df = spark.read.csv('movie_ratings_df.csv',inferSchema=True,header=True)

## Step 3: Exploratory Data Analysis

In this section, we explore the dataset by viewing sample rows, validating 
its shape, and counting the number of ratings per user and the number of 
ratings per movie.

In [4]:
print((df.count(), len(df.columns)))

(100000, 3)


In [5]:
df.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- rating: integer (nullable = true)



There are three columns in total: two are numerical, and the title is 
categorical. The critical thing about using PySpark to build a recommender 
system (RS) is that the user and item identifiers must be in numerical form. 
Hence, we will convert the movie title to numerical values later.

In [9]:
from pyspark.sql.functions import rand 

df.orderBy(rand()).show(10,False)

+------+-------------------------------------------+------+
|userId|title                                      |rating|
+------+-------------------------------------------+------+
|102   |Boot, Das (1981)                           |1     |
|222   |My Life as a Dog (Mitt liv som hund) (1985)|2     |
|716   |His Girl Friday (1940)                     |5     |
|426   |Mr. Smith Goes to Washington (1939)        |4     |
|210   |I.Q. (1994)                                |3     |
|659   |Paris, Texas (1984)                        |4     |
|629   |Game, The (1997)                           |4     |
|747   |Star Trek: First Contact (1996)            |2     |
|156   |Paths of Glory (1957)                      |5     |
|151   |Ed Wood (1994)                             |3     |
+------+-------------------------------------------+------+
only showing top 10 rows



In [10]:
df.groupBy('userId').count().orderBy('count',ascending=False).show(10,False) 

+------+-----+
|userId|count|
+------+-----+
|405   |737  |
|655   |685  |
|13    |636  |
|450   |540  |
|276   |518  |
|416   |493  |
|537   |490  |
|303   |484  |
|234   |480  |
|393   |448  |
+------+-----+
only showing top 10 rows



In [11]:
df.groupBy('userId').count().orderBy('count',ascending=True).show(10,False)

+------+-----+
|userId|count|
+------+-----+
|732   |20   |
|636   |20   |
|631   |20   |
|93    |20   |
|685   |20   |
|572   |20   |
|596   |20   |
|926   |20   |
|34    |20   |
|300   |20   |
+------+-----+
only showing top 10 rows



The user with the highest number of records has rated 737 movies, and 
each user has rated at least 20 movies.

In [12]:
df.groupBy('title').count().orderBy('count',ascending=False).show(10,False)

+-----------------------------+-----+
|title                        |count|
+-----------------------------+-----+
|Star Wars (1977)             |583  |
|Contact (1997)               |509  |
|Fargo (1996)                 |508  |
|Return of the Jedi (1983)    |507  |
|Liar Liar (1997)             |485  |
|English Patient, The (1996)  |481  |
|Scream (1996)                |478  |
|Toy Story (1995)             |452  |
|Air Force One (1997)         |431  |
|Independence Day (ID4) (1996)|429  |
+-----------------------------+-----+
only showing top 10 rows



The movie with the highest number of ratings is Star Wars (1977), which has 
been rated 583 times, and each movie has been rated by at least one user.

## Step 4: Feature Engineering

We now convert the movie title column from categorical to numerical 
values using `StringIndexer`.

In [15]:
from pyspark.sql.functions import col, lit, rand
from pyspark.ml.feature import StringIndexer, IndexToString

In [17]:
stringIndexer = StringIndexer(inputCol="title", outputCol="title_new")

model = stringIndexer.fit(df)
indexed = model.transform(df)

In [19]:
indexed.orderBy(rand()).show(10)

+------+--------------------+------+---------+
|userId|               title|rating|title_new|
+------+--------------------+------+---------+
|   183|101 Dalmatians (1...|     1|    308.0|
|   487|Farewell My Concu...|     5|    631.0|
|   852|    Boot, Das (1981)|     5|    116.0|
|   848|      Aladdin (1992)|     5|     95.0|
|   588|Angels in the Out...|     3|    709.0|
|   210|One Flew Over the...|     5|     57.0|
|   650|Miracle on 34th S...|     3|    330.0|
|   367|      Contact (1997)|     4|      1.0|
|   655|  Bitter Moon (1992)|     3|   1114.0|
|   207|Apocalypse Now (1...|     3|     90.0|
+------+--------------------+------+---------+
only showing top 10 rows



In [20]:
indexed.groupBy('title_new').count().orderBy('count', ascending=False).show(10,False)

+---------+-----+
|title_new|count|
+---------+-----+
|0.0      |583  |
|1.0      |509  |
|2.0      |508  |
|3.0      |507  |
|4.0      |485  |
|5.0      |481  |
|6.0      |478  |
|7.0      |452  |
|8.0      |431  |
|9.0      |429  |
+---------+-----+
only showing top 10 rows
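
These counts match the per-title counts from the exploratory step (583, 509, 508, and so on) because `StringIndexer`, by default, orders labels by descending frequency, so the most-rated title receives index 0.0. Below is a minimal plain-Python sketch of that mapping on a toy list of titles. The helper `frequency_index` is hypothetical and not part of PySpark, and the alphabetical tie-break is an assumption of this sketch.

```python
from collections import Counter

def frequency_index(values):
    """Map each distinct value to a float index, most frequent value first."""
    counts = Counter(values)
    # Sort by descending count; ties are broken alphabetically (sketch assumption).
    ordered = sorted(counts, key=lambda v: (-counts[v], v))
    return {value: float(i) for i, value in enumerate(ordered)}

# Hypothetical toy column of titles, not the real dataset.
titles = ["Star Wars (1977)", "Fargo (1996)", "Star Wars (1977)",
          "Contact (1997)", "Star Wars (1977)", "Fargo (1996)"]

title_to_index = frequency_index(titles)
print(title_to_index)
```

The title that appears most often in the toy list gets index 0.0, just as Star Wars (1977) does in the real data.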



## Step 5: Splitting the Dataset

Now that we have prepared the data for building the recommender model, 
we can split the dataset into training and test sets. We use a 75/25 split: 
the model is trained on the larger part, and its accuracy is tested on the rest.

In [21]:
train,test=indexed.randomSplit([0.75,0.25])

In [22]:
train.count()

75178

In [23]:
test.count()

24822
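
The counts are close to, but not exactly, 75,000 and 25,000 because the rows are assigned to the two sets at random rather than by an exact cut. The sketch below illustrates the idea in plain Python, assuming each row is assigned independently with probability 0.75. The helper `random_split` and its fixed seed are hypothetical. In PySpark, `randomSplit` also accepts an optional `seed` argument if a repeatable split is needed.

```python
import random

def random_split(rows, train_weight=0.75, seed=42):
    """Assign each row independently to train or test with the given probability."""
    rng = random.Random(seed)  # fixed seed so the split is repeatable
    train, test = [], []
    for row in rows:
        # Draw a number in [0, 1); values below the weight go to the training set.
        (train if rng.random() < train_weight else test).append(row)
    return train, test

rows = list(range(100_000))  # stand-in for the 100,000 rating rows
train_rows, test_rows = random_split(rows)
print(len(train_rows), len(test_rows))
```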

## Step 6: Build and Train Recommender Model

We import the `ALS` class from the PySpark ML library and build the 
model on the training dataset. There are multiple hyperparameters 
that can be tuned to improve the performance of the model. Two of the 
important ones are `nonnegative=True`, which keeps the model from producing 
negative ratings in its recommendations, and `coldStartStrategy='drop'`, 
which drops rows whose prediction would otherwise be NaN.

In [25]:
from pyspark.ml.recommendation import ALS

rec=ALS(maxIter=10,
        regParam=0.01,
        userCol='userId',
        itemCol='title_new', 
        ratingCol='rating', 
        nonnegative=True, 
        coldStartStrategy="drop")

In [26]:
rec_model=rec.fit(train)
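
To give a feel for what alternating least squares does during `fit`, here is a deliberately simplified plain-Python sketch with a single latent factor per user and per item. It is not Spark's implementation: Spark's `ALS` learns several factors per user and item (its `rank` parameter) and runs distributed. The function `als_rank1` and the toy ratings are hypothetical. The idea is the same, though: hold one side fixed, solve a small regularized least-squares problem for the other side, and alternate.

```python
def als_rank1(ratings, n_iters=20, reg=0.01):
    """Fit one latent factor per user and per item by alternating least squares.

    ratings: dict mapping (user, item) -> observed rating.
    """
    users = sorted({u for u, _ in ratings})
    items = sorted({i for _, i in ratings})
    user_f = {u: 1.0 for u in users}
    item_f = {i: 1.0 for i in items}
    for _ in range(n_iters):
        # Hold item factors fixed; each user factor has a closed-form ridge solution.
        for u in users:
            obs = [(item_f[i], r) for (uu, i), r in ratings.items() if uu == u]
            user_f[u] = sum(f * r for f, r in obs) / (reg + sum(f * f for f, _ in obs))
        # Hold user factors fixed; solve for each item factor the same way.
        for i in items:
            obs = [(user_f[u], r) for (u, ii), r in ratings.items() if ii == i]
            item_f[i] = sum(f * r for f, r in obs) / (reg + sum(f * f for f, _ in obs))
    return user_f, item_f

# Hypothetical toy ratings; the pair ("u2", "c") is deliberately left unobserved.
toy_ratings = {
    ("u1", "a"): 1.0, ("u1", "b"): 2.0, ("u1", "c"): 2.5,
    ("u2", "a"): 2.0, ("u2", "b"): 4.0,
}
user_f, item_f = als_rank1(toy_ratings)
predicted = user_f["u2"] * item_f["c"]  # estimate for the unobserved pair
print(round(predicted, 2))
```

In the toy data, user `u2` rates every movie twice as high as `u1`, so the estimate for the missing pair should land near twice `u1`'s rating of `c`. Because the ratings and the starting factors are positive, every update here stays non-negative, which loosely mirrors the effect of `nonnegative=True`.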

## Step 7: Predictions and Evaluation on Test Data

The final part of the exercise is to check the performance of the 
model on unseen, or test, data. We use the `transform` function to make 
predictions on the test data and `RegressionEvaluator` to compute the RMSE 
of the model on the test data.

In [27]:
predicted_ratings = rec_model.transform(test)

In [28]:
predicted_ratings.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- rating: integer (nullable = true)
 |-- title_new: double (nullable = false)
 |-- prediction: float (nullable = false)



In [30]:
predicted_ratings.orderBy(rand()).show(10)

+------+--------------------+------+---------+----------+
|userId|               title|rating|title_new|prediction|
+------+--------------------+------+---------+----------+
|   493|    Rock, The (1996)|     5|     17.0|  4.167579|
|   344|My Best Friend's ...|     4|    157.0| 2.7621906|
|   124|Die Hard: With a ...|     4|    201.0| 3.4011817|
|   621|Alice in Wonderla...|     4|    409.0| 3.3459125|
|   158|Quick and the Dea...|     4|    619.0| 3.7941067|
|   751|Snow White and th...|     4|    158.0| 4.2226357|
|   381|Unbearable Lightn...|     5|    365.0| 3.7957087|
|   276|   Braveheart (1995)|     5|     37.0|  4.352621|
|   254|Swiss Family Robi...|     4|    722.0| 2.9655714|
|   553|E.T. the Extra-Te...|     3|     33.0|  4.063393|
+------+--------------------+------+---------+----------+
only showing top 10 rows



In [33]:
from pyspark.ml.evaluation import RegressionEvaluator

evaluator = RegressionEvaluator(metricName='rmse', predictionCol='prediction',labelCol='rating')
rmse = evaluator.evaluate(predicted_ratings)
print(rmse)

1.0199921693241498


The RMSE is not very high: at about 1.02, the predicted rating typically 
differs from the actual rating by roughly one point. This can be improved 
further by tuning the model parameters and using a hybrid approach.
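
For reference, RMSE is the square root of the mean squared difference between actual and predicted ratings. The plain-Python sketch below applies that definition to the ten (rating, prediction) pairs displayed in the sample output earlier in this step. The helper `rmse` is hypothetical, and because it covers only ten rows, its result will differ from the value computed over the full test set.

```python
import math

def rmse(actual, predicted):
    """Root-mean-square error between two equal-length sequences."""
    if len(actual) != len(predicted):
        raise ValueError("sequences must have the same length")
    squared_errors = [(a - p) ** 2 for a, p in zip(actual, predicted)]
    return math.sqrt(sum(squared_errors) / len(squared_errors))

# The ten (rating, prediction) pairs from the sample output shown earlier.
sample_ratings = [5, 4, 4, 4, 4, 4, 5, 5, 4, 3]
sample_predictions = [4.167579, 2.7621906, 3.4011817, 3.3459125, 3.7941067,
                      4.2226357, 3.7957087, 4.352621, 2.9655714, 4.063393]

sample_rmse = rmse(sample_ratings, sample_predictions)
print(sample_rmse)
```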

## Step 8: Recommend Top Movies That Active User Might Like

After checking the performance of the model and tuning the hyperparameters, 
we can move ahead and recommend top movies to users that they have not 
seen and might like. The first step is to create a list of unique movies in the 
dataframe.

In [35]:
unique_movies = indexed.select('title_new').distinct()
unique_movies.count()

1664

In [37]:
a = unique_movies.alias('a')

In [38]:
user_id = 85
watched_movies = indexed.filter(indexed['userId'] == user_id).select('title_new').distinct()
watched_movies.count()

287

This active user has already rated 287 of the 1,664 unique movies, so we 
want to recommend movies from the remaining 1,377 items. We now combine 
both tables with a left join: movies the user has not rated have no match in 
the second table and show up as null values there. Filtering on those null 
values gives the movies that we can recommend.

In [39]:
b = watched_movies.alias('b')
total_movies = a.join(b, a.title_new == b.title_new, how='left')
total_movies.show(10,False)

+---------+---------+
|title_new|title_new|
+---------+---------+
|305.0    |305.0    |
|596.0    |null     |
|299.0    |null     |
|769.0    |null     |
|692.0    |null     |
|934.0    |null     |
|1051.0   |null     |
|496.0    |null     |
|558.0    |558.0    |
|170.0    |null     |
+---------+---------+
only showing top 10 rows



In [41]:
remaining_movies = total_movies.where(col("b.title_new").isNull()).select(a.title_new).distinct()
remaining_movies.count()

1377
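
The left join followed by the null filter amounts to a set difference: all movies minus the movies the user has already rated. The plain-Python sketch below shows the same logic with sets. The index values are hypothetical stand-ins, chosen only so that the sizes match the counts above. In PySpark, a join with `how='left_anti'` is one way to express this in a single step.

```python
# Hypothetical stand-ins for the indexed movie ids; only the sizes mirror the text.
all_movie_ids = {float(i) for i in range(1664)}
watched_ids = {float(i) for i in range(287)}

# Set difference: keep every movie that is not in the watched set.
remaining_ids = all_movie_ids - watched_ids

# Pair each remaining movie with the active user, ready for scoring.
user_id = 85
candidate_pairs = [(movie_id, user_id) for movie_id in sorted(remaining_ids)]
print(len(candidate_pairs))
```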

In [43]:
remaining_movies=remaining_movies.withColumn("userId", lit(int(user_id)))
remaining_movies.show(10,False)

+---------+------+
|title_new|userId|
+---------+------+
|596.0    |85    |
|299.0    |85    |
|769.0    |85    |
|692.0    |85    |
|934.0    |85    |
|1051.0   |85    |
|496.0    |85    |
|170.0    |85    |
|184.0    |85    |
|576.0    |85    |
+---------+------+
only showing top 10 rows



In [44]:
recommendations = rec_model.transform(remaining_movies).orderBy('prediction',ascending=False)

In [45]:
recommendations.show(5,False)

+---------+------+----------+
|title_new|userId|prediction|
+---------+------+----------+
|1347.0   |85    |5.2065744 |
|1411.0   |85    |5.149152  |
|1277.0   |85    |4.931091  |
|1370.0   |85    |4.863292  |
|1289.0   |85    |4.7471066 |
+---------+------+----------+
only showing top 5 rows



In [47]:
movie_title = IndexToString(inputCol="title_new", outputCol="title",labels=model.labels)
final_recommendations=movie_title.transform(recommendations)

In [48]:
final_recommendations.show(10,False)

+---------+------+----------+--------------------------------------+
|title_new|userId|prediction|title                                 |
+---------+------+----------+--------------------------------------+
|1347.0   |85    |5.2065744 |Angel Baby (1995)                     |
|1411.0   |85    |5.149152  |Boys, Les (1997)                      |
|1277.0   |85    |4.931091  |Mina Tannenbaum (1994)                |
|1370.0   |85    |4.863292  |Harlem (1993)                         |
|1289.0   |85    |4.7471066 |World of Apu, The (Apur Sansar) (1959)|
|302.0    |85    |4.6862745 |Close Shave, A (1995)                 |
|1075.0   |85    |4.666614  |Man in the Iron Mask, The (1998)      |
|1518.0   |85    |4.59214   |Some Mother's Son (1996)              |
|1410.0   |85    |4.5367236 |Bitter Sugar (Azucar Amargo) (1996)   |
|1470.0   |85    |4.4922724 |Butcher Boy, The (1998)               |
+---------+------+----------+--------------------------------------+
only showing top 10 rows
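
The last two steps, ranking by predicted rating and mapping the numeric index back to a title, can be sketched in plain Python as follows. The sketch assumes the labels are held in a list in which position `k` contains the title for index `k`, which is how `IndexToString` uses `model.labels` above. The helper `top_n_titles` and the toy data are hypothetical.

```python
def top_n_titles(predictions, labels, n=5):
    """Return the n highest-scoring (title, score) pairs.

    predictions: list of (item_index, score) pairs.
    labels: list in which labels[k] is the title for index k.
    """
    ranked = sorted(predictions, key=lambda pair: pair[1], reverse=True)
    return [(labels[int(index)], score) for index, score in ranked[:n]]

# Hypothetical labels and predicted scores, not taken from the dataset.
labels = ["Movie A", "Movie B", "Movie C", "Movie D"]
predictions = [(0.0, 3.1), (1.0, 4.7), (2.0, 2.4), (3.0, 4.2)]

top = top_n_titles(predictions, labels, n=2)
print(top)
```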

