### Import `Libraries`

In [1]:
import torch, sys, os, collections, random, tqdm
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.distributions import Normal
import numpy as np
sys.path.append(os.path.dirname(os.path.abspath(os.path.dirname(''))))
from snapbot_env.class_snapbot import Snapbot4EnvClass

  from .autonotebook import tqdm as notebook_tqdm


### `ReplayBuffer` Class

In [2]:
class ReplayBufferClass():
    def __init__(self,
                 buffer_limit,
                 device) -> None:
        self.buffer = collections.deque(maxlen=buffer_limit)
        self.device = device

    def size(self):
        return len(self.buffer)

    def clear(self):
        self.buffer.clear()

    def put(self,
            item):
        self.buffer.append(item)
    
    def put_mini_batch(self,
            mini_batch):
        for transition in mini_batch:
            self.put(transition)
        
    def sample(self,
               n):
        mini_batch = random.sample(self.buffer, n)
        s_lst, a_lst, r_lst, s_prime_lst, done_mask_lst = [], [], [], [], []
        for transition in mini_batch:
            s, a, r, s_prime, done_mask = transition
            s_lst.append(s)
            a_lst.append(a)
            r_lst.append([r])
            s_prime_lst.append(s_prime)
            done_mask = 0.0 if done_mask else 1.0
            done_mask_lst.append([done_mask])
        return torch.tensor(np.array(s_lst), dtype=torch.float).to(self.device), \
                 torch.tensor(np.array(a_lst), dtype=torch.float).to(self.device), \
                     torch.tensor(np.array(r_lst), dtype=torch.float).to(self.device), \
                         torch.tensor(np.array(s_prime_lst), dtype=torch.float).to(self.device), \
                             torch.tensor(np.array(done_mask_lst), dtype=torch.float).to(self.device)

### `Actor` Class

In [3]:
class ActorClass(nn.Module):
    def __init__(self,
                 name = "actor",
                 obs_dim = 8,
                 h_dims = [256, 256],
                 out_dim = 1,
                 max_torque = 1,
                 init_alpha = 0.1,
                 lr_actor = 0.0003,
                 lr_alpha = 0.0003,
                 device = None) -> None:
        super(ActorClass, self).__init__()
        # Initialize
        self.name = name
        self.obs_dim = obs_dim
        self.h_dims = h_dims
        self.out_dim = out_dim
        self.max_torque = max_torque
        self.init_alpha = init_alpha
        self.lr_actor = lr_actor
        self.lr_alpha = lr_alpha
        self.device = device
        self.init_layers()
        self.init_params()
        # Set optimizer
        self.actor_optimizer = optim.Adam(self.parameters(), lr=self.lr_actor)
        self.log_alpha = torch.tensor(np.log(self.init_alpha), requires_grad=True, device=self.device)
        self.log_alpha_optimizer = optim.Adam([self.log_alpha], lr=self.lr_alpha)
        
    def init_layers(self):
        self.layers = {}
        h_dim_prev = self.obs_dim
        for h_idx, h_dim in enumerate(self.h_dims):
            self.layers['mlp_{}'.format(h_idx)] = nn.Linear(h_dim_prev, h_dim)
            self.layers['relu_{}'.format(h_idx)] = nn.ReLU()
            h_dim_prev = h_dim
        self.layers['mu'] = nn.Linear(h_dim_prev, self.out_dim)
        self.layers['std'] = nn.Linear(h_dim_prev, self.out_dim)

        self.param_dict = {}
        for key in self.layers.keys():
            layer = self.layers[key]
            if isinstance(layer,nn.Linear):
                self.param_dict[key+'_w'] = layer.weight
                self.param_dict[key+'_b'] = layer.bias
        self.parameters = nn.ParameterDict(self.param_dict)

    def init_params(self):
        for key in self.layers.keys():
            layer = self.layers[key]
            if isinstance(layer,nn.Linear):
                nn.init.normal_(layer.weight,mean=0.0,std=0.01)
                nn.init.zeros_(layer.bias)
            elif isinstance(layer,nn.BatchNorm2d):
                nn.init.constant_(layer.weight,1.0)
                nn.init.constant_(layer.bias,0.0)
            elif isinstance(layer,nn.Conv2d):
                nn.init.kaiming_normal_(layer.weight)
                nn.init.zeros_(layer.bias)
                
    def forward(self,
                x):
        x = x.to(self.device)
        for h_idx, _ in enumerate(self.h_dims):
            x = self.layers['relu_{}'.format(h_idx)](self.layers['mlp_{}'.format(h_idx)](x))
        mean = self.layers['mu'](x)
        std = F.softplus(self.layers['std'](x)) + 1e-5
        GaussianDistribution = Normal(mean, std)
        action = GaussianDistribution.rsample()
        log_prob = GaussianDistribution.log_prob(action)
        real_action = torch.tanh(action) * self.max_torque
        real_log_prob = log_prob - torch.log(self.max_torque*(1-torch.tanh(action).pow(2)) + 1e-6)
        return real_action, real_log_prob
    
    def train(self,
              q_1,
              q_2,
              target_entropy,
              mini_batch):
        s, _, _, _, _ = mini_batch
        a, log_prob = self.forward(s)
        entropy = -self.log_alpha.exp() * log_prob
        
        q_1_value = q_1(s, a)
        q_2_value = q_2(s, a)
        q_1_q_2_value = torch.cat([q_1_value, q_2_value], dim=1)
        min_q_value = torch.min(q_1_q_2_value, 1, keepdim=True)[0]
        
        actor_loss = -min_q_value - entropy
        self.actor_optimizer.zero_grad()
        actor_loss.mean().backward()
        self.actor_optimizer.step()
        
        alpha_loss = -(self.log_alpha.exp() * (log_prob+target_entropy).detach()).mean()
        self.log_alpha_optimizer.zero_grad()
        alpha_loss.backward()
        self.log_alpha_optimizer.step()

