In [4]:
from IPython.core.display import display, HTML
display(HTML("<style>.container { width:80% !important; }</style>"))

This repository contains an educational path search algorithm, just for studying. 

1. **Basic Path Search:** The purpose of basic path search is to find a possible (not necessarily the best) path between two points on a grid cell. If there is at least a path between two poins, this algorithm will find it anyway; however the approach is not efficient. In other words, it always works, but it is not the most efficient way. 
 
 
2. **A-star Search:** This approach is probably the most efficient way to fond the best path between two points. However, one drawback is that unlike the **basic search** (defined above), it may not be able to find the path in the first forward search and we may need to try different 

The pipeline consists of two main classes `RouteNode` and `RouteFinder`. 

- Class `RouteNode` encapsulates the posistion of the root node (`pos`), the grid cell indices of the neighbors `neighbor_indices`, and the costs associated with reaching from the root node to its neighbors. 

- Class `RouteFinder` contains the following variables and methods: 
 
  - **Variables**: 
    - `route_nodes`: a list of `RouteNode` objects 
    - `delta_names`: a working (temporary) matrix, with the same size as the `grid_cells`, containing the direction arrows (<, >, ^, V) 
    - `directions`: the same matrix as `delta_names`, but it contains the final direction arrows, extracted from the `delta_names` based on the search approach 
    - `count_table`: an integer matrix, with the same size as the `grid_cells`, containing the number of pipeline iteration while exploring each cell 
    - `search_counter`: a cumulative integer counter, keeping the cumulative number of pipeline iteration while exploring each cell 
        
  - **Main Functions**: 
    - `extract_path_indices`: to summarize and extract the final output out of the search method selected 
    - `search_basic`: the basic search algorithm to find a path between two points on the map. As long as a path exists between two cells, this fucntion finds an existing path anyway, but it is not the most efficient way. 
    - `search_a_star`: the A*-based search algorithm to find a path between two points on the map.


In [5]:
import numpy as np

In [6]:
class RouteNode:
    def __init__(self, pos, neighbor_indices, costs):
        """
        @pos: the node index
        @neighbor_indices: the indices of the accessible neighbors
        @costs: the costs of reaching from the current pos to the neighbor indices 
        """
        
        self.pos = pos
        self.neighbor_indices = neighbor_indices
        self.costs = costs


