# Cops and Robbers: Multi-Agent RL Environment


### 1) Setup and Imports


In [1]:
import numpy as np
import gymnasium as gym
from gymnasium import spaces
from pettingzoo.utils.env import ParallelEnv
import functools
from stable_baselines3 import PPO
from stable_baselines3.ppo import MlpPolicy
from stable_baselines3.common.vec_env import DummyVecEnv
from stable_baselines3.common.env_util import make_vec_env

### 2) Environment Implementation


In [2]:
class CopsAndRobbers(ParallelEnv):
    """Two-player pursuit game on a square grid (PettingZoo parallel API).

    A "cop" and a "robber" each occupy one cell, stored in ``self.state`` as
    an integer ``[x, y]`` array. Both agents act simultaneously on every step
    and the episode terminates when they end a step on the same cell.

    Actions (``Discrete(5)``): 0 = stay, 1 = y + 1, 2 = y - 1, 3 = x - 1,
    4 = x + 1. Moves are clamped to the grid. ``render`` uses ``y`` as the
    row index, so action 1 moves an agent one row down in the printed grid.

    Observations: float32 vector ``[own_x, own_y, other_x, other_y]``.

    Rewards per step: on capture the cop gets +10 and the robber -10;
    otherwise the cop gets -1 and the robber +1.

    Args:
        grid_size: Side length of the square grid (must be >= 1).
        max_steps: Optional step limit. When set, the episode is truncated
            once that many steps have been taken without a capture. ``None``
            (the default) means episodes only end on capture.

    Raises:
        ValueError: If ``grid_size`` is smaller than 1 or ``max_steps`` is
            given but smaller than 1.
    """

    metadata = {"render_modes": ["human"], "name": "cops_and_robbers_v0"}

    def __init__(self, grid_size=10, max_steps=None):
        if grid_size < 1:
            raise ValueError(f"grid_size must be >= 1, got {grid_size}")
        if max_steps is not None and max_steps < 1:
            raise ValueError(f"max_steps must be >= 1 or None, got {max_steps}")

        self.possible_agents = ["cop", "robber"]
        self.agents = self.possible_agents.copy()
        self.grid_size = grid_size
        self.max_steps = max_steps
        self._step_count = 0
        # Per-instance generator: seeding one environment must not reseed the
        # process-wide NumPy RNG shared with every other environment.
        self._rng = np.random.default_rng()
        self.state = {}  # Populated by reset().
        self._action_spaces = {
            agent: spaces.Discrete(5) for agent in self.possible_agents
        }
        self._observation_spaces = {
            agent: spaces.Box(
                low=0, high=self.grid_size - 1, shape=(4,), dtype=np.float32
            )
            for agent in self.possible_agents
        }

    def reset(self, seed=None, options=None):
        """Start a new episode with independently drawn random positions.

        Because the two start cells are drawn independently, both agents may
        begin on the same cell; capture is only evaluated in ``step``.

        Args:
            seed: If not ``None``, reseeds this environment's own generator
                so the positions drawn from here on are reproducible.
            options: Accepted for API compatibility; unused.

        Returns:
            ``(observations, infos)`` dictionaries keyed by agent name.
        """
        self.agents = self.possible_agents.copy()
        self._step_count = 0
        if seed is not None:
            self._rng = np.random.default_rng(seed)
        self.state = {
            agent: self._rng.integers(self.grid_size, size=2)
            for agent in self.agents
        }
        observations = self._get_observations()
        return observations, {agent: {} for agent in self.agents}

    def step(self, actions):
        """Apply one simultaneous move for every live agent.

        Args:
            actions: Mapping of agent name to action id. Agents missing from
                the mapping stay in place.

        Returns:
            ``(observations, rewards, terminations, truncations, infos)``
            dictionaries keyed by the agents that were live at the start of
            the step. All five are empty if the episode has already ended.
        """
        live_agents = list(self.agents)
        if not live_agents:
            # Episode already finished; nothing to simulate until reset().
            return {}, {}, {}, {}, {}

        limit = self.grid_size - 1
        for agent in live_agents:
            action = actions.get(agent, 0)  # Default to 'stay'.
            position = self.state[agent]  # Mutated in place.
            if action == 1:
                position[1] = min(position[1] + 1, limit)
            elif action == 2:
                position[1] = max(position[1] - 1, 0)
            elif action == 3:
                position[0] = max(position[0] - 1, 0)
            elif action == 4:
                position[0] = min(position[0] + 1, limit)

        self._step_count += 1
        caught = np.array_equal(self.state["cop"], self.state["robber"])
        truncated = (
            not caught
            and self.max_steps is not None
            and self._step_count >= self.max_steps
        )

        if caught:
            reward_by_role = {"cop": 10, "robber": -10}
        else:
            reward_by_role = {"cop": -1, "robber": 1}
        rewards = {agent: reward_by_role[agent] for agent in live_agents}
        terminations = {agent: caught for agent in live_agents}
        truncations = {agent: truncated for agent in live_agents}
        # Observations must be built while self.agents still lists everyone.
        observations = self._get_observations()
        infos = {agent: {} for agent in live_agents}

        if caught or truncated:
            # The parallel API expects finished agents to leave self.agents.
            self.agents = []
        return observations, rewards, terminations, truncations, infos

    def _get_observations(self):
        """Build ``[own_x, own_y, other_x, other_y]`` for each live agent."""
        observations = {}
        for agent in self.agents:
            other = "robber" if agent == "cop" else "cop"
            observations[agent] = np.concatenate(
                [self.state[agent], self.state[other]]
            ).astype(np.float32)
        return observations

    def observation_space(self, agent):
        """Return the observation space of ``agent``."""
        return self._observation_spaces[agent]

    def action_space(self, agent):
        """Return the action space of ``agent``."""
        return self._action_spaces[agent]

    def render(self):
        """Print the grid and the agents' coordinates to stdout.

        Each agent is drawn as the upper-cased first letter of its name. If
        both share a cell, the agent stored later in ``self.state`` wins.
        """
        grid = np.full((self.grid_size, self.grid_size), '.', dtype=str)
        for agent, position in self.state.items():
            # Row index is y, column index is x.
            grid[position[1], position[0]] = agent[0].upper()
        border = "=" * (self.grid_size * 2 + 3)
        print("\n" + border)
        for row in grid:
            print("| " + " ".join(row) + " |")
        print(border)
        print("\nAgent Positions:")
        for agent, position in self.state.items():
            print(f"{agent.capitalize()}: ({position[0]}, {position[1]})")
        print()

    def close(self):
        """Release resources; this environment holds none."""
        pass

