<a href="https://colab.research.google.com/github/baronase/ml-residual-pendulum/blob/master/notebooks/pendulum_residual_rl.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [7]:
!pip -q install "gymnasium[classic-control]" stable-baselines3 torch numpy matplotlib

In [8]:
# --- REPO SETUP --- Only run once
!git clone https://github.com/baronase/ml-residual-pendulum.git

fatal: destination path 'ml-residual-pendulum' already exists and is not an empty directory.


In [10]:
%cd ml-residual-pendulum
!git pull

[Errno 2] No such file or directory: 'ml-residual-pendulum'
/content/ml-residual-pendulum
remote: Enumerating objects: 10, done.[K
remote: Counting objects: 100% (10/10), done.[K
remote: Compressing objects: 100% (4/4), done.[K
remote: Total 6 (delta 1), reused 6 (delta 1), pack-reused 0 (from 0)[K
Unpacking objects: 100% (6/6), 1.17 KiB | 1.17 MiB/s, done.
From https://github.com/baronase/ml-residual-pendulum
   3c881f9..2ba64cf  master     -> origin/master
Updating 3c881f9..2ba64cf
Fast-forward
 pyproject.toml                           |  3 [32m+++[m
 src/pendulum_residual_rl/residual_env.py | 42 [32m++++++++++++++++++++++++++++++++[m
 2 files changed, 45 insertions(+)
 create mode 100644 src/pendulum_residual_rl/residual_env.py


In [4]:
from google.colab import drive
drive.mount("/content/drive")

PURE_SAC_MODEL_SAVE_PATH = "/content/drive/MyDrive/pendulum_residual_rl/models/sac_pure_pendulum"


Mounted at /content/drive


In [11]:
import sys
!{sys.executable} -m pip uninstall -y pendulum_residual_rl
!{sys.executable} -m pip install -e .


Found existing installation: pendulum_residual_rl 0.1.0
Uninstalling pendulum_residual_rl-0.1.0:
  Successfully uninstalled pendulum_residual_rl-0.1.0
Obtaining file:///content/ml-residual-pendulum
  Installing build dependencies ... [?25l[?25hdone
  Checking if build backend supports build_editable ... [?25l[?25hdone
  Getting requirements to build editable ... [?25l[?25hdone
  Preparing editable metadata (pyproject.toml) ... [?25l[?25hdone
Building wheels for collected packages: pendulum_residual_rl
  Building editable for pendulum_residual_rl (pyproject.toml) ... [?25l[?25hdone
  Created wheel for pendulum_residual_rl: filename=pendulum_residual_rl-0.1.0-0.editable-py3-none-any.whl size=1362 sha256=4ecc7faaf0efb1e1139387c1e6f42a95e50282740b47ebfb8fe8293270bb4ebd
  Stored in directory: /tmp/pip-ephem-wheel-cache-lbniofhq/wheels/60/f4/4f/d02c1d5f55036909f72c6126ce700a1c121f66bb9d9c603732
Successfully built pendulum_residual_rl
Installing collected packages: pendulum_residual

In [None]:
#should restart the session here
import os
os.kill(os.getpid(), 9)

In [1]:
# verifying import
import pendulum_residual_rl
print("ok", pendulum_residual_rl.__file__)

ok /content/ml-residual-pendulum/src/pendulum_residual_rl/__init__.py


In [2]:
import gymnasium as gym
import numpy as np

env = gym.make("Pendulum-v1")
obs, info = env.reset(seed=0)

print("obs:", obs)
print("obs shape:", obs.shape)
print("action space:", env.action_space)
print("obs space:", env.observation_space)

# take a few random steps
for i in range(5):
    action = env.action_space.sample()
    obs, reward, terminated, truncated, info = env.step(action)
    print(i, "action:", action, "reward:", reward)


obs: [ 0.6520163   0.758205   -0.46042657]
obs shape: (3,)
action space: Box(-2.0, 2.0, (1,), float32)
obs space: Box([-1. -1. -8.], [1. 1. 8.], (3,), float32)
0 action: [-1.1358683] reward: -0.763045506059638
1 action: [-0.91034794] reward: -0.7364321191171552
2 action: [0.97577316] reward: -0.7816729541931741
3 action: [0.76915765] reward: -0.9852585065005565
4 action: [1.6398454] reward: -1.3714675297078276


In [2]:
# Debugging ;
!pwd
!ls

/content
drive  ml-residual-pendulum  sample_data


## Code Files (additions to repo)

No need to run this again, just for reference

In [19]:
%cd ml-residual-pendulum/

/content/ml-residual-pendulum


In [None]:
%%writefile src/pendulum_residual_rl/controllers.py
from __future__ import annotations

from dataclasses import dataclass
import math
import numpy as np
#lalala

def obs_to_theta_theta_dot(obs: np.ndarray) -> tuple[float, float]:
    """Pendulum-v1 obs = [cos(theta), sin(theta), theta_dot], with theta in [-pi, pi]."""
    cos_t, sin_t, theta_dot = float(obs[0]), float(obs[1]), float(obs[2])
    theta = math.atan2(sin_t, cos_t)
    return theta, theta_dot


def wrap_to_pi(angle: float) -> float:
    """Wrap angle to [-pi, pi]."""
    return (angle + math.pi) % (2 * math.pi) - math.pi


@dataclass
class EnergyPDController:
    """
    Baseline controller: energy-shaping swing-up + PD stabilize near upright.

    Conventions:
      - Pendulum-v1 uses theta=0 as upright.
      - We use a normalized energy:
            E = 0.5 * theta_dot^2 + (1 + cos(theta))
        so:
            E* (upright, zero velocity) = 2.0
            E (hanging down at pi, zero velocity) = 0.0

    Action is torque u in [-max_torque, max_torque].
    """
    # PD gains (used near upright)
    kp: float
    kd: float

    # Energy shaping gain (used far from upright)
    ke: float

    # Switch/blend region (radians)
    theta_switch: float # 0.5 ~ around ~28.6 degrees

    # Residual scale not used here (for later); controller outputs full torque
    max_torque: float

    def __init__(self, kp_in: float = 10.0, kd_in: float = 1.0,
                 ke_in: float = 2.0, theta_switch_in: float = 0.5,
                 max_torque_in: float = 2.0, log_interval_in = 10) -> None:
        self.kp = kp_in
        self.kd = kd_in
        self.ke = ke_in
        self.theta_switch = theta_switch_in
        self.max_torque = max_torque_in
        # DEBUG
        self.step_for_print = 0
        self.log_interval = log_interval_in

        # pendulum
        self.pen_l = 1
        self.pen_m = 1
        self.pen_I = self.pen_m * (self.pen_l ** 2) / 3.0


    def energy(self, theta: float, theta_dot: float) -> float:
        # Ek = 0.5 * I * omega**2
        Ek = 0.5 * self.pen_I * theta_dot**2
        # Ep = m * g * l * (1.0 + math.cos(theta))   # Pendulum-v1 convention
        Ep = 1 * 10 * self.pen_l / 2 * (1.0 + math.cos(theta))
        return Ek + Ep

    # def energy(self, theta: float, theta_dot: float) -> float:  # <--- ORIGINAL
    #     return 0.5 * (theta_dot ** 2) + (1.0 + math.cos(theta))

    def u_pd(self, theta: float, theta_dot: float) -> float:
        # stabilize around theta=0
        theta = wrap_to_pi(theta)
        return -self.kp * theta - self.kd * theta_dot

    def u_energy(self, theta: float, theta_dot: float) -> float:
        # energy target: upright (theta=0, theta_dot=0) => E*=2
        e = self.energy(theta, theta_dot) - 10.0  # positive => too much energy
        # Pumping direction: "push with the swing" when energy is low, oppose when high
        # direction = math.copysign(1.0, theta_dot * math.cos(theta) + 1e-6)  # <-- ORIGINAL
        direction = math.copysign(1.0, theta_dot + 1e-6)
        return -self.ke * e * direction


    def blend_weight(self, theta: float) -> float:
        """
        Weight for PD vs energy:
          w=1 near upright, w=0 when |theta| >= theta_switch.
        """
        a = abs(wrap_to_pi(theta))
        if a >= self.theta_switch:
            return 0.0
        # smooth-ish ramp (quadratic)
        x = 1.0 - (a / self.theta_switch)
        return x * x

    def __call__(self, obs: np.ndarray) -> np.ndarray:
        # DEBUG
        self.step_for_print += 1

        theta, theta_dot = obs_to_theta_theta_dot(obs)
        u_e = self.u_energy(theta, theta_dot)
        u_p = self.u_pd(theta, theta_dot)

        if self.step_for_print % self.log_interval == 0:
          print(f"step: {self.step_for_print} - x:{obs[0]:.2f}, y:{obs[1]:.2f} | theta:{theta:.2f} theta_dot:{theta_dot:.2f}")


        w = self.blend_weight(theta)
        u = (1.0 - w) * u_e + w * u_p
        if self.step_for_print % self.log_interval == 0:
          print(f"step: {self.step_for_print} - u_e:{u_e:.2f}, u_p:{u_p:.2f} | w:{w} u:{u:.2f}")
        # clip to env action bounds
        u = float(np.clip(u, -self.max_torque, self.max_torque))
        return np.array([u], dtype=np.float32)


Overwriting src/pendulum_residual_rl/controllers.py


In [20]:
%%writefile src/pendulum_residual_rl/residual_env.py
import gymnasium as gym
import numpy as np

class ResidualWrapper(gym.Wrapper):
    """
    Wraps an env so that the agent outputs a residual action.
    Env receives: u_total = clip(u_base + alpha * u_res, action_low, action_high)
    """
    def __init__(self, env: gym.Env, baseline_fn, alpha: float = 0.3):
        super().__init__(env)
        self.baseline_fn = baseline_fn
        self.alpha = float(alpha)

        # Agent outputs residual in [-1, 1] (normalized)
        self.action_space = gym.spaces.Box(low=-1.0, high=1.0, shape=(1,), dtype=np.float32)

        # original env bounds
        self.low = float(env.action_space.low[0])
        self.high = float(env.action_space.high[0])

    def step(self, action):
        obs = self.env.unwrapped.state if False else None  # (unused, just a hint)
        # action is residual in [-1, 1]
        u_res = float(np.clip(action[0], -1.0, 1.0))

        # baseline action in env units
        u_base = self.baseline_fn(self._last_obs)
        u_base = float(u_base[0])

        u_total = np.clip(u_base + self.alpha * u_res * (self.high - self.low) / 2.0, self.low, self.high)
        self._last_obs, reward, term, trunc, info = self.env.step(np.array([u_total], dtype=np.float32))
        info["u_base"] = u_base
        info["u_res"] = u_res
        info["u_total"] = float(u_total)
        return self._last_obs, reward, term, trunc, info

    def reset(self, **kwargs):
        obs, info = self.env.reset(**kwargs)
        self._last_obs = obs
        return obs, info


Writing src/pendulum_residual_rl/residual_env.py


## Helper Functions

In [3]:
def print_plots(thetas, dots, us, rewards):
  plt.figure(figsize=(2, 2))
  plt.plot(thetas)
  plt.title("theta(t)")
  plt.xlabel("t")
  plt.ylabel("theta (rad)")
  # plt.figsize(2,2)
  plt.show()

  plt.figure(figsize=(2, 2))
  plt.plot(dots)
  plt.title("dot(t)")
  plt.xlabel("t")
  plt.ylabel("angular velocity")
  plt.show()


  plt.figure(figsize=(2, 2))
  plt.plot(us)
  plt.title("u(t)")
  plt.xlabel("t")
  plt.ylabel("torque")
  plt.show()

  plt.figure(figsize=(2, 2))
  plt.plot(rewards)
  plt.title("reward(t)")
  plt.xlabel("t")
  plt.ylabel("reward")
  plt.show()


In [None]:
# Debugging ;
def obs_to_theta_theta_dot(obs: np.ndarray) -> tuple[float, float]:
    """Pendulum-v1 obs = [cos(theta), sin(theta), theta_dot], with theta in [-pi, pi]."""
    cos_t, sin_t, theta_dot = float(obs[0]), float(obs[1]), float(obs[2])
    theta = math.atan2(sin_t, cos_t)
    return theta, theta_dot

class TestCtrl:
  # tests
  def __init__(self):
    self.hit_bot = False
    self.step_for_print = 0
    self.log_interval = 10
  def __call__(self, obs: np.ndarray):
    theta, theta_dot = obs_to_theta_theta_dot(obs)

    self.step_for_print += 1
    if self.step_for_print % self.log_interval == 0:
      print(f"step: {self.step_for_print} - x:{obs[0]}, y:{obs[1]} | theta:{theta} theta_dot:{theta_dot}")

    self.log_interval

    if (3.13 < theta or theta < -3.13) and -0.01 < theta_dot < 0.01:
      print(f"HIT BOT NOW: step:{self.step_for_print}")
      self.hit_bot = True

    if self.hit_bot:
      return np.array([2], dtype=np.float32)
    else:
      u = -np.sign(theta_dot) * 0.2
      return np.array([u], dtype=np.float32)

In [None]:
# Debugging ;
env = gym.make("Pendulum-v1")
p = env.unwrapped   # unwrap TimeLimit / other wrappers

print("m =", p.m)
print("l =", p.l)
print("g =", p.g)
print("dt =", p.dt)
print("max_torque =", p.max_torque)

In [4]:
import gymnasium as gym
import numpy as np
import math
import matplotlib.pyplot as plt

from pendulum_residual_rl.controllers import EnergyPDController, obs_to_theta_theta_dot

env = gym.make("Pendulum-v1")

if True:
  print("using baseline control !!")
  ctrl = EnergyPDController(log_interval_in=100000)
  print(f"kp:{ctrl.kp}, kd:{ctrl.kd}, ke:{ctrl.ke}, theta_switch:{ctrl.theta_switch}")
else:
  print("using test control !!")
  ctrl = TestCtrl()

def rollout(seed=0, steps=200):
    obs, _ = env.reset(seed=seed)
    thetas, dots, us, rewards = [], [], [], []
    for _ in range(steps):
        u = ctrl(obs)
        obs, r, term, trunc, _ = env.step(u)
        theta, theta_dot = obs_to_theta_theta_dot(obs)
        thetas.append(theta)
        dots.append(theta_dot)
        us.append(float(u[0]))
        rewards.append(r)
        if term or trunc:
            break
    return np.array(thetas), np.array(dots), np.array(us), np.array(rewards)

if True:
  print(f"running a few seeds")
  for i in (0,1,2,3,4,5):
    thetas, dots, us, rewards = rollout(seed=i, steps=200)
    print(f"episode {i} return:", rewards.sum())
else:
  _seed = 3
  print(f"doing a single test run, seed={_seed}")
  thetas, dots, us, rewards = rollout(seed=_seed, steps=200)
  print_plots(thetas, dots, us, rewards)
  print("episode return:", rewards.sum())


using baseline control !!
kp:10.0, kd:1.0, ke:5.0, theta_switch:0.3
running a few seeds
episode 0 return: -123.07517840332312
episode 1 return: -125.99658863321612
episode 2 return: -115.93665980187
episode 3 return: -233.36031988440666
episode 4 return: -240.00514565727724
episode 5 return: -113.97715523907242


In [5]:
# measure - eval over many episodes
import gymnasium as gym
import numpy as np
from pendulum_residual_rl.controllers import EnergyPDController

env = gym.make("Pendulum-v1")
ctrl = EnergyPDController(log_interval_in=1000000)

def eval_controller(n_episodes=50, seed=0, steps=200):
    returns = []
    for i in range(n_episodes):
        obs, _ = env.reset(seed=seed + i)
        total = 0.0
        for _ in range(steps):
            action = ctrl(obs)
            obs, r, term, trunc, _ = env.step(action)
            total += r
            if term or trunc:
                break
        returns.append(total)
    return float(np.mean(returns)), float(np.std(returns))

mean_r, std_r = eval_controller(n_episodes=50, seed=0)
print("Baseline mean return:", mean_r)
print("Baseline std:", std_r)

Baseline mean return: -162.2092957654325
Baseline std: 74.28399628048706


In [None]:
# Debugging ;
for i, th in enumerate(thetas[:40]):
  print(f"{i}:{th}")

In [None]:
# Debugging ;
for i, rw in enumerate(rewards[:70]):
  print(f"{i}:{rw}")

# Training Pure RL SAC Benchmark

In [5]:
# Eval function
import gymnasium as gym
import numpy as np

def eval_policy(env_id, policy_fn, n_episodes=50, seed=0, steps=200):
    env = gym.make(env_id)
    returns = []
    for i in range(n_episodes):
        obs, _ = env.reset(seed=seed + i)
        total = 0.0
        for _ in range(steps):
            action = policy_fn(obs)
            obs, r, term, trunc, _ = env.step(action)
            total += r
            if term or trunc:
                break
        returns.append(total)
    env.close()
    return float(np.mean(returns)), float(np.std(returns))


In [8]:
!pip -q install stable-baselines3

In [None]:
# Train Pure SAC
from stable_baselines3 import SAC
from stable_baselines3.common.env_util import make_vec_env

ENV_ID = "Pendulum-v1"

# vectorized env is best practice for SB3
train_env = make_vec_env(ENV_ID, n_envs=8, seed=0)

# a pretty standard SAC config
model_sac = SAC(
    policy="MlpPolicy",
    env=train_env,
    verbose=1,
    seed=0,
)

model_sac.learn(total_timesteps=200_000)
model_sac.save("sac_pure_pendulum")


In [7]:
# Save Trained model
model_sac.save(PURE_SAC_MODEL_SAVE_PATH)

In [2]:
PURE_SAC_MODEL_SAVE_PATH = "/content/drive/MyDrive/pendulum_residual_rl/models/sac_pure_pendulum"
ENV_ID = "Pendulum-v1"

In [3]:
# Load Trained model
from stable_baselines3 import SAC

model_sac_load = SAC.load(PURE_SAC_MODEL_SAVE_PATH)

Gym has been unmaintained since 2022 and does not support NumPy 2.0 amongst other critical functionality.
Please upgrade to Gymnasium, the maintained drop-in replacement of Gym, or contact the authors of your software and request that they upgrade.
See the migration guide at https://gymnasium.farama.org/introduction/migration_guide/ for additional information.
  return datetime.utcnow().replace(tzinfo=utc)


In [6]:
def sac_policy(obs):
    action, _ = model_sac_load.predict(obs, deterministic=True)
    return action

mean_sac, std_sac = eval_policy(ENV_ID, sac_policy, n_episodes=50, seed=0)
print(f"Pure SAC mean return:{mean_sac:.5}")
print(f"Pure SAC std:{std_sac:.5}")

Pure SAC mean return:-134.26
Pure SAC std:79.293


In [7]:
from pendulum_residual_rl.controllers import EnergyPDController

ctrl = EnergyPDController(log_interval_in=100000)
ctrl_mean, ctrl_std = eval_policy(ENV_ID, ctrl, n_episodes=50, seed=0)
print(f"CTRL mean return:{ctrl_mean:.5}")
print(f"CTRL std: {ctrl_std:.5}")

CTRL mean return:-162.21
CTRL std: 74.284


In [9]:
import gymnasium as gym
from stable_baselines3 import SAC
from stable_baselines3.common.env_util import make_vec_env

from pendulum_residual_rl.residual_env import ResidualWrapper
from pendulum_residual_rl.controllers import EnergyPDController

baseline_ctrl = EnergyPDController(log_interval_in=100000)

def make_residual_env():
    env = gym.make("Pendulum-v1")
    return ResidualWrapper(env, baseline_fn=baseline_ctrl, alpha=0.3)

train_env_res = make_vec_env(make_residual_env, n_envs=8, seed=0)

policy_kwargs = dict(net_arch=[64, 64])  # smaller than SB3 default

model_res = SAC(
    "MlpPolicy",
    train_env_res,
    verbose=1,
    seed=0,
    policy_kwargs=policy_kwargs,
)

model_res.learn(total_timesteps=100_000)
model_res.save("sac_residual_pendulum")


Using cpu device
---------------------------------
| rollout/           |          |
|    ep_len_mean     | 200      |
|    ep_rew_mean     | -398     |
| time/              |          |
|    episodes        | 4        |
|    fps             | 798      |
|    time_elapsed    | 2        |
|    total_timesteps | 1600     |
| train/             |          |
|    actor_loss      | 1.59     |
|    critic_loss     | 2.59     |
|    ent_coef        | 0.945    |
|    ent_coef_loss   | -0.0938  |
|    learning_rate   | 0.0003   |
|    n_updates       | 187      |
---------------------------------
---------------------------------
| rollout/           |          |
|    ep_len_mean     | 200      |
|    ep_rew_mean     | -398     |
| time/              |          |
|    episodes        | 8        |
|    fps             | 797      |
|    time_elapsed    | 2        |
|    total_timesteps | 1600     |
---------------------------------
---------------------------------
| rollout/           |         

In [13]:
# ENV_ID = "Pendulum-v1"

# baseline policy
from pendulum_residual_rl.controllers import EnergyPDController
baseline_ctrl = EnergyPDController(log_interval_in=100000)

def baseline_policy(obs):
    return baseline_ctrl(obs)

# # residual policy needs its wrapper env
# import gymnasium as gym
# from pendulum_residual_rl.residual_env import ResidualWrapper

def eval_residual(model, n_episodes=50, seed=0, steps=200):
    env = ResidualWrapper(gym.make(ENV_ID), baseline_fn=baseline_ctrl, alpha=0.3)
    returns = []
    for i in range(n_episodes):
        obs, _ = env.reset(seed=seed+i)
        total = 0.0
        for _ in range(steps):
            a_res, _ = model.predict(obs, deterministic=True)
            obs, r, term, trunc, _ = env.step(a_res)
            total += r
            if term or trunc:
                break
        returns.append(total)
    env.close()
    return float(np.mean(returns)), float(np.std(returns))

mean_base, std_base = eval_policy(ENV_ID, baseline_policy, n_episodes=50, seed=0)
mean_sac, std_sac = eval_policy(ENV_ID, lambda obs: model_sac_load.predict(obs, deterministic=True)[0], n_episodes=50, seed=0)
mean_res, std_res = eval_residual(model_res, n_episodes=50, seed=0)

print("Baseline:", mean_base, "+/-", std_base)
print("Pure SAC:", mean_sac, "+/-", std_sac)
print("Residual SAC:", mean_res, "+/-", std_res)


Baseline: -162.2092957654325 +/- 74.28399628048706
Pure SAC: -134.25982974604057 +/- 79.29318225426385
Residual SAC: -140.88147682075984 +/- 79.27135137017504


In [10]:
def res_policy(obs):
    action, _ = model_res.predict(obs, deterministic=True)
    return action

mean_res, std_res = eval_policy(ENV_ID, res_policy, n_episodes=50, seed=0)
print(f"Residual mean return:{mean_res:.5}")
print(f"Residual std:{std_res:.5}")

Residual mean return:-1120.0
Residual std:515.89
