In [3]:
%%spark

In [4]:
from pyspark import *
import numpy as np
from itertools import permutations

In [5]:
# Read the ratings file and keep its first line so the header row can be filtered out.
small_ratings_raw_data = sc.textFile('550_finalPJ/ratings.csv')
small_ratings_raw_data_header = small_ratings_raw_data.take(1)[0]

# Every non-header line becomes (int, int, float) from its first three comma-separated
# fields, i.e. (userID, movieID, rating).
small_ratings_data = (
    small_ratings_raw_data
    .filter(lambda row_text: row_text != small_ratings_raw_data_header)
    .map(lambda row_text: row_text.split(","))
    .map(lambda fields: (int(fields[0]), int(fields[1]), float(fields[2])))
)

In [9]:
def _parse_movie_line(line):
    """Parse one line of movies.csv into ``(movieID, title)``.

    The line is parsed with the ``csv`` module rather than ``str.split(",")``
    so that quoted titles containing commas are kept whole instead of being
    cut at the first comma inside the title.

    Args:
        line: one raw, non-header text line of the movies file.

    Returns:
        A tuple ``(int movieID, str title)``.
    """
    import csv  # local import keeps this cell self-contained
    fields = next(csv.reader([line]))
    return int(fields[0]), fields[1]


# Read the movies file and keep its first line so the header row can be filtered out.
small_movies_raw_data = sc.textFile('550_finalPJ/movies.csv')
small_movies_raw_data_header = small_movies_raw_data.take(1)[0]

small_movies_data = (
    small_movies_raw_data
    .filter(lambda line: line != small_movies_raw_data_header)
    .map(_parse_movie_line)
)

#### ______Checking data set: 
#### small_ratings_data = (userID, movieID, rating)
#### small_movies_data = (movieID, movieName)

In [7]:
small_ratings_data.take(10)

[(1, 1, 4.0), (1, 3, 4.0), (1, 6, 4.0), (1, 47, 5.0), (1, 50, 5.0), (1, 70, 3.0), (1, 101, 5.0), (1, 110, 4.0), (1, 151, 5.0), (1, 157, 5.0)]

In [10]:
small_movies_data.take(10)

[(1, 'Toy Story (1995)'), (2, 'Jumanji (1995)'), (3, 'Grumpier Old Men (1995)'), (4, 'Waiting to Exhale (1995)'), (5, 'Father of the Bride Part II (1995)'), (6, 'Heat (1995)'), (7, 'Sabrina (1995)'), (8, 'Tom and Huck (1995)'), (9, 'Sudden Death (1995)'), (10, 'GoldenEye (1995)')]

## Split data into training set and test set: 8:2

In [11]:
# Random 8:2 split of the parsed ratings; the fixed seed keeps the split repeatable.
training_RDD, test_RDD = small_ratings_data.randomSplit(weights=[8, 2], seed=0)
# Keep only the first two fields (userID, movieID) of each held-out rating.
test_user_unwatch = test_RDD.map(lambda row: (row[0], row[1]))

#### ___Checking data set:
#### Numbers of training data and test data
#### test samples for prediction: (userID, unWatchedID)

In [14]:
# Count both splits, then report their sizes and the train/test ratio.
Total_train, Total_test = training_RDD.count(), test_RDD.count()
print("The total number of training dataset is", Total_train)
print("The total number of test dataset is", Total_test)
print("Rate of training and test:", Total_train / Total_test)

The total number of training dataset is 80720
The total number of test dataset is 20116
Rate of training and test: 4.012726188108968

In [15]:
test_user_unwatch.take(10)

[(1, 70), (1, 101), (1, 110), (1, 151), (1, 216), (1, 316), (1, 333), (1, 356), (1, 367), (1, 500)]

## Normalize the movie ratings: subtract movie i's mean rating $m_{i}$ from every rating of movie i

In [16]:
# Per-movie mean rating: accumulate (rating_sum, rating_count) per movieID, then divide.
movie_means = (
    training_RDD
    .map(lambda row: (row[1], (row[2], 1)))
    .reduceByKey(lambda left, right: (left[0] + right[0], left[1] + right[1]))
    .map(lambda kv: (kv[0], kv[1][0] / kv[1][1]))
)

