In [1]:
import os
from pyspark import SparkConf, SparkContext
f = open("/Users/mencher/Projects/big_data/Outputfile.txt", 'a')

# Report
本次作業要求用 Map-Reduce，計算 page rank 的值，以下為本次實作的步驟：
1. 初始化 PySpark ，讀入檔案後，首先將檔案逐行拆分。由於測資的樣本格式為 `<in-node> <out-node> (ex: 1 2)`，需再依據 Tab 分隔並產生 Python List（如：`[1, 2]`），方便後續存取，並透過 flatMap 輸出 rdd 格式。

2. 由於存在部分點只有 In-Node 沒有 Out-Node ，或只有 Out-Node 沒有 In-Node，我先分別計算只有 In-Node ，與只有 Out-Node 的，再透過 Union 在兩者取合集，找出全部的 Node 。

3. 然則，在計算中，當某個 Node 沒有 in-link （別人接給他），只有 out-link (他接到別人) 時，該點會在計算中遺失。也因此，對於這些特別的 Node，我補了自己接到自己的線，並讓其的 wight 預設為 0。透過這個設計，我們可避免計算中該 Node 遺失。

4. 在此部分，我試圖初始化 Pagerank。這部分的目標會生成兩種 Map，第一種 Map `totalMap` 負責記錄自己和他人的連線，第二種 Map `currentState` 負責記錄自己的 pagerank。

**totalMap的整理：**
```python
totalMap = splittedData.map(lambda val: (val[0], [val[1]])).reduceByKey(lambda x,y: x+y)
```
這部分會先將資料整理成 `(<in-node>, [<out-node1>, <out-node2>, <out-node3>])`，ex: `(1, [ 2, 4 ])`
```python
.flatMap(expandTotalMap)

def expandTotalMap(val):
    concatItem = []
    for item in val[1]:
        totalNum = len(val[1])
        concatItem.append((val[0], (item, 1/totalNum)))
    return concatItem
```
則是再度把整理的 value array 展開，賦予每個 out-link weight。weight 則是看自己共發散出幾條邊，進行平均。最終處理成 `(<in-node>, (<out-node>, <line weight>))`，ex: `(1, (2, 0.5)) 與 (1, (4, 0.5))`


```python
.union(additionalRelation)
```
再將只有 out-link ，但沒有 in-link 的點，補一個自己接自己的線。

**currentState的整理：**
```python
currentState = totalNode.map(lambda val: (val, 1/totalNodeNum))
```
進行初始化，每個點先平均分配 `1/<全部點個數>`。


5. 接續便要進行 `for-loop` 的持續更新，進行 pagerank 的計算。我們依照以下公式進行計算：

<img src="./equation.png" width="400px" />

並將 beta 設為 0.8 ， iteration 設為 20 次
```python
joinResult = totalMap.join(currentState).map(lambda val: (val[1][0][0], val[1][0][1]*val[1][1]*beta)).reduceByKey(lambda x, y: x+y)
```
此步驟每個 Node ，算出自己對每個 out-link 的 Contribution，將`<自己的pagerank>*<out-link 每條線的 weight>*beta` 後，再進行加總。

```python
_sum = joinResult.values().sum()
currentState = joinResult.map(lambda val: (val[0], ((1 - _sum)/totalNodeNum) + val[1]))
```
此步驟進行 Normalize ，因為每次計算的 pagerank 不一定加總起來為 1 。因此，我們先算出目前 pagerank 的加總(`_sum`)，再用 `1 - _sum`後取平均，並分配到每個 Node 身上，完成一次 Normalization。

以上步驟進行 20 次後，即完成 pagerank 計算。

6.  因應輸出需求，將結果依據 pagerank 值進行 sorting，並四捨五入後只取其前三位數，寫到 output file。

In [2]:
# union, count
# sum -> 不用 collect

totalEdgesNum = 0
beta = 0.8

def splitLines(lines):
    global totalEdgesNum
    splitedItems = []
    for perLine in lines.splitlines():
        totalEdgesNum = totalEdgesNum + 1
        splitedItems.append(perLine.split('\t'))
    return splitedItems
    
def expandTotalMap(val):
    concatItem = []
    for item in val[1]:
        totalNum = len(val[1])
        concatItem.append((val[0], (item, 1/totalNum)))
    return concatItem

# 1. First Step
conf = SparkConf().setMaster("local").setAppName("pagerank")
sc = SparkContext.getOrCreate(conf=conf)
splittedData = sc.textFile("./input.txt").flatMap(splitLines)

# 2. Second Step
inNodes = splittedData.map(lambda val: val[0]).distinct()
outNodes = splittedData.map(lambda val: val[1]).distinct()
totalNode = inNodes.union(outNodes).distinct()
totalNodeNum = totalNode.count()

# 3. Third Step
additionalRelation = inNodes.subtract(outNodes).map(lambda val: (val, (val, 0)))

# 4. Forth Step
totalMap = splittedData.map(lambda val: (val[0], [val[1]])).reduceByKey(lambda x,y: x+y).flatMap(expandTotalMap).union(additionalRelation)
currentState = totalNode.map(lambda val: (val, 1/totalNodeNum))

# 5. Fifth Step
for _ in range(20):
    joinResult = totalMap.join(currentState).map(lambda val: (val[1][0][0], val[1][0][1]*val[1][1]*beta)).reduceByKey(lambda x, y: x+y)
    _sum = joinResult.values().sum()
    currentState = joinResult.map(lambda val: (val[0], ((1 - _sum)/totalNodeNum) + val[1]))


# 6. Fifth Step
sorted_result = currentState.sortBy(lambda x: x[1], ascending=False)
for item in sorted_result.collect()[0:10]:
    f.write('%s\t%f\n'%(item[0], item[1]))
    print('%s\t%f\n'%(item[0], item[1]))
f.close()
sc.stop()


1056	0.000632

1054	0.000629

1536	0.000524

171	0.000512

453	0.000496

407	0.000485

263	0.000480

4664	0.000470

261	0.000463

410	0.000462

