# DQN Reinforcement Learning Notebook
This notebook builds, trains, evaluates, and saves a DQN-style reinforcement learning agent in Python.

## 1) Install and Import Dependencies
Install and import required packages for environment simulation, training, and plotting.

In [None]:
# If needed in a fresh environment, uncomment the next line:
# !pip install gymnasium torch numpy matplotlib

import random
from collections import deque, namedtuple

import gymnasium as gym
import matplotlib.pyplot as plt
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim

## 2) Configure Environment and Hyperparameters
Set random seeds, device, and DQN training hyperparameters.

In [None]:
# Seed the Python, NumPy, and PyTorch global random number generators.
SEED = 42
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)

# Use CUDA when PyTorch reports it as available, otherwise the CPU.
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
ENV_NAME = "CartPole-v1"

LR = 1e-3  # learning rate passed to the Adam optimizer
GAMMA = 0.99  # discount factor used in the TD target
BATCH_SIZE = 64  # transitions sampled per optimization step
REPLAY_CAPACITY = 50_000  # maximum number of transitions kept in the replay buffer
MIN_REPLAY_SIZE = 1_000  # optimization is skipped until the buffer holds at least this many transitions
TARGET_UPDATE_EVERY = 200  # environment steps between target-network syncs

# Epsilon-greedy schedule. The training loop multiplies epsilon by EPS_DECAY once
# per episode and floors it at EPS_END.
# NOTE(review): with NUM_EPISODES = 250 the schedule only reaches
# 0.995 ** 250 ~= 0.29, so EPS_END is never hit during training -- confirm this is intended.
EPS_START = 1.0
EPS_END = 0.05
EPS_DECAY = 0.995

NUM_EPISODES = 250  # number of training episodes
MAX_STEPS_PER_EPISODE = 1000  # hard cap on steps taken inside one training episode

print(f"Device: {DEVICE}")

## 3) Create the RL Environment Wrapper
Initialize the environment, inspect its observation/action spaces, and define thin helper functions around `reset` and `step` that return float32 NumPy observations.

In [None]:
# Create the environment and seed its internal RNG with an initial reset.
env = gym.make(ENV_NAME)
obs, info = env.reset(seed=SEED)
# reset(seed=...) does not seed the action space, which keeps its own RNG.
# select_action() explores through env.action_space.sample(), so seed it as
# well; otherwise exploration differs from run to run despite the fixed SEED.
env.action_space.seed(SEED)

# Sizes of the network input (observation vector) and output (one Q-value per action).
state_dim = env.observation_space.shape[0]
action_dim = env.action_space.n

print("Observation space:", env.observation_space)
print("Action space:", env.action_space)
print("state_dim:", state_dim, "action_dim:", action_dim)


def env_reset(environment, seed=None):
    """Reset ``environment`` and return its first observation as float32.

    Args:
        environment: Object exposing ``reset(seed=...)`` that returns an
            ``(observation, info)`` pair.
        seed: Optional seed forwarded unchanged to ``reset``.

    Returns:
        Tuple ``(state, info)`` where ``state`` is a new ``np.float32`` array
        built from the observation and ``info`` is passed through untouched.
    """
    observation, reset_info = environment.reset(seed=seed)
    observation = np.array(observation, dtype=np.float32)
    return observation, reset_info


def env_step(environment, action):
    """Advance ``environment`` by one step and normalise the result types.

    Args:
        environment: Object exposing ``step(action)`` that returns
            ``(observation, reward, terminated, truncated, info)``.
        action: Action forwarded unchanged to ``step``.

    Returns:
        Tuple ``(next_state, reward, done, info)``: the observation as a new
        ``np.float32`` array, the reward as a Python ``float``, ``done`` as
        ``terminated or truncated``, and ``info`` passed through untouched.
        Note that ``done`` does not tell the caller which of the two flags
        ended the episode.
    """
    step_result = environment.step(action)
    observation, raw_reward, terminated, truncated, step_info = step_result
    episode_over = terminated or truncated
    return (
        np.array(observation, dtype=np.float32),
        float(raw_reward),
        episode_over,
        step_info,
    )

## 4) Implement Replay Buffer
Store transitions and sample randomized mini-batches for stable off-policy learning.

