In [1]:
# %load degrees-of-separation.py
#Boilerplate stuff:
from pyspark import SparkConf, SparkContext

conf = SparkConf().setMaster("local").setAppName("DegreesOfSeparation")
sc = SparkContext(conf = conf)

In [2]:
# The characters we wish to find the degree of separation between:
startCharacterID = 5306 #SpiderMan
targetCharacterID = 14  #ADAM 3,031 (who?)

# Our accumulator, used to signal when we find the target character during
# our BFS traversal.
hitCounter = sc.accumulator(0)

In [13]:
# 5983, 1165, 3386, 4361, 1282 => (5983, (1165, 3836, 4361, 1282), 9999, WHITE) 
def convertToBFS(line):
    fields = line.split()
    heroID = int(fields[0])
    connections = []
    for connection in fields[1:]:
        connections.append(int(connection))

    color = 'WHITE'
    distance = 9999

    if (heroID == startCharacterID):
        color = 'GRAY'
        distance = 0

    return (heroID, (connections, distance, color))

In [14]:
def createStartingRdd():
    inputFile = sc.textFile("file:///home/dmadhok/spark_course/Marvel-Graph.txt")
#     inputFile = sc.textFile("file:///sparkcourse/marvel-graph.txt")
    return inputFile.map(convertToBFS)

In [15]:
def bfsReduce(data1, data2):
    edges1 = data1[0]
    edges2 = data2[0]
    distance1 = data1[1]
    distance2 = data2[1]
    color1 = data1[2]
    color2 = data2[2]

    distance = 9999
    color = color1
    edges = []

    # See if one is the original node with its connections.
    # If so preserve them.
    if (len(edges1) > 0):
        edges.extend(edges1)
    if (len(edges2) > 0):
        edges.extend(edges2)

    # Preserve minimum distance
    if (distance1 < distance):
        distance = distance1

    if (distance2 < distance):
        distance = distance2

    # Preserve darkest color
    if (color1 == 'WHITE' and (color2 == 'GRAY' or color2 == 'BLACK')):
        color = color2

    if (color1 == 'GRAY' and color2 == 'BLACK'):
        color = color2

    if (color2 == 'WHITE' and (color1 == 'GRAY' or color1 == 'BLACK')):
        color = color1

    if (color2 == 'GRAY' and color1 == 'BLACK'):
        color = color1

    return (edges, distance, color)

In [16]:
def bfsMap(node):
    characterID = node[0]
    data = node[1]
    connections = data[0]
    distance = data[1]
    color = data[2]

    results = []

    #If this node needs to be expanded...
    if (color == 'GRAY'):
        for connection in connections:
            newCharacterID = connection
            newDistance = distance + 1
            newColor = 'GRAY'
            if (targetCharacterID == connection):
                hitCounter.add(1)

            newEntry = (newCharacterID, ([], newDistance, newColor))
            results.append(newEntry)

        #We've processed this node, so color it black
        color = 'BLACK'

    #Emit the input node so we don't lose it.
    results.append( (characterID, (connections, distance, color)) )
    return results

In [25]:
#Main program here:
iterationRdd = createStartingRdd()
print ("Starting RDD: ")
print (iterationRdd.take(5))
print ("\n")

for iteration in range(0, 10):
    print("Running BFS iteration# " + str(iteration+1))

    # Create new vertices as needed to darken or reduce distances in the
    # reduce stage. If we encounter the node we're looking for as a GRAY
    # node, increment our accumulator to signal that we're done.
    mapped = iterationRdd.flatMap(bfsMap)
    print (mapped.take(5))

    # Note that mapped.count() action here forces the RDD to be evaluated, and
    # that's the only reason our accumulator is actually updated.
    print("Processing " + str(mapped.count()) + " values.")

    if (hitCounter.value > 0):
        print("Hit the target character! From " + str(hitCounter.value) \
            + " different direction(s).")
        break

    # Reducer combines data for each character ID, preserving the darkest
    # color and shortest path.
    iterationRdd = mapped.reduceByKey(bfsReduce)

Starting RDD: 
[(5988, ([748, 1722, 3752, 4655, 5743, 1872, 3413, 5527, 6368, 6085, 4319, 4728, 1636, 2397, 3364, 4001, 1614, 1819, 1585, 732, 2660, 3952, 2507, 3891, 2070, 2239, 2602, 612, 1352, 5447, 4548, 1596, 5488, 1605, 5517, 11, 479, 2554, 2043, 17, 865, 4292, 6312, 473, 534, 1479, 6375, 4456], 9999, 'WHITE')), (5989, ([4080, 4264, 4446, 3779, 2430, 2297, 6169, 3530, 3272, 4282, 6432, 2548, 4140, 185, 105, 3878, 2429, 1334, 4595, 2767, 3956, 3877, 4776, 4946, 3407, 128, 269, 5775, 5121, 481, 5516, 4758, 4053, 1044, 1602, 3889, 1535, 6038, 533, 3986], 9999, 'WHITE')), (5982, ([217, 595, 1194, 3308, 2940, 1815, 794, 1503, 5197, 859, 5096, 6039, 2664, 651, 2244, 528, 284, 1449, 1097, 1172, 1092, 108, 3405, 5204, 387, 4607, 4545, 3705, 4930, 1805, 4712, 4404, 247, 4754, 4427, 1845, 536, 5795, 5978, 533, 3984, 6056], 9999, 'WHITE')), (5983, ([1165, 3836, 4361, 1282, 716, 4289, 4646, 6300, 5084, 2397, 4454, 1913, 5861, 5485], 9999, 'WHITE')), (5980, ([2731, 3712, 1587, 6084, 2472, 254