### `Critic` Class

In [4]:
class CriticClass(nn.Module):
    def __init__(self,
                 name = "critic",
                 obs_dim = 75,
                 a_dim = 8,
                 h_dims = [256, 256],
                 out_dim = 1,
                 lr_critic = 0.0003,
                 device = None) -> None:
        super(CriticClass, self).__init__()
        # Initialize
        self.name = name
        self.obs_dim = obs_dim
        self.a_dim = a_dim
        self.h_dims = h_dims
        self.out_dim = out_dim
        self.lr_critic = lr_critic
        self.device = device
        self.init_layers()
        self.init_params()
        # Set optimizer
        self.critic_optimizer = optim.Adam(self.parameters(), lr=self.lr_critic)

    def init_layers(self):
        self.layers = {}
        h_dim_prev = self.h_dims[0]
        for h_idx, h_dim in enumerate(self.h_dims):
            if h_idx == 0:
                self.layers['obs'] = nn.Linear(self.obs_dim, int(self.h_dims[0]/2))
                self.layers['obs_relu'] = nn.ReLU()
                self.layers['act'] = nn.Linear(self.a_dim, int(self.h_dims[0]/2))
                self.layers['act_relu'] = nn.ReLU()
            else:
                self.layers['mlp_{}'.format(h_idx)] = nn.Linear(h_dim_prev, h_dim)
                self.layers['relu_{}'.format(h_idx)] = nn.ReLU()
            h_dim_prev = h_dim
        self.layers['out'] = nn.Linear(h_dim_prev, self.out_dim)

        self.param_dict = {}
        for key in self.layers.keys():
            layer = self.layers[key]
            if isinstance(layer,nn.Linear):
                self.param_dict[key+'_w'] = layer.weight
                self.param_dict[key+'_b'] = layer.bias
        self.parameters = nn.ParameterDict(self.param_dict)

    def init_params(self):
        for key in self.layers.keys():
            layer = self.layers[key]
            if isinstance(layer,nn.Linear):
                nn.init.normal_(layer.weight,mean=0.0,std=0.01)
                nn.init.zeros_(layer.bias)
            elif isinstance(layer,nn.BatchNorm2d):
                nn.init.constant_(layer.weight,1.0)
                nn.init.constant_(layer.bias,0.0)
            elif isinstance(layer,nn.Conv2d):
                nn.init.kaiming_normal_(layer.weight)
                nn.init.zeros_(layer.bias)
                
    def forward(self,
                x,
                a):
        x = x.to(self.device)
        a = a.to(self.device)
        for h_idx, _ in enumerate(self.h_dims):
            if h_idx == 0:
                x = self.layers['obs_relu'](self.layers['obs'](x))
                a = self.layers['act_relu'](self.layers['act'](a))
                cat = torch.cat([x,a], dim=1)
            else:
                 q = self.layers['relu_{}'.format(h_idx)](self.layers['mlp_{}'.format(h_idx)](cat))
        q = self.layers['out'](q)
        return q
    
    def train(self,
              target,
              mini_batch):
        s, a, r, s_prime, done = mini_batch
        critic_loss = F.smooth_l1_loss(self.forward(s,a), target)
        self.critic_optimizer.zero_grad()
        critic_loss.mean().backward()
        self.critic_optimizer.step()
        
    def soft_update(self, tau, net_target):
        for param_target, param in zip(net_target.parameters(), self.parameters()):
            param_target.data.copy_(param_target.data * (1.0 - tau) + param.data * tau)

