In [3]:
from pyspark import SparkConf, SparkContext
conf = SparkConf().setMaster("local").setAppName("Page_Rank")
sc = SparkContext(conf=conf)

讀入資料，Ranks為初始化的weight，為N*1的RDD，值為1/N

In [4]:
text = sc.textFile("p2p-Gnutella04.txt")
vertex = 10876
ranks = sc.parallelize(range(0,vertex)).map(lambda x: (x,(1/vertex)))
iterations = 20
beta = 0.8

In [5]:
def mapper1(line):
    wordlist = line.split("\t")
    return ((int(wordlist[0]), int(wordlist[1])))

In [6]:
def mapper2(line):
    listM=[]
    for item in line[1]:
        listM.append(((line[0],(item, 1/len(line[1])))))
    return listM

先使用mapper1，將資料讀進來後，包裝成(key, value)=(x, y)，y為x所指的page <br>
接著使用groupByKey()，依照key進行分群<br>
使用mapper2計算weight，初始的weight為1/out_degree<br>
links(key,value) = (x, (y, 1/out_degree_of_x))，根據每一個page的out_degree，計算所代表的weight

In [7]:
links = text.map(mapper1).groupByKey().flatMap(mapper2).cache()
links.collect()

[(0, (1, 0.1)),
 (0, (2, 0.1)),
 (0, (3, 0.1)),
 (0, (4, 0.1)),
 (0, (5, 0.1)),
 (0, (6, 0.1)),
 (0, (7, 0.1)),
 (0, (8, 0.1)),
 (0, (9, 0.1)),
 (0, (10, 0.1)),
 (1, (2, 0.1)),
 (1, (11, 0.1)),
 (1, (12, 0.1)),
 (1, (13, 0.1)),
 (1, (14, 0.1)),
 (1, (15, 0.1)),
 (1, (16, 0.1)),
 (1, (17, 0.1)),
 (1, (18, 0.1)),
 (1, (19, 0.1)),
 (3, (20, 0.1)),
 (3, (21, 0.1)),
 (3, (22, 0.1)),
 (3, (23, 0.1)),
 (3, (24, 0.1)),
 (3, (25, 0.1)),
 (3, (26, 0.1)),
 (3, (27, 0.1)),
 (3, (28, 0.1)),
 (3, (29, 0.1)),
 (8, (30, 0.1)),
 (8, (31, 0.1)),
 (8, (32, 0.1)),
 (8, (33, 0.1)),
 (8, (34, 0.1)),
 (8, (35, 0.1)),
 (8, (36, 0.1)),
 (8, (37, 0.1)),
 (8, (38, 0.1)),
 (8, (39, 0.1)),
 (10, (41, 0.1)),
 (10, (136, 0.1)),
 (10, (137, 0.1)),
 (10, (138, 0.1)),
 (10, (139, 0.1)),
 (10, (140, 0.1)),
 (10, (141, 0.1)),
 (10, (142, 0.1)),
 (10, (143, 0.1)),
 (10, (144, 0.1)),
 (12, (40, 0.1)),
 (12, (41, 0.1)),
 (12, (42, 0.1)),
 (12, (43, 0.1)),
 (12, (44, 0.1)),
 (12, (45, 0.1)),
 (12, (46, 0.1)),
 (12, (47, 0.1)

page rank的演算法做的主要是page權重跟現有的PR值做矩陣運算<br>
相當於將links跟ranks做矩陣乘法得到R<br>
所以首先將 links 跟 ranks 做 join，同時為了避免iteration所造成join的partitions數增加，所以設定numPartitions為1<br>
之後再做reduce就得到可以得到R<br>
再來因為有dead end，所以必須更新R的值以確保R的總和是1<br>
所以計算renormalize = 1 - total R<br>
以及random walk的beta值等於0.8<br>
整體的Rnew會等於R+(1-renormalize)/vertex)*beta+(1-beta)/vertex)<br>

In [8]:
import time
TS = time.time()

for i in range(0,iterations):
    TSS = time.time()

    j = links.join(ranks,1).cache()
    resultj = j.map(lambda x: ((x[1][0][0]), x[1][0][1]*x[1][1])).reduceByKey(lambda x,y:x+y)
    
    vertex = resultj.count()
    renormalize = resultj.values().sum()

    ranks = resultj.mapValues(lambda value:( value+ ( 1- renormalize)/ vertex )* beta + (1 - beta)/ vertex).cache()
    
    TFF = time.time()
    print("iterations:",i,"  time:",TFF-TSS)
TF = time.time()
print("Total time:",TF-TS)

iterations: 0   time: 3.2699050903320312
iterations: 1   time: 3.5747737884521484
iterations: 2   time: 3.559680223464966
iterations: 3   time: 3.6019949913024902
iterations: 4   time: 3.5286171436309814
iterations: 5   time: 3.4957592487335205
iterations: 6   time: 3.5169873237609863
iterations: 7   time: 3.5464298725128174
iterations: 8   time: 3.5378410816192627
iterations: 9   time: 3.5076792240142822
iterations: 10   time: 3.478394031524658
iterations: 11   time: 3.523465156555176
iterations: 12   time: 3.5422637462615967
iterations: 13   time: 3.532994508743286
iterations: 14   time: 3.5667598247528076
iterations: 15   time: 3.5921456813812256
iterations: 16   time: 3.579259157180786
iterations: 17   time: 3.5136544704437256
iterations: 18   time: 3.5022072792053223
iterations: 19   time: 3.497391700744629
Total time: 70.47019863128662


經過20次疊代後的結果如下

In [9]:
print(ranks.values().sum())
ranks.map(lambda x:(x[0], round(x[1],6) ) ).sortBy(lambda a: -a[1]).collect()[:10]

1.0000000000000016


[(1054, 0.000632),
 (1056, 0.000632),
 (1536, 0.000527),
 (171, 0.000515),
 (453, 0.000498),
 (407, 0.000487),
 (263, 0.000482),
 (4664, 0.000474),
 (261, 0.000466),
 (410, 0.000464)]

In [2]:
sc.stop()