# Deep Q-Networks (DQN)

In this notebook we implement a **Deep Q-Network (DQN)**, an extension of Q-learning that uses a neural network to approximate the Q-function. We'll apply DQN to the classic control task **CartPole-v1** using **TensorFlow / Keras** and **Gymnasium**.

### What you will learn
- How DQN approximates Q-values with a neural network
- Experience replay buffer
- Epsilon-greedy exploration
- Target network and soft/hard updates
- A simple training loop to learn to solve CartPole

This notebook is educational and intentionally compact. For production-level training use stable-baselines3 or RLlib.

## 1) Install / Import libraries
If you're running this in a fresh environment, install `gymnasium` and `tensorflow` first. (Commented install lines are provided.)

In [None]:
# Uncomment and run if needed
# !pip install gymnasium --quiet
# !pip install 'gymnasium[classic_control]' --quiet
# !pip install tensorflow --quiet

import random
from collections import deque
import numpy as np
import gymnasium as gym
import tensorflow as tf
from tensorflow.keras import layers, models, optimizers
import matplotlib.pyplot as plt
import time

print('TensorFlow version:', tf.__version__)
print('Gymnasium version:', gym.__version__)

## 2) Build a simple Q-network (MLP)
The network maps state → Q-values for each action.
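
As a rough sanity check on the size of such a network, the sketch below counts the weights and biases of a fully connected network by hand. It assumes CartPole-v1's 4-dimensional observation and 2 discrete actions, with two hidden layers of 64 units as used in this notebook. `dense_param_count` is a hypothetical helper for illustration, not a Keras function.

```python
def dense_param_count(layer_sizes):
    """Count weights and biases of a fully connected network.

    layer_sizes lists the width of every layer, input first, output last.
    """
    total = 0
    for n_in, n_out in zip(layer_sizes[:-1], layer_sizes[1:]):
        total += n_in * n_out  # weight matrix
        total += n_out         # bias vector
    return total


# Assumed sizes: 4 observation features -> 64 -> 64 -> 2 Q-values.
n_params = dense_param_count([4, 64, 64, 2])
print(n_params)  # 4610
```

Under those assumptions, the total should agree with the trainable parameter count that `qnet.summary()` reports.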

In [None]:
def build_q_model(state_shape, n_actions):
    model = models.Sequential([
        layers.Input(shape=state_shape),
        layers.Dense(64, activation='relu'),
        layers.Dense(64, activation='relu'),
        layers.Dense(n_actions, activation='linear')
    ])
    return model

# quick instantiation to show summary
env = gym.make('CartPole-v1')
obs, info = env.reset()
state_shape = obs.shape
n_actions = env.action_space.n
qnet = build_q_model(state_shape, n_actions)
qnet.summary()

## 3) Experience Replay Buffer
Stores recent transitions `(s, a, r, s', done)` and samples mini-batches for training.
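
Two properties of this design are worth seeing in isolation: a `deque` with `maxlen` silently evicts the oldest transitions once it is full, and `random.sample` draws without replacement. The sketch below uses a deliberately tiny capacity and placeholder integers instead of real states. Both are assumptions made only for illustration.

```python
import random
from collections import deque

# A tiny buffer with room for 3 transitions (illustrative capacity only).
buffer = deque(maxlen=3)
for t in range(5):
    # Each entry mimics (state, action, reward, next_state, done).
    buffer.append((t, 0, 1.0, t + 1, False))

# The two oldest transitions (t=0 and t=1) have been evicted.
kept_states = [transition[0] for transition in buffer]
print(kept_states)  # [2, 3, 4]

# random.sample draws without replacement, so a batch has no duplicates.
random.seed(0)
batch = random.sample(list(buffer), 2)
states, actions, rewards, next_states, dones = zip(*batch)
print(len(set(states)))  # 2
```

Uniform sampling from a large buffer breaks the correlation between consecutive transitions, which is the main reason for replaying experience instead of training on each step as it arrives.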

In [None]:
class ReplayBuffer:
    def __init__(self, capacity=100000):
        self.buffer = deque(maxlen=capacity)

    def push(self, state, action, reward, next_state, done):
        self.buffer.append((state, action, reward, next_state, done))

    def sample(self, batch_size):
        batch = random.sample(self.buffer, batch_size)
        states, actions, rewards, next_states, dones = map(np.array, zip(*batch))
        return states, actions, rewards, next_states, dones

    def __len__(self):
        return len(self.buffer)

# quick test
rb = ReplayBuffer(1000)
for i in range(10):
    rb.push(np.zeros(state_shape), 0, 1.0, np.zeros(state_shape), False)
print('ReplayBuffer length:', len(rb))

## 4) DQN Agent (with target network and epsilon-greedy policy)
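The agent implements epsilon-greedy action selection, a gradient update step on sampled mini-batches, and a target network that is synchronized by a hard update every `target_update_freq` training steps. Epsilon itself is passed in by the caller, so the decay schedule lives in the training loop.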
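
The update step regresses the predicted Q-value toward a one-step bootstrapped target, `r + gamma * max_a' Q_target(s', a')`, with the bootstrap term dropped for terminal transitions. Here is a minimal plain-Python sketch of that target for a single transition. `td_target` is a hypothetical helper, and the numbers are made up.

```python
def td_target(reward, next_q_values, done, gamma=0.99):
    """One-step Q-learning target for a single transition.

    next_q_values are the target network's Q-values for the next state.
    """
    if done:
        # No future return after a terminal state.
        return reward
    return reward + gamma * max(next_q_values)


# Illustrative numbers, not taken from a real training run.
bootstrapped = td_target(1.0, [0.5, 2.0], done=False)  # about 1 + 0.99 * 2.0
terminal = td_target(1.0, [0.5, 2.0], done=True)       # just the reward
print(bootstrapped, terminal)
```

The batched version in the agent expresses the same rule as `rewards + (1.0 - dones) * gamma * max_next_q`.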

In [None]:
class DQNAgent:
    def __init__(self, state_shape, n_actions,
                 lr=1e-3, gamma=0.99, batch_size=64,
                 buffer_capacity=50000, min_replay_size=1000,
                 target_update_freq=1000):
        self.n_actions = n_actions
        self.gamma = gamma
        self.batch_size = batch_size
        self.min_replay_size = min_replay_size
        self.target_update_freq = target_update_freq

        self.q_network = build_q_model(state_shape, n_actions)
        self.target_network = build_q_model(state_shape, n_actions)
        self.optimizer = optimizers.Adam(learning_rate=lr)
        self.loss_fn = tf.keras.losses.MeanSquaredError()

        # Initially copy weights
        self.update_target(hard=True)

        self.replay_buffer = ReplayBuffer(capacity=buffer_capacity)
        self.train_steps = 0

    def update_target(self, hard=False):
        if hard:
            self.target_network.set_weights(self.q_network.get_weights())
        else:
            # Soft (Polyak) update: target <- tau * online + (1 - tau) * target
            tau = 0.005
            # get_weights() returns a list of arrays with different shapes,
            # so blend them pairwise instead of packing them into one array
            q_weights = self.q_network.get_weights()
            t_weights = self.target_network.get_weights()
            new_weights = [tau * q + (1 - tau) * t for q, t in zip(q_weights, t_weights)]
            self.target_network.set_weights(new_weights)

    def act(self, state, epsilon=0.1):
        if np.random.rand() < epsilon:
            return np.random.randint(self.n_actions)
        # Call the model directly; `predict` adds noticeable overhead for a single state
        q_values = self.q_network(state[np.newaxis], training=False).numpy()[0]
        return int(np.argmax(q_values))

    def push(self, s, a, r, s2, done):
        self.replay_buffer.push(s, a, r, s2, done)

    def train_step(self):
        if len(self.replay_buffer) < max(self.min_replay_size, self.batch_size):
            return None

        states, actions, rewards, next_states, dones = self.replay_buffer.sample(self.batch_size)
        # Convert to tensors
        states = tf.convert_to_tensor(states, dtype=tf.float32)
        next_states = tf.convert_to_tensor(next_states, dtype=tf.float32)
        actions = tf.convert_to_tensor(actions, dtype=tf.int32)
        rewards = tf.convert_to_tensor(rewards, dtype=tf.float32)
        dones = tf.convert_to_tensor(dones.astype(np.float32), dtype=tf.float32)

        # Compute target Q-values
        next_q = self.target_network(next_states)
        max_next_q = tf.reduce_max(next_q, axis=1)
        target_q = rewards + (1.0 - dones) * self.gamma * max_next_q

        with tf.GradientTape() as tape:
            q_vals = self.q_network(states)
            # gather predicted q for taken actions
            indices = tf.stack([tf.range(self.batch_size), actions], axis=1)
            pred_q = tf.gather_nd(q_vals, indices)
            loss = self.loss_fn(target_q, pred_q)

        grads = tape.gradient(loss, self.q_network.trainable_variables)
        self.optimizer.apply_gradients(zip(grads, self.q_network.trainable_variables))

        # target network update
        self.train_steps += 1
        if self.train_steps % self.target_update_freq == 0:
            self.update_target(hard=True)

        return float(loss.numpy())

    def save(self, path='dqn_model.h5'):
        self.q_network.save(path)

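    def load(self, path='dqn_model.h5'):
        # Copy weights into the existing network so the optimizer keeps
        # tracking the same variables
        loaded = tf.keras.models.load_model(path, compile=False)
        self.q_network.set_weights(loaded.get_weights())
        self.update_target(hard=True)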

# Instantiate the agent
agent = DQNAgent(state_shape=state_shape, n_actions=n_actions)
print('Agent ready.')
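
The agent only ever calls `update_target(hard=True)`, so the soft-update branch is never exercised. To show what that branch would do, the sketch below applies the same blend, `tau * online + (1 - tau) * target`, to plain lists of floats. `soft_update` and the two flat weight vectors are hypothetical stand-ins for real network weights.

```python
def soft_update(target_weights, online_weights, tau=0.005):
    """Return tau * online + (1 - tau) * target, element by element."""
    return [tau * q + (1.0 - tau) * t
            for q, t in zip(online_weights, target_weights)]


# Hypothetical flat weight vectors standing in for real network weights.
online = [1.0, -2.0]
target = [0.0, 0.0]

for _ in range(1000):
    target = soft_update(target, online)

# After k updates from zero the target is (1 - (1 - tau)**k) * online,
# so with tau = 0.005 and k = 1000 it has covered about 99% of the gap.
print(target)
```

A soft update applied every training step and a hard copy every `target_update_freq` steps both keep the bootstrap target slow-moving. Which one works better is an empirical question for a given task.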

## 5) Training loop
We'll train for a small number of episodes to keep this demo fast. Increase `NUM_EPISODES` for a stronger agent.

We use an epsilon that decays from `eps_start` → `eps_end` over `eps_decay_steps`.
Training prints episodic reward and periodic loss statistics.
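
The linear schedule can also be written in closed form, which makes it easy to check what epsilon will be at a given step. This sketch uses the defaults from this section (1.0 down to 0.01 over 20000 steps). `linear_epsilon` is a hypothetical helper, not part of the training loop.

```python
def linear_epsilon(step, eps_start=1.0, eps_end=0.01, decay_steps=20000):
    """Epsilon after `step` environment steps under a linear schedule."""
    fraction = min(step / decay_steps, 1.0)
    return eps_start + fraction * (eps_end - eps_start)


for step in (0, 10_000, 20_000, 30_000):
    print(step, round(linear_epsilon(step), 3))
# 0 1.0
# 10000 0.505
# 20000 0.01
# 30000 0.01
```

Up to floating-point rounding, this matches subtracting `(eps_start - eps_end) / eps_decay_steps` once per environment step until the floor is reached.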

In [None]:
NUM_EPISODES = 300
MAX_STEPS = 500
eps_start = 1.0
eps_end = 0.01
eps_decay_steps = 20000

epsilon = eps_start
eps_decay = (eps_start - eps_end) / eps_decay_steps

rewards_history = []
loss_history = []

env = gym.make('CartPole-v1')

start_time = time.time()
for ep in range(1, NUM_EPISODES + 1):
    state, info = env.reset()
    ep_reward = 0
    for step in range(MAX_STEPS):
        action = agent.act(state, epsilon)
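        next_state, reward, done, truncated, info = env.step(action)
        # `done` is true termination (pole fell); `truncated` is the time limit
        done_flag = bool(done or truncated)
        # Store only true termination so time-limit cutoffs still bootstrap
        agent.push(state, action, reward, next_state, bool(done))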
        loss = agent.train_step()
        if loss is not None:
            loss_history.append(loss)
        state = next_state
        ep_reward += reward

        # decay epsilon linearly, clamped so it never drops below eps_end
        epsilon = max(eps_end, epsilon - eps_decay)

        if done_flag:
            break

    rewards_history.append(ep_reward)

    if ep % 10 == 0:
        avg_reward = np.mean(rewards_history[-10:])
        avg_loss = np.mean(loss_history[-50:]) if loss_history else 0.0
        print(f'Episode {ep:04d} | AvgReward(10) {avg_reward:.2f} | Epsilon {epsilon:.3f} | AvgLoss {avg_loss:.4f}')

# final timing
print(f"Training finished in {time.time() - start_time:.1f}s")
env.close()

## 6) Plot learning curves
Plot episodic reward and training loss to inspect learning behaviour.
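
Raw episodic rewards are noisy, so a moving average often makes the trend easier to read. Below is a small dependency-free sketch. `moving_average` is a hypothetical helper, and the returns are made up.

```python
def moving_average(values, window):
    """Mean of each full window of `window` consecutive values."""
    if window <= 0:
        raise ValueError("window must be positive")
    averages = []
    running_sum = 0.0
    for i, value in enumerate(values):
        running_sum += value
        if i >= window:
            # Drop the value that just left the window.
            running_sum -= values[i - window]
        if i >= window - 1:
            averages.append(running_sum / window)
    return averages


# Made-up episode returns, only to show the shape of the output.
rewards = [10.0, 20.0, 30.0, 40.0]
print(moving_average(rewards, 2))  # [15.0, 25.0, 35.0]
```

Plotting `moving_average(rewards_history, 20)` alongside the raw curve is one option. The window length of 20 is an arbitrary choice.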

In [None]:
plt.figure(figsize=(12,5))
plt.subplot(1,2,1)
plt.plot(rewards_history)
plt.xlabel('Episode')
plt.ylabel('Reward')
plt.title('Episodic Reward')

plt.subplot(1,2,2)
plt.plot(loss_history[-1000:])
plt.xlabel('Training Steps (recent)')
plt.ylabel('Loss')
plt.title('Training Loss (recent)')
plt.tight_layout()
plt.show()

## 7) Evaluate the trained agent
Run a few episodes with epsilon=0 (greedy) to see the learned performance.
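
A single average hides how much the evaluation returns vary, so it can help to report the spread as well. The sketch below summarizes a list of returns and compares the mean to a "solved" threshold. The threshold of 475 is assumed to be the reward threshold registered for CartPole-v1, and the returns are hypothetical, not results from this notebook.

```python
import statistics


def summarize_returns(returns, solved_threshold=475.0):
    """Mean, population std, and whether the mean clears the threshold."""
    mean_return = statistics.fmean(returns)
    std_return = statistics.pstdev(returns)
    return mean_return, std_return, mean_return >= solved_threshold


# Hypothetical evaluation returns, not results from this notebook.
mean_r, std_r, solved = summarize_returns([500.0, 500.0, 480.0, 460.0])
print(f"mean={mean_r:.1f} std={std_r:.1f} solved={solved}")
# mean=485.0 std=16.6 solved=True
```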

In [None]:
EVAL_EPISODES = 10
eval_rewards = []
env = gym.make('CartPole-v1', render_mode='rgb_array')
for ep in range(EVAL_EPISODES):
    state, info = env.reset()
    ep_r = 0
    for _ in range(500):
        action = agent.act(state, epsilon=0.0)  # greedy
        state, reward, done, truncated, info = env.step(action)
        ep_r += reward
        if done or truncated:
            break
    eval_rewards.append(ep_r)
env.close()
print(f'Evaluation over {EVAL_EPISODES} episodes — avg reward: {np.mean(eval_rewards):.2f}')

## 8) Save model (optional)
Save the learned Q-network for later reuse.

In [None]:
agent.save('dqn_cartpole.h5')
print('Saved model to dqn_cartpole.h5')

## Notes & Next steps
- This is a compact DQN implementation. Practical improvements include:
  - Prioritized Experience Replay
  - Double DQN (to reduce Q-value overestimation)
  - Dueling networks
  - Better exploration schedules and reward normalization
- For larger, harder environments (e.g., Atari), use established libraries such as stable-baselines3 or RLlib.
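
To make the Double DQN item concrete, the sketch below contrasts the two targets for one transition. In standard DQN the target network both picks and scores the next action. In Double DQN the online network picks the action and the target network scores it. The function names and Q-values are hypothetical and chosen to exaggerate the difference.

```python
def dqn_target(reward, target_next_q, done, gamma=0.99):
    """Standard DQN: the target network both selects and evaluates."""
    if done:
        return reward
    return reward + gamma * max(target_next_q)


def double_dqn_target(reward, online_next_q, target_next_q, done, gamma=0.99):
    """Double DQN: online network selects, target network evaluates."""
    if done:
        return reward
    best_action = max(range(len(online_next_q)), key=online_next_q.__getitem__)
    return reward + gamma * target_next_q[best_action]


# Illustrative Q-values for one next state with two actions.
online_next_q = [1.0, 3.0]   # online network prefers action 1
target_next_q = [5.0, 2.0]   # target network overrates action 0

plain = dqn_target(1.0, target_next_q, done=False, gamma=0.9)
double = double_dqn_target(1.0, online_next_q, target_next_q, done=False, gamma=0.9)
print(plain, double)
```

With these numbers the standard target is about 5.5 and the Double DQN target about 2.8, because the overrated action 0 is no longer the one being scored.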
