# Recommendation System

You will be making movie recommendations based on the <a href="https://grouplens.org/datasets/movielens/latest/">MovieLens dataset</a> from the GroupLens research lab at the University of Minnesota. Unless you are planning to run your analysis on a paid cloud platform, we recommend that you use the "small" dataset containing 100,000 user ratings (and potentially, only a particular subset of that dataset).

Your task is to:

    Build a model that provides top 5 movie recommendations to a user, based on their ratings of other movies.

The MovieLens dataset is a "classic" recommendation system dataset that is used in numerous academic papers and machine learning proofs-of-concept. You will need to decide how the user will provide their ratings of other movies, and to formulate a more specific business problem within the general context of "recommending movies".

#### Collaborative Filtering
At minimum, your recommendation system must use collaborative filtering. If you have time, consider implementing a hybrid approach, e.g. using collaborative filtering as the primary mechanism, but using content-based filtering to address the <a href="https://en.wikipedia.org/wiki/Cold_start_(recommender_systems)">cold start</a> problem.

#### Evaluation
The MovieLens dataset has explicit ratings, so evaluating your model is straightforward. Still, you should give some thought to the choice of metric. Because the ratings are ordinal, we can treat this as a regression problem, but there are several regression metrics to choose from: RMSE, MAE, etc.
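
To make the choice between metrics concrete, here is a minimal sketch of MAE and RMSE in plain Python. The ratings and predictions are hypothetical values chosen for illustration, not MovieLens data, and the helper names `mae` and `rmse` are assumptions of this sketch.

```python
import math

def mae(actual, predicted):
    """Mean absolute error: the average size of the errors."""
    return sum(abs(a - p) for a, p in zip(actual, predicted)) / len(actual)

def rmse(actual, predicted):
    """Root mean squared error: errors are squared first, so large misses weigh more."""
    return math.sqrt(sum((a - p) ** 2 for a, p in zip(actual, predicted)) / len(actual))

# Hypothetical ratings and predictions (not taken from MovieLens)
actual = [4.0, 3.0, 5.0, 2.0]
predicted = [3.5, 3.0, 4.0, 4.0]

print(f"MAE:  {mae(actual, predicted):.3f}")   # MAE:  0.875
print(f"RMSE: {rmse(actual, predicted):.3f}")  # RMSE: 1.146
```

The single two-star miss pulls RMSE well above MAE, which is the main practical difference: RMSE penalizes occasional large errors more heavily.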

## Import Packages

In [1]:
import zipfile
import pandas as pd
import numpy as np

import pyspark
from pyspark.sql import SparkSession

# Import Spark SQL column functions (note: min and max shadow Python's built-ins)
from pyspark.sql.functions import col, min, max, avg

from pyspark.ml.recommendation import ALS

# Import RegressionEvaluator
from pyspark.ml.evaluation import RegressionEvaluator

# Import ParamGridBuilder, CrossValidator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Start a spark session
spark = SparkSession.builder.getOrCreate()

## Import Data

In [2]:
# Download the data

# ! wget -P ../../data http://files.grouplens.org/datasets/movielens/ml-latest-small.zip

In [3]:
# Unzip the data

# with zipfile.ZipFile('../../data/ml-latest-small.zip', 'r') as zip_ref:
#     zip_ref.extractall('../../data')

In [4]:
# Read in the data to Pandas DataFrames

# links_pd = pd.read_csv('../../data/ml-latest-small/links.csv')
# movies_pd = pd.read_csv('../../data/ml-latest-small/movies.csv')
# ratings_pd = pd.read_csv('../../data/ml-latest-small/ratings.csv')
# tags_pd = pd.read_csv('../../data/ml-latest-small/tags.csv')

In [5]:
# Read in the data with spark DataFrames
links = spark.read.csv('../../data/ml-latest-small/links.csv', header=True)
movies = spark.read.csv("../../data/ml-latest-small/movies.csv", header=True)
ratings = spark.read.csv("../../data/ml-latest-small/ratings.csv", header=True)
tags = spark.read.csv("../../data/ml-latest-small/tags.csv", header=True)

## Data exploration & manipulation

In [6]:
# Drop the Timestamp Column from ratings
ratings = ratings.select('userID', 'movieID', 'rating')

# Change dtypes from strings to numeric
ratings = ratings.select(ratings.userID.cast("integer"),
                         ratings.movieID.cast("integer"),
                         ratings.rating.cast("double"))


# Add ratings table to spark session
# ratings.createOrReplaceTempView('ratings')
# print(spark.catalog.listTables())

In [7]:
tags.show(10)

+------+-------+-----------------+----------+
|userId|movieId|              tag| timestamp|
+------+-------+-----------------+----------+
|     2|  60756|            funny|1445714994|
|     2|  60756|  Highly quotable|1445714996|
|     2|  60756|     will ferrell|1445714992|
|     2|  89774|     Boxing story|1445715207|
|     2|  89774|              MMA|1445715200|
|     2|  89774|        Tom Hardy|1445715205|
|     2| 106782|            drugs|1445715054|
|     2| 106782|Leonardo DiCaprio|1445715051|
|     2| 106782|  Martin Scorsese|1445715056|
|     7|  48516|     way too long|1169687325|
+------+-------+-----------------+----------+
only showing top 10 rows



In [8]:
ratings.columns

['userID', 'movieID', 'rating']

In [9]:
ratings.dtypes

[('userID', 'int'), ('movieID', 'int'), ('rating', 'double')]

In [10]:
links.show(10)

+-------+-------+------+
|movieId| imdbId|tmdbId|
+-------+-------+------+
|      1|0114709|   862|
|      2|0113497|  8844|
|      3|0113228| 15602|
|      4|0114885| 31357|
|      5|0113041| 11862|
|      6|0113277|   949|
|      7|0114319| 11860|
|      8|0112302| 45325|
|      9|0114576|  9091|
|     10|0113189|   710|
+-------+-------+------+
only showing top 10 rows



#### Calculating Sparsity

In [11]:
# Count the total number of ratings in the dataset
numerator = ratings.select("rating").count()

# Count the number of distinct userIds and distinct movieIds
num_users = ratings.select("userId").distinct().count()
num_movies = ratings.select("movieId").distinct().count()

# Set the denominator equal to the number of users multiplied by the number of movies
denominator = num_users * num_movies

# Sparsity: the percentage of user-movie pairs that have no rating
sparsity = (1.0 - (numerator *1.0)/denominator)*100
print("The ratings dataframe is ", "%.2f" % sparsity + "% empty.")

The ratings dataframe is  98.30% empty.
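
The same arithmetic can be checked without Spark. The sketch below assumes the counts 100,836 ratings, 610 users and 9,724 rated movies; the notebook does not print these counts, but they are consistent with the per-user and per-movie averages reported in the summary statistics. The function name `sparsity_pct` is hypothetical.

```python
def sparsity_pct(num_ratings, num_users, num_movies):
    """Percentage of cells in the user-by-movie matrix that hold no rating."""
    return (1.0 - num_ratings / (num_users * num_movies)) * 100

# Assumed counts for ml-latest-small (not printed in the notebook)
num_ratings, num_users, num_movies = 100_836, 610, 9_724

print(f"Avg ratings per user:  {num_ratings / num_users:.2f}")   # 165.30
print(f"Avg ratings per movie: {num_ratings / num_movies:.2f}")  # 10.37
print(f"Sparsity: {sparsity_pct(num_ratings, num_users, num_movies):.2f}%")  # 98.30%
```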


#### Ratings Summary Statistics

In [12]:
# Group data by userId, count ratings
ratings.groupBy("userID").count().show(10)

+------+-----+
|userID|count|
+------+-----+
|   148|   48|
|   463|   33|
|   471|   28|
|   496|   29|
|   243|   36|
|   392|   25|
|   540|   42|
|    31|   50|
|   516|   26|
|    85|   34|
+------+-----+
only showing top 10 rows



In [13]:
# Min num ratings for movies
print("Movie with the fewest ratings: ")
ratings.groupBy("movieID").count().select(min("count")).show()

# Avg num ratings per movie
print("Avg num ratings per movie: ")
ratings.groupBy("movieID").count().select(avg("count")).show()

# Min num ratings for user
print("User with the fewest ratings: ")
ratings.groupBy("userID").count().select(min("count")).show()

# Avg num ratings per users
print("Avg num ratings per user: ")
ratings.groupBy("userID").count().select(avg("count")).show()

Movie with the fewest ratings: 
+----------+
|min(count)|
+----------+
|         1|
+----------+

Avg num ratings per movie: 
+------------------+
|        avg(count)|
+------------------+
|10.369806663924312|
+------------------+

User with the fewest ratings: 
+----------+
|min(count)|
+----------+
|        20|
+----------+

Avg num ratings per user: 
+------------------+
|        avg(count)|
+------------------+
|165.30491803278687|
+------------------+



In [14]:
ratings.printSchema()

root
 |-- userID: integer (nullable = true)
 |-- movieID: integer (nullable = true)
 |-- rating: double (nullable = true)



#### First Simple Model (FSM)

In [15]:
# Split the ratings dataframe into training and test data
(training_data, test_data) = ratings.randomSplit([.8, .2], seed=42)

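# Set the ALS hyperparameters
# Note: implicitPrefs=True makes ALS treat the ratings as implicit-feedback
# confidence values, so the predictions below are preference scores (roughly
# 0 to 1) rather than values on the 0.5-5 rating scale.
als_fsm = ALS(userCol="userID", itemCol="movieID", ratingCol="rating", rank=10, maxIter=15, regParam=.1,
              coldStartStrategy="drop", nonnegative=True, implicitPrefs=True)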

# Fit the model to the training_data
fsm = als_fsm.fit(training_data)

# Generate predictions on the test_data
test_predictions = fsm.transform(test_data)
test_predictions.show()

+------+-------+------+-----------+
|userID|movieID|rating| prediction|
+------+-------+------+-----------+
|   182|    471|   4.5| 0.60043967|
|   462|    471|   2.5| 0.48213986|
|   610|    471|   4.0| 0.15886141|
|   176|    471|   5.0| 0.24627937|
|   448|    471|   4.0|  0.4772345|
|   608|    833|   0.5| 0.07938771|
|   463|   1088|   3.5|0.031147564|
|   606|   1088|   3.0| 0.49066234|
|   563|   1088|   4.0|  0.6199479|
|   387|   1088|   1.5| 0.44290695|
|   381|   1088|   3.5| 0.62757486|
|   594|   1088|   4.5| 0.24537411|
|    10|   1088|   3.0|  0.3181059|
|   226|   1088|   1.0|  0.5522457|
|   414|   1088|   3.0|  0.7986954|
|    68|   1088|   3.5|  0.9275609|
|   116|   1088|   4.5| 0.12881464|
|    42|   1088|   3.0| 0.25902092|
|   587|   1580|   4.0|  0.5611114|
|   577|   1580|   3.0| 0.60380816|
+------+-------+------+-----------+
only showing top 20 rows
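
ALS is a matrix-factorization method: it learns one vector of `rank` latent factors per user and per movie, and scores a user-movie pair by the dot product of the two vectors. The sketch below illustrates only that scoring step and a top-N selection. The factor values, movie labels and the `predict` helper are hypothetical, not output from the fitted model.

```python
def predict(user_factors, movie_factors):
    """Score a user-movie pair as the dot product of their latent factors."""
    return sum(u * m for u, m in zip(user_factors, movie_factors))

# Hypothetical rank-3 factors for one user and three movies
user = [1.2, 0.4, 0.9]
movies = {
    "A": [1.5, 0.5, 2.0],
    "B": [0.2, 2.0, 0.1],
    "C": [1.0, 1.0, 1.0],
}

# Rank the movies by predicted score and keep the two highest
top2 = sorted(movies, key=lambda m: predict(user, movies[m]), reverse=True)[:2]
print(top2)  # ['A', 'C']
```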



#### Build RMSE evaluator

In [16]:
# Build an evaluator that computes RMSE between the rating and prediction columns
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")

In [17]:
# Evaluate the "test_predictions" dataframe
RMSE = evaluator.evaluate(test_predictions)

# Print the RMSE
print(RMSE)

3.221041348721874


#### Tuning Hyperparameters

In [18]:
# Create test and train set
(train, test) = ratings.randomSplit([0.8, 0.2], seed=42)

# Create ALS model
als = ALS(userCol="userID", itemCol="movieID", ratingCol="rating", coldStartStrategy="drop", nonnegative = True, implicitPrefs = False)

In [38]:
# Add hyperparameters and their respective values to param_grid
param_grid = ParamGridBuilder() \
            .addGrid(als.rank, [10, 25, 50, 75, 100]) \
            .build()
#             .addGrid(als.regParam, [.05, .1, .15, .2]) \
#             .addGrid(als.maxIter, [5, 25, 50, 75, 100]) \

# Print the number of parameter combinations in the grid
print("Num models to be tested: ", len(param_grid))

Num models to be tested:  5
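
The grid size is the product of the number of values per hyperparameter, and cross-validation fits every combination once per fold before refitting the best combination on the full training set. As a rough sketch of why the `regParam` and `maxIter` grids are commented out, the following plain-Python arithmetic compares the rank-only grid with the full grid, using the value lists from the cell above and the three folds used in the next cell. The `total_fits` helper is hypothetical.

```python
from itertools import product

# Value lists taken from the param_grid cell (two of them are commented out there)
rank_values = [10, 25, 50, 75, 100]
reg_param_values = [0.05, 0.1, 0.15, 0.2]
max_iter_values = [5, 25, 50, 75, 100]
num_folds = 3

rank_only_grid = list(product(rank_values))
full_grid = list(product(rank_values, reg_param_values, max_iter_values))

def total_fits(grid, folds):
    """One fit per combination per fold, plus a final refit of the best combination."""
    return len(grid) * folds + 1

print(len(rank_only_grid), total_fits(rank_only_grid, num_folds))  # 5 16
print(len(full_grid), total_fits(full_grid, num_folds))            # 100 301
```

Since each fit is a full ALS run, the full grid costs roughly twenty times as much as the rank-only grid.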


In [39]:
# Build cross validation using CrossValidator
cv = CrossValidator(estimator=als, estimatorParamMaps=param_grid, evaluator=evaluator, numFolds=3)

In [40]:
# Fit cross validator to the 'train' dataset
model = cv.fit(train)

In [41]:
# Extract the best model from the fitted CrossValidator
best_model = model.bestModel

In [42]:
# Print the hyperparameters of the best ALS model
print("**Best Model**")

# Print "Rank"
# print("  Rank:", best_model._java_obj.parent().getRank())
#   Rank: 50

# Print "MaxIter"
# print("  MaxIter:", best_model._java_obj.parent().getMaxIter())

# Print "RegParam"
# print("  RegParam:", best_model._java_obj.parent().getRegParam())

# Print "ParamMap"
print("\n\n  ParamMap:", best_model._java_obj.parent().extractParamMap())


**Best Model**


  ParamMap: {
	ALS_254ecd8984e7-alpha: 1.0,
	ALS_254ecd8984e7-blockSize: 4096,
	ALS_254ecd8984e7-checkpointInterval: 10,
	ALS_254ecd8984e7-coldStartStrategy: drop,
	ALS_254ecd8984e7-finalStorageLevel: MEMORY_AND_DISK,
	ALS_254ecd8984e7-implicitPrefs: false,
	ALS_254ecd8984e7-intermediateStorageLevel: MEMORY_AND_DISK,
	ALS_254ecd8984e7-itemCol: movieID,
	ALS_254ecd8984e7-maxIter: 10,
	ALS_254ecd8984e7-nonnegative: true,
	ALS_254ecd8984e7-numItemBlocks: 10,
	ALS_254ecd8984e7-numUserBlocks: 10,
	ALS_254ecd8984e7-predictionCol: prediction,
	ALS_254ecd8984e7-rank: 50,
	ALS_254ecd8984e7-ratingCol: rating,
	ALS_254ecd8984e7-regParam: 0.15,
	ALS_254ecd8984e7-seed: -2848911472366898902,
	ALS_254ecd8984e7-userCol: userID
}


In [25]:
# New model with best params
als = ALS(userCol="userID", itemCol="movieID", rank=50, regParam=0.15, ratingCol="rating", coldStartStrategy="drop", nonnegative = True, implicitPrefs = False)
# Fit model to train
model = als.fit(train)

# Use the model to predict on test
test_predictions = model.transform(test)
test_predictions.show()

+------+-------+------+----------+
|userID|movieID|rating|prediction|
+------+-------+------+----------+
|   182|    471|   4.5| 3.5432718|
|   462|    471|   2.5| 2.6285312|
|   610|    471|   4.0|  3.491257|
|   176|    471|   5.0| 3.4462972|
|   448|    471|   4.0|  3.078767|
|   608|    833|   0.5| 2.0994062|
|   463|   1088|   3.5|  3.639413|
|   606|   1088|   3.0|   3.34542|
|   563|   1088|   4.0| 3.3124366|
|   387|   1088|   1.5| 2.7312846|
|   381|   1088|   3.5|  3.628671|
|   594|   1088|   4.5| 4.1675105|
|    10|   1088|   3.0| 3.2094507|
|   226|   1088|   1.0| 3.3509235|
|   414|   1088|   3.0| 3.3123379|
|    68|   1088|   3.5| 3.3393269|
|   116|   1088|   4.5| 3.5931325|
|    42|   1088|   3.0| 3.4111865|
|   587|   1580|   4.0| 3.8609521|
|   577|   1580|   3.0| 3.2277164|
+------+-------+------+----------+
only showing top 20 rows



In [26]:
# Evaluate the "test_predictions" dataframe
RMSE = evaluator.evaluate(test_predictions)

# Print the RMSE
print(RMSE)

0.8663533147645811
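
Much of the drop in RMSE from the first model to this one likely reflects a change of scale rather than tuning alone: the first model was fit with `implicitPrefs=True`, so its predictions are preference scores near 0 to 1, while this model predicts on the rating scale. The two prediction tables list the same (userID, movieID) pairs, so the first five rows of each can be compared directly. This is a minimal sketch on those five rows only; the `rmse` helper is an assumption of the sketch, and the results are not the test-set RMSE.

```python
import math

# (rating, first-model prediction, tuned-model prediction) for the first five
# rows of the two prediction tables (all for movieID 471)
rows = [
    (4.5, 0.60043967, 3.5432718),
    (2.5, 0.48213986, 2.6285312),
    (4.0, 0.15886141, 3.491257),
    (5.0, 0.24627937, 3.4462972),
    (4.0, 0.4772345, 3.078767),
]

def rmse(pairs):
    """Root mean squared error over (actual, predicted) pairs."""
    pairs = list(pairs)
    return math.sqrt(sum((a - p) ** 2 for a, p in pairs) / len(pairs))

fsm_rmse = rmse((rating, fsm) for rating, fsm, _ in rows)
tuned_rmse = rmse((rating, tuned) for rating, _, tuned in rows)

print(f"First model RMSE on 5 rows: {fsm_rmse:.2f}")
print(f"Tuned model RMSE on 5 rows: {tuned_rmse:.2f}")
```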


In [27]:
# Generate the top 5 movie recommendations for every user
recommendations = model.recommendForAllUsers(5)

In [28]:
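# Register the recommendations as a temporary view (registerTempTable is deprecated)
recommendations.createOrReplaceTempView("ALS_recs_temp")

# Explode each user's array of (movieId, rating) structs into one row per recommendation
clean_recs = spark.sql("""
    SELECT userID,
           movieIds_and_ratings.movieId AS movieID,
           movieIds_and_ratings.rating AS prediction
    FROM ALS_recs_temp
    LATERAL VIEW explode(recommendations) exploded_table AS movieIds_and_ratings
""")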
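
`recommendForAllUsers` returns one row per user with an array of (movie, rating) pairs, and `LATERAL VIEW explode` turns each array element into its own row. The plain-Python sketch below mimics that flattening for one user. The dictionary layout is an assumption made for illustration, and the values are copied from the first three user 471 rows in the output that follows.

```python
# One nested row per user, mirroring the shape of the recommendations DataFrame
nested = [
    {"userID": 471,
     "recommendations": [(96004, 4.7351313), (3379, 4.7351313), (7096, 4.674054)]},
]

# "Explode": emit one (userID, movieID, prediction) row per array element
flat = [
    (row["userID"], movie_id, rating)
    for row in nested
    for movie_id, rating in row["recommendations"]
]

for record in flat:
    print(record)
```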

In [29]:
clean_recs.show()

+------+-------+----------+
|userID|movieID|prediction|
+------+-------+----------+
|   471|  96004| 4.7351313|
|   471|   3379| 4.7351313|
|   471|   7096|  4.674054|
|   471| 177593|  4.666936|
|   471|  33649|  4.649665|
|   463|  96004|  5.008803|
|   463|   3379|  5.008803|
|   463|  33649| 4.9356914|
|   463| 171495|  4.852105|
|   463|   7071| 4.7728896|
|   496|  25771| 4.3943233|
|   496|  96004|  4.343777|
|   496|   3379|  4.343777|
|   496|  33649| 4.2761493|
|   496|   3266| 4.2232966|
|   148|  33649|   4.39337|
|   148|   3379| 4.3492284|
|   148|  96004| 4.3492284|
|   148| 171495| 4.2653317|
|   148| 117531|  4.228605|
+------+-------+----------+
only showing top 20 rows



In [30]:
# Attach the movie title and genres to each recommendation
clean_recs = clean_recs.join(movies, ["movieID"], "left")

In [31]:
clean_recs = clean_recs.select('userID', 'prediction', 'movieId', 'title', 'genres').sort('userID')
clean_recs.show(100)

+------+----------+-------+--------------------+--------------------+
|userID|prediction|movieId|               title|              genres|
+------+----------+-------+--------------------+--------------------+
|     1|  5.467287|  33649|  Saving Face (2004)|Comedy|Drama|Romance|
|     1| 5.6248956|  96004|Dragon Ball Z: Th...|Action|Adventure|...|
|     1| 5.4539785| 171495|              Cosmos|  (no genres listed)|
|     1|   5.60177| 132333|         Seve (2014)|   Documentary|Drama|
|     1| 5.6248956|   3379| On the Beach (1959)|               Drama|
|     2|  4.553409|   7071|Woman Under the I...|               Drama|
|     2|   4.68128|  96004|Dragon Ball Z: Th...|Action|Adventure|...|
|     2| 4.8219223| 131724|The Jinx: The Lif...|         Documentary|
|     2|  4.592193| 171495|              Cosmos|  (no genres listed)|
|     2|   4.68128|   3379| On the Beach (1959)|               Drama|
|     3|  4.836814|   6835|Alien Contaminati...|Action|Horror|Sci-Fi|
|     3|  4.737157| 