**Dependencies and setup**

This can take a minute or so...

In [471]:
%%capture
import gym
import random
import numpy as np
import torch as T
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torch.distributions as tdist
import matplotlib.pyplot as plt
import sys
import os
import pickle
import math
import optuna

from pyvirtualdisplay import Display
from IPython import display as disp
from typing import Callable
from copy import deepcopy
from scipy.spatial.transform import Rotation as R
from scipy.spatial.transform import Slerp

%matplotlib inline

# NOTE(review): presumably stops pyvirtualdisplay from using the X server's
# -displayfd handshake — confirm against the pyvirtualdisplay documentation
os.environ['PYVIRTUALDISPLAY_DISPLAYFD'] = '0'

# start an invisible 600x600 virtual display (presumably so the environment
# can be rendered for video capture without a physical screen — confirm)
display = Display(visible=0,size=(600,600))
display.start()
# run on the GPU when CUDA is available, otherwise on the CPU
device = T.device('cuda') if T.cuda.is_available() else T.device('cpu')

video_every = 50 # take video every n episodes

**Reinforcement learning agent**

The agent is adapted from the TD3 implementation of a bipedal-walker solution at https://github.com/hmomin/TD3-Bipedal-Walker

In [472]:
class Buffer:
    """Fixed-capacity replay buffer that keeps transitions ordered by recency.

    Rows are stored oldest-first: index 0 holds the oldest transition and
    index ``currentSize - 1`` the newest.  Once the buffer is full, every
    array is shifted down by one row before the new transition is written
    into the last row, so the oldest transition is discarded.  That shift
    touches every row, so a store on a full buffer costs time proportional
    to the capacity.
    """

    def __init__(
        self,
        observationDim: int,
        actionDim: int,
        device: T.device,
        size: int = 1_000_000,
    ):
        """Pre-allocate storage for ``size`` transitions on ``device``."""
        # remember the device so later tensors land next to the storage
        self.device = device
        # use a fixed-size buffer to prevent constant list instantiations
        self.states = T.zeros((size, observationDim), device=device)
        self.actions = T.zeros((size, actionDim), device=device)
        self.rewards = T.zeros(size, device=device)
        self.nextStates = T.zeros((size, observationDim), device=device)
        self.doneFlags = T.zeros(size, device=device)
        self.random = np.random.default_rng()
        # kept for compatibility; rows are always written at currentSize - 1
        self.pointer = 0
        # use current size to ensure we don't train on any non-existent data points
        self.currentSize = 0
        self.size = size
        # row indices of the valid transitions: 0 .. currentSize - 1
        self.indexes = []

    def store(
        self,
        state: np.ndarray,
        action: np.ndarray,
        reward: float,
        nextState: np.ndarray,
        doneFlag: bool,
    ):
        """Append one transition, evicting the oldest one when full."""
        tensorState = T.tensor(state, device=self.device)
        tensorAction = T.tensor(action, device=self.device)
        tensorNextState = T.tensor(nextState, device=self.device)

        if self.currentSize < self.size:
            self.indexes.append(self.currentSize)
            self.currentSize += 1
        else:
            # Shift every array by one row so the rows stay in order of
            # recency.  All five arrays must move together, otherwise a
            # state would be paired with another transition's data.
            self.states = T.roll(self.states, shifts=-1, dims=0)
            self.actions = T.roll(self.actions, shifts=-1, dims=0)
            self.rewards = T.roll(self.rewards, shifts=-1, dims=0)
            self.nextStates = T.roll(self.nextStates, shifts=-1, dims=0)
            self.doneFlags = T.roll(self.doneFlags, shifts=-1, dims=0)

        # the newest transition always lives in the last valid row
        ptr = self.currentSize - 1
        self.states[ptr, :] = tensorState
        self.actions[ptr, :] = tensorAction
        self.rewards[ptr] = reward
        self.nextStates[ptr, :] = tensorNextState
        self.doneFlags[ptr] = float(doneFlag)

    def fade(self, norm_index, fade_param):
        """Return ``tanh(fade_param * norm_index**2)`` as a sampling weight.

        ``norm_index`` is a row index divided by the current size, so the
        weight grows with the index, i.e. towards newer transitions.
        """
        return np.tanh(fade_param * norm_index ** 2)

    def generate_probs(self, fade_param):
        """Compute, cache in ``self.probs`` and return sampling probabilities."""
        normalisedIndexes = np.array(self.indexes) / self.currentSize
        # weights are based solely on the history, highly squashed
        weights = 1e-7 * self.fade(normalisedIndexes, fade_param)
        self.probs = weights / np.sum(weights)
        return self.probs

    def getMiniBatch(self, size: int, fade_param) -> dict[str, T.Tensor]:
        """Sample ``size`` transitions (with replacement).

        Sampling is uniform when ``fade_param == -1`` or when the buffer
        holds no more than ``size`` transitions; otherwise rows are drawn
        with the recency-based probabilities from ``generate_probs``.
        """
        if size >= self.currentSize or fade_param == -1:
            indices = T.randint(0, self.currentSize, (size,), device=self.device)
        else:
            indices = self.random.choice(
                self.indexes, p=self.generate_probs(fade_param), size=size
            )

        # return the mini-batch of transitions
        return {
            "states": self.states[indices, :],
            "actions": self.actions[indices, :],
            "rewards": self.rewards[indices],
            "nextStates": self.nextStates[indices, :],
            "doneFlags": self.doneFlags[indices],
        }


