In [1]:
import sys
import os


baseDir = os.path.join('data')
inputPath = os.path.join('cs100', 'lab4', 'small')

ratingsFilename = os.path.join(baseDir, inputPath, 'ratings.dat.gz')
moviesFilename = os.path.join(baseDir, inputPath, 'movies.dat')

In [2]:
numPartitions = 2
rawRatings = sc.textFile(ratingsFilename).repartition(numPartitions)
rawMovies = sc.textFile(moviesFilename)

def get_ratings_tuple(entry):
    """ Parse a line in the ratings dataset
    Args:
        entry (str): a line in the ratings dataset in the form of UserID::MovieID::Rating::Timestamp
    Returns:
        tuple: (UserID, MovieID, Rating)
    """
    items = entry.split('::')
    return int(items[0]), int(items[1]), float(items[2])


def get_movie_tuple(entry):
    """ Parse a line in the movies dataset
    Args:
        entry (str): a line in the movies dataset in the form of MovieID::Title::Genres
    Returns:
        tuple: (MovieID, Title)
    """
    items = entry.split('::')
    return int(items[0]), items[1]


ratingsRDD = rawRatings.map(get_ratings_tuple).cache()
moviesRDD = rawMovies.map(get_movie_tuple).cache()

ratingsCount = ratingsRDD.count()
moviesCount = moviesRDD.count()

print 'There are %s ratings and %s movies in the datasets' % (ratingsCount, moviesCount)
print 'Ratings: %s' % ratingsRDD.take(3)
print 'Movies: %s' % moviesRDD.take(3)

assert ratingsCount == 487650
assert moviesCount == 3883
assert moviesRDD.filter(lambda (id, title): title == 'Toy Story (1995)').count() == 1
assert (ratingsRDD.takeOrdered(1, key=lambda (user, movie, rating): movie)
        == [(1, 1, 5.0)])

There are 487650 ratings and 3883 movies in the datasets
Ratings: [(1, 1193, 5.0), (1, 914, 3.0), (1, 2355, 5.0)]
Movies: [(1, u'Toy Story (1995)'), (2, u'Jumanji (1995)'), (3, u'Grumpier Old Men (1995)')]


In [47]:
tmp1 = [(1, u'alpha'), (2, u'alpha'), (2, u'beta'), (3, u'alpha'), (1, u'epsilon'), (1, u'delta')]
tmp2 = [(1, u'delta'), (2, u'alpha'), (2, u'beta'), (3, u'alpha'), (1, u'epsilon'), (1, u'alpha')]

oneRDD = sc.parallelize(tmp1)
twoRDD = sc.parallelize(tmp2)
oneSorted = oneRDD.sortByKey(True).collect()
twoSorted = twoRDD.sortByKey(True).collect()
print oneSorted
print twoSorted
assert set(oneSorted) == set(twoSorted)     # Note that both lists have the same elements
assert twoSorted[0][0] < twoSorted.pop()[0] # Check that it is sorted by the keys
assert oneSorted[0:2] != twoSorted[0:2]     # Note that the subset consisting of the first two elements does not matct

[(1, u'alpha'), (1, u'epsilon'), (1, u'delta'), (2, u'alpha'), (2, u'beta'), (3, u'alpha')]
[(1, u'delta'), (1, u'epsilon'), (1, u'alpha'), (2, u'alpha'), (2, u'beta'), (3, u'alpha')]


In [48]:
def sortFunction(tuple):
    """ Construct the sort string (does not perform actual sorting)
    Args:
        tuple: (rating, MovieName)
    Returns:
        sortString: the value to sort with, 'rating MovieName'
    """
    key = unicode('%.3f' % tuple[0])
    value = tuple[1]
    return (key + ' ' + value)


print oneRDD.sortBy(sortFunction, True).collect()
print twoRDD.sortBy(sortFunction, True).collect()



[(1, u'alpha'), (1, u'delta'), (1, u'epsilon'), (2, u'alpha'), (2, u'beta'), (3, u'alpha')]
[(1, u'alpha'), (1, u'delta'), (1, u'epsilon'), (2, u'alpha'), (2, u'beta'), (3, u'alpha')]


In [49]:
oneSorted1 = oneRDD.takeOrdered(oneRDD.count(),key=sortFunction)
twoSorted1 = twoRDD.takeOrdered(twoRDD.count(),key=sortFunction)
print 'one is %s' % oneSorted1
print 'two is %s' % twoSorted1
assert oneSorted1 == twoSorted1

