# Second Exercise: Cosine Similarity for movie comparison

## Exercise statement

In this exercise you have to implement the following in a Python notebook, using the Spark framework:

1. The distributed (map/reduce) algorithm of slide "3.7" (in notebook "8-Item-to-Items-globalfiltering-recommenders-py3-sshow.ipynb") for computing the cosine similarity of a set of products with negative and positive ratings. The input is an RDD (or a Spark DataFrame, which is also distributed) of ratings in this format:

     (userID,movieID,rating)

2. The computation of the cosine similarity (with the previous algorithm) of all the pairs of movies from the files provided with this exercise:
  filtered50movies.csv filtered100movies.csv  filtered150movies.csv   filtered200movies.csv

Each file contains ratings for a different set of movies, and each smaller file is always a subset of the larger ones. Files of different sizes are provided in case you run into memory issues on your computer, so use the biggest file you can handle. While testing your code you can of course use the smallest file, or even a smaller subset of filtered50movies.csv.

3. Show on the screen the information for the "top 10" most similar pairs, using the names of the movies found in the file movies.csv.

All the steps should always be implemented with map/reduce operations on Spark RDDs/DataFrames, except the last step, where you have to find the names of the movies in the top-ten recommendations.

Present your notebook with plenty of comments in all your functions.

NOTE: The movie ratings come from the smallest dataset available at:
https://grouplens.org/datasets/movielens/
The ratings have been rescaled from the range [0,5] to the range [-3,2.5].

# Project initialization

## Dependencies

We make sure the dependencies that we will use throughout this exercise are downloaded and installed:

In [None]:
!pip install pyspark


## Spark context

We initialize the Spark context:

In [None]:
from pyspark.sql import SparkSession


SPARK_ENDPOINT = "local[*]"
sparkSession = SparkSession.builder \
    .master(SPARK_ENDPOINT) \
    .getOrCreate()
sparkContext = sparkSession.sparkContext
sparkSession


## Data loading

We start by loading the CSV file that contains the information about each movie:

In [None]:
from pyspark.sql.functions import col
from pyspark.sql.types import IntegerType, StringType


# Path to the CSV file with the movie catalogue (MovieId, Title, Genres).
MOVIES_INPUT_PATH = "movies.csv"
# Read the CSV with its header row and cast every column to an explicit type.
moviesDataFrame = sparkSession.read.csv(MOVIES_INPUT_PATH, header = True) \
    .withColumn("MovieId", col("MovieId").cast(IntegerType())) \
    .withColumn("Title", col("Title").cast(StringType())) \
    .withColumn("Genres", col("Genres").cast(StringType()))
moviesDataFrame.show()
# Keep an RDD of (movieId, title, genres) tuples for the title lookup in the results.
moviesRdd = moviesDataFrame.rdd.map(lambda x: (int(x[0]), str(x[1]), str(x[2])))

We then load the CSV file with the movie ratings: 

_(for alternative file sizes, check the `Alternative file sizes` folder inside the root of the delivery folder)_

In [None]:
from pyspark.sql.functions import col
from pyspark.sql.types import FloatType, IntegerType


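# Path to the ratings file; switch to a bigger file if memory allows.
INPUT_PATH = "filtered50movies.csv"
# Read the CSV with its header row and cast every column to an explicit type.
dataFrame = sparkSession.read.csv(INPUT_PATH, header = True) \
    .withColumn("UserId", col("UserId").cast(IntegerType())) \
    .withColumn("MovieId", col("MovieId").cast(IntegerType())) \
    .withColumn("Rating", col("Rating").cast(FloatType()))
dataFrame.show()
# RDD of (userID, movieID, rating) tuples, the input format required by the exercise.
rdd = dataFrame.rdd.map(lambda x: (int(x[0]), int(x[1]), float(x[2])))
# Print a few elements; printing the RDD object itself only shows its description.
print(rdd.take(5))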


# Calculation of the cosine similarity for all pairs

We start by generating every pair of user-product ratings in the following format:

$$  (u,p_1,r_1),(u,p_2,r_2)  $$

_(To avoid carrying redundant information, we keep only the pairs with p1 < p2. This discards both the mirrored copy (p2, p1) of every pair and the pairs of a movie with itself.)_

In [None]:
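# Pair every rating with every other rating, then keep only the pairs that
# belong to the same user and satisfy p1 < p2 (one copy of each movie pair).
# Note: cartesian() builds all combinations of ratings before filtering, so
# this is the most expensive step for large inputs.
cartesianRdd = rdd.cartesian(rdd) \
    .map(lambda x: (
        (x[0][0], x[0][1], x[0][2]), 
        (x[1][0], x[1][1], x[1][2])
    )) \
    .filter(lambda x: x[0][0] == x[1][0]) \
    .filter(lambda x: x[0][1] < x[1][1])
cartesianDataFrame = sparkSession.createDataFrame(cartesianRdd, ["(u, p1, r1)", "(u, p2, r2)"])
cartesianDataFrame.show()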
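
To make the target of this step concrete, here is a minimal pure-Python sketch (no Spark involved) that builds the same kind of pairs on a tiny made-up ratings list. The helper name `pair_ratings_by_user` and the sample data are illustrative assumptions, not part of the exercise files, and the sketch assumes each user rates a movie at most once. It groups the ratings by user first, so ratings from different users are never compared.

```python
from collections import defaultdict
from itertools import combinations


def pair_ratings_by_user(ratings):
    """Return ((u, p1, r1), (u, p2, r2)) pairs with p1 < p2 for each user."""
    # Group the (user, movie, rating) tuples by user.
    by_user = defaultdict(list)
    for user, movie, rating in ratings:
        by_user[user].append((user, movie, rating))
    pairs = []
    for user_ratings in by_user.values():
        # Sorting by movie ID guarantees p1 < p2 in every emitted pair.
        ordered = sorted(user_ratings, key=lambda t: t[1])
        for left, right in combinations(ordered, 2):
            pairs.append((left, right))
    return pairs


# Hypothetical toy data in the (userID, movieID, rating) format.
sample_ratings = [(1, 10, 2.0), (1, 20, -1.0), (1, 30, 0.5), (2, 10, 1.0), (2, 20, 2.0)]
sample_pairs = pair_ratings_by_user(sample_ratings)
```

User 1 rated three movies and contributes three pairs, while user 2 rated two movies and contributes one.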


We then map every pair of ratings from the same user (u) to the terms it contributes to the final cosine similarity between p1 and p2:
$$  (u,p_1,r_1),(u,p_2,r_2) \rightarrow ((p_1,p_2),(r_1 r_2,r_1^2,r_2^2) ) $$

In [None]:
from math import pow


# Key: the movie pair (p1, p2). Value: this user's contribution to the dot
# product (r1 * r2) and to each squared norm (r1^2 and r2^2).
userProductRatingsRdd = cartesianRdd.map(lambda x: (
    (x[0][1], x[1][1]), 
    (x[0][2] * x[1][2], pow(x[0][2], 2), pow(x[1][2], 2))
))
userProductRatingsDataFrame = sparkSession.createDataFrame(userProductRatingsRdd, ["(p1, p2)", "(r1 * r2, r1^2, r2^2)"])
userProductRatingsDataFrame.show()
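
As a worked example of this mapping, the following pure-Python sketch applies it to a single made-up pair. The function name `to_contribution` and the sample values are assumptions introduced only for illustration.

```python
def to_contribution(pair):
    """Map ((u, p1, r1), (u, p2, r2)) to ((p1, p2), (r1*r2, r1^2, r2^2))."""
    (_, p1, r1), (_, p2, r2) = pair
    return ((p1, p2), (r1 * r2, r1 ** 2, r2 ** 2))


# Hypothetical pair: user 1 rated movie 10 with 2.0 and movie 20 with -1.0.
sample_pair = ((1, 10, 2.0), (1, 20, -1.0))
contribution = to_contribution(sample_pair)
# contribution == ((10, 20), (-2.0, 4.0, 1.0))
```

The user ID is dropped at this point: once the pair is keyed by (p1, p2), only the numeric terms are needed for the sums.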

We then reduce all the key-value pairs that share the same key by adding their values component-wise:
$$ ((p_1,p_2),(pra_{1,2},ra_1^2,ra_2^2) ) + ((p_1,p_2),(prb_{1,2},rb_1^2,rb_2^2) ) 
   \rightarrow \\  ((p_1,p_2),( pra_{1,2}+prb_{1,2}, ra_1^2+rb_1^2,
   ra_2^2+rb_2^2) ) $$

In [None]:
# Add the three components of all the values that share the same (p1, p2) key.
step2Rdd = userProductRatingsRdd.reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1], x[2] + y[2]))
step2DataFrame = sparkSession.createDataFrame(step2Rdd, ["(p1, p2)", "(pra1,2 + prb1,2, ra1^2 + rb1^2, ra2^2 + rb2^2)"])
step2DataFrame.show()
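
The reduction can be illustrated without Spark. The sketch below mimics the behaviour of `reduceByKey` on a small made-up list with a plain dictionary; `reduce_by_key`, `add_triples` and the sample records are hypothetical names and values used only for this example. Component-wise addition is associative and commutative, which is what allows Spark to combine the values in any order across partitions.

```python
def reduce_by_key(records, combine):
    """Combine the values of all (key, value) records that share a key."""
    totals = {}
    for key, value in records:
        # The first value for a key is stored as-is; later ones are combined.
        totals[key] = combine(totals[key], value) if key in totals else value
    return totals


def add_triples(x, y):
    """Component-wise sum of two 3-tuples."""
    return (x[0] + y[0], x[1] + y[1], x[2] + y[2])


# Hypothetical contributions keyed by movie pair.
sample_records = [
    ((10, 20), (-2.0, 4.0, 1.0)),
    ((10, 20), (2.0, 1.0, 4.0)),
    ((10, 30), (1.0, 4.0, 0.25)),
]
sample_totals = reduce_by_key(sample_records, add_triples)
# sample_totals[(10, 20)] == (0.0, 5.0, 5.0)
```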

We finish by computing the cosine similarity, combining the reduced values in a final map:
$$ ((p_1,p_2),(\sum_u r_1 r_2,\sum_u r_1^2,\sum_u r_2^2) ) \rightarrow 
\frac{\sum_u r_1 r_2}{\sqrt{\sum_u r_1^2} \sqrt{\sum_u r_2^2}}  $$ 

In [None]:
from math import sqrt


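# Drop pairs whose denominator would be zero (every co-rating of one movie is 0),
# since the cosine similarity is undefined there, then apply the final formula.
cosineRdd = step2Rdd \
    .filter(lambda x: x[1][1] > 0 and x[1][2] > 0) \
    .map(lambda x: (
        x[0][0],  # p1
        x[0][1],  # p2
        x[1][0] / ( sqrt(x[1][1]) * sqrt(x[1][2]) )  # sum(r1 * r2) / (norm1 * norm2)
    ))
cosineDataFrame = sparkSession.createDataFrame(cosineRdd, ["Movie1Id", "Movie2Id", "CosineSimilarity"])
cosineDataFrame.show()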
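
As a sanity check of the final formula, this small pure-Python sketch evaluates it on made-up sums. The helper `cosine_from_sums` is a hypothetical name, and returning `None` for a zero denominator is an assumption of this sketch rather than something the exercise specifies. Note that, as in the steps above, all three sums run only over the users who rated both movies.

```python
from math import isclose, sqrt


def cosine_from_sums(dot, sum_sq_1, sum_sq_2):
    """Cosine similarity from the reduced sums; None when it is undefined."""
    denominator = sqrt(sum_sq_1) * sqrt(sum_sq_2)
    if denominator == 0:
        # Assumption: treat an all-zero rating vector as "no similarity available".
        return None
    return dot / denominator


# Ratings [1, 2] and [2, 4] point in the same direction: similarity close to 1.
same_direction = cosine_from_sums(10.0, 5.0, 20.0)
# Ratings [1, 2] and [-2, -4] point in opposite directions: similarity close to -1.
opposite_direction = cosine_from_sums(-10.0, 5.0, 20.0)
# Ratings [2, 1] and [-1, 2] are orthogonal: similarity 0.
orthogonal = cosine_from_sums(0.0, 5.0, 5.0)
```

Because the ratings are rescaled to include negative values, the result can fall anywhere between -1 and 1.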

# Results 

We look up the movie title for every movie ID to make the pairs of movies easier to read, as requested in the exercise statement.

We then sort the pairs of movies by their cosine similarity in descending order and show the first 10, which are the top 10 most similar pairs of movies requested in the exercise statement:

In [None]:
# Two copies of the movie catalogue with distinct column names, one per side of the pair.
movies1DataFrame = sparkSession.createDataFrame(moviesRdd, ["MovieId1", "Title1", "Genres1"])
movies2DataFrame = sparkSession.createDataFrame(moviesRdd, ["MovieId2", "Title2", "Genres2"])
# Attach both titles to each pair, keep (id1, title1, id2, title2, similarity)
# and sort by similarity in descending order.
cosineWithTitlesRdd = cosineDataFrame \
    .join(movies1DataFrame, cosineDataFrame.Movie1Id == movies1DataFrame.MovieId1, "inner") \
    .join(movies2DataFrame, cosineDataFrame.Movie2Id == movies2DataFrame.MovieId2, "inner") \
    .rdd \
    .map(lambda x: (x[0], x[4], x[1], x[7], x[2])) \
    .sortBy(lambda x: -x[4])
cosineWithTitlesDataFrame = sparkSession.createDataFrame(cosineWithTitlesRdd, ["MovieId1", "MovieTitle1", "MovieId2", "MovieTitle2", "CosineSimilarity"])
# Show only the 10 most similar pairs, without truncating long titles.
cosineWithTitlesDataFrame.show(10, truncate=False)
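
Since the exercise statement allows the title lookup to happen outside map/reduce, the last step can also be sketched in plain Python on a small, already collected list of results. The helper `top_similar_pairs`, the titles and the similarity values below are made up for illustration; they are not results from the exercise data.

```python
from heapq import nlargest


def top_similar_pairs(similarities, titles, n=10):
    """Return the n pairs with the highest similarity, with IDs replaced by titles."""
    # nlargest avoids sorting the whole list when only a few rows are needed.
    best = nlargest(n, similarities, key=lambda row: row[2])
    # Fall back to the raw ID when a title is missing from the lookup table.
    return [
        (titles.get(m1, str(m1)), titles.get(m2, str(m2)), score)
        for m1, m2, score in best
    ]


# Hypothetical lookup table and (movie1Id, movie2Id, similarity) rows.
sample_titles = {10: "Movie A", 20: "Movie B", 30: "Movie C"}
sample_similarities = [(10, 20, 0.1), (10, 30, 0.9), (20, 30, -0.4)]
top_two = top_similar_pairs(sample_similarities, sample_titles, n=2)
# top_two == [("Movie A", "Movie C", 0.9), ("Movie A", "Movie B", 0.1)]
```

This only makes sense once the result is small enough to fit in the driver; for the full distributed result, the sort and join in the cell above remain the relevant approach.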