### 3) Wrapper for Single-Agent Training


In [3]:
class SingleAgentWrapper(gym.Env):
    """Expose one agent of a parallel multi-agent env as a Gymnasium env.

    The wrapped agent is driven by the caller; every other agent takes a
    uniformly random action sampled from its own action space on each step.

    Args:
        env: Parallel environment providing ``possible_agents``,
            ``action_space(agent)``, ``observation_space(agent)``,
            ``reset``, ``step``, ``render`` and ``close``.
        agent_name: Name of the agent controlled through this wrapper.
    """

    def __init__(self, env, agent_name):
        self.env = env
        self.agent_name = agent_name
        self.action_space = env.action_space(agent_name)
        self.observation_space = env.observation_space(agent_name)
        # Agents whose actions are filled in randomly by step().
        self._opponents = [
            agent for agent in env.possible_agents if agent != agent_name
        ]

    def reset(self, seed=None, options=None):
        """Reset the wrapped env and return the controlled agent's view.

        Args:
            seed: Forwarded to ``gym.Env.reset`` and to the wrapped env. When
                not ``None`` it also seeds the opponents' action spaces so
                their random moves are reproducible.
            options: Forwarded unchanged to the wrapped env.

        Returns:
            ``(observation, info)`` for the controlled agent.
        """
        super().reset(seed=seed)
        if seed is not None:
            # Offset per opponent so several opponents would not share one
            # identical action stream.
            for offset, agent in enumerate(self._opponents, start=1):
                self.env.action_space(agent).seed(seed + offset)
        obs, infos = self.env.reset(seed=seed, options=options)
        return obs[self.agent_name], infos.get(self.agent_name, {})

    def step(self, action):
        """Advance the wrapped env with ``action`` for the controlled agent.

        Returns:
            ``(observation, reward, terminated, truncated, info)`` for the
            controlled agent only.
        """
        actions = {
            agent: self.env.action_space(agent).sample()
            for agent in self._opponents
        }
        actions[self.agent_name] = action
        obs, rewards, terminations, truncations, infos = self.env.step(actions)
        name = self.agent_name
        return (
            obs[name],
            rewards[name],
            terminations[name],
            truncations[name],
            infos[name],
        )

    def render(self):
        """Render the wrapped environment."""
        self.env.render()

    def close(self):
        """Close the wrapped environment."""
        self.env.close()

### 4) Training Function


