In [1]:
from pyspark import SparkConf, SparkContext
import os 

# Point both the PySpark workers (PYSPARK_PYTHON) and the driver
# (PYSPARK_DRIVER_PYTHON) at the `python3` executable.
os.environ.update(
    PYSPARK_PYTHON="python3",
    PYSPARK_DRIVER_PYTHON="python3",
)

In [2]:
# TAGS
# State labels stored as the last element of every node's data tuple,
# listed in the order a node moves through them during the search.
UNPROCESSED = 'UNPROCESSED'  # initial tag of every node except the start node
QUEUE = 'QUEUE'              # node will be expanded by the next expandGraph pass
PROCESSED = 'PROCESSED'      # node has already been expanded

In [3]:
def convertToGraph(line):
    """Parse one line of the input file into a BFS node record.

    The line is split on whitespace: the first field is the node id and every
    remaining field is the id of a connected node. All fields are converted
    with int().

    Returns:
        (id, (connections, distance, tag)). The node whose id equals the
        global `startId` starts as QUEUE with distance 0; every other node
        starts as UNPROCESSED with the placeholder distance 9999.
    """
    tokens = line.split()
    nodeId = int(tokens[0])
    neighbours = [int(token) for token in tokens[1:]]

    # The search origin is the only node that is queued from the outset.
    if nodeId == startId:
        return (nodeId, (neighbours, 0, QUEUE))

    return (nodeId, (neighbours, 9999, UNPROCESSED))

In [4]:
def expandGraph(node):
    """Expand a single node record by one BFS step (used with flatMap).

    Args:
        node: (id, (connections, distance, tag)).

    Returns:
        A list of node records. A node that is not tagged QUEUE is returned
        unchanged as a one-element list. A QUEUE node yields one record per
        connection -- (connectionId, ([], distance + 1, QUEUE)) -- followed by
        the node itself re-tagged PROCESSED.

    Side effects:
        Adds 1 to the global `hitCounter` for every connection of a QUEUE
        node that equals the global `targetId`.
    """
    nodeId = node[0]
    payload = node[1]
    neighbours = payload[0]
    hops = payload[1]
    state = payload[2]

    # Nodes that are not queued pass through untouched.
    if state != QUEUE:
        return [(nodeId, (neighbours, hops, state))]

    expanded = []
    for neighbour in neighbours:
        nextHops = hops + 1
        if neighbour == targetId:
            hitCounter.add(1)
        # The neighbour's own connections are unknown here, hence the empty
        # list; reduceGraph merges this record with the neighbour's original.
        expanded.append((neighbour, ([], nextHops, QUEUE)))

    # This node has now been explored.
    expanded.append((nodeId, (neighbours, hops, PROCESSED)))
    return expanded

In [5]:
def reduceGraph(data1, data2):
    """Merge two data tuples that belong to the same node id (used with reduceByKey).

    Args:
        data1, data2: (connections, distance, tag) tuples.

    Returns:
        (connections, distance, tag) where
        * connections is data1's list followed by data2's list,
        * distance is the smallest of the two distances, capped at 9999,
        * tag is data2's tag when it is further along the
          UNPROCESSED -> QUEUE -> PROCESSED progression than data1's tag,
          and data1's tag in every other case.
    """
    edges1, dist1, state1 = data1[0], data1[1], data1[2]
    edges2, dist2, state2 = data2[0], data2[1], data2[2]

    # Keep whichever connection lists are non-empty: records emitted by
    # expandGraph for neighbours carry [], the original record carries the
    # real adjacency list.
    merged = []
    for edges in (edges1, edges2):
        if len(edges) > 0:
            merged.extend(edges)

    # Preserve the minimum distance (9999 acts as the upper bound).
    shortest = min(9999, dist1, dist2)

    # Move queues along: take the second tag only when it is strictly more
    # advanced than the first one.
    secondIsAhead = (
        (state1 == UNPROCESSED and state2 in (QUEUE, PROCESSED))
        or (state1 == QUEUE and state2 == PROCESSED)
    )
    state = state2 if secondIsAhead else state1

    return (merged, shortest, state)

In [6]:
#Set up
# Run Spark in local mode under the application name "BreadthFirstSearch".
conf = SparkConf().setMaster("local").setAppName("BreadthFirstSearch")
# getOrCreate() returns the already-running SparkContext when there is one, so
# re-executing this cell in a live kernel no longer fails with
# "Cannot run multiple SparkContexts at once". Note that `conf` is only applied
# when a new context is actually created.
sc = SparkContext.getOrCreate(conf=conf)

In [7]:
# Search IDs. Change the start and target IDs to get different search results.
# startId: the node that convertToGraph tags QUEUE with distance 0 (the search origin).
startId = 5306
# targetId: the node whose appearance among a QUEUE node's connections counts as a hit.
targetId = 100
# Accumulator starting at 0; expandGraph adds 1 to it for every hit on targetId,
# and the main loop stops once its value is greater than 0.
hitCounter = sc.accumulator(0)

In [8]:
#Main code
# Upper bound on the number of BFS rounds, i.e. the largest distance from
# startId that is explored before giving up.
MAX_ITERATIONS = 10

# Start from a fresh counter so that re-running this cell never reports hits
# left over from a previous search.
hitCounter = sc.accumulator(0)

inputFile = sc.textFile("test-data/data.txt")
BFSrdd = inputFile.map(convertToGraph)

for iteration in range(1, MAX_ITERATIONS + 1):
    print("Running BFS iteration# " + str(iteration))

    # Expand every QUEUE node into its connections.
    # The result is consumed twice (count() below, reduceByKey() afterwards),
    # so it is cached; otherwise expandGraph -- including its hitCounter
    # update -- would be recomputed for the second use.
    mapped = BFSrdd.flatMap(expandGraph).cache()

    # count() is the action that actually runs expandGraph and therefore
    # brings hitCounter up to date for this round.
    print("Processing " + str(mapped.count()) + " values.")

    if hitCounter.value > 0:
        print("Hit the target id! From " + str(hitCounter.value)
              + " different direction(s).")
        break

    # Reducer combines data for each character ID
    BFSrdd = mapped.reduceByKey(reduceGraph)
else:
    # The loop finished without a break: the target was never reached.
    print("Target id " + str(targetId) + " was not reached within "
          + str(MAX_ITERATIONS) + " iterations.")

Running BFS iteration# 1
Processing 8330 values.
Running BFS iteration# 2
Processing 220615 values.
Hit the target id! From 4 different direction(s).
