In [2]:
import casadi as ca
import numpy as np
from utils_functions import *
from utils_functions import linearize_system, discretize_system, steady_state, find_equilibrium
from utils_Modified_FourTank_functions import Modified_FourTankSystem, sim22
from utils_Models import  OpenLoop_ModelB
from parameters import p as params

In [3]:
def Modified_FourTankSystem_casadi(x, u, d, p):
    """
    CasADi implementation of the modified four-tank ODE:
        xdot = f(x,u,d,p)

    Inputs
    ------
    x : casadi.SX/MX (4,1) or (4,)
        Mass in tanks [g]
    u : casadi.SX/MX (2,1) or (2,)
        Pump flows [cm^3/s] -> [F1, F2]
    d : casadi.SX/MX (2,1) or (2,)
        Disturbance inflows [cm^3/s] -> [F3, F4]
    p : casadi.SX/MX/DM (12,1) or (12,)
        Parameter vector:
            p[0:4]   a     outlet pipe areas [cm^2]
            p[4:8]   A     tank cross sectional areas [cm^2]
            p[8:10]  gamma valve split [-]
            p[10]    g     gravity [cm/s^2]
            p[11]    rho   density [g/cm^3]

    Returns
    -------
    xdot : casadi.SX/MX (4,1)
        Time derivative of mass [g/s]
    """

    # Ensure column vectors
    x = ca.vertcat(*list(x)) if (hasattr(x, "shape") and len(x.shape) == 1) else x
    u = ca.vertcat(*list(u)) if (hasattr(u, "shape") and len(u.shape) == 1) else u
    d = ca.vertcat(*list(d)) if (hasattr(d, "shape") and len(d.shape) == 1) else d
    p = ca.vertcat(*list(p)) if (hasattr(p, "shape") and len(p.shape) == 1) else p

    # Unpack
    m = x  # (4,1)
    F1, F2 = u[0], u[1]
    F3, F4 = d[0], d[1]

    a = p[0:4]       # (4,1)
    A = p[4:8]       # (4,1)
    gamma = p[8:10]  # (2,1)
    g = p[10]
    rho = p[11]

    # Inflows (cm^3/s)
    qin1 = gamma[0] * F1
    qin2 = gamma[1] * F2
    qin3 = (1 - gamma[1]) * F2
    qin4 = (1 - gamma[0]) * F1

    # Levels (cm)
    # NOTE: if m can ever go slightly negative (numerics), clamp to avoid sqrt issues.
    h = m / (rho * A)
    h = ca.fmax(h, 0)  # elementwise max

    # Outflows (cm^3/s): a_i * sqrt(2 g h_i)
    qout = ca.dot(a, ca.sqrt(2 * g * h))  # elementwise

    # Mass balances (g/s)
    xdot1 = rho * (qin1 + qout[2] - qout[0])
    xdot2 = rho * (qin2 + qout[3] - qout[1])
    xdot3 = rho * (qin3 + F3     - qout[2])
    xdot4 = rho * (qin4 + F4     - qout[3])

    return ca.vertcat(xdot1, xdot2, xdot3, xdot4)


# Optional: build a CasADi Function object you can call fast in optimizations/integrators
def make_Modified_FourTankSystem_fun():
    x = ca.MX.sym("x", 4, 1)
    u = ca.MX.sym("u", 2, 1)
    d = ca.MX.sym("d", 2, 1)
    p = ca.MX.sym("p", 12, 1)
    xdot = Modified_FourTankSystem_casadi(x, u, d, p)
    return ca.Function("f_four_tank", [x, u, d, p], [xdot], ["x", "u", "d", "p"], ["xdot"])



In [4]:
x = ca.MX.sym("x", 4, 1)
u = ca.MX.sym("u", 2, 1)
d = ca.MX.sym("d", 2, 1)
p = ca.MX.sym("p", 12, 1)

F = ca.Function('f', [x, u, d, p], [x, ca.sin(x), ca.cos(x), 2*x])

