## Recommendation System
106062314 蔡政諺

In [1]:
from pyspark import SparkConf, SparkContext
import os
import time

In [2]:
# Replace the running SparkContext with one configured for this notebook.
# `sc` has to exist already (presumably the context pre-created by the
# notebook kernel); otherwise this line raises NameError.
sc.stop()
conf = (
    SparkConf()
    .setMaster("local")
    .setAppName("recommendationSystem")
    .set("spark.default.parallelism", 4)
    # NOTE(review): the Spark configuration docs say driver memory set through
    # SparkConf is ignored in client mode once the driver JVM is running --
    # confirm this value actually takes effect here.
    .set("spark.driver.memory", "45g")
    .set("spark.driver.maxResultSize", "10g")
)
sc = SparkContext(conf=conf)

In [3]:
# Run configuration. Two presets are kept side by side; the commented-out one
# is the alternative data set. Meaning of the settings, as used further down:
#   ratingsFile      - input CSV of ratings read by the Spark jobs
#   similaritiesFile - output of the similarity stage, input of the prediction stage
#   predictionsFile  - output of the prediction stage
#   iteration        - number of batches the similarity computation is split into
#   N                - maximum number of neighbour ratings combined per prediction
#   numOfUsers       - user ids 1..numOfUsers are considered when predicting

# big file (610x9742)
# (presumably 610 users x 9742 movies -- confirm against the data set)
# ratingsFile = "./ml-latest-small/ratings.csv"
# similaritiesFile = "./similarities.txt"
# predictionsFile = "./predictions.txt"
# iteration = 20
# N = 10
# numOfUsers = 610

# small file (12x6)
# (presumably 12 users x 6 movies, matching numOfUsers below -- confirm)
ratingsFile = "./ratings-small.csv"
similaritiesFile = "./similarities-small.txt"
predictionsFile = "./predictions-small.txt"
iteration = 1
N = 2
numOfUsers = 12

### Item-Item Collaborative Filtering
算出兩兩電影之間的相似度。
#### mapper1
將每行文字根據逗號 ```","``` 切開，得到 ```(movieId, [userId, rating])``` 的 key-value pairs。<br>
E.g., ```"1,1,4.0,964982703"``` → ```(1, [1, 4.0])```
#### reducer1
將同一部電影的 key-value pairs 合在一起。<br>
E.g., ```(1, [1, 1.0]), (1, [3, 3.0])``` → ```(1, [1, 1.0, 3, 3.0])```
#### mapper2
將 ```userId``` 與 ```rating``` 兩兩 pair 在一起。<br>
E.g., ```(1, [1, 1.0, 3, 3.0])``` → ```(1, [[1, 1.0], [3, 3.0]])```
#### mapper3
對同一部電影的所有評分做 subtract mean。<br>
E.g., ```(1, [[1, 1.0], [3, 3.0]])``` → ```(1, [[1, -1.0], [3, 1.0]])```
#### norm
計算一個向量的長度。
#### dot
計算兩個向量的內積。
#### cosineSimilarity
計算兩部電影的 cosine similarity。
#### mapper4
呼叫 ```cosineSimilarity```，算出兩兩電影之間的相似度。<br>
E.g., ```((1, [[1, -1.0], [3, 1.0]]), (2, [[1, -1.0], [3, 1.0]]))``` → ```((1, 2), 1.00)```

In [4]:
def mapper1(line):
    """Parse one CSV rating row into ``(movieId, [userId, rating])``.

    The row must contain exactly four comma-separated fields: user id,
    movie id, rating and a fourth field that is ignored.
    E.g. ``"1,1,4.0,964982703"`` -> ``(1, [1, 4.0])``.
    """
    fields = line.split(",")
    user_field, movie_field, rating_field, _ = fields
    return (int(movie_field), [int(user_field), float(rating_field)])

def reducer1(x, y):
    """Concatenate two partial value lists that belong to the same key.

    E.g. ``[1, 1.0]`` and ``[3, 3.0]`` -> ``[1, 1.0, 3, 3.0]``.
    """
    merged = x + y
    return merged

def mapper2(x):
    """Regroup a flat ``[userId, rating, userId, rating, ...]`` list into pairs.

    ``x`` is ``(movieId, flat_list)``. The result is a list so it can be used
    with ``flatMap``: ``[(movieId, [[userId, rating], ...])]``, or ``[]`` when
    the flat list holds exactly one pair.
    E.g. ``(1, [1, 1.0, 3, 3.0])`` -> ``[(1, [[1, 1.0], [3, 3.0]])]``.
    """
    movie_id, flat = x[0], x[1]
    # Two entries mean a single (userId, rating) pair: movies rated by only
    # one user are dropped.
    if len(flat) == 2:
        return []
    pair_count = len(flat) // 2
    pairs = [[flat[2 * k], flat[2 * k + 1]] for k in range(pair_count)]
    return [(movie_id, pairs)]

