# Building a Recommendation System in PySpark - Lab

## Introduction

In this last lab, we will implement a movie recommendation system using ALS in the Spark programming environment. Spark's machine learning library `ml` comes packaged with a very efficient implementation of the ALS algorithm that we looked at in the previous lesson. The lab will require you to put into practice your Spark programming skills for creating and manipulating PySpark DataFrames. We will go step by step through developing a movie recommendation system using ALS and PySpark with the MovieLens dataset that we used in a previous lab.

Note: You are advised to refer to [PySpark Documentation](http://spark.apache.org/docs/2.2.0/api/python/index.html) heavily for completing this lab as it will introduce a few new methods. 


## Objectives

You will be able to:

* Demonstrate an understanding of how recommendation systems are used to personalize online services and products
* Parse and filter datasets into Spark DataFrames, performing basic feature selection
* Run a brief hyper-parameter selection activity through a scalable grid search
* Train and evaluate the predictive performance of a recommendation system
* Generate predictions from the trained model

## Building a Recommendation System

We have seen how recommendation systems have played an integral part in the success of Amazon (Books, Items), Pandora/Spotify (Music), Google (News, Search), YouTube (Videos), etc. For Amazon, these systems bring in more than 30% of total revenue. For Netflix, 75% of the movies that people watch are based on some sort of recommendation.

> The goal of Recommendation Systems is to find what is likely to be of interest to the user. This enables organizations to offer a high level of personalization and customer tailored services.


For online video content services like Netflix and Hulu, building robust movie recommendation systems is extremely important. Here is an example of a recommendation system at work:

1.    User A watches Game of Thrones and Breaking Bad.
2.    User B performs a search query for Game of Thrones.
3.    The system suggests Breaking Bad to user B from data collected about user A.
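
The three steps above can be sketched in a few lines of plain Python. This is only a toy co-occurrence count, not the ALS approach used later in the lab. The `suggest` helper and user C's viewing history are hypothetical and added purely for illustration.

```python
from collections import Counter

# User A's history comes from the example above; user C is hypothetical.
watch_history = {
    "A": {"Game of Thrones", "Breaking Bad"},
    "C": {"Game of Thrones", "Breaking Bad", "The Wire"},
}


def suggest(query_title, history):
    """Rank titles by how many users watched them alongside query_title."""
    counts = Counter()
    for titles in history.values():
        if query_title in titles:
            counts.update(titles - {query_title})
    return [title for title, _ in counts.most_common()]


# User B searches for Game of Thrones.
suggestions = suggest("Game of Thrones", watch_history)
print(suggestions[0])  # Breaking Bad
```

Collaborative filtering with ALS generalizes this idea: instead of counting raw co-occurrences, it learns a small vector for every user and every movie from the ratings.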


This lab will guide you step by step through developing such a movie recommendation system. We will use the MovieLens dataset to build a movie recommendation system using the collaborative filtering technique with Spark's Alternating Least Squares implementation. After building that recommendation system, we will go through the process of adding a new user to the dataset with some new ratings and obtaining new recommendations for that user.

### Importing the Data
To begin with:
* initialize a SparkSession object
* import the dataset found at './data/ratings.csv' into a pyspark DataFrame

In [1]:
# import necessary libraries

import pyspark

In [3]:
# instantiate SparkSession object
spark = (pyspark.sql.SparkSession.builder
         .master("local[*]") #local[4]: run on my computer, use 4 processors, 
                             #local[*]: use a processor for each core
         .getOrCreate())

In [5]:
!ls data/

movies.csv  ratings.csv


In [6]:
!file data/ratings.csv

data/ratings.csv: ASCII text, with CRLF line terminators


In [7]:
!head data/ratings.csv

userId,movieId,rating,timestamp
1,1,4.0,964982703
1,3,4.0,964981247
1,6,4.0,964982224
1,47,5.0,964983815
1,50,5.0,964982931
1,70,3.0,964982400
1,101,5.0,964980868
1,110,4.0,964982176
1,151,5.0,964984041


- ALS expects numeric user and item IDs. If userId weren't numeric (e.g., a GitHub username), you would have to encode it numerically first.
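
As a rough illustration of what "encoding numerically" means, the sketch below maps each distinct string ID to an integer in order of first appearance. The `build_index` helper and the usernames are hypothetical. In Spark itself, `StringIndexer` from `pyspark.ml.feature` is the usual tool for this job, although it assigns indices by label frequency by default rather than by first appearance.

```python
def build_index(labels):
    """Map each distinct label to an integer, in order of first appearance."""
    index = {}
    for label in labels:
        if label not in index:
            index[label] = len(index)
    return index


# Hypothetical non-numeric user IDs (GitHub-style usernames).
usernames = ["octocat", "torvalds", "octocat", "gvanrossum"]
user_index = build_index(usernames)
encoded = [user_index[name] for name in usernames]
print(encoded)  # [0, 1, 0, 2]
```

Keep the mapping around: you need it again to translate recommendations back into the original IDs.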

In [14]:
# read in the dataset into pyspark DataFrame
movie_ratings = spark.read.csv("data/ratings.csv",header=True, inferSchema=True)


#inferSchema reads through doc twice, takes longer
#header helps recognize column names

In [21]:
#OR
#pass in schema to specify column names & column types for a large dataset

from pyspark.sql.types import (
                                StructType,
                                StructField,
                                IntegerType,
                                FloatType,
                                LongType,)
dir(pyspark.sql.types)

['ArrayType',
 'AtomicType',
 'BinaryType',
 'BooleanType',
 'ByteType',
 'CloudPickleSerializer',
 'DataType',
 'DataTypeSingleton',
 'DateConverter',
 'DateType',
 'DatetimeConverter',
 'DecimalType',
 'DoubleType',
 'FloatType',
 'FractionalType',
 'IntegerType',
 'IntegralType',
 'JavaClass',
 'LongType',
 'MapType',
 'NullType',
 'NumericType',
 'Row',
 'ShortType',
 'SparkContext',
 'StringType',
 'StructField',
 'StructType',
 'TimestampType',
 'UserDefinedType',
 '_FIXED_DECIMAL',
 '__all__',
 '__builtins__',
 '__cached__',
 '__doc__',
 '__file__',
 '__loader__',
 '__name__',
 '__package__',
 '__spec__',
 '_acceptable_types',
 '_all_atomic_types',
 '_all_complex_types',
 '_array_signed_int_typecode_ctype_mappings',
 '_array_type_mappings',
 '_array_unsigned_int_typecode_ctype_mappings',
 '_atomic_types',
 '_check_dataframe_convert_date',
 '_check_dataframe_localize_timestamps',
 '_check_series_convert_date',
 '_check_series_convert_timestamps_internal',
 '_check_series_convert_

In [18]:
schema = StructType(
[
    StructField('userId', IntegerType()),
    StructField('movieId', IntegerType()),
    StructField('rating', FloatType()),
    StructField('timestamp', LongType())
]
)

In [32]:
movie_ratings = spark.read.csv("data/ratings.csv",
                               header=True, 
                               inferSchema=False,
                               schema=schema)

movie_ratings.persist()

DataFrame[userId: int, movieId: int, rating: float, timestamp: bigint]

Check the data types of each of the values to ensure that they are a type that makes sense given the column.

In [20]:
movie_ratings.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: float (nullable = true)
 |-- timestamp: long (nullable = true)



In [22]:
movie_ratings.show(5)

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|      1|   4.0|964982703|
|     1|      3|   4.0|964981247|
|     1|      6|   4.0|964982224|
|     1|     47|   5.0|964983815|
|     1|     50|   5.0|964982931|
+------+-------+------+---------+
only showing top 5 rows



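- If we want to cross-validate a recommendation system, we should split temporally on the timestamps, so do NOT drop the timestamp column.
- Users (or movies) that appear only in the test split have no learned factors, so their predictions come back as NaN under ALS's default `coldStartStrategy='nan'`.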
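
A temporal split can be sketched without Spark: sort by timestamp and cut once, so every training rating is older than every test rating. The rows below are the first five lines of `ratings.csv` shown above. The `temporal_split` helper and the 80% cut are assumptions made for illustration, not part of the lab.

```python
# (userId, movieId, rating, timestamp) rows copied from ratings.csv above.
ratings = [
    (1, 1, 4.0, 964982703),
    (1, 3, 4.0, 964981247),
    (1, 6, 4.0, 964982224),
    (1, 47, 5.0, 964983815),
    (1, 50, 5.0, 964982931),
]


def temporal_split(rows, train_fraction=0.8):
    """Put the oldest ratings in train and the newest in test."""
    ordered = sorted(rows, key=lambda row: row[3])
    cutoff = int(len(ordered) * train_fraction)
    return ordered[:cutoff], ordered[cutoff:]


train_rows, test_rows = temporal_split(ratings)
print(len(train_rows), len(test_rows))  # 4 1
```

A random split lets the model peek at ratings made after the ones it is asked to predict, which tends to make the evaluation look better than it would be in production.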

### Fitting the Alternating Least Squares Model

Because this dataset is already preprocessed for us, we can go ahead and fit the Alternating Least Squares model.

* Import the ALS module from pyspark.ml.recommendation.
* Use the randomSplit method on the pyspark DataFrame to separate the dataset into a training and test set
* Fit the Alternating Least Squares model to the training dataset. Make sure to set `userCol`, `itemCol`, and `ratingCol` to the appropriate column names for this dataset, and assign the fitted model to a variable `model`.
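
To make "fitting" less of a black box, here is a minimal pure-Python sketch of the alternating idea with a single latent factor (rank 1). It is not Spark's implementation: it uses a plain L2 penalty, runs on one machine, and all function names are hypothetical. With the item factors held fixed, each user factor has a closed-form least-squares solution, and vice versa, so the two updates simply take turns.

```python
def solve_side(ratings, fixed, reg, own_pos, other_pos):
    """Closed-form rank-1 update for one side, holding the other side fixed."""
    numer, denom = {}, {}
    for row in ratings:
        own, other, rating = row[own_pos], row[other_pos], row[2]
        numer[own] = numer.get(own, 0.0) + rating * fixed[other]
        denom[own] = denom.get(own, reg) + fixed[other] ** 2
    return {key: numer[key] / denom[key] for key in numer}


def objective(ratings, user_f, item_f, reg):
    """Squared error on observed ratings plus an L2 penalty on all factors."""
    error = sum((r - user_f[u] * item_f[i]) ** 2 for u, i, r in ratings)
    penalty = sum(v * v for v in user_f.values()) + sum(v * v for v in item_f.values())
    return error + reg * penalty


def rank_one_als(ratings, reg=0.1, iterations=5):
    """Alternate between solving for user factors and item factors."""
    item_f = {i: 1.0 for _, i, _ in ratings}  # arbitrary starting point
    user_f = {}
    history = []
    for _ in range(iterations):
        user_f = solve_side(ratings, item_f, reg, own_pos=0, other_pos=1)
        item_f = solve_side(ratings, user_f, reg, own_pos=1, other_pos=0)
        history.append(objective(ratings, user_f, item_f, reg))
    return user_f, item_f, history


# (userId, movieId, rating) triples; user 1's rows come from ratings.csv above,
# user 2's rows are hypothetical.
ratings = [(1, 1, 4.0), (1, 3, 4.0), (1, 47, 5.0), (2, 1, 2.0), (2, 47, 3.0)]
user_f, item_f, history = rank_one_als(ratings)
print(history)
```

Because each half-step exactly minimizes the regularized objective for one side, the values in `history` never increase. The `rank` and `regParam` arguments of Spark's `ALS` play the roles of the factor dimension (1 here) and `reg`.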

In [23]:
from pyspark.ml.evaluation import RegressionEvaluator
#DON'T USE the older pyspark.mllib package BECAUSE IT'S NOT DATAFRAME AWARE

#USE pyspark.ml. The Spark documentation may file it under the "MLlib" name even when it describes the newer DataFrame-based ml API
#SPARK changes fast, so make sure your documentation matches the version you are running

from pyspark.ml.recommendation import ALS, ALSModel
#ALS creates an ALS model, ALSModel lets you save a model and load it back

In [27]:
#DEFAULTS:
#ALS(rank=10, maxIter=10, regParam=0.1, numUserBlocks=10, numItemBlocks=10, implicitPrefs=False, 
#alpha=1.0, userCol='user', itemCol='item', seed=None, ratingCol='rating', nonnegative=False, 
#checkpointInterval=10,intermediateStorageLevel='MEMORY_AND_DISK', 
#finalStorageLevel='MEMORY_AND_DISK', coldStartStrategy='nan')

# configure the ALS estimator
#SPECIFY WHICH COLUMNS ARE USER, ITEM, and RATING
als = ALS(
        rank=10,
        maxIter=10,
        userCol='userId',
        itemCol='movieId',
        ratingCol='rating',
        )

In [28]:
# Build the recommendation model using ALS (here fit on the full ratings DataFrame)

als.fit(movie_ratings)
#fit() RETURNS A NEW OBJECT INSTEAD OF MODIFYING als IN PLACE - WE MUST ACTUALLY ASSIGN THE RESULT TO A NAME

ALS_1ccaf09c4b2d

In [29]:
als_model = als.fit(movie_ratings)

In [30]:
predictions = als_model.transform(movie_ratings)
#everything just uses fit & transform instead of sklearn's fit, transform, predict, predict_proba mess

In [33]:
predictions.persist()

DataFrame[userId: int, movieId: int, rating: float, timestamp: bigint, prediction: float]

In [36]:
movie_ratings.show(5)

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|      1|   4.0|964982703|
|     1|      3|   4.0|964981247|
|     1|      6|   4.0|964982224|
|     1|     47|   5.0|964983815|
|     1|     50|   5.0|964982931|
+------+-------+------+---------+
only showing top 5 rows



In [35]:
predictions.show(5) #first run after .persist() isn't fast, 2nd run after both .persist() and .show() IS FASTER

+------+-------+------+---------+----------+
|userId|movieId|rating|timestamp|prediction|
+------+-------+------+---------+----------+
|   191|    148|   5.0|829760897|  4.923959|
|   133|    471|   4.0|843491793| 3.2797976|
|   597|    471|   2.0|941558175| 3.8111665|
|   385|    471|   4.0|850766697| 3.3658185|
|   436|    471|   3.0|833530187|  3.523111|
+------+-------+------+---------+----------+
only showing top 5 rows



In [37]:
#make new dataframe with column renamed, show first 5 lines, throws it away
predictions.withColumnRenamed('prediction', 'pred').show(5)

+------+-------+------+---------+---------+
|userId|movieId|rating|timestamp|     pred|
+------+-------+------+---------+---------+
|   191|    148|   5.0|829760897| 4.923959|
|   133|    471|   4.0|843491793|3.2797976|
|   597|    471|   2.0|941558175|3.8111665|
|   385|    471|   4.0|850766697|3.3658185|
|   436|    471|   3.0|833530187| 3.523111|
+------+-------+------+---------+---------+
only showing top 5 rows



In [40]:
als_model.userFactors.limit(2).toPandas()

Unnamed: 0,id,features
0,10,"[0.2496262937784195, -0.7958771586418152, 0.25..."
1,20,"[-0.6927688717842102, 0.20887841284275055, 0.4..."


In [41]:
als_model.itemFactors.limit(2).toPandas()

Unnamed: 0,id,features
0,10,"[-0.8358665108680725, -0.028308473527431488, 0..."
1,20,"[0.23314112424850464, 0.123916856944561, 0.611..."


In [43]:
!head -n 15 data/movies.csv

movieId,title,genres
1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy
2,Jumanji (1995),Adventure|Children|Fantasy
3,Grumpier Old Men (1995),Comedy|Romance
4,Waiting to Exhale (1995),Comedy|Drama|Romance
5,Father of the Bride Part II (1995),Comedy
6,Heat (1995),Action|Crime|Thriller
7,Sabrina (1995),Comedy|Romance
8,Tom and Huck (1995),Adventure|Children
9,Sudden Death (1995),Action
10,GoldenEye (1995),Action|Adventure|Thriller
11,"American President, The (1995)",Comedy|Drama|Romance
12,Dracula: Dead and Loving It (1995),Comedy|Horror
13,Balto (1995),Adventure|Animation|Children
14,Nixon (1995),Drama


In [46]:
# WHAT's HAPPENING BEHIND THE SCENES, DOT-PRODUCT
user_factors = als_model.userFactors

In [47]:
item_factors = als_model.itemFactors

In [50]:
#call features data itself from a Row object
user_factors[user_factors['id'] == 10].first()['features']

[0.2496262937784195,
 -0.7958771586418152,
 0.25280100107192993,
 -0.33441755175590515,
 1.3946477174758911,
 -1.0567063093185425,
 0.24984325468540192,
 0.300751268863678,
 0.19300296902656555,
 -0.7851032018661499]

In [52]:
import numpy as np
billy_row = user_factors[user_factors['id'] == 10].first() 
#.first() takes the Row object out of the 1-row Spark DataFrame
billy_factors = np.array(billy_row['features'])

In [64]:
pulp_row = item_factors[item_factors['id'] == 296].first()
pulp_factors = np.array(pulp_row['features'])

In [65]:
billy_factors

array([ 0.24962629, -0.79587716,  0.252801  , -0.33441755,  1.39464772,
       -1.05670631,  0.24984325,  0.30075127,  0.19300297, -0.7851032 ])

In [66]:
pulp_factors

array([-0.55188572,  0.84363765,  0.21665037,  0.17438513,  0.63840616,
       -1.6038481 ,  0.08963302,  0.27322373,  1.381253  , -0.1680087 ])

In [67]:
billy_factors @ pulp_factors

2.2754596446813307
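
The number above is exactly what the model predicts for user 10 and movie 296: a prediction is the dot product of the user's factor vector and the movie's factor vector. As a sanity check, the sketch below recomputes it without NumPy from the rounded values printed in the two cells above. The `dot` helper is hypothetical.

```python
# Factor values as printed (rounded) in the cells above.
billy_factors = [0.24962629, -0.79587716, 0.252801, -0.33441755, 1.39464772,
                 -1.05670631, 0.24984325, 0.30075127, 0.19300297, -0.7851032]
pulp_factors = [-0.55188572, 0.84363765, 0.21665037, 0.17438513, 0.63840616,
                -1.6038481, 0.08963302, 0.27322373, 1.381253, -0.1680087]


def dot(u, v):
    """Sum of element-wise products of two equal-length vectors."""
    if len(u) != len(v):
        raise ValueError("vectors must have the same length")
    return sum(a * b for a, b in zip(u, v))


predicted_rating = dot(billy_factors, pulp_factors)
print(round(predicted_rating, 4))
```

The same value shows up in the `prediction` column for `userId` 10 and `movieId` 296 in the table that follows.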

In [68]:
billy_preds = predictions[predictions['userId'] == 10] 

In [69]:
billy_preds.sort('movieId').show()

+------+-------+------+----------+----------+
|userId|movieId|rating| timestamp|prediction|
+------+-------+------+----------+----------+
|    10|    296|   1.0|1455303387| 2.2754595|
|    10|    356|   3.5|1455301685| 3.5585384|
|    10|    588|   4.0|1455306173| 3.3146539|
|    10|    597|   3.5|1455357645| 3.3858886|
|    10|    912|   4.0|1455302254| 3.2757502|
|    10|   1028|   0.5|1455306152| 2.6577158|
|    10|   1088|   3.0|1455619275| 3.4713135|
|    10|   1247|   3.0|1455303518|  2.597934|
|    10|   1307|   3.0|1455357613| 3.0849853|
|    10|   1784|   3.5|1455301699| 3.0649784|
|    10|   1907|   4.0|1455306183| 3.3260531|
|    10|   2571|   0.5|1455356378| 3.3142972|
|    10|   2671|   3.5|1455357517| 3.5754137|
|    10|   2762|   0.5|1455356388|  2.750549|
|    10|   2858|   1.0|1455356578|  2.703698|
|    10|   2959|   0.5|1455356582| 2.5517616|
|    10|   3578|   4.0|1455356591| 3.3524153|
|    10|   3882|   3.0|1455398344| 3.1930141|
|    10|   4246|   3.5|1455302676|

In [70]:
#als_model.recommendForAllUsers
recs = als_model.recommendForAllUsers(numItems=10) #recommend 10 things for each user

In [71]:
recs.persist().show()

+------+--------------------+
|userId|     recommendations|
+------+--------------------+
|   471|[[3925, 4.834355]...|
|   463|[[5075, 5.186709]...|
|   496|[[8477, 5.4480753...|
|   148|[[183897, 4.57298...|
|   540|[[3379, 5.275497]...|
|   392|[[84847, 5.224326...|
|   243|[[72171, 6.19093]...|
|    31|[[5466, 5.4735403...|
|   516|[[8477, 4.9331756...|
|   580|[[5075, 5.3298316...|
|   251|[[3379, 5.7613378...|
|   451|[[3379, 5.454221]...|
|    85|[[25850, 5.529511...|
|   137|[[3379, 4.742041]...|
|    65|[[3379, 4.792878]...|
|   458|[[32892, 5.483429...|
|   481|[[299, 4.364671],...|
|    53|[[33649, 6.954679...|
|   255|[[74754, 5.386463...|
|   588|[[3379, 4.5398135...|
+------+--------------------+
only showing top 20 rows



In [72]:
recs[recs['userId']==10].first()['recommendations']

[Row(movieId=32892, rating=4.977327823638916),
 Row(movieId=86320, rating=4.826628684997559),
 Row(movieId=3682, rating=4.6353936195373535),
 Row(movieId=110130, rating=4.558963298797607),
 Row(movieId=71579, rating=4.544266700744629),
 Row(movieId=7169, rating=4.480790138244629),
 Row(movieId=67618, rating=4.456949710845947),
 Row(movieId=7121, rating=4.4546356201171875),
 Row(movieId=8869, rating=4.428635597229004),
 Row(movieId=113275, rating=4.419461727142334)]
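
Conceptually, a top-N list like the one above comes from scoring every movie's factor vector against the user's factor vector and keeping the N highest scores. The sketch below shows that idea on tiny, hypothetical 2-dimensional factors. It says nothing about how Spark organizes the computation internally, and `top_n` is a made-up helper.

```python
import heapq


def top_n(user_vector, item_vectors, n):
    """Return the n (item_id, score) pairs with the highest dot-product scores."""
    scores = (
        (item_id, sum(u * v for u, v in zip(user_vector, vector)))
        for item_id, vector in item_vectors.items()
    )
    return heapq.nlargest(n, scores, key=lambda pair: pair[1])


# Hypothetical 2-dimensional factors, chosen only to keep the arithmetic easy.
user_vector = [1.0, 2.0]
item_vectors = {
    101: [1.0, 1.0],   # score 3.0
    102: [0.0, 2.0],   # score 4.0
    103: [2.0, -1.0],  # score 0.0
}

print(top_n(user_vector, item_vectors, 2))  # [(102, 4.0), (101, 3.0)]
```

Note that nothing bounds these scores to the 0.5 to 5 rating scale, which is why predicted ratings above 5 appear in the `recommendForAllUsers` output.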

In [75]:
!grep 32892 < data/movies.csv
!grep 86320 < data/movies.csv

32892,Ivan's Childhood (a.k.a. My Name is Ivan) (Ivanovo detstvo) (1962),Drama|War
86320,Melancholia (2011),Drama|Sci-Fi


In [None]:
#EVALUATE error WITH RMSE

#IN REAL LIFE, AN ERROR ON THE HIGH END (WHAT YOU RECOMMEND) IS WAY WORSE THAN AN ERROR ON THE LOW END

#USERS REVEALED PREFERENCE > STATED PREFERENCE

#YOU CAN PULL OUT USER AND ITEM FACTORS INTO NUMPY ARRAYS, MATH IS EASY ONCE FACTORIZATION IS DONE

#ALS RANDOMLY INITIALIZES ENTRIES IN USER VECTORS FOR COLD START
#ALTERNATIVE: impute initial entries in a way that makes common sense for the user

In [62]:
billy_pulp_preds =  billy_preds[billy_preds['movieId'] == 296]

In [59]:
billy_pulp_preds.show()

+------+-------+------+----------+----------+
|userId|movieId|rating| timestamp|prediction|
+------+-------+------+----------+----------+
|    10|    296|   1.0|1455303387| 2.2754595|
+------+-------+------+----------+----------+



Now you've fit the model, and it's time to evaluate it to determine just how well it performed.

* import `RegressionEvaluator` from pyspark.ml.evaluation
* generate predictions with your model for the test set by using the `transform` method on your ALS model
* evaluate your model and print out the RMSE from your test set
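
Before reaching for `RegressionEvaluator`, it may help to see what RMSE computes. The sketch below is plain Python, not the Spark evaluator: it takes (actual, predicted) pairs, optionally drops NaN predictions, and returns the square root of the mean squared error. The first five pairs are copied from the `predictions.show(5)` output earlier in this lab. The NaN row and the `rmse` helper are hypothetical.

```python
import math


def rmse(pairs, drop_nan=True):
    """Root-mean-square error over (actual, predicted) pairs."""
    if drop_nan:
        pairs = [(a, p) for a, p in pairs if not math.isnan(p)]
    if not pairs:
        raise ValueError("no valid predictions to evaluate")
    return math.sqrt(sum((a - p) ** 2 for a, p in pairs) / len(pairs))


pairs = [
    (5.0, 4.923959),
    (4.0, 3.2797976),
    (2.0, 3.8111665),
    (4.0, 3.3658185),
    (3.0, 3.523111),
    (3.0, float("nan")),  # hypothetical row standing in for an unseen user
]

print(f"Root-mean-square error = {rmse(pairs):.3f}")
```

A single NaN prediction turns the whole metric into NaN, which is what `rmse(pairs, drop_nan=False)` returns here. In Spark, setting `coldStartStrategy='drop'` on the ALS estimator removes such rows from the output of `transform` before evaluation.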

In [11]:
# importing appropriate library


# Evaluate the model by computing the RMSE on the test data


Root-mean-square error = 0.9968853671625669


### Cross Validation to Find the Optimal Model

Let's now find the optimal values for the parameters of the ALS model. Use the built-in Cross Validator in pyspark with a suitable param grid and determine the optimal model. Try with the parameters:

* regularization = [0.01,0.001,0.1]
* rank = [4,10,50]
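
A grid search tries every combination of the listed values, so it is worth counting the fits before launching it. The sketch below enumerates the grid in plain Python. The dictionary keys mirror the ALS parameter names `regParam` and `rank`, and the fold count of 3 is an assumption made for illustration. In Spark, `ParamGridBuilder` (imported in the next cell) builds the equivalent grid via `addGrid(...)` and `build()`.

```python
from itertools import product

reg_params = [0.01, 0.001, 0.1]
ranks = [4, 10, 50]

# One dictionary per (regParam, rank) combination.
grid = [{"regParam": reg, "rank": rank} for reg, rank in product(reg_params, ranks)]

num_folds = 3  # assumed number of cross-validation folds
total_fits = len(grid) * num_folds

print(len(grid), total_fits)  # 9 27
```

Each of those fits trains a full ALS model, so expect the cross-validation cell to run noticeably longer than a single `fit`.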



In [63]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

# initialize the ALS model


# create the parameter grid              


## instantiating crossvalidator estimator


# We see the best model has a rank of 50, so we will use that in our future models with this dataset


### Incorporating the names of the movies

When we make recommendations, it would be ideal if we could have the actual name of the movie be used rather than just an ID. There is another file called './data/movies.csv' that contains all of the names of the movies matched up to the movie_id that we have in the ratings dataset.

* import the data into a Spark DataFrame
* look at the first 5 rows

In [17]:
movie_titles = None

movie_titles.head(5)

[Row(movieId=1, title='Toy Story (1995)', genres='Adventure|Animation|Children|Comedy|Fantasy'),
 Row(movieId=2, title='Jumanji (1995)', genres='Adventure|Children|Fantasy'),
 Row(movieId=3, title='Grumpier Old Men (1995)', genres='Comedy|Romance'),
 Row(movieId=4, title='Waiting to Exhale (1995)', genres='Comedy|Drama|Romance'),
 Row(movieId=5, title='Father of the Bride Part II (1995)', genres='Comedy')]

We will eventually be matching up the movie_ids with the movie titles. In the cell below, create a function `name_retriever` that takes in a movie_id and returns a string containing that movie's title.

> Hint: It's possible to do this operation in one line with the `df.where` or the `df.filter` method

In [18]:
def name_retriever(movie_id,movie_title_df):
    pass

In [22]:
print(name_retriever(1023,movie_titles))

Winnie the Pooh and the Blustery Day (1968)


## Getting Recommendations

Now it's time to actually get some recommendations! The ALS model has built in methods called `recommendForUserSubset` and `recommendForAllUsers`. We'll start off with using a subset of users.

In [34]:
users = movie_ratings.select(als.getUserCol()).distinct().limit(1)
userSubsetRecs = model.recommendForUserSubset(users, 10)
recs = userSubsetRecs.take(1)

We can now see we have a list of rows with recommended items. Now try to get the name of the top recommended movie with the function you just created, using the number one item for this user.

In [47]:
# use indexing to obtain the movie id of top predicted rated item
first_recommendation = recs[0]['recommendations'][0][0]

# use the name retriever function to get the values
name_retriever(first_recommendation,movie_titles)

'Pirate Radio (2009)'

Of course, you can also make recommendations for everyone, although this will take longer. In the next cell, we create a DataFrame with the top 5 recommendations for every user and then select one user to see their recommendations:

In [48]:
recommendations = model.recommendForAllUsers(5)
recommendations.where(recommendations.userId == 3).collect()

### Getting Predictions for a New User

Now, it's time to put together all that you've learned in this section to create a function that will take in a new user and some movies they've rated and then return n number of highest recommended movies. This function will have multiple different steps to it:

* adding the new ratings into the dataframe (hint: look into using the union df method)
* fitting the ALS model to the combined ratings DataFrame
* make recommendations for the user of choice
* print out the names of the top n recommendations in a reader-friendly manner

The function should take in the parameters:
* user_id : int 
* new_ratings : list of tuples in the format (user_id,item_id,rating)
* rating_df : spark DF containing ratings
* movie_title_df : spark DF containing movie titles
* num_recs : int
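
The last step, printing recommendations in a reader-friendly manner, is independent of Spark once the recommendations have been collected. The sketch below is one possible formatting helper. The name `format_recommendations`, the `(movie_id, score)` tuple layout, and the title dictionary are assumptions for illustration. The IDs, scores, and titles are copied from outputs shown earlier in this lab.

```python
def format_recommendations(recs, titles):
    """Build one readable line per (movie_id, score) pair, best first."""
    ordered = sorted(recs, key=lambda pair: pair[1], reverse=True)
    return [
        f"Recommendation {rank}: {titles.get(movie_id, 'unknown title')} "
        f"| predicted score: {score:.3f}"
        for rank, (movie_id, score) in enumerate(ordered, start=1)
    ]


titles = {
    32892: "Ivan's Childhood (a.k.a. My Name is Ivan) (Ivanovo detstvo) (1962)",
    86320: "Melancholia (2011)",
}
recs = [(86320, 4.826628684997559), (32892, 4.977327823638916)]

lines = format_recommendations(recs, titles)
for line in lines:
    print(line)
```

In your own function, the title lookup would come from `movie_title_df` (for example through `name_retriever`) rather than a hard-coded dictionary.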

Rate these new movies:

```python
[Row(movieId=3253, title="Wayne's World (1992)", genres='Comedy'),
 Row(movieId=2459, title='Texas Chainsaw Massacre, The (1974)', genres='Horror'),
 Row(movieId=2513, title='Pet Sematary (1989)', genres='Horror'),
 Row(movieId=6502, title='28 Days Later (2002)', genres='Action|Horror|Sci-Fi'),
 Row(movieId=1091, title="Weekend at Bernie's (1989)", genres='Comedy'),
 Row(movieId=441, title='Dazed and Confused (1993)', genres='Comedy'),
 Row(movieId=370, title='Naked Gun 33 1/3: The Final Insult (1994)', genres='Action|Comedy')]

```

In [81]:
def new_user_recs(user_id,new_ratings,rating_df,movie_title_df,num_recs):
    # turn the new_ratings list into a spark DataFrame
    
    
    # combine the new ratings df with the rating_df
  
    
    # create an ALS model and fit it

    
    # make recommendations for all users using the recommendForAllUsers method

    
    # get recommendations specifically for the new user that has been added to the DataFrame
    pass
        

In [82]:
# try out your function with the movies listed above




Recommendation 1: Star Wars: Episode IV - A New Hope (1977)  | predicted score :5.517341136932373
Recommendation 2: Usual Suspects, The (1995)  | predicted score :5.442122936248779
Recommendation 3: In the Name of the Father (1993)  | predicted score :5.3851237297058105
Recommendation 4: Star Wars: Episode V - The Empire Strikes Back (1980)  | predicted score :5.381286144256592
Recommendation 5: Fight Club (1999)  | predicted score :5.361552715301514
Recommendation 6: Monty Python and the Holy Grail (1975)  | predicted score :5.347217559814453
Recommendation 7: Willy Wonka & the Chocolate Factory (1971)  | predicted score :5.328979969024658
Recommendation 8: Who Framed Roger Rabbit? (1988)  | predicted score :5.324649810791016
Recommendation 9: Clerks (1994)  | predicted score :5.305201530456543
Recommendation 10: Office Space (1999)  | predicted score :5.297811985015869


So here we have it! Our recommendation system is generating recommendations for the top 10 movies. 




## Level up - Optional 


* Create a user interface to allow users to easily choose items and get recommendations.

* Use IMDB links to scrape user reviews from IMDB and using basic NLP techniques, create extra embeddings for ALS model. 

* Create a hybrid recommender system using features like genre

## Summary

In this lab, we learned how to build a model using Spark, how to perform some parameter selection, and how to update the model every time new user preferences come in. We looked at how Spark's ALS implementation can be used to build a scalable and efficient recommendation system. We also saw that such systems can become computationally expensive, and that using them in an online system could be a problem on traditional computational platforms. Spark's distributed computing architecture provides a great solution for deploying such recommendation systems in real-world applications (think Amazon, Spotify).