# Chapter 11: Advanced Policy Optimization

[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/username/ReinforcementLearning/blob/main/notebooks/chapter11_advanced_policy_optimization.ipynb)

## Introduction

This chapter explores advanced policy optimization methods that build upon the basic policy gradient algorithms. We focus on trust region methods and proximal policy optimization, which address the key challenges of policy gradient methods: sample efficiency and stability.

## References
- **Schulman et al. (2015)**: Trust Region Policy Optimization [11]
- **Schulman et al. (2017)**: Proximal Policy Optimization algorithms [12]
- **Sutton et al. (1999)**: Policy gradient methods for reinforcement learning [9]
- **Kakade (2001)**: Natural policy gradients and approximately optimal policies

## Cross-References
- **Prerequisites**: Chapter 9 (Policy Gradients), Chapter 10 (Actor-Critic)
- **Next**: Chapter 12 (Model-Based Methods)
- **Related**: Chapter 8 (Deep RL), Chapter 15 (Meta-Learning)

### Key Topics Covered:
- Trust Region Policy Optimization (TRPO)
- Proximal Policy Optimization (PPO)
- Natural Policy Gradients
- Importance Sampling and Policy Ratios
- Clipping Mechanisms and KL Divergence Constraints

## Mathematical Foundation

### Trust Region Methods

The key insight behind trust region methods is to constrain policy updates to maintain a small KL divergence between the old and new policies:

$$\max_\theta \mathbb{E}_{s \sim \rho_{\pi_{\theta_{old}}}} \left[ \mathbb{E}_{a \sim \pi_{\theta_{old}}(\cdot|s)} \left[ \frac{\pi_\theta(a|s)}{\pi_{\theta_{old}}(a|s)} A^{\pi_{\theta_{old}}}(s,a) \right] \right]$$

subject to: $\mathbb{E}_{s \sim \rho_{\pi_{\theta_{old}}}} [D_{KL}(\pi_{\theta_{old}}(\cdot|s) \| \pi_\theta(\cdot|s))] \leq \delta$

### Proximal Policy Optimization

PPO simplifies TRPO by using a clipped objective function:

$$L^{CLIP}(\theta) = \mathbb{E}_t \left[ \min(r_t(\theta) A_t, \text{clip}(r_t(\theta), 1-\epsilon, 1+\epsilon) A_t) \right]$$

where $r_t(\theta) = \frac{\pi_\theta(a_t|s_t)}{\pi_{\theta_{old}}(a_t|s_t)}$ is the probability ratio.

In [1]:
# Import necessary libraries
import numpy as np
import matplotlib.pyplot as plt
from collections import deque
import random
from typing import List, Tuple, Dict, Optional

# Try to import PyTorch, fall back to NumPy implementation
try:
    import torch
    import torch.nn as nn
    import torch.optim as optim
    import torch.nn.functional as F
    from torch.distributions import Categorical, Normal
    HAS_TORCH = True
    print("PyTorch available - using neural network implementation")
except ImportError:
    HAS_TORCH = False
    print("PyTorch not available - using NumPy implementation")

# Gym installation check
try:
    import gym
    HAS_GYM = True
except ImportError:
    HAS_GYM = False
    print("Gym not available - using custom environment")

# Set random seeds for reproducibility
np.random.seed(42)
random.seed(42)
if HAS_TORCH:
    torch.manual_seed(42)

PyTorch available - using neural network implementation


## Environment Setup

We'll use a simple continuous control environment to demonstrate advanced policy optimization methods.