In [None]:
# One stored environment step; field order matches ReplayBuffer.push().
Transition = namedtuple("Transition", ["state", "action", "reward", "next_state", "done"])


class ReplayBuffer:
    """Bounded FIFO store of ``Transition`` records with uniform random sampling.

    The backing ``deque`` is created with ``maxlen=capacity``, so once it is
    full each new transition evicts the oldest one.
    """

    def __init__(self, capacity: int):
        """Create an empty buffer that holds at most ``capacity`` transitions."""
        self.capacity = capacity
        self.buffer = deque(maxlen=capacity)

    def __len__(self):
        """Return the number of transitions currently stored."""
        return len(self.buffer)

    def push(self, state, action, reward, next_state, done):
        """Append one transition, coercing every field to a fixed type.

        States are copied into ``np.float32`` arrays, ``action`` becomes an
        ``int``, and ``reward`` and ``done`` become ``float``.
        """
        record = Transition(
            state=np.array(state, dtype=np.float32),
            action=int(action),
            reward=float(reward),
            next_state=np.array(next_state, dtype=np.float32),
            done=float(done),
        )
        self.buffer.append(record)

    def sample(self, batch_size: int):
        """Draw ``batch_size`` distinct transitions and stack them into tensors.

        Sampling uses ``random.sample`` (without replacement), so asking for
        more transitions than are stored raises ``ValueError``.

        Returns:
            Tuple ``(states, actions, rewards, next_states, dones)`` of tensors
            on ``DEVICE``. ``states`` and ``next_states`` are float32 with the
            batch as the leading dimension; ``actions`` is int64 and
            ``rewards`` / ``dones`` are float32, each shaped ``(batch, 1)``.
        """
        drawn = random.sample(self.buffer, batch_size)

        def stacked(field):
            # Stack per-transition arrays into one float32 tensor.
            rows = np.array([getattr(item, field) for item in drawn])
            return torch.tensor(rows, dtype=torch.float32, device=DEVICE)

        def column(field, dtype):
            # Wrap each scalar in its own row so the result is (batch, 1).
            rows = [[getattr(item, field)] for item in drawn]
            return torch.tensor(rows, dtype=dtype, device=DEVICE)

        return (
            stacked("state"),
            column("action", torch.int64),
            column("reward", torch.float32),
            stacked("next_state"),
            column("done", torch.float32),
        )


# Single module-level buffer instance, sized by REPLAY_CAPACITY.
replay_buffer = ReplayBuffer(REPLAY_CAPACITY)
print("Replay buffer initialized with capacity:", REPLAY_CAPACITY)

## 5) Build the Q-Network (PyTorch)
Define the Q-network and create policy/target networks.

In [None]:
class QNetwork(nn.Module):
    """Fully connected network mapping a state vector to one Q-value per action.

    The architecture is two hidden ``Linear`` + ``ReLU`` layers followed by a
    linear output layer, stored as ``self.net`` so parameter names stay
    ``net.0.*``, ``net.2.*`` and ``net.4.*`` regardless of the hidden width.

    Args:
        in_dim: Number of input features (size of the state vector).
        out_dim: Number of outputs (one Q-value per discrete action).
        hidden_dim: Width of both hidden layers. Defaults to 128, the value
            that was previously hard-coded, so existing two-argument callers
            and checkpoints saved with it are unaffected.
    """

    def __init__(self, in_dim: int, out_dim: int, hidden_dim: int = 128):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(in_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, out_dim),
        )

    def forward(self, x):
        """Return the Q-values for ``x``; the last dimension of ``x`` must be ``in_dim``."""
        return self.net(x)


# Build the policy (online) network first and the target network second; both are moved to DEVICE.
policy_net = QNetwork(state_dim, action_dim).to(DEVICE)
target_net = QNetwork(state_dim, action_dim).to(DEVICE)
# Start the target network as an exact copy of the policy network's weights.
target_net.load_state_dict(policy_net.state_dict())
# The target network is never optimized directly; keep it in eval mode.
target_net.eval()

# Only the policy network's parameters are handed to the optimizer.
optimizer = optim.Adam(policy_net.parameters(), lr=LR)
print("Networks initialized.")

## 6) Implement Action Selection (Epsilon-Greedy)
Choose random actions with probability epsilon, otherwise greedy actions from Q-values.

