## Shortest Path using Dynamic Programming
## Jagadeesh Vasudevamurthy

In [1]:
import enum # For enum
import math  # for infinity

In [2]:
class Data:
    def __init__(self, n: "string"):
        self._name = n  ### _name is used as key for this object

    def __hash__(self):
        t = hash(self._name)
        return t

    def __eq__(self, other: "Node") -> "bool":
        if not isinstance(other, type(self)):
            assert False
        return self._name == other._name

    def __str__(self):
        return self._name

    def get_key(self) -> "string":
        return self._name

In [3]:
class GraphType(enum.Enum): 
    NONE = 0
    UNDIRECTED = 1
    DIRECTED = 2
    WEIGHTED_UNDIRECTED = 3
    WEIGHTED_DIRECTED  = 4

## graph

In [4]:
# Node class
class Node:
    def __init__(self, T):
        self._item = T
        self._fanins = {}
        self._fanouts = {}

    def get_name(self):
        return self._item

    def add_fan_out(self, e):
        self._fanouts[e.get_node()] = e

    def add_fan_in(self, e):
        self._fanins[e.get_node()] = e

    def fanout_of_a_node(self):
        return list(self._fanouts.keys())

    def fanin_of_a_node(self):
        return list(self._fanins.keys())

    def node_get_fanout_node_weight(self, other_node):
        return self._fanouts[other_node].get_weight()

# Edge class
class Edge:
    def __init__(self, n, weight):
        self._other = n
        self._weight = weight

    def get_node(self):
        return self._other

    def get_weight(self):
        return self._weight

# Graph class
class Graph:
    def __init__(self, graph_type):
        self._numE = 0
        self._type = graph_type
        self._dict = {}

    def is_directed_graph(self):
        return self._type == GraphType.DIRECTED or self._type == GraphType.WEIGHTED_DIRECTED

    def build_node_and_to_graph(self, name):
        if name not in self._dict:
            self._dict[name] = Node(name)
        return self._dict[name]

    def add_edge(self, fname, tname, w):
        f = self.build_node_and_to_graph(fname)
        t = self.build_node_and_to_graph(tname)
        edge = Edge(t, w)
        f.add_fan_out(edge)
        t.add_fan_in(edge)

    def list_of_nodes(self):
        return list(self._dict.values())

## DFS

In [5]:
class Timestamp:
    def __init__(self):
        self.in_time = 0
        self.out_time = 0
        
# Time Complexity: O(V + E)
# Space Complexity: O(V)


class GraphDfsUsingTimeStamp:
    def __init__(self, graph, graph_name, dfs_order, has_loop, work):
        self._g = graph
        self._graph_name = graph_name
        self._dfs_order = dfs_order
        self._has_loop = has_loop
        self._work = work  # Work is passed in and tracked externally
        self._timestamp_counter = 0
        self._visited = set()
        self._stack = set()  # To detect cycles
        self._timestamps = {}
        self._dfs()

    def _dfs(self):
        nodes = self._g.list_of_nodes()
        for node in nodes:
            if node not in self._visited:
                self._increment_work()  # Increment work when visiting a new node
                if self._dfs_recursive(node):
                    self._has_loop[0] = True
                    break
        self._dfs_order.reverse()

    def _dfs_recursive(self, node):
        self._visited.add(node)
        self._stack.add(node)
        self._timestamp_counter += 1
        self._timestamps[node] = Timestamp()
        self._timestamps[node].in_time = self._timestamp_counter

        self._increment_work()  # Increment work when processing a node's adjacency list
        for adjacent_node in node.fanout_of_a_node():
            self._increment_work()  # Increment work for each edge traversal
            if adjacent_node not in self._visited:
                if self._dfs_recursive(adjacent_node):
                    return True
            elif adjacent_node in self._stack:
                return True

        self._timestamp_counter += 1
        self._timestamps[node].out_time = self._timestamp_counter
        self._stack.remove(node)
        self._dfs_order.append(node)
        return False

    def _increment_work(self):
        self._work[0] += 1

## Dynamic programming

In [6]:
# Time Complexity: O(V + E)
# Space Complexity: O(V)

