In [2]:
import os
import time
import random
import numpy as np
import matplotlib.pyplot as plt
import pybullet_envs_gymnasium
import gymnasium as gym
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.autograd import Variable
from gymnasium import wrappers
from collections import deque

Initialize Experience Replay

In [3]:
class ReplayBuffer(object):
    """Fixed-capacity experience replay memory.

    Transitions are stored as ``(state, next_state, action, reward, done)``
    tuples. Once ``max_size`` transitions are held, each new transition
    overwrites an existing slot, starting from index 0 and wrapping around.
    """

    def __init__(self, max_size = 1e6):
        """Create an empty buffer.

        Args:
            max_size: maximum number of transitions kept. Float values such as
                the default ``1e6`` are accepted and truncated to ``int``.

        Raises:
            ValueError: if ``max_size`` is smaller than 1.
        """
        self.storage = []
        # The default is written as a float; capacity and write index must be
        # integers so that list indexing and the modulo wrap-around stay exact.
        self.max_size = int(max_size)
        if self.max_size < 1:
            raise ValueError("max_size must be at least 1")
        self.ptr = 0 # index of the slot that the next overflow write replaces

    def add(self, transition):
        """Store one transition, overwriting a previous one when the buffer is full."""
        if len(self.storage) == self.max_size:
            self.storage[self.ptr] = transition
            self.ptr = (self.ptr + 1) % self.max_size
        else:
            self.storage.append(transition)

    def sample(self, batch_size):
        """Draw ``batch_size`` transitions uniformly at random, with replacement.

        Returns:
            Tuple ``(states, next_states, actions, rewards, dones)`` of NumPy
            arrays; ``rewards`` and ``dones`` are reshaped to ``(batch_size, 1)``.

        Raises:
            ValueError: if the buffer holds no transitions yet.
        """
        if not self.storage:
            raise ValueError("cannot sample from an empty replay buffer")
        ind = np.random.randint(0, len(self.storage), size=batch_size)
        batch_states, batch_next_states, batch_actions, batch_rewards, batch_dones = [], [], [], [], []
        for i in ind:
            state, next_state, action, reward, done = self.storage[i]
            # np.asarray avoids a copy when it can and copies when it must;
            # np.array(..., copy=False) raises on NumPy 2 if a copy is needed.
            batch_states.append(np.asarray(state))
            batch_next_states.append(np.asarray(next_state))
            batch_actions.append(np.asarray(action))
            batch_rewards.append(np.asarray(reward))
            batch_dones.append(np.asarray(done))
        return (
            np.array(batch_states),
            np.array(batch_next_states),
            np.array(batch_actions),
            np.array(batch_rewards).reshape(-1, 1),
            np.array(batch_dones).reshape(-1, 1),
        )


Create one NN for Actor Model and one NN for Actor Target

In [4]:
class Actor(nn.Module):
    """Policy network: three linear layers mapping a state to a bounded action."""

    def __init__(self, state_dim, action_dim, max_action):
        """Build the layers.

        Args:
            state_dim: size of the state vector fed to the first layer.
            action_dim: size of the action vector produced by the last layer.
            max_action: scale applied to the tanh output, so every action
                component lies in ``[-max_action, max_action]``.
        """
        super().__init__()
        first_hidden, second_hidden = 400, 300
        self.layer_1 = nn.Linear(state_dim, first_hidden)
        self.layer_2 = nn.Linear(first_hidden, second_hidden)
        self.layer_3 = nn.Linear(second_hidden, action_dim)
        self.max_action = max_action

    def forward(self, x):
        """Forward-propagate state ``x`` and return the scaled action."""
        hidden = F.relu(self.layer_1(x))
        hidden = F.relu(self.layer_2(hidden))
        # tanh yields values in [-1, 1]; scaling stretches that to the action range.
        squashed = torch.tanh(self.layer_3(hidden))
        return squashed * self.max_action

Create two NN for critic models and two NN for critic targets

In [5]:
class Critic(nn.Module):
    """Twin Q-networks held in one module.

    Both networks take the same concatenated (state, action) input and each
    outputs a single Q-value; ``Q1`` exposes the first network on its own.
    """

    def __init__(self, state_dim, action_dim):
        """Build two independent three-layer networks over state + action inputs."""
        super().__init__()
        joint_dim = state_dim + action_dim
        # First critic network
        self.layer_1 = nn.Linear(joint_dim, 400)
        self.layer_2 = nn.Linear(400, 300)
        self.layer_3 = nn.Linear(300, 1) # single Q-value
        # Second critic network
        self.layer_4 = nn.Linear(joint_dim, 400)
        self.layer_5 = nn.Linear(400, 300)
        self.layer_6 = nn.Linear(300, 1) # single Q-value

    def forward(self, x, u):
        """Return ``(Q1, Q2)`` for state ``x`` and action ``u``."""
        # dim=1 joins state and action side by side, so each row of the result
        # is one [state, action] input (dim=0 would stack extra rows instead).
        joint = torch.cat((x, u), dim=1)
        # Forward pass through the first critic
        q_first = self.layer_3(F.relu(self.layer_2(F.relu(self.layer_1(joint)))))
        # Forward pass through the second critic
        q_second = self.layer_6(F.relu(self.layer_5(F.relu(self.layer_4(joint)))))
        return q_first, q_second

    def Q1(self, x, u):
        """Return only the first critic's Q-value for state ``x`` and action ``u``."""
        joint = torch.cat((x, u), dim=1)
        hidden = F.relu(self.layer_1(joint))
        hidden = F.relu(self.layer_2(hidden))
        return self.layer_3(hidden)

