# **Building a Movie Recommendation Engine with Apache Spark MLlib**

This note demonstrates how to use MLlib, Spark's machine learning library, and its alternating least squares (ALS) method to build a movie recommendation engine.

ALS is a popular approach to **Collaborative Filtering** (CF) problems and fits naturally into a recommendation engine. However, collaborative filtering is costly: the model must be updated whenever new user preferences arrive, and there are usually many users and items (here, the items are movies). Therefore, a distributed computation engine such as Apache Spark is essential for model computation in any real-world recommendation engine. Here we are going to build one.

The main content of this note follows a useful reference, [Movie recommendation](https://databricks-training.s3.amazonaws.com/movie-recommendation-with-mllib.html#data-set) (from the 2014 Spark Summit), and a very good tutorial: [Building a Movie Recommendation Service with Apache Spark & Flask - Part 1](https://www.codementor.io/spark/tutorial/building-a-recommender-with-apache-spark-python-example-app-part1).

## **The movie rating data**
The movie rating data can be obtained from [MovieLens](http://grouplens.org/datasets/movielens/), which provides free datasets with 100k, 1M, 10M and 20M movie ratings. In this note, we mainly use the 1M dataset. The CF model needs two datasets: the movie rating records and the movie profiles.


### *Data RDD*

MLlib's RDD-based API works on RDDs, so we first load the data as RDDs. Let's load the movie rating records (1M ratings) as an RDD and cache it:

In [233]:
rawRatings = sc.textFile('data/ratings_1m.csv',2).map(lambda x: x.replace('\t', ','))
rawRatings.cache()

PythonRDD[30912] at RDD at PythonRDD.scala:43

and load the movie profile as another RDD stored in the cache:

In [234]:
rawMovies = sc.textFile('data/movies_1m.csv',2).map(lambda x: x.replace('\t', ','))
rawMovies.cache()

PythonRDD[30915] at RDD at PythonRDD.scala:43

We can use the "take(3)" method to look at the first three records of an RDD. Each record of the **rawRatings** RDD has the form **userID::movieID::rating::timestamp**.

In [623]:
print rawRatings.take(3)

[u'1::1193::5::978300760', u'1::661::3::978302109', u'1::914::3::978301968']


Each record of the **rawMovies** RDD has the form **movieID::title::genres**.

In [622]:
print rawMovies.take(3)

[u"1::Toy Story (1995)::Animation|Children's|Comedy", u"2::Jumanji (1995)::Adventure|Children's|Fantasy", u'3::Grumpier Old Men (1995)::Comedy|Romance']


## **Parse the data**

The raw RDDs read directly from the files contain plain strings, so we need to parse them before any analysis. For ALS we only need the first three columns of the **rawRatings** RDD: **userID, movieID** and **rating**. For recommendation, we also parse the **rawMovies** RDD and keep its first two columns, **movieID** and **title**. Only the ratings RDD is used to train the model.

In [506]:
from pyspark.mllib.recommendation import Rating
parsedRatings = rawRatings.map(lambda x: x.split('::')).map(lambda x: Rating(int(x[0]), int(x[1]), float(x[2]))).cache()
parsedMovies = rawMovies.map(lambda x: x.split('::')).map(lambda x: (int(x[0]), x[1])).cache()

Each parsed rating record now explicitly contains the **userID (user)**, **movieID (product)** and **rating score (rating)**.

In [586]:
print parsedRatings.take(3)
print parsedMovies.take(3)

[Rating(user=1, product=1193, rating=5.0), Rating(user=1, product=661, rating=3.0), Rating(user=1, product=914, rating=3.0)]
[(1, u'Toy Story (1995)'), (2, u'Jumanji (1995)'), (3, u'Grumpier Old Men (1995)')]
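To try the parsing logic without a Spark cluster, here is a minimal plain-Python sketch applied to the sample lines shown above. The `Rating` namedtuple is a hypothetical local stand-in that only mirrors the field names of MLlib's `Rating`, and `parse_rating` / `parse_movie` are illustrative helpers, not part of any library:

```python
from collections import namedtuple

# Hypothetical stand-in for pyspark.mllib.recommendation.Rating, with the same field names.
Rating = namedtuple("Rating", ["user", "product", "rating"])


def parse_rating(line):
    """Parse 'userID::movieID::rating::timestamp' into a Rating, dropping the timestamp."""
    fields = line.split("::")
    return Rating(int(fields[0]), int(fields[1]), float(fields[2]))


def parse_movie(line):
    """Parse 'movieID::title::genres' into a (movieID, title) tuple, dropping the genres."""
    fields = line.split("::")
    return (int(fields[0]), fields[1])


# Sample lines copied from the rawRatings / rawMovies outputs above.
raw_ratings = ["1::1193::5::978300760", "1::661::3::978302109", "1::914::3::978301968"]
raw_movies = ["1::Toy Story (1995)::Animation|Children's|Comedy",
              "2::Jumanji (1995)::Adventure|Children's|Fantasy"]

parsed_ratings = [parse_rating(line) for line in raw_ratings]
parsed_movies = [parse_movie(line) for line in raw_movies]

print(parsed_ratings[1].product, parsed_ratings[1].rating)  # 661 3.0
print(parsed_movies[0])  # (1, 'Toy Story (1995)')
```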


In [587]:
print type(parsedMovies.take(3)[0])

<type 'tuple'>


Each movie record is a plain tuple, while each rating record is a **Rating** object (a namedtuple), so we can use its 'product' and 'rating' attributes to fetch the movie ID and the rating score. The following example uses the second rating record:

In [495]:
print parsedRatings.take(3)[1].product, parsedRatings.take(3)[1].rating

661 3.0


We can also examine the number of rating records and the number of movies in the movie profile:

In [503]:
print parsedRatings.count()
print parsedMovies.count()

1000209
3883


In [225]:
sc.setCheckpointDir('data/checkpoint/')

## **Collaborative filtering**

A [collaborative filtering model](http://spark.apache.org/docs/latest/mllib-collaborative-filtering.html) is a machine learning approach that predicts users' ratings for the movies they have not rated yet, and thus how likely they are to enjoy those movies. Both the movie features and the users' preferences are unknown, so ALS solves the problem iteratively: each iteration performs regularized linear regression (least squares) for the users' preferences with the movie features fixed, and then for the movie features with the users' preferences fixed. We only have some users' ratings on certain movies (not all movies).

Suppose there are $\scriptsize N_u$ users and $\scriptsize N_m$ movies, the rating table will be a $ \scriptsize N_u \times N_m$ matrix **R**; $\scriptsize r_{ij}$ is the rating score given by the user-$\scriptsize i$ on the movie-$\scriptsize j$. 
Note $\scriptsize r_{ij}=0$ means the user-$\scriptsize i$ has not rated the movie-$\scriptsize j$ yet.
Here $\scriptsize N_u$ and $\scriptsize N_m$ are usually large numbers; could be up to millions or more.

In ALS, we factorize the big $ \scriptsize N_u \times N_m$ matrix **R** into two lower-rank matrices, **R ≈ WX**, where the dimensions of **W** and **X** are $ \scriptsize N_u \times f$ and $ \scriptsize f\times N_m$, respectively, and $\scriptsize f$ is an integer. Each $\scriptsize f$-dimensional row vector $\scriptsize \mathbf{w}^T_i$ of the **W** matrix describes user-$\scriptsize i$, whereas each $\scriptsize f$-dimensional column vector $\scriptsize \mathbf{x}_j$ of the **X** matrix describes movie-$\scriptsize j$, so the predicted rating is $\scriptsize r^{pred}_{ij} = \mathbf{w}^T_i \mathbf{x}_j$. Here $\scriptsize f$ is the number of latent factors, which can be up to a few hundred. In a content-based model, these latent factors could be interpreted as movie genres (action, comedy, ...), release years, etc., but in the CF model they do not need such an interpretation.

The parameters in $\scriptsize \mathbf{w}^T_i$ and $\scriptsize \mathbf{x}_j$ are obtained by minimizing the regularized squared error over the observed ratings ($\scriptsize r_{ij} \ne 0$):
$$  \min_{\mathbf{x},\mathbf{w}}\sum_{i,j;\, r_{ij}\ne 0}\Big(r_{ij}-\mathbf{w}^T_i \mathbf{x}_j \Big)^2+\lambda\Big( \sum_i \Vert \mathbf{w}_i \Vert^2 + \sum_j \Vert \mathbf{x}_j \Vert^2   \Big) $$
There are many good materials that give a more systematic introduction and algorithm details, such as the [presentation slides](http://www.slideshare.net/vidhyamurali/building-data-pipelines-for-music-recommendations-at-spotify?qid=5f59118d-b34d-43ca-8baa-6bad00682c44&v=&b=&from_search=1) from Spotify and Andrew Ng's machine learning [recommendation lectures](https://www.youtube.com/watch?v=gnlq-1Zjh2M&list=PLnnr1O8OWc6ZYcnoNWQignIiP5RRtu3aS). [This research paper](https://datajobs.com/data-science-repo/Recommender-Systems-[Netflix].pdf) describes recommender systems based on matrix factorization. Fortunately, Spark MLlib offers the [ALS](https://spark.apache.org/docs/latest/api/python/pyspark.mllib.html#pyspark.mllib.recommendation.ALS) algorithm to solve the problem; here we demonstrate how to use Spark to implement it.
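To make the alternating idea concrete, the following is a minimal single-machine sketch of ALS for the objective above, written in plain Python for a tiny hypothetical rating matrix. It is not MLlib's implementation: MLlib distributes the work in blocks and, according to its documentation, scales $\scriptsize \lambda$ by the number of ratings of each user or movie, whereas this sketch uses the plain $\scriptsize \lambda$ of the formula above. The helper names (`solve`, `half_step`, `als`, `objective`) are illustrative. Each half-step solves, for every user (or movie), the regularized least-squares problem $\scriptsize (\sum_j \mathbf{x}_j \mathbf{x}_j^T + \lambda I)\,\mathbf{w}_i = \sum_j r_{ij} \mathbf{x}_j$ over its observed ratings:

```python
import random


def solve(a, b):
    """Solve the small dense system a x = b by Gaussian elimination with partial pivoting."""
    n = len(b)
    m = [list(row) + [b[i]] for i, row in enumerate(a)]  # augmented matrix [a | b]
    for col in range(n):
        pivot = max(range(col, n), key=lambda r: abs(m[r][col]))
        m[col], m[pivot] = m[pivot], m[col]
        for r in range(col + 1, n):
            factor = m[r][col] / m[col][col]
            for c in range(col, n + 1):
                m[r][c] -= factor * m[col][c]
    x = [0.0] * n
    for r in range(n - 1, -1, -1):  # back substitution
        x[r] = (m[r][n] - sum(m[r][c] * x[c] for c in range(r + 1, n))) / m[r][r]
    return x


def dot(u, v):
    return sum(p * q for p, q in zip(u, v))


def half_step(fixed, observed, f, lam):
    """For every key (a user or a movie), solve (sum_k y_k y_k^T + lam I) z = sum_k r_k y_k,
    where y_k are the fixed factors of the movies (or users) it has ratings r_k with."""
    solved = {}
    for key, pairs in observed.items():
        a = [[lam if p == q else 0.0 for q in range(f)] for p in range(f)]
        b = [0.0] * f
        for other, r in pairs:
            y = fixed[other]
            for p in range(f):
                b[p] += r * y[p]
                for q in range(f):
                    a[p][q] += y[p] * y[q]
        solved[key] = solve(a, b)
    return solved


def als(ratings, f=2, lam=0.1, iterations=10, seed=0):
    """ratings: {(user, movie): r_ij} holding only the observed entries (r_ij != 0)."""
    by_user, by_movie = {}, {}
    for (u, mv), r in ratings.items():
        by_user.setdefault(u, []).append((mv, r))
        by_movie.setdefault(mv, []).append((u, r))
    rng = random.Random(seed)
    X = {mv: [rng.random() for _ in range(f)] for mv in by_movie}  # random initial movie factors
    for _ in range(iterations):
        W = half_step(X, by_user, f, lam)   # fix movie factors X, solve for user factors w_i
        X = half_step(W, by_movie, f, lam)  # fix user factors W, solve for movie factors x_j
    return W, X


def objective(ratings, W, X, lam):
    """Regularized squared error from the minimization above."""
    err = sum((r - dot(W[u], X[mv])) ** 2 for (u, mv), r in ratings.items())
    return err + lam * (sum(dot(w, w) for w in W.values()) + sum(dot(x, x) for x in X.values()))


# Tiny hypothetical rating matrix: 3 users x 3 movies, 6 observed ratings.
ratings = {(0, 0): 5.0, (0, 1): 3.0, (1, 0): 4.0, (1, 2): 1.0, (2, 1): 2.0, (2, 2): 5.0}
W1, X1 = als(ratings, iterations=1)
W, X = als(ratings, iterations=20)
print(objective(ratings, W1, X1, 0.1), objective(ratings, W, X, 0.1))
print(dot(W[0], X[2]))  # predicted rating of user 0 for the unrated movie 2
```

Because each half-step minimizes the objective exactly over one set of factors while the other is held fixed, the objective never increases from one iteration to the next, so the first printed value should be at least as large as the second.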

### *Separate data into training, validation and test datasets*

Recall that there are about 1M rating records in the RDD:

In [588]:
print parsedRatings.count()

1000209


In [589]:
weights = [.6, .2, .2]
seed = 42
# Use randomSplit with weights and seed
trainData, valData, testData = parsedRatings.randomSplit(weights, seed)

Let's check that the split preserves the total number of records:

In [240]:
print trainData.count()+valData.count()+testData.count()

1000209


In [590]:
print trainData.take(3), trainData.count()

[Rating(user=1, product=1193, rating=5.0), Rating(user=1, product=661, rating=3.0), Rating(user=1, product=914, rating=3.0)] 599270
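Spark's randomSplit( ) assigns each record to one of the splits at random (using the given seed), so the split sizes are only approximately proportional to the weights: the 599270 training records are about 59.9% of 1000209. A simplified plain-Python sketch of weighted random splitting; `random_split` is a hypothetical helper, not Spark's implementation (which samples per partition):

```python
import random


def random_split(records, weights, seed):
    """Assign each record independently to one split with probability proportional to its weight."""
    total = float(sum(weights))
    bounds, acc = [], 0.0
    for w in weights:
        acc += w / total
        bounds.append(acc)  # cumulative upper bound of each split in [0, 1)
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for rec in records:
        u = rng.random()
        for k, b in enumerate(bounds):
            if u < b:
                splits[k].append(rec)
                break
        else:
            splits[-1].append(rec)  # guard against floating-point rounding of the last bound
    return splits


train, val, test = random_split(range(100000), [.6, .2, .2], 42)
print(len(train), len(val), len(test))  # roughly 60000, 20000, 20000, but not exactly
```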


The **trainData** RDD will be used to train models. We additionally strip the ratings from the validation and test datasets, keeping only the (userID, movieID) pairs, so that we can feed them to the trained model for prediction later:

In [428]:
X_Val = valData.map(lambda x: (x.user, x.product))
X_Test = testData.map(lambda x: (x.user, x.product))

In [242]:
print X_Val.take(3)

[(1, 2355), (1, 938), (1, 2797)]


### *Training model*

For now we fix $\scriptsize f=8$ and train the ALS model:

In [494]:
from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel
import math

seed = 5L                        # Python 2 long literal, used as the ALS random seed
iterations = 6
regularization_parameter = 0.1
tolerance = 0.02                 # not used by ALS.train
model = ALS.train(trainData, rank=8, seed=seed, iterations=iterations, lambda_=regularization_parameter)

In [246]:
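# Note: this only sets a Python attribute on the ALS class; pyspark.mllib's ALS.train() takes no
# checkpoint-interval argument, so this line does not affect the training above.
ALS.checkpointInterval=2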

In [247]:
model

<pyspark.mllib.recommendation.MatrixFactorizationModel at 0xb0e9c4ec>

### *Model accuracy: RMSE*

We used the training dataset to train the model; now we use the validation dataset (the **valData** RDD) to evaluate its accuracy:

In [248]:
predictions = model.predictAll(X_Val).map(lambda r: ((r[0], r[1]), r[2]))
rates_and_preds = valData.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions)
error = math.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())
print error

0.881078564893


The **predictions** RDD gives the predicted rating for each (user, movie) pair in the validation set. For example, our model predicts that userID=821 will rate movieID=1084 about 4.3: $\scriptsize r^{pred}_{821, 1084} \simeq 4.3$.

In [158]:
print predictions.take(3)

[((821, 1084), 4.296423747883905), ((504, 1084), 3.5368562366066496), ((655, 1084), 3.0883837863791395)]


Recall that CF is a supervised learning model. For the (user, movie) pairs in the validation set we already know the target values $\scriptsize r_{ij}$, so we can compute the root-mean-squared error (RMSE) to evaluate the model accuracy:

$$  \textrm{RMSE} = \sqrt{\frac{1}{N_{r_{ij}\ne 0}} \sum_{i,j;\, r_{ij} \ne 0} \Big( r_{ij}-r^{pred}_{ij} \Big)^2}. $$
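For reference, the formula fits in a few lines of plain Python. This minimal sketch mirrors the Spark expression `math.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())` used in this note; `rmse` is an illustrative helper operating on a list of (known, predicted) pairs:

```python
import math


def rmse(pairs):
    """Root-mean-squared error over (known_rating, predicted_rating) pairs."""
    pairs = list(pairs)
    return math.sqrt(sum((r - p) ** 2 for r, p in pairs) / len(pairs))


print(rmse([(4.0, 3.0), (2.0, 4.0)]))  # sqrt((1 + 4) / 2) = sqrt(2.5), about 1.581
```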

We first extract the known target values from the validation dataset as ((userID, movieID), rating) pairs:

In [159]:
print valData.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).take(3)

[((305, 451), 3.0), ((62, 257), 2.0), ((291, 1042), 4.0)]


Then we can generate the **rates_and_preds** RDD by joining the **valData** RDD with the **predictions** RDD using the join( ) method. Each element of the new RDD is a pair of tuples, ((userID, movieID), ($\scriptsize r_{ij}$, $\scriptsize r^{pred}_{ij}$)), and the second tuple holds both the known target value and the prediction:
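Note that join( ) is an inner join: predictAll( ) returns predictions only for users and movies that the model has seen during training, and any validation pair without a prediction is silently dropped from **rates_and_preds**, so the RMSE is computed only over the matched pairs. A minimal plain-Python sketch of these semantics, with the hypothetical helper `inner_join`; the first two pairs are taken from the output below, and (1, 9999) is a made-up pair with no prediction:

```python
def inner_join(left, right):
    """Plain-Python analogue of RDD.join on (key, value) pairs: only keys present on both sides survive."""
    index = {}
    for key, value in right:
        index.setdefault(key, []).append(value)
    return [(key, (value, other)) for key, value in left for other in index.get(key, [])]


# Known ratings keyed by (userID, movieID); (1, 9999) is a hypothetical pair with no prediction.
known = [((780, 204), 5.0), ((472, 584), 1.0), ((1, 9999), 3.0)]
predicted = [((780, 204), 3.707398556712158), ((472, 584), 4.316314914799886)]
rates_and_preds = inner_join(known, predicted)
print(rates_and_preds)  # the (1, 9999) pair is dropped
```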

In [160]:
print valData.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions).take(3)

[((780, 204), (5.0, 3.707398556712158)), ((472, 584), (1.0, 4.316314914799886)), ((109, 365), (4.0, 3.4444997894714335))]


In the first example above, user 780 rated movieID=204 as 5.0 while our model predicted 3.7, so the error is about 1.3. With the joined RDD, it is convenient to compute the RMSE over all observations.

## **Grid Search**

So far we have only used $\scriptsize f=8$. Different values of $\scriptsize f$ and $\scriptsize \lambda$ yield different models; grid search trains a model for each combination and picks the parameters that minimize the validation RMSE.

In [249]:
ranks = [4, 8, 12]
errors = []
min_error = float('inf')
best_rank = -1
best_reg = -1.0
best_iteration = -1
best_model = None

So here we try $\scriptsize f = [4,8,12]$ and $\scriptsize \lambda = [0.001, 0.01, 0.1, 0.2, 1.0]$.

In [250]:
for rank in ranks:
    for reg in [1e-3, 0.01, 0.1, 0.2, 1.0]:
        model = ALS.train(trainData, rank, seed=seed, iterations=iterations, lambda_=reg)
        predictions = model.predictAll(X_Val).map(lambda r: ((r[0], r[1]), r[2]))
        rates_and_preds = valData.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions)
        error = math.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())
        errors.append(error)
        print 'For rank %s, reg %s, the RMSE is %s' % (rank, reg, error)
        if error < min_error:
            best_model = model
            min_error = error
            best_rank = rank
            best_reg = reg

print 'The best model was trained with rank %s and reg %s' % (best_rank, best_reg)

For rank 4, reg 0.001, the RMSE is 0.910312338409
For rank 4, reg 0.01, the RMSE is 0.891084251005
For rank 4, reg 0.1, the RMSE is 0.887770473498
For rank 4, reg 0.2, the RMSE is 0.91249944548
For rank 4, reg 1.0, the RMSE is 1.35183397737
For rank 8, reg 0.001, the RMSE is 0.978756192817
For rank 8, reg 0.01, the RMSE is 0.917378365349
For rank 8, reg 0.1, the RMSE is 0.881078564893
For rank 8, reg 0.2, the RMSE is 0.913655162102
For rank 8, reg 1.0, the RMSE is 1.35158770674
For rank 12, reg 0.001, the RMSE is 1.06306140395
For rank 12, reg 0.01, the RMSE is 0.943030373855
For rank 12, reg 0.1, the RMSE is 0.883414721033
For rank 12, reg 0.2, the RMSE is 0.919456329739
For rank 12, reg 1.0, the RMSE is 1.3518822329
The best model was trained with rank 8 and reg 0.1


The grid search selects $\scriptsize f=8$ and $\scriptsize \lambda=0.1$ as the optimal parameters, since they give the minimal validation RMSE.
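The selection step of the grid search is just a minimum over all (rank, $\scriptsize \lambda$) combinations. A small plain-Python sketch, assuming a hypothetical `evaluate(rank, reg)` callback; here it simply looks up the validation RMSEs printed above instead of training models:

```python
from itertools import product


def grid_search(evaluate, ranks, regs):
    """Return (best_rmse, best_rank, best_reg) over every (rank, reg) combination."""
    return min((evaluate(rank, reg), rank, reg) for rank, reg in product(ranks, regs))


# Validation RMSEs printed by the grid search above, keyed by (rank, reg).
printed_rmse = {
    (4, 0.001): 0.910312338409, (4, 0.01): 0.891084251005, (4, 0.1): 0.887770473498,
    (4, 0.2): 0.91249944548, (4, 1.0): 1.35183397737,
    (8, 0.001): 0.978756192817, (8, 0.01): 0.917378365349, (8, 0.1): 0.881078564893,
    (8, 0.2): 0.913655162102, (8, 1.0): 1.35158770674,
    (12, 0.001): 1.06306140395, (12, 0.01): 0.943030373855, (12, 0.1): 0.883414721033,
    (12, 0.2): 0.919456329739, (12, 1.0): 1.3518822329,
}

best_rmse, best_rank, best_reg = grid_search(
    lambda rank, reg: printed_rmse[(rank, reg)], [4, 8, 12], [1e-3, 0.01, 0.1, 0.2, 1.0])
print(best_rank, best_reg, best_rmse)  # 8 0.1 0.881078564893
```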

### *Evaluate the optimal model using test dataset*

Having used the validation dataset to select the optimal parameters, we evaluate the model on the test dataset (the **testData** RDD), which was used neither for training nor for parameter selection and therefore gives an unbiased estimate of the model accuracy.

In [426]:
predictions = best_model.predictAll(X_Test).map(lambda r: ((r[0], r[1]), r[2]))
rates_and_preds = testData.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions)
error = math.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())
print 'the RMSE for the best model is %s' %  error

the RMSE for the best model is 0.88335878468


The test RMSE is close to the validation RMSE ($\scriptsize \sim 0.8811$).

In [431]:
print rates_and_preds.take(4)

[((822, 1620), (4.0, 3.4643136588967844)), ((3792, 2315), (1.0, 2.191543239966298)), ((3389, 423), (2.0, 2.3547061157306146)), ((4107, 2014), (3.0, 3.5690930526005222))]


In [433]:
print rates_and_preds.map(lambda x: x[1][1]).min(), rates_and_preds.map(lambda x: x[1][1]).max()

 -0.295762245689 6.02103417518
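Some predictions fall outside the 1 to 5 rating scale (from about -0.30 to 6.02) because the ALS prediction $\scriptsize \mathbf{w}^T_i \mathbf{x}_j$ is an unconstrained dot product. A common post-processing step, not used elsewhere in this note, is to clip predictions to the rating scale: since every known rating lies in [1, 5], clipping can only reduce each individual error, so it never increases the RMSE. A minimal sketch with a hypothetical `clip` helper; the predictions reuse the min/max printed above, and the known ratings are made up:

```python
def clip(prediction, low=1.0, high=5.0):
    """Clamp a predicted rating to the valid rating scale [low, high]."""
    return max(low, min(high, prediction))


# (known, predicted) pairs; the known ratings here are hypothetical.
pairs = [(1.0, -0.295762245689), (5.0, 6.02103417518), (3.0, 3.5)]
for known, pred in pairs:
    # error before clipping vs. error after clipping
    print(known, pred, clip(pred), abs(known - pred), abs(known - clip(pred)))
```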


## **Baseline Model: average ratings for each movie**

The simplest model for the rating problem assumes that rating scores are independent of users. In this case, each movie's predicted rating is simply its average rating score

$$\textrm{Average}=\frac{\textrm{total rated scores}}{\textrm{number of times being rated}}$$

### *Number of times that the movie has been rated*

Like word counting, we can compute how many times each movie has been rated. This is a typical map-reduce problem: we first map the **parsedRatings** RDD to ('key', 'value') = (movieID, 1) pairs, and then reduce by key = movieID:

In [591]:
from operator import add

In [514]:
movie_ID_counts = parsedRatings.map(lambda x: (x[1], 1)).reduceByKey(add)
print movie_ID_counts.take(4)

[(2, 701), (4, 170), (6, 940), (8, 68)]


This means movieID=2 has been rated 701 times, movieID=4 170 times, and so on. To check that our map-reduce works correctly, we can sum the counts over all movies and compare the total with the number of records in the original **parsedRatings** RDD.
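The (movieID, 1) map followed by reduceByKey(add) can be mimicked in plain Python to see what each step does. In this sketch, `reduce_by_key` is a hypothetical single-machine analogue of RDD.reduceByKey, and the sample triples are made up:

```python
from operator import add


def reduce_by_key(pairs, func):
    """Single-machine analogue of RDD.reduceByKey: merge all values sharing a key with func."""
    result = {}
    for key, value in pairs:
        result[key] = func(result[key], value) if key in result else value
    return result


# Hypothetical (userID, movieID, rating) triples.
ratings = [(1, 2, 4.0), (2, 2, 3.0), (3, 4, 5.0), (1, 4, 2.0), (4, 2, 1.0)]
counts = reduce_by_key(((movie, 1) for _user, movie, _rating in ratings), add)
print(counts)                                # {2: 3, 4: 2}
print(sum(counts.values()) == len(ratings))  # True: every rating is counted exactly once
```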

In [508]:
print movie_ID_counts.map(lambda x: x[1]).sum()  ## total number of ratings; should equal parsedRatings.count()
print movie_ID_counts.count()                    ## number of distinct rated movies
print parsedMovies.count()

1000209
3706
3883


'1000209' is the number of rating records, and '3706' is the number of distinct rated movies. Since the movie profile lists 3883 movies, **177** movies have never been rated. We can collect the counts into a Python dictionary {movieID: count}; for example, the entry 2: 701 means that movieID=2 has been rated 701 times.

In [515]:
movie_ID_counts = dict(movie_ID_counts.collect())
print movie_ID_counts[2]

701


### *Total rating scores for each movie*

We again use map-reduce, but now we map each record to the tuple ('key', 'rated score') = (movieID, rating):

In [401]:
movie_ID_sumratings = parsedRatings.map(lambda x: (x[1], x[2])).reduceByKey(add)

In [402]:
movie_ID_sumratings.take(4)  

[(2, 2244.0), (4, 464.0), (6, 3646.0), (8, 205.0)]

In [397]:
print parsedRatings.take(4)

[Rating(user=1, product=1193, rating=5.0), Rating(user=1, product=661, rating=3.0), Rating(user=1, product=914, rating=3.0), Rating(user=1, product=3408, rating=4.0)]


### *Average rating scores for each movie*

The following is to compute the average rating for each movie:

In [403]:
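# Divide each rating by its movie's rating count (looked up in the movie_ID_counts dict), then sum per movie
movie_ID_avgratings = parsedRatings.map(lambda x: (x[1], x[2]/float(movie_ID_counts[x[1]]))).reduceByKey(add)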

In [516]:
print movie_ID_avgratings.take(4)

[(2, 3.201141226818817), (4, 2.7294117647058806), (6, 3.878723404255302), (8, 3.0147058823529407)]
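Alternatively, the sum and the count can be carried together as (sum, count) pairs, so no dictionary has to be collected to the driver. In Spark this would look something like `parsedRatings.map(lambda x: (x[1], (x[2], 1))).reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1])).mapValues(lambda s: s[0] / s[1])` (not run in this note). The same accumulator idea in plain Python, with a hypothetical helper and made-up ratings:

```python
def average_ratings(ratings):
    """Per-movie average rating from (userID, movieID, rating) triples via (sum, count) accumulators."""
    acc = {}
    for _user, movie, rating in ratings:
        total, n = acc.get(movie, (0.0, 0))
        acc[movie] = (total + rating, n + 1)
    return {movie: total / n for movie, (total, n) in acc.items()}


ratings = [(1, 2, 4.0), (2, 2, 3.0), (3, 4, 5.0), (1, 4, 2.0), (4, 2, 2.0)]
print(average_ratings(ratings))  # {2: 3.0, 4: 3.5}
```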


We can check the range of the average ratings; the minimum is 1.0 and the maximum is 5.0:

In [408]:
print movie_ID_avgratings.map(lambda x: x[1]).min(), movie_ID_avgratings.map(lambda x: x[1]).max()

1.0 5.0


## **Introduce new user data**

Suppose we now have a new user with userID=0 who has provided ratings for some movies:

In [326]:
new_user_ID = 0

# The format of each line is (userID, movieID, rating)
new_user_ratings = [
     Rating(0,260,4), # Star Wars (1977)
     Rating(0,1,3), # Toy Story (1995)
     Rating(0,16,3), # Casino (1995)
     Rating(0,25,4), # Leaving Las Vegas (1995)
     Rating(0,32,4), # Twelve Monkeys (a.k.a. 12 Monkeys) (1995)
     Rating(0,335,1), # Underneath (1995)
     Rating(0,379,1), # Timecop (1994)
     Rating(0,296,3), # Pulp Fiction (1994)
     Rating(0,858,5) , # Godfather, The (1972)
     Rating(0,50,4) # Usual Suspects, The (1995)
    ]
new_user_ratings_RDD = sc.parallelize(new_user_ratings)
print 'New user ratings: %s' % new_user_ratings_RDD.take(10)

New user ratings: [Rating(user=0, product=260, rating=4.0), Rating(user=0, product=1, rating=3.0), Rating(user=0, product=16, rating=3.0), Rating(user=0, product=25, rating=4.0), Rating(user=0, product=32, rating=4.0), Rating(user=0, product=335, rating=1.0), Rating(user=0, product=379, rating=1.0), Rating(user=0, product=296, rating=3.0), Rating(user=0, product=858, rating=5.0), Rating(user=0, product=50, rating=4.0)]


### *Use union( ) method to incorporate the data RDD*

How can we incorporate the new user's data into the 1M rating data RDD **parsedRatings**? We can use the union( ) method:

   **incorporatedData_RDD = old_RDD.union(new_RDD)**

In [520]:
newRatings = parsedRatings.union(new_user_ratings_RDD)

With the 10 new rating records, the number of rating records becomes:

In [521]:
print newRatings.count()

1000219


Note that union( ) places the new rating records after the old data, so the first few records come from the original dataset:

In [519]:
print newRatings.take(4)

[Rating(user=196, product=242, rating=3.0), Rating(user=186, product=302, rating=3.0), Rating(user=22, product=377, rating=1.0), Rating(user=244, product=51, rating=2.0)]


while the last four records come from the new user:

In [526]:
print newRatings.collect()[-4:]

[Rating(user=0, product=379, rating=1.0), Rating(user=0, product=296, rating=3.0), Rating(user=0, product=858, rating=5.0), Rating(user=0, product=50, rating=4.0)]


## **Efficiency of training a model with new user data**

### *Train a new model*

Retraining the ALS model every time new user data arrive is not efficient, but we ignore this for now. Let us train a new model with the new rating data:

In [608]:
from time import time

t0 = time()
new_user_model = ALS.train(newRatings, best_rank, seed=seed, 
                              iterations=6, lambda_=regularization_parameter)
tt = time() - t0

print "New model trained in %s seconds" % round(tt,3)

New model trained in 7.008 seconds


Training the new model takes about 7 seconds, which is far from negligible. Therefore, we should not rerun this batch training every time new data arrive. Next, let's evaluate the new model on the new user's ratings; note that these ratings are part of the training data, so the resulting RMSE measures how well the model fits them rather than how well it generalizes.

In [596]:
X_newUser = new_user_ratings_RDD.map(lambda x: (x[0], x[1]))
print X_newUser.collect()

[(0, 260), (0, 1), (0, 16), (0, 25), (0, 32), (0, 335), (0, 379), (0, 296), (0, 858), (0, 50)]


The predictions are

In [415]:
predictions = new_user_model.predictAll(X_newUser).map(lambda r: ((r[0], r[1]), r[2]))
print predictions.take(10)
print predictions.count()

[((0, 1), 3.1546767428626934), ((0, 379), 1.5156141222677617), ((0, 858), 3.8448698787876063), ((0, 296), 3.851785411064457), ((0, 50), 3.819495347810666), ((0, 16), 3.1210757203890944), ((0, 260), 3.4734090371574187), ((0, 25), 3.1972075449144137), ((0, 335), 2.089996427991622), ((0, 32), 3.1483276475325024)]
10


We can now compare the new model's predictions with the known ratings:

In [528]:
rates_and_preds = new_user_ratings_RDD.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions)
print rates_and_preds.take(4)
error = math.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())
print 'the RMSE for the best model is %s' %  error

[((0, 32), (4.0, 3.1483276475325024)), ((0, 858), (5.0, 3.8448698787876063)), ((0, 50), (4.0, 3.819495347810666)), ((0, 16), (3.0, 3.1210757203890944))]
the RMSE for the best model is 0.723332554079


### *Use the old model to predict?*

Instead of training a new model, can we use the old (optimal) model directly to predict the new user's ratings?

In [614]:
predictions = best_model.predictAll(X_newUser).map(lambda r: ((r[0], r[1]), r[2]))
print predictions.take(10)

[]


Here we are unable to use the old model (**best_model**) directly, since it has no factors for the user with userID=0, so the resulting predictions RDD is empty. One way to solve this problem is to resort to a **similarity function**: find the other users who are close to userID=0, and recommend to the new user the movies that those users rated highly but the new user has not watched yet.
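One simple way to realize the similarity idea is user-based neighborhood CF with cosine similarity, sketched below in plain Python. This is a swapped-in technique for illustration, not part of MLlib and not what the rest of this note does: the helper names are hypothetical, the new user's ratings reuse three of the movies rated above, and the other users are made up:

```python
import math


def cosine_similarity(a, b):
    """Cosine similarity between two users' rating dicts {movieID: rating}, over co-rated movies."""
    common = set(a) & set(b)
    if not common:
        return 0.0
    num = sum(a[m] * b[m] for m in common)
    den = math.sqrt(sum(a[m] ** 2 for m in common)) * math.sqrt(sum(b[m] ** 2 for m in common))
    return num / den


def recommend(new_user, others, k=2, n=10):
    """Rank movies the new user has not rated by the similarity-weighted ratings of the k nearest users."""
    sims = {uid: cosine_similarity(new_user, r) for uid, r in others.items()}
    neighbors = sorted(sims, key=sims.get, reverse=True)[:k]
    scores, weights = {}, {}
    for uid in neighbors:
        for movie, r in others[uid].items():
            if movie not in new_user and sims[uid] > 0:
                scores[movie] = scores.get(movie, 0.0) + sims[uid] * r
                weights[movie] = weights.get(movie, 0.0) + sims[uid]
    predicted = {movie: scores[movie] / weights[movie] for movie in scores}
    return sorted(predicted, key=predicted.get, reverse=True)[:n]


new_user = {260: 4.0, 1: 3.0, 858: 5.0}
others = {
    1: {260: 4.0, 1: 3.0, 858: 5.0, 2858: 5.0},  # hypothetical users
    2: {260: 1.0, 1: 5.0, 858: 1.0, 480: 5.0},
    3: {260: 5.0, 858: 4.0, 318: 4.0},
}
print(recommend(new_user, others))  # users 1 and 3 are the nearest neighbors
```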

## **Recommend movies to the new user**

Our goal is to recommend movies that userID=0 has not watched (rated) yet but is likely to enjoy.

### *Prepare the unrated movie list*

In [418]:
print parsedMovies.take(3), parsedMovies.count()

[(1, u'Toy Story (1995)'), (2, u'Jumanji (1995)'), (3, u'Grumpier Old Men (1995)')] 3883


We first collect the IDs of the movies rated by the new user, and then **filter** the movie profile to keep only the movies the new user has not rated:

In [536]:
new_user_rated_Movie_ids = new_user_ratings_RDD.map(lambda x: x[1]).collect() # get rated movie IDs
print new_user_rated_Movie_ids

new_user_unrated_movies = parsedMovies.filter(lambda x: x[0] not in new_user_rated_Movie_ids).\
                           map(lambda x: (new_user_ID, x[0]))

[260, 1, 16, 25, 32, 335, 379, 296, 858, 50]


In [537]:
print new_user_unrated_movies.take(10), new_user_unrated_movies.count()

[(0, 2), (0, 3), (0, 4), (0, 5), (0, 6), (0, 7), (0, 8), (0, 9), (0, 10), (0, 11)] 3873
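The `not in` test above scans the Python list of rated IDs for every movie. A minimal plain-Python sketch with a hypothetical `unrated_pairs` helper shows the same filtering with a set, which gives constant-time membership tests; in the Spark version one could likewise wrap the IDs in set( ) before using them in the filter lambda:

```python
def unrated_pairs(user_id, movies, rated_ids):
    """(user_id, movieID) pairs for every movie in the profile that user_id has not rated."""
    rated = set(rated_ids)  # constant-time membership tests instead of scanning a list
    return [(user_id, movie_id) for movie_id, _title in movies if movie_id not in rated]


movies = [(1, "Toy Story (1995)"), (2, "Jumanji (1995)"), (3, "Grumpier Old Men (1995)")]
rated_ids = [260, 1, 16, 25, 32, 335, 379, 296, 858, 50]
print(unrated_pairs(0, movies, rated_ids))  # [(0, 2), (0, 3)]
```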


### *Use the trained model to predict the unrated movies*

Now we have the unrated movie list: the **new_user_unrated_movies** RDD contains the movies that the new user (userID=0) has not rated. There are 3873 unrated movies, and we would like to recommend those with high predicted scores. We use the newly trained model **new_user_model** to predict the rating scores for the unrated movies, and denote the predictions as the **new_user_recommendations** RDD.

In [538]:
# Use the input RDD, new_user_unrated_movies, with new_user_model.predictAll() to predict ratings for the unrated movies
new_user_recommendations = new_user_model.predictAll(new_user_unrated_movies)
print new_user_recommendations.take(6)[:]
print new_user_recommendations.count()

[Rating(user=0, product=1084, rating=3.150378823864287), Rating(user=0, product=3586, rating=2.587454495625936), Rating(user=0, product=3702, rating=2.7462810089207728), Rating(user=0, product=3007, rating=3.439501452462919), Rating(user=0, product=667, rating=0.8629082469663336), Rating(user=0, product=1053, rating=1.8922340026800455)]
3696


Next step is to incorporate **new_user_recommendations** RDD and the movie titles:

In [539]:
new_user_recommendations = new_user_recommendations.map(lambda x: (x.product, x.rating)).join(parsedMovies)

In [540]:
print new_user_recommendations.take(4)

[(2049, (1.847318538896483, u'Happiest Millionaire, The (1967)')), (3, (1.924401278509853, u'Grumpier Old Men (1995)')), (2052, (1.0800896001797606, u'Hocus Pocus (1993)')), (6, (3.147812047246716, u'Heat (1995)'))]


To make the output more concise, we flatten each record and add the number of times the movie has been rated before:

In [541]:
new_user_recommendations = new_user_recommendations.map(lambda x: (x[0], x[1][1], x[1][0], movie_ID_counts[x[0]]))
print new_user_recommendations.take(4)

[(2049, u'Happiest Millionaire, The (1967)', 1.847318538896483, 37), (3, u'Grumpier Old Men (1995)', 1.924401278509853, 478), (2052, u'Hocus Pocus (1993)', 1.0800896001797606, 183), (6, u'Heat (1995)', 3.147812047246716, 940)]


Now each element of the **new_user_recommendations** RDD is (**movie ID, title, predicted rating, count** (from the rating history)). There are a few strategies to rank the movies:

### *Recommend movies by naive rating scores*

In Spark, we can use the [sortBy( )](https://spark.apache.org/docs/1.1.1/api/python/pyspark.rdd.RDD-class.html#sortBy) method to sort the records by predicted rating; the key -x[2] sorts them in descending order:

In [563]:
sortedRating_new_user_recommd = new_user_recommendations.sortBy(lambda x: -x[2])

In [474]:
print sortedRating_new_user_recommd.count()

3696


Note that the recommended movie list contains 3696 = 3706 - 10 movies: the model can only predict ratings for the 3706 distinct movies that appear in the rating data (the movie profile **parsedMovies** RDD lists 3883 movies, so 177 movies have never been rated), minus the 10 movies the new user has already rated. The following are the 10 movies with the highest predicted ratings:

In [562]:
for id, name, rating, count in sortedRating_new_user_recommd.take(10)[:]:
    print id, name, rating, count

2309 Inheritors, The (Die Siebtelbauern) (1998) 4.47033313001 2
572 Foreign Student (1994) 4.12571469048 2
2760 Gambler, The (A Játékos) (1997) 4.1165612064 7
1851 Leather Jacket Love Story (1997) 4.10416077163 2
2198 Modulations (1998) 4.09171776958 2
3236 Zachariah (1971) 4.02226761275 2
3522 Sacco and Vanzetti (Sacco e Vanzetti) (1971) 4.00766146452 3
2858 American Beauty (1999) 3.93179378927 3428
2999 Man of the Century (1999) 3.89898425827 4
318 Shawshank Redemption, The (1994) 3.85251953868 2227
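Sorting the entire RDD with sortBy( ) just to look at the first 10 records does more work than needed; PySpark's RDD also has a takeOrdered( ) method, e.g. `new_user_recommendations.takeOrdered(10, key=lambda x: -x[2])`, which should return the same top records directly (not run in this note). The underlying top-k idea, sketched in plain Python with heapq.nlargest on four records copied from the new_user_recommendations output above:

```python
import heapq

# (movieID, title, predicted rating, count) records copied from the output above.
recs = [(2049, "Happiest Millionaire, The (1967)", 1.847318538896483, 37),
        (3, "Grumpier Old Men (1995)", 1.924401278509853, 478),
        (2052, "Hocus Pocus (1993)", 1.0800896001797606, 183),
        (6, "Heat (1995)", 3.147812047246716, 940)]

# Keeps only the k best records while scanning, instead of sorting everything.
top2 = heapq.nlargest(2, recs, key=lambda rec: rec[2])
print([rec[0] for rec in top2])  # [6, 3]
```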


The predicted rating scores' range is 

In [573]:
print sortedRating_new_user_recommd.map(lambda x: x[2]).min()
print sortedRating_new_user_recommd.map(lambda x: x[2]).max()

-0.607967817091
4.47033313001


### *Spark SQL*

We can present the output more neatly by converting the **sortedRating_new_user_recommd** RDD to a DataFrame and querying it with [Spark SQL](http://spark.apache.org/docs/latest/sql-programming-guide.html#inferring-the-schema-using-reflection):

In [572]:
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
schema = sqlContext.createDataFrame(sortedRating_new_user_recommd)
schema.registerTempTable("naive_rating")
pd = sqlContext.sql("SELECT * from naive_rating limit 10")
pd.show()

_1   _2                   _3                 _4  
2309 Inheritors, The (... 4.4703331300068045 2   
572  Foreign Student (... 4.125714690476534  2   
2760 Gambler, The (A J... 4.116561206401522  7   
1851 Leather Jacket Lo... 4.104160771626819  2   
2198 Modulations (1998)   4.091717769583291  2   
3236 Zachariah (1971)     4.02226761274766   2   
3522 Sacco and Vanzett... 4.007661464515667  3   
2858 American Beauty (... 3.931793789266366  3428
2999 Man of the Centur... 3.898984258265619  4   
318  Shawshank Redempt... 3.852519538676051  2227


In [583]:
print type(pd)

<class 'pyspark.sql.dataframe.DataFrame'>


With the naive ranking, the top-1 movie (movieID=2309) has been rated only twice. A high predicted rating backed by so few ratings does not mean that the movie is good and worth recommending, so this naive ranking is not reliable. Note that the result of the Spark SQL query is a DataFrame (the **pyspark.sql.dataframe.DataFrame** class).

In the following, we consider other evaluation strategies.

### *Recommend movies by the number of times being rated*

Here we rank the movies by the number of times they have been rated:

In [466]:
sortedCount_new_user_recommd = new_user_recommendations.sortBy(lambda x: -x[3])

In [580]:
schema = sqlContext.createDataFrame(sortedCount_new_user_recommd)
schema.registerTempTable("counts")
pd = sqlContext.sql("SELECT * from counts limit 10")
pd.show() 

_1   _2                   _3                 _4  
2858 American Beauty (... 3.931793789266366  3428
1196 Star Wars: Episod... 3.338165554219379  2990
1210 Star Wars: Episod... 3.0646767124418197 2883
480  Jurassic Park (1993) 2.551643263248053  2672
2028 Saving Private Ry... 3.5307875320119755 2653
589  Terminator 2: Jud... 2.9358652867066897 2649
2571 Matrix, The (1999)   3.271048433099926  2590
1270 Back to the Futur... 3.001712144691443  2583
593  Silence of the La... 3.6216257757183246 2578
1580 Men in Black (1997)  2.4587206291779373 2538


Here the ranking is determined by the last column, the number of times each movie has been rated.

### *Recommend movies by the rating scores but with a rating-count threshold*

A fairer approach is to rank the movies by their predicted rating scores, considering only movies that have been rated enough times (here at least 25):

In [578]:
sortedRating_new_user_recommd_2 = new_user_recommendations.filter(lambda x: x[3]>=25).sortBy(lambda x: -x[2])

In [581]:
schema = sqlContext.createDataFrame(sortedRating_new_user_recommd_2)
schema.registerTempTable("rating")
pd = sqlContext.sql("SELECT * from rating limit 10")
pd.show()     

_1   _2                   _3                 _4  
2858 American Beauty (... 3.931793789266366  3428
318  Shawshank Redempt... 3.852519538676051  2227
1193 One Flew Over the... 3.7951673607346383 1725
2324 Life Is Beautiful... 3.7074607470296477 1152
1213 GoodFellas (1990)    3.6947402082308813 1657
527  Schindler's List ... 3.690192661141925  2304
2905 Sanjuro (1962)       3.6849196517424017 69  
2494 Last Days, The (1... 3.6426480975017337 27  
1221 Godfather: Part I... 3.635764413703111  1692
593  Silence of the La... 3.6216257757183246 2578


With this ranking, one can recommend **movieID=2858, 'American Beauty..'** first, and then **movieID=318, 'Shawshank Red..'** to the new user: the model predicts that the new user will rate these movies more highly than the others. We can also see that the top-5 recommended movies have each been rated more than 1,000 times (i.e., they are frequently watched).
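The count threshold amounts to a filter followed by a sort. A plain-Python sketch of that step with a hypothetical `rank_with_threshold` helper, applied (for brevity) only to the ten naive top-rated records printed earlier rather than to the whole RDD; on this subset it keeps just the first two movies of the thresholded table:

```python
# The ten highest naive-rated records printed above: (movieID, title, predicted rating, count).
naive_top10 = [
    (2309, "Inheritors, The (Die Siebtelbauern) (1998)", 4.47033313001, 2),
    (572, "Foreign Student (1994)", 4.12571469048, 2),
    (2760, "Gambler, The (A J\u00e1t\u00e9kos) (1997)", 4.1165612064, 7),
    (1851, "Leather Jacket Love Story (1997)", 4.10416077163, 2),
    (2198, "Modulations (1998)", 4.09171776958, 2),
    (3236, "Zachariah (1971)", 4.02226761275, 2),
    (3522, "Sacco and Vanzetti (Sacco e Vanzetti) (1971)", 4.00766146452, 3),
    (2858, "American Beauty (1999)", 3.93179378927, 3428),
    (2999, "Man of the Century (1999)", 3.89898425827, 4),
    (318, "Shawshank Redemption, The (1994)", 3.85251953868, 2227),
]


def rank_with_threshold(recs, min_count=25, n=10):
    """Keep movies rated at least min_count times, then rank them by predicted rating (highest first)."""
    eligible = [rec for rec in recs if rec[3] >= min_count]
    return sorted(eligible, key=lambda rec: -rec[2])[:n]


print([rec[0] for rec in rank_with_threshold(naive_top10)])  # [2858, 318]
```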

## **Persisting (saving) the model**

If training a model takes a long time, we can persist the trained model to storage and load it later instead of retraining. After saving, the model directory contains the user and product factor data stored as **Parquet** files.

In [618]:
from pyspark.mllib.recommendation import MatrixFactorizationModel
import os

model_path = 'data/models_1/movie_lens_als'
# Save and load model
new_user_model.save(sc, model_path)

In [619]:
same_model = MatrixFactorizationModel.load(sc, model_path)

After saving the model and loading it back, we can use the loaded model just as we used **new_user_model**; here is its RMSE on the new user's ratings:

In [621]:
predictions = same_model.predictAll(X_newUser).map(lambda r: ((r[0], r[1]), r[2]))
rates_and_preds = new_user_ratings_RDD.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions)
error = math.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())
print 'the RMSE for the best model is %s' %  error

the RMSE for the best model is 0.73267943946
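Conceptually, a saved matrix factorization model is just its two factor tables, so reloading them reproduces exactly the same predictions. A minimal plain-Python illustration of that idea, using JSON in a temporary directory instead of MLlib's Parquet files; the helpers and factor values are hypothetical:

```python
import json
import os
import tempfile


def save_factors(path, user_factors, product_factors):
    """Write the user and product factor tables to one JSON file (JSON object keys must be strings)."""
    with open(path, "w") as f:
        json.dump({"user": {str(k): v for k, v in user_factors.items()},
                   "product": {str(k): v for k, v in product_factors.items()}}, f)


def load_factors(path):
    """Read the factor tables back, restoring the integer IDs."""
    with open(path) as f:
        data = json.load(f)
    return ({int(k): v for k, v in data["user"].items()},
            {int(k): v for k, v in data["product"].items()})


def predict(user_factors, product_factors, user, product):
    """Predicted rating w_i^T x_j."""
    return sum(a * b for a, b in zip(user_factors[user], product_factors[product]))


W = {0: [0.5, 1.2]}                      # hypothetical user factors
X = {2858: [2.0, 1.5], 318: [1.0, 2.0]}  # hypothetical product factors
path = os.path.join(tempfile.mkdtemp(), "factors.json")
save_factors(path, W, X)
W_loaded, X_loaded = load_factors(path)
print(predict(W, X, 0, 2858), predict(W_loaded, X_loaded, 0, 2858))  # identical values
```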


## **Data size dependence**

### *100k rating records*

Here we show that if the dataset is not sufficiently large, the predictions are less accurate.

In [543]:
ratings_100k = sc.textFile('data/ratings.csv',2).map(lambda x: x.replace('\t', ',')).map(lambda x: x.split(',')).\
             map(lambda x: Rating(int(x[0]), int(x[1]), float(x[2]))).cache()
print ratings_100k.count()

100000


In [553]:
trainData, valData, testData = ratings_100k.randomSplit(weights, seed)

In [554]:
t0 = time()
model_100k = ALS.train(trainData, best_rank, seed=seed, 
                              iterations=6, lambda_=regularization_parameter)
t_100k = time() - t0

print "New model trained in %s seconds" % round(t_100k,3)

New model trained in 1.599 seconds


In [558]:
predictions = model_100k.predictAll(valData.map(lambda x: (x[0], x[1]))).map(lambda r: ((r[0], r[1]), r[2]))
rates_and_preds = valData.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions)
error = math.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())
print 'the RMSE for the best model is %s' %  error

the RMSE for the best model is 0.955833042749


In [559]:
predictions = model_100k.predictAll(testData.map(lambda x: (x[0], x[1]))).map(lambda r: ((r[0], r[1]), r[2]))
rates_and_preds = testData.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions)
error = math.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())
print 'the RMSE for the best model is %s' %  error

the RMSE for the best model is 0.94899654139


Compared with the 1M rating dataset (test RMSE ≈ 0.883), the model trained on the 100k dataset shows larger errors (validation RMSE ≈ 0.956, test RMSE ≈ 0.949).