In [103]:
class RouteFinder:
    
    def __init__(self, grid_shape):
        self.route_nodes = []
        # self.directions_table = np.array(['apples', 'foobar', 'cowboy'], dtype=object)
        self.delta_names = np.full(grid_shape, fill_value="", dtype=object)
        self.directions = np.copy(self.delta_names)
        self.count_table = np.ones(grid_shape) * -1.0
        self.value_table = np.copy(self.count_table)
        self.search_counter = 0
    
    def reset(self, grid_shape):
        self.route_nodes = []
        # self.directions_table = np.array(['apples', 'foobar', 'cowboy'], dtype=object)
        self.delta_names = np.full(grid_shape, fill_value="", dtype=object)
        self.directions = np.copy(self.delta_names)
        self.count_table = np.ones(grid_shape) * -1.0
        # self.value_table = np.copy(self.count_table)
        self.search_counter = 0
    
    def find_neighbor_indices_basic(self, current_pos, grid_cells, modify_grid_cells):
        
        # neighbor_poses = []

        neighbor_indices = []
        costs = []

        max_size_x = grid_cells.shape[0] - 1
        max_size_y = grid_cells.shape[1] - 1

        pos = [min(current_pos[0], max_size_x), min(current_pos[1], max_size_y)]

        # up
        up_neighbor_indices = [max(pos[0] - 1, 0), pos[1]]
        # down
        down_neighbor_indices = [min(pos[0] + 1, max_size_x), pos[1]]
        # left
        left_neighbor_indices = [pos[0], max(pos[1] - 1, 0)]
        # right
        right_neighbor_indices = [pos[0], min(pos[1] + 1, max_size_y)]

        # index = 0
        # routs_count = 0

        if ( (pos != up_neighbor_indices) and 
             (grid_cells[up_neighbor_indices[0], up_neighbor_indices[1]] == 0)):
            
            if (modify_grid_cells):
                grid_cells[up_neighbor_indices[0], up_neighbor_indices[1]] = 1
                self.count_table[up_neighbor_indices[0], up_neighbor_indices[1]] = self.search_counter
                self.delta_names[up_neighbor_indices[0], up_neighbor_indices[1]] = "^"
                self.search_counter += 1
                
            neighbor_indices.append([up_neighbor_indices[0], up_neighbor_indices[1]])
            costs.append(1)
            # index += 1

            # if (grid_cells[up_neighbor_indices[0], up_neighbor_indices[1]] == 0):
            #    routs_count += 1
            
        if ( (up_neighbor_indices != down_neighbor_indices) and (pos != down_neighbor_indices) and 
             (grid_cells[down_neighbor_indices[0], down_neighbor_indices[1]] == 0)):

            if (modify_grid_cells):
                grid_cells[down_neighbor_indices[0], down_neighbor_indices[1]] = 1
                self.count_table[down_neighbor_indices[0], down_neighbor_indices[1]] = self.search_counter
                self.delta_names[down_neighbor_indices[0], down_neighbor_indices[1]] = "v"
                self.search_counter += 1
                
            neighbor_indices.append([down_neighbor_indices[0], down_neighbor_indices[1]])
            # delta_names.append("v")
            costs.append(1)
            # index += 1

            # if (grid_cells[down_neighbor_indices[0], down_neighbor_indices[1]] == 0):
            #    routs_count += 1

        if ( (pos != left_neighbor_indices) and 
             (grid_cells[left_neighbor_indices[0], left_neighbor_indices[1]] == 0)):

            if (modify_grid_cells):
                grid_cells[left_neighbor_indices[0], left_neighbor_indices[1]] = 1
                self.count_table[left_neighbor_indices[0], left_neighbor_indices[1]] = self.search_counter
                self.delta_names[left_neighbor_indices[0], left_neighbor_indices[1]] = "<"
                self.search_counter += 1
                
            neighbor_indices.append([left_neighbor_indices[0], left_neighbor_indices[1]])
            # delta_names.append("<")
            costs.append(1)
            # index += 1

            #if (grid_cells[left_neighbor_indices[0], left_neighbor_indices[1]] == 0):
            #    routs_count += 1

        if ( (left_neighbor_indices != right_neighbor_indices) and (pos != right_neighbor_indices) and 
             (grid_cells[right_neighbor_indices[0], right_neighbor_indices[1]] == 0)):

            if (modify_grid_cells):
                grid_cells[right_neighbor_indices[0], right_neighbor_indices[1]] = 1
                self.count_table[right_neighbor_indices[0], right_neighbor_indices[1]] = self.search_counter
                self.delta_names[right_neighbor_indices[0], right_neighbor_indices[1]] = ">"
                self.search_counter += 1
                
            neighbor_indices.append([right_neighbor_indices[0], right_neighbor_indices[1]])
            # delta_names.append(">")
            costs.append(1)
            # index += 1

            # if (grid_cells[right_neighbor_indices[0], right_neighbor_indices[1]] == 0):
            #    routs_count += 1
            
        return (neighbor_indices, costs)
    
    def find_neighbor_pos_astar(self, current_pos, grid_cells, heuristic_cells, modify_grid_cells):
    
        # neighbor_poses = []

        neighbor_indices = []
        neighbor_pos = []
        delta_names = [" "] * 4
        # costs = []
        cost = -1
        astar_costs = np.ones([4]) * 100

        max_size_x = grid_cells.shape[0] - 1
        max_size_y = grid_cells.shape[1] - 1
        
        pos = [min(current_pos[0], max_size_x), min(current_pos[1], max_size_y)]

        # up
        up_neighbor_indices = [max(pos[0] - 1, 0), pos[1]]
        # down
        down_neighbor_indices = [min(pos[0] + 1, max_size_x), pos[1]]
        # left
        left_neighbor_indices = [pos[0], max(pos[1] - 1, 0)]
        # right
        right_neighbor_indices = [pos[0], min(pos[1] + 1, max_size_y)]
        
        # print("up_neighbor_indices", up_neighbor_indices)
        # print("down_neighbor_indices", down_neighbor_indices)
        # print("left_neighbor_indices", left_neighbor_indices)
        # print("right_neighbor_indices", right_neighbor_indices)

        index = 0

        if ( (pos != up_neighbor_indices) and 
             (grid_cells[up_neighbor_indices[0], up_neighbor_indices[1]] == 0)):
            
            astar_costs[index] = heuristic_cells[up_neighbor_indices[0], up_neighbor_indices[1]] + self.search_counter
            delta_names[index] = "^"
            neighbor_indices.append([up_neighbor_indices[0], up_neighbor_indices[1]])
            index += 1

        if ( (up_neighbor_indices != down_neighbor_indices) and (pos != down_neighbor_indices) and 
             (grid_cells[down_neighbor_indices[0], down_neighbor_indices[1]] == 0)):

            astar_costs[index] = heuristic_cells[down_neighbor_indices[0], down_neighbor_indices[1]] + self.search_counter
            delta_names[index] = "v"
            neighbor_indices.append([down_neighbor_indices[0], down_neighbor_indices[1]])
            index += 1
            
            # print("down:", [down_neighbor_indices[0], down_neighbor_indices[1]])

        if ( (pos != left_neighbor_indices) and 
             (grid_cells[left_neighbor_indices[0], left_neighbor_indices[1]] == 0)):

            astar_costs[index] = heuristic_cells[left_neighbor_indices[0], left_neighbor_indices[1]] + self.search_counter
            delta_names[index] = "<"
            neighbor_indices.append([left_neighbor_indices[0], left_neighbor_indices[1]])
            index += 1
            
            # print("left:", [left_neighbor_indices[0], left_neighbor_indices[1]])

        if ( (left_neighbor_indices != right_neighbor_indices) and (pos != right_neighbor_indices) and 
             (grid_cells[right_neighbor_indices[0], right_neighbor_indices[1]] == 0)):

            astar_costs[index] = heuristic_cells[right_neighbor_indices[0], right_neighbor_indices[1]] + self.search_counter
            delta_names[index] = ">"
            neighbor_indices.append([right_neighbor_indices[0], right_neighbor_indices[1]])
            index += 1

            # print("right:", [right_neighbor_indices[0], right_neighbor_indices[1]])
            
        if (index > 0):
            sorted_indices = np.argsort(astar_costs)
            
            selected_pos = [neighbor_indices[sorted_indices[0]][0], neighbor_indices[sorted_indices[0]][1]]
            
            if (modify_grid_cells):
                grid_cells[selected_pos[0], selected_pos[1]] = 1
                self.count_table[selected_pos[0], selected_pos[1]] = self.search_counter
                self.delta_names[selected_pos[0], selected_pos[1]] = delta_names[sorted_indices[0]]
                self.search_counter += 1

            neighbor_pos = [selected_pos[0], selected_pos[1]]
            cost = 1.0
            
            # print("astar_costs:", astar_costs, "  -  selected_pos:", selected_pos, "  -  neighbor_pos:", neighbor_pos)
            
        return (neighbor_pos, cost)
    
    def extract_path_indices(self, init_pos, goal_pos):
        
        pointer_pos = goal_pos.copy()
        path_cell_indices = [pointer_pos]
        # path_delta_names = []
        total_cost = 0.0
        node_index = len(self.route_nodes) - 1
        
        while (pointer_pos != init_pos):
            if (node_index < 0):
                return ([], [], 0.0)
            
            node = self.route_nodes[node_index]
            
            if (pointer_pos in node.neighbor_indices):
                index = node.neighbor_indices.index(pointer_pos)
                path_cell_indices.append(node.pos)
                # path_delta_names.append(node.delta_names[index])
                total_cost += node.costs[index]
                pointer_pos = node.pos
                
                # print(node.pos)
            
            node_index -= 1
        
        path_cell_indices.reverse()
        # path_delta_names.reverse()
        
        return (path_cell_indices, total_cost)
    
    def extract_path_info(self, init_pos, goal_pos):
        
        pointer_pos = goal_pos.copy()
        # path_delta_names = []
        total_cost = 0.0
        node_index = len(self.route_nodes) - 1
        
        while (pointer_pos != init_pos):
            if (node_index < 0):
                return ([], [], 0.0)
            
            node = self.route_nodes[node_index]
            
            if (pointer_pos in node.neighbor_indices):
                index = node.neighbor_indices.index(pointer_pos)
                total_cost += node.costs[index]
                self.directions[pointer_pos[0], pointer_pos[1]] = self.delta_names[pointer_pos[0], pointer_pos[1]]
                pointer_pos = node.pos
            
            node_index -= 1
        
        return total_cost
    
    def search(self, search_type, grid_cells, heuristic_cells, init_pos, goal_pos, max_depth):

        if (search_type == "basic"):
            return self.search_basic(grid_cells, init_pos, goal_pos, max_depth)
        
        if (search_type == "a-star"):
            return self.search_a_star(grid_cells, heuristic_cells, init_pos, goal_pos, max_depth)
        
        return None
    
    def search_basic(self, grid_cells, init_pos, goal_pos, max_depth, cell_blockage_value=-1):
        
        if (init_pos == goal_pos):
            return [0.0, goal_pos[0], goal_pos[1]]
        
        if (grid_cells[init_pos[0], init_pos[1]] == 1):
            return [cell_blockage_value, goal_pos[0], goal_pos[1]]
        
        self.reset(grid_cells.shape)
        
        current_pos = init_pos

        branch_search_completed = False
        debug_counter = 0

        grid_cells[init_pos[0], init_pos[1]] = 1
        
        self.search_counter = 1
        self.count_table[init_pos[0], init_pos[1]] = 0

        (neighbor_indices, costs) = self.find_neighbor_indices_basic(current_pos, grid_cells, modify_grid_cells = True)
        # for debug
        # print("current_pos: ", current_pos, "  -  neighbor_indices: ", neighbor_indices)
        new_nodes = []
        current_node = RouteNode(current_pos, neighbor_indices, costs)
        new_nodes.append(current_node)
        self.route_nodes += new_nodes
        
        while (not branch_search_completed):
            # index = 0

            neighbor_nodes = []

            for node in new_nodes:

                for neighbor_pos in neighbor_indices:
                    if (neighbor_pos == goal_pos):
                        branch_search_completed = True

                        self.route_nodes += neighbor_nodes

                        # uncomment these two lines for a full path search result
                        # (path_cell_indices, path_deltas, total_cost) = RouteFinder.extract_path_indices(route_nodes, init_pos, goal_pos)
                        # return (path_cell_indices, path_deltas, total_cost)

                        # if (return_type == "cost"):
                        #    total_cost = RouteFinder.extract_path_cost(route_nodes, init_pos, goal_pos)
                        #    return [total_cost, int(goal_pos[0]), int(goal_pos[1])]

                        total_cost = self.extract_path_info(init_pos, goal_pos)

                        return [total_cost, int(goal_pos[0]), int(goal_pos[1])]
                    
                for node_pos in node.neighbor_indices:
                    (neighbor_indices, costs) = self.find_neighbor_indices_basic(node_pos, grid_cells, modify_grid_cells = True)
                    
                    # for debug
                    # print("=================================================")
                    # print("node_pos: ", node_pos, "  -  neighbor_indices: ", neighbor_indices)
                    
                    current_node = RouteNode(node_pos, neighbor_indices, costs)
                    neighbor_nodes.append(current_node)

            new_nodes = neighbor_nodes
            self.route_nodes += new_nodes
            
            if (debug_counter == max_depth):
                branch_search_completed = True
            
            debug_counter += 1
        
        return [cell_blockage_value, goal_pos[0], goal_pos[1]]
    
    def search_a_star(self, grid_cells, heuristic_cells, init_pos, goal_pos, max_depth):
        
        if (init_pos == goal_pos):
            return [0.0, goal_pos[0], goal_pos[1]]
        
        if (grid_cells[init_pos[0], init_pos[1]] == 1):
            return [-1.0, goal_pos[0], goal_pos[1]]
        
        self.reset(grid_cells.shape)
        
        current_pos = init_pos

        branch_search_completed = False
        debug_counter = 0

        grid_cells[init_pos[0], init_pos[1]] = 1
        
        self.search_counter = 1
        self.count_table[init_pos[0], init_pos[1]] = 0

        (neighbor_pos, cost) = self.find_neighbor_pos_astar(current_pos, grid_cells, heuristic_cells, modify_grid_cells = True)
        # print for debug
        # print("current_pos: ", current_pos, "  -  neighbor_pos: ", neighbor_pos)
        new_nodes = []
        current_node = RouteNode(current_pos, [neighbor_pos], [cost])
        new_nodes.append(current_node)
        self.route_nodes += new_nodes

        while (not branch_search_completed):
            # index = 0

            neighbor_nodes = []

            for node in new_nodes:

                if (neighbor_pos == goal_pos):
                    branch_search_completed = True

                    self.route_nodes += neighbor_nodes

                    total_cost = self.extract_path_info(init_pos, goal_pos)

                    return [total_cost, int(goal_pos[0]), int(goal_pos[1])]
                      
                # print for debug
                # print("=================================================")
                # print("node_pos: ", node.neighbor_indices[0])
                
                # for node_pos in node.neighbor_indices:
                (neighbor_pos, cost) = self.find_neighbor_pos_astar(node.neighbor_indices[0], grid_cells, heuristic_cells, modify_grid_cells = True)

                # print for debug
                # print("neighbor_pos: ", neighbor_pos)

                if (neighbor_pos == []):
                    branch_search_completed = True
                    break
                
                current_node = RouteNode(node.neighbor_indices[0], [neighbor_pos], [cost])
                neighbor_nodes.append(current_node)

            new_nodes = neighbor_nodes
            self.route_nodes += new_nodes
            
            if (debug_counter == max_depth):
                branch_search_completed = True
            
            debug_counter += 1
        
        return [-1.0, goal_pos[0], goal_pos[1]]
    
    def calculate_value_table(self, grid_cells, goal_pos, cell_blockage_value):
        
        self.value_table = np.ones(grid_cells.shape)
        
        """
        row_index = 4
        col_index = 3
        init_pos = [row_index, col_index]
        
        copy_grid_cells = np.copy(grid_cells)
        search_results = self.search_basic(copy_grid_cells, init_pos, goal_pos, max_depth = 100)
        print(init_pos, search_results)
        self.value_table[row_index, col_index] = search_results[0]
        
        return
        """
        
        for row_index in range(grid_cells.shape[0]):
            for col_index in range(grid_cells.shape[1]):
                copy_grid_cells = np.copy(grid_cells)
                init_pos = [row_index, col_index]
                
                search_results = self.search_basic(copy_grid_cells, init_pos, goal_pos, max_depth = 100, cell_blockage_value = cell_blockage_value)
                
                # print(init_pos, search_results)

                self.value_table[row_index, col_index] = search_results[0]
        

