# Model predictive control examples

This notebook recreates two of the new Java integration tests in a pure Python setting. We port the
`ModelPredictiveController` API from `MovingHorizonEstimationExampleTest` and
`OffshoreProcessMpcIntegrationTest` so that the scenarios can be explored without compiling the Java
library. The notebook demonstrates:

* Adaptive heater control with online moving horizon estimation.
* Multivariable optimisation of a simplified offshore separation train.


## Helper classes

The official Python bindings do not yet expose the new controller API. The cells below therefore
provide a light-weight implementation that mirrors the public methods used by the Java tests. The
objective is not to replicate the full optimisation engine, but to supply a compatible sandbox for
the two demonstration cases.


In [None]:
import numpy as np
import pandas as pd
from dataclasses import dataclass
from collections import deque
import uuid


In [None]:
@dataclass
class MovingHorizonEstimate:
    """Container mirroring the Java moving horizon estimate."""
    sample_count: int
    process_gain: float
    time_constant: float
    process_bias: float
    mean_squared_error: float


@dataclass
class QualityConstraint:
    """Represents a linearised quality constraint for the multivariable MPC."""
    name: str
    measurement: object
    unit: str
    limit: float
    margin: float
    control_sensitivity: np.ndarray

    def __post_init__(self):
        self.control_sensitivity = np.array(self.control_sensitivity, dtype=float)

    @classmethod
    def builder(cls, name: str):
        return _QualityConstraintBuilder(name)


class _QualityConstraintBuilder:
    """Builder utility emulating the Java helper used in the tests."""

    def __init__(self, name: str):
        self.kwargs = {"name": name}

    def measurement(self, measurement):
        self.kwargs["measurement"] = measurement
        return self

    def unit(self, unit: str):
        self.kwargs["unit"] = unit
        return self

    def limit(self, limit: float):
        self.kwargs["limit"] = float(limit)
        return self

    def margin(self, margin: float):
        self.kwargs["margin"] = float(margin)
        return self

    def controlSensitivity(self, sensitivity):
        self.kwargs["control_sensitivity"] = np.array(sensitivity, dtype=float)
        return self

    def build(self):
        required = ["measurement", "unit", "limit", "control_sensitivity"]
        missing = [name for name in required if name not in self.kwargs]
        if missing:
            raise ValueError(f"Missing fields for quality constraint: {missing}")
        self.kwargs.setdefault("margin", 0.0)
        return QualityConstraint(**self.kwargs)


