In [2]:
from heapq import heappush, heappop
import numpy as np
from scipy.spatial import KDTree
from typing import Tuple, List, Dict, Set

In [108]:
print(KDTree(np.array(static_obstacles)))

<scipy.spatial.kdtree.KDTree object at 0x00000189B6B11BE0>


In [109]:
np.array(static_obstacles)

array([[1, 1],
       [1, 2],
       [1, 3],
       [1, 4],
       [1, 5],
       [1, 6],
       [1, 7],
       [2, 1],
       [2, 2],
       [2, 3],
       [2, 4],
       [2, 5],
       [2, 6],
       [2, 7],
       [4, 1],
       [4, 2],
       [4, 3],
       [4, 4],
       [4, 5],
       [4, 6],
       [4, 7],
       [5, 1],
       [5, 2],
       [5, 3],
       [5, 4],
       [5, 5],
       [5, 6],
       [5, 7],
       [7, 1],
       [7, 2],
       [7, 3],
       [7, 4],
       [7, 5],
       [7, 6],
       [7, 7],
       [8, 1],
       [8, 2],
       [8, 3],
       [8, 4],
       [8, 5],
       [8, 6],
       [8, 7]])

In [112]:
print(Grid(grid_size, np.array(static_obstacles)))

TypeError: __init__() missing 1 required positional argument: 'warehouse_y'

In [113]:
grid = Grid(10,100,100)

In [110]:
class Grid:

    def __init__(self, grid_size, warehouse_x, warehouse_y):
        self.grid_size = grid_size
        self.minx = 0
        self.maxx = warehouse_x
        self.miny = 0
        self.maxy = warehouse_y
        self.grid = self.make_grid(grid_size, self.minx, self.maxx, self.miny, self.maxy)

#     @staticmethod
#     def calculate_boundaries(static_obstacles: np.ndarray) -> Tuple[int, int, int, int]:
#         min_ = np.min(static_obstacles, axis=0)
#         max_ = np.max(static_obstacles, axis=0)
#         return min_[0], max_[0], min_[1], max_[1]

    

    @staticmethod
    def make_grid(grid_size: int, minx: int, maxx: int, miny: int, maxy: int) -> np.ndarray:
        # Calculate the size of the sides
        warehouse_col_count = (maxx - minx) // grid_size
        warehouse_row_count = (maxy - miny) // grid_size
        # Initialize the grid, assuming grid is 2D
        grid = np.zeros([warehouse_row_count, warehouse_col_count, 2], dtype=np.int32)
        # Fill the grid in
        y = miny - grid_size / 2
        for i in range(warehouse_row_count):
            y += grid_size
            x = minx - grid_size / 2
            for j in range(warehouse_col_count):
                x += grid_size
                grid[i][j] = np.array([x, y])
        return grid

    '''
    Snap an arbitrary position to the center of the grid
    '''
    def snap_to_grid(self, position: np.ndarray) -> np.ndarray:
        i = (position[1] - self.miny) // self.grid_size
        j = (position[0] - self.minx) // self.grid_size
        if i >= len(self.grid):
            i -= 1
        if j >= len(self.grid[0]):
            j -= 1
        return self.grid[i][j]

In [144]:
class NeighbourTable:
#                 Current  Right    Left    Down     Up
    directions = [(0, 0), (1, 0), (-1, 0), (0, 1), (0, -1)]
#     Uncomment the line below for 9 directions, this might slow things down a lot
#     directions = [(0, 0), (1, 0), (-1, 0), (0, 1), (0, -1), (1, 1), (1, -1), (-1, 1), (-1, -1)]

    def __init__(self, grid: np.ndarray):
        dimy, dimx = len(grid), len(grid[0])
        table = dict()
        for i in range(dimy):
            for j in range(dimx):
                neighbours = []
                for dx, dy in self.directions:
                    y, x = i + dy, j + dx,
                    if x >= 0 and x < dimx and y >= 0 and y < dimy:
                        neighbours.append(grid[y][x])
                table[self.hash(grid[i][j])] = np.array(neighbours)
        self.table = table
    def lookup(self, position: np.ndarray) -> np.ndarray:
        return self.table[self.hash(position)]

    @staticmethod
    def hash(grid_pos: np.ndarray) -> int:
        return tuple(grid_pos)