In [473]:
def init_weights(layer):
    """Re-initialise a linear layer in place; leave any other module alone.

    Linear layers get Xavier-uniform weights and a bias filled with zeros.
    """
    if not isinstance(layer, nn.Linear):
        return
    nn.init.xavier_uniform_(layer.weight)
    nn.init.constant_(layer.bias, 0.0)


class Network(nn.Module):
    """Fully connected network bundled with its own Adam optimizer.

    ``shape`` lists the layer widths from input to output.  A ReLU follows
    every linear layer except the last one, which is followed by an
    instance of ``outputActivation``.
    """

    def __init__(
        self,
        shape: list,
        outputActivation: Callable,
        learningRate: float,
        device: T.device,
    ):
        super().__init__()
        finalLinear = len(shape) - 1
        modules = []
        # walk over consecutive (input width, output width) pairs
        for position, (fanIn, fanOut) in enumerate(zip(shape, shape[1:]), start=1):
            modules.append(nn.Linear(fanIn, fanOut))
            if position != finalLinear:
                modules.append(nn.ReLU())
        modules.append(outputActivation())
        self.network = nn.Sequential(*modules)

        self.optimizer = optim.Adam(self.parameters(), lr=learningRate)
        self.to(device)

    def forward(self, state: T.Tensor) -> T.Tensor:
        """Run ``state`` through the stacked layers."""
        output = self.network(state)
        return output

    def gradientDescentStep(self, loss: T.Tensor, retainGraph: bool = False) -> None:
        """Clear old gradients, backpropagate ``loss`` and apply one Adam step."""
        optimizer = self.optimizer
        optimizer.zero_grad()
        loss.backward(retain_graph=retainGraph)
        optimizer.step()

    def reinitialise(self):
        """Apply ``init_weights`` to every sub-module of this network."""
        self.apply(init_weights)


