## Pumpkinmeter: # A Movie Recommendation Service

### source: https://www.codementor.io/@jadianes/building-a-recommender-with-apache-spark-python-example-app-part1-du1083qbw

#### Create a SparkContext configured for local mode


In [1]:
import pyspark
sc = pyspark.SparkContext('local[*]')

In [2]:
complete_dataset_url = 'http://files.grouplens.org/datasets/movielens/ml-latest.zip'
small_dataset_url = 'http://files.grouplens.org/datasets/movielens/ml-latest-small.zip'


#### Download location(s)

In [3]:
import os

datasets_path = os.path.join('..', 'datasets')

complete_dataset_path = os.path.join(datasets_path, 'ml-latest.zip')
small_dataset_path = os.path.join(datasets_path, 'ml-latest-small.zip')


#### Downloading dataset

In [4]:
import urllib.request

small_f = urllib.request.urlretrieve (small_dataset_url, small_dataset_path)
complete_f = urllib.request.urlretrieve (complete_dataset_url, complete_dataset_path)

#### Extracting file(s)


In [5]:
import zipfile

with zipfile.ZipFile(small_dataset_path, "r") as z:
    z.extractall(datasets_path)
    
with zipfile.ZipFile(complete_dataset_path, "r") as z:
    z.extractall(datasets_path)

## Using small dataset to find the parameters 

In [6]:
small_ratings_file = os.path.join(datasets_path, 'ml-latest-small', 'ratings.csv')

small_ratings_raw_data = sc.textFile(small_ratings_file)
small_ratings_raw_data_header = small_ratings_raw_data.take(1)[0]

In [7]:
small_ratings_data = small_ratings_raw_data.filter(lambda line: line!=small_ratings_raw_data_header)\
    .map(lambda line: line.split(",")).map(lambda tokens: (tokens[0],tokens[1],tokens[2])).cache()

In [8]:
small_ratings_data.take(3)

[('1', '1', '4.0'), ('1', '3', '4.0'), ('1', '6', '4.0')]

In [9]:
small_movies_file = os.path.join(datasets_path, 'ml-latest-small', 'movies.csv')

small_movies_raw_data = sc.textFile(small_movies_file)
small_movies_raw_data_header = small_movies_raw_data.take(1)[0]

small_movies_data = small_movies_raw_data.filter(lambda line: line!=small_movies_raw_data_header)\
    .map(lambda line: line.split(",")).map(lambda tokens: (tokens[0],tokens[1])).cache()
    
small_movies_data.take(3)

[('1', 'Toy Story (1995)'),
 ('2', 'Jumanji (1995)'),
 ('3', 'Grumpier Old Men (1995)')]

In [10]:
training_RDD, validation_RDD, test_RDD = small_ratings_data.randomSplit([6, 2, 2], seed=0)
validation_for_predict_RDD = validation_RDD.map(lambda x: (x[0], x[1]))
test_for_predict_RDD = test_RDD.map(lambda x: (x[0], x[1]))

In [11]:
from pyspark.mllib.recommendation import ALS
import math

seed = 5
iterations = 10
regularization_parameter = 0.1
ranks = [4, 8, 12]
errors = [0, 0, 0]
err = 0
tolerance = 0.02

min_error = float('inf')
best_rank = -1
best_iteration = -1
for rank in ranks:
    model = ALS.train(training_RDD, rank, seed=seed, iterations=iterations,
                      lambda_=regularization_parameter)
    predictions = model.predictAll(validation_for_predict_RDD).map(lambda r: ((r[0], r[1]), r[2]))
    rates_and_preds = validation_RDD.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions)
    error = math.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())
    errors[err] = error
    err += 1
    print ('For rank {} the RMSE is {}'.format(rank, error) )
    if error < min_error:
        min_error = error
        best_rank = rank

print ('The best model was trained with rank {}'.format(best_rank))

For rank 4 the RMSE is 0.908078105265682
For rank 8 the RMSE is 0.916462973348527
For rank 12 the RMSE is 0.917665030756129
The best model was trained with rank 4


In [12]:
model = ALS.train(training_RDD, best_rank, seed=seed, iterations=iterations,
                      lambda_=regularization_parameter)
predictions = model.predictAll(test_for_predict_RDD).map(lambda r: ((r[0], r[1]), r[2]))
rates_and_preds = test_RDD.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions)
error = math.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())
    
