# Calculate the similarities between movies
## u.data   -- The full u data set, 100000 ratings by 943 users on 1682 items.
Each user has rated at least 20 movies.  Users and items are numbered consecutively from 1.  The data is randomly ordered. This is a tab separated list of  
user id | item id | rating | timestamp. 
The time stamps are unix seconds since 1/1/1970 UTC
## u.item     -- Information about the items (movies); 
this is a pipe-separated (`|`) list of
movie id | movie title | release date | video release date |
IMDb URL | unknown | Action | Adventure | Animation |
Children's | Comedy | Crime | Documentary | Drama | Fantasy |
Film-Noir | Horror | Musical | Mystery | Romance | Sci-Fi |
Thriller | War | Western |
The last 19 fields are the genres, a 1 indicates the movie
is of that genre, a 0 indicates it is not; movies can be in
several genres at once.
The movie ids are the ones used in the u.data data set.

In [7]:
# import modules
import sys
from pyspark import SparkConf, SparkContext
from math import sqrt
import os
root=os.getcwd() # root is the current directory 

In [55]:
# define the helper functions used in the later cells

# Dictionary of movies: key (movie ID) --> value (movie title)
def getMovieNames():
    """Build a lookup table from movie ID to movie title.

    Reads ``ml_100k/u.ITEM`` below the module-level ``root`` directory.
    Every line is split on ``|``; the first field is parsed as the integer
    movie ID and the second field is stored as the title.

    Returns:
        dict: movie ID (int) -> movie title (str).

    Raises:
        OSError: if the item file cannot be opened.
        ValueError: if the first field of a line is not an integer.
    """
    movieNames = {}
    # Decode as ISO-8859-1 (the encoding the MovieLens 100k item file is
    # distributed in) so accented characters in titles are kept instead of
    # being silently dropped by an ascii/ignore decode.
    # NOTE(review): the file name is spelled "u.ITEM"; if the file on disk is
    # "u.item" this only resolves on a case-insensitive file system - confirm.
    with open(os.path.join(root, "ml_100k/u.ITEM"), encoding='ISO-8859-1') as f:
        for line in f:
            fields = line.split('|')
            if len(fields) < 2:
                # Blank or malformed line: no title field to record.
                continue
            movieNames[int(fields[0])] = fields[1]
    return movieNames

# function to remove duplicated ratings
def filterDuplicates(userRatings):
    """Tell whether a self-joined rating pair should be kept.

    Args:
        userRatings: (userID, ((movieID, rating), (movieID, rating))).

    Returns:
        bool: True only when the first movie ID is strictly smaller than the
        second, which discards self-pairs and mirrored duplicates.
    """
    (left_id, _), (right_id, _) = userRatings[1][0], userRatings[1][1]
    return left_id < right_id

# generate (movie1,movie2),(rating1,rating2) pairs:
def makePairs(userRatings):
    """Re-key a joined record by its movie pair.

    Args:
        userRatings: (userID, ((movie1, rating1), (movie2, rating2))).

    Returns:
        tuple: ((movie1, movie2), (rating1, rating2)); the user ID is dropped.
    """
    (movie_a, rating_a), (movie_b, rating_b) = userRatings[1][0], userRatings[1][1]
    movie_key = (movie_a, movie_b)
    rating_value = (rating_a, rating_b)
    return (movie_key, rating_value)

# function to calculate cosine similarity
def computeCosineSimilarity(ratingPairs):
    """Compute the cosine similarity of two movies from their co-ratings.

    Args:
        ratingPairs: iterable of (ratingX, ratingY) tuples, one per co-rating.

    Returns:
        tuple: (score, numPairs). ``score`` is the cosine similarity rounded
        to 3 decimals, or 0 when the denominator is zero; ``numPairs`` is the
        number of rating tuples consumed.
    """
    dot = norm_x_sq = norm_y_sq = 0
    count = 0
    # Single pass: the input is iterated exactly once.
    for count, (x, y) in enumerate(ratingPairs, start=1):
        norm_x_sq += x * x
        norm_y_sq += y * y
        dot += x * y

    magnitude = sqrt(norm_x_sq) * sqrt(norm_y_sq)
    if not magnitude:
        # No ratings, or one side is all zeros: similarity is undefined.
        return (0, count)
    return (round(dot / magnitude, 3), count)

In [11]:
# Set up the Spark context, using all available local CPUs.
# getOrCreate() returns the already-running context if there is one, so
# re-executing this cell does not fail because a SparkContext already exists.
conf = SparkConf().setMaster("local[*]").setAppName("MovieSimilarities")
sc = SparkContext.getOrCreate(conf=conf)

In [14]:
# Build the movie ID -> movie title dictionary with a plain Python file read
# (not an RDD); it is used further down to validate the selected movie ID.
nameDict=getMovieNames()

In [20]:
# Load the raw rating data (ml_100k/u.data under root) as an RDD of text lines
ratingsRaw=sc.textFile(os.path.join(root,"ml_100k/u.data"))

