In [74]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load in 

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the "../input/" directory.
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
for dirname, _, filenames in os.walk('/kaggle/input'):
    for filename in filenames:
        print(os.path.join(dirname, filename))

# Any results you write to the current directory are saved as output.

/kaggle/input/movie-lens-small-latest-dataset/ratings.csv
/kaggle/input/movie-lens-small-latest-dataset/links.csv
/kaggle/input/movie-lens-small-latest-dataset/tags.csv
/kaggle/input/movie-lens-small-latest-dataset/README.txt
/kaggle/input/movie-lens-small-latest-dataset/movies.csv


In [75]:
! pip install pyspark
import pyspark



# Building a Recommendation System in PySpark - Lab

## Introduction

In this last lab, we will implement a movie recommendation system using ALS in the Spark programming environment. Spark's machine learning library `ml` comes packaged with a very efficient implementation of the ALS algorithm that we looked at in the previous lesson. The lab will require you to put into practice your Spark programming skills for creating and manipulating PySpark DataFrames. We will go step by step through developing a movie recommendation system with ALS and PySpark, using the MovieLens dataset that we used in a previous lab.

Note: You are advised to refer to [PySpark Documentation](http://spark.apache.org/docs/2.2.0/api/python/index.html) heavily for completing this lab as it will introduce a few new methods. 


## Objectives

You will be able to:

* Demonstrate an understanding of how recommendation systems are being used for personalization of online services/products
* Parse and filter datasets into Spark DataFrames, performing basic feature selection
* Run a brief hyper-parameter selection activity through a scalable grid search
* Train and evaluate the predictive performance of a recommendation system
* Generate predictions from the trained model

## Building a Recommendation System

We have seen how recommendation systems have played an integral part in the success of Amazon (Books, Items), Pandora/Spotify (Music), Google (News, Search), YouTube (Videos), etc. For Amazon, these systems bring in more than 30% of total revenue. For Netflix, 75% of the movies that people watch are based on some sort of recommendation.

> The goal of Recommendation Systems is to find what is likely to be of interest to the user. This enables organizations to offer a high level of personalization and customer tailored services.


For online video content services like Netflix and Hulu, building robust movie recommendation systems is extremely important. A simple example of a recommendation system at work looks like this:

1.    User A watches Game of Thrones and Breaking Bad.
2.    User B performs a search query for Game of Thrones.
3.    The system suggests Breaking Bad to user B from data collected about user A.
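
The three steps above can be sketched in plain Python. This is only an illustration of the idea using a simple co-occurrence count, not the ALS approach used in the rest of the lab; the `watch_history` data and the `suggest_for_query` helper are hypothetical.

```python
from collections import Counter

# Hypothetical viewing histories; the titles mirror the example above.
watch_history = {
    "user_a": {"Game of Thrones", "Breaking Bad"},
    "user_c": {"Game of Thrones", "Breaking Bad", "The Wire"},
    "user_d": {"The Office"},
}

def suggest_for_query(query_title, histories, top_n=1):
    """Suggest titles that other users watched alongside `query_title`."""
    co_watched = Counter()
    for titles in histories.values():
        if query_title in titles:
            # Count every other title seen together with the queried one.
            co_watched.update(titles - {query_title})
    return [title for title, _ in co_watched.most_common(top_n)]

# User B searches for Game of Thrones.
print(suggest_for_query("Game of Thrones", watch_history))  # ['Breaking Bad']
```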


This lab will guide you step by step through developing such a movie recommendation system. We will use the MovieLens dataset to build a movie recommendation system using the collaborative filtering technique with Spark's Alternating Least Squares implementation. After building that recommendation system, we will go through the process of adding a new user to the dataset with some new ratings and obtaining new recommendations for that user.

### Importing the Data
To begin with:
* initialize a SparkSession object
* import the dataset found at './data/ratings.csv' into a pyspark DataFrame

In [76]:
# import necessary libraries
from pyspark.sql import SparkSession


# instantiate SparkSession object
spark = SparkSession.builder.master("local").getOrCreate()



In [77]:
# Set a path variable for data 
file = '/kaggle/input/movie-lens-small-latest-dataset/ratings.csv'
# Code here 
file

'/kaggle/input/movie-lens-small-latest-dataset/ratings.csv'

In [111]:
# read in the dataset into pyspark DataFrame
movie_ratings = spark.read.csv(file, header = 'true', inferSchema = 'true')

Check the data types of each of the values to ensure that they are a type that makes sense given the column.

In [79]:
movie_ratings.dtypes

[('userId', 'int'),
 ('movieId', 'int'),
 ('rating', 'double'),
 ('timestamp', 'int')]

In [80]:
movie_ratings.head(5)

[Row(userId=1, movieId=1, rating=4.0, timestamp=964982703),
 Row(userId=1, movieId=3, rating=4.0, timestamp=964981247),
 Row(userId=1, movieId=6, rating=4.0, timestamp=964982224),
 Row(userId=1, movieId=47, rating=5.0, timestamp=964983815),
 Row(userId=1, movieId=50, rating=5.0, timestamp=964982931)]

We aren't going to need the timestamp, so we can go ahead and remove that column.

In [127]:
movie_ratings = movie_ratings.drop('timestamp')

### Fitting the Alternating Least Squares Model

Because this dataset is already preprocessed for us, we can go ahead and fit the Alternating Least Squares model.

* Import the ALS module from pyspark.ml.recommendation.
* Use the randomSplit method on the pyspark DataFrame to separate the dataset into a training and test set
* Fit the Alternating Least Squares model to the training dataset and assign the fitted model to a variable `model`. Make sure to set `userCol`, `itemCol`, and `ratingCol` to the appropriate column names for this dataset.
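
A note on the split: `randomSplit` assigns rows to the splits at random, so the 80/20 proportions are only approximate, and without a `seed` argument the split (and therefore the RMSE computed later) changes between runs. The plain-Python sketch below imitates that behaviour on a list of rows. `random_split` is a hypothetical helper written for illustration, not Spark's implementation.

```python
import random

def random_split(rows, weights, seed=None):
    """Assign each row independently to one split, with probability proportional to its weight."""
    rng = random.Random(seed)
    total = sum(weights)
    # Cumulative boundaries of the normalized weights, e.g. [0.8, 1.0].
    bounds, acc = [], 0.0
    for weight in weights:
        acc += weight / total
        bounds.append(acc)
    splits = [[] for _ in weights]
    for row in rows:
        draw = rng.random()
        for idx, bound in enumerate(bounds):
            if draw < bound:
                splits[idx].append(row)
                break
        else:
            splits[-1].append(row)  # guard against floating-point rounding
    return splits

train, test = random_split(range(1000), [0.8, 0.2], seed=42)
print(len(train), len(test))  # roughly 800 and 200, but not exactly

# The same seed reproduces the same split.
train_again, _ = random_split(range(1000), [0.8, 0.2], seed=42)
print(train == train_again)  # True
```

In the notebook itself, the equivalent would be passing a seed to `movie_ratings.randomSplit([.8, .2], seed=...)`.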

In [82]:
from pyspark.ml.evaluation import RegressionEvaluator

from pyspark.ml.recommendation import ALS

help(ALS())

Help on ALS in module pyspark.ml.recommendation object:

class ALS(pyspark.ml.wrapper.JavaEstimator, pyspark.ml.param.shared.HasCheckpointInterval, pyspark.ml.param.shared.HasMaxIter, pyspark.ml.param.shared.HasPredictionCol, pyspark.ml.param.shared.HasRegParam, pyspark.ml.param.shared.HasSeed, pyspark.ml.util.JavaMLWritable, pyspark.ml.util.JavaMLReadable)
 |  Alternating Least Squares (ALS) matrix factorization.
 |  
 |  ALS attempts to estimate the ratings matrix `R` as the product of
 |  two lower-rank matrices, `X` and `Y`, i.e. `X * Yt = R`. Typically
 |  these approximations are called 'factor' matrices. The general
 |  approach is iterative. During each iteration, one of the factor
 |  matrices is held constant, while the other is solved for using least
 |  squares. The newly-solved factor matrix is then held constant while
 |  solving for the other factor matrix.
 |  
 |  This is a blocked implementation of the ALS factorization algorithm
 |  that groups the two sets of factor
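
The alternating procedure described in the help text can be sketched with NumPy. This is a minimal, unblocked illustration on a hypothetical 4x5 ratings matrix with plain L2 regularization; the function name `als_factorize` and the toy data are assumptions made for this example, and Spark's distributed implementation differs in many details.

```python
import numpy as np

def als_factorize(R, observed, rank=2, reg=0.1, n_iters=20, seed=0):
    """Approximate R on its observed entries as X @ Y.T via alternating least squares."""
    rng = np.random.default_rng(seed)
    n_users, n_items = R.shape
    X = rng.normal(scale=0.1, size=(n_users, rank))  # user factors
    Y = rng.normal(scale=0.1, size=(n_items, rank))  # item factors
    ridge = reg * np.eye(rank)
    for _ in range(n_iters):
        # Hold Y constant and solve a small ridge regression for each user.
        for u in range(n_users):
            Yo = Y[observed[u]]
            X[u] = np.linalg.solve(Yo.T @ Yo + ridge, Yo.T @ R[u, observed[u]])
        # Hold the new X constant and solve for each item.
        for i in range(n_items):
            Xo = X[observed[:, i]]
            Y[i] = np.linalg.solve(Xo.T @ Xo + ridge, Xo.T @ R[observed[:, i], i])
    return X, Y

# Hypothetical ratings; 0 marks a missing rating.
R = np.array([
    [5, 4, 0, 1, 0],
    [4, 0, 0, 1, 1],
    [1, 1, 0, 5, 4],
    [0, 1, 5, 4, 0],
], dtype=float)
observed = R > 0

X, Y = als_factorize(R, observed)
predicted = X @ Y.T  # also fills in the missing entries
train_rmse = np.sqrt(np.mean((predicted[observed] - R[observed]) ** 2))
print(f"RMSE on observed entries: {train_rmse:.3f}")
print(np.round(predicted, 1))
```

The entries of `predicted` at the positions that were 0 in `R` play the role of the recommendations scores that the fitted Spark model produces later in this lab.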

In [83]:

# split into training and testing sets

training,testing = movie_ratings.randomSplit([.8,.2])

# Build the recommendation model using ALS on the training data
# Note we set cold start strategy to 'drop' to ensure we don't get NaN evaluation metrics

als = ALS(maxIter = 5, rank = 4, userCol = 'userId', itemCol = 'movieId', ratingCol = 'rating', coldStartStrategy = 'drop')

# fit the ALS model to the training set
model = als.fit(training)

Now you've fit the model, and it's time to evaluate it to determine just how well it performed.

* import `RegressionEvaluator` from pyspark.ml.evaluation
* generate predictions with your model for the test set by using the `transform` method on your ALS model
* evaluate your model and print out the RMSE from your test set

In [84]:
# importing appropriate library
from pyspark.ml.evaluation import RegressionEvaluator

# Evaluate the model by computing the RMSE on the test data

preds = model.transform(testing)
evaluator = RegressionEvaluator(metricName = 'rmse', labelCol = 'rating', predictionCol = 'prediction')

rmse = evaluator.evaluate(preds)
rmse

0.8993699392546267
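
To make the metric concrete, here is a small plain-Python sketch of what RMSE measures, and of why `coldStartStrategy = 'drop'` matters: a single NaN prediction (for a user or movie that was not in the training split) turns the whole metric into NaN. The labels and predictions below are hypothetical values chosen for illustration.

```python
import math

def rmse(labels, predictions):
    """Root-mean-square error over paired labels and predictions."""
    squared_errors = [(y - p) ** 2 for y, p in zip(labels, predictions)]
    return math.sqrt(sum(squared_errors) / len(squared_errors))

labels = [4.0, 3.0, 5.0, 2.0]            # hypothetical true ratings
predictions = [3.5, 3.0, math.nan, 2.5]  # NaN: user or movie unseen during training

# One NaN prediction is enough to make the metric NaN.
print(rmse(labels, predictions))  # nan

# Dropping rows with NaN predictions, which is what 'drop' does, gives a usable number.
kept = [(y, p) for y, p in zip(labels, predictions) if not math.isnan(p)]
kept_labels, kept_predictions = zip(*kept)
print(round(rmse(kept_labels, kept_predictions), 4))  # 0.4082
```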

### Cross Validation to Find the Optimal Model

Let's now find the optimal values for the parameters of the ALS model. Use the built-in Cross Validator in pyspark with a suitable param grid and determine the optimal model. Try with the parameters:

* regularization = [0.01,0.001,0.1]
* rank = [4,10,50]
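
Before running the search, it helps to estimate how much work it implies. The sketch below enumerates the same grid in plain Python and counts the model fits, assuming `CrossValidator` is left at its default of 3 folds and refits the best parameter combination once on the full dataset. The variable names are illustrative only.

```python
from itertools import product

reg_params = [0.01, 0.001, 0.1]
ranks = [4, 10, 50]
num_folds = 3  # CrossValidator's default numFolds

# Every combination of the two parameter lists, as ParamGridBuilder would produce.
grid = [{"regParam": reg, "rank": rank} for reg, rank in product(reg_params, ranks)]

# One fit per combination per fold, plus one final refit of the best combination.
total_fits = len(grid) * num_folds + 1

print(len(grid))   # 9
print(total_fits)  # 28
```

With 28 ALS fits, some of them at rank 50, the cross-validation cell can take noticeably longer than the single fit above.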



In [85]:
movie_ratings.head(5)

[Row(userId=1, movieId=1, rating=4.0),
 Row(userId=1, movieId=3, rating=4.0),
 Row(userId=1, movieId=6, rating=4.0),
 Row(userId=1, movieId=47, rating=5.0),
 Row(userId=1, movieId=50, rating=5.0)]

In [86]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

# initialize the ALS model

als_1 = ALS(userCol = 'userId', itemCol= 'movieId', ratingCol = 'rating', coldStartStrategy = 'drop')

# create the parameter grid              

params = ParamGridBuilder().addGrid(als_1.regParam, [.01, .001, .1]).addGrid(als_1.rank, [4,10,50]).build()

## instantiating crossvalidator estimator

cv = CrossValidator(estimator = als_1, estimatorParamMaps = params, evaluator = evaluator, parallelism = 4)
best = cv.fit(movie_ratings)
# We see the best model has a rank of 50, so we will use that in our future models with this dataset

top_rank = best.bestModel.rank

In [87]:
top_rank

50

### Incorporating the names of the movies

When we make recommendations, it would be ideal if we could have the actual name of the movie be used rather than just an ID. There is another file called './data/movies.csv' that contains all of the names of the movies matched up to the movie_id that we have in the ratings dataset.

* import the data into a Spark DataFrame
* look at the first 5 rows

In [88]:
file_1 = '/kaggle/input/movie-lens-small-latest-dataset/movies.csv'

In [89]:
movie_titles = spark.read.csv(file_1, header = 'true', inferSchema = 'true')

movie_titles.head(5)

[Row(movieId=1, title='Toy Story (1995)', genres='Adventure|Animation|Children|Comedy|Fantasy'),
 Row(movieId=2, title='Jumanji (1995)', genres='Adventure|Children|Fantasy'),
 Row(movieId=3, title='Grumpier Old Men (1995)', genres='Comedy|Romance'),
 Row(movieId=4, title='Waiting to Exhale (1995)', genres='Comedy|Drama|Romance'),
 Row(movieId=5, title='Father of the Bride Part II (1995)', genres='Comedy')]

We will eventually be matching up the movie_ids with the movie titles. In the cell below, create a function `name_retriever` that takes in a movie_id and returns a string that is the title of that movie.

> Hint: It's possible to do this operation in one line with the `df.where` or the `df.filter` method

In [90]:
movie_titles.where(movie_titles.movieId == 1023).take(1)[0]['title']

'Winnie the Pooh and the Blustery Day (1968)'

In [91]:
movie_titles.head(5)

[Row(movieId=1, title='Toy Story (1995)', genres='Adventure|Animation|Children|Comedy|Fantasy'),
 Row(movieId=2, title='Jumanji (1995)', genres='Adventure|Children|Fantasy'),
 Row(movieId=3, title='Grumpier Old Men (1995)', genres='Comedy|Romance'),
 Row(movieId=4, title='Waiting to Exhale (1995)', genres='Comedy|Drama|Romance'),
 Row(movieId=5, title='Father of the Bride Part II (1995)', genres='Comedy')]

In [92]:
def name_retriever(movie_id,movie_title_df):
    title = movie_title_df.where(movie_title_df.movieId == movie_id).take(1)[0]['title']
    return title

In [93]:
print(name_retriever(1023,movie_titles))

Winnie the Pooh and the Blustery Day (1968)


## Getting Recommendations

Now it's time to actually get some recommendations! The fitted ALS model has built-in methods called `recommendForUserSubset` and `recommendForAllUsers`. We'll start off with a subset of users.

In [94]:
users = movie_ratings.select(als.getUserCol()).distinct().limit(1)
userSubsetRecs = model.recommendForUserSubset(users, 10)
recs = userSubsetRecs.take(1)

In [95]:
recs[0][1]

[Row(movieId=6818, rating=5.494475841522217),
 Row(movieId=4789, rating=5.1264801025390625),
 Row(movieId=8477, rating=5.039873123168945),
 Row(movieId=3379, rating=4.932860374450684),
 Row(movieId=136469, rating=4.907209396362305),
 Row(movieId=102217, rating=4.8987627029418945),
 Row(movieId=92494, rating=4.8987627029418945),
 Row(movieId=89904, rating=4.854540824890137),
 Row(movieId=148881, rating=4.851578235626221),
 Row(movieId=947, rating=4.824190139770508)]

In [96]:
recs

[Row(userId=148, recommendations=[Row(movieId=6818, rating=5.494475841522217), Row(movieId=4789, rating=5.1264801025390625), Row(movieId=8477, rating=5.039873123168945), Row(movieId=3379, rating=4.932860374450684), Row(movieId=136469, rating=4.907209396362305), Row(movieId=102217, rating=4.8987627029418945), Row(movieId=92494, rating=4.8987627029418945), Row(movieId=89904, rating=4.854540824890137), Row(movieId=148881, rating=4.851578235626221), Row(movieId=947, rating=4.824190139770508)])]

We can now see that we have a list of rows with recommended items. Now try to get the name of the top recommended movie for this user with the function you just created.

In [97]:
# use indexing to obtain the movie id of top predicted rated item
first_recommendation = recs[0]['recommendations'][0][0]

# use the name retriever function to get the values
name_retriever(first_recommendation,movie_titles)

'Come and See (Idi i smotri) (1985)'

Of course, you can also make recommendations for everyone, although this will take longer. In the next cells, we create a DataFrame with the top 5 recommendations for every user and then select one user to look at their predictions:

In [98]:
recommendations = model.recommendForAllUsers(5)
recommendations.where(recommendations.userId == 3).collect()[0][1][0][0]

158783

In [99]:
recommendations = model.recommendForAllUsers(5)
name_retriever(recommendations.where(recommendations.userId == 3).collect()[0][1][0][0], movie_titles)


'Skin I Live In, The (La piel que habito) (2011)'

### Getting Predictions for a New User

Now, it's time to put together all that you've learned in this section to create a function that will take in a new user and some movies they've rated and then return n number of highest recommended movies. This function will have multiple different steps to it:

* adding the new ratings into the dataframe (hint: look into using the union df method)
* fitting the ALS model to the updated ratings DataFrame
* make recommendations for the user of choice
* print out the names of the top n recommendations in a reader-friendly manner

The function should take in the parameters:
* user_id : int 
* new_ratings : list of tuples in the format (user_id,item_id,rating)
* rating_df : spark DF containing ratings
* movie_title_df : spark DF containing movie titles
* num_recs : int

Rate these new movies

```python
[Row(movieId=3253, title="Wayne's World (1992)", genres='Comedy'),  # rating: 2
 Row(movieId=2459, title='Texas Chainsaw Massacre, The (1974)', genres='Horror'),  # rating: 1
 Row(movieId=2513, title='Pet Sematary (1989)', genres='Horror'),  # rating: 2
 Row(movieId=6502, title='28 Days Later (2002)', genres='Action|Horror|Sci-Fi'),  # rating: 1
 Row(movieId=1091, title="Weekend at Bernie's (1989)", genres='Comedy'),  # rating: 3
 Row(movieId=441, title='Dazed and Confused (1993)', genres='Comedy'),  # rating: 4
 Row(movieId=370, title='Naked Gun 33 1/3: The Final Insult (1994)', genres='Action|Comedy')]  # rating: 4

```

In [139]:
def new_user_recs(user_id,new_ratings,rating_df,movie_title_df,num_recs):
    # turn the new_ratings list into a spark DataFrame with the same columns as rating_df
    new_ratings = spark.createDataFrame(new_ratings, rating_df.columns)
    
    # combine the new ratings df with the rating_df
    movie_ratings_updated = rating_df.union(new_ratings)
    
    # create an ALS model and fit it
    als = ALS(maxIter = 5, rank = top_rank, regParam = .01, userCol = 'userId', itemCol = 'movieId', coldStartStrategy = 'drop')
    
    model = als.fit(movie_ratings_updated)
    # make recommendations for all users using the recommendForAllUsers method

    recommendations = model.recommendForAllUsers(num_recs)
    # get recommendations specifically for the new user that has been added to the DataFrame
    
    print (recommendations)
    
    recs_for_new_user = recommendations.where(recommendations.userId == user_id).take(1)
    
    for ranking, (movie_id, rating) in enumerate(recs_for_new_user[0]['recommendations']):
        title = name_retriever(movie_id, movie_title_df)
        print ('Recommended {} - Movie: {}. Score = {}'.format(ranking + 1, title, rating))
        

In [140]:
movie_ratings.columns

['userId', 'movieId', 'rating']

In [141]:
# try out your function with the movies listed above

user_id = 100000
ratings_1 = [(user_id, 3253, 3),(user_id, 2459, 1), (user_id, 2513, 1), (user_id, 6502, 1), (user_id, 1091, 3) , (user_id, 441, 5)]
new_user_recs(user_id, new_ratings = ratings_1, rating_df = movie_ratings, movie_title_df = movie_titles, num_recs = 10)

DataFrame[userId: int, recommendations: array<struct<movieId:int,rating:float>>]
Recommended 1 - Movie: Dazed and Confused (1993). Score = 4.988752841949463
Recommended 2 - Movie: Sixteen Candles (1984). Score = 3.6716043949127197
Recommended 3 - Movie: Ferris Bueller's Day Off (1986). Score = 3.632672071456909
Recommended 4 - Movie: Young Frankenstein (1974). Score = 3.626786947250366
Recommended 5 - Movie: Big Chill, The (1983). Score = 3.514853000640869
Recommended 6 - Movie: Verdict, The (1982). Score = 3.4933922290802
Recommended 7 - Movie: M*A*S*H (a.k.a. MASH) (1970). Score = 3.4788670539855957
Recommended 8 - Movie: Goonies, The (1985). Score = 3.4779281616210938
Recommended 9 - Movie: Mary Poppins (1964). Score = 3.4263033866882324
Recommended 10 - Movie: Blood Simple (1984). Score = 3.423994541168213


So here we have it! Our recommendation system is generating the top 10 movie recommendations for our new user.




## Level up - Optional 


* Create a user interface to allow users to easily choose items and get recommendations.

* Use the IMDB links to scrape user reviews from IMDB and, using basic NLP techniques, create extra embeddings for the ALS model.

* Create a hybrid recommender system using features like genre

## Summary

In this lab, we learned how to build a model using Spark, how to perform some parameter selection, and how to update the model every time new user preferences come in. We looked at how Spark's ALS implementation can be used to build a scalable and efficient recommendation system. We also saw that such systems can become computationally expensive, and that using them in an online system could be a problem on traditional computational platforms. Spark's distributed computing architecture provides a great solution for deploying such recommendation systems in real-world applications (think Amazon, Spotify).

