# Deep Q-Network Implementation
Here I have tried to include the Double DQN with experience replay in batches. The loss update equation is as follows:

$$
Q(s_t, a_t; \theta) \leftarrow Q(s_t, a_t; \theta) + \alpha [r_t + \gamma \max_{a} Q(s_{t+1}, a; \theta^-) - Q(s_t, a_t; \theta)]
$$  
   Where:  
   - $\theta$ represents the parameters of the main network  
   - $\theta^-$ represents the parameters of the target network

Here we have `self.model` and `self.target_model` as the $\theta$ and $\theta^-$ 

In [2]:
import numpy as np
import random
from collections import deque
import gymnasium as gym
import tensorflow as tf
from tensorflow.keras import Model, layers
import os

In [3]:
env = gym.make("CartPole-v1")
env.reset(seed=42)

state_size = env.observation_space.shape[0]
action_size = env.action_space.n

print("State size:", state_size)
print("Action size:", action_size)

State size: 4
Action size: 2


In [4]:
class DQN(Model):
    def __init__(self, state_size, action_size, **kwargs):
        super(DQN, self).__init__(**kwargs)
        self.state_size = state_size
        self.action_size = action_size
        self.dense1 = layers.Dense(24, activation='relu', input_shape=(state_size,))
        self.dense2 = layers.Dense(24, activation='relu')
        self.out = layers.Dense(action_size, activation='linear')

    def call(self, x):
        x = self.dense1(x)
        x = self.dense2(x)
        return self.out(x)

    def get_config(self):
        config = super().get_config().copy()
        config.update({
            'state_size': self.state_size,
            'action_size': self.action_size
        })
        return config

    @classmethod
    def from_config(cls, config):
        return cls(**config)

In [5]:
memory = deque(maxlen=2000)

## The Agent
The agent tries to balance a pole on a moving cart. I have used the OpenAI Gymnasium for this environment.

Within this environment, we have two actions for the cart:
1. Moving Left (0)
2. Moving Right (1)

With the state observations:
1. **Cart Position**: -4.8 to 4.8
2. **Cart Velocity**: -inf to inf
3. **Pole Angle**: -24° to 24°
4. **Pole Angular Velocity**: -inf to inf

And termination conditions:
1. **Termination**: Pole Angle > ±12°
2. **Termination**: Cart Position > ±2.4
3. **Truncation**: Episode length > 500

In [6]:
class DQNAgent:
    def __init__(self, state_size, action_size, gamma=0.99, epsilon=1.0, epsilon_min=0.01, epsilon_decay=0.995, learning_rate=0.001, batch_size=64):
        self.state_size = state_size
        self.action_size = action_size
        self.gamma = gamma  
        self.epsilon = epsilon
        self.epsilon_min = epsilon_min
        self.epsilon_decay = epsilon_decay
        self.learning_rate = learning_rate
        self.batch_size = batch_size
        self.memory = deque(maxlen=2000)
        
        self.model = self._build_model()
        self.target_model = self._build_model()
        self.update_target_model()

        self.optimizer = tf.keras.optimizers.Adam(learning_rate=self.learning_rate)

    def _build_model(self):
        model = DQN(self.state_size, self.action_size)
        return model

    def update_target_model(self):
        self.target_model.set_weights(self.model.get_weights())

    def remember(self, state, action, reward, next_state, done):
        self.memory.append((state, action, reward, next_state, done))
    
    def act(self, state):
        if np.random.rand() <= self.epsilon:
            return random.randrange(self.action_size)
        q_values = self.model.predict(state, verbose=0)
        return np.argmax(q_values[0])

    def replay(self):
        if len(self.memory) < self.batch_size:
            return
        minibatch = random.sample(self.memory, self.batch_size)
        
        for state, action, reward, next_state, done in minibatch:
            with tf.GradientTape() as tape:
                q_values = self.model(np.array([state]), training=True)
                q_value = q_values[0][action]

                if done:
                    target = reward
                else:
                    next_action = np.argmax(self.model(np.array([next_state]))[0].numpy())
                    t = self.target_model(np.array([next_state]))[0][next_action]
                    target = reward + self.gamma * t

                loss = tf.reduce_mean(tf.square(target - q_value))

            grads = tape.gradient(loss, self.model.trainable_variables)
            self.optimizer.apply_gradients(zip(grads, self.model.trainable_variables))

    def save_model(self, filepath):
        self.model.save(filepath)

    def load_model(self, filepath):
        self.model = tf.keras.models.load_model(filepath, custom_objects={'DQN': DQN})
        self.target_model = tf.keras.models.load_model(filepath, custom_objects={'DQN': DQN})

In [7]:
batch_size = 64
n_episodes = 500
gamma = 0.95
epsilon = 1.0
epsilon_min = 0.01
epsilon_decay = 0.995
learning_rate = 0.001
target_update_freq = 10

In [86]:
output_dir = "./saved_models/"

if not os.path.exists(output_dir):
    os.makedirs(output_dir)

