PILCO baseline for on‑car RC drift control
=========================================

First-cut, *read‑and‑extend* implementation that trains a Gaussian‑Process
model of the car dynamics and optimises an RBF policy via analytic moment
matching (original PILCO method). Meant to run **solely on on‑car data**.

Dependencies
------------
$ pip install numpy tensorflow==2.* gpflow==2.9.0 tensorflow_probability

File layout
-----------
• `CarEnv`  – Gym‑like interface wrapping ROS 2 topics (same as in DQN code)
• `collect_rollout` – executes current policy, logs (s, a, s′)
• `fit_gp_model`  – fits independent GPs to each state dimension
• `RBFLayer` + `RBFPolicy` – nonlinear controller π(s)=W·φ(s) with tanh squash
• `PILCOTrainer` – core optimise‑policy loop
• CLI  – `python pilco_baseline.py train`, `... eval model.npz`

NOTE 1: This is *minimal*; safety wrappers (kill‑switch) should run in ROS.
NOTE 2: Closed‑form moment matching needs small state dims – we keep 3‑dim state.

In [None]:
from __future__ import annotations
import argparse
import os
from typing import Tuple
import numpy as np
import tensorflow as tf
import tensorflow_probability as tfp
import gpflow
from gpflow.utilities import set_trainable

In [None]:
# --- Hyper‑parameters ---
STATE_DIM      = 3   # [vx, vy, yaw_rate]
ACTION_DIM     = 1   # steering angle (rad)
RBF_FEATURES   = 50  # number of random RBF features in policy
HORIZON        = 50  # steps per rollout (≈0.8 s @60 Hz)
ROLL_OUTS      = 150 # total budget
GAMMA          = 0.99
LEARNING_RATE  = 1e-2
SAVE_EVERY     = 10

In [None]:
# --- Environment wrapper stub (fill ROS specifics) ---
try:
    import gymnasium as gym
except ImportError:
    import gym

class CarEnv(gym.Env):
    def __init__(self):
        self.state_dim = STATE_DIM
        self.action_dim = ACTION_DIM
        high = np.array([10.0, 10.0, np.pi])
        self.observation_space = gym.spaces.Box(-high, high, dtype=np.float32)
        self.action_space      = gym.spaces.Box(-1.0, 1.0, (ACTION_DIM,), dtype=np.float32)
        self._connect()

    def _connect(self):
        # TODO: init ROS 2 clients/subscribers here
        pass

    def reset(self):
        # TODO: call service to reset car pose
        state = np.zeros(STATE_DIM, dtype=np.float32)
        return state, {}

    def step(self, action: np.ndarray):
        # TODO: publish steering, fixed throttle
        # get next state + reward Dm
        s2 = np.zeros(STATE_DIM, dtype=np.float32)
        reward = self._compute_dm(s2)
        done = False
        info = {}
        return s2, reward, done, info

    def _compute_dm(self, s):
        vx_t, vy_t, wz_t = 3.0, 1.5, 0.3
        err = np.array([s[0]-vx_t, s[1]-vy_t, s[2]-wz_t])
        return float(np.exp(-np.linalg.norm(err)))

In [None]:
# --- Data collection ---

def collect_rollout(env: CarEnv, policy, horizon: int) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
    s, _ = env.reset()
    states, actions, next_states = [], [], []
    for _ in range(horizon):
        a = policy(s)
        s2, r, done, _ = env.step(a)
        states.append(s)
        actions.append(a)
        next_states.append(s2)
        s = s2
        if done:
            break
    return np.array(states), np.array(actions), np.array(next_states)


In [None]:
# --- GP Dynamics Model ---

def fit_gp_model(X: np.ndarray, Y: np.ndarray):
    """Fit independent GPs: f: (s,a)->Δs."""
    input_dim = X.shape[1]
    out_dim   = Y.shape[1]
    gps = []
    for d in range(out_dim):
        kernel = gpflow.kernels.RBF(lengthscales=np.ones(input_dim))
        m = gpflow.models.GPR(data=(X, Y[:, d:d+1]), kernel=kernel, mean_function=None)
        set_trainable(m.likelihood, False)  # noise fixed to default
        opt = gpflow.optimizers.Scipy()
        opt.minimize(m.training_loss, m.trainable_variables, options=dict(maxiter=500))
        gps.append(m)
    return gps

