In [1]:
from __future__ import division

# Initialize
import findspark
findspark.init()

import pyspark
sc = pyspark.SparkContext()

<h3>Question 13.1</h3>

<p><i>Write a basic Spark implementation of the iterative PageRank algorithm
that takes sparse adjacency lists as input. Make sure that your implementation utilizes teleportation (1-damping/the number of nodes in the network), and further, distributes the mass of dangling nodes with each iteration so that the output of each iteration is correctly normalized (sums to 1).</i></p>

<h4>Solution</h4>

<p>We import the PageRank test file and use that as our starting point. From there, a simple loop is computed over the data to complete the PageRank algorithm.</p>

<p>References: 
<ul>
<li><a href="https://www.youtube.com/watch?v=_Wc9OkMKS3g">Sink Nodes in PageRank</a></li>
<li><a href="https://github.com/apache/spark/blob/master/examples/src/main/python/pagerank.py">PageRank in Spark</a> (Apache)</li>
</ul>

In [153]:
# Parse 
def parseRaw(line):
    
    """ This function parses the graph elements from the dictionary."""
    
    # Separate
    array = line.split('\t')
    node, neighbors = array 
    neighbors = eval(neighbors)
    
    # Emit
    for k in neighbors.keys(): 
        yield (node, [k])
        yield (k, [])
        

# Calculate PageRank 
def emitPR(line):
    
    # Unpack
    node, prTuple = line
    prList, rank = prTuple
    
    # Emit 
    for neighbor in prList: 
        yield (neighbor, rank / len(prList))
        
        # Danglers 
        yield (node, 0)
        
        
# Compute PR with Teleport 
def dampenedPR(line, d=0.85):
        
    # Unpack
    node, PR = line 
    
    # Update 
    PR *= d
    PR += (1 - d + d * danglerLoss / totalDanglers ) / totalNodes
    
    # Emit 
    return (node, PR)    
        
        
######## INITIALIZE #########

# Load 
graphData = sc.textFile('./pagerank_test.txt').flatMap(lambda x: parseRaw(x)).reduceByKey(lambda a, b: a + b)
rank = graphData.map(lambda x: (x[0], 1))

# Dangling 
danglers = graphData.filter(lambda x: not bool(x[1]))
totalDanglers = danglers.count()

# Diag
totalNodes = graphData.count()

# PageRank
for i in range(10):

    # Neighbor contributions 
    PR = graphData.join(rank).flatMap(lambda x: emitPR(x)).reduceByKey(lambda a, b: a + b)
    
    # Dangling PR
    danglerPR = PR.join(danglers).map(lambda x: (x[0], x[1][0]))
    danglerLoss = danglerPR.map(lambda x: x[1]).sum()

    # Dampening
    dPR = PR.map(lambda x: dampenedPR(x))

    # Normalize 
    totalWeight = dPR.map(lambda x: x[1]).sum()
    nPR = dPR.map(lambda x: (x[0], x[1] / totalWeight))

    # Cycle 
    rank = nPR

# Results 
print rank.map(lambda x: (x[1], x[0])).sortByKey(ascending=False).take(5)


[(0.37985079809520234, 'C'), (0.36031999461053826, u'B'), (0.0772920744806844, 'E'), (0.037443828260832486, 'F'), (0.037443828260832486, u'D')]


<h3>Question 13.2</h3>

<p><i>Run your Spark PageRank implementation on the Wikipedia dataset for 10 iterations,
and display the top 100 ranked nodes (with alpha = 0.85).</i></p>

<h4>Solution</h4>

<p>We write our Python file with the necessary adjustments to point to the S3 bucket. </p>

In [154]:
%%writefile sparkEMR.py

from __future__ import division

# Initialize
import findspark
findspark.init()

import pyspark
sc = pyspark.SparkContext()

# Parse 
def parseRaw(line):
    
    """ This function parses the graph elements from the dictionary."""
    
    # Separate
    array = line.split('\t')
    node, neighbors = array 
    neighbors = eval(neighbors)
    
    # Emit
    for k in neighbors.keys(): 
        yield (node, [k])
        yield (k, [])
        

# Calculate PageRank 
def emitPR(line):
    
    # Unpack
    node, prTuple = line
    prList, rank = prTuple
    
    # Emit 
    for neighbor in prList: 
        yield (neighbor, rank / len(prList))
        
        # Danglers 
        yield (node, 0)
        
        
# Compute PR with Teleport 
def dampenedPR(line, d=0.85):
        
    # Unpack
    node, PR = line 
    
    # Update 
    PR *= d
    PR += (1 - d + d * danglerLoss / totalDanglers ) / totalNodes
    
    # Emit 
    return (node, PR)    
        
        
######## INITIALIZE #########


dataFile = ("s3n://AKIAJQOD4KMA46R45NCA:"
    "ASonGZ4q98UmcQInjZHXEy8VWbgc/E5BojK9UwuE"
    "@s3://ucb-mids-mls-networks/wikipedia/all-pages-indexed-out.txt")

# Load 
graphData = sc.textFile(dataFile).flatMap(lambda x: parseRaw(x)).reduceByKey(lambda a, b: a + b)
rank = graphData.map(lambda x: (x[0], 1))

# Dangling 
danglers = graphData.filter(lambda x: not bool(x[1]))
totalDanglers = danglers.count()

# Diag
totalNodes = graphData.count()

# PageRank
for i in range(10):

    # Neighbor contributions 
    PR = graphData.join(rank).flatMap(lambda x: emitPR(x)).reduceByKey(lambda a, b: a + b)
    
    # Dangling PR
    danglerPR = PR.join(danglers).map(lambda x: (x[0], x[1][0]))
    danglerLoss = danglerPR.map(lambda x: x[1]).sum()

    # Dampening
    dPR = PR.map(lambda x: dampenedPR(x))

    # Normalize 
    totalWeight = dPR.map(lambda x: x[1]).sum()
    nPR = dPR.map(lambda x: (x[0], x[1] / totalWeight))

    # Cycle 
    rank = nPR

# Results 
print rank.map(lambda x: (x[1], x[0])).sortByKey(ascending=False).take(100)


Writing sparkEMR.py
