<a href="https://colab.research.google.com/github/appababba/DQN-on-New-Atari-Domains/blob/main/Copy_of_c166f25_02b_dqn_pong.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Test 3 --- Final Test for 06.11.2025

In [9]:
!pip install gymnasium[atari,accept-rom-license]
!pip install autorom
!pip install stable-baselines3



In [10]:
!AutoROM --accept-license

AutoROM will download the Atari 2600 ROMs.
They will be installed to:
	/usr/local/lib/python3.12/dist-packages/AutoROM/roms

Existing ROMs will be overwritten.


# Install the Gym

In [11]:
import ale_py
import gymnasium as gym

# Configure the model save drive

In [12]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [13]:
import os
save_dir = "/content/drive/MyDrive/PUBLIC/Models"
os.makedirs(save_dir, exist_ok=True)

# Now Model

In [14]:
from dataclasses import dataclass
import argparse
import time
from datetime import datetime
import numpy as np
import collections
import typing as tt

import torch
import torch.nn as nn
import torch.optim as optim

from torch.utils.tensorboard.writer import SummaryWriter

In [15]:
#dqn_model
class DQN(nn.Module):
    def __init__(self, input_shape, n_actions):
        super(DQN, self).__init__()

        self.conv = nn.Sequential(
            nn.Conv2d(input_shape[0], 32, kernel_size=8, stride=4),
            nn.ReLU(),
            nn.Conv2d(32, 64, kernel_size=4, stride=2),
            nn.ReLU(),
            nn.Conv2d(64, 64, kernel_size=3, stride=1),
            nn.ReLU(),
            nn.Flatten(),
        )
        size = self.conv(torch.zeros(1, *input_shape)).size()[-1]
        self.fc = nn.Sequential(
            nn.Linear(size, 512),
            nn.ReLU(),
            nn.Linear(512, n_actions)
        )
    def forward(self, x: torch.ByteTensor):
        x = x.float() / 255.0
        return self.fc(self.conv(x))

In [16]:
#wrappers

from gymnasium import spaces
from stable_baselines3.common import atari_wrappers


class ImageToPyTorch(gym.ObservationWrapper):
    """
    ImageToPyTorch: Reorders image dimensions from (H, W, C) to (C, H, W)
    for compatibility with PyTorch convolutional layers.
    """
    def __init__(self, env):
        super(ImageToPyTorch, self).__init__(env)
        obs = self.observation_space
        assert isinstance(obs, gym.spaces.Box)
        assert len(obs.shape) == 3
        new_shape = (obs.shape[-1], obs.shape[0], obs.shape[1])
        self.observation_space = gym.spaces.Box(
            low=obs.low.min(), high=obs.high.max(),
            shape=new_shape, dtype=obs.dtype)

    def observation(self, observation):
        return np.moveaxis(observation, 2, 0)


class BufferWrapper(gym.ObservationWrapper):
    """
    BufferWrapper: Maintains a rolling window of the last `n_steps` frames
    to give the agent a sense of temporal context.
    """
    def __init__(self, env, n_steps):
        super(BufferWrapper, self).__init__(env)
        obs = env.observation_space
        assert isinstance(obs, spaces.Box)
        new_obs = gym.spaces.Box(
            obs.low.repeat(n_steps, axis=0), obs.high.repeat(n_steps, axis=0),
            dtype=obs.dtype)
        self.observation_space = new_obs
        self.buffer = collections.deque(maxlen=n_steps)

    def reset(self, *, seed: tt.Optional[int] = None, options: tt.Optional[dict[str, tt.Any]] = None):
        for _ in range(self.buffer.maxlen):
            self.buffer.append(np.zeros_like(self.env.observation_space.low))
        obs, extra = self.env.reset()
        return self.observation(obs), extra

    def observation(self, observation: np.ndarray) -> np.ndarray:
        self.buffer.append(observation)
        return np.concatenate(self.buffer)


def make_env(env_name: str, n_steps=4, render_mode=None, **kwargs):
    print(f"Creating environment {env_name}")
    env = gym.make(env_name, render_mode=render_mode, **kwargs)
    env = atari_wrappers.AtariWrapper(env, clip_reward=False, noop_max=0)
    env = QbertRewardWrapper(env, life_loss_penalty=-0.2, clip=True)
    env = ImageToPyTorch(env)
    env = BufferWrapper(env, n_steps=n_steps)
    return env