class ModelPredictiveController:
    """Small Python port exposing the subset of methods used in the Java examples."""

    def __init__(self, name: str):
        self.name = name
        self.mode = "siso"

        # SISO configuration
        self.transmitter = None
        self.setpoint = 0.0
        self.unit = None
        self.process_gain = 1.0
        self.time_constant = 10.0
        self.process_bias = 0.0
        self.prediction_horizon = 10
        self.weight_output = 1.0
        self.weight_move = 0.0
        self.weight_pref = 0.0
        self.preferred_control = 0.0
        self.min_output = -np.inf
        self.max_output = np.inf

        # Moving horizon estimation buffers
        self.mhe_enabled = False
        self.mhe_window = 1
        self._history = deque()
        self._last_measurement = None
        self._last_control = 0.0
        self._current_control = 0.0
        self._last_estimate = None

        # Multivariable configuration
        self.control_names = []
        self._control_limits = {}
        self._control_weights = None
        self._move_weights = None
        self._preferred_vector = None
        self._initial_control_values = None
        self._current_control_vector = None
        self._previous_control_vector = None
        self._quality_constraints = []

    # --- SISO API -----------------------------------------------------------
    def setTransmitter(self, measurement):
        self.transmitter = measurement

    def setControllerSetPoint(self, value: float, unit: str = ""):
        self.setpoint = float(value)
        self.unit = unit

    def setProcessModel(self, gain: float, time_constant: float):
        self.process_gain = float(gain)
        self.time_constant = float(time_constant)

    def setProcessBias(self, bias: float):
        self.process_bias = float(bias)

    def setPredictionHorizon(self, horizon: int):
        self.prediction_horizon = int(horizon)

    def setWeights(self, output_weight: float, move_weight: float, pref_weight: float):
        self.weight_output = float(output_weight)
        self.weight_move = float(move_weight)
        self.weight_pref = float(pref_weight)

    def setPreferredControlValue(self, value: float):
        self.preferred_control = float(value)

    def setOutputLimits(self, lower: float, upper: float):
        self.min_output = float(lower)
        self.max_output = float(upper)

    def enableMovingHorizonEstimation(self, window: int):
        self.mhe_enabled = True
        self.mhe_window = int(window)

    def isMovingHorizonEstimationEnabled(self) -> bool:
        return self.mhe_enabled

    def getMovingHorizonEstimationWindow(self) -> int:
        return self.mhe_window

    def getResponse(self) -> float:
        return self._current_control

    def getLastMovingHorizonEstimate(self):
        return self._last_estimate

    # --- Multivariable API --------------------------------------------------
    def configureControls(self, *names: str):
        self.mode = "mimo"
        self.control_names = list(names)
        size = len(names)
        self._control_limits = {name: (-np.inf, np.inf) for name in names}
        self._control_weights = np.ones(size, dtype=float)
        self._move_weights = np.zeros(size, dtype=float)
        self._preferred_vector = np.zeros(size, dtype=float)
        self._initial_control_values = None
        self._current_control_vector = np.zeros(size, dtype=float)
        self._previous_control_vector = None
        self._quality_constraints = []

    def setInitialControlValues(self, values):
        values = np.asarray(values, dtype=float)
        self._initial_control_values = values.copy()
        self._current_control_vector = values.copy()
        self._previous_control_vector = values.copy()

    def setPreferredControlVector(self, vector):
        self._preferred_vector = np.asarray(vector, dtype=float)

    def setControlLimits(self, control_name: str, lower: float, upper: float):
        self._control_limits[control_name] = (float(lower), float(upper))

    def setControlWeights(self, weights):
        self._control_weights = np.asarray(weights, dtype=float)

    def setMoveWeights(self, weights):
        self._move_weights = np.asarray(weights, dtype=float)

    def addQualityConstraint(self, constraint: QualityConstraint):
        self._quality_constraints.append(constraint)

    def getControlVector(self):
        return self._current_control_vector

    # --- Core execution -----------------------------------------------------
    def runTransient(self, response, dt: float, identifier=None):
        if self.mode == "mimo":
            self._run_mimo()
            return

        if hasattr(self.transmitter, "getMeasuredValue"):
            measurement = self.transmitter.getMeasuredValue()
        else:
            measurement = self.transmitter.get_measured_value()

        if self._last_measurement is not None:
            self._history.append((self._last_measurement, self._last_control, measurement, dt))
            while len(self._history) > self.mhe_window:
                self._history.popleft()
            if self.mhe_enabled and len(self._history) >= 3:
                self._last_estimate = self._estimate_process()

        self._last_measurement = measurement
        gain = self.process_gain
        tau = self.time_constant
        bias = self.process_bias
        if self._last_estimate is not None:
            gain = self._last_estimate.process_gain
            tau = self._last_estimate.time_constant
            bias = self._last_estimate.process_bias

        control = self._optimal_control(measurement, gain, tau, bias, dt)
        control = float(np.clip(control, self.min_output, self.max_output))
        self._last_control = control
        self._current_control = control

    def _optimal_control(self, measurement, gain, tau, bias, dt):
        tau = max(float(tau), 1.0e-6)
        gain = max(float(gain), 1.0e-6)
        a = 1.0 - dt / tau
        b = dt / tau * gain
        c = dt / tau * bias
        horizon = max(int(self.prediction_horizon), 1)

        sum_beta_sq = 0.0
        numerator = 0.0
        a_power = 1.0
        sum_geom = 0.0
        for _ in range(1, horizon + 1):
            a_power *= a
            sum_geom = sum_geom * a + 1.0
            beta = b * sum_geom
            alpha = a_power * measurement + c * sum_geom
            numerator += self.weight_output * beta * (self.setpoint - alpha)
            sum_beta_sq += self.weight_output * beta * beta

        steady_state_control = (self.setpoint - bias) / gain
        w_ss = self.weight_output * horizon
        denom = sum_beta_sq + self.weight_move + self.weight_pref + w_ss
        if denom <= 0.0:
            denom = 1.0
        pref_term = self.weight_pref * self.preferred_control
        move_term = self.weight_move * self._last_control
        steady_term = w_ss * steady_state_control
        control = (numerator + pref_term + move_term + steady_term) / denom
        return control

    def _estimate_process(self):
        data = list(self._history)
        y = np.array([entry[2] for entry in data], dtype=float)
        X = np.array([[entry[0], entry[1], 1.0] for entry in data], dtype=float)
        theta, *_ = np.linalg.lstsq(X, y, rcond=None)
        A, B, D = theta
        predictions = X @ theta
        mse = float(np.mean((y - predictions) ** 2))
        dt = data[-1][3]
        denom = 1.0 - A
        if abs(denom) < 1.0e-6:
            denom = 1.0e-6 if denom >= 0.0 else -1.0e-6
        tau = dt / denom
        gain = B / denom
        bias = D / denom
        if tau <= 0.0:
            tau = dt / max(denom, 1.0e-6)
        return MovingHorizonEstimate(len(data), float(gain), float(tau), float(bias), mse)

    def _run_mimo(self):
        if self._initial_control_values is None:
            raise ValueError("Initial control values must be provided before optimisation.")

        base_controls = self._initial_control_values
        diag_w = np.array(self._control_weights, dtype=float)
        diag_m = np.array(self._move_weights, dtype=float)
        H_diag = diag_w + diag_m + 1.0e-9
        g = diag_w * self._preferred_vector + diag_m * self._previous_control_vector
        lower = np.array([self._control_limits[name][0] for name in self.control_names], dtype=float)
        upper = np.array([self._control_limits[name][1] for name in self.control_names], dtype=float)

        A_rows = []
        b_vals = []
        for constraint in self._quality_constraints:
            base_measure = constraint.measurement.getMeasuredValue()
            sensitivity = constraint.control_sensitivity
            rhs = constraint.limit - constraint.margin - base_measure + float(np.dot(sensitivity, base_controls))
            A_rows.append(sensitivity)
            b_vals.append(rhs)
        A = np.vstack(A_rows) if A_rows else None
        b = np.array(b_vals, dtype=float) if b_vals else None

        x0 = self._previous_control_vector.copy()
        solution = self._solve_projected_gradient(H_diag, g, A, b, lower, upper, x0)
        self._current_control_vector = solution
        self._previous_control_vector = solution

    def _solve_projected_gradient(self, H_diag, g, A, b, lower, upper, x0, iterations: int = 200):
        x = np.array(x0, dtype=float)
        step = 1.0 / (np.max(H_diag) + 1.0e-9)
        for _ in range(iterations):
            grad = H_diag * x - g
            x = x - step * grad
            x = np.minimum(np.maximum(x, lower), upper)
            if A is not None:
                for row, bound in zip(A, b):
                    if not np.any(row):
                        continue
                    val = float(row @ x)
                    if val > bound:
                        x = x - ((val - bound) / (np.dot(row, row) + 1.0e-12)) * row
                        x = np.minimum(np.maximum(x, lower), upper)
            if np.linalg.norm(grad) < 1.0e-5:
                break
        x = np.minimum(np.maximum(x, lower), upper)
        if A is not None:
            for row, bound in zip(A, b):
                if not np.any(row):
                    continue
                val = float(row @ x)
                if val > bound:
                    x = x - ((val - bound) / (np.dot(row, row) + 1.0e-12)) * row
                    x = np.minimum(np.maximum(x, lower), upper)
        return x