# if __name__ == '__main__':
#     grid = np.array([[[15,5],[15,6],[15,7],[15,8],[15,9]],
#             [[16,5],[16,6],[16,7],[16,8],[16,9]],
#             [[17,5],[17,6],[17,7],[17,8],[17,9]],
#             [[18,5],[18,6],[18,7],[18,8],[18,9]],
#             [[19,5],[19,6],[19,7],[19,8],[19,9]]])
#     nt = NeighbourTable(grid)
#     print(nt.lookup(np.array([16,7])))
#     print(nt.table)

In [145]:
class State:

    def __init__(self, pos: np.ndarray, time: int, g_score: int, h_score: int):
        self.pos = pos
        self.time = time
        self.g_score = g_score
        self.f_score = g_score + h_score

    def __hash__(self) -> int:
        concat = str(self.pos[0]) + str(self.pos[1]) + '0' + str(self.time)
        return int(concat)

    def pos_equal_to(self, pos: np.ndarray) -> bool:
        return np.array_equal(self.pos, pos)

    def __lt__(self, other: 'State') -> bool:
        return self.f_score < other.f_score

    def __eq__(self, other: 'State') -> bool:
        return self.__hash__() == other.__hash__()

    def __str__(self):
        return 'State(pos=[' + str(self.pos[0]) + ', ' + str(self.pos[1]) + '], ' \
               + 'time=' + str(self.time) + ', fscore=' + str(self.f_score) + ')'

    def __repr__(self):
        return self.__str__()

