```
Professor Name: Dr James J. Lee
Student Name: Aditi Somani
Student ID: 4181152
Subject: BUAN 5315 02 23SQ Big Data Analysis
Total Points: 200
```

# A Movie Recommendation Service

### Source: https://www.codementor.io/spark/tutorial/building-a-recommender-with-apache-spark-python-example-app-part1

#### Create a SparkContext configured for local mode

In [1]:
import pyspark
sc = pyspark.SparkContext('local[*]')

#### File download

Small: 100,000 ratings and 3,600 tag applications applied to 9,000 movies by 600 users. Last updated 9/2018.

Full: 27,000,000 ratings and 1,100,000 tag applications applied to 58,000 movies by 280,000 users. Includes tag genome data with 14 million relevance scores across 1,100 tags. Last updated 9/2018.

In [2]:
complete_dataset_url = 'http://files.grouplens.org/datasets/movielens/ml-latest.zip'
small_dataset_url = 'http://files.grouplens.org/datasets/movielens/ml-latest-small.zip'

#### Download location(s)

In [3]:
import os

datasets_path = os.path.join('/home/jovyan/work', 'Data Translation Challenge - Aditi Somani')

complete_dataset_path = os.path.join(datasets_path, 'ml-latest.zip')
small_dataset_path = os.path.join(datasets_path, 'ml-latest-small.zip')

#### Getting file(s)

In [4]:
import urllib.request

small_f = urllib.request.urlretrieve (small_dataset_url, small_dataset_path)
complete_f = urllib.request.urlretrieve (complete_dataset_url, complete_dataset_path)
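
`urlretrieve` writes straight to the target path, so the call fails if the folder does not exist yet, and rerunning the cell fetches both archives again. A minimal sketch of a guard around it follows. `download_if_missing` and its `fetch` parameter are hypothetical names introduced here for illustration; they are not part of the original notebook.

```python
import os
import urllib.request


def download_if_missing(url, path, fetch=urllib.request.urlretrieve):
    """Download url to path unless the file is already there.

    fetch is injectable (an assumption of this sketch) so the helper
    can be exercised without network access.
    Returns True if a download happened, False if it was skipped.
    """
    if os.path.exists(path):
        return False
    # Create the parent folder first; urlretrieve does not create it.
    os.makedirs(os.path.dirname(path) or '.', exist_ok=True)
    fetch(url, path)
    return True
```

In this notebook it could be called as `download_if_missing(small_dataset_url, small_dataset_path)`.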

#### Extracting file(s)

In [5]:
import zipfile

with zipfile.ZipFile(small_dataset_path, "r") as z:
    z.extractall(datasets_path)

with zipfile.ZipFile(complete_dataset_path, "r") as z:
    z.extractall(datasets_path)

## Loading and parsing datasets
Now we are ready to read in each of the files and create an RDD consisting of parsed lines. 

Each line in the ratings dataset (ratings.csv) is formatted as: 
+ userId,movieId,rating,timestamp 

Each line in the movies (movies.csv) dataset is formatted as:
+ movieId,title,genres 

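The files are plain comma-separated text, so we parse each line with Python's `split()` once it is loaded into an RDD. One caveat: titles in movies.csv that contain a comma are quoted, so `split(",")` truncates them (visible later in titles such as `"Lonely Wife`). Parsing the movies and ratings files yields two RDDs: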
+ For each line in the ratings dataset, we create a tuple of (UserID, MovieID, Rating). We drop the timestamp because we do not need it for this recommender.
+ For each line in the movies dataset, we create a tuple of (MovieID, Title). We drop the genres because we do not use them for this recommender.
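
Because MovieLens quotes any title that itself contains a comma, a plain `split(",")` cuts such titles at the first comma. A hedged sketch using the standard `csv` module is shown below. The helper names `parse_rating_line` and `parse_movie_line` are assumptions introduced for illustration, and the sample line simply follows the movies.csv layout described above.

```python
import csv


def parse_rating_line(line):
    """Parse 'userId,movieId,rating,timestamp' into (int, int, float)."""
    user_id, movie_id, rating, _timestamp = line.split(',')
    return int(user_id), int(movie_id), float(rating)


def parse_movie_line(line):
    """Parse 'movieId,title,genres' into (int, title), honoring quoted titles."""
    # csv.reader understands double-quoted fields, so commas inside a
    # quoted title do not split the field.
    movie_id, title, _genres = next(csv.reader([line]))
    return int(movie_id), title


sample = '11,"American President, The (1995)",Comedy|Drama|Romance'
print(sample.split(',')[1])      # naive split keeps only the start of the title
print(parse_movie_line(sample))  # csv keeps the full title
```

Inside Spark, the same function can be passed to `map` after the header line has been filtered out, for example `small_movies_raw_data.map(parse_movie_line)`.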

#### ratings.csv

In [6]:
small_ratings_file = os.path.join(datasets_path, 'ml-latest-small', 'ratings.csv')

small_ratings_raw_data = sc.textFile(small_ratings_file)
small_ratings_raw_data_header = small_ratings_raw_data.take(1)[0]

In [7]:
small_ratings_data = small_ratings_raw_data.filter(lambda line: line!=small_ratings_raw_data_header)\
    .map(lambda line: line.split(",")).map(lambda tokens: (tokens[0],tokens[1],tokens[2])).cache()

In [8]:
small_ratings_data.take(3)

[('1', '1', '4.0'), ('1', '3', '4.0'), ('1', '6', '4.0')]

#### movies.csv

In [9]:
small_movies_file = os.path.join(datasets_path, 'ml-latest-small', 'movies.csv')

small_movies_raw_data = sc.textFile(small_movies_file)
small_movies_raw_data_header = small_movies_raw_data.take(1)[0]

small_movies_data = small_movies_raw_data.filter(lambda line: line!=small_movies_raw_data_header)\
    .map(lambda line: line.split(",")).map(lambda tokens: (tokens[0],tokens[1])).cache()
    
small_movies_data.take(3)

[('1', 'Toy Story (1995)'),
 ('2', 'Jumanji (1995)'),
 ('3', 'Grumpier Old Men (1995)')]

## Collaborative Filtering

## Selecting ALS parameters using the small dataset

To determine the best ALS parameters, we use the small dataset. We first split it into training, validation, and test sets.
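
`randomSplit` takes relative weights, so `[6, 2, 2]` means roughly 60% training, 20% validation, and 20% test. Each record is assigned at random, so the resulting sizes are approximate. The plain-Python sketch below imitates that idea on a list. It is not Spark's implementation, and `weighted_random_split` is a hypothetical helper.

```python
import random


def weighted_random_split(items, weights, seed=0):
    """Assign each item to one bucket with probability proportional to its weight."""
    total = float(sum(weights))
    # Cumulative upper bounds, roughly [0.6, 0.8, 1.0] for weights [6, 2, 2].
    bounds = []
    running = 0.0
    for w in weights:
        running += w / total
        bounds.append(running)
    rng = random.Random(seed)
    buckets = [[] for _ in weights]
    for item in items:
        draw = rng.random()
        for index, bound in enumerate(bounds):
            if draw < bound:
                buckets[index].append(item)
                break
        else:
            # Guard against floating-point rounding in the last bound.
            buckets[-1].append(item)
    return buckets


train, validation, test = weighted_random_split(range(1000), [6, 2, 2], seed=0)
print(len(train), len(validation), len(test))
```

Fixing the seed makes the split reproducible, which is why the notebook passes `seed=0`.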

In [10]:
# The source tutorial uses seed=0L, which is Python 2 long-integer syntax.
# In Python 3, write it as 0.

training_RDD, validation_RDD, test_RDD = small_ratings_data.randomSplit([6, 2, 2], seed=0)
validation_for_predict_RDD = validation_RDD.map(lambda x: (x[0], x[1]))
test_for_predict_RDD = test_RDD.map(lambda x: (x[0], x[1]))

#### Training phase

In [11]:
from pyspark.mllib.recommendation import ALS
import math

seed = 5
iterations = 10
regularization_parameter = 0.1
ranks = [4, 8, 12]
errors = []  # validation RMSE per rank, in the same order as `ranks`

min_error = float('inf')
best_rank = -1
for rank in ranks:
    model = ALS.train(training_RDD, rank, seed=seed, iterations=iterations,
                      lambda_=regularization_parameter)
    # Predict the held-out validation pairs, keyed by (user, movie)
    predictions = model.predictAll(validation_for_predict_RDD).map(lambda r: ((r[0], r[1]), r[2]))
    # Join actual and predicted ratings on the (user, movie) key
    rates_and_preds = validation_RDD.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions)
    # Root mean squared error over the joined pairs
    error = math.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())
    errors.append(error)
    print('For rank {} the RMSE is {}'.format(rank, error))
    if error < min_error:
        min_error = error
        best_rank = rank
print ('The best model was trained with rank {}'.format(best_rank))

For rank 4 the RMSE is 0.908078105265682
For rank 8 the RMSE is 0.916462973348527
For rank 12 the RMSE is 0.917665030756129
The best model was trained with rank 4


In [12]:
predictions.take(3)

[((372, 1084), 3.42419871162954),
 ((4, 1084), 3.866749726695713),
 ((402, 1084), 3.4099577968422152)]

In [13]:
rates_and_preds.take(3)

[((1, 457), (5.0, 4.381060760461434)),
 ((1, 1025), (5.0, 4.705295366590298)),
 ((1, 1089), (5.0, 4.979982471805129))]
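
The error computed in the loop is the root mean squared error (RMSE): the square root of the average squared gap between each actual rating and its prediction, taken over the (user, movie) pairs present in both sets. A minimal plain-Python sketch follows. `rmse_on_common_keys` is a hypothetical name, and the sample values are the three joined pairs printed above, with predictions rounded to three decimals.

```python
import math


def rmse_on_common_keys(actual, predicted):
    """RMSE over the (user, movie) keys that appear in both dicts."""
    common = actual.keys() & predicted.keys()   # mirrors the inner join
    if not common:
        raise ValueError('no overlapping (user, movie) pairs')
    squared = [(actual[k] - predicted[k]) ** 2 for k in common]
    return math.sqrt(sum(squared) / len(squared))


actual = {(1, 457): 5.0, (1, 1025): 5.0, (1, 1089): 5.0}
predicted = {(1, 457): 4.381, (1, 1025): 4.705, (1, 1089): 4.980}
print(rmse_on_common_keys(actual, predicted))
```

Because the join is an inner join, any pair that did not receive a prediction drops out, so the RMSE covers only the pairs the model could score.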

In [14]:
model = ALS.train(training_RDD, best_rank, seed=seed, iterations=iterations,
                      lambda_=regularization_parameter)
predictions = model.predictAll(test_for_predict_RDD).map(lambda r: ((r[0], r[1]), r[2]))
rates_and_preds = test_RDD.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions)
error = math.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())
    
print('For testing data the RMSE is {}'.format(error))

For testing data the RMSE is 0.9113780946334407


## Using the complete dataset to build the final model

We first load and parse the complete ratings file, then split it into training and test sets.

In [15]:
# Load the complete dataset file
complete_ratings_file = os.path.join(datasets_path, 'ml-latest', 'ratings.csv')
complete_ratings_raw_data = sc.textFile(complete_ratings_file)
complete_ratings_raw_data_header = complete_ratings_raw_data.take(1)[0]

# Parse
complete_ratings_data = complete_ratings_raw_data.filter(lambda line: line!=complete_ratings_raw_data_header)\
    .map(lambda line: line.split(",")).map(lambda tokens: (int(tokens[0]),int(tokens[1]),float(tokens[2]))).cache()
    
print('There are {} ratings in the complete dataset'.format(complete_ratings_data.count()))

There are 27753444 ratings in the complete dataset


In [16]:
training_RDD, test_RDD = complete_ratings_data.randomSplit([7, 3], seed=0)

complete_model = ALS.train(training_RDD, best_rank, seed=seed, 
                           iterations=iterations, lambda_=regularization_parameter)

In [17]:
test_for_predict_RDD = test_RDD.map(lambda x: (x[0], x[1]))

predictions = complete_model.predictAll(test_for_predict_RDD).map(lambda r: ((r[0], r[1]), r[2]))
rates_and_preds = test_RDD.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions)
error = math.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())

print ('For testing data the RMSE is {}'.format(error))

For testing data the RMSE is 0.8318265262101795


## How to make recommendations
Although our goal is an online movie recommender, we can already try the model out by generating some recommendations. Doing so will help when we code the recommendation engine for the web service, and it shows how to use the model in other settings.

With collaborative filtering, getting recommendations for a new user is not as simple as running new entries through a previously trained model. Instead, we have to retrain the model with the new user's ratings included, so that they can be compared with those of the other users in the dataset. In other words, the recommender must be retrained whenever new user ratings arrive (although a single trained model can serve many users). This makes the process expensive, and it is one of the reasons why scalability is a problem (and Spark a solution). Once the model is trained, we can reuse it to obtain the top recommendations for a given user or an individual predicted rating for a particular movie. These operations are much cheaper than training the model itself.

We also want to recommend only movies with a certain minimum number of ratings. For that, we need to count the number of ratings per movie.

In [18]:
complete_movies_file = os.path.join(datasets_path, 'ml-latest', 'movies.csv')
complete_movies_raw_data = sc.textFile(complete_movies_file)
complete_movies_raw_data_header = complete_movies_raw_data.take(1)[0]

# Parse
complete_movies_data = complete_movies_raw_data.filter(lambda line: line!=complete_movies_raw_data_header)\
    .map(lambda line: line.split(",")).map(lambda tokens: (int(tokens[0]),tokens[1],tokens[2])).cache()

complete_movies_titles = complete_movies_data.map(lambda x: (int(x[0]),x[1]))
    
print('There are {} movies in the complete dataset'.format(complete_movies_titles.count()))

There are 58098 movies in the complete dataset


In [19]:
def get_counts_and_averages(ID_and_ratings_tuple):
    nratings = len(ID_and_ratings_tuple[1])
    return ID_and_ratings_tuple[0], (nratings, float(sum(x for x in ID_and_ratings_tuple[1]))/nratings)

movie_ID_with_ratings_RDD = (complete_ratings_data.map(lambda x: (x[1], x[2])).groupByKey())
movie_ID_with_avg_ratings_RDD = movie_ID_with_ratings_RDD.map(get_counts_and_averages)
movie_rating_counts_RDD = movie_ID_with_avg_ratings_RDD.map(lambda x: (x[0], x[1][0]))
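
For intuition, the same per-movie count and average can be written in plain Python with a dictionary. This is only a sketch of what the `groupByKey` pipeline above computes. `counts_and_averages` is a hypothetical helper, and the three ratings are made-up illustration values rather than dataset records.

```python
from collections import defaultdict


def counts_and_averages(ratings):
    """Map movieID -> (number of ratings, mean rating) from (user, movie, rating) tuples."""
    by_movie = defaultdict(list)
    for _user_id, movie_id, rating in ratings:
        by_movie[movie_id].append(rating)
    return {movie_id: (len(values), sum(values) / len(values))
            for movie_id, values in by_movie.items()}


# Illustration values only, not taken from the dataset.
toy_ratings = [(1, 10, 4.0), (2, 10, 2.0), (3, 20, 5.0)]
print(counts_and_averages(toy_ratings))
```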

### Adding new user ratings
Now we need to rate some movies for the new user. We put the ratings in a new RDD under user ID 0, which is not assigned in the MovieLens dataset. Check the dataset's movies file for the ID-to-title mapping so you know which movies you are actually rating.

## User 1: Aditi Somani

In [20]:
new_user_ID = 0

# The format of each line is (userID, movieID, rating)
new_user_ratings = [
        (0,190863,3), # 3 Musketeers (2011)
        (0,147246,4), # Adventures of Mowgli (1973)
        (0,165723,2), # Ae Dil Hai Mushkil (2016)
        (0,147300,5), # Adventures Of Sherlock Holmes And Dr. Watson: The Twentieth Century Approaches (1986)
        (0,52938,5), # Adventures of Mark Twain, The (1986)
        (0,93923,1), # Agent Vinod (2012)
        (0,151459,3), # Airlift (2016)
        (0,175559,2), # Aiyyaa (2012)
        (0,158236,5), # Ali Baba and the Forty Thieves (1954)
        (0,142382,4), # Agneepath (2012)
    ]
new_user_ratings_RDD = sc.parallelize(new_user_ratings)
print ('New user ratings: {}'.format(new_user_ratings_RDD.take(10)))

New user ratings: [(0, 190863, 3), (0, 147246, 4), (0, 165723, 2), (0, 147300, 5), (0, 52938, 5), (0, 93923, 1), (0, 151459, 3), (0, 175559, 2), (0, 158236, 5), (0, 142382, 4)]


In [21]:
complete_data_with_new_ratings_RDD = complete_ratings_data.union(new_user_ratings_RDD)

In [22]:
from time import time

t0 = time()
new_ratings_model = ALS.train(complete_data_with_new_ratings_RDD, best_rank, seed=seed,
                              iterations=iterations, lambda_=regularization_parameter)
tt = time() - t0

print ('New model trained in {} seconds'.format(round(tt,3)))

New model trained in 150.922 seconds


### Getting Recommendations

In [23]:
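# Movie IDs the user already rated. Use a set: in Python 3, map() returns a
# one-shot iterator, so repeated "not in" checks against it would be unreliable.
new_user_ratings_ids = set(x[1] for x in new_user_ratings)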
# keep just those not on the ID list (thanks Lei Li for spotting the error!)
new_user_unrated_movies_RDD = (complete_movies_data.filter(lambda x: x[0] not in new_user_ratings_ids).map(lambda x: (new_user_ID, x[0])))

# Use the input RDD, new_user_unrated_movies_RDD, with new_ratings_model.predictAll() to predict new ratings for the movies
new_user_recommendations_RDD = new_ratings_model.predictAll(new_user_unrated_movies_RDD)
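
A note on Python 3: `map` returns a one-shot iterator rather than a list, so a membership test such as `x not in ids` consumes it, and later tests against the same object find nothing left to compare. The sketch below demonstrates the effect with two of the movie IDs rated above and shows a set as a safer container. The variable names are illustrative.

```python
rated = [(0, 190863, 3), (0, 147246, 4)]

# One-shot iterator: the first membership test walks through it.
ids_iter = map(lambda x: x[1], rated)
first_check = 147246 in ids_iter    # found, but the iterator is now exhausted
second_check = 190863 in ids_iter   # nothing left to compare against

# A set can be tested any number of times, and lookups are constant time.
ids_set = {movie_id for _user, movie_id, _rating in rated}

print(first_check, second_check)
print(147246 in ids_set, 190863 in ids_set)
```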


In [24]:
# Transform new_user_recommendations_RDD into pairs of the form (Movie ID, Predicted Rating)
new_user_recommendations_rating_RDD = new_user_recommendations_RDD.map(lambda x: (x.product, x.rating))
new_user_recommendations_rating_title_and_count_RDD = \
    new_user_recommendations_rating_RDD.join(complete_movies_titles).join(movie_rating_counts_RDD)
new_user_recommendations_rating_title_and_count_RDD.take(3)

[(6216,
  ((5.046652081520669, 'Nowhere in Africa (Nirgendwo in Afrika) (2001)'),
   717)),
 (124320, ((5.015150103764061, 'Once a Thief (1965)'), 1)),
 (83916, ((4.239779462274003, 'Blues in the Night (1941)'), 9))]

In [25]:
new_user_recommendations_rating_title_and_count_RDD = \
    new_user_recommendations_rating_title_and_count_RDD.map(lambda r: (r[1][0][1], r[1][0][0], r[1][1]))
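
The two joins leave each record nested as `(movieID, ((rating, title), count))`, and the `map` above flattens it to `(title, rating, count)`. A small plain-Python sketch of that reshaping follows, using the first record printed earlier. `flatten_record` is a hypothetical name.

```python
def flatten_record(record):
    """Turn (movieID, ((rating, title), count)) into (title, rating, count)."""
    _movie_id, ((rating, title), count) = record
    return title, rating, count


joined = (6216, ((5.046652081520669,
                  'Nowhere in Africa (Nirgendwo in Afrika) (2001)'), 717))
print(flatten_record(joined))
```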

### User 1 : Aditi Somani, Scenario 1 
FULL dataset, keeping only movies with 25 or more ratings

In [26]:
top_movies = new_user_recommendations_rating_title_and_count_RDD.filter(lambda r: r[2]>=25).takeOrdered(15, key=lambda x: -x[1])

print('TOP 15 recommended movies (with at least 25 ratings):\n{}'.format('\n'.join(map(str, top_movies))))

TOP 15 recommended movies (with at least 25 ratings):
('"Lonely Wife', 5.870108142013315, 43)
('Music for One Apartment and Six Drummers (2001)', 5.864985506876503, 31)
('"Things I Like', 5.771641506766851, 30)
('Cranford (2007)', 5.754259462579273, 35)
('The Garden of Sinners - Chapter 5: Paradox Paradigm (2008)', 5.721277947682164, 27)
("Won't You Be My Neighbor? (2018)", 5.70748342393759, 83)
('Slaying the Badger', 5.689480971449388, 25)
('Alone in the Wilderness (2004)', 5.658184439605291, 343)
('Mei and the Kittenbus (2002)', 5.637207192507796, 44)
('My Love (2006)', 5.635059975005253, 32)
('Hamlet (Gamlet) (1964)', 5.606352083856942, 37)
('"Civil War', 5.605288714379184, 431)
('"Crucified Lovers', 5.5999938654481465, 25)
('Life (2009)', 5.595653871758609, 166)
('Olive Kitteridge (2014)', 5.591718109650474, 211)
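
The selection in this cell is a filter on the rating count followed by a top-N pick on the predicted rating. In plain Python the same step can be sketched with `heapq.nlargest`. `top_n_with_min_count` is a hypothetical helper, and the four tuples are copied from the list above.

```python
import heapq


def top_n_with_min_count(records, n, min_count):
    """Return the n highest-rated (title, rating, count) records with count >= min_count."""
    eligible = (r for r in records if r[2] >= min_count)
    return heapq.nlargest(n, eligible, key=lambda r: r[1])


records = [
    ('Cranford (2007)', 5.754259462579273, 35),
    ('Slaying the Badger', 5.689480971449388, 25),
    ('Alone in the Wilderness (2004)', 5.658184439605291, 343),
    ('Life (2009)', 5.595653871758609, 166),
]
for row in top_n_with_min_count(records, n=2, min_count=100):
    print(row)
```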


### User 1 : Aditi Somani, Scenario 2 

FULL dataset, keeping only movies with 100 or more ratings

In [27]:
top_movies = new_user_recommendations_rating_title_and_count_RDD.filter(lambda r: r[2]>=100).takeOrdered(15, key=lambda x: -x[1])

print('TOP 15 recommended movies (with at least 100 ratings):\n{}'.format('\n'.join(map(str, top_movies))))

TOP 15 recommended movies (with at least 100 ratings):
('Alone in the Wilderness (2004)', 5.658184439605291, 343)
('"Civil War', 5.605288714379184, 431)
('Life (2009)', 5.595653871758609, 166)
('Olive Kitteridge (2014)', 5.591718109650474, 211)
('"Decalogue', 5.579240000970557, 547)
('"Best of Youth', 5.554732763576486, 548)
("Smiley's People (1982)", 5.553740799105896, 116)
('Blue Planet II (2017)', 5.552545805339198, 349)
('56 Up (2012)', 5.546527359315515, 193)
('Frozen Planet (2011)', 5.527124650868659, 402)
('Planet Earth (2006)', 5.526898391082175, 1384)
('Ikiru (1952)', 5.5156413672066416, 1551)
('Casablanca (1942)', 5.509163187436604, 31095)
('Promises (2001)', 5.501614878969943, 149)
('Harakiri (Seppuku) (1962)', 5.498013350397949, 679)


## User 2: Abhishek Somani

In [37]:
new_user_ID = 0

# The format of each line is (userID, movieID, rating)
new_user_ratings = [
        (0,173175,4), # Badrinath Ki Dulhania (2017)
        (0,82906,5), # Baghban (2003)
        (0,139148,3), # Bajrangi Bhaijaan (2015)
        (0,89848,4), # Badmaash Company (2010)
        (0,146256,5), # Bade Miyan Chote Miyan (1998)
        (0,98956,5), # Barfi! (2012)
        (0,158408,4), # Ajab Prem Ki Ghazab Kahani (2009)
        (0,164600,2), # Akira (2016)
        (0,158697,1), # Aitraaz (2004)
        (0,184551,3), # Aiyaary (2018)
    ]
new_user_ratings_RDD = sc.parallelize(new_user_ratings)
print ('New user ratings: {}'.format(new_user_ratings_RDD.take(10)))

New user ratings: [(0, 173175, 4), (0, 82906, 5), (0, 139148, 3), (0, 89848, 4), (0, 146256, 5), (0, 98956, 5), (0, 158408, 4), (0, 164600, 2), (0, 158697, 1), (0, 184551, 3)]


In [38]:
complete_data_with_new_ratings_RDD = complete_ratings_data.union(new_user_ratings_RDD)

In [39]:
from time import time

t0 = time()
new_ratings_model = ALS.train(complete_data_with_new_ratings_RDD, best_rank, seed=seed,
                              iterations=iterations, lambda_=regularization_parameter)
tt = time() - t0

print ('New model trained in {} seconds'.format(round(tt,3)))

New model trained in 197.159 seconds


### Getting Recommendations

In [42]:
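# Movie IDs the user already rated. Use a set: in Python 3, map() returns a
# one-shot iterator, so repeated "not in" checks against it would be unreliable.
new_user_ratings_ids = set(x[1] for x in new_user_ratings)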
# keep just those not on the ID list (thanks Lei Li for spotting the error!)
new_user_unrated_movies_RDD = (complete_movies_data.filter(lambda x: x[0] not in new_user_ratings_ids).map(lambda x: (new_user_ID, x[0])))

# Use the input RDD, new_user_unrated_movies_RDD, with new_ratings_model.predictAll() to predict new ratings for the movies
new_user_recommendations_RDD = new_ratings_model.predictAll(new_user_unrated_movies_RDD)

In [43]:
# Transform new_user_recommendations_RDD into pairs of the form (Movie ID, Predicted Rating)
new_user_recommendations_rating_RDD = new_user_recommendations_RDD.map(lambda x: (x.product, x.rating))
new_user_recommendations_rating_title_and_count_RDD = \
    new_user_recommendations_rating_RDD.join(complete_movies_titles).join(movie_rating_counts_RDD)
new_user_recommendations_rating_title_and_count_RDD.take(3)

[(6216,
  ((4.342085124281496, 'Nowhere in Africa (Nirgendwo in Afrika) (2001)'),
   717)),
 (124320, ((4.178952815614785, 'Once a Thief (1965)'), 1)),
 (83916, ((3.8758578701660866, 'Blues in the Night (1941)'), 9))]

In [44]:
new_user_recommendations_rating_title_and_count_RDD = \
    new_user_recommendations_rating_title_and_count_RDD.map(lambda r: (r[1][0][1], r[1][0][0], r[1][1]))

### User 2: Abhishek Somani, Scenario 1

FULL dataset, keeping only movies with 25 or more ratings


In [45]:
top_movies = new_user_recommendations_rating_title_and_count_RDD.filter(lambda r: r[2]>=25).takeOrdered(15, key=lambda x: -x[1])

print('TOP 15 recommended movies (with at least 25 ratings):\n{}'.format('\n'.join(map(str, top_movies))))

TOP 15 recommended movies (with at least 25 ratings):
("India's Daughter (2015)", 6.33296756728441, 25)
('Out in the Dark (2012)', 6.23684469759667, 27)
('Always (2011)', 5.947683051392968, 28)
('"Lord of the Rings: The Return of the King', 5.639035487784753, 57378)
('Bridegroom (2013)', 5.637766977192641, 36)
('The Red Pill (2016)', 5.589525603543304, 46)
('"Lord of the Rings: The Fellowship of the Ring', 5.587376033278204, 61883)
('"Things I Like', 5.579380106463301, 30)
('"Lord of the Rings: The Two Towers', 5.5730645122969875, 56696)
('N.H 10 (2015)', 5.557907634446291, 35)
('Jimmy Carr: Live (2004)', 5.488937260082846, 28)
('Udta Punjab (2016)', 5.4353618781713, 37)
('Holding the Man (2015)', 5.429306257218158, 39)
('Queen (2014)', 5.419449759710034, 82)
('"Misérables in Concert', 5.407820872248806, 43)


### User 2: Abhishek Somani, Scenario 2

FULL dataset, keeping only movies with 100 or more ratings

In [46]:
top_movies = new_user_recommendations_rating_title_and_count_RDD.filter(lambda r: r[2]>=100).takeOrdered(15, key=lambda x: -x[1])

print('TOP 15 recommended movies (with at least 100 ratings):\n{}'.format('\n'.join(map(str, top_movies))))

TOP 15 recommended movies (with at least 100 ratings):
('"Lord of the Rings: The Return of the King', 5.639035487784753, 57378)
('"Lord of the Rings: The Fellowship of the Ring', 5.587376033278204, 61883)
('"Lord of the Rings: The Two Towers', 5.5730645122969875, 56696)
('Prayers for Bobby (2009)', 5.198114519581836, 102)
('The Blue Planet (2001)', 5.078366014461743, 421)
('Blue Planet II (2017)', 5.07280128557607, 349)
('Doctor Who: The Husbands of River Song (2015)', 5.068229288321994, 239)
('Frozen Planet (2011)', 5.058000585103578, 402)
('Gangs of Wasseypur (2012)', 5.05719203931173, 167)
('Doctor Who: The Time of the Doctor (2013)', 5.043155067985097, 394)
('Life (2009)', 5.031069417476978, 166)
('Doctor Who: Last Christmas (2014)', 5.021954923432752, 228)
('Harry Potter and the Deathly Hallows: Part 2 (2011)', 4.995154400492718, 13262)
('"Swades: We', 4.985371442155568, 163)
('Voices from the List (2004)', 4.983503029912285, 1800)


### Interpretation of the results and insights/foresights on both scenarios

The pipeline worked for both users in both scenarios and produced a top-15 list under each threshold. A few insights from the results:

1. The collaborative filtering engine generated top movie recommendations for both users under both thresholds, and most of the recommendations appear apt.

2. Requiring at least 25 ratings admits a larger pool of candidate movies than requiring at least 100. As a result, the Scenario 1 lists include many lesser-known titles with only a few dozen ratings, while the Scenario 2 lists shift toward more widely rated titles. A lower threshold therefore allows a broader range of movies to be recommended.

3. The two users received different lists that reflect their own ratings, which indicates that the engine can personalize suggestions to individual customer preferences.

4. The Pumpkinmeter score, derived from collaborative filtering, has the potential to increase customer engagement by surfacing relevant movie recommendations. This could lead to higher customer satisfaction and a greater likelihood that customers stay with Ripe Pumpkins' services rather than switching to competitors.
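
The effect of the threshold can be checked directly against the recorded output. The sketch below takes the rating counts from User 1's Scenario 1 list and counts how many of those titles would survive the stricter threshold. The helper name `count_at_least` is hypothetical.

```python
# Rating counts of User 1's top 15 under the 25-rating threshold, in listed order.
scenario_1_counts = [43, 31, 30, 35, 27, 83, 25, 343, 44, 32, 37, 431, 25, 166, 211]


def count_at_least(counts, threshold):
    """Number of titles whose rating count meets the threshold."""
    return sum(1 for c in counts if c >= threshold)


print(count_at_least(scenario_1_counts, 25))    # all 15 titles qualify
print(count_at_least(scenario_1_counts, 100))   # only 4 titles qualify
```

Those four titles are the first four entries of User 1's Scenario 2 list, which is why the two lists overlap only partly.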

Thank You!