In [34]:
import findspark
findspark.init()
findspark.find()
import pyspark
from pyspark import SparkConf, SparkContext
import numpy as np

# Local single-process Spark application named 'PageRank'.
conf = SparkConf().setMaster('local').setAppName('PageRank')
# getOrCreate reuses the already-running context when this cell is executed
# again in the same kernel; constructing a second SparkContext would raise.
sc = SparkContext.getOrCreate(conf=conf)

In [35]:
# set up parameters
num_nodes = 10876  # node count; later cells build rank/teleport records for ids 0 .. num_nodes-1
beta = 0.8  # numerator of the per-edge weight beta/out_degree computed in mapper_2
# NOTE(review): this float is never read before the name `teleport` is rebound
# to an RDD in a later cell — looks like a leftover; confirm before removing.
teleport = 1/num_nodes
# load inputs
# presumably one tab-separated "source destination" edge per line with no
# header/comment lines (mapper_1 indexes the second field unconditionally)
# — TODO confirm against the data file
inputs = sc.textFile("p2p-Gnutella04.txt")

In [36]:
def mapper_1(x):
    """Parse one edge-list line into a ``(source, [(destination, 1)])`` pair.

    Parameters
    ----------
    x : str
        A tab-separated line whose first field is the source node and whose
        second field is the destination node.

    Returns
    -------
    tuple
        ``(j, [(i, 1)])`` where ``j`` and ``i`` are the source and destination
        as strings. The value is a one-element list so that values sharing a
        key can be concatenated with ``+``.

    Raises
    ------
    IndexError
        If the line contains no tab character.
    """
    fields = x.split("\t")
    j = fields[0]
    # Keep only the text before a newline, in case the line still carries one.
    i = fields[1].split("\n")[0]
    # No print here: this runs once per input line inside a Spark task.
    return (j, [(i, 1)])

def mapper_2(x):
    """Convert an adjacency record into a row of weighted out-links.

    Parameters
    ----------
    x : tuple
        ``(source, [(destination, marker), ...])``; the marker of each pair is
        ignored.

    Returns
    -------
    tuple
        ``(int(source), [(int(destination), beta / out_degree), ...])`` where
        ``out_degree`` is the number of pairs in the list and ``beta`` is the
        module-level constant.
    """
    source = int(x[0])
    out_links = x[1]
    out_degree = len(out_links)
    # The division sits inside the comprehension so an empty list yields an
    # empty row without ever dividing by zero.
    weighted = [(int(dest), beta / out_degree) for dest, _marker in out_links]
    return (source, weighted)

# Build (source, [(destination, beta/out_degree), ...]) rows from the edge list.
# The result is cached because the same RDD is unioned with the current ranks
# in every iteration of the PageRank loop; without persistence the final
# map(mapper_2) step would be re-evaluated each time it is used.
Transistion_Matrix = (
    inputs.map(mapper_1)
    .reduceByKey(lambda p, q: p + q)
    .map(mapper_2)
    .cache()
)

mapper_1 maps each line of the edge list to the form (j, [(i, 1)]), where j is the source node and i is the destination node.
Then we reduce by key to concatenate the pairs that share a key into (j, [(i, 1), (k, 1), ...]).
mapper_2 maps these pairs to (j, [(i, value), (k, value), ...]), where value is beta divided by the number of outgoing edges of j.

In [37]:
def rank_init_mapper(x, num_nodes=10876):
    """Pair a node id with its initial, uniform PageRank.

    Parameters
    ----------
    x : hashable
        Node id; returned unchanged as the key.
    num_nodes : int, optional
        Total number of nodes in the graph. Defaults to 10876, the value that
        was previously hard-coded here, so one-argument callers are unaffected.

    Returns
    -------
    tuple
        ``(x, [1 / num_nodes])``. The rank is wrapped in a one-element list so
        it can be concatenated with a node's edge list using ``+``.
    """
    return (x, [1 / num_nodes])

