# Using a Double Dueling DQN to train an agent that can achieve a superhuman level at the Atari Breakout game

In [None]:
import os
import cv2
import time
import random
import numpy as np
import gymnasium as gym
import tensorflow as tf

from tensorflow.keras import layers
from collections import deque, namedtuple

In [2]:
import ale_py
import gymnasium as gym
gym.register_envs(ale_py)

env = gym.make("ALE/Breakout-v5")

A.L.E: Arcade Learning Environment (version 0.11.2+ecc1138)
[Powered by Stella]


In [3]:
EnvName = "ALE/Breakout-v5"
Transition = namedtuple("Transition", ("State", "Action", "Reward", "NextState", "Done"))

StackSize = 4                   # number of frames per state
InputHeight = 84                # height after crop+resize
InputWidth = 84                 # width after crop+resize
ReplayBufferSize = 100000
Gamma = 0.99
LearningRate = 2.5e-4
MaxStepsPerEpisode = 10000
ClipRewards = True              # clip rewards to {-1, 0, 1}

MaxEpisodes = 200              # hard limit
MinReplaySize = 1000           # start training sooner
TrainEveryNSteps = 2           # train more often
BatchSize = 64                 # larger batches
TargetUpdateEvery = 500        # update target net more frequently

EpsilonStart = 1.0
EpsilonEnd = 0.05
EpsilonDecayFrames = MaxEpisodes * MaxStepsPerEpisode // 1.5  # reach min by ~episode 150

In [4]:
# Preprocessings

def PreprocessFrame(frame):
    # convert to grayscale
    gray = cv2.cvtColor(frame, cv2.COLOR_RGB2GRAY)

    # TODO: (If needed) Adjust crop indices if visually needed. Here: rows 34:194 is common (160 high).
    # crop — for Breakout typical useful crop: remove top black border and bottom score area.
    cropped = gray[34:194, :]  # H:160 x W:160 for standard 210x160 input; adapt if env different

    resized = cv2.resize(cropped, (InputWidth, InputHeight), interpolation=cv2.INTER_AREA)

    return resized

def StackFrames(frameDeque, frame, isNewEpisode):
    processed = PreprocessFrame(frame)
    if isNewEpisode:
        # at new episode replicate same frame StackSize times
        frameDeque.clear()
        for _ in range(StackSize):
            frameDeque.append(processed)
    else:
        frameDeque.append(processed)

    stacked = np.stack(frameDeque, axis=-1)  # shape H x W x StackSize, dtype uint8
    return np.array(stacked, dtype=np.float32) / 255.0

In [5]:
# Helper Classes

class ReplayBuffer:
    def __init__(self, capacity: int):
        self.buffer = deque(maxlen=capacity)

    def Push(self, state, action, reward, next_state, done):
        self.buffer.append(Transition(state, action, reward, next_state, done))

    def Sample(self, batch_size: int):
        batch = random.sample(self.buffer, batch_size)
        states = np.array([t.State for t in batch], dtype=np.float32)
        actions = np.array([t.Action for t in batch], dtype=np.int32)
        rewards = np.array([t.Reward for t in batch], dtype=np.float32)
        next_states = np.array([t.NextState for t in batch], dtype=np.float32)
        dones = np.array([t.Done for t in batch], dtype=np.float32)
        return states, actions, rewards, next_states, dones

    def __len__(self):
        return len(self.buffer)

class EpsilonGreedyPolicy:
    def __init__(self, n_actions: int):
        self.n_actions = n_actions
        self.frameCount = 0

    def GetEpsilon(self):
        fraction = min(float(self.frameCount) / float(EpsilonDecayFrames), 1.0)
        eps = EpsilonStart + fraction * (EpsilonEnd - EpsilonStart)  # from 1.0 to 0.05
        return eps

    def SelectAction(self, model: tf.keras.Model, state: np.ndarray):
        self.frameCount += 1
        if random.random() < self.GetEpsilon():
            return random.randrange(self.n_actions)
        stateInput = np.expand_dims(state, axis=0).astype(np.float32)
        qvals = model.predict(stateInput, verbose=0)[0]
        return int(np.argmax(qvals))

