# Movie Recommender

## Import the data

### Find one user or aggregate many users  
    1) find the most similar user or aggregate user
    2) identify that user's highly rated movies

### Metrics to judge similarity
        - based on aggregrate rating of a movie
        - based on what the rating
        - based on what genres they watched the most of
        - based on how frequently they rate movies
        - something with tags on the movie? Sentiment analysis?
        - scrape data from imdb for a critic's review
        - timestamps?

### Misc  
    - User liked a movie = T/F if their rating > aggregate movie rating
    - 

In [321]:
# for parsing the movies csv
import re
# for removing headers
from itertools import islice
# set up sparkcontext
from pyspark import SparkConf, SparkContext
conf = SparkConf().setMaster("local[*]").setAppName("Movie Recommender")
sc = SparkContext(conf = conf) 

ValueError: Cannot run multiple SparkContexts at once; existing SparkContext(app=Movie Recommender, master=local[*]) created by __init__ at <ipython-input-1-7ed28d649a1f>:4 

In [335]:
def replace_commas_in_quotes_in_csv(line):
    if "\"" in line:
        l = line.split("\"")
        if "," in l[1]:
            l[1] = l[1].replace(",","^")
        line = "".join(l)
    return line

In [340]:
# one line has a comma around the title, the other does not, notice the result of the comma inbetween the quotes
print(replace_commas_in_quotes_in_csv('11,"American President, The (1995)",Comedy|Drama|Romance'))
print(replace_commas_in_quotes_in_csv('11,American President, The (1995),Comedy|Drama|Romance'))

11,American President^ The (1995),Comedy|Drama|Romance
11,American President, The (1995),Comedy|Drama|Romance


In [342]:
#create movies RDD
moviesRDD = sc.textFile("Data/movies.csv")
moviesRDD = moviesRDD.map(replace_commas_in_quotes_in_csv)
moviesRDD = moviesRDD.map(lambda x: tuple(x.split(',')))
moviesRDD = moviesRDD.mapPartitionsWithIndex(lambda idx, it: islice(it, 1, None) if idx == 0 else it)
#convert datatypes
moviesRDD = moviesRDD.map(lambda x: (int(x[0]), x[1], x[2]))
moviesRDD.take(20)

[(1, 'Toy Story (1995)', 'Adventure|Animation|Children|Comedy|Fantasy'),
 (2, 'Jumanji (1995)', 'Adventure|Children|Fantasy'),
 (3, 'Grumpier Old Men (1995)', 'Comedy|Romance'),
 (4, 'Waiting to Exhale (1995)', 'Comedy|Drama|Romance'),
 (5, 'Father of the Bride Part II (1995)', 'Comedy'),
 (6, 'Heat (1995)', 'Action|Crime|Thriller'),
 (7, 'Sabrina (1995)', 'Comedy|Romance'),
 (8, 'Tom and Huck (1995)', 'Adventure|Children'),
 (9, 'Sudden Death (1995)', 'Action'),
 (10, 'GoldenEye (1995)', 'Action|Adventure|Thriller'),
 (11, 'American President^ The (1995)', 'Comedy|Drama|Romance'),
 (12, 'Dracula: Dead and Loving It (1995)', 'Comedy|Horror'),
 (13, 'Balto (1995)', 'Adventure|Animation|Children'),
 (14, 'Nixon (1995)', 'Drama'),
 (15, 'Cutthroat Island (1995)', 'Action|Adventure|Romance'),
 (16, 'Casino (1995)', 'Crime|Drama'),
 (17, 'Sense and Sensibility (1995)', 'Drama|Romance'),
 (18, 'Four Rooms (1995)', 'Comedy'),
 (19, 'Ace Ventura: When Nature Calls (1995)', 'Comedy'),
 (20, 'Mo

In [343]:
# create ratings RDDs
ratingsRDD = sc.textFile("Data/ratings.csv")
ratingsRDD = ratingsRDD.map(lambda x: tuple(x.split(',')))
ratingsRDD = ratingsRDD.mapPartitionsWithIndex(lambda idx, it: islice(it, 1, None) if idx == 0 else it)
# convert datatypes in RDD
ratingsRDD = ratingsRDD.map(lambda x: (int(x[0]), int(x[1]), float(x[2]), int(x[3])))
ratingsRDD.take(5)

[(1, 31, 2.5, 1260759144),
 (1, 1029, 3.0, 1260759179),
 (1, 1061, 3.0, 1260759182),
 (1, 1129, 2.0, 1260759185),
 (1, 1172, 4.0, 1260759205)]

In [344]:
# create key-value pairs of movieid and user rating
averagemovierating = ratingsRDD.map(lambda x: (x[1], x[2]))

In [345]:
averagemovierating.take(5)

[(31, 2.5), (1029, 3.0), (1061, 3.0), (1129, 2.0), (1172, 4.0)]

In [346]:
# create a tuple of (movieid, (sumofratings, numberof ratings))
averagemovierating = averagemovierating.mapValues(lambda x: (x,1)).reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1]))