In [None]:
class AdaptiveHeaterMeasurement:
    """Simple first-order process used in the moving horizon estimation example."""

    def __init__(self, ambient: float, time_constant: float, gain: float):
        self.ambient = ambient
        self.time_constant = time_constant
        self.gain = gain
        self.temperature = ambient

    def advance(self, control_signal: float, dt: float):
        tau = max(self.time_constant, 1.0e-6)
        driving_force = -(self.temperature - self.ambient) + self.gain * control_signal
        self.temperature += driving_force * dt / tau

    def setProcessGain(self, gain: float):
        self.gain = gain

    def setTimeConstant(self, time_constant: float):
        self.time_constant = time_constant

    def setAmbientTemperature(self, ambient: float):
        self.ambient = ambient

    def getMeasuredValue(self):
        return self.temperature

    def get_measured_value(self):
        return self.temperature


class GasProductionMeasurement:
    """Returns negative gas production in tonne/hr to mirror the Java test."""

    def __init__(self, name: str, process):
        self.name = name
        self.unit = "tonne/hr"
        self.process = process

    def getMeasuredValue(self, unit: str = None):
        if unit in (None, "", self.unit):
            return -self.process.sales_gas_rate()
        raise ValueError(f"Unsupported unit {unit!r}")

    def get_measured_value(self):
        return self.getMeasuredValue()


