**Dependencies and setup**

This can take a minute or so...

In [None]:
### References:
# S. Fujimoto. TD3. https://github.com/sfujim/TD3/blob/master/TD3.py. [Online; accessed 09-Feb-2023]. 2018.
# F. Hu. BipedalWalker. https://github.com/FranciscoHu17/BipedalWalker. [Online; accessed 09-Feb-2023]. 2021.

In [None]:
%%capture
!apt update
!apt install xvfb -y
!pip install 'swig'
!pip install 'pyglet==1.5.27'
!pip install 'gym[box2d]==0.20.0'
!pip install 'pyvirtualdisplay==3.0'

import gym
import random
import numpy as np
import copy
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import matplotlib.pyplot as plt
import sys
import random
from collections import deque
from pyvirtualdisplay import Display
from IPython import display as disp
%matplotlib inline

# Start a 600x600 virtual display that is not shown on screen.
display = Display(visible=0, size=(600, 600))
display.start()

# Use CUDA when PyTorch reports it as available, otherwise the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Reporting cadence (both measured in episodes).
video_every = 50  # videos can take a very long time to render so only do it every N episodes
plot_interval = 10  # update the plot every N episodes

In [None]:
# Mount Google Drive at /content/drive (requires the Colab runtime). The model
# checkpoints and recorded videos in this notebook are written to paths under
# /content/drive/MyDrive.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


**Reinforcement learning agent**

In [None]:
# --- TD3 hyperparameters (module-level, read by the Agent class) ---

# Optimisation
alpha = 0.001        # learning rate of the actor and critic Adam optimisers
tau = 0.005          # interpolation factor for the soft target-network updates
policy_delay = 2     # the actor and the targets are updated every `policy_delay` iterations

# Noise
explore_policy = 0.1  # exploration-noise scale (same value as the default `noise` of Agent.select_action)
noise_policy = 0.2    # std-dev of the Gaussian noise added to the target action
noise_clip = 0.5      # the target-action noise is clamped to [-noise_clip, noise_clip]

class Actor(nn.Module):
    """Deterministic policy network: state -> action.

    Three fully connected layers (state_dim -> 400 -> 300 -> action_dim) with
    ReLU activations on the hidden layers. The output passes through ``tanh``
    and is multiplied by ``max_actions``, so every action component lies in
    ``[-max_actions, max_actions]``.
    """

    def __init__(self, state_dim, action_dim, max_actions):
        super().__init__()

        # Scale applied to the tanh output in forward().
        self.max_actions = max_actions

        # Layer attribute names are part of the saved state_dict keys.
        self.l1 = nn.Linear(state_dim, 400)
        self.l2 = nn.Linear(400, 300)
        self.l3 = nn.Linear(300, action_dim)

    def forward(self, state):
        """Return the scaled action for a batch of states."""
        hidden = F.relu(self.l1(state))
        hidden = F.relu(self.l2(hidden))
        squashed = torch.tanh(self.l3(hidden))
        return self.max_actions * squashed

class Critic(nn.Module):
    """Twin Q-network: (state, action) -> (Q1, Q2).

    Holds two independent three-layer MLPs (input -> 400 -> 300 -> 1). Both
    receive the state and action concatenated along dimension 1 and each
    returns one scalar value per batch row.
    """

    def __init__(self, state_dim, action_dim):
        super().__init__()
        in_features = state_dim + action_dim

        # First Q estimator (layers l1-l3).
        self.l1 = nn.Linear(in_features, 400)
        self.l2 = nn.Linear(400, 300)
        self.l3 = nn.Linear(300, 1)

        # Second Q estimator (layers l4-l6).
        self.l4 = nn.Linear(in_features, 400)
        self.l5 = nn.Linear(400, 300)
        self.l6 = nn.Linear(300, 1)

    def forward(self, state, action):
        """Return the pair ``(Q1, Q2)`` for a batch of state/action rows."""
        joint = torch.cat((state, action), dim=1)

        q1 = self.l3(F.relu(self.l2(F.relu(self.l1(joint)))))
        q2 = self.l6(F.relu(self.l5(F.relu(self.l4(joint)))))
        return (q1, q2)