In [6]:
# Helper Methods

def BuildDuelingDqn(input_shape, n_actions):
    inputs = layers.Input(shape=input_shape, name="InputFrames")

    # Convolutional feature extractor (Nature DQN)
    x = layers.Conv2D(32, kernel_size=8, strides=4, activation="relu")(inputs)
    x = layers.Conv2D(64, kernel_size=4, strides=2, activation="relu")(x)
    x = layers.Conv2D(64, kernel_size=3, strides=1, activation="relu")(x)
    x = layers.Flatten()(x)
    x = layers.Dense(512, activation="relu")(x)

    # Dueling streams
    value = layers.Dense(1, name="StateValue")(x)                 # V(s)
    advantage = layers.Dense(n_actions, name="RawAdvantages")(x)  # A(s,a)

    # mean(A) across actions as a Keras op
    advantageMean = layers.Lambda(
        lambda a: tf.reduce_mean(a, axis=1, keepdims=True),
        output_shape=(1,)
    )(advantage)

    advCentered = layers.Subtract()([advantage, advantageMean])
    qValues = layers.Add(name="QValues")([value, advCentered])

    return tf.keras.Model(inputs=inputs, outputs=qValues)

def PlayOneStep(env, agent, frameDeque, state):
    action = agent.policy.SelectAction(agent.onlineModel, state)
    nextFrame, reward, terminated, truncated, info = env.step(action)

    if ClipRewards:
        clippedReward = np.sign(reward)
    else:
        clippedReward = reward

    nextState = StackFrames(frameDeque, nextFrame, False)

    done = bool(terminated or truncated)
    return nextState, clippedReward, done, info, action

In [7]:
class DqnAgent:
    def __init__(self, env: gym.Env):
        self.env = env
        self.nActions = env.action_space.n
        self.inputShape = (InputHeight, InputWidth, StackSize)
        self.onlineModel = BuildDuelingDqn(self.inputShape, self.nActions)
        self.targetModel = BuildDuelingDqn(self.inputShape, self.nActions)
        self.targetModel.set_weights(self.onlineModel.get_weights())

        self.optimizer = tf.keras.optimizers.Adam(learning_rate=LearningRate)
        self.lossFn = tf.keras.losses.Huber()   # stable than MSE
        self.replayBuffer = ReplayBuffer(ReplayBufferSize)
        self.policy = EpsilonGreedyPolicy(self.nActions)
        self.trainStepCounter = 0

    def UpdateTargetNetwork(self):
        self.targetModel.set_weights(self.onlineModel.get_weights())

    def TrainStep(self):
        if len(self.replayBuffer) < BatchSize:
            return 0.0

        states, actions, rewards, next_states, dones = self.replayBuffer.Sample(BatchSize)

        # Convert to tensors
        statesTensor = tf.convert_to_tensor(states, dtype=tf.float32)
        nextStatesTensor = tf.convert_to_tensor(next_states, dtype=tf.float32)
        actionsTensor = tf.convert_to_tensor(actions, dtype=tf.int32)
        rewardsTensor = tf.convert_to_tensor(rewards, dtype=tf.float32)
        donesTensor = tf.convert_to_tensor(dones, dtype=tf.float32)

        # Double DQN target computation:
        # a_max = argmax_a Q_online(next_state, a)
        # target_q = r + gamma * Q_target(next_state, a_max) * (1 - done)
        # Use tape to compute online prediction gradients
        with tf.GradientTape() as tape:
            # predicted Q for chosen actions from online network
            qValues = self.onlineModel(statesTensor, training=True)
            batchIndices = tf.range(tf.shape(qValues)[0])
            # q for actions taken
            qTaken = tf.gather_nd(qValues, tf.stack([batchIndices, actionsTensor], axis=1))

            # compute next actions from online network
            qNextOnline = self.onlineModel(nextStatesTensor, training=False)
            nextActions = tf.argmax(qNextOnline, axis=1, output_type=tf.int32)

            # compute Q-values of next states from target network and pick according to nextActions
            qNextTarget = self.targetModel(nextStatesTensor, training=False)
            qNextTargetChosen = tf.gather_nd(qNextTarget, tf.stack([batchIndices, nextActions], axis=1))

            # backup targets
            targets = rewardsTensor + (1.0 - donesTensor) * Gamma * qNextTargetChosen

            # loss
            loss = self.lossFn(targets, qTaken)

        grads = tape.gradient(loss, self.onlineModel.trainable_variables)
        self.optimizer.apply_gradients(zip(grads, self.onlineModel.trainable_variables))

        self.trainStepCounter += 1
        return float(loss)

    def SaveModel(self, path: str):
        self.onlineModel.save_weights(path)

    def LoadModel(self, path: str):
        self.onlineModel.load_weights(path)
        self.targetModel.set_weights(self.onlineModel.get_weights())

