# Neural EV Dynamics Simulation Walkthrough

This notebook demonstrates end-to-end usage of the transformer-based EV longitudinal controllers:

1. Load Stage 1 (forward/inverse) and Stage 2 (feedback) models.
2. Generate synthetic speed and grade profiles (or load from disk).
3. Run forward-only, inverse-only, and closed-loop simulations with perturbations.
4. Visualize the trajectories and error statistics.



In [1]:
import sys
from pathlib import Path

# Put the parent of the current working directory at the front of the import
# path (presumably so the project packages imported below can be resolved).
ROOT = Path.cwd().resolve().parent
if sys.path.count(str(ROOT)) == 0:
    sys.path.insert(0, str(ROOT))



In [2]:
import json
from pathlib import Path
from typing import Dict, Optional

import matplotlib.pyplot as plt
import numpy as np
import torch
import time as pytime

from scripts.simulate_profiles import (
    PerturbationConfig,
    load_models,
    simulate_forward,
    simulate_inverse,
    simulate_closed_loop,
    SequenceWindowConfig,
)


## 1. Paths and Configuration
Fill in the checkpoint paths and simulation hyperparameters. The defaults below assume you completed the Stage 1 and Stage 2 training runs from the earlier walkthrough.



In [3]:
# Update these paths to match your environment
STAGE1_CKPT = Path("../checkpoints/stage1/ecentro_ha_03_bs512/stage1.pth")
STAGE2_CKPT = Path("../checkpoints/stage2/ecentro_ha_03_bs512/stage2.pth")
DATASET_PATH = Path("../data/processed/ECentro/ECENTRO_HA_03/all_trips_data.pt")

HISTORY = 100
HORIZON = 100
TOTAL_LENGTH = 1200  # total timesteps to simulate for closed-loop mode
SAMPLE_TIME = 0.1  # controller timestep in seconds
WARMUP_DURATION = 15.0  # seconds for ramp-to-initial-speed warmup

# Target speed profile options
TARGET_PROFILE_TYPE = "steps"  # options: "sine", "steps"
# Sequence of (duration_seconds, speed_mps) applied after the warm-up when TARGET_PROFILE_TYPE == "steps"
STEP_PROFILE_SEGMENTS = [
    (30.0, 1.5),
    (30.0, 5.0),
    (30.0, 3.0),
    (30.0, 7.0),
    (10.0, 0.0),
]

STATE_FEATURES = ["speed", "grade", "acceleration"]

window = SequenceWindowConfig(history=HISTORY, horizon=HORIZON)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")



## 2. Load Models



In [4]:
# Build the forward, inverse and feedback models from the Stage 1 / Stage 2
# checkpoints. state_dim follows STATE_FEATURES; action_dim=2 corresponds to
# the two (throttle, brake) action columns used throughout this notebook.
# NOTE(review): the traceback stored with this notebook reports a state_dict
# mismatch for ForwardDynamicsModel: the checkpoint holds
# "backbone.encoder.layers.*" (self-attention) keys and a [256, 4]
# history_proj weight, while the constructed model expects "backbone.gru.*"
# keys and a [256, 5] weight. Confirm that the backbone type and state_dim
# requested here match how the checkpoint was trained before relying on the
# cells below.
forward_model, inverse_model, feedback_model = load_models(
    STAGE1_CKPT,
    STAGE2_CKPT,
    state_dim=len(STATE_FEATURES),
    action_dim=2,
    device=device,
)

def count_params(model: torch.nn.Module) -> int:
    """Return the total number of elements over all parameters of ``model``.

    Every tensor yielded by ``model.parameters()`` is counted, whether or not
    it requires gradients. A module without parameters yields 0.
    """
    total = 0
    for param in model.parameters():
        total += param.numel()
    return total

# Report the device in use and the parameter count of each loaded model.
print("Models loaded on", device)
for role, module in (
    ("Forward", forward_model),
    ("Inverse", inverse_model),
    ("Feedback", feedback_model),
):
    print(f"{role} params: {count_params(module):,}")


  stage1_state = torch.load(stage1, map_location="cpu")


