In [None]:
import numpy as np
import pickle
from memory_profiler import memory_usage
import time
import heapq
import math
import tracemalloc
import time
import gc
import csv


In [None]:

# General Notes:
# - Update the provided file name (code_<RollNumber>.py) as per the instructions.
# - Do not change the function name, number of parameters or the sequence of parameters.
# - The expected output for each function is a path (list of node names)
# - Ensure that the returned path includes both the start node and the goal node, in the correct order.
# - If no valid path exists between the start and goal nodes, the function should return None.

#intializing global variables
global path_exists
path_exists = None


class Node:
    def __init__(self, state, h_score=0, parent=None, depth=0, path_cost=0, x=0, y=0, f_score=0):
        self.state = state
        self.parent = parent
        self.depth = depth
        self.path_cost = path_cost

        # Used only in case of A* algorithms
        self.x = x
        self.y = y
        self.h_score = h_score
        self.f_score = f_score

    def expand(self, adj_matrix, rev=False):
        # Expands a node and allocates memory for each child node
        children = []
        node = self.state
        for child, cost in enumerate(adj_matrix[node]):
            if cost != 0:
                children.append(Node(child, parent=self, depth=self.depth + 1, path_cost=self.path_cost + cost))
        if rev:
            children.reverse()
        return children

    # Used for A* algorithms
    def expand_heuristic(self, adj_matrix, node_attributes, start_node, goal_node):
        children = []
        node = self.state
        for child_state, cost in enumerate(adj_matrix[node]):
            if cost != 0:
                h_score = calculate_heuristic(node_attributes, start_node, goal_node, child_state)
                child = Node(
                    state=child_state,
                    h_score=h_score,
                    parent=self,
                    depth=self.depth + 1,
                    path_cost=self.path_cost + cost,
                    x=node_attributes[child_state]['x'],
                    y=node_attributes[child_state]['y'],
                    f_score=self.path_cost + h_score
                )
                children.append(child)

        return children

    def __lt__(self, other):
        # Used in the A* algorithm, for inserting the nodes in a priority queue based on heuristic
        return self.f_score < other.f_score



def find_path(node):
    path = []
    while node is not None:
        path.append(node.state)
        node = node.parent
    path.reverse()
    return path


def join_nodes(node_f, node_b):
    # Finds the path given the forward frontier and backward frontier
    path = []
    while node_f:
        path.append(node_f.state)
        node_f = node_f.parent
    path.reverse()

    # Collect the backward path
    node_b_path = []
    while node_b:
        node_b_path.append(node_b.state)
        node_b = node_b.parent

    # Combine paths
    path.extend(node_b_path)
    return path

# Algorithm: A* Search Algorithm

# Input:
#   - adj_matrix: Adjacency matrix representing the graph.
#   - node_attributes: Dictionary of node attributes containing x, y coordinates for heuristic calculations.
#   - start_node: The starting node in the graph.
#   - goal_node: The target node in the graph.

# Return:
#   - A list of node names representing the path from the start_node to the goal_node.
#   - If no path exists, the function should return None.

# Sample Test Cases:

#   Test Case 1:
#     - Start node: 1, Goal node: 2
#     - Return: [1, 7, 6, 2]

#   Test Case 2:
#     - Start node: 5, Goal node: 12
#     - Return: [5, 97, 28, 10, 12]

#   Test Case 3:
#     - Start node: 12, Goal node: 49
#     - Return: None

#   Test Case 4:
#     - Start node: 4, Goal node: 12
#     - Return: [4, 6, 27, 9, 8, 5, 97, 28, 10, 12]

def dist(node_attributes, node1, node2):
    node1_x = node_attributes[node1]['x']
    node1_y = node_attributes[node1]['y']
    node2_x = node_attributes[node2]['x']
    node2_y = node_attributes[node2]['y']
    return math.sqrt((node1_x - node2_x) ** 2 + (node1_y - node2_y) ** 2)

