# Notebook 4: Robust Model Predictive Control System

**Project:** `PharmaControl-Pro`
**Goal:** Build the 'brain' of our control system. This involves creating a robust Model Predictive Control (MPC) class that encapsulates the logic for real-time decision-making. This controller will use the trained model to find the best control action while respecting process constraints.

### Table of Contents
1. [The MPC Controller: Architecture and Logic](#1.-The-MPC-Controller:-Architecture-and-Logic)
2. [Implementing the MPC Controller Class](#2.-Implementing-the-MPC-Controller-Class)
3. [Defining Process Constraints](#3.-Defining-Process-Constraints)
4. [Standalone Test of the MPC Controller](#4.-Standalone-Test-of-the-MPC-Controller)

--- 
## 1. The MPC Controller: Architecture and Logic

The MPC controller's job is to answer one question at each decision point: **"Given the current state of the plant, what is the best possible action to take right now to meet our future targets?"**

To do this, it follows a precise, multi-step algorithm based on the principles we discussed in Notebook 1:

1.  **Generate Candidate Actions:** Create a 'lattice' or grid of potential future control plans. For example, 'Plan A: increase spray rate by 5%', 'Plan B: hold steady', 'Plan C: decrease air flow by 10 m³/h', and so on. This is our search space.

2.  **Enforce Constraints:** This is a critical safety and quality step. The controller must filter out any candidate plans that are physically impossible or would violate process limits (e.g., a negative flow rate, a change that is too rapid, or a value outside the equipment's operational range).

3.  **Predict and Evaluate:** For every *valid* candidate plan, use our trained `GranulationPredictor` model to forecast the future CMAs over the horizon `H`. Then, calculate a 'cost' for each prediction. The cost function is key:
    *   It heavily penalizes deviations from the target CMAs (`target_error_cost`).
    *   It can also lightly penalize large or rapid changes in the control actions themselves to promote smooth, stable operation (`control_effort_cost`).
    *   `Total Cost = target_error_cost + λ * control_effort_cost`

4.  **Select Optimal Action:** The candidate plan with the lowest total cost is the winner.

5.  **Apply Receding Horizon:** The controller returns only the *very first step* of the winning plan. This action is sent to the plant. At the next decision point, the entire process is repeated, ensuring the controller is always reacting to the latest information.

--- 
## 2. Implementing the MPC Controller Class

We will encapsulate all this logic into a reusable `MPCController` class. This class will be defined in `src/mpc_controller.py`.

In [1]:
%%writefile ../src/mpc_controller.py
import torch
import numpy as np
import itertools
from tqdm.auto import tqdm

class MPCController:
    """
    Implements a Model Predictive Controller that uses a trained PyTorch model
    to find optimal control actions while respecting process constraints.
    """
    def __init__(self, model, config, constraints, scalers):
        self.model = model
        self.config = config
        self.constraints = constraints
        self.scalers = scalers
        self.device = 'cuda' if torch.cuda.is_available() else 'cpu'
        self.model.to(self.device)
        self.model.eval()

    def _generate_control_lattice(self, current_cpps):
        """Creates a grid of possible future control sequences."""
        cpp_names = self.config['cpp_names']
        discretization = self.config['discretization_steps']
        
        # For each CPP, create a set of possible delta (change) values
        # Example: [-5.0, 0.0, 5.0] for spray_rate
        delta_options = []
        for name in cpp_names:
            max_change = self.constraints[name]['max_change_per_step']
            options = np.linspace(-max_change, max_change, discretization)
            delta_options.append(options)
        
        # Create the Cartesian product of all delta options
        # This gives every possible combination of a single-step change
        candidate_deltas = list(itertools.product(*delta_options))
        
        # Assume the change is held constant over the horizon H
        candidate_sequences = []
        for deltas in candidate_deltas:
            # Create a plan by applying the deltas to the current CPPs
            new_cpp_step = current_cpps + np.array(deltas)
            # Create a full horizon sequence by repeating this step
            sequence = np.tile(new_cpp_step, (self.config['horizon'], 1))
            candidate_sequences.append(sequence)
            
        return candidate_sequences

    def _filter_by_constraints(self, candidates, current_cpps):
        """Removes candidate sequences that violate process constraints."""
        valid_candidates = []
        cpp_names = self.config['cpp_names']
        
        for seq in candidates:
            is_valid = True
            # We only need to check the first step, as it's held constant
            first_step = seq[0]
            for i, name in enumerate(cpp_names):
                # Check min/max operational limits
                if not (self.constraints[name]['min_val'] <= first_step[i] <= self.constraints[name]['max_val']):
                    is_valid = False
                    break
            
            if is_valid:
                valid_candidates.append(seq)
        
        return valid_candidates

    def _calculate_cost(self, prediction, action, target_cmas):
        """Calculates the cost of a predicted trajectory."""
        # Ensure target is on the correct device
        target_cmas = target_cmas.to(self.device)
        
        # 1. Target Error Cost (how far are we from the setpoint?)
        # Using L1 loss (Mean Absolute Error) is often more robust to outliers
        target_error = torch.mean(torch.abs(prediction - target_cmas))
        
        # 2. Control Effort Cost (penalize large changes to promote stability)
        # This is a placeholder; a more complex version could penalize deviation from a desired steady state
        control_effort = torch.mean(torch.abs(action - action[0])) # Penalize non-constant actions
        
        # Combine costs with a weighting factor (lambda)
        total_cost = target_error + self.config['control_effort_lambda'] * control_effort
        return total_cost.item()

    def suggest_action(self, past_cmas_scaled, past_cpps_scaled, target_cmas_unscaled):
        """The main MPC loop to find and return the best single control action."""
        # Get the last known CPPs (unscaled) to base our search on
        last_cpps_scaled = past_cpps_scaled[-1, :]
        current_cpps_unscaled = np.zeros(len(self.config['cpp_names']))
        for i, name in enumerate(self.config['cpp_names']):
            current_cpps_unscaled[i] = self.scalers[name].inverse_transform(last_cpps_scaled[i].reshape(-1, 1))
        
        # 1. Generate all possible actions
        candidates_unscaled = self._generate_control_lattice(current_cpps_unscaled)
        
        # 2. Filter out invalid actions
        valid_candidates_unscaled = self._filter_by_constraints(candidates_unscaled, current_cpps_unscaled)
        
        if not valid_candidates_unscaled:
            print("Warning: No valid control actions found after applying constraints.")
            return current_cpps_unscaled # Return the last known safe action
        
        # 3. Predict and Evaluate
        best_cost = float('inf')
        best_action_sequence = None
        
        # Prepare scaled target tensor once
        target_cmas_scaled = np.zeros_like(target_cmas_unscaled)
        for i, name in enumerate(self.config['cma_names']):
            target_cmas_scaled[:, i] = self.scalers[name].transform(target_cmas_unscaled[:, i].reshape(-1, 1)).flatten()
        target_cmas_tensor = torch.tensor(target_cmas_scaled, dtype=torch.float32).unsqueeze(0)
        
        # Convert historical data to tensors
        past_cmas_tensor = torch.tensor(past_cmas_scaled, dtype=torch.float32).unsqueeze(0).to(self.device)
        past_cpps_tensor = torch.tensor(past_cpps_scaled, dtype=torch.float32).unsqueeze(0).to(self.device)

        with torch.no_grad():
            pbar = tqdm(valid_candidates_unscaled, desc="Evaluating MPC Candidates", leave=False)
            for action_seq_unscaled in pbar:
                # Scale the candidate action sequence for the model
                action_seq_scaled = np.zeros_like(action_seq_unscaled)
                for i, name in enumerate(self.config['cpp_names_and_soft_sensors']):
                     if name in self.scalers:
                        # This part needs to be improved to handle soft sensors correctly.
                        # For now, we assume we can scale the base CPPs.
                        if i < len(self.config['cpp_names']):
                             action_seq_scaled[:, i] = self.scalers[name].transform(action_seq_unscaled[:, i].reshape(-1, 1)).flatten()
                
                action_tensor = torch.tensor(action_seq_scaled, dtype=torch.float32).unsqueeze(0).to(self.device)
                
                # Get model prediction
                prediction = self.model(past_cmas_tensor, past_cpps_tensor, action_tensor)
                
                # Calculate cost
                cost = self._calculate_cost(prediction, action_tensor, target_cmas_tensor)
                
                if cost < best_cost:
                    best_cost = cost
                    best_action_sequence = action_seq_unscaled
        
        # 4. Return the first step of the best plan found
        return best_action_sequence[0]

Writing ../src/mpc_controller.py


--- 
## 3. Defining Process Constraints

A real industrial process has strict limits. The controller must never be allowed to suggest an action that could damage the equipment or ruin the product. We will define these limits in a clear, structured dictionary that our `MPCController` can use.

In [2]:
PROCESS_CONSTRAINTS = {
    'spray_rate': {
        'min_val': 80.0,              # Equipment minimum
        'max_val': 180.0,             # Equipment maximum
        'max_change_per_step': 10.0   # Max allowed change in one go (for stability)
    },
    'air_flow': {
        'min_val': 400.0,
        'max_val': 700.0,
        'max_change_per_step': 25.0
    },
    'carousel_speed': {
        'min_val': 20.0,
        'max_val': 40.0,
        'max_change_per_step': 2.0
    }
}

--- 
## 4. Standalone Test of the MPC Controller

Before we connect the controller to our simulator in the final notebook, let's perform a standalone test. We will create some dummy historical data, define a target, and ask the controller for a single best action. This allows us to debug the controller's logic in isolation.

In [3]:
import torch
import joblib
import os, sys
sys.path.append('..')  # Add parent directory to Python path
import pandas as pd
from src.model_architecture import GranulationPredictor
from src.mpc_controller import MPCController

# --- Load all necessary components ---
DATA_DIR = '../data'
MODEL_SAVE_PATH = os.path.join(DATA_DIR, 'best_predictor_model.pth')
SCALER_FILE = os.path.join(DATA_DIR, 'scalers.joblib')
DEVICE = 'cuda' if torch.cuda.is_available() else 'cpu'

# Load model (need to know its hyperparameters to instantiate)
# For this test, we'll hardcode them from the previous notebook's tuning results.
# In a real app, these would be saved in a config file.
BEST_HPARAMS = {'d_model': 64, 'nhead': 4, 'num_encoder_layers': 2, 'num_decoder_layers': 2, 'dropout': 0.15}
CMA_COLS = ['d50', 'lod']
CPP_COLS_BASE = ['spray_rate', 'air_flow', 'carousel_speed']
CPP_COLS_FULL = ['spray_rate', 'air_flow', 'carousel_speed', 'specific_energy', 'froude_number_proxy']

model = GranulationPredictor(
    cma_features=len(CMA_COLS),
    cpp_features=len(CPP_COLS_FULL),
    **BEST_HPARAMS
)
model.load_state_dict(torch.load(MODEL_SAVE_PATH, map_location=DEVICE))

# Load scalers
scalers = joblib.load(SCALER_FILE)

# --- MPC Configuration ---
MPC_CONFIG = {
    'horizon': 72,
    'cpp_names': CPP_COLS_BASE,
    'cma_names': CMA_COLS,
    'cpp_names_and_soft_sensors': CPP_COLS_FULL,
    'discretization_steps': 3, # Use 3 steps for each CPP delta: [-max, 0, +max]
    'control_effort_lambda': 0.05 # Small penalty for control changes
}

# --- Instantiate the Controller ---
mpc = MPCController(model, MPC_CONFIG, PROCESS_CONSTRAINTS, scalers)
print("MPC Controller instantiated successfully.")

FileNotFoundError: [Errno 2] No such file or directory: '../data/best_predictor_model.pth'

In [None]:
# --- Prepare Dummy Data for the Test ---

# Create some fake historical data (36 steps)
# In a real scenario, this comes from the live plant data stream.
dummy_past_cmas_unscaled = np.array([[450, 1.2]] * 36)
dummy_past_cpps_unscaled = np.array([[130, 550, 30]] * 36)

# Note: The controller needs the soft sensors, so we must add them.
# For this simple test, we will approximate them.
dummy_se = (dummy_past_cpps_unscaled[:, 0] * dummy_past_cpps_unscaled[:, 2]) / 1000.0
dummy_fr = (dummy_past_cpps_unscaled[:, 2]**2) / 9.81
dummy_past_cpps_full_unscaled = np.hstack([
    dummy_past_cpps_unscaled,
    dummy_se.reshape(-1, 1),
    dummy_fr.reshape(-1, 1)
])

# Scale the dummy data just as we would in a real loop
past_cmas_scaled = np.zeros_like(dummy_past_cmas_unscaled)
past_cpps_scaled = np.zeros_like(dummy_past_cpps_full_unscaled)

for i, col in enumerate(CMA_COLS):
    past_cmas_scaled[:, i] = scalers[col].transform(dummy_past_cmas_unscaled[:, i].reshape(-1, 1)).flatten()
for i, col in enumerate(CPP_COLS_FULL):
    past_cpps_scaled[:, i] = scalers[col].transform(dummy_past_cpps_full_unscaled[:, i].reshape(-1, 1)).flatten()

# Define a target we want to reach
# We want smaller, wetter granules.
target_cmas_unscaled = np.tile([350.0, 1.8], (MPC_CONFIG['horizon'], 1))

# --- Run the MPC Decision Logic ---
print(f"Current State (unscaled): d50={dummy_past_cmas_unscaled[-1, 0]}, lod={dummy_past_cmas_unscaled[-1, 1]}")
print(f"Target State (unscaled):  d50={target_cmas_unscaled[0, 0]}, lod={target_cmas_unscaled[0, 1]}")

suggested_action = mpc.suggest_action(past_cmas_scaled, past_cpps_scaled, target_cmas_unscaled)

print("\n--- MPC Suggestion ---")
print("To reach the target, the best immediate action is:")
for i, name in enumerate(MPC_CONFIG['cpp_names']):
    print(f"  {name}: {suggested_action[i]:.2f}")

### Analysis of the Suggestion

To reach a target of smaller (`d50`=350) and wetter (`LOD`=1.8) granules from a state of larger, drier ones, we expect the controller to suggest actions that:
1.  **Reduce Granule Size:** This is primarily achieved by decreasing the `spray_rate`.
2.  **Increase Moisture:** This can be done by decreasing the `air_flow` or increasing the `carousel_speed` (reducing drying time).

Check if the controller's suggested action aligns with this physical intuition. This confirms that the model has learned meaningful relationships and the MPC logic is working correctly.

We are now ready to put all the pieces together in the final notebook and run a full closed-loop simulation.