Notebook that contains a baseline environment run and a DDPG solver to solve the environment.

# Baseline 

In [48]:
import gym
import time 
env = gym.make("BipedalWalker-v3", render_mode="human", hardcore=True)
env.action_space.seed(42)

observation, info = env.reset(seed=42)
env.render()
for _ in range(1000):
    observation, reward, terminated, truncated, info = env.step(env.action_space.sample())

    if terminated or truncated:
        observation, info = env.reset()
env.close()

self.finish_x: 542.0


# Solving with DDPG

In [12]:
import torch

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

Using device: cpu


In [45]:
import gym
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import random

from collections import deque
from torch.autograd import Variable

# Hyperparameters
BATCH_SIZE = 128
GAMMA = 0.99
TAU = 0.001
LR_ACTOR = 1e-3
LR_CRITIC = 1e-3
BUFFER_SIZE = int(1e6)
MAX_EPISODES = 1000
MAX_STEPS = 2000
EPSILON = 1.0
EPSILON_DECAY = 1e-6
EPSILON_MIN = 0.01

# Environment
env = gym.make('BipedalWalker-v3', hardcore = True)
state_dim = env.observation_space.shape[0]
action_dim = env.action_space.shape[0]
action_high = env.action_space.high
action_low = env.action_space.low

# Actor network
class Actor(nn.Module):
    def __init__(self):
        super(Actor, self).__init__()
        self.fc1 = nn.Linear(state_dim, 256)
        self.fc2 = nn.Linear(256, 128)
        self.fc3 = nn.Linear(128, action_dim)
        
    def forward(self, x):
        x = nn.functional.relu(self.fc1(x))
        x = nn.functional.relu(self.fc2(x))
        x = nn.functional.tanh(self.fc3(x))
        return x.detach().numpy() * action_high

# Critic network
class Critic(nn.Module):
    def __init__(self):
        super(Critic, self).__init__()
        self.fc1 = nn.Linear(state_dim + action_dim, 256)
        self.fc2 = nn.Linear(256, 128)
        self.fc3 = nn.Linear(128, 1)
        
    def forward(self, state, action):

        x = torch.cat((state, action), 1)
        x = nn.functional.relu(self.fc1(x))
        x = nn.functional.relu(self.fc2(x))
        x = self.fc3(x)
        return x

# Replay buffer
class ReplayBuffer:
    def __init__(self):
        self.buffer = deque(maxlen=BUFFER_SIZE)
        
    def add(self, state, action, reward, next_state, done):
        self.buffer.append((state, action, reward, next_state, done))
        
    def sample(self, batch_size):
        states, actions, rewards, next_states, dones = zip(*random.sample(self.buffer, batch_size))
        return np.array(states), np.array(actions), np.array(rewards), np.array(next_states), np.array(dones)
        
    def __len__(self):
        return len(self.buffer)

