# Building recommender using Apache Spark
Written by: Ryan Garnet Andrianto (Student ID: 05111940000063)

Learning source: https://github.com/jadianes/spark-movie-lens/blob/master/notebooks/building-recommender.ipynb

## File download
We download required dataset file for this project. And then, we extract and store them in the datasets folder.

http://files.grouplens.org/datasets/movielens/ml-latest.zip

http://files.grouplens.org/datasets/movielens/ml-latest-small.zip

## Prepare pySpark

In [1]:
from pyspark.sql import SparkSession

# Create spark session
spark = SparkSession \
    .builder \
    .master("local[*]") \
    .appName('PySparkShell2') \
    .getOrCreate()

spark

In [2]:
# Create spark context
sc = spark.sparkContext

sc

## Loading and parsing datasets

### For ratings data

In [3]:
import os

datasets_path = 'datasets'

# Load the raw ratings data
small_ratings_file = os.path.join(datasets_path, 'ml-latest-small', 'ratings.csv')
small_ratings_raw_data = sc.textFile(small_ratings_file)

In [5]:
small_ratings_raw_data_header = small_ratings_raw_data.take(1)[0]
print(small_ratings_raw_data_header)

userId,movieId,rating,timestamp


In [6]:
# Parse the raw data into a new RDD
small_ratings_data = small_ratings_raw_data.filter(lambda line: line!=small_ratings_raw_data_header)\
    .map(lambda line: line.split(",")).map(lambda tokens: (tokens[0],tokens[1],tokens[2])).cache()

In [7]:
# View first 3 lines of the data
small_ratings_data.take(3)

[('1', '1', '4.0'), ('1', '3', '4.0'), ('1', '6', '4.0')]

### For movies data

In [8]:
small_movies_file = os.path.join(datasets_path, 'ml-latest-small', 'movies.csv')

small_movies_raw_data = sc.textFile(small_movies_file)
small_movies_raw_data_header = small_movies_raw_data.take(1)[0]

small_movies_data = small_movies_raw_data.filter(lambda line: line!=small_movies_raw_data_header)\
    .map(lambda line: line.split(",")).map(lambda tokens: (tokens[0],tokens[1])).cache()
    
small_movies_data.take(3)

[('1', 'Toy Story (1995)'),
 ('2', 'Jumanji (1995)'),
 ('3', 'Grumpier Old Men (1995)')]

## Collaborative Filtering

### Select ALS parameters using the small dataset

In this section, we need to find rank value that has minimum RMSE.

In [9]:
training_RDD, validation_RDD, test_RDD = small_ratings_data.randomSplit([6, 2, 2], seed=0)
validation_for_predict_RDD = validation_RDD.map(lambda x: (x[0], x[1]))
test_for_predict_RDD = test_RDD.map(lambda x: (x[0], x[1]))

In [11]:
from pyspark.mllib.recommendation import ALS
import math

seed = 5
iterations = [5, 10, 20]
regularization_parameter = 0.1
ranks = [4, 8, 12]
errors = []
err = 0
tolerance = 0.02

min_error = float('inf')
best_rank = -1
best_iteration = -1
for rank in ranks:
    for iter in iterations:
        model = ALS.train(training_RDD, rank, seed=seed, iterations=iter, lambda_=regularization_parameter)
        predictions = model.predictAll(validation_for_predict_RDD).map(lambda r: ((r[0], r[1]), r[2]))
        rates_and_preds = validation_RDD.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions)
        error = math.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())
        errors.append(error)
        err += 1
        print('For rank %s and iter %d the RMSE is %s' % (rank, iter, error))
        if error < min_error:
            min_error = error
            best_rank = rank
            best_iteration = iter

print('The best model was trained with rank %s and iteration %d' % (best_rank, best_iteration))

For rank 4 and iter 5 the RMSE is 0.9185618863571628
For rank 4 and iter 10 the RMSE is 0.9101130521673209
For rank 4 and iter 20 the RMSE is 0.9088463399661528
For rank 8 and iter 5 the RMSE is 0.9232933464148313
For rank 8 and iter 10 the RMSE is 0.9181027569907748
For rank 8 and iter 20 the RMSE is 0.9154119934303628
For rank 12 and iter 5 the RMSE is 0.9241003122243715
For rank 12 and iter 10 the RMSE is 0.9172515561525698
For rank 12 and iter 20 the RMSE is 0.911461232989448
The best model was trained with rank 4 and iteration 20


In [12]:
predictions.take(3)

