In [1]:
# @title 1. Environment Setup & Installation

!apt-get install -y \
    libgl1-mesa-dev \
    libgl1-mesa-glx \
    libglew-dev \
    libosmesa6-dev \
    software-properties-common \
    patchelf \
    ffmpeg \
    xvfb

!pip install gymnasium[mujoco] stable-baselines3 shimmy imageio[ffmpeg] pyvirtualdisplay


Reading package lists... Done
Building dependency tree... Done
Reading state information... Done
software-properties-common is already the newest version (0.99.22.9).
ffmpeg is already the newest version (7:4.4.2-0ubuntu0.22.04.1).
libgl1-mesa-glx is already the newest version (23.0.4-0ubuntu1~22.04.1).
xvfb is already the newest version (2:21.1.4-2ubuntu1.7~22.04.16).
The following additional packages will be installed:
  libegl-dev libgl-dev libgles-dev libgles1 libglew2.2 libglu1-mesa
  libglu1-mesa-dev libglvnd-core-dev libglvnd-dev libglx-dev libopengl-dev
  libosmesa6
Suggested packages:
  glew-utils
The following NEW packages will be installed:
  libegl-dev libgl-dev libgl1-mesa-dev libgles-dev libgles1 libglew-dev
  libglew2.2 libglu1-mesa libglu1-mesa-dev libglvnd-core-dev libglvnd-dev
  libglx-dev libopengl-dev libosmesa6 libosmesa6-dev patchelf
0 upgraded, 16 newly installed, 0 to remove and 2 not upgraded.
Need to get 4,275 kB of archives.
After this operation, 20.5 MB of a

In [None]:
#   ✗ NO alive bonus — zero reward for standing still
#   ✗ NO phase state machine — too hard to discover in sequence
#   ✓ Upward velocity as PRIMARY signal (always on, not gated)
#   ✓ LOW flight threshold (z > 0.8) — micro-hops get rewarded
#   ✓ Flight reward scales with height — incentivizes jumping HIGHER
#   ✓ Smooth gradient: standing (≈0.1) → extending (≈0.6) → hop (≈3) → jump (≈6)
# =============================================================================

import gymnasium as gym
import numpy as np
from stable_baselines3 import PPO
from stable_baselines3.common.vec_env import SubprocVecEnv, VecNormalize
from stable_baselines3.common.callbacks import BaseCallback
import torch.nn as nn
import torch
import os

# ── Geom IDs (Humanoid-v5) ──────────────────────────────────────────────────
FLOOR_GEOM_ID      = 0
RIGHT_FOOT_GEOM_ID = 8
LEFT_FOOT_GEOM_ID  = 11
FOOT_GEOM_IDS      = {RIGHT_FOOT_GEOM_ID, LEFT_FOOT_GEOM_ID}

STANDING_Z = 1.4


