In [None]:
# SINGLE-CELL PONG DQN + DOUBLE DQN + TUNED DOUBLE DQN
# -------------------------------------------------------------------------------
# 1) Colab: Runtime -> Change runtime type -> Hardware accelerator: GPU
# 2) Run this cell.

!pip install -q gymnasium ale-py "autorom[accept-rom-license]" imageio imageio-ffmpeg

import os
import csv
import random
from collections import deque, namedtuple
from dataclasses import dataclass, asdict
from typing import List, Tuple, Dict, Any

import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import matplotlib.pyplot as plt

import gymnasium as gym
from gymnasium.wrappers import RecordEpisodeStatistics, AtariPreprocessing

import ale_py
gym.register_envs(ale_py)

import imageio.v2 as imageio

# Gymnasium environment id used for every training run and video.
ENV_ID = "PongNoFrameskip-v4"
# Run on the GPU when one is visible to PyTorch, otherwise fall back to CPU.
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Using device:", DEVICE)

# --- Learning hyperparameters (shared by all variants) ---
# Discount factor applied to the bootstrapped next-state value in calc_loss.
GAMMA = 0.99
# Number of transitions sampled from the replay buffer per gradient step.
BATCH_SIZE = 32
# Maximum number of transitions kept in the replay buffer.
REPLAY_SIZE = 50_000
# Gradient updates only start once the buffer holds this many transitions.
REPLAY_START_SIZE = 10_000
# Adam learning rate.
LEARNING_RATE = 1e-4
# The target network is overwritten with the online weights every this many steps.
SYNC_TARGET_FRAMES = 1_000

# --- Epsilon-greedy schedule: linear decay from EPS_START, floored at EPS_FINAL ---
EPS_START = 1.0
EPS_FINAL = 0.02
# Number of steps over which epsilon would decay from 1.0 to 0.0 in the baseline runs.
EPS_DECAY_FRAMES_BASE = 1_000_000

# --- Training budgets, counted in agent steps (one env.step call each) ---
MAX_FRAMES_DQN_BASE = 400_000
MAX_FRAMES_DDQN_BASE = 400_000

# Presumably consumed by the tuned Double DQN configuration (its definition is
# not fully visible here): a longer budget with a much faster epsilon decay.
MAX_FRAMES_DDQN_TUNED = 600_000
EPS_DECAY_FRAMES_TUNED = 300_000

# Default cap on the number of steps rendered into a recorded video.
VIDEO_MAX_STEPS = 5_000

# All models, plots, videos and logs are written below this directory.
os.makedirs("outputs", exist_ok=True)

@dataclass
class ExperimentConfig:
    """Settings that distinguish one training run from another.

    Everything not listed here (learning rate, batch size, ...) comes from the
    module-level constants and is shared by all runs.
    """
    # Prefix for log lines and for the files written under outputs/.
    run_name: str
    # Human-readable algorithm label; printed at start-up and stored in the log.
    variant_name: str
    # True -> Double DQN target (online net picks the action, target net scores it);
    # False -> plain DQN target (max over the target net).
    use_ddqn: bool
    # Number of agent steps to train for.
    max_frames: int
    # Denominator of the linear epsilon decay (steps to go from EPS_START to 0).
    eps_decay_frames: int
    # Optional free-form note; printed at start-up when non-empty and stored in the log.
    comment: str = ""

# One summary dict per finished run; train_variant appends to this list.
experiment_logs: List[Dict[str, Any]] = []

