## Deep Deterministic Policy Gradient (DDPG)

#### Import the necessary code libraries

In [1]:
from pyvirtualdisplay import Display
Display(visible=False, size=(1400, 900)).start()

<pyvirtualdisplay.display.Display at 0x7f90efc4b280>

In [2]:
import copy
import gym
import torch
import random
import functools

import numpy as np
import torch.nn.functional as F

from collections import deque, namedtuple
from IPython.display import HTML
from base64 import b64encode

from torch import nn
from torch.utils.data import DataLoader
from torch.utils.data.dataset import IterableDataset
from torch.optim import AdamW

from pytorch_lightning import LightningModule, Trainer

import brax
from brax.v1 import envs
from brax.v1.envs.to_torch import JaxToTorchWrapper

# from brax.envs import to_torch#this will ensure that our environments can use pytorch tensors
from brax.v1.io import html#this will allow us to display our environments in the notebook
# from gym.wrappers import RecordVideo, RecordEpisodeStatistics

device = 'cuda:0' if torch.cuda.is_available() else 'cpu'
num_gpus = torch.cuda.device_count()

v = torch.ones(1, device='cuda')#if we plan on using the gpu, we need to run this line, 
# #we have to create this tensor before we do anything with brax otherwise, brax will be a little bit greedy and it will suck up memory from gpu
import jax

jax.config.update('jax_platform_name', 'gpu')

In [3]:
print(device)

cuda:0


In [4]:
def display_video(episode=0):
  video_file = open(f'/content/videos/rl-video-episode-{episode}.mp4', "r+b").read()
  video_url = f"data:video/mp4;base64,{b64encode(video_file).decode()}"
  return HTML(f"<video width=600 controls><source src='{video_url}'></video>")

In [5]:
entry_point = functools.partial(envs.create_gym_env, env_name='ant')#we are doing this to interface this with the gym library so that 
#we don't have to change our coding style that we have learnt with the gym library
gym.register('brax-ant-v0', entry_point=entry_point)

In [6]:
def create_environment(env_name, num_envs=256, episode_length=1000):
  env = gym.make(env_name, batch_size=num_envs, episode_length=episode_length)#in physics based simulation engines, we can pass a parameter called batch_size which will decide 
  #how many copies of the environment will be running at the same time. This will help us create the required replay buffer and the episode_length is like a limit on the number of steps
  #or more simply, how long we wish for an episode to run
  # env = RecordVideo(env, video_folder='./videos2', episode_trigger=lambda x: x % 100 == 0)
  env = JaxToTorchWrapper(env, device=device)#brax is a physics engine tool that uses a different numerical computing tool to run simulation called JAX but our tool of choice is called 
  #pytorch, so by callling this wrapper, the environment will take up our pytorch tensors, convert them into the right format, use them internally, and then speed back pytorch sensors so that we can work with that
  return env

In [7]:
@torch.no_grad()
def test_env(env_name, policy=None):
  env = gym.make(env_name, episode_length=1000)
  env = JaxToTorchWrapper(env, device=device)
  # print(env)
  # print(env.unwrapped.unwrapped)
  # print(env.unwrapped._state)
  # print(env.unwrapped._env)
  qp_array = []
  state = env.reset()
  for i in range(1000):
    if policy:
      action = algo.policy.net(state.unsqueeze(0)).squeeze()
    else:
      action = env.action_space.sample()
    state, _, _, _ = env.step(action)
    qp_array.append(env.unwrapped._state.qp)
  return HTML(html.render(env.unwrapped._env.sys, qp_array))

#### Create the gradient policy

In [8]:
class GradientPolicy(nn.Module):

  def __init__(self, hidden_size, obs_size, out_dims, min, max):
    super().__init__()
    self.min = torch.from_numpy(min).to(device)
    self.max = torch.from_numpy(max).to(device)
    self.net = nn.Sequential(
        nn.Linear(obs_size, hidden_size),
        nn.ReLU(),
        nn.Linear(hidden_size, hidden_size),
        nn.ReLU(),
        nn.Linear(hidden_size, out_dims),
        nn.Tanh()
    )
    
  def mu(self, x):
    if isinstance(x, np.ndarray):
      x = torch.from_numpy(x).to(device)
    return self.net(x.float()) * self.max

  def forward(self, x, epsilon=0.0):
    mu = self.mu(x)
    mu = mu + torch.normal(0, epsilon, mu.size(), device=mu.device)
    action = torch.max(torch.min(mu, self.max), self.min)#we have to clip the value of the
    action = action.cpu().numpy()
    return action


