# Humanoid Standup Project

Environment setup and imports

In [3]:
!pip install gymnasium[mujoco]
import gymnasium as gym
import torch
from torch import nn
from torch.utils.data import DataLoader, TensorDataset
from torch.distributions import Normal
import numpy as np
from google.colab import files
import matplotlib.pyplot as plt

# Select the compute device once: CUDA when PyTorch reports it, otherwise CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# environment
# Build the Gymnasium HumanoidStandup task; `env` is shared by the cells below.
env = gym.make('HumanoidStandup-v5')
# Initial reset. The training loop further down calls env.reset() again at the
# start of every run, so `obs`/`info` from this call only serve as a first look.
obs, info = env.reset()



Memory

In [10]:
class Memory():
  """Rollout buffer holding one list per transition field.

  Transitions are appended step by step with `store_transition_data`,
  converted to tensors (plus shuffled minibatch indices) by
  `create_batches`, and dropped again by `restart_memory`.
  """

  def __init__(self, batch_size):
    """Create an empty buffer that will emit index batches of `batch_size`."""
    self.batch_size = batch_size
    self.restart_memory()

  def store_transition_data(self, state, value, a, prob, reward, finished):
    """Append one transition.

    Tensor inputs are detached first: `value` and `prob` are reduced to
    Python scalars, `a` is moved to the CPU and loses its leading dimension.
    Non-tensor inputs are stored unchanged.
    """
    self.states.append(state)

    self.values.append(
        value.detach().squeeze().item() if torch.is_tensor(value) else value
    )
    self.actions.append(
        a.detach().cpu().squeeze(0) if torch.is_tensor(a) else a
    )
    self.probs.append(
        prob.detach().cpu().squeeze().item() if torch.is_tensor(prob) else prob
    )

    self.rewards.append(reward)
    self.finished.append(finished)

  def create_batches(self):
    """Return the stored fields as tensors on `device` plus shuffled index batches.

    Returns:
      (states, values, actions, probs, rewards, finished, batches) where the
      first five are float32 tensors, `finished` is a bool tensor and
      `batches` is a list of index arrays of at most `batch_size` entries
      taken from one random permutation of the stored transitions.
    """
    def on_device(data, dtype):
      return torch.tensor(data, dtype = dtype).to(device)

    states = on_device(np.array(self.states), torch.float32)
    values = on_device(self.values, torch.float32)
    actions = on_device(np.array(self.actions), torch.float32)
    probs = on_device(self.probs, torch.float32)
    rewards = on_device(self.rewards, torch.float32)
    finished = on_device(self.finished, torch.bool)

    total = len(states)
    shuffled = np.random.permutation(total)
    batches = [
        shuffled[start:start + self.batch_size]
        for start in np.arange(0, total, self.batch_size)
    ]

    return states, values, actions, probs, rewards, finished, batches

  def restart_memory(self):
    """Forget every stored transition (the batch size is kept)."""
    self.states, self.values, self.actions = [], [], []
    self.probs, self.rewards, self.finished = [], [], []

### Actor

In [5]:
class Actor(nn.Module):
  """Gaussian policy: an MLP produces the mean, a learned vector the log-std."""

  def __init__(self, observation_dim, action_dim, hidden_size = 256):
    """Build a two-hidden-layer ReLU MLP and a per-action log-std parameter."""
    super().__init__()
    mean_layers = [
        nn.Linear(observation_dim, hidden_size),
        nn.ReLU(),
        nn.Linear(hidden_size, hidden_size),
        nn.ReLU(),
        nn.Linear(hidden_size, action_dim),  # output is the action mean
    ]
    self.actor = nn.Sequential(*mean_layers)
    # Log of the standard deviation, trained jointly; zeros means std starts at 1.
    self.std_logged = nn.Parameter(torch.zeros(action_dim))

  def forward(self,observation):
    """Return `(mean, std)`; `std` does not depend on the observation."""
    return self.actor(observation), self.std_logged.exp()

  def compute_distr(self, observation):
    """Return the Normal distribution over actions for `observation`."""
    return Normal(*self.forward(observation))

  def take_action(self, observation):
    """Sample an action and return it with its log-probability.

    The log-probability is summed over the last (action) dimension.
    """
    policy = self.compute_distr(observation)
    sampled = policy.sample()
    return sampled, policy.log_prob(sampled).sum(dim = -1)

### Critic