class FrameStack(gym.Wrapper):
    """Observation wrapper that returns the last ``num_stack`` frames stacked.

    The stacked observation gains a new leading axis of length ``num_stack``;
    the oldest frame comes first and the newest last.
    """

    def __init__(self, env, num_stack):
        super().__init__(env)
        self.num_stack = num_stack
        # Bounded deque: appending a new frame silently drops the oldest one.
        self.frames = deque(maxlen=num_stack)

        # Build the stacked space by repeating the wrapped env's bounds along
        # a new leading axis.
        base_space = env.observation_space
        stacked_low = np.repeat(base_space.low[None, ...], num_stack, axis=0)
        stacked_high = np.repeat(base_space.high[None, ...], num_stack, axis=0)
        self.observation_space = gym.spaces.Box(
            low=stacked_low,
            high=stacked_high,
            dtype=base_space.dtype,
        )

    def _get_obs(self):
        """Return the buffered frames as one array, oldest frame first."""
        return np.stack(self.frames, axis=0)

    def reset(self, **kwargs):
        """Reset the wrapped env and fill the whole stack with the first frame."""
        first_frame, info = self.env.reset(**kwargs)
        self.frames.extend([first_frame] * self.num_stack)
        return self._get_obs(), info

    def step(self, action):
        """Step the wrapped env and push the new frame onto the stack."""
        frame, reward, terminated, truncated, info = self.env.step(action)
        self.frames.append(frame)
        stacked = self._get_obs()
        return stacked, reward, terminated, truncated, info

def make_env(env_id: str) -> gym.Env:
    """Create the Atari environment with the wrapper chain used throughout.

    Wrappers are applied innermost first: episode statistics, Atari
    preprocessing (up to 30 no-ops on reset, frame skip 4, 84x84 grayscale,
    unscaled pixel values), then a stack of the 4 most recent frames.
    ``render_mode="rgb_array"`` lets callers grab frames via ``env.render()``.
    """
    base_env = gym.make(env_id, render_mode="rgb_array")
    with_stats = RecordEpisodeStatistics(base_env)
    preprocessed = AtariPreprocessing(
        with_stats,
        noop_max=30,
        frame_skip=4,
        screen_size=84,
        grayscale_obs=True,
        scale_obs=False,
    )
    return FrameStack(preprocessed, num_stack=4)

# One stored transition: (s, a, r, done, s').
Experience = namedtuple(
    "Experience",
    ("state", "action", "reward", "done", "next_state"),
)


class ReplayBuffer:
    """Fixed-capacity FIFO store of ``Experience`` tuples with uniform sampling."""

    def __init__(self, capacity: int):
        # A bounded deque evicts the oldest transition once capacity is reached.
        self.buffer = deque(maxlen=capacity)

    def push(self, *args):
        """Append one transition; ``args`` are the ``Experience`` fields in order."""
        transition = Experience(*args)
        self.buffer.append(transition)

    def sample(self, batch_size: int) -> List[Experience]:
        """Return ``batch_size`` distinct transitions drawn uniformly at random.

        Raises ``ValueError`` (from NumPy) when fewer than ``batch_size``
        transitions are stored, because sampling is without replacement.
        """
        stored = len(self.buffer)
        picks = np.random.choice(stored, batch_size, replace=False)
        return [self.buffer[pos] for pos in picks]

    def __len__(self):
        return len(self.buffer)

def batch_to_tensors(batch: List[Experience], device: torch.device):
    """Collate a list of transitions into batched tensors on ``device``.

    Returns a 5-tuple ``(states, actions, rewards, dones, next_states)``.
    Actions are int64, rewards float32 and dones bool; the state arrays keep
    whatever dtype the stored observations have.
    """
    # Transpose the list of namedtuples into one sequence per field
    # (field order: state, action, reward, done, next_state).
    state_col, action_col, reward_col, done_col, next_state_col = zip(*batch)

    arrays = (
        np.stack(state_col),
        np.array(action_col, dtype=np.int64),
        np.array(reward_col, dtype=np.float32),
        np.array(done_col, dtype=np.bool_),
        np.stack(next_state_col),
    )
    return tuple(torch.as_tensor(arr, device=device) for arr in arrays)

