In [1]:
#Movie Recommendation Engine, NLP and Exploratory Analysis.
#Part 1: ALS on MovieLens Ratings Dataset to acquire recommendations based on ratings
#Part 2: Aggregate movies comments into recommended movies from IMDb/Amazon Movie reviews dataset
#Part 3: Sentiment Analysis on comments and sorting recommendations based on sentiment score

In [2]:
#Part 1: ALS on MovieLens Ratings Dataset to acquire recommendations based on ratings

import os
from pyspark import SparkContext
sc = SparkContext()

filepath='gs://homework0bucket/datasets/ml-latest-small'

sample_ratings = os.path.join(filepath, 'ml-latest-small', 'ratings.csv')

#Converting CSV file with ratings into textfile and parsing into RDD
sample_ratings_raw = sc.textFile(sample_ratings)
sample_ratings_raw_header = sample_ratings_raw.take(1)[0]

sample_ratings_data = sample_ratings_raw.filter(lambda line: line!=sample_ratings_raw_header).map(lambda line: line.split(",")).map(lambda tokens: (tokens[0],tokens[1],tokens[2])).cache()

#Repeating above process  for CSV file with movie names and identifiers

sample_movies = os.path.join(filepath, 'ml-latest-small', 'movies.csv')

sample_movies_raw_data = sc.textFile(sample_movies)
sample_movies_raw_data_header = sample_movies_raw_data.take(1)[0]

small_movies_data = sample_movies_raw_data.filter(lambda line: line!=sample_movies_raw_data_header).map(lambda line: line.split(",")).map(lambda tokens: (tokens[0],tokens[1])).cache()


In [3]:
#Beginning setup for ALS Matrix Factorization Model

TrainingRDD, ValidateRDD, TestRDD = sample_ratings_data.randomSplit([6, 2, 2], seed=0)
validation_predictRDD = ValidateRDD.map(lambda x: (x[0], x[1]))
prediction_testingRDD = TestRDD.map(lambda x: (x[0], x[1]))

#Defining ALS parameters to find optimal rank with best RMSE score

from pyspark.mllib.recommendation import ALS
import math

seed = 5
iterations = 10
reg_parameter = 0.1
ranks = [4, 8, 12]
errors = [0, 0, 0]
err = 0
tolerance = 0.02

min_error = float('inf')
best_rank = -1
best_iteration = -1
for rank in ranks:
    matrix_factorization_model = ALS.train(TrainingRDD, rank, seed=seed, iterations=iterations,
                      lambda_=reg_parameter)
    predicted_ratings = matrix_factorization_model.predictAll(validation_predictRDD).map(lambda r: ((r[0], r[1]), r[2]))
    ratings_and_predictions = ValidateRDD.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predicted_ratings)
    error = math.sqrt(ratings_and_predictions.map(lambda r: (r[1][0] - r[1][1])**2).mean())
    errors[err] = error
    err += 1
    print ("Using Rank %s results in an  RMSE of %s" % (rank, error))
    if error < min_error:
        min_error = error
        best_rank = rank

print ("Optimal Rank is %s" % best_rank)

predicted_ratings.take(3)

ratings_and_predictions.take(3)

#Testing  the ALS Model

matrix_factorization_model = ALS.train(TrainingRDD, best_rank, seed=seed, iterations=iterations,
                      lambda_=reg_parameter)
predicted_ratings = matrix_factorization_model.predictAll(prediction_testingRDD).map(lambda r: ((r[0], r[1]), r[2]))
ratings_and_predictions = TestRDD.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predicted_ratings)
error = math.sqrt(ratings_and_predictions.map(lambda r: (r[1][0] - r[1][1])**2).mean())
    
print ("RMSE for testing data from smaple dataset is %s" % (error))

Using Rank 4 results in an  RMSE of 0.9080781059018078
Using Rank 8 results in an  RMSE of 0.9164629732715387
Using Rank 12 results in an  RMSE of 0.9176650314972615
Optimal Rank is 4
RMSE for testing data from smaple dataset is 0.9113780952339309


