In [None]:
import torch
from torch import nn
from torch import optim
import gymnasium as gym
import random
import numpy as np

# Seed every RNG library imported above so that Restart Kernel -> Run All
# reproduces the same random streams. `random` was imported but previously
# left unseeded.
SEED = 42
random.seed(SEED)
torch.manual_seed(SEED)
np.random.seed(SEED)

In [None]:
class PolicyModel(nn.Module):
    """Gaussian policy head: an MLP produces the action mean, a free parameter the log-std.

    The mean network is ``obs_size -> 512 -> 256 -> 64 -> action_size`` with
    ReLU between layers and a final ``Tanh``, so every mean component lies in
    [-1, 1]. The log standard deviation is a learnable vector of length
    ``action_size`` that does not depend on the observation and starts at zero.
    """

    def __init__(self, obs_size, action_size):
        super().__init__()
        # Build the hidden stack in order so the Sequential indices (and hence
        # the state_dict keys) are 0, 2, 4 for the hidden Linear layers.
        widths = (obs_size, 512, 256, 64)
        stack = []
        for fan_in, fan_out in zip(widths[:-1], widths[1:]):
            stack.append(nn.Linear(fan_in, fan_out))
            stack.append(nn.ReLU())
        stack.append(nn.Linear(64, action_size))
        stack.append(nn.Tanh())
        self.mean = nn.Sequential(*stack)

        self.logstd_layer = nn.Parameter(torch.zeros(action_size))

        # Overwrite PyTorch's default init with small normal weights and zero biases.
        self._initialize_weights()

    def _initialize_weights(self):
        """Set every Linear layer of the mean network to N(0, 0.01^2) weights and zero bias."""
        linear_layers = (layer for layer in self.mean if isinstance(layer, nn.Linear))
        for layer in linear_layers:
            nn.init.normal_(layer.weight, std=0.01)
            nn.init.constant_(layer.bias, 0)

    def forward(self, x):
        """Return ``(mean, logstd)``: the network output for ``x`` and the log-std parameter."""
        action_mean = self.mean(x)
        return action_mean, self.logstd_layer

class ValueModel(nn.Module):
    """State-value critic: an MLP ``obs_size -> 512 -> 256 -> 64 -> 1`` with ReLU activations.

    The final layer is linear (no activation), so the output is an unbounded
    scalar per observation.
    """

    def __init__(self, obs_size):
        super().__init__()
        # Hidden Linear layers land at Sequential indices 0, 2, 4; the output
        # layer at index 6, matching the keys used when saving weights.
        widths = (obs_size, 512, 256, 64)
        stack = []
        for fan_in, fan_out in zip(widths[:-1], widths[1:]):
            stack.append(nn.Linear(fan_in, fan_out))
            stack.append(nn.ReLU())
        stack.append(nn.Linear(64, 1))
        self.layers = nn.Sequential(*stack)

        # Overwrite PyTorch's default init with small normal weights and zero biases.
        self._initialize_weights()

    def _initialize_weights(self):
        """Set every Linear layer to N(0, 0.01^2) weights and zero bias."""
        linear_layers = (layer for layer in self.layers if isinstance(layer, nn.Linear))
        for layer in linear_layers:
            nn.init.normal_(layer.weight, std=0.01)
            nn.init.constant_(layer.bias, 0)

    def forward(self, x):
        """Return the critic's value estimate for ``x`` (last dimension has size 1)."""
        state_value = self.layers(x)
        return state_value

