In this notebook, we will use the Alternating Least Squares (ALS) algorithm with the Spark APIs to predict movie ratings in the [MovieLens small dataset](https://grouplens.org/datasets/movielens/latest/).

In [2]:
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
import math
%matplotlib inline

In [3]:
import os
import findspark
# Run the PySpark workers with Python 3, then put the local Spark
# distribution on sys.path so that `pyspark` can be imported below.
os.environ["PYSPARK_PYTHON"] = "python3"
findspark.init(spark_home="../../../spark-2.2.1-bin-hadoop2.7")

## Data ETL and Data Exploration

In [5]:
from pyspark.sql import SparkSession

In [6]:
# Configure a local session that uses every available core, then create it
# (or reuse an already-running one).
builder = SparkSession.builder.master("local[*]").appName("Spark Movie Recommendation Project")
spark = builder.getOrCreate()

# The SparkContext is the entry point for the RDD-based API used later on.
sc = spark.sparkContext

In [7]:
# Read the four MovieLens tables. No schema is inferred, so every column is
# loaded as a string; the first line of each file supplies the column names.
movies = spark.read.csv("ml-latest-small/movies.csv", header=True)
ratings = spark.read.csv("ml-latest-small/ratings.csv", header=True)
links = spark.read.csv("ml-latest-small/links.csv", header=True)
tags = spark.read.csv("ml-latest-small/tags.csv", header=True)

In [8]:
# Preview the first five rows of the movies table.
movies.show(5)

In [9]:
# Preview the first five rows of the ratings table.
ratings.show(5)

In [10]:
# Collect the distinct rating values to the driver. The column was loaded as
# strings, so sort by numeric value rather than lexicographically.
distinct_ratings = [row['rating'] for row in ratings.select('rating').distinct().collect()]
print('Distinct values of ratings:')
print(sorted(distinct_ratings, key=float))

In [11]:
# Count ratings per user and per movie inside Spark and bring back only the
# minimum, instead of materialising the whole grouped frame in pandas.
min_ratings_per_user = ratings.groupBy("userId").count().agg({"count": "min"}).first()[0]
min_ratings_per_movie = ratings.groupBy("movieId").count().agg({"count": "min"}).first()[0]
print('For the users that rated movies and the movies that were rated:')
print('Minimum number of ratings per user is {}'.format(min_ratings_per_user))
print('Minimum number of ratings per movie is {}'.format(min_ratings_per_movie))

In [12]:
# One aggregation serves both numbers: each row of `ratings_per_movie` is a
# distinct rated movie together with how many ratings it received.
ratings_per_movie = ratings.groupBy("movieId").count()
num_rated_once = ratings_per_movie.where(ratings_per_movie["count"] == 1).count()
num_rated_movies = ratings_per_movie.count()
print('{} out of {} movies are rated by only one user'.format(num_rated_once, num_rated_movies))

In [13]:
# Preview the first five rows of the links table.
links.show(5)

In [14]:
# Preview the first five rows of the tags table.
tags.show(5)

### Q1: The number of Users

In [16]:
# A user may appear in the ratings table, the tags table, or both;
# stack the two userId columns and count each id once.
(
    ratings.select('userId')
    .union(tags.select('userId'))
    .distinct()
    .count()
)

### Q2: The number of Movies

In [18]:
# A movie may appear in the ratings table, the tags table, or both;
# stack the two movieId columns and count each id once.
(
    ratings.select('movieId')
    .union(tags.select('movieId'))
    .distinct()
    .count()
)

### Q3: How many movies are rated by users? List the movies that have not been rated

In [20]:
# Every distinct movieId in the ratings table is a movie with at least one rating.
rated_movie_ids = ratings.select('movieId').distinct()
num_movies_rated = rated_movie_ids.count()
num_movies_rated

In [21]:
# Movies that show up in the ratings table.
rated = ratings.select('movieId')

# Movies that show up in either the ratings or the tags table.
all_movies = rated.union(tags.select('movieId'))

# Whatever is left after removing the rated ids was tagged but never rated.
not_rated = all_movies.subtract(rated)
not_rated.distinct().show()

### Q4: List Movie Genres

In [23]:
from pyspark.sql.functions import explode, split

# The `genres` column holds a '|'-separated list per movie, so distinct()
# on the raw column would list genre *combinations*. Split each value and
# explode it to one row per genre first. split() takes a regular expression,
# hence the escaped pipe.
(
    movies
    .select(explode(split('genres', r'\|')).alias('genre'))
    .distinct()
    .orderBy('genre')
    .show(50, truncate=False)
)

### Q5: Number of Movies in Each Category

In [25]:
from pyspark.sql.functions import explode, split

# Count movies per individual genre. Grouping on the raw `genres` string
# would count per '|'-joined combination instead, so explode the list to one
# (movieId, genre) row per genre before grouping. split() takes a regular
# expression, hence the escaped pipe.
(
    movies
    .select('movieId', explode(split('genres', r'\|')).alias('genre'))
    .groupBy('genre')
    .count()
    .orderBy('count', ascending=False)
    .show(50, truncate=False)
)

## Prepare Data for Training
We will use an RDD-based API from [pyspark.mllib](https://spark.apache.org/docs/2.1.1/mllib-collaborative-filtering.html) to predict the ratings, so let's reload "ratings.csv" using ``sc.textFile`` and then convert it to the form of (user, item, rating) tuples.

In [27]:
from pyspark.mllib.recommendation import ALS

In [28]:
# Re-read ratings.csv as an RDD of raw text lines (header line included).
movie_rating = sc.textFile("ml-latest-small/ratings.csv")

In [29]:
# The first line of the file is the column header.
header = movie_rating.take(1)[0]

# Drop every line equal to the header, split the rest on commas and keep the
# first three fields as a (user, item, rating) tuple of strings.
rating_data = (
    movie_rating
    .filter(lambda line: line != header)
    .map(lambda line: line.split(","))
    .map(lambda fields: (fields[0], fields[1], fields[2]))
    .cache()
)

In [30]:
# Sanity check: look at the first three parsed (user, item, rating) tuples.
rating_data.take(3)

Now we split the data into training/validation/testing sets using a 6/2/2 ratio.

In [32]:
# Weights 6/2/2 give the training/validation/testing proportions; the seed is
# fixed so that the split does not change between runs.
train, validation, test = rating_data.randomSplit([6,2,2],seed = 7856)

In [33]:
# Mark the training split for caching; it is reused by every ALS fit below.
train.cache()

In [34]:
# Mark the validation split for caching; it is reused to score every ALS fit below.
validation.cache()

In [35]:
# Mark the testing split for caching as well.
test.cache()

## ALS Model Selection and Evaluation

With the ALS model, we can use a grid search to find the optimal hyperparameters.

In [37]:
def train_ALS(train_data, validation_data, num_iters, reg_param, ranks, seed=None):
    """Grid-search ALS hyperparameters and return the best model.

    One model is trained for every (rank, regularization) combination and
    scored by its RMSE on ``validation_data``; progress is printed per fit.

    Parameters
    ----------
    train_data : RDD
        (user, item, rating) tuples used to fit each model.
    validation_data : RDD
        (user, item, rating) tuples used to score each model.
    num_iters : int
        Number of ALS iterations for every fit.
    reg_param : iterable of float
        Regularization strengths (``lambda_``) to try.
    ranks : iterable of int
        Numbers of latent factors to try.
    seed : int, optional
        Forwarded to ``ALS.train``. Leave as ``None`` (the default) to keep
        ALS's own random initialization; pass an integer to make the search
        reproducible.

    Returns
    -------
    The model with the lowest validation RMSE, or ``None`` when the grid
    is empty.
    """
    # These two views of the validation set do not depend on the
    # hyperparameters, so define them once instead of once per grid point.
    validation_pairs = validation_data.map(lambda x: (x[0], x[1]))
    validation_truth = validation_data.map(lambda x: ((int(x[0]), int(x[1])), float(x[2])))

    min_error = float('inf')
    best_rank = -1
    best_regularization = 0
    best_model = None
    for rank in ranks:
        for reg in reg_param:
            model = ALS.train(train_data, rank, iterations=num_iters, lambda_=reg, seed=seed)
            # Key predictions by (user, item) so they can be joined with the truth.
            predictions = model.predictAll(validation_pairs).map(lambda x: ((x[0], x[1]), x[2]))
            rate_and_preds = validation_truth.join(predictions)
            error = math.sqrt(rate_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())
            print('{} latent factors and regularization = {}: validation RMSE is {}'.format(rank, reg, error))
            if error < min_error:
                min_error = error
                best_rank = rank
                best_regularization = reg
                best_model = model
    print('\nThe best model has {} latent factors and regularization = {}'.format(best_rank, best_regularization))
    return best_model

In [38]:
import time

# Hyperparameter grid: every rank is combined with every regularization value.
ranks = [6, 8, 10, 12, 14]
reg_params = [0.05, 0.1, 0.2, 0.4, 0.8]
num_iterations = 10

# Time the whole search so readers know what a re-run costs.
start_time = time.time()
final_model = train_ALS(train, validation, num_iterations, reg_params, ranks)
elapsed = time.time() - start_time

print('Total Runtime: {:.2f} seconds'.format(elapsed))

The model with 10 latent factors and lambda = 0.2 yields the best result. Let's plot the learning curves for this model.

In [40]:
# TODO: is there a smarter way to plot the learning curves without
# re-training the model for every iteration count?
def plot_learning_curve(iter_array, train_data, validation_data, reg, rank, seed=None):
    """Plot training and validation RMSE against the number of ALS iterations.

    A fresh model is trained for every entry of ``iter_array`` and scored on
    both data sets; the two RMSE curves are then drawn with pyplot.

    Parameters
    ----------
    iter_array : list of int
        Iteration counts to train with; also used as the x values.
    train_data : RDD
        (user, item, rating) tuples used to fit each model.
    validation_data : RDD
        (user, item, rating) tuples used for the validation curve.
    reg : float
        Regularization strength passed to ALS as ``lambda_``.
    rank : int
        Number of latent factors.
    seed : int, optional
        Forwarded to ``ALS.train``. Leave as ``None`` (the default) to keep
        ALS's own random initialization; pass an integer so that every point
        on the curve uses the same seed.
    """
    def _pairs_and_truth(data):
        """Return the (user, item) pairs to predict and the keyed true ratings."""
        pairs = data.map(lambda x: (x[0], x[1]))
        truth = data.map(lambda x: ((int(x[0]), int(x[1])), float(x[2])))
        return pairs, truth

    def _rmse(model, pairs, truth):
        """Return the RMSE of `model` over the ratings it can predict."""
        predictions = model.predictAll(pairs).map(lambda x: ((x[0], x[1]), x[2]))
        rate_and_preds = truth.join(predictions)
        return math.sqrt(rate_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())

    # Independent of the iteration count, so define these once up front.
    train_pairs, train_truth = _pairs_and_truth(train_data)
    valid_pairs, valid_truth = _pairs_and_truth(validation_data)

    train_rmse = []
    valid_rmse = []
    for num_iters in iter_array:
        model = ALS.train(train_data, rank, iterations=num_iters, lambda_=reg, seed=seed)
        valid_rmse.append(_rmse(model, valid_pairs, valid_truth))
        train_rmse.append(_rmse(model, train_pairs, train_truth))

    plt.plot(iter_array, train_rmse, label='Training', linewidth=5)
    plt.plot(iter_array, valid_rmse, label='Validation', linewidth=5)
    plt.xticks(range(0, max(iter_array) + 1, 2), fontsize=16)
    plt.yticks(fontsize=16)
    plt.xlabel('iterations', fontsize=30)
    plt.ylabel('RMSE', fontsize=30)
    plt.legend(loc='best', fontsize=20)
    plt.show()

In [41]:
# Iteration counts to evaluate; rank and regularization are the values
# reported as best by the grid search above.
iter_array = [1, 2, 5, 10]
plot_learning_curve(iter_array, train, validation, reg=0.2, rank=10)

And finally, let's check the testing error.

In [43]:
# Predict a rating for every (user, item) pair of the held-out test split and
# key the predictions by that pair.
test_pairs = test.map(lambda row: (row[0], row[1]))
predictions = final_model.predictAll(test_pairs).map(lambda p: ((p[0], p[1]), p[2]))

# Line up true and predicted ratings, then take the root of the mean squared difference.
rates_and_preds = test.map(lambda row: ((int(row[0]), int(row[1])), float(row[2]))).join(predictions)
squared_errors = rates_and_preds.map(lambda kv: (kv[1][0] - kv[1][1]) ** 2)
error = math.sqrt(squared_errors.mean())
print('For testing data the RMSE is %s' % (error))

This is slightly better than the validation error (0.936).