# Movie Recommendation using PySpark

### Importing the libraries

In [1]:
import os
import math
from pyspark.mllib.recommendation import ALS

### Preparing RDD for movie ratings

In [2]:
ratings_file_path = os.path.join(os.getcwd(), 'ml-latest-small', 'ratings.csv')
ratings_raw_data = sc.textFile(ratings_file_path)
ratings_header = ratings_raw_data.take(1)[0]
ratings_data_RDD = ratings_raw_data.filter(lambda line: line != ratings_header).map(lambda line: line.split(",")).map(lambda tokens: (tokens[0], tokens[1], tokens[2])).cache()

### Preparing RDD for movies list

In [3]:
movies_file_path = os.path.join(os.getcwd(), 'ml-latest-small', 'movies.csv')
movies_raw_data = sc.textFile(movies_file_path)
movies_header = movies_raw_data.take(1)[0]
movies_data_RDD = movies_raw_data.filter(lambda line: line != movies_header).map(lambda line: line.split(",")).map(lambda tokens: (tokens[0], tokens[1])).cache()

### Visualising ratings and movie RDD

In [4]:
print('\t'.join(ratings_header.split(',')[:-1]))
for i in ratings_data_RDD.take(5):
    print('{}\t{}\t{}'.format(i[0], i[1], i[2]))

userId	movieId	rating
1	1	4.0
1	3	4.0
1	6	4.0
1	47	5.0
1	50	5.0


In [5]:
print('\t'.join(movies_header.split(',')[:-1]))
for i in movies_data_RDD.take(5):
    print('{}\t{}'.format(i[0], i[1]))

movieId	title
1	Toy Story (1995)
2	Jumanji (1995)
3	Grumpier Old Men (1995)
4	Waiting to Exhale (1995)
5	Father of the Bride Part II (1995)


### Splitting ratings RDD into training, testing and validation set

In [6]:
training_RDD, validation_RDD, testing_RDD = ratings_data_RDD.randomSplit([0.6, 0.2, 0.2])
validation_predict_RDD = validation_RDD.map(lambda r: (r[0], r[1]))
testing_predict_RDD = testing_RDD.map(lambda r: (r[0], r[1]))

In [7]:
print('Size of training RDD: {}'.format(training_RDD.count()))
print('Size of validation RDD: {}'.format(validation_RDD.count()))
print('Size of testing RDD: {}'.format(testing_RDD.count()))

Size of training RDD: 60438
Size of validation RDD: 20198
Size of testing RDD: 20200


### Finding best rank for ALS

In [8]:
seed, iterations, lr = 5, 10, 0.1
errors, err, min_error = [0, 0, 0], 0, float('inf')
ranks, best_rank = [4, 8, 12], -1

for rank in ranks:
    model = ALS.train(training_RDD, rank, seed=seed, iterations=iterations, lambda_=lr)
    predictions = model.predictAll(validation_predict_RDD).map(lambda r: ((r[0], r[1]), r[2]))
    ratings_and_predictions = validation_RDD.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions)
    error = math.sqrt(ratings_and_predictions.map(lambda r: (r[1][0] - r[1][1]) ** 2).mean())
    errors[err] = error
    err += 1
    if error < min_error:
        best_rank, min_error = rank, error
    print('Rank: {}\tRMSE Error: {}'.format(rank, error))
print('-' * 25)
print('Best Rank: {}'.format(best_rank))

Rank: 4	RMSE Error: 0.9218659169501069
Rank: 8	RMSE Error: 0.9160156470726885
Rank: 12	RMSE Error: 0.9241499515046814
-------------------------
Best Rank: 8


### Viewing predictions results and comparison with actual ratings

In [10]:
print('userId\tmovieId\tPredicted Rating')
for i in predictions.take(5):
    print('{}\t{}\t{}'.format(i[0][0], i[0][1], round(i[1], 2)))

userId	movieId	Predicted Rating
140	1084	4.01
32	1084	4.51
309	1084	4.09
177	1084	3.79
474	1084	3.77


In [12]:
print('userId\tmovieId\tActual Rating\tPredicted Rating')
for i in ratings_and_predictions.take(5):
    print('{}\t{}\t{}\t\t{}'.format(i[0][0], i[0][1], i[1][0], round(i[1][1], 2)))

userId	movieId	Actual Rating	Predicted Rating
1	47	5.0		4.66
1	367	4.0		3.32
1	457	5.0		4.88
1	1625	5.0		4.32
1	2137	5.0		4.55


### Building the model

In [13]:
model = ALS.train(training_RDD, best_rank, seed=seed, iterations=iterations, lambda_=lr)
predictions = model.predictAll(testing_predict_RDD).map(lambda r: ((r[0], r[1]), r[2]))
ratings_and_predictions = testing_RDD.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions)
error = math.sqrt(ratings_and_predictions.map(lambda r: (r[1][0] - r[1][1]) ** 2).mean())
print('Error on test set: {}'.format(error))

