In [1]:
import heapq

In [3]:
def a_star(grid, start, goal):
    """Search ``grid`` for a path from ``start`` to ``goal`` with A*.

    Movement is 4-connected (no diagonals) and every step costs 1. The
    priority of a node is ``g + heuristic(node, goal)``; the returned path
    is a shortest one provided ``heuristic`` never overestimates.

    Args:
        grid: Sequence of rows. A cell whose value is ``0`` is walkable;
            any other value is treated as blocked.
        start: ``(row, col)`` of the first cell. It is not itself checked
            for being in bounds or walkable.
        goal: ``(row, col)`` of the cell to reach.

    Returns:
        A list of ``(row, col)`` cells from ``start`` to ``goal`` inclusive
        (``[start]`` when both are equal), or ``None`` if ``goal`` cannot be
        reached.
    """
    open_list = [(0, start)]  # Min-heap of (f = g + h, node)
    g_values = {start: 0}  # Cheapest known cost from start to each node
    came_from = {}  # node -> predecessor on the cheapest known path

    def get_neighbors(node):
        """Return the in-bounds, walkable cells adjacent to ``node``."""
        x, y = node
        # Same probing order as the move list (0,1), (0,-1), (1,0), (-1,0),
        # so ties between equally good paths resolve the same way.
        candidates = ((x, y + 1), (x, y - 1), (x + 1, y), (x - 1, y))
        return [
            (nx, ny)
            for nx, ny in candidates
            # Bound the column by the candidate's own row so a ragged grid
            # cannot be indexed out of range.
            if 0 <= nx < len(grid)
            and 0 <= ny < len(grid[nx])
            and grid[nx][ny] == 0
        ]

    while open_list:
        _, current = heapq.heappop(open_list)

        if current == goal:
            # Walk the predecessor links back to the start, then flip once.
            # Appending and reversing is linear, unlike repeated insert(0).
            path = [current]
            while current in came_from:
                current = came_from[current]
                path.append(current)
            path.reverse()
            return path

        for neighbor in get_neighbors(current):
            tentative_g = g_values[current] + 1  # Uniform step cost

            if tentative_g < g_values.get(neighbor, float('inf')):
                came_from[neighbor] = current
                g_values[neighbor] = tentative_g
                f = tentative_g + heuristic(neighbor, goal)
                heapq.heappush(open_list, (f, neighbor))

    return None  # Open list exhausted without reaching the goal

def heuristic(current, goal):
    """Return the Manhattan distance between two ``(row, col)`` cells."""
    row_gap = current[0] - goal[0]
    col_gap = current[1] - goal[1]
    return abs(row_gap) + abs(col_gap)

def cbs(robot_tasks, grid):
    """Best-first search for a task assignment without conflicts.

    States are popped in order of increasing cost. A state is accepted as
    soon as ``find_conflicts`` reports nothing for it; otherwise every
    reported conflict is handed to ``resolve_conflict`` and the non-empty
    child states it returns are queued, each at most once.

    Args:
        robot_tasks: Initial state, one task entry per robot. It is queued
            with cost 0.
        grid: Grid forwarded unchanged to ``resolve_conflict``.

    Returns:
        A dict mapping each position in the accepted state to the entry at
        that position, or ``None`` when the queue runs empty without finding
        a conflict-free state.
    """
    # Heap entries are (cost, insertion order, tasks). The counter breaks
    # cost ties so the task containers themselves are never compared.
    pushed = 0
    open_list = [(0, pushed, robot_tasks)]
    closed_list = set()  # Child states that have already been queued

    while open_list:
        _, _, current_tasks = heapq.heappop(open_list)

        conflicts = find_conflicts(current_tasks)

        if not conflicts:
            # Return the accepted state itself; the previous code returned a
            # dict that was never filled in, so success looked like failure.
            return dict(enumerate(current_tasks))

        for conflict in conflicts:
            child_tasks1, child_tasks2 = resolve_conflict(conflict, current_tasks, grid)

            for child_tasks in (child_tasks1, child_tasks2):
                if not child_tasks:
                    continue
                key = tuple(child_tasks)
                if key in closed_list:
                    # Already queued once; skip before paying for the cost.
                    continue
                cost = calculate_cost(child_tasks)
                closed_list.add(key)
                pushed += 1
                heapq.heappush(open_list, (cost, pushed, child_tasks))

    return None  # No valid schedule found

