In [1]:
from pyspark import SparkConf, SparkContext
import pandas as pd
import numpy as np
from math import sqrt
import pandas as pd
import time

In [2]:
sc = SparkContext.getOrCreate()
rating_data = sc.textFile("input/ml-100k/u.data")

rating_data_test, rating_data_train = rating_data.randomSplit(weights=[0.2, 0.8], seed=1)

In [3]:
def _extract_user_rating(line):
    data = line.split('\t')
    return (int(data[0]), (int(data[1]), float(data[2])))
    
def create_sim_score(rating_data):

    def _filter_movies(line):
        movie1 = line[1][0]
        movie2 = line[1][1]

        return movie1 < movie2

    def _makePairs(line):
        (movie1, rating1) = line[1][0]
        (movie2, rating2) = line[1][1]

        return ((movie1, movie2), (rating1, rating2))

    def _computeCosineSimilarity(ratingPairs):
        numPairs = 0
        sum_xx = sum_yy = sum_xy = 0
        for ratingX, ratingY in ratingPairs:
            sum_xx += ratingX * ratingX
            sum_yy += ratingY * ratingY
            sum_xy += ratingX * ratingY
            numPairs += 1

        numerator = sum_xy
        denominator = sqrt(sum_xx) * sqrt(sum_yy)

        score = 0
        if (denominator):
            score = (numerator / (float(denominator)))

        return (score, numPairs)

    # rating_data = sc.textFile("ml-100k/u.data")
    user_rating_lists = rating_data.map(_extract_user_rating)
    join_lists = user_rating_lists.join(user_rating_lists)

    user_movies_list = join_lists.filter(_filter_movies)

    rating_pairs = user_movies_list.map(_makePairs)
    moviePairRatings= rating_pairs.groupByKey()
    moviePairSimilarities = moviePairRatings.mapValues(_computeCosineSimilarity).persist()
    moviePairSimilarities.sortByKey().saveAsPickleFile("input/movie-sims-obj2")

In [41]:
def load_sim_dict():
    sim_movie = sc.pickleFile("input/movie-sims-obj2/")
    sim_movie_local = sim_movie.collect()
    sim_dict = {}
    for sm in sim_movie_local:
        key = sm[0]
        value = sm[1]

        sim_dict[key] = value
    
    return sim_dict

def _extract_user_rating(line):
    data = line.split('\t')
    return (int(data[0]), [(int(data[1]), float(data[2]))])

def _extract_movie_data(line):
    data = line.split('|')
    return (int(data[0]), data[1])

sim_dict = load_sim_dict()
movie_data = sc.textFile("input/ml-100k/u.item")
movie_dict = dict(movie_data.map(_extract_movie_data).collect())

user_lists = rating_data_train.map(_extract_user_rating).reduceByKey(lambda v1,v2: v1+v2).persist()

In [42]:
user_id = 816
predicted_movie = 660

_, user_ratings = user_lists.filter(lambda line: line[0]==user_id).collect()[0]

def rate_movie(user_ratings, predicted_movie):
    
    numerator = 0
    denominator = 0

    for movie_id, rating in user_ratings:
        if(predicted_movie < movie_id):
            m1 = predicted_movie
            m2 = movie_id
        else:
            m2 = predicted_movie
            m1 = movie_id
        
        if (m1, m2) in sim_dict:
            sim_score, number_of_record = sim_dict[(m1, m2)]
        else:
            sim_score, number_of_record = (0,0)

        numerator += sim_score*rating
        denominator += sim_score
    
    predicted_rating = numerator/denominator if denominator else 0
    
    return predicted_rating

print(rate_movie(user_ratings, predicted_movie))

2.790068438751714


In [43]:
user_id = 816
_, user_ratings = user_lists.filter(lambda line: line[0]==user_id).collect()[0]

predicted_ratings = []
for m_id in movie_dict.keys():
#     print(rate_movie(user_id, m_id))
    predicted_ratings += [(m_id,rate_movie(user_ratings, m_id))]

