#### **This is the first version of PageRank on MapReduce.**

The Internet is stored as a big matrix ***M*** (size n × n): the column-normalized adjacency matrix,
where each column represents a webpage and its non-zero entries mark the pages it links to.
Break M into k vertical stripes M = [M_1 M_2 . . . M_k] so that each M_j fits on one machine. Initialize ***q***, the PageRank vector, with every entry equal to 1/n, where n is the number of pages.
* ***Mapper:***  stripe j → key = j' ∈ [k] ; value = row r of M_j ∗ q_j
* ***Reducer:***  adds the values for each key, then applies damping: (sum) ∗ β + (1 − β)/n gives the corresponding entry of the updated vector q_{i+1}.

Typically β = 0.85.
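
The mapper and reducer steps above can be sketched in plain Python, without Spark, to make the data flow easier to follow. This is only an illustration under stated assumptions: the names `map_column`, `reduce_rows` and `pagerank_step` and the 3-page matrix are hypothetical, each stripe is taken to be a single column, and keys are 0-based row indices.

```python
from collections import defaultdict

def map_column(j, column, q):
    """Emit (row index, M[r][j] * q[j]) for every non-zero entry of column j."""
    return [(r, m_rj * q[j]) for r, m_rj in enumerate(column) if m_rj != 0]

def reduce_rows(pairs, n, beta=0.85):
    """Sum the contributions per row key, then apply the damping term."""
    sums = defaultdict(float)
    for r, value in pairs:
        sums[r] += value
    return [beta * sums[r] + (1 - beta) / n for r in range(n)]

def pagerank_step(columns, q, beta=0.85):
    """One PageRank update: map every column, then reduce by row index."""
    pairs = [pair for j, col in enumerate(columns)
             for pair in map_column(j, col, q)]
    return reduce_rows(pairs, len(q), beta)

# Hypothetical 3-page example: each inner list is one column of a
# column-normalized matrix (the outgoing links of one page).
columns = [[0.0, 0.5, 0.5],   # page 0 links to pages 1 and 2
           [1.0, 0.0, 0.0],   # page 1 links to page 0
           [0.5, 0.5, 0.0]]   # page 2 links to pages 0 and 1

q = [1 / 3] * 3               # uniform start: 1/n for every page
q_next = pagerank_step(columns, q)
```

Because every column sums to 1, the entries of `q_next` still sum to 1 after the update. Passing the column index `j` to the mapper is what lets the same step be repeated with a non-uniform q.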

In [1]:
import numpy as np
from pyspark import SparkContext
import pickle
import networkx as nx
import pandas as pd

In [2]:
with open("G1.pkl", "rb") as file:
    G1 = pickle.load(file)

In [3]:
def top_N_G1(data,N=G1.number_of_nodes()):
    top=data['hero'].value_counts().head(N)
    hero_list=top.index.tolist()
    return hero_list

In [4]:
#Create a dataframe with the edge data
df_edges = pd.read_csv('edges.csv')

#Extract the subgraph of the top N heroes for the first graph
hero_list=top_N_G1(df_edges, 500)
top=G1.subgraph(hero_list)

In [5]:
#Create the adjacency matrix of the first graph
#(to_numpy_matrix was removed in NetworkX 3.0; to_numpy_array is its replacement)
M_G1=nx.to_numpy_array(top)
Matrix=M_G1.tolist()

#Save the number of rows of M_G1 as n1
n1=len(M_G1)

#Beta is usually set to 0.85
beta=0.85
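
The description at the top calls for a column-normalized matrix, while the adjacency matrix returned by NetworkX holds raw edge weights. A minimal sketch of the normalization step follows; the helper name `column_normalize` and the small matrix `A` are hypothetical, and leaving all-zero columns (pages without outgoing links) as zeros is only one possible choice.

```python
import numpy as np

def column_normalize(adjacency):
    """Scale each column so that it sums to 1; all-zero columns stay zero."""
    A = np.asarray(adjacency, dtype=float)
    col_sums = A.sum(axis=0)
    # Replace zero sums by 1 to avoid dividing by zero for dangling pages
    safe_sums = np.where(col_sums == 0, 1.0, col_sums)
    return A / safe_sums

# Hypothetical 3-page adjacency matrix (column j = outgoing links of page j)
A = np.array([[0, 1, 1],
              [1, 0, 1],
              [1, 0, 0]])

M_norm = column_normalize(A)
```

With normalized columns the updated PageRank values sum to 1, so they can be read directly as probabilities.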

In [14]:
#Set up the Spark context
sc = SparkContext()

In [7]:
#Convert M_G1 to an RDD whose elements are the columns of the matrix (the vertical stripes)
M_rdd = sc.parallelize(np.transpose(Matrix))
rddCollect = M_rdd.collect()
N=M_rdd.count()

#Initialize q with the uniform value 1/N
q = [1/N] * N

In [8]:
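def mapper(row,q):
    #row holds one column of M; emit (row index, contribution) pairs with 1-based keys
    #q[0] stands in for q[j]: valid only while q is uniform (first iteration);
    #later iterations would need the column index j passed in
    m=[]
    for i in range(len(row)):
        m.append(((i+1),row[i]*q[0]))
    return m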

In [9]:
#Map each column (stripe) to (row index, contribution) pairs
M_mapped=M_rdd.flatMap(lambda x: mapper(x,q))

#Reduce the mapped output by adding the values for each row
M_reduced = M_mapped.reduceByKey(lambda x, y: x + y)

#Update the values of q using the map-reduce output
q_updated = M_reduced.map(lambda row: (row[0], row[1] * beta + (1 - beta) / n1))

#Return the updated values of q
c=q_updated.collect()
c[:10]

[(2, 0.10016190035063106),
 (4, 0.1986695519750855),
 (6, 0.17743980344805738),
 (8, 0.08122033085994863),
 (10, 0.1233882818284333),
 (12, 0.11635272965760857),
 (14, 0.6187876090383257),
 (16, 0.2654296840031845),
 (18, 0.2086029757399857),
 (20, 0.07413488952437754)]

In [10]:
#Take the matrix M from the example in the PDF file
#(entries are rounded, so the first column sums to 0.9 rather than 1)
M=[[0, 0.5, 0, 0],
   [0.3, 0, 1, 0.5],
   [0.3, 0, 0, 0.5],
   [0.3, 0.5, 0, 0]]

n=len(M)


In [11]:
#Convert M to an RDD whose elements are the columns of the matrix (the vertical stripes)
M_rdd = sc.parallelize(np.transpose(M))
rddCollect = M_rdd.collect()
N=M_rdd.count()

#Initialize q with the uniform value 1/N
q = [1/N] * N

In [12]:
#Map each column (stripe) to (row index, contribution) pairs
M_mapped=M_rdd.flatMap(lambda x: mapper(x,q))

#Reduce the mapped output by adding the values for each row
M_reduced = M_mapped.reduceByKey(lambda x, y: x + y)

#Update the values of q using the map-reduce output
q_updated = M_reduced.map(lambda row: (row[0], row[1] * beta + (1 - beta) / n))

#Return the updated values of q
q_updated.collect()

[(2, 0.42000000000000004),
 (4, 0.20750000000000002),
 (1, 0.14375),
 (3, 0.20750000000000002)]

Here q is a probability vector returned as a list of tuples: the first value of each tuple is the number of a specific node (one of the n nodes) and the second value is the probability of being at that node.
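
As a small illustration of how these tuples can be read, the sketch below reorders the pairs printed for the 4 × 4 example and ranks the nodes. The variable names are hypothetical; the values are copied from the output above.

```python
# (node, probability) pairs copied from the output of the 4 x 4 example
pairs = [(2, 0.42000000000000004),
         (4, 0.20750000000000002),
         (1, 0.14375),
         (3, 0.20750000000000002)]

# Sort by node number to recover q as an ordinary vector
q_vector = [prob for _, prob in sorted(pairs)]

# Rank the nodes from most to least probable
ranking = sorted(pairs, key=lambda pair: pair[1], reverse=True)
top_node = ranking[0][0]

# Total probability mass carried by the vector
total = sum(q_vector)
```

Node 2 ranks highest here. The total comes out slightly below 1 (about 0.979) because the first column of the example matrix sums to 0.9 rather than 1.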

In [13]:
#Stop the Spark context
sc.stop()