In [None]:
def select_action(state: np.ndarray, epsilon: float) -> int:
    """Pick an action with an epsilon-greedy policy over ``policy_net``.

    Args:
        state: Single observation (no batch dimension).
        epsilon: Probability of taking a random action. ``0.0`` always takes
            the greedy action because ``random.random()`` is never below 0.

    Returns:
        The chosen action as a built-in ``int`` on both branches.
    """
    if random.random() < epsilon:
        # Gymnasium's Discrete.sample() yields a NumPy integer; cast it so the
        # exploration branch honours the ``-> int`` contract like the greedy one.
        return int(env.action_space.sample())
    # Add a leading batch dimension of 1 before the forward pass.
    state_t = torch.tensor(state, dtype=torch.float32, device=DEVICE).unsqueeze(0)
    with torch.no_grad():
        q_vals = policy_net(state_t)
    return int(torch.argmax(q_vals, dim=1).item())

## 7) Implement Optimization Step and Target Network Sync
Compute TD targets, optimize policy network, clip gradients, and periodically sync target network.

In [None]:
global_step = 0


def optimize_model():
    """Run one gradient step on ``policy_net`` from a sampled replay batch.

    Nothing happens until the replay buffer holds at least
    ``max(BATCH_SIZE, MIN_REPLAY_SIZE)`` transitions.

    Returns:
        The Smooth-L1 loss of the step as a ``float``, or ``None`` when the
        buffer is still too small and no update was performed.
    """
    warmup_size = max(BATCH_SIZE, MIN_REPLAY_SIZE)
    if len(replay_buffer) < warmup_size:
        return None

    batch = replay_buffer.sample(BATCH_SIZE)
    states, actions, rewards, next_states, dones = batch

    # Q(s, a) for the actions that were actually taken.
    predicted_q = policy_net(states).gather(1, actions)

    # TD target from the frozen target network; no gradients flow through it.
    with torch.no_grad():
        best_next_q = target_net(next_states).max(dim=1, keepdim=True).values
        td_target = rewards + (1.0 - dones) * GAMMA * best_next_q

    criterion = nn.SmoothL1Loss()
    loss = criterion(predicted_q, td_target)

    optimizer.zero_grad()
    loss.backward()
    # Cap the global gradient norm at 10 before applying the update.
    nn.utils.clip_grad_norm_(policy_net.parameters(), max_norm=10.0)
    optimizer.step()

    return float(loss.item())


def maybe_sync_target(step_idx: int):
    """Copy ``policy_net`` weights into ``target_net`` on sync steps.

    A sync happens whenever ``step_idx`` is a multiple of
    ``TARGET_UPDATE_EVERY`` (including 0); other steps are a no-op.
    """
    sync_due = step_idx % TARGET_UPDATE_EVERY == 0
    if not sync_due:
        return
    target_net.load_state_dict(policy_net.state_dict())

## 8) Run the Training Loop
Train the agent across episodes with per-episode epsilon decay, replay sampling, and reward/loss logging, then plot the reward and loss curves.

In [None]:
episode_rewards = []  # total reward of each finished episode
loss_history = []  # loss of every optimization step that actually ran
epsilon = EPS_START

global_step = 0  # environment steps taken across all episodes
for episode in range(1, NUM_EPISODES + 1):
    # A distinct, deterministic seed per episode.
    state, _ = env_reset(env, seed=SEED + episode)
    episode_reward = 0.0

    for _ in range(MAX_STEPS_PER_EPISODE):
        action = select_action(state, epsilon)

        # Step the environment directly rather than through env_step(), which
        # merges `terminated` and `truncated` into a single `done` flag. The
        # two must stay separate for correct TD targets.
        next_obs, reward, terminated, truncated, _ = env.step(action)
        next_state = np.array(next_obs, dtype=np.float32)
        reward = float(reward)
        done = terminated or truncated

        # Store only true termination. A time-limit truncation is not a
        # terminal state, so the target must still bootstrap from next_state;
        # storing `done` here would zero that bootstrap term for truncated steps.
        replay_buffer.push(state, action, reward, next_state, terminated)
        state = next_state
        episode_reward += reward

        loss = optimize_model()
        if loss is not None:
            loss_history.append(loss)

        global_step += 1
        maybe_sync_target(global_step)

        # The episode itself ends on either termination or truncation.
        if done:
            break

    episode_rewards.append(episode_reward)
    # Multiplicative per-episode decay, floored at EPS_END.
    epsilon = max(EPS_END, epsilon * EPS_DECAY)

    if episode % 20 == 0:
        recent_mean = float(np.mean(episode_rewards[-20:]))
        print(f"Episode {episode:4d} | epsilon={epsilon:.3f} | mean reward (last 20)={recent_mean:.2f}")

