# Anytime heuristic search 

In this lab we will implement one on the seminal <i>anytime</i> heuristic search algorithms -- ARA*.

The idea of an anytime algorithm is ''to provide the best possible solution, given a limited computational (time) budget''. In our case, we will start with finding a bounded suboptimal solution with a high suboptimality bound and then will gradually decrease the bound and resume the search. The crux of the algorithm is that it does not re-start the search from scratch but rather re-uses the previuosly built search tree to (significantly) save the search effort. As we will see, a ''smart'' re-use of the search-tree notably outperforms the straightforward approach when one straightforwadly invokes one WA* search after the other from scratch, while decreasing the suboptimality bound.

For the details of ARA* please refer to the <a href=https://proceedings.neurips.cc/paper/2003/file/ee8fe9093fbbb687bef15a38facc44d2-Paper.pdf>original paper by Likhachev et al.</a>


As before we will consider a pathfinding problem on a 8-connected grid with the Octile distance as the heuristic function.

In [1]:
import copy
import math
from random import shuffle
from heapq import heappop, heappush, heapify
from random import randint
import time
from sys import float_info
import numpy as np

EPS = float_info.epsilon

## Grid Search Domain Representation

In [2]:
class Map:
    '''
    Square grid map class represents the environment for our moving agent
        - width -- the number of columns in the grid
        - height -- the number of rows in the grid
        - cells -- the binary matrix, that represents the grid. 0 - cell is traversable, 1 - cell is blocked
    '''

    def __init__(self):
        '''
        Default constructor
        '''

        self._width = 0
        self._height = 0
        self._cells = []
    

    def read_from_string(self, cell_str, width, height):
        '''
        Converting a string (with '#' representing obstacles and '.' representing free cells) to a grid

        Parameters
        ----------
        cell_str : str
            String which contains map data
        width : int
            Number of grid columns
        height : int
            Number of grid rows
        '''
        self._width = width
        self._height = height
        self._cells = [[0 for _ in range(width)] for _ in range(height)]
        cell_lines = cell_str.split("\n")
        i = 0
        j = 0
        for l in cell_lines:
            if len(l) != 0:
                j = 0
                for c in l:
                    if c == '.':
                        self._cells[i][j] = 0
                    elif c == '#' or c == 'T' or c == '@':
                        self._cells[i][j] = 1
                    else:
                        continue
                    j += 1
                if j != width:
                    raise Exception("Size Error. Map width = ", j, ", but must be", width )
                
                i += 1

        if i != height:
            raise Exception("Size Error. Map height = ", i, ", but must be", height )
    

    def set_grid_cells(self, width, height, grid_cells):
        '''
        Initialization of map by list of cells.

        Parameters
         ----------
        width : int
            Number of grid columns
        height : int
            Number of grid rows
        grid_cells : list[list[int]]
            Map matrix consisting of values of two types: 0 (traversable cells) and 1 (obstacles)
        '''

        self._width = width
        self._height = height
        self._cells = grid_cells


    def in_bounds(self, i, j):
        '''
        Check if the cell is on a grid.

        Parameters
        ----------
        i : int
            The number of the column in which the cell is located
        j : int
            The number of the row in which the cell is located

        Returns
        -------
        bool
            Is the cell inside map bounds
        '''
        return (0 <= j < self._width) and (0 <= i < self._height)
    
    def traversable(self, i, j):
        '''
        Check if the cell is not an obstacle.

        Parameters
        ----------
        i : int
            The number of the column in which the cell is located
        j : int
            The number of the row in which the cell is located

        Returns
        -------
        bool
            Is the cell traversable (true) or obstacle (false)
        '''

        return not self._cells[i][j]


    def get_neighbors(self, i, j):
        '''
        Returns a list of neighbouring cells as (i, j) tuples. 
        Fucntions should returns such neighbours, that allows both cardinal and diagonal moves, 
        but dissalows cutting corners and squezzing. 

        Parameters
        ----------
        i : int
            The number of the column in which the cell is located
        j : int
            The number of the row in which the cell is located

        Returns
        -------
        neighbours : list[tuple[int, int]]
            List of neighbours grid map (i, j) coordinates
        '''

        neighbors = []
        delta = [[0, 1], [1, 0], [0, -1], [-1, 0]]

        for d in delta:
            if self.in_bounds(i + d[0], j + d[1]) and self.traversable(i + d[0], j + d[1]):
                neighbors.append((i + d[0], j + d[1]))

        delta = [[1, 1], [-1, -1], [-1, 1], [1, -1]]
        for d in delta:
            if self.in_bounds(i + d[0], j + d[1]) and self.traversable(i + d[0], j + d[1])\
                and self.traversable(i + d[0], j) and self.traversable(i, j + d[1]):
                neighbors.append((i + d[0], j + d[1]))   

        return neighbors

    def get_size(self):
        '''
        Returns the size of the map in cells

        Returns
        -------
        tuple[int, int]
            Size of the map in cells (height, width)
        '''
        return (self._height, self._width)


