In [1]:
from pyspark import SparkContext, SparkConf
import random
# Notebook-wide Spark context: local mode with two worker threads.
sc = SparkContext(
    appName="YourTest",
    master="local[2]",
    # The web UI port is drawn at random from [4000, 5000) -- presumably so
    # several notebooks on the same machine do not fight over one port.
    conf=SparkConf().set('spark.ui.port', random.randrange(4000, 5000)),
)

In [4]:
def num_nodes_edges():
    """Count the nodes and directed edges in "p2p-Gnutella08-adj.txt".

    Every line of the file is treated as one node: the first tab-separated
    field is the node itself and each further field is counted as one
    outgoing edge.

    Returns:
        tuple: (num_nodes, num_edges) -- the number of lines in the file and
        the total number of fields after the first one on each line.
        An empty file yields (0, 0).
    """
    adjacency = sc.textFile("p2p-Gnutella08-adj.txt")
    # One pass over the file computes both totals.  aggregate() starts from
    # the zero value, so an empty RDD gives (0, 0) instead of the ValueError
    # that reduce() raises on empty input.
    num_nodes, num_edges = adjacency.aggregate(
        (0, 0),
        lambda acc, line: (acc[0] + 1, acc[1] + len(line.split("\t")) - 1),
        lambda left, right: (left[0] + right[0], left[1] + right[1]),
    )
    return (num_nodes, num_edges)
    
    
def out_counts():
    """Compute the out-degree histogram of "p2p-Gnutella08-adj.txt".

    The out-degree of a line is the number of tab-separated fields that
    follow the first one.

    Returns:
        list: (outdegree, number_of_nodes) tuples, sorted by ascending
        outdegree.  Note that this is a list of pairs, not a dict; wrap the
        result in dict() if a mapping is needed.
    """
    lines = sc.textFile("p2p-Gnutella08-adj.txt")

    def degree_with_one(line):
        # (outdegree, 1) so that equal degrees can be summed up.
        return (len(line.split("\t")) - 1, 1)

    histogram = lines.map(degree_with_one).reduceByKey(lambda a, b: a + b)
    ordered = histogram.sortBy(lambda pair: pair[0], True)
    return ordered.collect()


def in_counts():
    """Compute the in-degree histogram of "p2p-Gnutella08-adj.txt".

    On each line the first tab-separated field is the node and the remaining
    fields are the nodes it points to.  A node's in-degree is the number of
    times it appears as one of those remaining fields.

    Returns:
        list: (indegree, number_of_nodes) tuples, sorted by ascending
        indegree.  Note that this is a list of pairs, not a dict; wrap the
        result in dict() if a mapping is needed.
    """
    lines = sc.textFile("p2p-Gnutella08-adj.txt")

    def contributions(line):
        fields = line.split("\t")
        # The line's own node contributes 0 so that nodes nobody points to
        # still show up with in-degree 0; every target contributes 1.
        # Counting targets directly (instead of "all occurrences minus one")
        # stays correct for a node that has no adjacency line of its own.
        pairs = [(fields[0], 0)]
        pairs.extend((target, 1) for target in fields[1:])
        return pairs

    indegree = lines.flatMap(contributions).reduceByKey(lambda a, b: a + b)
    histogram = indegree.map(lambda kv: (kv[1], 1)).reduceByKey(lambda a, b: a + b)
    return histogram.sortBy(lambda pair: pair[0], True).collect()