class ShortestPathDP:
    def __init__(self, graph, start, dfs_order, work):
        self._g = graph
        self.start = start
        self.dfs_order = dfs_order
        self.cost = {}
        self.path = {}
        self._work = work  # Work passed from outside
        self._initialize()
        self._compute_shortest_paths()

    def _initialize(self):
        for node in self._g.list_of_nodes():
            self.cost[node.get_name()] = math.inf
            self.path[node.get_name()] = None
        self.cost[self.start] = self.start  # Cost to reach the start node from itself is the node value

    def _compute_shortest_paths(self):
        for node in self.dfs_order:
            node_name = node.get_name()
            for adjacent_node in node.fanout_of_a_node():
                adjacent_name = adjacent_node.get_name()
                weight = node.node_get_fanout_node_weight(adjacent_node)
                self._increment_work()
                if self.cost[adjacent_name] > self.cost[node_name] + weight:
                    self._increment_work()
                    self.cost[adjacent_name] = self.cost[node_name] + weight
                    self.path[adjacent_name] = node_name

    def get_shortest_path(self, end):
        if end == self.start:
            return [self.start, self.start], self.cost[end]  # Self-loop path
        if self.cost[end] == math.inf:
            return [], math.inf
        path = []
        current = end
        while current is not None:
            path.append(current)
            current = self.path[current]
        return path[::-1], self.cost[end]

    def _increment_work(self):
        self._work[0] += 1


In [7]:
############################################################
# Exam.py
# Author: Jagadeesh Vasudevamurthy
# Copyright: Jagadeesh Vasudevamurthy 2024
###########################################################

########################################
#Nothing can be changed in class Exam
########################################
class Exam:
    def __init__(self, g: 'list of list of size 3', s: 'start', ans: 'empty list', work: 'list of size 1', show: 'Bool') -> 'None':
        self._g = g  # graph
        self._s = s  # start city
        self._ans = ans  # YOU NEED TO FILL
        self._work = work
        self._show = show

        # Extract nodes and keep track of unique nodes
        self.graph_type = self._determine_graph_type()
        self.graph = Graph(self.graph_type)  # Initialize the graph with the determined type
        self._build_graph()  # Build the graph based on the input list
        
        self._alg()  # Call the algorithm to perform DFS and find topological order

    def _determine_graph_type(self) -> GraphType:
        is_weighted = any(len(edge) > 1 and edge[1] != 1 for edges in self._g for edge in edges)
        for i, edges in enumerate(self._g):
            for edge in edges:
                if [i, edge[0], edge[1]] not in [[x[0], i, x[1]] for x in self._g[edge[0]]]:
                    return GraphType.WEIGHTED_DIRECTED if is_weighted else GraphType.DIRECTED
        return GraphType.WEIGHTED_UNDIRECTED if is_weighted else GraphType.UNDIRECTED

    def _build_graph(self):
        for i, edges in enumerate(self._g):
            from_node = self.graph.build_node_and_to_graph(i)
            for edge in edges:
                to_node = self.graph.build_node_and_to_graph(edge[0])
                self.graph.add_edge(from_node.get_name(), to_node.get_name(), edge[1])

    def _alg(self) -> None:
        has_loop = [False]
        dfs_order = []

        # Perform DFS using the GraphDfsUsingTimeStamp class
        dfs = GraphDfsUsingTimeStamp(
            self.graph,
            "DFS Graph",
            dfs_order,
            has_loop,
            self._work  # Pass work directly to the DFS class
        )

        if has_loop[0]:
            print("The graph has a loop.")
            self._ans.extend([] for _ in range(len(self._g)))  # Empty paths if there's a loop
        else:
            print("Topological Order:", [node.get_name() for node in dfs_order])
            self._perform_dynamic_programming(dfs_order)

    def _perform_dynamic_programming(self, dfs_order):
        # Use dynamic programming to find the shortest paths if no loop is detected
        dp_solver = ShortestPathDP(self.graph, self._s, dfs_order, self._work)  # Pass work directly to the DP class
        
        # Store the paths in ans
        for end_node in range(len(self._g)):
            path, cost = dp_solver.get_shortest_path(end_node)
            self._ans.append(path)
        
        if self._show:
            # Convert the Node objects to their names for display
            final_answer = [[node for node in path] for path in self._ans]
            print(f"No loop detected. Shortest paths: {final_answer}")
            print(f"Work done: {self._work[0]}")