In [6]:
class Critic(nn.Module):
  """State-value network: two hidden ReLU layers followed by a scalar head."""

  def __init__(self, observation, hidden_size = 256):
    """Build the MLP; `observation` is the size of the input vector."""
    super().__init__()
    widths = [observation, hidden_size, hidden_size]
    layers = []
    for fan_in, fan_out in zip(widths, widths[1:]):
      layers += [nn.Linear(fan_in, fan_out), nn.ReLU()]
    layers.append(nn.Linear(hidden_size, 1))  # single output: the state value
    self.critic = nn.Sequential(*layers)

  def forward(self, observation):
    """Return the value estimate with a trailing dimension of size 1."""
    return self.critic(observation)

### Interacting Agent

In [7]:
class PPOController:
  """PPO agent owning the policy, the value network and the rollout buffer.

  Typical use: call `sample_action` for every environment step, push the
  transition into `controller_memory`, and call `train` once enough
  transitions have been collected. `train` empties the buffer afterwards.
  """

  def __init__(self,observation_dim, action_dim, lambda_, gamma, alpha,  clip, N, epochs, batch_size):
    """Create the networks, their Adam optimizers and an empty buffer.

    Args:
      observation_dim: size of the observation vector.
      action_dim: size of the action vector.
      lambda_: GAE smoothing factor.
      gamma: discount factor.
      alpha: learning rate used for both optimizers.
      clip: PPO clipping range for the probability ratio.
      N: stored on the instance only; this class never reads it.
      epochs: number of optimizer steps performed per `train` call.
      batch_size: minibatch size handed to the buffer.
    """
    self.observation_dim = observation_dim
    self.controller_memory = Memory(batch_size)
    self.action_dim = action_dim
    self.lambda_ = lambda_
    self.gamma = gamma
    self.alpha = alpha
    self.clip = clip
    self.N = N
    self.epochs = epochs
    self.batch_size = batch_size
    self.actor = Actor(observation_dim, action_dim).to(device)
    self.critic = Critic(observation_dim).to(device)
    self.actor_opt = torch.optim.Adam(self.actor.parameters(), lr = alpha)
    self.critic_opt = torch.optim.Adam(self.critic.parameters(), lr = alpha)

  def sample_action(self,observation):
    """Sample an action for `observation` from the current policy.

    Returns:
      (action, log_prob, state_value); a 1-D observation gets a batch
      dimension first, so the outputs carry a leading dimension of 1.
      The outputs are computed without gradient tracking.
    """
    observation_torch = torch.as_tensor(observation, dtype = torch.float32).to(device)
    if observation_torch.dim() == 1:
      observation_torch = observation_torch.unsqueeze(0) # add batch dim
    # Rollout collection never backpropagates through these tensors (the
    # buffer stores detached copies), so skip building the autograd graph.
    with torch.no_grad():
      action_sampled, log_probabilty = self.actor.take_action(observation_torch)
      state_value = self.critic(observation_torch)
    return action_sampled, log_probabilty, state_value

  def compute_gae(self, last_value = 0.0):
    """Compute GAE advantages for everything currently in the buffer.

    Args:
      last_value: value estimate of the state that follows the last stored
        transition. It only matters when that transition is not flagged as
        finished (a rollout cut in the middle of an episode). The default of
        0.0 treats that state as worth nothing.

    Returns:
      (states, values, actions, prev_probs, rewards, finished, batches,
      advantages): the buffer tensors and index batches plus one advantage
      per transition.
    """
    states, values, actions, prev_probs, rewards, finished, batches  = self.controller_memory.create_batches()
    bootstrap = torch.as_tensor(last_value, dtype = values.dtype, device = values.device).detach().reshape(1)
    vals = torch.cat([values, bootstrap], dim = 0)
    # 1.0 where the episode continues after step t, 0.0 where it ended.
    not_finished = 1.0 - finished.float()
    advantages = torch.zeros_like(rewards)
    prev_advantage = 0.0

    # Backward recursion: A_t = delta_t + gamma * lambda * A_{t+1}, with both
    # the bootstrap value and the carried advantage cut at episode ends.
    for t in reversed(range(len(rewards))):
      td_error = rewards[t] + self.gamma * vals[t + 1] * not_finished[t] - vals[t]
      prev_advantage = td_error + self.gamma * self.lambda_ * prev_advantage * not_finished[t]
      advantages[t] = prev_advantage

    return states, values, actions, prev_probs, rewards, finished, batches, advantages

  def iterate_batches(self, last_value = 0.0):
    """Return the clipped-PPO loss averaged over all minibatches.

    Args:
      last_value: forwarded to `compute_gae`.

    Returns:
      Scalar tensor: mean over minibatches of (actor loss + critic loss).

    Raises:
      RuntimeError: if the buffer holds no transitions.
    """
    states, values, actions, prev_probs, _, _, batches, advantages = self.compute_gae(last_value)
    if not batches:
      raise RuntimeError("iterate_batches() needs at least one stored transition")

    # Regression target for the critic: advantage + value recorded at rollout time.
    returns = advantages + values

    batch_loss = []
    for batch in batches:
      batch_advantages = advantages[batch]

      distribution = self.actor.compute_distr(states[batch])
      new_log_probabilty = distribution.log_prob(actions[batch]).sum(dim = -1)
      prob_ratio = (new_log_probabilty - prev_probs[batch]).exp()

      # Actor: pessimistic (min) of the unclipped and clipped surrogate.
      unclipped = prob_ratio * batch_advantages
      clipped = torch.clamp(prob_ratio, 1 - self.clip, 1 + self.clip) * batch_advantages
      actor_loss = -torch.min(unclipped, clipped).mean()

      # Critic: half mean squared error against the return target.
      value_pred = self.critic(states[batch]).squeeze(-1)
      critic_loss = 0.5 * ((value_pred - returns[batch]) ** 2).mean()

      batch_loss.append(actor_loss + critic_loss)

    return torch.stack(batch_loss).mean()

  def train(self, last_value = 0.0):
    """Run `epochs` optimizer steps on the buffered rollout, then clear it.

    Args:
      last_value: forwarded to `compute_gae`; see there.

    Does nothing when the buffer is empty.
    """
    if not self.controller_memory.states:
      return
    # NOTE(review): every epoch averages the minibatch losses into ONE
    # optimizer step, so `batch_size` only changes how that mean is grouped.
    # Stepping once per minibatch is the more common PPO schedule; confirm intent.
    for _ in range(self.epochs):
      total_loss = self.iterate_batches(last_value)
      self.actor_opt.zero_grad()
      self.critic_opt.zero_grad()
      total_loss.backward()
      self.actor_opt.step()
      self.critic_opt.step()
    self.controller_memory.restart_memory()


