In [1]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.4.0.tar.gz (310.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m2.4 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.4.0-py2.py3-none-any.whl size=311317145 sha256=8b86d4ef8708593aa2e71dde0171464382f8ca9e7a57ed4023a9cd3009437b90
  Stored in directory: /root/.cache/pip/wheels/9f/34/a4/159aa12d0a510d5ff7c8f0220abbea42e5d81ecf588c4fd884
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.0


In [2]:
!wget http://files.grouplens.org/datasets/movielens/ml-latest.zip
!wget http://files.grouplens.org/datasets/movielens/ml-latest-small.zip

--2023-04-13 23:02:45--  http://files.grouplens.org/datasets/movielens/ml-latest.zip
Resolving files.grouplens.org (files.grouplens.org)... 128.101.65.152
Connecting to files.grouplens.org (files.grouplens.org)|128.101.65.152|:80... connected.
HTTP request sent, awaiting response... 200 OK
Length: 277113433 (264M) [application/zip]
Saving to: ‘ml-latest.zip’


2023-04-13 23:02:51 (43.2 MB/s) - ‘ml-latest.zip’ saved [277113433/277113433]

--2023-04-13 23:02:51--  http://files.grouplens.org/datasets/movielens/ml-latest-small.zip
Resolving files.grouplens.org (files.grouplens.org)... 128.101.65.152
Connecting to files.grouplens.org (files.grouplens.org)|128.101.65.152|:80... connected.
HTTP request sent, awaiting response... 200 OK
Length: 978202 (955K) [application/zip]
Saving to: ‘ml-latest-small.zip’


2023-04-13 23:02:52 (2.63 MB/s) - ‘ml-latest-small.zip’ saved [978202/978202]



In [3]:
import zipfile

with zipfile.ZipFile('ml-latest-small.zip', "r") as z:
    z.extractall()

with zipfile.ZipFile('ml-latest.zip', "r") as z:
    z.extractall()

In [4]:
import os
from pyspark import SparkContext as sc
from pyspark import SparkConf

small_ratings_file = os.path.join('ml-latest-small', 'ratings.csv')

conf = SparkConf().setMaster("local[*]").setAppName("Hello Pyspark").set("spark.driver.memory", "8g")
spark_context = sc(conf=conf).getOrCreate()

small_ratings_raw_data = sc.textFile(spark_context, small_ratings_file)
small_ratings_raw_data_header = small_ratings_raw_data.take(1)[0]

In [5]:
small_ratings_data = small_ratings_raw_data.filter(lambda line: line!=small_ratings_raw_data_header)\
    .map(lambda line: line.split(",")).map(lambda tokens: (tokens[0],tokens[1],tokens[2])).cache()

In [6]:
small_ratings_data.take(3)

[('1', '1', '4.0'), ('1', '3', '4.0'), ('1', '6', '4.0')]

In [7]:
small_movies_file = os.path.join('ml-latest-small', 'movies.csv')

small_movies_raw_data = sc.textFile(spark_context, small_movies_file)
small_movies_raw_data_header = small_movies_raw_data.take(1)[0]

small_movies_data = small_movies_raw_data.filter(lambda line: line!=small_movies_raw_data_header)\
    .map(lambda line: line.split(",")).map(lambda tokens: (tokens[0],tokens[1])).cache()
    
small_movies_data.take(3)

[('1', 'Toy Story (1995)'),
 ('2', 'Jumanji (1995)'),
 ('3', 'Grumpier Old Men (1995)')]

In [8]:
training_RDD, validation_RDD, test_RDD = small_ratings_data.randomSplit([6, 2, 2], seed=0)
validation_for_predict_RDD = validation_RDD.map(lambda x: (x[0], x[1]))
test_for_predict_RDD = test_RDD.map(lambda x: (x[0], x[1]))

In [9]:
from pyspark.mllib.recommendation import ALS
import math

seed = 5
iterations = 10
regularization_parameter = 0.1
ranks = [5, 10, 20]
errors = [0, 0, 0]
err = 0
tolerance = 0.02

min_error = float('inf')
best_rank = -1
best_iteration = -1
for rank in ranks:
    model = ALS.train(training_RDD, rank, seed=seed, iterations=iterations,
                      lambda_=regularization_parameter)
    predictions = model.predictAll(validation_for_predict_RDD).map(lambda r: ((r[0], r[1]), r[2]))
    rates_and_preds = validation_RDD.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions)
    error = math.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())
    errors[err] = error
    err += 1
    print('For rank ' + str(rank) + ' the RMSE is ' + str(error))
    if error < min_error:
        min_error = error
        best_rank = rank