In [474]:
class Agent:
    """TD3 agent: an actor, two critics, their target copies and a replay buffer.

    The notebook-level ``device`` is used for the buffer, for the networks
    and for the tensors built from incoming observations.
    """

    def __init__(self, env, learningRate, gamma, tau, buffer_size):
        """Build the buffer and networks from the environment's space sizes.

        Args:
            env: environment exposing ``observation_space`` and ``action_space``.
            learningRate: Adam learning rate shared by actor and critics.
            gamma: discount factor used when computing targets.
            tau: stored on the instance; ``update`` takes its own ``tau``.
            buffer_size: capacity of the replay buffer.
        """
        self.observationDim = env.observation_space.shape[0]
        self.actionDim = env.action_space.shape[0]
        self.gamma = gamma
        self.tau = tau

        self.buffer = Buffer(self.observationDim, self.actionDim, device, buffer_size)

        # initialize the actor and critics
        self.actor = Network([self.observationDim, 256, 256, self.actionDim], nn.Tanh, learningRate, device)

        self.critic1 = Network([self.observationDim + self.actionDim, 256, 256, 1], nn.Identity, learningRate, device)

        self.critic2 = Network([self.observationDim + self.actionDim, 256, 256, 1], nn.Identity, learningRate, device)

        # create target networks
        self.targetActor = deepcopy(self.actor)
        self.targetCritic1 = deepcopy(self.critic1)
        self.targetCritic2 = deepcopy(self.critic2)

    def getNoisyAction(self, state, sigma):
        """Return the actor's action plus N(0, sigma) noise, clipped to [-1, 1]."""
        deterministicAction = self.getDeterministicAction(state)
        noise = np.random.normal(0, sigma, deterministicAction.shape)
        return np.clip(deterministicAction + noise, -1, +1)

    def getDeterministicAction(self, state):
        """Return the actor's output for ``state`` as a NumPy array."""
        actions: T.Tensor = self.actor.forward(T.tensor(state, device=device))
        return actions.cpu().detach().numpy()

    def reset(self):
        """Re-initialise the actor and both critics.

        The target networks and the optimizers are left untouched.
        """
        self.actor.reinitialise()
        self.critic1.reinitialise()
        self.critic2.reinitialise()

    def update(self, miniBatchSize, trainingSigma, trainingClip, updatePolicy, fade_param, tau):
        """Run one training step on a mini-batch from the replay buffer.

        Both critics are always updated.  The actor and the three target
        networks are only updated when ``updatePolicy`` is true.

        Args:
            miniBatchSize: number of transitions to sample.
            trainingSigma: std of the noise added to target actions.
            trainingClip: clip limit for that noise.
            updatePolicy: whether to update the actor and target networks.
            fade_param: sampling parameter forwarded to the buffer.
            tau: interpolation factor for the target network update.
        """
        # sample a mini-batch from the replay buffer
        miniBatch = self.buffer.getMiniBatch(miniBatchSize, fade_param)
        states = miniBatch["states"]
        actions = miniBatch["actions"]
        rewards = miniBatch["rewards"]
        nextStates = miniBatch["nextStates"]
        dones = miniBatch["doneFlags"]
        # compute the targets (constants as far as autograd is concerned)
        targets = self.computeTargets(
            rewards, nextStates, dones, trainingSigma, trainingClip
        )
        # do a single step on each critic network; the two losses share no
        # graph because the targets carry none, so nothing has to be retained
        Q1Loss = self.computeQLoss(self.critic1, states, actions, targets)
        self.critic1.gradientDescentStep(Q1Loss)
        Q2Loss = self.computeQLoss(self.critic2, states, actions, targets)
        self.critic2.gradientDescentStep(Q2Loss)
        if updatePolicy:
            # do a single step on the actor network
            policyLoss = self.computePolicyLoss(states)
            self.actor.gradientDescentStep(policyLoss)
            # update target networks
            self.updateTargetNetwork(self.targetActor, self.actor, tau)
            self.updateTargetNetwork(self.targetCritic1, self.critic1, tau)
            self.updateTargetNetwork(self.targetCritic2, self.critic2, tau)

    def computeTargets(
        self,
        rewards: T.Tensor,
        nextStates: T.Tensor,
        dones: T.Tensor,
        trainingSigma: float,
        trainingClip: float,
    ) -> T.Tensor:
        """Return ``rewards + gamma * (1 - dones) * min(Q1', Q2')``.

        The target actor's action for ``nextStates`` gets clipped Gaussian
        noise added and is clipped to [-1, 1] before the target critics
        evaluate it.  The result carries no autograd graph: the target
        networks are never trained by gradient descent, so tracking
        gradients through them would only cost time and memory.
        """
        with T.no_grad():
            targetActions = self.targetActor.forward(nextStates.float())
            # create additive noise for target actions
            noise = T.normal(
                0, trainingSigma, targetActions.shape, device=targetActions.device
            )
            clippedNoise = T.clip(noise, -trainingClip, +trainingClip)
            targetActions = T.clip(targetActions + clippedNoise, -1, +1)
            # evaluate both target critics on the same input
            criticInput = T.hstack([nextStates, targetActions]).float()
            targetQ1Values = T.squeeze(self.targetCritic1.forward(criticInput))
            targetQ2Values = T.squeeze(self.targetCritic2.forward(criticInput))
            # take the smaller of the two estimates
            targetQValues = T.minimum(targetQ1Values, targetQ2Values)
            return rewards + self.gamma * (1 - dones) * targetQValues

    def computeQLoss(
        self, network: nn.Module, states: T.Tensor, actions: T.Tensor, targets: T.Tensor
    ) -> T.Tensor:
        """Return the mean squared error between ``network``'s Q-values and ``targets``."""
        QValues = T.squeeze(network.forward(T.hstack([states, actions]).float()))
        return T.square(QValues - targets).mean()

    def computePolicyLoss(self, states: T.Tensor):
        """Return the negated mean of critic1's Q-value for the actor's actions."""
        actions = self.actor.forward(states.float())
        QValues = T.squeeze(self.critic1.forward(T.hstack([states, actions]).float()))
        return -QValues.mean()

    def updateTargetNetwork(self, targetNetwork, network, tau):
        """Move each target parameter to ``(1 - tau) * target + tau * source`` in place."""
        with T.no_grad():
            for targetParameter, parameter in zip(targetNetwork.parameters(), network.parameters()):
                targetParameter.mul_(1 - tau)
                targetParameter.add_(tau * parameter)


