<a href="https://colab.research.google.com/github/carsondenison/proximal-policy-optimization/blob/main/PPO.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!pip install --quiet "torch" "pytorch-lightning" "gym"

Step 0: Import the libraries we'll need

In [2]:
import random
from typing import List, Tuple, Iterable
from collections import namedtuple, deque

import gym
import numpy as np
import torch
import torch.nn as nn
from torch import Tensor
# import pytorch_lightning as pl

# Implements: https://arxiv.org/pdf/1707.06347.pdf

Step 1: Create a dataset to store experiences

In [3]:
Experience = namedtuple('Experience', 'state action reward done new_state')

class ReplayBuffer():
    '''
        Buffer to hold Experiences for training
    '''
    def __init__(self):
        self.buffer:List[Experience] = []
    
    def append(self, x):
        self.buffer.append(x)
    
    def clear(self):
        self.buffer = []

    def to_batch(self) -> Tuple[Tensor, Tensor, Tensor, Tensor, Tensor]:
        states, actions, rewards, dones, new_states = zip(*self.buffer)
        states = torch.tensor(states, dtype=torch.float32, requires_grad=True)
        actions = torch.tensor(actions, dtype=torch.int64)[:, None]
        rewards = torch.tensor(rewards, dtype=torch.float32)[:, None]
        dones = torch.tensor(dones, dtype=torch.bool)[:, None]
        new_states = torch.tensor(new_states, dtype=torch.float32)
        return states, actions, rewards, dones, new_states

Step 2: Create an actor that can interact with the environment

In [4]:
class Actor():
    '''
        Class which can interact with the environment
    '''
    def __init__(self, env:gym.Env, replay_buffer:ReplayBuffer, pi:nn.Module):
        self.env = env
        self.buffer = buffer
        self.pi = pi
        self.state = self.env.reset() # self.state is a numpy array

    def get_action(self) -> int:
        '''
            Samples the policy to get an action given self.state
        '''
        pi_logits = self.pi(torch.tensor(self.state))
        policy = torch.distributions.categorical.Categorical(logits=pi_logits)
        action = policy.sample()
        return action.item()

    @torch.no_grad()
    def play_step(self) -> None:
        '''
            Play one step of the environment, and add it to the buffer
        '''
        action = self.get_action()
        new_state, reward, done, _ = self.env.step(action)
        exp = Experience(self.state, action, reward, done, new_state)
        self.buffer.append(exp)
        self.state = new_state
        if done:
            self.state = self.env.reset()
        return done


Step 3: Define the neural network architecture for policy and advantage

In [5]:
class MLP(nn.Module):
    '''
        Simple MLP, as described in https://arxiv.org/pdf/1707.06347.pdf
    '''
    def __init__(self, in_size, out_size, hidden_size=64):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(in_size, hidden_size),
            nn.Tanh(),
            nn.Linear(hidden_size, out_size),
        )

    def forward(self, state):
        return self.net(state.float())

Step 4: Define an advantage estimator function. This is built from a value network and a reward_to_go calculator



In [16]:
def reward_to_go(rewards:Tensor, dones:Tensor, gamma:float) -> Tensor:
    '''
        Calculates the rewards_to_go for a trajectory

        Args:
            rewards: (T, 1) float32 of rewards for each step
            dones: (T, 1) bool - if state i was terminal
            gamma: discount factor for each step
        Returns:
            rewards_to_go: Discounted reward_to_go for each reward in rewards
    '''
    rewards_to_go = torch.zeros_like(rewards, dtype=torch.float32)
    for i in reversed(range(len(dones))):
        if dones[i] or i == len(dones) - 1:
            rewards_to_go[i] = rewards[i]
        else:
            rewards_to_go[i] = rewards[i] + gamma * rewards_to_go[i + 1]
    return rewards_to_go  

# quick tests for reward_to_go
dones_3 = torch.tensor([0,0,0], dtype=bool)
dones_1 = torch.tensor([0], dtype=bool)

rtg = reward_to_go(torch.tensor([1, 1, 1,]), dones_3, 1)
assert torch.all(torch.eq(rtg, torch.tensor([3, 2, 1]))), str(rtg)
rtg2 = reward_to_go(torch.tensor([1, 1, 1]), dones_3, 0.5)
assert torch.all(torch.eq(rtg2, torch.tensor([1.75, 1.5, 1]))), str(rtg2)
rtg3 = reward_to_go(torch.tensor([]), torch.tensor([]), 1)
assert torch.all(torch.eq(rtg3, torch.tensor([])))
rtg4 = reward_to_go(torch.tensor([1]), dones_1, 0.5)
assert torch.eq(rtg4, torch.tensor(1)), str(rtg4)

In [7]:
def estimate_advantage(states:Tensor, rewards:Tensor, dones:Tensor, value_net:nn.Module, gamma:float, final_state:Tensor=None) -> Tensor:
    '''
        Compute advantage estimate for each step in a trajectory

        Args:
            rewards: (T, 1) torch.float32 - Reward given by each step in the trajectory
            states: (T, state_size) torch.float32 - observation vectors for each state
            dones: (T, 1) bool - if state was terminal
            v_net: Trainable network which predicts the value V(s) of a state
            gamma: Discount factor. Assume lambda = 1 from GAE-Lambda
            final_state: if given final_state, dones[-1] must be equal to 0
        Returns:
            advantages: The estimated advantage for each step in the trajectory
    '''

    values = value_net(states) # Shape (T, 1)
    values.masked_fill_(dones, 0)
    rewards_to_go = reward_to_go(rewards, dones, gamma)
    discounted_final_values = torch.zeros_like(values, dtype=torch.float32)
    if final_state is not None:
        final_value = value_net(final_state).item()
        discount = gamma
        for t in reversed(range(len(dones))):
            if dones[t]:
                break
            discounted_final_values[t] = discount * final_value 
            discount *= gamma
    advantages = rewards_to_go - values + discounted_final_values
    return advantages

