# Recommendation System

You will be making movie recommendations based on the <a href="https://grouplens.org/datasets/movielens/latest/">MovieLens dataset</a> from the GroupLens research lab at the University of Minnesota. Unless you are planning to run your analysis on a paid cloud platform, we recommend that you use the "small" dataset containing 100,000 user ratings (and potentially, only a particular subset of that dataset).

Your task is to:

> Build a model that provides top 5 movie recommendations to a user, based on their ratings of other movies.

The MovieLens dataset is a "classic" recommendation system dataset that is used in numerous academic papers and machine learning proofs of concept. You will need to decide how the user will provide their ratings of other movies, and formulate a more specific business problem within the general context of "recommending movies".

#### Collaborative Filtering
At minimum, your recommendation system must use collaborative filtering. If you have time, consider implementing a hybrid approach, e.g. using collaborative filtering as the primary mechanism, but using content-based filtering to address the <a href="https://en.wikipedia.org/wiki/Cold_start_(recommender_systems)">cold start</a> problem.
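
The hybrid idea can be illustrated with a minimal sketch. It assumes a hypothetical `cf_predictions` dictionary holding predicted ratings from a collaborative model, plus made-up genre lists and mean ratings; none of these names or numbers come from the notebook. A user known to the collaborative model is ranked by predicted rating, while a new user falls back to a content-based ranking built from the genres they say they like.

```python
def content_based_ranking(preferred_genres, movie_genres, mean_rating):
    """Rank movies by genre overlap with stated preferences, ties broken by mean rating."""
    preferred = set(preferred_genres)

    def score(movie_id):
        overlap = len(preferred & set(movie_genres[movie_id]))
        return (overlap, mean_rating.get(movie_id, 0.0))

    return sorted(movie_genres, key=score, reverse=True)


def top_k_hybrid(user_id, cf_predictions, preferred_genres, movie_genres, mean_rating, k=5):
    """Use collaborative scores when the user is known, otherwise fall back to content."""
    user_scores = cf_predictions.get(user_id)
    if user_scores:
        return sorted(user_scores, key=user_scores.get, reverse=True)[:k]
    # Cold start: no collaborative signal exists for this user
    return content_based_ranking(preferred_genres, movie_genres, mean_rating)[:k]


# Illustrative, made-up inputs (hypothetical values, not taken from the dataset)
movie_genres = {
    1: ["Adventure", "Animation", "Children"],
    6: ["Action", "Crime", "Thriller"],
    47: ["Mystery", "Thriller"],
    50: ["Crime", "Mystery", "Thriller"],
}
mean_rating = {1: 3.9, 6: 3.9, 47: 4.0, 50: 4.2}
cf_predictions = {1: {6: 4.1, 47: 4.6, 50: 4.8}}

known_user_recs = top_k_hybrid(1, cf_predictions, [], movie_genres, mean_rating, k=2)
new_user_recs = top_k_hybrid(999, cf_predictions, ["Crime", "Thriller"],
                             movie_genres, mean_rating, k=2)
```

Here user 1 gets the two movies with the highest collaborative scores, while the unknown user 999 gets the two movies that best match the stated genres.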

#### Evaluation
The MovieLens dataset has explicit ratings, so evaluating your model is fairly straightforward, but you should give some thought to the choice of metric. Since the ratings are ordinal, we can treat this as a regression problem, and there are several regression metrics to choose from, such as RMSE and MAE.
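
To make the difference between the two metrics concrete, here is a small sketch in plain Python with made-up ratings (the numbers are illustrative only). Both sets of predictions have the same MAE, but RMSE penalizes the set containing a single large miss more heavily.

```python
import math


def rmse(actual, predicted):
    """Root mean squared error: squares each error before averaging."""
    return math.sqrt(sum((a - p) ** 2 for a, p in zip(actual, predicted)) / len(actual))


def mae(actual, predicted):
    """Mean absolute error: averages the absolute size of each error."""
    return sum(abs(a - p) for a, p in zip(actual, predicted)) / len(actual)


actual = [4.0, 3.0, 5.0, 2.0]
small_errors = [3.5, 3.5, 4.5, 2.5]    # every prediction is off by 0.5
one_big_error = [4.0, 3.0, 5.0, 4.0]   # three exact predictions, one off by 2.0

mae_small, rmse_small = mae(actual, small_errors), rmse(actual, small_errors)
mae_big, rmse_big = mae(actual, one_big_error), rmse(actual, one_big_error)
print(mae_small, rmse_small)  # 0.5 0.5
print(mae_big, rmse_big)      # 0.5 1.0
```

If recommending a movie the user strongly dislikes is much worse than being slightly off everywhere, RMSE is probably the more suitable choice of the two.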

## Import Packages

In [83]:
import zipfile
import pandas as pd
import numpy as np

from sklearn.preprocessing import LabelEncoder

import pyspark
from pyspark.sql import SparkSession

# Import the Spark column functions used below
# (note: min and max here shadow Python's built-ins for the rest of the notebook)
from pyspark.sql.functions import col, min, max, avg

from pyspark.ml.recommendation import ALS

# Import RegressionEvaluator
from pyspark.ml.evaluation import RegressionEvaluator

# Import ParamGridBuilder, CrossValidator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Start a spark session
spark = SparkSession.builder.getOrCreate()

## Import Data

In [2]:
# Download the data

# ! wget -P ../../data http://files.grouplens.org/datasets/movielens/ml-latest-small.zip

In [3]:
# Unzip the data

# with zipfile.ZipFile('../../data/ml-latest-small.zip', 'r') as zip_ref:
#     zip_ref.extractall('../../data')

## Data exploration & manipulation

In [91]:
# Read in the data with spark DataFrames
links = spark.read.csv('../../data/ml-latest-small/links.csv', header=True)
movies = spark.read.csv("../../data/ml-latest-small/movies.csv", header=True)
ratings = spark.read.csv("../../data/ml-latest-small/ratings.csv", header=True)
tags = spark.read.csv("../../data/ml-latest-small/tags.csv", header=True)

tags = tags.select('userId', 'movieId', 'tag')

# Join tables
df = ratings.join(tags, ["userId", "movieId"], "left")
df = df.join(movies, ["movieId"], "left")

# Convert to a pandas DataFrame and filter columns
pandas_df = df.toPandas()
pandas_df = pandas_df[['userId', 'rating', 'movieId', 'title', 'genres', 'tag', 'timestamp']]

# Fill null tags with a placeholder label ("Uknown" [sic])
pandas_df.tag = pandas_df.tag.fillna("Uknown")

# LabelEncode tags and genres
le = LabelEncoder()
le_genres = LabelEncoder()
tags_le = le.fit_transform(pandas_df.tag)
genres_le = le_genres.fit_transform(pandas_df.genres)

pandas_df = pandas_df.drop(['genres', 'tag'], axis=1)
pandas_df['tags_le'] = pd.Series(tags_le, index=pandas_df.index).astype(str)
pandas_df['genres_le'] = pd.Series(genres_le, index=pandas_df.index).astype(str)

# Convert back to a Spark DataFrame
df = spark.createDataFrame(pandas_df)

# Change dtypes from strings to numeric
df = df.select(df.userId.cast("integer"),
               df.movieId.cast("integer"),
               df.rating.cast("double"),
               df.genres_le.cast("integer"),
               df.tags_le.cast("integer"),
               df.timestamp.cast("integer"),
               df.title)

In [92]:
df.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)
 |-- genres_le: integer (nullable = true)
 |-- tags_le: integer (nullable = true)
 |-- timestamp: integer (nullable = true)
 |-- title: string (nullable = true)



In [93]:
df.show(10)

+------+-------+------+---------+-------+---------+--------------------+
|userId|movieId|rating|genres_le|tags_le|timestamp|               title|
+------+-------+------+---------+-------+---------+--------------------+
|     1|      1|   4.0|      351|    520|964982703|    Toy Story (1995)|
|     1|      3|   4.0|      732|    520|964981247|Grumpier Old Men ...|
|     1|      6|   4.0|      260|    520|964982224|         Heat (1995)|
|     1|     47|   5.0|      937|    520|964983815|Seven (a.k.a. Se7...|
|     1|     50|   5.0|      790|    520|964982931|Usual Suspects, T...|
|     1|     70|   3.0|      214|    520|964982400|From Dusk Till Da...|
|     1|    101|   5.0|      430|    520|964980868|Bottle Rocket (1996)|
|     1|    110|   4.0|      290|    520|964982176|   Braveheart (1995)|
|     1|    151|   5.0|      281|    520|964984041|      Rob Roy (1995)|
|     1|    157|   5.0|      741|    520|964984100|Canadian Bacon (1...|
+------+-------+------+---------+-------+---------+--------------------+

#### Calculating Sparsity

In [94]:
# Count the total number of ratings in the dataset
# (df is the ratings/tags join, so a rating with several tags is counted once per tag)
numerator = df.select("rating").count()

# Count the number of distinct userIds and distinct movieIds
num_users = df.select("userId").distinct().count()
num_movies = df.select("movieId").distinct().count()

# Set the denominator equal to the number of users multiplied by the number of movies
denominator = num_users * num_movies

# Divide the numerator by the denominator
sparsity = (1.0 - numerator / denominator) * 100
print("The ratings dataframe is ", "%.2f" % sparsity + "% empty.")

The ratings dataframe is  98.27% empty.
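
As a quick check of the arithmetic, the sketch below recomputes the sparsity from the counts this notebook prints in its summary statistics (610 users, 9,724 movies, 102,677 rows in the combined dataframe, 100,836 rows in `ratings`). The helper name `sparsity_pct` is made up for illustration. Because the combined dataframe repeats a rating once per tag, using the raw ratings count gives a slightly higher figure.

```python
def sparsity_pct(num_ratings, num_users, num_movies):
    """Percentage of the user x movie matrix that has no rating."""
    return (1.0 - num_ratings / (num_users * num_movies)) * 100


num_users, num_movies = 610, 9724
joined_rows = 102677   # rows in the combined ratings/tags/movies dataframe
rating_rows = 100836   # rows in ratings.csv

from_join = sparsity_pct(joined_rows, num_users, num_movies)
from_ratings = sparsity_pct(rating_rows, num_users, num_movies)
print(f"{from_join:.2f}% vs {from_ratings:.2f}%")  # 98.27% vs 98.30%
```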


#### Ratings Summary Statistics

In [95]:
# Min num ratings for movies
print("Movie with the fewest ratings: ")
df.groupBy("movieID").count().select(min("count")).show()

# Avg num ratings per movie
print("Avg num ratings per movie: ")
df.groupBy("movieID").count().select(avg("count")).show()

# Min num ratings for user
print("User with the fewest ratings: ")
df.groupBy("userID").count().select(min("count")).show()

# Avg num ratings per users
print("Avg num ratings per user: ")
df.groupBy("userID").count().select(avg("count")).show()

print("Num of users: " + str(df.groupBy("userID").count().count()))
print("Num of movies: " + str(df.groupBy("movieID").count().count()))
print("Num of tags: " + str(df.groupBy("tags_le").count().count()))

print("Num of rows in ratings: " + str(ratings.count()))

print("Num of rows in tags: " + str(tags.count()))

print("Num of rows in combined dataframe: " + str(df.count()))

Movie with the fewest ratings: 
+----------+
|min(count)|
+----------+
|         1|
+----------+

Avg num ratings per movie: 
+------------------+
|        avg(count)|
+------------------+
|10.559132044426162|
+------------------+

User with the fewest ratings: 
+----------+
|min(count)|
+----------+
|        20|
+----------+

Avg num ratings per user: 
+------------------+
|        avg(count)|
+------------------+
|168.32295081967214|
+------------------+

Num of users: 610
Num of movies: 9724
Num of tags: 1544
Num of rows in ratings: 100836
Num of rows in tags: 3683
Num of rows in combined dataframe: 102677
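
The combined dataframe has 1,841 more rows than `ratings` (102,677 versus 100,836), which suggests that a left join on tags repeats a rating whenever the same user gave a movie more than one tag. The toy pandas sketch below reproduces that effect with made-up rows; the names `toy_ratings` and `toy_tags` are hypothetical. In Spark, a comparable clean-up would likely be `df.dropDuplicates(["userId", "movieId"])` before fitting a model.

```python
import pandas as pd

# Three ratings; user 1 attached two tags to movie 10
toy_ratings = pd.DataFrame({
    "userId": [1, 1, 2],
    "movieId": [10, 20, 10],
    "rating": [4.0, 3.0, 5.0],
})
toy_tags = pd.DataFrame({
    "userId": [1, 1],
    "movieId": [10, 10],
    "tag": ["funny", "pixar"],
})

# The left join keeps every rating but emits one row per matching tag
joined = toy_ratings.merge(toy_tags, on=["userId", "movieId"], how="left")

# Keep a single row per (userId, movieId) pair so each rating counts once
deduped = joined.drop_duplicates(subset=["userId", "movieId"])

print(len(toy_ratings), len(joined), len(deduped))  # 3 4 3
```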


#### First Simple Model (FSM)

In [102]:
# Split the combined dataframe into training and test data
(training_data, test_data) = df.randomSplit([.8, .2], seed=42)

# Set up ALS with default hyperparameters; drop predictions for users or movies unseen in training
als_fsm = ALS(userCol="userId", itemCol="movieId", ratingCol="rating",
              coldStartStrategy="drop", nonnegative=True, implicitPrefs=False)

# Fit the model to the training_data
fsm = als_fsm.fit(training_data)

# Generate predictions on the test_data
test_predictions = fsm.transform(test_data)
test_predictions.show(10)

+------+-------+------+---------+-------+----------+--------------------+----------+
|userId|movieId|rating|genres_le|tags_le| timestamp|               title|prediction|
+------+-------+------+---------+-------+----------+--------------------+----------+
|   597|    471|   2.0|      634|    520| 941558175|Hudsucker Proxy, ...| 4.1348867|
|   603|    471|   4.0|      634|    520| 954482443|Hudsucker Proxy, ...|   3.54139|
|   218|    471|   4.0|      634|    520|1111624874|Hudsucker Proxy, ...|  2.900381|
|   474|    471|   3.0|      634|    991| 974668858|Hudsucker Proxy, ...| 3.4150596|
|   387|    471|   3.0|      634|    520|1139047519|Hudsucker Proxy, ...| 3.4201646|
|   171|    471|   3.0|      634|    520| 866905683|Hudsucker Proxy, ...|  4.209692|
|   541|    471|   3.0|      634|    520| 835643551|Hudsucker Proxy, ...|  3.833968|
|   104|    471|   4.5|      634|    520|1238111129|Hudsucker Proxy, ...| 3.3618152|
|   609|    833|   3.0|      634|    520| 847221080|High School H

#### Build RMSE evaluator

In [103]:
# Build an RMSE evaluator that compares predictions with the actual ratings
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")

In [104]:
# Evaluate the "test_predictions" dataframe
RMSE = evaluator.evaluate(test_predictions)

# Print the RMSE
print(RMSE)

0.8730344272005154
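
When reading this number, it helps to remember what ALS predicts and what `coldStartStrategy="drop"` removes. An ALS prediction is the dot product of a learned user vector and a learned movie vector; if either one was never seen in training, there is no vector and the prediction is NaN, and "drop" discards those rows before evaluation. The plain-Python sketch below mimics that behavior with made-up factor vectors (all values are hypothetical).

```python
import math

# Hypothetical learned latent factors (rank 2) for users and movies seen in training
user_factors = {1: [1.0, 0.5], 2: [0.2, 1.5]}
item_factors = {10: [3.0, 1.0], 20: [1.0, 2.0]}


def predict(user_id, movie_id):
    """Dot product of user and movie factors, or NaN if either is unknown."""
    u = user_factors.get(user_id)
    v = item_factors.get(movie_id)
    if u is None or v is None:
        return float("nan")
    return sum(a * b for a, b in zip(u, v))


# (userId, movieId, rating); user 3 never appeared in the training data
test_rows = [(1, 10, 4.0), (2, 20, 3.0), (3, 10, 5.0)]
scored = [(u, m, r, predict(u, m)) for u, m, r in test_rows]

# Equivalent of coldStartStrategy="drop": discard rows with NaN predictions
kept = [row for row in scored if not math.isnan(row[3])]
toy_rmse = math.sqrt(sum((r - p) ** 2 for _, _, r, p in kept) / len(kept))
```

The RMSE is therefore computed only over user and movie pairs that both appeared in the training split, so it says nothing about how the model handles brand-new users or movies.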


#### Tuning Hyperparameters

In [105]:
# # Create ALS model
# als = ALS(userCol="userId", itemCol="movieId", ratingCol="rating",
#           coldStartStrategy="drop", nonnegative = True, implicitPrefs = False)

In [None]:
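# Add hyperparameters and their respective values to param_grid
# (alpha only affects ALS when implicitPrefs=True, so with explicit ratings it adds no useful variation)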
# param_grid = ParamGridBuilder() \
#             .addGrid(als.alpha, [0, .25, .5, .75, 1]) \
#             .addGrid(als.rank, [10, 25, 50, 75, 100]) \
#             .addGrid(als.regParam, [.05, .1, .15, .2]) \
#             .addGrid(als.maxIter, [5, 25, 50, 75, 100]) \
#             .build()


# Print the number of parameter combinations in the grid
# print ("Num models to be tested: ", len(param_grid))
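
The size of this search is worth working out before running it. The sketch below counts the combinations in the grid above and multiplies by the five folds used in the cross-validator; it uses only `itertools` and does not touch Spark. It also shows the count without `alpha`, which only applies to the implicit-feedback variant of ALS.

```python
from itertools import product

# Same values as the commented-out ParamGridBuilder above
grid = {
    "alpha": [0, .25, .5, .75, 1],
    "rank": [10, 25, 50, 75, 100],
    "regParam": [.05, .1, .15, .2],
    "maxIter": [5, 25, 50, 75, 100],
}
num_folds = 5

num_combos = len(list(product(*grid.values())))
total_fits = num_combos * num_folds

# Dropping alpha, which has no effect when implicitPrefs=False
grid_without_alpha = {name: values for name, values in grid.items() if name != "alpha"}
num_combos_no_alpha = len(list(product(*grid_without_alpha.values())))
total_fits_no_alpha = num_combos_no_alpha * num_folds

print(num_combos, total_fits)                    # 500 2500
print(num_combos_no_alpha, total_fits_no_alpha)  # 100 500
```

With 2,500 model fits, some of them at rank 100 and 100 iterations, the full grid is expensive on a laptop, which is presumably why these cells are commented out.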

In [None]:
# Build cross validation using CrossValidator
# cv = CrossValidator(estimator=als, estimatorParamMaps=param_grid, evaluator=evaluator, numFolds=5)

In [None]:
# Fit cross validator to the training dataset
# model = cv.fit(training_data)

In [None]:
# Extract best model from the cv model above
# best_model = model.bestModel

In [None]:
# Print the best model's ALS parameters
# print("**Best Model**")

# rank: 50
# regParam=0.15
# alpha=0

# Print "ParamMap"
# print("\n\n  ParamMap:", best_model._java_obj.parent().extractParamMap())

In [107]:
# New model with the best params found above (alpha is ignored because implicitPrefs=False)
als = ALS(userCol="userId", itemCol="movieId", ratingCol="rating",
          alpha=0, rank=50, regParam=0.15, maxIter=10,
          coldStartStrategy="drop", nonnegative=True, implicitPrefs=False)
# Fit the model to the training data
model = als.fit(training_data)

# Use the model to predict on the test data
test_predictions = model.transform(test_data)
test_predictions.show(10)

+------+-------+------+---------+-------+----------+--------------------+----------+
|userId|movieId|rating|genres_le|tags_le| timestamp|               title|prediction|
+------+-------+------+---------+-------+----------+--------------------+----------+
|   597|    471|   2.0|      634|    520| 941558175|Hudsucker Proxy, ...|  4.160037|
|   603|    471|   4.0|      634|    520| 954482443|Hudsucker Proxy, ...| 3.1859825|
|   218|    471|   4.0|      634|    520|1111624874|Hudsucker Proxy, ...| 2.9703295|
|   474|    471|   3.0|      634|    991| 974668858|Hudsucker Proxy, ...| 3.2990625|
|   387|    471|   3.0|      634|    520|1139047519|Hudsucker Proxy, ...| 3.1886456|
|   171|    471|   3.0|      634|    520| 866905683|Hudsucker Proxy, ...|  4.216942|
|   541|    471|   3.0|      634|    520| 835643551|Hudsucker Proxy, ...| 3.6519241|
|   104|    471|   4.5|      634|    520|1238111129|Hudsucker Proxy, ...|  3.426191|
|   609|    833|   3.0|      634|    520| 847221080|High School H

In [109]:
# Evaluate the "test_predictions" dataframe
RMSE = evaluator.evaluate(test_predictions)

# Print the RMSE
print(RMSE)

0.8673723199273932
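
To put the two scores side by side, the short calculation below compares the first simple model's RMSE with the tuned model's RMSE, using the values printed in this notebook.

```python
rmse_fsm = 0.8730344272005154     # first simple model, default hyperparameters
rmse_tuned = 0.8673723199273932   # rank=50, regParam=0.15, maxIter=10

absolute_gain = rmse_fsm - rmse_tuned
relative_gain_pct = absolute_gain / rmse_fsm * 100

print(f"RMSE fell by {absolute_gain:.4f} ({relative_gain_pct:.2f}%)")
```

The improvement is under one percent and comes from a single train/test split, so it may be within the variation you would see from a different random seed.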


In [117]:
recommendations = model.recommendForAllUsers(5)
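
`recommendForAllUsers` ranks movies by predicted rating for each user. It may be worth checking whether titles a user has already rated appear in the result, since those are rarely useful recommendations. The sketch below shows one way to do that filtering in plain Python; the function name `top_n_unseen` and all of the scores are hypothetical.

```python
import heapq


def top_n_unseen(predicted, already_rated, n=5):
    """Return the n highest-scoring movie ids the user has not rated yet."""
    candidates = (
        (score, movie_id)
        for movie_id, score in predicted.items()
        if movie_id not in already_rated
    )
    return [movie_id for score, movie_id in heapq.nlargest(n, candidates)]


# Made-up predicted ratings for one user, keyed by movieId
predicted = {1: 4.9, 3: 3.1, 6: 4.7, 47: 4.8, 50: 4.2}
already_rated = {1, 47}

unseen_recs = top_n_unseen(predicted, already_rated, n=2)
print(unseen_recs)  # [6, 50]
```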

In [138]:
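# Register the recommendations as a temporary view (registerTempTable is deprecated)
recommendations.createOrReplaceTempView("ALS_recs_temp")
# Explode each user's array of (movieId, rating) structs into one row per recommendation
clean_recs = spark.sql("""
    SELECT userId,
           movieIds_and_ratings.movieId AS movieId,
           movieIds_and_ratings.rating AS prediction
    FROM ALS_recs_temp
    LATERAL VIEW explode(recommendations) exploded_table AS movieIds_and_ratings
""")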
clean_recs = clean_recs.join(movies, on=['movieId'], how='left').sort('userId')
clean_recs.show()

+-------+------+----------+--------------------+--------------------+
|movieId|userId|prediction|               title|              genres|
+-------+------+----------+--------------------+--------------------+
| 132333|     1|  5.693803|         Seve (2014)|   Documentary|Drama|
|   5915|     1|  5.545244|Victory (a.k.a. E...|    Action|Drama|War|
|   5490|     1|  5.693803|  The Big Bus (1976)|       Action|Comedy|
|  33649|     1| 5.4943852|  Saving Face (2004)|Comedy|Drama|Romance|
|   7842|     1| 5.6125774|         Dune (2000)|Drama|Fantasy|Sci-Fi|
| 117531|     2|   4.67297|    Watermark (2014)|         Documentary|
|   7071|     2|   4.67297|Woman Under the I...|               Drama|
|  26073|     2|   4.67297|Human Condition I...|           Drama|War|
| 179135|     2|   4.67297|Blue Planet II (2...|         Documentary|
|  84273|     2|   4.67297|Zeitgeist: Moving...|         Documentary|
|   6835|     3| 4.8442287|Alien Contaminati...|Action|Horror|Sci-Fi|
|   5181|     3|  4.