# Dependencies

In [None]:
#%pip install --upgrade --force-reinstall "gymnasium[atari]" autorom
#%pip install "gymnasium[other]"
#%pip install -r ../requirements.txt

# Imports

In [1]:
import os
os.environ["KERAS_BACKEND"] = "tensorflow"

import keras
from keras import layers

import gymnasium as gym
from gymnasium.wrappers import AtariPreprocessing, FrameStackObservation, TransformObservation
from gymnasium.spaces import Box

import numpy as np
import tensorflow as tf
import ale_py

#Need to adjust checkpoint and model saving when working locally instead of Colab env
#from google.colab import files, drive

# Setup and Hyperparameters



In [2]:
#drive.mount("/content/drive")

# --- Run length (counted in agent steps: one per env.step call in the training loop) ---
total_timesteps = 5_010_000
max_steps_per_episode = 10_000

# --- Replay memory ---
replay_buffer_size = 300_000   # transitions kept before the oldest are overwritten
learning_starts = 50_000       # steps collected before gradient updates begin

# --- Optimisation ---
batch_size = 64
learning_rate = 5e-5
target_update_freq = 20_000    # steps between target-network syncs

# --- Epsilon-greedy exploration: linear decay from start to end ---
epsilon_start, epsilon_end = 0.15, 0.1
epsilon_decay_steps = 500_000
# Amount subtracted from epsilon on every step
epsilon_decay = (epsilon_start - epsilon_end) / epsilon_decay_steps

# --- Gradient clipping (by norm) ---
max_grad_norm = 1.0

# --- Loss and optimiser objects shared by the training cell ---
loss_function = keras.losses.Huber()
optimizer = keras.optimizers.Adam(
    learning_rate=learning_rate,
    clipnorm=max_grad_norm,
)

In [3]:
class FireResetWrapper(gym.Wrapper):
    """Sends action 1 (used here as FIRE) automatically so play always resumes.

    The action is issued once right after every reset, and again whenever the
    ``lives`` value reported in ``info`` drops while the episode is still running.
    """

    def __init__(self, env):
        super().__init__(env)
        # Life count observed on the previous step; refreshed on each reset.
        self.prev_lives = 0

    def reset(self, **kwargs):
        """Reset the wrapped env, then take one FIRE step.

        Returns:
            ``(obs, info)`` where ``obs`` comes from the FIRE step and ``info`` is
            the reset info updated with the FIRE step's info. If the FIRE step
            ends the episode, the result of another plain reset is returned.
        """
        _, info = self.env.reset(**kwargs)
        self.prev_lives = info.get('lives', 0)

        # Launch the ball straight away.
        fire_obs, _, terminated, truncated, fire_info = self.env.step(1)
        if not (terminated or truncated):
            info.update(fire_info)
            return fire_obs, info

        # The FIRE step finished the episode: hand back a fresh reset instead.
        return self.env.reset(**kwargs)

    def step(self, action):
        """Step the wrapped env; after a non-final life loss, also take a FIRE step.

        When the extra FIRE step is taken, its observation replaces the original
        one, its reward is added, its termination flags are OR-ed in and its
        info is merged into the returned info.
        """
        obs, reward, terminated, truncated, info = self.env.step(action)

        lives_now = info.get('lives', 0)
        life_lost = 0 < lives_now < self.prev_lives
        episode_over = terminated or truncated

        if life_lost and not episode_over:
            # Ball was lost but lives remain: relaunch it on the agent's behalf.
            obs, extra_reward, extra_term, extra_trunc, extra_info = self.env.step(1)
            reward += extra_reward
            terminated = terminated or extra_term
            truncated = truncated or extra_trunc
            info.update(extra_info)

        self.prev_lives = lives_now
        return obs, reward, terminated, truncated, info


