# Learning to balance

In [1]:
import gymnasium as gym
from gymnasium.wrappers import RecordVideo, TimeLimit
from stable_baselines3 import PPO
import numpy as np
import os
from pathlib import Path
import control


In [2]:
# The notebook may be started from the project root or from a subfolder
# (e.g. by VS Code). The root is recognised by its "notebooks" directory;
# when that directory is not found here, step one level up.
current = Path.cwd()
PROJECT_ROOT = current if (current / "notebooks").exists() else current.parent

# Work from the project root so the relative paths used below resolve.
os.chdir(PROJECT_ROOT)

# Folder holding the recorded datasets; created on first run.
DATA_DIR = Path("data")
DATA_DIR.mkdir(exist_ok=True)

## Set up the environment

In [3]:
# Maximum number of steps per episode; passed to gym.make as max_episode_steps.
N_STEPS = 1500 # 30 seconds

In [4]:
# CartPole environment without a render mode, capped at N_STEPS steps per episode.
env = gym.make("CartPole-v1", max_episode_steps=N_STEPS)

In [5]:
# Rendering CartPole environment wrapped in a video recorder. The trigger
# returns True for every episode, so each episode run through env_gui is
# recorded into the "videos" folder.
env_gui = RecordVideo(
    gym.make("CartPole-v1", render_mode="rgb_array", max_episode_steps=N_STEPS),
    video_folder="videos",
    name_prefix="balance_demo_test",
    episode_trigger=lambda ep: True,
)

  logger.warn(


## Helpers

In [6]:
def rollout_episode(env, model, max_steps=1500, deterministic=True):
    """Run one episode of ``model`` in ``env`` and record the trajectory.

    Parameters
    ----------
    env : environment exposing ``reset()`` and ``step(action)``; ``reset``
        returns ``(obs, info)`` and ``step`` returns a 5-tuple
        ``(obs, reward, terminated, truncated, info)``.
    model : object exposing ``predict(obs, deterministic=...)`` that returns
        ``(action, extra)``.
    max_steps : upper bound on the number of steps taken.
    deterministic : forwarded unchanged to ``model.predict``.

    Returns
    -------
    (states, actions, rewards) as NumPy arrays with one entry per step
    taken. ``states[k]`` is a copy of the observation the k-th action was
    computed from, so the final observation is not included.
    """
    trajectory = []  # one (state, action, reward) record per step

    obs, _info = env.reset()
    for _step in range(max_steps):
        # Copy before stepping so later changes to obs cannot alias the log.
        state = obs.copy()
        action, _extra = model.predict(obs, deterministic=deterministic)
        obs, reward, terminated, truncated, _info = env.step(action)
        trajectory.append((state, action, reward))

        if terminated or truncated:
            break

    states = np.array([record[0] for record in trajectory])
    actions = np.array([record[1] for record in trajectory])
    rewards = np.array([record[2] for record in trajectory])
    return states, actions, rewards


In [7]:
def rollout_episode_video(env, model, max_steps=1500, deterministic=True):
    """Play one episode of ``model`` in ``env`` without recording any data.

    The environment is reset and then stepped with the actions returned by
    ``model.predict`` until it reports ``terminated`` or ``truncated``, or
    until ``max_steps`` steps have been taken. Nothing is returned; the
    function is run for the environment's side effects.
    """
    obs, _info = env.reset()

    steps_taken = 0
    while steps_taken < max_steps:
        action, _extra = model.predict(obs, deterministic=deterministic)
        obs, _reward, terminated, truncated, _info = env.step(action)
        steps_taken += 1

        if terminated or truncated:
            break


## Loading the data

In [8]:
# Load the recorded rollouts from the data directory. For .npz archives
# np.load returns an NpzFile handle that keeps the file open; the context
# manager closes it once the arrays have been read into memory.
with np.load(DATA_DIR / "ppo_balance_clean_30s.npz") as data:
    obs      = data["observations"]
    acts     = data["actions"]
    ep_ids   = data["episode_ids"]
    ep_lens  = data["episode_lens"]
    dt       = float(data["dt"])

# The per-sample arrays are indexed together further down, so they must
# all have the same length.
assert len(obs) == len(acts) == len(ep_ids), \
    "observations, actions and episode_ids must have the same length"

print(obs.shape, acts.shape, ep_lens)

(15000, 4) (15000,) [1500 1500 1500 1500 1500 1500 1500 1500 1500 1500]


Our dataset contains several episodes concatenated in time. However, for system identification we must only use transitions that stay inside the same episode, because the transition from the last state of one episode to the first state of the next episode is not physical (it comes from a reset).

To handle this safely, we use the array `ep_ids`, which tells us which episode each sample belongs to: a transition $(x_k, x_{k+1})$ is kept only if both samples carry the same episode id.

In [9]:
FORCE_MAG = 10.0

# A pair (k, k+1) is a valid transition only when both samples carry the
# same episode id; pairs that straddle a reset are dropped.
same_ep = ep_ids[:-1] == ep_ids[1:]
X, X_next = obs[:-1][same_ep], obs[1:][same_ep]
a = acts[:-1][same_ep]               # in {0, 1}

# Map the discrete action to a signed force (0 -> -FORCE_MAG, 1 -> +FORCE_MAG)
# and store it as a column vector of shape (N, 1).
U_force = (FORCE_MAG * (2 * a - 1)).reshape(-1, 1)

print("Transitions:", X.shape, U_force.shape, X_next.shape)

Transitions: (14990, 4) (14990, 1) (14990, 4)


The linear model we want to identify is only valid **locally**, i.e., for small deviations around the upright equilibrium of the pendulum ($\theta \approx 0$). If we include samples where the pole has a large angle or spins quickly, the linear approximation becomes invalid and the estimated $A, B$ matrices will be inaccurate.

For this reason, we filter the dataset to keep only samples close to equilibrium.


In [10]:
# Columns 2 and 3 of the state hold theta and its rate of change.
theta, theta_dot = X[:, 2], X[:, 3]

# Keep a transition only if its starting state is close to the upright
# equilibrium: small angle and small angular rate.
local_mask = np.logical_and(np.abs(theta) < 0.25, np.abs(theta_dot) < 1.0)

X_loc, U_loc, X_next_loc = (arr[local_mask] for arr in (X, U_force, X_next))

print("Local samples:", X_loc.shape[0], "/", X.shape[0])

Local samples: 14990 / 14990


## Fitting the linear model

In [11]:
# Least-squares fit of X_next ~ [X, U] @ W. The first nx rows of W give
# A^T and the remaining nu rows give B^T, so that x_next ~ A x + B u.
nx, nu = X_loc.shape[1], U_loc.shape[1]

Z = np.concatenate((X_loc, U_loc), axis=1)   # regressors, one row per transition
Y = X_next_loc                               # targets

W = np.linalg.lstsq(Z, Y, rcond=None)[0]
A, B = W[:nx].T, W[nx:].T

print("A:\n", A)
print("B:\n", B)


A:
 [[ 1.00000000e+00  2.00000001e-02 -5.91644400e-09 -1.20333543e-11]
 [-2.61345310e-07  9.99999817e-01 -1.43165615e-02 -4.53362821e-07]
 [ 6.98933665e-12  1.90414282e-11  9.99999999e-01  2.00000000e-02]
 [ 4.99900964e-06  2.79076010e-05  3.15628352e-01  1.00002093e+00]]
B:
 [[ 5.96547464e-12]
 [ 1.95121642e-02]
 [ 4.76748849e-13]
 [-2.92680156e-02]]


## Testing the result

In [12]:
# Discrete-time LQR design on the identified (A, B).
# Q weights the four state components (the angle, index 2, most heavily);
# R weights the single control input.
Q = np.diag(np.array([1.0, 0.1, 10.0, 0.1]))   # state penalty
R = np.full((1, 1), 0.01)                      # control effort penalty

K, S, eigvals = control.dlqr(A, B, Q, R)

print("K =", K)
print("Closed-loop eigenvalues:", eigvals)

K = [[ -8.59109027 -10.77893589 -71.75759377 -17.36454168]]
Closed-loop eigenvalues: [0.87775882+0.05701775j 0.87775882-0.05701775j 0.97329889+0.01926108j
 0.97329889-0.01926108j]


In [13]:
# Not used by lqr_to_action itself: only the sign of u decides the action.
FORCE_MAG = 10.0


def lqr_to_action(u):
    """Turn a continuous control value into a discrete action.

    Returns 1 when ``u`` is strictly positive and 0 otherwise (including
    ``u == 0``).
    """
    if u > 0:
        return 1
    return 0

In [14]:
class LQRModel:
    """State-feedback controller exposing a ``predict`` method.

    The control law is ``u = -K x``; the discrete action is 1 when ``u`` is
    strictly positive and 0 otherwise. ``predict`` has the call shape
    ``predict(obs, deterministic=...) -> (action, None)`` expected by the
    rollout helpers in this notebook.
    """

    def __init__(self, K):
        # Feedback gain; multiplied with the state as a column vector.
        self.K = K

    def predict(self, obs, deterministic=True):
        """Return ``(action, None)`` for observation ``obs``.

        ``deterministic`` is accepted for interface compatibility and is
        not used.
        """
        state_column = obs.reshape(-1, 1)
        control_value = (-self.K @ state_column).item()

        if control_value > 0:
            return 1, None
        return 0, None


In [15]:
lqr_model = LQRModel(K)

# Close the recording environment even if the rollout raises, so the
# recorder is always shut down cleanly.
try:
    rollout_episode_video(
        env_gui,
        lqr_model,
        max_steps=N_STEPS,   # same cap as the environment's max_episode_steps
        deterministic=True,
    )
finally:
    env_gui.close()

  from pkg_resources import resource_stream, resource_exists


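**It works!** The LQR controller designed from the identified linear model balances the pole; the recorded episode is saved in the `videos/` folder.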