In [1]:
import math
import heapq

import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt


from time import time
from PIL import Image, ImageDraw
from heapq import heappop, heappush

%matplotlib inline

In [2]:
class Map:
    '''
    Rectangular grid map.

    Cells are addressed as (i, j) = (row, column); a stored value of 0 marks
    a free cell and 1 marks an obstacle.
    '''

    def __init__(self, manhattan=False):
        '''
        Default constructor: creates an empty 0x0 map.

        - manhattan: stored on the instance as-is; no method of this class reads it.
        '''
        self.manhattan = manhattan
        self._width = 0
        self._height = 0
        self._cells = []
    

    def read_from_string(self, cell_str, width, height):
        '''
        Converting a string (with '#' representing obstacles and '.' representing free cells) to a grid.

        Empty lines are skipped and any character other than '.' / '#' is ignored.
        Raises Exception("Size Error...") when a row does not contain exactly `width`
        cells or when the number of rows differs from `height`. The whole string is
        validated before the map is touched, so a failed call leaves the map unchanged.
        '''
        rows = []
        for line in cell_str.split("\n"):
            if len(line) == 0:
                continue
            row = []
            for c in line:
                if c == '.':
                    row.append(0)
                elif c == '#':
                    row.append(1)
                # anything else (e.g. '\r' or spaces) is not a cell and is skipped
            # Rows are collected first and checked afterwards, so an over-long row
            # is reported as a size error instead of an IndexError.
            if len(row) != width:
                raise Exception("Size Error. Map width = ", len(row), ", but must be", width )
            rows.append(row)

        if len(rows) != height:
            raise Exception("Size Error. Map height = ", len(rows), ", but must be", height )

        self._width = width
        self._height = height
        self._cells = rows
    
     
    def set_grid_cells(self, width, height, grid_cells):
        '''
        Initialization of map by list of cells.

        `grid_cells` is stored by reference (no copy, no size validation).
        '''
        self._width = width
        self._height = height
        self._cells = grid_cells


    def in_bounds(self, i, j):
        '''
        Check if the cell is on a grid.
        '''
        return (0 <= j < self._width) and (0 <= i < self._height)
    

    def traversable(self, i, j):
        '''
        Check if the cell is not an obstacle.

        Does not check bounds itself; call in_bounds first.
        ''' 
        return not self._cells[i][j]


    def get_neighbors(self, i, j):
        '''
        Get a list of cells reachable from (i, j) in one move, as (i, j) tuples.

        The result is ordered as:
          1. free in-bounds cardinal neighbours (right, down, left, up),
          2. the cell (i, j) itself (a "wait" move),
          3. free in-bounds diagonal neighbours, included only when both adjacent
             cardinal cells are free as well (no corner cutting).
        '''  
        cardinal = []
        for di, dj in ((0, 1), (1, 0), (0, -1), (-1, 0)):
            ni, nj = i + di, j + dj
            if self.in_bounds(ni, nj) and self.traversable(ni, nj):
                cardinal.append((ni, nj))

        diagonal = []
        for di, dj in ((1, 1), (1, -1), (-1, -1), (-1, 1)):
            ni, nj = i + di, j + dj
            if not (self.in_bounds(ni, nj) and self.traversable(ni, nj)):
                continue
            # both cells sharing a side with the diagonal step must be passable
            if (ni, j) in cardinal and (i, nj) in cardinal:
                diagonal.append((ni, nj))

        return cardinal + [(i, j)] + diagonal

    def get_size(self):
        '''
        Return the map size as a (height, width) tuple.
        '''
        return (self._height, self._width)


In [3]:
def compute_cost(i1, j1, i2, j2):
    '''
    Computes cost of simple moves between cells (i1, j1) and (i2, j2).

    - cardinal move (one step along a row or a column): 1
    - diagonal move (one step along both axes): sqrt(2)
    - wait move (both cells coincide): 1

    Raises Exception for any other pair of cells.
    '''
    di = abs(i1 - i2)
    dj = abs(j1 - j2)
    if di + dj == 1: #cardinal move
        return 1
    if di == 1 and dj == 1: #diagonal move
        return np.sqrt(2)
    if di == 0 and dj == 0:
        # Map.get_neighbors lists the current cell as a "wait" successor and astar
        # asks for the cost of every successor, so staying in place must be priced
        # (one step, same as a cardinal move) rather than rejected.
        return 1
    raise Exception('Trying to compute the cost of non-supported move! ONLY wait, cardinal and diagonal moves are supported.')


