# Chapter 18: Reinforcement Learning
## Comprehensive Theory, Implementation, and Analysis

**Based on "Hands-On Machine Learning" by Aurélien Géron**

---

### Table of Contents
1. [Introduction to Reinforcement Learning](#introduction)
2. [Policy Search](#policy-search)
3. [OpenAI Gym Environment](#openai-gym)
4. [Neural Network Policies](#neural-policies)
5. [Credit Assignment Problem](#credit-assignment)
6. [Policy Gradients (REINFORCE)](#policy-gradients)
7. [Markov Decision Processes](#mdp)
8. [Temporal Difference Learning](#td-learning)
9. [Q-Learning](#q-learning)
10. [Deep Q-Learning (DQN)](#deep-q-learning)
11. [DQN Variants](#dqn-variants)
12. [TF-Agents Library](#tf-agents)
13. [Exercise Solutions](#exercises)

---

## Setup and Installation

First, let's install all necessary dependencies for our Reinforcement Learning experiments.

In [None]:
# Install required packages
!pip install --upgrade pip
!pip install tensorflow==2.13.0
!pip install gym==0.21.0
!pip install 'gym[atari]'
!pip install tf-agents
!pip install matplotlib
!pip install numpy
!pip install pandas
!pip install imageio
!pip install pillow

# For rendering environments in Colab
!apt-get install -y xvfb python3-opengl > /dev/null 2>&1
!pip install pyvirtualdisplay > /dev/null 2>&1

In [None]:
# Import necessary libraries
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import tensorflow as tf
from tensorflow import keras
import gym
from collections import deque
import random
import warnings
warnings.filterwarnings('ignore')

# For rendering in Colab
from pyvirtualdisplay import Display
import matplotlib.animation as animation
from IPython.display import HTML

# Set up virtual display for rendering
display = Display(visible=0, size=(1400, 900))
display.start()

# Set random seeds for reproducibility
np.random.seed(42)
tf.random.set_seed(42)
random.seed(42)

print("TensorFlow version:", tf.__version__)
print("Gym version:", gym.__version__)
print("GPU Available:", tf.config.list_physical_devices('GPU'))

# 1. Introduction to Reinforcement Learning

## Theoretical Foundation

**Reinforcement Learning (RL)** is a paradigm of machine learning where an agent learns to make decisions by interacting with an environment. Unlike supervised learning, where we have labeled examples, or unsupervised learning, where we find patterns in unlabeled data, RL learns through trial and error, receiving rewards or penalties for actions.

### Key Components of RL:

1. **Agent**: The learner or decision maker
2. **Environment**: Everything the agent interacts with
3. **State (s)**: Current situation of the agent
4. **Action (a)**: Choice made by the agent
5. **Reward (r)**: Feedback from the environment
6. **Policy (π)**: Strategy used by the agent to determine actions

### Mathematical Framework:

At each time step t:
- Agent observes state $s_t$
- Takes action $a_t$ according to policy $π(a_t|s_t)$
- Receives reward $r_{t+1}$ and new state $s_{t+1}$

**Objective**: Maximize the expected value of the discounted return $G_t$:
$$G_t = \sum_{k=0}^{\infty} \gamma^k r_{t+k+1}$$

Where $\gamma \in [0,1]$ is the discount factor.
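
As a quick numeric check of the return formula, the sketch below evaluates $G_t$ for a short, made-up reward sequence at three discount factors. The helper name `discounted_return` and the reward values are illustrative assumptions, not part of the original notebook.

```python
def discounted_return(rewards, gamma):
    """Return G_t given the rewards [r_{t+1}, r_{t+2}, ...] that follow step t."""
    return sum(gamma ** k * r for k, r in enumerate(rewards))

rewards = [1.0, 1.0, 1.0]  # hypothetical rewards for three consecutive steps

g_myopic = discounted_return(rewards, gamma=0.0)      # 1.0: only the first reward counts
g_half = discounted_return(rewards, gamma=0.5)        # 1 + 0.5 + 0.25 = 1.75
g_undiscounted = discounted_return(rewards, gamma=1.0)  # 3.0: plain sum

print(g_myopic, g_half, g_undiscounted)
```

A smaller $\gamma$ shrinks the contribution of later rewards geometrically, which is why the same reward sequence yields very different returns.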

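### Types of RL Problems:

Problems are usually classified along two independent axes: task structure (items 1-2) and action space (items 3-4).

1. **Episodic**: Tasks with a clear beginning and end (e.g., games)
2. **Continuing**: Ongoing tasks without natural termination
3. **Discrete**: Finite action spaces
4. **Continuous**: Real-valued action spaces with infinitely many possible actions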

In [None]:
# Demonstration: Simple RL Environment
class SimpleEnvironment:
    """
    A simple grid world environment for demonstration.
    Agent starts at (0,0) and tries to reach goal at (2,2).
    """
    def __init__(self):
        self.grid_size = 3
        self.reset()

    def reset(self):
        """Reset environment to initial state"""
        self.agent_pos = [0, 0]
        self.goal_pos = [2, 2]
        return self.get_state()

    def get_state(self):
        """Return current state as tuple"""
        return tuple(self.agent_pos)

    def step(self, action):
        """
        Take action and return (new_state, reward, done)
        Actions: 0=up, 1=right, 2=down, 3=left
        """
        # Define action effects
        actions = [[-1, 0], [0, 1], [1, 0], [0, -1]]  # up, right, down, left

        # Apply action
        new_pos = [
            self.agent_pos[0] + actions[action][0],
            self.agent_pos[1] + actions[action][1]
        ]

        # Check boundaries
        new_pos[0] = max(0, min(self.grid_size - 1, new_pos[0]))
        new_pos[1] = max(0, min(self.grid_size - 1, new_pos[1]))

        self.agent_pos = new_pos

        # Calculate reward
        if self.agent_pos == self.goal_pos:
            reward = 10  # Goal reached
            done = True
        else:
            reward = -1  # Step penalty
            done = False

        return self.get_state(), reward, done

    def render(self):
        """Visualize current state"""
        grid = np.zeros((self.grid_size, self.grid_size))
        grid[self.agent_pos[0], self.agent_pos[1]] = 1  # Agent
        grid[self.goal_pos[0], self.goal_pos[1]] = 2   # Goal
        return grid

# Test the environment
env = SimpleEnvironment()
print("Initial state:", env.get_state())
print("Grid visualization:")
print(env.render())
print("\nTaking action 1 (right):")
state, reward, done = env.step(1)
print(f"New state: {state}, Reward: {reward}, Done: {done}")
print("Grid visualization:")
print(env.render())

# 2. Policy Search

## Theoretical Foundation

**Policy Search** is a family of RL algorithms that optimize the policy directly, without explicitly computing value functions. The policy $π(a|s; θ)$ is defined by a parameter vector $θ$, and learning means searching for the $θ$ that yields the highest return.

### Types of Policies:

1. **Deterministic Policy**: $a = π(s)$
2. **Stochastic Policy**: $π(a|s) = P(a|s)$

### Policy Search Methods:

1. **Brute Force**: Try different parameter combinations
2. **Genetic Algorithms**: Evolve population of policies
3. **Policy Gradients**: Use gradient ascent to optimize policy

### Mathematical Formulation:

**Objective Function**: Expected return
$$J(θ) = E_{τ∼π_θ}[G(τ)]$$

Where $τ$ is a trajectory and $G(τ)$ is its return.

**Policy Gradient Theorem**:
$$∇_θ J(θ) = E_{τ∼π_θ}\left[\sum_{t} ∇_θ \log π_θ(a_t|s_t)\, G_t\right]$$

This allows us to estimate gradients using sample trajectories.
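
To make the sampling idea concrete, here is a minimal sketch for a one-step problem (a two-armed bandit) with a softmax policy. The function names, the reward values, and the sample count are assumptions chosen for illustration. In this setting the exact gradient is $π_i (R_i - J)$, which gives a value to compare the estimate against.

```python
import numpy as np

def softmax(logits):
    """Numerically stable softmax."""
    shifted = logits - np.max(logits)
    exp = np.exp(shifted)
    return exp / exp.sum()

def estimate_policy_gradient(theta, action_rewards, n_samples, rng):
    """Monte Carlo estimate of grad J(theta) for a one-step softmax policy."""
    probs = softmax(theta)
    grad = np.zeros_like(theta)
    for _ in range(n_samples):
        action = rng.choice(len(theta), p=probs)   # sample a ~ pi_theta
        G = action_rewards[action]                 # return of this one-step trajectory
        grad_log_pi = -probs.copy()                # grad of log softmax = one_hot(a) - probs
        grad_log_pi[action] += 1.0
        grad += grad_log_pi * G
    return grad / n_samples

rng = np.random.default_rng(0)
theta = np.zeros(2)                     # uniform initial policy
action_rewards = np.array([0.0, 1.0])   # hypothetical: action 1 is the better arm

estimated = estimate_policy_gradient(theta, action_rewards, 5000, rng)
probs = softmax(theta)
exact = probs * (action_rewards - probs @ action_rewards)  # [-0.25, 0.25]

print("estimated:", estimated)
print("exact:    ", exact)
```

The estimate points toward raising the logit of the better action, and it approaches the exact gradient as the number of sampled trajectories grows.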

In [None]:
# Demonstration: Simple Policy Search
class RandomPolicy:
    """
    A simple random policy for the grid world.
    """
    def __init__(self, action_space=4):
        self.action_space = action_space

    def get_action(self, state):
        """Return random action"""
        return np.random.randint(self.action_space)

class ParameterizedPolicy:
    """
    A simple parameterized policy using action probabilities.
    """
    def __init__(self, action_space=4):
        self.action_space = action_space
        # Initialize parameters (action probabilities)
        self.params = np.ones(action_space) / action_space

    def get_action(self, state):
        """Sample action based on current parameters"""
        return np.random.choice(self.action_space, p=self.params)

    def update_params(self, direction, step_size=0.1):
        """Update parameters in given direction"""
        self.params += step_size * direction
        # Keep parameters non-negative, then renormalize so they sum to 1
        self.params = np.abs(self.params)
        self.params /= np.sum(self.params)

def evaluate_policy(policy, env, num_episodes=100):
    """
    Evaluate policy performance over multiple episodes.
    """
    total_rewards = []

    for episode in range(num_episodes):
        state = env.reset()
        episode_reward = 0
        steps = 0

        while steps < 50:  # Max steps to prevent infinite loops
            action = policy.get_action(state)
            state, reward, done = env.step(action)
            episode_reward += reward
            steps += 1

            if done:
                break

        total_rewards.append(episode_reward)

    return np.mean(total_rewards), np.std(total_rewards)

# Compare random vs parameterized policy
env = SimpleEnvironment()

# Random policy
random_policy = RandomPolicy()
random_mean, random_std = evaluate_policy(random_policy, env)

# Parameterized policy (initially random)
param_policy = ParameterizedPolicy()
param_mean, param_std = evaluate_policy(param_policy, env)

print(f"Random Policy - Mean Reward: {random_mean:.2f} ± {random_std:.2f}")
print(f"Parameterized Policy - Mean Reward: {param_mean:.2f} ± {param_std:.2f}")
print(f"\nInitial policy parameters: {param_policy.params}")

# 3. OpenAI Gym Environment

## Theoretical Foundation

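**OpenAI Gym** provides a standardized interface for RL environments. The examples in this notebook use the classic Gym API (as in the pinned `gym==0.21.0`), where `reset()` returns only the observation and `step()` returns a 4-tuple.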

### Key Components:

1. **Observation Space**: Defines the format of observations
2. **Action Space**: Defines available actions
3. **Reward Function**: Defines the reward structure
4. **Episode Termination**: Defines when episodes end

### Standard Interface:

```python
env = gym.make('EnvName-v0')
observation = env.reset()
for t in range(1000):
    action = env.action_space.sample()  # Random action
    observation, reward, done, info = env.step(action)
    if done:
        break
```

### CartPole Environment:

**State Space**: 4-dimensional continuous
- Position of cart: $x \in (-4.8, 4.8)$
- Velocity of cart: $\dot{x} \in (-∞, ∞)$
- Angle of pole: $θ \in (-24°, 24°)$
- Angular velocity: $\dot{θ} \in (-∞, ∞)$

**Action Space**: Discrete
- 0: Push cart to the left
- 1: Push cart to the right

**Reward**: +1 for every step the pole remains upright

**Termination**: Pole angle beyond ±12°, cart position beyond ±2.4, or (for `CartPole-v1`) the episode reaching 500 steps
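
Given the observation layout and the two actions above, a simple hand-written baseline is to push the cart toward the side the pole is leaning to. The sketch below is a heuristic for comparison only, not a learned policy. The name `basic_policy` is an assumption of this example, and the function takes any 4-element observation so it can be tried without creating an environment.

```python
def basic_policy(obs):
    """Push left (0) if the pole leans left, otherwise push right (1).

    obs follows the CartPole layout:
    [cart position, cart velocity, pole angle, pole angular velocity].
    """
    pole_angle = obs[2]
    return 0 if pole_angle < 0 else 1

# Hand-made observations (not sampled from the environment)
leaning_left = [0.0, 0.0, -0.05, 0.0]
leaning_right = [0.0, 0.0, 0.05, 0.0]

print(basic_policy(leaning_left))   # 0
print(basic_policy(leaning_right))  # 1
```

A rule like this ignores velocities, so it tends to overcorrect. It is useful mainly as a reference point between a random policy and a trained one.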

In [None]:
# Comprehensive OpenAI Gym demonstration
import gym

# Create CartPole environment
env = gym.make('CartPole-v1')

print("Environment: CartPole-v1")
print(f"Observation space: {env.observation_space}")
print(f"Action space: {env.action_space}")
print(f"Action space sample: {env.action_space.sample()}")

# Analyze observation space
obs = env.reset()
print(f"\nInitial observation: {obs}")
print("Observation components:")
print(f"  Cart Position: {obs[0]:.4f}")
print(f"  Cart Velocity: {obs[1]:.4f}")
print(f"  Pole Angle: {obs[2]:.4f} rad ({np.degrees(obs[2]):.2f}°)")
print(f"  Pole Angular Velocity: {obs[3]:.4f}")

# Run random policy
def run_random_episode(env, render=False):
    """
    Run one episode with random actions.
    """
    obs = env.reset()
    total_reward = 0
    steps = 0
    observations = [obs]

    while True:
        if render:
            env.render()

        action = env.action_space.sample()
        obs, reward, done, info = env.step(action)
        observations.append(obs)
        total_reward += reward
        steps += 1

        if done:
            break

    if render:
        env.close()

    return total_reward, steps, observations

# Run multiple random episodes and analyze
random_scores = []
random_steps = []

for i in range(100):
    score, steps, _ = run_random_episode(env)
    random_scores.append(score)
    random_steps.append(steps)

print(f"\nRandom Policy Performance (100 episodes):")
print(f"Mean Score: {np.mean(random_scores):.2f} ± {np.std(random_scores):.2f}")
print(f"Max Score: {np.max(random_scores)}")
print(f"Min Score: {np.min(random_scores)}")
print(f"Mean Steps: {np.mean(random_steps):.2f} ± {np.std(random_steps):.2f}")

# Visualize score distribution
plt.figure(figsize=(12, 4))

plt.subplot(1, 2, 1)
plt.hist(random_scores, bins=20, alpha=0.7, edgecolor='black')
plt.xlabel('Episode Score')
plt.ylabel('Frequency')
plt.title('Random Policy Score Distribution')
plt.axvline(np.mean(random_scores), color='red', linestyle='--', label=f'Mean: {np.mean(random_scores):.1f}')
plt.legend()

plt.subplot(1, 2, 2)
plt.plot(random_scores[:50], alpha=0.7)
plt.xlabel('Episode')
plt.ylabel('Score')
plt.title('Random Policy Performance Over Time')
plt.grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

env.close()

# 4. Neural Network Policies

## Theoretical Foundation

**Neural Network Policies** use deep learning to approximate the policy function. Instead of simple parameterized policies, we use neural networks to map states to action probabilities.

### Architecture Design:

For **discrete action spaces**:
$$π_θ(a|s) = \frac{\exp(f_θ(s)_a)}{\sum_{a'} \exp(f_θ(s)_{a'})}$$

Where $f_θ(s)$ is the neural network output (logits).

For **continuous action spaces**:
$$π_θ(a|s) = \mathcal{N}(μ_θ(s), σ_θ(s)^2)$$

Where $μ_θ(s)$ and $σ_θ(s)$ are network outputs for the mean and standard deviation.
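
The two output heads can be sketched in plain NumPy, independent of any particular network. The function names and the example values below are illustrative. The sketch treats the spread parameter as a standard deviation and assumes the network emits $\log σ$, so that exponentiating it always yields a positive value. That parameterization is a common choice, not something the text above prescribes.

```python
import numpy as np

def sample_discrete_action(logits, rng):
    """Softmax over logits, then sample an action index."""
    shifted = logits - np.max(logits)   # subtract the max for numerical stability
    probs = np.exp(shifted) / np.sum(np.exp(shifted))
    action = rng.choice(len(probs), p=probs)
    return int(action), probs

def sample_continuous_action(mu, log_sigma, rng):
    """Sample from a Gaussian with mean mu and std exp(log_sigma)."""
    sigma = np.exp(log_sigma)           # always positive
    return mu + sigma * rng.standard_normal(np.shape(mu))

rng = np.random.default_rng(0)

# Hypothetical network outputs for a single state
action, probs = sample_discrete_action(np.array([2.0, 0.0]), rng)
force = sample_continuous_action(np.array([0.5]), np.array([-1.0]), rng)

print("discrete action:", action, "probabilities:", probs)
print("continuous action:", force)
```

In both cases the randomness in the sampled action is what gives the agent its exploration. A deterministic evaluation would take the argmax of `probs` or use `mu` directly.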

### Key Considerations:

1. **Exploration vs Exploitation**: Stochastic policies naturally explore
2. **Network Architecture**: Depends on state representation
3. **Output Layer**: Softmax for discrete, Gaussian for continuous
4. **Training Stability**: Requires careful hyperparameter tuning

In [None]:
# Neural Network Policy Implementation
class NeuralNetworkPolicy:
    """
    A neural network policy for CartPole environment.
    """
    def __init__(self, state_size=4, action_size=2, hidden_size=64):
        self.state_size = state_size
        self.action_size = action_size

        # Build neural network
        self.model = self._build_model(hidden_size)

    def _build_model(self, hidden_size):
        """
        Build the neural network architecture.
        """
        model = keras.Sequential([
            keras.layers.Dense(hidden_size, activation='relu', input_shape=(self.state_size,)),
            keras.layers.Dense(hidden_size, activation='relu'),
            keras.layers.Dense(self.action_size, activation='softmax')  # Probability distribution
        ])
        return model

    def get_action(self, state):
        """
        Get action probabilities and sample an action.
        """
        state = np.array([state])  # Add batch dimension
        action_probs = self.model.predict(state, verbose=0)[0]
        action = np.random.choice(self.action_size, p=action_probs)
        return action, action_probs

    def get_action_deterministic(self, state):
        """
        Get action deterministically (for evaluation).
        """
        state = np.array([state])
        action_probs = self.model.predict(state, verbose=0)[0]
        action = np.argmax(action_probs)
        return action, action_probs

# Create and test neural network policy
env = gym.make('CartPole-v1')
nn_policy = NeuralNetworkPolicy()

print("Neural Network Policy Architecture:")
nn_policy.model.summary()

# Test policy behavior
state = env.reset()
print(f"\nInitial state: {state}")

action, probs = nn_policy.get_action(state)
print(f"Action probabilities: {probs}")
print(f"Selected action: {action}")

# Analyze policy behavior over multiple states
states = []
actions = []
probabilities = []

for _ in range(100):
    state = env.reset()
    action, probs = nn_policy.get_action(state)
    states.append(state)
    actions.append(action)
    probabilities.append(probs)

states = np.array(states)
actions = np.array(actions)
probabilities = np.array(probabilities)

# Visualize policy behavior
plt.figure(figsize=(15, 10))

# Plot action probabilities vs cart position
plt.subplot(2, 3, 1)
plt.scatter(states[:, 0], probabilities[:, 0], alpha=0.6, label='Left (Action 0)')
plt.scatter(states[:, 0], probabilities[:, 1], alpha=0.6, label='Right (Action 1)')
plt.xlabel('Cart Position')
plt.ylabel('Action Probability')
plt.title('Action Probs vs Cart Position')
plt.legend()
plt.grid(True, alpha=0.3)

# Plot action probabilities vs pole angle
plt.subplot(2, 3, 2)
plt.scatter(states[:, 2], probabilities[:, 0], alpha=0.6, label='Left (Action 0)')
plt.scatter(states[:, 2], probabilities[:, 1], alpha=0.6, label='Right (Action 1)')
plt.xlabel('Pole Angle (rad)')
plt.ylabel('Action Probability')
plt.title('Action Probs vs Pole Angle')
plt.legend()
plt.grid(True, alpha=0.3)

# Action distribution
plt.subplot(2, 3, 3)
action_counts = np.bincount(actions, minlength=2)  # minlength keeps both bars even if one action never occurs
plt.bar(['Left (0)', 'Right (1)'], action_counts)
plt.ylabel('Frequency')
plt.title('Action Distribution')

# State distribution
plt.subplot(2, 3, 4)
plt.hist(states[:, 0], bins=20, alpha=0.7, label='Cart Position')
plt.xlabel('Cart Position')
plt.ylabel('Frequency')
plt.title('Cart Position Distribution')

plt.subplot(2, 3, 5)
plt.hist(states[:, 2], bins=20, alpha=0.7, label='Pole Angle')
plt.xlabel('Pole Angle (rad)')
plt.ylabel('Frequency')
plt.title('Pole Angle Distribution')

# Probability distribution analysis
plt.subplot(2, 3, 6)
plt.hist(probabilities[:, 0], bins=20, alpha=0.7, label='P(Left)')
plt.hist(probabilities[:, 1], bins=20, alpha=0.7, label='P(Right)')
plt.xlabel('Probability')
plt.ylabel('Frequency')
plt.title('Action Probability Distribution')
plt.legend()

plt.tight_layout()
plt.show()

print(f"\nMean action probabilities:")
print(f"P(Left): {np.mean(probabilities[:, 0]):.3f}")
print(f"P(Right): {np.mean(probabilities[:, 1]):.3f}")

env.close()

# 5. Evaluating Actions: The Credit Assignment Problem

## Theoretical Foundation

The **Credit Assignment Problem** is fundamental in RL: when an agent receives a reward, which previous actions deserve credit (or blame)?

### Key Challenges:

1. **Temporal Credit Assignment**: Which actions in a sequence caused the outcome?
2. **Delayed Rewards**: Rewards often come long after the relevant actions
3. **Sparse Rewards**: Many environments provide infrequent feedback

### Solutions:

#### 1. Discounted Returns
$$G_t = \sum_{k=0}^{\infty} \gamma^k r_{t+k+1}$$

Where $\gamma \in [0,1]$ is the discount factor:
- $\gamma = 0$: Only immediate rewards matter
- $\gamma = 1$: All future rewards equally important
- $\gamma \in (0,1)$: Balance between immediate and future rewards

#### 2. Advantage Function
$$A(s,a) = Q(s,a) - V(s)$$

Measures how much better action $a$ is than the policy's average behavior in state $s$.

#### 3. Temporal Difference Error
$$\delta_t = r_{t+1} + \gamma V(s_{t+1}) - V(s_t)$$

Measures the one-step prediction error of the value estimate, so learning can happen at every step instead of waiting for the episode to end.
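
A small numeric sketch of the temporal difference error follows. The helper `td_error` and its `done` flag are assumptions of this example. Treating a terminal next state as having zero value is a common convention, not something stated in the formula above.

```python
def td_error(reward, value_s, value_next, gamma=0.99, done=False):
    """delta = r + gamma * V(s') - V(s), with no bootstrap after a terminal step."""
    bootstrap = 0.0 if done else gamma * value_next
    return reward + bootstrap - value_s

# Made-up value estimates: V(s) = 2.0, V(s') = 3.0
delta = td_error(reward=1.0, value_s=2.0, value_next=3.0, gamma=0.5)
print(delta)  # 1.0 + 0.5 * 3.0 - 2.0 = 0.5

delta_terminal = td_error(reward=1.0, value_s=2.0, value_next=3.0, gamma=0.5, done=True)
print(delta_terminal)  # 1.0 + 0.0 - 2.0 = -1.0
```

A positive error means the transition turned out better than $V(s_t)$ predicted, and a negative error means it turned out worse.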

### Baseline Subtraction:

To reduce variance, we subtract a baseline $b(s)$:
$$∇_θ J(θ) = E[∇_θ \log π_θ(a_t|s_t) (G_t - b(s_t))]$$

Common baselines:
- Mean return
- State value function $V(s)$
- Moving average
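
It may help to see why subtracting a baseline leaves the expected gradient unchanged. The short derivation below assumes a discrete action space and a baseline that depends only on the state, not on the action:

$$E_{a∼π_θ(\cdot|s)}[∇_θ \log π_θ(a|s)\, b(s)] = b(s) \sum_a π_θ(a|s)\, ∇_θ \log π_θ(a|s) = b(s) \sum_a ∇_θ π_θ(a|s) = b(s)\, ∇_θ \sum_a π_θ(a|s) = b(s)\, ∇_θ 1 = 0$$

Because the baseline term vanishes in expectation, it only affects the spread of the sampled gradient estimates, not their mean.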

In [None]:
# Credit Assignment Problem Demonstration

def compute_returns(rewards, gamma=0.99):
    """
    Compute discounted returns for a trajectory.

    Args:
        rewards: List of rewards [r1, r2, ..., rT]
        gamma: Discount factor

    Returns:
        returns: List of discounted returns [G1, G2, ..., GT]
    """
    returns = []
    G = 0

    # Compute returns backwards
    for reward in reversed(rewards):
        G = reward + gamma * G
        returns.insert(0, G)

    return returns

def normalize_returns(returns):
    """
    Normalize returns to have zero mean and unit variance.
    """
    returns = np.array(returns)
    return (returns - np.mean(returns)) / (np.std(returns) + 1e-8)

# Example trajectory with different reward patterns
# Scenario 1: Immediate reward
rewards_immediate = [1, 0, 0, 0, 0]

# Scenario 2: Delayed reward
rewards_delayed = [0, 0, 0, 0, 1]

# Scenario 3: Mixed rewards
rewards_mixed = [0.1, -0.1, 0.2, -0.1, 0.5]

# Scenario 4: Sparse rewards
rewards_sparse = [0, 0, 1, 0, 0, 0, 0, 2, 0, 0]

scenarios = {
    'Immediate Reward': rewards_immediate,
    'Delayed Reward': rewards_delayed,
    'Mixed Rewards': rewards_mixed,
    'Sparse Rewards': rewards_sparse
}

# Analyze different discount factors
gammas = [0.0, 0.5, 0.9, 0.99]

plt.figure(figsize=(16, 12))

for i, (scenario_name, rewards) in enumerate(scenarios.items()):
    plt.subplot(2, 2, i+1)

    for gamma in gammas:
        returns = compute_returns(rewards, gamma)
        plt.plot(returns, marker='o', label=f'γ={gamma}', linewidth=2)

    plt.title(f'{scenario_name}\nRewards: {rewards}')
    plt.xlabel('Time Step')
    plt.ylabel('Return (G_t)')
    plt.legend()
    plt.grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

# Demonstrate advantage computation
def demonstrate_advantage_computation():
    """
    Demonstrate how advantage computation helps with credit assignment.
    """
    # Simulate multiple episodes
    np.random.seed(42)
    n_episodes = 1000
    episode_returns = []

    for _ in range(n_episodes):
        # Random episode length
        length = np.random.randint(5, 20)
        # Random rewards (some positive, some negative)
        rewards = np.random.normal(0, 1, length)
        # Add occasional large reward
        if np.random.random() < 0.1:
            rewards[-1] += 10

        returns = compute_returns(rewards, gamma=0.99)
        episode_returns.extend(returns)

    # Compute baseline (mean return)
    baseline = np.mean(episode_returns)

    # Compute advantages
    advantages = np.array(episode_returns) - baseline

    print(f"Return Statistics:")
    print(f"  Mean: {np.mean(episode_returns):.3f}")
    print(f"  Std: {np.std(episode_returns):.3f}")
    print(f"  Min: {np.min(episode_returns):.3f}")
    print(f"  Max: {np.max(episode_returns):.3f}")

    print(f"\nAdvantage Statistics:")
    print(f"  Mean: {np.mean(advantages):.3f}")
    print(f"  Std: {np.std(advantages):.3f}")
    print(f"  Min: {np.min(advantages):.3f}")
    print(f"  Max: {np.max(advantages):.3f}")

    # Visualize
    plt.figure(figsize=(12, 4))

    plt.subplot(1, 3, 1)
    plt.hist(episode_returns, bins=50, alpha=0.7, edgecolor='black')
    plt.axvline(baseline, color='red', linestyle='--', label=f'Baseline: {baseline:.2f}')
    plt.xlabel('Return')
    plt.ylabel('Frequency')
    plt.title('Return Distribution')
    plt.legend()

    plt.subplot(1, 3, 2)
    plt.hist(advantages, bins=50, alpha=0.7, edgecolor='black')
    plt.axvline(0, color='red', linestyle='--', label='Zero Advantage')
    plt.xlabel('Advantage')
    plt.ylabel('Frequency')
    plt.title('Advantage Distribution')
    plt.legend()

    plt.subplot(1, 3, 3)
    plt.scatter(episode_returns[:1000], advantages[:1000], alpha=0.5)
    plt.axhline(0, color='red', linestyle='--', alpha=0.5)
    plt.axvline(baseline, color='red', linestyle='--', alpha=0.5)
    plt.xlabel('Return')
    plt.ylabel('Advantage')
    plt.title('Returns vs Advantages')
    plt.grid(True, alpha=0.3)

    plt.tight_layout()
    plt.show()

    return episode_returns, advantages, baseline

returns, advantages, baseline = demonstrate_advantage_computation()

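# Show the effect of baseline subtraction.
# Note: subtracting a constant never changes np.var(returns), so comparing
# variances would always report 0%. We compare the mean squared weight
# instead, a rough proxy for the variance of the policy gradient estimate
# (each grad-log-prob term is multiplied by the return or the advantage).
original_msq = np.mean(np.square(returns))
advantage_msq = np.mean(np.square(advantages))
msq_reduction = (original_msq - advantage_msq) / original_msq * 100

print(f"\nVariance Reduction Analysis:")
print(f"Mean squared return: {original_msq:.3f}")
print(f"Mean squared advantage: {advantage_msq:.3f}")
print(f"Reduction in mean squared weight: {msq_reduction:.1f}%")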

# 6. Policy Gradients (REINFORCE Algorithm)

## Theoretical Foundation

**REINFORCE** is a Monte Carlo policy gradient algorithm that directly optimizes the policy using gradient ascent.

### Mathematical Derivation:

**Objective**: Maximize expected return
$$J(θ) = E_{τ∼π_θ}[G(τ)]$$

**Policy Gradient Theorem**:
$$∇_θ J(θ) = E_{τ∼π_θ}\left[\sum_{t} ∇_θ \log π_θ(a_t|s_t)\, G_t\right]$$

**REINFORCE Update Rule**:
$$θ_{t+1} = θ_t + α ∇_θ \log π_θ(a_t|s_t) G_t$$

### Algorithm Steps:

1. **Sample** trajectories using current policy $π_θ$
2. **Compute** returns $G_t$ for each time step
3. **Estimate** policy gradient using samples
4. **Update** policy parameters using gradient ascent

### Key Properties:

- **Unbiased**: Gradient estimates are unbiased
- **High Variance**: Monte Carlo estimates have high variance
- **Model-Free**: Doesn't require environment model
- **On-Policy**: Uses samples from current policy

### Variance Reduction Techniques:

1. **Baseline Subtraction**: $G_t - b(s_t)$
2. **Advantage Function**: $A(s_t, a_t) = Q(s_t, a_t) - V(s_t)$
3. **Causality**: Weight each action only by the rewards received after it (reward-to-go), since an action cannot influence earlier rewards

In [None]:
# Complete REINFORCE Algorithm Implementation
class REINFORCEAgent:
    """
    REINFORCE algorithm implementation for CartPole.
    """
    def __init__(self, state_size=4, action_size=2, lr=0.01, gamma=0.99):
        self.state_size = state_size
        self.action_size = action_size
        self.lr = lr
        self.gamma = gamma

        # Build policy network
        self.policy_network = self._build_policy_network()
        self.optimizer = keras.optimizers.Adam(learning_rate=lr)

        # Storage for episode data
        self.reset_episode_data()

        # Training history
        self.episode_rewards = []
        self.episode_lengths = []
        self.losses = []

    def _build_policy_network(self):
        """
        Build the policy neural network.
        """
        model = keras.Sequential([
            keras.layers.Dense(128, activation='relu', input_shape=(self.state_size,)),
            keras.layers.Dense(128, activation='relu'),
            keras.layers.Dense(self.action_size, activation='softmax')
        ])
        return model

    def reset_episode_data(self):
        """
        Reset storage for new episode.
        """
        self.states = []
        self.actions = []
        self.rewards = []

    def get_action(self, state):
        """
        Sample action from policy and store data.
        """
        state = np.array([state])
        action_probs = self.policy_network(state)[0]
        action = np.random.choice(self.action_size, p=action_probs.numpy())

        # Store for training
        self.states.append(state[0])
        self.actions.append(action)

        return action

    def store_reward(self, reward):
        """
        Store reward for current episode.
        """
        self.rewards.append(reward)

    def compute_returns(self, rewards, gamma):
        """
        Compute discounted returns.
        """
        returns = []
        G = 0

        for reward in reversed(rewards):
            G = reward + gamma * G
            returns.insert(0, G)

        return returns

    def train_episode(self, use_baseline=True):
        """
        Train on collected episode data using REINFORCE.
        """
        # Compute returns
        returns = self.compute_returns(self.rewards, self.gamma)
        returns = np.array(returns, dtype=np.float32)

        # Baseline subtraction for variance reduction
        if use_baseline:
            baseline = np.mean(returns)
            advantages = returns - baseline
        else:
            advantages = returns

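        # Normalize advantages. This re-centers them as well, so the mean
        # baseline above only changes the result when normalization is skipped
        # (single-step episodes).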
        if len(advantages) > 1:
            advantages = (advantages - np.mean(advantages)) / (np.std(advantages) + 1e-8)

        # Convert to tensors
        states = tf.convert_to_tensor(self.states, dtype=tf.float32)
        actions = tf.convert_to_tensor(self.actions, dtype=tf.int32)
        advantages = tf.convert_to_tensor(advantages, dtype=tf.float32)

        # Compute policy gradient and update
        with tf.GradientTape() as tape:
            # Forward pass
            action_probs = self.policy_network(states)

            # Compute log probabilities of taken actions
            action_one_hot = tf.one_hot(actions, self.action_size)
            log_probs = tf.reduce_sum(action_one_hot * tf.math.log(action_probs + 1e-8), axis=1)

            # Policy gradient loss (negative because we want to maximize)
            loss = -tf.reduce_mean(log_probs * advantages)

        # Compute gradients and update
        gradients = tape.gradient(loss, self.policy_network.trainable_variables)
        self.optimizer.apply_gradients(zip(gradients, self.policy_network.trainable_variables))

        # Store training metrics
        self.episode_rewards.append(sum(self.rewards))
        self.episode_lengths.append(len(self.rewards))
        self.losses.append(loss.numpy())

        return loss.numpy()

    def get_action_deterministic(self, state):
        """
        Get action deterministically for evaluation.
        """
        state = np.array([state])
        action_probs = self.policy_network(state)[0]
        return np.argmax(action_probs.numpy())

# Training function
def train_reinforce_agent(agent, env, num_episodes=1000, max_steps=500,
                         render_freq=None, eval_freq=100):
    """
    Train REINFORCE agent.
    """
    training_scores = []
    evaluation_scores = []

    for episode in range(num_episodes):
        # Reset for new episode
        agent.reset_episode_data()
        state = env.reset()
        total_reward = 0

        # Run episode
        for step in range(max_steps):
            action = agent.get_action(state)
            next_state, reward, done, _ = env.step(action)
            agent.store_reward(reward)

            state = next_state
            total_reward += reward

            if done:
                break

        # Train on episode
        loss = agent.train_episode()
        training_scores.append(total_reward)

        # Evaluation
        if episode % eval_freq == 0:
            eval_score = evaluate_policy(agent, env, num_episodes=5)
            evaluation_scores.append(eval_score)

            print(f"Episode {episode}: "
                  f"Training Score: {total_reward:.1f}, "
                  f"Eval Score: {eval_score:.1f}, "
                  f"Loss: {loss:.4f}")

    return training_scores, evaluation_scores

def evaluate_policy(agent, env, num_episodes=10, max_steps=500):
    """
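    Evaluate agent policy deterministically.

    Note: this redefines the `evaluate_policy` helper from Section 2. This
    version expects a Gym-style env (4-tuple `step`) and returns only the mean.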
    """
    total_rewards = []

    for _ in range(num_episodes):
        state = env.reset()
        episode_reward = 0

        for _ in range(max_steps):
            action = agent.get_action_deterministic(state)
            state, reward, done, _ = env.step(action)
            episode_reward += reward

            if done:
                break

        total_rewards.append(episode_reward)

    return np.mean(total_rewards)

# Create environment and agent
env = gym.make('CartPole-v1')
agent = REINFORCEAgent(lr=0.001, gamma=0.99)

print("REINFORCE Agent Architecture:")
agent.policy_network.summary()

# Train the agent
print("\nTraining REINFORCE Agent...")
training_scores, eval_scores = train_reinforce_agent(
    agent, env, num_episodes=500, eval_freq=50
)

env.close()

In [None]:
# Analyze REINFORCE training results
plt.figure(figsize=(15, 10))

# Training progress
plt.subplot(2, 3, 1)
window_size = 50
moving_avg = pd.Series(training_scores).rolling(window_size).mean()
plt.plot(training_scores, alpha=0.3, color='blue', label='Episode Score')
plt.plot(moving_avg, color='red', linewidth=2, label=f'{window_size}-Episode Average')
plt.xlabel('Episode')
plt.ylabel('Score')
plt.title('Training Progress')
plt.legend()
plt.grid(True, alpha=0.3)

# Evaluation scores
plt.subplot(2, 3, 2)
eval_episodes = list(range(0, len(training_scores), 50))  # step must match the eval_freq used in training
plt.plot(eval_episodes, eval_scores, 'o-', color='green', linewidth=2)
plt.xlabel('Episode')
plt.ylabel('Evaluation Score')
plt.title('Evaluation Progress')
plt.grid(True, alpha=0.3)

# Loss progression
plt.subplot(2, 3, 3)
loss_moving_avg = pd.Series(agent.losses).rolling(window_size).mean()
plt.plot(agent.losses, alpha=0.3, color='orange', label='Loss')
plt.plot(loss_moving_avg, color='red', linewidth=2, label=f'{window_size}-Episode Average')
plt.xlabel('Episode')
plt.ylabel('Policy Loss')
plt.title('Training Loss')
plt.legend()
plt.grid(True, alpha=0.3)

# Episode length progression
plt.subplot(2, 3, 4)
length_moving_avg = pd.Series(agent.episode_lengths).rolling(window_size).mean()
plt.plot(agent.episode_lengths, alpha=0.3, color='purple', label='Episode Length')
plt.plot(length_moving_avg, color='red', linewidth=2, label=f'{window_size}-Episode Average')
plt.xlabel('Episode')
plt.ylabel('Episode Length')
plt.title('Episode Length Progress')
plt.legend()
plt.grid(True, alpha=0.3)

# Score distribution over time
plt.subplot(2, 3, 5)
early_scores = training_scores[:100]
late_scores = training_scores[-100:]
plt.hist(early_scores, bins=20, alpha=0.5, label='Early Training (first 100)', color='red')
plt.hist(late_scores, bins=20, alpha=0.5, label='Late Training (last 100)', color='blue')
plt.xlabel('Score')
plt.ylabel('Frequency')
plt.title('Score Distribution Comparison')
plt.legend()

# Learning curve analysis
plt.subplot(2, 3, 6)
segment_size = 50
segments = len(training_scores) // segment_size
segment_means = []
segment_stds = []

for i in range(segments):
    start_idx = i * segment_size
    end_idx = (i + 1) * segment_size
    segment_scores = training_scores[start_idx:end_idx]
    segment_means.append(np.mean(segment_scores))
    segment_stds.append(np.std(segment_scores))

segment_episodes = [(i + 0.5) * segment_size for i in range(segments)]
plt.errorbar(segment_episodes, segment_means, yerr=segment_stds,
             fmt='o-', capsize=5, capthick=2)
plt.xlabel('Episode')
plt.ylabel('Mean Score ± Std')
plt.title('Learning Curve with Variance')
plt.grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

# Print summary statistics
print("Training Summary:")
print(f"Total Episodes: {len(training_scores)}")
print(f"Final 100 Episode Average: {np.mean(training_scores[-100:]):.2f} ± {np.std(training_scores[-100:]):.2f}")
print(f"Best Episode Score: {np.max(training_scores)}")
print(f"Final Evaluation Score: {eval_scores[-1]:.2f}")
print(f"Training Improvement: {np.mean(training_scores[-100:]) - np.mean(training_scores[:100]):.2f}")

# Test final policy
print("\nTesting Final Policy:")
env = gym.make('CartPole-v1')
final_score = evaluate_policy(agent, env, num_episodes=20)
print(f"Final Policy Average Score (20 episodes): {final_score:.2f}")
env.close()

# 7. Markov Decision Processes (MDPs)

## Theoretical Foundation

A **Markov Decision Process** is a mathematical framework for modeling decision-making problems in stochastic environments.

### MDP Components:

An MDP is defined by the tuple $(S, A, P, R, γ)$:

- **S**: Set of states
- **A**: Set of actions
- **P**: Transition probabilities $P(s'|s,a)$
- **R**: Reward function $R(s,a,s')$
- **γ**: Discount factor $γ ∈ [0,1]$

### Markov Property:

**Key Assumption**: The future is independent of the past given the present state and action:
$$P(s_{t+1}|s_t, a_t, s_{t-1}, a_{t-1}, ..., s_0, a_0) = P(s_{t+1}|s_t, a_t)$$

### Value Functions:

#### State Value Function:
$$V^π(s) = E_π[G_t | S_t = s] = E_π\left[\sum_{k=0}^{∞} γ^k R_{t+k+1} | S_t = s\right]$$

#### Action Value Function (Q-function):
$$Q^π(s,a) = E_π[G_t | S_t = s, A_t = a] = E_π\left[\sum_{k=0}^{∞} γ^k R_{t+k+1} | S_t = s, A_t = a\right]$$
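
The return $G_t$ inside both expectations can be computed for a single finite episode by working backwards through the rewards. The following is a minimal sketch; the helper name `discounted_return` and the reward sequence are illustrative assumptions, not part of the chapter's code.

```python
def discounted_return(rewards, gamma):
    """Compute G_t = sum_k gamma^k * R_{t+k+1} for a finite reward sequence."""
    g = 0.0
    # Work backwards so each step applies G_t = R_{t+1} + gamma * G_{t+1}
    for r in reversed(rewards):
        g = r + gamma * g
    return g

# Hypothetical episode: three zero rewards followed by a final reward of 1
example_return = discounted_return([0.0, 0.0, 0.0, 1.0], gamma=0.9)
print(f"G_0 = {example_return:.3f}")  # equals gamma**3
```

Averaging such returns over many episodes that start in the same state gives a Monte Carlo estimate of $V^π(s)$.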

### Bellman Equations:

#### Bellman Equation for $V^π$:
$$V^π(s) = \sum_a π(a|s) \sum_{s'} P(s'|s,a)[R(s,a,s') + γV^π(s')]$$

#### Bellman Equation for $Q^π$:
$$Q^π(s,a) = \sum_{s'} P(s'|s,a)[R(s,a,s') + γ\sum_{a'} π(a'|s')Q^π(s',a')]$$
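
Because the Bellman equation for $V^π$ is linear in $V^π$, a small MDP can be evaluated exactly by solving $(I - γP_π)V = r_π$. The sketch below assumes a hypothetical 2-state, 2-action MDP; every number in `P`, `R`, and `pi` is made up for illustration.

```python
import numpy as np

# Hypothetical MDP: P[s, a, s'] and R[s, a, s'] are illustrative values only
P = np.array([
    [[0.9, 0.1], [0.2, 0.8]],
    [[0.5, 0.5], [0.0, 1.0]],
])
R = np.array([
    [[0.0, 1.0], [0.0, 2.0]],
    [[1.0, 0.0], [0.0, 0.5]],
])
pi = np.array([[0.5, 0.5], [1.0, 0.0]])  # pi[s, a], a stochastic policy
gamma = 0.9

# Policy-averaged transition matrix and expected one-step reward
P_pi = np.einsum('sa,sat->st', pi, P)
r_pi = np.einsum('sa,sat,sat->s', pi, P, R)

# Solve the linear Bellman system (I - gamma * P_pi) V = r_pi
V = np.linalg.solve(np.eye(2) - gamma * P_pi, r_pi)

# Recover Q from V and confirm that V(s) = sum_a pi(a|s) Q(s, a)
Q = np.einsum('sat,sat->sa', P, R + gamma * V[None, None, :])
residual = np.max(np.abs(V - np.sum(pi * Q, axis=1)))
print("V^pi:", V, "Bellman residual:", residual)
```

The direct solve costs $O(|S|^3)$, which is why iterative policy evaluation is preferred once the state space grows.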

### Optimality:

#### Optimal Value Functions:
$$V^*(s) = \max_π V^π(s)$$
$$Q^*(s,a) = \max_π Q^π(s,a)$$

#### Bellman Optimality Equations:
$$V^*(s) = \max_a \sum_{s'} P(s'|s,a)[R(s,a,s') + γV^*(s')]$$
$$Q^*(s,a) = \sum_{s'} P(s'|s,a)[R(s,a,s') + γ\max_{a'} Q^*(s',a')]$$

#### Optimal Policy:
$$π^*(s) = \arg\max_a Q^*(s,a)$$
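
As a small worked example of the optimality equations, the sketch below repeatedly applies the Bellman optimality backup for $Q^*$ and then reads off the greedy policy. The two-state MDP (action 0 stays, action 1 switches state, reward 1 for landing in state 1) is a hypothetical example chosen so the answer can be checked by hand: $V^*(0) = V^*(1) = 1/(1-γ) = 10$.

```python
import numpy as np

# Hypothetical deterministic MDP: action 0 = stay, action 1 = switch state
P = np.zeros((2, 2, 2))          # P[s, a, s']
P[0, 0, 0] = 1.0
P[0, 1, 1] = 1.0
P[1, 0, 1] = 1.0
P[1, 1, 0] = 1.0
R = np.zeros((2, 2, 2))          # R[s, a, s']
R[:, :, 1] = 1.0                 # reward 1 for any transition landing in state 1
gamma = 0.9

Q = np.zeros((2, 2))
for _ in range(1000):
    # Q*(s,a) = sum_s' P(s'|s,a) [R(s,a,s') + gamma * max_a' Q*(s',a')]
    Q_new = np.einsum('sat,sat->sa', P, R + gamma * Q.max(axis=1)[None, None, :])
    converged = np.max(np.abs(Q_new - Q)) < 1e-10
    Q = Q_new
    if converged:
        break

V_star = Q.max(axis=1)       # V*(s) = max_a Q*(s, a)
pi_star = Q.argmax(axis=1)   # pi*(s) = argmax_a Q*(s, a)
print("V*:", V_star, "pi*:", pi_star)
```

The greedy policy switches out of state 0 and stays in state 1, which matches the hand calculation.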

In [None]:
# MDP Implementation and Value Iteration

class GridWorldMDP:
    """
    A grid world MDP for demonstrating value iteration and policy iteration.
    """
    def __init__(self, height=4, width=4, goal_states=None, obstacle_states=None):
        self.height = height
        self.width = width
        self.n_states = height * width
        self.n_actions = 4  # up, right, down, left

        # Define goal and obstacle states
        self.goal_states = goal_states or [(3, 3)]
        self.obstacle_states = obstacle_states or [(1, 1), (2, 2)]

        # Action mappings
        self.actions = {
            0: (-1, 0),  # up
            1: (0, 1),   # right
            2: (1, 0),   # down
            3: (0, -1)   # left
        }

        # Build transition probabilities and rewards
        self.build_mdp()

    def state_to_coords(self, state):
        """Convert state index to (row, col) coordinates."""
        return (state // self.width, state % self.width)

    def coords_to_state(self, row, col):
        """Convert (row, col) coordinates to state index."""
        return row * self.width + col

    def is_valid_state(self, row, col):
        """Check if coordinates represent a valid state."""
        return (0 <= row < self.height and
                0 <= col < self.width and
                (row, col) not in self.obstacle_states)

    def build_mdp(self):
        """
        Build transition probabilities and reward function.
        """
        # Initialize transition probabilities P(s'|s,a)
        self.P = np.zeros((self.n_states, self.n_actions, self.n_states))

        # Initialize rewards R(s,a,s')
        self.R = np.zeros((self.n_states, self.n_actions, self.n_states))

        for state in range(self.n_states):
            row, col = self.state_to_coords(state)

            # Skip obstacle states
            if (row, col) in self.obstacle_states:
                continue

            # Goal states are terminal
            if (row, col) in self.goal_states:
                for action in range(self.n_actions):
                    self.P[state, action, state] = 1.0
                    self.R[state, action, state] = 0.0
                continue

            for action in range(self.n_actions):
                # Intended action (probability 0.8)
                intended_dr, intended_dc = self.actions[action]
                intended_row = row + intended_dr
                intended_col = col + intended_dc

                if self.is_valid_state(intended_row, intended_col):
                    next_state = self.coords_to_state(intended_row, intended_col)
                    self.P[state, action, next_state] += 0.8
                else:
                    # Stay in place if hitting wall
                    self.P[state, action, state] += 0.8

                # Perpendicular actions (probability 0.1 each)
                perp_actions = [(action - 1) % 4, (action + 1) % 4]

                for perp_action in perp_actions:
                    perp_dr, perp_dc = self.actions[perp_action]
                    perp_row = row + perp_dr
                    perp_col = col + perp_dc

                    if self.is_valid_state(perp_row, perp_col):
                        next_state = self.coords_to_state(perp_row, perp_col)
                        self.P[state, action, next_state] += 0.1
                    else:
                        # Stay in place if hitting wall
                        self.P[state, action, state] += 0.1

                # Set rewards
                for next_state in range(self.n_states):
                    if self.P[state, action, next_state] > 0:
                        next_row, next_col = self.state_to_coords(next_state)

                        if (next_row, next_col) in self.goal_states:
                            self.R[state, action, next_state] = 10.0
                        else:
                            self.R[state, action, next_state] = -0.1  # Living penalty

    def value_iteration(self, gamma=0.9, theta=1e-6, max_iterations=1000):
        """
        Perform value iteration to find optimal value function and policy.
        """
        # Initialize value function
        V = np.zeros(self.n_states)

        history = []

        for iteration in range(max_iterations):
            V_old = V.copy()

            for state in range(self.n_states):
                # Compute Q-values for all actions
                q_values = np.zeros(self.n_actions)

                for action in range(self.n_actions):
                    q_values[action] = np.sum(
                        self.P[state, action, :] *
                        (self.R[state, action, :] + gamma * V_old)
                    )

                # Update value function
                V[state] = np.max(q_values)

            # Track convergence
            delta = np.max(np.abs(V - V_old))
            history.append(delta)

            if delta < theta:
                break

        # Extract optimal policy
        policy = np.zeros(self.n_states, dtype=int)

        for state in range(self.n_states):
            q_values = np.zeros(self.n_actions)

            for action in range(self.n_actions):
                q_values[action] = np.sum(
                    self.P[state, action, :] *
                    (self.R[state, action, :] + gamma * V)
                )

            policy[state] = np.argmax(q_values)

        return V, policy, history

    def policy_evaluation(self, policy, gamma=0.9, theta=1e-6, max_iterations=1000):
        """
        Evaluate a given policy.
        """
        V = np.zeros(self.n_states)

        for iteration in range(max_iterations):
            V_old = V.copy()

            for state in range(self.n_states):
                action = policy[state]
                V[state] = np.sum(
                    self.P[state, action, :] *
                    (self.R[state, action, :] + gamma * V_old)
                )

            if np.max(np.abs(V - V_old)) < theta:
                break

        return V

    def policy_iteration(self, gamma=0.9, max_iterations=100):
        """
        Perform policy iteration.
        """
        # Initialize random policy
        policy = np.random.randint(0, self.n_actions, self.n_states)

        for iteration in range(max_iterations):
            # Policy evaluation
            V = self.policy_evaluation(policy, gamma)

            # Policy improvement
            policy_stable = True

            for state in range(self.n_states):
                old_action = policy[state]

                q_values = np.zeros(self.n_actions)
                for action in range(self.n_actions):
                    q_values[action] = np.sum(
                        self.P[state, action, :] *
                        (self.R[state, action, :] + gamma * V)
                    )

                policy[state] = np.argmax(q_values)

                if old_action != policy[state]:
                    policy_stable = False

            if policy_stable:
                break

        return V, policy

    def visualize_policy(self, policy, values=None):
        """
        Visualize policy and optionally value function.
        """
        action_symbols = ['↑', '→', '↓', '←']

        if values is not None:
            fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(12, 5))
        else:
            fig, ax1 = plt.subplots(1, 1, figsize=(6, 5))

        # Policy visualization
        policy_grid = np.empty((self.height, self.width), dtype=object)

        for state in range(self.n_states):
            row, col = self.state_to_coords(state)

            if (row, col) in self.obstacle_states:
                policy_grid[row, col] = '■'
            elif (row, col) in self.goal_states:
                policy_grid[row, col] = 'G'
            else:
                policy_grid[row, col] = action_symbols[policy[state]]

        # Create color map
        colors = np.ones((self.height, self.width, 3))
        for row, col in self.obstacle_states:
            colors[row, col] = [0.3, 0.3, 0.3]  # Gray for obstacles
        for row, col in self.goal_states:
            colors[row, col] = [0.2, 0.8, 0.2]  # Green for goals

        ax1.imshow(colors)

        # Add policy symbols
        for i in range(self.height):
            for j in range(self.width):
                ax1.text(j, i, policy_grid[i, j], ha='center', va='center',
                        fontsize=16, fontweight='bold')

        ax1.set_title('Optimal Policy')
        ax1.set_xticks(range(self.width))
        ax1.set_yticks(range(self.height))
        ax1.grid(True)

        # Value function visualization
        if values is not None:
            value_grid = np.zeros((self.height, self.width))

            for state in range(self.n_states):
                row, col = self.state_to_coords(state)
                value_grid[row, col] = values[state]

            im = ax2.imshow(value_grid, cmap='viridis')

            # Add value text
            for i in range(self.height):
                for j in range(self.width):
                    ax2.text(j, i, f'{value_grid[i, j]:.2f}', ha='center', va='center',
                            fontsize=10, color='white', fontweight='bold')

            ax2.set_title('Value Function')
            ax2.set_xticks(range(self.width))
            ax2.set_yticks(range(self.height))
            plt.colorbar(im, ax=ax2)

        plt.tight_layout()
        plt.show()

# Create and solve MDP
mdp = GridWorldMDP(height=4, width=4, goal_states=[(0, 3), (3, 3)],
                   obstacle_states=[(1, 1), (2, 2)])

print("Grid World MDP:")
print(f"States: {mdp.n_states}")
print(f"Actions: {mdp.n_actions}")
print(f"Goal states: {mdp.goal_states}")
print(f"Obstacle states: {mdp.obstacle_states}")

# Solve using Value Iteration
print("\nSolving with Value Iteration...")
V_vi, policy_vi, vi_history = mdp.value_iteration(gamma=0.9)
print(f"Converged in {len(vi_history)} iterations")

# Solve using Policy Iteration
print("\nSolving with Policy Iteration...")
V_pi, policy_pi = mdp.policy_iteration(gamma=0.9)

# Visualize results
print("\nValue Iteration Results:")
mdp.visualize_policy(policy_vi, V_vi)

print("\nPolicy Iteration Results:")
mdp.visualize_policy(policy_pi, V_pi)

# Compare convergence
plt.figure(figsize=(10, 4))
plt.plot(vi_history, label='Value Iteration')
plt.xlabel('Iteration')
plt.ylabel('Max Value Change')
plt.title('Value Iteration Convergence')
plt.yscale('log')
plt.grid(True, alpha=0.3)
plt.legend()
plt.show()

# Verify solutions are the same
print(f"\nSolutions identical: {np.allclose(V_vi, V_pi) and np.array_equal(policy_vi, policy_pi)}")
print(f"Max value difference: {np.max(np.abs(V_vi - V_pi)):.6f}")

# 8. Temporal Difference Learning

## Theoretical Foundation

**Temporal Difference (TD) Learning** combines ideas from Monte Carlo and Dynamic Programming. It learns directly from experience without a model and updates estimates based on other estimates.

### Key Concepts:

#### TD Error:
$$\delta_t = R_{t+1} + \gamma V(S_{t+1}) - V(S_t)$$

The TD error is the difference between the TD target, $R_{t+1} + \gamma V(S_{t+1})$ (the observed reward plus the discounted estimate of the next state's value), and the current estimate $V(S_t)$.

#### TD(0) Update Rule:
$$V(S_t) \leftarrow V(S_t) + \alpha [R_{t+1} + \gamma V(S_{t+1}) - V(S_t)]$$
$$V(S_t) \leftarrow V(S_t) + \alpha \delta_t$$

Where $\alpha$ is the learning rate.
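
To make the update rule concrete, here is a single TD(0) step worked through numerically. The value estimates and the transition are hypothetical numbers chosen only for illustration.

```python
import numpy as np

V = np.array([0.5, 0.8, 0.0])   # hypothetical current value estimates
alpha, gamma = 0.1, 0.9

# Hypothetical observed transition: state 0 -> state 1 with reward 0.2
s, r, s_next = 0, 0.2, 1

td_target = r + gamma * V[s_next]   # 0.2 + 0.9 * 0.8 = 0.92
td_error = td_target - V[s]         # 0.92 - 0.5 = 0.42
V[s] += alpha * td_error            # 0.5 + 0.1 * 0.42 = 0.542

print(f"TD error: {td_error:.2f}, updated V[0]: {V[s]:.3f}")
```

Only the visited state moves, and it moves a fraction $\alpha$ of the way toward the TD target.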

### Advantages of TD Learning:

1. **Model-Free**: No need for environment model
2. **Online**: Can learn from incomplete episodes
3. **Bootstrapping**: Uses current estimates to improve estimates
4. **Sample Efficient**: Often converges faster than Monte Carlo in practice, since bootstrapped targets have lower variance than full returns

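### TD(λ) - Eligibility Traces:

TD(λ) keeps an eligibility trace $e_t(s)$ for every state. Each step decays all traces by $\gamma\lambda$, increments the trace of the visited state, and then updates every state in proportion to its trace:

$$e_t(s) = \gamma \lambda e_{t-1}(s) + \mathbb{1}[S_t = s]$$
$$V(s) \leftarrow V(s) + \alpha \delta_t e_t(s) \quad \text{for all } s$$

Where: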
- $\lambda = 0$: TD(0) - only current state updated
- $\lambda = 1$: Equivalent to Monte Carlo
- $\lambda \in (0,1)$: Balance between TD and MC
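
The effect of $\lambda$ is easiest to see by tracking the traces over a few steps. The sketch below assumes accumulating traces (decay every trace by $\gamma\lambda$, then add 1 for the visited state), which is the scheme `TDLambdaAgent` uses in this chapter; the visited states and parameter values are hypothetical.

```python
import numpy as np

gamma, lam = 0.9, 0.5
traces = np.zeros(3)

# Hypothetical trajectory visiting states 0, 1, 2 in order
for state in [0, 1, 2]:
    traces *= gamma * lam   # decay the credit of all earlier states
    traces[state] += 1.0    # the state just visited gets full credit

print("Eligibility traces:", traces)
```

After the third step the traces are $[(\gamma\lambda)^2, \gamma\lambda, 1]$, so a TD error observed now updates state 2 fully and earlier states with geometrically less weight.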

### Comparison with Other Methods:

| Method | Model Required | Bootstrapping | Online | Convergence |
|--------|---------------|---------------|--------|-----------|
| DP | Yes | Yes | No | Guaranteed |
| MC | No | No | No | Guaranteed |
| TD | No | Yes | Yes | Guaranteed* |

*In the tabular case with a suitably decaying learning rate; with function approximation, TD may fail to converge.

In [None]:
# Temporal Difference Learning Implementation

class TDAgent:
    """
    Temporal Difference learning agent for state value estimation.
    """
    def __init__(self, n_states, alpha=0.1, gamma=0.9, epsilon=0.1):
        self.n_states = n_states
        self.alpha = alpha  # Learning rate
        self.gamma = gamma  # Discount factor
        self.epsilon = epsilon  # Exploration rate

        # Initialize value function
        self.V = np.zeros(n_states)

        # For tracking learning progress
        self.td_errors = []
        self.value_history = []

    def get_action(self, state, valid_actions):
        """
        Select an action uniformly at random.

        State values alone cannot rank actions without a model (that would
        need Q-values), so this agent only evaluates the behaviour policy.
        The epsilon attribute is kept for interface compatibility.
        """
        return np.random.choice(valid_actions)

    def td_update(self, state, reward, next_state, done):
        """
        Perform TD(0) update.
        """
        if done:
            target = reward
        else:
            target = reward + self.gamma * self.V[next_state]

        # TD error
        td_error = target - self.V[state]

        # Update value function
        self.V[state] += self.alpha * td_error

        # Store for analysis
        self.td_errors.append(abs(td_error))

        return td_error

    def save_values(self):
        """Save current value function for tracking."""
        self.value_history.append(self.V.copy())

class TDLambdaAgent(TDAgent):
    """
    TD(λ) agent with eligibility traces.
    """
    def __init__(self, n_states, alpha=0.1, gamma=0.9, epsilon=0.1, lambda_=0.9):
        super().__init__(n_states, alpha, gamma, epsilon)
        self.lambda_ = lambda_
        self.eligibility_traces = np.zeros(n_states)

    def reset_traces(self):
        """Reset eligibility traces for new episode."""
        self.eligibility_traces = np.zeros(self.n_states)

    def td_lambda_update(self, state, reward, next_state, done):
        """
        Perform TD(λ) update with eligibility traces.
        """
        # Compute TD error
        if done:
            target = reward
        else:
            target = reward + self.gamma * self.V[next_state]

        td_error = target - self.V[state]

        # Update eligibility traces
        self.eligibility_traces *= self.gamma * self.lambda_
        self.eligibility_traces[state] += 1

        # Update all states proportional to their eligibility
        self.V += self.alpha * td_error * self.eligibility_traces

        self.td_errors.append(abs(td_error))

        return td_error

# Test TD learning on a simple chain MDP
class ChainMDP:
    """
    Simple chain MDP for testing TD learning.
    States: 0 -> 1 -> 2 -> 3 -> 4 (terminal)
    Reward only at the end.
    """
    def __init__(self, length=5, reward=1.0):
        self.length = length
        self.reward = reward
        self.reset()

    def reset(self):
        self.state = 0
        return self.state

    def step(self, action):
        # Only one action: move right
        self.state += 1

        if self.state >= self.length - 1:
            reward = self.reward
            done = True
        else:
            reward = 0.0
            done = False

        return self.state, reward, done

# Compare TD(0), TD(λ), and Monte Carlo
def compare_td_methods():
    chain = ChainMDP(length=5)
    n_episodes = 500

    # Create agents
    td0_agent = TDAgent(n_states=5, alpha=0.1)
    td_lambda_agent = TDLambdaAgent(n_states=5, alpha=0.1, lambda_=0.9)
    mc_values = np.zeros(5)  # Monte Carlo estimates

    # True values for comparison (computed analytically): the reward of 1
    # arrives on the transition 3 -> 4, so V(3) = 1 and V(s) = gamma**(3 - s)
    gamma = 0.9
    true_values = np.array([gamma**3, gamma**2, gamma**1, 1.0, 0.0])

    td0_errors = []
    td_lambda_errors = []
    mc_errors = []

    # Track episode returns for MC
    episode_returns = [[] for _ in range(5)]

    for episode in range(n_episodes):
        # TD(0) learning
        state = chain.reset()
        while True:
            next_state, reward, done = chain.step(0)
            td0_agent.td_update(state, reward, next_state, done)
            state = next_state
            if done:
                break

        # TD(λ) learning
        td_lambda_agent.reset_traces()
        state = chain.reset()
        while True:
            next_state, reward, done = chain.step(0)
            td_lambda_agent.td_lambda_update(state, reward, next_state, done)
            state = next_state
            if done:
                break

        # Monte Carlo learning
        state = chain.reset()
        episode_states = [state]
        episode_rewards = []

        while True:
            next_state, reward, done = chain.step(0)
            episode_rewards.append(reward)
            if not done:
                episode_states.append(next_state)
            else:
                break

        # Compute returns and update MC estimates
        G = 0
        for i in reversed(range(len(episode_states))):
            G = episode_rewards[i] + gamma * G
            episode_returns[episode_states[i]].append(G)

        # Update MC value estimates
        for state in range(5):
            if episode_returns[state]:
                mc_values[state] = np.mean(episode_returns[state])

        # Compute errors every 10 episodes
        if episode % 10 == 0:
            td0_error = np.mean(np.abs(td0_agent.V - true_values))
            td_lambda_error = np.mean(np.abs(td_lambda_agent.V - true_values))
            mc_error = np.mean(np.abs(mc_values - true_values))

            td0_errors.append(td0_error)
            td_lambda_errors.append(td_lambda_error)
            mc_errors.append(mc_error)

    return (td0_agent, td_lambda_agent, mc_values, true_values,
            td0_errors, td_lambda_errors, mc_errors)

# Run comparison
print("Comparing TD(0), TD(λ), and Monte Carlo methods...")
results = compare_td_methods()
td0_agent, td_lambda_agent, mc_values, true_values, td0_errors, td_lambda_errors, mc_errors = results

# Visualize results
plt.figure(figsize=(15, 10))

# Value function comparison
plt.subplot(2, 3, 1)
states = range(5)
plt.plot(states, true_values, 'ko-', label='True Values', linewidth=2)
plt.plot(states, td0_agent.V, 'bo-', label='TD(0)', linewidth=2)
plt.plot(states, td_lambda_agent.V, 'ro-', label='TD(λ)', linewidth=2)
plt.plot(states, mc_values, 'go-', label='Monte Carlo', linewidth=2)
plt.xlabel('State')
plt.ylabel('Value')
plt.title('Value Function Estimates')
plt.legend()
plt.grid(True, alpha=0.3)

# Learning curves
plt.subplot(2, 3, 2)
episodes = range(0, 500, 10)
plt.plot(episodes, td0_errors, 'b-', label='TD(0)', linewidth=2)
plt.plot(episodes, td_lambda_errors, 'r-', label='TD(λ)', linewidth=2)
plt.plot(episodes, mc_errors, 'g-', label='Monte Carlo', linewidth=2)
plt.xlabel('Episode')
plt.ylabel('Mean Absolute Error')
plt.title('Learning Curves')
plt.legend()
plt.grid(True, alpha=0.3)

# TD errors over time
plt.subplot(2, 3, 3)
plt.plot(td0_agent.td_errors[:1000], alpha=0.7, label='TD(0)')
plt.plot(td_lambda_agent.td_errors[:1000], alpha=0.7, label='TD(λ)')
plt.xlabel('Update Step')
plt.ylabel('|TD Error|')
plt.title('TD Errors (First 1000 steps)')
plt.legend()
plt.grid(True, alpha=0.3)

# Error distribution
plt.subplot(2, 3, 4)
final_errors_td0 = np.abs(td0_agent.V - true_values)
final_errors_td_lambda = np.abs(td_lambda_agent.V - true_values)
final_errors_mc = np.abs(mc_values - true_values)

x = np.arange(5)
width = 0.25
plt.bar(x - width, final_errors_td0, width, label='TD(0)', alpha=0.7)
plt.bar(x, final_errors_td_lambda, width, label='TD(λ)', alpha=0.7)
plt.bar(x + width, final_errors_mc, width, label='Monte Carlo', alpha=0.7)
plt.xlabel('State')
plt.ylabel('Absolute Error')
plt.title('Final Estimation Errors')
plt.legend()
plt.xticks(x)

# Convergence comparison
plt.subplot(2, 3, 5)
final_mae_td0 = np.mean(final_errors_td0)
final_mae_td_lambda = np.mean(final_errors_td_lambda)
final_mae_mc = np.mean(final_errors_mc)

methods = ['TD(0)', 'TD(λ)', 'Monte Carlo']
final_errors = [final_mae_td0, final_mae_td_lambda, final_mae_mc]
colors = ['blue', 'red', 'green']

plt.bar(methods, final_errors, color=colors, alpha=0.7)
plt.ylabel('Mean Absolute Error')
plt.title('Final Performance Comparison')
plt.xticks(rotation=45)

# Learning efficiency
plt.subplot(2, 3, 6)
# Find episode where each method reaches within 10% of final error
threshold_td0 = final_mae_td0 * 1.1
threshold_td_lambda = final_mae_td_lambda * 1.1
threshold_mc = final_mae_mc * 1.1

conv_td0 = next((i for i, err in enumerate(td0_errors) if err <= threshold_td0), len(td0_errors))
conv_td_lambda = next((i for i, err in enumerate(td_lambda_errors) if err <= threshold_td_lambda), len(td_lambda_errors))
conv_mc = next((i for i, err in enumerate(mc_errors) if err <= threshold_mc), len(mc_errors))

convergence_episodes = [conv_td0 * 10, conv_td_lambda * 10, conv_mc * 10]
plt.bar(methods, convergence_episodes, color=colors, alpha=0.7)
plt.ylabel('Episodes to Converge')
plt.title('Convergence Speed')
plt.xticks(rotation=45)

plt.tight_layout()
plt.show()

print("\nFinal Results:")
print(f"True Values:     {true_values}")
print(f"TD(0) Values:    {td0_agent.V}")
print(f"TD(λ) Values:    {td_lambda_agent.V}")
print(f"MC Values:       {mc_values}")

print(f"\nFinal MAE:")
print(f"TD(0):           {final_mae_td0:.4f}")
print(f"TD(λ):           {final_mae_td_lambda:.4f}")
print(f"Monte Carlo:     {final_mae_mc:.4f}")

# 9. Q-Learning

## Theoretical Foundation

**Q-Learning** is an off-policy TD control algorithm that learns the action-value function $Q(s,a)$ directly.

### Key Innovation:

Q-Learning learns the optimal action-value function regardless of the policy being followed:

$$Q(s_t, a_t) \leftarrow Q(s_t, a_t) + \alpha [r_{t+1} + \gamma \max_{a'} Q(s_{t+1}, a') - Q(s_t, a_t)]$$
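
A single application of this update, worked through numerically, shows where the off-policy character comes from: the target uses the best next-state action, whatever the agent actually does next. The Q-table entries and the transition below are hypothetical.

```python
import numpy as np

Q = np.zeros((3, 2))       # 3 states, 2 actions
Q[1] = [0.5, 2.0]          # hypothetical estimates for the next state
alpha, gamma = 0.1, 0.9

# Hypothetical transition: in state 0 take action 1, get reward 1.0, land in state 1
s, a, r, s_next = 0, 1, 1.0, 1

td_target = r + gamma * np.max(Q[s_next])   # 1.0 + 0.9 * 2.0 = 2.8
Q[s, a] += alpha * (td_target - Q[s, a])    # 0.0 + 0.1 * 2.8 = 0.28

print(f"Updated Q[0, 1] = {Q[s, a]:.2f}")
```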

### Algorithm Properties:

1. **Off-Policy**: Learns about the greedy (optimal) policy while following a different, exploratory behavior policy
2. **Model-Free**: Doesn't require environment model
3. **Convergence**: In the tabular case, converges to $Q^*$ provided every state-action pair keeps being visited and the learning rate decays appropriately
4. **Exploration**: Requires adequate exploration of state-action space

### Q-Learning Algorithm:

```
Initialize Q(s,a) arbitrarily
For each episode:
    Initialize s
    For each step of episode:
        Choose a from s using policy derived from Q (e.g., ε-greedy)
        Take action a, observe r, s'
        Q(s,a) ← Q(s,a) + α[r + γ max_a' Q(s',a') - Q(s,a)]
        s ← s'
    Until s is terminal
```

### Exploration Strategies:

#### ε-Greedy:
$$a = \begin{cases}
\arg\max_a Q(s,a) & \text{with probability } 1-\epsilon \\
\text{random action} & \text{with probability } \epsilon
\end{cases}$$

#### Boltzmann Exploration:
$$P(a|s) = \frac{e^{Q(s,a)/\tau}}{\sum_{a'} e^{Q(s,a')/\tau}}$$

Where $\tau$ is the temperature parameter.
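
A minimal sketch of Boltzmann action selection follows. The helper name `boltzmann_probs` and the Q-values are illustrative assumptions; subtracting the maximum before exponentiating is a standard numerical-stability step that leaves the probabilities unchanged.

```python
import numpy as np

def boltzmann_probs(q_values, tau):
    """Softmax over Q-values with temperature tau."""
    scaled = np.asarray(q_values, dtype=float) / tau
    scaled -= scaled.max()          # stability: avoids overflow in exp
    exp_q = np.exp(scaled)
    return exp_q / exp_q.sum()

q_row = np.array([1.0, 2.0, 3.0])   # hypothetical Q(s, .) for one state

probs_hot = boltzmann_probs(q_row, tau=100.0)   # high temperature: near uniform
probs_cold = boltzmann_probs(q_row, tau=0.1)    # low temperature: near greedy

rng = np.random.default_rng(0)
action = rng.choice(len(q_row), p=probs_cold)   # sample an action
print("tau=100:", probs_hot.round(3), "tau=0.1:", probs_cold.round(3))
```

Unlike ε-greedy, exploration here is weighted by the Q-values, so clearly bad actions are tried less often than near-optimal ones.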

#### Upper Confidence Bound (UCB):
$$a = \arg\max_a \left[Q(s,a) + c\sqrt{\frac{\ln t}{N(s,a)}}\right]$$

Where $N(s,a)$ is the number of times action $a$ was taken in state $s$.
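
The UCB rule can be sketched as below. The function name `ucb_action`, the default `c=2.0`, and the choice to try untried actions first (so that $N(s,a) > 0$ in the bonus) are assumptions made for this illustration.

```python
import numpy as np

def ucb_action(q_row, counts_row, t, c=2.0):
    """Pick argmax_a [Q(s,a) + c * sqrt(ln t / N(s,a))] for one state."""
    q_row = np.asarray(q_row, dtype=float)
    counts_row = np.asarray(counts_row, dtype=float)

    # Try any untried action first so the bonus is well defined
    untried = np.flatnonzero(counts_row == 0)
    if untried.size > 0:
        return int(untried[0])

    bonus = c * np.sqrt(np.log(t) / counts_row)
    return int(np.argmax(q_row + bonus))

# Hypothetical state: action 0 looks slightly better but has been tried 100 times,
# while action 1 has been tried once, so its uncertainty bonus dominates
chosen = ucb_action(q_row=[1.0, 0.9], counts_row=[100, 1], t=101)
print("UCB picks action", chosen)
```

As the counts grow, the bonus shrinks and the rule becomes greedy with respect to $Q$.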

In [None]:
# Q-Learning Implementation

class QLearningAgent:
    """
    Q-Learning agent implementation.
    """
    def __init__(self, n_states, n_actions, alpha=0.1, gamma=0.95,
                 epsilon=1.0, epsilon_decay=0.995, epsilon_min=0.01):
        self.n_states = n_states
        self.n_actions = n_actions
        self.alpha = alpha
        self.gamma = gamma
        self.epsilon = epsilon
        self.epsilon_decay = epsilon_decay
        self.epsilon_min = epsilon_min

        # Initialize Q-table
        self.Q = np.zeros((n_states, n_actions))

        # For tracking
        self.q_errors = []
        self.episode_rewards = []
        self.epsilon_history = []

        # Action counts for exploration analysis
        self.action_counts = np.zeros((n_states, n_actions))

    def get_action(self, state, valid_actions=None):
        """
        Choose action using ε-greedy policy.
        """
        if valid_actions is None:
            valid_actions = list(range(self.n_actions))

        if np.random.random() < self.epsilon:
            # Explore
            action = np.random.choice(valid_actions)
        else:
            # Exploit
            q_values = self.Q[state, valid_actions]
            best_action_idx = np.argmax(q_values)
            action = valid_actions[best_action_idx]

        self.action_counts[state, action] += 1
        return action

    def get_action_greedy(self, state, valid_actions=None):
        """
        Choose action greedily (for evaluation).
        """
        if valid_actions is None:
            valid_actions = list(range(self.n_actions))

        q_values = self.Q[state, valid_actions]
        best_action_idx = np.argmax(q_values)
        return valid_actions[best_action_idx]

    def update(self, state, action, reward, next_state, done):
        """
        Update Q-table using Q-learning rule.
        """
        if done:
            target = reward
        else:
            target = reward + self.gamma * np.max(self.Q[next_state])

        # Q-learning update
        old_q = self.Q[state, action]
        self.Q[state, action] += self.alpha * (target - old_q)

        # Track Q-value changes
        q_error = abs(target - old_q)
        self.q_errors.append(q_error)

        return q_error

    def decay_epsilon(self):
        """
        Decay exploration rate.
        """
        self.epsilon = max(self.epsilon_min, self.epsilon * self.epsilon_decay)
        self.epsilon_history.append(self.epsilon)

    def get_policy(self):
        """
        Extract greedy policy from Q-table.
        """
        return np.argmax(self.Q, axis=1)

    def get_value_function(self):
        """
        Extract value function from Q-table.
        """
        return np.max(self.Q, axis=1)

# Test on Grid World
class SimpleGridWorld:
    """
    Simple grid world for testing Q-learning.
    """
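    def __init__(self, height=4, width=4, goal_states=None,
                 obstacle_states=None, start_state=(0,0)):
        self.height = height
        self.width = width
        # Use None defaults so instances never share a mutable default list
        self.goal_states = [(3, 3)] if goal_states is None else goal_states
        self.obstacle_states = [(1, 1)] if obstacle_states is None else obstacle_states
        self.start_state = start_state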

        self.n_states = height * width
        self.n_actions = 4  # up, right, down, left

        self.action_effects = {
            0: (-1, 0),  # up
            1: (0, 1),   # right
            2: (1, 0),   # down
            3: (0, -1)   # left
        }

        self.reset()

    def state_to_coords(self, state):
        return (state // self.width, state % self.width)

    def coords_to_state(self, row, col):
        return row * self.width + col

    def reset(self):
        self.current_pos = self.start_state
        return self.coords_to_state(*self.current_pos)

    def step(self, action):
        dr, dc = self.action_effects[action]
        new_row = self.current_pos[0] + dr
        new_col = self.current_pos[1] + dc

        # Check boundaries and obstacles
        if (0 <= new_row < self.height and 0 <= new_col < self.width and
            (new_row, new_col) not in self.obstacle_states):
            self.current_pos = (new_row, new_col)

        # Compute reward
        if self.current_pos in self.goal_states:
            reward = 10.0
            done = True
        else:
            reward = -0.1  # Small penalty for each step
            done = False

        next_state = self.coords_to_state(*self.current_pos)
        return next_state, reward, done

    def render(self):
        grid = np.zeros((self.height, self.width))

        # Mark obstacles
        for obs in self.obstacle_states:
            grid[obs] = -1

        # Mark goals
        for goal in self.goal_states:
            grid[goal] = 2

        # Mark current position
        grid[self.current_pos] = 1

        return grid

# Train Q-learning agent
def train_q_learning(env, agent, n_episodes=1000, max_steps=100):
    """
    Train Q-learning agent.
    """
    episode_rewards = []
    episode_lengths = []

    for episode in range(n_episodes):
        state = env.reset()
        episode_reward = 0
        steps = 0

        for step in range(max_steps):
            action = agent.get_action(state)
            next_state, reward, done = env.step(action)

            agent.update(state, action, reward, next_state, done)

            state = next_state
            episode_reward += reward
            steps += 1

            if done:
                break

        agent.decay_epsilon()
        episode_rewards.append(episode_reward)
        episode_lengths.append(steps)

        if episode % 100 == 0:
            avg_reward = np.mean(episode_rewards[-100:])
            print(f"Episode {episode}, Avg Reward: {avg_reward:.2f}, ε: {agent.epsilon:.3f}")

    return episode_rewards, episode_lengths

# Create environment and agent
env = SimpleGridWorld(height=4, width=4, goal_states=[(3,3)],
                     obstacle_states=[(1,1), (2,2)])

agent = QLearningAgent(n_states=16, n_actions=4, alpha=0.1, gamma=0.95,
                      epsilon=1.0, epsilon_decay=0.995)

print("Training Q-Learning Agent...")
episode_rewards, episode_lengths = train_q_learning(env, agent, n_episodes=1000)

print("\nFinal Results:")
print(f"Final ε: {agent.epsilon:.4f}")
print(f"Final 100 episodes avg reward: {np.mean(episode_rewards[-100:]):.2f}")
print(f"Final 100 episodes avg length: {np.mean(episode_lengths[-100:]):.1f}")

In [None]:
# Analyze Q-Learning Results

# Visualize Q-table and learned policy
def visualize_q_learning_results(agent, env):
    fig, axes = plt.subplots(2, 3, figsize=(18, 12))

    # Q-values for each action
    action_names = ['Up', 'Right', 'Down', 'Left']

    for i, action_name in enumerate(action_names):
        ax = axes[0, i] if i < 2 else axes[1, i-2]

        q_values = agent.Q[:, i].reshape(env.height, env.width)

        im = ax.imshow(q_values, cmap='RdBu', interpolation='nearest')
        ax.set_title(f'Q-values for {action_name}')

        # Add text annotations
        for row in range(env.height):
            for col in range(env.width):
                ax.text(col, row, f'{q_values[row, col]:.2f}',
                       ha='center', va='center', fontsize=10)

        plt.colorbar(im, ax=ax)
        ax.set_xticks(range(env.width))
        ax.set_yticks(range(env.height))

    # Policy visualization
    ax = axes[1, 2]
    policy = agent.get_policy().reshape(env.height, env.width)
    value_function = agent.get_value_function().reshape(env.height, env.width)

    im = ax.imshow(value_function, cmap='viridis', interpolation='nearest')

    # Add policy arrows
    arrow_symbols = ['↑', '→', '↓', '←']
    for row in range(env.height):
        for col in range(env.width):
            if (row, col) in env.obstacle_states:
                symbol = '■'
                color = 'red'
            elif (row, col) in env.goal_states:
                symbol = 'G'
                color = 'gold'
            else:
                symbol = arrow_symbols[policy[row, col]]
                color = 'white'

            ax.text(col, row, symbol, ha='center', va='center',
                   fontsize=16, color=color, fontweight='bold')

    ax.set_title('Learned Policy & Value Function')
    plt.colorbar(im, ax=ax)
    ax.set_xticks(range(env.width))
    ax.set_yticks(range(env.height))

    axes[0, 2].axis('off')  # This panel is unused; hide its empty axes
    plt.tight_layout()
    plt.show()

# Visualize training progress
def plot_training_progress(agent, episode_rewards, episode_lengths):
    fig, axes = plt.subplots(2, 3, figsize=(18, 10))

    # Episode rewards
    ax = axes[0, 0]
    window_size = 50
    moving_avg = pd.Series(episode_rewards).rolling(window_size).mean()
    ax.plot(episode_rewards, alpha=0.3, color='blue')
    ax.plot(moving_avg, color='red', linewidth=2)
    ax.set_xlabel('Episode')
    ax.set_ylabel('Total Reward')
    ax.set_title('Episode Rewards')
    ax.grid(True, alpha=0.3)

    # Episode lengths
    ax = axes[0, 1]
    moving_avg_length = pd.Series(episode_lengths).rolling(window_size).mean()
    ax.plot(episode_lengths, alpha=0.3, color='green')
    ax.plot(moving_avg_length, color='red', linewidth=2)
    ax.set_xlabel('Episode')
    ax.set_ylabel('Episode Length')
    ax.set_title('Episode Lengths')
    ax.grid(True, alpha=0.3)

    # Epsilon decay
    ax = axes[0, 2]
    ax.plot(agent.epsilon_history)
    ax.set_xlabel('Episode')
    ax.set_ylabel('Epsilon')
    ax.set_title('Exploration Rate Decay')
    ax.grid(True, alpha=0.3)

    # Q-value changes
    ax = axes[1, 0]
    q_error_ma = pd.Series(agent.q_errors).rolling(1000).mean()
    ax.plot(agent.q_errors[:5000], alpha=0.3, color='orange')
    ax.plot(q_error_ma[:5000], color='red', linewidth=2)
    ax.set_xlabel('Update Step')
    ax.set_ylabel('Q-value Change')
    ax.set_title('Q-value Updates (First 5000)')
    ax.grid(True, alpha=0.3)

    # Action distribution
    ax = axes[1, 1]
    total_actions = np.sum(agent.action_counts, axis=0)
    action_names = ['Up', 'Right', 'Down', 'Left']
    ax.bar(action_names, total_actions)
    ax.set_ylabel('Action Count')
    ax.set_title('Total Action Distribution')
    ax.tick_params(axis='x', rotation=45)

    # State visitation
    ax = axes[1, 2]
    # Note: the grid dimensions come from the global `env` defined above
    state_visits = np.sum(agent.action_counts, axis=1).reshape(env.height, env.width)
    im = ax.imshow(state_visits, cmap='Blues', interpolation='nearest')

    for row in range(env.height):
        for col in range(env.width):
            ax.text(col, row, f'{int(state_visits[row, col])}',
                   ha='center', va='center', fontsize=10)

    ax.set_title('State Visitation Counts')
    plt.colorbar(im, ax=ax)
    ax.set_xticks(range(env.width))
    ax.set_yticks(range(env.height))

    plt.tight_layout()
    plt.show()

# Test learned policy
def test_learned_policy(agent, env, n_episodes=10):
    """
    Test the learned policy without exploration.
    """
    test_rewards = []
    test_lengths = []

    for episode in range(n_episodes):
        state = env.reset()
        episode_reward = 0
        steps = 0
        path = [env.state_to_coords(state)]

        for step in range(100):  # Max steps
            action = agent.get_action_greedy(state)
            next_state, reward, done = env.step(action)

            path.append(env.state_to_coords(next_state))
            state = next_state
            episode_reward += reward
            steps += 1

            if done:
                break

        test_rewards.append(episode_reward)
        test_lengths.append(steps)

        if episode == 0:  # Show first episode path
            print(f"Sample path: {path}")

    print(f"\nTest Results ({n_episodes} episodes):")
    print(f"Average reward: {np.mean(test_rewards):.2f} ± {np.std(test_rewards):.2f}")
    print(f"Average length: {np.mean(test_lengths):.1f} ± {np.std(test_lengths):.1f}")
    print(f"Success rate: {np.mean([r > 5 for r in test_rewards]) * 100:.1f}%")

    return test_rewards, test_lengths

# Run analysis
print("\nAnalyzing Q-Learning Results...")
visualize_q_learning_results(agent, env)
plot_training_progress(agent, episode_rewards, episode_lengths)

# Test final policy
print("\nTesting Learned Policy...")
test_rewards, test_lengths = test_learned_policy(agent, env, n_episodes=20)

# Compare with random policy
print("\nComparing with Random Policy...")
random_rewards = []
for _ in range(20):
    state = env.reset()
    episode_reward = 0
    for step in range(100):
        action = np.random.randint(4)
        next_state, reward, done = env.step(action)
        state = next_state
        episode_reward += reward
        if done:
            break
    random_rewards.append(episode_reward)

print(f"Random policy average reward: {np.mean(random_rewards):.2f} ± {np.std(random_rewards):.2f}")
print(f"Q-learning improvement: {np.mean(test_rewards) - np.mean(random_rewards):.2f}")

# 10. Deep Q-Learning (DQN)

## Theoretical Foundation

**Deep Q-Networks (DQN)** combine Q-learning with deep neural networks to handle high-dimensional state spaces.

### Key Innovations:

#### 1. Function Approximation:
Instead of a Q-table, use a neural network:
$$Q(s,a;θ) ≈ Q^*(s,a)$$

#### 2. Experience Replay:
Store experiences $(s_t, a_t, r_t, s_{t+1})$ in a replay buffer and sample randomly for training.
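
A minimal sketch of such a buffer, assuming a fixed capacity and uniform sampling. The class name `ReplayBuffer` and the extra `done` flag are illustrative choices, not part of the tuple above.

```python
import random
from collections import deque

class ReplayBuffer:
    """Fixed-size FIFO buffer with uniform random sampling (illustrative)."""

    def __init__(self, capacity):
        # A bounded deque silently drops the oldest experience when full
        self.buffer = deque(maxlen=capacity)

    def add(self, state, action, reward, next_state, done):
        self.buffer.append((state, action, reward, next_state, done))

    def sample(self, batch_size):
        # Uniform sampling without replacement decorrelates consecutive steps
        return random.sample(list(self.buffer), batch_size)

    def __len__(self):
        return len(self.buffer)

buffer = ReplayBuffer(capacity=3)
for t in range(5):
    buffer.add(t, 0, 1.0, t + 1, False)  # states are just integers here

batch = buffer.sample(2)
print(len(buffer), batch)
```

Only the three most recent experiences survive, so every sampled state is one of 2, 3, or 4.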

#### 3. Target Network:
Use a separate target network $Q(s,a;θ^-)$ for computing targets:
$$y_t = r_t + \gamma \max_{a'} Q(s_{t+1}, a'; θ^-)$$

### DQN Loss Function:
$$L(θ) = E_{(s,a,r,s') ∼ D}[(y - Q(s,a;θ))^2]$$

Where $y = r + \gamma \max_{a'} Q(s',a';θ^-)$ for non-terminal transitions, and $y = r$ when $s'$ is terminal.
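
The target and loss can be sketched in NumPy as follows. This is a hedged illustration, not the chapter's implementation: the function names are hypothetical, and the `dones` mask is an assumption that mirrors the `done` flag stored by the agent code in this chapter, dropping the bootstrap term at terminal states.

```python
import numpy as np

def dqn_targets(rewards, next_q_target, dones, gamma=0.99):
    """y = r + gamma * max_a' Q(s', a'; theta^-), with y = r at terminal states."""
    max_next_q = next_q_target.max(axis=1)
    return rewards + gamma * max_next_q * (1.0 - dones)

def dqn_loss(q_taken, targets):
    """Mean squared error between Q(s, a; theta) and the targets."""
    return np.mean((targets - q_taken) ** 2)

rewards = np.array([1.0, 1.0])
next_q_target = np.array([[0.5, 2.0],   # target-network outputs for s'
                          [3.0, 1.0]])
dones = np.array([0.0, 1.0])            # the second transition ends the episode
q_taken = np.array([2.0, 1.5])          # Q(s, a; theta) for the actions taken

targets = dqn_targets(rewards, next_q_target, dones, gamma=0.9)
loss = dqn_loss(q_taken, targets)
print(targets, loss)
```

The first target bootstraps from the best next action (1 + 0.9 × 2), while the second is just the reward because the episode ended.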

### Algorithm:
```
Initialize replay buffer D
Initialize Q-network with random weights θ
Initialize target Q-network with weights θ^- = θ

For episode = 1 to M:
    Initialize state s_1
    For t = 1 to T:
        Select action a_t using ε-greedy policy
        Execute action a_t, observe r_t and s_{t+1}
        Store (s_t, a_t, r_t, s_{t+1}) in D
        Sample random minibatch from D
        Perform gradient descent on DQN loss
        Every C steps: θ^- ← θ
```
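
The ε-greedy selection step in the loop above can be sketched as follows. The helper name `epsilon_greedy` and the use of a NumPy `Generator` are assumptions made for illustration.

```python
import numpy as np

def epsilon_greedy(q_values, epsilon, rng):
    """With probability epsilon pick a random action, otherwise the greedy one."""
    if rng.random() < epsilon:
        return int(rng.integers(len(q_values)))  # explore
    return int(np.argmax(q_values))              # exploit

rng = np.random.default_rng(42)
q_values = np.array([0.1, 0.9, 0.3])

greedy_action = epsilon_greedy(q_values, epsilon=0.0, rng=rng)
random_actions = [epsilon_greedy(q_values, epsilon=1.0, rng=rng) for _ in range(100)]
print(greedy_action, sorted(set(random_actions)))
```

With `epsilon=0.0` the agent always exploits; with `epsilon=1.0` every action is drawn uniformly at random.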

### Advantages:
- Handles continuous/high-dimensional state spaces
- Learns efficient representations
- Can generalize to unseen states

### Challenges:
- Training instability
- Hyperparameter sensitivity
- Poor sample efficiency

In [None]:
# Deep Q-Network Implementation

from collections import deque
import tensorflow as tf
from tensorflow import keras

class DQNAgent:
    """
    Deep Q-Network agent implementation.
    """
    def __init__(self, state_size, action_size, lr=0.001, gamma=0.99,
                 epsilon=1.0, epsilon_decay=0.995, epsilon_min=0.01,
                 buffer_size=10000, batch_size=32, target_update_freq=100):

        self.state_size = state_size
        self.action_size = action_size
        self.lr = lr
        self.gamma = gamma
        self.epsilon = epsilon
        self.epsilon_decay = epsilon_decay
        self.epsilon_min = epsilon_min
        self.batch_size = batch_size
        self.target_update_freq = target_update_freq

        # Experience replay buffer
        self.memory = deque(maxlen=buffer_size)

        # Neural networks
        self.q_network = self._build_model()
        self.target_network = self._build_model()
        self.update_target_network()

        # Training tracking
        self.losses = []
        self.episode_rewards = []
        self.epsilon_history = []
        self.training_step = 0

    def _build_model(self):
        """
        Build the deep Q-network.
        """
        model = keras.Sequential([
            keras.layers.Dense(64, activation='relu', input_shape=(self.state_size,)),
            keras.layers.Dense(64, activation='relu'),
            keras.layers.Dense(32, activation='relu'),
            keras.layers.Dense(self.action_size, activation='linear')
        ])

        model.compile(optimizer=keras.optimizers.Adam(learning_rate=self.lr),
                     loss='mse')
        return model

    def update_target_network(self):
        """
        Copy weights from main network to target network.
        """
        self.target_network.set_weights(self.q_network.get_weights())

    def remember(self, state, action, reward, next_state, done):
        """
        Store experience in replay buffer.
        """
        self.memory.append((state, action, reward, next_state, done))

    def act(self, state):
        """
        Choose action using ε-greedy policy.
        """
        if np.random.random() <= self.epsilon:
            return np.random.choice(self.action_size)

        state = np.array([state])
        q_values = self.q_network.predict(state, verbose=0)
        return np.argmax(q_values[0])

    def act_greedy(self, state):
        """
        Choose action greedily (for evaluation).
        """
        state = np.array([state])
        q_values = self.q_network.predict(state, verbose=0)
        return np.argmax(q_values[0])

    def replay(self):
        """
        Train the model on a batch of experiences.
        """
        if len(self.memory) < self.batch_size:
            return

        # Sample random batch
        batch = random.sample(self.memory, self.batch_size)
        states = np.array([e[0] for e in batch])
        actions = np.array([e[1] for e in batch])
        rewards = np.array([e[2] for e in batch])
        next_states = np.array([e[3] for e in batch])
        dones = np.array([e[4] for e in batch])

        # Compute current Q-values
        current_q_values = self.q_network.predict(states, verbose=0)

        # Compute next Q-values using target network
        next_q_values = self.target_network.predict(next_states, verbose=0)

        # Compute targets
        targets = current_q_values.copy()
        for i in range(self.batch_size):
            if dones[i]:
                targets[i][actions[i]] = rewards[i]
            else:
                targets[i][actions[i]] = rewards[i] + self.gamma * np.max(next_q_values[i])

        # Train the model
        history = self.q_network.fit(states, targets, epochs=1, verbose=0)
        loss = history.history['loss'][0]
        self.losses.append(loss)

        # Update target network periodically
        self.training_step += 1
        if self.training_step % self.target_update_freq == 0:
            self.update_target_network()

        return loss

    def decay_epsilon(self):
        """
        Decay exploration rate.
        """
        if self.epsilon > self.epsilon_min:
            self.epsilon *= self.epsilon_decay
        self.epsilon_history.append(self.epsilon)

# Train DQN on CartPole
def train_dqn_cartpole(episodes=1000):
    """
    Train DQN agent on CartPole environment.
    """
    # Create environment
    env = gym.make('CartPole-v1')
    state_size = env.observation_space.shape[0]
    action_size = env.action_space.n

    # Create DQN agent
    agent = DQNAgent(state_size=state_size, action_size=action_size,
                    lr=0.001, gamma=0.99, epsilon=1.0, epsilon_decay=0.995,
                    epsilon_min=0.01, buffer_size=10000, batch_size=32)

    scores = []
    scores_window = deque(maxlen=100)

    for episode in range(episodes):
        state = env.reset()
        score = 0

        for step in range(500):  # Max steps per episode
            action = agent.act(state)
            next_state, reward, done, _ = env.step(action)

            # Store experience
            agent.remember(state, action, reward, next_state, done)

            state = next_state
            score += reward

            if done:
                break

        # Train the agent (one minibatch update per episode)
        if len(agent.memory) > agent.batch_size:
            agent.replay()

        # Update exploration rate
        agent.decay_epsilon()

        scores.append(score)
        scores_window.append(score)
        agent.episode_rewards.append(score)

        # Print progress
        if episode % 100 == 0:
            mean_score = np.mean(scores_window)
            print(f"Episode {episode}, Average Score: {mean_score:.2f}, "
                  f"ε: {agent.epsilon:.3f}, Buffer Size: {len(agent.memory)}")

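            # Check if solved (only evaluated every 100 episodes). Note: 195 is the
            # CartPole-v0 threshold; Gym registers CartPole-v1 with a threshold of 475.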
            if mean_score >= 195.0:
                print(f"\nEnvironment solved in {episode} episodes!")
                print(f"Average Score: {mean_score:.2f}")
                break

    env.close()
    return agent, scores

# Train the DQN agent
print("Training DQN Agent on CartPole...")
dqn_agent, dqn_scores = train_dqn_cartpole(episodes=1000)

print(f"\nTraining completed!")
print(f"Final ε: {dqn_agent.epsilon:.4f}")
print(f"Buffer size: {len(dqn_agent.memory)}")
print(f"Training steps: {dqn_agent.training_step}")

In [None]:
# Analyze DQN Training Results

def analyze_dqn_performance(agent, scores):
    """
    Comprehensive analysis of DQN training.
    """
    fig, axes = plt.subplots(2, 3, figsize=(18, 12))

    # Training progress
    ax = axes[0, 0]
    window_size = 100
    moving_avg = pd.Series(scores).rolling(window_size).mean()
    ax.plot(scores, alpha=0.3, color='blue', label='Episode Score')
    ax.plot(moving_avg, color='red', linewidth=2, label=f'{window_size}-Episode Average')
    ax.axhline(y=195, color='green', linestyle='--', label='Solved Threshold')
    ax.set_xlabel('Episode')
    ax.set_ylabel('Score')
    ax.set_title('DQN Training Progress')
    ax.legend()
    ax.grid(True, alpha=0.3)

    # Epsilon decay
    ax = axes[0, 1]
    ax.plot(agent.epsilon_history)
    ax.set_xlabel('Episode')
    ax.set_ylabel('Epsilon')
    ax.set_title('Exploration Rate Decay')
    ax.grid(True, alpha=0.3)

    # Loss progression
    ax = axes[0, 2]
    if agent.losses:
        loss_ma = pd.Series(agent.losses).rolling(100).mean()
        ax.plot(agent.losses, alpha=0.3, color='orange', label='Loss')
        ax.plot(loss_ma, color='red', linewidth=2, label='100-Step Average')
        ax.set_xlabel('Training Step')
        ax.set_ylabel('Loss')
        ax.set_title('Training Loss')
        ax.legend()
        ax.grid(True, alpha=0.3)

    # Score distribution over time
    ax = axes[1, 0]
    early_scores = scores[:len(scores)//3]
    middle_scores = scores[len(scores)//3:2*len(scores)//3]
    late_scores = scores[2*len(scores)//3:]

    ax.hist(early_scores, bins=20, alpha=0.5, label='Early Training', color='red')
    ax.hist(middle_scores, bins=20, alpha=0.5, label='Middle Training', color='orange')
    ax.hist(late_scores, bins=20, alpha=0.5, label='Late Training', color='blue')
    ax.set_xlabel('Score')
    ax.set_ylabel('Frequency')
    ax.set_title('Score Distribution Evolution')
    ax.legend()

    # Learning curve segments
    ax = axes[1, 1]
    segment_size = 50
    segments = len(scores) // segment_size
    segment_means = []
    segment_stds = []

    for i in range(segments):
        start_idx = i * segment_size
        end_idx = (i + 1) * segment_size
        segment_scores = scores[start_idx:end_idx]
        segment_means.append(np.mean(segment_scores))
        segment_stds.append(np.std(segment_scores))

    segment_episodes = [(i + 0.5) * segment_size for i in range(segments)]
    ax.errorbar(segment_episodes, segment_means, yerr=segment_stds,
                fmt='o-', capsize=5, capthick=2)
    ax.axhline(y=195, color='green', linestyle='--', label='Solved Threshold')
    ax.set_xlabel('Episode')
    ax.set_ylabel('Mean Score ± Std')
    ax.set_title('Learning Curve with Variance')
    ax.legend()
    ax.grid(True, alpha=0.3)

    # Performance comparison
    ax = axes[1, 2]
    final_performance = np.mean(scores[-100:])
    random_performance = 20  # Approximate random performance

    methods = ['Random', 'DQN']
    performances = [random_performance, final_performance]
    colors = ['red', 'blue']

    bars = ax.bar(methods, performances, color=colors, alpha=0.7)
    ax.axhline(y=195, color='green', linestyle='--', label='Solved Threshold')
    ax.set_ylabel('Average Score')
    ax.set_title('Final Performance Comparison')
    ax.legend()

    # Add value labels on bars
    for bar, value in zip(bars, performances):
        ax.text(bar.get_x() + bar.get_width()/2, bar.get_height() + 5,
               f'{value:.1f}', ha='center', va='bottom')

    plt.tight_layout()
    plt.show()

    # Print summary statistics
    print("\nTraining Summary:")
    print(f"Total Episodes: {len(scores)}")
    print(f"Final 100 Episode Average: {np.mean(scores[-100:]):.2f} ± {np.std(scores[-100:]):.2f}")
    print(f"Best Episode Score: {np.max(scores)}")
    print(f"Episodes to solve (≥195): {next((i for i, score in enumerate(pd.Series(scores).rolling(100).mean()) if score >= 195), 'Not solved')}")

    if agent.losses:
        print(f"Final Training Loss: {np.mean(agent.losses[-100:]):.4f}")

    print(f"Memory Buffer Utilization: {len(agent.memory)}/{agent.memory.maxlen} ({len(agent.memory)/agent.memory.maxlen*100:.1f}%)")

# Test DQN agent performance
def test_dqn_agent(agent, n_episodes=20):
    """
    Test the trained DQN agent.
    """
    env = gym.make('CartPole-v1')
    test_scores = []

    for episode in range(n_episodes):
        state = env.reset()
        score = 0

        for step in range(500):
            action = agent.act_greedy(state)
            state, reward, done, _ = env.step(action)
            score += reward

            if done:
                break

        test_scores.append(score)

    env.close()

    print(f"\nTest Results ({n_episodes} episodes):")
    print(f"Average Score: {np.mean(test_scores):.2f} ± {np.std(test_scores):.2f}")
    print(f"Min Score: {np.min(test_scores)}")
    print(f"Max Score: {np.max(test_scores)}")
    print(f"Success Rate (≥195): {np.mean([score >= 195 for score in test_scores]) * 100:.1f}%")

    return test_scores

# Analyze Q-network predictions
def analyze_q_network(agent):
    """
    Analyze what the Q-network has learned.
    """
    # Generate test states
    n_samples = 1000
    test_states = []

    # Sample random states
    for _ in range(n_samples):
        cart_pos = np.random.uniform(-2.4, 2.4)
        cart_vel = np.random.uniform(-3.0, 3.0)
        pole_angle = np.random.uniform(-0.2, 0.2)
        pole_vel = np.random.uniform(-3.0, 3.0)
        test_states.append([cart_pos, cart_vel, pole_angle, pole_vel])

    test_states = np.array(test_states)

    # Get Q-values
    q_values = agent.q_network.predict(test_states, verbose=0)

    # Analyze Q-values
    fig, axes = plt.subplots(2, 2, figsize=(12, 10))

    # Q-values vs cart position
    ax = axes[0, 0]
    ax.scatter(test_states[:, 0], q_values[:, 0], alpha=0.5, label='Left Action', s=10)
    ax.scatter(test_states[:, 0], q_values[:, 1], alpha=0.5, label='Right Action', s=10)
    ax.set_xlabel('Cart Position')
    ax.set_ylabel('Q-Value')
    ax.set_title('Q-Values vs Cart Position')
    ax.legend()
    ax.grid(True, alpha=0.3)

    # Q-values vs pole angle
    ax = axes[0, 1]
    ax.scatter(test_states[:, 2], q_values[:, 0], alpha=0.5, label='Left Action', s=10)
    ax.scatter(test_states[:, 2], q_values[:, 1], alpha=0.5, label='Right Action', s=10)
    ax.set_xlabel('Pole Angle (rad)')
    ax.set_ylabel('Q-Value')
    ax.set_title('Q-Values vs Pole Angle')
    ax.legend()
    ax.grid(True, alpha=0.3)

    # Action preferences
    ax = axes[1, 0]
    preferred_actions = np.argmax(q_values, axis=1)
    # minlength guarantees one count per action even if an action is never preferred
    action_counts = np.bincount(preferred_actions, minlength=2)
    ax.bar(['Left (0)', 'Right (1)'], action_counts)
    ax.set_ylabel('Frequency')
    ax.set_title('Preferred Action Distribution')

    # Q-value distribution
    ax = axes[1, 1]
    ax.hist(q_values[:, 0], bins=30, alpha=0.5, label='Left Action Q-Values')
    ax.hist(q_values[:, 1], bins=30, alpha=0.5, label='Right Action Q-Values')
    ax.set_xlabel('Q-Value')
    ax.set_ylabel('Frequency')
    ax.set_title('Q-Value Distribution')
    ax.legend()

    plt.tight_layout()
    plt.show()

    # Print statistics
    print("\nQ-Network Analysis:")
    print(f"Mean Q-Value (Left): {np.mean(q_values[:, 0]):.3f} ± {np.std(q_values[:, 0]):.3f}")
    print(f"Mean Q-Value (Right): {np.mean(q_values[:, 1]):.3f} ± {np.std(q_values[:, 1]):.3f}")
    print(f"Action Preference: {action_counts[0]/(action_counts[0]+action_counts[1])*100:.1f}% Left, {action_counts[1]/(action_counts[0]+action_counts[1])*100:.1f}% Right")

# Run all analyses
print("\nAnalyzing DQN Performance...")
analyze_dqn_performance(dqn_agent, dqn_scores)

print("\nTesting DQN Agent...")
test_scores = test_dqn_agent(dqn_agent, n_episodes=50)

print("\nAnalyzing Q-Network...")
analyze_q_network(dqn_agent)

# 11. DQN Variants and Improvements

## Theoretical Foundation

Several important improvements have been made to the basic DQN algorithm:

### 1. Double DQN (DDQN)

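**Problem**: DQN tends to overestimate Q-values because the max operator uses the same (target) network both to select and to evaluate the next action, so estimation noise is biased upward.

**Solution**: Use the main network to select the action and the target network to evaluate it: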
$$y = r + \gamma Q(s', \arg\max_{a'} Q(s', a'; θ); θ^-)$$
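
A small NumPy sketch of this target, next to the standard DQN target for comparison. The function name and the `dones` mask are assumptions for illustration.

```python
import numpy as np

def double_dqn_targets(rewards, next_q_online, next_q_target, dones, gamma=0.99):
    # Selection: the online (main) network picks the best next action
    best_actions = next_q_online.argmax(axis=1)
    # Evaluation: the target network scores that chosen action
    evaluated = next_q_target[np.arange(len(rewards)), best_actions]
    return rewards + gamma * evaluated * (1.0 - dones)

rewards = np.array([0.0])
next_q_online = np.array([[1.0, 2.0]])  # online network prefers action 1
next_q_target = np.array([[5.0, 3.0]])  # target network's own max is action 0
dones = np.array([0.0])

ddqn_target = double_dqn_targets(rewards, next_q_online, next_q_target, dones, gamma=1.0)
dqn_target = rewards + 1.0 * next_q_target.max(axis=1)
print(ddqn_target, dqn_target)
```

When the two networks disagree about the best action, the Double DQN target (3.0) is lower than the plain max over the target network (5.0), which is how the overestimation is damped.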

### 2. Dueling DQN

**Innovation**: Separate value and advantage estimation:
$$Q(s,a) = V(s) + A(s,a) - \frac{1}{|A|}\sum_{a'} A(s,a')$$

Where:
- $V(s)$: State value function
- $A(s,a)$: Advantage function
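
The aggregation step can be checked numerically with a short sketch. The helper name `dueling_q` is hypothetical.

```python
import numpy as np

def dueling_q(value, advantage):
    """Q(s,a) = V(s) + A(s,a) - mean over a' of A(s,a')."""
    return value + advantage - advantage.mean(axis=1, keepdims=True)

value = np.array([[2.0]])            # V(s), shape (batch, 1)
advantage = np.array([[1.0, 3.0]])   # A(s, a), shape (batch, n_actions)

q = dueling_q(value, advantage)
print(q, q.mean(axis=1))
```

Subtracting the mean advantage makes the decomposition identifiable: the Q-values average back to $V(s)$.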

### 3. Prioritized Experience Replay (PER)

**Innovation**: Sample experiences based on their TD error:
$$P(i) = \frac{p_i^α}{\sum_k p_k^α}$$

Where $p_i = |\delta_i| + ε$ and $α$ controls prioritization strength.

**Importance Sampling**: Correct for bias with weights:
$$w_i = \left(\frac{1}{N} \cdot \frac{1}{P(i)}\right)^β$$
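
A hedged NumPy sketch of both formulas. The function names are illustrative, and dividing the weights by their maximum is a common stabilising convention that is assumed here, not part of the formula above.

```python
import numpy as np

def per_probabilities(td_errors, alpha=0.6, eps=1e-6):
    """P(i) = p_i^alpha / sum_k p_k^alpha, with p_i = |delta_i| + eps."""
    priorities = np.abs(td_errors) + eps
    scaled = priorities ** alpha
    return scaled / scaled.sum()

def importance_weights(probs, beta=0.4):
    """w_i = (1/N * 1/P(i))^beta, rescaled so the largest weight is 1."""
    n = len(probs)
    weights = (n * probs) ** (-beta)
    return weights / weights.max()

td_errors = np.array([0.1, -2.0, 0.5])
probs = per_probabilities(td_errors)
weights = importance_weights(probs)
print(probs, weights)
```

The transition with the largest absolute TD error is sampled most often and receives the smallest importance weight, which compensates for its over-representation.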

### 4. Multi-Step Learning

**Innovation**: Use n-step returns instead of 1-step:
$$y = \sum_{k=0}^{n-1} \gamma^k r_{t+k} + \gamma^n \max_{a'} Q(s_{t+n}, a'; θ^-)$$
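
A worked example of the n-step target, assuming the n rewards collected from step t onward are already available as a list. The function name is hypothetical.

```python
def n_step_target(rewards, bootstrap_value, gamma=0.99):
    """rewards: the n rewards collected starting at step t.
    bootstrap_value: max over a' of Q(s_{t+n}, a'; theta^-)."""
    n = len(rewards)
    discounted = sum(gamma ** k * r for k, r in enumerate(rewards))
    return discounted + gamma ** n * bootstrap_value

target = n_step_target([1.0, 1.0, 1.0], bootstrap_value=10.0, gamma=0.5)
print(target)
```

With γ = 0.5 the three rewards contribute 1 + 0.5 + 0.25 and the bootstrap term contributes 0.125 × 10, giving a target of 3.0.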

### 5. Noisy Networks

**Innovation**: Add learnable noise to network weights for exploration:
$$y = (μ^w + σ^w ⊙ ε^w)x + μ^b + σ^b ⊙ ε^b$$
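
A minimal forward-pass sketch of a noisy linear layer. It assumes independent Gaussian noise and fixed arrays for the $μ$ and $σ$ parameters; in a real noisy network these parameters are learned by gradient descent, which is omitted here.

```python
import numpy as np

def noisy_linear(x, mu_w, sigma_w, mu_b, sigma_b, rng):
    """y = (mu_w + sigma_w * eps_w) x + mu_b + sigma_b * eps_b."""
    eps_w = rng.standard_normal(mu_w.shape)  # fresh noise on every call
    eps_b = rng.standard_normal(mu_b.shape)
    weights = mu_w + sigma_w * eps_w
    bias = mu_b + sigma_b * eps_b
    return weights @ x + bias

rng = np.random.default_rng(0)
x = np.array([1.0, 2.0])
mu_w = np.array([[0.5, -0.5],
                 [1.0, 0.0]])
mu_b = np.zeros(2)

# With sigma = 0 the layer reduces to an ordinary linear layer
y_plain = noisy_linear(x, mu_w, np.zeros_like(mu_w), mu_b, np.zeros_like(mu_b), rng)
# With sigma > 0 the output is perturbed, which drives exploration
y_noisy = noisy_linear(x, mu_w, np.full_like(mu_w, 0.1), mu_b, np.full_like(mu_b, 0.1), rng)
print(y_plain, y_noisy)
```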

### 6. Rainbow DQN

**Combination**: Integrates multiple improvements:
- Double DQN
- Dueling Networks
- Prioritized Experience Replay
- Multi-step Learning
- Noisy Networks
- Distributional RL

In [None]:
# Implementation of DQN Variants

class DoubleDQNAgent(DQNAgent):
    """
    Double DQN implementation.
    """
    def replay(self):
        """
        Train using Double DQN update rule.
        """
        if len(self.memory) < self.batch_size:
            return

        # Sample random batch
        batch = random.sample(self.memory, self.batch_size)
        states = np.array([e[0] for e in batch])
        actions = np.array([e[1] for e in batch])
        rewards = np.array([e[2] for e in batch])
        next_states = np.array([e[3] for e in batch])
        dones = np.array([e[4] for e in batch])

        # Compute current Q-values
        current_q_values = self.q_network.predict(states, verbose=0)

        # Double DQN: Use main network to select actions
        next_q_values_main = self.q_network.predict(next_states, verbose=0)
        next_actions = np.argmax(next_q_values_main, axis=1)

        # Use target network to evaluate selected actions
        next_q_values_target = self.target_network.predict(next_states, verbose=0)

        # Compute targets
        targets = current_q_values.copy()
        for i in range(self.batch_size):
            if dones[i]:
                targets[i][actions[i]] = rewards[i]
            else:
                # Double DQN target
                targets[i][actions[i]] = rewards[i] + self.gamma * next_q_values_target[i][next_actions[i]]

        # Train the model
        history = self.q_network.fit(states, targets, epochs=1, verbose=0)
        loss = history.history['loss'][0]
        self.losses.append(loss)

        # Update target network periodically
        self.training_step += 1
        if self.training_step % self.target_update_freq == 0:
            self.update_target_network()

        return loss

class DuelingDQNAgent(DQNAgent):
    """
    Dueling DQN implementation.
    """
    def _build_model(self):
        """
        Build dueling network architecture.
        """
        # Input layer
        inputs = keras.layers.Input(shape=(self.state_size,))

        # Shared layers
        shared = keras.layers.Dense(64, activation='relu')(inputs)
        shared = keras.layers.Dense(64, activation='relu')(shared)

        # Value stream
        value_stream = keras.layers.Dense(32, activation='relu')(shared)
        value = keras.layers.Dense(1, activation='linear')(value_stream)

        # Advantage stream
        advantage_stream = keras.layers.Dense(32, activation='relu')(shared)
        advantage = keras.layers.Dense(self.action_size, activation='linear')(advantage_stream)

        # Combine value and advantage
        # Q(s,a) = V(s) + A(s,a) - mean(A(s,a))
        mean_advantage = keras.layers.Lambda(
            lambda x: tf.reduce_mean(x, axis=1, keepdims=True)
        )(advantage)

        q_values = keras.layers.Add()([value, advantage])
        q_values = keras.layers.Subtract()([q_values, mean_advantage])

        model = keras.Model(inputs=inputs, outputs=q_values)
        model.compile(optimizer=keras.optimizers.Adam(learning_rate=self.lr),
                     loss='mse')

        return model

class PrioritizedReplayBuffer:
    """
    Prioritized Experience Replay buffer.
    """
    def __init__(self, capacity, alpha=0.6, beta=0.4, beta_increment=0.001):
        self.capacity = capacity
        self.alpha = alpha
        self.beta = beta
        self.beta_increment = beta_increment

        self.buffer = []
        self.priorities = np.zeros(capacity, dtype=np.float32)
        self.position = 0
        self.size = 0

    def add(self, state, action, reward, next_state, done):
        """Add experience with maximum priority."""
        max_priority = np.max(self.priorities) if self.size > 0 else 1.0

        if self.size < self.capacity:
            self.buffer.append((state, action, reward, next_state, done))
            self.size += 1
        else:
            self.buffer[self.position] = (state, action, reward, next_state, done)

        self.priorities[self.position] = max_priority
        self.position = (self.position + 1) % self.capacity

    def sample(self, batch_size):
        """Sample batch with prioritized sampling."""
        if self.size < batch_size:
            return None, None, None

        # Compute sampling probabilities
        priorities = self.priorities[:self.size]
        probabilities = priorities ** self.alpha
        probabilities /= probabilities.sum()

        # Sample indices
        indices = np.random.choice(self.size, batch_size, p=probabilities)

        # Sample experiences
        experiences = [self.buffer[i] for i in indices]

        # Compute importance sampling weights
        weights = (self.size * probabilities[indices]) ** (-self.beta)
        weights /= weights.max()  # Normalize

        # Update beta
        self.beta = min(1.0, self.beta + self.beta_increment)

        return experiences, indices, weights

    def update_priorities(self, indices, priorities):
        """Update priorities for sampled experiences."""
        for i, priority in zip(indices, priorities):
            # p_i = |δ_i| + ε: use the magnitude so priorities stay positive
            self.priorities[i] = abs(priority) + 1e-6

# Compare DQN variants
def compare_dqn_variants(episodes=500):
    """
    Compare different DQN variants.
    """
    env = gym.make('CartPole-v1')
    state_size = env.observation_space.shape[0]
    action_size = env.action_space.n

    # Create different agents
    agents = {
        'DQN': DQNAgent(state_size, action_size, lr=0.001),
        'Double DQN': DoubleDQNAgent(state_size, action_size, lr=0.001),
        'Dueling DQN': DuelingDQNAgent(state_size, action_size, lr=0.001)
    }

    results = {name: [] for name in agents.keys()}

    # Train each agent
    for name, agent in agents.items():
        print(f"\nTraining {name}...")
        scores = []

        for episode in range(episodes):
            state = env.reset()
            score = 0

            for step in range(500):
                action = agent.act(state)
                next_state, reward, done, _ = env.step(action)
                agent.remember(state, action, reward, next_state, done)

                if len(agent.memory) > agent.batch_size:
                    agent.replay()

                state = next_state
                score += reward

                if done:
                    break

            agent.decay_epsilon()
            scores.append(score)

            if episode % 100 == 0:
                mean_score = np.mean(scores[-100:]) if len(scores) >= 100 else np.mean(scores)
                print(f"Episode {episode}, Average Score: {mean_score:.2f}")

        results[name] = scores

    env.close()
    return results

# Visualize comparison results
def plot_dqn_comparison(results):
    """Plot comparison of different DQN variants."""
    plt.figure(figsize=(15, 8))

    colors = ['blue', 'red', 'green', 'orange']
    window_size = 50

    for i, (name, scores) in enumerate(results.items()):
        moving_avg = pd.Series(scores).rolling(window_size).mean()
        plt.plot(scores, alpha=0.3, color=colors[i])
        plt.plot(moving_avg, color=colors[i], linewidth=2, label=f'{name} (MA)')

    plt.axhline(y=195, color='black', linestyle='--', label='Solved Threshold')
    plt.xlabel('Episode')
    plt.ylabel('Score')
    plt.title('DQN Variants Comparison')
    plt.legend()
    plt.grid(True, alpha=0.3)
    plt.show()

    # Performance summary
    print("\nFinal Performance Summary:")
    for name, scores in results.items():
        final_performance = np.mean(scores[-100:])
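        rolling_mean = pd.Series(scores).rolling(100).mean()
        episodes_to_solve = next((i for i, score in enumerate(rolling_mean)
                                if score >= 195), None)
        # Report "not solved" instead of the episode count when the threshold is never reached
        status = (f"solved in {episodes_to_solve} episodes"
                  if episodes_to_solve is not None else "not solved")
        print(f"{name:12}: {final_performance:.1f} ± {np.std(scores[-100:]):.1f} ({status})")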

# Run the comparison
print("\nComparing DQN Variants...")
variant_results = compare_dqn_variants(episodes=300)
plot_dqn_comparison(variant_results)

# 12. TF-Agents Library

## Theoretical Foundation

**TF-Agents** is Google's reinforcement learning library built on TensorFlow. It provides:

### Key Components:

1. **Environments**: Standardized interfaces for RL environments
2. **Agents**: Pre-implemented RL algorithms (DQN, PPO, SAC, etc.)
3. **Policies**: Action selection strategies
4. **Replay Buffers**: Experience storage and sampling
5. **Metrics**: Training and evaluation metrics
6. **Drivers**: Run a policy in an environment and pass the collected experience to observers such as a replay buffer
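
To make the replay-buffer component concrete, here is a minimal pure-Python sketch of uniform experience storage and sampling. It is not TF-Agents code: the `SimpleReplayBuffer` class and its method names are hypothetical and only illustrate the idea of a fixed-size buffer that is sampled uniformly.

```python
import random
from collections import deque


class SimpleReplayBuffer:
    """Hypothetical fixed-size buffer with uniform random sampling."""

    def __init__(self, max_length, seed=None):
        # A deque with maxlen drops the oldest item once the buffer is full
        self._storage = deque(maxlen=max_length)
        self._rng = random.Random(seed)

    def add(self, state, action, reward, next_state, done):
        self._storage.append((state, action, reward, next_state, done))

    def sample(self, batch_size):
        # Uniform sampling without replacement over the stored transitions
        return self._rng.sample(list(self._storage), batch_size)

    def __len__(self):
        return len(self._storage)


buffer = SimpleReplayBuffer(max_length=3, seed=0)
for t in range(5):
    buffer.add(state=t, action=0, reward=1.0, next_state=t + 1, done=False)

# Only the three most recent transitions (states 2, 3, 4) remain in the buffer
batch = buffer.sample(batch_size=2)
```

Sampling at random from a buffer of past transitions breaks the correlation between consecutive steps, which is the main reason DQN-style agents train from a replay buffer rather than from the latest step alone.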

### Architecture Benefits:

- **Modular Design**: Mix and match components
- **TensorFlow Integration**: GPU acceleration and distributed training
- **Production Ready**: Scalable and robust implementations
- **Research Friendly**: Easy to extend and experiment

### Typical Workflow:

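```python
# Sketch only: the imports and a complete implementation follow in the next cells
num_iterations = 20000

# 1. Create the environment and wrap it so that it exchanges TensorFlow tensors
env = tf_py_environment.TFPyEnvironment(suite_gym.load('CartPole-v1'))

# 2. Create the Q-network and the agent
q_net = q_network.QNetwork(
    env.observation_spec(), env.action_spec(), fc_layer_params=(100, 50))
agent = dqn_agent.DqnAgent(
    env.time_step_spec(),
    env.action_spec(),
    q_network=q_net,
    optimizer=tf.keras.optimizers.Adam(learning_rate=1e-3),
    td_errors_loss_fn=common.element_wise_squared_loss)
agent.initialize()

# 3. Create replay buffer
replay_buffer = tf_uniform_replay_buffer.TFUniformReplayBuffer(
    data_spec=agent.collect_data_spec,
    batch_size=env.batch_size,
    max_length=100000)

# 4. Create driver for data collection (one environment step per run() call)
collect_driver = dynamic_step_driver.DynamicStepDriver(
    env, agent.collect_policy, observers=[replay_buffer.add_batch])

# 5. Training loop: warm up the buffer, then alternate collection and training
for _ in range(100):
    collect_driver.run()
# num_steps=2 samples pairs of adjacent steps, i.e. complete transitions
dataset = replay_buffer.as_dataset(sample_batch_size=64, num_steps=2)
iterator = iter(dataset)
for _ in range(num_iterations):
    collect_driver.run()
    experience, _ = next(iterator)
    train_loss = agent.train(experience).loss
```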
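
Inside `agent.train`, a DQN agent regresses Q(s, a) toward the target r + γ · max over a' of Q_target(s', a'), and drops the bootstrap term on terminal steps. The NumPy sketch below shows that target computation on a tiny batch. The function name `dqn_targets` is hypothetical; it illustrates the standard formula and is not the TF-Agents internals.

```python
import numpy as np


def dqn_targets(rewards, next_q_values, dones, gamma=0.99):
    """Return r + gamma * max_a' Q(s', a'), without bootstrapping on terminal steps."""
    rewards = np.asarray(rewards, dtype=np.float64)
    dones = np.asarray(dones, dtype=np.float64)
    # Greedy value of the next state according to the (target) Q-values
    max_next_q = np.max(next_q_values, axis=1)
    return rewards + gamma * (1.0 - dones) * max_next_q


rewards = [1.0, 1.0]
next_q = np.array([[0.5, 2.0],   # Q(s', a') for the first transition
                   [3.0, 1.0]])  # second transition ends the episode
dones = [False, True]

targets = dqn_targets(rewards, next_q, dones, gamma=0.9)
# First target: 1.0 + 0.9 * 2.0 = 2.8; second target: 1.0 (terminal step)
```

The target needs both the current step and the following one, which is why the replay buffer is sampled in pairs of adjacent steps when training DQN.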

In [None]:
# Install TF-Agents (uncomment if needed)
# !pip install tf-agents[reverb]

In [None]:
# TF-Agents imports
import tensorflow as tf
from tf_agents.environments import suite_gym
from tf_agents.environments import tf_py_environment
from tf_agents.networks import q_network
from tf_agents.agents.dqn import dqn_agent
from tf_agents.utils import common
from tf_agents.replay_buffers import tf_uniform_replay_buffer
from tf_agents.trajectories import trajectory
from tf_agents.policies import random_tf_policy
from tf_agents.drivers import dynamic_step_driver
from tf_agents.metrics import tf_metrics
from tf_agents.eval import metric_utils

In [None]:
# TF-Agents DQN Implementation
class TFAgentsDQN:
    def __init__(self, env_name='CartPole-v1'):
        # Create environments
        self.train_py_env = suite_gym.load(env_name)
        self.eval_py_env = suite_gym.load(env_name)

        self.train_env = tf_py_environment.TFPyEnvironment(self.train_py_env)
        self.eval_env = tf_py_environment.TFPyEnvironment(self.eval_py_env)

        # Hyperparameters
        self.num_iterations = 20000
        self.initial_collect_steps = 100
        self.collect_steps_per_iteration = 1
        self.replay_buffer_max_length = 100000
        self.batch_size = 64
        self.learning_rate = 1e-3
        self.log_interval = 200
        self.num_eval_episodes = 10
        self.eval_interval = 1000

        self._setup_agent()
        self._setup_replay_buffer()
        self._setup_drivers()

    def _setup_agent(self):
        """Setup the DQN agent."""
        # Q-Network
        fc_layer_params = (100, 50)

        self.q_net = q_network.QNetwork(
            self.train_env.observation_spec(),
            self.train_env.action_spec(),
            fc_layer_params=fc_layer_params)

        # Optimizer
        optimizer = tf.keras.optimizers.Adam(learning_rate=self.learning_rate)

        # DQN Agent
        self.agent = dqn_agent.DqnAgent(
            self.train_env.time_step_spec(),
            self.train_env.action_spec(),
            q_network=self.q_net,
            optimizer=optimizer,
            td_errors_loss_fn=common.element_wise_squared_loss,
            train_step_counter=tf.Variable(0))

        self.agent.initialize()

    def _setup_replay_buffer(self):
        """Setup replay buffer."""
        self.replay_buffer = tf_uniform_replay_buffer.TFUniformReplayBuffer(
            data_spec=self.agent.collect_data_spec,
            batch_size=self.train_env.batch_size,
            max_length=self.replay_buffer_max_length)

    def _setup_drivers(self):
        """Setup data collection drivers."""
        # Random policy for initial data collection
        self.random_policy = random_tf_policy.RandomTFPolicy(
            self.train_env.time_step_spec(),
            self.train_env.action_spec())

        # Collect driver
        self.collect_driver = dynamic_step_driver.DynamicStepDriver(
            self.train_env,
            self.agent.collect_policy,
            observers=[self.replay_buffer.add_batch],
            num_steps=self.collect_steps_per_iteration)

        # Initial data collection
        self.initial_collect_driver = dynamic_step_driver.DynamicStepDriver(
            self.train_env,
            self.random_policy,
            observers=[self.replay_buffer.add_batch],
            num_steps=self.initial_collect_steps)

    def collect_initial_data(self):
        """Collect initial random data."""
        print("Collecting initial data...")
        self.initial_collect_driver.run()

    def compute_avg_return(self, num_episodes=10):
        """Compute average return over episodes."""
        total_return = 0.0
        for _ in range(num_episodes):
            time_step = self.eval_env.reset()
            episode_return = 0.0

            while not time_step.is_last():
                action_step = self.agent.policy.action(time_step)
                time_step = self.eval_env.step(action_step.action)
                episode_return += time_step.reward
            total_return += episode_return

        avg_return = total_return / num_episodes
        return avg_return.numpy()[0]

    def train(self):
        """Train the agent."""
        # Collect initial data
        self.collect_initial_data()

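        # Create dataset: each sample holds two adjacent steps (num_steps=2),
        # which DQN needs to form (state, action, reward, next_state) transitions
        dataset = self.replay_buffer.as_dataset(
            num_parallel_calls=3,
            sample_batch_size=self.batch_size,
            num_steps=2).prefetch(3)

        iterator = iter(dataset)

        # Compile the train step into a TensorFlow graph for faster iterations
        self.agent.train = common.function(self.agent.train)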

        # Training metrics
        returns = []
        losses = []

        # Training loop
        print("Starting training...")
        for iteration in range(self.num_iterations):
            # Collect data
            self.collect_driver.run()

            # Train
            experience, unused_info = next(iterator)
            train_loss = self.agent.train(experience).loss

            step = self.agent.train_step_counter.numpy()

            if step % self.log_interval == 0:
                losses.append(train_loss.numpy())
                print(f'Step {step}: loss = {train_loss.numpy():.3f}')

            if step % self.eval_interval == 0:
                avg_return = self.compute_avg_return(self.num_eval_episodes)
                returns.append(avg_return)
                print(f'Step {step}: Average Return = {avg_return:.2f}')

        return returns, losses

In [None]:
# Train TF-Agents DQN
print("Training TF-Agents DQN...")
tf_agents_dqn = TFAgentsDQN()
returns, losses = tf_agents_dqn.train()

In [None]:
# Plot TF-Agents results
def plot_tf_agents_results(returns, losses, eval_interval=1000, log_interval=200,
                           solved_threshold=195):
    """Plot TF-Agents training results.

    eval_interval and log_interval must match the values used during training.
    Note: 195 is the CartPole-v0 threshold; CartPole-v1 registers 475.
    """
    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15, 5))

    # Returns plot: the first evaluation happens at step eval_interval, not step 0
    eval_steps = [(i + 1) * eval_interval for i in range(len(returns))]
    ax1.plot(eval_steps, returns, 'b-', linewidth=2)
    ax1.axhline(y=solved_threshold, color='red', linestyle='--', label='Solved Threshold')
    ax1.set_xlabel('Training Steps')
    ax1.set_ylabel('Average Return')
    ax1.set_title('TF-Agents DQN Performance')
    ax1.legend()
    ax1.grid(True, alpha=0.3)

    # Loss plot: the first logged loss is at step log_interval
    loss_steps = [(i + 1) * log_interval for i in range(len(losses))]
    ax2.plot(loss_steps, losses, 'r-', linewidth=2)
    ax2.set_xlabel('Training Steps')
    ax2.set_ylabel('Training Loss')
    ax2.set_title('TF-Agents DQN Training Loss')
    ax2.grid(True, alpha=0.3)

    plt.tight_layout()
    plt.show()

    # Performance summary
    final_return = returns[-1] if returns else 0
    solved_step = next((step for step, ret in zip(eval_steps, returns)
                        if ret >= solved_threshold), None)

    print("\nTF-Agents DQN Results:")
    print(f"Final Average Return: {final_return:.2f}")
    if solved_step is not None:
        print(f"Environment solved at step: {solved_step}")
    else:
        print("Environment not solved in training period")

plot_tf_agents_results(returns, losses)

In [None]:
# Advanced TF-Agents Features
class AdvancedTFAgentsDQN(TFAgentsDQN):
    def __init__(self, env_name='CartPole-v1', use_double_dqn=True):
        self.use_double_dqn = use_double_dqn
        super().__init__(env_name)

    def _setup_agent(self):
        """Setup advanced DQN agent with more features."""
        # Improved Q-Network with dropout
        fc_layer_params = (128, 64, 32)
        dropout_layer_params = (0.2, 0.2, 0.1)

        self.q_net = q_network.QNetwork(
            self.train_env.observation_spec(),
            self.train_env.action_spec(),
            fc_layer_params=fc_layer_params,
            dropout_layer_params=dropout_layer_params)

        # Learning rate schedule
        lr_schedule = tf.keras.optimizers.schedules.ExponentialDecay(
            initial_learning_rate=1e-3,
            decay_steps=1000,
            decay_rate=0.96)

        optimizer = tf.keras.optimizers.Adam(learning_rate=lr_schedule)

        # DqnAgent always maintains a target network (it copies q_network when
        # target_q_network is None), so passing one does not enable Double DQN.
        # TF-Agents implements Double DQN as the separate DdqnAgent class.
        agent_class = dqn_agent.DdqnAgent if self.use_double_dqn else dqn_agent.DqnAgent

        # Advanced DQN Agent
        self.agent = agent_class(
            self.train_env.time_step_spec(),
            self.train_env.action_spec(),
            q_network=self.q_net,
            optimizer=optimizer,
            epsilon_greedy=0.1,
            target_update_tau=0.005,
            # Soft updates (tau < 1) are applied every train step; a period of 100
            # with tau=0.005 would leave the target network almost frozen
            target_update_period=1,
            td_errors_loss_fn=common.element_wise_squared_loss,
            gamma=0.99,
            reward_scale_factor=1.0,
            gradient_clipping=None,
            train_step_counter=tf.Variable(0))

        self.agent.initialize()
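
Double DQN and the `target_update_tau` argument can each be sketched in a few lines of NumPy. In Double DQN the online network selects the next action and the target network evaluates it, which reduces the overestimation caused by taking a max over noisy estimates. A soft update moves the target weights a fraction `tau` toward the online weights. The function names below are hypothetical; they illustrate the standard formulas rather than the TF-Agents internals.

```python
import numpy as np


def double_dqn_targets(rewards, next_q_online, next_q_target, dones, gamma=0.99):
    """Select a' with the online network, evaluate it with the target network."""
    rewards = np.asarray(rewards, dtype=np.float64)
    dones = np.asarray(dones, dtype=np.float64)
    best_actions = np.argmax(next_q_online, axis=1)
    chosen_q = next_q_target[np.arange(len(best_actions)), best_actions]
    return rewards + gamma * (1.0 - dones) * chosen_q


def soft_update(target_weights, online_weights, tau):
    """Return tau * online + (1 - tau) * target for each weight array."""
    return [tau * w + (1.0 - tau) * tw
            for tw, w in zip(target_weights, online_weights)]


next_q_online = np.array([[1.0, 2.0], [5.0, 4.0]])   # online net prefers actions 1 and 0
next_q_target = np.array([[10.0, 0.5], [0.2, 9.0]])  # target net values for the same states

ddqn = double_dqn_targets([1.0, 1.0], next_q_online, next_q_target,
                          [False, False], gamma=0.9)
# Targets use Q_target of the online net's choices: 1 + 0.9 * 0.5 and 1 + 0.9 * 0.2

new_target = soft_update([np.zeros(2)], [np.ones(2)], tau=0.005)
# Each target weight moves 0.5% of the way toward the online weight
```

With the same inputs, a plain DQN target would take the max of `next_q_target` directly (10.0 and 9.0 here), which shows how much the two rules can differ when the networks disagree.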

In [None]:
# Compare basic vs advanced TF-Agents implementation
print("Training Advanced TF-Agents DQN...")
advanced_dqn = AdvancedTFAgentsDQN(use_double_dqn=True)
advanced_returns, advanced_losses = advanced_dqn.train()

# Plot comparison
plt.figure(figsize=(12, 5))

plt.subplot(1, 2, 1)
iterations = range(1000, (len(returns) + 1) * 1000, 1000)  # evaluations run at steps 1000, 2000, ...
plt.plot(iterations, returns, 'b-', label='Basic DQN', linewidth=2)
plt.plot(iterations, advanced_returns, 'r-', label='Advanced DQN', linewidth=2)
plt.axhline(y=195, color='black', linestyle='--', label='Solved')
plt.xlabel('Training Steps')
plt.ylabel('Average Return')
plt.title('TF-Agents DQN Comparison')
plt.legend()
plt.grid(True, alpha=0.3)

plt.subplot(1, 2, 2)
loss_iterations = range(200, (len(losses) + 1) * 200, 200)  # losses are logged at steps 200, 400, ...
plt.plot(loss_iterations, losses, 'b-', label='Basic DQN', linewidth=2)
plt.plot(loss_iterations, advanced_losses, 'r-', label='Advanced DQN', linewidth=2)
plt.xlabel('Training Steps')
plt.ylabel('Training Loss')
plt.title('Training Loss Comparison')
plt.legend()
plt.grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

print(f"\nComparison Results:")
print(f"Basic DQN Final Return: {returns[-1]:.2f}")
print(f"Advanced DQN Final Return: {advanced_returns[-1]:.2f}")

In [None]:
# TF-Agents Benefits Summary
print("\n" + "="*60)
print("TF-AGENTS BENEFITS SUMMARY")
print("="*60)

print("✓ Production-Ready Implementation")
print("  - Optimized TensorFlow operations")
print("  - GPU acceleration support")
print("  - Distributed training capabilities")

print("\n✓ Modular Architecture")
print("  - Easy to swap components")
print("  - Extensible for research")
print("  - Clean separation of concerns")

print("\n✓ Advanced Features")
print("  - Multiple RL algorithms (DQN, PPO, SAC, etc.)")
print("  - Sophisticated replay buffers")
print("  - Built-in metrics and logging")
print("  - Policy evaluation utilities")

print("\n✓ Robust Implementation")
print("  - Handles edge cases")
print("  - Numerical stability")
print("  - Memory efficient")
print("  - Well-tested codebase")

print("\n" + "="*60)
print("EXERCISE COMPLETE!")
print("="*60)