In [8]:
class FakeValue(nn.Module):
    '''
        torch.nn.Module that returns the identiy. Useful for testing
    '''
    def __init__(self):
        super().__init__()

    def forward(self, x):
        return x


fake_value_net = FakeValue()
r = torch.tensor([1,1])
s = torch.tensor([1,1])
d = torch.tensor([0,0])

# Test that discounting of values works with gamma = 1, done = True
adv = estimate_advantage(s, r, d, fake_value_net, 1)
assert torch.allclose(adv, torch.tensor([1,0], dtype=torch.float32)), adv
# Test that discounting of values works with gamma = 0.3, done = True
adv2 = estimate_advantage(s, r, d, fake_value_net, 0.3)
assert torch.allclose(adv2, torch.tensor([0.3,0], dtype=torch.float32)), adv2
# Test that this works with a final_state
final_state = torch.tensor([2])
adv3 = estimate_advantage(s, r, d, fake_value_net, 0.5, final_state)
assert torch.allclose(adv3, torch.tensor([1,1], dtype=torch.float32)), adv3

# Test that this works with intermediate dones:
r = torch.tensor([1,1,0,1,1,0])
s = torch.tensor([2,2,1,2,2,1])
d = torch.tensor([0,0,1,0,0,1])
adv4 = estimate_advantage(s, r, d, fake_value_net, 0.5, final_state=None)
expected = torch.tensor([-0.5, -1, 0, -0.5, -1, 0])
assert torch.allclose(adv4, expected), adv4

Step 5: Define the clipped loss function

In [11]:
def clipped_loss(states:Tensor, actions:Tensor, advantages:Tensor, pi_old:Tensor, pi_net:nn.Module, epsilon=0.2) -> Tensor:
    '''
        PPO Clipped Loss

        Args:
            states: The states from a given trajectory T
            advantages: Advantage estimates for T, based on GAE-Lambda
            pi_old: Probability distribution for pi(a|s) used to generate the trajectory
            pi_net: Most up to date policy network
            epsilon: Clipping hyperparameter for loss. See page 3 - https://arxiv.org/pdf/1707.06347.pdf
        Returns:
            loss: L_clip used to optimize the policy network
    '''
    pi = pi_net(states).gather(1, actions)
    ratio = torch.div(pi, pi_old)
    unclipped = ratio * advantages
    clipped = torch.clamp(ratio, 1-epsilon, 1+epsilon) * advantages
    elementwise_mins = torch.minimum(unclipped, clipped)
    loss = torch.mean(elementwise_mins)
    return loss

Step 6: Main Training Loop


In [22]:
# Set up the environment
env = gym.make('CartPole-v0')
state_size = env.observation_space.shape[0]
action_size = env.action_space.n

# Build the neural networks and optimizers
pi = MLP(state_size, action_size)
v = MLP(state_size, 1)

pi_optimizer = torch.optim.Adam(pi.parameters(), lr=1e-3)
v_optimizer = torch.optim.Adam(v.parameters(), lr=1e-3)
v_loss_fn = nn.MSELoss()

# Set up the buffer and actor
buffer = ReplayBuffer()
actor = Actor(env, buffer, pi)

# Hyperparameters
total_epochs = 101
episode = 0
longest_episode_length = 0
gamma = 0.99
K_pi = 20
K_v = 20

for epoch in range(total_epochs):
    if epoch % 100 == 0:
        print(f'epoch: {epoch}')

    # Play up to 400 steps
    episode_length = 0
    for _ in range(200):
        done = actor.play_step()
        episode_length += 1
        if done:
            episode += 1
            # Log eisode information to the console
            if episode_length >= longest_episode_length:
                longest_episode_length = episode_length
                print(f'episode: {episode} length: {longest_episode_length}')
            episode_length = 0

    # Unpack replay buffer into arrays and reset the buffer
    states, actions, rewards, dones, new_states = buffer.to_batch()
    buffer.clear()

    # Compute the information we'll need to feed the two loss functions
    if done:
        final_state = None
    else:
        final_state = new_states[-1]
    advantages = estimate_advantage(states, rewards, dones, v, gamma, final_state).detach()
    pi_old = pi(states).gather(1, actions).detach()
    rtg = reward_to_go(rewards, dones, gamma) # (N, 1)

    # Update policy network for K steps
    # A full implementation have a KL divergence check here to stop updating policy network and get another trajectory when the policy changes too much
    for _ in range(K_pi):
        pi_loss = clipped_loss(states, actions, advantages, pi_old, pi)
        pi_optimizer.zero_grad()
        pi_loss.backward()
        pi_optimizer.step()

    # Update Value Network V(s_t) should be reward_to_go_t. MSE Loss
    for _ in range(K_v):
        values = v(states)
        v_loss = v_loss_fn(values, rtg)
        v_optimizer.zero_grad()
        v_loss.backward()
        v_optimizer.step()

epoch: 0
episode: 1 length: 24
episode: 4 length: 38
episode: 6 length: 51
epoch: 100