#### Create the Deep Q-Network

In [9]:
class DQN(nn.Module):

  def __init__(self, hidden_size, obs_size, out_dims):
    super().__init__()
    self.net = nn.Sequential(
        nn.Linear(obs_size + out_dims, hidden_size),#why is the input dimension like this?
        #this neural network will take input as an observation and also an action, so we will have concatenate the value of the state and the action before it can be fed into the neural network
        nn.ReLU(),
        nn.Linear(hidden_size, hidden_size),
        nn.ReLU(),           
        nn.Linear(hidden_size, 1),
    )

  def forward(self, state, action):
    if isinstance(state, np.ndarray):
      state = torch.from_numpy(state).to(device)
    if isinstance(action, np.ndarray):
      action = torch.from_numpy(action).to(device)
    in_vector = torch.hstack((state, action))#stacking the state and action horizontally
    return self.net(in_vector.float())


In [10]:
class ReplayBuffer:

  def __init__(self, capacity):
    self.buffer = deque(maxlen=capacity)

  def __len__(self):
    return len(self.buffer)
  
  def append(self, experience):
    self.buffer.append(experience)
  
  def sample(self, batch_size):
    return random.sample(self.buffer, batch_size)

In [11]:
class RLDataset(IterableDataset):

  def __init__(self, buffer, sample_size=400):
    self.buffer = buffer
    self.sample_size = sample_size
  
  def __iter__(self):
    for experience in self.buffer.sample(self.sample_size):
      yield experience

In [12]:
def polyak_average(net, target_net, tau=0.01):
    for qp, tp in zip(net.parameters(), target_net.parameters()):
        tp.data.copy_(tau * qp.data + (1 - tau) * tp.data)

