In [29]:
import findspark
findspark.init()

In [30]:
import numpy as np
import pandas as pd
import seaborn as sb
import matplotlib.pyplot as plt
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType, LongType, StringType
from pyspark.sql import functions as f
from pyspark.sql.functions import split, explode
from pyspark.sql.functions import col, size, split, udf
from pyspark.mllib.recommendation import ALS
import os

In [31]:
# Creating spark session/
spark = SparkSession \
        .builder \
        .appName("Movie Recommendation") \
        .config("spark.some.config.option", "some-value") \
        .getOrCreate()

In [32]:
# Loading Datasets
movies_df = spark.read.load(r'C:\My Drive\UMBC\Code\BigDataProject\Project_Dataset\movies.csv', format = 'csv', header = True)
ratings_df = spark.read.load(r'C:\My Drive\UMBC\Code\BigDataProject\Project_Dataset\ratings.csv', format = 'csv', header = True)
tags_df = spark.read.load(r'C:\My Drive\UMBC\Code\BigDataProject\Project_Dataset\tags.csv', format = 'csv', header = True)
links_df = spark.read.load(r'C:\My Drive\UMBC\Code\BigDataProject\Project_Dataset\links.csv', format = 'csv', header = True)

In [33]:
# Looking at the schema of above dataframes
print("\n")
print("--" * 10, "\tMovies Schema\t", "--" * 10,)
movies_df.printSchema()

print("--" * 10, "\tRatings Schema\t", "--" * 10,)
ratings_df.printSchema()

print("--" * 10, "\tTags Schema\t", "--" * 10,)
tags_df.printSchema()

print("--" * 10, "\tLinks Schema\t", "--" * 10)
links_df.printSchema()



-------------------- 	Movies Schema	 --------------------
root
 |-- movieId: string (nullable = true)
 |-- title: string (nullable = true)
 |-- genres: string (nullable = true)

-------------------- 	Ratings Schema	 --------------------
root
 |-- userId: string (nullable = true)
 |-- movieId: string (nullable = true)
 |-- rating: string (nullable = true)
 |-- timestamp: string (nullable = true)

-------------------- 	Tags Schema	 --------------------
root
 |-- userId: string (nullable = true)
 |-- movieId: string (nullable = true)
 |-- tag: string (nullable = true)
 |-- timestamp: string (nullable = true)

-------------------- 	Links Schema	 --------------------
root
 |-- movieId: string (nullable = true)
 |-- imdbId: string (nullable = true)
 |-- tmdbId: string (nullable = true)



In [77]:
# For now, I'm gonna concentrate only on movies and ratings dataset.
# rating's field values such as userId and movieId and rating is of STRING datatype which I'll convert into int

ratings_df = ratings_df.select(ratings_df["userId"].cast("int"),
                               ratings_df["movieId"].cast("int"),
                               ratings_df["rating"].cast("float"),
                               ratings_df["timestamp"].cast("int"))

# Similar for the tags dataset
tags_df = tags_df.select(tags_df["userId"].cast("int"),
                         tags_df["movieId"].cast("int"),
                         tags_df["timestamp"].cast("int"))

# Similar for links dataset
links_df = links_df.select(links_df["movieId"].cast("int"),
                           links_df["imdbId"].cast("int"),
                           links_df["tmdbId"].cast("int"))

# Printing out the new Schema
print("\n")
print("--" * 10, "\tNew Movies Schema\t", "--" * 10,)
movies_df.printSchema()

print("--" * 10, "\tNew Ratings Schema\t", "--" * 10,)
ratings_df.printSchema()

print("--" * 10, "\tNew Tags Schema\t", "--" * 10,)
tags_df.printSchema()

print("--" * 10, "\tNew Links Schema\t", "--" * 10)
links_df.printSchema()



-------------------- 	New Movies Schema	 --------------------
root
 |-- movieId: integer (nullable = true)

-------------------- 	New Ratings Schema	 --------------------
root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: float (nullable = true)
 |-- timestamp: integer (nullable = true)

-------------------- 	New Tags Schema	 --------------------
root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- timestamp: integer (nullable = true)

-------------------- 	New Links Schema	 --------------------
root
 |-- movieId: integer (nullable = true)
 |-- imdbId: integer (nullable = true)
 |-- tmdbId: integer (nullable = true)



In [35]:
# Checking if any values are missing:
from pyspark.sql.functions import isnan, when, count, col