In [3]:
def compute_cost(i1, j1, i2, j2):
    '''
    Computes cost of simple moves between cells on grid, 
    or euclidean distance, if there isnt a simple move between cells

    Parameters
    ----------
    i_1, j_1 : int, int
        Position of first cell on the grid map
    i_2, j_2 : int, int
        Position of second cell on the grid map

    Returns
    -------
    float
        Cost of the move between two neighbouring cells

    '''

    d = abs(i1 - i2) + abs(j1 - j2)
    if d == 1:  # cardinal move
        return 1
    elif d == 2:  # diagonal move
        return math.sqrt(2)
    else:
        raise Exception('Trying to compute the cost of non-supported move!')


In [4]:
class Node:
    '''
    Node class represents a search node

    - i, j: coordinates of corresponding grid element
    - g: g-value of the node
    - h: h-value of the node // always 0 for Dijkstra
    - f: f-value of the node // always equal to g-value for Dijkstra
    - parent: pointer to the parent-node 

    '''
    

    def __init__(self, i, j, g = 0, h = 0, f = None, parent = None):
        self.i = i
        self.j = j
        self.g = g
        self.h = h
        if f is None:
            self.f = self.g + h
        else:
            self.f = f        
        self.parent = parent

        
    
    def __eq__(self, other):
        '''
        Estimating where the two search nodes are the same,
        which is needed to detect dublicates in the search tree.
        '''
        return (self.i == other.i) and (self.j == other.j)
    
    def __hash__(self):
        '''
        To implement CLOSED as set of nodes we need Node to be hashable.
        '''
        ij = self.i, self.j
        return hash(ij)


    def __lt__(self, other): 
        '''
        Comparison between self and other. Returns is self < other (self has higher priority).
        '''
        return (self.f < other.f) or ((abs(self.f - other.f) < EPS ) and (self.h < other.h))


In [5]:
def diagonal_distance(i1, j1, i2, j2):
    '''
    Returns a diagonal distance between two cells

    Parameters
    ----------
    i1, j1 : int, int
        Position of first cell on the grid map
    i2, j2 : int, int
        Position of second cell on the grid map

    Returns
    -------
    float
        Diagonal distance between two cells

    '''
    return (abs(abs(i2 - i1) - abs(j2 - j1)) + math.sqrt(2) * (min(abs(i2 - i1), abs(j2 - j1))))


In [6]:
def make_path(goal):
    '''
    Creates a path by tracing parent pointers from the goal node to the start node
    It also returns path's length.
    '''

    length = goal.g
    current = goal
    path = []
    while current.parent:
        path.append(current)
        current = current.parent
    path.append(current)
    return path[::-1], length


### Implementing the search tree (i.e. OPEN,  CLOSED, INCONS)

Efficient implementation of the search tree (OPEN and CLOSED) is crucial for any search algorithm. You should use your efficient implementation of the search tree that you've used in the previous labs. 

It is also necessary to implement several new methods of the search tree class, which are required for ARA*.
 

