# DQN (Deep Q-Networks) Tutorial

Welcome to the comprehensive DQN tutorial! This notebook will guide you through all the essential components of Deep Q-Networks for reinforcement learning.

## 📚 What You'll Learn
1. **Q-Learning Review and Limitations** - Why neural networks are needed
2. **Neural Network Architecture** - CNN design for visual inputs
3. **Experience Replay Buffer** - Breaking correlation in training data
4. **Target Network Mechanism** - Stabilizing Q-learning
5. **Epsilon-Greedy Strategy** - Balancing exploration and exploitation
6. **Loss Function and Optimization** - Training the neural network

## 🚀 Getting Started
Make sure you have activated the virtual environment and installed all dependencies:
```bash
source dqn_racing_env/bin/activate  # or activate_env.sh
```

In [None]:
# Import necessary libraries
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import numpy as np
import matplotlib.pyplot as plt
import gymnasium as gym
import random
from collections import deque
import cv2
from typing import Tuple, List, Optional
import time

# Set style for plots
plt.style.use('seaborn-v0_8')
plt.rcParams['figure.figsize'] = [12, 8]

print("🎯 DQN Tutorial Environment Setup Complete!")
print(f"PyTorch version: {torch.__version__}")
print(f"Device available: {'GPU' if torch.cuda.is_available() else 'CPU'}")

---
# Section 1: Q-Learning Review and Limitations

Let's start by understanding why we need Deep Q-Networks instead of traditional Q-Tables.

In [None]:
class QLearningDemo:
    """Demonstrates Q-Learning limitations with large state spaces."""
    
    def __init__(self):
        self.q_table_size_examples = {
            "GridWorld 4x4": 16,
            "GridWorld 10x10": 100,
            "Atari (84x84 grayscale)": 256**(84*84),
            "CarRacing (84x84x3)": 256**(84*84*3)
        }
        
    def demonstrate_state_space_explosion(self):
        """Show how state space grows exponentially."""
        print("Q-Table size for different environments:")
        print("-" * 40)
        
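        for env_name, size in self.q_table_size_examples.items():
            if size < 1e6:
                print(f"{env_name:<25}: {size:,} states")
            else:
                # These integers are too large to format as floats (OverflowError),
                # so report the order of magnitude from the bit length instead.
                exponent = int((size.bit_length() - 1) * np.log10(2))
                print(f"{env_name:<25}: ~10^{exponent:,} states (IMPOSSIBLE!)")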
                
        print("\n🔑 Key Insights:")
        print("📊 Q-Tables work for small, discrete state spaces")
        print("❌ Q-Tables fail for large, continuous, or high-dimensional states")
        print("✅ Neural Networks can approximate Q-functions for complex states")

# Run the demonstration
demo = QLearningDemo()
demo.demonstrate_state_space_explosion()

### 💡 Why This Matters

The CarRacing environment has **high-dimensional visual input** (84x84x3 pixel values in this tutorial). A Q-table would need one entry for every possible pixel combination, which is far more states than there are atoms in the observable universe!

**Solution**: Use a neural network to approximate the Q-function: `Q(s,a) ≈ Q_θ(s,a)`
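
For contrast, the tabular update that a Q-table relies on can be sketched in a few lines. This is a minimal illustration, not part of the tutorial's codebase: the function name `q_learning_update`, the learning rate `alpha`, and the toy table with 16 states and 4 actions are assumptions chosen for the example.

```python
from typing import List


def q_learning_update(q_table: List[List[float]], state: int, action: int,
                      reward: float, next_state: int, done: bool,
                      alpha: float = 0.1, gamma: float = 0.99) -> float:
    """Apply one tabular Q-learning update in place and return the TD error."""
    # Bootstrap from the best next action unless the episode has ended.
    next_max = 0.0 if done else max(q_table[next_state])
    td_error = reward + gamma * next_max - q_table[state][action]
    q_table[state][action] += alpha * td_error
    return td_error


# Hypothetical 4x4 GridWorld: 16 states, 4 actions, all values start at zero.
q_table = [[0.0] * 4 for _ in range(16)]
td = q_learning_update(q_table, state=0, action=1, reward=1.0,
                       next_state=1, done=False)
print(f"TD error: {td:.2f}, updated Q(0, 1): {q_table[0][1]:.2f}")
```

Every (state, action) pair needs its own table cell, which is exactly what stops scaling once the states are images.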

---
# Section 2: Neural Network Architecture

Let's design a CNN-based network for processing visual input from CarRacing.

In [None]:
class DQNNetwork(nn.Module):
    """CNN-based Deep Q-Network for CarRacing environment."""
    
    def __init__(self, action_dim: int = 3, input_channels: int = 4):
        """
        Initialize DQN network.
        
        Args:
            action_dim: Number of possible actions
            input_channels: Number of input channels (frame stack)
        """
        super(DQNNetwork, self).__init__()
        
        # Convolutional layers for feature extraction
        self.conv1 = nn.Conv2d(input_channels, 32, kernel_size=8, stride=4)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=4, stride=2)
        self.conv3 = nn.Conv2d(64, 64, kernel_size=3, stride=1)
        
        # Calculate size of flattened features
        self._conv_output_size = self._get_conv_output_size((input_channels, 84, 84))
        
        # Fully connected layers
        self.fc1 = nn.Linear(self._conv_output_size, 512)
        self.fc2 = nn.Linear(512, action_dim)
        
        # Initialize weights
        self._initialize_weights()
        
    def _get_conv_output_size(self, input_shape: Tuple[int, int, int]) -> int:
        """Calculate the output size after convolutional layers."""
        with torch.no_grad():
            dummy_input = torch.zeros(1, *input_shape)
            dummy_output = self._forward_conv(dummy_input)
            return dummy_output.numel()
            
    def _forward_conv(self, x: torch.Tensor) -> torch.Tensor:
        """Forward pass through convolutional layers only."""
        x = F.relu(self.conv1(x))
        x = F.relu(self.conv2(x))
        x = F.relu(self.conv3(x))
        return x.flatten(1)
        
    def _initialize_weights(self):
        """Initialize network weights using Xavier initialization."""
        for module in self.modules():
            if isinstance(module, nn.Conv2d) or isinstance(module, nn.Linear):
                nn.init.xavier_uniform_(module.weight)
                if module.bias is not None:
                    nn.init.constant_(module.bias, 0)
                    
    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """
        Forward pass through the network.
        
        Args:
            x: Input tensor of shape (batch_size, channels, height, width)
            
        Returns:
            Q-values for each action
        """
        # Convolutional feature extraction
        x = self._forward_conv(x)
        
        # Fully connected layers
        x = F.relu(self.fc1(x))
        x = self.fc2(x)
        
        return x
        
    def get_network_info(self):
        """Get information about the network architecture."""
        total_params = sum(p.numel() for p in self.parameters() if p.requires_grad)
        
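        # Read the shapes from the layers so the report stays correct when the
        # network is built with non-default arguments.
        info = {
            "total_parameters": total_params,
            "conv_output_size": self._conv_output_size,
            "input_shape": (self.conv1.in_channels, 84, 84),
            "output_shape": self.fc2.out_features
        }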
        return info

# Create and analyze the network
dqn = DQNNetwork(action_dim=3, input_channels=4)
info = dqn.get_network_info()

print("🧠 DQN Network Architecture:")
print("-" * 30)
print(f"Input: {info['input_shape']} (4 stacked frames of 84x84)")
print("Conv1: 32 filters, 8x8 kernel, stride 4")
print("Conv2: 64 filters, 4x4 kernel, stride 2") 
print("Conv3: 64 filters, 3x3 kernel, stride 1")
print(f"Flattened features: {info['conv_output_size']}")
print("FC1: 512 neurons")
print(f"Output: {info['output_shape']} Q-values")
print(f"\nTotal parameters: {info['total_parameters']:,}")

# Test forward pass
dummy_input = torch.randn(1, 4, 84, 84)
with torch.no_grad():
    output = dqn(dummy_input)
    
print(f"\n🧪 Example forward pass:")
print(f"Input shape: {dummy_input.shape}")
print(f"Output shape: {output.shape}")
print(f"Output Q-values: {output.squeeze().numpy()}")

### 🔍 Architecture Analysis

Our CNN processes **4 stacked frames** (84x84 each) to capture motion and temporal information:

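1. **Conv1**: 8x8 kernels with stride 4 pick up coarse, low-level features such as edges and downsample aggressively
2. **Conv2**: 4x4 kernels with stride 2 combine those features into larger patterns
3. **Conv3**: 3x3 kernels with stride 1 refine the feature maps; because it sits on top of the earlier layers, each unit still sees a large region of the original frame
4. **FC layers**: Combine features and output Q-values for each action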

The network outputs **Q-values** for each action, not action probabilities!
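
The flattened feature size that `_get_conv_output_size` discovers with a dummy forward pass can also be worked out by hand. The sketch below applies the standard output-size formula for an unpadded convolution, `floor((n - k) / s) + 1`, to the three layers above. The helper name `conv_out` is introduced only for this illustration.

```python
def conv_out(size: int, kernel: int, stride: int) -> int:
    """Spatial output size of a convolution with no padding and no dilation."""
    return (size - kernel) // stride + 1


size = 84
for name, kernel, stride in [("conv1", 8, 4), ("conv2", 4, 2), ("conv3", 3, 1)]:
    size = conv_out(size, kernel, stride)
    print(f"{name}: {size}x{size}")

flattened = 64 * size * size  # conv3 has 64 output channels
print(f"Flattened features: {flattened}")
```

The spatial size shrinks from 84 to 20, then 9, then 7, so `fc1` receives 64 x 7 x 7 = 3136 inputs.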

---
# Section 3: Experience Replay Buffer

Experience replay is crucial for stable DQN training. Let's see why!

In [None]:
class ReplayBuffer:
    """Experience Replay Buffer for storing and sampling transitions."""
    
    def __init__(self, capacity: int = 10000):
        """
        Initialize replay buffer.
        
        Args:
            capacity: Maximum number of transitions to store
        """
        self.buffer = deque(maxlen=capacity)
        self.capacity = capacity
        
    def push(self, state, action, reward, next_state, done):
        """
        Add a transition to the buffer.
        
        Args:
            state: Current state
            action: Action taken
            reward: Reward received
            next_state: Next state
            done: Whether episode ended
        """
        transition = (state, action, reward, next_state, done)
        self.buffer.append(transition)
        
    def sample(self, batch_size: int) -> Tuple:
        """
        Sample a batch of transitions.
        
        Args:
            batch_size: Number of transitions to sample
            
        Returns:
            Batch of transitions as separate tensors
        """
        if len(self.buffer) < batch_size:
            raise ValueError(f"Buffer has only {len(self.buffer)} samples, need {batch_size}")
            
        batch = random.sample(self.buffer, batch_size)
        
        # Unpack batch
        states, actions, rewards, next_states, dones = zip(*batch)
        
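        # Convert to tensors (explicit dtypes instead of the legacy typed constructors)
        states = torch.as_tensor(np.array(states), dtype=torch.float32)
        actions = torch.as_tensor(np.array(actions), dtype=torch.int64)
        rewards = torch.as_tensor(np.array(rewards), dtype=torch.float32)
        next_states = torch.as_tensor(np.array(next_states), dtype=torch.float32)
        dones = torch.as_tensor(np.array(dones), dtype=torch.bool)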
        
        return states, actions, rewards, next_states, dones
        
    def __len__(self):
        """Return current buffer size."""
        return len(self.buffer)
        
    def get_statistics(self):
        """Get buffer statistics."""
        if len(self.buffer) == 0:
            return {"size": 0, "capacity": self.capacity, "utilization": 0.0}
            
        rewards = [transition[2] for transition in self.buffer]
        
        stats = {
            "size": len(self.buffer),
            "capacity": self.capacity,
            "utilization": len(self.buffer) / self.capacity,
            "avg_reward": np.mean(rewards),
            "reward_std": np.std(rewards),
            "min_reward": np.min(rewards),
            "max_reward": np.max(rewards)
        }
        return stats

# Demonstrate experience replay
print("🗃️ Experience Replay Buffer Demo")
print("=" * 40)

# Create buffer
buffer = ReplayBuffer(capacity=1000)

print("Adding sample experiences to buffer...")

# Add some dummy experiences
for i in range(150):
    state = np.random.random((4, 84, 84))
    action = np.random.randint(0, 3)
    reward = np.random.normal(0, 1)  # Random reward
    next_state = np.random.random((4, 84, 84))
    done = np.random.random() < 0.1  # 10% chance of episode end
    
    buffer.push(state, action, reward, next_state, done)
    
# Show buffer statistics
stats = buffer.get_statistics()
print(f"\n📊 Buffer Statistics:")
print(f"  Size: {stats['size']}/{stats['capacity']}")
print(f"  Utilization: {stats['utilization']:.1%}")
print(f"  Average reward: {stats['avg_reward']:.3f}")
print(f"  Reward std: {stats['reward_std']:.3f}")

# Demonstrate sampling
print(f"\n🎲 Sampling batch of 32 experiences...")
states, actions, rewards, next_states, dones = buffer.sample(32)

print(f"Batch shapes:")
print(f"  States: {states.shape}")
print(f"  Actions: {actions.shape}")
print(f"  Rewards: {rewards.shape}")
print(f"  Next states: {next_states.shape}")
print(f"  Dones: {dones.shape}")

print("\n✅ Benefits of Experience Replay:")
print("• Breaks correlation between consecutive experiences")
print("• Enables multiple learning updates from same experience")
print("• Improves sample efficiency")
print("• Stabilizes training by mixing old and new experiences")
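
One practical consequence of storing image states is memory. The back-of-the-envelope sketch below estimates the footprint of a full buffer, assuming each transition holds two (4, 84, 84) arrays (state and next state) and ignoring the small action, reward, and done fields. The demo above pushes `np.random.random` arrays, which are float64; the uint8 row shows what storing raw pixel values would cost instead. The helper name `buffer_bytes` is hypothetical.

```python
def buffer_bytes(capacity: int, bytes_per_value: int,
                 state_shape=(4, 84, 84)) -> int:
    """Approximate bytes needed to hold `capacity` transitions."""
    values_per_state = 1
    for dim in state_shape:
        values_per_state *= dim
    # Each transition stores both the state and the next state.
    return capacity * 2 * values_per_state * bytes_per_value


for label, nbytes in [("float64", 8), ("float32", 4), ("uint8", 1)]:
    total = buffer_bytes(capacity=10_000, bytes_per_value=nbytes)
    print(f"{label:>7}: {total / 1e9:.2f} GB for 10,000 transitions")
```

Under these assumptions the choice of dtype changes the footprint by a factor of eight, which is worth keeping in mind before raising `capacity`.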

In [None]:
# Visualize the importance of experience replay
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15, 6))

# Simulate correlated vs uncorrelated data
steps = np.arange(100)
correlated_rewards = np.cumsum(np.random.randn(100) * 0.1) + np.sin(steps * 0.1) * 2
uncorrelated_rewards = np.random.randn(100) * 2

ax1.plot(steps, correlated_rewards, 'r-', label='Sequential (Correlated)', linewidth=2)
ax1.set_title('Without Experience Replay')
ax1.set_xlabel('Training Step')
ax1.set_ylabel('Reward')
ax1.legend()
ax1.grid(True, alpha=0.3)

ax2.scatter(steps, uncorrelated_rewards, c='blue', alpha=0.6, label='Random Sampling')
ax2.set_title('With Experience Replay')
ax2.set_xlabel('Training Step')
ax2.set_ylabel('Reward')
ax2.legend()
ax2.grid(True, alpha=0.3)

plt.suptitle('Experience Replay Breaks Temporal Correlation', fontsize=16)
plt.tight_layout()
plt.show()

print("💡 The left plot shows correlated sequential data that can bias learning.")
print("💡 The right plot shows how random sampling breaks these correlations.")
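
The same point can be checked numerically rather than by eye. The sketch below, which uses only the standard library, builds a random walk as a stand-in for a correlated stream of experience and compares its lag-1 autocorrelation with that of a shuffled copy. The random walk, the seed, and the helper `lag1_autocorr` are assumptions made for the illustration, not data from CarRacing.

```python
import random


def lag1_autocorr(xs):
    """Lag-1 autocorrelation of a sequence (near 1 = strongly correlated, near 0 = none)."""
    n = len(xs)
    mean = sum(xs) / n
    var = sum((x - mean) ** 2 for x in xs)
    cov = sum((xs[i] - mean) * (xs[i + 1] - mean) for i in range(n - 1))
    return cov / var


rng = random.Random(0)

# Sequential "experience": each value depends on the previous one.
sequential = [0.0]
for _ in range(999):
    sequential.append(sequential[-1] + rng.gauss(0.0, 0.1))

# Replay-style sampling: the same values visited in random order.
shuffled = sequential[:]
rng.shuffle(shuffled)

print(f"Sequential lag-1 autocorrelation: {lag1_autocorr(sequential):.3f}")
print(f"Shuffled lag-1 autocorrelation:   {lag1_autocorr(shuffled):.3f}")
```

The values themselves are identical in both lists; only the order in which they are visited changes, which is what uniform sampling from the buffer does to a stream of transitions.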

---
# Section 4: Target Network Mechanism

Target networks prevent the "moving target" problem in Q-learning.

In [None]:
class TargetNetworkDemo:
    """Demonstrates the target network concept."""
    
    def __init__(self):
        self.main_network = DQNNetwork()
        self.target_network = DQNNetwork()
        
        # Copy main network weights to target network
        self.hard_update()
        
    def hard_update(self):
        """Copy main network weights to target network."""
        self.target_network.load_state_dict(self.main_network.state_dict())
        
    def soft_update(self, tau: float = 0.001):
        """
        Soft update target network weights.
        
        Args:
            tau: Soft update parameter (0 = no update, 1 = hard update)
        """
        for target_param, main_param in zip(
            self.target_network.parameters(), 
            self.main_network.parameters()
        ):
            target_param.data.copy_(
                tau * main_param.data + (1.0 - tau) * target_param.data
            )
            
    def compare_networks(self) -> float:
        """Compare parameter differences between networks."""
        total_diff = 0.0
        total_params = 0
        
        for target_param, main_param in zip(
            self.target_network.parameters(),
            self.main_network.parameters()
        ):
            diff = torch.norm(target_param - main_param).item()
            total_diff += diff
            total_params += target_param.numel()
            
        return total_diff / total_params

# Demonstrate target network mechanism
print("🎯 Target Network Demonstration")
print("=" * 40)

demo = TargetNetworkDemo()

print("Target Network Concept:")
print("-" * 25)
print("Main Network:   Used for action selection and learning")
print("Target Network: Used for Q-target calculation (stable)")
print()

# Show initial state
initial_diff = demo.compare_networks()
print(f"Initial parameter difference: {initial_diff:.6f}")

# Simulate training updates to main network
optimizer = optim.Adam(demo.main_network.parameters(), lr=0.001)

print("\nSimulating training updates...")
differences = []
steps = []

for step in range(20):
    # Dummy loss to update main network
    dummy_input = torch.randn(1, 4, 84, 84)
    loss = demo.main_network(dummy_input).sum()
    
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()
    
    # Measure difference
    diff = demo.compare_networks()
    differences.append(diff)
    steps.append(step)
    
    if step % 5 == 0:
        print(f"Step {step:2d}: Parameter difference = {diff:.6f}")
        
    # Single hard update at step 10 (a real agent repeats this every N steps)
    if step == 10:
        demo.hard_update()
        print(f"        🔄 Hard update performed!")
        
print("\n✅ Target Network Benefits:")
print("• Prevents moving target problem in Q-learning")
print("• Stabilizes training by providing consistent targets")
print("• Reduces correlation between Q-values and targets")
print("• Hard updates every N steps maintain stability")

In [None]:
# Visualize target network updates
plt.figure(figsize=(12, 6))
plt.plot(steps, differences, 'b-o', linewidth=2, markersize=6)
plt.axvline(x=10, color='red', linestyle='--', alpha=0.7, label='Hard Update')
plt.xlabel('Training Step')
plt.ylabel('Parameter Difference')
plt.title('Target Network Parameter Divergence')
plt.legend()
plt.grid(True, alpha=0.3)
plt.show()

print("💡 Notice how the parameter difference grows as the main network learns,")
print("💡 then drops back toward zero after the hard update at step 10.")
print("💡 The difference is measured before the copy, so the drop appears at step 11.")

### 🎯 Target Network Math

**Without Target Network** (unstable):
```
Target = r + γ * max Q_θ(s', a')
Loss = (Q_θ(s,a) - Target)²
```

**With Target Network** (stable):
```
Target = r + γ * max Q_θ'(s', a')  ← θ' is fixed!
Loss = (Q_θ(s,a) - Target)²
```

The target network θ' is updated every few thousand steps, providing stable targets.
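
The `soft_update` method shown earlier is an alternative to the periodic hard copy: every step it moves each target weight a fraction `tau` of the way toward the main weight. As a rough illustration of how slowly the target then tracks the main network, the sketch below applies the same rule to a single scalar weight while holding the main weight fixed. That fixed weight is an assumption made purely to keep the arithmetic visible; in training the main weights keep moving.

```python
def soft_update(target: float, main: float, tau: float) -> float:
    """One soft-update step: tau * main + (1 - tau) * target."""
    return tau * main + (1.0 - tau) * target


tau = 0.001
main_weight = 1.0    # held fixed for the illustration
target_weight = 0.0

for step in range(1, 1001):
    target_weight = soft_update(target_weight, main_weight, tau)

# Closed form for a fixed main weight: the remaining gap shrinks by (1 - tau) each step.
closed_form = 1.0 - (1.0 - tau) ** 1000
print(f"After 1000 soft updates: {target_weight:.4f} (closed form {closed_form:.4f})")
```

With `tau = 0.001` the target covers roughly 63% of the gap in 1,000 steps, so `1 / tau` is a useful rule of thumb for the time scale of a soft update.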

---
# Section 5: Epsilon-Greedy Strategy

Balancing exploration vs exploitation is crucial for learning.

In [None]:
class EpsilonGreedyStrategy:
    """Implements epsilon-greedy exploration strategy."""
    
    def __init__(self, epsilon_start: float = 1.0, epsilon_end: float = 0.01, 
                 epsilon_decay: float = 0.995):
        """
        Initialize epsilon-greedy strategy.
        
        Args:
            epsilon_start: Initial exploration rate
            epsilon_end: Final exploration rate
            epsilon_decay: Decay factor per episode
        """
        self.epsilon_start = epsilon_start
        self.epsilon_end = epsilon_end
        self.epsilon_decay = epsilon_decay
        self.epsilon = epsilon_start
        self.episode = 0
        
    def get_action(self, q_values: torch.Tensor) -> int:
        """
        Select action using epsilon-greedy policy.
        
        Args:
            q_values: Q-values for all actions
            
        Returns:
            Selected action index
        """
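        if np.random.random() < self.epsilon:
            # Explore: random action. Using shape[-1] gives the number of
            # actions for both 1-D input and batched (1, n_actions) input.
            return int(np.random.randint(q_values.shape[-1]))
        else:
            # Exploit: best action
            return int(q_values.argmax().item())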
            
    def update_epsilon(self):
        """Update epsilon for next episode."""
        self.epsilon = max(
            self.epsilon_end,
            self.epsilon * self.epsilon_decay
        )
        self.episode += 1
        
    def get_epsilon_schedule(self, num_episodes: int) -> List[float]:
        """Get epsilon values for given number of episodes."""
        epsilons = []
        epsilon = self.epsilon_start
        
        for _ in range(num_episodes):
            epsilons.append(epsilon)
            epsilon = max(self.epsilon_end, epsilon * self.epsilon_decay)
            
        return epsilons

# Demonstrate epsilon-greedy strategy
print("🎲 Epsilon-Greedy Strategy Demo")
print("=" * 40)

strategy = EpsilonGreedyStrategy(
    epsilon_start=1.0,
    epsilon_end=0.01,
    epsilon_decay=0.995
)

# Get epsilon schedule
num_episodes = 500
epsilon_schedule = strategy.get_epsilon_schedule(num_episodes)

# Show key points
print("Epsilon Decay Schedule:")
print("-" * 22)
milestones = [0, 50, 100, 200, 300, 400, 499]
for ep in milestones:
    print(f"Episode {ep:3d}: ε = {epsilon_schedule[ep]:.4f}")
    
# Simulate action selection
print(f"\n🎯 Action Selection Simulation (Episode 0, ε = {epsilon_schedule[0]:.3f}):")
dummy_q_values = torch.tensor([0.1, 0.8, 0.3])  # [steering, gas, brake]

actions = []
for _ in range(100):
    action = strategy.get_action(dummy_q_values)
    actions.append(action)
    
action_counts = np.bincount(actions, minlength=3)
action_names = ['Steering', 'Gas', 'Brake']

print("Action distribution (100 selections):")
for i, (name, count) in enumerate(zip(action_names, action_counts)):
    percentage = count / 100 * 100
    q_val = dummy_q_values[i].item()
    print(f"  {name:8}: {count:2d}/100 ({percentage:4.1f}%) [Q={q_val:.1f}]")
    
print(f"\n💡 Gas has highest Q-value ({dummy_q_values[1]:.1f}) but random")
print("💡 exploration still selects other actions frequently.")

print("\n✅ Exploration vs Exploitation Trade-off:")
print("• High ε: More exploration, discovers new strategies")
print("• Low ε:  More exploitation, uses learned knowledge")
print("• Decay:  Gradually shift from exploration to exploitation")

In [None]:
# Visualize epsilon decay schedule
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15, 6))

# Epsilon decay over episodes
episodes = range(len(epsilon_schedule))
ax1.plot(episodes, epsilon_schedule, 'b-', linewidth=2)
ax1.set_xlabel('Episode')
ax1.set_ylabel('Epsilon (ε)')
ax1.set_title('Epsilon Decay Schedule')
ax1.grid(True, alpha=0.3)
ax1.axhline(y=0.1, color='red', linestyle='--', alpha=0.7, label='ε = 0.1')
ax1.legend()

# Exploration vs Exploitation over time
exploration_rate = np.array(epsilon_schedule)
exploitation_rate = 1 - exploration_rate

ax2.fill_between(episodes, 0, exploration_rate, alpha=0.6, color='red', label='Exploration')
ax2.fill_between(episodes, exploration_rate, 1, alpha=0.6, color='blue', label='Exploitation')
ax2.set_xlabel('Episode')
ax2.set_ylabel('Probability')
ax2.set_title('Exploration vs Exploitation Balance')
ax2.legend()
ax2.grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

print("💡 Early episodes: High exploration to discover the environment")
print("💡 Later episodes: High exploitation to use learned knowledge")
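
It is worth checking how long a multiplicative schedule takes to reach its floor. Because ε after n episodes is `epsilon_start * epsilon_decay**n`, the floor is reached at the smallest n with `epsilon_decay**n <= epsilon_end / epsilon_start`. The sketch below solves for that n with the values used in this section. The function name `episodes_to_floor` is introduced only for this example.

```python
import math


def episodes_to_floor(epsilon_start: float, epsilon_end: float,
                      epsilon_decay: float) -> int:
    """Smallest n such that epsilon_start * epsilon_decay**n <= epsilon_end."""
    return math.ceil(math.log(epsilon_end / epsilon_start) / math.log(epsilon_decay))


n = episodes_to_floor(1.0, 0.01, 0.995)
eps_at_499 = 1.0 * 0.995 ** 499
print(f"Episodes needed to reach the floor: {n}")
print(f"Epsilon at episode 499: {eps_at_499:.4f}")
```

With these settings a 500-episode run ends with ε around 0.08 and never reaches the 0.01 floor, so a faster decay or a longer run is needed if the final exploration rate matters.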

---
# Section 6: Loss Function and Optimization

Let's see how DQN actually learns from experience!

In [None]:
def demonstrate_loss_function():
    """Demonstrate DQN loss function and optimization."""
    print("📊 DQN Loss Function Demo")
    print("=" * 40)
    
    print("DQN Loss Function (Temporal Difference Error):")
    print("-" * 48)
    print("Target: y = r + γ * max(Q_target(s', a'))")
    print("Loss:   L = Huber(Q_main(s, a) - y)")
    print()
    
    # Create networks
    main_network = DQNNetwork()
    target_network = DQNNetwork()
    target_network.load_state_dict(main_network.state_dict())
    
    # Dummy batch
    batch_size = 4
    states = torch.randn(batch_size, 4, 84, 84)
    actions = torch.LongTensor([0, 1, 2, 1])  # Action index taken in each transition
    rewards = torch.FloatTensor([0.1, 1.0, -0.5, 0.8])
    next_states = torch.randn(batch_size, 4, 84, 84)
    dones = torch.BoolTensor([False, False, True, False])
    gamma = 0.99
    
    print("📦 Example Batch:")
    print(f"  Batch size: {batch_size}")
    print(f"  Actions: {actions.tolist()}")
    print(f"  Rewards: {rewards.tolist()}")
    print(f"  Dones: {dones.tolist()}")
    print()
    
    # Forward pass
    # Current Q-values come from the main network and must keep their gradient,
    # otherwise the loss below could not be backpropagated.
    current_q_values = main_network(states)
    current_q_values_selected = current_q_values.gather(1, actions.unsqueeze(1))

    with torch.no_grad():
        # Next Q-values from target network (no gradient needed)
        next_q_values = target_network(next_states)
        next_q_values_max = next_q_values.max(1)[0]

        # Compute targets; terminal transitions do not bootstrap
        targets = rewards + (gamma * next_q_values_max * (~dones))
        
    print("🧮 Q-Value Computation:")
    print(f"  Current Q-values shape: {current_q_values.shape}")
    print(f"  Selected Q-values: {current_q_values_selected.squeeze().detach().numpy()}")
    print(f"  Next max Q-values: {next_q_values_max.detach().numpy()}")
    print(f"  Targets: {targets.detach().numpy()}")
    
    # Compute loss
    td_errors = current_q_values_selected.squeeze() - targets
    huber_loss = F.smooth_l1_loss(current_q_values_selected.squeeze(), targets)
    mse_loss = F.mse_loss(current_q_values_selected.squeeze(), targets)
    
    print(f"\n📉 Loss Computation:")
    print(f"  TD errors: {td_errors.detach().numpy()}")
    print(f"  Huber loss: {huber_loss.item():.4f}")
    print(f"  MSE loss: {mse_loss.item():.4f}")
    
    print("\n✅ Why Huber Loss?")
    print("• Less sensitive to outliers than MSE")
    print("• Provides stable gradients for large errors")
    print("• Behaves like MSE for small errors, MAE for large errors")
    print("• Improves training stability")
    
    return td_errors.detach().numpy(), huber_loss.item(), mse_loss.item()

# Run the demonstration
td_errors, huber_loss, mse_loss = demonstrate_loss_function()
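
To make the target formula concrete, here is the same computation in plain Python for the rewards and done flags of the example batch. The next-state Q-values are made-up numbers chosen for the illustration, since the real ones depend on the randomly initialized network.

```python
gamma = 0.99
rewards = [0.1, 1.0, -0.5, 0.8]
dones = [False, False, True, False]

# Hypothetical target-network outputs Q_target(s', .) for three actions.
next_q_values = [
    [0.2, 0.5, 0.1],
    [1.0, 0.3, 0.7],
    [0.9, 0.4, 0.6],
    [0.0, -0.2, 0.3],
]

targets = []
for reward, done, q_next in zip(rewards, dones, next_q_values):
    # A terminal transition has no future return, so the bootstrap term is dropped.
    bootstrap = 0.0 if done else gamma * max(q_next)
    targets.append(reward + bootstrap)

for i, y in enumerate(targets):
    print(f"Transition {i}: target = {y:.3f}")
```

The third transition is terminal, so its target is just the reward, which is what the `(~dones)` mask achieves in the tensor version.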

In [None]:
# Visualize different loss functions
errors = np.linspace(-3, 3, 100)
mse_losses = errors ** 2
mae_losses = np.abs(errors)
huber_losses = np.where(np.abs(errors) <= 1, 0.5 * errors**2, np.abs(errors) - 0.5)

plt.figure(figsize=(12, 6))
plt.plot(errors, mse_losses, 'r-', label='MSE Loss', linewidth=2)
plt.plot(errors, mae_losses, 'g-', label='MAE Loss', linewidth=2)
plt.plot(errors, huber_losses, 'b-', label='Huber Loss', linewidth=2)
plt.xlabel('TD Error')
plt.ylabel('Loss')
plt.title('Comparison of Loss Functions')
plt.legend()
plt.grid(True, alpha=0.3)
plt.axvline(x=0, color='black', linestyle='--', alpha=0.3)
plt.axhline(y=0, color='black', linestyle='--', alpha=0.3)
plt.show()

print("💡 Huber loss combines the best of both worlds:")
print("💡 • Quadratic for small errors (like MSE)")
print("💡 • Linear for large errors (like MAE)")
print("💡 This prevents large outliers from dominating the gradient updates.")
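
The practical effect is easiest to see in the gradients. The sketch below implements the Huber loss with threshold `delta = 1` (the case plotted above) and its derivative, then compares them with the squared error for a small and a large TD error. The function names are assumptions made for this example.

```python
def huber(error: float, delta: float = 1.0) -> float:
    """Huber loss: quadratic within the threshold delta, linear outside it."""
    if abs(error) <= delta:
        return 0.5 * error ** 2
    return delta * (abs(error) - 0.5 * delta)


def huber_grad(error: float, delta: float = 1.0) -> float:
    """Derivative of the Huber loss: the error itself, clipped to [-delta, delta]."""
    return max(-delta, min(delta, error))


for e in (0.5, 3.0):
    print(f"error={e}: huber={huber(e):.3f}, huber grad={huber_grad(e):.1f}, "
          f"squared-error grad={2 * e:.1f}")
```

For a TD error of 3 the squared-error gradient is 6 while the Huber gradient is capped at 1, so a single bad target cannot dominate the update.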

### 🔍 Training Algorithm Summary

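Here's the complete DQN training loop, written against the classes defined in this notebook (`DQNNetwork`, `ReplayBuffer`, `EpsilonGreedyStrategy`):

```python
def train_dqn(env, q_main, q_target, buffer, strategy, optimizer,
              num_episodes=500, max_steps=1000, batch_size=32,
              gamma=0.99, target_update_freq=1000):
    """Train q_main with DQN.

    Assumes `env` is a Gymnasium environment wrapped so that observations are
    float arrays of shape (4, 84, 84) and actions are discrete indices.
    """
    global_step = 0
    for episode in range(num_episodes):
        state, _ = env.reset()

        for _ in range(max_steps):
            # 1. Select action using ε-greedy
            with torch.no_grad():
                state_t = torch.as_tensor(state, dtype=torch.float32).unsqueeze(0)
                q_values = q_main(state_t).squeeze(0)
            action = strategy.get_action(q_values)

            # 2. Take action and observe
            next_state, reward, terminated, truncated, _ = env.step(action)
            done = terminated or truncated

            # 3. Store in replay buffer (only true termination stops bootstrapping)
            buffer.push(state, action, reward, next_state, terminated)

            # 4. Sample batch and train
            if len(buffer) >= batch_size:
                states, actions, rewards, next_states, dones = buffer.sample(batch_size)

                # Compute targets using target network (no gradient)
                with torch.no_grad():
                    next_q_max = q_target(next_states).max(1)[0]
                    targets = rewards + gamma * next_q_max * (~dones)

                # Compute loss and update main network
                q_selected = q_main(states).gather(1, actions.unsqueeze(1)).squeeze(1)
                loss = F.smooth_l1_loss(q_selected, targets)
                optimizer.zero_grad()
                loss.backward()
                optimizer.step()

            # 5. Update target network periodically (counted over all episodes)
            global_step += 1
            if global_step % target_update_freq == 0:
                q_target.load_state_dict(q_main.state_dict())

            state = next_state
            if done:
                break

        # 6. Decay epsilon
        strategy.update_epsilon()
```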

---
# 🎉 Tutorial Complete!

You now understand all the key components of DQN:

✅ **Neural networks** for Q-function approximation  
✅ **Experience replay** for stable learning  
✅ **Target networks** for training stability  
✅ **Epsilon-greedy** for exploration  
✅ **Huber loss** for robust optimization  

## 🚀 Next Steps

1. **Run the training script**: `python training/dqn_training.py`
2. **Test manual gameplay**: `python games/test_manual_play.py`
3. **Demo trained agent**: `python games/demo_trained_agent.py`

## 📚 Further Reading

- [Original DQN Paper](https://arxiv.org/abs/1312.5602)
- [Human-level control through deep RL](https://www.nature.com/articles/nature14236)
- [Rainbow DQN](https://arxiv.org/abs/1710.02298)

Happy learning! 🏎️💨

In [None]:
# Final summary visualization
fig, ((ax1, ax2), (ax3, ax4)) = plt.subplots(2, 2, figsize=(15, 10))

# 1. Network architecture
layers = ['Input\n(4×84×84)', 'Conv1\n(32 filters)', 'Conv2\n(64 filters)', 'Conv3\n(64 filters)', 'FC1\n(512)', 'Output\n(3 Q-values)']
ax1.barh(range(len(layers)), [1, 0.8, 0.6, 0.4, 0.3, 0.1], color='skyblue')
ax1.set_yticks(range(len(layers)))
ax1.set_yticklabels(layers)
ax1.set_title('DQN Architecture')
ax1.set_xlabel('Relative Size')

# 2. Epsilon decay
episodes = np.arange(500)
epsilon = np.maximum(0.01, 1.0 * (0.995 ** episodes))
ax2.plot(episodes, epsilon, 'b-', linewidth=2)
ax2.set_title('Epsilon Decay')
ax2.set_xlabel('Episode')
ax2.set_ylabel('Epsilon')
ax2.grid(True, alpha=0.3)

# 3. Loss function comparison
x = np.linspace(-2, 2, 100)
huber = np.where(np.abs(x) <= 1, 0.5 * x**2, np.abs(x) - 0.5)
mse = x**2
ax3.plot(x, mse, 'r--', label='MSE', linewidth=2)
ax3.plot(x, huber, 'b-', label='Huber', linewidth=2)
ax3.set_title('Loss Functions')
ax3.set_xlabel('TD Error')
ax3.set_ylabel('Loss')
ax3.legend()
ax3.grid(True, alpha=0.3)

# 4. DQN components (illustrative weights only, not measured importance)
components = ['Neural\nNetwork', 'Experience\nReplay', 'Target\nNetwork', 'ε-Greedy', 'Huber\nLoss']
importance = [0.9, 0.8, 0.7, 0.6, 0.5]
colors = ['red', 'blue', 'green', 'orange', 'purple']
ax4.pie(importance, labels=components, colors=colors, autopct='%1.0f%%', startangle=90)
ax4.set_title('DQN Components')

plt.suptitle('DQN Tutorial Summary', fontsize=16, fontweight='bold')
plt.tight_layout()
plt.show()

print("🎓 Congratulations! You've completed the DQN tutorial.")
print("🏁 Ready to train your own racing agent!")