In [1]:
from pyspark import SparkConf, SparkContext
import math

### Item-item cosine similarity w/ subtract mean

In [2]:
# Spark settings for a run against the local master.
conf = (
    SparkConf()
    .setMaster("local")
    .setAppName("rating-prediction")
    .set("spark.default.parallelism", 4)
    .set('spark.driver.memory', "40G")
    .set('spark.driver.maxResultSize', '20G')
    .set('spark.worker.cleanup.enabled', 'True')
    .set('spark.sql.shuffle.partitions', 500)  # raise the SQL shuffle partition count
)

sc = SparkContext.getOrCreate(conf=conf)

ratings_dir = './test.csv'

# Input columns: userId, movieId, rating, timestamp
raw_rdd = sc.textFile(ratings_dir)

# The first line is taken as the header; lines equal to it are dropped below.
header = raw_rdd.first()

# out: (mov_id, (usr_id, rating))
ratings = (
    raw_rdd
    .filter(lambda row: len(row) > 1 and row != header)
    .map(lambda row: row.split(","))
    .map(lambda cols: (cols[1], (cols[0], float(cols[2]))))
    .cache()
)

# ratings.collect()

In [3]:
def subtract_mean(lst):
    """Center a collection of ``(id, rating)`` pairs on their mean rating.

    Args:
        lst: Iterable of ``(id, rating)`` pairs. Any iterable is accepted
            (list, grouped values, generator); it is materialized once so
            it can be traversed twice.

    Returns:
        A list of ``(id, rating - mean)`` pairs in input order, or an empty
        list when ``lst`` has no elements.
    """
    pairs = list(lst)
    if not pairs:
        # Nothing to average; avoid dividing by zero.
        return []

    total = 0
    for _, rating in pairs:
        total += rating

    mean = total / len(pairs)
    return [(key, rating - mean) for key, rating in pairs]


def calc_sim(x):
    """Cosine similarity between two items' ``(user, value)`` vectors.

    Args:
        x: ``((id1, pairs1), (id2, pairs2))`` where each ``pairs`` is an
            iterable of ``(user_id, value)`` tuples. A user missing from
            one side contributes nothing to the dot product.

    Returns:
        ``((id1, id2), sim)`` where ``sim`` is always a float: ``1.0`` when
        both ids are equal, ``0.0`` when the dot product or either norm is
        zero, otherwise dot / (norm1 * norm2).
    """
    p1_id, p1_lst = x[0]
    p2_id, p2_lst = x[1]

    # An item is defined to be perfectly similar to itself.
    if p1_id == p2_id:
        return (p1_id, p2_id), 1.0

    d1, d2 = dict(p1_lst), dict(p2_lst)

    sum_of_prod = 0.0
    sum_of_sqr1 = 0.0
    for user, value in d1.items():
        sum_of_sqr1 += value ** 2
        if user in d2:
            sum_of_prod += value * d2[user]

    sum_of_sqr2 = 0.0
    for value in d2.values():
        sum_of_sqr2 += value ** 2

    denom = math.sqrt(sum_of_sqr1) * math.sqrt(sum_of_sqr2)
    if sum_of_prod == 0 or denom == 0:
        # No co-rated signal (or a zero-length vector): return a float zero
        # so every branch yields the same type.
        return (p1_id, p2_id), 0.0

    return (p1_id, p2_id), sum_of_prod / denom

In [None]:
# mean = ratings.mapValues(lambda v: (v[1], 1)) \
#         .reduceByKey(lambda a,b: (a[0]+b[0], a[1]+b[1])) \
#         .mapValues(lambda v: v[0]/v[1])

# sub_mean = ratings.join(mean) \
#         .mapValues(lambda v: [(v[0][0], v[0][1]-v[1])]) \
#         .reduceByKey(lambda a,b: a+b)\
#         .sortByKey() \

# sub_mean.take(5)

In [4]:
# out: (mov_id, [(usr_id, rating - mean rating of mov_id), ...])
sub_mean = (
    ratings
    .groupByKey()
    .mapValues(subtract_mean)
    .cache()
)
    
# sub_mean.take(5)

In [5]:
# out: ((mov_id_a, mov_id_b), sim) for every ordered pair of items
sim = (
    sub_mean
    .cartesian(sub_mean)
    .map(calc_sim)
    .cache()
)
#sim.take(5)

#### Ordering and write output file

