# Demo of Using A Forecast with MPC

In [None]:
import casadi as cas
import do_mpc
import numpy as np

In [None]:
# Build the continuous-time mixing tank model
model_type = "continuous"
model = do_mpc.model.Model(model_type)

# Fixed plant parameters
C_in_1 = 0.5  # mineral concentration of feed 1 (tons/m^3)
C_in_2 = 1.5  # mineral concentration of feed 2 (tons/m^3)
A = 7.0  # tank cross-sectional area (m^2)

# States. The initial state vector x0 defined later lists its values as
# (L, m, F_out), so keep the declarations in this order.
# Level (m)
L = model.set_variable(var_type="_x", var_name="L", shape=(1, 1))
# Mass of mineral held in the tank (tons)
m = model.set_variable(var_type="_x", var_name="m", shape=(1, 1))
# Volumetric outflow (m^3/h), carried as an extra state so that it can be
# estimated alongside L and m
F_out = model.set_variable(var_type="_x", var_name="F_out", shape=(1, 1))

# Inputs: volumetric feed rates into the tank (m^3/h)
F_in_1 = model.set_variable(var_type="_u", var_name="F_in_1", shape=(1, 1))
F_in_2 = model.set_variable(var_type="_u", var_name="F_in_2", shape=(1, 1))

# Output expression: mineral concentration of the tank contents,
# i.e. mass divided by liquid volume (L * A)
C_out = m / (L * A)
model.set_expression("C_out", C_out)

# ODE right-hand sides
# Volume balance divided by the cross-sectional area
dL_dt = (F_in_1 + F_in_2 - F_out) / A
# Mineral balance: mass in with both feeds minus mass leaving with the outflow
dm_dt = F_in_1 * C_in_1 + F_in_2 * C_in_2 - F_out * m / (L * A)
model.set_rhs("L", dL_dt)
model.set_rhs("m", dm_dt)
# F_out has zero drift in the model; its variation enters through the
# estimator's process noise covariance and the simulation loop, which
# overwrites the simulator's F_out state each step.
model.set_rhs("F_out", cas.DM(0))

model.setup()

In [None]:
# Model predictive controller built on the mixing tank model
mpc = do_mpc.controller.MPC(model)

# Controller settings. These are assigned through the settings object (the
# same style the estimator configuration in this notebook uses) instead of
# the deprecated set_param() call.
mpc.settings.n_horizon = 20
mpc.settings.t_step = 1.0
mpc.settings.n_robust = 1
mpc.settings.store_full_solution = True

# Constraints: tank level limits and feed rate limits
mpc.bounds["lower", "_x", "L"] = 0.1
mpc.bounds["upper", "_x", "L"] = 4.0
mpc.bounds["lower", "_u", "F_in_1"] = 0.0
mpc.bounds["upper", "_u", "F_in_1"] = 10.0
mpc.bounds["lower", "_u", "F_in_2"] = 0.0
mpc.bounds["upper", "_u", "F_in_2"] = 10.0

# Objective: keep the output concentration at its setpoint, with a smaller
# (0.1) weight on holding the level at its setpoint.
# Reference states and expressions through mpc.model, not the original variables
C_out_sp = 0.5
L_sp = 2.0
lterm = (C_out_sp - mpc.model.aux["C_out"]) ** 2 + 0.1 * (
    L_sp - mpc.model.x["L"]
) ** 2
mterm = cas.DM(0.0)  # Terminal cost (not used)
mpc.set_objective(mterm=mterm, lterm=lterm)

# Penalty weights on the manipulated variables
mpc.set_rterm(F_in_1=0.1, F_in_2=0.1)

mpc.setup()

In [None]:
# Create simulator
simulator = do_mpc.simulator.Simulator(model)

# Same sampling time as the controller and the estimator. Assigned through
# the settings object instead of the deprecated set_param() call.
simulator.settings.t_step = 1.0

# The model declares no time-varying parameters: F_out is an augmented state
# whose true value is written into simulator.x0 by the simulation loop.
# The template obtained here is handed back unchanged by tvp_fun_sim.
tvp_template = simulator.get_tvp_template()


