In [1]:
# Shell escape: install findspark, which the next cell imports and initialises before importing pyspark.
! pip install findspark 



In [2]:
import findspark
# findspark.init() is deliberately called before pyspark is imported
# (presumably it is what makes the pyspark package importable here).
findspark.init()

from pyspark import SparkContext, SparkConf

# Single local master; the application name string is kept exactly as before.
conf = SparkConf().setMaster("local").setAppName("PangRank")
# getOrCreate() reuses a context that is already running in this kernel, so
# re-executing this cell no longer fails with
# "Cannot run multiple SparkContexts at once".
sc = SparkContext.getOrCreate(conf=conf)

In [None]:
from operator import add, sub
from pyspark.sql import Row

def get_edges(edge):
    """Parse one tab-separated ``source<TAB>dest<TAB>weight`` line into a Row.

    All three fields are converted with ``int``; a line that does not hold
    exactly three integer fields raises ``ValueError``.
    """
    fields = [int(token) for token in edge.split('\t')]
    source, dest, weight = fields
    return Row(source=source, dest=dest, weight=weight)

def get_vertexes(edges):
    """Return the distinct vertex ids that occur as a source or a dest of any edge.

    ``edges`` must offer ``flatMap`` and its result ``distinct`` (an RDD of
    objects exposing ``.source`` and ``.dest``).
    """
    endpoints = edges.flatMap(lambda row: (row.source, row.dest))
    return endpoints.distinct()

def norm_L1(ranks):
    """Return the L1 norm of the values of a ``(key, value)`` RDD.

    The L1 norm is the sum of the absolute values.  The previous body
    returned ``sqrt(sum(value ** 2))``, i.e. the Euclidean (L2) norm, which
    contradicts the function name and does not measure the total rank mass
    that the PageRank loop redistributes.

    Args:
        ranks: RDD of ``(key, numeric value)`` pairs; must not be empty
            (``reduce`` raises on an empty RDD, as before).

    Returns:
        ``sum(abs(value))`` over all pairs.
    """
    return ranks.map(lambda pair: abs(pair[1])).reduce(add)
    
def normalize_weights(entry):
    """Yield the edges of one source with each weight divided by the source's total.

    ``entry`` has the shape ``(source, (edges, sum_weights))``, as produced by
    joining the edges grouped by source with the per-source weight sums.
    """
    _, (outgoing, total) = entry
    for row in outgoing:
        share = row.weight / total
        yield Row(source=row.source, dest=row.dest, weight=share)

def markov_transform(entry):
    """Yield ``(dest, rank * weight)`` for every outgoing edge of one source.

    ``entry`` has the shape ``(source, (edges, rank))``: the edges grouped by
    source joined with that source's current rank.
    """
    _, (outgoing, source_rank) = entry
    yield from ((row.dest, source_rank * row.weight) for row in outgoing)

def compute_page_rank(input_file_name, c = 0.85, iterations = 50):
    """Run a fixed number of damped PageRank iterations over a weighted edge list.

    The file is read through the module-level SparkContext ``sc`` and every
    line is parsed with ``get_edges``.  The outgoing weights of each source
    are rescaled to sum to one, every vertex starts with rank 1.0, and each
    iteration propagates ranks along the edges, multiplies them by ``c`` and
    spreads the norm lost in that step evenly over all vertexes.

    Args:
        input_file_name: path handed to ``sc.textFile``.
        c: damping factor applied to the propagated ranks.
        iterations: number of iterations to run.

    Returns:
        ``(ranks, errors)`` where ``ranks`` is a list of ``(vertex, rank)``
        tuples sorted by ascending rank and ``errors`` holds, per iteration,
        ``norm_L1(new_ranks - old_ranks) / vertex_count``.  An input without
        any edge yields ``([], [])``.
    """
    # Read edges
    edges = sc.textFile(input_file_name).map(get_edges)

    # Re-weight edges so that the outgoing weights of every source sum to one
    edges_by_source = edges.groupBy(lambda row: row.source)
    sum_weights = edges_by_source.map(lambda x: (x[0], sum(row.weight for row in x[1])))
    edges = edges_by_source.join(sum_weights).flatMap(normalize_weights)

    # Prepare suitable data.  Both RDDs are reused in every iteration, so
    # they are cached instead of being recomputed from the input file.
    vertexes = get_vertexes(edges).cache()
    # Fix: this used to be edges.count(), i.e. the number of edges, which
    # skewed both the redistributed share and the reported error.
    vertex_count = vertexes.count()
    if vertex_count == 0:
        # Nothing to rank; avoids reducing an empty RDD below.
        return ([], [])
    edges = edges.groupBy(lambda row: row.source).cache()
    # Zero entries guarantee that vertexes without incoming edges keep a key.
    zero_ranks = vertexes.map(lambda vert: (vert, 0))

    # Poor man's PageRank
    prev_ranks = vertexes.map(lambda vert: (vert, 1.0))
    prev_norm = norm_L1(prev_ranks)
    errors = []
    for _ in range(iterations):
        partial_ranks = edges.join(prev_ranks).flatMap(markov_transform)
        partial_ranks = partial_ranks.union(zero_ranks)
        # Cached: this RDD feeds the norm below and every later iteration,
        # so without caching the whole history would be re-evaluated.
        damped_ranks = partial_ranks.reduceByKey(add).mapValues(lambda rank: c * rank).cache()
        norm = norm_L1(damped_ranks)
        # Norm lost through damping and through vertexes without outgoing
        # edges, shared equally among all vertexes.
        shift = (prev_norm - norm) / float(vertex_count)
        # The default argument freezes this iteration's shift inside the
        # closure, independent of when Spark serialises the function.
        ranks = damped_ranks.mapValues(lambda rank, shift=shift: rank + shift)

        # reduceByKey(sub) may subtract in either order; the norm ignores the sign.
        error = norm_L1(ranks.union(prev_ranks).reduceByKey(sub)) / vertex_count
        errors.append(error)

        prev_ranks = ranks
        # Fix: the reference norm for the next iteration is the norm of the
        # ranks actually carried over, not the pre-redistribution norm.
        prev_norm = norm_L1(ranks)

    # Sort vertexes by rank (ascending, so the best ranked come last)
    ranked = prev_ranks.collect()
    ranked.sort(key=lambda x: x[1])

    return (ranked, errors)

