## Part 5

Organize your code as per the Unified Modelling Language (UML) diagram in Figure 2 below. Furthermore, consider the points listed below and discuss them in a section labelled Part 4 in your report (where appropriate). 

* Instead of re-writing A* algorithm for this part, treat the class from UML as an “adapter”. 
* Discuss what design principles and patterns are being used in the diagram
* The UML is limited in the sense that graph nodes are represented by integers. How would you alter the UML diagram to accommodate various needs, such as nodes being represented by Strings or carrying more information than their names? Explain how you would change the design in Figure 2 to be robust to these potential changes.
* Discuss what other types of graphs could implement “Graph”. What other implementations exist?

This is a graph class used to build a test input graph for the ShortPathFinder class and the corresponding SP algorithms.

In [1]:
from abc import ABC, abstractmethod

In [2]:
class DirectedWeightedGraph:
    """Directed graph with one weight per edge, stored as adjacency lists.

    Attributes:
        adj: maps each node to the list of nodes it has an outgoing edge to,
            in insertion order.
        weights: maps ``(node1, node2)`` to the weight of that edge.
    """

    def __init__(self):
        self.adj = {}
        self.weights = {}

    def are_connected(self, node1, node2):
        """Return True if there is an edge from ``node1`` to ``node2``.

        Raises:
            KeyError: if ``node1`` has not been added to the graph.
        """
        return node2 in self.adj[node1]

    def adjacent_nodes(self, node):
        """Return the list of nodes reachable from ``node`` by one edge.

        The returned list is the graph's own adjacency list, not a copy.
        """
        return self.adj[node]

    def add_node(self, node):
        """Add ``node`` to the graph; adding an existing node is a no-op."""
        # setdefault keeps an existing adjacency list. Assigning a fresh []
        # would drop the node's outgoing edges while their entries in
        # ``weights`` stayed behind.
        self.adj.setdefault(node, [])

    def add_edge(self, node1, node2, weight):
        """Add the edge ``node1 -> node2`` or update its weight.

        Raises:
            KeyError: if ``node1`` has not been added to the graph.
        """
        if node2 not in self.adj[node1]:
            self.adj[node1].append(node2)
        self.weights[(node1, node2)] = weight

    def w(self, node1, node2):
        """Return the weight of ``node1 -> node2``, or None if no such edge."""
        if self.are_connected(node1, node2):
            return self.weights[(node1, node2)]
        return None

    def number_of_nodes(self):
        """Return how many nodes the graph contains."""
        return len(self.adj)

    def get_nodes(self):
        """Return a new list of all nodes, in insertion order."""
        return list(self.adj)

This is needed for Dijkstra's shortest path algorithm.

In [3]:
class Item:
    """Heap entry pairing a payload ``value`` with its priority ``key``."""

    def __init__(self, value, key):
        self.value, self.key = value, key