RuntimeError: Error(s) in loading state_dict for ForwardDynamicsModel:
	Missing key(s) in state_dict: "backbone.gru.weight_ih_l0", "backbone.gru.weight_hh_l0", "backbone.gru.bias_ih_l0", "backbone.gru.bias_hh_l0", "backbone.gru.weight_ih_l0_reverse", "backbone.gru.weight_hh_l0_reverse", "backbone.gru.bias_ih_l0_reverse", "backbone.gru.bias_hh_l0_reverse", "backbone.gru.weight_ih_l1", "backbone.gru.weight_hh_l1", "backbone.gru.bias_ih_l1", "backbone.gru.bias_hh_l1", "backbone.gru.weight_ih_l1_reverse", "backbone.gru.weight_hh_l1_reverse", "backbone.gru.bias_ih_l1_reverse", "backbone.gru.bias_hh_l1_reverse", "backbone.projection.weight", "backbone.projection.bias". 
	Unexpected key(s) in state_dict: "backbone.encoder.layers.0.self_attn.in_proj_weight", "backbone.encoder.layers.0.self_attn.in_proj_bias", "backbone.encoder.layers.0.self_attn.out_proj.weight", "backbone.encoder.layers.0.self_attn.out_proj.bias", "backbone.encoder.layers.0.linear1.weight", "backbone.encoder.layers.0.linear1.bias", "backbone.encoder.layers.0.linear2.weight", "backbone.encoder.layers.0.linear2.bias", "backbone.encoder.layers.0.norm1.weight", "backbone.encoder.layers.0.norm1.bias", "backbone.encoder.layers.0.norm2.weight", "backbone.encoder.layers.0.norm2.bias", "backbone.encoder.layers.1.self_attn.in_proj_weight", "backbone.encoder.layers.1.self_attn.in_proj_bias", "backbone.encoder.layers.1.self_attn.out_proj.weight", "backbone.encoder.layers.1.self_attn.out_proj.bias", "backbone.encoder.layers.1.linear1.weight", "backbone.encoder.layers.1.linear1.bias", "backbone.encoder.layers.1.linear2.weight", "backbone.encoder.layers.1.linear2.bias", "backbone.encoder.layers.1.norm1.weight", "backbone.encoder.layers.1.norm1.bias", "backbone.encoder.layers.1.norm2.weight", "backbone.encoder.layers.1.norm2.bias", "backbone.encoder.layers.2.self_attn.in_proj_weight", "backbone.encoder.layers.2.self_attn.in_proj_bias", "backbone.encoder.layers.2.self_attn.out_proj.weight", "backbone.encoder.layers.2.self_attn.out_proj.bias", "backbone.encoder.layers.2.linear1.weight", "backbone.encoder.layers.2.linear1.bias", "backbone.encoder.layers.2.linear2.weight", "backbone.encoder.layers.2.linear2.bias", "backbone.encoder.layers.2.norm1.weight", "backbone.encoder.layers.2.norm1.bias", "backbone.encoder.layers.2.norm2.weight", "backbone.encoder.layers.2.norm2.bias", "backbone.encoder.layers.3.self_attn.in_proj_weight", "backbone.encoder.layers.3.self_attn.in_proj_bias", "backbone.encoder.layers.3.self_attn.out_proj.weight", "backbone.encoder.layers.3.self_attn.out_proj.bias", "backbone.encoder.layers.3.linear1.weight", "backbone.encoder.layers.3.linear1.bias", 
"backbone.encoder.layers.3.linear2.weight", "backbone.encoder.layers.3.linear2.bias", "backbone.encoder.layers.3.norm1.weight", "backbone.encoder.layers.3.norm1.bias", "backbone.encoder.layers.3.norm2.weight", "backbone.encoder.layers.3.norm2.bias", "backbone.encoder.norm.weight", "backbone.encoder.norm.bias". 
	size mismatch for history_proj.net.0.weight: copying a param with shape torch.Size([256, 4]) from checkpoint, the shape in current model is torch.Size([256, 5]).
	size mismatch for output_head.2.weight: copying a param with shape torch.Size([2, 256]) from checkpoint, the shape in current model is torch.Size([3, 256]).
	size mismatch for output_head.2.bias: copying a param with shape torch.Size([2]) from checkpoint, the shape in current model is torch.Size([3]).

In [None]:
# No autoencoder warm start in this walkthrough: both handles stay unset.
autoencoder = auto_mapper = None
print("Autoencoder warm-start disabled; using synthetic ramp warmup instead.")



## 3. Generate Profiles
We will synthesise target speed and grade profiles. You can swap these with `np.load` calls if you have recorded trajectories.



In [None]:
# Sample-index axis 0 .. TOTAL_LENGTH-1 (timestep indices, not seconds). It is
# used below as the shape template for the target-speed and grade profiles.
# The stdlib `time` module is imported as `pytime`, so this name does not
# shadow it.
time = np.arange(TOTAL_LENGTH)

# Speed reached at the end of the warm-up ramp; also the baseline value of the
# target-speed profile.
INITIAL_TARGET_SPEED = 1.5 # m/s

# Slope of the linear warm-up speed ramp.
WARMUP_GRADIENT = 1.0  # m/s^2


def build_linear_warmup_schedule(gradient: float,
                                 target_speed: float = INITIAL_TARGET_SPEED,
                                 dt: float = SAMPLE_TIME) -> np.ndarray:
    """Build a linearly increasing speed ramp sampled every ``dt`` seconds.

    Args:
        gradient: Ramp slope in speed units per second; values below 1e-4
            are raised to 1e-4.
        target_speed: Upper bound of the ramp.
        dt: Sampling interval in seconds.

    Returns:
        A float32 array whose k-th entry (k starting at 1) is
        ``gradient * dt * k``, clipped to ``[0, target_speed]``. The array
        always contains at least one sample.
    """
    slope = float(max(gradient, 1e-4))
    # Number of increments needed to reach target_speed, never fewer than one.
    n_steps = max(int(np.ceil(target_speed / (slope * dt))), 1)
    ramp = slope * dt * np.arange(1, n_steps + 1, dtype=np.float32)
    return np.clip(ramp, 0.0, target_speed)


