In [1]:
import os
import sys
import findspark 

findspark.init()
# set spark_path to the location where you have extracted spark
spark_path = "D:\spark220hdp27"

os.environ["SPARK_HOME"] = spark_path
os.environ["HADOOP_HOME"] = spark_path

sys.path.append(spark_path + "/bin")
sys.path.append(spark_path + "/python")
sys.path.append(spark_path + "/python/pyspark/")
sys.path.append(spark_path + "/python/lib")
sys.path.append(spark_path + "/python/lib/pyspark.zip")
sys.path.append(spark_path + "/python/lib/py4j-0.10.4-src.zip")

from pyspark import SparkContext
from pyspark import SparkConfx
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

sc = SparkContext()
print(sc.version)
print(spark.version)
print(sc.version)
# do not run this cell again as sparkcntext has been initialized
# and running it again is asking for multiple spark contexts which`
# will raise an error

ValueError: Cannot run multiple SparkContexts at once; existing SparkContext(app=pyspark-shell, master=local[*]) created by getOrCreate at <ipython-input-1-095e918bd4a8>:23 

In [2]:
import findspark
findspark.init()

import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

df = spark.sql("select 'whatever' as hello")
df.show()

+--------+
|   hello|
+--------+
|whatever|
+--------+



In [3]:
sc = spark.sparkContext
print('sparkContext version ', sc.version)

sparkContext version  2.2.0


## **Predicting Movie Ratings**
#### One of the most common uses of big data is to predict what users want.  This allows Google to show  us relevant ads, Amazon to recommend relevant products, and Netflix to recommend movies that we may like.  We will demonstrate how we can use Apache Spark to recommend movies to a user.  We will start with some basic techniques, and then use the [Spark MLlib][mllib] library's Alternating Least Squares method to make more sophisticated predictions.
#### We will use a subset dataset of 500,000 ratings available on Databricks from the [movielens 10M stable benchmark rating dataset](http://grouplens.org/datasets/movielens/). However, the same code will work for the full dataset, or their latest dataset of 21 million ratings.

In [4]:
import sys
import os

ratingsFilename = 'ratings.dat.gz'
moviesFilename = 'movies.dat'

In [5]:
numPartitions = 2
rawRatings = sc.textFile(ratingsFilename).repartition(numPartitions)
rawMovies = sc.textFile(moviesFilename)

def get_ratings_tuple(entry):
    """ Parse a line in the ratings dataset
    Args:
        entry (str): a line in the ratings dataset in the form of UserID::MovieID::Rating::Timestamp
    Returns:
        tuple: (UserID, MovieID, Rating)
    """
    items = entry.split('::')
    return int(items[0]), int(items[1]), float(items[2])


def get_movie_tuple(entry):
    """ Parse a line in the movies dataset
    Args:
        entry (str): a line in the movies dataset in the form of MovieID::Title::Genres
    Returns:
        tuple: (MovieID, Title)
    """
    items = entry.split('::')
    return int(items[0]), items[1]


ratingsRDD = rawRatings.map(get_ratings_tuple).cache()
moviesRDD = rawMovies.map(get_movie_tuple).cache()

ratingsCount = ratingsRDD.count()
moviesCount = moviesRDD.count()

print ('There are %s ratings and %s movies in the datasets' % (ratingsCount, moviesCount))
print ('Ratings: %s' % ratingsRDD.take(3))
print ('Movies: %s' % moviesRDD.take(3))

There are 487650 ratings and 3883 movies in the datasets
Ratings: [(1, 1193, 5.0), (1, 661, 3.0), (1, 914, 3.0)]
Movies: [(1, 'Toy Story (1995)'), (2, 'Jumanji (1995)'), (3, 'Grumpier Old Men (1995)')]


In [6]:
print( ratingsRDD.map(lambda x : x[0]).distinct().count())
print( ratingsRDD.map(lambda x : x[1]).distinct().count())

2999
3615


### **Part 1: Basic Recommendations**
#### One way to recommend movies is to always recommend the movies with the highest average rating. In this part, we will use Spark to find the name, number of ratings, and the average rating of the 20 movies with the highest average rating and at least 500 reviews. We want to filter our movies with high ratings but fewer than 500 reviews because movies with few reviews may not have broad appeal to everyone.

(1a) Number of Ratings and Average Ratings for a Movie
Using only Python, implement a helper function getCountsAndAverages() that takes a single tuple of (MovieID, (Rating1, Rating2, Rating3, ...)) and returns a tuple of (MovieID, (number of ratings, averageRating)). For example, given the tuple (100, (10.0, 20.0, 30.0)), your function should return (100, (3, 20.0))

In [7]:
# First, implement a helper function `getCountsAndAverages` using only Python
def getCountsAndAverages(IDandRatingsTuple):
    """ Calculate average rating
    Args:
        IDandRatingsTuple: a single tuple of (MovieID, (Rating1, Rating2, Rating3, ...))
    Returns:
        tuple: a tuple of (MovieID, (number of ratings, averageRating))
    """
    x = IDandRatingsTuple
    return (x[0],(len(x[1]),float(sum(x[1]))/len(x[1])))

In [8]:
assert getCountsAndAverages((1, (1, 2, 3, 4)) ) == (1,(4,2.5)),\
'incorrect count and average'
assert getCountsAndAverages((100, (10.0, 20.0, 30.0))) == (100, (3, 20.0)),\
'incorrect getCountsAndAverages() with float list'

#### **(1b) Movies with Highest Average Ratings**
#### Now that we have a way to calculate the average ratings, we will use the `getCountsAndAverages()` helper function with Spark to determine movies with highest average ratings.
#### The steps:
* #### Recall that the `ratingsRDD` contains tuples of the form (UserID, MovieID, Rating). From `ratingsRDD` create an RDD with tuples of the form (MovieID, Python iterable of Ratings for that MovieID). This transformation will yield an RDD of the form: `[(1, <pyspark.resultiterable.ResultIterable object at 0x7f16d50e7c90>), (2, <pyspark.resultiterable.ResultIterable object at 0x7f16d50e79d0>), (3, <pyspark.resultiterable.ResultIterable object at 0x7f16d50e7610>)]`. Note that we will only need to perform two Spark transformations to do this step.
* #### Using `movieIDsWithRatingsRDD` and our `getCountsAndAverages()` helper function, compute the number of ratings and average rating for each movie to yield tuples of the form (MovieID, (number of ratings, average rating)). This transformation will yield an RDD of the form: `[(1, (993, 4.145015105740181)), (2, (332, 3.174698795180723)), (3, (299, 3.0468227424749164))]`. We can do this step with one Spark transformation
* #### We want to see movie names, instead of movie IDs. To `moviesRDD`, apply RDD transformations that use `movieIDsWithAvgRatingsRDD` to get the movie names for `movieIDsWithAvgRatingsRDD`, yielding tuples of the form (average rating, movie name, number of ratings). This set of transformations will yield an RDD of the form: `[(1.0, u'Autopsy (Macchie Solari) (1975)', 1), (1.0, u'Better Living (1998)', 1), (1.0, u'Big Squeeze, The (1996)', 3)]`. 
* #### We will need to do two Spark transformations to complete this step: first use the `moviesRDD` with `movieIDsWithAvgRatingsRDD` to create a new RDD with Movie names matched to Movie IDs, then convert that RDD into the form of (average rating, movie name, number of ratings). These transformations will yield an RDD that looks like: `[(3.6818181818181817, u'Happiest Millionaire, The (1967)', 22), (3.0468227424749164, u'Grumpier Old Men (1995)', 299), (2.882978723404255, u'Hocus Pocus (1993)', 94)]`