In [5]:
x = ca.SX.sym('x'); y = ca.SX.sym('y'); z = ca.SX.sym('z')
nlp = {'x':ca.vertcat(x,y,z), 'f':x**2+100*z**2, 'g':z+(1-x)**2-y}
S = ca.nlpsol('S', 'ipopt', nlp)
print(S)

S:(x0[3],p[],lbx[3],ubx[3],lbg,ubg,lam_x0[3],lam_g0)->(x[3],f,g,lam_x[3],lam_g,lam_p[]) IpoptInterface


In [6]:
def make_Modified_FourTankSystem_fun():
    # Ensure column vectors
    x = ca.MX.sym('x', 4,1)
    u = ca.MX.sym('u', 2,1)
    d = ca.MX.sym('d', 2,1)
    p = ca.MX.sym('p', 16,1)

    # Unpack
    m = x  # (4,1)
    F1, F2 = u[0], u[1]
    F3, F4 = d[0], d[1]

    a = p[0:4]       # (4,1)
    A = p[4:8]       # (4,1)
    gamma = p[8:10]  # (2,1)
    g = p[10]
    rho = p[11]
    lambdF3 = p[12]
    lambdF4 = p[13]
    meanF3 = p[14]
    meanF4 = p[15]

    # Inflows (cm^3/s)
    qin1 = gamma[0] * F1
    qin2 = gamma[1] * F2
    qin3 = (1 - gamma[1]) * F2
    qin4 = (1 - gamma[0]) * F1

    # Levels (cm)
    # NOTE: if m can ever go slightly negative (numerics), clamp to avoid sqrt issues.
    h = m / (rho * A)
    h = ca.fmax(h, 0)  # elementwise max

    # Outflows (cm^3/s): a_i * sqrt(2 g h_i)
    qout = a* ca.sqrt(2 * g * h)  # elementwise

    # Mass balances (g/s)
    xdot1 = rho * (qin1 + qout[2] - qout[0])
    xdot2 = rho * (qin2 + qout[3] - qout[1])
    xdot3 = rho * (qin3 + F3     - qout[2])
    xdot4 = rho * (qin4 + F4     - qout[3])
    xdot5 = lambdF3*(meanF3-F3)
    xdot6 = lambdF4*(meanF4-F4)
    xdot = ca.vertcat(xdot1, xdot2, xdot3, xdot4, xdot5, xdot6)
    
    sigmasystem = ca.DM.zeros(6,2)
    sigmasystem[4,0] = 1
    sigmasystem[5,1] = 1 
    
    
    drift = ca.Function("f_four_tank", [x, u, d, p], [xdot], ["x", "u", "d", "p"], ["xdot"])
    diffusion = ca.Function("g_four_tank", [x, u, d, p], [sigmasystem], ["x", "u", "d", "p"], ["sigmadot"])
    return drift, diffusion

