In [1]:
preset = 'file:///root/lab/ws/dsml-learning/spark_python_handson/dataset/'
from pyspark import SparkConf, SparkContext

conf = SparkConf().setAppName('CollaborativeFiltering') \
                  .set('spark.executor.instance', '10')
sc = SparkContext(conf = conf)

In [2]:
import sys
from math import sqrt

In [3]:
def loadMovieNames():
    movieNames = {}
    with open(preset[7:] + 'ml-100k/u.ITEM', encoding='ascii', errors='ignore') as f:
        for line in f:
            fields = line.split('|')
            movieNames[int(fields[0])] = fields[1]
    return movieNames

In [4]:
def makePairs( userRatings ):
    ratings = userRatings[1]
    (movie1, rating1) = ratings[0]
    (movie2, rating2) = ratings[1]
    return ((movie1, movie2), (rating1, rating2))

def filterDuplicates( userRatings ):
    ratings = userRatings[1]
    (movie1, rating1) = ratings[0]
    (movie2, rating2) = ratings[1]
    return movie1 < movie2

def computeCosineSimilarity(ratingPairs):
    numPairs = 0
    sum_xx = sum_yy = sum_xy = 0
    for ratingX, ratingY in ratingPairs:
        sum_xx += ratingX * ratingX
        sum_yy += ratingY * ratingY
        sum_xy += ratingX * ratingY
        numPairs += 1

    numerator = sum_xy
    denominator = sqrt(sum_xx) * sqrt(sum_yy)

    score = 0
    if (denominator):
        score = (numerator / (float(denominator)))

    return (score, numPairs)

In [5]:
nameDict = loadMovieNames()

data = sc.textFile(preset + 'ml-100k/u.data')

In [6]:
ratings = data.map(lambda l: l.split()).map(lambda l: (int(l[0]), (int(l[1]), float(l[2]))))

In [7]:
joinedRatings = ratings.join(ratings)
# At this point our RDD consists of userID => ((movieID, rating), (movieID, rating))

# Filter out duplicate pairs
uniqueJoinedRatings = joinedRatings.filter(filterDuplicates)

# Now key by (movie1, movie2) pairs.
moviePairs = uniqueJoinedRatings.map(makePairs)
# We now have (movie1, movie2) => (rating1, rating2)
# Now collect all ratings for each movie pair and compute similarity
moviePairRatings = moviePairs.groupByKey()

# We now have (movie1, movie2) = > (rating1, rating2), (rating1, rating2) ...
# Can now compute similarities.
moviePairSimilarities = moviePairRatings.mapValues(computeCosineSimilarity).cache()

# Save the results if desired
#moviePairSimilarities.sortByKey()
#moviePairSimilarities.saveAsTextFile("movie-sims")

In [8]:
moviepairs = moviePairSimilarities.collect()
%time

CPU times: user 6 µs, sys: 1 µs, total: 7 µs
Wall time: 18.6 µs


In [9]:
sc.getConf().getAll()

[('spark.driver.port', '4444'),
 ('spark.executor.memory', '512m'),
 ('spark.ui.proxyBase', '/proxy/application_1594458378641_0044'),
 ('spark.app.id', 'application_1594458378641_0044'),
 ('spark.driver.appUIAddress', 'http://node-master:4040'),
 ('spark.yarn.am.memory', '1G'),
 ('spark.executor.id', 'driver'),
 ('spark.org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.param.PROXY_HOSTS',
  'node-master'),
 ('spark.driver.memory', '512m'),
 ('spark.master', 'yarn'),
 ('spark.executorEnv.PYTHONPATH',
  '{{PWD}}/pyspark.zip<CPS>{{PWD}}/py4j-0.10.7-src.zip'),
 ('spark.app.name', 'CollaborativeFiltering'),
 ('spark.executor.instance', '10'),
 ('spark.rdd.compress', 'True'),
 ('spark.blockManager.port', '4445'),
 ('spark.serializer.objectStreamReset', '100'),
 ('spark.yarn.isPython', 'true'),
 ('spark.submit.deployMode', 'client'),
 ('spark.driver.host', 'node-master'),
 ('spark.ui.filters',
  'org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter'),
 ('spark.ui.showConsolePro

In [10]:
# Extract similarities for the movie we care about that are "good".

scoreThreshold = 0.97
coOccurenceThreshold = 50

movieID = 50

# Filter for movies with this sim that are "good" as defined by
# our quality thresholds above
filteredResults = moviePairSimilarities.filter(lambda pairSim: \
    (pairSim[0][0] == movieID or pairSim[0][1] == movieID) \
    and pairSim[1][0] > scoreThreshold and pairSim[1][1] > coOccurenceThreshold)

# Sort by quality score.
results = filteredResults.map(lambda pairSim: (pairSim[1], pairSim[0])).sortByKey(ascending = False).take(10)

print("Top 10 similar movies for " + nameDict[movieID])
for result in results:
    (sim, pair) = result
    # Display the similarity result that isn't the movie we're looking at
    similarMovieID = pair[0]
    if (similarMovieID == movieID):
        similarMovieID = pair[1]
    print(nameDict[similarMovieID] + "\tscore: " + str(sim[0]) + "\tstrength: " + str(sim[1]))

Top 10 similar movies for Star Wars (1977)
Empire Strikes Back, The (1980)	score: 0.9895522078385338	strength: 345
Return of the Jedi (1983)	score: 0.9857230861253026	strength: 480
Raiders of the Lost Ark (1981)	score: 0.981760098872619	strength: 380
20,000 Leagues Under the Sea (1954)	score: 0.9789385605497993	strength: 68
12 Angry Men (1957)	score: 0.9776576120448436	strength: 109
Close Shave, A (1995)	score: 0.9775948291054827	strength: 92
African Queen, The (1951)	score: 0.9764692222674887	strength: 138
Sting, The (1973)	score: 0.9751512937740359	strength: 204
Wrong Trousers, The (1993)	score: 0.9748681355460885	strength: 103
Wallace & Gromit: The Best of Aardman Animation (1996)	score: 0.9741816128302572	strength: 58