### A Few Examples 

In [104]:
# Grid format:
#   0 = Navigable space
#   1 = Occupied space

grid_cells_01 = np.asarray( 
        [[0, 0, 1, 0, 0, 0],
         [0, 0, 1, 0, 0, 0],
         [0, 0, 0, 0, 1, 0],
         [0, 0, 1, 1, 1, 0],
         [0, 0, 0, 0, 1, 0]] )

grid_cells_02 = np.asarray( 
        [[0, 0, 1, 0, 0, 0],
        [0, 0, 1, 0, 0, 0],
        [0, 0, 1, 0, 1, 0],
        [0, 0, 1, 1, 1, 0],
        [0, 0, 1, 0, 1, 0]] )

grid_cells_03 = np.asanyarray(
    [[0, 1, 0, 0, 0, 0],
     [0, 1, 0, 0, 0, 0],
     [0, 1, 0, 0, 0, 0],
     [0, 1, 0, 0, 0, 0],
     [0, 0, 0, 0, 1, 0]] )

grid_cells_04 = np.asarray(
    [[0, 0, 0, 0, 0, 0],
     [0, 1, 0, 0, 0, 0],
     [0, 1, 0, 0, 0, 0],
     [0, 1, 0, 0, 0, 0],
     [0, 0, 0, 0, 1, 0]] )