In [7]:
class SearchTreePQS:            #SearchTree which uses PriorityQueue for OPEN and set for CLOSED

    def __init__(self):
        self._open = []         # prioritized queue for the OPEN nodes
        self._closed= set()     # set for the expanded (at the current invocation of the main search loop) nodes = CLOSED
        self._incons = set()    # set of the inconsistnet nodes
        self._visited = dict()  # Dict of nodes (e.g. key = node, value = node) 
                                # this is needed to 'save' the search tree in between the search invocations
                                # this is also needed to obtain a g/f value of an arbitrary node in the search tree
                                      
    def __len__(self):
        return len(self._open) + len(self._closed) + len(self._incons)
    
    def open_is_empty(self):
        return len(self._open) == 0

    def add_to_open(self, item):
        heappush(self._open, item)   
    
    def was_expanded(self, item):
        return item in self._closed
    
    def get_best_node_from_open(self):
        best_node = heappop(self._open)
        while self.was_expanded(best_node) and not self.open_is_empty():
            best_node = heappop(self._open)
        
        return best_node

    def add_to_closed(self, item):
        self._closed.add(item)

    def add_to_incons(self, item):
        '''
        This is needed only for ARA*.
        If we generated a node that already was expanded BUT the generated node has a better g-value
        we need not to add this generated node to INCONS. This node will not be used to continue
        growing the search tree at the current iteration of the search main loop but will be used as the
        frontier node at the next iteration of the search main loop in ARA*.
        
        Parameters
        ----------
        item : Node
            Node to insert
        '''
        self._incons.add(item)


    def unite_update_open_incons(self, new_weight):
        '''
        This is needed only for ARA*.
        Unites OPEN and INCONS sets and updates f-values of nodes using new suboptimality bound.

        Parameters
        ----------
        new_weight : float
            New weigth for updating f-values of nodes
        '''
        for item in self.opened:
            item.f = item.g + new_weight * item.h
        
        for item in self._incons:
            item.f = item.g + new_weight * item.h
        
        self._open += self._incons
        
        heapify(self._open)


    def clear_closed(self):
        '''
        This is needed only for ARA*.
        Closed should be freshed before each termination of the search main loop.
        The 'closed part' of the search tree will still be saved in memory in between the search main loops
        in the VISITED container.
        '''   
        self._closed.clear()

    def clear_incons(self):
        '''
        This is needed only for ARA*.
        Incons should be freshed before each termination of the search main loop.
        ''' 
        self._incons.clear()
    
    def get_node(self, i, j):
        '''
        Checks if the node was generated (belongs to the search tree). If it is, then we return this node,
        more precisely -- the version of this node (as we might have dublicates) with the best g-value.
        If it is not, then a 'bulk' node with the 'infinite' g-value is returned.
        ''' 
        
        if (i, j) in self._visited:
            return self._visited[(i, j)]
        else:
            return Node(i, j, math.inf)

    @property
    def opened(self):
        return self._open
    
    @property
    def expanded(self):
        return self._closed


## Weighted A*

In [8]:
def weighted_astar(grid_map, start_i, start_j, goal_i, goal_j, weight, heuristic_func = None, search_tree = None):
    '''
    Runs weighted A* search algorithm without re-expansion.

    Parameters
    ----------
    grid_map : Map
        An additional domain information (such as grid map).
    start_i, start_j : int, int
        The start state of search in useful for your implementation form.
    goal_i, goal_j  : int, int
        The goal state of search in useful for your implementation form.
    weight : float
        The weight of heuristics in F-value computation.
    heuristic_func : function
        Heuristic function
    search_tree : type 
        Search tree data structure

    Returns
    -------
    path_found : bool
        Path was found or not.  
    last_node : Node
        The last node in path. None if path was not found.
    steps : int
        The number of search steps
    '''

    ast = search_tree()
    steps = 0

    start_node = Node(start_i, start_j, g=0.0, h=heuristic_func(start_i, start_j, goal_i, goal_j))
    ast.add_to_open(start_node)

    goal = (goal_i, goal_j)
    
    while not ast.open_is_empty():
        steps += 1
        best_node = ast.get_best_node_from_open()
        best = (best_node.i, best_node.j)
        ast.add_to_closed(best_node)
        
        if (best == goal):
            return True, best_node, steps
        
        for succ in grid_map.get_neighbors(*best):
            g =  best_node.g + compute_cost(*best, *succ)
            h = heuristic_func(*succ, *goal)
            f = g + weight * h
            new_node = Node(*succ, g, h, f, parent=best_node)
            if not ast.was_expanded(new_node):
                ast.add_to_open(new_node)           

    return False, None, steps



## Naive Anytime A*

Straighforward implementation of anytime A* which sequientially invocates WA* (from scratch, without saving the search tree in between) with the decreasing suboptimality bound.

The initial suboptimality bound and the increment are the input parameters of the algorithm (start_w, step_w, respectively).