In [8]:
env = gym.make(EnvName, render_mode=None)
agent = DqnAgent(env)
frameDeque = deque(maxlen=StackSize)

totalEnvSteps = 0
losses = []
episodeRewards = []

for episode in range(1, MaxEpisodes + 1):
    obs, info = env.reset()
    state = StackFrames(frameDeque, obs, True)
    episodeReward = 0.0

    for step in range(1, MaxStepsPerEpisode + 1):
        # interact
        nextState, reward, done, info, action = PlayOneStep(env, agent, frameDeque, state)
        agent.replayBuffer.Push(state, action, reward, nextState, float(done))
        episodeReward += reward
        state = nextState
        totalEnvSteps += 1

        # After enough transitions, train every TrainEveryNSteps
        if totalEnvSteps > MinReplaySize and totalEnvSteps % TrainEveryNSteps == 0:
            loss = agent.TrainStep()
            losses.append(loss)

        # update target network periodically
        if totalEnvSteps % TargetUpdateEvery == 0:
            agent.UpdateTargetNetwork()

        if done:
            break

    episodeRewards.append(episodeReward)
    avgReward = np.mean(episodeRewards[-100:])
    print(
        f"Episode {episode:4d} | Steps {step:4d} | EpisodeReward {episodeReward:6.1f} "
        f"| Avg100 {avgReward:6.2f} | ReplaySize {len(agent.replayBuffer):6d} "
        f"| Eps {agent.policy.GetEpsilon():.3f}"
    )

    if episode % 100 == 0:
        os.makedirs("checkpoints", exist_ok=True)
        agent.SaveModel(f"checkpoints/dueling_dqn_ep{episode}.weights.h5")

env.close()

Episode    1 | Steps  136 | EpisodeReward    0.0 | Avg100   0.00 | ReplaySize    136 | Eps 1.000
Episode    2 | Steps  137 | EpisodeReward    0.0 | Avg100   0.00 | ReplaySize    273 | Eps 1.000
Episode    3 | Steps  183 | EpisodeReward    1.0 | Avg100   0.33 | ReplaySize    456 | Eps 1.000
Episode    4 | Steps  228 | EpisodeReward    3.0 | Avg100   1.00 | ReplaySize    684 | Eps 1.000
Episode    5 | Steps  134 | EpisodeReward    0.0 | Avg100   0.80 | ReplaySize    818 | Eps 0.999
Episode    6 | Steps  188 | EpisodeReward    1.0 | Avg100   0.83 | ReplaySize   1006 | Eps 0.999
Episode    7 | Steps  276 | EpisodeReward    3.0 | Avg100   1.14 | ReplaySize   1282 | Eps 0.999
Episode    8 | Steps  223 | EpisodeReward    3.0 | Avg100   1.38 | ReplaySize   1505 | Eps 0.999
Episode    9 | Steps  283 | EpisodeReward    3.0 | Avg100   1.56 | ReplaySize   1788 | Eps 0.999
Episode   10 | Steps  225 | EpisodeReward    2.0 | Avg100   1.60 | ReplaySize   2013 | Eps 0.999
Episode   11 | Steps  140 | Ep

