In [1]:
from pyspark import SparkConf, SparkContext
conf = SparkConf().setMaster("local").setAppName("DegreesOfSeparation")
sc = SparkContext(conf = conf)

In [2]:
# Set the characters we wish to find the degree of separation between 

In [41]:
startCharacterID = 5306 # Spiderman
targetCharacterID = 467 # BERSERKER II 859 #CAPTAIN AMERICA 14 # Adam

An accumulator allows many executors to increment a shared variable. 
counter = sc.accumulator(0) sets up a shared accumulator with initial value of 0. We can increment this value upon iteration and check for specific conditions once after iteration.

In [42]:
counter = sc.accumulator(0)

Breadth-First Search(BFS)

In [43]:
# BFS implementation

def convertToBFS(line):
    fields = line.split()
    heroID = int(fields[0])
    connections = []
    for connection in fields[1:]:
        connections.append(int(connection))
    color = 'WHITE'
    distance = 9999
    if(heroID == startCharacterID):
        color = 'GRAY'
    distance = 0
    return (heroID,(connections,distance,color))

In [44]:
def createStartingRdd():
    inputFile = sc.textFile("file:///C:/Users/dennis/Spark/Spark/resources/Marvel-Graph.txt")
    return inputFile.map(convertToBFS)

In [45]:
def bfsMap(node):
    characterID = node[0]
    data = node[1]
    connections = data[0]
    distance = data[1]
    color = data[2]

    results = []
    #If this node needs to be expanded...
    if (color == 'GRAY'):
        for connection in connections:
            newCharacterID = connection
            newDistance = distance + 1
            newColor = 'GRAY'
            if (targetCharacterID == connection):
                counter.add(1)

            newEntry = (newCharacterID, ([], newDistance, newColor))
            results.append(newEntry)

        #We've processed this node, so color it black
        color = 'BLACK'

    #Emit the input node so we don't lose it.
    results.append( (characterID, (connections, distance, color)) )
    return results

In [46]:
def bfsReduce(data1, data2):
    edges1 = data1[0]
    edges2 = data2[0]
    distance1 = data1[1]
    distance2 = data2[1]
    color1 = data1[2]
    color2 = data2[2]

    distance = 9999
    color = color1
    edges = []

    # See if one is the original node with its connections.
    # If so preserve them.
    if (len(edges1) > 0):
        edges.extend(edges1)
    if (len(edges2) > 0):
        edges.extend(edges2)

    # Preserve minimum distance
    if (distance1 < distance):
        distance = distance1

    if (distance2 < distance):
        distance = distance2

    # Preserve darkest color
    if (color1 == 'WHITE' and (color2 == 'GRAY' or color2 == 'BLACK')):
        color = color2

    if (color1 == 'GRAY' and color2 == 'BLACK'):
        color = color2

    if (color2 == 'WHITE' and (color1 == 'GRAY' or color1 == 'BLACK')):
        color = color1

    if (color2 == 'GRAY' and color1 == 'BLACK'):
        color = color1

    return (edges, distance, color)

In [47]:
#Main program here:
iterationRdd = createStartingRdd()

In [48]:
for iteration in range(0, 100):
    print("Running BFS iteration# " + str(iteration+1))

    # Create new vertices as needed to darken or reduce distances in the
    # reduce stage. If we encounter the node we're looking for as a GRAY
    # node, increment our accumulator to signal that we're done.
    mapped = iterationRdd.flatMap(bfsMap)

    # Note that mapped.count() action here forces the RDD to be evaluated, and
    # that's the only reason our accumulator is actually updated.
    print("Processing " + str(mapped.count()) + " values.")

    if (counter.value > 0):
        print("Hit the target character! From " + str(counter.value) \
            + " different direction(s).")
        break

    # Reducer combines data for each character ID, preserving the darkest
    # color and shortest path.
    iterationRdd = mapped.reduceByKey(bfsReduce)

Running BFS iteration# 1
Processing 8330 values.
Running BFS iteration# 2
Processing 220615 values.
Running BFS iteration# 3
Processing 126747 values.
Running BFS iteration# 4
Processing 6777 values.
Running BFS iteration# 5
Processing 6486 values.
Running BFS iteration# 6
Processing 6486 values.
Running BFS iteration# 7
Processing 6486 values.
Running BFS iteration# 8
Processing 6486 values.
Running BFS iteration# 9
Processing 6486 values.
Running BFS iteration# 10
Processing 6486 values.
Running BFS iteration# 11
Processing 6486 values.
Running BFS iteration# 12
Processing 6486 values.
Running BFS iteration# 13
Processing 6486 values.
Running BFS iteration# 14
Processing 6486 values.
Running BFS iteration# 15
Processing 6486 values.
Running BFS iteration# 16
Processing 6486 values.
Running BFS iteration# 17
Processing 6486 values.
Running BFS iteration# 18
Processing 6486 values.
Running BFS iteration# 19
Processing 6486 values.
Running BFS iteration# 20
Processing 6486 values.
Runni