In [188]:
class Planner:

    def __init__(self, grid_size: int,
                       robot_radius: int,
                       static_obstacles: List[Tuple[int, int]],
                       warehouse_x,
                       warehouse_y):

        self.grid_size = grid_size
        self.robot_radius = robot_radius
        np_static_obstacles = np.array(static_obstacles)
        self.static_obstacles = KDTree(np_static_obstacles)
        self.warehouse_x = warehouse_x
        self.warehouse_y = warehouse_y

        # Make the grid according to the grid size
        self.grid = Grid(grid_size, warehouse_x, warehouse_y)
        # Make a lookup table for looking up neighbours of a grid
        self.neighbour_table = NeighbourTable(self.grid.grid)


    '''
    An admissible and consistent heuristic for A*
    '''
    @staticmethod
    def h(start: np.ndarray, goal: np.ndarray) -> int:
        return int(np.linalg.norm(start-goal, 1))  # L1 norm

    @staticmethod
    def l2(start: np.ndarray, goal: np.ndarray) -> int:
        return int(np.linalg.norm(start-goal, 2))  # L2 norm

    '''
    Check whether the nearest static obstacle is within radius
    '''
    def safe_static(self, grid_pos: np.ndarray) -> bool:
        _, nn = self.static_obstacles.query(grid_pos)
        return self.l2(grid_pos, self.static_obstacles.data[nn]) > self.robot_radius

    '''
    Space-Time A*
    '''
    def plan(self, start: Tuple[int, int],
                   goal: Tuple[int, int],
                   dynamic_obstacles: Dict[int, Set[Tuple[int, int]]],
                   semi_dynamic_obstacles:Dict[int, Set[Tuple[int, int]]] = None,
                   max_iter:int = 500,
                   debug:bool = False) -> np.ndarray:

        # Prepare dynamic obstacles
        dynamic_obstacles = dict((k, np.array(list(v))) for k, v in dynamic_obstacles.items())
        # Assume dynamic obstacles are agents with same radius, distance needs to be 2*radius
        def safe_dynamic(grid_pos: np.ndarray, time: int) -> bool:
            nonlocal dynamic_obstacles
            return all(self.l2(grid_pos, obstacle) > 2 * self.robot_radius
                       for obstacle in dynamic_obstacles.setdefault(time, np.array([])))

        # Prepare semi-dynamic obstacles, consider them static after specific timestamp
        if semi_dynamic_obstacles is None:
            semi_dynamic_obstacles = dict()
        else:
            semi_dynamic_obstacles = dict((k, np.array(list(v))) for k, v in semi_dynamic_obstacles.items())
        def safe_semi_dynamic(grid_pos: np.ndarray, time: int) -> bool:
            nonlocal semi_dynamic_obstacles
            for timestamp, obstacles in semi_dynamic_obstacles.items():
                flag = True
                if time >= timestamp:
                    flag = all(self.l2(grid_pos, obstacle) > 2 * self.robot_radius for obstacle in obstacles)
                if not flag:
                    return False
            return True


        start = self.grid.snap_to_grid(np.array(start))
        goal = self.grid.snap_to_grid(np.array(goal))

        # Initialize the start state
        s = State(start, 0, 0, self.h(start, goal))

        open_set = [s]
        closed_set = set()

        # Keep track of parent nodes for reconstruction
        came_from = dict()

        iter_ = 0
        while open_set and iter_ < max_iter:
            iter_ += 1
            current_state = open_set[0]  # Smallest element in min-heap
            if current_state.pos_equal_to(goal):
                if debug:
                    print('STA*: Path found after {0} iterations'.format(iter_))
                return self.reconstruct_path(came_from, current_state)

            closed_set.add(heappop(open_set))
            epoch = current_state.time + 1
            for neighbour in self.neighbour_table.lookup(current_state.pos):
                neighbour_state = State(neighbour, epoch, current_state.g_score + 1, self.h(neighbour, goal))
                # Check if visited
                if neighbour_state in closed_set:
                    continue

                # Avoid obstacles
                if not self.safe_static(neighbour) \
                   or not safe_dynamic(neighbour, epoch) \
                   or not safe_semi_dynamic(neighbour, epoch):
                    continue

                # Add to open set
                if neighbour_state not in open_set:
                    came_from[neighbour_state] = current_state
                    heappush(open_set, neighbour_state)

        if debug:
            print('STA*: Open set is empty, no path found.')
        return np.array([])

    '''
    Reconstruct path from A* search result
    '''
    def reconstruct_path(self, came_from: Dict[State, State], current: State) -> np.ndarray:
        total_path = [current.pos]
        while current in came_from.keys():
            current = came_from[current]
            total_path.append(current.pos)
        return np.array(total_path[::-1])


In [189]:
static_obstacles = [(1, 1), (1, 2), (1, 3), (1, 4), (1, 5), (1, 6), (1, 7),
                    (2, 1), (2, 2), (2, 3), (2, 4), (2, 5), (2, 6), (2, 7), 
                    (4, 1), (4, 2), (4, 3), (4, 4), (4, 5), (4, 6), (4, 7), 
                    (5, 1), (5, 2), (5, 3), (5, 4), (5, 5), (5, 6), (5, 7), 
                    (7, 1), (7, 2), (7, 3), (7, 4), (7, 5), (7, 6), (7, 7), 
                    (8, 1), (8, 2), (8, 3), (8, 4), (8, 5), (8, 6), (8, 7)]

In [190]:
warehouse_x = 100
warehouse_y = 100 
x_size = 10
y_size = 10
grid_size =10


In [191]:
grid = np.zeros([y_size, x_size, 2], dtype=np.int32)

In [192]:
# for i in range(x_size):
#     for j in range(y_size):
#         grid[i][j] = np.array([j,i])
# print(grid)

In [193]:
print(grid)

