# Background

In this project we'll use the well-known MovieLens dataset to build a recommender system (available here: http://grouplens.org/datasets/movielens/latest/). We'll use the full version, which contains about 26 million ratings.

Movies Data File Structure (movies.csv)
---------------------------------------

Movie information is contained in the file `movies.csv`. Each line of this file after the header row represents one movie, and has the following format:

    movieId,title,genres

Movie titles are entered manually or imported from <https://www.themoviedb.org/>, and include the year of release in parentheses. Errors and inconsistencies may exist in these titles.

Genres are a pipe-separated list, and are selected from the following:

* Action
* Adventure
* Animation
* Children's
* Comedy
* Crime
* Documentary
* Drama
* Fantasy
* Film-Noir
* Horror
* Musical
* Mystery
* Romance
* Sci-Fi
* Thriller
* War
* Western
* (no genres listed)
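To make the file format concrete, here is a small plain-Python sketch that parses one line of `movies.csv`. The sample line is hypothetical (modeled on the movieId 50 row that appears later), and it assumes that titles containing commas are wrapped in standard CSV quotes, which is why it uses `csv.reader` rather than splitting on commas. Because titles may be inconsistent, the year is returned as `None` when no trailing `(YYYY)` is found.

```python
import csv
import io
import re

# A four-digit year in parentheses at the very end of a title, e.g. "(1995)".
YEAR_AT_END = re.compile(r"\((\d{4})\)\s*$")
NO_GENRES = "(no genres listed)"


def parse_movie_line(line):
    """Parse one data line of movies.csv into (movieId, title, year, genres).

    csv.reader is used instead of str.split(",") because titles such as
    "Usual Suspects, The (1995)" contain commas and are quoted (assumed).
    year is None when the title has no trailing "(YYYY)".
    """
    movie_id, title, genres_field = next(csv.reader(io.StringIO(line)))
    match = YEAR_AT_END.search(title)
    year = int(match.group(1)) if match else None
    # The genres field is pipe-separated; the placeholder means "no genres".
    genres = [] if genres_field == NO_GENRES else genres_field.split("|")
    return int(movie_id), title, year, genres


print(parse_movie_line('50,"Usual Suspects, The (1995)",Crime|Mystery|Thriller'))
# (50, 'Usual Suspects, The (1995)', 1995, ['Crime', 'Mystery', 'Thriller'])
```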

Ratings Data File Structure (ratings.csv)
-----------------------------------------

All ratings are contained in the file `ratings.csv`. Each line of this file after the header row represents one rating of one movie by one user, and has the following format:

    userId,movieId,rating,timestamp

The lines within this file are ordered first by userId, then, within user, by movieId.

Ratings are made on a 5-star scale, with half-star increments (0.5 stars - 5.0 stars).

Timestamps represent seconds since midnight Coordinated Universal Time (UTC) of January 1, 1970.
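A minimal plain-Python sketch of reading one `ratings.csv` line under the rules above: it checks that the rating is on the half-star scale and converts the timestamp to a timezone-aware UTC `datetime`. The userId, movieId and rating in the sample line are made up; only the timestamp (the smallest one reported by `describe()` further down) comes from the data.

```python
from datetime import datetime, timezone

# The half-star scale: 0.5, 1.0, ..., 5.0 (all exactly representable as floats).
VALID_RATINGS = {i / 2 for i in range(1, 11)}


def parse_rating_line(line):
    """Parse one data line of ratings.csv (userId,movieId,rating,timestamp)."""
    user_id, movie_id, rating, timestamp = line.strip().split(",")
    rating = float(rating)
    if rating not in VALID_RATINGS:
        raise ValueError(f"{rating} is not a half-star rating between 0.5 and 5.0")
    # Seconds since midnight UTC on January 1, 1970 -> aware UTC datetime.
    rated_at = datetime.fromtimestamp(int(timestamp), tz=timezone.utc)
    return int(user_id), int(movie_id), rating, rated_at


# Hypothetical line; the timestamp converts to 1995-01-09 11:46:44 UTC.
print(parse_rating_line("1,318,4.5,789652004"))
```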




First, we'll import the libraries used in this project and start a new Spark session.

In [43]:
import math
import time
from pyspark.sql import SparkSession
from pyspark.sql.functions import avg, count, lit, col
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
spark = SparkSession.builder.appName('Recommender').getOrCreate()

Next, we'll load the ratings and movies datasets.

In [4]:
ratings_data = spark.read.csv("/user/a208669/recommendation_data/movie_lens/ratings.csv",inferSchema=True,header=True)
movie_data = spark.read.csv("/user/a208669/recommendation_data/movie_lens/movies.csv",inferSchema=True,header=True)


Let's take a quick look at the structure of the data.

In [4]:
ratings_data.printSchema()
movie_data.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)
 |-- timestamp: integer (nullable = true)

root
 |-- movieId: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- genres: string (nullable = true)


In [5]:
ratings_data.describe().show()

+-------+-----------------+------------------+------------------+--------------------+
|summary|           userId|           movieId|            rating|           timestamp|
+-------+-----------------+------------------+------------------+--------------------+
|  count|         26024289|          26024289|          26024289|            26024289|
|   mean| 135037.090248114|15849.109677040553|3.5280903543608817| 1.171258432691322E9|
| stddev|78176.19722170367| 31085.25753139178| 1.065442763666231|2.0528887028184628E8|
|    min|                1|                 1|               0.5|           789652004|
|    max|           270896|            176275|               5.0|          1501829870|
+-------+-----------------+------------------+------------------+--------------------+



Let's see which movies are the highest rated of all time. We'll only consider movies rated by at least 1,000 users, so that movies with just a handful of (possibly very high) ratings don't skew the results.
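The aggregation in the next cells (group by movie, average the ratings, count the raters, keep movies with at least 1,000 raters, sort by mean) can be sketched in plain Python on toy data to make the logic visible. This is an illustration only; on the 26-million-row dataset the Spark version below is what you would actually run. It assumes, as in MovieLens, that each user rates a movie at most once, so counting ratings equals counting users.

```python
from collections import defaultdict


def top_rated(ratings, min_users=1000, n=10):
    """Mean rating per movie, keeping only movies rated by at least min_users users.

    ratings is an iterable of (userId, movieId, rating) tuples with at most one
    rating per user and movie, so the rating count equals the user count.
    Returns (movieId, mean_rating, number_of_users) tuples, best first.
    """
    totals = defaultdict(float)
    counts = defaultdict(int)
    for _user_id, movie_id, rating in ratings:
        totals[movie_id] += rating
        counts[movie_id] += 1
    means = [
        (movie_id, totals[movie_id] / counts[movie_id], counts[movie_id])
        for movie_id in counts
        if counts[movie_id] >= min_users
    ]
    # Highest mean first, like orderBy('Mean_rating', ascending=False).
    means.sort(key=lambda row: row[1], reverse=True)
    return means[:n]


toy = [(1, 10, 5.0), (2, 10, 4.0), (1, 20, 5.0), (3, 10, 3.0)]
print(top_rated(toy, min_users=2))  # [(10, 4.0, 3)]: movie 20 has only one rater
```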

In [16]:
mean_ratings = ratings_data.groupBy('movieId').agg(avg("rating").alias("Mean_rating"),count("userId").alias("Number_users"))

In [18]:
mean_ratings_with_name = mean_ratings.join(movie_data,on='movieId',how='inner')

In [19]:
mean_ratings_with_name.filter(mean_ratings_with_name["Number_users"] >= 1000).orderBy('Mean_rating',ascending=False).show(n=10,truncate=False)

+-------+------------------+------------+-------------------------------------------+---------------------------+
|movieId|Mean_rating       |Number_users|title                                      |genres                     |
+-------+------------------+------------+-------------------------------------------+---------------------------+
|318    |4.429014514393623 |91082       |Shawshank Redemption, The (1994)           |Crime|Drama                |
|858    |4.339810758717364 |57070       |Godfather, The (1972)                      |Crime|Drama                |
|50     |4.300188962561792 |59271       |Usual Suspects, The (1995)                 |Crime|Mystery|Thriller     |
|527    |4.266530696698294 |67662       |Schindler's List (1993)                    |Drama|War                  |
|1221   |4.263475012950189 |36679       |Godfather: Part II, The (1974)             |Crime|Drama                |
|2019   |4.255073602972702 |13994       |Seven Samurai (Shichinin no samurai) (1954)|Act

Good choices! Now let's build our first recommender. We'll start by splitting the data into a training set (70%) and a test set (30%).

In [5]:
train, test = ratings_data.randomSplit([0.7,0.3],seed=8734)
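Spark's `randomSplit` normalizes the weights and assigns rows randomly, so the split is only approximately 70/30. The plain-Python sketch below shows the same idea (each row goes independently to train or test); it will not reproduce Spark's exact split for the same seed. The helper `unseen_ids` is hypothetical and illustrates a side effect of splitting row by row: some users or movies can end up only in the test set.

```python
import random


def random_split(rows, weights=(0.7, 0.3), seed=8734):
    """Send each row independently to train or test in proportion to weights."""
    rng = random.Random(seed)
    threshold = weights[0] / sum(weights)  # normalize, as Spark does
    train, test = [], []
    for row in rows:
        (train if rng.random() < threshold else test).append(row)
    return train, test


def unseen_ids(train, test, index):
    """IDs at tuple position index (0 = userId, 1 = movieId) in test but not in train."""
    seen = {row[index] for row in train}
    return {row[index] for row in test} - seen


train_rows, test_rows = random_split(range(10000))
print(len(train_rows), len(test_rows))  # roughly 7000 and 3000
```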

Usually the hard part is getting the data into the format Spark needs. Fortunately, the ratings file is already in the right shape: one row per user, movie, and rating. spark.ml currently supports model-based collaborative filtering, in which users and products are described by a small set of latent factors that can be used to predict missing entries. spark.ml uses the alternating least squares (ALS) algorithm to learn these latent factors. The implementation in spark.ml has the following parameters:

* numBlocks is the number of blocks the users and items will be partitioned into in order to parallelize computation (defaults to 10). In spark.ml's ALS this is set through numUserBlocks and numItemBlocks, which each default to 10.
* rank is the number of latent factors in the model (defaults to 10).
* maxIter is the maximum number of iterations to run (defaults to 10).
* regParam specifies the regularization parameter in ALS (defaults to 0.1 in spark.ml).
* implicitPrefs specifies whether to use the explicit feedback ALS variant or one adapted for implicit feedback data (defaults to false which means using explicit feedback).
* alpha is a parameter applicable to the implicit feedback variant of ALS that governs the baseline confidence in preference observations (defaults to 1.0).
* nonnegative specifies whether or not to use nonnegative constraints for least squares (defaults to false).
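To show what rank, maxIter and regParam control, here is a minimal plain-Python sketch of explicit-feedback ALS on a toy matrix. It is not Spark's implementation: there is no blocking, no implicit-feedback variant and no nonnegativity constraint. It alternates between fixing the item factors and solving a small regularized least-squares problem per user, then the reverse. Following the Spark documentation's description of its ALS ("ALS-WR"), the regularization term is scaled by the number of ratings of each user or item. As in Spark's default behavior, predictions for unseen users or items are NaN.

```python
import math
import random


def solve(a, b):
    """Solve the small linear system a x = b (Gaussian elimination, partial pivoting)."""
    k = len(b)
    m = [list(row) + [b[i]] for i, row in enumerate(a)]  # augmented matrix
    for col in range(k):
        pivot = max(range(col, k), key=lambda r: abs(m[r][col]))
        m[col], m[pivot] = m[pivot], m[col]
        for r in range(col + 1, k):
            factor = m[r][col] / m[col][col]
            for c in range(col, k + 1):
                m[r][c] -= factor * m[col][c]
    x = [0.0] * k
    for r in range(k - 1, -1, -1):
        x[r] = (m[r][k] - sum(m[r][c] * x[c] for c in range(r + 1, k))) / m[r][r]
    return x


def half_step(fixed, ratings_by_id, rank, reg):
    """Re-solve every factor vector on one side while the other side stays fixed."""
    new = {}
    for key, pairs in ratings_by_id.items():
        # Normal equations: (sum_j y_j y_j^T + reg * n * I) x = sum_j r_j y_j
        a = [[0.0] * rank for _ in range(rank)]
        b = [0.0] * rank
        for other, rating in pairs:
            y = fixed[other]
            for p in range(rank):
                b[p] += rating * y[p]
                for q in range(rank):
                    a[p][q] += y[p] * y[q]
        for p in range(rank):
            a[p][p] += reg * len(pairs)  # regularization scaled by the rating count n
        new[key] = solve(a, b)
    return new


def train_als(ratings, rank=10, reg=0.01, max_iter=10, seed=0):
    """Explicit-feedback ALS on (user, item, rating) triples; reg mirrors regParam."""
    by_user, by_item = {}, {}
    for u, i, r in ratings:
        by_user.setdefault(u, []).append((i, r))
        by_item.setdefault(i, []).append((u, r))
    rng = random.Random(seed)
    item_f = {i: [rng.random() for _ in range(rank)] for i in by_item}
    user_f = {}
    for _ in range(max_iter):
        user_f = half_step(item_f, by_user, rank, reg)  # fix items, solve users
        item_f = half_step(user_f, by_item, rank, reg)  # fix users, solve items
    return user_f, item_f


def predict(user_f, item_f, u, i):
    """Dot product of latent factors; NaN for a user or item never seen in training."""
    if u not in user_f or i not in item_f:
        return float("nan")
    return sum(x * y for x, y in zip(user_f[u], item_f[i]))


# Toy 3 x 3 rating matrix built as an outer product, so a low-rank fit exists.
user_scale, item_scale = [1.0, 1.5, 2.0], [1.0, 2.0, 2.5]
toy = [(u, i, user_scale[u] * item_scale[i]) for u in range(3) for i in range(3)]
user_f, item_f = train_als(toy, rank=2, reg=0.01, max_iter=10, seed=1)
rmse = math.sqrt(sum((r - predict(user_f, item_f, u, i)) ** 2 for u, i, r in toy) / len(toy))
print(f"training RMSE on the toy matrix: {rmse:.4f}")
```

Note that `predict` is a plain dot product, so nothing keeps its output inside the rating scale.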

We'll set regParam to 0.01 and leave the other parameters at their default values. We also fix the seed so the run is repeatable.

In [6]:
start = time.time()
als = ALS(maxIter=10, regParam=0.01, userCol="userId", itemCol="movieId", ratingCol="rating",seed=9834)
model = als.fit(train)
end = time.time()
print(end - start)

26.17970609664917


Not bad: the model trained in about 26 seconds! Let's evaluate it by computing the RMSE on the test data.

In [7]:
predictions = model.transform(test)
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print("Root mean square error = " + str(rmse))


Root mean square error = nan


We get nan for the RMSE because some test rows have NaN predictions, and a single NaN makes the whole average NaN. The version of Spark we're using (2.1.0) doesn't support the coldStartStrategy option for handling those rows; it's only available in Spark 2.2 and newer.

When making predictions using an ALSModel, it is common to encounter users and/or items in the test dataset that were not present when the model was trained. This typically occurs in two scenarios:

* In production, for new users or items that have no rating history and on which the model has not been trained (this is the “cold start problem”).
* During cross-validation, the data is split between training and evaluation sets. When using simple random splits as in Spark’s CrossValidator or TrainValidationSplit, it is actually very common to encounter users and/or items in the evaluation set that are not in the training set. The same happens with our 70/30 randomSplit.

By default, Spark assigns NaN predictions during ALSModel.transform when a user and/or item factor is not present in the model. This can be useful in a production system, since it indicates a new user or item, and so the system can make a decision on some fallback to use as the prediction.


So we'll have to calculate the RMSE manually, skipping the NaN predictions :-)
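The idea behind the next three cells fits in a few lines of plain Python: square each error, drop the NaN rows, average the rest, and take the square root. The sample pairs are made up. On Spark 2.2 or newer, passing `coldStartStrategy="drop"` to `ALS` removes the NaN rows during prediction, so `RegressionEvaluator` can be used directly.

```python
import math


def rmse_ignoring_nan(pairs):
    """RMSE over (rating, prediction) pairs, skipping NaN predictions."""
    squared = [(r - p) ** 2 for r, p in pairs if not math.isnan(p)]
    if not squared:
        raise ValueError("no non-NaN predictions to evaluate")
    return math.sqrt(sum(squared) / len(squared))


pairs = [(4.0, 3.5), (3.0, float("nan")), (5.0, 4.0)]
naive = math.sqrt(sum((r - p) ** 2 for r, p in pairs) / len(pairs))
print(naive)                     # nan: one NaN poisons the whole average
print(rmse_ignoring_nan(pairs))  # sqrt((0.25 + 1.0) / 2)
```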

In [8]:
predictions2 = predictions.withColumn('Squared Error', (predictions['rating'] - predictions['prediction'])**2)

In [9]:
MSE = predictions2.na.drop(subset='Squared Error').select(avg('Squared Error')).collect()

In [10]:
print("RMSE : {}".format(math.sqrt(MSE[0][0])))

RMSE : 0.8414258539029678


Not brilliant, but okay for an initial model.

In [11]:
predictions_no_nulls = predictions.na.drop()
predictions_no_nulls.select('rating','prediction').describe().show()

+-------+------------------+------------------+
|summary|            rating|        prediction|
+-------+------------------+------------------+
|  count|           7804115|           7804115|
|   mean|3.5281759046349266|3.4457082808430863|
| stddev|1.0651522053984133|0.7905352470826391|
|    min|               0.5|        -8.9178915|
|    max|               5.0|         16.661182|
+-------+------------------+------------------+



We see that some ratings are predicted to be negative or larger than 5. This is one drawback of ALS: a prediction is the dot product of a user's and a movie's latent factor vectors, so nothing constrains it to the 0.5 to 5.0 rating scale.
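A common post-processing step (not something this notebook does) is to clip predictions to the rating scale. Since every true rating lies between 0.5 and 5.0, clipping can only move an out-of-range prediction closer to it, so it cannot increase the RMSE. A minimal plain-Python sketch follows; in Spark, the same clamp could be written with `pyspark.sql.functions.greatest` and `least`.

```python
def clip_rating(prediction, low=0.5, high=5.0):
    """Clamp an ALS prediction into the 0.5 to 5.0 MovieLens rating range.

    NaN predictions (unknown users or movies) pass through unchanged.
    """
    return min(max(prediction, low), high)


# The extremes from the describe() output above, plus an in-range value.
print(clip_rating(-8.9178915), clip_rating(16.661182), clip_rating(3.4))  # 0.5 5.0 3.4
```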

I've exported the movies I've rated on MovieLens, and will now give myself recommendations! Because of the cold start problem, however, I'll first have to retrain the model with my ratings included, which is unfortunately a drawback of this approach.

In [14]:
my_ratings = spark.read.csv("/user/a208669/recommendation_data/movie_lens/Riaan_ratings.csv",header=True,inferSchema=True)

I'll give myself a new userId, 270897, which is one higher than the largest userId (270896) in the original dataset.

In [18]:
# Add my new userId and rename the exported movie_id column to match ratings_data
my_ratings = my_ratings.withColumn('userId', lit(270897)).withColumnRenamed('movie_id', 'movieId')

In [19]:
my_ratings.printSchema() 

root
 |-- movieId: integer (nullable = true)
 |-- imdb_id: integer (nullable = true)
 |-- tmdb_id: integer (nullable = true)
 |-- rating: double (nullable = true)
 |-- average_rating: double (nullable = true)
 |-- title: string (nullable = true)
 |-- userId: integer (nullable = false)



In [20]:
my_ratings.show()

+-------+-------+-------+------+--------------+--------------------+------+
|movieId|imdb_id|tmdb_id|rating|average_rating|               title|userId|
+-------+-------+-------+------+--------------+--------------------+------+
|    260|  76759|     11|   4.0|       4.12524|    Star Wars (1977)|270897|
|    608| 116282|    275|   3.5|       4.10569|        Fargo (1996)|270897|
|    910|  53291|    239|   4.0|       4.03157|Some Like It Hot ...|270897|
|   1193|  73486|    510|   4.0|       4.22633|One Flew Over the...|270897|
|   1196|  80684|   1891|   4.0|       4.13686|The Empire Strike...|270897|
|   1210|  86190|   1892|   4.0|       3.98753|Return of the Jed...|270897|
|   1907| 120762|  10674|   4.0|       3.70162|        Mulan (1998)|270897|
|   2067|  59113|    907|   5.0|       3.91923|Doctor Zhivago (1...|270897|
|   2115|  87469|     87|   3.0|       3.69097|Indiana Jones and...|270897|
|   2712| 120663|    345|   3.0|       3.23929|Eyes Wide Shut (1...|270897|
|   2762| 16

In [22]:
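# Keep userId, movieId and rating (drop the trailing timestamp column), then
# append my ratings to the training set; union matches columns by position
cols = train.columns[:-1]
new_train = train.select(cols).union(my_ratings.select(cols))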


In [24]:
start = time.time()
als = ALS(maxIter=10, regParam=0.01, userCol="userId", itemCol="movieId", ratingCol="rating",seed=9834)
model2 = als.fit(new_train)
end = time.time()
print(end - start)

15.18872356414795


Next, we'll create a list of all the movies I haven't rated yet, so that we can run it through the recommender.

In [25]:
new_user_ratings_ids = my_ratings.select('movieId')

new_user_unrated_movies = movie_data.select('movieId').join(new_user_ratings_ids,how='left_anti',on='movieId')\
.withColumn('userId',lit(270897))
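The `left_anti` join keeps only the movies that have no match among my rated movieIds. In plain Python, that is a set difference followed by pairing each remaining movie with my userId. The helper name `unrated_movies` is hypothetical.

```python
def unrated_movies(all_movie_ids, rated_movie_ids, user_id):
    """(userId, movieId) pairs for every movie the user has not rated.

    Same idea as a left_anti join of the movies against the user's ratings.
    """
    rated = set(rated_movie_ids)
    return [(user_id, m) for m in all_movie_ids if m not in rated]


print(unrated_movies([1, 2, 3, 4], [2, 4], 270897))  # [(270897, 1), (270897, 3)]
```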

In [26]:
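# Movies that aren't in new_train get NaN predictions; na.drop() removes them
# (along with any rows containing nulls)
new_user_recommendations = (model2.transform(new_user_unrated_movies)
                            .join(movie_data, on='movieId', how='left')
                            .na.drop()
                            .cache())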

Let's see the recommendations

In [29]:
new_user_recommendations.orderBy('prediction',ascending=False).select('title','genres','prediction').show(n=10,truncate=False)

+---------------------------------------------+--------------------------+----------+
|title                                        |genres                    |prediction|
+---------------------------------------------+--------------------------+----------+
|The Tom Green Subway Monkey Hour (2002)      |Comedy                    |11.610085 |
|Pumpkin Pie Wars (2016)                      |Romance                   |9.63825   |
|James White (2015)                           |Drama                     |9.420428  |
|The Winslow Boy (1948)                       |Children|Crime|Drama      |8.738054  |
|The Phenom (2016)                            |Drama                     |8.634741  |
|Romantically Speaking (2015)                 |Children|Romance          |8.3375635 |
|Naked as We Came (2013)                      |(no genres listed)        |8.240689  |
|Apache Blood (1975)                          |Western                   |8.086406  |
|Disney Sing Along Songs: Under the Sea (1990)|Animati

I can't say many of these titles speak to me, but then I'm more of a Drama and Film-Noir fan! We also see that many of the predicted ratings are higher than 5. Let's restrict the recommendations to dramas.
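The next cell filters with a SQL `LIKE '%Drama%'` pattern and sorts by prediction. A plain-Python sketch of the same step, using exact matches on the pipe-separated genre list, also makes it easy to ask for several genres at once (for example Drama or Film-Noir). The helper name and the toy rows (taken from the table above) are illustrative only.

```python
def top_by_genres(recommendations, wanted, n=10):
    """Top-n (title, genres, prediction) rows sharing at least one genre with wanted."""
    matches = [row for row in recommendations if wanted & set(row[1].split("|"))]
    return sorted(matches, key=lambda row: row[2], reverse=True)[:n]


rows = [
    ("Pumpkin Pie Wars (2016)", "Romance", 9.63825),
    ("James White (2015)", "Drama", 9.420428),
    ("The Winslow Boy (1948)", "Children|Crime|Drama", 8.738054),
]
for title, genres, prediction in top_by_genres(rows, {"Drama", "Film-Noir"}):
    print(title, prediction)
```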

In [41]:
(new_user_recommendations
 .filter(col('genres').like('%Drama%'))
 .orderBy('prediction', ascending=False)
 .show(n=10, truncate=False))

+-------+------+----------+-----------------------------+------------------------+
|movieId|userId|prediction|title                        |genres                  |
+-------+------+----------+-----------------------------+------------------------+
|152123 |270897|9.420428  |James White (2015)           |Drama                   |
|69072  |270897|8.738054  |The Winslow Boy (1948)       |Children|Crime|Drama    |
|160678 |270897|8.634741  |The Phenom (2016)            |Drama                   |
|170997 |270897|7.2841096 |Shala (2011)                 |Drama                   |
|112577 |270897|7.179922  |Willie & Phil (1980)         |Comedy|Drama|Romance    |
|118236 |270897|7.142202  |Bleak Night (2010)           |Drama                   |
|153026 |270897|6.96825   |Oriental Elegy (1996)        |Drama                   |
|169682 |270897|6.963565  |Fare (2017)                  |Drama|Thriller          |
|113632 |270897|6.9356084 |Other F Word, The (2011)     |Comedy|Documentary|Drama|
|907

Looks much more interesting!