##  CANNOT CHANGE ANYTHING BELOW

## TEST BENCH
## NOTHING CAN BE CHANED BELOW

In [8]:
############################################################
# ExamTest.py
# Author: Jagadeesh Vasudevamurthy
# Copyright: Jagadeesh Vasudevamurthy 2024
###########################################################

############################################################
#     NOTHING CAN BE CHANGED IN THIS FILE
###########################################################

############################################################
# All imports
###########################################################
#from Exam import *

############################################################
# give after exam
###########################################################


def assert_answer(g,s,ans,expected_cost,work):
    print("You may fail if I put assert code here")
  
class ExamTest():
    def __init__(self):
        self._no = 0 
        self._show = False
        self._marks = 0
        self._testBench()

    def _testBench(self):
        self._basic_tests()
        print("All basic tests passed")
        self._marks += 50
        print("You got",self._marks, "marks")
        print("I will see how you can pass my Hidden tests, which I will post after exam")
        self._hidden_tests()

    #Private function
    def _test1(self,g:'list of list of list of size 3', start:'startcity', expected_cost)->'None':
        if self._show:
          print("----------- Test case ",self._no, "----------------")
        self._no = self._no + 1
        ans = []; #must return list of size of nodes 
        work = [0] # work done by alg
        s = Exam(g,start,ans,work,self._show)
        assert_answer(g,start,ans,expected_cost,work[0])
        print(f"Final Answer: {ans}")
        
        
    
    def _hidden_tests(self):
        pass
        

    def _basic_tests(self):
        self._show = True
    
        # Test case 1: Small graph
        g1 = [ 
            [[3, 3.0], [1, 14.0], [2, 5.0]],  # Node 0 -> Node 3 (3.0), Node 1 (14.0), Node 2 (5.0)
            [[4, 7.0], [6, 6.0]],             # Node 1 -> Node 4 (7.0), Node 6 (6.0)
            [[4, 3.0], [5, 2.0]],             # Node 2 -> Node 4 (3.0), Node 5 (2.0)
            [[2, 11.0], [1, 6.0], [4, 7.0]],  # Node 3 -> Node 2 (11.0), Node 1 (6.0), Node 4 (7.0)
            [[6, 5.0]],                       # Node 4 -> Node 6 (5.0)
            [[6, 7.0]],                       # Node 5 -> Node 6 (7.0)
            [],                               # Node 6 has no outgoing edges                               # Node 7 is isolated, no connections
        ]
        c1 = []  # Placeholder for expected costs
        self._test1(g1, 0, c1)  # Running test case 1 (Start from Node 0)
    
        # Test case 2: Larger graph with a different structure
        g2 = [
            [[1, 2.0], [2, 4.0]],  # Node 0 -> Node 1 (2.0), Node 2 (4.0)
            [[2, 1.0], [3, 7.0]],  # Node 1 -> Node 2 (1.0), Node 3 (7.0)
            [[3, 3.0], [4, 5.0]],  # Node 2 -> Node 3 (3.0), Node 4 (5.0)
            [[4, 2.0]],            # Node 3 -> Node 4 (2.0)
            [],                    # Node 4 has no outgoing edges
        ]
        c2 = []  # Placeholder for expected costs
        self._test1(g2, 0, c2)  # Running test case 2 (Start from Node 0)
    
        # Test case 3: Graph with cycles
        g3 = [
            [[1, 1.0], [2, 4.0]],  # Node 0 -> Node 1 (1.0), Node 2 (4.0)
            [[0, 1.0], [2, 2.0]],  # Node 1 -> Node 0 (1.0), Node 2 (2.0)
            [[3, 3.0]],            # Node 2 -> Node 3 (3.0)
            [[1, 1.0], [4, 2.0]],  # Node 3 -> Node 1 (1.0), Node 4 (2.0)
            [],                    # Node 4 has no outgoing edges
        ]
        c3 = []  # Placeholder for expected costs
        self._test1(g3, 0, c3)  # Running test case 3 (Start from Node 0)
    
        # Test case 4: Disconnected graph
        g4 = [
            [[1, 5.0]],  # Node 0 -> Node 1 (5.0)
            [[2, 3.0]],  # Node 1 -> Node 2 (3.0)
            [],          # Node 2 has no outgoing edges
            [[4, 1.0]],  # Node 3 -> Node 4 (1.0)
            []           # Node 4 has no outgoing edges
        ]
        c4 = []  # Placeholder for expected costs
        self._test1(g4, 0, c4)  # Running test case 4 (Start from Node 0)
    
        # Test case 5: Single node graph
        g5 = [
            []  # Node 0 has no outgoing edges
        ]
        c5 = []  # Placeholder for expected costs
        self._test1(g5, 0, c5)  # Running test case 5 (Start from Node 0)
    
        # Test case 6: Dense graph with high weights
        g6 = [
            [[1, 100.0], [2, 200.0], [3, 300.0], [4, 400.0]],  # Node 0 connects to all other nodes
            [[2, 100.0], [3, 150.0], [4, 250.0]],              # Node 1 connects to 2, 3, 4
            [[3, 50.0], [4, 120.0]],                           # Node 2 connects to 3, 4
            [[4, 30.0]],                                       # Node 3 connects to 4
            []                                                 # Node 4 has no outgoing edges
        ]
        c6 = []  # Placeholder for expected costs
        self._test1(g6, 0, c6)  # Running test case 6 (Start from Node 0)
    
        # Test case 7: Large sparse graph (performance test)
        g7 = [[] for _ in range(1000)]  # Graph with 1000 nodes, initially no edges
        g7[0] = [[i, i * 10.0] for i in range(1, 1000, 100)]  # Sparse connections from Node 0
        g7[100] = [[200, 5.0], [300, 15.0]]  # Some sparse connections elsewhere
        c7 = []  # Placeholder for expected costs
        self._test1(g7, 0, c7)  # Running test case 7 (Start from Node 0



        


        
           