In [9]:
def naive_anytime_astar(grid_map, start_i, start_j, goal_i, goal_j, start_w = 3.0, step_w = 0.5, heuristic_func = None, search_tree = None):
    '''
    Repeatedly runs weighted A* search algorithm without re-expansion on any domain, 
    decreasing current weight from start_w to 1.0 by step_w.

    Parameters
    ----------
    grid_map : Map
        An additional domain information (such as grid map).
    start_i, start_j : int, int
        The start state of search in useful for your implementation form.
    goal_i, goal_j  : int, int
        The goal state of search in useful for your implementation form.
    start_w : float
        The initial weight of heuristics in F-value computation. Must be greater or equal to 1.0, by default 3.0.
    step_w : float
        The value by which the weight will be reduced, if it is provided by the algorithm, by default 0.5.

    Yields
    -------
    path_found : bool
        Path was found or not.  
    last_node : Node
        The last node in path. None if path was not found.
    steps : int
        The number of search steps
    weight : float
        Weight used at iteration
    '''

    if start_w < 1:
        raise Exception("Weight must be greater or equal to 1")

    weight = float(start_w)
    steps = 0

    while True:
        path_found, last_node, iter_steps = weighted_astar(grid_map, start_i, start_j, goal_i, goal_j, weight, heuristic_func, search_tree)
        steps += iter_steps
        yield path_found, last_node, steps, weight
        
        if abs(weight - 1.0) < EPS:
            break

        weight = (weight - step_w) if (weight - step_w) >= 1.0 else 1.0


## Anytime Repairing A*

An involved variant of anytime A* which sequientially invocates WA* with the decreasing suboptimality bound while saving the search tree in between. Each new search is started with the specifically formed frontier, which consists of the nodes that were either OPEN or INCONS by the end of the previous iteration (CLOSED is cleared in between the main search loops).

The initial suboptimality bound and the increment are the input parameters of the algorithm (start_w, step_w, respectively).

Following the notation of the original paper on ARA* the search main loop is encapsulated in the function called improve_path, which is a specific adaptation of the WA* search main loop (with specific handling of INCONS nodes).

anytime_reparing_astar implements the high-level logic of the algorithm - it creates the search tree in the beginning, invokes improve_path, unites OPEN and INCONS (and cleares OPEN and CLOSED) after the termination of the improve_path, decreases the suboptimality bound etc.


In [10]:
def improve_path(goal_i, goal_j, grid_map, heuristic_func, search_tree, weight):

    steps = 0
    goal = (goal_i, goal_j)
    
    if goal in search_tree._visited.keys():
        result = True
        goal_node = search_tree._visited[goal]
    else:
        result = False
        goal_node = None

    while not search_tree.open_is_empty() and (not result or goal_node.g > search_tree._open[0].f):
        best = search_tree.get_best_node_from_open()
        best_cords = best.i, best.j
        
        search_tree.add_to_closed(best)
        
        if best_cords == goal:
            return True, best, steps
        
        for succ in grid_map.get_neighbors(*best_cords):
            g = best.g + compute_cost(*best_cords, *succ)
            heuristic = heuristic_func(*succ, *goal)
            f = g + weight * heuristic
            if succ not in search_tree._visited.keys():
                node = Node(*succ, g, heuristic, f, best)
                search_tree._visited[succ] = node
                search_tree.add_to_open(node)
            else:
                if g < search_tree._visited[succ].g:
                    node = Node(*succ, g, heuristic, f, best)
                    search_tree._visited[succ] = node
                    if search_tree.was_expanded(node):
                        search_tree.add_to_incons(node)
                    else:
                        search_tree.add_to_open(node)
        steps += 1         
    
    return result, goal_node, steps

def anytime_repairing_astar(grid_map, start_i, start_j, goal_i, goal_j, start_w = 3.0, step_w = 0.5, heuristic_func = None, search_tree = None):
    '''
    Runs Anytime Repairing A* search algorithm.

    Parameters
    ----------
    grid_map : Map
        An additional domain information (such as grid map).
    start_i, start_j : int, int
        The start state of search in useful for your implementation form.
    goal_i, goal_j  : int, int
        The goal state of search in useful for your implementation form.
    start_w : float
        The initial weight of heuristics in F-value computation. Must be greater or equal to 1.0, by default 3.0.
    step_w : float
        The value by which the weight will be reduced, if it is provided by the algorithm, by default 0.5.

    Yields
    -------
    path_found : bool
        Path was found or not.  
    last_node : Node
        The last node in path. None if path was not found.
    steps : int
        The number of search steps
    weight : float
        Weight used at iteration
    '''
    
    if start_w < 1:
        raise Exception("Weight must be greater or equal to 1")

    weight = float(start_w)
    steps = 0
    start_node = Node(start_i, start_j, g=0.0, h=heuristic_func(start_i, start_j, goal_i, goal_j))
    start_node.f = start_node.g + weight * start_node.h


    arast = search_tree()
    arast.add_to_open(start_node)

    while True:

        path_found, goal_node, iter_steps = improve_path(goal_i, goal_j, grid_map, heuristic_func, arast, weight)
        steps += iter_steps
        yield path_found, goal_node, steps, weight

        if abs(weight - 1.0) < EPS:
            break

        weight = (weight - step_w) if (weight - step_w) >= 1.0 else 1.0
 
        arast.unite_update_open_incons(weight)
        arast.clear_incons()
        arast.clear_closed()