In [2]:
class ContinuousCartPole:
    """Continuous version of CartPole for testing continuous control algorithms."""

    def __init__(self):
        self.gravity = 9.8
        self.masscart = 1.0
        self.masspole = 0.1
        self.total_mass = self.masspole + self.masscart
        self.length = 0.5
        self.polemass_length = self.masspole * self.length
        self.force_mag = 10.0
        self.tau = 0.02

        # Thresholds
        self.theta_threshold_radians = 12 * 2 * np.pi / 360
        self.x_threshold = 2.4

        self.action_space_high = np.array([1.0])
        self.action_space_low = np.array([-1.0])
        self.observation_space_high = np.array([4.8, np.inf, 0.4, np.inf])
        self.observation_space_low = -self.observation_space_high

        self.state = None
        self.steps = 0
        self.max_steps = 500

    def reset(self):
        self.state = np.random.uniform(low=-0.05, high=0.05, size=(4,))
        self.steps = 0
        return self.state.copy()

    def step(self, action):
        action = np.clip(action, self.action_space_low, self.action_space_high)[0]
        force = action * self.force_mag

        x, x_dot, theta, theta_dot = self.state

        costheta = np.cos(theta)
        sintheta = np.sin(theta)

        temp = (force + self.polemass_length * theta_dot ** 2 * sintheta) / self.total_mass
        thetaacc = (self.gravity * sintheta - costheta * temp) / (
            self.length * (4.0 / 3.0 - self.masspole * costheta ** 2 / self.total_mass)
        )
        xacc = temp - self.polemass_length * thetaacc * costheta / self.total_mass

        x = x + self.tau * x_dot
        x_dot = x_dot + self.tau * xacc
        theta = theta + self.tau * theta_dot
        theta_dot = theta_dot + self.tau * thetaacc

        self.state = np.array([x, x_dot, theta, theta_dot])
        self.steps += 1

        done = (
            x < -self.x_threshold or x > self.x_threshold or
            theta < -self.theta_threshold_radians or theta > self.theta_threshold_radians or
            self.steps >= self.max_steps
        )

        reward = 1.0 if not done else 0.0

        return self.state.copy(), reward, done, {}

# Test the environment
env = ContinuousCartPole()
state = env.reset()
print(f"Initial state: {state}")
print(f"State space: {len(state)} dimensions")
print(f"Action space: continuous [-1, 1]")

Initial state: [-0.01254599  0.04507143  0.02319939  0.00986585]
State space: 4 dimensions
Action space: continuous [-1, 1]


## Trust Region Policy Optimization (TRPO)

TRPO ensures that policy updates don't deviate too much from the current policy by constraining the KL divergence.

In [3]:
if HAS_TORCH:
    class PolicyNetwork(nn.Module):
        def __init__(self, state_dim, action_dim, hidden_dim=64):
            super(PolicyNetwork, self).__init__()
            self.fc1 = nn.Linear(state_dim, hidden_dim)
            self.fc2 = nn.Linear(hidden_dim, hidden_dim)
            self.mean = nn.Linear(hidden_dim, action_dim)
            self.log_std = nn.Parameter(torch.zeros(action_dim))

        def forward(self, state):
            x = torch.relu(self.fc1(state))
            x = torch.relu(self.fc2(x))
            mean = self.mean(x)
            std = torch.exp(self.log_std)
            return Normal(mean, std)

    class ValueNetwork(nn.Module):
        def __init__(self, state_dim, hidden_dim=64):
            super(ValueNetwork, self).__init__()
            self.fc1 = nn.Linear(state_dim, hidden_dim)
            self.fc2 = nn.Linear(hidden_dim, hidden_dim)
            self.value = nn.Linear(hidden_dim, 1)

        def forward(self, state):
            x = torch.relu(self.fc1(state))
            x = torch.relu(self.fc2(x))
            return self.value(x)