In [6]:
def write_sim_out(sim, out_file="./part1_out_test.txt"):
    """Print and save the similarity of each item pair ``(a, b)`` with a < b.

    Args:
        sim: RDD-like object of ``((id_a, id_b), similarity)`` records whose
            ids are convertible with ``int``.
        out_file: Path of the text file to write. Defaults to the path this
            function originally hard-coded.

    Each kept record is printed to stdout and written to ``out_file`` as
    ``(a, b), <similarity with 8 decimals>``, ordered by ``(a, b)``.
    """
    sim_out = (
        sim
        .map(lambda x: ((int(x[0][0]), int(x[0][1])), x[1]))
        # Keep one orientation of each pair (and drop self-pairs) before
        # sorting, so the sort only handles rows that are actually written.
        .filter(lambda x: x[0][0] < x[0][1])
        .sortByKey()
    )

    # Collect before opening the file so a failed job does not leave a
    # truncated output file behind.
    rows = sim_out.collect()

    with open(out_file, 'w') as f:
        for (id_a, id_b), value in rows:
            print("{},    {:.8f}".format((id_a, id_b), float(value)))
            f.write("({}, {}), {:.8f}\n".format(id_a, id_b, value))

In [7]:
# Print the item-item similarities and save them to the function's output file.
write_sim_out(sim)

(1, 2),    -0.17854212
(1, 3),    0.41403934
(1, 4),    -0.10245014
(1, 5),    -0.30895719
(1, 6),    0.58703951
(2, 3),    -0.52623481
(2, 4),    0.46800784
(2, 5),    0.39891072
(2, 6),    -0.30643976
(3, 4),    -0.62398065
(3, 5),    -0.28426762
(3, 6),    0.50636968
(4, 5),    0.45873490
(4, 6),    -0.23533936
(5, 6),    -0.21591676


### Rating Prediction

- For each user and each movie the user has not rated, take up to 10 of the most similar movies that the user has rated, then predict the missing rating as the similarity-weighted average of those ratings.

In [8]:
# out: (mov_id, [(other_mov_id, sim), ...]) with self-pairs removed
item_item_sim = (
    sim
    .filter(lambda pair: int(pair[0][0]) != int(pair[0][1]))
    .map(lambda pair: (pair[0][0], (pair[0][1], pair[1])))
    .groupByKey()
    .mapValues(list)
    .cache()
)

item_item_sim.take(5)

In [9]:
# out: (usr_id, [(mov_id, rating), ...])
user = (
    ratings
    .map(lambda rec: (rec[1][0], (rec[0], rec[1][1])))
    .groupByKey()
    .mapValues(list)
)

# user.take(3)

In [10]:
# out: mov_id (each movie id appearing in the ratings, once)
item = ratings.keys().distinct()

# item.take(6)

In [11]:
def item_mean_mapper(x):
    """Map ``(key, values)`` to ``(key, arithmetic mean of values)``.

    ``values`` must support both ``sum`` and ``len``.
    """
    key, values = x[0], x[1]
    return (key, sum(values) / len(values))


# out: (mov_id, mean rating of mov_id)
item_mean = (
    ratings
    .map(lambda rec: (rec[0], rec[1][1]))
    .groupByKey()
    .map(item_mean_mapper)
)

item_mean.take(6)

[('1', 3.6),
 ('4', 3.4),
 ('2', 3.1666666666666665),
 ('3', 3.0),
 ('5', 3.3333333333333335),
 ('6', 2.6)]

In [12]:
def new_mapper(x):
    """Emit a prediction request when the user has not rated the movie.

    Args:
        x: ``((u_id, u_rating_lst), m_id)`` where ``u_rating_lst`` holds the
            user's ``(mov_id, rating)`` pairs.

    Returns:
        ``[(m_id, (u_id, u_rating_lst))]`` if ``m_id`` is not among the
        user's rated movies, otherwise ``[]`` (so it can feed ``flatMap``).
    """
    (u_id, u_rating_lst), m_id = x[0], x[1]
    already_rated = m_id in dict(u_rating_lst)
    return [] if already_rated else [(m_id, (u_id, u_rating_lst))]
    
    