def tvp_fun_sim(t_now):
    """Return the simulator's time-varying parameter template unchanged.

    This is the callback registered with the simulator through
    ``set_tvp_fun``. It does not modify the template, so it supplies no
    time-dependent values.

    Args:
        t_now: Time passed in by the caller; ignored.

    Returns:
        The module-level ``tvp_template`` object, as obtained from
        ``simulator.get_tvp_template()``.
    """
    return tvp_template


# Register the TVP callback (it returns the template unchanged)
simulator.set_tvp_fun(tvp_fun_sim)

# Finalize the simulator configuration
simulator.setup()

In [None]:
# Covariance matrices for the EKF. Diagonal entries are listed in the same
# order as the comments in the original setup describe: L, m, F_out.

# Process noise covariance: only F_out (treated as a random walk) is perturbed
P_v = np.diag([0.0, 0.0, 0.01])

# Measurement noise covariance: small variance on L and m; the very large
# third entry is there so that the F_out entry is treated as not measured
P_w = np.diag([0.01, 0.01, 1e6])

# Initial state covariance
P_x = 0.1 * np.eye(3)

# Extended Kalman filter used to estimate the states, including F_out
estimator = do_mpc.estimator.EKF(model)
estimator.settings.t_step = 1.0

# NOTE(review): confirm that the EKF actually reads these attributes; P_v and
# P_w are also passed explicitly to estimator.make_step in the simulation loop.
estimator.P_v = P_v
estimator.P_w = P_w
estimator.P_x = P_x

estimator.setup()

In [None]:
# Initial state as a column vector: L=2.0 m, m=7.0 tons, F_out=5.0 m^3/h
x0 = np.array([2.0, 7.0, 5.0]).reshape(3, 1)

# Controller, simulator and estimator all start from the same state
for component in (mpc, simulator, estimator):
    component.x0 = x0

# Initialize the MPC and the estimator from their x0
mpc.set_initial_guess()
estimator.set_initial_guess()

In [None]:
# Simulation parameters
n_steps = 100
np.random.seed(42)  # For reproducibility

# Random walk parameters for the true outflow F_out
F_out_true = 5.0  # Initial value
random_walk_std = 0.1  # Standard deviation of random walk increments

# State estimate handed to the controller. It starts from the estimator's
# initial state and is replaced by the EKF output after every step, so the
# controller always acts on the value the estimator returned.
x_est = estimator.x0

# Closed loop: controller -> disturbance update -> plant -> estimator
for _ in range(n_steps):
    # Control input computed from the latest state estimate
    u0 = mpc.make_step(x_est)

    # Advance the true disturbance by one random-walk increment and keep it
    # within reasonable bounds
    F_out_true += np.random.randn() * random_walk_std
    F_out_true = np.clip(F_out_true, 2.0, 10.0)

    # Write the true F_out into the simulator state; this must happen before
    # simulator.make_step so the step is simulated with the updated outflow
    simulator.x0["F_out"] = F_out_true

    # Simulate the system (with true F_out)
    y_next = simulator.make_step(u0)

    # The simulator output is used directly as the measurement. The
    # measurement noise covariance P_w is 3x3 with a very large third
    # (F_out) entry, which is how F_out is treated as unmeasured.
    y_meas = y_next.copy()

    # EKF update: measurement, control input, process noise covariance and
    # measurement noise covariance
    x_est = estimator.make_step(y_meas, u0, P_v, P_w)

print(f"Simulation completed: {n_steps} steps")

In [None]:
import matplotlib.pyplot as plt
from do_mpc.graphics import Graphics, default_plot

# Create a graphics object for the simulator data.
# NOTE: the name `graphics` is rebound by the default_plot(...) calls in the
# later plotting cells.
graphics = Graphics(simulator.data)

In [None]:
# Figure comparing simulated (true) and estimated states, one panel per state
fig, ax = plt.subplots(3, 1, figsize=(7, 5.5), sharex=True)