In [None]:
def discounted_rewards(rewards, dones, gamma=0.99):
    """Compute the discounted return-to-go for every step of a rollout.

    The rollout may contain several episodes back to back. Wherever
    ``dones[i]`` is truthy the return at step ``i`` is just ``rewards[i]``,
    so rewards of a later episode never leak into an earlier one.

    Args:
        rewards: Sequence of per-step rewards.
        dones: Sequence of the same length; truthy where the step ended an episode.
        gamma: Discount factor.

    Returns:
        ``np.ndarray`` with one return per step, in the original time order.
        An empty array is returned for an empty rollout.

    Note:
        The last step is never bootstrapped: its return is its own reward,
        whether or not ``dones[-1]`` is set.
    """
    n_steps = len(rewards)
    if n_steps == 0:
        # Nothing to discount; the original indexing would raise IndexError here.
        return np.array([])

    # Accumulate from the final step backwards, then flip into time order.
    returns_reversed = [rewards[-1]]
    for i in range(n_steps - 2, -1, -1):
        if dones[i]:
            # Step i ended an episode, so nothing after it contributes.
            returns_reversed.append(rewards[i])
        else:
            returns_reversed.append(rewards[i] + gamma * returns_reversed[-1])
    return np.array(returns_reversed[::-1])
    
def compute_gaes(rewards, values, dones, gamma=0.99, decay=0.90, normalize=True):
    """Compute Generalised Advantage Estimates for a rollout.

    For each step the TD residual is
    ``delta_i = r_i + gamma * V_{i+1} * (1 - done_i) - V_i`` and the advantage
    is the ``gamma * decay``-discounted sum of residuals up to the end of the
    episode that contains the step.

    Args:
        rewards: Sequence of per-step rewards.
        values: Sequence of critic estimates ``V(s_i)``, same length as ``rewards``.
        dones: Sequence of the same length; truthy where the step ended an episode.
        gamma: Discount factor.
        decay: GAE lambda.
        normalize: If true, standardise the result to zero mean and unit std
            (with a ``1e-8`` guard on the std).

    Returns:
        ``np.ndarray`` of float64 advantages, one per step. An empty array is
        returned for an empty rollout.

    Note:
        The value following the last stored step is taken to be 0, so a rollout
        cut mid-episode is not bootstrapped from the critic.
    """
    rewards = np.asarray(rewards, dtype=np.float64)
    values = np.asarray(values, dtype=np.float64)
    dones = np.asarray(dones, dtype=bool)

    n_steps = len(rewards)
    if n_steps == 0:
        # The original indexing (deltas[-1]) would raise IndexError here.
        return np.array([], dtype=np.float64)

    # V(s_{i+1}) for every step; 0 after the final stored step.
    next_values = np.append(values[1:], 0.0)
    # Vectorised TD residuals; ~dones zeroes the bootstrap term on terminal steps.
    deltas = rewards + gamma * next_values * (~dones) - values

    gaes = np.empty(n_steps, dtype=np.float64)
    gaes[-1] = deltas[-1]
    for i in range(n_steps - 2, -1, -1):
        if dones[i]:
            # Episode ended at step i: do not accumulate across the boundary.
            gaes[i] = deltas[i]
        else:
            gaes[i] = deltas[i] + decay * gamma * gaes[i + 1]

    if normalize:
        gaes = (gaes - np.mean(gaes)) / (np.std(gaes) + 1e-8)
    return gaes

def log_prob(action, mean, logstd):
    """NumPy log-density of ``action`` under independent Gaussians, per dimension.

    Evaluates ``-0.5 * (z**2 + 2 * logstd + log(2 * pi))`` element-wise, where
    ``z = (action - mean) / std`` and ``std = exp(logstd)`` floored at ``1e-8``
    to avoid dividing by zero. No summation over dimensions is performed.
    """
    sigma = np.clip(np.exp(logstd), a_min=1e-8, a_max=None)
    z_score = (action - mean) / sigma
    return -0.5 * (z_score ** 2 + 2 * logstd + np.log(2 * np.pi))

def tensor_log_prob(action, mean, logstd):
    """Torch log-density of ``action`` under ``Normal(mean, exp(logstd))``, per dimension.

    Differentiable with respect to ``mean`` and ``logstd``; the result keeps
    the broadcast shape of the inputs (no summation over dimensions).
    """
    gaussian = torch.distributions.Normal(loc=mean, scale=logstd.exp())
    return gaussian.log_prob(action)

