In [1]:
import heapq

def move(loc, dir):
    directions = [(0, -1), (1, 0), (0, 1), (-1, 0)]
    return loc[0] + directions[dir][0], loc[1] + directions[dir][1]

def compute_heuristics(my_map, goal):
    open_list = []
    closed_list = dict()
    root = {'loc': goal, 'cost': 0}
    heapq.heappush(open_list, (root['cost'], goal, root))
    closed_list[goal] = root
    while len(open_list) > 0:
        (cost, loc, curr) = heapq.heappop(open_list)
        for dir in range(4):
            child_loc = move(loc, dir)
            child_cost = cost + 1
            if child_loc[0] < 0 or child_loc[0] >= len(my_map) or child_loc[1] < 0 or child_loc[1] >= len(my_map[0]):
                continue
            if my_map[child_loc[0]][child_loc[1]]:
                continue
            child = {'loc': child_loc, 'cost': child_cost}
            if child_loc in closed_list:
                existing_node = closed_list[child_loc]
                if existing_node['cost'] > child_cost:
                    closed_list[child_loc] = child
                    heapq.heappush(open_list, (child_cost, child_loc, child))
            else:
                closed_list[child_loc] = child
                heapq.heappush(open_list, (child_cost, child_loc, child))
    h_values = dict()
    for loc, node in closed_list.items():
        h_values[loc] = node['cost']
    return h_values

def get_path(goal_node):
    path = []
    curr = goal_node
    while curr is not None:
        path.append(curr['loc'])
        curr = curr['parent']
    path.reverse()
    return path

def push_node(open_list, node):
    heapq.heappush(open_list, (node['g_val'] + node['h_val'], node['h_val'], node['loc'], node))

def pop_node(open_list):
    _, _, _, curr = heapq.heappop(open_list)
    return curr

def compare_nodes(n1, n2):
    return n1['g_val'] + n1['h_val'] < n2['g_val'] + n2['h_val']

def A_star(my_map, start_loc, goal_loc):
    open_list = []
    closed_list = dict()
    h_values = compute_heuristics(my_map, goal_loc)
    h_value = h_values[start_loc]
    root = {'loc': start_loc, 'g_val': 0, 'h_val': h_value, 'parent': None}
    push_node(open_list, root)
    closed_list[(root['loc'])] = root
    while len(open_list) > 0:
        curr = pop_node(open_list)
        if curr['loc'] == goal_loc:
            return get_path(curr)
        for dir in range(4):
            child_loc = move(curr['loc'], dir)
            if my_map[child_loc[0]][child_loc[1]]:
                continue
            child = {'loc': child_loc,
                    'g_val': curr['g_val'] + 1,
                    'h_val': h_values[child_loc],
                    'parent': curr}
            if (child['loc']) in closed_list:
                existing_node = closed_list[(child['loc'])]
                if compare_nodes(child, existing_node):
                    closed_list[(child['loc'])] = child
                    push_node(open_list, child)
            else:
                closed_list[(child['loc'])] = child
                push_node(open_list, child)

    return None

In [2]:
my_map = [[True, True, True, True, True, True],
          [True, True, False, False, False, True],
          [True, False, False, False, False, True],
          [True, True, True, True, True, True]]
start_loc = (2, 1)
goal_loc = (1, 4)

In [3]:
path = A_star(my_map, start_loc, goal_loc)
print(path)

[(2, 1), (2, 2), (1, 2), (1, 3), (1, 4)]


In [4]:
start_loc_A = (2, 1)
goal_loc_A = (1, 4)

start_loc_B = (1, 4)
goal_loc_B = (2, 2)

In [5]:
path_A = A_star(my_map, start_loc_A, goal_loc_A)
path_B = A_star(my_map, start_loc_B, goal_loc_B)
print(path_A)
print(path_B)

[(2, 1), (2, 2), (1, 2), (1, 3), (1, 4)]
[(1, 4), (1, 3), (1, 2), (2, 2)]


In [6]:
def STA_star(my_map, start_loc, goal_loc):
    open_list = []
    closed_list = dict()
    h_values = compute_heuristics(my_map, goal_loc)
    h_value = h_values[start_loc]
    root = {'loc': start_loc, 'g_val': 0, 'h_val': h_value, 'parent': None, 'timestep' : 0}
    push_node(open_list, root)
    closed_list[(root['loc'], root['timestep'])] = root
    while len(open_list) > 0:
        curr = pop_node(open_list)
        if curr['loc'] == goal_loc:
            return get_path(curr)
        for dir in range(4):
            child_loc = move(curr['loc'], dir)
            if my_map[child_loc[0]][child_loc[1]]:
                continue
            child = {'loc': child_loc,
                    'g_val': curr['g_val'] + 1,
                    'h_val': h_values[child_loc],
                    'parent': curr,
                    'timestep' : curr['timestep'] + 1}
            if (child['loc'], child['timestep']) in closed_list:
                existing_node = closed_list[(child['loc'], child['timestep'])]
                if compare_nodes(child, existing_node):
                    closed_list[(child['loc'], child['timestep'])] = child
                    push_node(open_list, child)
            else:
                closed_list[(child['loc'], child['timestep'])] = child
                push_node(open_list, child)
    return None