class TRPO:
    def __init__(self, state_dim, action_dim, lr_v=1e-3, gamma=0.99,
                 max_kl=0.01, damping=0.1, max_backtracks=10):
        self.state_dim = state_dim
        self.action_dim = action_dim
        self.gamma = gamma
        self.max_kl = max_kl
        self.damping = damping
        self.max_backtracks = max_backtracks

        if HAS_TORCH:
            self.policy = PolicyNetwork(state_dim, action_dim)
            self.value_net = ValueNetwork(state_dim)
            self.value_optimizer = optim.Adam(self.value_net.parameters(), lr=lr_v)
        else:
            # NumPy implementation with simplified policy
            self.policy_params = np.random.randn(state_dim + 1, action_dim) * 0.1
            self.value_params = np.random.randn(state_dim + 1) * 0.1

    def get_action(self, state):
        if HAS_TORCH:
            state_tensor = torch.FloatTensor(state).unsqueeze(0)
            dist = self.policy(state_tensor)
            action = dist.sample()
            log_prob = dist.log_prob(action)
            return action.numpy()[0], log_prob.detach().numpy()[0]
        else:
            # Simple linear policy with noise
            state_aug = np.append(state, 1.0)  # Add bias
            mean = np.dot(state_aug, self.policy_params)
            action = np.random.normal(mean, 0.5)
            log_prob = -0.5 * ((action - mean) ** 2 / 0.25 + np.log(2 * np.pi * 0.25))
            return action, log_prob

    def compute_advantages(self, rewards, values, dones):
        advantages = []
        returns = []

        R = 0
        for i in reversed(range(len(rewards))):
            R = rewards[i] + self.gamma * R * (1 - dones[i])
            returns.insert(0, R)

        returns = np.array(returns)
        advantages = returns - values

        # Normalize advantages
        advantages = (advantages - np.mean(advantages)) / (np.std(advantages) + 1e-8)

        return advantages, returns

    def update(self, states, actions, rewards, log_probs, dones):
        states = np.array(states)
        actions = np.array(actions)
        rewards = np.array(rewards)
        log_probs = np.array(log_probs)
        dones = np.array(dones)

        if HAS_TORCH:
            # Convert to tensors
            states_tensor = torch.FloatTensor(states)
            actions_tensor = torch.FloatTensor(actions)
            old_log_probs = torch.FloatTensor(log_probs)

            # Compute values and advantages
            values = self.value_net(states_tensor).squeeze().detach().numpy()
            advantages, returns = self.compute_advantages(rewards, values, dones)

            advantages_tensor = torch.FloatTensor(advantages)
            returns_tensor = torch.FloatTensor(returns)

            # Update value network
            for _ in range(5):
                value_loss = F.mse_loss(self.value_net(states_tensor).squeeze(), returns_tensor)
                self.value_optimizer.zero_grad()
                value_loss.backward()
                self.value_optimizer.step()

            # TRPO update (simplified)
            dist = self.policy(states_tensor)
            new_log_probs = dist.log_prob(actions_tensor)
            ratio = torch.exp(new_log_probs - old_log_probs)

            surrogate_loss = -(ratio * advantages_tensor).mean()

            # Simplified natural gradient step
            policy_params = list(self.policy.parameters())
            grads = torch.autograd.grad(surrogate_loss, policy_params, create_graph=True)

            # Apply update with line search (simplified)
            with torch.no_grad():
                for param, grad in zip(policy_params, grads):
                    param -= 0.001 * grad  # Small step size

        else:
            # Simple NumPy update
            state_aug = np.column_stack([states, np.ones(len(states))])
            values = np.dot(state_aug, self.value_params)
            advantages, returns = self.compute_advantages(rewards, values, dones)

            # Update value function
            value_grad = np.dot(state_aug.T, returns - values) / len(states)
            self.value_params += 0.01 * value_grad

            # Simple policy update
            policy_grad = np.zeros_like(self.policy_params)
            for i, (s, a, adv) in enumerate(zip(states, actions, advantages)):
                s_aug = np.append(s, 1.0)
                policy_grad += np.outer(s_aug, adv * (a - np.dot(s_aug, self.policy_params)))

            self.policy_params += 0.001 * policy_grad / len(states)

print("TRPO implementation ready")

TRPO implementation ready


## Proximal Policy Optimization (PPO)

PPO is a simpler alternative to TRPO that uses clipping to prevent large policy updates.

