# Notebook Set-Up

In [1]:
# imports
import re
import time
import numpy as np
import pandas as pd
import seaborn as sns
import networkx as nx
import matplotlib.pyplot as plt

In [2]:
from pyspark.sql import SparkSession
app_name = "EC"
master = "local[*]"
spark = SparkSession\
        .builder\
        .appName(app_name)\
        .master(master)\
        .getOrCreate()
sc = spark.sparkContext

23/04/26 11:56:01 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [3]:
testRDD = sc.textFile('soc-sign-bitcoinotc.csv')

In [4]:
testRDD.take(5)

                                                                                

['6,2,4,1289241911.72836',
 '6,5,2,1289241941.53378',
 '1,15,1,1289243140.39049',
 '4,3,7,1289245277.36975',
 '13,16,8,1289254254.44746']

# SSSP Implementation

![alt text](Dijkstra_Animation.gif)

The Dijkstra's algorithm is the well known algorithm of finding the shortest path for weighted graph. The above animation (source: https://en.wikipedia.org/wiki/Dijkstra's_algorithm) illustrates the algorithm. It is a BFS traversal algorithm using priority queue to keep track of the frontier nodes from iteration to iteration. In order to implement this in a distributed parallel computing environment with map and reduce, we have to avoid the global priority queue. Thus, we need to define node states and keep track of them distributedly. To this end, each node can be in one of the following state to identify its status of traversing the graph:
  - Visited
  - Queue
  - Unvisited

First, I initialize the graph with records in the form of `node_id (edges, distance_from_source, path_from_source, state)`. The source node will be initialized with distance 0 and path is the source node itself. The state of source node is `Q`, namely, the source node is initialized as the frontier node. All the other nodes are initialized with distance infinity to the source node and state of `U`, unvisited state. The initialization function is shown below.

### Initialize the Graph

In [5]:
def initGraph(dataRDD, sourceID):
    """
    Spark job to read in the raw data and initialize an 
    adjacency list representation with a record for each
    node, initial distance to sourceID, and graph traversal state.
    
    Returns: 
        graphRDD -  a pair RDD of (node_id , (out_nodes, distance, path, state))
    """
    # helper functions
    def parse(line):
        node, dst, dist, _ = line.split(',')
        dist = int(dist)
        if dist < 0:
            dist = 0
        if (node != sourceID):
            yield (node, ({dst: dist}, np.Inf, [], 'U'))
        else:
            yield (node, ({dst: dist}, 0, [node], 'Q'))
    # main Spark code
    graphRDD = dataRDD.flatMap(parse).reduceByKey(lambda x, y: (dict(x[0], **y[0]), x[1], x[2], x[3])).cache()
    return graphRDD

In [6]:
def Dijkstra(initGraphRDD, verbose = True):
    accum_Qstate = sc.accumulator(0)
    # helper function
    def update_Qstate_counter(x):
        if (x[1][3] == 'Q'):
            accum_Qstate.add(1)
    def mapper(x):
        if (x[1][3] == 'U' or x[1][3] == 'V'):
            yield x
        else: # Q state
            yield x[0], (x[1][0], x[1][1], x[1][2], 'V')
            for k,v in x[1][0].items():
                if (x[1][2]):
                    yield k, ({}, x[1][1] + v, x[1][2] + [k] , 'Q')
                else:
                    yield k, ({}, x[1][1] + v, [k], 'Q')
    def reducer(x, y):
        """Weighted Graph:
          if Q,V and distance(Q) < distance(V):    new_state = Q
            else:    new_state = V
          if Q,U:    new_state = Q """
        out_nodes = {**x[0], **y[0]}
        if (x[1] <= y[1]):
            dist = x[1]
            path = x[2]
        else:
            dist = y[1]
            path = y[2]
        if (x[3] == y[3]):
            state = x[3]
        elif (x[3] == 'V' and y[3] == 'Q'):
            if (y[1] < x[1]):
                state = 'Q'
            else:
                state = 'V'
        elif (x[3] == 'Q' and y[3] == 'V'):
            if (x[1] < y[1]):
                state = 'Q'
            else:
                state = 'V'
        elif ((x[3] == 'Q' and y[3] == 'U') or (x[3] == 'U' and y[3] == 'Q')):
            state = 'Q'
        elif ( (x[3] == 'V' and y[3] == 'U') or (x[3] == 'U' and y[3] == 'V')):
            state = 'V'
        return (out_nodes, dist, path, state)
           
            
    # main spark code
    rdd = initGraphRDD
    rdd.foreach(update_Qstate_counter)
    iteration = 0
    while (accum_Qstate.value):
        rdd = rdd.flatMap(mapper).reduceByKey(reducer).cache()
        accum_Qstate.value = 0
        rdd.foreach(update_Qstate_counter)
        if (verbose):
            iteration += 1
            print(f"At the end of iteration {iteration}, number of frontier nodes is {accum_Qstate.value}.")
    return rdd

