
# LunarLander RL Template: Q-Learning and DQN introduction

This notebook provides a reinforcement learning scaffold for **LunarLander-v3** (discrete) in **Gymnasium**.

You will be introduced to 2 different approaches:
- **A) Discretized Tabular Q-Learning**: bins the continuous state into discrete buckets, then learns a dictionary-based Q-table.
- **B) DQN (Deep Q-Network)**: uses your **MLP** to approximate the Q-function, with experience replay and a target network.

You will notice one will perform much better than the other.

In [None]:
#@title Environment setup for Colab
# This cell sets up all dependencies for the LunarLander notebook on Google Colab.
import sys, subprocess, os

def pipi(*args):
    """Run `pip install <args>` with the interpreter that is executing this notebook.

    Raises:
        subprocess.CalledProcessError: if pip exits with a non-zero status.
    """
    command = [sys.executable, "-m", "pip", "install"]
    command.extend(args)
    subprocess.check_call(command)

# Upgrade base tools
pipi("--upgrade", "pip", "setuptools", "wheel")

# Gymnasium + Box2D
# Preferred path: install gymnasium with its box2d extra, then additionally try
# box2d-py and, if that fails, the Box2D package.
try:
    pipi("gymnasium[box2d]")
    try:
        pipi("box2d-py")
    except subprocess.CalledProcessError:
        print("[install] box2d-py failed; using Box2D fallback.")
        # If this install fails too, the error propagates to the outer handler below.
        pipi("Box2D")
except subprocess.CalledProcessError:
    # Reached when `gymnasium[box2d]` fails, or when the inner Box2D fallback fails.
    print("[install] gymnasium[box2d] failed; trying gymnasium and Box2D separately.")
    pipi("gymnasium")
    pipi("Box2D")

# Rendering / video / DL
# NOTE(review): `tqdm` is imported in the next cell but is not installed here —
# presumably it is preinstalled on Colab; confirm for other environments.
pipi("pygame", "imageio", "imageio-ffmpeg", "matplotlib", "torch")

print("Setup complete. Continue below.")


In [None]:

import os, glob, random, math, time
from dataclasses import dataclass
from typing import Callable, Optional, Tuple, Dict

import numpy as np
import gymnasium as gym
from gymnasium.wrappers import RecordVideo

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch import optim
from tqdm import tqdm

from IPython.display import Video, display

# ==== Global Configuration ====
ENV_ID = "LunarLander-v3"       # Discrete environment
SEED = 42
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"

# On Colab, store videos in /content for easy preview. Outside Colab that
# directory normally does not exist (and may not be creatable without root),
# so fall back to a `videos` folder in the current working directory.
VIDEO_DIR = "/content/videos" if os.path.isdir("/content") else os.path.join(os.getcwd(), "videos")
os.makedirs(VIDEO_DIR, exist_ok=True)

MAX_STEPS = 2500  # upper bound on the number of steps in a rollout / evaluation episode
RNG = np.random.default_rng(SEED)

# Seed every random number generator the notebook relies on.
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)
if DEVICE == "cuda":
    torch.cuda.manual_seed_all(SEED)

print(f"Using device: {DEVICE}")


### Some helper functions

In [None]:

def make_env(env_id: str = ENV_ID, seed: int = SEED, render_mode: Optional[str] = None):
    """Build a Gymnasium environment and seed it on a best-effort basis.

    Args:
        env_id: Environment id passed to `gym.make`.
        seed: Seed used for the first `reset` and for both spaces.
        render_mode: Forwarded unchanged to `gym.make`.

    Returns:
        The created environment, already reset once.
    """
    make_kwargs = {"render_mode": render_mode}
    try:
        env = gym.make(env_id, **make_kwargs)
    except Exception as e:
        # Any creation failure is reported and followed by one attempt with the default id.
        print(f"[make_env] Could not create '{env_id}' ({e}). Falling back to 'LunarLander-v3'.")
        env = gym.make("LunarLander-v3", **make_kwargs)

    # If reset() rejects the `seed` keyword (TypeError), reset without it.
    try:
        env.reset(seed=seed)
    except TypeError:
        env.reset()

    # Seeding the spaces is optional: the first failure stops the loop and is ignored.
    try:
        for space_name in ("action_space", "observation_space"):
            getattr(env, space_name).seed(seed)
    except Exception:
        pass
    return env


