
# Multi-Agent Systems Mini Exercises

This notebook introduces a lightweight cooperative multi-agent scenario that you can use to demonstrate core coordination patterns in an **AI multi-agent system**. The activities are designed for a live class where learners edit the provided scaffolding and immediately observe the effect of their changes.



## Learning objectives

By the end of this mini workshop, learners will be able to:

* Describe the difference between *local* observations and *shared* team knowledge in a cooperative setting.
* Implement a simple message-passing strategy that lets agents coordinate their actions.
* Evaluate whether a coordination strategy improves team performance compared with a naive baseline.



## How to use this notebook

* Run the setup cell first to load the simple grid-world environment.
* Skim the scenario description to understand what each agent can observe.
* Work through the exercises in order. Each exercise has a code cell with clear `TODO` comments for students to edit.
* At the end of the notebook there is an **example solution** that you can reveal after discussion.



## Setup

The only libraries needed are part of the Python standard library. Running the next cell defines a small cooperative search environment plus a helper simulator that we will reuse in the exercises.


In [None]:

from __future__ import annotations

from dataclasses import dataclass, field
from typing import Dict, List, Tuple
import random

# A grid cell as a (row, col) pair.
Coordinate = Tuple[int, int]
# An action name; the valid values are the keys of CooperativeGridWorld.ACTION_VECTORS.
Action = str


@dataclass
class AgentState:
    """State the environment tracks for a single agent."""

    # Agent identifier, e.g. "scout" or "spotter".
    name: str
    # Current (row, col) cell occupied by the agent.
    position: Coordinate
    # Numeric scratch space; every instance starts with its own empty dict.
    memory: Dict[str, float] = field(default_factory=dict)


class CooperativeGridWorld:
    """A 2-agent cooperative search task on a small grid.

    Two agents, ``"scout"`` and ``"spotter"``, start in opposite corners and
    move at most one cell per turn. The episode is won as soon as either
    agent stands on the target cell.

    Coordinates are ``(row, col)`` pairs. ``"up"`` decreases the row index and
    ``"left"`` decreases the column index, so ``(0, 0)`` is the top-left cell
    of the rendered grid.
    """

    # Row/column offset applied by each action name.
    ACTION_VECTORS: Dict[Action, Coordinate] = {
        "up": (-1, 0),
        "down": (1, 0),
        "left": (0, -1),
        "right": (0, 1),
        "stay": (0, 0),
    }

    def __init__(self, grid_size: Coordinate = (5, 5), target: Coordinate | None = None):
        """Create the world and reset it to its initial state.

        Parameters
        ----------
        grid_size:
            ``(rows, cols)`` of the grid.
        target:
            A fixed target cell. When ``None``, the target is drawn at random
            from the cells that are not agent start positions.
        """
        self.grid_size = grid_size
        self.agent_order = ("scout", "spotter")
        self.start_positions = {
            "scout": (0, 0),
            "spotter": (grid_size[0] - 1, grid_size[1] - 1),
        }
        # Remember whether the caller pinned the target so that re-seeding
        # never overrides an explicit choice.
        self._target_is_fixed = target is not None
        self._target = target
        self._rng = random.Random(7)
        self.reset()

    @property
    def target(self) -> Coordinate:
        """The ``(row, col)`` cell the agents are searching for."""
        return self._target

    def reset(self, seed: int | None = None) -> Dict[str, Dict[str, float]]:
        """Start a new episode and return the initial observations.

        Parameters
        ----------
        seed:
            When given, re-seeds the internal RNG and, unless the target was
            fixed at construction time, draws a fresh target from it so that
            different seeds produce different scenarios. When omitted, the
            current target is kept and only the turn counter and agent
            positions are reset.

        Returns
        -------
        Dict[str, Dict[str, float]]
            One observation dictionary per agent (see ``_build_observations``).
        """
        if seed is not None:
            self._rng.seed(seed)
        # Previously the target was sampled only once, which made ``seed`` a
        # no-op after construction. A missing target is always (re)sampled.
        reseeded_random_target = seed is not None and not self._target_is_fixed
        if self._target is None or reseeded_random_target:
            self._target = self._sample_target()
        self.turn = 0
        self.agent_states = {
            name: AgentState(name=name, position=self.start_positions[name])
            for name in self.agent_order
        }
        return self._build_observations()

    def _sample_target(self) -> Coordinate:
        """Draw a random cell that is not an agent start position."""
        rows, cols = self.grid_size
        occupied = set(self.start_positions.values())
        candidates = [
            (r, c)
            for r in range(rows)
            for c in range(cols)
            if (r, c) not in occupied
        ]
        return self._rng.choice(candidates)

    def step(self, actions: Dict[str, Action]):
        """Apply one action per agent and advance the episode by one turn.

        Parameters
        ----------
        actions:
            Mapping from agent name to an action name (a key of
            ``ACTION_VECTORS``).

        Returns
        -------
        tuple
            ``(observations, reward, done)``. ``done`` is true when any agent
            is on the target; ``reward`` is ``1.0`` in that case, else ``0.0``.

        Raises
        ------
        KeyError
            If an agent name or an action name is unknown.
        """
        self.turn += 1
        last_row = self.grid_size[0] - 1
        last_col = self.grid_size[1] - 1
        for name, action in actions.items():
            dr, dc = self.ACTION_VECTORS[action]
            r, c = self.agent_states[name].position
            # Clamp to the grid so that walking into a wall leaves the agent in place.
            self.agent_states[name].position = (
                max(0, min(last_row, r + dr)),
                max(0, min(last_col, c + dc)),
            )
        done = any(state.position == self.target for state in self.agent_states.values())
        reward = 1.0 if done else 0.0
        return self._build_observations(), reward, done

    def _build_observations(self) -> Dict[str, Dict[str, float]]:
        """Return each agent's view: signed offsets to the target and the turn.

        ``row_delta`` and ``col_delta`` are ``target - agent position``, so a
        positive ``row_delta`` means the target is further "down" and a
        positive ``col_delta`` means it is further "right".
        """
        target_row, target_col = self.target
        observations: Dict[str, Dict[str, float]] = {}
        for name, state in self.agent_states.items():
            row, col = state.position
            observations[name] = {
                # Cast to float so every value matches the declared type.
                "row_delta": float(target_row - row),
                "col_delta": float(target_col - col),
                "turn": float(self.turn),
            }
        return observations

    def render(self) -> str:
        """Return the grid as text, one line per row.

        Empty cells are ``.``, the target is ``T`` and each agent is drawn as
        the upper-cased first letter of its name. Agents are drawn last, so an
        agent standing on the target hides the ``T``.
        """
        grid = [["." for _ in range(self.grid_size[1])] for _ in range(self.grid_size[0])]
        tr, tc = self.target
        grid[tr][tc] = "T"
        for state in self.agent_states.values():
            r, c = state.position
            grid[r][c] = state.name[0].upper()
        return "\n".join(" ".join(row) for row in grid)


def run_episode(env: CooperativeGridWorld,
                message_fn,
                policy_fn,
                max_steps: int = 12,
                verbose: bool = False):
    """Run a single episode and return (success, transcript).

    Parameters
    ----------
    env:
        The environment to run. It is reset (without a seed) before the
        first step.
    message_fn:
        Called as ``message_fn(agent_name, observation)``; its return value is
        posted on the shared board under the agent's name.
    policy_fn:
        Called as ``policy_fn(agent_name, observation, shared_board)``; must
        return an action name accepted by ``env.step``.
    max_steps:
        Maximum number of turns before the episode is declared timed out.
    verbose:
        When true, one transcript entry (actions, reward, rendered grid) is
        recorded per step.

    Returns
    -------
    tuple
        ``(success, transcript)`` where ``success`` is ``True`` if the target
        was reached within ``max_steps`` and ``transcript`` is a list of
        human-readable lines that always ends with the outcome.
    """
    transcript: List[str] = []
    observations = env.reset()
    shared_board: Dict[str, Dict[str, float]] = {name: {} for name in env.agent_order}

    for step in range(max_steps):
        # All agents post before anyone acts, so every policy sees a board
        # built from this turn's observations.
        messages = {
            name: message_fn(name, observations[name])
            for name in env.agent_order
        }
        shared_board.update(messages)
        actions = {
            name: policy_fn(name, observations[name], shared_board)
            for name in env.agent_order
        }
        observations, reward, done = env.step(actions)
        if verbose:
            transcript.append(
                f"Step {step + 1}: actions={actions} reward={reward:.1f}\n{env.render()}"
            )
        if done:
            transcript.append(f"Target found in {step + 1} steps! Reward={reward:.1f}")
            return True, transcript
    transcript.append("Episode timed out without finding the target.")
    return False, transcript


# Build the default world (5x5 grid, randomly drawn target) and show the
# starting grid. `env` is reused by the cells that follow.
env = CooperativeGridWorld()
print(env.render())



The environment contains two cooperative agents:

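* **Scout** starts in the top-left corner. It observes the *row* distance to the hidden target.
* **Spotter** starts in the bottom-right corner. It observes the *column* distance.

Note: in this simplified implementation each agent's observation dictionary contains both `row_delta` and `col_delta`, measured from that agent's *own* position (target minus agent). For the exercises, treat the scout as the specialist for rows and the spotter as the specialist for columns.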

On each turn both agents can move up, down, left, right, or stay put. They win as soon as either agent reaches the target location.



### Warm-up: inspect the environment state

Run the following cell to reset the environment, view the grid, and sample a single random episode. This provides a baseline before building coordinated behaviour.


In [None]:

# Baseline episode: agents post empty messages and each picks a random action
# every turn. The global `random` module is not seeded here, so the outcome
# can differ between runs.
success, transcript = run_episode(
    env,
    message_fn=lambda name, obs: {},
    policy_fn=lambda name, obs, board: random.choice(list(CooperativeGridWorld.ACTION_VECTORS)),
    verbose=True,
)
print("Success?", success)
print("--- Transcript ---")
# With verbose=True the transcript holds one entry per step plus the outcome line.
for line in transcript:
    print(line)



## Exercise 1 – Designing the message protocol

Each agent has only a *partial* view of the world. The scout knows the vertical distance to the target, while the spotter knows the horizontal distance. Implement `compose_message` so that an agent shares the most useful part of its observation with the team.

Guidelines:

* Return a dictionary containing numeric hints rather than a long natural-language description.
* Include both the sign (direction) and magnitude of the helpful delta.
* Feel free to clip or normalise values if you think it helps stability.


In [None]:

def compose_message(agent_name: str, observation: Dict[str, float]) -> Dict[str, float]:
    """TODO: summarise what this agent knows about the target location.

    Parameters
    ----------
    agent_name:
        The identifier of the agent sending the message ("scout" or "spotter").
    observation:
        A dictionary with keys `row_delta`, `col_delta`, and `turn`. The
        deltas are the target position minus this agent's position, so a
        positive `row_delta` means the target is further "down" and a positive
        `col_delta` means it is further "right". `turn` is the number of
        steps taken so far in the episode.

    Returns
    -------
    Dict[str, float]
        Information that teammates can use. Keep it numeric so that policies
        can reason over it easily. The simulator stores it on the shared
        board under `agent_name`.
    """
    # TODO: update this placeholder implementation.
    return {"hint": 0.0}



## Exercise 2 – Turning shared knowledge into actions

Create a policy that chooses an action based on both the *local* observation and the team board filled with messages from all agents. The policy should move the agent closer to the target every turn when it receives useful information.

Design tips:

* Use the team board to access teammate messages. You can assume the board has an entry per agent (e.g. `board["scout"]`).
* Combine local deltas with shared hints to decide which axis to move along.
* Always return one of the valid actions listed in `CooperativeGridWorld.ACTION_VECTORS`.


In [None]:

def coordinated_policy(agent_name: str,
                       observation: Dict[str, float],
                       board: Dict[str, Dict[str, float]]) -> Action:
    """TODO: choose an action that uses both local observations and messages.

    Parameters
    ----------
    agent_name:
        The identifier of the acting agent ("scout" or "spotter").
    observation:
        This agent's own observation, with keys `row_delta`, `col_delta`,
        and `turn`.
    board:
        The latest message posted by every agent, keyed by agent name.

    Returns
    -------
    Action
        One of the keys of `CooperativeGridWorld.ACTION_VECTORS`.
    """
    # TODO: replace this naive policy.
    return "stay"



## Exercise 3 – Evaluate your strategy

Once you have implemented both `compose_message` and `coordinated_policy`, run the following cell to simulate multiple episodes. Try to achieve a success rate above 80%, where each episode must reach the target within the default limit of 12 steps (`max_steps=12`).


In [None]:

def evaluate(strategy_runs: int = 10) -> float:
    """Return the fraction of episodes won by the student strategy.

    Runs ``strategy_runs`` episodes on the module-level ``env`` using
    ``compose_message`` and ``coordinated_policy``. Before each episode the
    environment is reset with a different seed (``0 .. strategy_runs - 1``).

    Parameters
    ----------
    strategy_runs:
        Number of episodes to simulate; must be at least 1.

    Returns
    -------
    float
        Wins divided by ``strategy_runs``, in the range ``[0.0, 1.0]``.

    Raises
    ------
    ValueError
        If ``strategy_runs`` is smaller than 1 (a success rate over zero
        episodes is undefined).
    """
    # Validate up front instead of failing with ZeroDivisionError at the end.
    if strategy_runs < 1:
        raise ValueError("strategy_runs must be a positive integer")
    wins = 0
    for seed in range(strategy_runs):
        env.reset(seed=seed)
        success, _ = run_episode(env, compose_message, coordinated_policy, verbose=False)
        wins += int(success)
    return wins / strategy_runs

# Evaluate over 20 seeded runs; keep the count in the message below in sync
# with the `strategy_runs` argument if you change it.
success_rate = evaluate(strategy_runs=20)
print(f"Success rate over 20 runs: {success_rate:.0%}")



### Reflect and iterate

* How does the success rate change if agents only share the sign of their deltas?
* What happens if you limit the number of turns (`max_steps`) in `run_episode`?
* Can you design a protocol where the scout leads the exploration while the spotter only adjusts when close to the target?



## Example solution (for instructors)

The following cell contains one possible solution that reaches 100% success on the default settings. Keep it collapsed during class and expand it afterwards to review the key ideas with your learners.


In [None]:

def solution_compose_message(agent_name: str, observation: Dict[str, float]) -> Dict[str, float]:
    """Share the sign and magnitude of the sender's specialist axis.

    The scout reports on rows and the spotter on columns. Any other agent
    name produces an empty message.
    """
    # (observation key, direction key, distance key) for each known sender.
    fields_by_agent = {
        "scout": ("row_delta", "row_direction", "row_distance"),
        "spotter": ("col_delta", "col_direction", "col_distance"),
    }
    if agent_name not in fields_by_agent:
        return {}

    delta_key, direction_key, distance_key = fields_by_agent[agent_name]
    delta = observation[delta_key]
    # Sign of the delta encoded as -1.0, 0.0 or 1.0.
    sign = float(delta > 0) - float(delta < 0)
    return {direction_key: sign, distance_key: abs(delta)}


def solution_coordinated_policy(agent_name: str,
                                observation: Dict[str, float],
                                board: Dict[str, Dict[str, float]]) -> Action:
    """Move one step along the axis with the larger remaining distance.

    The agent's own observation is authoritative because its deltas are
    measured from its own position. A teammate's hint is measured from the
    teammate's position, so it is only used as a best-effort fallback for an
    axis that is missing from the local observation. Substituting it for a
    known local delta can steer the agent away from the target.

    Parameters
    ----------
    agent_name:
        The identifier of the acting agent ("scout" or "spotter").
    observation:
        This agent's observation; `row_delta` and `col_delta` are read when
        present.
    board:
        The latest message posted by every agent, keyed by agent name.

    Returns
    -------
    Action
        "up"/"down" when the row distance is at least the column distance,
        "left"/"right" otherwise, and "stay" when both distances are zero.
    """
    teammate = "spotter" if agent_name == "scout" else "scout"
    hints = board.get(teammate, {})

    row_delta = observation.get("row_delta")
    if row_delta is None:
        row_delta = hints.get("row_direction", 0.0) * hints.get("row_distance", 0.0)
    col_delta = observation.get("col_delta")
    if col_delta is None:
        col_delta = hints.get("col_direction", 0.0) * hints.get("col_distance", 0.0)

    # Nothing left to travel: hold position instead of stepping off the spot.
    if row_delta == 0 and col_delta == 0:
        return "stay"
    if abs(row_delta) >= abs(col_delta):
        return "down" if row_delta > 0 else "up"
    return "right" if col_delta > 0 else "left"


# Seed the environment's RNG, then run one verbose episode with the
# instructor solution. Note that run_episode resets the environment again
# (without a seed) before its first step.
env.reset(seed=42)
success, transcript = run_episode(
    env,
    message_fn=solution_compose_message,
    policy_fn=solution_coordinated_policy,
    verbose=True,
)
print("Instructor solution success?", success)
print("--- Transcript ---")
# One entry per step (actions, reward, rendered grid) followed by the outcome line.
for line in transcript:
    print(line)



## Next steps

To extend this activity you can:

* Add more agents with specialised sensors to emphasise the value of scalable message protocols.
* Introduce stochastic noise into observations so that students explore robustness.
* Ask students to convert their deterministic policy into a learning agent using Q-learning or policy gradients.