# One record per node id 0 .. num_nodes-1 (np.arange produces NumPy integers).
rank_init = sc.parallelize(np.arange(num_nodes))
# Pair every node id with its starting rank via rank_init_mapper.
rank_inits = rank_init.map(rank_init_mapper)

Here we initialize the PageRank of every node to 1/num_nodes.

In [None]:
def teleporting(x, beta=0.8, num_nodes=10876):
    """Pair a node id with the per-node teleport contribution.

    Parameters
    ----------
    x : hashable
        Node id; returned unchanged as the key.
    beta : float, optional
        Weight given to link-following; ``1 - beta`` is spread over all nodes.
        Defaults to 0.8, the value previously hard-coded here.
    num_nodes : int, optional
        Total number of nodes in the graph. Defaults to 10876, the value
        previously hard-coded here.

    Returns
    -------
    tuple
        ``(x, (1 - beta) / num_nodes)`` with the value as a bare float.
    """
    return (x, (1 - beta) / num_nodes)

# NOTE(review): `teleport` was assigned a float in the parameter cell; here the
# name is rebound to an RDD of node ids 0 .. num_nodes-1.
teleport = sc.parallelize(np.arange(num_nodes))
# (node_id, teleport_value) for every node, as produced by teleporting().
teleport_term = teleport.map(teleporting)

Here we create the teleport term (1 - beta)/num_nodes for each page, which we will add after the matrix multiplication is done.

In [39]:
def TM_x_Rank_mapper(x):
    """Distribute a node's current rank along its weighted out-links.

    Parameters
    ----------
    x : tuple
        ``(j, entries)`` where ``entries`` is a list mixing the node's
        ``(destination, weight)`` pairs with its current rank (a bare number).
        The rank may sit at any position: the list comes from concatenating
        values in ``reduceByKey``, whose merge order is not guaranteed.

    Yields
    ------
    tuple
        ``(destination, weight * rank)`` for every out-link. Nothing is
        yielded when the record has no rank or no out-links.
    """
    rank = None
    out_links = []
    for entry in x[1]:
        # Edge entries are (destination, weight) pairs; anything else is the rank.
        if isinstance(entry, (tuple, list)):
            out_links.append(entry)
        else:
            rank = entry

    if rank is None:
        # Edges without a rank: there is nothing to distribute.
        return

    for dest, weight in out_links:
        yield (dest, weight * rank)

Here we create a mapper that performs the matrix multiplication on records of the form (j, [(i, value), (k, value), ..., r]),
where r is the current PageRank of node j; it outputs the pairs (i, value * r).

In [40]:
def sum_mapper(x):
    """Re-key a ``(node, value)`` record under the constant key ``1``.

    Putting every record under the same key lets a single ``reduceByKey``
    add up all the values.

    Parameters
    ----------
    x : tuple
        A record whose second element is the value to keep.

    Returns
    -------
    tuple
        ``(1, x[1])``.
    """
    shared_key = 1
    value = x[1]
    return (shared_key, value)

def add_s(x, s):
    """Add a constant correction to a rank and wrap it for the next iteration.

    Parameters
    ----------
    x : tuple
        ``(node, rank)`` with ``rank`` as a bare number.
    s : number
        Amount added to the rank.

    Returns
    -------
    tuple
        ``(node, [rank + s])``. The rank is wrapped in a one-element list so it
        can be concatenated with the node's edge list using ``+``.
    """
    # No print here: this runs once per node per iteration inside a Spark task.
    return (x[0], [x[1] + s])

# Iteration counter read and advanced by the PageRank loop.
times = 0
# Current rank vector; starts from the uniform initial ranks.
Rank_new = rank_inits

Here we create sum_mapper to sum up all the PageRank values so that we can check whether the total equals 1,
and we also create add_s to add the renormalization term s to every rank after the matrix multiplication and the teleport step.

