<a href="https://colab.research.google.com/github/Hedgemon4/mountain_car_example/blob/main/dlr_dqn_mountaincar_workshop.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# DLRL Summer School Workshop

## Implementing Deep Q‑Networks (DQN) in PyTorch

Welcome! In this 30 to 45 minute workshop you’ll learn how to implement, train, **and actually watch** a Deep Q‑Network solve the classic control problem *MountainCar‑v0*.

This notebook is designed for those new to Deep Reinforcement Learning (DRL). We'll go step-by-step through the process of building a DRL agent, starting from the foundational concepts and culminating in a trained model that can successfully solve the task. We'll focus on not just *what* the code does, but *why* it's designed the way it is.

**Learning objectives**

1.  Understand the Gymnasium API and the MountainCar environment.
2.  Code a random baseline agent and benchmark its performance.
3.  Examine the environment’s state space and design a neural‑network encoder.
4.  Implement the core building blocks of DQN:
      * Replay buffer
      * Target network
      * ε‑greedy exploration
5.  Train the agent end‑to‑end and visualise learning curves & roll‑outs.
6.  Survey possible extensions (such as Double DQN and *frame stacking*) and propose further experiments.


## 0 — Setup

Run the next cell **once** to install the libraries if you’re in Colab or a fresh environment. If you’re on an offline cluster that already has PyTorch & Gymnasium you can skip it.

In [None]:

!pip install "gymnasium[classic_control]" torch numpy matplotlib tqdm
!pip install moviepy


### Imports & global configuration

Here, we import the necessary libraries. We'll be using:

  * `gymnasium` for the MountainCar environment.
  * `torch` (PyTorch) for building and training our neural network.
  * `numpy` for numerical operations.
  * `matplotlib` for plotting our results.
  * Other standard Python libraries for utilities.

We also set a `SEED` for the random number generators. This is a crucial step in research because it makes results **reproducible**. Seeding makes runs far more repeatable, though bit-for-bit identical results are not guaranteed across hardware, library versions, or non-deterministic GPU kernels. Finally, we set up our device to use a GPU (`cuda`) if available, which can speed up training. Otherwise, it will default to the CPU.


In [None]:
# Core libraries
import gymnasium as gym
import math, random, itertools, collections, os, copy, pathlib, sys, time, json
from datetime import datetime
from dataclasses import dataclass

# Data manipulation and deep learning
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F

# Plotting and visualization
import matplotlib.pyplot as plt
from matplotlib import animation
from IPython.display import HTML, display

# --- Reproducibility helpers ---
# Set a seed for all sources of randomness to ensure that the results are the same every time we run the code.
SEED = 42
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)

# --- Device configuration ---
# Check if a CUDA-enabled GPU is available for faster computation, otherwise use the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print('Using device:', device)

## 1 — The MountainCar Environment

MountainCar is a *sparse‑reward* task: a small car must build momentum to climb a hill. The agent receives **-1 reward** for each time-step it does not reach the goal. The goal is to reach the flag on the right hill. Because the car's engine is not strong enough to drive directly up the hill, it must learn to drive back and forth to build up enough momentum. This makes it a classic challenge in reinforcement learning.
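
To make the momentum argument concrete, here is a minimal pure-Python sketch of the car's update rule. The constants (`FORCE`, `GRAVITY`, the position and velocity limits, and the goal at 0.5) are written from memory of Gymnasium's MountainCar-v0 implementation and should be treated as assumptions. `step`, `simulate`, and the two hand-written policies are hypothetical helpers for illustration, not part of the workshop code.

```python
import math

# Assumed constants, recalled from Gymnasium's MountainCar-v0 source.
FORCE, GRAVITY = 0.001, 0.0025
MIN_POS, MAX_POS, MAX_SPEED, GOAL_POS = -1.2, 0.6, 0.07, 0.5


def step(position, velocity, action):
    """Advance the car by one time-step. Actions: 0 = left, 1 = no-op, 2 = right."""
    velocity += (action - 1) * FORCE - GRAVITY * math.cos(3 * position)
    velocity = max(-MAX_SPEED, min(MAX_SPEED, velocity))
    position = max(MIN_POS, min(MAX_POS, position + velocity))
    if position == MIN_POS and velocity < 0:
        velocity = 0.0  # the left wall stops the car
    return position, velocity


def simulate(policy, position=-0.5, velocity=0.0, max_steps=200):
    """Return the number of steps needed to reach the goal, or None on timeout."""
    for t in range(1, max_steps + 1):
        position, velocity = step(position, velocity, policy(position, velocity))
        if position >= GOAL_POS:
            return t
    return None


def always_right(position, velocity):
    # Full throttle towards the flag, ignoring the state.
    return 2


def follow_velocity(position, velocity):
    # Push in the direction the car is already moving to pump up energy.
    return 2 if velocity >= 0 else 0


steps_right = simulate(always_right)
steps_momentum = simulate(follow_velocity)
print("always right   :", steps_right)
print("follow velocity:", steps_momentum)
```

Under these assumed dynamics, full throttle to the right times out, while the velocity-following heuristic reaches the flag within the 200-step limit. This back-and-forth behaviour is what the DQN agent has to discover from reward alone.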

We'll use the `gymnasium` library, which provides a standardized interface for reinforcement learning environments.

In [None]:
# Initialize the MountainCar-v0 environment.
# "render_mode" is set to "rgb_array" so we can capture frames for visualization.
env = gym.make("MountainCar-v0", render_mode="rgb_array")

# The observation space defines the structure of the state received from the environment.
# For MountainCar, it's a 2D vector: [position, velocity].
print("Observation space:", env.observation_space)

# The action space defines the set of possible actions the agent can take.
# For MountainCar, there are 3 discrete actions: 0 (push left), 1 (do nothing), 2 (push right).
print("Action space      :", env.action_space)

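# Reset the environment to get an initial state.
# env.reset() returns the initial observation and an info dictionary.
state, _ = env.reset(seed=SEED)
# Seed the action space too, so that env.action_space.sample() is repeatable.
env.action_space.seed(SEED)
state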

## 2 — Random Policy Baseline

Before diving into complex algorithms like DQN, it's essential to establish a **baseline**. A simple, yet effective baseline is to see how well a completely random agent performs. This gives us a lower bound on performance; our sophisticated agent should, at the very least, perform better than random chance.

We'll create a function `run_episode` that takes an environment and a policy function. The policy function decides which action to take given a state. For our random baseline, the policy will simply be to choose a random action at every step.

In [None]:
def run_episode(env, policy_fn, render=False, max_steps=200):
    """
    Runs a single episode in the environment following a given policy.

    Args:
        env: The Gymnasium environment.
        policy_fn: A function that takes a state and returns an action.
        render: If True, captures frames for rendering.
        max_steps: The maximum number of steps per episode.

    Returns:
        The total reward accumulated during the episode and a list of frames if render=True.
    """
    state, _ = env.reset()
    total_reward = 0
    frames = []

    # Loop for the maximum duration of an episode
    for t in range(max_steps):
        if render:
            # Render the environment and store the frame
            frame = env.render()
            frames.append(frame)

        # Get an action from the policy function
        action = policy_fn(state)
        # Take the action in the environment
        next_state, reward, terminated, truncated, _ = env.step(action)

        total_reward += reward
        state = next_state

        # An episode ends if the agent reaches the goal (terminated) or the max steps are reached (truncated)
        if terminated or truncated:
            break

    return total_reward, frames

In [None]:
# Define a random policy: it ignores the state and returns a random action.
rand_policy = lambda s: env.action_space.sample()

# Run 50 episodes with the random policy to get a stable average performance.
returns = [run_episode(env, rand_policy)[0] for _ in range(50)]
print(f'Random policy - average return over 50 episodes: {np.mean(returns):.2f}')

As you can see, the random policy performs poorly: it almost never reaches the goal, so each episode runs for the full 200 steps and ends with a return of about -200. Now, let's see what that looks like.

In [None]:
# Render one random episode as a GIF to visualize its behavior.
_, frames = run_episode(env, rand_policy, render=True)

def display_frames_as_gif(frames, interval=40):
    """
    Displays a list of frames as a GIF in the notebook.
    """
    fig = plt.figure(figsize=(frames[0].shape[1]/80, frames[0].shape[0]/80), dpi=80)
    plt.axis('off')
    imgs = [[plt.imshow(f, animated=True)] for f in frames]
    ani = animation.ArtistAnimation(fig, imgs, interval=interval, blit=True)
    plt.close(fig) # Avoid displaying the static plot
    display(HTML(ani.to_jshtml()))

display_frames_as_gif(frames)

The car just wiggles around in the valley, which is what we'd expect. It never builds the momentum needed to climb the hill.

## 3 — State‑Space Analysis

The observation from the environment is a 2‑D vector containing the car's `[position, velocity]`. This is a continuous state space. Since neural networks are universal function approximators, they are well-suited to handle such continuous inputs.

We will feed this 2D state vector directly into a small Multi‑Layer Perceptron (MLP). The MLP's job will be to learn a mapping from any given state to the expected future rewards for each of the 3 possible actions. These expected rewards are called **Q-values**.

Let's examine the range of values for position and velocity. This can be useful for normalization or for understanding the scale of our inputs, though for this simple MLP, it's not strictly necessary.

In [None]:
# Get the lower and upper bounds of the observation space
obs_low, obs_high = env.observation_space.low, env.observation_space.high
print('Position range :', obs_low[0], '→', obs_high[0])
print('Velocity range :', obs_low[1], '→', obs_high[1])
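
As a sketch of the normalization mentioned above, the helper below rescales each observation component to the range [-1, 1]. It assumes the bounds printed by the previous cell are -1.2 to 0.6 for position and -0.07 to 0.07 for velocity. `normalize` is a hypothetical helper and is not used by the rest of the notebook.

```python
# Assumed bounds of MountainCar-v0 observations: [position, velocity].
OBS_LOW = (-1.2, -0.07)
OBS_HIGH = (0.6, 0.07)


def normalize(obs, low=OBS_LOW, high=OBS_HIGH):
    """Linearly rescale each observation component from [low, high] to [-1, 1]."""
    return [2.0 * (x - lo) / (hi - lo) - 1.0 for x, lo, hi in zip(obs, low, high)]


print(normalize([-1.2, -0.07]))  # lower bounds map to -1
print(normalize([-0.3, 0.0]))    # midpoints map to (approximately) 0
print(normalize([0.6, 0.07]))    # upper bounds map to +1
```

Without rescaling, velocity values are more than ten times smaller than position values, so normalization is one option to try if training turns out to be slow.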

## 4 — Deep Q‑Networks (DQN) Recap

The core idea of DQN is to approximate the optimal action‑value function, **Q\*(s,a)**, using a neural network. This function tells us the expected total future reward if we are in state *s*, take action *a*, and then continue to act optimally thereafter.

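Training a neural network on data generated by its own exploration in an RL environment is notoriously unstable. DQN relies on three key ingredients; the first two are the stabilising innovations introduced with DQN, while the third is a standard exploration strategy: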

1.  **Replay Buffer**: This stores past transitions (state, action, reward, next_state, done). When we train the network, we sample random mini-batches from this buffer. This breaks the temporal correlation between consecutive samples, making the training data closer to the independent and identically distributed (IID) data that stochastic gradient descent assumes.
2.  **Target Network**: This is a separate, frozen copy of our main Q-network. It is used to calculate the target Q-values for our loss function. By keeping its weights fixed for a period of time, it provides a stable target, preventing the network from chasing a moving target, which can lead to oscillations and divergence.
3.  **ε‑greedy Exploration**: This is a simple yet effective strategy to balance exploration (trying new actions) and exploitation (taking the best-known action). With probability ε, we take a random action; otherwise, we take the action with the highest Q-value according to our network. We typically start with a high ε and gradually decrease it as we learn more about the environment.
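
The replay buffer and ε-greedy rule from the list above can be sketched in a few lines of standard-library Python. `Transition`, `ReplayBuffer`, and `epsilon_greedy` are illustrative names chosen here; the training cell later in this notebook uses a bare `collections.deque` and inline logic instead.

```python
import collections
import random

Transition = collections.namedtuple(
    "Transition", ["state", "action", "reward", "next_state", "done"]
)


class ReplayBuffer:
    """Fixed-capacity FIFO store of transitions with uniform random sampling."""

    def __init__(self, capacity):
        # A deque with maxlen silently drops the oldest item when full.
        self.memory = collections.deque(maxlen=capacity)

    def push(self, *args):
        self.memory.append(Transition(*args))

    def sample(self, batch_size):
        # Uniform sampling without replacement breaks temporal correlation.
        return random.sample(self.memory, batch_size)

    def __len__(self):
        return len(self.memory)


def epsilon_greedy(q_values, epsilon, rng=random):
    """Pick a random action with probability epsilon, else the argmax of q_values."""
    if rng.random() < epsilon:
        return rng.randrange(len(q_values))
    return max(range(len(q_values)), key=lambda a: q_values[a])


buffer = ReplayBuffer(capacity=3)
for i in range(5):  # pushing 5 items into a capacity-3 buffer keeps only the last 3
    buffer.push([float(i), 0.0], 1, -1.0, [float(i + 1), 0.0], False)
batch = buffer.sample(2)

print(len(buffer), [t.state[0] for t in buffer.memory])
print(epsilon_greedy([-9.0, -7.5, -8.2], epsilon=0.0))  # greedy: picks index 1
```

With `epsilon=0.0` the rule is purely greedy, and with `epsilon=1.0` it is purely random; training anneals between the two.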

The loss function for DQN is the mean squared temporal‑difference (TD) error:

$$L(\theta) = \mathbb{E}_{(s,a,r,s') \sim U(B)} \left[ \left( \underbrace{r + \gamma \max_{a'} Q_{\theta^-}(s', a')}_{\text{TD Target}} - \underbrace{Q_{\theta}(s, a)}_{\text{Current Q-value}} \right)^2 \right]$$

Where:

  * $\theta$ are the weights of our main Q-network.
  * $\theta^-$ are the weights of our stable target network.
  * $(s, a, r, s')$ is a transition sampled from the replay buffer $B$.
  * $\gamma$ is the discount factor, which determines how much we value future rewards.
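
As a worked example of the loss for a single transition, the sketch below computes the TD target and the squared TD error in plain Python. The Q-values are made-up numbers, and `td_target` and `squared_td_error` are hypothetical helpers. The `terminated` flag reflects the usual convention of not bootstrapping from terminal states, which the formula above leaves implicit.

```python
def td_target(reward, next_q_values, gamma, terminated):
    """TD target r + gamma * max_a' Q_target(s', a'); no bootstrap on terminal states."""
    if terminated:
        return reward
    return reward + gamma * max(next_q_values)


def squared_td_error(q_sa, target):
    """Squared difference between the TD target and the current estimate."""
    return (target - q_sa) ** 2


gamma = 0.99
next_q = [-10.0, -8.0, -9.0]  # made-up target-network outputs for s'
q_sa = -9.5                   # made-up online estimate for (s, a)

y = td_target(-1.0, next_q, gamma, terminated=False)  # -1 + 0.99 * (-8) = -8.92
print(y, squared_td_error(q_sa, y))
print(td_target(-1.0, next_q, gamma, terminated=True))  # terminal: target is just r
```

Averaging this squared error over a mini-batch sampled from the replay buffer gives the loss $L(\theta)$.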


### Hyperparameters

We will now define all our hyperparameters in one place using a `dataclass`. This makes it easy to see and modify the configuration for an experiment.

In [None]:
@dataclass
class Args:
    # --- Experiment ---
    num_episodes: int = 1200
    max_steps_per_episode: int = 200

    # --- DQN Algorithm ---
    lr: float = 1e-3
    gamma: float = 0.99
    buffer_capacity: int = 50_000
    batch_size: int = 64
    target_update_freq: int = 1000 # in steps
    learning_starts: int = 1000 # in steps

    # --- Epsilon-Greedy ---
    eps_start: float = 1.0
    eps_end: float = 0.01
    eps_decay: float = 10000

# Instantiate the arguments
args = Args()
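
To get a feel for the ε-greedy settings, the sketch below evaluates an exponential decay schedule at a few step counts, using the `eps_start`, `eps_end`, and `eps_decay` values from `Args`. `epsilon_at` is a hypothetical helper, and the final step count assumes the upper bound of 1200 episodes × 200 steps.

```python
import math

EPS_START, EPS_END, EPS_DECAY = 1.0, 0.01, 10_000  # values from Args above


def epsilon_at(step):
    """Exponentially anneal epsilon from EPS_START towards EPS_END."""
    return EPS_END + (EPS_START - EPS_END) * math.exp(-step / EPS_DECAY)


for step in (0, 10_000, 50_000, 240_000):
    print(f"step {step:>7}: epsilon = {epsilon_at(step):.3f}")
```

With these values ε drops to roughly 0.37 after 10,000 environment steps (about 50 full-length episodes) and is close to its floor of 0.01 by 50,000 steps, so most of the run is spent acting almost greedily.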

### The Q-Network Architecture

As discussed in Section 3, the network takes the 2‑D state `[position, velocity]` as input and outputs one Q-value for each of the 3 possible actions.

We'll use a simple MLP with two hidden layers of 128 neurons each and a ReLU activation function.

In [None]:
class QNetwork(nn.Module):
    """A simple Multi-Layer Perceptron (MLP) to approximate Q-values."""
    def __init__(self, obs_dim, n_actions, hidden_sizes=(128, 128)):
        """
        Args:
            obs_dim (int): The dimension of the observation space.
            n_actions (int): The number of possible actions.
            hidden_sizes (tuple): A tuple containing the size of each hidden layer.
        """
        super().__init__()
        layers = []
        last_dim = obs_dim
        # Create the hidden layers
        for h in hidden_sizes:
            layers.append(nn.Linear(last_dim, h))
            layers.append(nn.ReLU()) # ReLU is a common choice for activation function
            last_dim = h
        # Create the output layer
        layers.append(nn.Linear(last_dim, n_actions))
        # Combine all layers into a sequential model
        self.model = nn.Sequential(*layers)

    def forward(self, x):
        """
        Defines the forward pass of the network.

        Args:
            x (torch.Tensor): The input state tensor.

        Returns:
            A tensor of Q-values for each action.
        """
        return self.model(x)
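
To gauge how small this network is, the following sketch counts the weights and biases of a fully connected MLP with the default layer sizes. `mlp_param_count` is a hypothetical helper written in plain Python, and it assumes 2 inputs and 3 outputs as in MountainCar-v0.

```python
def mlp_param_count(obs_dim, n_actions, hidden_sizes=(128, 128)):
    """Count weights and biases of a fully connected MLP with the given layer sizes."""
    sizes = [obs_dim, *hidden_sizes, n_actions]
    # Each Linear(in, out) layer has in * out weights plus out biases.
    return sum(n_in * n_out + n_out for n_in, n_out in zip(sizes[:-1], sizes[1:]))


print(mlp_param_count(obs_dim=2, n_actions=3))  # 384 + 16512 + 387 = 17283
```

At roughly 17 thousand parameters, the model is small enough to train comfortably on a CPU.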

## 5 — Training the Agent

Now we have all the pieces. Instead of wrapping everything in an `Agent` class, we will set up the components and the training loop directly in the script. This makes the flow of data and logic explicit and easier to follow.

The training loop will:

1.  Initialize the Q-Network, Target Network, Optimizer, and Replay Buffer.
2.  Iterate for a specified number of episodes.
3.  In each step of an episode:
    a. Select an action using the ε-greedy policy.
    b. Execute the action and observe the outcome (`next_state`, `reward`, `done`).
    c. Store this transition in the replay buffer.
    d. If enough steps have passed, sample a batch from the buffer and perform a learning update on the Q-Network.
    e. Periodically update the target network weights.
4.  Log the results for visualization.

In [None]:
# --- 1. Initialization ---

# Get observation and action space dimensions from the environment
obs_dim = env.observation_space.shape[0]
n_actions = env.action_space.n

# Main Q-network, which is actively trained
q_net = QNetwork(obs_dim, n_actions).to(device)
# Target network, a frozen copy of the q_net, used for stable target calculation
target_net = QNetwork(obs_dim, n_actions).to(device)
target_net.load_state_dict(q_net.state_dict())
target_net.eval()  # Set the target network to evaluation mode

# Optimizer for the q_net
optimizer = optim.Adam(q_net.parameters(), lr=args.lr)

# Experience Replay Buffer using a deque
replay_buffer = collections.deque(maxlen=args.buffer_capacity)

# --- 2. Main Training Loop ---
start_time = time.time()
print("Starting training...")

# Lists to store metrics
train_returns = []
train_losses = []
global_step = 0

for ep in range(args.num_episodes):
    state, _ = env.reset()
    ep_return = 0

    for t in range(args.max_steps_per_episode):
        # --- 3a. Select action with Epsilon-Greedy ---
        # Calculate the current value of epsilon using an exponential decay formula
        epsilon = args.eps_end + (args.eps_start - args.eps_end) * math.exp(-1. * global_step / args.eps_decay)

        if random.random() < epsilon:
            # Exploration: take a random action
            action = env.action_space.sample()
        else:
            # Exploitation: take the best action according to the Q-network
            with torch.no_grad():
                state_v = torch.tensor(state, dtype=torch.float32, device=device).unsqueeze(0)
                q_vals = q_net(state_v)
                action = int(torch.argmax(q_vals).item())

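        # --- 3b. Execute action and observe outcome ---
        next_state, reward, terminated, truncated, _ = env.step(action)
        done = terminated or truncated
        ep_return += reward

        # --- 3c. Store transition in replay buffer ---
        # Store `terminated` rather than `done`: a time-limit truncation is not a true
        # terminal state, so the TD target should still bootstrap from next_state.
        replay_buffer.append((state, action, reward, next_state, terminated))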

        # Update the current state
        state = next_state
        global_step += 1

        # --- 3d. Perform a learning step ---
        if global_step > args.learning_starts:
            # Sample a batch from the replay buffer
            batch = random.sample(replay_buffer, args.batch_size)
            states, actions, rewards, next_states, dones = map(np.array, zip(*batch))

            # Convert numpy arrays to PyTorch tensors
            states_v = torch.tensor(states, dtype=torch.float32, device=device)
            actions_v = torch.tensor(actions, dtype=torch.int64, device=device).unsqueeze(1)
            rewards_v = torch.tensor(rewards, dtype=torch.float32, device=device).unsqueeze(1)
            next_states_v = torch.tensor(next_states, dtype=torch.float32, device=device)
            dones_v = torch.tensor(dones, dtype=torch.float32, device=device).unsqueeze(1)

            # --- Calculate the TD target ---
            with torch.no_grad():
                # Get the max Q-value for the next state from the target network
                max_next_q = target_net(next_states_v).max(1, keepdim=True)[0]
                # The TD target formula
                target_q = rewards_v + (1 - dones_v) * args.gamma * max_next_q

            # Get the Q-values for the actions that were actually taken
            q_values = q_net(states_v).gather(1, actions_v)

            # Compute the Mean Squared Error (MSE) loss
            loss = F.mse_loss(q_values, target_q)
            train_losses.append(loss.item())

            # Perform the optimization step
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            # --- 3e. Periodically update the target network ---
            if global_step % args.target_update_freq == 0:
                target_net.load_state_dict(q_net.state_dict())

        if done:
            break

    train_returns.append(ep_return)
    # Print progress every 20 episodes
    if (ep + 1) % 20 == 0:
        avg_return = np.mean(train_returns[-20:])
        print(f'Episode {ep+1}/{args.num_episodes} — Avg Return (last 20): {avg_return:.1f} — Epsilon: {epsilon:.2f} — Steps: {global_step}')

print(f'\nTraining finished in {time.time() - start_time:.1f} s')

## 6 — Visualizing Learning Progress

After training, it's crucial to visualize the agent's performance. The most common way to do this is by plotting the return per episode. A rising curve indicates that the agent is learning. Since the returns can be very noisy from one episode to the next, we also plot a smoothed version of the curve to better see the underlying trend.

In [None]:
def smooth(y, w=25):
    """
    Smooths a 1D array using a moving average.

    Args:
        y: The input array.
        w: The window size for the moving average.

    Returns:
        The smoothed array.
    """
    y = np.array(y, dtype=np.float32)
    if len(y) < w:
        return y
    # Convolve with a uniform window to get the moving average
    return np.convolve(y, np.ones(w)/w, mode='valid')

plt.figure(figsize=(10,5))
# Plot the raw returns per episode
plt.plot(train_returns, alpha=0.3, label='Episode return')
# Plot the smoothed returns
plt.plot(smooth(train_returns), label='Smoothed return (w=25)')
# Plot the random baseline for comparison
plt.axhline(np.mean(returns), color='r', linestyle='--', label='Random baseline')
plt.title('Training Progress')
plt.xlabel('Episode')
plt.ylabel('Return')
plt.legend()
plt.grid(True)
plt.show()

The plot should show the agent's performance improving over time: early episodes sit at the random baseline of about -200, and the smoothed curve rises above it once the agent starts reaching the flag.

## 7 — Qualitative Evaluation

Numbers and plots are great, but the best way to see if our agent has learned is to watch it in action! We'll run a few episodes using the trained Q-network, but this time we'll turn off exploration (ε = 0). The agent will now always choose the action it believes is best.

In [None]:
def greedy_policy(state):
    """Returns the action with the highest Q-value under the trained q_net (no exploration)."""
    with torch.no_grad():
        state_v = torch.tensor(state, dtype=torch.float32, device=device).unsqueeze(0)
        return int(torch.argmax(q_net(state_v)).item())

def evaluate(policy_fn, env, n_episodes=5, render=True):
    """
    Evaluates a policy's performance.

    Args:
        policy_fn: A function that takes a state and returns an action.
        env: The environment.
        n_episodes: The number of evaluation episodes.
        render: If True, captures frames for visualization.

    Returns:
        A list of returns and a list of video frames for each episode.
    """
    rets, videos = [], []
    for _ in range(n_episodes):
        ret, frames = run_episode(env, policy_fn, render=render)
        rets.append(ret)
        videos.append(frames)
    return rets, videos

# Run the evaluation with the purely greedy (exploitative) policy
eval_returns, eval_videos = evaluate(greedy_policy, env, n_episodes=5, render=True)
print('Average return over 5 eval episodes:', np.mean(eval_returns))

In [None]:
# Display the first rollout as a GIF
display_frames_as_gif(eval_videos[0])

Success! If training went well, the agent should now solve the MountainCar problem consistently by building momentum to reach the flag. If it still times out, try training for more episodes or a different seed.

## 8 — Ideas for Further Exploration 🎯

This notebook provides a solid foundation, but the world of Deep RL is vast. Here are some ideas for how you could extend this project:

* **Algorithmic Improvements**:

  * **Double DQN**: A small tweak to the TD target calculation that helps reduce the overestimation bias of Q-values, often leading to better performance. It uses the main network to select the best action and the target network to evaluate it:

    $$
    Y_t = r + \gamma Q_{\theta^-}\left(s', \arg\max_{a'} Q_{\theta}(s', a')\right)
    $$
  * **Dueling DQN**: A neural network architecture that separates the estimation of state values, $V(s)$, and action advantages, $A(s, a)$. This can lead to better policy evaluation in the presence of many similar-valued actions.
  * **Prioritized Experience Replay (PER)**: Instead of sampling uniformly from the replay buffer, PER samples transitions based on their TD error. This allows the agent to focus more on the experiences it was most surprised by, leading to more efficient learning.

* **New Environments**:

  * Try a **continuous** version of the environment, *MountainCarContinuous-v0*. In this version, the actions are continuous values, not discrete. Algorithms like **Deep Deterministic Policy Gradient (DDPG)** or **Soft Actor-Critic (SAC)** are designed for these types of action spaces.
  * Scale up to a high-dimensional Atari game like **Breakout**. This will require using Convolutional Neural Networks (CNNs) to process the pixel inputs instead of an MLP. You'll also likely need to use **frame stacking** (combining several consecutive frames into one state) to give the agent a sense of motion.

* **Experimentation and Tuning**:

  * **Hyperparameter Search**: The performance of RL agents is often very sensitive to hyperparameters. Experiment with different values for the learning rate, epsilon decay schedule, network size, target update frequency, and replay buffer capacity.
  * **Better Tooling**: For more serious experimentation, log your metrics (returns, losses, etc.) to a dedicated tool like **TensorBoard** or **Weights & Biases**. These tools provide much richer visualization and experiment tracking capabilities.
  * **Code Optimization**: For a small speed-up, you can try using `torch.compile` (available in PyTorch 2.x and later) to Just-In-Time (JIT) compile your model for faster execution.
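
As a starting point for the Double DQN idea listed above, the sketch below contrasts the standard DQN target with the Double DQN target for a single transition. It uses plain Python lists and made-up Q-values; `dqn_target` and `double_dqn_target` are hypothetical helpers, and a real implementation would operate on batched PyTorch tensors.

```python
def argmax(values):
    """Index of the largest element."""
    return max(range(len(values)), key=lambda a: values[a])


def dqn_target(reward, gamma, q_target_next):
    """Standard DQN: the target network both selects and evaluates the action."""
    return reward + gamma * max(q_target_next)


def double_dqn_target(reward, gamma, q_online_next, q_target_next):
    """Double DQN: the online network selects a', the target network evaluates it."""
    best_action = argmax(q_online_next)
    return reward + gamma * q_target_next[best_action]


# Made-up Q-values for a single next state s' with three actions.
q_online_next = [1.0, 3.0, 2.0]  # online network output for s'
q_target_next = [0.5, 1.5, 2.5]  # target network output for s'

standard = dqn_target(-1.0, 0.99, q_target_next)
double = double_dqn_target(-1.0, 0.99, q_online_next, q_target_next)
print(standard, double)
```

Because the evaluated action is not necessarily the target network's own maximum, the Double DQN target can never exceed the standard one, which is how it counteracts overestimation.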

## References

  * *Human‑level control through deep reinforcement learning* — Mnih et al., 2015. (The original DQN paper)
  * Gymnasium documentation (Farama Foundation): [https://gymnasium.farama.org/](https://gymnasium.farama.org/)
  * PyTorch Reinforcement Learning (DQN) tutorial: [https://pytorch.org/tutorials/intermediate/reinforcement_q_learning.html](https://pytorch.org/tutorials/intermediate/reinforcement_q_learning.html)