def build_target_speed_profile(
    warmup_schedule: np.ndarray,
    profile_type: str = "sine",
) -> np.ndarray:
    """Build the target-speed profile over the module-level ``time`` axis.

    The first ``warmup_schedule.size`` samples are held at
    ``INITIAL_TARGET_SPEED``; the remaining samples follow the requested
    profile:

    * ``"sine"``: ``INITIAL_TARGET_SPEED + 1.5 * sin(k / 60)`` with ``k``
      counted from the end of the warm-up.
    * ``"steps"``: the ``(duration_seconds, speed)`` pairs from
      ``STEP_PROFILE_SEGMENTS`` applied back to back; the last applied speed
      is held until the end of the profile.

    Args:
        warmup_schedule: Warm-up ramp; only its length is used.
        profile_type: ``"sine"`` or ``"steps"``.

    Returns:
        A float32 array with the same shape as ``time``.

    Raises:
        ValueError: If ``profile_type`` is unknown and there is at least one
            sample after the warm-up.
    """
    baseline = float(INITIAL_TARGET_SPEED)
    profile = np.ones_like(time, dtype=np.float32) * INITIAL_TARGET_SPEED
    start = int(np.round(warmup_schedule.size))
    n_remaining = profile.size - start

    # Nothing to fill once the warm-up covers the whole profile.
    if n_remaining <= 0:
        return profile

    if profile_type == "sine":
        profile[start:] = INITIAL_TARGET_SPEED + 1.5 * np.sin(np.arange(n_remaining) / 60.0)
        return profile

    if profile_type != "steps":
        raise ValueError(f"Unknown target profile type: {profile_type}")

    # Fall back to a single constant segment when no step segments are configured.
    segments = STEP_PROFILE_SEGMENTS or [(n_remaining * SAMPLE_TIME, baseline)]
    cursor = start
    held_speed = baseline
    for duration, speed in segments:
        n_steps = int(np.round(duration / SAMPLE_TIME))
        if n_steps <= 0:
            continue
        stop = min(cursor + n_steps, profile.size)
        profile[cursor:stop] = float(speed)
        held_speed = float(speed)
        cursor = stop
        if cursor >= profile.size:
            break
    # Hold the last applied speed over any samples the segments did not reach.
    if cursor < profile.size:
        profile[cursor:] = held_speed
    return profile


# Default warm-up ramp (rising to INITIAL_TARGET_SPEED at WARMUP_GRADIENT).
DEFAULT_WARMUP_SCHEDULE = build_linear_warmup_schedule(WARMUP_GRADIENT)
# Replaces the WARMUP_DURATION placeholder set in the configuration cell with
# the actual ramp length in seconds.
WARMUP_DURATION = DEFAULT_WARMUP_SCHEDULE.size * SAMPLE_TIME

# Target speed for every sample of `time`, and an all-zero grade profile of
# the same shape.
TARGET_SPEED = build_target_speed_profile(DEFAULT_WARMUP_SCHEDULE, TARGET_PROFILE_TYPE)
GRADE_PROFILE = np.zeros_like(time, dtype=np.float32)

# Placeholder history windows of length HISTORY: zero speed, zero grade and
# zero actuation in both action columns.
initial_speed = np.zeros(HISTORY, dtype=np.float32)
initial_grade_profile = np.zeros(HISTORY, dtype=np.float32)
initial_actions = np.zeros((HISTORY, 2), dtype=np.float32)

# One-line summary of the starting conditions (last entry of each history
# window) and of the warm-up ramp.
print(
    "Warm-start summary → speed=",
    float(initial_speed[-1]),
    "warmup gradient=",
    WARMUP_GRADIENT,
    "warmup duration [s]=",
    DEFAULT_WARMUP_SCHEDULE.size * SAMPLE_TIME,
    "first target speed=",
    float(TARGET_SPEED[0]),
    "grade=",
    float(initial_grade_profile[-1]),
    "actions(throttle, brake)=",
    tuple(initial_actions[-1]),
)
print("Target profile type=", TARGET_PROFILE_TYPE)
if TARGET_PROFILE_TYPE == "steps":
    # Sum of the configured segment durations; this may differ from the
    # simulated span, because the profile builder truncates or extends the
    # segments to fit TOTAL_LENGTH.
    total_segment_duration = sum(float(seg[0]) for seg in STEP_PROFILE_SEGMENTS)
    print("Step segments (duration [s], speed [m/s]):", STEP_PROFILE_SEGMENTS)
    print("Step profile total duration [s]=", total_segment_duration)



In [None]:
# Plot the default warm-up ramp; each sample is stamped at the end of its
# step, so the first point sits at t = SAMPLE_TIME.
fig, ax = plt.subplots(figsize=(10, 4))
t_profile = np.arange(1, DEFAULT_WARMUP_SCHEDULE.size + 1) * SAMPLE_TIME
ax.plot(
    t_profile,
    DEFAULT_WARMUP_SCHEDULE,
    label=f"Warm-up (gradient {WARMUP_GRADIENT:.1f} m/s^2)",
)
ax.set(
    xlabel="Warm-up time [s]",
    ylabel="Target speed [m/s]",
    title="Warm-up Target Speed Profile",
)
ax.grid(True, alpha=0.3)
ax.legend(loc="lower right")
fig.tight_layout()
plt.show()