def calculate_heuristic(node_attributes, start_node, goal_node, node):
    return dist(node_attributes, start_node, node) + dist(node_attributes, node, goal_node)

def get_astar_search_path(adj_matrix, node_attributes, start_node, goal_node):
    start = Node(
        start_node,
        x=node_attributes[start_node]['x'],
        y=node_attributes[start_node]['y'],
        h_score=calculate_heuristic(node_attributes, start_node, goal_node, start_node)
    )
    goal = Node(
        goal_node,
        x=node_attributes[goal_node]['x'],
        y=node_attributes[goal_node]['y'],
        h_score=calculate_heuristic(node_attributes, start_node, goal_node, goal_node)
    )

    nodes_to_be_evaluated = []
    heapq.heappush(nodes_to_be_evaluated, start)

    nodes_expanded = set()

    while nodes_to_be_evaluated:
        node = heapq.heappop(nodes_to_be_evaluated)
        if node.state == goal.state:
            return find_path(node)
        nodes_expanded.add(node.state)

        for child in node.expand_heuristic(adj_matrix, node_attributes, start_node, goal_node):
            tentative_path_cost = node.path_cost + adj_matrix[node.state][child.state]

            # If child not yet expanded or a shorter path to child is found
            if child.state not in nodes_expanded or tentative_path_cost < child.path_cost:
                child.path_cost = tentative_path_cost
                child.f_score = child.path_cost + child.h_score
                heapq.heappush(nodes_to_be_evaluated, child)


    return None


# Algorithm: Bi-Directional Heuristic Search

# Input:
#   - adj_matrix: Adjacency matrix representing the graph.
#   - node_attributes: Dictionary of node attributes containing x, y coordinates for heuristic calculations.
#   - start_node: The starting node in the graph.
#   - goal_node: The target node in the graph.

# Return:
#   - A list of node names representing the path from the start_node to the goal_node.
#   - If no path exists, the function should return None.

# Sample Test Cases:

#   Test Case 1:
#     - Start node: 1, Goal node: 2
#     - Return: [1, 7, 6, 2]

#   Test Case 2:
#     - Start node: 5, Goal node: 12
#     - Return: [5, 97, 98, 12]

#   Test Case 3:
#     - Start node: 12, Goal node: 49
#     - Return: None

#   Test Case 4:
#     - Start node: 4, Goal node: 12
#     - Return: [4, 34, 33, 11, 32, 31, 3, 5, 97, 28, 10, 12]

def get_bidirectional_heuristic_search_path(adj_matrix, node_attributes, start_node, goal_node):

    start_f = Node(
        state=start_node,
        x=node_attributes[start_node]['x'],
        y=node_attributes[start_node]['y'],
        h_score=calculate_heuristic(node_attributes, start_node, goal_node, start_node)
    )
    goal_b = Node(
        state=goal_node,
        x=node_attributes[goal_node]['x'],
        y=node_attributes[goal_node]['y'],
        h_score=calculate_heuristic(node_attributes, start_node, goal_node, goal_node)
    )

    frontier_f = []
    frontier_b = []
    heapq.heappush(frontier_f, (start_f.f_score, start_f))
    heapq.heappush(frontier_b, (goal_b.f_score, goal_b))

    visited_f = {start_f.state: start_f}
    visited_b = {goal_b.state: goal_b}

    while frontier_f and frontier_b:
        # Forward search
        f_score, nf = heapq.heappop(frontier_f)
        if nf.state in visited_b:
            return join_nodes(nf, visited_b[nf.state])

        for child in nf.expand_heuristic(adj_matrix, node_attributes, start_node, goal_node):
            tentative_path_cost = nf.path_cost + adj_matrix[nf.state][child.state]
            if child.state not in visited_f or tentative_path_cost < child.path_cost:
                child.path_cost = tentative_path_cost
                child.f_score = child.path_cost + child.h_score
                visited_f[child.state] = child
                heapq.heappush(frontier_f, (child.f_score, child))

        # Backward search
        b_score, nb = heapq.heappop(frontier_b)
        if nb.state in visited_f:
            return join_nodes(visited_f[nb.state], nb)

        for child in nb.expand_heuristic(adj_matrix, node_attributes, start_node, goal_node):
            tentative_path_cost = nb.path_cost + adj_matrix[nb.state][child.state]
            if child.state not in visited_b or tentative_path_cost < child.path_cost:
                child.path_cost = tentative_path_cost
                child.f_score = child.path_cost + child.h_score
                visited_b[child.state] = child
                heapq.heappush(frontier_b, (child.f_score, child))

    return None