class MinHeap:
    """Array-backed binary min-heap whose entries can be addressed by value.

    Entries are objects exposing ``value`` (a hashable identifier) and ``key``
    (the priority). ``positions`` maps each value currently in the heap to its
    index in ``heap``, which is what lets ``decrease_key`` find an entry
    without scanning. Values are assumed to be unique, since ``positions``
    holds one index per value.
    """

    def __init__(self, elements):
        # Work on a copy so heap operations never reorder or shrink the
        # caller's list.
        self.heap = list(elements)
        self.positions = {element.value: i for i, element in enumerate(self.heap)}
        self.size = len(self.heap)
        self.build_heap()

    def parent(self, i):
        """Return the index of the parent of index ``i``."""
        return (i - 1) // 2

    def left(self, i):
        """Return the index of the left child of index ``i``."""
        return 2 * i + 1

    def right(self, i):
        """Return the index of the right child of index ``i``."""
        return 2 * i + 2

    def swap(self, i, j):
        """Exchange the entries at ``i`` and ``j`` and update ``positions``."""
        self.heap[i], self.heap[j] = self.heap[j], self.heap[i]
        self.positions[self.heap[i].value] = i
        self.positions[self.heap[j].value] = j

    def min_heapify(self, i):
        """Sift the entry at ``i`` down until neither child has a smaller key."""
        l = self.left(i)
        r = self.right(i)
        smallest = i
        if l < self.size and self.heap[l].key < self.heap[i].key:
            smallest = l
        if r < self.size and self.heap[r].key < self.heap[smallest].key:
            smallest = r
        if smallest != i:
            self.swap(i, smallest)
            self.min_heapify(smallest)

    def _sift_up(self, i):
        """Move the entry at ``i`` towards the root while its parent is larger."""
        while i > 0 and self.heap[self.parent(i)].key > self.heap[i].key:
            self.swap(i, self.parent(i))
            i = self.parent(i)

    def build_heap(self):
        """Establish the heap property over the whole array."""
        for i in range(self.size // 2, -1, -1):
            self.min_heapify(i)

    def extract_min(self):
        """Remove and return the entry with the smallest key.

        Raises:
            IndexError: if the heap is empty.
        """
        min_element = self.heap[0]
        last = self.heap.pop()
        self.size -= 1
        # Forget the removed entry so a later decrease_key on its value cannot
        # touch whichever entry now occupies its old index.
        self.positions.pop(min_element.value, None)
        if self.size > 0:
            self.heap[0] = last
            self.positions[last.value] = 0
            self.min_heapify(0)
        return min_element

    def decrease_key(self, value, new_key):
        """Lower the key of the entry identified by ``value``.

        Nothing happens when ``new_key`` is not strictly smaller than the
        current key.

        Raises:
            KeyError: if ``value`` is not currently in the heap (never
                inserted, or already extracted).
        """
        i = self.positions[value]
        if new_key < self.heap[i].key:
            self.heap[i].key = new_key
            self._sift_up(i)

    def insert(self, element):
        """Add ``element`` to the heap."""
        self.heap.append(element)
        self.size += 1
        self.positions[element.value] = self.size - 1
        # Sift up directly. Going through decrease_key with the element's own
        # key would be a no-op, because that method requires a strictly
        # smaller key.
        self._sift_up(self.size - 1)

    def is_empty(self):
        """Return True when the heap holds no entries."""
        return self.size == 0

UML diagram implementation starts here: 

In [15]:
class ShortPathFinder:
    """Runs a pluggable shortest-path algorithm on a pluggable graph.

    The graph and the algorithm are supplied after construction by assigning
    to the ``set_graph`` and ``set_algorithm`` properties::

        finder = ShortPathFinder()
        finder.set_graph = graph
        finder.set_algorithm = algorithm
        cost = finder.calc_short_path(source, dest)
    """

    def __init__(self):
        # Nothing is instantiated here: the Graph base class is abstract and
        # the choice of algorithm belongs to the caller.
        self._Graph = None
        self._SPAlgorithm = None

    def calc_short_path(self, source, dest):
        """Return the shortest-path cost from ``source`` to ``dest``.

        The configured algorithm is invoked as
        ``calc_sp(graph, source, dest)``. If it returns a dict of distances
        keyed by node, the entry for ``dest`` is returned; any other result is
        returned unchanged.

        Raises:
            ValueError: if the graph or the algorithm has not been set.
            KeyError: if the algorithm returns a dict that lacks ``dest``.
        """
        if self._Graph is None or self._SPAlgorithm is None:
            raise ValueError(
                "set_graph and set_algorithm must be assigned before calc_short_path"
            )
        result = self._SPAlgorithm.calc_sp(self._Graph, source, dest)
        if isinstance(result, dict):
            return result[dest]
        return result

    @property
    def set_graph(self):
        """The graph searched by ``calc_short_path`` (None until assigned)."""
        return self._Graph

    @set_graph.setter
    def set_graph(self, graph):
        self._Graph = graph

    @property
    def set_algorithm(self):
        """The algorithm object used by ``calc_short_path`` (None until assigned)."""
        return self._SPAlgorithm

    @set_algorithm.setter
    def set_algorithm(self, algorithm):
        self._SPAlgorithm = algorithm

In [16]:
class SPAlgorithm:
    """Common base for the shortest-path algorithm classes.

    NOTE: this class does not derive from ``ABC``, so ``@abstractmethod`` is
    not enforced here; the class and its subclasses can be instantiated
    without overriding ``calc_sp``.
    """

    @abstractmethod
    def calc_sp(self, g: DirectedWeightedGraph, source: int, k: int):
        """Compute shortest-path information on ``g`` starting at ``source``.

        Subclasses are expected to override this method. The base
        implementation does nothing and returns None.
        """

In [27]:
class Dijkstra(SPAlgorithm):
    """Dijkstra's single-source shortest-path algorithm on a min-heap."""

    def calc_sp(self, g, source, dest=None):
        """Compute shortest-path distances from ``source``.

        Args:
            g: graph exposing ``adj`` (node -> list of neighbours) and
                ``w(node1, node2)`` (edge weight).
            source: start node; must be a node of ``g``.
            dest: optional destination node. When omitted, distances to every
                node are returned, as before this parameter existed.

        Returns:
            With ``dest`` omitted: a dict mapping every node to its distance
            from ``source`` (``float("inf")`` when unreachable).
            With ``dest`` given: the distance from ``source`` to ``dest``.

        Raises:
            KeyError: if ``source`` (or a supplied ``dest``) is not in ``g``.
        """
        dist = {}
        Q = MinHeap([])

        for node in g.adj:
            Q.insert(Item(node, float("inf")))
            dist[node] = float("inf")

        Q.decrease_key(source, 0)

        while not Q.is_empty():
            current_element = Q.extract_min()
            current_node = current_element.value
            dist[current_node] = current_element.key
            # The distance of a node is final once it leaves the heap, so the
            # search can stop as soon as the requested destination is settled.
            if dest is not None and current_node == dest:
                break
            for neighbour in g.adj[current_node]:
                candidate = dist[current_node] + g.w(current_node, neighbour)
                if candidate < dist[neighbour]:
                    Q.decrease_key(neighbour, candidate)
                    dist[neighbour] = candidate

        if dest is None:
            return dist
        return dist[dest]

In [18]:
class Bellman_Ford(SPAlgorithm):
    """Bellman-Ford single-source shortest paths with negative-cycle check."""

    def calc_sp(self, g, source, dest=None):
        """Compute shortest-path distances from ``source``.

        Args:
            g: graph exposing ``adj`` (node -> list of neighbours),
                ``w(node1, node2)`` and ``number_of_nodes()``.
            source: start node.
            dest: optional destination node. When omitted, distances to every
                node are returned, as before this parameter existed.

        Returns:
            ``float("inf")`` if an edge can still be relaxed after all passes
            (a negative cycle was detected). Otherwise, with ``dest`` omitted,
            a dict mapping every node to its distance from ``source``; with
            ``dest`` given, the distance from ``source`` to ``dest``.

        Raises:
            KeyError: if a supplied ``dest`` is not a node of ``g``.
        """
        nodes = list(g.adj.keys())
        dist = {node: float("inf") for node in nodes}
        dist[source] = 0

        for _ in range(g.number_of_nodes() - 1):
            changed = False
            for node in nodes:
                for neighbour in g.adj[node]:
                    candidate = dist[node] + g.w(node, neighbour)
                    if dist[neighbour] > candidate:
                        dist[neighbour] = candidate
                        changed = True
            # A pass that relaxes nothing means the distances are final.
            if not changed:
                break

        for node in nodes:
            for neighbour in g.adj[node]:
                if dist[neighbour] > dist[node] + g.w(node, neighbour):
                    # negative cycle detected, return infinity
                    return float("inf")

        if dest is None:
            return dist
        return dist[dest]

In [19]:
class AStar(SPAlgorithm):
    """Heuristic-guided shortest-path search between two nodes.

    NOTE: the priority queue is keyed on the distance from the source only.
    The heuristic ``h`` is used solely to choose the order in which the
    neighbours of the current node are relaxed.
    """

    def sort_h_cost(self, g, nodes_left, h, dist, source):
        """Return the nodes of ``nodes_left`` ordered by ``h(...) + dist``.

        Args:
            g: the graph, passed through to ``h``.
            nodes_left: collection of candidate nodes.
            h: callable invoked as ``h(source, node, g)``.
            dist: dict of current distances; only its keys are considered.
            source: the search's source node, passed through to ``h``.

        Returns:
            A list in ascending order of combined cost. Among equal costs the
            node that appears later in ``dist`` comes first.
        """
        combined_costs = {}
        sorted_nodes = []

        for node in dist.keys():
            if node in nodes_left:
                combined_costs[node] = h(source, node, g) + dist[node]

        # Repeatedly pull out the cheapest remaining node (selection sort).
        while combined_costs:
            min_cost = float("inf")
            min_node = None

            for node, cost in combined_costs.items():
                if cost <= min_cost:
                    min_cost = cost
                    min_node = node

            sorted_nodes.append(min_node)
            combined_costs.pop(min_node)

        return sorted_nodes

    def astar(self, g, h, source, destination):
        """Return a shortest path from ``source`` to ``destination``.

        Args:
            g: graph exposing ``adj`` (node -> list of neighbours) and
                ``w(node1, node2)`` (edge weight).
            h: heuristic callable invoked as ``h(source, node, g)``.
            source: start node; must be a node of ``g``.
            destination: goal node.

        Returns:
            The path as a list of nodes from ``source`` to ``destination``
            inclusive, or an empty list when ``destination`` is not a node of
            ``g`` or cannot be reached from ``source``.

        Raises:
            KeyError: if ``source`` is not a node of ``g``.
        """
        came_from = {}
        dist = {}
        Q = MinHeap([])

        for node in g.adj:
            Q.insert(Item(node, float("inf")))
            dist[node] = float("inf")

        Q.decrease_key(source, 0)

        while not Q.is_empty():
            current_element = Q.extract_min()
            current_node = current_element.value
            dist[current_node] = current_element.key
            if current_node == destination:
                break
            nodes_left = g.adj[current_node]
            nodes_left = self.sort_h_cost(g, nodes_left, h, dist, source)

            for neighbour in nodes_left:
                candidate = dist[current_node] + g.w(current_node, neighbour)
                if candidate < dist[neighbour]:
                    Q.decrease_key(neighbour, candidate)
                    dist[neighbour] = candidate
                    came_from[neighbour] = current_node

        # Without this guard an unreachable or unknown destination would be
        # reported as the one-node path [source].
        if dist.get(destination, float("inf")) == float("inf"):
            return []

        path = []
        node = destination
        while node in came_from:
            path.append(node)
            node = came_from[node]
        path.append(source)
        path.reverse()

        return path

In [20]:
class Adapter(SPAlgorithm):
    """Adapts ``AStar`` to the ``calc_sp`` interface of ``SPAlgorithm``.

    ``AStar.astar`` needs a heuristic and returns a path; ``calc_sp`` takes
    only a graph and two nodes and returns a cost. This class bridges the two.
    """

    def __init__(self, heuristic=None):
        """Create the adapter.

        Args:
            heuristic: optional heuristic used by ``calc_sp``. Either a
                callable invoked as ``h(source, node, graph)`` or a lookup
                table with a ``get`` method mapping a node to its estimate.
        """
        self.class_astar = AStar()
        self.heuristic = heuristic

    def astar(self, g, h, source_node, dest_node):
        """Delegate to ``AStar.astar`` and return the path it produces."""
        return self.class_astar.astar(g, h, source_node, dest_node)

    def _heuristic_for(self, g):
        """Pick the heuristic callable to use for a search on ``g``."""
        h = self.heuristic
        if h is None:
            # NOTE(review): assumes ``get_heuristic`` is a property, as on
            # HeuristicGraph; confirm for any other graph type.
            h = getattr(g, "get_heuristic", None)
        if h is None:
            return lambda source, node, graph: 0
        if callable(h):
            return h
        return lambda source, node, graph: h.get(node, 0)

    def calc_sp(self, g, source, dest):
        """Return the cost of the A* path from ``source`` to ``dest``.

        Returns:
            The sum of the edge weights along the path found by ``astar``,
            or ``float("inf")`` when the returned path does not run from
            ``source`` to ``dest``.
        """
        path = self.astar(g, self._heuristic_for(g), source, dest)
        # An empty path, or one that does not end at dest, means no route.
        if not path or path[0] != source or path[-1] != dest:
            return float("inf")
        return sum(g.w(start, end) for start, end in zip(path, path[1:]))

In [21]:
class Graph(ABC):
    """Abstract interface for graphs with weighted edges.

    Attributes:
        adj: dict intended to map a node to its adjacent nodes.
        weights: dict intended to map an edge to its weight.
    """

    def __init__(self):
        self.adj = {}
        self.weights = {}

    @abstractmethod
    def adjacent_nodes(self, node):
        """Return the nodes adjacent to ``node``."""

    @abstractmethod
    def add_node(self, node):
        """Add ``node`` to the graph."""

    @abstractmethod
    def add_edge(self, node1, node2, weight):
        """Add an edge from ``node1`` to ``node2`` with the given weight."""

    @abstractmethod
    def number_of_nodes(self):
        """Return the number of nodes in the graph."""

    @abstractmethod
    def w(self, node1, node2):
        """Return the weight of the edge from ``node1`` to ``node2``."""
        # Two endpoints, matching how the concrete graphs and the
        # shortest-path algorithms call w(node1, node2).

In [22]:
class WeightedGraph(Graph):
    """Concrete weighted graph backed by the ``adj`` and ``weights`` dicts.

    Edges are directed: ``add_edge(a, b, weight)`` creates ``a -> b`` only.
    The methods mirror those of ``DirectedWeightedGraph``.
    """

    def are_connected(self, node1, node2):
        """Return True if there is an edge from ``node1`` to ``node2``.

        Raises:
            KeyError: if ``node1`` has not been added to the graph.
        """
        return node2 in self.adj[node1]

    def adjacent_nodes(self, node):
        """Return the list of nodes reachable from ``node`` by one edge."""
        return self.adj[node]

    def add_node(self, node):
        """Add ``node`` to the graph; adding an existing node is a no-op."""
        self.adj.setdefault(node, [])

    def add_edge(self, node1, node2, weight):
        """Add the edge ``node1 -> node2`` or update its weight.

        Raises:
            KeyError: if ``node1`` has not been added to the graph.
        """
        if node2 not in self.adj[node1]:
            self.adj[node1].append(node2)
        self.weights[(node1, node2)] = weight

    def number_of_nodes(self):
        """Return how many nodes the graph contains."""
        return len(self.adj)

    def get_nodes(self):
        """Return a new list of all nodes, in insertion order."""
        return list(self.adj)

    def w(self, node1, node2):
        """Return the weight of ``node1 -> node2``, or None if no such edge."""
        if self.are_connected(node1, node2):
            return self.weights[(node1, node2)]
        return None
        
class HeuristicGraph(WeightedGraph):
    """Weighted graph that also carries a heuristic for guided search."""

    def __init__(self, heuristic):
        """Create an empty graph that stores ``heuristic``.

        Args:
            heuristic: the heuristic to keep with the graph; it is stored
                as given.
        """
        # Run the base initialiser so ``adj`` and ``weights`` exist.
        super().__init__()
        self.heuristic = heuristic

    @property
    def get_heuristic(self):
        """The stored heuristic (a property: read it without calling)."""
        return self.heuristic

Test cases: 

In [23]:
# Build the sample input: a directed, weighted graph on the nodes 0..3.
g = DirectedWeightedGraph()

# Register every node before any edge refers to it.
for node_id in (0, 1, 2, 3):
    g.add_node(node_id)

# (start, end, weight) triples, inserted in this order.
for edge_start, edge_end, edge_weight in (
    (0, 1, 4),
    (0, 2, 3),
    (1, 2, 1),
    (1, 3, 2),
    (2, 3, 5),
):
    g.add_edge(edge_start, edge_end, edge_weight)

source_node, dest_node = 0, 3
relaxation_limit = 100  # Arbitrarily high value to get full shortest paths

In [24]:
# Heuristic passed to the A* search.
def heuristic(source, destination, graph):
    """Return 0 when ``graph`` has an edge source -> destination, else 1."""
    # A direct edge costs 0; anything else costs 1.
    return 0 if graph.are_connected(source, destination) else 1

In [28]:
# One instance of each algorithm under test.
finder1, finder2, finder3 = Dijkstra(), Bellman_Ford(), Adapter()

# Dijkstra: distances from the source to every node.
shortest_distances = finder1.calc_sp(g, source_node)
print(shortest_distances)

# Bellman-Ford: same query, so the two outputs can be compared.
shortest_distances = finder2.calc_sp(g, source_node)
print(shortest_distances)

# A* through the adapter: this call yields the path (a list of nodes) from
# the source to the destination rather than a table of distances.
shortest_distances = finder3.astar(g, heuristic, source_node, dest_node)
print(shortest_distances)


{0: 0, 1: 4, 2: 3, 3: 6}
{0: 0, 1: 4, 2: 3, 3: 6}
[0, 1, 3]
