# Assignment 2 – CSCN 8020: Reinforcement Learning

## Q-Learning and Deep Q-Network (DQN) on Taxi-v3

### Overview
This notebook implements and compares two reinforcement learning algorithms:
- **Q-Learning**: A model-free, value-based learning algorithm
- **Deep Q-Network (DQN)**: A neural network-based approach to Q-Learning using experience replay

Both algorithms are trained on the **Taxi-v3** environment from OpenAI Gymnasium.

### Environment Description
The Taxi-v3 environment is a 5x5 grid where:
- A taxi must pick up a passenger at a random location and drop them off at their destination
- Actions: South, North, East, West, Pickup, Dropoff (6 total)
- State space: 500 discrete states (taxi position, passenger location, destination)
- Rewards: +20 for successful drop-off, -1 for each step taken, -10 for illegal pickup/dropoff

## Setup and Installation

Install required dependencies:

In [1]:
# Install required packages
import subprocess
import sys

packages = [
    'gymnasium>=0.29.0',
    'numpy>=1.24.0',
    'matplotlib>=3.7.0',
    'torch>=2.0.0'
]

for package in packages:
    try:
        subprocess.check_call([sys.executable, '-m', 'pip', 'install', '-q', package])
        print(f"✓ {package} installed")
    except:
        print(f"✗ Failed to install {package}")

✓ gymnasium>=0.29.0 installed
✓ numpy>=1.24.0 installed
✓ matplotlib>=3.7.0 installed
✓ torch>=2.0.0 installed


## Part 1: Environment Utilities

These utility functions help us load the Taxi-v3 environment and understand its structure.

In [2]:
import gymnasium as gym
import numpy as np


def load_environment(render_mode=None):
    """Load and return the Taxi-v3 Gymnasium environment."""
    env = gym.make("Taxi-v3", render_mode=render_mode)
    return env


def print_env_info(env):
    """Print basic information about the environment."""
    print("=" * 50)
    print("Taxi-v3 Environment Info")
    print("=" * 50)
    print(f"Action Space:       {env.action_space}")
    print(f"Observation Space:  {env.observation_space}")
    print(f"Number of States:   {env.observation_space.n}")
    print(f"Number of Actions:  {env.action_space.n}")
    print()
    action_meanings = {
        0: "Move South (down)",
        1: "Move North (up)",
        2: "Move East (right)",
        3: "Move West (left)",
        4: "Pickup passenger",
        5: "Drop off passenger",
    }
    print("Action Meanings:")
    for a, desc in action_meanings.items():
        print(f"  {a}: {desc}")
    print()


def decode_state(state: int):
    """
    Decode a state scalar into its components.
    State encoding: ((taxi_row * 5 + taxi_col) * 5 + passenger_loc) * 4 + destination

    Parameters
    ----------
    state : int
        Encoded state value (0 – 499)

    Returns
    -------
    dict with taxi_row, taxi_col, passenger_loc, destination
    """
    destination = state % 4
    state //= 4
    passenger_loc = state % 5
    state //= 5
    taxi_col = state % 5
    taxi_row = state // 5

    passenger_map = {0: "Red", 1: "Green", 2: "Yellow", 3: "Blue", 4: "In Taxi"}
    destination_map = {0: "Red", 1: "Green", 2: "Yellow", 3: "Blue"}

    info = {
        "taxi_row": taxi_row,
        "taxi_col": taxi_col,
        "passenger_loc": passenger_map[passenger_loc],
        "destination": destination_map[destination],
    }
    return info


def print_state_info(state: int):
    """Print a human-readable description of a state scalar."""
    info = decode_state(state)
    print(f"State {state}:")
    print(f"  Taxi Position  : row={info['taxi_row']}, col={info['taxi_col']}")
    print(f"  Passenger Loc  : {info['passenger_loc']}")
    print(f"  Destination    : {info['destination']}")
    print()

### Test Environment Loading

In [3]:
# Load and inspect the environment
env = load_environment()
print_env_info(env)
obs, _ = env.reset()
print(f"Initial observation: {obs}")
print_state_info(obs)
env.close()

Taxi-v3 Environment Info
Action Space:       Discrete(6)
Observation Space:  Discrete(500)
Number of States:   500
Number of Actions:  6

