# Inverse Control Optimization Example

This notebook demonstrates the complete workflow for inverse optimization using trained digital twin models.

## Table of Contents
1. Setup and Model Loading
2. Single-Objective Optimization
3. Multi-Objective Optimization (Pareto Frontier)
4. Kalman Filter Real-Time Correction

## Prerequisites
- A trained Stage1 SST model
- Test data for inference

## 1. Setup and Model Loading

In [None]:
import sys
import os
import torch
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import json
import pickle

# Add parent directory to path
sys.path.append('..')

from models.static_transformer import StaticSensorTransformer
from optimization import (
    InverseOptimizer,
    MultiObjectiveOptimizer,
    KalmanCorrector,
    ConstraintManager,
    InputConstraint,
    OptimizationConfig,
    MultiObjectiveConfig,
    KalmanConfig
)

# Check CUDA availability
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Using device: {device}")

### Load Trained Model

In [None]:
# Specify model path
model_path = '../saved_models/stage1_model.pth'
config_path = model_path.replace('.pth', '_config.json')
scaler_path = model_path.replace('.pth', '_scaler.pkl')

# Load configuration
with open(config_path, 'r') as f:
    config = json.load(f)

print("Model Configuration:")
print(f"  Boundary signals: {len(config['boundary_signals'])}")
print(f"  Target signals: {len(config['target_signals'])}")
print(f"  Model dimension: {config.get('d_model', 128)}")

# Load scalers
with open(scaler_path, 'rb') as f:
    scalers = pickle.load(f)

scaler_X = scalers['scaler_X']
scaler_y = scalers['scaler_y']

# Create and load model
model = StaticSensorTransformer(
    num_boundary_sensors=len(config['boundary_signals']),
    num_target_sensors=len(config['target_signals']),
    d_model=config.get('d_model', 128),
    nhead=config.get('nhead', 8),
    num_layers=config.get('num_layers', 3)
)

checkpoint = torch.load(model_path, map_location=device, weights_only=False)
model.load_state_dict(checkpoint)
model.to(device)
model.eval()

print("âœ… Model loaded successfully!")

# Store signal names
boundary_signals = config['boundary_signals']
target_signals = config['target_signals']

### Load Test Data

In [None]:
# Load data
data_path = '../data/data.csv'
df = pd.read_csv(data_path)

print(f"Data shape: {df.shape}")
print(f"\nFirst few rows:")
df.head()

### Baseline Prediction

In [None]:
# Select a baseline sample
baseline_idx = 0
baseline_inputs = df.iloc[baseline_idx][boundary_signals].values
baseline_targets = df.iloc[baseline_idx][target_signals].values

# Make prediction
X_scaled = scaler_X.transform(baseline_inputs.reshape(1, -1))
X_tensor = torch.from_numpy(X_scaled).float().to(device)

with torch.no_grad():
    y_pred_scaled = model(X_tensor)
    baseline_predictions = scaler_y.inverse_transform(y_pred_scaled.cpu().numpy())[0]

print("Baseline Predictions:")
for i, signal in enumerate(target_signals[:5]):  # Show first 5
    print(f"  {signal}: {baseline_predictions[i]:.4f}")

## 2. Single-Objective Optimization

**Scenario:** Reduce a specific target signal by 10% by optimizing selected input parameters.

### Define Optimization Target

In [None]:
# Select target signal to optimize (e.g., first target signal)
target_signal_name = target_signals[0]
target_signal_idx = 0

# Define target: reduce by 10%
target_bias = -0.10  # -10%

targets = {
    target_signal_idx: {
        'bias': target_bias,
        'weight': 1.0
    }
}

print(f"Optimization Target:")
print(f"  Signal: {target_signal_name}")
print(f"  Current value: {baseline_predictions[target_signal_idx]:.4f}")
print(f"  Target value: {baseline_predictions[target_signal_idx] * (1 + target_bias):.4f}")
print(f"  Change: {target_bias*100}%")

### Define Variable Inputs and Constraints

In [None]:
# Select 3 inputs to optimize (e.g., first 3 boundary signals)
optimizable_signal_names = boundary_signals[:3]
optimizable_indices = [0, 1, 2]

# Create constraints
constraints_list = []

for i, signal_name in enumerate(boundary_signals):
    baseline_value = baseline_inputs[i]
    
    if i in optimizable_indices:
        # Optimizable: allow Â±50% range, Â±20% change rate
        constraints_list.append(InputConstraint(
            name=signal_name,
            min_value=baseline_value * 0.5,
            max_value=baseline_value * 1.5,
            baseline_value=baseline_value,
            max_change_rate=0.20,  # Â±20%
            is_fixed=False
        ))
    else:
        # Fixed: keep at baseline value
        constraints_list.append(InputConstraint(
            name=signal_name,
            min_value=baseline_value,
            max_value=baseline_value,
            baseline_value=baseline_value,
            is_fixed=True
        ))

