# PageRank
Using the PageRank equation [Brin-Page, '98]:
$$ r_j=\sum_{i\rightarrow j}\beta\dfrac{r_i}{d_i}+(1-\beta)\dfrac{1}{N} $$  
and renormalizing to recover the rank that leaks out through dead ends (nodes with no out-links):
$$ \forall j:\; r^{new}_j = r'^{new}_j + \dfrac{1-S}{N} \quad \text{where } S = \sum_j r'^{new}_j $$  
## Report :  
### 首先將 input 轉成 adjacency list :  
```python
flatMap(mapper_split) 及 reduceByKey(reducer)

[(2, [4]), (1, [2, 3]), (3, [1, 4, 5]), (5, [1, 4])]
```
### 接著 create initial ranks :  
```python
ranks = mapValues(lambda dests: init)

[(2, 0.2), (1, 0.2), (3, 0.2), (5, 0.2)]
```
### 以下步驟以 for loop 重複執行 20 次 :    
##### 把 adjacency list 跟 ranks join 在一起 :
```python
contribution = lines.join(ranks)

[(2, ([4], 0.2)), (1, ([2, 3], 0.2)), (3, ([1, 4, 5], 0.2)), (5, ([1, 4], 0.2))]  
```
##### 計算 PageRank equation :
```python
contribution = contribution.flatMap(lambda line: compute(line[1][0], line[1][1]))
contribution = contribution.reduceByKey(reducer)
ranks = contribution.mapValues(lambda rank: (beta * rank) + ((1 - beta) / v_cnt))

[(4, 0.33333333333333337), (2, 0.12000000000000001), (3, 0.12000000000000001), (1, 0.17333333333333334), (5, 0.09333333333333332)]
```
##### 計算 dead-end renormalization :
```python
S = ranks.values().sum()
ranks = ranks.mapValues(lambda rank: rank + (1 - S) / v_cnt)

[(4, 0.36533333333333334), (2, 0.152), (3, 0.152), (1, 0.2053333333333333), (5, 0.1253333333333333)]
```
### 迴圈結束後進行排序並輸出答案 :
```python
ans = ranks.collect()
ans = sorted(ans, key = lambda x: x[1], reverse = True)
print(ans)

[(4, 0.3290734821381495), (1, 0.19314551263107277), (2, 0.16990996237299621), (3, 0.16990996237299621), (5, 0.13796108048478528)]
```

In [1]:
from pyspark import SparkConf, SparkContext
import datetime
import re
# Number of power-iteration rounds.
iterations = 20
# beta in the PageRank equation: probability of following an out-link;
# with probability 1 - beta the surfer jumps to a uniformly random node.
beta = 0.8
# N, the number of nodes in p2p-Gnutella04.txt (use 5 for test.txt).
v_cnt = 10876
#v_cnt = 5
# Uniform starting rank 1/N.
init = 1.0 / v_cnt

def mapper_split(lines):
    """Parse edge-list text into ``(source, [destination])`` pairs.

    Each line is expected to hold two integer node ids separated by any run
    of whitespace (spaces or tabs). Blank lines and lines whose first field
    starts with ``#`` (edge-list header/comment lines) are skipped.

    Args:
        lines: a chunk of text; with ``sc.textFile`` this is a single line.

    Returns:
        A list of ``(src, [dst])`` tuples, one per edge, ready to be merged by
        ``reduceByKey(reducer)`` into an adjacency list.
    """
    maplist = []
    for line in lines.splitlines():
        # str.split() collapses repeated separators, so "1  2" or "1 \t2"
        # no longer produce empty fields that int() rejects.
        item = line.split()
        if not item or item[0].startswith("#"):
            continue
        maplist.append((int(item[0]), [int(item[1])]))
    return maplist
def reducer(x, y):
    """Combine two values that share a key using ``+``.

    Concatenates destination lists when building the adjacency list and
    sums rank contributions when the values are numbers.
    """
    merged = x + y
    return merged
def compute(dests, rank):
    """Split a node's rank evenly across its out-links.

    Lazily yields ``(dest, rank / len(dests))`` for each destination, i.e.
    the ``r_i / d_i`` term of the PageRank sum. Nothing is yielded (and no
    division happens) when ``dests`` is empty.
    """
    out_degree = len(dests)
    yield from ((target, rank / out_degree) for target in dests)
# NOTE(review): the triple-quoted string below is a bare expression, not a
# docstring. It only parks earlier mapper_link / mapper_random / mapper_leak
# drafts as text; none of them is actually defined or called. If revived,
# note that mapper_link divides by len(line[0]) (the key) where the
# out-degree len(line[1][0]) was presumably intended.
'''
def mapper_link(line):
    maplist = []
    key = line[0]
    for dest in line[1][0]:
        maplist.append((dest, beta * init / len(line[0])))
    return maplist
def mapper_random(line):
    maplist = []
    random = (1 - beta) / v_cnt
    maplist.append((line[0], line[1] + random))
    return maplist
def mapper_leak(line):
    maplist = []
    leak = (1 - S) / v_cnt
    maplist.append((line[0], line[1] + leak))
    return maplist
'''

'\ndef mapper_link(line):\n    maplist = []\n    key = line[0]\n    for dest in line[1][0]:\n        maplist.append((dest, beta * init / len(line[0])))\n    return maplist\ndef mapper_random(line):\n    maplist = []\n    random = (1 - beta) / v_cnt\n    maplist.append((line[0], line[1] + random))\n    return maplist\ndef mapper_leak(line):\n    maplist = []\n    leak = (1 - S) / v_cnt\n    maplist.append((line[0], line[1] + leak))\n    return maplist\n'

In [2]:
start = datetime.datetime.now()

# Restart Spark with this notebook's configuration. A PySpark kernel usually
# predefines `sc`; in a plain Python session it does not exist yet, so there
# is nothing to stop.
try:
    sc.stop()
except NameError:
    pass
conf = SparkConf().setMaster("local[*]").setAppName("PageRank")
sc = SparkContext(conf = conf)
lines = sc.textFile("p2p-Gnutella04.txt")
#lines = sc.textFile("test.txt")

# Adjacency list (src, [dst, ...]) for every node with at least one out-link.
# It is joined against the ranks on every iteration, so keep it in memory.
lines = lines.flatMap(mapper_split)
lines = lines.reduceByKey(reducer).cache()

# Every node of the graph, whether it appears as a source or a destination.
# N is derived from the data instead of relying on a hard-coded count.
nodes = lines.flatMap(lambda edge: [edge[0]] + edge[1]).distinct().cache()
v_cnt = nodes.count()
init = 1 / v_cnt
teleport = (1 - beta) / v_cnt  # the (1 - beta) / N random-jump term

# A zero contribution for every node. Unioning it in below keeps nodes that
# have no in-links in `ranks`; without it they would drop out after the first
# iteration, and their own out-links would stop contributing.
zeros = nodes.map(lambda node: (node, 0.0)).cache()

# Uniform initial rank 1/N for every node. Dead ends get one too; their rank
# simply leaks and is redistributed by the renormalization step.
ranks = nodes.map(lambda node: (node, init))
for _ in range(iterations):
    # (src, ([dst, ...], rank)) -> (dst, rank / out_degree) per out-link.
    contribution = lines.join(ranks)
    contribution = contribution.flatMap(lambda line: compute(line[1][0], line[1][1]))
    contribution = zeros.union(contribution).reduceByKey(reducer)
    ranks = contribution.mapValues(lambda rank: (beta * rank) + teleport)
    # Dead-end renormalization: spread the leaked mass 1 - S evenly over N.
    S = ranks.values().sum()
    leak = (1 - S) / v_cnt
    # Bind `leak` as a default argument so the closure carries this
    # iteration's value instead of reading a global that is reassigned on
    # the next pass, whenever Spark ends up serializing it.
    ranks = ranks.mapValues(lambda rank, leak=leak: rank + leak)
ans = ranks.collect()
ans = sorted(ans, key = lambda x: x[1], reverse = True)

end = datetime.datetime.now()
print("執行時間：", end - start)

執行時間： 0:00:07.003871


In [3]:
# Write the ten best-ranked nodes to output.txt, one "node<TAB>rank" line
# each (`ans` was sorted by rank, highest first), then shut Spark down.
# The `with` block guarantees the file is closed even if a write fails.
with open('output.txt', 'w') as out:
    for dest, rank in ans[:10]:
        out.write(str(dest) + "\t" + str(rank) + "\n")
sc.stop()