# Summary on the "[Proximal Policy Optimization Algorithms](https://arxiv.org/pdf/1707.06347v2)" article by John Schulman, Filip Wolski, Prafulla Dhariwal, Alec Radford, Oleg Klimov



## Introduction to the article

In this article, the authors introduce a new group of reinforcement learning (RL) methods called Proximal Policy Optimization (PPO). 
The key idea behind PPO is to improve how RL agents learn from their environment. It works by:

- Interacting with the Environment: The agent collects data by taking actions and observing the results, just like in traditional RL methods.
- Optimizing with Multiple Updates: Unlike standard methods that update the agent’s policy after each new data sample, PPO allows for multiple updates using the same data, making learning more efficient.

PPO is inspired by another algorithm called Trust Region Policy Optimization (TRPO). While TRPO is effective, it’s complex to implement. PPO offers similar benefits but is simpler, more flexible, and uses data more efficiently.

**Goals of the Article**
1. Introduce PPO: Explain how PPO works and what makes it different from other RL algorithms.
2. Demonstrate PPO’s Efficiency: Show that PPO can learn faster and perform better with less data (better sample efficiency).
3. Test Across Tasks: Evaluate PPO on different benchmark environments, such as robotic simulations and Atari games, to prove its versatility.
4. Compare with Other Methods: Show that PPO outperforms other common RL algorithms in terms of performance, simplicity, and training time.

In short, the article aims to present PPO as an effective, easy-to-use algorithm that balances strong performance with efficient learning. 

## What is PPO and how it works

PPO (Proximal Policy Optimization) is based on policy gradient methods, which are a way for an **agent** (like a robot, autonomous vehicle, or game-playing AI) to learn how to make better decisions over time. The goal of the method is to improve the strategy of the agent, with the strategy being the way the agent chooses an action. This strategy is called the policy.

By interacting with the environment, the agent obtains the results of its actions, called rewards, along with information on how the environment changes. After gathering the data—state, action, reward, and next state—the policy is then updated using gradient ascent methods. The formula used is:
$$\hat{g} = \hat{E}_t \left[ \nabla_{\theta} \log \pi_{\theta}(a_t | s_t) \hat{A}_t \right]$$

where $\pi_{\theta}(a_t | s_t)$ is the **policy** - telling the agent the probability of taking an action $a_t$ when it is in state $s_t$, 

$\hat{A}_t$ is the advantage function, which compares how good an action is compared to the average action. If an action leads to a better outcome than expected, its advantage is high,

$\nabla_{\theta}$ represents the gradient, calculating how much to change the policy to improve performance,

$\hat{E}_t$ is the empirical expectation, meaning the average outcome or expectation based on the data gathered.

The loss of the objective function is calculated by differentiating the objective. However, performing multiple updates of the policy based on the loss from a single step is not recommended because it can lead to large policy updates, which can destabilize the training process.

The article then explains the TRPO whis is a reinforcement learning algorithm designed to improve an agent’s policy while keeping updates stable and safe. The problem with regular policy gradient methods is that large updates to the policy can make learning unstable. TRPO solves this by restricting how much the policy can change in one update. It tries to maximize a special function called the **surrogate objective**, which estimates how good a new policy is compared to the old one. The goal is to improve the policy without making huge changes. This is done using the formula:
$$\begin{align}
\max_{\theta} \; & \hat{E}_t \left[ \frac{\pi_{\theta}(a_t | s_t)}{\pi_{\theta_{\text{old}}}(a_t | s_t)} \hat{A}_t \right] \tag{3} \\
\text{subject to} \; & \hat{E}_t \left[ KL\left( \pi_{\theta_{\text{old}}}(\cdot | s_t), \pi_{\theta}(\cdot | s_t) \right) \right] \leq \delta \tag{4}
\end{align}
$$

where $\delta$ is small value that controls how much the policy is allowed to change. This is the **constrain**. It limit how much the new policy can differ from the old policy. This ensures that updates are small and stable, and

KL: The Kullback-Leibler (KL) divergence, which measures the difference between the old and new policies, measuring the probability distributions for all actions rather than only one. The other option is to use **penalty**:
$$
\max_{\theta} \; \hat{E}_t \left[ \frac{\pi_{\theta}(a_t | s_t)}{\pi_{\theta_{\text{old}}}(a_t | s_t)} \hat{A}_t - \beta \, KL\left( \pi_{\theta_{\text{old}}}(\cdot | s_t), \pi_{\theta}(\cdot | s_t) \right) \right]
$$

where $\beta$ is penalty 