constraint_manager = ConstraintManager(constraints_list)

print(constraint_manager.get_constraint_summary())

### Run Inverse Optimization

In [None]:
# Create optimizer
optimizer = InverseOptimizer(
    model=model,
    scaler_X=scaler_X,
    scaler_y=scaler_y,
    device=device
)

# Configuration
opt_config = OptimizationConfig(
    learning_rate=0.01,
    max_epochs=500,
    optimizer_type='adam',
    patience=50,
    verbose=True
)

# Run optimization
print("\n" + "="*80)
print("Running Inverse Optimization...")
print("="*80)

result = optimizer.optimize(
    targets=targets,
    constraint_manager=constraint_manager,
    initial_inputs=baseline_inputs,
    config=opt_config
)

print("\n" + "="*80)
print("Optimization Completed!")
print("="*80)

### Analyze Results

In [None]:
# Print summary
print("\nðŸ“Š Optimization Results:")
print(f"  Converged: {result['converged']}")
print(f"  Epochs: {result['num_epochs']}")
print(f"  Final loss: {result['final_loss']:.6f}")
print(f"  Elapsed time: {result['elapsed_time']:.2f}s")

print("\nðŸ“ˆ Target Achievement:")
current_val = baseline_predictions[target_signal_idx]
optimized_val = result['predictions'][target_signal_idx]
target_val = current_val * (1 + target_bias)

print(f"  Target signal: {target_signal_name}")
print(f"  Current: {current_val:.4f}")
print(f"  Optimized: {optimized_val:.4f}")
print(f"  Target: {target_val:.4f}")
print(f"  Achievement: {(optimized_val - current_val) / (target_val - current_val) * 100:.1f}%")

print("\nðŸ”§ Input Changes:")
for i in optimizable_indices:
    signal_name = boundary_signals[i]
    baseline_val = baseline_inputs[i]
    optimized_val = result['optimized_inputs'][i]
    change_pct = (optimized_val - baseline_val) / baseline_val * 100
    
    print(f"  {signal_name}:")
    print(f"    {baseline_val:.4f} â†’ {optimized_val:.4f} ({change_pct:+.2f}%)")

In [None]:
# Plot loss convergence
plt.figure(figsize=(12, 4))

plt.subplot(1, 2, 1)
plt.plot(result['loss_history'], linewidth=2)
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.title('Loss Convergence')
plt.yscale('log')
plt.grid(True, alpha=0.3)

plt.subplot(1, 2, 2)
# Plot input evolution for optimizable inputs
for i in optimizable_indices:
    plt.plot(result['input_history'][:, i], label=boundary_signals[i], linewidth=2)
plt.xlabel('Epoch')
plt.ylabel('Input Value')
plt.title('Input Evolution')
plt.legend()
plt.grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

## 3. Multi-Objective Optimization (Pareto Frontier)

**Scenario:** Optimize two conflicting objectives and explore trade-offs.

In [None]:
# Define two objectives
# Objective 1: Reduce target signal 0 by 10%
# Objective 2: Increase target signal 1 by 5%

obj1_idx = 0
obj1_bias = -0.10  # -10%

obj2_idx = 1 if len(target_signals) > 1 else 0
obj2_bias = 0.05  # +5%

objectives = [
    {'signal_idx': obj1_idx, 'bias': obj1_bias},
    {'signal_idx': obj2_idx, 'bias': obj2_bias}
]

print("Multi-Objective Configuration:")
print(f"  Objective 1: {target_signals[obj1_idx]} ({obj1_bias*100:+.0f}%)")
print(f"  Objective 2: {target_signals[obj2_idx]} ({obj2_bias*100:+.0f}%)")

In [None]:
# Create multi-objective optimizer
mo_optimizer = MultiObjectiveOptimizer(
    model=model,
    scaler_X=scaler_X,
    scaler_y=scaler_y,
    device=device
)

# Configuration
mo_config = MultiObjectiveConfig(
    n_pareto_points=20,
    base_config=OptimizationConfig(
        max_epochs=300,
        verbose=False  # Disable verbose for batch optimization
    )
)

# Generate Pareto frontier
print("\nGenerating Pareto frontier...")
pareto_results = mo_optimizer.generate_pareto_frontier(
    objectives=objectives,
    constraint_manager=constraint_manager,
    initial_inputs=baseline_inputs,
    config=mo_config
)

In [None]:
# Visualize Pareto frontier
fig = mo_optimizer.plot_pareto_frontier(
    pareto_results,
    objective_names=[target_signals[obj1_idx], target_signals[obj2_idx]],
    use_plotly=False  # Use matplotlib for notebook compatibility
)
plt.show()

