<a href="https://colab.research.google.com/github/SAIGANESH02/Movie-Recommendation-System/blob/main/MovieRecomendationSystem.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import findspark
findspark.init()

In [None]:
# Here we are creating a spark context
from pyspark import SparkContext
sc = SparkContext.getOrCreate()
# Loading the raw ratings data from 100k dataset into spark as a RDD
small_ratings_file = r"C:\Users\Vamsi Reddy\SparkProj\MoviLenseRecomend\Datasets\ml-latest-small\ratings.csv"
small_ratings_raw_data = sc.textFile(small_ratings_file)
small_ratings_raw_data_header = small_ratings_raw_data.take(1)[0]

In [None]:
# Now we can parse the raw ratings data into a new RDD. of the form (UserID, MovieID, Rating)
small_ratings_data = small_ratings_raw_data.filter(lambda line: line!=small_ratings_raw_data_header).map(lambda line: line.split(",")).map(lambda tokens: (tokens[0],tokens[1],tokens[2])).cache()

In [None]:
# Printing to verify 
small_ratings_data.take(3)

[('1', '1', '4.0'), ('1', '3', '4.0'), ('1', '6', '4.0')]

In [None]:
# Loading the raw movies.csv data from 100k dataset into spark as a RDD
small_movies_file = r"C:\Users\Vamsi Reddy\SparkProj\MoviLenseRecomend\Datasets\ml-latest-small\movies.csv"

small_movies_raw_data = sc.textFile(small_movies_file)
small_movies_raw_data_header = small_movies_raw_data.take(1)[0]

# Now we can parse the raw ratings data into a new RDD. of the form (MovieID, Title)
small_movies_data = small_movies_raw_data.filter(lambda line: line!=small_movies_raw_data_header).map(lambda line: line.split(",")).map(lambda tokens: (tokens[0],tokens[1])).cache()

# Printing to verify 
small_movies_data.take(3)

[('1', 'Toy Story (1995)'),
 ('2', 'Jumanji (1995)'),
 ('3', 'Grumpier Old Men (1995)')]

In [None]:
# We split ratings dataset into train, validation, and test datasets.
# This is done because we use this to get the best ALS parameters 
training_RDD, validation_RDD, test_RDD = small_ratings_data.randomSplit([6, 2, 2], seed=0)
validation_for_predict_RDD = validation_RDD.map(lambda x: (x[0], x[1]))
test_for_predict_RDD = test_RDD.map(lambda x: (x[0], x[1]))

In [None]:
# Using validation_rdd to know the best number of latent factors for the dataset we have
from pyspark.mllib.recommendation import ALS
import math

seed = 5
iterations = 10
regularization_parameter = 0.1
ranks = [4, 8, 12]
errors = [0, 0, 0]
err = 0
tolerance = 0.02

min_error = float('inf')
best_rank = -1
best_iteration = -1
for rank in ranks:
    model = ALS.train(training_RDD, rank, seed=seed, iterations=iterations,
                      lambda_=regularization_parameter)
    predictions = model.predictAll(validation_for_predict_RDD).map(lambda r: ((r[0], r[1]), r[2]))
    rates_and_preds = validation_RDD.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions)
    error = math.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())
    errors[err] = error
    err += 1
    print('For rank %s the RMSE is %s' % (rank, error))
    if error < min_error:
        min_error = error
        best_rank = rank

print('The best model was trained with rank %s' % best_rank)

For rank 4 the RMSE is 0.8973056100718643
For rank 8 the RMSE is 0.9143149069672253
For rank 12 the RMSE is 0.9141049207539428
The best model was trained with rank 4


In [None]:
# printing and verifying
validation_for_predict_RDD.take(2)

[('1', '6'), ('1', '47')]

In [None]:
# printing and verifying 
predictions.take(2)

[((249, 69069), 3.3365203516084856), ((68, 69069), 3.1620382534666387)]

