# Second Exercise: Cosine Similarity for movie comparison

In this exercise you have to implement the following in a Python notebook using the Spark framework:

1. The distributed (map/reduce) algorithm of slide "3.7" (in notebook "8-Item-to-Items-globalfiltering-recommenders-py3-sshow.ipynb")
for computing the cosine similarity of a set of products with negative and positive ratings. The input is an RDD (or a Spark DataFrame, which is also distributed) of ratings in this format:

     (userID,movieID,rating)

2. The computation of the cosine similarity (with the previous algorithm) of all pairs of movies, using the files provided with this exercise:
  filtered50movies.csv, filtered100movies.csv, filtered150movies.csv, filtered200movies.csv

Each file contains ratings for a different set of movies, and the movies in a smaller file
are always a subset of those in the larger files. Files of different sizes are provided
in case you run into memory issues on your computer: use the largest file you can handle, although while testing your code you can of course use the smallest file, or even a smaller subset of filtered50movies.csv.

3. Show on screen the "top 10" most similar pairs, using the
movie names found in the file movies.csv.

All the steps must be implemented with map/reduce operations on Spark RDDs/DataFrames, except the last step, where you look up the names of the movies in the top-ten recommendations.
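The final step, which is exempt from the map/reduce requirement, might look like the sketch below. It assumes the similarities have already been collected to the driver as a plain dictionary keyed by movie-ID pair, and that the titles are available as a dictionary from movie ID to title. `top_similar_pairs`, `similarities` and `titles` are hypothetical names, and the similarity values are made up for illustration.

```python
import heapq

def top_similar_pairs(similarities, titles, n=10):
    """Return the n most similar pairs as (title_a, title_b, similarity).

    similarities: dict mapping (movie_id_a, movie_id_b) -> cosine similarity
    titles:       dict mapping movie_id -> title
    """
    best = heapq.nlargest(n, similarities.items(), key=lambda kv: kv[1])
    # Fall back to the numeric ID (as text) when a title is missing.
    return [(titles.get(a, str(a)), titles.get(b, str(b)), sim)
            for (a, b), sim in best]

# Illustrative (made-up) similarity values, not results from the dataset.
similarities = {(1, 2): 0.41, (1, 3): 0.75, (2, 3): -0.10}
titles = {1: "Toy Story (1995)", 2: "Jumanji (1995)"}

for title_a, title_b, sim in top_similar_pairs(similarities, titles, n=2):
    print(f"{sim:+.2f}  {title_a}  <->  {title_b}")
```

With Spark, `takeOrdered(10, key=lambda kv: -kv[1])` on an RDD of `((movieA, movieB), similarity)` pairs returns the ten largest entries directly, and `collectAsMap()` on an RDD of `(movieId, title)` pairs gives the title dictionary.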

Present your notebook with plenty of comments in all your functions.

NOTE: The movie ratings come from the smallest dataset available at:
https://grouplens.org/datasets/movielens/
The ratings have been re-scaled from the range [0,5] to the range [-3,2.5].
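For reference, a common form of the cosine similarity between two movies $i$ and $j$ is given below. Here $U_{ij}$ denotes the set of users who rated both movies and $r_{u,i}$ the re-scaled rating of user $u$ for movie $i$. Restricting the sums to co-rating users is an assumption that matches the per-user pairing done later in this notebook; the algorithm on the slide may normalise differently.

```latex
\[
\mathrm{sim}(i,j) =
\frac{\sum_{u \in U_{ij}} r_{u,i}\, r_{u,j}}
     {\sqrt{\sum_{u \in U_{ij}} r_{u,i}^{2}}\;\sqrt{\sum_{u \in U_{ij}} r_{u,j}^{2}}}
\]
```

Because the re-scaled ratings can be negative, the result lies in $[-1, 1]$: values near $1$ mean users tend to rate both movies on the same side of the scale, values near $-1$ mean they tend to disagree.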

In [1]:
# Libraries
import pyspark
import os
import math
import sys
from pyspark.sql import SparkSession

In [2]:
# Spark Session
spark = SparkSession.builder.appName('MovieRecommender').getOrCreate()
spark

In [3]:
# Movies Information
moviesDF = spark.read.csv('inputs/movies.csv',header=True)

# Cast movieId from string (the default when reading a CSV) to integer
moviesDF = moviesDF.withColumn("movieId",moviesDF.movieId.cast('int'))
moviesDF.show(5)

+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|
|      5|Father of the Bri...|              Comedy|
+-------+--------------------+--------------------+
only showing top 5 rows



In [4]:
# User ratings
userMoviesDF = spark.read.csv('inputs/filtered50movies.csv',header=True)

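# Cast the columns, which are read from the CSV as strings
# NOTE: casting Rating to 'int' would not preserve a fractional rating such as 2.5;
# cast to 'float' instead if half-point ratings must be kept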
userMoviesDF = userMoviesDF.withColumn("UserID",userMoviesDF.UserID.cast('int'))
userMoviesDF = userMoviesDF.withColumn("MovieID",userMoviesDF.MovieID.cast('int'))
userMoviesDF = userMoviesDF.withColumn("Rating",userMoviesDF.Rating.cast('int'))

# Sort by MovieID
userMoviesDF = userMoviesDF.orderBy('MovieID')

userMoviesDF.show(5)

+------+-------+------+
|UserID|MovieID|Rating|
+------+-------+------+
|     1|      1|     1|
|     5|      1|     1|
|     7|      1|     2|
|    15|      1|     0|
|    17|      1|     2|
+------+-------+------+
only showing top 5 rows



In [5]:
# Cartesian product
userMoviesRDD = userMoviesDF.rdd.cartesian(userMoviesDF.rdd)

# Keep only pairs of ratings by the same user for two different movies
# (both orderings, (a, b) and (b, a), are kept)
userMoviesRDD = userMoviesRDD.filter(lambda x: x[0][0] == x[1][0] and x[0][1]!=x[1][1])
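A full Cartesian product pairs every rating row with every other row, so its size grows quadratically with the file. One alternative, sketched here under the assumption that a single user's ratings fit in memory on one worker and that each user rates a movie at most once, is to group the ratings by user and generate the movie pairs inside each group. `movie_pairs` is a hypothetical helper, not part of the exercise's slides. It emits each pair once, with the smaller movie ID first, which is enough because cosine similarity is symmetric.

```python
from itertools import combinations

def movie_pairs(user_ratings):
    """Yield ((movie_a, movie_b), (rating_a, rating_b)) for one user.

    user_ratings: iterable of (movie_id, rating) tuples for a single user.
    Each pair is emitted once, with movie_a < movie_b.
    """
    ordered = sorted(user_ratings)  # sort by movie ID so that movie_a < movie_b
    for (movie_a, rating_a), (movie_b, rating_b) in combinations(ordered, 2):
        yield (movie_a, movie_b), (rating_a, rating_b)

# Local check with made-up ratings for a single user.
print(list(movie_pairs([(3, 0), (1, 2), (2, -1)])))
```

In Spark this could be applied as `userMoviesDF.rdd.map(lambda r: (r.UserID, (r.MovieID, r.Rating))).groupByKey().flatMap(lambda kv: movie_pairs(kv[1]))`. The records then have the shape `((movieA, movieB), (ratingA, ratingB))` instead of a pair of `Row` objects, so any later map over them would need to index accordingly.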

In [6]:
userMoviesRDD.take(5)

[(Row(UserID=1, MovieID=1, Rating=1), Row(UserID=1, MovieID=3, Rating=1)),
 (Row(UserID=19, MovieID=1, Rating=1), Row(UserID=19, MovieID=3, Rating=0)),
 (Row(UserID=32, MovieID=1, Rating=0), Row(UserID=32, MovieID=3, Rating=0)),
 (Row(UserID=43, MovieID=1, Rating=2), Row(UserID=43, MovieID=3, Rating=2)),
 (Row(UserID=44, MovieID=1, Rating=0), Row(UserID=44, MovieID=3, Rating=0))]

In [7]:
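# Map each pair to ((movieA, movieB), (ratingA*ratingB, ratingA^2, ratingB^2)):
# one user's contribution to the numerator and to the two norms of the cosine similarity
userMoviesRDD = userMoviesRDD.map(lambda i:((i[0][1],i[1][1]), (i[0][2]*i[1][2],i[0][2]**2,i[1][2]**2)))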

In [10]:
userMoviesRDD.take(10)

[((1, 3), (1, 1, 1)),
 ((1, 3), (0, 1, 0)),
 ((1, 3), (0, 0, 0)),
 ((1, 3), (4, 4, 4)),
 ((1, 3), (0, 0, 0)),
 ((1, 3), (1, 1, 1)),
 ((1, 3), (0, 0, 1)),
 ((1, 3), (0, 1, 0)),
 ((1, 3), (0, 4, 0)),
 ((1, 3), (4, 4, 4))]
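The triples above still have to be summed per movie pair and turned into a similarity. A minimal sketch of that reduce step follows. `add_triples` and `cosine_from_sums` are hypothetical helper names, and returning 0.0 when one of the norms is zero is a convention chosen here, not something stated in the exercise. The local check reuses the ten `(1, 3)` triples printed above, so its result covers only those ten users, not the whole file.

```python
import math
from functools import reduce

def add_triples(a, b):
    """Element-wise sum of two (dot, sq_a, sq_b) triples."""
    return (a[0] + b[0], a[1] + b[1], a[2] + b[2])

def cosine_from_sums(sums):
    """Turn summed (dot, sq_a, sq_b) terms into a cosine similarity."""
    dot, sq_a, sq_b = sums
    denom = math.sqrt(sq_a) * math.sqrt(sq_b)
    # Assumed convention: similarity is 0.0 when either norm is zero.
    return dot / denom if denom else 0.0

# The ten triples shown for the movie pair (1, 3).
sample = [(1, 1, 1), (0, 1, 0), (0, 0, 0), (4, 4, 4), (0, 0, 0),
          (1, 1, 1), (0, 0, 1), (0, 1, 0), (0, 4, 0), (4, 4, 4)]

sums = reduce(add_triples, sample)
print(sums)                    # (10, 16, 11)
print(cosine_from_sums(sums))  # 10 / sqrt(16 * 11)
```

On the RDD, the same two functions could be chained as `userMoviesRDD.reduceByKey(add_triples).mapValues(cosine_from_sums)`, which keeps the whole computation distributed until the final top-10 lookup.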