In [None]:
import gym
import numpy as np
import matplotlib.pyplot as plt
import random
import time
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F


from collections import deque, namedtuple
from pettingzoo.mpe import simple_spread_v3

if not hasattr(np, 'bool8'):
    np.bool8 = np.bool_

In [None]:
EXP_SEED = 42  # single seed shared by every experiment in this notebook


def reset_seeds():
    """Re-seed every random number generator the notebook relies on.

    Seeds Python's ``random`` module, NumPy's legacy global RNG and PyTorch
    with ``EXP_SEED`` so that stochastic cells (epsilon-greedy exploration,
    replay sampling, network weight initialisation) restart from the same
    state each time this is called.
    """
    np.random.seed(EXP_SEED)
    random.seed(EXP_SEED)
    # The DQN section builds torch networks; without this their initial
    # weights would differ from run to run even with the seeds above.
    torch.manual_seed(EXP_SEED)


reset_seeds()

# 1) Trap Environment and Q-Learning

In [None]:
def show_results(agent, agent_desc, rewards, num_steps, epsilons=None, print_q_table=True):
  """Print the agent's Q-table (optionally) and plot per-episode training curves.

  Args:
    agent: Trained agent; its ``q_table`` attribute is printed when present.
    agent_desc: Text used as the figure title.
    rewards: Per-episode total rewards (left panel).
    num_steps: Per-episode step counts (middle panel).
    epsilons: Per-episode epsilon values (right panel); the panel is hidden
      when this is ``None``.
    print_q_table: When false, the Q-table dump is skipped.
  """
  print()

  # Dump every (state, action) entry of the table, one per line.
  if print_q_table and hasattr(agent, 'q_table'):
    print("Agent Q-Table:")
    for state_idx, action_values in enumerate(agent.q_table):
      for action_idx, q_value in enumerate(action_values):
        print(f"State {state_idx}, Action {action_idx} : {q_value:.2f}")
    print()

  fig, axs = plt.subplots(1, 3, figsize=(15, 4))
  reward_ax, steps_ax, eps_ax = axs.flatten()

  # (axis, series, y-label, title) for each curve that will be drawn.
  panels = [
    (reward_ax, rewards, "Total Reward", "Episode Reward"),
    (steps_ax, num_steps, "Steps", "Episode Steps"),
  ]
  if epsilons is None:
    eps_ax.axis('off')
  else:
    panels.append((eps_ax, epsilons, "Epsilon", "Epsilon Decay"))

  for ax, series, y_label, title in panels:
    ax.plot(series)
    ax.set_xlabel("Episode")
    ax.set_ylabel(y_label)
    ax.set_title(title)
    ax.grid(True)

  fig.suptitle(f"{agent_desc}")
  fig.tight_layout()
  plt.show()

In [None]:
def train_agent(agent, env, episodes=500):
  """Run ``episodes`` episodes of ``agent`` on ``env``, learning after every step.

  The environment must follow the five-tuple step API
  ``(next_state, reward, terminated, truncated, info)`` and return
  ``(state, info)`` from ``reset()``. The agent must expose ``act(state)``
  and ``learn(state, action, reward, next_state)``.

  Returns:
    Three lists with one entry per episode: total reward, number of steps,
    and the agent's ``epsilon`` at the end of the episode (``None`` if the
    agent has no such attribute).
  """
  reward_log, step_log, epsilon_log = [], [], []
  steps_this_episode = 0

  for episode_idx in range(episodes):
    # The progress line shows the step count of the episode that just finished.
    print(f"\r{episode_idx+1}/{episodes} (Steps={steps_this_episode})", end="")
    state, _ = env.reset()
    episode_return = 0
    steps_this_episode = 0
    finished = False

    while not finished:
      steps_this_episode += 1
      action = agent.act(state)
      next_state, reward, terminated, truncated, _info = env.step(action)
      agent.learn(state, action, reward, next_state)
      episode_return += reward
      state = next_state
      finished = terminated or truncated

    epsilon_log.append(getattr(agent, 'epsilon', None))
    step_log.append(steps_this_episode)
    reward_log.append(episode_return)

  return reward_log, step_log, epsilon_log

## 1.1) Trap Corridor Environment

- A tunnel-shaped environment in which the agent walks through a row of cells, trying to reach the goal cell.
- Reaching the goal state yields a large reward, while every other movement is penalized with a small negative reward.
- The environment includes one trap cell along the way, which gives a small positive reward for staying in it and a negative reward for leaving it towards the goal.

### **Layout**

``[ S0 ] -- [ S1 ] -- [S2] -- [S3]``

where:
- **S0**: Start state
- **S1**: **Trap** — staying here yields a small positive reward, tempting the agent away from the goal
- **S2**: Intermediate state
- **S3**: Goal — **terminal** state

### **Actions**
- `0 = Stay in place`
- `1 = Move right`
- `2 = Move left`

### **Transitions**

|State / Action |   0     |   1     |   2     |
|---------------|:-------:|:-------:|:-------:|
| S0            |   S0    |   S1    |   S0    |
| S1            |   S1    |   S2    |   S0    |
| S2            |   S2    |   S3    |   S1    |
| S3            |   Terminal                  |

### **Rewards**

All transitions are penalized with a reward of -1, except:
- Staying in the trap state (S1) is rewarded with +1,
- Moving forward from the trap state (S1 → S2) is penalized with -10,
- Moving into the terminal state (S2 → S3) is rewarded with +100.

|State / Action |     0   |     1   |     2   |
|---------------|:-------:|:-------:|:-------:|
| S0            |   -1    |   -1    |   -1    |
| S1            |   +1    |   -10   |   -1    |
| S2            |   -1    |   100   |   -1    |
| S3            |          Terminal           |

### **Termination**
- When **S3** is reached, OR
- After a maximum of 10 steps.

In [None]:
class TrapEnv:
    """Four-cell corridor (S0..S3) with a rewarding trap at S1 and the goal at S3.

    Actions: 0 = stay, 1 = move right, 2 = move left. Moves that would leave
    the corridor keep the agent in place. An episode terminates on reaching
    the goal and is truncated after ``max_steps`` steps.
    """

    def __init__(self):
        self.num_states = 4
        self.num_actions = 3
        self.terminal_state = 3
        self.max_steps = 10
        self.state = 0  # Start at S0
        self.num_steps = 0

    def reset(self, seed=None):
        """Put the agent back on S0 and return ``(state, info)``; ``seed`` is unused."""
        self.state = 0
        self.num_steps = 0
        return self.state, None

    def step(self, action):
        """Apply ``action`` and return ``(next_state, reward, terminated, truncated, info)``.

        Raises:
            ValueError: If ``action`` is not 0, 1 or 2 (the step is still counted).
        """
        self.num_steps += 1
        origin = self.state

        if action == 0:
            destination = origin
        elif action == 1:
            destination = origin if origin >= 3 else origin + 1
        elif action == 2:
            destination = origin if origin <= 0 else origin - 1
        else:
            raise ValueError("Invalid action")

        # Every transition costs -1 except the three listed here:
        # lingering in the trap, leaving the trap forwards, and entering the goal.
        special_rewards = {(1, 1): +1, (1, 2): -10, (2, 3): 100}
        reward = special_rewards.get((origin, destination), -1)

        self.state = destination
        terminated = destination == self.terminal_state
        truncated = self.num_steps >= self.max_steps

        return destination, reward, terminated, truncated, None

## 1.2) Agents

> Let's benchmark three different agents in this environment.

### 1.2.1) Random policy

In [None]:
class RandomAgent:
  """Baseline agent that picks every action uniformly at random and never learns."""

  def __init__(self, n_actions):
    self.n_actions = n_actions

  def act(self, state):
    """Return a uniformly random action index in ``[0, n_actions - 1]``; ``state`` is ignored."""
    return random.randrange(0, self.n_actions)

  def learn(self, state, action, reward, next_state):
    """No-op: present only so the agent fits the training-loop interface."""
    return None


In [None]:
# Baseline experiment: a uniformly random policy on the trap corridor.
# Re-seed first so this stochastic cell gives the same curves when it is
# re-run on its own, as the FrozenLake cells already do.
reset_seeds()
env = TrapEnv()
agent = RandomAgent(n_actions=env.num_actions)

rewards, num_steps, _ = train_agent(agent, env, episodes=500)
show_results(agent, "Random Agent", rewards, num_steps)

### 1.2.2) Q-Learning

**Q-learning** is an off-policy reinforcement learning algorithm that learns the **optimal action-value function**:

Q(s, a) represents the expected cumulative reward by taking action $a$ in state $s$ and following the best policy after that.

> Update Rule

Q-value estimates are updated using the **Bellman optimality equation**.

$$
Q(s, a) \leftarrow Q(s, a) + \alpha \left[ r + \gamma \cdot \max_{a'} Q(s', a') - Q(s, a) \right]
$$

Where:
- $ \alpha $: learning rate  
- $ \gamma $: discount factor  
- $ r $: immediate reward after taking action  
- $ s' $: next state  
- $ \max_{a'} Q(s', a') $: estimate of future rewards

---

> Greedy policy

$$
a_{t} = \arg\max_a Q(s, a)
$$




In [None]:
class QLearningAgent(RandomAgent):
    """Tabular Q-learning agent that always acts greedily on its Q-table.

    The table starts at zero and has shape ``(n_states, n_actions)``;
    ``alpha`` is the learning rate and ``gamma`` the discount factor.
    """

    def __init__(self, n_states, n_actions, alpha=0.1, gamma=0.99):
        super().__init__(n_actions)
        self.alpha = alpha
        self.gamma = gamma
        self.q_table = np.zeros((n_states, n_actions))

    def act(self, state):
        """Return the index of the highest-valued action for ``state`` as a Python int."""
        return int(self.q_table[state].argmax())

    def learn(self, state, action, reward, next_state):
        """Apply one Q-learning update for the transition ``(state, action, reward, next_state)``."""
        # Bootstrapped target: immediate reward plus the discounted best next value.
        target = reward + self.gamma * self.q_table[next_state].max()
        correction = target - self.q_table[state, action]
        self.q_table[state, action] += self.alpha * correction

    def get_policy(self):
        """Return the greedy action for every state as a 1-D array."""
        return self.q_table.argmax(axis=1)


In [None]:
# Purely greedy Q-learning on the trap corridor: act() always takes the argmax
# of the Q-table, so there is no exploration in this run.
env = TrapEnv()
agent = QLearningAgent(n_states=env.num_states, n_actions=env.num_actions)

# The third return value (per-episode epsilon) is discarded: this agent has no epsilon.
rewards, num_steps, _ = train_agent(agent, env, episodes=500)
show_results(agent, "Q Learning Agent", rewards, num_steps)

### 1.2.3) Epsilon-Greedy Policy

An agent needs balance between:
- Exploring unknown solutions.
- Exploiting previously explored solutions.

To ensure sufficient exploration, one of the most widely used techniques (particularly in value-based methods) is the **epsilon-greedy** policy.

With it, the agent takes random actions to explore early on and gradually becomes deterministic in the later stages of training.

This procedure can be handled with two parameters:
- $\epsilon$: A parameter determining the randomness of the policy.
- $\epsilon$ decay rate: A parameter determining how fast the agent will become deterministic.

$$
a_t =
\begin{cases}
\text{random action} & \text{with probability } \epsilon, \\\\
\underset{a}{\arg\max}\, Q(s_t, a) & \text{with probability } 1 - \epsilon.
\end{cases}
$$

where $\epsilon$ is decayed iteratively (in the implementation below, it is multiplied by the decay rate after every learning step).

In [None]:
class QLearningEpsGreedyAgent(QLearningAgent):
    """Tabular Q-learning agent with a decaying epsilon-greedy behaviour policy.

    Args:
        n_states: Number of discrete states (rows of the Q-table).
        n_actions: Number of discrete actions (columns of the Q-table).
        alpha: Learning rate of the Q-learning update.
        gamma: Discount factor.
        epsilon: Initial probability of taking a random action.
        eps_decay_rate: Factor ``epsilon`` is multiplied by after every
            ``learn`` call (i.e. once per environment step, not per episode).
    """

    def __init__(self, n_states, n_actions, alpha=0.2, gamma=0.9, epsilon=0.99, eps_decay_rate=0.99):
        super().__init__(n_states, n_actions, alpha, gamma)
        self.epsilon = epsilon
        self.eps_decay_rate = eps_decay_rate

    def act(self, state):
        """Return a random action with probability ``epsilon``, else the greedy one.

        Both branches return a plain Python ``int``.
        """
        if random.random() < self.epsilon:
            return random.randint(0, self.n_actions - 1)
        # Reuse the parent's greedy rule so the return type matches it (int, not np.int64).
        return super().act(state)

    def learn(self, state, action, reward, next_state):
        """Run the Q-learning update, then decay ``epsilon`` by one step."""
        super().learn(state, action, reward, next_state)
        self.epsilon *= self.eps_decay_rate

In [None]:
# Epsilon-greedy Q-learning on the trap corridor.
# Re-seed first so the exploration sequence does not depend on which cells
# ran before this one, as the FrozenLake cells already do.
reset_seeds()
env = TrapEnv()
agent = QLearningEpsGreedyAgent(n_states=env.num_states, n_actions=env.num_actions)

rewards, num_steps, epsilons = train_agent(agent, env, episodes=500)
show_results(agent, "Q Learning Epsilon Greedy Agent", rewards, num_steps, epsilons)

# Gym

[Gym](https://www.gymlibrary.dev/index.html) is a Python toolkit for developing and comparing reinforcement learning algorithms.

It provides:
- A wide range of standardized environments.
- A simple API: `env.reset()`, `env.step(action)`, `env.render()`
- Easy integration with RL libraries like Stable-Baselines3, RLlib, and others.


### Frozen Lake

<p align="center">
  <img src="https://gymnasium.farama.org/_images/frozen_lake.gif" width="200"/>
</p>

Frozen lake involves crossing a frozen lake from start to goal without falling into any holes by walking over the frozen lake. The player may not always move in the intended direction due to the slippery nature of the frozen lake. [Details](https://gymnasium.farama.org/environments/toy_text/frozen_lake/)

In [None]:
# Train the epsilon-greedy Q-learning agent on the non-slippery 4x4 FrozenLake map.
reset_seeds()
env = gym.make('FrozenLake-v1', map_name="4x4", is_slippery=False)
env.action_space.seed(EXP_SEED)
# Start fully random (epsilon=1.0); epsilon is multiplied by 0.9999 on every learn() call.
agent = QLearningEpsGreedyAgent(env.observation_space.n,
                                env.action_space.n,
                                epsilon=1.0,
                                eps_decay_rate=0.9999)

rewards, num_steps, epsilons = train_agent(agent, env, episodes=2000)
env.close()
# The Q-table dump is suppressed here (print_q_table=False); only the curves are shown.
show_results(agent, "Q Learning Epsilon Greedy Agent", rewards, num_steps, epsilons, print_q_table=False)

In [None]:
# Watch the trained agent act greedily on the 4x4 map in a rendered window.
reset_seeds()
num_test_episodes = 10
env = gym.make('FrozenLake-v1', render_mode='human', map_name="4x4", is_slippery=False)

try:
  for _ in range(num_test_episodes):
    state, _ = env.reset()
    terminated, truncated = False, False
    while not (terminated or truncated):
      # Pure exploitation: always take the best-known action, no epsilon.
      action = int(np.argmax(agent.q_table[state]))
      state, reward, terminated, truncated, info = env.step(action)
      time.sleep(0.2)  # slow the animation down enough to follow
finally:
  # Close the render window even if the rollout is interrupted or raises.
  env.close()

In [None]:
reset_seeds()
env = gym.make('FrozenLake-v1', map_name="8x8", is_slippery=False)
env.action_space.seed(EXP_SEED)
agent = QLearningEpsGreedyAgent(env.observation_space.n,
                                env.action_space.n,
                                epsilon=1.0,
                                eps_decay_rate=0.999999)

rewards, num_steps, epsilons = train_agent(agent, env, episodes=16000)
env.close()
show_results(agent, "Q Learning Epsilon Greedy Agent", rewards, num_steps, epsilons, print_q_table=False)

In [None]:
# Watch the trained agent act greedily on the 8x8 map in a rendered window.
reset_seeds()
num_test_episodes = 10
env = gym.make('FrozenLake-v1', render_mode='human', map_name="8x8", is_slippery=False)

try:
  for _ in range(num_test_episodes):
    state, _ = env.reset()
    terminated, truncated = False, False
    while not (terminated or truncated):
      # Pure exploitation: always take the best-known action, no epsilon.
      action = int(np.argmax(agent.q_table[state]))
      state, reward, terminated, truncated, info = env.step(action)
      time.sleep(0.2)  # slow the animation down enough to follow
finally:
  # Close the render window even if the rollout is interrupted or raises.
  env.close()

# Deep Q-Network (DQN)

DQN is a **value-based** RL algorithm that **approximates** the Q-function using a neural network.

<p align="center">
  <img src="https://www.baeldung.com/wp-content/uploads/sites/4/2023/04/dql-vs-ql-1.png" width="600"/>
</p>

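The version we will see today (DQN with a separate target network; note that this is not Double DQN, which would additionally select $a'$ with the policy network) uses:
- A policy network $Q_{\text{policy}}$ to learn the Q-values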
- A target network $Q_{\text{target}}$ to compute stable targets
- A replay buffer to store and sample past experiences

### Update Rule

Given a transition $(s, a, r, s{\prime}, \text{done})$, the target is:
$$
y = r + \gamma \cdot \max_{a{\prime}} Q_{\text{target}}(s{\prime}, a{\prime}) \cdot (1 - \text{done})
$$
The loss minimized is:
$$
L = \left( Q_{\text{policy}}(s, a) - y \right)^2
$$

Workflow:
1.	Store each experience in the replay buffer
2.	Sample a batch of transitions from the buffer
3.	Compute target values using the target network
4.	Update the policy network by minimizing the loss
5.	Periodically copy policy weights to the target network

In [None]:
# Environment config
NUM_AGENTS = 3  # passed as N when the simple_spread environment is created
MAX_CYCLES = 10  # env max_cycles, and the per-episode step cap in train()/showcase()
NUM_EPISODES = 1500  # number of training episodes run by train()

# DQN Parameters
BUFFER_SIZE = 100000  # capacity of each agent's replay buffer
BATCH_SIZE = 128  # transitions sampled per learn() call; learning is skipped until the buffer holds this many
GAMMA = 0.99  # discount factor applied to the target network's next-state value
EPS_START = 1.0  # initial exploration probability
EPS_END = 0.05  # floor of the exploration probability
EPS_DECAY = 10000  # number of steps over which epsilon is linearly annealed from EPS_START to EPS_END
LR = 0.0005  # learning rate of the AdamW optimizer
TARGET_UPDATE_FREQ = 20  # episodes between copies of the policy weights into the target network
HIDDEN_SIZE = 128  # default width of the two hidden layers of QNetwork

In [None]:
class QNetwork(nn.Module):
    """Three-layer fully connected network mapping a state to one Q-value per action."""

    def __init__(self, state_size, action_size, hidden_size=HIDDEN_SIZE):
        super().__init__()
        # Layers are created in input-to-output order: state -> hidden -> hidden -> actions.
        self.fc1 = nn.Linear(state_size, hidden_size)
        self.fc2 = nn.Linear(hidden_size, hidden_size)
        self.fc3 = nn.Linear(hidden_size, action_size)

    def forward(self, state):
        """Return the raw (un-activated) Q-values for ``state``."""
        hidden = F.relu(self.fc1(state))
        hidden = F.relu(self.fc2(hidden))
        q_values = self.fc3(hidden)
        return q_values
    
# One replay-buffer record: a single environment transition.
Transition = namedtuple(
    'Transition',
    ['state', 'action', 'reward', 'next_state', 'done'],
)

In [None]:
class ReplayBuffer:
    """Fixed-capacity FIFO store of ``Transition`` records with uniform random sampling."""

    def __init__(self, capacity):
        # A bounded deque drops the oldest record automatically once full.
        self.memory = deque(maxlen=capacity)

    def push(self, *args):
        """Build a ``Transition`` from ``args`` and append it to the buffer."""
        record = Transition(*args)
        self.memory.append(record)

    def sample(self, batch_size):
        """Return up to ``batch_size`` distinct records drawn uniformly at random."""
        stored = len(self.memory)
        draw_count = batch_size if batch_size <= stored else stored
        return random.sample(self.memory, draw_count)

    def __len__(self):
        return len(self.memory)

In [None]:
class DQNAgent:
    """Independent DQN learner with its own networks, optimizer and replay buffer.

    Exploration follows a linear epsilon schedule driven by ``steps_done``,
    which the training loop is expected to set from outside.
    """

    def __init__(self, agent_id, state_size, action_size, device):
        self.agent_id_str = agent_id
        self.state_size = state_size
        self.action_size = action_size
        self.device = device

        # Online (policy) network and a frozen copy used to compute targets.
        self.policy_net = QNetwork(state_size, action_size).to(device)
        self.target_net = QNetwork(state_size, action_size).to(device)
        self.update_target_net()
        self.target_net.eval()

        # Only the policy network is optimised.
        self.optimizer = optim.AdamW(self.policy_net.parameters(), lr=LR, amsgrad=True)

        # Private experience store for this agent.
        self.memory = ReplayBuffer(BUFFER_SIZE)

        # Exploration state.
        self.epsilon = EPS_START
        self.steps_done = 0  # use for decay

    def act(self, state):
        """Refresh epsilon from ``steps_done`` and pick an epsilon-greedy action."""
        # Linear anneal from EPS_START towards EPS_END, clamped at EPS_END.
        progress = self.steps_done / EPS_DECAY
        annealed = EPS_START - (EPS_START - EPS_END) * progress
        self.epsilon = max(EPS_END, annealed)

        # Explore.
        if random.random() <= self.epsilon:
            return random.randrange(self.action_size)

        # Exploit: highest Q-value according to the policy network.
        with torch.no_grad():
            if not isinstance(state, np.ndarray):
                state = np.array(state, dtype=np.float32)
            observation = torch.from_numpy(state).float().unsqueeze(0).to(self.device)
            q_row = self.policy_net(observation)
            return np.argmax(q_row.detach().cpu().numpy())

    def learn(self):
        """Do one gradient step on a sampled mini-batch.

        Returns:
            The scalar loss, or ``None`` when the buffer holds fewer than
            ``BATCH_SIZE`` transitions.
        """
        if len(self.memory) < BATCH_SIZE:
            return None  # Not enough samples yet

        # Turn a list of Transition records into one Transition of column tuples.
        batch = Transition(*zip(*self.memory.sample(BATCH_SIZE)))

        def as_column(values, dtype):
            # Shape (batch,) -> (batch, 1) so it lines up with gathered Q-values.
            return torch.tensor(values, dtype=dtype, device=self.device).unsqueeze(1)

        states = torch.tensor(np.array(batch.state), dtype=torch.float32, device=self.device)
        actions = as_column(batch.action, torch.long)
        rewards = as_column(batch.reward, torch.float32)
        next_states = torch.tensor(np.array(batch.next_state), dtype=torch.float32, device=self.device)
        dones = as_column(batch.done, torch.float32)

        # Q(s, a) of the actions actually taken, from the policy network.
        predicted_q = self.policy_net(states).gather(1, actions)

        # max_a' Q_target(s', a'), computed without tracking gradients.
        with torch.no_grad():
            bootstrap_q = self.target_net(next_states).max(dim=1, keepdim=True).values

        # r + gamma * max_a' Q_target(s', a') * (1 - done)
        target_q = rewards + (GAMMA * bootstrap_q * (1 - dones))

        loss = F.smooth_l1_loss(predicted_q, target_q)

        self.optimizer.zero_grad()
        loss.backward()
        torch.nn.utils.clip_grad_value_(self.policy_net.parameters(), 100)
        self.optimizer.step()

        return loss.item()

    def update_target_net(self):
        """Overwrite the target network's weights with the policy network's."""
        self.target_net.load_state_dict(self.policy_net.state_dict())

# PettingZoo

PettingZoo is a Python library for MARL environments, similar to how OpenAI Gym is used for single-agent RL.

It provides:
- A standard API for turn-based (AEC), simultaneous (parallel), and mixed environments
- A wide range of environments: from classic games to robotics and ecology
- Easy integration with RL libraries like Stable-Baselines3 and RLlib

Details [here](https://pettingzoo.farama.org).

## Simple Spread

<p align="center">
  <img src="https://pettingzoo.farama.org/_images/mpe_simple_spread.gif" width="200"/>
</p>

Agents must learn to cover all the landmarks while avoiding collisions.

- All agents are globally rewarded based on how far the closest agent is to each landmark (sum of the minimum distances).
- Locally, the agents are penalized if they collide with other agents (-1 for each collision). 
- The relative weights of these rewards can be controlled with the local_ratio parameter (as we do below).
- Agent observations: `[self_vel, self_pos, landmark_rel_positions, other_agent_rel_positions, communication]`
- Agent action space: `[no_action, move_left, move_right, move_down, move_up]`

In [None]:
def train(env):
    """Train one independent DQN agent per environment agent on a PettingZoo parallel env.

    Each agent observes only its own observation, stores its own transitions
    and learns after every environment step. All agents receive the same
    reward signal: the mean of the per-agent rewards returned for that step.

    Args:
        env: A PettingZoo parallel environment with discrete actions. It is
            closed before this function returns or propagates an error.

    Returns:
        Dict mapping each PettingZoo agent id to its trained ``DQNAgent``.
    """
    print("Starting Training with PettingZoo simple_spread_v3...")
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print(f"Using device: {device}")

    agent_ids = env.possible_agents
    sample_agent_id = agent_ids[0]
    observation_space = env.observation_space(sample_agent_id)
    action_space = env.action_space(sample_agent_id)
    print(f"Agent IDs: {agent_ids}")

    state_size = observation_space.shape[0]
    action_size = action_space.n
    print(f"State size: {state_size}, Action size: {action_size}")

    # dict of agents, PettingZoo ID : agent object
    agents = {agent_id: DQNAgent(agent_id, state_size, action_size, device)
              for agent_id in agent_ids}

    episode_rewards_history = []
    total_steps_global = 0

    try:
        for i_episode in range(1, NUM_EPISODES + 1):
            observations, infos = env.reset()
            current_episode_reward = 0
            episode_loss = 0.0
            steps_in_episode = 0

            for _ in range(MAX_CYCLES):
                total_steps_global += 1
                steps_in_episode += 1

                # Epsilon decay is driven by the shared global step counter.
                for agent in agents.values():
                    agent.steps_done = total_steps_global

                # Collect actions from every currently active agent that has an observation.
                actions = {agent_id: agents[agent_id].act(observations[agent_id])
                           for agent_id in env.agents
                           if agent_id in observations}

                next_observations, rewards, terminations, truncations, infos = env.step(actions)

                # Shared team reward: mean over the rewards the env actually
                # returned, so it stays correct if env was built with another N.
                team_reward = sum(rewards.values()) / len(rewards) if rewards else 0.0

                # Store experience and learn for each agent that took an action.
                loss_val_step = 0.0
                num_learning_agents = 0
                for agent_id, action in actions.items():
                    if agent_id not in next_observations:
                        continue

                    # Only a true termination ends the return. A time-limit
                    # truncation is not a terminal state, so the target must
                    # still bootstrap from the next state's value.
                    done = bool(terminations.get(agent_id, False))

                    # Push to this agent's own replay buffer.
                    agents[agent_id].memory.push(observations[agent_id], action, team_reward,
                                                 next_observations[agent_id], done)

                    # learn() returns None until the buffer holds a full batch.
                    loss = agents[agent_id].learn()
                    if loss is not None:
                        loss_val_step += loss
                        num_learning_agents += 1

                observations = next_observations
                current_episode_reward += sum(rewards.values())
                if num_learning_agents > 0:
                    episode_loss += loss_val_step / num_learning_agents

                if not env.agents:  # no agents left in the environment
                    break

            # --- End of Episode ---
            episode_rewards_history.append(current_episode_reward)
            avg_loss = episode_loss / steps_in_episode if steps_in_episode > 0 else 0

            # Update target network periodically (parameterized above)
            if i_episode % TARGET_UPDATE_FREQ == 0:
                for agent in agents.values():
                    agent.update_target_net()

            # Print progress
            if i_episode % 20 == 0:
                avg_reward = np.mean(episode_rewards_history[-20:])
                current_epsilon = agents[sample_agent_id].epsilon
                print(f"\rEp {i_episode}/{NUM_EPISODES} | Avg Reward (last 20): {avg_reward:.2f} | Last Reward: {current_episode_reward:.2f} | Avg Loss: {avg_loss:.4f} | Epsilon: {current_epsilon:.3f} | Steps: {total_steps_global}", end="")
    finally:
        # Release the environment even if training is interrupted or fails.
        env.close()

    print("\n\nTraining finished.")
    return agents

In [None]:
# Build the parallel simple_spread environment with discrete actions;
# local_ratio=0.5 sets the relative weight of the local and global reward terms.
env = simple_spread_v3.parallel_env(N=NUM_AGENTS, local_ratio=0.5,
                                    max_cycles=MAX_CYCLES,
                                    continuous_actions=False)
    
# train() closes the environment itself and returns a dict of agent id -> trained DQNAgent.
trained_agents_dict = train(env)

In [None]:
def showcase(trained_agents, num_episodes=3):
    """Render a few episodes of the trained agents acting greedily.

    Args:
        trained_agents: Dict mapping PettingZoo agent ids to objects exposing
            a ``policy_net`` module. If empty or ``None``, nothing is rendered.
        num_episodes: Number of episodes to render.

    A fresh ``simple_spread_v3`` environment in ``"human"`` render mode is
    created for the showcase. If it cannot be created, the error is printed
    and the showcase is skipped. The environment is always closed on exit.
    """
    print("\n--- Showcasing Trained Policy ---")
    if not trained_agents:
        print("No trained agents provided.")
        return

    try:
        env = simple_spread_v3.parallel_env(N=NUM_AGENTS, local_ratio=0.5,
                                            max_cycles=MAX_CYCLES,
                                            continuous_actions=False,
                                            render_mode="human")
    except Exception as e:
        print(f"Error creating environment for rendering (ensure pygame is installed): {e}")
        print("Skipping showcase.")
        return

    try:
        for i_episode in range(num_episodes):
            print(f"\nShowcase Episode {i_episode + 1}")
            observations, infos = env.reset()
            total_reward_episode = 0

            for _ in range(MAX_CYCLES):
                env.render()
                time.sleep(0.05)

                if not env.agents:  # no active agents left for this step
                    break

                actions = {}
                for agent_id in env.agents:
                    if agent_id not in observations:
                        continue

                    policy_net = trained_agents[agent_id].policy_net
                    # Run the observation on whichever device this network's
                    # weights live on, rather than re-guessing it from CUDA
                    # availability.
                    net_device = next(policy_net.parameters()).device
                    state = np.asarray(observations[agent_id], dtype=np.float32)

                    # Choose the action greedily (no exploration).
                    with torch.no_grad():
                        state_tensor = torch.from_numpy(state).unsqueeze(0).to(net_device)
                        action_values = policy_net(state_tensor)
                    actions[agent_id] = int(action_values.argmax(dim=1).item())

                if not actions:
                    break

                observations, rewards, terminations, truncations, infos = env.step(actions)
                total_reward_episode += sum(rewards.values())

            print(f"End of Showcase Episode {i_episode + 1}. Total Reward: {total_reward_episode:.2f}")
    finally:
        # Close the render window even if an episode raises or is interrupted.
        env.close()

In [None]:
# Render the agents returned by train() acting greedily (3 episodes by default).
showcase(trained_agents_dict)