In [None]:
class PPOTrainer():
    """Minibatch PPO updates for a Gaussian actor and a separate critic.

    The actor must return ``(mean, logstd)`` when called on a batch of
    observations; the critic must return one value per observation. Each
    network has its own Adam optimiser.

    Args:
        actor: Policy network returning ``(mean, logstd)``.
        critic: Value network.
        ppo_clip_val: Half-width of the ratio clipping interval around 1.
        entropy_coeff: Weight of the entropy bonus in the policy loss.
        policy_train_iters: Passes over the data per ``train_policy`` call.
        value_train_iters: Passes over the data per ``train_value`` call.
        policy_lr: Adam learning rate for the actor.
        value_lr: Adam learning rate for the critic.
        batch_size: Minibatch size (the final minibatch may be smaller).
    """

    def __init__(self,
                 actor,
                 critic,
                 ppo_clip_val=0.2,
                 entropy_coeff=0.01,
                 policy_train_iters=20,
                 value_train_iters=20,
                 policy_lr=3e-4,
                 value_lr=1e-2,
                 batch_size=64):
        self.actor = actor
        self.critic = critic
        self.ppo_clip_val = ppo_clip_val
        self.entropy_coeff = entropy_coeff
        self.policy_train_iters = policy_train_iters
        self.value_train_iters = value_train_iters
        self.policy_optim = optim.Adam(self.actor.parameters(), lr=policy_lr)
        self.value_optim = optim.Adam(self.critic.parameters(), lr=value_lr)
        self.batch_size = batch_size

    def _minibatches(self, dataset_size):
        """Yield index tensors that cover a fresh random permutation in ``batch_size`` chunks."""
        order = torch.randperm(dataset_size)
        for start in range(0, dataset_size, self.batch_size):
            yield order[start:start + self.batch_size]

    def train_policy(self, obs, acts, old_log_probs, gaes):
        """Run ``policy_train_iters`` epochs of clipped-surrogate updates on the actor.

        Args:
            obs: Observations, first dimension is the sample index.
            acts: Actions taken, one row per observation.
            old_log_probs: Per-dimension log-probabilities of ``acts`` under
                the policy that collected the data (same shape as ``acts``).
            gaes: 1-D advantages, one per sample.
        """
        dataset_size = obs.size(0)
        for _ in range(self.policy_train_iters):
            # Reshuffle every epoch so minibatch composition differs between passes.
            for batch_indices in self._minibatches(dataset_size):
                batch_obs = obs[batch_indices]
                batch_acts = acts[batch_indices]
                batch_old_log_probs = old_log_probs[batch_indices]
                batch_gaes = gaes[batch_indices]

                self.policy_optim.zero_grad()
                mean, logstd = self.actor(batch_obs)
                dist = torch.distributions.Normal(mean, torch.exp(logstd))
                new_log_probs = dist.log_prob(batch_acts)

                # NOTE(review): the ratio (and its clipping) is formed per action
                # dimension, with the scalar advantage broadcast across dimensions.
                # Textbook PPO uses the joint ratio (log-probs summed over
                # dimensions) -- confirm the factorised form is intended.
                policy_ratio = torch.exp(new_log_probs - batch_old_log_probs)
                clipped_ratio = policy_ratio.clamp(1 - self.ppo_clip_val, 1 + self.ppo_clip_val)
                advantages = batch_gaes.unsqueeze(1)
                surrogate = torch.min(policy_ratio * advantages, clipped_ratio * advantages)
                policy_loss = -surrogate.mean()

                # Analytic differential entropy of the diagonal Gaussian, summed over
                # action dimensions. The previous -sum(exp(logp) * logp) evaluated at
                # the sampled action is not an entropy and leaked gradient into the
                # mean network.
                entropy = dist.entropy().sum(dim=-1).mean()

                # Maximise surrogate + entropy bonus == minimise the negated sum.
                total_loss = policy_loss - self.entropy_coeff * entropy

                total_loss.backward()
                self.policy_optim.step()

    def train_value(self, obs, returns):
        """Run ``value_train_iters`` epochs of half-MSE regression of the critic onto ``returns``.

        Args:
            obs: Observations, first dimension is the sample index.
            returns: 1-D regression targets, one per observation.
        """
        dataset_size = obs.size(0)
        for _ in range(self.value_train_iters):
            for batch_indices in self._minibatches(dataset_size):
                batch_obs = obs[batch_indices]
                batch_returns = returns[batch_indices]

                self.value_optim.zero_grad()
                # squeeze(-1) drops only the trailing value dimension, so a
                # single-sample tail batch keeps its batch dimension.
                values = self.critic(batch_obs).squeeze(-1)

                value_loss = 0.5 * ((batch_returns - values) ** 2).mean()

                value_loss.backward()
                self.value_optim.step()