grid_cells_05 = np.asarray(
    [[0, 0, 1, 0, 0, 0],
     [0, 0, 1, 0, 0, 0],
     [0, 0, 1, 0, 1, 0],
     [0, 0, 1, 1, 1, 0],
     [0, 0, 0, 0, 1, 0]] )


heuristic_cells = np.asarray(
    [[9, 8, 7, 6, 5, 4],
     [8, 7, 6, 5, 4, 3],
     [7, 6, 5, 4, 3, 2],
     [6, 5, 4, 3, 2, 1],
     [5, 4, 3, 2, 1, 0]] )

init_pos = [0, 0]
# init_pos = [20, 20]
# grid_bookings[init_pos[0], init_pos[1]] = 1

# cost = 1.0

delta = [[-1, 0], # go up
         [ 0,-1], # go left
         [ 1, 0], # go down
         [ 0, 1]] # go right

"""
cost_table = np.ones_like(original_grid_cells) * -1.0

for row_index in range(original_grid_cells.shape[0]):
    for col_index in range(original_grid_cells.shape[1]):
        grid_cells = np.copy(original_grid_cells)
        working_goal_pos = [row_index, col_index]
        search_results = RouteFinder.search(grid_cells, init_pos, working_goal_pos, cost, max_depth = 100)
        
        cost_table[search_results[1], search_results[2]] = search_results[0]
        
print(cost_table)
"""
grid_cells = np.copy(grid_cells_01)
original_goal_pos = [len(grid_cells)-1, len(grid_cells[0])-1]

