# Building a Movie Recommendation Service with Apache Spark & Flask

# A Movie Recommendation Service

### Source: https://www.codementor.io/@jadianes/building-a-recommender-with-apache-spark-python-example-app-part1-du1083qbw

### Create a SparkContext configured for local mode

In [2]:
import pyspark
sc = pyspark.SparkContext('local[*]')

### File download
Small: 100,000 ratings and 3,600 tag applications applied to 9,000 movies by 600 users.  
Full: 27,000,000 ratings and 1,100,000 tag applications applied to 58,000 movies by 280,000 users.

In [3]:
complete_dataset_url = 'http://files.grouplens.org/datasets/movielens/ml-latest.zip'
small_dataset_url = 'http://files.grouplens.org/datasets/movielens/ml-latest-small.zip'

### Download locations

In [4]:
import os

datasets_path = os.path.join('..', 'datasets')

complete_dataset_path = os.path.join(datasets_path, 'ml-latest.zip')
small_dataset_path = os.path.join(datasets_path, 'ml-latest-small.zip')

### Getting Files

In [5]:
import os
import urllib.request

os.makedirs('../datasets', exist_ok=True)  

small_dataset_url = 'http://files.grouplens.org/datasets/movielens/ml-latest-small.zip'  
small_dataset_path = 'ml-latest-small.zip'

small_f = urllib.request.urlretrieve(small_dataset_url, small_dataset_path)
complete_f = urllib.request.urlretrieve(complete_dataset_url, complete_dataset_path)

### Extracting Files

In [6]:
import zipfile

with zipfile.ZipFile(small_dataset_path, "r") as z:
    z.extractall(datasets_path)

with zipfile.ZipFile(complete_dataset_path, "r") as z:
    z.extractall(datasets_path)

### Loading and parsing datasets

In [7]:
small_ratings_file = os.path.join(datasets_path, 'ml-latest-small', 'ratings.csv')

small_ratings_raw_data = sc.textFile(small_ratings_file)
small_ratings_raw_data_header = small_ratings_raw_data.take(1)[0]

In [8]:
small_ratings_data = small_ratings_raw_data.filter(lambda line: line!=small_ratings_raw_data_header)\
    .map(lambda line: line.split(",")).map(lambda tokens: (tokens[0],tokens[1],tokens[2])).cache()

In [9]:
small_ratings_data.take(3)

[('1', '1', '4.0'), ('1', '3', '4.0'), ('1', '6', '4.0')]

In [10]:
[(u'1', u'6', u'2.0'), (u'1', u'22', u'3.0'), (u'1', u'32', u'2.0')]

[('1', '6', '2.0'), ('1', '22', '3.0'), ('1', '32', '2.0')]

In [11]:
small_movies_file = os.path.join(datasets_path, 'ml-latest-small', 'movies.csv')

small_movies_raw_data = sc.textFile(small_movies_file)
small_movies_raw_data_header = small_movies_raw_data.take(1)[0]

small_movies_data = small_movies_raw_data.filter(lambda line: line!=small_movies_raw_data_header)\
    .map(lambda line: line.split(",")).map(lambda tokens: (tokens[0],tokens[1])).cache()
    
small_movies_data.take(3)

[('1', 'Toy Story (1995)'),
 ('2', 'Jumanji (1995)'),
 ('3', 'Grumpier Old Men (1995)')]

In [12]:
[(u'1', u'Toy Story (1995)'),
 (u'2', u'Jumanji (1995)'),
 (u'3', u'Grumpier Old Men (1995)')]

[('1', 'Toy Story (1995)'),
 ('2', 'Jumanji (1995)'),
 ('3', 'Grumpier Old Men (1995)')]

### Collaborative Filtering

### Selecting ALS parameters using the small dataset
In order to determine the best ALS parameters, we will use the small dataset. We need first to split it into train, validation, and test datasets.

In [13]:
training_RDD, validation_RDD, test_RDD = small_ratings_data.randomSplit([6, 2, 2], seed=0)
validation_for_predict_RDD = validation_RDD.map(lambda x: (x[0], x[1]))
test_for_predict_RDD = test_RDD.map(lambda x: (x[0], x[1]))

