<a href="https://colab.research.google.com/github/cove9988/catboost-icarus/blob/main/twin_delayed_ddpg.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Installing the package

In [1]:
!pip install pybullet

Collecting pybullet
  Downloading pybullet-3.1.8-cp37-cp37m-manylinux1_x86_64.whl (89.3 MB)
[K     |████████████████████████████████| 89.3 MB 22 kB/s 
[?25hInstalling collected packages: pybullet
Successfully installed pybullet-3.1.8


Import libs

In [6]:
import os
import time
import random
import numpy as np
import matplotlib.pyplot as plt
import pybullet_envs
import gym
import torch
import torch.nn as nn
import torch.nn.functional as F
from gym import wrappers
from torch.autograd import Variable
from collections import deque

Initialize the Experience Replay Memory

In [4]:
class ReplayBuffer(object):
  """Fixed-capacity store of transitions with uniform random sampling.

  Each transition is a 5-tuple ``(state, next_state, action, reward, done)``.
  Once ``max_size`` transitions are stored, new ones overwrite the oldest.
  """

  def __init__(self, max_size=1e6):
    """Create an empty buffer.

    Args:
      max_size: Maximum number of transitions kept. Floats such as ``1e6`` are
        accepted and truncated to an integer.

    Raises:
      ValueError: If ``max_size`` is not positive.
    """
    # Store the capacity as an int so the write pointer stays a valid list index.
    max_size = int(max_size)
    if max_size <= 0:
      raise ValueError("max_size must be a positive integer")
    self.storage = []
    self.max_size = max_size
    # Fix: this attribute was misspelled `prt`, while `add` read `ptr`.
    self.ptr = 0

  def add(self, transition):
    """Store one transition, overwriting the oldest entry when the buffer is full.

    Args:
      transition: Tuple ``(state, next_state, action, reward, done)``.
    """
    if len(self.storage) >= self.max_size:
      # The list was filled in insertion order, so `ptr` always points at the oldest entry.
      self.storage[self.ptr] = transition
      self.ptr = (self.ptr + 1) % self.max_size
    else:
      self.storage.append(transition)

  def sample(self, batch_size):
    """Draw ``batch_size`` transitions uniformly at random, with replacement.

    Args:
      batch_size: Number of transitions to draw.

    Returns:
      Tuple ``(states, next_states, actions, rewards, dones)`` of NumPy arrays.
      ``rewards`` and ``dones`` are reshaped to ``(batch_size, 1)``.

    Raises:
      ValueError: If the buffer is empty.
    """
    if not self.storage:
      raise ValueError("Cannot sample from an empty replay buffer")
    ind = np.random.randint(0, len(self.storage), batch_size)
    batch = [self.storage[i] for i in ind]
    # Fix: states were never collected, so the first returned array was always empty.
    # np.asarray replaces np.array(..., copy=False), which raises under NumPy 2
    # whenever a copy is actually required (e.g. for Python scalars or lists).
    batch_states = np.asarray([transition[0] for transition in batch])
    batch_next_states = np.asarray([transition[1] for transition in batch])
    batch_actions = np.asarray([transition[2] for transition in batch])
    # Fix: `reshape` is a method; it was being subscripted (`reshape[-1,1]`).
    batch_rewards = np.asarray([transition[3] for transition in batch]).reshape(-1, 1)
    batch_dones = np.asarray([transition[4] for transition in batch]).reshape(-1, 1)
    return batch_states, batch_next_states, batch_actions, batch_rewards, batch_dones



### Actor|Critic Model/Target
This could be improved by factoring the shared structure into a common superclass.

In [7]:
class Actor(nn.Module):
  """Policy network: maps a state to an action bounded by ``max_action``.

  Args:
    state_dim: Size of the input state vector.
    action_dim: Size of the output action vector.
    max_action: Scale applied to the tanh output, so every action component
      lies in ``[-max_action, max_action]``.
  """

  def __init__(self, state_dim, action_dim,max_action):
    super(Actor, self).__init__()
    hidden_1, hidden_2 = 400, 300
    # Fix: the first layer referenced the undefined name `stat_dim`.
    self.layer_1 = nn.Linear(state_dim, hidden_1)
    self.layer_2 = nn.Linear(hidden_1, hidden_2)
    self.layer_3 = nn.Linear(hidden_2, action_dim)
    self.max_action = max_action

  def forward(self, x):
    """Return the action for a batch of states ``x`` of shape ``(batch, state_dim)``."""
    x = F.relu(self.layer_1(x))
    x = F.relu(self.layer_2(x))
    # tanh squashes to [-1, 1]; scaling maps that onto the action range.
    x = self.max_action * torch.tanh(self.layer_3(x))
    return x

class Critic(nn.Module):
  """Pair of independent Q-networks that each score a (state, action) pair.

  Args:
    state_dim: Size of the state vector.
    action_dim: Size of the action vector.
  """

  def __init__(self, state_dim, action_dim,):
    super(Critic, self).__init__()
    hidden_1, hidden_2 = 400, 300
    # Fix: both networks consume the state and action concatenated along dim 1,
    # so their input width is state_dim + action_dim (the code previously
    # referenced the undefined name `stat_dim`).
    input_dim = state_dim + action_dim

    # First Q-network.
    self.layer_11 = nn.Linear(input_dim, hidden_1)
    self.layer_12 = nn.Linear(hidden_1, hidden_2)
    self.layer_13 = nn.Linear(hidden_2, 1)

    # Second Q-network.
    self.layer_21 = nn.Linear(input_dim, hidden_1)
    self.layer_22 = nn.Linear(hidden_1, hidden_2)
    self.layer_23 = nn.Linear(hidden_2, 1)

  def _q1_from(self, xu):
    """Run the first Q-network on an already concatenated state-action batch."""
    x1 = F.relu(self.layer_11(xu))
    x1 = F.relu(self.layer_12(x1))
    return self.layer_13(x1)

  def _q2_from(self, xu):
    """Run the second Q-network on an already concatenated state-action batch."""
    x2 = F.relu(self.layer_21(xu))
    x2 = F.relu(self.layer_22(x2))
    return self.layer_23(x2)

  def forward(self, x, u):
    """Return both Q-value estimates for states ``x`` and actions ``u``.

    Args:
      x: Batch of states, shape ``(batch, state_dim)``.
      u: Batch of actions, shape ``(batch, action_dim)``.

    Returns:
      Tuple ``(q1, q2)``, each of shape ``(batch, 1)``.
    """
    # Fix: the action argument is `u`; the code concatenated the undefined `y`.
    xu = torch.cat([x, u], 1)
    return self._q1_from(xu), self._q2_from(xu)

  def Q1(self, x, u):
    """Return only the first network's Q-value estimate, shape ``(batch, 1)``."""
    xu = torch.cat([x, u], 1)
    return self._q1_from(xu)

Training

In [8]:
# Run on the GPU when CUDA is available; otherwise fall back to the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
# build the whole training process into a class
class TD3():
  def __init__(self, state_dim, action_dim,max_action):
    """Build the actor and critic networks, their target copies and optimizers.

    Args:
      state_dim: Passed to ``Actor`` and ``Critic`` as the state size.
      action_dim: Passed to ``Actor`` and ``Critic`` as the action size.
      max_action: Passed to ``Actor``; also stored on the instance.
    """
    actor_args = (state_dim, action_dim, max_action)
    critic_args = (state_dim, action_dim)

    # Actor and its target; the target starts with the actor's weights.
    self.actor = Actor(*actor_args).to(device)
    self.actor_target = Actor(*actor_args).to(device)
    actor_weights = self.actor.state_dict()
    self.actor_target.load_state_dict(actor_weights)
    self.actor_optimizer = torch.optim.Adam(self.actor.parameters())

    # Critic and its target; the target starts with the critic's weights.
    self.critic = Critic(*critic_args).to(device)
    self.critic_target = Critic(*critic_args).to(device)
    critic_weights = self.critic.state_dict()
    self.critic_target.load_state_dict(critic_weights)
    self.critic_optimizer = torch.optim.Adam(self.critic.parameters())

    self.max_action = max_action

  def select_action(self, state):
    """Return the actor's action for a single state as a flat NumPy array.

    Args:
      state: One observation; it must support ``reshape(1, -1)`` and is
        reshaped to a batch of one before being passed to ``self.actor``.

    Returns:
      1-D NumPy array holding the output of ``self.actor`` for that state.
    """
    state = torch.Tensor(state.reshape(1,-1)).to(device)
    # Inference only: skip building an autograd graph.
    with torch.no_grad():
      action = self.actor(state)
    # Fix: `numpy` is a method and must be called before `.flatten()`.
    return action.cpu().numpy().flatten()

  def train(self, replay_buffer, iteration, batch_size=100, discount=0.99, tau=0.005, policy_noise=0.2, noise_clip=0.5, policy_freq=2):
    for it in range(iteration):
      # Resample batch of transitions
      batch_states,batch_next_states,batch_actions,batch_rewards,batch_dones = replay_buffer.sample(batch_size)
      state = torch.Tensor(batch_states).to(device)
      next_state = torch.Tensor(batch_next_states).to(device)
      action = torch.Tensor(batch_actions).to(device)
      reward = torch.Tensor(batch_rewards).to(device)
      done = torch.Tensor(batch_dones).to(device)

      # From the next state s', the Actor target plays to the next action a'
      next_action = self.actor_target.forward(next_state)

      