class ExperienceReplay:
    """Fixed-capacity FIFO store of transitions with uniform random sampling.

    Transitions are kept in a ``collections.deque`` created with
    ``maxlen=buffer_size``; once the deque is full, appending a new transition
    drops the oldest one. Passing ``buffer_size=None`` gives an unbounded
    buffer.

    Args:
        buffer_size: maximum number of stored transitions (``None`` = no limit).
        batch_size: number of transitions returned by :meth:`sample`.
        device: torch device (or device string) the sampled tensors are moved to.
    """

    def __init__(self, buffer_size, batch_size, device):
        self.buffer = deque(maxlen=buffer_size)
        self.batch_size = batch_size
        self.device = device
        # Kept only so existing code that reads `ptr` keeps working; the deque
        # handles eviction itself, so no manual write pointer is needed.
        self.ptr = 0
        print(self.buffer.maxlen)

    def __len__(self):
        """Return the number of transitions currently stored."""
        return len(self.buffer)

    def store_transition(self, state, action, reward, new_state, done):
        """Append one transition, evicting the oldest one when at capacity."""
        # The previous implementation compared a never-advanced pointer with
        # `maxlen`, which made its overwrite branch unreachable and raised
        # TypeError when `maxlen` was None.
        self.buffer.append((state, action, reward, new_state, done))

    def _as_tensor(self, values, dtype=np.float32, column=False):
        """Stack ``values`` into a numpy array of ``dtype`` and wrap it as a tensor."""
        array = np.array(values, dtype=dtype)
        if column:
            array = array.reshape(-1, 1)
        return torch.from_numpy(array)

    def sample(self):
        """Draw ``batch_size`` distinct transitions uniformly at random.

        Returns:
            Tuple ``(states, actions, rewards, next_states, dones)`` of float32
            tensors on ``self.device``. ``rewards`` and ``dones`` are reshaped
            to one column per row; ``dones`` holds 0.0 / 1.0.

        Raises:
            ValueError: (from ``random.sample``) if fewer than ``batch_size``
                transitions are stored.
        """
        batch = random.sample(self.buffer, self.batch_size)
        states, actions, rewards, next_states, dones = zip(*batch)
        return (
            self._as_tensor(states).to(self.device),
            self._as_tensor(actions).to(self.device),
            self._as_tensor(rewards, column=True).to(self.device),
            self._as_tensor(next_states).to(self.device),
            # Flags are cast through uint8 first, then converted to float.
            self._as_tensor(dones, dtype=np.uint8, column=True).float().to(self.device),
        )