# --- High-Speed NumPy Replay Buffer ---
class ReplayBuffer:
    """Fixed-capacity circular replay buffer backed by preallocated NumPy arrays.

    Once ``capacity`` transitions have been stored, every new transition
    overwrites the oldest one. States are kept as ``uint8``; each of the two
    state arrays therefore occupies ``capacity * prod(state_shape)`` bytes.

    Args:
        capacity: Maximum number of transitions kept; must be positive.
        state_shape: Shape of a single observation, e.g. ``(84, 84, 4)``.

    Raises:
        ValueError: If ``capacity`` is not positive.
    """

    def __init__(self, capacity, state_shape):
        if capacity <= 0:
            raise ValueError(f"capacity must be positive, got {capacity}")
        self.capacity = capacity
        self.ptr = 0    # index of the slot the next transition is written to
        self.size = 0   # number of valid transitions currently stored
        self.states = np.empty((capacity, *state_shape), dtype=np.uint8)
        self.next_states = np.empty((capacity, *state_shape), dtype=np.uint8)
        self.actions = np.empty(capacity, dtype=np.int32)
        self.rewards = np.empty(capacity, dtype=np.float32)
        # Stored as float32 so the flag can be used directly in arithmetic.
        self.dones = np.empty(capacity, dtype=np.float32)

    def __len__(self):
        """Return the number of transitions currently stored."""
        return self.size

    def add(self, state, action, reward, next_state, done):
        """Store one transition at the write pointer, overwriting the oldest when full."""
        self.states[self.ptr] = state
        self.next_states[self.ptr] = next_state
        self.actions[self.ptr] = action
        self.rewards[self.ptr] = reward
        self.dones[self.ptr] = done
        self.ptr = (self.ptr + 1) % self.capacity
        self.size = min(self.size + 1, self.capacity)

    def sample(self, batch_size):
        """Draw ``batch_size`` stored transitions uniformly at random, with replacement.

        Returns:
            Tuple ``(states, next_states, rewards, actions, dones)`` of arrays whose
            first dimension is ``batch_size``.

        Raises:
            ValueError: If the buffer holds no transitions yet.
        """
        if self.size == 0:
            # Without this guard NumPy raises an opaque "low >= high" ValueError.
            raise ValueError("Cannot sample from an empty ReplayBuffer")
        idxs = np.random.randint(0, self.size, size=batch_size)
        return (self.states[idxs], self.next_states[idxs],
                self.rewards[idxs], self.actions[idxs], self.dones[idxs])

# Environment

In [4]:
stack_frames = 4    # consecutive preprocessed frames stacked into one observation
frame_skip = 4      # emulator frames per agent step, applied by AtariPreprocessing
gamma = 0.99        # discount factor handed to train_step
seed = 42
num_actions = 4     # width of the Q-network output; checked against the env below

# Seed Python's `random`, NumPy and the backend so that epsilon-greedy draws, replay
# sampling and weight initialisation are repeatable, not only the emulator reset.
keras.utils.set_random_seed(seed)

# frameskip=1 on the base env: the frame skip is applied by AtariPreprocessing instead.
env = gym.make("ALE/Breakout-v5", render_mode="rgb_array", frameskip=1)

# Automatically presses FIRE after resets and life losses.
env = FireResetWrapper(env)

# Preprocessing wrapper: grayscale frames, unscaled (raw 0-255 pixel values)
env = AtariPreprocessing(
    env,
    noop_max=30,
    frame_skip=frame_skip,
    terminal_on_life_loss=False,
    grayscale_obs=True,
    scale_obs=False
)

# Frame stacking wrapper: the stack axis comes first, i.e. (stack, 84, 84)
env = FrameStackObservation(env, stack_size=stack_frames)

# Observation space after moving the stack axis last: (84, 84, stack)
new_obs_space = Box(
    low=0,
    high=255,
    shape=(84, 84, stack_frames),
    dtype=np.uint8
)