Training Process

In [6]:
# Select device: use the GPU only when CUDA is actually usable, otherwise the CPU.
# torch.cuda.is_available must be *called*; the bare function object is always
# truthy, which would select "cuda" even on machines without a GPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Building the whole training process into a class
class TD3(object):
    """TD3 agent: an actor, a twin critic, and a slowly-updated target copy of each.

    Relies on the module-level ``Actor``, ``Critic`` and ``device`` definitions.
    """

    def __init__(self, state_dim, action_dim, max_action):
        """Create the networks, copy their weights into the targets, and build the optimizers.

        Args:
            state_dim: size of the state vector.
            action_dim: size of the action vector.
            max_action: bound used to scale the actor output and to clamp
                the noisy target action during training.
        """
        # Actor and actor target start from identical weights
        self.actor = Actor(state_dim, action_dim, max_action).to(device)
        self.actor_target = Actor(state_dim, action_dim, max_action).to(device)
        self.actor_target.load_state_dict(self.actor.state_dict())
        self.actor_optimizer = torch.optim.Adam(self.actor.parameters())
        # Critic and critic target start from identical weights
        self.critic = Critic(state_dim, action_dim).to(device)
        self.critic_target = Critic(state_dim, action_dim).to(device)
        self.critic_target.load_state_dict(self.critic.state_dict())
        self.critic_optimizer = torch.optim.Adam(self.critic.parameters())
        self.max_action = max_action

    def select_action(self, state):
        """Return the actor's action for a single state as a flat NumPy array.

        Args:
            state: 1D array-like state; it is reshaped to one row (a batch of
                one) so its length matches the actor's input layer.
        """
        row = np.asarray(state, dtype=np.float32).reshape(1, -1)
        state = torch.as_tensor(row, device=device)
        # No gradient is needed for acting; this also lets .numpy() be called directly.
        with torch.no_grad():
            return self.actor(state).cpu().numpy().flatten()

    @staticmethod
    def _soft_update(source, target, tau):
        """Polyak-average ``source`` parameters into ``target``: t <- tau * s + (1 - tau) * t."""
        for param, target_param in zip(source.parameters(), target.parameters()):
            target_param.data.copy_(tau * param.data + (1 - tau) * target_param.data)

    def train(self, replay_buffer, iterations, batch_size=100, discount_factor=0.99, tau=0.005, policy_noise=0.2, noise_clip=0.5, policy_freq=2):
        """Run ``iterations`` update steps on batches sampled from ``replay_buffer``.

        Args:
            replay_buffer: object whose ``sample(batch_size)`` returns
                ``(states, next_states, actions, rewards, dones)`` arrays.
            iterations: number of update steps to perform.
            batch_size: number of transitions sampled per step.
            discount_factor: discount applied to the target Q-value.
            tau: Polyak averaging coefficient for the target networks.
            policy_noise: standard deviation of the noise added to the target action.
            noise_clip: absolute bound the noise is clamped to.
            policy_freq: the actor and both targets are updated every
                ``policy_freq`` iterations; the critic is updated every iteration.
        """
        for it in range(iterations):
            # Step 4: Sample a batch of transitions from the memory
            batch_states, batch_next_states, batch_actions, batch_rewards, batch_dones = replay_buffer.sample(batch_size)
            state = torch.as_tensor(batch_states, dtype=torch.float32, device=device)
            next_state = torch.as_tensor(batch_next_states, dtype=torch.float32, device=device)
            action = torch.as_tensor(batch_actions, dtype=torch.float32, device=device)
            reward = torch.as_tensor(batch_rewards, dtype=torch.float32, device=device)
            done = torch.as_tensor(batch_dones, dtype=torch.float32, device=device)

            # The whole target is a constant for the critic loss, so build it without a graph.
            with torch.no_grad():
                # Step 5: From the next state s', the actor target proposes the next action a'
                next_action = self.actor_target(next_state)

                # Step 6: Add clipped Gaussian noise to a' and clamp the result to the action range
                noise = (torch.randn_like(action) * policy_noise).clamp(-noise_clip, noise_clip)
                next_action = (next_action + noise).clamp(-self.max_action, self.max_action)

                # Step 7: The two critic targets evaluate (s', a')
                target_Q1, target_Q2 = self.critic_target(next_state, next_action)

                # Step 8: Keep the minimum of the two target Q-values
                target_Q = torch.min(target_Q1, target_Q2)

                # Step 9: Final target; (1 - done) drops the bootstrap term on terminal transitions
                target_Q = reward + (1 - done) * discount_factor * target_Q

            # Step 10: The two critic models evaluate (s, a)
            current_Q1, current_Q2 = self.critic(state, action)

            # Step 11: Sum the losses of the two critic models
            critic_loss = F.mse_loss(current_Q1, target_Q) + F.mse_loss(current_Q2, target_Q)

            # Step 12: Backpropagate the loss and update both critics
            self.critic_optimizer.zero_grad()
            critic_loss.backward()
            self.critic_optimizer.step()

            # Step 13: Delayed actor update by gradient ascent on the first critic's Q-value
            if it % policy_freq == 0:
                actor_loss = -self.critic.Q1(state, self.actor(state)).mean()
                self.actor_optimizer.zero_grad()
                actor_loss.backward()
                self.actor_optimizer.step()

                # Step 14: Polyak-average the actor weights into the actor target
                self._soft_update(self.actor, self.actor_target, tau)

                # Step 15: Polyak-average the critic weights into the critic target;
                # without this the TD targets would come from a frozen initial critic.
                self._soft_update(self.critic, self.critic_target, tau)
                    

            






            
            



        