In [7]:
def push_node_time(open_list, node):
    heapq.heappush(open_list, (node['g_val'] + node['h_val'], node['h_val'], -node['timestep'], node['loc'], node))

def pop_node_time(open_list):
    _, _, _, _, curr = heapq.heappop(open_list)
    return curr

In [8]:
def build_constraint_table(constraints, agent):
    constraint_table = []
    for constraint in constraints:
        if constraint['agent'] == agent:
            constraint_table.append(constraint)
    return constraint_table

In [9]:
def is_constrained(curr_loc, next_loc, next_time, constraint_table):
    for constraint in constraint_table:
        const_loc = constraint['loc']
        const_time = constraint['timestep']
        if const_time == next_time:
            if len(const_loc) == 2:
                if const_loc == [curr_loc, next_loc] or const_loc == [next_loc, curr_loc] or const_loc[1] == next_loc:
                    return True
            else:
                if const_loc == [next_loc]:
                    return True
    return False

In [10]:
def move_wait(loc, dir):
    directions = [(0, -1), (1, 0), (0, 1), (-1, 0), (0, 0)]
    return loc[0] + directions[dir][0], loc[1] + directions[dir][1]

def STA_star_constraints(my_map, start_loc, goal_loc, agent, constraints):
    open_list = []
    closed_list = dict()
    h_values = compute_heuristics(my_map, goal_loc)
    h_value = h_values[start_loc]
    constraint_table = build_constraint_table(constraints, agent)
    root = {'loc': start_loc, 'g_val': 0, 'h_val': h_value, 'parent': None, 'timestep' : 0}
    push_node_time(open_list, root)
    closed_list[(root['loc'], root['timestep'])] = root
    while len(open_list) > 0:
        curr = pop_node_time(open_list)
        if curr['loc'] == goal_loc:
            return get_path(curr)
        for dir in range(5):
            child_loc = move_wait(curr['loc'], dir)
            if my_map[child_loc[0]][child_loc[1]]:
                continue
            child = {'loc': child_loc,
                    'g_val': curr['g_val'] + 1,
                    'h_val': h_values[child_loc],
                    'parent': curr,
                    'timestep' : curr['timestep'] + 1}
            if is_constrained(curr['loc'], child['loc'], child['timestep'], constraint_table):
                closed_list[(child['loc'], child['timestep'])] = child
                continue
            if (child['loc'], child['timestep']) in closed_list:
                existing_node = closed_list[(child['loc'], child['timestep'])]
                if compare_nodes(child, existing_node):
                    closed_list[(child['loc'], child['timestep'])] = child
                    push_node_time(open_list, child)
            else:
                closed_list[(child['loc'], child['timestep'])] = child
                push_node_time(open_list, child)
    return None

In [11]:
start_loc_A = (2, 1)
goal_loc_A = (1, 4)

start_loc_B = (1, 4)
goal_loc_B = (2, 2)

In [12]:
STpath_A = STA_star_constraints(my_map, start_loc_A, goal_loc_A, 0, [])
print(STpath_A)

[(2, 1), (2, 2), (1, 2), (1, 3), (1, 4)]


In [13]:
constraints = []

for time in range(len(STpath_A)):
    constraint = dict()
    constraint['agent'] = 1
    constraint['loc'] = [STpath_A[time]]
    constraint['timestep'] = time
    constraints.append(constraint)
            
for time in range(len(STpath_A)-1):
    constraint = dict()
    constraint['agent'] = 1
    constraint['loc'] = [STpath_A[time], STpath_A[time+1]]
    constraint['timestep'] = time+1
    constraints.append(constraint)

In [14]:
STpath_B = STA_star_constraints(my_map, start_loc_B, goal_loc_B, 1, constraints)
print(STpath_B)

[(1, 4), (1, 3), (2, 3), (2, 2)]


In [15]:
import time as timer
import heapq
import random
import copy

def get_sum_of_cost(paths):
    rst = 0
    for path in paths:
        rst += len(path) - 1
    return rst

def get_location(path, time):
    if time < 0:
        return path[0]
    elif time < len(path):
        return path[time]
    else:
        return path[-1]

def detect_collision(path1, path2):
    for time in range(min(len(path1), len(path2))):
        collision = dict()
        loc1 = get_location(path1, time)
        loc2 = get_location(path2, time)
        if loc1 == loc2:
            collision['loc'] =  [loc1]
            collision['timestep'] = time
            return collision
        if time > 0:
            loc1p = get_location(path1, time-1)
            loc2p = get_location(path2, time-1)
            if loc1 == loc2p and loc2 == loc1p:
                collision['loc'] =  [loc1, loc2]
                collision['timestep'] = time
                
                return collision
    return None