[((610, 81132), 2.9841647491030305),
 ((480, 6156), 2.5253400158653942),
 ((474, 6156), 2.729239165057468)]

In [13]:
rates_and_preds.take(3)

[((1, 2193), (4.0, 4.06605669449706)),
 ((3, 26409), (4.5, 0.2151898683676754)),
 ((4, 914), (5.0, 3.589487919443745))]

In [14]:
model = ALS.train(training_RDD, best_rank, seed=seed, iterations=best_iteration,
                      lambda_=regularization_parameter)
predictions = model.predictAll(test_for_predict_RDD).map(lambda r: ((r[0], r[1]), r[2]))
rates_and_preds = test_RDD.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions)
error = math.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())
    
print('For testing data the RMSE is %s' % (error))

For testing data the RMSE is 0.9073832832692129


## Using the complete dataset to build the final model

In [15]:
# Load the complete dataset file
complete_ratings_file = os.path.join(datasets_path, 'ml-latest-small', 'ratings.csv')
complete_ratings_raw_data = sc.textFile(complete_ratings_file)
complete_ratings_raw_data_header = complete_ratings_raw_data.take(1)[0]

# Parse
complete_ratings_data = complete_ratings_raw_data.filter(lambda line: line!=complete_ratings_raw_data_header)\
    .map(lambda line: line.split(",")).map(lambda tokens: (int(tokens[0]),int(tokens[1]),float(tokens[2]))).cache()
    
print("There are %s recommendations in the complete dataset" % (complete_ratings_data.count()))

There are 100836 recommendations in the complete dataset


In [16]:
# Train the recommender model
training_RDD, test_RDD = complete_ratings_data.randomSplit([7, 3], seed=0)

complete_model = ALS.train(training_RDD, best_rank, seed=seed, 
                           iterations=best_iteration, lambda_=regularization_parameter)

In [17]:
# Test the testing set
test_for_predict_RDD = test_RDD.map(lambda x: (x[0], x[1]))

predictions = complete_model.predictAll(test_for_predict_RDD).map(lambda r: ((r[0], r[1]), r[2]))
rates_and_preds = test_RDD.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions)
error = math.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())
    
print('For testing data the RMSE is %s' % (error))

For testing data the RMSE is 0.8952075137861463


## How to make recommendation

In this section, we make recommendation.

In [18]:
# Read movies data for later use

complete_movies_file = os.path.join(datasets_path, 'ml-latest-small', 'movies.csv')
complete_movies_raw_data = sc.textFile(complete_movies_file)
complete_movies_raw_data_header = complete_movies_raw_data.take(1)[0]

# Parse
complete_movies_data = complete_movies_raw_data.filter(lambda line: line!=complete_movies_raw_data_header)\
    .map(lambda line: line.split(",")).map(lambda tokens: (int(tokens[0]),tokens[1],tokens[2])).cache()

complete_movies_titles = complete_movies_data.map(lambda x: (int(x[0]),x[1]))
    
print("There are %s movies in the complete dataset" % (complete_movies_titles.count()))

There are 9742 movies in the complete dataset


In [19]:
# Count the number of ratings per movie
def get_counts_and_averages(ID_and_ratings_tuple):
    nratings = len(ID_and_ratings_tuple[1])
    return ID_and_ratings_tuple[0], (nratings, float(sum(x for x in ID_and_ratings_tuple[1]))/nratings)

movie_ID_with_ratings_RDD = (complete_ratings_data.map(lambda x: (x[1], x[2])).groupByKey())
movie_ID_with_avg_ratings_RDD = movie_ID_with_ratings_RDD.map(get_counts_and_averages)
movie_rating_counts_RDD = movie_ID_with_avg_ratings_RDD.map(lambda x: (x[0], x[1][0]))

## Adding new user ratings

In [20]:
new_user_ID = 0

# The format of each line is (userID, movieID, rating)
new_user_ratings = [
     (0,260,9), # Star Wars (1977)
     (0,1,8), # Toy Story (1995)
     (0,16,7), # Casino (1995)
     (0,25,8), # Leaving Las Vegas (1995)
     (0,32,9), # Twelve Monkeys (a.k.a. 12 Monkeys) (1995)
     (0,335,4), # Flintstones, The (1994)
     (0,379,3), # Timecop (1994)
     (0,296,7), # Pulp Fiction (1994)
     (0,858,10) , # Godfather, The (1972)
     (0,50,8) # Usual Suspects, The (1995)
    ]
new_user_ratings_RDD = sc.parallelize(new_user_ratings)
print('New user ratings: %s' % new_user_ratings_RDD.take(10))