print("\n")
print("--" * 10, "\tMissing Values in movies\t", "--" * 10,)
movies_df.select([count(when(isnan(c), c)).alias(c) for c in movies_df.columns]).show()

print("\n")
print("--" * 10, "\tMissing Values in Ratings\t", "--" * 10,)
ratings_df.select([count(when(isnan(c), c)).alias(c) for c in ratings_df.columns]).show()

print("\n")
print("--" * 10, "\tMissing Values in tags\t", "--" * 10,)
tags_df.select([count(when(isnan(c), c)).alias(c) for c in tags_df.columns]).show()

print("\n")
print("--" * 10, "\tMissing Values in Links\t", "--" * 10,)
links_df.select([count(when(isnan(c), c)).alias(c) for c in links_df.columns]).show()



-------------------- 	Missing Values in movies	 --------------------
+-------+-----+------+
|movieId|title|genres|
+-------+-----+------+
|      0|    0|     0|
+-------+-----+------+



-------------------- 	Missing Values in Ratings	 --------------------
+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     0|      0|     0|        0|
+------+-------+------+---------+



-------------------- 	Missing Values in tags	 --------------------
+------+-------+---------+
|userId|movieId|timestamp|
+------+-------+---------+
|     0|      0|        0|
+------+-------+---------+



-------------------- 	Missing Values in Links	 --------------------
+-------+------+------+
|movieId|imdbId|tmdbId|
+-------+------+------+
|      0|     0|     0|
+-------+------+------+



In [36]:
# Our dataset have no missing values... Great!
# Let's do some exploratory analysis to ge better insights of our dataset

#print(ratings_df.count(), len(ratings_df.columns))
print("Number of users who rated movies: ", ratings_df.select('userId').distinct().count())
print("Number of movies in the dataset: ", movies_df.select('movieId').distinct().count())

Number of users who rated movies:  610
Number of movies in the dataset:  9742


In [37]:
# creating interface for spark SQL

movies_df.createOrReplaceTempView("Movies")
ratings_df.createOrReplaceTempView("Ratings")

numberOfMoviesRated = spark.sql("SELECT COUNT(DISTINCT(Movies.movieId)) AS movies \
                                 FROM Movies \
                                 LEFT JOIN Ratings \
                                 ON Movies.movieId = Ratings.movieId \
                                 WHERE Ratings.rating IS NOT NULL")
numberOfMoviesRated.show(truncate = False)

unrated_movies_count = spark.sql("SELECT COUNT(DISTINCT(Movies.movieId)) AS unrated_count \
                                                       FROM MOVIES \
                                                       LEFT JOIN Ratings \
                                                       ON Movies.movieId = Ratings.movieId \
                                                       WHERE Ratings.rating IS NULL")

unrated_movies_title = spark.sql("SELECT Movies.title AS unrated_movie_title \
                                                       FROM MOVIES \
                                                       LEFT JOIN Ratings \
                                                       ON Movies.movieId = Ratings.movieId \
                                                       WHERE Ratings.rating IS NULL")
unrated_movies_title.show(truncate = False)

rating_details = spark.sql("SELECT Movies.title, COUNT(Ratings.rating) AS numberOfRating \
                            FROM Movies \
                            LEFT JOIN Ratings \
                            ON Movies.movieId = Ratings.movieId \
                            GROUP BY Movies.title \
                            ORDER BY numberOfRating")
rating_details.toPandas()
print("Number of movies which is rated only one time: ", rating_details[rating_details["numberOfRating"] < 2].count())

+------+
|movies|
+------+
|9724  |
+------+

