In [19]:
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.patches as patches
import matplotlib.animation as animation

In [29]:
class Agent:
    def __init__(self, start, goal):
        self.start = start
        self.goal = goal
        self.path = []  # Stores the path of the agent


def a_star(graph, start, goal):
    open_set = {start}
    closed_set = set()
    g_values = {start: 0}
    f_values = {start: heuristic(start, goal)}
    came_from = {}

    while open_set:
        current = min(open_set, key=lambda node: f_values[node])

        if current == goal:
            return reconstruct_path(start, goal, came_from)

        open_set.remove(current)
        closed_set.add(current)

        for neighbor in get_neighbors(graph, current):
            if neighbor in closed_set:
                continue

            tentative_g = g_values[current] + 1

            if neighbor not in open_set or tentative_g < g_values[neighbor]:
                came_from[neighbor] = current
                g_values[neighbor] = tentative_g
                f_values[neighbor] = tentative_g + heuristic(neighbor, goal)

                if neighbor not in open_set:
                    open_set.add(neighbor)

    return None


def heuristic(node, goal):
    return abs(node[0] - goal[0]) + abs(node[1] - goal[1])


def get_neighbors(graph, node):
    neighbors = []
    for i, j in [(0, 1), (1, 0), (0, -1), (-1, 0)]:
        new_node = (node[0] + i, node[1] + j)
        if 0 <= new_node[0] < len(graph) and 0 <= new_node[1] < len(graph[0]) and graph[new_node[0]][new_node[1]] == 0:
            neighbors.append(new_node)
    return neighbors


def reconstruct_path(start, goal, came_from):
    path = [goal]
    current = goal
    while current != start:
        current = came_from[current]
        path.append(current)
    path.reverse()
    return path


# Write a new function that finds conflicts between agents
def find_conflicts(agents):
    conflicts = []
    for i in range(len(agents)):
        for j in range(i + 1, len(agents)):
            for pos_i, pos_j in zip(agents[i].path, agents[j].path):
                if pos_i == pos_j:
                    conflicts.append((i, j))
                    break
    return conflicts


def resolve_conflict(agent1, agent2):
    # Implement conflict resolution logic here
    agent1.start, agent2.start = agent2.start, agent1.start


def cbs(map, agents):
    for agent in agents:
            agent.path = a_star(map, agent.start, agent.goal)

    while True:
        # Step 2: Check for conflicts
        conflicts = find_conflicts(agents)

        # If there are no conflicts, the solution is found
        if not conflicts:
            return [agent.path for agent in agents]

        # Step 3: Resolve conflicts
        for i, j in conflicts:
            resolve_conflict(agents[i], agents[j])

        # Re-plan paths
        for agent in agents:
            agent.path = a_star(map, agent.start, agent.goal)

In [30]:
# Write a function to visualise the solution as a gif with the agents moving one step in the path at each timestep
def visualize_solution(map, agents, path):
    # Initialize the plot
    map = np.array(map)
    path = np.array(path)

    fig, ax = plt.subplots(figsize=(map.shape[0], map.shape[1]))
    ax.set_title('MAPF Solution')
    ax.set_xticks(np.arange(0, map.shape[0], 1))
    ax.set_yticks(np.arange(0, map.shape[1], 1))
    ax.grid()
    ax.imshow(map, cmap='Greys', vmin=0, vmax=1)

    # Initialize the agents
    agent_markers = []
    for agent in agents:
        agent_markers.append(ax.scatter(agent.start[0], agent.start[1], marker='o', color='blue'))

    # Initialize the paths
    path_markers = []
    for i in range(len(agents)):
        path_markers.append(ax.plot([], [], color='blue', linewidth=1)[0])

    # Animate the solution
    def animate(i):
        for j, agent in enumerate(agents):
            if i < len(path[j]):
                agent_markers[j].set_offsets([path[j][i][1], path[j][i][0]])
                path_markers[j].set_data([p[1] for p in path[j][:i+1]], [p[0] for p in path[j][:i+1]])

    anim = animation.FuncAnimation(fig, animate, frames=100, interval=100, blit=False)
    anim.save('solution.gif', writer='imagemagick', fps=1)
    plt.show()

In [31]:

# Define the map and agents
map =  [[0, 0, 0, 0, 0],
        [0, 0, 1, 0, 0],
        [0, 0, 0, 0, 0],
        [0, 0, 1, 0, 0],
        [0, 0, 0, 0, 0]]

agents = [Agent((0, 0), (4, 4)), Agent((4, 0), (0, 4))]

# Run CBS algorithm
paths = cbs(map, agents)

# Print the paths
for i, path in enumerate(paths):
    print(f"Agent {i + 1} path:", path)

# for agent in agents:
#     agent.path = a_star(map, agent.start, agent.goal)
#     print(agent.path)

# conflicts = find_conflicts(agents)
# print(conflicts)

# visualize_solution(map, agents, [agent.path for agent in agents])

Agent 1 path: [(4, 0), (4, 1), (4, 2), (4, 3), (4, 4)]
Agent 2 path: [(0, 0), (0, 1), (0, 2), (0, 3), (0, 4)]