# Running

In [21]:
# parameters
# --- problem dimensions, read from the environment
observation_dim = env.observation_space.shape[0]
action_dim = env.action_space.shape[0]
hidden_size = 256  # not passed to PPOController below

# --- PPO hyper-parameters
alpha = 3e-4       # learning rate
gamma = 0.9        # discount factor
lambda_ = 0.95     # GAE lambda
clip = 0.2         # probability-ratio clipping range
epochs = 4         # optimizer passes per training call
batch_size = 64
N = 2048           # environment steps between training calls
runs = 1000        # number of episodes to play

# --- bookkeeping for the training loop
steps = 0
done = False
scores = []
mean_scores_list = []
mean_score = 0.0
top_score = float('-inf')

PPOAgent = PPOController(
    observation_dim = observation_dim,
    action_dim = action_dim,
    lambda_ = lambda_,
    gamma = gamma,
    alpha = alpha,
    clip = clip,
    N = N,
    epochs = epochs,
    batch_size = batch_size,
)

# Play `runs` episodes; every N environment steps the agent trains on the
# transitions gathered so far, independent of episode boundaries.
for i in range(runs):
  observation, info = env.reset()
  score, done = 0, False

  while not done:
    action, log_prob, val = PPOAgent.sample_action(observation)

    # The policy output carries a batch dimension; the environment wants a
    # plain NumPy vector.
    a = action.squeeze(0).detach().cpu().numpy()
    obs, r, terminated, truncated, info = env.step(a)
    done = terminated or truncated
    steps += 1
    score += r

    # Store the transition under the observation the action was chosen for.
    PPOAgent.controller_memory.store_transition_data(observation, val, action, log_prob, r, done)
    if steps % N == 0:
      PPOAgent.train()

    observation = obs

  scores.append(score)
  mean_score = np.mean(scores[-100:])  # average over the latest 100 runs
  mean_scores_list.append(mean_score)
  top_score = max(top_score, mean_score)

  #print(f'EP: {i}, BEST SCORE: {top_score}, AVG 100: {mean_score}')

### Saving models/data

In [19]:
# Pickle the whole agent object, then write both score histories as .npy files.
torch.save(PPOAgent, "PPOAgent.pth")
for filename, history in (('Scores.npy', scores), ('MeanScores.npy', mean_scores_list)):
  np.save(filename, np.array(history))