In [475]:
class CosineScheduler:
    """Half-cosine interpolation from ``first_value`` to ``end_value``.

    The value starts at ``first_value`` for step 0 and is held at
    ``end_value`` for every step from ``step_count`` onwards.
    """

    def __init__(self, first_value, end_value, step_count):
        self.step_count = step_count
        self.max, self.min = first_value, end_value

    def get_value(self, step):
        """Return the scheduled value for ``step``."""
        if step < self.step_count:
            phase = step / self.step_count * math.pi
            half_span = 0.5 * (self.max - self.min)
            return self.min + half_span * (1 + math.cos(phase))
        return self.min

class LinearScheduler:
    """Straight-line interpolation from ``first_value`` to ``end_value``.

    The value starts at ``first_value`` for step 0 and is held at
    ``end_value`` for every step from ``step_count`` onwards.
    """

    def __init__(self, first_value, end_value, step_count):
        self.step_count = step_count
        self.max, self.min = first_value, end_value

    def get_value(self, step):
        """Return the scheduled value for ``step``."""
        if step < self.step_count:
            remaining = 1 - (step / self.step_count)
            return self.min + (self.max - self.min) * remaining
        return self.min


class CosineSchedulerTorch:
    """Cosine-anneal the learning rate of several optimizers to ``end_value``.

    Each optimizer gets its own ``CosineAnnealingLR`` with
    ``T_max=step_count`` and ``eta_min=end_value``.  After ``step_count``
    calls to ``make_step`` the learning rate is held at ``end_value``,
    matching how ``CosineScheduler`` holds its end value.
    """

    def __init__(self, optimizer_list, end_value, step_count):
        """Attach one cosine scheduler to every optimizer in ``optimizer_list``."""
        self.min = end_value
        self.step_count = step_count
        # number of make_step calls that actually advanced the schedulers
        self.steps_taken = 0

        self.schedulers = [
            T.optim.lr_scheduler.CosineAnnealingLR(
                op, T_max=step_count, eta_min=end_value
            )
            for op in optimizer_list
        ]

    def make_step(self):
        """Advance every scheduler by one step until the schedule is finished."""
        # CosineAnnealingLR keeps following the cosine past T_max, which
        # would raise the learning rate again; stop stepping at the end.
        if self.steps_taken >= self.step_count:
            return
        self.steps_taken += 1
        for scheduler in self.schedulers:
            scheduler.step()


class LinearSchedulerTorch:
    """Linearly anneal the learning rate of several optimizers to ``end_value``.

    Each optimizer gets its own ``LinearLR`` that starts at the optimizer's
    current learning rate and reaches ``end_value`` after ``step_count``
    calls to ``make_step``.  ``LinearLR`` scales by a single factor, so the
    factor is derived from the first parameter group's learning rate.
    ``end_value`` must not exceed that learning rate, which must be
    non-zero.
    """

    def __init__(self, optimizer_list, end_value, step_count):
        """Attach one linear scheduler to every optimizer in ``optimizer_list``."""
        self.min = end_value
        self.step_count = step_count

        self.schedulers = []
        for op in optimizer_list:
            # LinearLR works with multiplicative factors, not absolute rates
            base_lr = op.param_groups[0]["lr"]
            self.schedulers.append(
                T.optim.lr_scheduler.LinearLR(
                    op,
                    start_factor=1.0,
                    end_factor=end_value / base_lr,
                    total_iters=step_count,
                )
            )

    def make_step(self):
        """Advance every scheduler by one step."""
        for scheduler in self.schedulers:
            scheduler.step()

