<a href="https://colab.research.google.com/github/Pere91/SAC_Indirect/blob/main/SAC_Spark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [103]:
# Refresh the apt package index, then install a headless OpenJDK 17 (output silenced).
!apt-get update -qq
!apt-get install openjdk-17-jdk-headless -qq > /dev/null
# Download the Spark 3.5.7 (Hadoop 3 build) tarball and unpack it into the
# current working directory.
# NOTE(review): re-running this cell presumably downloads the tarball again, and
# the dlcdn URL may stop resolving once this release is superseded - confirm.
!wget -q https://dlcdn.apache.org/spark/spark-3.5.7/spark-3.5.7-bin-hadoop3.tgz
!tar xf spark-3.5.7-bin-hadoop3.tgz

W: Skipping acquire of configured file 'main/source/Sources' as repository 'https://r2u.stat.illinois.edu/ubuntu jammy InRelease' does not seem to provide it (sources.list entry misspelt?)


In [104]:
import os
# Tell PySpark where the JDK and the unpacked Spark distribution live.
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-17-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.5.7-bin-hadoop3"

# Put Spark's launcher scripts on PATH. The entry is appended only when it is
# missing, so re-running this cell does not keep growing PATH with duplicates,
# and an unset PATH is tolerated instead of raising KeyError.
_spark_bin = os.path.join(os.environ["SPARK_HOME"], "bin")
_current_path = os.environ.get("PATH", "")
if _spark_bin not in _current_path.split(os.pathsep):
  os.environ["PATH"] = (
      _current_path + os.pathsep + _spark_bin if _current_path else _spark_bin
  )

In [105]:
from pyspark import SparkConf, SparkContext
import networkx as nx

In [106]:
# Configure Spark with the "local" master under the application name "SAC_Spark".
conf = SparkConf()
conf.setMaster("local")
conf.setAppName("SAC_Spark")

# Reuse the active SparkContext if one exists, otherwise create it from `conf`.
sc = SparkContext.getOrCreate(conf=conf)

In [107]:
# Directed, weighted example graph. Each edge is (source, target, weight) and
# appears exactly once; the rows are grouped by source node.
G = nx.DiGraph()
G.add_weighted_edges_from([
    ("A", "C", 3.0), ("A", "F", 2.0),
    ("C", "D", 4.0), ("C", "F", 2.0), ("C", "E", 1.0),
    ("F", "E", 3.0), ("F", "G", 5.0), ("F", "B", 6.0),
    ("E", "B", 2.0),
    ("D", "B", 1.0),
    ("G", "B", 2.0)
])

In [108]:
def get_neighbors(node):
  """Return the neighbor list stored in a node record.

  Args:
    node: A ``(name, (neighbors, cost, visited, path))`` record.

  Returns:
    The first field of the record's state tuple, i.e. its neighbors.
  """
  state = node[1]
  return state[0]

In [109]:
def mark_visited(node):
  """Return a new node record equal to ``node`` but flagged as visited.

  Args:
    node: A ``(name, (neighbors, cost, visited, path))`` record.

  Returns:
    A fresh ``(name, (neighbors, cost, True, path))`` tuple; ``node`` itself is
    left untouched.
  """
  name = node[0]
  state = node[1]
  neighbors, cost, path = state[0], state[1], state[3]
  return (name, (neighbors, cost, True, path))

In [110]:
def generate_path(src, dst):
  """Build the path to ``dst`` by extending the path already stored in ``src``.

  Args:
    src: Node record whose stored path (fourth state field) is the prefix.
    dst: Node record whose name is appended to that prefix.

  Returns:
    A new list: the path of ``src`` followed by the name of ``dst``.
  """
  prefix = src[1][3]
  target_name = dst[0]
  return prefix + [target_name]

In [111]:
def explored(node):
  """Return the visited flag of a node record.

  Args:
    node: A ``(name, (neighbors, cost, visited, path))`` record.

  Returns:
    The third field of the record's state tuple.
  """
  state = node[1]
  visited = state[2]
  return visited

