# Term Project
## 105072123 黃海茵

In [1]:
from pyspark import SparkConf, SparkContext
import pyspark
import math

# Mapper
## aver
* 算出每個 movie 的平均 rating
* 回傳 ((movieId, rating mean), [(userId, rating)...])

In [2]:
def aver(line):
    """Attach a movie's mean rating to its record.

    Args:
        line: ``(movieId, ratings)`` where ``ratings`` is a sized collection
            of ``(userId, rating)`` entries.

    Returns:
        ``((movieId, mean rating), ratings)``; ``ratings`` is passed through
        unchanged.
    """
    movie_id, ratings = line[0], line[1]
    # Start from 0.0 so the accumulated total is always a float.
    total = sum((entry[1] for entry in ratings), 0.0)
    return ((movie_id, total / len(ratings)), ratings)

## sub
* 算出每部 movie 的 rating - rating mean
* 回傳 (movieId, [(userId, rating - mean)...])

In [3]:
def sub(line):
    """Subtract the movie's mean rating from each of its ratings.

    Args:
        line: ``((movieId, mean), [(userId, rating), ...])``.

    Returns:
        ``(movieId, [(userId, rating - mean), ...])`` in the input order.
    """
    movie_id, mean = line[0][0], line[0][1]
    centred = [(entry[0], entry[1] - mean) for entry in line[1]]
    return (movie_id, centred)

## sqrt
* 算出每個 movie 的 (rating - rating mean) 平方和，再開根號
* 回傳 (movieId, (rating - rating mean) 的平方和開根號)

In [4]:
def sqrt(line):
    """Compute the Euclidean norm of a movie's value list.

    Args:
        line: ``(movieId, [(userId, value), ...])``.

    Returns:
        ``(movieId, norm)`` where ``norm`` is the square root of the sum of
        the squared values (``0.0`` for an empty list).
    """
    squared_total = sum((entry[1] ** 2 for entry in line[1]), 0.0)
    return (line[0], squared_total ** 0.5)

## mul
* 算出每個 movie pair 的 rating - rating mean 乘積
* 回傳 [((movieId, movieId), rating - rating mean 乘積)...]

In [5]:
def mul(line):
    """Multiply the values of every pair of entries in one record.

    Args:
        line: ``(key, entries)`` where ``entries`` is a list of
            ``(id, value)`` tuples.

    Returns:
        A list of ``((smaller id, larger id), product)`` tuples, one per
        unordered pair of entries, in the order the pairs are enumerated.
    """
    entries = line[1]
    products = []
    for position, first in enumerate(entries):
        # Only pair with later entries so each pair is produced once.
        for second in entries[position + 1:]:
            if first[0] < second[0]:
                pair = (first[0], second[0])
            else:
                pair = (second[0], first[0])
            products.append((pair, first[1] * second[1]))
    return products

## cos_sim
* 算出每兩部電影的 cosine similarity
* 回傳 ((movieId, movieId), similarity)
* 看到討論區助教有說 "同學可以將 rdd collect() 成 list 餵給其他人當參數用，然後用 map reduce 算結果"
* 所以分母直接使用前面存在 dictionary 中的資料

In [6]:
def cos_sim(line):
    """Divide a pair's accumulated product by the product of both norms.

    Reads the module-level ``D`` mapping (movieId -> norm).

    Args:
        line: ``((movieId, movieId), numerator)``.

    Returns:
        ``((movieId, movieId), similarity)``; the similarity is ``0`` when
        the product of the two norms is zero.
    """
    pair = line[0]
    norm_product = D[pair[0]] * D[pair[1]]
    if norm_product == 0:
        # Avoid dividing by zero when either movie has a zero norm.
        return (pair, 0)
    return (pair, line[1] / norm_product)

## change_key
* 把原本的 (movieId, [(userId, rating - mean)...]) 換成 (userId, [(movieId, rating - mean)...])
* 方便後續的 map reduce 動作

In [7]:
def change_key(line):
    """Re-key a movie record so each entry is keyed by its own id.

    Args:
        line: ``(movieId, [(userId, value), ...])``.

    Returns:
        ``[(userId, (movieId, value)), ...]`` in the input order.
    """
    movie_id = line[0]
    return [(entry[0], (movie_id, entry[1])) for entry in line[1]]

## movie_list
* 回傳 (userId, [沒有 rating 的 movieId]) 
* 方便後續求 prediction