In [9]:
# From ratingsRDD with tuples of (UserID, MovieID, Rating) create an RDD with tuples of
# the (MovieID, iterable of Ratings for that MovieID)
movieIDsWithRatingsRDD = (ratingsRDD
                          .map(lambda x:(x[1],x[2])).groupByKey())
print ('movieIDsWithRatingsRDD: %s\n' % movieIDsWithRatingsRDD.take(3))

# Using `movieIDsWithRatingsRDD`, compute the number of ratings and average rating for each movie to
# yield tuples of the form (MovieID, (number of ratings, average rating))
movieIDsWithAvgRatingsRDD = movieIDsWithRatingsRDD.map(lambda x:getCountsAndAverages(x))
print ('movieIDsWithAvgRatingsRDD: %s\n' % movieIDsWithAvgRatingsRDD.take(3))

# To `movieIDsWithAvgRatingsRDD`, apply RDD transformations that use `moviesRDD` to get the movie
# names for `movieIDsWithAvgRatingsRDD`, yielding tuples of the form
# (average rating, movie name, number of ratings)
movieNameWithAvgRatingsRDD = (moviesRDD
                            .join(movieIDsWithAvgRatingsRDD)).map(lambda x:(x[1][1][1],x[1][0],x[1][1][0]))  
print ('movieNameWithAvgRatingsRDD: %s\n' % movieNameWithAvgRatingsRDD.take(3))

movieIDsWithRatingsRDD: [(914, <pyspark.resultiterable.ResultIterable object at 0x00000280696A10B8>), (3408, <pyspark.resultiterable.ResultIterable object at 0x00000280696A1BE0>), (2804, <pyspark.resultiterable.ResultIterable object at 0x00000280696A1DD8>)]

movieIDsWithAvgRatingsRDD: [(914, (314, 4.156050955414012)), (3408, (735, 3.8190476190476192)), (2804, (662, 4.2250755287009065))]

movieNameWithAvgRatingsRDD: [(2.676056338028169, 'Waiting to Exhale (1995)', 71), (2.926829268292683, 'Tom and Huck (1995)', 41), (2.3777777777777778, 'Dracula: Dead and Loving It (1995)', 90)]



#### **(1c) Movies with Highest Average Ratings and more than 500 reviews**
#### Now that we have an RDD of the movies with highest averge ratings, we can use Spark to determine the 20 movies with highest average ratings and at least 500 reviews.
#### Apply a single RDD transformation to `movieNameWithAvgRatingsRDD` to limit the results to movies with ratings from more than 500 people. We then use the `sortFunction()` helper function to sort by the average rating to get the movies in order of their rating (highest rating first). We will end up with an RDD of the form: `[(4.5349264705882355, u'Shawshank Redemption, The (1994)', 1088), (4.515798462852263, u"Schindler's List (1993)", 1171), (4.512893982808023, u'Godfather, The (1972)', 1047)]`

In [10]:
assert movieIDsWithRatingsRDD.count() == 3615, \
        'incorrect movieIDsWithRatingsRDD.count() (expected 3615)'
movieIDsWithRatingsTakeOrdered = movieIDsWithRatingsRDD.takeOrdered(3)

assert movieIDsWithRatingsTakeOrdered[0][0] == 1 and \
len(list(movieIDsWithRatingsTakeOrdered[0][1])) == 993,\
'incorrect count of ratings for movieIDsWithRatingsTakeOrdered[0] (expected 993)'

assert movieIDsWithRatingsTakeOrdered[1][0] == 2 and \
len(list(movieIDsWithRatingsTakeOrdered[1][1])) == 332,\
'incorrect count of ratings for movieIDsWithRatingsTakeOrdered[1] (expected 332)'

assert movieIDsWithRatingsTakeOrdered[2][0] == 3 and \
len(list(movieIDsWithRatingsTakeOrdered[2][1])) == 299, \
'incorrect count of ratings for movieIDsWithRatingsTakeOrdered[2] (expected 299)'

assert movieIDsWithAvgRatingsRDD.takeOrdered(3) == \
[(1, (993, 4.145015105740181)), (2, (332, 3.174698795180723)),
 (3, (299, 3.0468227424749164))], \
'incorrect movieIDsWithAvgRatingsRDD.takeOrdered(3)'
assert movieNameWithAvgRatingsRDD.count() == 3615, \
                'incorrect movieNameWithAvgRatingsRDD.count() (expected 3615)'
assert movieNameWithAvgRatingsRDD.takeOrdered(3) == \
[(1.0, u'Autopsy (Macchie Solari) (1975)', 1), (1.0, u'Better Living (1998)', 1),
(1.0, u'Big Squeeze, The (1996)', 3)], \
 'incorrect movieNameWithAvgRatingsRDD.takeOrdered(3)'

In [11]:
def sortFunction(tuple):
    """ Construct the sort string (does not perform actual sorting)
    Args:
        tuple: (rating, MovieName)
    Returns:
        sortString: the value to sort with, 'rating MovieName'
    """
    key = unicode('%.3f' % tuple[0])
    value = tuple[1]
    return (key + ' ' + value)

In [12]:
movieNameWithAvgRatingsRDD.filter(lambda x: x[2] > 500).take(5)

[(3.921985815602837, 'Twelve Monkeys (1995)', 705),
 (4.43953006219765, 'Star Wars: Episode IV - A New Hope (1977)', 1447),
 (4.296438883541867, 'Pulp Fiction (1994)', 1039),
 (3.453846153846154, 'Stargate (1994)', 520),
 (4.121270452358036, 'Forrest Gump (1994)', 1039)]

In [13]:
# Apply an RDD transformation to `movieNameWithAvgRatingsRDD` to limit the results to movies with
# ratings from more than 500 people. We then use the `sortFunction()` helper function to sort by the
# average rating to get the movies in order of their rating (highest rating first)
movieLimitedAndSortedByRatingRDD = (movieNameWithAvgRatingsRDD
                                    .filter(lambda x: x[2] > 500)
                                    .sortBy(lambda x : -x[0]))