In [14]:
from pyspark.mllib.recommendation import ALS
import math

seed = 0
iterations = 10
regularization_parameter = 0.1
ranks = [4, 8, 12]
errors = [0, 0, 0]
err = 0
tolerance = 0.02

min_error = float('inf')
best_rank = -1
best_iteration = -1
for rank in ranks:
    model = ALS.train(training_RDD, rank, seed=seed, iterations=iterations,
                      lambda_=regularization_parameter)
    predictions = model.predictAll(validation_for_predict_RDD).map(lambda r: ((r[0], r[1]), r[2]))
    rates_and_preds = validation_RDD.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions)
    error = math.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())
    errors[err] = error
    err += 1
    print ('For rank {} the RMSE is {}'.format(rank, error))
    if error < min_error:
        min_error = error
        best_rank = rank

print ('The best model was trained with rank {}'.format(best_rank))

For rank 4 the RMSE is 0.9183862590462684
For rank 8 the RMSE is 0.9213494912641171
For rank 12 the RMSE is 0.9200915163590333
The best model was trained with rank 4


In [15]:
predictions.take(3)

[((372, 1084), 3.5593894578412266),
 ((4, 1084), 3.868934864944413),
 ((402, 1084), 3.494127326393066)]

In [16]:
rates_and_preds.take(3)

[((1, 457), (5.0, 4.741557296710193)),
 ((1, 1025), (5.0, 4.594651219750348)),
 ((1, 1089), (5.0, 4.912674081228232))]

In [17]:
model = ALS.train(training_RDD, best_rank, seed=seed, iterations=iterations,
                      lambda_=regularization_parameter)
predictions = model.predictAll(test_for_predict_RDD).map(lambda r: ((r[0], r[1]), r[2]))
rates_and_preds = test_RDD.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions)
error = math.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())
    
print ('For testing data the RMSE is {}'.format(error))

For testing data the RMSE is 0.9152086502370205


### Using the complete dataset to build the final model

In [18]:
# Load the complete dataset file
complete_ratings_file = os.path.join(datasets_path, 'ml-latest', 'ratings.csv')
complete_ratings_raw_data = sc.textFile(complete_ratings_file)
complete_ratings_raw_data_header = complete_ratings_raw_data.take(1)[0]

# Parse
complete_ratings_data = complete_ratings_raw_data.filter(lambda line: line!=complete_ratings_raw_data_header)\
    .map(lambda line: line.split(",")).map(lambda tokens: (int(tokens[0]),int(tokens[1]),float(tokens[2]))).cache()
    
print("There are %s recommendations in the complete dataset" % (complete_ratings_data.count()))

There are 33832162 recommendations in the complete dataset


In [19]:
complete_ratings_data.take(3)

[(1, 1, 4.0), (1, 110, 4.0), (1, 158, 4.0)]

In [20]:
training_RDD, test_RDD = complete_ratings_data.randomSplit([7, 3], seed=0)

complete_model = ALS.train(training_RDD, best_rank, seed=seed, 
                           iterations=iterations, lambda_=regularization_parameter)

In [21]:
test_for_predict_RDD = test_RDD.map(lambda x: (x[0], x[1]))

predictions = complete_model.predictAll(test_for_predict_RDD).map(lambda r: ((r[0], r[1]), r[2]))
rates_and_preds = test_RDD.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions)
error = math.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())
    
print('For testing data the RMSE is %s' % (error))

For testing data the RMSE is 0.8266775749029673


### How to make recommendations

In [22]:
complete_movies_file = os.path.join(datasets_path, 'ml-latest', 'movies.csv')
complete_movies_raw_data = sc.textFile(complete_movies_file)
complete_movies_raw_data_header = complete_movies_raw_data.take(1)[0]

# Parse
complete_movies_data = complete_movies_raw_data.filter(lambda line: line!=complete_movies_raw_data_header)\
    .map(lambda line: line.split(",")).map(lambda tokens: (int(tokens[0]),tokens[1],tokens[2])).cache()

complete_movies_titles = complete_movies_data.map(lambda x: (int(x[0]),x[1]))
    
print('There are %s movies in the complete dataset' % (complete_movies_titles.count()))