one is [(1, u'alpha'), (1, u'delta'), (1, u'epsilon'), (2, u'alpha'), (2, u'beta'), (3, u'alpha')]
two is [(1, u'alpha'), (1, u'delta'), (1, u'epsilon'), (2, u'alpha'), (2, u'beta'), (3, u'alpha')]


In [52]:
from __future__ import division

def getCountsAndAverages(IDandRatingsTuple):
    """ Calculate average rating
    Args:
        IDandRatingsTuple: a single tuple of (MovieID, (Rating1, Rating2, Rating3, ...))
    Returns:
        tuple: a tuple of (MovieID, (number of ratings, averageRating))
    """
    key, rates = IDandRatingsTuple[0], list(IDandRatingsTuple[1])
    return (key, (len(rates), sum(rates)/len(rates)))

getCountsAndAverages((1, (1, 2, 3, 4)))
getCountsAndAverages((100, (10.0, 20.0, 30.0)))

(100, (3, 20.0))

In [55]:
# TODO: Replace <FILL IN> with appropriate code

# From ratingsRDD with tuples of (UserID, MovieID, Rating) create an RDD with tuples of
# the (MovieID, iterable of Ratings for that MovieID)
movieIDsWithRatingsRDD = ratingsRDD.map(lambda l: (l[1], l[2])).groupByKey()
movieIDsWithRatingsRDD.count()
print 'movieIDsWithRatingsRDD: %s\n' % movieIDsWithRatingsRDD.take(3)

movieIDsWithRatingsRDD: [(2, <pyspark.resultiterable.ResultIterable object at 0x7f3bd4d93a50>), (4, <pyspark.resultiterable.ResultIterable object at 0x7f3bd4d93d90>), (6, <pyspark.resultiterable.ResultIterable object at 0x7f3bd4d93b10>)]



In [56]:
# Using `movieIDsWithRatingsRDD`, compute the number of ratings and average rating for each movie to
# yield tuples of the form (MovieID, (number of ratings, average rating))
movieIDsWithAvgRatingsRDD = movieIDsWithRatingsRDD.map(getCountsAndAverages)
print 'movieIDsWithAvgRatingsRDD: %s\n' % movieIDsWithAvgRatingsRDD.take(3)

movieIDsWithAvgRatingsRDD: [(2, (332, 3.1746987951807228)), (4, (71, 2.676056338028169)), (6, (442, 3.7918552036199094))]



In [57]:
# To `movieIDsWithAvgRatingsRDD`, apply RDD transformations that use `moviesRDD` to get the movie
# names for `movieIDsWithAvgRatingsRDD`, yielding tuples of the form
# (average rating, movie name, number of ratings)
moviesRDD.join(movieIDsWithAvgRatingsRDD).take(2)
movieNameWithAvgRatingsRDD = moviesRDD.join(movieIDsWithAvgRatingsRDD).map(lambda l: (l[1][1][1], l[1][0], l[1][1][0]))
print 'movieNameWithAvgRatingsRDD: %s\n' % movieNameWithAvgRatingsRDD.takeOrdered(3)

movieNameWithAvgRatingsRDD: [(1.0, u'Autopsy (Macchie Solari) (1975)', 1), (1.0, u'Better Living (1998)', 1), (1.0, u'Big Squeeze, The (1996)', 3)]



In [58]:
# Apply an RDD transformation to `movieNameWithAvgRatingsRDD` to limit the results to movies with
# ratings from more than 500 people. We then use the `sortFunction()` helper function to sort by the
# average rating to get the movies in order of their rating (highest rating first)
movieLimitedAndSortedByRatingRDD = (movieNameWithAvgRatingsRDD
                                    .filter(lambda l: l[2]>500)
                                    .sortBy(sortFunction, False))
print 'Movies with highest ratings: %s' % movieLimitedAndSortedByRatingRDD.take(20)