# Experiment

In [11]:
def simple_test(search_generator, task, start_w, step_w, max_time, *args):
    '''
    simple_test runs search_generator on one task (use a number from 0 to 4 to 
    choose a certain debug task on simple map or None to choose a random task 
    from this pool) with *args as optional arguments and displays:
     - 'Path found!' and some statistics -- path was found
     - 'Path not found!' -- path was not found
     - 'Execution error' -- an error occurred while executing the SearchFunction In first two cases function also draws visualisation of the task

    Parameters
    ----------
    search_generator : generator
        Generator, which implements the method to run
    task : int
        Number of task from pregenerated set. Must be in the range from 0 to 4  
    start_w : float
        The initial weight of heuristics in F-value computation. Must be greater or equal to 1.0
    step_w : float
        The value by which the weight will be reduced, if it is provided by the algorithm 
    max_time : float
        The amount of time given for the pathfinding procedure. Set in seconds.  

    Returns
    -------
    path_found : bool 
        Path was found or not.   
    length : Node 
        The length of the last found path.
    w : int 
        The sub-optimality value of the last found path.
    '''
    
    height = 15
    width = 30
    map_str = '''
. . . # # . . . . . . . . # # . . . # . . # # . . . . . . .  
. . . # # # # # . . # . . # # . . . . . . # # . . . . . . . 
. . . . . . . # . . # . . # # . . . # . . # # . . . . . . . 
. . . # # . . # . . # . . # # . . . # . . # # . . . . . . . 
. . . # # . . # . . # . . # # . . . # . . # # . . . . . . . 
. . . # # . . # . . # . . # # . . . # . . # # # # # . . . . 
. . . # # . . # . . # . . # # . . . # . . # # # # # . . . . 
. . . . . . . # . . # . . # # . . . # . . # . . . . . . . . 
. . . # # . . # . . # . . # # . . . # . . # . . . . . . . . 
. . . # # . . # . . # . . # # . . . # . . # . . . . . . . . 
. . . # # . . . . . # . . . . . . . # . . . . . . . . . . . 
. . . # # # # # # # # # # # # # . # # . # # # # # # # . # # 
. . . # # . . . . . . . . # # . . . . . . . . . . . . . . . 
. . . . . . . . . . . . . . . . . . . . . . . . . . . . . .
. . . # # . . . . . . . . # # . . . . . . . . . . . . . . .
'''

    task_map = Map()
    task_map.read_from_string(map_str, width, height)
    starts = [(0, 1), (6, 2), (5, 6), (13, 7), (4, 23)]
    goals = [(1, 28), (2, 29), (3, 20), (3, 20), (0, 0)]

    if (task is None) or not (0 <= task < 5):
        task = randint(0, 4)

    start = Node(*starts[task])
    goal = Node(*goals[task])
    

    true_result = weighted_astar(task_map, start.i, start.j, goal.i, goal.j, 1, *args) 
    path = make_path(true_result[1])
    length = path[1]
    print("A* Path found! Length: {:.7f}".format(path[1]))

    start_time = time.time()
    last_result = False, None

    iter_count = 0
    timer = 0.0
    finale_timer = 0.0
    w_real = 0.0
    for result in search_generator(task_map, start.i, start.j, goal.i, goal.j, start_w, step_w, *args):
        iter_count += 1
        timer = time.time() - start_time
        
        if timer < max_time:
            last_result = result
            finale_timer = timer
            if last_result[0]:
                w_length = last_result[1].g
                w_real = w_length / length
                w = last_result[-1]
                print(iter_count, "iter (w = {:.3f}). Real sub-optimality value: {:.3f}".format(w, w_real))

                if abs(w_real - 1) < EPS:
                    break
            else:
                print(iter_count, "iter. Weighted path not found!")
        else:
            break
    
    if last_result[0]:
        w_length = last_result[1].g
        w = w_length / length
        print("Weighted path found! Final sub-optimality value: {:.7f}. Iterations: {:d}. Time: {:.7}".format(w, iter_count, finale_timer))
        return True, w_length, w
    else:
        print("Weighted path not found!")
        return False, None, None