# One row per panel: state name, simulator legend label, estimator legend
# label, y-axis label. The cubic-metre unit is written in ASCII mathtext so
# it renders as a superscript and cannot be corrupted by encoding changes.
state_panels = (
    ("L", "L (simulator)", "L (estimator)", "Level (m)"),
    ("m", "m (simulator)", "m (estimator)", "Mass (tons)"),
    ("F_out", "F_out (true)", "F_out (estimated)", "Outflow (m$^3$/h)"),
)

for panel, (state, sim_label, est_label, y_label) in zip(ax, state_panels):
    # Simulator trajectory (solid line)
    panel.plot(
        simulator.data["_time"],
        simulator.data["_x", state],
        label=sim_label,
        linewidth=2,
    )
    # Estimator trajectory (dashed line)
    panel.plot(
        estimator.data["_time"],
        estimator.data["_x", state],
        "--",
        label=est_label,
        linewidth=2,
    )
    # Only the level panel shows a setpoint line; it is drawn before the
    # legend is built so that it gets a legend entry
    if state == "L":
        panel.axhline(y=L_sp, color="r", linestyle=":", label="L setpoint")
    panel.set_ylabel(y_label)
    panel.legend()
    panel.grid(True)

# The x-axis is shared, so only the bottom panel is labelled
ax[2].set_xlabel("Time (h)")

plt.tight_layout()
plt.show()

In [None]:
# Figure showing the two feed rates recorded by the MPC, one panel per input
fig, ax = plt.subplots(2, 1, figsize=(7, 4), sharex=True)

for panel, input_name in zip(ax, ("F_in_1", "F_in_2")):
    # Step plot with where="post": each value is held until the next sample
    panel.step(
        mpc.data["_time"],
        mpc.data["_u", input_name],
        where="post",
        label=input_name,
        linewidth=2,
    )
    # Unit written in ASCII mathtext so it renders as a superscript and
    # cannot be corrupted by encoding changes
    panel.set_ylabel(f"{input_name} (m$^3$/h)")
    panel.legend()
    panel.grid(True)

# The x-axis is shared, so only the bottom panel is labelled
ax[1].set_xlabel("Time (h)")

plt.tight_layout()
plt.show()

In [None]:
# Figure comparing simulated and estimated output concentration
fig, ax = plt.subplots(1, 1, figsize=(7, 2.5))

# The estimated concentration is recomputed from the estimated states rather
# than read from estimator.data, using the same formula as the model:
# C_out = m / (L * A)
C_out_estimated = estimator.data["_x", "m"] / (estimator.data["_x", "L"] * A)

# Simulator value of the C_out expression
ax.plot(
    simulator.data["_time"],
    simulator.data["_aux", "C_out"],
    label="C_out (simulator)",
    linewidth=2,
)
# Value derived from the estimated states (dashed line)
ax.plot(
    estimator.data["_time"],
    C_out_estimated,
    "--",
    label="C_out (estimator)",
    linewidth=2,
)
# Concentration setpoint used in the MPC objective
ax.axhline(
    y=C_out_sp, color="r", linestyle=":", label="C_out setpoint", linewidth=2
)
# Unit written in ASCII mathtext so it renders as a superscript and cannot be
# corrupted by encoding changes
ax.set_ylabel("Concentration (tons/m$^3$)")
ax.set_xlabel("Time (h)")
ax.legend()
ax.grid(True)
ax.set_title("Output Concentration")

plt.tight_layout()
plt.show()

In [None]:
# Standard do-mpc overview figure built from the data recorded by the MPC
fig, ax, graphics = default_plot(mpc.data, figsize=(7, 10))
# Tighten the layout of the figure that was just created, then display it
fig.tight_layout()
plt.show()

In [None]:
# Standard do-mpc overview figure built from the simulator data
fig, ax, graphics = default_plot(simulator.data, figsize=(7, 10))
# Tighten the layout of the figure that was just created, then display it
fig.tight_layout()
plt.show()

In [None]:
# Standard do-mpc overview figure built from the estimator data; the empty
# aux_list requests no auxiliary-expression panels
fig, ax, graphics = default_plot(estimator.data, aux_list=[], figsize=(7, 8.5))
# Tighten the layout of the figure that was just created, then display it
fig.tight_layout()
plt.show()