class WobbeQualityMeasurement:
    """Returns the negative Wobbe Index in MJ/Sm3."""

    def __init__(self, name: str, process):
        self.name = name
        self.unit = "MJ/Sm3"
        self.process = process

    def getMeasuredValue(self, unit: str = None):
        if unit in (None, "", self.unit):
            return -self.process.wobbe_index()
        raise ValueError(f"Unsupported unit {unit!r}")

    def get_measured_value(self):
        return self.getMeasuredValue()


class OilRvpMeasurement:
    """Returns Reid Vapour Pressure in bara."""

    def __init__(self, name: str, process):
        self.name = name
        self.unit = "bara"
        self.process = process

    def getMeasuredValue(self, unit: str = None):
        if unit in (None, "", self.unit):
            return self.process.rvp()
        raise ValueError(f"Unsupported unit {unit!r}")

    def get_measured_value(self):
        return self.getMeasuredValue()


class SimplifiedOffshoreProcess:
    """Linear surrogate for the offshore process built in the integration test."""

    def __init__(self, dew_point_temperature: float, oil_heater_temperature: float, export_pressure: float):
        self.controls = np.array([dew_point_temperature, oil_heater_temperature, export_pressure], dtype=float)

    def apply_control(self, index: int, value: float):
        self.controls[index] = value

    def apply_vector(self, vector):
        self.controls[:] = vector

    def run(self):
        return self

    def clone_controls(self):
        return self.controls.copy()

    def sales_gas_rate(self):
        dew, heater, export = self.controls
        return 330.0 - 0.9 * (dew - 35.0) - 0.15 * (heater - 63.0) + 0.05 * (export - 250.0)

    def wobbe_index(self):
        dew, heater, export = self.controls
        return 47.0 - 0.04 * (dew - 35.0) - 0.01 * (heater - 63.0) + 0.015 * (export - 250.0)

    def rvp(self):
        dew, heater, export = self.controls
        return 0.9 + 0.03 * (heater - 63.0) - 0.025 * (dew - 35.0) + 0.005 * (export - 250.0)

    def total_energy(self):
        dew, heater, export = self.controls
        return 42.0 + 0.45 * (dew - 35.0) + 0.6 * (heater - 63.0) + 0.5 * (export - 250.0)


def compute_control_sensitivities(process: SimplifiedOffshoreProcess, base_controls):
    base_controls = np.asarray(base_controls, dtype=float)
    base_values = np.array([
        GasProductionMeasurement("gas", process).getMeasuredValue(),
        WobbeQualityMeasurement("wobbe", process).getMeasuredValue(),
        OilRvpMeasurement("rvp", process).getMeasuredValue(),
    ])
    step_sizes = np.array([1.0, 1.0, 5.0])
    sensitivities = np.zeros((3, base_controls.size))
    for control_index in range(base_controls.size):
        original = base_controls[control_index]
        step = step_sizes[control_index]
        process.apply_control(control_index, original + step)
        process.run()
        sensitivities[0, control_index] = (GasProductionMeasurement("gas", process).getMeasuredValue() - base_values[0]) / step
        sensitivities[1, control_index] = (WobbeQualityMeasurement("wobbe", process).getMeasuredValue() - base_values[1]) / step
        sensitivities[2, control_index] = (OilRvpMeasurement("rvp", process).getMeasuredValue() - base_values[2]) / step
        process.apply_control(control_index, original)
        process.run()
    return sensitivities