def detect_collisions(paths):
    collisions = []
    num_of_agents = len(paths)
    for i in range(num_of_agents):
        for j in range(i+1, num_of_agents):
            collision = detect_collision(paths[i], paths[j])
            if collision:
                col = dict()
                col['a1'] = i
                col['a2'] = j
                col['loc'] = collision['loc'].copy()
                col['timestep'] = collision['timestep']
                collisions.append(col)
    return collisions


def standard_splitting(collision):
    conflicts = []
    
    conflict1 = dict()
    conflict1['agent'] = collision['a1']
    conflict1['loc'] = collision['loc'].copy()
    conflict1['timestep'] = collision['timestep']
    conflicts.append(conflict1)
    
    conflict2 = dict()
    conflict2['agent'] = collision['a2']
    conflict2['loc'] = list(reversed(collision['loc'].copy()))
    conflict2['timestep'] = collision['timestep']
    conflicts.append(conflict2)
    
    return conflicts


class CBSSolver(object):

    def __init__(self, my_map, starts, goals):

        self.my_map = my_map
        self.starts = starts
        self.goals = goals
        self.num_of_agents = len(goals)

        self.num_of_generated = 0
        self.num_of_expanded = 0
        self.CPU_time = 0

        self.open_list = []

    def push_node(self, node):
        heapq.heappush(self.open_list, (node['cost'], len(node['collisions']), self.num_of_generated, node))
        print("Generate node {}".format(self.num_of_generated))
        self.num_of_generated += 1

    def pop_node(self):
        _, _, id, node = heapq.heappop(self.open_list)
        print("Expand node {}".format(id))
        self.num_of_expanded += 1
        return node

    def find_solution(self, disjoint=True):
        self.start_time = timer.time()
        root = {'cost': 0,
                'constraints': [],
                'paths': [],
                'collisions': []}
        for i in range(self.num_of_agents):
            path = STA_star_constraints(self.my_map, self.starts[i], self.goals[i], i, root['constraints'])
            if path is None:
                raise BaseException('No solutions')
            root['paths'].append(path)
        root['cost'] = get_sum_of_cost(root['paths'])
        root['collisions'] = detect_collisions(root['paths'])
        self.push_node(root)
        while(self.open_list):
            current = self.pop_node()
            if not(current['collisions']):
                self.print_results(current)
                return current['paths']
            collision = current['collisions'][0]
            constraints = standard_splitting(collision)
            for constraint in constraints:
                new = {'cost': 0,
                        'constraints': [],
                        'paths': [],
                        'collisions': []}
                new['constraints'] = current['constraints'].copy()
                if not constraint in new['constraints']:
                    new['constraints'].append(constraint)
                new['paths'] = current['paths'].copy()
                agent = constraint['agent']
                path = STA_star_constraints(self.my_map, self.starts[agent], self.goals[agent], agent, new['constraints'])
                if (path):
                    new['paths'][agent] = path
                    new['collisions'] = detect_collisions(new['paths'])
                    new['cost'] = get_sum_of_cost(new['paths'])
                    self.push_node(new)
        self.print_results(root)
        return root['paths']

    def print_results(self, node):
        print("\n        Found a solution! \n")
        CPU_time = timer.time() - self.start_time
        print("CPU time (s)    :  {:.5f}".format(CPU_time))
        print("Sum of costs    :  {}".format(get_sum_of_cost(node['paths'])))
        print("Longest path    :  {}".format(max(len(path) for path in node['paths']) - 1))
        print("Expanded nodes  :  {}".format(self.num_of_expanded))
        print("Generated nodes :  {}".format(self.num_of_generated))

In [16]:
my_map = [
    [True, True, True, True, True, True],
    [True, True, False, False, False, True],
    [True, False, False, False, False, True],
    [True, True, True, True, True, True]
]

starts = [(2, 1), (1, 4)]
goals = [(1, 4), (2, 2)]

In [17]:
solver = CBSSolver(my_map, starts, goals)
solution = solver.find_solution()
print("Final Paths:")
for i, path in enumerate(solution):
    print(f"Agent {i + 1}: {path}")

Generate node 0
Expand node 0
Generate node 1
Generate node 2
Expand node 1

        Found a solution! 

CPU time (s)    :  0.00081
Sum of costs    :  7
Longest path    :  4
Expanded nodes  :  2
Generated nodes :  3
Final Paths:
Agent 1: [(2, 1), (2, 2), (2, 3), (1, 3), (1, 4)]
Agent 2: [(1, 4), (1, 3), (1, 2), (2, 2)]