class Agent(nn.Module):
    """TD3 agent: one actor, a twin critic, and a target copy of each.

    Reads the module-level hyperparameters ``alpha``, ``tau``,
    ``policy_delay``, ``noise_policy`` and ``noise_clip``.

    NOTE(review): ``train`` below shadows ``nn.Module.train(mode)``. Calls such
    as ``agent.eval()`` / ``agent.train(False)`` are therefore expected to
    fail; switch modes on ``agent.actor`` / ``agent.critic`` directly if needed.

    Args:
        state_dim: size of the observation vector.
        action_dim: size of the action vector.
        max_action: bound used to scale the actor output and clamp target actions.
        env: the environment; stored on the agent for callers that need it.
        device: torch device the networks and input tensors are placed on.
        discount: discount factor applied to the bootstrapped target value.
    """

    def __init__(self, state_dim, action_dim, max_action, env, device, discount=0.99):
        super(Agent, self).__init__()

        self.device = device
        self.max_action = max_action
        self.env = env
        self.discount = discount

        # Actor network; deepcopy already gives the target identical weights.
        self.actor = Actor(state_dim, action_dim, max_action).to(device)
        self.actor_target = copy.deepcopy(self.actor)
        self.actor_optimizer = torch.optim.Adam(self.actor.parameters(), lr=alpha)

        # Critic network (takes state + action)
        self.critic = Critic(state_dim, action_dim).to(device)
        self.critic_target = copy.deepcopy(self.critic)
        self.critic_optimizer = torch.optim.Adam(self.critic.parameters(), lr=alpha)

    def select_action(self, state, noise=0.1):
        """Return the actor's deterministic action for one state as a flat numpy array.

        ``noise`` is accepted for backward compatibility but is not applied:
        the previous code built a noisy action and then returned a second,
        noise-free forward pass. Exploration noise is added by the caller.
        """
        state = torch.FloatTensor(state.reshape(1, -1)).to(self.device)
        # Inference only: no autograd graph needed.
        with torch.no_grad():
            return self.actor(state).cpu().numpy().flatten()

    def save(self):
        """Write the actor and critic weights to Google Drive."""
        torch.save(self.actor.state_dict(), '/content/drive/MyDrive/td3_actor.pth')
        torch.save(self.critic.state_dict(), '/content/drive/MyDrive/td3_critic.pth')
        return

    # NOTE: loading is currently disabled, so `agent.load()` raises AttributeError.
    # def load(self):
    #     self.actor.load_state_dict(torch.load("/content/drive/MyDrive/td3_actor.pth",  map_location=torch.device('cpu')))
    #     self.critic.load_state_dict(torch.load("/content/drive/MyDrive/td3_critic.pth",  map_location=torch.device('cpu')))
    #     return

    @staticmethod
    def _soft_update(source, target):
        """Blend ``source`` parameters into ``target``: t <- tau*s + (1-tau)*t."""
        with torch.no_grad():
            for param, target_param in zip(source.parameters(), target.parameters()):
                target_param.copy_(tau * param + (1 - tau) * target_param)

    def train(self, replay_buffer, current_iteration):
        """Run one TD3 update from a sampled batch.

        The critic is updated on every call. The actor and both target
        networks are updated only when ``current_iteration`` is a multiple of
        ``policy_delay``.

        Args:
            replay_buffer: object whose ``sample()`` returns
                ``(state, action, reward, next_state, done)`` tensors.
            current_iteration: integer step counter used for the delayed update.
        """
        state, action, reward, next_state, done = replay_buffer.sample()

        # The target is a constant for the critic loss, so build it without
        # tracking gradients through the target networks.
        with torch.no_grad():
            # Clipped Gaussian noise on the target action.
            noise = (torch.randn_like(action) * noise_policy).clamp(-noise_clip, noise_clip)
            next_action = (self.actor_target(next_state) + noise).clamp(-self.max_action, self.max_action)

            # Smaller of the two target estimates, discounted, and zeroed for
            # transitions flagged as done.
            target_q1, target_q2 = self.critic_target(next_state, next_action)
            target_q = reward + (1 - done) * self.discount * torch.min(target_q1, target_q2)

        curr_q1, curr_q2 = self.critic(state, action)

        critic_loss = F.mse_loss(curr_q1, target_q) + F.mse_loss(curr_q2, target_q)
        self.critic_optimizer.zero_grad()
        critic_loss.backward()
        self.critic_optimizer.step()

        if current_iteration % policy_delay == 0:
            # The actor maximises the first critic head's value of its own actions.
            actor_loss = -self.critic(state, self.actor(state))[0].mean()

            self.actor_optimizer.zero_grad()
            actor_loss.backward()
            self.actor_optimizer.step()

            self._soft_update(self.critic, self.critic_target)
            self._soft_update(self.actor, self.actor_target)

**Prepare the environment and wrap it to capture videos**

In [None]:
%%capture
# Create the walker environment, then wrap it so that an episode is recorded
# to Drive whenever its id is a multiple of `video_every`.
env = gym.make("BipedalWalker-v3")
#env = gym.make("BipedalWalkerHardcore-v3") # only attempt this when your agent has solved BipedalWalker-v3
env = gym.wrappers.Monitor(
    env,
    "/content/drive/MyDrive/video",
    video_callable=lambda ep_id: ep_id % video_every == 0,
    force=True,
)

# Sizes of the observation and action vectors (first dimension of each space).
obs_dim, act_dim = env.observation_space.shape[0], env.action_space.shape[0]

In [None]:
# Report the problem size and the device selected for training.
print(
    'The environment has {} observations and the agent can take {} actions'.format(
        obs_dim, act_dim
    )
)
print('The device is: {}'.format(device))