Action Meanings:
  0: Move South (down)
  1: Move North (up)
  2: Move East (right)
  3: Move West (left)
  4: Pickup passenger
  5: Drop off passenger

Initial observation: 473
State 473:
  Taxi Position  : row=4, col=3
  Passenger Loc  : Blue
  Destination    : Green



## Part 2: Q-Learning Implementation

### Algorithm Overview
Q-Learning is an off-policy, value-based reinforcement learning algorithm. It learns the optimal action-value function (Q-function) that maps state-action pairs to expected future rewards.

**Update Rule:**
$$Q(s,a) \leftarrow Q(s,a) + \alpha \left[r + \gamma \max_{a'} Q(s',a') - Q(s,a)\right]$$

Where:
- $\alpha$ is the learning rate
- $\gamma$ is the discount factor
- $r$ is the immediate reward
- $s'$ is the next state

In [4]:
import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt
import os
import json


def train_qlearning(
    alpha: float = 0.1,
    epsilon: float = 0.1,
    gamma: float = 0.9,
    n_episodes: int = 10_000,
    max_steps: int = 200,
    seed: int = 42,
):
    """
    Train a Q-Learning agent on Taxi-v3.

    Returns
    -------
    Q : np.ndarray  – final Q-table (500 x 6)
    metrics : dict  – episode returns, steps, and running averages
    """
    env = load_environment()
    n_states = env.observation_space.n   # 500
    n_actions = env.action_space.n       # 6

    rng = np.random.default_rng(seed)
    Q = np.zeros((n_states, n_actions))

    episode_returns = []
    episode_steps   = []

    for ep in range(n_episodes):
        obs, _ = env.reset(seed=seed + ep)
        total_reward = 0.0

        for step in range(max_steps):
            # ε-greedy action selection
            if rng.random() < epsilon:
                action = env.action_space.sample()
            else:
                action = int(np.argmax(Q[obs]))

            next_obs, reward, terminated, truncated, _ = env.step(action)
            done = terminated or truncated

            # Q-Learning update
            best_next = np.max(Q[next_obs])
            Q[obs, action] += alpha * (reward + gamma * best_next - Q[obs, action])

            obs = next_obs
            total_reward += reward

            if done:
                break

        episode_returns.append(total_reward)
        episode_steps.append(step + 1)
        
        if (ep + 1) % 1000 == 0:
            print(f"Episode {ep + 1}/{n_episodes} completed")

    env.close()

    # Compute 100-episode rolling average
    avg_returns = np.convolve(episode_returns, np.ones(100) / 100, mode="valid")
    avg_steps   = np.convolve(episode_steps,   np.ones(100) / 100, mode="valid")

    metrics = {
        "episode_returns": episode_returns,
        "episode_steps":   episode_steps,
        "avg_returns":     avg_returns.tolist(),
        "avg_steps":       avg_steps.tolist(),
        "total_episodes":  n_episodes,
    }
    return Q, metrics

### Metrics and Visualization Functions

In [5]:
def plot_metrics(metrics_dict: dict, title_suffix: str, save_dir: str = "plots"):
    os.makedirs(save_dir, exist_ok=True)

    fig, axes = plt.subplots(1, 2, figsize=(14, 5))

    for label, m in metrics_dict.items():
        x = range(1, len(m["avg_returns"]) + 1)
        axes[0].plot(x, m["avg_returns"], label=label)
        axes[1].plot(x, m["avg_steps"],   label=label)

    axes[0].set_title("Average Return per Episode (100-ep window)")
    axes[0].set_xlabel("Episode")
    axes[0].set_ylabel("Average Return")
    axes[0].legend()
    axes[0].grid(True)

    axes[1].set_title("Average Steps per Episode (100-ep window)")
    axes[1].set_xlabel("Episode")
    axes[1].set_ylabel("Average Steps")
    axes[1].legend()
    axes[1].grid(True)

    plt.tight_layout()
    fname = os.path.join(save_dir, f"qlearning_{title_suffix}.png")
    plt.savefig(fname, dpi=150)
    plt.close()
    print(f"  Saved plot → {fname}")


def summarise(label: str, metrics: dict):
    print(f"\n[{label}]")
    print(f"  Total episodes       : {metrics['total_episodes']}")
    print(f"  Mean steps/episode   : {np.mean(metrics['episode_steps']):.2f}")
    print(f"  Mean return/episode  : {np.mean(metrics['episode_returns']):.2f}")
    print(f"  Final 100-ep return  : {metrics['avg_returns'][-1]:.2f}")

### Train Q-Learning Agent - Baseline

Train with default hyperparameters: α=0.1, ε=0.1, γ=0.9

In [6]:
print("=== Q-Learning: Baseline α=0.1, ε=0.1, γ=0.9 ===")
Q_base, m_base = train_qlearning(alpha=0.1, epsilon=0.1, gamma=0.9, n_episodes=5_000)
summarise("Baseline Q-Learning", m_base)

=== Q-Learning: Baseline α=0.1, ε=0.1, γ=0.9 ===
Episode 1000/5000 completed
Episode 2000/5000 completed
Episode 3000/5000 completed
Episode 4000/5000 completed
Episode 5000/5000 completed

[Baseline Q-Learning]
  Total episodes       : 5000
  Mean steps/episode   : 30.45
  Mean return/episode  : -21.63
  Final 100-ep return  : 1.68


### Hyperparameter Sweep - Learning Rate

In [7]:
print("\n=== Q-Learning: Learning Rate Sweep ===")
lr_metrics = {}

for alpha in [0.01, 0.05, 0.1, 0.2]:
    label = f"α={alpha}"
    print(f"\nTraining {label} …")
    _, m = train_qlearning(alpha=alpha, epsilon=0.1, gamma=0.9, n_episodes=5_000)
    summarise(label, m)
    lr_metrics[label] = m

# Plot comparison
plot_metrics(lr_metrics, "learning_rate_sweep", "plots")


=== Q-Learning: Learning Rate Sweep ===

Training α=0.01 …
Episode 1000/5000 completed
Episode 2000/5000 completed
Episode 3000/5000 completed
Episode 4000/5000 completed
Episode 5000/5000 completed

[α=0.01]
  Total episodes       : 5000
  Mean steps/episode   : 126.80
  Mean return/episode  : -160.15
  Final 100-ep return  : -71.33

Training α=0.05 …
Episode 1000/5000 completed
Episode 2000/5000 completed
Episode 3000/5000 completed
Episode 4000/5000 completed
Episode 5000/5000 completed

[α=0.05]
  Total episodes       : 5000
  Mean steps/episode   : 44.26
  Mean return/episode  : -41.39
  Final 100-ep return  : 2.55

Training α=0.1 …
Episode 1000/5000 completed
Episode 2000/5000 completed
Episode 3000/5000 completed
Episode 4000/5000 completed
Episode 5000/5000 completed

[α=0.1]
  Total episodes       : 5000
  Mean steps/episode   : 30.35
  Mean return/episode  : -21.41
  Final 100-ep return  : 2.23

Training α=0.2 …
Episode 1000/5000 completed
Episode 2000/5000 completed
Episode

### Hyperparameter Sweep - Exploration Rate (ε)

In [8]:
print("\n=== Q-Learning: Exploration Factor (ε) Sweep ===")
eps_metrics = {}

for epsilon in [0.05, 0.1, 0.2, 0.3]:
    label = f"ε={epsilon}"
    print(f"\nTraining {label} …")
    _, m = train_qlearning(alpha=0.1, epsilon=epsilon, gamma=0.9, n_episodes=5_000)
    summarise(label, m)
    eps_metrics[label] = m

# Plot comparison
plot_metrics(eps_metrics, "exploration_sweep", "plots")


=== Q-Learning: Exploration Factor (ε) Sweep ===

Training ε=0.05 …
Episode 1000/5000 completed
Episode 2000/5000 completed
Episode 3000/5000 completed
Episode 4000/5000 completed
Episode 5000/5000 completed

[ε=0.05]
  Total episodes       : 5000
  Mean steps/episode   : 29.57
  Mean return/episode  : -17.69
  Final 100-ep return  : 4.80

Training ε=0.1 …
Episode 1000/5000 completed
Episode 2000/5000 completed
Episode 3000/5000 completed
Episode 4000/5000 completed
Episode 5000/5000 completed

[ε=0.1]
  Total episodes       : 5000
  Mean steps/episode   : 30.43
  Mean return/episode  : -21.59
  Final 100-ep return  : 2.38

Training ε=0.2 …
Episode 1000/5000 completed
Episode 2000/5000 completed
Episode 3000/5000 completed
Episode 4000/5000 completed
Episode 5000/5000 completed

[ε=0.2]
  Total episodes       : 5000
  Mean steps/episode   : 32.89
  Mean return/episode  : -32.79
  Final 100-ep return  : -5.41

Training ε=0.3 …
Episode 1000/5000 completed
Episode 2000/5000 completed
Epi

## Part 3: Deep Q-Network (DQN) Implementation

### Algorithm Overview
DQN uses a neural network to approximate the Q-function instead of storing a lookup table. Key improvements:
- **Experience Replay**: Store transitions and sample random batches for training to break temporal correlations
- **Target Network**: Use a separate network for computing target Q-values to improve stability
- **One-hot Encoding**: Convert discrete states to binary vectors for neural network input

**Loss Function:**
$$L = (Q(s,a) - (r + \gamma \max_{a'} Q_{target}(s',a')))^2$$

In [9]:
import torch
import torch.nn as nn
import torch.optim as optim
import random
from collections import deque


class DQN(nn.Module):
    """Fully-connected Q-Network for discrete state/action spaces."""
    def __init__(self, n_states: int, n_actions: int, hidden: int = 128):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(n_states, hidden),
            nn.ReLU(),
            nn.Linear(hidden, hidden),
            nn.ReLU(),
            nn.Linear(hidden, n_actions),
        )

    def forward(self, x):
        return self.net(x)


class ReplayBuffer:
    """Experience replay buffer for storing and sampling transitions."""
    def __init__(self, capacity: int = 10_000):
        self.buffer = deque(maxlen=capacity)

    def push(self, state, action, reward, next_state, done):
        self.buffer.append((state, action, reward, next_state, done))

    def sample(self, batch_size: int):
        batch = random.sample(self.buffer, batch_size)
        states, actions, rewards, next_states, dones = zip(*batch)
        return (
            np.array(states),
            np.array(actions),
            np.array(rewards, dtype=np.float32),
            np.array(next_states),
            np.array(dones, dtype=np.float32),
        )

    def __len__(self):
        return len(self.buffer)


def one_hot(state: int, n_states: int) -> np.ndarray:
    """Convert discrete state to one-hot encoded vector."""
    v = np.zeros(n_states, dtype=np.float32)
    v[state] = 1.0
    return v

### DQN Training Function

In [10]:
def train_dqn(
    alpha: float  = 1e-3,
    epsilon_start: float = 1.0,
    epsilon_end:   float = 0.05,
    epsilon_decay: int   = 5_000,
    gamma: float   = 0.9,
    n_episodes: int = 2_000,
    max_steps:  int = 200,
    batch_size: int = 64,
    target_update: int = 100,
    buffer_capacity: int = 10_000,
    seed: int = 42,
):
    """Train a DQN agent on Taxi-v3 and return training metrics."""
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print(f"Using device: {device}")

    env = load_environment()
    n_states  = env.observation_space.n   # 500
    n_actions = env.action_space.n        # 6

    torch.manual_seed(seed)
    random.seed(seed)
    np.random.seed(seed)

    policy_net = DQN(n_states, n_actions).to(device)
    target_net = DQN(n_states, n_actions).to(device)
    target_net.load_state_dict(policy_net.state_dict())
    target_net.eval()

    optimizer = optim.Adam(policy_net.parameters(), lr=alpha)
    buffer    = ReplayBuffer(capacity=buffer_capacity)

    episode_returns = []
    episode_steps   = []
    global_step     = 0

    for ep in range(n_episodes):
        obs, _ = env.reset(seed=seed + ep)
        total_reward = 0.0

        for step in range(max_steps):
            # Epsilon-greedy exploration with decay
            epsilon = epsilon_end + (epsilon_start - epsilon_end) * np.exp(
                -global_step / epsilon_decay
            )

            if random.random() < epsilon:
                action = env.action_space.sample()
            else:
                state_t = torch.tensor(one_hot(obs, n_states),
                                       dtype=torch.float32).unsqueeze(0).to(device)
                with torch.no_grad():
                    action = int(policy_net(state_t).argmax(dim=1).item())

            next_obs, reward, terminated, truncated, _ = env.step(action)
            done = terminated or truncated

            buffer.push(one_hot(obs, n_states), action, reward,
                        one_hot(next_obs, n_states), float(done))

            obs = next_obs
            total_reward += reward
            global_step  += 1

            # Experience Replay: Sample and train on batch
            if len(buffer) >= batch_size:
                states_b, actions_b, rewards_b, next_states_b, dones_b = buffer.sample(batch_size)

                states_t      = torch.tensor(states_b,      dtype=torch.float32).to(device)
                actions_t     = torch.tensor(actions_b,     dtype=torch.long).to(device)
                rewards_t     = torch.tensor(rewards_b,     dtype=torch.float32).to(device)
                next_states_t = torch.tensor(next_states_b, dtype=torch.float32).to(device)
                dones_t       = torch.tensor(dones_b,       dtype=torch.float32).to(device)

                current_q = policy_net(states_t).gather(1, actions_t.unsqueeze(1)).squeeze(1)
                with torch.no_grad():
                    max_next_q = target_net(next_states_t).max(dim=1).values
                    target_q   = rewards_t + gamma * max_next_q * (1 - dones_t)

                loss = nn.functional.mse_loss(current_q, target_q)
                optimizer.zero_grad()
                loss.backward()
                optimizer.step()

            # Update target network
            if global_step % target_update == 0:
                target_net.load_state_dict(policy_net.state_dict())

            if done:
                break

        episode_returns.append(total_reward)
        episode_steps.append(step + 1)
        
        if (ep + 1) % 500 == 0:
            print(f"Episode {ep + 1}/{n_episodes} completed")

    env.close()

    avg_returns = np.convolve(episode_returns, np.ones(100) / 100, mode="valid")
    avg_steps   = np.convolve(episode_steps,   np.ones(100) / 100, mode="valid")

    metrics = {
        "episode_returns": episode_returns,
        "episode_steps":   episode_steps,
        "avg_returns":     avg_returns.tolist(),
        "avg_steps":       avg_steps.tolist(),
        "total_episodes":  n_episodes,
    }

    torch.save(policy_net.state_dict(), "dqn_model.pth")
    print("\nDQN model saved → dqn_model.pth")
    return policy_net, metrics

### DQN Visualization and Analysis

In [11]:
def plot_dqn_metrics(metrics: dict, save_dir: str = "plots"):
    """Plot DQN training metrics."""
    os.makedirs(save_dir, exist_ok=True)
    fig, axes = plt.subplots(1, 2, figsize=(14, 5))

    x = range(1, len(metrics["avg_returns"]) + 1)
    axes[0].plot(x, metrics["avg_returns"], color="steelblue")
    axes[0].set_title("DQN – Average Return (100-ep window)")
    axes[0].set_xlabel("Episode")
    axes[0].set_ylabel("Average Return")
    axes[0].grid(True)

    axes[1].plot(x, metrics["avg_steps"], color="darkorange")
    axes[1].set_title("DQN – Average Steps (100-ep window)")
    axes[1].set_xlabel("Episode")
    axes[1].set_ylabel("Average Steps")
    axes[1].grid(True)

    plt.tight_layout()
    fname = os.path.join(save_dir, "dqn_training.png")
    plt.savefig(fname, dpi=150)
    plt.close()
    print(f"  Saved plot → {fname}")


def summarise_dqn(metrics: dict):
    """Print DQN training summary."""
    print("\n[DQN Results]")
    print(f"  Total episodes       : {metrics['total_episodes']}")
    print(f"  Mean steps/episode   : {np.mean(metrics['episode_steps']):.2f}")
    print(f"  Mean return/episode  : {np.mean(metrics['episode_returns']):.2f}")
    print(f"  Final 100-ep return  : {metrics['avg_returns'][-1]:.2f}")

### Train DQN Agent

In [12]:
print("\n=== Training Deep Q-Network on Taxi-v3 ===")
model, dqn_metrics = train_dqn(n_episodes=2_000)
summarise_dqn(dqn_metrics)
plot_dqn_metrics(dqn_metrics, "plots")

# Save metrics
with open("dqn_metrics.json", "w") as f:
    json.dump(dqn_metrics, f, indent=2)
print("DQN metrics saved → dqn_metrics.json")


=== Training Deep Q-Network on Taxi-v3 ===
Using device: cpu
Episode 500/2000 completed
Episode 1000/2000 completed
Episode 1500/2000 completed
Episode 2000/2000 completed

DQN model saved → dqn_model.pth

[DQN Results]
  Total episodes       : 2000
  Mean steps/episode   : 69.65
  Mean return/episode  : -69.70
  Final 100-ep return  : -0.23
  Saved plot → plots\dqn_training.png
DQN metrics saved → dqn_metrics.json


## Part 4: Comparison - Q-Learning vs DQN

Compare the performance of Q-Learning and DQN approaches:

In [13]:
# Create comparison plot
def plot_comparison(q_metrics, dqn_metrics, save_dir="plots"):
    """Compare Q-Learning and DQN performance."""
    os.makedirs(save_dir, exist_ok=True)
    fig, axes = plt.subplots(1, 2, figsize=(14, 5))
    
    x_q = range(1, len(q_metrics["avg_returns"]) + 1)
    x_dqn = range(1, len(dqn_metrics["avg_returns"]) + 1)
    
    axes[0].plot(x_q, q_metrics["avg_returns"], label="Q-Learning", linewidth=2)
    axes[0].plot(x_dqn, dqn_metrics["avg_returns"], label="DQN", linewidth=2)
    axes[0].set_title("Average Return Comparison")
    axes[0].set_xlabel("Episode")
    axes[0].set_ylabel("Average Return")
    axes[0].legend()
    axes[0].grid(True)
    
    axes[1].plot(x_q, q_metrics["avg_steps"], label="Q-Learning", linewidth=2)
    axes[1].plot(x_dqn, dqn_metrics["avg_steps"], label="DQN", linewidth=2)
    axes[1].set_title("Average Steps Comparison")
    axes[1].set_xlabel("Episode")
    axes[1].set_ylabel("Average Steps")
    axes[1].legend()
    axes[1].grid(True)
    
    plt.tight_layout()
    fname = os.path.join(save_dir, "ql_vs_dqn.png")
    plt.savefig(fname, dpi=150)
    plt.close()
    print(f"Comparison plot saved → {fname}")

# Note: Use the baseline Q-Learning metrics from earlier
print("\n=== Q-Learning vs DQN Comparison ===")
plot_comparison(m_base, dqn_metrics, "plots")

print("\n[Q-Learning Performance]")
print(f"  Final 100-ep return: {m_base['avg_returns'][-1]:.2f}")
print(f"  Mean steps/episode: {np.mean(m_base['episode_steps']):.2f}")

print("\n[DQN Performance]")
print(f"  Final 100-ep return: {dqn_metrics['avg_returns'][-1]:.2f}")
print(f"  Mean steps/episode: {np.mean(dqn_metrics['episode_steps']):.2f}")


=== Q-Learning vs DQN Comparison ===
Comparison plot saved → plots\ql_vs_dqn.png

[Q-Learning Performance]
  Final 100-ep return: 1.68
  Mean steps/episode: 30.45

[DQN Performance]
  Final 100-ep return: -0.23
  Mean steps/episode: 69.65


## Summary

### Key Findings

**Q-Learning:**
- Simple tabular approach storing Q-values for all state-action pairs
- Converges relatively quickly on discrete, small state spaces
- Low memory overhead for Taxi-v3 (500 states × 6 actions)
- Hyperparameter sensitivity: α and ε significantly impact learning

**Deep Q-Network (DQN):**
- Neural network approximates Q-function for scalability
- Experience replay and target networks stabilize learning
- Handles high-dimensional state spaces better than Q-Learning
- More complex but more generalizable to larger problems

### Hyperparameter Analysis
- **Learning Rate (α)**: Impacts convergence speed and stability
- **Exploration Rate (ε)**: Balance between exploration and exploitation
- **Discount Factor (γ)**: Controls importance of future rewards

Both algorithms successfully learn to solve the Taxi-v3 problem. The choice between them depends on problem complexity and available computational resources.