# Goal
This notebook will demonstrate how to use Apache Spark to recommend movies to a user. I will start with some basic techniques, and then use the Spark ML library's Alternating Least Squares method to make more sophisticated predictions.

# Summary of Data Set
This dataset describes 5-star rating and free-text tagging activity from MovieLens, a movie recommendation service. It contains 20000263 ratings and 465564 tag applications across 27278 movies. These data were created by 138493 users between January 09, 1995 and March 31, 2015. This dataset was generated on October 17, 2016.

Users were selected at random for inclusion. All selected users had rated at least 20 movies. No demographic information is included. Each user is represented by an id, and no other information is provided.
[Link of Data Set](http://grouplens.org/datasets/movielens/)

## Reference
F. Maxwell Harper and Joseph A. Konstan. 2015. The MovieLens Datasets: History and Context. ACM Transactions on Interactive Intelligent Systems (TiiS) 5, 4, Article 19 (December 2015), 19 pages. DOI=http://dx.doi.org/10.1145/2827872

## Load and Cache
* Why do we need to call cache? [See the perfect answer](http://stackoverflow.com/questions/28981359/why-do-we-need-to-call-cache-or-persist-on-a-rdd)

In [1]:
ratings_filename = 'ratings.csv'
movies_filename = 'movies.csv'

from pyspark.sql.types import *

ratings_df_schema = StructType(
  [StructField('userId', IntegerType()),
   StructField('movieId', IntegerType()),
   StructField('rating', DoubleType())]
)
movies_df_schema = StructType(
  [StructField('ID', IntegerType()),
   StructField('title', StringType())]
)

In [2]:
# Read rating data
raw_ratings_df = sqlContext.read.format('com.databricks.spark.csv')\
                           .options(header=True, inferSchema=False)\
                           .schema(ratings_df_schema).load(ratings_filename)
ratings_df = raw_ratings_df.drop('Timestamp')
ratings_df.cache()

# Read movie data
raw_movies_df = sqlContext.read.format('com.databricks.spark.csv')\
                          .options(header=True, inferSchema=False)\
                          .schema(movies_df_schema).load(movies_filename)
movies_df = raw_movies_df.drop('Genres').withColumnRenamed('movieId', 'ID')
movies_df.cache()

DataFrame[ID: int, title: string]

In [7]:
print "There are {} ratings and {} movies in the datasets.".format(ratings_df.count(), movies_df.count())

There are 20000263 ratings and 27278 movies in the datasets.


In [8]:
print "Ratings:"
ratings_df.show(3)

Ratings:
+------+-------+------+
|userId|movieId|rating|
+------+-------+------+
|     1|      2|   3.5|
|     1|     29|   3.5|
|     1|     32|   3.5|
+------+-------+------+
only showing top 3 rows



In [9]:
print "Movies:"
movies_df.show(3)

Movies:
+---+--------------------+
| ID|               title|
+---+--------------------+
|  1|    Toy Story (1995)|
|  2|      Jumanji (1995)|
|  3|Grumpier Old Men ...|
+---+--------------------+
only showing top 3 rows



## Basic Recommendations
One way to recommend movies is to always recommend the movies with the highest average rating. In this part, I will use Spark to find the title, number of ratings, and average rating of the 20 movies with the highest average rating and at least 500 reviews. We filter out movies that have a high average rating but fewer than 500 reviews, because a movie with only a handful of reviews may not have broad appeal.
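
To make the effect of the review threshold concrete, here is a minimal plain-Python sketch of the same idea on a few made-up ratings. The `top_rated` helper and the toy data are hypothetical and only for illustration; the notebook itself does this with Spark DataFrames.

```python
from collections import defaultdict

def top_rated(ratings, min_count, n):
    """Return up to n (movie_id, count, average) tuples, best average first."""
    totals = defaultdict(lambda: [0, 0.0])  # movie_id -> [count, sum of ratings]
    for _user_id, movie_id, rating in ratings:
        totals[movie_id][0] += 1
        totals[movie_id][1] += rating
    # Keep only movies with enough ratings, then compute their averages.
    stats = [(m, c, s / c) for m, (c, s) in totals.items() if c >= min_count]
    return sorted(stats, key=lambda row: row[2], reverse=True)[:n]

# Hypothetical toy ratings: (userId, movieId, rating)
toy_ratings = [
    (1, 10, 5.0),                                # movie 10: a single perfect rating
    (1, 20, 4.0), (2, 20, 4.5), (3, 20, 4.5),    # movie 20: three good ratings
    (1, 30, 3.0), (2, 30, 3.5), (3, 30, 2.5),    # movie 30: three average ratings
]

unfiltered = top_rated(toy_ratings, min_count=1, n=2)
filtered = top_rated(toy_ratings, min_count=3, n=2)
print(unfiltered)
print(filtered)
```

Without a threshold, the movie with a single 5.0 rating comes first; with `min_count=3` it drops out and the ranking reflects movies that several users agree on.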

In [11]:
from pyspark.sql import functions as F
movie_ids_with_avg_ratings_df = ratings_df.groupBy('movieId')\
                                          .agg(F.count(ratings_df.rating).alias("count"), 
                                               F.avg(ratings_df.rating).alias("average"))
print 'movie_ids_with_avg_ratings_df:'
movie_ids_with_avg_ratings_df.show(3, truncate=False)

movie_ids_with_avg_ratings_df:
+-------+-----+------------------+
|movieId|count|average           |
+-------+-----+------------------+
|3997   |2047 |2.0703468490473864|
|1580   |35580|3.55831928049466  |
|3918   |1246 |2.918940609951846 |
+-------+-----+------------------+
only showing top 3 rows



In [12]:
movie_names_df = movie_ids_with_avg_ratings_df.join(movies_df, movie_ids_with_avg_ratings_df.movieId==movies_df.ID, 'inner')
movie_names_with_avg_ratings_df = movie_names_df.select(['movieId', 'title', 'count', 'average'])\
                                                .sort('average', ascending=False)

del movie_names_df
print 'movie_names_with_avg_ratings_df:'
movie_names_with_avg_ratings_df.show(3, truncate=False)

movie_names_with_avg_ratings_df:
+-------+-----------------------------------------------+-----+-------+
|movieId|title                                          |count|average|
+-------+-----------------------------------------------+-----+-------+
|125599 |Always for Pleasure (1978)                     |1    |5.0    |
|104317 |Flight of the Conchords: A Texan Odyssey (2006)|1    |5.0    |
|126219 |Marihuana (1936)                               |1    |5.0    |
+-------+-----------------------------------------------+-----+-------+
only showing top 3 rows



In [13]:
movies_with_500_ratings_or_more = movie_names_with_avg_ratings_df.filter("count >= 500")
print 'Movies with highest ratings:'
movies_with_500_ratings_or_more.show(3, truncate=False)

Movies with highest ratings:
+-------+--------------------------------+-----+-----------------+
|movieId|title                           |count|average          |
+-------+--------------------------------+-----+-----------------+
|318    |Shawshank Redemption, The (1994)|63366|4.446990499637029|
|858    |Godfather, The (1972)           |41355|4.364732196832306|
|50     |Usual Suspects, The (1995)      |47006|4.334372207803259|
+-------+--------------------------------+-----+-----------------+
only showing top 3 rows



# Collaborative Filtering

<img src="https://courses.edx.org/c4x/BerkeleyX/CS100.1x/asset/Collaborative_filtering.gif" alt="collaborative filtering" style="float: right;height: 300px"/>
Collaborative filtering is a method of making automatic predictions (filtering) about the interests of a user by collecting preferences or taste information from many users (collaborating). The underlying assumption of the collaborative filtering approach is that if a person A has the same opinion as a person B on an issue, A is more likely to have B's opinion on a different issue x than to have the opinion on x of a person chosen randomly.

The image at the right (from Wikipedia) shows an example of predicting a user's rating using collaborative filtering. At first, people rate different items (like videos, images, games). After that, the system makes predictions about a user's rating for an item that the user has not rated yet. These predictions are built upon the existing ratings of other users whose ratings are similar to those of the active user. For instance, in the image the system has predicted that the active user will not like the video.

*Reference: [Implementation of ALS in Spark MLlib (in Chinese)](http://mp.weixin.qq.com/s?__biz=MjM5MjAwODM4MA==&mid=206741946&idx=1&sn=cd35e124595d9d998b54a1700296419d#rd)*
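
The like/dislike scenario described above can be sketched in a few lines of plain Python. This is a simple neighborhood-based illustration, not the ALS method used later in this notebook; the users, items, and the `similarity` and `predict` helpers are all hypothetical. Ratings are +1 (like) or -1 (dislike).

```python
def similarity(a, b):
    """Average agreement on co-rated items, in [-1, 1]; 0.0 if nothing in common."""
    common = set(a) & set(b)
    if not common:
        return 0.0
    return sum(a[i] * b[i] for i in common) / float(len(common))

def predict(active, others, item):
    """Similarity-weighted average of other users' ratings for `item`."""
    weighted_sum = 0.0
    weight_total = 0.0
    for ratings in others:
        if item not in ratings:
            continue
        sim = similarity(active, ratings)
        weighted_sum += sim * ratings[item]
        weight_total += abs(sim)
    if weight_total == 0.0:
        return None  # no informative neighbors for this item
    return weighted_sum / weight_total

# Hypothetical data: the active user has not rated 'video' yet.
active_user = {'image': 1, 'game': -1, 'book': 1}
other_users = [
    {'image': 1, 'game': -1, 'book': 1, 'video': -1},  # agrees with the active user
    {'image': -1, 'game': 1, 'video': 1},              # consistently disagrees
    {'image': 1, 'game': 1, 'video': -1},              # agrees half of the time
]

video_prediction = predict(active_user, other_users, 'video')
print(video_prediction)
```

A negative prediction means the active user is expected to dislike the video: the user who agrees with them disliked it, and the user who always disagrees with them liked it.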

## Data Preparation

In [15]:
seed = 1800009193L
(split_60_df, split_a_20_df, split_b_20_df) = ratings_df.randomSplit([0.6, 0.2, 0.2], seed)

# Let's cache these datasets for performance
training_df = split_60_df.cache()
validation_df = split_a_20_df.cache()
test_df = split_b_20_df.cache()

print 'Training: {}'.format(training_df.count())
training_df.show(3)
print 'Validation: {}'.format(validation_df.count())
validation_df.show(3)
print 'Test: {}'.format(test_df.count())
test_df.show(3)

Training: 11998918
+------+-------+------+
|userId|movieId|rating|
+------+-------+------+
|     1|      2|   3.5|
|     1|     29|   3.5|
|     1|     47|   3.5|
+------+-------+------+
only showing top 3 rows

Validation: 4001830
+------+-------+------+
|userId|movieId|rating|
+------+-------+------+
|     1|     32|   3.5|
|     1|    253|   4.0|
|     1|    293|   4.0|
+------+-------+------+
only showing top 3 rows

Test: 3999515
+------+-------+------+
|userId|movieId|rating|
+------+-------+------+
|     1|    112|   3.5|
|     1|    151|   4.0|
|     1|    318|   4.0|
+------+-------+------+
only showing top 3 rows



### Alternating Least Squares
To determine the best values for the parameters, we will use ALS to train several models, and then we will select the best model and use the parameters from that model in the rest of this notebook.
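
To give a feel for what "alternating" means, here is a minimal plain-Python sketch with a single latent factor per user and per movie (rank 1). It is a simplification under stated assumptions: the toy ratings and the `als_rank1` and `objective` helpers are hypothetical, and Spark's implementation differs in several ways (higher rank, distributed solves, and possibly how the regularization term is scaled). The `reg` and `iterations` arguments play roughly the roles of `regParam` and `maxIter`.

```python
def objective(ratings, x, y, reg):
    """Squared error on observed ratings plus an L2 penalty on all factors."""
    squared_error = sum((r - x[u] * y[m]) ** 2 for u, m, r in ratings)
    penalty = reg * (sum(v ** 2 for v in x.values()) +
                     sum(v ** 2 for v in y.values()))
    return squared_error + penalty

def als_rank1(ratings, reg=0.1, iterations=5):
    """Fit one factor per user and per movie by alternating exact solves."""
    users = sorted({u for u, _m, _r in ratings})
    movies = sorted({m for _u, m, _r in ratings})
    x = {u: 1.0 for u in users}    # user factors
    y = {m: 1.0 for m in movies}   # movie factors
    history = []
    for _ in range(iterations):
        # Hold movie factors fixed; each user is a 1-D ridge regression.
        for u in users:
            num = sum(r * y[m] for uu, m, r in ratings if uu == u)
            den = sum(y[m] ** 2 for uu, m, r in ratings if uu == u) + reg
            x[u] = num / den
        # Hold user factors fixed; solve for each movie the same way.
        for m in movies:
            num = sum(r * x[u] for u, mm, r in ratings if mm == m)
            den = sum(x[u] ** 2 for u, mm, r in ratings if mm == m) + reg
            y[m] = num / den
        history.append(objective(ratings, x, y, reg))
    return x, y, history

# Hypothetical toy ratings: (userId, movieId, rating)
toy_ratings = [
    (1, 10, 5.0), (1, 20, 3.0),
    (2, 10, 4.0), (2, 20, 2.0), (2, 30, 4.5),
    (3, 20, 2.5), (3, 30, 5.0),
]

user_factors, movie_factors, history = als_rank1(toy_ratings)
print(history)
# Predicted rating for a pair that is not in the training data.
print(user_factors[1] * movie_factors[30])
```

Because each half-step solves its sub-problem exactly, the objective recorded in `history` should never increase from one iteration to the next.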

#### Why are we doing our own cross-validation?
A challenge for collaborative filtering is how to provide ratings to a new user (a user who has not provided any ratings at all). Some recommendation systems choose to provide new users with a set of default ratings (e.g., an average value across all ratings), while others choose to provide no ratings for new users. Spark's ALS algorithm yields a NaN value when asked to provide a rating for a new user.

Using the ML Pipeline's CrossValidator with ALS is thus problematic, because cross validation involves dividing the training data into a set of folds (e.g., three sets) and then using those folds for testing and evaluating the parameters during the parameter grid search process. It is likely that some of the folds will contain users that are not in the other folds, and, as a result, ALS produces NaN values for those new users. When the CrossValidator uses the Evaluator (RMSE) to compute an error metric, the RMSE algorithm will return NaN. This will make all of the parameters in the parameter grid appear to be equally good (or bad).

This issue has been discussed on [Spark JIRA 14489](https://issues.apache.org/jira/browse/SPARK-14489). There are proposed workarounds of having ALS provide default values or having RMSE drop NaN values. Both introduce potential issues.

**For a production application, we should consider the tradeoffs in how to handle new users.**
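
The effect of a single NaN prediction on RMSE can be reproduced in plain Python. This is a small sketch with made-up numbers; the `rmse` helper is hypothetical and stands in for the evaluator.

```python
import math

def rmse(pairs):
    """Root-mean-square error over (prediction, label) pairs."""
    return math.sqrt(sum((p - y) ** 2 for p, y in pairs) / float(len(pairs)))

nan = float('nan')
# Hypothetical (prediction, rating) pairs; the second user was unseen in training.
pairs = [(3.5, 4.0), (nan, 3.0), (4.5, 4.0)]

with_nan = rmse(pairs)
kept = [(p, y) for p, y in pairs if not math.isnan(p)]
without_nan = rmse(kept)

print(with_nan)
print(without_nan)
```

One NaN is enough to turn the whole metric into NaN. Dropping those rows gives a usable number, at the cost of silently excluding the unseen users from the evaluation.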

In [47]:
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator

# Let's initialize our ALS learner
als = ALS()

# Set the parameters for ALS
als.setMaxIter(5)\
   .setSeed(seed)\
   .setRegParam(0.1)\
   .setUserCol('userId')\
   .setItemCol('movieId')\
   .setRatingCol('rating')

# Create an RMSE evaluator using the label and predicted columns
reg_eval = RegressionEvaluator(predictionCol="prediction", labelCol="rating", metricName="rmse")

ranks = [4, 8, 12]
min_error = float('inf')
best_model = None
for rank in ranks:
    als.setRank(rank)
    # Create the model with these parameters.
    model = als.fit(training_df)
    # Predict against the validation_df.
    predict_df = model.transform(validation_df)
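    # Remove NaN predictions (users or movies not seen in training). The comparison works
    # because Spark SQL treats NaN as equal to NaN, unlike plain Python floats.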
    predicted_ratings_df = predict_df.filter(predict_df.prediction != float('nan'))

    error = reg_eval.evaluate(predicted_ratings_df)
    print 'For rank %s the RMSE is %s' % (rank, error)
    if error < min_error:
        min_error = error
        best_model = model

print 'The best model was trained with rank {}'.format(best_model.rank)

For rank 4 the RMSE is 0.827825178948
For rank 8 the RMSE is 0.815401279892
For rank 12 the RMSE is 0.809404403199
The best model was trained with rank 12


Let's evaluate the best model on the held-out test set. A test RMSE close to the validation RMSE suggests that the model is not overfitting.

In [24]:
predict_df = best_model.transform(test_df)

# Remove NaN values from prediction
predicted_test_df = predict_df.filter(predict_df.prediction != float('nan'))
del predict_df

# Evaluation
test_RMSE = reg_eval.evaluate(predicted_test_df)

print 'The model had a RMSE on the test set of {}'.format(test_RMSE)

The model had a RMSE on the test set of 0.808681089623


## Predictions for Myself

In [26]:
my_user_id = 0

my_rated_movies = [
     (my_user_id, 858, 4.0),
     (my_user_id, 7502, 4.8),
     (my_user_id, 58559, 5.0)
]

my_ratings_df = sqlContext.createDataFrame(my_rated_movies, ['userId','movieId','rating'])
print 'My movie ratings:'
my_ratings_df.show(3, truncate=False)

My movie ratings:
+------+-------+------+
|userId|movieId|rating|
+------+-------+------+
|0     |858    |4.0   |
|0     |7502   |4.8   |
|0     |58559  |5.0   |
+------+-------+------+



Adding my ratings to the training dataset.

In [29]:
training_with_my_ratings_df = training_df.unionAll(my_ratings_df)
print 'The training dataset now has {} more entries than before.'.format(training_with_my_ratings_df.count() - training_df.count())

The training dataset now has 3 more entries than before.


Training my model.

In [30]:
rank = 12
my_ratings_model = als.setRank(rank).fit(training_with_my_ratings_df)

Evaluating my model on test data.

In [39]:
my_predict_df = my_ratings_model.transform(test_df)

# Remove NaN values from prediction
predicted_test_my_ratings_df = my_predict_df.filter(my_predict_df.prediction != float('nan'))
del my_predict_df

test_RMSE_my_ratings = reg_eval.evaluate(predicted_test_my_ratings_df)
print 'The model had a RMSE on the test set of {}'.format(test_RMSE_my_ratings)

The model had a RMSE on the test set of 0.809872586652


Getting the top 5 recommended movies from my model.

In [42]:
# Filter out the movies I already rated.
not_rated_df = movies_df.filter(~movies_df.ID.isin([x[1] for x in my_rated_movies]))

# Rename the "ID" column to be "movieId"
my_unrated_movies_df = not_rated_df.withColumnRenamed('ID', 'movieId')

# Add a new column, userId, with my_user_id
my_unrated_movies_df = my_unrated_movies_df.withColumn('userId', F.lit(my_user_id))

# Use my_ratings_model to predict ratings for the movies that I did not manually rate.
raw_predicted_ratings_df = my_ratings_model.transform(my_unrated_movies_df)

predicted_ratings_df = raw_predicted_ratings_df.filter(raw_predicted_ratings_df['prediction'] != float('nan'))
del raw_predicted_ratings_df

predicted_with_counts_df = predicted_ratings_df.join(movie_names_with_avg_ratings_df, ["movieId", "title"], "inner")
predicted_highest_rated_movies_df = predicted_with_counts_df.filter('count > 75')\
                                                            .select(['movieId', 'title', 'prediction', 
                                                                     'average', 'count'])\
                                                            .sort('prediction', ascending=False)

In [48]:
print 'My top 5 rated movies as predicted:'
predicted_highest_rated_movies_df.show(5, truncate=False)

My top 5 rated movies as predicted:
+-------+---------------------------------------------------------+----------+-----------------+-----+
|movieId|title                                                    |prediction|average          |count|
+-------+---------------------------------------------------------+----------+-----------------+-----+
|7153   |Lord of the Rings: The Return of the King, The (2003)    |4.6338167 |4.14238211356367 |31577|
|79132  |Inception (2010)                                         |4.5958385 |4.156172003137702|14023|
|318    |Shawshank Redemption, The (1994)                         |4.593068  |4.446990499637029|63366|
|4993   |Lord of the Rings: The Fellowship of the Ring, The (2001)|4.576647  |4.137925065906852|37553|
|5952   |Lord of the Rings: The Two Towers, The (2002)            |4.5753546 |4.107520546734616|33947|
+-------+---------------------------------------------------------+----------+-----------------+-----+
only showing top 5 rows

