## IDS 561 PROJECT CODE
# MOVIE RECOMMENDATION SYSTEM

*   Nimisha Asati 
------------------------------------------------------------------------------------------------------------------------------------------------------------------------



In [0]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://www-eu.apache.org/dist/spark/spark-2.3.3/spark-2.3.3-bin-hadoop2.7.tgz
!tar xf spark-2.3.3-bin-hadoop2.7.tgz
!pip install -q findspark

In [0]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.3.3-bin-hadoop2.7"

In [0]:
import findspark
findspark.init()
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[*]").getOrCreate()

In [0]:
from pyspark import SparkConf
from pyspark import SparkContext
from pyspark import HiveContext
sc = SparkContext.getOrCreate(SparkConf().setMaster("local[*]"))

In [0]:
from pyspark.mllib.recommendation import ALS
import math
import pandas as pd
from time import time
from pyspark.mllib.recommendation import MatrixFactorizationModel

### Loading small movies and ratings dataset to learn the ALS (Alternating Least Squares) parameters

In [0]:
# Remove header in each file
ratings_raw_data_s = sc.textFile('./ratings_small.csv')
ratings_raw_data_s_h = ratings_raw_data_s.take(1)[0]                    

In [0]:
ratings_data_s = ratings_raw_data_s.filter(lambda line: line!=ratings_raw_data_s_h)\
    .map(lambda line: line.split(",")).map(lambda tokens: (tokens[0],tokens[1],tokens[2])).cache()

In [0]:
# First few lines of Ratings RDD
ratings_data_s.take(5)

[('1', '1', '4.0'), ('1', '3', '4.0'), ('1', '6', '4.0')]

In [0]:
movies_raw_data_s = sc.textFile('movies.csv')
ratings_raw_data_s_h = movies_raw_data_s.take(1)[0]

In [0]:
# Filter out header from movies data and view first few lines of Movies RDD
small_movies_data = movies_raw_data_s.filter(lambda line: line!=ratings_raw_data_s_h)\
    .map(lambda line: line.split(",")).map(lambda tokens: (tokens[0],tokens[1])).cache()
    
small_movies_data.take(3)

[('1', 'Toy Story (1995)'),
 ('2', 'Jumanji (1995)'),
 ('3', 'Grumpier Old Men (1995)')]

In [0]:
# Split ratings data into Training, Validation and Testing RDDs (65%-20%-15%)
train_s, validate_s, test_s = ratings_data_s.randomSplit([6.5, 2, 1.5], seed=123)
validate_predict_s = validate_s.map(lambda x: (x[0], x[1]))
test_predict_s = test_s.map(lambda x: (x[0], x[1]))

### Train the model to determine the ALS parameters - collaborative filtering technique

In [0]:
seed = 123
iterations = 15
regularization = 0.025
ranks = [4, 6, 8]
errors = [0, 0, 0]
err = 0
tolerance = 0.01


min_error = float('inf')
best_rank = -1
best_iteration = -1
for rank in ranks:
    model_small = ALS.train(train_s, rank, seed=seed, iterations=iterations,
                      lambda_=regularization)
    predictions = model_small.predictAll(validate_predict_s).map(lambda r: ((r[0], r[1]), r[2]))
    rates = validate_s.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions)
    error = math.sqrt(rates.map(lambda r: (r[1][0] - r[1][1])**2).mean())
    errors[err] = error
    err += 1
    print('For rank %s the RMSE is %s' % (rank, error))
    if error < min_error:
        min_error = error
        best_rank = rank

print('The best model was trained with rank %s' % best_rank)

For rank 4 the RMSE is 0.9086635340748691
For rank 8 the RMSE is 0.9072065372411254
For rank 12 the RMSE is 0.9042878440506998
The best model was trained with rank 12


In [0]:
# We have the UserID, the MovieID, and the Rating, as we have in our ratings dataset. 
# In this case: 3rd element is predictions 

predictions.take(3)

[((474, 1084), 3.9205706402476435),
 ((74, 1084), 4.191654659558981),
 ((294, 1084), 2.9152407085144922)]

In [0]:
# join these with our validation data (the one that includes ratings) 
rates.take(3)

[((1, 349), (4.0, 3.5931104079098533)),
 ((1, 441), (4.0, 4.577913131186497)),
 ((1, 553), (5.0, 4.446245131026217))]