route_finder = RouteFinder(grid_cells.shape)

print("========== Basic Search Start ==========")
print("grid Cell:\n", grid_cells)
search_results = route_finder.search("basic", grid_cells, heuristic_cells, init_pos, original_goal_pos, max_depth = 100)
print(search_results)
print("count_table basic:\n", route_finder.count_table)
print("directions basic:\n", route_finder.directions)
print("==================== End ====================")

print("")

grid_cells = np.copy(grid_cells_03)

print("========== A* Search Start ==========")
print("grid Cell:\n", grid_cells)
search_results = route_finder.search("a-star", grid_cells, heuristic_cells, init_pos, original_goal_pos, max_depth = 100)
print(search_results)
print("count_table a-star:\n", route_finder.count_table)
print("directions a-star:\n", route_finder.directions)
print("==================== End ====================")

print("")

grid_cells = np.copy(grid_cells_05)

print("========== value table calculation start ==========")
print("grid Cell:\n", grid_cells)
search_results = route_finder.calculate_value_table( grid_cells, original_goal_pos, cell_blockage_value = 100)
print("value_table:\n", route_finder.value_table)
print("==================== End ====================")


grid Cell:
 [[0 0 1 0 0 0]
 [0 0 1 0 0 0]
 [0 0 0 0 1 0]
 [0 0 1 1 1 0]
 [0 0 0 0 1 0]]