print('For testing data the RMSE is ',error )

For testing data the RMSE is  0.9113780946334407


### Load complete ratings

In [13]:
complete_ratings_file = os.path.join(datasets_path, 'ml-latest', 'ratings.csv')
complete_ratings_raw_data = sc.textFile(complete_ratings_file)
complete_ratings_raw_data_header = complete_ratings_raw_data.take(1)[0]
# Parse
complete_ratings_data = complete_ratings_raw_data.filter(lambda line: line!=complete_ratings_raw_data_header)\
    .map(lambda line: line.split(",")).map(lambda tokens: (int(tokens[0]),int(tokens[1]),float(tokens[2]))).cache()

print ('There are {} recommendations in the complete dataset'.format(complete_ratings_data.count()))
complete_ratings_data.take(3)

There are 33832162 recommendations in the complete dataset


[(1, 1, 4.0), (1, 110, 4.0), (1, 158, 4.0)]

### Training the ratings model

In [14]:

training_RDD, test_RDD = complete_ratings_data.randomSplit([7, 3], seed=0)

complete_model = ALS.train(training_RDD, best_rank, seed=seed, 
                           iterations=iterations, lambda_=regularization_parameter)

In [15]:
test_for_predict_RDD = test_RDD.map(lambda x: (x[0], x[1]))

predictions = complete_model.predictAll(test_for_predict_RDD).map(lambda r: ((r[0], r[1]), r[2]))
rates_and_preds = test_RDD.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions)
error = math.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())
    
print ('For testing data the RMSE is ', (error))

For testing data the RMSE is  0.8257054095972955


### Load movies name

In [16]:
complete_movies_file = os.path.join(datasets_path, 'ml-latest', 'movies.csv')
complete_movies_raw_data = sc.textFile(complete_movies_file)
complete_movies_raw_data_header = complete_movies_raw_data.take(1)[0]

# Parse
complete_movies_data = complete_movies_raw_data.filter(lambda line: line!=complete_movies_raw_data_header)\
    .map(lambda line: line.split(",")).map(lambda tokens: (int(tokens[0]),tokens[1],tokens[2])).cache()

complete_movies_titles = complete_movies_data.map(lambda x: (int(x[0]),x[1]))
    
print ("There are {} movies in the complete dataset".format(complete_movies_titles.count()))

There are 86537 movies in the complete dataset


In [17]:
def get_counts_and_averages(ID_and_ratings_tuple):
    nratings = len(ID_and_ratings_tuple[1])
    return ID_and_ratings_tuple[0], (nratings, float(sum(x for x in ID_and_ratings_tuple[1]))/nratings)

movie_ID_with_ratings_RDD = (complete_ratings_data.map(lambda x: (x[1], x[2])).groupByKey())
movie_ID_with_avg_ratings_RDD = movie_ID_with_ratings_RDD.map(get_counts_and_averages)
movie_rating_counts_RDD = movie_ID_with_avg_ratings_RDD.map(lambda x: (x[0], x[1][0]))

### User 1 and their recommendations

In [19]:
new_user_ID = 0

# The format of each line is (userID, movieID, rating)
new_user_ratings = [
    (0,147542,4), # Terminal (1996)
    (0,207313,1.5), # Knives Out (2019)
    (0,220104,1.5), # Drive (2019)
    (0,78039,1.5), # Blue Valentine (2010)
    (0,45720,4), # Devil Wears Prada, The (2006)
    (0,109487,4.5), # Interstellar (2014)
    (0,134130,4.5), # The Martian (2015)
    (0,116797,4.5), # The Imitation Game (2014)
    (0,106002,3.5), # Ender's Game (2013)
    (0,192283,4), # Crazy Rich Asians (2018)
    ]
new_user_ratings_RDD = sc.parallelize(new_user_ratings)
print ('New user ratings: ', new_user_ratings_RDD.take(10))
complete_data_with_new_ratings_RDD = complete_ratings_data.union(new_user_ratings_RDD)

from time import time

t0 = time()
new_ratings_model = ALS.train(complete_data_with_new_ratings_RDD, best_rank, seed=seed, 
                              iterations=iterations, lambda_=regularization_parameter)
tt = time() - t0

print ("New model trained in {} seconds".format(round(tt,3)) )


