In [1]:
from IPython.display import clear_output

## Set up a virtual display

Mujoco rendering requires a display(even if we want to just generate rgb_array frames). A colab notebook does not have a display. Therefore we need to account for that by creating a virtual display

In [2]:
%%capture
!apt install -y python3-opengl
!apt install -y ffmpeg
!apt install -y xvfb
!pip3 install pyvirtualdisplay

clear_output()

In [3]:
from pyvirtualdisplay import Display

display = Display(visible=0, size=(1400, 900))
display.start()

<pyvirtualdisplay.display.Display at 0x7a253ca9cf10>

In [4]:
%pip install gymnasium[mujoco]

clear_output()

In [5]:
# %pip install torch numpy matplotlib

clear_output()

# Contents

In this notebook, you will implement the DDPG algo using pytorch and then use it to train the **walker** environment.

- Write code to define and train the agent
- Also include a visualization of the agent's performance in the form of a video

Walker environment consists of a structure of legs. The agent's actions can move the joints. The goal is to make the structure able to walk.

You can see more about the actions, observations and rewards [here](https://gymnasium.farama.org/environments/mujoco/walker2d/)

![Walker Image](https://gymnasium.farama.org/_images/walker2d.gif)


In [6]:
import torch
import gymnasium as gym
import numpy as np
import collections
import torch.nn as nn
import torch.nn.functional as F

import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation
from IPython.display import HTML

In [7]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
device

device(type='cpu')

## Initializing the environment

### Solve here

write the code to define and train the agent:

In [8]:
Experience = collections.namedtuple(typename="Experience", field_names=['state', 'action', 'reward', 'done', 'nextState'])


class ExperienceBuffer(object):
    def __init__(self, args):
        self.buffer = collections.deque(maxlen=args['replay_size'])

    def __len__(self):
        return len(self.buffer)

    def append(self, experience):
        self.buffer.append(experience)

    def sample(self, batch_size):
        indices = np.random.choice(len(self.buffer), batch_size, replace=True)
        states, actions, rewards, done, next_states = zip(*[self.buffer[idx] for idx in indices])
        return np.array(states), actions, np.array(rewards, dtype=np.float32), done, np.array(next_states)


class Actor(nn.Module):
    def __init__(self, input_dim, output_dim, action_scale):
        super(Actor, self).__init__()
        self.fc1 = nn.Linear(input_dim, 400)
        self.fc2 = nn.Linear(400, 300)
        self.fc3 = nn.Linear(300, output_dim)
        self.action_scale = action_scale

    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = self.action_scale * torch.tanh(self.fc3(x))
        return x


class Critic(nn.Module):
    def __init__(self, input_dim, output_dim):
        super(Critic, self).__init__()
        self.fc1 = nn.Linear(input_dim, 400)
        self.fc2 = nn.Linear(400, 300)
        self.fc3 = nn.Linear(300, output_dim)

    def forward(self, state, action):
        x = F.relu(self.fc1(torch.cat([state, action], 1)))
        x = F.relu(self.fc2(x))
        x = self.fc3(x)
        return x


class Agent(object):
    def __init__(self, env, exp_buffer, args):
        super(Agent, self).__init__()
        self.env = env
        self.exp_buffer = exp_buffer
        self.args = args
        self.actor = None
        self.critic = None
        self.target_actor = None
        self.target_critic = None
        self.build_model()
        self.actor_optimizer = torch.optim.Adam(self.actor.parameters(), lr=self.args['actor_lr'])
        self.critic_optimizer = torch.optim.Adam(self.critic.parameters(), lr=self.args['critic_lr'])

    def build_model(self):
        obs_dim = self.env.observation_space.shape[0]
        action_dim = self.env.action_space.shape[0]
        action_scale = self.env.action_space.high[0]

        self.actor = Actor(input_dim=obs_dim, output_dim=action_dim, action_scale=action_scale).to(device)
        self.target_actor = Actor(input_dim=obs_dim, output_dim=action_dim, action_scale=action_scale).to(device)
        self.target_actor.load_state_dict(self.actor.state_dict())

        self.critic = Critic(input_dim=obs_dim+action_dim, output_dim=1).to(device)
        self.target_critic = Critic(input_dim=obs_dim+action_dim, output_dim=1).to(device)
        self.target_critic.load_state_dict(self.critic.state_dict())

    def choose_action(self, state):
        x = torch.unsqueeze(torch.FloatTensor(state).to(device), 0)
        return self.actor(x)

    def store_transition(self, state, action, r, done, state_next):
        exp = Experience(state, action, r, done, state_next)
        self.exp_buffer.append(exp)

    def learn(self):
        for _ in range(0, self.args['update_iteration']):

            buffer = self.exp_buffer.sample(self.args['batch_size'])
            states, actions, rewards, done, next_states = buffer

            states_tensor = torch.FloatTensor(states).to(device)
            actions_tensor = torch.FloatTensor(actions).to(device)
            rewards_tensor = torch.unsqueeze(torch.FloatTensor(rewards), 1).to(device)
            done_tensor = torch.unsqueeze(1-torch.FloatTensor(done), 1).to(device)
            next_states_tensor = torch.FloatTensor(next_states).to(device)

            # Compute the target Q value
            target_q = self.target_critic(next_states_tensor, self.target_actor(next_states_tensor))
            target_q = rewards_tensor + (done_tensor * self.args['gamma'] * target_q).detach()

            # Get current Q estimate
            current_q = self.critic(states_tensor, actions_tensor)

            # Compute critic loss
            critic_loss = F.mse_loss(current_q, target_q)
            self.critic_optimizer.zero_grad()
            critic_loss.backward()
            self.critic_optimizer.step()

            # Compute actor loss
            actor_loss = -self.critic(states_tensor, self.actor(states_tensor)).mean()
            self.actor_optimizer.zero_grad()
            actor_loss.backward()
            self.actor_optimizer.step()

            # Update the frozen target models
            for param, target_param in zip(self.critic.parameters(), self.target_critic.parameters()):
                target_param.data.copy_(self.args['tau'] * param.data + (1 - self.args['tau']) * target_param.data)

            for param, target_param in zip(self.actor.parameters(), self.target_actor.parameters()):
                target_param.data.copy_(self.args['tau'] * param.data + (1 - self.args['tau']) * target_param.data)

### Visualization

You are provided with some functions which will help you visualize the results as a video.
Feel free to wrie your own code for visualization if you prefer

In [9]:
def frames_to_video(frames, fps=24):
    # DO NOT MODIFY: This function is for visualization

    fig = plt.figure(figsize=(frames[0].shape[1] / 100, frames[0].shape[0] / 100), dpi=100)
    ax = plt.axes()
    ax.set_axis_off()

    if len(frames[0].shape) == 2:  # Grayscale image
        im = ax.imshow(frames[0], cmap='gray')
    else:  # Color image
        im = ax.imshow(frames[0])

    def init():
        if len(frames[0].shape) == 2:
            im.set_data(frames[0], cmap='gray')
        else:
            im.set_data(frames[0])
        return im,

    def update(frame):
        if len(frames[frame].shape) == 2:
            im.set_data(frames[frame], cmap='gray')
        else:
            im.set_data(frames[frame])
        return im,

    interval = 1000 / fps
    anim = FuncAnimation(fig, update, frames=len(frames), init_func=init, blit=True, interval=interval)
    plt.close()
    return HTML(anim.to_html5_video())

In [10]:
args = {
    'replay_size': 20000,
    'batch_size': 128,
    'actor_lr': 1e-4,
    'critic_lr': 1e-3,
    'tau': 0.005,
    'exploration_noise': 0.1,
    'update_iteration': 200,
    'gamma': 0.99
}

In [11]:
buffer = ExperienceBuffer(args=args)
env = gym.make('Walker2d-v4', render_mode='rgb_array')
agent = Agent(env, buffer, args)

In [12]:
state = env.reset()[0]
frames = []

while True:
    with torch.no_grad():
        action = agent.choose_action(state).cpu().numpy().flatten()
        state_next, r, done, truncated, info = env.step(action)
        frames.append(env.render())
        state = state_next
        if done or truncated:
            break

In [13]:
frames_to_video(frames)

In [None]:
for epoch in range(20000):
    state, done, truncated = env.reset()[0], False, False
    episode_r = []
    while (not done) and (not truncated):
        action = agent.choose_action(state).cpu().detach().numpy().flatten()
        action = (action + np.random.normal(0, args['exploration_noise'], size=env.action_space.shape[0])).clip(
            env.action_space.low, env.action_space.high)
        state_next, r, done, truncated, info = env.step(action)
        agent.store_transition(state, action, r, done, state_next)
        if not done:
            state = state_next
        episode_r.append(r)
    agent.learn()
    print("epoch: {} | avg_r: {} | ep_r: {} | len_ep {}".format(epoch, np.sum(episode_r) / len(episode_r),
                                                                sum(episode_r), len(episode_r)))
env.close()

  actions_tensor = torch.FloatTensor(actions).to(device)


epoch: 0 | avg_r: 0.014946452540169729 | ep_r: 0.418500671124752 | len_ep 28
epoch: 1 | avg_r: -0.5513231763658478 | ep_r: -6.064554940024326 | len_ep 11
epoch: 2 | avg_r: 1.0699022822616011 | ep_r: 74.8931597583121 | len_ep 70
epoch: 3 | avg_r: -0.692296237428386 | ep_r: -78.921771066836 | len_ep 114
epoch: 4 | avg_r: -0.7035824155495292 | ep_r: -75.98690087934912 | len_ep 108
epoch: 5 | avg_r: -0.5035193117150446 | ep_r: -11.580944169446028 | len_ep 23
epoch: 6 | avg_r: 0.1446404034356678 | ep_r: 2.748167665277686 | len_ep 19
epoch: 7 | avg_r: 1.4440419520560486 | ep_r: 142.96015325354878 | len_ep 99
epoch: 8 | avg_r: 1.5074954262635312 | ep_r: 309.0365623840237 | len_ep 205
epoch: 9 | avg_r: 0.40884692171880804 | ep_r: 77.68091512657352 | len_ep 190
epoch: 10 | avg_r: 1.6230039119597295 | ep_r: 301.8787276245094 | len_ep 186
epoch: 11 | avg_r: 1.6454831127890677 | ep_r: 292.89599407645386 | len_ep 178
epoch: 12 | avg_r: 1.715582576750394 | ep_r: 288.2178728940662 | len_ep 168
epoch:

In [None]:
state = env.reset()[0]
frames = []

while True:
    with torch.no_grad():
        action = agent.choose_action(state).cpu().numpy().flatten()
        state_next, r, done, truncated, info = env.step(action)
        frames.append(env.render())
        state = state_next
        if done or truncated:
            break

In [None]:
frames_to_video(frames)