In [4]:
class PPO:
    def __init__(self, state_dim, action_dim, lr=3e-4, gamma=0.99,
                 eps_clip=0.2, epochs=10, mini_batch_size=64):
        self.state_dim = state_dim
        self.action_dim = action_dim
        self.gamma = gamma
        self.eps_clip = eps_clip
        self.epochs = epochs
        self.mini_batch_size = mini_batch_size

        if HAS_TORCH:
            self.policy = PolicyNetwork(state_dim, action_dim)
            self.value_net = ValueNetwork(state_dim)
            self.optimizer = optim.Adam(
                list(self.policy.parameters()) + list(self.value_net.parameters()),
                lr=lr
            )
        else:
            # NumPy implementation
            self.policy_params = np.random.randn(state_dim + 1, action_dim) * 0.1
            self.value_params = np.random.randn(state_dim + 1) * 0.1

    def get_action(self, state):
        if HAS_TORCH:
            state_tensor = torch.FloatTensor(state).unsqueeze(0)
            with torch.no_grad():
                dist = self.policy(state_tensor)
                action = dist.sample()
                log_prob = dist.log_prob(action)
                value = self.value_net(state_tensor)
            return action.numpy()[0], log_prob.numpy()[0], value.numpy()[0]
        else:
            # Simple linear policy
            state_aug = np.append(state, 1.0)
            mean = np.dot(state_aug, self.policy_params)
            action = np.random.normal(mean, 0.5)
            log_prob = -0.5 * ((action - mean) ** 2 / 0.25 + np.log(2 * np.pi * 0.25))
            value = np.dot(state_aug, self.value_params)
            return action, log_prob, value

    def compute_gae(self, rewards, values, dones, next_value, lam=0.95):
        advantages = []
        gae = 0

        values = values + [next_value]

        for i in reversed(range(len(rewards))):
            delta = rewards[i] + self.gamma * values[i + 1] * (1 - dones[i]) - values[i]
            gae = delta + self.gamma * lam * (1 - dones[i]) * gae
            advantages.insert(0, gae)

        returns = [adv + val for adv, val in zip(advantages, values[:-1])]

        return advantages, returns

    def update(self, states, actions, rewards, log_probs, values, dones):
        states = np.array(states)
        actions = np.array(actions)
        rewards = np.array(rewards)
        old_log_probs = np.array(log_probs)
        old_values = np.array(values)
        dones = np.array(dones)

        # Get next value for GAE calculation
        if HAS_TORCH:
            with torch.no_grad():
                next_value = self.value_net(torch.FloatTensor(states[-1:]))[0].item()
        else:
            next_value = np.dot(np.append(states[-1], 1.0), self.value_params)

        advantages, returns = self.compute_gae(rewards.tolist(), old_values.tolist(),
                                               dones.tolist(), next_value)

        advantages = np.array(advantages)
        returns = np.array(returns)

        # Normalize advantages
        advantages = (advantages - advantages.mean()) / (advantages.std() + 1e-8)

        if HAS_TORCH:
            # Convert to tensors
            states_tensor = torch.FloatTensor(states)
            actions_tensor = torch.FloatTensor(actions)
            old_log_probs_tensor = torch.FloatTensor(old_log_probs)
            advantages_tensor = torch.FloatTensor(advantages)
            returns_tensor = torch.FloatTensor(returns)

            # PPO update
            for _ in range(self.epochs):
                # Get current policy and value outputs
                dist = self.policy(states_tensor)
                new_log_probs = dist.log_prob(actions_tensor)
                entropy = dist.entropy().mean()
                values_pred = self.value_net(states_tensor).squeeze()

                # Compute ratio and clipped objective
                ratio = torch.exp(new_log_probs - old_log_probs_tensor)
                surr1 = ratio * advantages_tensor
                surr2 = torch.clamp(ratio, 1 - self.eps_clip, 1 + self.eps_clip) * advantages_tensor

                # PPO loss
                policy_loss = -torch.min(surr1, surr2).mean()
                value_loss = F.mse_loss(values_pred, returns_tensor)
                entropy_loss = -0.01 * entropy

                total_loss = policy_loss + 0.5 * value_loss + entropy_loss

                self.optimizer.zero_grad()
                total_loss.backward()
                torch.nn.utils.clip_grad_norm_(self.policy.parameters(), 0.5)
                self.optimizer.step()

        else:
            # Simple NumPy PPO update
            for _ in range(min(self.epochs, 3)):  # Fewer epochs for NumPy
                # Update value function
                state_aug = np.column_stack([states, np.ones(len(states))])
                values_pred = np.dot(state_aug, self.value_params)
                value_grad = np.dot(state_aug.T, returns - values_pred) / len(states)
                self.value_params += 0.01 * value_grad

                # Policy update with clipping
                policy_grad = np.zeros_like(self.policy_params)
                for i, (s, a, adv, old_lp) in enumerate(zip(states, actions, advantages, old_log_probs)):
                    s_aug = np.append(s, 1.0)
                    mean = np.dot(s_aug, self.policy_params)
                    new_lp = -0.5 * ((a - mean) ** 2 / 0.25 + np.log(2 * np.pi * 0.25))

                    ratio = np.exp(new_lp - old_lp)
                    clipped_ratio = np.clip(ratio, 1 - self.eps_clip, 1 + self.eps_clip)

                    objective = min(ratio * adv, clipped_ratio * adv)
                    policy_grad += np.outer(s_aug, objective * (a - mean) / 0.25)

                self.policy_params += 0.001 * policy_grad / len(states)

