In [1]:
from pyspark import SparkContext
import dijkstra_spark as ds
import time

sc = SparkContext()

In [2]:
textFile = sc.textFile("../data/input.dat")

count = sc.accumulator(0)

In [3]:
nodes = textFile.map(lambda node: ds.get_keyvalue_from_line(node))
nodes.collect()

[('1', (0, ['2,10', '3,5'], '1')),
 ('2', (999, ['3,2', '4,1'], '2')),
 ('3', (999, ['2,3', '4,9', '5,2'], '3')),
 ('4', (999, ['5,4'], '4')),
 ('5', (999, ['1,7', '4,6', '6,2'], '5')),
 ('6', (999, [], '6'))]

In [4]:
nodesValues = nodes.map(lambda x: x[1])
nodesValues.collect()

[(0, ['2,10', '3,5'], '1'),
 (999, ['3,2', '4,1'], '2'),
 (999, ['2,3', '4,9', '5,2'], '3'),
 (999, ['5,4'], '4'),
 (999, ['1,7', '4,6', '6,2'], '5'),
 (999, [], '6')]

In [5]:
neighbors = nodesValues.filter(lambda nodeDataFilter: nodeDataFilter[1]).map(
        lambda nodeData: map(
            lambda neighbor: ds.customSplitNeighbor(
                nodeData[2], nodeData[0], neighbor
            ), nodeData[1]
        )
    ).flatMap(lambda x: x)

neighbors.collect()

[('2', (10, None, '1->2')),
 ('3', (5, None, '1->3')),
 ('3', (1001, None, '2->3')),
 ('4', (1000, None, '2->4')),
 ('2', (1002, None, '3->2')),
 ('4', (1008, None, '3->4')),
 ('5', (1001, None, '3->5')),
 ('5', (1003, None, '4->5')),
 ('1', (1006, None, '5->1')),
 ('4', (1005, None, '5->4')),
 ('6', (1001, None, '5->6'))]

In [6]:
mapper = nodes.union(neighbors)
mapper.collect()

[('1', (0, ['2,10', '3,5'], '1')),
 ('2', (999, ['3,2', '4,1'], '2')),
 ('3', (999, ['2,3', '4,9', '5,2'], '3')),
 ('4', (999, ['5,4'], '4')),
 ('5', (999, ['1,7', '4,6', '6,2'], '5')),
 ('6', (999, [], '6')),
 ('2', (10, None, '1->2')),
 ('3', (5, None, '1->3')),
 ('3', (1001, None, '2->3')),
 ('4', (1000, None, '2->4')),
 ('2', (1002, None, '3->2')),
 ('4', (1008, None, '3->4')),
 ('5', (1001, None, '3->5')),
 ('5', (1003, None, '4->5')),
 ('1', (1006, None, '5->1')),
 ('4', (1005, None, '5->4')),
 ('6', (1001, None, '5->6'))]

In [7]:
reducer = mapper.reduceByKey(lambda x, y: ds.min_distance(x, y))
reducer.collect()

[('4', (999, ['5,4'], '4')),
 ('3', (5, ['2,3', '4,9', '5,2'], '1->3')),
 ('6', (999, [], '6')),
 ('1', (0, ['2,10', '3,5'], '1')),
 ('2', (10, ['3,2', '4,1'], '1->2')),
 ('5', (999, ['1,7', '4,6', '6,2'], '5'))]

In [8]:
nodes2 = reducer.map(lambda node: ds.customSplitNodesIterative(node))
nodes2.collect()

[('4', (999, ['5,4'], '4')),
 ('3', (5, ['2,3', '4,9', '5,2'], '1->3')),
 ('6', (999, [], '6')),
 ('1', (0, ['2,10', '3,5'], '1')),
 ('2', (10, ['3,2', '4,1'], '1->2')),
 ('5', (999, ['1,7', '4,6', '6,2'], '5'))]

In [9]:
start_time = time.time()
nodes = textFile.map(lambda node: ds.get_keyvalue_from_line(node))

oldCount = 0
iterations = 0
while True:
    iterations += 1
    nodesValues = nodes.map(lambda x: x[1])
    neighbors = nodesValues.filter(lambda nodeDataFilter: nodeDataFilter[1]).map(
        lambda nodeData: map(
            lambda neighbor: ds.customSplitNeighbor(
                nodeData[2], nodeData[0], neighbor
            ), nodeData[1]
        )
    ).flatMap(lambda x: x)
    mapper = nodes.union(neighbors)
    reducer = mapper.reduceByKey(lambda x, y: ds.min_distance(x, y))
    nodes = reducer.map(lambda node: ds.customSplitNodesIterative(node))
    if oldCount == count.value:
        break
    oldCount=count.value

print(f'Finished after: {iterations} iterations in {round(time.time() - start_time, 2)}s')
result = reducer.map(lambda node: ds.format_result(node))

Finished after: 1 iterations in 0.06s


In [10]:
result.collect()

[('4', 999, '4'),
 ('3', 5, '1->3'),
 ('6', 999, '6'),
 ('1', 0, '1'),
 ('2', 10, '1->2'),
 ('5', 999, '5')]