In [18]:
def push_node_t_turn(open_list, node):
    heapq.heappush(open_list, (node['g_val'] + node['h_val'], node['g_val'], node['h_val'], node['loc'], node['dir'], node))

def pop_node_t_turn(open_list):
    _, _, _, _, _, curr = heapq.heappop(open_list)
    return curr

In [19]:
def turn(dir1, dir2):
    if dir1 == dir2 or dir1 == -1:
        return 0
    return min(abs(dir1 - dir2 - 4), abs(dir1 - dir2), abs(dir1 - dir2 + 4))

In [20]:
def STA_star_c_turn(my_map, start_loc, goal_loc, agent, constraints):
    open_list = []
    closed_list = dict()
    h_values = compute_heuristics(my_map, goal_loc)
    h_value = h_values[start_loc]
    constraint_table = build_constraint_table(constraints, agent)
    root = {'loc': start_loc, 'g_val': 0, 'h_val': h_value, 'parent': None, 'timestep' : 0, 'dir' : -1}
    push_node_t_turn(open_list, root)
    closed_list[(root['loc'], root['timestep'], root['dir'])] = root
    while len(open_list) > 0:
        curr = pop_node_t_turn(open_list)
        if curr['loc'] == goal_loc:
            return get_path(curr)
        for dir in range(8):
            dir_go = dir
            dir_came = dir
            if dir > 3:
                dir_go = 4
                dir_came = dir-4
            child_loc = move_wait(curr['loc'], dir_go)
            if my_map[child_loc[0]][child_loc[1]]:
                continue
            child = {'loc': child_loc,
                    'g_val': curr['g_val'] + 1,
                    'h_val': h_values[child_loc],
                    'parent': curr,
                    'timestep' : curr['timestep'] + 1,
                    'dir' : dir_came}
            if curr['loc'] != child['loc'] and turn(curr['dir'], child['dir']) > 0:
                continue
            if curr['loc'] == child['loc'] and turn(curr['dir'], child['dir']) > 1:
                continue
            if is_constrained(curr['loc'], child['loc'], child['timestep'], constraint_table):
                closed_list[(child['loc'], child['timestep'])] = child
                print(f"{agent} is constrained at time {child['timestep']} in {child['loc']}")
                continue
            if (child['loc'], child['timestep'], child['dir']) in closed_list:
                existing_node = closed_list[(child['loc'], child['timestep'], child['dir'])]
                if compare_nodes(child, existing_node):
                    closed_list[(child['loc'], child['timestep'], child['dir'])] = child
                    push_node_t_turn(open_list, child)
            else:
                closed_list[(child['loc'], child['timestep'], child['dir'])] = child
                push_node_t_turn(open_list, child)
    return None

In [21]:
my_map =[[True, True, True, True, True, True, True, True], 
         [True, True, True, True, False, False, False, True], 
         [True, True, True, False, False, True, False, True], 
         [True, True, False, False, True, True, False, True], 
         [True, False, False, True, True, True, False, True], 
         [True, False, False, False, False, False, False, True], 
         [True, True, True, True, True, True, True, True]]
start_loc = (4, 1)
goal_loc = (1, 6)

In [22]:
path = STA_star_c_turn(my_map, start_loc, goal_loc, 0, [])

In [23]:
print(path)

[(4, 1), (5, 1), (5, 1), (5, 2), (5, 3), (5, 4), (5, 5), (5, 6), (5, 6), (4, 6), (3, 6), (2, 6), (1, 6)]


In [24]:
class CBSSolver_turn(object):

    def __init__(self, my_map, starts, goals):

        self.my_map = my_map
        self.starts = starts
        self.goals = goals
        self.num_of_agents = len(goals)

        self.num_of_generated = 0
        self.num_of_expanded = 0
        self.CPU_time = 0

        self.open_list = []

    def push_node(self, node):
        heapq.heappush(self.open_list, (node['cost'], len(node['collisions']), self.num_of_generated, node))
        print("Generate node {}".format(self.num_of_generated))
        self.num_of_generated += 1

    def pop_node(self):
        _, _, id, node = heapq.heappop(self.open_list)
        print("Expand node {}".format(id))
        self.num_of_expanded += 1
        return node

    def find_solution(self, disjoint=True):
        self.start_time = timer.time()
        root = {'cost': 0,
                'constraints': [],
                'paths': [],
                'collisions': []}
        for i in range(self.num_of_agents):
            path = STA_star_c_turn(self.my_map, self.starts[i], self.goals[i], i, root['constraints'])
            if path is None:
                raise BaseException('No solutions')
            root['paths'].append(path)
        root['cost'] = get_sum_of_cost(root['paths'])
        root['collisions'] = detect_collisions(root['paths'])
        self.push_node(root)
        while(self.open_list):
            current = self.pop_node()
            if not(current['collisions']):
                self.print_results(current)
                return current['paths']
            collision = current['collisions'][0]
            constraints = standard_splitting(collision)
            for constraint in constraints:
                new = {'cost': 0,
                        'constraints': [],
                        'paths': [],
                        'collisions': []}
                new['constraints'] = current['constraints'].copy()
                if not constraint in new['constraints']:
                    new['constraints'].append(constraint)
                new['paths'] = current['paths'].copy()
                agent = constraint['agent']
                path = STA_star_c_turn(self.my_map, self.starts[agent], self.goals[agent], agent, new['constraints'])
                if (path):
                    new['paths'][agent] = path
                    new['collisions'] = detect_collisions(new['paths'])
                    new['cost'] = get_sum_of_cost(new['paths'])
                    self.push_node(new)
                
        self.print_results(root)
        return root['paths']
    
    def print_results(self, node):
        print("\n        Found a solution! \n")
        CPU_time = timer.time() - self.start_time
        print("CPU time (s)    :  {:.5f}".format(CPU_time))
        print("Sum of costs    :  {}".format(get_sum_of_cost(node['paths'])))
        print("Longest path    :  {}".format(max(len(path) for path in node['paths']) - 1))
        print("Expanded nodes  :  {}".format(self.num_of_expanded))
        print("Generated nodes :  {}".format(self.num_of_generated))

