In [1]:
import heapq
import time
from operator import itemgetter, attrgetter

import matplotlib.pyplot as plt
import pandas as pd
from pyspark import SparkConf, SparkContext

In [2]:
def mapper0(x):
    """Turn one comma-separated line into a record keyed by its second field.

    Args:
        x: Text line with at least three comma-separated fields: two integers
            followed by a float (presumably user id, item id and rating --
            confirm against the input file).

    Returns:
        ``(second_field, [(first_field, third_field)])``. The value is a
        one-element list so records sharing a key can be concatenated later.
    """
    fields = x.split(',')
    key = int(fields[1])
    entry = (int(fields[0]), float(fields[2]))
    return (key, [entry])

In [3]:
def mapper1(x):
    """Compute the root of the summed squared deviations of one key's values.

    Args:
        x: ``(key, [(other_id, value), ...])``.

    Returns:
        ``[(key, sqrt(sum((value - mean) ** 2)))]``, or ``[]`` when that sum
        is exactly 0 (all values identical), so ``flatMap`` drops the key.
    """
    values = [entry[1] for entry in x[1]]
    mean = sum(values) / len(values)
    squared_total = 0
    for value in values:
        squared_total += (value - mean) ** 2
    if squared_total == 0:
        return []
    return [(x[0], squared_total ** 0.5)]

In [4]:
def mapper2(x):
    """Mean-centre one key's values and re-key each of them by its own id.

    Args:
        x: ``(key, [(other_id, value), ...])``.

    Returns:
        One ``(other_id, [(key, value - mean)])`` record per input entry,
        in input order.
    """
    entries = x[1]
    values = [entry[1] for entry in entries]
    mean = sum(values) / len(values)
    centred = []
    for entry in entries:
        centred.append((entry[0], [(x[0], entry[1] - mean)]))
    return centred

In [5]:
def mapper3(x):
    """Emit the product of values for every ordered pair of non-zero entries.

    Args:
        x: ``(key, [(entry_id, value), ...])``; only ``x[1]`` is read.

    Returns:
        ``[((id_a, id_b), value_a * value_b), ...]`` for every two entries
        where the ``(id, value)`` tuple of the first compares less than that
        of the second. Entries whose value equals 0 are ignored.
    """
    nonzero = [entry for entry in x[1] if entry[1] != 0]
    return [
        ((left[0], right[0]), left[1] * right[1])
        for left in nonzero
        for right in nonzero
        if left < right
    ]

In [6]:
def mapper4(x):
    """Divide a pair's accumulated product by the product of its two norms.

    Args:
        x: ``(second_id, ((first_id, numerator, first_norm), second_norm))``.

    Returns:
        ``((first_id, second_id), numerator / (first_norm * second_norm))``,
        with the quotient replaced by 0 when the denominator is 0.
    """
    left, second_norm = x[1][0], x[1][1]
    denominator = left[2] * second_norm
    if denominator != 0:
        similarity = left[1] / denominator
    else:
        similarity = 0
    return ((left[0], x[0]), similarity)

In [7]:
def mapper_switch1(x):
    """Re-key ``((id_a, id_b), value)`` as ``(id_a, (id_b, value))``."""
    pair = x[0]
    value = x[1]
    return (pair[0], (pair[1], value))

def mapper_switch2(x):
    """Re-key ``(id_a, ((id_b, value), extra))`` as ``(id_b, (id_a, value, extra))``."""
    inner = x[1][0]
    extra = x[1][1]
    return (inner[0], (x[0], inner[1], extra))

In [8]:
def reducer1(x, y):
    """Combine two partial values with ``+``.

    The mappers in this notebook emit list values and float products, so this
    either concatenates lists or adds numbers.
    """
    combined = x + y
    return combined

In [9]:
# Reuse the active SparkContext if one exists, otherwise create one.
sc = SparkContext.getOrCreate()
# Each element of `doc` is one text line of ./input.csv.
doc = sc.textFile("./input.csv")
# mapper0 keys every line by its second field; reducer1 concatenates the
# one-element lists, giving (second_field, [(first_field, value), ...]).
rating_per_item = doc.map(mapper0).reduceByKey(reducer1)
# mapper1 emits (key, sqrt(sum of squared deviations from the key's mean)) and
# drops keys whose sum is 0.
# NOTE(review): despite the name, this holds a norm rather than an average.
avg_per_item = rating_per_item.flatMap(mapper1)