## 4. Forward Simulation
Given a user-defined actuation horizon, predict the vehicle's future speed.



### Feedforward vs Feedback (Noiseless Simulation)



In [None]:
# Example actuation sequence: moderate throttle, no brake
actuation_horizon = np.zeros((HORIZON, 2), dtype=np.float32)
actuation_horizon[:, 0] = 40  # 40% throttle equivalent

forward_preds = simulate_forward(
    forward_model,
    initial_speed,
    initial_actions,
    actuation_horizon,
    GRADE_PROFILE[:HISTORY + HORIZON],
    window,
    STATE_FEATURES,
    device,
)
print("Forward-predicted speeds (first 10):", forward_preds[:10, 0])



## 5. Inverse Simulation
Given a target speed horizon, compute the required actuation plan (feedforward controller).



In [None]:
# Target speeds for the HORIZON samples that follow the first HISTORY samples
# of the profile.
target_window = TARGET_SPEED[HISTORY : HISTORY + HORIZON]

# Ask the inverse model for the actuation plan that should realise
# `target_window`, given the current history windows and the grade over the
# history + horizon span.
inverse_actions = simulate_inverse(
    inverse_model,
    initial_speed,
    initial_actions,
    target_window,
    GRADE_PROFILE[:HISTORY + HORIZON],
    window,
    STATE_FEATURES,
    device,
)
print("Inverse-predicted throttle/brake (first 10 rows):\n", inverse_actions[:10])



## 6. Closed-loop Simulation
At each time step:
1. Feed the latest noisy measurements to the inverse (and optional feedback) model to obtain actuations.
2. Apply optional actuation noise.
3. Propagate the true state with the forward model (vehicle response).
4. Update history windows and continue.



In [None]:
def run_closed_loop_case(
    feedback_module,
    perturb,
    warmup_schedule: Optional[np.ndarray] = None,
):
    """Run one closed-loop simulation with the notebook-level models and profiles.

    Args:
        feedback_module: Feedback model to use, or ``None`` for a
            feedforward-only run.
        perturb: Perturbation configuration forwarded to
            ``simulate_closed_loop`` (``None`` for a clean run).
        warmup_schedule: Optional warm-up speed ramp; converted to float32.
            ``DEFAULT_WARMUP_SCHEDULE`` is used when omitted.

    Returns:
        Whatever ``simulate_closed_loop`` returns for this configuration.

    Note:
        ``initial_speed`` and ``initial_actions`` are read from the notebook
        globals at call time; copies of ``TARGET_SPEED`` and ``GRADE_PROFILE``
        are passed so the originals are not handed to the simulator.
    """
    if warmup_schedule is None:
        schedule = DEFAULT_WARMUP_SCHEDULE
    else:
        schedule = np.asarray(warmup_schedule, dtype=np.float32)

    # The warm-up lasts exactly as long as the ramp has samples.
    sim_options = dict(
        perturb=perturb,
        warmup_seconds=schedule.size * SAMPLE_TIME,
        time_step=SAMPLE_TIME,
        warmup_speed_schedule=schedule,
    )
    return simulate_closed_loop(
        forward_model,
        inverse_model,
        feedback_module,
        initial_speed,
        initial_actions,
        TARGET_SPEED.copy(),
        GRADE_PROFILE.copy(),
        window,
        STATE_FEATURES,
        device,
        **sim_options,
    )

# Clean (unperturbed) closed-loop runs: feedforward only, then with feedback.
results_clean_ff = run_closed_loop_case(None, None)
results_clean_fb = run_closed_loop_case(feedback_model, None)

for heading, run in (
    ("Feedforward-only speed (first 10):", results_clean_ff),
    ("With feedback speed (first 10):", results_clean_fb),
):
    print(heading, run["simulated_speed"][:10])
print("Warmup duration [s] =", WARMUP_DURATION)



In [None]:
# Warm-up traces recorded by the two clean runs (empty dict when absent).
warmup_ff = results_clean_ff.get("warmup", {})
warmup_fb = results_clean_fb.get("warmup", {})

if not (warmup_ff and warmup_fb):
    print("Warm-up data unavailable.")
else:
    t_ff = np.arange(warmup_ff["speed"].size) * SAMPLE_TIME
    t_fb = np.arange(warmup_fb["speed"].size) * SAMPLE_TIME

    # Panel order, top to bottom: feedback-enabled run, feedforward-only run.
    # Each entry: (time axis, trace dict, legend label, title suffix,
    # extra kwargs for the throttle line).
    warmup_panels = (
        (t_fb, warmup_fb, "With feedback", "Feedback Enabled", {}),
        (t_ff, warmup_ff, "Feedforward only", "Feedforward Only", {"color": "tab:orange"}),
    )

    # Figure 1: simulated speed against the warm-up target.
    fig_speed, axes_speed = plt.subplots(2, 1, figsize=(12, 8), sharex=True)
    for panel_ax, (t_axis, trace, run_label, mode, _) in zip(axes_speed, warmup_panels):
        panel_ax.plot(t_axis, trace["speed"], label=run_label)
        panel_ax.plot(t_axis, trace["target_speed"], linestyle="--", label="Target")
        panel_ax.set_title(f"Warm-up Speed ({mode})")
        panel_ax.set_ylabel("Speed [m/s]")
        panel_ax.grid(True, alpha=0.3)
        panel_ax.legend(loc="lower right")
    axes_speed[1].set_xlabel("Time [s]")
    plt.tight_layout()
    plt.show()

    # Figure 2: throttle command (first action column) during the warm-up.
    fig_act, axes_act = plt.subplots(2, 1, figsize=(12, 8), sharex=True)
    for panel_ax, (t_axis, trace, _, mode, line_kwargs) in zip(axes_act, warmup_panels):
        panel_ax.plot(t_axis, trace["actions"][:, 0], label="Throttle", **line_kwargs)
        panel_ax.set_title(f"Warm-up Throttle ({mode})")
        panel_ax.set_ylabel("Throttle [%]")
        panel_ax.grid(True, alpha=0.3)
    axes_act[1].set_xlabel("Time [s]")
    plt.tight_layout()
    plt.show()