print('The best model was trained with rank ' + str(best_rank))

For rank 4 the RMSE is 0.9121002116178583
For rank 8 the RMSE is 0.9184327219898198
For rank 12 the RMSE is 0.916015153748882
The best model was trained with rank 4


In [10]:
predictions.take(3)

[((372, 1084), 3.6067833516058667),
 ((4, 1084), 3.653744366837769),
 ((402, 1084), 3.5469004708498133)]

In [11]:
rates_and_preds.take(3)

[((1, 457), (5.0, 4.478201161810268)),
 ((1, 1025), (5.0, 4.508338602938871)),
 ((1, 1089), (5.0, 4.927108355489968))]

In [12]:
model = ALS.train(training_RDD, best_rank, seed=seed, iterations=iterations,
                      lambda_=regularization_parameter)
predictions = model.predictAll(test_for_predict_RDD).map(lambda r: ((r[0], r[1]), r[2]))
rates_and_preds = test_RDD.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions)
error = math.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())
    
print('For testing data the RMSE is ' + str(error))

For testing data the RMSE is 0.9121326053443494


In [13]:
# Load the complete dataset file
complete_ratings_file = os.path.join('ml-latest', 'ratings.csv')
complete_ratings_raw_data = sc.textFile(spark_context, complete_ratings_file)
complete_ratings_raw_data_header = complete_ratings_raw_data.take(1)[0]

# Parse
complete_ratings_data = complete_ratings_raw_data.filter(lambda line: line!=complete_ratings_raw_data_header)\
    .map(lambda line: line.split(",")).map(lambda tokens: (int(tokens[0]),int(tokens[1]),float(tokens[2]))).cache()
    
print("There are " + str(complete_ratings_data.count()) + " recommendations in the complete dataset")

There are 27753444 recommendations in the complete dataset


In [14]:
training_RDD, test_RDD = complete_ratings_data.randomSplit([7, 3], seed=0)

complete_model = ALS.train(training_RDD, best_rank, seed=seed, 
                           iterations=iterations, lambda_=regularization_parameter)

In [15]:
test_for_predict_RDD = test_RDD.map(lambda x: (x[0], x[1]))

predictions = complete_model.predictAll(test_for_predict_RDD).map(lambda r: ((r[0], r[1]), r[2]))
rates_and_preds = test_RDD.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions)
error = math.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())
    
print('For testing data the RMSE is ' + str(error))

For testing data the RMSE is 0.8334782409945156


In [16]:
complete_movies_file = os.path.join('ml-latest', 'movies.csv')
complete_movies_raw_data = sc.textFile(spark_context, complete_movies_file)
complete_movies_raw_data_header = complete_movies_raw_data.take(1)[0]

# Parse
complete_movies_data = complete_movies_raw_data.filter(lambda line: line!=complete_movies_raw_data_header)\
    .map(lambda line: line.split(",")).map(lambda tokens: (int(tokens[0]),tokens[1],tokens[2])).cache()

complete_movies_titles = complete_movies_data.map(lambda x: (int(x[0]),x[1]))
    
print ("There are " + str(complete_movies_titles.count()) + " movies in the complete dataset")

There are 58098 movies in the complete dataset


In [17]:
def get_counts_and_averages(ID_and_ratings_tuple):
    nratings = len(ID_and_ratings_tuple[1])
    return ID_and_ratings_tuple[0], (nratings, float(sum(x for x in ID_and_ratings_tuple[1]))/nratings)

movie_ID_with_ratings_RDD = (complete_ratings_data.map(lambda x: (x[1], x[2])).groupByKey())
movie_ID_with_avg_ratings_RDD = movie_ID_with_ratings_RDD.map(get_counts_and_averages)
movie_rating_counts_RDD = movie_ID_with_avg_ratings_RDD.map(lambda x: (x[0], x[1][0]))

In [19]:
new_user_ID = 0

# The format of each line is (userID, movieID, rating)
new_user_ratings = [
     (0,260,9), # Star Wars (1977)
     (0,1,8), # Toy Story (1995)
     (0,16,7), # Casino (1995)
     (0,25,8), # Leaving Las Vegas (1995)
     (0,32,9), # Twelve Monkeys (a.k.a. 12 Monkeys) (1995)
     (0,335,4), # Flintstones, The (1994)
     (0,379,3), # Timecop (1994)
     (0,296,7), # Pulp Fiction (1994)
     (0,858,10) , # Godfather, The (1972)
     (0,50,8) # Usual Suspects, The (1995)
    ]