+--------------------------------------------+
|unrated_movie_title                         |
+--------------------------------------------+
|Innocents, The (1961)                       |
|Niagara (1953)                              |
|For All Mankind (1989)                      |
|Color of Paradise, The (Rang-e khoda) (1999)|
|I Know Where I'm Going! (1945)              |
|Chosen, The (1981)                          |
|Road Home, The (Wo de fu qin mu qin) (1999) |
|Scrooge (1970)                              |
|Proof (1991)                                |
|Parallax View, The (1974)                   |
|This Gun for Hire (1942)                    |
|Roaring Twenties, The (1939)                |
|Mutiny on the Bounty (1962)                 |
|In the Realms of the Unreal (2004)          |
|Twentieth Century (1934)                    |
|Call Northside 777 (1948)                   |
|Browning Version, The (1951)                |
|Chalet Girl (

In [58]:
sc = spark.sparkContext
rating_df_new = sc.textFile(r'C:\My Drive\UMBC\Code\BigDataProject\Project_Dataset\ratings.csv')
header = rating_df_new.take(1)[0]
rating_data = rating_df_new.filter(lambda line: line!=header).\
              map(lambda line: line.split(",")).\
              map(lambda tokens: (int(tokens[0]), int(tokens[1]), float(tokens[2]))).cache()
rating_data.take(3)

# print("-" * 60)
# print('There are {} instances of rating'.format(rating_data.count()))

[(1, 1, 4.0), (1, 3, 4.0), (1, 6, 4.0)]

In [66]:
movie_df_new = sc.textFile(r'C:\My Drive\UMBC\Code\BigDataProject\Project_Dataset\movies.csv')
header = movie_df_new.take(1)[0]

movie_data = movie_df_new.filter(lambda line: line!=header)\
    .map(lambda line: line.split(",")).map(lambda tokens: (tokens[0],tokens[1])).cache()
    
movie_data.take(3)

# print("-" * 60)
# print('There are {} instances of Movies'.format(movie_data.count()))

[('1', 'Toy Story (1995)'),
 ('2', 'Jumanji (1995)'),
 ('3', 'Grumpier Old Men (1995)')]

In [50]:
train_RDD, valid_RDD, test_RDD = rating_data.randomSplit([0.6, 0.2, 0.2], seed = 42)
valid_RDD_forPrediction = valid_RDD.map(lambda x: (x[0], x[1]))
test_RDD_forPrediction = test_RDD.map(lambda x: (x[0], x[1]))

In [78]:
from pyspark.mllib.recommendation import ALS
import math

seed = 42
iterations = 10
regularization_parameter = 0.1
ranks = [4, 8, 12]
errors = [0, 0, 0]
err = 0
tolerance = 0.02

min_error = float('inf')
optimal_rank = -1
optimal_iteration = -1

for rank in ranks:
    model = ALS.train(train_RDD, rank, seed = seed, iterations = iterations,
                      lambda_ = regularization_parameter)
    predictions = model.predictAll(valid_RDD_forPrediction).map(lambda r: ((r[0], r[1]), r[2]))
    rates_and_preds = valid_RDD.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions)
    error = math.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())
    errors[err] = error
    err += 1
    print('For rank {} the RMSE is {}'.format(rank, error))
    
    if error < min_error:
        min_error = error
        print('Current value of min_error', min_error)
        best_rank = rank

print("-" * 50)
print('The best model was trained with rank {}'.format(best_rank))

For rank 4 the RMSE is 0.9058043327998001
Current value of min_error 0.9058043327998001
For rank 8 the RMSE is 0.9156655436588836
For rank 12 the RMSE is 0.9078411395548683
--------------------------------------------------
The best model was trained with rank 4


In [54]:
predictions.take(3)

[((368, 3272), 2.2103941239381957),
 ((603, 3272), 2.7094106274500698),
 ((414, 3272), 3.3172783499986758)]

In [55]:
rates_and_preds.take(3)

[((1, 367), (4.0, 3.8778425793950486)),
 ((1, 553), (5.0, 4.48872221960911)),
 ((1, 673), (3.0, 2.9687504298721707))]

In [56]:
model = ALS.train(train_RDD, best_rank, seed = seed, iterations = iterations,
                      lambda_ = regularization_parameter)
predictions = model.predictAll(test_RDD_forPrediction).map(lambda r: ((r[0], r[1]), r[2]))
rates_and_preds = test_RDD.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions)
error = math.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())
    
print('For testing data the RMSE is {}'.format(error))

For testing data the RMSE is 0.9048780956510838


In [61]:
new_user_ID = 0

# (userID, movieID, rating)
new_user_ratings = [
     (0,260,4), # Star Wars (1977)
     (0,1,3), # Toy Story (1995)
     (0,16,3), # Casino (1995)
     (0,25,4), # Leaving Las Vegas (1995)
     (0,32,4), # Twelve Monkeys (a.k.a. 12 Monkeys) (1995)
     (0,335,1), # Flintstones, The (1994)
     (0,379,1), # Timecop (1994)
     (0,296,3), # Pulp Fiction (1994)
     (0,858,5) , # Godfather, The (1972)
     (0,50,4) # Usual Suspects, The (1995)
    ]