In [7]:
def make_Modified_FourTankSystem_aug_fun():
    """
    Augmented state:
      X = [m1,m2,m3,m4,F3,F4]  (6x1)
    Input:
      u = [F1,F2]              (2x1)

    Parameters p (16x1):
      p[0:4]   a
      p[4:8]   A
      p[8:10]  gamma
      p[10]    g
      p[11]    rho
      p[12]    lambda_F3
      p[13]    lambda_F4
      p[14]    mean_F3
      p[15]    mean_F4
    """
    X = ca.MX.sym('X', 6, 1)   # <-- augmented state
    u = ca.MX.sym('u', 2, 1)
    p = ca.MX.sym('p', 16, 1)

    # Split augmented state
    m = X[0:4]      # masses
    F3 = X[4]
    F4 = X[5]

    F1, F2 = u[0], u[1]

    # Parameters
    a = p[0:4]
    A = p[4:8]
    gamma = p[8:10]
    g = p[10]
    rho = p[11]
    lambdF3 = p[12]
    lambdF4 = p[13]
    meanF3 = p[14]
    meanF4 = p[15]

    # Inflows (cm^3/s)
    qin1 = gamma[0] * F1
    qin2 = gamma[1] * F2
    qin3 = (1 - gamma[1]) * F2
    qin4 = (1 - gamma[0]) * F1

    # Levels (cm): h = m/(rho*A)
    h = m / (rho * A)
    h = ca.fmax(h, 0)  # avoid sqrt of negative due to numerics

    # Outflows (cm^3/s)
    qout = a * ca.sqrt(2 * g * h)  # elementwise

    # Mass balances (g/s)
    xdot1 = rho * (qin1 + qout[2] - qout[0])
    xdot2 = rho * (qin2 + qout[3] - qout[1])
    xdot3 = rho * (qin3 + F3     - qout[2])
    xdot4 = rho * (qin4 + F4     - qout[3])

    # Disturbance models (OU / mean-reverting)
    xdot5 = lambdF3 * (meanF3 - F3)
    xdot6 = lambdF4 * (meanF4 - F4)

    f = ca.vertcat(xdot1, xdot2, xdot3, xdot4, xdot5, xdot6)

    # Diffusion: noise only on disturbance states (F3,F4)
    # Here G is 6x2; scaling by sigma happens outside or via p if you prefer.
    G = ca.DM.zeros(6, 2)
    G[4, 0] = 1.0
    G[5, 1] = 1.0

    drift = ca.Function("f_four_tank_aug", [X, u, p], [f], ["X", "u", "p"], ["f"])
    diffusion = ca.Function("G_four_tank_aug", [X, u, p], [G], ["X", "u", "p"], ["G"])
    return drift, diffusion


In [8]:
params_alt =np.concatenate([params[:-2], np.array([0, 0, 100,100])])

In [9]:
f, g = make_Modified_FourTankSystem_fun()

# numeric values (use DM or numpy)
x_val = ca.DM([1000, 1000, 1000, 1000])              # masses
u_val = ca.DM([300, 300])                # pumps
d_val = ca.DM([100, 100])                # disturbances
p_val = ca.DM(params_alt)  # length 12

xdot_val = f(x=x_val, u=u_val, d=d_val, p=p_val)["xdot"]
sigmadot_val = g(x=x_val, u=u_val, d=d_val, p=p_val)["sigmadot"]
print("xdot =", np.array(xdot_val).squeeze())
print("sigmadot =", np.array(sigmadot_val).squeeze())

RuntimeError: Error in Function::call for 'f_four_tank' [MXFunction] at .../casadi/core/function.cpp:1466:
Error in Function::call for 'f_four_tank' [MXFunction] at .../casadi/core/function.cpp:362:
.../casadi/core/function_internal.hpp:1728: Input 3 (p) has mismatching shape. Got 14-by-1. Allowed dimensions, in general, are:
 - The input dimension N-by-M (here 16-by-1)
 - A scalar, i.e. 1-by-1
 - M-by-N if N=1 or M=1 (i.e. a transposed vector)
 - N-by-M1 if K*M1=M for some K (argument repeated horizontally)
 - N-by-P*M, indicating evaluation with multiple arguments (P must be a multiple of 1 for consistency with previous inputs)

In [None]:
def make_em_step(drift_fun, diffusion_fun, nx=6, nu=2, np_=16, nw=2):
    X  = ca.MX.sym("X", nx, 1)
    u  = ca.MX.sym("u", nu, 1)
    p  = ca.MX.sym("p", np_, 1)
    dt = ca.MX.sym("dt", 1, 1)
    dW = ca.MX.sym("dW", nw, 1)  # Brownian increment ~ N(0, I*dt)

    f = drift_fun(X, u, p)       # (nx,1)
    G = diffusion_fun(X, u, p)   # (nx,nw)

    X_next = X + f * dt + G @ dW

    return ca.Function(
        "em_step",
        [X, u, p, dt, dW],
        [X_next],
        ["X", "u", "p", "dt", "dW"],
        ["X_next"],
    )


In [None]:
drift, diffusion = make_Modified_FourTankSystem_aug_fun()
em_step = make_em_step(drift, diffusion, nx=6, nu=2, np_=16, nw=2)

