In [1]:
import os
import shutil
from pathlib import Path

import casadi as ca
import numpy as np
from acados_template import AcadosModel, AcadosOcp, AcadosOcpSolver


In [2]:

"""
Moving‑Horizon Estimator (MHE) for identifying per‑joint dynamical parameters
(k, viscous, coulomb and rotor inertia – each with forward & reverse coefficients)
of the Alpha‑5 underwater arm.

The script
~~~~~~~~~~
1. Loads the discrete forward‑dynamics function ``Mnext`` from *arm.casadi*.
2. Builds an augmented **discrete** model where the unknown parameter vector
   θ is appended to the system state as *constant* dynamics.
3. Configures an ACADOS OCP in estimator (MHE) form with a sliding horizon.
4. Generates and compiles the solver, then copies the resulting
   shared library to *libMHEarm.so* so it can be linked from C++.
5. Exposes ``mhe_step`` for real‑time iterative updates.

Edit the *CONFIGURATION* block as needed (horizon length, weights,
initial covariances, payload, etc.).
"""

# ───────────────────────────────────────── CONFIGURATION ─────────────────────────────────────────
N_HORIZON = 20            # number of MHE shooting nodes
DT        = 0.01          # [s] sampling period
G_VAL     = -9.81         # gravity (signed)
N_JOINTS  = 4             # Alpha‑5 active joints

# Each of the four parameter groups has *two* coefficients per joint → 8·n_joints
N_THETA   = 8 * N_JOINTS

# Constant payload assumption (mass, Ixx, Iyy, Izz)
PAYLOAD_PROPS = np.zeros(4)

# Disable joint‑limit saturation inside estimation (the guard logic in
# the model already enforces safety)
LOWER_JOINT_LIMIT = -np.inf * np.ones(N_JOINTS)
UPPER_JOINT_LIMIT = +np.inf * np.ones(N_JOINTS)
EPS_TORQUE       = np.zeros(N_JOINTS)

# Cost weights — tune to sensor noise & desired convergence behaviour
QY   = 1e3 * np.ones(2 * N_JOINTS)   # state residual (q, q̇)
R_U  = 1e-2 * np.ones(N_JOINTS)       # control regularisation

# Arrival‑cost (prior) covariance – large numbers ⇒ weak prior
P0_THETA_INV = 1e-4 * np.eye(N_THETA)  # (unused for now, but could feed into W_0)

# Paths
try:
    THIS_DIR = Path(__file__).resolve().parent
except NameError:                          # interactive / notebook fallback
    THIS_DIR = Path.cwd()

CASADI_FN = THIS_DIR / "arm.casadi"
LIB_NAME  = THIS_DIR / "libMHEarm.so"

# ──────────────────────────────────────── MODEL DEFINITION ───────────────────────────────────────

def build_augmented_model() -> AcadosModel:
    """Return an :class:`AcadosModel` with θ appended as constant states."""

    if not CASADI_FN.exists():
        raise FileNotFoundError("arm.casadi not found – generate it before running the MHE script.")

    F_next = ca.Function.load(str(CASADI_FN))

    # ── symbolic variables ────────────────────────────────────────────────
    x_mech = ca.SX.sym("x_mech", 2 * N_JOINTS)  # [q; q̇]
    theta  = ca.SX.sym("theta",  N_THETA)       # parameters to estimate
    u      = ca.SX.sym("u", N_JOINTS)           # commanded torques

    # Constants are hard‑wired here; change to SX if you need them time‑varying
    x_next_mech = F_next(
        x_mech,
        u,
        DT,
        G_VAL,
        PAYLOAD_PROPS,
        theta,                     # ← unknowns enter here
        LOWER_JOINT_LIMIT,
        UPPER_JOINT_LIMIT,
        EPS_TORQUE,
    )

    # θ has zero dynamics (identity map)
    x_next = ca.vertcat(x_next_mech, theta)
    x_aug  = ca.vertcat(x_mech,      theta)

    model = AcadosModel()
    model.name            = "arm_mhe"
    model.x               = x_aug
    model.u               = u
    model.disc_dyn_expr   = x_next

    return model

# ────────────────────────────────────────── OCP BUILDER ──────────────────────────────────────────