In [12]:
def read_map_from_movingai_file(path):
    '''
    Reads map from file

    Parameters
    ----------
    path : str
        Path to file with map

    Returns
    -------
    tuple[int, int, list[list[int]]]
        Tuple, which contains information about map.
        - width -- number of grid columns
        - height -- number of grid rows
        - cells -- map matrix consisting of values of two types: 0 (traversable cells) and 1 (obstacles)
    '''
    
    map_file = open(path)
    map_file.readline()
    line = map_file.readline()
    height = int(line.split()[1])
    line = map_file.readline()
    width = int(line.split()[1])
    line = map_file.readline()


    cells = [[0 for _ in range(width)] for _ in range(height)]

    for i, l in enumerate(map_file):
        for j, c in enumerate(l):
            if j == width:
                break
            
            if c == '.':
                cells[i][j] = 0
            elif c == "#" or c == "@" or c == "T":
                cells[i][j] = 1
            else:
                continue

        if(i == height):
            break

    return (width, height, cells)


In [13]:
def read_tasks_from_movingai_file(path):
    '''
    Reads tasks from file

    Parameters
    ----------
    path : str
        Path to file with task

    Returns
    -------
    list[tuple[int, int, int, int, float]]
        List of tasks. Each element of list contains:
        - start_i, start_j -- coordinates of start cell
        - goal_i, goal_j -- coordinates of goal cell 
        - length -- length of shortest path
    '''
    
    tasks = []
    tasks_file = open(path)
    tasks_file.readline() 
    
    for task_line in tasks_file:
        task_data = task_line.split()
        start_j = int(task_data[4])
        start_i = int(task_data[5])
        goal_j = int(task_data[6])
        goal_i = int(task_data[7])
        length = float(task_data[8])

        tasks.append((start_i, start_j, goal_i, goal_j, length))
    
    return tasks 


In [14]:
def massive_test(search_generator, start_w, step_w, max_time, *args):
    '''
    massive_test runs search_generator on one task (use a number from 0 to 4 to 
    choose a certain debug task on simple map or None to choose a random task 
    from this pool) with *args as optional arguments and displays:
        - 'Path found!' and some statistics -- path was found
        - 'Path not found!' -- path was not found
        - 'Execution error' -- an error occurred while executing the SearchFunction In first two cases function also draws visualisation of the task

    Parameters
    ----------
    search_generator : generator
        Generator, which implements the method to run 
    start_w : float
        The initial weight of heuristics in F-value computation. Must be greater or equal to 1.0
    step_w : float
        The value by which the weight will be reduced, if it is provided by the algorithm 
    max_time : float
        The amount of time given for the pathfinding procedure. Set in seconds.  

    Returns
    -------
    results : dict[list[bool], list[bool], list[float], list[float], list[float]] 
        Statistics about conducted test represented in form of dictionary with next key-values:
        - found -- list containing the result of finding the path in corresponding tasks
        - correct -- list containing the results of checking the coincidence of found path length and the true value of the length in corresponding tasks
        - length -- list containing lengths of found paths in corresponding tasks
        - weight_real -- list containing the ratios of the found paths lengths to the values of the shortest paths lengths
        - weight_used -- list of the last used weights
        - iter -- list containing the number of iterations 
    '''

  



    results = dict()
    results["found"] = []
    results["correct"] = []
    results["length"] = []
    results["weight_real"] = []
    results["weight_used"] = []
    results["iter"] = []

    task_from = 850
    task_to = 900
    task_map = Map()
    directory = "./data"
    maps = ["32room_000.map"]
    
    for map_file in maps:
        map_file_path = directory + '/' + map_file
        width, height, cells = read_map_from_movingai_file(map_file_path)
        task_file_path = map_file_path + ".scen"
        tasks = read_tasks_from_movingai_file(task_file_path)
        task_map.set_grid_cells(width, height, cells)

        for task_n in range(task_from, task_to):
            start_i, start_j, goal_i, goal_j, true_length = tasks[task_n]
            start_time = time.time()
            last_result = False, None

            iter_count = 0
            timer = 0.0
            finale_timer = 0.0
            w_real = 0.0


            for result in search_generator(task_map, start_i, start_j, goal_i, goal_j, start_w, step_w, *args):
                iter_count += 1
                timer = time.time() - start_time
                 
                if timer < max_time:
                    last_result = result
                    finale_timer = timer
                    if last_result[0]:
                        length = last_result[1].g
                        w_real = length / true_length
                        w = last_result[-1]
                        if abs(w_real - 1.0) < EPS:
                            print("End 2")
                            break
                    else:
                        print(iter_count-1, "iter. Weighted path not found!")
                else:
                    iter_count-=1
                    break
            
            if last_result[0]:
                length = last_result[1].g
                w_real = length / true_length
                print("Weighted path found! Final sub-optimality value: {:.7f}. Iterations: {:d}. Time: {:.7}".format(w_real, iter_count, finale_timer))

                results["found"].append(True)
                results["correct"].append(abs(w_real - 1) < 0.00001)
                results["length"].append(length)
                results["weight_real"].append(w_real)
                results["weight_used"].append(last_result[-1])
                results["iter"].append(iter_count)
            else:
                print("Weighted path not found!")
                results["found"].append(False)
                results["correct"].append(False)
                results["length"].append(None)
                results["weight_real"].append(None)
                results["weight_used"].append(None)
                results["iter"].append(1)

    return results


