In [0]:
# Install JDK 8 
!sudo apt-get purge openjdk-\* icedtea-\* icedtea6-\*
!sudo apt autoremove
!sudo apt install openjdk-8-jre-headless


In [15]:
!java -version

openjdk version "1.8.0_191"
OpenJDK Runtime Environment (build 1.8.0_191-8u191-b12-2ubuntu0.18.04.1-b12)
OpenJDK 64-Bit Server VM (build 25.191-b12, mixed mode)


In [0]:
complete_dataset_url = 'http://files.grouplens.org/datasets/movielens/ml-latest.zip'
small_dataset_url = 'http://files.grouplens.org/datasets/movielens/ml-latest-small.zip'

In [0]:
!pip3 install pyspark

In [0]:
import os
# os.mkdir('datasets')

In [0]:

# Make sure path is correct in your system
datasets_path = os.path.join('..', 'content/datasets')

complete_dataset_path = os.path.join(datasets_path, 'ml-latest.zip')
small_dataset_path = os.path.join(datasets_path, 'ml-latest-small.zip')

In [0]:
import urllib.request

small_f = urllib.request.urlretrieve (small_dataset_url, small_dataset_path)
complete_f = urllib.request.urlretrieve (complete_dataset_url, complete_dataset_path)

In [0]:
# Extract content from zip files

import zipfile

with zipfile.ZipFile(small_dataset_path, "r") as z:
    z.extractall(datasets_path)

with zipfile.ZipFile(complete_dataset_path, "r") as z:
    z.extractall(datasets_path)

 
**Data Cleaning**
1.   For each line in the ratings dataset, we create a tuple of (UserID, MovieID, Rating). We drop the timestamp because we do not need it for this recommender.
2.  For each line in the movies dataset, we create a tuple of (MovieID, Title). We drop the genres because we do not use them for this recommender.



In [0]:
from pyspark import SparkContext
sc =SparkContext()

In [0]:
# From raw ratings data filter out the header, included in each file 

small_ratings_file = os.path.join(datasets_path, 'ml-latest-small', 'ratings.csv')

small_ratings_raw_data = sc.textFile(small_ratings_file)
small_ratings_raw_data_header = small_ratings_raw_data.take(1)[0]

In [0]:
# Partse the raw data into a new RDD

small_ratings_data = small_ratings_raw_data.filter(lambda line: line!=small_ratings_raw_data_header)\
    .map(lambda line: line.split(",")).map(lambda tokens: (tokens[0],tokens[1],tokens[2])).cache()

In [7]:
# Test whether working or not

small_ratings_data.take(3)

[('1', '1', '4.0'), ('1', '3', '4.0'), ('1', '6', '4.0')]

In [8]:
# Perform similar operation for movies.csv file 

small_movies_file = os.path.join(datasets_path, 'ml-latest-small', 'movies.csv')

small_movies_raw_data = sc.textFile(small_movies_file)
small_movies_raw_data_header = small_movies_raw_data.take(1)[0]

small_movies_data = small_movies_raw_data.filter(lambda line: line!=small_movies_raw_data_header)\
    .map(lambda line: line.split(",")).map(lambda tokens: (tokens[0],tokens[1])).cache()
    
small_movies_data.take(3)

[('1', 'Toy Story (1995)'),
 ('2', 'Jumanji (1995)'),
 ('3', 'Grumpier Old Men (1995)')]

**Collaborative Filetering**

Spark MLlib library for Machine Learning provides a Collaborative Filtering implementation by using Alternating Least Squares (ALS).

The implementation in MLlib has the following parameters: 

*   `numBlocks` = no. of blocks used to parallelize computation
*   `rank` = no. of latent factors in the model
*   `iterations` = no. of iterations to run
*   `lambda` = regularization parameter in ALS
*    `implicitPrefs` =  whether to use the explicit feedback ALS variant or one adapted for implicit feedback data





In [0]:
# Selecting the best parameters using ALS dataset
training_RDD, validation_RDD, test_RDD = small_ratings_data.randomSplit([6, 2, 2], seed=42)