In [None]:
# Pull the warm-up traces and the end-of-run history windows out of the clean
# feedback-enabled run, substituting empty / placeholder arrays for any key
# the result dict does not provide.
warmup_clean = results_clean_fb.get("warmup", {})
warmup_speed, warmup_target_speed, warmup_grade = (
    warmup_clean.get(key, np.array([]))
    for key in ("speed", "target_speed", "grade")
)
warmup_actions = warmup_clean.get("actions", np.zeros((0, 2), dtype=np.float32))
final_history_speed, final_history_actions, final_history_grade = (
    results_clean_fb.get(key, fallback)
    for key, fallback in (
        ("final_history_speed", initial_speed),
        ("final_history_actions", initial_actions),
        ("final_history_grade", np.zeros_like(initial_speed)),
    )
)



In [None]:
# Replace the placeholder history with the post-warmup history for downstream cells
if final_history_speed.size:
    initial_speed = final_history_speed.astype(np.float32)
if final_history_actions.size:
    initial_actions = final_history_actions.astype(np.float32)
if final_history_grade.size:
    initial_grade_profile = final_history_grade.astype(np.float32)



### Warm-up Ramp Diagnostics



In [None]:
# Three stacked panels for the warm-up phase of the clean feedback run:
# speed tracking, grade (converted with np.degrees) and actuation commands.
if not warmup_speed.size:
    print("Warm-up trace unavailable (no ramp executed).")
else:
    fig, ax = plt.subplots(3, 1, figsize=(12, 8), sharex=True)
    t_warmup = np.arange(warmup_speed.size) * SAMPLE_TIME
    speed_ax, grade_ax, act_ax = ax

    speed_ax.plot(t_warmup, warmup_target_speed, label="Target", linestyle="--")
    speed_ax.plot(t_warmup, warmup_speed, label="Vehicle speed")
    speed_ax.set(ylabel="Speed [m/s]", title="Warm-up Speed Tracking")
    speed_ax.legend()
    speed_ax.grid(True, alpha=0.3)

    grade_ax.plot(t_warmup, np.degrees(warmup_grade), label="Grade (target)", color="tab:brown")
    grade_ax.set(ylabel="Grade [deg]", title="Warm-up Grade Target")
    grade_ax.grid(True, alpha=0.3)

    # Action columns: 0 = throttle, 1 = brake.
    for column, (name, line_color) in enumerate((("Throttle", "tab:orange"), ("Brake", "tab:red"))):
        act_ax.plot(t_warmup, warmup_actions[:, column], label=name, color=line_color)
    act_ax.set(
        ylabel="Actuation [%]",
        xlabel="Warm-up time [s]",
        title="Warm-up Actuation Commands",
    )
    act_ax.grid(True, alpha=0.3)
    act_ax.legend()

    plt.tight_layout()
    plt.show()



### Plot Closed-loop Results (Clean)



### Warm-up Configuration



In [None]:
# Clean (unperturbed) closed-loop results: speed tracking plus throttle and
# brake commands for the feedforward-only and feedback-enabled runs.
sim_speed_ff = results_clean_ff["simulated_speed"]
sim_speed_fb = results_clean_fb["simulated_speed"]
actuations_ff = results_clean_ff["applied_actions"]
actuations_fb = results_clean_fb["applied_actions"]
feedforward_ff = results_clean_ff.get("feedforward_actions")
feedforward_fb = results_clean_fb.get("feedforward_actions")
feedback_fb = results_clean_fb.get("feedback_actions")
# Time axis in seconds, sized from the feedback run.
time_closed = np.arange(sim_speed_fb.shape[0]) * SAMPLE_TIME

fig, ax = plt.subplots(3, 1, figsize=(12, 9), sharex=True)
# The target is sliced from HISTORY onwards to line up with the simulated samples.
ax[0].plot(time_closed, TARGET_SPEED[HISTORY:HISTORY + sim_speed_fb.shape[0]], label="Target", linestyle="--")
ax[0].plot(time_closed, sim_speed_ff, label="Feedforward only", color="tab:orange", alpha=0.6)
ax[0].plot(time_closed, sim_speed_fb, label="With feedback", color="tab:blue")
ax[0].set_ylabel("Speed [m/s]")
ax[0].set_title("Clean Speed Tracking")
ax[0].legend()
ax[0].grid(True, alpha=0.3)

