# Neurotheon - Reinforcement Learning on Humanoid-v5

```bash
| \ | | _____ _____ _ | | _ __ _ __()_ __ __ _
| | |/ _ \ / / _ \ '| _| ' | '| | '_ \ / ` |
| |\ | __/> < __/ | | || |) | | | | | | | (| |_
|| _|___//__|| _| ./|| ||| ||_, ()
|| |__/
NEUROTHEON · Project Code: NRTH
```

**Goal:** Train a bipedal humanoid to walk and balance using a custom reward function.
The agent is trained with PPO across multiple parallel environments, using a shaped reward that encourages:
- Moving forward
- Staying upright
- Smooth actions
- Sustained walking

I'm logging reward components to TensorBoard for analysis.


In [None]:
import os
import torch
import numpy as np
from PIL import Image
import gymnasium as gym
from collections import deque
import matplotlib.pyplot as plt
from stable_baselines3 import PPO
from PIL import ImageDraw, ImageFont
from IPython.display import Video, display
from moviepy.editor import ImageSequenceClip
from stable_baselines3.common.monitor import Monitor
from stable_baselines3.common.vec_env import SubprocVecEnv
from stable_baselines3.common.callbacks import BaseCallback

In [None]:
# Step up to the parent of the current working directory.
# NOTE: this is relative to wherever the kernel currently is, so re-running
# the cell moves one level further up each time.
dirpath = os.path.split(os.getcwd())[0]
os.chdir(dirpath)

In [None]:
# Report whether PyTorch can see a CUDA device, and which one
has_cuda = torch.cuda.is_available()
print("Torch CUDA available:", has_cuda)
device_report = ("Using GPU:", torch.cuda.get_device_name(0)) if has_cuda else ("Using CPU",)
print(*device_report)

In [None]:
# Output locations: TensorBoard logs and saved models
log_dir = "./logs/ppo_humanoid/"
model_dir = "./models"
for _out_dir in (log_dir, model_dir):
    # exist_ok=True keeps the cell safe to re-run
    os.makedirs(_out_dir, exist_ok=True)

In [None]:
# Define parallel environment setup: half the available CPU cores, capped at 16.
# os.cpu_count() may return None, and on a single-core host `1 // 2` is 0,
# so clamp to at least one worker to keep the vectorised env (and the
# batch size derived from it) valid.
NUM_ENVS = max(1, min((os.cpu_count() or 1) // 2, 16))

def make_env():
    """Return a zero-argument factory for one monitored Humanoid-v5 env.

    The environment is not created here; it is built only when the returned
    callable is invoked, so each caller of the factory gets its own instance
    wrapped in ``Monitor``.
    """
    def _build():
        return Monitor(gym.make("Humanoid-v5"))

    return _build

In [None]:
# Custom reward shaping wrapper with survival, forward movement, posture, and smoothness
def custom_reward_wrapper(env):
    """Wrap ``env`` so that ``step`` returns a shaped locomotion reward.

    The shaped reward is built from the reward components the wrapped env
    reports in ``info`` (``reward_forward``, ``reward_survive``,
    ``reward_ctrl``), plus an upright-posture bonus on every step and a
    one-off duration bonus on the final step of an episode that lasted
    longer than the rolling average of recent episodes.

    Args:
        env: The environment to wrap.

    Returns:
        A ``gym.Wrapper`` instance around ``env``. Its ``step`` also writes
        ``shaped_reward``, ``bonus_upright``, ``bonus_duration`` and
        ``ep_len_avg`` into ``info`` for logging.
    """
    class CustomReward(gym.Wrapper):
        """Reward-shaping wrapper; see ``custom_reward_wrapper``."""

        def __init__(self, env):
            super().__init__(env)
            self.episode_length = 0
            self.past_lengths = deque(maxlen=50)  # Rolling window of episode lengths
            self.last_bonus = 0.0  # Duration bonus granted at the most recent episode end

        def reset(self, **kwargs):
            self.episode_length = 0
            # Clear the previous episode's bonus so it cannot leak into the
            # next episode's bookkeeping.
            self.last_bonus = 0.0
            return self.env.reset(**kwargs)

        def step(self, action):
            obs, reward, terminated, truncated, info = self.env.step(action)

            forward = info.get("reward_forward", 0)
            survive = info.get("reward_survive", 0)
            ctrl = info.get("reward_ctrl", 0)

            # Humanoid-v5 reports `reward_ctrl` as the *negative* control cost,
            # so it must be added. Subtracting it (as before) rewarded large
            # actions instead of penalising them.
            shaped = (
                2.5 * forward +
                1.0 * survive +
                0.25 * ctrl
            )

            # Upright posture
            # NOTE(review): with the default Humanoid-v5 observation layout,
            # obs[2] looks like a torso orientation quaternion component rather
            # than an angle in radians; confirm the 0.2 threshold is intended.
            torso_angle = obs[2] if len(obs) > 2 else 0
            upright_bonus = 0.5 if abs(torso_angle) < 0.2 else 0.0
            shaped += upright_bonus

            self.episode_length += 1
            walk_bonus = 0.0

            if terminated or truncated:
                # Add to history first: the average below therefore includes
                # the current episode, so the very first episode never
                # earns the bonus.
                self.past_lengths.append(self.episode_length)
                avg_len = np.mean(self.past_lengths)

                # Bonus if current episode outperforms average
                if self.episode_length > avg_len:
                    walk_bonus = 0.5

                shaped += walk_bonus
                self.last_bonus = walk_bonus

            # Logging
            info["shaped_reward"] = shaped
            info["bonus_upright"] = upright_bonus
            # Report the bonus actually applied on this step, so it is
            # non-zero only on the terminal step that earned it.
            info["bonus_duration"] = walk_bonus
            info["ep_len_avg"] = np.mean(self.past_lengths) if self.past_lengths else 0

            return obs, shaped, terminated, truncated, info

    return CustomReward(env)

In [None]:
# Apply reward wrapper to parallel environments.
# Every entry is a factory; the env itself is built when the factory is called.
env_wrapped = SubprocVecEnv(
    [lambda: custom_reward_wrapper(make_env()()) for _worker in range(NUM_ENVS)]
)

n_steps = 2048
batch_size = (n_steps * NUM_ENVS) // 4  # a quarter of n_steps * NUM_ENVS

# PPO hyperparameters for humanoid locomotion
device = "cpu"
ppo_settings = dict(
    verbose=0,
    tensorboard_log=log_dir,
    device=device,
    n_steps=n_steps,
    batch_size=batch_size,
    n_epochs=10,
    learning_rate=2.5e-4,
)
model = PPO("MlpPolicy", env_wrapped, **ppo_settings)

In [None]:
# Callback to log custom reward components to TensorBoard
class RewardLoggingCallback(BaseCallback):
    """Log the shaped-reward components found in each env's ``info`` dict.

    Values are accumulated with ``record_mean`` so that each logger dump
    reports the average over all environments and steps since the previous
    dump. Plain ``record`` overwrites the stored value on every call, which
    kept only the last environment's last step.
    """

    # (key in info, TensorBoard tag) pairs; missing info keys are logged as 0.
    _COMPONENTS = (
        ("shaped_reward", "custom/reward_total"),
        ("reward_forward", "custom/reward_forward"),
        ("reward_survive", "custom/reward_survive"),
        ("reward_ctrl", "custom/reward_ctrl"),
        ("bonus_upright", "custom/bonus_upright"),
        ("bonus_duration", "custom/bonus_duration"),
        ("ep_len_avg", "custom/ep_len_avg"),
    )

    def __init__(self, verbose=0):
        super().__init__(verbose)

    def _on_step(self) -> bool:
        """Accumulate reward components for this step; always continue training."""
        for info in self.locals.get("infos", []):
            # Only infos carrying a shaped reward are logged.
            if "shaped_reward" not in info:
                continue
            for info_key, tag in self._COMPONENTS:
                self.logger.record_mean(tag, info.get(info_key, 0))
        return True

In [None]:
# Train for one million timesteps with the reward-logging callback, then save the model
callback = RewardLoggingCallback()
total_timesteps = 1_000_000
model.learn(total_timesteps=total_timesteps, callback=callback, progress_bar=True)
save_path = f"{model_dir}/neurotheon_ppo_1m"
model.save(save_path)

In [None]:
# Now launch TensorBoard in your terminal (not notebook):
# --------------------------------------------------------
# source .venv/bin/activate
# tensorboard --logdir=./logs/ppo_humanoid --port=6006
# Open http://localhost:6006 in your browser

### Visualization

In [None]:
from matplotlib.backends.backend_agg import FigureCanvasAgg as FigureCanvas
from tensorboard.backend.event_processing.event_accumulator import EventAccumulator

In [None]:
def load_rewards(tb_path):
    """Read the ``rollout/ep_rew_mean`` scalar series from a TensorBoard run.

    Args:
        tb_path: Path handed to ``EventAccumulator``.

    Returns:
        A ``(steps, rews)`` pair of parallel lists: the step of each logged
        scalar event and its value.
    """
    accumulator = EventAccumulator(tb_path)
    accumulator.Reload()

    steps, rews = [], []
    for event in accumulator.Scalars("rollout/ep_rew_mean"):
        steps.append(event.step)
        rews.append(event.value)
    return steps, rews

def render_milestone(model, step, reward_val):
    """Roll out ``model`` in a fresh Humanoid-v5 env and collect rendered frames.

    Runs at most 250 deterministic steps, stopping early when the episode
    terminates or is truncated. Each frame is rendered *before* the
    corresponding action is applied.

    Args:
        model: Object whose ``predict(obs, deterministic=True)`` returns an
            ``(action, state)`` pair.
        step: Unused here; kept so existing callers keep working.
        reward_val: Passed through unchanged in the return value.

    Returns:
        ``(frames, reward_val)`` where ``frames`` is the list of rendered frames.
    """
    env = gym.make("Humanoid-v5", render_mode="rgb_array")
    frames = []
    try:
        obs, _ = env.reset()
        for _ in range(250):
            frame = env.render()
            action, _ = model.predict(obs, deterministic=True)
            obs, _, done, trunc, _ = env.step(action)
            frames.append(frame)
            if done or trunc:
                break
    finally:
        # Always release the simulator/renderer, even if the rollout raises;
        # previously one env leaked per call.
        env.close()

    return frames, reward_val

def draw_reward_plot(steps, rewards, current_step):
    """Render the reward curve as a PIL image with a marker at ``current_step``.

    Args:
        steps: X values of the reward curve.
        rewards: Y values of the reward curve.
        current_step: X position of the dashed vertical marker.

    Returns:
        An RGB ``PIL.Image`` of the plot on a black background.
    """
    # Pin the DPI: 9.6 x 10.8 inches at 100 dpi is exactly 960 x 1080 pixels,
    # the left half of the 1920 x 1080 canvas used by build_frame. Without
    # this, the size depends on whatever `figure.dpi` the session has set.
    fig, ax = plt.subplots(figsize=(9.6, 10.8), dpi=100)
    ax.plot(steps, rewards, lw=3, color="lightblue")
    ax.axvline(current_step, ls="--", color="white")
    ax.set_title("Reward over Time", color="white")

    # White-on-black styling
    ax.set_facecolor("black")
    for spine in ax.spines.values():
        spine.set_color('white')
    ax.tick_params(colors='white')
    fig.patch.set_facecolor("black")

    # Rasterise off-screen with the Agg canvas and drop the alpha channel.
    canvas = FigureCanvas(fig)
    canvas.draw()
    buf, (w, h) = canvas.print_to_buffer()
    img = np.frombuffer(buf, dtype=np.uint8).reshape(h, w, 4)[:, :, :3]
    plt.close(fig)  # free the figure; pyplot otherwise keeps it alive
    return Image.fromarray(img)

def build_frame(video_frame, plot_img, step, reward_val):
    """Compose one 1920x1080 frame: plot on the left, simulation on the right.

    Args:
        video_frame: Array of a rendered frame; resized to 960x1080.
        plot_img: PIL image pasted at the top-left corner.
        step: Step count shown in the overlay text.
        reward_val: Reward value shown in the overlay text.

    Returns:
        The composed frame as a NumPy array.
    """
    composite = Image.new("RGB", (1920, 1080), (0, 0, 0))

    # Left half: reward plot. Right half: simulation frame stretched to fit.
    composite.paste(plot_img, (0, 0))
    sim_panel = Image.fromarray(video_frame).resize((960, 1080))
    composite.paste(sim_panel, (960, 0))

    # Text overlay drawn on top of the simulation panel
    pen = ImageDraw.Draw(composite)
    overlay_font = ImageFont.load_default()
    step_label = f"Step: {step:,}"
    reward_label = f"Reward: {reward_val:.2f}"
    pen.text((1000, 60), step_label, fill="white", font=overlay_font)
    pen.text((1000, 120), reward_label, fill="lightblue", font=overlay_font)
    return np.array(composite)

In [None]:
# Build a side-by-side "reward plot + rollout" video for a set of milestone steps.
model = PPO.load("models/neurotheon_ppo_1m")
steps, rewards = load_rewards("logs/ppo_humanoid/PPO_1")
if not steps:
    # np.interp would fail with an opaque error on an empty series.
    raise RuntimeError("No 'rollout/ep_rew_mean' data found in logs/ppo_humanoid/PPO_1")

milestones = [0, 250_000, 500_000, 750_000, 1_000_000]
frames_all = []

# NOTE(review): every milestone is rendered with the same final model; only the
# plot marker and the interpolated reward differ. Loading per-milestone
# checkpoints would be needed for a true training timelapse.
for step in milestones:
    # Reward at this milestone, linearly interpolated from the logged curve
    reward_val = np.interp(step, steps, rewards)
    vid_frames, _ = render_milestone(model, step, reward_val)
    plot_img = draw_reward_plot(steps, rewards, step)

    frames_all.extend(
        build_frame(vf, plot_img, step, reward_val) for vf in vid_frames
    )

# The output directory is not created anywhere else, so make sure it exists.
video_dir = "videos"
os.makedirs(video_dir, exist_ok=True)
# Building the path once also avoids nesting double quotes inside an
# f-string, which is a SyntaxError before Python 3.12.
video_path = f"{video_dir}/humanoid_milestone_timelapse.mp4"

clip = ImageSequenceClip(frames_all, fps=30)
clip.write_videofile(video_path, codec="libx264")
display(Video(video_path, embed=True))