In [0]:
validation_for_predict_RDD = validation_RDD.map(lambda x: (x[0], x[1]))
test_for_predict_RDD = test_RDD.map(lambda x: (x[0], x[1]))

In [11]:
# Training phase

from pyspark.mllib.recommendation import ALS
import math

seed = 43
iterations = 10
regularization_parameter = 0.1
ranks = [4, 8, 12]
errors = [0, 0, 0]
err = 0
tolerance = 0.02

min_error = float('inf')
best_rank = -1
best_iteration = -1

for rank in ranks:
    model = ALS.train(training_RDD, rank, seed=seed, iterations=iterations,
                      lambda_=regularization_parameter)
    predictions = model.predictAll(validation_for_predict_RDD).map(lambda r: ((r[0], r[1]), r[2]))
    rates_and_preds = validation_RDD.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions)
    error = math.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())
    errors[err] = error
    err += 1
    print ('For rank %s the RMSE is %s' % (rank, error))
    if error < min_error:
        min_error = error
        best_rank = rank

print ('The best model was trained with rank %s' % best_rank)

For rank 4 the RMSE is 0.9064372777135662
For rank 8 the RMSE is 0.9091906850626293
For rank 12 the RMSE is 0.9140608348862169
The best model was trained with rank 4


In [12]:
# Denotes UserID, MovieID and the Rating 
predictions.take(3)

[((140, 1084), 3.231818212336625),
 ((74, 1084), 3.987631471353916),
 ((402, 1084), 3.7486550840147554)]

In [13]:
# Join with the validation data and the result looks like 
rates_and_preds.take(3)

[((1, 553), (5.0, 4.781500981595887)),
 ((1, 673), (3.0, 2.7816958851413034)),
 ((1, 1025), (5.0, 4.126055553786673))]

In [14]:
# Test the selected model 

model = ALS.train(training_RDD, best_rank, seed=seed, iterations=iterations,
                      lambda_=regularization_parameter)
predictions = model.predictAll(test_for_predict_RDD).map(lambda r: ((r[0], r[1]), r[2]))
rates_and_preds = test_RDD.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions)
error = math.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())
    
print ('For testing data the RMSE is %s' % (error))

For testing data the RMSE is 0.9088557494902373


Use complete dataset to build the final model

In [15]:
# Load the complete dataset file
complete_ratings_file = os.path.join(datasets_path, 'ml-latest', 'ratings.csv')
complete_ratings_raw_data = sc.textFile(complete_ratings_file)
complete_ratings_raw_data_header = complete_ratings_raw_data.take(1)[0]

# Parse
complete_ratings_data = complete_ratings_raw_data.filter(lambda line: line!=complete_ratings_raw_data_header)\
    .map(lambda line: line.split(",")).map(lambda tokens: (int(tokens[0]),int(tokens[1]),float(tokens[2]))).cache()
    
print("There are %s recommendations in the complete dataset" % (complete_ratings_data.count()))

There are 27753444 recommendations in the complete dataset


In [0]:
# Train the recommender model
training_RDD, test_RDD = complete_ratings_data.randomSplit([7, 3], seed=42)

complete_model = ALS.train(training_RDD, best_rank, seed=seed, 
                           iterations=iterations, lambda_=regularization_parameter)

In [17]:
# Test on the testing set
test_for_predict_RDD = test_RDD.map(lambda x: (x[0], x[1]))

predictions = complete_model.predictAll(test_for_predict_RDD).map(lambda r: ((r[0], r[1]), r[2]))
rates_and_preds = test_RDD.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions)
error = math.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())
    
print ('For testing data the RMSE is %s' % (error))

For testing data the RMSE is 0.8319739154528915


In [18]:
# Load the movies complete file for later use

complete_movies_file = os.path.join(datasets_path, 'ml-latest', 'movies.csv')
complete_movies_raw_data = sc.textFile(complete_movies_file)
complete_movies_raw_data_header = complete_movies_raw_data.take(1)[0]

# Parse
complete_movies_data = complete_movies_raw_data.filter(lambda line: line!=complete_movies_raw_data_header)\
    .map(lambda line: line.split(",")).map(lambda tokens: (int(tokens[0]),tokens[1],tokens[2])).cache()