# subtract mean
def mapper3(x):
    """Centre a movie's ratings on zero by subtracting their mean.

    Args:
        x: ``(movieId, [[userId, rating], ...])``.

    Returns:
        ``[(movieId, [[userId, rating - mean], ...])]``, or ``[]`` when the
        movie has no ratings or every centred rating is exactly zero (all
        ratings identical). A list is returned so the function can be used
        with ``flatMap``.

    The input pairs are not modified; new pairs are built for the result.
    E.g. ``(1, [[1, 1.0], [3, 3.0]])`` -> ``[(1, [[1, -1.0], [3, 1.0]])]``.
    """
    movie_id, ratings = x[0], x[1]
    if not ratings:
        # Nothing to average; also avoids a division by zero below.
        return []

    total = 0.0
    for rating in ratings:
        total += rating[1]
    mean = total / len(ratings)

    # Build fresh pairs rather than updating in place, so data shared with
    # the caller (e.g. a reused or cached record) is left untouched.
    centred = [[rating[0], rating[1] - mean] for rating in ratings]

    # Drop movies whose ratings are all the same: their centred vector is zero.
    if all(rating[1] == 0 for rating in centred):
        return []
    return [(movie_id, centred)]

def norm(r):
    """Return the Euclidean length of a rating vector.

    ``r`` is a list of ``[userId, value]`` pairs; only the values are used.
    """
    squared_total = 0
    for entry in r:
        squared_total += entry[1] ** 2
    return squared_total ** 0.5

def dot(rx, ry):
    """Return the inner product of two sparse rating vectors.

    Args:
        rx, ry: lists of ``[userId, value]`` pairs. The lists may be in any
            order; only users present in both vectors contribute.

    Returns:
        The sum of ``rx_value * ry_value`` over the shared users (``0`` when
        there is no shared user).
    """
    # Index one vector by user id so the result does not depend on either
    # list being sorted by user id.
    # NOTE: assumes each user id appears at most once per vector.
    values_y = {entry[0]: entry[1] for entry in ry}
    product = 0
    for entry in rx:
        if entry[0] in values_y:
            product += entry[1] * values_y[entry[0]]
    return product

def cosineSimilarity(rx, ry):
    """Return the cosine similarity of two rating vectors.

    Args:
        rx, ry: lists of ``[userId, value]`` pairs.

    Returns:
        ``dot(rx, ry) / (norm(rx) * norm(ry))``, or ``0.0`` when either
        vector has zero length.
    """
    # Compute the denominator once; it is needed for both the zero check and
    # the division.
    denominator = norm(rx) * norm(ry)
    if denominator == 0:
        return 0.0
    return dot(rx, ry) / denominator

def mapper4(x, movies):
    """Compute similarities between one movie and a batch of other movies.

    Args:
        x: ``(movieId, ratings)`` for the movie being processed.
        movies: list of ``(movieId, ratings)`` entries to compare against.

    Returns:
        A list of ``((otherId, movieId), similarity)`` for every entry of
        ``movies`` whose id is smaller than ``x``'s id and whose similarity
        is non-zero.
    """
    movie_id, movie_ratings = x[0], x[1]
    result = []
    for other in movies:
        # Only pair with smaller ids, so each unordered pair is emitted once.
        if not other[0] < movie_id:
            continue
        score = cosineSimilarity(movie_ratings, other[1])
        if score == 0.0:
            continue
        result.append(((other[0], movie_id), score))
    return result

In [5]:
# Stage 1: compute the item-item similarities and write them to similaritiesFile.

# Load the ratings, skip the CSV header line, and group them per movie as
# (movieId, [[userId, rating], ...]).
ratings = sc.textFile(ratingsFile)
ratings = ratings.filter(lambda line: not line.startswith('userId'))
ratings = ratings.map(mapper1).sortBy(lambda x: (x[0], x[1][0]))
ratings = ratings.reduceByKey(reducer1).flatMap(mapper2).sortBy(lambda x: x[0])

# Mean-centre every movie's ratings; the collected copy is what each batch
# below compares against.
ratingsNorm = ratings.flatMap(mapper3).sortBy(lambda x: x[0])
ratingsNormList = ratingsNorm.collect()

