## Distributed Page Rank
---

We have a directed graph in which each node represents a server and each edge represents a connection between two servers. The graph is stored in a text file as an adjacency list. The data were collected by the Stanford Network Analysis Project (SNAP); [the graph includes 6301 servers and 20777 edges](http://snap.stanford.edu/data/p2p-Gnutella08.html).
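The parsing code later in this notebook treats each line as a node ID followed by the IDs of that node's out-neighbours. Below is a minimal pure-Python sketch of that reading. It assumes whitespace-separated integer IDs and one line per node, so a line holding only an ID is a node with no outgoing edges. `parse_adjacency` and the toy lines are hypothetical and are not the Gnutella data:

```python
# Assumed line format (inferred from the notebook's parsing code):
#   "<node_id> <neighbour_id> <neighbour_id> ..."
# A line with only a node ID is a node with no outgoing edges (a dead end).

def parse_adjacency(lines):
    """Return a dict mapping each node ID to its list of out-neighbour IDs."""
    graph = {}
    for line in lines:
        tokens = line.split()
        if not tokens:              # skip blank lines
            continue
        ids = [int(t) for t in tokens]
        graph[ids[0]] = ids[1:]     # first ID is the node, the rest are neighbours
    return graph

sample = ["0 1 2", "1 2", "2 0", "3"]   # hypothetical toy input
graph = parse_adjacency(sample)
num_edges = sum(len(adj) for adj in graph.values())  # one edge per neighbour entry
print(len(graph), num_edges)  # 4 4
```

The edge count is the total length of all neighbour lists, which is what `nodes_edges` accumulates line by line.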

---
First, we determine some basic properties of the graph:
- the number of nodes and edges
- the number of nodes with each outdegree
- the number of nodes with each indegree

We use Spark to compute these.

```python
def nodes_edges():
    num_nodes = sc.accumulator(0) # initialize node count to 0
    num_edges = sc.accumulator(0) # initialize edge count to 0
    lines = sc.textFile("_.txt") # load file
    '''
    REDACTED
    '''
    lines.foreach(lambda x: num_edges.add(
        len(x.split()) - 1)) # each line is "node neighbour neighbour ...", so add
                             #   (number of space-delimited items - 1) to the edge count
    return (num_nodes.value, num_edges.value) # read accumulator values on the driver

def out_dict():
    '''
    REDACTED
    '''
    outdegrees = lines.map(lambda x: (len(x.split()) - 1, 1)).reduceByKey(
        lambda x, y: x + y) # key each line by its outdegree, then count nodes per outdegree
    return outdegrees.sortByKey(True).collectAsMap() # sort by outdegree

def in_counts():
    lines = sc.textFile("_.txt") # load file
    '''
    REDACTED
    '''
    indegrees = neighbours.reduceByKey(lambda x, y: x + y).map(
        lambda x: (x[1], 1)).reduceByKey(
            lambda x, y: x + y) # sum occurrences of each node_id as a neighbour (its
                                #   indegree), then count nodes per indegree
    return indegrees.sortByKey(True).collectAsMap() # sort by indegree
```
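To check the Spark results on a small input, here is a hypothetical single-machine sketch of the same three statistics. It starts every node's indegree at 0 so that nodes nobody points to still appear in the indegree histogram; a Spark version that only counts IDs appearing in neighbour lists would omit them unless the redacted code adds them. `degree_stats` and the toy graph are illustrative assumptions:

```python
from collections import Counter

def degree_stats(graph):
    """graph: dict mapping node_id -> list of out-neighbour IDs."""
    num_nodes = len(graph)
    num_edges = sum(len(adj) for adj in graph.values())
    # outdegree -> number of nodes with that outdegree
    out_hist = Counter(len(adj) for adj in graph.values())
    # indegree of each node, starting every known node at 0
    indeg = {node: 0 for node in graph}
    for adj in graph.values():
        for v in adj:
            indeg[v] = indeg.get(v, 0) + 1
    # indegree -> number of nodes with that indegree
    in_hist = Counter(indeg.values())
    return (num_nodes, num_edges,
            dict(sorted(out_hist.items())), dict(sorted(in_hist.items())))

toy = {0: [1, 2, 3], 1: [2], 2: [0], 3: [], 4: [0]}  # hypothetical toy graph
print(degree_stats(toy))
# (5, 6, {0: 1, 1: 3, 3: 1}, {0: 1, 1: 2, 2: 2})
```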


---
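This Spark program computes PageRank with respect to a source node (personalized PageRank): the random walk starts at `source_id`, jumps back to it with probability `jump` at each step, and after `n_iter` iterations the `m` most probable nodes are returned.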
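Reading the visible update step, and assuming the redacted body of `calc_contrib` splits a node's rank evenly among its out-neighbours, one iteration appears to compute the following. Here $s$ is `source_id`, $\beta$ is `jump`, $d_{\text{out}}(u)$ is the number of out-neighbours of $u$, and $L_i$ is the mass held by dead ends (nodes with no out-neighbours), which the code tracks as lost mass:

$$
p_0 = e_s, \qquad
p_{i+1}(v) = (1-\beta)\sum_{u \,:\, u \to v} \frac{p_i(u)}{d_{\text{out}}(u)} \;+\; \mathbb{1}[v = s]\,\bigl(\beta + L_i\bigr),
\qquad
L_i = \sum_{u \,:\, d_{\text{out}}(u) = 0} p_i(u).
$$

Summing over $v$ with $\sum_v p_i(v) = 1$ gives $(1-\beta)(1-L_i) + \beta + L_i = 1 + \beta L_i$. So unless the redacted code rescales the lost mass, dead ends slightly inflate the total; returning $(1-\beta)L_i$ to the source instead keeps $p_{i+1}$ a probability vector.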

```python
def page_rank(source_id, n_iter, jump, m):
    lines = sc.textFile("_.txt") # load file
    links = lines.map(lambda x: list(map(
        int, x.split()))).map(
            lambda x: (x[0], x[1:])) # create (node_id, neighbour list) pairs

    # calc_contrib(node_id, adj, rank) determines how to distribute the mass
    #   given by rank to the neighbours of node_id listed in adj
    def calc_contrib(node_id, adj, rank):
        num_neighbours = len(adj)
        if num_neighbours == 0: # detect dead end
            lost_mass_curr.add(rank) # accumulate lost mass (assumes lost_mass_curr
                                     #   is a Spark accumulator)
        '''
        REDACTED
        '''
        return vec

    ranks = links.map(lambda x:
                      (x[0], 1.0 if x[0] == source_id else 0.0)) # set up p0

    '''
    REDACTED
    '''

    for i in range(0, n_iter):
        '''
        REDACTED
        '''
        contribs = links.join(ranks).flatMap(
            lambda x: calc_contrib(
                x[0], x[1][0], x[1][1])) # perform one page rank step
        ranks = contribs.reduceByKey(lambda x, y: x + y).sortByKey().map(
            lambda x: (x[0], x[1] * (1 - jump) +
                       ((jump + lost_mass_prev)
                       if x[0] == source_id else 0.0))) # compute p[i+1]
        '''
        REDACTED
        '''

    output = ranks.sortBy(lambda x: x[1], False).take(m) # top m nodes by probability
    return output
```
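A single-machine version of the same fixed-iteration loop can help sanity-check the Spark output on a toy graph. This sketch is hypothetical (`page_rank_local` is not part of the notebook). It assumes a node's rank is split evenly over its out-neighbours and returns dead-end mass to the source scaled by `1 - jump`, one convention that keeps the ranks summing to 1; the notebook's own handling of lost mass is partly redacted:

```python
def page_rank_local(graph, source_id, n_iter, jump, m):
    """Hypothetical single-machine analogue of page_rank(source_id, n_iter, jump, m).

    graph: dict node_id -> list of out-neighbour IDs (every node is a key).
    jump:  probability of teleporting back to source_id at each step.
    """
    # p0: all probability mass on the source node
    ranks = {node: (1.0 if node == source_id else 0.0) for node in graph}
    for _ in range(n_iter):
        new_ranks = {node: 0.0 for node in graph}
        lost_mass = 0.0
        for node, adj in graph.items():
            if not adj:                       # dead end: mass has nowhere to go
                lost_mass += ranks[node]
                continue
            share = ranks[node] / len(adj)    # split rank evenly over out-links
            for nbr in adj:
                new_ranks[nbr] += share
        # damp the walk, then teleport to the source; dead-end mass is also
        # returned to the source, scaled by (1 - jump) so the total stays 1
        for node in new_ranks:
            new_ranks[node] *= (1 - jump)
        new_ranks[source_id] += jump + (1 - jump) * lost_mass
        ranks = new_ranks
    # top-m nodes by probability
    return sorted(ranks.items(), key=lambda kv: kv[1], reverse=True)[:m]

print(page_rank_local({0: [1, 2], 1: [2], 2: [0]}, source_id=0, n_iter=10, jump=0.15, m=3))
```

Because the walk starts at the source and only ever teleports back to it, nodes unreachable from `source_id` keep probability 0.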


---
Here we run page rank until the rank vector is sufficiently close to the steady state, in the sense of a Markov chain represented by a stochastic matrix: the probability vector left unchanged by one more transition step.
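The stopping test in `page_rank_criterion` is redacted. As an assumption, the sketch below measures `change` as the L1 distance between successive probability vectors and runs plain power iteration $p \leftarrow Pp$ on a small column-stochastic matrix until that distance falls below a threshold. The matrix, the threshold, and `stationary_distribution` are hypothetical. For this 2-state chain the steady state solves $0.1\,\pi_0 = 0.5\,\pi_1$, giving $\pi = (5/6, 1/6)$:

```python
def stationary_distribution(P, p0, threshold=1e-10, max_iter=10_000):
    """Power iteration p <- P p until the L1 change drops to the threshold.

    P:  column-stochastic matrix as a list of rows (each column sums to 1).
    p0: initial probability vector.
    """
    n = len(p0)
    p = list(p0)
    for _ in range(max_iter):
        p_next = [sum(P[r][c] * p[c] for c in range(n)) for r in range(n)]
        change = sum(abs(a - b) for a, b in zip(p_next, p))  # L1 distance
        p = p_next
        if change <= threshold:   # close enough to the steady state
            break
    return p

P = [[0.9, 0.5],
     [0.1, 0.5]]
pi = stationary_distribution(P, [1.0, 0.0])
print(pi)  # close to [5/6, 1/6]
```

Computing a step before testing `change` avoids seeding `change` with an artificially large value before the first comparison, and the `max_iter` cap guards against a threshold that is never reached.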

We cache the adjacency-list RDD (`links`) because it is reused unchanged in the join on every iteration of the loop. Spark evaluates RDDs lazily, so without caching, each iteration would recompute `links` from its lineage, re-reading and re-parsing the text file. The tradeoff is between the memory (or disk) needed to store the RDD and the cost of recomputing it. Since we write Spark code in a loosely functional style and use this RDD many times, storing it is the better choice: caching materializes it once and reuses it.
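The benefit of caching can be seen in a pure-Python analogy (it does not use the Spark API): a lazily defined dataset is re-derived from its source every time it is used, much as an uncached RDD's lineage is replayed on each action. `load_links`, the call counter, and the toy text are hypothetical. In PySpark itself, `RDD.cache()` persists with the default `MEMORY_ONLY` storage level, and `RDD.persist()` accepts other levels such as `StorageLevel.MEMORY_AND_DISK`.

```python
# Pure-Python analogy (not the Spark API): re-deriving data on every use
# versus materializing it once and reusing it.
parse_calls = 0

def load_links(text):
    """Parse adjacency-list text into {node: [neighbours]}, counting each parse."""
    global parse_calls
    parse_calls += 1
    return {int(t[0]): [int(x) for x in t[1:]]
            for t in (line.split() for line in text.splitlines()) if t}

TEXT = "0 1 2\n1 2\n2 0"   # hypothetical toy input
N_ITER = 5

# Without caching: the derivation is replayed on every iteration.
for _ in range(N_ITER):
    links = load_links(TEXT)
uncached_calls = parse_calls

# With caching: materialize once, reuse in every iteration.
parse_calls = 0
cached_links = load_links(TEXT)
for _ in range(N_ITER):
    links = cached_links
cached_calls = parse_calls

print(uncached_calls, cached_calls)  # 5 1
```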

```python
def page_rank_criterion(source_id, jump):
    lines = sc.textFile("_.txt") # load file
    num_nodes = sc.accumulator(0) # initialize to 0
    '''
    REDACTED
    '''

    links = lines.map(lambda x: list(map(
        int, x.split()))).map(lambda x: (x[0], x[1:])) # create (node_id, neighbour
                                                       #   list) pairs

    # calc_contrib(node_id, adj, rank) determines how to distribute the mass
    #   given by rank to the neighbours of node_id listed in adj
    def calc_contrib(node_id, adj, rank):
        '''
        REDACTED
        '''

    def det_jump(node_id):
        '''
        REDACTED
        '''

    ranks_prev = links.map(
        lambda x: (x[0], 1.0 if x[0] == source_id else 0.0)) # set up p0

    '''
    REDACTED
    '''

    while change > threshold: # iterate until successive rank vectors are close
        '''
        REDACTED
        '''
        contribs = links.join(ranks_prev).flatMap(
            lambda x: calc_contrib(
                x[0], x[1][0], x[1][1])) # perform one page rank step
        ranks_curr = contribs.reduceByKey(lambda x, y: x + y).sortByKey().map(
            lambda x: (x[0], x[1] * (1 - jump) +
                       ((jump + lost_mass_prev)
                       if x[0] == source_id else 0.0))) # compute p[i+1]
        '''
        REDACTED
        '''

    output = ranks_curr.sortBy(lambda x: x[1], False).take(10) # top 10 nodes by probability
    return output
```