In [None]:
def measure_time_2(algorithm, adj_matrix, node_attributes, start_node, goal_node):

    start_time = time.time()  # Start time
    path = algorithm(adj_matrix, node_attributes, start_node, goal_node)
    end_time = time.time() # End time

    print(f"Algorithm: {algorithm.__name__}")
    print(f"Time taken: {end_time - start_time} seconds")

    return end_time - start_time



def measure_memory_2(algorithm, adj_matrix, node_attributes, start_node, goal_node):

    tracemalloc.start()  # Start tracing memory allocation
    path = algorithm(adj_matrix, node_attributes, start_node, goal_node)
    current_mem, peak_mem = tracemalloc.get_traced_memory()  # Get current memory usage
    tracemalloc.stop()  # Stop tracing memory

    current_mem_mb = current_mem / 10**6

    print(f"Algorithm: {algorithm.__name__}")
    print(f"Current memory usage: {current_mem_mb} MB")

    return current_mem_mb

In [None]:

adj_matrix = np.load('IIIT_Delhi.npy')
with open('IIIT_Delhi.pkl', 'rb') as f:
    node_attributes = pickle.load(f)

astar_time_total = 0
astar_memory_total = 0
bidirectional_time_total = 0
bidirectional_memory_total = 0

astar_times = []
astar_memories = []
bidirectional_times = []
bidirectional_memories = []

iteration = 1

for i in range(len(adj_matrix)):
    for j in range(len(adj_matrix)):
        print(f"Iteration: {iteration}")

        # Measure time and memory for A* Search
        astar_time = measure_time_2(get_astar_search_path, adj_matrix, node_attributes, i, j)
        astar_memory = measure_memory_2(get_astar_search_path, adj_matrix, node_attributes, i, j)
        astar_times.append(astar_time)
        astar_memories.append(astar_memory)

        # Measure time and memory for Bidirectional Heuristic Search
        bidirectional_time = measure_time_2(get_bidirectional_heuristic_search_path, adj_matrix, node_attributes, i, j)
        bidirectional_memory = measure_memory_2(get_bidirectional_heuristic_search_path, adj_matrix, node_attributes, i, j)
        bidirectional_times.append(bidirectional_time)
        bidirectional_memories.append(bidirectional_memory)

        # Update totals
        astar_time_total += astar_time
        astar_memory_total += astar_memory
        bidirectional_time_total += bidirectional_time
        bidirectional_memory_total += bidirectional_memory

        iteration += 1

with open('informed_algorithm_performance.csv', mode='w', newline='') as file:
    writer = csv.writer(file)
    writer.writerow(['Iteration', 'A* Time (seconds)', 'A* Memory (MB)', 'Bidirectional Time (seconds)', 'Bidirectional Memory (MB)'])

    for iter_num in range(len(astar_times)):
        writer.writerow([
            iter_num + 1,
            astar_times[iter_num],
            astar_memories[iter_num],
            bidirectional_times[iter_num],
            bidirectional_memories[iter_num]
        ])

# Print results
print(f"A* Search total time: {astar_time_total:.6f} seconds")
print(f"A* Search total memory: {astar_memory_total:.6f} MB")
print(f"Bidirectional Heuristic Search total time: {bidirectional_time_total:.6f} seconds")
print(f"Bidirectional Heuristic Search total memory: {bidirectional_memory_total:.6f} MB")
