# Movie Recommendation

In this project, our goal is to build a movie recommendation system using collaborative filtering. Collaborative filtering is a technique for making predictions (filtering) about the interests of a user by collecting preference or taste information from many other users (collaboration). The underlying assumption is that if person A has the same opinion as person B on one issue, A is more likely to share B's opinion on a different issue than that of a randomly chosen person.
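To make the assumption concrete, here is a minimal sketch of a user-based neighbourhood predictor in plain Python. It is not the method used later in this notebook (which relies on Spark's Alternating Least Squares); the toy ratings, user names and helper functions are all made up for illustration.

```python
from math import sqrt

# Toy ratings: user -> {movie: rating}. All names and values are made up.
toy_ratings = {
    "A": {"m1": 5.0, "m2": 4.0, "m3": 1.0},
    "B": {"m1": 5.0, "m2": 5.0, "m3": 1.0, "m4": 4.0},
    "C": {"m1": 1.0, "m2": 2.0, "m3": 5.0, "m4": 1.0},
}

def cosine_similarity(u, v):
    """Cosine similarity computed over the movies both users rated."""
    common = set(u) & set(v)
    if not common:
        return 0.0
    dot = sum(u[m] * v[m] for m in common)
    norm_u = sqrt(sum(u[m] ** 2 for m in common))
    norm_v = sqrt(sum(v[m] ** 2 for m in common))
    return dot / (norm_u * norm_v)

def predict(user, movie, ratings):
    """Similarity-weighted average of the other users' ratings for `movie`."""
    pairs = [(cosine_similarity(ratings[user], r), r[movie])
             for other, r in ratings.items()
             if other != user and movie in r]
    total = sum(sim for sim, _ in pairs)
    return sum(sim * rating for sim, rating in pairs) / total if total else None

sim_ab = cosine_similarity(toy_ratings["A"], toy_ratings["B"])
sim_ac = cosine_similarity(toy_ratings["A"], toy_ratings["C"])
prediction = predict("A", "m4", toy_ratings)
```

User A agrees with B far more than with C on the movies they share, so the predicted rating of `m4` for A is pulled towards B's rating rather than C's.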

For this task, the [MovieLens dataset](https://grouplens.org/datasets/movielens/) was used, specifically the 1M version (`ml-1m`) that is loaded below. It contains about 1 million ratings given to roughly 3,900 movies by 6,040 users.

The Apache Spark framework (and its Machine Learning library) was used to process the data.

## Spark setup

As some functionalities of the Spark SQL component will be used, we need to initialize a `SparkSession` object.

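The `SparkContext` object (required for other basic functionalities) is created automatically by the PySpark shell or notebook kernel and is available in the `sc` variable. In a standalone script, the same object can be obtained from the session's `sparkContext` attribute.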

In [1]:
from pyspark.sql import SparkSession

In [2]:
ss = SparkSession.builder.appName("Movie Recommendation").getOrCreate()

## Data load

The MovieLens dataset is composed of three text files:

- `users.dat`: contains information about the gender, age, occupation and zipcode of each user.
- `movies.dat`: contains information about the title, year and genres of each movie.
- `ratings.dat`: contains the ratings that users gave to the movies they watched.

They can be opened as plain text RDDs.

In [3]:
import os

In [4]:
usersRdd0 = sc.textFile(os.path.join("ml-1m", "users.dat"))
usersRdd0.take(5)

['1::F::1::10::48067',
 '2::M::56::16::70072',
 '3::M::25::15::55117',
 '4::M::45::7::02460',
 '5::M::25::20::55455']

In [5]:
moviesRdd0 = sc.textFile(os.path.join("ml-1m", "movies.dat"))
moviesRdd0.take(5)

["1::Toy Story (1995)::Animation|Children's|Comedy",
 "2::Jumanji (1995)::Adventure|Children's|Fantasy",
 '3::Grumpier Old Men (1995)::Comedy|Romance',
 '4::Waiting to Exhale (1995)::Comedy|Drama',
 '5::Father of the Bride Part II (1995)::Comedy']

In [6]:
ratingsRdd0 = sc.textFile(os.path.join("ml-1m", "ratings.dat"))
ratingsRdd0.take(5)

['1::1193::5::978300760',
 '1::661::3::978302109',
 '1::914::3::978301968',
 '1::3408::4::978300275',
 '1::2355::5::978824291']

## Data preparation

The plain text RDDs must be converted to structured data so we can use SQL queries and other high-level operations to manipulate them. This is done for each RDD separately, as each dataset has its own schema.

In [7]:
from pyspark import StorageLevel
from pyspark.sql import Row
import datetime as dt

### Users

According to the MovieLens dataset's documentation, each user record is composed of 5 fields:

- userID: User's unique identification.
- gender: User's gender ("F" for females, "M" for males).
- age: User's age range encoded as a number (see below).
- occupation: User's occupation encoded as a number (see below).
- zipcode: User's zipcode.

We usually need to encode categorical features as numbers (or sets of binary features) in order to use them in predictive models. Here, the `age` and `occupation` field values are already encoded; however, in a preliminary analysis it might be useful to know their actual values for the sake of interpretation. So this default encoding is undone and the codes are translated to the actual values they represent. Later, when needed, we may encode them again using Spark's `StringIndexer` class.

Thus, in the following cells the plain textual data for each user is split, the field values are computed, and the result is put into a structured `DataFrame`.

In [8]:
agesDict = {
    "1": "Under 18",
    "18": "18-24",
    "25": "25-34",
    "35": "35-44",
    "45": "45-49",
    "50": "50-55",
    "56": "Over 56",
}

In [9]:
occupationsDict = {
    "0": "other or unspecified",
    "1": "academic/educator",
    "2": "artist",
    "3": "clerical/admin",
    "4": "college/grad student",
    "5": "customer service",
    "6": "doctor/health care",
    "7": "executive/managerial",
    "8": "farmer",
    "9": "homemaker",
    "10": "K-12 student",
    "11": "lawyer",
    "12": "programmer",
    "13": "retired",
    "14": "sales/marketing",
    "15": "scientist",
    "16": "self-employed",
    "17": "technician/engineer",
    "18": "tradesman/craftsman",
    "19": "unemployed",
    "20": "writer",
}

In [10]:
usersRdd1 = usersRdd0.map(lambda line: line.split("::")) \
                     .map(lambda t: Row(userID=int(t[0]),
                                        gender=t[1],
                                        age=agesDict[t[2]],
                                        occupation=occupationsDict[t[3]],
                                        zipcode=t[4]))
usersRdd1.take(5)

[Row(age='Under 18', gender='F', occupation='K-12 student', userID=1, zipcode='48067'),
 Row(age='Over 56', gender='M', occupation='self-employed', userID=2, zipcode='70072'),
 Row(age='25-34', gender='M', occupation='scientist', userID=3, zipcode='55117'),
 Row(age='45-49', gender='M', occupation='executive/managerial', userID=4, zipcode='02460'),
 Row(age='25-34', gender='M', occupation='writer', userID=5, zipcode='55455')]

In [11]:
usersRdd1.persist(StorageLevel.MEMORY_AND_DISK)

PythonRDD[10] at RDD at PythonRDD.scala:48

In [12]:
usersDf1 = ss.createDataFrame(usersRdd1)
usersDf1.show(5, truncate=False)

+--------+------+--------------------+------+-------+
|age     |gender|occupation          |userID|zipcode|
+--------+------+--------------------+------+-------+
|Under 18|F     |K-12 student        |1     |48067  |
|Over 56 |M     |self-employed       |2     |70072  |
|25-34   |M     |scientist           |3     |55117  |
|45-49   |M     |executive/managerial|4     |02460  |
|25-34   |M     |writer              |5     |55455  |
+--------+------+--------------------+------+-------+
only showing top 5 rows



In [13]:
usersDf1.persist(StorageLevel.MEMORY_AND_DISK)

DataFrame[age: string, gender: string, occupation: string, userID: bigint, zipcode: string]

### Movies

At first, each movie record is composed of 3 fields:

- movieID: Movie's unique identification.
- title: Movie's title and year (see below).
- genres: List of all genres the movie fits into.

The `title` field includes the movie's year because there are movies with the same name made (or remade) in different periods, and the year would otherwise be the only way to distinguish them. However, as we have a `movieID` field, this is not really necessary. So we can extract the year from the title and store it in a separate field.

Thus, in the following cells the plain textual data for each movie is split, the field values are computed, and the result is put into a structured `DataFrame`.
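The title/year split can also be sketched in plain Python with a regular expression instead of fixed-width slicing. This is only an illustrative alternative: the helper names `split_title_year` and `parse_movie_line` are hypothetical, and the sketch assumes every title ends with a four-digit year in parentheses, returning `None` for the year when it does not.

```python
import re

# Pattern for "<title> (<4-digit year>)" at the end of the string.
TITLE_YEAR = re.compile(r"^(?P<title>.*)\s\((?P<year>\d{4})\)$")

def split_title_year(raw_title):
    """Return (title, year), or (raw_title, None) when no year suffix is found."""
    match = TITLE_YEAR.match(raw_title.strip())
    if match is None:
        return raw_title, None
    return match.group("title"), int(match.group("year"))

def parse_movie_line(line):
    """Parse one 'movieID::title (year)::genre|genre' record into a dict."""
    movie_id, raw_title, genres = line.split("::")
    title, year = split_title_year(raw_title)
    return {"movieID": int(movie_id), "title": title,
            "year": year, "genres": genres.split("|")}

record = parse_movie_line("1::Toy Story (1995)::Animation|Children's|Comedy")
print(record)
```

Unlike slicing with `[:-7]` and `[-5:-1]`, the pattern makes the expected format explicit, so a malformed title shows up as a missing year instead of a conversion error or a silently truncated title.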

In [14]:
moviesRdd1 = moviesRdd0.map(lambda line: line.split("::")) \
                       .map(lambda t: Row(movieID=int(t[0]),
                                          title=t[1][:-7],
                                          year=int(t[1][-5:-1]),
                                          genres=t[2].split("|")))
moviesRdd1.take(5)

[Row(genres=['Animation', "Children's", 'Comedy'], movieID=1, title='Toy Story', year=1995),
 Row(genres=['Adventure', "Children's", 'Fantasy'], movieID=2, title='Jumanji', year=1995),
 Row(genres=['Comedy', 'Romance'], movieID=3, title='Grumpier Old Men', year=1995),
 Row(genres=['Comedy', 'Drama'], movieID=4, title='Waiting to Exhale', year=1995),
 Row(genres=['Comedy'], movieID=5, title='Father of the Bride Part II', year=1995)]

In [15]:
moviesRdd1.persist(StorageLevel.MEMORY_AND_DISK)

PythonRDD[21] at RDD at PythonRDD.scala:48

In [16]:
moviesDf1 = ss.createDataFrame(moviesRdd1)
moviesDf1.show(5, truncate=False)

+--------------------------------+-------+---------------------------+----+
|genres                          |movieID|title                      |year|
+--------------------------------+-------+---------------------------+----+
|[Animation, Children's, Comedy] |1      |Toy Story                  |1995|
|[Adventure, Children's, Fantasy]|2      |Jumanji                    |1995|
|[Comedy, Romance]               |3      |Grumpier Old Men           |1995|
|[Comedy, Drama]                 |4      |Waiting to Exhale          |1995|
|[Comedy]                        |5      |Father of the Bride Part II|1995|
+--------------------------------+-------+---------------------------+----+
only showing top 5 rows



In [17]:
moviesDf1.persist(StorageLevel.MEMORY_AND_DISK)

DataFrame[genres: array<string>, movieID: bigint, title: string, year: bigint]

### Ratings

Each rating record is composed of 4 fields:

- userID: ID of the user who gave the rating.
- movieID: ID of the movie which was rated.
- rating: A numerical value ranging from 1 (min) to 5 (max).
- timestamp: A number that encodes the date and time the rating was given at.

The `timestamp` can be easily converted to a `datetime` object, which is much easier to interpret.

Thus, in the following cells the plain textual data for each rating is split, the field values are computed, and the result is put into a structured `DataFrame`.
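One detail of the timestamp conversion is worth illustrating. Calling `datetime.fromtimestamp` without a time zone returns a naive value in the local time zone of the machine running the code, so the displayed times vary from one machine to another. The sketch below, assuming the timestamps are seconds since the Unix epoch, converts to an explicit UTC value instead; the helper name `to_utc_datetime` is hypothetical.

```python
import datetime as dt

def to_utc_datetime(seconds):
    """Convert a Unix timestamp (seconds since the epoch) to an aware UTC datetime."""
    return dt.datetime.fromtimestamp(int(seconds), tz=dt.timezone.utc)

# Timestamp of the first record in ratings.dat.
first_rating_time = to_utc_datetime("978300760")
print(first_rating_time.isoformat())  # 2000-12-31T22:12:40+00:00
```

Pinning the time zone keeps the results reproducible when the notebook is run on machines configured with different local times.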

In [18]:
ratingsRdd1 = ratingsRdd0.map(lambda line: line.split("::")) \
                         .map(lambda t: Row(userID=int(t[0]),
                                            movieID=int(t[1]),
                                            rating=float(t[2]),
                                            timestamp=dt.datetime.fromtimestamp(int(t[3]))))
ratingsRdd1.take(5)

[Row(movieID=1193, rating=5.0, timestamp=datetime.datetime(2000, 12, 31, 20, 12, 40), userID=1),
 Row(movieID=661, rating=3.0, timestamp=datetime.datetime(2000, 12, 31, 20, 35, 9), userID=1),
 Row(movieID=914, rating=3.0, timestamp=datetime.datetime(2000, 12, 31, 20, 32, 48), userID=1),
 Row(movieID=3408, rating=4.0, timestamp=datetime.datetime(2000, 12, 31, 20, 4, 35), userID=1),
 Row(movieID=2355, rating=5.0, timestamp=datetime.datetime(2001, 1, 6, 21, 38, 11), userID=1)]

In [19]:
ratingsRdd1.persist(StorageLevel.MEMORY_AND_DISK)

PythonRDD[32] at RDD at PythonRDD.scala:48

In [20]:
ratingsDf1 = ss.createDataFrame(ratingsRdd1)
ratingsDf1.show(5, truncate=False)

+-------+------+---------------------+------+
|movieID|rating|timestamp            |userID|
+-------+------+---------------------+------+
|1193   |5.0   |2000-12-31 20:12:40.0|1     |
|661    |3.0   |2000-12-31 20:35:09.0|1     |
|914    |3.0   |2000-12-31 20:32:48.0|1     |
|3408   |4.0   |2000-12-31 20:04:35.0|1     |
|2355   |5.0   |2001-01-06 21:38:11.0|1     |
+-------+------+---------------------+------+
only showing top 5 rows



In [21]:
ratingsDf1.persist(StorageLevel.MEMORY_AND_DISK)

DataFrame[movieID: bigint, rating: double, timestamp: timestamp, userID: bigint]

## Exploratory data analysis

Here we'll just do some exploration on the structured datasets in order to better understand all the data we have.

In [22]:
usersDf1.createOrReplaceTempView("users")
moviesDf1.createOrReplaceTempView("movies")
ratingsDf1.createOrReplaceTempView("ratings")

### Users

In [23]:
# Get the gender distribution.
ss.sql("""SELECT gender, 
                 COUNT(gender) as frequency
          FROM users
          GROUP BY gender
          ORDER BY frequency DESC""").show()

+------+---------+
|gender|frequency|
+------+---------+
|     M|     4331|
|     F|     1709|
+------+---------+



In [24]:
# Get the age distribution.
ss.sql("""SELECT age,
                 COUNT(age) as frequency
          FROM users
          GROUP BY age
          ORDER BY frequency DESC""").show()

+--------+---------+
|     age|frequency|
+--------+---------+
|   25-34|     2096|
|   35-44|     1193|
|   18-24|     1103|
|   45-49|      550|
|   50-55|      496|
| Over 56|      380|
|Under 18|      222|
+--------+---------+



In [25]:
# Get the occupation distribution.
ss.sql("""SELECT occupation,
                 COUNT(occupation) as frequency
          FROM users
          GROUP BY occupation
          ORDER BY frequency DESC""").show(21)

+--------------------+---------+
|          occupation|frequency|
+--------------------+---------+
|college/grad student|      759|
|other or unspecified|      711|
|executive/managerial|      679|
|   academic/educator|      528|
| technician/engineer|      502|
|          programmer|      388|
|     sales/marketing|      302|
|              writer|      281|
|              artist|      267|
|       self-employed|      241|
|  doctor/health care|      236|
|        K-12 student|      195|
|      clerical/admin|      173|
|           scientist|      144|
|             retired|      142|
|              lawyer|      129|
|    customer service|      112|
|           homemaker|       92|
|          unemployed|       72|
| tradesman/craftsman|       70|
|              farmer|       17|
+--------------------+---------+



In [26]:
# Get the user groups stratified.
ss.sql("""SELECT gender,
                 age,
                 occupation,
                 COUNT(*) as frequency
          FROM users
          GROUP BY gender, age, occupation
          ORDER BY frequency DESC""").show()

+------+--------+--------------------+---------+
|gender|     age|          occupation|frequency|
+------+--------+--------------------+---------+
|     M|   18-24|college/grad student|      371|
|     M|   25-34|other or unspecified|      206|
|     M|   25-34|executive/managerial|      191|
|     M|   25-34| technician/engineer|      180|
|     M|   35-44|executive/managerial|      177|
|     M|   25-34|          programmer|      164|
|     F|   18-24|college/grad student|      163|
|     M|   25-34|college/grad student|      141|
|     M|   35-44| technician/engineer|      116|
|     M|Under 18|        K-12 student|      100|
|     M|   25-34|     sales/marketing|       97|
|     F|   25-34|other or unspecified|       92|
|     M|   35-44|other or unspecified|       92|
|     M|   25-34|              writer|       89|
|     M|   25-34|   academic/educator|       87|
|     M|   25-34|              artist|       82|
|     M|   35-44|   academic/educator|       77|
|     M| Over 56|   

So the largest single group of users is male college/grad students aged 18-24, which was somewhat expected. The next 3 most populated groups are also composed of men, aged 25-34 and with various occupations. Female users are a minority: there are about 2.5 times as many male users as female users, which is probably worth a deeper investigation. People at the limits of the age distribution (under 18 and over 56) are also minorities. Not surprisingly, farmers and tradesmen/craftsmen are among the least frequent user occupations.

### Movies

In [27]:
# Get the year distribution.
ss.sql("""SELECT year,
                 COUNT(year) as frequency
          FROM movies
          GROUP BY year
          ORDER BY frequency DESC""").show()

+----+---------+
|year|frequency|
+----+---------+
|1996|      345|
|1995|      342|
|1998|      337|
|1997|      315|
|1999|      283|
|1994|      257|
|1993|      165|
|2000|      156|
|1986|      104|
|1992|      102|
|1990|       77|
|1987|       71|
|1988|       69|
|1985|       65|
|1989|       60|
|1984|       60|
|1991|       60|
|1982|       50|
|1981|       43|
|1980|       41|
+----+---------+
only showing top 20 rows



In [28]:
# Get some year statistics.
ss.sql("""SELECT MIN(year) as min_year,
                 MAX(year) as max_year,
                 ROUND(AVG(year), 1) as avg_year
          FROM movies""").show()

+--------+--------+--------+
|min_year|max_year|avg_year|
+--------+--------+--------+
|    1919|    2000|  1986.1|
+--------+--------+--------+



So the movies in our dataset were released between 1919 and 2000. Most of them are from recent decades, since the mean year is 1986. More precisely, the most frequent release years all fall in the 1990s.

### Ratings

In [29]:
# Get the rating distribution.
ss.sql("""SELECT rating,
                 COUNT(rating) as frequency
          FROM ratings
          GROUP BY rating
          ORDER BY frequency DESC""").show()

+------+---------+
|rating|frequency|
+------+---------+
|   4.0|   348971|
|   3.0|   261197|
|   5.0|   226310|
|   2.0|   107557|
|   1.0|    56174|
+------+---------+



In [30]:
# Get the most active users and their average ratings.
ss.sql("""SELECT userID,
                 COUNT(userID) as numRatings,
                 ROUND(AVG(rating), 1) as avgRating
          FROM ratings
          GROUP BY userID
          ORDER BY numRatings DESC""").show()

+------+----------+---------+
|userID|numRatings|avgRating|
+------+----------+---------+
|  4169|      2314|      3.6|
|  1680|      1850|      3.6|
|  4277|      1743|      4.1|
|  1941|      1595|      3.1|
|  1181|      1521|      2.8|
|   889|      1518|      2.8|
|  3618|      1344|      3.0|
|  2063|      1323|      2.9|
|  1150|      1302|      2.6|
|  1015|      1286|      3.7|
|  5795|      1277|      3.1|
|  4344|      1271|      3.3|
|  1980|      1260|      3.5|
|  2909|      1258|      3.8|
|  1449|      1243|      2.8|
|  4510|      1240|      2.8|
|   424|      1226|      3.7|
|  4227|      1222|      2.7|
|  5831|      1220|      3.7|
|  3841|      1216|      3.3|
+------+----------+---------+
only showing top 20 rows



In [31]:
# Get the most rated movies and their average ratings.
ss.sql("""SELECT movies.title,
                 movies.year,
                 COUNT(ratings.movieID) as numRatings,
                 ROUND(AVG(ratings.rating), 1) as avgRating
          FROM ratings
          INNER JOIN movies
          ON ratings.movieID = movies.movieID
          GROUP BY ratings.movieID,
                   movies.title,
                   movies.year
          ORDER BY numRatings DESC""").show(truncate=False)

+----------------------------------------------+----+----------+---------+
|title                                         |year|numRatings|avgRating|
+----------------------------------------------+----+----------+---------+
|American Beauty                               |1999|3428      |4.3      |
|Star Wars: Episode IV - A New Hope            |1977|2991      |4.5      |
|Star Wars: Episode V - The Empire Strikes Back|1980|2990      |4.3      |
|Star Wars: Episode VI - Return of the Jedi    |1983|2883      |4.0      |
|Jurassic Park                                 |1993|2672      |3.8      |
|Saving Private Ryan                           |1998|2653      |4.3      |
|Terminator 2: Judgment Day                    |1991|2649      |4.1      |
|Matrix, The                                   |1999|2590      |4.3      |
|Back to the Future                            |1985|2583      |4.0      |
|Silence of the Lambs, The                     |1991|2578      |4.4      |
|Men in Black            

As 4 stars is the most common rating and 1 and 2 stars are the least common, we can say that there aren't many terribly bad movies in the dataset (in our users' opinions). There are users who rated over 1000 movies (they probably should be rewarded for their contribution!), as well as movies that were rated by over 2000 users. American Beauty (1999) was the most rated movie, followed by the classic Star Wars trilogy (episodes IV-VI).

In [32]:
ss.catalog.dropTempView("users")
ss.catalog.dropTempView("movies")
ss.catalog.dropTempView("ratings")

## Preprocessing

The preprocessing stage aims to put users' and movies' data in good shape for the learning algorithms that come later. This involves feature encoding, scaling, joining, etc. 

In [33]:
from pyspark import keyword_only
from pyspark.sql.functions import col
from pyspark.sql.functions import udf
from pyspark.ml.pipeline import Pipeline
from pyspark.ml.pipeline import Transformer
from pyspark.ml.param.shared import HasInputCol
from pyspark.ml.param.shared import HasOutputCol
from pyspark.ml.linalg import DenseVector
from pyspark.ml.linalg import VectorUDT
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import OneHotEncoder
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StandardScaler

In [34]:
# Source: http://stackoverflow.com/questions/41259885/sparsevector-vs-densevector-when-using-standardscaler
class AsDenseTransformer(Transformer, HasInputCol, HasOutputCol):

    @keyword_only
    def __init__(self, inputCol=None, outputCol=None):
        super(AsDenseTransformer, self).__init__()
        kwargs = self.__init__._input_kwargs
        self.setParams(**kwargs)

    @keyword_only
    def setParams(self, inputCol=None, outputCol=None):
        kwargs = self.setParams._input_kwargs
        return self._set(**kwargs)

    def _transform(self, dataset):
        out_col = self.getOutputCol()
        in_col = dataset[self.getInputCol()]
        
        asDense = udf(lambda s: DenseVector(s.toArray()), VectorUDT())
        
        return dataset.withColumn(out_col,  asDense(in_col))

### User features

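The `gender` field can be encoded as a single number, since it has only 2 possible values (`"M" -> 0`, `"F" -> 1`). The `age` field is an ordinal variable (i.e. there is an implicit ordering of its categories), so it is also encoded as a single number. Note, however, that `StringIndexer` assigns indices by descending label frequency by default, so the resulting `ageIndex` values follow the popularity of each age group rather than the natural age order. For the `occupation` field, we use one-hot encoding to convert it to a sequence of binary features. All features are joined together in a single feature vector and then scaled to zero mean and unit standard deviation.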
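The difference between an ordinal encoding and a frequency-based one can be seen with a small plain-Python sketch. It assumes the default behaviour of `StringIndexer` (the most frequent label gets index 0) and reuses the age counts reported in the exploratory analysis; the names `AGE_ORDER`, `ordinal_index` and `frequency_index` are hypothetical.

```python
# Age labels in their natural order (the values of agesDict).
AGE_ORDER = ["Under 18", "18-24", "25-34", "35-44", "45-49", "50-55", "Over 56"]

# User counts per age group, copied from the age distribution query.
AGE_COUNTS = {"25-34": 2096, "35-44": 1193, "18-24": 1103, "45-49": 550,
              "50-55": 496, "Over 56": 380, "Under 18": 222}

# Ordinal encoding: the index follows the natural age order.
ordinal_index = {label: float(i) for i, label in enumerate(AGE_ORDER)}

# Frequency-descending encoding, mimicking StringIndexer's default ordering.
by_frequency = sorted(AGE_COUNTS, key=AGE_COUNTS.get, reverse=True)
frequency_index = {label: float(i) for i, label in enumerate(by_frequency)}

for label in AGE_ORDER:
    print(f"{label:>8}  ordinal={ordinal_index[label]}  frequency={frequency_index[label]}")
```

If the ordinal interpretation matters for the downstream model, one option is to map the labels through a dictionary such as `ordinal_index` rather than indexing them by frequency.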

In [35]:
genderIndexer = StringIndexer(inputCol="gender", outputCol="genderIndex")
ageIndexer = StringIndexer(inputCol="age", outputCol="ageIndex")
occupationIndexer = StringIndexer(inputCol="occupation", outputCol="occupationIndex")
occupationEncoder = OneHotEncoder(inputCol="occupationIndex", outputCol="occupationVector")
usersAssembler = VectorAssembler(inputCols=["genderIndex", "ageIndex", "occupationVector"],
                                 outputCol="sparseFeatures")
usersAsDenseTransform = AsDenseTransformer(inputCol="sparseFeatures", outputCol="denseFeatures")
usersScaler = StandardScaler(inputCol="denseFeatures", outputCol="scaledFeatures",
                             withStd=True, withMean=True)

In [36]:
usersPipeline = Pipeline(stages=[genderIndexer,
                                 ageIndexer,
                                 occupationIndexer,
                                 occupationEncoder,
                                 usersAssembler,
                                 usersAsDenseTransform,
                                 usersScaler])
usersTransformer = usersPipeline.fit(usersDf1)

In [37]:
usersDf2 = usersTransformer.transform(usersDf1)
usersDf2.select("userID", "scaledFeatures").show()

+------+--------------------+
|userID|      scaledFeatures|
+------+--------------------+
|     1|[1.59179488929462...|
|     2|[-0.6281176323723...|
|     3|[-0.6281176323723...|
|     4|[-0.6281176323723...|
|     5|[-0.6281176323723...|
|     6|[1.59179488929462...|
|     7|[-0.6281176323723...|
|     8|[-0.6281176323723...|
|     9|[-0.6281176323723...|
|    10|[1.59179488929462...|
|    11|[1.59179488929462...|
|    12|[-0.6281176323723...|
|    13|[-0.6281176323723...|
|    14|[-0.6281176323723...|
|    15|[-0.6281176323723...|
|    16|[1.59179488929462...|
|    17|[-0.6281176323723...|
|    18|[1.59179488929462...|
|    19|[-0.6281176323723...|
|    20|[-0.6281176323723...|
+------+--------------------+
only showing top 20 rows



In [38]:
usersDf2.persist(StorageLevel.MEMORY_AND_DISK)

DataFrame[age: string, gender: string, occupation: string, userID: bigint, zipcode: string, genderIndex: double, ageIndex: double, occupationIndex: double, occupationVector: vector, sparseFeatures: vector, denseFeatures: vector, scaledFeatures: vector]

In [39]:
usersRdd1.unpersist()
usersDf1.unpersist()

DataFrame[age: string, gender: string, occupation: string, userID: bigint, zipcode: string]

### Movie features

The `genres` list is mapped to a set of 18 new binary features, each of which tells whether the movie belongs to a given genre or not. These features are then joined with the `year` information in a single feature vector, which is then scaled to zero mean and unit standard deviation. The `title` is not useful for describing the characteristics of a movie (since it is basically a unique identifier), so it is discarded.
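The genre encoding described here is a multi-hot encoding, which can be sketched in plain Python by looping over a fixed list of genre names. The sketch is an illustrative alternative to writing one expression per genre; the names `GENRES` and `multi_hot` are hypothetical.

```python
# The 18 genre labels that appear in movies.dat, in a fixed order.
GENRES = ["Action", "Adventure", "Animation", "Children's", "Comedy", "Crime",
          "Documentary", "Drama", "Fantasy", "Film-Noir", "Horror", "Musical",
          "Mystery", "Romance", "Sci-Fi", "Thriller", "War", "Western"]

def multi_hot(movie_genres):
    """Return one 0/1 flag per entry of GENRES, in the same order."""
    present = set(movie_genres)
    return [int(genre in present) for genre in GENRES]

toy_story = multi_hot(["Animation", "Children's", "Comedy"])
print(dict(zip(GENRES, toy_story)))
```

Keeping the genre names in a single list means the feature order is defined in one place, which makes it harder for the column names and the membership tests to drift apart.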

In [40]:
genresRdd = moviesRdd1.map(lambda row: Row(movieID=row.movieID,
                                           genreAction=int("Action" in row.genres),
                                           genreAdventure=int("Adventure" in row.genres),
                                           genreAnimation=int("Animation" in row.genres),
                                           genreChildrens=int("Children's" in row.genres),
                                           genreComedy=int("Comedy" in row.genres),
                                           genreCrime=int("Crime" in row.genres),
                                           genreDocumentary=int("Documentary" in row.genres),
                                           genreDrama=int("Drama" in row.genres),
                                           genreFantasy=int("Fantasy" in row.genres),
                                           genreFilmNoir=int("Film-Noir" in row.genres),
                                           genreHorror=int("Horror" in row.genres),
                                           genreMusical=int("Musical" in row.genres),
                                           genreMystery=int("Mystery" in row.genres),
                                           genreRomance=int("Romance" in row.genres),
                                           genreSciFi=int("Sci-Fi" in row.genres),
                                           genreThriller=int("Thriller" in row.genres),
                                           genreWar=int("War" in row.genres),
                                           genreWestern=int("Western" in row.genres)))
genresRdd.take(5)

[Row(genreAction=0, genreAdventure=0, genreAnimation=1, genreChildrens=1, genreComedy=1, genreCrime=0, genreDocumentary=0, genreDrama=0, genreFantasy=0, genreFilmNoir=0, genreHorror=0, genreMusical=0, genreMystery=0, genreRomance=0, genreSciFi=0, genreThriller=0, genreWar=0, genreWestern=0, movieID=1),
 Row(genreAction=0, genreAdventure=1, genreAnimation=0, genreChildrens=1, genreComedy=0, genreCrime=0, genreDocumentary=0, genreDrama=0, genreFantasy=1, genreFilmNoir=0, genreHorror=0, genreMusical=0, genreMystery=0, genreRomance=0, genreSciFi=0, genreThriller=0, genreWar=0, genreWestern=0, movieID=2),
 Row(genreAction=0, genreAdventure=0, genreAnimation=0, genreChildrens=0, genreComedy=1, genreCrime=0, genreDocumentary=0, genreDrama=0, genreFantasy=0, genreFilmNoir=0, genreHorror=0, genreMusical=0, genreMystery=0, genreRomance=1, genreSciFi=0, genreThriller=0, genreWar=0, genreWestern=0, movieID=3),
 Row(genreAction=0, genreAdventure=0, genreAnimation=0, genreChildrens=0, genreComedy=1,

In [41]:
genresDf = ss.createDataFrame(genresRdd)

In [42]:
moviesDf2 = moviesDf1.join(genresDf, ["movieID"]).drop("genres")
moviesDf2.printSchema()

root
 |-- movieID: long (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)
 |-- genreAction: long (nullable = true)
 |-- genreAdventure: long (nullable = true)
 |-- genreAnimation: long (nullable = true)
 |-- genreChildrens: long (nullable = true)
 |-- genreComedy: long (nullable = true)
 |-- genreCrime: long (nullable = true)
 |-- genreDocumentary: long (nullable = true)
 |-- genreDrama: long (nullable = true)
 |-- genreFantasy: long (nullable = true)
 |-- genreFilmNoir: long (nullable = true)
 |-- genreHorror: long (nullable = true)
 |-- genreMusical: long (nullable = true)
 |-- genreMystery: long (nullable = true)
 |-- genreRomance: long (nullable = true)
 |-- genreSciFi: long (nullable = true)
 |-- genreThriller: long (nullable = true)
 |-- genreWar: long (nullable = true)
 |-- genreWestern: long (nullable = true)



In [43]:
moviesAssembler = VectorAssembler(inputCols=["year"] + [x for x in moviesDf2.schema.names if "genre" in x],
                                  outputCol="sparseFeatures")
moviesAsDenseTransform = AsDenseTransformer(inputCol="sparseFeatures", outputCol="denseFeatures")
moviesScaler = StandardScaler(inputCol="denseFeatures", outputCol="scaledFeatures",
                              withStd=True, withMean=True)

In [44]:
moviesPipeline = Pipeline(stages=[moviesAssembler,
                                  moviesAsDenseTransform,
                                  moviesScaler])
moviesTransformer = moviesPipeline.fit(moviesDf2)

In [45]:
moviesDf3 = moviesTransformer.transform(moviesDf2)
moviesDf3.select("movieID", "scaledFeatures").show()

+-------+--------------------+
|movieID|      scaledFeatures|
+-------+--------------------+
|     26|[0.52871716866845...|
|     29|[0.52871716866845...|
|    474|[0.41034378579074...|
|    964|[-2.3122440203966...|
|   1677|[0.64709055154616...|
|   1697|[0.46953047722959...|
|   1806|[0.70627724298502...|
|   1950|[-1.1285101916195...|
|   2040|[-0.9509501173029...|
|   2214|[-3.2000443919795...|
|   2250|[0.23278371147416...|
|   2453|[-0.0039630542812...|
|   2509|[0.64709055154616...|
|   2529|[-1.0693235001806...|
|   2927|[-2.3714307118355...|
|   3091|[-0.3590832029144...|
|   3506|[-0.4182698943532...|
|   3764|[0.35115709435188...|
|     65|[0.58790386010731...|
|    191|[0.52871716866845...|
+-------+--------------------+
only showing top 20 rows



In [46]:
moviesDf3.persist(StorageLevel.MEMORY_AND_DISK)

DataFrame[movieID: bigint, title: string, year: bigint, genreAction: bigint, genreAdventure: bigint, genreAnimation: bigint, genreChildrens: bigint, genreComedy: bigint, genreCrime: bigint, genreDocumentary: bigint, genreDrama: bigint, genreFantasy: bigint, genreFilmNoir: bigint, genreHorror: bigint, genreMusical: bigint, genreMystery: bigint, genreRomance: bigint, genreSciFi: bigint, genreThriller: bigint, genreWar: bigint, genreWestern: bigint, sparseFeatures: vector, denseFeatures: vector, scaledFeatures: vector]

In [47]:
moviesRdd1.unpersist()
moviesDf1.unpersist()

DataFrame[genres: array<string>, movieID: bigint, title: string, year: bigint]

### Rating values

What should we do when we have to recommend movies to a new user who has never rated anything before (i.e. we know nothing about their tastes)? A common approach is to recommend the overall top-rated movies. This can be done in a simple manner by expressing each rating relative to its movie's average rating and then, in the end, adding that average back to each movie's predicted score.
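The adjustment amounts to mean-centering the ratings per movie. A minimal plain-Python sketch on made-up data follows; the triples, the `final_score` helper and its `predicted_adjustment` parameter are hypothetical and serve only to illustrate the arithmetic.

```python
from collections import defaultdict

# Toy (userID, movieID, rating) triples; the values are made up.
toy_ratings = [(1, 10, 5.0), (2, 10, 3.0), (3, 10, 4.0), (1, 20, 2.0), (2, 20, 4.0)]

# Per-movie average rating.
totals = defaultdict(lambda: [0.0, 0])
for _, movie_id, rating in toy_ratings:
    totals[movie_id][0] += rating
    totals[movie_id][1] += 1
movie_avg = {movie_id: s / n for movie_id, (s, n) in totals.items()}

# Adjusted rating = rating minus the movie's average.
adjusted = [(u, m, r - movie_avg[m]) for u, m, r in toy_ratings]

def final_score(movie_id, predicted_adjustment=0.0):
    """Add the movie's average back to a predicted adjusted rating."""
    return movie_avg[movie_id] + predicted_adjustment

print(movie_avg)        # {10: 4.0, 20: 3.0}
print(final_score(10))  # 4.0
```

With no information about a user, the predicted adjustment can be taken as zero, so the final score reduces to the movie's average rating and the ranking becomes the overall top-rated list.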

In [48]:
avgRatingsDf = ratingsDf1.groupBy("movieID").avg("rating")
avgRatingsDf.show()

+-------+------------------+
|movieID|       avg(rating)|
+-------+------------------+
|     29| 4.062034739454094|
|   1806| 2.892857142857143|
|    474| 3.825102880658436|
|   2453|              3.25|
|   2529| 3.712564543889845|
|   2040|         2.9453125|
|     26|              3.53|
|   3506|3.3652694610778444|
|   3091| 4.283687943262412|
|   2250|3.1451612903225805|
|   1677|2.7567567567567566|
|   1950| 4.129310344827586|
|   3764|2.6408163265306124|
|   2927|  4.08080808080808|
|    964| 3.392156862745098|
|   2509|2.7777777777777777|
|   2214|               3.0|
|   1840|           3.38125|
|   1277| 3.884297520661157|
|    541| 4.273333333333333|
+-------+------------------+
only showing top 20 rows



In [49]:
avgRatingsDf.cache()

DataFrame[movieID: bigint, avg(rating): double]

In [50]:
ratingsDf2 = ratingsDf1.join(avgRatingsDf, ["movieID"]) \
                       .withColumn("adjustedRating", col("rating") - col("avg(rating)"))

In [51]:
ratingsDf2.select("userID", "movieID", "rating", "adjustedRating").show()

+------+-------+------+-------------------+
|userID|movieID|rating|     adjustedRating|
+------+-------+------+-------------------+
|    18|     26|   4.0| 0.4700000000000002|
|    69|     26|   4.0| 0.4700000000000002|
|   229|     26|   4.0| 0.4700000000000002|
|   342|     26|   4.0| 0.4700000000000002|
|   524|     26|   3.0|-0.5299999999999998|
|   655|     26|   3.0|-0.5299999999999998|
|   748|     26|   5.0| 1.4700000000000002|
|   881|     26|   3.0|-0.5299999999999998|
|   890|     26|   3.0|-0.5299999999999998|
|   918|     26|   4.0| 0.4700000000000002|
|   963|     26|   4.0| 0.4700000000000002|
|   973|     26|   4.0| 0.4700000000000002|
|  1015|     26|   3.0|-0.5299999999999998|
|  1069|     26|   3.0|-0.5299999999999998|
|  1120|     26|   3.0|-0.5299999999999998|
|  1150|     26|   3.0|-0.5299999999999998|
|  1182|     26|   2.0|-1.5299999999999998|
|  1203|     26|   4.0| 0.4700000000000002|
|  1279|     26|   3.0|-0.5299999999999998|
|  1314|     26|   1.0|         

In [52]:
ratingsDf2.persist(StorageLevel.MEMORY_AND_DISK)

DataFrame[movieID: bigint, rating: double, timestamp: timestamp, userID: bigint, avg(rating): double, adjustedRating: double]

In [53]:
ratingsRdd1.unpersist()
ratingsDf1.unpersist()

DataFrame[movieID: bigint, rating: double, timestamp: timestamp, userID: bigint]

## Model training

In total, three types of models are going to be trained:

* The collaborative filtering model, which is the main engine we use for the recommendations;
* Two secondary, content-based models: one that learns to predict scores from movie features for each user, and another that learns to predict scores from user features for each movie.

The goal is to use one of the secondary models as a substitute for collaborative filtering when a movie has no ratings at all, because in that situation collaborative filtering does not have enough information to predict anything.

### Collaborative filtering (Alternating Least Squares)

The collaborative filtering technique only requires us to provide the user IDs, the movie IDs and the rating each user gave to each movie they watched.
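To give an intuition for what "alternating least squares" means, here is a minimal pure-Python sketch with a single latent factor per user and per movie. It is an illustration under simplifying assumptions (rank 1, plain ridge regularisation, made-up ratings, hypothetical function name `als_rank1`) and is not how Spark's `ALS` is implemented.

```python
def als_rank1(ratings, reg=0.1, iters=10):
    """Rank-1 ALS on a dict mapping (user, item) -> rating.

    Returns (user_factors, item_factors, losses), where losses holds the
    regularised squared error before training and after each iteration.
    """
    users = sorted({u for u, _ in ratings})
    items = sorted({i for _, i in ratings})
    p = {u: 1.0 for u in users}  # one latent factor per user
    q = {i: 1.0 for i in items}  # one latent factor per item

    def loss():
        squared = sum((r - p[u] * q[i]) ** 2 for (u, i), r in ratings.items())
        penalty = reg * (sum(x * x for x in p.values()) +
                         sum(x * x for x in q.values()))
        return squared + penalty

    losses = [loss()]
    for _ in range(iters):
        # Fix the item factors and solve a 1-D ridge regression per user.
        for u in users:
            obs = [(q[i], r) for (u2, i), r in ratings.items() if u2 == u]
            p[u] = sum(qi * r for qi, r in obs) / (sum(qi * qi for qi, _ in obs) + reg)
        # Fix the user factors and solve a 1-D ridge regression per item.
        for i in items:
            obs = [(p[u], r) for (u, i2), r in ratings.items() if i2 == i]
            q[i] = sum(pu * r for pu, r in obs) / (sum(pu * pu for pu, _ in obs) + reg)
        losses.append(loss())
    return p, q, losses


# Made-up adjusted ratings: (userID, movieID) -> rating minus movie average.
toy = {(1, 10): 1.0, (1, 20): -1.0,
       (2, 10): 0.5, (2, 20): -0.5,
       (3, 10): -1.0}
p, q, losses = als_rank1(toy)

# Score for a pair that was never rated: user 3, movie 20.
unseen_prediction = p[3] * q[20]
```

Each half-step minimises the regularised loss exactly with the other side held fixed, so the recorded loss never increases. Note that the only inputs are the user IDs, the movie IDs and the ratings, which is the point made above.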

Model tuning (finding the best combination of model parameters) was performed using a grid search with 3-fold cross-validation. Here the grid covers a single parameter, the regularization strength `regParam`.
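The mechanics of a grid search with k-fold cross-validation can be sketched without Spark. In the sketch below the "model" is a deliberately trivial regularised mean standing in for ALS, the data are made up, and all function names are hypothetical; only the looping structure is the point.

```python
import math


def k_fold_indices(n, k):
    """Split range(n) into k disjoint folds (round-robin, no shuffling)."""
    return [list(range(i, n, k)) for i in range(k)]


def fit(values, reg):
    """Toy stand-in model: a mean that shrinks towards 0 as reg grows."""
    return sum(values) / (len(values) + reg)


def rmse(values, prediction):
    return math.sqrt(sum((v - prediction) ** 2 for v in values) / len(values))


def grid_search_cv(values, grid, k=3):
    """Return the best parameter and the average held-out RMSE per grid point."""
    folds = k_fold_indices(len(values), k)
    avg_metrics = []
    for reg in grid:
        scores = []
        for held_out in folds:
            held = set(held_out)
            train = [v for i, v in enumerate(values) if i not in held]
            test = [values[i] for i in held_out]
            # Train on k-1 folds, evaluate on the remaining fold.
            scores.append(rmse(test, fit(train, reg)))
        avg_metrics.append(sum(scores) / k)
    best = min(range(len(grid)), key=avg_metrics.__getitem__)
    return grid[best], avg_metrics


data = [3.0, 4.0, 5.0, 4.0, 3.0, 5.0, 4.0, 4.0, 3.0]
grid = [0.001, 0.01, 0.1, 1, 10]
best_reg, avg_metrics = grid_search_cv(data, grid)
```

Spark's `CrossValidator` follows the same pattern: the fitted `CrossValidatorModel` exposes the averaged metric of every grid point as `avgMetrics` and the model refitted with the winning parameters as `bestModel`.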

In [54]:
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import CrossValidator
from pyspark.ml.tuning import ParamGridBuilder
from pyspark.ml.evaluation import RegressionEvaluator

In [55]:
# Alternating Least Squares (ALS) estimator.
als = ALS(userCol="userID",
          itemCol="movieID",
          ratingCol="adjustedRating",
          maxIter=10)

In [56]:
# Build the parameter grid for tuning.
alsParamGrid = ParamGridBuilder().addGrid(als.regParam, [0.001, 0.01, 0.1, 1, 10]) \
                                 .build()

In [57]:
# 3-fold cross-validation.
alsCV = CrossValidator(estimator=als,
                       estimatorParamMaps=alsParamGrid,
                       evaluator=RegressionEvaluator(labelCol="adjustedRating"),
                       numFolds=3)

In [58]:
alsModel = alsCV.fit(ratingsDf2)

In [59]:
alsPredictionsDf = alsModel.transform(ratingsDf2)
alsPredictionsDf.select("userID", "movieID", "rating", "adjustedRating", "prediction") \
                .show()

+------+-------+------+-------------------+-----------+
|userID|movieID|rating|     adjustedRating| prediction|
+------+-------+------+-------------------+-----------+
|    53|    148|   5.0|  2.217391304347826|    1.88291|
|   673|    148|   5.0|  2.217391304347826|  2.2567441|
|  4169|    148|   3.0|0.21739130434782616| 0.72547275|
|  4227|    148|   2.0|-0.7826086956521738| -1.0292696|
|  5333|    148|   3.0|0.21739130434782616| -0.7330352|
|  3184|    148|   4.0| 1.2173913043478262|  1.2678249|
|  4387|    148|   1.0|-1.7826086956521738| -1.4056144|
|  4784|    148|   3.0|0.21739130434782616|  1.0220977|
|  2383|    148|   2.0|-0.7826086956521738| -0.6611125|
|  1242|    148|   3.0|0.21739130434782616|-0.18989654|
|  3539|    148|   3.0|0.21739130434782616| 0.11645946|
|  1069|    148|   2.0|-0.7826086956521738| -0.8080865|
|  1605|    148|   2.0|-0.7826086956521738|-0.93036544|
|   840|    148|   1.0|-1.7826086956521738| -0.8153674|
|   216|    148|   2.0|-0.7826086956521738|-0.24

In [60]:
alsPredictionsDf.persist(StorageLevel.MEMORY_AND_DISK)

DataFrame[movieID: bigint, rating: double, timestamp: timestamp, userID: bigint, avg(rating): double, adjustedRating: double, prediction: float]

In [61]:
# Evaluate using the root mean squared error (RMSE).
rmse = RegressionEvaluator(labelCol="adjustedRating",
                           predictionCol="prediction",
                           metricName="rmse").evaluate(alsPredictionsDf)

In [62]:
# Evaluate using the coefficient of determination (R^2 score).
r2 = RegressionEvaluator(labelCol="adjustedRating",
                         predictionCol="prediction",
                         metricName="r2").evaluate(alsPredictionsDf)

In [63]:
print("Root Mean Squared Error (RMSE) = %g" % rmse)
print("Coefficient of Determination (R^2) = %g" % r2)

Root Mean Squared Error (RMSE) = 0.763837
Coefficient of Determination (R^2) = 0.385886
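For reference, both metrics can be written out in a few lines of plain Python. The labels and predictions below are made-up numbers, not values from the dataset.

```python
import math


def rmse(labels, predictions):
    """Root mean squared error."""
    n = len(labels)
    return math.sqrt(sum((y - p) ** 2 for y, p in zip(labels, predictions)) / n)


def r2(labels, predictions):
    """Coefficient of determination: 1 - SS_res / SS_tot."""
    mean = sum(labels) / len(labels)
    ss_res = sum((y - p) ** 2 for y, p in zip(labels, predictions))
    ss_tot = sum((y - mean) ** 2 for y in labels)
    return 1.0 - ss_res / ss_tot


labels = [1.0, -1.0, 0.5, -0.5]        # adjusted ratings (made up)
predictions = [0.5, -0.5, 0.5, -0.5]   # model outputs (made up)

toy_rmse = rmse(labels, predictions)
toy_r2 = r2(labels, predictions)
```

Because the adjusted ratings have zero mean over the whole dataset (each movie's average has been subtracted), the baseline that R^2 compares against here is "always predict the movie's average rating". An R^2 of about 0.39 therefore means the model removes roughly 39% of the squared error left by that baseline, on the data it was trained on.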


**NOTE**: The reason I trained and tested the model on the same dataset (instead of randomly splitting it into two disjoint datasets, one for training and the other for testing) is [this issue](https://issues.apache.org/jira/browse/SPARK-14489). Apparently the problem was fixed in Spark 2.2.0 with the addition of a new input parameter for ALS (`coldStartStrategy`), but I'm currently using Spark 2.1.0 (the latest stable version when I wrote this), so it is not available yet. As a consequence, the metrics above measure how well the model fits its own training data and are likely optimistic.
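The problem described in that issue is that ALS returns `NaN` for users or movies that were absent from the training split, and a single `NaN` is enough to turn an aggregate metric into `NaN`. The following plain-Python sketch reproduces the effect with made-up numbers and shows the "drop the unpredictable rows" workaround; it only illustrates the idea and does not call Spark.

```python
import math


def rmse(labels, predictions):
    n = len(labels)
    return math.sqrt(sum((y - p) ** 2 for y, p in zip(labels, predictions)) / n)


# Held-out adjusted ratings (made up). The last row is assumed to belong to
# a user that never appeared in the training split, hence the NaN prediction.
labels = [0.5, -0.5, 1.0]
predictions = [0.0, -0.5, float("nan")]

# A single NaN poisons the whole metric.
naive_rmse = rmse(labels, predictions)

# Workaround: evaluate only on rows that received a real prediction.
kept = [(y, p) for y, p in zip(labels, predictions) if not math.isnan(p)]
clean_rmse = rmse([y for y, _ in kept], [p for _, p in kept])
```

On Spark 2.2.0 or later the same filtering is available by constructing the estimator with `coldStartStrategy="drop"`, which would make a conventional train/test split (for example with `DataFrame.randomSplit`) usable for evaluation.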

### Content-based

## Results