def rollout_episode(env, policy: Callable, max_steps: int = MAX_STEPS, capture_frames: bool = False):
    """Play one episode, choosing actions with `policy(obs, action_space)`.

    The environment is reset with the global SEED before the first step.

    Args:
        env: Environment exposing `reset`, `step`, `action_space` (and optionally `render`).
        policy: Callable mapping `(obs, action_space)` to an action.
        max_steps: Hard cap on the number of steps taken.
        capture_frames: When True, collect every non-None result of `env.render()`.

    Returns:
        `(total_reward, frames)`; `frames` is empty unless frames were captured.
    """
    obs, info = env.reset(seed=SEED)
    frames = []
    total_reward = 0.0
    steps_taken = 0
    while steps_taken < max_steps:
        # The frame is grabbed before acting, so it shows the state the policy sees.
        if capture_frames and hasattr(env, "render"):
            frame = env.render()
            if frame is not None:
                frames.append(frame)
        action = policy(obs, env.action_space)
        obs, reward, terminated, truncated, info = env.step(action)
        total_reward += float(reward)
        steps_taken += 1
        if terminated or truncated:
            break
    return total_reward, frames


In [None]:

from collections import deque

@dataclass
class RewardTracker:
    """Stores every episode return and a rolling mean over the last `window` of them."""
    window: int = 100

    def __post_init__(self):
        # `history` grows without bound; `_dq` drops entries older than `window`.
        self._dq = deque(maxlen=self.window)
        self.history = []

    def update(self, ret: float):
        """Record one episode return in both the full history and the rolling window."""
        for store in (self.history, self._dq):
            store.append(ret)

    @property
    def moving_avg(self) -> float:
        """Mean of the returns currently in the window, or 0.0 before any update."""
        if len(self._dq) == 0:
            return 0.0
        return float(np.mean(self._dq))


In [None]:

def random_policy(obs, action_space):
    """Baseline policy: ignore `obs` and return `action_space.sample()`."""
    sampled_action = action_space.sample()
    return sampled_action

def record_and_display(policy_fn: Callable, env_id: str = ENV_ID, seed: int = SEED, video_dir: str = VIDEO_DIR, max_steps: int = MAX_STEPS):
    """Record one episode of `policy_fn` to an mp4 file and embed it in the notebook.

    Args:
        policy_fn: Callable `(obs, action_space) -> action`.
        env_id: Environment id to record.
        seed: Seed passed to `make_env`.
        video_dir: Directory the video is written to (created if missing).
        max_steps: Maximum number of steps in the recorded episode.

    Returns:
        `(episode_return, video_path)`; `video_path` is None when no video was written.
    """
    name_prefix = "demo"
    os.makedirs(video_dir, exist_ok=True)
    env = make_env(env_id, seed=seed, render_mode="rgb_array")
    try:
        env = RecordVideo(env, video_dir, episode_trigger=lambda e: True, name_prefix=name_prefix)
        # NOTE: rollout_episode resets the env with the global SEED, so `seed`
        # only influences the initial seeding done inside make_env.
        total, _ = rollout_episode(env, policy_fn, max_steps=max_steps, capture_frames=False)
    finally:
        # Closing the wrapper finalizes the video file; do it even if the rollout fails.
        env.close()

    # Only consider this function's own recordings and pick the newest by modification
    # time. A plain name sort over all mp4 files would rank videos written with another
    # prefix (e.g. "dqn-...") after "demo-..." and display the wrong file.
    candidates = glob.glob(os.path.join(video_dir, f"{name_prefix}*.mp4"))
    latest = max(candidates, key=os.path.getmtime) if candidates else None
    print(f"Episode return: {total:.2f}")
    if latest:
        display(Video(latest, embed=True, html_attributes="controls loop autoplay"))
    else:
        print("No video found. If on Colab, ensure imageio-ffmpeg is installed.")
    return total, latest

# Example video of a random policy; the returned (return, video path) pair is
# assigned to `_` so it is not echoed as cell output.
_ = record_and_display(random_policy)


In [None]:
def select_action(agent, obs, action_space, epsilon: float = 0.0):
    """Forward to `agent.select_action(obs, action_space, epsilon)` and return its result."""
    chooser = agent.select_action
    return chooser(obs, action_space, epsilon)

def to_tensor(obs: np.ndarray) -> torch.Tensor:
    """Convert `obs` to a float32 tensor on DEVICE with a new leading axis of size 1."""
    tensor = torch.as_tensor(obs, dtype=torch.float32, device=DEVICE)
    return torch.unsqueeze(tensor, 0)

