In [107]:
import numpy as np
from collections import defaultdict
import heapq
from pqdict import pqdict

In [108]:
# Vertex ids of the example graph.
vertices = list(range(1, 11))

# graph
# Adjacency map: edges[u][v] is the cost of stepping from u to v.
# Every edge is listed in both directions and costs 1.
edges = defaultdict(dict)
edges.update({
    1: {2: 1, 6: 1},
    2: {1: 1, 5: 1, 3: 1},
    3: {2: 1, 4: 1},
    4: {3: 1, 5: 1},
    5: {2: 1, 4: 1, 6: 1},
    6: {1: 1, 5: 1, 7: 1},
    7: {6: 1, 8: 1},
    8: {7: 1, 9: 1},
    9: {8: 1, 10: 1},
    10: {9: 1},
})

# heuristic
# Initial heuristic value per vertex; the entry for vertex 10 is 0.
heuristic = dict(zip(vertices, (5, 4, 3, 2, 3, 4, 3, 2, 1, 0)))

In [109]:
class Node(object):
    """Search-tree record for one graph vertex.

    Attributes:
        key: identifier of the vertex this node stands for.
        parent: predecessor Node on the current best path, or None.
        g: cost accumulated from the search start (infinity until set).
        h: heuristic value assigned to the vertex (0.0 until set).
        f: cached sum ``g + h``; refreshed by the setters and by getF().
        is_open / is_closed: membership flags maintained by the caller.
        children: neighbour keys supplied through setChildren().
    """

    def __init__(self, key):
        self.key = key
        self.parent = None
        self.children = []
        # Nothing is known about the node yet: infinite cost, zero heuristic.
        self.g = float('inf')
        self.h = 0.0
        self.f = float('inf')
        self.is_open = False
        self.is_closed = False

    # ---- mutators -------------------------------------------------------

    def setParent(self, parent):
        """Record ``parent`` as the predecessor of this node."""
        self.parent = parent

    def setChildren(self, children):
        """Replace the stored neighbour list with ``children``."""
        self.children = children

    def setG(self, g):
        """Store a new path cost and refresh the cached f-value."""
        self.g = g
        self.f = g + self.h

    def setHeuristic(self, h):
        """Store a new heuristic value and refresh the cached f-value."""
        self.h = h
        self.f = self.g + h

    # ---- accessors ------------------------------------------------------

    def getParent(self):
        """Return the predecessor node (None when there is none)."""
        return self.parent

    def getG(self):
        """Return the stored path cost."""
        return self.g

    def getHeuristic(self):
        """Return the stored heuristic value."""
        return self.h

    def getF(self):
        """Recompute ``g + h``, cache it in ``f`` and return it."""
        total = self.g + self.h
        self.f = total
        return total

    def isOpen(self):
        """Return the ``is_open`` flag."""
        return self.is_open

    def isClosed(self):
        """Return the ``is_closed`` flag."""
        return self.is_closed

In [110]:
def tie_breaking_precedes(a, b):
    """Return whether entry ``a`` should be served before entry ``b``.

    Both arguments are ``(priority, key)`` tuples. The smaller priority wins;
    when the priorities are equal the smaller key wins.
    """
    lower_priority = a[0] < b[0]
    if lower_priority:
        return lower_priority
    same_priority = a[0] == b[0]
    if not same_priority:
        return same_priority
    # Priorities tie: fall back to comparing the keys.
    return a[1] < b[1]