agent = DQNAgent(state_size, action_size, gamma, epsilon, epsilon_min, epsilon_decay, learning_rate, batch_size)
done = False

for e in range(n_episodes):
    state = env.reset()[0]
    state = np.reshape(state, [1, state_size])
    total_reward = 0

    for time in range(500):
        action = agent.act(state)
        next_state, reward, done, truncated, _ = env.step(action)

        done = done or truncated
        next_state = np.reshape(next_state, [1, state_size])
        agent.remember(state[0], action, reward, next_state[0], done)
        state = next_state
        total_reward += reward

        if done:
            print(f"Episode: {e+1}/{n_episodes}, Score: {time}, Epsilon: {agent.epsilon:.2}")
            break
            
    if len(agent.memory) > batch_size:
        agent.replay()

    if agent.epsilon > agent.epsilon_min:
        agent.epsilon *= agent.epsilon_decay

    if e % target_update_freq == 0:
        agent.update_target_model()

    if (e + 1) % 50 == 0:
        agent.model.save_weights(os.path.join(output_dir, f"weights_{e+1:04d}.weights.h5"))

agent.save_model(os.path.join(output_dir, "dqn_model.keras"))

Episode: 1/500, Score: 28, Epsilon: 1.0
Episode: 2/500, Score: 33, Epsilon: 0.99
Episode: 3/500, Score: 14, Epsilon: 0.99
Episode: 4/500, Score: 14, Epsilon: 0.99
Episode: 5/500, Score: 27, Epsilon: 0.98
Episode: 6/500, Score: 12, Epsilon: 0.98
Episode: 7/500, Score: 19, Epsilon: 0.97
Episode: 8/500, Score: 13, Epsilon: 0.97
Episode: 9/500, Score: 23, Epsilon: 0.96
Episode: 10/500, Score: 11, Epsilon: 0.96
Episode: 11/500, Score: 8, Epsilon: 0.95
Episode: 12/500, Score: 24, Epsilon: 0.95
Episode: 13/500, Score: 10, Epsilon: 0.94
Episode: 14/500, Score: 20, Epsilon: 0.94
Episode: 15/500, Score: 9, Epsilon: 0.93
Episode: 16/500, Score: 11, Epsilon: 0.93
Episode: 17/500, Score: 12, Epsilon: 0.92
Episode: 18/500, Score: 24, Epsilon: 0.92
Episode: 19/500, Score: 13, Epsilon: 0.91
Episode: 20/500, Score: 70, Epsilon: 0.91
Episode: 21/500, Score: 19, Epsilon: 0.9
Episode: 22/500, Score: 14, Epsilon: 0.9
Episode: 23/500, Score: 11, Epsilon: 0.9
Episode: 24/500, Score: 15, Epsilon: 0.89
Episode

In [8]:
def render_episode(agent, model_path, num_episodes=1):
    # Load the model
    agent.load_model(model_path)
    
    env = gym.make('CartPole-v1', render_mode='human')
    for episode in range(num_episodes):
        state, _ = env.reset()
        state = state.reshape(1, -1)
        done = False
        total_reward = 0
        while not done:
            env.render()
            action = agent.act(state)
            next_state, reward, done, truncated, _ = env.step(action)
            next_state = next_state.reshape(1, -1)
            total_reward += reward
            state = next_state
            if truncated:
                done = True
        print(f"Episode {episode + 1} reward: {total_reward}")
    env.close()

# Initializing saved model
state_size = 4
action_size = 2
agent = DQNAgent(state_size, action_size)
agent.epsilon = 0.0  # 0.0 for pure exploitation

# Load the model and render episodes
model_path = "./saved_models/dqn_model.keras"
render_episode(agent, model_path, num_episodes=3)

  super().__init__(activity_regularizer=activity_regularizer, **kwargs)



Episode 1 reward: 134.0
Episode 2 reward: 131.0
Episode 3 reward: 131.0


Here we can observe that the rewards are not as we had hoped because the model is simple and has little regularizations and no dropout layers in between. 

So I have tried the DQN from `stable_baseline3` to get the same model and it returns a better trained agent. 

In [11]:
import os
from stable_baselines3 import DQN
from stable_baselines3.common.monitor import Monitor
from stable_baselines3.common.callbacks import EvalCallback, CheckpointCallback
from stable_baselines3.common.vec_env import DummyVecEnv

LOG_DIR = "DQN_logs"
MODEL_DIR = "DQN_models"
BEST_DIR = "DQN_best_model"

os.makedirs(LOG_DIR, exist_ok=True)
os.makedirs(MODEL_DIR, exist_ok=True)
os.makedirs(BEST_DIR, exist_ok=True)

def make_env():
    def _thunk():
        env = gym.make("CartPole-v1")
        env = Monitor(env, filename=os.path.join(LOG_DIR, "monitor.csv"))
        return env
    return _thunk

venv = DummyVecEnv([make_env()])