Select environment to be either normal or hardcore

In [476]:
%%capture
# create the standard bipedal walker environment
env = gym.make("BipedalWalker-v3")
# alternative: enable this line instead to use the hardcore environment
#env = gym.make("BipedalWalkerHardcore-v3")
# wrap the environment so that every `video_every`-th episode is recorded
# under ./video (force=True presumably clears earlier recordings — confirm)
env = gym.wrappers.Monitor(env, "./video", video_callable=lambda ep_id: ep_id%video_every == 0, force=True)

# lengths of the observation and action vectors
obs_dim = env.observation_space.shape[0]
act_dim = env.action_space.shape[0]

**Prepare the environment and wrap it to capture videos**

In [477]:
# report the observation/action vector sizes and the torch device in use
print('The environment has {} observations and the agent can take {} actions'.format(obs_dim, act_dim))
print('The device is: {}'.format(device))

The environment has 24 observations and the agent can take 4 actions
The device is: cuda
It's recommended to train on the cpu for this


This is the code for normal training. It should be identical to the hardcore training code in everything other than the hyperparameters.

In [None]:
# Training run for the normal environment: seed the random number
# generators, set the hyperparameters, build the agent and schedulers, then
# train for up to max_episodes episodes while plotting the reward.

# seed torch, the environment, Python's random, NumPy and the action space
seed = 42
T.manual_seed(seed)
env.seed(seed)
random.seed(seed)
np.random.seed(seed)
env.action_space.seed(seed)

ep_reward = 0  # sum of the raw rewards of the current episode
amended_ep_reward = 0  # sum of the clipped rewards of the current episode
reward_list = []  # raw episode rewards since the last plot update
plot_data = []  # [episode, mean reward, reward std] per plot update
learningRate = 0.000526
tau = 0.25  # tracking parameter used to update target networks slowly
gamma = 0.99  # discount factor for rewards
actionSigma = 0.18  # noise in the deterministic policy output
trainingSigma = 0.2  # noise for the target actions
trainingClip = 0.5  # clip limit for target actions
miniBatchSize = 100  # number of transitions per mini-batch
policyDelay = 2  # number of steps to wait before updating the policy
min_reward = -8  # lower bound applied to every step reward before storing it
render = True  # not read anywhere in this cell
fade_param = -1  # -1 makes the buffer sample uniformly
exploration_trials = 5  # the agent's networks are re-initialised at the start of each of these first episodes
buffer_size = 1_500_000
# initialise agent
agent = Agent(env, learningRate, gamma, tau, buffer_size)
max_episodes = 3000
max_timesteps = 2000
plot_interval = 10 # update the plot every n episodes
train_between_episodes = 0  # not read anywhere in this cell
stall_reward = 60  # clipped episode reward above which the schedules are started
solution_found = False

schedule_counter = 0  # number of episodes started since solution_found was set
schedule_period = 60  # number of scheduled steps until the end values are reached

scheduler_type = LinearScheduler

# each scheduler moves a hyperparameter from its initial value to an end value
tau_schedueler = scheduler_type(tau, 0.003, schedule_period)
sigma_schedueler = scheduler_type(actionSigma, 0.05, schedule_period)
train_schedueler = scheduler_type(trainingSigma, 0.125, schedule_period)
reward_schedueler = scheduler_type(min_reward, -100, schedule_period)
# the batch size is scheduled as a negative number (-100 -> -512) and negated again when read
batch_schedueler = CosineScheduler(-miniBatchSize, -512, 2*schedule_period)
# anneals the learning rate of all three optimizers
lr_scheduler = CosineSchedulerTorch([agent.actor.optimizer, agent.critic1.optimizer, agent.critic2.optimizer], 0.000003, 2*schedule_period)

