# Building a Recommendation System Using the ALS Algorithm with Spark
---

This file builds a recommendation system on the [Movielens](https://grouplens.org/datasets/movielens/) dataset using the [Alternating Least Squares](https://spark.apache.org/docs/latest/mllib-collaborative-filtering.html) (ALS) algorithm with Spark. It covers the early development and debugging phase: it uses only the small MovieLens dataset instead of the complete one, and it shows how to search for ALS parameters for later use.

Key components of this file:
* **Preparing data**. Inspect the data and summarize its statistics to inform the choice of algorithm.
* **Parameter search using a small set of data**. Before building the recommendation engine on the whole dataset, use a small amount of data that fits into a single machine's memory to develop and test the recommendation system, and search for parameters where necessary.
* **AWS EMR deployment** (see README.md for details).

Because predicting a rating is a regression problem rather than a classification problem, Root Mean Square Error (RMSE) is used to evaluate the model's performance.
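For reference, with $\mathcal{T}$ the set of (user, movie) pairs that have both an observed rating $r_{ui}$ and a predicted rating $\hat{r}_{ui}$, the metric is the square root of the mean squared difference, which is what the `math.sqrt(... .mean())` expressions in the evaluation cells compute:

$$
\mathrm{RMSE} = \sqrt{\frac{1}{|\mathcal{T}|} \sum_{(u,i) \in \mathcal{T}} \left( r_{ui} - \hat{r}_{ui} \right)^2}
$$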

## Preparing data
---

Small dataset: 100,000 ratings and 3,600 tag applications applied to 9,000 movies by 600 users (last updated 9/2018).

The files are (line counts include the header row):
1. ratings.csv (100,837 lines) with format:
`userId, movieId, rating, timestamp`   e.g. (1,70,3.0,964982400)

2. movies.csv (9,743 lines) with format:
`movieId, title, genres`   e.g. (6,Heat (1995),Action|Crime|Thriller)

3. tags.csv (3,684 lines) with format:
`userId, movieId, tag, timestamp`   e.g. (2,60756,funny,1445714994)

4. links.csv (9,743 lines) with format:
`movieId, imdbId, tmdbId`   e.g. (1,0114709,862)

Load the raw data, filter out the header, and parse each line into a new RDD.

In [1]:
```python
import pyspark
sc = pyspark.SparkContext()
```

In [2]:
```python
import os

# datasets_path = '/path/to/ml-latest-small'
datasets_path = 'dataset/ml-latest-small'
small_ratings_file = os.path.join(datasets_path, 'ratings.csv')
small_ratings_raw_data = sc.textFile(small_ratings_file)
# The first line is the CSV header: userId,movieId,rating,timestamp
small_ratings_raw_data_header = small_ratings_raw_data.take(1)[0]
```

Drop the `timestamp` column because we don't need it yet, and call `cache()` to keep the parsed RDD in memory, since it is reused several times below.

**Note**: An RDD can be marked for persistence with its `persist()` or `cache()` method, and each persisted RDD can use a different storage level. `cache()` is shorthand for the default storage level, `StorageLevel.MEMORY_ONLY` (store deserialized objects in memory); in PySpark, stored objects are always serialized with Pickle regardless of the level. Adapted from: https://stackoverflow.com/questions/26870537/what-is-the-difference-between-cache-and-persist

In [3]:
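```python
# Drop the header, split each line on commas and keep (userId, movieId, rating) as strings
small_ratings_data = small_ratings_raw_data.filter(lambda line: line != small_ratings_raw_data_header)\
    .map(lambda line: line.split(',')).map(lambda tokens: (tokens[0], tokens[1], tokens[2])).cache()
```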
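To see what the `filter`/`map` chain produces without starting a cluster, here is a plain-Python sketch that applies the same three steps to a list of lines in the `ratings.csv` format. The helper name `parse_ratings` and the sample lines are illustrative, not part of the project.

```python
def parse_ratings(lines):
    """Mimic the RDD pipeline: drop the header, split on commas, keep the first three fields."""
    header = lines[0]
    rows = [line for line in lines if line != header]   # like .filter(line != header)
    tokens = [line.split(',') for line in rows]          # like .map(line.split(','))
    return [(t[0], t[1], t[2]) for t in tokens]          # like .map(keep userId, movieId, rating)

# Illustrative input in the ratings.csv format
sample = [
    "userId,movieId,rating,timestamp",
    "1,70,3.0,964982400",
    "2,60756,5.0,1445714994",
]
parsed = parse_ratings(sample)
print(parsed)
```

Note that every field is still a string; the evaluation cells below convert them explicitly with `int()` and `float()` before joining with the predictions.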

Apply the same steps to the `movies.csv` file, keeping `(movieId, title)`.

In [4]:
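```python
import csv

small_movies_file = os.path.join(datasets_path, 'movies.csv')

small_movies_raw_data = sc.textFile(small_movies_file)
small_movie_file_header = small_movies_raw_data.take(1)[0]

# Parse with csv.reader instead of str.split(',') because some titles are quoted
# and contain commas, e.g. "American President, The (1995)"
small_movies_data = small_movies_raw_data.filter(lambda line: line != small_movie_file_header)\
    .map(lambda line: next(csv.reader([line]))).map(lambda tokens: (tokens[0], tokens[1])).cache()
```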
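Splitting `movies.csv` lines on commas breaks any title that itself contains a comma, because such titles are wrapped in double quotes in the file. The standard-library check below makes the difference visible; the sample line is illustrative and follows the `movieId,title,genres` format.

```python
import csv

# Illustrative movies.csv line: the title is quoted because it contains a comma
line = '11,"American President, The (1995)",Comedy|Drama|Romance'

naive_title = line.split(',')[1]          # title is cut at the embedded comma

csv_fields = next(csv.reader([line]))     # csv.reader honours the quotes
csv_title = csv_fields[1]
csv_genres = csv_fields[2]

print(naive_title)
print(csv_title)
print(csv_genres)
```

`csv.reader` accepts any iterable of lines, so wrapping a single line in a list is enough to parse it inside an RDD `map`.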

To tune the ALS parameters, we split the ratings into a training set, a validation set, and a test set.

In [5]:
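```python
# randomSplit normalizes the weights: about 60% training, 20% validation, 20% test
training_RDD, validation_RDD, test_RDD = small_ratings_data.randomSplit([6, 2, 2], seed=0)
# Drop the rating so the (user, movie) pairs can be passed to model.predictAll()
validation_for_predict_RDD = validation_RDD.map(lambda x: (x[0], x[1]))
test_for_predict_RDD = test_RDD.map(lambda x: (x[0], x[1]))
```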
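The split is probabilistic: each rating is assigned to one of the three sets with probability proportional to its weight, so the set sizes are only approximately 60/20/20. The sketch below is a conceptual plain-Python version of a weighted random split (the `weighted_split` helper is hypothetical); Spark's `randomSplit` samples each partition independently, so its exact assignments differ.

```python
import random

def weighted_split(items, weights, seed=0):
    """Assign each item to one split with probability proportional to its weight.

    Conceptual sketch only, not Spark's randomSplit implementation.
    """
    total = float(sum(weights))
    # Cumulative upper bounds, e.g. [6, 2, 2] -> [0.6, 0.8, 1.0]
    bounds, acc = [], 0.0
    for w in weights:
        acc += w / total
        bounds.append(acc)

    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for item in items:
        x = rng.random()
        for k, bound in enumerate(bounds):
            if x < bound:
                splits[k].append(item)
                break
        else:  # guard against floating-point round-off at the upper end
            splits[-1].append(item)
    return splits

# Made-up (user, movie, rating) triples
ratings = [(u, m, 3.0) for u in range(100) for m in range(10)]
train, val, test = weighted_split(ratings, [6, 2, 2])
print(len(train), len(val), len(test))

# Drop the rating, as validation_for_predict_RDD does
val_for_predict = [(u, m) for u, m, _ in val]
```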


## Parameter search using a small set of data
---

The MLlib implementation has the following parameters (PySpark's `ALS.train` names some of them differently: `blocks` instead of `numBlocks` and `lambda_` instead of `lambda`, and the implicit-feedback variant is selected by calling `ALS.trainImplicit` rather than by setting `implicitPrefs`):

* `numBlocks` is the number of blocks used to parallelize computation (set to -1 to auto-configure).
* `rank` is the number of latent factors in the model.
* `iterations` is the number of iterations to run.
* `lambda` specifies the regularization parameter in ALS.
* `implicitPrefs` specifies whether to use the explicit-feedback ALS variant or one adapted for implicit-feedback data.
* `alpha` is a parameter applicable to the implicit-feedback variant of ALS that governs the baseline confidence in preference observations.
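To make `rank`, `iterations`, and `lambda` concrete, here is a minimal dense NumPy sketch of explicit-feedback ALS. It alternates between solving a small ridge regression for every user's factor vector with the item factors held fixed, and then the same for every item. Assumptions: ids are 0-based integers, everything fits in memory, and the penalty is scaled by each user's or item's rating count (the ALS-WR formulation, which is how I understand MLlib to apply `lambda`). It is not Spark's blocked, distributed implementation, and the function names are hypothetical.

```python
import numpy as np

def als_explicit(ratings, n_users, n_items, rank=4, iterations=10, lambda_=0.1, seed=1):
    """Toy explicit-feedback ALS on (user, item, rating) triples with 0-based ids."""
    rng = np.random.default_rng(seed)
    U = rng.normal(scale=0.1, size=(n_users, rank))   # user latent factors
    V = rng.normal(scale=0.1, size=(n_items, rank))   # item latent factors

    # Index the observed ratings by user and by item
    by_user = [[] for _ in range(n_users)]
    by_item = [[] for _ in range(n_items)]
    for u, i, r in ratings:
        by_user[u].append((i, r))
        by_item[i].append((u, r))

    def solve_row(fixed, observed):
        # Ridge regression for one row: (F^T F + lambda_ * n * I) x = F^T r
        idx = [j for j, _ in observed]
        r = np.array([value for _, value in observed], dtype=float)
        F = fixed[idx]
        A = F.T @ F + lambda_ * len(observed) * np.eye(rank)
        return np.linalg.solve(A, F.T @ r)

    for _ in range(iterations):
        for u, observed in enumerate(by_user):   # hold V fixed, update each user
            if observed:
                U[u] = solve_row(V, observed)
        for i, observed in enumerate(by_item):   # hold U fixed, update each item
            if observed:
                V[i] = solve_row(U, observed)
    return U, V

def rmse(ratings, U, V):
    errors = [(r - U[u] @ V[i]) ** 2 for u, i, r in ratings]
    return float(np.sqrt(np.mean(errors)))

# Usage on a tiny, made-up rating matrix
toy = [(0, 0, 5.0), (0, 1, 3.0), (1, 0, 4.0), (1, 2, 1.0), (2, 1, 2.0), (2, 2, 4.0)]
U, V = als_explicit(toy, n_users=3, n_items=3, rank=2, iterations=20, lambda_=0.01)
print(rmse(toy, U, V))
```

Holding one factor matrix fixed turns the non-convex factorization into independent least-squares problems with closed-form solutions, one per user or item, which is what makes the method easy to parallelize.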

In [6]:
```python
from pyspark.mllib.recommendation import ALS
import math

# Hyper-parameters
seed = 1
iterations = 10
regularization_parameter = 0.1
ranks = [4, 8, 12]
errors = []

min_error = float('inf')
best_rank = -1

# Key the validation ratings by (user, movie) once, for joining with the predictions
validation_ratings = validation_RDD.map(lambda r: ((int(r[0]), int(r[1])), float(r[2])))

# Train one model per candidate rank and score it on the validation set
for rank in ranks:
    model = ALS.train(training_RDD, rank, seed=seed, iterations=iterations,
                      lambda_=regularization_parameter)
    # predictAll returns Rating(user, product, rating); key it by (user, product)
    predictions = model.predictAll(validation_for_predict_RDD).map(lambda r: ((r[0], r[1]), r[2]))
    rates_and_preds = validation_ratings.join(predictions)
    # RMSE over the (actual, predicted) pairs
    error = math.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())
    errors.append(error)
    print("For rank %s the RMSE is %s" % (rank, error))
    if error < min_error:
        min_error = error
        best_rank = rank

print("The best rank for the model is %s" % best_rank)
```

```
For rank 4 the RMSE is 0.9160644755656506
For rank 8 the RMSE is 0.9193834821092156
For rank 12 the RMSE is 0.9179160443531135
The best rank for the model is 4
```
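The `join` in the loop above is an inner join: only (user, movie) pairs present in both the actual ratings and the predictions contribute to the RMSE. As far as I know, `predictAll` returns nothing for users or movies that never appeared in the training set, so such cold-start pairs are silently left out of the score. The plain-Python sketch below mirrors that inner join with dictionaries and also reports how many actual ratings were dropped; the helper `joined_rmse` and the data are made up for illustration.

```python
import math

def joined_rmse(actual, predicted):
    """RMSE over the inner join of {(user, movie): rating} dicts, like RDD.join."""
    common = actual.keys() & predicted.keys()
    if not common:
        raise ValueError("no overlapping (user, movie) pairs")
    squared = [(actual[k] - predicted[k]) ** 2 for k in common]
    dropped = len(actual) - len(common)   # actual ratings with no prediction
    return math.sqrt(sum(squared) / len(squared)), dropped

actual = {(1, 70): 3.0, (1, 110): 4.0, (2, 60756): 5.0}
predicted = {(1, 70): 3.5, (1, 110): 3.0}   # no prediction for user 2, e.g. unseen in training
error, dropped = joined_rmse(actual, predicted)
print(error, dropped)
```

Printing the dropped count next to the RMSE is a cheap way to notice when a split leaves many validation or test pairs without a prediction.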


Now, retrain with the selected rank and evaluate the model on the held-out test set.

In [7]:
```python
# Retrain with the selected rank and evaluate on the held-out test set
model = ALS.train(training_RDD, best_rank, seed=seed, iterations=iterations,
                  lambda_=regularization_parameter)
raw_predictions = model.predictAll(test_for_predict_RDD)
# raw_predictions contains Rating(user, product, rating) tuples
predictions = raw_predictions.map(lambda r: ((r[0], r[1]), r[2]))
rates_and_preds = test_RDD.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions)
error = math.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())

print("For test data, the RMSE is %s" % error)
```

```
For test data, the RMSE is 0.9142876730094521
```


In the end, we developed and tested the ALS recommendation system on a small dataset and selected 4 as the rank parameter. Next, I will build the system on the complete dataset and deploy it on AWS EMR clusters.