---
### Dataset:
For this project, I use Python and Spark to analyze a graph of the Gnutella server network. In this graph, each node represents a server, and each directed edge represents a connection between servers in Gnutella's peer-to-peer network. The data file `p2p-Gnutella08-adj.txt` represents the graph as an adjacency list. Each server (node) is identified by a unique number, and each line in the file gives the adjacency list for a single server.
For example, this line:
> 91	243	1923	2194

gives the adjacency list for server `91`.   It indicates that there are edges from server `91` to servers `243`, `1923`, and `2194`.    According to the Stanford Network Analysis Project, which collected these data, [the graph includes 6301 servers and 20777 edges](http://snap.stanford.edu/data/p2p-Gnutella08.html).
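As a minimal sketch of how one such line can be read, the snippet below splits a line into the server id and its list of targets. The helper name `parse_adjacency_line` is hypothetical and is not part of the project code; it assumes fields are separated by whitespace, as in the example line.

```python
def parse_adjacency_line(line):
    """Split one whitespace-separated adjacency-list line into (node, neighbours)."""
    fields = [int(tok) for tok in line.split()]
    # The first field is the server id; the rest are targets of its outgoing edges.
    return fields[0], fields[1:]


node, neighbours = parse_adjacency_line("91\t243\t1923\t2194")
print(node, neighbours)  # 91 [243, 1923, 2194]
```

A line that contains only a server id yields an empty neighbour list, which is how a server with no outgoing edges would show up under this assumption.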

In [0]:
import findspark
findspark.init("/u/cs451/packages/spark")

from pyspark import SparkContext, SparkConf

Next, create a `SparkContext`:

In [0]:
import random
sc = SparkContext(appName="YourTest", master="local[2]", conf=SparkConf().set('spark.ui.port', random.randrange(4000,5000)))

---

### Determine some basic properties of the Gnutella graph:
- Number of nodes and edges in the graph.
- Number of nodes of each outdegree. (e.g. how many nodes have no outgoing edges, how many have one outgoing edge, and so on)
- Number of nodes of each indegree.
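The three properties listed above can be illustrated without Spark on a tiny hand-made graph. The sketch below is only a plain-Python cross-check under stated assumptions: the function name `degree_histograms` and the toy input are hypothetical, and every node is assumed to have its own line in the input.

```python
from collections import Counter


def degree_histograms(lines):
    """Return (outdegree histogram, indegree histogram) for adjacency-list lines."""
    out_degree = {}
    in_degree = Counter()
    for line in lines:
        fields = line.split()
        if not fields:
            continue  # skip blank lines
        # Everything after the node id is an outgoing edge.
        out_degree[fields[0]] = len(fields) - 1
        # Each listed target gains one incoming edge.
        in_degree.update(fields[1:])
    # Count how many nodes share each degree value.
    return dict(Counter(out_degree.values())), dict(Counter(in_degree.values()))


toy_lines = ["1 2 3", "2 3", "3"]
out_hist, in_hist = degree_histograms(toy_lines)
print(out_hist)
print(in_hist)
```

In this sketch a node that is never listed as a target has no entry in `in_degree`, so indegree 0 does not appear in the indegree histogram.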

In [0]:
def num_nodes_edges():
    """Returns a tuple (num_nodes, num_edges)"""
    # Each line contributes one node and (number of tokens - 1) outgoing edges.
    text_RDD = sc.textFile('p2p-Gnutella08-adj.txt')
    return text_RDD.map(lambda line : (1, len(line.split())-1)) \
                   .reduce(lambda x, y : (x[0] + y[0], x[1] + y[1]))

def out_counts():
    """Returns a dictionary where the keys are the outdegrees, and the 
    values are the number of nodes of the corresponding outdegree """
    # Key each line by its outdegree (tokens after the node id), then count nodes per outdegree.
    text_RDD = sc.textFile('p2p-Gnutella08-adj.txt')
    return text_RDD.map(lambda line : (len(line.split())-1, 1)) \
                   .reduceByKey(lambda x, y : x + y) \
                   .collectAsMap()

def in_counts():
    """Returns a dictionary where the keys are the indegrees, and the 
    values are the number of nodes of the corresponding indegree """
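    # Emit (target, 1) per edge to get each node's indegree, then count nodes per indegree.
    # Nodes with no incoming edges never appear as a target, so indegree 0 is not reported.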
    text_RDD = sc.textFile('p2p-Gnutella08-adj.txt')
    return text_RDD.flatMap(lambda line : [(i, 1) for i in line.split()[1:]]) \
                   .reduceByKey(lambda x, y : x + y) \
                   .map(lambda pair : (pair[1], 1)) \
                   .reduceByKey(lambda x, y : x + y) \
                   .collectAsMap()

{74: 1,
 70: 3,
 2: 1287,
 4: 559,
 6: 227,
 54: 1,
 50: 1,
 82: 1,
 20: 4,
 30: 1,
 10: 37,
 32: 2,
 62: 2,
 8: 76,
 14: 13,
 12: 23,
 18: 2,
 72: 2,
 60: 3,
 56: 2,
 16: 1,
 66: 2,
 52: 1,
 38: 1,
 86: 1,
 44: 1,
 22: 1,
 78: 1,
 1: 2452,
 59: 1,
 49: 1,
 7: 144,
 5: 333,
 11: 29,
 71: 3,
 81: 4,
 19: 2,
 13: 19,
 67: 3,
 21: 2,
 57: 1,
 47: 2,
 3: 868,
 9: 70,
 15: 8,
 69: 2,
 25: 1,
 33: 1,
 31: 2,
 77: 2,
 73: 2,
 51: 1,
 85: 1,
 83: 1,
 87: 1,
 61: 1,
 27: 1,
 35: 1,
 63: 1,
 79: 1,
 55: 1,
 41: 1,
 91: 1,
 23: 1}

---
### Objective:
The main objective for this project is to perform *single source personalized page rank* over the Gnutella graph. Personalized page rank is like ordinary page rank except:
- One node in the graph is designated as the *source* node. Personalized page rank is performed with respect to that source node.
- Personalized page rank is initialized by assigning all probability mass to the source node, and none to the other nodes. In contrast, ordinary page rank is initialized by giving all nodes the same probability mass.
- Whenever personalized page rank makes a random jump, it jumps back to the source node. In contrast, ordinary page rank may jump to any node.
- In personalized page rank, all probability mass lost at dangling nodes is put back into the source node. In ordinary page rank, lost mass is distributed evenly over all nodes.
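The rules above can be sketched as a single iteration in plain Python on a toy graph. This is an illustrative sketch rather than the Spark implementation: the function name `ppr_step` and the toy graph are hypothetical, and the code assumes every neighbour also appears as a key of `graph`.

```python
def ppr_step(graph, ranks, source, jump_factor):
    """One personalized page rank iteration over a dict {node: [neighbours]}."""
    new_ranks = {node: 0.0 for node in graph}
    lost = 0.0
    for node, neighbours in graph.items():
        if neighbours:
            # Split this node's mass evenly over its outgoing edges.
            share = ranks[node] / len(neighbours)
            for nb in neighbours:
                new_ranks[nb] += share
        else:
            # Dangling node: its mass has nowhere to go.
            lost += ranks[node]
    # Lost mass goes back to the source node only.
    new_ranks[source] += lost
    # Scale all mass by (1 - jump_factor); the jump mass lands on the source.
    for node in new_ranks:
        new_ranks[node] *= (1.0 - jump_factor)
    new_ranks[source] += jump_factor
    return new_ranks


toy_graph = {1: [2, 3], 2: [3], 3: []}
# All probability mass starts on the source node (node 1 here).
ranks = {1: 1.0, 2: 0.0, 3: 0.0}
ranks = ppr_step(toy_graph, ranks, source=1, jump_factor=0.15)
print(ranks)
```

Because the lost mass and the jump mass both return to the source, the ranks still sum to 1 (up to floating-point rounding) after each step.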


### Input Parameters:
- source node id (a positive integer)
- iteration count (a positive integer)
- random jump factor value (a float between 0 and 1)

The function performs personalized page rank, with respect to the specified source node, over the Gnutella graph, for the specified number of iterations, using Spark.

The function returns a list of the 10 nodes with the highest personalized page rank with respect to the given source. Each entry is a tuple of the node's id and its page rank value, so the returned list looks like this: `[(node_id_1, highest_pagerank_value), ..., (node_id_10, 10th_highest_pagerank_value)]`

In [0]:
from time import time
from pyspark import StorageLevel
def personalized_page_rank(source_node_id, num_iterations, jump_factor):
    """Returns a list of the 10 nodes with the highest page rank value along with their value, as tuples
    [(node_id_1, highest_pagerank_value), ..., (node_id_10, 10th_highest_pagerank_value)]"""
    # Each node carries a (rank, loss) pair; loss is the mass stranded at dangling nodes.
    def construct_graph(line):
        l = list(map(int, line.split()))
        nid = l[0]
        adj_list = l[1:]
        return nid, adj_list
    
    def initialize_rank(line, source_node_id):
        l = list(map(int, line.split()))
        nid = l[0]
        rank = 1 if nid == source_node_id else 0
        loss = 0
        return nid, (rank, loss)
    
    def spread(pair):
        nid, (adj_list, (rank, loss)) = pair
        if not adj_list:
            loss += rank
            return [(nid, (0.0, loss))]
        num_adj_nodes = len(adj_list)
        return list(zip(adj_list, [(rank / num_adj_nodes, 0.0)] * num_adj_nodes)) + [(nid, (0.0, 0.0))]
    
    def compensate_loss_jump(pair, total_loss, source_node_id, jump_factor):
        nid, (rank, loss) = pair
        if nid == source_node_id:
            rank = (rank + total_loss) * (1 - jump_factor) + jump_factor
            return nid, (rank, 0)
        return nid, (rank * (1 - jump_factor), 0)
    
    text_RDD = sc.textFile('p2p-Gnutella08-adj.txt')
    graph_RDD = text_RDD.map(lambda line : construct_graph(line))
    rank_RDD = text_RDD.map(lambda line : initialize_rank(line, source_node_id))
    
    for i in range(num_iterations):                   
        rank_RDD = graph_RDD.leftOuterJoin(rank_RDD) \
                            .flatMap(lambda pair : spread(pair)) \
                            .reduceByKey(lambda x, y : (x[0] + y[0], x[1] + y[1]))
        total_loss = rank_RDD.aggregate(0,
                                       (lambda acc, e : acc + e[1][1]),
                                       (lambda acc1, acc2 : acc1 + acc2))
        rank_RDD = rank_RDD.map(lambda e : compensate_loss_jump(e, total_loss, source_node_id, jump_factor))
        
    top_10_rank = rank_RDD.map(lambda e : (e[1][0], e[0])) \
                          .sortByKey(ascending=False) \
                          .map(lambda e : (e[1], e[0])) \
                          .take(10)
    return top_10_rank

[(28, 0.5384424922495424),
 (1517, 0.12026767556203095),
 (1516, 0.12026368682524335),
 (846, 0.12026334287813738),
 (852, 0.008058825545098519),
 (152, 0.008057101055510536),
 (847, 0.008056064295146433),
 (850, 0.008056047096075143),
 (849, 0.008056044737059032),
 (848, 0.008056043598862838)]

---
#### Alternative Stopping Criterion:

The above function runs PageRank for a specified number of iterations.  However, it is also common to have iterative algorithms that run until some specified termination condition is reached.

For example, for page rank, suppose $p_i(x)$ represents the probability mass assigned to node $x$ after the $i$th iteration of the algorithm. ($p_0(x)$ is the initial probability mass of node $x$.) We define the change of $x$'s probability mass on the $i$th iteration as $\lvert p_i(x)-p_{i-1}(x) \rvert$. Then, we can iterate personalized page rank until the maximum change over all nodes is less than a specified threshold, i.e., until all nodes' page ranks have converged.

The function below iterates until the maximum node change is less than $\frac{0.5}{N}$, where $N$ represents the number of nodes in the graph.
This version of the function takes only two inputs: the source node id and the random jump factor.
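As a small illustration of the stopping test, the sketch below compares two rank dictionaries and applies the $\frac{0.5}{N}$ threshold. The helper names `max_change` and `has_converged` are hypothetical and work on plain dictionaries rather than RDDs; a node missing from the previous ranks is assumed to have had mass 0.

```python
def max_change(prev_ranks, curr_ranks):
    """Largest absolute change in probability mass over all nodes."""
    return max(
        (abs(curr_ranks[n] - prev_ranks.get(n, 0.0)) for n in curr_ranks),
        default=0.0,
    )


def has_converged(prev_ranks, curr_ranks, num_nodes):
    """True when the maximum change is below the 0.5 / N threshold."""
    return max_change(prev_ranks, curr_ranks) < 0.5 / num_nodes


prev = {1: 1.0, 2: 0.0, 3: 0.0}
curr = {1: 0.15, 2: 0.425, 3: 0.425}
print(has_converged(prev, curr, num_nodes=3))  # False
print(has_converged(curr, curr, num_nodes=3))  # True
```

For the Gnutella graph with 6301 nodes, the threshold works out to roughly 0.5 / 6301, or about 7.9e-05.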

In [0]:
from time import time
def personalized_page_rank_stopping_criterion(source_node_id, jump_factor):
    """Returns a list of the 10 nodes with the highest page rank value along with their value, as tuples
    [(node_id_1, highest_pagerank_value), ..., (node_id_10, 10th_highest_pagerank_value)]"""
    # Each node carries a (rank, loss) pair; loss is the mass stranded at dangling nodes.
    def construct_graph(line):
        l = list(map(int, line.split()))
        nid = l[0]
        adj_list = l[1:]
        return nid, adj_list
    
    def initialize_rank(line, source_node_id):
        l = list(map(int, line.split()))
        nid = l[0]
        rank = 1 if nid == source_node_id else 0
        loss = 0
        return nid, (rank, loss)
    
    def spread(pair):
        nid, (adj_list, (rank, loss)) = pair
        if not adj_list:
            loss += rank
            return [(nid, (0, loss))]
        num_adj_nodes = len(adj_list)
        return list(zip(adj_list, [(rank / num_adj_nodes, 0)] * num_adj_nodes)) + [(nid, (0, 0))]
    
    def compensate_loss_jump(pair, total_loss, source_node_id, jump_factor):
        nid, (rank, loss) = pair
        if nid == source_node_id:
            rank = (rank + total_loss) * (1 - jump_factor) + jump_factor
            return nid, (rank, 0)
        return nid, (rank * (1 - jump_factor), 0)
    
    text_RDD = sc.textFile('p2p-Gnutella08-adj.txt')
    graph_RDD = text_RDD.map(lambda line : construct_graph(line)).cache()
    curr_rank_RDD = text_RDD.map(lambda line : initialize_rank(line, source_node_id))
    num_nodes = graph_RDD.count()
    
    while True: 
        prev_rank_RDD = curr_rank_RDD.cache()
        curr_rank_RDD = graph_RDD.leftOuterJoin(prev_rank_RDD) \
                            .flatMap(lambda pair : spread(pair)) \
                            .reduceByKey(lambda x, y : (x[0] + y[0], x[1] + y[1]))
        total_loss = curr_rank_RDD.aggregate(0,
                                            (lambda acc, e : acc + e[1][1]),
                                            (lambda acc1, acc2 : acc1 + acc2))
        curr_rank_RDD = curr_rank_RDD.map(lambda e : compensate_loss_jump(e, total_loss, source_node_id, jump_factor))
        
        max_change = curr_rank_RDD.join(prev_rank_RDD) \
                                  .aggregate(0,
                                            (lambda acc, e : max(acc, abs(e[1][0][0] - e[1][1][0]))),
                                            (lambda acc1, acc2 : max(acc1, acc2)))
        if max_change < (0.5 / num_nodes): break
        
    top_10_rank = curr_rank_RDD.map(lambda e : (e[1][0], e[0])) \
                               .sortByKey(ascending=False) \
                               .map(lambda e : (e[1], e[0])) \
                               .take(10)
    return top_10_rank

[(28, 0.5384424922495424),
 (1517, 0.12026767556203095),
 (1516, 0.12026368682524335),
 (846, 0.12026334287813738),
 (852, 0.008058825545098519),
 (152, 0.008057101055510536),
 (847, 0.008056064295146433),
 (850, 0.008056047096075143),
 (849, 0.008056044737059032),
 (848, 0.008056043598862838)]