## CS431/631 Big Data Infrastructure
### Winter 2018 - Assignment 3
---

**Please edit this (text) cell to provide your name and UW student ID number!**
* **Name:** _Yangjie Zhou_
* **ID:** _20744733_

---
#### Overview
For this assignment, you will be using Python and Spark to perform some graph analysis, using a graph of the Gnutella server network.   In this graph, each node represents a server, and each (directed) edge represents a connection between servers in Gnutella's peer-to-peer network.  The input file for this assignment, `p2p-Gnutella08-adj.txt`, represents the graph as an adjacency list.   Each server (node) is identified by a unique number, and each line in the file gives the adjacency list for a single server.
For example, this line:
> 91	243	1923	2194

gives the adjacency list for server `91`.   It indicates that there are edges from server `91` to servers `243`, `1923`, and `2194`.    According to the Stanford Network Analysis Project, which collected these data, [the graph includes 6301 servers and 20777 edges](http://snap.stanford.edu/data/p2p-Gnutella08.html).

As you know from the previous assignment, you must tell the Python interpreter where to find Spark before performing any Spark operations in your program.   So, start by doing that:

In [1]:
import findspark
findspark.init("/u/cs451/packages/spark")

from pyspark import SparkContext, SparkConf

and then create a `SparkContext`:

In [2]:
sc = SparkContext(appName="YourTest", master="local[2]")

---
#### Question 1  (6/24 marks):

To get warmed up, you should first write Spark code to confirm or determine some basic properties of the Gnutella graph.  Write code that will display answers to the following questions:
- How many nodes and edges are there in the graph?  (This should confirm the numbers given above.)
- How many edges of each outdegree are there?   That is, how many nodes have no outgoing edges, how many have one outgoing edge, how many have two outgoing edges, and so on?
- How many edges of each indegree are there?

You should use Spark to answer these questions.   Do *not* load the entire graph into your Python driver program.   It is OK to use a separate set of Spark operations to answer each question.

In [3]:
#### Your code for Question 1 should go here
import re
from operator import add

def processLine(line):
    line_split=re.findall(r"[0-9]+",line)
    if len(line_split)==1:
        return (int(line_split[0]),())
    return (int(line_split[0]),tuple([int(x) for x in line_split[1:]]))

def inDegree(line):
    inDegree_list=[]
    if len(line[1]) > 0:
        for i in line[1]:
            inDegree_list.append((i,1))
        return inDegree_list
## Main()
graph=sc.textFile("p2p-Gnutella08-adj.txt") \
        .map(processLine) \
        .persist()

## Question 1.1
print('\n## Question 1.1:')
print('Number of nodes:',graph.count())
print('Number of edges:',graph.map(lambda x: len(x[1])) \
                              .reduce(add),'\n')
    
## Question 1.2
print('## Question 1.2:')
od_distribution=graph.map(lambda x: len(x[1])) \
                     .countByValue()
print('n(x)=m means that there are m nodes have x outgoing edges \n')
for x in od_distribution.items():
    print('n({0})={1}'.format(x[0],x[1]))

    
    
## Question 1.3
print('\n## Question 1.3:')
id_distribution=graph.filter(lambda x:len(x[1])>0) \
              .flatMap(inDegree) \
              .reduceByKey(add) \
              .map(lambda x: x[1]) \
              .countByValue()
print('n(x)=m means that there are m nodes have x ingoing edges \n')

num_0=6301
for x in id_distribution.items():
    num_0-=x[1]
print('n({0})={1}'.format(0,num_0))

for x in id_distribution.items():
    print('n({0})={1}'.format(x[0],x[1]))


## Question 1.1:
Number of nodes: 6301
Number of edges: 20777 

## Question 1.2:
n(x)=m means that there are m nodes have x outgoing edges 

n(0)=3836
n(1)=294
n(2)=28
n(3)=16
n(4)=28
n(5)=107
n(6)=10
n(7)=9
n(8)=44
n(9)=372
n(10)=1531
n(11)=1
n(12)=3
n(13)=2
n(14)=2
n(17)=4
n(18)=2
n(19)=1
n(22)=1
n(24)=1
n(25)=1
n(28)=1
n(29)=1
n(31)=1
n(34)=1
n(41)=1
n(46)=1
n(47)=1
n(48)=1

## Question 1.3:
n(x)=m means that there are m nodes have x ingoing edges 

n(0)=80
n(1)=2452
n(2)=1287
n(3)=868
n(4)=559
n(5)=333
n(6)=227
n(7)=144
n(8)=76
n(9)=70
n(10)=37
n(11)=29
n(12)=23
n(13)=19
n(14)=13
n(15)=8
n(16)=1
n(18)=2
n(19)=2
n(20)=4
n(21)=2
n(22)=1
n(23)=1
n(25)=1
n(27)=1
n(30)=1
n(31)=2
n(32)=2
n(33)=1
n(35)=1
n(38)=1
n(41)=1
n(44)=1
n(47)=2
n(49)=1
n(50)=1
n(51)=1
n(52)=1
n(54)=1
n(55)=1
n(56)=2
n(57)=1
n(59)=1
n(60)=3
n(61)=1
n(62)=2
n(63)=1
n(66)=2
n(67)=3
n(69)=2
n(70)=3
n(71)=3
n(72)=2
n(73)=2
n(74)=1
n(77)=2
n(78)=1
n(79)=1
n(81)=4
n(82)=1
n(83)=1
n(85)=1
n(86)=1
n(87)=1
n(91)=1


---
Your main objective for this assignment is to perform *single-source personalized page rank* over the Gnutella graph.  To get started, you should make sure that you have a clear understanding of ordinary (i.e., non-personalized) page rank.  Read the description of page rank in Section 5.3 of [the course textbook](http://mapreduce.cc/).

Personalized page rank is like ordinary page rank except:
- One node in the graph is designated as the *source* node.   Personalized page rank is performed with respect to that source node.
- Personalized page rank is initialized by assigning all probability mass to the source node, and none to the other nodes.   In contrast, ordinary page rank is initialized by giving all nodes the same probability mass.
- Whenever personalized page rank makes a random jump, it jumps back to the source node.   In contrast, ordinary page rank may jump to any node.
- In personalized page rank, all probability mass lost dangling nodes is put back into the source nodes.  In ordinary page rank, lost mass is distributed evenly over all nodes.

#### Question 2  (10/24 marks):

Your task is to write a Spark program to perform personalized page rank over the Gnutella graph for a specified number of iterations.   Your program should interactively prompt for two input values:
- source node id (a positive integer)
- iteration count (a positive integer)

Your program should then perform personalized page rank, with respect to the specified source node, over the Gnutella graph, for the specified number of iterations.
The output of your analysis should be the 10 nodes with the highest personalized page rank with respect to the given source.   For each of the 10 nodes, report the node's id and page rank value.

Be sure that your code is clearly commented.   In particular, for each significant Spark RDD that your code produces, your comments should describe the RDD.   That is, they should indicate how many elements you expect the RDD to have (in terms of graph parameters such as the number of nodes or edges) and what each element consists of.   For example, you might have a comment like this:
```
# this RDD has one element for each node in the graph
# each element is a two-tuple:  (node-id, list of outgoing edges for node-id)
```

In case you are not able to get this working completely, try to make as much progress as possible.   Produce code that runs and clearly document what it does, so that you can receive some partial credit.

In [4]:
from operator import add

# for each row of the adjacency lists
# iterate to transfer probability for each outgoing node
def transfer(node):
    transfer_p=[]
    outdegree=node[1][0][1]
    pagerank=node[1][1]
    for i in outdegree:
        transfer_p.append((i,pagerank/len(outdegree)))
    return transfer_p

def update(node):
    node_id=node[0][0]#id
    if node[1] is None:
        p=0
    else:
        p=node[1]
        
    if node_id==source_node:
        return alpha*1+(1-alpha)*(p+mass)
    elif node_id in dangling_point:
        return 0
    return (1-alpha)*p
        
## Main()

## input values
iteration=int(input('the number of iterations:'))
source_node=int(input('the source node ID:'))
alpha=0.1

# this RDD has one element for each node in the graph
# each element is a two-tuple:  (node-id, two-tuple)
# this two-tuple include (node-id, list of outdegree nodes)
# since later I only use mapValues to ganrantee the hash of key won't change, the node-id is also put in the value
# which will be used for identify the dangling point and source point
# example: (0, (0, (1, 2, 3, 4, 5, 6, 7, 8, 9, 10)))
graph=sc.textFile("p2p-Gnutella08-adj.txt") \
        .map(processLine)\
        .map(lambda x:(x[0],(x[0],x[1]))).partitionBy(2).persist()
                
# this RDD has one element for each node in the graph
# each element is a two-tuple:  (node-id, rank)            
rank=graph.mapValues(lambda x:1 if x[0]==source_node else 0)

# this RDD has one element for the nodes which don't have the outdegree in the graph
# each element is a key: node-id which has been actioned to collect as a list
dangling_point=graph.filter(lambda x:len(x[1][1])==0).map(lambda x:x[0]).collect()

# Loop
for i in range(iteration):
    # this RDD has one element for the nodes which have the indegree in the graph
    # each element is a two-tuple:  (node-id, the sum of the contributions)  
    prob_transfer=graph.join(rank).flatMap(transfer).reduceByKey(add).persist()
    
    # this RDD has one element for the nodes which don't have the outdegree in the graph
    # each element is a two-tuple:  (node-id, the sum of the contributions) 
    # which has been actioned to sum for the probability mass  
    mass=prob_transfer.filter(lambda x:x[0] in dangling_point).map(lambda x:x[1]).reduce(add)
    
    # this RDD has one element for each node in the graph
    # each element is a two-tuple:  (node-id, rank)
    # while the rank is modified by the random jumping through alpha and the probability mass of the dangling point     
    rank=graph.leftOuterJoin(prob_transfer).mapValues(update)
    
    
# this RDD has 10 element for each node in the graph
# each element is a two-tuple:  (node-id, rank) which have been sorted by the personalized pagerank    
a=rank.sortBy(lambda x:x[1],ascending=False).take(10)
for i in a:
    print("sourceID:  {0} \t It's personlized pagerank:  {1}".format(i[0],i[1]))    

the number of iterations:30
the source node ID:0
sourceID:  0 	 It's personlized pagerank:  0.46474792932116404
sourceID:  9 	 It's personlized pagerank:  0.04435509457227666
sourceID:  5 	 It's personlized pagerank:  0.04431816293657589
sourceID:  7 	 It's personlized pagerank:  0.04416249240903074
sourceID:  4 	 It's personlized pagerank:  0.04396326261665782
sourceID:  3 	 It's personlized pagerank:  0.04298509143801436
sourceID:  8 	 It's personlized pagerank:  0.042968519157554355
sourceID:  249 	 It's personlized pagerank:  0.010894487309561596
sourceID:  177 	 It's personlized pagerank:  0.010417074839317237
sourceID:  353 	 It's personlized pagerank:  0.01010624810631474


#### ---
#### Question 3  (4/24 marks):

For the previous question, you implemented personalized page rank that ran for a specified number of iterations.  However, it is also common to write iterative algorithms that run until some specified termination condition is reached.
For example, for page rank, suppose the $p_i(x)$ represents the probability mass assigned to node $x$ after the $i$th iteration of the algorithm.  ($p_0(x)$ is the initial probability mass of node $x$.)   We define the change of $x$'s probability mass on the $i$th iteration as $\lvert p_i(x)-p_{i-1}(x) \rvert$.   Then, we can iterate personalized page rank until the maximum (over all nodes) change is less then a specified threshold, i.e, until all nodes' page ranks have converged.

For this question, modify your personalized page rank implementation from Question 2 so that it iterates until the 
maximum node change is less than $\frac{0.5}{N}$, where $N$ represents the number of nodes in the graph.
This version of your program should take only one input (the source node id).

If you were unable to get personalized page rank working in Question 2, replace the code cell below with a text (Markdown) cell, and briefly describe how you *would* have modified your program to incorporate a termination condition based on maximum node change. 

In [5]:
from operator import add

# for each row of the adjacency lists
# iterate to transfer probability for each outgoing node
def transfer(node):
    transfer_p=[]
    outdegree=node[1][0][1]
    pagerank=node[1][1]
    for i in outdegree:
        transfer_p.append((i,pagerank/len(outdegree)))
    return transfer_p

def update(node):
    node_id=node[0][0]#id
    if node[1] is None:
        p=0
    else:
        p=node[1]
        
    if node_id==source_node:
        return alpha*1+(1-alpha)*(p+mass)
    elif node_id in dangling_point:
        return 0
    return (1-alpha)*p
        
## Main()

## input values
source_node=int(input('the source node ID:'))
alpha=0.1
threshold=0.5/6301
delta=1
mass0=1

# this RDD has one element for each node in the graph
# each element is a two-tuple:  (node-id, two-tuple)
# this two-tuple include (node-id, list of outdegree nodes)
# since later I only use mapValues to ganrantee the hash of key won't change, the node-id is also put in the value
# which will be used for identify the dangling point and source point
# example: (0, (0, (1, 2, 3, 4, 5, 6, 7, 8, 9, 10)))
graph=sc.textFile("p2p-Gnutella08-adj.txt") \
        .map(processLine)\
        .map(lambda x:(x[0],(x[0],x[1]))).partitionBy(2).persist()
                
# this RDD has one element for each node in the graph
# each element is a two-tuple:  (node-id, rank)            
rank=graph.mapValues(lambda x:1 if x[0]==source_node else 0)

# this RDD has one element for the nodes which don't have the outdegree in the graph
# each element is a key: node-id which has been actioned to collect as a list
dangling_point=graph.filter(lambda x:len(x[1][1])==0).map(lambda x:x[0]).collect()

# Loop
while delta>threshold:
    # this RDD has one element for the nodes which have the indegree in the graph
    # each element is a two-tuple:  (node-id, the sum of the contributions)  
    prob_transfer=graph.join(rank).flatMap(transfer).reduceByKey(add).persist()
    
    # this RDD has one element for the nodes which don't have the outdegree in the graph
    # each element is a two-tuple:  (node-id, the sum of the contributions) 
    # which has been actioned to sum for the probability mass  
    mass1=prob_transfer.filter(lambda x:x[0] in dangling_point).map(lambda x:x[1]).reduce(add)
    
    # this RDD has one element for each node in the graph
    # each element is a two-tuple:  (node-id, rank)
    # while the rank is modified by the random jumping through alpha and the probability mass of the dangling point     
    rank=graph.leftOuterJoin(prob_transfer).mapValues(update)
    
    delta=abs(mass1-mass0)
    mass0=mass1

# this RDD has 10 element for each node in the graph
# each element is a two-tuple:  (node-id, rank) which have been sorted by the personalized pagerank    
a=rank.sortBy(lambda x:x[1],ascending=False).take(10)
for i in a:
    print("sourceID:  {0} \t It's personlized pagerank:  {1}".format(i[0],i[1]))    

the source node ID:0
sourceID:  0 	 It's personlized pagerank:  0.46474792932116404
sourceID:  9 	 It's personlized pagerank:  0.04435559015069827
sourceID:  5 	 It's personlized pagerank:  0.04431905601079985
sourceID:  7 	 It's personlized pagerank:  0.04416324862750859
sourceID:  4 	 It's personlized pagerank:  0.043963884823998385
sourceID:  3 	 It's personlized pagerank:  0.04298557624087746
sourceID:  8 	 It's personlized pagerank:  0.042968825016626674
sourceID:  249 	 It's personlized pagerank:  0.010895474932644483
sourceID:  177 	 It's personlized pagerank:  0.010417680575450302
sourceID:  353 	 It's personlized pagerank:  0.010106989236940261


---
#### Question 4  (4/24 marks):

Spark provides the ability to *cache* RDDs.   This is useful when an RDD will be used more than once.   Instead of computing the same RDD multiple times, it can be computed once, cached, and then re-used from the cache.   Read about caching in the Spark textbook, or in the [persistence section of the Spark RDD programming guide](https://spark.apache.org/docs/latest/rdd-programming-guide.html#rdd-persistence).   Caching can be particularly effective for iterative Spark applications, like those you are writing for this assignment.

For this question, go back to the code that you wrote to answer Question 3, and add caching.   (Caching will not affect the functionality of your code, i.e., what it computes.   It should only affect performance.)   Don't worry about different persistence levels.   Just use `cache()`, which caches RDDs at the default persistence level.

In addition to adding `cache()` calls in your code, use the text cell below to briefly explain how you decided which RDDs to cache. 

If you were not able to finish Question 3, add caching annotations to your solution for Question 2 instead.

---
#### Your answer to Question 4:

*Actually I have add the cache function in the above code which allow me to get the result much more faster. After I give partitions to the `graph` I use `persist()` to keep it in the memory which can be also replaced by `cache`.*

*Since later in the iteration stage, the RDD `graph` is keeping being used during the computation of the sum of the contributions and the final modified personalized page rank. Otherwise, every time, for example, when I update the rank for purpose of the random jumping and the dangling points, the driver needs to load the data again and process the previous work which will waste quite a bit resource and time. Besides, keep the partitions unchanged in cache will be helpful for the `join()` function since it is no need to shuffle the graph part whose hash of keys have been saved in memory*




---
Don't forget to save your workbook!   When you are finished and you are ready to submit your assignment, download your notebook file (.ipynb) from the hub to your machine, and then follow the submission instructions in the assignment.