sorted_m_list = sorted(predicted_ratings, key=lambda x: x[1], reverse=True)

In [44]:
for m_id, rating in sorted_m_list:
    print(movie_dict[m_id], rating)

Charade (1963) 1111.2738734601771
Star Trek III: The Search for Spock (1984) 228.91757681128692
Kids (1995) 101.33170091687762
Tin Drum, The (Blechtrommel, Die) (1979) 93.45082825296775
Newton Boys, The (1998) 87.84071700709268
With Honors (1994) 74.76718865811971
Swan Princess, The (1994) 57.67507761225835
Gabbeh (1996) 56.942099919852346
My Man Godfrey (1936) 40.736953652868834
What's Eating Gilbert Grape (1993) 38.81539531161409
Two Much (1996) 33.79101215029578
Cool Runnings (1993) 28.81551059690039
Don Juan DeMarco (1995) 27.894805272060754
Kull the Conqueror (1997) 25.564454152824624
Vanya on 42nd Street (1994) 24.387628310366743
Braveheart (1995) 23.36066455744541
Dave (1993) 20.593892913359234
Ciao, Professore! (1993) 20.131795542740825
Free Willy 2: The Adventure Home (1995) 18.12475706116485
Last of the Mohicans, The (1992) 17.70949127180278
War Room, The (1993) 17.656147446450557
Lightning Jack (1994) 17.140207219638913
Koyaanisqatsi (1983) 16.757169645471297
Kama Sutra: A T

In [45]:
rating_data_test.map(_extract_user_rating).takeSample(False,5)

[(693, [(636, 1.0)]),
 (457, [(195, 5.0)]),
 (697, [(263, 1.0)]),
 (610, [(187, 4.0)]),
 (232, [(1149, 5.0)])]

In [46]:
test_set = rating_data_test.map(_extract_user_rating).reduceByKey(lambda v1,v2: v1 + v2).collect()
test_set