class DQN(nn.Module):
    """Convolutional Q-network: three conv layers followed by two linear layers.

    ``input_shape`` is ``(channels, height, width)``; the network outputs one
    Q-value per action. Raw pixel inputs are divided by 255 inside ``forward``,
    so callers pass unscaled observations.
    """

    def __init__(self, input_shape: Tuple[int, int, int], n_actions: int):
        super().__init__()
        channels, height, width = input_shape

        feature_layers = [
            nn.Conv2d(channels, 32, kernel_size=8, stride=4),
            nn.ReLU(),
            nn.Conv2d(32, 64, kernel_size=4, stride=2),
            nn.ReLU(),
            nn.Conv2d(64, 64, kernel_size=3, stride=1),
            nn.ReLU(),
        ]
        self.conv = nn.Sequential(*feature_layers)

        # Push a dummy input through the conv stack to discover how many
        # features the first linear layer must accept.
        with torch.no_grad():
            probe = torch.zeros(1, channels, height, width)
            n_features = self.conv(probe).flatten(1).shape[1]

        head_layers = [
            nn.Linear(n_features, 512),
            nn.ReLU(),
            nn.Linear(512, n_actions),
        ]
        self.fc = nn.Sequential(*head_layers)

    def forward(self, x):
        """Return Q-values of shape ``(batch, n_actions)`` for a batch of observations."""
        scaled = x.float().div(255.0)
        features = self.conv(scaled).flatten(start_dim=1)
        return self.fc(features)

class Agent:
    """Steps an environment with an epsilon-greedy policy and records transitions.

    The agent keeps the current observation and the running return of the
    episode in progress, and pushes every transition into ``buffer``.
    """

    def __init__(self, env: gym.Env, buffer: ReplayBuffer):
        self.env = env
        self.buffer = buffer
        self.state, _ = self.env.reset()
        self.total_reward = 0.0

    def reset(self):
        """Start a new episode and clear the running return."""
        self.state, _ = self.env.reset()
        self.total_reward = 0.0

    @torch.no_grad()
    def play_step(self, net: DQN, device: torch.device, epsilon: float):
        """Take one environment step and store the transition.

        With probability ``epsilon`` a random action is sampled; otherwise the
        action with the highest Q-value under ``net`` is taken.

        Returns:
            The total reward of the episode if this step ended it (by
            termination or truncation), otherwise ``None``.
        """
        if random.random() < epsilon:
            action = self.env.action_space.sample()
        else:
            q = net(torch.as_tensor(self.state, device=device).unsqueeze(0))
            action = int(q.argmax(dim=1).item())

        next_state, reward, terminated, truncated, _ = self.env.step(action)

        self.total_reward += reward
        # Store only `terminated` as the done flag. The flag is used to zero
        # the bootstrapped next-state value in the TD target, which is correct
        # for a true terminal state but not for a time-limit truncation, where
        # the episode would have continued and next_state still has value.
        self.buffer.push(self.state, action, reward, terminated, next_state)
        self.state = next_state

        # The episode is over for the purposes of stepping/resetting in
        # either case.
        if terminated or truncated:
            episode_return = self.total_reward
            self.reset()
            return episode_return
        return None

def calc_loss(batch, net, tgt_net, device, use_ddqn):
    """Compute the mean-squared TD error for a batch of transitions.

    Args:
        batch: list of ``Experience`` tuples.
        net: online network being trained.
        tgt_net: target network used to evaluate next-state values.
        device: device the batch tensors are created on.
        use_ddqn: if true, the online network selects the next action and the
            target network scores it (Double DQN); otherwise the target is the
            maximum target-network Q-value (DQN).

    Returns:
        Scalar loss tensor with gradients flowing into ``net`` only.
    """
    states, actions, rewards, dones, next_states = batch_to_tensors(batch, device)

    # Q(s, a) of the online network for the actions that were actually taken.
    chosen_q = net(states).gather(1, actions.unsqueeze(-1)).squeeze(-1)

    # The target is treated as a constant: no gradients are tracked here.
    with torch.no_grad():
        if use_ddqn:
            greedy_actions = net(next_states).argmax(dim=1, keepdim=True)
            bootstrap = tgt_net(next_states).gather(1, greedy_actions).squeeze(-1)
        else:
            bootstrap = tgt_net(next_states).max(dim=1).values

        # No future value after a transition flagged as done.
        bootstrap = bootstrap.masked_fill(dones, 0.0)
        td_target = rewards + GAMMA * bootstrap

    return nn.functional.mse_loss(chosen_q, td_target)

