## Setup

In [4]:
import os
import time
import sys
from dataclasses import dataclass
from tqdm import tqdm
import numpy as np
from numpy.random import Generator
import torch as t
from torch import Tensor
from torch.optim.optimizer import Optimizer
import gym
import gym.envs.registration
from gym.envs.classic_control.cartpole import CartPoleEnv
import torch.nn as nn
import torch.optim as optim
from torch.distributions.categorical import Categorical
import einops
from pathlib import Path
from typing import Tuple, Literal, Union, Optional
from jaxtyping import Float, Int
import wandb
from IPython.display import clear_output
import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation
import warnings
warnings.filterwarnings('ignore')
# !pip install opencv-python
# Make sure exercises are in the path
chapter = "chapter2_rl"
exercises_dir = Path(f"{os.getcwd().split(chapter)[0]}/{chapter}/exercises").resolve()
section_dir = exercises_dir / "part3_ppo"
if str(exercises_dir) not in sys.path: sys.path.append(str(exercises_dir))

from part2_q_learning_and_dqn.utils import set_global_seeds
from part2_q_learning_and_dqn.solutions import Probe1, Probe2, Probe3, Probe4, Probe5
from part3_ppo.utils import make_env
import part3_ppo.utils as utils
import part3_ppo.tests as tests
from plotly_utils import plot_cartpole_obs_and_dones

# Register our probes from last time
for idx, probe in enumerate([Probe1, Probe2, Probe3, Probe4, Probe5]):
    gym.envs.registration.register(id=f"Probe{idx+1}-v0", entry_point=probe)

Arr = np.ndarray

device = t.device('mps' if t.backends.mps.is_available() else 'cuda' if t.cuda.is_available() else 'cpu')

MAIN = __name__ == "__main__"

In [5]:
@dataclass
class PPOArgs:
    # Basic / global
    seed: int = 1
    cuda: bool = t.cuda.is_available()
    env_id: str = "CartPole-v1"
    mode: Literal["classic-control", "atari", "mujoco"] = "classic-control"

    # Wandb / logging
    use_wandb: bool = False
    capture_video: bool = True
    exp_name: str = "PPO_Implementation"
    log_dir: str = "logs"
    wandb_project_name: str = "PPOCart"
    wandb_entity: str = None

    # Duration of different phases
    total_timesteps: int = 500000
    num_envs: int = 4
    num_steps: int = 128
    num_minibatches: int = 4
    batches_per_learning_phase: int = 4

    # Optimization hyperparameters
    learning_rate: float = 2.5e-4
    max_grad_norm: float = 0.5

    # Computing advantage function
    gamma: float = 0.99
    gae_lambda: float = 0.95

    # Computing other loss functions
    clip_coef: float = 0.2
    ent_coef: float = 0.01
    vf_coef: float = 0.25

    def __post_init__(self):
        self.batch_size = self.num_steps * self.num_envs
        assert self.batch_size % self.num_minibatches == 0, "batch_size must be divisible by num_minibatches"
        self.minibatch_size = self.batch_size // self.num_minibatches
        self.total_phases = self.total_timesteps // self.batch_size
        self.total_training_steps = self.total_phases * self.batches_per_learning_phase * self.num_minibatches

args = PPOArgs(num_minibatches=2)
utils.arg_help(args)

Unnamed: 0_level_0,default value,description
arg,Unnamed: 1_level_1,Unnamed: 2_level_1
seed,1,seed of the experiment
cuda,False,"if toggled, cuda will be enabled by default"
env_id,'CartPole-v1',the id of the environment
mode,'classic-control',"can be 'classic-control', 'atari' or 'mujoco'"
use_wandb,False,"if toggled, this experiment will be tracked with Weights and Biases"
capture_video,True,whether to capture videos of the agent performances (check out `videos` folder)
exp_name,'PPO_Implementation',the name of this experiment
log_dir,'logs',the directory where the logs will be stored
wandb_project_name,'PPOCart',the wandb's project name
wandb_entity,,the entity (team) of wandb's project


In [7]:
def layer_init(layer: nn.Linear, std=np.sqrt(2), bias_const=0.0):
    t.nn.init.orthogonal_(layer.weight, std)
    t.nn.init.constant_(layer.bias, bias_const)
    return layer


def get_actor_and_critic(
    envs: gym.vector.SyncVectorEnv,
    mode: Literal["classic-control", "atari", "mujoco"] = "classic-control",
) -> tuple[nn.Module, nn.Module]:
    '''
    Returns (actor, critic), the networks used for PPO, in one of 3 different modes.
    '''
    assert mode in ["classic-control", "atari", "mujoco"]

    obs_shape = envs.single_observation_space.shape
    num_obs = np.array(obs_shape).prod()
    num_actions = (
        envs.single_action_space.n
        if isinstance(envs.single_action_space, gym.spaces.Discrete)
        else np.array(envs.single_action_space.shape).prod()
    )

    if mode == "classic-control":
        actor, critic = get_actor_and_critic_classic(num_obs, num_actions)
    if mode == "atari":
        actor, critic = get_actor_and_critic_atari(obs_shape, num_actions)
    if mode == "mujoco":
        actor, critic = get_actor_and_critic_mujoco(num_obs, num_actions)

    return actor.to(device), critic.to(device)


def get_actor_and_critic_classic(num_obs: int, num_actions: int):
    '''
    Returns (actor, critic) in the "classic-control" case, according to diagram above.
    '''
    actor = nn.Sequential(
        layer_init(nn.Linear(num_obs, 64)),
        nn.Tanh(),
        layer_init(nn.Linear(64,64)),
        nn.Tanh(),
        layer_init(nn.Linear(64, num_actions), std=0.01)
    )
    critic = nn.Sequential(
        layer_init(nn.Linear(num_obs, 64)),
        nn.Tanh(),
        layer_init(nn.Linear(64,64)),
        nn.Tanh(),
        layer_init(nn.Linear(64, 1), std=1)
    )
    return actor, critic
    

tests.test_get_actor_and_critic(get_actor_and_critic, mode="classic-control")

All tests in `test_get_actor_and_critic(mode='classic-control')` passed!


In [10]:
@t.inference_mode()
def compute_advantages(
    next_value: t.Tensor,
    next_done: t.Tensor,
    rewards: t.Tensor,
    values: t.Tensor,
    dones: t.Tensor,
    gamma: float,
    gae_lambda: float,
) -> t.Tensor:
    '''Compute advantages using Generalized Advantage Estimation.
    next_value: shape (env,)
    next_done: shape (env,)
    rewards: shape (buffer_size, env)
    values: shape (buffer_size, env)
    dones: shape (buffer_size, env)
    Return: shape (buffer_size, env)
    '''
    T = values.shape[0]
    next_values = t.concat([values[1:], next_value.unsqueeze(0)])
    next_dones = t.concat([dones[1:], next_done.unsqueeze(0)])
    deltas = rewards + gamma * next_values * (1.0 - next_dones) - values
    advantages = t.zeros_like(deltas)
    advantages[-1] = deltas[-1]
    for s in reversed(range(1, T)):
        advantages[s-1] = deltas[s-1] + gamma * gae_lambda * (1.0 - dones[s]) * advantages[s]
    return advantages

tests.test_compute_advantages(compute_advantages)

All tests in `test_compute_advantages_single` passed!