# training procedure:
for episode in range(1, max_episodes+1):
    state = env.reset()

    # once an episode has exceeded stall_reward, advance every schedule by one step per episode
    if solution_found:
        schedule_counter += 1
        min_reward = reward_schedueler.get_value(schedule_counter)
        tau = tau_schedueler.get_value(schedule_counter)
        trainingSigma = train_schedueler.get_value(schedule_counter)
        actionSigma = sigma_schedueler.get_value(schedule_counter)
        miniBatchSize = -int(batch_schedueler.get_value(schedule_counter))
        lr_scheduler.make_step()
        # switch the buffer to uniform sampling (fade_param is already -1 in this cell)
        if schedule_counter == schedule_period:
            fade_param = -1


    # re-initialise the actor and critics at the start of each exploration episode
    if episode <= exploration_trials:
        agent.reset()

    for t in range(max_timesteps):
        # select the agent action
        action = agent.getNoisyAction(state, actionSigma)

        # take action in environment and get r and s'
        # NOTE(review): gym.wrappers.Monitor suggests the older gym API, where the
        # fourth value returned by step() is the info dict — confirm
        next_state, reward, done, truncated = env.step(action)
        # clip the reward from below before it is stored
        tuned_reward = max(reward, min_reward)
        
        agent.buffer.store(state, action, tuned_reward, next_state, done)
        state = next_state
        ep_reward += reward
        amended_ep_reward += tuned_reward

        # critics are trained every step; actor and targets only when t is a multiple of policyDelay
        shouldUpdatePolicy = t % policyDelay == 0
        agent.update(miniBatchSize, trainingSigma, trainingClip, shouldUpdatePolicy, fade_param, tau)

        # stop iterating when the episode finished
        if done or t==(max_timesteps-1):
            break

    # append the episode reward to the reward list
    reward_list.append(ep_reward)

    # the first episode whose clipped reward exceeds stall_reward starts the schedules
    if amended_ep_reward > stall_reward:
        if not solution_found:
            print("SOLUTION FOUND!!!")
        solution_found = True
        

    ep_reward = 0
    amended_ep_reward = 0

    # every plot_interval episodes, plot the mean and standard deviation of the recent episode rewards
    if episode % plot_interval == 0:
        plot_data.append([episode, np.array(reward_list).mean(), np.array(reward_list).std()])
        reward_list = []
        plt.plot([x[0] for x in plot_data], [x[1] for x in plot_data], '-', color='tab:grey')
        plt.fill_between([x[0] for x in plot_data], [x[1]-x[2] for x in plot_data], [x[1]+x[2] for x in plot_data], alpha=0.2, color='tab:grey')
        plt.xlabel('Episode number')
        plt.ylabel('Episode reward')
        plt.show()
        # the previous output is cleared only when new output arrives (wait=True)
        disp.clear_output(wait=True)



Code for Hardcore Training

In [None]:

# Training run with the hardcore hyperparameters: seed the random number
# generators, set the hyperparameters, build the agent and schedulers, then
# train for up to max_episodes episodes while plotting the reward.
# NOTE(review): this cell uses whatever `env` was created earlier; the
# environment cell creates "BipedalWalker-v3" unless its hardcore line is
# enabled — confirm before running.

# seed torch, the environment, Python's random, NumPy and the action space
# NOTE(review): Buffer samples with its own np.random.default_rng(), which
# none of these seeds set — confirm whether runs must be reproducible
seed = 42
T.manual_seed(seed)
env.seed(seed)
random.seed(seed) 
np.random.seed(seed)
env.action_space.seed(seed)

ep_reward = 0  # sum of the raw rewards of the current episode
amended_ep_reward = 0  # sum of the clipped rewards of the current episode
reward_list = []  # raw episode rewards since the last plot update
plot_data = []  # [episode, mean reward, reward std] per plot update
learningRate = 0.000526
tau = 0.25  # tracking parameter used to update target networks slowly
gamma = 0.99  # discount factor for rewards
actionSigma = 0.18  # noise in the deterministic policy output
trainingSigma = 0.2  # noise for the target actions
trainingClip = 0.5  # clip limit for target actions
miniBatchSize = 100  # number of transitions per mini-batch
policyDelay = 2  # number of steps to wait before updating the policy
min_reward = -8  # lower bound applied to every step reward before storing it
render = True  # not read anywhere in this cell
fade_param = 7  # the buffer weights its sampling towards recent transitions once it holds more than a batch
exploration_trials = 5  # the agent's networks are re-initialised at the start of each of these first episodes
buffer_size = 2_000_000