In [10]:
# mapper2 subtracts each key's mean from its values and re-keys every record by
# the first field; reducer1 gathers them into
# (first_field, [(second_field, centred_value), ...]).
rating_per_user = rating_per_item.flatMap(mapper2).reduceByKey(reducer1)

In [11]:
# Single pipeline from centred values to one ((id_a, id_b), similarity) record
# per pair:
#   1. mapper3 + reducer1 sum the products of centred values for each pair.
#   2. mapper_switch1 + join attach the avg_per_item value of the pair's first id.
#   3. mapper_switch2 + join attach the avg_per_item value of the pair's second id.
#   4. mapper4 divides the summed product by the product of the two joined values.
ratings_per_pair = (
    rating_per_user
    .flatMap(mapper3)
    .reduceByKey(reducer1)
    .map(mapper_switch1)
    .join(avg_per_item)
    .map(mapper_switch2)
    .join(avg_per_item)
    .map(mapper4)
)

In [12]:
# Pull every ((id_a, id_b), similarity) record to the driver and sort the
# resulting list of tuples.
sorted_ans = sorted(ratings_per_pair.collect())

In [13]:
# Write one "(id_a,id_b),similarity" line per pair. The `with` block guarantees
# the file is flushed and closed even if a write raises; previously the handle
# was never closed.
# NOTE(review): mode 'a' appends, so re-running this cell duplicates the lines
# already in ./Output.txt -- confirm that is intended.
with open("./Output.txt", 'a') as f:
    for ans in sorted_ans:
        f.write('(%d,%d),%.2f\n'%(ans[0][0], ans[0][1], ans[1]))
print(sorted_ans)

[((1, 2), -0.17854212213729673), ((1, 3), 0.41403933560541256), ((1, 4), -0.10245014273309601), ((1, 5), -0.30895719032666236), ((1, 6), 0.5870395085642741), ((2, 3), -0.5262348115842176), ((2, 4), 0.46800784077976615), ((2, 5), 0.39891071573694176), ((2, 6), -0.3064397582621859), ((3, 4), -0.6239806502223061), ((3, 5), -0.2842676218074806), ((3, 6), 0.5063696835418333), ((4, 5), 0.45873490213598356), ((4, 6), -0.2353393621658208), ((5, 6), -0.21591675854376524)]


In [14]:
def mapper0_0_1(x):
    """Turn one comma-separated line into a record keyed by its first field.

    Args:
        x: Text line with at least three comma-separated fields: two integers
            followed by a float.

    Returns:
        ``(first_field, [(second_field, third_field)])``.
    """
    fields = x.split(',')
    key = int(fields[0])
    entry = (int(fields[1]), float(fields[2]))
    return (key, [entry])

def mapper0_1(x):
    """Re-key a pair with strictly positive similarity by its first id.

    Args:
        x: ``((id_a, id_b), similarity)``.

    Returns:
        ``[(id_a, [(id_b, similarity)])]`` when ``similarity > 0``, otherwise
        ``[]`` so that ``flatMap`` drops the pair.
    """
    similarity = x[1]
    # One guard is enough; the original repeated this test in an unreachable
    # conditional expression on the return line.
    if similarity <= 0:
        return []
    pair = x[0]
    return [(pair[0], [(pair[1], similarity)])]

# Same input lines, now keyed by the first field:
# (first_field, [(second_field, value), ...]).
movies_per_user = doc.map(mapper0_0_1).reduceByKey(reducer1)
# Every key of rating_per_item, collected to the driver as a list.
movie_list = rating_per_item.keys().collect()
# Pairs with similarity > 0, grouped under the first id of the pair and
# collected to the driver: [(id_a, [(id_b, similarity), ...]), ...].
sim_double = ratings_per_pair.flatMap(mapper0_1).reduceByKey(reducer1).collect()

