In [50]:
# Check the environment
!java --version
!python --version

openjdk 11.0.18 2023-01-17
OpenJDK Runtime Environment (build 11.0.18+10-post-Ubuntu-0ubuntu120.04.1)
OpenJDK 64-Bit Server VM (build 11.0.18+10-post-Ubuntu-0ubuntu120.04.1, mixed mode, sharing)
Python 3.9.16


In [51]:
# Download Apache Spark
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [52]:
# Initiate the Spark Session
from pyspark.sql import SparkSession

# Create Spark Session/Context
spark = SparkSession.builder \
  .master("local") \
  .appName("Hello PySpark") \
  .config ("spark.some.config.option", "some-value") \
  .getOrCreate()

In [53]:
# Check spark session
print(spark)

<pyspark.sql.session.SparkSession object at 0x7f0948c13190>


In [54]:
complete_dataset_url = 'http://files.grouplens.org/datasets/movielens/ml-latest.zip'
small_dataset_url = 'http://files.grouplens.org/datasets/movielens/ml-latest-small.zip'

In [55]:
import os

datasets_path = os.path.join('/content', 'datasets')

complete_dataset_path = os.path.join(datasets_path, 'ml-latest.zip')
small_dataset_path = os.path.join(datasets_path, 'ml-latest-small.zip')

In [56]:
import urllib
import urllib.request

small_f = urllib.request.urlretrieve (small_dataset_url, small_dataset_path)
complete_f = urllib.request.urlretrieve (complete_dataset_url, complete_dataset_path)

In [57]:
import zipfile

with zipfile.ZipFile(small_dataset_path, "r") as z:
    z.extractall(datasets_path)

with zipfile.ZipFile(complete_dataset_path, "r") as z:
    z.extractall(datasets_path)

In [58]:
from pyspark import SparkContext
sc = SparkContext.getOrCreate();

small_ratings_file = os.path.join(datasets_path, 'ml-latest-small', 'ratings.csv')

small_ratings_raw_data = sc.textFile(small_ratings_file)
small_ratings_raw_data_header = small_ratings_raw_data.take(1)[0]

In [59]:
small_ratings_data = small_ratings_raw_data.filter(lambda line: line!=small_ratings_raw_data_header)\
    .map(lambda line: line.split(",")).map(lambda tokens: (tokens[0],tokens[1],tokens[2])).cache()

In [60]:
small_ratings_data.take(3)

[('1', '1', '4.0'), ('1', '3', '4.0'), ('1', '6', '4.0')]

In [61]:
small_movies_file = os.path.join(datasets_path, 'ml-latest-small', 'movies.csv')

small_movies_raw_data = sc.textFile(small_movies_file)
small_movies_raw_data_header = small_movies_raw_data.take(1)[0]

small_movies_data = small_movies_raw_data.filter(lambda line: line!=small_movies_raw_data_header)\
    .map(lambda line: line.split(",")).map(lambda tokens: (tokens[0],tokens[1])).cache()
    
small_movies_data.take(3)
     

[('1', 'Toy Story (1995)'),
 ('2', 'Jumanji (1995)'),
 ('3', 'Grumpier Old Men (1995)')]

In [62]:
training_RDD, validation_RDD, test_RDD = small_ratings_data.randomSplit([6., 2., 2.])
validation_for_predict_RDD = validation_RDD.map(lambda x: (x[0], x[1]))
test_for_predict_RDD = test_RDD.map(lambda x: (x[0], x[1]))

In [64]:
from pyspark.mllib.recommendation import ALS
import math

seed = 5

# Use 3 values from Assignment 9 [5, 10, 20]

iterations_d = [5, 10, 15]

regularization_parameter = 0.1
ranks = [4, 8, 12]
errors = [0, 0, 0, 0, 0, 0, 0 ,0 ,0]
err = 0
tolerance = 0.02

min_error = float('inf')
best_rank = -1
best_iteration = -1
best_iteration_value = -1
for rank in ranks:
    for it in iterations_d:
      model = ALS.train(training_RDD, rank, seed=seed, iterations=it,
                        lambda_=regularization_parameter)
      predictions = model.predictAll(validation_for_predict_RDD).map(lambda r: ((r[0], r[1]), r[2]))
      rates_and_preds = validation_RDD.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions)
      error = math.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())
      errors[err] = error
      err += 1
      print('For rank ' + str(rank) + ' with iteration value ' + str(it) +' the RMSE is ' + str(error) )
      if error < min_error:
          min_error = error
          best_rank = rank
          best_iteration_value = it

print('The best model was trained with rank ' + str(best_rank) + ' with iteration value ' + str(best_iteration_value))