Gym has been unmaintained since 2022 and does not support NumPy 2.0 amongst other critical functionality.
Please upgrade to Gymnasium, the maintained drop-in replacement of Gym, or contact the authors of your software and request that they upgrade.
See the migration guide at https://gymnasium.farama.org/introduction/migration_guide/ for additional information.
  return datetime.utcnow().replace(tzinfo=utc)


In [17]:
# Base Configuration
DEFAULT_ENV_NAME = "ALE/Qbert-v5"
MEAN_REWARD_BOUND = 19

GAMMA = 0.99
BATCH_SIZE = 32
REPLAY_SIZE = 10000
LEARNING_RATE = 1e-4
SYNC_TARGET_FRAMES = 1000
REPLAY_START_SIZE = 10000

SAVE_EPSILON = 0.5  # Only save if at least this much better
EPSILON_DECAY_LAST_FRAME = 150000
EPSILON_START = 1.0
EPSILON_FINAL = 0.01

# Tuple of tensors returned from a sampled minibatch in replay buffer
State = np.ndarray
Action = int
BatchTensors = tt.Tuple[
    torch.ByteTensor,           # current state
    torch.LongTensor,           # actions
    torch.Tensor,               # rewards
    torch.BoolTensor,           # done || trunc
    torch.ByteTensor            # next state
]

In [18]:
# ⚙️ Fast Training Config for Quick Test Run
MEAN_REWARD_BOUND = 5
REPLAY_START_SIZE = 20_000
EPSILON_DECAY_LAST_FRAME = 100_000
SYNC_TARGET_FRAMES = 500

# REPLAY_SIZE = 5000  # optional
# BATCH_SIZE = 16     # optional

In [19]:
import os
from pathlib import Path

# Define directories
save_dir_drive = "/content/drive/MyDrive/PUBLIC/Models"
save_dir_local = "saved_models"

# Create both directories if they don't exist
os.makedirs(save_dir_drive, exist_ok=True)
os.makedirs(save_dir_local, exist_ok=True)

# Safe model filename
env_name = DEFAULT_ENV_NAME
safe_env_name = env_name.replace("/", "_")

In [20]:
@dataclass
class Experience:
    state: State
    action: Action
    reward: float
    done_trunc: bool
    new_state: State


class ExperienceBuffer:
    def __init__(self, capacity: int):
        self.buffer = collections.deque(maxlen=capacity)

    def __len__(self):
        return len(self.buffer)

    def append(self, experience: Experience):
        self.buffer.append(experience)

    def sample(self, batch_size: int) -> tt.List[Experience]:
        indices = np.random.choice(len(self), batch_size, replace=False)
        return [self.buffer[idx] for idx in indices]

In [21]:
class Agent:
    def __init__(self, env: gym.Env, exp_buffer: ExperienceBuffer):
        self.env = env
        self.exp_buffer = exp_buffer
        self.state: tt.Optional[np.ndarray] = None
        self._reset()

    def _reset(self):
        self.state, _ = self.env.reset()
        self.total_reward = 0.0

    @torch.no_grad()
    def play_step(self, net: DQN, device: torch.device,
                  epsilon: float = 0.0) -> tt.Optional[float]:
        done_reward = None

        if np.random.random() < epsilon:
            action = env.action_space.sample()
        else:
            state_v = torch.as_tensor(self.state).to(device)
            state_v.unsqueeze_(0)
            q_vals_v = net(state_v)
            _, act_v = torch.max(q_vals_v, dim=1)
            action = int(act_v.item())

        # do step in the environment
        new_state, reward, is_done, is_tr, _ = self.env.step(action)
        self.total_reward += reward

        exp = Experience(
            state=self.state, action=action, reward=float(reward),
            done_trunc=is_done or is_tr, new_state=new_state
        )
        self.exp_buffer.append(exp)
        self.state = new_state
        if is_done or is_tr:
            done_reward = self.total_reward
            self._reset()
        return done_reward

