# Collaborative Filtering

We use collaborative filtering to measure how similar movies are to one another, based on user ratings, and use those similarities to suggest movies.

New methods used here:

- ``mapValues``: passes each value in a key-value pair RDD through a map function without changing the keys.
- ``cache``: persists the RDD with the default storage level (``MEMORY_ONLY``).
- ``take``: returns the first ``n`` elements of the RDD.
- ``local[*]`` as the argument to ``setMaster``: runs Spark in local mode with as many worker threads as the machine has logical cores, so more than one core is used.
- ``saveAsTextFile``: saves the RDD as text under the given path, here a directory in the current folder. It writes one part file per partition.

The code also chains several ``map`` calls.
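
The behaviour of ``mapValues``, ``take`` and a chain of ``map`` calls can be sketched in plain Python without starting Spark. The helper ``map_values`` below is a hypothetical stand-in, not a PySpark function, and the three input lines are made up; the sketch only mimics what the RDD methods do to each element.

```python
# Plain-Python stand-in for RDD.mapValues: apply f to each value, keep the keys.
# map_values is a hypothetical helper, not part of PySpark.
def map_values(pairs, f):
    return [(key, f(value)) for key, value in pairs]

# Three made-up lines in the u.data layout: user ID, movie ID, rating, timestamp.
lines = ["1 50 5 100", "1 172 4 101", "2 50 3 102"]

# Chain of two maps, mirroring ratings = data.map(...).map(...)
fields = map(lambda l: l.split(), lines)
ratings = list(map(lambda l: (int(l[0]), (int(l[1]), float(l[2]))), fields))

# Keys (user IDs) stay untouched; only the values change.
rating_only = map_values(ratings, lambda movie_rating: movie_rating[1])

# Analogue of take(2): the first two elements.
first_two = ratings[:2]

print(ratings)
print(rating_only)
print(first_two)
```

On a real RDD the same transformations are lazy and run per partition; the list-based version here is eager and only illustrates the shape of the data.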

In [1]:
import findspark
findspark.init()

import sys
from pyspark import SparkConf, SparkContext
from math import sqrt

conf = SparkConf().setMaster("local[*]").setAppName("MovieSimilarities")
sc = SparkContext(conf = conf)

In [2]:
def loadMovieNames():
    movieNames = {}
    with open("c:/SparkCourse/ml-100k/u.ITEM", encoding='ascii', errors='ignore') as f:
        for line in f:
            fields = line.split('|')
            movieNames[int(fields[0])] = fields[1]
    return movieNames

# Python 3 removed tuple unpacking in function parameters,
# so we unpack the ratings explicitly inside the function.
def makePairs( userRatings ):
    ratings = userRatings[1]
    (movie1, rating1) = ratings[0]
    (movie2, rating2) = ratings[1]
    return ((movie1, movie2), (rating1, rating2))

def filterDuplicates( userRatings ): # handy way to drop duplicate pairs after a self-join in Spark
    ratings = userRatings[1]
    (movie1, rating1) = ratings[0]
    (movie2, rating2) = ratings[1]
    return movie1 < movie2 # of the two orderings the join produces, keep the one with the smaller movie ID first
# this also drops pairs that match a movie with itself


def computeCosineSimilarity(ratingPairs):
    numPairs = 0
    sum_xx = sum_yy = sum_xy = 0
    for ratingX, ratingY in ratingPairs:
        sum_xx += ratingX * ratingX
        sum_yy += ratingY * ratingY
        sum_xy += ratingX * ratingY
        numPairs += 1

    numerator = sum_xy
    denominator = sqrt(sum_xx) * sqrt(sum_yy)

    score = 0
    if (denominator):
        score = (numerator / (float(denominator)))

    return (score, numPairs)



print("\nLoading movie names...")
nameDict = loadMovieNames()

data = sc.textFile("file:///SparkCourse/ml-100k/u.data")

# Map ratings to key / value pairs: user ID => movie ID, rating
ratings = data.map(lambda l: l.split()).map(lambda l: (int(l[0]), (int(l[1]), float(l[2]))))

# Emit every movie rated together by the same user.
# Self-join to find every combination.
joinedRatings = ratings.join(ratings)
print("\nDataset is self-joined...")
# At this point our RDD consists of userID => ((movieID, rating), (movieID, rating))

# Filter out duplicate pairs
uniqueJoinedRatings = joinedRatings.filter(filterDuplicates)
print("\nDuplicates are filtered...")

# Now key by (movie1, movie2) pairs.
moviePairs = uniqueJoinedRatings.map(makePairs)
print("\nMovie data is anonymized, no userid...")

# We now have (movie1, movie2) => (rating1, rating2)
# Now collect all ratings for each movie pair and compute similarity
moviePairRatings = moviePairs.groupByKey()

# We now have (movie1, movie2) => (rating1, rating2), (rating1, rating2) ...
# Can now compute similarities.
moviePairSimilarities = moviePairRatings.mapValues(computeCosineSimilarity).cache()

# Save the results if desired (sortByKey returns a new RDD, so chain the calls)
#moviePairSimilarities.sortByKey().saveAsTextFile("movie-sims")



Loading movie names...

Dataset is self-joined...

Duplicates are filtered...

Movie data is anonymized, no userid...
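
The steps in the cell above (self-join, duplicate filter, re-keying by movie pair, grouping) can be traced on a tiny made-up data set in plain Python. This is only a sketch of the data flow with no Spark involved; the ratings and variable names are assumptions for illustration.

```python
from collections import defaultdict
from itertools import product

# Made-up ratings keyed by user ID: (user, (movie, rating)).
ratings = [
    (1, (10, 5.0)), (1, (20, 4.0)),
    (2, (10, 3.0)), (2, (20, 2.0)), (2, (30, 4.0)),
]

# Self-join on user ID: every ordered pair of ratings from the same user.
by_user = defaultdict(list)
for user, movie_rating in ratings:
    by_user[user].append(movie_rating)
joined = [(user, pair) for user, rs in by_user.items() for pair in product(rs, rs)]

# Same test as filterDuplicates: keep a pair only when movie1 < movie2.
unique = [(user, (a, b)) for user, (a, b) in joined if a[0] < b[0]]

# Same re-keying as makePairs, followed by a groupByKey analogue.
grouped = defaultdict(list)
for _user, ((movie1, rating1), (movie2, rating2)) in unique:
    grouped[(movie1, movie2)].append((rating1, rating2))

print(len(joined), len(unique))
print(dict(grouped))
```

The self-join yields 4 pairs for the user with two ratings and 9 for the user with three, and the filter keeps only one ordering of each distinct movie pair. The quadratic growth per user is the reason the join is the expensive step on the full data set.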


In [7]:

# Extract similarities for the movie we care about that are "good".
scoreThreshold = 0.975
coOccurenceThreshold = 70

movieID = 178 # 12 Angry Men

# Filter for movies with this sim that are "good" as defined by
# our quality thresholds above (filtering all at the same time)
filteredResults = moviePairSimilarities.filter(lambda pairSim:
    (pairSim[0][0] == movieID or pairSim[0][1] == movieID)
    and pairSim[1][0] > scoreThreshold and pairSim[1][1] > coOccurenceThreshold)

# Sort by quality score.
results = filteredResults.map(lambda pairSim: (pairSim[1], pairSim[0])).sortByKey(ascending = False).take(10)

print("Top 10 similar movies for " + nameDict[movieID])
for result in results:
    (sim, pair) = result
    # Display the similarity result that isn't the movie we're looking at
    similarMovieID = pair[0]
    if (similarMovieID == movieID):
        similarMovieID = pair[1]
    print(nameDict[similarMovieID] + "\tscore: " + str(sim[0]) + "\tstrength: " + str(sim[1]))

Top 10 similar movies for 12 Angry Men (1957)
Casablanca (1942)	score: 0.9814271702667515	strength: 75
Sting, The (1973)	score: 0.9800565289675167	strength: 78
Amadeus (1984)	score: 0.9779135349052294	strength: 88
One Flew Over the Cuckoo's Nest (1975)	score: 0.9778218847859345	strength: 81
Star Wars (1977)	score: 0.9776576120448436	strength: 109
Schindler's List (1993)	score: 0.977620731722541	strength: 80
Silence of the Lambs, The (1991)	score: 0.9774123548400772	strength: 97
Psycho (1960)	score: 0.9772611576478752	strength: 75
Shawshank Redemption, The (1994)	score: 0.9757950238470389	strength: 84
Citizen Kane (1941)	score: 0.9756537106878723	strength: 71
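
All ten scores above fall in a narrow band just below 1. One plausible reason is that ratings are always positive, so the cosine between two rating vectors cannot be negative and is often high. The sketch below restates the arithmetic of ``computeCosineSimilarity`` as a standalone function (named ``cosine_similarity`` here, an assumption for illustration) and applies it to two made-up sets of rating pairs.

```python
from math import sqrt, isclose

def cosine_similarity(rating_pairs):
    """Same arithmetic as computeCosineSimilarity, restated so it runs on its own."""
    sum_xx = sum_yy = sum_xy = 0.0
    num_pairs = 0
    for x, y in rating_pairs:
        sum_xx += x * x
        sum_yy += y * y
        sum_xy += x * y
        num_pairs += 1
    denominator = sqrt(sum_xx) * sqrt(sum_yy)
    # Guard against division by zero, as the original function does.
    score = sum_xy / denominator if denominator else 0.0
    return score, num_pairs

# Made-up ratings from two users who each rated both movies.
agree = cosine_similarity([(5.0, 4.0), (3.0, 2.0)])     # similar tastes
disagree = cosine_similarity([(5.0, 1.0), (1.0, 5.0)])  # opposite tastes

print(agree)
print(disagree)
```

With these made-up inputs the agreeing pair scores above 0.99 while the opposite pair scores below 0.4, which suggests why a threshold as high as 0.975 still separates results, and why the co-occurrence threshold is needed to discard high scores that rest on only a few shared ratings.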