In [4]:
class Node:
    '''
    A single search node of the low-level grid search.

    - i, j: row / column of the grid cell the node stands for
    - g: cost accumulated so far
    - h: heuristic estimate stored with the node
    - f: priority key; g + h unless an explicit value is supplied
    - parent: node this one was generated from (None when not given)
    '''

    def __init__(self, i, j, g = 0, h = 0, f = None, parent = None, tie_breaking_func = None):
        # tie_breaking_func is accepted but not stored or used.
        self.i, self.j = i, j
        self.g, self.h = g, h
        self.f = f if f is not None else self.g + h
        self.parent = parent

    def __eq__(self, other):
        '''
        Two nodes are equal when they refer to the same cell;
        g, h, f and parent play no part in the comparison.
        '''
        if self.i != other.i:
            return False
        return self.j == other.j

    def __hash__(self):
        '''
        Hash of the (i, j) pair, consistent with __eq__, so nodes can be
        used as set members / dict keys.
        '''
        return hash((self.i, self.j))

    def __lt__(self, other):
        '''
        Ordering used by the priority queue: smaller f first,
        and for equal f the node with the smaller g first.
        '''
        return self.g < other.g if self.f == other.f else self.f < other.f

In [22]:
class CBS_Node:
    '''
    Node of the high-level (CBS) search tree.

    - cost: key used to order nodes in OPEN (see __lt__)
    - constraints: constraints attached to this node (CBS passes a Constraints object)
    - solutions: per-robot solutions attached to this node (CBS passes a Solutions object)
    '''
    

    def __init__(self, cost, constraints=None, solutions=None):
        self._cost = cost
        self._constraints = constraints
        self._solutions = solutions
    
    def count_cost(self):
        '''
        Recompute the node cost as the sum of get_cost() over its solutions.

        Works both for a plain iterable of solutions and for a container that keeps
        them in a `solutions` attribute (the Solutions class); a node created
        without solutions gets cost 0.
        '''
        container = self._solutions
        if container is None:
            items = []
        else:
            # Solutions keeps its items in `.solutions`; anything else is iterated directly.
            items = getattr(container, "solutions", container)
        self._cost = sum(solution.get_cost() for solution in items)
    
    def get_cost(self):
        '''Return the stored cost (call count_cost first to refresh it).'''
        return self._cost
    
    def get_solutions(self):
        '''Return the solutions object given to the constructor.'''
        return self._solutions
    
    def get_constraints(self):
        '''Return the constraints object given to the constructor.'''
        return self._constraints
        
    # __eq__ is intentionally not overridden: nodes compare by identity.
    
    def __hash__(self):
        '''
        Hash built from the cost and the hash of the constraints object.
        '''
        s = f"{self._cost} {self._constraints.__hash__()}"
        return hash(s)


    def __lt__(self, other): 
        '''
        Comparing the keys (i.e. the costs) of two nodes,
        which is needed to sort/extract the best element from OPEN.
        '''
        return self._cost < other._cost

In [20]:
class Constraints:
    '''
    Collection of constraints: maps a key (built as Constraint_step(agent_index, step))
    to the list of nodes registered for that key.
    '''

    def __init__(self):
        self.constraints = {}
    
    def add_constraint(self, agent_index, step, node):
        '''
        Register `node` under (agent_index, step); several nodes may share a key.
        '''
        self.constraints.setdefault(Constraint_step(agent_index, step), []).append(node)
    
    def get_constraints(self, agent_index, step):
        '''
        Return the list of nodes registered for (agent_index, step).

        An empty list is returned when nothing is registered, so callers can query
        every step without catching KeyError. That empty list is a fresh object:
        appending to it does not register a constraint.
        '''
        return self.constraints.get(Constraint_step(agent_index, step), [])

    def __getitem__(self, key):
        '''
        Subscript access by a ready-made key, e.g. constraints[Constraint_step(agent, step)]
        (the form astar uses). Missing keys yield an empty list.
        '''
        return self.constraints.get(key, [])
    
    def __hash__(self):
        '''
        Hash built from the string forms of the stored node lists (keys are not included).
        '''
        h = "".join(str(constraint) for constraint in self.constraints.values())
        return hash(h)
    

In [15]:
class Constraint_step:
    '''
    Hashable (agent_index, step) pair, usable as a dictionary key.
    '''

    def __init__(self, agent_index, step):
        self.agent_index, self.step = agent_index, step

    def get_index(self):
        '''Return the agent index.'''
        return self.agent_index

    def get_step(self):
        '''Return the step.'''
        return self.step

    def __hash__(self):
        '''
        Hash of the (agent_index, step) tuple, consistent with __eq__.
        '''
        return hash((self.agent_index, self.step))

    def __eq__(self, other):
        '''Keys are equal when both the agent index and the step match.'''
        if self.agent_index != other.agent_index:
            return False
        return self.step == other.step

