# 🔥 Reinforcement Learning with PyTorch: Zero to Hero

Welcome! This notebook will teach you Reinforcement Learning from the ground up, using **PyTorch** for all neural network implementations.

**What you'll learn:**
- What RL is and how it differs from other ML approaches
- The exploration vs exploitation dilemma
- Core algorithms: Bandits, Q-Learning, DQN, Policy Gradients, and more
- How to implement everything in PyTorch

**Prerequisites:** Basic Python, some familiarity with probability, and basic PyTorch knowledge.

---

## Table of Contents
1. [What is Reinforcement Learning?](#section1)
2. [Multi-Armed Bandits](#section2)
3. [Markov Decision Processes](#section3)
4. [Temporal Difference Learning & Q-Learning](#section4)
5. [Deep Q-Networks (DQN)](#section5)
6. [Advanced DQN: Double & Dueling](#section6)
7. [Policy Gradient Methods (REINFORCE)](#section7)
8. [Actor-Critic Methods](#section8)
9. [Proximal Policy Optimization (PPO)](#section9)
10. [Summary & Next Steps](#section10)

In [None]:
# First, let's import the libraries we need
import numpy as np
import matplotlib.pyplot as plt
from collections import deque, namedtuple
import random
from typing import List, Tuple

# PyTorch imports
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.distributions import Categorical

# Set seeds for reproducibility
SEED = 42
np.random.seed(SEED)
torch.manual_seed(SEED)
random.seed(SEED)

# Device configuration
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

print(f"🔥 PyTorch version: {torch.__version__}")
print(f"📱 Using device: {device}")
print("✅ Libraries loaded successfully!")

---
<a id='section1'></a>
# 1. What is Reinforcement Learning?

## The Big Picture

Imagine teaching a dog to fetch. You don't show it thousands of labeled examples of "correct fetching" (that would be supervised learning). Instead:

1. The dog **tries something** (runs toward the ball)
2. You give **feedback** ("Good boy!" + treat, or nothing)
3. The dog **learns** which actions lead to treats

This is **Reinforcement Learning (RL)**: learning through trial and error by receiving rewards or penalties.

## How RL Differs from Other Machine Learning

| Approach | What it needs | How it learns | Example |
|----------|--------------|---------------|--------|
| **Supervised Learning** | Labeled data (input → correct output) | Learns to map inputs to outputs | Email spam filter: "This email is spam" |
| **Unsupervised Learning** | Unlabeled data | Finds patterns/structure | Customer segmentation: group similar buyers |
| **Reinforcement Learning** | Environment + rewards | Trial and error | Game AI: +1 for winning, -1 for losing |

## The RL Framework: Agent and Environment

Every RL problem has two main components:

- **Agent**: The learner/decision-maker (like the dog, or a game-playing AI)
- **Environment**: Everything the agent interacts with (the world, the game)

They interact in a loop:

```
┌─────────────────────────────────────────┐
│                                         │
│    Agent ──action──> Environment        │
│      ↑                    │             │
│      └──reward + state────┘             │
│                                         │
└─────────────────────────────────────────┘
```

1. Agent observes the current **state** of the environment
2. Agent chooses an **action**
3. Environment returns a **reward** and the new **state**
4. Repeat!

Let's see this in code with a super simple example.

## 🎯 Understanding RL: A Simple Analogy

**Think of RL like learning to ride a bike:**

1. **You (the Agent)** try different things - lean left, lean right, pedal faster
2. **The World (Environment)** responds - you stay balanced or you fall
3. **Feedback (Reward)** tells you how you did - staying up = good, falling = bad
4. **You Learn** - next time, you remember what worked!

**The RL Loop in Plain English:**
```
1. Look at the situation (observe STATE)
2. Decide what to do (choose ACTION)
3. See what happens (get REWARD + new STATE)
4. Learn from it (update your strategy)
5. Repeat!
```

**Key Terms (Don't worry, we'll explain each one!):**
- **Agent**: The learner (like you learning to bike)
- **Environment**: Everything else (the bike, the road, gravity)
- **State**: The current situation (your balance, speed, position)
- **Action**: What you can do (lean, pedal, brake)
- **Reward**: Feedback signal (+1 for staying up, -1 for falling)
- **Policy**: Your strategy ("when tilting left, lean right")

In [None]:
# EXAMPLE: A Simple Guessing Game
# The agent must learn to guess the secret number (0 or 1)

class SimpleEnvironment:
    """
    A tiny environment where the agent must guess a secret number.
    - State: None (no state in this simple game)
    - Actions: 0 or 1 (the guess)
    - Reward: +1 if correct, -1 if wrong
    """
    def __init__(self):
        self.secret = 1  # The correct answer
    
    def step(self, action):
        """Agent takes an action, environment returns reward"""
        if action == self.secret:
            return +1  # Correct! Positive reward
        else:
            return -1  # Wrong! Negative reward

# Let's test it manually
env = SimpleEnvironment()

print("Testing the environment:")
print(f"  Guess 0 → reward: {env.step(0)}")
print(f"  Guess 1 → reward: {env.step(1)}")
print("\nThe agent needs to learn that action=1 gives positive reward!")

## Building a Learning Agent with PyTorch

Now let's create an agent that **learns** which action is better.

**The agent's strategy:**
1. Occasionally try a random action (exploration), so that every action gets sampled
2. Keep track of the average reward for each action
3. Otherwise, pick the action with the highest average reward

This is called **value estimation** - the agent estimates how valuable each action is.

**Why PyTorch?** Even for this simple example, we'll use PyTorch tensors. This builds good habits for when we need neural networks later!
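
The running average does not require storing every past reward. The following is a minimal sketch in plain Python, assuming a made-up reward sequence for a single action, showing that the incremental update `Q = Q + (r - Q) / n` reproduces the ordinary mean:

```python
# Hypothetical rewards observed for one action (illustrative values only)
rewards = [1, -1, 1, 1, -1, 1]

q = 0.0  # running estimate of the action's value
n = 0    # number of rewards seen so far

for r in rewards:
    n += 1
    q += (r - q) / n  # incremental mean: move q toward r by a 1/n step

batch_mean = sum(rewards) / len(rewards)
print(f"incremental estimate: {q:.4f}")
print(f"batch mean:           {batch_mean:.4f}")
```

Both numbers agree, which is why an agent only needs to remember the current estimate and a count per action.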

In [None]:
class SimpleAgent:
    """
    An agent that learns by tracking average rewards.
    Uses PyTorch tensors for Q-value storage.
    """
    def __init__(self, n_actions=2):
        self.n_actions = n_actions
        # Track Q-values (estimated value of each action) using PyTorch
        self.q_values = torch.zeros(n_actions)
        self.action_count = torch.zeros(n_actions)
    
    def choose_action(self, epsilon=0.1):
        """Epsilon-greedy: explore with probability epsilon"""
        if random.random() < epsilon:
            return random.randrange(self.n_actions)  # Random exploration over all actions
        return self.q_values.argmax().item()  # Exploit best action
    
    def learn(self, action, reward):
        """Update Q-value using incremental mean"""
        self.action_count[action] += 1
        # Incremental mean update: Q = Q + (1/n)(r - Q)
        self.q_values[action] += (reward - self.q_values[action]) / self.action_count[action]

In [None]:
# Watch the agent learn!
agent = SimpleAgent()
env = SimpleEnvironment()

print("The RL Loop in action (using PyTorch tensors):")
print("=" * 50)

for step in range(5):
    # 1. Agent chooses an action
    action = agent.choose_action(epsilon=0.5)  # High exploration at first
    
    # 2. Environment returns reward
    reward = env.step(action)
    
    # 3. Agent learns from the reward
    agent.learn(action, reward)
    
    # Show what happened
    print(f"Step {step + 1}:")
    print(f"  Action chosen: {action}")
    print(f"  Reward received: {reward}")
    print(f"  Q-values: {agent.q_values.tolist()}")
    print()

print("=" * 50)
print(f"🎯 Best action learned: {agent.q_values.argmax().item()}")

---
<a id='section2'></a>
# 2. Multi-Armed Bandits

## 🎰 Why Start with Bandits?

Before we tackle complex RL, let's understand the **core dilemma** with a simple problem.

**Imagine this scenario:**

You walk into a casino with 3 slot machines. Each machine has a different (hidden) win rate:
- Machine A: Wins 20% of the time
- Machine B: Wins 50% of the time  
- Machine C: Wins 75% of the time (the best!)

**But you don't know these rates!** You only find out by playing.

**The Dilemma:**
- If you always play the machine that *seems* best so far → you might miss a better one!
- If you keep trying new machines → you waste plays on bad ones!

This is the **Exploration vs Exploitation** tradeoff:
- **Explore**: Try new things to learn more
- **Exploit**: Use what you already know works

**Real-world examples:**
- 🍕 Restaurant: Try new places (explore) or go to your favorite (exploit)?
- 📺 Netflix: Watch something new (explore) or rewatch a favorite (exploit)?
- 💼 Career: Learn new skills (explore) or deepen existing ones (exploit)?

## The Problem: Slot Machines

Imagine you're in a casino with **K different slot machines** (called "one-armed bandits"). Each machine has a different probability of paying out, but you don't know what those probabilities are.

**Your goal:** Maximize your total winnings over N plays.

**The challenge:** You face a fundamental dilemma:

- **Exploration**: Try different machines to discover which ones pay better
- **Exploitation**: Stick with the machine that seems best so far

If you only exploit, you might miss a better machine. If you only explore, you waste plays on bad machines.

## Why This Matters

This isn't just about casinos! The exploration-exploitation tradeoff appears everywhere:

- **A/B testing**: Which website design converts better?
- **Clinical trials**: Which treatment is most effective?
- **Recommendations**: Show content you know the user likes, or try something new?

Let's build a bandit environment and try different strategies.

In [None]:
class MultiArmedBandit:
    """
    K slot machines, each with a hidden win probability.
    
    When you pull arm i:
    - You win (reward=1) with probability p[i]
    - You lose (reward=0) with probability 1-p[i]
    """
    def __init__(self, probabilities=(0.2, 0.5, 0.75)):
        # Hidden probabilities - the agent doesn't know these!
        self.probabilities = torch.tensor(probabilities)
        self.n_arms = len(probabilities)
    
    def pull(self, arm):
        """Pull an arm and get a reward (0 or 1)"""
        if random.random() < self.probabilities[arm].item():
            return 1  # Win!
        else:
            return 0  # Lose

# Test the bandit
bandit = MultiArmedBandit([0.2, 0.5, 0.75])

print("Testing each arm 10 times:")
print("(Remember: arm 2 has 75% win rate, so it should win most often)\n")

for arm in range(3):
    wins = sum([bandit.pull(arm) for _ in range(10)])
    print(f"Arm {arm}: {wins}/10 wins (true probability: {bandit.probabilities[arm].item()})")

## Strategy 1: Pure Greedy (Always Exploit)

**The idea:** Always pick the arm with the highest estimated value.

**How we estimate value:** Track the average reward for each arm:

$Q(a) = \frac{\text{total reward from arm } a}{\text{number of times we pulled arm } a}$

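**The problem:** If we get unlucky early (a good arm gives a bad result), we might never try it again! It gets worse when every estimate starts at 0 and rewards are never negative: `argmax` breaks ties in favor of the first arm, so the code below keeps pulling arm 0 and never tries the others.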

Let's see this failure in action.

In [None]:
def run_greedy_strategy(bandit, n_steps=200):
    """
    Pure greedy: always pick the arm with highest estimated value.
    Returns the history of rewards and which arms were pulled.
    """
    n_arms = bandit.n_arms
    
    # Track estimates for each arm using PyTorch tensors
    Q = torch.zeros(n_arms)        # Estimated value of each arm
    N = torch.zeros(n_arms)        # Number of times each arm was pulled
    
    rewards = []                # History of rewards
    arms_pulled = []            # History of which arm was pulled
    
    for step in range(n_steps):
        # GREEDY: Pick arm with highest Q value
        arm = Q.argmax().item()
        
        # Pull the arm and get reward
        reward = bandit.pull(arm)
        
        # Update our estimate using incremental average
        N[arm] += 1
        Q[arm] = Q[arm] + (reward - Q[arm]) / N[arm]
        
        rewards.append(reward)
        arms_pulled.append(arm)
    
    return rewards, arms_pulled, N

# Run with a specific seed to show the problem
torch.manual_seed(0)
random.seed(0)
bandit = MultiArmedBandit([0.2, 0.5, 0.75])

rewards, arms, counts = run_greedy_strategy(bandit, n_steps=200)

print("Greedy Strategy Results:")
print(f"  Times each arm was pulled: {counts.tolist()}")
print(f"  Total reward: {sum(rewards)} out of 200 possible")
print("\n⚠️ Problem: The agent might get stuck on a suboptimal arm!")

## Strategy 2: Epsilon-Greedy (Explore + Exploit)

**The fix:** Sometimes explore randomly!

**The algorithm:**
- With probability **ε** (epsilon): pick a **random** arm (explore)
- With probability **1-ε**: pick the **best** arm (exploit)

Mathematically:

$
\text{action} = 
\begin{cases}
\text{random arm} & \text{with probability } \epsilon \\
\arg\max_a Q(a) & \text{with probability } 1-\epsilon
\end{cases}
$

**Common values for ε:**
- ε = 0.1 means 10% exploration, 90% exploitation
- ε = 0.01 means 1% exploration (more exploitation)
- ε = 0 is pure greedy (no exploration)
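
Because the random branch can also land on the current best arm, that arm is chosen slightly more often than 1-ε. Here is a small sketch, assuming exploration picks uniformly among all K arms as described above (the helper name `greedy_arm_probability` is illustrative, not part of the notebook):

```python
def greedy_arm_probability(epsilon: float, n_arms: int) -> float:
    """Probability that epsilon-greedy selects the current best arm.

    Assumes exploration picks uniformly among all n_arms, so the best
    arm is also chosen on a fraction 1/n_arms of the exploratory steps.
    """
    return (1 - epsilon) + epsilon / n_arms


for eps in (0.0, 0.01, 0.1):
    p = greedy_arm_probability(eps, n_arms=3)
    print(f"epsilon={eps}: best arm chosen with probability {p:.4f}")
```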

In [None]:
def run_epsilon_greedy(bandit, epsilon=0.1, n_steps=200):
    """
    Epsilon-greedy: explore with probability epsilon,
    otherwise exploit the best arm.
    """
    n_arms = bandit.n_arms
    Q = torch.zeros(n_arms)  # Estimated values
    N = torch.zeros(n_arms)  # Pull counts
    rewards = []
    
    for step in range(n_steps):
        # Flip a coin: explore or exploit?
        if random.random() < epsilon:
            # EXPLORE: pick a random arm
            arm = random.randint(0, n_arms - 1)
        else:
            # EXPLOIT: pick the best arm
            arm = Q.argmax().item()
        
        # Pull and update
        reward = bandit.pull(arm)
        N[arm] += 1
        Q[arm] = Q[arm] + (reward - Q[arm]) / N[arm]
        rewards.append(reward)
    
    return rewards, N, Q

# Compare greedy vs epsilon-greedy
torch.manual_seed(42)
random.seed(42)
bandit = MultiArmedBandit([0.2, 0.5, 0.75])

greedy_rewards, _, greedy_counts = run_greedy_strategy(bandit, 1000)
eps_rewards, eps_counts, eps_Q = run_epsilon_greedy(bandit, epsilon=0.1, n_steps=1000)

print("Results over 1000 steps:")
print("\nGreedy:")
print(f"  Arm pulls: {greedy_counts.tolist()}")
print(f"  Total reward: {sum(greedy_rewards)}")

print("\nEpsilon-Greedy (ε=0.1):")
print(f"  Arm pulls: {eps_counts.tolist()}")
print(f"  Total reward: {sum(eps_rewards)}")
print(f"  Learned Q-values: {eps_Q.tolist()}")

print("\n✅ Epsilon-greedy explores and finds the best arm!")

## Strategy 3: Upper Confidence Bound (UCB)

**The smartest approach:** Explore arms we're **uncertain** about!

**The intuition:**
- If we've pulled an arm many times, we're confident about its value
- If we've barely tried an arm, we're uncertain - it might be great!
- UCB adds a "bonus" for uncertainty

**The formula:**

$A_t = \arg\max_a \left[ Q(a) + c \sqrt{\frac{\ln t}{N(a)}} \right]$

Where:
- $Q(a)$ = estimated value of arm a
- $N(a)$ = number of times we pulled arm a  
- $t$ = total number of steps so far
- $c$ = exploration parameter (typically 2)

**The bonus term** $\sqrt{\frac{\ln t}{N(a)}}$:
- Gets smaller as we pull arm a more (less uncertainty)
- Gets larger as total time increases (encourages trying neglected arms)
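
To get a feel for the two effects, the bonus can be evaluated for a few values of $t$ and $N(a)$. This is a sketch using only the standard library; the helper name `ucb_bonus` and the sample values are assumptions chosen for illustration:

```python
import math


def ucb_bonus(t: int, n_pulls: int, c: float = 2.0) -> float:
    """Exploration bonus c * sqrt(ln(t) / N(a)) from the UCB formula."""
    return c * math.sqrt(math.log(t) / n_pulls)


# The bonus shrinks as an arm is pulled more often (t fixed at 1000)
for n in (1, 10, 100):
    print(f"t=1000, N(a)={n:>3}: bonus = {ucb_bonus(1000, n):.3f}")

# The bonus grows slowly for an arm that is left alone (N(a) fixed at 10)
for t in (100, 1000, 10000):
    print(f"t={t:>5}, N(a)= 10: bonus = {ucb_bonus(t, 10):.3f}")
```

The logarithm makes the growth in $t$ slow, so a neglected arm is revisited eventually but not often.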

In [None]:
def run_ucb(bandit, c=2.0, n_steps=200):
    """
    Upper Confidence Bound: balance exploitation and exploration
    using uncertainty estimates.
    """
    n_arms = bandit.n_arms
    Q = torch.zeros(n_arms)
    N = torch.zeros(n_arms)
    rewards = []
    
    for t in range(1, n_steps + 1):
        # First, try each arm once
        if t <= n_arms:
            arm = t - 1
        else:
            # UCB selection: Q(a) + exploration bonus
            ucb_values = Q + c * torch.sqrt(torch.log(torch.tensor(float(t))) / (N + 1e-5))
            arm = ucb_values.argmax().item()
        
        reward = bandit.pull(arm)
        N[arm] += 1
        Q[arm] = Q[arm] + (reward - Q[arm]) / N[arm]
        rewards.append(reward)
    
    return rewards, N, Q

# Test UCB
torch.manual_seed(42)
random.seed(42)
bandit = MultiArmedBandit([0.2, 0.5, 0.75])

ucb_rewards, ucb_counts, ucb_Q = run_ucb(bandit, c=2.0, n_steps=1000)

print("UCB Results:")
print(f"  Arm pulls: {ucb_counts.tolist()}")
print(f"  Total reward: {sum(ucb_rewards)}")
print(f"  Learned Q-values: {[round(q, 3) for q in ucb_Q.tolist()]}")

In [None]:
# Compare ALL strategies
torch.manual_seed(42)
random.seed(42)
bandit = MultiArmedBandit([0.2, 0.5, 0.75])
n_steps = 1000

greedy_r, _, _ = run_greedy_strategy(bandit, n_steps)
eps_r, _, _ = run_epsilon_greedy(bandit, epsilon=0.1, n_steps=n_steps)
ucb_r, _, _ = run_ucb(bandit, c=2.0, n_steps=n_steps)

# Plot comparison
plt.figure(figsize=(12, 5))
plt.plot(np.cumsum(greedy_r), label='Greedy', alpha=0.8)
plt.plot(np.cumsum(eps_r), label='ε-Greedy (ε=0.1)', alpha=0.8)
plt.plot(np.cumsum(ucb_r), label='UCB (c=2)', alpha=0.8)

plt.xlabel('Steps')
plt.ylabel('Cumulative Reward')
plt.title('Comparison of Bandit Strategies')
plt.legend()
plt.grid(True, alpha=0.3)
plt.show()

print("\nFinal Rewards:")
print(f"  Greedy:     {sum(greedy_r)}")
print(f"  ε-Greedy:   {sum(eps_r)}")
print(f"  UCB:        {sum(ucb_r)}")

---
<a id='section3'></a>
# 3. Markov Decision Processes (MDPs)

## Beyond Bandits: Adding States

In bandits, there's no "state" - every decision is independent. But most real problems have **states**:

- Chess: The board position is the state
- Robot navigation: The robot's location is the state
- Game: Health, inventory, position are all part of the state

**Markov Decision Process (MDP)** is the mathematical framework for RL with states.

## MDP Components

An MDP is defined by:

- **S**: Set of states (where can the agent be?)
- **A**: Set of actions (what can the agent do?)
- **P(s'|s,a)**: Transition probabilities (if I do action a in state s, where do I end up?)
- **R(s,a,s')**: Reward function (what reward do I get?)
- **γ** (gamma): Discount factor (how much do we care about future rewards?)

## The Markov Property

**Key assumption:** The future only depends on the current state, not the history.

$P(S_{t+1} | S_t, A_t) = P(S_{t+1} | S_0, A_0, S_1, A_1, ..., S_t, A_t)$

In plain English: "Where I go next depends only on where I am now, not how I got here."

## Value Functions

**Key idea:** Some states are better than others. We want to quantify this.

### State-Value Function V(s)

"How good is it to be in state s?"

$V^\pi(s) = \mathbb{E}_\pi[G_t | S_t = s]$

The expected return $G_t$ (the discounted sum of future rewards, defined under Discounted Return below) starting from state s and following policy π.

### Action-Value Function Q(s, a)

"How good is it to take action a in state s?"

$Q^\pi(s, a) = \mathbb{E}_\pi[G_t | S_t = s, A_t = a]$

The expected return starting from state s, taking action a, then following policy π.
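
The two value functions are linked: averaging the action-values over the policy's action probabilities gives the state-value, $V^\pi(s) = \sum_a \pi(a|s)\, Q^\pi(s, a)$. Below is a sketch for a single state, with made-up action names and numbers:

```python
# Hypothetical action-values Q(s, a) for one state with three actions
q_values = {"left": 1.0, "stay": 2.0, "right": 4.0}

# Hypothetical stochastic policy pi(a | s); probabilities must sum to 1
policy = {"left": 0.25, "stay": 0.25, "right": 0.5}

# V(s) is the policy-weighted average of the action-values
v = sum(policy[a] * q_values[a] for a in q_values)
print(f"V(s) = {v}")
```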

## The Bellman Equation

The **Bellman equation** is the foundation of RL. It says:

$V^\pi(s) = \mathbb{E}_\pi[R_{t+1} + \gamma V^\pi(S_{t+1}) \mid S_t = s]$

In words: "The value of a state = immediate reward + discounted value of next state"

This creates a **recursive relationship** between state values.
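
One way to see the recursion at work is to apply it repeatedly until the values stop changing. The sketch below assumes a hypothetical three-state chain (A, then B, then C, then the episode ends) with a fixed policy and deterministic transitions, so the expectation reduces to a single term:

```python
GAMMA = 0.9

# Hypothetical deterministic chain under a fixed policy:
# state -> (reward received, next state); None marks the end of the episode
transitions = {
    "A": (-1.0, "B"),
    "B": (-1.0, "C"),
    "C": (10.0, None),
}

V = {s: 0.0 for s in transitions}  # initial guess for every state

for sweep in range(100):
    max_change = 0.0
    for s, (reward, next_s) in transitions.items():
        next_value = V[next_s] if next_s is not None else 0.0
        new_value = reward + GAMMA * next_value  # Bellman backup
        max_change = max(max_change, abs(new_value - V[s]))
        V[s] = new_value
    if max_change < 1e-8:  # values now satisfy the Bellman equation
        break

print({s: round(v, 3) for s, v in V.items()})
```

Each state's value ends up equal to its reward plus the discounted value of its successor, which is exactly what the equation demands.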

### The Bellman Optimality Equation

For the optimal policy:

$V^*(s) = \max_a \sum_{s'} P(s'|s,a)[R(s,a,s') + \gamma V^*(s')]$

"The optimal value = best action's expected (reward + discounted future value)"

## Discounted Return

**Question:** How do we measure how good a sequence of actions is?

**Answer:** Sum up the rewards, but discount future rewards:

$G_t = R_{t+1} + \gamma R_{t+2} + \gamma^2 R_{t+3} + ... = \sum_{k=0}^{\infty} \gamma^k R_{t+k+1}$

Where **γ (gamma)** is the discount factor (0 ≤ γ ≤ 1):
- γ = 0: Only care about immediate reward
- γ = 1: Care equally about all future rewards
- γ = 0.9: Typical value - future rewards matter but less than immediate

**Why discount?**
1. Uncertainty: Future is less certain
2. Math: Makes infinite sums converge
3. Economics: Money now is worth more than money later
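
Point 2 can be checked numerically. Assuming a constant reward of 1 at every step (an illustrative choice), the discounted sum is a geometric series that approaches 1/(1-γ) when γ < 1, whereas the undiscounted sum simply keeps growing. The helper name `truncated_return` is hypothetical:

```python
def truncated_return(gamma: float, n_steps: int, reward: float = 1.0) -> float:
    """Sum of gamma**k * reward for k = 0 .. n_steps - 1."""
    return sum((gamma ** k) * reward for k in range(n_steps))


gamma = 0.9
for n in (10, 100, 1000):
    print(f"gamma={gamma}, {n:>4} steps: G = {truncated_return(gamma, n):.4f}")
print(f"closed-form limit 1/(1-gamma) = {1 / (1 - gamma):.4f}")

# With gamma = 1 there is no finite limit: the sum equals the number of steps
print(f"gamma=1.0, 1000 steps: G = {truncated_return(1.0, 1000):.1f}")
```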

In [None]:
# A Simple Grid World MDP
# The agent moves in a 4x4 grid trying to reach the goal

class GridWorld:
    """
    A 4x4 grid world:
    
    [0 ][1 ][2 ][3 ]
    [4 ][5 ][6 ][7 ]
    [8 ][9 ][10][11]
    [12][13][14][G ]  <- 15 is the goal!
    
    Actions: 0=up, 1=right, 2=down, 3=left
    Reward: +10 for reaching goal, -1 for each step
    """
    def __init__(self, size=4):
        self.size = size
        self.n_states = size * size
        self.n_actions = 4
        self.goal = self.n_states - 1
        self.state = 0  # Start at top-left
    
    def reset(self):
        """Reset to starting position"""
        self.state = 0
        return self.state
    
    def step(self, action):
        """Take an action, return (new_state, reward, done)"""
        row, col = self.state // self.size, self.state % self.size
        
        # Move based on action
        if action == 0 and row > 0:              # Up
            row -= 1
        elif action == 1 and col < self.size - 1:  # Right
            col += 1
        elif action == 2 and row < self.size - 1:  # Down
            row += 1
        elif action == 3 and col > 0:              # Left
            col -= 1
        
        self.state = row * self.size + col
        
        # Check if we reached the goal
        if self.state == self.goal:
            return self.state, +10, True  # Big reward, episode done
        else:
            return self.state, -1, False  # Small penalty, continue
    
    def get_state_tensor(self, state=None):
        """One-hot encode state for neural networks"""
        if state is None:
            state = self.state
        tensor = torch.zeros(self.n_states)
        tensor[state] = 1.0
        return tensor
    
    def render(self):
        """Print the grid with agent position"""
        for i in range(self.n_states):
            if i == self.state:
                print('A', end=' ')  # Agent
            elif i == self.goal:
                print('G', end=' ')  # Goal
            else:
                print('.', end=' ')
            if (i + 1) % self.size == 0:
                print()

# Test the environment
env = GridWorld()
print("Starting position:")
env.render()

print("\nTaking actions: right, right, down, down, down, right")
actions = [1, 1, 2, 2, 2, 1]  # right, right, down, down, down, right
for a in actions:
    state, reward, done = env.step(a)
    print(f"Action {['up','right','down','left'][a]}: state={state}, reward={reward}, done={done}")

print("\nFinal position:")
env.render()

In [None]:
def calculate_return(rewards, gamma=0.9):
    """
    Calculate discounted return from a sequence of rewards.
    G = r0 + γ*r1 + γ²*r2 + ...
    """
    G = 0
    for t, r in enumerate(rewards):
        G += (gamma ** t) * r
    return G

# Example: rewards from a 6-step episode
rewards = [-1, -1, -1, -1, -1, 10]  # 5 steps of -1, then +10 at goal

print("Rewards:", rewards)
print("\nDiscounted returns with different γ:")
for gamma in [0.0, 0.5, 0.9, 1.0]:
    G = calculate_return(rewards, gamma)
    print(f"  γ={gamma}: G = {G:.2f}")

print("\nNotice: a higher γ gives more weight to the delayed +10 goal reward!")

---
<a id='section4'></a>
# 4. Temporal Difference Learning & Q-Learning

## ⚡ TD Learning: The Key Insight

**The Problem with Monte Carlo:**

Monte Carlo methods estimate values by averaging the full return observed over complete episodes, so they must wait until the END of an episode to learn. But what if:
- Episodes are very long?
- Episodes never end (continuing tasks)?
- We want to learn faster?

**TD's Brilliant Idea: Learn from EVERY step!**

Instead of waiting for the true return G, we **estimate** it:

```
True return:      G = r₁ + γr₂ + γ²r₃ + ... (need whole episode)
TD estimate:      G ≈ r₁ + γV(next_state)   (just need one step!)
```

**Why does this work?**

If V(s) is a good estimate of future rewards from state s, then:
- r + γV(s') is a good estimate of total return
- We "bootstrap" - use our own estimates to improve our estimates!

**The TD Update (in plain English):**
```
1. I thought state s was worth V(s)
2. I took an action and got reward r
3. I ended up in state s', which I think is worth V(s')
4. So maybe s is actually worth: r + γV(s')
5. Update V(s) a little bit toward this new estimate
```

**Formula:**
$V(s) \leftarrow V(s) + \alpha \cdot [r + \gamma V(s') - V(s)]$

Where:
- $\alpha$ = learning rate (how much to update)
- $r + \gamma V(s')$ = TD target (what we think V(s) should be)
- $r + \gamma V(s') - V(s)$ = TD error (how wrong we were)
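
A single update can be traced by hand. The sketch below uses made-up value estimates and an assumed learning rate of 0.1 and discount factor of 0.9; the state names `s` and `s_next` are placeholders:

```python
ALPHA = 0.1  # learning rate (assumed for this illustration)
GAMMA = 0.9  # discount factor (assumed for this illustration)

# Hypothetical current value estimates for two states
V = {"s": 2.0, "s_next": 5.0}

reward = -1.0  # reward observed on the transition s -> s_next

td_target = reward + GAMMA * V["s_next"]  # what V(s) "should" be
td_error = td_target - V["s"]             # how wrong we were
V["s"] += ALPHA * td_error                # small step toward the target

print(f"TD target: {td_target:.2f}")
print(f"TD error:  {td_error:.2f}")
print(f"New V(s):  {V['s']:.2f}")
```

Note that `V["s"]` moves only a tenth of the way toward the target; repeated visits are what make the estimate converge.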

## 🎮 Q-Learning: Learning the Best Actions

**Q-Learning is TD Learning for action-values (Q-values).**

**The Key Idea:**

We want to learn Q(s, a) - how good is action a in state s?

**The Q-Learning Update:**
$Q(s, a) \leftarrow Q(s, a) + \alpha \cdot [r + \gamma \max_{a'} Q(s', a') - Q(s, a)]$

**Let's break this down:**

1. **Current estimate:** Q(s, a) - what we currently think

2. **TD Target:** r + γ max Q(s', a')
   - r = immediate reward we got
   - γ = discount factor (0.9 means a reward one step later counts 90% as much)
   - max Q(s', a') = value of best action in next state
   
3. **TD Error:** target - current = how wrong we were

4. **Update:** Move Q(s,a) a little toward the target

**Why "max"?**

Q-learning is **optimistic** - it assumes we'll act optimally in the future.
Even if we explore randomly now, we learn the value of acting optimally.

**Simple Example:**
```
Settings: α = 0.1 (learning rate), γ = 0.9 (discount factor)
State: s=0, Action: a=right, Reward: r=-1, Next state: s'=1
Current: Q(0, right) = 0
Next state values: Q(1, up)=2, Q(1, right)=5, Q(1, down)=3
Max Q(s', a') = 5

Target = -1 + 0.9 × 5 = 3.5
Error = 3.5 - 0 = 3.5
New Q(0, right) = 0 + 0.1 × 3.5 = 0.35
```
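
The arithmetic in the example can be reproduced in a few lines of plain Python. This is a sketch that assumes a learning rate of 0.1 and a discount factor of 0.9 (the values implied by the numbers in the example) and stores the Q-table as nested dictionaries purely for readability:

```python
ALPHA = 0.1  # learning rate implied by the worked example
GAMMA = 0.9  # discount factor implied by the worked example

# Q-table as nested dicts: Q[state][action]
Q = {
    0: {"right": 0.0},
    1: {"up": 2.0, "right": 5.0, "down": 3.0},
}

state, action, reward, next_state = 0, "right", -1.0, 1

best_next = max(Q[next_state].values())  # max over a' of Q(s', a')
target = reward + GAMMA * best_next      # TD target
td_error = target - Q[state][action]     # TD error
Q[state][action] += ALPHA * td_error     # Q-learning update

print(f"target = {target}, error = {td_error}, new Q = {Q[state][action]:.2f}")
```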

In [None]:
class TabularQLearning:
    """
    Tabular Q-Learning with PyTorch tensors.
    
    This is the foundation of all value-based RL!
    """
    
    def __init__(self, n_states, n_actions, lr=0.1, gamma=0.99, epsilon=0.1):
        # Q-table: stores Q(s,a) for all state-action pairs
        self.Q = torch.zeros(n_states, n_actions)
        self.lr = lr           # Learning rate (α)
        self.gamma = gamma     # Discount factor (γ)
        self.epsilon = epsilon # Exploration rate (ε)
        self.n_actions = n_actions
    
    def act(self, state, training=True):
        """Epsilon-greedy action selection"""
        if training and random.random() < self.epsilon:
            return random.randrange(self.n_actions)  # Explore
        return self.Q[state].argmax().item()  # Exploit
    
    def learn(self, state, action, reward, next_state, done):
        """
        Q-Learning update:
        Q(s,a) ← Q(s,a) + α[r + γ max Q(s',a') - Q(s,a)]
        """
        # TD Target: r + γ * max Q(s', a')
        if done:
            target = reward  # No future rewards if episode ended
        else:
            target = reward + self.gamma * self.Q[next_state].max().item()
        
        # TD Error: target - current
        td_error = target - self.Q[state, action].item()
        
        # Update Q-value
        self.Q[state, action] += self.lr * td_error
        
        return td_error

In [None]:
def train_tabular_q(env, agent, n_episodes=500):
    """Train Q-Learning agent on GridWorld"""
    rewards_history = []
    
    for ep in range(n_episodes):
        state = env.reset()
        total_reward = 0
        
        for _ in range(100):  # Max steps per episode
            # 1. Choose action
            action = agent.act(state)
            
            # 2. Take action, observe result
            next_state, reward, done = env.step(action)
            
            # 3. Learn from experience
            agent.learn(state, action, reward, next_state, done)
            
            total_reward += reward
            state = next_state
            
            if done:
                break
        
        rewards_history.append(total_reward)
    
    return rewards_history

# Train!
env = GridWorld()
agent = TabularQLearning(env.n_states, env.n_actions, lr=0.1, gamma=0.9, epsilon=0.1)
rewards = train_tabular_q(env, agent, n_episodes=500)

print("Tabular Q-Learning Results:")
print(f"Average reward (last 50): {np.mean(rewards[-50:]):.2f}")

# Show learned policy
symbols = ['↑', '→', '↓', '←']
print("\nLearned Policy:")
for i in range(env.n_states):
    if i == env.goal:
        print('G', end=' ')
    else:
        print(symbols[agent.Q[i].argmax().item()], end=' ')
    if (i + 1) % env.size == 0:
        print()

In [None]:
# Plot learning curve
plt.figure(figsize=(10, 4))
window = 20
smoothed = np.convolve(rewards, np.ones(window)/window, mode='valid')
plt.plot(smoothed)
plt.xlabel('Episode')
plt.ylabel('Total Reward')
plt.title('Tabular Q-Learning on GridWorld')
plt.grid(True, alpha=0.3)
plt.show()

print("The agent learns to reach the goal efficiently!")
print("Optimal path gives reward: -1 × 5 + 10 = 5 (6 steps to goal, the last one earns +10)")

## SARSA: On-Policy TD Control

**SARSA** is similar to Q-Learning but **on-policy**.

**Update rule:**

$Q(S_t, A_t) \leftarrow Q(S_t, A_t) + \alpha [R_{t+1} + \gamma Q(S_{t+1}, A_{t+1}) - Q(S_t, A_t)]$

**Difference from Q-Learning:**
- Q-Learning: Uses max_a Q(s', a) - learns optimal policy
- SARSA: Uses Q(s', a') where a' is the actual next action - learns the policy being followed

**Name:** SARSA = State, Action, Reward, State, Action

| Algorithm | Update Target | Type |
|-----------|--------------|------|
| Q-Learning | r + γ max Q(s', a') | Off-policy |
| SARSA | r + γ Q(s', a') | On-policy |
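
The difference between the two targets shows up whenever the next action is exploratory. In the sketch below, the Q-values, the reward, and the chosen next action are all made up for illustration:

```python
GAMMA = 0.9

# Hypothetical Q-values in the next state s'
q_next = {"up": 2.0, "right": 5.0, "down": 3.0}

reward = -1.0
next_action = "up"  # action the epsilon-greedy policy actually picked (an exploratory move)

q_learning_target = reward + GAMMA * max(q_next.values())  # off-policy: best action in s'
sarsa_target = reward + GAMMA * q_next[next_action]        # on-policy: action actually taken

print(f"Q-Learning target: {q_learning_target:.2f}")
print(f"SARSA target:      {sarsa_target:.2f}")
```

When the policy happens to pick the greedy action, both targets coincide; they differ only on exploratory steps.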

In [None]:
class SARSA:
    """SARSA: On-policy TD control"""
    
    def __init__(self, n_states, n_actions, lr=0.1, gamma=0.99, epsilon=0.1):
        self.Q = torch.zeros(n_states, n_actions)
        self.lr = lr
        self.gamma = gamma
        self.epsilon = epsilon
        self.n_actions = n_actions
    
    def act(self, state):
        if random.random() < self.epsilon:
            return random.randrange(self.n_actions)
        return self.Q[state].argmax().item()
    
    def learn(self, state, action, reward, next_state, next_action, done):
        """SARSA update: uses actual next action, not max"""
        if done:
            target = reward
        else:
            # Key difference: use Q(s', a') not max Q(s', a')
            target = reward + self.gamma * self.Q[next_state, next_action].item()
        
        td_error = target - self.Q[state, action].item()
        self.Q[state, action] += self.lr * td_error

# Train SARSA
env = GridWorld()
sarsa_agent = SARSA(env.n_states, env.n_actions, lr=0.1, gamma=0.9, epsilon=0.1)
sarsa_rewards = []

for ep in range(500):
    state = env.reset()
    action = sarsa_agent.act(state)
    total_reward = 0
    
    for _ in range(100):
        next_state, reward, done = env.step(action)
        next_action = sarsa_agent.act(next_state)
        sarsa_agent.learn(state, action, reward, next_state, next_action, done)
        
        total_reward += reward
        state, action = next_state, next_action
        if done:
            break
    
    sarsa_rewards.append(total_reward)

print(f"SARSA Average reward (last 50): {np.mean(sarsa_rewards[-50:]):.2f}")

---
<a id='section5'></a>
# 5. Deep Q-Networks (DQN)

## From Tables to Neural Networks

**The Problem with Tabular Q-Learning:**

When state spaces are large (or continuous), we can't store Q-values in a table:
- Atari games: Far more distinct screen images (210×160 pixels each) than any table could hold
- Robot control: Continuous positions, velocities
- Go: More states than atoms in the universe!

**The Solution:** Use a neural network to **approximate** Q(s,a)!

Instead of:
```
Q[state][action] = value  (table lookup)
```

We use:
```
Q(state, action) = neural_network(state)[action]  (function approximation)
```

## Key DQN Innovations (DeepMind, 2015)

Training neural networks with RL is tricky! DQN relies on two key ideas to keep training stable:

### 1. Experience Replay

**Problem:** Sequential experiences are correlated (state t is similar to state t+1).
Gradient descent assumes roughly independent samples, so training on consecutive transitions biases the updates!

**Solution:** Store experiences in a buffer, sample randomly to break correlations.

```
Buffer: [(s₁,a₁,r₁,s₁'), (s₂,a₂,r₂,s₂'), ...]
Training: Sample random batch from buffer
```
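
A minimal sketch of the buffer mechanics, assuming a `deque` with a tiny capacity and integer time steps standing in for real transitions: once the buffer is full, the oldest entries are dropped, and batches are drawn uniformly at random rather than in time order.

```python
import random
from collections import deque

random.seed(0)
buffer = deque(maxlen=5)          # tiny capacity, for illustration only
for t in range(8):                # push "transitions" labelled by time step
    buffer.append(t)

print(list(buffer))               # oldest entries 0, 1, 2 have been evicted
batch = random.sample(list(buffer), 3)
print(batch)                      # drawn uniformly, in no particular time order
```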

### 2. Target Network

**Problem:** We're updating Q toward a moving target (Q itself changes during training).
This causes instability!

**Solution:** Use a separate "target network" that updates slowly.

```
Q_network: Updated every step
Q_target: Copied from Q_network every N steps
```
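
The schedule above can be sketched without any neural network. Here a single number stands in for the weights and the copy interval `N = 4` is an arbitrary choice for the example; real networks would copy tensors instead.

```python
import copy

# Stand-ins for network weights (hypothetical; real networks hold tensors)
q_params = {"w": 0.0}
target_params = copy.deepcopy(q_params)

N = 4  # copy interval, chosen only for this illustration
history = []
for step in range(1, 11):
    q_params["w"] += 1.0                         # pretend a gradient step changed Q_network
    if step % N == 0:
        target_params = copy.deepcopy(q_params)  # hard update: Q_target <- Q_network
    history.append((step, q_params["w"], target_params["w"]))

for step, w, w_target in history:
    print(f"step {step:2d}: online w={w:4.1f}  target w={w_target:4.1f}")
```

Between copies the target stays frozen, so the TD targets computed from it do not shift with every gradient step.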

## The DQN Loss Function

$\mathcal{L} = \mathbb{E}\left[\left(r + \gamma \max_{a'} Q_{\text{target}}(s',a') - Q(s,a)\right)^2\right]$

This is just MSE between our prediction and the TD target!
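
As a worked example, here is the loss computed by hand for a hypothetical mini-batch of two transitions (all numbers are made up). Note that the terminal transition uses the reward alone as its target.

```python
gamma = 0.9

# Hypothetical mini-batch of two transitions
batch = [
    # (Q(s,a) predicted, reward, Q_target(s', .) for all actions, done)
    (0.50, -0.1, [0.2, 1.0, 0.4], False),
    (0.80,  1.0, [0.3, 0.6, 0.1], True),   # terminal: no bootstrapping
]

squared_errors = []
for q_sa, reward, q_next, done in batch:
    bootstrap = 0.0 if done else gamma * max(q_next)
    td_target = reward + bootstrap
    squared_errors.append((td_target - q_sa) ** 2)
    print(f"target={td_target:.2f}  prediction={q_sa:.2f}")

loss = sum(squared_errors) / len(squared_errors)
print(f"MSE loss: {loss:.4f}")
```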

In [None]:
# Experience Replay Buffer
Transition = namedtuple('Transition', ('state', 'action', 'reward', 'next_state', 'done'))

class ReplayBuffer:
    """
    Fixed-size buffer to store experience tuples.
    
    Why replay?
    1. Breaks correlation between consecutive experiences
    2. Allows reusing rare experiences multiple times
    3. Smooths out learning over many past experiences
    """
    
    def __init__(self, capacity=10000):
        self.buffer = deque(maxlen=capacity)
    
    def push(self, *args):
        """Save a transition"""
        self.buffer.append(Transition(*args))
    
    def sample(self, batch_size):
        """Sample a random batch of transitions"""
        transitions = random.sample(self.buffer, batch_size)
        batch = Transition(*zip(*transitions))
        
        # Convert to tensors
        states = torch.stack(batch.state)
        actions = torch.tensor(batch.action, dtype=torch.long)
        rewards = torch.tensor(batch.reward, dtype=torch.float32)
        next_states = torch.stack(batch.next_state)
        dones = torch.tensor(batch.done, dtype=torch.float32)
        
        return states, actions, rewards, next_states, dones
    
    def __len__(self):
        return len(self.buffer)

# Test replay buffer
buffer = ReplayBuffer(1000)
env = GridWorld()
state = env.reset()
state_tensor = env.get_state_tensor(state)

# Fill buffer with random experiences
for _ in range(100):
    action = random.randrange(4)
    next_state, reward, done = env.step(action)
    next_state_tensor = env.get_state_tensor(next_state)
    buffer.push(state_tensor, action, reward, next_state_tensor, done)
    state = next_state if not done else env.reset()
    state_tensor = env.get_state_tensor(state)

print(f"Buffer size: {len(buffer)}")
states, actions, rewards, next_states, dones = buffer.sample(5)
print(f"Sample batch shapes: states={states.shape}, actions={actions.shape}")

In [None]:
class DQN(nn.Module):
    """
    Deep Q-Network: A neural network that outputs Q-values.
    
    Input: State (one-hot encoded)
    Output: Q-value for each action
    
    Architecture:
    state → [Linear → ReLU] → [Linear → ReLU] → [Linear] → Q-values
    """
    
    def __init__(self, n_states, n_actions, hidden_size=64):
        super(DQN, self).__init__()
        self.fc1 = nn.Linear(n_states, hidden_size)
        self.fc2 = nn.Linear(hidden_size, hidden_size)
        self.fc3 = nn.Linear(hidden_size, n_actions)
    
    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        return self.fc3(x)  # Output Q-values for all actions

# Test the network
net = DQN(16, 4)
test_state = torch.randn(1, 16)
q_values = net(test_state)
print(f"DQN output shape: {q_values.shape}")
print(f"Q-values: {q_values.detach().numpy().round(3)}")

In [None]:
class DQNAgent:
    """
    DQN Agent with experience replay and target network.
    
    A compact, fully connected version of DeepMind's DQN algorithm
    (the original used a convolutional network on stacked Atari frames).
    """
    
    def __init__(self, n_states, n_actions, hidden_size=64, lr=1e-3, 
                 gamma=0.99, epsilon=1.0, epsilon_min=0.01, epsilon_decay=0.995):
        self.n_actions = n_actions
        self.gamma = gamma
        self.epsilon = epsilon
        self.epsilon_min = epsilon_min
        self.epsilon_decay = epsilon_decay
        
        # Q-Network (the one we train)
        self.q_network = DQN(n_states, n_actions, hidden_size).to(device)
        
        # Target Network (for stable targets)
        self.target_network = DQN(n_states, n_actions, hidden_size).to(device)
        self.target_network.load_state_dict(self.q_network.state_dict())
        
        self.optimizer = optim.Adam(self.q_network.parameters(), lr=lr)
        self.buffer = ReplayBuffer(10000)
    
    def act(self, state, training=True):
        """Epsilon-greedy action selection"""
        if training and random.random() < self.epsilon:
            return random.randrange(self.n_actions)
        
        with torch.no_grad():
            state = state.unsqueeze(0).to(device)
            q_values = self.q_network(state)
            return q_values.argmax(dim=1).item()
    
    def store(self, state, action, reward, next_state, done):
        """Store experience in replay buffer"""
        self.buffer.push(state, action, reward, next_state, done)
    
    def learn(self, batch_size=64):
        """Sample from buffer and update Q-network"""
        if len(self.buffer) < batch_size:
            return 0
        
        # Sample batch
        states, actions, rewards, next_states, dones = self.buffer.sample(batch_size)
        states = states.to(device)
        actions = actions.to(device)
        rewards = rewards.to(device)
        next_states = next_states.to(device)
        dones = dones.to(device)
        
        # Current Q values: Q(s, a)
        current_q = self.q_network(states).gather(1, actions.unsqueeze(1)).squeeze(1)
        
        # Target Q values: r + γ max Q_target(s', a')
        with torch.no_grad():
            next_q = self.target_network(next_states).max(dim=1)[0]
            target_q = rewards + self.gamma * next_q * (1 - dones)
        
        # Loss: MSE between current and target
        loss = F.mse_loss(current_q, target_q)
        
        # Backpropagation
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()
        
        return loss.item()
    
    def update_target(self):
        """Copy Q-network weights to target network"""
        self.target_network.load_state_dict(self.q_network.state_dict())
    
    def decay_epsilon(self):
        """Reduce exploration over time"""
        self.epsilon = max(self.epsilon_min, self.epsilon * self.epsilon_decay)

# Create DQN agent
env = GridWorld()
agent = DQNAgent(env.n_states, env.n_actions, hidden_size=64, lr=1e-3)
print("DQN Agent created!")
print(f"Q-Network: {agent.q_network}")

In [None]:
def train_dqn(env, agent, n_episodes=300, target_update=10, batch_size=64):
    """Train DQN agent"""
    rewards_history = []
    losses = []
    
    for episode in range(n_episodes):
        state = env.reset()
        state_tensor = env.get_state_tensor(state)
        total_reward = 0
        
        for step in range(100):
            # 1. Choose action
            action = agent.act(state_tensor)
            
            # 2. Take action
            next_state, reward, done = env.step(action)
            next_state_tensor = env.get_state_tensor(next_state)
            
            # 3. Store experience
            agent.store(state_tensor, action, reward, next_state_tensor, done)
            
            # 4. Learn from replay buffer
            loss = agent.learn(batch_size)
            if loss > 0:
                losses.append(loss)
            
            total_reward += reward
            state_tensor = next_state_tensor
            
            if done:
                break
        
        rewards_history.append(total_reward)
        agent.decay_epsilon()
        
        # Update target network every `target_update` episodes
        if episode % target_update == 0:
            agent.update_target()
        
        if (episode + 1) % 50 == 0:
            avg = np.mean(rewards_history[-50:])
            print(f"Episode {episode+1}: avg_reward={avg:.2f}, epsilon={agent.epsilon:.3f}")
    
    return rewards_history, losses

# Train DQN
env = GridWorld()
agent = DQNAgent(env.n_states, env.n_actions, hidden_size=64, lr=1e-3)
rewards, losses = train_dqn(env, agent, n_episodes=300)

print(f"\nFinal average reward: {np.mean(rewards[-50:]):.2f}")

In [None]:
# Plot DQN results
fig, axes = plt.subplots(1, 2, figsize=(12, 4))

# Rewards
window = 20
smoothed = np.convolve(rewards, np.ones(window)/window, mode='valid')
axes[0].plot(smoothed)
axes[0].set_xlabel('Episode')
axes[0].set_ylabel('Total Reward')
axes[0].set_title('DQN Learning Curve')
axes[0].grid(True, alpha=0.3)

# Loss
if losses:
    loss_smooth = np.convolve(losses, np.ones(100)/100, mode='valid')
    axes[1].plot(loss_smooth)
    axes[1].set_xlabel('Training Step')
    axes[1].set_ylabel('Loss')
    axes[1].set_title('DQN Loss')
    axes[1].grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

# Show learned policy
print("\nDQN Learned Policy:")
symbols = ['↑', '→', '↓', '←']
for i in range(env.n_states):
    if i == env.goal:
        print('G', end=' ')
    else:
        state_tensor = env.get_state_tensor(i)
        with torch.no_grad():
            q_values = agent.q_network(state_tensor.unsqueeze(0).to(device))
            action = q_values.argmax().item()
        print(symbols[action], end=' ')
    if (i + 1) % env.size == 0:
        print()

---
<a id='section6'></a>
# 6. Advanced DQN: Double & Dueling

## The Problem with Standard DQN

Standard DQN tends to **overestimate** Q-values. Why?

The max operator in the target:
$\max_{a'} Q(s', a')$

If Q-values are noisy (they always are during learning), the max tends to pick an action whose estimate is too high, so on average the target is biased upward!
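
A quick simulation makes the bias visible. In this sketch every action has a true Q-value of 0 and the estimates carry zero-mean Gaussian noise; the number of actions, the noise level, and the trial count are arbitrary assumptions.

```python
import random

random.seed(0)
n_actions, n_trials, noise_std = 4, 10_000, 1.0

total_max = 0.0
for _ in range(n_trials):
    # True Q-value of every action is 0; estimates are corrupted by zero-mean noise
    noisy_q = [random.gauss(0.0, noise_std) for _ in range(n_actions)]
    total_max += max(noisy_q)

avg_max = total_max / n_trials
print(f"True max Q: 0.00, average of max over noisy estimates: {avg_max:.2f}")
```

The average comes out close to 1 rather than 0, even though no individual estimate is biased.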

## Double DQN: Fixing Overestimation

**Key Idea:** Separate action selection from action evaluation.

**Standard DQN:**
- Use target network to both SELECT and EVALUATE the best action

**Double DQN:**
- Use **online network** to SELECT the best action
- Use **target network** to EVALUATE that action

$y = r + \gamma Q_{\text{target}}(s', \arg\max_{a'} Q_{\text{online}}(s', a'))$, where $y$ is the TD target.

This reduces overestimation because the selection and evaluation use different networks!
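
Here is the difference on one transition, using made-up Q-value estimates for the next state. The target network happens to have an inflated value for action 2, which standard DQN picks up and Double DQN avoids.

```python
gamma, reward = 0.9, 0.0

# Hypothetical Q-value estimates for the next state s'
q_online = [1.0, 2.0, 1.5]   # online network
q_target = [1.2, 0.5, 3.0]   # target network

# Standard DQN: target network both selects and evaluates
dqn_target = reward + gamma * max(q_target)

# Double DQN: online network selects, target network evaluates
best_action = max(range(len(q_online)), key=lambda a: q_online[a])
double_dqn_target = reward + gamma * q_target[best_action]

print(f"DQN target:        {dqn_target:.2f}")
print(f"Double DQN target: {double_dqn_target:.2f} (action {best_action})")
```

With the same target network, the Double DQN target can never exceed the standard DQN target, because the max over the target network's values is an upper bound on any single entry.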

In [None]:
class DoubleDQNAgent(DQNAgent):
    """
    Double DQN - uses online network to select actions,
    target network to evaluate them.
    
    Only the learn() method changes!
    """
    
    def learn(self, batch_size=64):
        if len(self.buffer) < batch_size:
            return 0
        
        states, actions, rewards, next_states, dones = self.buffer.sample(batch_size)
        states = states.to(device)
        actions = actions.to(device)
        rewards = rewards.to(device)
        next_states = next_states.to(device)
        dones = dones.to(device)
        
        # Current Q values
        current_q = self.q_network(states).gather(1, actions.unsqueeze(1)).squeeze(1)
        
        # Double DQN: use online network to SELECT action, target to EVALUATE
        with torch.no_grad():
            # Step 1: Online network selects best action
            best_actions = self.q_network(next_states).argmax(dim=1, keepdim=True)
            # Step 2: Target network evaluates that action
            next_q = self.target_network(next_states).gather(1, best_actions).squeeze(1)
            target_q = rewards + self.gamma * next_q * (1 - dones)
        
        loss = F.mse_loss(current_q, target_q)
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()
        
        return loss.item()

# Train Double DQN
env = GridWorld()
double_agent = DoubleDQNAgent(env.n_states, env.n_actions, hidden_size=64, lr=1e-3)
double_rewards, _ = train_dqn(env, double_agent, n_episodes=300)

print(f"\nDouble DQN final avg reward: {np.mean(double_rewards[-50:]):.2f}")

## Dueling DQN: Separating Value and Advantage

**Key Insight:** Not all states require knowing the value of each action.

Sometimes it's obvious that all actions are bad (or good) regardless of which one you pick.

**Dueling Architecture:**

Split Q into two streams:
- **Value stream V(s):** How good is this state overall?
- **Advantage stream A(s,a):** How much better is action a than average?

Then combine:
$Q(s,a) = V(s) + A(s,a) - \frac{1}{|A|}\sum_{a'} A(s,a')$

The subtraction ensures identifiability (otherwise V and A could shift arbitrarily).

```
         ┌─────────────┐
         │   Shared    │
State ──>│   Layers    │
         └──────┬──────┘
                │
         ┌──────┴──────┐
         │             │
    ┌────▼────┐   ┌────▼────┐
    │ Value   │   │Advantage│
    │ Stream  │   │ Stream  │
    └────┬────┘   └────┬────┘
         │             │
         └──────┬──────┘
                │
         Q = V + (A - mean(A))
```
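
To see why the mean subtraction matters, the sketch below combines a hypothetical value and advantage vector, then shifts every advantage by a constant. The helper `dueling_q` and the numbers are illustrative only.

```python
def dueling_q(value, advantages):
    """Q(s,a) = V(s) + A(s,a) - mean_a' A(s,a')."""
    mean_adv = sum(advantages) / len(advantages)
    return [value + a - mean_adv for a in advantages]

value = 2.0
advantages = [1.0, -1.0, 0.0, 4.0]   # hypothetical advantage-stream outputs

q = dueling_q(value, advantages)
q_shifted = dueling_q(value, [a + 10.0 for a in advantages])

print(q)          # the mean of these Q-values equals V(s)
print(q_shifted)  # shifting every advantage by a constant changes nothing
```

Because a constant shift in the advantage stream cancels out, the value stream is the only place the overall level of Q can be stored.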

In [None]:
class DuelingDQN(nn.Module):
    """
    Dueling DQN Architecture.
    
    Separates Q into Value and Advantage streams.
    """
    
    def __init__(self, n_states, n_actions, hidden_size=64):
        super(DuelingDQN, self).__init__()
        
        # Shared feature extractor
        self.shared = nn.Sequential(
            nn.Linear(n_states, hidden_size),
            nn.ReLU()
        )
        
        # Value stream: V(s)
        self.value_stream = nn.Sequential(
            nn.Linear(hidden_size, hidden_size // 2),
            nn.ReLU(),
            nn.Linear(hidden_size // 2, 1)
        )
        
        # Advantage stream: A(s,a)
        self.advantage_stream = nn.Sequential(
            nn.Linear(hidden_size, hidden_size // 2),
            nn.ReLU(),
            nn.Linear(hidden_size // 2, n_actions)
        )
    
    def forward(self, x):
        features = self.shared(x)
        
        # Value: single number for the state
        value = self.value_stream(features)
        
        # Advantage: one number per action
        advantage = self.advantage_stream(features)
        
        # Combine: Q = V + (A - mean(A))
        q_values = value + (advantage - advantage.mean(dim=1, keepdim=True))
        return q_values

# Test Dueling DQN
dueling_net = DuelingDQN(16, 4)
test_state = torch.randn(1, 16)
q_values = dueling_net(test_state)
print(f"Dueling DQN output shape: {q_values.shape}")
print(f"Q-values: {q_values.detach().numpy().round(3)}")
print("\nThe architecture separates 'how good is this state' from 'which action is best'")

## Comparison: DQN Variants

| Variant | Key Innovation | Benefit |
|---------|---------------|--------|
| **DQN** | Replay + Target Network | Stable training |
| **Double DQN** | Separate selection/evaluation | Reduces overestimation |
| **Dueling DQN** | V/A decomposition | Better state evaluation |
| **Rainbow** | Combines the above with further extensions | State-of-the-art on Atari when published (2017) |

In practice, these improvements are often combined!

---
<a id='section7'></a>
# 7. Policy Gradient Methods (REINFORCE)

## A Different Approach: Learning Policies Directly

So far, we've learned **value functions** (Q-values) and derived policies from them.

**Policy Gradient methods** take a different approach: learn the **policy directly**!

**Value-Based (DQN):**
```
Learn Q(s,a) → Policy: pick action with highest Q
```

**Policy-Based (REINFORCE):**
```
Learn π(a|s) directly → Policy outputs action probabilities
```

## Why Policy Gradients?

**Advantages:**
1. **Continuous actions:** Can output any action, not just discrete choices
2. **Stochastic policies:** Can learn to randomize (useful in games)
3. **Simpler:** No need for max operation, target networks, etc.

**Disadvantages:**
1. **High variance:** Learning can be noisy
2. **Sample inefficient:** Need lots of data
3. **Local optima:** Can get stuck

## The Policy Gradient Theorem

**Goal:** Maximize expected return $J(\theta) = \mathbb{E}_{\tau \sim \pi_\theta}[R(\tau)]$

**The Magic Formula:**

$\nabla J(\theta) = \mathbb{E}_{\tau \sim \pi_\theta}\left[\sum_{t=0}^{T} \nabla_\theta \log \pi_\theta(a_t|s_t) \cdot G_t\right]$

**In Plain English:**
- If an action led to high return → increase its probability
- If an action led to low return → decrease its probability
- The gradient tells us how to adjust the policy
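
The bullets above can be checked on the smallest possible policy: a softmax over three logits that are themselves the parameters (an assumption made to keep the example free of neural networks). For such a policy, the gradient of $\log \pi(a)$ with respect to the logits is `one_hot(a) - pi`.

```python
import math

def softmax(logits):
    m = max(logits)
    exps = [math.exp(z - m) for z in logits]
    total = sum(exps)
    return [e / total for e in exps]

def grad_log_prob(logits, action):
    """Gradient of log pi(action) w.r.t. the logits: one_hot(action) - pi."""
    probs = softmax(logits)
    return [(1.0 if i == action else 0.0) - p for i, p in enumerate(probs)]

logits = [0.0, 0.0, 0.0]      # uniform policy over 3 actions
action, G, lr = 1, 2.0, 0.5   # sampled action, its return, step size (all hypothetical)

grad = grad_log_prob(logits, action)
new_logits = [z + lr * G * g for z, g in zip(logits, grad)]

print(f"pi(a=1) before: {softmax(logits)[action]:.3f}")
print(f"pi(a=1) after:  {softmax(new_logits)[action]:.3f}")
```

With a positive return the probability of the sampled action goes up; flipping the sign of `G` pushes it down instead.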

## REINFORCE Algorithm

1. **Collect** a full episode using current policy
2. **Compute returns** $G_t$ for each step
3. **Update:** $\theta \leftarrow \theta + \alpha \sum_{t} \nabla_\theta \log \pi_\theta(a_t|s_t) \cdot G_t$

The name comes from: actions that lead to good outcomes get **reinforced**!
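
Step 2 is a simple backward pass over the rewards. A minimal sketch, assuming a three-step episode with a single reward at the end (the function name `discounted_returns` is illustrative):

```python
def discounted_returns(rewards, gamma):
    """G_t = r_t + gamma * G_{t+1}, computed backwards from the last step."""
    returns = []
    G = 0.0
    for r in reversed(rewards):
        G = r + gamma * G
        returns.append(G)
    returns.reverse()
    return returns

returns = discounted_returns([0.0, 0.0, 1.0], gamma=0.9)
print([round(g, 2) for g in returns])  # earlier steps see a more heavily discounted reward
```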

In [None]:
class PolicyNetwork(nn.Module):
    """
    Neural network that outputs action probabilities.
    
    Unlike DQN which outputs Q-values, this outputs a probability
    distribution over actions using softmax.
    """
    
    def __init__(self, n_states, n_actions, hidden_size=64):
        super(PolicyNetwork, self).__init__()
        self.fc1 = nn.Linear(n_states, hidden_size)
        self.fc2 = nn.Linear(hidden_size, hidden_size)
        self.fc3 = nn.Linear(hidden_size, n_actions)
    
    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        # Softmax converts to probabilities (sum to 1)
        return F.softmax(self.fc3(x), dim=-1)

# Test policy network
policy_net = PolicyNetwork(16, 4)
test_state = torch.randn(1, 16)
probs = policy_net(test_state)
print(f"Policy output: {probs.detach().numpy().round(3)}")
print(f"Sum of probabilities: {probs.sum().item():.3f} (should be 1.0)")

In [None]:
class REINFORCE:
    """
    REINFORCE (Monte Carlo Policy Gradient).
    
    The simplest policy gradient algorithm:
    1. Collect full episode
    2. Compute returns
    3. Update policy to increase probability of good actions
    """
    
    def __init__(self, n_states, n_actions, hidden_size=64, lr=1e-3, gamma=0.99):
        self.gamma = gamma
        self.policy = PolicyNetwork(n_states, n_actions, hidden_size).to(device)
        self.optimizer = optim.Adam(self.policy.parameters(), lr=lr)
        
        # Episode storage
        self.log_probs = []  # log π(a|s) for each step
        self.rewards = []    # rewards for each step
    
    def act(self, state):
        """Sample action from policy distribution"""
        state = state.unsqueeze(0).to(device)
        probs = self.policy(state)
        
        # Create categorical distribution and sample
        dist = Categorical(probs)
        action = dist.sample()
        
        # Store log probability for later
        self.log_probs.append(dist.log_prob(action))
        
        return action.item()
    
    def store_reward(self, reward):
        """Store reward from environment"""
        self.rewards.append(reward)
    
    def learn(self):
        """
        Update policy after episode ends.
        
        Loss = -sum(log_prob * return)
        Negative because we want to MAXIMIZE return
        """
        # Calculate returns (backwards from end of episode)
        returns = []
        G = 0
        for r in reversed(self.rewards):
            G = r + self.gamma * G
            returns.insert(0, G)
        
        returns = torch.tensor(returns, dtype=torch.float32).to(device)
        
        # Normalize returns (reduces variance, helps learning)
        if len(returns) > 1:
            returns = (returns - returns.mean()) / (returns.std() + 1e-8)
        
        # Policy gradient loss
        loss = 0
        for log_prob, G in zip(self.log_probs, returns):
            # Negative because optimizer minimizes, but we want to maximize
            loss -= log_prob * G
        
        # Backpropagation
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()
        
        # Clear episode data
        self.log_probs = []
        self.rewards = []
        
        return loss.item()

# Create REINFORCE agent
env = GridWorld()
reinforce_agent = REINFORCE(env.n_states, env.n_actions, hidden_size=64, lr=1e-3)
print("REINFORCE Agent created!")
print(f"Policy Network: {reinforce_agent.policy}")

In [None]:
def train_reinforce(env, agent, n_episodes=1000):
    """Train REINFORCE agent"""
    rewards_history = []
    
    for episode in range(n_episodes):
        state = env.reset()
        state_tensor = env.get_state_tensor(state)
        total_reward = 0
        
        # Collect full episode
        for _ in range(100):
            action = agent.act(state_tensor)
            next_state, reward, done = env.step(action)
            agent.store_reward(reward)
            total_reward += reward
            state_tensor = env.get_state_tensor(next_state)
            if done:
                break
        
        # Learn from episode (only after it's complete!)
        agent.learn()
        rewards_history.append(total_reward)
        
        if (episode + 1) % 100 == 0:
            avg = np.mean(rewards_history[-100:])
            print(f"Episode {episode+1}: avg_reward={avg:.2f}")
    
    return rewards_history

# Train REINFORCE
env = GridWorld()
reinforce_agent = REINFORCE(env.n_states, env.n_actions, hidden_size=64, lr=1e-3)
reinforce_rewards = train_reinforce(env, reinforce_agent, n_episodes=1000)

print(f"\nFinal average reward: {np.mean(reinforce_rewards[-100:]):.2f}")

In [None]:
# Plot REINFORCE results
plt.figure(figsize=(10, 4))
window = 50
smoothed = np.convolve(reinforce_rewards, np.ones(window)/window, mode='valid')
plt.plot(smoothed)
plt.xlabel('Episode')
plt.ylabel('Total Reward')
plt.title('REINFORCE on GridWorld')
plt.grid(True, alpha=0.3)
plt.show()

# Show learned policy
print("\nREINFORCE Learned Policy:")
symbols = ['↑', '→', '↓', '←']
for i in range(env.n_states):
    if i == env.goal:
        print('G', end=' ')
    else:
        state_tensor = env.get_state_tensor(i)
        with torch.no_grad():
            probs = reinforce_agent.policy(state_tensor.unsqueeze(0).to(device))
            action = probs.argmax().item()
        print(symbols[action], end=' ')
    if (i + 1) % env.size == 0:
        print()

---
<a id='section8'></a>
# 8. Actor-Critic Methods

## The Problem with REINFORCE

REINFORCE has **high variance** because:
- We use the full episode return $G_t$
- Returns can vary wildly between episodes
- This makes learning slow and unstable

**Example:** Imagine two episodes with the same good action:
- Episode 1: Good action → lucky outcomes → G = 100
- Episode 2: Good action → unlucky outcomes → G = 10

The same action gets very different credit!

## The Solution: Use a Critic

**Key Idea:** Instead of using raw returns, use a **baseline** to reduce variance.

**Actor-Critic Architecture:**
- **Actor**: Policy network π(a|s) - decides what to do
- **Critic**: Value network V(s) - evaluates how good states are

**The Advantage Function:**

$A(s,a) = Q(s,a) - V(s)$

"How much better is this action than average?"

We can estimate this with the **TD error**:

$A(s,a) \approx r + \gamma V(s') - V(s)$

**Why does this help?**
- If action is better than expected: A > 0 → increase probability
- If action is worse than expected: A < 0 → decrease probability
- The baseline V(s) filters out much of the "luck" that has nothing to do with the action itself!
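
A small numeric sketch of the TD-error estimate, with made-up rewards and critic values (the helper `td_advantage` is illustrative and not part of the agent below):

```python
def td_advantage(reward, value, next_value, gamma, done):
    """A(s,a) ~ r + gamma * V(s') - V(s); no bootstrap from terminal states."""
    bootstrap = 0.0 if done else gamma * next_value
    return reward + bootstrap - value

gamma = 0.9
# Critic expected V(s) = 1.0, and the step turned out better than that
better = td_advantage(reward=0.5, value=1.0, next_value=2.0, gamma=gamma, done=False)
# Same state, but the step led somewhere worse than expected
worse = td_advantage(reward=-0.5, value=1.0, next_value=0.5, gamma=gamma, done=False)

print(f"better than expected: A = {better:+.2f}")
print(f"worse than expected:  A = {worse:+.2f}")
```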

In [None]:
class ActorCritic(nn.Module):
    """
    Actor-Critic network with shared feature layers.
    
    Outputs both:
    - Policy (action probabilities)
    - Value (state value estimate)
    """
    
    def __init__(self, n_states, n_actions, hidden_size=64):
        super(ActorCritic, self).__init__()
        
        # Shared feature extractor
        self.shared = nn.Sequential(
            nn.Linear(n_states, hidden_size),
            nn.ReLU()
        )
        
        # Actor head: outputs action probabilities
        self.actor = nn.Sequential(
            nn.Linear(hidden_size, hidden_size),
            nn.ReLU(),
            nn.Linear(hidden_size, n_actions),
            nn.Softmax(dim=-1)
        )
        
        # Critic head: outputs state value
        self.critic = nn.Sequential(
            nn.Linear(hidden_size, hidden_size),
            nn.ReLU(),
            nn.Linear(hidden_size, 1)
        )
    
    def forward(self, x):
        features = self.shared(x)
        policy = self.actor(features)
        value = self.critic(features)
        return policy, value
    
    def act(self, state):
        policy, value = self.forward(state)
        dist = Categorical(policy)
        action = dist.sample()
        return action.item(), dist.log_prob(action), value

In [None]:
class A2CAgent:
    """Advantage Actor-Critic Agent"""
    
    def __init__(self, n_states, n_actions, hidden_size=64, lr=1e-3, gamma=0.99):
        self.gamma = gamma
        self.network = ActorCritic(n_states, n_actions, hidden_size).to(device)
        self.optimizer = optim.Adam(self.network.parameters(), lr=lr)
    
    def act(self, state):
        state = state.unsqueeze(0).to(device)
        action, log_prob, value = self.network.act(state)
        return action, log_prob, value.squeeze()
    
    def learn(self, log_prob, value, reward, next_value, done):
        # TD error (advantage estimate)
        target = reward + (0 if done else self.gamma * next_value.item())
        advantage = target - value.item()
        
        # Actor loss (policy gradient with advantage)
        actor_loss = -log_prob * advantage
        
        # Critic loss (value function error)
        # 0-dim float32 target so it matches `value` in shape and dtype
        critic_loss = F.mse_loss(value, torch.tensor(target, dtype=torch.float32, device=device))
        
        # Combined loss
        loss = actor_loss + 0.5 * critic_loss
        
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()
        
        return loss.item()

In [None]:
def train_a2c(env, agent, n_episodes=1000):
    """Train A2C agent (online, step-by-step)"""
    rewards_history = []
    
    for episode in range(n_episodes):
        state = env.reset()
        state_tensor = env.get_state_tensor(state)
        total_reward = 0
        
        for _ in range(100):
            action, log_prob, value = agent.act(state_tensor)
            next_state, reward, done = env.step(action)
            next_state_tensor = env.get_state_tensor(next_state)
            
            # Get next value for TD target
            with torch.no_grad():
                _, next_value = agent.network(next_state_tensor.unsqueeze(0).to(device))
                next_value = next_value.squeeze()
            
            # Learn from this step
            agent.learn(log_prob, value, reward, next_value, done)
            
            total_reward += reward
            state_tensor = next_state_tensor
            
            if done:
                break
        
        rewards_history.append(total_reward)
        
        if (episode + 1) % 100 == 0:
            avg = np.mean(rewards_history[-100:])
            print(f"Episode {episode+1}: avg_reward={avg:.2f}")
    
    return rewards_history

# Train A2C
env = GridWorld()
a2c_agent = A2CAgent(env.n_states, env.n_actions, hidden_size=64, lr=1e-3)
a2c_rewards = train_a2c(env, a2c_agent, n_episodes=1000)

print(f"\nFinal average reward: {np.mean(a2c_rewards[-100:]):.2f}")

In [None]:
# Compare REINFORCE vs A2C
plt.figure(figsize=(10, 4))
window = 50

reinforce_smooth = np.convolve(reinforce_rewards, np.ones(window)/window, mode='valid')
a2c_smooth = np.convolve(a2c_rewards, np.ones(window)/window, mode='valid')

plt.plot(reinforce_smooth, label='REINFORCE', alpha=0.8)
plt.plot(a2c_smooth, label='A2C', alpha=0.8)
plt.xlabel('Episode')
plt.ylabel('Total Reward')
plt.title('REINFORCE vs Actor-Critic')
plt.legend()
plt.grid(True, alpha=0.3)
plt.show()

print("A2C typically learns faster due to lower variance!")

---
<a id='section9'></a>
# 9. Proximal Policy Optimization (PPO)

## The Problem with Vanilla Policy Gradients

Large policy updates can **destroy** learning:
- If we update too much, the policy can become terrible
- Then we collect bad data, which makes learning worse
- This creates a death spiral!

## PPO's Solution: Constrained Updates

**Key Idea:** Keep the new policy "close" to the old policy.

**How?** Use a clipped objective that prevents too-large updates.

**The PPO Clipped Objective:**

$L^{CLIP}(\theta) = \mathbb{E}\left[\min\left(r_t(\theta) A_t, \text{clip}(r_t(\theta), 1-\epsilon, 1+\epsilon) A_t\right)\right]$

Where:
- $r_t(\theta) = \frac{\pi_\theta(a_t|s_t)}{\pi_{\theta_{old}}(a_t|s_t)}$ is the probability ratio
- $\epsilon$ is typically 0.2, so the ratio is clipped to $[0.8, 1.2]$
- $A_t$ is the advantage

**In Plain English:**
- If the new policy is too different from the old one, clip the objective
- This prevents the policy from changing too drastically
- Result: stable, reliable learning!
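
The clipped objective is short enough to evaluate by hand. The sketch below applies it to single samples with hypothetical ratios and advantages; `ppo_clip_objective` is an illustrative helper, not the agent's API.

```python
def ppo_clip_objective(ratio, advantage, eps=0.2):
    """Per-sample clipped surrogate: min(r * A, clip(r, 1-eps, 1+eps) * A)."""
    clipped_ratio = max(1.0 - eps, min(ratio, 1.0 + eps))
    return min(ratio * advantage, clipped_ratio * advantage)

for ratio, adv in [(1.5, 1.0), (0.5, 1.0), (1.5, -1.0), (0.5, -1.0)]:
    print(f"ratio={ratio}, A={adv:+.0f} -> objective={ppo_clip_objective(ratio, adv):+.2f}")
```

Clipping only kicks in when the policy has already moved in the direction the advantage favors (ratio above 1.2 with A > 0, or below 0.8 with A < 0). In the other two cases the `min` keeps the unclipped, more pessimistic value, so mistakes are still fully penalized.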

## Why PPO is Popular

1. **Simple:** Easy to implement (compared to TRPO)
2. **Stable:** Doesn't blow up during training
3. **Effective:** Works well on many tasks
4. **Scalable:** Used by OpenAI for ChatGPT RLHF!

In [None]:
class PPOMemory:
    """Storage for PPO trajectories"""
    
    def __init__(self):
        self.states = []
        self.actions = []
        self.rewards = []
        self.dones = []
        self.log_probs = []
        self.values = []
    
    def store(self, state, action, reward, done, log_prob, value):
        self.states.append(state)
        self.actions.append(action)
        self.rewards.append(reward)
        self.dones.append(done)
        self.log_probs.append(log_prob)
        self.values.append(value)
    
    def clear(self):
        self.states = []
        self.actions = []
        self.rewards = []
        self.dones = []
        self.log_probs = []
        self.values = []
    
    def get_batch(self):
        return (
            torch.stack(self.states),
            torch.tensor(self.actions),
            torch.tensor(self.rewards, dtype=torch.float32),
            torch.tensor(self.dones, dtype=torch.float32),
            torch.stack(self.log_probs),
            torch.stack(self.values).reshape(-1)  # always 1-D, even for a single stored step
        )

In [None]:
class PPOAgent:
    """
    Proximal Policy Optimization Agent.
    
    Key features:
    1. Clipped objective for stable updates
    2. Multiple epochs of updates per batch
    3. GAE for advantage estimation
    """
    
    def __init__(self, n_states, n_actions, hidden_size=64, lr=3e-4, 
                 gamma=0.99, gae_lambda=0.95, clip_epsilon=0.2, 
                 value_coef=0.5, entropy_coef=0.01):
        self.gamma = gamma
        self.gae_lambda = gae_lambda
        self.clip_epsilon = clip_epsilon
        self.value_coef = value_coef
        self.entropy_coef = entropy_coef
        
        self.network = ActorCritic(n_states, n_actions, hidden_size).to(device)
        self.optimizer = optim.Adam(self.network.parameters(), lr=lr)
        self.memory = PPOMemory()
    
    def act(self, state):
        state = state.unsqueeze(0).to(device)
        with torch.no_grad():
            policy, value = self.network(state)
        
        dist = Categorical(policy)
        action = dist.sample()
        log_prob = dist.log_prob(action)
        
        return action.item(), log_prob, value.squeeze()
    
    def store(self, state, action, reward, done, log_prob, value):
        self.memory.store(state, action, reward, done, log_prob, value)
    
    def compute_gae(self, rewards, values, dones, next_value):
        """Compute Generalized Advantage Estimation"""
        advantages = []
        gae = 0
        
        values = torch.cat([values, next_value.unsqueeze(0)])
        
        for t in reversed(range(len(rewards))):
            delta = rewards[t] + self.gamma * values[t+1] * (1 - dones[t]) - values[t]
            gae = delta + self.gamma * self.gae_lambda * (1 - dones[t]) * gae
            advantages.insert(0, gae)
        
        return torch.tensor(advantages, dtype=torch.float32)
    
    def learn(self, next_value, n_epochs=4):
        states, actions, rewards, dones, old_log_probs, values = self.memory.get_batch()
        
        # Compute advantages
        advantages = self.compute_gae(rewards, values, dones, next_value)
        returns = advantages + values
        
        # Normalize advantages
        advantages = (advantages - advantages.mean()) / (advantages.std() + 1e-8)
        
        # Move to device
        states = states.to(device)
        actions = actions.to(device)
        old_log_probs = old_log_probs.to(device)
        advantages = advantages.to(device)
        returns = returns.to(device)
        
        # PPO update for multiple epochs
        for _ in range(n_epochs):
            # Evaluate the current policy and critic on the stored states
            policy, new_values = self.network(states)
            dist = Categorical(policy)
            new_log_probs = dist.log_prob(actions)
            entropy = dist.entropy().mean()
            
            # Probability ratio pi_new(a|s) / pi_old(a|s), computed in log space
            ratio = torch.exp(new_log_probs - old_log_probs.squeeze())
            
            # Clipped surrogate objective
            surr1 = ratio * advantages
            surr2 = torch.clamp(ratio, 1 - self.clip_epsilon, 1 + self.clip_epsilon) * advantages
            actor_loss = -torch.min(surr1, surr2).mean()
            
            # Value loss against the GAE returns (new name avoids shadowing the rollout `values`)
            critic_loss = F.mse_loss(new_values.squeeze(), returns)
            
            # Total loss
            loss = actor_loss + self.value_coef * critic_loss - self.entropy_coef * entropy
            
            self.optimizer.zero_grad()
            loss.backward()
            torch.nn.utils.clip_grad_norm_(self.network.parameters(), 0.5)
            self.optimizer.step()
        
        self.memory.clear()
        return loss.item()
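
To see what the clipped objective in `learn` does, here is a small standalone sketch with hand-picked numbers. The ratios and advantages below are made up for illustration and do not come from a training run. With `clip_epsilon = 0.2`, the ratio is clamped to `[0.8, 1.2]`, and the objective keeps the smaller of the clipped and unclipped terms.

```python
import torch

clip_epsilon = 0.2  # same default as PPOAgent

# Hypothetical probability ratios pi_new(a|s) / pi_old(a|s) and advantages
ratio = torch.tensor([1.5, 1.5, 0.5, 0.5, 1.0])
advantages = torch.tensor([1.0, -1.0, 1.0, -1.0, 2.0])

surr1 = ratio * advantages
surr2 = torch.clamp(ratio, 1 - clip_epsilon, 1 + clip_epsilon) * advantages
objective = torch.min(surr1, surr2)

for r, a, o in zip(ratio.tolist(), advantages.tolist(), objective.tolist()):
    print(f"ratio={r:.1f}  advantage={a:+.1f}  objective={o:+.2f}")
# ratio=1.5  advantage=+1.0  objective=+1.20   (clipped: no extra credit past 1.2)
# ratio=1.5  advantage=-1.0  objective=-1.50   (not clipped: full penalty)
# ratio=0.5  advantage=+1.0  objective=+0.50   (not clipped: full penalty)
# ratio=0.5  advantage=-1.0  objective=-0.80   (clipped: no extra credit below 0.8)
# ratio=1.0  advantage=+2.0  objective=+2.00   (policy unchanged)
```

The clip only removes the incentive to push the ratio further in the direction the advantage already favors. Moves in the unfavorable direction keep their full penalty, which is why the objective uses `min` rather than the clipped term alone.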

In [None]:
def train_ppo(env, agent, n_episodes=1000, update_freq=20):
    """Train PPO agent"""
    rewards_history = []
    steps = 0
    
    for episode in range(n_episodes):
        state = env.reset()
        state_tensor = env.get_state_tensor(state)
        total_reward = 0
        
        for _ in range(100):
            action, log_prob, value = agent.act(state_tensor)
            next_state, reward, done = env.step(action)
            next_state_tensor = env.get_state_tensor(next_state)
            
            agent.store(state_tensor, action, reward, done, log_prob, value)
            steps += 1
            
            total_reward += reward
            state_tensor = next_state_tensor
            
            # Update every update_freq steps
            if steps % update_freq == 0:
                with torch.no_grad():
                    _, next_value = agent.network(next_state_tensor.unsqueeze(0).to(device))
                agent.learn(next_value.squeeze())
            
            if done:
                break
        
        rewards_history.append(total_reward)
        
        if (episode + 1) % 100 == 0:
            avg = np.mean(rewards_history[-100:])
            print(f"Episode {episode+1}: avg_reward={avg:.2f}")
    
    return rewards_history

# Train PPO
env = GridWorld()
ppo_agent = PPOAgent(env.n_states, env.n_actions, hidden_size=64, lr=3e-4)
ppo_rewards = train_ppo(env, ppo_agent, n_episodes=1000)

print(f"\nFinal average reward: {np.mean(ppo_rewards[-100:]):.2f}")
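
The `next_value` passed to `agent.learn` is the critic's estimate for the state that follows the last stored step, and GAE uses it to bootstrap the tail of the batch unless that step ended the episode. The sketch below reproduces the same backward recursion as `compute_gae` on plain Python floats. The helper name `gae_advantages` and the three-step rollout are made up for illustration, and `gamma = lam = 1` is chosen only to keep the arithmetic easy to check by hand.

```python
def gae_advantages(rewards, values, dones, next_value, gamma=0.99, lam=0.95):
    """Same backward recursion as PPOAgent.compute_gae, on plain Python floats."""
    values = list(values) + [next_value]  # bootstrap value for the last step
    advantages = []
    gae = 0.0
    for t in reversed(range(len(rewards))):
        not_done = 1.0 - dones[t]
        delta = rewards[t] + gamma * values[t + 1] * not_done - values[t]
        gae = delta + gamma * lam * not_done * gae
        advantages.append(gae)
    advantages.reverse()
    return advantages

# Made-up 3-step rollout
rewards = [1.0, 1.0, 1.0]
values = [0.5, 0.5, 0.5]

# Batch cut mid-episode: the last step bootstraps from next_value
adv_bootstrap = gae_advantages(rewards, values, [0.0, 0.0, 0.0],
                               next_value=2.0, gamma=1.0, lam=1.0)
# Episode ended on the last step: next_value is masked out
adv_terminal = gae_advantages(rewards, values, [0.0, 0.0, 1.0],
                              next_value=2.0, gamma=1.0, lam=1.0)

print(adv_bootstrap)  # [4.5, 3.5, 2.5]
print(adv_terminal)   # [2.5, 1.5, 0.5]
```

With `gamma = lam = 1`, each advantage is simply "remaining rewards plus bootstrap value, minus the critic's estimate", so the two results differ by exactly the bootstrap value of 2.0.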

In [None]:
# Compare all policy gradient methods
plt.figure(figsize=(12, 5))
window = 50

methods = [
    ('REINFORCE', reinforce_rewards),
    ('A2C', a2c_rewards),
    ('PPO', ppo_rewards)
]

for name, rewards in methods:
    smoothed = np.convolve(rewards, np.ones(window)/window, mode='valid')
    plt.plot(smoothed, label=name, alpha=0.8)

plt.xlabel('Episode')
plt.ylabel('Total Reward')
plt.title('Policy Gradient Methods Comparison')
plt.legend()
plt.grid(True, alpha=0.3)
plt.show()
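
A note on reading the smoothed curves: `np.convolve(..., mode='valid')` only returns positions where the window fully overlaps the data, so each curve has `len(rewards) - window + 1` points, and point `i` on the x-axis is the average of episodes `i` through `i + window - 1`. A minimal sketch with made-up rewards and a small window:

```python
import numpy as np

rewards = np.array([1.0, 2.0, 3.0, 4.0, 5.0, 6.0])  # made-up episode rewards
window = 3

# Moving average over every full window of 3 consecutive episodes
smoothed = np.convolve(rewards, np.ones(window) / window, mode='valid')
print(smoothed)       # approximately [2. 3. 4. 5.]
print(len(smoothed))  # 4, i.e. len(rewards) - window + 1

# Optional: x-values that place each average at the last episode of its window
episodes = np.arange(window - 1, len(rewards))
print(episodes)       # [2 3 4 5]
```

Passing `episodes` as the x-argument to `plt.plot` is one way to line the smoothed curve up with the raw episode numbers.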

---
<a id='section10'></a>
# 10. Summary & Next Steps

## What We Covered

| Method | Type | Key Idea | PyTorch Component |
|--------|------|----------|------------------|
| **Q-Learning** | Value-based | Learn Q(s,a), act greedily | `torch.zeros` for Q-table |
| **DQN** | Deep Value-based | Neural network Q-function | `nn.Module`, replay buffer |
| **Double DQN** | Deep Value-based | Reduce overestimation | Two networks |
| **Dueling DQN** | Deep Value-based | Separate V(s) and A(s,a) | Split architecture |
| **REINFORCE** | Policy Gradient | Direct policy optimization | `Categorical` distribution |
| **Actor-Critic** | Hybrid | Policy + value function | Shared network |
| **PPO** | Policy Gradient | Clipped objective | Ratio clipping |

## Key PyTorch Patterns for RL

```python
# 1. Neural Network Definition
class Network(nn.Module):
    def __init__(self):
        super().__init__()
        self.layers = nn.Sequential(...)
    
    def forward(self, x):
        return self.layers(x)

# 2. Training Loop
optimizer = optim.Adam(network.parameters(), lr=1e-3)
loss = F.mse_loss(predicted, target)
optimizer.zero_grad()
loss.backward()
optimizer.step()

# 3. Action Sampling (Policy Gradients)
dist = Categorical(policy_probs)
action = dist.sample()
log_prob = dist.log_prob(action)
```
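
As a concrete, runnable instance of the three patterns above, here is a minimal sketch. The layer sizes, the random batch, the constant returns, and the names `TinyPolicy`, `obs_dim`, and `n_actions` are placeholders chosen for illustration; they are not the networks used earlier in this notebook.

```python
import torch
import torch.nn as nn
import torch.optim as optim
from torch.distributions import Categorical

torch.manual_seed(0)
obs_dim, n_actions = 4, 2  # placeholder sizes

# 1. Network definition: maps observations to action probabilities
class TinyPolicy(nn.Module):
    def __init__(self, obs_dim, n_actions, hidden_size=16):
        super().__init__()
        self.layers = nn.Sequential(
            nn.Linear(obs_dim, hidden_size),
            nn.ReLU(),
            nn.Linear(hidden_size, n_actions),
            nn.Softmax(dim=-1),
        )

    def forward(self, x):
        return self.layers(x)

policy = TinyPolicy(obs_dim, n_actions)
optimizer = optim.Adam(policy.parameters(), lr=1e-3)

# 3. Action sampling on a random batch of 8 observations
obs = torch.randn(8, obs_dim)
dist = Categorical(policy(obs))
actions = dist.sample()
log_probs = dist.log_prob(actions)

# 2. One optimization step, here with a REINFORCE-style loss and made-up returns
returns = torch.ones(8)
loss = -(log_probs * returns).mean()
optimizer.zero_grad()
loss.backward()
optimizer.step()

print(actions.shape, log_probs.shape, loss.item())
```

The sampling step comes before the update here because a policy-gradient loss needs the log-probabilities of the sampled actions; for value-based methods the loss would instead compare predicted and target Q-values, as in pattern 2.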

## Next Steps

1. **Try Gymnasium environments**: `pip install gymnasium`
2. **Continuous actions**: Use Gaussian policies (e.g., SAC) or deterministic policies with exploration noise (e.g., TD3)
3. **Advanced algorithms**: Rainbow DQN, SAC, TD3
4. **Multi-agent RL**: MARL, self-play
5. **Model-based RL**: World models, MuZero
6. **RLHF**: Reinforcement Learning from Human Feedback (used in ChatGPT!)
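
One practical detail when moving to Gymnasium: its interface differs from the one `train_ppo` expects, where `env.reset()` returns a state and `env.step(action)` returns `(next_state, reward, done)`. In Gymnasium, `reset()` returns `(observation, info)` and `step()` returns `(observation, reward, terminated, truncated, info)`. A thin adapter is one way to bridge the two. The sketch below uses a stub environment so that it runs without installing anything; `StubGymnasiumEnv` and `NotebookStyleEnv` are hypothetical names, not part of any library.

```python
class StubGymnasiumEnv:
    """Stand-in that mimics the Gymnasium reset/step signatures (hypothetical)."""
    def __init__(self, episode_length=3):
        self.episode_length = episode_length
        self.t = 0

    def reset(self, seed=None):
        self.t = 0
        return 0, {}  # (observation, info)

    def step(self, action):
        self.t += 1
        terminated = self.t >= self.episode_length
        truncated = False
        return self.t, 1.0, terminated, truncated, {}


class NotebookStyleEnv:
    """Adapter exposing reset() -> state and step() -> (state, reward, done)."""
    def __init__(self, env):
        self.env = env

    def reset(self):
        obs, _info = self.env.reset()
        return obs

    def step(self, action):
        obs, reward, terminated, truncated, _info = self.env.step(action)
        # Collapsing both flags into `done` is a simplification: a truncated
        # episode did not reach a terminal state, so bootstrapping differs.
        return obs, reward, terminated or truncated


# With Gymnasium installed, the stub could be swapped for
# gymnasium.make("CartPole-v1") (assumption: that environment is available).
env = NotebookStyleEnv(StubGymnasiumEnv())
state = env.reset()
total_reward, done = 0.0, False
while not done:
    state, reward, done = env.step(0)
    total_reward += reward
print(state, total_reward)  # 3 3.0
```

Note that `train_ppo` also calls `env.get_state_tensor` and reads `env.n_states` and `env.n_actions`, so an adapter for real use would need equivalents of those as well.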

In [None]:
print("="*60)
print("🎓 Reinforcement Learning with PyTorch: Complete!")
print("="*60)
print("\nAlgorithms implemented:")
print("  ✓ Multi-Armed Bandits (ε-greedy, UCB)")
print("  ✓ Tabular Q-Learning")
print("  ✓ SARSA")
print("  ✓ Deep Q-Network (DQN)")
print("  ✓ Double DQN")
print("  ✓ Dueling DQN")
print("  ✓ REINFORCE")
print("  ✓ Actor-Critic (A2C)")
print("  ✓ Proximal Policy Optimization (PPO)")
print("\n🔥 All using PyTorch!")
print("\n📚 Key concepts covered:")
print("  • Exploration vs Exploitation")
print("  • Markov Decision Processes")
print("  • Value Functions & Bellman Equations")
print("  • Temporal Difference Learning")
print("  • Experience Replay & Target Networks")
print("  • Policy Gradients & Advantage Functions")