def get_weight_avg(u_rating_dic, sim_mov_lst, top_n=10):
    """Predict a rating as a similarity-weighted average of a user's ratings.

    Args:
        u_rating_dic: Mapping ``mov_id -> rating`` for one user.
        sim_mov_lst: Iterable of ``(mov_id, sim)`` neighbours of the target
            movie.
        top_n: Maximum number of rated neighbours to use (default 10, the
            value this function originally hard-coded).

    Returns:
        ``sum(sim * rating) / sum(sim)`` over the ``top_n`` most similar
        neighbours that the user has rated and whose similarity is
        positive, as a float. Returns ``0.0`` when no such neighbour
        exists or the weighted sum is zero.
    """
    ranked = sorted(sim_mov_lst, key=lambda x: x[1], reverse=True)

    used = 0
    sum_of_prod, sum_of_weight = 0.0, 0.0
    for sim_mov_id, sim in ranked:
        if used >= top_n or sim <= 0:
            # Neighbours are in descending order, so everything after the
            # first non-positive similarity would carry zero weight.
            break
        if sim_mov_id in u_rating_dic:
            used += 1
            sum_of_prod += sim * u_rating_dic[sim_mov_id]
            sum_of_weight += sim

    if sum_of_prod == 0 or sum_of_weight == 0:
        return 0.0
    return sum_of_prod / sum_of_weight
    
    
def predict(x):
    """Predict one user's rating for one movie.

    Args:
        x: ``(m_id, ((u_id, u_rating_lst), sim_mov_lst))`` where
            ``u_rating_lst`` is the user's ``(mov_id, rating)`` pairs and
            ``sim_mov_lst`` the ``(mov_id, sim)`` neighbours of ``m_id``.

    Returns:
        ``((u_id, m_id), predicted_rating)``.
    """
    m_id, joined = x[0], x[1]
    u_id, u_rating_lst = joined[0]
    score = get_weight_avg(dict(u_rating_lst), joined[1])
    return ((u_id, m_id), score)

In [13]:
# out: ((usr_id, mov_id), predicted rating) for movies the user has not rated,
# with ids converted to int and records sorted by key
new_join = (
    user
    .cartesian(item)
    .flatMap(new_mapper)
    .join(item_item_sim)
    .map(predict)
    .map(lambda kv: ((int(kv[0][0]), int(kv[0][1])), kv[1]))
    .sortByKey()
    .cache()
)



In [14]:
# Display every ((usr_id, mov_id), predicted rating) record.
new_join.collect()

[((1, 2), 0),
 ((1, 4), 0),
 ((1, 5), 0),
 ((2, 1), 4.0),
 ((2, 2), 2.0),
 ((2, 5), 2.0),
 ((2, 6), 4.0),
 ((3, 3), 3.0),
 ((4, 1), 1.0),
 ((4, 4), 3.5050029734328474),
 ((4, 6), 1.0),
 ((5, 1), 2.586406866934817),
 ((5, 2), 4.539852143274253),
 ((6, 2), 2.0),
 ((6, 3), 5.0),
 ((6, 4), 2.0),
 ((6, 6), 5.0),
 ((7, 1), 3.0),
 ((7, 4), 4.0),
 ((7, 5), 4.0),
 ((7, 6), 3.0),
 ((8, 1), 2.0),
 ((8, 2), 4.0),
 ((8, 3), 2.0),
 ((8, 5), 4.0),
 ((9, 2), 0),
 ((9, 4), 0),
 ((9, 5), 0),
 ((9, 6), 4.536889128793153),
 ((10, 1), 3.0),
 ((10, 4), 2.0),
 ((10, 5), 2.0),
 ((10, 6), 3.0),
 ((12, 1), 0),
 ((12, 3), 0),
 ((12, 4), 3.9899940531343057),
 ((12, 6), 0)]

In [None]:
# Save each prediction as "(usr_id, mov_id), <rating with 8 decimals>".
out_file = "./part2_out_test.txt"
with open(out_file, mode='w') as f:
    for e in new_join.collect():
        f.write("({}, {}), {:.8f}\n".format(*e[0], e[1]))

 ### Altered version for rating prediction
 ####  - Use a similar item's average rating when the user has not rated that item

In [16]:
# out: (mov_id, [(other_mov_id, sim, mean rating of other_mov_id), ...])
ii_sim_with_mean = (
    sim
    .filter(lambda pair: int(pair[0][0]) != int(pair[0][1]))
    .map(lambda pair: (pair[0][0], (pair[0][1], pair[1])))
    .join(item_mean)
    .map(lambda kv: (kv[1][0][0], (kv[0], kv[1][0][1], kv[1][1])))
    .groupByKey()
    .mapValues(list)
    .cache()
)