def sample_dW(nw, dt):
    return np.sqrt(dt) * np.random.randn(nw, 1)

# Example initial condition: masses + initial disturbances
Xk = np.array([[16488.13700152], [25133.33227088], [4355.5317119], [6570.86299967], [100.0], [100.0]])  # 6x1
dt = 0.1
TN = 1800
N = int(TN / dt)
ts = np.linspace(0, TN, N+1)

p = params_alt
# ... set p[0:4]=a, p[4:8]=A, p[8:10]=gamma, p[10]=g, p[11]=rho, etc.
states = []
for k in range(N+1):
    u_k = u[:,k]  # (2x1) example
    dWk = sample_dW(2, dt)
    Xk = np.array(em_step(Xk, u_k, p, dt, dWk)).reshape(6, 1)
    states.append(Xk)
states = np.hstack(states)  # (6, N)

RuntimeError: Error in Function::call for 'em_step' [MXFunction] at .../casadi/core/function.cpp:380:
.../casadi/core/function_internal.hpp:1728: Input 2 (p) has mismatching shape. Got 14-by-1. Allowed dimensions, in general, are:
 - The input dimension N-by-M (here 16-by-1)
 - A scalar, i.e. 1-by-1
 - M-by-N if N=1 or M=1 (i.e. a transposed vector)
 - N-by-M1 if K*M1=M for some K (argument repeated horizontally)
 - N-by-P*M, indicating evaluation with multiple arguments (P must be a multiple of 1 for consistency with previous inputs)

In [None]:
plt.plot(ts,states.T, label='F3')

AttributeError: 'list' object has no attribute 'T'

In [None]:
def plot_four_tank_simulation(
    t,
    states,
    U,
    p,
    title_prefix=""
):
    """
    Plot inputs u, disturbances d, masses x, and heights y.

    Parameters
    ----------
    t : array_like, shape (N,)
        Time vector
    states : ndarray, shape (6, N)
        States [m1,m2,m3,m4,F3,F4]
    U : ndarray, shape (2, N)
        Inputs [F1,F2]
    p : ndarray, shape (16,1) or (16,)
        Parameter vector
    title_prefix : str
        Optional prefix for plot titles
    """

    # --- unpack ---
    m = states[0:4, :]      # masses
    d = states[4:6, :]      # disturbances F3,F4
    rho = float(p[11])
    A = np.array(p[4:8]).reshape(4, 1)

    # Avoid numerical issues
    m = np.maximum(m, 0.0)

    # Heights
    h = m / (rho * A)

    # ---------------- Inputs ----------------
    fig, axs = plt.subplots(2, 1, figsize=(9, 6), sharex=True)
    axs[0].plot(t, U[0], label=r"$F_1$")
    axs[1].plot(t, U[1], label=r"$F_2$", color="tab:orange")

    axs[0].set_ylabel("Flow [cm$^3$/s]")
    axs[1].set_ylabel("Flow [cm$^3$/s]")
    axs[1].set_xlabel("Time [s]")

    axs[0].legend()
    axs[1].legend()
    fig.suptitle(f"{title_prefix} Inputs")
    plt.tight_layout()
    plt.show()

    # ---------------- Disturbances ----------------
    fig, axs = plt.subplots(2, 1, figsize=(9, 6), sharex=True)
    axs[0].plot(t, d[0], label=r"$F_3$")
    axs[1].plot(t, d[1], label=r"$F_4$", color="tab:orange")

    axs[0].set_ylabel("Flow [cm$^3$/s]")
    axs[1].set_ylabel("Flow [cm$^3$/s]")
    axs[1].set_xlabel("Time [s]")

    axs[0].legend()
    axs[1].legend()
    fig.suptitle(f"{title_prefix} Disturbances")
    plt.tight_layout()
    plt.show()

    # ---------------- Masses ----------------
    fig, axs = plt.subplots(4, 1, figsize=(9, 9), sharex=True)
    for i in range(4):
        axs[i].plot(t, m[i], label=rf"$m_{i+1}$")
        axs[i].set_ylabel("Mass [g]")
        axs[i].legend()

    axs[-1].set_xlabel("Time [s]")
    fig.suptitle(f"{title_prefix} Tank Masses")
    plt.tight_layout()
    plt.show()

    # ---------------- Heights ----------------
    fig, axs = plt.subplots(2, 1, figsize=(9, 9), sharex=True)
    for i in range(2):
        axs[i].plot(t, h[i], label=rf"$h_{i+1}$")
        axs[i].set_ylabel("Height [cm]")
        axs[i].legend()

    axs[-1].set_xlabel("Time [s]")
    fig.suptitle(f"{title_prefix} Tank Heights")
    plt.tight_layout()
    plt.show()

