# Recommender System Implementation
## dataSet comes from https://grouplens.org/datasets/movielens/
## ml-latest-small.zip
## output (userId, movieId) predicted rating

# 讀取資料，並且使用movies記錄出現過的movie，在計算最後的結果時才知道總共有哪些movies

In [None]:
# Every movieId seen by readData; later cells use it to work out which
# movies a user has not rated yet.
movies = set()


def readData(inputFile):
    """Parse a ratings CSV into ``[userId, movieId, rating]`` rows.

    The first line of the file is treated as a header and skipped. Only the
    first three comma-separated fields of each line are used; any further
    fields are ignored. Blank lines are skipped instead of crashing the
    parser. Every movieId encountered is also added to the module-level
    ``movies`` set.

    Args:
        inputFile: Path of the CSV file to read.

    Returns:
        A list of ``[int userId, int movieId, float rating]`` lists, in file
        order.

    Raises:
        ValueError: If a field cannot be converted to ``int``/``float``.
        IndexError: If a non-blank line has fewer than three fields.
    """
    data = []
    with open(inputFile) as fp:
        next(fp, None)  # drop the header line (a no-op on an empty file)
        for line in fp:
            line = line.strip()
            if not line:
                # e.g. trailing newlines at the end of the file
                continue
            fields = line.split(',')[:3]
            user_id = int(fields[0])
            movie_id = int(fields[1])
            rating = float(fields[2])
            movies.add(movie_id)
            data.append([user_id, movie_id, rating])
    return data

In [None]:
# Load the ratings as [userId, movieId, rating] rows; readData also fills
# the module-level `movies` set as a side effect.
data = readData('ml-latest-small/ratings.csv')
data[:10]  # notebook preview of the first ten parsed rows

In [None]:
# Sanity check: print the row index of every rating that is exactly 0.0.
for row_index, row in enumerate(data):
    if row[2] == 0.0:
        print(row_index)

# 系統設定與資訊

In [None]:
import sys
sys.version  # display the interpreter version in the notebook output

In [None]:
import pyspark
from pyspark import SparkContext, SparkConf
import math
import os


# Spark configuration for a local run that uses every available core.
conf = (
    SparkConf()
    .setMaster("local[*]")
    .setAppName("Project")
    .set("spark.yarn.executor.memoryOverhead", "4096")
    .set("spark.driver.maxResultSize", "16G")
    .set("spark.driver.memory", "64G")
    .set("spark.executor.memory", "16G")
    .set("spark.network.timeout", "7200s")
)
# getOrCreate is a classmethod. Calling it on the class reuses an already
# running context, whereas SparkContext(conf=conf) always tries to build a
# new one and raises if this cell is executed a second time.
sc = SparkContext.getOrCreate(conf=conf)

# test data

In [None]:
# Small hand-written sample; each row is [userId, movieId, score].
test_data = [
    [0, 0, 4], [0, 3, 5], [0, 4, 1],
    [1, 0, 5], [1, 1, 5], [1, 2, 4],
    [2, 3, 2], [2, 4, 4], [2, 5, 5],
    [3, 1, 3], [3, 6, 3]
]

In [None]:
# Second hand-written sample, typed in as [movieId, userId, score] rows.
test_data2 = [
    [1, 1, 1], [1, 3, 3], [1, 6, 5], [1, 9, 5], [1, 11, 4],
    [2, 3, 5], [2, 4, 4], [2, 7, 4], [2, 10, 2], [2, 11, 1], [2, 12, 3],
    [3, 1, 2], [3, 2, 4], [3, 4, 1], [3, 5, 2], [3, 7, 3], [3, 9, 4], [3, 10, 3], [3, 11, 5],
    [4, 2, 2], [4, 3, 4], [4, 5, 5], [4, 8, 4], [4, 11, 2],
    [5, 3, 4], [5, 4, 3], [5, 5, 4], [5, 6, 2], [5, 11, 2], [5, 12, 5],
    [6, 1, 1], [6, 3, 3], [6, 5, 3], [6, 8, 2], [6, 11, 4]
]
# Swap the first two columns in place so each row reads [userId, movieId, score].
for row in test_data2:
    row[0], row[1] = row[1], row[0]
# sorted(test_data2)

# 需要幾個最相似的item來計算推薦值

In [None]:
# Upper bound on how many similar, already-rated movies get_rate combines
# into a single predicted rating.
topK = 10

### input data是[userId, movieId, score]
### 透過第一個map去掉userId,只剩下movieId,再將相同的movieId聚集起來
### 再透過mapValues取得movies的平均分數

In [None]:
# Distribute the rating rows ([userId, movieId, score]) as an RDD.
inputMat = sc.parallelize(data)
point_for_each_item = inputMat.map(lambda s:(s[1], s[2])).groupByKey() # (movieId, all scores given to that movie)
avg_point_for_each_item = point_for_each_item.mapValues(lambda s:sum(s)/len(s)) # (movieId, mean score of that movie)

In [None]:
# for ele in avg_point_for_each_item.sortByKey().collect()[:10]:
#     print(ele[0], (ele[1]))