class RTAA():
    """RTAA* (Real-Time Adaptive A*) path finder on an explicit graph.

    Each iteration of :meth:`run` performs three steps:

    1. expand at most ``step`` nodes with A* from the current vertex;
    2. set the heuristic of every expanded vertex to ``f_best - g``, where
       ``f_best`` is the smallest f-value left in OPEN;
    3. move to the OPEN vertex that holds ``f_best``.
    """

    def __init__(self, start, goal, heuristic, edges, vertices, step=4):
        """Store the problem definition.

        Args:
            start: key of the vertex the agent starts at.
            goal: key of the vertex to reach.
            heuristic: mapping vertex key -> initial heuristic value.
            edges: mapping ``u -> {v: cost}`` of directed edges.
            vertices: iterable of all vertex keys.
            step: maximum number of node expansions per lookahead.
        """
        self.start = start
        self.goal = goal
        # Work on a private copy: update_heuristic() rewrites entries, and
        # mutating the caller's dict would make a second run start from
        # already-learned values instead of the original ones.
        self.heuristic = dict(heuristic)
        self.step = step
        self.edges = edges
        self.vertices = vertices

    def init_nodes(self, start):
        """Create a fresh Node per vertex and an OPEN queue holding ``start``.

        Returns:
            ``(open_heap, nodes)``: the priority queue keyed by vertex with
            ``(f, key)`` priorities, and a dict vertex key -> Node.
        """
        # pqdict.minpq is a classmethod constructor; chaining it onto an
        # instance builds a second queue and drops the custom `precedes`,
        # so the queue is constructed directly instead.
        open_heap = pqdict(precedes=tie_breaking_precedes)
        nodes = {}
        for v in self.vertices:
            node = Node(v)
            node.setHeuristic(self.heuristic[v])
            # .get() keeps isolated vertices from raising on a plain dict and
            # from inserting empty entries into a caller's defaultdict.
            node.setChildren(list(self.edges.get(v, {}).keys()))
            if v == start:
                node.setG(0)
                node.is_open = True
                open_heap[v] = (node.getF(), v)
            nodes[v] = node
        return open_heap, nodes

    def find_node(self, nodes, key):
        """Return ``nodes[key]``, or None when ``key`` is absent."""
        return nodes.get(key)

    def find_optimal(self, open_set):
        """Pick the OPEN entry with the smallest f-value (ties: smallest key).

        Args:
            open_set: dict vertex key -> f-value.

        Returns:
            ``(key, f)`` of the best entry, or ``(None, inf)`` when
            ``open_set`` is empty.
        """
        if not open_set:
            return None, float('inf')
        return min(open_set.items(), key=lambda item: (item[1], item[0]))

    def update_heuristic(self, closed_list, node_dict, f):
        """Set ``heuristic[v] = f - g(v)`` for every vertex in ``closed_list``."""
        for v in closed_list:
            self.heuristic[v] = f - node_dict[v].getG()

    def get_path(self, optimal_key, node_dict):
        """Follow parent links from ``optimal_key`` back to the search root.

        Returns:
            The list of vertex keys ordered from the root to ``optimal_key``.
        """
        path = []
        current_node = node_dict[optimal_key]
        while current_node is not None:
            path.append(current_node.key)
            current_node = current_node.getParent()
        path.reverse()
        return path

    def a_star(self, start):
        """Run A* from ``start`` for at most ``self.step`` expansions.

        Returns:
            ``(open_heap, closed_list, node_dict, goal_reached)`` where
            ``closed_list`` holds the popped vertex keys in pop order and
            ``goal_reached`` is True when the goal itself was popped.
        """
        # init_nodes() builds brand-new Node objects, so they already are in
        # their initial search state; no separate reset pass is needed.
        open_heap, node_dict = self.init_nodes(start)
        closed_list = []
        expanded_count = 0

        while open_heap and expanded_count < self.step:
            # Entry with the lowest (f, key) priority.
            current_key, _ = open_heap.popitem()

            current_node = node_dict[current_key]
            current_node.is_open = False
            current_node.is_closed = True
            closed_list.append(current_key)

            # The goal is recognised when it is popped, before expansion.
            if current_key == self.goal:
                return open_heap, closed_list, node_dict, True

            for child_key, edge_cost in self.edges.get(current_key, {}).items():
                child_node = node_dict[child_key]
                if child_node.is_closed:
                    continue

                tentative_g = current_node.getG() + edge_cost
                if tentative_g < child_node.getG():
                    child_node.setParent(current_node)
                    child_node.setG(tentative_g)
                    # Inserts the child or updates its existing priority.
                    open_heap[child_key] = (child_node.getF(), child_key)
                    child_node.is_open = True

            expanded_count += 1
        return open_heap, closed_list, node_dict, False

    def print_open_and_heuristic(self, open_set):
        """Print the OPEN vertex keys and their f-values as aligned rows."""
        keys = sorted(open_set.keys())
        open_keys_str = "OPEN:     [" + "  ".join(f"{k:2}" for k in keys) + "]"
        f_values_str = "OPEN f:   [" + "  ".join(f"{open_set[k]:2}" for k in keys) + "]"
        print(open_keys_str)
        print(f_values_str)

    def print_heuristic(self):
        """Print every vertex key and its current heuristic as aligned rows."""
        keys = sorted(self.heuristic.keys())
        open_keys_str = "i:     [" + "  ".join(f"{k:2}" for k in keys) + "]"
        h_str = "hi:    [" + "  ".join(f"{self.heuristic[k]:2}" for k in keys) + "]"
        print(open_keys_str)
        print(h_str)

    def process_openheap(self, open_heap):
        """Drain ``open_heap`` into a plain dict vertex key -> f-value.

        The queue is emptied as a side effect of ``popitems()``.
        """
        return {key: priority[0] for key, priority in open_heap.popitems()}

    def run(self):
        """Move from ``self.start`` towards ``self.goal``, printing each step.

        Returns:
            The list of vertex keys visited so far. It ends at the goal on
            success; on failure the partial path is returned after a message
            is printed.

        NOTE(review): the loop has no iteration cap; whether it terminates
        when the goal cannot be reached is not established here - confirm.
        """
        path = [self.start]
        current = self.start
        iteration = 0

        while current != self.goal:
            print("================================================")
            print(f"Iteration {iteration + 1}: Current position = {current}")

            # Bounded lookahead from the current position.
            open_heap, closed_list, node_dict, goal_reached = self.a_star(current)
            open_set = self.process_openheap(open_heap)

            if goal_reached:
                # The lookahead popped the goal: append the rest of the route.
                remaining_path = self.get_path(self.goal, node_dict)[1:]
                path.extend(remaining_path)
                print(f"Goal reached Final path: {path}")
                return path

            if not open_set:
                print("Failed to find a path. No nodes in open set.")
                return path

            # open_set is non-empty here, so a best entry always exists.
            optimal_key, optimal_cost = self.find_optimal(open_set)

            # Learn from the lookahead before moving.
            self.update_heuristic(closed_list, node_dict, optimal_cost)
            print(f"CLOSED list: {closed_list}")
            self.print_open_and_heuristic(open_set)
            self.print_heuristic()

            # Walk along the search tree to the chosen OPEN vertex.
            next_path = self.get_path(optimal_key, node_dict)
            if len(next_path) <= 1:
                print("No valid move found. Path finding failed.")
                return path
            move_segment = next_path[1:]
            path.extend(move_segment)
            current = optimal_key
            print(f"Moving to node {current}, path segment: {move_segment}")
            iteration += 1
        return path