In [25]:
my_map = [
    [True, True, True, True, True, True],
    [True, True, False, False, False, True],
    [True, False, False, False, False, True],
    [True, True, True, True, True, True]
]

starts = [(2, 1), (1, 4)]
goals = [(1, 4), (2, 2)]

In [26]:
solver = CBSSolver_turn(my_map, starts, goals)
solution = solver.find_solution()

print("Final Paths:")
for i, path in enumerate(solution):
    print(f"Agent {i + 1}: {path}")

Generate node 0
Expand node 0
0 is constrained at time 3 in (2, 4)
0 is constrained at time 3 in (2, 3)
0 is constrained at time 3 in (2, 3)
0 is constrained at time 3 in (2, 3)
0 is constrained at time 3 in (2, 3)
Generate node 1
1 is constrained at time 3 in (2, 3)
1 is constrained at time 3 in (2, 4)
1 is constrained at time 3 in (2, 4)
1 is constrained at time 3 in (2, 4)
1 is constrained at time 3 in (2, 4)
1 is constrained at time 3 in (2, 4)
1 is constrained at time 3 in (2, 4)
1 is constrained at time 3 in (2, 4)
1 is constrained at time 3 in (2, 4)
1 is constrained at time 3 in (2, 4)
Generate node 2
Expand node 2

        Found a solution! 

CPU time (s)    :  0.00544
Sum of costs    :  9
Longest path    :  5
Expanded nodes  :  2
Generated nodes :  3
Final Paths:
Agent 1: [(2, 1), (2, 2), (2, 3), (2, 4), (2, 4), (1, 4)]
Agent 2: [(1, 4), (1, 3), (1, 2), (1, 2), (2, 2)]


In [27]:
def compute_heuristics_shelf(my_map, start, goal, state = 0):
    open_list = []
    closed_list = dict()
    root = {'loc': goal, 'cost': 0}
    heapq.heappush(open_list, (root['cost'], goal, root))
    closed_list[goal] = root
    while len(open_list) > 0:
        (cost, loc, curr) = heapq.heappop(open_list)
        for dir in range(4):
            child_loc = move(loc, dir)
            child_cost = cost + 1
            if child_loc[0] < 0 or child_loc[0] >= len(my_map) or child_loc[1] < 0 or child_loc[1] >= len(my_map[0]):
                continue
            if my_map[child_loc[0]][child_loc[1]] == '@':
                continue
            if child_loc != goal and my_map[child_loc[0]][child_loc[1]] == '#' and state == 1:
                continue
            child = {'loc': child_loc, 'cost': child_cost}
            if child_loc in closed_list:
                existing_node = closed_list[child_loc]
                if existing_node['cost'] > child_cost:
                    closed_list[child_loc] = child
                    heapq.heappush(open_list, (child_cost, child_loc, child))
            else:
                closed_list[child_loc] = child
                heapq.heappush(open_list, (child_cost, child_loc, child))
    h_values = dict()
    if not start in closed_list:
        neighbors = []
        for dir in range(4):
            neighbor = move(start, dir)
            if neighbor in closed_list:
                neighbors.append(neighbor)
        mincost = min(closed_list[neighbor]['cost'] for neighbor in neighbors)
        h_values[start] = mincost + 1
    for loc, node in closed_list.items():
        h_values[loc] = node['cost']
    return h_values

