## Description
#### This is one of the assignments from the course CS 431/631 (Data-intensive Distributed Analytics) at the University of Waterloo.
#### This assignment uses graph analysis to compute page rank values for the nodes of a graph.
#### Some modifications have been made to improve the presentation on this platform.
---

#### Overview
**Goal:** Use Python and Spark to perform some graph analysis, using a graph of the Gnutella server network.   In this graph, each node represents a server, and each (directed) edge represents a connection between servers in Gnutella's peer-to-peer network.  
**Files needed:** `p2p-Gnutella08-adj.txt`  
(This file represents the graph as an adjacency list.   Each server (node) is identified by a unique number, and each line in the file gives the adjacency list for a single server.
For example, this line:
> 91	243	1923	2194

gives the adjacency list for server `91`.   It indicates that there are edges from server `91` to servers `243`, `1923`, and `2194`.    According to the Stanford Network Analysis Project, which collected these data, [the graph includes 6301 servers and 20777 edges](http://snap.stanford.edu/data/p2p-Gnutella08.html).)
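
As a minimal sketch of how one such line can be read, the helper below splits a tab-separated line into a server id and its list of neighbours. The function name `parse_adjacency_line` is hypothetical, and skipping empty fields (for example from a trailing tab) is an assumption about the file rather than something the dataset documents.

```python
def parse_adjacency_line(line):
    """Split one tab-separated line into (server_id, [neighbour_ids])."""
    fields = line.split("\t")
    # Assumption: ignore empty fields, e.g. those produced by a trailing tab.
    ids = [int(field) for field in fields if field.strip()]
    return ids[0], ids[1:]


node, neighbours = parse_adjacency_line("91\t243\t1923\t2194")
print(node, neighbours)  # 91 [243, 1923, 2194]
```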

The code below uses the Spark installation in the CS451 course account:

In [1]:
import findspark
findspark.init("/u/cs451/packages/spark")

from pyspark import SparkContext, SparkConf

and then create a `SparkContext`:

In [2]:
import random
sc = SparkContext(appName="YourTest", master="local[2]", conf=SparkConf().set('spark.ui.port', random.randrange(4000,5000)))

---
#### Part 1:

To get warmed up, write Spark code to confirm or determine some basic properties of the Gnutella graph.  The code answers the following questions:
- How many nodes and edges are there in the graph? 
- How many nodes of each outdegree are there? That is, how many nodes have no outgoing edges, how many have one outgoing edge, how many have two outgoing edges, and so on?
- How many nodes of each indegree are there?
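
Before running anything on Spark, the three questions can be checked by hand on a tiny graph. The sketch below is plain Python, not the Spark solution; `toy_graph` and `degree_histograms` are hypothetical names, and it assumes every node appears as a key, just as every server has its own line in the file. Unlike an approach that only counts edge targets, it also reports nodes with indegree 0.

```python
from collections import Counter

# Hypothetical toy graph: node id -> list of nodes it points to.
toy_graph = {1: [2, 3], 2: [3], 3: [], 4: [3]}


def degree_histograms(graph):
    """Return (num_nodes, num_edges, outdegree_hist, indegree_hist)."""
    num_nodes = len(graph)
    num_edges = sum(len(adj) for adj in graph.values())
    # outdegree histogram: how many nodes have 0, 1, 2, ... outgoing edges
    out_hist = Counter(len(adj) for adj in graph.values())
    # start every node at indegree 0 so nodes without incoming edges are counted
    indegree = Counter({node: 0 for node in graph})
    for adj in graph.values():
        indegree.update(adj)
    in_hist = Counter(indegree.values())
    return num_nodes, num_edges, dict(out_hist), dict(in_hist)


print(degree_histograms(toy_graph))
```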

In [3]:
def num_nodes_edges():
    """Returns a tuple (num_nodes, num_edges)"""
    nodes = sc.textFile("p2p-Gnutella08-adj.txt")
    # get the total number of nodes in the graph
    num_nodes = nodes.count()
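    # split each line into its node ids (the source node followed by its neighbours)
    nodes_new = nodes.map(lambda x:x.split('\t'))
    nodes_and_edges = nodes_new.flatMap(lambda x:x)
    # each line contributes one source id plus one id per outgoing edge,
    # so the number of edges is the total number of ids minus the number of lines
    num_edges = nodes_and_edges.count() - num_nodes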
    return (num_nodes,num_edges)
    

def out_counts():
    """Returns a dictionary where the keys are the outdegrees, and the 
    values are the number of nodes of the corresponding outdegree """
    nodes = sc.textFile("p2p-Gnutella08-adj.txt")
    nodes_new = nodes.map(lambda x:x.split('\t'))
    # get the outdegree of each node
    out_degree = nodes_new.map(lambda x:(x[0],len(x)-1))
    # get the number of nodes of each outdegree, and then collect
    aggregated_out_degree = out_degree.map(lambda x:(x[1],x[0])).groupByKey().map(lambda x:(x[0],len(x[1]))).sortByKey().collect()
    # transform the output into desired dictionary format
    dic_out_degree = dict(aggregated_out_degree)
    return dic_out_degree
    

def in_counts():
    """Returns a dictionary where the keys are the indegrees, and the 
    values are the number of nodes of the corresponding indegree """
    nodes = sc.textFile("p2p-Gnutella08-adj.txt")
    # split each line to several separate nodes (the adjacent nodes)
    nodes_new = nodes.map(lambda x:x.split('\t')[1:])
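    # get the indegree of each node that has at least one incoming edge
    # (nodes with no incoming edges never appear here, so indegree 0 is not reported)
    in_degree = nodes_new.flatMap(lambda x:x).map(lambda x:(x,1)).reduceByKey(lambda x,y:x+y)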
    # get the number of nodes of each indegree, and then collect
    aggregated_in_degree = in_degree.map(lambda x:(x[1],x[0])).groupByKey().map(lambda x:(x[0],len(x[1]))).sortByKey().collect()
    # transform the output into desired dictionary format
    dic_in_degree = dict(aggregated_in_degree)
    return dic_in_degree

---
**Some background information about page rank:**  
The description of ordinary page rank is in Section 5.3 of [the course textbook](http://mapreduce.cc/).   Personalized page rank is like ordinary page rank except:
- One node in the graph is designated as the *source* node. Personalized page rank is performed with respect to that source node.
- Personalized page rank is initialized by assigning all probability mass to the source node, and none to the other nodes. In contrast, ordinary page rank is initialized by giving all nodes the same probability mass.
- Whenever personalized page rank makes a random jump, it jumps back to the source node. In contrast, ordinary page rank may jump to any node.
- In personalized page rank, all probability mass lost at dangling nodes is put back into the source node.  In ordinary page rank, lost mass is distributed evenly over all nodes.
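
The rules above can be sketched as a single update step in plain Python. This is an illustration under stated assumptions, not the Spark implementation: the name `personalized_pagerank_step` is hypothetical, the graph is a dictionary in which every node (including every edge target) appears as a key, and `jump_factor` is the probability of a random jump.

```python
def personalized_pagerank_step(graph, ranks, source, jump_factor):
    """One iteration of personalized page rank.

    graph: dict mapping node -> list of out-neighbours
    ranks: dict mapping node -> current probability mass
    """
    new_ranks = {node: 0.0 for node in graph}
    # random jumps always land on the source node
    new_ranks[source] += jump_factor * sum(ranks.values())
    for node, neighbours in graph.items():
        mass = (1.0 - jump_factor) * ranks[node]
        if neighbours:
            # mass that follows links is split evenly over the out-neighbours
            share = mass / len(neighbours)
            for neighbour in neighbours:
                new_ranks[neighbour] += share
        else:
            # dangling node: its mass goes back to the source node
            new_ranks[source] += mass
    return new_ranks


# Hypothetical toy graph; all mass starts on the source node (node 1).
toy_graph = {1: [2, 3], 2: [3], 3: [], 4: [3]}
initial = {node: (1.0 if node == 1 else 0.0) for node in toy_graph}
after_one_step = personalized_pagerank_step(toy_graph, initial, 1, 0.15)
```

Because jumps and dangling mass both return to the source, the total probability mass stays at 1 after every step.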

#### Part 2:

The task is to write a Spark program that performs personalized page rank over the Gnutella graph for a specified number of iterations and a specified source node. The function takes three input values:
- source node id (a positive integer)
- iteration count (a positive integer)
- random jump factor value (a float between 0 and 1)


The output should be a list of the 10 nodes with the highest personalized page rank with respect to the given source. For each of the 10 nodes, return the node's id and page rank value as a tuple. The list returned by the function should therefore look something like this: `[(node_id_1, highest_pagerank_value), ..., (node_id_10, 10th_highest_pagerank_value)]`
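
To make the expected output shape concrete, here is a small plain-Python sketch that picks the highest-ranked entries from a dictionary of page rank values. The helper name `top_k_by_rank` and the values in `toy_ranks` are made up for illustration. On an RDD of `(node_id, value)` pairs, the built-in `top` or `takeOrdered` actions with a `key` argument should give the same kind of result without a full sort.

```python
import heapq


def top_k_by_rank(ranks, k=10):
    """Return up to k (node_id, rank) tuples, highest rank first."""
    return heapq.nlargest(k, ranks.items(), key=lambda item: item[1])


# Made-up values, only to show the output format.
toy_ranks = {1: 0.15, 2: 0.425, 3: 0.4, 4: 0.025}
print(top_k_by_rank(toy_ranks, k=2))  # [(2, 0.425), (3, 0.4)]
```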

In [4]:
import re

def personalized_page_rank(source_node_id, num_iterations, jump_factor):
    """Returns a list of the 10 nodes with the highest page rank value along with their value, as tuples
    [(node_id_1, highest_pagerank_value), ..., (node_id_10, 10th_highest_pagerank_value)]"""
    nodes = sc.textFile("p2p-Gnutella08-adj.txt")
    # transform the nodes into integer format, to match the input format of 'source_node_id'
    nodes_new = nodes.map(lambda x:[int(i) for i in re.findall("[0-9]+",x)])
    
    # define a function that returns the initial pagerank value of each node:
    # all probability mass starts on the source node (floats avoid integer division under Python 2)
    def is_source_node(x):
        if (x == source_node_id):
            return 1.0
        else:
            return 0.0
    # call the above function to get the initial pagerank value of each node
    nodes_rank = nodes_new.map(lambda x:(x[0],is_source_node(x[0])))
    
    # get the adjacency list of each node
    # note: all probability mass lost at dangling nodes is put back into the source node,
    # so the adjacency list of a dangling node is set to [source_node_id]
    nodes_adjacency_list = nodes_new.map(lambda x:(x[0],x[1:]) if x[1:] != [] else (x[0],[source_node_id]))
    
    # join the above two RDD to get the detailed information of each node
    nodes_detailed = nodes_adjacency_list.join(nodes_rank).map(lambda x:(x[0],list(x[1])))
    
    # define a function that splits a node's pagerank value evenly among its adjacent nodes
    # and returns one (adjacent_node, share) pair per outgoing edge
    def get_rank_sum(x):
        result = []
        for i in x[1][0]:
            result.append((i,x[1][1]/len(x[1][0])))
        return result
    
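    # define a function that resets each node's value to its random-jump contribution:
    # 'jump_factor' for the source node and 0 for every other node
    # (a new record is returned instead of modifying the input in place)
    def set_jump_factor(x,source_node_id,jump_factor):
        if x[0] == source_node_id:
            return (x[0],[x[1][0],jump_factor])
        else:
            return (x[0],[x[1][0],0.0])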
    
    #### iteration part ####
    for i in range(num_iterations):
        # call the function 'get_rank_sum' to distribute each node's pagerank value to its adjacent nodes,
        # then sum the shares received by each node
        # (use 'repartition' to speed up the later 'join')
        rank_sum = nodes_detailed.flatMap(lambda x:get_rank_sum(x)).reduceByKey(lambda x,y:x+y).repartition(1)
        # call the function 'set_jump_factor', then join with 'rank_sum' to update 'nodes_detailed';
        # 'leftOuterJoin' keeps nodes that received no mass in this iteration (their sum is None)
        nodes_detailed = nodes_detailed.map(lambda x:set_jump_factor(x,source_node_id,jump_factor)).repartition(1)\
                         .leftOuterJoin(rank_sum).map(lambda x:(x[0],[x[1][0][0],x[1][0][1]+(1-jump_factor)*(x[1][1] or 0)]))
    
    # extract (node id, pagerank value) pairs from the final 'nodes_detailed' and sort by value, highest first
    nodes_rank_sorted = nodes_detailed.map(lambda x:(x[1][1],x[0])).sortByKey(False).map(lambda x:(x[1],x[0]))
            
    return nodes_rank_sorted.take(10)

---
#### Part 3:

For the previous part, the personalized page rank implementation runs for a specified number of iterations.  However, it is also common to write iterative algorithms that run until some specified termination condition is reached.
For example, for page rank, suppose $p_i(x)$ represents the probability mass assigned to node $x$ after the $i$th iteration of the algorithm.  ($p_0(x)$ is the initial probability mass of node $x$.)   We define the change of $x$'s probability mass on the $i$th iteration as $\lvert p_i(x)-p_{i-1}(x) \rvert$.   Then, we can iterate personalized page rank until the maximum change (over all nodes) is less than a specified threshold, i.e., until all nodes' page ranks have converged.

The task of this part is to modify the code in part 2 so that it iterates until the 
maximum node change is less than $\frac{0.5}{N}$, where $N$ represents the number of nodes in the graph.
This version of the function should take only two inputs: the source node id and the random jump factor.
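
The stopping rule can be sketched in plain Python on a toy graph. This is only an illustration of the criterion, not the Spark solution: the names `step` and `run_until_converged` are hypothetical, the `max_iterations` safety cap is an added assumption, and every node is assumed to appear as a key of the graph dictionary.

```python
def step(graph, ranks, source, jump_factor):
    """One personalized page rank iteration on a dict-based graph."""
    new_ranks = {node: 0.0 for node in graph}
    # random jumps always land on the source node
    new_ranks[source] += jump_factor * sum(ranks.values())
    for node, neighbours in graph.items():
        # a dangling node sends its mass back to the source node
        targets = neighbours if neighbours else [source]
        share = (1.0 - jump_factor) * ranks[node] / len(targets)
        for target in targets:
            new_ranks[target] += share
    return new_ranks


def run_until_converged(graph, source, jump_factor, max_iterations=1000):
    """Iterate until the largest per-node change drops below 0.5 / N."""
    threshold = 0.5 / len(graph)
    ranks = {node: (1.0 if node == source else 0.0) for node in graph}
    iterations = 0
    for _ in range(max_iterations):
        new_ranks = step(graph, ranks, source, jump_factor)
        # maximum over all nodes of |p_i(x) - p_{i-1}(x)|
        max_change = max(abs(new_ranks[n] - ranks[n]) for n in graph)
        ranks = new_ranks
        iterations += 1
        if max_change < threshold:
            break
    return ranks, iterations


toy_graph = {1: [2, 3], 2: [3], 3: [], 4: [3]}
final_ranks, iterations_used = run_until_converged(toy_graph, 1, 0.15)
print(iterations_used, final_ranks)
```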

In [5]:
import re

def personalized_page_rank_stopping_criterion(source_node_id, jump_factor):
    """Returns a list of the 10 nodes with the highest page rank value along with their value, as tuples
    [(node_id_1, highest_pagerank_value), ..., (node_id_10, 10th_highest_pagerank_value)]"""
    nodes = sc.textFile("p2p-Gnutella08-adj.txt")
    # transform the nodes into integer format, to match the input format of 'source_node_id'
    nodes_new = nodes.map(lambda x:[int(i) for i in re.findall("[0-9]+",x)]).cache()
    # get the number of nodes for further use
    num_nodes = nodes.count()
    
    # define a function that returns the initial pagerank value of each node:
    # all probability mass starts on the source node (floats avoid integer division under Python 2)
    def is_source_node(x):
        if (x == source_node_id):
            return 1.0
        else:
            return 0.0
    # call the above function to get the initial pagerank value of each node
    nodes_rank = nodes_new.map(lambda x:(x[0],is_source_node(x[0])))
    
    # get the adjacency list of each node
    # note: all probability mass lost at dangling nodes is put back into the source node,
    # so the adjacency list of a dangling node is set to [source_node_id]
    nodes_adjacency_list = nodes_new.map(lambda x:(x[0],x[1:]) if x[1:] != [] else (x[0],[source_node_id])).cache()
    
    # join the above two RDD to get the detailed information of each node
    nodes_detailed = nodes_adjacency_list.join(nodes_rank).map(lambda x:(x[0],list(x[1]))).cache()
    
    # define a function that splits a node's pagerank value evenly among its adjacent nodes
    # and returns one (adjacent_node, share) pair per outgoing edge
    def get_rank_sum(x):
        result = []
        for i in x[1][0]:
            result.append((i,x[1][1]/len(x[1][0])))
        return result
    
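    # define a function that resets each node's value to its random-jump contribution:
    # 'jump_factor' for the source node and 0 for every other node
    # (a new record is returned instead of modifying the input in place)
    def set_jump_factor(x,source_node_id,jump_factor):
        if x[0] == source_node_id:
            return (x[0],[x[1][0],jump_factor])
        else:
            return (x[0],[x[1][0],0.0])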
    
    #### modified iteration part ####
    # set the initial value of 'nodes_value_diff_max' to 1 so that the while loop starts
    nodes_value_diff_max = 1
    while(nodes_value_diff_max >= 0.5/num_nodes):
        # get the pagerank value of each node before the current iteration
        nodes_value_0 = nodes_detailed.map(lambda x:(x[0],x[1][1]))
        # call the function 'get_rank_sum' to distribute each node's pagerank value to its adjacent nodes,
        # then sum the shares received by each node
        # (use 'repartition' to speed up the later 'join')
        rank_sum = nodes_detailed.flatMap(lambda x:get_rank_sum(x)).reduceByKey(lambda x,y:x+y).repartition(1).cache()
        # call the function 'set_jump_factor', then join with 'rank_sum' to update 'nodes_detailed';
        # 'leftOuterJoin' keeps nodes that received no mass in this iteration (their sum is None)
        nodes_detailed = nodes_detailed.map(lambda x:set_jump_factor(x,source_node_id,jump_factor)).repartition(1)\
                         .leftOuterJoin(rank_sum).map(lambda x:(x[0],[x[1][0][0],x[1][0][1]+(1-jump_factor)*(x[1][1] or 0)])).cache()
        # get the pagerank value of each node after the current iteration
        nodes_value_1 = nodes_detailed.map(lambda x:(x[0],x[1][1]))
        # join 'nodes_value_0' and 'nodes_value_1' and take the maximum absolute change over all nodes
        # (computed with the RDD 'max' action instead of collecting every difference to the driver)
        nodes_value_diff_max = nodes_value_1.join(nodes_value_0).map(lambda x:abs(x[1][0]-x[1][1])).max()
    
    # extract (node id, pagerank value) pairs from the final 'nodes_detailed' and sort by value, highest first
    nodes_rank_sorted = nodes_detailed.map(lambda x:(x[1][1],x[0])).sortByKey(False).map(lambda x:(x[1],x[0]))
    
    return nodes_rank_sorted.take(10)