There are 86537 movies in the complete dataset


In [23]:
def get_counts_and_averages(ID_and_ratings_tuple):
    nratings = len(ID_and_ratings_tuple[1])
    return ID_and_ratings_tuple[0], (nratings, float(sum(x for x in ID_and_ratings_tuple[1]))/nratings)

movie_ID_with_ratings_RDD = (complete_ratings_data.map(lambda x: (x[1], x[2])).groupByKey())
movie_ID_with_avg_ratings_RDD = movie_ID_with_ratings_RDD.map(get_counts_and_averages)
movie_rating_counts_RDD = movie_ID_with_avg_ratings_RDD.map(lambda x: (x[0], x[1][0]))

### Adding new user ratings

In [24]:
new_user_ID = 0

#S1
# The format of each line is (userID, movieID, rating)
# new_user_ratings = [
#      (0,260,4), # Star Wars (1977)
#      (0,1,3), # Toy Story (1995)
#      (0,16,3), # Casino (1995)
#      (0,25,4), # Leaving Las Vegas (1995)
#      (0,32,4), # Twelve Monkeys (a.k.a. 12 Monkeys) (1995)
#      (0,335,1), # Flintstones, The (1994)
#      (0,379,1), # Timecop (1994)
#      (0,296,3), # Pulp Fiction (1994)
#      (0,858,5) , # Godfather, The (1972)
#      (0,50,4) # Usual Suspects, The (1995)
#     ]
# new_user_ratings_RDD = sc.parallelize(new_user_ratings)
# print ('New user ratings: %s' % new_user_ratings_RDD.take(10))


#S2
new_user_ratings = [
     (0,13,2), # Balto (1995)
     (0,21,1), # Get Shorty (1995)
     (0,24,3), # Powder (1995)
     (0,32,2), # Twelve Monkeys (a.k.a. 12 Monkeys) (1995)
     (0,36,2), # Dead Man Walking (1995)
     (0,49,3), # When Night Is Falling (1995)
     (0,42,2), # Dead Presidents (1995)
     (0,28,3), # Persuasion (1995)
     (0,96,3), # In the Bleak Midwinter (1995)
     (0,88,1) # Black Sheep (1996)
    ]
new_user_ratings_RDD = sc.parallelize(new_user_ratings)
print ('New user ratings: %s' % new_user_ratings_RDD.take(10))


New user ratings: [(0, 13, 2), (0, 21, 1), (0, 24, 3), (0, 32, 2), (0, 36, 2), (0, 49, 3), (0, 42, 2), (0, 28, 3), (0, 96, 3), (0, 88, 1)]


In [25]:
complete_data_with_new_ratings_RDD = complete_ratings_data.union(new_user_ratings_RDD)

In [26]:
from time import time

t0 = time()
new_ratings_model = ALS.train(complete_data_with_new_ratings_RDD, best_rank, seed=seed, 
                              iterations=iterations, lambda_=regularization_parameter)
tt = time() - t0

print('New model trained in %s seconds' % round(tt,3))

New model trained in 213.763 seconds


### Getting top recommendations

In [27]:
new_user_ratings_ids = map(lambda x: x[1], new_user_ratings) # get just movie IDs
# keep just those not on the ID list (thanks Lei Li for spotting the error!)
new_user_unrated_movies_RDD = (complete_movies_data.filter(lambda x: x[0] not in new_user_ratings_ids).map(lambda x: (new_user_ID, x[0])))

# Use the input RDD, new_user_unrated_movies_RDD, with new_ratings_model.predictAll() to predict new ratings for the movies
new_user_recommendations_RDD = new_ratings_model.predictAll(new_user_unrated_movies_RDD)

In [28]:
# Transform new_user_recommendations_RDD into pairs of the form (Movie ID, Predicted Rating)
new_user_recommendations_rating_RDD = new_user_recommendations_RDD.map(lambda x: (x.product, x.rating))
new_user_recommendations_rating_title_and_count_RDD = \
    new_user_recommendations_rating_RDD.join(complete_movies_titles).join(movie_rating_counts_RDD)
new_user_recommendations_rating_title_and_count_RDD.take(3)