In [22]:
def batch_to_tensors(batch: tt.List[Experience], device: torch.device) -> BatchTensors:
    states, actions, rewards, dones, new_state = [], [], [], [], []
    for e in batch:
        states.append(e.state)
        actions.append(e.action)
        rewards.append(e.reward)
        dones.append(e.done_trunc)
        new_state.append(e.new_state)
    states_t = torch.as_tensor(np.asarray(states))
    actions_t = torch.LongTensor(actions)
    rewards_t = torch.FloatTensor(rewards)
    dones_t = torch.BoolTensor(dones)
    new_states_t = torch.as_tensor(np.asarray(new_state))
    return states_t.to(device), actions_t.to(device), rewards_t.to(device), \
           dones_t.to(device),  new_states_t.to(device)

In [23]:
def calc_loss(batch: tt.List[Experience], net: DQN, tgt_net: DQN,
              device: torch.device) -> torch.Tensor:
    states_t, actions_t, rewards_t, dones_t, new_states_t = batch_to_tensors(batch, device)

    state_action_values = net(states_t).gather(
        1, actions_t.unsqueeze(-1)
    ).squeeze(-1)
    with torch.no_grad():
        next_state_values = tgt_net(new_states_t).max(1)[0]
        next_state_values[dones_t] = 0.0
        next_state_values = next_state_values.detach()

    expected_state_action_values = next_state_values * GAMMA + rewards_t
    return nn.MSELoss()(state_action_values, expected_state_action_values)

In [24]:
class QbertRewardWrapper(gym.Wrapper):
    """
    Clips rewards to [-1, 1] and applies a small penalty when a life is lost.
    Works with ALE Atari envs.
    """
    def __init__(self, env, life_loss_penalty=-0.2, clip=True):
        super().__init__(env)
        self.clip = clip
        self.life_loss_penalty = life_loss_penalty
        self._last_lives = None

    def reset(self, **kwargs):
        obs, info = self.env.reset(**kwargs)
        self._last_lives = self._get_lives()
        return obs, info

    def step(self, action):
        obs, reward, terminated, truncated, info = self.env.step(action)

        # reward clipping
        if self.clip:
            if reward > 0:
                reward = 1.0
            elif reward < 0:
                reward = -1.0
            else:
                reward = 0.0

        # life-loss detection -> small penalty
        lives = self._get_lives()
        if self._last_lives is not None and lives < self._last_lives:
            reward += self.life_loss_penalty
        self._last_lives = lives

        return obs, reward, terminated, truncated, info

    def _get_lives(self):
        # Try the info path first; fall back to ALE interface
        try:
            return self.env.unwrapped.ale.lives()
        except Exception:
            # Not available — return None, no penalty will fire
            return None


In [25]:
model_comment = f"test_epsdec{EPSILON_DECAY_LAST_FRAME}_rs{REPLAY_START_SIZE}_sync{SYNC_TARGET_FRAMES}"
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

env = make_env(env_name)
net = DQN(env.observation_space.shape, env.action_space.n).to(device)
tgt_net = DQN(env.observation_space.shape, env.action_space.n).to(device)
writer = SummaryWriter(comment=f"-{env_name}-{model_comment}")
print(net)

buffer = ExperienceBuffer(REPLAY_SIZE)
agent = Agent(env, buffer)
epsilon = EPSILON_START

optimizer = optim.Adam(net.parameters(), lr=LEARNING_RATE)
total_rewards = []
frame_idx = 0
ts_frame = 0
ts = time.time()
best_m_reward = None

