## Set up

Name | Student ID
---|---
劉弘祥|106022103

In [0]:
import findspark
findspark.init()

import pyspark
from pyspark import SparkConf, SparkContext

In [7]:
conf = SparkConf().setMaster("local").setAppName("PageRank").set("spark.default.parallelism", 4)
sc = SparkContext(conf=conf)
sc

## Parameter

Set $\beta = 0.8$, the value given in the assignment.

In [0]:
beta = 0.8

## Input

In [0]:
lines = sc.textFile("PageRank_data.txt")

In [0]:
matrix_M = lines.map(lambda x : x.split("\t")).map(lambda x: (int(x[0]),int(x[1])))

In [0]:
Nodes = matrix_M.flatMap(lambda x : (x[0],x[1])).max()+1

n=1/Nodes
beta_complent_n = (1-beta)*n
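
To illustrate the parsing and the constants above, the sketch below applies the same steps in plain Python to a made-up four-line edge list. The sample data and the names `sample_lines`, `edges`, `num_nodes`, and `teleport` are assumptions for illustration, not part of the assignment data.

```python
# Hypothetical tab-separated edge list in the same "src<TAB>dst" format.
sample_lines = ["0\t1", "0\t2", "1\t2", "3\t2"]
beta = 0.8

# Same parsing as matrix_M: split on the tab and cast both ends to int.
edges = [tuple(int(v) for v in line.split("\t")) for line in sample_lines]

# Node ids are assumed to start at 0, so the node count is max id + 1.
num_nodes = max(max(src, dst) for src, dst in edges) + 1

n = 1 / num_nodes            # uniform share 1/N
teleport = (1 - beta) * n    # the (1 - beta)/N term added in every iteration
```

Taking `max() + 1` only gives the node count when ids are zero-based; ids missing from the file are still counted as nodes and end up as dead ends.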

## Build M

### column sum
$d_j=\sum\limits_i m_{ij}$

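Each node splits its outgoing contribution evenly among its out-links, so the number of out-links of every node has to be counted.
Nodes with no out-links at all are assigned 0 by taking a set difference.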

In [0]:
column_sum = matrix_M.map(lambda x: (x[0],1)).reduceByKey(lambda x,y: x+y)
N = sc.parallelize(range(Nodes)).map(lambda x: (x, 0))
dj = N.subtractByKey(column_sum).union(column_sum)
# Collect the out-degree of every node (0 for dead ends) to the driver
djMap = dj.collectAsMap()
# Broadcast the dict so each executor receives one copy instead of one per task
bc_djMap = sc.broadcast(djMap)
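
The counting step can be pictured without Spark. The following is a minimal sketch on a hypothetical edge list, where `Counter` plays the role of `reduceByKey` and the dictionary comprehension plays the role of `subtractByKey` followed by `union`.

```python
from collections import Counter

edges = [(0, 1), (0, 2), (1, 2), (3, 2)]   # hypothetical (src, dst) pairs
num_nodes = 4

# Count out-links per source node (the role of reduceByKey).
out_degree = Counter(src for src, _ in edges)

# Give every node an entry; nodes that never appear as a source get 0
# (the role of subtractByKey followed by union).
d = {node: out_degree.get(node, 0) for node in range(num_nodes)}

# Nodes with out-degree 0 are the dead ends.
dead_ends = [node for node, deg in d.items() if deg == 0]
```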

### Adjacency matrix M

Convert the input data into the transition matrix $M$; every entry in the column of source node $j$ is set to $\frac{1}{d_j}$.

In [0]:
M = matrix_M.map(lambda x: ([x[1],x[0],1/bc_djMap.value[x[0]]]) )
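
As a small worked example of this mapping, the sketch below builds the `(row, col, value)` triples for a hypothetical four-node graph and checks that each column with out-links sums to 1. The data and the name `col_sums` are assumptions for illustration.

```python
edges = [(0, 1), (0, 2), (1, 2), (3, 2)]   # hypothetical (src, dst) pairs
d = {0: 2, 1: 1, 2: 0, 3: 1}               # out-degree of each node

# Each edge src -> dst becomes the entry (row=dst, col=src, value=1/d_src).
M = [(dst, src, 1 / d[src]) for src, dst in edges]

# Sum the entries of each column that appears in M.
col_sums = {}
for row, col, value in M:
    col_sums[col] = col_sums.get(col, 0.0) + value
```

Node 2 has no out-links, so column 2 does not appear in `col_sums` at all; that is the gap the renormalization step fills.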

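### M with renormalization

Because the graph may contain dead ends, the matrix has to be renormalized.  
First find every column that has no values, then fill each of them with $[\frac{1}{N}]_N$ (the value `n` computed earlier).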

In [0]:
# Source nodes that never appear in matrix_M have no out-links (dead ends)
M_no_values = sc.parallelize(range(Nodes)
              ).subtract(matrix_M.map(lambda x: x[0]).distinct())

N1 = sc.parallelize(range(Nodes)).map(lambda x: (x,n))

M_renormalize_part = M_no_values.cartesian(N1).map(lambda x: ([x[1][0],x[0],x[1][1]]))

M_new = M.union(M_renormalize_part)
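
The effect of the fill can be checked on a toy matrix. This is a sketch under the assumption of a hypothetical four-node graph in which node 2 is the only dead end; the names `dead_cols` and `filler` are illustrative.

```python
num_nodes = 4
n = 1 / num_nodes
M = [(1, 0, 0.5), (2, 0, 0.5), (2, 1, 1.0), (2, 3, 1.0)]  # (row, col, value)

# Columns that never appear in M belong to dead-end nodes.
used_cols = {col for _, col, _ in M}
dead_cols = [col for col in range(num_nodes) if col not in used_cols]

# Fill each dead-end column with 1/N in every row (the cartesian step).
filler = [(row, col, n) for col in dead_cols for row in range(num_nodes)]
M_new = M + filler

# After the fill, every column should sum to 1.
col_sums = [0.0] * num_nodes
for row, col, value in M_new:
    col_sums[col] += value
```

Each dead end adds $N$ entries, so a graph with many dead ends makes the filled matrix considerably denser than the original edge list.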

## Build $r$

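Set the initial vector $r_0$. The assignment gives it as a uniform vector, so `N1` from the previous step is reused; each of its entries is $n = \frac{1}{N}$.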

In [0]:
r0 = N1

## Calculate

Run a single iteration first to confirm that the result looks correct.

$r_{old}$

$r_{old}$ is the vector fed into each iteration; it starts as $r_0$.

In [0]:
r_old = r0

### $\beta M$

Since $\beta M$ never changes between iterations, it can be computed once and stored.  
Persisting it in memory saves time.

In [0]:
betaM = M_new.map(lambda x: ([x[0],x[1],x[2]*beta]))

In [43]:
betaM.persist()
betaM.is_cached

True

### $+ [\frac {1-\beta} {N}]_N$

The addition term is defined as a `mapper` function.

In [0]:
def mapper1(x):
    # (key, value)
    return (x[0],x[1]+beta_complent_n)

$\beta M \cdot r_{old}$

The multiplication works like the matrix multiplication done previously.  
Because $r$ has only one column, it can be simplified further.

In [0]:
beta_Mr = betaM\
            .map(lambda x: (x[1],(x[0],x[2])))\
            .join(r_old)\
            .map(lambda x: (x[1][0][0],x[1][0][1]*x[1][1]))\
            .reduceByKey(lambda x,y: x+y)
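
The join-then-reduce pattern amounts to a sparse matrix-vector product. A minimal plain-Python sketch, assuming a hypothetical 4x4 column-stochastic matrix stored as `(row, col, value)` triples:

```python
from collections import defaultdict

beta = 0.8
# Hypothetical column-stochastic matrix as (row, col, value) triples.
M_new = [(1, 0, 0.5), (2, 0, 0.5), (2, 1, 1.0), (2, 3, 1.0),
         (0, 2, 0.25), (1, 2, 0.25), (2, 2, 0.25), (3, 2, 0.25)]
r_old = {0: 0.25, 1: 0.25, 2: 0.25, 3: 0.25}

beta_Mr = defaultdict(float)
for row, col, value in M_new:
    # Look up r_old by column index (the join), then add the product
    # to the running total of its row (the reduceByKey).
    beta_Mr[row] += beta * value * r_old[col]
beta_Mr = dict(beta_Mr)
```

Because every column of the matrix sums to 1 and `r_old` sums to 1, the entries of `beta_Mr` sum to $\beta$.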

$r_{new} = \beta M\cdot r_{old} + [\frac {1-\beta} {N}]_N$

This completes one full evaluation of the formula.

In [0]:
r_new = beta_Mr.map(mapper1)
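
A quick sanity check on this step: when $M$ is column-stochastic and $r_{old}$ sums to 1, $r_{new}$ should sum to 1 as well. The sketch below shows this on hypothetical numbers; `teleport` stands in for `beta_complent_n`.

```python
beta = 0.8
num_nodes = 4
teleport = (1 - beta) / num_nodes

# Hypothetical result of the beta * M * r_old step, keyed by node id.
beta_Mr = {0: 0.05, 1: 0.15, 2: 0.55, 3: 0.05}

# Same role as mapper1: add the teleport term to every entry.
r_new = {node: value + teleport for node, value in beta_Mr.items()}

# The beta part contributes 0.8 and the teleport part 0.2.
total = sum(r_new.values())
```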

### Show top 20

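List the 20 highest-ranked pages, as the assignment requires.  
`top(n)` returns the n largest elements, so each pair is swapped to `(rank, node)` first so that the ordering is by rank.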

In [47]:
r_new.map(lambda x: (x[1],x[0])).top(20)

[(0.0006184585080596114, 1054),
 (0.0005633413973307755, 1056),
 (0.0005455351388774059, 1536),
 (0.0004874065468762097, 407),
 (0.0004605600084017647, 261),
 (0.000459334405558372, 410),
 (0.0004570688972720929, 1198),
 (0.0004533814774618516, 4054),
 (0.0004507551856545751, 453),
 (0.0004376820886583309, 127),
 (0.00043686502009606277, 171),
 (0.0004341803662486162, 165),
 (0.00042379192309980804, 263),
 (0.0004043865447459988, 345),
 (0.00040418227760543647, 982),
 (0.00040411942617756967, 987),
 (0.00039763113859726785, 763),
 (0.00039029211204692554, 5315),
 (0.000390292112046921, 2265),
 (0.00038375556354879167, 989)]
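
The reason for the swap is that tuples compare element by element, so the rank has to come first. The sketch below shows the same idea with `heapq.nlargest` on hypothetical values, together with a key-function variant that leaves the pairs unswapped.

```python
import heapq

# Hypothetical (node, rank) pairs.
ranks = [(1054, 0.0006), (1056, 0.0005), (407, 0.0004), (1536, 0.00055)]

# Swap to (rank, node) so that tuple comparison orders by rank first.
swapped = [(rank, node) for node, rank in ranks]
top2 = heapq.nlargest(2, swapped)

# Alternative: keep (node, rank) and pass a key function instead.
top2_by_key = heapq.nlargest(2, ranks, key=lambda pair: pair[1])
```

PySpark's `RDD.top` also accepts a `key` argument, so the key-function variant should carry over to the RDD version as well.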

## Run


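### Define a reusable function

Once the result has been checked, define a way to run the iteration repeatedly.  
Define `run(start,times,r_old)`:
+ start: the iteration number to start counting from
+ times: how many iterations to run
+ r_old: the $r_{old}$ to start from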

In [0]:
def run(start,times,r_old):
    # Apply r_new = beta*M*r_old + (1-beta)/N `times` times,
    # numbering the printed iterations from `start`.
    for i in range(times):
        beta_Mr = betaM\
                .map(lambda x: (x[1],(x[0],x[2])))\
                .join(r_old)\
                .map(lambda x: (x[1][0][0],x[1][0][1]*x[1][1]))\
                .reduceByKey(lambda x,y: x+y)
        r_new = beta_Mr.map(mapper1)
        print("Iteration %d :" % (start+i) + str(r_new.map(lambda x: (x[1],x[0])).top(20)))
        r_old = r_new
    # r_old now holds the latest ranks (the input itself when times == 0)
    return r_old
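
For comparison, the whole loop can be written in a few lines of plain Python on a toy graph. This is a sketch under stated assumptions, not the notebook's method: the function name `pagerank` and the sample edges are hypothetical, and instead of materializing the filled dead-end columns it spreads the rank held by dead ends evenly over all nodes, which gives the same result.

```python
from collections import defaultdict

def pagerank(edges, num_nodes, beta=0.8, iterations=20):
    """Power iteration r = beta*M*r + (1-beta)/N on a list of (src, dst) edges."""
    out_degree = defaultdict(int)
    for src, _ in edges:
        out_degree[src] += 1
    n = 1 / num_nodes
    r = [n] * num_nodes                      # uniform start vector
    for _ in range(iterations):
        r_new = [(1 - beta) * n] * num_nodes  # teleport term
        # Rank held by dead ends is spread evenly over all nodes.
        dead_mass = sum(r[node] for node in range(num_nodes)
                        if out_degree[node] == 0)
        for node in range(num_nodes):
            r_new[node] += beta * dead_mass * n
        # Every edge passes an equal share of its source's rank along.
        for src, dst in edges:
            r_new[dst] += beta * r[src] / out_degree[src]
        r = r_new
    return r

ranks = pagerank([(0, 1), (0, 2), (1, 2), (3, 2)], num_nodes=4)
```

In this toy graph node 2 receives links from every other node, so it ends up with the highest rank.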

### Iteration 1~5

In [50]:
r5 = run(1,5,r0)

Iteration 1 :[(0.0006184585080596103, 1054), (0.0005633413973307774, 1056), (0.0005455351388774055, 1536), (0.000487406546876212, 407), (0.00046056000840176805, 261), (0.00045933440555837024, 410), (0.00045706889727209585, 1198), (0.0004533814774618567, 4054), (0.0004507551856545752, 453), (0.00043768208865833176, 127), (0.0004368650200960621, 171), (0.00043418036624862056, 165), (0.00042379192309981184, 263), (0.0004043865447459999, 345), (0.00040418227760544113, 982), (0.00040411942617757065, 987), (0.0003976311385972696, 763), (0.00039029211204692294, 5315), (0.00039029211204692023, 2265), (0.00038375556354879417, 989)]
Iteration 2 :[(0.0006325012736368608, 1054), (0.0006249002957578525, 1056), (0.00052265669655163, 1536), (0.0004993855291876037, 171), (0.000492708770854366, 453), (0.0004752267968187292, 407), (0.00047120474778249047, 263), (0.0004637767566161001, 1959), (0.00046102686091041296, 165), (0.00046092221149464786, 261), (0.00045975525652818287, 410), (0.00045933667692516

### Iteration 6~10

In [52]:
r10 = run(6,5,r5)

Iteration 6 :[(0.0006320859564577316, 1056), (0.0006290599651476175, 1054), (0.0005238070755912347, 1536), (0.0005115756050901472, 171), (0.0004956152891154166, 453), (0.0004847393502147826, 407), (0.00047949091027437146, 263), (0.00047032294441673255, 4664), (0.0004628121664402244, 261), (0.00046143062820479144, 410), (0.0004604336721452401, 1959), (0.0004596610315775327, 165), (0.0004404931723839473, 1198), (0.0004282934121830601, 127), (0.00041940270666812, 4054), (0.0004104706390278336, 2265), (0.00041032711554685945, 345), (0.00040908898298776134, 763), (0.00040034323307104223, 989), (0.00039940681448927436, 987)]
Iteration 7 :[(0.0006320893035982628, 1056), (0.0006290424725374239, 1054), (0.0005238070420505889, 1536), (0.0005115393839020576, 171), (0.0004955823090568193, 453), (0.00048476163361963495, 407), (0.0004795306631455665, 263), (0.00047042846952384944, 4664), (0.0004628142562285543, 261), (0.0004614336656183441, 410), (0.0004604426321670695, 1959), (0.0004596500297188286

### Iteration 11~20

In [53]:
r20 = run(11,10,r10)

Iteration 11 :[(0.0006320902774181867, 1056), (0.000629047615619634, 1054), (0.000523820327875398, 1536), (0.0005115346105320633, 171), (0.0004955735406825306, 453), (0.0004847609925847329, 407), (0.00047953701210587895, 263), (0.00047041672885213075, 4664), (0.00046281209966545704, 261), (0.0004614308334365333, 410), (0.00046045000320310743, 1959), (0.0004596487191060736, 165), (0.0004404429051866899, 1198), (0.0004282758138928031, 127), (0.0004194359861013037, 4054), (0.00041045935062826314, 2265), (0.00041029088722494326, 345), (0.000409069403280316, 763), (0.0004003388022777247, 989), (0.00039941047498430113, 987)]
Iteration 12 :[(0.000632090254434627, 1056), (0.0006290476763641644, 1054), (0.0005238203751437125, 1536), (0.0005115346264321642, 171), (0.0004955735346258259, 453), (0.000484760959538955, 407), (0.0004795369466505337, 263), (0.00047041679225897823, 4664), (0.00046281210444254657, 261), (0.00046143079627305995, 410), (0.0004604500047477837, 1959), (0.0004596487375744006

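This was run right before going to bed, so the result is saved here in case something goes wrong while the session sits unattended.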

In [0]:
r20.saveAsTextFile("/content/drive/My Drive/data/r20")

### Iteration 21~30

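After iteration 20 the values barely change; the remaining differences are essentially floating-point rounding error, so the ranks appear to have converged.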
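
One way to quantify "barely change" is the L1 distance between two rank vectors. The sketch below is illustrative only: the helper `l1_distance` and the `1e-9` threshold are assumptions, and it uses just the top three values printed in this notebook for iterations 11 and 21 rather than the full vectors.

```python
def l1_distance(r_a, r_b):
    """Sum of absolute differences between two rank vectors stored as dicts."""
    nodes = set(r_a) | set(r_b)
    return sum(abs(r_a.get(node, 0.0) - r_b.get(node, 0.0)) for node in nodes)

# Top-3 values printed in this notebook for iterations 11 and 21
# (a partial vector only, keyed by node id).
r11 = {1056: 0.0006320902774181867,
       1054: 0.000629047615619634,
       1536: 0.000523820327875398}
r21 = {1056: 0.0006320902604126689,
       1054: 0.0006290476861854345,
       1536: 0.0005238203838152873}

# Hypothetical tolerance; a real check would compare the full vectors.
converged = l1_distance(r11, r21) < 1e-9
```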

In [56]:
r30 = run(21,10,r20)

Iteration 21 :[(0.0006320902604126689, 1056), (0.0006290476861854345, 1054), (0.0005238203838152873, 1536), (0.0005115346245041821, 171), (0.0004955735425766981, 453), (0.0004847609513976177, 407), (0.00047953693819950086, 263), (0.0004704167664998898, 4664), (0.00046281210761333204, 261), (0.0004614307965480585, 410), (0.0004604500171928866, 1959), (0.00045964873169962385, 165), (0.00044044294128968073, 1198), (0.0004282758064446827, 127), (0.0004194360298307221, 4054), (0.0004104593828009943, 2265), (0.0004102908763941391, 345), (0.00040906942073732353, 763), (0.00040033883920273007, 989), (0.00039941047608485025, 987)]
Iteration 22 :[(0.0006320902604126669, 1056), (0.0006290476861854313, 1054), (0.0005238203838152908, 1536), (0.0005115346245041758, 171), (0.0004955735425766846, 453), (0.0004847609513976462, 407), (0.00047953693819949934, 263), (0.00047041676649997794, 4664), (0.0004628121076133329, 261), (0.00046143079654805677, 410), (0.000460450017192911, 1959), (0.000459648731699

## Show answer

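Sure enough, by the time I woke up the session appeared to have stalled; fortunately the result had already been saved.  
So the result is read back from the saved files, reformatted, and displayed.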

In [0]:
temp = sc.textFile("/content/drive/My Drive/data/r20")

In [0]:
r20 = temp.map(lambda x : x.split("(")[1]).map(lambda x : x.split(")")[0] ).map(lambda x : x.split(", ")).map(lambda x: (int(x[0]),(float(x[1]))))
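
Since each saved line is the text of a Python tuple, an alternative to the chained `split` calls is `ast.literal_eval`. A minimal sketch, assuming the lines have the form `(node, rank)`; the two sample lines reuse values shown in this notebook.

```python
import ast

# Lines in the form written out for (node, rank) tuples.
saved_lines = ["(1056, 0.0006320902604125117)",
               "(1054, 0.0006290476861855153)"]

# literal_eval safely parses the tuple text back into a Python tuple.
parsed = [ast.literal_eval(line) for line in saved_lines]
```

This avoids depending on the exact `", "` separator, at the cost of a slower parse per line.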

In [0]:
Ans = r20.map(lambda x: (x[1],x[0]))

In [50]:
Ans.top(10)

[(0.0006320902604125117, 1056),
 (0.0006290476861855153, 1054),
 (0.0005238203838152776, 1536),
 (0.0005115346245042205, 171),
 (0.0004955735425767618, 453),
 (0.00048476095139752965, 407),
 (0.00047953693819939515, 263),
 (0.0004704167665000123, 4664),
 (0.00046281210761336467, 261),
 (0.00046143079654813244, 410)]

In [0]:
Output = Ans.top(20)

In [60]:
for rank, node in Output:
    # Print one "node<TAB>rank" line per page
    print("%d\t%s" % (node, rank))

1056	0.0006320902604125117
1054	0.0006290476861855153
1536	0.0005238203838152776
171	0.0005115346245042205
453	0.0004955735425767618
407	0.00048476095139752965
263	0.00047953693819939515
4664	0.0004704167665000123
261	0.00046281210761336467
410	0.00046143079654813244
1959	0.0004604500171931673
165	0.00045964873169962147
1198	0.00044044294128966
127	0.000428275806444587
4054	0.00041943602983139736
2265	0.0004104593828009529
345	0.0004102908763941842
763	0.0004090694207373179
989	0.0004003388392027947
987	0.0003994104760849357
