# Freenove Robot Dog - RL Locomotion Training

Train a PPO policy to walk using **mjlab** (MuJoCo Warp + rsl_rl).

**Requirements**: GPU runtime (A100/V100/T4). Go to Runtime > Change runtime type > GPU.

## Architecture
```
MuJoCo Warp (GPU physics)  -->  mjlab (env managers)  -->  rsl_rl (PPO)  -->  Trained Policy
     |                              |                          |
  freenove_dog.xml          env_cfgs.py (rewards,        rl_cfg.py (network,
  (robot model)            sensors, terminations)        learning rate)
```

### Key training parameters
| Parameter | Value | Notes |
|-----------|-------|-------|
| Action scale | HAA: 0.20, HFE/KFE: 0.25 rad | ~11.5 deg (HAA), ~14.3 deg (HFE/KFE) per unit of action |
| Velocity curriculum | 0.1 -> 0.2 -> 0.3 m/s | 3-stage ramp-up |
| Foot clearance target | 20mm | Scaled for small 99mm-tall robot |
| Standing pose | HFE=0.35, KFE=0.6 rad | Deep bend for ground clearance |
| Illegal contact | base_collision only | Shanks/thighs allowed near ground |
| Spawn height | 120mm | Extra margin for settling |
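
To make the action-scale and curriculum rows concrete, here is a minimal sketch. It assumes the common convention `target = standing pose + scale * action`, a zero HAA standing angle, and a ramp that advances one stage every fixed number of iterations. None of these are confirmed by this notebook, and the helper names `joint_target` and `curriculum_velocity` are hypothetical. The real logic lives in `env_cfgs.py`.

```python
import math

# Values copied from the table above.
ACTION_SCALE = {"HAA": 0.20, "HFE": 0.25, "KFE": 0.25}  # rad
STANDING_POSE = {"HAA": 0.0, "HFE": 0.35, "KFE": 0.6}   # rad; HAA=0.0 is an assumption
VELOCITY_STAGES = [0.1, 0.2, 0.3]                       # m/s


def joint_target(joint: str, action: float) -> float:
    """Assumed convention: offset the standing pose by scale * action."""
    return STANDING_POSE[joint] + ACTION_SCALE[joint] * action


def curriculum_velocity(iteration: int, iters_per_stage: int) -> float:
    """Hypothetical ramp: move to the next stage every `iters_per_stage` iterations."""
    stage = min(iteration // iters_per_stage, len(VELOCITY_STAGES) - 1)
    return VELOCITY_STAGES[stage]


# Show each action scale in degrees.
for joint, scale in ACTION_SCALE.items():
    print(f"{joint}: {scale:.2f} rad = {math.degrees(scale):.1f} deg")
```

Under this convention a zero action returns the standing pose, which is what the zero-action playback in section 3 exercises.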

## 1. Setup

Clone the project from GitHub.

In [None]:
!rm -rf /content/quadruped
!git clone https://github.com/Solace-Stephane/freenove-quadruped-rl.git /content/quadruped

# Verify the project structure
!ls /content/quadruped/src/freenove_velocity/
!ls /content/quadruped/README.md

In [None]:
# Install uv (fast Python package manager)
!pip install uv

# Verify GPU is available
!nvidia-smi

## 2. Configure Weights & Biases (optional)

W&B tracks training metrics (rewards, losses, episode lengths).
Run ONE of the two W&B cells below (Option A or Option B). The third cell mounts Google Drive for checkpoint backups.

In [None]:
# Option A: Use W&B offline (no account needed, logs saved locally)
!wandb offline

In [None]:
# Option B: Login to W&B for online tracking (enter API key when prompted)
# !wandb login

In [None]:
# Mount Google Drive for checkpoint backup
from google.colab import drive
drive.mount('/content/drive')
!mkdir -p /content/drive/MyDrive/quadruped_checkpoints
print('Google Drive mounted. Checkpoints will be backed up to: /content/drive/MyDrive/quadruped_checkpoints/')

## 3. Sanity Check: Zero-Action Playback

Watch the robot stand and fall under gravity with zero actions.
This verifies the MJCF model loads correctly.

In [None]:
import subprocess
import sys

process = subprocess.Popen(
  [
    "uv", "run",
    "--project", "/content/quadruped",
    "play",
    "Mjlab-Velocity-Flat-Freenove-Dog",
    "--agent", "zero",
  ],
  stdout=subprocess.PIPE,
  stderr=subprocess.STDOUT,
  universal_newlines=True,
  bufsize=1,
  cwd="/content/quadruped",
)

for line in process.stdout:
  print(line, end="")
  sys.stdout.flush()
  if "serving" in line.lower() or "running on" in line.lower() or "listening" in line.lower() or "8081" in line or "8082" in line:
    print("\n" + "=" * 50)
    print("Server is running! Execute the next cell to view.")
    print("=" * 50)
    break

In [None]:
from google.colab import output
output.serve_kernel_port_as_iframe(8082)

In [None]:
# Stop the playback server before training
process.terminate()
process.wait()
print("Playback stopped.")

## 4. Train the Policy

Train PPO on flat terrain. Key parameters:
- `--env.scene.num-envs`: number of parallel environments (higher = faster, more VRAM)
- `--agent.max-iterations`: number of PPO training iterations (8000 for full convergence)

**Expected time**: ~1-2 hours on A100, ~3-4 hours on T4.

Training runs **in the background** so you can use the live preview cells below.

**What to watch for** (healthy training):
- `mean_episode_length` climbing above 10 within the first 200 iterations
- `illegal_contact` terminations staying low (only the base body touching the ground counts)
- `mean_reward` steadily increasing, with `track_linear_velocity` becoming the dominant reward

> **Note**: If `mean_episode_length` stays below 5, the robot is still collapsing immediately.
> An earlier occurrence of this problem was fixed by narrowing illegal contact detection to `base_collision` only.
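
The thresholds above can be turned into a quick check. This is a minimal sketch: `diagnose` is a hypothetical helper, and it assumes you have already collected recent `mean_episode_length` values into a list. How you read them from `train.log` or W&B depends on the log format, which is not specified here.

```python
def diagnose(episode_lengths, collapse_below=5.0, healthy_above=10.0):
    """Classify a run from its most recent mean_episode_length value."""
    if not episode_lengths:
        return "no data yet"
    latest = episode_lengths[-1]
    if latest < collapse_below:
        return "collapsing"   # robot falls almost immediately
    if latest > healthy_above:
        return "healthy"      # matches the target for the first 200 iterations
    return "improving"        # between the two thresholds


print(diagnose([2.1, 3.0, 4.2]))
print(diagnose([4.0, 8.5, 12.3]))
```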

In [None]:
import subprocess, glob, os, time, shutil, threading

# --- Step 1: Sync venv ---
print('Syncing dependencies...')
subprocess.run(['uv', 'sync', '--project', '/content/quadruped'], check=True, capture_output=True)
print('Dependencies synced.')

# --- Step 2: Patch rsl_rl (clamp noise std >= 0.01) ---
for fp in glob.glob('/content/quadruped/.venv/**/rsl_rl/models/mlp_model.py', recursive=True):
    with open(fp) as f:
        code = f.read()
    if 'scale.clamp' in code:
        print(f'Already patched: {fp}')
        continue
    old = 'return self.distribution.sample()'
    # Reuse the indentation of the first line that contains the sample() call
    target = next((ln for ln in code.splitlines() if old in ln), None)
    if target is None:
        # Without this guard, `indent` would be undefined below
        print(f'Pattern not found, skipping: {fp}')
        continue
    indent = target[:len(target) - len(target.lstrip())]
    clamp_line = indent + 'self.distribution = __import__("torch").distributions.Normal(self.distribution.loc, self.distribution.scale.clamp(min=0.01))'
    code = code.replace(indent + old, clamp_line + '\n' + indent + old, 1)
    with open(fp, 'w') as f:
        f.write(code)
    print(f'Patched: {fp}')

# --- Step 3: Auto-backup to Google Drive every 5 minutes ---
def backup_checkpoints():
    # Same log directory that sections 6 and 7 read checkpoints from
    src = '/content/quadruped/logs/rsl_rl/freenove_dog_velocity/'
    dst = '/content/drive/MyDrive/quadruped_checkpoints/'
    while True:
        time.sleep(300)  # every 5 minutes
        if not os.path.exists(src): continue
        runs = sorted(glob.glob(os.path.join(src, '*')))
        if not runs: continue
        latest = runs[-1]
        dst_run = os.path.join(dst, os.path.basename(latest))
        os.makedirs(dst_run, exist_ok=True)
        pts = glob.glob(os.path.join(latest, '*.pt'))
        for pt in pts:
            shutil.copy2(pt, dst_run)
        print(f'[Backup] {len(pts)} checkpoints saved to Google Drive')

if os.path.exists('/content/drive/MyDrive'):
    os.makedirs('/content/drive/MyDrive/quadruped_checkpoints/', exist_ok=True)
    backup_thread = threading.Thread(target=backup_checkpoints, daemon=True)
    backup_thread.start()
    print('Auto-backup to Google Drive enabled (every 5 min)')
else:
    print('Google Drive not mounted - skipping auto-backup')

# --- Step 4: Start training ---
cmd = [
    'uv', 'run', '--no-sync', '--project', '/content/quadruped',
    'train',
    'Mjlab-Velocity-Flat-Freenove-Dog',
    '--env.scene.num-envs', '4096',
    '--agent.max-iterations', '8000',
]
print('Starting training...')
train_process = subprocess.Popen(
    cmd,
    stdout=open('/content/quadruped/train.log', 'w'),
    stderr=subprocess.STDOUT,
    cwd='/content/quadruped',
)
print(f'Training PID: {train_process.pid}')

# Wait for initial output
time.sleep(10)
if train_process.poll() is not None:
    # Exiting within 10 seconds almost always means a startup error
    print(f'Training exited early (return code: {train_process.returncode})')
    with open('/content/quadruped/train.log') as f:
        print(f.read()[-2000:])
else:
    print('Training running in background!')
    with open('/content/quadruped/train.log') as f:
        print(f.read()[-500:])


In [None]:
# Check training status and recent output (run anytime)
import subprocess

rc = train_process.poll()
if rc is None:
  print("Training is RUNNING...\n")
else:
  print(f"Training FINISHED (return code: {rc})\n")

# Show last 20 lines of output
!tail -20 /content/quadruped/train.log

## 5. Live Preview During Training

Watch the robot in the simulator **while training is still running**.

This launches `play` with `--wandb-run-path`, which automatically fetches
the latest checkpoint from W&B and replays it. Re-run these cells at any
time to see updated behavior as the policy improves.

**Requires**: W&B online mode (section 2, option B).
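
As a rough illustration of how the run path can be recovered, the sketch below pulls `entity/project/runs/id` out of a W&B run URL. It assumes the URL shape `https://wandb.ai/<entity>/<project>/runs/<id>`. `extract_run_path` is a hypothetical helper and `my-org` is a placeholder entity.

```python
import re

# Matches the entity/project/runs/id portion of a W&B run URL.
RUN_URL = re.compile(r"wandb\.ai/([^/\s]+/[^/\s]+/runs/[^/\s?#]+)")


def extract_run_path(log_text):
    """Return the first run path found in the text, or None if there is none."""
    match = RUN_URL.search(log_text)
    return match.group(1) if match else None


sample = "wandb: View run at https://wandb.ai/my-org/mjlab/runs/abc123"
print(extract_run_path(sample))
```

The character classes stop at whitespace, slashes, `?`, and `#`, so query strings and trailing text are not captured.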

In [None]:
import subprocess
import sys

# Set your W&B run path here (printed in training output, or check wandb.ai)
WANDB_RUN_PATH = "stfcodesyt-saberai/mjlab/"  # @param {type:"string"}

# If not set, try to find it from the training log
if WANDB_RUN_PATH == "stfcodesyt-saberai/mjlab/":
  import re
  try:
    log_text = open("/content/quadruped/train.log").read()
    # wandb prints the run path as "Run page: https://wandb.ai/org/project/runs/XXXX"
    match = re.search(r'wandb\.ai/(\S+/\S+/runs/\S+)', log_text)
    if match:
      WANDB_RUN_PATH = match.group(1)
      print(f"Auto-detected W&B run path: {WANDB_RUN_PATH}")
    else:
      print("Could not auto-detect run path from log.")
      print("Set WANDB_RUN_PATH manually above (e.g. 'stfcodesyt-saberai/mjlab/runs/abc123')")
  except FileNotFoundError:
    print("Training log not found. Start training first (section 4).")

if WANDB_RUN_PATH and WANDB_RUN_PATH != "stfcodesyt-saberai/mjlab/":
  # Kill any previous preview process
  try:
    preview_process.terminate()
    preview_process.wait(timeout=5)
  except Exception:  # also covers NameError when no preview was started
    pass

  preview_process = subprocess.Popen(
    [
      "uv", "run",
      "--project", "/content/quadruped",
      "play",
      "Mjlab-Velocity-Flat-Freenove-Dog",
      "--wandb-run-path", WANDB_RUN_PATH,
    ],
    stdout=subprocess.PIPE,
    stderr=subprocess.STDOUT,
    universal_newlines=True,
    bufsize=1,
    cwd="/content/quadruped",
  )

  for line in preview_process.stdout:
    print(line, end="")
    sys.stdout.flush()
    if "serving" in line.lower() or "running on" in line.lower() or "listening" in line.lower() or "8081" in line or "8082" in line:
      print("\n" + "=" * 50)
      print("Live preview ready! Run the next cell to view.")
      print("=" * 50)
      break

In [None]:
from google.colab import output
output.serve_kernel_port_as_iframe(8082)

In [None]:
# Stop live preview (run this before section 6)
try:
  preview_process.terminate()
  preview_process.wait(timeout=5)
  print("Live preview stopped.")
except Exception:  # also covers NameError when no preview was started
  print("No preview running.")

In [None]:
# Wait for training to finish (run this when you're done previewing)
if train_process.poll() is None:
  print("Waiting for training to finish...")
  train_process.wait()
print(f"Training finished with return code: {train_process.returncode}")

# Show final output
!tail -5 /content/quadruped/train.log

## 6. Play the Trained Policy (after training)

Visualize the final trained policy in the simulator.

In [None]:
import subprocess
import sys
import glob

# Find the latest checkpoint
log_dirs = sorted(glob.glob("/content/quadruped/logs/rsl_rl/freenove_dog_velocity/*/"))
if log_dirs:
  latest_run = log_dirs[-1]
  print(f"Latest run: {latest_run}")
else:
  print("No training runs found. Train first!")
  latest_run = None

if latest_run:
  process = subprocess.Popen(
    [
      "uv", "run",
      "--project", "/content/quadruped",
      "play",
      "Mjlab-Velocity-Flat-Freenove-Dog",
      "--log-dir", latest_run,
    ],
    stdout=subprocess.PIPE,
    stderr=subprocess.STDOUT,
    universal_newlines=True,
    bufsize=1,
    cwd="/content/quadruped",
  )

  for line in process.stdout:
    print(line, end="")
    sys.stdout.flush()
    if "serving" in line.lower() or "running on" in line.lower() or "listening" in line.lower() or "8081" in line or "8082" in line:
      print("\n" + "=" * 50)
      print("Server is running! Execute the next cell to view.")
      print("=" * 50)
      break

In [None]:
from google.colab import output
output.serve_kernel_port_as_iframe(8082)

## 7. Export Policy for Deployment

Save the trained checkpoint for deployment on the Raspberry Pi.
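
One pitfall when picking the latest checkpoint: names of the form `model_<iteration>.pt` do not sort correctly as plain strings, because `model_999.pt` sorts after `model_7999.pt`. The sketch below picks the highest iteration number instead. It assumes that naming pattern, and `latest_checkpoint` is a hypothetical helper.

```python
import re
from pathlib import Path


def latest_checkpoint(paths):
    """Return the path with the highest iteration number, or None if empty."""
    def iteration(path):
        match = re.search(r"model_(\d+)\.pt$", Path(path).name)
        # Files that do not match the pattern rank below every real checkpoint.
        return int(match.group(1)) if match else -1
    return max(paths, key=iteration, default=None)


names = ["model_999.pt", "model_7999.pt", "model_50.pt"]
print(sorted(names)[-1])         # model_999.pt (string order, wrong)
print(latest_checkpoint(names))  # model_7999.pt (numeric order)
```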

In [None]:
import glob
import os
import torch

# Find the latest checkpoint
log_dirs = sorted(glob.glob("/content/quadruped/logs/rsl_rl/freenove_dog_velocity/*/"))
latest_run = log_dirs[-1] if log_dirs else None

if latest_run:
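  # Find the model checkpoint. Sort by modification time, because names such as
  # model_999.pt and model_7999.pt do not sort correctly as strings.
  ckpt_files = sorted(glob.glob(os.path.join(latest_run, "model_*.pt")), key=os.path.getmtime)
  if not ckpt_files:
    ckpt_files = sorted(glob.glob(os.path.join(latest_run, "**/*.pt"), recursive=True), key=os.path.getmtime)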
  
  if ckpt_files:
    latest_ckpt = ckpt_files[-1]
    print(f"Latest checkpoint: {latest_ckpt}")
    
    # Load checkpoint
    checkpoint = torch.load(latest_ckpt, map_location="cpu", weights_only=False)
    print(f"Checkpoint keys: {list(checkpoint.keys())}")
    
    # Re-save the full checkpoint under a fixed name for deployment
    export_path = "/content/quadruped/deploy/policy_checkpoint.pt"
    os.makedirs(os.path.dirname(export_path), exist_ok=True)
    torch.save(checkpoint, export_path)
    print(f"\nCheckpoint saved to: {export_path}")
    print(f"File size: {os.path.getsize(export_path) / 1024:.1f} KB")
  else:
    print("No checkpoint files found!")
else:
  print("No training runs found!")

In [None]:
# Download the checkpoint and deploy script
from google.colab import files

# Download the policy checkpoint
files.download("/content/quadruped/deploy/policy_checkpoint.pt")

# Download the deployment script
files.download("/content/quadruped/deploy/deploy.py")

print("\nDownloaded! Copy these to your Raspberry Pi and run:")
print("  scp deploy.py policy_checkpoint.pt sxn@192.168.100.234:~/")
print("  ssh sxn@192.168.100.234 'python3 deploy.py'")