In [1]:
from pyspark import SparkContext, SparkConf
from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating
from itertools import combinations
import sys, time, operator

config = SparkConf() \
    .set('spark.executor.memory', '4g') \
    .set('spark.driver.memory', '4g')
sc = SparkContext(conf=config).getOrCreate()


In [2]:
#task2 case 1
time_case1 = time.time()
# data = sc.textFile('/Users/haopengsong/Downloads/small.csv')
# testdata = sc.textFile('/Users/haopengsong/Downloads/small_test.csv')

# data = sc.textFile('/Users/haopengsong/Downloads/small_item.csv')
# testdata = sc.textFile('/Users/haopengsong/Downloads/small_item_test.csv')

data = sc.textFile('/Users/haopengsong/Downloads/yelp_train.csv')
testdata = sc.textFile('/Users/haopengsong/Downloads/yelp_val.csv')

In [3]:
data = data.map(lambda line : line.split(','))
firstLine = data.take(1)
data = data.filter(lambda x : x != firstLine[0]).map(lambda x : (x[0].strip(), x[1].strip(), x[2].strip()))

In [4]:
firstLine

[['user_id', ' business_id', ' stars']]

In [5]:
data.take(5)


[('vxR_YV0atFxIxfOnF9uHjQ', 'gTw6PENNGl68ZPUpYWP50A', '5.0'),
 ('o0p-iTC5yTBV5Yab_7es4g', 'iAuOpYDfOTuzQ6OPpEiGwA', '4.0'),
 ('-qj9ouN0bzMXz1vfEslG-A', '5j7BnXXvlS69uLVHrY9Upw', '2.0'),
 ('E43QxgV87Ij6KxMCHcijKw', 'jUYp798M93Mpcjys_TTgsQ', '5.0'),
 ('T13IBpJITI32a1k41rc-tg', '3MntE_HWbNNoyiLGxywjYA', '5.0')]

In [100]:
user_index = data.map(lambda x : x[0].strip()).distinct().zipWithIndex().collectAsMap()

index_user = {}

for k, v in user_index.items():
    
    index_user[v] = k
    
len(index_user)

11270

In [75]:
business_index = data.map(lambda x : x[1].strip()).distinct().zipWithIndex().collectAsMap()

index_business = {}

for k, v in business_index.items():
    
    index_business[v] = k
    

In [8]:
ratings = data.map(lambda x : (int(user_index[x[0]]), int(business_index[x[1]]), float(x[2])))

In [9]:
ratings.take(5)

[(5626, 12261, 5.0),
 (0, 12262, 4.0),
 (1, 12263, 2.0),
 (5627, 12264, 5.0),
 (5628, 0, 5.0)]

In [10]:
data.count()

455854

In [11]:
#orig 8 8
features = 2
iterations = 10
model = ALS.train(ratings, features, iterations, seed=10)

In [12]:
testdata.take(5)

['user_id, business_id, stars',
 'wf1GqnKQuvH-V3QN80UOOQ,fThrN4tfupIGetkrz18JOg,5.0',
 '39FT2Ui8KUXwmUt6hnwy-g,uW6UHfONAmm8QttPkbMewQ,5.0',
 '7weuSPSSqYLUFga6IYP4pg,IhNASEZ3XnBHmuuVnWdIwA,4.0',
 'CqaIzLiWaa-lMFYBAsYQxw,G859H6xfAmVLxbzQgipuoA,5.0']

In [13]:
testdata = testdata.map(lambda line : line.split(','))

In [14]:
testdata_firstline = testdata.take(1)

In [15]:
testdata_firstline

[['user_id', ' business_id', ' stars']]

In [16]:
def cut_testdata(x):
    rating = float(x[2].strip())
    if rating > 5:
        rating = 5
    elif rating < 1:
        rating = 1
    return (x[0].strip(), x[1].strip(), rating)
testdata = testdata.filter(lambda x : x != testdata_firstline[0]) \
                    .map(lambda x : (x[0].strip(), x[1].strip(), float(x[2].strip())))

In [79]:
testdata.distinct().count()

142044

In [17]:
testdata.take(5)

