## CS431/631 Data Intensive Distributed Analytics
### Fall 2019 - Assignment 3
---

**Please edit this (text) cell to provide your name and UW student ID number!**
* **Name:**
* **ID:**

---
#### Overview
For this assignment, you will be using Python and Spark to perform some graph analysis, using a graph of the Gnutella server network.   In this graph, each node represents a server, and each (directed) edge represents a connection between servers in Gnutella's peer-to-peer network.  The input file for this assignment, `p2p-Gnutella08-adj.txt`, represents the graph as an adjacency list.   Each server (node) is identified by a unique number, and each line in the file gives the adjacency list for a single server.
For example, this line:
> 91	243	1923	2194

gives the adjacency list for server `91`.   It indicates that there are edges from server `91` to servers `243`, `1923`, and `2194`.    According to the Stanford Network Analysis Project, which collected these data, [the graph includes 6301 servers and 20777 edges](http://snap.stanford.edu/data/p2p-Gnutella08.html).

As you know from the previous assignment, you must tell the Python interpreter where to find Spark before performing any Spark operations in your program.   So, start by doing that:

In [1]:
import findspark
findspark.init("/u/cs451/packages/spark")

from pyspark import SparkContext, SparkConf

and then create a `SparkContext`:

In [2]:
import random
sc = SparkContext(appName="YourTest", master="local[2]", conf=SparkConf().set('spark.ui.port', random.randrange(4000,5000)))

---
### Important

###### The questions that follow ask you to implement functions whose prototypes are given to you. Do **NOT** change the prototypes of the functions. Do **NOT** write code outside of functions. All necessary code should be included in the function body (except for import statements). You may declare functions inside of the function body. When marking, we will execute your code by calling the functions from an external program, which is why your code cannot rely on statements running outside functions. Please remove any call to the functions that you may have introduced for test purposes before submitting your notebook.

---
#### Question 1  (6/24 marks):

To get warmed up, you should first write Spark code to confirm or determine some basic properties of the Gnutella graph.  Write code in the provided functions that will return answers to the following questions, as specified in each function's documentation:
- How many nodes and edges are there in the graph?  (This should confirm the numbers given above.)
- How many nodes of each outdegree are there? That is, how many nodes have no outgoing edges, how many have one outgoing edge, how many have two outgoing edges, and so on?
- How many nodes of each indegree are there?

You should use Spark to answer these questions.   Do *not* load the entire graph into your Python driver program.

In [3]:
def pairs(num, num_list):
    pair_list = [(num_list[0],num_list[index]) for index in range(1,len(num_list)) ]
    return pair_list
def add_in(num_list):
    return [(num_list[index],1)for index in range(1,len(num_list))]


def num_nodes_edges():
    """Returns a tuple (num_nodes, num_edges)"""
    graph_data_nodes = sc.textFile("p2p-Gnutella08-adj.txt").flatMap(lambda line: line.split("\t"))
    graph_unique_nodes =  graph_data_nodes.map(lambda node: (node,1)).reduceByKey(lambda x,y : x+y).count()

    graph_data_edges = sc.textFile("p2p-Gnutella08-adj.txt").map(lambda line: line.split('\t')).filter(lambda adjlist: len(adjlist)> 1)
    graph_edge_pairs = graph_data_edges.flatMap(lambda adjlist: pairs(adjlist[0],adjlist))  
    graph_num_edges = graph_edge_pairs.count()
    
    return(graph_unique_nodes,graph_num_edges)
    
    


def out_counts():
    """Returns a dictionary where the keys are the outdegrees, and the 
    #values are the number of nodes of the corresponding outdegree """
    graph_data_out = sc.textFile("p2p-Gnutella08-adj.txt").map(lambda line : line.split('\t')).map(lambda adj : (adj[0],len(adj[1:])))
    graph_custom_out = graph_data_out.map(lambda x : (x[1],1)).reduceByKey(lambda x,y: x+y).sortByKey().collectAsMap()
    
    return graph_custom_out



def in_counts():
    """Returns a dictionary where the keys are the indegrees, and the 
    values are the number of nodes of the corresponding indegree """
    graph_data_in = sc.textFile("p2p-Gnutella08-adj.txt").map(lambda line : line.split('\t')).flatMap(lambda x : add_in(x))
    graph_custom_in = graph_data_in.reduceByKey(lambda x,y : x+y).map(lambda x : (x[1],1)).reduceByKey(lambda x,y : x+y).sortByKey().collectAsMap()
    zero_in = num_nodes_edges()[0] - sum(graph_custom_in.values())
    graph_custom_in[0] = zero_in
    graph_custom_in = dict(sorted(graph_custom_in.items()))
    
    return graph_custom_in



print("Number of nodes = ",num_nodes_edges()[0])
print("Number of edges = ",num_nodes_edges()[1])


print("out_counts")
print(out_counts())

print("in_counts")
print(in_counts())

Number of nodes =  6301
Number of edges =  20777
out_counts
{0: 3836, 1: 294, 2: 28, 3: 16, 4: 28, 5: 107, 6: 10, 7: 9, 8: 44, 9: 372, 10: 1531, 11: 1, 12: 3, 13: 2, 14: 2, 17: 4, 18: 2, 19: 1, 22: 1, 24: 1, 25: 1, 28: 1, 29: 1, 31: 1, 34: 1, 41: 1, 46: 1, 47: 1, 48: 1}
in_counts
{0: 80, 1: 2452, 2: 1287, 3: 868, 4: 559, 5: 333, 6: 227, 7: 144, 8: 76, 9: 70, 10: 37, 11: 29, 12: 23, 13: 19, 14: 13, 15: 8, 16: 1, 18: 2, 19: 2, 20: 4, 21: 2, 22: 1, 23: 1, 25: 1, 27: 1, 30: 1, 31: 2, 32: 2, 33: 1, 35: 1, 38: 1, 41: 1, 44: 1, 47: 2, 49: 1, 50: 1, 51: 1, 52: 1, 54: 1, 55: 1, 56: 2, 57: 1, 59: 1, 60: 3, 61: 1, 62: 2, 63: 1, 66: 2, 67: 3, 69: 2, 70: 3, 71: 3, 72: 2, 73: 2, 74: 1, 77: 2, 78: 1, 79: 1, 81: 4, 82: 1, 83: 1, 85: 1, 86: 1, 87: 1, 91: 1}


---
Your main objective for this assignment is to perform *single source personalized page rank* over the Gnutella graph.  To get started, you should make sure that you have a clear understanding of ordinary (i.e., non-personalized) page rank.  Read the description of page rank in Section 5.3 of [the course textbook](http://mapreduce.cc/).   Personalized page rank is like ordinary page rank except:
- One node in the graph is designated as the *source* node. Personalized page rank is performed with respect to that source node.
- Personalized page rank is initialized by assigning all probability mass to the source node, and none to the other nodes. In contrast, ordinary page rank is initialized by giving all nodes the same probability mass.
- Whenever personalized page rank makes a random jump, it jumps back to the source node. In contrast, ordinary page rank may jump to any node.
- In personalized page rank, all probability mass lost dangling nodes is put back into the source nodes.  In ordinary page rank, lost mass is distributed evenly over all nodes.

#### Question 2  (10/24 marks):

Your task is to write a Spark program to perform personalized page rank over the Gnutella graph for a specified number of iterations, and of course a specific node. The function you will implement takes three input values:
- source node id (a positive integer)
- iteration count (a positive integer)
- random jump factor value (a float between 0 and 1)

The function should perform personalized page rank, with respect to the specified source node, over the Gnutella graph, for the specified number of iterations, using Spark.
The output of your function should be a list of the 10 nodes with the highest personalized page rank with respect to the given source. For each of the 10 nodes, return the node's id and page rank value as a tuple. The list returned by the function should therefore look something like this: `[(node_id_1, highest_pagerank_value), ..., (node_id_10, 10th_highest_pagerank_value)]`

In [4]:
from __future__ import division
import numpy as np

def mapping(num_list):
    return [int(num_list[i]) for i in range(0,len(num_list))]

def maximum(page_rank,num,source_node_id):
    list_page_values_nodes = []
    for i in range(0,num+1):
        value = max(page_rank)
        node_id = page_rank.index(value)
        page_rank[node_id] = -10
        list_page_values_nodes.append((node_id,value))
        
    return list_page_values_nodes

def page_rank_node(num_nodes,num_list,jump_factor,source_node_id,node_rank):
    out_links = len(num_list)-1
    list_init = [0.0]*num_nodes
    arr_stochastic = np.array(list_init)
    
    if(out_links > 0):

        arr_stochastic[num_list[1:]] =  1/out_links
        
    elif(out_links == 0 ):
        ''' Handling Dead End Problem '''
        arr_stochastic[source_node_id] = 1.0
    
    '''Handling Spider Trap Problem'''    
    arr_stochastic = (1-jump_factor) * arr_stochastic
    arr_stochastic[source_node_id] = arr_stochastic[source_node_id] + (jump_factor)
    
    arr_node_rank = node_rank * arr_stochastic
    return arr_node_rank
    


def personalized_page_rank_1(source_node_id, num_iterations, jump_factor):
    """Returns a list of the 10 nodes with the highest page rank value along with their value, as tuples
    node_id_1, highest_pagerank_value), ..., (node_id_10, 10th_highest_pagerank_value)]"""
    
    graph_data = sc.textFile("p2p-Gnutella08-adj.txt").map(lambda line : line.split("\t")).map(lambda val_list : mapping(val_list) )
    num_nodes = num_nodes_edges()[0]
    page_rank = [0.0]*num_nodes
    page_rank[source_node_id] = 1.0
    
    for i in range(0,num_iterations):
        graph_rank = graph_data.map(lambda node : page_rank_node(num_nodes,node,jump_factor,source_node_id,page_rank[node[0]]))
        graph_rank_iter = graph_rank.map(lambda arr : (1,arr)).reduceByKey(lambda x,y : x+y)
        page_rank_updated = list(graph_rank_iter.collect())[0][1]
        page_rank = page_rank_updated
        
    
    top_ten_nodes = maximum(list(page_rank),10,source_node_id)
    
    return top_ten_nodes


source_node_id = int(input("Enter Node id:"))
num_iterations = int(input("Enter Number of iterations:"))
jump_factor = float(input("Jump factor:"))

print("top ten nodes = ", personalized_page_rank_1(source_node_id,num_iterations,jump_factor))


 
    

Enter Node id:28
Enter Number of iterations:4
Jump factor:0.33
top ten nodes =  [(28, 0.54747249791111097), (846, 0.11366806088888884), (1516, 0.11366806088888884), (1517, 0.11366806088888884), (152, 0.0094159268888888842), (602, 0.0094159268888888842), (847, 0.0094159268888888842), (848, 0.0094159268888888842), (849, 0.0094159268888888842), (850, 0.0094159268888888842), (851, 0.0094159268888888842)]


---
#### Question 3  (4/24 marks):

For the previous question, you implemented personalized page rank that ran for a specified number of iterations.  However, it is also common to write iterative algorithms that run until some specified termination condition is reached.
For example, for page rank, suppose the $p_i(x)$ represents the probability mass assigned to node $x$ after the $i$th iteration of the algorithm.  ($p_0(x)$ is the initial probability mass of node $x$.)   We define the change of $x$'s probability mass on the $i$th iteration as $\lvert p_i(x)-p_{i-1}(x) \rvert$.   Then, we can iterate personalized page rank until the maximum (over all nodes) change is less than a specified threshold, i.e, until all nodes' page ranks have converged.

For this question, modify your personalized page rank implementation from Question 2 so that it iterates until the 
maximum node change is less than $\frac{0.5}{N}$, where $N$ represents the number of nodes in the graph.
This version of the function should take only two inputs: the source node id and the random jump factor.

If you were unable to get personalized page rank working in Question 2, replace the code cell below with a text (Markdown) cell, and briefly describe how you *would* have modified your program to incorporate a termination condition based on maximum node change, for partial credit. 

In [5]:
import numpy as np

def maximum(page_rank,num,source_node_id):
    list_page_values_nodes = []
    for i in range(0,num):
        value = max(page_rank)
        node_id = page_rank.index(value)
        page_rank[node_id] = -10
        list_page_values_nodes.append((node_id,value))
        
    return list_page_values_nodes

def return_max_node_diff(page_rank_prev,page_rank_current):
    arr_prev = np.array(page_rank_prev)
    arr_current = np.array(page_rank_current)
    
    arr_result = np.amax(np.absolute(arr_prev - arr_current))
    
    return arr_result
    
    


def page_rank_per_node(num_list,jump_factor,source_node_id,page_rank):
    adjlist = num_list[1:]
    if(len(adjlist)==0):
        '''Handling Dead End Problem'''
        return [(source_node_id , page_rank[num_list[0]])]
        
    elif(len(adjlist)>0):
        
        page_value = (page_rank[num_list[0]] / (len(adjlist)+0))*(1-jump_factor)
        list_page_rank = [(adjlist[index],page_value) for index in range(0,len(adjlist))]
        '''Handling Spider Trap Problem'''
        spider_trap_pair = (source_node_id,page_rank[num_list[0]]*jump_factor)
        list_page_rank.append(spider_trap_pair)
        
        return list_page_rank
    
def personalized_page_rank_stopping_criterion(source_node_id,jump_factor):
    """Returns a list of the 10 nodes with the highest page rank value along with their value, as tuples
    #[(node_id_1, highest_pagerank_value), ..., (node_id_10, 10th_highest_pagerank_value)]"""
    
    
    graph_data = sc.textFile("p2p-Gnutella08-adj.txt").map(lambda line : line.split("\t")).map(lambda val_list : mapping(val_list) )
    num_nodes = num_nodes_edges()[0]
    page_rank = [0.0]*num_nodes
    page_rank[source_node_id] = 1.0
    difference = 10.0
    threshold = 0.5 / num_nodes
    
    
    while(difference > threshold):
            graph_data.cache()
            graph_iter_map = graph_data.flatMap(lambda node_list : page_rank_per_node(node_list,jump_factor,source_node_id,page_rank)).map(lambda x : (x[0],x[1]))
            graph_iter_reduce = graph_iter_map.reduceByKey(lambda x,y : x+y).sortByKey()
            graph_iter_result = graph_iter_reduce.collect()
            page_node_update = [node for node,rank in graph_iter_result]
            page_value_update = [rank for node,rank in graph_iter_result]
            page_rank_arr = np.array(page_rank)
            page_rank_arr[page_node_update] = page_value_update
            page_rank_prev = page_rank
            page_rank = list(page_rank_arr)
            difference = return_max_node_diff(page_rank_prev,page_rank)
            
            
            
    top_ten_nodes = maximum(page_rank,10,source_node_id)
    
    return top_ten_nodes


source_node_id = int(input("Enter Node id:"))
jump_factor = float(input("Jump factor:"))

print("top ten nodes = ", personalized_page_rank_stopping_criterion(source_node_id,jump_factor))



Enter Node id:28
Jump factor:0.33
top ten nodes =  [(28, 0.53844249224954277), (1517, 0.12026767556203039), (1516, 0.12026368682524276), (846, 0.12026334287813681), (852, 0.0080588255450985136), (152, 0.0080571010555105307), (847, 0.0080560642951464263), (850, 0.0080560470960751359), (849, 0.0080560447370590267), (848, 0.0080560435988628332)]


---
#### Question 4  (4/24 marks):

Spark provides the ability to *cache* RDDs.   This is useful when an RDD will be used more than once.   Instead of computing the same RDD multiple times, it can be computed once, cached, and then re-used from the cache.   Read about caching in the Spark textbook, or in the [persistence section of the Spark RDD programming guide](https://spark.apache.org/docs/latest/rdd-programming-guide.html#rdd-persistence).   Caching can be particularly effective for iterative Spark applications, like those you are writing for this assignment.

For this question, go back to the code that you wrote to answer Question 3, and add caching.   (Caching will not affect the functionality of your code, i.e., what it computes.   It should only affect performance.)   Don't worry about different persistence levels.   Just use `cache()`, which caches RDDs at the default persistence level.

In addition to adding `cache()` calls in your code, use the text cell below to briefly explain how you decided which RDDs to cache. 

If you were not able to finish Question 3, add caching annotations to your solution for Question 2 instead.

#### Importance of Caching
I am caching the rdd(graph_data) which stores the structure of the graph. This is because it is the only thing which is invariant to any change during different iterations. As Pyspark uses lazy execution and in memory processing therefore it is very important to store those things in cache which are oftenly used but change seldomly. Cache such rdds enhances the performance significantly.


---
Don't forget to save your workbook!   When you are finished and you are ready to submit your assignment, download your notebook file (.ipynb) from the hub to your machine, and then follow the submission instructions in the assignment.