# Movie Recommender using SparkML ALS

This notebook implements a movie recommender system with Apache Spark. More specifically, it uses the Alternating Least Squares (ALS) method from the Spark ML library to predict ratings. ALS is a matrix factorization algorithm, with the added benefit that it can run in parallel on a cluster.

We will be using the popular MovieLens dataset. The movies.csv and ratings.csv files have been imported and stored as SQL tables 'movies_small_csv' and 'ratings_small_csv' for easy access. A smaller version of the dataset is used for demo purposes, but the same code can run on a much larger dataset using a bigger cluster.

In [2]:
# load and cache data
#sqlContext = SQLContext(sc)
raw_ratings_df = sqlContext.sql("SELECT * FROM ratings_small_csv")
raw_movies_df = sqlContext.sql("SELECT * FROM movies_small_csv")

# drop the timestamp and genre columns since we won't be using them
ratings_df = raw_ratings_df.drop('timestamp')
movies_df = raw_movies_df.drop('genres')

# cache the dataframes
ratings_df.cache()
movies_df.cache()

raw_ratings_count = raw_ratings_df.count()
ratings_count = ratings_df.count()
raw_movies_count = raw_movies_df.count()
movies_count = movies_df.count()

In [3]:
print(raw_ratings_count)
print(raw_movies_count)

In [4]:
# verify some of the data
assert ratings_df.is_cached
assert movies_df.is_cached
assert raw_ratings_count == ratings_count
assert raw_movies_count == movies_count
assert ratings_count ==  100836 # full dataset has 27,753,444
assert movies_count == 9742 # full dataset has 58,098

In [5]:
display(movies_df)

movieId,title
1,Toy Story (1995)
2,Jumanji (1995)
3,Grumpier Old Men (1995)
4,Waiting to Exhale (1995)
5,Father of the Bride Part II (1995)
6,Heat (1995)
7,Sabrina (1995)
8,Tom and Huck (1995)
9,Sudden Death (1995)
10,GoldenEye (1995)


In [6]:
display(ratings_df)

userId,movieId,rating
1,1,4.0
1,3,4.0
1,6,4.0
1,47,5.0
1,50,5.0
1,70,3.0
1,101,5.0
1,110,4.0
1,151,5.0
1,157,5.0


The image below shows the movie ratings matrix. Each row of the matrix holds the ratings of one user, and each column represents a movie (an item, in general) for which we may have a rating.

Since not all users have rated all movies, we do not know all of the entries in this matrix, so we need an algorithm such as collaborative filtering to predict the missing ratings. In practice, these rating matrices are often very sparse. In matrix factorization (MF), the idea is to approximate the ratings matrix as the product of two matrices: one that describes properties of each user (shown in green), and one that describes properties of each movie (shown in blue).

<img alt="factorization" src="http://spark-mooc.github.io/web-assets/images/matrix_factorization.png" style="width: 885px"/>
<br clear="all"/>
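
To make the factorization concrete, here is a minimal sketch of how a predicted rating falls out of the two factor matrices: it is the dot product of one user's factor vector and one movie's factor vector. The user names, the rank of 2, and all factor values below are made up for illustration; a trained model would learn them from the data.

```python
# Hypothetical latent factors (rank 2) for two users and three movies.
# The numbers are invented for illustration, not learned from MovieLens.
user_factors = {
    "alice": [1.0, 0.5],
    "bob": [0.2, 1.5],
}
movie_factors = {
    "Toy Story (1995)": [4.0, 1.0],
    "Heat (1995)": [1.0, 2.0],
    "Sabrina (1995)": [2.0, 2.0],
}


def predict_rating(user, movie):
    """Predicted rating: dot product of the user's and the movie's factor vectors."""
    return sum(u * m for u, m in zip(user_factors[user], movie_factors[movie]))


# Multiplying the two factor matrices fills in every cell of the ratings
# matrix, including the cells for which no rating was observed.
for user in user_factors:
    row = [round(predict_rating(user, movie), 2) for movie in movie_factors]
    print(user, row)
```

Because every user and every movie gets a vector, the product is dense even though the observed ratings matrix is sparse.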

Instead of directly decomposing the ratings matrix (e.g. by singular value decomposition), we use machine learning to find two matrices such that the error on the available ratings is minimized. The [Alternating Least Squares][als] algorithm does this by first randomly filling the users matrix with values and then optimizing the values of the movies matrix such that the error is minimized. Then it holds the movies matrix constant and optimizes the values of the users matrix. This alternation between which matrix to optimize is the reason for the "alternating" in the name.
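
One common way to write the objective that ALS minimizes is sketched here. The symbols are assumptions introduced for illustration and do not appear elsewhere in this notebook: $r_{ij}$ is the rating user $i$ gave movie $j$, $K$ is the set of observed (user, movie) pairs, $u_i$ and $v_j$ are the user and movie factor vectors, and $\lambda$ is the regularization parameter. Spark's implementation may scale the regularization term differently.

$$
\min_{U,V} \sum_{(i,j) \in K} \left( r_{ij} - u_i^\top v_j \right)^2 + \lambda \left( \sum_i \lVert u_i \rVert^2 + \sum_j \lVert v_j \rVert^2 \right)
$$

With the movie factors held fixed, this is an ordinary regularized least-squares problem for each user, with the closed-form solution

$$
u_i = \left( \sum_{j \in K_i} v_j v_j^\top + \lambda I \right)^{-1} \sum_{j \in K_i} r_{ij} v_j
$$

where $K_i$ is the set of movies rated by user $i$. The update for each $v_j$ with the user factors held fixed is symmetric.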

The image on the right shows this alternating optimization. Using a fixed set of user factors and the known ratings, we optimize for the best values of the movie factors. Then we "alternate" and optimize for the best user factors using the latest fixed movie factors.
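
The alternation can be sketched in plain Python. This is a toy illustration under stated assumptions, not Spark's distributed implementation: the ratings are made up, the helper names (`dot`, `solve`, `update`, `loss`, `als`) are hypothetical, and the regularization is a plain L2 penalty on the factors.

```python
import random


def dot(x, y):
    return sum(a * b for a, b in zip(x, y))


def solve(a, b):
    """Solve the small linear system a x = b by Gauss-Jordan elimination."""
    n = len(b)
    m = [a[i][:] + [b[i]] for i in range(n)]
    for col in range(n):
        pivot = max(range(col, n), key=lambda r: abs(m[r][col]))
        m[col], m[pivot] = m[pivot], m[col]
        for r in range(n):
            if r != col:
                f = m[r][col] / m[col][col]
                m[r] = [x - f * y for x, y in zip(m[r], m[col])]
    return [m[i][n] / m[i][i] for i in range(n)]


def update(fixed, ratings_by_id, rank, reg):
    """Recompute one factor matrix by least squares while the other stays fixed."""
    updated = {}
    for row_id, rated in ratings_by_id.items():
        # Normal equations: (sum of v v^T + reg * I) x = sum of rating * v
        a = [[reg if p == q else 0.0 for q in range(rank)] for p in range(rank)]
        b = [0.0] * rank
        for other_id, rating in rated:
            v = fixed[other_id]
            for p in range(rank):
                b[p] += rating * v[p]
                for q in range(rank):
                    a[p][q] += v[p] * v[q]
        updated[row_id] = solve(a, b)
    return updated


def loss(ratings, users, movies, reg):
    """Squared error on the known ratings plus the L2 penalty on all factors."""
    squared_error = sum((r - dot(users[u], movies[m])) ** 2 for u, m, r in ratings)
    penalty = sum(x * x for vec in list(users.values()) + list(movies.values()) for x in vec)
    return squared_error + reg * penalty


def als(ratings, rank=2, reg=0.1, iterations=5, seed=123):
    rng = random.Random(seed)
    by_user, by_movie = {}, {}
    for u, m, r in ratings:
        by_user.setdefault(u, []).append((m, r))
        by_movie.setdefault(m, []).append((u, r))
    # Start from random user factors.
    users = {u: [rng.random() for _ in range(rank)] for u in by_user}
    history = []
    for _ in range(iterations):
        movies = update(users, by_movie, rank, reg)  # users fixed, solve for movies
        users = update(movies, by_user, rank, reg)   # movies fixed, solve for users
        history.append(loss(ratings, users, movies, reg))
    return users, movies, history


# Made-up (userId, movieId, rating) triples for illustration only.
toy_ratings = [
    (1, 1, 4.0), (1, 3, 4.0), (1, 6, 4.0),
    (2, 1, 5.0), (2, 6, 2.0),
    (3, 1, 2.0), (3, 3, 1.0), (3, 6, 5.0),
]
users, movies, history = als(toy_ratings)
for i, value in enumerate(history, start=1):
    print('iteration %d: regularized loss %.4f' % (i, value))
```

Each half-step solves its least-squares problem exactly, so the regularized loss cannot increase from one iteration to the next.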

[als]: https://en.wikiversity.org/wiki/Least-Squares_Method
[mllib]: http://spark.apache.org/docs/latest/mllib-guide.html
[collab]: https://en.wikipedia.org/?title=Collaborative_filtering
[collab2]: http://recommender-systems.org/collaborative-filtering/

#### Splitting the Dataset into Training, Validation and Test Sets

We can use the PySpark `randomSplit()` transformation. `randomSplit()` takes a list of weights and a seed and returns multiple DataFrames.
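
As a conceptual sketch only (plain Python, not Spark's actual implementation), a weighted random split can be pictured as drawing one random number per row and assigning the row to the split whose cumulative weight range contains it. The function name `random_split` and the 1000 dummy rows are hypothetical.

```python
import random
from itertools import accumulate


def random_split(rows, weights, seed):
    """Assign each row to one split, chosen at random in proportion to the weights."""
    total = sum(weights)
    # Cumulative upper bounds of the normalized weights.
    bounds = list(accumulate(w / total for w in weights))
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        x = rng.random()
        for i, upper in enumerate(bounds):
            # The last split also catches any floating-point rounding at the top end.
            if x < upper or i == len(bounds) - 1:
                splits[i].append(row)
                break
    return splits


rows = list(range(1000))
train, validation, test = random_split(rows, [0.6, 0.2, 0.2], seed=123)
print(len(train), len(validation), len(test))
```

Because each row is assigned independently, the split sizes only approximate the requested proportions, which is why the notebook prints the actual counts. Fixing the seed makes the split reproducible.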

In [9]:
# We will use a 60-20-20 split for training, validation, and testing
seed = 123
(split_60_df, split_a_20_df, split_b_20_df) = ratings_df.randomSplit([0.6, 0.2, 0.2], seed=seed)

# Let's cache these datasets for performance
training_df = split_60_df.cache()
validation_df = split_a_20_df.cache()
test_df = split_b_20_df.cache()

print('Training: {0}, validation: {1}, test: {2}\n'.format(
  training_df.count(), validation_df.count(), test_df.count())
)
training_df.show(3)
validation_df.show(3)
test_df.show(3)

#### Tuning Parameters for Alternating Least Squares

ALS takes a training dataset (DataFrame) and a few model parameters. The most important parameter is the rank of the matrix factorization (i.e. the number of latent factors for users and items). In general, a lower rank means higher error on the training dataset, but a high rank may lead to [overfitting](https://en.wikipedia.org/wiki/Overfitting). We will train models using the `training_df` dataset with ranks [10, 15, 20]. The regularization parameter is set to 0.1 (other values are worth trying, but 0.1 works well for most models). Then we use the validation set to evaluate each model and keep the model with the lowest error.

You can read the documentation here: [ALS](http://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.recommendation.ALS).

#### Why are we doing our own cross-validation?

A challenge for collaborative filtering is how to provide ratings to a new user (a user who has not provided *any* ratings at all). Some recommendation systems choose to provide new users with a set of default ratings (e.g., an average value across all ratings), while others choose to provide no ratings for new users. Spark's ALS algorithm yields a NaN (`Not a Number`) value when asked to provide a rating for a new user.

Using the ML Pipeline's [CrossValidator](http://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.tuning.CrossValidator) with ALS is thus problematic, because cross validation involves dividing the training data into a set of folds (e.g., three sets) and then using those folds for testing and evaluating the parameters during the parameter grid search process. It is likely that some of the folds will contain users that are not in the other folds, and, as a result, ALS produces NaN values for those new users. When the CrossValidator uses the Evaluator (RMSE) to compute an error metric, the RMSE algorithm will return NaN. This will make *all* of the parameters in the parameter grid appear to be equally good (or bad).

You can read the discussion on [Spark JIRA 14489](https://issues.apache.org/jira/browse/SPARK-14489) about this issue. There are proposed workarounds of having ALS provide default values or having RMSE drop NaN values. Both introduce potential issues. We have chosen to have RMSE drop NaN values. While this does not solve the underlying issue of ALS not predicting a value for a new user, it does provide some evaluation value. We manually implement the parameter grid search process using a for loop (below) and remove the NaN values before using RMSE.

For a production application, you would want to consider the tradeoffs in how to handle new users.
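
The effect of NaN predictions on RMSE, and the two workarounds mentioned above, can be sketched in plain Python. The (actual, predicted) pairs and the `global_mean` value are made up for illustration; the NaN stands in for a prediction for a user that was not seen during training.

```python
import math


def rmse(pairs):
    """Root-mean-square error over (actual, predicted) pairs."""
    return math.sqrt(sum((a - p) ** 2 for a, p in pairs) / len(pairs))


# Made-up (actual, predicted) pairs; one prediction is NaN.
pairs = [(4.0, 3.5), (5.0, 4.5), (1.0, float("nan")), (2.0, 2.5)]

# A single NaN prediction turns the whole metric into NaN.
rmse_raw = rmse(pairs)

# Workaround 1: drop NaN predictions before scoring (the approach taken in this notebook).
# math.isnan is used because in plain Python NaN compares unequal to every value.
kept = [(a, p) for a, p in pairs if not math.isnan(p)]
rmse_dropped = rmse(kept)

# Workaround 2: fall back to a default rating, here a hypothetical global mean.
global_mean = 3.5
filled = [(a, global_mean if math.isnan(p) else p) for a, p in pairs]
rmse_filled = rmse(filled)

print(rmse_raw, rmse_dropped, rmse_filled)
```

Dropping the NaN rows scores only the users the model knows about, while filling in a default also counts how far that default is from the truth, so the two workarounds can report quite different errors for the same model.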

In [11]:
from pyspark.ml.recommendation import ALS

# Initialize our ALS model object
als = ALS()

# Now we set the parameters for the method
als.setMaxIter(5)\
   .setSeed(seed)\
   .setUserCol('userId') \
   .setItemCol('movieId') \
   .setRatingCol('rating') \
   .setRegParam(0.1)
   #.setPredictionCol('prediction')

# Now let's compute an evaluation metric for our test dataset
from pyspark.ml.evaluation import RegressionEvaluator

# Create an RMSE evaluator using the label and predicted columns
reg_eval = RegressionEvaluator(predictionCol="prediction", labelCol="rating", metricName="rmse")

tolerance = 0.03
ranks = [10, 15, 20]
errors = []
models = []
model_index = 0
min_error = float('inf')
best_rank = -1
best_reg = -1
best_model_index = -1
for rank in ranks:
  # Set the rank
  als.setRank(rank)
    
  # Create the model with these parameters.
  model = als.fit(training_df)
  # Run the model to create a prediction. Predict against the validation_df.
  predict_df = model.transform(validation_df)

  # Remove NaN values from prediction (due to SPARK-14489)
  predicted_ratings_df = predict_df.filter(predict_df.prediction != float('nan'))

  # Run the previously created RMSE evaluator, 
  # reg_eval, on the predicted_ratings_df DataFrame
  error = reg_eval.evaluate(predicted_ratings_df)
  errors.append(error)
  models.append(model)
  print('For rank %s the RMSE is %s' % (rank, error))
  if error < min_error:
    min_error = error
    best_rank = rank
    best_model_index = model_index
  model_index += 1

als.setRank(best_rank)
print('The best model is model[%s] with rank %s and an RMSE on the validation set of %s' % (best_model_index, best_rank, errors[best_model_index]))
my_model = models[best_model_index]

#### Test the Model
Now that we have picked a good rank for matrix factorization, let's test our model with the test dataset and see if we can get a comparable RMSE.

In [13]:
# Use the trained model to get predictions. A new column 'prediction' will be added to the DataFrame.
predict_df = my_model.transform(test_df)

# Remove NaN values from prediction (due to SPARK-14489)
predicted_test_df = predict_df.filter(predict_df.prediction != float('nan'))

# Run the previously created RMSE evaluator, reg_eval, on the predicted_test_df DataFrame
test_RMSE = reg_eval.evaluate(predicted_test_df)

print('The model had an RMSE on the test set of {0}'.format(test_RMSE))

#### Make Prediction for a New User
Let's use our trained model to make some movie predictions for a new user. To do that, we first need some ratings from that user, which we will add to the training data.

We will first look at some popular movies to help us pick movies to rate.

In [15]:
from pyspark.sql import functions as F

# Compute the number of ratings and the average rating for each movie.
# Equivalent DataFrame API version:
# movie_ids_with_avg_ratings_df = ratings_df.groupBy('movieId').agg(F.count(ratings_df.rating).alias("count"), F.avg(ratings_df.rating).alias("average"))

movie_ids_with_avg_ratings_df = sqlContext.sql(
  "SELECT movieId, COUNT(rating) as count, AVG(rating) as average \
  FROM ratings_small_csv \
  group by movieId"
)

print('movie_ids_with_avg_ratings_df:')
movie_ids_with_avg_ratings_df.show(3, truncate=False)

movie_names_with_avg_ratings_df = movie_ids_with_avg_ratings_df.join(movies_df, 'movieId')

print('movie_names_with_avg_ratings_df:')
movie_names_with_avg_ratings_df.show(3, truncate=False)

# Let's filter the dataframe to only those movies with at least 100 ratings
movies_with_100_ratings_or_more = movie_names_with_avg_ratings_df.filter(movie_names_with_avg_ratings_df['count'] >= 100)
print('Movies with highest ratings:')
display(movies_with_100_ratings_or_more.orderBy(movies_with_100_ratings_or_more['average'].desc()).take(100))

movieId,count,average,title
318,317,4.429022082018927,"Shawshank Redemption, The (1994)"
858,192,4.2890625,"Godfather, The (1972)"
2959,218,4.272935779816514,Fight Club (1999)
1221,129,4.25968992248062,"Godfather: Part II, The (1974)"
48516,107,4.252336448598131,"Departed, The (2006)"
1213,126,4.25,Goodfellas (1990)
912,100,4.24,Casablanca (1942)
58559,149,4.238255033557047,"Dark Knight, The (2008)"
50,204,4.237745098039215,"Usual Suspects, The (1995)"
1197,142,4.232394366197183,"Princess Bride, The (1987)"


User ID 0 is unassigned, so we will use it for our own ratings. We set the variable `my_user_id` to 0 for the new user. Next, we create a new DataFrame called `my_ratings_df` containing our ratings for at least 10 movies. Each entry should be formatted as `(my_user_id, movieId, rating)`. As in the original dataset, ratings should be between 0.5 and 5 (inclusive), in half-star steps.

In [17]:
from pyspark.sql import Row
my_user_id = 0

# The format of each line is (my_user_id, movie ID, your rating)
# For example, to give the movie "Star Wars: Episode IV - A New Hope (1977)" a five rating, we add the following line:
#   (my_user_id, 260, 5),
my_rated_movies = [
  (my_user_id, 231, 4.5), # Dumb and Dumber
  (my_user_id, 296, 4.5), # Pulp Fiction
  (my_user_id, 356, 5.0), # Forrest Gump
  (my_user_id, 4993, 5.0), # Lord of the Rings: The Fellowship of the Ring
  (my_user_id, 1222, 3.5), # Full Metal Jacket
  (my_user_id, 32, 3.5), # 12 Monkeys
  (my_user_id, 4995, 5.0), # A Beautiful Mind
  (my_user_id, 1206, 2.0), # A Clockwork Orange
  (my_user_id, 1200, 4.0), # Aliens
  (my_user_id, 1, 3.5), # Toy Story
  (my_user_id, 589, 5.0), # Terminator 2: Judgment Day
  (my_user_id, 7438, 5.0), # Kill Bill: Vol. 2
  (my_user_id, 5445, 4.5), # Minority Report
  (my_user_id, 34, 2.0), # Babe
  (my_user_id, 44191, 2.0) # V for Vendetta
]

my_ratings_df = sqlContext.createDataFrame(my_rated_movies, ['userId','movieId','rating'])
print('My movie ratings:')
display(my_ratings_df.limit(10))

userId,movieId,rating
0,231,4.5
0,296,4.5
0,356,5.0
0,4993,5.0
0,1222,3.5
0,32,3.5
0,4995,5.0
0,1206,2.0
0,1200,4.0
0,1,3.5


#### Adding Our Ratings to Training Dataset
Let's now add our movie ratings to the training dataset so that our model can take our preferences into consideration.

In [19]:
# union() replaces unionAll(), which is deprecated since Spark 2.0
training_with_my_ratings_df = training_df.union(my_ratings_df)

print ('The training dataset now has %s more entries than the original training dataset' %
       (training_with_my_ratings_df.count() - training_df.count()))
assert (training_with_my_ratings_df.count() - training_df.count()) == my_ratings_df.count()

#### Train a Model with Your Ratings

Now, train a model with your ratings added and the parameters used in the previous part.

In [21]:
# Reset the parameters for the ALS object.
als.setPredictionCol("prediction")\
   .setMaxIter(5)\
   .setSeed(seed)\
   .setUserCol('userId') \
   .setItemCol('movieId') \
   .setRatingCol('rating') \
   .setRegParam(0.1) \
   .setRank(best_rank) #10

# Create the model with these parameters.
my_ratings_model = als.fit(training_with_my_ratings_df)

# Compute the prediction for this new model on the test set.
my_predict_df = my_ratings_model.transform(test_df)

# Remove NaN values from prediction (due to SPARK-14489)
predicted_test_my_ratings_df = my_predict_df.filter(my_predict_df.prediction != float('nan'))

# Run the previously created RMSE evaluator, reg_eval, on the predicted_test_my_ratings_df DataFrame
test_RMSE_my_ratings = reg_eval.evaluate(predicted_test_my_ratings_df)
print('The model had an RMSE on the test set of {0}'.format(test_RMSE_my_ratings))


#### Predict Ratings for New User

Now let's predict what ratings we would give to the movies that we did not already provide ratings for.

We will first filter out the movies we already rated manually, using the `my_rated_movie_ids` variable, and put the results in a new DataFrame, `not_rated_df`. Then we use our new ratings model to make predictions on the unrated movies.

In [23]:
# Create a list of my rated movie IDs
my_rated_movie_ids = [x[1] for x in my_rated_movies]

# Filter out the movies we already rated.
not_rated_df = movies_df.filter(~ movies_df["movieId"].isin(my_rated_movie_ids)) # "NOT IN"
display(not_rated_df)

movieId,title
2,Jumanji (1995)
3,Grumpier Old Men (1995)
4,Waiting to Exhale (1995)
5,Father of the Bride Part II (1995)
6,Heat (1995)
7,Sabrina (1995)
8,Tom and Huck (1995)
9,Sudden Death (1995)
10,GoldenEye (1995)
11,"American President, The (1995)"


In [24]:
from pyspark.sql.functions import lit
# Add a column with my_user_id as "userId" (the "movieId" column is already present).
my_unrated_movies_df = not_rated_df.withColumn("userId", lit(my_user_id))

# needed to add this line to avoid the exception
# org.apache.spark.sql.AnalysisException: Detected implicit cartesian product 
spark.conf.set( "spark.sql.crossJoin.enabled" , "true" )

# Use my_ratings_model to predict ratings for the movies that we did not manually rate.
raw_predicted_ratings_df = my_ratings_model.transform(my_unrated_movies_df)

predicted_ratings_df = raw_predicted_ratings_df.filter(raw_predicted_ratings_df['prediction'] != float('nan'))

predicted_ratings_df.show(10)

Let's clean up the predictions and show the top 25 predictions with the highest scores.

In [26]:
# First, let's join predicted_ratings_df with movie_names_with_avg_ratings_df to obtain the rating count for each movie.
predicted_with_counts_df = predicted_ratings_df.join(movie_names_with_avg_ratings_df, ["movieId", "title"])

# Then keep only the movies with at least 50 ratings; the result is sorted by predicted rating (highest first) below.
predicted_highest_rated_movies_df = predicted_with_counts_df.filter(predicted_with_counts_df['count'] >= 50)

# Finally, print the top 25 movies that remain.
top_n = 25
print('My %s highest rated movies as predicted (for movies with at least 50 ratings):' % top_n)
display(predicted_highest_rated_movies_df.orderBy(predicted_highest_rated_movies_df['prediction'].desc()).take(top_n))


movieId,title,userId,prediction,count,average
76093,How to Train Your Dragon (2010),0,4.8831048011779785,53,3.943396226415094
112852,Guardians of the Galaxy (2014),0,4.844204902648926,59,4.050847457627119
58559,"Dark Knight, The (2008)",0,4.7725911140441895,149,4.238255033557047
91529,"Dark Knight Rises, The (2012)",0,4.751042366027832,76,3.9934210526315783
54286,"Bourne Ultimatum, The (2007)",0,4.729414939880371,81,3.697530864197531
116797,The Imitation Game (2014),0,4.69578742980957,50,4.02
68954,Up (2009),0,4.692851543426514,105,4.004761904761905
72998,Avatar (2009),0,4.669037818908691,97,3.6030927835051543
89745,"Avengers, The (2012)",0,4.662612438201904,69,3.869565217391304
2571,"Matrix, The (1999)",0,4.645972728729248,278,4.192446043165468
