# Building a Movie Recommendation System in PySpark - Lab Code-along
![images of vhs tapes on shelf](img/movies.jpg)

## Introduction

In this last lab, we will implement a movie recommendation system using Alternating Least Squares (ALS) in the Spark programming environment.<br> Spark's machine learning library `ml` comes packaged with a very efficient implementation of the ALS algorithm.

The lab will require you to put into practice your Spark programming skills for creating and manipulating PySpark DataFrames. We will go through a step-by-step process for developing a movie recommendation system using ALS and PySpark with the MovieLens dataset.

Note: You are advised to refer to [PySpark Documentation](http://spark.apache.org/docs/2.2.0/api/python/index.html) heavily for completing this lab as it will introduce a few new methods. 

## Objectives

You will be able to:

* Identify the key components of ALS
* Demonstrate an understanding of how recommendation systems are used to personalize online services/products
* Parse and filter datasets into a Spark DataFrame, performing basic feature selection
* Run a brief hyper-parameter selection activity through a scalable grid search
* Train and evaluate the predictive performance of a recommendation system
* Generate predictions from the trained model

## Building a Recommendation System

We have seen how recommendation systems have played an integral part in the success of Amazon (Books, Items), Pandora/Spotify (Music), Google (News, Search), YouTube (Videos), etc. For Amazon, these systems reportedly bring in more than 30% of total revenue. For Netflix, about 75% of the movies that people watch are based on some sort of recommendation.

> The goal of Recommendation Systems is to find what is likely to be of interest to the user. This enables organizations to offer a high level of personalization and customer-tailored services.

### We sort of get the concept

For online video content services like Netflix and Hulu, building robust movie recommendation systems is extremely important. A simple example of a recommendation system at work:

1.    User A watches Game of Thrones and Breaking Bad.
2.    User B performs a search query for Game of Thrones.
3.    The system suggests Breaking Bad to user B from data collected about user A.


This lab will guide you through a step-by-step process for developing such a movie recommendation system. We will use the MovieLens dataset to build a movie recommendation system using the collaborative filtering technique with Spark's Alternating Least Squares implementation. After building that recommendation system, we will go through the process of adding a new user to the dataset with some new ratings and obtaining new recommendations for that user.

## Will Nightengale like Toy Story?

Collaborative filtering and matrix decomposition allow us to use the history of others' ratings, along with the ratings of the entire community, to answer that question.

![image1](img/collab.png)


## Person vs vegetable

It's important to realize that there are two sides to recommendation: the user side and the item side.

![image2](img/item_user_based.png)

## Code for model

If we wanted, we could jump to the code right now to make this happen.

But would we understand it?
```python
from pyspark.ml.recommendation import ALS

als = ALS(
    rank=10,
    maxIter=10,
    userCol='userId',
    itemCol='movieId',
    ratingCol='rating',
)

als_model = als.fit(movie_ratings)
```

## Documentation Station

Let's explore the [documentation](http://spark.apache.org/docs/2.4.3/api/python/pyspark.ml.html#module-pyspark.ml.recommendation) together to maybe get a better idea of what is happening. 

- which parameters make sense?
- which are completely foreign?

## Rank

What's all this "rank of the factorization" business?<br>
[The source code documentation](https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala) describes that parameter as the "Rank of the feature matrices".

## Assumptions

Matrix decomposition is built on the theory that every individual (user, movie) score is actually the **dot product** of two separate vectors:
- user characteristics 
- movie characteristics

Wait, do you mean things like gender, or whether the movie is sci-fi or action? Do we have that data?

![beyonce-gif](img/beyonce.gif)

## The hidden matrices 
![image4](img/matrix_decomp.png)

## Embeddings

Embeddings are low-dimensional hidden factors for items and users.

For example, say we have 5-dimensional (i.e., **rank** = 5) embeddings for both items and users (5 is chosen arbitrarily; this could be any number, as we saw with PCA and dimensionality reduction).

For movie-A, those 5 numbers might represent 5 different characteristics of the movie, e.g.:

- How much movie-A is political
- How recent is the movie
- How much special effects are in movie A
- How dialogue driven is the movie
- How linear is the narrative in the movie

In a similar way, 5 numbers in the user embedding matrix might represent:

- How much does user-X like sci-fi movies
- How much does user-X like recent movies … and so on.

But we have *no actual idea* what those factors actually represent.

### If we knew the feature embeddings in advance, it would look something like this:

In [1]:
import numpy as np

# the original matrix of ratings (users x items); np.nan marks a missing rating
R = np.array([[2, np.nan, np.nan, 1, 4],
              [5, 1, 2, np.nan, 2],
              [3, np.nan, np.nan, 3, np.nan],
              [1, np.nan, 4, 2, 1]])

# users X factors
P = np.array([[-0.63274434,  1.33686735, -1.55128517],
              [-2.23813661,  0.5123861,  0.14087293],
              [-1.0289794,  1.62052691,  0.21027516],
              [-0.06422255,  1.62892864,  0.33350709]])

# items x factors (so Q.T is factors x items)
Q = np.array([[-2.09507374,  0.52351075,  0.01826269],
              [-0.45078775, -0.07334991,  0.18731052],
              [-0.34161766,  2.46215058, -0.18942263],
              [-1.0925736,  1.04664756,  0.69963111],
              [-0.78152923,  0.89189076, -1.47144019]])

What about that `np.nan` in the third row, last column? How will that item be reviewed by that user?

In [2]:
print(P[2])
print(Q.T[:,4])
P[2].dot(Q.T[:,4])

[-1.0289794   1.62052691  0.21027516]
[-0.78152923  0.89189076 -1.47144019]


1.9401031341455333

## Wait, I saw a transpose in there - what's the actual formula?

Terms:<br>
$R$ is the full user-item rating matrix

$P$ is a matrix that contains the users and the k factors represented as (user,factor)

$Q^T$ is a matrix that contains the items and the k factors represented as (factor,item)

$\hat{r}_{u,i}$ represents our prediction for the true rating $r_{u,i}$. In order to get an individual rating, you must take the dot product of a row of $P$ and a column of $Q^T$.

For the entire matrix:
$$ R \approx PQ^T $$ 

or for individual ratings:

$$\hat{r}_{u,i}=q_i^T p_u $$ 





### Let's get the whole matrix!

In [3]:
P @ Q.T

array([[ 1.99717984, -0.10339773,  3.80157388,  1.00522135,  3.96947118],
       [ 4.95987359,  0.99772807,  1.9994742 ,  3.08017572,  1.99887552],
       [ 3.00799117,  0.38437256,  4.30166793,  2.96747131,  1.94010313],
       [ 0.99340337, -0.02806164,  3.96943336,  2.00841398,  1.01228247]])

### Look at those results

Are they _exactly_ correct?
![check](img/check.gif)
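
One way to answer that question is to compare the reconstruction with the ratings we actually know. The following is a minimal sketch that reuses `R`, `P`, and `Q` from the cells above (repeated here so the block runs on its own); `known` and `errors` are names introduced only for this illustration.

```python
import numpy as np

R = np.array([[2, np.nan, np.nan, 1, 4],
              [5, 1, 2, np.nan, 2],
              [3, np.nan, np.nan, 3, np.nan],
              [1, np.nan, 4, 2, 1]])

P = np.array([[-0.63274434,  1.33686735, -1.55128517],
              [-2.23813661,  0.5123861,  0.14087293],
              [-1.0289794,  1.62052691,  0.21027516],
              [-0.06422255,  1.62892864,  0.33350709]])

Q = np.array([[-2.09507374,  0.52351075,  0.01826269],
              [-0.45078775, -0.07334991,  0.18731052],
              [-0.34161766,  2.46215058, -0.18942263],
              [-1.0925736,  1.04664756,  0.69963111],
              [-0.78152923,  0.89189076, -1.47144019]])

predicted = P @ Q.T
known = ~np.isnan(R)               # True where a rating was actually observed
errors = (R - predicted)[known]    # residuals on the observed ratings only

print("known ratings:         ", known.sum())
print("largest absolute error:", np.abs(errors).max())
print("RMSE on known ratings: ", np.sqrt(np.mean(errors ** 2)))
```

The errors are small but not zero, so the factorization approximates the known ratings rather than reproducing them exactly.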

## ALS benefit: Loss Function

The Loss function $L$ can be calculated as:

$$ L = \sum_{(u,i) \in \kappa}\left[(r_{u,i} - q_i^T p_u)^2 + \lambda\left(||q_i||^2 + ||p_u||^2\right)\right]$$

Where $\kappa$ is the set of (u,i) pairs for which $r_{u,i}$ is known.

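To avoid overfitting, the loss function also includes a penalty on the size of the factor vectors, weighted by the regularization parameter $\lambda$. ALS looks for the $p_u$ and $q_i$ that minimize the squared difference between the known ratings in our dataset $R$ and our predictions, plus that penalty; $\lambda$ itself is a hyperparameter we choose.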

There's the **least squares** part of ALS, got it!
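
To make the formula concrete, here is a rough sketch of the loss on a tiny made-up example. The function name `als_loss` and the 2x2 matrices are assumptions for illustration only and are not part of Spark's API.

```python
import numpy as np

def als_loss(R, P, Q, lam):
    """Regularized squared error summed over the known (non-NaN) entries of R."""
    total = 0.0
    for u, i in zip(*np.where(~np.isnan(R))):
        error = R[u, i] - Q[i] @ P[u]                        # r_ui - q_i^T p_u
        total += error ** 2 + lam * (Q[i] @ Q[i] + P[u] @ P[u])
    return total

# hypothetical data: two users, two items, rank 2, two known ratings
R = np.array([[4.0, np.nan],
              [np.nan, 2.0]])
P = np.array([[1.0, 1.0],
              [1.0, 0.0]])   # user vectors
Q = np.array([[2.0, 1.0],
              [1.0, 1.0]])   # item vectors

print(round(als_loss(R, P, Q, lam=0.0), 3))   # squared error only: 2.0
print(round(als_loss(R, P, Q, lam=0.1), 3))   # with the penalty:   3.0
```

Setting `lam=0` leaves only the squared-error part; raising `lam` makes large factor values more expensive.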

## So now we use gradient descent, right?

![incorrect](img/incorrect.gif)

### Here comes the alternating part

ALS alternates between holding the $q_i$'s constant and the $p_u$'s constant. 

While all $q_i$'s are held constant, each $p_u$ is computed by solving the least squares problem.<br>
After that process has taken place, all the $p_u$'s are held constant while the $q_i$'s are altered to solve the least squares problem, again, each independently.<br> 
This process repeats many times until you've reached convergence (ideally).

### Changing Loss function:

First, let's assume the item vectors are fixed and solve for the user vectors:

$$p_u=\left(\sum_{r_{u,i}\in r_{u*}}{q_iq_i^T} + \lambda I_k\right)^{-1}\sum_{r_{u,i}\in r_{u*}}{r_{u,i}q_{i}}$$

Then we hold the user vectors constant and solve for the item vectors:

$$q_i=\left(\sum_{r_{u,i}\in r_{*i}}{p_up_u^T} + \lambda I_k\right)^{-1}\sum_{r_{u,i}\in r_{*i}}{r_{u,i}p_{u}}$$

Here $r_{u*}$ is the set of known ratings from user $u$ and $r_{*i}$ is the set of known ratings for item $i$. This process repeats until convergence.
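
The two update equations translate almost directly into NumPy. What follows is a simplified, single-machine sketch and not Spark's distributed implementation; the helper names `solve_side` and `als_sketch`, the random initialization, and the default values are assumptions chosen for illustration.

```python
import numpy as np

def solve_side(R, fixed, lam):
    """Solve for one set of factor vectors while `fixed` is held constant.

    Row n of R holds the ratings tied to the n-th vector being solved for,
    with one column per row of `fixed`; np.nan marks an unknown rating.
    """
    k = fixed.shape[1]
    solved = np.zeros((R.shape[0], k))
    for n in range(R.shape[0]):
        known = ~np.isnan(R[n])
        F = fixed[known]                     # vectors paired with known ratings
        A = F.T @ F + lam * np.eye(k)        # sum of f f^T, plus lambda * I_k
        b = F.T @ R[n, known]                # sum of r * f
        solved[n] = np.linalg.solve(A, b)
    return solved

def als_sketch(R, rank=3, lam=0.1, n_iter=25, seed=0):
    rng = np.random.default_rng(seed)
    Q = rng.normal(size=(R.shape[1], rank))  # random starting item vectors
    for _ in range(n_iter):
        P = solve_side(R, Q, lam)            # items fixed, solve for users
        Q = solve_side(R.T, P, lam)          # users fixed, solve for items
    return P, Q

# the toy ratings matrix from earlier in this lab
R = np.array([[2, np.nan, np.nan, 1, 4],
              [5, 1, 2, np.nan, 2],
              [3, np.nan, np.nan, 3, np.nan],
              [1, np.nan, 4, 2, 1]])

P, Q = als_sketch(R)
known = ~np.isnan(R)
rmse = np.sqrt(np.mean((R - P @ Q.T)[known] ** 2))
print("RMSE on known ratings:", round(rmse, 3))
```

Each call to `solve_side` is an ordinary regularized least squares solve, one small linear system per user or per item, which is why the work splits so naturally across a cluster.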

# Review
What levers do we have available to adjust?
![lever-choice](img/levers.jpeg)

- Pros and cons of large rank?
- Pros and cons of lambda size?
- Iterations?

# Enough - let's get to the data

### Importing the Data
To begin with:
* initialize a SparkSession object
* import the dataset found at './data/ratings.csv' into a pyspark DataFrame

In [None]:
import pyspark
from pyspark import SparkContext
SparkContext.setSystemProperty('spark.executor.memory', '1g')
spark = pyspark.sql.SparkSession.builder.getOrCreate()
sc = spark.sparkContext

In [None]:

# list the active Spark configuration through the public accessor
sc.getConf().getAll()

In [37]:
!ls data/

movies.csv  ratings.csv


In [38]:
!head data/ratings.csv

userId,movieId,rating,timestamp
1,1,4.0,964982703
1,3,4.0,964981247
1,6,4.0,964982224
1,47,5.0,964983815
1,50,5.0,964982931
1,70,3.0,964982400
1,101,5.0,964980868
1,110,4.0,964982176
1,151,5.0,964984041


In [39]:
# read in the dataset into pyspark DataFrame
movie_ratings = spark.read.csv('data/ratings.csv',
                               inferSchema=True,
                               header=True)

Check the data types of each of the values to ensure that they are a type that makes sense given the column.

In [40]:
import numpy as np
np.log(movie_ratings.count())

11.521250713716219

In [41]:
movie_ratings.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)
 |-- timestamp: integer (nullable = true)



But if they were ever incorrectly assigned, here's how we fix it:

In [42]:
from pyspark.sql.types import FloatType, IntegerType, LongType, StructField, StructType

In [43]:
schema = StructType(
    [
        StructField('userId', IntegerType()),
        StructField('movieId', IntegerType()),
        StructField('rating', FloatType()),
        StructField('timestamp', LongType()),
    ]
)

In [44]:
# read in the dataset into pyspark DataFrame
movie_ratings = spark.read.csv('data/ratings.csv',
                               inferSchema=False,
                               schema=schema,
                               header=True)

In [45]:
movie_ratings.persist()

DataFrame[userId: int, movieId: int, rating: float, timestamp: bigint]

In [46]:
movie_ratings.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: float (nullable = true)
 |-- timestamp: long (nullable = true)



In [47]:
movie_ratings.show(5)

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|      1|   4.0|964982703|
|     1|      3|   4.0|964981247|
|     1|      6|   4.0|964982224|
|     1|     47|   5.0|964983815|
|     1|     50|   5.0|964982931|
+------+-------+------+---------+
only showing top 5 rows



We aren't going to need the time stamp, so we can go ahead and remove that column.

In [48]:
movie_ratings = movie_ratings.drop('timestamp')

### Fitting the Alternating Least Squares Model

Because this dataset is already preprocessed for us, we can go ahead and fit the Alternating Least Squares model.

* Use the randomSplit method on the pyspark DataFrame to separate the dataset into a training and test set
* Import the ALS module from pyspark.ml.recommendation.
* Create the ALS estimator, making sure to set `userCol`, `itemCol`, and `ratingCol` to the appropriate column names for this dataset. Then fit it to the training set and assign the result to the variable `als_model`.

In [49]:
# split into training and testing sets
# How would we do that?
train, test = movie_ratings.randomSplit([.8, .2])

In [50]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS, ALSModel

als = ALS(
    rank=10,  # matches the 10 latent factors reported by als_model.rank below
    maxIter=25,
    userCol='userId',
    itemCol='movieId',
    ratingCol='rating',
    coldStartStrategy='drop'
)

In [51]:
# Build the recommendation model using ALS on the training data
# fit the ALS model to the training set

als_model = als.fit(train)



In [59]:
als_model.rank

10

Now you've fit the model, and it's time to evaluate it to determine just how well it performed.

* import `RegressionEvaluator` from pyspark.ml.evaluation ([documentation](http://spark.apache.org/docs/2.4.3/api/python/pyspark.ml.html#pyspark.ml.evaluation.RegressionEvaluator))
* generate predictions with your model for the test set by using the `transform` method on your ALS model
* evaluate your model and print out the RMSE from your test set [options for evaluating regressors](http://spark.apache.org/docs/2.4.3/api/python/pyspark.ml.html#pyspark.ml.evaluation.RegressionEvaluator.metricName)

In [19]:
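# NOTE: this scores the full dataset (train + test), which is what the outputs below reflect;
# for a true held-out estimate, use als_model.transform(test) instead
predictions = als_model.transform(movie_ratings)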

In [20]:
predictions.persist()

DataFrame[userId: int, movieId: int, rating: float, prediction: float]

In [72]:
movie_ratings.show(5)

+------+-------+------+
|userId|movieId|rating|
+------+-------+------+
|     1|      1|   4.0|
|     1|      3|   4.0|
|     1|      6|   4.0|
|     1|     47|   5.0|
|     1|     50|   5.0|
+------+-------+------+
only showing top 5 rows



In [21]:
predictions.show(10)

+------+-------+------+----------+
|userId|movieId|rating|prediction|
+------+-------+------+----------+
|   191|    148|   5.0| 4.9189696|
|   133|    471|   4.0|  3.413821|
|   597|    471|   2.0| 3.4794183|
|   385|    471|   4.0| 3.2651532|
|   436|    471|   3.0| 3.3713143|
|   602|    471|   4.0| 3.6438956|
|    91|    471|   1.0| 2.9102547|
|   409|    471|   3.0| 4.0372863|
|   372|    471|   3.0|  3.303689|
|   599|    471|   2.5| 3.0331006|
+------+-------+------+----------+
only showing top 10 rows



In [26]:
evaluator = RegressionEvaluator(labelCol="rating", predictionCol="prediction")
rmse = evaluator.evaluate(predictions, {evaluator.metricName: "rmse"})
print("Root-mean-square error = " + str(rmse))
print("R-squared = " + str(evaluator.evaluate(predictions, {evaluator.metricName: "r2"})))

Root-mean-square error = 0.6476749255692013
R-squared = 0.6130579538680968


In [40]:
user_factors = als_model.userFactors

In [74]:
user_factors.show(10)

+---+--------------------+
| id|            features|
+---+--------------------+
| 10|[-0.70180035, -1....|
| 20|[-0.22872846, -0....|
| 30|[-0.27222154, -0....|
| 40|[0.19280842, -0.6...|
| 50|[0.5314365, 0.027...|
| 60|[-0.15965079, -0....|
| 70|[0.26112306, -0.3...|
| 80|[-0.33152246, -0....|
| 90|[0.173722, 0.1178...|
|100|[-0.5818016, -0.6...|
+---+--------------------+
only showing top 10 rows



In [33]:
item_factors = als_model.itemFactors

In [75]:
item_factors.show(10)

+---+--------------------+
| id|            features|
+---+--------------------+
| 10|[-0.5837987, -0.2...|
| 20|[0.6320113, -0.39...|
| 30|[1.6145953, -0.46...|
| 50|[0.38214648, -0.3...|
| 60|[-0.53862923, 0.0...|
| 70|[0.3759784, 0.008...|
| 80|[0.46197715, -0.4...|
|100|[-0.35177737, 0.0...|
|110|[-0.30913052, -0....|
|140|[-0.068227984, -0...|
+---+--------------------+
only showing top 10 rows



### Important Question

Will Billy like movie m?

In [42]:
import numpy as np

In [43]:
billy_row = user_factors[user_factors['id'] == 10].first()
billy_factors = np.array(billy_row['features'])

In [77]:
pulp_fiction_row = item_factors[item_factors['id'] == 296].first()
pulp_fiction_factors = np.array(pulp_fiction_row['features'])

In [45]:
billy_factors

array([-0.70180035, -1.16532791,  0.48343414,  1.1524992 ,  0.23219989,
        0.45118707,  0.91564375,  0.25628972, -0.45314279,  0.47402212])

In [78]:
pulp_fiction_factors

array([ 0.60244417, -0.073041  ,  0.52547318,  0.8084228 ,  0.66577536,
        1.61979473, -0.50941175,  0.76582605, -0.93349969,  0.57247615])

In [79]:
billy_factors @ pulp_fiction_factors

2.157691575201566

In [80]:
billy_preds = predictions[predictions['userId'] == 10]

In [81]:
billy_preds.sort('movieId').show()

+------+-------+------+----------+
|userId|movieId|rating|prediction|
+------+-------+------+----------+
|    10|    296|   1.0| 2.1576915|
|    10|    356|   3.5| 3.6385257|
|    10|    588|   4.0| 3.0743887|
|    10|    597|   3.5| 3.0714793|
|    10|    912|   4.0| 3.8665726|
|    10|   1028|   0.5| 2.2753046|
|    10|   1088|   3.0| 3.0766957|
|    10|   1247|   3.0| 2.9903915|
|    10|   1307|   3.0| 3.0419095|
|    10|   1784|   3.5| 3.0407722|
|    10|   1907|   4.0| 2.8614848|
|    10|   2571|   0.5| 2.5810933|
|    10|   2671|   3.5|  3.011416|
|    10|   2762|   0.5|  2.508453|
|    10|   2858|   1.0| 2.1224108|
|    10|   2959|   0.5| 2.0427136|
|    10|   3578|   4.0|  3.288957|
|    10|   3882|   3.0| 2.7231166|
|    10|   4246|   3.5| 3.2491283|
|    10|   4306|   4.5| 3.3684144|
+------+-------+------+----------+
only showing top 20 rows



In [76]:
!grep "^296," < data/movies.csv

296,Pulp Fiction (1994),Comedy|Crime|Drama|Thriller


## Okay, what *will* Billy like?

In [51]:
recs = als_model.recommendForAllUsers(numItems=10)

In [52]:
recs[recs['userId']==10].first()['recommendations']

[Row(movieId=42730, rating=5.079683303833008),
 Row(movieId=3682, rating=4.82638692855835),
 Row(movieId=26614, rating=4.801482200622559),
 Row(movieId=8869, rating=4.612627029418945),
 Row(movieId=56145, rating=4.5430192947387695),
 Row(movieId=7076, rating=4.530222415924072),
 Row(movieId=3155, rating=4.529270648956299),
 Row(movieId=140110, rating=4.5004658699035645),
 Row(movieId=5867, rating=4.458368301391602),
 Row(movieId=120635, rating=4.4580183029174805)]

In [54]:
!grep "^42730," < data/movies.csv

42730,Glory Road (2006),Drama
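
Conceptually, `recommendForAllUsers` scores items for each user with the same dot product we used for Billy and keeps the highest-scoring ones. A rough NumPy sketch of that idea follows; it is not how Spark implements it internally, and `top_n_items` plus the small factor matrices are made up for illustration.

```python
import numpy as np

def top_n_items(user_vecs, item_vecs, n):
    """Return the indices and scores of the n best-scoring items per user."""
    scores = user_vecs @ item_vecs.T              # predicted rating for every pair
    order = np.argsort(-scores, axis=1)[:, :n]    # item indices, best first
    return order, np.take_along_axis(scores, order, axis=1)

# hypothetical rank-2 factors: 2 users and 4 items
user_vecs = np.array([[1.0, 0.0],
                      [0.5, 2.0]])
item_vecs = np.array([[4.0, 1.0],
                      [1.0, 2.0],
                      [3.0, 0.0],
                      [0.0, 1.5]])

items, scores = top_n_items(user_vecs, item_vecs, n=2)
print(items)    # row u lists the best item positions for user u
print(scores)   # the matching predicted ratings
```

The row positions here stand in for the real `userId` and `movieId` values, which Spark keeps track of for you.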


## Objective Review

* Identify the key components of ALS
* Demonstrate an understanding of how recommendation systems are used to personalize online services/products
* Parse and filter datasets into a Spark DataFrame, performing basic feature selection
* Run a brief hyper-parameter selection activity through a scalable grid search
* Train and evaluate the predictive performance of a recommendation system
* Generate predictions from the trained model

## Some great technical resources:

- [good one from Stanford](http://stanford.edu/~rezab/classes/cme323/S15/notes/lec14.pdf)
- [the netflix recommendation project](https://www.netflixprize.com/assets/GrandPrize2009_BPC_BellKor.pdf)