Since no official code is submitted in support of the paper, below is my implementation of PPO algorithm employed on the continuous environment of MountainCarContinuous-v0. The environment is chosen as it is considered one of the simplest to begin and employ PPO algorithm in a continuous environment. The link to the descriotpion and goals of the environment can be found [here](https://gymnasium.farama.org/environments/classic_control/mountain_car_continuous/).

In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.distributions import Normal
import torch.optim as optim
from torch.utils.tensorboard import SummaryWriter

import gymnasium as gym
from gymnasium.wrappers import RecordEpisodeStatistics

from collections import deque

import numpy as np

2025-02-06 08:23:05.980820: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:477] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
E0000 00:00:1738830186.006518 2413834 cuda_dnn.cc:8310] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
E0000 00:00:1738830186.015325 2413834 cuda_blas.cc:1418] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
2025-02-06 08:23:06.041608: I tensorflow/core/platform/cpu_feature_guard.cc:210] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


In [2]:
env = gym.make('MountainCarContinuous-v0')

The observation is a ndarray with shape (2,) where the elements correspond to the position of the car along the x-axis and the velocity of the car.

In [3]:
env.observation_space

Box([-1.2  -0.07], [0.6  0.07], (2,), float32)

The action is a ndarray with shape (1,), representing the directional force applied on the car. The action is clipped in the range [-1,1] and multiplied by a power of 0.0015.

In [4]:
env.action_space

Box(-1.0, 1.0, (1,), float32)

First, lets establish the policy or the neural network. It will take as input the observation space dimension and as output - mu (mean) and sigma (standart) of Normal Gaussian distribution. Then will be defining the layers. Will be one hidden layer of size 64 with ReLU activation. The value function will be handled by the 'critic' part of the agent.

In [30]:
# Detect GPU availability
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(device)

class ActorCritic(nn.Module):
    def __init__(self, obs_space_dim: int, action_space_dim: int):
        super().__init__()
        self.obs_space_dim = obs_space_dim
        self.action_space_dim = action_space_dim

        self.input_layer = nn.Linear(obs_space_dim, 64)
        self.hidden_layer = nn.Linear(64, 64)

        # Actor, providing the actions
        self.mu_layer = nn.Linear(64, action_space_dim)
        self.sigma_layer = nn.Linear(64, action_space_dim)

        # Critic, providing the value for that actions
        self.value_layer = nn.Linear(64, 1)  # outputs one scalar value

        # to device
        self.to(device)

    def forward(self, x):
        x = F.relu(self.input_layer(x))
        x = F.relu(self.hidden_layer(x))

        # activate the output layers for the Actor
        mu = torch.tanh(self.mu_layer(x)) # activated and bounded in range [-1, 1] in order to sample the action according to the observation space limits
        # sigma = torch.clamp(F.softplus(self.sigma_layer(x)), 1e-03, 1.0) # this will ensure always positive sigma and clamped till 1.0
        sigma = F.softplus(self.sigma_layer(x)) + 1e-05
        # no activation applied for the Critic
        value = self.value_layer(x)

        return mu, sigma, value    

    def act(self, state):
        # check if state is tensor, if not convert
        if not isinstance(state, torch.Tensor):
            state = torch.tensor(state, dtype=torch.float32)

        # predict mu, sigma and critic value of the current policy
        mu, sigma, value = self.forward(state)

        # generate normal distribuiton with mu and sigma
        dist = Normal(mu, sigma)

        # sample the distribution and get the log_probability
        action = torch.tanh(dist.sample())
        log_prob = dist.log_prob(action)

        return action, log_prob, value

cpu


The trajectory data must be stored. Trajectory data is the data, generated each time the agent takes an action into the environment. There are various ways of doing that.

In [6]:
class Memory:
    def __init__(self):
        self.states: list = []
        self.actions: list = []
        self.log_probs: list = []
        self.rewards: list = []
        self.dones: list = []
        self.critic_values: list = []

    def clear(self):
        self.states.clear()
        self.actions.clear()
        self.log_probs.clear()
        self.rewards.clear()
        self.dones.clear()
        self.critic_values.clear()