In [28]:
def single_a_star(my_map, start_loc, goal_loc, agent, constraints, start_node = None, state = 0):
    open_list = []
    closed_list = dict()
    h_values = compute_heuristics_shelf(my_map, start_loc, goal_loc, state)
    h_value = h_values[start_loc]
    constraint_table = build_constraint_table(constraints, agent)
    if start_node == None:
        root = {'loc': start_loc, 'g_val': 0, 'h_val': h_value, 'parent': None, 'timestep' : 0, 'dir' : -1}
    else:
        root = start_node
    push_node_t_turn(open_list, root)
    closed_list[(root['loc'], root['timestep'], root['dir'])] = root
    while len(open_list) > 0:
        curr = pop_node_t_turn(open_list)
        if curr['loc'] == goal_loc:
            return curr, get_path(curr)
        for dir in range(8):
            dir_go = dir
            dir_came = dir
            if dir > 3:
                dir_go = 4
                dir_came = dir-4
            child_loc = move_wait(curr['loc'], dir_go)
            if my_map[child_loc[0]][child_loc[1]] == '@':
                continue
            if child_loc != start_loc and child_loc != goal_loc and my_map[child_loc[0]][child_loc[1]] == '#' and state == 1:
                continue
            child = {'loc': child_loc,
                    'g_val': curr['g_val'] + 1,
                    'h_val': h_values[child_loc],
                    'parent': curr,
                    'timestep' : curr['timestep'] + 1,
                    'dir' : dir_came}
            if curr['loc'] != child['loc'] and turn(curr['dir'], child['dir']) > 0:
                continue
            if curr['loc'] == child['loc'] and turn(curr['dir'], child['dir']) > 1:
                continue
            if is_constrained(curr['loc'], child['loc'], child['timestep'], constraint_table):
                closed_list[(child['loc'], child['timestep'], child['dir'])] = child
                continue
            if (child['loc'], child['timestep'], child['dir']) in closed_list:
                existing_node = closed_list[(child['loc'], child['timestep'], child['dir'])]
                if compare_nodes(child, existing_node):
                    closed_list[(child['loc'], child['timestep'], child['dir'])] = child
                    push_node_t_turn(open_list, child)
            else:
                closed_list[(child['loc'], child['timestep'], child['dir'])] = child
                push_node_t_turn(open_list, child)
    return None, None

In [29]:
def agent_a_star(my_map, agent, constraints):
    start = agent['start']
    item = agent['items'][0]
    station = agent['station']
    goal = agent['goal']
    agent_num = agent['idx']
    root = None
    for item in agent['items']:
        root, _ = single_a_star(my_map, start, item, agent_num, constraints, root)
        if root == None:
            return None
        root, _ = single_a_star(my_map, item, station, agent_num, constraints, root, 1)
        if root == None:
            return None
        root, _ = single_a_star(my_map, station, item, agent_num, constraints, root, 1)
        if root == None:
            return None
        start = item
    root, path = single_a_star(my_map, start, goal, agent_num, constraints, root)
    if root == None:
            return None
    return path

In [36]:
class MAPDSolver(object):

    def __init__(self, my_map, starts, items, stations, goals):
        self.my_map = my_map
        self.starts = starts
        self.items = items
        self.stations = stations
        self.goals = goals
        self.num_of_agents = len(goals)

        self.num_of_generated = 0
        self.num_of_expanded = 0
        self.CPU_time = 0

        self.open_list = []

    def push_node(self, node):
        heapq.heappush(self.open_list, (node['cost'], len(node['collisions']), self.num_of_generated, node))
        print("Generate node {}".format(self.num_of_generated))
        self.num_of_generated += 1

    def pop_node(self):
        _, _, id, node = heapq.heappop(self.open_list)
        print("Expand node {}".format(id))
        self.num_of_expanded += 1
        return node

    def find_solution(self, disjoint=True):
        self.start_time = timer.time()
        root = {'cost': 0,
                'constraints': [],
                'paths': [],
                'collisions': []}
        for i in range(self.num_of_agents):
            agent = {'start' : self.starts[i], 'items' : self.items[i], 'station' : self.stations[i], 'goal' : self.goals[i], 'idx' : i}
            path = agent_a_star(self.my_map, agent, root['constraints'])
            if path is None:
                raise BaseException('No solutions')
            root['paths'].append(path)
        root['cost'] = get_sum_of_cost(root['paths'])
        root['collisions'] = detect_collisions(root['paths'])
        self.push_node(root)
        while(self.open_list):
            if self.num_of_generated > 30000:
                print("Exceed 30,000 nodes!")
                result = dict()
                result['CPU time'] = timer.time() - self.start_time
                result['Sum of costs'] = 0
                result['Longest path'] = 0
                return None, result
            current = self.pop_node()
            if not(current['collisions']):
                self.print_results(current)
                result = dict()
                result['CPU time'] = timer.time() - self.start_time
                result['Sum of costs'] = get_sum_of_cost(current['paths'])
                result['Longest path'] = max(len(path) for path in current['paths']) - 1
                return current['paths'], result
            collision = current['collisions'][0]
            constraints = standard_splitting(collision)
            for constraint in constraints:
                new = {'cost': 0, 'constraints': [], 'paths': [], 'collisions': []}
                new['constraints'] = current['constraints'].copy()
                if  not constraint in new['constraints']:    
                    new['constraints'].append(constraint)
                new['paths'] = current['paths'].copy()
                agent = constraint['agent']
                colagent = {'start' : self.starts[agent], 'items' : self.items[agent], 'station' : self.stations[agent], 'goal' : self.goals[agent], 'idx' : agent}
                path = agent_a_star(self.my_map, colagent, new['constraints'])
                if (path):
                    new['paths'][agent] = path
                    new['collisions'] = detect_collisions(new['paths'])
                    new['cost'] = get_sum_of_cost(new['paths'])
                    self.push_node(new)
        self.print_results(root)
        result = dict()
        result['CPU time'] = timer.time() - self.start_time
        result['Sum of costs'] = get_sum_of_cost(root['paths'])
        result['Longest path'] = max(len(path) for path in root['paths']) - 1
        return root['paths'], result
        
    def print_results(self, node):
        print("\n        Found a solution! \n")
        CPU_time = timer.time() - self.start_time
        print("CPU time (s)    :  {:.5f}".format(CPU_time))
        print("Sum of costs    :  {}".format(get_sum_of_cost(node['paths'])))
        print("Longest path    :  {}".format(max(len(path) for path in node['paths']) - 1))
        print("Expanded nodes  :  {}".format(self.num_of_expanded))
        print("Generated nodes :  {}".format(self.num_of_generated))