start_time = time.time()
while True:
    frame_idx += 1
    epsilon = max(EPSILON_FINAL, EPSILON_START - frame_idx / EPSILON_DECAY_LAST_FRAME)

    reward = agent.play_step(net, device, epsilon)
    if reward is not None:
        total_rewards.append(reward)
        speed = (frame_idx - ts_frame) / (time.time() - ts)
        elapsed = time.time() - start_time  # in seconds
        ts_frame = frame_idx
        ts = time.time()
        m_reward = np.mean(total_rewards[-100:])
        #  print(f"{frame_idx}: done {len(total_rewards)} games, reward {m_reward:.3f}, "
        #      f"eps {epsilon:.2f}, speed {speed:.2f} f/s, time {elapsed/60:.1f} min")
        writer.add_scalar("epsilon", epsilon, frame_idx)
        writer.add_scalar("speed", speed, frame_idx)
        writer.add_scalar("reward_100", m_reward, frame_idx)
        writer.add_scalar("reward", reward, frame_idx)
        if best_m_reward is None or m_reward > best_m_reward + SAVE_EPSILON:
            print(f"{frame_idx}: done {len(total_rewards)} games, reward {m_reward:.3f}, "
                f"eps {epsilon:.2f}, speed {speed:.2f} f/s, time {elapsed/60:.1f} min")
            timestamp = datetime.now().strftime("%Y%m%d-%H%M")
            model_filename = f"{safe_env_name}-best_{int(m_reward)}-{timestamp}-{model_comment}.dat"

            # Save to both paths
            model_path_drive = os.path.join(save_dir_drive, model_filename)
            model_path_local = os.path.join(save_dir_local, model_filename)

            torch.save(net.state_dict(), model_path_drive)
            torch.save(net.state_dict(), model_path_local)

            print(f"💾 Model saved to:\n - Google Drive: {model_path_drive}\n - Local:        {model_path_local}")
            if best_m_reward is not None:
                print(f"Best reward updated {best_m_reward:.3f} -> {m_reward:.3f}")
            best_m_reward = m_reward
        if m_reward > MEAN_REWARD_BOUND:
            print("Solved in %d frames!" % frame_idx)
            break
    if len(buffer) < REPLAY_START_SIZE:
        continue
    if frame_idx % SYNC_TARGET_FRAMES == 0:
        tgt_net.load_state_dict(net.state_dict())

    optimizer.zero_grad()
    batch = buffer.sample(BATCH_SIZE)
    loss_t = calc_loss(batch, net, tgt_net, device)
    loss_t.backward()
    optimizer.step()
env.close()
writer.close()

Creating environment ALE/Qbert-v5
DQN(
  (conv): Sequential(
    (0): Conv2d(4, 32, kernel_size=(8, 8), stride=(4, 4))
    (1): ReLU()
    (2): Conv2d(32, 64, kernel_size=(4, 4), stride=(2, 2))
    (3): ReLU()
    (4): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1))
    (5): ReLU()
    (6): Flatten(start_dim=1, end_dim=-1)
  )
  (fc): Sequential(
    (0): Linear(in_features=3136, out_features=512, bias=True)
    (1): ReLU()
    (2): Linear(in_features=512, out_features=6, bias=True)
  )
)
14: done 1 games, reward 0.800, eps 1.00, speed 249.71 f/s, time 0.0 min
💾 Model saved to:
 - Google Drive: /content/drive/MyDrive/PUBLIC/Models/ALE_Qbert-v5-best_0-20250930-0114-test_epsdec100000_rs20000_sync500.dat
 - Local:        saved_models/ALE_Qbert-v5-best_0-20250930-0114-test_epsdec100000_rs20000_sync500.dat
51: done 3 games, reward 1.800, eps 1.00, speed 256.68 f/s, time 0.0 min


  return datetime.utcnow().replace(tzinfo=utc)


💾 Model saved to:
 - Google Drive: /content/drive/MyDrive/PUBLIC/Models/ALE_Qbert-v5-best_1-20250930-0114-test_epsdec100000_rs20000_sync500.dat
 - Local:        saved_models/ALE_Qbert-v5-best_1-20250930-0114-test_epsdec100000_rs20000_sync500.dat
Best reward updated 0.800 -> 1.800
42749: done 2169 games, reward 2.320, eps 0.57, speed 257.18 f/s, time 3.5 min
💾 Model saved to:
 - Google Drive: /content/drive/MyDrive/PUBLIC/Models/ALE_Qbert-v5-best_2-20250930-0117-test_epsdec100000_rs20000_sync500.dat
 - Local:        saved_models/ALE_Qbert-v5-best_2-20250930-0117-test_epsdec100000_rs20000_sync500.dat
Best reward updated 1.800 -> 2.320


KeyboardInterrupt: 