### `Util` Function

In [5]:
def get_target(pi, q1, q2, gamma, mini_batch, device):
    q1 = q1.to(device)
    q2 = q2.to(device)
    pi = pi.to(device)
    s, a, r, s_prime, done = mini_batch
    with torch.no_grad():
        a_prime, log_prob= pi(s_prime)
        entropy = -pi.log_alpha.exp() * log_prob
        q1_val, q2_val = q1(s_prime,a_prime), q2(s_prime,a_prime)
        q = torch.cat([q1_val, q2_val], dim=1)
        min_q = torch.min(q, 1, keepdim=True)[0]
        target = r + gamma * done * (min_q + entropy.mean())
    return target 

### Define `Env` & `Hyperparameter`

In [6]:
env = Snapbot4EnvClass(xml_path='../snapbot_env/xml/snapbot_4/robot_4_', render_mode=None)
epi_length = 300
max_torque = 1.0

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
max_episode = 500
random_episode = 10
buffer_limit = 10000000
n_gradient_step_per_update = 1
lr_critic = 0.0003
lr_actor = 0.0003
lr_alpha = 0.0003
init_alpha = 0.1
gamma = 0.99
tau = 0.005
batch_size = 256

print_interval = 20
save_interval = 100


Snapbot(4legs) Environment
Obs Dim: [103] Act Dim: [8] dt:[0.02] Condition:[None]
ctrl_coef:[0] head_coef:[0]


### `Main` Function

In [7]:
ReplayBuffer = ReplayBufferClass(buffer_limit, device=device)
CriticOne = CriticClass(obs_dim=env.odim, a_dim=env.adim, h_dims=[256, 256], out_dim=1, lr_critic=lr_critic, device=device).to(device)
CriticTwo = CriticClass(obs_dim=env.odim, a_dim=env.adim, h_dims=[256, 256], out_dim=1, lr_critic=lr_critic, device=device).to(device)
CriticOneTarget = CriticClass(obs_dim=env.odim, a_dim=env.adim, h_dims=[256, 256], out_dim=1, lr_critic=lr_critic, device=device).to(device)
CriticTwoTarget = CriticClass(obs_dim=env.odim, a_dim=env.adim, h_dims=[256, 256], out_dim=1, lr_critic=lr_critic, device=device).to(device)
Actor = ActorClass(obs_dim=env.odim, h_dims=[256, 256], out_dim=env.adim, max_torque=max_torque, init_alpha=init_alpha, lr_actor=lr_actor, lr_alpha=lr_alpha, device=device).to(device)
CriticOneTarget.load_state_dict(CriticOne.state_dict())
CriticTwoTarget.load_state_dict(CriticTwo.state_dict())