def train_variant(config: ExperimentConfig):
    """Train one agent as described by ``config`` and persist its artifacts.

    The loop runs for ``config.max_frames`` agent steps (one ``env.step`` call
    each). Epsilon decays linearly from ``EPS_START`` with slope
    ``1 / config.eps_decay_frames`` and is floored at ``EPS_FINAL``. Gradient
    updates start once the replay buffer holds ``REPLAY_START_SIZE``
    transitions, and the target network is synced every
    ``SYNC_TARGET_FRAMES`` steps after that.

    Side effects:
        * writes ``outputs/<run_name>_net.pth`` (online network state dict);
        * writes ``outputs/<run_name>_learning_curve.png`` if at least one
          episode finished;
        * appends a summary dict to the module-level ``experiment_logs``.

    Returns:
        ``(model_path, final_mean_reward)`` where ``final_mean_reward`` is the
        mean return over the last (up to) 100 finished episodes, or NaN when
        no episode finished.
    """
    print(f"\n=== Starting run: {config.run_name} ({config.variant_name}) ===")
    if config.comment:
        print("  note:", config.comment)

    env = make_env(ENV_ID)
    rewards_history = []
    mean_rewards = []

    # try/finally: release the emulator even if training raises or the cell is
    # interrupted, so repeated runs in one kernel do not leak environments.
    try:
        n_actions = env.action_space.n
        obs_shape = env.observation_space.shape

        buffer = ReplayBuffer(REPLAY_SIZE)
        agent = Agent(env, buffer)

        net = DQN(obs_shape, n_actions).to(DEVICE)
        tgt_net = DQN(obs_shape, n_actions).to(DEVICE)
        # Start with identical online and target weights.
        tgt_net.load_state_dict(net.state_dict())

        optimizer = optim.Adam(net.parameters(), lr=LEARNING_RATE)

        frame_idx = 0
        while frame_idx < config.max_frames:
            frame_idx += 1
            epsilon = max(EPS_FINAL, EPS_START - frame_idx / config.eps_decay_frames)

            # play_step returns the episode return only on the step that ends
            # an episode, and None otherwise.
            reward = agent.play_step(net, DEVICE, epsilon)
            if reward is not None:
                rewards_history.append(reward)
                mean_r = np.mean(rewards_history[-100:])
                mean_rewards.append((frame_idx, mean_r))
                if len(rewards_history) % 10 == 0:
                    print(f"[{config.run_name}] frame {frame_idx:7d} | eps={epsilon:.3f} | R={reward:6.1f} | mean100={mean_r:6.2f}")

            # Warm-up: only collect experience until the buffer is large enough.
            if len(buffer) < REPLAY_START_SIZE:
                continue

            batch = buffer.sample(BATCH_SIZE)
            loss_t = calc_loss(batch, net, tgt_net, DEVICE, config.use_ddqn)
            optimizer.zero_grad()
            loss_t.backward()
            optimizer.step()

            if frame_idx % SYNC_TARGET_FRAMES == 0:
                tgt_net.load_state_dict(net.state_dict())
    finally:
        env.close()

    model_path = os.path.join("outputs", f"{config.run_name}_net.pth")
    torch.save(net.state_dict(), model_path)
    print(f"[{config.run_name}] Saved model to {model_path}")

    final_mean_reward = float("nan")
    if mean_rewards:
        frames, means = zip(*mean_rewards)
        final_mean_reward = float(means[-1])

        fig, ax = plt.subplots(figsize=(8, 4))
        ax.plot(frames, means)
        ax.set_xlabel("Frames")
        ax.set_ylabel("Mean reward (100 ep)")
        ax.grid(True)
        curve_path = os.path.join("outputs", f"{config.run_name}_learning_curve.png")
        fig.savefig(curve_path, bbox_inches="tight")
        # Close explicitly so figures do not accumulate across runs.
        plt.close(fig)

    log_entry = {
        "run_name": config.run_name,
        "variant_name": config.variant_name,
        "use_ddqn": config.use_ddqn,
        "max_frames": config.max_frames,
        "eps_decay_frames": config.eps_decay_frames,
        "gamma": GAMMA,
        "batch_size": BATCH_SIZE,
        "replay_size": REPLAY_SIZE,
        "target_sync_frames": SYNC_TARGET_FRAMES,
        "learning_rate": LEARNING_RATE,
        "final_mean_reward": final_mean_reward,
        "comment": config.comment,
    }
    experiment_logs.append(log_entry)

    return model_path, final_mean_reward