For rank 4 with iteration value 5 the RMSE is 0.9103573150607506
For rank 4 with iteration value 10 the RMSE is 0.9064253747689525
For rank 4 with iteration value 15 the RMSE is 0.9079257072776615
For rank 8 with iteration value 5 the RMSE is 0.9195673810274106
For rank 8 with iteration value 10 the RMSE is 0.9156794256325467
For rank 8 with iteration value 15 the RMSE is 0.9137587120878743
For rank 12 with iteration value 5 the RMSE is 0.9174298392601326
For rank 12 with iteration value 10 the RMSE is 0.9127723374675542
For rank 12 with iteration value 15 the RMSE is 0.9120181236232715
The best model was trained with rank 4 with iteration value 10


In [65]:
predictions.take(3)

[((140, 37739), 2.797786020965213),
 ((204, 81132), 3.4907665795233482),
 ((6, 667), 3.0856364145234068)]

In [66]:
rates_and_preds.take(3)

[((1, 47), (5.0, 4.451314145876593)),
 ((1, 151), (5.0, 3.8846304226326973)),
 ((1, 333), (5.0, 4.069972133396847))]

In [69]:
model = ALS.train(training_RDD, best_rank, seed=seed, iterations=best_iteration_value,
                      lambda_=regularization_parameter)
predictions = model.predictAll(test_for_predict_RDD).map(lambda r: ((r[0], r[1]), r[2]))
rates_and_preds = test_RDD.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions)
error = math.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())
    
print ('For testing data the RMSE is ' + str(error))

For testing data the RMSE is 0.9027670293861173


In [71]:
# Load the complete dataset file
complete_ratings_file = os.path.join(datasets_path, 'ml-latest', 'ratings.csv')
complete_ratings_raw_data = sc.textFile(complete_ratings_file)
complete_ratings_raw_data_header = complete_ratings_raw_data.take(1)[0]

# Parse
complete_ratings_data = complete_ratings_raw_data.filter(lambda line: line!=complete_ratings_raw_data_header)\
    .map(lambda line: line.split(",")).map(lambda tokens: (int(tokens[0]),int(tokens[1]),float(tokens[2]))).cache()
    
print ("There are " + str(complete_ratings_data.count()) + "recommendations in the complete dataset")

There are 27753444recommendations in the complete dataset


In [75]:
training_RDD, test_RDD = complete_ratings_data.randomSplit([7, 3], seed=0)

complete_model = ALS.train(training_RDD, best_rank, seed=seed, 
                           iterations=best_iteration_value, lambda_=regularization_parameter)

In [77]:
test_for_predict_RDD = test_RDD.map(lambda x: (x[0], x[1]))

predictions = complete_model.predictAll(test_for_predict_RDD).map(lambda r: ((r[0], r[1]), r[2]))
rates_and_preds = test_RDD.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions)
error = math.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())
    
print ('For testing data the RMSE is ' + str (error))

For testing data the RMSE is 0.8334782407175128


In [83]:
complete_movies_file = os.path.join(datasets_path, 'ml-latest', 'movies.csv')
complete_movies_raw_data = sc.textFile(complete_movies_file)
complete_movies_raw_data_header = complete_movies_raw_data.take(1)[0]

# Parse
complete_movies_data = complete_movies_raw_data.filter(lambda line: line!=complete_movies_raw_data_header)\
    .map(lambda line: line.split(",")).map(lambda tokens: (int(tokens[0]),tokens[1],tokens[2])).cache()

complete_movies_titles = complete_movies_data.map(lambda x: (int(x[0]),x[1]))
    
print("There are " + str(complete_movies_titles.count()) + "movies in the complete dataset" )
     

There are 58098movies in the complete dataset


In [84]:
def get_counts_and_averages(ID_and_ratings_tuple):
    nratings = len(ID_and_ratings_tuple[1])
    return ID_and_ratings_tuple[0], (nratings, float(sum(x for x in ID_and_ratings_tuple[1]))/nratings)

movie_ID_with_ratings_RDD = (complete_ratings_data.map(lambda x: (x[1], x[2])).groupByKey())
movie_ID_with_avg_ratings_RDD = movie_ID_with_ratings_RDD.map(get_counts_and_averages)
movie_rating_counts_RDD = movie_ID_with_avg_ratings_RDD.map(lambda x: (x[0], x[1][0]))
     

In [86]:
new_user_ID = 0

# The format of each line is (userID, movieID, rating)
new_user_ratings = [
     (0,260,9), # Star Wars (1977)
     (0,1,8), # Toy Story (1995)
     (0,16,7), # Casino (1995)
     (0,25,8), # Leaving Las Vegas (1995)
     (0,32,9), # Twelve Monkeys (a.k.a. 12 Monkeys) (1995)
     (0,335,4), # Flintstones, The (1994)
     (0,379,3), # Timecop (1994)
     (0,296,7), # Pulp Fiction (1994)
     (0,858,10) , # Godfather, The (1972)
     (0,50,8) # Usual Suspects, The (1995)
    ]