In [None]:
def record_agent_video(agent: nn.Module, env_id: str = ENV_ID, seed: int = SEED,
                       video_dir: str = VIDEO_DIR, max_steps: int = MAX_STEPS):
    """Record one greedy (epsilon = 0) episode of `agent` and embed the video.

    Args:
        agent: Object exposing `select_action(obs, action_space, epsilon)`; it is
            called with the observation converted by `to_tensor`.
        env_id: Environment id to record.
        seed: Seed used for environment creation and for the recorded reset.
        video_dir: Directory the video is written to (created if missing).
        max_steps: Maximum number of steps in the recorded episode.

    Returns:
        `(episode_return, video_path)`; `video_path` is None when no video was written.
    """
    name_prefix = "dqn"
    os.makedirs(video_dir, exist_ok=True)
    env = make_env(env_id, seed=seed, render_mode="rgb_array")
    total = 0.0
    try:
        env = RecordVideo(env, video_dir, episode_trigger=lambda e: True, name_prefix=name_prefix)
        obs, _ = env.reset(seed=seed)
        for _ in range(max_steps):
            a = select_action(agent, to_tensor(obs), env.action_space, epsilon=0.0)
            obs, r, term, trunc, _ = env.step(a)
            total += float(r)
            if term or trunc:
                break
    finally:
        # Closing the wrapper finalizes the video file; do it even if the episode fails.
        env.close()

    # Restrict the search to this function's own prefix and take the newest file by
    # modification time instead of relying on the alphabetical order of all mp4 files.
    candidates = glob.glob(os.path.join(video_dir, f"{name_prefix}*.mp4"))
    latest = max(candidates, key=os.path.getmtime) if candidates else None
    print(f"Agent video return: {total:.2f}")
    if latest:
        display(Video(latest, embed=True, html_attributes="controls loop autoplay"))
    else:
        print("No video found. If on Colab, ensure imageio-ffmpeg is installed.")
    return total, latest



## A) Tabular Q-Learning (with state discretization)

LunarLander observations are continuous. We **bin** each dimension into a small number of buckets to get a discrete state key. Then we apply vanilla Q-learning:
```
Q[s,a] ← Q[s,a] + α (r + γ max_a' Q[s',a'] − Q[s,a])
```
Good for learning dynamics, but **DQN**s are preferred for function approximation and scalability.

See what results you can get with the tabular Q-learning approach.


In [None]:
class Discretizer:
    """Turn a continuous observation vector into a tuple of per-dimension bin indices."""

    def __init__(self, low: np.ndarray, high: np.ndarray, bins: int = 8):
        # Non-finite bounds are replaced by a nominal [-1, 1] range so the bin width is finite.
        self.low = np.where(np.isfinite(low), low, -1.0)
        self.high = np.where(np.isfinite(high), high, 1.0)
        self.bins = bins

    def encode(self, obs: np.ndarray) -> Tuple[int, ...]:
        """Return the bin index (0 .. bins-1) of every component of `obs`."""
        # The small constant keeps the division defined when low == high.
        span = self.high - self.low + 1e-8
        position = np.clip((obs - self.low) / span, 0.0, 1.0)
        # position == 1.0 would map to index `bins`, hence the second clip.
        bucket = np.clip((position * self.bins).astype(int), 0, self.bins - 1)
        return tuple(map(int, bucket))

In [None]:
from collections import defaultdict