[(257805, ((2.0568566469147687, 'Best Sellers (2021)'), 32)),
 (154530, ((0.5149921109076971, 'Recto / verso'), 2)),
 (71910, ((1.5626779540337972, '"Tournament'), 268))]

In [29]:
new_user_recommendations_rating_title_and_count_RDD = \
    new_user_recommendations_rating_title_and_count_RDD.map(lambda r: (r[1][0][1], r[1][0][0], r[1][1]))

### FULL dataset, filtering out movies with less than 25 ratings (meaning 25 or more ratings)

In [30]:
top_movies = new_user_recommendations_rating_title_and_count_RDD.filter(lambda r: r[2]>=25).takeOrdered(25, key=lambda x: -x[1])

print ('TOP recommended movies (with more than 25 reviews):\n%s' %
        '\n'.join(map(str, top_movies)))

TOP recommended movies (with more than 25 reviews):
("India's Daughter (2015)", 3.1900949119674493, 34)
('The Magic Ring (1982)', 3.1461716459918954, 27)
('The Mitten (1967)', 3.129949774051118, 29)
('Connections (1978)', 3.059156788369963, 57)
('Crazed Fruit (Kurutta kajitsu) (1956)', 3.0283982419995645, 27)
('The Adventures of Scamper the Penguin (1986)', 3.0233711488814023, 25)
('Thirst (Pyaasa) (1957)', 3.020262430221001, 37)
('Bobik Visiting Barbos (1977)', 3.0188672074759673, 55)
('Salut cousin! (1996)', 2.994001157195015, 35)
('A Plasticine Crow (1981)', 2.9895910057865875, 75)
('The Little World of Don Camillo (1952)', 2.987293687296024, 65)
('After Lucia (2012)', 2.9848709711036747, 40)
('Springsteen On Broadway (2018)', 2.9847495739722394, 28)
('Anne Of Green Gables: The Continuing Story (2000)', 2.983758547776909, 27)
('Rent: Filmed Live on Broadway (2008)', 2.9705508496413913, 41)
('Vovka in the Kingdom of Far Far Away (1965)', 2.9641320772671946, 88)
('Maiden (2019)', 2.95

### FULL dataset, filtering out movies with less than 100 ratings (meaning 100 or more ratings)

In [31]:
top_movies = new_user_recommendations_rating_title_and_count_RDD.filter(lambda r: r[2]>=100).takeOrdered(25, key=lambda x: -x[1])

print ('TOP recommended movies (with more than 100 reviews):\n%s' %
        '\n'.join(map(str, top_movies)))

TOP recommended movies (with more than 100 reviews):
('Boys Life 2 (1997)', 2.9518789206812457, 101)
('Junior and Karlson (1968)', 2.89748563740741, 108)
('Three from Prostokvashino (1978)', 2.852848159906679, 159)
('Karlson Returns (1970)', 2.844121300859129, 119)
('Formula of Love (1984)', 2.805795037751656, 106)
('Winter in Prostokvashino (1984)', 2.8057230179619745, 131)
('Anne Frank Remembered (1995)', 2.800809529010722, 1058)
('Nobody Loves Me (Keiner liebt mich) (1994)', 2.7982850339351604, 140)
('Boy Meets Girl (2015)', 2.7963783942737344, 101)
('Solas (1999)', 2.7800401898054226, 170)
('The Biggest Little Farm (2018)', 2.779439307713261, 121)
('That Munchhausen (1979)', 2.7583928021932698, 155)
('Ordinary Miracle (1978)', 2.7580537484463363, 152)
("Last Year's Snow Was Falling (1983)", 2.7548471266804118, 244)
('Return to Treasure Island (1988)', 2.753999903474514, 180)
('As it is in Heaven (Så som i himmelen) (2004)', 2.7500127306505195, 290)
('Winnie the Pooh Goes Visiting (

### Getting individual ratings

In [32]:
my_movie = sc.parallelize([(0, 500)]) # Quiz Show (1994)
individual_movie_rating_RDD = new_ratings_model.predictAll(new_user_unrated_movies_RDD)
individual_movie_rating_RDD.take(1)

[Rating(user=0, product=206085, rating=1.8627968780826043)]