# Channels first to channels last, the layout the Q-network's Input layer expects
env = TransformObservation(
    env,
    lambda obs: np.moveaxis(obs, 0, -1),
    observation_space=new_obs_space
)

# Fail early if the hard-coded action count does not match the environment.
assert env.action_space.n == num_actions, (
    f"num_actions={num_actions} but the environment exposes {env.action_space.n} actions"
)

# Reset with seed
obs, info = env.reset(seed=seed)

# Fail early if the wrappers do not produce the declared observation shape.
assert obs.shape == new_obs_space.shape, (
    f"unexpected observation shape {obs.shape}, expected {new_obs_space.shape}"
)

print(type(obs),"Observation shape:", obs.shape)


<class 'numpy.ndarray'> Observation shape: (84, 84, 4)


# Deep Q-Network

In [5]:
def create_q_model():
    """Build the dueling Q-network used for both the online and target models.

    Input is an (84, 84, 4) observation with raw pixel values; the model
    rescales it by 1/255 itself. Three convolutional layers feed two dense
    streams (advantage and value) which are merged into one Q-value per action.

    Returns:
        An uncompiled ``keras.Model`` with ``num_actions`` outputs.
    """
    frames = layers.Input(shape=(84, 84, 4))

    # Pixel scaling lives inside the model, so callers pass unnormalised values.
    features = layers.Rescaling(1.0 / 255.0)(frames)

    # Convolutional trunk: (filters, kernel size, stride) per layer
    conv_specs = ((32, 8, 4), (64, 4, 2), (64, 3, 1))
    for filters, kernel_size, stride in conv_specs:
        features = layers.Conv2D(
            filters,
            kernel_size,
            strides=stride,
            activation="relu",
            kernel_initializer="he_normal",
        )(features)
    features = layers.Flatten()(features)

    # Advantage stream: one output per action
    advantage_hidden = layers.Dense(512, activation="relu", kernel_initializer="he_normal")(features)
    advantage = layers.Dense(num_actions, activation="linear")(advantage_hidden)

    # Value stream: a single scalar per state
    value_hidden = layers.Dense(512, activation="relu", kernel_initializer="he_normal")(features)
    value = layers.Dense(1, activation="linear")(value_hidden)

    # Dueling aggregation: Q(s, a) = V(s) + (A(s, a) - mean_a A(s, a)).
    # The function name is kept as-is because checkpoints are loaded with
    # custom_objects={'combine_layer': ...}.
    def combine_layer(args):
        state_value, advantages = args
        centered = advantages - tf.reduce_mean(advantages, axis=1, keepdims=True)
        return state_value + centered

    q_values = layers.Lambda(combine_layer)([value, advantage])

    return keras.Model(inputs=frames, outputs=q_values)

# Build the online network first, then the target network, and start both
# from identical weights.
model, model_target = create_q_model(), create_q_model()
model_target.set_weights(model.get_weights())




# Search for Checkpoints and Initialize Epsilon

In [6]:
import os
import re

def combine_layer(args):
    """Dueling aggregation V + (A - mean(A)).

    Module-level copy of the function wrapped by the model's Lambda layer;
    it is passed to ``load_model`` through ``custom_objects`` under the same name.
    """
    v, a = args
    return v + (a - tf.reduce_mean(a, axis=1, keepdims=True))

# 1. Drive path to checkpoints
checkpoint_dir = "/content/drive/MyDrive/checkpoints3/"
os.makedirs(checkpoint_dir, exist_ok=True)

# 2. Find the latest *numbered* checkpoint.
# Only files named "...dqn_<frame_count>.keras" are resumable. The training cell
# also writes "breakout_dqn_final.keras", which has no frame number; matching on
# the number here keeps that file from crashing the frame-count parsing.
checkpoint_pattern = re.compile(r"dqn_(\d+)\.keras$")
numbered_checkpoints = []
for file_name in os.listdir(checkpoint_dir):
    match = checkpoint_pattern.search(file_name)
    if match:
        numbered_checkpoints.append((int(match.group(1)), file_name))

