# Quadruped Locomotion: Reward Ablation

This notebook is for **reward ablation experiments** and is meant to be run after `quadruped_locomotion.ipynb`.

Previous notebook:
[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/DrcdKAIST/RL_DEMO/blob/main/notebooks/0.quadruped_locomotion_basic.ipynb)

1. The simplest case, with only the Task Reward (Velocity Tracking) and the Termination Reward,
2. Setting 1 plus basic Regularization Rewards (action smoothness, joint velocity/acceleration, ...),
3. Setting 2 plus Motion and Gait Regularization Rewards.

We switch between these three settings to see how each group of rewards affects the learned behavior.
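As a rough illustration of what "adding a reward" means across the three settings, the sketch below treats the total reward as a weighted sum of named terms, with each setting enabling more terms. The term names, values, and weights are made up for illustration and are not taken from the repository's `envs.yaml`.

```python
from typing import Dict


def total_reward(terms: Dict[str, float], weights: Dict[str, float]) -> float:
    """Weighted sum of reward terms; a term without a weight contributes nothing."""
    return sum(weights.get(name, 0.0) * value for name, value in terms.items())


# Raw per-step values of each term (hypothetical numbers).
terms = {
    "velocity_tracking": 0.8,
    "termination": 0.0,
    "action_smoothness": 0.5,
    "gait": 0.25,
}

# Each setting extends the previous one with additional weighted terms (hypothetical weights).
setting_1 = {"velocity_tracking": 1.0, "termination": -10.0}
setting_2 = {**setting_1, "action_smoothness": -0.1}
setting_3 = {**setting_2, "gait": 0.5}

for name, weights in [("setting 1", setting_1), ("setting 2", setting_2), ("setting 3", setting_3)]:
    print(name, round(total_reward(terms, weights), 4))
```

An ablation then amounts to training with one of these weight sets at a time and comparing the resulting policies.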

The cells below repeat the common setup.


---

## 0. Environment Setup

On Colab, `/content` is used as the base path; otherwise the current working directory is used.


In [14]:
# Clone repository
import os, sys

import yaml

# Detect Colab by availability of /content or google.colab.
try:
    import google.colab  # noqa: F401
    in_colab = True
except Exception:
    in_colab = os.path.isdir("/content")

try:
    base_dir
except NameError:
    base_dir = "/content" if in_colab else os.getcwd()
os.chdir(base_dir)

repo_dir = os.path.join(base_dir, "RL_DEMO")

print(f"Base directory: {base_dir}")
print(f"Repo directory: {repo_dir}")

if not os.path.isdir(repo_dir):
  !git clone https://github.com/DrcdKAIST/RL_DEMO.git --recursive
else:
  print("Cloned Directory already exists")

os.chdir(repo_dir)
print("Current Directory: ", os.getcwd())

sys.path.insert(0, repo_dir)
os.environ["MUJOCO_GL"] = "egl"


Base directory: /home/jaehyun/etc_ws/RL_DEMO/notebooks
Repo directory: /home/jaehyun/etc_ws/RL_DEMO/notebooks/RL_DEMO
Cloned Directory already exists
Current Directory:  /home/jaehyun/etc_ws/RL_DEMO/notebooks/RL_DEMO


In [15]:
# Install dependencies
os.chdir(repo_dir + "/thirdParty/stable_baselines3")
!pip install -e .[extra]
!pip install torch numpy tensorboard gymnasium==0.29.1 mujoco==3.1.5 imageio[ffmpeg] pygments


Obtaining file:///home/jaehyun/etc_ws/RL_DEMO/notebooks/RL_DEMO/thirdParty/stable_baselines3
  Installing build dependencies ... done
  Checking if build backend supports build_editable ... done
  Getting requirements to build editable ... done
  Preparing editable metadata (pyproject.toml) ... done
Building wheels for collected packages: stable_baselines3
  Building editable for stable_baselines3 (pyproject.toml) ... done
  Created wheel for stable_baselines3: filename=stable_baselines3-2.3.0-0.editable-py3-none-any.whl size=6381 sha256=2106b8e4de6973904452953da0d796a39fe7008437c9d30316bbc7569db099c5
  Stored in directory: /tmp/pip-ephem-wheel-cache-gk3g9z79/wheels/c6/fc/56/26a54a9f6b94b4cf616c2dedeafe4df25d4be42386578fecd2
Successfully built stable_baselines3
Installing collected packages: stable_baselines3
  Attempting uninstall: stable_baselines3
    Found existing installation: stable_baselines3 2.3.0
    Can't uni

In [16]:
from pathlib import Path
from IPython.display import HTML
from pygments import highlight
from pygments.lexers import PythonLexer
from pygments.formatters import HtmlFormatter
import inspect

def _render_code(code, title="code", max_height=400, bg="transparent", indent=16):
    style_name = "native" if in_colab else "friendly"
    formatter = HtmlFormatter(style=style_name, noclasses=True, linenos="inline")
    html = highlight(code, PythonLexer(), formatter)
    css = """
    <style>
    .highlight pre { margin: 0; text-align: left; }
    </style>
    """
    return HTML(f"""
    {css}
    <details>
      <summary>{title}</summary>
      <div style="margin-top:8px; margin-left:{indent}px; max-height:{max_height}px; overflow:auto; border:1px solid #ddd; padding:10px; background:{bg};">
        {html}
      </div>
    </details>
    """)


def show_code(path, max_height=400, title="code", bg="transparent"):
    code = Path(path).read_text()
    return _render_code(code, title=title, max_height=max_height, bg=bg)

def show_func(obj, max_height=400, title="code", bg="transparent"):
    code = inspect.getsource(obj)
    return _render_code(code, title=title, max_height=max_height, bg=bg)

---

## 1-1. Reward Ablation Setup - Observations

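Experiments 1 and 2 do not need the phase observation, so it is excluded there; experiment 3 requires it. We therefore prepare two versions of the `_get_obs` function and select the one that matches each experiment.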

<div align="center">
  <img src="rsc/phase.png">
</div>
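The observation code in this notebook reads the phase from `self._phase_sin`, but how the environment computes it is not shown here. The sketch below is one common way to build such a signal, assuming a periodic gait clock: the helper name `phase_sin` and the 0.5 s gait period are hypothetical, while the 50 Hz control rate matches the rate noted in the evaluation code.

```python
import math


def phase_sin(step: int, control_hz: float = 50.0, gait_period_s: float = 0.5) -> float:
    """Sine of the gait phase at a given control step (hypothetical helper)."""
    t = step / control_hz                          # elapsed time in seconds
    phase = (t % gait_period_s) / gait_period_s    # normalized phase in [0, 1)
    return math.sin(2.0 * math.pi * phase)


# The signal repeats every gait period (25 control steps under these assumptions).
for step in (0, 5, 10, 25):
    print(step, round(phase_sin(step), 3))
```

A policy that observes this clock can synchronize its leg motion to it, which is why only the gait-regularized experiment needs the extra input.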


In [17]:
# Define alternate observation functions and monkey-patch (no file edits)
# NOTE: Edit the two functions below to run ablations.
import importlib

import numpy as np  # used by the observation functions below
import src.go1_mujoco_env as go1_mujoco_env
importlib.reload(go1_mujoco_env)

Go1MujocoEnv = go1_mujoco_env.Go1MujocoEnv

# ---- Define variants ----

def _get_obs_with_phase(self):
    # TODO: update observation for ablation (with phase)
    dofs_position = self.data.qpos[7:].flatten() - self.model.key_qpos[0, 7:]
    velocity = self.data.qvel.flatten()
    base_linear_velocity = velocity[:3]
    base_angular_velocity = velocity[3:6]
    dofs_velocity = velocity[6:]
    desired_vel = self._desired_velocity
    last_action = self._last_action
    projected_gravity = self.projected_gravity

    curr_obs = np.concatenate(
        (
            base_linear_velocity * self._obs_scale["linear_velocity"],
            base_angular_velocity * self._obs_scale["angular_velocity"],
            projected_gravity,
            desired_vel * self._obs_scale["linear_velocity"],
            dofs_position * self._obs_scale["dofs_position"],
            dofs_velocity * self._obs_scale["dofs_velocity"],
            last_action,
            self._phase_sin,
        )
    ).clip(-self._clip_obs_threshold, self._clip_obs_threshold)

    return curr_obs


def _get_obs_without_phase(self):
    # TODO: update observation for ablation (without phase)
    dofs_position = self.data.qpos[7:].flatten() - self.model.key_qpos[0, 7:]
    velocity = self.data.qvel.flatten()
    base_linear_velocity = velocity[:3]
    base_angular_velocity = velocity[3:6]
    dofs_velocity = velocity[6:]
    desired_vel = self._desired_velocity
    last_action = self._last_action
    projected_gravity = self.projected_gravity

    curr_obs = np.concatenate(
        (
            base_linear_velocity * self._obs_scale["linear_velocity"],
            base_angular_velocity * self._obs_scale["angular_velocity"],
            projected_gravity,
            desired_vel * self._obs_scale["linear_velocity"],
            dofs_position * self._obs_scale["dofs_position"],
            dofs_velocity * self._obs_scale["dofs_velocity"],
            last_action,
        )
    ).clip(-self._clip_obs_threshold, self._clip_obs_threshold)

    return curr_obs



---

## 1-2. Reward Ablation Setup

Declare the functions and variables needed for training.


In [18]:
# Check observation / action space

import importlib
import numpy as np
import os
import gc
import time
import imageio
import torch
from stable_baselines3 import PPO
from stable_baselines3.common.callbacks import EvalCallback, CheckpointCallback, CallbackList
from stable_baselines3.common.env_util import make_vec_env
from stable_baselines3.common.vec_env import SubprocVecEnv
from IPython.display import Video, display
from pathlib import Path
import shutil

import src.go1_mujoco_env as go1_env

from src.utils.reward_logging_callback import RewardLoggingCallback
importlib.reload(go1_env)


DEFAULT_CAMERA_CONFIG = {
    "azimuth": 90.0,
    "distance": 3.0,
    "elevation": -25.0,
    "lookat": np.array([0., 0., 0.]),
    "fixedcamid": 0,
    "trackbodyid": -1,
    "type": 2,
}

policy_cfg_path = Path(repo_dir + "/src/params.yaml")
with policy_cfg_path.open("r", encoding="utf-8") as f:
    policy_cfg = yaml.safe_load(f)

# Settings for running on Colab
policy_cfg['n_envs'] = 12
policy_cfg['policy']['batch_size'] = 64

# Create environment (no rendering)
env = go1_env.Go1MujocoEnv(
    prj_path=repo_dir,
    render_mode=None,
)

obs, info = env.reset()

print(policy_cfg['n_envs'])
print(policy_cfg['policy']['batch_size'])
print(f"Observation shape: {np.array(obs).shape}\n")
print(f"Action space: {env.action_space}\n")
print(f"Observation space: {env.observation_space}\n")

def train_run(env_cfg_path=None):
    importlib.reload(go1_env)

    USE_PRETRAINED = False
    PRETRAINED_MODEL_PATH = f"{repo_dir}/models/pretrained3/final_model.zip"

    # Train
    MODEL_DIR = f"{repo_dir}/models"
    LOG_DIR = f"{repo_dir}/logs"

    os.makedirs(MODEL_DIR, exist_ok=True)
    os.makedirs(LOG_DIR, exist_ok=True)

    class Go1MujocoEnvPatched(go1_env.Go1MujocoEnv):
        def _get_obs(self):
            return (_get_obs_with_phase if use_phase else _get_obs_without_phase)(self)

    print("Use phase: ", use_phase)

    vec_env = make_vec_env(
        Go1MujocoEnvPatched,
        env_kwargs={"prj_path": repo_dir, "cfg_path": env_cfg_path},
        n_envs=policy_cfg["n_envs"],
        seed=policy_cfg["seed"],
        vec_env_cls=SubprocVecEnv,
    )


    train_time = time.strftime("%Y-%m-%d_%H-%M-%S")
    run_name = f"{train_time}"

    model_path = f"{MODEL_DIR}/{run_name}"
    print(
        f"Training on {policy_cfg['n_envs']} parallel training environments and saving models to '{model_path}'"
    )

    # Save a copy of the env config next to the model (model_path is already set above)
    if env_cfg_path is None:
        envs_src = Path(repo_dir) / "src" / "envs.yaml"
    else:
        envs_src = Path(env_cfg_path)
    envs_dst = Path(model_path) / "envs.yaml"
    envs_dst.parent.mkdir(parents=True, exist_ok=True)
    shutil.copy2(envs_src, envs_dst)

    print("Copied to:", envs_dst)


    checkpoint_callback = CheckpointCallback(
        save_freq=policy_cfg["policy"]["n_steps"] * policy_cfg["log"]["interval"],  # e.g. 100_000
        save_path=model_path,  # directory
        name_prefix="model",  # e.g. model_100000_steps.zip
        save_replay_buffer=False,
        save_vecnormalize=False,
    )

    eval_callback = EvalCallback(
        vec_env,
        best_model_save_path=model_path,
        log_path=LOG_DIR,
        eval_freq=policy_cfg["eval_freq"],
        n_eval_episodes=5,
        deterministic=True,
        render=False,
    )

    reward_logging_callback = RewardLoggingCallback()

    callbacks = CallbackList([
        eval_callback,
        checkpoint_callback,
        reward_logging_callback,
    ])

    if USE_PRETRAINED:
        print(f"Using Pretrained model from {PRETRAINED_MODEL_PATH}")
        model = PPO.load(path=PRETRAINED_MODEL_PATH, env=vec_env,
                    learning_rate=policy_cfg["policy"]["learning_rate"],
                    n_steps=policy_cfg["policy"]["n_steps"],
                    batch_size=policy_cfg["policy"]["batch_size"],
                    n_epochs=policy_cfg["policy"]["n_epochs"],
                    gamma=policy_cfg["policy"]["gamma"],
                    gae_lambda=policy_cfg["policy"]["gae_lambda"],
                    clip_range=policy_cfg["policy"]["clip_range"],
                    normalize_advantage=policy_cfg["policy"]["normalize_advantage"],
                    ent_coef=policy_cfg["policy"]["ent_coef"],
                    vf_coef=policy_cfg["policy"]["vf_coef"],
                    max_grad_norm=policy_cfg["policy"]["max_grad_norm"],
                    verbose=1,
                    tensorboard_log=LOG_DIR)
        model.tensorboard_log = LOG_DIR
    else:
        print("Training network model from scratch")
        model = PPO("MlpPolicy",
                    env=vec_env,
                    learning_rate=policy_cfg["policy"]["learning_rate"],
                    n_steps=policy_cfg["policy"]["n_steps"],
                    batch_size=policy_cfg["policy"]["batch_size"],
                    n_epochs=policy_cfg["policy"]["n_epochs"],
                    gamma=policy_cfg["policy"]["gamma"],
                    gae_lambda=policy_cfg["policy"]["gae_lambda"],
                    clip_range=policy_cfg["policy"]["clip_range"],
                    normalize_advantage=policy_cfg["policy"]["normalize_advantage"],
                    ent_coef=policy_cfg["policy"]["ent_coef"],
                    vf_coef=policy_cfg["policy"]["vf_coef"],
                    max_grad_norm=policy_cfg["policy"]["max_grad_norm"],
                    verbose=1,
                    tensorboard_log=LOG_DIR)

    model.learn(
        total_timesteps=policy_cfg["total_timestep"],
        reset_num_timesteps=True,
        progress_bar=True,
        tb_log_name=run_name,
        callback=callbacks,
    )
    # Save final model
    model.save(f"{model_path}/final_model")

    vec_env.close()

    del model
    del eval_callback
    del vec_env

    gc.collect()

def eval_run():
    # Test
    import time
    from tqdm.auto import tqdm

    ep_len = 0
    importlib.reload(go1_env)
    model_path = f"{repo_dir}/models/{model_name}/final_model.zip"
    print(f"Loading model from {model_path}")
    WIDTH, HEIGHT = 320, 240

    # Set a fixed command for testing [vx (m/s), vy (m/s), wz (rad/s)]
    given_command = [0.8, 0.0, 0.0]

    go1_env.Go1MujocoEnv._get_obs = _get_obs_with_phase if use_phase else _get_obs_without_phase

    env = go1_env.Go1MujocoEnv(
        prj_path=f"{repo_dir}",
        cfg_path=f"{repo_dir}/models/{model_name}/envs.yaml",
        given_command=given_command,  # Use fixed command instead of random
        render_mode="rgb_array",
        camera_name="tracking",
        width=WIDTH,
        height=HEIGHT,
    )



    env._reset_noise_scale = 0.05 # reduce initial random noise

    inter_frame_sleep = 0.0

    model = PPO.load(path=model_path, env=env, verbose=1)

    video_path = f"{repo_dir}/../rollout_{model_name}.mp4"

    obs, _ = env.reset()
    max_time_step_s = policy_cfg["test"]["max_time_step_s"]
    ep_len = 0
    t_render = 0.0
    n_render = 0
    last_render = 0.0
    start = time.perf_counter()
    video_fps = 10

    # Ctrl Hz: 50
    render_interval = 50 // video_fps
    max_steps = int(max_time_step_s * 50)

    # Frames are collected here and written once with imageio.mimwrite after the rollout.
    frames = []
    pbar = tqdm(total=max_steps, desc="rollout", unit="step", dynamic_ncols=True)

    print("max time:", max_time_step_s)
    print("max step: ", max_steps)

    while ep_len < max_steps:
      with torch.no_grad():
        action, _ = model.predict(obs, deterministic=True)
      obs, reward, terminated, truncated, info = env.step(action)

      if ep_len % render_interval == 0:
        t0 = time.perf_counter()
        frame = env.render()
        frames.append(frame)
        current_dur = time.perf_counter() - t0
        last_render = current_dur  # shown as r_last(s) in the status bar
        t_render += current_dur
        n_render += 1

      # ---- update status bar ----
      elapsed = time.perf_counter() - start
      steps_per_sec = ep_len / max(elapsed, 1e-9)
      avg_render = (t_render / n_render) if n_render else 0.0

      pbar.set_postfix({
          "steps/s": f"{steps_per_sec:6.1f}",
          "renders": n_render,
          "r_last(s)": f"{last_render:5.3f}",
          "r_avg(s)": f"{avg_render:5.3f}",
      })
      pbar.update(1)


      ep_len += 1

    imageio.mimwrite(
        video_path,
        frames,
        fps=video_fps,
        codec="libx264",
        quality=8,
        pixelformat="yuv420p",
    )

    env.close()

    print("avg render sec:", t_render / max(n_render, 1))
    print("Saved video to:", video_path)

    return video_path


12
64
Observation shape: (48,)

Action space: Box([-0.863     -1.3859999 -1.218     -0.863     -1.3859999 -1.218
 -0.863     -1.586     -1.218     -0.863     -1.586     -1.218    ], [0.863     3.8009999 0.712     0.863     3.8009999 0.712     0.863
 3.6009998 0.712     0.863     3.6009998 0.712    ], (12,), float32)

Observation space: Box(-inf, inf, (48,), float32)
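The 48-dimensional observation printed above is consistent with the layout used in `_get_obs_without_phase`, assuming 12 actuated joints (matching the `(12,)` action space) and a 3-dimensional velocity command `[vx, vy, wz]`. The short check below adds up the component sizes.

```python
NUM_JOINTS = 12  # matches the (12,) action space printed above

# (name, size) for each block concatenated in _get_obs_without_phase
layout = [
    ("base_linear_velocity", 3),
    ("base_angular_velocity", 3),
    ("projected_gravity", 3),
    ("desired_velocity", 3),
    ("dofs_position", NUM_JOINTS),
    ("dofs_velocity", NUM_JOINTS),
    ("last_action", NUM_JOINTS),
]

obs_dim = sum(size for _, size in layout)
print(obs_dim)  # 48
```

With the phase variant, the observation grows by the size of `self._phase_sin`, which is not shown in this notebook.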



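## 1-3. Experiment Selection

Run the cell for the experiment you want to perform.

Adjust the rewards and reward coefficients to fit each experiment. The YAML settings file used for each run has been placed in advance inside its checkpoint directory; it is loaded and used in place of `src/envs.yaml`.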

To add or change a reward function, edit `src/mdp/reward.py`, `_get_reward()` in `go1_mujoco_env.py`, and `src/envs.yaml`.
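As a minimal sketch of what a new reward term might look like, the function below penalizes the squared change in action between consecutive control steps. The function name, its signature, and the coefficient are hypothetical; the actual interface expected by `src/mdp/reward.py` and `_get_reward()` should be checked in the repository.

```python
from typing import Sequence


def action_smoothness_penalty(action: Sequence[float], last_action: Sequence[float]) -> float:
    """Sum of squared differences between the current and previous action (hypothetical term)."""
    return float(sum((a - b) ** 2 for a, b in zip(action, last_action)))


WEIGHT = -0.01  # hypothetical coefficient; in the repo this would live in envs.yaml

prev = [0.0, 0.1, -0.2]
curr = [0.1, 0.1, 0.0]
penalty = action_smoothness_penalty(curr, prev)
print(WEIGHT * penalty)
```

A negative weight turns the term into a penalty, so larger jumps between consecutive actions lower the total reward.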

### Experiment 1: Only the Task Reward (Velocity Tracking) and the Termination Reward (the simplest case)

In [19]:
# Experiment 1
use_phase = False
model_name = "pretrained"
env_cfg_path = f"{repo_dir}/models/pretrained/envs.yaml"

show_code(env_cfg_path, title= model_name + " yaml", max_height=800)


### Experiment 1-1: Experiment 1 plus a Torque Regularization Reward

In [None]:
# Experiment 1-1
use_phase = False
model_name = "pretrained_0.5"
env_cfg_path = f"{repo_dir}/models/pretrained_0.5/envs.yaml"

show_code(env_cfg_path, title= model_name + " yaml", max_height=800)


### Experiment 1-2: Experiment 1-1 plus Early Termination on Calf Contact

In [None]:
# Experiment 1-2
use_phase = False
model_name = "pretrained_0.5_earlytermination"
env_cfg_path = f"{repo_dir}/models/pretrained_0.5_earlytermination/envs.yaml"

show_code(env_cfg_path, title= model_name + " yaml", max_height=800)


### Experiment 2: Experiment 1-2 plus basic Regularization Rewards

In [None]:
use_phase = False
model_name = "pretrained2"
env_cfg_path = f"{repo_dir}/models/pretrained2/envs.yaml"

show_code(env_cfg_path, title= model_name + " yaml", max_height=800)

### Experiment 3: Experiment 2 plus Motion and Gait Regularization Rewards

In [None]:
use_phase = True
model_name = "pretrained3"
env_cfg_path = f"{repo_dir}/models/pretrained3/envs.yaml"

show_code(env_cfg_path, title= model_name + " yaml", max_height=800)
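
To see exactly what changes between two experiment settings, the loaded configurations can be compared key by key. The helper below is a sketch that works on nested dictionaries; `diff_config` and the example keys are hypothetical and do not reflect the real structure of `envs.yaml`.

```python
from typing import Any, Dict, Tuple


def diff_config(a: Dict[str, Any], b: Dict[str, Any], prefix: str = "") -> Dict[str, Tuple[Any, Any]]:
    """Return {dotted_key: (value_in_a, value_in_b)} for every leaf that differs."""
    changes: Dict[str, Tuple[Any, Any]] = {}
    for key in sorted(set(a) | set(b)):
        path = f"{prefix}{key}"
        va, vb = a.get(key), b.get(key)  # a missing key is reported as None
        if isinstance(va, dict) and isinstance(vb, dict):
            changes.update(diff_config(va, vb, prefix=f"{path}."))
        elif va != vb:
            changes[path] = (va, vb)
    return changes


# Hypothetical configs standing in for two envs.yaml files.
cfg_a = {"reward": {"tracking": 1.0}, "termination": {"calf_contact": False}}
cfg_b = {"reward": {"tracking": 1.0, "torque": -0.0002}, "termination": {"calf_contact": True}}

for key, (old, new) in diff_config(cfg_a, cfg_b).items():
    print(f"{key}: {old} -> {new}")
```

In the notebook, the two dictionaries would come from `yaml.safe_load` applied to the `envs.yaml` files of the models being compared.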

---
## 2. Training


In [None]:
train_run(env_cfg_path=env_cfg_path)

---
## 3. Evaluation

Check the trained policy.

During the hands-on session, the pretrained policies below are used to inspect the training results.

In [20]:
video_path = eval_run()

Loading model from /home/jaehyun/etc_ws/RL_DEMO/notebooks/RL_DEMO/models/pretrained/final_model.zip
Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.


rollout:   0%|          | 0/200 [00:00<?, ?step/s]

max time: 4.0
max step:  200
avg render sec: 0.0031242349039530383
Saved video to: /home/jaehyun/etc_ws/RL_DEMO/notebooks/RL_DEMO/../rollout_pretrained.mp4
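
The step count in the log above follows from the constants in `eval_run`: a 50 Hz control rate, a 10 fps video, and a 4.0 s rollout. The arithmetic below reproduces it and derives the number of rendered frames, assuming a frame is rendered whenever the step index is a multiple of the render interval, as in the rollout loop.

```python
CONTROL_HZ = 50   # control rate noted in eval_run
VIDEO_FPS = 10    # video_fps in eval_run
MAX_TIME_S = 4.0  # "max time" printed in the log above

render_interval = CONTROL_HZ // VIDEO_FPS              # render every 5th control step
max_steps = int(MAX_TIME_S * CONTROL_HZ)               # 200 control steps
n_frames = len(range(0, max_steps, render_interval))   # steps 0, 5, ..., 195
video_seconds = n_frames / VIDEO_FPS

print(render_interval, max_steps, n_frames, video_seconds)  # 5 200 40 4.0
```

Because one frame is kept per `render_interval` steps and played back at `VIDEO_FPS`, the video runs at real-time speed.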


In [21]:
display(
    Video(
        video_path,
        embed=True,
        html_attributes="controls autoplay loop"
    )
)

---

## 4. Comparing Results

After running Training and Evaluation for each experiment, compare the results.

During the hands-on session, only Evaluation is run with the pretrained models before comparing the results.


In [None]:
import base64
from IPython.display import HTML
from pathlib import Path

def video_embed(path, caption, width=320, autoplay=True):
    data = Path(path).read_bytes()
    b64 = base64.b64encode(data).decode("ascii")
    attrs = "controls"
    if autoplay:
        attrs += " autoplay muted loop"
    return f"""
    <figure style="margin:0; text-align:center;">
      <video {attrs} width="{width}">
        <source src="data:video/mp4;base64,{b64}" type="video/mp4">
      </video>
      <figcaption style="margin-top:6px; font-size:12px;">{caption}</figcaption>
    </figure>
    """

videos = [
    (f"{repo_dir}/../rollout_pretrained.mp4",  "Pretrained 1: (Task + Termination)"),
    (f"{repo_dir}/../rollout_pretrained_0.5.mp4",  "Pretrained 1-1: (Task + Termination + Torque Reg)"),
    (f"{repo_dir}/../rollout_pretrained_0.5_earlytermination.mp4",  "Pretrained 1-2: Pre 1-1 + (Calf Contact Early Termination)"),
    (f"{repo_dir}/../rollout_pretrained2.mp4", "Pretrained 2: (Pre 1-2 + Basic Regularization)"),
    (f"{repo_dir}/../rollout_pretrained3.mp4", "Pretrained 3: (Pre 2 + Motion / Gait Regularization)"),
]

HTML(f"""
<div style="display:flex; gap:12px;">
  {''.join([video_embed(p, cap, width=320) for p, cap in videos])}
</div>
""")