In [23]:
class Solutions:
    '''
    Ordered collection of per-robot Solution objects; position in the list is the robot index.
    '''

    def __init__(self):
        self.solutions = []
    
    def add_solution(self, find, end, steps):
        '''Append a new Solution(find, end, steps) for the next robot.'''
        self.solutions.append(Solution(find, end, steps))
    
    def upgrade_solution(self, index, find, end, steps):
        '''Replace the solution of robot `index` (IndexError if it does not exist yet).'''
        self.solutions[index] = Solution(find, end, steps)
    
    def get_solution_of_robot(self, index):
        '''Return the solution stored for robot `index`.'''
        return self.solutions[index]

    def __iter__(self):
        '''
        Iterate over the stored solutions in robot order.
        Needed by CBS_Node.count_cost, which loops over this container directly.
        '''
        return iter(self.solutions)

    def __len__(self):
        '''Number of stored solutions.'''
        return len(self.solutions)

In [14]:
class Solution:
    '''
    Result of one low-level search, stored exactly as returned by astar:

    - find: whether a path was found
    - end: last node of the path (None when nothing was found)
    - steps: step counter reported by the search
    '''

    def __init__(self, find, end, steps):
        self._find = find
        self._end = end
        self._steps = steps
    
    def find(self):
        '''Return the "path found" flag.'''
        return self._find
    
    def get_path(self):
        '''
        Return the path as a list of nodes from start to end, rebuilt with make_path.
        An empty list is returned when there is no end node.
        '''
        if self._end is None:
            return []
        # the end node is stored as `_end`; there is no `_goal` attribute
        path, _ = make_path(self._end)
        return path
    
    def steps(self):
        '''Return the stored step counter.'''
        return self._steps

    def get_cost(self):
        '''
        Cost of the solution: the g-value of the end node, or infinity when no
        path was found. Used by CBS_Node.count_cost.
        # assumes `end` is a search node whose `g` is the accumulated path cost — TODO confirm
        '''
        if not self._find or self._end is None:
            return float("inf")
        return self._end.g
    
        

In [5]:
def draw(grid_map, start = None, goal = None, path = None, nodes_opened = None, nodes_expanded = None, nodes_reexpanded = None, pt = False):
    '''
    Auxiliary function that visualizes the environment, the path and 
    the open/expanded/re-expanded nodes.

    - grid_map: map providing get_size(), traversable(i, j) and in_bounds(i, j)
    - start, goal: objects with `i` / `j` attributes; drawn only when given and traversable
    - path: iterable of nodes (None entries are skipped); steps on obstacles get a separate colour
    - nodes_opened / nodes_expanded / nodes_reexpanded: iterable collections of search nodes
    - pt: when True, additionally paints a red marker pattern around start and goal
      (each one is skipped when it is None)

    Layers are painted in a fixed order (obstacles, opened, expanded, re-expanded,
    path, start, goal, markers), so later layers overwrite earlier ones.
    The image is shown with matplotlib; nothing is returned.
    '''
    k = 5  # side of one grid cell in pixels
    height, width = grid_map.get_size()
    im = Image.new('RGB', (width * k, height * k), color = 'white')
    canvas = ImageDraw.Draw(im)
    # offsets of the marker pattern painted around start/goal when pt is set
    points = [[0,1], [1, 0], [1, 1], [-1, -1], [-1, 1], [1, -1], [0, -1], [-1, 0],
            [0,2], [2, 0], [2, 2], [-2, -2], [-2, 2], [2, -2], [0, -2], [-2, 0]]

    def fill_cell(i, j, color):
        '''Paint the k x k pixel square of grid cell (i, j); no outline is drawn.'''
        canvas.rectangle((j * k, i * k, (j + 1) * k - 1, (i + 1) * k - 1), fill=color)

    for i in range(height):
        for j in range(width):
            if not grid_map.traversable(i, j):
                fill_cell(i, j, (0, 0, 0))

    node_layers = (
        (nodes_opened, (213, 219, 219)),
        (nodes_expanded, (131, 145, 146)),
        (nodes_reexpanded, (255, 145, 146)),
    )
    for nodes, color in node_layers:
        if nodes is not None:
            for node in nodes:
                fill_cell(node.i, node.j, color)

    if path is not None:
        for step in path:
            if step is None:
                continue
            if grid_map.traversable(step.i, step.j):
                fill_cell(step.i, step.j, (255, 0, 0))
            else:
                # path step lying on an obstacle
                fill_cell(step.i, step.j, (230, 126, 34))

    if (start is not None) and (grid_map.traversable(start.i, start.j)):
        fill_cell(start.i, start.j, (40, 180, 99))

    if (goal is not None) and (grid_map.traversable(goal.i, goal.j)):
        fill_cell(goal.i, goal.j, (231, 76, 60))

    if pt:
        for di, dj in points:
            for endpoint in (start, goal):
                # start/goal are optional, so skip the marker of a missing endpoint
                if endpoint is None:
                    continue
                i, j = endpoint.i + di, endpoint.j + dj
                if grid_map.in_bounds(i, j):
                    fill_cell(i, j, (255, 0, 0))

    _, ax = plt.subplots(dpi=150)
    ax.axes.xaxis.set_visible(False)
    ax.axes.yaxis.set_visible(False)
    plt.imshow(np.asarray(im))
    plt.show()