new_user_ratings_RDD = sc.parallelize(new_user_ratings)
print("New user ratings: %s" % new_user_ratings_RDD.take(10))

New user ratings: [(0, 260, 4), (0, 1, 3), (0, 16, 3), (0, 25, 4), (0, 32, 4), (0, 335, 1), (0, 379, 1), (0, 296, 3), (0, 858, 5), (0, 50, 4)]


In [62]:
new_rating_data_RDD = rating_data.union(new_user_ratings_RDD)

In [63]:
from time import time

start_time = time()
new_model = ALS.train(new_rating_data_RDD, best_rank, seed = seed, iterations = iterations, lambda_ = regularization_parameter)
stop_time = time()

print("Model training time: ", stop_time - start_time)

Model training time:  19.06317687034607


In [65]:
rated_movie_ids = map(lambda x: x[1], new_user_ratings)
unrated_movies_rdd = (movie_data.filter(lambda x: x[0] not in rated_movie_ids).map(lambda x: (new_user_ID, x[0])))

new_user_recommendations = new_model.predictAll(unrated_movies_rdd)

In [68]:
def countAndAvg(idRatingTuple):
    nratings = len(idRatingTuple[1])
    return idRatingTuple[0], (nratings, float(sum(x for x in idRatingTuple[1])) / nratings)

movie_titles = movie_data.map(lambda x: (int(x[0]), x[1]))
movie_ID_ratings_RDD = (rating_data.map(lambda x: (x[1], x[2])).groupByKey())
movie_ID_ratings_RDD_avg = movie_ID_ratings_RDD.map(countAndAvg)
movie_rating_counts_RDD = movie_ID_ratings_RDD_avg.map(lambda x: (x[0], x[1][0]))

In [69]:
recommendation_RDD = new_user_recommendations.map(lambda x: (x.product, x.rating))
recommendation_RDD_titleAndCount = recommendation_RDD.join(movie_titles).join(movie_rating_counts_RDD)
recommendation_RDD_titleAndCount.take(3)

[(69720, ((1.5170735890918814, 'Hood of Horror (2006)'), 1)),
 (3240, ((2.30472879362691, '"Big Tease'), 2)),
 (98160, ((1.304571892555307, 'Nature Calls (2012)'), 1))]

In [70]:
recommendation_RDD_titleAndCount = recommendation_RDD_titleAndCount.map(lambda r: (r[1][0][1], r[1][0][0], r[1][1]))

In [71]:
recommendation_RDD_titleAndCount.take(3)

[('Hood of Horror (2006)', 1.5170735890918814, 1),
 ('"Big Tease', 2.30472879362691, 2),
 ('Nature Calls (2012)', 1.304571892555307, 1)]

In [72]:
top_movies_to_recommend = recommendation_RDD_titleAndCount.filter(lambda r: r[2] > 25).takeOrdered(25, key = lambda x: -x[1])

In [73]:
print('Top Recommended movies: \n%s' % '\n'.join(map(str, top_movies_to_recommend)))

Top Recommended movies: 
('"Producers', 4.1968157790109775, 33)
('Chinatown (1974)', 4.195708392289013, 59)
('Citizen Kane (1941)', 4.178241501170903, 69)
('Brazil (1985)', 4.168895069399608, 59)
('Raging Bull (1980)', 4.155766137731009, 40)
('Dr. Strangelove or: How I Learned to Stop Worrying and Love the Bomb (1964)', 4.135269753625976, 97)
('Old Boy (2003)', 4.116536880319541, 39)
('"Godfather', 4.11623343163422, 192)
('Apocalypse Now (1979)', 4.111207228001691, 107)
('Seven Samurai (Shichinin no samurai) (1954)', 4.106498543422082, 48)
('Annie Hall (1977)', 4.105297111700647, 58)
('"Boot', 4.0916875545231175, 40)
('"Godfather: Part II', 4.086838530578542, 129)
('There Will Be Blood (2007)', 4.071615674061283, 28)
('Pulp Fiction (1994)', 4.055605924818113, 307)
('12 Angry Men (1957)', 4.041446411778798, 57)
('Blue Velvet (1986)', 4.036994733901926, 46)
('Birdman: Or (The Unexpected Virtue of Ignorance) (2014)', 4.030885044493526, 26)
('Rear Window (1954)', 4.015692872421883, 84)
('D