<a href="https://colab.research.google.com/github/acompalas/My-Deep-RL-Journey/blob/main/notebooks/dqn.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Colab-only: mount Google Drive at /content/drive so the trained weights can
# be written there by the final cell of this notebook.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
!pip install imageio
!pip install swig
!pip install "gymnasium[box2d]"

Collecting swig
  Downloading swig-4.3.1.post0-py3-none-manylinux_2_12_x86_64.manylinux2010_x86_64.whl.metadata (3.5 kB)
Downloading swig-4.3.1.post0-py3-none-manylinux_2_12_x86_64.manylinux2010_x86_64.whl (1.9 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.9/1.9 MB[0m [31m80.0 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: swig
Successfully installed swig-4.3.1.post0
Collecting box2d-py==2.3.5 (from gymnasium[box2d])
  Downloading box2d-py-2.3.5.tar.gz (374 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m374.4/374.4 kB[0m [31m27.2 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: box2d-py
  Building wheel for box2d-py (setup.py) ... [?25l[?25hdone
  Created wheel for box2d-py: filename=box2d_py-2.3.5-cp312-cp312-linux_x86_64.whl size=2409498 sha256=30b3ed3c4524fcba0e37c2736bbb5c349284eb70183c71537167d6dbd2eeed3b
  Stored in directory: /root/.c

In [None]:
import os
import random
import tempfile
from collections import deque

import gymnasium as gym
import imageio
import matplotlib.pyplot as plt
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from IPython.display import Video, display

# Pick the compute device once; networks and sampled batches are placed on it.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
print(f"Using device: {device}")

Using device: cuda


In [None]:
# ============================
# Q-Network
# ============================
class QNetwork(nn.Module):
    """Fully connected network mapping a state vector to one Q-value per action.

    Architecture: two ReLU-activated hidden layers of width ``hidden_dim``
    followed by a linear output layer with ``action_dim`` units.
    """

    def __init__(self, state_dim, action_dim, hidden_dim=128):
        super().__init__()
        # The attribute names fc1/fc2/fc3 end up in the state_dict keys, so
        # they must stay stable for saved checkpoints to load.
        self.fc1 = nn.Linear(in_features=state_dim, out_features=hidden_dim)
        self.fc2 = nn.Linear(in_features=hidden_dim, out_features=hidden_dim)
        self.fc3 = nn.Linear(in_features=hidden_dim, out_features=action_dim)

    def forward(self, x):
        """Return the Q-values for ``x``; the output layer has no activation."""
        hidden = x
        for layer in (self.fc1, self.fc2):
            hidden = F.relu(layer(hidden))
        return self.fc3(hidden)

# ============================
# Replay Buffer
# ============================
class ReplayBuffer:
    """Bounded FIFO store of (state, action, reward, next_state, done) transitions."""

    def __init__(self, capacity):
        # A deque with maxlen silently drops the oldest entry once it is full.
        self.buffer = deque(maxlen=capacity)

    def __len__(self):
        return len(self.buffer)

    def push(self, state, action, reward, next_state, done):
        """Append one transition as a 5-tuple."""
        transition = (state, action, reward, next_state, done)
        self.buffer.append(transition)

    def sample(self, batch_size):
        """Draw ``batch_size`` transitions without replacement and batch them.

        Returns a 5-tuple ``(states, actions, rewards, next_states, dones)`` of
        tensors on the module-level ``device``. States and next states are
        float32, actions are int64, rewards and dones are float32; actions,
        rewards and dones receive an extra dimension at index 1.
        """
        transitions = random.sample(self.buffer, batch_size)
        # Transpose the list of transitions into one array per field.
        states, actions, rewards, next_states, dones = (
            np.array(column) for column in zip(*transitions)
        )

        def to_device(values, dtype):
            return torch.tensor(values, dtype=dtype, device=device)

        return (
            to_device(states, torch.float32),
            to_device(actions, torch.long).unsqueeze(1),
            to_device(rewards, torch.float32).unsqueeze(1),
            to_device(next_states, torch.float32),
            to_device(dones, torch.float32).unsqueeze(1),
        )

# ============================
# Loss
# ============================
def compute_loss(batch, q_net, target_net, gamma):
    """Mean-squared TD error of ``q_net`` on one batch of transitions.

    ``batch`` is the 5-tuple ``(states, actions, rewards, next_states, dones)``.
    The regression target is ``r + gamma * (1 - done) * max_a' Q_target(s', a')``
    and is computed without gradient tracking, so only ``q_net`` receives
    gradients from the returned loss.
    """
    states, actions, rewards, next_states, dones = batch

    # Bootstrapped target from the target network (no gradients).
    with torch.no_grad():
        next_q = target_net(next_states)
        best_next_q, _ = next_q.max(dim=1, keepdim=True)
        not_done = 1 - dones
        td_target = rewards + gamma * not_done * best_next_q

    # Q-value the online network assigns to the action actually taken.
    chosen_q = q_net(states).gather(dim=1, index=actions)

    return F.mse_loss(chosen_q, td_target)

# ============================
# Soft Update Target Net
# ============================
def update_target_network(q_net, target_net, tau=1.0):
    """Blend the parameters of ``q_net`` into ``target_net`` in place.

    Every target parameter becomes ``tau * online + (1 - tau) * target``.
    The default ``tau=1.0`` gives weight 1 to the online parameters and
    weight 0 to the previous target parameters.
    """
    keep = 1.0 - tau
    pairs = zip(target_net.parameters(), q_net.parameters())
    for frozen, online in pairs:
        blended = tau * online.data + keep * frozen.data
        # Write through .data so autograd does not track the update.
        frozen.data.copy_(blended)

# ============================
# Agent Learn
# ============================
def agent_learn(batch, q_net, target_net, optimizer, gamma, tau):
    """Run a single learning step and return the TD loss as a Python float.

    In order:
    1. compute the TD loss of ``q_net`` on ``batch``;
    2. clear stale gradients, backpropagate and step ``optimizer``;
    3. move ``target_net`` towards ``q_net`` with mixing rate ``tau``.
    """
    td_loss = compute_loss(batch, q_net, target_net, gamma)

    # Gradient step on the online (behaviour) network.
    optimizer.zero_grad()
    td_loss.backward()
    optimizer.step()
    loss_value = td_loss.item()

    # Soft update of the target network.
    update_target_network(q_net, target_net, tau)

    return loss_value

# ============================
# Training Function
# ============================
def train_dqn(episodes=1000, batch_size=64, gamma=0.99, tau=0.001,
              buffer_size=50000, eps_start=1.0, eps_end=0.1, lr=1e-3,
              update_freq=4, solved_score=280):
    """Train a DQN agent on LunarLander-v3 with an epsilon-greedy policy.

    Args:
        episodes: Maximum number of training episodes.
        batch_size: Number of transitions sampled per learning step; learning
            starts once the replay buffer holds at least this many.
        gamma: Discount factor used in the TD target.
        tau: Soft-update rate applied to the target network after every
            learning step.
        buffer_size: Capacity of the replay buffer.
        eps_start: Initial exploration rate.
        eps_end: Currently unused. Epsilon is multiplied by 0.995 at the start
            of every episode and floored at a hard-coded 0.01.
            NOTE(review): decide whether the floor should honour ``eps_end``.
        lr: Adam learning rate.
        update_freq: A learning step runs every ``update_freq`` environment
            steps (counted across episodes).
        solved_score: Training stops early once the mean return over the last
            100 episodes reaches this value (checked only when ``ep >= 100``).

    Returns:
        Tuple ``(q_net, returns, avg_returns)``: the trained online network,
        the per-episode returns, and the running mean over the last (up to)
        100 episodes.
    """
    env = gym.make("LunarLander-v3", render_mode="rgb_array")
    try:
        state_dim = env.observation_space.shape[0]
        action_dim = env.action_space.n

        q_net = QNetwork(state_dim, action_dim).to(device)
        target_net = QNetwork(state_dim, action_dim).to(device)
        update_target_network(q_net, target_net, tau=1.0)  # Hard copy at start

        optimizer = optim.Adam(q_net.parameters(), lr=lr)
        replay_buffer = ReplayBuffer(buffer_size)

        returns, avg_returns = [], []
        step_count = 0
        epsilon = eps_start

        for ep in range(episodes):
            # Decay is applied before the episode runs, so the first episode
            # already explores with 0.995 * eps_start.
            epsilon = max(0.01, 0.995 * epsilon)
            state, _ = env.reset()
            done, ep_ret = False, 0

            while not done:
                step_count += 1

                # ε-greedy action
                if np.random.rand() < epsilon:
                    action = env.action_space.sample()
                else:
                    with torch.no_grad():
                        state_t = torch.tensor(
                            state, dtype=torch.float32, device=device
                        ).unsqueeze(0)
                        action = q_net(state_t).argmax().item()

                next_state, reward, terminated, truncated, _ = env.step(action)
                done = terminated or truncated

                # Store `terminated`, not `done`: a truncated episode (time
                # limit) did not reach a terminal state, so the TD target must
                # still bootstrap from next_state. `done` only ends the loop.
                replay_buffer.push(state, action, reward, next_state, terminated)

                state = next_state
                ep_ret += reward

                # Train only every "update_freq" steps
                if step_count % update_freq == 0 and len(replay_buffer) >= batch_size:
                    batch = replay_buffer.sample(batch_size)
                    agent_learn(batch, q_net, target_net, optimizer, gamma, tau)

            returns.append(ep_ret)
            avg_return = np.mean(returns[-100:])
            avg_returns.append(avg_return)

            # Early stopping requires the episode index to be at least 100 so
            # the average is taken over a full 100-episode window.
            solved = avg_return >= solved_score and ep >= 100

            # Print every 10 episodes or when solved
            if (ep + 1) % 10 == 0 or solved:
                solved_flag = " ✅ Solved!" if avg_return >= solved_score else ""
                print(f"Ep {ep+1}/{episodes} | Return: {ep_ret:.1f} | "
                      f"Avg100: {avg_return:.1f} | Eps {epsilon:.2f}{solved_flag}")

            # Early stopping
            if solved:
                print(f"\nEnvironment solved in {ep+1} episodes!")
                break
    finally:
        # Always release the environment, including on interruption or error.
        env.close()

    return q_net, returns, avg_returns

# ============================
# Demo Mode (inline video)
# ============================
def run_demo_inline(env, q_net, n_demo_episodes=3):
    """Roll out the greedy policy of ``q_net`` and show it as an inline MP4.

    Args:
        env: Environment whose ``render()`` returns image frames (the notebook
            creates it with ``render_mode="rgb_array"``). It is not closed
            here; the caller owns it.
        q_net: Trained Q-network; actions are the argmax of its output.
        n_demo_episodes: Number of episodes to record back to back.

    The network is switched to eval mode for the rollout and restored to its
    previous mode afterwards. The video is written to a temporary ``.mp4``
    file that is not deleted by this function.
    """
    was_training = q_net.training
    q_net.eval()
    frames = []
    try:
        for _ in range(n_demo_episodes):
            state, _ = env.reset()
            done = False
            while not done:
                with torch.no_grad():
                    state_t = torch.tensor(
                        state, dtype=torch.float32, device=device
                    ).unsqueeze(0)
                    action = q_net(state_t).argmax().item()
                next_state, reward, terminated, truncated, _ = env.step(action)
                done = terminated or truncated
                frames.append(env.render())
                state = next_state
    finally:
        # Do not leave the caller's model stuck in eval mode.
        q_net.train(was_training)

    if not frames:
        # Nothing was recorded (e.g. n_demo_episodes <= 0); skip the encoder.
        print("No frames captured; nothing to display.")
        return

    # Reserve a unique path, then close the handle right away so it is not
    # leaked and the file is not held open while imageio writes to it.
    with tempfile.NamedTemporaryFile(suffix=".mp4", delete=False) as tmpfile:
        video_path = tmpfile.name
    imageio.mimsave(video_path, frames, fps=30)
    display(Video(video_path, embed=True, width=500))

In [None]:
# Long training run: up to 8000 episodes, stopping early only when the
# 100-episode average return reaches 300 (overrides train_dqn's default of 280).
# NOTE(review): LunarLander is usually treated as solved at around 200 points;
# confirm 300 is intentional, since early stopping may rarely trigger.
q_net, returns, avg_returns = train_dqn(episodes=8000, solved_score=300)

Ep 10/8000 | Return: -84.9 | Avg100: -211.3 | Eps 0.95
Ep 20/8000 | Return: -265.7 | Avg100: -196.6 | Eps 0.90
Ep 30/8000 | Return: -104.7 | Avg100: -161.4 | Eps 0.86
Ep 40/8000 | Return: -35.9 | Avg100: -154.0 | Eps 0.82
Ep 50/8000 | Return: -103.6 | Avg100: -159.2 | Eps 0.78
Ep 60/8000 | Return: -52.0 | Avg100: -167.9 | Eps 0.74
Ep 70/8000 | Return: -360.7 | Avg100: -167.5 | Eps 0.70
Ep 80/8000 | Return: -120.7 | Avg100: -162.1 | Eps 0.67
Ep 90/8000 | Return: -102.8 | Avg100: -153.9 | Eps 0.64
Ep 100/8000 | Return: -183.7 | Avg100: -149.9 | Eps 0.61
Ep 110/8000 | Return: -77.7 | Avg100: -142.5 | Eps 0.58
Ep 120/8000 | Return: -86.4 | Avg100: -137.8 | Eps 0.55
Ep 130/8000 | Return: -111.5 | Avg100: -141.5 | Eps 0.52
Ep 140/8000 | Return: -119.3 | Avg100: -137.5 | Eps 0.50
Ep 150/8000 | Return: 37.7 | Avg100: -130.4 | Eps 0.47
Ep 160/8000 | Return: -155.5 | Avg100: -117.6 | Eps 0.45
Ep 170/8000 | Return: -397.2 | Avg100: -113.9 | Eps 0.43
Ep 180/8000 | Return: -270.8 | Avg100: -115.8 |

In [None]:
# Separate environment for the demo; rgb_array rendering lets run_demo_inline
# collect frames for the video.
demo_env = gym.make("LunarLander-v3", render_mode="rgb_array")
try:
    run_demo_inline(demo_env, q_net, n_demo_episodes=10)
finally:
    # Release the environment's resources even if the rollout fails.
    demo_env.close()




In [None]:
SAVE_PATH = "/content/drive/My Drive/Reinforcement Learning/dqn_lunarlander.pt"
# torch.save does not create missing parent folders, so make sure the Drive
# folder exists before writing (a fresh Drive will not have it).
os.makedirs(os.path.dirname(SAVE_PATH), exist_ok=True)
# Only the weights (state_dict) are stored, not the QNetwork object itself.
torch.save(q_net.state_dict(), SAVE_PATH)
print(f"Model saved to {SAVE_PATH}")

✅ Model saved to /content/drive/My Drive/Reinforcement Learning/dqn_lunarlander.pt