In [18]:
# Key each training rating by movieID, attach that movie's mean, and emit
# (userID, movieID, rating - movie_mean).
normalized_ratings = (
    training_RDD
    .map(lambda row: (row[1], (row[0], row[2])))
    .join(movie_means)  # (movieID, ((userID, rating), movie_mean))
    .map(lambda kv: (kv[1][0][0], kv[0], kv[1][0][1] - kv[1][1]))
)

#### ___Checking dataset:
#### movie_means: (movie, mean of ratings)
#### normalized_ratings: (userID, movieID, norm_rating)

In [17]:
movie_means.take(10)

[(6, 3.98125), (50, 4.2398843930635834), (260, 4.21875), (296, 4.242738589211618), (362, 3.3541666666666665), (480, 3.751322751322751), (552, 3.223404255319149), (590, 3.7934782608695654), (592, 3.462025316455696), (596, 3.5104166666666665)]

In [19]:
normalized_ratings.take(10)

[(328, 5060, 0.13235294117647056), (368, 5060, 0.13235294117647056), (372, 5060, -1.8676470588235294), (385, 5060, -0.8676470588235294), (387, 5060, 0.13235294117647056), (409, 5060, 0.13235294117647056), (414, 5060, 0.13235294117647056), (465, 5060, 1.1323529411764706), (469, 5060, 0.13235294117647056), (474, 5060, -0.8676470588235294)]

## Item–item similarity: cosine similarity of mean-centered ratings (Pearson-style)
### $S_{xy}$ = set of users who rated both movie x and movie y
### $m_{1}$ and $m_{2}$ are a user's normalized ratings of movie x and movie y

<font size = "5">

$$
sim =  \frac{\sum_{s \in S_{xy} } m_{1} \times m_{2}}{ \sqrt{\sum_{s \in S_{xy} } m_{1}^{2}} \sqrt{\sum_{s \in S_{xy} } m_{2}^{2}} }
$$

</font>

In [20]:
def item_perm(line):
    """Return a list of all ordered 2-element permutations of the items in ``line``."""
    return list(permutations(line, 2))

In [28]:
def _similarity_from_sums(sums):
    """Turn ``(sum(r1*r2), sum(r1^2), sum(r2^2))`` into a cosine similarity.

    Returns 0.0 when either squared norm is zero: in that case every
    contributing rating of that movie is 0, so the quotient would be 0/0 (NaN)
    and would poison any later weighted sum that uses the similarity.
    """
    dot, sq_norm_1, sq_norm_2 = sums
    if sq_norm_1 == 0 or sq_norm_2 == 0:
        return 0.0
    return dot / np.sqrt(sq_norm_1) / np.sqrt(sq_norm_2)


# The chain is wrapped in parentheses instead of using backslash continuations:
# a comment after a trailing "\" is a syntax error, whereas inside parentheses each
# step can carry its own comment.
cosine_sim = (
    normalized_ratings
    .map(lambda x: (x[0], (x[1], x[2])))
    .groupByKey()                                # (user, iterable of (movie, rating))
    .flatMap(lambda x: item_perm(list(x[1])))    # ((m1, r1), (m2, r2))
    .map(lambda x: ((x[0][0], x[1][0]),
                    (x[0][1] * x[1][1], x[0][1] ** 2, x[1][1] ** 2)))  # ((m1, m2), (r1*r2, r1^2, r2^2))
    .reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1], x[2] + y[2]))  # ((m1, m2), (sum(r1*r2), sum(r1^2), sum(r2^2)))
    .map(lambda x: (x[0], _similarity_from_sums(x[1])))                 # ((m1, m2), sim)
    .cache()
)

#### ____Checking dataset:
#### cosine_sim: ((movie_1, movie_2), similarity)

In [29]:
cosine_sim.take(10)

[((1732, 1722), -0.15349541830097294), ((2288, 230), 1.0), ((2112, 3370), 1.0), ((3768, 858), 0.0), ((350, 736), 0.3633622554647428), ((266, 1616), 0.4172599427706273), ((2402, 3208), 1.0), ((3430, 1676), 1.0), ((555, 2347), -1.0), ((2803, 431), -0.5174798063860673)]