In [None]:
def test_model(policy_model):
    env = gym.make("BipedalWalker-v3")
    test_episode_reward_list = []
    test_step_list = []
    max_steps = 1600
    for _ in range(100):
        epi_reward = 0
        step = 0
        obs = env.reset()[0]
        while True:
            with torch.inference_mode():
                action, logstd = policy_model(torch.from_numpy(obs))
                action = action.numpy()
                next_state, reward, done, info, _ = env.step(action)
                epi_reward += reward
                step+=1
                if done or step>=max_steps:
                    break
                else:
                    obs = next_state
        test_episode_reward_list.append(epi_reward)
        test_step_list.append(step)    
    print(f'Average Episode reward: {np.mean(test_episode_reward_list)} | Average step count: {np.mean(test_step_list)}')
    env.close()
    return np.mean(test_episode_reward_list)

In [None]:
# --- Setup: environment, networks and trainer --------------------------------
env = gym.make("BipedalWalker-v3")
action_size = env.action_space.shape[0]
obs_size = env.observation_space.shape[0]
policy_model = PolicyModel(obs_size, action_size)
value_model = ValueModel(obs_size)
gamma = 0.99   # discount factor
decay = 0.95   # GAE lambda
ppo = PPOTrainer(
    policy_model,
    value_model,
    policy_lr=0.0001,
    value_lr=0.001,
    policy_train_iters=8,
    value_train_iters=8,
    batch_size=256)

# Rollout buffer, one list per field.
memory = [[], [], [], [], [], []] # obs, action, reward, value, act_log_prob, dones
episode_reward_list = []
max_episode_steps = 1600   # manual per-episode cap
memory_size = 2048         # environment steps collected between updates
max_steps = 10240000       # total environment-step budget
step_count = 0
episode_step = 0
# Seed the environment once so its random stream is reproducible,
# like the torch and numpy streams seeded at the top of the notebook.
obs = env.reset(seed=42)[0]
episode_reward = 0