In [8]:
def movie_list(line):
    """List the movies that do not appear in a user's record.

    Reads the module-level ``movie_collect`` list of movieIds.

    Args:
        line: ``(userId, [(movieId, value), ...])``.

    Returns:
        ``(userId, [movieId, ...])`` holding every id of ``movie_collect``
        that is absent from the user's entries.
    """
    seen = set(entry[0] for entry in line[1])
    return (line[0], list(set(movie_collect) - seen))

## rating_prediction
* M 中存的資料為 similarity 最高的 (similarity, rating)，方便做分子的乘積（注意：目前程式中的上限是 2 筆；若要取 top 10，需將 `len(M) < 2` 改為 `len(M) < 10`）
* 如果分母為 0，或是預測的 rating 不在 (0, 5] 的範圍內（小於等於 0 或大於 5）就刪去
* 回傳 (userId, [(movieId, rating)...])

In [9]:
def rating_predict(line):
    """Predict ratings for the movies a user has not rated.

    Reads the module-level ``D_sim`` mapping, looked up as
    ``D_sim[movieA][movieB]`` in either key order.

    Args:
        line: ``(userId, (unrated, rated))`` where ``unrated`` holds movieIds
            and ``rated`` holds ``(movieId, rating)`` entries.

    Returns:
        ``(userId, [(movieId, prediction), ...])``. A movie is listed only
        when the similarity sum is non-zero and the prediction lies in
        ``(0, 5]``.
    """
    unrated = line[1][0]
    predictions = []
    for target in unrated:
        # (similarity, rating) of the most similar rated movies; at most 2 kept.
        neighbours = []
        for entry in line[1][1]:
            other = entry[0]
            forward = target in D_sim and other in D_sim[target]
            backward = other in D_sim and target in D_sim[other]
            if not (forward or backward):
                continue

            if len(neighbours) < 2:
                if forward:
                    similarity = D_sim[target][other]
                else:
                    similarity = D_sim[other][target]
            elif forward and D_sim[target][other] > min(neighbours)[0]:
                # Replace the current weakest neighbour.
                similarity = D_sim[target][other]
                neighbours.remove(min(neighbours))
            elif backward and D_sim[other][target] > min(neighbours)[0]:
                similarity = D_sim[other][target]
                neighbours.remove(min(neighbours))
            else:
                continue
            neighbours.append((similarity, entry[1]))

        # Similarity-weighted average of the kept ratings.
        weighted = sum(pair[0] * pair[1] for pair in neighbours)
        weight_total = sum(pair[0] for pair in neighbours)
        if weight_total != 0 and 0 < weighted / weight_total <= 5:
            predictions.append((target, weighted / weight_total))

    return (line[0], predictions)

# reducer
* x + y

In [10]:
def reducer(x, y):
    """Combine two values with ``+``.

    Args:
        x: Left operand.
        y: Right operand.

    Returns:
        ``x + y``.
    """
    combined = x + y
    return combined

## 讀資料
* 討論區中助教提到可以把 driver memory 開大一點，所以我就用了助教的方式來做
* 然後先將 input data 中的 ratings.csv 讀進來（注意：目前程式中實際讀取的檔案為 ml-latest-small/testing.csv）
* 以 movieId 作為 data 的 key，(userId, rating) 作為 value

In [11]:
# Spark configuration for this job.
# NOTE(review): spark.driver.memory normally has to be set before the driver
# JVM starts; confirm that setting it here takes effect in this environment.
conf = (
    SparkConf()
    .set("spark.default.parallelism", 4)
    .setAppName("Term_Project_Group22")
    .set('spark.driver.memory', '150G')
    .set('spark.driver.maxResultSize', '4G')
)
sc = SparkContext.getOrCreate(conf=conf)

# Split every line on commas and drop the row whose first field is 'userId'.
data = (
    sc.textFile("ml-latest-small/testing.csv")
    .map(lambda x: x.split(','))
    .filter(lambda x: x[0] != 'userId')
)
# Key by column 1 (movieId): (movieId, [(userId, rating), ...]).
data2 = (
    data.map(lambda x: (int(x[1]), (int(x[0]), float(x[2]))))
    .groupByKey()
    .mapValues(list)
)
#print(sorted(data2.collect()))

* (movieId, rating mean), [(userId, rating)...]

In [12]:
# Attach each movie's mean rating: ((movieId, mean), [(userId, rating), ...]).
rating_aver = data2.map(aver)
#rating_aver_collect = sorted(rating_aver.collect())
#print(rating_aver_collect)

* (movieId, [(userId, rating - rating mean)...])