New user ratings:  [(0, 147542, 4), (0, 207313, 1.5), (0, 220104, 1.5), (0, 78039, 1.5), (0, 45720, 4), (0, 109487, 4.5), (0, 134130, 4.5), (0, 116797, 4.5), (0, 106002, 3.5), (0, 192283, 4)]
New model trained in 213.528 seconds


In [20]:
new_user_ratings_ids = map(lambda x: x[1], new_user_ratings) # get just movie IDs
# keep just those not on the ID list (thanks Lei Li for spotting the error!)
new_user_unrated_movies_RDD = (complete_movies_data.filter(lambda x: x[0] not in new_user_ratings_ids).map(lambda x: (new_user_ID, x[0])))

# Use the input RDD, new_user_unrated_movies_RDD, with new_ratings_model.predictAll() to predict new ratings for the movies
new_user_recommendations_RDD = new_ratings_model.predictAll(new_user_unrated_movies_RDD)

# Transform new_user_recommendations_RDD into pairs of the form (Movie ID, Predicted Rating)
new_user_recommendations_rating_RDD = new_user_recommendations_RDD.map(lambda x: (x.product, x.rating))
new_user_recommendations_rating_title_and_count_RDD = \
    new_user_recommendations_rating_RDD.join(complete_movies_titles).join(movie_rating_counts_RDD)
new_user_recommendations_rating_title_and_count_RDD.take(3)
new_user_recommendations_rating_title_and_count_RDD = \
    new_user_recommendations_rating_title_and_count_RDD.map(lambda r: (r[1][0][1], r[1][0][0], r[1][1]))
top_movies = new_user_recommendations_rating_title_and_count_RDD.filter(lambda r: r[2]>=25).takeOrdered(15, key=lambda x: -x[1])

print('TOP 15 recommended movies (with more than 25 reviews):\n{}'.format('\n'.join(map(str, top_movies))))

top_movies = new_user_recommendations_rating_title_and_count_RDD.filter(lambda r: r[2]>=100).takeOrdered(15, key=lambda x: -x[1])

print('\nTOP 15 recommended movies (with more than 100 reviews):\n{}'.format('\n'.join(map(str, top_movies))))

TOP 15 recommended movies (with more than 25 reviews):
('Mower Minions (2016)', 4.4931411311453235, 42)
('Flywheel (2003)', 4.428797085077275, 31)
('Fireproof (2008)', 4.390744448603254, 264)
('One Piece Film: GOLD (2016)', 4.369903352972412, 42)
('The Bible (2013)', 4.369476398401672, 31)
('Scusa ma ti chiamo amore (2008)', 4.328424174781761, 39)
('Minions: Orientation Day (2010)', 4.325486897298378, 37)
('One Piece: Stampede (2019)', 4.274530072848607, 34)
('Detective Conan: The Last Wizard of the Century (1999)', 4.264817454298781, 34)
('What a Beautiful Day (2011)', 4.2519894353765135, 31)
('Facing the Giants (2006)', 4.232164443918677, 204)
('Pollyanna (2003)', 4.225019597916236, 33)
('Naruto Shippuden the Movie: Blood Prison (2011)', 4.22116338807817, 38)
('Minions: Banana (2010)', 4.2173592576965735, 40)
('Shaadi Mein Zaroor Aana (2017)', 4.212495911939958, 26)

