# PPO on Humanoid-v4 using only Numpy

This notebook implements the Proximal Policy Optimization (PPO) algorithm from scratch using only `numpy` for the neural networks and optimization. The environment used is `Humanoid-v4` from Gymnasium.

In [None]:
!pip install gymnasium[mujoco]

In [None]:
import numpy as np
import gymnasium as gym
import time

# Set seeds
np.random.seed(42)

In [None]:
class Dense:
    def __init__(self, input_dim, output_dim):
        self.weights = np.random.randn(input_dim, output_dim) * np.sqrt(2. / input_dim)
        self.bias = np.zeros(output_dim)
        # Adam parameters
        self.m_w, self.v_w = np.zeros_like(self.weights), np.zeros_like(self.weights)
        self.m_b, self.v_b = np.zeros_like(self.bias), np.zeros_like(self.bias)
        self.t = 0
        self.input = None

    def forward(self, x):
        self.input = x
        return np.dot(x, self.weights) + self.bias

    def backward(self, grad_output, lr):
        self.t += 1
        grad_input = np.dot(grad_output, self.weights.T)
        grad_weights = np.dot(self.input.T, grad_output)
        grad_bias = np.sum(grad_output, axis=0)
        
        # Adam Update
        beta1, beta2, eps = 0.9, 0.999, 1e-8
        self.m_w = beta1 * self.m_w + (1 - beta1) * grad_weights
        self.v_w = beta2 * self.v_w + (1 - beta2) * grad_weights**2
        m_hat_w = self.m_w / (1 - beta1**self.t)
        v_hat_w = self.v_w / (1 - beta2**self.t)
        self.weights -= lr * m_hat_w / (np.sqrt(v_hat_w) + eps)

        self.m_b = beta1 * self.m_b + (1 - beta1) * grad_bias
        self.v_b = beta2 * self.v_b + (1 - beta2) * grad_bias**2
        m_hat_b = self.m_b / (1 - beta1**self.t)
        v_hat_b = self.v_b / (1 - beta2**self.t)
        self.bias -= lr * m_hat_b / (np.sqrt(v_hat_b) + eps)
        
        return grad_input

def relu(x):
    return np.maximum(0, x)

def d_relu(x):
    return (x > 0).astype(float)

def log_prob(x, mean, std):
    var = std**2
    return -0.5 * np.sum((x - mean)**2 / (var + 1e-8), axis=-1) - 0.5 * x.shape[-1] * np.log(2 * np.pi) - np.sum(np.log(std + 1e-8), axis=-1)