Error on test set: 0.9099272124697338


## Making Recommendations

### Loading Data and restructuring

In [14]:
ratings_file_path = os.path.join(os.getcwd(), 'ml-latest-small', 'ratings.csv')
ratings_raw_data = sc.textFile(ratings_file_path)
ratings_header = ratings_raw_data.take(1)[0]
complete_ratings_data = ratings_raw_data.filter(lambda line: line != ratings_header).map(lambda line: line.split(",")).map(lambda tokens: (int(tokens[0]), int(tokens[1]), float(tokens[2]))).cache()

movies_file_path = os.path.join(os.getcwd(), 'ml-latest-small', 'movies.csv')
movies_raw_data = sc.textFile(movies_file_path)
movies_header = movies_raw_data.take(1)[0]
complete_movies_data = movies_raw_data.filter(lambda line: line != movies_header).map(lambda line: line.split(",")).map(lambda tokens: (int(tokens[0]), tokens[1], tokens[2])).cache()
complete_movies_titles = complete_movies_data.map(lambda x: (int(x[0]),x[1]))

### Counting ratings for each movie and finding average rating

In [16]:
def get_counts_and_averages(id_and_rating_tuple):
    n = len(id_and_rating_tuple[1])
    return id_and_rating_tuple[0], (n, float(sum(x for x in id_and_rating_tuple[1])) / n)

movie_ID_with_ratings_RDD = (complete_ratings_data.map(lambda x: (x[1], x[2])).groupByKey())
movie_ID_with_avg_ratings_RDD = movie_ID_with_ratings_RDD.map(get_counts_and_averages)
movie_rating_counts_RDD = movie_ID_with_avg_ratings_RDD.map(lambda x: (x[0], x[1][0]))

In [18]:
print('movieId\tNo of Reviews\tAvg Rating')
for i in movie_ID_with_avg_ratings_RDD.take(5):
    print('{}\t{}\t\t{}'.format(i[0], i[1][0], round(i[1][1], 2)))

movieId	No of Reviews	Avg Rating
6	102		3.95
50	204		4.24
70	55		3.51
110	237		4.03
216	49		3.33


### Adding new user to make recommendations

In [19]:
new_user_id = 0
new_user_ratings = [
    (0,260,4),
    (0,1,3),
    (0,16,3),
    (0,25,4),
    (0,32,4),
    (0,335,1),
    (0,379,1),
    (0,296,3),
    (0,858,5),
    (0,50,4)
]
new_user_ratings_RDD = sc.parallelize(new_user_ratings)

### Merging new user data with existing data and training the model

In [20]:
complete_data_with_new_ratings_RDD = complete_ratings_data.union(new_user_ratings_RDD)
new_model = ALS.train(complete_data_with_new_ratings_RDD, best_rank, seed=seed, iterations=iterations, lambda_=lr)

In [21]:
new_user_ratings_ids = map(lambda x: x[1], new_user_ratings)
new_user_unrated_movies_RDD = (complete_movies_data.filter(lambda x: x[0] not in new_user_ratings_ids).map(lambda x: (new_user_id, x[0])))
new_user_recommendations_RDD = new_model.predictAll(new_user_unrated_movies_RDD)

In [22]:
new_user_recommendations_rating_RDD = new_user_recommendations_RDD.map(lambda x: (x.product, x.rating))
new_user_recommendations_rating_title_and_count_RDD = new_user_recommendations_rating_RDD.join(complete_movies_titles).join(movie_rating_counts_RDD)
new_user_recommendations_rating_title_and_count_RDD = new_user_recommendations_rating_title_and_count_RDD.map(lambda r: (r[1][0][1], r[1][0][0], r[1][1]))

In [23]:
top_movies = new_user_recommendations_rating_title_and_count_RDD.filter(lambda r: r[2]>=25).takeOrdered(25, key=lambda x: -x[1])
print('Top 25 Movies According To User Preferences:')
print('-' * 50)
for movie in top_movies:
    print(movie[0].replace('"', ''))

Top 25 Movies According To User Preferences:
--------------------------------------------------
Killing Fields
Producers
Godfather
Citizen Kane (1941)
Monty Python and the Holy Grail (1975)
North by Northwest (1959)
Philadelphia Story
Godfather: Part II
12 Angry Men (1957)
Big Lebowski
Bridge on the River Kwai
Chinatown (1974)
Fargo (1996)
Casablanca (1942)
Schindler's List (1993)
Great Escape
Lawrence of Arabia (1962)
Apocalypse Now (1979)
Strangers on a Train (1951)
Raging Bull (1980)
Boot
Wallace & Gromit: The Best of Aardman Animation (1996)
Cool Hand Luke (1967)
Hoop Dreams (1994)
Election (1999)