new_user_ratings_RDD = sc.parallelize(spark_context, new_user_ratings)
print('New user ratings: ' + str(new_user_ratings_RDD.take(10)))

New user ratings: [(0, 260, 9), (0, 1, 8), (0, 16, 7), (0, 25, 8), (0, 32, 9), (0, 335, 4), (0, 379, 3), (0, 296, 7), (0, 858, 10), (0, 50, 8)]


In [20]:
complete_data_with_new_ratings_RDD = complete_ratings_data.union(new_user_ratings_RDD)

In [21]:
from time import time

t0 = time()
new_ratings_model = ALS.train(complete_data_with_new_ratings_RDD, best_rank, seed=seed, 
                              iterations=iterations, lambda_=regularization_parameter)
tt = time() - t0

print("New model trained in "+ str(round(tt,3)) +" seconds") 

New model trained in 286.349 seconds


In [22]:
new_user_ratings_ids = map(lambda x: x[1], new_user_ratings) # get just movie IDs
# keep just those not on the ID list (thanks Lei Li for spotting the error!)
new_user_unrated_movies_RDD = (complete_movies_data.filter(lambda x: x[0] not in new_user_ratings_ids).map(lambda x: (new_user_ID, x[0])))

# Use the input RDD, new_user_unrated_movies_RDD, with new_ratings_model.predictAll() to predict new ratings for the movies
new_user_recommendations_RDD = new_ratings_model.predictAll(new_user_unrated_movies_RDD)

In [23]:
# Transform new_user_recommendations_RDD into pairs of the form (Movie ID, Predicted Rating)
new_user_recommendations_rating_RDD = new_user_recommendations_RDD.map(lambda x: (x.product, x.rating))
new_user_recommendations_rating_title_and_count_RDD = \
    new_user_recommendations_rating_RDD.join(complete_movies_titles).join(movie_rating_counts_RDD)
new_user_recommendations_rating_title_and_count_RDD.take(3)

[(6216,
  ((7.17338479587357, 'Nowhere in Africa (Nirgendwo in Afrika) (2001)'), 717)),
 (124320, ((7.359569804065565, 'Once a Thief (1965)'), 1)),
 (83916, ((6.125867196214774, 'Blues in the Night (1941)'), 9))]

In [24]:
new_user_recommendations_rating_title_and_count_RDD = \
    new_user_recommendations_rating_title_and_count_RDD.map(lambda r: (r[1][0][1], r[1][0][0], r[1][1]))

In [25]:
top_movies = new_user_recommendations_rating_title_and_count_RDD.filter(lambda r: r[2]>=25).takeOrdered(25, key=lambda x: -x[1])

print ('TOP recommended movies (with more than 25 reviews):\n%s' %
        '\n'.join(map(str, top_movies)))

TOP recommended movies (with more than 25 reviews):
('Rabbit of Seville (1950)', 8.602832993238621, 30)
("Jim Henson's The Storyteller (1989)", 8.544290861325912, 36)
('"Godfather', 8.526288637899562, 60904)
('Cosmos', 8.524673156099944, 157)
('Death on the Staircase (Soupçons) (2004)', 8.521655525905146, 130)
('Music for One Apartment and Six Drummers (2001)', 8.50282052560165, 31)
('"Human Condition III', 8.495126555951444, 91)
('Harakiri (Seppuku) (1962)', 8.494577936581912, 679)
('"I', 8.42435972188288, 85)
('Planet Earth (2006)', 8.424323239360128, 1384)
('Seven Samurai (Shichinin no samurai) (1954)', 8.400266574643759, 14578)
('Planet Earth II (2016)', 8.398934986500649, 853)
('Elway To Marino (2013)', 8.391344849131297, 25)
('"Last Lions', 8.391118675662526, 38)
('Baseball (1994)', 8.390720769664654, 42)
('Dr. Strangelove or: How I Learned to Stop Worrying and Love the Bomb (1964)', 8.388161933961854, 29484)
('"Godfather: Part II', 8.375398025723136, 38875)
('The Godfather Trilo

In [27]:
my_movie = sc.parallelize(spark_context, [(0, 500)]) # Quiz Show (1994)
individual_movie_rating_RDD = new_ratings_model.predictAll(new_user_unrated_movies_RDD)
individual_movie_rating_RDD.take(1)

[Rating(user=0, product=116688, rating=2.0702254002989218)]

In [32]:
from pyspark.mllib.recommendation import MatrixFactorizationModel

model_path = os.path.join('models', 'movie_lens_als')

# Save and load model
model.save(spark_context, model_path)
same_model = MatrixFactorizationModel.load(spark_context, model_path)