# Tutorial: Taming Big Data With Apache Spark and Python - Hands On!
## Exercise 8 - Movie Similarity (Collaborative Filtering)

*Note: the final cell reads the movie ID from `sys.argv`, so it fails in Jupyter Lab, where the kernel supplies its own arguments. I use this notebook to troubleshoot new code before running the .py file.*

### Setup

FindSpark

This helps avoid many problems with your system locating the Spark installation.

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
!wget https://archive.apache.org/dist/spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz
!tar -xvf spark-2.4.5-bin-hadoop2.7.tgz
!mv spark-2.4.5-bin-hadoop2.7 spark-2.4.5

In [None]:
import os
# Install java
!apt-get update -qq
!apt-get install -y openjdk-8-jdk-headless -qq > /dev/null 

!pip install -q findspark
 
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["PATH"] = os.environ["JAVA_HOME"] + "/bin:" + os.environ["PATH"]
os.environ["SPARK_HOME"] = "/content/spark-2.4.5"
!java -version

openjdk version "1.8.0_342"
OpenJDK Runtime Environment (build 1.8.0_342-8u342-b07-0ubuntu1~18.04-b07)
OpenJDK 64-Bit Server VM (build 25.342-b07, mixed mode)


In [None]:
!git clone https://github.com/bangkit-pambudi/resource-spark.git

Cloning into 'resource-spark'...
remote: Enumerating objects: 38, done.
remote: Counting objects: 100% (38/38), done.
remote: Compressing objects: 100% (36/36), done.
remote: Total 38 (delta 7), reused 0 (delta 0), pack-reused 0
Unpacking objects: 100% (38/38), done.


In [None]:
import findspark
findspark.init()

Load Libraries

In [None]:
import sys
from pyspark import SparkConf, SparkContext
from math import sqrt

Set the file path

In [None]:
data_folder = "/content/resource-spark/data/ml-100k/"

Create the Spark Context

In [None]:
# configure your Spark context; master node is local machine
conf = SparkConf().setMaster("local[*]").setAppName("MovieSimilarities")

# create a spark context object
sc = SparkContext(conf = conf)

### Define Functions

In [None]:
def loadMovieNames():
    movieNames = {} # create a dict
    file_to_open = data_folder + "u.ITEM" #file path
    with open(file_to_open, encoding = 'ascii', errors = 'ignore') as f: # open file
        for line in f:
            fields = line.split('|') # break the lines
            movieNames[int(fields[0])] = fields[1] # create key-value
    return movieNames
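
As a quick check of the parsing logic, the sketch below applies the same split-on-`|` approach to two in-memory lines instead of the file. The helper name `parse_movie_names` and the sample lines are illustrative assumptions (only the first two fields matter here); they are not part of the course code.

```python
import io

def parse_movie_names(lines):
    """Build {movieID: title} from pipe-delimited lines (hypothetical helper)."""
    names = {}
    for line in lines:
        fields = line.split('|')           # field 0 is the ID, field 1 the title
        names[int(fields[0])] = fields[1]
    return names

# Illustrative lines in the movieID|title|... layout; the trailing field is a placeholder.
sample = io.StringIO("42|Clerks (1994)|rest-of-line\n50|Star Wars (1977)|rest-of-line\n")
sample_names = parse_movie_names(sample)
print(sample_names[42])
```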

In [None]:
def filterDuplicates(userRatings):
    ratings = userRatings[1] # the value ((movieID, rating), (movieID, rating))
    (movie1, rating1) = ratings[0] 
    (movie2, rating2) = ratings[1]
    return movie1 < movie2 # return only those entries where movieID 2 is greater than movieID 1

In [None]:
# Python 3 removed tuple-parameter unpacking in function signatures,
# so we explicitly extract the ratings inside the function.
def makePairs(userRatings):
    ratings = userRatings[1] # the value ((movieID, rating), (movieID, rating))
    (movie1, rating1) = ratings[0]
    (movie2, rating2) = ratings[1]
    return ((movie1, movie2), (rating1, rating2)) # format so it's a pair of movies and a pair of ratings
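
To see what these two helpers do together, here is a small sketch that runs the same logic on a plain Python list rather than an RDD. The snake_case function names and the four sample rows are assumptions made for illustration; the rows mimic what a join produces for one user who rated two movies.

```python
def filter_duplicates(user_ratings):
    """Keep a joined row only when the first movie ID is the smaller one."""
    (movie1, _), (movie2, _) = user_ratings[1]
    return movie1 < movie2

def make_pairs(user_ratings):
    """Re-key a joined row as ((movie1, movie2), (rating1, rating2))."""
    (movie1, rating1), (movie2, rating2) = user_ratings[1]
    return ((movie1, movie2), (rating1, rating2))

# One user's joined rows: both orderings of two movies, plus the self-pairs.
joined = [
    (196, ((242, 3.0), (242, 3.0))),  # self-pair: dropped
    (196, ((242, 3.0), (393, 4.0))),  # kept
    (196, ((393, 4.0), (242, 3.0))),  # mirror of the row above: dropped
    (196, ((393, 4.0), (393, 4.0))),  # self-pair: dropped
]
pairs = [make_pairs(row) for row in joined if filter_duplicates(row)]
print(pairs)  # [((242, 393), (3.0, 4.0))]
```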

In [None]:
def computeCosineSimilarity(ratingPairs):
    numPairs = 0
    sum_xx = sum_yy = sum_xy = 0
    for ratingX, ratingY in ratingPairs:
        sum_xx += ratingX * ratingX
        sum_yy += ratingY * ratingY
        sum_xy += ratingX * ratingY
        numPairs += 1
    
    numerator = sum_xy
    denominator = sqrt(sum_xx) * sqrt(sum_yy)
    
    score = 0
    if (denominator):
        score = (numerator / (float(denominator)))
        
    return (score, numPairs)
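
The function computes sum(x*y) / (sqrt(sum(x*x)) * sqrt(sum(y*y))) over all rating pairs for two movies. The standalone sketch below (the function name and sample ratings are made up for illustration) shows two properties worth keeping in mind: a single co-rating always scores 1.0, and because ratings are all positive the score never drops below 0.

```python
from math import sqrt

def cosine_similarity(rating_pairs):
    """Return (cosine score, number of pairs) for an iterable of (x, y) ratings."""
    sum_xx = sum_yy = sum_xy = 0.0
    num_pairs = 0
    for x, y in rating_pairs:
        sum_xx += x * x
        sum_yy += y * y
        sum_xy += x * y
        num_pairs += 1
    denominator = sqrt(sum_xx) * sqrt(sum_yy)
    score = sum_xy / denominator if denominator else 0.0
    return score, num_pairs

single = cosine_similarity([(2.0, 5.0)])                # one co-rating
opposed = cosine_similarity([(1.0, 5.0), (5.0, 1.0)])   # users disagree strongly
print(single)                # (1.0, 1)
print(round(opposed[0], 3))  # 0.385
```

The first result is why a co-occurrence threshold is useful later on: a pair of movies rated together by only one user gets a perfect score regardless of the ratings.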

In [None]:
def medianratings(ratings):
    import statistics
    temp = []
    for x,y in ratings:
        temp.append( (x, statistics.median(y)) )
    return temp

In [None]:
def filterbad(rdd, key):
    # Keep only the (userID, (movieID, rating)) rows whose movieID is in `key`
    keep = set(key) # a set gives fast membership tests
    return [x for x in rdd.collect() if x[1][0] in keep]

### The Program

**1) Create a dictionary with movieID and movieNames.**

In [None]:
print("\nLoading movie names...")
nameDict = loadMovieNames()

print("Success! Your", type(nameDict), "was created with", len(nameDict), "entries. For instance", nameDict.get(42), "is one of them.")


Loading movie names...
Success! Your <class 'dict'> was created with 1682 entries. For instance Clerks (1994) is one of them.


**2) Bring in the movie ratings data.**

In [None]:
data = sc.textFile(data_folder + "u.data")

print("Success! Your", type(data), "was created with", data.count(), "entries. \
\n\nThe first five are:\n", data.take(5))

Success! Your <class 'pyspark.rdd.RDD'> was created with 100000 entries. 

The first five are:
 ['196\t242\t3\t881250949', '186\t302\t3\t891717742', '22\t377\t1\t878887116', '244\t51\t2\t880606923', '166\t346\t1\t886397596']


**3) Key/value pairs: user ID => (movie ID, rating).**

In [None]:
ratings = data.map(lambda l: l.split()).map(lambda l: (int(l[0]), (int(l[1]), float(l[2]))))

print("Success! Your", type(ratings), "was created with", ratings.count(), "entries. \
\n\nThe first five are:\n", ratings.take(5))

Success! Your <class 'pyspark.rdd.PipelinedRDD'> was created with 100000 entries. 

The first five are:
 [(196, (242, 3.0)), (186, (302, 3.0)), (22, (377, 1.0)), (244, (51, 2.0)), (166, (346, 1.0))]
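
The two chained `map` calls can be checked on a single line without Spark. In this sketch the helper name `parse_rating` is an assumption; the input string is the first record shown in the output above.

```python
def parse_rating(line):
    """Turn 'userID movieID rating timestamp' into (userID, (movieID, rating))."""
    fields = line.split()  # splits on any whitespace, including tabs
    return (int(fields[0]), (int(fields[1]), float(fields[2])))

first_row = parse_rating('196\t242\t3\t881250949')
print(first_row)  # (196, (242, 3.0))
```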


**3.2) Remove Bad Movies**

Return a list of the values from the ratings RDD. Each value is a (movieID, rating) tuple.

In [None]:
list_movie_rating = list(ratings.values().collect()) # tuples list, with (movieID, rating)

Group by the movieID, with the value being a list of all ratings.

In [None]:
from itertools import groupby
from operator import itemgetter

In [None]:
# the list needs to be sorted prior to being aggregated
def getKey(item):
    return item[0]

list_movie_rating_sort = sorted(list_movie_rating, key=getKey)

In [None]:
list_movie_allratings = [(k, list(list(zip(*g))[1])) for k, g in groupby(list_movie_rating_sort, itemgetter(0))]  # (movieID, [rating1, rating2, ...])
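
The sort matters because `itertools.groupby` only merges consecutive items with the same key. The sketch below uses made-up (movieID, rating) tuples to show the difference; the variable names are illustrative.

```python
from itertools import groupby
from operator import itemgetter

# Made-up (movieID, rating) tuples, deliberately out of order.
movie_ratings = [(2, 3.0), (1, 4.0), (2, 5.0), (1, 2.0)]

def group_ratings(items):
    """Group consecutive items by movieID into (movieID, [ratings])."""
    return [(k, [rating for _, rating in g]) for k, g in groupby(items, itemgetter(0))]

unsorted_groups = group_ratings(movie_ratings)
sorted_groups = group_ratings(sorted(movie_ratings, key=itemgetter(0)))

print(unsorted_groups)  # [(2, [3.0]), (1, [4.0]), (2, [5.0]), (1, [2.0])]
print(sorted_groups)    # [(1, [4.0, 2.0]), (2, [3.0, 5.0])]
```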

Take the median of the ratings for each movie

In [None]:
rdd_movie_medianratings = sc.parallelize(medianratings(list_movie_allratings))

result = rdd_movie_medianratings

print("Success! Your", type(result), "was created with", result.count(), "entries. \
\n\nThe first five are:\n", result.take(5))

Success! Your <class 'pyspark.rdd.RDD'> was created with 1682 entries. 

The first five are:
 [(1, 4.0), (2, 3.0), (3, 3.0), (4, 4.0), (5, 3.0)]


Filter out movies whose median rating is 2 or lower

In [None]:
filteredbyRatings = rdd_movie_medianratings.filter(lambda x: x[1] > 2)

result = filteredbyRatings

print("Success! Your", type(result), "was created with", result.count(), "entries. \
\n\nThe first five are:\n", result.take(5))

Success! Your <class 'pyspark.rdd.PipelinedRDD'> was created with 1369 entries. 

The first five are:
 [(1, 4.0), (2, 3.0), (3, 3.0), (4, 4.0), (5, 3.0)]


Create a list of the movie IDs whose median rating is > 2

In [None]:
key_movies = list(filteredbyRatings.keys().collect()) # list of keys, i.e., movies with a median rating > 2

Filter the ratings RDD to remove movies whose median rating is 2 or lower

In [None]:
ratings_filter_bad = sc.parallelize(filterbad(ratings, key_movies))

result = ratings_filter_bad

print("Success! Your", type(result), "was created with", result.count(), "entries. \
\n\nThe first five are:\n", result.take(5))

Success! Your <class 'pyspark.rdd.RDD'> was created with 96211 entries. 

The first five are:
 [(196, (242, 3.0)), (186, (302, 3.0)), (244, (51, 2.0)), (166, (346, 1.0)), (298, (474, 4.0))]
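
The whole "remove bad movies" step can be traced on a handful of rows in plain Python. This is a minimal sketch with made-up ratings and hypothetical variable names, assuming the same rule as above (keep movies whose median rating is greater than 2); it is not the Spark code used in this notebook.

```python
import statistics

# Made-up (userID, (movieID, rating)) rows.
sample_ratings = [
    (196, (242, 3.0)), (186, (242, 4.0)),
    (22, (377, 1.0)), (244, (377, 2.0)), (166, (377, 1.0)),
]

# Collect every rating per movie.
ratings_by_movie = {}
for _, (movie_id, rating) in sample_ratings:
    ratings_by_movie.setdefault(movie_id, []).append(rating)

# Movies whose median rating clears the threshold, stored as a set for fast lookups.
good_movies = {m for m, rs in ratings_by_movie.items() if statistics.median(rs) > 2}

# Keep only the rows that refer to a "good" movie.
kept = [row for row in sample_ratings if row[1][0] in good_movies]
print(good_movies)  # {242}
print(kept)         # [(196, (242, 3.0)), (186, (242, 4.0))]
```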


**4) Emit every movie rated together by the same user. Self-join to find every combination.**

In [None]:
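# Note: this joins the filtered ratings against the full ratings RDD, so the second
# movie of a pair can still be a low-rated one. Joining ratings_filter_bad with itself
# would be a true self-join on the filtered data.
joinedRatings = ratings_filter_bad.join(ratings)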

print("Success! Your", type(joinedRatings), "was created with", joinedRatings.count(), "entries. \
\n\nThe first five are:\n", joinedRatings.take(5))

Success! Your <class 'pyspark.rdd.PipelinedRDD'> was created with 19285491 entries. 

The first five are:
 [(186, ((302, 3.0), (302, 3.0))), (186, ((302, 3.0), (566, 5.0))), (186, ((302, 3.0), (250, 1.0))), (186, ((302, 3.0), (148, 4.0))), (186, ((302, 3.0), (263, 3.0)))]
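
Joining on user ID pairs every rating a user made with every rating that same user made, so a user with n ratings contributes n * n rows, and n * (n - 1) / 2 of them have the first movie ID smaller than the second. The sketch below imitates this for one user with `itertools.product`; the dictionary layout and names are assumptions for illustration only.

```python
from itertools import product

# One user with three ratings: userID -> [(movieID, rating), ...]
user_ratings = {196: [(242, 3.0), (393, 4.0), (381, 4.0)]}

# Every ordered combination of that user's ratings, including self-pairs.
joined = [
    (user, (left, right))
    for user, rated in user_ratings.items()
    for left, right in product(rated, repeat=2)
]

# Rows that would survive a "first movie ID < second movie ID" filter.
unique = [row for row in joined if row[1][0][0] < row[1][1][0]]
print(len(joined), len(unique))  # 9 3
```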


**5) Filter out duplicate pairs**

In [None]:
uniqueJoinedRatings = joinedRatings.filter(filterDuplicates)

print("Success! Your", type(uniqueJoinedRatings), "was created with", uniqueJoinedRatings.count(), "entries. \
\n\nThe filter removed", joinedRatings.count() - uniqueJoinedRatings.count(), "entries.\
\n\nThe first five are:\n", uniqueJoinedRatings.take(5))

Success! Your <class 'pyspark.rdd.PipelinedRDD'> was created with 10050406 entries. 

The filter removed 10150406 entries.

The first five are:
 [(196, ((242, 3.0), (393, 4.0))), (196, ((242, 3.0), (381, 4.0))), (196, ((242, 3.0), (251, 3.0))), (196, ((242, 3.0), (655, 5.0))), (196, ((242, 3.0), (306, 4.0)))]


**5.2) Now key by (movie1, movie2) pairs**

In [None]:
moviePairs = uniqueJoinedRatings.map(makePairs)

print("Success! Your", type(moviePairs), "was created with", moviePairs.count(), "entries. \
\n\nThe first five are:\n", moviePairs.take(5))

Success! Your <class 'pyspark.rdd.PipelinedRDD'> was created with 10050406 entries. 

The first five are:
 [((242, 393), (3.0, 4.0)), ((242, 381), (3.0, 4.0)), ((242, 251), (3.0, 3.0)), ((242, 655), (3.0, 5.0)), ((242, 306), (3.0, 4.0))]


**6) Group by key.**

In [None]:
moviePairRatings = moviePairs.groupByKey()

print("Success! Your", type(moviePairRatings), "was created with", moviePairRatings.count(), "entries. \
\n\nThe first five are:\n", moviePairRatings.take(5))

Success! Your <class 'pyspark.rdd.PipelinedRDD'> was created with 983206 entries. 

The first five are:
 [((242, 580), <pyspark.resultiterable.ResultIterable object at 0x0000024386ED1D30>), ((242, 692), <pyspark.resultiterable.ResultIterable object at 0x0000024386ED1CF8>), ((242, 428), <pyspark.resultiterable.ResultIterable object at 0x0000024386ED1BE0>), ((242, 340), <pyspark.resultiterable.ResultIterable object at 0x0000024386ED1E10>), ((393, 1241), <pyspark.resultiterable.ResultIterable object at 0x0000024387099470>)]


**7) Compute similarities.**

*To save the results, sort and write them after the cell below has run: `moviePairSimilarities.sortByKey().saveAsTextFile("movie-sims")`*

In [None]:
moviePairSimilarities = moviePairRatings.mapValues(computeCosineSimilarity).cache()

print("Success! Your", type(moviePairSimilarities), "was created with", moviePairSimilarities.count(), "entries. \
\n\nThe first five are:\n", moviePairSimilarities.take(5))

Success! Your <class 'pyspark.rdd.PipelinedRDD'> was created with 983206 entries. 

The first five are:
 [((242, 580), (0.9443699330874624, 6)), ((242, 692), (0.9203762039948743, 18)), ((242, 428), (0.9419097988977888, 15)), ((242, 340), (0.9455404837184603, 32)), ((393, 1241), (1.0, 1))]


Extract similarities for the movie we care about that are "good".

In [None]:
len(sys.argv)

3

In [None]:
print(sys.argv)

['C:\\Users\\Andy\\Anaconda3\\lib\\site-packages\\ipykernel_launcher.py', '-f', 'C:\\Users\\Andy\\AppData\\Roaming\\jupyter\\runtime\\kernel-ec43cdaa-d7ba-4395-a03b-e4277274e23f.json']


In [None]:
if (len(sys.argv) > 1):
    
    scoreThreshold = 0.97
    coOccurenceThreshold = 50
    
    movieID = int(sys.argv[1])
    
    # Filter for movies with this sim that are "good" as defined by 
    # our quality thresholds above
    filteredResults = moviePairSimilarities.filter(lambda pairSim: \
                                                  (pairSim[0][0] == movieID or pairSim[0][1] == movieID) \
                                                  and pairSim[1][0] > scoreThreshold and pairSim[1][1] > coOccurenceThreshold)
    
    # Sort by quality score.
    results = filteredResults.map(lambda pairSim: (pairSim[1], pairSim[0])).sortByKey(ascending= False).take(10)
    
    print("Top 10 similar movies for " + nameDict[movieID])
    for result in results:
        (sim, pair) = result
        # Display the similarity result that isn't the movie we're looking at
        similarMovieID = pair[0]
        if (similarMovieID == movieID):
            similarMovieID = pair[1]
        print(nameDict[similarMovieID] + "\t score: " + str(sim[0]) + "\t strength: " + str(sim[1]))

ValueError: invalid literal for int() with base 10: '-f'
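
The error occurs because the Jupyter kernel puts its own arguments (`-f` and a connection file) in `sys.argv`, so `sys.argv[1]` is not a movie ID. One possible workaround, sketched here under the assumption that a fallback ID is acceptable while troubleshooting, is to accept `sys.argv[1]` only when it is numeric. The helper `parse_movie_id` and the fallback value 50 are hypothetical and not part of the course code.

```python
def parse_movie_id(argv, default=None):
    """Return argv[1] as an int when it looks like a movie ID, otherwise `default`."""
    if len(argv) > 1 and argv[1].isdigit():
        return int(argv[1])
    return default

# Run as a script: the ID comes from the command line.
from_script = parse_movie_id(['script.py', '50'])

# Run in a notebook: argv holds kernel arguments, so the fallback is used.
from_notebook = parse_movie_id(['ipykernel_launcher.py', '-f', 'kernel.json'], default=50)

print(from_script, from_notebook)  # 50 50
```

In the notebook, `movieID = parse_movie_id(sys.argv, default=50)` would replace the direct `int(sys.argv[1])` call, and the `.py` file would behave as before when given a numeric argument.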