print("Training complete.")

# Two side-by-side panels: per-episode reward (left) and per-step loss (right).
fig, (reward_ax, loss_ax) = plt.subplots(1, 2, figsize=(12, 4))

reward_ax.plot(episode_rewards, label="Episode reward")
if len(episode_rewards) >= 20:
    # 20-episode moving average; "valid" mode drops the first 19 points, so
    # the curve is plotted starting at episode index 19.
    mov_avg = np.convolve(episode_rewards, np.ones(20) / 20, mode="valid")
    reward_ax.plot(range(19, len(episode_rewards)), mov_avg, label="20-ep moving avg")
reward_ax.set_title("Reward Curve")
reward_ax.set_xlabel("Episode")
reward_ax.set_ylabel("Reward")
reward_ax.legend()

loss_ax.plot(loss_history)
loss_ax.set_title("Loss History")
loss_ax.set_xlabel("Optimization step")
loss_ax.set_ylabel("Loss")

fig.tight_layout()
plt.show()

## 9) Evaluate the Trained Agent
Run greedy (epsilon = 0) episodes on fixed seeds and report the mean return.

In [None]:
def evaluate_agent(environment, episodes: int = 10):
    """Roll out the greedy policy and collect the return of each episode.

    Episode ``k`` (0-based) resets ``environment`` with seed ``1000 + k`` and
    acts with ``epsilon=0.0`` until ``env_step`` reports ``done``.

    Args:
        environment: Environment passed to ``env_reset`` / ``env_step``.
        episodes: Number of evaluation episodes to run.

    Returns:
        List with one total return per episode, in episode order.
    """
    returns = []
    for episode_seed in range(1000, 1000 + episodes):
        state, _ = env_reset(environment, seed=episode_seed)
        total = 0.0
        while True:
            action = select_action(state, epsilon=0.0)
            state, reward, done, _ = env_step(environment, action)
            total += reward
            if done:
                break
        returns.append(total)
    return returns


# Evaluate the current policy_net greedily on 10 seeded episodes and summarise the returns.
eval_returns = evaluate_agent(env, episodes=10)
print("Evaluation returns:", eval_returns)
print("Mean evaluation return:", float(np.mean(eval_returns)))

## 10) Save, Load, and Reuse the Model
Persist checkpoints, reload them, and run a quick inference episode.

In [None]:
checkpoint_path = "dqn_cartpole_checkpoint.pt"

# Bundle both networks, the optimizer state, and the settings needed to rebuild them.
checkpoint = {
    "policy_net": policy_net.state_dict(),
    "target_net": target_net.state_dict(),
    "optimizer": optimizer.state_dict(),
    "config": {
        "env_name": ENV_NAME,
        "state_dim": state_dim,
        "action_dim": action_dim,
        "lr": LR,
        "gamma": GAMMA,
    },
}
torch.save(checkpoint, checkpoint_path)
print("Saved checkpoint:", checkpoint_path)

# Since PyTorch 2.6, torch.load defaults to weights_only=True, which may refuse
# objects outside its allow-list. This file was written by this notebook a few
# lines above, so full unpickling (weights_only=False) is acceptable here.
# Do not reuse this call on checkpoints from untrusted sources.
loaded = torch.load(checkpoint_path, map_location=DEVICE, weights_only=False)
policy_net.load_state_dict(loaded["policy_net"])
target_net.load_state_dict(loaded["target_net"])
optimizer.load_state_dict(loaded["optimizer"])

# One greedy episode with the reloaded weights as a sanity check.
state, _ = env_reset(env, seed=2026)
rollout_return = 0.0
while True:
    action = select_action(state, epsilon=0.0)
    state, reward, done, _ = env_step(env, action)
    rollout_return += reward
    if done:
        break

print("Post-load deterministic rollout return:", rollout_return)
env.close()