complete_movies_titles = complete_movies_data.map(lambda x: (int(x[0]),x[1]))
    
print ("There are %s movies in the complete dataset" % (complete_movies_titles.count()))

There are 58098 movies in the complete dataset


In [0]:
# Find the count of the number of ratings per movie so that 
# we can give recommendations of movies with a certain number of ratings

def get_counts_and_averages(ID_and_ratings_tuple):
    nratings = len(ID_and_ratings_tuple[1])
    return ID_and_ratings_tuple[0], (nratings, float(sum(x for x in ID_and_ratings_tuple[1]))/nratings)

movie_ID_with_ratings_RDD = (complete_ratings_data.map(lambda x: (x[1], x[2])).groupByKey())
movie_ID_with_avg_ratings_RDD = movie_ID_with_ratings_RDD.map(get_counts_and_averages)
movie_rating_counts_RDD = movie_ID_with_avg_ratings_RDD.map(lambda x: (x[0], x[1][0]))

In [20]:
# Adding new user rating: we need to rate some movies for the new user

new_user_ID = 0

# The format of each line is (userID, movieID, rating)
new_user_ratings = [
     (0,260,4), # Star Wars (1977)
     (0,1,3), # Toy Story (1995)
     (0,16,3), # Casino (1995)
     (0,25,4), # Leaving Las Vegas (1995)
     (0,32,4), # Twelve Monkeys (a.k.a. 12 Monkeys) (1995)
     (0,335,1), # Flintstones, The (1994)
     (0,379,1), # Timecop (1994)
     (0,296,3), # Pulp Fiction (1994)
     (0,858,5) , # Godfather, The (1972)
     (0,50,4) # Usual Suspects, The (1995)
    ]
new_user_ratings_RDD = sc.parallelize(new_user_ratings)
print ('New user ratings: %s' % new_user_ratings_RDD.take(10))

New user ratings: [(0, 260, 4), (0, 1, 3), (0, 16, 3), (0, 25, 4), (0, 32, 4), (0, 335, 1), (0, 379, 1), (0, 296, 3), (0, 858, 5), (0, 50, 4)]


In [0]:
# Add them to the data we will use to train the recommender model 

complete_data_with_new_ratings_RDD = complete_ratings_data.union(new_user_ratings_RDD)

In [22]:
# Train ALS model using all parameters we selected before

from time import time

t0 = time()
new_ratings_model = ALS.train(complete_data_with_new_ratings_RDD, best_rank, seed=seed, 
                              iterations=iterations, lambda_=regularization_parameter)
tt = time() - t0

print ("New model trained in %s seconds" % round(tt,3))

New model trained in 218.825 seconds


In [0]:
# Getting top recommendations

new_user_ratings_ids = map(lambda x: x[1], new_user_ratings) # get just movie IDs
# keep just those not on the ID list
new_user_unrated_movies_RDD = (complete_movies_data.filter(lambda x: x[0] not in new_user_ratings_ids).map(lambda x: (new_user_ID, x[0])))

# Use the input RDD, new_user_unrated_movies_RDD, with new_ratings_model.predictAll() to predict new ratings for the movies
new_user_recommendations_RDD = new_ratings_model.predictAll(new_user_unrated_movies_RDD)

In [24]:
# Transform new_user_recommendations_RDD into pairs of the form (Movie ID, Predicted Rating)
new_user_recommendations_rating_RDD = new_user_recommendations_RDD.map(lambda x: (x.product, x.rating))
new_user_recommendations_rating_title_and_count_RDD = \
    new_user_recommendations_rating_RDD.join(complete_movies_titles).join(movie_rating_counts_RDD)
new_user_recommendations_rating_title_and_count_RDD.take(3)



[(6216,
  ((3.350020434608627, 'Nowhere in Africa (Nirgendwo in Afrika) (2001)'),
   717)),
 (124320, ((3.28735185728935, 'Once a Thief (1965)'), 1)),
 (83916, ((3.0594761092407268, 'Blues in the Night (1941)'), 9))]