In [None]:
# Load the complete dataset file
complete_ratings_file = r"C:\Users\Vamsi Reddy\SparkProj\MoviLenseRecomend\Datasets\ml-latest\ratings.csv"
complete_ratings_raw_data = sc.textFile(complete_ratings_file)
complete_ratings_raw_data_header = complete_ratings_raw_data.take(1)[0]

In [None]:
# Parsing the data same way as 100k dataset
complete_ratings_data = complete_ratings_raw_data.filter(lambda line: line!=complete_ratings_raw_data_header).map(lambda line: line.split(",")).map(lambda tokens: (int(tokens[0]),int(tokens[1]),float(tokens[2]))).cache()
print("There are %s recommendations in the complete dataset" % (complete_ratings_data.count()))

There are 27753444 recommendations in the complete dataset


In [None]:
# splitting the complete data using training and testing dataset and Building the ASL model using the 
# previously found hyperparameters
training_RDD, test_RDD = complete_ratings_data.randomSplit([7, 3], seed=0)

complete_model = ALS.train(training_RDD, best_rank, seed=seed, 
                           iterations=iterations, lambda_=regularization_parameter)

In [None]:
# Now we test on our testing set.
test_for_predict_RDD = test_RDD.map(lambda x: (x[0], x[1]))

predictions = complete_model.predictAll(test_for_predict_RDD).map(lambda r: ((r[0], r[1]), r[2]))
rates_and_preds = test_RDD.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions)
error = math.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())
print('For testing data the RMSE is %s' % (error))

For testing data the RMSE is 0.8334782410156314


In [None]:
# load the movies complete file
complete_movies_file = r"C:\Users\Vamsi Reddy\SparkProj\MoviLenseRecomend\Datasets\ml-latest\movies.csv"
complete_movies_raw_data = sc.textFile(complete_movies_file)
complete_movies_raw_data_header = complete_movies_raw_data.take(1)[0]

complete_movies_data = complete_movies_raw_data.filter(lambda line: line!=complete_movies_raw_data_header)\
    .map(lambda line: line.split(",")).map(lambda tokens: (int(tokens[0]),tokens[1],tokens[2])).cache()

complete_movies_titles = complete_movies_data.map(lambda x: (int(x[0]),x[1]))
    
print("There are %s movies in the complete dataset" % (complete_movies_titles.count()))

There are 58098 movies in the complete dataset


In [None]:
# function to count the number of ratings per movie.
def get_counts_and_averages(ID_and_ratings_tuple):
    nratings = len(ID_and_ratings_tuple[1])
    return ID_and_ratings_tuple[0], (nratings, float(sum(x for x in ID_and_ratings_tuple[1]))/nratings)

movie_ID_with_ratings_RDD = (complete_ratings_data.map(lambda x: (x[1], x[2])).groupByKey())
movie_ID_with_avg_ratings_RDD = movie_ID_with_ratings_RDD.map(get_counts_and_averages)
movie_rating_counts_RDD = movie_ID_with_avg_ratings_RDD.map(lambda x: (x[0], x[1][0]))

In [None]:
# Adding new user ratings
new_user_ID = 0

# The format of each line is (userID, movieID, rating)
new_user_ratings = [
     (0,260,4), # Star Wars (1977)
     (0,1,3), # Toy Story (1995)
     (0,16,3), # Casino (1995)
     (0,25,4), # Leaving Las Vegas (1995)
     (0,32,4), # Twelve Monkeys (a.k.a. 12 Monkeys) (1995)
     (0,335,1), # Flintstones, The (1994)
     (0,379,1), # Timecop (1994)
     (0,296,3), # Pulp Fiction (1994)
     (0,858,5) , # Godfather, The (1972)
     (0,50,4) # Usual Suspects, The (1995)
    ]
new_user_ratings_RDD = sc.parallelize(new_user_ratings)
print('New user ratings: %s' % new_user_ratings_RDD.take(10))

New user ratings: [(0, 260, 4), (0, 1, 3), (0, 16, 3), (0, 25, 4), (0, 32, 4), (0, 335, 1), (0, 379, 1), (0, 296, 3), (0, 858, 5), (0, 50, 4)]