In [17]:
# NOTE(review): `movie_pairs` is only assigned in a later cell of this notebook, so this
# cell fails on a fresh top-to-bottom run.
# Each movie_pairs value is read as ((r1, r2), (d1, d2)); the per-record contribution is
# r1*r2/d1/d2, and contributions are then summed per (movie_1, movie_2) key.
cosine_unit = movie_pairs.map(lambda x: (x[0], x[1][0][0]*x[1][0][1]/x[1][1][0]/x[1][1][1]))
cosine_dist = cosine_unit.reduceByKey(lambda x, y : x + y).cache()

In [18]:
cosine_dist.take(10)

[((5060, 934), 0.0009350619880827025), ((1676, 3006), 0.008327630580662328), ((1610, 208), 0.002095698252686468), ((2174, 2716), 0.006398772712746157), ((3210, 748), -0.0034375528847518993), ((3006, 1500), 0.004453708718907958), ((2302, 1204), 0.0032704276843571805), ((1690, 648), 0.011557282922293674), ((1754, 3032), 0.004933541706660469), ((3846, 736), 0.0019309767679504342)]

### Get all pairs of movies rated by the same user

In [10]:
def item_join(line):
    """Build the list of every ordered pair that can be drawn from ``line``."""
    return [pair for pair in permutations(line, 2)]

In [11]:
# Groups records by their first field, forms every ordered pair of the grouped
# (field1, field2, field3) triples, and emits
# ((a.field1, b.field1), ((a.field2, b.field2), (a.field3, b.field3))).
# NOTE(review): this reads line[3], but normalized_ratings as built earlier in this
# notebook yields 3-field tuples (userID, movieID, norm_rating), so line[3] would raise
# IndexError. Presumably written against an older 4-field version -- confirm the schema.
movie_pairs = normalized_ratings.map(lambda line: (line[0], (line[1], line[2], line[3]))).groupByKey()\
.map(lambda line: list(line[1])).flatMap(lambda line: item_join(line))\
.map(lambda line: ((line[0][0], line[1][0]),((line[0][1], line[1][1]), (line[0][2], line[1][2])))).cache()

In [12]:
movie_pairs.take(10)

[((1, 733), ((-0.9057142857142857, 1.33), (52.822817039608935, 37.73592452822641))), ((1, 1517), ((-0.9057142857142857, 0.44666666666666677), (52.822817039608935, 31.682013824881775))), ((1, 141), ((-0.9057142857142857, -0.4675324675324677), (52.822817039608935, 31.368774282716245))), ((1, 1405), ((-0.9057142857142857, 0.94), (52.822817039608935, 16.439282222773596))), ((1, 673), ((-0.9057142857142857, -1.9324324324324325), (52.822817039608935, 19.04599695474091))), ((1, 661), ((-0.9057142857142857, -0.37804878048780477), (52.822817039608935, 22.621892051727237))), ((1, 605), ((-0.9057142857142857, -2.0), (52.822817039608935, 9.40744386111339))), ((1, 1429), ((-0.9057142857142857, 1.2999999999999998), (52.822817039608935, 14.637281168304447))), ((1, 805), ((-0.9057142857142857, 0.3793103448275863), (52.822817039608935, 19.987496091306685))), ((1, 737), ((-0.9057142857142857, 2.3055555555555554), (52.822817039608935, 12.698425099200294)))]

### Get baseline estimation for user-item

In [13]:
# Mean of all training ratings: sum of the ratings divided by how many there are.
ratings = training_RDD.map(lambda row: float(row[2]))
total, total_num = ratings.sum(), ratings.count()
global_mean = total / total_num

In [14]:
global_mean

3.502682111000991

In [15]:
# `movie_mean_form` is not defined anywhere in this notebook. `movie_means` (built in the
# normalization section) already holds (movieID, mean rating) pairs, which is the shape
# the lookup dictionary below needs, so it is used directly.
movie_average = movie_means

# Per-user mean rating: accumulate (rating_sum, rating_count) per userID, then divide.
user_average = (
    training_RDD
    .map(lambda x: (int(x[0]), (float(x[2]), 1)))
    .reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1]))
    .map(lambda x: (x[0], x[1][0] / x[1][1]))
)

In [16]:
# Pull both (id, mean) pair RDDs to the driver as plain dictionaries for fast lookup.
movie_ave_dict = dict(movie_average.collect())
user_ave_dict = dict(user_average.collect())