# Throttle panel (action column 0). The feedforward / feedback components are
# drawn only when the result dict provides them.
ax[1].plot(time_closed, actuations_ff[:, 0], label="Throttle (FF only)", color="tab:orange", alpha=0.6)
ax[1].plot(time_closed, actuations_fb[:, 0], label="Throttle (with FB)", color="tab:blue")
if feedforward_fb is not None:
    ax[1].plot(time_closed, feedforward_fb[:, 0], label="Feedforward", linestyle="--", color="tab:brown")
if feedback_fb is not None:
    ax[1].plot(time_closed, feedback_fb[:, 0], label="Feedback Δ", linestyle=":", color="tab:green")
ax[1].set_ylabel("Throttle [%]")
# Fixed: this panel shows the clean run, so it must not be titled "Noisy".
ax[1].set_title("Clean Throttle Commands")
ax[1].grid(True, alpha=0.3)
ax[1].legend(loc="upper right")

# Brake panel (action column 1).
ax[2].plot(time_closed, actuations_ff[:, 1], label="Brake (FF only)", color="tab:red", alpha=0.4)
ax[2].plot(time_closed, actuations_fb[:, 1], label="Brake (with FB)", color="tab:pink")
if feedforward_fb is not None:
    ax[2].plot(time_closed, feedforward_fb[:, 1], label="Feedforward", linestyle="--", color="tab:purple")
if feedback_fb is not None:
    ax[2].plot(time_closed, feedback_fb[:, 1], label="Feedback Δ", linestyle=":", color="tab:gray")
ax[2].set_ylabel("Brake [%]")
ax[2].set_xlabel("Time [s]")
# Fixed: same copy-paste title error as the throttle panel.
ax[2].set_title("Clean Brake Commands")
ax[2].grid(True, alpha=0.3)
ax[2].legend(loc="upper right")

plt.tight_layout()
plt.show()



### Closed-loop Simulation with Measurement/Actuation Noise



In [None]:
# Measurement / actuation perturbations applied during the closed-loop runs.
# NOTE(review): no random seed is set anywhere in this notebook. If
# simulate_closed_loop draws its noise from a global RNG, the results below
# will differ between executions; consider seeding. Confirm how the noise is
# sampled.
perturb_cfg = PerturbationConfig(
    speed_noise_std=0.2,   # m/s measurement noise
    speed_delay=1,         # 1-step delay in speed measurement
    grade_noise_std=0.002, # radians
    actuation_noise_std=0.05,
)

# NOTE(review): run_closed_loop_case reads initial_speed / initial_actions at
# call time. When the notebook is run top to bottom, an earlier cell has
# replaced these with the post-warm-up history, whereas the clean runs started
# from the all-zero placeholders. Confirm that this difference in initial
# conditions is intended before comparing clean and noisy results.
results_noisy_ff = run_closed_loop_case(None, perturb_cfg, warmup_schedule=DEFAULT_WARMUP_SCHEDULE)
results_noisy_fb = run_closed_loop_case(feedback_model, perturb_cfg, warmup_schedule=DEFAULT_WARMUP_SCHEDULE)

print(
    "Feedforward-only speed (first 10, noisy):",
    results_noisy_ff["simulated_speed"][:10],
)
print(
    "With feedback speed (first 10, noisy):",
    results_noisy_fb["simulated_speed"][:10],
)



### Plot Closed-loop Results (Noisy)



In [None]:
# Perturbed closed-loop results: speed tracking plus throttle and brake
# commands for the feedforward-only and feedback-enabled runs.
sim_speed_noisy_ff = results_noisy_ff["simulated_speed"]
sim_speed_noisy_fb = results_noisy_fb["simulated_speed"]
actuations_noisy_ff = results_noisy_ff["applied_actions"]
actuations_noisy_fb = results_noisy_fb["applied_actions"]
feedforward_noisy_fb = results_noisy_fb.get("feedforward_actions")
feedback_noisy_fb = results_noisy_fb.get("feedback_actions")
# Time axis in seconds, sized from the feedback run.
time_noisy = np.arange(sim_speed_noisy_fb.shape[0]) * SAMPLE_TIME

fig, ax = plt.subplots(3, 1, figsize=(12, 9), sharex=True)
# The target is sliced from HISTORY onwards to line up with the simulated samples.
ax[0].plot(time_noisy, TARGET_SPEED[HISTORY:HISTORY + sim_speed_noisy_fb.shape[0]], label="Target", linestyle="--")
ax[0].plot(time_noisy, sim_speed_noisy_ff, label="Feedforward only", color="tab:orange", alpha=0.6)
ax[0].plot(time_noisy, sim_speed_noisy_fb, label="With feedback", color="tab:blue")
ax[0].set_ylabel("Speed [m/s]")
# Fixed: this panel shows the perturbed run, so it must not be titled "Clean".
ax[0].set_title("Noisy Speed Tracking")
ax[0].legend()
ax[0].grid(True, alpha=0.3)