In [42]:
# Power iteration: run until `times` reaches 20.
while times < 20:

    # Attach each node's current rank ([r]) to its row of weighted out-links.
    T_R = Transistion_Matrix.union(Rank_new).reduceByKey(lambda p, q: p + q)
    # Push rank along the out-links and sum the contributions per destination.
    Rank_pre = T_R.flatMap(TM_x_Rank_mapper).reduceByKey(lambda p, q: p + q)
    # Add the teleport term; the values of Rank_old are bare floats.
    Rank_old = Rank_pre.union(teleport_term).reduceByKey(lambda p, q: p + q)

    # Total rank mass, rounded to 8 decimals before comparing it with 1.
    check_Rank_old = Rank_old
    S = check_Rank_old.map(sum_mapper).reduceByKey(lambda p, q: p + q).collect()[0][1]
    S = round(S, 8)

    # Share the missing mass equally among all nodes (s is 0.0 when S == 1).
    # Every rank always goes through add_s so it is re-wrapped as [rank]: the
    # next iteration concatenates it with the edge lists, and a bare float
    # there would raise a TypeError.
    s = (1 - S) / num_nodes
    # Bind s as a default argument so the function keeps this iteration's
    # value regardless of when Spark serializes it.
    Rank_new = Rank_old.map(lambda x, s=s: add_s(x, s))

    print(times)
    times = times + 1

0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19


In each iteration, we first union the key-value pairs of the transition matrix with the current PageRank values and reduce by key.
Then we calculate the new PageRank r' by matrix multiplication and add the teleport term.
After that, we sum up all the ranks and check whether the total equals 1.
If it does not, we renormalize by adding (1 - S)/num_nodes to every rank to obtain the new PageRank.
If it does, the result is already our new PageRank.
We repeat this computation for 20 iterations.

In [58]:
# Bring the final ranks to the driver in ascending key order, then write one
# "<node><TAB><rank>" line per node. Each value is a one-element list, so the
# rank itself is its first item.
Page_Rank = Rank_new.sortByKey().collect()
with open("Page_Rank.txt", 'w') as f:
    for node, wrapped_rank in Page_Rank:
        record = "\t".join((str(node), str(wrapped_rank[0])))
        f.write(record + "\n")

Write the result to a text file.

In [None]:
# Shut down the SparkContext; the Spark-based cells cannot be re-run after
# this without creating a context again.
sc.stop()

After writing the output file, we want to list the ranks in decreasing order of value,
so we open the file we saved and reorganize its contents.

In [5]:
# Re-read the "<node><TAB><rank>" file and list the nodes by decreasing rank.
PR_txt = "Page_Rank.txt"
with open(PR_txt, 'r') as f:
    lines = f.readlines()

# node id -> rank. The id is parsed from the first column instead of using the
# line index, so the mapping stays correct even if the file is not sorted by
# id or some ids are missing.
Lines = {}

for line in lines:
    if not line.strip():
        continue  # tolerate blank lines, e.g. a trailing one
    node, rank = line.rstrip("\n").split("\t")[:2]
    Lines[int(node)] = float(rank)

# sorted() is stable, so nodes with equal rank keep their order from the file.
Top_2_Low_PR = sorted(Lines.items(), reverse=True, key=lambda x: x[1])

with open("PageRank.txt", 'w') as F:
    for i, (node, rank) in enumerate(Top_2_Low_PR):
        F.write(str(node) + "\t" + str(rank) + "\n")
        # Echo positions 0 through 10 (eleven rows) to the notebook output.
        if i <= 10:
            print("{}\t".format(i), Top_2_Low_PR[i])
        

0	 (1056, 0.0006321988065666042)
1	 (1054, 0.0006291557098509748)
2	 (1536, 0.0005239103372466644)
3	 (171, 0.0005116224681556148)
4	 (453, 0.000495658645299187)
5	 (407, 0.0004848441973200729)
6	 (263, 0.00047961928702500796)
7	 (4664, 0.000470497549157698)
8	 (261, 0.0004628915843548081)
9	 (410, 0.00046151003608277695)
10	 (1959, 0.00046052908830341416)