In [4]:
def make_env(agent_name):
    """Return a zero-argument factory for a single-agent training env.

    Each call of the returned factory builds a fresh ``CopsAndRobbers``
    instance and wraps it so that only ``agent_name`` is controlled.
    """
    return lambda: SingleAgentWrapper(CopsAndRobbers(), agent_name)

def train_agent(agent_name, total_timesteps=100000, n_envs=4, seed=None, verbose=1):
    """Train a PPO policy for one agent against a random opponent.

    Args:
        agent_name: Agent to train; passed to ``make_env``.
        total_timesteps: Number of environment steps to train for.
        n_envs: Number of environment copies in the vectorized env.
        seed: Optional seed forwarded to both the vectorized env and PPO.
            ``None`` leaves training unseeded.
        verbose: Verbosity level passed to PPO.

    Returns:
        The trained ``PPO`` model.
    """
    vec_env = make_vec_env(make_env(agent_name), n_envs=n_envs, seed=seed)

    model = PPO(MlpPolicy, vec_env, verbose=verbose, seed=seed)
    model.learn(total_timesteps=total_timesteps)

    return model

# Train one PPO policy per role: the cop first, then the robber.
cop_model, robber_model = [train_agent(role) for role in ("cop", "robber")]

Using cuda device
---------------------------------
| rollout/           |          |
|    ep_len_mean     | 167      |
|    ep_rew_mean     | -156     |
| time/              |          |
|    fps             | 1132     |
|    iterations      | 1        |
|    time_elapsed    | 7        |
|    total_timesteps | 8192     |
---------------------------------
-----------------------------------------
| rollout/                |             |
|    ep_len_mean          | 133         |
|    ep_rew_mean          | -122        |
| time/                   |             |
|    fps                  | 502         |
|    iterations           | 2           |
|    time_elapsed         | 32          |
|    total_timesteps      | 16384       |
| train/                  |             |
|    approx_kl            | 0.010264745 |
|    clip_fraction        | 0.118       |
|    clip_range           | 0.2         |
|    entropy_loss         | -1.6        |
|    explained_variance   | -0.00879    |
|    learnin

### 5) Testing and Simulation


In [5]:

def test_environment(num_steps=20):
    """Run a short random-action rollout and print every step.

    Args:
        num_steps: Maximum number of steps to simulate before stopping.
    """
    env = CopsAndRobbers()

    print("Environment test started.")

    try:
        # reset() returns (observations, infos); neither is needed here.
        env.reset()
        for step in range(num_steps):
            print(f"\nStep {step + 1}")
            # Sample from each agent's own space instead of hard-coding 5.
            actions = {agent: env.action_space(agent).sample() for agent in env.agents}
            _, rewards, terminations, truncations, _ = env.step(actions)
            env.render()
            print(f"Actions: {actions}, Rewards: {rewards}")
            if all(terminations.values()) or all(truncations.values()):
                print("Game over!")
                break
    finally:
        # Close the environment even if a step or render call raises.
        env.close()

test_environment()

Environment test started.

Step 1

| . . . . . . . . . . |
| . . . . . . . . . . |
| . . . . C . . . . . |
| . . . . . . . . . . |
| . . . . . . . . . R |
| . . . . . . . . . . |
| . . . . . . . . . . |
| . . . . . . . . . . |
| . . . . . . . . . . |
| . . . . . . . . . . |

Agent Positions:
Cop: (4, 2)
Robber: (9, 4)

Actions: {'cop': 1, 'robber': 1}, Rewards: {'cop': -1, 'robber': 1}

Step 2

| . . . . . . . . . . |
| . . . . . . . . . . |
| . . . . . . . . . . |
| . . . . C . . . . . |
| . . . . . . . . . R |
| . . . . . . . . . . |
| . . . . . . . . . . |
| . . . . . . . . . . |
| . . . . . . . . . . |
| . . . . . . . . . . |

Agent Positions:
Cop: (4, 3)
Robber: (9, 4)

Actions: {'cop': 1, 'robber': 4}, Rewards: {'cop': -1, 'robber': 1}

Step 3

| . . . . . . . . . . |
| . . . . . . . . . . |
| . . . . . . . . . . |
| . . . . . C . . . . |
| . . . . . . . . . R |
| . . . . . . . . . . |
| . . . . . . . . . . |
| . . . . . . . . . . |
| . . . . . . . . . . |
| . . . . . . . . . . |

# After spending hours debugging, I finally managed to get the code running. I believe I've done a fair job on this project.

### Name: Chirag Sindhwani
### Department of Electrical Engineering
### Roll no.: 23085130