# Throttle panel (action column 0). The feedforward / feedback components are
# drawn only when the result dict provides them.
ax[1].plot(time_noisy, actuations_noisy_ff[:, 0], label="Throttle (FF only)", color="tab:orange", alpha=0.6)
ax[1].plot(time_noisy, actuations_noisy_fb[:, 0], label="Throttle (with FB)", color="tab:blue")
if feedforward_noisy_fb is not None:
    ax[1].plot(time_noisy, feedforward_noisy_fb[:, 0], label="Feedforward", linestyle="--", color="tab:brown")
if feedback_noisy_fb is not None:
    ax[1].plot(time_noisy, feedback_noisy_fb[:, 0], label="Feedback Δ", linestyle=":", color="tab:green")
ax[1].set_ylabel("Throttle [%]")
ax[1].set_title("Noisy Throttle Commands")
ax[1].grid(True, alpha=0.3)
ax[1].legend(loc="upper right")

# Brake panel (action column 1).
ax[2].plot(time_noisy, actuations_noisy_ff[:, 1], label="Brake (FF only)", color="tab:red", alpha=0.4)
ax[2].plot(time_noisy, actuations_noisy_fb[:, 1], label="Brake (with FB)", color="tab:pink")
if feedforward_noisy_fb is not None:
    ax[2].plot(time_noisy, feedforward_noisy_fb[:, 1], label="Feedforward", linestyle="--", color="tab:purple")
if feedback_noisy_fb is not None:
    ax[2].plot(time_noisy, feedback_noisy_fb[:, 1], label="Feedback Δ", linestyle=":", color="tab:gray")
ax[2].set_ylabel("Brake [%]")
ax[2].set_xlabel("Time [s]")
ax[2].set_title("Noisy Brake Commands")
ax[2].grid(True, alpha=0.3)
ax[2].legend(loc="upper right")

plt.tight_layout()
plt.show()



## 7. Compare Noise-Free vs Perturbed Controllers
Compute the speed-tracking RMSE of the clean and perturbed closed-loop runs from the sections above to quantify the effect of measurement and actuation noise.



In [None]:
def rmse(y_true, y_pred):
    """Return the root-mean-square error between two arrays as a Python float.

    The inputs are subtracted elementwise, so they must be broadcastable
    against each other.
    """
    residual = y_true - y_pred
    mean_squared = np.mean(residual * residual)
    return float(np.sqrt(mean_squared))

def compute_rmse(results):
    """Return the speed-tracking RMSE of one closed-loop result dict.

    The reference is the slice of ``TARGET_SPEED`` that starts at ``HISTORY``
    and has as many samples as ``results["simulated_speed"]``.
    """
    simulated = results["simulated_speed"]
    reference = TARGET_SPEED[HISTORY:HISTORY + simulated.shape[0]]
    return rmse(reference, simulated)


for case_label, case_results in (
    ("Clean RMSE (FF only)", results_clean_ff),
    ("Clean RMSE (with FB)", results_clean_fb),
    ("Noisy RMSE (FF only)", results_noisy_ff),
    ("Noisy RMSE (with FB)", results_noisy_fb),
):
    print(f"{case_label}: {compute_rmse(case_results):.3f} m/s")



### Feedforward vs Feedback (Clean Simulation)



In [None]:
# Decompose the clean feedback run's commands into the feedforward part, the
# feedback correction, and their sum (plotted as "Final").
clean_feedforward_fb = results_clean_fb.get("feedforward_actions")
clean_feedback_fb = results_clean_fb.get("feedback_actions")
if clean_feedforward_fb is None or clean_feedback_fb is None:
    print("Feedforward/feedback signals not available in clean results.")
else:
    time_clean = np.arange(clean_feedforward_fb.shape[0]) * SAMPLE_TIME

    fig, ax = plt.subplots(2, 1, figsize=(12, 6), sharex=True)
    # (action column, channel name, colours for feedforward / feedback / final)
    channels = (
        (0, "Throttle", ("tab:brown", "tab:green", "tab:orange")),
        (1, "Brake", ("tab:pink", "tab:purple", "tab:red")),
    )
    for panel_ax, (column, channel, (c_ff, c_fb, c_final)) in zip(ax, channels):
        ff_part = clean_feedforward_fb[:, column]
        fb_part = clean_feedback_fb[:, column]
        panel_ax.plot(time_clean, ff_part, label="Feedforward", color=c_ff, linestyle="--")
        panel_ax.plot(time_clean, fb_part, label="Feedback Δ", color=c_fb, linestyle=":")
        panel_ax.plot(time_clean, ff_part + fb_part, label="Final", color=c_final)
        panel_ax.set_ylabel(f"{channel} [%]")
        panel_ax.set_title(f"Clean {channel} Decomposition")
        panel_ax.grid(True, alpha=0.3)
        panel_ax.legend(loc="upper right")
    ax[1].set_xlabel("Time [s]")

    plt.tight_layout()
    plt.show()



### Plot Comparison



