
# **Predicting Movie Ratings using Collaborative Filtering**
#### According to Wikipedia, collaborative filtering is a method of making automatic predictions (filtering) about the interests of a user by collecting preferences or taste information from many users (collaborating). The underlying assumption of the collaborative filtering approach is that if a person A has the same opinion as a person B on an issue, A is more likely to have B's opinion on a different issue x than to have the opinion on x of a person chosen randomly. We will use a subset of about 500,000 ratings from the [MovieLens 10M stable benchmark rating dataset](http://grouplens.org/datasets/movielens/).

In [2]:
import sys
import os
from test_helper import Test

baseDir = os.path.join('data')
inputPath = os.path.join('cs100', 'lab4', 'small')

ratingsFilename = os.path.join(baseDir, inputPath, 'ratings.dat.gz')
moviesFilename = os.path.join(baseDir, inputPath, 'movies.dat')

### **Part 0: Parsing the data**
#### We read in each of the files and create an RDD consisting of parsed lines.
#### Each line in the ratings dataset (`ratings.dat.gz`) is formatted as:
####   `UserID::MovieID::Rating::Timestamp`
#### Each line in the movies (`movies.dat`) dataset is formatted as:
####   `MovieID::Title::Genres`
#### The `Genres` field has the format
####   `Genres1|Genres2|Genres3|...`

#### Parsing the two files yields two RDDs:
* #### For each line in the ratings dataset, we create a tuple of (UserID, MovieID, Rating).  
* #### For each line in the movies dataset, we create a tuple of (MovieID, Title).  
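
#### As a quick illustration of the two line formats, here is a minimal plain-Python sketch that needs no Spark. The sample lines and the helper names `parse_rating` and `parse_movie_with_genres` are assumptions made for illustration. Unlike the lab's own parsers, the movie helper also splits the `Genres` field on `|`.

```python
def parse_rating(line):
    # UserID::MovieID::Rating::Timestamp -> (UserID, MovieID, Rating)
    user_id, movie_id, rating, _timestamp = line.split('::')
    return int(user_id), int(movie_id), float(rating)


def parse_movie_with_genres(line):
    # MovieID::Title::Genres -> (MovieID, Title, [Genre1, Genre2, ...])
    movie_id, title, genres = line.split('::')
    return int(movie_id), title, genres.split('|')


# Illustrative sample lines (assumed, not read from the data files)
sample_rating = '1::1193::5::978300760'
sample_movie = "1::Toy Story (1995)::Animation|Children's|Comedy"

parsed_rating = parse_rating(sample_rating)
parsed_movie = parse_movie_with_genres(sample_movie)
print(parsed_rating)
print(parsed_movie)
```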

In [3]:
numPartitions = 2
rawRatings = sc.textFile(ratingsFilename).repartition(numPartitions)
rawMovies = sc.textFile(moviesFilename)

def get_ratings_tuple(entry):
    """ Parse a line in the ratings dataset
    Args:
        entry (str): a line in the ratings dataset in the form of UserID::MovieID::Rating::Timestamp
    Returns:
        tuple: (UserID, MovieID, Rating)
    """
    items = entry.split('::')
    return int(items[0]), int(items[1]), float(items[2])


def get_movie_tuple(entry):
    """ Parse a line in the movies dataset
    Args:
        entry (str): a line in the movies dataset in the form of MovieID::Title::Genres
    Returns:
        tuple: (MovieID, Title)
    """
    items = entry.split('::')
    return int(items[0]), items[1]


ratingsRDD = rawRatings.map(get_ratings_tuple).cache()
moviesRDD = rawMovies.map(get_movie_tuple).cache()

ratingsCount = ratingsRDD.count()
moviesCount = moviesRDD.count()

print 'There are %s ratings and %s movies in the datasets' % (ratingsCount, moviesCount)
print 'Ratings: %s' % ratingsRDD.take(3)
print 'Movies: %s' % moviesRDD.take(3)

assert ratingsCount == 487650
assert moviesCount == 3883
assert moviesRDD.filter(lambda (id, title): title == 'Toy Story (1995)').count() == 1
assert (ratingsRDD.takeOrdered(1, key=lambda (user, movie, rating): movie)
        == [(1, 1, 5.0)])

There are 487650 ratings and 3883 movies in the datasets
Ratings: [(1, 1193, 5.0), (1, 914, 3.0), (1, 2355, 5.0)]
Movies: [(1, u'Toy Story (1995)'), (2, u'Jumanji (1995)'), (3, u'Grumpier Old Men (1995)')]


#### We sort the RDD by *both the key and value*, which we can do by combining the key and value into a single string and then sorting on that string. 

In [1]:
def sortFunction(tuple):
    """ Construct the sort string (does not perform actual sorting)
    Args:
        tuple: (rating, MovieName)
    Returns:
        sortString: the value to sort with, 'rating MovieName'
    """
    key = unicode('%.3f' % tuple[0])
    value = tuple[1]
    return (key + ' ' + value)
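
#### To see what the combined sort string does, here is a small plain-Python sketch on made-up (rating, name) pairs. The name `sort_key` and the sample data are assumptions for illustration. Comparing the strings orders the ratings correctly here only because every rating formats to the same width (one digit before the decimal point). Ties on the rating are then broken by the movie name.

```python
def sort_key(pair):
    # Same idea as sortFunction: 'rating name', with the rating fixed to 3 decimals
    rating, name = pair
    return '%.3f %s' % (rating, name)


pairs = [(4.0, 'B'), (4.5, 'A'), (4.0, 'A')]

# Descending order: highest rating first, ties broken by name (also descending)
ordered = sorted(pairs, key=sort_key, reverse=True)
print(ordered)
```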


### **Part 1: Basic Recommendations**
#### One way to recommend movies is to always recommend the movies with the highest average rating. In this part, we will find the name, number of ratings, and average rating of the 20 movies with the highest average rating and more than 500 reviews. We want to filter out movies that have high ratings but 500 or fewer reviews, because movies with few reviews may not have broad appeal to everyone.
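
#### The whole of Part 1 can be sketched in plain Python on a toy dataset. This is only an illustration of the logic, not the Spark pipeline used in the lab: the ratings, titles, and the threshold `MIN_REVIEWS = 1` are made up (the lab uses 500).

```python
from collections import defaultdict

# Toy (UserID, MovieID, Rating) tuples and titles, assumed for illustration
ratings = [(1, 10, 5.0), (2, 10, 4.0), (3, 10, 3.0),
           (1, 20, 5.0),
           (2, 30, 2.0), (3, 30, 4.0)]
titles = {10: 'Movie A', 20: 'Movie B', 30: 'Movie C'}
MIN_REVIEWS = 1  # the lab uses 500

# Group the ratings by movie ID
ratings_by_movie = defaultdict(list)
for _user, movie_id, rating in ratings:
    ratings_by_movie[movie_id].append(rating)

# Build (average rating, title, number of ratings), keeping only movies
# with more than MIN_REVIEWS ratings
summary = [(sum(rs) / len(rs), titles[movie_id], len(rs))
           for movie_id, rs in ratings_by_movie.items()
           if len(rs) > MIN_REVIEWS]

# Highest average first
top_movies = sorted(summary, reverse=True)
print(top_movies)
```

#### Movie B has the highest average (5.0) but only one rating, so the threshold removes it from the result.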

#### **(1a) Number of Ratings and Average Ratings for a Movie**
#### The following function `getCountsAndAverages()` takes a single tuple of (MovieID, (Rating1, Rating2, Rating3, ...)) and returns a tuple of (MovieID, (number of ratings, averageRating)). 

In [7]:
 
import math
# First, implement a helper function `getCountsAndAverages` using only Python
def getCountsAndAverages(IDandRatingsTuple):
    """ Calculate average rating
    Args:
        IDandRatingsTuple: a single tuple of (MovieID, (Rating1, Rating2, Rating3, ...))
    Returns:
        tuple: a tuple of (MovieID, (number of ratings, averageRating))
    """
    movieID, ratings = IDandRatingsTuple
    numRatings = len(ratings)
    averageRating = float(sum(ratings)) / numRatings
    return (movieID, (numRatings, averageRating))

print getCountsAndAverages((1, (1, 2, 3, 4)))

(1, (4, 2.5))


#### **(1b) Movies with Highest Average Ratings**
#### We will use the `getCountsAndAverages()` helper function with Spark to determine movies with highest average ratings.


In [9]:
 
# From ratingsRDD with tuples of (UserID, MovieID, Rating) create an RDD with tuples of
# the (MovieID, iterable of Ratings for that MovieID)
movieIDsWithRatingsRDD = (ratingsRDD
                          .map(lambda x: (x[1],x[2]))
                          .groupByKey())
print 'movieIDsWithRatingsRDD: %s\n' % movieIDsWithRatingsRDD.take(3)

# Using `movieIDsWithRatingsRDD`, compute the number of ratings and average rating for each movie to
# yield tuples of the form (MovieID, (number of ratings, average rating))
movieIDsWithAvgRatingsRDD = movieIDsWithRatingsRDD.map(lambda x: getCountsAndAverages(x))
print 'movieIDsWithAvgRatingsRDD: %s\n' % movieIDsWithAvgRatingsRDD.take(3)

# To `movieIDsWithAvgRatingsRDD`, apply RDD transformations that use `moviesRDD` to get the movie
# names for `movieIDsWithAvgRatingsRDD`, yielding tuples of the form
# (average rating, movie name, number of ratings)
movieNameWithAvgRatingsRDD = (moviesRDD
                              .join(movieIDsWithAvgRatingsRDD)
                              .map(lambda x: (x[1]))
                              .map(lambda x: (x[1][1],x[0],x[1][0]))
                              )
print 'movieNameWithAvgRatingsRDD: %s\n' % movieNameWithAvgRatingsRDD.take(3)

movieIDsWithRatingsRDD: [(2, <pyspark.resultiterable.ResultIterable object at 0xb1f9776c>), (4, <pyspark.resultiterable.ResultIterable object at 0xb1f97c2c>), (6, <pyspark.resultiterable.ResultIterable object at 0xb1f97c4c>)]

movieIDsWithAvgRatingsRDD: [(2, (332, 3.174698795180723)), (4, (71, 2.676056338028169)), (6, (442, 3.7918552036199094))]

movieNameWithAvgRatingsRDD: [(3.6818181818181817, u'Happiest Millionaire, The (1967)', 22), (3.0468227424749164, u'Grumpier Old Men (1995)', 299), (2.882978723404255, u'Hocus Pocus (1993)', 94)]



#### **(1c) Movies with Highest Average Ratings and more than 500 reviews**
#### The 20 movies with highest average ratings and more than 500 reviews.

In [12]:
# Apply an RDD transformation to `movieNameWithAvgRatingsRDD` to limit the results to movies with
# ratings from more than 500 people. We then use the `sortFunction()` helper function to sort by the
# average rating to get the movies in order of their rating (highest rating first)
movieLimitedAndSortedByRatingRDD = (movieNameWithAvgRatingsRDD
                                    .filter(lambda x: x[2]>500)
                                    .sortBy(sortFunction, False))
print 'Movies with highest ratings: %s' % movieLimitedAndSortedByRatingRDD.take(20)

Movies with highest ratings: [(4.5349264705882355, u'Shawshank Redemption, The (1994)', 1088), (4.515798462852263, u"Schindler's List (1993)", 1171), (4.512893982808023, u'Godfather, The (1972)', 1047), (4.510460251046025, u'Raiders of the Lost Ark (1981)', 1195), (4.505415162454874, u'Usual Suspects, The (1995)', 831), (4.457256461232604, u'Rear Window (1954)', 503), (4.45468509984639, u'Dr. Strangelove or: How I Learned to Stop Worrying and Love the Bomb (1963)', 651), (4.43953006219765, u'Star Wars: Episode IV - A New Hope (1977)', 1447), (4.4, u'Sixth Sense, The (1999)', 1110), (4.394285714285714, u'North by Northwest (1959)', 700), (4.379506641366224, u'Citizen Kane (1941)', 527), (4.375, u'Casablanca (1942)', 776), (4.363975155279503, u'Godfather: Part II, The (1974)', 805), (4.358816276202219, u"One Flew Over the Cuckoo's Nest (1975)", 811), (4.358173076923077, u'Silence of the Lambs, The (1991)', 1248), (4.335826477187734, u'Saving Private Ryan (1998)', 1337), (4.32624113475177

## **Part 2: Collaborative Filtering**
#### Collaborative filtering is a method of making automatic predictions (filtering) about the interests of a user by collecting preferences or taste information from many users (collaborating). The underlying assumption of the collaborative filtering approach is that if a person A has the same opinion as a person B on an issue, A is more likely to have B's opinion on a different issue x than to have the opinion on x of a person chosen randomly.  
#### The image below (from [Wikipedia][collab]) shows an example of predicting a user's rating using collaborative filtering. First, people rate different items (such as videos, images, and games). The system then predicts a user's rating for an item that the user has not yet rated. These predictions are built on the existing ratings of other users whose ratings are similar to those of the active user. For instance, in the image below the system predicts that the active user will not like the video.
![collaborative filtering](https://courses.edx.org/c4x/BerkeleyX/CS100.1x/asset/Collaborative_filtering.gif)
[mllib]: https://spark.apache.org/mllib/
[collab]: https://en.wikipedia.org/?title=Collaborative_filtering
[collab2]: http://recommender-systems.org/collaborative-filtering/

#### For movie recommendations, we start with a matrix whose entries are movie ratings by users (shown in red in the diagram below).  Each column represents a user (shown in green) and each row represents a particular movie (shown in blue).
#### Since not all users have rated all movies, we do not know all of the entries in this matrix, which is precisely why we need collaborative filtering.  For each user, we have ratings for only a subset of the movies.  With collaborative filtering, the idea is to approximate the ratings matrix by factorizing it as the product of two matrices: one that describes properties of each user (shown in green), and one that describes properties of each movie (shown in blue).
![factorization](http://spark-mooc.github.io/web-assets/images/matrix_factorization.png)
#### We want to select these two matrices such that the error for the user/movie pairs where we know the correct ratings is minimized.  The Alternating Least Squares (ALS) algorithm does this by first randomly filling the users matrix with values and then optimizing the values of the movies matrix such that the error is minimized.  Then, it holds the movies matrix constant and optimizes the values of the users matrix.  This alternation between which matrix to optimize is the reason for the "alternating" in the name.
#### This optimization is what's being shown on the right in the image above.  Given a fixed set of user factors (i.e., values in the users matrix), we use the known ratings to find the best values for the movie factors using the optimization written at the bottom of the figure.  Then we "alternate" and pick the best user factors given fixed movie factors.
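
#### The alternation can be sketched in a few lines of plain Python. This is a deliberately simplified illustration, not MLlib's implementation: it uses rank 1 (one factor per user and per movie), starts from fixed values instead of random ones so the run is repeatable, and the function name `als_rank1` and the toy ratings are assumptions. With one factor, each least-squares step has the closed form `factor = sum(other * rating) / (reg + sum(other ** 2))` over the known ratings.

```python
def als_rank1(ratings, num_users, num_movies, iterations=50, reg=0.0):
    """Rank-1 ALS sketch on a dict {(user, movie): rating} of known ratings."""
    # Fixed starting values (the text describes a random start)
    users = [1.0] * num_users
    movies = [1.0] * num_movies
    for _ in range(iterations):
        # Hold the user factors constant and solve for each movie factor
        for j in range(num_movies):
            known = [(users[u], r) for (u, m), r in ratings.items() if m == j]
            denom = reg + sum(f * f for f, _r in known)
            if denom > 0:
                movies[j] = sum(f * r for f, r in known) / denom
        # Hold the movie factors constant and solve for each user factor
        for i in range(num_users):
            known = [(movies[m], r) for (u, m), r in ratings.items() if u == i]
            denom = reg + sum(f * f for f, _r in known)
            if denom > 0:
                users[i] = sum(f * r for f, r in known) / denom
    return users, movies


# Toy ratings from the rank-1 matrix [[1, 2, 3], [2, 4, 6]];
# the entry for user 1, movie 2 (true value 6) is left unknown
known_ratings = {(0, 0): 1.0, (0, 1): 2.0, (0, 2): 3.0,
                 (1, 0): 2.0, (1, 1): 4.0}

user_factors, movie_factors = als_rank1(known_ratings, num_users=2, num_movies=3)
predicted_missing = user_factors[1] * movie_factors[2]
print(predicted_missing)
```

#### Only the known ratings enter each update, which is how the factorization can fill in entries that were never observed.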

#### **(2a) Creating a Training Set**
 

In [14]:
trainingRDD, validationRDD, testRDD = ratingsRDD.randomSplit([6, 2, 2], seed=0L)

print 'Training: %s, validation: %s, test: %s\n' % (trainingRDD.count(),
                                                    validationRDD.count(),
                                                    testRDD.count())
print trainingRDD.take(3)
print validationRDD.take(3)
print testRDD.take(3)

assert trainingRDD.count() == 292716
assert validationRDD.count() == 96902
assert testRDD.count() == 98032

assert trainingRDD.filter(lambda t: t == (1, 914, 3.0)).count() == 1
assert trainingRDD.filter(lambda t: t == (1, 2355, 5.0)).count() == 1
assert trainingRDD.filter(lambda t: t == (1, 595, 5.0)).count() == 1

assert validationRDD.filter(lambda t: t == (1, 1287, 5.0)).count() == 1
assert validationRDD.filter(lambda t: t == (1, 594, 4.0)).count() == 1
assert validationRDD.filter(lambda t: t == (1, 1270, 5.0)).count() == 1

assert testRDD.filter(lambda t: t == (1, 1193, 5.0)).count() == 1
assert testRDD.filter(lambda t: t == (1, 2398, 4.0)).count() == 1
assert testRDD.filter(lambda t: t == (1, 1035, 5.0)).count() == 1

Training: 292716, validation: 96902, test: 98032

[(1, 914, 3.0), (1, 2355, 5.0), (1, 595, 5.0)]
[(1, 1287, 5.0), (1, 594, 4.0), (1, 1270, 5.0)]
[(1, 1193, 5.0), (1, 2398, 4.0), (1, 1035, 5.0)]


#### After splitting the dataset, your training set has about 293,000 entries, and the validation and test sets have about 97,000 and 98,000 entries, respectively.
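
#### The weights `[6, 2, 2]` do not sum to 1, so they are normalized to 0.6, 0.2, and 0.2. The split is random, so the sizes only approximate those fractions: 60% of 487,650 is 292,590, close to the 292,716 training entries observed. The plain-Python sketch below illustrates the idea of a seeded weighted split. The function name `random_split` is an assumption, and this is not Spark's algorithm, so it will not reproduce the exact counts above.

```python
import random


def random_split(items, weights, seed):
    """Assign each item to one split with probability proportional to its weight."""
    total = float(sum(weights))
    # Cumulative upper bounds of each split on [0, 1), e.g. 0.6, 0.8, 1.0
    bounds = []
    running = 0.0
    for weight in weights:
        running += weight / total
        bounds.append(running)

    rng = random.Random(seed)  # a fixed seed makes the split repeatable
    splits = [[] for _ in weights]
    for item in items:
        draw = rng.random()
        for index, bound in enumerate(bounds):
            if draw < bound:
                splits[index].append(item)
                break
        else:
            # Guard against floating-point rounding in the last bound
            splits[-1].append(item)
    return splits


train, validation, test = random_split(range(10000), [6, 2, 2], seed=0)
print('%d %d %d' % (len(train), len(validation), len(test)))
```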

#### **(2b) Root Mean Square Error (RMSE)**


#### Given two ratings RDDs, *x* and *y* of size *n*, we define RMSE as follows: $ RMSE = \sqrt{\frac{\sum_{i = 1}^{n} (x_i - y_i)^2}{n}}$
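
#### A small worked example of the formula in plain Python, assuming ratings are stored in dictionaries keyed by (UserID, MovieID). The function name `rmse` and the sample values are made up for illustration. As with the join in the Spark version, only keys present in both inputs contribute, so *n* is the number of matching pairs.

```python
import math


def rmse(predicted, actual):
    """RMSE over the (user, movie) keys that appear in both dictionaries."""
    shared = [key for key in predicted if key in actual]
    total_squared_error = sum((predicted[key] - actual[key]) ** 2 for key in shared)
    return math.sqrt(total_squared_error / float(len(shared)))


predicted = {(1, 1): 5.0, (1, 2): 3.0, (1, 3): 4.0, (2, 1): 2.0}
actual = {(1, 1): 3.0, (1, 2): 3.0, (1, 3): 5.0, (3, 3): 1.0}

# Three keys match, with squared errors 4, 0, and 1, so RMSE = sqrt(5 / 3)
error = rmse(predicted, actual)
print(error)
```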


In [2]:
 
import math

def computeError(predictedRDD, actualRDD):
    """ Compute the root mean squared error between predicted and actual
    Args:
        predictedRDD: predicted ratings for each movie and each user where each entry is in the form
                      (UserID, MovieID, Rating)
        actualRDD: actual ratings where each entry is in the form (UserID, MovieID, Rating)
    Returns:
        RMSE (float): computed RMSE value
    """
    # Transform predictedRDD into the tuples of the form ((UserID, MovieID), Rating)
    predictedReformattedRDD = predictedRDD.map(lambda x: ((x[0],x[1]),x[2]))
    print predictedReformattedRDD.top(3)
    # Transform actualRDD into the tuples of the form ((UserID, MovieID), Rating)
    actualReformattedRDD = actualRDD.map(lambda x: ((x[0],x[1]),x[2]))
    print actualReformattedRDD.top(3)
    # Compute the squared error for each matching entry (i.e., the same (User ID, Movie ID) in each
    # RDD) in the reformatted RDDs using RDD transformations - do not use collect()
    squaredErrorsRDD = (predictedReformattedRDD
                        .join(actualReformattedRDD)
                        .map(lambda x: x[1])
                        .map(lambda x: (x[1]-x[0])**2))
    print squaredErrorsRDD.top(3)
    # Compute the total squared error - do not use collect()
    totalError = squaredErrorsRDD.reduce(lambda x,y: x+y)
    print totalError
    # Count the number of entries for which you computed the total squared error
    numRatings = squaredErrorsRDD.count()
    print numRatings
    # Using the total squared error and the number of entries, compute the RMSE
    return  math.sqrt(totalError/float(numRatings))



#### **(2c) Using ALS.train()**
#### In this part, we will use the MLlib implementation of Alternating Least Squares, [ALS.train()](https://spark.apache.org/docs/latest/api/python/pyspark.mllib.html#pyspark.mllib.recommendation.ALS). ALS takes a training dataset (RDD) and several parameters that control the model creation process. To determine the best values for the parameters, we will use ALS to train several models, and then we will select the best model and use the parameters from that model in the rest of this lab exercise.
#### The process we will use for determining the best model is as follows:
* #### Pick a set of model parameters. The most important parameter to `ALS.train()` is the *rank*, which is the number of rows in the Users matrix (green in the diagram above) or the number of columns in the Movies matrix (blue in the diagram above). (In general, a lower rank will mean higher error on the training dataset, but a high rank may lead to [overfitting](https://en.wikipedia.org/wiki/Overfitting).)  We will train models with ranks of 4, 8, and 12 using the `trainingRDD` dataset.
* #### Create a model using `ALS.train(trainingRDD, rank, seed=seed, iterations=iterations, lambda_=regularizationParameter)`. The arguments are: an RDD of tuples of the form (UserID, MovieID, rating) used to train the model, an integer rank (4, 8, or 12), a random seed, a number of iterations to execute (we will use 5 for the `iterations` parameter), and a regularization coefficient (we will use 0.1 for the `regularizationParameter`).
* #### For the prediction step, create an input RDD, `validationForPredictRDD`, consisting of (UserID, MovieID) pairs that you extract from `validationRDD`. You will end up with an RDD of the form: `[(1, 1287), (1, 594), (1, 1270)]`
* #### Using the model and `validationForPredictRDD`, we can predict rating values by calling [model.predictAll()](https://spark.apache.org/docs/latest/api/python/pyspark.mllib.html#pyspark.mllib.recommendation.MatrixFactorizationModel.predictAll) with the `validationForPredictRDD` dataset, where `model` is the model we generated with ALS.train().  `predictAll` accepts an RDD with each entry in the format (userID, movieID) and outputs an RDD with each entry in the format (userID, movieID, rating).
* #### Evaluate the quality of the model by using the `computeError()` function in part (2b) to compute the error between the predicted ratings and the actual ratings in `validationRDD`.


In [17]:
 
from pyspark.mllib.recommendation import ALS

validationForPredictRDD = validationRDD.map(lambda x: (x[0],x[1]))

seed = 5L
iterations = 5
regularizationParameter = 0.1
ranks = [4, 8, 12]
errors = [0, 0, 0]
err = 0
tolerance = 0.03

minError = float('inf')
bestRank = -1
bestIteration = -1
for rank in ranks:
    model = ALS.train(trainingRDD, rank, seed=seed, iterations=iterations,
                      lambda_=regularizationParameter)
    predictedRatingsRDD = model.predictAll(validationForPredictRDD)
    error = computeError(predictedRatingsRDD, validationRDD)
    errors[err] = error
    err += 1
    print 'For rank %s the RMSE is %s' % (rank, error)
    if error < minError:
        minError = error
        bestRank = rank

print 'The best model was trained with rank %s' % bestRank

[((2999, 3799), 1.1362232320180086), ((2999, 3397), 3.30229790721614), ((2999, 3396), 3.544613411484317)]
[((2999, 3799), 1.0), ((2999, 3397), 4.0), ((2999, 3396), 4.0)]
[26.597665192219818, 22.457186439678672, 18.24543666717033]
77180.6903794
96842
For rank 4 the RMSE is 0.892734779484
[((2999, 3799), 1.0684802630578876), ((2999, 3397), 3.2465350442298857), ((2999, 3396), 3.4882563067864822)]
[((2999, 3799), 1.0), ((2999, 3397), 4.0), ((2999, 3396), 4.0)]
[16.733415005766243, 15.681245112060461, 15.124201521856964]
76729.4578332
96842
For rank 8 the RMSE is 0.890121292255
[((2999, 3799), 1.4082225731698261), ((2999, 3397), 3.3587292421282733), ((2999, 3396), 3.45221004873277)]
[((2999, 3799), 1.0), ((2999, 3397), 4.0), ((2999, 3396), 4.0)]
[17.371523035238063, 17.276989028017876, 16.396599835118717]
76745.8069392
96842
For rank 12 the RMSE is 0.890216118367
The best model was trained with rank 8
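
#### The selection step amounts to taking the rank with the smallest validation RMSE. The sketch below repeats it in plain Python using the three RMSE values printed above; the variable names are assumptions. It also shows how small the spread is, so the choice of rank matters little for this dataset and these settings.

```python
# Validation RMSE per rank, copied from the output above
validation_rmse = {4: 0.892734779484, 8: 0.890121292255, 12: 0.890216118367}

# The best rank is the key with the smallest RMSE
best_rank = min(validation_rmse, key=validation_rmse.get)

# Difference between the worst and best RMSE across the ranks tried
spread = max(validation_rmse.values()) - min(validation_rmse.values())
print('best rank: %s, spread: %.6f' % (best_rank, spread))
```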


#### **(2d) Testing The Model**
#### So far, we used the `trainingRDD` and `validationRDD` datasets to select the best model. To decide how good our model is, we need to use the `testRDD` dataset.  We will use the `bestRank` you determined in part (2c) to create a model for predicting the ratings for the test dataset and then we will compute the RMSE.
 

In [19]:
 
myModel = ALS.train(trainingRDD, bestRank, seed=seed, iterations=iterations,
                    lambda_=regularizationParameter)
testForPredictingRDD = testRDD.map(lambda x: (x[0],x[1]))

predictedTestRDD = myModel.predictAll(testForPredictingRDD)
   
testRMSE = computeError(testRDD, predictedTestRDD)

print 'The model had a RMSE on the test set of %s' % testRMSE

[((2999, 2720), 1.0), ((2999, 2450), 1.0), ((2999, 2429), 2.0)]
[((2999, 2720), 1.4927219427895995), ((2999, 2450), 1.5746678541203836), ((2999, 2429), 2.2985564300693095)]
[19.67187198212714, 16.590083688002007, 16.213685704043076]
77792.9394322
97980
The model had a RMSE on the test set of 0.891048561304


#### **(2e) Comparing The Model**
#### Comparing the RMSE of the model's predictions against the actual values in the test set is one way to evaluate the quality of our model. Another is to compare it with a simple baseline: the RMSE obtained on the test set when every predicted rating is the average rating of the training set.

In [27]:
 

trainingAvgRating = trainingRDD.map(lambda x: x[2]).mean()
print 'The average rating for movies in the training set is %s' % trainingAvgRating

testForAvgRDD = testRDD.map(lambda x: (x[0],x[1], trainingAvgRating))
print testRDD.top(3)
print testForAvgRDD.top(3)
testAvgRMSE = computeError(testRDD, testForAvgRDD)
print 'The RMSE on the average set is %s' % testAvgRMSE

The average rating for movies in the training set is 3.57409571052
[(2999, 2720, 1.0), (2999, 2450, 1.0), (2999, 2429, 2.0)]
[(2999, 2720, 3.5740957105180646), (2999, 2450, 3.5740957105180646), (2999, 2429, 3.5740957105180646)]
[((2999, 2720), 1.0), ((2999, 2450), 1.0), ((2999, 2429), 2.0)]
[((2999, 2720), 3.5740957105180646), ((2999, 2450), 3.5740957105180646), ((2999, 2429), 3.5740957105180646)]
[6.6259687269075, 6.6259687269075, 6.6259687269075]
123051.930024
98032
The RMSE on the average set is 1.12036693569
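
#### The figures printed above can be cross-checked with a few lines of arithmetic. The sketch below recomputes both RMSE values from the printed totals and counts, and expresses the model's gain over the constant-average baseline as a fraction. The variable names are assumptions. The model's count (97,980) is smaller than the baseline's (98,032), most likely because the model returns no prediction for test pairs whose user or movie does not appear in the training set.

```python
import math

# Total squared error and number of matched ratings, copied from the outputs above
model_total, model_count = 77792.9394322, 97980
baseline_total, baseline_count = 123051.930024, 98032

model_rmse = math.sqrt(model_total / model_count)
baseline_rmse = math.sqrt(baseline_total / baseline_count)

# Fraction by which the model reduces the RMSE relative to the baseline
improvement = 1.0 - model_rmse / baseline_rmse
print('model: %.4f, baseline: %.4f, improvement: %.1f%%'
      % (model_rmse, baseline_rmse, 100 * improvement))
```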


## **Part 3: Predictions for Yourself**
#### You can now predict what movies to recommend to a new user.  In order to do that, we first need to add his ratings to the `ratingsRDD` dataset.

In [29]:
print 'Most rated movies:'
print '(average rating, movie name, number of reviews)'
for ratingsTuple in movieLimitedAndSortedByRatingRDD.take(50):
    print ratingsTuple

Most rated movies:
(average rating, movie name, number of reviews)
(4.5349264705882355, u'Shawshank Redemption, The (1994)', 1088)
(4.515798462852263, u"Schindler's List (1993)", 1171)
(4.512893982808023, u'Godfather, The (1972)', 1047)
(4.510460251046025, u'Raiders of the Lost Ark (1981)', 1195)
(4.505415162454874, u'Usual Suspects, The (1995)', 831)
(4.457256461232604, u'Rear Window (1954)', 503)
(4.45468509984639, u'Dr. Strangelove or: How I Learned to Stop Worrying and Love the Bomb (1963)', 651)
(4.43953006219765, u'Star Wars: Episode IV - A New Hope (1977)', 1447)
(4.4, u'Sixth Sense, The (1999)', 1110)
(4.394285714285714, u'North by Northwest (1959)', 700)
(4.379506641366224, u'Citizen Kane (1941)', 527)
(4.375, u'Casablanca (1942)', 776)
(4.363975155279503, u'Godfather: Part II, The (1974)', 805)
(4.358816276202219, u"One Flew Over the Cuckoo's Nest (1975)", 811)
(4.358173076923077, u'Silence of the Lambs, The (1991)', 1248)
(4.335826477187734, u'Saving Private Ryan (1998)', 13

#### The user ID 0 is unassigned, so we will use it for his ratings. We set the variable `myUserID` to 0 for him. Next, we create a new RDD `myRatingsRDD` containing his ratings for at least 10 movies.

In [33]:
 
myUserID = 0

# Note that the listing above shows (average rating, movie name, number of reviews) and does not include movie IDs.
# Look up each movie ID in `moviesRDD`. A common error is to use the number of ratings as the movie ID.
myRatedMovies = [
    (myUserID, 260, 5),
    (myUserID, 3948, 3),
    (myUserID, 3876, 5),
    (myUserID, 3885, 2),
    (myUserID, 3879, 3),
    (myUserID, 1, 5),
    (myUserID, 2, 5),
    (myUserID, 3, 4),
    (myUserID, 4, 5),
    (myUserID, 5, 5),
     # The format of each line is (myUserID, movie ID, your rating)
     # For example, to give the movie "Star Wars: Episode IV - A New Hope (1977)" a five rating, you would add the following line:
     #   (myUserID, 260, 5),
    ]
myRatingsRDD = sc.parallelize(myRatedMovies)
print 'My movie ratings: %s' % myRatingsRDD.take(10)

My movie ratings: [(0, 260, 5), (0, 3948, 3), (0, 3876, 5), (0, 3885, 2), (0, 3879, 3), (0, 1, 5), (0, 2, 5), (0, 3, 4), (0, 4, 5), (0, 5, 5)]


#### **(3b) Add His Movies to Training Dataset**
#### Now that we have ratings for a new user, we need to add them to the training dataset (`trainingRDD`) so that the model will incorporate his preferences.

In [35]:
 
trainingWithMyRatingsRDD = trainingRDD.union(myRatingsRDD)

print ('The training dataset now has %s more entries than the original training dataset' %
       (trainingWithMyRatingsRDD.count() - trainingRDD.count()))
assert (trainingWithMyRatingsRDD.count() - trainingRDD.count()) == myRatingsRDD.count()

The training dataset now has 10 more entries than the original training dataset


#### **(3c) Train a Model with New Ratings**
#### Now, we train a model with new ratings added.

In [37]:
 
myRatingsModel = ALS.train(trainingWithMyRatingsRDD, bestRank, seed=seed, iterations=iterations,
                      lambda_=regularizationParameter )

#### **(3d) Check RMSE for the New Model with New Ratings**

In [38]:
predictedTestMyRatingsRDD = myRatingsModel.predictAll(testForPredictingRDD)
testRMSEMyRatings = computeError(testRDD, predictedTestMyRatingsRDD)
print 'The model had a RMSE on the test set of %s' % testRMSEMyRatings

[((2999, 2720), 1.0), ((2999, 2450), 1.0), ((2999, 2429), 2.0)]
[((2999, 2720), 1.453789410642706), ((2999, 2450), 1.5093341667743103), ((2999, 2429), 2.386441121159943)]
[16.107175301086112, 16.01345286023443, 15.941615789776838]
77952.6193731
97980
The model had a RMSE on the test set of 0.891962587976


#### **(3e) Predict His Ratings**
#### We use `predictAll()` to predict the ratings this user would give to the movies he has not yet rated.
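
#### The selection logic for this part can be sketched in plain Python on toy data: drop the movies the user has already rated, keep only movies with more than a minimum number of ratings, and take the highest predicted ratings. All IDs, titles, counts, and predicted values below are made up for illustration, and `heapq.nlargest` stands in for the top-N step.

```python
import heapq

# Toy inputs, assumed for illustration
my_rated_ids = set([1, 260])
titles = {1: 'Movie A', 2: 'Movie B', 3: 'Movie C', 4: 'Movie D', 260: 'Movie E'}
rating_counts = {1: 900, 2: 80, 3: 40, 4: 120, 260: 1400}
predicted = {1: 4.9, 2: 4.1, 3: 4.8, 4: 3.2, 260: 4.7}
MIN_RATINGS = 75

# Keep unrated movies with more than MIN_RATINGS ratings
candidates = [(predicted[movie_id], titles[movie_id])
              for movie_id in predicted
              if movie_id not in my_rated_ids
              and rating_counts.get(movie_id, 0) > MIN_RATINGS]

# Take the two highest predicted ratings
top_two = heapq.nlargest(2, candidates, key=lambda pair: pair[0])
print(top_two)
```

#### Movie C has a high predicted rating (4.8) but only 40 ratings, so the threshold removes it.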

In [39]:
print moviesRDD.top(3)

[(3952, u'Contender, The (2000)'), (3951, u'Two Family House (2000)'), (3950, u'Tigerland (2000)')]


In [41]:
 

# Use the Python list myRatedMovies to transform moviesRDD into an RDD of (myUserID, Movie ID) pairs
# that does not contain any movies that you have rated.
myRatedMovieIDs = set(movieID for (_, movieID, _) in myRatedMovies)
myUnratedMoviesRDD = (moviesRDD
                      .filter(lambda x: x[0] not in myRatedMovieIDs)
                      .map(lambda x: (myUserID, x[0])))

# Use the input RDD, myUnratedMoviesRDD, with myRatingsModel.predictAll() to predict your ratings for the movies
predictedRatingsRDD = myRatingsModel.predictAll(myUnratedMoviesRDD)

In [63]:
 
# Transform movieIDsWithAvgRatingsRDD from part (1b), which has the form (MovieID, (number of ratings, average rating)), into an RDD of the form (MovieID, number of ratings)
movieCountsRDD = movieIDsWithAvgRatingsRDD.map(lambda x: (x[0],x[1][0]))
print movieCountsRDD.top(1)
# Transform predictedRatingsRDD into an RDD with entries that are pairs of the form (Movie ID, Predicted Rating)
print predictedRatingsRDD.top(1)[0][2]
predictedRDD = predictedRatingsRDD.map(lambda x: (x[1],x[2]))
print predictedRDD.top(1)
# Use RDD transformations with predictedRDD and movieCountsRDD to yield an RDD with tuples of the form (Movie ID, (Predicted Rating, number of ratings))
predictedWithCountsRDD  = (predictedRDD
                           .join(movieCountsRDD))
print predictedWithCountsRDD.top(2)
# Use RDD transformations with predictedWithCountsRDD and moviesRDD to yield an RDD with tuples of the form (Predicted Rating, Movie Name), for movies with more than 75 ratings
ratingsWithNamesRDD = (predictedWithCountsRDD
                       .join(moviesRDD)
                       .filter(lambda x: x[1][0][1]>75)
                       .map(lambda x: (x[1][0][0], x[1][1] )))
print ratingsWithNamesRDD.top(2)
predictedHighestRatedMovies = ratingsWithNamesRDD.takeOrdered(20, key=lambda x: -x[0])
print ('My highest rated movies as predicted (for movies with more than 75 reviews):\n%s' %
        '\n'.join(map(str, predictedHighestRatedMovies)))

[(3952, 316)]
4.28224032239
[(3952, 4.282240322386408)]
[(3952, (4.282240322386408, 316)), (3951, (4.668850755533288, 35))]
[(5.41807559132474, u'Inherit the Wind (1960)'), (5.306571052974393, u"Schindler's List (1993)")]
My highest rated movies as predicted (for movies with more than 75 reviews):
(5.41807559132474, u'Inherit the Wind (1960)')
(5.306571052974393, u"Schindler's List (1993)")
(5.220141784039909, u'Chariots of Fire (1981)')
(5.216282204562345, u'Gone with the Wind (1939)')
(5.130799798108811, u'My Fair Lady (1964)')
(5.128664502096896, u'Mis\ufffdrables, Les (1995)')
(5.123707139367181, u'Green Mile, The (1999)')
(5.115580855427306, u'Shakespeare in Love (1998)')
(5.110952729867436, u'Gandhi (1982)')
(5.106296435804017, u'Miracle on 34th Street (1947)')
(5.104918432078982, u'Titanic (1997)')
(5.096117700197223, u'Mr. Smith Goes to Washington (1939)')
(5.093636792263315, u'October Sky (1999)')
(5.082998368860042, u'Philadelphia (1993)')
(5.072781109322637, u'Remember the T