In [17]:
def personalized_page_rank(source_node_id, num_iterations, jump_factor):
    """Run a fixed number of personalized PageRank iterations.

    All rank mass starts on the source node.  In every iteration each node
    passes (1 - jump_factor) of its rank, split evenly, to its out-neighbours
    and the source node receives jump_factor.  Whatever mass is missing from
    a total of 1 afterwards (e.g. rank held by nodes without out-links) is
    added back to the source node.

    Args:
        source_node_id: id of the source node; compared as str() against the
            first tab-separated field of each line in the adjacency file.
        num_iterations: number of iterations to run.
        jump_factor: rank given to the source node in every iteration.

    Returns:
        list: up to 10 (node_id, pagerank_value) tuples with the highest
        values, in descending order of value.  node_id is a string.
    """
    source = str(source_node_id)

    def distribute(entry):
        """Emit the rank contributions sent out by one (node, (outs, rank))."""
        node = entry[0]
        outs, r = entry[1]
        shares = []
        if outs:
            # Identical for every out-neighbour, so compute it once.
            f = (1 - jump_factor) * (r / len(outs))
            shares = [(o, f) for o in outs]
        if node == source:
            shares.append((node, jump_factor))
        return shares

    links = (sc.textFile("p2p-Gnutella08-adj.txt")
             .map(lambda line: line.split("\t"))
             .map(lambda fields: (fields[0], fields[1:]))
             .cache())
    # Float start values keep r / len(outs) a true division even where "/"
    # on two ints truncates (Python 2).
    rank = links.keys().map(lambda node: (node, 1.0 if node == source else 0.0))

    spread = None
    for _ in range(num_iterations):
        previous_spread = spread
        # Cached because it is read twice: by sum() below and by the next
        # iteration's join (or the final sort).
        spread = (links.join(rank)
                  .flatMap(distribute)
                  .reduceByKey(lambda a, b: a + b)
                  .cache())
        dif = 1 - spread.map(lambda kv: kv[1]).sum()
        if previous_spread is not None:
            # The new generation is materialized; the old one is not needed.
            previous_spread.unpersist()
        # dif is bound as a default argument so the lambda keeps this
        # iteration's value instead of looking the variable up later.
        rank = spread.map(
            lambda kv, dif=dif: (kv[0], kv[1] + dif) if kv[0] == source else (kv[0], kv[1]))

    top = rank.sortBy(lambda kv: kv[1], False).take(10)
    if spread is not None:
        spread.unpersist()
    links.unpersist()
    return top
    

In [19]:
def personalized_page_rank_stopping_criterion(source_node_id, jump_factor):
    """Run personalized PageRank until the ranks stop changing.

    Uses the same update as a fixed-iteration personalized PageRank: each
    node passes (1 - jump_factor) of its rank, split evenly, to its
    out-neighbours, the source node receives jump_factor, and any mass
    missing from a total of 1 is added back to the source node.  Iteration
    stops once the summed absolute rank change over all nodes drops below
    0.5 / (number of lines in the adjacency file).

    Args:
        source_node_id: id of the source node; compared as str() against the
            first tab-separated field of each line in the adjacency file.
        jump_factor: rank given to the source node in every iteration.

    Returns:
        list: up to 10 (node_id, pagerank_value) tuples with the highest
        values, in descending order of value.  node_id is a string.
        An empty adjacency file yields [].
    """
    source = str(source_node_id)

    def distribute(entry):
        """Emit the rank contributions sent out by one (node, (outs, rank))."""
        node = entry[0]
        outs, r = entry[1]
        shares = []
        if outs:
            # Identical for every out-neighbour, so compute it once.
            f = (1 - jump_factor) * (r / len(outs))
            shares = [(o, f) for o in outs]
        if node == source:
            shares.append((node, jump_factor))
        return shares

    links = (sc.textFile("p2p-Gnutella08-adj.txt")
             .map(lambda line: line.split("\t"))
             .map(lambda fields: (fields[0], fields[1:]))
             .cache())
    ncount = links.count()
    if ncount == 0:
        # Nothing to rank; also avoids dividing by zero in the threshold.
        links.unpersist()
        return []
    threshold = .5 / ncount

    # Float start values keep r / len(outs) a true division even where "/"
    # on two ints truncates (Python 2).
    rank = links.keys().map(lambda node: (node, 1.0 if node == source else 0.0))

    while True:
        prev = rank
        spread = (links.join(prev)
                  .flatMap(distribute)
                  .reduceByKey(lambda a, b: a + b)
                  .cache())
        dif = 1 - spread.map(lambda kv: kv[1]).sum()
        # dif is bound as a default argument so the lambda keeps this
        # iteration's value instead of looking the variable up later.
        rank = spread.map(
            lambda kv, dif=dif: (kv[0], kv[1] + dif) if kv[0] == source else (kv[0], kv[1])
        ).cache()
        # Full outer join: a node that is missing on one side (for example
        # one that received rank for the first time in this iteration) still
        # contributes its change, with the missing value treated as 0.  An
        # inner join would silently leave such nodes out of the total.
        total_change = (rank.fullOuterJoin(prev)
                        .map(lambda kv: abs((kv[1][0] or 0.0) - (kv[1][1] or 0.0)))
                        .sum())
        # rank has been computed by the action above, so the RDDs it was
        # derived from no longer need to stay cached.
        spread.unpersist()
        prev.unpersist()
        if threshold > total_change:
            break

    top = rank.sortBy(lambda kv: kv[1], False).take(10)
    rank.unpersist()
    links.unpersist()
    return top