In [4]:
#Now that we have the ALS model ready, we can proceed by loading the complete dataset to generate recommendations for user

ratings_file = os.path.join(filepath, 'ml-latest', 'ratings.csv')
ratings_raw_data = sc.textFile(ratings_file)
ratings_raw_data_header = ratings_raw_data.take(1)[0]

# Parse
ratings_data = ratings_raw_data.filter(lambda line: line!=ratings_raw_data_header).map(lambda line: line.split(",")).map(lambda tokens: (int(tokens[0]),int(tokens[1]),float(tokens[2]))).cache()
    
print ("There are %s recommendations in the complete dataset" % (ratings_data.count()))

There are 27753444 recommendations in the complete dataset


In [None]:
#Retraining model with the complete dataset

TrainingRDD, TestRDD = ratings_data.randomSplit([7, 3], seed=0)

complete_matrix_model = ALS.train(TrainingRDD, best_rank, seed=seed, iterations=iterations, lambda_=reg_parameter)

#Testing it again

prediction_testingRDD = TestRDD.map(lambda x: (x[0], x[1]))

predicted_ratings = complete_matrix_model.predictAll(prediction_testingRDD).map(lambda r: ((r[0], r[1]), r[2]))
ratings_and_predictions = TestRDD.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predicted_ratings)
error = math.sqrt(ratings_and_predictions.map(lambda r: (r[1][0] - r[1][1])**2).mean())
    
print ("RMSE for testing the model with complete data is %s" % (error))

#ALS Model is ready. Also note that using the compete data set with more reference points generates a better RMSE score

RMSE for testing the model with complete data is 0.8310675821411432


In [None]:
#We can now start defining the engine using  the ALS model as the core

movies_file = os.path.join(filepath, 'ml-latest', 'movies.csv')
movies_raw_data = sc.textFile(movies_file)
movies_raw_data_header = movies_raw_data.take(1)[0]

# Parse
movies_data = movies_raw_data.filter(lambda line: line!=movies_raw_data_header)\
    .map(lambda line: line.split(",")).map(lambda tokens: (int(tokens[0]),tokens[1],tokens[2])).cache()

movies_titles = movies_data.map(lambda x: (int(x[0]),x[1]))
    
print ("Number of movies in dataset: %s" % (movies_titles.count()))

#Counting number of ratings for a movie. Only movies with a certain number of recommendations will be used.

def rating_counter_and_avg(ID_and_ratings_tuple):
    nratings = len(ID_and_ratings_tuple[1])
    return ID_and_ratings_tuple[0], (nratings, float(sum(x for x in ID_and_ratings_tuple[1]))/nratings)

movieID_ratings_RDD = (ratings_data.map(lambda x: (x[1], x[2])).groupByKey())
movieID_average_ratings_RDD = movieID_ratings_RDD.map(rating_counter_and_avg)
ratings_count_RDD = movieID_average_ratings_RDD.map(lambda x: (x[0], x[1][0]))

#Engine is defined and ready to accept user inputs

Number of movies in dataset: 58098


In [None]:
#adding new user manually to test engine. Front end API will accept inputs in finished model

new_user_ID = 0

# The format of each line is (userID, movieID, rating)
new_user_ratings = [
     (0,260,9), # Star Wars (1977)
     (0,1,8), # Toy Story (1995)
     (0,16,7), # Casino (1995)
     (0,25,8), # Leaving Las Vegas (1995)
     (0,32,9), # Twelve Monkeys (a.k.a. 12 Monkeys) (1995)
     (0,335,4), # Flintstones, The (1994)
     (0,379,3), # Timecop (1994)
     (0,296,7), # Pulp Fiction (1994)
     (0,858,10) , # Godfather, The (1972)
     (0,50,8) # Usual Suspects, The (1995)
    ]