[[[0 0]
  [0 0]
  [0 0]
  [0 0]
  [0 0]
  [0 0]
  [0 0]
  [0 0]
  [0 0]
  [0 0]]

 [[0 0]
  [0 0]
  [0 0]
  [0 0]
  [0 0]
  [0 0]
  [0 0]
  [0 0]
  [0 0]
  [0 0]]

 [[0 0]
  [0 0]
  [0 0]
  [0 0]
  [0 0]
  [0 0]
  [0 0]
  [0 0]
  [0 0]
  [0 0]]

 [[0 0]
  [0 0]
  [0 0]
  [0 0]
  [0 0]
  [0 0]
  [0 0]
  [0 0]
  [0 0]
  [0 0]]

 [[0 0]
  [0 0]
  [0 0]
  [0 0]
  [0 0]
  [0 0]
  [0 0]
  [0 0]
  [0 0]
  [0 0]]

 [[0 0]
  [0 0]
  [0 0]
  [0 0]
  [0 0]
  [0 0]
  [0 0]
  [0 0]
  [0 0]
  [0 0]]

 [[0 0]
  [0 0]
  [0 0]
  [0 0]
  [0 0]
  [0 0]
  [0 0]
  [0 0]
  [0 0]
  [0 0]]

 [[0 0]
  [0 0]
  [0 0]
  [0 0]
  [0 0]
  [0 0]
  [0 0]
  [0 0]
  [0 0]
  [0 0]]

 [[0 0]
  [0 0]
  [0 0]
  [0 0]
  [0 0]
  [0 0]
  [0 0]
  [0 0]
  [0 0]
  [0 0]]

 [[0 0]
  [0 0]
  [0 0]
  [0 0]
  [0 0]
  [0 0]
  [0 0]
  [0 0]
  [0 0]
  [0 0]]]


In [194]:
#for i in x_size:
#    for j in y_size:
#        grid[i][j] = 0
y = 0-y_size/2
for i in range(y_size):
    y+=grid_size
    x = 0-x_size/2
    for j in range(x_size):
        x+=grid_size
        grid[y_size-j-1][i] = np.array([x,y])
print(grid)    

[[[95  5]
  [95 15]
  [95 25]
  [95 35]
  [95 45]
  [95 55]
  [95 65]
  [95 75]
  [95 85]
  [95 95]]

 [[85  5]
  [85 15]
  [85 25]
  [85 35]
  [85 45]
  [85 55]
  [85 65]
  [85 75]
  [85 85]
  [85 95]]

 [[75  5]
  [75 15]
  [75 25]
  [75 35]
  [75 45]
  [75 55]
  [75 65]
  [75 75]
  [75 85]
  [75 95]]

 [[65  5]
  [65 15]
  [65 25]
  [65 35]
  [65 45]
  [65 55]
  [65 65]
  [65 75]
  [65 85]
  [65 95]]

 [[55  5]
  [55 15]
  [55 25]
  [55 35]
  [55 45]
  [55 55]
  [55 65]
  [55 75]
  [55 85]
  [55 95]]

 [[45  5]
  [45 15]
  [45 25]
  [45 35]
  [45 45]
  [45 55]
  [45 65]
  [45 75]
  [45 85]
  [45 95]]

 [[35  5]
  [35 15]
  [35 25]
  [35 35]
  [35 45]
  [35 55]
  [35 65]
  [35 75]
  [35 85]
  [35 95]]

 [[25  5]
  [25 15]
  [25 25]
  [25 35]
  [25 45]
  [25 55]
  [25 65]
  [25 75]
  [25 85]
  [25 95]]

 [[15  5]
  [15 15]
  [15 25]
  [15 35]
  [15 45]
  [15 55]
  [15 65]
  [15 75]
  [15 85]
  [15 95]]

 [[ 5  5]
  [ 5 15]
  [ 5 25]
  [ 5 35]
  [ 5 45]
  [ 5 55]
  [ 5 65]
  [ 5 75]
  

In [195]:
state = State((1,1),1,1,3)

In [196]:
state.pos

(1, 1)

In [197]:
planner = Planner(grid_size,5,np.array(static_obstacles),warehouse_x,warehouse_y)

In [198]:
planner.plan([5,5], [5,95],{0:[95,15]},{0:[75,15]},500)

array([], dtype=float64)