In [15]:
from pyspark import SparkContext
import numpy as np
import pandas as pd

In [17]:
# Pendant les test pour stopper un SparkContext
#sc.stop()

In [18]:
# Connexion au cluster / test en local
sc = SparkContext("local", "Page Rank With PySpark")

In [31]:
# Préparation du fichier avant utilisation dans l'algo

# Taille du fichier initial = dfFile.shape = (5105039, 2)
#dfFile = pd.read_csv("web-Google.txt", sep='\t', header=3)

# Pour les tests récupérations des x premières lignes uniquement 
dfFile = pd.read_csv("data/web-Google.txt", sep='\t', header=3, nrows = 100)
print(dfFile.shape)
#Etant donné que nous ne sélectionons qu'une partie du fichier les noeuds sont filtrés en conséquence
nodesList = dfFile['# FromNodeId'].unique()
dfFileUniqueNodes = dfFile[dfFile['ToNodeId'].isin(nodesList)]
#dfFile.to_csv("web-GoogleSmall.txt", sep='\t', header=False, index=False)
dfFileUniqueNodes.to_csv("data/web-GoogleSmall.txt", sep=' ', header=False, index=False)
print(dfFileUniqueNodes.shape)

(100, 2)
(20, 2)


In [33]:
# Read the text file "GoogleReduc.txt" from a local file system (available on all nodes),
# and return it as an RDD of Strings.
RddDataBase = sc.textFile("data/web-GoogleSmall.txt")
#RddDataBase.collect()

In [34]:
def linkNodesFromTo(NodesRow):
    nodeslinked = NodesRow.split(' ') 
    return nodeslinked[0], nodeslinked[1]

In [35]:
# First step :
# Create key/value pairs, 
# where the key is the name of the page and the value is out-links from the page (Di) 
# and İnitiate PageRank values (Ri) as 1/Number of pages.

# Key/value pairs
links = RddDataBase.map(lambda NodesRow: linkNodesFromTo(NodesRow)).distinct().groupByKey().cache()
print(links.collect())

# Find node count
N = links.count()
print(N)

# Create and initialize the ranks
ranks = links.map(lambda node: (node[0],1.0/N))
print(ranks.collect())

[('0', <pyspark.resultiterable.ResultIterable object at 0x0000020B6D910520>), ('11342', <pyspark.resultiterable.ResultIterable object at 0x0000020B6B417610>), ('824020', <pyspark.resultiterable.ResultIterable object at 0x0000020B6D9014C0>), ('867923', <pyspark.resultiterable.ResultIterable object at 0x0000020B6CA8B5E0>), ('891835', <pyspark.resultiterable.ResultIterable object at 0x0000020B6DAA10D0>), ('1', <pyspark.resultiterable.ResultIterable object at 0x0000020B6DAA1160>), ('203402', <pyspark.resultiterable.ResultIterable object at 0x0000020B6DAA11C0>)]
7
[('0', 0.14285714285714285), ('11342', 0.14285714285714285), ('824020', 0.14285714285714285), ('867923', 0.14285714285714285), ('891835', 0.14285714285714285), ('1', 0.14285714285714285), ('203402', 0.14285714285714285)]


In [36]:
#Map: For each node i, 
#calculate vote (Ri/Di) for each out-link of i and propagate to adjacent nodes.
#Reduce: For each node i, sum the upcoming votes and update Rank value (Ri).
#Repeat this Map-Reduce step until Rank values converge (stable or within a margin).
nbIter=5
for i in range(nbIter):
    # Join graph info with rank info and propogate to all neighbors rank scores (rank/(number of neighbors)
    # And add up ranks from all in-coming edges
    ranks = links.join(ranks).flatMap(lambda x : [(i, float(x[1][1])/len(x[1][0])) for i in x[1][0]])\
    .reduceByKey(lambda x,y: x+y)
    print(ranks.sortByKey().collect())

[('0', 0.19047619047619047), ('1', 0.07142857142857142), ('11342', 0.13095238095238093), ('203402', 0.07142857142857142), ('223236', 0.14285714285714285), ('824020', 0.03571428571428571), ('867923', 0.17857142857142855), ('891835', 0.17857142857142855)]
[('0', 0.1746031746031746), ('1', 0.03571428571428571), ('11342', 0.16666666666666663), ('203402', 0.03571428571428571), ('223236', 0.07142857142857142), ('824020', 0.047619047619047616), ('867923', 0.16269841269841268), ('891835', 0.16269841269841268)]
[('0', 0.17989417989417986), ('1', 0.017857142857142856), ('11342', 0.1521164021164021), ('203402', 0.017857142857142856), ('223236', 0.03571428571428571), ('824020', 0.04365079365079365), ('867923', 0.1693121693121693), ('891835', 0.1693121693121693)]
[('0', 0.1781305114638448), ('1', 0.008928571428571428), ('11342', 0.15784832451499117), ('203402', 0.008928571428571428), ('223236', 0.017857142857142856), ('824020', 0.044973544973544964), ('867923', 0.16666666666666666), ('891835', 0.16