###  Applied a squared difference and the we use the mean() action to get the MSE and apply sqrt

In [0]:
# test the selected model
model_1 = ALS.train(training_RDD, best_rank, seed=seed, iterations=iterations,
                      lambda_=regularization_parameter)
predictions = model_1.predictAll(test_for_predict_RDD).map(lambda r: ((r[0], r[1]), r[2]))
rates1 = test_RDD.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions)
error = math.sqrt(rates1.map(lambda r: (r[1][0] - r[1][1])**2).mean())
    
print('For testing data the RMSE is %s' % (error))

For testing data the RMSE is 0.9125507816616728


## Now to build our recommender model, we will use the complete dataset

In [0]:
# Load the complete dataset file
ratings_raw_full = sc.textFile('ratings.csv')
ratings_raw_full_header = ratings_raw_full.take(1)[0]

# Parse and process
ratings_full = ratings_raw_full.filter(lambda line: line!=ratings_raw_full_header)\
    .map(lambda line: line.split(",")).map(lambda tokens: (int(tokens[0]),int(tokens[1]),float(tokens[2]))).cache()
    
print("There are %s recommendations in the complete dataset" % (ratings_full.count()))

There are 2904670 recommendations in the complete dataset


### Train the recommender model

In [0]:
train_full, test_full = ratings_full.randomSplit([7, 3], seed=1)

model_full = ALS.train(train_full, best_rank, seed=seed, 
                           iterations=iterations, lambda_=regularization_parameter)

In [0]:
# Test the recommender model
test_for_predict_RDD = test_full.map(lambda x: (x[0], x[1]))

predictions = model_full.predictAll(test_for_predict_RDD).map(lambda r: ((r[0], r[1]), r[2]))
rates2 = test_full.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions)
error = math.sqrt(rates2.map(lambda r: (r[1][0] - r[1][1])**2).mean())
    
print ('For testing data the RMSE is %s' % (error))

For testing data the RMSE is 0.841243074184287


In [0]:
#load the movies complete file for later use
movies_full = sc.textFile('movies.csv')

movies_full_header = movies_full.take(1)[0]

# Parse
movies_data = movies_full.filter(lambda line: line!=movies_full_header)\
    .map(lambda line: line.split(",")).map(lambda tokens: (int(tokens[0]),tokens[1],tokens[2])).cache()

movies_full_title = movies_data.map(lambda x: (int(x[0]),x[1]))
    
print ("There are %s movies in the complete dataset" % (movies_full_title.count()))


There are 58098 movies in the complete dataset


### Give recommendations of movies with a certain minimum number of ratings

In [0]:
# count the number of ratings per movie
def get_counts_avg(ID_and_ratings_tuple):
    nratings = len(ID_and_ratings_tuple[1])
    return ID_and_ratings_tuple[0], (nratings, float(sum(x for x in ID_and_ratings_tuple[1]))/nratings)

movie_ID_ratings = (ratings_full.map(lambda x: (x[1], x[2])).groupByKey())
movie_ID_with_avg_ratings_RDD = movie_ID_ratings.map(get_counts_avg)
movie_rating_counts_RDD = movie_ID_with_avg_ratings_RDD.map(lambda x: (x[0], x[1][0]))

### Adding new user ratings to recommend top 25 movies for the new user

In [0]:
# adding random new user ID
new_user_ID = 0

# The format of each line is (userID, movieID, rating)
new_user_ratings = [
     (0,260,9), # Star Wars (1977)
     (0,1,8), # Toy Story (1995)
     (0,27,6), # Now and Then (1995)
     (0,25,8), # Leaving Las Vegas (1995)
     (0,32,8), # Twelve Monkeys (a.k.a. 12 Monkeys) (1995)
     (0,362,4), # Jungle Book, The (1994)
     (0,379,5), # Timecop (1994)
     (0,419,9), # Beverly Hillbillies, The (1993)
     (0,853,10) , # Dingo (1972)
     (0,50,7) # Usual Suspects, The (1995)
    ]
new_user_ratings_RDD = sc.parallelize(new_user_ratings)

print ('New user ratings: %s' % new_user_ratings_RDD.take(10))