In [347]:
# reduced the previous data frame to (movieid, averagemovierating)
averagemovierating = averagemovierating.map(lambda x: (x[0], x[1][0]/x[1][1]))

In [348]:
#converted to kev-value pairs
ratingsbymovieid = ratingsRDD.map(lambda x: (x[1], x))

In [349]:
# joined with the average movie rating on the movie id key
ratingsbymovieid = ratingsbymovieid.join(averagemovierating)

In [350]:
# fixed join formatting issues
ratingsbymovieid = ratingsbymovieid.map(lambda x: (x[0], (x[1][0][0], x[1][0][1], x[1][0][2], x[1][0][3], x[1][1])))

In [351]:
# added in "liked" data - if user rating > than average movie ratin, the value is true
ratingsbymovieid = ratingsbymovieid.map(lambda x: (x[0], (x[1][0], x[1][1], x[1][2], x[1][3], x[1][4], x[1][2]>x[1][4])))

In [352]:
ratingsbymovieid.take(5)

[(1172, (1, 1172, 4.0, 1260759205, 4.260869565217392, False)),
 (1172, (23, 1172, 5.0, 1148670101, 4.260869565217392, True)),
 (1172, (38, 1172, 4.5, 1389867840, 4.260869565217392, True)),
 (1172, (56, 1172, 2.0, 1470350810, 4.260869565217392, False)),
 (1172, (94, 1172, 3.5, 1291781459, 4.260869565217392, False))]

In [353]:
# update ratingsRDD to reflect new data
ratingsRDD = ratingsbymovieid.map(lambda x: (x[1][0], x[1][1], x[1][2], x[1][3], x[1][4], x[1][5]))

In [354]:
likedmovies = ratingsRDD.filter(lambda x: x[5])

In [355]:
userslikedmovies = likedmovies.map(lambda x: (x[1], x[0]))

In [356]:
moviesusersliked = userslikedmovies.groupByKey().map(lambda x: (x[0], tuple(x[1])))

In [357]:
moviesusersliked.take(5)

[(1172,
  (23,
   38,
   133,
   148,
   229,
   280,
   320,
   321,
   330,
   358,
   373,
   387,
   391,
   430,
   441,
   481,
   497,
   510,
   521,
   537,
   539,
   545,
   547,
   585,
   587)),
 (2968,
  (4,
   49,
   77,
   102,
   105,
   118,
   119,
   130,
   229,
   232,
   282,
   299,
   346,
   380,
   423,
   472,
   480,
   518,
   525,
   575,
   577,
   586,
   613,
   624,
   654)),
 (52,
  (30,
   70,
   87,
   102,
   105,
   148,
   154,
   162,
   211,
   224,
   242,
   256,
   306,
   344,
   358,
   361,
   383,
   387,
   406,
   407,
   434,
   451,
   463,
   502,
   509,
   514,
   529,
   548,
   564,
   587,
   614,
   641)),
 (144, (36, 242, 358, 391, 407, 420, 472, 509, 536, 555, 667)),
 (168, (110, 128, 174, 182, 213, 389, 416, 496, 534, 619, 665))]

In [358]:
def similar_users(themoviesusersliked):
    l = []
    for i in range(len(themoviesusersliked[1])):
        for j in range(len(themoviesusersliked[1])):
            if j>i:
                l.append((themoviesusersliked[1][i], themoviesusersliked[1][j]))
    return tuple(l)

In [359]:
#combinations of users that liked the same movie
useraffinity = moviesusersliked.map(similar_users)

In [360]:
useraffinity.take(5)

