# Overview

In this project, I use the Alternating Least Squares (ALS) algorithm through the Spark APIs to predict movie ratings in the [MovieLens Datasets](https://grouplens.org/datasets/movielens/latest/).

##  [Alternating Least Squares](https://endymecy.gitbooks.io/spark-ml-source-analysis/content/%E6%8E%A8%E8%8D%90/papers/Large-scale%20Parallel%20Collaborative%20Filtering%20the%20Netflix%20Prize.pdf)
ALS is a low-rank matrix approximation algorithm for collaborative filtering. It decomposes the user-item rating matrix into two low-rank matrices: a user matrix and an item matrix. In collaborative filtering, users and products are described by a small set of latent factors that can be used to predict missing entries, and ALS learns these latent factors by matrix factorization. It does so by alternating between two least-squares problems: it fixes the item factors and solves for the user factors, then fixes the user factors and solves for the item factors.
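
To make the alternating idea concrete, here is a minimal NumPy sketch on a tiny dense matrix. It is an illustration only, not Spark's implementation: the names `als_sketch`, `R`, and `mask` are made up for this example, the regularization is a plain ridge penalty, and Spark's distributed version differs in details such as how the regularization is scaled.

```python
import numpy as np

def als_sketch(R, mask, rank=2, reg=0.1, iterations=20, seed=0):
    """Approximate R by U @ V.T, fitting only entries where mask is True."""
    rng = np.random.default_rng(seed)
    n_users, n_items = R.shape
    U = rng.normal(scale=0.1, size=(n_users, rank))  # user latent factors
    V = rng.normal(scale=0.1, size=(n_items, rank))  # item latent factors
    ridge = reg * np.eye(rank)
    for _ in range(iterations):
        # Fix V: each user's factors solve a small ridge regression
        # over the items that user has rated.
        for u in range(n_users):
            Vu = V[mask[u]]
            U[u] = np.linalg.solve(Vu.T @ Vu + ridge, Vu.T @ R[u, mask[u]])
        # Fix U: each item's factors solve the symmetric problem
        # over the users who rated that item.
        for i in range(n_items):
            Ui = U[mask[:, i]]
            V[i] = np.linalg.solve(Ui.T @ Ui + ridge, Ui.T @ R[mask[:, i], i])
    return U, V

# Toy ratings: 0 marks a missing entry (illustrative data, not MovieLens).
R = np.array([[5, 4, 0, 1],
              [4, 0, 0, 1],
              [1, 1, 0, 5],
              [1, 0, 5, 4]], dtype=float)
mask = R > 0

U, V = als_sketch(R, mask)
pred = U @ V.T
train_rmse = float(np.sqrt(np.mean((pred[mask] - R[mask]) ** 2)))
print("RMSE on observed entries:", round(train_rmse, 4))
```

The entries of `pred` at positions where `mask` is `False` are the model's guesses for the missing ratings, which is what the recommender uses later on.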


## Data Sets
I use [MovieLens Datasets](https://grouplens.org/datasets/movielens/latest/).
This dataset (ml-latest.zip) describes 5-star rating and free-text tagging activity from [MovieLens](http://movielens.org), a movie recommendation service. It contains 27753444 ratings and 1108997 tag applications across 58098 movies. These data were created by 283228 users between January 09, 1995 and September 26, 2018. This dataset was generated on September 26, 2018.

Users were selected at random for inclusion. All selected users had rated at least 1 movie. No demographic information is included. Each user is represented by an id, and no other information is provided.

The data are contained in the files `genome-scores.csv`, `genome-tags.csv`, `links.csv`, `movies.csv`, `ratings.csv` and `tags.csv`.

In [1]:
# spark imports
from pyspark.sql import SparkSession
# from pyspark.sql.functions import UserDefinedFunction, explode, desc
# from pyspark.sql.types import StringType, ArrayType
from pyspark.mllib.recommendation import ALS

# data science imports
import math
import numpy as np
import pandas as pd

# visualization imports
import seaborn as sns
import matplotlib.pyplot as plt

%matplotlib inline

# import recommender
from recommender.data_loading.spark_get_data import load_csv_to_spark_df, load_csv_to_spark_rdd
from recommender.data_cleaning.spark_preprocessing import get_rating, data_split
from recommender.modeling.spark_collaborative_filtering import als_gridsearch
from recommender.modeling.spark_recommend import make_recommendation, make_recommendation_new

In [2]:
# spark config
spark = SparkSession \
    .builder \
    .appName("movie recommendation") \
    .config("spark.driver.maxResultSize", "96g") \
    .config("spark.driver.memory", "96g") \
    .config("spark.executor.memory", "8g") \
    .config("spark.master", "local[12]") \
    .getOrCreate()
# get spark context
sc = spark.sparkContext

# Load data

In [3]:
movies = load_csv_to_spark_df('../dataset/movies.csv', spark)
ratings = load_csv_to_spark_df('../dataset/ratings.csv', spark)

## Basic inspection

In [4]:
movies.show(3)

+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
+-------+--------------------+--------------------+
only showing top 3 rows



In [5]:
ratings.show(3)

+------+-------+------+----------+
|userId|movieId|rating| timestamp|
+------+-------+------+----------+
|     1|    307|   3.5|1256677221|
|     1|    481|   3.5|1256677456|
|     1|   1091|   1.5|1256677471|
+------+-------+------+----------+
only showing top 3 rows



## Reload data
We will use an RDD-based API from `pyspark.mllib` to predict the ratings, so let's reload `ratings.csv` using `sc.textFile` and then convert it to (user, item, rating) tuples.

In [6]:
# load data
movie_rating = load_csv_to_spark_rdd('../dataset/ratings.csv', sc)

# preprocess data -- only need ["userId", "movieId", "rating"]
rating_data = get_rating(movie_rating)

# check three rows
rating_data.take(3)

[(1, 307, 3.5), (1, 481, 3.5), (1, 1091, 1.5)]
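
The helpers `load_csv_to_spark_rdd` and `get_rating` come from the project's `recommender` package and their source is not shown here. As a rough sketch of the parsing step they presumably perform, the functions below (`parse_rating_line` and `is_header` are hypothetical names) turn raw CSV lines into (user, item, rating) tuples. The example runs on a plain Python list so it works without Spark.

```python
def is_header(line):
    """True for the CSV header row of ratings.csv."""
    return line.startswith("userId")

def parse_rating_line(line):
    """Turn 'userId,movieId,rating,timestamp' into (user, item, rating)."""
    user_id, movie_id, rating, _timestamp = line.split(",")
    return int(user_id), int(movie_id), float(rating)

# With a SparkContext `sc`, the same functions could be applied lazily:
#   lines = sc.textFile("../dataset/ratings.csv")
#   rating_data = lines.filter(lambda l: not is_header(l)).map(parse_rating_line)

sample_lines = [
    "userId,movieId,rating,timestamp",
    "1,307,3.5,1256677221",
    "1,481,3.5,1256677456",
]
parsed = [parse_rating_line(l) for l in sample_lines if not is_header(l)]
print(parsed)
```

The timestamp column is dropped because the RDD-based ALS API only needs the user, the item, and the rating.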

## Split data
Now we split the data into training/validation/testing sets using a 6/2/2 ratio.

In [7]:
train, validation, test = data_split(rating_data, [6, 2, 2])

# cache data
train.cache()
validation.cache()
test.cache()

PythonRDD[35] at RDD at PythonRDD.scala:53
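
The `data_split` helper is not shown in this notebook. In Spark, a weighted random split of this kind is available as `rdd.randomSplit([6, 2, 2], seed=...)`, which normalizes the weights when they do not sum to 1. The plain-Python sketch below (`random_split` is a hypothetical name) illustrates the idea on a list: each row is assigned independently to one part with probability proportional to its weight.

```python
import random

def random_split(rows, weights, seed=42):
    """Assign each row independently to one part, proportionally to weights."""
    total = float(sum(weights))
    # Cumulative upper bounds in (0, 1], e.g. [0.6, 0.8, 1.0] for [6, 2, 2].
    bounds = []
    acc = 0.0
    for w in weights:
        acc += w / total
        bounds.append(acc)
    rng = random.Random(seed)
    parts = [[] for _ in weights]
    last = len(bounds) - 1
    for row in rows:
        x = rng.random()
        for k, bound in enumerate(bounds):
            # The `k == last` fallback guards against rounding in the last bound.
            if x < bound or k == last:
                parts[k].append(row)
                break
    return parts

rows = list(range(10000))
train_part, validation_part, test_part = random_split(rows, [6, 2, 2])
print(len(train_part), len(validation_part), len(test_part))
```

Because the assignment is random, the part sizes are only approximately 60%, 20%, and 20% of the data.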

# Spark ALS-based approach for training the model
1. ALS model selection and evaluation
2. Model testing

## ALS model selection and evaluation
With the ALS model, we can use a grid search over the rank (number of latent factors) and the regularization parameter, keeping the model with the lowest validation RMSE.
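
The `als_gridsearch` helper is imported from the project's `recommender` package and its source is not shown. The sketch below shows the general shape such a search might take; `grid_search`, `train_fn`, and `rmse_fn` are hypothetical names. Training and scoring are passed in as callables so the loop itself can be run without Spark, and the toy callables at the bottom are stand-ins, not real results.

```python
import math

def grid_search(train_fn, rmse_fn, ranks, reg_params):
    """Try every (rank, reg) pair and keep the model with the lowest RMSE."""
    best_model, best_rank, best_reg = None, None, None
    best_error = math.inf
    for rank in ranks:
        for reg in reg_params:
            model = train_fn(rank, reg)
            error = rmse_fn(model)
            print(f"{rank} latent factors and regularization = {reg}: "
                  f"validation RMSE is {error}")
            if error < best_error:
                best_model, best_rank, best_reg = model, rank, reg
                best_error = error
    return best_model, best_rank, best_reg, best_error

# With pyspark.mllib, the callables could look like this (assumption):
#   train_fn = lambda rank, reg: ALS.train(
#       train, rank, iterations=num_iterations, lambda_=reg)
#   rmse_fn = a function that scores `validation` with model.predictAll

# Toy stand-ins so the sketch runs on its own: the "model" is just its
# parameters and the "error" is an arbitrary function of them.
def toy_train(rank, reg):
    return (rank, reg)

def toy_rmse(model):
    return (model[0] - 10) ** 2 * 0.001 + abs(model[1] - 0.05)

model, rank, reg, error = grid_search(toy_train, toy_rmse, [8, 10, 12], [0.01, 0.05, 0.1])
```

Scoring each candidate on the validation set, rather than the training set, keeps the test set untouched for the final out-of-sample check.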

In [8]:
# hyper-param config
num_iterations = 10
# ranks = [14]
# reg_params = [0.05]
ranks = [8, 10, 12, 14, 16, 18, 20]
reg_params = [0.001, 0.01, 0.05, 0.1, 0.2]

# grid search and select best model
final_model = als_gridsearch(train, validation, num_iterations, reg_params, ranks)

8 latent factors and regularization = 0.001: validation RMSE is 0.8790523112952408
8 latent factors and regularization = 0.01: validation RMSE is 0.8526333272385692
8 latent factors and regularization = 0.05: validation RMSE is 0.8224192070107165
8 latent factors and regularization = 0.1: validation RMSE is 0.8230813541241517
8 latent factors and regularization = 0.2: validation RMSE is 0.865981220900152
10 latent factors and regularization = 0.001: validation RMSE is 0.891151967668972
10 latent factors and regularization = 0.01: validation RMSE is 0.8562651468256183
10 latent factors and regularization = 0.05: validation RMSE is 0.8190701557383728
10 latent factors and regularization = 0.1: validation RMSE is 0.8206398496275041
10 latent factors and regularization = 0.2: validation RMSE is 0.8658968041719375
12 latent factors and regularization = 0.001: validation RMSE is 0.8983982396110236
12 latent factors and regularization = 0.01: validation RMSE is 0.8607961248888077
12 latent fa

## Model testing
Finally, we predict ratings for the held-out test set and check the out-of-sample error.

In [9]:
# make prediction using test data
test_data = test.map(lambda p: (p[0], p[1]))
predictions = final_model.predictAll(test_data).map(lambda r: ((r[0], r[1]), r[2]))
# get the rating result
ratesAndPreds = test.map(lambda r: ((r[0], r[1]), r[2])).join(predictions)
# get the RMSE
MSE = ratesAndPreds.map(lambda r: (r[1][0] - r[1][1])**2).mean()
error = math.sqrt(MSE)
print('The out-of-sample RMSE of rating predictions is', round(error, 4))

The out-of-sample RMSE of rating predictions is 0.8141


# Make movie recommendation
We use a function that makes the top 10 recommendations for a given user.

In [10]:
# get recommends
recommends = make_recommendation(
    model=final_model,
    ratings_data=rating_data,
    df_movies=movies,
    user_id=1,
    n_recommendations=10,
    spark_context=sc)

print('Recommendations for User {}:'.format(1))
for i, title in enumerate(recommends):
    print('{0}: {1}'.format(i+1, title))

Recommendations for User 1:
1: Eve and the Fire Horse (2005)
2: Crime Wave (1985)
3: Boogie-Doodle (1948)
4: Kaaka Muttai (2015)
5: Morgan Murphy: Irish Goodbye (2014)
6: The Zohar Secret (2015)
7: Who Killed Chea Vichea? (2010)
8: Final Cut: Ladies and Gentlemen (2012)
9: NOFX Backstage Passport 2
10: Heroes (2008)
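
The source of `make_recommendation` is not shown. A common way to build such a function is to score the movies a user has not rated yet and keep the highest-scoring ones. The sketch below illustrates only that final selection step in plain Python; `top_n_unrated` and the sample titles and scores are hypothetical, and in the notebook the scores would come from the trained model (for example via `predictAll`, as in the testing cell).

```python
import heapq

def top_n_unrated(predicted, rated_ids, id_to_title, n=10):
    """Return titles of the n highest-scoring movies the user has not rated."""
    candidates = (
        (score, movie_id)
        for movie_id, score in predicted.items()
        if movie_id not in rated_ids
    )
    best = heapq.nlargest(n, candidates)  # sorted by score, highest first
    return [id_to_title[movie_id] for _score, movie_id in best]

# Made-up predicted scores and titles, for illustration only.
predicted = {1: 4.2, 2: 3.1, 3: 4.8, 4: 2.0}
rated_ids = {3}  # the user already rated movie 3, so it is excluded
id_to_title = {1: "Movie A", 2: "Movie B", 3: "Movie C", 4: "Movie D"}

top_two = top_n_unrated(predicted, rated_ids, id_to_title, n=2)
print(top_two)
```

Excluding already-rated movies matters because the model tends to score them highly, and recommending something the user has already seen is of little use.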


# Make movie recommendation to a new user
We need a function that takes a new user's movie ratings and outputs the top 10 recommendations.

In [11]:
# my favorite movies
my_favorite_movies = ['Iron Man', 'Jumanji', 'Transformers', 'Independence Day']

# get recommends
recommends = make_recommendation_new(
    best_model_params = {'iterations': 10, 'rank': 14, 'lambda_': 0.05}, 
    ratings_data=rating_data, 
    df_movies=movies, 
    fav_movie_list=my_favorite_movies, 
    n_recommendations=10, 
    spark_context=sc)

print('Recommendations for {}:'.format(my_favorite_movies[0]))
for i, title in enumerate(recommends):
    print('{0}: {1}'.format(i+1, title))

Recommendations for Iron Man:
1: Scarlet Dove, The (Tulipunainen kyyhkynen) (1961)
2: Pearl Jam: Immagine in Cornice - Live in Italy 2006 (2007)
3: Presumed Guilty (Presunto culpable) (2008)
4: The Veil of Twilight (2014)
5: O Pátio das Cantigas (1942)
6: Hunterrr (2015)
7: Margaret Cho: PsyCHO (2015)
8: Heroes Above All (2017)
9: The Magnificent Scoundrels (1991)
10: Whitney Cummings: Money Shot (2010)
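
The source of `make_recommendation_new` is not shown. Since it receives training parameters rather than a fitted model, one plausible reading is that it adds the new user's ratings to the data and retrains. The sketch below covers only the first half of that idea, under loud assumptions: `new_user_rows` is a hypothetical name, favorites are matched by case-insensitive substring, the new user gets the next free id, and every favorite is given the maximum rating of 5.0.

```python
def new_user_rows(fav_titles, movies, existing_user_ids, rating=5.0):
    """Build (user, item, rating) rows for a new user from favorite titles."""
    new_id = max(existing_user_ids) + 1  # assumption: next free integer id
    wanted = [title.lower() for title in fav_titles]
    rows = [
        (new_id, movie_id, rating)
        for movie_id, title in movies
        if any(w in title.lower() for w in wanted)  # substring match
    ]
    return new_id, rows

# Tiny illustrative catalogue of (movieId, title) pairs.
sample_movies = [(1, "Toy Story (1995)"), (2, "Jumanji (1995)")]
new_id, rows = new_user_rows(["Jumanji"], sample_movies, existing_user_ids=[1, 2, 3])
print(new_id, rows)

# With Spark, these rows could then be appended to the ratings before
# retraining (assumption):
#   all_ratings = rating_data.union(sc.parallelize(rows))
```

Substring matching is deliberately loose, so a favorite such as "Iron Man" would also match sequels whose titles contain it.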