def record_video_random(env_id: str, filename: str, max_steps: int = VIDEO_MAX_STEPS):
    """Record one episode of a uniformly random policy to ``outputs/<filename>``.

    A frame is rendered before every action. Recording stops when the episode
    terminates or is truncated, or after ``max_steps`` steps, whichever comes
    first. The video is written at 30 fps.
    """
    env = make_env(env_id)
    frames = []
    # try/finally: release the emulator even if stepping or rendering fails.
    try:
        env.reset()
        for _ in range(max_steps):
            frames.append(env.render())
            action = env.action_space.sample()
            _, _, terminated, truncated, _ = env.step(action)
            if terminated or truncated:
                break
    finally:
        env.close()

    path = os.path.join("outputs", filename)
    imageio.mimsave(path, frames, fps=30)
    print(f"[VIDEO] Saved random video to {path}")

@torch.no_grad()
def record_video_agent(env_id: str, model_path: str, filename: str, epsilon: float = 0.05, max_steps: int = VIDEO_MAX_STEPS):
    """Record one episode of a trained agent to ``outputs/<filename>``.

    The network weights are loaded from ``model_path`` (a state dict saved with
    ``torch.save``). Actions are epsilon-greedy with the given ``epsilon``. A
    frame is rendered before every action; recording stops when the episode
    terminates or is truncated, or after ``max_steps`` steps. The video is
    written at 30 fps.
    """
    env = make_env(env_id)
    frames = []
    # try/finally: release the emulator even if loading or stepping fails.
    try:
        n_actions = env.action_space.n
        obs_shape = env.observation_space.shape

        net = DQN(obs_shape, n_actions).to(DEVICE)
        # weights_only=True restricts unpickling to tensors and plain
        # containers, which is all a state dict needs.
        state_dict = torch.load(model_path, map_location=DEVICE, weights_only=True)
        net.load_state_dict(state_dict)
        net.eval()

        state, _ = env.reset()
        for _ in range(max_steps):
            frames.append(env.render())
            if random.random() < epsilon:
                a = env.action_space.sample()
            else:
                q = net(torch.as_tensor(state, device=DEVICE).unsqueeze(0))
                a = int(q.argmax(dim=1).item())

            state, _, terminated, truncated, _ = env.step(a)
            if terminated or truncated:
                break
    finally:
        env.close()

    path = os.path.join("outputs", filename)
    imageio.mimsave(path, frames, fps=30)
    # Report the output path, mirroring record_video_random.
    print(f"[VIDEO] Saved agent video to {path}")

# Experiment configurations: one ExperimentConfig per training run.