# --- Collect / update loop ---------------------------------------------------
try:
    while step_count < max_steps:
        with torch.inference_mode():
            mean, logstd = policy_model(torch.from_numpy(obs))
            value = value_model(torch.from_numpy(obs))
            mean = mean.numpy()
            logstd = logstd.numpy()
            value = value.squeeze().numpy()

            # Sample from the Gaussian policy and clip into the range [-1, 1].
            # The stored log-prob is evaluated at the clipped action, which is
            # also the action the policy update re-evaluates.
            action = np.clip(mean + np.exp(logstd) * np.random.normal(loc=0.0, scale=1.0, size=logstd.shape), -1, 1)
            act_log_prob = log_prob(action, mean, logstd)
            # Gymnasium returns (obs, reward, terminated, truncated, info).
            next_state, reward, terminated, truncated, info = env.step(action)

        # Either flag ends the episode; the manual cap is kept as a backstop.
        done = terminated or truncated
        episode_step += 1
        episode_reward += reward
        if episode_step >= max_episode_steps:
            done = True

        for i, item in enumerate((obs, action, reward, value, act_log_prob, done)):
            memory[i].append(item)

        step_count += 1
        if done:
            obs = env.reset()[0]
            episode_reward_list.append(episode_reward)
            episode_reward = 0
            episode_step = 0
            # Every 100 finished episodes: report progress and run a greedy evaluation.
            if len(episode_reward_list) % 100 == 0:
                avg_reward = np.mean(episode_reward_list[-100:])
                print(f'Episode {len(episode_reward_list)} | Train Avg Reward {avg_reward:.1f} | Training Steps {step_count}')
                test_avg_reward = test_model(policy_model)
                if test_avg_reward >= 300:
                    print(f"Training stopped early at episode {len(episode_reward_list)} due to reaching target average reward.")
                    break
        else:
            obs = next_state

        # Once memory_size steps are collected, run one PPO update and clear the buffer.
        if step_count % memory_size == 0:
            memory = [np.asarray(x) for x in memory] # obs, action, reward, value, act_log_prob, dones
            # Advantages must be computed first: they need the raw rewards and
            # values. Slot 3 (values) is then overwritten with advantages and
            # slot 2 (rewards) with discounted returns.
            # NOTE(review): compute_gaes uses 0 as the value after the last
            # stored step, and capped/truncated episodes are flagged done, so
            # neither case bootstraps from the critic -- confirm acceptable.
            memory[3] = compute_gaes(memory[2], memory[3], memory[5], gamma, decay, normalize=True)
            memory[2] = discounted_rewards(memory[2], memory[5], gamma)

            # Shuffle
            permute_idxs = np.random.permutation(len(memory[0]))

            # Policy data
            obs_ = torch.tensor(memory[0][permute_idxs], dtype=torch.float32)
            acts_ = torch.tensor(memory[1][permute_idxs], dtype=torch.float32)
            returns_ = torch.tensor(memory[2][permute_idxs], dtype=torch.float32)
            gaes_ = torch.tensor(memory[3][permute_idxs], dtype=torch.float32)
            act_log_probs_ = torch.tensor(memory[4][permute_idxs], dtype=torch.float32)

            # Train model
            ppo.train_policy(obs_, acts_, act_log_probs_, gaes_)
            ppo.train_value(obs_, returns_)
            memory = [[], [], [], [], [], []]
finally:
    # Release the environment even if training is interrupted or raises.
    env.close()

In [None]:
# Render a single greedy episode (mean action, no sampling) in a window.
env = gym.make("BipedalWalker-v3", render_mode = 'human')
epi_reward = 0
step = 0
obs = env.reset()[0]
try:
    while True:
        step += 1
        with torch.inference_mode():
            action, logstd = policy_model(torch.from_numpy(obs))
            action = action.numpy()
        # Gymnasium returns (obs, reward, terminated, truncated, info). The
        # truncated flag must be honoured here: there is no manual step cap in
        # this cell, so ignoring it would loop forever for a policy that never
        # terminates.
        next_state, reward, terminated, truncated, info = env.step(action)
        done = terminated or truncated
        epi_reward += reward
        if done:
            break
        obs = next_state
finally:
    # Close the render window even if the rollout is interrupted.
    env.close()
print(f'Episode reward: {epi_reward} | step count: {step}')

In [None]:
# Persist the trained parameters only (state_dicts, not the full modules).
# The paths are relative; loading later requires constructing PolicyModel /
# ValueModel with matching sizes and calling load_state_dict.
torch.save(policy_model.state_dict(), 'actor_model_weightsv1.2.pth')
torch.save(value_model.state_dict(), 'critic_model_weightsv1.2.pth')