The following two cells run Naive Anytime A* and ARA* on the same 'simple' tasks and report the statistics on how many search iterations were made and what was the resultant suboptimality bound of the reported solution.

By default we start with subotimality bound of 3, decrement it by 0.1 in between each iteration of the search and allocate 0.2 seconds to report a solution.


Indeed, ARA* should generally i) complete more search iterations compared to Naive Anytime A* within the given time limit, ii) end with a better solutions.

In [15]:
for i in range(5):
    simple_test(naive_anytime_astar, i, 3, 0.1, 0.02, diagonal_distance, SearchTreePQS)

A* Path found! Length: 47.8994949
1 iter (w = 3.000). Real sub-optimality value: 1.351
2 iter (w = 2.900). Real sub-optimality value: 1.351
3 iter (w = 2.800). Real sub-optimality value: 1.351
4 iter (w = 2.700). Real sub-optimality value: 1.351
Weighted path found! Final sub-optimality value: 1.3513279. Iterations: 5. Time: 0.01699853
A* Path found! Length: 40.8994949
1 iter (w = 3.000). Real sub-optimality value: 1.469
2 iter (w = 2.900). Real sub-optimality value: 1.469
3 iter (w = 2.800). Real sub-optimality value: 1.469
4 iter (w = 2.700). Real sub-optimality value: 1.469
Weighted path found! Final sub-optimality value: 1.4687484. Iterations: 5. Time: 0.01800466
A* Path found! Length: 38.2426407
1 iter (w = 3.000). Real sub-optimality value: 1.231
2 iter (w = 2.900). Real sub-optimality value: 1.231
3 iter (w = 2.800). Real sub-optimality value: 1.231
4 iter (w = 2.700). Real sub-optimality value: 1.231
5 iter (w = 2.600). Real sub-optimality value: 1.231
6 iter (w = 2.500). Real 

In [16]:
for i in range(5):
    simple_test(anytime_repairing_astar, i, 3, 0.1, 0.02, diagonal_distance, SearchTreePQS)

A* Path found! Length: 47.8994949
1 iter (w = 3.000). Real sub-optimality value: 1.351
2 iter (w = 2.900). Real sub-optimality value: 1.351
3 iter (w = 2.800). Real sub-optimality value: 1.351
4 iter (w = 2.700). Real sub-optimality value: 1.351
5 iter (w = 2.600). Real sub-optimality value: 1.351
6 iter (w = 2.500). Real sub-optimality value: 1.351
7 iter (w = 2.400). Real sub-optimality value: 1.351
8 iter (w = 2.300). Real sub-optimality value: 1.351
9 iter (w = 2.200). Real sub-optimality value: 1.351
10 iter (w = 2.100). Real sub-optimality value: 1.351
11 iter (w = 2.000). Real sub-optimality value: 1.351
12 iter (w = 1.900). Real sub-optimality value: 1.351
13 iter (w = 1.800). Real sub-optimality value: 1.351
14 iter (w = 1.700). Real sub-optimality value: 1.351
15 iter (w = 1.600). Real sub-optimality value: 1.017
16 iter (w = 1.500). Real sub-optimality value: 1.017
17 iter (w = 1.400). Real sub-optimality value: 1.017
18 iter (w = 1.300). Real sub-optimality value: 1.017
19 