In [7]:
def compute_gae(next_value: list[int], rewards, dones: list[tuple[bool, bool]], values: list[int], gamma: float, lam: float) -> tuple[torch.Tensor, torch.Tensor]:
    """
    Compute Generalized Advantage Estimation (GAE).

    GAE helps to reduce variance in policy gradient methods while maintaining low bias. 
    It computes the advantage estimates and the corresponding discounted returns for each timestep.

    Args:
        next_value (list[int]): Value of the next state after the final step (scalar or list of scalars).
        rewards (list[float]): List of rewards collected during the episode.
        dones (list[tuple[bool, bool]]): List of done flags (terminated, truncated) indicating episode termination.
        values (list[int]): List of value predictions from the value function (for each state).
        gamma (float): Discount factor for future rewards (typically between 0.9 and 0.99).
        lam (float): GAE lambda parameter controlling the bias-variance trade-off (typically between 0.9 and 0.95).

    Returns:
        tuple[torch.Tensor, torch.Tensor]: 
            - **returns (torch.Tensor):** Discounted cumulative rewards (returns) for each timestep.
            - **advantages (torch.Tensor):** Advantage estimates normalized for stable learning.
    """
    values = values + [next_value]
    gae = 0  # Initial value of the advantage
    returns = deque([])
    advantages = deque([])

    for step in reversed(range(len(rewards))):
        delta = rewards[step] + gamma * values[step + 1] * (1 - dones[step]) - values[step]
        gae = delta + gamma * lam * (1 - dones[step]) * gae
        returns.appendleft(gae + values[step])
        advantages.appendleft(gae)

    # Convert returns and advantages to tensors
    returns = torch.tensor(returns, dtype=torch.float32)
    advantages = torch.tensor(advantages, dtype=torch.float32)

    # Normalize advantages
    advantages = (advantages - advantages.mean()) / (advantages.std() + 1e-8)

    return returns, advantages

The model and the optimizer will be initialized using defaut parameters.

In [25]:
obs_size = env.observation_space.shape[0]
act_size = env.action_space.shape[0]
GAMMA = 0.99
LAMBDA = 0.85
EPOCHS = 4
CLIP_EPS = 0.3
LEARNING_RATE = 6e-4

# initializing the agent and the optimizer.
agent = ActorCritic(obs_size, act_size).to(device)
optimizer = optim.Adam(agent.parameters(), lr=LEARNING_RATE)

In [9]:
def train(env, agent: ActorCritic, optimizer: optim.Adam, num_steps: int, writer, episode_count, printit=False) -> ActorCritic:   
    memory = Memory()
    state, _ = env.reset()
    memory.clear()

    episode_reward = 0
    episode_length = 0

    for step in range(num_steps):
        # get the action, log_prob and value
        action, log_prob, value = agent.act(state)

        if printit:
            print(f'Action: {action}\nLog Prob: {log_prob}\nValue: {value}')

        # take a step into the environment
        state, reward, terminated, truncated, _ = env.step(action)

        # set the termination flag
        done = terminated or truncated

        # add all to the memory
        memory.states.append(state)
        memory.actions.append(action)
        memory.log_probs.append(log_prob)
        memory.rewards.append(reward)
        memory.dones.append(done)
        memory.critic_values.append(value)

        # track episode stats
        episode_reward += reward
        episode_length += 1
        
        if done:
            break

    # when the episoed is completed, calculate the GAE
    if done:
        next_value = 0
    else:
        _, _, next_value = agent.act(state)
    
    returns, advantages = compute_gae(
        next_value=next_value,
        rewards=memory.rewards,
        dones=memory.dones,
        values=memory.critic_values,
        gamma=GAMMA,
        lam=LAMBDA,
    ) 

    # updating the policy of the agent.
    # iterate over the collected trajectories for the set epochs
    for _ in range(EPOCHS):
        # set the gradients of the optimizer to zero
        optimizer.zero_grad()

        # convert the states to tensor for faster computing
        states = torch.tensor(np.array(memory.states), dtype=torch.float32)
        _, new_log_probs, new_values = agent.act(states)

        # probability ratio
        old_log_probs = torch.tensor(memory.log_probs, dtype=torch.float32).detach()
        ratio = torch.exp(new_log_probs - old_log_probs)
        
        # clipped surrogate loss
        clipped_loss = torch.clamp(ratio, 1 - CLIP_EPS, 1 + CLIP_EPS)

        # calculate policy loss and value loss
        policy_loss = -torch.min(ratio * advantages, clipped_loss * advantages).mean()

        returns = torch.tensor(np.array(returns))
        value_loss = nn.MSELoss()(new_values.squeeze(), returns)
        
        # total loss
        total_loss = policy_loss + 0.5 * value_loss

        # backpropagate the loss and step the optimizer
        total_loss.backward()
        optimizer.step()

        # TensorBoard Logging
        writer.add_scalar("Loss/Policy Loss", policy_loss.item(), episode_count)
        writer.add_scalar("Loss/Value Loss", value_loss.item(), episode_count)
        writer.add_scalar("Loss/Total Loss", total_loss.item(), episode_count)

    # Log episode reward and length
    writer.add_scalar("Episode Reward", episode_reward, episode_count)
    writer.add_scalar("Episode Length", episode_length, episode_count)

    if episode_count % 10 == 0:
        print(f"Episode {episode_count}, Reward: {episode_reward:.2f}, Steps: {episode_length}")
        print(f"Action min: {min(memory.actions)}, Action max: {max(memory.actions)}, Action mean: {np.array(memory.actions).mean()}")
  