In [0]:
# Rearrange to (Title, Rating, Ratings Count)
new_user_recommendations_rating_title_and_count_RDD = \
    new_user_recommendations_rating_title_and_count_RDD.map(lambda r: (r[1][0][1], r[1][0][0], r[1][1]))

In [26]:
# Get highest rated movies, removies movies with less than 25 ratings 
top_movies = new_user_recommendations_rating_title_and_count_RDD.filter(lambda r: r[2]>=25).takeOrdered(25, key=lambda x: -x[1])

print ('TOP recommended movies (with more than 25 reviews):\n%s' %
        '\n'.join(map(str, top_movies)))

TOP recommended movies (with more than 25 reviews):
('"Things I Like', 4.122233013769481, 30)
('Slaying the Badger', 4.067588548573652, 25)
('Heimat - A Chronicle of Germany (Heimat - Eine deutsche Chronik) (1984)', 4.034303912020852, 35)
('Wild Wild Country (2018)', 3.989878863955422, 55)
('The Garden of Sinners - Chapter 5: Paradox Paradigm (2008)', 3.978482670589439, 27)
('Planet Earth II (2016)', 3.93846669843038, 853)
('Cosmos', 3.9372759193341125, 157)
('"Adventures of Picasso', 3.9364477195805523, 28)
('Frozen Planet (2011)', 3.936002348627664, 402)
('Planet Earth (2006)', 3.8960044347239364, 1384)
('The Blue Planet (2001)', 3.8892853413682786, 421)
('"Enemies of Reason', 3.8798502927959184, 32)
('Olive Kitteridge (2014)', 3.876616353993793, 211)
('Alone in the Wilderness (2004)', 3.8750906074826474, 343)
('Blue Planet II (2017)', 3.8746845657042392, 349)
('Over the Garden Wall (2013)', 3.874470974565863, 377)
('"In the blue sea', 3.8743901971325725, 37)
('"Decalogue', 3.8683238

In [27]:
# Getting invidual ratings 

my_movie = sc.parallelize([(0, 500)])
individual_movie_rating_RDD = new_ratings_model.predictAll(new_user_unrated_movies_RDD)
individual_movie_rating_RDD.take(1)

[Rating(user=0, product=116688, rating=0.7575093482069493)]

In [0]:
# Save and load the ALS model 

from pyspark.mllib.recommendation import MatrixFactorizationModel

model_path = os.path.join('..', 'models', 'movie_lens_als')

# Save and load model
model.save(sc, model_path)
same_model = MatrixFactorizationModel.load(sc, model_path)

In [32]:
# Zip folder in colab
!zip -r /models/file.zip /models/movie_lens_als

  adding: models/movie_lens_als/ (stored 0%)
  adding: models/movie_lens_als/metadata/ (stored 0%)
  adding: models/movie_lens_als/metadata/._SUCCESS.crc (stored 0%)
  adding: models/movie_lens_als/metadata/_SUCCESS (stored 0%)
  adding: models/movie_lens_als/metadata/.part-00000.crc (stored 0%)
  adding: models/movie_lens_als/metadata/part-00000 (deflated 8%)
  adding: models/movie_lens_als/data/ (stored 0%)
  adding: models/movie_lens_als/data/product/ (stored 0%)
  adding: models/movie_lens_als/data/product/._SUCCESS.crc (stored 0%)
  adding: models/movie_lens_als/data/product/part-00000-ec5ab21d-82a9-4c64-b13d-35b6c15f09fe-c000.snappy.parquet (deflated 23%)
  adding: models/movie_lens_als/data/product/.part-00000-ec5ab21d-82a9-4c64-b13d-35b6c15f09fe-c000.snappy.parquet.crc (stored 0%)
  adding: models/movie_lens_als/data/product/part-00001-ec5ab21d-82a9-4c64-b13d-35b6c15f09fe-c000.snappy.parquet (deflated 24%)
  adding: models/movie_lens_als/data/product/_SUCCESS (stored 0%)
  addi

In [0]:
# Download files from colab

from google.colab import files
files.download("/models/file.zip")