In [37]:
from pathlib import Path

def import_mapf_instance(filename):
    f = Path(filename)
    if not f.is_file():
        raise BaseException(filename + " does not exist.")
    f = open(filename, 'r')
    line = f.readline()
    rows, columns = [int(x) for x in line.split(' ')]
    rows = int(rows)
    columns = int(columns)
    my_map = []
    for r in range(rows):
        line = f.readline()
        my_map.append([])
        for cell in line:
            if cell == '@':
                my_map[-1].append('@')
            elif cell == '.':
                my_map[-1].append('.')
            elif cell == '#':
                my_map[-1].append('#')
    line = f.readline()
    num_agents = int(line)
    starts = []
    items = []
    stations = []
    goals = []
    for a in range(num_agents):
        line = f.readline()
        x, y = [int(x) for x in line.split(' ')]
        line = f.readline()
        item = [int(x) for x in line.split(' ')]
        it = []
        for idx in range(0, len(item), 2):
            ix, iy = item[idx], item[idx+1]
            it.append((ix, iy))
        line = f.readline()
        sx, sy = [int(x) for x in line.split(' ')]
        line = f.readline()
        gx, gy = [int(x) for x in line.split(' ')]
        starts.append((x, y))
        items.append(it)
        stations.append((sx, sy))
        goals.append((gx, gy))
    f.close()
    return my_map, starts, items, stations, goals

In [38]:
import tkinter as tk