print("PPO implementation ready")

PPO implementation ready


## Training and Comparison

Let's train both TRPO and PPO on our continuous CartPole environment and compare their performance.

In [6]:
def train_agent(agent, env, episodes=1000, max_steps=500):
    """Train an agent and return training statistics."""
    episode_rewards = []
    moving_avg_rewards = []

    for episode in range(episodes):
        states, actions, rewards, log_probs, values, dones = [], [], [], [], [], []

        state = env.reset()
        episode_reward = 0

        for step in range(max_steps):
            if hasattr(agent, 'get_action'):
                if isinstance(agent, PPO):
                    action, log_prob, value = agent.get_action(state)
                    values.append(value[0])
                else:  # TRPO
                    action, log_prob = agent.get_action(state)
                    values.append(0)  # Placeholder

            states.append(state)
            actions.append(action)
            log_probs.append(log_prob)

            next_state, reward, done, _ = env.step(action)

            rewards.append(reward)
            dones.append(done)
            episode_reward += reward

            state = next_state

            if done:
                break

        # Update agent
        if isinstance(agent, PPO):
            agent.update(states, actions, rewards, log_probs, values, dones)
        else:  # TRPO
            agent.update(states, actions, rewards, log_probs, dones)

        episode_rewards.append(episode_reward)

        # Compute moving average
        if len(episode_rewards) >= 100:
            moving_avg = np.mean(episode_rewards[-100:])
        else:
            moving_avg = np.mean(episode_rewards)
        moving_avg_rewards.append(moving_avg)

        if episode % 100 == 0:
            print(f"Episode {episode}, Average Reward: {moving_avg:.2f}")

    return episode_rewards, moving_avg_rewards

# Create environment
env = ContinuousCartPole()
state_dim = 4
action_dim = 1

print("Training PPO agent...")
ppo_agent = PPO(state_dim, action_dim)
ppo_rewards, ppo_moving_avg = train_agent(ppo_agent, env, episodes=500)

print("\nTraining TRPO agent...")
trpo_agent = TRPO(state_dim, action_dim)
trpo_rewards, trpo_moving_avg = train_agent(trpo_agent, env, episodes=500)

Training PPO agent...
Episode 0, Average Reward: 20.00
Episode 100, Average Reward: 23.81
Episode 200, Average Reward: 21.26
Episode 300, Average Reward: 22.84
Episode 400, Average Reward: 22.11