In [None]:
# --- RBF Policy ---
class RBFLayer(tf.keras.layers.Layer):
    def __init__(self, num_features: int, input_dim: int):
        super().__init__()
        self.W = self.add_weight(shape=(num_features, input_dim), initializer="random_normal")
        self.b = self.add_weight(shape=(num_features,), initializer="random_uniform")

    def call(self, x):
        # x shape (batch, input_dim)
        proj = tf.matmul(x, self.W, transpose_b=True) + self.b
        return tf.exp(-tf.square(proj))

class RBFPolicy(tf.keras.Model):
    def __init__(self):
        super().__init__()
        self.rbf = RBFLayer(RBF_FEATURES, STATE_DIM)
        self.head = tf.keras.layers.Dense(ACTION_DIM, activation="tanh")

    @tf.function
    def call(self, x):
        phi = self.rbf(x)
        a   = self.head(phi)
        return a

    def __call__(self, s: np.ndarray):
        s_tf = tf.convert_to_tensor(s.reshape(1,-1), dtype=tf.float32)
        a_tf = super().call(s_tf)
        return a_tf.numpy().flatten()

In [None]:
# --- PILCO Trainer (simplified) ---
class PILCOTrainer:
    def __init__(self, env: CarEnv):
        self.env = env
        self.policy = RBFPolicy()
        self.data_X = None  # will hold [s,a]
        self.data_Y = None  # Δs
        self.gps    = None

    def update_dataset(self, S, A, S2):
        X = np.concatenate([S, A], axis=1)
        Y = S2 - S
        if self.data_X is None:
            self.data_X, self.data_Y = X, Y
        else:
            self.data_X = np.vstack([self.data_X, X])
            self.data_Y = np.vstack([self.data_Y, Y])

    def train_gp(self):
        self.gps = fit_gp_model(self.data_X, self.data_Y)

    def optimise_policy(self, iterations: int = 100):
        # Placeholder: simple supervised trick — behavioural cloning of next‑state targets (NOT true PILCO optimization) for first version
        opt = tf.keras.optimizers.Adam(LEARNING_RATE)
        X_tf = tf.convert_to_tensor(self.data_X[:, :STATE_DIM], dtype=tf.float32)
        for _ in range(iterations):
            with tf.GradientTape() as tape:
                a_pred = self.policy(tf.convert_to_tensor(X_tf))
                loss = tf.reduce_mean(tf.square(a_pred))  # dummy loss, replace with analytic cost
            grads = tape.gradient(loss, self.policy.trainable_variables)
            opt.apply_gradients(zip(grads, self.policy.trainable_variables))

    def save(self, path="pilco_weights.npz"):
        weights = self.policy.get_weights()
        np.savez(path, *weights)

    def load(self, path="pilco_weights.npz"):
        data = np.load(path, allow_pickle=True)
        self.policy.set_weights([data[f"arr_{i}"] for i in range(len(data.files))])

    def run(self):
        for rollout_idx in range(ROLL_OUTS):
            S, A, S2 = collect_rollout(self.env, self.policy, HORIZON)
            self.update_dataset(S, A, S2)
            self.train_gp()
            self.optimise_policy(50)
            if rollout_idx % SAVE_EVERY == 0:
                self.save()
                print(f"Saved model at rollout {rollout_idx}, dataset size {len(self.data_X)}")

In [None]:
# --- CLI ---
if __name__ == "__main__":
    ap = argparse.ArgumentParser()
    sub = ap.add_subparsers(dest="cmd", required=True)
    tr = sub.add_parser("train")
    ev = sub.add_parser("eval"); ev.add_argument("model")
    args = ap.parse_args()

    env = CarEnv()
    trainer = PILCOTrainer(env)

    if args.cmd == "train":
        trainer.run()
    else:
        trainer.load(args.model)
        S, A, S2 = collect_rollout(env, trainer.policy, HORIZON)
        print("Collected eval rollout, mean reward:", np.mean([env._compute_dm(s) for s in S2]))