In [30]:
# === MAKE TWO VIDEOS FROM YOUR EXISTING CHECKPOINTS, THEN COPY TO YOUR REPO ===
import os, re, glob, shutil
from pathlib import Path
from datetime import datetime

import torch
import gymnasium as gym
from stable_baselines3.common import atari_wrappers

# ---- 1) Recorder helper (greedy policy, ~10–30s) ----
def record_video_short(model, env_id="ALE/Qbert-v5", out_dir="videos", steps=1500, prefix="qbert"):
    out_dir = Path(out_dir); out_dir.mkdir(parents=True, exist_ok=True)
    rec = gym.make(env_id, render_mode="rgb_array")
    rec = atari_wrappers.AtariWrapper(rec, clip_reward=False, noop_max=0)
    rec = gym.wrappers.RecordVideo(
        rec, video_folder=str(out_dir),
        name_prefix=f"{prefix}_{datetime.now().strftime('%Y%m%d-%H%M%S')}",
        episode_trigger=lambda ep: True
    )
    # match your net’s expected obs format
    rec = ImageToPyTorch(rec)
    rec = BufferWrapper(rec, n_steps=4)

    state, _ = rec.reset()
    total_r = 0.0
    for _ in range(steps):
        with torch.no_grad():
            q = net(torch.as_tensor(state, device=device).unsqueeze(0))
            action = int(torch.argmax(q))
        state, r, done, tr, _ = rec.step(action)
        total_r += r
        if done or tr:
            break
    rec.close()
    vids = sorted(out_dir.glob(f"{prefix}_*.mp4"))
    latest = str(vids[-1])
    print(f"Saved video → {latest}  (return={total_r:.1f})")
    return latest

# ---- 2) Pick EARLY and LATER checkpoints robustly ----
ckpt_dir = Path("/content/drive/MyDrive/PUBLIC/Models")
all_ckpts = sorted(glob.glob(str(ckpt_dir / "ALE_Qbert-v5-best_*.dat")))
if not all_ckpts:
    raise FileNotFoundError(f"No Q*bert checkpoints found in {ckpt_dir}")

# Try to parse score+timestamp; otherwise fall back to file mtime
def parse_meta(p: str):
    name = Path(p).name
    # pattern: ALE_Qbert-v5-best_<score>-<YYYYMMDD-HHMM>-...
    m = re.search(r"_best_(\d+)-(\d{8}-\d{4})", name)
    if m:
        score = int(m.group(1))
        ts = m.group(2)
    else:
        score = -999999  # put unparsed files at the very beginning
        ts = "00000000-0000"
    return score, ts

scored = [(p, *parse_meta(p)) for p in all_ckpts]
# early = lowest score, tie-break by oldest timestamp; later = highest score, tie-break by newest timestamp
scored_sorted = sorted(scored, key=lambda x: (x[1], x[2]))
early_ckpt = scored_sorted[0][0]
scored_sorted_desc = sorted(scored, key=lambda x: (x[1], x[2]), reverse=True)
later_ckpt = scored_sorted_desc[0][0]

# if both end up the same (e.g., only one parsed), use oldest/newest by mtime
if early_ckpt == later_ckpt and len(all_ckpts) > 1:
    early_ckpt = min(all_ckpts, key=lambda p: Path(p).stat().st_mtime)
    later_ckpt = max(all_ckpts, key=lambda p: Path(p).stat().st_mtime)

print("EARLY CKPT:", early_ckpt)
print("LATER CKPT:", later_ckpt)

# ---- 3) Load & record the two videos ----
net.load_state_dict(torch.load(early_ckpt, map_location=device))
early_mp4 = record_video_short(net, env_id="ALE/Qbert-v5", out_dir="videos", steps=1500, prefix="early")

net.load_state_dict(torch.load(later_ckpt, map_location=device))
later_mp4 = record_video_short(net, env_id="ALE/Qbert-v5", out_dir="videos", steps=1500, prefix="later")

# ---- 4) Ensure the repo exists locally; clone if needed ----
repo_local = Path("/content/DQN-on-New-Atari-Domains")
if not repo_local.exists():
    os.system("git clone https://github.com/appababba/DQN-on-New-Atari-Domains /content/DQN-on-New-Atari-Domains")
    assert repo_local.exists(), "Failed to clone the repo; upload videos via GitHub web UI as a fallback."

