**Assignment 8: PageRank with Spark**

Amanda Baker, adbaker

Below is the outline of PySpark code for calculating the PageRank of a graph expressed in the edge vector representation, where each line of the input file lists a page followed by the pages it links to. For this part of the assignment:

1. Complete the code below
2. Write doc string comments for all functions documenting what they do
3. Test your code on other graphs we've worked with in previous assignments

I recommend reviewing (i) the Excel spreadsheet calculation we did to simulate the PageRank calculation and (ii) the slides on PageRank. Ensure you understand the algorithm well before starting on this exercise.
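For reference, the update that calc_ranks applies on every iteration can be written as below. This is a sketch of the standard damped PageRank rule with symbols chosen here for illustration: $N$ is the number of pages, $\beta$ is the damping factor `beta`, and $\mathrm{out}(i)$ is the neighbor list of page $i$.

$$
r_j^{(t+1)} = \frac{1-\beta}{N} + \beta \sum_{i \,:\, j \in \mathrm{out}(i)} \frac{r_i^{(t)}}{\lvert \mathrm{out}(i) \rvert},
\qquad r_j^{(0)} = \frac{1}{N}
$$

In the code, init_ranks supplies $r^{(0)}$, calc_contribs produces the individual terms $r_i / \lvert \mathrm{out}(i) \rvert$, and calc_ranks sums them per page and applies the damping.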

Rather than just stating "Write PySpark code to calculate PageRank," I've provided some helper functions and sample output along the way for guidance. To facilitate grading, do stay with these functions.

In [1]:
import findspark
findspark.init()
from pyspark import SparkContext
sc = SparkContext() 

# page rank

In [2]:
def parse_line(line):
    '''Helper for read_graph. Parses one line of the graph file, formatted as whitespace-separated
    tokens "page neighbor neighbor ... neighbor", and returns the tuple (page, [neighbors]).'''
    tokens = line.split()
    page = tokens[0]
    neighbors_lst = tokens[1:]  # every token after the first is an outgoing link
    return page, neighbors_lst

In [3]:
def read_graph(fname):
    '''Reads the text file fname, parses each line with parse_line, and returns an RDD of
    (page, [neighbors]) pairs, one per line.'''
    lines = sc.textFile(fname)
    graph = lines.map(parse_line)
    return graph

In [4]:
read_graph('graph-1.txt').collect()

[('A', ['B', 'C']), ('B', ['C']), ('C', ['A']), ('D', ['C'])]
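As written, parse_line raises an IndexError on a blank line in the input file, because `''.split()` returns an empty list. The plain-Python sketch below (the name `parse_adjacency_line` and the sample lines are hypothetical; the lines are inferred from the output above) shows the same parsing with star-unpacking and a blank-line guard. In Spark, a comparable guard would be `sc.textFile(fname).filter(lambda line: line.strip()).map(parse_line)`.

```python
def parse_adjacency_line(line):
    """Split 'page nb1 nb2 ...' into (page, [nb1, nb2, ...])."""
    page, *neighbors = line.split()
    return page, neighbors


# Assumed contents of graph-1.txt, inferred from the read_graph output above;
# the empty string stands in for a blank line somewhere in the file.
lines = ["A B C", "B C", "C A", "D C", ""]
graph = [parse_adjacency_line(l) for l in lines if l.strip()]  # skip blank lines
print(graph)
```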

In init_ranks you will need to use what is termed a "broadcast" variable; do read about these online. Some useful links: 
[link 1](https://jaceklaskowski.gitbooks.io/mastering-apache-spark/spark-broadcast.html), [link 2](https://umbertogriffo.gitbooks.io/apache-spark-best-practices-and-tuning/content/when_to_use_broadcast_variable.html), [link 3](https://spark.apache.org/docs/2.2.0/rdd-programming-guide.html#shared-variables)

In [5]:
def init_ranks(graph):
    '''Initializes every page's rank to 1/N, where N is the number of pages in graph, and
    returns an RDD of (page, initial rank) pairs.'''
    num_nodes = graph.count()
    # Broadcast the initial rank so each executor receives one read-only copy
    init_rank = sc.broadcast(1 / num_nodes)
    ranks = graph.map(lambda x: (x[0], init_rank.value))
    return ranks

In [6]:
g = read_graph('graph-1.txt')
init_ranks(g).collect()

[('A', 0.25), ('B', 0.25), ('C', 0.25), ('D', 0.25)]

In [7]:
def calc_partials(x):
    '''Helper for calc_contribs. Takes a joined record (page, (neighbors, rank)) and yields
    (neighbor, rank / out-degree) for each neighbor, i.e. the share of the page's rank passed
    along each outgoing link. Also yields (page, 0) so every page keeps at least one entry.'''
    page, (neighbors, rank) = x
    for n in neighbors:
        yield n, (1 / len(neighbors)) * rank
    yield page, 0

In [8]:
def calc_contribs(ranks, graph):
    '''Joins graph with the current ranks on page name, producing (page, (neighbors, rank))
    records, and flat-maps them through calc_partials; returns an RDD of
    (page, partial contribution) pairs.'''
    join_r_g = graph.join(ranks)
    contribs = join_r_g.flatMap(calc_partials)
    return contribs

In [9]:
g = read_graph('graph-1.txt')
r = init_ranks(g)

**Note that when calculating the contributions, A, B, C, and D are also each paired with 0. Why?**

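Every page also emits a (page, 0) pair so that it appears as a key in the contributions even when no other page links to it. Without that pair, a page like 'D' in graph-1.txt would have no entries at all, so groupByKey would drop it from the new ranks; on the next iteration, the join with the graph would then also discard D's outgoing link to C, and C would lose D's contribution.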
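To see the effect concretely, here is a plain-Python sketch (no Spark required) that mimics one join, flatMap, groupByKey, map pass over graph-1's adjacency lists as printed by read_graph. The helper `step` and its `emit_zero` switch are hypothetical, introduced only for this illustration; the inner join is emulated by skipping pages that have no current rank.

```python
from collections import defaultdict

# graph-1 adjacency lists, copied from the read_graph('graph-1.txt') output above
GRAPH_1 = {'A': ['B', 'C'], 'B': ['C'], 'C': ['A'], 'D': ['C']}


def step(ranks, graph, beta=0.85, emit_zero=True):
    """One PageRank iteration in plain Python (hypothetical helper).

    Mirrors graph.join(ranks) -> flatMap(calc_partials) -> groupByKey -> map.
    """
    num_nodes = len(graph)
    grouped = defaultdict(list)
    for page, neighbors in graph.items():
        if page not in ranks:  # an inner join drops pages that have no rank
            continue
        rank = ranks[page]
        for nb in neighbors:
            grouped[nb].append((1 / len(neighbors)) * rank)
        if emit_zero:
            grouped[page].append(0)  # the (page, 0) pair
    return {p: (1 - beta) / num_nodes + beta * sum(v) for p, v in grouped.items()}


start = {p: 1 / len(GRAPH_1) for p in GRAPH_1}
with_zero = step(step(start, GRAPH_1), GRAPH_1)
without_zero = step(step(start, GRAPH_1, emit_zero=False), GRAPH_1, emit_zero=False)
print(sorted(with_zero.items()))
print(sorted(without_zero.items()))
```

Without the zero pairs, D disappears after the first iteration, and in the second iteration C's rank comes out at about 0.266 instead of about 0.298 because D's link to C is no longer counted.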

In [10]:
c1 = calc_contribs(r, g)
c1.collect()

[('B', 0.125),
 ('C', 0.125),
 ('A', 0),
 ('C', 0.25),
 ('B', 0),
 ('A', 0.25),
 ('C', 0),
 ('C', 0.25),
 ('D', 0)]

In [11]:
def calc_ranks(contribs, num_nodes, beta=.85):
    '''Groups the partial contributions by page (the key), sums them, and computes each page's
    new rank as (1 - beta) / num_nodes + beta * sum; returns an RDD of (page, new rank) pairs.'''
    calc = contribs.groupByKey()
    probs = calc.mapValues(lambda vals: ((1 - beta) / num_nodes) + (beta * sum(vals)))
    return probs

In [12]:
calc_ranks(c1, num_nodes=g.count(), beta=0.85).collect()

[('B', 0.14375), ('A', 0.25), ('C', 0.56875), ('D', 0.037500000000000006)]
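As a hand check of this first iteration: C receives 0.125 from A (which splits its 0.25 across two links) and 0.25 each from B and D, while nothing links to D, so D keeps only the $(1-\beta)/N$ term.

$$
r_C^{(1)} = \frac{1 - 0.85}{4} + 0.85\,(0.125 + 0.25 + 0.25) = 0.0375 + 0.53125 = 0.56875,
\qquad
r_D^{(1)} = \frac{1 - 0.85}{4} + 0.85 \cdot 0 = 0.0375
$$

The printed 0.037500000000000006 is floating-point rounding: in Python, `1 - 0.85` evaluates to 0.15000000000000002.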

In [13]:
def pagerank(fname, beta=0.85, n=10):
    '''Computes PageRank: reads the graph from fname, initializes every rank to 1/N, then applies
    n iterations of calc_contribs and calc_ranks with damping factor beta; returns a list of
    (page, rank) tuples.'''
    g = read_graph(fname).cache()  # reused on every iteration, so keep it in memory
    num_nodes = g.count()          # the page count does not change between iterations
    r = init_ranks(g)
    for _ in range(n):
        c1 = calc_contribs(r, g)
        r = calc_ranks(c1, num_nodes=num_nodes, beta=beta)  # pass the caller's beta through
    return r.collect()

In [14]:
sorted(pagerank('graph-1.txt', beta=0.85, n=10))

[('A', 0.375054382302053),
 ('B', 0.1949370588413849),
 ('C', 0.3925085588565621),
 ('D', 0.037500000000000006)]
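A small plain-Python version of the same power iteration is handy for checking the Spark results on graphs small enough to reason about by hand. The sketch below (the name `pagerank_local` is hypothetical) hard-codes graph-1's adjacency lists as printed by read_graph; with the same beta and n it should agree with the Spark output above up to floating-point rounding.

```python
# graph-1 adjacency lists, copied from the read_graph('graph-1.txt') output above
GRAPH_1 = {'A': ['B', 'C'], 'B': ['C'], 'C': ['A'], 'D': ['C']}


def pagerank_local(graph, beta=0.85, n=10):
    """Plain-Python power iteration following the same steps as pagerank() (hypothetical helper)."""
    num_nodes = len(graph)
    ranks = {page: 1 / num_nodes for page in graph}
    for _ in range(n):
        # start every page at 0, playing the role of the (page, 0) pairs
        sums = {page: 0.0 for page in graph}
        for page, neighbors in graph.items():
            for nb in neighbors:
                sums[nb] = sums.get(nb, 0.0) + (1 / len(neighbors)) * ranks[page]
        ranks = {p: (1 - beta) / num_nodes + beta * s for p, s in sums.items()}
    return sorted(ranks.items())


print(pagerank_local(GRAPH_1))
```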


**Additional testing on graph-2.txt and wikipedia-example.txt**

In [15]:
sorted(pagerank('graph-2.txt', beta=0.85, n=10))

[('A', 0.3552329235133619),
 ('B', 0.18087715033826585),
 ('C', 0.26671408227310545),
 ('D', 0.08133891194042742),
 ('E', 0.11583693193483957)]

In [16]:
sorted(pagerank('wikipedia-example.txt', beta=0.85, n=10))

[('A', 0.02764650931040434),
 ('B', 0.3036191818465732),
 ('C', 0.3097295723325943),
 ('D', 0.03297885857776607),
 ('E', 0.06821469112858616),
 ('F', 0.03297885857776607),
 ('G', 0.01363636363636364),
 ('H', 0.01363636363636364),
 ('I', 0.01363636363636364),
 ('J', 0.01363636363636364),
 ('K', 0.01363636363636364)]
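One thing worth noticing in the wikipedia-example.txt output: the ranks sum to about 0.84 rather than 1, while pages G through K each get exactly (1 - 0.85)/11 ≈ 0.01364, the value a page with no incoming links receives. A likely cause of the shortfall (an assumption, since the file's contents are not shown here) is a page whose line lists no neighbors: calc_partials yields nothing for it beyond its 0 pair, so its rank drops out on every iteration. The sketch below uses a hypothetical three-page graph in which Z links nowhere, and compares that behaviour with one common remedy, spreading the dangling pages' rank evenly over all pages as if they linked to every page. The helpers `damped_step` and `run_iterations` are hypothetical.

```python
# Hypothetical graph: Z appears in the file but links nowhere (a dangling page).
DANGLING_GRAPH = {'X': ['Y'], 'Y': ['X', 'Z'], 'Z': []}


def damped_step(ranks, graph, beta=0.85, redistribute=False):
    """One damped PageRank update; assumes every neighbor is also a key of graph."""
    num_nodes = len(graph)
    sums = {page: 0.0 for page in graph}
    dangling_mass = 0.0
    for page, neighbors in graph.items():
        if neighbors:
            for nb in neighbors:
                sums[nb] += ranks[page] / len(neighbors)
        else:
            dangling_mass += ranks[page]  # rank with no outgoing link to flow along
    # Optional remedy: share the dangling rank equally among all pages.
    extra = dangling_mass / num_nodes if redistribute else 0.0
    return {p: (1 - beta) / num_nodes + beta * (s + extra) for p, s in sums.items()}


def run_iterations(graph, n=10, **kwargs):
    ranks = {page: 1 / len(graph) for page in graph}
    for _ in range(n):
        ranks = damped_step(ranks, graph, **kwargs)
    return ranks


print(sum(run_iterations(DANGLING_GRAPH).values()))                     # leaks rank: below 1
print(sum(run_iterations(DANGLING_GRAPH, redistribute=True).values()))  # stays at 1 up to rounding
```

With redistribution, each update returns a total of $(1-\beta) + \beta \cdot 1 = 1$ whenever the input ranks sum to 1, which is why the second total holds steady.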