Below is the outline of pyspark code for calculating the pagerank of a graph expressed in the edge vector representation.  For this past of the assignment:

1. Complete the code below
2. Write doc string comments for all functions documenting what they do
3. Test your code on other graphs we've worked in previous assignments

I recommend reviewing the (i) the Excel spread sheet calculation we did for simulating the page rank calculation (ii) and slides on pagerank.  Ensure you under the algorithm well before starting on this exercise. 

Rather than just stating "Write PySpark code to calculate PageRank" I've provided some helper functions and sample output along the way to provide guidance.  To faciliate grading, do stay with these functions.

In [1]:
import findspark
findspark.init()
from pyspark import SparkContext
sc = SparkContext() 

# page rank

In [289]:
def parse_line(line):
    return line.split()

In [290]:
parse_line("A B C")

['A', 'B', 'C']

In [8]:
def read_graph(fname):
    return sc.textFile(fname)

In [13]:
read_graph('graph-1.txt').collect()

['A B C', 'B C', 'C A', 'D C']

In [40]:
def init_ranks(graph):
    letters = graph.groupBy(lambda x: x[0])
    num_letters = letters.count()
    return letters.map(lambda x : (x[0], 1/num_letters)).sortByKey(ascending=True)

In [101]:
g = read_graph('graph-1.txt')
#test = g.map(lambda x : isinstance(x, str))
#print(test.collect())
print(init_ranks(g).collect())

[('A', 0.25), ('B', 0.25), ('C', 0.25), ('D', 0.25)]


In [278]:
def calc_partials(line_lst):
    _, *neighbors = line_lst
    print(neighbors)
    neighbors_lst = []
    for neigh in neighbors:
        neighbors_lst.append(1/len(neighbors))
    return neighbors_lst

In [279]:
print(calc_partials(['A','B','C']))

['B', 'C']
[0.5, 0.5]


In [420]:
def calc_contribs(ranks, graph):
    # replace initial ranks with 0
    zero_ranks = ranks.map(lambda x : (x[0], 0))
    
    init_rank = 1/ranks.count()
    #print("ranks")
    #print(ranks.collect())
    line = graph.map(lambda x : parse_line(x))
    neigh_partials = line.flatMap(lambda x : calc_partials(x))
    #print("neigh_partials")
    #print(neigh_partials.collect())
    neighs = graph.flatMap(lambda x : x[1:]).filter(lambda x : x != ' ')
    
    
    neigh_ranks = neigh_partials.map(lambda x : init_rank)
    #print("neigh_ranks")
    #print(neigh_ranks.collect())
    
    
    adj_ranks = neigh_ranks.zip(neigh_partials)
    calc_adj_ranks = adj_ranks.map(lambda x : x[0]*x[1])
    tmp = neighs.zip(calc_adj_ranks)
    

    
    
    final = tmp.union(zero_ranks)
    return final

In [421]:
g = read_graph('graph-1.txt')
r = init_ranks(g)
calc_contribs(r, g).collect()

[('B', 0.125),
 ('C', 0.125),
 ('C', 0.25),
 ('A', 0.25),
 ('C', 0.25),
 ('A', 0),
 ('B', 0),
 ('C', 0),
 ('D', 0)]

In [422]:
g = read_graph('graph-1.txt')
r = init_ranks(g)

Note that when calculating the contributions we have A, B, C, D also paired with 0?  Why?

In [423]:
c1 = calc_contribs(r, g)
c1.collect()

[('B', 0.125),
 ('C', 0.125),
 ('C', 0.25),
 ('A', 0.25),
 ('C', 0.25),
 ('A', 0),
 ('B', 0),
 ('C', 0),
 ('D', 0)]

In [424]:
def calc_ranks(contribs, num_nodes, beta):
    groups = contribs.groupByKey()
    #print(groups.collect())
    
    # just to visually confirm what's in the list, don't use for calculations
    groups_viewable = groups.map(lambda x : (x[0], list(x[1]))) 
    
    sum_groups = groups.map(lambda x : (x[0], sum(x[1])))
    #print(sum_groups.collect())
    calc = sum_groups.map(lambda x : (x[0], (1-beta)/num_nodes + beta*x[1]))
    return calc

In [427]:
calc_ranks(c1, num_nodes=g.count(), beta=0.85).collect()
new = calc_ranks(c1, num_nodes=g.count(), beta=0.85)
print(new.collect())
test = calc_contribs(new,g)
print(test.collect())

[('C', 0.56875), ('A', 0.25), ('B', 0.14375), ('D', 0.037500000000000006)]
[('B', 0.125), ('C', 0.125), ('C', 0.25), ('A', 0.25), ('C', 0.25), ('C', 0), ('A', 0), ('B', 0), ('D', 0)]


In [430]:
def pagerank_helper(rank,graph,beta,n):
    num_nodes = graph.count()
    if n == 0:
        return rank
    else:
        c2 = calc_contribs(rank, graph)
        new_rank = calc_ranks(c2, num_nodes, beta)
        print(new_rank.collect())
        n-=1
        return pagerank_helper(new_rank,graph,beta,n)
    

def pagerank(fname, beta=0.85, n=10):
    g = read_graph(fname)
    #print(g.collect())
    r = init_ranks(g)
    #print(r.collect())
    return pagerank_helper(r,g,beta,n)

In [431]:
pagerank('graph-1.txt', beta=0.85, n=10).collect()

[('C', 0.56875), ('A', 0.25), ('B', 0.14375), ('D', 0.037500000000000006)]
[('C', 0.56875), ('D', 0.037500000000000006), ('B', 0.14375), ('A', 0.25)]
[('A', 0.25), ('D', 0.037500000000000006), ('C', 0.56875), ('B', 0.14375)]
[('A', 0.25), ('C', 0.56875), ('B', 0.14375), ('D', 0.037500000000000006)]
[('C', 0.56875), ('D', 0.037500000000000006), ('A', 0.25), ('B', 0.14375)]
[('D', 0.037500000000000006), ('C', 0.56875), ('A', 0.25), ('B', 0.14375)]
[('B', 0.14375), ('A', 0.25), ('D', 0.037500000000000006), ('C', 0.56875)]
[('D', 0.037500000000000006), ('B', 0.14375), ('C', 0.56875), ('A', 0.25)]
[('A', 0.25), ('C', 0.56875), ('B', 0.14375), ('D', 0.037500000000000006)]
[('A', 0.25), ('B', 0.14375), ('C', 0.56875), ('D', 0.037500000000000006)]


[('A', 0.25), ('B', 0.14375), ('C', 0.56875), ('D', 0.037500000000000006)]