In [None]:
class PPOAgent:
    def __init__(self, obs_dim, act_dim, lr=1e-3, gamma=0.99, clip_ratio=0.2, lam=0.95):
        self.gamma = gamma
        self.clip_ratio = clip_ratio
        self.lam = lam
        self.lr = lr
        
        # Policy Network (2 layers of 64)
        self.p_l1 = Dense(obs_dim, 64)
        self.p_l2 = Dense(64, 64)
        self.p_out = Dense(64, act_dim)
        self.log_std = np.zeros(act_dim) # Trainable log_std
        
        # Value Network (2 layers of 64)
        self.v_l1 = Dense(obs_dim, 64)
        self.v_l2 = Dense(64, 64)
        self.v_out = Dense(64, 1)

    def get_action(self, obs):
        if obs.ndim == 1: obs = obs[np.newaxis, :]
        h1 = relu(self.p_l1.forward(obs))
        h2 = relu(self.p_l2.forward(h1))
        mean = self.p_out.forward(h2)
        std = np.exp(self.log_std)
        action = mean + np.random.normal(0, 1, size=mean.shape) * std
        return action[0], mean[0], std

    def get_value(self, obs):
        if obs.ndim == 1: obs = obs[np.newaxis, :]
        h1 = relu(self.v_l1.forward(obs))
        h2 = relu(self.v_l2.forward(h1))
        val = self.v_out.forward(h2)
        return val[0, 0]

    def update(self, obs, act, ret, adv, old_log_probs):
        # Convert to batches if needed, here we assume full batch for simplicity or mini-batches
        # For pure numpy simplicity, we do full batch update
        
        # --- Policy Update ---
        # Forward pass
        h1 = relu(self.p_l1.forward(obs))
        h2 = relu(self.p_l2.forward(h1))
        mean = self.p_out.forward(h2)
        std = np.exp(self.log_std)
        
        # New Log Probs
        new_log_probs = log_prob(act, mean, std)
        ratio = np.exp(new_log_probs - old_log_probs)
        
        # PPO Loss Gradients
        # Loss = -min(ratio * adv, clip(ratio) * adv)
        # We need gradient of Loss w.r.t mean and log_std
        
        surr1 = ratio * adv
        surr2 = np.clip(ratio, 1 - self.clip_ratio, 1 + self.clip_ratio) * adv
        
        # Gradient of min(surr1, surr2) w.r.t ratio
        mask = (surr1 < surr2).astype(float)
        d_loss_d_ratio = - (mask * adv + (1 - mask) * 0) # Simplified: if clipped, grad is 0 w.r.t ratio (approx)
        # Actually, if clipped, the gradient is 0. If not clipped, it's adv.
        # Correct logic: 
        # if ratio * adv < clipped * adv: grad is adv
        # else: grad is 0
        
        d_ratio = np.where(surr1 < surr2, adv, 0)
        
        d_log_prob = d_ratio * ratio
        
        # Gradient of log_prob w.r.t mean: (x - mean) / var
        d_mean = d_log_prob[:, np.newaxis] * (act - mean) / (std**2 + 1e-8)
        # Gradient of log_prob w.r.t log_std: -1 + ((x - mean)^2 / var)
        d_log_std = d_log_prob[:, np.newaxis] * (-1 + (act - mean)**2 / (std**2 + 1e-8))
        
        # Backprop through Policy MLP
        d_out = d_mean
        d_h2 = self.p_out.backward(d_out, self.lr)
        d_h2_relu = d_h2 * d_relu(h2)
        d_h1 = self.p_l2.backward(d_h2_relu, self.lr)
        d_h1_relu = d_h1 * d_relu(h1)
        self.p_l1.backward(d_h1_relu, self.lr)
        
        # Update log_std manually (it's a parameter)
        # Adam for log_std is skipped for brevity, just SGD
        self.log_std += self.lr * np.mean(d_log_std, axis=0)

        # --- Value Update ---
        # Forward
        v_h1 = relu(self.v_l1.forward(obs))
        v_h2 = relu(self.v_l2.forward(v_h1))
        values = self.v_out.forward(v_h2)
        
        # MSE Loss: (values - ret)^2
        # d_loss_d_val = 2 * (values - ret)
        d_val = 2 * (values - ret[:, np.newaxis])
        
        # Backprop Value
        d_v_h2 = self.v_out.backward(d_val, self.lr)
        d_v_h2_relu = d_v_h2 * d_relu(v_h2)
        d_v_h1 = self.v_l2.backward(d_v_h2_relu, self.lr)
        d_v_h1_relu = d_v_h1 * d_relu(v_h1)
        self.v_l1.backward(d_v_h1_relu, self.lr)

In [None]:
def compute_gae(rewards, values, dones, gamma=0.99, lam=0.95):
    advs = []
    gae = 0
    values = np.append(values, 0) # Bootstrap
    for i in reversed(range(len(rewards))):
        delta = rewards[i] + gamma * values[i+1] * (1 - dones[i]) - values[i]
        gae = delta + gamma * lam * (1 - dones[i]) * gae
        advs.insert(0, gae)
    return np.array(advs)

# Main Loop
env = gym.make('Humanoid-v4')
obs_dim = env.observation_space.shape[0]
act_dim = env.action_space.shape[0]

agent = PPOAgent(obs_dim, act_dim)

epochs = 10
steps_per_epoch = 2048

for epoch in range(epochs):
    obs, _ = env.reset()
    batch_obs, batch_act, batch_rew, batch_val, batch_log_prob, batch_done = [], [], [], [], [], []
    
    for t in range(steps_per_epoch):
        action, mean, std = agent.get_action(obs)
        val = agent.get_value(obs)
        
        next_obs, rew, terminated, truncated, _ = env.step(action)
        done = terminated or truncated
        
        batch_obs.append(obs)
        batch_act.append(action)
        batch_rew.append(rew)
        batch_val.append(val)
        batch_log_prob.append(log_prob(action, mean, std))
        batch_done.append(done)
        
        obs = next_obs
        if done:
            obs, _ = env.reset()
            
    # Compute GAE
    batch_obs = np.array(batch_obs)
    batch_act = np.array(batch_act)
    batch_rew = np.array(batch_rew)
    batch_val = np.array(batch_val)
    batch_done = np.array(batch_done)
    
    advs = compute_gae(batch_rew, batch_val, batch_done)
    returns = advs + batch_val
    
    # Normalize Advantages
    advs = (advs - advs.mean()) / (advs.std() + 1e-8)
    
    # Update
    old_log_probs = np.array(batch_log_prob)
    agent.update(batch_obs, batch_act, returns, advs, old_log_probs)
    
    print(f"Epoch {epoch+1}, Mean Reward: {np.sum(batch_rew)}")

env.close()