# =============================================================================
# Jump Wrapper v2c — Simplified
# =============================================================================
class JumpRewardWrapper(gym.Wrapper):
    """
    Simplified jump reward. No phase machine, no alive bonus.

    Reward gradient:
        standing still    →  ~0.08 / step  (just the height baseline)
        extending legs    →  ~0.6  / step  (upward velocity kicks in)
        micro-hop         →  ~3.0  / step  (flight bonus at any height)
        real jump         →  ~6.0  / step  (flight + height scaling)
    """

    def __init__(self, env, max_episode_steps=300):
        super().__init__(env)
        self.max_episode_steps = max_episode_steps
        self._reset_tracking()

    def _reset_tracking(self):
        self.step_count = 0
        self.max_z = -np.inf
        self.total_flight_steps = 0       # steps with feet off ground + z > 0.8
        self.max_flight_z = -np.inf
        self.entered_flight = False

        # Action history for jerk penalty
        self.action_hist = [
            np.zeros(self.action_space.shape),
            np.zeros(self.action_space.shape),
        ]

    def reset(self, **kwargs):
        obs, info = self.env.reset(**kwargs)
        self._reset_tracking()
        return obs, info

    # ── Contact helpers ──────────────────────────────────────────────────
    def _get_floor_contacts(self):
        d = self.unwrapped.data
        touching_floor = set()
        for i in range(d.ncon):
            c = d.contact[i]
            pair = {c.geom1, c.geom2}
            if FLOOR_GEOM_ID in pair:
                other = (pair - {FLOOR_GEOM_ID}).pop()
                touching_floor.add(other)
        return touching_floor

    def _feet_on_ground(self, floor_contacts):
        return len(FOOT_GEOM_IDS & floor_contacts)

    def _non_foot_on_ground(self, floor_contacts):
        return len(floor_contacts - FOOT_GEOM_IDS) > 0

    # ── Main step ────────────────────────────────────────────────────────
    def step(self, action):
        obs, _, terminated, truncated, info = self.env.step(action)
        self.step_count += 1

        # ── Gather state ──
        x, y, z = self.unwrapped.data.qpos[0:3]
        vx, vy, vz = self.unwrapped.data.qvel[0:3]
        floor_contacts = self._get_floor_contacts()
        n_feet = self._feet_on_ground(floor_contacts)
        body_fell = self._non_foot_on_ground(floor_contacts)
        airborne = (n_feet == 0) and (not body_fell)

        self.max_z = max(self.max_z, z)

        # ══════════════════════════════════════════════════════════════════
        # REWARD — smooth gradient from standing → jumping
        # ══════════════════════════════════════════════════════════════════
        reward = 0.0

        # ▸ 1. UPWARD VELOCITY — primary gradient signal
        #   Always on, not gated by phase or position.
        #   Rewards any upward movement: leg extension, push-off, etc.
        #   Capped to prevent outliers from messing up VecNormalize.
        reward += 0.5 * np.clip(vz, 0.0, 5.0)

        # ▸ 2. FLIGHT BONUS — the big reward for actual airborne time
        #   LOW threshold: z > 0.8 (just "not collapsed on the floor")
        #   so even tiny hops where COM dips to z≈1.2 during push-off
        #   get rewarded. Then scales heavily with height.
        if airborne and z > 0.8:
            reward += 2.0                                     # base: "you're airborne!"
            reward += 5.0 * max(0.0, z - STANDING_Z)         # bonus for height
            reward += 1.0 * np.clip(vz, 0.0, 5.0)           # going up while flying

            # Track for metrics
            self.total_flight_steps += 1
            self.max_flight_z = max(self.max_flight_z, z)
            self.entered_flight = True

        # ▸ 3. HEIGHT BASELINE — tiny always-on signal
        #   Provides gradient toward "be tall / extend legs."
        #   Standing at z=1.4 gives 0.08, which is almost nothing.
        reward += 0.2 * max(0.0, z - 1.0)

        # ▸ 4. NO ALIVE BONUS — standing still ≈ 0.08 reward (just height)

        # ▸ 5. POSTURE — penalize body parts (not feet) touching floor
        if body_fell:
            reward -= 3.0

        # ▸ 6. STAY CENTERED
        drift = np.sqrt(x**2 + y**2)
        reward -= 0.1 * drift

        # ▸ 7. JERK PENALTY — catches vibration, allows explosive moves
        jerk = action - 2.0 * self.action_hist[-1] + self.action_hist[-2]
        reward -= 0.02 * np.sum(np.square(jerk))

        # ▸ 8. CONTROL COST
        reward -= 0.005 * np.sum(np.square(action))

        # ▸ 9. EPISODE END — info for metrics (no terminal bonus)
        is_done = terminated or truncated or (self.step_count >= self.max_episode_steps)
        if is_done:
            if self.entered_flight:
                info["jump_height"] = self.max_flight_z - STANDING_Z
                info["flight_steps"] = self.total_flight_steps
            if not terminated and not truncated:
                truncated = True

        # ── TERMINATION ──
        if z < 0.3:
            terminated = True
            reward -= 3.0
        if drift > 1.0:
            terminated = True
            reward -= 2.0

        # Update action history
        self.action_hist[-2] = self.action_hist[-1].copy()
        self.action_hist[-1] = action.copy()

        return obs, reward, terminated, truncated, info