new_user_ratings_RDD = sc.parallelize(new_user_ratings)
print ('New user ratings: ' + str(new_user_ratings_RDD.take(10)))

New user ratings: [(0, 260, 9), (0, 1, 8), (0, 16, 7), (0, 25, 8), (0, 32, 9), (0, 335, 4), (0, 379, 3), (0, 296, 7), (0, 858, 10), (0, 50, 8)]


In [87]:
complete_data_with_new_ratings_RDD = complete_ratings_data.union(new_user_ratings_RDD)


In [89]:
from time import time

t0 = time()
new_ratings_model = ALS.train(complete_data_with_new_ratings_RDD, best_rank, seed=seed, 
                              iterations=best_iteration_value, lambda_=regularization_parameter)
tt = time() - t0

print ("New model trained in " + str(round(tt,3)) + "seconds")

New model trained in 452.912seconds


In [90]:
new_user_ratings_ids = map(lambda x: x[1], new_user_ratings) # get just movie IDs
# keep just those not on the ID list (thanks Lei Li for spotting the error!)
new_user_unrated_movies_RDD = (complete_movies_data.filter(lambda x: x[0] not in new_user_ratings_ids).map(lambda x: (new_user_ID, x[0])))

# Use the input RDD, new_user_unrated_movies_RDD, with new_ratings_model.predictAll() to predict new ratings for the movies
new_user_recommendations_RDD = new_ratings_model.predictAll(new_user_unrated_movies_RDD)
     

In [91]:
# Transform new_user_recommendations_RDD into pairs of the form (Movie ID, Predicted Rating)
new_user_recommendations_rating_RDD = new_user_recommendations_RDD.map(lambda x: (x.product, x.rating))
new_user_recommendations_rating_title_and_count_RDD = \
    new_user_recommendations_rating_RDD.join(complete_movies_titles).join(movie_rating_counts_RDD)
new_user_recommendations_rating_title_and_count_RDD.take(3)
     

[(7020, ((7.10875580736019, 'Proof (1991)'), 377)),
 (53352, ((4.793284562650635, 'Sheitan (2006)'), 59)),
 (162396, ((3.184297373552882, 'Skiptrace (2016)'), 71))]

In [93]:
new_user_recommendations_rating_title_and_count_RDD = \
    new_user_recommendations_rating_title_and_count_RDD.map(lambda r: (r[1][0][1], r[1][0][0], r[1][1]))

In [94]:
top_movies = new_user_recommendations_rating_title_and_count_RDD.filter(lambda r: r[2]>=25).takeOrdered(25, key=lambda x: -x[1])

print ('TOP recommended movies (with more than 25 reviews):\n%s' %
        '\n'.join(map(str, top_movies)))

TOP recommended movies (with more than 25 reviews):
('Rabbit of Seville (1950)', 8.602833252855959, 30)
("Jim Henson's The Storyteller (1989)", 8.544291307524375, 36)
('"Godfather', 8.526288771636047, 60904)
('Cosmos', 8.524673058903474, 157)
('Death on the Staircase (Soupçons) (2004)', 8.521655972159827, 130)
('Music for One Apartment and Six Drummers (2001)', 8.50282131914495, 31)
('"Human Condition III', 8.495126990197761, 91)
('Harakiri (Seppuku) (1962)', 8.494578592460044, 679)
('"I', 8.424359849383443, 85)
('Planet Earth (2006)', 8.424323161050566, 1384)
('Seven Samurai (Shichinin no samurai) (1954)', 8.400267191214681, 14578)
('Planet Earth II (2016)', 8.398935346868512, 853)
('Elway To Marino (2013)', 8.391345064989846, 25)
('"Last Lions', 8.391119016112746, 38)
('Baseball (1994)', 8.390721256104534, 42)
('Dr. Strangelove or: How I Learned to Stop Worrying and Love the Bomb (1964)', 8.388162195683861, 29484)
('"Godfather: Part II', 8.37539843269385, 38875)
('The Godfather Trilo

In [95]:
my_movie = sc.parallelize([(0, 500)]) # Quiz Show (1994)
individual_movie_rating_RDD = new_ratings_model.predictAll(new_user_unrated_movies_RDD)
individual_movie_rating_RDD.take(1)

[Rating(user=0, product=116688, rating=2.070225501546683)]

In [96]:
from pyspark.mllib.recommendation import MatrixFactorizationModel

model_path = os.path.join('..', 'models', 'movie_lens_als')

# Save and load model
model.save(sc, model_path)
same_model = MatrixFactorizationModel.load(sc, model_path)