# Nudge the user towards the CPU when another device was selected.
if not device.type == 'cpu':
    print("It's recommended to train on the cpu for this")

The environment has 24 observations and the agent can take 4 actions
The device is: cpu


In [None]:
# Seed every random number generator used below.
seed = 42
torch.manual_seed(seed)
env.seed(seed)
random.seed(seed)
np.random.seed(seed)
env.action_space.seed(seed)

# logging variables
ep_reward = 0
reward_list = []
plot_data = []
log_f = open("agent-log.txt","w+")

state_dim = env.observation_space.shape[0]
action_dim = env.action_space.shape[0]
max_action = float(env.action_space.high[0])
buffer_size = 1000000
batch_size = 100
noise = 0.1  # exploration-noise scale, multiplied by max_action in the training loop

# initialise agent
agent = Agent(state_dim, action_dim, max_action, env, device)
max_episodes = 1000
max_timesteps = 2000

# Resume from a saved model when possible. Only the two "nothing to load"
# situations are treated as a fresh start: the agent exposing no load() method
# (AttributeError) and the checkpoint file being absent (FileNotFoundError).
# Any other failure is left to propagate instead of being reported as
# "no previous model".
try:
    print("Loading previous model")
    agent.load()
except (AttributeError, FileNotFoundError):
    print('No previous model to load. Training from scratch.')

buffer = ExperienceReplay(buffer_size, batch_size, device)

save_score = 400  # stop once the mean of the last 50 episode scores exceeds this
episodes = 1000
timesteps = 2000  # per-episode step cap checked inside the training loop

best_reward = -1*sys.maxsize
scores_over_episodes = []

# training procedure:
# The whole loop runs inside try/finally so the log file is closed even when
# training is interrupted or raises.
try:
    for episode in range(1, max_episodes+1):
        ep_reward = 0
        state = env.reset()
        for t in range(max_timesteps):

            # select the agent action: policy output plus Gaussian exploration
            # noise, clipped to the valid action range
            action = agent.select_action(state) + np.random.normal(0, max_action * noise, size=action_dim)
            action = action.clip(env.action_space.low, env.action_space.high)

            # take action in environment and get r and s'
            next_state, reward, done, _ = env.step(action)
            buffer.store_transition(state, action, reward, next_state, done)
            state = next_state
            ep_reward += reward
            env.render()

            # train once the buffer holds more than one batch of transitions
            if len(buffer) > batch_size:
                agent.train(buffer, t)

            # stop iterating when the episode finished. The last iteration of
            # the step loop also counts, so an episode that hits the step cap
            # is still recorded in scores_over_episodes.
            if done or t > timesteps or t == max_timesteps - 1:
                scores_over_episodes.append(ep_reward)
                print('Episode ', episode,'finished with reward:', ep_reward)
                print('Finished at timestep ', t)
                break

        # append the episode reward to the reward list
        reward_list.append(ep_reward)

        log_f.write('episode: {}, reward: {}\n'.format(episode, ep_reward))
        log_f.flush()

        # Early stop: the mean of the last 50 scores beat the threshold.
        recent_mean = np.mean(scores_over_episodes[-50:])
        if recent_mean > save_score:
            best_reward = recent_mean
            save_score = best_reward
            agent.save()
            break

        # Checkpoint whenever this episode beats the best reward so far.
        # ep_reward is deliberately not reset before this comparison; it is
        # re-initialised at the top of the next episode.
        if ep_reward > best_reward:
            best_reward = ep_reward
            agent.save()

        # print reward data every so often - add a graph like this in your report
        if episode % plot_interval == 0:
            plot_data.append([episode, np.array(reward_list).mean(), np.array(reward_list).std()])
            reward_list = []
            # plt.rcParams['figure.dpi'] = 100
            plt.plot([x[0] for x in plot_data], [x[1] for x in plot_data], '-', color='tab:grey')
            plt.fill_between([x[0] for x in plot_data], [x[1]-x[2] for x in plot_data], [x[1]+x[2] for x in plot_data], alpha=0.2, color='tab:grey')
            plt.xlabel('Episode number')
            plt.ylabel('Episode reward')
            plt.show()
            disp.clear_output(wait=True)
finally:
    log_f.close()