[11.0, 4, 5]
count_table basic:
 [[ 0.  2. -1. 15. 17. 19.]
 [ 1.  4. -1. 13. 16. 18.]
 [ 3.  6.  9. 11. -1. 20.]
 [ 5.  8. -1. -1. -1. 21.]
 [ 7. 10. 12. 14. -1. 22.]]
directions basic:
 [['' '' '' '' '' '']
 ['v' '' '' '^' '>' '>']
 ['v' '>' '>' '>' '' 'v']
 ['' '' '' '' '' 'v']
 ['' '' '' '' '' 'v']]

grid Cell:
 [[0 1 0 0 0 0]
 [0 1 0 0 0 0]
 [0 1 0 0 0 0]
 [0 1 0 0 0 0]
 [0 0 0 0 1 0]]
[11.0, 4, 5]
count_table a-star:
 [[ 0. -1. -1. -1. -1. -1.]
 [ 1. -1. -1. -1. -1. -1.]
 [ 2. -1. -1. -1. -1. -1.]
 [ 3. -1. -1.  8.  9. 10.]
 [ 4.  5.  6.  7. -1. 11.]]
directions a-star:
 [['' '' '' '' '' '']
 ['v' '' '' '' '' '']
 ['v' '' '' '' '' '']
 ['v' '' '' '^' '>' '>']
 ['v' '>' '>' '>' '' 'v']]

grid Cell:
 [[0 0 1 0 0 0]
 [0 0 1 0 0 0]
 [0 0 1 0 1 0]
 [0 0 1 1 1 0]
 [0 0 0 0 1 0]]
value_table:
 [[-1. -1. -1.  6.  5.  4.]
 [-1. -1. -1.  5.  4.  3.]
 [-1. -1. -1.  6. -1.  2.]
 [-1. -1. -1. -1. -1.  1.]
