---
#### Overview
For this assignment, you will be using Python and Spark to perform some graph analysis, using a graph of the Gnutella server network.   In this graph, each node represents a server, and each (directed) edge represents a connection between servers in Gnutella's peer-to-peer network.  The input file for this assignment, `p2p-Gnutella08-adj.txt`, represents the graph as an adjacency list.   Each server (node) is identified by a unique number, and each line in the file gives the adjacency list for a single server.
For example, this line:
> 91	243	1923	2194

gives the adjacency list for server `91`.   It indicates that there are edges from server `91` to servers `243`, `1923`, and `2194`.    According to the Stanford Network Analysis Project, which collected these data, [the graph includes 6301 servers and 20777 edges](http://snap.stanford.edu/data/p2p-Gnutella08.html).

As you know from the previous assignment, you must tell the Python interpreter where to find Spark before performing any Spark operations in your program.   So, start by doing that:

In [1]:
import findspark
findspark.init("/u/cs451/packages/spark")

from pyspark import SparkContext, SparkConf


and then create a `SparkContext`:

In [2]:
sc = SparkContext(appName="YourTest", master="local[2]")

---
#### Question 1  (6/24 marks):

To get warmed up, you should first write Spark code to confirm or determine some basic properties of the Gnutella graph.  Write code that will display answers to the following questions:
- How many nodes and edges are there in the graph?  (This should confirm the numbers given above.)
- How many edges of each outdegree are there?   That is, how many nodes have no outgoing edges, how many have one outgoing edge, how many have two outgoing edges, and so on?
- How many edges of each indegree are there?

You should use Spark to answer these questions.   Do *not* load the entire graph into your Python driver program.   It is OK to use a separate set of Spark operations to answer each question.

In [29]:
#### Your code for Question 1 should go here
textFile = sc.textFile("p2p-Gnutella08-adj.txt")
M=textFile.count()
M1=textFile.flatMap(lambda x:x.split('\t')[1:])\
           .count()         
print('The number of nodes is {0}'.format(M))
print('The number of edges is {0}\n'.format(M1))

def outdegree(T):
    T1=T.split('\t')   
    return(((len(T1)-1),1))


M2=textFile.map(outdegree)\
           .reduceByKey(lambda a,b:a+b)
M_out=M2.collect()
count1=0
for i in range(len(M_out)):
    print('{0} nodes have {1} outgoing edges'.format(M_out[i][1],M_out[i][0]))
    count1+=M_out[i][1]
    

print('\n')

def indegree(T):
    T1=T.split('\t')
    r=[]
    
    for i in T1[1:]:
        r.append((i,1))   
    return r
        
M3=textFile.flatMap(indegree)\
           .reduceByKey(lambda a,b:a+b)\
           .map(lambda x:(x[1],1))\
           .reduceByKey(lambda a,b:a+b)
M_in=M3.collect()
count=0
for i in range(len(M_in)):
    print('{0} nodes have {1} ingoing edges'.format(M_in[i][1],M_in[i][0]))  
    count=count+M_in[i][1]
print('{0} nodes have {1} ingoing edges'.format(M-count,0))         








The number of nodes is 6301
The number of edges is 20777

3836 nodes have 0 outgoing edges
28 nodes have 2 outgoing edges
28 nodes have 4 outgoing edges
10 nodes have 6 outgoing edges
44 nodes have 8 outgoing edges
1531 nodes have 10 outgoing edges
3 nodes have 12 outgoing edges
1 nodes have 34 outgoing edges
2 nodes have 14 outgoing edges
1 nodes have 48 outgoing edges
2 nodes have 18 outgoing edges
1 nodes have 46 outgoing edges
1 nodes have 22 outgoing edges
1 nodes have 24 outgoing edges
1 nodes have 28 outgoing edges
294 nodes have 1 outgoing edges
16 nodes have 3 outgoing edges
107 nodes have 5 outgoing edges
9 nodes have 7 outgoing edges
372 nodes have 9 outgoing edges
1 nodes have 11 outgoing edges
2 nodes have 13 outgoing edges
1 nodes have 47 outgoing edges
4 nodes have 17 outgoing edges
1 nodes have 19 outgoing edges
1 nodes have 41 outgoing edges
1 nodes have 25 outgoing edges
1 nodes have 29 outgoing edges
1 nodes have 31 outgoing edges


1287 nodes have 2 ingoing edges
55

---
Your main objective for this assignment is to perform *single-source personalized page rank* over the Gnutella graph.  To get started, you should make sure that you have a clear understanding of ordinary (i.e., non-personalized) page rank.  Read the description of page rank in Section 5.3 of [the course textbook](http://mapreduce.cc/).

Personalized page rank is like ordinary page rank except:
- One node in the graph is designated as the *source* node.   Personalized page rank is performed with respect to that source node.
- Personalized page rank is initialized by assigning all probability mass to the source node, and none to the other nodes.   In contrast, ordinary page rank is initialized by giving all nodes the same probability mass.
- Whenever personalized page rank makes a random jump, it jumps back to the source node.   In contrast, ordinary page rank may jump to any node.
- In personalized page rank, all probability mass lost dangling nodes is put back into the source nodes.  In ordinary page rank, lost mass is distributed evenly over all nodes.

#### Question 2  (10/24 marks):

Your task is to write a Spark program to perform personalized page rank over the Gnutella graph for a specified number of iterations.   Your program should interactively prompt for two input values:
- source node id (a positive integer)
- iteration count (a positive integer)

Your program should then perform personalized page rank, with respect to the specified source node, over the Gnutella graph, for the specified number of iterations.
The output of your analysis should be the 10 nodes with the highest personalized page rank with respect to the given source.   For each of the 10 nodes, report the node's id and page rank value.

Be sure that your code is clearly commented.   In particular, for each significant Spark RDD that your code produces, your comments should describe the RDD.   That is, they should indicate how many elements you expect the RDD to have (in terms of graph parameters such as the number of nodes or edges) and what each element consists of.   For example, you might have a comment like this:
```
# this RDD has one element for each node in the graph
# each element is a two-tuple:  (node-id, list of outgoing edges for node-id)
```

In case you are not able to get this working completely, try to make as much progress as possible.   Produce code that runs and clearly document what it does, so that you can receive some partial credit.

In [None]:
# your solution to Question 3 here
textFile = sc.textFile("p2p-Gnutella08-adj.txt")



N=textFile.count()
alpha=0.1

#(A,(B,C,D))
def f1(T):
    T1=T.split('\t')
    if len(T1)==1:
        return (T1[0],())
    else:
        return (T1[0],(T1[1:]))

def f2(T):
    T1=T.split('\t')
    if T1[0]==SourceNode:
        return (T1[0],1)
    else:
        return (T1[0],0)

def f3(T):
    if T[1][0]==():
        if T[0]==SourceNode:
            return True
        else:
            return False
    else:
        return False



def f4(T):
    num=len(T[1][0])
    PR=T[1][1]
    s=[]
    if T[1][0]!=():
        for i in T[1][0]:
            s.append((i,PR/num))
    else:
        s.append((T[0],0))########
    return s

def f5(T):
    if T[1][1]==None:
        if T[0]==SourceNode:
            PR=alpha
        else:
            PR=0
    else:
        if T[0]==SourceNode:
            PR=alpha+(1-alpha)*T[1][1]
        else:
            PR=(1-alpha)*T[1][1]
    return (T[0],(T[1][0],PR))

    
def f7(T):
    if T[1][0]==():
        return True
    else:
        return False

def f8(T):
    if T[1][0]==():
        return (T[0],0) 
    else:
        return (T[0],T[1][1])
     
    
def f9(T):
    if T[0]==SourceNode:
        PR=alpha+(1-alpha)*(Lost_MASS+T[1])
    else:
        PR=(1-alpha)*T[1]
    return (T[0],PR)



def f11(T):  
    if T[1][1]==None:
        return True
    else:
        return False

# this RDD has one element for each node in the graph
# each element is a two-tuple:  (node-id, (list of outgoing nodes for node-id))
a1=textFile.map(f1).cache()


#(A,((B,C,D),1))


while True:
    SourceNode=input('input a source node(return to quit):')
    if len(SourceNode) == 0:
        break
    else:
        Iter=int(input('input a Iteration number:'))
        # this RDD has one element for each node in the graph
        # each element is a two-tuple:  (node-id, initial page rank value for nodeid)
        PR_value_old=textFile.map(f2)
        #(A,(([B,C,D]),1))
        # this RDD has one element for each node in the graph
        # each element is a two-tuple:  (node-id, ((list of outgoing nodes for node-id),initial page rank value for nodeid))
        a2=a1.join(PR_value_old)
        #First filter those with no outgoing edges,if the source node is one of those, then the probability mass is trapped,can output directly
        s=a2.filter(f3).count()
        if s==1:
            V=PR_value_old.sortBy(lambda x:-x[1]).take(10)
        else:
            s1=100
            for i in range(Iter):             
                a31=a1.join(PR_value_old).flatMap(f4).reduceByKey(lambda a,b:a+b) 
                # Here the reason to outerjoin is that a31 left those nodes with 0 ingoing edges. 
                a3=a1.leftOuterJoin(a31).map(f5)
                #calculate the lost Mass
                Lost_MASS=a3.filter(f7).map(lambda x:x[1][1]).reduce(lambda a,b:a+b)
                a5=a3.map(f8)
                PR_value_new=a5.map(f9)
                PR_value_old=PR_value_new
            V=PR_value_new.sortBy(lambda x:-x[1]).take(10)
        for i in range(10):
            print('The node is {0},the page rank value is {1}'.format(V[i][0],V[i][1]))


            
            
            
            
            


---
#### Question 3  (4/24 marks):

For the previous question, you implemented personalized page rank that ran for a specified number of iterations.  However, it is also common to write iterative algorithms that run until some specified termination condition is reached.
For example, for page rank, suppose the $p_i(x)$ represents the probability mass assigned to node $x$ after the $i$th iteration of the algorithm.  ($p_0(x)$ is the initial probability mass of node $x$.)   We define the change of $x$'s probability mass on the $i$th iteration as $\lvert p_i(x)-p_{i-1}(x) \rvert$.   Then, we can iterate personalized page rank until the maximum (over all nodes) change is less then a specified threshold, i.e, until all nodes' page ranks have converged.

For this question, modify your personalized page rank implementation from Question 2 so that it iterates until the 
maximum node change is less than $\frac{0.5}{N}$, where $N$ represents the number of nodes in the graph.
This version of your program should take only one input (the source node id).

If you were unable to get personalized page rank working in Question 2, replace the code cell below with a text (Markdown) cell, and briefly describe how you *would* have modified your program to incorporate a termination condition based on maximum node change. 

In [6]:
# your solution to Question 3 here
textFile = sc.textFile("p2p-Gnutella08-adj.txt")
N=textFile.count()
alpha=0.1
#(A:(B,C,D))
def f1(T):
    T1=T.split('\t')
    if len(T1)==1:
        return (T1[0],())
    else:
        return (T1[0],(T1[1:]))

#(A:1)
def f2(T):
    T1=T.split('\t')
    if T1[0]==SourceNode:
        return (T1[0],1)
    else:
        return (T1[0],0)

def f3(T):
    if T[1][0]==():
        if T[0]==SourceNode:
            return True
        else:
            return False
    else:
        return False




def f4(T):
    num=len(T[1][0])
    PR=T[1][1]
    s=[]
    if T[1][0]!=():
        for i in T[1][0]:
            s.append((i,PR/num))
    else:
        s.append((T[0],0))
    return s

def f5(T):
    if T[1][1]==None:
        if T[0]==SourceNode:
            PR=alpha
        else:
            PR=0
    else:
        if T[0]==SourceNode:
            PR=alpha+(1-alpha)*T[1][1]
        else:
            PR=(1-alpha)*T[1][1]
    return (T[0],(T[1][0],PR))

    
def f7(T):
    if T[1][0]==():
        return True
    else:
        return False

def f8(T):
    if T[1][0]==():
        return (T[0],0) 
    else:
        return (T[0],T[1][1])
     
    
def f9(T):
    if T[0]==SourceNode:
        PR=alpha+(1-alpha)*(Lost_MASS+T[1])
    else:
        PR=(1-alpha)*T[1]
    return (T[0],PR)



def f11(T):  
    if T[1][1]==None:
        return True
    else:
        return False

# this RDD has one element for each node in the graph
# each element is a two-tuple:  (node-id, (list of outgoing nodes for node-id))
a1=textFile.map(f1).cache()




while True:
    SourceNode=input('input a source node(return to quit):')
    if len(SourceNode) == 0:
        break
    else:
        # this RDD has one element for each node in the graph
        # each element is a two-tuple:  (node-id, initial page rank value for nodeid)
        PR_value_old=textFile.map(f2)
        #(A,(([B,C,D]),1))
        # this RDD has one element for each node in the graph
        # each element is a two-tuple:  (node-id, ((list of outgoing nodes for node-id),initial page rank value for nodeid))
        a2=a1.join(PR_value_old)
        #First filter those with no outgoing edges,if the source node is one of those, then the probability mass is trapped,can output directly
        s=a2.filter(f3).count()
        if s==1:
            V=PR_value_old.sortBy(lambda x:-x[1]).take(10)
        else:
            s1=100
            while s1>=0.5/N:
                #(A,((a,b,c),1))              
                a31=a1.join(PR_value_old).flatMap(f4).reduceByKey(lambda a,b:a+b)
                # Here the reason to outerjoin is that a31 left those nodes with 0 ingoing edges. 
                a3=a1.leftOuterJoin(a31).map(f5)
                #calculate the lost Mass
                Lost_MASS=a3.filter(f7).map(lambda x:x[1][1]).reduce(lambda a,b:a+b)
                a5=a3.map(f8)
                PR_value_new=a5.map(f9)
                s1=max(PR_value_old.join(PR_value_new).map(lambda x:abs(x[1][1]-x[1][0])).collect())
                PR_value_old=PR_value_new       
            V=PR_value_new.sortBy(lambda x:-x[1]).take(10)
        for i in range(10):
            print('The node is {0},the page rank value is {1}'.format(V[i][0],V[i][1]))





input a source node(return to quit):999
The node is 999,the page rank value is 0.6044085301659551
The node is 716,the page rank value is 0.04929019764593394
The node is 1001,the page rank value is 0.0490348731009766
The node is 1004,the page rank value is 0.04900788063843103
The node is 1005,the page rank value is 0.04896962508742925
The node is 1002,the page rank value is 0.048967550450710655
The node is 174,the page rank value is 0.008300396853917405
The node is 390,the page rank value is 0.005154267038611643
The node is 124,the page rank value is 0.004730618227324815
The node is 176,the page rank value is 0.004725917817696244
input a source node(return to quit):


---
#### Question 4  (4/24 marks):

Spark provides the ability to *cache* RDDs.   This is useful when an RDD will be used more than once.   Instead of computing the same RDD multiple times, it can be computed once, cached, and then re-used from the cache.   Read about caching in the Spark textbook, or in the [persistence section of the Spark RDD programming guide](https://spark.apache.org/docs/latest/rdd-programming-guide.html#rdd-persistence).   Caching can be particularly effective for iterative Spark applications, like those you are writing for this assignment.

For this question, go back to the code that you wrote to answer Question 3, and add caching.   (Caching will not affect the functionality of your code, i.e., what it computes.   It should only affect performance.)   Don't worry about different persistence levels.   Just use `cache()`, which caches RDDs at the default persistence level.

In addition to adding `cache()` calls in your code, use the text cell below to briefly explain how you decided which RDDs to cache. 

If you were not able to finish Question 3, add caching annotations to your solution for Question 2 instead.

---
#### Your answer to Question 4:

*For Question 3, I basically cached the adjacency list RDD(i.e The 'a1' in Question 3, each line in a1 is like :node_id,(1st outgoing node, 2nd outgoing node,.... )),because for every iteration I have to join it with the PageRank(i.e. The 'PR_value_old' in Question 3) and use it in some other operations during the process. Since it is re-used many times and I only need to compute once, thus here I cached it*




---
Don't forget to save your workbook!   When you are finished and you are ready to submit your assignment, download your notebook file (.ipynb) from the hub to your machine, and then follow the submission instructions in the assignment.