for episode in tqdm.tqdm(range(1, max_episode+1)):
    s = env.reset()
    done = False
    reward_total = 0
    reward_forward = 0
    for i in range(epi_length):
        if episode < random_episode+1:
            a = env.action_space.sample()
            s_prime, reward, done, info = env.step(a)
            if i == epi_length-1:
                done = True
            ReplayBuffer.put((s, a, reward, s_prime, done))
        else:
            a, log_prob = Actor(torch.from_numpy(s).float().to(device))
            s_prime, reward, done, info = env.step(a.detach().cpu().numpy())
            if i == epi_length-1:
                done = True
            ReplayBuffer.put((s, a.detach().cpu().numpy(), reward, s_prime, done))
        reward_total += reward
        reward_forward += info['reward_forward']
        s = s_prime 
        if done is True:
            break   
        
        # Update
        if ReplayBuffer.size() > 10000:
            for j in range(n_gradient_step_per_update): 
                mini_batch = ReplayBuffer.sample(batch_size)
                td_target = get_target(Actor, CriticOneTarget, CriticTwoTarget, gamma=gamma, mini_batch=mini_batch, device=device)
                CriticOne.train(td_target, mini_batch)
                CriticTwo.train(td_target, mini_batch)
                Actor.train(CriticOne, CriticTwo, target_entropy=-env.adim, mini_batch=mini_batch)
                CriticOne.soft_update(tau=tau, net_target=CriticOneTarget)
                CriticTwo.soft_update(tau=tau, net_target=CriticTwoTarget)
    x_diff = env.sim.data.qpos[0] 
    
    if episode % print_interval == 0:
        print("EPISODE: {}, REWARD: {:.2f}, XDIFF: {:.2f}, ALPHA: {:.4f}".format(episode, reward_total, x_diff, Actor.log_alpha.exp()))
    
    if episode % save_interval == 0:
        if not os.path.exists("results/weights"):
            os.makedirs("results/weights")
        torch.save(Actor.state_dict(), "results/weights/sac_model_weights_{}.pth".format(episode))

  4%|▍         | 20/500 [00:05<03:15,  2.46it/s]

EPISODE: 20, REWARD: 0.12, XDIFF: 0.00, ALPHA: 0.1000


  8%|▊         | 40/500 [00:28<19:00,  2.48s/it]

EPISODE: 40, REWARD: 4.88, XDIFF: 0.10, ALPHA: 0.0638


 12%|█▏        | 60/500 [01:24<20:31,  2.80s/it]

EPISODE: 60, REWARD: 11.79, XDIFF: 0.24, ALPHA: 0.0169


 16%|█▌        | 80/500 [02:19<19:02,  2.72s/it]

EPISODE: 80, REWARD: 2.16, XDIFF: 0.04, ALPHA: 0.0048


 20%|██        | 100/500 [03:14<18:46,  2.82s/it]

EPISODE: 100, REWARD: 23.76, XDIFF: 0.48, ALPHA: 0.0014


 24%|██▍       | 120/500 [04:11<17:43,  2.80s/it]

EPISODE: 120, REWARD: 77.17, XDIFF: 1.54, ALPHA: 0.0004


 28%|██▊       | 140/500 [05:05<16:35,  2.77s/it]

EPISODE: 140, REWARD: 114.87, XDIFF: 2.30, ALPHA: 0.0001


 32%|███▏      | 160/500 [06:00<15:28,  2.73s/it]

EPISODE: 160, REWARD: 87.80, XDIFF: 1.76, ALPHA: 0.0001


 36%|███▌      | 180/500 [06:59<16:02,  3.01s/it]

EPISODE: 180, REWARD: 99.66, XDIFF: 1.99, ALPHA: 0.0001


 40%|████      | 200/500 [07:59<14:43,  2.95s/it]

EPISODE: 200, REWARD: 134.27, XDIFF: 2.69, ALPHA: 0.0001


 44%|████▍     | 220/500 [08:58<13:44,  2.95s/it]

EPISODE: 220, REWARD: 147.00, XDIFF: 2.94, ALPHA: 0.0001


 48%|████▊     | 240/500 [10:00<13:48,  3.19s/it]

