# 🧠 Unit 5.2: Deep Reinforcement Learning (DQN)

**Course:** Advanced Machine Learning (AICC 303)  
**Topic:** 5.6 Deep Reinforcement Learning (Deep Q-Network)

**Why Deep RL?**
Tabular Q-Learning works for small, discrete state spaces (like grids). But what about a self-driving car? Its state space (raw camera pixels) is continuous and far too large to store one table entry per state.
We use a **Neural Network** to approximate the Q-Function: $Q(s, a; \theta) \approx Q^*(s, a)$.

---

In [None]:
import numpy as np
import gymnasium as gym
import random
from collections import deque
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F

# Setup
# Run on the GPU when CUDA is usable, otherwise fall back to the CPU.
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

# Build the environment and read the problem dimensions from its spaces.
env = gym.make('CartPole-v1')
action_size = env.action_space.n
state_size = env.observation_space.shape[0]

# Observation layout: [Pos, Vel, Angle, AngVel]
print("State Size:", state_size)
# Available actions: Left, Right
print("Action Size:", action_size)

## 1. The DQN Agent

Key components:
1.  **Model**: NN that takes State → Predicts Q-values for all Actions.
2.  **Memory (Replay Buffer)**: Stores $(s, a, r, s', done)$ tuples. We train on random batches from here to break correlations.
3.  **Epsilon-Greedy**: Policy for exploration.

In [None]:
class QNetwork(nn.Module):
    """Fully connected network mapping a state to one Q-value per action.

    Architecture: ``state_size -> hidden_size -> hidden_size -> action_size``
    with a ReLU after each of the two hidden layers and a plain linear output.

    Args:
        state_size: Number of features in a single observation.
        action_size: Number of discrete actions (width of the output layer).
        hidden_size: Width of both hidden layers. Defaults to 24, the value
            that used to be hard-coded.
    """

    def __init__(self, state_size, action_size, hidden_size=24):
        super().__init__()
        self.fc1 = nn.Linear(state_size, hidden_size)
        self.fc2 = nn.Linear(hidden_size, hidden_size)
        self.fc3 = nn.Linear(hidden_size, action_size)

    def forward(self, x):
        """Return Q-values; the last dimension of ``x`` becomes ``action_size``."""
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        # No activation on the output: Q-values are return estimates and may
        # be negative or arbitrarily large.
        return self.fc3(x)

class DQNAgent:
    """Epsilon-greedy DQN agent with a uniformly sampled replay buffer.

    The agent keeps a single Q-network (no separate target network), stores
    transitions in a bounded deque, and performs one gradient step per call
    to :meth:`replay`.

    Args:
        state_size: Number of features in a single observation.
        action_size: Number of discrete actions.
        device: Optional ``torch.device`` (or device string) for the network
            and training batches. When omitted, CUDA is used if available,
            otherwise the CPU.
        model: Optional ``nn.Module`` mapping a ``(batch, state_size)`` tensor
            to ``(batch, action_size)`` Q-values. When omitted, a default
            ``QNetwork(state_size, action_size)`` is created.
    """

    def __init__(self, state_size, action_size, device=None, model=None):
        self.state_size = state_size
        self.action_size = action_size

        # Resolve the device here instead of reading a module-level global,
        # so the agent works wherever it is constructed.
        if device is None:
            device = 'cuda' if torch.cuda.is_available() else 'cpu'
        self.device = torch.device(device)

        self.memory = deque(maxlen=2000)  # oldest transitions are evicted first
        self.gamma = 0.95                 # discount factor
        self.epsilon = 1.0                # start fully exploratory
        self.epsilon_min = 0.01           # exploration floor
        self.epsilon_decay = 0.995        # multiplicative decay per replay() call
        self.learning_rate = 0.001

        if model is None:
            model = QNetwork(state_size, action_size)
        self.model = model.to(self.device)
        self.optimizer = optim.Adam(self.model.parameters(), lr=self.learning_rate)
        self.criterion = nn.MSELoss()

    def remember(self, state, action, reward, next_state, done):
        """Append one ``(s, a, r, s', done)`` transition to the replay buffer."""
        self.memory.append((state, action, reward, next_state, done))

    def act(self, state):
        """Choose an action for ``state`` with the epsilon-greedy policy.

        Returns:
            A random action index with probability ``epsilon``; otherwise the
            index of the largest predicted Q-value, as a Python ``int``.
        """
        if np.random.rand() <= self.epsilon:
            return random.randrange(self.action_size)

        state = torch.as_tensor(
            state, dtype=torch.float32, device=self.device
        ).unsqueeze(0)

        # Evaluate in eval mode, then restore whatever mode the model was in
        # rather than unconditionally forcing train mode.
        was_training = self.model.training
        self.model.eval()
        with torch.no_grad():
            act_values = self.model(state)
        self.model.train(was_training)
        return torch.argmax(act_values[0]).item()

    def replay(self, batch_size):
        """Run one gradient step on a random minibatch from the replay buffer.

        Does nothing when ``batch_size`` is not positive or the buffer holds
        fewer than ``batch_size`` transitions. After a training step, epsilon
        is decayed once and never drops below ``epsilon_min``.
        """
        if batch_size <= 0 or len(self.memory) < batch_size:
            return

        minibatch = random.sample(self.memory, batch_size)
        states, actions, rewards, next_states, dones = zip(*minibatch)

        # Stack with NumPy first: building a tensor directly from a Python
        # list of ndarrays is slow and triggers a PyTorch warning.
        states = torch.as_tensor(
            np.array(states, dtype=np.float32), device=self.device)
        actions = torch.as_tensor(
            np.array(actions, dtype=np.int64), device=self.device)
        rewards = torch.as_tensor(
            np.array(rewards, dtype=np.float32), device=self.device)
        next_states = torch.as_tensor(
            np.array(next_states, dtype=np.float32), device=self.device)
        dones = torch.as_tensor(
            np.array(dones, dtype=np.float32), device=self.device)

        # Current Q values: gather picks the Q-value of the action taken.
        # squeeze(1) (not squeeze()) keeps the batch dimension, so a batch of
        # one stays shape (1,) and matches the target without broadcasting.
        current_q = self.model(states).gather(1, actions.unsqueeze(1)).squeeze(1)

        # Target Q values: r + gamma * max_a' Q(s', a'), with the bootstrap
        # term zeroed for transitions flagged as done.
        with torch.no_grad():
            max_next_q = self.model(next_states).max(1)[0]
            target_q = rewards + (1 - dones) * self.gamma * max_next_q

        # Compute loss
        loss = self.criterion(current_q, target_q)

        # Optimize
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

        # Decay epsilon, clamped so it never overshoots below the floor.
        if self.epsilon > self.epsilon_min:
            self.epsilon = max(self.epsilon_min, self.epsilon * self.epsilon_decay)

## 2. Training Loop
We train the agent to balance a pole on a moving cart (`CartPole-v1`).

In [None]:
agent = DQNAgent(state_size, action_size)
EPISODES = 50  # Kept low for demo speed. Real training needs ~1000

for e in range(EPISODES):
    state, _ = env.reset()

    # `step` rather than `time`, which shadows the stdlib module name.
    for step in range(500):
        action = agent.act(state)

        next_state, reward, terminated, truncated, _ = env.step(action)
        done = terminated or truncated

        # Penalize only a real failure (`terminated`). Being cut off by the
        # time limit (`truncated`) is the best possible outcome and must not
        # be punished.
        if terminated:
            reward = -10

        # Store `terminated`, not `done`, as the terminal flag: a truncated
        # episode did not reach a terminal state, so its Q-target should
        # still bootstrap from next_state.
        agent.remember(state, action, reward, next_state, terminated)
        state = next_state

        if done:
            print(f"episode: {e}/{EPISODES}, score: {step}, e: {agent.epsilon:.2f}")
            break

    # Train model after episode (one minibatch update per episode)
    agent.replay(32)