New user ratings: [(0, 260, 9), (0, 1, 8), (0, 16, 7), (0, 25, 8), (0, 32, 9), (0, 335, 4), (0, 379, 3), (0, 296, 7), (0, 858, 10), (0, 50, 8)]


In [0]:
# we add new user rating to the data we will use to train our recommender model
data_with_new_ratings = complete_ratings_data.union(new_user_ratings_RDD)

In [0]:
# train the ALS model using all the parameters we used for training small dataset
t0 = time()
new_ratings_model = ALS.train(data_with_new_ratings, best_rank, seed=seed, 
                              iterations=iterations, lambda_=regularization_parameter)
tt = time() - t0

print ("New model trained in %s seconds" % round(tt,3))

New model trained in 27.352 seconds


## Generate top recommendations

In [0]:
new_user_ratings_ids = map(lambda x: x[1], new_user_ratings) 
# get just movie IDs
# keep just those not on the ID list
new_user_unrated_movies_RDD = (complete_movies_data.filter(lambda x: x[0] not in new_user_ratings_ids).map(lambda x: (new_user_ID, x[0])))

# Use the input RDD, new_user_unrated_movies_RDD, with new_ratings_model.predictAll() to predict new ratings for the movies
new_user_recommend = new_ratings_model.predictAll(new_user_unrated_movies_RDD)

In [0]:
# Transform new_user_recommend into pairs of the form (Movie ID, Predicted Rating)
new_user_recommend_rating = new_user_recommend.map(lambda x: (x.product, x.rating))
new_user_recommend_rating_title_and_count = \
    new_user_recommend_rating.join(complete_movies_titles).join(movie_rating_counts_RDD)
new_user_recommend_rating_title_and_count.take(3)

# print 25 movies with the highest predicted ratings. 
# Join them with the movies RDD to get the titles and ratings count in order to get movies with a minimum number of counts

[(150724, ((0.4713894177211366, 'Army Dog (2016)'), 1)),
 (131572, ((5.679704662552246, '"Isoroku Yamamoto'), 1)),
 (2744, ((6.381294336573095, 'Otello (1986)'), 21))]

In [0]:
# Flatten to get (Title, Rating, Ratings Count)
new_user_recommend_rating_title_and_count = \
    new_user_recommend_rating_title_and_count.map(lambda r: (r[1][0][1], r[1][0][0], r[1][1]))

In [0]:
# get highest rated recommendations for the new user, filtering out movies with less than 25 ratings
top_movies = new_user_recommend_rating_title_and_count.filter(lambda r: r[2]>=25).takeOrdered(25, key=lambda x: -x[1])

print ('TOP recommended movies (with more than 25 reviews):\n%s' %
        '\n'.join(map(str, top_movies)))

TOP recommended movies (with more than 25 reviews):
('Ikiru (1952)', 8.629515306061084, 163)
('Blue Planet II (2017)', 8.629401811333164, 36)
('"Great Day in Harlem', 8.576463585007765, 41)
('"Godfather', 8.575356301382742, 6393)
('Human Planet (2011)', 8.570723423867193, 32)
('"Lord of the Rings: The Fellowship of the Ring', 8.56647006545224, 6463)
('"Short Film About Killing', 8.562351201720922, 54)
('Wuthering Heights (1939)', 8.558613181198462, 29)
('"Lord of the Rings: The Return of the King', 8.55149734365465, 6058)
('Band of Brothers (2001)', 8.511805044926056, 104)
('"Godfather: Part II', 8.479937480598675, 4125)
('Heart of a Dog (Sobachye serdtse) (1988)', 8.466780498213808, 73)
('"Lord of the Rings: The Two Towers', 8.43858671691385, 5956)
("Schindler's List (1993)", 8.42692224937953, 7649)
('Planet Earth (2006)', 8.41412108150751, 134)
('"Decalogue', 8.396111633948545, 59)
('Harakiri (Seppuku) (1962)', 8.388904305929062, 60)
('The Godfather Trilogy: 1972-1990 (1992)', 8.3492

### Generate individual ratings

In [0]:
my_movie = sc.parallelize([(0, 500)]) # Quiz Show (1994)
individual_movie_rating = new_ratings_model.predictAll(new_user_unrated_movies_RDD)
individual_movie_rating.take(1)

[Rating(user=0, product=143464, rating=1.8997978509362683)]