# initialise agent
agent = Agent(env, learningRate, gamma, tau, buffer_size)
max_episodes = 3000
max_timesteps = 2000
plot_interval = 10 # update the plot every n episodes
train_between_episodes = 0  # not read anywhere in this cell
stall_reward = 280  # clipped episode reward above which the schedules are started
solution_found = False

schedule_counter = 0  # number of episodes started since solution_found was set
schedule_period = 800  # number of scheduled steps until the end values are reached

scheduler_type = LinearScheduler

# each scheduler moves a hyperparameter from its initial value to an end value
tau_schedueler = scheduler_type(tau, 0.05, schedule_period)
sigma_schedueler = scheduler_type(actionSigma, 0.1, schedule_period)
train_schedueler = scheduler_type(trainingSigma, 0.125, schedule_period)
reward_schedueler = scheduler_type(min_reward, -30, schedule_period)
# the batch size is scheduled as a negative number (-100 -> -512) and negated again when read
batch_schedueler = scheduler_type(-miniBatchSize, -512, schedule_period)
# anneals the learning rate of all three optimizers
lr_scheduler = CosineSchedulerTorch([agent.actor.optimizer, agent.critic1.optimizer, agent.critic2.optimizer], 0.00005, schedule_period)

# training procedure:
for episode in range(1, max_episodes+1):
    state = env.reset()

    # once an episode has exceeded stall_reward, advance every schedule by one step per episode
    if solution_found:
        schedule_counter += 1
        min_reward = reward_schedueler.get_value(schedule_counter)
        tau = tau_schedueler.get_value(schedule_counter)
        trainingSigma = train_schedueler.get_value(schedule_counter)
        actionSigma = sigma_schedueler.get_value(schedule_counter)
        miniBatchSize = -int(batch_schedueler.get_value(schedule_counter))
        lr_scheduler.make_step()

    # re-initialise the actor and critics at the start of each exploration episode
    if episode <= exploration_trials:
        agent.reset()

    for t in range(max_timesteps):
        # select the agent action
        action = agent.getNoisyAction(state, actionSigma)

        # take action in environment and get r and s'
        # NOTE(review): gym.wrappers.Monitor suggests the older gym API, where the
        # fourth value returned by step() is the info dict — confirm
        next_state, reward, done, truncated = env.step(action)
        # clip the reward from below before it is stored
        tuned_reward = max(reward, min_reward)
        
        agent.buffer.store(state, action, tuned_reward, next_state, done)
        state = next_state
        ep_reward += reward
        amended_ep_reward += tuned_reward

        # critics are trained every step; actor and targets only when t is a multiple of policyDelay
        shouldUpdatePolicy = t % policyDelay == 0
        agent.update(miniBatchSize, trainingSigma, trainingClip, shouldUpdatePolicy, fade_param, tau)

        # stop iterating when the episode finished
        if done or t==(max_timesteps-1):
            break

    # append the reward to a list
    reward_list.append(ep_reward)

    # the first episode whose clipped reward exceeds stall_reward starts the schedules
    if amended_ep_reward > stall_reward:
        if not solution_found:
            print("SOLUTION FOUND!!!")
        solution_found = True
        

    ep_reward = 0
    amended_ep_reward = 0

    # every plot_interval episodes, plot the mean and standard deviation of the recent episode rewards
    if episode % plot_interval == 0:
        plot_data.append([episode, np.array(reward_list).mean(), np.array(reward_list).std()])
        reward_list = []
        plt.plot([x[0] for x in plot_data], [x[1] for x in plot_data], '-', color='tab:grey')
        plt.fill_between([x[0] for x in plot_data], [x[1]-x[2] for x in plot_data], [x[1]+x[2] for x in plot_data], alpha=0.2, color='tab:grey')
        plt.xlabel('Episode number')
        plt.ylabel('Episode reward')
        plt.show()
        # the previous output is cleared only when new output arrives (wait=True)
        disp.clear_output(wait=True)