plot_four_tank_simulation(ts, states, U=np.array(u), p=params_alt, title_prefix="Stochastic Simulation")

Exception: Implicit conversion of symbolic CasADi type to numeric matrix not supported.
This may occur when you pass a CasADi object to a numpy function.
Use an equivalent CasADi function instead of that numpy function.

In [None]:
def make_linearization_funs(drift_fun, nx=6, nu=2, np_=16):
    """
    drift_fun: CasADi Function f(X,u,p)-> (nx,1)
    returns:
      A_fun(X,u,p) -> Ac (nx,nx)
      B_fun(X,u,p) -> Bc (nx,nu)
      f_fun(X,u,p) -> f  (nx,1)
    """
    X = ca.MX.sym("X", nx, 1)
    u = ca.MX.sym("u", nu, 1)
    p = ca.MX.sym("p", np_, 1)

    f = drift_fun(X, u, p)                    # (nx,1)
    Ac = ca.jacobian(f, X)                    # (nx,nx)
    Bc = ca.jacobian(f, u)                    # (nx,nu)

    A_fun = ca.Function("A_lin", [X, u, p], [Ac])
    B_fun = ca.Function("B_lin", [X, u, p], [Bc])
    f_fun = ca.Function("f_eval", [X, u, p], [f])

    return A_fun, B_fun, f_fun

In [None]:
make_linearization_funs(drift, nx=6, nu=2, np_=16)

(Function(A_lin:(i0[6],i1[2],i2[16])->(o0[6x6,10nz]) MXFunction),
 Function(B_lin:(i0[6],i1[2],i2[16])->(o0[6x2,4nz]) MXFunction),
 Function(f_eval:(i0[6],i1[2],i2[16])->(o0[6]) MXFunction))

In [None]:
def c2d_zoh(Ac, Bc, dt):
    """
    Exact ZOH discretization using matrix exponential.
    """
    Ac = np.asarray(Ac, dtype=float)
    Bc = np.asarray(Bc, dtype=float)

    nx = Ac.shape[0]
    nu = Bc.shape[1]

    M = np.zeros((nx + nu, nx + nu))
    M[:nx, :nx] = Ac
    M[:nx, nx:] = Bc

    Md = expm(M * dt)

    Ad = Md[:nx, :nx]
    Bd = Md[:nx, nx:]

    return Ad, Bd

In [None]:
import casadi as ca

# Symbols
x = ca.MX.sym("x", 3, 1)
u = ca.MX.sym("u", 1, 1)

# Dynamics (MUST be a CasADi vector)
f = ca.vertcat(
    (1 - x[1]**2)*x[0] - x[1] + u,
    x[0],
    x[0]**2 + x[1]**2 + u**2
)

# Terminal cost
E = x[2]

# Bounds
xf_min = [0, 0, -ca.inf]
xf_max = [0, 0,  ca.inf]
u_min  = -0.75
u_max  =  1.0

# CasADi functions (modern API)
ffcn = ca.Function("ffcn", [x, u], [f])
Efcn = ca.Function("Efcn", [x, u], [E])

# Test evaluation
print(ffcn([0,1,0], [0.2]))
print(Efcn([0,1,0], [0.2]))


N = 20; U = ca.MX.sym("U", N, 1)

[-0.8, 0, 1.04]
0