In [10]:
# %matplotlib inline
# %load_ext tensorboard

# %tensorboard --logdir ./custom_ppo_logs/ --host=0.0.0.0

In [26]:
env = gym.make("MountainCarContinuous-v0", render_mode="rgb_array", goal_velocity=0.1)
env = RecordEpisodeStatistics(env)
# Initialize TensorBoard writer
writer = SummaryWriter(log_dir="./custom_ppo_logs")

# Training Loop 
EPISODES = 1000
for episode_count in range(EPISODES):
    train(env, agent, optimizer, 1000, writer, episode_count)

# Cleanup
env.close()
writer.close()

Episode 0, Reward: -30.02, Steps: 999
Action min: tensor([-0.9803]), Action max: tensor([0.9829]), Action mean: 0.0854717493057251
Episode 10, Reward: -40.28, Steps: 999
Action min: tensor([-0.9948]), Action max: tensor([0.9970]), Action mean: 0.034234922379255295


KeyboardInterrupt: 

In [20]:
import time

env_test = gym.make("MountainCarContinuous-v0", render_mode="human", goal_velocity=0.1)
env_test = RecordEpisodeStatistics(env_test)

state, _ = env_test.reset()
total_reward = 0
for _ in range(1000):
    # action = agent.predict(state)
    action, *rest = agent.act(state)
    # print(state)
    state, reward, terminated, truncated, _ = env_test.step(action)

    total_reward += reward
    if terminated or truncated:
        break

print(f'Episode Reward: {total_reward}')
env_test.close() 

Episode Reward: -97.92537157130785


In [18]:
state, _ = env.reset()
print(f'State: {state}')

State: [-0.44388816  0.        ]


In [19]:
env.observation_space

Box([-1.2  -0.07], [0.6  0.07], (2,), float32)

In [None]:
mu, sigma, value = agent(torch.tensor(state))
print(f'Mu: {mu}\nSigma: {sigma}\nvalue: {value}')

In [None]:
action, log_prob, value = agent.act(state)
print(f'Action: {action}\nLog Prob: {log_prob}\nValue: {value}')

In [None]:
dist = Normal(mu, sigma)
action = dist.sample()
print(f'Action: {action}')
print(f'Log Prob: {dist.log_prob(action).sum(axis=-1)}')


In [None]:
state, reward, terminated, truncated, _ = env.step(action)
print(f'State: {state}\nReward: {reward}\n')

In [None]:
done = terminated or truncated

In [None]:
memory = Memory()

In [None]:
memory.states.append(state)
memory.actions.append(action)
memory.log_probs.append(log_prob)
memory.rewards.append(reward)
memory.dones.append(done)
memory.critic_values.append(value)

In [None]:
memory.states

In [None]:
next_value = 0 if done else memory.critic_values[-1]

In [None]:
GAMMA = 0.99
LAMBDA = 0.95
returns, advantages = compute_gae(
            next_value=next_value,
            rewards=memory.rewards,
            dones=memory.dones,
            values=memory.critic_values,
            gamma=GAMMA,
            lam=LAMBDA,
        ) 

In [None]:
states = np.array(memory.states)
_, new_log_probs, new_values = agent.act(states)

In [None]:
new_log_probs

In [None]:
# log_probs_detached = memory.log_probs.detach().numpy()
ratio = torch.exp(new_log_probs.detach() - torch.tensor(memory.log_probs).detach())

In [None]:
returns

In [None]:
returns = torch.tensor(np.array(returns))
value_loss = nn.MSELoss()(new_values.squeeze(), returns)

In [None]:

value_loss

In [None]:
# clipped surrogate loss
clipped_loss = torch.clamp(ratio, 1 - CLIP_EPS, 1 + CLIP_EPS)

# calculate policy loss and value loss
policy_loss = -torch.min(ratio * advantages, clipped_loss * advantages).mean()

In [None]:
total_loss = policy_loss + 0.5 * value_loss

In [None]:
total_loss.backward()

In [None]:
for name, param in agent.named_parameters():
    print(f"{name}: requires_grad = {param.requires_grad}")


In [None]:
# from stable_baselines3 import PPO

In [None]:
# agent = PPO(policy='MlpPolicy', env=env, verbose=1)

In [None]:
# agent.learn(total_timesteps=int(1e+5), progress_bar=True)