def export_mhe_solver(N: int = N_HORIZON) -> AcadosOcpSolver:
    """Create, compile and return an ACADOS solver instance for the MHE."""

    model     = build_augmented_model()
    nx        = model.x.size()[0]
    nu        = model.u.size()[0]
    ny_state  = 2 * N_JOINTS          # measured part of the state
    ny_total  = ny_state + nu         # stacked [state; control] residuals

    ocp = AcadosOcp()
    ocp.model = model

    # ── dimensions ────────────────────────────────────────────────────────
    ocp.dims.ny     = ny_total
    ocp.dims.ny_e   = ny_state
    ocp.dims.ny_0   = 0              # no separate initial LS cost

    # ── cost (linear LS) ──────────────────────────────────────────────────
    ocp.cost.cost_type   = "LINEAR_LS"
    ocp.cost.cost_type_e = "LINEAR_LS"

    # Vx maps states → outputs
    Vx_full = np.zeros((ny_total, nx))
    Vx_full[:ny_state, :ny_state] = np.eye(ny_state)   # extract q & q̇

    # Vu maps inputs → outputs (only penalise control magnitude)
    Vu_full = np.zeros((ny_total, nu))
    Vu_full[ny_state:, :] = np.eye(nu)                 # last rows correspond to u

    ocp.cost.Vx   = Vx_full
    ocp.cost.Vu   = Vu_full
    ocp.cost.W    = np.diag(np.concatenate([QY, R_U]))
    ocp.cost.yref = np.zeros(ny_total)

    # Terminal (arrival) cost – only on state residuals
    Vx_e = np.zeros((ny_state, nx))
    Vx_e[:, :ny_state] = np.eye(ny_state)

    ocp.cost.Vx_e   = Vx_e
    ocp.cost.W_e    = np.diag(QY)
    ocp.cost.yref_e = np.zeros(ny_state)

    # ── initial state guess ───────────────────────────────────────────────
    x0_bar     = np.zeros(2 * N_JOINTS)
    theta0_bar = np.zeros(N_THETA)
    ocp.constraints.x0 = np.concatenate([x0_bar, theta0_bar])

    # ── solver options ────────────────────────────────────────────────────
    ocp.solver_options.integrator_type = "DISCRETE"
    ocp.solver_options.qp_solver       = "FULL_CONDENSING_QPOASES"
    ocp.solver_options.hessian_approx  = "GAUSS_NEWTON"
    ocp.solver_options.nlp_solver_type = "SQP_RTI"
    ocp.solver_options.print_level     = 0
    ocp.solver_options.tf              = N * DT  # ← set total horizon length     = 0
    ocp.solver_options.N_horizon = N

    # ── code generation ───────────────────────────────────────────────────
    print("\n⏳  Generating and compiling MHE solver…")
    solver = AcadosOcpSolver(ocp, json_file="acados_ocp_mhe.json")

    # copy generated shared library next to this script for easy C++ linking
    lib_src = Path(solver.shared_lib_name)
    try:
        shutil.copy(lib_src, LIB_NAME)
        print(f"✔  Shared library saved to {LIB_NAME}")
    except Exception as exc:
        print(f"⚠  Could not copy shared library: {exc}")

    return solver

# ───────────────────────────────────── ONLINE UPDATE STEP ───────────────────────────────────────

def mhe_step(solver: AcadosOcpSolver, y_meas: np.ndarray, u_k: np.ndarray) -> np.ndarray:
    """Shift horizon, insert latest measurement & control, solve, return θ̂."""

    if y_meas.shape != (2 * N_JOINTS,):
        raise ValueError("y_meas must contain [q; q̇] for all joints.")
    if u_k.shape != (N_JOINTS,):
        raise ValueError("u_k must be the commanded torque vector.")

    # shift horizon
    solver.shift_states()
    solver.shift_inputs()

    # populate measurement & control across horizon (zero‑order hold)
    ny_state = 2 * N_JOINTS
    y_ref_k  = np.concatenate([y_meas, u_k])
    for i in range(solver.acados_ocp.dims.N):
        solver.set(i, "y_ref", y_ref_k)
        solver.set(i, "u",     u_k)
    solver.set(solver.acados_ocp.dims.N, "y_ref_e", y_meas)

    status = solver.solve()
    if status != 0:
        raise RuntimeError(f"ACADOS solver failure (status = {status})")

    # latest θ̂ is the tail of state vector at node 0
    x_est = solver.get(0, "x")
    theta_hat = x_est[-N_THETA:]
    return theta_hat

# ──────────────────────────────────────────── MAIN ─────────────────────────────────────────────
if __name__ == "__main__":
    solver = export_mhe_solver()

    # quick smoke‑test loop (replace with real data feed)
    import time
    meas   = np.zeros(2 * N_JOINTS)
    torque = np.zeros(N_JOINTS)
    for k in range(5):
        θ_hat = mhe_step(solver, meas, torque)
        print(f"Step {k:02d} — θ̂ = {θ_hat}")
        time.sleep(DT)



⏳  Generating and compiling MHE solver…
NOTE: The selected QP solver FULL_CONDENSING_QPOASES does not support one-sided constraints yet.
rm -f libacados_ocp_solver_arm_mhe.so
rm -f acados_solver_arm_mhe.o
cc -fPIC -std=c99   -O2 -DACADOS_WITH_QPOASES -I/home/mrrobot/acados/include -I/home/mrrobot/acados/include/acados -I/home/mrrobot/acados/include/blasfeo/include -I/home/mrrobot/acados/include/hpipm/include -I /home/mrrobot/acados/include/qpOASES_e/  -c -o acados_solver_arm_mhe.o acados_solver_arm_mhe.c
cc -fPIC -std=c99   -O2 -DACADOS_WITH_QPOASES -I/home/mrrobot/acados/include -I/home/mrrobot/acados/include/acados -I/home/mrrobot/acados/include/blasfeo/include -I/home/mrrobot/acados/include/hpipm/include -I /home/mrrobot/acados/include/qpOASES_e/  -c -o arm_mhe_model/arm_mhe_dyn_disc_phi_fun.o arm_mhe_model/arm_mhe_dyn_disc_phi_fun.c
cc -fPIC -std=c99   -O2 -DACADOS_WITH_QPOASES -I/home/mrrobot/acados/include -I/home/mrrobot/acados/include/acados -I/home/mrrobot/acados/include/blas

cc: fatal error: Killed signal terminated program cc1
compilation terminated.
make: *** [<builtin>: arm_mhe_model/arm_mhe_dyn_disc_phi_fun_jac.o] Error 1


acados was compiled with OpenMP.


OSError: /home/mrrobot/dev_ws/src/kinodynamic-underwater-manipulator/usage/c_generated_code/libacados_ocp_solver_arm_mhe.so: cannot open shared object file: No such file or directory