## 將input 轉換為 (movieId, (userId, score))的形式
## 再將上述結果與前面計算的movieId平均值相減
## 就可以得到扣除平均值的結果

In [None]:
# Mean-centre every rating by the average score of the movie it belongs to.
point_minus_avg = inputMat.map(lambda s:(s[1], (s[0], s[2]))) # type (movieId, (userId, score))
point_minus_avg = point_minus_avg.join(avg_point_for_each_item) # type (movieId, ((userId, score), avg))
point_minus_avg = point_minus_avg.map(lambda s:((s[0], s[1][0][0]), s[1][0][1]-s[1][1])) # type ((movieId, userId), score - avg)

In [None]:
# point_minus_avg.sortByKey().collect()[:10]

## 計算cosine similarity的方法
### 首先將item1與item2有的value平方相加開根號
### 再找出對兩部movie都評過分的user，將兩者的分數相乘再相加

In [None]:
# calculate cosine similarity
def _l2_norm(ratings):
    """Return the Euclidean norm of the scores in ``(userId, score)`` pairs."""
    total = 0
    for ele in ratings:
        total += ele[1] ** 2
    return total ** 0.5


def cosim(_input):
    """Compute the cosine similarity between two items.

    Args:
        _input: A pair ``((item1, ratings1), (item2, ratings2))`` where each
            ``ratings`` is a re-iterable collection of ``(userId, score)``
            pairs. User ids must be hashable.

    Returns:
        ``((item1, item2), similarity)``. The similarity is ``0.0`` when
        either item has a zero-length score vector.
    """
    item1_id, item1_ratings = _input[0][0], _input[0][1]
    item2_id, item2_ratings = _input[1][0], _input[1][1]

    item1_L2_nor = _l2_norm(item1_ratings)
    item2_L2_nor = _l2_norm(item2_ratings)

    # Index item2's scores by user so shared users are found with one dict
    # lookup each instead of a nested scan. setdefault keeps the first score
    # seen for a user, which is the entry a first-match scan would pick.
    item2_by_user = {}
    for ele in item2_ratings:
        item2_by_user.setdefault(ele[0], ele[1])

    # Dot product over the users who rated both items.
    rxi_ryi = 0
    for ele in item1_ratings:
        if ele[0] in item2_by_user:
            rxi_ryi += ele[1] * item2_by_user[ele[0]]

    denominator = item1_L2_nor * item2_L2_nor
    if denominator != 0:
        return ((item1_id, item2_id), rxi_ryi / denominator)
    return ((item1_id, item2_id), 0.0)

### point_minus_avg.map將資料轉換為(movieId, (userId, score))
### 透過item_score.cartesian(item_score)得到所有的pair
### 利用filter刪除 (2, 1)這種case，防止(2, 1),(1, 2)重複計算
### all_pair_for_item.map(cosim)得到所有的movie pair的similarity

In [None]:
# Regroup the mean-centred scores per movie: (movieId, iterable of (userId, score)).
item_score = point_minus_avg.map(lambda s:(s[0][0], (s[0][1],s[1]))).groupByKey() # type(movieId, (userId, score))
# Pair every movie with every other movie; the filter keeps only pairs whose
# first movieId is smaller, so each unordered pair is scored once.
all_pair_for_item = item_score.cartesian(item_score).filter(lambda s: s[0][0] < s[1][0]) #type((movie1, (userid, score)), (movie2, (userid, score)))
all_pair_score_for_item = all_pair_for_item.map(cosim) # type((movieId1, movieId2), similarity)

In [None]:
# all_pair_score_for_item.sortByKey().collect()[:10]

### all_pair_score_for_item.map(....)將所有資料轉為(similarity, ((movie1, movie2), (movie2, movie1)))
### 透過all_pair_score_for_item.flatMapValues 將資料轉為(similarity, (movie1, movie2))，如此一來也能得到(2, 1)的similarity
### 最後再map成 ((movie1, movie2), score)的形式

In [None]:
# Key each similarity by itself and attach both orderings of the movie pair:
# type (score, ((movie1, movie2), (movie2, movie1)))
all_pair_score_for_item = all_pair_score_for_item.map(lambda s:(s[1], ((s[0][0], s[0][1]), (s[0][1],s[0][0]))))
# flatMapValues emits one (score, pair) record per ordering; the final map
# swaps key and value back to: type ((movie1, movie2), score)
all_pair_score_for_item = all_pair_score_for_item.flatMapValues(lambda s: s).map(lambda s:(s[1], s[0])) 

In [None]:
# all_pair_score_for_item.sortByKey().collect()[:10]

### 將movie依照similarity排序
### ele當中為(movie, similarity)

In [None]:
def sort_similarity(_input):
    """Order one movie's neighbours from most to least similar.

    ``_input`` is ``(movieId, iterable of (otherMovieId, similarity))``.
    Returns ``(movieId, list)`` where the list is sorted by similarity in
    descending order; entries with equal similarity keep their input order.
    """
    ranked = list(_input[1])
    ranked.sort(key=lambda pair: pair[1], reverse=True)
    return (_input[0], ranked)

