# MLSD Assignment 2
# Exercise 2 : Collaborative Filtering

#### Work done by:

Alexandra de Carvalho

Nuno Pedrosa

In [1]:
import pyspark
import pandas as pd
import numpy as np
from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf
from pyspark.mllib.recommendation import ALS
from pyspark.mllib.recommendation import MatrixFactorizationModel
import pyspark.sql.functions as F
from math import sqrt
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql import Row

Initializing spark

In [None]:
# Create (or reuse) the SparkContext.
# getOrCreate keeps this cell re-runnable: calling the SparkContext constructor
# a second time in the same kernel fails because only one context may be active.
# Master, application name and driver memory are set here because the
# SparkSession builder in the next cell reuses an already-running context, so
# settings passed only to that builder would not reach the context.
conf = (
    SparkConf()
    .setMaster('local[*]')
    .setAppName("MovieLens CF")
    .set("spark.driver.memory", "15g")
)
sc = SparkContext.getOrCreate(conf=conf)

In [3]:
# Obtain the SparkSession used by every DataFrame operation in this notebook.
spark = (
    SparkSession.builder
    .master('local[*]')
    .config("spark.driver.memory", "15g")
    .appName("MovieLens CF")
    .getOrCreate()
)

# Enable Arrow-based conversion between Spark and pandas DataFrames.
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")


Reading the MovieLens dataset. `Movies_df` is used to map each movieId to its title. `Ratings_df` holds the score given by a user to a certain movie; it carries the key information of the project.

In [4]:
# Both files have a header row. No schema is inferred, so every column is read
# as a string (see the printSchema output below).
Movies_df = spark.read.option("header", True).csv("ml-latest-small/movies.csv")
Ratings_df = spark.read.option("header", True).csv("ml-latest-small/ratings.csv")


In [5]:
Movies_df.printSchema()
Movies_df.show()

root
 |-- movieId: string (nullable = true)
 |-- title: string (nullable = true)
 |-- genres: string (nullable = true)

+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|
|      5|Father of the Bri...|              Comedy|
|      6|         Heat (1995)|Action|Crime|Thri...|
|      7|      Sabrina (1995)|      Comedy|Romance|
|      8| Tom and Huck (1995)|  Adventure|Children|
|      9| Sudden Death (1995)|              Action|
|     10|    GoldenEye (1995)|Action|Adventure|...|
|     11|American Presiden...|Comedy|Drama|Romance|
|     12|Dracula: Dead and...|       Comedy|Horror|
|     13|        Balto (1995)|Adventure|Animati...|
|     14|        Nixon (1995)|               Dra

In [6]:
Ratings_df.printSchema()
Ratings_df.show()

root
 |-- userId: string (nullable = true)
 |-- movieId: string (nullable = true)
 |-- rating: string (nullable = true)
 |-- timestamp: string (nullable = true)

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|      1|   4.0|964982703|
|     1|      3|   4.0|964981247|
|     1|      6|   4.0|964982224|
|     1|     47|   5.0|964983815|
|     1|     50|   5.0|964982931|
|     1|     70|   3.0|964982400|
|     1|    101|   5.0|964980868|
|     1|    110|   4.0|964982176|
|     1|    151|   5.0|964984041|
|     1|    157|   5.0|964984100|
|     1|    163|   5.0|964983650|
|     1|    216|   5.0|964981208|
|     1|    223|   3.0|964980985|
|     1|    231|   5.0|964981179|
|     1|    235|   4.0|964980908|
|     1|    260|   5.0|964981680|
|     1|    296|   3.0|964982967|
|     1|    316|   3.0|964982310|
|     1|    333|   5.0|964981179|
|     1|    349|   4.0|964982563|
+------+-------+------+---------+
only showing top 20 ro

Changing data types and eliminating the timestamp column.

In [7]:
# Cast the string columns to numeric types and drop the unused timestamp.
Ratings_df = (
    Ratings_df
    .withColumn('rating', F.col('rating').cast("float"))
    .withColumn('userId', F.col('userId').cast("integer"))
    .withColumn('movieId', F.col('movieId').cast("integer"))
    .drop('timestamp')
)

In [8]:
Ratings_df.printSchema()
Ratings_df.show()

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: float (nullable = true)

+------+-------+------+
|userId|movieId|rating|
+------+-------+------+
|     1|      1|   4.0|
|     1|      3|   4.0|
|     1|      6|   4.0|
|     1|     47|   5.0|
|     1|     50|   5.0|
|     1|     70|   3.0|
|     1|    101|   5.0|
|     1|    110|   4.0|
|     1|    151|   5.0|
|     1|    157|   5.0|
|     1|    163|   5.0|
|     1|    216|   5.0|
|     1|    223|   3.0|
|     1|    231|   5.0|
|     1|    235|   4.0|
|     1|    260|   5.0|
|     1|    296|   3.0|
|     1|    316|   3.0|
|     1|    333|   5.0|
|     1|    349|   4.0|
+------+-------+------+
only showing top 20 rows



Checking the distribution of the dataset by user and by movie.

In [9]:
user_ratings = Ratings_df.groupBy("userId").count()
user_ratings.show()

+------+-----+
|userId|count|
+------+-----+
|   148|   48|
|   463|   33|
|   471|   28|
|   496|   29|
|   243|   36|
|   392|   25|
|   540|   42|
|    31|   50|
|   516|   26|
|    85|   34|
|   137|  141|
|   251|   23|
|   451|   34|
|   580|  436|
|    65|   34|
|   458|   59|
|    53|   20|
|   255|   44|
|   481|   31|
|   588|   56|
+------+-----+
only showing top 20 rows



In [10]:
movie_ratings = Ratings_df.groupBy("movieId").count()
movie_ratings.show()

+-------+-----+
|movieId|count|
+-------+-----+
|   1580|  165|
|   2366|   25|
|   3175|   75|
|   1088|   42|
|  32460|    4|
|  44022|   23|
|  96488|    4|
|   1238|    9|
|   1342|   11|
|   1591|   26|
|   1645|   51|
|   4519|    9|
|   2142|   10|
|    471|   40|
|   3997|   12|
|    833|    6|
|   3918|    9|
|   7982|    4|
|   1959|   15|
|  68135|   10|
+-------+-----+
only showing top 20 rows



To be able to validate the obtained results, we split the dataset into a train set and a test set: 90% of the ratings are used for training and 10% for testing.

In [11]:
# Split the ratings 90% / 10% into train and test; the fixed seed keeps the
# split repeatable between runs.
Train, Test = Ratings_df.randomSplit(weights=[0.9, 0.1], seed=0)

In [12]:
Train.show()

+------+-------+------+
|userId|movieId|rating|
+------+-------+------+
|     1|      1|   4.0|
|     1|      3|   4.0|
|     1|      6|   4.0|
|     1|     47|   5.0|
|     1|     50|   5.0|
|     1|     70|   3.0|
|     1|    101|   5.0|
|     1|    110|   4.0|
|     1|    151|   5.0|
|     1|    157|   5.0|
|     1|    163|   5.0|
|     1|    216|   5.0|
|     1|    223|   3.0|
|     1|    231|   5.0|
|     1|    235|   4.0|
|     1|    260|   5.0|
|     1|    296|   3.0|
|     1|    316|   3.0|
|     1|    333|   5.0|
|     1|    349|   4.0|
+------+-------+------+
only showing top 20 rows



In [13]:

# Number of distinct users in Train, then the ids themselves as a Python list.
# Note that Train_user_ratings is a DataFrame first and a list afterwards.
Train_user_ratings = Train.groupBy("userId").count()
Train_users = Train_user_ratings.count()
Train_user_ratings = [row[0] for row in Train_user_ratings.select('userId').collect()]
print(Train_users)

610


There are 610 different users in the Train dataset

In [14]:
# Number of distinct users in Test, then the ids themselves as a Python list.
# Note that Test_user_ratings is a DataFrame first and a list afterwards.
Test_user_ratings = Test.groupBy("userId").count()
Test_users = Test_user_ratings.count()
Test_user_ratings = [row[0] for row in Test_user_ratings.select('userId').collect()]
print(Test_users)

597


There are 597 different users in the Test dataset

In [15]:

# Number of distinct movies in Train, then the ids themselves as a Python list.
# Note that Train_movie_ratings is a DataFrame first and a list afterwards.
Train_movie_ratings = Train.groupBy("movieID").count()
Train_movie = Train_movie_ratings.count()
Train_movie_ratings = [row[0] for row in Train_movie_ratings.select('movieID').collect()]
print(Train_movie)

9368


There are 9368 different movies in the Train dataset

In [16]:
# Number of distinct movies in Test, then the ids themselves as a Python list.
# Note that Test_movie_ratings is a DataFrame first and a list afterwards.
Test_movie_ratings = Test.groupBy("movieID").count()
Test_movie = Test_movie_ratings.count()
Test_movie_ratings = [row[0] for row in Test_movie_ratings.select('movieID').collect()]
print(Test_movie)

3646


There are 3646 different movies in the Test dataset

It's possible that the test set obtained from the split contains users and/or movies that never appear in the train set.

This is problematic when the moment comes to compare the real ratings with the predicted scores (no prediction can be made for them), so we check for it with the NotUnique function.

In [17]:
# We need to check if the Test df doesn't have new users or new movies that aren't in the Train set

def NotUnique(TrainList,TestList):
    """Report whether every element of ``TestList`` also occurs in ``TrainList``.

    Prints 'This Train Test Split is perfect' when all test ids are known to
    the train list, otherwise prints the 'not perfect' message. Returns None.

    Args:
        TrainList: ids (users or movies) present in the train set.
        TestList: ids (users or movies) present in the test set.
    """
    # A set makes each membership test O(1) instead of scanning the train list
    # once per test id. Fall back to the list itself for unhashable elements.
    try:
        known = set(TrainList)
    except TypeError:
        known = TrainList

    if all(item in known for item in TestList):
        print('This Train Test Split is perfect')
    else:
        print('This Train Test Split is not perfect (a part of the test Dataset will have to be ignored)')

In [18]:
# Check for users
NotUnique(Train_user_ratings,Test_user_ratings)
# Check for movies
NotUnique(Train_movie_ratings,Test_movie_ratings)

This Train Test Split is perfect
This Train Test Split is not perfect (a part of the test Dataset will have to be ignored)


Analysing the output of the function, we conclude that every user in the test dataset also appears in the train dataset; however, some movies in the test dataset never appear in the train dataset. The ratings of these movies will have to be ignored.

## Constructing User Item Matrix

In this section we will construct the user-item matrix in two different ways: one keyed by movie, to get the similarities between movies, and one keyed by user, to predict new scores.

For this we need two custom-made functions: transformRating and RatingJunction.

The transformRating function creates an RDD entry for each rating: a list in which the position of the specific movie/user holds the given rating and every other position holds None.

In [19]:
def transformRating(Id_1,rating,Id_2,items):
    """Expand a single rating into a sparse row aligned with ``items``.

    Args:
        Id_1: id whose position(s) in ``items`` receive the rating.
        rating: the rating value to place.
        Id_2: id used as the key of the resulting entry.
        items: ordered list of ids that defines the row layout.

    Returns:
        A two-element list ``[Id_2, row]`` where ``row`` has one slot per
        element of ``items``: ``rating`` where the element equals ``Id_1``,
        ``None`` elsewhere.
    """
    sparse_row = []
    for candidate in items:
        sparse_row.append(rating if candidate == Id_1 else None)
    return [Id_2, sparse_row]

The RatingJunction function combines all the ratings of a specific movie/user into a single RDD entry.

In [20]:
def RatingJunction(a,b):
    """Merge two sparse rating rows that belong to the same key.

    Every non-None entry of ``b`` is copied into the same position of ``a``.
    This function is used as the ``reduceByKey`` combiner, and Spark may pass
    an already-merged row as either argument (rows are combined inside each
    partition before partitions are merged), so ``b`` cannot be assumed to
    hold a single rating: all of its entries must be carried over.

    Args:
        a: list of ratings / None; updated in place and returned.
        b: list of ratings / None with the same layout as ``a``.

    Returns:
        ``a`` itself, now holding the ratings of both rows. Where both rows
        have a value at the same position, the one from ``b`` wins.
    """
    for position, value in enumerate(b):
        if value is not None:
            a[position] = value
    return a

### RDD Based on Users

Create an ordered list of movies (this is the order in which the ratings will appear in each row of the user-based RDD built below).

In [21]:
# Every movie of the Train set, without repetitions, in order of first
# appearance. This order defines the layout of each row of Train_user_RDD.
items_movies = list(dict.fromkeys(row.movieId for row in Train.select('movieId').collect()))
item_movies_len = len(items_movies)
print(item_movies_len)

9368


Getting the user based RDD

In [22]:


# One (userId, sparse row) pair per rating: the row follows the layout of
# items_movies and holds the rating at the position of the rated movie.
Train_user_RDD = Train.rdd.map(
    lambda data: tuple(transformRating(data.movieId, data.rating, data.userId, items_movies))
)

Train_user_RDD.take(10)

[(1,
  [4.0,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   Non

In [23]:
# Collapse the per-rating rows of each user into a single row.
Train_user_RDD = Train_user_RDD.reduceByKey(RatingJunction)

Train_user_RDD.take(10)


                                                                                

[(1,
  [4.0,
   4.0,
   4.0,
   5.0,
   5.0,
   3.0,
   5.0,
   4.0,
   5.0,
   5.0,
   5.0,
   5.0,
   3.0,
   5.0,
   4.0,
   5.0,
   3.0,
   3.0,
   5.0,
   4.0,
   4.0,
   4.0,
   3.0,
   4.0,
   5.0,
   4.0,
   3.0,
   4.0,
   4.0,
   5.0,
   4.0,
   4.0,
   4.0,
   5.0,
   3.0,
   5.0,
   4.0,
   3.0,
   3.0,
   4.0,
   5.0,
   5.0,
   5.0,
   4.0,
   5.0,
   5.0,
   5.0,
   5.0,
   3.0,
   5.0,
   5.0,
   4.0,
   5.0,
   4.0,
   5.0,
   5.0,
   4.0,
   5.0,
   5.0,
   4.0,
   5.0,
   5.0,
   5.0,
   5.0,
   5.0,
   5.0,
   5.0,
   4.0,
   2.0,
   5.0,
   5.0,
   5.0,
   5.0,
   5.0,
   5.0,
   3.0,
   4.0,
   5.0,
   5.0,
   5.0,
   5.0,
   4.0,
   3.0,
   3.0,
   3.0,
   3.0,
   4.0,
   4.0,
   5.0,
   4.0,
   5.0,
   3.0,
   5.0,
   5.0,
   4.0,
   5.0,
   3.0,
   3.0,
   5.0,
   4.0,
   5.0,
   4.0,
   4.0,
   5.0,
   5.0,
   4.0,
   4.0,
   5.0,
   5.0,
   4.0,
   5.0,
   4.0,
   4.0,
   5.0,
   5.0,
   5.0,
   3.0,
   4.0,
   4.0,
   4.0,
   5.0,
   5.0,
   5.0,
   5.0,
   

df_user_to_show displays the user-based RDD (one row of movie ratings per user) in an understandable way.

In [24]:
df_user_to_show=Train_user_RDD.toDF()

df_user_to_show.show()

+---+--------------------+
| _1|                  _2|
+---+--------------------+
|  1|[4.0, 4.0, 4.0, 5...|
|  2|[null, null, null...|
|  3|[null, null, null...|
|  4|[null, null, null...|
|  5|[4.0, null, null,...|
|  6|[null, 5.0, null,...|
|  7|[4.5, null, null,...|
|  8|[null, null, null...|
|  9|[null, null, null...|
| 10|[null, null, null...|
| 11|[null, null, null...|
| 12|[null, null, null...|
| 13|[null, null, null...|
| 14|[null, null, null...|
| 15|[2.5, null, null,...|
| 16|[null, null, null...|
| 17|[4.5, null, null,...|
| 18|[3.5, null, 4.0, ...|
| 19|[4.0, 3.0, null, ...|
| 20|[null, null, null...|
+---+--------------------+
only showing top 20 rows



### RDD Based on Items

Create an ordered list of users (this is the order in which the ratings will appear in each row of the movie-based RDD built below).

In [25]:
# Every user of the Train set, without repetitions, in order of first
# appearance. This order defines the layout of each row of Train_movie_RDD.
items_users = list(dict.fromkeys(row.userId for row in Train.select('userId').collect()))
item_users_len = len(items_users)
print(item_users_len)

610


Getting the movie based RDD

In [26]:
# One (movieId, sparse row) pair per rating: the row follows the layout of
# items_users and holds the rating at the position of the rating user.
Train_movie_RDD = Train.rdd.map(
    lambda data: tuple(transformRating(data.userId, data.rating, data.movieId, items_users))
)

Train_movie_RDD.take(10)

[(1,
  [4.0,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   Non

In [27]:
# Collapse the per-rating rows of each movie into a single row.
Train_movie_RDD = Train_movie_RDD.reduceByKey(RatingJunction)

Train_movie_RDD.take(10)

                                                                                

[(1,
  [4.0,
   None,
   None,
   None,
   4.0,
   None,
   4.5,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   2.5,
   None,
   4.5,
   3.5,
   4.0,
   None,
   3.5,
   None,
   None,
   None,
   None,
   None,
   3.0,
   None,
   None,
   None,
   5.0,
   3.0,
   3.0,
   None,
   None,
   None,
   None,
   None,
   None,
   5.0,
   None,
   None,
   5.0,
   3.0,
   4.0,
   5.0,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
   3.0,
   None,
   None,
   5.0,
   None,
   None,
   None,
   None,
   None,
   5.0,
   4.0,
   None,
   None,
   None,
   None,
   None,
   None,
   5.0,
   None,
   4.5,
   None,
   None,
   0.5,
   None,
   None,
   None,
   None,
   None,
   2.5,
   None,
   None,
   None,
   None,
   None,
   None,
   3.0,
   3.0,
   4.0,
   None,
   3.0,
   None,
   None,
   5.0,
   None,
   4.5,
   None,
   None,
   None,
   None,
   4.0,
   None,
   None,
   None,
   4.0,
   None,
   None,
   None,
   None,
   None,
   None,
   None,
  

df_movie displays the movie-based RDD (one row of user ratings per movie) in an understandable way.

In [28]:
df_movie=Train_movie_RDD.toDF()

df_movie.show()

+---+--------------------+
| _1|                  _2|
+---+--------------------+
|  1|[4.0, null, null,...|
|  3|[4.0, null, null,...|
|  6|[4.0, null, null,...|
| 47|[5.0, null, null,...|
| 50|[5.0, null, null,...|
| 70|[3.0, null, null,...|
|101|[5.0, null, null,...|
|110|[4.0, null, null,...|
|151|[5.0, null, null,...|
|157|[5.0, null, null,...|
|163|[5.0, null, null,...|
|216|[5.0, null, null,...|
|223|[3.0, null, null,...|
|231|[5.0, null, null,...|
|235|[4.0, null, null,...|
|260|[5.0, null, null,...|
|296|[3.0, null, null,...|
|316|[3.0, null, null,...|
|333|[5.0, 4.0, null, ...|
|349|[4.0, null, null,...|
+---+--------------------+
only showing top 20 rows



## Get similaritys between Movies

The first step to obtain the similarities between movies is to subtract the mean rating of a movie from each of its ratings. Non-existing ratings are replaced with 0.

This is done by the Pearson_step1 function.

In [29]:
def Pearson_step1(item):
    """Mean-centre the ratings of one row and fill the missing ones with 0.0.

    Args:
        item: pair ``(key, ratings)`` where ``ratings`` is a list of numbers
            and ``None`` (missing rating).

    Returns:
        ``(key, centred)`` where ``centred`` is a new list of the same length:
        each present rating minus the mean of the present ratings, and 0.0 for
        each missing rating. A row without any rating yields all zeros.
        The input list is left untouched.
    """
    key, ratings = item[0], item[1]

    # Test explicitly for None: a truthiness filter would also discard a
    # legitimate rating of 0 and distort the mean.
    observed = [value for value in ratings if value is not None]
    if not observed:
        return (key, [0.0] * len(ratings))

    mean = sum(observed) / len(observed)
    centred = [value - mean if value is not None else 0.0 for value in ratings]
    return (key, centred)

In [30]:
# Mean-centre every movie row.
Similarity_RDD = Train_movie_RDD.map(Pearson_step1)

Similarity_RDD.take(10)

[(1,
  [0.06578947368421062,
   0.0,
   0.0,
   0.0,
   0.06578947368421062,
   0.0,
   0.5657894736842106,
   0.0,
   0.0,
   0.0,
   0.0,
   0.0,
   0.0,
   0.0,
   -1.4342105263157894,
   0.0,
   0.5657894736842106,
   -0.4342105263157894,
   0.06578947368421062,
   0.0,
   -0.4342105263157894,
   0.0,
   0.0,
   0.0,
   0.0,
   0.0,
   -0.9342105263157894,
   0.0,
   0.0,
   0.0,
   1.0657894736842106,
   -0.9342105263157894,
   -0.9342105263157894,
   0.0,
   0.0,
   0.0,
   0.0,
   0.0,
   0.0,
   1.0657894736842106,
   0.0,
   0.0,
   1.0657894736842106,
   -0.9342105263157894,
   0.06578947368421062,
   1.0657894736842106,
   0.0,
   0.0,
   0.0,
   0.0,
   0.0,
   0.0,
   0.0,
   -0.9342105263157894,
   0.0,
   0.0,
   1.0657894736842106,
   0.0,
   0.0,
   0.0,
   0.0,
   0.0,
   1.0657894736842106,
   0.06578947368421062,
   0.0,
   0.0,
   0.0,
   0.0,
   0.0,
   0.0,
   1.0657894736842106,
   0.0,
   0.5657894736842106,
   0.0,
   0.0,
   -3.4342105263157894,
   0.0,
   0.

Turn the similarity RDD into a DataFrame.

In [31]:
df_movie_s=Similarity_RDD.toDF()

df_movie_s.show()

+---+--------------------+
| _1|                  _2|
+---+--------------------+
|  1|[0.06578947368421...|
|  3|[0.72340425531914...|
|  6|[0.09890109890109...|
| 47|[1.04278074866310...|
| 50|[0.78378378378378...|
| 70|[-0.5098039215686...|
|101|[1.19047619047619...|
|110|[-0.0539215686274...|
|151|[1.37837837837837...|
|157|[2.05, 0.0, 0.0, ...|
|163|[1.44067796610169...|
|216|[1.59756097560975...|
|223|[-0.8370786516853...|
|231|[1.86885245901639...|
|235|[0.40163934426229...|
|260|[0.80786026200873...|
|296|[-1.1845878136200...|
|316|[-0.3849206349206...|
|333|[1.13095238095238...|
|349|[0.40291262135922...|
+---+--------------------+
only showing top 20 rows



Here we do a crossJoin of the DataFrame with itself, to facilitate the computation of the similarities. Each resulting row pairs two movies and is used to compute the similarity between them.

In [32]:

# Pair every movie row with every movie row (including itself) and give the
# four resulting columns explicit names.
Data_list = ["movieId_1","ratings_1","movieId_2","ratings_2"]

right_side = df_movie_s.select('_1', F.col("_2").alias("ratings_2"))
df_join = df_movie_s.crossJoin(right_side).toDF(*Data_list)

df_join.show()

+---------+--------------------+---------+--------------------+
|movieId_1|           ratings_1|movieId_2|           ratings_2|
+---------+--------------------+---------+--------------------+
|        1|[0.06578947368421...|        1|[0.06578947368421...|
|        1|[0.06578947368421...|        3|[0.72340425531914...|
|        1|[0.06578947368421...|        6|[0.09890109890109...|
|        1|[0.06578947368421...|       47|[1.04278074866310...|
|        1|[0.06578947368421...|       50|[0.78378378378378...|
|        1|[0.06578947368421...|       70|[-0.5098039215686...|
|        1|[0.06578947368421...|      101|[1.19047619047619...|
|        1|[0.06578947368421...|      110|[-0.0539215686274...|
|        1|[0.06578947368421...|      151|[1.37837837837837...|
|        1|[0.06578947368421...|      157|[2.05, 0.0, 0.0, ...|
|        1|[0.06578947368421...|      163|[1.44067796610169...|
|        1|[0.06578947368421...|      216|[1.59756097560975...|
|        1|[0.06578947368421...|      22

In [33]:
JoinedRDD= df_join.rdd.map(lambda x: ((x.movieId_1,x.movieId_2),x.ratings_1,x.ratings_2))

JoinedRDD.take(3)

                                                                                

[((1, 1),
  [0.06578947368421062,
   0.0,
   0.0,
   0.0,
   0.06578947368421062,
   0.0,
   0.5657894736842106,
   0.0,
   0.0,
   0.0,
   0.0,
   0.0,
   0.0,
   0.0,
   -1.4342105263157894,
   0.0,
   0.5657894736842106,
   -0.4342105263157894,
   0.06578947368421062,
   0.0,
   -0.4342105263157894,
   0.0,
   0.0,
   0.0,
   0.0,
   0.0,
   -0.9342105263157894,
   0.0,
   0.0,
   0.0,
   1.0657894736842106,
   -0.9342105263157894,
   -0.9342105263157894,
   0.0,
   0.0,
   0.0,
   0.0,
   0.0,
   0.0,
   1.0657894736842106,
   0.0,
   0.0,
   1.0657894736842106,
   -0.9342105263157894,
   0.06578947368421062,
   1.0657894736842106,
   0.0,
   0.0,
   0.0,
   0.0,
   0.0,
   0.0,
   0.0,
   -0.9342105263157894,
   0.0,
   0.0,
   1.0657894736842106,
   0.0,
   0.0,
   0.0,
   0.0,
   0.0,
   1.0657894736842106,
   0.06578947368421062,
   0.0,
   0.0,
   0.0,
   0.0,
   0.0,
   0.0,
   1.0657894736842106,
   0.0,
   0.5657894736842106,
   0.0,
   0.0,
   -3.4342105263157894,
   0.0,


The function cosine_sim calculates the cosine similarity between two lists of (mean-centred) ratings. After applying it we obtain the similarities between movies.

In the function we sum, over all users $u$, the product of the ratings of movies $i$ and $j$, and divide this by the product of the square roots of the sums of squared ratings of each movie: $$\mathrm{sim}(i,j)=\frac{\sum_u r_{u,i}\,r_{u,j}}{\sqrt{\sum_u r_{u,i}^2}\;\sqrt{\sum_u r_{u,j}^2}}$$



In [34]:
def cosine_sim(item):
    """Cosine similarity between the two rating vectors of a movie pair.

    Args:
        item: triple ``(pair, rating_1, rating_2)`` where ``pair`` identifies
            the two movies and the rating vectors are equally long lists of
            numbers (no None).

    Returns:
        ``(pair, similarity)``. The similarity is the dot product of the two
        vectors divided by the product of their Euclidean norms, or 0.0 when
        either vector is all zeros (the quotient is undefined there).
    """
    pair, rating_1, rating_2 = item[0], item[1], item[2]

    # Numerator: dot product, taken over the vectors themselves rather than
    # over a length stored in a notebook-level variable.
    dot = sum(x * y for x, y in zip(rating_1, rating_2))

    # Denominator: product of the two norms.
    norm = sqrt(sum(x ** 2 for x in rating_1)) * sqrt(sum(y ** 2 for y in rating_2))

    if norm == 0:
        return (pair, 0.0)
    return (pair, dot / norm)

In [35]:
# Similarity of every movie pair.
similarityRDD = JoinedRDD.map(cosine_sim)

similarityRDD.take(3)

                                                                                

[((1, 1), 1.0000000000000002),
 ((1, 3), 0.10202629136285349),
 ((1, 6), 0.05443336831241767)]

Now similarityRDD is a RDD with a tuple of the 2 movies and their similarity.

To reduce computation time and to eliminate low similarities, which would probably lead to bad predictions, we filter the similarities: only movie similarities greater than 0.3 are kept.

In [36]:
# Similarities of 0.30 or less are considered weak and are discarded.
MIN_SIMILARITY = 0.30
similarity_filter_RDD = similarityRDD.filter(lambda pair_sim: pair_sim[1] > MIN_SIMILARITY)


Collect the similarities into a dictionary `(movie_1, movie_2): similarity`; it will be used to predict scores.

In [37]:
# This dictionary has all of the meaningfull similarities between movies, the similarities are duplicated (each similarity appears 2 times, but with the items displaied in a different order)

Similarity_Dict=similarity_filter_RDD.collectAsMap()


                                                                                

In [38]:
Similarity_Dict

{(1, 1): 1.0000000000000002,
 (1, 588): 0.31371537442274644,
 (3, 3): 1.0,
 (3, 333): 0.3082037212524729,
 (3, 1445): 0.30320122861796706,
 (3, 3450): 0.38779232076307274,
 (3, 60): 0.45413239235055713,
 (3, 210): 0.3355340596741378,
 (3, 267): 0.35922256556028104,
 (3, 419): 0.5062484850523314,
 (3, 432): 0.3237997117475652,
 (3, 468): 0.3391973996135204,
 (3, 569): 0.32508822979398017,
 (3, 575): 0.3810103428632064,
 (3, 700): 0.33209259372883104,
 (3, 1006): 0.30435641875970204,
 (3, 653): 0.3086286257552039,
 (3, 325): 0.33808673306389264,
 (3, 1461): 0.3362558108259908,
 (3, 2133): 0.3566323175873372,
 (3, 2418): 0.3872041148890987,
 (3, 3361): 0.40053537642405773,
 (3, 3698): 0.31140381421187596,
 (3, 663): 0.3150034658779544,
 (3, 1615): 0.31204227413696295,
 (3, 387): 0.3387398093328951,
 (3, 3388): 0.3194299174426109,
 (3, 2917): 0.30865227664274647,
 (3, 2889): 0.37180613126185214,
 (3, 2879): 0.30085878410422257,
 (3, 577): 0.3933218648888617,
 (3, 3412): 0.37378696263018163

## Get scores for non rated movies

The scores function predicts the scores of the movies a user has not rated.

First, for every movie $i$ not rated by the user, we take the (up to) 10 movies most similar to $i$ among the movies the user has rated.

Then we divide the sum of these similarities times the ratings given by the user by the sum of the similarities, which gives the predicted score for that movie: $$\hat r_{u,i}=\frac{\sum_{j} s_{ij}\,r_{u,j}}{\sum_{j} s_{ij}}$$

Movies that were already rated by the user are represented in the predictions matrix as -1.

In [39]:
def scores(item, top_k=10):
    """Predict a score for every movie the user has not rated.

    Relies on two notebook-level names: ``items_movies`` (movie id at each
    position of a rating row) and ``Similarity_Dict`` (``(movie_i, movie_j)``
    -> similarity, holding both orderings of each pair).

    Args:
        item: pair ``(user, ratings)`` where ``ratings`` follows the layout of
            ``items_movies`` and holds the user's rating or None.
        top_k: maximum number of most-similar rated movies used for each
            prediction (10, the value used so far, by default).

    Returns:
        ``(user, predictions)`` with one entry per position of ``ratings``:
        -1 where the user already rated the movie, otherwise the
        similarity-weighted mean of the user's ratings over the ``top_k`` most
        similar rated movies, or 0.0 when no similar rated movie exists.
        A new list is returned; the input row is left untouched.
    """
    user = item[0]
    ratings = item[1]

    # Position of each movie in a rating row (first occurrence, as list.index).
    position = {}
    for idx, movie in enumerate(items_movies):
        position.setdefault(movie, idx)

    # A single pass over the similarity table gathers, for every unrated
    # movie, the (similarity, rating) pairs of the similar movies the user did
    # rate. This replaces one full scan of the table (plus a linear
    # list.index lookup per entry) for each unrated movie.
    neighbours = {}
    for (movie_i, movie_j), similarity in Similarity_Dict.items():
        idx_i = position.get(movie_i)
        idx_j = position.get(movie_j)
        if idx_i is None or idx_j is None:
            continue  # movie without a column in the rating rows
        if ratings[idx_i] is None and ratings[idx_j] is not None:
            neighbours.setdefault(idx_i, []).append((similarity, ratings[idx_j]))

    predictions = []
    for idx, rating in enumerate(ratings):
        if rating is not None:
            predictions.append(-1)  # already rated: nothing to predict
            continue

        # sorted() is stable, so equal similarities keep the table order.
        best = sorted(neighbours.get(idx, ()), key=lambda pair: -pair[0])[:top_k]

        weighted_sum = 0
        weight_total = 0
        for similarity, neighbour_rating in best:
            weighted_sum = weighted_sum + similarity * neighbour_rating
            weight_total = weight_total + similarity

        # No usable neighbour: report 0.0, read downstream as "no prediction".
        predictions.append(weighted_sum / weight_total if weight_total != 0 else 0.0)

    return (user, predictions)

In [40]:

# Predicted scores for every user.
ScoresRDD = Train_user_RDD.map(scores)

ScoresRDD.take(2)

                                                                                

[(1,
  [-1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
 

Now ScoresRDD is an RDD with the predicted scores. A score of 0 marks a movie for which, for the user in question, the algorithm didn't find any sufficiently similar rated movie (no prediction could be made); -1 marks a movie the user had already rated.

#### Now that we have the predicted scores for all the users, we can choose the users we want to check the predictions.

In this case, we will check the predictions for 10 users, we can choose the users we want to check by changing the userId values present in the User_Check list.

In [49]:
#Change here the users you want to check

User_Check = [1,2,3,4,5,6,7,8,9,10]

In [50]:
# Create RDD with just the users we want to check

User_Scores_RDD=ScoresRDD.filter(lambda x: x[0] in User_Check)

User_Scores_RDD.take(2)

                                                                                

[(1,
  [-1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
 

[Stage 83:>                 (0 + 1) / 1][Stage 85:>                 (0 + 1) / 1]

In [51]:
User_Scores = User_Scores_RDD.take(len(User_Check))

[Stage 83:>                 (0 + 1) / 1][Stage 85:>                 (0 + 1) / 1]

In [52]:
User_Scores

[(1,
  [-1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
   -1,
 

[Stage 83:>                 (0 + 1) / 1][Stage 85:>                 (0 + 1) / 1]

Movies_Code_Dict is a dictionary that maps the id of a movie (as a string) to its title.

In [53]:
# Build the dictionary movie id -> movie title.
# movieId was read from the CSV as a string, so the keys are strings.
Movies_df = Movies_df.drop('genres')

Movies_CodeRDD = Movies_df.rdd.map(lambda row: tuple(row))

Movies_Code_Dict = Movies_CodeRDD.collectAsMap()

[Stage 83:>                 (0 + 1) / 1][Stage 85:>                 (0 + 1) / 1]

In [54]:
Movies_Code_Dict

{'1': 'Toy Story (1995)',
 '2': 'Jumanji (1995)',
 '3': 'Grumpier Old Men (1995)',
 '4': 'Waiting to Exhale (1995)',
 '5': 'Father of the Bride Part II (1995)',
 '6': 'Heat (1995)',
 '7': 'Sabrina (1995)',
 '8': 'Tom and Huck (1995)',
 '9': 'Sudden Death (1995)',
 '10': 'GoldenEye (1995)',
 '11': 'American President, The (1995)',
 '12': 'Dracula: Dead and Loving It (1995)',
 '13': 'Balto (1995)',
 '14': 'Nixon (1995)',
 '15': 'Cutthroat Island (1995)',
 '16': 'Casino (1995)',
 '17': 'Sense and Sensibility (1995)',
 '18': 'Four Rooms (1995)',
 '19': 'Ace Ventura: When Nature Calls (1995)',
 '20': 'Money Train (1995)',
 '21': 'Get Shorty (1995)',
 '22': 'Copycat (1995)',
 '23': 'Assassins (1995)',
 '24': 'Powder (1995)',
 '25': 'Leaving Las Vegas (1995)',
 '26': 'Othello (1995)',
 '27': 'Now and Then (1995)',
 '28': 'Persuasion (1995)',
 '29': 'City of Lost Children, The (Cité des enfants perdus, La) (1995)',
 '30': 'Shanghai Triad (Yao a yao yao dao waipo qiao) (1995)',
 '31': 'Dangerou

Now we get the results for the chosen users:

In [70]:
# Print, for each selected user, the titles of the movies whose predicted
# score reaches the threshold.
SCORE_THRESHOLD = 4.5  # Change here the threshold of score you want to analyse
MAX_SHOWN = 20         # Titles printed per user before summarising the rest

for user, ratings in User_Scores:
    print('Some movies with highest recomendation to user: ', user)

    # Positions of `ratings` follow items_movies; Movies_Code_Dict has string keys.
    recomended = [
        Movies_Code_Dict[str(movie_id)]
        for movie_id, rate in zip(items_movies, ratings)
        if rate >= SCORE_THRESHOLD
    ]

    # Show exactly MAX_SHOWN titles, so the "More N movies" count matches what
    # was left out (21 titles used to be printed while reporting len - 20).
    for movie in recomended[:MAX_SHOWN]:
        print(movie, end = ' | ')
    hidden = len(recomended) - MAX_SHOWN
    if hidden > 0:
        print('More ', hidden, ' movies.' )
    print()
    print()

Some movies with highest recomendation to user:  1
Wolf of Wall Street, The (2013) | Interstellar (2014) | Whiplash (2014) | Dangerous Minds (1995) | Courage Under Fire (1996) | Operation Dumbo Drop (1995) | Wallace & Gromit: The Best of Aardman Animation (1996) | Doors, The (1991) | On Golden Pond (1981) | Deer Hunter, The (1978) | Patton (1970) | Field of Dreams (1989) | Lady and the Tramp (1955) | Saturn 3 (1980) | Fast Times at Ridgemont High (1982) | The Lair of the White Worm (1988) | Snow Dogs (2002) | Green Card (1990) | Death Race 2000 (1975) | Mighty Aphrodite (1995) | Muriel's Wedding (1994) | More  1061  movies.


Some movies with highest recomendation to user:  2
Flight of the Navigator (1986) | Troll 2 (1990) | Before Sunrise (1995) | Ocean's Eleven (2001) | Man on Fire (2004) | National Treasure (2004) | Phantom of the Opera, The (2004) | Girl with a Pearl Earring (2003) | Hotel Rwanda (2004) | Quantum of Solace (2008) | He's Just Not That Into You (2009) | Kung Fu Panda

[Stage 83:>                 (0 + 1) / 1][Stage 85:>                 (0 + 1) / 1]

## Validate Results

Now we need to validate our results, for this we use the test set, and compare the real values given by the users with the predicted values.

First we transform the test Dataset into a RDD.

In [58]:

Test_RDD =Test.rdd.map(lambda data:(data.userId, data.movieId, data.rating,))

Test_RDD.take(2)

                                                                                

[(1, 362, 5.0), (1, 527, 5.0)]

[Stage 83:>                 (0 + 1) / 1][Stage 85:>                 (0 + 1) / 1]

To validate the results we will just use the users we checked in the last section, so we can filter the Test_RDD with that in mind.

In [59]:
Test_RDD=Test_RDD.filter(lambda x: x[0] in User_Check)



To be able to compare the values, we use the collected results: User_Scores is the list of predicted scores of the selected users, and user_ordered is the list of those users in the order in which they appear in User_Scores.

In [60]:
user_ordered = [item[0] for item in User_Scores]

The function PredictedScore looks up the predicted score of a test rating in the User_Scores list; if no score is found it returns a tuple of Nones.

In [61]:
def PredictedScore(item):
    """Pair a test rating with the score predicted for the same user and movie.

    Parameters
    ----------
    item : sequence
        A test record indexed as (userId, movieId, rating).

    Returns
    -------
    tuple
        (userId, movieId, rating, predicted) when a prediction exists,
        otherwise (None, None, None, None).

    Notes
    -----
    Reads the notebook globals Train_user_ratings, Train_movie_ratings,
    user_ordered, User_Scores and items_movies. The predicted scores of a
    user are read from the second field of its User_Scores entry, at the
    position that the movie has in items_movies.
    """
    no_prediction = (None, None, None, None)

    # The rating cannot be predicted when the Train dataset did not contain
    # the user or the movie related to it.
    if item[0] not in Train_user_ratings or item[1] not in Train_movie_ratings:
        return no_prediction

    user_position = user_ordered.index(item[0])
    user_scores = User_Scores[user_position][1]
    predicted = user_scores[items_movies.index(item[1])]

    # A score of 0 means the algorithm was not able to discover a score,
    # because the similarities between this movie and the other movies are low.
    if predicted == 0:
        return no_prediction

    return (item[0], item[1], item[2], predicted)
    

In [62]:
# Extend every test record with its predicted score; records that cannot be
# predicted become a tuple of four Nones.
Test_RDD = Test_RDD.map(PredictedScore)

# Preview the first four mapped records.
Test_RDD.take(4)

                                                                                

[(None, None, None, None),
 (None, None, None, None),
 (None, None, None, None),
 (None, None, None, None)]

[Stage 83:>                 (0 + 1) / 1][Stage 85:>                 (0 + 1) / 1]

Eliminating the rows with Nones (the ones corresponding to ratings that could not be predicted).

In [63]:
# Drop the placeholder rows, i.e. the ones whose first field is None because
# no prediction was available. None is a singleton, so it is tested with the
# identity operator instead of `!=`.
Test_RDD = Test_RDD.filter(lambda x: x[0] is not None)

# Preview the first four remaining (userId, movieId, rating, prediction) rows.
Test_RDD.take(4)

                                                                                

[(1, 1009, 3.0, 5.0),
 (1, 1029, 5.0, 5.0),
 (1, 1080, 5.0, 5.0),
 (1, 1208, 4.0, 5.0)]

[Stage 83:>                 (0 + 1) / 1][Stage 85:>                 (0 + 1) / 1]

Now we turn the Test_RDD into a DataFrame and get the following results:

In [64]:
# Column names for the four tuple fields, in tuple order.
prediction_columns = ['userId', 'movieId', 'rating', 'prediction']

# Convert the RDD into a DataFrame, then rename the generated columns.
Test_predict = Test_RDD.toDF().toDF(*prediction_columns)

Test_predict.show()

                                                                                

+------+-------+------+------------------+
|userId|movieId|rating|        prediction|
+------+-------+------+------------------+
|     1|   1009|   3.0|               5.0|
|     1|   1029|   5.0|               5.0|
|     1|   1080|   5.0|               5.0|
|     1|   1208|   4.0|               5.0|
|     1|   1291|   5.0|               5.0|
|     1|   2012|   4.0| 4.082240035914576|
|     1|   2048|   5.0|               5.0|
|     1|   2094|   5.0|               5.0|
|     1|   2406|   4.0|               5.0|
|     1|   2648|   4.0|               5.0|
|     1|   3034|   5.0|               5.0|
|     4|    190|   2.0|               4.0|
|     4|   1250|   5.0| 4.319585789296528|
|     4|   1885|   3.0|3.6303145256930645|
|     4|   2019|   2.0|               4.0|
|     4|   2186|   5.0| 4.739947127994415|
|     4|   4033|   4.0|          3.390625|
|     4|   4144|   3.0|3.5971843056019686|
|     6|      6|   4.0|               3.0|
|     6|     36|   5.0|               4.0|
+------+---

[Stage 83:>                 (0 + 1) / 1][Stage 85:>                 (0 + 1) / 1]

To analyse whether the predictions are close to the real ratings, we use the RMSE (root-mean-square error). It shows how much, on average, a predicted rating differs from the real rating.

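The RMSE is calculated in the following way: take the difference between each predicted rating and the corresponding real rating, square it, sum the squared differences, divide the sum by the number of predicted ratings and, finally, take the square root of the result: $$\mathrm{RMSE} = \sqrt{\frac{1}{N}\sum_{i=1}^{N}\left(\hat{r}_i - r_i\right)^2}$$ where $N$ is the number of predicted ratings, $\hat{r}_i$ is a predicted rating and $r_i$ is the corresponding real rating.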

In [65]:
# Root-mean-square error between the real rating and the predicted rating.
evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="rating",
    predictionCol="prediction",
)

RMSE = evaluator.evaluate(Test_predict)

print(RMSE)


1.1702033707556876


[Stage 83:>                 (0 + 1) / 1][Stage 85:>                 (0 + 1) / 1]

The obtained result, 1.17, is relatively good. It means that a predicted rating is, on average, separated from the real value by about 1.17 points.

Although not perfect, it is enough to obtain movies that the user will probably like.

Now we will use the Test_RDD to get the precision at top 10.

To calculate it, we take a threshold (in this case we chose 3.5) that separates the movies into two categories: the ones to recommend and the ones not to recommend. Then, for the 10 movies with the highest predicted rating, we check whether the real rating given by the user leads to the same decision as the predicted rating (recommend or don't recommend).

In [72]:
# Precision at top 10: for every checked user, take the movies whose predicted
# rating reaches the threshold, keep the TOP_K with the highest prediction and
# measure which fraction of them also has a real rating at or above the threshold.
RECOMMEND_THRESHOLD = 3.5  # ratings at or above this value mean "recommend"
TOP_K = 10                 # number of best-predicted movies evaluated per user

Top10_list = []        # precision values, one per user that has data
Top10_user_list = []   # (userID, precision)
Top10_len_list = []    # (userID, number of movies actually evaluated)

# Collect the recommended rows with a single Spark job and group them by user
# on the driver, instead of launching one filter + collect job per user.
recommended_rows = (
    Test_RDD
    .filter(lambda x: x[3] >= RECOMMEND_THRESHOLD)
    .map(lambda x: (x[0], [x[2], x[3]]))
    .collect()
)

pairs_by_user = {}
for rated_user, rating_pair in recommended_rows:
    pairs_by_user.setdefault(rated_user, []).append(rating_pair)

for userID in User_Check:
    # Each entry is [real rating, predicted rating].
    userID_list = pairs_by_user.get(userID, [])

    if not userID_list:
        # No recommended movie to compare for this user: skip it.
        continue

    # Keep the TOP_K movies with the highest predicted rating.
    userID_list = sorted(userID_list, key=lambda x: x[1], reverse=True)[:TOP_K]

    quantity = len(userID_list)
    # Count the movies whose real rating agrees with the recommendation.
    pred = sum(1 for x in userID_list if x[0] >= RECOMMEND_THRESHOLD)

    Top10_list.append(pred/quantity)
    Top10_user_list.append((userID, pred/quantity))
    Top10_len_list.append((userID, quantity))


print('Length of the predicted data:')
print(Top10_len_list)

print()

print('Top 10 algorithm by user:')
print(Top10_user_list)

print()

print('Top 10 algorithm mean:')
# Guard against the case where no user had any recommended movie, which
# would otherwise raise ZeroDivisionError.
Top10_mean = sum(Top10_list)/len(Top10_list) if Top10_list else float('nan')

print(Top10_mean)

                                                                                

Leght of the predicted data:
[(1, 10), (4, 6), (6, 8), (7, 3), (10, 6)]

Top 10 algorithm by user:
[(1, 0.9), (4, 0.3333333333333333), (6, 0.625), (7, 0.6666666666666666), (10, 0.8333333333333334)]

Top 10 algorithm mean:
0.6716666666666666


[Stage 83:>                 (0 + 1) / 1][Stage 85:>                 (0 + 1) / 1]

The best results we got were the ones for user 1, where 90% of the predictions were good predictions. The other users have a lower percentage of good predictions.

Among the analysed users, the only one that has at least 10 ratings that can be compared is user 1, so the results obtained for the other users aren't a "real" top 10. This leads to some problems in the analysis of the other users, because some of their movies will have predicted ratings very close to the defined threshold.

The Top 10 mean obtained is 0.67, which isn't bad with the above in mind.