def baseLine(user, movie):
    """Baseline rating estimate for a (user, movie) pair.

    Uses ``user_mean + movie_mean - global_mean`` when both means are known.
    If one side is missing from the training-derived dictionaries, the other
    mean is returned on its own; if both are missing, ``global_mean`` is
    returned. The original raised ``KeyError`` for a user with no training
    ratings, which can happen because the split is random.

    Args:
        user: user ID looked up in ``user_ave_dict``.
        movie: movie ID looked up in ``movie_ave_dict``.

    Returns:
        The baseline rating as a float.
    """
    user_mean = user_ave_dict.get(user)
    movie_mean = movie_ave_dict.get(movie)
    if user_mean is None and movie_mean is None:
        return global_mean
    if movie_mean is None:
        return user_mean
    if user_mean is None:
        return movie_mean
    return user_mean + movie_mean - global_mean

In [19]:
# Training ratings keyed by (userID, movieID).
training_data = training_RDD.map(
    lambda row: ((int(row[0]), int(row[1])), float(row[2]))
)
# Re-key each ((m1, m2), sim) record by its first movie: (m1, (m2, sim)).
movie_sim = cosine_dist.map(
    lambda pair_sim: (pair_sim[0][0], (pair_sim[0][1], pair_sim[1]))
)

In [20]:
# Held-out (userID, movieID) pairs re-keyed as (movieID, userID) so they can be joined
# with movie_sim, which is keyed by movie.
# Two fixes: the tuple separator was a "." (`int(x[1]). int(x[0])`), which fails at
# runtime with AttributeError, and the source RDD name `test_for_predict_RDD` is not
# defined in this notebook; `test_user_unwatch` holds the (userID, movieID) test pairs.
test_data = test_user_unwatch.map(lambda x: (int(x[1]), int(x[0])))
training_data = training_RDD.map(lambda line: ((int(line[0]), int(line[1])), float(line[2])))
movie_sim = cosine_dist.map(lambda x: (x[0][0], (x[0][1], x[1])))

# (target_movie, ((rated_movie, sim), user)) -> ((user, rated_movie), (target_movie, sim))
tmp = movie_sim.join(test_data).map(lambda x: ((x[1][1], x[1][0][0]), (x[0], x[1][0][1])))

# ((user, rated_movie), (rating, (target_movie, sim)))
#   -> ((user, target_movie), (sim * (rating - baseline(user, rated_movie)), sim))
predict_unit = (
    training_data
    .join(tmp)
    .map(lambda x: ((x[0][0], x[1][1][0]),
                    (x[1][1][1] * (x[1][0] - baseLine(x[0][0], x[0][1])), x[1][1][1])))
)

# Sum the weighted deviations and the weights per (user, target_movie); the prediction
# is baseline + weighted mean deviation, or the bare baseline when the weights sum to 0.
prediction = (
    predict_unit
    .reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1]))
    .map(lambda x: (x[0], baseLine(x[0][0], x[0][1]) + x[1][0] / x[1][1]) if x[1][1] != 0
         else (x[0], baseLine(x[0][0], x[0][1])))
)

In [21]:
# True test ratings keyed by (userID, movieID).
test_value = test_RDD.map(
    lambda row: ((int(row[0]), int(row[1])), float(row[2]))
)
# ((userID, movieID), (predicted, actual)) for every pair that received a prediction.
comparison = prediction.join(test_value).cache()

In [22]:
# Keys that already have a similarity-based prediction.
part_1 = comparison.map(lambda record: record[0])
# Flip test_data's tuples, drop the keys already covered by part_1, and give each
# remaining key its baseline estimate.
part_2 = (
    test_data
    .map(lambda pair: (pair[1], pair[0]))
    .subtract(part_1)
    .map(lambda key: (key, baseLine(key[0], key[1])))
)

In [23]:
# Attach the true test ratings to the baseline-only estimates, then merge them with the
# similarity-based comparisons; both share the (key, (predicted, actual)) layout.
part2_comparison = part_2.join(test_value)
Total_compar = comparison.union(part2_comparison)

In [24]:
Total_compar.take(10)