# =============================================================================
# Callbacks
# =============================================================================
class JumpMetricsCallback(BaseCallback):
    def __init__(self, print_every=10_000, verbose=0):
        super().__init__(verbose)
        self.print_every = print_every
        self.jump_heights = []
        self.flight_steps_list = []
        self.ep_count = 0
        self.flight_ep_count = 0

    def _on_step(self) -> bool:
        for info in self.locals.get("infos", []):
            self.ep_count += 1
            if "jump_height" in info:
                self.jump_heights.append(info["jump_height"])
                self.flight_steps_list.append(info["flight_steps"])
                self.flight_ep_count += 1

        if self.num_timesteps % self.print_every == 0:
            if self.jump_heights:
                recent_h = self.jump_heights[-50:]
                recent_f = self.flight_steps_list[-50:]
                frac = self.flight_ep_count / max(self.ep_count, 1)
                print(
                    f"  [Jump] step={self.num_timesteps:,}  "
                    f"flight_rate={frac:.1%}  "
                    f"avg_h={np.mean(recent_h):.3f}m  "
                    f"max_h={np.max(recent_h):.3f}m  "
                    f"avg_flight={np.mean(recent_f):.1f}steps"
                )
                self.logger.record("jump/avg_height", np.mean(recent_h))
                self.logger.record("jump/max_height", np.max(recent_h))
                self.logger.record("jump/flight_rate", frac)
            else:
                print(
                    f"  [Jump] step={self.num_timesteps:,}  "
                    f"NO FLIGHTS YET ({self.ep_count} episodes)"
                )
        return True


class DiagnosticCallback(BaseCallback):
    def __init__(self, print_every=50_000, verbose=0):
        super().__init__(verbose)
        self.print_every = print_every

    def _on_step(self) -> bool:
        if self.num_timesteps % self.print_every == 0:
            logs = self.logger.name_to_value
            kl = logs.get("train/approx_kl", "?")
            clip = logs.get("train/clip_fraction", "?")
            ev = logs.get("train/explained_variance", "?")
            std = logs.get("train/std", "?")
            print(
                f"  [Health] step={self.num_timesteps:,}  "
                f"kl={kl}  clip={clip}  expl_var={ev}  std={std}"
            )
        return True


# =============================================================================
# Environment factory
# =============================================================================
def make_env(max_episode_steps=300, seed=0):
    def _init():
        env = gym.make("Humanoid-v5", render_mode=None)
        env = JumpRewardWrapper(env, max_episode_steps=max_episode_steps)
        env.reset(seed=seed)
        return env
    return _init


# =============================================================================
# Training
# =============================================================================
def run_training(
    n_envs: int = 8,
    total_timesteps: int = 5_000_000,
    max_episode_steps: int = 300,
):
    print(f"=== Humanoid Jump Training v2c ===")
    print(f"  Parallel envs:     {n_envs}")
    print(f"  Total timesteps:   {total_timesteps:,}")
    print(f"  Max episode steps: {max_episode_steps}")
    print(f"  Device:            {'cuda' if torch.cuda.is_available() else 'cpu'}")
    print(f"  Key changes:       no alive bonus, no phase machine,")
    print(f"                     low flight threshold, velocity gradient")
    print()

    # ── Environments ──
    venv = SubprocVecEnv([
        make_env(max_episode_steps=max_episode_steps, seed=i)
        for i in range(n_envs)
    ])
    env = VecNormalize(
        venv,
        norm_obs=True,
        norm_reward=True,
        clip_obs=10.0,
        clip_reward=10.0,
        gamma=0.99,
    )

    # ── PPO ──
    policy_kwargs = dict(
        activation_fn=nn.ELU,
        net_arch=dict(pi=[256, 256], vf=[256, 256]),
    )

    model = PPO(
        "MlpPolicy",
        env,
        verbose=1,
        learning_rate=3e-4,
        n_steps=2048,
        batch_size=1024,
        n_epochs=5,
        gamma=0.99,
        gae_lambda=0.95,
        clip_range=0.2,
        ent_coef=0.01,
        vf_coef=0.5,
        max_grad_norm=0.5,
        target_kl=0.05,
        device="cuda" if torch.cuda.is_available() else "cpu",
        policy_kwargs=policy_kwargs,
        tensorboard_log="./jump_tb_logs/",
    )

    # ── Train ──
    print("--- Starting training ---\n")
    model.learn(
        total_timesteps=total_timesteps,
        callback=[
            JumpMetricsCallback(print_every=20_000, verbose=1),
            DiagnosticCallback(print_every=100_000, verbose=1),
        ],
    )
    print("\n--- Training complete ---")

    model.save("humanoid_jumper_v2")
    env.save("humanoid_jumper_v2_vecnorm.pkl")
    env.close()
    print("Saved: humanoid_jumper_v2.zip + humanoid_jumper_v2_vecnorm.pkl")

    return model