# DDPG solver
class DDPG:
    def __init__(self):
        self.actor = Actor()
        self.target_actor = Actor()
        self.target_actor.load_state_dict(self.actor.state_dict())
        self.critic = Critic()
        self.target_critic = Critic()
        self.target_critic.load_state_dict(self.critic.state_dict())
        self.actor_optimizer = optim.Adam(self.actor.parameters(), lr=LR_ACTOR)
        self.critic_optimizer = optim.Adam(self.critic.parameters(), lr=LR_CRITIC)
        self.replay_buffer = ReplayBuffer()
        self.epsilon = EPSILON
        
    def act(self, state):

        state = Variable(torch.from_numpy(state).float().unsqueeze(0))
        self.actor.eval()
        action = self.actor(state).data
        self.actor.train()
        action += self.epsilon * np.random.normal(size=action_dim)
        return np.clip(action, action_low, action_high)[0]
        
    def learn(self):
        if len(self.replay_buffer) < BATCH_SIZE:
            return
        
        states, actions, rewards, next_states, dones = self.replay_buffer.sample(BATCH_SIZE)
        
        states = Variable(torch.from_numpy(states).float())
        actions = Variable(torch.from_numpy(actions).float())
        rewards = Variable(torch.from_numpy(rewards).float())
        next_states = Variable(torch.from_numpy(next_states).float())
        dones = Variable(torch.from_numpy(dones).float())
        
        # Update critic
        next_actions = self.target_actor(next_states)
        q_targets_next = self.target_critic(next_states, torch.tensor(next_actions))
        q_targets = rewards + (GAMMA * q_targets_next * (1 - dones))
        q_expected = self.critic(states, actions)
        critic_loss = nn.functional.mse_loss(q_expected, q_targets)
        self.critic_optimizer.zero_grad()
        critic_loss.backward()
        torch.nn.utils.clip_grad_norm_(self.critic.parameters(), 1)
        self.critic_optimizer.step()
        
        # Update actor
        actions_pred = self.actor(states)
        actor_loss = -self.critic(states, torch.tensor(actions_pred)).mean()
        self.actor_optimizer.zero_grad()
        actor_loss.backward()
        self.actor_optimizer.step()
        
        # Update target networks
        self.soft_update(self.critic, self.target_critic)
        self.soft_update(self.actor, self.target_actor)
        
        # Decay exploration rate
        self.epsilon = max(self.epsilon - EPSILON_DECAY, EPSILON_MIN)
        
    def soft_update(self, local_model, target_model):
        for target_param, local_param in zip(target_model.parameters(), local_model.parameters()):
            target_param.data.copy_(TAU * local_param.data + (1 - TAU) * target_param.data)


self.finish_x: 542.0


# Training 

In [46]:
ddpg = DDPG()
ddpg.actor.to(device)
ddpg.critic.to(device)
ddpg.target_actor.to(device)
ddpg.target_critic.to(device)
scores = []
for episode in range(1, MAX_EPISODES+1):
    state = env.reset()[0]
    score = 0
    for step in range(1, MAX_STEPS+1):
        action = ddpg.act(state)
        next_state, reward, done, _,_ = env.step(action)
        ddpg.replay_buffer.add(state, action, reward, next_state, done)
        state = next_state
        score += reward
        ddpg.learn()
        if done:
            break
    scores.append(score)
    print('Episode %d Score: %.2f' % (episode, score))
    if episode % 100 == 0:
        print('Episode %d Average Score: %.2f' % (episode, np.mean(scores[-100:])))

  critic_loss = nn.functional.mse_loss(q_expected, q_targets)


Episode 1 Score: -203.45
Episode 2 Score: -103.88
Episode 3 Score: -202.41
Episode 4 Score: -189.00
Episode 5 Score: -101.68
Episode 6 Score: -100.58
Episode 7 Score: -200.48
Episode 8 Score: -203.15
Episode 9 Score: -203.12
Episode 10 Score: -201.44
Episode 11 Score: -200.31
Episode 12 Score: -199.09
Episode 13 Score: -200.45
Episode 14 Score: -202.03
Episode 15 Score: -102.95
Episode 16 Score: -101.26
Episode 17 Score: -200.40
Episode 18 Score: -102.98
Episode 19 Score: -122.32
Episode 20 Score: -189.75
Episode 21 Score: -157.76
Episode 22 Score: -105.36
Episode 23 Score: -103.82
Episode 24 Score: -101.71
Episode 25 Score: -129.19
Episode 26 Score: -103.72
Episode 27 Score: -124.44
Episode 28 Score: -197.95
Episode 29 Score: -199.37
Episode 30 Score: -201.25
Episode 31 Score: -188.37
Episode 32 Score: -118.17
Episode 33 Score: -140.34
Episode 34 Score: -101.17
Episode 35 Score: -201.64
Episode 36 Score: -197.50
Episode 37 Score: -197.36
Episode 38 Score: -200.08
Episode 39 Score: -11