In [7]:
%%time
sourceID = '4'
testGraph = initGraph(testRDD, sourceID )
start = time.time()
test_results = Dijkstra(testGraph, verbose = False).collect()
print(f'Completed in {time.time() - start} seconds.')
nodes = set()
edges = []
for line in test_results:
    (n, x) = line
    if (x[3] == 'V'):
        print(f'{sourceID}->{n}: distance={x[1]}, path={x[2]}')

Completed in 3.074655294418335 seconds.
4->1: distance=1, path=['4', '832', '492', '1']
4->34: distance=1, path=['4', '832', '270', '3756', '1363', '1352', '13', '34']
4->17: distance=1, path=['4', '832', '270', '3756', '1810', '4681', '1565', '17']
4->20: distance=0, path=['4', '832', '732', '3314', '135', '2877', '20']
4->375: distance=2, path=['4', '832', '732', '3314', '135', '2877', '219', '375']
4->522: distance=0, path=['4', '832', '732', '3314', '135', '2877', '522']
4->4: distance=0, path=['4']
4->3219: distance=1, path=['4', '832', '270', '3756', '1810', '4870', '4172', '1612', '3219']
4->1859: distance=1, path=['4', '832', '270', '3756', '2388', '3451', '2647', '1859']
4->3756: distance=0, path=['4', '832', '270', '3756', '1363', '1352', '1316', '3756']
4->3757: distance=0, path=['4', '832', '270', '3756', '1363', '1352', '1316', '3757']
4->3759: distance=0, path=['4', '832', '270', '3756', '1363', '1352', '2045', '3759']
4->2840: distance=1, path=['4', '832', '270', '3756',

## Bellman-Ford

### Initialize the Graph

In [8]:
def initGraph(dataRDD, sourceID):
    """
    Spark job to read in the raw data and initialize an 
    adjacency list representation with a record for each
    node, initial distance to sourceID, and graph traversal state.
    
    Returns: 
        graphRDD -  a pair RDD of (node_id , (out_nodes, distance, path, state))
    """
    # helper functions
    def parse(line):
        node, dst, dist, _ = line.split(',')
        dist = int(dist)
        if dist < 0:
            dist = 0
        if (node != sourceID):
            yield (node, ({dst: dist}, np.Inf, []))
        else:
            yield (node, ({dst: dist}, 0, [node]))
    # main Spark code
    graphRDD = dataRDD.flatMap(parse).reduceByKey(lambda x, y: (dict(x[0], **y[0]), x[1], x[2])).cache()
    return graphRDD

In [9]:
def Bellman_Ford(initGraphRDD, verbose = True):

    def mapper(x):
        yield x[0], (x[1][0], x[1][1], x[1][2])
        for k,v in x[1][0].items():
            if (x[1][2]):
                yield k, ({}, x[1][1] + v, x[1][2] + [k])
            else:
                yield k, ({}, x[1][1] + v, [k])
    def reducer(x, y):
        out_nodes = {**x[0], **y[0]}
        if (x[1] <= y[1]):
            dist = x[1]
            path = x[2]
        else:
            dist = y[1]
            path = y[2]
        return (out_nodes, dist, path)
           
            
    # main spark code
    rdd = initGraphRDD.cache()
    for i in range(testGraph.count() - 1):
        new_rdd = rdd.flatMap(mapper).reduceByKey(reducer).cache()
        if (verbose):
            print(f"At the end of iteration {i}")
        if not rdd.join(new_rdd).map(lambda x:x[1][0][1] != x[1][1][1]).reduce(lambda x, y: x or y):
            break
        rdd = new_rdd
    
    return rdd

In [10]:
%%time
sourceID = '4'
testGraph = initGraph(testRDD, sourceID)
start = time.time()
test_results2 = Bellman_Ford(testGraph, verbose = True).collect()
print(f'Completed in {time.time() - start} seconds.')
nodes = set()
edges = []
for line in test_results2:
    (n, x) = line
    print(f'{sourceID}->{n}: distance={x[1]}, path={x[2]}')

At the end of iteration 0
At the end of iteration 1
At the end of iteration 2
At the end of iteration 3
At the end of iteration 4
At the end of iteration 5
At the end of iteration 6
At the end of iteration 7
At the end of iteration 8
At the end of iteration 9
At the end of iteration 10
At the end of iteration 11
Completed in 3.9523961544036865 seconds.
4->1: distance=1, path=['4', '832', '492', '1']
4->34: distance=1, path=['4', '832', '270', '3756', '1363', '1352', '13', '34']
4->17: distance=1, path=['4', '832', '732', '3314', '135', '2877', '522', '17']
4->20: distance=0, path=['4', '832', '732', '3314', '135', '2877', '20']
4->54: distance=2, path=['4', '832', '492', '1', '54']
4->56: distance=1, path=['4', '832', '270', '3756', '1363', '1352', '13', '56']
4->74: distance=2, path=['4', '832', '492', '1', '74']
4->68: distance=1, path=['4', '832', '64', '68']
4->119: distance=2, path=['4', '832', '492', '1', '119']
4->138: distance=2, path=['4', '832', '492', '1', '138']
4->110: dis