In [13]:
class DDPG(LightningModule):

  def __init__(self, env_name, capacity=500, batch_size=8192, actor_lr=1e-3, 
               critic_lr=1e-3, hidden_size=256, gamma=0.99, loss_fn=F.smooth_l1_loss, 
               optim=AdamW, eps_start=1.0, eps_end=0.2, eps_last_episode=500, 
               samples_per_epoch=10, tau=0.005):

    super().__init__()
    self.automatic_optimization = False  # Disable automatic optimization
    self.env = create_environment(env_name, num_envs=batch_size)#this time, batch_size denotes the number of paraller environments that will run parallely
    #each entry in the the replay buffer will have 8192 entries in itself
    # print(type(self.env))
    # print(type(self.env.reset()))
    self.obs = self.env.reset()
    self.videos = []

    obs_size = self.env.observation_space.shape[1]#why like this?
    #remember, now we are learning parallel environments
    #if the observation of a single environment has a size of 8, then 
    #then shape of the observation space would be [8192, 8]
    action_dims = self.env.action_space.shape[1]
    #same reason as earlier as to why we are taking the second index this time
    max_action = self.env.action_space.high
    min_action = self.env.action_space.low

    self.q_net = DQN(hidden_size, obs_size, action_dims)
    self.policy = GradientPolicy(hidden_size, obs_size, action_dims, min_action, max_action)

    self.target_policy = copy.deepcopy(self.policy)
    self.target_q_net = copy.deepcopy(self.q_net)

    self.buffer = ReplayBuffer(capacity=capacity)

    self.save_hyperparameters()

    while len(self.buffer) < self.hparams.samples_per_epoch:
      print(f"{len(self.buffer)} samples in experience buffer. Filling...")
      self.play_episode(epsilon=self.hparams.eps_start)

  @torch.no_grad()
  def play_episode(self, policy=None, epsilon=0.):
    #in the previous sections, we have the similar method, where we play an entire episode to sample an experience
    #but this time, every time we take an action in the environment, we get 8192 different observations.
    #so, instead of playing an episode every time, when we call the play_episode function, we'll simply play one move
    if policy:
      action = policy(self.obs, epsilon=epsilon)
    else:
      action = self.env.action_space.sample()
    next_obs, reward, done, info = self.env.step(action)
    exp = (self.obs, action, reward, done, next_obs)
    self.buffer.append(exp)
    self.obs = next_obs
    return reward.mean()

  def forward(self, x):#what happens when someone calls this DDPG class on a state
    output = self.policy(x)
    return output

  def configure_optimizers(self):
    #the following line of code was added due to the following error
    #RuntimeError: Training with multiple optimizers is only supported with manual optimization. Remove the `optimizer_idx` argument from `training_step`, set `self.automatic_optimization = False` and access your optimizers in `training_step` with `opt1, opt2, ... = self.optimizers()`.
    
    q_net_optimizer = self.hparams.optim(self.q_net.parameters(), lr=self.hparams.critic_lr)
    policy_optimizer = self.hparams.optim(self.policy.parameters(), lr=self.hparams.actor_lr)
    return [q_net_optimizer, policy_optimizer]#in this alogrithm, we'll call the training step method, twice, once with the first optimiser and once with the second optimiser
    #so that both the actor and the critic have the opportunity to update their weights with the same batch of data

  def train_dataloader(self):
    dataset = RLDataset(self.buffer, self.hparams.samples_per_epoch)
    dataloader = DataLoader(
        dataset=dataset,
        batch_size=1,#because we know every observation in our replay buffer will have 8192 observations, so each individual observation we store will effectively be a batch
        #we make this value 1 to avoid making batches of batches
    )
    return dataloader

  def training_step(self, batch, batch_idx):#since we have an optimiser for the q-network and an optimiser for the policy, the training step will be called twice for every batch of data
    #once with the first optimiser, that is, optimiser of the q-network, and another time with the optimiser of the policy network
    epsilon = max(
        self.hparams.eps_end,
        self.hparams.eps_start - self.current_epoch / self.hparams.eps_last_episode
    )

    mean_reward = self.play_episode(policy=self.policy, epsilon=epsilon)#remember, play episode is now just a single step, but not in one environment, but in 8192 environments
    self.log("episode/mean_reward", mean_reward)

    

    polyak_average(self.q_net, self.target_q_net, tau=self.hparams.tau)
    polyak_average(self.policy, self.target_policy, tau=self.hparams.tau)

    states, actions, rewards, dones, next_states = map(torch.squeeze, batch)
    #what are we doing here? Remember that the dataloader loads batches of data. This time, it will load a batch of 1 because in a single observation, we already have a batch of data
    #but that will give us the data in the wrong format. It will give us a batch of data with the size (1, 8192, ..), but it is wrong because 8192 is already the correct size, what we need is (8192, ...)
    #so what we will do is apply torch.squeez to each element of the batch using the map function, the squeeze operation will look for indices, where we only have one item and therefore, it's a dimension we can kill
    #the problem is now rewards and dones are flat lists
    #we'll do the same thing as we did in previous sections
    rewards = rewards.unsqueeze(1)
    dones = dones.unsqueeze(1).bool()

    #the following are changes that are added by me
    opt_q_net, opt_policy = self.optimizers()
    # if isinstance(opt_q_net, torch.optim.Optimizer):
    # print('Inside q_network loss optimsation')
    state_action_values = self.q_net(states, actions)
    next_state_values = self.target_q_net(next_states, self.target_policy.mu(next_states))
    next_state_values[dones] = 0.0
    expected_state_action_values = rewards + self.hparams.gamma * next_state_values
    q_loss = self.hparams.loss_fn(state_action_values, expected_state_action_values)
    opt_q_net.zero_grad()
    q_loss.backward()
    opt_q_net.step()
    # self.log_dict({"episode/Q-Loss": q_loss})
    # return q_loss
    
    # elif isinstance(opt_policy, torch.optim.Optimizer):
    # print("Inside policy network loss optimisation")
    mu = self.policy.mu(states)
    policy_loss = - self.q_net(states, mu).mean()#what exactly are we doing here?
    opt_policy.zero_grad()
    policy_loss.backward()
    opt_policy.step()
    #the better the actions of the policy, the higher will be the q-values of the actions taken by that policy in a specific set of stateas
    #so we want to increase the values produced by the q-network by modifying only the actions taken by the policy
    #that is, here the q-network remains constant, and the better our actions, the higher the estimates of the q-newtork of the values of those actions
    #so by passing those actions through the q-network and trying to maximise their value, will be improving the performance of the policy
    #but pytorch doesn't have a mechanism to maximise a value, it can only minismise them. Therefore, what we did is to place the negative sign of those values
    #because minimising the negative of the value will achieve the same thing as maximising the values themselves. Then we compute the mean of those values and then we have the loss of our policy
    self.log_dict({"episode/Policy Loss": policy_loss})
    # return q_loss, policy_loss
  
  def on_train_epoch_end(self):
    if self.current_epoch % 100 == 0:
      video = test_env(self.env.spec.id, policy=self.policy)
      self.videos.append(video)