An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 4 in stage 24.0 failed 4 times, most recent failure: Lost task 4.3 in stage 24.0 (TID 39, data2.cs.rutgers.edu, executor 2): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/hadoop/yarn/local/usercache/wl497/appcache/application_1554300167658_0048/container_e69_1554300167658_0048_01_000003/pyspark.zip/pyspark/worker.py", line 177, in main
    process()
  File "/hadoop/yarn/local/usercache/wl497/appcache/application_1554300167658_0048/container_e69_1554300167658_0048_01_000003/pyspark.zip/pyspark/worker.py", line 172, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/hadoop/yarn/local/usercache/wl497/appcache/application_1554300167658_0048/container_e69_1554300167658_0048_01_000003/pyspark.zip/pyspark/serializers.py", line 268, in dump_stream
    vs = list(itert

In [25]:
# Each record is (key, (predicted, actual)): accumulate absolute and squared errors.
MAE_tmp = Total_compar.map(lambda rec: abs(rec[1][0] - rec[1][1])).sum()
RMSE_tmp = Total_compar.map(lambda rec: (rec[1][0] - rec[1][1]) ** 2).sum()
num = Total_compar.count()
# Mean absolute error and root-mean-square error over all compared pairs.
MAE, RMSE = MAE_tmp / num, np.sqrt(RMSE_tmp / num)
print("The MAE for the CF prediction is: ", MAE)
print("The RMSE for the CF prediction is: ", RMSE)

An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 4 in stage 35.0 failed 4 times, most recent failure: Lost task 4.3 in stage 35.0 (TID 49, data2.cs.rutgers.edu, executor 2): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/hadoop/yarn/local/usercache/wl497/appcache/application_1554300167658_0048/container_e69_1554300167658_0048_01_000003/pyspark.zip/pyspark/worker.py", line 177, in main
    process()
  File "/hadoop/yarn/local/usercache/wl497/appcache/application_1554300167658_0048/container_e69_1554300167658_0048_01_000003/pyspark.zip/pyspark/worker.py", line 172, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/hadoop/yarn/local/usercache/wl497/appcache/application_1554300167658_0048/container_e69_1554300167658_0048_01_000003/pyspark.zip/pyspark/serializers.py", line 268, in dump_stream
    vs = l

In [36]:
def getUnwatched(user, movies):
    """Pair ``user`` with every movie in the global ``movie_set`` that is not in ``movies``.

    Args:
        user: the user ID to attach to each result.
        movies: set of movie IDs to exclude.

    Returns:
        A list of ``(movie, user)`` tuples.
    """
    return [(movie, user) for movie in list(movie_set - movies)]

In [38]:
# (userID, set of movieIDs that user rated in the training split)
user_watched = (
    training_RDD
    .map(lambda row: (int(row[0]), int(row[1])))
    .groupByKey()
    .map(lambda kv: (kv[0], set(kv[1])))
)
# Every movie ID from the movies file, collected to the driver.
movie_set = set(small_movies_data.map(lambda row: int(row[0])).collect())
# (movieID, userID) for each movie a user has no training rating for.
unWatched_user = user_watched.flatMap(lambda kv: getUnwatched(kv[0], kv[1]))

In [39]:
unWatched_user.take(10)

[(2, 328), (3, 328), (4, 328), (5, 328), (6, 328), (7, 328), (8, 328), (9, 328), (10, 328), (11, 328)]

In [41]:
# movie_sim.join(unWatched_user): (unwatched, ((watched, sim), user))
# sim_unWatched:                  ((user, watched), (unwatched, sim))
sim_unWatched = (
    movie_sim
    .join(unWatched_user)
    .map(lambda rec: ((rec[1][1], rec[1][0][0]), (rec[0], rec[1][0][1])))
)
# rate_sim: ((user, watched), (rating, (unwatched, sim)))
rate_sim = training_data.join(sim_unWatched)
# predict_unit_recom: ((user, unwatched), (sim * (rating - baseline(user, watched)), sim))
predict_unit_recom = rate_sim.map(
    lambda rec: (
        (rec[0][0], rec[1][1][0]),
        (rec[1][1][1] * (rec[1][0] - baseLine(rec[0][0], rec[0][1])), rec[1][1][1]),
    )
)
# Sum numerators and weights per (user, unwatched); fall back to the bare baseline when
# the weights sum to zero.
prediction_recom1 = (
    predict_unit_recom
    .reduceByKey(lambda left, right: (left[0] + right[0], left[1] + right[1]))
    .map(lambda rec: (rec[0], baseLine(rec[0][0], rec[0][1]) + rec[1][0] / rec[1][1])
         if rec[1][1] != 0 else (rec[0], baseLine(rec[0][0], rec[0][1])))
)

In [54]:
def chooseTen(pairs, limit=10):
    """Return the ``limit`` highest-scoring pairs, best first.

    Args:
        pairs: iterable of 2-tuples whose second element is the score to rank
            by. It is not modified.
        limit: maximum number of pairs to return (default 10).

    Returns:
        A new list with at most ``limit`` pairs ordered by descending score;
        pairs with equal scores keep their input order.
    """
    # sorted() builds a new list, so the caller's data is left untouched (the previous
    # list.sort() reordered it in place) and any iterable, not only a list, is accepted.
    return sorted(pairs, key=lambda pair: pair[1], reverse=True)[:limit]

In [43]:
# Re-key predictions by user, gather each user's (movie, predicted rating) pairs into a
# list, and keep what chooseTen selects from that list.
top_ten = (
    prediction_recom1
    .map(lambda rec: (rec[0][0], (rec[0][1], rec[1])))
    .groupByKey()
    .map(lambda kv: (kv[0], list(kv[1])))
    .map(lambda kv: (kv[0], chooseTen(kv[1])))
)

In [57]:
prediction_recom1.take(10)

An error was encountered:
Invalid status code '404' from http://data-services2.cs.rutgers.edu:8999/sessions/994 with error payload: "Session '994' not found."


In [55]:
# Quick manual check of chooseTen on a small hand-written list of 2-tuples.
test_pairs = [(1,4.7), (10, 3.2), (123, 2.3), (26, 5.2)]
chooseTen(test_pairs)

[(26, 5.2), (1, 4.7)]

In [12]:
# Multiplies the two components of each movie_pairs value and sums the products per key.
# NOTE(review): with movie_pairs as defined elsewhere in this notebook, x[1][0] and
# x[1][1] are both tuples, so `*` would raise TypeError. Presumably this was written for
# a ((m1, m2), (r1, r2)) layout -- confirm which movie_pairs version is intended.
movie_numerator = movie_pairs.map(lambda x: (x[0], x[1][0]*x[1][1])).reduceByKey(lambda x, y: x + y)

In [13]:
movie_numerator.take(10)

KeyboardInterrupt: 

### Get the denominator (product of rating norms) for each movie pair

In [None]:
# Per-movie norm: square root of the sum of squared ratings taken from training_RDD.
movie_module = (
    training_RDD
    .map(lambda row: (int(row[1]), float(row[2]) ** 2))
    .reduceByKey(lambda left, right: left + right)
    .map(lambda kv: (kv[0], np.sqrt(kv[1])))
)

In [None]:
# Each cartesian element is ((movie_1, norm_1), (movie_2, norm_2)); the result is
# ((movie_1, movie_2), norm_1 * norm_2).
# Fix: the second factor was x[1][0] (the second movie's ID) instead of x[1][1] (its norm).
movie_mode_cart = (
    movie_module
    .cartesian(movie_module)
    .map(lambda x: ((x[0][0], x[1][0]), x[0][1] * x[1][1]))
)

In [None]:
# Join numerator and denominator on the (movie_1, movie_2) key, giving
# ((movie_1, movie_2), (numerator, denominator)).
# NOTE(review): this rebinds `cosine_dist`, which an earlier cell also assigns.
cosine_dist = movie_numerator.join(movie_mode_cart).cache()

In [None]:
cosine_dist.take(10)

In [None]:
# Divide numerator by denominator for each ((movie_1, movie_2), (numerator, denominator))
# record, then swap to (similarity, (movie_1, movie_2)) so the pairs sort by similarity.
# Fix: `x[1[1]]` subscripted the integer literal 1 (TypeError); the denominator is x[1][1].
# NOTE(review): sortByKey() sorts ascending by default, so take(10) shows the lowest
# similarities first -- pass ascending=False if the most similar pairs are wanted.
recom = (
    cosine_dist
    .map(lambda x: (x[0], x[1][0] / x[1][1]))
    .map(lambda x: (x[1], x[0]))
    .sortByKey()
)

In [None]:
recom.take(10)