In [112]:
def dijkstra(src):
  """Compute single-source shortest paths over the global graph ``G`` with Spark.

  Every node is handled as a record ``(name, (neighbors, cost, visited, path))``:
  ``neighbors`` is a list of ``(successor, edge_weight)`` pairs, ``cost`` is the
  best known distance from ``src``, ``visited`` tells whether the node has been
  settled, and ``path`` is the node sequence from ``src`` to the node.

  Edge weights are assumed to be non-negative, as Dijkstra's algorithm requires.
  A neighbor is only updated when the new cost is strictly lower than its
  current one, so on equal-cost alternatives the first path found is kept.

  Args:
    src: Name of the starting node.

  Returns:
    A list with one ``(name, (neighbors, cost, visited, path))`` record per node
    of ``G``. Nodes that cannot be reached from ``src`` keep
    ``cost == float("inf")`` and an empty ``path``; if ``src`` is not a node of
    ``G``, that is the case for every node.
  """
  # Driver-side state of every node, keyed by name. It is the single source of
  # truth from which the RDD is rebuilt after each iteration.
  graph_dict = {}
  for node in G.nodes():
    neighbors = [(nbr, G.edges[node, nbr]["weight"]) for nbr in G.successors(node)]
    if node == src:
      graph_dict[node] = (neighbors, 0, False, [src])
    else:
      graph_dict[node] = (neighbors, float("inf"), False, [])

  vertices = sc.parallelize(list(graph_dict.items()))

  # Settle one node per iteration until none is left unvisited.
  while True:
    unvisited_nodes = vertices.filter(lambda x: not explored(x))
    if unvisited_nodes.isEmpty():
      break

    # Pick the cheapest unvisited node and settle it.
    next_node = unvisited_nodes.reduce(lambda x, y: x if x[1][1] < y[1][1] else y)
    next_node = mark_visited(next_node)
    base_cost = next_node[1][1]

    # Edge weights leaving the settled node, keyed by successor name.
    neighbor_dict = dict(get_neighbors(next_node))

    # Keep only successors that get strictly cheaper through the settled node.
    # The strict comparison matters: with `<=`, an unreachable settled node
    # (cost inf) would "improve" other inf-cost nodes and give them bogus paths,
    # and already-settled nodes could be rewritten.
    improved = vertices.filter(
        lambda x: x[0] in neighbor_dict
        and neighbor_dict[x[0]] + base_cost < x[1][1]
    )

    # New cost and path for every improved successor.
    updated_neighbors = improved.map(
        lambda x: (
            x[0],
            (x[1][0], neighbor_dict[x[0]] + base_cost, x[1][2], generate_path(next_node, x)),
        )
    ).collect()

    # Apply the relaxations first and record the settled node last, so nothing
    # can overwrite its visited flag (which would keep the loop from ending).
    graph_dict.update(dict(updated_neighbors))
    graph_dict[next_node[0]] = next_node[1]
    vertices = sc.parallelize(list(graph_dict.items()))

  return vertices.collect()


In [113]:
# Shortest paths from node 'A'; each entry is (node, (neighbors, cost, visited, path)).
dijkstra('A')

[('A', ([('C', 3.0), ('F', 2.0)], 0, True, ['A'])),
 ('C', ([('D', 4.0), ('F', 2.0), ('E', 1.0)], 3.0, True, ['A', 'C'])),
 ('F', ([('E', 3.0), ('G', 5.0), ('B', 6.0)], 2.0, True, ['A', 'F'])),
 ('D', ([('B', 1.0)], 7.0, True, ['A', 'C', 'D'])),
 ('E', ([('B', 2.0)], 4.0, True, ['A', 'C', 'E'])),
 ('G', ([('B', 2.0)], 7.0, True, ['A', 'F', 'G'])),
 ('B', ([], 6.0, True, ['A', 'C', 'E', 'B']))]