# Helper functions used by cbs: find_conflicts, resolve_conflict, calculate_cost

def find_conflicts(robot_tasks):
    """Return the index pairs of tasks that clash with each other.

    Two tasks clash when their entries at index 2 are equal or their entries
    at index 3 are equal (delivery and end cell for tasks written as
    ``(start, pickup, delivery, end)``).

    Args:
        robot_tasks: Iterable of indexable tasks with at least four entries.

    Returns:
        A list of ``(i, j)`` tuples with ``i < j``, in ascending order. Each
        clashing pair appears exactly once rather than in both orders.
    """
    tasks = list(robot_tasks)
    conflicts = []
    for i, task1 in enumerate(tasks):
        # Only look at later tasks: this skips self-comparison and avoids
        # reporting the same pair a second time as (j, i).
        for j in range(i + 1, len(tasks)):
            task2 = tasks[j]
            if task1[2] == task2[2] or task1[3] == task2[3]:
                conflicts.append((i, j))
    return conflicts

def resolve_conflict(conflict, current_tasks, grid):
    """Build two modified tasks by exchanging entries between a task pair.

    Args:
        conflict: Pair ``(i, j)`` of positions in ``current_tasks``.
        current_tasks: Indexable collection of tasks; it is not modified.
        grid: Accepted for the caller's convenience; not used here.

    Returns:
        A tuple ``(first, second)`` of new lists: copies of the tasks at
        ``i`` and ``j`` with their entries at index 1 and index 2 exchanged
        (pickup and delivery for ``(start, pickup, delivery, end)`` tasks).
    """
    first_idx, second_idx = conflict
    first = list(current_tasks[first_idx])
    second = list(current_tasks[second_idx])

    # Exchange the pickup slot, then the delivery slot.
    for slot in (1, 2):
        first[slot], second[slot] = second[slot], first[slot]

    return first, second

def calculate_cost(robot_tasks):
    """Return the total number of moves needed to carry out all tasks.

    Each task is read as ``(start, pickup, delivery, end)`` and routed as
    three consecutive legs with ``a_star`` on the module-level ``grid``.

    Args:
        robot_tasks: Iterable of tasks, each indexable at positions 0-3.

    Returns:
        The summed move count over every leg of every task, ``0`` for an
        empty iterable, or ``float('inf')`` if any leg has no path.
    """
    cost = 0
    for task in robot_tasks:
        legs = (
            (task[0], task[1]),  # start -> pickup
            (task[1], task[2]),  # pickup -> delivery
            (task[2], task[3]),  # delivery -> end
        )
        for origin, destination in legs:
            path = a_star(grid, origin, destination)
            if path is None:
                # An unreachable waypoint makes the whole schedule infeasible.
                return float('inf')
            # A path lists both endpoints, so it has len - 1 moves. Counting
            # per leg avoids double-counting the cells where two legs meet.
            cost += len(path) - 1
    return cost

# 3x3 grid in which every cell holds 0
grid = [[0] * 3 for _ in range(3)]

# One task per robot, written as (start, pickup, delivery, end)
robot1_tasks = ((0, 0), (0, 1), (2, 2), (2, 0))
robot2_tasks = ((0, 2), (0, 0), (2, 0), (2, 2))

# Each robot receives a schedule holding its single task
robot1_schedule = [robot1_tasks]
robot2_schedule = [robot2_tasks]

# The combined schedule is the two per-robot lists joined end to end.
# NOTE: cbs() is not invoked here.
schedule = [*robot1_schedule, *robot2_schedule]

if not schedule:
    print("No valid schedule found.")
else:
    print("Valid schedule found:", schedule)

Valid schedule found: [((0, 0), (0, 1), (2, 2), (2, 0)), ((0, 2), (0, 0), (2, 0), (2, 2))]