In [9]:
def EvaluateAgent(env, agent, nEpisodes=5):
    totalReward = 0
    for ep in range(nEpisodes):
        obs, _ = env.reset()
        frameDeque = deque(maxlen=StackSize)
        state = StackFrames(frameDeque, obs, True)
        done = False
        episodeReward = 0
        while not done:
            action = agent.policy.SelectAction(agent.onlineModel, state)
            nextObs, reward, terminated, truncated, _ = env.step(action)
            done = terminated or truncated
            state = StackFrames(frameDeque, nextObs, False)
            episodeReward += reward
        totalReward += episodeReward
        print(f"Eval Episode {ep+1}: Reward {episodeReward}")
    print(f"\nAverage Eval Reward over {nEpisodes} episodes: {totalReward / nEpisodes:.2f}")

print("\n=== Final Evaluation ===")
EvaluateAgent(gym.make(EnvName, render_mode=None), agent)


=== Final Evaluation ===
Eval Episode 1: Reward 5.0
Eval Episode 2: Reward 0.0
Eval Episode 3: Reward 3.0
Eval Episode 4: Reward 0.0
Eval Episode 5: Reward 0.0

Average Eval Reward over 5 episodes: 1.60


In [None]:
evalEnv = gym.make(EnvName, render_mode="human")
agent.policy.frameCount = EpsilonDecayFrames

numEvalEpisodes = 5
print("\n=== Final Evaluation with Visualization ===")

for evalEpisode in range(1, numEvalEpisodes + 1):
    obs, info = evalEnv.reset()
    frameDeque.clear()
    state = StackFrames(frameDeque, obs, True)
    episodeReward = 0.0
    done = False
    step = 0

    while not done:
        step += 1
        action = agent.policy.SelectAction(agent.onlineModel, state)
        obs, reward, terminated, truncated, info = evalEnv.step(action)
        done = terminated or truncated
        nextState = StackFrames(frameDeque, obs, False)
        episodeReward += reward
        state = nextState

        print(f"Eval Episode {evalEpisode} Step {step}: Action {action}")

        time.sleep(0.02)  # adjust speed, 20ms delay for smoothness

    print(f"Eval Episode {evalEpisode} Reward: {episodeReward}\n")

evalEnv.close()


=== Final Evaluation with Visualization ===
Eval Episode 1 Step 1: Action 1
Eval Episode 1 Step 2: Action 0
Eval Episode 1 Step 3: Action 0
Eval Episode 1 Step 4: Action 1
Eval Episode 1 Step 5: Action 1
Eval Episode 1 Step 6: Action 0
Eval Episode 1 Step 7: Action 0
Eval Episode 1 Step 8: Action 1
Eval Episode 1 Step 9: Action 0
Eval Episode 1 Step 10: Action 0
Eval Episode 1 Step 11: Action 1
Eval Episode 1 Step 12: Action 1
Eval Episode 1 Step 13: Action 0
Eval Episode 1 Step 14: Action 0
Eval Episode 1 Step 15: Action 2
Eval Episode 1 Step 16: Action 2
Eval Episode 1 Step 17: Action 2
Eval Episode 1 Step 18: Action 3
Eval Episode 1 Step 19: Action 3
Eval Episode 1 Step 20: Action 1
Eval Episode 1 Step 21: Action 2
Eval Episode 1 Step 22: Action 1
Eval Episode 1 Step 23: Action 1
Eval Episode 1 Step 24: Action 1
Eval Episode 1 Step 25: Action 0
Eval Episode 1 Step 26: Action 3
Eval Episode 1 Step 27: Action 2
Eval Episode 1 Step 28: Action 0
Eval Episode 1 Step 29: Action 3
Eval Ep

: 