# Baseline DQN. Note that the epsilon decay horizon (EPS_DECAY_FRAMES_BASE =
# 1,000,000) is longer than the training budget (400,000 steps), so with the
# linear schedule used in training epsilon only falls to 0.6 by the end of the run.
dqn_baseline_cfg = ExperimentConfig(
    run_name="pong_dqn_baseline",
    variant_name="DQN",
    use_ddqn=False,
    max_frames=MAX_FRAMES_DQN_BASE,
    eps_decay_frames=EPS_DECAY_FRAMES_BASE,
    comment="Baseline DQN"
)

# Baseline Double DQN: identical budget and epsilon schedule to the DQN
# baseline; only the target computation differs (use_ddqn=True).
ddqn_baseline_cfg = ExperimentConfig(
    run_name="pong_ddqn_baseline",
    variant_name="Double DQN",
    use_ddqn=True,
    max_frames=MAX_FRAMES_DDQN_BASE,
    eps_decay_frames=EPS_DECAY_FRAMES_BASE,
    comment="Baseline DDQN"
)

ddqn_tuned_cfg = ExperimentConfig(
    run_name="pong_ddqn_tuned",
    variant_name="Double D


[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/434.7 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m434.7/434.7 kB[0m [31m31.8 MB/s[0m eta [36m0:00:00[0m
[?25h  Installing build dependencies ... [?25l[?25hdone
  Getting requirements to build wheel ... [?25l[?25hdone
  Preparing metadata (pyproject.toml) ... [?25l[?25hdone
  Building wheel for AutoROM.accept-rom-license (pyproject.toml) ... [?25l[?25hdone
Using device: cuda




[VIDEO] Saved random video to outputs/pong_random.mp4

=== Starting run: pong_dqn_baseline (DQN) ===
  use_ddqn=False, max_frames=400000, eps_decay_frames=1000000
  note: Baseline DQN from starter: long epsilon decay, 400k frames.
[pong_dqn_baseline] frame    8857 | eps=0.991 | ep   10 | R= -21.0 | mean_100=-20.70
[pong_dqn_baseline] frame   17862 | eps=0.982 | ep   20 | R= -19.0 | mean_100=-20.60
[pong_dqn_baseline] frame   27917 | eps=0.972 | ep   30 | R= -20.0 | mean_100=-20.23
[pong_dqn_baseline] frame   37576 | eps=0.962 | ep   40 | R= -20.0 | mean_100=-20.18
[pong_dqn_baseline] frame   47106 | eps=0.953 | ep   50 | R= -21.0 | mean_100=-20.14
[pong_dqn_baseline] frame   56568 | eps=0.943 | ep   60 | R= -21.0 | mean_100=-20.22
[pong_dqn_baseline] frame   66169 | eps=0.934 | ep   70 | R= -20.0 | mean_100=-20.20
[pong_dqn_baseline] frame   75492 | eps=0.925 | ep   80 | R= -21.0 | mean_100=-20.16
[pong_dqn_baseline] frame   84732 | eps=0.915 | ep   90 | R= -20.0 | mean_100=-20.21
[pon



[VIDEO] Saved agent video to outputs/pong_dqn_baseline.mp4




[VIDEO] Saved agent video to outputs/pong_ddqn_baseline.mp4




[VIDEO] Saved agent video to outputs/pong_ddqn_tuned.mp4

Saved experiment log to outputs/experiment_log.csv

=== Final mean rewards (approx. over last 100 episodes) ===
pong_dqn_baseline     : -18.89
pong_ddqn_baseline    : -18.69
pong_ddqn_tuned       :  18.64

Done. Check the 'outputs/' folder in the Colab file browser.


In [None]:
import shutil
from google.colab import files

# Zip the whole outputs/ directory into pong_project_outputs.zip in the
# current working directory.
shutil.make_archive('pong_project_outputs', 'zip', 'outputs')

# Colab-only: trigger a browser download of the archive created above.
files.download('pong_project_outputs.zip')

Zipping files...
Downloading...


<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>