Movies with highest ratings: [(4.5349264705882355, u'Shawshank Redemption, The (1994)', 1088), (4.5157984628522634, u"Schindler's List (1993)", 1171), (4.5128939828080226, u'Godfather, The (1972)', 1047), (4.510460251046025, u'Raiders of the Lost Ark (1981)', 1195), (4.5054151624548737, u'Usual Suspects, The (1995)', 831), (4.4572564612326042, u'Rear Window (1954)', 503), (4.4546850998463903, u'Dr. Strangelove or: How I Learned to Stop Worrying and Love the Bomb (1963)', 651), (4.4395300621976501, u'Star Wars: Episode IV - A New Hope (1977)', 1447), (4.4000000000000004, u'Sixth Sense, The (1999)', 1110), (4.3942857142857141, u'North by Northwest (1959)', 700), (4.3795066413662243, u'Citizen Kane (1941)', 527), (4.375, u'Casablanca (1942)', 776), (4.363975155279503, u'Godfather: Part II, The (1974)', 805), (4.3588162762022193, u"One Flew Over the Cuckoo's Nest (1975)", 811), (4.3581730769230766, u'Silence of the Lambs, The (1991)', 1248), (4.3358264771877337, u'Saving Private Ryan (1998

##  Collaborative Filtering

In [6]:
trainingRDD, validationRDD, testRDD = ratingsRDD.randomSplit([6, 2, 2], seed=0L)

print 'Training: %s, validation: %s, test: %s\n' % (trainingRDD.count(),
                                                    validationRDD.count(),
                                                    testRDD.count())
print trainingRDD.take(3)
print validationRDD.take(3)
print testRDD.take(3)

assert trainingRDD.count() == 292716
assert validationRDD.count() == 96902
assert testRDD.count() == 98032

assert trainingRDD.filter(lambda t: t == (1, 914, 3.0)).count() == 1
assert trainingRDD.filter(lambda t: t == (1, 2355, 5.0)).count() == 1
assert trainingRDD.filter(lambda t: t == (1, 595, 5.0)).count() == 1

assert validationRDD.filter(lambda t: t == (1, 1287, 5.0)).count() == 1
assert validationRDD.filter(lambda t: t == (1, 594, 4.0)).count() == 1
assert validationRDD.filter(lambda t: t == (1, 1270, 5.0)).count() == 1

assert testRDD.filter(lambda t: t == (1, 1193, 5.0)).count() == 1
assert testRDD.filter(lambda t: t == (1, 2398, 4.0)).count() == 1
assert testRDD.filter(lambda t: t == (1, 1035, 5.0)).count() == 1

Training: 292716, validation: 96902, test: 98032

[(1, 914, 3.0), (1, 2355, 5.0), (1, 595, 5.0)]
[(1, 1287, 5.0), (1, 594, 4.0), (1, 1270, 5.0)]
[(1, 1193, 5.0), (1, 2398, 4.0), (1, 1035, 5.0)]


In [16]:
import math
from operator import add

def computeError(predictedRDD, actualRDD):
    """ Compute the root mean squared error between predicted and actual
    Args:
        predictedRDD: predicted ratings for each movie and each user where each entry is in the form
                      (UserID, MovieID, Rating)
        actualRDD: actual ratings where each entry is in the form (UserID, MovieID, Rating)
    Returns:
        RSME (float): computed RSME value
    """
    # Transform predictedRDD into the tuples of the form ((UserID, MovieID), Rating)
    predictedReformattedRDD = predictedRDD.map(lambda l:((l[0], l[1]), l[2]))

    # Transform actualRDD into the tuples of the form ((UserID, MovieID), Rating)
    actualReformattedRDD = actualRDD.map(lambda l:((l[0], l[1]), l[2]))

    # Compute the squared error for each matching entry (i.e., the same (User ID, Movie ID) in each
    # RDD) in the reformatted RDDs using RDD transformtions - do not use collect()
    squaredErrorsRDD = (predictedReformattedRDD
                        .join(actualReformattedRDD).map(lambda l:((l[1][0]-l[1][1])*(l[1][0]-l[1][1]))))
    
    # Compute the total squared error - do not use collect()
    totalError = squaredErrorsRDD.reduce(add)

    # Count the number of entries for which you computed the total squared error
    numRatings = squaredErrorsRDD.count()
    
    # Using the total squared error and the number of entries, compute the RSME
    return sqrt(totalError/numRatings)


# sc.parallelize turns a Python list into a Spark RDD.
testPredicted = sc.parallelize([
    (1, 1, 5),
    (1, 2, 3),
    (1, 3, 4),
    (2, 1, 3),
    (2, 2, 2),
    (2, 3, 4)])
testActual = sc.parallelize([
     (1, 2, 3),
     (1, 3, 5),
     (2, 1, 5),
     (2, 2, 1)])
testPredicted2 = sc.parallelize([
     (2, 2, 5),
     (1, 2, 5)])
testError = computeError(testPredicted, testActual)
print 'Error for test dataset (should be 1.22474487139): %s' % testError

testError2 = computeError(testPredicted2, testActual)
print 'Error for test dataset2 (should be 3.16227766017): %s' % testError2

testError3 = computeError(testActual, testActual)
print 'Error for testActual dataset (should be 0.0): %s' % testError3

[0, 1, 1, 4]
Error for test dataset (should be 1.22474487139): 1.0
[4, 16]
Error for test dataset2 (should be 3.16227766017): 3.16227766017
[0, 0, 0, 0]
Error for testActual dataset (should be 0.0): 0.0


In [29]:
from pyspark.mllib.recommendation import ALS

validationForPredictRDD = validationRDD.map(lambda l:(l[0], l[1]))
print trainingRDD.take(2)
print validationForPredictRDD.take(2)

seed = 5L
iterations = 5
regularizationParameter = 0.1
ranks = [4, 8, 12]
errors = [0, 0, 0]
err = 0
tolerance = 0.03

minError = float('inf')
bestRank = -1
bestIteration = -1
for rank in ranks:
    model = ALS.train(trainingRDD, rank, seed=seed, iterations=iterations,
                      lambda_=regularizationParameter)
    predictedRatingsRDD = model.predictAll(validationForPredictRDD)
    error = computeError(predictedRatingsRDD, validationRDD)
    errors[err] = error
    err += 1
    print 'For rank %s the RMSE is %s' % (rank, error)
    if error < minError:
        minError = error
        bestRank = rank

print 'The best model was trained with rank %s' % bestRank

[(1, 914, 3.0), (1, 2355, 5.0)]
[(1, 1287), (1, 594)]
[0.9919850913761287, 0.7308841607618389, 0.4447787839044933, 0.020703528101573377, 6.253586766206488, 1.9486400211930441, 0.652154801621091, 0.021861424145762573, 0.004193419403361522, 0.07208473150414463]
For rank 4 the RMSE is 0.901241353926
[0.8123632518359044, 0.5399659496228423, 0.6717832899640659, 8.853286114449032e-05, 6.816273009154465, 1.5821849362534983, 0.780415662891169, 0.07491699185589988, 0.005722882454294841, 0.0004617349348192543]
For rank 8 the RMSE is 0.887031659461
[0.3849102233192685, 0.5017035995104497, 1.076973948839098, 0.060668702870516504, 6.810756160764324, 1.769657772976631, 0.5412144701096345, 0.23090620211467877, 0.00017576550699966538, 1.4196528738265675]
For rank 12 the RMSE is 0.892111420368
The best model was trained with rank 8


In [26]:
trainingRDD.getNumPartitions()
validationForPredictRDD.count()
validationForPredictRDD.filter(lambda t: t == (1, 1907)).count()

1

In [27]:
errors[0]

0.90124135392598292

In [31]:
myModel = ALS.train(trainingRDD, 8, seed=seed, iterations=iterations,
                      lambda_=regularizationParameter)
testForPredictingRDD = testRDD.map(lambda l:(l[0], l[1]))
predictedTestRDD = myModel.predictAll(testForPredictingRDD)

testRMSE = computeError(testRDD, predictedTestRDD)

print 'The model had a RMSE on the test set of %s' % testRMSE

[0.3833259498947061, 1.4289609196277218, 0.43414129417550756, 1.852812427764695, 0.4308278638385558, 1.9423366362829346, 0.15945296700715988, 3.5456966725461423, 0.04572008933769131, 0.055543183180212834]
The model had a RMSE on the test set of 0.8891071528


In [39]:
print trainingRDD.take(10)
trainingAvgRating = trainingRDD.map(lambda l:l[2]).reduce(add)/trainingRDD.count()
print 'The average rating for movies in the training set is %s' % trainingAvgRating

testRDD.take(3)

# testForAvgRDD = testRDD.<FILL IN>
# testAvgRMSE = computeError(testRDD, testForAvgRDD)
# print 'The RMSE on the average set is %s' % testAvgRMSE



[(1, 914, 3.0), (1, 2355, 5.0), (1, 595, 5.0), (1, 2321, 3.0), (1, 1545, 4.0), (1, 2294, 4.0), (1, 1566, 4.0), (1, 1, 5.0), (1, 260, 4.0), (1, 2028, 5.0)]
The average rating for movies in the training set is 3.57409571052


[(1, 1193, 5.0), (1, 2398, 4.0), (1, 1035, 5.0)]

In [59]:
print 'Most rated movies:'
print '(average rating, movie name, number of reviews)'
for ratingsTuple in movieLimitedAndSortedByRatingRDD.take(50):
    print ratingsTuple

Most rated movies:
(average rating, movie name, number of reviews)
(4.5349264705882355, u'Shawshank Redemption, The (1994)', 1088)
(4.5157984628522634, u"Schindler's List (1993)", 1171)
(4.5128939828080226, u'Godfather, The (1972)', 1047)
(4.510460251046025, u'Raiders of the Lost Ark (1981)', 1195)
(4.5054151624548737, u'Usual Suspects, The (1995)', 831)
(4.4572564612326042, u'Rear Window (1954)', 503)
(4.4546850998463903, u'Dr. Strangelove or: How I Learned to Stop Worrying and Love the Bomb (1963)', 651)
(4.4395300621976501, u'Star Wars: Episode IV - A New Hope (1977)', 1447)
(4.4000000000000004, u'Sixth Sense, The (1999)', 1110)
(4.3942857142857141, u'North by Northwest (1959)', 700)
(4.3795066413662243, u'Citizen Kane (1941)', 527)
(4.375, u'Casablanca (1942)', 776)
(4.363975155279503, u'Godfather: Part II, The (1974)', 805)
(4.3588162762022193, u"One Flew Over the Cuckoo's Nest (1975)", 811)
(4.3581730769230766, u'Silence of the Lambs, The (1991)', 1248)
(4.3358264771877337, u'Sav

In [61]:
myUserID = 0

# Note that the movie IDs are the *last* number on each line. A common error was to use the number of ratings as the movie ID.
myRatedMovies = [
    (myUserID, 1447, 5),
    (myUserID, 860, 5),
    (myUserID, 603, 5),
    (myUserID, 993, 5),
    (myUserID, 941, 5)

     #   (myUserID, 260, 5),
    ]
myRatingsRDD = sc.parallelize(myRatedMovies)
print 'My movie ratings: %s' % myRatingsRDD.take(10)

My movie ratings: [(0, 1447, 5), (0, 860, 5), (0, 603, 5), (0, 993, 5), (0, 941, 5)]


In [66]:
myRatedMoviesRDD = sc.parallelize(myRatedMovies)
trainingWithMyRatingsRDD = trainingRDD.union(myRatedMoviesRDD)

print ('The training dataset now has %s more entries than the original training dataset' %
       (trainingWithMyRatingsRDD.count() - trainingRDD.count()))
assert (trainingWithMyRatingsRDD.count() - trainingRDD.count()) == myRatingsRDD.count()


The training dataset now has 5 more entries than the original training dataset


In [64]:
bestRank

8

In [65]:
myRatingsModel = ALS.train(trainingWithMyRatingsRDD, bestRank, seed=seed, iterations=iterations,
                      lambda_=regularizationParameter)

In [70]:
testMyRatedMoviesRDD = myRatedMoviesRDD.map(lambda l:(l[0], l[1]))
predictedTestMyRatingsRDD = myRatingsModel.predictAll(testMyRatedMoviesRDD)
print predictedTestMyRatingsRDD.take(10)
testRMSEMyRatings = computeError(myRatedMoviesRDD, predictedTestMyRatingsRDD)
print 'The model had a RMSE on the test set of %s' % testRMSEMyRatings

[Rating(user=0, product=860, rating=4.804611337773756), Rating(user=0, product=941, rating=4.81914361956077), Rating(user=0, product=993, rating=4.726090632544357), Rating(user=0, product=603, rating=4.75252466353325), Rating(user=0, product=1447, rating=4.849740482510428)]
[0.0750263415799507, 0.061244042159331186, 0.032709030345579426, 0.03817672932656125, 0.022577922596199063]
The model had a RMSE on the test set of 0.21435207767
