In [1]:
import gymnasium as gym
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.distributions import Normal
from collections import deque
import matplotlib.pyplot as plt

In [2]:
# Keyword arguments shared by the training and the validation environment,
# kept in one place so the two configurations cannot drift apart.
HUMANOID_KWARGS = dict(
    healthy_reward=5,                   # constant reward given at each timestep while the humanoid keeps a healthy posture
    terminate_when_unhealthy=True,      # end the episode as soon as the posture is out of the healthy range
    healthy_z_range=(1.0, 2.0),         # torso z-coordinate range that counts as a healthy posture
    exclude_current_positions_from_observation=False,
)

# Training environment (no rendering)
env = gym.make(id='Humanoid-v4', **HUMANOID_KWARGS)

# Validation environment: same task, rendered in a window
val_env = gym.make(id='Humanoid-v4', render_mode='human', **HUMANOID_KWARGS)

# Sizes of the state space and the action space.
# n_actions is the number of action components (an int usable as a layer
# width), not the Box space itself; use env.action_space for the space.
obs_dim = env.observation_space.shape[0]
n_actions = env.action_space.shape[0]

n_frames = 4

In [3]:
# Display the action space of the training environment
env.action_space

Box(-0.4, 0.4, (17,), float32)

In [4]:
# Read the observation shape straight from the space; no need to draw a random sample for it
env.observation_space.shape

(378,)

observation = val_env.step(val_env.action_space.sample())[0]

reward = val_env.step(val_env.action_space.sample())[1]

terminated = val_env.step(val_env.action_space.sample())[2] # bool

truncated = val_env.step(val_env.action_space.sample())[3] # bool

info = val_env.step(val_env.action_space.sample())[4] # dict

dict_keys(['reward_linvel', 'reward_quadctrl', 'reward_alive', 'x_position', 'y_position', 'distance_from_origin', 'x_velocity', 'y_velocity', 'forward_reward'])

In [6]:
# Gymnasium's reset() returns an (observation, info) pair
obs, info = env.reset()
reward = []  # NOTE(review): not used in this cell; kept in case a later cell relies on it

# Take a single random step and inspect what the environment returns
action = env.action_space.sample()
obs, rew, terminated, truncated, info = env.step(action)
print(obs)
print('----')
print(action)