[((23, 38),
  (23, 133),
  (23, 148),
  (23, 229),
  (23, 280),
  (23, 320),
  (23, 321),
  (23, 330),
  (23, 358),
  (23, 373),
  (23, 387),
  (23, 391),
  (23, 430),
  (23, 441),
  (23, 481),
  (23, 497),
  (23, 510),
  (23, 521),
  (23, 537),
  (23, 539),
  (23, 545),
  (23, 547),
  (23, 585),
  (23, 587),
  (38, 133),
  (38, 148),
  (38, 229),
  (38, 280),
  (38, 320),
  (38, 321),
  (38, 330),
  (38, 358),
  (38, 373),
  (38, 387),
  (38, 391),
  (38, 430),
  (38, 441),
  (38, 481),
  (38, 497),
  (38, 510),
  (38, 521),
  (38, 537),
  (38, 539),
  (38, 545),
  (38, 547),
  (38, 585),
  (38, 587),
  (133, 148),
  (133, 229),
  (133, 280),
  (133, 320),
  (133, 321),
  (133, 330),
  (133, 358),
  (133, 373),
  (133, 387),
  (133, 391),
  (133, 430),
  (133, 441),
  (133, 481),
  (133, 497),
  (133, 510),
  (133, 521),
  (133, 537),
  (133, 539),
  (133, 545),
  (133, 547),
  (133, 585),
  (133, 587),
  (148, 229),
  (148, 280),
  (148, 320),
  (148, 321),
  (148, 330),
  (148, 358)

In [361]:
#flatten
useraffinity = useraffinity.flatMap(lambda xs: [(x[0], x[1]) for x in xs])

In [362]:
useraffinity.take(5)

[(23, 38), (23, 133), (23, 148), (23, 229), (23, 280)]

In [363]:
#ensure that the tuples are in (smallernumber, largernumber) form and count the pair
useraffinity = useraffinity.map(lambda x: ((x[1], x[0]), 1) if x[0]>x[1] else (x,1))

In [364]:
useraffinity.take(5)

[((23, 38), 1), ((23, 133), 1), ((23, 148), 1), ((23, 229), 1), ((23, 280), 1)]

In [365]:
#find the count of each tuple pair
useraffinity = useraffinity.reduceByKey(lambda x,y: x+y)

In [366]:
useraffinity.take(50)

[((23, 387), 45),
 ((23, 391), 15),
 ((23, 539), 1),
 ((23, 547), 131),
 ((23, 587), 78),
 ((38, 148), 5),
 ((38, 280), 2),
 ((38, 320), 5),
 ((133, 229), 2),
 ((133, 321), 1),
 ((133, 373), 1),
 ((133, 441), 2),
 ((133, 481), 7),
 ((133, 497), 1),
 ((133, 521), 2),
 ((133, 537), 2),
 ((133, 545), 2),
 ((133, 585), 2),
 ((148, 330), 3),
 ((148, 358), 18),
 ((148, 430), 33),
 ((148, 510), 2),
 ((229, 321), 1),
 ((229, 373), 3),
 ((229, 441), 3),
 ((229, 481), 4),
 ((229, 497), 1),
 ((229, 521), 3),
 ((229, 537), 6),
 ((229, 545), 2),
 ((229, 585), 9),
 ((280, 330), 2),
 ((280, 358), 7),
 ((280, 430), 4),
 ((280, 510), 4),
 ((320, 330), 1),
 ((320, 358), 4),
 ((320, 430), 7),
 ((320, 510), 3),
 ((321, 373), 7),
 ((321, 441), 5),
 ((321, 481), 4),
 ((321, 497), 3),
 ((321, 521), 2),
 ((321, 537), 10),
 ((321, 545), 5),
 ((321, 585), 15),
 ((373, 441), 7),
 ((373, 481), 16),
 ((373, 497), 19)]

In [369]:
def recommendation(userid):
    specificaffinity = useraffinity.filter(lambda x: userid in x[0])
    ((user1,user2), _) = specificaffinity.reduce(lambda x,y: x if x[1]>=y[1] else y)
    if userid == user1:
        affinityuser = user2
    elif userid == user2:
        affinityuser = user1
    affinityuserratings = ratingsRDD.filter(lambda x: x[0] == affinityuser)
    top5 = affinityuserratings.top(5, lambda x: (x[2], x[4]))
    movienames = moviesRDD.filter(lambda x: x[0] in map(lambda x: x[1], top5)).map(lambda x: x[1] if "^" not in x[1] else x[1].replace("^",",")).collect()
    recommendation_greeting = "We recommend that user " + str(userid) + " watches:\n"
    for i in range(len(movienames)):
        recommendation_greeting += str(i+1)+") " + movienames[i] + "\n"
    print(recommendation_greeting)

In [370]:
recommendation(3)

We recommend that user 3 watches:
1) Shawshank Redemption, The (1994)
2) Godfather, The (1972)
3) Godfather: Part II, The (1974)
4) City of God (Cidade de Deus) (2002)
5) Gladiator (1992)