# =============================================================================
# Run
# =============================================================================
model = run_training()


=== Humanoid Jump Training v2c ===
  Parallel envs:     8
  Total timesteps:   5,000,000
  Max episode steps: 300
  Device:            cpu
  Key changes:       no alive bonus, no phase machine,
                     low flight threshold, velocity gradient



Gym has been unmaintained since 2022 and does not support NumPy 2.0 amongst other critical functionality.
Please upgrade to Gymnasium, the maintained drop-in replacement of Gym, or contact the authors of your software and request that they upgrade.
See the migration guide at https://gymnasium.farama.org/introduction/migration_guide/ for additional information.
  return datetime.utcnow().replace(tzinfo=utc)


Using cpu device
--- Starting training ---

Logging to ./jump_tb_logs/PPO_1


  return datetime.utcnow().replace(tzinfo=utc)


[1;30;43mStreaming output truncated to the last 5000 lines.[0m
  [Jump] step=1,520,000  flight_rate=1.5%  avg_h=-0.000m  max_h=0.034m  avg_flight=68.0steps
-----------------------------------------
| jump/                   |             |
|    avg_height           | -0.000384   |
|    flight_rate          | 0.0154      |
|    max_height           | 0.0339      |
| time/                   |             |
|    fps                  | 911         |
|    iterations           | 93          |
|    time_elapsed         | 1672        |
|    total_timesteps      | 1523712     |
| train/                  |             |
|    approx_kl            | 0.047659848 |
|    clip_fraction        | 0.385       |
|    clip_range           | 0.2         |
|    entropy_loss         | -27.4       |
|    explained_variance   | 0.91        |
|    learning_rate        | 0.0003      |
|    loss                 | -0.3        |
|    n_updates            | 458         |
|    policy_gradient_loss | -0.0439     |
| 

In [3]:
# @title 3. Display Video

from pyvirtualdisplay import Display
import os

display = Display(visible=0, size=(1400, 900))
display.start()
os.environ['MUJOCO_GL'] = 'egl'

import gymnasium as gym
import numpy as np
import imageio
from stable_baselines3 import PPO
from stable_baselines3.common.vec_env import DummyVecEnv, VecNormalize
from IPython.display import Video


def record_jump_video(
    model_path="humanoid_jumper_v2",
    vecnorm_path="humanoid_jumper_v2_vecnorm.pkl",
    video_path="jump_demo_v2.mp4",
    duration=500,
):
    venv = DummyVecEnv([lambda: JumpRewardWrapper(
        gym.make("Humanoid-v5", render_mode="rgb_array"),
        max_episode_steps=duration,
    )])

    if os.path.exists(vecnorm_path):
        env = VecNormalize.load(vecnorm_path, venv)
        env.training = False
        env.norm_reward = False
        print("Loaded VecNormalize stats.")
    else:
        print("Warning: VecNormalize stats not found.")
        env = venv

    if not os.path.exists(model_path + ".zip"):
        print(f"Error: Model {model_path} not found.")
        return None

    model = PPO.load(model_path)
    obs = env.reset()
    frames = []

    print(f"Recording {duration} steps...")
    for _ in range(duration):
        frame = env.venv.envs[0].render()
        frames.append(frame)
        action, _ = model.predict(obs, deterministic=True)
        obs, reward, done, info = env.step(action)

    env.close()
    imageio.mimsave(video_path, frames, fps=30)
    print(f"Video saved to {video_path}")
    return video_path


video_file = record_jump_video()

if video_file:
    from IPython.display import display as ipy_display
    ipy_display(Video(video_file, embed=True, html_attributes="controls autoplay loop"))


Loaded VecNormalize stats.
Recording 500 steps...


  return datetime.utcnow().replace(tzinfo=utc)


Video saved to jump_demo_v2.mp4


In [4]:
# @title 4. Download Model + Normalizer

from google.colab import files
import os

for f in ["humanoid_jumper_v2.zip", "humanoid_jumper_v2_vecnorm.pkl"]:
    if os.path.exists(f):
        print(f"Downloading {f}...")
        files.download(f)
    else:
        print(f"Not found: {f}")


Downloading humanoid_jumper_v2.zip...


<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

Downloading humanoid_jumper_v2_vecnorm.pkl...


<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>