Training TRPO agent...
Episode 0, Average Reward: 19.00
Episode 100, Average Reward: 24.18
Episode 200, Average Reward: 22.62
Episode 300, Average Reward: 24.19
Episode 400, Average Reward: 22.05


## Performance Analysis and Visualization

In [None]:
# Plot training curves
plt.figure(figsize=(15, 10))

# Episode rewards
plt.subplot(2, 2, 1)
plt.plot(ppo_rewards, alpha=0.3, color='blue', label='PPO Episodes')
plt.plot(ppo_moving_avg, color='blue', linewidth=2, label='PPO Moving Average')
plt.plot(trpo_rewards, alpha=0.3, color='red', label='TRPO Episodes')
plt.plot(trpo_moving_avg, color='red', linewidth=2, label='TRPO Moving Average')
plt.xlabel('Episode')
plt.ylabel('Reward')
plt.title('Training Performance: PPO vs TRPO')
plt.legend()
plt.grid(True, alpha=0.3)

# Moving averages comparison
plt.subplot(2, 2, 2)
plt.plot(ppo_moving_avg, color='blue', linewidth=2, label='PPO')
plt.plot(trpo_moving_avg, color='red', linewidth=2, label='TRPO')
plt.xlabel('Episode')
plt.ylabel('Moving Average Reward')
plt.title('Learning Curves Comparison')
plt.legend()
plt.grid(True, alpha=0.3)

# Final performance histogram
plt.subplot(2, 2, 3)
final_ppo = ppo_rewards[-100:] if len(ppo_rewards) >= 100 else ppo_rewards
final_trpo = trpo_rewards[-100:] if len(trpo_rewards) >= 100 else trpo_rewards
plt.hist(final_ppo, alpha=0.7, bins=20, label=f'PPO (μ={np.mean(final_ppo):.1f})', color='blue')
plt.hist(final_trpo, alpha=0.7, bins=20, label=f'TRPO (μ={np.mean(final_trpo):.1f})', color='red')
plt.xlabel('Episode Reward')
plt.ylabel('Frequency')
plt.title('Final Performance Distribution (Last 100 Episodes)')
plt.legend()
plt.grid(True, alpha=0.3)

# Convergence analysis
plt.subplot(2, 2, 4)
window_size = 50
ppo_std = [np.std(ppo_rewards[max(0, i-window_size):i+1]) for i in range(len(ppo_rewards))]
trpo_std = [np.std(trpo_rewards[max(0, i-window_size):i+1]) for i in range(len(trpo_rewards))]
plt.plot(ppo_std, color='blue', label='PPO Std Dev')
plt.plot(trpo_std, color='red', label='TRPO Std Dev')
plt.xlabel('Episode')
plt.ylabel('Reward Standard Deviation')
plt.title('Training Stability (Rolling Std Dev)')
plt.legend()
plt.grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

# Print summary statistics
print("\n=== Performance Summary ===")
print(f"PPO Final Average: {np.mean(final_ppo):.2f} ± {np.std(final_ppo):.2f}")
print(f"TRPO Final Average: {np.mean(final_trpo):.2f} ± {np.std(final_trpo):.2f}")
print(f"PPO Max Reward: {max(ppo_rewards):.2f}")
print(f"TRPO Max Reward: {max(trpo_rewards):.2f}")

## Advanced Policy Analysis

Let's analyze the learned policies and understand how they differ.

In [None]:
def analyze_policy_behavior(agent, env, num_episodes=10):
    """Analyze the behavior of a trained policy."""
    trajectories = []
    rewards = []

    for ep in range(num_episodes):
        trajectory = []
        state = env.reset()
        episode_reward = 0

        for step in range(500):
            if isinstance(agent, PPO):
                action, _, _ = agent.get_action(state)
            else:
                action, _ = agent.get_action(state)

            trajectory.append({
                'state': state.copy(),
                'action': action,
                'step': step
            })

            state, reward, done, _ = env.step(action)
            episode_reward += reward

            if done:
                break

        trajectories.append(trajectory)
        rewards.append(episode_reward)

    return trajectories, rewards

