In [None]:
import sys
from pyspark import SparkConf, SparkContext
from math import sqrt

In [None]:
def loadMovieNames():
    movieNames = {}
    with open("ml-100k/u.ITEM", encoding='ascii', errors='ignore') as f:
        for line in f:
            fields = line.split('|')
            movieNames[int(fields[0])] = fields[1]
    return movieNames

ython 3 doesn't let you pass around unpacked tuples,<br>
o we explicitly extract the ratings now.

In [None]:
def makePairs( userRatings ):
    ratings = userRatings[1]
    (movie1, rating1) = ratings[0]
    (movie2, rating2) = ratings[1]
    return ((movie1, movie2), (rating1, rating2))

In [None]:
def filterDuplicates( userRatings ):
    ratings = userRatings[1]
    (movie1, rating1) = ratings[0]
    (movie2, rating2) = ratings[1]
    return movie1 < movie2

In [None]:
def computeCosineSimilarity(ratingPairs):
    numPairs = 0
    sum_xx = sum_yy = sum_xy = 0
    for ratingX, ratingY in ratingPairs:
        sum_xx += ratingX * ratingX
        sum_yy += ratingY * ratingY
        sum_xy += ratingX * ratingY
        numPairs += 1
    numerator = sum_xy
    denominator = sqrt(sum_xx) * sqrt(sum_yy)
    score = 0
    if (denominator):
        score = (numerator / (float(denominator)))
    return (score, numPairs)

In [None]:
conf = SparkConf().setMaster("local[*]").setAppName("MovieSimilarities")
sc = SparkContext(conf = conf)

In [None]:
print("\nLoading movie names...")
nameDict = loadMovieNames()

In [None]:
data = sc.textFile("file:///SparkCourse/ml-100k/u.data")

Map ratings to key / value pairs: user ID => movie ID, rating

In [None]:
ratings = data.map(lambda l: l.split()).map(lambda l: (int(l[0]), (int(l[1]), float(l[2]))))

Emit every movie rated together by the same user.<br>
Self-join to find every combination.

In [None]:
joinedRatings = ratings.join(ratings)

At this point our RDD consists of userID => ((movieID, rating), (movieID, rating))

Filter out duplicate pairs

In [None]:
uniqueJoinedRatings = joinedRatings.filter(filterDuplicates)

Now key by (movie1, movie2) pairs.

In [None]:
moviePairs = uniqueJoinedRatings.map(makePairs)

We now have (movie1, movie2) => (rating1, rating2)<br>
Now collect all ratings for each movie pair and compute similarity

In [None]:
moviePairRatings = moviePairs.groupByKey()

We now have (movie1, movie2) = > (rating1, rating2), (rating1, rating2) ...<br>
Can now compute similarities.

In [None]:
moviePairSimilarities = moviePairRatings.mapValues(computeCosineSimilarity).cache()

Save the results if desired<br>
oviePairSimilarities.sortByKey()<br>
oviePairSimilarities.saveAsTextFile("movie-sims")

Extract similarities for the movie we care about that are "good".

In [None]:
if (len(sys.argv) > 1):
    scoreThreshold = 0.97
    coOccurenceThreshold = 50
    movieID = int(sys.argv[1])

    # Filter for movies with this sim that are "good" as defined by
    # our quality thresholds above
    filteredResults = moviePairSimilarities.filter(lambda pairSim: \
        (pairSim[0][0] == movieID or pairSim[0][1] == movieID) \
        and pairSim[1][0] > scoreThreshold and pairSim[1][1] > coOccurenceThreshold)

    # Sort by quality score.
    results = filteredResults.map(lambda pairSim: (pairSim[1], pairSim[0])).sortByKey(ascending = False).take(10)
    print("Top 10 similar movies for " + nameDict[movieID])
    for result in results:
        (sim, pair) = result
        # Display the similarity result that isn't the movie we're looking at
        similarMovieID = pair[0]
        if (similarMovieID == movieID):
            similarMovieID = pair[1]
        print(nameDict[similarMovieID] + "\tscore: " + str(sim[0]) + "\tstrength: " + str(sim[1]))