# Highest frame count first
numbered_checkpoints.sort(reverse=True)
checkpoint_files = [file_name for _, file_name in numbered_checkpoints]

if numbered_checkpoints:
    # The frame count is parsed once, from the same match used for sorting.
    frame_count, latest_name = numbered_checkpoints[0]
    latest_checkpoint = os.path.join(checkpoint_dir, latest_name)

    print(f"Resuming from {latest_checkpoint} at frame {frame_count}")
    model = keras.models.load_model(latest_checkpoint,custom_objects={'combine_layer': combine_layer})

    # A fresh Adam instance is created, so its internal state starts over.
    optimizer = keras.optimizers.Adam(learning_rate=learning_rate, clipnorm=max_grad_norm)

    # Rebuild the target network and sync it with the restored online weights.
    model_target = create_q_model()
    model_target.set_weights(model.get_weights())

    # Recalculate epsilon for the restored frame count so exploration continues
    # along the decay schedule instead of jumping back to epsilon_start.
    epsilon = max(epsilon_end, epsilon_start - (frame_count * epsilon_decay))

else:
    print("No checkpoints found. Starting from scratch.")
    frame_count = 0
    epsilon = epsilon_start

No checkpoints found. Starting from scratch.


# Training

In [7]:
# Replay memory sized for (84, 84, 4) observations
buffer = ReplayBuffer(capacity=replay_buffer_size, state_shape=(84, 84, 4))

# Episode bookkeeping used for the running-average reward in the progress log
episode_reward_history = []
running_reward = 0.0
episode_count = 0

# Frame counter value at the start of this run (non-zero after a checkpoint resume);
# the training loop waits learning_starts steps past this before updating the network.
starting_framecount = frame_count

# Compiled into a TensorFlow graph via @tf.function for speed
@tf.function
def train_step(states, next_states, rewards, actions, dones, model, model_target, optimizer, loss_function, gamma):
    """Run one Double-DQN gradient update on a batch and return the loss.

    Args:
        states, next_states: Observation batches; cast to float32 here (pixel
            rescaling is done by the model's own Rescaling layer).
        rewards: Per-transition rewards; clipped to [-1, 1] before use.
        actions: Indices of the actions that were taken.
        dones: 1.0 where the next state must not be bootstrapped from, else 0.0.
        model: Online network, updated in place.
        model_target: Target network, only evaluated.
        optimizer: Optimizer applied to ``model.trainable_variables``.
        loss_function: Callable ``(targets, predictions) -> loss``.
        gamma: Discount factor.

    Returns:
        The loss tensor for this batch.
    """
    obs_batch = tf.cast(states, tf.float32)
    next_obs_batch = tf.cast(next_states, tf.float32)

    # Double DQN: the online network chooses the next action ...
    online_next_q = model(next_obs_batch, training=False)
    greedy_next = tf.argmax(online_next_q, axis=1, output_type=tf.int32)

    # ... and the target network supplies the value of that choice.
    target_next_q = model_target(next_obs_batch, training=False)
    row_ids = tf.range(tf.shape(greedy_next)[0])
    bootstrap_q = tf.gather_nd(target_next_q, tf.stack([row_ids, greedy_next], axis=1))

    # TD target built from clipped rewards; (1 - dones) removes the bootstrap term.
    reward_term = tf.clip_by_value(rewards, -1.0, 1.0)
    td_targets = reward_term + gamma * (1.0 - dones) * bootstrap_q

    with tf.GradientTape() as tape:
        online_q = model(obs_batch, training=True)
        # Select Q(s, a) for the action actually taken via a one-hot mask.
        action_mask = tf.one_hot(actions, num_actions)
        chosen_q = tf.reduce_sum(online_q * action_mask, axis=1)
        loss = loss_function(td_targets, chosen_q)

    variables = model.trainable_variables
    gradients = tape.gradient(loss, variables)
    optimizer.apply_gradients(zip(gradients, variables))
    return loss