# Analyze both agents
print("Analyzing PPO policy...")
ppo_trajectories, ppo_eval_rewards = analyze_policy_behavior(ppo_agent, env)

print("Analyzing TRPO policy...")
trpo_trajectories, trpo_eval_rewards = analyze_policy_behavior(trpo_agent, env)

# Visualize policy behavior
plt.figure(figsize=(15, 10))

# State trajectories
for i, (ppo_traj, trpo_traj) in enumerate(zip(ppo_trajectories[:3], trpo_trajectories[:3])):
    # Cart position
    plt.subplot(3, 4, i*4 + 1)
    ppo_positions = [step['state'][0] for step in ppo_traj]
    trpo_positions = [step['state'][0] for step in trpo_traj]
    plt.plot(ppo_positions, 'b-', label='PPO' if i == 0 else '')
    plt.plot(trpo_positions, 'r-', label='TRPO' if i == 0 else '')
    plt.ylabel('Cart Position')
    plt.title(f'Episode {i+1}')
    if i == 0:
        plt.legend()
    plt.grid(True, alpha=0.3)

    # Pole angle
    plt.subplot(3, 4, i*4 + 2)
    ppo_angles = [step['state'][2] for step in ppo_traj]
    trpo_angles = [step['state'][2] for step in trpo_traj]
    plt.plot(ppo_angles, 'b-')
    plt.plot(trpo_angles, 'r-')
    plt.ylabel('Pole Angle')
    plt.grid(True, alpha=0.3)

    # Actions
    plt.subplot(3, 4, i*4 + 3)
    ppo_actions = [step['action'][0] if hasattr(step['action'], '__iter__') else step['action'] for step in ppo_traj]
    trpo_actions = [step['action'][0] if hasattr(step['action'], '__iter__') else step['action'] for step in trpo_traj]
    plt.plot(ppo_actions, 'b-')
    plt.plot(trpo_actions, 'r-')
    plt.ylabel('Action')
    plt.grid(True, alpha=0.3)

    # Phase plot (position vs angle)
    plt.subplot(3, 4, i*4 + 4)
    plt.plot(ppo_positions, ppo_angles, 'b-', alpha=0.7)
    plt.plot(trpo_positions, trpo_angles, 'r-', alpha=0.7)
    plt.xlabel('Cart Position')
    plt.ylabel('Pole Angle')
    plt.title('Phase Plot')
    plt.grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

# Performance comparison
print("\n=== Evaluation Results ===")
print(f"PPO Evaluation Reward: {np.mean(ppo_eval_rewards):.2f} ± {np.std(ppo_eval_rewards):.2f}")
print(f"TRPO Evaluation Reward: {np.mean(trpo_eval_rewards):.2f} ± {np.std(trpo_eval_rewards):.2f}")
print(f"PPO Episode Lengths: {[len(traj) for traj in ppo_trajectories]}")
print(f"TRPO Episode Lengths: {[len(traj) for traj in trpo_trajectories]}")

## Key Insights and Theoretical Analysis

In [None]:
# Compute policy divergence analysis
def compute_policy_statistics(agent, states_sample):
    """Compute statistics about the policy."""
    actions = []
    entropies = []

    for state in states_sample:
        if isinstance(agent, PPO):
            action, _, _ = agent.get_action(state)
        else:
            action, _ = agent.get_action(state)

        actions.append(action)

        # Estimate entropy (simplified)
        if HAS_TORCH:
            with torch.no_grad():
                state_tensor = torch.FloatTensor(state).unsqueeze(0)
                if hasattr(agent, 'policy'):
                    dist = agent.policy(state_tensor)
                    entropy = dist.entropy().item()
                    entropies.append(entropy)

    return {
        'action_mean': np.mean(actions),
        'action_std': np.std(actions),
        'entropy_mean': np.mean(entropies) if entropies else 0
    }

