# **Multiple agents in the grids**

In [4]:
pip install stable_baselines3 gym numpy

  and should_run_async(code)


Collecting stable_baselines3
  Downloading stable_baselines3-2.3.2-py3-none-any.whl.metadata (5.1 kB)
Collecting gymnasium<0.30,>=0.28.1 (from stable_baselines3)
  Downloading gymnasium-0.29.1-py3-none-any.whl.metadata (10 kB)
Collecting farama-notifications>=0.0.1 (from gymnasium<0.30,>=0.28.1->stable_baselines3)
  Downloading Farama_Notifications-0.0.4-py3-none-any.whl.metadata (558 bytes)
Downloading stable_baselines3-2.3.2-py3-none-any.whl (182 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m182.3/182.3 kB[0m [31m1.7 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading gymnasium-0.29.1-py3-none-any.whl (953 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m953.9/953.9 kB[0m [31m9.4 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading Farama_Notifications-0.0.4-py3-none-any.whl (2.5 kB)
Installing collected packages: farama-notifications, gymnasium, stable_baselines3
Successfully installed farama-notifications-0.0.4 gymnasium-0.29.1 stable_baselines3-2.3.

In [18]:
import gym
from gym import spaces
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.patches import Rectangle
from IPython.display import clear_output
import random
import time
from matplotlib.patches import Arrow, Rectangle
import matplotlib.pyplot as plt
import numpy as np
import time
from IPython.display import clear_output
import random
import gym
from gym import spaces

In [19]:
class MultiAgentWarehouseEnv(gym.Env):
    """Multi-Agent Environment where each agent has a unique goal and waits once it reaches the goal."""
    metadata = {'render.modes': ['human']}

    def __init__(self, rows, cols, num_agents, num_obstacles):
        super(MultiAgentWarehouseEnv, self).__init__()
        self.rows = rows
        self.cols = cols
        self.num_agents = num_agents
        self.num_obstacles = num_obstacles
        self.grid = np.zeros((rows, cols))  # Empty grid initialized

        self.obstacles = []  # Obstacles will be placed dynamically
        self.agents_pos = []  # List of agents' positions
        self.agents_goal = []  # List of agents' goals
        self.agents_done = [False] * num_agents  # Track whether each agent has reached its goal
        self.agents_prev_pos = [None] * num_agents  # Track previous positions to determine direction

        self.time_elapsed = 0  # Initialize the time_elapsed variable

        # Colors for each agent and their respective goals
        self.agent_colors = ['blue', 'red', 'purple', 'orange', 'yellow', 'cyan']
        self.goal_colors = ['green', 'pink', 'brown', 'magenta', 'lime', 'olive']

        # Action space: one action per agent (each agent has 5 possible actions)
        self.action_space = spaces.MultiDiscrete([5] * num_agents)  # 5 actions per agent

        # Observation space: each agent's (x, y) position and their goal's (x, y)
        # Concatenate all agents' positions and goals in one big vector
        self.observation_space = spaces.Box(low=0, high=max(self.rows, self.cols),
                                            shape=(num_agents * 4,), dtype=np.int32)

        # Initialize environment with agents and obstacles
        self._place_dynamic_objects()

    def _place_dynamic_objects(self):
        """ Dynamically place agents, goals, and obstacles in the environment. """
        all_positions = [(x, y) for x in range(self.rows) for y in range(self.cols)]
        random.shuffle(all_positions)  # Shuffle to get random positions

        # Place obstacles
        self.obstacles = [all_positions.pop() for _ in range(self.num_obstacles)]
        for (x, y) in self.obstacles:
            self.grid[x][y] = -1  # Mark obstacles on the grid

        # Place agents and their goals
        self.agents_pos = [all_positions.pop() for _ in range(self.num_agents)]
        self.agents_goal = [all_positions.pop() for _ in range(self.num_agents)]

        # Reset agents_done and agents_prev_pos status
        self.agents_done = [False] * self.num_agents
        self.agents_prev_pos = [None] * self.num_agents

    def reset(self):
        """ Reset the environment by dynamically placing objects. """
        # Reset the grid
        self.grid = np.zeros((self.rows, self.cols))
        self._place_dynamic_objects()

        self.time_elapsed = 0  # Reset the time elapsed when the environment is reset

        # Return a concatenated observation of all agents' positions and goals
        return self._get_observation()

    def _get_observation(self):
        """ Concatenate agents' positions and goals into one observation vector. """
        obs = []
        for i in range(self.num_agents):
            obs.extend(self.agents_pos[i])  # Agent position (x, y)
            obs.extend(self.agents_goal[i])  # Agent goal (x, y)
        return np.array(obs, dtype=np.int32)

    def step(self, actions):
        """ Execute one time step in the environment for all agents. """
        self.time_elapsed += 1  # Increment time elapsed at every step

        rewards = []
        done_list = []

        for i, action in enumerate(actions):
            if self.agents_done[i]:
                # If the agent has reached its goal, it waits (action = 4)
                action = 4

            x, y = self.agents_pos[i]
            self.agents_prev_pos[i] = (x, y)  # Save the current position as the previous one

            if action == 0:  # Forward
                new_pos = (x-1, y)
            elif action == 1:  # Reverse
                new_pos = (x+1, y)
            elif action == 2:  # Left
                new_pos = (x, y-1)
            elif action == 3:  # Right
                new_pos = (x, y+1)
            elif action == 4:  # Wait
                new_pos = (x, y)

            # Ensure the move is valid and doesn't result in collisions with other agents
            if self.is_valid_position(new_pos, i):
                self.agents_pos[i] = new_pos

            # Calculate reward
            reward = self.get_reward(i)
            rewards.append(reward)

            # Check if the agent reached the goal
            if self.agents_pos[i] == self.agents_goal[i]:
                self.agents_done[i] = True  # Mark agent as done
            done_list.append(self.agents_done[i])

        # Return the new concatenated observation, rewards, done status for each agent
        return self._get_observation(), np.sum(rewards), all(done_list), {}

    def is_valid_position(self, pos, agent_idx):
        """ Check if a position is valid (inside the grid, not an obstacle, and not occupied by another agent). """
        x, y = pos
        if not (0 <= x < self.rows and 0 <= y < self.cols):
            return False  # Out of bounds
        if self.grid[x][y] == -1:
            return False  # Obstacle
        # Check if any other agent is at the position
        for i, agent_pos in enumerate(self.agents_pos):
            if i != agent_idx and agent_pos == pos:
                return False  # Another agent is at the position
        return True

    def get_reward(self, agent_idx):
        """ Get the reward for the current position of an agent. """
        if self.agents_pos[agent_idx] == self.agents_goal[agent_idx]:
            return 100  # Reached destination
        elif self.agents_pos[agent_idx] in self.obstacles:
            return -10  # Hit an obstacle
        else:
            return -1  # Time penalty

    def render(self, mode='human'):
        """ Render the environment with agents represented as boxes with arrow heads indicating direction. """
        clear_output(wait=True)
        fig, ax = plt.subplots(figsize=(5, 5))

        # Draw grid and obstacles
        for x in range(self.rows):
            for y in range(self.cols):
                ax.add_patch(Rectangle((y, x), 1, 1, fill=False, edgecolor='gray'))

        # Draw obstacles as black boxes
        for (x, y) in self.obstacles:
            ax.add_patch(Rectangle((y, x), 1, 1, color='black'))

        # Draw goals for each agent in green
        for i, (goal_x, goal_y) in enumerate(self.agents_goal):
            ax.add_patch(Rectangle((goal_y, goal_x), 1, 1, color='green'))

        # Draw agents as boxes with arrows showing direction
        for i, (agent_x, agent_y) in enumerate(self.agents_pos):
            # Change color to red if the agent has reached its goal
            color = 'blue' if not self.agents_done[i] else 'red'
            ax.add_patch(Rectangle((agent_y, agent_x), 1, 1, color=color))

            # Draw an arrow showing the direction of the agent
            direction = self.get_agent_direction(i)
            if direction == 0:  # Up
                ax.add_patch(Arrow(agent_y + 0.5, agent_x + 0.5, 0, 0.4, width=0.1, color='black'))
            elif direction == 1:  # Down
                ax.add_patch(Arrow(agent_y + 0.5, agent_x + 0.5, 0, -0.4, width=0.1, color='black'))
            elif direction == 2:  # Left
                ax.add_patch(Arrow(agent_y + 0.5, agent_x + 0.5, -0.4, 0, width=0.1, color='black'))
            elif direction == 3:  # Right
                ax.add_patch(Arrow(agent_y + 0.5, agent_x + 0.5, 0.4, 0, width=0.1, color='black'))

        # Set grid limits and labels
        ax.set_xlim([0, self.cols])
        ax.set_ylim([0, self.rows])
        ax.set_xticks(np.arange(0, self.cols, 1))
        ax.set_yticks(np.arange(0, self.rows, 1))
        ax.grid(True)

        # Display elapsed time and agent positions
        ax.set_title(f"Time Elapsed: {self.time_elapsed}, Agent Positions: {self.agents_pos}")

        # Show the plot and pause briefly
        plt.show()
        time.sleep(0.1)

    def get_agent_direction(self, agent_idx):
        """Returns the direction in which the agent is facing/moving.
           0: Up, 1: Down, 2: Left, 3: Right."""
        if self.agents_prev_pos[agent_idx]:
            prev_x, prev_y = self.agents_prev_pos[agent_idx]
            curr_x, curr_y = self.agents_pos[agent_idx]

            if curr_x < prev_x:
                return 0  # Moving up
            elif curr_x > prev_x:
                return 1  # Moving down
            elif curr_y < prev_y:
                return 2  # Moving left
            elif curr_y > prev_y:
                return 3  # Moving right
        return 0  # Default direction (up) if no movement detected

    def close(self):
        plt.close()


In [None]:
from stable_baselines3 import PPO,DQN,A2C,DDPG,TD3

# Initialize the multi-agent environment with 2 agents and 3 obstacles
env = MultiAgentWarehouseEnv(rows=5, cols=5, num_agents=2, num_obstacles=5)

# Initialize PPO model for multi-agent environment
model = PPO("MlpPolicy", env, verbose=1)

# Train the model
model.learn(total_timesteps=2000000)
model.save("PPO MLP")
# Test the trained model

Using cpu device
Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.




---------------------------------
| rollout/           |          |
|    ep_len_mean     | 39.5     |
|    ep_rew_mean     | 1.54e+03 |
| time/              |          |
|    fps             | 959      |
|    iterations      | 1        |
|    time_elapsed    | 2        |
|    total_timesteps | 2048     |
---------------------------------
-----------------------------------------
| rollout/                |             |
|    ep_len_mean          | 39.5        |
|    ep_rew_mean          | 1.54e+03    |
| time/                   |             |
|    fps                  | 726         |
|    iterations           | 2           |
|    time_elapsed         | 5           |
|    total_timesteps      | 4096        |
| train/                  |             |
|    approx_kl            | 0.004197836 |
|    clip_fraction        | 0.00625     |
|    clip_range           | 0.2         |
|    entropy_loss         | -3.22       |
|    explained_variance   | 3.93e-06    |
|    learning_rate        | 0.

In [None]:
obs = env.reset()
for i in range(50):
    action, _states = model.predict(obs)
    obs, reward, done, _ = env.step(action)
    env.render()
    if done:
        print("All agents reached their goals!")
        break

env.close()


In [None]:
def test_agents(env, model, max_steps=100):
    obs = env.reset()
    total_rewards = [0] * env.num_agents  # Track total rewards for each agent
    steps = [0] * env.num_agents  # Track the number of steps each agent takes
    rewards_per_step = [[] for _ in range(env.num_agents)]  # Track rewards at each step
    positions = [[] for _ in range(env.num_agents)]  # Track positions of each agent
    done_list = [False] * env.num_agents  # Track whether each agent has finished

    # Start time measurement
    start_time = time.time()

    for step in range(max_steps):
        actions, _states = model.predict(obs)  # Get actions for both agents
        obs, rewards, _, _ = env.step(actions)  # Receive the done status as a single boolean

        # Track data for each agent
        for i in range(env.num_agents):
            if not done_list[i]:  # Only track if the agent is still moving
                positions[i].append(env.agents_pos[i])  # Track position
                rewards_per_step[i].append(rewards)  # Track reward at each step
                total_rewards[i] += rewards  # Accumulate reward
                steps[i] += 1  # Increment step count

                # Check if the agent reached its goal
                if obs[4 * i:4 * i + 2].tolist() == obs[4 * i + 2:4 * i + 4].tolist():  # Check if position equals goal
                    done_list[i] = True  # Mark agent as done

        # If all agents are done, break the loop
        if all(done_list):
            print("All agents reached their goals!")
            break

    # End time measurement
    end_time = time.time()

    # Calculate total time taken for both agents to finish
    total_time = end_time - start_time

    return total_rewards, steps, positions, rewards_per_step, total_time


# Function to visualize the results for both agents
def visualize_agents_results(total_rewards, steps, positions, rewards_per_step, total_time):
    num_agents = len(total_rewards)

    # Set up subplots for visualization
    fig, axs = plt.subplots(4, num_agents, figsize=(6 * num_agents, 16))

    for i in range(num_agents):
        # 1. Rewards per step
        axs[0, i].plot(rewards_per_step[i], marker='o', color='blue')
        axs[0, i].set_title(f'Agent {i+1} - Rewards per Step')
        axs[0, i].set_xlabel('Step')
        axs[0, i].set_ylabel('Reward')
        axs[0, i].grid(True)

        # 2. Total rewards (accumulated rewards)
        axs[1, i].bar([f'Agent {i+1}'], [total_rewards[i]], color='green')
        axs[1, i].set_title(f'Agent {i+1} - Total Rewards')
        axs[1, i].set_ylabel('Total Reward')
        axs[1, i].grid(True)

        # 3. Total steps taken by each agent
        axs[2, i].bar([f'Agent {i+1}'], [steps[i]], color='orange')
        axs[2, i].set_title(f'Agent {i+1} - Total Steps Taken')
        axs[2, i].set_ylabel('Steps')
        axs[2, i].grid(True)

        # 4. Path of the agent
        x_positions = [pos[1] for pos in positions[i]]  # Extract X coordinates
        y_positions = [pos[0] for pos in positions[i]]  # Extract Y coordinates
        axs[3, i].plot(x_positions, y_positions, marker='o', color='red')
        axs[3, i].set_title(f'Agent {i+1} - Path Taken')
        axs[3, i].set_xlabel('X Position')
        axs[3, i].set_ylabel('Y Position')
        axs[3, i].grid(True)

    # Display total time taken for both agents to finish
    fig.suptitle(f"Total Time Taken: {total_time:.2f} seconds", fontsize=16)

    # Adjust layout
    plt.tight_layout(rect=[0, 0.03, 1, 0.95])
    plt.show()


# Initialize the environment and load the trained model
env = MultiAgentWarehouseEnv(rows=5, cols=5, num_agents=2, num_obstacles=3)

# Test the agents and collect the necessary data
total_rewards, steps, positions, rewards_per_step, total_time = test_agents(env, model)

# Visualize the results for both agents
visualize_agents_results(total_rewards, steps, positions, rewards_per_step, total_time)