class LunarLanderAgent:
    """Tabular Q-learning agent that acts on discretized LunarLander observations."""

    def __init__(
        self,
        env: "gym.Env",
        learning_rate: float,
        initial_epsilon: float,
        epsilon_decay: float,
        final_epsilon: float,
        discount_factor: float = 0.95,
    ):
        """Initialize a Q-Learning agent.

        Args:
            env: The training environment
            learning_rate: How quickly to update Q-values (0-1)
            initial_epsilon: Starting exploration rate (usually 1.0)
            epsilon_decay: Factor epsilon is multiplied by after each episode
            final_epsilon: Minimum exploration rate (usually 0.1)
            discount_factor: How much to value future rewards (0-1)
        """
        self.env = env

        self.disc = Discretizer(env.observation_space.low, env.observation_space.high)

        # Q-table: maps a discretized state to an array with one Q-value per action.
        # defaultdict automatically creates entries with zeros for new states
        n_actions = int(env.action_space.n)
        self.q_values = defaultdict(lambda: np.zeros(n_actions))

        self.lr = learning_rate
        self.discount_factor = discount_factor  # How much we care about future rewards

        # Exploration parameters
        self.epsilon = initial_epsilon
        self.epsilon_decay = epsilon_decay
        self.final_epsilon = final_epsilon

        # Track learning progress
        self.training_error = []

    def _encode(self, obs) -> tuple:
        """Convert a raw observation (array or tensor, optionally batched as [1, d]) to a Q-table key."""
        if hasattr(obs, "detach"):
            # Tensor input (e.g. from `to_tensor`): move it to a NumPy array first.
            obs = obs.detach().cpu().numpy()
        flat = np.asarray(obs, dtype=np.float32).reshape(-1)
        return self.disc.encode(flat)

    def select_action(self, obs, action_space=None, epsilon=None) -> int:
        """Choose an action using epsilon-greedy strategy.

        Args:
            obs: Raw (continuous) observation; it is discretized internally.
            action_space: Space to sample exploratory actions from; defaults to
                the training environment's action space.
            epsilon: Exploration rate for this call; defaults to `self.epsilon`.

        Returns:
            action: 0, 1, 2, or 3 (do nothing, fire left engine, fire main
            engine, fire right engine)
        """
        eps = self.epsilon if epsilon is None else epsilon
        if np.random.random() < eps:
            space = self.env.action_space if action_space is None else action_space
            return int(space.sample())
        return int(np.argmax(self.q_values[self._encode(obs)]))

    def update(
        self,
        obs,
        action: int,
        reward: float,
        terminated: bool,
        next_obs,
    ):
        """Update Q-value based on experience.

        This is the heart of Q-learning: learn from (state, action, reward, next_state)
        using Q[s,a] <- Q[s,a] + lr * (r + gamma * max_a' Q[s',a'] - Q[s,a]).
        """
        state = self._encode(obs)
        next_state = self._encode(next_obs)

        # What's the best we could do from the next state?
        # (Zero if episode terminated - no future rewards possible)
        best_next_q = 0.0 if terminated else float(np.max(self.q_values[next_state]))

        # Bellman target: immediate reward plus the discounted best future value.
        target = reward + self.discount_factor * best_next_q

        # How wrong was our current estimate?
        temporal_difference = target - self.q_values[state][action]

        # Update our estimate in the direction of the error
        # Learning rate controls how big steps we take
        self.q_values[state][action] += self.lr * temporal_difference

        # Track learning progress (useful for debugging)
        self.training_error.append(temporal_difference)

    def decay_epsilon(self):
        """Reduce exploration rate after each episode, never going below `final_epsilon`."""
        self.epsilon = max(self.final_epsilon, self.epsilon * self.epsilon_decay)

In [None]:
from tqdm import tqdm

def train_q_agent(agent, n_episodes, env=None):
    """Train a tabular Q-learning agent for `n_episodes` episodes.

    Args:
        agent: Agent exposing `select_action`, `update`, `decay_epsilon`,
            `epsilon` and (when `env` is omitted) `env`.
        n_episodes: Number of episodes to run.
        env: Environment to train in; defaults to `agent.env` so the function
            does not depend on a notebook-global variable.
    """
    if env is None:
        env = agent.env

    for episode in tqdm(range(n_episodes)):
        # Start a new landing
        obs, info = env.reset()
        done = False
        episode_return = 0.0

        while not done:
            # Agent chooses action (initially random, gradually more intelligent)
            action = agent.select_action(obs, env.action_space, agent.epsilon)

            # Take action and observe result
            next_obs, reward, terminated, truncated, info = env.step(action)

            # Learn from this experience
            agent.update(obs, action, reward, terminated, next_obs)
            episode_return += float(reward)

            # Move to next state
            done = terminated or truncated
            obs = next_obs

        # Report the whole episode's return rather than only the last step's reward.
        if episode % 1000 == 0:
            print("Reward:", episode_return)

        # Reduce exploration rate (agent becomes less random over time)
        agent.decay_epsilon()

In [None]:
# Training hyperparameters
learning_rate = 0.01        # @param How fast to learn (higher = faster but less stable)
n_episodes = 50_000        # @param Number of landings to practice (may need to increase this)
start_epsilon = 1.0         # @param Start with 100% random actions
epsilon_decay = 0.997 # @param Reduce exploration over time
min_epsilon = 0.1         # @param Always keep some exploration