In [6]:
def make_path(goal):
    '''
    Follows parent links from `goal` back to the node that has no parent and
    returns (path, length): the nodes ordered from that first node to `goal`,
    and the g-value of `goal`.
    '''
    length = goal.g
    trail = [goal]
    # keep stepping to the parent until the last collected node has none
    while trail[-1].parent:
        trail.append(trail[-1].parent)
    trail.reverse()
    return trail, length


In [7]:
c

In [10]:
def read_tasks(file):
    '''
    Reads tasks from the scenario file `file` + ".scen".

    The first line of the file is skipped. Every following non-blank line is split
    on whitespace and turned into a list
        [int(col 0), int(col 4), int(col 5), int(col 6), int(col 7), float(col 8)]
    # presumably the MovingAI .scen layout (bucket, start/goal coordinates, reference length) — TODO confirm

    Returns the list of such task lists, in file order.
    '''
    tasks = []
    # `with` guarantees the file is closed even if a line fails to parse
    with open(file + ".scen") as tasks_file:
        tasks_file.readline()  # header line
        for line in tasks_file:
            fields = line.split()
            if not fields:
                # tolerate blank lines (e.g. a trailing newline at the end of the file)
                continue
            task = [int(fields[k]) for k in (0, 4, 5, 6, 7)]
            task.append(float(fields[8]))
            tasks.append(task)
    return tasks

In [11]:
class SearchTreePQS:
    '''
    Search tree that keeps OPEN as a binary heap (priority queue) and CLOSED as a
    dict mapping each expanded item to itself.
    '''

    def __init__(self):
        self._open = []               # heap of generated items = OPEN
        heapq.heapify(self._open)
        self._closed = {}             # expanded items = CLOSED
        self._enc_open_dublicates = 0

    def __len__(self):
        '''Total number of items held in OPEN and CLOSED.'''
        return len(self._closed) + len(self._open)

    def open_is_empty(self):
        '''
        Tells whether OPEN is exhausted; when it is, the main search loop
        should be interrupted.
        '''
        return not len(self._open)

    def add_to_open(self, item):
        '''Push `item` onto the OPEN heap.'''
        heapq.heappush(self._open, item)

    def get_best_node_from_open(self):
        '''
        Pop the smallest item from OPEN, discarding items that were already
        expanded. If OPEN runs out while discarding, the last popped item is
        returned even though it was expanded, so callers should re-check it.
        '''
        candidate = heapq.heappop(self._open)
        while self.was_expanded(candidate) and self._open:
            candidate = heapq.heappop(self._open)
        return candidate

    def add_to_closed(self, item):
        '''Record `item` as expanded.'''
        self._closed[item] = item

    def was_expanded(self, item):
        '''True when `item` is already in CLOSED.'''
        return item in self._closed

    @property
    def OPEN(self):
        return self._open

    @property
    def CLOSED(self):
        return self._closed

    @property
    def number_of_open_dublicates(self):
        return self._enc_open_dublicates


In [18]:
class CBS_tree:
    '''
    High-level (CBS) search tree: only an OPEN priority queue, no CLOSED list.
    '''

    def __init__(self):
        self._open = []
        heapq.heapify(self._open)
    
    def open_is_empty(self):
        '''Tell whether OPEN is exhausted.'''
        return len(self._open) == 0
    
    def add_to_open(self, item):
        '''Push `item` onto the OPEN heap.'''
        heapq.heappush(self._open, item)
    
    def get_best_node_from_open(self):
        '''
        Pop and return the smallest item of OPEN (IndexError if OPEN is empty).

        This tree keeps no CLOSED list, so there is nothing to filter out here;
        the previous version called a `was_expanded` method that this class
        does not define.
        '''
        return heapq.heappop(self._open)
    
    @property
    def OPEN(self):
        return self._open