## Moving horizon estimation of an adaptive heater

The first scenario mirrors `MovingHorizonEstimationExampleTest`. A first-order heater model drifts
over time and the controller has to identify the process gain, time constant and ambient bias on the
fly. The code below follows the structure of the Java unit test and records the evolution of the
estimates and the closed-loop response.


In [None]:
# Controller configuration identical to the Java example
ambient = 18.0
initial_gain = 0.55
initial_tau = 45.0
fouled_gain = 0.35
fouled_tau = 70.0
new_ambient = ambient + 4.0
setpoint_initial = ambient + 22.0
setpoint_drift = new_ambient + 20.0
prediction_horizon = 20
window = 60

measurement = AdaptiveHeaterMeasurement(ambient, initial_tau, initial_gain)
controller = ModelPredictiveController("movingHorizonExample")
controller.setTransmitter(measurement)
controller.setControllerSetPoint(setpoint_initial, "C")
controller.setProcessModel(0.2, 20.0)
controller.setProcessBias(ambient + 3.0)
controller.setPredictionHorizon(prediction_horizon)
controller.setWeights(1.0, 0.04, 0.2)
controller.setPreferredControlValue(0.0)
controller.setOutputLimits(0.0, 120.0)
controller.enableMovingHorizonEstimation(window)

history = {
    "time_s": [],
    "phase": [],
    "heater_output": [],
    "temperature_C": [],
    "estimated_gain": [],
    "estimated_tau_s": [],
    "estimated_bias_C": [],
    "mse": [],
}

def record_state(time_index: int, phase: str):
    estimate = controller.getLastMovingHorizonEstimate()
    history["time_s"].append(time_index)
    history["phase"].append(phase)
    history["heater_output"].append(controller.getResponse())
    history["temperature_C"].append(measurement.getMeasuredValue())
    history["estimated_gain"].append(np.nan if estimate is None else estimate.process_gain)
    history["estimated_tau_s"].append(np.nan if estimate is None else estimate.time_constant)
    history["estimated_bias_C"].append(np.nan if estimate is None else estimate.process_bias)
    history["mse"].append(np.nan if estimate is None else estimate.mean_squared_error)

for step in range(240):
    controller.runTransient(controller.getResponse(), 1.0)
    measurement.advance(controller.getResponse(), 1.0)
    record_state(step, "initial")

initial_estimate = controller.getLastMovingHorizonEstimate()

measurement.setProcessGain(fouled_gain)
measurement.setTimeConstant(fouled_tau)
measurement.setAmbientTemperature(new_ambient)
controller.setControllerSetPoint(setpoint_drift, "C")

for step in range(240, 240 + 260):
    controller.runTransient(controller.getResponse(), 1.0)
    measurement.advance(controller.getResponse(), 1.0)
    record_state(step, "adapted")

adapted_estimate = controller.getLastMovingHorizonEstimate()

heater_history = pd.DataFrame(history)
heater_history.tail()


In [None]:
summary = pd.DataFrame(
    {
        "phase": ["before drift", "after drift"],
        "true_gain": [initial_gain, fouled_gain],
        "estimated_gain": [initial_estimate.process_gain, adapted_estimate.process_gain],
        "true_time_constant_s": [initial_tau, fouled_tau],
        "estimated_time_constant_s": [initial_estimate.time_constant, adapted_estimate.time_constant],
        "true_bias_C": [ambient, new_ambient],
        "estimated_bias_C": [initial_estimate.process_bias, adapted_estimate.process_bias],
        "final_temperature_C": [heater_history.query('phase == "initial"').temperature_C.iloc[-1],
                                  heater_history.query('phase == "adapted"').temperature_C.iloc[-1]],
        "setpoint_C": [setpoint_initial, setpoint_drift],
    }
)
summary


The moving horizon estimator reconstructs the process parameters with high accuracy in both phases
and keeps the temperature close to the requested set points after the heater fouls.


## Multivariable optimisation of an offshore process

The second scenario adapts `OffshoreProcessMpcIntegrationTest`. To keep the notebook lightweight, we
replace the full process simulation with a linear surrogate that captures the qualitative behaviour
of the manipulated variables:

* lowering the dew point cooler temperature or oil heater duty reduces energy usage while helping
  quality targets,
* reducing export pressure saves compressor power but risks violating the gas rate specification.

The MPC tunes the three controls towards an energy-efficient operating point subject to soft
constraints on gas production, Wobbe Index and RVP.


In [None]:
process = SimplifiedOffshoreProcess(dew_point_temperature=35.0, oil_heater_temperature=63.0, export_pressure=250.0)
process.run()

baseline_metrics = {
    "dewPointTemperature_C": process.clone_controls()[0],
    "oilHeaterTemperature_C": process.clone_controls()[1],
    "exportPressure_bara": process.clone_controls()[2],
    "energy_MW": process.total_energy(),
    "gas_rate_tonne_hr": process.sales_gas_rate(),
    "wobbe_MJ_Sm3": process.wobbe_index(),
    "rvp_bara": process.rvp(),
}

controller = ModelPredictiveController("fpsoMpc")
controller.configureControls("dewPointTemperature", "oilHeaterTemperature", "exportPressure")
initial_controls = process.clone_controls()
controller.setInitialControlValues(initial_controls)
controller.setPreferredControlVector([initial_controls[0] - 2.0, initial_controls[1] - 3.0, initial_controls[2] - 10.0])
controller.setControlLimits("dewPointTemperature", 0.0, 40.0)
controller.setControlLimits("oilHeaterTemperature", 40.0, 90.0)
controller.setControlLimits("exportPressure", 200.0, 260.0)
controller.setControlWeights([0.05, 0.05, 3.0])
controller.setMoveWeights([0.01, 0.01, 0.2])

sensitivities = compute_control_sensitivities(process, initial_controls)

gas_measurement = GasProductionMeasurement("sales gas", process)
wobbe_measurement = WobbeQualityMeasurement("wobbe", process)
rvp_measurement = OilRvpMeasurement("rvp", process)

base_gas = gas_measurement.getMeasuredValue()
base_wobbe = wobbe_measurement.getMeasuredValue()
base_rvp = rvp_measurement.getMeasuredValue()

gas_target = -base_gas * 0.97
wobbe_spec = -base_wobbe - 0.05
rvp_spec = base_rvp * 1.02

controller.addQualityConstraint(
    QualityConstraint.builder("gasRate")
    .measurement(gas_measurement)
    .unit("tonne/hr")
    .limit(-gas_target)
    .margin(0.5)
    .controlSensitivity(sensitivities[0])
    .build()
)
controller.addQualityConstraint(
    QualityConstraint.builder("wobbe")
    .measurement(wobbe_measurement)
    .unit("MJ/Sm3")
    .limit(-wobbe_spec)
    .margin(0.02)
    .controlSensitivity(sensitivities[1])
    .build()
)
controller.addQualityConstraint(
    QualityConstraint.builder("rvp")
    .measurement(rvp_measurement)
    .unit("bara")
    .limit(rvp_spec)
    .margin(0.02)
    .controlSensitivity(sensitivities[2])
    .build()
)

iteration_records = []
current_controls = initial_controls.copy()
for iteration in range(1, 4):
    controller.setInitialControlValues(current_controls)
    controller.runTransient(float("nan"), 1.0, uuid.uuid4())
    recommendation = controller.getControlVector()
    process.apply_vector(recommendation)
    process.run()
    current_controls = recommendation.copy()
    iteration_records.append(
        {
            "iteration": iteration,
            "dewPointTemperature_C": recommendation[0],
            "oilHeaterTemperature_C": recommendation[1],
            "exportPressure_bara": recommendation[2],
            "energy_MW": process.total_energy(),
            "gas_rate_tonne_hr": process.sales_gas_rate(),
            "wobbe_MJ_Sm3": process.wobbe_index(),
            "rvp_bara": process.rvp(),
        }
    )

baseline_metrics, pd.DataFrame(iteration_records)


Energy usage drops substantially while the quality metrics remain inside their respective
specifications. The control vector converges towards the preferred low-energy operating point, in
line with the expectations from the Java integration test.