New user ratings: [(0, 260, 9), (0, 1, 8), (0, 16, 7), (0, 25, 8), (0, 32, 9), (0, 335, 4), (0, 379, 3), (0, 296, 7), (0, 858, 10), (0, 50, 8)]


In [21]:
# Add the new user rating data to the recommender model
complete_data_with_new_ratings_RDD = complete_ratings_data.union(new_user_ratings_RDD)

In [22]:
from time import time

t0 = time()
new_ratings_model = ALS.train(complete_data_with_new_ratings_RDD, best_rank, seed=seed, iterations=best_iteration, lambda_=regularization_parameter)
tt = time() - t0

print("New model trained in %s seconds" % round(tt,3))

New model trained in 24.109 seconds


## Getting top recommendations

In [23]:
new_user_ratings_ids = map(lambda x: x[1], new_user_ratings) # get just movie IDs
# keep just those not on the ID list (thanks Lei Li for spotting the error!)
new_user_unrated_movies_RDD = (complete_movies_data.filter(lambda x: x[0] not in new_user_ratings_ids).map(lambda x: (new_user_ID, x[0])))

# Use the input RDD, new_user_unrated_movies_RDD, with new_ratings_model.predictAll() to predict new ratings for the movies
new_user_recommendations_RDD = new_ratings_model.predictAll(new_user_unrated_movies_RDD)

In [24]:
# Transform new_user_recommendations_RDD into pairs of the form (Movie ID, Predicted Rating)
new_user_recommendations_rating_RDD = new_user_recommendations_RDD.map(lambda x: (x.product, x.rating))
new_user_recommendations_rating_title_and_count_RDD = \
    new_user_recommendations_rating_RDD.join(complete_movies_titles).join(movie_rating_counts_RDD)
new_user_recommendations_rating_title_and_count_RDD.take(3)
     

[(143472, ((2.9579303799677983, 'Into the Grizzly Maze (2015)'), 1)),
 (1344, ((7.691646603845651, 'Cape Fear (1962)'), 14)),
 (50064, ((5.915860759935597, '"Good German'), 1))]

In [25]:
new_user_recommendations_rating_title_and_count_RDD = \
    new_user_recommendations_rating_title_and_count_RDD.map(lambda r: (r[1][0][1], r[1][0][0], r[1][1]))

In [26]:
top_movies = new_user_recommendations_rating_title_and_count_RDD.filter(lambda r: r[2]>=25).takeOrdered(25, key=lambda x: -x[1])

print ('TOP recommended movies (with more than 25 reviews):\n%s' %
        '\n'.join(map(str, top_movies)))
     

TOP recommended movies (with more than 25 reviews):
('12 Angry Men (1957)', 9.12750527649064, 57)
('"Philadelphia Story', 9.036267056714419, 29)
('Wallace & Gromit: The Best of Aardman Animation (1996)', 9.035811815588932, 27)
('Citizen Kane (1941)', 8.954452888584955, 69)
('"Great Escape', 8.940897708653859, 43)
('"Producers', 8.873706552642934, 33)
('Casablanca (1942)', 8.858144061900639, 100)
('My Fair Lady (1964)', 8.842192804660739, 35)
('Lawrence of Arabia (1962)', 8.837736921642282, 45)
('Strangers on a Train (1951)', 8.837677088638609, 25)
('"Godfather', 8.822060893995387, 192)
('North by Northwest (1959)', 8.819166466178423, 57)
('"Bridge on the River Kwai', 8.805509036335005, 45)
('In the Name of the Father (1993)', 8.778590271477004, 25)
('Wallace & Gromit: The Wrong Trousers (1993)', 8.777712999352625, 56)
('Spirited Away (Sen to Chihiro no kamikakushi) (2001)', 8.773133437929857, 87)
('Raging Bull (1980)', 8.760831495636037, 40)
('Sunset Blvd. (a.k.a. Sunset Boulevard) (19

## Getting individual ratings

In [27]:
my_movie = sc.parallelize([(0, 500)]) # Quiz Show (1994)
individual_movie_rating_RDD = new_ratings_model.predictAll(new_user_unrated_movies_RDD)
individual_movie_rating_RDD.take(1)

[Rating(user=0, product=81132, rating=6.630364599151603)]

## Persisting the model

In [30]:
from pyspark.mllib.recommendation import MatrixFactorizationModel

model_path = os.path.join('models', 'movie_lens_als')

# Save and load model
model.save(sc, model_path)
same_model = MatrixFactorizationModel.load(sc, model_path)