# Sample states from different regions
np.random.seed(42)
sample_states = []
for _ in range(100):
    state = np.random.uniform(-1, 1, 4)
    sample_states.append(state)

ppo_stats = compute_policy_statistics(ppo_agent, sample_states)
trpo_stats = compute_policy_statistics(trpo_agent, sample_states)

print("\n=== Policy Statistics ===")
print("PPO Policy:")
for key, value in ppo_stats.items():
    print(f"  {key}: {value:.4f}")

print("\nTRPO Policy:")
for key, value in trpo_stats.items():
    print(f"  {key}: {value:.4f}")

# Theoretical insights
print("\n=== Theoretical Insights ===")
print("""
1. **PPO vs TRPO Trade-offs**:
   - PPO uses clipping for simplicity vs TRPO's KL constraint
   - PPO is computationally more efficient
   - TRPO provides stronger theoretical guarantees

2. **Policy Optimization Challenges**:
   - Large policy updates can cause performance collapse
   - Both methods address the exploration-exploitation trade-off
   - Trust regions prevent destructive policy changes

3. **Practical Considerations**:
   - PPO is more widely adopted due to implementation simplicity
   - Hyperparameter sensitivity varies between methods
   - Sample efficiency depends on environment characteristics
""")

## Summary and Educational Insights

### Key Takeaways from Advanced Policy Optimization:

1. **Trust Region Methods**: TRPO constrains policy updates using KL divergence to ensure stable learning, preventing the policy from changing too dramatically in a single update [11].

2. **Proximal Policy Optimization**: PPO achieves similar stability to TRPO through a simpler clipping mechanism, making it more practical to implement and tune [12].

3. **Importance Sampling**: Both methods use importance sampling to reuse data from the old policy, improving sample efficiency compared to vanilla policy gradients [9].

4. **Practical Impact**: These methods have become the foundation for many state-of-the-art RL applications, from robotics to game playing.

### Mathematical Foundations:

- **Policy Gradient Theorem**: $\nabla_\theta J(\theta) = \mathbb{E}_{\pi_\theta}[\nabla_\theta \log \pi_\theta(a|s) A^\pi(s,a)]$ [9]
- **Trust Region Constraint**: $\mathbb{E}[D_{KL}(\pi_{old} \| \pi_{new})] \leq \delta$ [11]
- **PPO Clipping**: $\min(r_t(\theta) A_t, \text{clip}(r_t(\theta), 1-\epsilon, 1+\epsilon) A_t)$ [12]

### Engineering Insights:

- **Generalized Advantage Estimation (GAE)** reduces variance in advantage estimates
- **Mini-batch updates** improve computational efficiency
- **Entropy regularization** maintains exploration during training
- **Gradient clipping** prevents destructive updates

These advanced policy optimization methods represent a significant evolution in reinforcement learning, providing the stability and efficiency needed for complex real-world applications while maintaining theoretical soundness.

### References:
- [9] Sutton, R. S., et al. (1999). Policy gradient methods for reinforcement learning with function approximation
- [11] Schulman, J., et al. (2015). Trust region policy optimization. *International conference on machine learning*
- [12] Schulman, J., et al. (2017). Proximal policy optimization algorithms. *arXiv preprint arXiv:1707.06347*

### Cross-References:
- **Previous**: [Chapter 10: Actor-Critic Methods](chapter10_actor_critic.ipynb)
- **Next**: [Chapter 12: Model-Based Methods](chapter12_model_based_methods.ipynb)
- **Related**: [Chapter 9: Policy Gradients](chapter09_policy_gradients.ipynb)

### Next Steps:
- Implement PPO on more complex environments
- Explore natural policy gradients and advanced trust region methods
- Study recent improvements like PPO2 and implementation tricks

---
*This notebook is part of the Reinforcement Learning for Engineer-Mathematicians textbook. For complete bibliography, see [bibliography.md](bibliography.md)*