[('wf1GqnKQuvH-V3QN80UOOQ', 'fThrN4tfupIGetkrz18JOg', 5.0),
 ('39FT2Ui8KUXwmUt6hnwy-g', 'uW6UHfONAmm8QttPkbMewQ', 5.0),
 ('7weuSPSSqYLUFga6IYP4pg', 'IhNASEZ3XnBHmuuVnWdIwA', 4.0),
 ('CqaIzLiWaa-lMFYBAsYQxw', 'G859H6xfAmVLxbzQgipuoA', 5.0),
 ('yy7shAsNWRbGg-8Y67Dzag', 'rS39YnrhoXmPqHLzCBjeqw', 3.0)]

In [18]:
#data entries that are not in training dataset
def addition_entry(x):
    if x[0] not in user_index:
        return (x[0], x[1], x[2])
    elif x[1] not in business_index:
        return (x[0], x[1], x[2])
    return None

additional_entry = testdata.map(addition_entry).filter(lambda x : x != None)

In [19]:
#assign ratings for additional entries using avg rating over all addition entries
num_additional_entry = additional_entry.distinct().count()
avg_additional_entry_rating = 0
if num_additional_entry > 0:
    avg_additional_entry_rating = additional_entry.map(lambda x : float(x[2])) \
                                                .sum() / num_additional_entry

In [20]:
avg_additional_entry_rating

3.8403908794788273

In [21]:
#additional entry with diff rating
additional_entry_avg = additional_entry \
                            .map(lambda x : ((x[0], x[1]), avg_additional_entry_rating))

additional_entry_orig = additional_entry \
                            .map(lambda x : ((x[0], x[1]), x[2]))
additional_entry_orig.take(3)

[(('7fZu8ud7JXFthU0jPxVf4g', 'yFumR3CWzpfvTH2FCthvVw'), 3.0),
 (('I3E0WQMyDhkvfrLHVot00w', '7y4Pxm63yetTE0sMV9c1cw'), 2.0),
 (('DRDjrfODfL0a1E_NB6YKAw', 'igLKtZ9aDT8911U1xZcuSg'), 4.0)]

In [22]:
additional_entry_avg.take(3)

[(('7fZu8ud7JXFthU0jPxVf4g', 'yFumR3CWzpfvTH2FCthvVw'), 3.8403908794788273),
 (('I3E0WQMyDhkvfrLHVot00w', '7y4Pxm63yetTE0sMV9c1cw'), 3.8403908794788273),
 (('DRDjrfODfL0a1E_NB6YKAw', 'igLKtZ9aDT8911U1xZcuSg'), 3.8403908794788273)]

In [23]:
def output_testset(x):
    if x[0] in user_index and x[1] in business_index:
        return (int(user_index[x[0]]), int(business_index[x[1]]), float(x[2]))
testdata_case12 = testdata.map(output_testset).filter(lambda x : x != None)
testset_ratings_case12 = testdata_case12 \
                            .map(lambda x : (x[0], x[1], x[2]))
#testset_ratings_case12.filter(lambda x : x[2] > 5).take(5)


In [80]:
testset_ratings_case12.count()

141737

In [24]:
testdata.count()

142044

In [25]:
testdata_case12.count()

141737

In [26]:
testset_case12 = testdata_case12.map(lambda x : (x[0], x[1]))

In [27]:
testset_case12.take(1)

[(8731, 2541)]

In [28]:
testset_ratings_case12.take(1)

[(8731, 2541, 5.0)]

In [29]:
preds_case1 = model.predictAll(testset_case12).map(lambda x : ((x[0], x[1]), x[2]))

In [30]:
preds_case1.take(1)

[((152, 4904), 3.4365826913679953)]

In [31]:
#put addition entry back to preds with avg ratings
preds_case1.count()

141737

In [32]:
preds_case1 = preds_case1.union(additional_entry_avg)

In [81]:
r1 = preds_case1.collect()

output_r1 = []

for pair in r1:
    
    u1 = pair[0][0]
    
    if u1 in index_user:
    
        u1 = index_user[pair[0][0]]
    
    b1 = pair[0][1]
    
    if b1 in index_business:
        
        b1 = index_business[pair[0][1]]

    ra = pair[1]
    
    output_r1.append(((u1, b1), ra))
    