############################################################
# main 
# YOU CANNOT CHANGE ANYTHING BELOW
###########################################################
def main():
    print("Testing ExamTest.py Starts")
    s = ExamTest()
    print("All tests passed. You are a genius")
    print("Can you rate me on linkedLn: https://www.linkedin.com/in/jagadeesh-vasudevamurthy-6796591/")
    print("Testing ExamTest.py ENDS")


In [9]:
############################################################
# main
###########################################################
if (__name__    == '__main__'):
    main()

Testing ExamTest.py Starts
----------- Test case  0 ----------------
Topological Order: [0, 3, 1, 2, 5, 4, 6]
No loop detected. Shortest paths: [[0, 0], [0, 3, 1], [0, 2], [0, 3], [0, 2, 4], [0, 2, 5], [0, 2, 4, 6]]
Work done: 42
You may fail if I put assert code here
Final Answer: [[0, 0], [0, 3, 1], [0, 2], [0, 3], [0, 2, 4], [0, 2, 5], [0, 2, 4, 6]]
----------- Test case  1 ----------------
Topological Order: [0, 1, 2, 3, 4]
No loop detected. Shortest paths: [[0, 0], [0, 1], [0, 1, 2], [0, 1, 2, 3], [0, 1, 2, 4]]
Work done: 26
You may fail if I put assert code here
Final Answer: [[0, 0], [0, 1], [0, 1, 2], [0, 1, 2, 3], [0, 1, 2, 4]]
----------- Test case  2 ----------------
The graph has a loop.
You may fail if I put assert code here
Final Answer: [[], [], [], [], []]
----------- Test case  3 ----------------
Topological Order: [3, 4, 0, 1, 2]
No loop detected. Shortest paths: [[0, 0], [0, 1], [0, 1, 2], [], []]
Work done: 15
You may fail if I put assert code here
Final Answer: [[0