# Create environment and agent
# The statistics wrapper is given a buffer sized to hold one entry per training episode.
env = make_env(ENV_ID, SEED, render_mode=None)
env = gym.wrappers.RecordEpisodeStatistics(env, buffer_length=n_episodes)

# `start_epsilon` / `min_epsilon` are passed as the agent's initial / final epsilon;
# the discount factor keeps the agent's default.
q_agent = LunarLanderAgent(
    env=env,
    learning_rate=learning_rate,
    initial_epsilon=start_epsilon,
    epsilon_decay=epsilon_decay,
    final_epsilon=min_epsilon,
)

In [None]:
# Train the tabular agent created above for `n_episodes` episodes.
train_q_agent(q_agent, n_episodes)

### Visualize your tabular Q-learning lunar lander


In [None]:
# Record one episode of the tabular agent acting with epsilon = 0 and embed the video.
record_agent_video(q_agent)


## B) DQN (Deep Q-Network) with a target network

We train a "policy" Q-network (`DQNModel`) with:
- **Experience Replay** buffer
- **Target Network** (slowly updated towards the policy network)
  - this separate copy of the Q-network is used only to calculate the target Q-values, so the regression targets do not shift with every gradient step taken on the policy network
  - typically more stable to train than a single-network DQN that bootstraps from its own constantly changing estimates
- **ε-greedy** exploration
- **Huber loss** and **AdamW** optimizer

Complete any TODOs.




