# Multi-Agent Reinforcement Learning: Coordinated Package Delivery

**Author**: Esteban Lopez (ID: 34377360)  
**Course**: FIT5226 Multi-Agent Systems  
**Institution**: Monash University  

## 🎯 Project Overview

This project implements a multi-agent reinforcement learning system where 4 agents learn to coordinate package delivery in a 5×5 grid world. The challenge involves agents picking up packages from location A and delivering them to location B while avoiding collisions, all within a 1.5M step training limit.

**Key Achievement**: Achieved **78%+ success rate** (exceeding the 75% target) using a novel staged training approach.

## 🎯 Problem Statement

Train 4 agents in a 5×5 grid world to learn a coordinated transport task:

### 📊 **Performance Constraints**
- **Step Budget**: 1,500,000 maximum agent steps during training
- **Collision Budget**: 4,000 maximum head-on collisions during training  
- **Walltime Budget**: 10 minutes maximum runtime for training
- **Final Performance**: 75% success rate across all scenarios
- **Efficiency**: Each agent must complete delivery in ≤25 steps
- **Safety**: All deliveries must be collision-free

### 🎯 **Challenge Requirements**
- **Objective**: Pick up packages from location A and deliver to location B
- **Constraint**: Avoid collisions between agents
- **Evaluation**: Test across all possible A-B-agent combinations

## 🚀 Key Innovation

**Staged Training Approach**: Instead of training all 4 agents simultaneously, we train a single agent across diverse scenarios, then deploy the learned policy to multiple agents. This approach achieves emergent collision avoidance through natural path differentiation.

## 📋 Notebook Structure

This notebook is organized into the following sections:

1. **🎯 Problem Statement & Innovation** - Challenge definition and novel approach
2. **📚 Methodology** - Staged training strategy explanation
3. **🔧 Implementation** - Core classes and algorithms
4. **🏋️ Training Process** - Q-learning implementation and optimization
5. **📊 Evaluation** - Comprehensive testing and performance analysis
6. **📈 Results & Discussion** - Performance metrics and insights
7. **🔍 Limitations & Future Work** - Honest assessment and improvements

## 🎯 Quick Start

To run this notebook:
1. Install dependencies: `pip install -r requirements.txt`
2. Run all cells to see the complete training and evaluation process
3. Modify hyperparameters in the training section as needed

## Methodology
This challenge is solved with a staged training approach.

After trying several strategies to reach the target performance, the best method found was to train a single agent across as many scenarios as possible (A and B in different locations), resetting the scenario every 100 steps and shaping the agent's behaviour with rewards (see the Rewards section). The whole training therefore consists of a single agent spending the 1,500,000 steps learning the best possible path from A to B and from B to A. The interesting result is that, in most scenarios, the agent takes a different path from A to B than from B to A, which keeps collisions from occurring when the 4 agents are placed. This key point is analysed further on.
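To give a sense of scale for "as many scenarios as possible", the sketch below counts the distinct A/B placements on the 5×5 grid and the number of resets that fit in the step budget. It assumes A and B are drawn uniformly from distinct cells; the names `GRID_SIZE`, `TOTAL_STEPS` and `RESET_EVERY` are illustrative and not part of the notebook.

```python
from itertools import permutations

GRID_SIZE = 5            # 5x5 grid, as in the problem statement
TOTAL_STEPS = 1_500_000  # training step budget
RESET_EVERY = 100        # steps between scenario resets

# Every cell of the grid as a (row, col) tuple.
cells = [(r, c) for r in range(GRID_SIZE) for c in range(GRID_SIZE)]

# Ordered (A, B) pairs with A != B: 25 * 24 distinct placements.
scenarios = list(permutations(cells, 2))

# Resets that fit in the budget, and the average number of times each
# placement would be drawn under the uniform-sampling assumption.
num_resets = TOTAL_STEPS // RESET_EVERY
avg_visits = num_resets / len(scenarios)

print(len(scenarios), num_resets, avg_visits)  # 600 15000 25.0
```

Under these assumptions each A/B placement is seen about 25 times on average, for 100 steps each.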

## Import Libraries

In [None]:
import numpy as np
import random
import matplotlib.pyplot as plt
import matplotlib.patches as patches
import time
import itertools

## Rewards

In [None]:
REWARDS = {
    'base_step_cost': -0.5,    # Cost for each step taken
    'closer_reward': 0.5,      # Reward for moving closer to target
    'away_penalty': -0.5,      # Penalty for moving away from target
    'collision_penalty': 0,    # Penalty for collisions
    'pickup_reward': 30,       # Reward for picking up package
    'delivery_reward': 30      # Reward for delivering package
}

The rewards defined above are largely self-explanatory. The one point worth noting is that the collision penalty is 0. Because training uses a single agent, no collision can occur, so this penalty could take any value without affecting training. It is defined only in case another stage with more than one agent is added to the training later, at which point the value would need to be adjusted.
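As a rough illustration of how these values combine on a single in-bounds move, here is a minimal standalone sketch. The helper `shaped_step_reward` is hypothetical and only mirrors the arithmetic of the environment's move logic; it is not the notebook's implementation.

```python
REWARDS = {
    'base_step_cost': -0.5,
    'closer_reward': 0.5,
    'away_penalty': -0.5,
    'collision_penalty': 0,
    'pickup_reward': 30,
    'delivery_reward': 30,
}

def shaped_step_reward(current_dist, new_dist, picked_up=False,
                       delivered=False, rewards=REWARDS):
    """Hypothetical helper: reward for one valid move, given the Manhattan
    distance to the current goal before and after the move."""
    reward = rewards['base_step_cost']
    if new_dist < current_dist:
        reward += rewards['closer_reward']
    elif new_dist > current_dist:
        reward += rewards['away_penalty']
    if picked_up:
        reward += rewards['pickup_reward']
    if delivered:
        reward += rewards['delivery_reward']
    return reward

print(shaped_step_reward(3, 2))                  # 0.0  (step towards the goal)
print(shaped_step_reward(2, 3))                  # -1.0 (step away from the goal)
print(shaped_step_reward(1, 0, delivered=True))  # 30.0 (final step onto B)
```

With these values a step towards the goal is free, a step away costs 1, and reaching A or B nets 30.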

## Grid World Classes Definition

### Agent

In [None]:
class Agent:
    """
    Represents an individual agent in the multi-agent environment.
    
    Each agent has a position in the grid, can carry a package, and can move in four directions.
    The agent's state includes its position, package status, and relative positions to package
    pickup (A) and delivery (B) points.
    """
    def __init__(self, position, agent_id):
        """
        Initialize an agent with a position in the grid.
        
        Args:
            position (tuple): Initial (x, y) position of the agent
            agent_id (int): Unique identifier for the agent
        """
        self.position = position
        self.has_package = False
        self.agent_id = agent_id
    
    def get_state(self, package_position, target_position):
        """
        Return the agent's state as a numpy array for Q-learning.
        State includes:
        - agent_x, agent_y (2 values)
        - package_x, package_y (position A) (2 values)
        - target_x, target_y (position B) (2 values)
        - has_package (0/1) (1 value)
        Total state size: 7 values
        
        Args:
            package_position (tuple): (x,y) position of package pickup point A
            target_position (tuple): (x,y) position of delivery point B
            
        Returns:
            tuple: (x, y, px, py, tx, ty, has_package)
        """
        x, y = self.position
        px, py = package_position
        tx, ty = target_position
        has_package = int(self.has_package)
        return (x, y, px, py, tx, ty, has_package)

    def move(self, action, grid_size):
        """
        Move the agent according to the given action.
        Movement is constrained to grid boundaries.
        
        Args:
            action (str): One of "up", "down", "left", "right"
            grid_size (int): Size of the grid
        """
        x, y = self.position

        # "up" means decreasing row (x), "down" means increasing row (x)
        # "left" means decreasing column (y), "right" means increasing column (y)
        if action == "up" and x > 0:
            self.position = (x - 1, y)
        elif action == "down" and x < grid_size - 1:
            self.position = (x + 1, y)
        elif action == "left" and y > 0:
            self.position = (x, y - 1)
        elif action == "right" and y < grid_size - 1:
            self.position = (x, y + 1)


### Environment

In [None]:
class MultiAgentEnvironment:
    """
    A grid-based environment for multiple agents to learn delivery tasks.
    
    The environment consists of a grid where agents must pick up packages from location A
    and deliver them to location B. Agents can move in four directions, and the environment
    handles collisions, package pickups, deliveries, and rewards. The environment maintains
    the state of all agents and the grid, and provides methods for agents to interact with
    the environment.
    """
    def __init__(self, grid_size=5, num_agents=4, package_position=None, target_position=None, rewards=REWARDS):
        """
        Initialize a multi-agent environment with a grid and agents.
        
        Args:
            grid_size (int): Size of the square grid
            num_agents (int): Number of agents in the environment
            package_position (tuple): (x,y) position of package pickup point A
            target_position (tuple): (x,y) position of delivery point B
            rewards (dict): Dictionary of reward values for different actions
        """
        self.grid_size = grid_size
        self.num_agents = num_agents
        self.grid = np.zeros((grid_size, grid_size), dtype=object)
        
        # Store rewards configuration
        self.rewards = rewards
        
        # Randomly place A and B unless both are specified
        if package_position is None or target_position is None:
            positions = self.get_random_positions(2)
            self.package_position = positions[0]  # A location
            self.target_position = positions[1]   # B location
        else:
            self.package_position = package_position
            self.target_position = target_position
        
        # Initialize agents
        self.agents = []
        for i in range(num_agents):
            start_position = random.choice([self.package_position, self.target_position])
            
            agent = Agent(start_position, i)
            # If agent starts at A, it should have the package
            agent.has_package = (start_position == self.package_position)
            self.agents.append(agent)
            
        self.update_grid()
        self.current_agent_idx = 0

    def get_random_positions(self, n, exclude=None):
        """
        Get n random positions in the grid, excluding specified positions.
        
        Args:
            n (int): Number of positions to generate
            exclude (list): List of positions to exclude
            
        Returns:
            list: List of n random (x,y) positions
        """
        if exclude is None:
            exclude = []
        positions = []
        while len(positions) < n:
            pos = (random.randint(0, self.grid_size-1), random.randint(0, self.grid_size-1))
            if pos not in exclude and pos not in positions:
                positions.append(pos)
        return positions
    
    def get_opposite_agents(self, agent):
        """
        Get information about agents with opposite package status in neighboring cells.
        Used for collision detection.
        
        Args:
            agent (Agent): The agent to check neighbors for
            
        Returns:
            list: Boolean list indicating presence of opposite agents in 8 neighboring cells
        """
        x, y = agent.position
        neighbors = []
        for dx, dy in [(-1,-1), (-1,0), (-1,1), (0,-1), (0,1), (1,-1), (1,0), (1,1)]:
            new_x, new_y = x + dx, y + dy
            if 0 <= new_x < self.grid_size and 0 <= new_y < self.grid_size:
                has_opposite = False
                for other_agent in self.agents:
                    if (other_agent.position == (new_x, new_y) and 
                        other_agent.has_package != agent.has_package):
                        has_opposite = True
                        break
                neighbors.append(has_opposite)
            else:
                neighbors.append(False)
        return neighbors

    def is_head_on_collision(self, agent, new_position):
        """
        Check if moving to new_position would cause a head-on collision.
        A head-on collision occurs when two agents with opposite package status
        try to occupy the same cell (except at A or B locations).
        
        Args:
            agent (Agent): The agent attempting to move
            new_position (tuple): The position to check for collision
            
        Returns:
            bool: True if collision would occur, False otherwise
        """
        # Don't check for collisions at A or B locations
        if new_position == self.package_position or new_position == self.target_position:
            return False
        # Check every other agent in the cell, not just the first one found
        for other_agent in self.agents:
            if (other_agent is not agent
                    and other_agent.position == new_position
                    and other_agent.has_package != agent.has_package):
                return True
        return False

    def move_agent(self, agent, action):
        """
        Move an agent and handle package pickup/delivery.
        
        Args:
            agent (Agent): The agent to move
            action (str): The action to take ("up", "down", "left", "right")
            
        Returns:
            tuple: (reward, info) where info contains collision and delivery status
        """
        x, y = agent.position
        possible_moves = {
            "up": (-1, 0),
            "down": (1, 0),
            "left": (0, -1),
            "right": (0, 1)
        }
        dx, dy = possible_moves[action]
        new_x, new_y = x + dx, y + dy
        new_pos = (new_x, new_y)

        # Initialize reward and info
        reward = self.rewards['base_step_cost']  # Base step cost
        info = {"valid": True, "collision": False, "delivery": False}

        # Wall collision - still prevent these as they're out of bounds
        if not (0 <= new_x < self.grid_size and 0 <= new_y < self.grid_size):
            return -1, {"valid": False, "collision": False, "delivery": False}

        # Calculate Manhattan distance to target
        target_x, target_y = self.target_position if agent.has_package else self.package_position
        current_dist = abs(x - target_x) + abs(y - target_y)
        new_dist = abs(new_x - target_x) + abs(new_y - target_y)

        # Heuristic 1: Reward moving closer to target
        if new_dist < current_dist:
            reward += self.rewards['closer_reward']  # Small reward for moving closer
        elif new_dist > current_dist:
            reward += self.rewards['away_penalty']  # Small penalty for moving away

        # Head-on collision - allow but penalize if needed
        collision = self.is_head_on_collision(agent, new_pos)
        if collision:
            reward += self.rewards['collision_penalty']  # Currently 0, can be adjusted later
            info["collision"] = True

        # Move agent
        agent.position = new_pos

        # Handle package pickup at A
        if agent.position == self.package_position and not agent.has_package:
            agent.has_package = True
            reward += self.rewards['pickup_reward']  # Reward for pickup

        # Handle package delivery at B
        if agent.position == self.target_position and agent.has_package:
            agent.has_package = False
            reward += self.rewards['delivery_reward']  # Reward for successful delivery
            info["delivery"] = True

        self.update_grid()
        return reward, info
    
    def step(self, action):
        """
        Execute one step in the environment.
        The current agent takes an action, and the environment updates.
        
        Args:
            action (str): The action for the current agent to take
            
        Returns:
            tuple: (observation, reward, info)
        """
        current_agent = self.agents[self.current_agent_idx]
        reward, info = self.move_agent(current_agent, action)
        self.current_agent_idx = (self.current_agent_idx + 1) % self.num_agents
        next_agent = self.agents[self.current_agent_idx]
        observation = {
            'state': next_agent.get_state(self.package_position, self.target_position)
        }
        return observation, reward, info

    def update_grid(self):
        """
        Update the grid representation with current agent positions and package status.
        """
        self.grid = np.zeros((self.grid_size, self.grid_size), dtype=object)
        self.grid[self.package_position] = "A"
        self.grid[self.target_position] = "B"
        for agent in self.agents:
            x, y = agent.position
            if agent.has_package:
                self.grid[x, y] = f"AG{agent.agent_id}(A)"
            else:
                self.grid[x, y] = f"AG{agent.agent_id}"

    def reset(self):
        """
        Reset the environment to a new random state.
        Randomly places A and B, and resets agent positions.
        """
        # Randomly place A and B
        positions = self.get_random_positions(2)
        self.package_position = positions[0]
        self.target_position = positions[1]
        
        # Reset agents
        for i, agent in enumerate(self.agents):
            start_position = random.choice([self.package_position, self.target_position])
            
            agent.position = start_position
            agent.has_package = (start_position == self.package_position)
        
        self.update_grid()
        self.current_agent_idx = 0



Note that `get_opposite_agents` is defined but never used. Training uses only one agent, so there is no need for this check. The method is kept in the class in case the training is later modified and needs it.

## Q Learning Training Process

An important point here is how the exploration/exploitation balance (the epsilon start, end and decay values) and the remaining hyperparameters were chosen. The initial idea was to be conservative: start with a low epsilon (0.3) and reach the minimum value (0.1) at approximately 1,000,000 steps (about 66% of the training). Since E_min = E_start * E_decay^N, with N the number of steps, solving for E_decay gives 0.9999989. The learning rate was initially set to 0.1, a common setting in Q-learning that lets the agent learn without changing the Q-values too aggressively or too slowly. The discount factor was initially set to 0.99 because the agents need to learn a path, so it is important for them to prioritize future rewards.

After some iterations starting from these initial values, the only one that changed was E_decay, now 0.999999, which gave the best performance.
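The decay calculation can be reproduced with a short sketch based on the relation E_min = E_start * E_decay^N. The helper names `decay_for_target` and `steps_to_reach_min` are illustrative, not part of the notebook.

```python
import math

def decay_for_target(eps_start, eps_min, n_steps):
    """Per-step decay factor so that epsilon reaches eps_min after n_steps."""
    return (eps_min / eps_start) ** (1.0 / n_steps)

def steps_to_reach_min(eps_start, eps_min, decay):
    """Number of multiplicative decay steps until epsilon hits eps_min."""
    return math.ceil(math.log(eps_min / eps_start) / math.log(decay))

# Initial plan: go from 0.3 to 0.1 in 1,000,000 steps.
initial_decay = decay_for_target(0.3, 0.1, 1_000_000)
print(f"{initial_decay:.7f}")  # 0.9999989

# Final setting: how long does decay = 0.999999 take to reach the minimum?
final_steps = steps_to_reach_min(0.3, 0.1, 0.999999)
print(final_steps, f"{final_steps / 1_500_000:.0%} of the step budget")
```

With the final decay of 0.999999, epsilon reaches its floor after roughly 1.1 million steps, slightly later than in the initial plan.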

### Q Learning class

In [None]:
class MultiAgentQLearning:
    """
    Implements Q-learning for multiple agents in the delivery environment.
    
    This class manages the Q-table and implements the Q-learning algorithm for training
    agents. It handles state representation, action selection using epsilon-greedy policy,
    and Q-value updates. The Q-table is shared among all agents, allowing them to learn
    from each other's experiences.
    """
    def __init__(self, grid_size, num_agents, 
                 learning_rate=0.1,
                 discount_factor=0.99,
                 epsilon_start=0.3,
                 epsilon_min=0.1,
                 epsilon_decay=0.999999):
        """
        Initialize Q-learning for multiple agents.
        
        Args:
            grid_size (int): Size of the grid
            num_agents (int): Number of agents
            learning_rate (float): Alpha parameter for Q-learning
            discount_factor (float): Gamma parameter for Q-learning
            epsilon_start (float): Initial exploration rate
            epsilon_min (float): Minimum exploration rate
            epsilon_decay (float): Rate at which epsilon decays
        """
        self.alpha = learning_rate
        self.gamma = discount_factor
        self.epsilon_start = epsilon_start
        self.epsilon_min = epsilon_min
        self.epsilon_decay = epsilon_decay
        self.epsilon = epsilon_start
        
        self.grid_size = grid_size
        self.num_agents = num_agents
        self.actions = ["up", "down", "left", "right"]
        self.action_indices = {a: i for i, a in enumerate(self.actions)}
        
        # Single shared Q-table: (agent_x, agent_y, package_x, package_y, target_x, target_y, has_package, action)
        self.q_table = np.zeros((grid_size, grid_size, grid_size, grid_size, grid_size, grid_size, 2, 4))

    def decay_epsilon(self):
        """
        Decay epsilon and ensure it doesn't go below minimum.
        """
        self.epsilon = max(self.epsilon_min, self.epsilon * self.epsilon_decay)

    def get_action(self, agent, state):
        """
        Get the next action for an agent using epsilon-greedy policy.
        
        Args:
            agent (Agent): The agent to get action for
            state (tuple): Current state of the agent
            
        Returns:
            str: The selected action
        """
        if random.random() < self.epsilon:
            return random.choice(self.actions)
        x, y, px, py, tx, ty, has_package = state
        has_package = int(has_package)
        q_values = self.q_table[x, y, px, py, tx, ty, has_package, :]
        best_action_idx = np.argmax(q_values)
        return self.actions[best_action_idx]

    def update(self, agent, state, action, reward, next_state):
        """
        Update Q-values using Q-learning update rule.
        
        Args:
            agent (Agent): The agent being updated
            state (tuple): Current state
            action (str): Action taken
            reward (float): Reward received
            next_state (tuple): Next state
        """
        action_indices = self.action_indices
        x, y, px, py, tx, ty, has_package = state
        has_package = int(has_package)
        nx, ny, npx, npy, ntx, nty, nhas_package = next_state
        nhas_package = int(nhas_package)
        a_idx = action_indices[action]
        current_q = self.q_table[x, y, px, py, tx, ty, has_package, a_idx]
        max_next_q = np.max(self.q_table[nx, ny, npx, npy, ntx, nty, nhas_package, :])
        new_q = current_q + self.alpha * (reward + self.gamma * max_next_q - current_q)
        self.q_table[x, y, px, py, tx, ty, has_package, a_idx] = new_q
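
To make the update rule concrete, the following standalone sketch applies it twice to the same transition, using a plain dictionary instead of the NumPy table. The function `q_update`, the example states and the reward of 30 (base step cost, plus the closer reward, plus the pickup reward) are illustrative assumptions.

```python
from collections import defaultdict

ALPHA = 0.1   # learning rate used in the notebook
GAMMA = 0.99  # discount factor used in the notebook
ACTIONS = ["up", "down", "left", "right"]

def q_update(q_table, state, action, reward, next_state,
             alpha=ALPHA, gamma=GAMMA):
    """One tabular Q-learning update; returns the new Q-value."""
    current_q = q_table[state][action]
    max_next_q = max(q_table[next_state].values())
    new_q = current_q + alpha * (reward + gamma * max_next_q - current_q)
    q_table[state][action] = new_q
    return new_q

# Unseen states start with all-zero action values, like np.zeros.
q_table = defaultdict(lambda: {a: 0.0 for a in ACTIONS})

# State layout: (x, y, px, py, tx, ty, has_package).
state = (0, 1, 0, 0, 4, 4, 0)       # next to A, not carrying a package
next_state = (0, 0, 0, 0, 4, 4, 1)  # on A after moving left, now carrying

first = q_update(q_table, state, "left", 30.0, next_state)
second = q_update(q_table, state, "left", 30.0, next_state)
print(round(first, 4), round(second, 4))  # 3.0 5.7

# Size of the full table for a 5x5 grid: 5**6 states * 2 * 4 actions.
print(5**6 * 2 * 4)  # 125000
```

Each visit moves the estimate 10% of the way towards the target value, which is why repeated exposure to every scenario matters.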


### Train single Agent loop

This function returns the Q-learning model (which holds the Q-table) along with the history of rewards, deliveries, collisions and epsilon, recorded at every progress interval (10,000 steps here). Collisions are always 0 in this training, but the history is kept in case the training is modified and improved later on. As mentioned in the Methodology section, the environment is reset every 100 steps.

In [None]:
def train_single_agent(env, q_learning, steps):
    """
    Train a single agent without collision tracking.
    
    Args:
        env (MultiAgentEnvironment): The environment to train in
        q_learning (MultiAgentQLearning): The Q-learning model
        steps (int): Number of training steps
        
    Returns:
        dict: Training metrics and results
    """
    total_steps = 0
    deliveries = 0
    rewards = 0
    
    rewards_history = [0]
    deliveries_history = [0]
    collisions_history = [0]
    epsilon_history = [q_learning.epsilon]
    progress_interval = 10000
    
    while total_steps < steps:
        if total_steps % 100 == 0:
            env.reset()
            
        current_agent = env.agents[env.current_agent_idx]
        state = current_agent.get_state(env.package_position, env.target_position)
        action = q_learning.get_action(current_agent, state)
        next_observation, reward, info = env.step(action)
        q_learning.update(current_agent, state, action, reward, next_observation['state'])
        
        q_learning.decay_epsilon()
        rewards += reward
        if info["delivery"]:
            deliveries += 1
        total_steps += 1
        
        if total_steps % progress_interval == 0:
            print(f"Steps: {total_steps}, Deliveries: {deliveries}")
            print(f"Current epsilon: {q_learning.epsilon:.4f}")
            rewards_history.append(rewards)
            deliveries_history.append(deliveries)
            collisions_history.append(0)  # Single agent has no collisions
            epsilon_history.append(q_learning.epsilon)
    
    return {
        'rewards': rewards_history,
        'deliveries': deliveries_history,
        'collisions': collisions_history,
        'epsilon_history': epsilon_history,
        'q_learning': q_learning
    }

### Training

Here we create an environment and a Q-learning instance, and pass both to the single-agent training loop.

In [None]:
def train_agent(
    steps=1_500_000,
    learning_rate=0.1,
    discount_factor=0.99,
    epsilon_start=0.3,
    epsilon_min=0.1,
    epsilon_decay=0.999999,
    rewards=REWARDS
):
    """
    Train a single agent with Q-learning.
    
    Args:
        steps (int): Number of training steps
        learning_rate (float): Alpha parameter for Q-learning
        discount_factor (float): Gamma parameter for Q-learning
        epsilon_start (float): Initial exploration rate
        epsilon_min (float): Minimum exploration rate
        epsilon_decay (float): Rate at which epsilon decays
        rewards (dict): Dictionary of reward values
        
    Returns:
        dict: Training results and environment
    """
    print("Starting training...")
    # Create environment and Q-learning agent
    env = MultiAgentEnvironment(
        grid_size=5, 
        num_agents=1,
        rewards=rewards
    )
    q_learning = MultiAgentQLearning(
        grid_size=5,
        num_agents=1,
        learning_rate=learning_rate,
        discount_factor=discount_factor,
        epsilon_start=epsilon_start,
        epsilon_min=epsilon_min,
        epsilon_decay=epsilon_decay
    )
    
    # Train single agent
    results = train_single_agent(env, q_learning, steps)
    
    return {
        'results': results,
        'q_learning': q_learning,
        'env': env
    }

### Training implementation

In [None]:
results = train_agent(
        steps=1_500_000,
        learning_rate=0.1,
        discount_factor=0.99,
        epsilon_start=0.3,
        epsilon_min=0.1,
        epsilon_decay=0.999999,
        rewards=REWARDS
    )

q_learning = results['q_learning']

### Training metrics plotting

Using the return value of `train_single_agent`, we plot the rewards, deliveries and collisions over time to see how the trained agent improves on these metrics.

For each metric we show the cumulative value over time and the average value per step within each progress interval.
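The per-step values are obtained by differencing the cumulative history and dividing by the interval length. The sketch below does the same arithmetic in plain Python so that it runs standalone; the helper `per_step_rates` and the delivery counts are made up for illustration.

```python
def per_step_rates(cumulative, progress_interval):
    """Average per-step value in each interval, from a cumulative history.

    A leading 0 is prepended so the output has the same length as the input.
    """
    padded = [0] + list(cumulative)
    return [(b - a) / progress_interval for a, b in zip(padded, padded[1:])]

# Hypothetical cumulative deliveries recorded every 10,000 steps.
deliveries = [0, 120, 300, 520]
rates = per_step_rates(deliveries, progress_interval=10_000)
print(rates)
```

A rising per-step curve means the agent is completing deliveries faster, which is easier to read than the slope of the cumulative curve.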

In [None]:
def plot_training_metrics(metrics, progress_interval=10000):
    """
    Plot the training metrics over time.

    Args:
        metrics (dict): Dictionary containing training metrics:
                        - 'rewards': List of cumulative rewards
                        - 'deliveries': List of cumulative deliveries
                        - 'collisions': List of cumulative collisions
        progress_interval (int): Number of steps between each metric record
    """
    fig, axs = plt.subplots(3, 2, figsize=(15, 12))  # 3 rows, 2 columns

    # Create x-axis values representing environment steps
    x = [i * progress_interval for i in range(len(metrics['rewards']))]

    # Left column: Cumulative metrics
    axs[0, 0].plot(x, metrics['rewards'])
    axs[0, 0].set_title('Cumulative Reward')
    axs[0, 0].set_xlabel('Environment Steps')
    axs[0, 0].set_ylabel('Reward')

    axs[1, 0].plot(x, metrics['deliveries'])
    axs[1, 0].set_title('Cumulative Deliveries')
    axs[1, 0].set_xlabel('Environment Steps')
    axs[1, 0].set_ylabel('Number of Deliveries')

    axs[2, 0].plot(x, metrics['collisions'])
    axs[2, 0].set_title('Cumulative Collisions')
    axs[2, 0].set_xlabel('Environment Steps')
    axs[2, 0].set_ylabel('Number of Collisions')

    # Right column: Rate metrics (per interval)
    reward_rates = np.diff([0] + metrics['rewards']) / progress_interval
    delivery_rates = np.diff([0] + metrics['deliveries']) / progress_interval
    collision_rates = np.diff([0] + metrics['collisions']) / progress_interval

    axs[0, 1].plot(x, reward_rates)
    axs[0, 1].set_title('Reward per step')
    axs[0, 1].set_xlabel('Environment Steps')
    axs[0, 1].set_ylabel('Avg Reward per step')

    axs[1, 1].plot(x, delivery_rates)
    axs[1, 1].set_title('Deliveries per step')
    axs[1, 1].set_xlabel('Environment Steps')
    axs[1, 1].set_ylabel('Deliveries per step')

    axs[2, 1].plot(x, collision_rates)
    axs[2, 1].set_title('Collisions per step')
    axs[2, 1].set_xlabel('Environment Steps')
    axs[2, 1].set_ylabel('Collisions per step')

    plt.tight_layout()
    plt.show()

### Training metrics plotting implementation

In [None]:
plot_training_metrics(results['results'])

## Visualization, testing and final performance evaluation

### Visualization

To inspect the behaviour of our agents, we visualize the grid. This lets us reproduce any scenario of interest, both to confirm that the agents act as expected and to debug specific cases. Our grid visualization always shows the number of deliveries and collisions for the scenario in progress.

In [None]:
class MultiAgentVisualizer:
    """
    Handles the visualization of the multi-agent environment.
    
    This class provides methods to render the current state of the environment,
    including the grid, agents, package pickup point (A), and delivery point (B).
    It uses different colors to distinguish between agents with and without packages,
    and provides options for displaying step-by-step visualization or saving images.
    """
    def __init__(self, env):
        """
        Initialize the visualizer with an environment.
        
        Args:
            env (MultiAgentEnvironment): The grid world environment to visualize
        """
        self.env = env
        self.colors = {
            "agent_to_b": "blue",      # Agents moving towards B
            "agent_to_a": "green",     # Agents moving towards A
            "package": "yellow",       # Location A
            "target": "red",          # Location B
        }
        
    def render(self, title=None, show=True, save_path=None):
        """
        Render the current state of the environment.
        
        Args:
            title (str, optional): Title for the plot
            show (bool): Whether to display the plot
            save_path (str, optional): Path to save the plot image
            
        Returns:
            matplotlib.figure.Figure: The generated figure
        """
        fig, ax = plt.subplots(figsize=(7, 7))
        
        # Set plot limits
        ax.set_xlim(-0.5, self.env.grid_size - 0.5)
        ax.set_ylim(-0.5, self.env.grid_size - 0.5)
        
        # Draw grid lines
        for i in range(self.env.grid_size + 1):
            ax.axhline(i - 0.5, color='black', linestyle='-', alpha=0.2)
            ax.axvline(i - 0.5, color='black', linestyle='-', alpha=0.2)
        
        # Draw package location (A)
        x, y = self.env.package_position
        package = patches.Rectangle((y - 0.4, x - 0.4), 0.8, 0.8, 
                                color=self.colors["package"], alpha=0.7)
        ax.add_patch(package)
        ax.text(y, x, "A", ha='center', va='center', fontsize=12)
        
        # Draw target location (B)
        x, y = self.env.target_position
        target = patches.Rectangle((y - 0.4, x - 0.4), 0.8, 0.8, 
                                color=self.colors["target"], alpha=0.7)
        ax.add_patch(target)
        ax.text(y, x, "B", ha='center', va='center', fontsize=12)
        
        # Draw agents
        for agent in self.env.agents:
            x, y = agent.position
            # Choose color based on package status
            color = self.colors["agent_to_b"] if agent.has_package else self.colors["agent_to_a"]
                
            agent_circle = patches.Circle((y, x), 0.3, color=color, alpha=0.8)
            ax.add_patch(agent_circle)
            label = f"{agent.agent_id}"
            ax.text(y, x, label, ha='center', va='center', fontsize=10, color='white')
        
        if title:
            ax.set_title(title)
        ax.set_xlabel("Column")
        ax.set_ylabel("Row")
        
        ax.set_xticks(range(self.env.grid_size))
        ax.set_yticks(range(self.env.grid_size))
        
        # Invert y-axis to match grid coordinates (0,0 at top-left)
        ax.invert_yaxis()
        
        plt.tight_layout()
        
        if save_path:
            plt.savefig(save_path)
        
        if show:
            plt.show()
        else:
            plt.close()
    
        return fig

### Testing

This function lets us test our agents while visualizing their behaviour. We can set the number of tests, the number of steps per test, and whether to render every step or only every `render_interval` steps.

In [None]:
def test_trained_agents(
    env, q_learning, num_tests=10, steps_per_test=200, render=True, render_delay=0.5, 
    show_all_steps=False, policy="trained", render_interval=5
):
    """
    Test the Q-learning or random agents in the multi-agent environment and collect performance metrics.

    Args:
        env (MultiAgentEnvironment): The environment to test in (should have 4 agents)
        q_learning (MultiAgentQLearning): The trained Q-learning model
        num_tests (int): Number of test scenarios to run
        steps_per_test (int): Maximum number of steps per test scenario
        render (bool): Whether to render the environment visually
        render_delay (float): Delay between renders in seconds
        show_all_steps (bool): If True, renders every step; if False, renders every render_interval steps
        policy (str): "trained" (Q-learning) or "random" (naive)
        render_interval (int): Number of steps between renders when show_all_steps is False

    Returns:
        dict: Test metrics including:
            - deliveries: List of deliveries completed in each test
            - collisions: List of collisions in each test
            - rewards: List of total rewards in each test
            - avg_deliveries: Average deliveries across all tests
            - avg_collisions: Average collisions across all tests
            - avg_reward: Average reward across all tests
    """
    visualizer = MultiAgentVisualizer(env)
    all_deliveries = []
    all_collisions = []
    all_rewards = []

    # For per-step plots of the first test
    first_test_deliveries = []
    first_test_collisions = []
    first_test_rewards = []

    for test_num in range(num_tests):
        print(f"\nTest {test_num + 1}/{num_tests}")
        test_deliveries = 0
        test_collisions = 0
        test_reward = 0
        per_step_deliveries = []
        per_step_collisions = []
        per_step_rewards = []

        # Initialize counters for each agent
        agent_deliveries = [0] * env.num_agents
        agent_collisions = [0] * env.num_agents
        agent_rewards = [0] * env.num_agents

        for step in range(steps_per_test):
            # Get the current agent and its state (store the index now, in case
            # env.step() advances env.current_agent_idx to the next agent)
            agent_idx = env.current_agent_idx
            current_agent = env.agents[agent_idx]
            state = current_agent.get_state(env.package_position, env.target_position)
            
            # Action selection
            if policy == "trained":
                action = q_learning.get_action(current_agent, state)
            elif policy == "random":
                action = random.choice(["up", "down", "left", "right"])
            else:
                raise ValueError("policy must be 'trained' or 'random'")
            
            # Execute the action in the environment
            next_observation, reward, info = env.step(action)
            
            # Update metrics for the agent that just moved
            agent_rewards[agent_idx] += reward
            if info["delivery"]:
                agent_deliveries[agent_idx] += 1
                test_deliveries += 1
            if info["collision"]:
                agent_collisions[agent_idx] += 1
                test_collisions += 1
            test_reward += reward
                
            # Record per-step metrics for plotting
            per_step_deliveries.append(test_deliveries)
            per_step_collisions.append(test_collisions)
            per_step_rewards.append(test_reward)
            
            # Render the environment based on settings
            if show_all_steps:
                visualizer.render(
                    title=f"Test {test_num + 1}, Step {step}\n"
                          f"Deliveries: {test_deliveries}, Collisions: {test_collisions}\n"
                          f"Agent {env.current_agent_idx} moving",
                    show=True
                )
            elif render and step % render_interval == 0:
                visualizer.render(
                    title=f"Test {test_num + 1}, Step {step}\n"
                          f"Deliveries: {test_deliveries}, Collisions: {test_collisions}\n"
                          f"Agent {env.current_agent_idx} moving",
                    show=True
                )
                plt.pause(render_delay)
                
        # Store results for this test
        all_deliveries.append(test_deliveries)
        all_collisions.append(test_collisions)
        all_rewards.append(test_reward)
        
        # Print per-agent results
        print(f"\nTest {test_num + 1} Results:")
        for agent_idx in range(env.num_agents):
            print(f"Agent {agent_idx}: Deliveries={agent_deliveries[agent_idx]}, "
                  f"Collisions={agent_collisions[agent_idx]}, "
                  f"Reward={agent_rewards[agent_idx]:.2f}")
        print(f"Total: Deliveries={test_deliveries}, Collisions={test_collisions}, Reward={test_reward:.2f}")

        # Save per-step metrics for the first test for detailed analysis
        if test_num == 0:
            first_test_deliveries = per_step_deliveries
            first_test_collisions = per_step_collisions
            first_test_rewards = per_step_rewards

        # Reset environment for next test
        env.reset()

    # Calculate aggregate statistics
    avg_deliveries = np.mean(all_deliveries)
    avg_collisions = np.mean(all_collisions)
    avg_reward = np.mean(all_rewards)
    
    # Print summary statistics
    print("\nOverall Test Results:")
    print(f"Average Deliveries per Test: {avg_deliveries:.2f}")
    print(f"Average Collisions per Test: {avg_collisions:.2f}")
    print(f"Average Reward per Test: {avg_reward:.2f}")

    # Plot test results across all tests
    fig, (ax1, ax2, ax3) = plt.subplots(3, 1, figsize=(10, 12))
    
    # Deliveries plot
    ax1.bar(range(num_tests), all_deliveries)
    ax1.set_title('Deliveries per Test (4 Agents)')
    ax1.set_xlabel('Test Number')
    ax1.set_ylabel('Number of Deliveries')
    ax1.axhline(y=avg_deliveries, color='r', linestyle='--', label='Average')
    ax1.legend()
    
    # Collisions plot
    ax2.bar(range(num_tests), all_collisions)
    ax2.set_title('Collisions per Test (4 Agents)')
    ax2.set_xlabel('Test Number')
    ax2.set_ylabel('Number of Collisions')
    ax2.axhline(y=avg_collisions, color='r', linestyle='--', label='Average')
    ax2.legend()
    
    # Rewards plot
    ax3.bar(range(num_tests), all_rewards)
    ax3.set_title('Total Reward per Test (4 Agents)')
    ax3.set_xlabel('Test Number')
    ax3.set_ylabel('Reward')
    ax3.axhline(y=avg_reward, color='r', linestyle='--', label='Average')
    ax3.legend()
    
    plt.tight_layout()
    plt.show()

    # Plot detailed per-step metrics for the first test
    if len(first_test_deliveries) > 0:
        steps = list(range(steps_per_test))
        fig, axs = plt.subplots(3, 2, figsize=(15, 12))
        
        # Left column: Cumulative metrics
        axs[0, 0].plot(steps, first_test_rewards)
        axs[0, 0].set_title('Cumulative Reward (First Test, 4 Agents)')
        axs[0, 0].set_xlabel('Step')
        axs[0, 0].set_ylabel('Total Reward')
        
        axs[1, 0].plot(steps, first_test_deliveries)
        axs[1, 0].set_title('Cumulative Deliveries (First Test, 4 Agents)')
        axs[1, 0].set_xlabel('Step')
        axs[1, 0].set_ylabel('Total Deliveries')
        
        axs[2, 0].plot(steps, first_test_collisions)
        axs[2, 0].set_title('Cumulative Collisions (First Test, 4 Agents)')
        axs[2, 0].set_xlabel('Step')
        axs[2, 0].set_ylabel('Total Collisions')
        
        # Right column: Per-step rates
        reward_rate = np.diff([0] + first_test_rewards)
        delivery_rate = np.diff([0] + first_test_deliveries)
        collision_rate = np.diff([0] + first_test_collisions)
        
        axs[0, 1].plot(steps, reward_rate)
        axs[0, 1].set_title('Reward per Step (First Test, 4 Agents)')
        axs[0, 1].set_xlabel('Step')
        axs[0, 1].set_ylabel('Step Reward')
        
        axs[1, 1].plot(steps, delivery_rate)
        axs[1, 1].set_title('Delivery per Step (First Test, 4 Agents)')
        axs[1, 1].set_xlabel('Step')
        axs[1, 1].set_ylabel('Step Deliveries')
        
        axs[2, 1].plot(steps, collision_rate)
        axs[2, 1].set_title('Collision per Step (First Test, 4 Agents)')
        axs[2, 1].set_xlabel('Step')
        axs[2, 1].set_ylabel('Step Collisions')
        
        plt.tight_layout()
        plt.show()

    return {
        'deliveries': all_deliveries,
        'collisions': all_collisions,
        'rewards': all_rewards,
        'avg_deliveries': avg_deliveries,
        'avg_collisions': avg_collisions,
        'avg_reward': avg_reward
    }


### Testing implementation

We will run the test cases listed below:

- 5 tests with 200 steps per test, rendering the grid every 5 steps (to avoid overloading the view; this can be changed if desired), for both random and trained agents, to see how differently they behave in different scenarios.

- 3 tests with 50 steps each, rendering every step for our trained agents, to see the complete behaviour of each agent at each step in different scenarios.

- A debug test that renders every step of a single test in which we define the A and B positions. The objective of this test is to debug and visualize the behaviour of our agents in specific scenarios we are interested in.

*Note: Visualizing all these scenarios can produce a lot of output, so if not everything needs to be visualized, the notebook user can interrupt the execution and "Clear all outputs" of the corresponding cell.*

#### Environment for testing

In [None]:
test_env = MultiAgentEnvironment(grid_size=5, num_agents=4, rewards=REWARDS)

#### Random agents - 5 random tests - visualization every 5 steps

In [None]:
random_results = test_trained_agents(test_env, q_learning, num_tests=5, steps_per_test=200, 
                                       render=True, render_delay=0.5, policy="random", render_interval=5)

#### Trained agents - 5 random tests - visualization every 5 steps

In [None]:
trained_results = test_trained_agents(test_env, q_learning, num_tests=5, steps_per_test=200, 
                                        render=True, render_delay=0.5, policy="trained", render_interval=5)

#### Trained agents - 3 random tests - visualization of all steps

In [None]:
for test_num in range(3):
    print(f"\nDetailed Test {test_num + 1}/3")
    detailed_env = MultiAgentEnvironment(grid_size=5, num_agents=4, rewards=REWARDS)
    detailed_results = test_trained_agents(
        detailed_env,
        q_learning,
        num_tests=1,
        steps_per_test=50,
        render=True,
        render_delay=0.5,
        show_all_steps=True,
        policy="trained",
        render_interval=1
    )

#### Specific Scenario test (Debug)

Change package_position and target_position when creating the environment below to the desired A and B positions.

In [None]:
specific_env = MultiAgentEnvironment(
    grid_size=5,
    num_agents=4,
    package_position=(4,4),
    target_position=(3,0),
    rewards=REWARDS
)

specific_results = test_trained_agents(
    specific_env,
    q_learning,
    num_tests=1,
    steps_per_test=200,
    render=True,
    render_delay=0.5,
    show_all_steps=True,
    policy="trained",
    render_interval=1
)

### Final Performance

Here we evaluate the final performance of a group of agents (random or trained) in a test that simulates all possible scenarios (all possible combinations of A and B, with each of the 4 agents starting at A or B), which gives us 9600 different scenarios (600 ordered A-B pairs × 16 start configurations). The target for our agents is to succeed in 75% of them.
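
As a quick sanity check on the 9600 figure, the scenario space can be enumerated directly. This is a minimal sketch that assumes, as in the evaluation loop further down, ordered pairs of distinct A and B cells and one A-or-B start per agent; `count_scenarios` is a hypothetical helper name and is not part of the notebook's code.

```python
from itertools import product

def count_scenarios(grid_size=5, num_agents=4):
    """Count (A, B, agent starts) combinations; hypothetical helper for illustration."""
    positions = [(x, y) for x in range(grid_size) for y in range(grid_size)]
    count = 0
    for a_pos in positions:
        for b_pos in positions:
            if a_pos == b_pos:
                continue  # A and B must be different cells
            # Each agent independently starts either at A or at B
            count += sum(1 for _ in product([a_pos, b_pos], repeat=num_agents))
    return count

print(count_scenarios())  # 25 * 24 * 2**4 = 9600
```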

Let's recall the definition of "succeed":

- The agents succeed in a scenario if all of them get to A (starting from B), pick up the package, and deliver it at B without any collisions, in no more than 25 steps per agent. Since some agents start at A, we first give them time to reach B (also 25 steps, since the assignment requirements do not specify this) and then start the 25-step timer for their "journey".

We will evaluate both random (naive) and trained agents, recording the number of successful scenarios, the number of scenarios that failed due to collisions, the number that failed due to timeouts, and the rewards obtained. We then compute the overall success rate, which is expected to be over 75%, and print it along with the other recorded metrics.

After this, we plot these metrics with the scenario number (from 1 to 9600) on the x axis.

Finally, we print a table with the columns metric, random agents, trained agents, and percentage improvement of trained over random, comparing success rate, total deliveries, scenarios with collisions, scenarios with timeouts, and total rewards.

In [None]:
def evaluate_multiagent_final_performance_qtable(
    env_class, q_learning, grid_size=5, num_agents=4, max_agent_steps=25, plot=True, policy="trained"
):
    """
    Evaluates the performance of a Q-table policy in a multi-agent environment.
    Records and plots metrics for comparison.

    Args:
        env_class: The environment class to instantiate
        q_learning: The trained MultiAgentQLearning object (None for random policy)
        grid_size: Size of the grid environment (default: 5x5)
        num_agents: Number of agents in the environment (default: 4)
        max_agent_steps: Maximum steps allowed after visiting position B (default: 25)
        plot: Whether to plot the results at the end
        policy: "trained" for Q-learning or "random" for random actions

    Returns:
        success_rates: List holding the overall scenario success rate, repeated once per agent
        metrics: Dict of aggregate metrics (success rate, deliveries, collisions, timeouts, rewards)
    """
    positions = [(x, y) for x in range(grid_size) for y in range(grid_size)]
    total_scenarios = 0
    successful_scenarios = 0  # Count scenarios where ALL agents succeed

    # Basic counters
    total_deliveries = 0
    total_collisions = 0 
    total_timeouts = 0
    total_rewards = 0

    # For plotting
    scenario_success = []  # 1 for success, 0 for failure
    scenario_deliveries = []  # Number of deliveries in each scenario
    scenario_collisions = []  # 1 if scenario had collision, 0 otherwise
    scenario_timeouts = []  # 1 if scenario had timeout, 0 otherwise
    scenario_rewards = []  # Total rewards in each scenario

    scenario_count = 0
    for a_pos in positions:
        for b_pos in positions:
            if a_pos == b_pos:
                continue
            for agent_starts in itertools.product([a_pos, b_pos], repeat=num_agents):
                env = env_class(grid_size=grid_size, num_agents=num_agents)
                # Manually set A and B
                env.package_position = a_pos
                env.target_position = b_pos
                # Manually set agent positions and has_package
                for i, agent in enumerate(env.agents):
                    agent.position = agent_starts[i]
                    agent.has_package = (agent_starts[i] == a_pos)

                pre_timer = [None if agent_starts[i] == b_pos else 0 for i in range(num_agents)]
                timer_started = [False if agent_starts[i] == a_pos else True for i in range(num_agents)]
                steps_since_b = [0] * num_agents
                delivered = [False] * num_agents
                collided = [False] * num_agents
                timed_out = [False] * num_agents
                scenario_reward = 0
                scenario_delivery_count = 0
                scenario_had_collision = False
                scenario_had_timeout = False

                while any(
                    (not delivered[i] and not collided[i] and not timed_out[i] and
                     ((pre_timer[i] is None and (not timer_started[i] or steps_since_b[i] < max_agent_steps)) or
                      (pre_timer[i] is not None and pre_timer[i] < max_agent_steps)))
                    for i in range(num_agents)
                ):
                    current_agent_idx = env.current_agent_idx
                    agent = env.agents[current_agent_idx]

                    # Action selection based on policy
                    if policy == "random":
                        action = random.choice(["up", "down", "left", "right"])
                    else:  # trained policy
                        state = agent.get_state(env.package_position, env.target_position)
                        x, y, px, py, tx, ty, has_package = state
                        has_package = int(has_package)
                        q_values = q_learning.q_table[x, y, px, py, tx, ty, has_package, :]
                        action_idx = np.argmax(q_values)
                        action_map = {0: "up", 1: "down", 2: "left", 3: "right"}
                        action = action_map[action_idx]

                    next_observation, reward, info = env.step(action)
                    scenario_reward += reward
                    total_rewards += reward

                    # Only update status if not already done
                    if not (delivered[current_agent_idx] or collided[current_agent_idx] or timed_out[current_agent_idx]):
                        if pre_timer[current_agent_idx] is not None:
                            if tuple(agent.position) == b_pos:
                                pre_timer[current_agent_idx] = None
                                timer_started[current_agent_idx] = True
                                steps_since_b[current_agent_idx] = 0
                            else:
                                pre_timer[current_agent_idx] += 1
                                if pre_timer[current_agent_idx] >= max_agent_steps:
                                    timed_out[current_agent_idx] = True
                                    scenario_had_timeout = True
                                    total_timeouts += 1
                        elif timer_started[current_agent_idx]:
                            steps_since_b[current_agent_idx] += 1
                            if steps_since_b[current_agent_idx] >= max_agent_steps:
                                timed_out[current_agent_idx] = True
                                scenario_had_timeout = True
                                total_timeouts += 1
                        elif tuple(agent.position) == b_pos:
                            timer_started[current_agent_idx] = True
                            steps_since_b[current_agent_idx] = 0

                        if info["collision"]:
                            collided[current_agent_idx] = True
                            scenario_had_collision = True
                        if info["delivery"]:
                            delivered[current_agent_idx] = True
                            total_deliveries += 1
                            scenario_delivery_count += 1

                # Count successful scenarios (all agents must succeed)
                is_successful = all(delivered) and not any(collided) and not any(timed_out)
                if is_successful:
                    successful_scenarios += 1
                    scenario_success.append(1)
                else:
                    scenario_success.append(0)

                scenario_deliveries.append(scenario_delivery_count)
                scenario_collisions.append(1 if scenario_had_collision else 0)
                scenario_timeouts.append(1 if scenario_had_timeout else 0)
                if scenario_had_collision:
                    total_collisions += 1
                scenario_rewards.append(scenario_reward)

                total_scenarios += 1
                scenario_count += 1
                if scenario_count % 100 == 0:
                    print(f"Tested {scenario_count} scenarios...")

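    # total_timeouts was incremented once per timed-out agent; recount it so the
    # value matches its "Scenarios with Timeouts" label (one per affected scenario)
    total_timeouts = sum(scenario_timeouts)

    # Calculate overall success rate (scenarios where ALL agents succeed)
    overall_success_rate = successful_scenarios / total_scenarios if total_scenarios > 0 else 0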
    
    print("\nFinal Evaluation Results:")
    print(f"Successful Scenarios: {successful_scenarios}/{total_scenarios}")
    print(f"Overall Success Rate: {overall_success_rate*100:.2f}% (target: 75%)")
    print(f"\nTotal Counts:")
    print(f"Total Deliveries: {total_deliveries}")
    print(f"Scenarios with Collisions: {total_collisions}")
    print(f"Scenarios with Timeouts: {total_timeouts}")
    print(f"Total Rewards: {total_rewards}")

    # Plot metrics if requested
    if plot:
        scenarios = np.arange(total_scenarios)
        fig, axs = plt.subplots(5, 1, figsize=(15, 25))
        
        # Success (1 or 0)
        axs[0].plot(scenarios, scenario_success)
        axs[0].set_title('Success per Scenario (1 = Success, 0 = Failure)')
        axs[0].set_xlabel('Scenario')
        axs[0].set_ylabel('Success')
        axs[0].set_ylim(-0.1, 1.1)
        
        # Deliveries
        axs[1].plot(scenarios, scenario_deliveries)
        axs[1].set_title('Deliveries per Scenario')
        axs[1].set_xlabel('Scenario')
        axs[1].set_ylabel('Number of Deliveries')
        
        # Collisions
        axs[2].plot(scenarios, scenario_collisions)
        axs[2].set_title('Collisions per Scenario (1 = Had Collision, 0 = No Collision)')
        axs[2].set_xlabel('Scenario')
        axs[2].set_ylabel('Collision')
        axs[2].set_ylim(-0.1, 1.1)
        
        # Timeouts
        axs[3].plot(scenarios, scenario_timeouts)
        axs[3].set_title('Timeouts per Scenario (1 = Had Timeout, 0 = No Timeout)')
        axs[3].set_xlabel('Scenario')
        axs[3].set_ylabel('Timeout')
        axs[3].set_ylim(-0.1, 1.1)
        
        # Rewards
        axs[4].plot(scenarios, scenario_rewards)
        axs[4].set_title('Rewards per Scenario')
        axs[4].set_xlabel('Scenario')
        axs[4].set_ylabel('Total Reward')
        
        plt.tight_layout()
        plt.show()

    metrics = {
        "Success rate": overall_success_rate,
        "Total deliveries": total_deliveries,
        "Scenarios with collisions": total_collisions,
        "Scenarios with timeouts": total_timeouts,
        "Total rewards": total_rewards
    }
    return [overall_success_rate] * num_agents, metrics  # Return same success rate for all agents


### Final Performance Implementation

In [None]:
# Evaluate random agents
print("\nEvaluating random agents...")
random_success_rates, random_metrics = evaluate_multiagent_final_performance_qtable(
    MultiAgentEnvironment, None, grid_size=5, num_agents=4, policy="random"
)

# Evaluate trained agents
print("\nEvaluating trained agents...")
trained_success_rates, trained_metrics = evaluate_multiagent_final_performance_qtable(
    MultiAgentEnvironment, q_learning, grid_size=5, num_agents=4
)

# Compare evaluation results in a table
print("\nEvaluation Results Comparison:")
print("=" * 100)
print(f"{'Metric':<30} {'Random Agents':<20} {'Trained Agents':<20} {'Improvement':<20}")
print("-" * 100)

# Calculate improvements as relative change in %. A zero baseline yields NaN
# instead of a division by zero, and abs() keeps the sign meaningful when the
# baseline (e.g. the total reward of random agents) is negative.
def pct_change(new, base):
    return (new - base) / abs(base) * 100 if base != 0 else float('nan')

success_improvement = pct_change(np.mean(trained_success_rates), np.mean(random_success_rates))
delivery_improvement = pct_change(trained_metrics['Total deliveries'], random_metrics['Total deliveries'])
# For collisions and timeouts a decrease is an improvement, hence the sign flip
collision_improvement = -pct_change(trained_metrics['Scenarios with collisions'],
                                    random_metrics['Scenarios with collisions'])
timeout_improvement = -pct_change(trained_metrics['Scenarios with timeouts'],
                                  random_metrics['Scenarios with timeouts'])
reward_improvement = pct_change(trained_metrics['Total rewards'], random_metrics['Total rewards'])

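# Format the rates as strings first so the '%' sign sits next to the number
random_rate = f"{np.mean(random_success_rates)*100:.2f}%"
trained_rate = f"{np.mean(trained_success_rates)*100:.2f}%"
print(f"{'Success Rate':<30} {random_rate:<20} {trained_rate:<20} {success_improvement:>19.1f}%")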
print(f"{'Total deliveries':<30} {random_metrics['Total deliveries']:<20.2f} {trained_metrics['Total deliveries']:<20.2f} {delivery_improvement:>19.1f}%")
print(f"{'Scenarios with collisions':<30} {random_metrics['Scenarios with collisions']:<20.2f} {trained_metrics['Scenarios with collisions']:<20.2f} {collision_improvement:>19.1f}%")
print(f"{'Scenarios with timeouts':<30} {random_metrics['Scenarios with timeouts']:<20.2f} {trained_metrics['Scenarios with timeouts']:<20.2f} {timeout_improvement:>19.1f}%")
print(f"{'Total rewards':<30} {random_metrics['Total rewards']:<20.2f} {trained_metrics['Total rewards']:<20.2f} {reward_improvement:>19.1f}%")
print("=" * 100)

## Results and Discussions

### Training

In the training metric plots we can see that the rewards and the deliveries per step increase over time. Deliveries level off at between 0.15 and 0.2 per step, which means there is a delivery every 5 to 7 steps. This is a good sign: for our desired performance an agent should take approximately 25 steps to make a delivery, which with 4 agents could mean one delivery per 100 steps, and we are well below this value.

The collisions graph shows no collisions because, as mentioned before, the training is done with a single agent. This value of 0 collisions is well below the performance-points target of 400.

### Tests and Final Performance

The final performance evaluation shows that the agents with no training (random/naive) have a success rate of less than 1%, while our trained agents have a success rate of more than 78%, which meets the target of 75%.

Training our agents significantly improved every metric. Interestingly, the number of scenarios that fail because of timeouts is 0 for our trained agents. This is the expected behaviour, since the single-agent training allowed them to learn effective paths between A and B, so the only reason our agents fail is that they collide.

Most interesting of all is that our agents do not collide in more than 78% of the scenarios, even though collisions were not considered during training. After visualizing some cases and reasoning about it, I found the reason.

When visualizing the behaviour of the agents, agents travelling from A to B would in most cases choose a different path than agents travelling from B to A. The last dimension of the Q-table has 4 values (4 actions: 0: up, 1: down, 2: left, 3: right), and when several actions share the best Q-value, action_idx = np.argmax(q_values) returns the first of them. For example, say agent "0" is at A (0,0) while agent "1" is at B (1,1). For agent "0", going right or down have the same best Q-value, but since "down" comes first in the Q-table, the agent goes down. Agent "1" has the same best Q-value for "up" and "left", but since "up" comes first, it goes up. So the solution works well largely because of the order in which the actions are defined: if I changed it to, say, (0: down, 1: left, 2: up, 3: right), the two agents in this example would take the same path in opposite directions and collide.
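
The tie-breaking effect can be reproduced in isolation. The sketch below does not use the notebook's Q-table: it assumes a stand-in score (negative Manhattan distance to the goal) so that equally short moves tie, and it relies on Python's `max`, which, like `np.argmax`, returns the first of several tied maxima. `MOVES`, `step` and `greedy_path` are hypothetical names introduced only for this illustration.

```python
# Moves are (row, column) offsets, matching the notebook's (x, y) = (row, col) use
MOVES = {"up": (-1, 0), "down": (1, 0), "left": (0, -1), "right": (0, 1)}

def step(pos, action):
    """Return the cell reached from pos by taking action."""
    dx, dy = MOVES[action]
    return (pos[0] + dx, pos[1] + dy)

def greedy_path(start, goal, action_order):
    """Greedy walk to goal; ties are broken by position in action_order."""
    path = [start]
    while path[-1] != goal:
        pos = path[-1]

        def score(action):
            # Stand-in for a Q-value: closer to the goal is better
            nxt = step(pos, action)
            return -(abs(nxt[0] - goal[0]) + abs(nxt[1] - goal[1]))

        # max() keeps the first maximal element, as np.argmax does on ties
        best = max(action_order, key=score)
        path.append(step(pos, best))
    return path

A, B = (0, 0), (1, 1)
for order in (["up", "down", "left", "right"], ["down", "left", "up", "right"]):
    print(order, greedy_path(A, B, order), greedy_path(B, A, order))
# First order:  A->B goes through (1, 0), B->A goes through (0, 1)
# Second order: both directions go through (1, 0)
```

With the original action order the two directions use different intermediate cells, while the reordered actions send both agents through (1, 0).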



## Limitations

The last paragraph of the previous section explains why the agents still collide in roughly 20% of the scenarios: many of those failures occur when A and B lie in the same row or column, because then there is only one shortest path from A to B, and it is the same path used from B to A. A and B are aligned in 200 of the 600 ordered A-B pairs (100 sharing a row and 100 sharing a column), about 33% of the cases, so the aligned cases are more than enough to account for the failures, although not every aligned scenario ends in a collision. Unless we add another training stage in which the agents learn to take different paths depending on their direction of travel, these aligned scenarios will keep limiting the success rate. Let's visualize some of these cases to confirm that they are where our agents are colliding.
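
Before looking at individual cases, the share of aligned A-B pairs can be checked by enumeration. This is a minimal sketch, assuming that "aligned" means sharing a row or a column and that A-B pairs are ordered, as in the final evaluation; the variable names are illustrative only.

```python
from itertools import product

grid_size = 5
cells = list(product(range(grid_size), repeat=2))
# Ordered pairs of distinct cells (A, B)
pairs = [(a, b) for a in cells for b in cells if a != b]

same_row = sum(a[0] == b[0] for a, b in pairs)
same_col = sum(a[1] == b[1] for a, b in pairs)

print(len(pairs), same_row, same_col)                # 600 100 100
print(f"{same_row / len(pairs):.1%}")                # 16.7% (one axis only)
print(f"{(same_row + same_col) / len(pairs):.1%}")   # 33.3% (row or column)
```

Two distinct cells cannot share both a row and a column, so the two counts can be added without double counting.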

#### A and B in the same row (row 2)

In [None]:
same_row_env = MultiAgentEnvironment(
    grid_size=5,
    num_agents=4,
    package_position=(2,0),
    target_position=(2,4),
    rewards=REWARDS
)
same_row_results = test_trained_agents(
    same_row_env,
    q_learning,
    num_tests=1,
    steps_per_test=200,
    render=True,
    render_delay=0.5,
    show_all_steps=True,
    policy="trained",
    render_interval=1
)

#### A and B in the same column (column 2)

In [None]:
same_col_env = MultiAgentEnvironment(
    grid_size=5,
    num_agents=4,
    package_position=(0,2),
    target_position=(4,2),
    rewards=REWARDS
)
same_col_results = test_trained_agents(
    same_col_env,
    q_learning,
    num_tests=1,
    steps_per_test=200,
    render=True,
    render_delay=0.5,
    show_all_steps=True,
    policy="trained",
    render_interval=1
)

## Future Work

As discussed in the previous section, increasing our success rate requires another training stage with scenarios where A and B are aligned, so that our agents learn to take different paths in these situations depending on their direction (whether or not they carry the package).
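
One possible way to supply scenarios for such a stage is sketched below, under stated assumptions: `aligned_pairs` and `sample_aligned_pair` are hypothetical helpers, not part of the notebook, and how the sampled positions would be fed into training (and how the reward would encourage direction-dependent paths) is left open.

```python
import random
from itertools import product

def aligned_pairs(grid_size=5):
    """All ordered (A, B) pairs of distinct cells sharing a row or a column."""
    cells = list(product(range(grid_size), repeat=2))
    return [(a, b) for a in cells for b in cells
            if a != b and (a[0] == b[0] or a[1] == b[1])]

def sample_aligned_pair(rng, grid_size=5):
    """Draw one aligned (A, B) pair uniformly at random."""
    return rng.choice(aligned_pairs(grid_size))

rng = random.Random(0)  # fixed seed only to make the sketch reproducible
a_pos, b_pos = sample_aligned_pair(rng)
print(len(aligned_pairs()), a_pos, b_pos)
```

The sampled positions could then be passed as package_position and target_position when creating the environment, as in the debug test earlier in the notebook.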

## References

Anthropic. (2024). Claude 3.5/3.7 Sonnet [Large language model]. https://www.anthropic.com/

## Acknowledgments

Significant portions of the code were developed in collaboration with Anthropic’s Claude 3.7 Sonnet large language model, following my instructions and design ideas.