<a href="https://colab.research.google.com/github/PaatriickC/CSCI-166-Project/blob/main/CSCI166FinalProject.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Space Invaders DQN + Double DQN Notebook

# Config / Imports

In [38]:
# === CONFIG ===
!pip install gymnasium[atari,accept-rom-license] autorom stable-baselines3 --quiet

[0m

In [39]:
!AutoROM --accept-license

AutoROM will download the Atari 2600 ROMs.
They will be installed to:
	/usr/local/lib/python3.12/dist-packages/AutoROM/roms

Existing ROMs will be overwritten.


In [40]:
# Python imports
from dataclasses import dataclass
import argparse, time
from datetime import datetime
import numpy as np
import collections
import typing as tt
import os
from pathlib import Path
import cv2

import torch
import torch.nn as nn
import torch.optim as optim

from torch.utils.tensorboard.writer import SummaryWriter

In [41]:
# Gym + wrappers
import ale_py
import gymnasium as gym
from gymnasium import spaces
from stable_baselines3.common import atari_wrappers

In [42]:
# Output locations: checkpoints are written to both a Drive-style path and a
# local folder; recorded videos go to ./videos.
save_dir_drive = "/content/drive/MyDrive/PUBLIC/Models"
save_dir_local = "saved_models"
# NOTE(review): no drive.mount(...) call is visible in this notebook; if Drive
# is not mounted this path is presumably just created on local disk - confirm.
for _out_dir in (save_dir_drive, save_dir_local, "videos"):
    # exist_ok=True keeps the cell re-runnable
    os.makedirs(_out_dir, exist_ok=True)

# Fixed model + wrappers

In [43]:
# === MODEL ===
class DQN(nn.Module):
    """Convolutional Q-network: three conv layers followed by two linear layers.

    Args:
        input_shape: shape of a single observation without the batch dimension;
            ``input_shape[0]`` is used as the number of input channels.
        n_actions: number of outputs (one Q-value per action).
    """

    def __init__(self, input_shape, n_actions):
        super().__init__()
        in_channels = input_shape[0]
        feature_layers = [
            nn.Conv2d(in_channels, 32, kernel_size=8, stride=4),
            nn.ReLU(),
            nn.Conv2d(32, 64, kernel_size=4, stride=2),
            nn.ReLU(),
            nn.Conv2d(64, 64, kernel_size=3, stride=1),
            nn.ReLU(),
            nn.Flatten(),
        ]
        # Attribute names `conv` / `fc` and the layer order determine the
        # state_dict keys, so they are kept stable for saved checkpoints.
        self.conv = nn.Sequential(*feature_layers)

        # Probe the conv stack with a zero batch of one to learn the flattened
        # feature width instead of hard-coding it.
        with torch.no_grad():
            probe = torch.zeros(1, *input_shape)
            n_features = self.conv(probe).shape[-1]

        self.fc = nn.Sequential(
            nn.Linear(n_features, 512),
            nn.ReLU(),
            nn.Linear(512, n_actions),
        )

    def forward(self, x: torch.ByteTensor):
        """Return Q-values for a batch of observations given as bytes in [0..255]."""
        scaled = x.float() / 255.0
        features = self.conv(scaled)
        return self.fc(features)

In [44]:
# === WRAPPERS ===
class ImageToPyTorch(gym.ObservationWrapper):
    """Observation wrapper that moves the last axis of a 3-D observation to the front.

    The advertised observation space is rebuilt with the permuted shape, using
    the scalar min of the original lows and max of the original highs as bounds.
    """

    def __init__(self, env):
        super().__init__(env)
        src_space = self.observation_space
        assert isinstance(src_space, gym.spaces.Box)
        assert len(src_space.shape) == 3
        dim0, dim1, last = src_space.shape
        self.observation_space = gym.spaces.Box(
            low=src_space.low.min(),
            high=src_space.high.max(),
            shape=(last, dim0, dim1),
            dtype=src_space.dtype,
        )

    def observation(self, observation):
        """Return the observation with axis 2 moved to position 0."""
        return np.transpose(observation, (2, 0, 1))

class BufferWrapper(gym.ObservationWrapper):
    """Stack the most recent ``n_steps`` observations along axis 0.

    A deque of length ``n_steps`` holds the latest observations; every call to
    ``observation`` pushes the new one and returns the concatenation of the
    whole deque, so the result has ``obs.shape[0] * n_steps`` entries on axis 0.

    Args:
        env: environment whose observation space is a 3-D ``Box``.
        n_steps: how many consecutive observations to stack.
    """

    def __init__(self, env, n_steps):
        super().__init__(env)
        obs = env.observation_space
        assert isinstance(obs, spaces.Box)
        # new shape (C*n_steps, H, W)
        new_shape = (obs.shape[0] * n_steps, obs.shape[1], obs.shape[2])
        # Only scalar bounds are passed on, so take min/max of the source
        # bounds directly rather than materialising repeated arrays first.
        self.observation_space = spaces.Box(
            low=obs.low.min(), high=obs.high.max(),
            shape=new_shape, dtype=obs.dtype)
        self.buffer = collections.deque(maxlen=n_steps)
        self.n_steps = n_steps

    def reset(self, *, seed: tt.Optional[int] = None, options: tt.Optional[dict[str, tt.Any]] = None):
        """Reset the wrapped env and return the stacked first observation.

        The buffer is refilled with zero frames first, so the first stacked
        observation consists of ``n_steps - 1`` blank frames plus the real one.
        Both ``seed`` and ``options`` are forwarded to the wrapped environment.
        """
        self.buffer.clear()
        blank_like = self.env.observation_space.low
        self.buffer.extend(np.zeros_like(blank_like) for _ in range(self.n_steps))
        # `options` used to be accepted but silently dropped; forward it.
        obs, extras = self.env.reset(seed=seed, options=options)
        return self.observation(obs), extras

    def observation(self, observation: np.ndarray) -> np.ndarray:
        """Push ``observation`` into the buffer and return the stacked frames.

        Note that this mutates the internal buffer on every call.
        """
        self.buffer.append(observation)
        return np.concatenate(list(self.buffer), axis=0)

def make_env(env_name: str, n_steps=4, render_mode=None, clip_reward=False, noop_max=0):
    """Build the wrapped environment used for training and evaluation.

    The wrapper chain is: ``gym.make`` -> ``AtariWrapper`` -> ``ImageToPyTorch``
    -> ``BufferWrapper``.

    Args:
        env_name: environment id passed to ``gym.make``.
        n_steps: number of observations stacked by ``BufferWrapper``.
        render_mode: forwarded to ``gym.make``.
        clip_reward: forwarded to ``AtariWrapper``.
        noop_max: forwarded to ``AtariWrapper``.

    Returns:
        The fully wrapped environment.
    """
    print(f"Creating environment {env_name} (render_mode={render_mode})")
    base_env = gym.make(env_name, render_mode=render_mode)
    # NOTE(review): ALE "-v5" ids reportedly apply their own frame skip by
    # default and AtariWrapper skips frames as well - confirm the effective
    # frame skip is the intended one.
    atari_env = atari_wrappers.AtariWrapper(
        base_env, clip_reward=clip_reward, noop_max=noop_max)
    channel_first_env = ImageToPyTorch(atari_env)  # -> (C, H, W)
    return BufferWrapper(channel_first_env, n_steps=n_steps)

# Experience Buffer & Agent

In [45]:
# === EXPERIENCE & REPLAY ===
# Type aliases shared by the replay buffer, the agent and the loss code.
State = np.ndarray
Action = int
BatchTensors = tt.Tuple[
    torch.ByteTensor,   # states
    torch.LongTensor,   # actions
    torch.Tensor,       # rewards
    torch.BoolTensor,   # done-or-truncated flags
    torch.ByteTensor,   # next states
]


@dataclass
class Experience:
    """A single environment transition stored in the replay buffer."""

    # observation the action was taken from
    state: State
    # action that was executed
    action: Action
    # reward returned by that step
    reward: float
    # True when the step ended the episode (terminated or truncated)
    done_trunc: bool
    # observation returned by that step
    new_state: State

class ExperienceBuffer:
    """Bounded FIFO store of ``Experience`` items with uniform random sampling."""

    def __init__(self, capacity: int):
        # deque(maxlen=...) discards the oldest entry once `capacity` is reached
        self.buffer = collections.deque(maxlen=capacity)

    def __len__(self):
        """Number of experiences currently stored."""
        return len(self.buffer)

    def append(self, experience: Experience):
        """Add one experience at the newest end of the buffer."""
        self.buffer.append(experience)

    def sample(self, batch_size: int) -> tt.List[Experience]:
        """Return ``batch_size`` distinct experiences chosen uniformly at random.

        Indices are drawn without replacement via ``np.random.choice``.
        """
        picked = np.random.choice(len(self.buffer), batch_size, replace=False)
        drawn = []
        for position in picked:
            drawn.append(self.buffer[position])
        return drawn

# === AGENT (fixed env->self.env) ===
class Agent:
    """Plays an environment with an epsilon-greedy policy and records transitions.

    Every step is appended to ``exp_buffer`` as an ``Experience``; the running
    episode reward is tracked in ``total_reward``.
    """

    def __init__(self, env: gym.Env, exp_buffer: ExperienceBuffer):
        self.env = env
        self.exp_buffer = exp_buffer
        self.state: tt.Optional[np.ndarray] = None
        self._reset()

    def _reset(self):
        """Reset the environment and zero the running episode reward."""
        first_obs, _ = self.env.reset()
        self.state = first_obs
        self.total_reward = 0.0

    @torch.no_grad()
    def play_step(self, net: DQN, device: torch.device,
                  epsilon: float = 0.0) -> tt.Optional[float]:
        """Take one environment step and store the resulting transition.

        Args:
            net: network used to pick the greedy action.
            device: device the current state is moved to before the forward pass.
            epsilon: probability of taking a random action instead.

        Returns:
            The accumulated reward when this step ended the episode (the
            environment is then reset), otherwise ``None``.
        """
        if np.random.random() < epsilon:
            action = self.env.action_space.sample()
        else:
            # add a batch dimension, then take the index of the largest Q-value
            obs_batch = torch.as_tensor(self.state).to(device).unsqueeze(0)
            best_idx = net(obs_batch).max(dim=1).indices
            action = int(best_idx.item())

        next_obs, raw_reward, terminated, truncated, _ = self.env.step(action)
        step_reward = float(raw_reward)
        self.total_reward += step_reward

        episode_over = terminated or truncated
        self.exp_buffer.append(Experience(
            state=self.state, action=action, reward=step_reward,
            done_trunc=episode_over, new_state=next_obs,
        ))
        self.state = next_obs

        if not episode_over:
            return None
        finished_total = self.total_reward
        self._reset()
        return finished_total

In [46]:
# === AGENT (fixed env -> self.env) ===
class Agent:
    """Epsilon-greedy actor that feeds an ``ExperienceBuffer``.

    NOTE(review): an ``Agent`` class with the same logic is already defined in
    the preceding cell; this later definition is the one in effect once both
    cells have run. Consider keeping only one of them.
    """

    def __init__(self, env: gym.Env, exp_buffer: ExperienceBuffer):
        self.env = env
        self.exp_buffer = exp_buffer
        self.state: tt.Optional[np.ndarray] = None
        self._reset()

    def _reset(self):
        """Begin a new episode: reset the env and clear the reward counter."""
        reset_result = self.env.reset()
        self.state = reset_result[0]
        self.total_reward = 0.0

    def _pick_action(self, net, device, epsilon):
        """Return a random action with probability ``epsilon``, else the greedy one."""
        if np.random.random() < epsilon:
            return self.env.action_space.sample()
        state_v = torch.as_tensor(self.state).to(device)
        state_v = state_v.unsqueeze(0)  # add batch dim
        _, act_v = torch.max(net(state_v), dim=1)
        return int(act_v.item())

    @torch.no_grad()
    def play_step(self, net: DQN, device: torch.device,
                  epsilon: float = 0.0) -> tt.Optional[float]:
        """Advance the environment by one step and record the transition.

        Returns the episode's total reward if the step finished the episode
        (after which the environment is reset), otherwise ``None``.
        """
        action = self._pick_action(net, device, epsilon)

        # do step in the environment
        new_state, reward, is_done, is_tr, _ = self.env.step(action)
        reward = float(reward)
        self.total_reward += reward
        finished = is_done or is_tr

        transition = Experience(
            state=self.state,
            action=action,
            reward=reward,
            done_trunc=finished,
            new_state=new_state,
        )
        self.exp_buffer.append(transition)
        self.state = new_state

        done_reward = None
        if finished:
            done_reward = self.total_reward
            self._reset()
        return done_reward

# Loss Function

In [47]:
# === LOSS: supports baseline DQN and Double DQN via `double_dqn` flag ===
def batch_to_tensors(batch: tt.List[Experience], device: torch.device) -> BatchTensors:
    """Collate a list of experiences into tensors on ``device``.

    Returns:
        A tuple ``(states, actions, rewards, dones, new_states)``. States are
        converted through ``np.asarray`` and keep that array's dtype; actions
        become a ``LongTensor``, rewards a ``FloatTensor`` and done flags a
        ``BoolTensor``.
    """
    # Stack observations through numpy first so as_tensor sees a single array.
    states_t = torch.as_tensor(np.asarray([exp.state for exp in batch]))
    actions_t = torch.LongTensor([exp.action for exp in batch])
    rewards_t = torch.FloatTensor([exp.reward for exp in batch])
    dones_t = torch.BoolTensor([exp.done_trunc for exp in batch])
    new_states_t = torch.as_tensor(np.asarray([exp.new_state for exp in batch]))

    host_tensors = (states_t, actions_t, rewards_t, dones_t, new_states_t)
    return tuple(tensor.to(device) for tensor in host_tensors)

def calc_loss(batch: tt.List[Experience], net: DQN, tgt_net: DQN,
              device: torch.device, gamma=0.99, double_dqn: bool=False) -> torch.Tensor:
    """Mean-squared TD error for a batch, for vanilla DQN or Double DQN.

    Args:
        batch: sampled experiences.
        net: online network (receives gradients).
        tgt_net: target network (evaluated without gradients).
        device: device the batch tensors are moved to.
        gamma: discount factor applied to the next-state value.
        double_dqn: when True, the online net picks the next action and the
            target net supplies its value; otherwise the target net's maximum
            Q-value is used.

    Returns:
        Scalar loss tensor.
    """
    states_t, actions_t, rewards_t, dones_t, new_states_t = batch_to_tensors(batch, device)

    # Q(s, a) of the actions that were actually taken
    q_all = net(states_t)
    q_taken = q_all.gather(1, actions_t.unsqueeze(-1)).squeeze(-1)

    with torch.no_grad():
        if double_dqn:
            # Double DQN: action selected by online net, value taken from target net
            greedy_next = net(new_states_t).argmax(dim=1, keepdim=True)  # (B,1)
            next_q = tgt_net(new_states_t).gather(1, greedy_next).squeeze(-1)
        else:
            # vanilla DQN
            next_q = tgt_net(new_states_t).max(dim=1).values
        # transitions flagged done/truncated contribute no future value
        next_q = next_q.masked_fill(dones_t, 0.0)

    td_target = rewards_t + gamma * next_q
    return nn.functional.mse_loss(q_taken, td_target)

# Training Loop

In [None]:
# === TRAINING LOOP (improved logging + checkpointing + Double DQN toggle) ===
# Hyperparameters - edit these to change the run.
# NOTE(review): the notebook title mentions Space Invaders, but the default
# environment and reward bound below target Pong - confirm which is intended.
DEFAULT_ENV_NAME = "ALE/Pong-v5"
env_name = DEFAULT_ENV_NAME
MEAN_REWARD_BOUND = 19            # training stops once the 100-episode mean exceeds this

GAMMA = 0.99                      # discount factor passed to calc_loss
BATCH_SIZE = 32                   # experiences sampled per optimisation step
REPLAY_SIZE = 100_000             # replay buffer capacity
LEARNING_RATE = 1e-4              # Adam learning rate
SYNC_TARGET_FRAMES = 10_000       # copy online weights to the target net every N frames
REPLAY_START_SIZE = 10_000        # no optimisation until the buffer holds this many items

# Epsilon decays linearly from START, floored at FINAL.
EPSILON_START = 1.0
EPSILON_FINAL = 0.01
EPSILON_DECAY_LAST_FRAME = 1_000_000

# Quick test config (if you want very short local test)
# REPLAY_SIZE = 5000
# REPLAY_START_SIZE = 1000
# EPSILON_DECAY_LAST_FRAME = 10_000
# SYNC_TARGET_FRAMES = 500

# Variant switch
USE_DOUBLE_DQN = True   # <-- flip this to False to run vanilla DQN

# --- Device, environment and networks ---
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
env = make_env(env_name, n_steps=4, render_mode=None)

# The online network is created first; the target network is built with the
# same arguments and then overwritten with the online weights.
net = DQN(env.observation_space.shape, env.action_space.n).to(device)
tgt_net = DQN(env.observation_space.shape, env.action_space.n).to(device)
tgt_net.load_state_dict(net.state_dict())

# TensorBoard run name carries the env id and the DQN variant in use.
writer = SummaryWriter(
    comment="-" + env_name + "-" + ("DDQN" if USE_DOUBLE_DQN else "DQN"))

# --- Replay buffer, acting agent and optimiser ---
buffer = ExperienceBuffer(REPLAY_SIZE)
agent = Agent(env, buffer)   # constructing the agent resets the environment
epsilon = EPSILON_START
optimizer = optim.Adam(net.parameters(), lr=LEARNING_RATE)

# --- Bookkeeping used by the training loop ---
total_rewards = []       # one entry per finished episode
loss_history = []        # one entry per optimisation step
best_m_reward = None     # best 100-episode mean that triggered a checkpoint
frame_idx = 0            # number of environment steps taken so far
ts_frame = 0             # frame index at the previous episode end (for speed)
ts = time.time()         # wall-clock time at the previous episode end

start_time = time.time()
# The loop only ends through `break` (reward bound reached) or through an
# exception / manual interrupt. Cleanup lives in `finally` so the environment
# and the TensorBoard writer are closed on every exit path.
try:
    while True:
        frame_idx += 1
        # linear epsilon decay, floored at EPSILON_FINAL
        epsilon = max(EPSILON_FINAL, EPSILON_START - frame_idx / EPSILON_DECAY_LAST_FRAME)

        # play_step returns a value only when an episode has just finished
        reward = agent.play_step(net, device, epsilon)
        if reward is not None:
            total_rewards.append(reward)
            speed = (frame_idx - ts_frame) / (time.time() - ts + 1e-8)
            elapsed = time.time() - start_time
            ts_frame = frame_idx
            ts = time.time()
            m_reward = np.mean(total_rewards[-100:])
            writer.add_scalar("epsilon", epsilon, frame_idx)
            writer.add_scalar("speed", speed, frame_idx)
            writer.add_scalar("reward_100", m_reward, frame_idx)
            writer.add_scalar("reward", reward, frame_idx)

            # print and maybe save model: a checkpoint is written only when the
            # rolling mean improved by more than 0.5 over the last saved one
            if best_m_reward is None or m_reward > best_m_reward + 0.5:
                print(f"{frame_idx}: done {len(total_rewards)} games, reward {m_reward:.3f}, "
                      f"eps {epsilon:.3f}, speed {speed:.2f} f/s, time {elapsed/60:.1f} min")
                timestamp = datetime.now().strftime("%Y%m%d-%H%M")
                safe_env_name = env_name.replace("/", "_")
                model_filename = f"{safe_env_name}-best_{int(m_reward)}-{timestamp}.dat"
                torch.save(net.state_dict(), os.path.join(save_dir_local, model_filename))
                torch.save(net.state_dict(), os.path.join(save_dir_drive, model_filename))
                best_m_reward = m_reward

            if m_reward > MEAN_REWARD_BOUND:
                print("Solved in %d frames!" % frame_idx)
                break

        # keep collecting experience until the buffer is warm
        if len(buffer) < REPLAY_START_SIZE:
            continue

        if frame_idx % SYNC_TARGET_FRAMES == 0:
            tgt_net.load_state_dict(net.state_dict())

        # optimize
        optimizer.zero_grad()
        batch = buffer.sample(BATCH_SIZE)
        loss_t = calc_loss(batch, net, tgt_net, device, gamma=GAMMA, double_dqn=USE_DOUBLE_DQN)
        loss_t.backward()
        # optional grad clip
        nn.utils.clip_grad_norm_(net.parameters(), 10.0)
        optimizer.step()
        loss_history.append(loss_t.item())
finally:
    # close
    env.close()
    writer.close()

# Save training artifacts: learning curve of per-episode reward, plus a
# 100-episode rolling mean once at least 100 episodes have finished.
import matplotlib.pyplot as plt

fig, ax = plt.subplots(figsize=(8, 4))
ax.plot(total_rewards, label='episodic reward')
if len(total_rewards) >= 100:
    window = np.ones(100) / 100
    rol = np.convolve(total_rewards, window, mode='valid')
    # the first rolling value covers episodes 0..99, so it is plotted at x=99
    ax.plot(range(99, 99 + len(rol)), rol, label='100-ep rolling')
ax.set_xlabel("Episodes")
ax.set_ylabel("Reward")
ax.legend()
fig.tight_layout()
fig.savefig("results_learning_curve.png", dpi=150)
plt.show()

Creating environment ALE/Pong-v5 (render_mode=None)
211: done 1 games, reward -21.000, eps 1.000, speed 340.78 f/s, time 0.0 min


  return datetime.utcnow().replace(tzinfo=utc)


# Evaluation function & Video Recording

In [None]:
# === EVALUATION & VIDEO RECORDING ===
from gymnasium.wrappers import RecordVideo

def evaluate_and_record(model, env_name, model_path=None, out_path="videos/eval.mp4",
                        steps=1000, greedy=True, n_steps=4):
    """Run one evaluation episode (at most ``steps`` steps) and record it to video.

    Args:
        model: Q-network used to choose actions when ``greedy`` is True.
        env_name: environment id passed to ``make_env``.
        model_path: optional checkpoint (a saved ``state_dict``) loaded into
            ``model`` before evaluation; ``None`` uses the model as given.
        out_path: where the video should go. Its directory is used as the
            recording folder and its file stem as the name prefix. The recorder
            picks the final file name itself, so the exact file name is
            presumably not honoured - verify against the RecordVideo docs.
        steps: upper bound on the number of environment steps.
        greedy: True picks the arg-max action from ``model``; False samples
            random actions and does not evaluate the model at all.
        n_steps: number of stacked observations, forwarded to ``make_env``.

    Returns:
        Total reward collected during the run (also printed).
    """
    video_target = Path(out_path)
    video_folder = str(video_target.parent)
    name_prefix = video_target.stem or "eval"

    # Run inference on whatever device the model lives on instead of relying
    # on a notebook-level global.
    first_param = next(model.parameters(), None)
    model_device = first_param.device if first_param is not None else torch.device("cpu")

    if model_path is not None:
        model.load_state_dict(torch.load(model_path, map_location=model_device))

    # create env with rgb_array so frames can be recorded
    eval_env = make_env(env_name, n_steps=n_steps, render_mode="rgb_array")
    eval_env = RecordVideo(eval_env, video_folder=video_folder, name_prefix=name_prefix)

    total_rw = 0.0
    try:
        state, _ = eval_env.reset()
        # no gradients are needed for evaluation
        with torch.no_grad():
            for _ in range(steps):
                if greedy:
                    state_v = torch.as_tensor(state).to(model_device).unsqueeze(0)
                    action = int(model(state_v).argmax(dim=1).item())
                else:
                    action = eval_env.action_space.sample()
                state, reward, done, trunc, _ = eval_env.step(action)
                total_rw += float(reward)
                if done or trunc:
                    # Stop here without resetting: a reset would begin a new
                    # episode that is never played.
                    break
    finally:
        # close the env even if stepping or inference raised
        eval_env.close()
    print("Eval total reward:", total_rw)
    return total_rw

# Usage example (after you have some saved checkpoint):
# net.load_state_dict(torch.load("saved_models/your_model.dat"))
# evaluate_and_record(net, env_name, steps=600, greedy=False)  # early randomish
# evaluate_and_record(net, env_name, steps=600, greedy=True)   # later learned