In [14]:
# Start tensorboard.
# !rm -r /content/lightning_logs/
# !rm -r /content/videos/
%load_ext tensorboard
%tensorboard --logdir /lightning_logs/

ERROR: Failed to start `tensorboard`: [Errno 8] Exec format error:
'tensorboard'

In [15]:
torch.cuda.empty_cache()

In [16]:
algo = DDPG('brax-ant-v0')

trainer = Trainer(
    # gpus=num_gpus, 
    max_epochs=10_000,
    log_every_n_steps=10
)

trainer.fit(algo)

  logger.warn(
  logger.warn(
  logger.warn(
  logger.warn(


0 samples in experience buffer. Filling...


  logger.deprecation(
  if not isinstance(done, (bool, np.bool8)):
  logger.warn(
  logger.warn(
  logger.warn("Casting input x to numpy array.")
  logger.warn(


1 samples in experience buffer. Filling...
2 samples in experience buffer. Filling...
3 samples in experience buffer. Filling...
4 samples in experience buffer. Filling...
5 samples in experience buffer. Filling...
6 samples in experience buffer. Filling...
7 samples in experience buffer. Filling...
8 samples in experience buffer. Filling...
9 samples in experience buffer. Filling...


GPU available: True (cuda), used: True
TPU available: False, using: 0 TPU cores
HPU available: False, using: 0 HPUs
You are using a CUDA device ('NVIDIA GeForce RTX 3060 Laptop GPU') that has Tensor Cores. To properly utilize them, you should set `torch.set_float32_matmul_precision('medium' | 'high')` which will trade-off precision for performance. For more details, read https://pytorch.org/docs/stable/generated/torch.set_float32_matmul_precision.html#torch.set_float32_matmul_precision
LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]

  | Name          | Type           | Params | Mode 
---------------------------------------------------------
0 | q_net         | DQN            | 90.6 K | train
1 | policy        | GradientPolicy | 90.4 K | train
2 | target_policy | GradientPolicy | 90.4 K | train
3 | target_q_net  | DQN            | 90.6 K | train
---------------------------------------------------------
362 K     Trainable params
0         Non-trainable params
362 K     Total params
1.448    

Epoch 0: |          | 10/? [00:01<00:00,  7.65it/s, v_num=30]

  logger.warn(
  logger.warn(
  logger.warn(
  logger.deprecation(
  if not isinstance(done, (bool, np.bool8)):
  logger.warn(
  logger.warn(
  logger.warn("Casting input x to numpy array.")
  logger.warn(


Epoch 9999: |          | 10/? [00:00<00:00, 39.52it/s, v_num=30]

`Trainer.fit` stopped: `max_epochs=10000` reached.


Epoch 9999: |          | 10/? [00:00<00:00, 37.78it/s, v_num=30]


In [18]:
algo.videos[-5]

In [20]:
print(type(algo.videos[1]))

<class 'IPython.core.display.HTML'>


In [21]:
import re
import base64

def extract_video_data(html_obj):
    video_data = None
    html_str = str(html_obj.data)
    match = re.search(r'data:video/mp4;base64,(.*?)"', html_str)
    if match:
        video_data = match.group(1)
    return video_data

def save_video(video_data, filename):
    video_bytes = base64.b64decode(video_data)
    with open(filename, 'wb') as video_file:
        video_file.write(video_bytes)

# Extract and save all videos
video_data_list = [extract_video_data(video) for video in algo.videos]
for idx, video_data in enumerate(video_data_list):
    if video_data:  # Ensure the video data is not None
        save_video(video_data, f'video_{idx}.mp4')

print("Videos saved successfully.")


Videos saved successfully.