new_user_ratings_RDD = sc.parallelize(new_user_ratings)
print ("New user ratings: %s" % new_user_ratings_RDD.take(10))

#adding new user ratings to dataframe
complete_data_with_new_ratings_RDD = ratings_data.union(new_user_ratings_RDD)

#new user ratings have been added

New user ratings: [(0, 260, 9), (0, 1, 8), (0, 16, 7), (0, 25, 8), (0, 32, 9), (0, 335, 4), (0, 379, 3), (0, 296, 7), (0, 858, 10), (0, 50, 8)]


In [None]:
#Generating recommendations for the new user

#training the ALS matrix model core to identify user preferences
from time import time

t0 = time()
new_ratings_matrix_model = ALS.train(complete_data_with_new_ratings_RDD, best_rank, seed=seed, 
                              iterations=iterations, lambda_=reg_parameter)
tt = time() - t0

print ("New model trained in %s seconds" % round(tt,3))


new_user_ratings_ids = map(lambda x: x[1], new_user_ratings)
new_user_unrated_movies_RDD = (movies_data.filter(lambda x: x[0] not in new_user_ratings_ids).map(lambda x: (new_user_ID, x[0])))
new_user_recommendations_RDD = new_ratings_matrix_model.predictAll(new_user_unrated_movies_RDD)

# Transform new_user_recommendations_RDD into pairs of the form (Movie ID, Predicted Rating)
new_user_recommendations_rating_RDD = new_user_recommendations_RDD.map(lambda x: (x.product, x.rating))
new_user_recommendations_rating_title_and_count_RDD = \
    new_user_recommendations_rating_RDD.join(movies_titles).join(ratings_count_RDD)
new_user_recommendations_rating_title_and_count_RDD.take(3)

new_user_recommendations_rating_title_and_count_RDD = \
    new_user_recommendations_rating_title_and_count_RDD.map(lambda r: (r[1][0][1], r[1][0][0], r[1][1]))

#filtering out movies with less than 25 ratings. This cell can also be used to filter recommendations based on genre and dates.

top_movies = new_user_recommendations_rating_title_and_count_RDD.filter(lambda r: r[2]>=25).takeOrdered(25, key=lambda x: -x[1])

print ('TOP recommended movies (with more than 25 reviews):\n%s' %
        '\n'.join(map(str, top_movies)))

#End of part 1


New model trained in 252.227 seconds
TOP recommended movies (with more than 25 reviews):
('Connections (1978)', 9.589850272368153, 49)
('Music for One Apartment and Six Drummers (2001)', 9.113976248583668, 31)
('"Lonely Wife', 9.044589041529733, 43)
('Strangers in Good Company (1990)', 9.032467453870453, 26)
('"Human Condition III', 8.941111300670922, 91)
('Harakiri (Seppuku) (1962)', 8.940563136081561, 679)
('"In the blue sea', 8.924261294747879, 37)
('"Dylan Moran: Yeah', 8.902586629245722, 81)
('"Hollow Crown', 8.863678371058903, 36)
('Planet Earth (2006)', 8.853181667310956, 1384)
("Won't You Be My Neighbor? (2018)", 8.840681254090555, 83)
('Blue Planet II (2017)', 8.838191985348487, 349)
('Ikiru (1952)', 8.833728045119933, 1551)
('Patton Oswalt: Werewolves and Lollipops (2007)', 8.832686105760956, 56)
('Planet Earth II (2016)', 8.830913001011101, 853)
('Duck Amuck (1953)', 8.83054122822869, 226)
('"Personal Journey with Martin Scorsese Through American Movies', 8.825504727300359, 

In [None]:
#predicting user ratings for given user ID

#my_movie = sc.parallelize([(0, 500)]) 
#individual_movie_rating_RDD = new_ratings_matrix_model.predictAll(new_user_unrated_movies_RDD)
#individual_movie_rating_RDD.take(1)