EPISODE: 240, REWARD: 149.77, XDIFF: 3.00, ALPHA: 0.0001


 52%|█████▏    | 260/500 [11:03<12:58,  3.24s/it]

EPISODE: 260, REWARD: 153.03, XDIFF: 3.06, ALPHA: 0.0002


 56%|█████▌    | 280/500 [12:05<11:08,  3.04s/it]

EPISODE: 280, REWARD: 160.83, XDIFF: 3.22, ALPHA: 0.0001


 60%|██████    | 300/500 [13:06<10:13,  3.07s/it]

EPISODE: 300, REWARD: 165.67, XDIFF: 3.31, ALPHA: 0.0001


 64%|██████▍   | 320/500 [14:06<08:52,  2.96s/it]

EPISODE: 320, REWARD: 171.36, XDIFF: 3.43, ALPHA: 0.0001


 68%|██████▊   | 340/500 [15:06<07:59,  3.00s/it]

EPISODE: 340, REWARD: 174.13, XDIFF: 3.48, ALPHA: 0.0001


 72%|███████▏  | 360/500 [16:08<07:21,  3.16s/it]

EPISODE: 360, REWARD: 170.82, XDIFF: 3.42, ALPHA: 0.0002


 76%|███████▌  | 380/500 [17:07<05:36,  2.81s/it]

EPISODE: 380, REWARD: 169.59, XDIFF: 3.39, ALPHA: 0.0001


 80%|████████  | 400/500 [18:02<04:41,  2.81s/it]

EPISODE: 400, REWARD: 178.06, XDIFF: 3.56, ALPHA: 0.0001


 84%|████████▍ | 420/500 [18:57<03:38,  2.74s/it]

EPISODE: 420, REWARD: 179.33, XDIFF: 3.59, ALPHA: 0.0001


 88%|████████▊ | 440/500 [19:52<02:44,  2.74s/it]

EPISODE: 440, REWARD: 181.15, XDIFF: 3.62, ALPHA: 0.0002


 92%|█████████▏| 460/500 [20:47<01:50,  2.76s/it]

EPISODE: 460, REWARD: 182.46, XDIFF: 3.65, ALPHA: 0.0002


 96%|█████████▌| 480/500 [21:44<00:57,  2.89s/it]

EPISODE: 480, REWARD: 184.89, XDIFF: 3.70, ALPHA: 0.0002


100%|██████████| 500/500 [22:42<00:00,  2.72s/it]

EPISODE: 500, REWARD: 3.68, XDIFF: 0.07, ALPHA: 0.0004





### `Eval` Function

In [13]:
n_sample = 3
eval_episode = 400
RENDER = False

In [14]:
env = Snapbot4EnvClass(xml_path='../snapbot_env/xml/snapbot_4/robot_4_', render_mode=None)
epi_length = 300
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')

s = env.reset()
Actor = ActorClass(
                    obs_dim=env.odim, 
                    out_dim=env.adim,
                    max_torque=1,
                    init_alpha=0.1,
                    lr_actor=0,
                    lr_alpha=0.1,
                    device=device).to(device)
Actor.load_state_dict(torch.load('results/weights/sac_model_weights_{}.pth'.format(eval_episode), map_location=device))

for sample_idx in range(n_sample):
    done = False
    env.reset()
    q_pos = []
    rewards = 0
    for i in range(epi_length):
        if RENDER:
            env.render()
        q_pos.append(env.get_joint_pos_deg())
        a, _ = Actor(torch.from_numpy(s).float())
        s_prime, reward, done, info = env.step(a.detach().cpu().numpy())
        rewards += reward
        s = s_prime
        if done:
            break
    print("REWARD: {:.2f} XDIFF: {:.2f}".format(rewards, env.sim.data.qpos[0]))

Snapbot(4legs) Environment
Obs Dim: [103] Act Dim: [8] dt:[0.02] Condition:[None]
ctrl_coef:[0] head_coef:[0]
REWARD: 162.75 XDIFF: 3.25
REWARD: 168.02 XDIFF: 3.36
REWARD: 176.91 XDIFF: 3.54