In [None]:
import numpy as np
from matplotlib import pyplot as plt

# Number of PageRank iterations; also read by the plotting cell below.
iterations = 50
ranks, errors = compute_page_rank(
    input_file_name="graph1.tsv",
    c=0.85,
    iterations=iterations,
)

# ranks is sorted by ascending rank, so the last ten entries are the top ten.
print(ranks[-10:])

IOPub data rate exceeded.
The notebook server will temporarily stop sending output
to the client in order to avoid crashing it.
To change this limit, set the config variable
`--NotebookApp.iopub_data_rate_limit`.


[324, 65482, 1566, 304, 3380, 2, 498, 448, 616, 714, 786, 538, 76448, 5986, 1550, 142, 690, 76428, 146, 656, 76460, 5504, 10544, 76456, 360, 1548, 40554, 518, 1066, 726, 40556, 76434, 508, 1310, 590, 1622, 76438, 586, 572, 1010, 758, 76430, 28308, 2178, 4430, 4316, 40864, 2396, 2384, 4290, 642, 816, 10196, 152, 2016, 6006, 762, 76442, 2100, 76446, 40416, 444, 1384, 52722, 76444, 76422, 4250, 944, 1538, 4572, 852, 128, 148, 2172, 764, 76426, 1412, 5590, 546, 76418, 76424, 76458, 63964, 76410, 11784, 28968, 11680, 188, 5290, 1306, 1364, 76414, 722, 532, 446, 134, 554, 728, 76454, 39792, 39728, 452, 802, 566, 2392, 5574, 33152, 144, 76432, 76436, 618, 14040, 750, 1278, 348, 52794, 1070, 1048, 1326, 1540, 76416, 35966, 582, 1224, 6106, 8232, 11656, 11888, 14640, 14714, 14904, 18976, 19776, 29534, 39658, 40110, 45208, 76450, 76452, 76412, 568, 854, 40296, 28870, 496, 1516, 736, 920, 12376, 634, 674, 822, 530, 7742, 1082, 1076, 11900, 540, 2710, 30, 500, 512, 542, 548, 40862, 796, 610, 1080,

In [None]:
# Plot the per-iteration error returned by compute_page_rank.  The x axis is
# derived from len(errors) rather than the global `iterations`, so the plot
# stays consistent even if `iterations` was changed after the ranks were computed.
plt.plot(range(len(errors)), errors)
plt.xlabel("Iteration")
plt.ylabel("Rank change per vertex")
plt.title("PageRank convergence")
plt.show()

In [None]:
def save_page_ranks(ranks, output_file_name):
    """Write every ``(vertex, rank)`` pair as one ``vertex<TAB>rank`` line.

    The target file is created or truncated; pairs are written in the order
    in which ``ranks`` yields them.
    """
    with open(output_file_name, "w+") as out:
        out.writelines("%s\t%s\n" % pair for pair in ranks)

# Persist the ranks computed above, one "vertex<TAB>rank" line per vertex.
save_page_ranks(ranks, output_file_name="results.tsv")