TOP 15 recommended movies (with more than 100 reviews):
('Fireproof (2008)', 4.390744448603254, 264)
('Facing the Gian

### User 2 and their recommendations

In [21]:
new_user_ID = 0

# The format of each line is (userID, movieID, rating)
new_user_ratings = [
    (0,103228,4), # Pacific Rim (2013)
    (0,115149,4.5), # John Wick (2014)
    (0,7454,3.5), # Van Helsing (2004)
    (0,84152,4.5), # Limitless (2011)
    (0,122916,4.5), # Thor: Ragnarok (2017)
    (0,201773,4), # Spider-Man: Far from Home (2019)
    (0,274053,5), # Top Gun: Maverick (2022)
    (0,177765,5), # Coco (2017)
    (0,72378,4), # 	2012 (2009)
    (0,59315,4.5), # Iron Man (2008)
    ]
new_user_ratings_RDD = sc.parallelize(new_user_ratings)
print ('New user ratings: ', new_user_ratings_RDD.take(10))
complete_data_with_new_ratings_RDD = complete_ratings_data.union(new_user_ratings_RDD)


t0 = time()
new_ratings_model = ALS.train(complete_data_with_new_ratings_RDD, best_rank, seed=seed, 
                              iterations=iterations, lambda_=regularization_parameter)
tt = time() - t0

print ("New model trained in {} seconds".format(round(tt,3)) )

new_user_ratings_ids = map(lambda x: x[1], new_user_ratings) # get just movie IDs
# keep just those not on the ID list (thanks Lei Li for spotting the error!)
new_user_unrated_movies_RDD = (complete_movies_data.filter(lambda x: x[0] not in new_user_ratings_ids).map(lambda x: (new_user_ID, x[0])))

# Use the input RDD, new_user_unrated_movies_RDD, with new_ratings_model.predictAll() to predict new ratings for the movies
new_user_recommendations_RDD = new_ratings_model.predictAll(new_user_unrated_movies_RDD)

# Transform new_user_recommendations_RDD into pairs of the form (Movie ID, Predicted Rating)
new_user_recommendations_rating_RDD = new_user_recommendations_RDD.map(lambda x: (x.product, x.rating))
new_user_recommendations_rating_title_and_count_RDD = \
    new_user_recommendations_rating_RDD.join(complete_movies_titles).join(movie_rating_counts_RDD)
new_user_recommendations_rating_title_and_count_RDD.take(3)
new_user_recommendations_rating_title_and_count_RDD = \
    new_user_recommendations_rating_title_and_count_RDD.map(lambda r: (r[1][0][1], r[1][0][0], r[1][1]))
top_movies = new_user_recommendations_rating_title_and_count_RDD.filter(lambda r: r[2]>=25).takeOrdered(15, key=lambda x: -x[1])

print('TOP 15 recommended movies (with more than 25 reviews):\n{}'.format('\n'.join(map(str, top_movies))))

top_movies = new_user_recommendations_rating_title_and_count_RDD.filter(lambda r: r[2]>=100).takeOrdered(15, key=lambda x: -x[1])

print('\nTOP 15 recommended movies (with more than 100 reviews):\n{}'.format('\n'.join(map(str, top_movies))))

New user ratings:  [(0, 103228, 4), (0, 115149, 4.5), (0, 7454, 3.5), (0, 84152, 4.5), (0, 122916, 4.5), (0, 201773, 4), (0, 274053, 5), (0, 177765, 5), (0, 72378, 4), (0, 59315, 4.5)]
New model trained in 170.886 seconds
TOP 15 recommended movies (with more than 25 reviews):
('Band of Brothers (2001)', 5.068602905238924, 2835)
('Planet Earth II (2016)', 5.020846141996181, 2041)
('Planet Earth (2006)', 5.015570009897525, 3015)
('"Shawshank Redemption', 5.004293973400436, 122296)
('Cosmos: A Spacetime Odissey', 4.9976465599722655, 599)
('His Last Vow', 4.941830866446576, 41)
('Three Men and a Leg (1997)', 4.934922527691409, 29)
('Firefly (2002)', 4.894696099539658, 895)
('Attack On Titan (2013)', 4.8937120159325325, 263)
('Violet Evergarden: The Movie (2020)', 4.88186981898267, 25)
('Spider-Man: Across the Spider-Verse (2023)', 4.88018285501334, 528)
('Twelve Angry Men (1954)', 4.879578967336652, 332)
('Blue Planet II (2017)', 4.879547543127085, 1267)
('Hornblower: The Even Chance (1998

## Persisting the model


In [22]:
from pyspark.mllib.recommendation import MatrixFactorizationModel
import shutil
import os

model_path = os.path.join('..', 'models', 'movie_lens_als')


# Delete if exists otherwise save command will fail
if os.path.exists(model_path):
    shutil.rmtree(model_path)
    
# Save and load model
new_ratings_model.save(sc, model_path)
new_ratings_model = MatrixFactorizationModel.load(sc, model_path)

## Persisting the ratings

In [23]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
complete_data_with_new_ratings_RDD_df = spark.createDataFrame(complete_data_with_new_ratings_RDD, ["userId", "movieId", "rating"])  # rename columns as needed

complete_data_with_new_ratings_RDD_df.write.mode("overwrite").parquet("ratings_data.parquet")

print("Rating data saved in parquet format")


Rating data saved in parquet format