In [None]:
# Display Pareto solutions table
solutions_data = []
for i, point in enumerate(pareto_results['pareto_points'][:10]):  # Show first 10
    solutions_data.append({
        'ID': i,
        'Weight_1': point['weight_1'],
        'Weight_2': point['weight_2'],
        f"{target_signals[obj1_idx]}": point['objective_1'],
        f"{target_signals[obj2_idx]}": point['objective_2'],
        'Converged': point['converged']
    })

solutions_df = pd.DataFrame(solutions_data)
print("\nPareto Solutions (first 10):")
solutions_df

### Select and Analyze a Pareto Solution

In [None]:
# Select solution at index 10 (middle of the frontier)
selected_idx = 10
selected_solution = mo_optimizer.select_solution(pareto_results, selected_idx)

print(f"Selected Pareto Solution #{selected_idx}:")
print(f"  Weights: [{selected_solution['weight_1']:.2f}, {selected_solution['weight_2']:.2f}]")
print(f"  {target_signals[obj1_idx]}: {selected_solution['objective_1']:.4f}")
print(f"  {target_signals[obj2_idx]}: {selected_solution['objective_2']:.4f}")

print("\n  Optimized Inputs:")
for i in optimizable_indices:
    print(f"    {boundary_signals[i]}: {selected_solution['inputs'][i]:.4f}")

## 4. Kalman Filter Real-Time Correction

**Scenario:** Use Kalman filtering to correct optimized control strategy based on simulated sensor feedback.

In [None]:
# Use the single-objective optimization result from Section 2
optimized_inputs = result['optimized_inputs']
optimized_predictions = result['predictions']

# Create Kalman corrector
kf = KalmanCorrector(
    model=model,
    scaler_X=scaler_X,
    scaler_y=scaler_y,
    optimizable_input_indices=optimizable_indices,
    target_output_indices=[target_signal_idx],
    fixed_input_values=optimized_inputs,
    device=device
)

print("Kalman Filter Configuration:")
print(f"  Optimizable inputs: {len(optimizable_indices)}")
print(f"  Target outputs: 1")
print(f"  State dimension: {kf.dim_x}")
print(f"  Measurement dimension: {kf.dim_z}")

In [None]:
# Generate synthetic measurements with noise
n_steps = 50
measurement_noise_std = 0.1  # 10% noise

np.random.seed(42)
measurements = []
true_value = optimized_predictions[target_signal_idx]

for t in range(n_steps):
    noise = np.random.normal(0, measurement_noise_std * true_value)
    measurements.append([true_value + noise])

measurements = np.array(measurements)

print(f"Generated {n_steps} noisy measurements")
print(f"  True value: {true_value:.4f}")
print(f"  Noise std: {measurement_noise_std * true_value:.4f}")

In [None]:
# Run Kalman simulation
initial_inputs_opt = optimized_inputs[optimizable_indices]
initial_outputs = np.array([true_value])
initial_state = np.concatenate([initial_inputs_opt, initial_outputs])

kf_config = KalmanConfig(
    process_noise=0.01,
    measurement_noise=0.1,
    initial_state_covariance=1.0
)

print("Running Kalman filter simulation...")
sim_results = kf.run_simulation(
    initial_state=initial_state,
    measurements=measurements,
    config=kf_config
)

print("âœ… Simulation completed")

In [None]:
# Plot correction results
uncorrected_predictions = np.tile([true_value], (n_steps, 1))

fig = KalmanCorrector.plot_correction_results(
    simulation_results=sim_results,
    uncorrected_predictions=uncorrected_predictions,
    target_names=[target_signal_name],
    figsize=(15, 5)
)
plt.show()

In [None]:
# Compute correction metrics
metrics_df = kf.compute_correction_metrics(
    simulation_results=sim_results,
    uncorrected_predictions=uncorrected_predictions
)

print("\nðŸ“Š Kalman Correction Metrics:")
print(metrics_df.to_string(index=False))

print(f"\nâœ… Average RMSE improvement: {metrics_df['RMSE_Improvement_%'].mean():.2f}%")

## Summary

This notebook demonstrated:

1. **Single-Objective Optimization**: Optimized input parameters to achieve a target output reduction
2. **Multi-Objective Optimization**: Generated Pareto frontier to explore trade-offs between conflicting objectives
3. **Kalman Filter Correction**: Applied real-time correction to handle measurement noise

### Key Takeaways

- Inverse optimization converges in **seconds** (typically < 2s on GPU)
- Gradient-based approach works with **any trained model** without requiring model invertibility
- **Constraints** (hard bounds and change rates) are naturally handled through projection
- **Multi-objective** optimization provides a spectrum of trade-off solutions
- **Kalman filtering** can significantly improve robustness in noisy environments

### Next Steps

- Apply to your specific use case with real data
- Tune optimization parameters (learning rate, patience, etc.)
- Experiment with different constraint configurations
- Integrate into production control systems