def visualize(my_map, Path_T, starts, items, stations, goals):
    
    def set_original():
        for i in range(vertical):
            for j in range(horizontal):
                if my_map[i][j] == '@':
                    labels[i][j].config(bg = "black", text = "")
                elif my_map[i][j] == '.':
                    labels[i][j].config(bg = "white", text = "")
                elif my_map[i][j] == '#':
                    labels[i][j].config(bg = "gray", text = "")
        for i, start in enumerate(starts):
            x = start[0]
            y = start[1]
            labels[x][y].config(bg = "light green", text = i)
        for i, item in enumerate(items):
            for it in item:
                x = it[0]
                y = it[1]
                labels[x][y].config(bg = "light blue", text = i)
        for i, station in enumerate(stations):
            x = station[0]
            y = station[1]
            labels[x][y].config(bg = "khaki", text = i)
        for i in range(vertical):
            labels[i][0].config(text = i, fg = "yellow")
        for j in range(horizontal):
            labels[0][j].config(text = j, fg = "yellow")

    def set_btn(num):
        global colors
        set_original()
        path = Path_T[num]
        for i, tup in enumerate(path):
            x = tup[0]
            y = tup[1]
            labels[x][y].config(bg = colors[i], text = i)

    def play():
        global timestep
        if timestep < len(Path_T)-1:
            timestep += 1
            set_btn(timestep)
            lbl_time.config(text = f'timestep : {timestep}')

    def playback():
        global timestep
        if timestep > 0:
            timestep -= 1
            set_btn(timestep)
            lbl_time.config(text = f'timestep : {timestep}')

    def autoplay():
        global auto
        global speed
        play()
        auto = root.after(speed, autoplay)

    def autoplayback():
        global auto
        global speed
        playback()
        auto = root.after(speed, autoplayback)

    def pause():
        global auto
        root.after_cancel(auto)

    vertical = len(my_map)
    horizontal = len(my_map[0])

    root = tk.Tk()
    root.title("Path Finder")
    labels = [[tk.Label(root, width=4, height=2, bg="white", borderwidth=1, relief="sunken") for j in range(horizontal)] for i in range(vertical)]
    global colors
    colors = ["orange red", "salmon", "wheat3", "sea green", "purple", "olive drab", "indian red"]
    for i in range(vertical):
        for j in range(horizontal):
            labels[i][j].grid(row=i,column=j)
    for i in range(vertical):
        for j in range(horizontal):
            if my_map[i][j] == '@':
                labels[i][j].config(bg = "black")
            elif my_map[i][j] == '.':
                labels[i][j].config(bg = "white")
            elif my_map[i][j] == '#':
                labels[i][j].config(bg = "gray")
    for i in range(vertical):
        labels[i][0].config(text = i, fg = "yellow")
    for j in range(horizontal):
        labels[0][j].config(text = j, fg = "yellow")
    for i, start in enumerate(starts):
            x = start[0]
            y = start[1]
            labels[x][y].config(bg = "light green", text = i)
    for i, item in enumerate(items):
        for it in item:
            x = it[0]
            y = it[1]
            labels[x][y].config(bg = "light blue", text = i)
    for i, station in enumerate(stations):
        x = station[0]
        y = station[1]
        labels[x][y].config(bg = "khaki", text = i)
    global timestep
    global speed
    timestep = 0
    speed = 500
    path = Path_T[0]
    for i, tup in enumerate(path):
        x = tup[0]
        y = tup[1]
        labels[x][y].config(bg = colors[i], text = i)
    lbl_time = tk.Label(root, text = f'timestep : {timestep}', height = 3, width = 10)
    lbl_time.grid(row = vertical + 5, column = int(horizontal/2)-1, columnspan = 3, rowspan = 3, sticky='NEWS')
    btn_autoplayback = tk.Button(root, text = '<<', width = 2, command = autoplayback)
    btn_playback = tk.Button(root, text = '<', width = 2, command = playback)      
    btn_pause = tk.Button(root, text = 'II', width = 2, command = pause)     
    btn_play = tk.Button(root, text = '>', width = 2, command = play)
    btn_autoplay = tk.Button(root, text = '>>', width = 2, command = autoplay)
    btn_autoplayback.grid(row=vertical + 3, column=int(horizontal/2)-3, columnspan = 3)
    btn_playback.grid(row=vertical + 3, column=int(horizontal/2)-2, columnspan = 3)
    btn_pause.grid(row=vertical + 3, column=int(horizontal/2)-1, columnspan = 3)
    btn_play.grid(row=vertical + 3, column=int(horizontal/2), columnspan = 3)
    btn_autoplay.grid(row=vertical + 3, column=int(horizontal/2)+1, columnspan = 3)
    root.mainloop()

In [39]:
def pathfind(filepath):
    my_map, starts, items, stations, goals = import_mapf_instance(filepath)
    cbs = MAPDSolver(my_map, starts, items, stations, goals)
    paths, result = cbs.find_solution()
    return my_map, paths, result

In [40]:
def convert_path(paths):
    m = max(len(path) for path in paths)
    for path in paths:
        path += [path[-1]] * ( m - len(path))
    path_2arr = []
    for j in range(m):
        timepath = []
        for i in range(len(paths)):
            timepath.append(paths[i][j])
        path_2arr.append(timepath)
    return path_2arr

In [43]:
filepath = "./maps/map-4-20-3.txt"
my_map, paths, _ = pathfind(filepath)
my_map, starts, items, stations, goals = import_mapf_instance(filepath)
Path_T = convert_path(paths)
visualize(my_map, Path_T, starts, items, stations, goals)

Generate node 0
Expand node 0
Generate node 1
Generate node 2
Expand node 2
Generate node 3
Generate node 4
Expand node 1
Generate node 5
Generate node 6
Expand node 5
Generate node 7
Generate node 8
Expand node 7
Generate node 9
Generate node 10
Expand node 10
Generate node 11
Generate node 12
Expand node 11
Generate node 13
Generate node 14
Expand node 12
Generate node 15
Generate node 16
Expand node 14
Generate node 17
Generate node 18
Expand node 16
Generate node 19
Generate node 20
Expand node 17
Generate node 21
Generate node 22
Expand node 22
Generate node 23
Generate node 24
Expand node 24
Generate node 25
Generate node 26
Expand node 26
Generate node 27
Generate node 28
Expand node 28
Generate node 29
Generate node 30
Expand node 19
Generate node 31
Generate node 32
Expand node 32
Generate node 33
Generate node 34
Expand node 34
Generate node 35
Generate node 36
Expand node 36
Generate node 37
Generate node 38
Expand node 38
Generate node 39
Generate node 40
Expand node 13
Gen