In [None]:
class DQNModel(nn.Module):
    """Q-network architecture: a small MLP with two hidden ReLU layers.

    For LunarLander:
      - Input: observation vector shape [8]
      - Output: Q-values for each action (shape [n_actions], 4)
    """
    def __init__(self, obs_dim: int, n_actions: int, hidden: int = 128):
        super().__init__()
        self.linear1 = nn.Linear(obs_dim, hidden)
        self.linear2 = nn.Linear(hidden, hidden)
        self.linear3 = nn.Linear(hidden, n_actions)
        self.act_fn = nn.ReLU()

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Return one Q-value per action for every observation in `x`."""
        x = self.act_fn(self.linear1(x))
        x = self.act_fn(self.linear2(x))
        # No activation on the output layer: Q-values are unbounded.
        return self.linear3(x)

    def select_action(self, obs: np.ndarray, action_space, epsilon: float = 0.0) -> int:
        """Map model outputs to a valid discrete action.

        - With probability epsilon, choose a random action.
        - Otherwise, choose argmax Q-value from the model.

        Args:
            obs: A single observation, as an array or tensor of shape
                [obs_dim] or [1, obs_dim].
            action_space: Space exposing `sample()` for exploratory actions.
            epsilon: Probability of taking a random action.

        Returns:
            The chosen action as a Python int.
        """
        if np.random.random() < epsilon:
            return int(action_space.sample())

        # Accept arrays as well as tensors and run on the device the model lives on.
        obs_t = torch.as_tensor(obs, dtype=torch.float32, device=self.linear1.weight.device)
        if obs_t.dim() == 1:
            obs_t = obs_t.unsqueeze(0)
        # Action selection never needs gradients.
        with torch.no_grad():
            q_values = self(obs_t)
        return int(q_values.argmax(dim=-1).item())



In [None]:
from collections import namedtuple

# One environment step as stored in the replay buffer.
Transition = namedtuple(
    "Transition", ["state", "action", "next_state", "reward", "done"]
)

class ReplayMemory:
    """Fixed-capacity FIFO buffer of `Transition`s with uniform random sampling."""

    def __init__(self, capacity):
        # A bounded deque discards the oldest transition once `capacity` is reached.
        self.memory = deque([], maxlen=capacity)

    def push(self, *args):
        """Store one transition; `args` follow the field order of `Transition`."""
        self.memory.append(Transition(*args))

    def sample(self, batch_size):
        """Return a list of transitions.

        When more than `batch_size` transitions are stored, `batch_size` of them
        are drawn uniformly without replacement; otherwise all stored transitions
        are returned in insertion order. The result is always a new list, so
        callers can never mutate the internal buffer through it.
        """
        if batch_size < len(self.memory):
            return random.sample(self.memory, batch_size)
        return list(self.memory)

    def __len__(self):
        return len(self.memory)

In [None]:
from dataclasses import dataclass

@dataclass
class DQNConfig:
  """Hyperparameters read by `dqn_train`."""
  # Number of training episodes.
  num_episodes: int = 500 # @param
  # Discount factor applied to the bootstrapped next-state value.
  gamma: float = 0.99 # @param
  # Optimizer learning rate.
  learning_rate: float = 1e-4 # @param
  # Interpolation weight of the soft target-network update.
  tau: float = 0.005 # @param
  # Number of transitions sampled per optimization step; learning starts once
  # the replay buffer holds at least this many.
  batch_size: int = 128 # @param
  # Initial exploration rate.
  epsilon: float = 1.0 # @param
  # Meant for the epsilon-decay step in `dqn_train`: decay rate and lower bound.
  epsilon_decay: float = 0.995 # @param
  epsilon_min: float = 0.01 # @param
  # Run `evaluate_model` every `eval_interval` episodes, over `eval_episodes` episodes.
  eval_interval: int = 100 # @param
  eval_episodes: int = 5 # @param
  # Meant for gradient-norm clipping of the policy network in `dqn_train`.
  max_grad_norm: float = 10.0 # @param


In [None]:
# def linear_epsilon(step: int, start: float, end: float, decay_steps: int) -> float:
#     if step >= decay_steps:
#         return end
#     return start + (end - start) * (step / float(decay_steps))

@torch.no_grad()
def evaluate_model(model: nn.Module, episodes: int = 5, env_id: str = ENV_ID, seed: int = SEED):
    """Run `episodes` greedy (epsilon = 0) episodes and return their mean return.

    Each episode uses a freshly created environment seeded with `seed + ep`.
    Gradient tracking is disabled for the whole call by the decorator.

    Returns:
        The mean episode return as a float (also printed).
    """
    scores = []
    for ep in range(episodes):
        episode_seed = seed + ep
        env = make_env(env_id, seed=episode_seed, render_mode=None)
        episode_return = 0.0
        obs, _ = env.reset(seed=episode_seed)
        steps_left = MAX_STEPS
        while steps_left > 0:
            steps_left -= 1
            action = select_action(model, to_tensor(obs), env.action_space, epsilon=0.0)
            obs, r, term, trunc, _ = env.step(action)
            episode_return += float(r)
            if term or trunc:
                break
        env.close()
        scores.append(episode_return)
    mean_ret = float(np.mean(scores))
    print(f"Eval over {episodes} episodes — mean return: {mean_ret:.2f}")
    return mean_ret

def _dqn_optimize_step(policy_net, target_net, optimizer, criterion, transitions, cfg):
    """Take one clipped gradient step on `policy_net` from a batch of transitions; return the loss."""
    states, actions, next_states, rewards, dones = zip(*transitions)

    states_batch = torch.cat(states)            # [B, obs_dim]
    next_states_batch = torch.cat(next_states)  # [B, obs_dim]
    actions_batch = torch.cat(actions)          # [B, 1], int64
    rewards_batch = torch.cat(rewards)          # [B], float32
    dones_batch = torch.tensor(dones, dtype=torch.bool, device=DEVICE)  # [B]

    # The target network evaluates the next state; no gradient flows through the target.
    # Terminal transitions contribute only their immediate reward.
    with torch.no_grad():
        next_q = target_net(next_states_batch).max(-1)[0]
        q_target = rewards_batch + cfg.gamma * next_q * (~dones_batch)

    # Q-value the policy network currently assigns to the action that was taken.
    q_policy = policy_net(states_batch).gather(1, actions_batch)

    # Huber loss: quadratic for small errors, linear for large ones.
    # The target is reshaped to [B, 1] so it lines up with `q_policy` instead of
    # broadcasting to a [B, B] matrix.
    loss = criterion(q_policy, q_target.unsqueeze(1))

    optimizer.zero_grad()
    loss.backward()
    # In-place gradient norm clipping to stabilize training.
    torch.nn.utils.clip_grad_norm_(policy_net.parameters(), cfg.max_grad_norm)
    optimizer.step()
    return float(loss.item())


def _soft_update(target_net, policy_net, tau):
    """Move every target parameter a fraction `tau` of the way towards the policy parameter."""
    for target_param, main_param in zip(target_net.parameters(), policy_net.parameters()):
        target_param.data.copy_(tau * main_param.data + (1 - tau) * target_param.data)


def dqn_train(cfg: DQNConfig):
    """Train a DQN agent with experience replay and a soft-updated target network.

    Args:
        cfg: Hyperparameters (episodes, discount, learning rate, tau, batch size,
            epsilon schedule, evaluation cadence, gradient clipping norm).

    Returns:
        `(policy_net, target_net, replay_memory, tracker)`. The weights of the
        best single training episode are also written to "best_policy.pth".
    """
    # No rendering during training: a "human" window is unavailable on headless
    # machines such as Colab and slows every step down. make_env seeds the
    # environment once; later resets continue from that seeded generator.
    env = make_env(ENV_ID, SEED, render_mode=None)

    n_observations = env.observation_space.shape[0]
    n_actions = int(env.action_space.n)

    policy_net = DQNModel(n_observations, n_actions).to(DEVICE)
    target_net = DQNModel(n_observations, n_actions).to(DEVICE)
    target_net.load_state_dict(policy_net.state_dict())

    replay_memory = ReplayMemory(10_000)

    optimizer = optim.AdamW(policy_net.parameters(), lr=cfg.learning_rate)
    # smooth l1 loss is implementation of Huber loss (MSE for small errors, L1 for larger errors)
    criterion = nn.SmoothL1Loss()

    epsilon = cfg.epsilon
    best_reward = float("-inf")
    # One tracker for the whole run, so its history covers every episode.
    tracker = RewardTracker()

    try:
        for episode in tqdm(range(cfg.num_episodes)):
            # Re-seeding here would replay the identical start state every episode.
            state, _ = env.reset()
            state = to_tensor(state)
            total_reward = 0.0
            done = False

            while not done:
                action = select_action(policy_net, state, env.action_space, epsilon)
                next_state, reward, terminated, truncated, info = env.step(action)
                done = terminated or truncated
                total_reward += float(reward)

                next_state = to_tensor(next_state)
                # Store `terminated` (not `done`): a time-limit truncation is not a
                # terminal state, so its next-state value must still be bootstrapped.
                replay_memory.push(
                    state,
                    torch.tensor([[action]], dtype=torch.long, device=DEVICE),
                    next_state,
                    torch.tensor([reward], dtype=torch.float32, device=DEVICE),
                    bool(terminated),
                )
                state = next_state

                if len(replay_memory) >= cfg.batch_size:
                    transitions = replay_memory.sample(cfg.batch_size)
                    _dqn_optimize_step(policy_net, target_net, optimizer, criterion, transitions, cfg)

                # Soft update: the target network slowly tracks the policy network.
                _soft_update(target_net, policy_net, cfg.tau)

            tracker.update(total_reward)
            if total_reward > best_reward:
                best_reward = total_reward
                torch.save(policy_net.state_dict(), "best_policy.pth")
            if episode % 25 == 0:
                print(f"Episode {episode}, Reward: {total_reward}, Epsilon: {epsilon:.3f}")
            if episode % cfg.eval_interval == 0:
                evaluate_model(policy_net, episodes=cfg.eval_episodes)

            # Multiplicative epsilon decay with a lower bound.
            epsilon = max(cfg.epsilon_min, epsilon * cfg.epsilon_decay)
    finally:
        env.close()

    return policy_net, target_net, replay_memory, tracker



In [None]:
# The default config should work well, but you are encouraged to try different hyperparameters,
# e.g. increasing `num_episodes` or experimenting with `epsilon_decay`.
cfg = DQNConfig()

In [None]:
# will raise NotImplementedError until you implement the TODOs in DQNModel
# Returns the policy network, the target network, the replay buffer (`rb`) and the reward tracker.
policy_net, target_net, rb, tracker = dqn_train(cfg)

In [None]:
# Load the best checkpoint written by `dqn_train`. `map_location` remaps the stored
# tensors to the current device, so a checkpoint saved on a GPU runtime also loads
# on a CPU-only runtime.
best_policy_net_state = torch.load("best_policy.pth", map_location=DEVICE)
# 8 observation dimensions and 4 actions, matching the LunarLander shapes in the DQNModel docstring.
best_policy_net = DQNModel(8, 4).to(DEVICE)
best_policy_net.load_state_dict(best_policy_net_state)

### Visualize your lunar lander


In [None]:

# print(f"Final moving average: {tracker.moving_avg:.2f}")
# Record one episode of the best saved policy acting with epsilon = 0 and embed the video.
record_agent_video(best_policy_net)