In [13]:
# Centre every rating on its movie's mean: (movieId, [(userId, rating - mean), ...]).
sub_mean = rating_aver.map(sub)
#sub_mean_collect = sorted(sub_mean.collect())
#print(sub_mean_collect)

* (movieId, (rating - rating mean) 的平方和開根號)

In [14]:
# Norm of each movie's centred ratings: (movieId, sqrt(sum of squares)).
rating_sqrt = sub_mean.map(sqrt)
# Collected to the driver (sorted) so it can be turned into a lookup table.
rating_sqrt_collect = sorted(rating_sqrt.collect())
#print(rating_sqrt_collect)

* 前面的 rdd collect() 成 list 後，存到 dictionary 中，方便操作
* D[movieId] = (rating - mean) 的平方和開根號

In [15]:
# Lookup table movieId -> norm, built from the collected (movieId, norm) pairs.
D = {entry[0]: entry[1] for entry in rating_sqrt_collect}
#print(D)

* (userId, [(movieId, rating - mean)...])

In [16]:
# Re-key the centred ratings by user: (userId, [(movieId, rating - mean), ...]).
sub_mean2 = sub_mean.flatMap(change_key).groupByKey().mapValues(list)
#sub_mean2_collect = sorted(sub_mean2.collect())
#print(sub_mean2_collect)

* ((movieId, movieId), rx*ry)

In [17]:
# Per-user pair products summed per movie pair: ((movieId, movieId), sum of products).
numerator = sub_mean2.flatMap(mul).reduceByKey(reducer)
#numerator_collect = sorted(numerator.collect())
#print(numerator_collect)

* ((movieId, movieId), similarity)
* 把 similarity 為 0 的過濾掉

In [18]:
# ((movieId, movieId), similarity); pairs whose similarity equals 0 are dropped.
sim = numerator.map(cos_sim).filter(lambda x: x[1] != 0)

In [19]:
# Bring every ((movieId, movieId), similarity) record to the driver, sorted.
sim_collect = sorted(sim.collect())
#print(sim_collect)

* 太大無法直接 print，memory 會爆掉
* 所以寫成 txt 檔

In [20]:
# Write one "(movieId, movieId), similarity" line per collected pair.
with open('Output_basic.txt', 'w') as out:
    for pair, similarity in sim_collect:
        out.write('({}, {}), {}\n'.format(pair[0], pair[1], similarity))

* 前面的 rdd collect() 成 list 後，存到 2D-dictionary 中，方便操作
* D_sim[movieId][movieId] = similarity

In [21]:
# Nested lookup table: D_sim[first movieId][second movieId] = similarity.
D_sim = {}
for pair, similarity in sim_collect:
    # Create the inner dict the first time a first-movieId is seen.
    D_sim.setdefault(pair[0], {})[pair[1]] = similarity
#print(D_sim)

* (userId, [(movieId, rating)...])

In [22]:
# Group the raw rows by column 0 (userId): (userId, [(movieId, rating), ...]).
data3 = data.map(lambda x: (int(x[0]), (int(x[1]), float(x[2])))).groupByKey().mapValues(list)
#print(sorted(data3.collect()))

* movie 包含了全部的 movieId

In [23]:
# Every movieId that appears as a key of rating_sqrt.
movie = rating_sqrt.map(lambda x: x[0])
# Sorted driver-side list; movie_list reads it as a global.
movie_collect = sorted(movie.collect())
#print(movie_collect)

* (userId, ([無 rating 的 movieId], [(movieId, rating)...]))

In [24]:
# Join each user's unrated movieIds with that user's ratings:
# (userId, ([unrated movieId, ...], [(movieId, rating), ...])).
user_movie = sub_mean2.map(movie_list).join(data3)
#user_movie_collect = sorted(user_movie.collect())
#print(user_movie_collect)

* (userId, [(movieId, rating)...])

In [25]:
# Predicted ratings per user: (userId, [(movieId, predicted rating), ...]).
prediction = user_movie.map(rating_predict)

In [None]:
# Bring every user's predictions to the driver, sorted.
prediction_collect = sorted(prediction.collect())
#print(prediction_collect)

* 太大無法直接 print，memory 會爆掉
* 所以寫成 txt 檔

In [None]:
# Write one "(userId, movieId), rating" line per predicted rating.
with open('Output_predict.txt', 'w') as out:
    for user_id, user_predictions in prediction_collect:
        for entry in user_predictions:
            out.write('({}, {}), {}\n'.format(user_id, entry[0], entry[1]))

# Shut down the Spark context once all output has been written.
sc.stop()