# ---- 5) Copy videos into the repo/videos folder ----
videos_dir = repo_local / "videos"
videos_dir.mkdir(parents=True, exist_ok=True)
shutil.copy(early_mp4, videos_dir / "early.mp4")
shutil.copy(later_mp4, videos_dir / "later.mp4")
print("\nCopied to repo:")
print(" -", videos_dir / "early.mp4")
print(" -", videos_dir / "later.mp4")

print("\nAdd this to README.md:")
print('<video src="videos/early.mp4" controls width="480"></video>')
print('<video src="videos/later.mp4" controls width="480"></video>')


EARLY CKPT: /content/drive/MyDrive/PUBLIC/Models/ALE_Qbert-v5-best_50-20250929-0436-test_epsdec10000_rs1000_sync500.dat
LATER CKPT: /content/drive/MyDrive/PUBLIC/Models/ALE_Qbert-v5-best_2-20250930-0117-test_epsdec100000_rs20000_sync500.dat


  logger.warn(
  IMAGEMAGICK_BINARY = r"C:\Program Files\ImageMagick-6.8.8-Q16\magick.exe"
  return datetime.utcnow().replace(tzinfo=utc)


Saved video → videos/early_20250930-014348-episode-0.mp4  (return=0.0)


  logger.warn(


Saved video → videos/later_20250930-014433-episode-0.mp4  (return=125.0)

Copied to repo:
 - /content/DQN-on-New-Atari-Domains/videos/early.mp4
 - /content/DQN-on-New-Atari-Domains/videos/later.mp4

Add this to README.md:
<video src="videos/early.mp4" controls width="480"></video>
<video src="videos/later.mp4" controls width="480"></video>


In [None]:
# --- COPY MP4s INTO YOUR REPO ---
import os, shutil
from pathlib import Path

# Two likely locations:
drive_repo = Path("/content/drive/MyDrive/PUBLIC/DQN-on-New-Atari-Domains")
colab_repo = Path("/content/DQN-on-New-Atari-Domains")  # if you did: !git clone https://github.com/appababba/DQN-on-New-Atari-Domains

if drive_repo.exists():
    repo_root = drive_repo
elif colab_repo.exists():
    repo_root = colab_repo
else:
    print("Repo folder not found on this machine.")
    print("Option A (recommended): In Colab terminal, run:")
    print("  !git clone https://github.com/appababba/DQN-on-New-Atari-Domains /content/DQN-on-New-Atari-Domains")
    print("Then re-run this cell.")
    print("Option B: Download videos from /content/videos and upload via GitHub web UI.")
    raise SystemExit

videos_dir = repo_root / "videos"
videos_dir.mkdir(parents=True, exist_ok=True)

src_early = sorted(Path("videos").glob("early_*.mp4"))[-1]
src_later = sorted(Path("videos").glob("later_*.mp4"))[-1]

shutil.copy(src_early, videos_dir / "early.mp4")
shutil.copy(src_later, videos_dir / "later.mp4")

print("Copied:")
print(" -", videos_dir / "early.mp4")
print(" -", videos_dir / "later.mp4")
print("\nAdd this to your README.md:\n")
print('<video src="videos/early.mp4" controls width="480"></video>')
print('<video src="videos/later.mp4" controls width="480"></video>')


In [29]:
import os
print(os.path.exists("/content/DQN-on-New-Atari-Domains"))
print(os.path.exists("/content/drive/MyDrive/PUBLIC/DQN-on-New-Atari-Domains"))


False
False


In [32]:
ls -lh /content/DQN-on-New-Atari-Domains/videos

total 20K
-rw-r--r-- 1 root root  12K Sep 30 01:44 early.mp4
-rw-r--r-- 1 root root 7.9K Sep 30 01:44 later.mp4


[main bf27879] Add Q*bert early and later training videos
 2 files changed, 0 insertions(+), 0 deletions(-)
 create mode 100644 videos/early.mp4
 create mode 100644 videos/later.mp4
fatal: could not read Username for 'https://github.com': No such device or address


# New Section