In [12]:
def distance(i1, j1, i2, j2):
    '''
    Distance between cells (i1, j1) and (i2, j2) on a grid with diagonal moves:
    the smaller of the two axis offsets is covered diagonally (sqrt(2) each),
    the remaining difference in straight steps (1 each).
    '''
    di = abs(i1 - i2)
    dj = abs(j1 - j2)
    diagonal_steps = min(di, dj)
    straight_steps = max(di, dj) - diagonal_steps
    return straight_steps + diagonal_steps * np.sqrt(2)

In [13]:
def astar(grid_map, start_i, start_j, goal_i, goal_j, robot_index, constraints, heuristic_func = None, search_tree = None):
    '''
    Low-level search for one robot from (start_i, start_j) to (goal_i, goal_j).

    - grid_map: map providing get_neighbors(i, j)
    - robot_index: index of the robot, used to look up its constraints
    - constraints: a Constraints object, a dict keyed by Constraint_step, or an
      empty container / None when there are no constraints
    - heuristic_func: called as heuristic_func(goal_i, goal_j, i, j); defaults to distance
    - search_tree: class of the search tree to instantiate; defaults to SearchTreePQS

    Returns (find, end, steps): `find` tells whether the goal was reached, `end` is the
    goal node (None on failure; its parent links lead back to the start) and `steps`
    is the number of expansions performed.

    NOTE(review): `steps` counts expansions and is also used as the step index for the
    constraint lookup; this matches the path time step only along a single chain of
    expansions — confirm this is intended.
    NOTE(review): nodes are deduplicated by cell only, so the "wait" successor returned
    by Map.get_neighbors is discarded when popped and waiting is never actually explored.
    '''
    if heuristic_func is None:
        heuristic_func = distance
    if search_tree is None:
        search_tree = SearchTreePQS

    def forbidden_at(step):
        '''Nodes the robot must not enter at `step`; empty when nothing is registered.'''
        if not constraints:
            return []
        if hasattr(constraints, "get_constraints"):
            try:
                return constraints.get_constraints(robot_index, step)
            except KeyError:
                return []
        return constraints.get(Constraint_step(robot_index, step), [])

    ast = search_tree()
    steps = 0
    ast.add_to_open(Node(start_i, start_j))

    while not ast.open_is_empty():
        current_node = ast.get_best_node_from_open()
        if ast.was_expanded(current_node):
            # OPEN held nothing but already expanded nodes
            break
        steps += 1
        ast.add_to_closed(current_node)
        forbidden = forbidden_at(steps)
        for point in grid_map.get_neighbors(current_node.i, current_node.j):
            if point[0] == current_node.i and point[1] == current_node.j:
                # "wait" move: compute_cost only prices moves between distinct cells
                move_cost = 1
            else:
                move_cost = compute_cost(point[0], point[1], current_node.i, current_node.j)
            new_node = Node(
                point[0],
                point[1],
                g=current_node.g + move_cost,
                h=heuristic_func(goal_i, goal_j, point[0], point[1]),
                parent=current_node,
            )
            if (ast.was_expanded(new_node) and new_node != current_node) or new_node in forbidden:
                continue
            if new_node.i == goal_i and new_node.j == goal_j:
                return True, new_node, steps
            ast.add_to_open(new_node)

    return False, None, steps


In [None]:
def CBS(grid_map, starts_points, goals_points, heuristic_func = None, search_tree = None):
    '''
    High-level CBS entry point (unfinished: only the root node is prepared and
    nothing is returned).

    Creates the CBS tree and a root CBS_Node with empty Constraints / Solutions, then
    runs astar once per robot — robot k goes from starts_points[k] to goals_points[k],
    each given as an (i, j) pair — and stores every result in the root's solutions.

    NOTE(review): astar receives a plain empty list as `constraints`, not the root's
    Constraints object, and the created tree is not used yet.
    '''
    cbs = CBS_tree()

    root = CBS_Node(0, Constraints(), Solutions())
    for robot, start in enumerate(starts_points):
        outcome = astar(
            grid_map,
            start[0],
            start[1],
            goals_points[robot][0],
            goals_points[robot][1],
            robot,
            [],
            heuristic_func,
            search_tree,
        )
        find, end, steps = outcome
        root.get_solutions().add_solution(find, end, steps)
        
    
    
    
    
    
    