# ii_sim_with_mean.take(3)

In [17]:
def altered_get_weight_avg(u_rating_dic, sim_mov_lst, top_n=10):
    """Weighted-average prediction that falls back to neighbour means.

    Args:
        u_rating_dic: Mapping ``mov_id -> rating`` for one user.
        sim_mov_lst: Iterable of ``(mov_id, sim, mov_mean)`` neighbours of
            the target movie.
        top_n: Number of most-similar neighbours to consider (default 10,
            the value this function originally hard-coded).

    Returns:
        ``sum(sim * value) / sum(sim)`` over the ``top_n`` most similar
        neighbours with positive similarity, as a float. ``value`` is the
        user's own rating of the neighbour when present, otherwise the
        neighbour's ``mov_mean``. Returns ``0.0`` when there is no positive
        weight or the weighted sum is zero.
    """
    ranked = sorted(sim_mov_lst, key=lambda x: x[1], reverse=True)

    sum_of_prod, sum_of_weight = 0.0, 0.0
    for sim_mov_id, sim, sim_mov_avg in ranked[:max(top_n, 0)]:
        if sim <= 0:
            # Descending order: all remaining neighbours would also carry
            # zero weight.
            break
        # Prefer the user's own rating; otherwise use the neighbour's mean.
        sum_of_prod += sim * u_rating_dic.get(sim_mov_id, sim_mov_avg)
        sum_of_weight += sim

    if sum_of_prod == 0 or sum_of_weight == 0:
        return 0.0
    return sum_of_prod / sum_of_weight
    
    
def altered_predict(x):
    """Predict one user's rating for one movie using the mean fallback.

    Args:
        x: ``(m_id, ((u_id, u_rating_lst), sim_mov_lst))`` where
            ``u_rating_lst`` is the user's ``(mov_id, rating)`` pairs and
            ``sim_mov_lst`` holds ``(mov_id, sim, mov_mean)`` neighbours.

    Returns:
        ``((u_id, m_id), predicted_rating)``.
    """
    m_id, joined = x[0], x[1]
    u_id, u_rating_lst = joined[0]
    score = altered_get_weight_avg(dict(u_rating_lst), joined[1])
    return ((u_id, m_id), score)

In [18]:
# out: ((usr_id, mov_id), predicted rating) for movies the user has not rated,
# using the mean-fallback predictor; ids converted to int, sorted by key
altered_join = (
    user
    .cartesian(item)
    .flatMap(new_mapper)
    .join(ii_sim_with_mean)
    .map(altered_predict)
    .map(lambda kv: ((int(kv[0][0]), int(kv[0][1])), kv[1]))
    .sortByKey()
    .cache()
)

In [19]:
# Display every ((usr_id, mov_id), predicted rating) record from the altered predictor.
altered_join.collect()

[((1, 2), 3.369323476218284),
 ((1, 4), 3.2491661710945254),
 ((1, 5), 3.2914712964590955),
 ((2, 1), 3.179030386291256),
 ((2, 2), 2.6135304756343296),
 ((2, 5), 2.5426435177045215),
 ((2, 6), 3.7852443484827387),
 ((3, 3), 3.0),
 ((4, 1), 1.9382509870957076),
 ((4, 4), 3.5050029734328474),
 ((4, 6), 2.3959117348622),
 ((5, 1), 2.586406866934817),
 ((5, 2), 4.539852143274253),
 ((6, 2), 2.755793000583954),
 ((6, 3), 3.67962262948449),
 ((6, 4), 2.5891701356716545),
 ((6, 6), 4.073778257586309),
 ((7, 1), 2.765437253226073),
 ((7, 4), 3.670001982288565),
 ((7, 5), 3.679073809105183),
 ((7, 6), 3.3221334772758926),
 ((8, 1), 2.413593133065183),
 ((8, 2), 3.693234762182835),
 ((8, 3), 2.7197484196563275),
 ((8, 5), 3.6123974873539133),
 ((9, 2), 3.369323476218284),
 ((9, 4), 3.2491661710945254),
 ((9, 5), 3.2914712964590955),
 ((9, 6), 4.536889128793153),
 ((10, 1), 2.765437253226073),
 ((10, 4), 2.6599960354228704),
 ((10, 5), 2.7488277787545745),
 ((10, 6), 3.3221334772758926),
 ((12, 