len(output_r1)

142044

In [78]:
for i in r1:
    
    if isinstance( i[0][0] , str ):
        
        print(i)

(('7fZu8ud7JXFthU0jPxVf4g', 'yFumR3CWzpfvTH2FCthvVw'), 3.8403908794788273)
(('I3E0WQMyDhkvfrLHVot00w', '7y4Pxm63yetTE0sMV9c1cw'), 3.8403908794788273)
(('DRDjrfODfL0a1E_NB6YKAw', 'igLKtZ9aDT8911U1xZcuSg'), 3.8403908794788273)
(('xN-xoLhTHUFfTS_BMGG0xg', 'SVXpyYPAuvJVKcfZ0nMKyg'), 3.8403908794788273)
(('apwx9mVErPHhQgxFnlvcjA', '9YpgT2Jt6UNKVAZlb_2yFQ'), 3.8403908794788273)
(('Ntq_6n5SpkZGmNHXnEmlNQ', 'mjjlXI5Hd3Ht0amDFXMfBg'), 3.8403908794788273)
(('iNw_rn2m88F-WjNQmHhNJg', 'SMrdfbvErYWLXfc44FwKJg'), 3.8403908794788273)
(('ZcFB7xOpJufgX9hHPFh9mQ', 'HodEoYNU4qHzUD2MESDWhw'), 3.8403908794788273)
(('fnNSkK1yp5_Th48_gDkj_g', '8TnEzVYiQrBH7FDKya3RZQ'), 3.8403908794788273)
(('5I6-yegWr4p1mtUMwNkVtg', '6aJKlkjSkxnFydVWERSjCQ'), 3.8403908794788273)
(('3KEHmthOP_mSk40zJ4gANA', 'fHwWQLQX8IJe3ZNGmCdgfw'), 3.8403908794788273)
(('qtWYsp3Fagzr-yWtJvO8JQ', 'QT--1yBePDPLCNJ6fmQOng'), 3.8403908794788273)
(('k5XFyzSbP1yW2nXkrh6ZCw', 'Ps5II15c_LIUnj1zb53Bbg'), 3.8403908794788273)
(('BjTabf62vN0c35MsUSINJA

In [33]:
preds_case1.count()

142044

In [34]:
predsCompare_case1 = testset_ratings_case12 \
                    .map(lambda x : ((x[0], x[1]), x[2])).union(additional_entry_orig) \
                    .join(preds_case1)

In [35]:
case1RMSE = pow(predsCompare_case1.map(lambda x : pow(x[1][0] - x[1][1], 2)).mean() , 1/2)


In [36]:
case1RMSE

1.2371592328937924

In [37]:
#model.predict(8731, 2541)

In [38]:
print('task2 case 1 finish time: ' + str(time.time() - time_case1))

task2 case 1 finish time: 25.15339207649231


In [58]:
#case 2

user_business_map = data.map(lambda x : (int(user_index[x[0]]), [int(business_index[x[1]])])) \
                        .reduceByKey(lambda a, b : a + b) \
                        .map(lambda e : (e[0], set(e[1]))) \
                        .collectAsMap()


In [59]:
user_num_business_map = data.map(lambda x : (int(user_index[x[0]]), [int(business_index[x[1]])])) \
                        .reduceByKey(lambda a, b : a + b) \
                        .map(lambda e : (e[0], set(e[1]))) \
                        .map(lambda x : (x[0], len(x[1]))) \
                        .sortBy(lambda x : -x[1]) \
                        .collect()
user_num_business_map

[(5709, 1062),
 (5782, 1032),
 (13, 813),
 (9, 572),
 (5781, 496),
 (5983, 480),
 (5657, 460),
 (5857, 418),
 (437, 408),
 (5904, 402),
 (6460, 375),
 (604, 374),
 (601, 369),
 (1037, 359),
 (5919, 358),
 (6485, 348),
 (7148, 331),
 (6491, 331),
 (99, 329),
 (259, 328),
 (6076, 325),
 (218, 313),
 (6770, 309),
 (6761, 306),
 (5630, 304),
 (5876, 304),
 (6077, 301),
 (93, 298),
 (1069, 297),
 (1246, 294),
 (612, 293),
 (51, 293),
 (6701, 292),
 (6064, 286),
 (498, 286),
 (4, 285),
 (8324, 282),
 (5645, 282),
 (539, 280),
 (755, 275),
 (6169, 274),
 (6499, 273),
 (5681, 271),
 (275, 271),
 (5911, 269),
 (223, 267),
 (6793, 265),
 (6180, 264),
 (7, 264),
 (440, 262),
 (6526, 261),
 (7044, 261),
 (7812, 261),
 (1315, 261),
 (5977, 258),
 (5947, 256),
 (6771, 254),
 (5746, 252),
 (120, 252),
 (5817, 250),
 (6327, 250),
 (643, 249),
 (6430, 246),
 (52, 240),
 (339, 240),
 (7642, 239),
 (492, 238),
 (7151, 237),
 (5687, 235),
 (7464, 234),
 (135, 233),
 (756, 231),
 (288, 230),
 (5918, 230),


In [40]:
# def userWithNumBus(x):
#     res = []
#     for u in x:
#         res.append((u, user_num_business_map[u]))
#     res.sort(key = lambda x : x[1], reverse = True)
#     return res
business_user_map = data.map(lambda x : (int(business_index[x[1]]), [int(user_index[x[0]])])) \
                        .reduceByKey(lambda a, b : a + b) \
                        .map(lambda e : (e[0], set(e[1]))) \
                        .collectAsMap()


In [61]:
business_num_user = data.map(lambda x : (int(business_index[x[1]]), [int(user_index[x[0]])])) \
                        .reduceByKey(lambda a, b : a + b) \
                        .map(lambda e : (e[0], set(e[1]))) \
                        .map(lambda x : (x[0], len(x[1]))) \
                        .sortBy(lambda x : -x[1]) \
                        .collect()
business_num_user

[(466, 556),
 (78, 528),
 (13264, 505),
 (13071, 490),
 (14049, 459),
 (478, 438),
 (136, 381),
 (453, 364),
 (255, 345),
 (10, 338),
 (12561, 336),
 (12442, 329),
 (12592, 323),
 (12408, 314),
 (12752, 302),
 (12409, 299),
 (13134, 298),
 (12582, 294),
 (228, 291),
 (264, 291),
 (31, 285),
 (1214, 279),
 (12404, 274),
 (12365, 268),
 (269, 268),
 (79, 267),
 (1543, 267),
 (12324, 260),
 (12872, 260),
 (155, 256),
 (12776, 254),
 (985, 253),
 (13384, 247),
 (24, 246),
 (13562, 245),
 (1110, 243),
 (13787, 243),
 (86, 240),
 (754, 240),
 (13069, 240),
 (12363, 237),
 (13449, 236),
 (12593, 234),
 (12322, 232),
 (120, 232),
 (737, 231),
 (201, 230),
 (13421, 229),
 (28, 228),
 (13173, 226),
 (12918, 225),
 (13548, 223),
 (397, 221),
 (13137, 220),
 (12485, 219),
 (887, 218),
 (14083, 213),
 (12321, 212),
 (12932, 211),
 (916, 211),
 (1174, 211),
 (12714, 210),
 (12730, 210),
 (14462, 210),
 (13981, 210),
 (964, 209),
 (393, 209),
 (12838, 208),
 (292, 207),
 (12261, 205),
 (12649, 205),


In [41]:
def map_business_rating(x):
    business_rating = {}
    for tup in x:
        business_rating[tup[0]] = tup[1]
    return business_rating

user_business_rating_map = data \
                            .map(lambda x : (int(user_index[x[0]]), [(int(business_index[x[1]]), float(x[2]))])) \
                            .reduceByKey(lambda a, b : a + b) \
                            .mapValues(map_business_rating) \
                            .collectAsMap()


In [42]:
testset_case12.take(5)

[(8731, 2541), (2171, 1110), (5680, 12981), (9667, 3071), (8954, 16476)]

In [90]:
#cut result
def rearrange_result(x):
    if x > 5:
        x = 5    
    return x

def cos_sim(x, u, y, v, cos):
    
    inter = x.intersection(y)
    
    unio = x.union(y)
    
    result = 0
    
    if len(inter) > 0:
        
        numerator = [user_business_rating_map[u][i] * user_business_rating_map[v][i] for i in inter]
 
        denominator_u = [user_business_rating_map[u][i] ** 2 for i in x]
    
        denominator_v = [user_business_rating_map[v][i] ** 2 for i in y]
        
        if denominator_u != 0 and denominator_v != 0:
            
            result = sum(numerator) / (( sum(denominator_u) ** (1/2)) * ( sum(denominator_v) ** (1/2)))
            
            cos[frozenset({u, v})] = result
            
            return result
        
    cos[frozenset({u, v})] = result
    
    return result

def train_case2(x):
    
    user_train = x[0]
    
    business_train = x[1]
    
    #all users who have rated business_train, list
    users_who_rated_business = business_user_map[business_train]
    
    #if user_train has already rated business_train, return the rating
    if user_train in users_who_rated_business:
        return ((x[0], x[1]) ,user_business_rating_map[user_train][business_train])
    
    #otherwise
    #calculate pearson 
    #average rating on co-rated business
    #businesses that were rated by user_train
    user_train_rated_businesses = user_business_map[user_train]
    
    user_train_ratings = [user_business_rating_map[user_train][i] for i in user_train_rated_businesses]
    
    user_train_ratings_avg = sum(user_train_ratings) / float(len(user_train_ratings))
    
    pearson = {}
    
    for u in users_who_rated_business:
        
        #businesses that rated by u
        u_rated_businesses = user_business_map[u]
        
        u_ratings = [user_business_rating_map[u][i] for i in u_rated_businesses]
        
        u_ratings_avg = sum(u_ratings) / float(len(u_ratings))
        
        #co-rated businesses between u and user_train
        inter = u_rated_businesses.intersection(user_train_rated_businesses)

        if len(inter) > 0:

            dot_product = 0.0
            
            distance_to_avg_u = 0.0
            
            distance_to_avg_user_train = 0.0
                        
            for business in inter:
                
                coeff_u = float( user_business_rating_map[u][business] - u_ratings_avg )
                
                coeff_user_train = float( user_business_rating_map[user_train][business] - user_train_ratings_avg )
                
                dot_product += (coeff_u) * (coeff_user_train)
                
                distance_to_avg_u += ((coeff_u ** 2))
                
                distance_to_avg_user_train += ((coeff_user_train ** 2))

            pearson_u_user_train = 0.0
            
            if dot_product == 0 or distance_to_avg_u == 0 or distance_to_avg_user_train == 0:
                
                pearson_u_user_train = 0.0
                
            else:
                
                pearson_u_user_train = dot_product / ((distance_to_avg_u ** (1/2)) * (distance_to_avg_user_train ** (1/2)))
            
            pearson[(u, user_train)] = pearson_u_user_train
            
    #pick top N user
    pearson_total = 0.0
    
    numerator = 0.0
    
    N = 13
    
    pearson_sorted = sorted(pearson.items(), key = operator.itemgetter(1), reverse = True)
    
    for pair in pearson_sorted[:N]:
        
        u_choosen = pair[0][0]
        
        u_rating_business_train = user_business_rating_map[u_choosen][business_train]
        
        u_sum_all_other = []
        
        for k, v in user_business_rating_map[u_choosen].items():
            
            #calculate prediction numerator , avg Ru
            if k != business_train:
                
                u_sum_all_other.append(float(v))
                
        u_avg_all_other_rated_items = sum(u_sum_all_other) / float(len(u_sum_all_other))
        
        numerator += pair[1] * (u_rating_business_train - u_avg_all_other_rated_items) 
        
        pearson_total += abs(pair[1])      
            
    r_a_user_train_sum = [user_business_rating_map[user_train][i] for i in user_train_rated_businesses]
    
    r_a_user_train_avg = sum(r_a_user_train_sum) / float(len(r_a_user_train_sum))
    
    rating = r_a_user_train_avg
    
    if pearson_total > 0:
        
        rating = r_a_user_train_avg + numerator / pearson_total
        
    rating = rearrange_result(rating)

    return ((x[0], x[1]), rating)

time_case2 = time.time()

preds_case2 = testset_case12.map(train_case2)






In [91]:
preds_case2.take(5)

[((8731, 2541), 4.227217372165174),
 ((2171, 1110), 5),
 ((5680, 12981), 4.929877661024455),
 ((9667, 3071), 4.9005058490985265),
 ((8954, 16476), 3.195963197684688)]

In [92]:
testset_ratings_case12.take(5)

[(8731, 2541, 5.0),
 (2171, 1110, 5.0),
 (5680, 12981, 4.0),
 (9667, 3071, 5.0),
 (8954, 16476, 3.0)]

In [93]:
preds_case2 = preds_case2.union(additional_entry_avg).collect()

In [99]:
len(preds_case2)
fp = open('/Users/haopengsong/PycharmProjects/inf553hw3/hw3_output_task2_test.txt', 'w')
for x in preds_case2:
    fp.write(x + '\n')
fp.close()


[((8731, 2541), 4.227217372165174),
 ((2171, 1110), 5),
 ((5680, 12981), 4.929877661024455),
 ((9667, 3071), 4.9005058490985265),
 ((8954, 16476), 3.195963197684688),
 ((10289, 3333), 4.284328555659652),
 ((7590, 14181), 4.850887623828555),
 ((875, 4679), 3.3843960265363333),
 ((979, 13670), 3.428537363975027),
 ((6047, 15439), 4.037508313782388),
 ((232, 9714), 3.3701710201281574),
 ((10258, 13012), 4.020137066108516),
 ((10102, 2613), 3.988708875003633),
 ((5781, 3061), 3.24497842653193),
 ((5247, 254), 5),
 ((5956, 745), 4.009063652506979),
 ((1198, 2606), 3.8359407946283985),
 ((8648, 14510), 4.091758219131423),
 ((10197, 12772), 2.4055276667696006),
 ((5840, 15011), 3.5407727458055995),
 ((5244, 7069), 3.4321320097496546),
 ((1470, 22851), 3.6029234038357965),
 ((3079, 516), 4.3434449384448675),
 ((452, 78), 3.9404544028463917),
 ((5645, 12981), 5),
 ((3249, 14839), 4.411608084742759),
 ((9242, 17423), 4.479723517359119),
 ((783, 620), 3.7652435036105167),
 ((318, 1206), 3.8183479

In [94]:
# time_case2 = time.time()
predsCompare_case2 = testset_ratings_case12 \
                    .map(lambda x : ((x[0], x[1]), x[2])).union(additional_entry_orig) \
                    .join(sc.parallelize(preds_case2))


In [95]:
#predsCompare_case2.filter(lambda x : x[1][1] == avg_additional_entry_rating).take(2)

In [96]:
case2RMSE = pow(predsCompare_case2.map(lambda x : ((x[1][0] - x[1][1]) ** 2)).mean() , 1/2)
#predsCompare_case2.collect()
print('task2 case 2 finish time: ' + str(time.time() - time_case2))

task2 case 2 finish time: 100.1551501750946


In [97]:
# n: 10 * 15 19 1.125
print(case2RMSE)


1.090880886163281


In [51]:
# def divide(x):
#     if x[1] >= 0 and x[1] < 1:
#         return ('0 <= x < 1' , 1)
#     elif x[1] >= 1 and x[1] < 2:
#         return ('1 <= x < 2', 1)
#     elif x[1] >= 2 and x[1] < 3:
#         return ('2 <= x < 3', 1)
#     elif x[1] >= 3 and x[1] < 4:
#         return ('3 <= x < 4', 1)
#     else:
#         return ('4 <= x', 1)
    


# ground_divide = testset_ratings_case12 \
#                     .map(lambda x : ((x[0], x[1]), x[2])) \
#                     .union(additional_entry_orig) \
#                     .map(divide) \
#                     .filter(lambda x : x != None) \
#                     .reduceByKey(lambda a, b : a + b) \
#                     .collect()
# ground_divide

In [52]:
# preds_divide = preds_case2.map(divide) \
#                             .filter(lambda x : x != None) \
#                             .reduceByKey(lambda a, b : a + b).collect()

# """
# [('4 <= x', 54368),
#  ('2 <= x < 3', 16653),
#  ('1 <= x < 2', 2250),
#  ('3 <= x < 4', 68773)]
# """
# preds_divide

In [67]:
#case 3 item based
def train_case3(x):
    
    N = 15
    
    user_u = x[0]
    
    business_u = x[1]
    
    u_businesses = list( user_business_map[user_u] )
    
    if business_u in u_businesses:
    
        return ((x[0], x[1]) ,user_business_rating_map[user_u][business_u])
    
    business_u_ratings = [user_business_rating_map[i][business_u] for i in business_user_map[business_u]]
    
    #print(business_u_ratings)
    
    business_u_ratings_avg = sum(business_u_ratings) / float(len(business_u_ratings))
    
    #print(business_u_ratings_avg)
    
    #similarity between user_u and u_businesses
    
    business_u_users = business_user_map[business_u]
    
    pearson = {}
    
    u_business_all = [user_business_rating_map[user_u][i] for i in u_businesses]
    
    u_business_all_avg = sum(u_business_all) / float(len(u_business_all))

    for business_v in u_businesses[:N]:
        
        #print('business : ' + str(business_v))
        
        business_v_users = business_user_map[business_v]
        
        business_v_ratings = [user_business_rating_map[i][business_v] for i in business_user_map[business_v]]
        
        #print(business_v_ratings)
        
        business_v_ratings_avg = sum(business_v_ratings) / float(len(business_v_ratings))
        
        #print(business_v_ratings_avg)
        
        inter = business_u_users.intersection(business_v_users)
        
        if len(inter) > 0:
            
            dot_product = 0.0
            
            distance_to_avg_u = 0.0
            
            distance_to_avg_v = 0.0
            
            for user in inter:
                
                coeff_u = float(user_business_rating_map[user][business_u] - business_u_ratings_avg)
                
                #print(coeff_u)
                
                coeff_v = float(user_business_rating_map[user][business_v] - business_v_ratings_avg)
                
                #print(coeff_v)
                
                dot_product += coeff_u * coeff_v
                
                distance_to_avg_u += (coeff_u ** 2)
                
                distance_to_avg_v += (coeff_v ** 2)
                
            pearson_u_v = 0.0
            
            if dot_product == 0 or distance_to_avg_u == 0 or distance_to_avg_v == 0:
                
                pearson_u_v = 0.0
                
            else:
                
                pearson_u_v = dot_product / ((distance_to_avg_u ** (1/2)) * (distance_to_avg_v ** (1/2)))
                
            #print('pearson: ' + str(pearson_u_v))
            
            pearson[(business_v, business_u)] = pearson_u_v
            
    pearson_total = 0.0
    
    numerator = 0.0
    
    pearson_sorted = sorted(pearson.items(), key = operator.itemgetter(1), reverse = True)
    
    #print(pearson_sorted)
    
    for pair in pearson_sorted[:(N - 1)]:
        
        b_choosen = pair[0][0]
        
        b_rating_user_u = user_business_rating_map[user_u][b_choosen]
        
        numerator += b_rating_user_u * pair[1]
        
        pearson_total += abs(pair[1])
    
    rating = 0
    
    if pearson_total != 0:
        
        rating = numerator / pearson_total
        
    #print(rating)
            
    return ((x[0], x[1]), u_business_all_avg)
            
time3 = time.time()

preds_case3 = testset_case12.map(train_case3)

preds_case3 = preds_case3.union(additional_entry_avg)

predsCompare_case3 = testset_ratings_case12 \
                .map(lambda x : ((x[0], x[1]), x[2])).union(additional_entry_orig) \
                .join(preds_case3)



case3RMSE = pow(predsCompare_case3.map(lambda x : pow(x[1][0] - x[1][1], 2)).mean() , 1/2)      
print('task2 case 3 finish time: ' + str(time.time() - time3))
print(case3RMSE)


task2 case 3 finish time: 53.17190599441528
1.0755070029007214