print ('Movies with highest ratings: %s' % movieLimitedAndSortedByRatingRDD.take(20))

Movies with highest ratings: [(4.5349264705882355, 'Shawshank Redemption, The (1994)', 1088), (4.515798462852263, "Schindler's List (1993)", 1171), (4.512893982808023, 'Godfather, The (1972)', 1047), (4.510460251046025, 'Raiders of the Lost Ark (1981)', 1195), (4.505415162454874, 'Usual Suspects, The (1995)', 831), (4.457256461232604, 'Rear Window (1954)', 503), (4.45468509984639, 'Dr. Strangelove or: How I Learned to Stop Worrying and Love the Bomb (1963)', 651), (4.43953006219765, 'Star Wars: Episode IV - A New Hope (1977)', 1447), (4.4, 'Sixth Sense, The (1999)', 1110), (4.394285714285714, 'North by Northwest (1959)', 700), (4.379506641366224, 'Citizen Kane (1941)', 527), (4.375, 'Casablanca (1942)', 776), (4.363975155279503, 'Godfather: Part II, The (1974)', 805), (4.358816276202219, "One Flew Over the Cuckoo's Nest (1975)", 811), (4.358173076923077, 'Silence of the Lambs, The (1991)', 1248), (4.335826477187734, 'Saving Private Ryan (1998)', 1337), (4.326241134751773, 'Chinatown (1

In [14]:
assert movieLimitedAndSortedByRatingRDD.count()  == 194,\
                'incorrect movieLimitedAndSortedByRatingRDD.count()'
assert movieLimitedAndSortedByRatingRDD.take(20) == \
              [(4.5349264705882355, u'Shawshank Redemption, The (1994)', 1088),
               (4.515798462852263, u"Schindler's List (1993)", 1171),
               (4.512893982808023, u'Godfather, The (1972)', 1047),
               (4.510460251046025, u'Raiders of the Lost Ark (1981)', 1195),
               (4.505415162454874, u'Usual Suspects, The (1995)', 831),
               (4.457256461232604, u'Rear Window (1954)', 503),
               (4.45468509984639, u'Dr. Strangelove or: How I Learned to Stop Worrying and Love the Bomb (1963)', 651),
               (4.43953006219765, u'Star Wars: Episode IV - A New Hope (1977)', 1447),
               (4.4, u'Sixth Sense, The (1999)', 1110), (4.394285714285714, u'North by Northwest (1959)', 700),
               (4.379506641366224, u'Citizen Kane (1941)', 527), (4.375, u'Casablanca (1942)', 776),
               (4.363975155279503, u'Godfather: Part II, The (1974)', 805),
               (4.358816276202219, u"One Flew Over the Cuckoo's Nest (1975)", 811),
               (4.358173076923077, u'Silence of the Lambs, The (1991)', 1248),
               (4.335826477187734, u'Saving Private Ryan (1998)', 1337),
               (4.326241134751773, u'Chinatown (1974)', 564),
               (4.325383304940375, u'Life Is Beautiful (La Vita \ufffd bella) (1997)', 587),
               (4.324110671936759, u'Monty Python and the Holy Grail (1974)', 759),
               (4.3096, u'Matrix, The (1999)', 1250)], 'incorrect sortedByRatingRDD.take(20)'

## **Part 2: Collaborative Filtering**
#### Spark exposes some higher level functionality; in particular, Machine Learning using a component of Spark called [MLlib][mllib].  
#### We are going to use a technique called [collaborative filtering][collab]. Collaborative filtering is a method of making automatic predictions (filtering) about the interests of a user by collecting preferences or taste information from many users (collaborating). The underlying assumption of the collaborative filtering approach is that if a person A has the same opinion as a person B on an issue, A is more likely to have B's opinion on a different issue x than to have the opinion on x of a person chosen randomly. 
#### The image below (from [Wikipedia][collab]) shows an example of predicting of the user's rating using collaborative filtering. At first, people rate different items (like videos, images, games). After that, the system is making predictions about a user's rating for an item, which the user has not rated yet. These predictions are built upon the existing ratings of other users, who have similar ratings with the active user. For instance, in the image below the system has made a prediction, that the active user will not like the video.
![collaborative filtering](https://courses.edx.org/c4x/BerkeleyX/CS100.1x/asset/Collaborative_filtering.gif)


#### Since not all users have rated all movies, we do not know all of the entries in this matrix, which is precisely why we need collaborative filtering.  For each user, we have ratings for only a subset of the movies.  With collaborative filtering, the idea is to approximate the ratings matrix by factorizing it as the product of two matrices: one that describes properties of each user (shown in green), and one that describes properties of each movie (shown in blue).
![factorization](http://spark-mooc.github.io/web-assets/images/matrix_factorization.png)
#### We want to select these two matrices such that the error for the users/movie pairs where we know the correct ratings is minimized.  The [Alternating Least Squares][als] algorithm does this by first randomly filling the users matrix with values and then optimizing the value of the movies such that the error is minimized.  Then, it holds the movies matrix constrant and optimizes the value of the user's matrix.  This alternation between which matrix to optimize is the reason for the "alternating" in the name.
#### This optimization is what's being shown on the right in the image above.  Given a fixed set of user factors (i.e., values in the users matrix), we use the known ratings to find the best values for the movie factors using the optimization written at the bottom of the figure.  Then we "alternate" and pick the best user factors given fixed movie factors.


#### **(2a) Creating a Training Set**
#### Before we jump into using machine learning, we need to break up the `ratingsRDD` dataset into three pieces:
* #### A training set (RDD), which we will use to train models
* #### A validation set (RDD), which we will use to choose the best model
* #### A test set (RDD), which we will use for our experiments
#### To randomly split the dataset into the multiple groups, we can use the pySpark [randomSplit()](https://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.randomSplit) transformation. `randomSplit()` takes a set of splits and and seed and returns multiple RDDs.

In [11]:
trainingRDD, validationRDD, testRDD = ratingsRDD.randomSplit([6, 2, 2], seed=0)

print ('Training: %s, validation: %s, test: %s\n' % (trainingRDD.count(),
                                                    validationRDD.count(),
                                                    testRDD.count()))
print (trainingRDD.take(3))
print (validationRDD.take(3))
print (testRDD.take(3))

Training: 293180, validation: 96898, test: 97572

[(1, 1193, 5.0), (1, 661, 3.0), (1, 2355, 5.0)]
[(1, 914, 3.0), (1, 3408, 4.0), (1, 2321, 3.0)]
[(1, 1197, 3.0), (1, 1287, 5.0), (1, 2804, 5.0)]


#### **(2b) Root Mean Square Error (RMSE)**
####  We will use the [Root Mean Square Error](https://en.wikipedia.org/wiki/Root-mean-square_deviation) (RMSE) or Root Mean Square Deviation (RMSD) to compute the error of each model.  RMSE is a frequently used measure of the differences between values (sample and population values) predicted by a model or an estimator and the values actually observed. The RMSD represents the sample standard deviation of the differences between predicted values and observed values. These individual differences are called residuals when the calculations are performed over the data sample that was used for estimation, and are called prediction errors when computed out-of-sample. The RMSE serves to aggregate the magnitudes of the errors in predictions for various times into a single measure of predictive power. RMSE is a good measure of accuracy, but only to compare forecasting errors of different models for a particular variable and not between variables, as it is scale-dependent.
####  The RMSE is the square root of the average value of the square of `(actual rating - predicted rating)` for all users and movies for which we have the actual rating. Versions of Spark MLlib beginning with Spark 1.4 include a [RegressionMetrics](https://spark.apache.org/docs/latest/api/python/pyspark.mllib.html#pyspark.mllib.evaluation.RegressionMetrics) modiule that can be used to compute the RMSE. However, since we are using Spark 1.3.1, we will write our own function.
#### Write a function to compute the sum of squared error given `predictedRDD` and `actualRDD` RDDs. Both RDDs consist of tuples of the form (UserID, MovieID, Rating)
#### Given two ratings RDDs, *x* and *y* of size *n*, we define RSME as follows: $ RMSE = \sqrt{\frac{\sum_{i = 1}^{n} (x_i - y_i)^2}{n}}$
#### To calculate RSME, the steps to perform are:
* #### Transform `predictedRDD` into the tuples of the form ((UserID, MovieID), Rating). For example, tuples like `[((1, 1), 5), ((1, 2), 3), ((1, 3), 4), ((2, 1), 3), ((2, 2), 2), ((2, 3), 4)]`. 
* #### Transform `actualRDD` into the tuples of the form ((UserID, MovieID), Rating). For example, tuples like `[((1, 2), 3), ((1, 3), 5), ((2, 1), 5), ((2, 2), 1)]`. 
* #### Using only RDD transformations, compute the squared error for each *matching* entry (i.e., the same (UserID, MovieID) in each RDD) in the reformatted RDDs. Note that not every (UserID, MovieID) pair will appear in both RDDs - if a pair does not appear in both RDDs, then it does not contribute to the RMSE. We will end up with an RDD with entries of the form $ (x_i - y_i)^2$ 
* #### Using an RDD action,compute the total squared error: $ SE = \sum_{i = 1}^{n} (x_i - y_i)^2 $
* #### Compute *n* by using an RDD action (but **not** `collect()`), to count the number of pairs for which we have computed the total squared error
* #### Using the total squared error and the number of pairs, compute the RSME. We have to compute this value as a [float]

In [12]:
# TODO: Replace <FILL IN> with appropriate code
import math

def computeError(predictedRDD, actualRDD):
    """ Compute the root mean squared error between predicted and actual
    Args:
        predictedRDD: predicted ratings for each movie and each user where each entry is in the form
                      (UserID, MovieID, Rating)
        actualRDD: actual ratings where each entry is in the form (UserID, MovieID, Rating)
    Returns:
        RSME (float): computed RSME value
    """
    # Transform predictedRDD into the tuples of the form ((UserID, MovieID), Rating)
    predictedReformattedRDD = predictedRDD.map(lambda x:((x[0],x[1]),x[2]))
   
    # Transform actualRDD into the tuples of the form ((UserID, MovieID), Rating)
    actualReformattedRDD = actualRDD.map(lambda x:((x[0],x[1]),x[2]))
 
    # Compute the squared error for each matching entry (i.e., the same (User ID, Movie ID) in each
    # RDD) in the reformatted RDDs using RDD transformtions - do not use collect()
    squaredErrorsRDD = (predictedReformattedRDD
                        .join(actualReformattedRDD)).map(lambda x:(x[0], 
                        pow(float((x[1][0])-float(x[1][1])),2)))
    # Compute the total squared error - do not use collect()
    totalError = squaredErrorsRDD.map(lambda x:(float(x[1]))).sum()

    # Count the number of entries for which you computed the total squared error
    numRatings = squaredErrorsRDD.count()
   
    # Using the total squared error and the number of entries, compute the RSME
    return math.sqrt(float(totalError)/numRatings)


# sc.parallelize turns a Python list into a Spark RDD.
testPredicted = sc.parallelize([
    (1, 1, 5),
    (1, 2, 3),
    (1, 3, 4),
    (2, 1, 3),
    (2, 2, 2),
    (2, 3, 4)])
testActual = sc.parallelize([
     (1, 2, 3),
     (1, 3, 5),
     (2, 1, 5),
     (2, 2, 1)])
testPredicted2 = sc.parallelize([
     (2, 2, 5),
     (1, 2, 5)])
testError = computeError(testPredicted, testActual)
print ('Error for test dataset (should be 1.22474487139): %s' % testError)

testError2 = computeError(testPredicted2, testActual)
print ('Error for test dataset2 (should be 3.16227766017): %s' % testError2)

testError3 = computeError(testActual, testActual)
print ('Error for testActual dataset (should be 0.0): %s' % testError3)

Error for test dataset (should be 1.22474487139): 1.224744871391589
Error for test dataset2 (should be 3.16227766017): 3.1622776601683795
Error for testActual dataset (should be 0.0): 0.0


In [13]:
# Using the canned RegressionMetrics from the pyspark.mllib.evaluation library
from pyspark.mllib.evaluation import RegressionMetrics
predictedReformattedRDD = testPredicted.map(lambda x:((x[0],x[1]),x[2]))
actualReformattedRDD = testActual.map(lambda x:((x[0],x[1]),x[2]))
actual_and_predicted = predictedReformattedRDD.join(actualReformattedRDD).map(
lambda x : ( float(x[1][0]),float(x[1][1])))
print(actual_and_predicted.take(3))
arm = RegressionMetrics(actual_and_predicted)
print(arm.rootMeanSquaredError)

[(3.0, 3.0), (4.0, 5.0), (2.0, 1.0)]
1.224744871391589


#### **(2c) Using ALS.train()**
#### In this part, we will use the MLlib implementation of Alternating Least Squares, [ALS.train()](https://spark.apache.org/docs/latest/api/python/pyspark.mllib.html#pyspark.mllib.recommendation.ALS). ALS takes a training dataset (RDD) and several parameters that control the model creation process. To determine the best values for the parameters, we will use ALS to train several models, and then we will select the best model and use the parameters from that model in the rest of this lab exercise.
#### The process we will use for determining the best model is as follows:
* #### Pick a set of model parameters. The most important parameter to `ALS.train()` is the *rank*, which is the number of rows in the Users matrix (green in the diagram above) or the number of columns in the Movies matrix (blue in the diagram above). (In general, a lower rank will mean higher error on the training dataset, but a high rank may lead to [overfitting](https://en.wikipedia.org/wiki/Overfitting).)  We will train models with ranks of 4, 8, and 12 using the `trainingRDD` dataset.
* #### Create a model using `ALS.train(trainingRDD, rank, seed=seed, iterations=iterations, lambda_=regularizationParameter)` with three parameters: an RDD consisting of tuples of the form (UserID, MovieID, rating) used to train the model, an integer rank (4, 8, or 12), a number of iterations to execute (we will use 5 for the `iterations` parameter), and a regularization coefficient (we will use 0.1 for the `regularizationParameter`).
* #### For the prediction step, create an input RDD, `validationForPredictRDD`, consisting of (UserID, MovieID) pairs extracted from `validationRDD`. We will end up with an RDD of the form: `[(1, 1287), (1, 594), (1, 1270)]`
* #### Using the model and `validationForPredictRDD`, we can predict rating values by calling [model.predictAll()](https://spark.apache.org/docs/latest/api/python/pyspark.mllib.html#pyspark.mllib.recommendation.MatrixFactorizationModel.predictAll) with the `validationForPredictRDD` dataset, where `model` is the model we generated with ALS.train().  `predictAll` accepts an RDD with each entry in the format (userID, movieID) and outputs an RDD with each entry in the format (userID, movieID, rating).
* #### Evaluate the quality of the model by using the `computeError()` function we wrote in part (2b) to compute the error between the predicted ratings and the actual ratings in `validationRDD`.
####  Which rank produces the best model, based on the RMSE with the `validationRDD` dataset?
#### Note: It is likely that this operation will take a noticeable amount of time (around a minute in our VM); We can observe its progress on the [Spark Web UI](http://localhost:4040). Probably most of the time will be spent running your `computeError()` function, since, unlike the Spark ALS implementation (and the Spark 1.4 [RegressionMetrics](https://spark.apache.org/docs/latest/api/python/pyspark.mllib.html#pyspark.mllib.evaluation.RegressionMetrics) module), this does not use a fast linear algebra library and needs to run some Python code for all 100k entries.

In [14]:
# TODO: Replace <FILL IN> with appropriate code
from pyspark.mllib.recommendation import ALS

validationForPredictRDD = validationRDD.map(lambda x:(x[0],x[1]))

seed = 5
iterations = 5
regularizationParameter = 0.1
ranks = [4, 8, 12]
errors = [0, 0, 0]
err = 0
tolerance = 0.02

minError = float('inf')
bestRank = -1
bestIteration = -1
for rank in ranks:
    model = ALS.train(trainingRDD, rank, seed=seed, iterations=iterations,
                      lambda_=regularizationParameter)
    predictedRatingsRDD = model.predictAll(validationForPredictRDD)
    error = computeError(predictedRatingsRDD, validationRDD)
    errors[err] = error
    err += 1
    print ('For rank %s the RMSE is %s' % (rank, error))
    if error < minError:
        minError = error
        bestRank = rank

print ('The best model was trained with rank %s' % bestRank)


For rank 4 the RMSE is 0.8923714161029934
For rank 8 the RMSE is 0.8954064345072859
For rank 12 the RMSE is 0.8912742324386047
The best model was trained with rank 12


In [15]:
assert trainingRDD.getNumPartitions()  == 2,\
                  'incorrect number of partitions for trainingRDD (expected 2)'
assert validationForPredictRDD.count() == 96898,\
                  'incorrect size for validationForPredictRDD (expected 96898)'
assert validationForPredictRDD.filter(lambda t: t == (1, 150)).count() == 1, \
                  'incorrect content for validationForPredictRDD'
assert errors[0] - 0.883710109497 < tolerance, 'incorrect errors[0]'
assert errors[1] - 0.878486305621 < tolerance, 'incorrect errors[1]'
assert errors[2] - 0.876832795659 < tolerance, 'incorrect errors[2]'

#### **(2d) Testing Your Model**
#### So far, we used the `trainingRDD` and `validationRDD` datasets to select the best model.  Since we used these two datasets to determine what model is best, we cannot use them to test how good the model is - otherwise we would be very vulnerable to [overfitting](https://en.wikipedia.org/wiki/Overfitting).  To decide how good our model is, we need to use the `testRDD` dataset.  We will use the `bestRank` we determined in part (2c) to create a model for predicting the ratings for the test dataset and then we will compute the RMSE.
#### The steps we will perform are:
* #### Train a model, using the `trainingRDD`, `bestRank` from part (2c), and the parameters we used in in part (2c): `seed=seed`, `iterations=iterations`, and `lambda_=regularizationParameter` - we will include **all** of the parameters.
* #### For the prediction step, create an input RDD, `testForPredictRDD`, consisting of (UserID, MovieID) pairs that we extract from `testRDD`. We will end up with an RDD of the form: `[(1, 1287), (1, 594), (1, 1270)]`
* #### Use [myModel.predictAll()](https://spark.apache.org/docs/latest/api/python/pyspark.mllib.html#pyspark.mllib.recommendation.MatrixFactorizationModel.predictAll) to predict rating values for the test dataset.
* #### For validation, use the `testRDD`and your `computeError` function to compute the RMSE between `testRDD` and the `predictedTestRDD` from the model.
* #### Evaluate the quality of the model by using the `computeError()` function we wrote in part (2b) to compute the error between the predicted ratings and the actual ratings in `testRDD`.

In [16]:
myModel = ALS.train(trainingRDD, rank, seed=seed, iterations=iterations,
                      lambda_=regularizationParameter)
testForPredictingRDD = testRDD.map(lambda x:(x[0],x[1]))
predictedTestRDD = myModel.predictAll(testForPredictingRDD)

testRMSE = computeError(testRDD, predictedTestRDD)

print ('The model had a RMSE on the test set of %s' % testRMSE)

The model had a RMSE on the test set of 0.8928043392048276


In [17]:
assert abs(testRMSE - 0.87809838344) < tolerance, 'incorrect testRMSE'

#### **(2e) Comparing Our Model**
#### Looking at the RMSE for the results predicted by the model versus the values in the test set is one way to evalute the quality of our model. Another way to evaluate the model is to evaluate the error from a test set where every rating is the average rating for the training set.
#### The steps:
* #### Use the `trainingRDD` to compute the average rating across all movies in that training dataset.
* #### Use the average rating just determined and the `testRDD` to create an RDD with entries of the form (userID, movieID, average rating).
* #### Use the `testRDD` to create an RDD with entries of the form (userID, movieID, rating).
* #### Use `computeError` function to compute the RMSE between the `testForRMSERDD` validation RDD that we just created and the `testForAvgRDD`

In [21]:
trainingAvgRating = trainingRDD.map(lambda x: x[2]).sum()/trainingRDD.count()
print ('The average rating for movies in the training set is %s' % trainingAvgRating)

testForAvgRDD = testRDD.map(lambda x:(x[0],x[1],trainingAvgRating))
testForRMSERDD = testRDD.map(lambda x: x)
testAvgRMSE = computeError(testForRMSERDD, testForAvgRDD)
print ('The RMSE on the average set is %s' % testAvgRMSE)

The average rating for movies in the training set is 3.571601064192646
The RMSE on the average set is 1.1144120501597654


In [22]:
assert abs(trainingAvgRating - 3.571601) < 0.000001, \
                'incorrect trainingAvgRating (expected 3.571601064192646)'
assert abs(testAvgRMSE - 1.114412) < 0.000001, \
                'incorrect testAvgRMSE (expected assert 1.1144120501597654)'

In [18]:
# Understand how the predictions are made
# Each row of UserFeatures is the inferred feature weights vector for the particular user
# Each row of ProductFeatues is the inferred feature weights vector for the product
print(myModel.predict(4,10))
fourUser = myModel.userFeatures().filter( lambda x : x[0] == 4)
print(fourUser.collect())
tenMovie = myModel.productFeatures().filter(lambda x : x[0] == 10 )
print(tenMovie.collect())
fourTenRating = sum([ x[0] * x[1] for x in zip(fourUser.collect()[0][1],
                                               tenMovie.collect()[0][1])])
print(fourTenRating)

3.1335955760308405
[(4, array('d', [-0.6683604717254639, 0.6176535487174988, -0.9526308178901672, 0.7968285083770752, 1.2056505680084229, 0.31765681505203247, -0.1131814643740654, -0.12488371878862381, 1.5376293659210205, 0.4793440103530884, -0.5800007581710815, -0.44496551156044006]))]
[(10, array('d', [-0.582744300365448, -0.003755934303626418, -0.428880512714386, 0.43934208154678345, 0.22229070961475372, 0.2934376001358032, -0.14676639437675476, -0.7182983756065369, 0.7954515218734741, 0.2937465310096741, -0.3217548131942749, 0.06804702430963516]))]
3.1335955760308405


## **Part 3: Predictions for Yourself**
#### The ultimate goal of this lab exercise is to predict what movies to recommend to oneself.  In order to do that, we will first need to add ratings for ourselves to the `ratingsRDD` dataset.

### Let us see the most popular movies with number of ratings, names and ids

In [19]:
moviesMap = moviesRDD.map(lambda x : (x[1],x[0])).collectAsMap()

In [20]:
print ('Most rated movies:')
print ('(number of ratings, (movie name, movie ID))')
# movieRatingsIDAndName = moveLimitedAndSortedByRatingRDD.
for ratingsTuple in movieLimitedAndSortedByRatingRDD.take(50):
    print( ratingsTuple[2],(ratingsTuple[1],moviesMap[ratingsTuple[1]]),)

Most rated movies:
(number of ratings, (movie name, movie ID))
1088 ('Shawshank Redemption, The (1994)', 318)
1171 ("Schindler's List (1993)", 527)
1047 ('Godfather, The (1972)', 858)
1195 ('Raiders of the Lost Ark (1981)', 1198)
831 ('Usual Suspects, The (1995)', 50)
503 ('Rear Window (1954)', 904)
651 ('Dr. Strangelove or: How I Learned to Stop Worrying and Love the Bomb (1963)', 750)
1447 ('Star Wars: Episode IV - A New Hope (1977)', 260)
1110 ('Sixth Sense, The (1999)', 2762)
700 ('North by Northwest (1959)', 908)
527 ('Citizen Kane (1941)', 923)
776 ('Casablanca (1942)', 912)
805 ('Godfather: Part II, The (1974)', 1221)
811 ("One Flew Over the Cuckoo's Nest (1975)", 1193)
1248 ('Silence of the Lambs, The (1991)', 593)
1337 ('Saving Private Ryan (1998)', 2028)
564 ('Chinatown (1974)', 1252)
587 ('Life Is Beautiful (La Vita � bella) (1997)', 2324)
759 ('Monty Python and the Holy Grail (1974)', 1136)
1250 ('Matrix, The (1999)', 2571)
1438 ('Star Wars: Episode V - The Empire Strikes B

#### The user ID 0 is unassigned, so we will use it for our ratings. We set the variable `myUserID` to 0  Next, create a new RDD `myRatingsRDD` with our ratings for at least 10 movie ratings. Each entry should be formatted as `(myUserID, movieID, rating)` (i.e., each entry should be formatted in the same way as `trainingRDD`).  As in the original dataset, ratings should be between 1 and 5 (inclusive). If we have not seen at least 10 of these movies, we can increase the parameter passed to `take()` in the above cell until there are 10 movies that we have seen (or we can also guess what our rating would be for movies we have not seen).

In [21]:
# TODO: Replace <FILL IN> with appropriate code
myUserID = 0
print(moviesRDD.filter( lambda x : x[0] in [318,527,858,1158,50,1447,
                                           811,600,1129,699]).collect())

myRatedMovies = [
(myUserID, 318,4),
(myUserID, 527,4),
(myUserID, 858,4),
(myUserID, 1158,4),
(myUserID, 50,5),
(myUserID, 1447,3),
(myUserID, 811,5),
(myUserID, 600,2),
(myUserID, 1129,4),
(myUserID, 699,2)
     # The format of each line is (myUserID, movie ID, your rating)
     # For example, to give the movie "Star Wars: Episode IV - A New Hope (1977)"
     # a five rating, you would add the following line:
     #   (myUserID, 260, 5),
    ]
myRatingsRDD = sc.parallelize(myRatedMovies)
print ('My movie ratings: %s' % myRatingsRDD.take(10))

[(50, 'Usual Suspects, The (1995)'), (318, 'Shawshank Redemption, The (1994)'), (527, "Schindler's List (1993)"), (600, 'Love and a .45 (1994)'), (699, 'To Cross the Rubicon (1991)'), (811, 'Bewegte Mann, Der (1994)'), (858, 'Godfather, The (1972)'), (1129, 'Escape from New York (1981)'), (1158, 'Here Comes Cookie (1935)'), (1447, "Gridlock'd (1997)")]
My movie ratings: [(0, 318, 4), (0, 527, 4), (0, 858, 4), (0, 1158, 4), (0, 50, 5), (0, 1447, 3), (0, 811, 5), (0, 600, 2), (0, 1129, 4), (0, 699, 2)]


#### **(3b) Add our Movies to Training Dataset**
#### Now that we have ratings for ourself, we need to add your ratings to the `training` dataset so that the model we train will incorporate your preferences.  Spark's [union()](http://spark.apache.org/docs/latest/api/python/pyspark.rdd.RDD-class.html#union) transformation combines two RDDs; use `union()` to create a new training dataset that includes our ratings and the data in the original training dataset.

In [22]:
# TODO: Replace <FILL IN> with appropriate code
trainingWithMyRatingsRDD = trainingRDD.union(myRatingsRDD)

print ('The training dataset now has %s more entries than the original training dataset' %
       (trainingWithMyRatingsRDD.count() - trainingRDD.count()))
assert trainingWithMyRatingsRDD.count() - trainingRDD.count() == myRatingsRDD.count()

The training dataset now has 10 more entries than the original training dataset


#### **(3c) Train a Model with our Ratings**
#### Now, train a model with our ratings added and the parameters we used in in part (2c): `bestRank`, `seed=seed`, `iterations=iterations`, and `lambda_=regularizationParameter` - make sure to include **all** of the parameters.

In [23]:
myRatingsModel = ALS.train(trainingWithMyRatingsRDD, bestRank, seed=seed, iterations=iterations,
                      lambda_=regularizationParameter)

#### **(3d) Check RMSE for the New Model with our Ratings**
#### Compute the RMSE for this new model on the test set.
* #### For the prediction step, we reuse `testForPredictRDD`, consisting of (UserID, MovieID) pairs that we extracted from `testRDD`. The RDD has the form: `[(1, 1287), (1, 594), (1, 1270)]`
* #### Use `myRatingsModel.predictAll()` to predict rating values for the `testForPredictRDD` test dataset, set this as `predictedTestMyRatingsRDD`
* #### For validation, use the `testRDD`and our `computeError` function to compute the RMSE between `testRDD` and the `predictedTestMyRatingsRDD` from the model.

In [24]:
# TODO: Replace <FILL IN> with appropriate code
predictedTestMyRatingsRDD = myRatingsModel.predictAll(testForPredictingRDD)
testRMSEMyRatings = computeError(testRDD,predictedTestMyRatingsRDD)
print ('The model had a RMSE on the test set of %s' % testRMSEMyRatings)

The model had a RMSE on the test set of 0.8920077469667276


#### **(3e) Predict Our Ratings**
#### So far, we have only used the `predictAll` method to compute the error of the model.  Here, we use the `predictAll` to predict what ratings we would give to the movies that you did not already provide ratings for.
#### The steps:
* #### Use the Python list `myRatedMovies` to transform the `moviesRDD` into an RDD with entries that are pairs of the form (myUserID, Movie ID) and that does not contain any movies that we have rated. This transformation will yield an RDD of the form: `[(0, 1), (0, 2), (0, 3), (0, 4)]`.
* #### For the prediction step, use the input RDD, `myUnratedMoviesRDD`, with myRatingsModel.predictAll() to predict our ratings for the movies.

In [29]:
print(moviesRDD.take(3))
print(myRatedMovies)

[(1, 'Toy Story (1995)'), (2, 'Jumanji (1995)'), (3, 'Grumpier Old Men (1995)')]
[(0, 318, 4), (0, 527, 4), (0, 858, 4), (0, 1158, 4), (0, 50, 5), (0, 1447, 3), (0, 811, 5), (0, 600, 2), (0, 1129, 4), (0, 699, 2)]


In [25]:
# Use the Python list myRatedMovies to transform the moviesRDD into an RDD with entries that are pairs of the form 
# (myUserID, Movie ID) and that does not contain any movies that we have rated.
myUnratedMoviesRDD = (moviesRDD
                      .filter(lambda x:[x[0] != [y[1] for y in myRatedMovies]])).map(
    lambda x:(myUserID,x[0]))

# Use the input RDD, myUnratedMoviesRDD, with myRatingsModel.predictAll() to predict our ratings for the movies
predictedRatingsRDD = myRatingsModel.predictAll(myUnratedMoviesRDD)

In [31]:
predictedRatingsRDD.take(10)

[Rating(user=0, product=1084, rating=3.7912112276586187),
 Rating(user=0, product=3456, rating=3.116877299732263),
 Rating(user=0, product=3764, rating=2.1358960685234214),
 Rating(user=0, product=3272, rating=3.289825991446695),
 Rating(user=0, product=428, rating=3.601721794812863),
 Rating(user=0, product=1900, rating=2.7345154087855703),
 Rating(user=0, product=1328, rating=2.0423276968313004),
 Rating(user=0, product=464, rating=2.988989953889424),
 Rating(user=0, product=1336, rating=2.0597167230511713),
 Rating(user=0, product=912, rating=3.933520087436215)]

#### **(3f) Predict Your Ratings**
#### We have our predicted ratings. Now we can print out the 25 movies with the highest predicted ratings.
#### The steps you should perform are:
* #### From Parts (1b) and (1c), we know that we should look at movies with a reasonable number of reviews (e.g., more than 75 reviews). We can experiment with a lower threshold, but fewer ratings for a movie may yield higher prediction errors. Transform `movieIDsWithAvgRatingsRDD` from Part (1b), which has the form (MovieID, (number of ratings, average rating)), into an RDD of the form (MovieID, number of ratings): `[(2, 332), (4, 71), (6, 442)]`
* #### We want to see movie names, instead of movie IDs. Transform `predictedRatingsRDD` into an RDD with entries that are pairs of the form (Movie ID, Predicted Rating): `[(3456, -0.5501005376936687), (1080, 1.5885892024487962), (320, -3.7952255522487865)]`
* #### Use RDD transformations with `predictedRDD` and `movieCountsRDD` to yield an RDD with tuples of the form (Movie ID, (Predicted Rating, number of ratings)): `[(2050, (0.6694097486155939, 44)), (10, (5.29762541533513, 418)), (2060, (0.5055259373841172, 97))]`
* #### Use RDD transformations with `predictedWithCountsRDD` and `moviesRDD` to yield an RDD with tuples of the form (Predicted Rating, Movie Name, number of ratings), _for movies with more than 75 ratings._ For example: `[(7.983121900375243, u'Under Siege (1992)'), (7.9769201864261285, u'Fifth Element, The (1997)')]`

In [26]:
# Transform movieIDsWithAvgRatingsRDD from part (1b), which has the form
# (MovieID, (number of ratings, average rating)), into and RDD of the form 
# (MovieID, number of ratings)
movieCountsRDD = movieIDsWithAvgRatingsRDD.map(lambda x:(x[0],x[1][0]))
print(movieCountsRDD.take(2))

# Transform predictedRatingsRDD into an RDD with entries that are pairs of the form 
# (Movie ID, Predicted Rating)
predictedRDD = predictedRatingsRDD.map(lambda x:(x[1],x[2]))
print(predictedRDD.take(2))

# Use RDD transformations with predictedRDD and movieCountsRDD to yield an RDD 
# with tuples of the form (Movie ID, (Predicted Rating, number of ratings))
predictedWithCountsRDD  = (predictedRDD
                           .join(movieCountsRDD))
print(predictedWithCountsRDD.take(2))

# Use RDD transformations with PredictedWithCountsRDD and moviesRDD to yield an RDD 
# with tuples of the form (Predicted Rating, Movie Name, number of ratings),
# for movies with more than 75 ratings
ratingsWithNamesRDD = (predictedWithCountsRDD
                       .filter(lambda x: x[1][1] > 75)).join(moviesRDD).map(
    lambda x:(x[1][0][0],x[1][1],x[1][0][1]))

predictedHighestRatedMovies = ratingsWithNamesRDD.takeOrdered(20, key=lambda x: -x[0])
print ('My highest rated movies as predicted (for movies with more than 75 reviews):\n%s' %
        '\n'.join(map(str, predictedHighestRatedMovies)))

[(914, 314), (3408, 735)]
[(1084, 3.7912112276586187), (3456, 3.116877299732263)]
[(3456, (3.116877299732263, 32)), (912, (3.933520087436215, 776))]
My highest rated movies as predicted (for movies with more than 75 reviews):
(4.429178504032896, '2001: A Space Odyssey (1968)', 811)
(4.409992561242355, 'Alien (1979)', 941)
(4.384725604951456, 'Blade Runner (1982)', 845)
(4.359801417823164, 'Aliens (1986)', 832)
(4.355473169212698, 'Star Wars: Episode IV - A New Hope (1977)', 1447)
(4.350864366078438, 'Godfather, The (1972)', 1047)
(4.3239603368480966, 'Lawrence of Arabia (1962)', 402)
(4.296675765818163, 'Silence of the Lambs, The (1991)', 1248)
(4.2965038296550455, 'Raiders of the Lost Ark (1981)', 1195)
(4.268855057588508, 'Matrix, The (1999)', 1250)
(4.262010359939317, 'Star Wars: Episode V - The Empire Strikes Back (1980)', 1438)
(4.2527804503102455, 'Seven Samurai (The Magnificent Seven) (Shichinin no samurai) (1954)', 278)
(4.248588504665106, 'Yojimbo (1961)', 110)
(4.238349406954

# Part 4 : Item similarities
### We can recommend products based on similarties between them. Having carried out the matrix factorization we have the product features available for all the movies in the movie set
### The steps that we will follow to compute similarities for a particular movie would be 
### Devise a formula to compare vectors - we will use cosine similarity - the ratio of the dot product to the product of the norms of the vectors
### a. Filter the product features to the movie of interest
### b. Cross join it with the full product features set and get similarities for each pair
### c. Create means to get movie ids and names together
### d. Sort on similarities descending to get the item recommnedations based on item similarities


In [27]:
# Lets create a simple vector and see how we can get the dot product
simpv = [3,3]
sum( simpv[x] * simpv[x] for x in range(len(simpv))) 


18

In [28]:
# Lets create a function that we will plug in to calculate the cosine similarities
def cos_sim(f,s):
    from math import sqrt
    dot_product = sum([f[x] * s[x] for x in range(len(f))])
    print(dot_product)
    norms_product = sqrt( sum( x * x for x in f) ) * sqrt( sum(x * x for x in s))
    print(norms_product)
    return dot_product/norms_product

In [29]:
# Verify the cosine similarity formula
cos_sim([3,0],[3,4])

9
15.0


0.6

In [30]:
# Lets take a look at the product features
model.productFeatures().first()[1]

array('d', [-0.20404702425003052, 0.4111984074115753, -0.8286676406860352, 0.418993204832077, 0.2826428711414337, 0.7812122702598572, -0.3807153105735779, -0.18545562028884888, 0.23406963050365448, 0.0689709261059761, -0.3330400288105011, 0.07071979343891144])

In [31]:
# Create a movies dictionary so that we can comfortably get movie names from ids
movies_map = moviesRDD.collectAsMap()

In [43]:
# Lets see the most rated movies
movieCountsRDD.sortBy(lambda x : -x[1]).map( lambda x : (x[0],movies_map[x[0]],x[1])).take(5)

[(2858, 'American Beauty (1999)', 1775),
 (260, 'Star Wars: Episode IV - A New Hope (1977)', 1447),
 (1196, 'Star Wars: Episode V - The Empire Strikes Back (1980)', 1438),
 (480, 'Jurassic Park (1993)', 1423),
 (1210, 'Star Wars: Episode VI - Return of the Jedi (1983)', 1390)]

In [32]:
# Lets collect the product features for a particular movie
movie_of_interest_id = 3386
mi_pf = model.productFeatures().filter(lambda x : x[0] == movie_of_interest_id)
mi_pf.collect()

[(3386,
  array('d', [-0.35212433338165283, -0.08273603022098541, -0.3896622061729431, 0.5738535523414612, 0.5002975463867188, 0.6489229202270508, -0.09155140817165375, -0.35801205039024353, 0.9639317393302917, 0.3876153826713562, -0.3232026696205139, -0.1436186134815216]))]

In [33]:
# We will do a cartesian of the movie of interest with the entire movie set
# Apply the cosine similarity to the ratings
# Map to movie names using movies_map
# And sort descending on movie names to get the most similar movies
mi_pf.cartesian(model.productFeatures()).map(lambda x : (x[0][0],x[1][0],cos_sim(x[0][1],x[1][1]) )).sortBy(
lambda x : -x[2]).map( lambda x : (x[1],movies_map[x[1]],x[2])).take(100)

[(3386, 'JFK (1991)', 1.0),
 (1466, 'Donnie Brasco (1997)', 0.9781235133055158),
 (1343, 'Cape Fear (1991)', 0.9747616463577208),
 (1082, 'Candidate, The (1972)', 0.9725997033804972),
 (98, 'Shopping (1994)', 0.9674208860310151),
 (475, 'In the Name of the Father (1993)', 0.965667025224523),
 (108, 'Catwalk (1995)', 0.9644767361391873),
 (2989, 'For Your Eyes Only (1981)', 0.9630112607491665),
 (2745, 'Mission, The (1986)', 0.962785087996856),
 (3020, 'Falling Down (1993)', 0.9627160790910411),
 (3050, 'Light It Up (1999)', 0.9616743720574553),
 (3001, 'Suburbans, The (1999)', 0.9579631970386354),
 (1384, 'Substance of Fire, The (1996)', 0.9578912101231464),
 (3852, 'Tao of Steve, The (2000)', 0.9572181418567901),
 (3263, "White Men Can't Jump (1992)", 0.9568034032965055),
 (2866, 'Buddy Holly Story, The (1978)', 0.9558337355447672),
 (3614, 'Honeymoon in Vegas (1992)', 0.9556058221725713),
 (3203, 'Dead Calm (1989)', 0.9537977435761551),
 (1532, 'Sprung (1997)', 0.9528833746832107),
 