The following cell runs a more involved test, i.e. the one that uses a larger map and more instances on this map (as defined in massive_test function).

One should start with subotimality bound of 10, decrement it by 0.1 in between each iteration of the search and allocate 1.0 seconds to report a solution.

NOTE: This test takes ~2 mins to run on my laptop (2022 Asus ExpertBook equipped with Intel Core i7).

As before ARA* should generally i) complete more search iterations compared to Naive Anytime A* within the given time limit, ii) end with a better solutions. This can be verified in the last cell of the notebook.

In [17]:
%%time
wa_res = massive_test(naive_anytime_astar, 10.0, 0.1, 1.0, diagonal_distance, SearchTreePQS)
ara_res = massive_test(anytime_repairing_astar, 10.0, 0.1, 1.0, diagonal_distance, SearchTreePQS)

Weighted path found! Final sub-optimality value: 1.2719784. Iterations: 9. Time: 0.9826684
Weighted path found! Final sub-optimality value: 1.4018281. Iterations: 17. Time: 0.9994636
Weighted path found! Final sub-optimality value: 1.1801959. Iterations: 16. Time: 0.9781866
Weighted path found! Final sub-optimality value: 1.2484793. Iterations: 4. Time: 0.8778191
Weighted path found! Final sub-optimality value: 1.3702344. Iterations: 12. Time: 0.9336751
Weighted path found! Final sub-optimality value: 1.2658099. Iterations: 19. Time: 0.9938374
Weighted path found! Final sub-optimality value: 1.3704302. Iterations: 13. Time: 0.9852173
Weighted path found! Final sub-optimality value: 1.2021082. Iterations: 14. Time: 0.995316
Weighted path found! Final sub-optimality value: 1.1946363. Iterations: 17. Time: 0.9900584
Weighted path found! Final sub-optimality value: 1.4346429. Iterations: 17. Time: 0.9715655
Weighted path found! Final sub-optimality value: 1.5132720. Iterations: 11. Time: 0

Weighted path found! Final sub-optimality value: 1.0116136. Iterations: 90. Time: 0.6506569
Weighted path found! Final sub-optimality value: 1.0207895. Iterations: 89. Time: 0.5965643
Weighted path found! Final sub-optimality value: 1.0069090. Iterations: 90. Time: 0.4954839
Weighted path found! Final sub-optimality value: 1.7480785. Iterations: 73. Time: 0.9254525
Weighted path found! Final sub-optimality value: 1.0438593. Iterations: 89. Time: 0.7191591
Weighted path found! Final sub-optimality value: 1.0078722. Iterations: 89. Time: 0.6931701
Weighted path found! Final sub-optimality value: 1.0231457. Iterations: 90. Time: 0.575794
Weighted path found! Final sub-optimality value: 1.5107935. Iterations: 74. Time: 0.9042168
Weighted path found! Final sub-optimality value: 1.1204340. Iterations: 87. Time: 0.8782115
Weighted path found! Final sub-optimality value: 1.1260255. Iterations: 87. Time: 0.9206822
CPU times: total: 1min 50s
Wall time: 1min 51s


In [18]:
wa_w_real = np.array(wa_res["weight_real"])
ara_w_real = np.array(ara_res["weight_real"])

wa_w_used = np.array(wa_res["weight_used"])
ara_w_used = np.array(ara_res["weight_used"])

wa_iter = np.array(wa_res["iter"])
ara_iter = np.array(ara_res["iter"])

print("Average real sub-optimality value")
print("WA*", wa_w_real.mean(), "ARA*", ara_w_real.mean())
print("Average last used sub-optimality value")
print("WA*", wa_w_used.mean(), "ARA*", ara_w_used.mean())
print("Average iteration count")
print("WA*", wa_iter.mean(), "ARA*", ara_iter.mean())

Average real sub-optimality value
WA* 1.3296581315854294 ARA* 1.067330394912409
Average last used sub-optimality value
WA* 8.736000000000004 ARA* 1.3120000000000163
Average iteration count
WA* 13.64 ARA* 88.02