In [111]:
# Demo: walk from vertex 1 to vertex 10 on the example graph,
# using the default lookahead of the RTAA constructor.
print("Starting RTAA* algorithm")
rtaa = RTAA(
    start=1,
    goal=10,
    heuristic=heuristic,
    edges=edges,
    vertices=vertices,
)
path = rtaa.run()

Starting RTAA* algorithm
Iteration 1: Current position = 1
CLOSED list: [1, 2, 3, 4]
OPEN:     [ 5   6]
OPEN f:   [ 5   5]
i:     [ 1   2   3   4   5   6   7   8   9  10]
hi:    [ 5   4   3   2   3   4   3   2   1   0]
Moving to node 5, path segment: [2, 5]
Iteration 2: Current position = 5
CLOSED list: [5, 4, 2, 3]
OPEN:     [ 1   6]
OPEN f:   [ 7   5]
i:     [ 1   2   3   4   5   6   7   8   9  10]
hi:    [ 5   4   3   4   5   4   3   2   1   0]
Moving to node 6, path segment: [6]
Iteration 3: Current position = 6
CLOSED list: [6, 7, 8, 9]
OPEN:     [ 1   5  10]
OPEN f:   [ 6   6   4]
i:     [ 1   2   3   4   5   6   7   8   9  10]
hi:    [ 5   4   3   4   5   4   3   2   1   0]
Moving to node 10, path segment: [7, 8, 9, 10]