In [None]:
# Now we add them to the data we will use to train our recommender model
complete_data_with_new_ratings_RDD = complete_ratings_data.union(new_user_ratings_RDD)

In [None]:
# training the ALS model using all the parameters we selected before (when using the small dataset).
from time import time

t0 = time()
new_ratings_model = ALS.train(complete_data_with_new_ratings_RDD, best_rank, seed=seed, 
                              iterations=iterations, lambda_=regularization_parameter)
tt = time() - t0

print("New model trained in %s seconds" % round(tt,3))

New model trained in 169.906 seconds


In [None]:
new_user_ratings_ids = map(lambda x: x[1], new_user_ratings) # get just movie IDs
new_user_unrated_movies_RDD = (complete_movies_data.filter(lambda x: x[0] not in new_user_ratings_ids).map(lambda x: (new_user_ID, x[0])))

# Use the input RDD, new_user_unrated_movies_RDD, with new_ratings_model.predictAll() to predict new ratings for the movies
new_user_recommendations_RDD = new_ratings_model.predictAll(new_user_unrated_movies_RDD)

In [None]:
# Transform new_user_recommendations_RDD into pairs of the form (Movie ID, Predicted Rating)
new_user_recommendations_rating_RDD = new_user_recommendations_RDD.map(lambda x: (x.product, x.rating))
new_user_recommendations_rating_title_and_count_RDD = \
    new_user_recommendations_rating_RDD.join(complete_movies_titles).join(movie_rating_counts_RDD)
new_user_recommendations_rating_title_and_count_RDD.take(3)

[(7020, ((3.0758903881085082, 'Proof (1991)'), 377)),
 (53352, ((2.11427667988852, 'Sheitan (2006)'), 59)),
 (162396, ((0.9736832918466494, 'Skiptrace (2016)'), 71))]

In [None]:
# Transform to the form (Title, Rating, Ratings Count)
new_user_recommendations_rating_title_and_count_RDD = \
    new_user_recommendations_rating_title_and_count_RDD.map(lambda r: (r[1][0][1], r[1][0][0], r[1][1]))

In [None]:
# filtering out movies with less than 25 ratings.
top_movies = new_user_recommendations_rating_title_and_count_RDD.filter(lambda r: r[2]>=25).takeOrdered(25, key=lambda x: -x[1])

# get the highest rated recommendations for the new user
print ('TOP recommended movies (with more than 25 reviews):\n%s' %
        '\n'.join(map(str, top_movies)))

TOP recommended movies (with more than 25 reviews):
('Elway To Marino (2013)', 4.03702785250745, 25)
('Rabbit of Seville (1950)', 4.002279522928873, 30)
('"Human Condition III', 3.99453726418848, 91)
('Harakiri (Seppuku) (1962)', 3.9061620070327905, 679)
('Wow! A Talking Fish! (1983)', 3.8586009958174685, 47)
('"Last Lions', 3.8585627928003063, 38)
('Dr. Strangelove or: How I Learned to Stop Worrying and Love the Bomb (1964)', 3.8577707893893347, 29484)
('Rebels of the Neon God (Qing shao nian nuo zha) (1992)', 3.852821730679487, 28)
("Jim Henson's The Storyteller (1989)", 3.8492614693875, 36)
('Cosmos', 3.8487806839748657, 157)
('"Great War', 3.845119718097372, 31)
('"Godfather', 3.840897941465556, 60904)
('Crooks in Clover (a.k.a. Monsieur Gangster) (Les tontons flingueurs) (1963)', 3.8365516701854787, 52)
('Death on the Staircase (Soupçons) (2004)', 3.828444877084493, 130)
('Music for One Apartment and Six Drummers (2001)', 3.8275481951426045, 31)
('"Century of the Self', 3.82631134

In [None]:
# Trying to predict the rating for a specific movie
my_movie = sc.parallelize([(0, 500)]) # Quiz Show (1994)
individual_movie_rating_RDD = new_ratings_model.predictAll(new_user_unrated_movies_RDD)
individual_movie_rating_RDD.take(1)

[Rating(user=0, product=116688, rating=0.8086061251091987)]