[ 8.43694556e-03  8.83825264e-03  1.39218123e+00  9.99950099e-01
  2.19622536e-03  2.09226175e-03  9.51830764e-03 -1.55894097e-02
  6.04401659e-02  2.63836415e-02 -2.31511229e-02 -6.98137883e-02
 -6.29929493e-02 -1.44925785e-02  3.51736683e-02 -7.63490864e-03
 -9.45769460e-02 -1.09830712e-01 -2.02846543e-03 -1.93357797e-02
  8.43695766e-04 -2.99385522e-03  9.32987267e-03  1.34712738e-02
  2.39108645e-01 -1.88597709e-02 -2.22303731e-01 -3.93643096e-01
 -6.87697258e-01  1.26047870e+00 -1.36056845e+00  4.91477695e+00
  2.78772731e+00 -2.20580589e+00 -5.73515752e+00 -6.49041267e+00
 -1.91604556e+00  2.93434793e+00 -7.36316774e-01 -1.03796191e+01
 -1.21773567e+01  1.76105268e-01 -2.28202195e+00  3.91633504e-01
 -2.06834691e-01 -1.68769288e-01  1.96021346e+00  0.00000000e+00
  0.00000000e+00  0.00000000e+00  0.00000000e+00  0.00000000e+00
  0.00000000e+00  0.00000000e+00  0.00000000e+00  0.00000000e+00
  0.00000000e+00  2.29983617e+00  2.28090642e+00  4.16339433e-02
  2.05386194e-04  2.01910

In [9]:
# Show one observation component and the shape of the last sampled action.
# NOTE(review): the meaning of index 45 is not documented here; confirm which feature it is.
print(obs[45], action.shape, sep='\n')

-0.1687692882896362
(17,)


### Actor

In [None]:
class Actor(nn.Module) :
    """Gaussian policy network with two hidden ReLU layers.

    The network maps a state to two vectors of length ``n_actions``: the mean
    and the log standard deviation of a Gaussian. ``act`` turns them into a
    bounded action.

    Args:
        input_size: number of features of a state.
        hidden_size: width of both hidden layers.
        n_actions: number of action components.
        init_weight: the two output heads are initialised uniformly in
            ``[-init_weight, init_weight]``.
        min_act_value: magnitude bound; actions are clamped to
            ``[-min_act_value, min_act_value]``.
        lr: learning rate of the Adam optimizer owned by this module.
        device: stored on the instance; the module itself is not moved here.
    """

    def __init__(self, input_size, hidden_size, n_actions, init_weight, min_act_value, lr, device=torch.device('cpu')):
        super(Actor, self).__init__()
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.n_actions = n_actions
        self.init_weight = init_weight
        self.min_act_value = min_act_value
        self.lr = lr
        self.device = device

        self.relu = nn.ReLU()
        self.tanh = nn.Tanh()

        self.fc1 = nn.Linear(
            in_features=self.input_size,
            out_features=self.hidden_size
        )

        self.fc2 = nn.Linear(
            in_features=self.hidden_size,
            out_features=self.hidden_size
        )

        # Head producing the mean of the Gaussian
        self.mean_fc3 = nn.Linear(
            in_features= self.hidden_size,
            out_features=self.n_actions
        )

        self.mean_fc3.weight.data.uniform_(-self.init_weight, self.init_weight)
        self.mean_fc3.bias.data.uniform_(-self.init_weight, self.init_weight)

        # Head producing the log standard deviation of the Gaussian
        self.log_std_fc3 = nn.Linear(
            in_features= self.hidden_size,
            out_features=self.n_actions
        )

        self.log_std_fc3.weight.data.uniform_(-self.init_weight, self.init_weight)
        self.log_std_fc3.bias.data.uniform_(-self.init_weight, self.init_weight)

        self.optimizer = optim.Adam(params=self.parameters(), lr=self.lr)

    def _as_tensor(self, x):
        """Convert ``x`` (tensor, NumPy array, list) to a tensor matching the
        dtype and device of the network weights. Tensors that already match
        are returned unchanged."""
        weight = self.fc1.weight
        return torch.as_tensor(x, dtype=weight.dtype, device=weight.device)

    def forward(self, x):
        """Return ``(mean, log_std)`` for state ``x``.

        ``x`` may be a tensor or anything ``torch.as_tensor`` accepts, such as
        the float64 NumPy observation returned by the environment.
        """
        x = self._as_tensor(x)

        x = self.relu(self.fc1(x))

        x = self.relu(self.fc2(x))

        return self.mean_fc3(x), self.log_std_fc3(x)   # mean, log_std

    def act(self, state, greedy=False):
        """Compute a bounded action for ``state``.

        Args:
            state: a single state or a batch of states.
            greedy: when True the action is ``tanh(mean)``; otherwise Gaussian
                noise scaled by ``exp(log_std)`` is added before the tanh.

        Returns:
            A tensor with the same shape as the mean, clamped to
            ``[-min_act_value, min_act_value]``.
        """
        mean, log_std = self.forward(state)

        if greedy: # exploit
            pre_squash = mean
        else: # explore
            # Draw independent noise for every action component (and every
            # batch row). A single shared scalar would move all joints in a
            # correlated way and defeat per-dimension exploration.
            noise = torch.randn_like(mean)
            pre_squash = mean + log_std.exp() * noise

        action = self.tanh(pre_squash)

        # clamp obtained values to [-min_act_value, min_act_value]
        # NOTE(review): tanh already bounds the value in [-1, 1]; clamping
        # (rather than rescaling) saturates outside the bound - confirm this
        # is the intended behaviour.
        action_clamped = torch.clamp(action,
                                     min=-self.min_act_value,
                                     max=self.min_act_value)

        return action_clamped


### Critic

In [None]:
class Critic(nn.Module):
    """State-value network: two hidden ReLU layers followed by a scalar head.

    Args:
        input_size: number of features of a state.
        hidden_size: width of both hidden layers.
        init_weight: the output layer is initialised uniformly in
            ``[-init_weight, init_weight]``.
        lr: learning rate of the Adam optimizer owned by this module.
        device: stored on the instance; the module itself is not moved here.
    """

    def __init__(self, input_size, hidden_size, init_weight, lr, device=torch.device('cpu')):
        super().__init__()

        # Keep the hyper-parameters around for inspection
        self.input_size, self.hidden_size = input_size, hidden_size
        self.init_weight = init_weight
        self.lr = lr
        self.device = device

        self.relu = nn.ReLU()

        # Layers are created in the order fc1, fc2, fc3
        self.fc1 = nn.Linear(input_size, hidden_size)
        self.fc2 = nn.Linear(hidden_size, hidden_size)
        self.fc3 = nn.Linear(hidden_size, 1)

        # Small uniform initialisation of the value head (weight first, then bias)
        bound = init_weight
        nn.init.uniform_(self.fc3.weight, -bound, bound)
        nn.init.uniform_(self.fc3.bias, -bound, bound)

        self.optimizer = optim.Adam(self.parameters(), lr=lr)

    def forward(self, x):
        """Return the output of the final linear layer for input ``x``."""
        hidden = self.relu(self.fc2(self.relu(self.fc1(x))))
        return self.fc3(hidden)

### Algorithm

In [None]:
class Action_Critic():
    """Episodic advantage actor-critic agent for the Humanoid-v4 task.

    The agent owns an ``Actor`` (Gaussian policy) and a ``Critic`` (state
    value). ``train`` collects one full episode at a time, computes
    discounted returns, and updates both networks once per episode.

    Args:
        input_size: number of features of a state.
        hidden_size: width of the hidden layers of both networks.
        n_actions: number of action components.
        init_weight: uniform init bound for the output layers.
        min_act_value: actions are clamped to ``[-min_act_value, min_act_value]``.
        act_lr: learning rate of the actor optimizer.
        crt_lr: learning rate of the critic optimizer.
        num_episodes: number of training episodes run by ``train``.
        device: device on which both networks are placed.
        gamma: discount factor used for the returns.
    """

    # Bounds applied to the policy log-std before exponentiation, so that
    # exp() can neither overflow nor collapse to a zero-width distribution.
    LOG_STD_MIN = -20.0
    LOG_STD_MAX = 2.0

    def __init__(self, input_size, hidden_size, n_actions, init_weight, min_act_value, act_lr, crt_lr, num_episodes, device=torch.device('cpu'), gamma=0.99):
        self.device = device
        self.gamma = gamma

        self.num_episodes = num_episodes

        self.Actor = Actor(
            input_size= input_size,
            hidden_size= hidden_size,
            n_actions= n_actions,
            init_weight= init_weight,
            min_act_value= min_act_value,
            lr= act_lr,
            device= self.device
        )

        self.Critic = Critic(
            input_size= input_size,
            hidden_size= hidden_size,
            init_weight= init_weight,
            lr= crt_lr,
            device= self.device
        )

        # Make the networks actually live on the requested device
        self.Actor.to(self.device)
        self.Critic.to(self.device)

    def save(self, path='model.pt'):
        """Write the actor and critic weights to ``path``."""
        # This class is not an nn.Module, so the two sub-networks are
        # serialised explicitly.
        torch.save(
            {'actor': self.Actor.state_dict(), 'critic': self.Critic.state_dict()},
            path,
        )

    def load(self, path='model.pt'):
        """Restore the actor and critic weights written by ``save``."""
        checkpoint = torch.load(path, map_location=self.device)
        self.Actor.load_state_dict(checkpoint['actor'])
        self.Critic.load_state_dict(checkpoint['critic'])

    def to(self, device):
        """Move both networks to ``device`` and return ``self``."""
        self.Actor.to(device)
        self.Critic.to(device)
        self.Actor.device = device
        self.Critic.device = device
        self.device = device
        return self

    @staticmethod
    def _make_env():
        """Create the (non-rendering) training environment."""
        return gym.make(
            id='Humanoid-v4',
            healthy_reward=5,
            terminate_when_unhealthy=True,
            healthy_z_range=(1.0, 2.0),
            exclude_current_positions_from_observation=False,
        )

    def _to_tensor(self, obs):
        """Convert an environment observation to a float32 tensor on the agent's device."""
        return torch.as_tensor(obs, dtype=torch.float32, device=self.device)

    def _sample_action(self, state):
        """Sample an exploratory action and its log-probability.

        The log-probability is that of the pre-squash Gaussian sample; the
        tanh and the clamp are treated as part of the environment, which
        keeps the score-function gradient valid.
        """
        mean, log_std = self.Actor(state)
        std = log_std.clamp(self.LOG_STD_MIN, self.LOG_STD_MAX).exp()
        dist = Normal(mean, std)
        raw = dist.sample()
        log_prob = dist.log_prob(raw).sum(dim=-1)
        bound = self.Actor.min_act_value
        action = torch.clamp(torch.tanh(raw), min=-bound, max=bound)
        return action, log_prob

    def _rollout(self, env):
        """Run one episode and return (rewards, log_probs, values, bootstrap).

        ``bootstrap`` is the critic's estimate for the final observation when
        the episode was cut by the time limit, and 0.0 when it terminated.
        """
        rewards, log_probs, values = [], [], []

        obs, _ = env.reset()   # reset() returns (observation, info)
        terminated = truncated = False

        while not (terminated or truncated):
            state = self._to_tensor(obs)

            action, log_prob = self._sample_action(state)

            # Value of the state the action is taken FROM
            values.append(self.Critic(state).squeeze(-1))
            log_probs.append(log_prob)

            # The environment expects a NumPy array, not a tensor
            obs, reward, terminated, truncated, _ = env.step(action.detach().cpu().numpy())
            rewards.append(float(reward))

        bootstrap = 0.0
        if truncated and not terminated:
            with torch.no_grad():
                bootstrap = float(self.Critic(self._to_tensor(obs)).squeeze(-1))

        return rewards, log_probs, values, bootstrap

    def _discounted_returns(self, rewards, bootstrap):
        """Return the tensor of discounted returns G_t = r_t + gamma * G_{t+1}."""
        returns = []
        running = bootstrap
        for reward in reversed(rewards):
            running = reward + self.gamma * running
            returns.append(running)
        returns.reverse()
        return torch.tensor(returns, dtype=torch.float32, device=self.device)

    def train(self):
        """Train for ``num_episodes`` episodes.

        Every 10 episodes the running averages are printed and the weights
        are saved with ``save``.
        """
        env = self._make_env()

        scores = deque(maxlen=100)
        act_losses = deque(maxlen=10)
        crt_losses = deque(maxlen=10)

        try:
            for episode in range(self.num_episodes):

                # ROLLOUT
                rewards, log_probs, values, bootstrap = self._rollout(env)

                score = sum(rewards)
                scores.append(score)

                # LEARNING
                returns = self._discounted_returns(rewards, bootstrap)
                values_t = torch.stack(values)
                log_probs_t = torch.stack(log_probs)

                # The advantage is a constant for the actor update
                advantages = returns - values_t.detach()

                actor_loss = -(log_probs_t * advantages).mean()
                critic_loss = (returns - values_t).pow(2).mean()

                act_losses.append(actor_loss.item())
                crt_losses.append(critic_loss.item())

                # UPDATES

                # Actor
                self.Actor.optimizer.zero_grad()
                actor_loss.backward()
                self.Actor.optimizer.step()

                # Critic
                self.Critic.optimizer.zero_grad()
                critic_loss.backward()
                self.Critic.optimizer.step()

                # LOG + SAVE (every 10th episode)
                if episode % 10 == 0:

                    print(
                        f'Episode {episode}'
                        f'\n\tAverage Score: {np.mean(scores)}'
                        f'\n\tAverage Actor Loss: {np.mean(act_losses)}'
                        f'\n\tAverage Critic Loss: {np.mean(crt_losses)}'
                        f'\n\tLast Score: {score}\n'
                    )

                    self.save()
        finally:
            # Release the simulator even if training is interrupted
            env.close()

        return

### Evaluation

In [None]:
def Evaluation(Agent_AC):
    """Placeholder for the evaluation routine.

    ``Agent_AC`` is not used yet; the function always returns 0.
    """
    # TODO: replace the constant with a real evaluation of the agent
    placeholder_score = 0
    return placeholder_score