# Proximal Policy Optimization (PPO) Implementation Tutorial

This notebook provides a step-by-step implementation of the Proximal Policy Optimization (PPO) algorithm, tested on the CartPole environment. PPO is a policy gradient method that has become one of the most popular reinforcement learning algorithms due to its simplicity and effectiveness.

## Key Features of PPO
1. **Clipped Surrogate Objective**: Clips the probability ratio between the new and old policies so that a single update cannot move the policy too far, approximating a trust-region constraint
2. **Actor-Critic Architecture**: Combines value function estimation with policy optimization
3. **GAE**: Uses Generalized Advantage Estimation to trade bias against variance in the advantage estimates
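
For reference, the clipped objective named in the first item is usually written as follows, where $r_t(\theta)$ is the probability ratio between the new and old policies, $\hat{A}_t$ is the advantage estimate, and $\epsilon$ is the clip parameter (`eps` in the code later in this notebook):

$$
r_t(\theta) = \frac{\pi_\theta(a_t \mid s_t)}{\pi_{\theta_\text{old}}(a_t \mid s_t)}, \qquad
L^{\text{CLIP}}(\theta) = \mathbb{E}_t\left[\min\left(r_t(\theta)\,\hat{A}_t,\ \operatorname{clip}\big(r_t(\theta),\, 1-\epsilon,\, 1+\epsilon\big)\,\hat{A}_t\right)\right]
$$

This objective is maximized, so an implementation that uses a gradient-descent optimizer minimizes its negative.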

Let's start by importing the required libraries:

In [1]:
%matplotlib inline

import gymnasium as gym
import torch
import torch.nn.functional as F
import numpy as np
import matplotlib.pyplot as plt
from utils import rl_utils

## Environment Setup

We'll be using the CartPole-v0 environment from Gymnasium, the maintained successor to OpenAI Gym. This is a classic control problem in which the agent must balance a pole on a moving cart.

In [None]:
env_name = "CartPole-v0"
env = gym.make(env_name)
seed = 42  # Choose any integer
rl_utils.set_seed(env, seed)

## Environment Details

**Observation Space**: A 4-dimensional vector representing:
- Cart Position: $x \in [-4.8, 4.8]$
- Cart Velocity: $\dot{x} \in (-\infty, \infty)$
- Pole Angle: $\theta \in [-24^\circ, 24^\circ]$
- Pole Angular Velocity: $\dot{\theta} \in (-\infty, \infty)$

**Action Space**: Binary choice:
- 0: Push cart left
- 1: Push cart right

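**Reward**: +1 for every timestep until the episode ends. An episode terminates when the pole tilts more than 12° from vertical or the cart moves more than 2.4 units from the center (both tighter than the observation bounds above), and CartPole-v0 truncates it after 200 steps.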

## Neural Network Architecture

We'll implement both the policy network (actor) and value network (critic) using simple feedforward neural networks.

In [3]:
class PolicyNet(torch.nn.Module):
    """Actor network that predicts action probabilities"""
    def __init__(self, state_dim, hidden_dim, action_dim):
        super(PolicyNet, self).__init__()
        self.fc1 = torch.nn.Linear(state_dim, hidden_dim)
        self.fc2 = torch.nn.Linear(hidden_dim, action_dim)
    
    def forward(self, x):
        x = F.relu(self.fc1(x))
        return F.softmax(self.fc2(x), dim=1)

class ValueNet(torch.nn.Module):
    """Critic network that estimates state values"""
    def __init__(self, state_dim, hidden_dim):
        super(ValueNet, self).__init__()
        self.fc1 = torch.nn.Linear(state_dim, hidden_dim)
        self.fc2 = torch.nn.Linear(hidden_dim, 1)
    
    def forward(self, x):
        x = F.relu(self.fc1(x))
        return self.fc2(x)

## Advantage Estimation

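PPO is typically paired with Generalized Advantage Estimation (GAE) to compute advantages. GAE forms an exponentially weighted sum of temporal-difference (TD) errors, and its parameter $\lambda$ sets the bias-variance trade-off: $\lambda = 0$ reduces to the one-step TD error (low variance, more bias), while $\lambda = 1$ uses the full discounted sum of TD errors (less bias, more variance).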
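
Concretely, GAE builds each advantage from one-step TD errors. With discount factor $\gamma$, GAE parameter $\lambda$, and value estimate $V$, the standard definition for a single episode of length $T$ is:

$$
\delta_t = r_t + \gamma V(s_{t+1}) - V(s_t), \qquad
\hat{A}_t = \sum_{l=0}^{T-t-1} (\gamma\lambda)^l\, \delta_{t+l} = \delta_t + \gamma\lambda\, \hat{A}_{t+1}, \qquad \hat{A}_T = 0
$$

Here $V(s_{t+1})$ is taken as 0 when $s_{t+1}$ is terminal. The recursive form on the right is what a backward loop over the TD errors computes.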

In [4]:
def compute_advantage(gamma, lmbda, td_delta):
    """Compute advantage using GAE (Generalized Advantage Estimation)
    
    Args:
        gamma: Discount factor
        lmbda: GAE parameter
        td_delta: Temporal difference error
    """
    td_delta = td_delta.detach().numpy()
    advantage_list = []
    advantage = 0.0
    for delta in td_delta[::-1]:
        advantage = gamma * lmbda * advantage + delta
        advantage_list.append(advantage)
    advantage_list.reverse()
    # Stack into one array first; building a tensor from a list of arrays is slow
    return torch.tensor(np.array(advantage_list), dtype=torch.float)
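
To make the backward recursion easier to follow, here is a small sketch of the same loop on a plain Python list, with no tensors involved. The helper name `gae_from_deltas` and the input numbers are illustrative assumptions, chosen so the result can be checked by hand.

```python
def gae_from_deltas(gamma, lmbda, deltas):
    """Backward GAE recursion on a plain list of TD errors (illustrative helper)."""
    advantages = []
    running = 0.0
    # Walk from the last step to the first: A_t = delta_t + gamma * lmbda * A_{t+1}
    for delta in reversed(deltas):
        running = gamma * lmbda * running + delta
        advantages.append(running)
    advantages.reverse()
    return advantages


# Three steps with a TD error of 1.0 each, and gamma * lmbda = 0.5
print(gae_from_deltas(gamma=0.5, lmbda=1.0, deltas=[1.0, 1.0, 1.0]))
# → [1.75, 1.5, 1.0]
```

The last step's advantage equals its own TD error, and each earlier step adds a discounted share of everything that follows it.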

## PPO Agent Implementation

Now we'll implement the main PPO agent class that combines all the components:

In [5]:
class PPO:
    def __init__(self, state_dim, hidden_dim, action_dim, actor_lr, critic_lr,
                 lmbda, epochs, eps, gamma, device):
        self.actor = PolicyNet(state_dim, hidden_dim, action_dim).to(device)
        self.critic = ValueNet(state_dim, hidden_dim).to(device)
        self.actor_optimizer = torch.optim.Adam(self.actor.parameters(), lr=actor_lr)
        self.critic_optimizer = torch.optim.Adam(self.critic.parameters(), lr=critic_lr)
        
        self.gamma = gamma  # Discount factor
        self.lmbda = lmbda  # GAE parameter
        self.epochs = epochs  # Number of epochs to update the policy
        self.eps = eps  # Clip parameter for PPO
        self.device = device

    def take_action(self, state, test=False):
        # Wrap in a NumPy array first to avoid the slow list-of-arrays tensor path
        state = torch.tensor(np.array([state]), dtype=torch.float).to(self.device)
        probs = self.actor(state)
        if test:  # During testing, choose the most probable action
            action = torch.argmax(probs, dim=1)
        else:  # During training, sample from the action distribution
            action_dist = torch.distributions.Categorical(probs)
            action = action_dist.sample()
        return action.item()

    def update(self, transition_dict):
        states = torch.tensor(transition_dict['states'], dtype=torch.float).to(self.device)
        actions = torch.tensor(transition_dict['actions']).view(-1, 1).to(self.device)
        rewards = torch.tensor(transition_dict['rewards'], dtype=torch.float).view(-1, 1).to(self.device)
        next_states = torch.tensor(transition_dict['next_states'], dtype=torch.float).to(self.device)
        dones = torch.tensor(transition_dict['dones'], dtype=torch.float).view(-1, 1).to(self.device)
        
        # Compute TD target and advantage
        td_target = rewards + self.gamma * self.critic(next_states) * (1 - dones)
        td_delta = td_target - self.critic(states)
        advantage = compute_advantage(self.gamma, self.lmbda, td_delta.cpu()).to(self.device)
        old_log_probs = torch.log(self.actor(states).gather(1, actions)).detach()
        
        # PPO update for multiple epochs
        for _ in range(self.epochs):
            log_probs = torch.log(self.actor(states).gather(1, actions))
            ratio = torch.exp(log_probs - old_log_probs)
            surr1 = ratio * advantage
            surr2 = torch.clamp(ratio, 1 - self.eps, 1 + self.eps) * advantage
            
            # Update actor and critic networks
            actor_loss = torch.mean(-torch.min(surr1, surr2))
            critic_loss = F.mse_loss(self.critic(states), td_target.detach())  # mse_loss already averages
            
            self.actor_optimizer.zero_grad()
            self.critic_optimizer.zero_grad()
            actor_loss.backward()
            critic_loss.backward()
            self.actor_optimizer.step()
            self.critic_optimizer.step()
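
The effect of `torch.min(surr1, surr2)` in the update is easiest to see on single numbers. The sketch below reproduces the per-sample objective in plain Python; the function name `clipped_surrogate` and the sample values are assumptions for illustration only.

```python
def clipped_surrogate(ratio, advantage, eps=0.2):
    """Per-sample PPO objective: min(ratio * A, clip(ratio, 1 - eps, 1 + eps) * A)."""
    clipped_ratio = max(1.0 - eps, min(ratio, 1.0 + eps))
    return min(ratio * advantage, clipped_ratio * advantage)


# Positive advantage: the gain from raising the ratio is capped at (1 + eps) * A.
print(clipped_surrogate(1.5, 1.0))   # about 1.2
# Negative advantage, ratio below 1 - eps: the value is held at (1 - eps) * A.
print(clipped_surrogate(0.5, -1.0))  # about -0.8
# Negative advantage, ratio above 1 + eps: min keeps the unclipped, worse value.
print(clipped_surrogate(1.5, -1.0))  # about -1.5
```

In the first two cases the clipped term is selected, so it contributes no gradient with respect to the ratio and the policy has no incentive to move further in that direction. In the third case the update has made a bad action more likely, and the unclipped term keeps the full penalty.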

## Training Functions

Let's implement the training loop to collect experience and update the policy:

In [6]:
def train_episode(env, agent):
    """Train for one episode"""
    states, actions, rewards, next_states, dones = [], [], [], [], []
    state, _ = env.reset()  # Gymnasium returns (observation, info)
    episode_return = 0
    done = False

    while not done:
        action = agent.take_action(state)
        next_state, reward, terminated, truncated, _ = env.step(action)
        done = terminated or truncated

        # Store transition
        states.append(state)
        actions.append(action)
        rewards.append(reward)
        next_states.append(next_state)
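        # Store only true termination: a time-limit truncation should still bootstrap from V(next_state)
        dones.append(terminated)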

        state = next_state
        episode_return += reward

    # Update policy using collected transitions
    transition_dict = {
        'states': np.array(states),
        'actions': np.array(actions),
        'rewards': np.array(rewards),
        'next_states': np.array(next_states),
        'dones': np.array(dones)
    }
    agent.update(transition_dict)
    return episode_return

def train(env, agent, num_episodes):
    """Complete training process"""
    return_list = []
    for i in range(num_episodes):
        episode_return = train_episode(env, agent)
        return_list.append(episode_return)
        if (i + 1) % 10 == 0:
            print(f"Episode {i+1}, Return: {episode_return}")
    return return_list

## Training the Agent

Now let's set up the hyperparameters and train our PPO agent:

In [None]:
# Hyperparameters
state_dim = env.observation_space.shape[0]
action_dim = env.action_space.n
hidden_dim = 128
actor_lr = 1e-3
critic_lr = 1e-2
gamma = 0.98
lmbda = 0.95
epochs = 10
eps = 0.2
num_episodes = 500
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

# Create and train agent
agent = PPO(state_dim, hidden_dim, action_dim, actor_lr, critic_lr,
           lmbda, epochs, eps, gamma, device)
return_list = train(env, agent, num_episodes)

## Visualizing the Results

Let's plot the training curve using a moving average to smooth out the noise:

In [None]:
# Plot training returns
episodes_list = list(range(len(return_list)))
mv_return = rl_utils.moving_average(return_list, 9)
plt.figure(figsize=(10, 6))
plt.plot(episodes_list, mv_return)
plt.xlabel('Episodes')
plt.ylabel('Returns')
plt.title(f'PPO Learning Curve on {env_name}')
plt.grid(True)
plt.show()
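
`rl_utils.moving_average` comes from a local `utils` package that is not shown in this notebook. If that helper is unavailable, a stand-in along these lines could be used instead. It is an assumption about the helper's behavior (a trailing window that returns one value per episode, so the result can be plotted against `episodes_list`), not its actual implementation.

```python
def trailing_moving_average(values, window_size):
    """Hypothetical stand-in: mean of the last `window_size` values at each position."""
    smoothed = []
    running_sum = 0.0
    for i, value in enumerate(values):
        running_sum += value
        # Drop the value that just fell out of the window
        if i >= window_size:
            running_sum -= values[i - window_size]
        # Early positions average over fewer than window_size values
        count = min(i + 1, window_size)
        smoothed.append(running_sum / count)
    return smoothed


print(trailing_moving_average([1, 2, 3, 4], 2))  # → [1.0, 1.5, 2.5, 3.5]
```

Because the output has the same length as the input, it can replace `mv_return` in the plotting cell without further changes.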

## Testing the Trained Agent

Now that we have trained our agent, let's create a function to visualize its performance. This function will:
1. Run multiple test episodes
2. Render each episode
3. Display the cumulative reward

Note: We'll use `test=True` in the `take_action` method to make the agent choose actions deterministically.

In [9]:
from IPython import display
import PIL.Image

def test_trained_policy(agent, env, num_episodes=5):
    """Test the trained policy and visualize the agent's behavior
    
    Args:
        agent: Trained PPO agent
        env: Gym environment
        num_episodes: Number of test episodes to run
    """
    for i in range(num_episodes):
        state, _ = env.reset()  # Gymnasium returns (observation, info)
        done = False
        episode_return = 0
        step_count = 0

        print(f"\nEpisode {i + 1}:")
        while not done:
            # Render the environment
            rgb_array = env.render()
            img = PIL.Image.fromarray(rgb_array)
            display.clear_output(wait=True)
            display.display(img)
            
            # Take deterministic action
            action = agent.take_action(state, test=True)
            next_state, reward, terminated, truncated, _ = env.step(action)
            done = terminated or truncated
            
            state = next_state
            episode_return += reward
            step_count += 1
            
            # Print current status
            print(f"Step: {step_count}, Cumulative Reward: {episode_return}")

        print(f"\nEpisode {i + 1} finished with total return: {episode_return}")

    env.close()

Let's test our trained agent:

In [None]:
# Create a new environment instance for testing with rendering enabled
test_env = gym.make(env_name, render_mode="rgb_array")
test_trained_policy(agent, test_env)

## Analysis and Discussion

Let's analyze the key components of our PPO implementation:

1. **Policy Network (Actor)**:
   - Maps states to action probabilities
   - Uses a simple two-layer neural network
   - Outputs softmax probabilities for discrete actions

2. **Value Network (Critic)**:
   - Estimates the value function V(s)
   - Helps reduce variance in policy updates
   - Also uses a two-layer architecture

3. **PPO Clipping**:
   - Prevents overly large policy updates
   - Clip parameter ε=0.2 is a common choice
   - Helps maintain stable learning

4. **Advantage Estimation**:
   - Uses GAE for better trade-off between bias and variance
   - Lambda parameter controls this trade-off

## Potential Improvements

1. **Network Architecture**:
   - Add more layers or units
   - Try different activation functions

2. **Training Process**:
   - Implement parallel environment sampling
   - Add entropy bonus for exploration
   - Try different learning rate schedules

3. **Hyperparameter Tuning**:
   - Adjust GAE parameters (γ, λ)
   - Modify network learning rates
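   - Change the number of update epochs, or split each episode's data into minibatches (the current implementation updates on the full episode at once)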
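
As one illustration of the entropy-bonus idea listed above, the sketch below computes the entropy of a categorical action distribution in plain Python. In a PPO update, the mean entropy is commonly scaled by a small coefficient and subtracted from the actor loss, so that less deterministic policies are slightly favored. The name `entropy_coef` and its value are assumptions for illustration and are not part of this notebook's code.

```python
import math


def categorical_entropy(probs):
    """Entropy H(p) = -sum(p * log p) of a discrete distribution, skipping zero entries."""
    return -sum(p * math.log(p) for p in probs if p > 0.0)


entropy_coef = 0.01  # hypothetical weight; would need tuning

uniform = categorical_entropy([0.5, 0.5])   # maximum for two actions: ln 2
peaked = categorical_entropy([0.99, 0.01])  # near-deterministic policy: close to 0

# An entropy-regularized actor loss would take the form:
#   actor_loss = clipped_loss - entropy_coef * mean_entropy
print(uniform, peaked)
```

With the softmax policy used here, the same quantity is available in PyTorch through `torch.distributions.Categorical(probs).entropy()`.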

## Conclusion

We've implemented and trained a PPO agent on the CartPole environment. The implementation covers the key components of PPO (a clipped surrogate objective, actor-critic networks, and GAE) while staying simple and readable. If training goes well, the learning curve should climb toward the environment's maximum return, showing that even a compact PPO implementation can learn to balance the pole.