# Spark Assignment

## Find Spark
Environment setup needed to use Spark locally. If this does not work on your machine, replace this section with whatever setup you normally use.

In [1]:
# Print the Spark-related environment variables this notebook depends on.
# Setting JAVA_HOME, SPARK_HOME, PYTHONPATH and PATH from Python was tried earlier
# (e.g. JAVA_HOME=/usr/lib/jvm/java-17-openjdk-amd64, SPARK_HOME=/opt/apache-spark),
# but that either did not work or turned out not to be needed, so the variables are
# taken from the environment Jupyter was started in.
# NOTE(review): the recorded output shows PYTHONPATH containing a literal, unexpanded
# `$SPARK_HOME`, so those entries presumably do not resolve to real directories;
# this may be why findspark is used in the next cell.
import os

for env_var in ("SPARK_HOME", "PYTHONPATH"):
    # .get() so a missing variable is reported instead of aborting the notebook
    # with a KeyError (this cell is purely diagnostic).
    print(os.environ.get(env_var, f"<{env_var} not set>"))

/opt/apache-spark
$SPARK_HOME/python:$SPARK_HOME/python/build


In [2]:
# Make the local Spark installation's `pyspark` package importable; this must run
# before the `from pyspark import ...` in the next cell. This was the most
# convenient way to make Spark work for me.
import findspark
findspark.init()

In [3]:
from pyspark import SparkConf, SparkContext

# WARNING: You may want to customise the following values to your setup.
#
# USE_FULL_CLUSTER = True targets the 3-node hardware cluster set up in class:
#   - osmium    (Librem Mini v1, Intel Core i7-8565U (4c/8t), 32 GiB DDR4) - MASTER
#   - graphite  (HP ProBook 4740s, Intel Core i7-2670QM (4c/8t), 8 GiB DDR3)
#   - bloodfest (Prodrive Hermes, Intel Xeon E-2226G (6c/6t), 32 GiB DDR4)
# USE_FULL_CLUSTER = False targets a single-node "cluster" on localhost (graphite
# acting as both master and worker). That setup is optimised for portability: no
# mini PC, workstation mainboard on a cardboard box or router to carry and set up
# every time; just start the Spark services on the laptop.
USE_FULL_CLUSTER = False

SPARK_MASTER_URL = "spark://osmium:7077" if USE_FULL_CLUSTER else "spark://127.0.0.1:7077"
conf = SparkConf().setMaster(SPARK_MASTER_URL).setAppName("My ASSignment")

In [4]:
# Reuse the already-running SparkContext if there is one (e.g. when this cell is
# re-run); otherwise create one from `conf`. `conf` is not applied to an existing
# context.
sc = SparkContext.getOrCreate(conf=conf)
# To release the cluster resources when finished, call `sc.stop()`
# (SparkContext has no `close()` method).

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
26/01/28 17:23:16 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


# Actual Assignment Begins Here

In [5]:
import networkx as nx

# Hand-made weighted, directed test graph: 11 nodes (A-K) and 17 edges, each given
# as a (source, target, weight) tuple.
WEIGHTED_EDGES = [
    ("A", "B", 3.0), ("A", "C", 10.0), ("A", "E", 4.0),
    ("B", "C", 2.0), ("B", "D", 8.0),  ("B", "F", 7.0),
    ("C", "D", 5.0), ("C", "G", 3.0),
    ("D", "H", 6.0),
    ("E", "F", 2.0), ("E", "I", 9.0),
    ("F", "G", 1.0), ("F", "J", 5.0),
    ("G", "H", 2.0), ("G", "K", 4.0),
    ("I", "J", 3.0),
    ("J", "K", 6.0),
]

G = nx.DiGraph()
G.add_weighted_edges_from(WEIGHTED_EDGES)

# Alternatively, use one of NetworkX's graph generators:
# https://networkx.org/documentation/stable/reference/generators.html
# NOTE(review): as far as I can tell these label nodes with integers (so "A" is not
# a valid start node), set no "weight" edge attribute, and mostly return undirected
# graphs or multigraphs. Check that the node-graph conversion below supports the
# result before switching.
# G = nx.barabasi_albert_graph(100, 3)
# G = nx.watts_strogatz_graph(100, 4, 0.1)
# G = nx.random_geometric_graph(100, 0.1)
# G = nx.random_k_out_graph(100, 4, 0.5)
# G = nx.random_k_graph(100, 4)
# G = nx.random_power_law_graph(100, 3, 0.1)

In [6]:
import math

class GraphNode:
    """Mutable per-node state used by the shortest-path solvers.

    Attributes:
        name: identifier of the node (a city name in this notebook).
        neighbours: dict mapping neighbour name -> edge weight.
        distance: best distance from the start node found so far; starts at
            ``math.inf``.
        visited: True once a solver has settled this node.
        path: node names from the start node up to and including this node once
            settled; before that it may be shared with the predecessor's list.
    """

    def __init__(self, name, neighbours):
        self.name, self.neighbours = name, neighbours
        # Unreached until a solver relaxes an edge into this node.
        self.distance, self.visited, self.path = math.inf, False, []

    def __str__(self):
        state = (self.neighbours, self.distance, self.visited, self.path)
        return "('{}', ({}, {}, {}, {}))".format(self.name, *state)

def to_node_graph(nx_graph, start_city, weight="weight", default_weight=1.0):
    """Convert a NetworkX graph into the ``{name: GraphNode}`` dict the solvers use.

    Args:
        nx_graph: NetworkX ``Graph`` or ``DiGraph``. For a DiGraph each node's
            neighbours are its successors; for an undirected Graph every edge can
            be travelled in both directions.
        start_city: node the search starts from; its distance is set to 0.
        weight: name of the edge attribute holding the edge length.
        default_weight: length used for edges that lack the ``weight`` attribute
            (e.g. graphs from many NetworkX generators). NetworkX's own
            shortest-path functions likewise treat a missing weight as 1.

    Returns:
        dict mapping every node to a fresh GraphNode, in ``nx_graph.nodes()`` order.

    Raises:
        ValueError: if ``nx_graph`` is a multigraph (parallel edges unsupported).
        KeyError: if ``start_city`` is not a node of ``nx_graph``.
    """
    if nx_graph.is_multigraph():
        # A multigraph's adjacency maps neighbour -> {edge_key: attrs}, so looking
        # up `weight` there would silently fall back to default_weight.
        raise ValueError("Multigraphs are not supported; convert to a Graph/DiGraph first")
    # `adj[node]` maps each neighbour (successor, for a DiGraph) to its edge
    # attribute dict, which works for both directed and undirected graphs.
    graph = {
        node: GraphNode(
            node,
            {nbr: attrs.get(weight, default_weight) for nbr, attrs in nx_graph.adj[node].items()},
        )
        for node in nx_graph.nodes()
    }
    start_node = graph.get(start_city)
    if start_node is None:
        raise KeyError(f"City '{start_city}' not in graph")
    start_node.distance = 0
    return graph

def make_node_graph(start_city="A"):
    """Return a freshly built node graph for the notebook's graph ``G``.

    A new dict of new GraphNode objects is built on every call, because
    ``sequential_solver`` updates the nodes it is given in place.

    Args:
        start_city: node to start the search from (defaults to "A", the
            previously hard-coded choice).
    """
    return to_node_graph(G, start_city)

In [7]:
# Show the initial state of every node as (name, (neighbours, distance, visited, path)).
for city_name, node_state in make_node_graph().items():
    print(city_name, node_state, sep=": ")

A: ('A', ({'B': 3.0, 'C': 10.0, 'E': 4.0}, 0, False, []))
B: ('B', ({'C': 2.0, 'D': 8.0, 'F': 7.0}, inf, False, []))
C: ('C', ({'D': 5.0, 'G': 3.0}, inf, False, []))
E: ('E', ({'F': 2.0, 'I': 9.0}, inf, False, []))
D: ('D', ({'H': 6.0}, inf, False, []))
F: ('F', ({'G': 1.0, 'J': 5.0}, inf, False, []))
G: ('G', ({'H': 2.0, 'K': 4.0}, inf, False, []))
H: ('H', ({}, inf, False, []))
I: ('I', ({'J': 3.0}, inf, False, []))
J: ('J', ({'K': 6.0}, inf, False, []))
K: ('K', ({}, inf, False, []))


## Sequential Implementation
Used as a reference to verify the parallel implementation.

In [8]:
def sequential_solver(graph):
    """Single-machine reference implementation of Dijkstra's shortest paths.

    Uses the simple O(V^2) form: every round scans all unvisited nodes for the
    one with the smallest tentative distance, settles it and relaxes its
    outgoing edges.

    Args:
        graph: dict mapping node name -> GraphNode, with the start node's
            distance already set to 0. The nodes are updated IN PLACE.

    Returns:
        The nodes of ``graph`` as a list sorted by name. Every node ends up
        visited; nodes that were never reached keep their initial distance and
        get a path containing only themselves.
    """
    while True:
        pending = [node for node in graph.values() if not node.visited]
        if not pending:
            return sorted(graph.values(), key=lambda node: node.name)

        # min() keeps the first candidate on ties, so dict order breaks ties.
        current = min(pending, key=lambda node: node.distance)
        current.visited = True
        # The path may still be shared with the predecessor (see below), so build
        # a new list rather than appending in place.
        current.path = current.path + [current.name]

        for nbr_name, edge_weight in current.neighbours.items():
            target = graph.get(nbr_name)
            if target is not None and not target.visited:
                candidate = current.distance + edge_weight
                if candidate < target.distance:
                    target.distance = candidate
                    # Share the predecessor's list; it is copied only once
                    # `target` is settled, avoiding a copy per relaxation.
                    target.path = current.path

# Solve on a freshly built node graph: sequential_solver updates its GraphNode objects in place.
seq_node_graph = sequential_solver(make_node_graph())

## Parallel Implementation

In [9]:
def parallel_solver(in_graph, spark_context=None):
    """Dijkstra's shortest paths on Spark, settling one node per Spark round-trip.

    Each iteration ships the current node list to the cluster, picks the unvisited
    node with the smallest tentative distance (``fold``), relaxes that node's
    outgoing edges on the workers, merges the improved nodes back by name
    (``leftOuterJoin``) and collects the new node list to the driver.

    Args:
        in_graph: dict mapping node name -> GraphNode, with the start node's
            distance already set to 0 (e.g. from ``to_node_graph``). The dict
            itself is not modified.
        spark_context: SparkContext to run on. Defaults to the notebook-global
            ``sc``, so existing calls keep working.

    Returns:
        list of GraphNode sorted by name, with final ``distance``, ``visited`` and
        ``path``. An empty ``in_graph`` yields an empty list.

    Notes:
        Assumes non-negative edge weights, as Dijkstra's algorithm requires. When
        several nodes tie for the smallest distance, which one ``fold`` returns
        depends on how the data is partitioned. The settle order of tied nodes,
        and therefore the path chosen among equally short ones, may then differ
        from ``sequential_solver``.
    """
    ctx = sc if spark_context is None else spark_context

    def closer(u, v):
        # Reduction for ``fold``: keep the node with the smaller distance (``u`` on
        # ties). ``None`` is the fold's zero value, so either side may be ``None``.
        if u is None:
            return v
        if v is None:
            return u
        return v if u.distance > v.distance else u

    def to_kv(node):
        return (node.name, node)

    def prefer_update(joined):
        # leftOuterJoin yields (name, (base_node, updated_node_or_None)).
        _, (base, update) = joined
        return base if update is None else update

    def one_iteration(graph):
        # Build a fresh RDD from the driver-side list every iteration. Deriving it
        # from the previous iteration's RDD would keep growing the lineage, so
        # every action would recompute all earlier iterations.
        rdd = ctx.parallelize(graph)
        unvisited = rdd.filter(lambda node: not node.visited)

        closest_node = unvisited.fold(None, closer)
        if closest_node is None:
            return rdd.collect(), True

        # closest_node is a driver-side copy; the modified version reaches the
        # workers through the closures below and through the explicit union.
        closest_node.visited = True
        # Build a new list: the path may still be shared with the predecessor.
        closest_node.path = closest_node.path + [closest_node.name]

        def with_edge_weight(node):
            # Pair a node with the weight of edge closest_node -> node (or None).
            return (node, closest_node.neighbours.get(node.name))

        def improves(pair):
            node, weight = pair
            return (
                weight is not None
                # Skip closest_node's own (still unvisited) copy, so a self-loop
                # can never put it into the join's right-hand side twice.
                and node.name != closest_node.name
                and closest_node.distance + weight < node.distance
            )

        def apply_update(pair):
            node, weight = pair
            node.distance = closest_node.distance + weight
            # Shared with closest_node; copied only when this node gets settled.
            node.path = closest_node.path
            return node

        # RDDs are immutable, so the relaxed nodes form a new RDD that overrides
        # the base node list by name, together with the now-visited closest_node.
        updated = unvisited.map(with_edge_weight).filter(improves).map(apply_update)
        overrides = updated.map(to_kv).union(ctx.parallelize([to_kv(closest_node)]))
        merged = rdd.map(to_kv).leftOuterJoin(overrides).map(prefer_update)
        return merged.collect(), False

    # list() so that an empty input still yields a sortable list (a dict view
    # has no .sort()).
    graph = list(in_graph.values())
    # Each iteration settles exactly one node, so len(graph) iterations suffice.
    for _ in range(len(graph)):
        graph, done = one_iteration(graph)
        if done:
            break
    # Sorted so that comparisons with the sequential solver are easy to make.
    graph.sort(key=lambda node: node.name)
    return graph

# Run the Spark version on its own freshly built node graph (the output below is Spark's progress bar residue).
par_node_graph = parallel_solver(make_node_graph())

                                                                                

## Results Comparison
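Both implementations should produce the same distances. The paths should also match, except possibly where several routes are equally short.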

In [11]:
import math


def print_nodes(label, nodes):
    """Print a heading, then one "name: distance, visited, path" line per node."""
    print(f"{label}:")
    for node in nodes:
        print(f"{node.name}: {node.distance}, {node.visited}, {node.path}")


print_nodes("Seq", seq_node_graph)
print_nodes("Par", par_node_graph)

# Verify the results agree instead of comparing the printout by eye. Only names,
# distances and visited flags are compared. When several nodes tie for the smallest
# distance, the two solvers may settle them in a different order (the Spark fold
# does not follow dict order), which can yield a different but equally short path.
assert [n.name for n in seq_node_graph] == [n.name for n in par_node_graph]
assert all(
    math.isclose(s.distance, p.distance) and s.visited == p.visited
    for s, p in zip(seq_node_graph, par_node_graph)
), "sequential and parallel solvers disagree"

Seq:
A: 0, True, ['A']
B: 3.0, True, ['A', 'B']
C: 5.0, True, ['A', 'B', 'C']
D: 10.0, True, ['A', 'B', 'C', 'D']
E: 4.0, True, ['A', 'E']
F: 6.0, True, ['A', 'E', 'F']
G: 7.0, True, ['A', 'E', 'F', 'G']
H: 9.0, True, ['A', 'E', 'F', 'G', 'H']
I: 13.0, True, ['A', 'E', 'I']
J: 11.0, True, ['A', 'E', 'F', 'J']
K: 11.0, True, ['A', 'E', 'F', 'G', 'K']
Par:
A: 0, True, ['A']
B: 3.0, True, ['A', 'B']
C: 5.0, True, ['A', 'B', 'C']
D: 10.0, True, ['A', 'B', 'C', 'D']
E: 4.0, True, ['A', 'E']
F: 6.0, True, ['A', 'E', 'F']
G: 7.0, True, ['A', 'E', 'F', 'G']
H: 9.0, True, ['A', 'E', 'F', 'G', 'H']
I: 13.0, True, ['A', 'E', 'I']
J: 11.0, True, ['A', 'E', 'F', 'J']
K: 11.0, True, ['A', 'E', 'F', 'G', 'K']