In [15]:
def mapper0_2(x, sim_double, movie_list, top_k=10):
    """Predict one user's ratings for the movies they have not rated.

    Each prediction is a similarity-weighted average of the user's own
    ratings, restricted to the ``top_k`` largest similarities between the
    target movie and the movies the user did rate.

    Args:
        x: ``(user_id, [(movie_id, rating), ...])`` -- the user's known ratings.
        sim_double: Iterable of ``(movie_a, [(movie_b, similarity), ...])``.
            Lookups always use the smaller id of a pair as ``movie_a``, so
            every pair must be stored under its smaller id.
        movie_list: Every movie id that may need a prediction.
        top_k: Number of largest similarities used per prediction. Defaults
            to 10, the value that was previously hard-coded.

    Returns:
        ``(user_id, [(movie_id, predicted_rating), ...])``. A movie is left
        out when its selected similarities sum to 0 (e.g. no known neighbour).
    """
    # Nested lookup table: similarity[movie_a][movie_b] -> similarity value.
    # It is rebuilt on every call because the signature receives the raw list.
    similarity = {
        movie_a: {movie_b: value for movie_b, value in neighbours}
        for movie_a, neighbours in sim_double
    }

    rated = x[1]
    rated_ids = [entry[0] for entry in rated]
    missing_movies = list(set(movie_list) - set(rated_ids))

    predictions = []
    for target in missing_movies:
        sims = []
        ratings_by_sim = {}
        for movie_id, rating in rated:
            low, high = min(target, movie_id), max(target, movie_id)
            sim = similarity.get(low, {}).get(high)
            if not sim:
                # Unknown pair, or a stored similarity of 0: no contribution.
                continue
            sims.append(sim)
            # Ratings are grouped by similarity value, so neighbours that tie
            # on similarity share the mean of their ratings.
            ratings_by_sim.setdefault(sim, []).append(rating)

        mean_rating = {
            sim: sum(ratings) / len(ratings)
            for sim, ratings in ratings_by_sim.items()
        }

        strongest = heapq.nlargest(top_k, sims)
        weight_total = sum(strongest)
        weighted_ratings = sum(sim * mean_rating[sim] for sim in strongest)
        if weight_total != 0:
            predictions.append((target, weighted_ratings / weight_total))
    return (x[0], predictions)

# Run mapper0_2 once per user record. sim_double and movie_list are the
# driver-side lists collected earlier, captured by the lambda.
predict_sims = movies_per_user.map(lambda x: mapper0_2(x, sim_double, movie_list))
# Bring every (user_id, predictions) tuple back to the driver.
predict_ans = predict_sims.collect()

In [16]:
# Sort once and reuse the result for both the file and the printed output.
sorted_predict_ans = sorted(predict_ans)
# Write one "(user_id,movie_id),prediction" line per predicted rating. The
# `with` block closes the file even if a write raises.
# NOTE(review): mode 'a' appends, so re-running this cell duplicates the lines
# already in ./Outputfile-bonus.txt -- confirm that is intended.
with open("./Outputfile-bonus.txt", 'a') as f:
    for ans in sorted_predict_ans:
        for e in sorted(ans[1]):
            f.write('(%d,%d),%.2f\n'%(ans[0], e[0], e[1]))
print(sorted_predict_ans)

[(1, []), (2, [(1, 4.0), (2, 2.0), (5, 2.0), (6, 4.0)]), (3, [(3, 3.0)]), (4, [(1, 1.0), (4, 3.5050029734328474), (6, 1.0)]), (5, [(1, 2.586406866934817), (2, 4.539852143274253)]), (6, [(2, 2.0), (3, 5.0), (4, 2.0), (6, 5.0)]), (7, [(1, 3.0), (4, 4.0), (5, 4.0), (6, 3.0)]), (8, [(1, 2.0), (2, 4.0), (3, 2.0), (5, 4.0)]), (9, [(6, 4.536889128793153)]), (10, [(1, 3.0), (4, 2.0), (5, 2.0), (6, 3.0)]), (11, []), (12, [(4, 3.9899940531343057)])]


In [None]:
def generate_table(c, n_rows=20):
    """Display two columns side by side as a pandas DataFrame.

    Args:
        c: Indexable whose first two elements become columns ``c1`` and
            ``c2``. List-like columns must contain exactly ``n_rows`` values.
        n_rows: Number of rows; they are labelled ``1..n_rows``. Defaults to
            20, the value that was previously hard-coded.

    The table is rendered with ``display`` (the name IPython makes available
    in notebook sessions); nothing is returned.
    """
    columns = {"c1": c[0], "c2": c[1]}
    row_labels = [i + 1 for i in range(n_rows)]
    table = pd.DataFrame(columns, index=row_labels)
    display(table)
# NOTE(review): generate_table reads c[0] and c[1] as two columns, but
# sorted_predict_ans is a list of (user_id, predictions) tuples -- confirm this
# is the intended argument.
generate_table(sorted_predict_ans)