# Exercise Overview

Implementation of PPO to solve the bipedal walker from Gymnasium environment


# Install & Import Requirements



In [1]:
!pip install swig
!pip install gymnasium[box2d]

Collecting swig
  Downloading swig-4.4.1-py3-none-manylinux_2_12_x86_64.manylinux2010_x86_64.whl.metadata (3.5 kB)
Downloading swig-4.4.1-py3-none-manylinux_2_12_x86_64.manylinux2010_x86_64.whl (1.9 MB)
[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/1.9 MB[0m [31m?[0m eta [36m-:--:--[0m[2K   [91m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[91m╸[0m[90m━━━━━[0m [32m1.6/1.9 MB[0m [31m49.7 MB/s[0m eta [36m0:00:01[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.9/1.9 MB[0m [31m34.7 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: swig
Successfully installed swig-4.4.1
Collecting box2d==2.3.10 (from gymnasium[box2d])
  Downloading Box2D-2.3.10-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (573 bytes)
Downloading Box2D-2.3.10-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (3.7 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m3.7/3.7 MB[0m [31m49.9 MB/s[0m eta [36m0:00

In [2]:
import torch
import torch.nn as nn
from torch.distributions.normal import Normal
from torch.optim import Adam
import numpy as np
import gymnasium as gym
import matplotlib.pyplot as plt
from datetime import datetime
from typing import Sequence
from tqdm import tqdm

# Helper functions

## MLP function to create the policy network of our agent

In [3]:
def mlp(sizes: Sequence[int], activation=nn.ReLU, output_activation=nn.Identity) -> nn.Sequential:
  """
      Create a simple feedforward neural network.
  """
  layers = []
  for j in range(len(sizes)-1):
    act = activation if j < len(sizes)-2 else output_activation
    layers += [nn.Linear(sizes[j], sizes[j+1]), act()]
  return nn.Sequential(*layers)

## Functions to get the ouput of the policy network

In [4]:
def get_policy(obs: torch.Tensor) -> Normal:
  """
  Get the stochastic policy for a given observation (-batch).
  Returns a distribution for every action-dimension.
  """
  obs = obs.unsqueeze(0) if obs.dim() == 1 else obs  # for single observations that do not have a batch dimension
  logits = actor(obs)
  mean, logstd = logits[:, :n_acts], logits[:, n_acts:]  # split the output layer into mean and logstd
  logstd = torch.clamp(logstd, min=-20, max=2)  # for numerical stability
  return Normal(mean, torch.exp(logstd))

In [5]:
def get_action(obs: torch.Tensor) -> np.ndarray:
  """
  Get the action (-batch) from the policy for a given observation (-batch).
  """
  dist = get_policy(obs)
  return dist.sample().squeeze(0).numpy()

In PPO, we need to compute log probabilities separately for importance sampling.

This function extracts the log probability computation.

In [6]:
def get_log_prob(obs: torch.Tensor, act: torch.Tensor) -> torch.Tensor:
  """
  Get the log-probability of a given action for a given observation.

  Args:
      obs: Observations tensor of shape (batch_size, obs_dim)
      act: Actions tensor of shape (batch_size, n_acts)

  Returns:
      log_probs: Log probabilities of shape (batch_size,)
  """
  dist = get_policy(obs)
  logp = dist.log_prob(act).sum(dim=-1) #sum actions up fro example prob of left thtrust and right thrust
  return logp

## Function to compute GAE

In [7]:
def compute_gae(rewards: torch.Tensor,
                values: torch.Tensor,
                next_values: torch.Tensor,
                dones: torch.Tensor,  # values are either 1.0 (has ended) or 0.0 (has not ended), indicating whether an episode has ended or not.
                gamma=0.99,  # discount factor (0,1]
                lam=0.95,  # trace-decay parameter [0,1]. lam=0.0: temporal difference, lam=1.0: Monte Carlo
                ) -> torch.Tensor:
  """
  Compute the Generalized Advantage Estimation (GAE) used for the actor loss function.
  Balances the bias-variance tradeoff of the advantage estimates.
  lam=0.0: temporal difference, high bias - low variance
  lam=1.0: Monte Carlo, high variance - low bias
  """
  T = rewards.shape[0]
  advantages = torch.zeros_like(rewards)
  gae = 0.0
  for t in reversed(range(T)):
    nonterminal = 1.0 - dones[t]
    delta = rewards[t] + gamma * next_values[t] * nonterminal - values[t]
    gae = delta + gamma * lam * nonterminal * gae
    advantages[t] = gae
  return advantages

PPO uses a clipped surrogate objective instead of vanilla policy gradient.

Key components:
1. Importance sampling ratio (for numerical stability we first calclate the log of the ratio)
2. Clipped ratio: prevents ratio from going too far from 1.0
3. Clipped objective
4. Entropy bonus: encourages exploration


In [17]:
def get_actor_loss(obs: torch.Tensor,
                  act: torch.Tensor,
                  weights: torch.Tensor,
                  old_logp: torch.Tensor,
                  beta: float = 0.00  # no entropy regularization per default
                  ) -> torch.Tensor:
  """
  Compute the PPO clipped surrogate loss with entropy regularization for the actor.
  weights: advantages
  old_logp: log-probability under the old policy (before doing a first update)
  beta: hyperparameter for entropy regularization
  """
  dist = get_policy(obs)
  logp = get_log_prob(obs, act)
  ratio = torch.exp(logp - old_logp)  # importance weight
  clipped_obj = torch.min(ratio * weights, torch.clamp(ratio, 1 - epsilon, 1 + epsilon) * weights).mean()
  entropy = dist.entropy().sum(dim=-1).mean()

  return - clipped_obj - beta * entropy

## Function to compute the discounted return

In [9]:
def discounted_return(arr: Sequence[float], gamma=0.99) -> list[float]:
  """
  Compute the discounted return for a single episode, given a sequence of rewards.
  gamma: discount factor (0,1]
  Used for the MSE loss function of the critic.
  """
  ret = [0.0] * len(arr)
  ret[-1] = arr[-1]
  for i in range(len(arr)-2, -1, -1):
    ret[i] = arr[i] + gamma * ret[i+1]
  return ret

PPO makes several key changes to Actor-Critic's training:

1. Compute old_log_prob BEFORE any updates (for importance sampling)
2. Remove separate critic update loop
3. Add outer loop for multiple epochs (n_ppo_epochs)
4. Add inner loop for mini-batches with shuffling
5. Both actor and critic update in each mini-batch
6. Add gradient clipping for stability

In [10]:
def train_one_epoch() -> tuple[list, list]:
  """
  Train the actor and critic for one epoch,
  i.e. one actor-update and n_critic_updates critic-updates.
  """
  batch_obs = []
  batch_acts = []
  batch_rewards = []
  batch_next_obs = []
  batch_dones = []
  batch_rets = []
  batch_lens = []
  batch_Rtogo = []

  obs, _ = env.reset()
  ep_rews = []

  while True:
    act = get_action(torch.as_tensor(obs, dtype=torch.float32))
    next_obs, rew, terminated, truncated, _ = env.step(act)
    done = terminated or truncated

    batch_obs.append(obs.copy())  # copy as obs is modified in-place
    batch_acts.append(act)  # act is newly initialized every loop -> no copy
    batch_rewards.append(rew)
    batch_next_obs.append(next_obs.copy())
    batch_dones.append(done)
    ep_rews.append(rew)

    obs = next_obs

    if done:
      batch_rets.append(sum(ep_rews))
      batch_lens.append(len(ep_rews))
      batch_Rtogo += discounted_return(ep_rews, gamma=gamma)
      ep_rews = []
      obs, _ = env.reset()

      if len(batch_obs) > batch_size:
        break

  # convert lists to tensors
  batch_obs = torch.as_tensor(np.array(batch_obs), dtype=torch.float32)
  batch_next_obs = torch.as_tensor(np.array(batch_next_obs), dtype=torch.float32)
  batch_acts = torch.as_tensor(np.array(batch_acts), dtype=torch.float32)
  batch_rewards = torch.as_tensor(np.array(batch_rewards), dtype=torch.float32)
  batch_dones = torch.as_tensor(np.array(batch_dones), dtype=torch.float32)
  batch_Rtogo = torch.as_tensor(np.array(batch_Rtogo), dtype=torch.float32)

  V_target = batch_Rtogo.detach()  # target for critic, cannot have gradients

  # Remove the critic update here, remember that PPO is introduced to speed up the computational time
  # we can witness the fact that for just 1 actor update is required to generate several samples and
  #and update the critic up to 80 times
  #now we want with the same batch of observations to train both the actor and the critic multiple times

  # calculate generalized advantage estimate GAE
  with torch.no_grad():  # advantages should never have gradients
    value = critic(batch_obs).squeeze()
    next_values = critic(batch_next_obs).squeeze()
    A_gae = compute_gae(batch_rewards, value, next_values, batch_dones, gamma=gamma, lam=lam)  # weight for actor loss function
  A_gae = ((A_gae - A_gae.mean()) / (A_gae.std() + 1e-8))


  # Compute old_log_prob before any updates (for importance sampling)
  old_log_prob = get_log_prob(batch_obs, batch_acts).detach()


  #Add outer loop for multiple epochs (use n_ppo_epochs)
  #Inside outer loop, shuffle the data
  #Use torch.randperm(len(batch_obs)) to get shuffled indices
  #we get shuffled indices in order to have less correlated samples and make data look more iid otherwise sgd doesnt work
  for _ in range(n_ppo_epochs):
    idx = torch.randperm(len(batch_obs))
    #torch.randperm(4) --> tensor([2, 1, 0, 3])
    #Add inner loop for mini-batches
  #Loop from 0 to len(batch_obs) with step size mini_batch_size
  #Get mini-batch indices: idx = indices[start:end]
  #Check if idx is not empty before processing
    for start in range(0, len(batch_obs), mini_batch_size):
      end = min(start + mini_batch_size, len(batch_obs)) #avoid to go over the lengt of the array idx
      mini_batch_indices = idx[start:end]

      if len(mini_batch_indices) == 0:
        continue
      #Extract mini-batches for obs, acts, advantages, old_logp, V_target
      obs_min_batch = batch_obs[mini_batch_indices]
      acts_min_batch = batch_acts[mini_batch_indices]
      old_logp_min_batch = old_log_prob[mini_batch_indices]
      V_target_min_batch = V_target[mini_batch_indices]
      A_gae_min_batch = A_gae[mini_batch_indices]
      #Actor update
      #Use get_actor_loss with mini-batch data
      #Add gradient clipping: nn.utils.clip_grad_norm_(actor.parameters(), 0.5)
      actor_optimizer.zero_grad()
      actor_loss = get_actor_loss(obs_min_batch,acts_min_batch, _, old_logp_min_batch, beta)
      actor_loss.backward()
      nn.utils.clip_grad_norm_(actor.parameters(), 0.5)#useful to not make gradients explode
      actor_optimizer.step()
      #Critic update
      #Use mse loss with critic predictions and V_target
      #Use .flatten() instead of .squeeze() for robust handling
      #Add gradient clipping: nn.utils.clip_grad_norm_(critic.parameters(), 0.5)
      critic_optimizer.zero_grad()
      critic_loss = mse(critic(batch_obs).squeeze() ,V_target)
      critic_loss.backward()
      nn.utils.clip_grad_norm_(critic.parameters(), 0.5)
      critic_optimizer.step()

  return batch_rets, batch_lens

Add PPO-specific hyperparameters and adjust the setup.

Changes needed:
1. Add: clip_ratio, n_ppo_epochs, mini_batch_size, beta
2. Remove: n_critic_updates (no longer separate)
3. Everything else stays the same!

In [None]:

env_name='LunarLander-v3'
hidden_sizes=[64, 64]
lr=3e-4
lr_critic=1e-3
epochs=1_000
batch_size=5_000
gamma=0.99
lam=0.95
plot=True
mini_batch_size = 128
beta = 0.0
n_ppo_epochs = 10
env = gym.make(env_name, continuous=True) if env_name == "LunarLander-v3" else gym.make(env_name)  # BipedalWalker is continuous per default

obs_dim = env.observation_space.shape[0]
n_acts = env.action_space.shape[0]

actor = mlp([obs_dim]+hidden_sizes+[2*n_acts])  # output-layer: 2*n_acts, for mean and logstd as the policy is stochastic
critic = mlp([obs_dim]+hidden_sizes+[1])
actor_optimizer = Adam(actor.parameters(), lr=lr)
critic_optimizer = Adam(critic.parameters(), lr=lr_critic)
mse = nn.MSELoss()

returns = []
std = []

# training loop
progress_bar = tqdm(range(1, epochs+1))
for _ in progress_bar:
  batch_rets, batch_lens = train_one_epoch()
  avg_ret = np.mean(batch_rets)
  avg_len = np.mean(batch_lens)
  returns.append(avg_ret)
  std.append(np.std(batch_rets))
  progress_bar.set_postfix({"avg_ret": f"{avg_ret:5.0f}", "avg_len": f"{avg_len:5.0f}"})

if plot:
  plt.plot(returns)
  plt.fill_between(range(len(returns)), np.array(returns) - np.array(std), np.minimum(300, np.array(returns) + np.array(std)), alpha=0.3)
  plt.grid()
  goal = {"LunarLander-v3": 200, "BipedalWalker-v3": 300}.get(env_name, 0)
  plt.axhline(goal, color='r', linestyle='--')
  plt.xlabel('epoch')
  plt.ylabel('average return')
  timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
  # TODO: Change filename
  plt.savefig(f"PPO_training_{timestamp}.png")
  plt.show()

 91%|█████████ | 908/1000 [1:53:19<11:22,  7.42s/it, avg_ret=-285, avg_len=132]