### all_pair_score_for_item.map將item轉為(movieId, sorted similarity)
### sorted similarity是list每個element為(item, score)並且根據score的大小進行排序

In [None]:
# Re-key ((movie1, movie2), similarity) by movie1, gather all of a movie's
# neighbours, and sort them with sort_similarity.
# type (movie, list of (otherMovie, similarity) in descending similarity)
similarity_score_for_each_item = all_pair_score_for_item.map(lambda s:((s[0][0]), (s[0][1], s[1]))).groupByKey().map(sort_similarity)

In [None]:
# similarity_score_for_each_item.collect()[:10]

### 得到user沒看過的movieId

In [None]:
def get_unRating(_input):
    """Return the movie ids in the global ``movies`` set that are not rated.

    Args:
        _input: Iterable of ``(movieId, score)`` pairs for one user.

    Returns:
        A list of every movieId in ``movies`` that does not appear in
        ``_input``, in the iteration order of ``movies``. Duplicate ratings
        and movie ids unknown to ``movies`` are ignored rather than raising.
    """
    # A set gives O(1) membership tests and tolerates repeated movie ids.
    rated = {ele[0] for ele in _input}
    return [movie for movie in movies if movie not in rated]

### 得到user對movie的pred

In [None]:
def get_rate(_input):
    """Predict one user's rating for one movie the user has not rated.

    ``_input`` is ``(movieId, ((userId, rated), neighbours))``: ``rated``
    yields the ``(movieId, score)`` pairs the user has rated, and
    ``neighbours`` holds ``(movieId, similarity)`` pairs for the unrated
    movie. Neighbours are visited in the order given; only those the user
    has rated and whose similarity is positive are used, and the scan stops
    once ``topK`` of them have been collected.

    Returns ``((userId, movieId), prediction)`` where the prediction is the
    similarity-weighted average of the user's scores, or ``0`` when the
    collected similarities sum to zero.
    """
    user_part = _input[1][0]
    neighbours = _input[1][1]

    # Scores of the movies this user has already rated, keyed by movieId.
    rated_scores = {ele[0]: ele[1] for ele in user_part[1]}

    used_weights = []
    weighted_total = 0
    for neighbour in neighbours:
        movie_id = neighbour[0]
        similarity = neighbour[1]
        if movie_id not in rated_scores or not similarity > 0:
            continue
        weighted_total += similarity * rated_scores[movie_id]
        used_weights.append(similarity)
        if len(used_weights) == topK:
            break

    key = (user_part[0], _input[0])
    total_weight = sum(used_weights)
    if total_weight == 0:
        return (key, 0)
    return (key, weighted_total / total_weight)

### 從input當中得到最大的pred與之對應的movieId

In [None]:
def get_max_pred(_input):
    """Return the first two fields of the entry with the largest prediction.

    ``_input`` is an iterable of sequences whose second field is the
    prediction. Entries are ordered by that field, largest first (ties keep
    their input order), and the leading entry's first two fields are
    returned as a tuple. An empty input raises ``IndexError``.
    """
    ranked = list(_input)
    ranked.sort(key=lambda entry: entry[1], reverse=True)
    best = ranked[0]
    return (best[0], best[1])

### inputMat.map(...)得到user看過的所有電影 type為type(user ,list of (movie, score))
### 透過get_unRating得到user沒有看過的movies，透過flatMapValues將type轉為 (user, unseen movie)
### user_unseen.join(user_rating)得到user沒看過的movie和看過的movie list與rating
### user_rating_and_unrating.map(...)將unseen movie當成key
### user_rating_and_unrating.join(...) 得到了unseen movie的similarity score
### 最後透過total_info.map(get_rate)得到((userId, movieId), pred)


In [None]:
# Ratings grouped per user: (userId, iterable of (movieId, score)).
user_rating = inputMat.map(lambda s:(s[0],(s[1], s[2]))).groupByKey() # type(user ,list of (movie, score))
# One (userId, movieId) record for every movie the user has not rated.
user_unseen = user_rating.mapValues(get_unRating).flatMapValues(lambda s:s) # type(userid, movieId)
# Attach the user's full rating list to each of their unrated movies.
user_rating_and_unrating = user_unseen.join(user_rating) # type(userId, (unrated movie, list of (movie, rating)))
# Re-key by the unrated movie so it can be joined with that movie's neighbours.
user_rating_and_unrating = user_rating_and_unrating.map(lambda s:(s[1][0], (s[0], s[1][1]))) # type(unrated movie, (userId, list of (movie, rating)))
total_info = user_rating_and_unrating.join(similarity_score_for_each_item) # type(unrated movie, ((userId, list of (movie, rating)), similarity list of that movie))
user_unseen_rate = total_info.map(get_rate) # type((userId, movieId), pred)
# sortByKey orders by (userId, movieId); collect() brings every prediction
# back to the driver as a Python list.
res = user_unseen_rate.sortByKey().collect()

### 將資料寫回file

In [None]:
# Write one "(userId,movieId),prediction" line per collected result.
with open("Outputfile.txt", 'w') as fp:
    for entry in res:
        pair = entry[0]
        fp.write("({0},{1}),{2}\n".format(pair[0], pair[1], entry[1]))