# DQN hyperparams 
model = DQN(
    policy="MlpPolicy",
    env=venv,
    learning_rate=1e-3,
    buffer_size=50_000,
    learning_starts=1_000,
    batch_size=64,
    gamma=0.99,
    train_freq=4,
    target_update_interval=1_000,
    exploration_fraction=0.2,
    exploration_final_eps=0.02,
    verbose=1,
    tensorboard_log=LOG_DIR
)

eval_env = gym.make("CartPole-v1")
eval_env = Monitor(eval_env)
eval_callback = EvalCallback(
    eval_env,
    best_model_save_path=BEST_DIR,
    log_path=LOG_DIR,
    eval_freq=5_000,
    deterministic=True,
    render=False,
)
checkpoint_callback = CheckpointCallback(
    save_freq=10_000,
    save_path=MODEL_DIR,
    name_prefix="dqn_cartpole"
)

# Train
total_timesteps = 200_000
model.learn(total_timesteps=total_timesteps, callback=[eval_callback, checkpoint_callback])

# Save final model
model.save(os.path.join(MODEL_DIR, "dqn_cartpole_final"))
print("Training complete. Saved final model and (if better) best model.")


Using cpu device
Logging to DQN_logs\DQN_2
----------------------------------
| rollout/            |          |
|    ep_len_mean      | 19       |
|    ep_rew_mean      | 19       |
|    exploration_rate | 0.998    |
| time/               |          |
|    episodes         | 4        |
|    fps              | 9678     |
|    time_elapsed     | 0        |
|    total_timesteps  | 76       |
----------------------------------
----------------------------------
| rollout/            |          |
|    ep_len_mean      | 16       |
|    ep_rew_mean      | 16       |
|    exploration_rate | 0.997    |
| time/               |          |
|    episodes         | 8        |
|    fps              | 11363    |
|    time_elapsed     | 0        |
|    total_timesteps  | 128      |
----------------------------------


----------------------------------
| rollout/            |          |
|    ep_len_mean      | 18.8     |
|    ep_rew_mean      | 18.8     |
|    exploration_rate | 0.994    |
| time/               |          |
|    episodes         | 12       |
|    fps              | 14331    |
|    time_elapsed     | 0        |
|    total_timesteps  | 225      |
----------------------------------
----------------------------------
| rollout/            |          |
|    ep_len_mean      | 18.4     |
|    ep_rew_mean      | 18.4     |
|    exploration_rate | 0.993    |
| time/               |          |
|    episodes         | 16       |
|    fps              | 14560    |
|    time_elapsed     | 0        |
|    total_timesteps  | 295      |
----------------------------------
----------------------------------
| rollout/            |          |
|    ep_len_mean      | 18.1     |
|    ep_rew_mean      | 18.1     |
|    exploration_rate | 0.991    |
| time/               |          |
|    episodes       

In [12]:
import time

MODEL_PATHS = [
    os.path.join("DQN_best_model", "best_model.zip"),
    os.path.join("DQN_models", "dqn_cartpole_final.zip"),
]

def load_first_available(paths):
    for p in paths:
        if os.path.exists(p):
            return p
    raise FileNotFoundError("No trained model found. Train your model first.")

model_path = load_first_available(MODEL_PATHS)
print(f"Loading model: {model_path}")
model = DQN.load(model_path)

env = gym.make("CartPole-v1", render_mode="human")
obs, info = env.reset(seed=178)

n_episodes = 3
returns = []

for ep in range(n_episodes):
    done = False
    total_reward = 0.0
    while not done:
        action, _ = model.predict(obs, deterministic=True)
        obs, reward, terminated, truncated, info = env.step(int(action))
        total_reward += reward
        done = terminated or truncated
        # slow down a touch for visibility (optional)
        time.sleep(0.01)
    returns.append(total_reward)
    print(f"Episode {ep+1}: return={total_reward:.1f}")
    obs, info = env.reset()

env.close()
print(f"Average return over {n_episodes} episodes: {np.mean(returns):.1f} ± {np.std(returns):.1f}")


Loading model: DQN_best_model\best_model.zip
Episode 1: return=500.0
Episode 2: return=500.0
Episode 3: return=500.0
Average return over 3 episodes: 500.0 ± 0.0


In [13]:
import imageio

def record_cartpole_video(model, video_path="videos/cartpole_run.mp4", episode_length=500):
    os.makedirs(os.path.dirname(video_path), exist_ok=True)
    env = gym.make("CartPole-v1", render_mode="rgb_array")
    obs, info = env.reset(seed=42)
    frames = []
    done = False
    for _ in range(episode_length):
        frame = env.render()
        frames.append(frame)
        action, _ = model.predict(obs, deterministic=True)
        obs, reward, terminated, truncated, info = env.step(int(action))
        done = terminated or truncated
        if done:
            break
    env.close()
    imageio.mimsave(video_path, frames, fps=30)
    print(f"Video saved to {video_path}")

record_cartpole_video(model, video_path="videos/cartpole_run.mp4")



Video saved to videos/cartpole_run.mp4
