In [1]:
import sys
from pyspark import SparkConf, SparkContext
from math import sqrt


In [13]:
def loadMovieNames(path="/home/cloudera/Downloads/ml-100k/u.item"):
    """Build a movie-ID -> title lookup from a pipe-delimited item file.

    Args:
        path: Location of the item file. Each line is expected to start with
            ``<movieID>|<title>|...``. Defaults to the path this notebook
            has always used.

    Returns:
        dict mapping int movie ID to its title, with any non-ASCII bytes in
        the title dropped.

    Raises:
        IOError/OSError: if the file cannot be opened.
        ValueError: if a line's first field is not an integer.
    """
    movieNames = {}
    # Read raw bytes so the ASCII-with-ignore decode below behaves the same on
    # Python 2 and Python 3, whatever encoding the file is actually in.
    with open(path, "rb") as f:
        for line in f:
            fields = line.split(b'|')
            if len(fields) < 2:
                # Blank or truncated line: no title field to record.
                continue
            movieNames[int(fields[0])] = fields[1].decode('ascii', 'ignore')
    return movieNames

In [3]:
def makePairs(userRatings):
    """Re-key one joined record by its movie pair.

    Args:
        userRatings: A ``(user, ratings)`` tuple where ``ratings`` holds two
            ``(movieID, rating)`` entries.

    Returns:
        ``((movie1, movie2), (rating1, rating2))``; the user ID is dropped.
    """
    # Unpack inside the body: tuple parameters in the signature are
    # Python-2-only syntax (removed by PEP 3113).
    _user, ratings = userRatings
    (movie1, rating1) = ratings[0]
    (movie2, rating2) = ratings[1]
    return ((movie1, movie2), (rating1, rating2))

In [4]:
def filterDuplicates(userRatings):
    """Tell whether a joined record should be kept as the canonical pair.

    Args:
        userRatings: A ``(userID, ratings)`` tuple where ``ratings`` holds two
            ``(movieID, rating)`` entries.

    Returns:
        True only when the first movie ID is strictly less than the second,
        which rejects self-pairs and the mirrored copy of each pair.
    """
    # Unpack inside the body: tuple parameters in the signature are
    # Python-2-only syntax (removed by PEP 3113).
    _userID, ratings = userRatings
    movie1 = ratings[0][0]
    movie2 = ratings[1][0]
    return movie1 < movie2

In [5]:
def computeCosineSimilarity(ratingPairs):
    """Cosine similarity between two rating vectors given as paired values.

    Args:
        ratingPairs: Iterable of ``(ratingX, ratingY)`` tuples; it is
            consumed in a single pass.

    Returns:
        ``(score, numPairs)``: the cosine of the angle between the X and Y
        vectors (``0`` when either vector has zero magnitude) and the number
        of pairs seen.
    """
    pairCount = 0
    dotXX = 0
    dotYY = 0
    dotXY = 0
    for x, y in ratingPairs:
        dotXX += x * x
        dotYY += y * y
        dotXY += x * y
        pairCount += 1

    magnitude = sqrt(dotXX) * sqrt(dotYY)

    # A zero magnitude leaves the cosine undefined; report 0 in that case.
    if not magnitude:
        return (0, pairCount)

    return (dotXY / (float(magnitude)), pairCount)

In [7]:
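# NOTE: `sc` is used below but is never created in this notebook; it must
# already exist in the session (e.g. one started through the pyspark shell).
# Uncomment the two lines below when no SparkContext is provided.
#conf = SparkConf().setMaster("local[*]").setAppName("MovieSimilarities")
#sc = SparkContext(conf = conf)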

In [14]:
# Build the movieID -> title lookup used when printing results.
# print is called in function form with a single argument, so the output is
# the same on Python 2 and the line is also valid on Python 3.
print("\nLoading movie names...")
nameDict = loadMovieNames()


Loading movie names...


In [15]:
# Read the u.data file from the local filesystem (file: URI) as an RDD of lines.
# `sc` is not defined in this notebook; it must already exist in the session.
data = sc.textFile("file:/home/cloudera/Downloads/ml-100k/u.data")

In [16]:
# Parse every whitespace-delimited line into a key/value record:
# userID => (movieID, rating)
ratings = (
    data
    .map(lambda line: line.split())
    .map(lambda cols: (int(cols[0]), (int(cols[1]), float(cols[2]))))
)

In [20]:
# Sanity check: show the first 10 parsed (userID, (movieID, rating)) records.
ratings.take(10)

[(196, (242, 3.0)),
 (186, (302, 3.0)),
 (22, (377, 1.0)),
 (244, (51, 2.0)),
 (166, (346, 1.0)),
 (298, (474, 4.0)),
 (115, (265, 2.0)),
 (253, (465, 5.0)),
 (305, (451, 3.0)),
 (6, (86, 3.0))]

In [21]:
# Emit every pair of movies rated by the same user.
# Self-joining on userID yields every combination of that user's ratings,
# including a movie paired with itself and both orderings of each pair.
joinedRatings = ratings.join(ratings)

In [23]:
# Sanity check: show the first 20 joined records.
joinedRatings.take(20)

[(2, ((292, 4.0), (292, 4.0))),
 (2, ((292, 4.0), (251, 5.0))),
 (2, ((292, 4.0), (50, 5.0))),
 (2, ((292, 4.0), (314, 1.0))),
 (2, ((292, 4.0), (297, 4.0))),
 (2, ((292, 4.0), (290, 3.0))),
 (2, ((292, 4.0), (312, 3.0))),
 (2, ((292, 4.0), (281, 3.0))),
 (2, ((292, 4.0), (13, 4.0))),
 (2, ((292, 4.0), (280, 3.0))),
 (2, ((292, 4.0), (303, 4.0))),
 (2, ((292, 4.0), (308, 3.0))),
 (2, ((292, 4.0), (307, 3.0))),
 (2, ((292, 4.0), (257, 4.0))),
 (2, ((292, 4.0), (316, 5.0))),
 (2, ((292, 4.0), (315, 1.0))),
 (2, ((292, 4.0), (301, 4.0))),
 (2, ((292, 4.0), (313, 5.0))),
 (2, ((292, 4.0), (279, 4.0))),
 (2, ((292, 4.0), (299, 4.0)))]

In [24]:
# At this point our RDD consists of userID => ((movieID, rating), (movieID, rating))
# Filter out duplicate pairs: keep only records whose first movie ID is lower
# than the second, which drops self-pairs and the mirrored copy of each pair.
uniqueJoinedRatings = joinedRatings.filter(filterDuplicates)

In [25]:
# Sanity check: show the first 10 records that survived the duplicate filter.
uniqueJoinedRatings.take(10)

[(2, ((292, 4.0), (314, 1.0))),
 (2, ((292, 4.0), (297, 4.0))),
 (2, ((292, 4.0), (312, 3.0))),
 (2, ((292, 4.0), (303, 4.0))),
 (2, ((292, 4.0), (308, 3.0))),
 (2, ((292, 4.0), (307, 3.0))),
 (2, ((292, 4.0), (316, 5.0))),
 (2, ((292, 4.0), (315, 1.0))),
 (2, ((292, 4.0), (301, 4.0))),
 (2, ((292, 4.0), (313, 5.0)))]

In [26]:
# Now key by (movie1, movie2) pairs, dropping the user ID:
# (movie1, movie2) => (rating1, rating2)
moviePairs = uniqueJoinedRatings.map(makePairs)

In [28]:
# Sanity check: show the first 20 ((movie1, movie2), (rating1, rating2)) records.
moviePairs.take(20)

[((292, 314), (4.0, 1.0)),
 ((292, 297), (4.0, 4.0)),
 ((292, 312), (4.0, 3.0)),
 ((292, 303), (4.0, 4.0)),
 ((292, 308), (4.0, 3.0)),
 ((292, 307), (4.0, 3.0)),
 ((292, 316), (4.0, 5.0)),
 ((292, 315), (4.0, 1.0)),
 ((292, 301), (4.0, 4.0)),
 ((292, 313), (4.0, 5.0)),
 ((292, 299), (4.0, 4.0)),
 ((292, 298), (4.0, 3.0)),
 ((292, 295), (4.0, 4.0)),
 ((292, 305), (4.0, 3.0)),
 ((292, 293), (4.0, 4.0)),
 ((292, 294), (4.0, 1.0)),
 ((292, 310), (4.0, 4.0)),
 ((292, 309), (4.0, 1.0)),
 ((292, 306), (4.0, 4.0)),
 ((292, 311), (4.0, 5.0))]

In [29]:
# We now have (movie1, movie2) => (rating1, rating2)
# Group by movie pair so each key holds every (rating1, rating2) tuple
# recorded for that pair; similarity is computed from these in the next step.
moviePairRatings = moviePairs.groupByKey()

In [30]:
# Sanity check: show the first 20 grouped records. The grouped values display
# as ResultIterable objects rather than as the rating tuples themselves.
moviePairRatings.take(20)

[((505, 1131), <pyspark.resultiterable.ResultIterable at 0x7fe1b1677c50>),
 ((218, 1664), <pyspark.resultiterable.ResultIterable at 0x7fe1b1677e10>),
 ((388, 846), <pyspark.resultiterable.ResultIterable at 0x7fe1b1677e50>),
 ((320, 652), <pyspark.resultiterable.ResultIterable at 0x7fe1b1677e90>),
 ((1038, 1132), <pyspark.resultiterable.ResultIterable at 0x7fe1b1677ed0>),
 ((490, 1108), <pyspark.resultiterable.ResultIterable at 0x7fe1b1677f10>),
 ((1062, 1110), <pyspark.resultiterable.ResultIterable at 0x7fe1b1677f50>),
 ((681, 1669), <pyspark.resultiterable.ResultIterable at 0x7fe1b1677f90>),
 ((293, 759), <pyspark.resultiterable.ResultIterable at 0x7fe1b1677fd0>),
 ((114, 404), <pyspark.resultiterable.ResultIterable at 0x7fe1b167d050>),
 ((1058, 1568), <pyspark.resultiterable.ResultIterable at 0x7fe1b167d090>),
 ((142, 996), <pyspark.resultiterable.ResultIterable at 0x7fe1b167d0d0>),
 ((440, 1084), <pyspark.resultiterable.ResultIterable at 0x7fe1b167d110>),
 ((330, 588), <pyspark.resu

In [31]:
# We now have (movie1, movie2) => (rating1, rating2), (rating1, rating2) ...
# Compute the similarity of each pair: (movie1, movie2) => (score, numPairs).
# cache() marks the result for reuse by later actions on this RDD.
moviePairSimilarities = moviePairRatings.mapValues(computeCosineSimilarity).cache()


In [32]:
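# Save the results if desired.
# NOTE: sortByKey() returns a new RDD and does not sort in place, so the save
# must be chained onto it for the sorted output to be written:
#moviePairSimilarities.sortByKey().saveAsTextFile("movie-sims")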

In [35]:
# Movie to find recommendations for. It is set directly rather than written
# into sys.argv: assigning sys.argv[1] raises IndexError when argv has only
# one entry, and it overwrites process-wide state just to pass one value.
movieID = 54

# Quality thresholds: minimum similarity score and minimum number of
# co-ratings a pair needs before it counts as a "good" match.
scoreThreshold = 0.97
coOccurenceThreshold = 50

# Keep only pairs that involve movieID and beat both thresholds.
# Each element is ((movie1, movie2), (score, numPairs)). It is indexed here
# because tuple-unpacking in a lambda signature is Python-2-only syntax.
filteredResults = moviePairSimilarities.filter(
    lambda pairSim: (pairSim[0][0] == movieID or pairSim[0][1] == movieID)
    and pairSim[1][0] > scoreThreshold
    and pairSim[1][1] > coOccurenceThreshold)

# Sort by quality score: swap to (sim, pair) so the similarity is the sort key.
results = (filteredResults
           .map(lambda pairSim: (pairSim[1], pairSim[0]))
           .sortByKey(ascending=False)
           .take(10))

print("Top 10 similar movies for " + nameDict[movieID])
for (sim, pair) in results:
    # Display the similarity result that isn't the movie we're looking at
    similarMovieID = pair[0]
    if similarMovieID == movieID:
        similarMovieID = pair[1]
    print(nameDict[similarMovieID] + "\tscore: " + str(sim[0]) + "\tstrength: " + str(sim[1]))

Top 10 similar movies for Outbreak (1995)
Crimson Tide (1995)	score: 0.976323576179	strength: 68
Time to Kill, A (1996)	score: 0.976199912697	strength: 57
Ransom (1996)	score: 0.972265504235	strength: 67
In the Line of Fire (1993)	score: 0.971513119024	strength: 61