# Round the batch size UP so that `iteration` batches cover every movie.
# Truncating would leave the last len % iteration movies out of all batches
# (and give empty batches when there are fewer movies than iterations).
batch_size = -(-len(ratingsNormList) // iteration)

# The context manager closes the file even if a Spark job fails mid-way.
with open(similaritiesFile, 'w') as f:
    for i in range(iteration):
        start_time = time.time()
        # Slice on the driver and bind the slice as a default argument, so
        # only this batch (not the whole list) is captured by the task closure.
        batch = ratingsNormList[i*batch_size:(i+1)*batch_size]
        similarities = ratingsNorm.flatMap(lambda x, batch=batch: mapper4(x, batch)).sortBy(lambda x: (x[0][0], x[0][1]))
        similaritiesList = similarities.collect()
        for pair in similaritiesList:
            f.write("(%d, %d), %.6f\n" % (pair[0][0], pair[0][1], pair[1]))
        print("iter %d/%d: %ds" % (i+1, iteration, time.time()-start_time))

iter 1/1: 13s


### Rating Predictions
預測所有使用者對電影的評分。
#### mapper_load
從 ```similarities.txt``` 中將兩兩電影的相似度讀出來。<br>
E.g., ```"(1, 2), 1.000000"``` → ```((1, 2), 1.00)```
#### mapper5
將兩兩電影之間的相似度的所有配對 append 出來。<br>
E.g., ```((1, 2), 1.00)``` → ```[(1, [2, 1.00]), (2, [1, 1.00])]```
#### mapper6
取出各電影的 neighbors，即相似度大於 0 的電影。<br>
E.g., ```(1, [2, 1.00, 3, 0.50, 4, 0.00, 5, -0.50])``` → ```(1, [[2, 1.00], [3, 0.50]])```
#### getRating
獲得某個 user 對這部電影的評分，若沒有則回傳 -1。
#### predictRatings
對一部電影跑一個 for loop，檢查哪些 users 沒有對這部電影評分。對於空缺的評分，從這部電影的 neighbors 找出有評分且與之最像的 ```N``` 部電影 (若不夠則從缺)，計算 weighted average。最後會對所有空缺的評分算出預測值回傳。
#### mapper7
拿出每部電影原有的評分，以及其所有 neighbors 的評分，呼叫 ```predictRatings``` 計算某一部電影的所有空缺評分。<br>
E.g., ```(1, [[1, 1.0], [4, 1.0], [5, 1.0]])``` → ```(1, [[1, 1.0], [4, 1.0], [5, 1.0], [2, 1.0], [3, 1.0]])```
#### mapper8
將同一部電影的所有評分展開成多個 key-value pairs。<br>
E.g., ```(1, [[1, 1.0], [4, 1.0], [5, 1.0], [2, 1.0], [3, 1.0]])``` → ```[((1, 1), 1.0), ((4, 1), 1.0), ((5, 1), 1.0), ((2, 1), 1.0), ((3, 1), 1.0)]```

In [6]:
def mapper_load(line):
    """Parse one line of the similarities file.

    The line has the form ``"(movieId1, movieId2), similarity"``.
    E.g. ``"(1, 2), 1.000000"`` -> ``((1, 2), 1.0)``.
    """
    first, second, score = line.split(",")
    # Strip the opening "(" from the first field and the closing ")" from the
    # second one before converting to int.
    pair = (int(first[1:]), int(second[:-1]))
    return (pair, float(score))

def mapper5(x):
    """Emit a similarity in both directions, keyed by each movie of the pair.

    E.g. ``((1, 2), 1.0)`` -> ``[(1, [2, 1.0]), (2, [1, 1.0])]``.
    """
    pair, score = x[0], x[1]
    movie_a = pair[0]
    movie_b = pair[1]
    forward = (movie_a, [movie_b, score])
    backward = (movie_b, [movie_a, score])
    return [forward, backward]

def mapper6(x):
    """Extract a movie's neighbours: the movies with positive similarity.

    Args:
        x: ``(movieId, [otherId, similarity, otherId, similarity, ...])``
            as a flat list, in any order.

    Returns:
        ``(movieId, [[otherId, similarity], ...])`` containing only the
        entries with similarity > 0, ordered by descending similarity.

    E.g. ``(1, [2, 1.0, 3, 0.5, 4, 0.0, 5, -0.5])`` ->
    ``(1, [[2, 1.0], [3, 0.5]])``.
    """
    movie_id, flat = x[0], x[1]
    # Filter every pair instead of stopping at the first non-positive one, so
    # the result does not depend on the flat list already being sorted.
    neighbors = [
        [flat[2 * k], flat[2 * k + 1]]
        for k in range(len(flat) // 2)
        if flat[2 * k + 1] > 0
    ]
    # Most similar first; list.sort is stable, so ties keep their input order.
    neighbors.sort(key=lambda pair: -pair[1])
    return (movie_id, neighbors)

def getRating(user, list):
    """Return ``user``'s rating from a list of ``[userId, rating]`` pairs.

    The rating of the first pair whose user id equals ``user`` is returned;
    ``-1`` is returned when no pair matches.
    """
    # The parameter name `list` hides the builtin inside this function; it is
    # kept because it is part of the function's signature.
    for entry in list:
        if entry[0] == user:
            return entry[1]
    return -1

def predictRatings(similarities, ratings):
    """Predict the missing ratings of one movie from its neighbours.

    Args:
        similarities: ``[[neighbourId, similarity], ...]``; entry ``k`` is
            the weight used for ``ratings[k + 1]``.
        ratings: ``[(movieId, [[userId, rating], ...]), ...]`` where the
            first entry is the movie being predicted and the remaining
            entries are its neighbours.

    Returns:
        ``[[userId, predictedRating], ...]`` for every user id in
        ``1..numOfUsers`` without a rating for the movie. Each prediction is
        the similarity-weighted average of the first ``N`` neighbour ratings
        found for that user, or ``0.0`` when the weights sum to zero.
    """
    own_ratings = list(ratings[0][1])
    neighbor_entries = ratings[1:]
    predicted = []
    for user in range(1, numOfUsers + 1):
        # Users who already rated the movie need no prediction.
        if getRating(user, own_ratings) != -1:
            continue

        weighted_sum = 0
        weight_total = 0
        used = 0
        for idx, entry in enumerate(neighbor_entries):
            neighbor_rating = getRating(user, entry[1])
            if neighbor_rating != -1:
                weight = similarities[idx][1]
                weighted_sum += weight * neighbor_rating
                weight_total += weight
                used += 1
            # Stop once N neighbour ratings have been combined.
            if used == N:
                break

        if weight_total == 0:
            estimate = 0.0
        else:
            estimate = weighted_sum / weight_total
        predicted.append([user, estimate])
    return predicted

def mapper7(x, moviesToUsers):
    """Predict the missing ratings of one movie.

    Args:
        x: ``(movieId, [[neighbourId, similarity], ...])``.
        moviesToUsers: list of ``(movieId, [[userId, rating], ...])`` for
            the movies with known ratings.

    Returns:
        ``(movieId, predictions)`` where ``predictions`` is whatever
        ``predictRatings`` returns for this movie.
    """
    movie_id, neighbors = x[0], x[1]
    # Ids to look up: the movie itself first, then its neighbours in order.
    wanted_ids = [movie_id] + [neighbor[0] for neighbor in neighbors]
    # Collect the rating entries in that same order. predictRatings treats the
    # first entry as the movie itself and entry k+1 as neighbour k, so this
    # presumably relies on every id occurring exactly once in moviesToUsers.
    ratings = []
    for wanted in wanted_ids:
        ratings.extend(entry for entry in moviesToUsers if entry[0] == wanted)
    return (movie_id, predictRatings(neighbors, ratings))

def mapper8(x):
    """Flatten one movie's ratings into ``((userId, movieId), rating)`` pairs.

    E.g. ``(1, [[2, 1.0], [3, 1.0]])`` -> ``[((2, 1), 1.0), ((3, 1), 1.0)]``.
    """
    movie_id = x[0]
    return [((entry[0], movie_id), entry[1]) for entry in x[1]]

In [7]:
# Stage 2: predict the missing ratings and write them to predictionsFile.

# Load the ratings, skip the CSV header line, and group them per movie as
# (movieId, [[userId, rating], ...]); the collected list is handed to mapper7.
ratings = sc.textFile(ratingsFile)
ratings = ratings.filter(lambda line: not line.startswith('userId'))
ratings = ratings.map(mapper1).sortBy(lambda x: (x[0], x[1][0]))
ratings = ratings.reduceByKey(reducer1).flatMap(mapper2).sortBy(lambda x: x[0])
ratingsList = ratings.collect()

# Read back the similarities written by the similarity stage.
similarities = sc.textFile(similaritiesFile)
similarities = similarities.map(mapper_load)

# Per movie: its neighbours, requested in order of descending similarity.
neighbors = similarities.flatMap(mapper5).sortBy(lambda x: (x[0], -x[1][1]))
neighbors = neighbors.reduceByKey(reducer1).map(mapper6)

# Predict per movie, then flatten to ((userId, movieId), rating) sorted by
# user id and movie id.
predictions = neighbors.map(lambda x: mapper7(x, ratingsList))
predictions = predictions.flatMap(mapper8).sortBy(lambda x: (x[0][0], x[0][1]))
predictionsList = predictions.collect()

# The context manager closes the file even if a write fails.
with open(predictionsFile, 'w') as f:
    for pair in predictionsList:
        f.write("(%d, %d), %.6f\n" % (pair[0][0], pair[0][1], pair[1]))