# -------------------------
# Training Loop
# -------------------------
# One outer iteration per episode; frame_count counts agent steps across episodes
# and is only compared with total_timesteps between episodes.
while frame_count < total_timesteps:
    observation, info = env.reset()
    # Ensure state is uint8 to save memory
    state = np.array(observation, dtype=np.uint8)
    episode_reward = 0

    prev_lives = info.get('lives', 5) # Breakout starts with 5 lives

    # Allow exactly max_steps_per_episode steps (range(1, N) stopped one step short).
    for step in range(1, max_steps_per_episode + 1):
        frame_count += 1

        # --- Epsilon-Greedy Action ---
        # Act randomly during the initial warm-up and with probability epsilon afterwards.
        if frame_count < learning_starts or np.random.rand() < epsilon:
            action = np.random.choice(num_actions)
        else:
            # Add a batch axis and cast to float32; pixel scaling happens inside the model.
            state_tensor = tf.convert_to_tensor(state)
            state_tensor = tf.expand_dims(state_tensor, 0)
            state_tensor = tf.cast(state_tensor, tf.float32)

            q_values = model(state_tensor, training=False)
            action = tf.argmax(q_values[0]).numpy()

        # Decay epsilon linearly, never going below epsilon_end
        epsilon -= epsilon_decay
        epsilon = max(epsilon, epsilon_end)

        # --- Environment Step ---
        state_next, reward, terminated, truncated, info = env.step(action)

        current_lives = info.get('lives', 0)
        if current_lives < prev_lives:
            # Losing a life replaces this step's reward with a -1 penalty
            reward = -1.0

        # Update lives for the next step comparison
        prev_lives = current_lives

        done = terminated or truncated
        state_next = np.array(state_next, dtype=np.uint8)

        # Note: includes the life-loss penalty, so this is not the raw game score.
        episode_reward += reward

        # --- Store in Buffer ---
        # Store `terminated`, not `done`: train_step uses this flag to drop the
        # bootstrap term, which is only right for a true terminal state. A
        # truncated (time-limited) episode still has a valid next state.
        buffer.add(state, action, reward, state_next, terminated)

        state = state_next

        # --- Train Step ---
        # Update every 4th step once learning_starts steps have been collected in this run.
        if frame_count > learning_starts + starting_framecount and frame_count % 4 == 0:

            state_sample, state_next_sample, rewards_sample, action_sample, done_sample = buffer.sample(batch_size)

            # Call the optimized training function
            train_step(
                state_sample,
                state_next_sample,
                rewards_sample,
                action_sample,
                done_sample,
                model,
                model_target,
                optimizer,
                loss_function,
                gamma
            )

        # --- Target Network Update ---
        if frame_count % target_update_freq == 0:
            model_target.set_weights(model.get_weights())
            print(f"Frames: {frame_count:,} | Episode: {episode_count} | "
                  f"Reward(avg100): {running_reward:.2f} | Epsilon: {epsilon:.3f}")

        # --- Checkpointing ---
        if frame_count % 200_000 == 0:
            print(f"Saving checkpoint at {frame_count} frames")
            # Written to checkpoint_dir, the same directory the resume cell scans.
            save_path = os.path.join(checkpoint_dir, f"breakout_dqn_{frame_count}.keras")
            model.save(save_path)

        if done:
            break

    # --- Episode End Updates ---
    # Keep only the 100 most recent episode rewards for the running average.
    episode_reward_history.append(episode_reward)
    if len(episode_reward_history) > 100:
        episode_reward_history.pop(0) # Standard list pop is fine here (short list)

    running_reward = np.mean(episode_reward_history)
    episode_count += 1

# Final Save
model.save(os.path.join(checkpoint_dir, "breakout_dqn_final.keras"))
print("Training finished.")

KeyboardInterrupt: 