# Building a Recommendation System in PySpark - Lab

## Introduction

In this lab, we will implement a movie recommendation system using ALS in Spark programming environment. Spark's machine learning library `ml` comes packaged with a very efficient implementation of the ALS algorithm that we looked at in the previous lesson. The lab will require you to put into practice your Spark programming skills for creating and manipulating PySpark DataFrames. We will go through a step-by-step process into developing a movie recommendation system using ALS and PySpark using the `MovieLens` dataset that we used in a previous lab.

Note: You are advised to refer to [PySpark documentation](http://spark.apache.org/docs/2.2.0/api/python/index.html) heavily for completing this lab as it will introduce a few new methods. 


## Objectives

In this lab you will: 

- Use Spark to train and cross-validate an ALS model 
- Introduce a new user with rating to a rating matrix and make recommendations for them 
- Create a function that will return the top n recommendations for a user 

## Building a Recommendation System

We have seen how recommendation systems have played an integral part in the success of Amazon (books, items), Pandora/Spotify (music), Google (news, search), YouTube (videos) etc.  For Amazon, these systems bring more than 30% of their total revenue. For Netflix, 75% of movies that people watch are based on some sort of recommendation.

> The goal of recommendation systems is to find what is likely to be of interest to the user. This enables organizations to offer a high level of personalization and customer tailored services.


For online video content services like Netflix and Hulu, the need to build robust movie recommendation systems is extremely important. An example of a recommendation system is such as this:

1.    User A watches Game of Thrones and Breaking Bad 
2.    User B performs a search query for Game of Thrones 
3.    The system suggests Breaking Bad to user B from data collected about user A 


This lab will guide you through a step-by-step process into developing such a movie recommendation system. We will use the `MovieLens` dataset to build a movie recommendation system using the collaborative filtering technique with Spark's Alternating Least Squares implementation. After building that recommendation system, we will go through the process of adding a new user to the dataset with some new ratings and obtaining new recommendations for that user.

### Importing the Data

* Initialize a `SparkSession` object
* Import the dataset found at `'./data/ratings.csv'` into a PySpark DataFrame

In [1]:
# import necessary libraries
from pyspark.sql import SparkSession 

# instantiate SparkSession object
# spark = SparkSession.builder.master("local").getOrCreate()
spark = SparkSession.builder.master('local').getOrCreate()


In [2]:
# read in the dataset into pyspark DataFrame
movie_ratings = spark.read.csv('./data/ratings.csv', header="true", inferSchema="true")
movie_ratings.show(3)

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|      1|   4.0|964982703|
|     1|      3|   4.0|964981247|
|     1|      6|   4.0|964982224|
+------+-------+------+---------+
only showing top 3 rows



Check the data types of each of the columns to ensure that they are a type that makes sense given the column.

In [3]:
movie_ratings.columns

['userId', 'movieId', 'rating', 'timestamp']

We aren't going to need the timestamp, so we can go ahead and remove that column.

In [4]:
movie_ratings = movie_ratings.drop('timestamp')
movie_ratings.show(3)

+------+-------+------+
|userId|movieId|rating|
+------+-------+------+
|     1|      1|   4.0|
|     1|      3|   4.0|
|     1|      6|   4.0|
+------+-------+------+
only showing top 3 rows



### Fitting the Alternating Least Squares Model

Because this dataset is already preprocessed for us, we can go ahead and fit the Alternating Least Squares model.

* Import `ALS` from `pyspark.ml.recommendation` module 
* Use the `.randomSplit()` method on the pyspark DataFrame to separate the dataset into training and test sets
* Fit the Alternating Least Squares Model to the training dataset. Make sure to set the `userCol`, `itemCol`, and `ratingCol` to the appropriate columns given this dataset. Then fit the data to the training set and assign it to a variable model  

In [5]:
from pyspark.ml.evaluation import RegressionEvaluator

from pyspark.ml.recommendation import ALS 

In [6]:
# split into training and testing sets
train, test = movie_ratings.randomSplit(weights=[0.8, 0.2], seed=1)

# Build the recommendation model using ALS on the training data
# Note we set cold start strategy to 'drop' to ensure we don't get NaN evaluation metrics
als = ALS(userCol='userId', itemCol='movieId', ratingCol='rating', coldStartStrategy='drop')

# fit the ALS model to the training set
model = als.fit(train)

Now you've fit the model, and it's time to evaluate it to determine just how well it performed.

* Import `RegressionEvalutor` from `pyspark.ml.evaluation` 
* Generate predictions with your model for the test set by using the `.transform()` method on your ALS model 
* Evaluate your model and print out the RMSE from your test set 

In [7]:
# importing appropriate library
from pyspark.ml.evaluation import RegressionEvaluator

In [8]:
# Evaluate the model by computing the RMSE on the test data
predictions = model.transform(test)

In [9]:
#evaluator = RegressionEvaluator(predictionCol='prediction', labelCol='rating')
evaluator = RegressionEvaluator(metricName='rmse', 
                                labelCol='rating',
                                predictionCol='prediction')

In [10]:
#base_rmse = evaluator.evaluate(predictions, {evaluator.metricName:"rmse"})
base_rmse = evaluator.evaluate(predictions)

print(f'Root-mean-square error = {base_rmse}')

Root-mean-square error = 0.8755214350833381


### Cross-validation to Find the Optimal Model

Let's now find the optimal values for the parameters of the ALS model. Use the built-in `CrossValidator` in PySpark with a suitable param grid and determine the optimal model. Try with the parameters:

* regularization = [0.01, 0.001, 0.1]
* rank = [4, 10, 50]


In [11]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

In [12]:
# initialize the ALS model
als_model = ALS(userCol='userId', itemCol='movieId', ratingCol='rating', coldStartStrategy='drop')

In [13]:
# create the parameter grid              
params = ParamGridBuilder()\
                .addGrid(als_model.regParam, [0.01, 0.001, 0.1])\
                .addGrid(als_model.rank, [4, 10, 50]).build()

In [14]:
# instantiating crossvalidator estimator
cv = CrossValidator(estimator=als_model, estimatorParamMaps=params,evaluator=evaluator,parallelism=4)

In [15]:
best_model = cv.fit(movie_ratings)  

In [16]:
# We see the best model has a rank of 50, so we will use that in our future models with this dataset
best_model.bestModel.rank
### I did not get 50. My bestModel rank=4...

50

In [17]:
### try to remove .rank 
best_model.bestModel
### still not getting 50

ALSModel: uid=ALS_aa3802253eac, rank=50

In [18]:
### automatically select greatest from best_model.avgMetrics
import numpy as np
i = np.argmax(best_model.avgMetrics)
print(i)

params[i]
### There it is. best_model has params: regParam=0.001, rank=50

5


{Param(parent='ALS_aa3802253eac', name='regParam', doc='regularization parameter (>= 0).'): 0.001,
 Param(parent='ALS_aa3802253eac', name='rank', doc='rank of the factorization'): 50}

In [19]:
#### I'm lazy so I want this in copy paste format
def best_params_easy(best_params):
    base = [i for i in best_params.keys()][0].parent.split('_')[0]
    
    strings=[]
    for tup in [(k.name, v) for k,v in best_params.items()]:
        left=tup[0]
        right=tup[1]
        string = left+'='+str(right)
        strings.append(string)
        
    for i in range(len(strings)):
        if i == 0:
            print_this = base+"("+strings[i]
        else:
            print_this = print_this+", "+strings[i]
            
    return print_this+")"   

best_params_easy(params[5])

'ALS(regParam=0.001, rank=50)'

### Incorporating the names of the movies

When we make recommendations, it would be ideal if we could have the actual name of the movie used rather than just an ID. There is another file called `'./data/movies.csv'` that contains all of the names of the movies matched up to the `movie_id` that we have in the ratings dataset.

* Import the data into a Spark DataFrame 
* Look at the first 5 rows

In [20]:
movie_titles = spark.read.csv('./data/movies.csv', header='true', inferSchema='true')

movie_titles.head(5)

[Row(movieId=1, title='Toy Story (1995)', genres='Adventure|Animation|Children|Comedy|Fantasy'),
 Row(movieId=2, title='Jumanji (1995)', genres='Adventure|Children|Fantasy'),
 Row(movieId=3, title='Grumpier Old Men (1995)', genres='Comedy|Romance'),
 Row(movieId=4, title='Waiting to Exhale (1995)', genres='Comedy|Drama|Romance'),
 Row(movieId=5, title='Father of the Bride Part II (1995)', genres='Comedy')]

We will eventually be matching up the movie ids with the movie titles. In the cell below, create a function `name_retriever()` that takes in a `movie_id` and returns a string that represents the movie title. 

> Hint: It's possible to do this operation in one line with the `df.where()` or the `df.filter()` methods 

In [21]:
##### Experimenting how to extract title

In [22]:
movie_titles.show(3)

+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
+-------+--------------------+--------------------+
only showing top 3 rows



In [23]:
movie_titles.where(movie_titles['movieId']==1).show()

+-------+----------------+--------------------+
|movieId|           title|              genres|
+-------+----------------+--------------------+
|      1|Toy Story (1995)|Adventure|Animati...|
+-------+----------------+--------------------+



In [24]:
movie_titles.where(movie_titles['movieId']==1).take(1)

[Row(movieId=1, title='Toy Story (1995)', genres='Adventure|Animation|Children|Comedy|Fantasy')]

In [25]:
movie_titles.where(movie_titles['movieId']==1).take(1)[0]

Row(movieId=1, title='Toy Story (1995)', genres='Adventure|Animation|Children|Comedy|Fantasy')

In [26]:
movie_titles.where(movie_titles['movieId']==1).take(1)[0].title

'Toy Story (1995)'

In [27]:
##### End extraction

In [28]:
def name_retriever(movie_id, movie_title_df):
    return movie_title_df.where(movie_title_df['movieId']==movie_id).take(1)[0].title

In [29]:
print(name_retriever(1023, movie_titles))

Winnie the Pooh and the Blustery Day (1968)


## Getting Recommendations

Now it's time to actually get some recommendations! The ALS model has built-in methods called `.recommendForUserSubset()` and `.recommendForAllUsers()`. We'll start off with using a subset of users. 

In [30]:
users = movie_ratings.select(als.getUserCol()).distinct().limit(1)
userSubsetRecs = model.recommendForUserSubset(users, 10)
recs = userSubsetRecs.take(1)

We can now see we have a list of rows with recommended items. Now try and get the name of the top recommended movie by way of the function you just created, using number one item for this user.

In [31]:
# use indexing to obtain the movie id of top predicted rated item
first_recommendation = recs[0]['recommendations'][0][0]

# use the name retriever function to get the values
name_retriever(first_recommendation,movie_titles)

'The Big Bus (1976)'

Of course, you can also make recommendations for everyone, although this will take longer. In the next line, we are creating an RDD with the top 5 recommendations for every user and then selecting one user to find out his predictions:

In [32]:
recommendations = model.recommendForAllUsers(5)
recommendations.where(recommendations.userId == 3).collect()

[Row(userId=3, recommendations=[Row(movieId=70946, rating=5.7106852531433105), Row(movieId=6835, rating=4.943024158477783), Row(movieId=5746, rating=4.943024158477783), Row(movieId=5919, rating=4.943024158477783), Row(movieId=5181, rating=4.892704010009766)])]

In [33]:
########## experimenting with extracting recos in 'reader-friendly manner' for 
# function below

In [34]:
recos = recommendations.where(recommendations.userId == 3).collect()[0][1]
recos

[Row(movieId=70946, rating=5.7106852531433105),
 Row(movieId=6835, rating=4.943024158477783),
 Row(movieId=5746, rating=4.943024158477783),
 Row(movieId=5919, rating=4.943024158477783),
 Row(movieId=5181, rating=4.892704010009766)]

In [35]:
recos[0] # number 1 reco

Row(movieId=70946, rating=5.7106852531433105)

In [36]:
recos[0][0] # number 1 reco, movie_id

70946

In [37]:
recos[0][1] # number 1 reco, predicted_rating

5.7106852531433105

In [38]:
# n_recommendation = int
# movie_id = recos[n_recommendation][0]
# title = name_retriever(movie_id, movie_title_df)
# predicted_rating = recos[n_recommendation][1]

n_recommendation = 0
movie_id = recos[n_recommendation][0]
title = name_retriever(movie_id, movie_titles)
predicted_rating = recos[n_recommendation][1]

print(recos[0])
print()

print(f'Number {n_recommendation+1} recommendation:')
print(f'\tMovie: \t{title}')
print(f'\tPredicted rating: {predicted_rating}')

Row(movieId=70946, rating=5.7106852531433105)

Number 1 recommendation:
	Movie: 	Troll 2 (1990)
	Predicted rating: 5.7106852531433105


In [39]:
# try to convert to for loop to print all recos
for i in range(len(recos)):
    movie_id = recos[i][0]
    title = name_retriever(movie_id, movie_titles)
    predicted_rating = recos[i][1]
    
    print(f'Number {i+1} recommendation:')
    print(f'\tMovie: \t{title}')
    print(f'\tPredicted rating: {round(predicted_rating,2)}')
    

Number 1 recommendation:
	Movie: 	Troll 2 (1990)
	Predicted rating: 5.71
Number 2 recommendation:
	Movie: 	Alien Contamination (1980)
	Predicted rating: 4.94
Number 3 recommendation:
	Movie: 	Galaxy of Terror (Quest) (1981)
	Predicted rating: 4.94
Number 4 recommendation:
	Movie: 	Android (1982)
	Predicted rating: 4.94
Number 5 recommendation:
	Movie: 	Hangar 18 (1980)
	Predicted rating: 4.89


In [40]:
##########

### Getting Predictions for a New User

Now, it's time to put together all that you've learned in this section to create a function that will take in a new user and some movies they've rated and then return $n$ number of highest recommended movies. This function will have multiple different steps to it:

* Adding the new ratings into the DataFrame (hint: look into using the `.union()` method) 
* Fitting the ALS model  
* Make recommendations for the user of choice 
* Print out the names of the top $n$ recommendations in a reader-friendly manner 

The function should take in the parameters: 

* `user_id` : int 
* `new_ratings` : list of tuples in the format (user_id, item_id, rating)
* `rating_df` : spark DF containing ratings
* `movie_title_df` : spark DF containing movie titles
* `num_recs` : int

Rate new movies

```python
[Row(movieId=3253, title='Wayne's World (1992)', genres='Comedy'),
 Row(movieId=2459, title='Texas Chainsaw Massacre, The (1974)', genres='Horror'),
 Row(movieId=2513, title='Pet Sematary (1989)', genres='Horror'),
 Row(movieId=6502, title='28 Days Later (2002)', genres='Action|Horror|Sci-Fi'),
 Row(movieId=1091, title='Weekend at Bernie's (1989)', genres='Comedy'),
Row(movieId=441, title='Dazed and Confused (1993)', genres='Comedy'),
Row(movieId=370, title='Naked Gun 33 1/3: The Final Insult (1994)', genres='Action|Comedy')]

```

In [41]:
def new_user_recs(user_id, new_ratings, rating_df, movie_title_df, num_recs):  
    
    # turn the new_ratings list into a spark DataFrame
    # df = spark.createDataFrame(data, column_names)
    new_recos_df = spark.createDataFrame(new_ratings, rating_df.columns)
    
    # combine the new_recos_df df with the (existing) rating_df
    combine = rating_df.union(new_recos_df)
  
    # create an ALS model and fit it
    # remember to use the params from grid search
    als = ALS(regParam=0.001, 
                rank=50, 
                userCol='userId', 
                itemCol='movieId', 
                ratingCol='rating', 
                coldStartStrategy='drop')
    model = als.fit(combine)

    # make recommendations for all users using the recommendForAllUsers method
    recommendations = model.recommendForAllUsers(num_recs)

    # get recommendations specifically for the new user that has been added to the DataFrame
    recos = recommendations.where(recommendations.userId == user_id).collect()[0][1]  
    
    # use above format to extract n_recommendation, movie_id, title, and predicted rating
    for i in range(num_recs):
        movie_id = recos[i][0]
        title = name_retriever(movie_id, movie_title_df)
        predicted_rating = recos[i][1]

        print(f'Number {i+1} recommendation:')
        print(f'\tMovie: \t{title}')
        print(f'\tPredicted rating: {round(predicted_rating,2)}')

In [42]:
# try out your function with the movies listed above
user_id = 100000
user_ratings_1 = [(user_id,3253,5),
                  (user_id,2459,5),
                  (user_id,2513,4),
                  (user_id,6502,5),
                  (user_id,1091,5),
                  (user_id,441,4)]


new_user_recs(user_id,
             new_ratings=user_ratings_1,
             rating_df=movie_ratings,
             movie_title_df=movie_titles,
             num_recs = 10)

Number 1 recommendation:
	Movie: 	Back to the Future (1985)
	Predicted rating: 5.71
Number 2 recommendation:
	Movie: 	Princess Bride, The (1987)
	Predicted rating: 5.68
Number 3 recommendation:
	Movie: 	Hot Fuzz (2007)
	Predicted rating: 5.6
Number 4 recommendation:
	Movie: 	Inception (2010)
	Predicted rating: 5.59
Number 5 recommendation:
	Movie: 	Silence of the Lambs, The (1991)
	Predicted rating: 5.57
Number 6 recommendation:
	Movie: 	Prestige, The (2006)
	Predicted rating: 5.56
Number 7 recommendation:
	Movie: 	Young Frankenstein (1974)
	Predicted rating: 5.55
Number 8 recommendation:
	Movie: 	Singin' in the Rain (1952)
	Predicted rating: 5.53
Number 9 recommendation:
	Movie: 	Kingsman: The Secret Service (2015)
	Predicted rating: 5.52
Number 10 recommendation:
	Movie: 	Seven (a.k.a. Se7en) (1995)
	Predicted rating: 5.47


So here we have it! Our recommendation system is generating recommendations for the top 10 movies. 




## Level up (Optional)


* Create a user interface to allow users to easily choose items and get recommendations 

* Use IMDB links to scrape user reviews from IMDB and using basic NLP techniques, create extra embeddings for ALS model  

* Create a hybrid recommender system using features like genre 

## Summary

In this lab, you built a model using Spark, performed some parameter selection, and updated the model every time new user preferences came in. You looked at how Spark's ALS implementation can be used to build a scalable and efficient recommendation system. You also saw that such systems can become computationally expensive and using them with an online system could be a problem with traditional computational platforms. Spark's distributed computing architecture provides a great solution to deploy such recommendation systems for real-world applications (think Amazon, Spotify).

In [43]:
spark.stop()