[(6,
  [(86, 3.0),
   (301, 2.0),
   (69, 3.0),
   (469, 5.0),
   (275, 4.0),
   (498, 4.0),
   (213, 4.0),
   (28, 2.0),
   (467, 4.0),
   (117, 2.0),
   (458, 1.0),
   (199, 4.0),
   (197, 5.0),
   (193, 3.0),
   (483, 5.0),
   (526, 3.0),
   (187, 4.0),
   (518, 3.0),
   (189, 3.0),
   (534, 4.0),
   (536, 4.0),
   (21, 3.0),
   (200, 3.0),
   (520, 4.0),
   (465, 1.0),
   (493, 5.0),
   (495, 4.0),
   (203, 3.0),
   (274, 4.0),
   (408, 4.0),
   (166, 4.0),
   (308, 3.0),
   (7, 2.0),
   (317, 3.0),
   (127, 5.0),
   (9, 4.0),
   (261, 3.0),
   (306, 4.0),
   (533, 4.0)]),
 (286,
  [(1014, 5.0),
   (357, 4.0),
   (44, 3.0),
   (34, 5.0),
   (721, 3.0),
   (689, 5.0),
   (151, 5.0),
   (158, 3.0),
   (100, 3.0),
   (432, 3.0),
   (642, 3.0),
   (419, 5.0),
   (704, 2.0),
   (683, 5.0),
   (89, 4.0),
   (107, 1.0),
   (1113, 3.0),
   (738, 5.0),
   (354, 4.0),
   (477, 3.0),
   (1411, 2.0),
   (168, 4.0),
   (289, 5.0),
   (554, 4.0),
   (881, 5.0),
   (53, 2.0),
   (161, 2.0),
   (3

In [47]:
predicted_ratings = []

for user_id, movie_rating in test_set:
    _, user_ratings = user_lists.filter(lambda line: line[0]==user_id).collect()[0]
    
    for m_id, rating in movie_rating:
        predicted_ratings += [(user_id, m_id,rate_movie(user_ratings, m_id), rating)]

In [48]:
predicted_ratings

[(6, 86, 4.975909921453239, 3.0),
 (6, 301, 4.579439195094018, 2.0),
 (6, 69, 4.766069361683235, 3.0),
 (6, 469, 4.410723573467039, 5.0),
 (6, 275, 4.473007875428568, 4.0),
 (6, 498, 5.275417784020356, 4.0),
 (6, 213, 4.796934131572793, 4.0),
 (6, 28, 4.161055476996883, 2.0),
 (6, 467, 4.074785203363831, 4.0),
 (6, 117, 6.266835493644071, 2.0),
 (6, 458, 4.302735810827519, 1.0),
 (6, 199, 4.531586366829542, 4.0),
 (6, 197, 4.636247570459653, 5.0),
 (6, 193, 4.536195001162025, 3.0),
 (6, 483, 4.440114733011767, 5.0),
 (6, 526, 4.636666427624758, 3.0),
 (6, 187, 4.393772066814703, 4.0),
 (6, 518, 4.179940343504595, 3.0),
 (6, 189, 4.538012678316828, 3.0),
 (6, 534, 4.112867414896236, 4.0),
 (6, 536, 10.000130113910222, 4.0),
 (6, 21, 4.1221203291777115, 3.0),
 (6, 200, 4.47432765349994, 3.0),
 (6, 520, 4.610373243062266, 4.0),
 (6, 465, 5.555745038285413, 1.0),
 (6, 493, 4.886661155420664, 5.0),
 (6, 495, 5.318444175750988, 4.0),
 (6, 203, 4.501798947394514, 3.0),
 (6, 274, 4.91719853830

# normalize

In [34]:
def _extract_user_rating(line):
    data = line.split('\t')
    return (int(data[0]), float(data[2]))

avg_ratings = dict(rating_data_train.map(_extract_user_rating) \
            .aggregateByKey((0,0), lambda g, v: (g[0]+v, g[1]+1), lambda g1,g2: (g1[0]+g2[0], g1[1]+g2[1])) \
            .mapValues(lambda v: (v[0]/v[1])) \
            .collect())

avg_ratings_bc = sc.broadcast(avg_ratings)

In [40]:
def _extract_user_rating(line):
    data = line.split('\t')
    return (int(data[0]), (int(data[1]), float(data[2])))

def _filter_movies(line):
    movie1 = line[1][0]
    movie2 = line[1][1]

    return movie1 < movie2

def _makePairs(line):
    user_id = line[0]
    (movie1, rating1) = line[1][0]
    (movie2, rating2) = line[1][1]
    avg_ratings = avg_ratings_bc.value[user_id]

    return ((movie1, movie2), (rating1 - avg_ratings, rating2 - avg_ratings))

def _computeCosineSimilarity(ratingPairs):


    numPairs = 0
    sum_xx = sum_yy = sum_xy = 0
    for ratingX, ratingY in ratingPairs:
        sum_xx += ratingX * ratingX
        sum_yy += ratingY * ratingY
        sum_xy += ratingX * ratingY
        numPairs += 1

    numerator = sum_xy
    denominator = sqrt(sum_xx) * sqrt(sum_yy)

    score = 0
    if (denominator):
        score = (numerator / (float(denominator)))

    return (score, numPairs)

user_rating_lists = rating_data_train.map(_extract_user_rating)
join_lists = user_rating_lists.join(user_rating_lists)
user_movies_list = join_lists.filter(_filter_movies)

rating_pairs = user_movies_list.map(_makePairs)

moviePairRatings= rating_pairs.groupByKey()
moviePairSimilarities = moviePairRatings.mapValues(_computeCosineSimilarity).persist()
moviePairSimilarities.sortByKey().saveAsPickleFile("input/movie-sims-obj2")

In [49]:
def load_sim_dict():
    sim_movie = sc.pickleFile("input/movie-sims-obj2/")
    sim_movie_local = sim_movie.collect()
    sim_dict = {}
    for sm in sim_movie_local:
        key = sm[0]
        value = sm[1]

        sim_dict[key] = value
    
    return sim_dict

def _extract_user_rating(line):
    data = line.split('\t')
    return (int(data[0]), [(int(data[1]), float(data[2]))])

def _extract_movie_data(line):
    data = line.split('|')
    return (int(data[0]), data[1])

sim_dict = load_sim_dict()
movie_data = sc.textFile("input/ml-100k/u.item")
movie_dict = dict(movie_data.map(_extract_movie_data).collect())

user_lists = rating_data_train.map(_extract_user_rating).reduceByKey(lambda v1,v2: v1+v2).persist()

In [56]:
def rate_movie(user_ratings, predicted_movie):
    
    numerator = 0
    denominator = 0
    
    normalize_rating = {1:-1, 2:-0.5, 3:0, 4:0.5, 5:1}
#     denormalize_rating = {-1:1, -0.5:2, 0:3, 0.5:4, 1:5}

    for movie_id, rating in user_ratings:
        if(predicted_movie < movie_id):
            m1 = predicted_movie
            m2 = movie_id
        else:
            m2 = predicted_movie
            m1 = movie_id
        
        if (m1, m2) in sim_dict:
            sim_score, number_of_record = sim_dict[(m1, m2)]
        else:
            sim_score, number_of_record = (0,0)

        numerator += sim_score*normalize_rating[rating]
        denominator += sim_score
    
    predicted_rating = numerator/denominator if denominator else 0
    predicted_rating = 0.5*(predicted_rating+1)*4 + 1
    
    if predicted_rating > 5:
        predicted_rating = 5.0
    elif predicted_rating < 1:
        predicted_rating = 1.0
    
    return predicted_rating

In [57]:
user_id = 816
_, user_ratings = user_lists.filter(lambda line: line[0]==user_id).collect()[0]

predicted_ratings = []
for m_id in movie_dict.keys():
#     print(rate_movie(user_id, m_id))
    predicted_ratings += [(m_id,rate_movie(user_ratings, m_id))]

sorted_m_list = sorted(predicted_ratings, key=lambda x: x[1], reverse=True)

In [59]:
test_set = rating_data_test.map(_extract_user_rating).reduceByKey(lambda v1,v2: v1 + v2).collect()

In [60]:
predicted_ratings = []

for user_id, movie_rating in test_set:
    _, user_ratings = user_lists.filter(lambda line: line[0]==user_id).collect()[0]
    
    for m_id, rating in movie_rating:
        predicted_ratings += [(user_id, m_id,rate_movie(user_ratings, m_id), rating)]

In [61]:
predicted_ratings

[(6, 86, 4.97590992145324, 3.0),
 (6, 301, 4.57943919509402, 2.0),
 (6, 69, 4.766069361683236, 3.0),
 (6, 469, 4.410723573467037, 5.0),
 (6, 275, 4.473007875428566, 4.0),
 (6, 498, 5, 4.0),
 (6, 213, 4.796934131572795, 4.0),
 (6, 28, 4.161055476996883, 2.0),
 (6, 467, 4.074785203363829, 4.0),
 (6, 117, 5, 2.0),
 (6, 458, 4.3027358108275235, 1.0),
 (6, 199, 4.531586366829545, 4.0),
 (6, 197, 4.636247570459653, 5.0),
 (6, 193, 4.536195001162026, 3.0),
 (6, 483, 4.4401147330117645, 5.0),
 (6, 526, 4.636666427624757, 3.0),
 (6, 187, 4.3937720668147024, 4.0),
 (6, 518, 4.179940343504595, 3.0),
 (6, 189, 4.5380126783168295, 3.0),
 (6, 534, 4.112867414896236, 4.0),
 (6, 536, 5, 4.0),
 (6, 21, 4.12212032917771, 3.0),
 (6, 200, 4.474327653499941, 3.0),
 (6, 520, 4.610373243062266, 4.0),
 (6, 465, 5, 1.0),
 (6, 493, 4.886661155420668, 5.0),
 (6, 495, 5, 4.0),
 (6, 203, 4.501798947394516, 3.0),
 (6, 274, 4.917198538303646, 4.0),
 (6, 408, 4.449398773342452, 4.0),
 (6, 166, 4.221219046731982, 4.0)

# refacctor avg user

## broadcast

In [12]:
# def _extract_user_rating(line):
#     data = line.split('\t')
#     return (int(data[0]), float(data[2]))

# avg_ratings = dict(rating_data_train.map(_extract_user_rating) \
#             .aggregateByKey((0,0), lambda g, v: (g[0]+v, g[1]+1), lambda g1,g2: (g1[0]+g2[0], g1[1]+g2[1])) \
#             .mapValues(lambda v: (v[0]/v[1])) \
#             .collect())

avg_ratings = dict(rating_data_train.map(_extract_user_rating) \
            .aggregateByKey((0,0), lambda g, v: (g[0]+v[1], g[1]+1), lambda g1,g2: (g1[0]+g2[0], g1[1]+g2[1])) \
            .mapValues(lambda v: (v[0]/v[1])) \
            .collect())

avg_ratings_bc = sc.broadcast(avg_ratings)

def _extract_user_rating(line):
    data = line.split('\t')
    return (int(data[0]), (int(data[1]), float(data[2])))

def _filter_movies(line):
    movie1 = line[1][0]
    movie2 = line[1][1]

    return movie1 < movie2

def _makePairs(line):
    user_id = line[0]
    (movie1, rating1) = line[1][0]
    (movie2, rating2) = line[1][1]
    avg_ratings = avg_ratings_bc.value[user_id]

    return ((movie1, movie2), (rating1 - avg_ratings, rating2 - avg_ratings))

def _computeCosineSimilarity(ratingPairs):
    numPairs = 0
    sum_xx = sum_yy = sum_xy = 0
    for ratingX, ratingY in ratingPairs:
        sum_xx += ratingX * ratingX
        sum_yy += ratingY * ratingY
        sum_xy += ratingX * ratingY
        numPairs += 1

    numerator = sum_xy
    denominator = sqrt(sum_xx) * sqrt(sum_yy)

    score = 0
    if (denominator):
        score = (numerator / (float(denominator)))

    return (score, numPairs)

start_time = time.time()

user_rating_lists = rating_data_train.map(_extract_user_rating)
join_lists = user_rating_lists.join(user_rating_lists)
user_movies_list = join_lists.filter(_filter_movies)

rating_pairs = user_movies_list.map(_makePairs)

moviePairRatings= rating_pairs.groupByKey()
moviePairSimilarities = moviePairRatings.mapValues(_computeCosineSimilarity).takeSample(False, 10)#.persist()
# moviePairSimilarities.sortByKey().saveAsPickleFile("input/movie-sims-obj2")
# moviePairSimilarities.top(1)
elapsed_time = time.time() - start_time

In [13]:
print(elapsed_time)
moviePairSimilarities

50.226261615753174


[((853, 878), (0.1766011703007927, 3)),
 ((504, 1061), (-0.6554934893950407, 4)),
 ((246, 1663), (-1.0, 1)),
 ((476, 1109), (0.6666487471552207, 5)),
 ((1025, 1287), (0.9919728089215143, 2)),
 ((938, 1334), (1.0, 1)),
 ((232, 393), (0.3255053632375783, 48)),
 ((262, 443), (0.49770946418889783, 9)),
 ((51, 583), (0.43990043726983885, 13)),
 ((297, 512), (0.3535270807374065, 9))]

## 2 for

In [103]:
def _extract_user_rating(line):
    data = line.split('\t')
    return (int(data[0]), (int(data[1]), float(data[2])))

def rate_substract_mean(v):
    rating_list = v[0]
    acc_rating = v[1]
    count_rating = v[2]
    user_mean = acc_rating / count_rating
    
    result = [(m_id, rating/user_mean) for m_id, rating in rating_list]
    
    return sorted(result)

def pair_movie(line):
    user_id = line[0]
    rating_list = line[1]
#     result = [(user_id, rating) for rating in rating_list]
    result = []
    for m_id1, rating1 in rating_list:
        for m_id2, rating2 in rating_list:
            if(m_id1 < m_id2):
                result += [((m_id1, m_id2), (rating1, rating2))]
    
    return result

def _computeCosineSimilarity(ratingPairs):
    numPairs = 0
    sum_xx = sum_yy = sum_xy = 0
    for ratingX, ratingY in ratingPairs:
        sum_xx += ratingX * ratingX
        sum_yy += ratingY * ratingY
        sum_xy += ratingX * ratingY
        numPairs += 1

    numerator = sum_xy
    denominator = sqrt(sum_xx) * sqrt(sum_yy)

    score = 0
    if (denominator):
        score = (numerator / (float(denominator)))

    return (score, numPairs)

import time
start_time = time.time()

user_rating_lists = rating_data_train \
    .map(_extract_user_rating) \
    .aggregateByKey(([], 0, 0), lambda g1,v2: (g1[0]+[v2], g1[1]+v2[1], g1[2]+1), 
                    lambda g1,g2: (g1[0]+g2[0], g1[1]+g2[1], g1[2]+g2[2])) \
    .mapValues(rate_substract_mean) \
    .flatMap(pair_movie) \
    .groupByKey() \
    .mapValues(_computeCosineSimilarity).persist() \
    .takeSample(False, 5)
     
elapsed_time = time.time() - start_time

In [104]:
print(elapsed_time)
user_rating_lists

53.91998362541199


[((78, 403), (0.8147379692347996, 12)),
 ((694, 1257), (0.9455329232468954, 2)),
 ((451, 1064), (0.9900439089083598, 2)),
 ((21, 886), (0.9691323159912302, 4)),
 ((19, 505), (0.9832661797166811, 7))]

## no braodcast better

In [9]:
def _extract_user_rating(line):
    data = line.split('\t')
    return (int(data[0]), (int(data[1]), float(data[2])))

def rate_substract_mean(line):
    k = line[0]
    v = line[1]
    rating_list = v[0]
    acc_rating = v[1]
    count_rating = v[2]
    user_mean = acc_rating / count_rating
    
    return [(k, (m_id, rating-user_mean)) for m_id, rating in rating_list]

def _computeCosineSimilarity(ratingPairs):
    numPairs = 0
    sum_xx = sum_yy = sum_xy = 0
    for ratingX, ratingY in ratingPairs:
        sum_xx += ratingX * ratingX
        sum_yy += ratingY * ratingY
        sum_xy += ratingX * ratingY
        numPairs += 1

    numerator = sum_xy
    denominator = sqrt(sum_xx) * sqrt(sum_yy)

    score = 0
    if (denominator):
        score = (numerator / (float(denominator)))

    return (score, numPairs)

def _filter_movies(line):
    movie1 = line[1][0]
    movie2 = line[1][1]

    return movie1 < movie2

def _makePairs(line):
    user_id = line[0]
    (movie1, rating1) = line[1][0]
    (movie2, rating2) = line[1][1]

    return ((movie1, movie2), (rating1, rating2))

start_time = time.time()

user_rating_lists = rating_data_train \
    .map(_extract_user_rating) \
    .aggregateByKey(([], 0., 0.), lambda g1,v2: (g1[0]+[v2], g1[1]+v2[1], g1[2]+1), 
                    lambda g1,g2: (g1[0]+g2[0], g1[1]+g2[1], g1[2]+g2[2])) \
    .flatMap(rate_substract_mean) 

join_lists = user_rating_lists.join(user_rating_lists)
user_movies_list = join_lists.filter(_filter_movies) \
    .map(_makePairs) \
    .groupByKey() \
    .mapValues(_computeCosineSimilarity).persist() \
    .takeSample(False, 10)
     
elapsed_time = time.time() - start_time

In [10]:
print(elapsed_time)
user_movies_list

34.18951869010925


[((785, 964), (0.5858485858805809, 2)),
 ((1188, 1479), (0.5604789173262675, 3)),
 ((509, 1249), (0.561152731791037, 2)),
 ((493, 642), (0.10817719040428987, 15)),
 ((1209, 1438), (-0.027558014815909432, 2)),
 ((308, 1132), (0.8272495368153464, 2)),
 ((876, 1208), (0.9999999999999998, 2)),
 ((41, 65), (-0.023911781788736007, 13)),
 ((384, 556), (0.32270623900034434, 5)),
 ((1195, 1577), (1.0, 1))]

## with pandas

In [99]:
def _extract_user_rating(line):
    data = line.split('\t')
    return (int(data[0]), (int(data[1]), float(data[2])))

def rate_substract_mean(v):
    rating_list = v[0]
    acc_rating = v[1]
    count_rating = v[2]
    user_mean = acc_rating / count_rating
    
    result = [(m_id, rating/user_mean) for m_id, rating in rating_list]
    
    return sorted(result)

def pair_movie(line):
    user_id = line[0]
    rating_list = pd.DataFrame(line[1])
    rating_list['key'] = 1
    join_list = pd.merge(rating_list, rating_list, on='key')
    filtered_df = join_list[join_list['0_x'] < join_list['0_y']]
    result = filtered_df[['0_x', '0_y', '1_x', '1_y']].values
    
#     c[c['0_x'] > c['0_y']]
#     result = [(user_id, rating) for rating in rating_list]
#     result = []
#     for m_id1, rating1 in rating_list:
#         for m_id2, rating2 in rating_list:
#             if(m_id1 < m_id2):
#                 result += [((m_id1, m_id2), (rating1, rating2))]
    
    return result

def _computeCosineSimilarity(ratingPairs):
    numPairs = 0
    sum_xx = sum_yy = sum_xy = 0
    for ratingX, ratingY in ratingPairs:
        sum_xx += ratingX * ratingX
        sum_yy += ratingY * ratingY
        sum_xy += ratingX * ratingY
        numPairs += 1

    numerator = sum_xy
    denominator = sqrt(sum_xx) * sqrt(sum_yy)

    score = 0
    if (denominator):
        score = (numerator / (float(denominator)))

    return (score, numPairs)

import time
start_time = time.time()

user_rating_lists = rating_data_train \
    .map(_extract_user_rating) \
    .aggregateByKey(([], 0, 0), lambda g1,v2: (g1[0]+[v2], g1[1]+v2[1], g1[2]+1), 
                    lambda g1,g2: (g1[0]+g2[0], g1[1]+g2[1], g1[2]+g2[2])) \
    .mapValues(rate_substract_mean) \
    .flatMap(pair_movie) \
    .map(lambda line: ( ( int(line[0]),int(line[1]) ), (line[2],line[3]) )) \
    .groupByKey() \
    .mapValues(_computeCosineSimilarity).persist() \
#     .takeSample(False, 5)
     
elapsed_time = time.time() - start_time

In [100]:
print(elapsed_time)
user_rating_lists
# rating_list = pd.DataFrame(user_rating_lists[0][1])
# rating_list['key'] = 1
# join_list = pd.merge(rating_list, rating_list, on='key')
# filtered_df = join_list[join_list['0_x'] < join_list['0_y']]
# filtered_df[['0_x', '0_y', '1_x', '1_y']].values

243.46224427223206


[((803, 1602), (1.0, 1)),
 ((354, 829), (0.91864784914159281, 4)),
 ((1194, 1444), (1.0, 1)),
 ((462, 1227), (0.8104599727530829, 4)),
 ((468, 480), (0.96364839049028872, 16))]