In [22]:
# Parse every line into a key/value pair: userID -> (movieID, rating).
# The lines are split on whitespace; the fourth field is not used.
ratingsPair = (
    ratingsRaw
    .map(lambda record: record.split())
    .map(lambda fields: (int(fields[0]), (int(fields[1]), float(fields[2]))))
)

In [26]:
# Self-join the ratings on userID so that every two ratings made by the same
# user are paired up: userID => ((movieID, rating), (movieID, rating))
# a good example about how join works
# http://apachesparkbook.blogspot.com/2015/12/join-leftouterjoin-rightouterjoin.html
ratingsJoin=ratingsPair.join(ratingsPair)
# The self-join yields each combination in both orders and pairs every movie
# with itself; keep only the records where the first movie ID < the second
uniqueRatingsJoin=ratingsJoin.filter(filterDuplicates)

In [28]:
# Drop the userID and re-key each record by its movie pair:
# (movie1, movie2) -> (rating1, rating2)
pairs=uniqueRatingsJoin.map(makePairs)

In [29]:
# Group the rating tuples by movie pair, giving a structure like
# (movie1, movie2) -> (rating1, rating2), (rating1, rating2), ...
# with one rating tuple per user who rated both movies
pairsGroup=pairs.groupByKey()

In [33]:
# Compute the cosine similarity for every movie pair and cache the result so
# later actions on it can reuse the computed values
PairSimilarities = pairsGroup.mapValues(computeCosineSimilarity).cache()
# resulting data set: (movie1, movie2) -> (similarityScore, numPairs)

In [45]:
# Thresholds a movie pair must exceed to count as a match:
# minimum similarity score and minimum number of co-ratings
scoreReq=0.97
numReq=30
# Keep prompting until the user enters an ID that exists in nameDict
stop=False
while not stop:
    try:
        movieID=int(input('select a movie ID: '))
    except ValueError:
        # Non-numeric input: ask again instead of aborting the cell
        print("A movie ID must be an integer, try another\n")
        continue
    if movieID in nameDict:
        print(f'the movie name is {nameDict[movieID]}')
        stop=True
    else:
        print("This movie ID doesn't exist, try another\n")

select a movie ID: 50
the movie name is Star Wars (1977)


In [None]:
# Display every movie ID present in the lookup dictionary
nameDict.keys()

In [46]:
# Keep only the pairs that contain the selected movie and exceed both the
# similarity-score threshold and the co-rating-count threshold
filterRetults = PairSimilarities.filter(
    lambda entry: (movieID in entry[0])
    and (entry[1][0] > scoreReq and entry[1][1] > numReq)
)

In [50]:
# Bring the matching pairs back from Spark as a Python list for inspection
filterRetults.collect()

[((50, 172), (0.9895522078385338, 345)),
 ((28, 50), (0.9708054799005053, 243)),
 ((12, 50), (0.9724956031333988, 223)),
 ((50, 408), (0.9775948291054827, 92)),
 ((50, 404), (0.971249981890631, 95)),
 ((50, 480), (0.9734534315266805, 156)),
 ((50, 272), (0.9710571265648429, 110)),
 ((50, 1020), (0.9723737465397915, 31)),
 ((50, 612), (0.9710635462049332, 31)),
 ((50, 651), (0.9716728035350273, 155)),
 ((50, 495), (0.972453952686859, 52)),
 ((50, 483), (0.9726570623726027, 214)),
 ((50, 199), (0.9727591639531913, 145)),
 ((50, 479), (0.9702185595460495, 161)),
 ((50, 963), (0.9823449614960231, 40)),
 ((50, 251), (0.9761507096081464, 38)),
 ((50, 1007), (0.9783184758610347, 37)),
 ((50, 174), (0.981760098872619, 380)),
 ((50, 114), (0.9741816128302572, 58)),
 ((50, 478), (0.9734294611633468, 87)),
 ((50, 210), (0.9735394829992481, 304)),
 ((50, 166), (0.9722800399742672, 48)),
 ((50, 178), (0.9776576120448436, 109)),
 ((50, 498), (0.9764692222674887, 138)),
 ((50, 494), (0.97020633310884

In [64]:
# Sort the matches from best to worst: swap each record to
# ((score, numPairs), (movie1, movie2)) so it can be sorted by key in
# descending order, then swap back and keep the first ten
sortedResultsReverse = (
    filterRetults
    .map(lambda kv: (kv[1], kv[0]))
    .sortByKey(ascending=False)
)
sortedResults = (
    sortedResultsReverse
    .map(lambda vk: (vk[1], vk[0]))
    .take(10)
)

In [65]:
# Print the top ten results, one per line, each in the form
# ((movie1, movie2), (similarityScore, numPairs))
for x in sortedResults:
    print(x)

((50, 172), (0.9895522078385338, 345))
((50, 181), (0.9857230861253026, 480))
((50, 963), (0.9823449614960231, 40))
((50, 174), (0.981760098872619, 380))
((50, 141), (0.9789385605497993, 68))
((50, 1007), (0.9783184758610347, 37))
((50, 178), (0.9776576120448436, 109))
((50, 408), (0.9775948291054827, 92))
((50, 297), (0.9768144539214534, 42))
((50, 498), (0.9764692222674887, 138))
