**Question 3**

In your local machine’s Spark setup, develop a PySpark program using PySpark RDD APIs to perform the following tasks. 

Show your full PySpark program and provide screenshots and results for all key steps where applicable.

Data sources used in this question are: 

**(i) 5-node-graph.txt,**

**(ii) 20-node-graph.txt,**

**(iii) 40-node-graph.txt.**

Note that these data files can be downloaded from the ICT337 Canvas webpage.

Based on the PySpark framework, we would like to implement Dijkstra’s algorithm:

https://en.wikipedia.org/wiki/Dijkstra%27s_algorithm

The goal is to compute the shortest path from a given source node to every other (destination) node in a weighted graph.
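Before distributing the computation, it can help to have a single-machine baseline to check the Spark output against. The sketch below is a standard heap-based Dijkstra using Python's `heapq`; the adjacency dictionary `GRAPH_5` is transcribed by hand from the 5-node-graph.txt sample listed in the setup cell, and both `GRAPH_5` and `dijkstra_reference` are illustrative names rather than part of the assignment.

```python
import heapq

# Adjacency list transcribed from the 5-node-graph.txt sample: node -> [(neighbour, weight), ...]
GRAPH_5 = {
    1: [(2, 10), (3, 5)],
    2: [(3, 2), (4, 1)],
    3: [(2, 3), (4, 9), (5, 2)],
    4: [(5, 4)],
    5: [(1, 7), (4, 6)],
}


def dijkstra_reference(graph, source):
    """Heap-based Dijkstra; returns {node: (shortest distance, path from source)}."""
    settled = {}
    heap = [(0, [source])]  # entries are (distance so far, path ending at that node)
    while heap:
        dist, path = heapq.heappop(heap)
        node = path[-1]
        if node in settled:  # stale entry: a shorter path to this node was already settled
            continue
        settled[node] = (dist, path)
        for neighbour, weight in graph.get(node, []):
            if neighbour not in settled:
                heapq.heappush(heap, (dist + weight, path + [neighbour]))
    return settled


result = dijkstra_reference(GRAPH_5, 1)
for node in sorted(result):
    dist, path = result[node]
    print(node, dist, "→".join(map(str, path)))
```

Nodes that never appear in `result` are unreachable from the source, which is the same information Question 3(e) asks for.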

**Resources:**

Dijkstra's algorithm in 3 minutes

https://www.youtube.com/watch?v=_lHSawdgXpI

Graph Data Structure 4. Dijkstra’s Shortest Path Algorithm

https://www.youtube.com/watch?v=pVfj6mxhdMw

In [8]:
from pyspark.sql import SparkSession

from pyspark.sql import functions as f

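# Note: this wildcard import rebinds Python built-ins such as min(), max(), sum() and abs()
# to PySpark column functions, which is why py_max and custom_Minimum_Finder are used below.
from pyspark.sql.functions import *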

from functools import reduce

from builtins import max as py_max

import matplotlib.pyplot as plt

import seaborn as sns

import pandas as pd

import numpy as np

import sys

""" Set the SPARK_LOCAL_IP environment variable before the Spark session is created, so that Spark binds to localhost. """
import os
os.environ['SPARK_LOCAL_IP'] = 'localhost'

# Start spark session

""" Create the SparkSession. spark.driver.bindAddress binds the driver to localhost, and

spark.driver.port fixes the driver's port explicitly instead of letting Spark pick a random one. """
spark = SparkSession \
    .builder \
    .appName("ICT337 ECA July 2024 Semester Question 3") \
    .config("spark.driver.bindAddress", "localhost") \
    .config("spark.driver.port", "4043") \
    .getOrCreate()

""" Get the SparkContext from the SparkSession 

This line of code retrieves the SparkContext object from a SparkSession: 

The SparkContext (sc) is the main entry point for Spark functionality, 

allowing you to create RDDs and perform lower-level operations. """
sc = spark.sparkContext


"""
5-node-graph.txt data: 
1 0 2,10:3,5:
2 10000 3,2:4,1:
3 10000 2,3:4,9:5,2:
4 10000 5,4:
5 10000 1,7:4,6:
"""


**Question 3(a) (3 marks)**

Read the 5-node-graph.txt input file and parse the input to RDD structure of: 

(node_ID,(distance, list of neighbors with associated weight, path)). 

Show the RDD content.
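Because the line parser used below is plain Python, it can be checked on individual lines before it is handed to `map()`. A condensed sketch of the same parsing logic, under the hypothetical name `parse_line`, exercised on two lines from the sample and on a hypothetical line with no outgoing edges (`"6 10000"`, which is not in the data files):

```python
def parse_line(line):
    """Parse 'node distance n1,w1:n2,w2:' into (node, (distance, [(n, w), ...], path))."""
    parts = line.split()
    node, distance = int(parts[0]), int(parts[1])
    neighbours = []
    if len(parts) > 2:
        # The trailing ':' produces an empty last field, which is skipped.
        for field in parts[2].split(":"):
            if field:
                neighbour, weight = field.split(",")
                neighbours.append((int(neighbour), int(weight)))
    # Only the source node (assumed to be node 1) starts with a non-empty path.
    path = [node] if node == 1 else []
    return node, (distance, neighbours, path)


print(parse_line("1 0 2,10:3,5:"))  # (1, (0, [(2, 10), (3, 5)], [1]))
print(parse_line("4 10000 5,4:"))   # (4, (10000, [(5, 4)], []))
print(parse_line("6 10000"))        # (6, (10000, [], []))
```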


In [9]:
""" Question 3(a) (3 marks) """
print("\nQuestion 3(a) (3 marks)\n")

""" Read the 5-node-graph.txt file into an RDD of text lines. """
five_node_graph_Spark_RDD = sc.textFile("/Users/shawnyang/Downloads/ICT337 ECA July 2024 Semester/ECA Datasets/5-node-graph.txt")

""" Define a function that parses one line of the input file. Each line has the form "node_ID distance neighbour,weight:neighbour,weight:". """
def parse_line_from_node_graph_file(line):
    """ The line is split into parts using line.split() """
    line_Parts = line.split()

    """ The first part of each line is converted to an integer and assigned as the node_id. """
    line_Node_ID = int( line_Parts[0] )

    """ The second part of each line is converted to an integer and assigned as the distance. """
    line_Distance = int( line_Parts[1] )

    """ An empty list neighbors is initialized. """
    node_neighbours_List = []

    """ Check whether the line has more than two parts, i.e. whether the node has any neighbours. """
    if len(line_Parts) > 2:
        
        """ The third part lists the neighbours; split it on ':' to separate the individual neighbour entries. """
        for node_Neighbour in line_Parts[2].split(':'):
            
            """ Skip empty strings: the trailing ':' at the end of each line produces one. """
            if node_Neighbour:
                
                """ For each neighbor, it splits by ',' to get the neighbor's ID and weight. """
                node_Neighbor_Node_ID, node_Neighbor_Weight  = node_Neighbour.split(',')

                """ The neighbor's ID and weight are converted to integers and 
                
                appended as a tuple to the neighbors list. """
                node_neighbours_List.append(
                    ( int(node_Neighbor_Node_ID), int(node_Neighbor_Weight) )
                )
    
    """ Initialise the path list: only the source node (node 1) starts with the path [1]; every other node starts with an empty path. """
    path_List = []
    
    """ Checks if the node_id is equal to 1. """
    if line_Node_ID == 1:
        """ If the condition is true, it appends the node_id to the path list. """
        path_List.append(line_Node_ID)

    """ Finally, it returns a tuple containing the node_id and another tuple with distance, neighbors, and path. """
    return ( line_Node_ID, (line_Distance, node_neighbours_List, path_List) )

""" Use map() to apply parse_line_from_node_graph_file() to each line of five_node_graph_Spark_RDD, 

producing the required (node_ID, (distance, neighbours, path)) structure. """
node_graph_RDD_Five = five_node_graph_Spark_RDD.map(parse_line_from_node_graph_file)

print("5-node-graph.txt RDD Content with the structure (node_ID,(distance, list of neighbors with associated weight, path)):\n")

""" Show the 5-node-graph.txt input file RDD content.

collect() brings all elements of node_graph_RDD_Five back to the driver as a list, which is then iterated over. """
for node_graph_RDD_Five_Contents in node_graph_RDD_Five.collect():

    """ For each item in the collected list, it prints the item to the console. """
    print(node_graph_RDD_Five_Contents)




Question 3(a) (3 marks)

5-node-graph.txt RDD Content with the structure (node_ID,(distance, list of neighbors with associated weight, path)):

(1, (0, [(2, 10), (3, 5)], [1]))
(2, (10000, [(3, 2), (4, 1)], []))
(3, (10000, [(2, 3), (4, 9), (5, 2)], []))
(4, (10000, [(5, 4)], []))
(5, (10000, [(1, 7), (4, 6)], []))


**Question 3(b) (8 marks)**

Based on the input from Question 3(a), design and implement an iterative Dijkstra’s algorithm using an infinite while loop. 

The final output from the Dijkstra computation should be: 

A list of (node_ID, (shortest path distance, path traversal)). 

We assume that the source node is node_ID = 1 and therefore an example of path traversal output is: 

“1→X→ … →Y→D”, 

where X and Y are intermediate nodes and D is the destination node.
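The question asks for an infinite while loop, whereas the notebook code below loops on `len(visited) < count()` and also breaks early. A minimal pure-Python sketch of the `while True` form, assuming the parsed 5-node records from Question 3(a); dictionaries stand in for the flatMap, filter and reduceByKey stages so it runs without Spark, and `GRAPH` and `dijkstra_infinite_loop` are illustrative names.

```python
from collections import defaultdict

# Parsed 5-node records in the Question 3(a) structure: node -> (distance, neighbours, path).
# 10000 is the input files' stand-in for "infinity"; this sketch never relaxes from it.
GRAPH = {
    1: (0, [(2, 10), (3, 5)], [1]),
    2: (10000, [(3, 2), (4, 1)], []),
    3: (10000, [(2, 3), (4, 9), (5, 2)], []),
    4: (10000, [(5, 4)], []),
    5: (10000, [(1, 7), (4, 6)], []),
}


def dijkstra_infinite_loop(graph, source=1):
    """Return ({node: (distance, path)}, iterations) using an explicit `while True` loop."""
    settled = {source: (0, [source])}
    iterations = 0
    while True:
        # flatMap + filter: relax edges out of settled nodes, skipping settled targets.
        candidates = defaultdict(list)
        for node, (dist, path) in settled.items():
            for neighbour, weight in graph[node][1]:
                if neighbour not in settled:
                    candidates[neighbour].append((dist + weight, path + [neighbour]))
        # Break condition: no unsettled node can be reached from the settled set.
        if not candidates:
            break
        # reduceByKey: keep the shortest offer per node, then settle the global minimum.
        best = {node: min(offers) for node, offers in candidates.items()}
        next_node = min(best, key=lambda node: best[node][0])
        settled[next_node] = best[next_node]
        iterations += 1
    return settled, iterations


settled, iterations = dijkstra_infinite_loop(GRAPH)
print("Iterations:", iterations)
for node in sorted(settled):
    dist, path = settled[node]
    print(f"({node}, ({dist}, {'→'.join(map(str, path))}))")
```

Because a settled node's distance never changes, each pass settles exactly one node, so a graph in which every node is reachable finishes in (number of nodes - 1) iterations: 4 for the 5-node sample.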

In [10]:
""" Question 3(b) (8 marks) """
print("\nQuestion 3(b) (8 marks)\n")


""" This function defines a custom minimum finder that replaces Python's built-in min(). 

The built-in cannot be called by name here because "from pyspark.sql.functions import *" rebinds min to PySpark's column aggregate function. """
def custom_Minimum_Finder(iterable, key=None):

    """ Checks if the iterable is empty. If so, it raises a ValueError. """
    if not iterable:
        raise ValueError("iterable is empty")

    
    """ We define a new function called identity that simply returns its input. """
    def identity(x):
        return x

    """ If no key function is provided, it uses the identity function to key (returns the item itself). """
    if key is None:
        key = identity
    
    """ Initialise the minimum item and its key value with the first item in the iterable. """
    minimum_Item = iterable[0]
    
    minimum_Value = key(minimum_Item)
    
    """ Iterate through the remaining items, updating minimum_Item and minimum_Value whenever a smaller value is found. """
    for item in iterable[1:]:
        value = key(item)

        if value < minimum_Value:
            minimum_Item = item
            minimum_Value = value
    
    """ Return the item with the minimum value. """
    return minimum_Item


""" This is the main function implementing Dijkstra's algorithm. It takes a graph RDD as input. """
def dijkstra_Algorithm(node_graph_RDD_Five):

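    """ This inner function relaxes the outgoing edges of one node: for each neighbour it emits

    (neighbour, (distance via this node, path via this node)).

    Only nodes already in the visited set are expanded. An unvisited node still carries the 10000

    placeholder distance, so relaxing its edges would produce meaningless candidates and could let

    an unreachable node be "reached" with a distance of 10000 or more. """
    def update_Distances_For_Neighbouring_Nodes(node):
        """ It unpacks the node information. """
        node_ID, (neighbour_Distance, node_Neighbours, node_Path) = node

        """ It creates an empty list to store the updated neighbor information. """
        updated_Neighbours = []

        """ Nodes whose shortest distance is not final yet contribute no candidates. """
        if node_ID not in visited_set_with_Starting_Node:
            return updated_Neighbours
        
        """ It iterates through each neighbor and its weight. """
        for node_neighbour, node_weight in node_Neighbours:
            
            """ For each neighbor, it calculates the new distance and creates a new path. """
            new_Neighbour_Distance = neighbour_Distance + node_weight
            
            new_Neighbour_Path = node_Path + [node_neighbour]

            """ It appends the updated information for each neighbor to the list. """
            updated_Neighbours.append((node_neighbour, (new_Neighbour_Distance, new_Neighbour_Path)))

        """  Finally, it returns the list of updated neighbors. """
        return updated_Neighbours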


    """ This inner function chooses the shortest path between two options. 
    
    This function compares the first element (index 0) of two tuples a and b,
     
    which represent the distances of two paths.  """
    def select_Shortest_Path(a, b):
        if a[0] < b[0]:
            """  It returns the tuple with the smaller distance, effectively selecting the shorter path. """
            return a
        else:
            return b


    """ Initialise the visited set with the source node (1), and the shortest_Distance and 
    
    shortest_Path dictionaries with the source node's distance (0) and path ([1]). """
    visited_set_with_Starting_Node = set([1])

    shortest_Distance = {1: 0}

    shortest_Path = {1: [1]}


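    """ Count the nodes once, instead of re-running count() on the growing RDD lineage every iteration. """
    total_Number_of_Nodes = node_graph_RDD_Five.count()

    """ This is the main loop of the algorithm, continuing until all nodes have been visited 
    
    or until the break below fires because no unvisited node is reachable. """
    while len(visited_set_with_Starting_Node) < total_Number_of_Nodes: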

        """ We define a new function nodes_yet_to_be_Reached that takes a node tuple 
        
        and returns True if the node has not been visited yet. """
        def nodes_yet_to_be_Reached(unreached_Node_Tuple):
            unreached_Node_ID = unreached_Node_Tuple[0]

            return unreached_Node_ID not in visited_set_with_Starting_Node


        """ This part finds candidate nodes to visit next, filtering out already visited nodes 
        
        and choosing the shortest path to each unvisited node. """
        next_Candidate_Reachable_Nodes = node_graph_RDD_Five \
            .flatMap(update_Distances_For_Neighbouring_Nodes) \
                .filter(nodes_yet_to_be_Reached) \
                    .reduceByKey(select_Shortest_Path) \
                        .collect()
        
        """ If there are no candidate nodes left (every remaining node is unreachable from the source), break out of the loop. """
        if not next_Candidate_Reachable_Nodes:
            break


        """ We define a new function get_Next_Candidate_Reachable_Node_Distance that takes a node tuple and returns the distance value,
         
        which is the first element (index 0) of the second element (index 1) of the tuple. """
        def get_Next_Candidate_Reachable_Node_Distance(candidate_Node_Tuple):
            return candidate_Node_Tuple[1][0]

        
        """ Select the next node to visit: the candidate with the smallest tentative distance. 
        
        custom_Minimum_Finder is used because the built-in min() is shadowed by the PySpark wildcard import. """
        next_Reachable_Node, (neighbour_Distance, node_Path) = custom_Minimum_Finder(next_Candidate_Reachable_Nodes, key=get_Next_Candidate_Reachable_Node_Distance)

        
        """ We add the next reachable node to the visited set"""
        visited_set_with_Starting_Node.add(next_Reachable_Node)

        """ Record the newly visited node's shortest distance and path in the respective dictionaries. """
        shortest_Distance[next_Reachable_Node] = neighbour_Distance

        shortest_Path[next_Reachable_Node] = node_Path

        """ Update the graph RDD with the new distance and path information.
        
        We define a separate function called update_Shortest_Path_Node_Info. This function takes a node as input and returns the updated node information. """
        def update_Shortest_Path_Node_Info(node):

            """ Extract the node_id and its current information (distance, neighbors, and path). """
            node_ID = node[0]

            current_Shortest_Distance, neighbor_Node, current_Shortest_Path = node[1]
            
            """ Check if we have a new shortest distance for this node in the shortest_Distance dictionary. """
            if node_ID in shortest_Distance:
                """ If we do, we use the new distance and path; otherwise, we keep the current ones. """
                new_Shortest_distance = shortest_Distance[node_ID]

                new_Shortest_Path = shortest_Path[node_ID]

            else:
                new_Shortest_distance = current_Shortest_Distance

                new_Shortest_Path = current_Shortest_Path

            """ Return the updated node information. """
            return (node_ID, (new_Shortest_distance, neighbor_Node, new_Shortest_Path))

        """ We then use update_Shortest_Path_Node_Info with the map operation on our RDD to update all nodes. """
        node_graph_RDD_Five = node_graph_RDD_Five.map(update_Shortest_Path_Node_Info)


    """ Initialize an empty result list """
    dijkstra_Algorithm_Results_List = []
    
    """  We iterate through the sorted keys of shortest_Distance """
    for node_ID in sorted(shortest_Distance.keys()):

        """ Create a tuple for each node with its shortest distance and path, and append it to the result list. """
        dijkstra_Algorithm_Results_List.append((node_ID, (shortest_Distance[node_ID], shortest_Path[node_ID])))

    return dijkstra_Algorithm_Results_List


# Run the algorithm
shortest_Paths_List = dijkstra_Algorithm(node_graph_RDD_Five)

# Format and print the results
print("Show the iterative Dijkstra's algorithm RDD Content with the structure (node_ID, (shortest path distance, path traversal)):\n")

""" Prints the final result, showing the shortest distance and path for each node. """
for node_ID, (shortest_Distance, shortest_Path) in shortest_Paths_List:

    path_string = '→'.join(map(str, shortest_Path))

    print(f"Node {node_ID}: (Shortest distance: {shortest_Distance}, Path: {path_string})")





Question 3(b) (8 marks)

Show the iterative Dijkstra's algorithm RDD Content with the structure (node_ID, (shortest path distance, path traversal)):

Node 1: (Shortest distance: 0, Path: 1)
Node 2: (Shortest distance: 8, Path: 1→3→2)
Node 3: (Shortest distance: 5, Path: 1→3)
Node 4: (Shortest distance: 9, Path: 1→3→2→4)
Node 5: (Shortest distance: 7, Path: 1→3→5)
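The custom minimum finder and the `py_max` alias work around the same cause: the wildcard `from pyspark.sql.functions import *` in the setup cell rebinds `min` and `max` (among others, such as `sum` and `abs`) to PySpark column functions. A lighter alternative, sketched below, is to call the originals through Python's `builtins` module; the candidate values are those reduced in the third iteration of the 5-node run. Importing only `from pyspark.sql import functions as f` and dropping the wildcard import would also avoid the shadowing.

```python
import builtins

# Candidates in the shape collected after reduceByKey: (node, (distance, path)).
candidates = [(2, (8, [1, 3, 2])), (4, (13, [1, 3, 5, 4]))]

# builtins.min / builtins.max are always Python's own functions, even when a
# wildcard import has rebound the bare names min and max in this namespace.
next_node, (distance, path) = builtins.min(candidates, key=lambda item: item[1][0])
longest_path_len = builtins.max(len(p) for _, (_, p) in candidates)

print(next_node, distance, path)  # 2 8 [1, 3, 2]
print(longest_path_len)           # 4
```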


**Question 3(c) (8 marks)**

Based on your program in Question 3(b), explain in detail how the shortest path computation works by showing the results of each iteration step. 

Also, explain what the condition is for breaking out of the infinite while loop.
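One way to produce the per-iteration evidence this question asks for is to log, on each pass, the candidate set produced by the reduce step and the node that gets settled. A pure-Python sketch under that assumption (`trace_dijkstra` and `EDGES` are illustrative names; in the notebook the equivalent would be printing `next_Candidate_Reachable_Nodes` and `next_Reachable_Node` inside the while loop):

```python
# 5-node sample: node -> [(neighbour, weight), ...]
EDGES = {1: [(2, 10), (3, 5)], 2: [(3, 2), (4, 1)], 3: [(2, 3), (4, 9), (5, 2)],
         4: [(5, 4)], 5: [(1, 7), (4, 6)]}


def trace_dijkstra(edges, source=1):
    """Return (settled, trace), where trace holds (candidates, chosen node) per iteration."""
    settled = {source: (0, [source])}
    trace = []
    while True:
        best = {}  # shortest offer per unsettled node (the reduceByKey step)
        for node, (dist, path) in settled.items():
            for nbr, weight in edges[node]:
                if nbr in settled:
                    continue
                offer = (dist + weight, path + [nbr])
                if nbr not in best or offer[0] < best[nbr][0]:
                    best[nbr] = offer
        if not best:  # break: no unsettled node can be reached any more
            return settled, trace
        chosen = min(best, key=lambda n: best[n][0])
        settled[chosen] = best[chosen]
        trace.append((dict(best), chosen))


settled, trace = trace_dijkstra(EDGES)
for i, (cands, chosen) in enumerate(trace, start=1):
    offers = ", ".join(f"{n}:{d}" for n, (d, _) in sorted(cands.items()))
    dist, path = settled[chosen]
    print(f"Iteration {i}: candidates {{{offers}}} -> settle node {chosen} "
          f"(distance {dist}, path {'→'.join(map(str, path))})")
print("Iterations:", len(trace))
```

On the 5-node sample this settles nodes 3, 5, 2 and 4 in that order (distances 5, 7, 8 and 9). In this sketch the fifth pass finds every node already settled, the candidate set is empty, and the loop breaks; the same empty-candidate check is what ends the loop early when some nodes cannot be reached from node 1.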


**Question 3(d) (4 marks)**

Show the number of iterations to complete the shortest path computation for 5-node-graph.txt.

Also, show the final output as: 

A list of (node_ID, (shortest path distance, path traversal)), sorted by ascending node_ID.

In [11]:
""" Question 3(d) (4 marks) """
print("\nQuestion 3(d) (4 marks)\n")


""" This is the main function implementing Dijkstra's algorithm. It takes a graph RDD as input. """
def dijkstra_Algorithm(node_graph_RDD_Five):

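    """ This inner function relaxes the outgoing edges of one node: for each neighbour it emits

    (neighbour, (distance via this node, path via this node)).

    Only nodes already in the visited set are expanded. An unvisited node still carries the 10000

    placeholder distance, so relaxing its edges would produce meaningless candidates and could let

    an unreachable node be "reached" with a distance of 10000 or more. """
    def update_Distances_For_Neighbouring_Nodes(node):
        """ It unpacks the node information. """
        node_ID, (neighbour_Distance, node_Neighbours, node_Path) = node

        """ It creates an empty list to store the updated neighbor information. """
        updated_Neighbours = []

        """ Nodes whose shortest distance is not final yet contribute no candidates. """
        if node_ID not in visited_set_with_Starting_Node:
            return updated_Neighbours
        
        """ It iterates through each neighbor and its weight. """
        for node_neighbour, node_weight in node_Neighbours:
            
            """ For each neighbor, it calculates the new distance and creates a new path. """
            new_Neighbour_Distance = neighbour_Distance + node_weight
            
            new_Neighbour_Path = node_Path + [node_neighbour]

            """ It appends the updated information for each neighbor to the list. """
            updated_Neighbours.append((node_neighbour, (new_Neighbour_Distance, new_Neighbour_Path)))

        """  Finally, it returns the list of updated neighbors. """
        return updated_Neighbours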


    """ This inner function chooses the shortest path between two options. 
    
    This function compares the first element (index 0) of two tuples a and b,
     
    which represent the distances of two paths.  """
    def select_Shortest_Path(a, b):
        if a[0] < b[0]:
            """  It returns the tuple with the smaller distance, effectively selecting the shorter path. """
            return a
        else:
            return b


    """ Initialise the visited set with the source node (1), and the shortest_Distance and 
    
    shortest_Path dictionaries with the source node's distance (0) and path ([1]). """
    visited_set_with_Starting_Node = set([1])

    shortest_Distance = {1: 0}

    shortest_Path = {1: [1]}

    number_of_Iterations = 0


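    """ Count the nodes once, instead of re-running count() on the growing RDD lineage every iteration. """
    total_Number_of_Nodes = node_graph_RDD_Five.count()

    """ This is the main loop of the algorithm, continuing until all nodes have been visited 
    
    or until the break below fires because no unvisited node is reachable. """
    while len(visited_set_with_Starting_Node) < total_Number_of_Nodes: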

        number_of_Iterations = number_of_Iterations + 1

        """ We define a new function nodes_yet_to_be_Reached that takes a node tuple 
        
        and returns True if the node has not been visited yet. """
        def nodes_yet_to_be_Reached(unreached_Node_Tuple):
            unreached_Node_ID = unreached_Node_Tuple[0]

            return unreached_Node_ID not in visited_set_with_Starting_Node


        """ This part finds candidate nodes to visit next, filtering out already visited nodes 
        
        and choosing the shortest path to each unvisited node. """
        next_Candidate_Reachable_Nodes = node_graph_RDD_Five \
            .flatMap(update_Distances_For_Neighbouring_Nodes) \
                .filter(nodes_yet_to_be_Reached) \
                    .reduceByKey(select_Shortest_Path) \
                        .collect()
        
        """ If there are no candidate nodes left (every remaining node is unreachable from the source), break out of the loop. """
        if not next_Candidate_Reachable_Nodes:
            break


        """ We define a new function get_Next_Candidate_Reachable_Node_Distance that takes a node tuple and returns the distance value,
         
        which is the first element (index 0) of the second element (index 1) of the tuple. """
        def get_Next_Candidate_Reachable_Node_Distance(candidate_Node_Tuple):
            return candidate_Node_Tuple[1][0]

        
        """ Select the next node to visit: the candidate with the smallest tentative distance. 
        
        custom_Minimum_Finder is used because the built-in min() is shadowed by the PySpark wildcard import. """
        next_Reachable_Node, (neighbour_Distance, node_Path) = custom_Minimum_Finder(next_Candidate_Reachable_Nodes, key=get_Next_Candidate_Reachable_Node_Distance)

        
        """ We add the next reachable node to the visited set"""
        visited_set_with_Starting_Node.add(next_Reachable_Node)

        """ Record the newly visited node's shortest distance and path in the respective dictionaries. """
        shortest_Distance[next_Reachable_Node] = neighbour_Distance

        shortest_Path[next_Reachable_Node] = node_Path



        """ Update the graph RDD with the new distance and path information.
        
        We define a separate function called update_Shortest_Path_Node_Info. This function takes a node as input and returns the updated node information. """
        def update_Shortest_Path_Node_Info(node):

            """ Extract the node_id and its current information (distance, neighbors, and path). """
            node_ID = node[0]

            current_Shortest_Distance, neighbor_Node, current_Shortest_Path = node[1]
            
            """ Check if we have a new shortest distance for this node in the shortest_Distance dictionary. """
            if node_ID in shortest_Distance:
                """ If we do, we use the new distance and path; otherwise, we keep the current ones. """
                new_Shortest_distance = shortest_Distance[node_ID]

                new_Shortest_Path = shortest_Path[node_ID]

            else:
                new_Shortest_distance = current_Shortest_Distance

                new_Shortest_Path = current_Shortest_Path

            """ Return the updated node information. """
            return (node_ID, (new_Shortest_distance, neighbor_Node, new_Shortest_Path))

        """ We then use update_Shortest_Path_Node_Info with the map operation on our RDD to update all nodes. """
        node_graph_RDD_Five = node_graph_RDD_Five.map(update_Shortest_Path_Node_Info)

    """ Initialize an empty result list """
    dijkstra_Algorithm_Results_List = []
    
    """  We iterate through the sorted keys of shortest_Distance """
    for node_ID in sorted(shortest_Distance.keys()):

        """ Create a tuple for each node with its shortest distance and path, and append it to the result list. """
        dijkstra_Algorithm_Results_List.append((node_ID, (shortest_Distance[node_ID], shortest_Path[node_ID])))

    return dijkstra_Algorithm_Results_List, number_of_Iterations


""" This line runs Dijkstra's algorithm on the input graph RDD 
 
and returns the shortest paths and the number of iterations it took to complete."""
shortest_Paths_List, num_of_Shortest_Path_Iterations = dijkstra_Algorithm(node_graph_RDD_Five)

print(f"Number of iterations to complete the shortest path computation for 5-node-graph.txt: {num_of_Shortest_Path_Iterations}")

print("\nFinal output (node_ID, (shortest path distance, path traversal)), sorted by ascending node_ID:\n")

""" The for loop iterates through the shortest_paths result:

This unpacks each result into node ID, distance, and path. """
for node_ID, (shortest_Path_Distance, shortest_Path) in shortest_Paths_List:

    """ This prints the formatted output for each node including, node ID, 
     
    shortest distance to that node from the start node and path to reach that node, 
    
    formatted as a string with arrows (→) between node numbers  """
    print(f"(Node {node_ID}: (Shortest Path Distance: {shortest_Path_Distance}, Path Traversal: {'→'.join(map(str, shortest_Path))}))")





Question 3(d) (4 marks)

Number of iterations to complete the shortest path computation for 5-node-graph.txt: 4

Final output (node_ID, (shortest path distance, path traversal)), sorted by ascending node_ID:

(Node 1: (Shortest Path Distance: 0, Path Traversal: 1))
(Node 2: (Shortest Path Distance: 8, Path Traversal: 1→3→2))
(Node 3: (Shortest Path Distance: 5, Path Traversal: 1→3))
(Node 4: (Shortest Path Distance: 9, Path Traversal: 1→3→2→4))
(Node 5: (Shortest Path Distance: 7, Path Traversal: 1→3→5))


**Question 3(e) (9 marks)**

Perform the following tasks and show the results in each step:

+ Find the Top Three (3) furthermost nodes and their paths & distances. 
  
  Sort the result by descending distance.

+ Find the destination node(s) that have the largest number of traversal hops in the path.
  
  Show the detailed output path and distance.

+ Find the set of node(s) that are not reachable from source node (node_ID=1). 
  
  Sort the result by ascending node_ID.
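The three tasks are plain post-processing of the final (node_ID, (distance, path)) list. A minimal sketch on the 5-node results, with two assumptions labelled: the source node is excluded from the "furthermost" ranking (it only matters when fewer than three other nodes are reachable), and unreachable nodes never receive an entry in the result list, i.e. the loop only relaxes edges out of already-settled nodes; a node left with the 10000 placeholder distance should be treated the same way. The sketch uses the plain built-in `max`; in the notebook, where `max` is shadowed, `py_max` would be needed instead.

```python
# Final output in the Question 3(d) shape: (node_ID, (distance, path)), sorted by node_ID.
results = [
    (1, (0, [1])),
    (2, (8, [1, 3, 2])),
    (3, (5, [1, 3])),
    (4, (9, [1, 3, 2, 4])),
    (5, (7, [1, 3, 5])),
]
all_nodes = {1, 2, 3, 4, 5}  # in the notebook: set(graph RDD .keys().collect())
SOURCE = 1

# Task 1: top three furthest destinations (source excluded), by descending distance.
destinations = [r for r in results if r[0] != SOURCE]
top3 = sorted(destinations, key=lambda r: r[1][0], reverse=True)[:3]

# Task 2: hops = edges on the path = len(path) - 1; keep every node tying for the maximum.
max_hops = max(len(path) - 1 for _, (_, path) in results)
most_hops = [r for r in results if len(r[1][1]) - 1 == max_hops]

# Task 3: unreachable = all nodes minus the nodes that received a shortest path.
unreachable = sorted(all_nodes - {node for node, _ in results})

print(top3)
print(max_hops, most_hops)
print(unreachable)
```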

In [12]:
""" Question 3(e) (9 marks) """
print("\nQuestion 3(e) (9 marks)\n")

""" This line runs Dijkstra's algorithm on the input graph RDD 
 
and returns the shortest paths and the number of iterations it took to complete."""
shortest_Paths_List, num_of_Shortest_Path_Iterations = dijkstra_Algorithm(node_graph_RDD_Five)


""" We define a new function get_Candidate_Reachable_Node_Distance that takes a node tuple and returns the distance value,
         
which is the first element (index 0) of the second element (index 1) of the tuple. """
def get_Candidate_Reachable_Node_Distance(candidate_Node_Tuple):
    return candidate_Node_Tuple[1][0]


""" Sort the paths by distance in descending order

We pass this function as the key argument to sorted. 

This tells the sorting function to use the distance value when comparing paths.

The reverse=True argument ensures that the paths are sorted in descending order of distance. """
sorted_shortest_Paths_List = sorted(shortest_Paths_List, key=get_Candidate_Reachable_Node_Distance, reverse=True)

""" Get the top 3 furthermost nodes """
top_3_Furthermost_Shortest_Paths_Nodes = sorted_shortest_Paths_List[:3]

print("Top Three (3) furthermost nodes, their paths & distances (sorted by descending distance):\n")

for node_ID, (shortest_Path_Distance, shortest_Path) in top_3_Furthermost_Shortest_Paths_Nodes:
    print(f"Node {node_ID}: (Distance: {shortest_Path_Distance}, Path: {'→'.join(map(str, shortest_Path))})")




""" This line runs Dijkstra's algorithm on the input graph RDD 
 
and returns the shortest paths and the number of iterations it took to complete."""
shortest_Paths_List, num_of_Shortest_Path_Iterations = dijkstra_Algorithm(node_graph_RDD_Five)


""" We define a new function that takes a path tuple and returns the length of the path. 

The function unpacks the tuple to access the path, then returns its length. """
def get_Shortest_Paths_Length(path_tuple):
    _, (_, path) = path_tuple

    return len(path)


""" Find the path with the most hops.

map() applies get_Shortest_Paths_Length to each item in shortest_Paths_List, and py_max() (the built-in max,

since max itself is shadowed by the PySpark wildcard import) returns the largest path length,

measured in nodes on the path (i.e. hops + 1). """
maximum_Num_of_Shortest_Paths_Hops = py_max(map(get_Shortest_Paths_Length, shortest_Paths_List))


""" We initialize an empty list shortest_Paths_Nodes_With_Maximum_Hops_List to store the results. """
shortest_Paths_Nodes_With_Maximum_Hops_List = []


""" Find all nodes whose path length equals maximum_Num_of_Shortest_Paths_Hops.

We iterate through each item in shortest_Paths_List, unpacking it into node, distance, and path. """
for node_ID, (shortest_Path_Distance, shortest_Path) in shortest_Paths_List:
    
    """ For each item, we check if the length of the path is equal to maximum_Num_of_Shortest_Paths_Hops. """
    if len(shortest_Path) == maximum_Num_of_Shortest_Paths_Hops:
        """ If the condition is true, we append a tuple containing the node, distance, 
        
        and path to shortest_Paths_Nodes_With_Maximum_Hops_List. """
        shortest_Paths_Nodes_With_Maximum_Hops_List.append((node_ID, (shortest_Path_Distance, shortest_Path)))


print("\nDestination node(s) with the most number of traversal hops in the path:\n")

for node_ID, (shortest_Path_Distance, shortest_Path) in shortest_Paths_Nodes_With_Maximum_Hops_List:
    print(f"Node {node_ID}: (Distance: {shortest_Path_Distance}, Path: {'→'.join(map(str, shortest_Path))}, Hops: {len(shortest_Path) - 1})")




""" This line runs Dijkstra's algorithm on the input graph RDD 
 
and returns the shortest paths and the number of iterations it took to complete."""
shortest_Paths_List, num_of_Shortest_Path_Iterations = dijkstra_Algorithm(node_graph_RDD_Five)

""" Get all node IDs from the graph.

It gets all node IDs from the graph using node_graph_RDD_Five.keys().collect(). """
get_Set_of_Node_ID_From_All_Graph_Nodes = set(node_graph_RDD_Five.keys().collect())


""" We initialize an empty set get_Set_of_Reachable_Graph_Nodes to store the reachable nodes. """
get_Set_of_Reachable_Graph_Nodes_Set = set()

""" Get the set of reachable nodes.

A node counts as reachable if its shortest distance is below the 10000 placeholder that the input files use for "infinity". 

We iterate through each item in shortest_Paths_List, unpacking it into node, distance, and path. """
for node_ID, (shortest_Path_Distance, shortest_Path) in shortest_Paths_List:

    """ For each item, we check that the distance is below the 10000 placeholder. Distances here are integers, so a comparison with float('inf') would never exclude a node. """
    if shortest_Path_Distance < 10000:
        """ If the condition is true, we add the node to the get_Set_of_Reachable_Graph_Nodes_Set set. """
        get_Set_of_Reachable_Graph_Nodes_Set.add(node_ID)


""" Find unreachable nodes

It finds unreachable nodes by subtracting the set of reachable nodes from the set of all nodes. """
unreachable_Graph_Nodes = get_Set_of_Node_ID_From_All_Graph_Nodes - get_Set_of_Reachable_Graph_Nodes_Set


""" Sort unreachable nodes by ascending node_ID 

It sorts the unreachable nodes in ascending order. """
sorted_by_Ascending_Unreachable_Graph_Nodes_List = sorted(unreachable_Graph_Nodes)


print("\nNodes that are not reachable from source node (node_ID=1), sorted by ascending node_ID:\n")

""" It then checks if there are any unreachable nodes in sorted_by_Ascending_Unreachable_Graph_Nodes_List. 

If there are unreachable nodes: """
if sorted_by_Ascending_Unreachable_Graph_Nodes_List:

    """ It iterates through each node in sorted_by_Ascending_Unreachable_Graph_Nodes_List """
    for unreachable_Nodes_From_Source_Node in sorted_by_Ascending_Unreachable_Graph_Nodes_List:
        """ For each unreachable node, it prints "Node {node}" """
        print(f"Node {unreachable_Nodes_From_Source_Node}")

else:
    """ If there are no unreachable nodes: 
    
    It prints the statement """
    print("All nodes are reachable from the Node 1.")





Question 3(e) (9 marks)

Top Three (3) furthermost nodes, their paths & distances (sorted by descending distance):

Node 4: (Distance: 9, Path: 1→3→2→4)
Node 2: (Distance: 8, Path: 1→3→2)
Node 5: (Distance: 7, Path: 1→3→5)

Destination node(s) with the most number of traversal hops in the path:

Node 4: (Distance: 9, Path: 1→3→2→4, Hops: 3)

Nodes that are not reachable from source node (node_ID=1), sorted by ascending node_ID:

All nodes are reachable from the Node 1.


**Question 3(f) (8 marks)**

Repeat the above computation for the scenarios of:

“20-node-graph.txt” and “40-node-graph.txt”

And show the respective results as in Question 3(d) and Question 3(e).


In [13]:
""" Question 3(f) (8 marks) """
print("\nQuestion 3(f) (8 marks)\n")


print("Computations and Results for 20-node-graph.txt:\n")

""" Question 3(a) Section: """

""" Read the 20-node-graph.txt file and store the content using Spark RDDs. """
twenty_node_graph_Spark_RDD = sc.textFile("/Users/shawnyang/Downloads/ICT337 ECA July 2024 Semester/ECA Datasets/20-node-graph.txt")

""" Use map() to apply parse_line_from_node_graph_file() to each line, creating the required RDD structure. """
node_graph_RDD_Twenty = twenty_node_graph_Spark_RDD.map(parse_line_from_node_graph_file)

print("20-node-graph.txt RDD Content with the structure (node_ID,(distance, list of neighbors with associated weight, path)):\n")

""" Show the 20-node-graph.txt input file RDD content.

Prints the content of the RDD using collect() and a for loop. """
for node_graph_RDD_Twenty_Contents in node_graph_RDD_Twenty.collect():
    print(node_graph_RDD_Twenty_Contents)


""" Question 3(d) Section: """

# Run Dijkstra's algorithm
twenty_Node_Graph_RDD_Shortest_Paths_Lists, twenty_Node_Graph_RDD_num_of_Shortest_Path_Iterations = dijkstra_Algorithm(node_graph_RDD_Twenty)

print(f"\nNumber of iterations to complete the shortest path computation for 20-node-graph.txt: {twenty_Node_Graph_RDD_num_of_Shortest_Path_Iterations}")

print("\nFinal output (node_ID, (shortest path distance, path traversal)), sorted by ascending node_ID:\n")

for node_ID, (shortest_Path_Distance, shortest_Path) in twenty_Node_Graph_RDD_Shortest_Paths_Lists:
    print(f"Node {node_ID}: (Shortest distance: {shortest_Path_Distance}, Path: {'→'.join(map(str, shortest_Path))})")


""" Question 3(e) Section: """

""" We define a new function get_Candidate_Reachable_Node_Distance that takes a node tuple and returns the distance value,
         
which is the first element (index 0) of the second element (index 1) of the tuple. """
def get_Candidate_Reachable_Node_Distance(candidate_Node_Tuple):
    return candidate_Node_Tuple[1][0]


""" Sort the paths by distance in descending order

Passing get_Candidate_Reachable_Node_Distance as the key argument tells sorted() to compare records by their distance value.

The reverse=True argument puts the largest distance first. Because sorted() is stable, records with equal distances keep their original (ascending node_ID) order. """
twenty_Sorted_Shortest_Paths_List  = sorted(twenty_Node_Graph_RDD_Shortest_Paths_Lists, key=get_Candidate_Reachable_Node_Distance, reverse=True)


""" Get the top 3 furthermost nodes """
twenty_Node_Top_3_Furthermost_Shortest_Paths_Nodes = twenty_Sorted_Shortest_Paths_List[:3]
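""" Sketch (hypothetical helper): if unreachable nodes keep the 10000 initial distance shown in the printed RDD
content (or float('inf')), a plain descending sort places them first. Keeping only distances below that
sentinel before taking the top k avoids this. heapq.nlargest(k, iterable, key=...) is documented as equivalent to
sorted(iterable, key=key, reverse=True)[:k]. """
import heapq

def sketch_top_k_furthest(shortest_paths, k=3, infinity=10000):
    # shortest_paths holds (node_ID, (distance, path)) tuples
    reachable = [record for record in shortest_paths if record[1][0] < infinity]
    return heapq.nlargest(k, reachable, key=lambda record: record[1][0])

# e.g. sketch_top_k_furthest(twenty_Node_Graph_RDD_Shortest_Paths_Lists, k=3)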

print("\n20-node-graph.txt Top Three (3) furthermost nodes, their paths & distances (sorted by descending distance):\n")

for node_ID, (shortest_Path_Distance, shortest_Path) in twenty_Node_Top_3_Furthermost_Shortest_Paths_Nodes:
    print(f"Node {node_ID}: (Distance: {shortest_Path_Distance}, Path: {'→'.join(map(str, shortest_Path))})")




""" Find the maximum path length

map() applies get_Shortest_Paths_Length to every (node_ID, (distance, path)) record to obtain each path's length, and py_max() returns the largest of those lengths (the hop count is this length minus 1).

py_max is Python's built-in max imported under an alias, because `from pyspark.sql.functions import *` shadows the built-in max with PySpark's column function. """
maximum_Num_of_Shortest_Paths_Hops = py_max(map(get_Shortest_Paths_Length, twenty_Sorted_Shortest_Paths_List))
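""" Sketch (hypothetical helper): the number of hops on a path is one less than the number of nodes in it.
Computing hops directly keeps the maximum in hop units. An empty path (an unreachable node) counts as 0 hops here.
py_max is imported the same way as in the notebook, so the built-in max is used rather than PySpark's column function. """
from builtins import max as py_max

def sketch_max_hops(shortest_paths):
    # shortest_paths holds (node_ID, (distance, path)) tuples
    return py_max((len(path) - 1 if path else 0 for _, (_, path) in shortest_paths), default=0)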


""" We initialize an empty list to store the nodes whose paths have the maximum length. """
twenty_Node_Shortest_Paths_With_Maximum_Hops_List = []


""" Find all nodes whose path length equals maximum_Num_of_Shortest_Paths_Hops

We iterate through each record in twenty_Sorted_Shortest_Paths_List, unpacking it into node_ID, distance, and path. """
for node_ID, (shortest_Path_Distance, shortest_Path) in twenty_Sorted_Shortest_Paths_List:

    """ For each record, we check whether the length of the path equals maximum_Num_of_Shortest_Paths_Hops. """
    if len(shortest_Path) == maximum_Num_of_Shortest_Paths_Hops:
        """ If the condition is true, we append a tuple containing the node, distance,

        and path to twenty_Node_Shortest_Paths_With_Maximum_Hops_List. """
        twenty_Node_Shortest_Paths_With_Maximum_Hops_List.append((node_ID, (shortest_Path_Distance, shortest_Path)))
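""" Sketch (hypothetical helper): the loop above can be written as one comprehension. It keeps every record whose
path has the maximum number of hops, so ties are all reported. An empty path counts as 0 hops (an assumption). """
from builtins import max as py_max

def sketch_nodes_with_max_hops(shortest_paths):
    # Hops per node: nodes on the path minus one
    hops = {node_id: (len(path) - 1 if path else 0) for node_id, (_, path) in shortest_paths}
    most = py_max(hops.values(), default=0)
    return [record for record in shortest_paths if hops[record[0]] == most]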


print("\n20-node-graph.txt Destination node(s) with the most number of traversal hops in the path:\n")

for node_ID, (shortest_Path_Distance, shortest_Path) in twenty_Node_Shortest_Paths_With_Maximum_Hops_List:
    print(f"Node {node_ID}: (Distance: {shortest_Path_Distance}, Path: {'→'.join(map(str, shortest_Path))}, Hops: {len(shortest_Path) - 1})")




""" Get all node IDs from the graph

node_graph_RDD_Twenty.keys().collect() returns every node_ID, which is converted into a set. """
get_Set_of_Node_ID_From_All_Twenty_Nodes = set(node_graph_RDD_Twenty.keys().collect())


""" We initialize an empty set, get_Set_of_Reachable_Twenty_Nodes_Set, to store the reachable nodes. """
get_Set_of_Reachable_Twenty_Nodes_Set = set()

""" Get the set of reachable nodes

A node is reachable if its shortest-path distance is finite.

We iterate through each record in twenty_Node_Graph_RDD_Shortest_Paths_Lists, unpacking it into node_ID, distance, and path. """
for node_ID, (shortest_Path_Distance, shortest_Path) in twenty_Node_Graph_RDD_Shortest_Paths_Lists:

    """ For each record, we check whether the distance is below 10000, the "infinite" initial distance given to every non-source node in the RDD content. This test also rejects float('inf'). """
    if shortest_Path_Distance < 10000:
        """ If the condition is true, we add the node to get_Set_of_Reachable_Twenty_Nodes_Set. """
        get_Set_of_Reachable_Twenty_Nodes_Set.add(node_ID)
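""" Sketch (hypothetical helper): the same reachable-node set as a set comprehension. The test distance < 10000
excludes both the 10000 initial distance shown in the printed RDD content and float('inf'). It therefore works
whichever sentinel the algorithm leaves on unreachable nodes, assuming real path lengths stay below 10000. """
def sketch_reachable_node_ids(shortest_paths, infinity=10000):
    # shortest_paths holds (node_ID, (distance, path)) tuples
    return {node_id for node_id, (dist, _) in shortest_paths if dist < infinity}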


""" Find unreachable nodes

Subtract the set of reachable nodes from the set of all nodes. """
twenty_Node_Unreachable_Nodes_Set = get_Set_of_Node_ID_From_All_Twenty_Nodes - get_Set_of_Reachable_Twenty_Nodes_Set

""" Sort unreachable nodes by ascending node_ID """
sorted_by_Ascending_Twenty_Node_Unreachable_Nodes_List = sorted(twenty_Node_Unreachable_Nodes_Set)

print("\nNodes that are not reachable from source node (node_ID=1), sorted by ascending node_ID:\n")

""" Check whether sorted_by_Ascending_Twenty_Node_Unreachable_Nodes_List contains any nodes.

If there are unreachable nodes: """
if sorted_by_Ascending_Twenty_Node_Unreachable_Nodes_List:

    """ Iterate through each node in sorted_by_Ascending_Twenty_Node_Unreachable_Nodes_List """
    for unreachable_Twenty_Nodes_From_Source_Node in sorted_by_Ascending_Twenty_Node_Unreachable_Nodes_List:
        """ For each unreachable node, print "Node {node_ID}" """
        print(f"Node {unreachable_Twenty_Nodes_From_Source_Node}")

else:
    """ If there are no unreachable nodes,

    print a single summary line. """
    print("All nodes are reachable from the Node 1.")
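""" Sketch (hypothetical helper): the unreachable-node report above as one function. It takes the set difference
between all node IDs and the reachable ones, sorted by ascending node_ID, and returns the lines to print.
The summary message is a parameter, so any wording can be used. """
def sketch_unreachable_report_lines(all_node_ids, reachable_node_ids,
                                    all_reachable_message="All nodes are reachable from the source node."):
    unreachable = sorted(set(all_node_ids) - set(reachable_node_ids))
    if not unreachable:
        return [all_reachable_message]
    return [f"Node {node_id}" for node_id in unreachable]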




print("\n\n\nComputations and Results for 40-node-graph.txt:\n")

""" Question 3(a) Section: """

""" Read the 40-node-graph.txt file and store the content using Spark RDDs. """
fourty_node_graph_Spark_RDD = sc.textFile("/Users/shawnyang/Downloads/ICT337 ECA July 2024 Semester/ECA Datasets/40-node-graph.txt")

""" Use map() to apply parse_line_from_node_graph_file() to each line, creating the required (node_ID, (distance, neighbors, path)) RDD structure. """
node_graph_RDD_Fourty = fourty_node_graph_Spark_RDD.map(parse_line_from_node_graph_file)

print("40-node-graph.txt RDD Content with the structure (node_ID,(distance, list of neighbors with associated weight, path)):\n")

""" Show the 40-node-graph.txt input file RDD content

Prints the content of the RDD using collect() and a for loop. """
for item in node_graph_RDD_Fourty.collect():
    print(item)


""" Question 3(d) Section: """

# Run Dijkstra's algorithm: returns the (node_ID, (distance, path)) results and the number of iterations taken
fourty_Node_Graph_RDD_Shortest_Paths_Lists, fourty_Node_Graph_RDD_num_of_Shortest_Path_Iterations = dijkstra_Algorithm(node_graph_RDD_Fourty)

print(f"\nNumber of iterations to complete the shortest path computation for 40-node-graph.txt: {fourty_Node_Graph_RDD_num_of_Shortest_Path_Iterations}")

print("\nFinal output (node_ID, (shortest path distance, path traversal)), sorted by ascending node_ID:\n")

for node_ID, (shortest_Path_Distance, shortest_Path) in fourty_Node_Graph_RDD_Shortest_Paths_Lists:
    print(f"Node {node_ID}: (Shortest distance: {shortest_Path_Distance}, Path: {'→'.join(map(str, shortest_Path))})")


""" Question 3(e) Section: """

""" get_Candidate_Reachable_Node_Distance (redefined here with the same body as in the 20-node section) takes a (node_ID, (distance, path)) tuple and returns its distance value,

which is the first element (index 0) of the second element (index 1) of the tuple. """
def get_Candidate_Reachable_Node_Distance(candidate_Node_Tuple):
    return candidate_Node_Tuple[1][0]


""" Sort the paths by distance in descending order

Passing get_Candidate_Reachable_Node_Distance as the key argument tells sorted() to compare records by their distance value.

The reverse=True argument puts the largest distance first. Because sorted() is stable, records with equal distances keep their original (ascending node_ID) order. """
fourty_Sorted_Shortest_Paths_List  = sorted(fourty_Node_Graph_RDD_Shortest_Paths_Lists, key=get_Candidate_Reachable_Node_Distance, reverse=True)


""" Get the top 3 furthermost nodes """
fourty_Node_Top_3_Furthermost_Shortest_Paths_Nodes = fourty_Sorted_Shortest_Paths_List[:3]

print("\n40-node-graph.txt Top Three (3) furthermost nodes, their paths & distances (sorted by descending distance):\n")

for node_ID, (shortest_Path_Distance, shortest_Path) in fourty_Node_Top_3_Furthermost_Shortest_Paths_Nodes:
    print(f"Node {node_ID}: (Distance: {shortest_Path_Distance}, Path: {'→'.join(map(str, shortest_Path))})")




""" Find the maximum path length

map() applies get_Shortest_Paths_Length to every (node_ID, (distance, path)) record to obtain each path's length, and py_max() returns the largest of those lengths (the hop count is this length minus 1).

py_max is Python's built-in max imported under an alias, because `from pyspark.sql.functions import *` shadows the built-in max with PySpark's column function. """
maximum_Num_of_Shortest_Paths_Hops = py_max(map(get_Shortest_Paths_Length, fourty_Sorted_Shortest_Paths_List))


""" We initialize an empty list to store the nodes whose paths have the maximum length. """
fourty_Node_Shortest_Paths_With_Maximum_Hops_List = []


""" Find all nodes whose path length equals maximum_Num_of_Shortest_Paths_Hops

We iterate through each record in fourty_Sorted_Shortest_Paths_List, unpacking it into node_ID, distance, and path. """
for node_ID, (shortest_Path_Distance, shortest_Path) in fourty_Sorted_Shortest_Paths_List:

    """ For each record, we check whether the length of the path equals maximum_Num_of_Shortest_Paths_Hops. """
    if len(shortest_Path) == maximum_Num_of_Shortest_Paths_Hops:
        """ If the condition is true, we append a tuple containing the node, distance,

        and path to fourty_Node_Shortest_Paths_With_Maximum_Hops_List. """
        fourty_Node_Shortest_Paths_With_Maximum_Hops_List.append((node_ID, (shortest_Path_Distance, shortest_Path)))


print("\n40-node-graph.txt Destination node(s) with the most number of traversal hops in the path:\n")

for node_ID, (shortest_Path_Distance, shortest_Path) in fourty_Node_Shortest_Paths_With_Maximum_Hops_List:
    print(f"Node {node_ID}: (Distance: {shortest_Path_Distance}, Path: {'→'.join(map(str, shortest_Path))}, Hops: {len(shortest_Path) - 1})")




""" Get all node IDs from the graph

node_graph_RDD_Fourty.keys().collect() returns every node_ID, which is converted into a set. """
get_Set_of_Node_ID_From_All_Fourty_Nodes = set(node_graph_RDD_Fourty.keys().collect())


""" We initialize an empty set, get_Set_of_Reachable_Fourty_Nodes_Set, to store the reachable nodes. """
get_Set_of_Reachable_Fourty_Nodes_Set = set()

""" Get the set of reachable nodes

A node is reachable if its shortest-path distance is finite.

We iterate through each record in fourty_Node_Graph_RDD_Shortest_Paths_Lists, unpacking it into node_ID, distance, and path. """
for node_ID, (shortest_Path_Distance, shortest_Path) in fourty_Node_Graph_RDD_Shortest_Paths_Lists:

    """ For each record, we check whether the distance is below 10000, the "infinite" initial distance given to every non-source node in the RDD content. This test also rejects float('inf'). """
    if shortest_Path_Distance < 10000:
        """ If the condition is true, we add the node to get_Set_of_Reachable_Fourty_Nodes_Set. """
        get_Set_of_Reachable_Fourty_Nodes_Set.add(node_ID)


""" Find unreachable nodes

Subtract the set of reachable nodes from the set of all nodes. """
fourty_Node_Unreachable_Nodes_Set = get_Set_of_Node_ID_From_All_Fourty_Nodes - get_Set_of_Reachable_Fourty_Nodes_Set

""" Sort unreachable nodes by ascending node_ID """
sorted_by_Ascending_Fourty_Node_Unreachable_Nodes_List = sorted(fourty_Node_Unreachable_Nodes_Set)

print("\nNodes that are not reachable from source node (node_ID=1), sorted by ascending node_ID:\n")

""" Check whether sorted_by_Ascending_Fourty_Node_Unreachable_Nodes_List contains any nodes.

If there are unreachable nodes: """
if sorted_by_Ascending_Fourty_Node_Unreachable_Nodes_List:

    """ Iterate through each node in sorted_by_Ascending_Fourty_Node_Unreachable_Nodes_List """
    for unreachable_Fourty_Nodes_From_Source_Node in sorted_by_Ascending_Fourty_Node_Unreachable_Nodes_List:
        """ For each unreachable node, print "Node {node_ID}" """
        print(f"Node {unreachable_Fourty_Nodes_From_Source_Node}")

else:
    """ If there are no unreachable nodes,

    print a single summary line. """
    print("All nodes are reachable from the Node 1.")
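""" Sketch (hypothetical refactor, not part of the original solution): the 20-node and 40-node sections repeat the
same Question 3(e) steps. A single helper that takes the (node_ID, (distance, path)) list and the set of all node IDs
would keep both sections consistent. It skips nodes whose distance is not below 10000 (the initial distance in
the printed RDD content). It uses py_max because the wildcard pyspark.sql.functions import shadows the built-in max. """
from builtins import max as py_max

def sketch_summarize_shortest_paths(shortest_paths, all_node_ids, top_k=3, infinity=10000):
    reachable = [record for record in shortest_paths if record[1][0] < infinity]
    # Top-k furthest reachable nodes, largest distance first (stable for ties)
    furthest = sorted(reachable, key=lambda record: record[1][0], reverse=True)[:top_k]
    # Hops = nodes on the path minus one
    max_hops = py_max((len(path) - 1 for _, (_, path) in reachable), default=0)
    most_hops = [record for record in reachable if len(record[1][1]) - 1 == max_hops]
    unreachable = sorted(set(all_node_ids) - {node_id for node_id, _ in reachable})
    return {"furthest": furthest, "max_hops": max_hops, "most_hops": most_hops, "unreachable": unreachable}

# e.g. sketch_summarize_shortest_paths(fourty_Node_Graph_RDD_Shortest_Paths_Lists, get_Set_of_Node_ID_From_All_Fourty_Nodes)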






Question 3(f) (8 marks)

Computations and Results for 20-node-graph.txt:

20-node-graph.txt RDD Content with the structure (node_ID,(distance, list of neighbors with associated weight, path)):

(1, (0, [(14, 18), (11, 15), (10, 13)], [1]))
(2, (10000, [(10, 6)], []))
(3, (10000, [(9, 7)], []))
(4, (10000, [(10, 20), (9, 3)], []))
(5, (10000, [(3, 15), (16, 20), (8, 16), (13, 11)], []))
(6, (10000, [(19, 13)], []))
(7, (10000, [(11, 5), (18, 5)], []))
(8, (10000, [(7, 17), (9, 18)], []))
(9, (10000, [(18, 19), (3, 6)], []))
(10, (10000, [(11, 8), (14, 2)], []))
(11, (10000, [(14, 20), (3, 16), (7, 14)], []))
(12, (10000, [(1, 19), (9, 18), (2, 2)], []))
(13, (10000, [(14, 15), (5, 15)], []))
(14, (10000, [(15, 19)], []))
(15, (10000, [(9, 9)], []))
(16, (10000, [(4, 6), (13, 10)], []))
(17, (10000, [(18, 5), (15, 16)], []))
(18, (10000, [(1, 8), (10, 6), (17, 20), (19, 3)], []))
(19, (10000, [(3, 13)], []))
(20, (10000, [(12, 17), (7, 6), (1, 12)], []))

Number of iterations to complete

In [14]:
spark.stop()