# Building a Recommendation System in PySpark - Lab

## Introduction

In this lab, we will implement a movie recommendation system using ALS in the Spark programming environment. Spark's machine learning library `ml` comes packaged with an efficient, distributed implementation of the ALS algorithm that we looked at in the previous lesson. The lab will require you to put your Spark programming skills into practice by creating and manipulating PySpark DataFrames. We will go step by step through developing a movie recommendation system with ALS and PySpark, using the `MovieLens` dataset from a previous lab.

Note: Refer to the [PySpark documentation](http://spark.apache.org/docs/2.2.0/api/python/index.html) frequently while completing this lab, as it introduces a few new methods.


## Objectives

In this lab you will: 

- Use Spark to train and cross-validate an ALS model 
- Introduce a new user with ratings to a rating matrix and make recommendations for them 
- Create a function that will return the top n recommendations for a user 

## Building a Recommendation System

We have seen how recommendation systems have played an integral part in the success of Amazon (books, items), Pandora/Spotify (music), Google (news, search), YouTube (videos), etc. For Amazon, these systems reportedly bring in more than 30% of total revenue. For Netflix, 75% of the movies people watch come from some form of recommendation.

> The goal of recommendation systems is to find what is likely to be of interest to the user. This enables organizations to offer a high level of personalization and customer tailored services.


For online video content services like Netflix and Hulu, building robust movie recommendation systems is extremely important. Here is a simple example of a recommendation system at work:

1.    User A watches Game of Thrones and Breaking Bad 
2.    User B performs a search query for Game of Thrones 
3.    The system suggests Breaking Bad to user B from data collected about user A 
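The idea in this example can be sketched in a few lines of plain Python: count which titles are watched together, then suggest the most frequent companions of whatever a user searches for. This is a minimal item co-occurrence sketch for illustration only (the `histories` data and the `build_cooccurrence()` and `suggest()` helpers are hypothetical); it is not ALS, which the rest of the lab uses.

```python
from collections import Counter
from itertools import permutations

# Hypothetical viewing histories: user -> set of titles watched
histories = {
    "A": {"Game of Thrones", "Breaking Bad"},
    "B": set(),  # user B has not watched anything yet, only searched
}

def build_cooccurrence(histories):
    """Count, for every ordered pair of titles, how many users watched both."""
    co = Counter()
    for titles in histories.values():
        for a, b in permutations(sorted(titles), 2):
            co[(a, b)] += 1
    return co

def suggest(query, co, n=3):
    """Return up to n titles most often watched alongside `query`."""
    scores = Counter({b: count for (a, b), count in co.items() if a == query})
    return [title for title, _ in scores.most_common(n)]

co = build_cooccurrence(histories)
print(suggest("Game of Thrones", co))  # ['Breaking Bad']
```

User B's search for Game of Thrones is answered purely from user A's history, which is the essence of collaborative filtering.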


This lab will guide you step by step through developing such a movie recommendation system. We will use the `MovieLens` dataset and the collaborative filtering technique, via Spark's Alternating Least Squares (ALS) implementation. After building the recommendation system, we will add a new user with some ratings to the dataset and obtain recommendations for that user.
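To make "alternating least squares" concrete, here is a rank-1 sketch in plain Python on a small hypothetical ratings table. With one latent factor per user and per item, each half-step has a closed form: holding the item factors $v_j$ fixed, the regularized least-squares solution for user $i$ is $u_i = \sum_j r_{ij} v_j / (\sum_j v_j^2 + \lambda)$ over the items user $i$ rated, and symmetrically for items. Spark's version uses rank-$k$ factor vectors (solving a small linear system per user or item), runs distributed, and may scale the regularization differently; this is an illustration, not Spark's code.

```python
import random

def als_rank1(ratings, n_users, n_items, reg=0.01, iters=50, seed=0):
    """Rank-1 alternating least squares on (user, item, rating) triples.

    Each half-step holds one side's factors fixed and solves the other side's
    regularized least-squares problem exactly, which in rank 1 is a ratio.
    """
    rng = random.Random(seed)
    u = [rng.uniform(0.5, 1.5) for _ in range(n_users)]
    v = [rng.uniform(0.5, 1.5) for _ in range(n_items)]
    for _ in range(iters):
        # Item factors fixed: u_i = sum_j r_ij v_j / (sum_j v_j^2 + reg)
        num, den = [0.0] * n_users, [reg] * n_users
        for i, j, r in ratings:
            num[i] += r * v[j]
            den[i] += v[j] ** 2
        u = [a / b for a, b in zip(num, den)]
        # User factors fixed: v_j = sum_i r_ij u_i / (sum_i u_i^2 + reg)
        num, den = [0.0] * n_items, [reg] * n_items
        for i, j, r in ratings:
            num[j] += r * u[i]
            den[j] += u[i] ** 2
        v = [a / b for a, b in zip(num, den)]
    return u, v

def sq_error(ratings, u, v):
    """Sum of squared errors over the observed ratings only."""
    return sum((r - u[i] * v[j]) ** 2 for i, j, r in ratings)

# Hypothetical observed ratings generated as user_scale * item_scale,
# with user scales (2, 1, 3) and item scales (1, 2, 1.5); (user 0, item 2) is missing
ratings = [(0, 0, 2.0), (0, 1, 4.0), (1, 0, 1.0), (1, 2, 1.5), (2, 1, 6.0), (2, 2, 4.5)]
u, v = als_rank1(ratings, n_users=3, n_items=3)

# Predicted rating for the unobserved pair; the generating scales give 2 * 1.5 = 3
print(round(u[0] * v[2], 2))
```

The missing rating is filled in from the shared latent structure, which is exactly how ALS produces recommendations for unrated films.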

### Importing the Data

* Initialize a `SparkSession` object
* Import the dataset found at `'./data/ratings.csv'` into a PySpark DataFrame

```python
import pandas as pd
import numpy as np

df = pd.read_csv('panda_dataframes/user_rating_3col.csv')
```

```python
df.info()
```

```
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 904873 entries, 0 to 904872
Data columns (total 3 columns):
film_id     904873 non-null int64
username    904873 non-null object
rating      579707 non-null float64
dtypes: float64(1), int64(1), object(1)
memory usage: 20.7+ MB
```


```python
threshold = 100  # films with this many rows or fewer will be removed
value_counts = df['film_id'].value_counts()  # number of rows per film
to_remove = value_counts[value_counts <= threshold].index

# keep only films with more than `threshold` rows (.copy() avoids chained-assignment warnings later)
df = df[~df['film_id'].isin(to_remove)].copy()
```

```python
# drop rows with missing values (e.g. missing ratings)
df.dropna(inplace=True)
```

```python
df['film_id'] = df['film_id'].astype(int)
df.info()
```

```
<class 'pandas.core.frame.DataFrame'>
Int64Index: 237964 entries, 4 to 904861
Data columns (total 3 columns):
film_id     237964 non-null int64
username    237964 non-null object
rating      237964 non-null float64
dtypes: float64(1), int64(1), object(1)
memory usage: 7.3+ MB
```


```python
df['film_id'].value_counts()
```

```
426406    768
406775    607
475370    583
326279    568
353117    550
459564    546
51444     523
404266    519
397859    512
95113     507
34722     495
460830    495
51432     491
422682    490
251943    485
240344    484
433863    482
444600    478
51921     474
149857    468
51568     468
51896     466
41352     463
259441    455
114564    453
424348    453
333029    451
312205    450
438511    448
171384    447
         ... 
40276     102
45263     102
205114    102
43015     102
15077     102
51472     102
69453     102
46084     102
318913    102
19921     102
47062     102
353273    102
266628    102
635253    102
47345     102
46778     102
174195    101
153285    101
50953     101
44920     101
46263     101
47913     101
49886     101
51163     101
345176    101
276291    101
51470     101
50224     101
50390     101
422900    101
Name: film_id, Length: 1237, dtype: int64
```

```python
# import necessary libraries
from pyspark.sql import SparkSession

# instantiate SparkSession object
# spark = SparkSession.builder.master('local').getOrCreate()

spark = (SparkSession.builder
         .appName('ALSExample')
         .config('spark.driver.host', 'localhost')
         .getOrCreate())
```

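```python
# ALS needs integer user ids, so encode each username as a category code
df = df.assign(id=df['username'].astype('category').cat.codes)

# convert the pandas DataFrame into a Spark DataFrame
movie_ratings = spark.createDataFrame(data=df)
```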


Check the data type of each column to make sure it makes sense for that column.

```python
movie_ratings.dtypes
```

```
[('film_id', 'bigint'),
 ('username', 'string'),
 ('rating', 'double'),
 ('id', 'bigint')]
```
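Spark's ALS expects integer user and item ids, which is why an `id` column built from pandas category codes is added instead of passing `username` directly. For a column of strings, pandas orders the categories by sorted value, so each code is the username's position in the sorted list of distinct usernames. A plain-Python sketch of the same mapping (the `encode_users()` helper is hypothetical):

```python
def encode_users(usernames):
    """Map each distinct username to an integer code, ordered by sorted username.

    Returns the per-row codes and the username -> code mapping.
    """
    codes = {name: code for code, name in enumerate(sorted(set(usernames)))}
    return [codes[name] for name in usernames], codes

ids, mapping = encode_users(["xbrookelee", "4dollarshrimp", "xbrookelee"])
print(ids)  # [1, 0, 1]
```

Keeping the mapping around is what makes helpers like `username_retriever()` and `id_retriever()` possible later on.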

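With the MovieLens ratings we wouldn't need the `timestamp` column and could drop it. The ratings used here have no `timestamp` column (see the dtypes above), so this step is commented out.

```python
# movie_ratings = movie_ratings.drop('timestamp')
```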

### Fitting the Alternating Least Squares Model

Because the dataset has already been preprocessed, we can go ahead and fit the Alternating Least Squares model.

* Import `ALS` from `pyspark.ml.recommendation` module 
* Use the `.randomSplit()` method on the PySpark DataFrame to separate the dataset into training and test sets
* Fit the Alternating Least Squares model to the training set, setting `userCol`, `itemCol`, and `ratingCol` to the appropriate columns for this dataset, and assign the fitted model to a variable named `model`

```python
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS

# split into training and testing sets
(training, test) = movie_ratings.randomSplit([0.8, 0.2])

# Build the recommendation model using ALS on the training data
# Note we set cold start strategy to 'drop' to ensure we don't get NaN evaluation metrics
als = ALS(maxIter=5, rank=4, regParam=0.01, userCol='id', itemCol='film_id', ratingCol='rating',
          coldStartStrategy='drop')

# fit the ALS model to the training set
model = als.fit(training)
```

Now that you've fit the model, it's time to evaluate how well it performs.

* Import `RegressionEvaluator` from `pyspark.ml.evaluation` 
* Generate predictions with your model for the test set by using the `.transform()` method on your ALS model 
* Evaluate your model and print out the RMSE from your test set 
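`RegressionEvaluator` with `metricName='rmse'` computes $\text{RMSE} = \sqrt{\frac{1}{N}\sum_{k=1}^{N}(r_k - \hat r_k)^2}$ over the rows of the predictions DataFrame. Because the model uses `coldStartStrategy='drop'`, test rows for users or films that never appeared in training are removed instead of getting NaN predictions (which would make the RMSE NaN). The plain-Python sketch below mimics that behavior by skipping `None` predictions; it is an illustration, not Spark's implementation.

```python
import math

def rmse(pairs):
    """RMSE over (actual, predicted) pairs, skipping pairs with no prediction.

    Skipping None mirrors coldStartStrategy='drop', which removes rows whose
    prediction would otherwise be NaN.
    """
    kept = [(r, p) for r, p in pairs if p is not None]
    if not kept:
        raise ValueError("no predictions to evaluate")
    return math.sqrt(sum((r - p) ** 2 for r, p in kept) / len(kept))

# The third pair stands in for a cold-start row and is ignored
print(rmse([(4.0, 3.5), (3.0, 3.0), (5.0, None), (2.0, 2.5)]))  # about 0.408
```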

```python
# importing appropriate library
from pyspark.ml.evaluation import RegressionEvaluator

# Evaluate the model by computing the RMSE on the test data
predictions = model.transform(test)
evaluator = RegressionEvaluator(metricName='rmse', labelCol='rating',
                                predictionCol='prediction')
rmse = evaluator.evaluate(predictions)
print('Root-mean-square error = ' + str(rmse))
```

```
Root-mean-square error = 0.7105219401355057
```


### Cross-validation to Find the Optimal Model

Let's now find optimal values for the parameters of the ALS model. Use PySpark's built-in `CrossValidator` with a suitable parameter grid to determine the best model. Try the following values:

* regularization (`regParam`) = [0.01, 0.001, 0.1]
* rank = [4, 10, 50]
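`ParamGridBuilder` produces every combination of the listed values, so this grid has 3 × 3 = 9 settings. `CrossValidator` fits each setting once per fold (3 folds by default, since `numFolds` is not set below), which means 27 ALS fits, keeps the setting with the best average metric (lowest RMSE here), and refits it on the full DataFrame. A plain-Python sketch of that bookkeeping; the random fold assignment is illustrative and not how Spark splits the data internally.

```python
import random
from itertools import product

reg_params = [0.01, 0.001, 0.1]
ranks = [4, 10, 50]
num_folds = 3  # CrossValidator's default numFolds when it is not set

# Every (regParam, rank) combination, like ParamGridBuilder().addGrid(...).build()
grid = [{"regParam": reg, "rank": rank} for reg, rank in product(reg_params, ranks)]

def fold_splits(n_rows, k, seed=0):
    """Randomly assign rows to k folds; yield (train_idx, validation_idx) per fold."""
    rng = random.Random(seed)
    fold_of = [rng.randrange(k) for _ in range(n_rows)]
    for f in range(k):
        train = [i for i in range(n_rows) if fold_of[i] != f]
        valid = [i for i in range(n_rows) if fold_of[i] == f]
        yield train, valid

# One model fit per (setting, fold) pair
jobs = [(params, f) for params in grid for f in range(num_folds)]
print(len(grid), len(jobs))  # 9 27
```

This is also why the `parallelism` argument helps: the 27 fits are independent of each other.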


```python
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

# initialize the ALS model
als_model = ALS(userCol='id', itemCol='film_id',
                ratingCol='rating', coldStartStrategy='drop')

# create the parameter grid
params = ParamGridBuilder()\
          .addGrid(als_model.regParam, [0.01, 0.001, 0.1])\
          .addGrid(als_model.rank, [4, 10, 50]).build()

# instantiating crossvalidator estimator
cv = CrossValidator(estimator=als_model, estimatorParamMaps=params, evaluator=evaluator, parallelism=4)
best_model = cv.fit(movie_ratings)

# We see the best model has a rank of 50, so we will use that in our future models with this dataset
best_model.bestModel.rank
```

```
50
```

### Incorporating the Names of the Movies

When we make recommendations, it would be ideal to show the actual movie name rather than just an ID. Another file, `'./data/movies.csv'`, contains the names of the movies matched to the `movie_id` values in the ratings dataset.

* Import the data into a Spark DataFrame 
* Look at the first 5 rows

```python
movie_titles = spark.read.csv('panda_dataframes/letterboxd_film_data_director.csv',
                              header='true', inferSchema='true')

movie_titles.head(5)
```

```
[Row(film_name='Parasite', lb_id=426406, lb_link='https://letterboxd.com//film/parasite-2019/', tmdb_id=496243, movie_tv='movie', release_year='2019', director='Bong Joon-ho'),
 Row(film_name='Joker', lb_id=406775, lb_link='https://letterboxd.com//film/joker-2019/', tmdb_id=475557, movie_tv='movie', release_year='2019', director='Todd Phillips'),
 Row(film_name='Knives Out', lb_id=475370, lb_link='https://letterboxd.com//film/knives-out-2019/', tmdb_id=546554, movie_tv='movie', release_year='2019', director='Rian Johnson'),
 Row(film_name='Pulp Fiction', lb_id=51444, lb_link='https://letterboxd.com//film/pulp-fiction/', tmdb_id=680, movie_tv='movie', release_year='1994', director='Quentin Tarantino'),
 Row(film_name='Inception', lb_id=34722, lb_link='https://letterboxd.com//film/inception/', tmdb_id=27205, movie_tv='movie', release_year='2010', director='Christopher Nolan')]
```

We will eventually be matching up the movie ids with the movie titles. In the cell below, create a function `name_retriever()` that takes in a `movie_id` and returns a string that represents the movie title. 

> Hint: It's possible to do this operation in one line with the `df.where()` or the `df.filter()` methods 
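Each `where(...).take(1)` lookup launches a separate Spark job, which adds up when titles are looked up in a loop, and `take(1)[0]` raises an `IndexError` for an id that is not in the table. If the title table fits comfortably in driver memory, one alternative is to collect the id and title columns once, e.g. `dict(movie_titles.select('lb_id', 'film_name').collect())` (Spark `Row` objects are tuples, so `dict()` accepts them), and then look titles up in memory. The sketch below shows the in-memory part; `make_name_lookup()` and `name_lookup()` are hypothetical helpers, and the pairs are copied from the rows shown above.

```python
def make_name_lookup(pairs):
    """Build an id -> title dict once from (id, title) pairs."""
    return {movie_id: title for movie_id, title in pairs}

def name_lookup(movie_id, lookup, default="<unknown>"):
    """Return the title for movie_id, or a default if it is not in the table."""
    return lookup.get(movie_id, default)

# Pairs copied from the first rows of movie_titles
pairs = [(426406, "Parasite"), (406775, "Joker"), (475370, "Knives Out"),
         (51444, "Pulp Fiction"), (34722, "Inception")]
lookup = make_name_lookup(pairs)
print(name_lookup(51444, lookup))  # Pulp Fiction
```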

```python
movie_ratings.head(5)
```

```
[Row(film_id=460155, username='xbrookelee', rating=4.5, id=1106),
 Row(film_id=441862, username='xbrookelee', rating=3.0, id=1106),
 Row(film_id=268380, username='xbrookelee', rating=4.0, id=1106),
 Row(film_id=318913, username='xbrookelee', rating=5.0, id=1106),
 Row(film_id=228628, username='xbrookelee', rating=5.0, id=1106)]
```

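```python
def username_retriever(user_id, rating_df):
    # look up the username that corresponds to an integer ALS user id
    return rating_df.where(rating_df.id == user_id).take(1)[0]['username']

def id_retriever(username, rating_df):
    # look up the integer ALS user id that corresponds to a username
    return rating_df.where(rating_df.username == username).take(1)[0]['id']

def name_retriever(movie_id, movie_title_df):
    # the titles file stores the film id in `lb_id` and the title in `film_name`
    return movie_title_df.where(movie_title_df.lb_id == movie_id).take(1)[0]['film_name']
```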

```python
print(name_retriever(51444, movie_titles))
```

```
Pulp Fiction
```

```python
print(username_retriever(1106, movie_ratings))
```

```
xbrookelee
```

```python
print(id_retriever('xbrookelee', movie_ratings))
```

```
1106
```


## Getting Recommendations

Now it's time to actually get some recommendations! The fitted ALS model has built-in methods called `.recommendForUserSubset()` and `.recommendForAllUsers()`. We'll start with a subset of users.

```python
# take one distinct user id and get that user's top 10 recommendations
users = movie_ratings.select(als.getUserCol()).distinct().limit(1)
userSubsetRecs = model.recommendForUserSubset(users, 10)
recs = userSubsetRecs.take(1)
```

`recs` is now a list of rows containing recommended items. Use the `name_retriever()` function you just created to get the title of this user's top-ranked recommendation.

```python
# use indexing to obtain the movie id of top predicted rated item
first_recommendation = recs[0]['recommendations'][0][0]

# use the name retriever function to get the values
name_retriever(first_recommendation, movie_titles)
```

```
'Columbus'
```

Of course, you can also make recommendations for every user, although this takes longer. In the next cell, we create a DataFrame with the top 5 recommendations for every user and then select one user to look at their predictions:
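Under the hood, a matrix factorization model like ALS scores a user-film pair as the dot product of the user's and the film's latent factor vectors, so recommending the top $n$ films amounts to ranking all films by that score. The plain-Python sketch below illustrates this with hypothetical rank-2 factors; the optional `exclude` argument (skipping films the user has already rated) is a common refinement added for illustration, not a parameter of Spark's `recommendForAllUsers()`.

```python
import heapq

def top_n(user_vec, item_factors, n, exclude=()):
    """Rank items by dot product with user_vec and return the top n (item, score) pairs."""
    skip = set(exclude)
    scores = (
        (item, sum(u * v for u, v in zip(user_vec, vec)))
        for item, vec in item_factors.items()
        if item not in skip
    )
    # nlargest avoids sorting every item when n is small
    return heapq.nlargest(n, scores, key=lambda pair: pair[1])

# Hypothetical rank-2 factors: film id -> latent vector
item_factors = {10: [1.0, 0.0], 20: [0.5, 0.5], 30: [0.0, 1.0], 40: [2.0, 1.0]}
user_vec = [2.0, 1.0]
print(top_n(user_vec, item_factors, 2))  # [(40, 5.0), (10, 2.0)]
```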

```python
recommendations = model.recommendForAllUsers(5)
first_five = recommendations.where(recommendations.id == 3).collect()
```

```
[Row(id=3, recommendations=[Row(film_id=43015, rating=4.676036357879639),
                            Row(film_id=51700, rating=4.672168731689453),
                            Row(film_id=37004, rating=4.65564489364624),
                            Row(film_id=36192, rating=4.6344380378723145),
                            Row(film_id=426406, rating=4.625179290771484)])]
```

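```python
# the single Row collected for this user
user_row = first_five[0]
```

```python
# the second field holds the list of (film_id, rating) recommendations
user_recs = user_row[1]
user_recs
```

```
[Row(film_id=43015, rating=4.676036357879639),
 Row(film_id=51700, rating=4.672168731689453),
 Row(film_id=37004, rating=4.65564489364624),
 Row(film_id=36192, rating=4.6344380378723145),
 Row(film_id=426406, rating=4.625179290771484)]
```

```python
# predicted rating of the top recommendation
user_recs[0][1]
```

```
4.676036357879639
```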

```python
def first_five_rec(username):
    # map the username to the integer id used by the ALS model
    user_id = id_retriever(username, movie_ratings)
    # top 5 recommendations for every user, filtered down to this user
    recommendations = model.recommendForAllUsers(5)
    user_row = recommendations.where(recommendations.id == user_id).collect()[0]

    print(f'First 5 Recommendations for {username}: \n')
    # each recommendation is a Row of (film_id, rating)
    for film_id, score in user_row['recommendations']:
        print(f'{name_retriever(film_id, movie_titles)}, | predicted score = {round(score, 2)}')
```

```python
first_five_rec('4dollarshrimp')
```

```
First 5 Recommendations for 4dollarshrimp: 

Harakiri, | predicted score = 4.68
12 Angry Men, | predicted score = 4.67
Stop Making Sense, | predicted score = 4.66
Come and See, | predicted score = 4.63
Parasite, | predicted score = 4.63
```


### Getting Predictions for a New User

Now it's time to put together everything you've learned in this section to create a function that takes in a new user and some movies they've rated, then returns the top $n$ recommended movies. This function has several steps:

* Add the new ratings to the DataFrame (hint: look into the `.union()` method) 
* Fit the ALS model  
* Make recommendations for the user of choice 
* Print out the names of the top $n$ recommendations in a reader-friendly manner 

The function should take in the parameters: 

* `user_id` : int 
* `new_ratings` : list of tuples with one field per column of `rating_df`, in the same order (for `movie_ratings`, `(film_id, username, rating, id)`)
* `rating_df` : spark DF containing ratings
* `movie_title_df` : spark DF containing movie titles
* `num_recs` : int
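Before calling the function, the new user needs an id that no existing user has, and ratings in the same column order as `rating_df`, because `union()` matches columns by position rather than by name. Since the `id` codes run from 0 up to the number of users minus one, one simple choice is the current maximum plus one (in Spark, something like `movie_ratings.agg({'id': 'max'}).collect()[0][0] + 1`). The plain-Python sketch below builds such tuples for the `(film_id, username, rating, id)` order of `movie_ratings`; the `new_user_rows()` helper, the username, and the ratings are hypothetical.

```python
def new_user_rows(existing_ids, username, ratings):
    """Build rating tuples for a new user in (film_id, username, rating, id) order.

    existing_ids: ids already used in the ratings table
    ratings: iterable of (film_id, rating) pairs for the new user
    """
    new_id = max(existing_ids) + 1  # an id no existing user has
    # float() keeps the rating type consistent with the existing double column
    rows = [(film_id, username, float(rating), new_id) for film_id, rating in ratings]
    return new_id, rows

new_id, rows = new_user_rows([0, 1, 1106], "hypothetical_user", [(51444, 5), (34722, 4.5)])
print(new_id, rows[0])  # 1107 (51444, 'hypothetical_user', 5.0, 1107)
```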

Movies to rate for the new user:

```python
[Row(movieId=3253, title="Wayne's World (1992)", genres='Comedy'),
 Row(movieId=2459, title='Texas Chainsaw Massacre, The (1974)', genres='Horror'),
 Row(movieId=2513, title='Pet Sematary (1989)', genres='Horror'),
 Row(movieId=6502, title='28 Days Later (2002)', genres='Action|Horror|Sci-Fi'),
 Row(movieId=1091, title="Weekend at Bernie's (1989)", genres='Comedy'),
 Row(movieId=441, title='Dazed and Confused (1993)', genres='Comedy'),
 Row(movieId=370, title='Naked Gun 33 1/3: The Final Insult (1994)', genres='Action|Comedy')]
```

```python
def new_user_recs(user_id, new_ratings, rating_df, movie_title_df, num_recs,
                  user_col='id', item_col='film_id'):
    # user_col / item_col are added parameters that default to this notebook's rating columns

    # turn the new_ratings list into a Spark DataFrame with rating_df's columns
    new_user_ratings = spark.createDataFrame(new_ratings, rating_df.columns)

    # combine the new ratings with rating_df (union matches columns by position)
    movie_ratings_combined = rating_df.union(new_user_ratings)

    # create an ALS model with the rank found by cross-validation and fit it on all ratings
    als = ALS(maxIter=5, rank=50, regParam=0.01, userCol=user_col, itemCol=item_col,
              ratingCol='rating', coldStartStrategy='drop')
    model = als.fit(movie_ratings_combined)

    # make recommendations for all users using the recommendForAllUsers method
    recommendations = model.recommendForAllUsers(num_recs)

    # get recommendations specifically for the new user that has been added to the DataFrame
    recs_for_user = recommendations.where(recommendations[user_col] == user_id).take(1)

    for ranking, (movie_id, rating) in enumerate(recs_for_user[0]['recommendations']):
        movie_string = name_retriever(movie_id, movie_title_df)
        print('Recommendation {}: {}  | predicted score :{}'.format(ranking + 1, movie_string, rating))
```

```python
# try out your function with the movies listed above
```

```
Recommendation 1: Star Wars: Episode IV - A New Hope (1977)  | predicted score :5.517341136932373
Recommendation 2: Usual Suspects, The (1995)  | predicted score :5.442122936248779
Recommendation 3: In the Name of the Father (1993)  | predicted score :5.3851237297058105
Recommendation 4: Star Wars: Episode V - The Empire Strikes Back (1980)  | predicted score :5.381286144256592
Recommendation 5: Fight Club (1999)  | predicted score :5.361552715301514
Recommendation 6: Monty Python and the Holy Grail (1975)  | predicted score :5.347217559814453
Recommendation 7: Willy Wonka & the Chocolate Factory (1971)  | predicted score :5.328979969024658
Recommendation 8: Who Framed Roger Rabbit? (1988)  | predicted score :5.324649810791016
Recommendation 9: Clerks (1994)  | predicted score :5.305201530456543
Recommendation 10: Office Space (1999)  | predicted score :5.297811985015869
```


And there we have it: the recommendation system returns the top 10 movies for the new user.




## Level up (Optional)


* Create a user interface to allow users to easily choose items and get recommendations 

* Scrape user reviews from IMDB using the IMDB links, and apply basic NLP techniques to create extra embeddings for the ALS model  

* Create a hybrid recommender system using features like genre 

## Summary

In this lab, you built a model using Spark, performed some parameter selection, and refit the model whenever new user preferences came in. You saw how Spark's ALS implementation can be used to build a scalable and efficient recommendation system. You also saw that such systems can become computationally expensive (for example, refitting the model for every new user), which can be a problem for an online system running on traditional, single-machine platforms. Spark's distributed computing architecture makes it practical to deploy such recommendation systems in real-world applications (think Amazon, Spotify).