# Finding movie recommendations for a given movie through Cosine Similarity using Spark RDD

This project processes the 1M MovieLens dataset (about one million ratings) downloaded from <a href = 'https://grouplens.org/datasets/movielens/'>grouplens.org</a> using Spark RDDs. The code below can be run either in a Jupyter Notebook backed by an [Amazon EMR cluster](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-what-is-emr.html), or converted into a Python script and run from the master node of the cluster.

In [1]:
from pyspark import SparkContext, SparkConf
from math import sqrt

In [2]:
# Instantiate a SparkConf() object

# Use the line below when executing locally
conf = SparkConf().setMaster('local').setAppName('SimilarMovie')
# conf = SparkConf().setAppName('SimilarMovie').set("spark.serializer", "org.apache.spark.serializer.PickleSerializer")

# Use the line below instead when executing on an AWS EMR cluster
#conf = SparkConf()

In [3]:
# Instantiate sc object
sc = SparkContext(conf = conf)

**About the dataset "ratings.dat"**

All ratings are contained in the file "./data/ml-1m/ratings.dat" and are in the
following format:

`UserID::MovieID::Rating::Timestamp`

- **UserIDs** range between 1 and 6040 
- **MovieIDs** range between 1 and 3952
- **Ratings** are made on a 5-star scale (whole-star ratings only)
- **Timestamp** is represented in seconds since the epoch as returned by time(2)
- Each user has at least 20 ratings
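
The record layout described above can be sketched in plain Python. The helper `parse_rating_line` is hypothetical (it is not part of the original notebook), and interpreting the timestamp as UTC is an assumption made only for this illustration.

```python
from datetime import datetime, timezone

def parse_rating_line(line):
    """Split one 'UserID::MovieID::Rating::Timestamp' line into typed fields."""
    user_id, movie_id, rating, timestamp = line.strip().split('::')
    return {
        'user_id': int(user_id),
        'movie_id': int(movie_id),
        'rating': float(rating),
        # Seconds since the epoch, interpreted here as UTC (an assumption)
        'rated_at': datetime.fromtimestamp(int(timestamp), tz=timezone.utc),
    }

# Sample line taken from ratings.dat
record = parse_rating_line('1::1193::5::978300760')
print(record)
```

The notebook itself only needs the first three fields, so the timestamp is dropped in the later steps.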

In [4]:
# Preview the first 5 rows in ratings.dat
!head -n 5 ./data/ml-1m/ratings.dat

1::1193::5::978300760
1::661::3::978302109
1::914::3::978301968
1::3408::4::978300275
1::2355::5::978824291


In [5]:
# Read RDD from ratings.dat

# Run the code below if executing locally
RDD_movies = sc.textFile('./data/ml-1m/ratings.dat')

# Run the code below if executing on AWS EMR cluster
#RDD_movies = sc.textFile('s3://maxineproject/ml-1m/ratings.dat')

In [6]:
# Show the first 5 records in RDD_movies
# UserID::MovieID::Rating::Timestamp
RDD_movies.take(5)

['1::1193::5::978300760',
 '1::661::3::978302109',
 '1::914::3::978301968',
 '1::3408::4::978300275',
 '1::2355::5::978824291']

Convert each record to the following format:

`(UserID, (MovieID, Rating))`

In [7]:
RDD_ratings = RDD_movies.map(lambda x: x.split('::')).map(lambda f: ( int(f[0]), (int(f[1]), float(f[2])) ))

In [8]:
RDD_ratings.take(5)

[(1, (1193, 5.0)),
 (1, (661, 3.0)),
 (1, (914, 3.0)),
 (1, (3408, 4.0)),
 (1, (2355, 5.0))]

##### Self-join RDD_ratings

_**Tip**_: Call the `partitionBy()` method to split the RDD into smaller partitions by key before running expensive key-based operations such as `join()`, `reduceByKey()`, or `groupByKey()`.
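
The idea behind the tip can be illustrated without Spark. The sketch below is a simplified stand-in, not Spark's actual partitioner: the hypothetical helper `assign_partition` maps each key to one of 100 buckets by hash, so all records sharing a `UserID` land in the same bucket.

```python
from collections import defaultdict

def assign_partition(key, num_partitions):
    # Simplified stand-in for a hash partitioner: same key -> same partition
    return hash(key) % num_partitions

# A few (UserID, (MovieID, Rating)) records in the notebook's format
records = [
    (1, (1193, 5.0)),
    (1, (661, 3.0)),
    (100, (648, 2.0)),
    (100, (800, 5.0)),
]

partitions = defaultdict(list)
for user_id, value in records:
    partitions[assign_partition(user_id, 100)].append((user_id, value))

for index, rows in sorted(partitions.items()):
    print(index, rows)
```

Because every record for a given key sits in one partition, a later key-based operation can match records for that key within the partition, which is the likely reason the tip helps here.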

In [9]:
RDD_ratings = RDD_ratings.partitionBy(100)

In [10]:
# Join the same RDD by keys
RDD_ratings_joined = RDD_ratings.join(RDD_ratings)

In [11]:
RDD_ratings_joined.take(5)

[(100, ((648, 2.0), (648, 2.0))),
 (100, ((648, 2.0), (800, 5.0))),
 (100, ((648, 2.0), (3948, 3.0))),
 (100, ((648, 2.0), (1408, 3.0))),
 (100, ((648, 2.0), (1196, 4.0)))]

In [12]:
# Keep each movie pair once per user: drop self-pairs (A, A) and mirrored duplicates (B, A)
RDD_ratings_filtered = RDD_ratings_joined.filter(lambda x: x[1][0][0] < x[1][1][0])

In [13]:
RDD_ratings_filtered.take(5)

[(100, ((648, 2.0), (800, 5.0))),
 (100, ((648, 2.0), (3948, 3.0))),
 (100, ((648, 2.0), (1408, 3.0))),
 (100, ((648, 2.0), (1196, 4.0))),
 (100, ((648, 2.0), (1197, 4.0)))]
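
To see why the `<` filter is enough, here is a minimal plain-Python sketch of the self-join for a single user. It uses three of user 100's ratings from the output above; the variable names are illustrative only and do not appear in the notebook.

```python
from itertools import product

# Three ratings by user 100, in (MovieID, Rating) form
user_ratings = {100: [(648, 2.0), (800, 5.0), (3948, 3.0)]}

# Self-join: every rating of a user paired with every rating of the same user
joined = [
    (user, (left, right))
    for user, ratings in user_ratings.items()
    for left, right in product(ratings, repeat=2)
]

# Keep only pairs whose first MovieID is smaller than the second
filtered = [row for row in joined if row[1][0][0] < row[1][1][0]]

print(len(joined), len(filtered))
for row in filtered:
    print(row)
```

The join yields n × n rows per user, while the filter keeps n(n − 1)/2 of them, removing both self-pairs such as (648, 648) and mirrored pairs such as (800, 648).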

Convert each record to the following format:

`((movieId1, movieId2), (rating1, rating2))`

In [14]:
RDD_ratings_paired = RDD_ratings_filtered.map(lambda x: ( (x[1][0][0], x[1][1][0]), (x[1][0][1], x[1][1][1]) ))

In [15]:
RDD_ratings_paired.take(5)

[((648, 800), (2.0, 5.0)),
 ((648, 3948), (2.0, 3.0)),
 ((648, 1408), (2.0, 3.0)),
 ((648, 1196), (2.0, 4.0)),
 ((648, 1197), (2.0, 4.0))]

##### Group by keys

Group the records with same keys into the following format:

`((movieId1, movieId2), [(rating1, rating2)_1, (rating1, rating2)_2, ..., (rating1, rating2)_n])`
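
The grouping can be sketched in plain Python with a dictionary of lists. This only mirrors the shape of the result; it is not how Spark implements `groupByKey()`, and the three sample records are a small hand-picked subset.

```python
from collections import defaultdict

# ((movieId1, movieId2), (rating1, rating2)) records from different users
paired = [
    ((648, 800), (2.0, 5.0)),
    ((648, 3948), (2.0, 3.0)),
    ((648, 800), (3.0, 5.0)),
]

grouped = defaultdict(list)
for movie_pair, rating_pair in paired:
    # Collect every rating pair that belongs to the same movie pair
    grouped[movie_pair].append(rating_pair)
grouped = dict(grouped)

print(grouped)
```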

In [16]:
RDD_ratings_paired = RDD_ratings_paired.partitionBy(100)

In [17]:
# group by key (movieId1, movieId2)
RDD_ratings_grouped = RDD_ratings_paired.groupByKey().mapValues(list)

In [19]:
RDD_ratings_grouped.first()

((648, 800),
 [(3.0, 5.0),
  (2.0, 5.0),
  (3.0, 5.0),
  (4.0, 5.0),
  (2.0, 5.0),
  (4.0, 5.0),
  (2.0, 2.0),
  (2.0, 5.0),
  (2.0, 5.0),
  (5.0, 4.0),
  (3.0, 3.0),
  (2.0, 1.0),
  (4.0, 5.0),
  (1.0, 4.0),
  (2.0, 3.0),
  (3.0, 5.0),
  (3.0, 2.0),
  (3.0, 4.0),
  (3.0, 5.0),
  (4.0, 3.0),
  (4.0, 5.0),
  (3.0, 5.0),
  (2.0, 5.0),
  (2.0, 5.0),
  (3.0, 4.0),
  (3.0, 5.0),
  (3.0, 3.0),
  (4.0, 5.0),
  (3.0, 4.0),
  (2.0, 3.0),
  (4.0, 4.0),
  (3.0, 4.0),
  (3.0, 4.0),
  (2.0, 5.0),
  (4.0, 4.0),
  (3.0, 3.0),
  (2.0, 3.0),
  (2.0, 3.0),
  (4.0, 5.0),
  (3.0, 5.0),
  (4.0, 5.0),
  (3.0, 5.0),
  (3.0, 4.0),
  (4.0, 5.0),
  (2.0, 4.0),
  (3.0, 5.0),
  (2.0, 3.0),
  (2.0, 4.0),
  (4.0, 4.0),
  (5.0, 3.0),
  (3.0, 3.0),
  (4.0, 5.0),
  (3.0, 4.0),
  (3.0, 5.0),
  (2.0, 3.0),
  (3.0, 4.0),
  (3.0, 3.0),
  (2.0, 5.0),
  (3.0, 5.0),
  (3.0, 5.0),
  (4.0, 5.0),
  (5.0, 5.0),
  (3.0, 4.0),
  (3.0, 4.0),
  (3.0, 3.0),
  (3.0, 5.0),
  (4.0, 5.0),
  (3.0, 3.0),
  (3.0, 5.0),
  (3.0, 5.0),
  (4.0,

##### Compute Cosine Similarity score

In [20]:
# Declare a function that computes Cosine Similarity score for each movie pair
def computeCosineSimilarity(ratingPairs):
    # Initialize variables to keep track of the sums and the number of pairs
    sumXX = 0
    sumYY = 0
    sumXY = 0
    numPairs = 0
    
    # Loop through each rating pair in the input list
    for ratingPair in ratingPairs:
        # Extract the individual ratings from the pair
        ratingX = ratingPair[0]
        ratingY = ratingPair[1]
        
        # Calculate the sum of squares for both ratings (X and Y)
        sumXX += ratingX * ratingX
        sumYY += ratingY * ratingY
        # Calculate the sum of the product of ratings (X and Y)
        sumXY += ratingX * ratingY
        # Increment the count of rating pairs processed
        numPairs += 1
       
    # Calculate the numerator and denominator for the cosine similarity
    numerator = sumXY
    denominator = sqrt(sumXX) * sqrt(sumYY)
    # Calculate the cosine similarity score, guarding against a zero denominator
    score = 0
    if denominator:
        score = numerator / denominator
    
    # Return the Cosine Similarity score along with the number of rating pairs
    # numPairs also indicates how many people watched the same pair of movies
    return (score, numPairs)
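
As a worked example, the score is the sum of products divided by the product of the two vector lengths: score = Σxy / (√Σx² · √Σy²). The compact function below is a standalone restatement written for this illustration (the name `cosine_similarity` is not from the notebook), applied to two rating pairs taken from the grouped output above.

```python
from math import sqrt, isclose

def cosine_similarity(rating_pairs):
    """Return (score, number of pairs) for a list of (ratingX, ratingY) tuples."""
    sum_xx = sum(x * x for x, _ in rating_pairs)
    sum_yy = sum(y * y for _, y in rating_pairs)
    sum_xy = sum(x * y for x, y in rating_pairs)
    denominator = sqrt(sum_xx) * sqrt(sum_yy)
    # Guard against an empty or all-zero input
    score = sum_xy / denominator if denominator else 0.0
    return score, len(rating_pairs)

# sum_xx = 9 + 4 = 13, sum_yy = 25 + 25 = 50, sum_xy = 15 + 10 = 25
score, num_pairs = cosine_similarity([(3.0, 5.0), (2.0, 5.0)])
print(score, num_pairs)  # 25 / sqrt(650), roughly 0.98
```

Because every rating is positive, the score always falls in the range (0, 1], which helps explain why the scores in this notebook cluster close to 1 and why a high threshold is used later.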

In [21]:
# Compute Cosine Similarity score for each movie pair
RDD_pairs_similarities = RDD_ratings_grouped.mapValues(computeCosineSimilarity)

In [22]:
RDD_pairs_similarities.take(5)

[((648, 800), (0.939054432011561, 274)),
 ((648, 3948), (0.9464824388307304, 414)),
 ((648, 1408), (0.9440372259647507, 565)),
 ((648, 1196), (0.9553752925210446, 1102)),
 ((648, 1197), (0.952789624869145, 908))]

##### Configure parameters

In [23]:
# Define the target movie ID for which we want to find similar movies
movieID = 50
# Set the threshold for similarity score 
scoreThreshold = 0.97
# Set the threshold for numPairs (the number of users who rated both movies)
coOccurenceThreshold = 50

**Filter** the RDD to get only the movie pairs for the target movie that **exceed the specified thresholds** for both Cosine Similarity score and number of rating pairs.

In [24]:
RDD_results = RDD_pairs_similarities.filter(lambda x: ((x[0][0] == movieID) or (x[0][1] == movieID))
                                                      and (x[1][0] > scoreThreshold)
                                                      and (x[1][1] > coOccurenceThreshold)
                                            )
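
The same predicate can be tried on a handful of records in plain Python. In this sketch the first and last sample rows come from the outputs above, while the two rows marked hypothetical are invented purely to exercise each threshold.

```python
movie_id = 50
score_threshold = 0.97
co_occurrence_threshold = 50

sample = [
    ((50, 172), (0.9895522078385338, 345)),  # passes both thresholds
    ((50, 999), (0.99, 12)),                 # hypothetical: too few co-ratings
    ((50, 998), (0.95, 400)),                # hypothetical: score too low
    ((648, 800), (0.939054432011561, 274)),  # does not involve movie 50
]

def keep(row):
    """Mirror the notebook's filter: target movie, score and pair-count checks."""
    (movie_a, movie_b), (score, num_pairs) = row
    return (movie_id in (movie_a, movie_b)
            and score > score_threshold
            and num_pairs > co_occurrence_threshold)

results = [row for row in sample if keep(row)]
print(results)
```

Requiring a minimum number of co-ratings guards against pairs that score highly only because very few users rated both movies.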

##### Find the top 10 movie recommendations based on their Cosine Similarity score of ratings

In [25]:
# Sort the RDD by similarity score and take the Top 10 movies
top_10 = RDD_results.sortBy(lambda x: x[1][0], ascending = False).take(10)

In [26]:
top_10

[((50, 172), (0.9895522078385338, 345)),
 ((50, 181), (0.9857230861253026, 480)),
 ((50, 174), (0.981760098872619, 380)),
 ((50, 141), (0.9789385605497993, 68)),
 ((50, 178), (0.9776576120448436, 109)),
 ((50, 408), (0.9775948291054827, 92)),
 ((50, 498), (0.9764692222674887, 138)),
 ((50, 194), (0.9751512937740359, 204)),
 ((50, 169), (0.9748681355460885, 103)),
 ((50, 114), (0.9741816128302572, 58))]
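
Selecting the highest-scoring records does not strictly require a full sort. As a plain-Python sketch of the same idea, `heapq.nlargest` picks the top entries by score from four of the results above; PySpark RDDs offer `top()` and `takeOrdered()` for a similar purpose, although whether they are faster here has not been measured.

```python
import heapq

# Four of the filtered results, deliberately out of order
results = [
    ((50, 174), (0.981760098872619, 380)),
    ((50, 172), (0.9895522078385338, 345)),
    ((50, 141), (0.9789385605497993, 68)),
    ((50, 181), (0.9857230861253026, 480)),
]

# Keep the 3 records with the highest similarity score (x[1][0])
top_3 = heapq.nlargest(3, results, key=lambda row: row[1][0])
for row in top_3:
    print(row)
```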

##### Print the full information about the top 10 movie recommendations

**movies.dat** contains a mapping of movie IDs to their respective movie names and categories.

In [27]:
# Preview the first 5 rows in movies.dat
!head -n 5 ./data/ml-1m/movies.dat

1::Toy Story (1995)::Animation|Children's|Comedy
2::Jumanji (1995)::Adventure|Children's|Fantasy
3::Grumpier Old Men (1995)::Comedy|Romance
4::Waiting to Exhale (1995)::Comedy|Drama
5::Father of the Bride Part II (1995)::Comedy


In [28]:
# Run the code below if executing locally
with open('./data/ml-1m/movies.dat', encoding='ascii', errors='ignore') as f:
    movie_lines = f.readlines()

If you are running the code on an Amazon EMR cluster with the MovieLens 1M dataset stored in an S3 bucket, first run `aws s3 cp [s3-file-path] ./` on the master node (reached via its Master public DNS) to copy **movies.dat** from S3 to the current directory. You can then run the code below to read the lines from **movies.dat**.

In [29]:
# Run the code below if executing on AWS EMR cluster
# movie_lines = open('movies.dat', encoding='ascii', errors='ignore').readlines()

In [30]:
movie_lines[:5]

["1::Toy Story (1995)::Animation|Children's|Comedy\n",
 "2::Jumanji (1995)::Adventure|Children's|Fantasy\n",
 '3::Grumpier Old Men (1995)::Comedy|Romance\n',
 '4::Waiting to Exhale (1995)::Comedy|Drama\n',
 '5::Father of the Bride Part II (1995)::Comedy\n']

In [31]:
# Create a dictionary with movieID as key and [movieName, category] as value
movies = {}
for line in movie_lines:
    # rstrip('\n') is safe even if the last line has no trailing newline
    fields = line.rstrip('\n').split('::')
    movies[int(fields[0])] = [fields[1], fields[2].replace('|', '/')]
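
The parsing step can also be sketched as a small standalone function. `parse_movie_line` is a hypothetical helper introduced for this illustration; it strips the newline with `rstrip('\n')` so that a final line without a trailing newline keeps its last character.

```python
def parse_movie_line(line):
    """Turn 'MovieID::Title::Genre1|Genre2' into (MovieID, [Title, 'Genre1/Genre2'])."""
    movie_id, title, genres = line.rstrip('\n').split('::')
    return int(movie_id), [title, genres.replace('|', '/')]

sample_lines = [
    "1::Toy Story (1995)::Animation|Children's|Comedy\n",
    "5::Father of the Bride Part II (1995)::Comedy",  # no trailing newline
]

movies_sample = dict(parse_movie_line(line) for line in sample_lines)
print(movies_sample)
```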

In [32]:
movies

{1: ['Toy Story (1995)', "Animation/Children's/Comedy"],
 2: ['Jumanji (1995)', "Adventure/Children's/Fantasy"],
 3: ['Grumpier Old Men (1995)', 'Comedy/Romance'],
 4: ['Waiting to Exhale (1995)', 'Comedy/Drama'],
 5: ['Father of the Bride Part II (1995)', 'Comedy'],
 6: ['Heat (1995)', 'Action/Crime/Thriller'],
 7: ['Sabrina (1995)', 'Comedy/Romance'],
 8: ['Tom and Huck (1995)', "Adventure/Children's"],
 9: ['Sudden Death (1995)', 'Action'],
 10: ['GoldenEye (1995)', 'Action/Adventure/Thriller'],
 11: ['American President, The (1995)', 'Comedy/Drama/Romance'],
 12: ['Dracula: Dead and Loving It (1995)', 'Comedy/Horror'],
 13: ['Balto (1995)', "Animation/Children's"],
 14: ['Nixon (1995)', 'Drama'],
 15: ['Cutthroat Island (1995)', 'Action/Adventure/Romance'],
 16: ['Casino (1995)', 'Drama/Thriller'],
 17: ['Sense and Sensibility (1995)', 'Drama/Romance'],
 18: ['Four Rooms (1995)', 'Thriller'],
 19: ['Ace Ventura: When Nature Calls (1995)', 'Comedy'],
 20: ['Money Train (1995)', 'Act

In [33]:
# Print movie recommendations in full details

print('Top 10 movie recommendations for {} ({}) with similar ratings:\n'.format(movies[movieID][0], movies[movieID][1]))
print('----------------------------------------------------------------------------------------------------------------')

for result in top_10:
    # Get the name and category of similar movies
    if result[0][0] == movieID:
        similar_movie_name = movies[result[0][1]][0]
        category = movies[result[0][1]][1]
    elif result[0][1] == movieID:
        similar_movie_name = movies[result[0][0]][0]
        category = movies[result[0][0]][1]
    
    # Extract the similarity score and strength
    similarity_score = result[1][0]
    strength = result[1][1]
    
    # Print the information about each similar movie, including its name, similarity score, category, and strength
    print('{} viewers also watched:\n{}\nCategory: {}\nSimilarity Score: {}\n' \
          .format(strength, similar_movie_name, category, similarity_score))
    print('----------------------------------------------------------------------------------------------------------------')

Top 10 movie recommendations for Usual Suspects, The (1995) (Crime/Thriller) with similar ratings:

----------------------------------------------------------------------------------------------------------------
345 viewers also watched:
Johnny Mnemonic (1995)
Category: Action/Sci-Fi/Thriller
Similarity Score: 0.9895522078385338

----------------------------------------------------------------------------------------------------------------
480 viewers also watched:
Mighty Morphin Power Rangers: The Movie (1995)
Category: Action/Children's
Similarity Score: 0.9857230861253026

----------------------------------------------------------------------------------------------------------------
380 viewers also watched:
Jury Duty (1995)
Category: Comedy
Similarity Score: 0.981760098872619

----------------------------------------------------------------------------------------------------------------
68 viewers also watched:
Birdcage, The (1996)
Category: Comedy
Similarity Score: 0.978938560

To run the Python script version of the code above from the master node of the EMR cluster (reached via its Master public DNS), use the following command:

`spark-submit --executor-memory 1g [Spark-driver-script-name].py`

Note that the default executor memory might not be sufficient for processing one million movie ratings (older Spark releases default to 512 MB), so the command above explicitly sets the executor memory to 1 GB.