In [1]:
from pyspark import SparkConf, SparkContext

In [2]:
# Mapper for get (from, to), (to, []) KV pair and use flatMap to get both KV pair
def mapper0(link):
    list1 = []
    list1 = link.split('\t')
    return (list1, (list1[1], []))

## mapper0

   在這個 mapper 裡，除了將本來題目就有給的 (from, to) KV pair
   
   還會回傳一個 (to, []) pair 的 KV pair 
   
   這個 pair 是若沒有回傳這個 pair，沒有 outlink 的 node 就會被拋棄，這樣 PageRank就會錯掉
   
   所以我就多回傳了這個 KV pair
   
   並使用 flatMap 來使用這個 map function

In [3]:
# Since after mapper0, it will construct a lot of [] in some node
# So we will use this mapper to delete it
def mapper1(link):
    list = []
    for i in link[1]:
        if i != []:
            list.append(i)
    return (link[0], list)

## mapper1

   這個 mapper 裡，是為了處理從 mapper0 得出來的 KV pair，在groupByKey後
   
   有可能產生 ( A, [[], [], []]) 這種有很多 value 為空list的狀況
   
   所以在這裡我們會先用一個空集合來代表新的 value list
   
   然後碰到不是空 list的元素就append進去
   
   最後再return 回 list
   
   這樣若有 value 全部都為空list的 node，最後也還能得到空list

In [4]:
# Mapper for counting the outedge and reconstruct the rdd data structure to simplfy for using it
def mapper2(link):
    cnt = 0
    for ch in link[1][0]:
        cnt+=1
    return (link[0], link[1][0], link[1][1] , cnt)

## mapper2

   這個 mapper 是要計算每個 node 有幾個 outlink
    
   於是我就用一個cnt，去算每一個 node 由剛剛 mapper1 後可得到的adjacent list有多少個元素
    
   並且重新回傳我希望的 rdd 的 structure

In [5]:
# Mapper for computing the Rnew' (Before computing teleporting set)
def mapper3(link):
    list = []
    for i in range (link[3]):
        list.append((link[1][i], link[2]/link[3] * Beta))
    list.append((link[0], 0))
    return list

## mapper3

   在這裡要實踐 $rnew' = \sum_{i->j} \beta*rold_{i}/d_{i}$
   
   在這邊我希望我回傳多個 KV pair，每個 pair 都是 ( node_be_contributed, total_contribution_by_this_node )
   
   所以我就讓讀取 node 的 adjacent list，讓裡面每一個元素 ( link[1][i] ) 當成 KV pair 的 key
   
   而 value 的部分，就取用當前 node 的 PageRank ( link[2] ) 除以此 node 有多少 outlink ( link[3] )
   
   然後最後為了防止此 node 沒有 inlink，在經過一次 iteration 後就會被拋棄
   
   在回傳的 list 最後加一個 ( contribute_node, 0 ) 的 KV pair，即使此 node 有 inlink
   
   在後面 ReduceByKey() 也會被 reduce 掉
   
   
   然後用 flatMap 來使用此 mapper，可以直接拿到每個 KV pair

In [6]:
# Mapper for computing Rnew (teleporting set)
def mapper4(page):
    num = page[1]
    num += (1-Sum)/N
    return (page[0], num)

## mapper4
   這個 mapper 是用來處理 Random teleport的
   
   首先用 num 拿出當前的 PageRank(因為tuple直接修改)
   
   並使用 Global 變數 Sum( Sum 為 $rnew'$ 的總和，底下會解釋如何實作)
   
   計算 $ num = num + (1-Sum)/N $
   
   並取出 key ，包成 ( key, num ) return

In [7]:
Beta = 0.8 # Let Beta = 0.8
link = sc.textFile("p2p-Gnutella04.txt")  # Get file
link = link.flatMap(mapper0).groupByKey().map(lambda x : (x[0], list(x[1])))   # After mapper0, groupByKey them
N = link.count()   # Count how many node
link = link.map(mapper1)  # delete the duplicated [] in link
ranks = link.map(lambda node: (node[0],1.0/N)) # Give initial pagerank for every node
Sum = 0 # Global variable

## mapper

上面這格程式碼中第二行的 mapper 只是為了在 groupByKey 後可以讓後面的 value 好好的存在一個 list 裡面

第六行的 mapper 則是為了要給每一個 node 一個初始的 PageRank，於是就用 1/N 得到每個 node 的初始 PageRank 後

用一個 mapper 把 PageRank 接在其他資料的後面

In [8]:
for i in range(20):
    newlink = link.join(ranks) # put pagerank into every node's rdd
    newlink = newlink.map(mapper2) # count outlink of every node 
    r_new_p = newlink.flatMap(mapper3) 
    r_new_p = r_new_p.reduceByKey(lambda a, b: a+b)
    Sum = r_new_p.values().sum() # Sum r_new' to deal with deadend
    r_new = r_new_p.map(mapper4) # Give teleport prob for every node
    ranks = r_new # assign r_new to rjold

## Reducer

   這次實作中，唯一用到的 Reducer 就是在上面這格程式碼中第五行的 ReduceByKey(lambda a, b : a+b)
   
   在這邊為了加總許多 KV pair，其中 key 為某個 node， value 為被某個其他的 node 貢獻的 PageRank
   
   由於會有許多相同的 key 不同 value 的值需被加總
   
   所以就用 ReduceByKey(lambda a, b : a+b) 就可以得到， ( node, total_PageRank_contributed_by_others ) 的 KV pair
 
 ## Sum
   Sum 這邊是透過兩個 RDD 的 API 來得到
   
   首先先對前面算出來，存每個 KV pair 的 RDD 執行 .values()
   
   這個 API 可以讓我們的RDD變成所有 value  
   
   接著再用 .sum() 這個 API
   
   可以幫我們 RDD 內所有元素加總起來
   
   故這樣即可得到 $rnew'$ 的總和

In [9]:
ranks = ranks.sortBy(lambda x: x[1], False) # sort by value
ans = ranks.take(10) # take top 10 vertices

In [10]:
for i in range(10):
    print(ans[i][0], '%.6g'%ans[i][1])

1056 0.000632199
1054 0.000629156
1536 0.00052391
171 0.000511622
453 0.000495659
407 0.000484844
263 0.000479619
4664 0.000470498
261 0.000462892
410 0.00046151


In [11]:
outF = open("Outputfile.txt", "w")
for i in range(10):
    outF.write(str(ans[i][0]) + ' ' + str('%.6g'%ans[i][1]))
    outF.write("\n")
outF.close()

In [12]:
sc.stop()