In [None]:
# Overlay all four closed-loop speed traces and the target on one axis.
# Fixed: the traces were previously plotted against the sample index while
# the x-axis was labelled "Time [s]"; every trace now gets an explicit time
# axis in seconds (index * SAMPLE_TIME), matching the other plots.
fig, ax = plt.subplots(1, 1, figsize=(10, 4))
speed_traces = (
    ("Clean FF", results_clean_ff["simulated_speed"], 0.6),
    ("Clean FB", results_clean_fb["simulated_speed"], 0.9),
    ("Noisy FF", results_noisy_ff["simulated_speed"], 0.6),
    ("Noisy FB", results_noisy_fb["simulated_speed"], 0.9),
)
for trace_label, trace, trace_alpha in speed_traces:
    # Each trace gets its own axis so runs of different length still plot.
    ax.plot(np.arange(len(trace)) * SAMPLE_TIME, trace, label=trace_label, alpha=trace_alpha)

# Target aligned with the simulated samples (profile index HISTORY onwards).
target_trace = TARGET_SPEED[HISTORY:HISTORY + len(results_clean_fb["simulated_speed"])]
ax.plot(
    np.arange(len(target_trace)) * SAMPLE_TIME,
    target_trace,
    label="Target",
    linestyle="--",
    alpha=0.4,
)
ax.set_title("Speed Tracking: Feedforward vs Feedback")
ax.set_ylabel("Speed [m/s]")
ax.set_xlabel("Time [s]")
ax.legend(ncol=2)
ax.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()



### Inference Performance
We benchmark single forward passes of the trained models to assess real-time feasibility.



In [None]:
from src.data.datasets import EVSequenceDataset, SequenceWindowConfig as DSWindowConfig

# Load the dataset only to obtain one representative sample for timing.
timing_dataset = EVSequenceDataset(
    DATASET_PATH,
    window=DSWindowConfig(history=HISTORY, horizon=HORIZON),
)
sample = timing_dataset[0]

# Add a leading dimension of size 1 to each tensor (presumably the batch
# axis; confirm against the model signatures) and move it to the device.
history_states_t = sample["history_states"].unsqueeze(0).to(device)
history_actions_t = sample["history_actions"].unsqueeze(0).to(device)
future_states_t = sample["future_states"].unsqueeze(0).to(device)
future_actions_t = sample["future_actions"].unsqueeze(0).to(device)
# All-zero residual-action history with the same shape as the action history.
history_residual_t = torch.zeros_like(history_actions_t)

# Switch all three models to evaluation mode before timing.
forward_model.eval()
inverse_model.eval()
feedback_model.eval()

# Wait for any pending GPU work so it is not attributed to the first timing.
if device.type == "cuda":
    torch.cuda.synchronize()


def measure_latency(fn, warmup: int = 10, iters: int = 100) -> float:
    """Return the mean wall-clock time in seconds of one call to ``fn``.

    ``fn`` is first called ``warmup`` times untimed, then ``iters`` times
    between two ``perf_counter`` readings. When the notebook-level ``device``
    is CUDA, the GPU is synchronized before the warm-up, before the timer
    starts and before it stops, so queued kernels are included in the
    measurement.

    Args:
        fn: Zero-argument callable to time.
        warmup: Number of untimed calls made first.
        iters: Number of timed calls; the elapsed time is divided by it.
    """
    def _sync() -> None:
        # No-op on CPU.
        if device.type == "cuda":
            torch.cuda.synchronize()

    _sync()
    for _ in range(warmup):
        fn()
    _sync()
    started = pytime.perf_counter()
    for _ in range(iters):
        fn()
    _sync()
    elapsed = pytime.perf_counter() - started
    return elapsed / iters


def run_inverse():
    """Run one gradient-free pass of the inverse model on the timing sample."""
    with torch.no_grad():
        return inverse_model(history_states_t, history_actions_t, future_states_t)


@torch.no_grad()
def run_forward(actions: torch.Tensor):
    """Run one gradient-free pass of the forward model with ``actions`` as the future actuation."""
    return forward_model(history_states_t, history_actions_t, actions)


# Inverse-model output, reused as the feedforward input for the feedback model
# and as one of the actuation inputs for the forward-model timing.
feedforward_baseline = run_inverse()


def run_feedback():
    """Run one gradient-free pass of the feedback model on the timing sample."""
    with torch.no_grad():
        return feedback_model(
            history_states_t,
            history_actions_t,
            future_states_t,
            feedforward_baseline,
            history_residual_actions=history_residual_t,
        )

# Time each model; the forward model is timed once with the dataset's
# future actions ("GT") and once with the inverse-model output ("FF").
lat_inverse = measure_latency(run_inverse)
lat_feedback = measure_latency(run_feedback)
lat_forward_gt = measure_latency(lambda: run_forward(future_actions_t))
lat_forward_ff = measure_latency(lambda: run_forward(feedforward_baseline))

# Report the mean latency per call in milliseconds.
for latency_label, latency_s in (
    ("Inverse model latency", lat_inverse),
    ("Feedback model latency", lat_feedback),
    ("Forward model latency (GT actuations)", lat_forward_gt),
    ("Forward model latency (FF actuations)", lat_forward_ff),
):
    print(f"{latency_label}: {latency_s * 1e3:.3f} ms")



## 8. Next Steps
- Swap the synthetic profiles with recorded data (`np.load`).
- Tune `PerturbationConfig` to emulate additional disturbances (e.g., wheel slip).
- Integrate the loop into larger planning stacks for MPC benchmarking.

