# Multi-Objective Constrained Optimization for Traffic Simulation

This notebook implements state-of-the-art multi-objective optimization for the IHUTE Nashville transportation incentive simulation using:

- **PyTorch** for differentiable optimization
- **MLX** for Apple Silicon acceleration (optional)
- **EPO (Exact Pareto Optimal)** solver for true Pareto front discovery
- **Neural Surrogate Model** for fast objective evaluation

## Objectives
1. **VMT Reduction** - Minimize vehicle-miles-traveled
2. **Travel Time Savings** - Minimize aggregate commuter time
3. **Budget Efficiency** - Minimize $/VMT spent
4. **Equity** - Maximize fair distribution across zones

## 1. Setup & Configuration

In [1]:
# Core dependencies
import sys
import warnings
warnings.filterwarnings('ignore')

import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
import duckdb
from pathlib import Path
from dataclasses import dataclass
from typing import List, Tuple, Optional, Dict
import json

# Visualization
import plotly.graph_objects as go
import plotly.express as px
from plotly.subplots import make_subplots

# Scientific computing
from scipy.stats import qmc
from scipy.spatial.distance import cdist

# Check for MLX availability (Apple Silicon)
MLX_AVAILABLE = False
try:
    import mlx.core as mx
    import mlx.nn as mlx_nn
    MLX_AVAILABLE = True
    print("MLX available - Apple Silicon acceleration enabled")
except ImportError:
    print("MLX not available - using PyTorch only")

# Add src to path for importing project modules
sys.path.insert(0, str(Path.cwd().parent / 'src'))

# Set random seeds for reproducibility
SEED = 42
np.random.seed(SEED)
torch.manual_seed(SEED)

# Device configuration
if torch.backends.mps.is_available():
    DEVICE = torch.device("mps")  # Apple Silicon GPU
    print(f"Using Apple Silicon MPS backend")
elif torch.cuda.is_available():
    DEVICE = torch.device("cuda")
    print(f"Using CUDA")
else:
    DEVICE = torch.device("cpu")
    print(f"Using CPU")

print(f"PyTorch version: {torch.__version__}")

MLX available - Apple Silicon acceleration enabled
Using Apple Silicon MPS backend
PyTorch version: 2.9.1


In [2]:
# Configuration
@dataclass
class OptimizationConfig:
    """Configuration for multi-objective optimization."""
    # Decision variable bounds
    carpool_reward_range: Tuple[float, float] = (1.0, 10.0)
    pacer_reward_range: Tuple[float, float] = (0.05, 0.30)
    peak_multiplier_range: Tuple[float, float] = (1.0, 2.5)
    budget_split_range: Tuple[float, float] = (0.0, 1.0)
    morning_peak_start_range: Tuple[float, float] = (6.0, 8.0)
    morning_peak_end_range: Tuple[float, float] = (8.0, 10.0)
    evening_peak_start_range: Tuple[float, float] = (16.0, 18.0)
    evening_peak_end_range: Tuple[float, float] = (18.0, 20.0)
    
    # Constraints
    daily_budget: float = 50000.0
    min_zone_allocation: float = 0.05  # 5% minimum per zone
    
    # Optimization parameters
    n_pareto_starts: int = 50
    n_iterations: int = 100
    learning_rate: float = 0.01
    
    # Surrogate model
    n_training_samples: int = 5000
    hidden_dims: List[int] = None
    dropout_rate: float = 0.1
    n_epochs: int = 200
    batch_size: int = 64
    
    def __post_init__(self):
        if self.hidden_dims is None:
            self.hidden_dims = [128, 256, 128]

config = OptimizationConfig()
print(f"Configuration loaded: {config.n_training_samples} training samples, {config.n_pareto_starts} Pareto starts")

Configuration loaded: 5000 training samples, 50 Pareto starts


## 2. Data Preparation

Load data from the DuckDB warehouse and generate synthetic training samples using the behavioral models.

In [3]:
# Connect to DuckDB warehouse
DB_PATH = Path.cwd().parent / 'warehouse.duckdb'

def load_warehouse_data() -> Dict[str, pd.DataFrame]:
    """Load relevant data from DuckDB warehouse."""
    conn = duckdb.connect(str(DB_PATH), read_only=True)
    
    data = {}
    
    # Try to load incentive effectiveness data
    try:
        data['incentive_metrics'] = conn.execute("""
            SELECT * FROM metrics_incentive_effectiveness
            LIMIT 10000
        """).fetchdf()
        print(f"Loaded {len(data['incentive_metrics'])} incentive metrics rows")
    except:
        print("metrics_incentive_effectiveness not found - will generate synthetic data")
        data['incentive_metrics'] = None
    
    # Try to load congestion data
    try:
        data['congestion'] = conn.execute("""
            SELECT * FROM metrics_congestion
            LIMIT 10000
        """).fetchdf()
        print(f"Loaded {len(data['congestion'])} congestion rows")
    except:
        print("metrics_congestion not found")
        data['congestion'] = None
    
    # List all available tables
    tables = conn.execute("SHOW TABLES").fetchdf()
    print(f"\nAvailable tables in warehouse: {tables['name'].tolist()}")
    
    conn.close()
    return data

warehouse_data = load_warehouse_data()

metrics_incentive_effectiveness not found - will generate synthetic data
metrics_congestion not found

Available tables in warehouse: ['raw_agent_decisions', 'raw_hytch_participants', 'raw_hytch_trips', 'raw_incentive_events', 'raw_laddms_pet', 'raw_laddms_trajectories', 'raw_laddms_trajectory_counts', 'raw_laddms_zones', 'raw_metrics_timeseries', 'raw_simulation_runs']


In [4]:
class BehavioralSimulator:
    """
    Simplified behavioral simulator based on IHUTE's behavioral models.
    Used to generate training data for the neural surrogate.
    """
    
    def __init__(self, n_agents: int = 10000, seed: int = SEED):
        np.random.seed(seed)
        self.n_agents = n_agents
        
        # Agent population parameters (from incentives.yml)
        self.vot = np.random.lognormal(mean=np.log(25), sigma=0.4, size=n_agents)  # Value of time $/hr
        self.beta_incentive = np.random.normal(0.15, 0.05, size=n_agents)  # Incentive sensitivity
        self.beta_time = np.random.normal(-0.08, 0.02, size=n_agents)  # Time sensitivity
        self.flexibility = np.random.uniform(0.5, 1.5, size=n_agents)  # Schedule flexibility (hours)
        
        # Zone assignments (4 zones along I-24 corridor)
        self.home_zone = np.random.choice(4, size=n_agents, p=[0.3, 0.25, 0.25, 0.2])
        self.work_zone = np.random.choice(4, size=n_agents, p=[0.2, 0.3, 0.3, 0.2])
        
        # Baseline trip characteristics
        self.base_distance = np.random.lognormal(mean=np.log(15), sigma=0.5, size=n_agents)  # miles
        self.base_travel_time = self.base_distance * 2.5 + np.random.normal(0, 5, size=n_agents)  # minutes
    
    def compute_objectives(self, decision_vars: np.ndarray) -> np.ndarray:
        """
        Compute the 4 objectives given decision variables.
        
        Args:
            decision_vars: Array of shape (12,) containing:
                [carpool_reward, pacer_reward, peak_multiplier, budget_split,
                 morning_start, morning_end, evening_start, evening_end,
                 zone_weight_0, zone_weight_1, zone_weight_2, zone_weight_3]
        
        Returns:
            objectives: Array of shape (4,) [vmt_reduction, travel_time, budget_efficiency, equity]
        """
        # Unpack decision variables
        carpool_reward = decision_vars[0]
        pacer_reward = decision_vars[1]
        peak_multiplier = decision_vars[2]
        budget_split = decision_vars[3]  # fraction to carpool
        morning_start, morning_end = decision_vars[4], decision_vars[5]
        evening_start, evening_end = decision_vars[6], decision_vars[7]
        zone_weights = decision_vars[8:12]
        zone_weights = zone_weights / (zone_weights.sum() + 1e-8)  # Normalize
        
        # Peak hours duration
        peak_hours = (morning_end - morning_start) + (evening_end - evening_start)
        
        # --- Carpool participation model (logit-based) ---
        carpool_utility = (
            self.beta_incentive * carpool_reward * peak_multiplier +
            self.beta_time * (-10)  # 10 min coordination cost
        )
        carpool_prob = 1 / (1 + np.exp(-carpool_utility))
        
        # Zone-based targeting effect
        zone_effect = zone_weights[self.home_zone]
        carpool_prob = carpool_prob * (0.5 + zone_effect)
        carpool_prob = np.clip(carpool_prob, 0, 0.6)  # Cap at 60%
        
        # --- Pacer participation model ---
        pacer_utility = self.beta_incentive * 1.5 * pacer_reward * self.base_distance
        pacer_prob = 1 / (1 + np.exp(-pacer_utility * 20))  # Higher sensitivity
        pacer_prob = np.clip(pacer_prob, 0, 0.15)  # Max 15% are pacers
        
        # --- VMT Reduction ---
        # Carpoolers reduce VMT by sharing rides (assume 2.3 avg occupancy)
        carpool_participants = carpool_prob.mean()
        vmt_reduction_carpool = carpool_participants * 0.35  # 35% VMT reduction per carpooler
        
        # Pacers smooth flow, reducing stop-and-go (indirect VMT benefit)
        pacer_participants = pacer_prob.mean()
        vmt_reduction_pacer = pacer_participants * 0.1  # 10% flow improvement
        
        vmt_reduction = vmt_reduction_carpool + vmt_reduction_pacer
        vmt_reduction = np.clip(vmt_reduction, 0, 0.3)  # Max 30% reduction
        
        # --- Travel Time Savings ---
        # Base congestion reduction from participation
        congestion_reduction = 0.5 * carpool_participants + 0.8 * pacer_participants
        
        # Average travel time savings (minutes)
        base_avg_time = self.base_travel_time.mean()
        time_savings = base_avg_time * congestion_reduction * 0.3  # Up to 30% time reduction
        travel_time = base_avg_time - time_savings
        
        # --- Budget Efficiency ---
        # Total spending
        carpool_spending = (
            carpool_prob.sum() * carpool_reward * peak_multiplier * 
            (peak_hours / 4)  # Normalize by peak duration
        )
        pacer_spending = (
            (pacer_prob * self.base_distance).sum() * pacer_reward * peak_multiplier *
            (peak_hours / 4)
        )
        
        total_spending = budget_split * carpool_spending + (1 - budget_split) * pacer_spending
        total_spending = np.clip(total_spending, 100, config.daily_budget)
        
        # VMT reduced (in vehicle-miles)
        total_vmt = (self.base_distance * (1 - carpool_prob * 0.35)).sum()
        vmt_reduced = self.base_distance.sum() - total_vmt
        
        budget_efficiency = total_spending / (vmt_reduced + 1)  # $/VMT
        
        # --- Equity (Gini-based) ---
        # Benefits per zone
        zone_benefits = np.zeros(4)
        for z in range(4):
            zone_mask = self.home_zone == z
            zone_benefits[z] = (
                carpool_prob[zone_mask].sum() * carpool_reward +
                (pacer_prob[zone_mask] * self.base_distance[zone_mask]).sum() * pacer_reward
            ) * zone_weights[z]
        
        # Equity = 1 - normalized std (higher is more equitable)
        if zone_benefits.sum() > 0:
            zone_shares = zone_benefits / zone_benefits.sum()
            equity = 1 - np.std(zone_shares) * 2  # Scale std to 0-1
        else:
            equity = 0.5
        equity = np.clip(equity, 0, 1)
        
        return np.array([
            1 - vmt_reduction,  # Minimize (lower = more reduction = better)
            travel_time,         # Minimize
            budget_efficiency,   # Minimize ($/VMT)
            1 - equity           # Minimize (we negate equity to minimize)
        ])

# Initialize simulator
simulator = BehavioralSimulator(n_agents=10000)
print(f"Behavioral simulator initialized with {simulator.n_agents} agents")

Behavioral simulator initialized with 10000 agents


In [5]:
def generate_training_data(n_samples: int = 5000) -> Tuple[np.ndarray, np.ndarray]:
    """
    Generate training data using Latin Hypercube Sampling for decision variables
    and the behavioral simulator for objectives.
    """
    # Define bounds for all 12 decision variables
    bounds = np.array([
        config.carpool_reward_range,
        config.pacer_reward_range,
        config.peak_multiplier_range,
        config.budget_split_range,
        config.morning_peak_start_range,
        config.morning_peak_end_range,
        config.evening_peak_start_range,
        config.evening_peak_end_range,
        (0.1, 0.9),  # zone_weight_0
        (0.1, 0.9),  # zone_weight_1
        (0.1, 0.9),  # zone_weight_2
        (0.1, 0.9),  # zone_weight_3
    ])
    
    # Latin Hypercube Sampling for better space coverage
    sampler = qmc.LatinHypercube(d=12, seed=SEED)
    samples_unit = sampler.random(n=n_samples)
    
    # Scale to bounds
    X = qmc.scale(samples_unit, bounds[:, 0], bounds[:, 1])
    
    # Compute objectives for each sample
    Y = np.zeros((n_samples, 4))
    for i in range(n_samples):
        Y[i] = simulator.compute_objectives(X[i])
        if (i + 1) % 1000 == 0:
            print(f"Generated {i + 1}/{n_samples} samples")
    
    return X, Y

# Generate training data
print("Generating training data...")
X_train, Y_train = generate_training_data(config.n_training_samples)
print(f"\nTraining data shape: X={X_train.shape}, Y={Y_train.shape}")
print("\nObjective statistics:")
print(f"  VMT (1-reduction): min={Y_train[:, 0].min():.3f}, max={Y_train[:, 0].max():.3f}, mean={Y_train[:, 0].mean():.3f}")
print(f"  Travel Time (min): min={Y_train[:, 1].min():.1f}, max={Y_train[:, 1].max():.1f}, mean={Y_train[:, 1].mean():.1f}")
print(f"  Budget Eff ($/VMT): min={Y_train[:, 2].min():.2f}, max={Y_train[:, 2].max():.2f}, mean={Y_train[:, 2].mean():.2f}")
print(f"  Inequity (1-equity): min={Y_train[:, 3].min():.3f}, max={Y_train[:, 3].max():.3f}, mean={Y_train[:, 3].mean():.3f}")

Generating training data...
Generated 1000/5000 samples
Generated 2000/5000 samples
Generated 3000/5000 samples
Generated 4000/5000 samples
Generated 5000/5000 samples

Training data shape: X=(5000, 12), Y=(5000, 4)

Objective statistics:
  VMT (1-reduction): min=0.775, max=0.813, mean=0.782
  Travel Time (min): min=37.4, max=38.1, mean=37.5
  Budget Eff ($/VMT): min=0.03, max=1.52, mean=0.78
  Inequity (1-equity): min=0.010, max=0.571, mean=0.219


## 3. Neural Surrogate Model

Multi-headed neural network to predict all 4 objectives from decision variables.

In [6]:
class MultiObjectiveSurrogate(nn.Module):
    """
    Neural surrogate model with shared trunk and separate heads for each objective.
    Uses uncertainty weighting for multi-task learning (Kendall et al. 2018).
    """
    
    def __init__(self, input_dim: int = 12, hidden_dims: List[int] = [128, 256, 128], 
                 n_objectives: int = 4, dropout_rate: float = 0.1):
        super().__init__()
        
        self.input_dim = input_dim
        self.n_objectives = n_objectives
        
        # Input normalization
        self.input_norm = nn.LayerNorm(input_dim)
        
        # Shared trunk
        trunk_layers = []
        prev_dim = input_dim
        for hidden_dim in hidden_dims:
            trunk_layers.extend([
                nn.Linear(prev_dim, hidden_dim),
                nn.SiLU(),
                nn.Dropout(dropout_rate)
            ])
            prev_dim = hidden_dim
        self.trunk = nn.Sequential(*trunk_layers)
        
        # Separate heads for each objective
        self.heads = nn.ModuleList([
            nn.Sequential(
                nn.Linear(hidden_dims[-1], 64),
                nn.SiLU(),
                nn.Linear(64, 1)
            )
            for _ in range(n_objectives)
        ])
        
        # Learnable uncertainty weights (log variance)
        self.log_vars = nn.Parameter(torch.zeros(n_objectives))
    
    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """
        Forward pass through the network.
        
        Args:
            x: Input tensor of shape (batch_size, input_dim)
        
        Returns:
            outputs: Tensor of shape (batch_size, n_objectives)
        """
        x = self.input_norm(x)
        shared = self.trunk(x)
        
        outputs = torch.cat([head(shared) for head in self.heads], dim=-1)
        return outputs
    
    def multi_task_loss(self, predictions: torch.Tensor, targets: torch.Tensor) -> torch.Tensor:
        """
        Multi-task loss with learned uncertainty weighting.
        L = Σ (1/(2*σ_i^2)) * L_i + log(σ_i)
        """
        losses = []
        for i in range(self.n_objectives):
            mse = torch.mean((predictions[:, i] - targets[:, i]) ** 2)
            precision = torch.exp(-self.log_vars[i])
            loss = precision * mse + self.log_vars[i]
            losses.append(loss)
        
        return torch.sum(torch.stack(losses))

# Initialize model
model = MultiObjectiveSurrogate(
    input_dim=12,
    hidden_dims=config.hidden_dims,
    n_objectives=4,
    dropout_rate=config.dropout_rate
).to(DEVICE)

print(f"Model architecture:")
print(model)
print(f"\nTotal parameters: {sum(p.numel() for p in model.parameters()):,}")

Model architecture:
MultiObjectiveSurrogate(
  (input_norm): LayerNorm((12,), eps=1e-05, elementwise_affine=True)
  (trunk): Sequential(
    (0): Linear(in_features=12, out_features=128, bias=True)
    (1): SiLU()
    (2): Dropout(p=0.1, inplace=False)
    (3): Linear(in_features=128, out_features=256, bias=True)
    (4): SiLU()
    (5): Dropout(p=0.1, inplace=False)
    (6): Linear(in_features=256, out_features=128, bias=True)
    (7): SiLU()
    (8): Dropout(p=0.1, inplace=False)
  )
  (heads): ModuleList(
    (0-3): 4 x Sequential(
      (0): Linear(in_features=128, out_features=64, bias=True)
      (1): SiLU()
      (2): Linear(in_features=64, out_features=1, bias=True)
    )
  )
)

Total parameters: 100,896


In [7]:
def train_surrogate(model: nn.Module, X: np.ndarray, Y: np.ndarray,
                    n_epochs: int = 200, batch_size: int = 64,
                    lr: float = 1e-3, val_split: float = 0.2) -> Dict:
    """
    Train the surrogate model with early stopping.
    """
    # Train/val split
    n_val = int(len(X) * val_split)
    indices = np.random.permutation(len(X))
    train_idx, val_idx = indices[n_val:], indices[:n_val]
    
    # Normalize inputs
    X_mean, X_std = X[train_idx].mean(0), X[train_idx].std(0) + 1e-8
    X_norm = (X - X_mean) / X_std
    
    # Normalize outputs per objective
    Y_mean, Y_std = Y[train_idx].mean(0), Y[train_idx].std(0) + 1e-8
    Y_norm = (Y - Y_mean) / Y_std
    
    # Create data loaders
    X_train_t = torch.FloatTensor(X_norm[train_idx]).to(DEVICE)
    Y_train_t = torch.FloatTensor(Y_norm[train_idx]).to(DEVICE)
    X_val_t = torch.FloatTensor(X_norm[val_idx]).to(DEVICE)
    Y_val_t = torch.FloatTensor(Y_norm[val_idx]).to(DEVICE)
    
    train_dataset = TensorDataset(X_train_t, Y_train_t)
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    
    # Optimizer
    optimizer = optim.AdamW(model.parameters(), lr=lr, weight_decay=1e-4)
    scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, patience=10, factor=0.5)
    
    # Training loop
    history = {'train_loss': [], 'val_loss': [], 'val_r2': []}
    best_val_loss = float('inf')
    patience_counter = 0
    best_state = None
    
    for epoch in range(n_epochs):
        # Training
        model.train()
        train_losses = []
        for X_batch, Y_batch in train_loader:
            optimizer.zero_grad()
            predictions = model(X_batch)
            loss = model.multi_task_loss(predictions, Y_batch)
            loss.backward()
            torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
            optimizer.step()
            train_losses.append(loss.item())
        
        # Validation
        model.eval()
        with torch.no_grad():
            val_pred = model(X_val_t)
            val_loss = model.multi_task_loss(val_pred, Y_val_t).item()
            
            # R² per objective
            val_r2 = []
            for i in range(4):
                ss_res = ((val_pred[:, i] - Y_val_t[:, i]) ** 2).sum()
                ss_tot = ((Y_val_t[:, i] - Y_val_t[:, i].mean()) ** 2).sum()
                r2 = 1 - ss_res / (ss_tot + 1e-8)
                val_r2.append(r2.item())
        
        history['train_loss'].append(np.mean(train_losses))
        history['val_loss'].append(val_loss)
        history['val_r2'].append(val_r2)
        
        scheduler.step(val_loss)
        
        # Early stopping
        if val_loss < best_val_loss:
            best_val_loss = val_loss
            patience_counter = 0
            best_state = {k: v.cpu().clone() for k, v in model.state_dict().items()}
        else:
            patience_counter += 1
            if patience_counter >= 20:
                print(f"Early stopping at epoch {epoch + 1}")
                break
        
        if (epoch + 1) % 20 == 0:
            r2_str = ', '.join([f'{r:.3f}' for r in val_r2])
            print(f"Epoch {epoch + 1}: train_loss={np.mean(train_losses):.4f}, "
                  f"val_loss={val_loss:.4f}, val_R²=[{r2_str}]")
    
    # Restore best model
    if best_state is not None:
        model.load_state_dict(best_state)
    
    # Store normalization params
    model.X_mean = torch.FloatTensor(X_mean).to(DEVICE)
    model.X_std = torch.FloatTensor(X_std).to(DEVICE)
    model.Y_mean = torch.FloatTensor(Y_mean).to(DEVICE)
    model.Y_std = torch.FloatTensor(Y_std).to(DEVICE)
    
    return history

# Train the model
print("Training surrogate model...\n")
history = train_surrogate(
    model, X_train, Y_train,
    n_epochs=config.n_epochs,
    batch_size=config.batch_size
)

# Final R² scores
print(f"\nFinal validation R² scores:")
obj_names = ['VMT Reduction', 'Travel Time', 'Budget Efficiency', 'Equity']
for i, name in enumerate(obj_names):
    print(f"  {name}: R² = {history['val_r2'][-1][i]:.4f}")

Training surrogate model...

Epoch 20: train_loss=-0.9211, val_loss=-0.8896, val_R²=[0.679, 0.679, 0.769, 0.757]
Epoch 40: train_loss=-2.4845, val_loss=-1.2322, val_R²=[0.701, 0.701, 0.790, 0.763]
Early stopping at epoch 58

Final validation R² scores:
  VMT Reduction: R² = 0.6774
  Travel Time: R² = 0.6770
  Budget Efficiency: R² = 0.7947
  Equity: R² = 0.7690


In [8]:
# Plot training history
fig = make_subplots(rows=1, cols=2, subplot_titles=['Loss History', 'R² per Objective'])

# Loss plot
fig.add_trace(
    go.Scatter(y=history['train_loss'], name='Train Loss', mode='lines'),
    row=1, col=1
)
fig.add_trace(
    go.Scatter(y=history['val_loss'], name='Val Loss', mode='lines'),
    row=1, col=1
)

# R² plot
r2_array = np.array(history['val_r2'])
for i, name in enumerate(obj_names):
    fig.add_trace(
        go.Scatter(y=r2_array[:, i], name=name, mode='lines'),
        row=1, col=2
    )

fig.update_layout(height=800, title_text="Surrogate Model Training")
fig.update_xaxes(title_text="Epoch", row=1, col=1)
fig.update_xaxes(title_text="Epoch", row=1, col=2)
fig.update_yaxes(title_text="Loss", row=1, col=1)
fig.update_yaxes(title_text="R²", row=1, col=2)
fig.show()

## 4. Constrained Pareto Optimization

EPO (Exact Pareto Optimal) solver with Augmented Lagrangian for constraints.

In [9]:
class EPOSolver:
    """
    Exact Pareto Optimal (EPO) solver for multi-objective optimization.
    Finds solutions on the true Pareto front using gradient descent.
    """
    
    def __init__(self, n_objectives: int = 4):
        self.n_objectives = n_objectives
    
    def get_epo_direction(self, gradients: torch.Tensor, 
                          preference: Optional[torch.Tensor] = None) -> torch.Tensor:
        """
        Compute EPO descent direction that improves all objectives.
        
        Args:
            gradients: Tensor of shape (n_objectives, n_params) - gradients for each objective
            preference: Optional preference weights for objectives
        
        Returns:
            direction: Descent direction of shape (n_params,)
        """
        n_obj, n_params = gradients.shape
        
        if preference is None:
            preference = torch.ones(n_obj, device=gradients.device) / n_obj
        
        # Normalize gradients
        grad_norms = torch.norm(gradients, dim=1, keepdim=True) + 1e-8
        normalized_grads = gradients / grad_norms
        
        # Compute Gram matrix
        G = normalized_grads @ normalized_grads.T
        
        # Solve for optimal weights using quadratic programming approximation
        # We want weights w such that w @ G @ w.T is minimized subject to w >= 0, sum(w) = 1
        # Use softmax of preference as initial weights
        weights = torch.softmax(preference * 10, dim=0)
        
        # Refine weights with a few gradient steps
        for _ in range(5):
            grad_w = G @ weights
            weights = weights - 0.1 * grad_w
            weights = torch.clamp(weights, min=0.01)
            weights = weights / weights.sum()
        
        # Compute descent direction
        direction = weights @ gradients
        
        return direction


class ConstrainedParetoOptimizer:
    """
    Multi-objective optimizer with Augmented Lagrangian constraint handling.
    """
    
    def __init__(self, surrogate: nn.Module, config: OptimizationConfig):
        self.surrogate = surrogate
        self.config = config
        self.epo = EPOSolver(n_objectives=4)
        
        # Decision variable bounds (as tensors)
        self.bounds_low = torch.tensor([
            config.carpool_reward_range[0],
            config.pacer_reward_range[0],
            config.peak_multiplier_range[0],
            config.budget_split_range[0],
            config.morning_peak_start_range[0],
            config.morning_peak_end_range[0],
            config.evening_peak_start_range[0],
            config.evening_peak_end_range[0],
            0.1, 0.1, 0.1, 0.1  # zone weights
        ], device=DEVICE)
        
        self.bounds_high = torch.tensor([
            config.carpool_reward_range[1],
            config.pacer_reward_range[1],
            config.peak_multiplier_range[1],
            config.budget_split_range[1],
            config.morning_peak_start_range[1],
            config.morning_peak_end_range[1],
            config.evening_peak_start_range[1],
            config.evening_peak_end_range[1],
            0.9, 0.9, 0.9, 0.9  # zone weights
        ], device=DEVICE)
    
    def compute_constraints(self, x: torch.Tensor) -> torch.Tensor:
        """
        Compute constraint violations (positive = violated).
        
        Constraints:
        1. Budget constraint: estimated spending <= daily_budget
        2. Zone equity: each zone weight >= min_zone_allocation
        """
        constraints = []
        
        # Budget constraint (approximate)
        carpool_reward = x[0]
        pacer_reward = x[1]
        peak_mult = x[2]
        budget_split = x[3]
        
        # Rough spending estimate
        estimated_spending = (
            budget_split * carpool_reward * peak_mult * 2000 +  # ~2000 carpool trips
            (1 - budget_split) * pacer_reward * peak_mult * 50000  # ~50000 pacer miles
        )
        budget_violation = (estimated_spending - self.config.daily_budget) / self.config.daily_budget
        constraints.append(budget_violation)
        
        # Zone weight constraints (min allocation)
        zone_weights = x[8:12]
        zone_weights_norm = zone_weights / (zone_weights.sum() + 1e-8)
        for i in range(4):
            violation = self.config.min_zone_allocation - zone_weights_norm[i]
            constraints.append(violation)
        
        return torch.stack(constraints)
    
    def augmented_lagrangian_loss(self, objectives: torch.Tensor, 
                                   constraints: torch.Tensor,
                                   lambdas: torch.Tensor,
                                   mu: float) -> torch.Tensor:
        """
        Augmented Lagrangian: L = f(x) + λᵀg(x) + (μ/2)||max(0, g(x))||²
        """
        # Constraint penalty
        violated = torch.clamp(constraints, min=0)
        penalty = lambdas @ constraints + (mu / 2) * (violated ** 2).sum()
        
        return penalty
    
    def optimize_single(self, x_init: torch.Tensor, 
                        preference: Optional[torch.Tensor] = None,
                        n_iterations: int = 100) -> Tuple[torch.Tensor, torch.Tensor]:
        """
        Optimize from a single starting point.
        
        Returns:
            x_final: Optimized decision variables
            objectives: Final objective values
        """
        x = x_init.clone().requires_grad_(True)
        
        # Lagrange multipliers
        lambdas = torch.zeros(5, device=DEVICE)  # 1 budget + 4 zone constraints
        mu = 1.0
        
        optimizer = optim.Adam([x], lr=self.config.learning_rate)
        
        for iteration in range(n_iterations):
            optimizer.zero_grad()
            
            # Normalize and predict
            x_norm = (x - self.surrogate.X_mean) / self.surrogate.X_std
            objectives = self.surrogate(x_norm.unsqueeze(0)).squeeze()
            
            # Denormalize objectives
            objectives_denorm = objectives * self.surrogate.Y_std + self.surrogate.Y_mean
            
            # Compute gradients for each objective
            gradients = []
            for i in range(4):
                grad = torch.autograd.grad(objectives[i], x, retain_graph=True)[0]
                gradients.append(grad)
            gradients = torch.stack(gradients)
            
            # EPO direction
            epo_direction = self.epo.get_epo_direction(gradients, preference)
            
            # Constraint handling
            constraints = self.compute_constraints(x)
            constraint_loss = self.augmented_lagrangian_loss(objectives, constraints, lambdas, mu)
            
            # Combined gradient
            x.grad = epo_direction
            if constraint_loss > 0:
                constraint_grad = torch.autograd.grad(constraint_loss, x, retain_graph=True)[0]
                x.grad = x.grad + 0.5 * constraint_grad
            
            optimizer.step()
            
            # Project to bounds
            with torch.no_grad():
                x.data = torch.clamp(x.data, self.bounds_low, self.bounds_high)
            
            # Update Lagrange multipliers
            if (iteration + 1) % 10 == 0:
                with torch.no_grad():
                    constraints = self.compute_constraints(x)
                    lambdas = lambdas + mu * torch.clamp(constraints, min=0)
                    mu = min(mu * 1.1, 100)  # Anneal penalty
        
        # Final evaluation
        with torch.no_grad():
            x_norm = (x - self.surrogate.X_mean) / self.surrogate.X_std
            objectives = self.surrogate(x_norm.unsqueeze(0)).squeeze()
            objectives = objectives * self.surrogate.Y_std + self.surrogate.Y_mean
        
        return x.detach(), objectives.detach()
    
    def find_pareto_front(self, n_starts: int = 50) -> Tuple[np.ndarray, np.ndarray]:
        """
        Find Pareto front by optimizing from multiple starting points with varied preferences.
        """
        all_solutions = []
        all_objectives = []
        
        # Generate diverse preference vectors
        preferences = []
        for i in range(n_starts):
            # Dirichlet distribution for diverse preferences
            pref = np.random.dirichlet(np.ones(4) * 0.5)
            preferences.append(torch.tensor(pref, device=DEVICE, dtype=torch.float32))
        
        print(f"Optimizing from {n_starts} starting points...")
        
        for i, pref in enumerate(preferences):
            # Random starting point within bounds
            x_init = self.bounds_low + torch.rand(12, device=DEVICE) * (self.bounds_high - self.bounds_low)
            
            x_final, objectives = self.optimize_single(x_init, pref, self.config.n_iterations)
            
            all_solutions.append(x_final.cpu().numpy())
            all_objectives.append(objectives.cpu().numpy())
            
            if (i + 1) % 10 == 0:
                print(f"  Completed {i + 1}/{n_starts} optimizations")
        
        all_solutions = np.array(all_solutions)
        all_objectives = np.array(all_objectives)
        
        # Filter dominated solutions
        pareto_mask = self._get_pareto_mask(all_objectives)
        
        print(f"\nFound {pareto_mask.sum()} non-dominated solutions out of {n_starts}")
        
        return all_solutions[pareto_mask], all_objectives[pareto_mask]
    
    def _get_pareto_mask(self, objectives: np.ndarray) -> np.ndarray:
        """
        Find non-dominated solutions (all objectives are minimized).
        """
        n = len(objectives)
        is_pareto = np.ones(n, dtype=bool)
        
        for i in range(n):
            if not is_pareto[i]:
                continue
            for j in range(n):
                if i == j or not is_pareto[j]:
                    continue
                # Check if j dominates i (all objectives of j <= i, at least one <)
                if np.all(objectives[j] <= objectives[i]) and np.any(objectives[j] < objectives[i]):
                    is_pareto[i] = False
                    break
        
        return is_pareto

# Initialize optimizer
optimizer = ConstrainedParetoOptimizer(model, config)
print("Pareto optimizer initialized")

Pareto optimizer initialized


In [10]:
# Find Pareto front
pareto_solutions, pareto_objectives = optimizer.find_pareto_front(n_starts=config.n_pareto_starts)

print(f"\nPareto front statistics:")
print(f"  VMT (1-reduction): min={pareto_objectives[:, 0].min():.3f}, max={pareto_objectives[:, 0].max():.3f}")
print(f"  Travel Time: min={pareto_objectives[:, 1].min():.1f}, max={pareto_objectives[:, 1].max():.1f}")
print(f"  Budget Eff: min={pareto_objectives[:, 2].min():.2f}, max={pareto_objectives[:, 2].max():.2f}")
print(f"  Inequity: min={pareto_objectives[:, 3].min():.3f}, max={pareto_objectives[:, 3].max():.3f}")

Optimizing from 50 starting points...
  Completed 10/50 optimizations
  Completed 20/50 optimizations
  Completed 30/50 optimizations
  Completed 40/50 optimizations
  Completed 50/50 optimizations

Found 23 non-dominated solutions out of 50

Pareto front statistics:
  VMT (1-reduction): min=0.773, max=0.782
  Travel Time: min=37.4, max=37.5
  Budget Eff: min=-0.06, max=0.24
  Inequity: min=0.020, max=0.091


## 5. Pareto Front Visualization

In [11]:
def visualize_pareto_front(solutions: np.ndarray, objectives: np.ndarray):
    """
    Create comprehensive Pareto front visualizations.
    """
    obj_names = ['VMT (1-reduction)', 'Travel Time (min)', 'Budget Eff ($/VMT)', 'Inequity (1-equity)']
    
    # Convert objectives for display (transform back to intuitive values)
    display_obj = objectives.copy()
    display_obj[:, 0] = (1 - objectives[:, 0]) * 100  # VMT reduction %
    display_obj[:, 3] = (1 - objectives[:, 3]) * 100  # Equity %
    display_names = ['VMT Reduction (%)', 'Travel Time (min)', 'Cost ($/VMT)', 'Equity (%)']
    
    # 1. Parallel Coordinates Plot
    fig_parallel = go.Figure(data=
        go.Parcoords(
            line=dict(
                color=display_obj[:, 2],  # Color by budget efficiency
                colorscale='Viridis',
                showscale=True,
                cmin=display_obj[:, 2].min(),
                cmax=display_obj[:, 2].max()
            ),
            dimensions=[
                dict(range=[display_obj[:, 0].min(), display_obj[:, 0].max()],
                     label=display_names[0], values=display_obj[:, 0]),
                dict(range=[display_obj[:, 1].min(), display_obj[:, 1].max()],
                     label=display_names[1], values=display_obj[:, 1]),
                dict(range=[display_obj[:, 2].min(), display_obj[:, 2].max()],
                     label=display_names[2], values=display_obj[:, 2]),
                dict(range=[display_obj[:, 3].min(), display_obj[:, 3].max()],
                     label=display_names[3], values=display_obj[:, 3]),
            ]
        )
    )
    fig_parallel.update_layout(
        title='Pareto Front - Parallel Coordinates (colored by Cost)',
        height=900
    )
    fig_parallel.show()
    
    # 2. Pairwise Trade-off Matrix
    fig_matrix = make_subplots(
        rows=4, cols=4,
        subplot_titles=[f'{display_names[i]} vs {display_names[j]}' 
                       for i in range(4) for j in range(4)]
    )
    
    for i in range(4):
        for j in range(4):
            if i == j:
                # Histogram on diagonal
                fig_matrix.add_trace(
                    go.Histogram(x=display_obj[:, i], nbinsx=15, showlegend=False,
                                marker_color='steelblue'),
                    row=i+1, col=j+1
                )
            else:
                # Scatter plot
                fig_matrix.add_trace(
                    go.Scatter(x=display_obj[:, j], y=display_obj[:, i], 
                              mode='markers', showlegend=False,
                              marker=dict(size=8, color='steelblue', opacity=0.7)),
                    row=i+1, col=j+1
                )
    
    fig_matrix.update_layout(
        title='Pairwise Trade-off Matrix',
        height=1400, width=1800,
        showlegend=False
    )
    fig_matrix.show()
    
    # 3. 3D Interactive Plot
    fig_3d = go.Figure(data=[go.Scatter3d(
        x=display_obj[:, 0],
        y=display_obj[:, 1],
        z=display_obj[:, 2],
        mode='markers',
        marker=dict(
            size=display_obj[:, 3] / 10,  # Size by equity
            color=display_obj[:, 3],
            colorscale='RdYlGn',
            colorbar=dict(title='Equity (%)'),
            opacity=0.8
        ),
        text=[f'VMT: {v:.1f}%<br>Time: {t:.1f}min<br>Cost: ${c:.2f}<br>Equity: {e:.1f}%'
              for v, t, c, e in display_obj],
        hoverinfo='text'
    )])
    
    fig_3d.update_layout(
        title='3D Pareto Front (size/color = Equity)',
        scene=dict(
            xaxis_title='VMT Reduction (%)',
            yaxis_title='Travel Time (min)',
            zaxis_title='Cost ($/VMT)'
        ),
        height=600
    )
    fig_3d.show()
    
    return display_obj

display_objectives = visualize_pareto_front(pareto_solutions, pareto_objectives)

## 6. Solution Selection & Analysis

In [12]:
def find_knee_point(objectives: np.ndarray) -> int:
    """
    Find the knee point (best balanced solution) using distance to utopia.
    """
    # Utopia point (best value for each objective)
    utopia = objectives.min(axis=0)
    # Nadir point (worst value for each objective)
    nadir = objectives.max(axis=0)
    
    # Normalize objectives
    normalized = (objectives - utopia) / (nadir - utopia + 1e-8)
    
    # Distance to utopia (origin in normalized space)
    distances = np.sqrt((normalized ** 2).sum(axis=1))
    
    return np.argmin(distances)


def analyze_solutions(solutions: np.ndarray, objectives: np.ndarray):
    """
    Analyze and categorize Pareto solutions.
    """
    var_names = [
        'Carpool Reward ($)', 'Pacer Reward ($/mi)', 'Peak Multiplier',
        'Budget Split (carpool)', 'AM Peak Start', 'AM Peak End',
        'PM Peak Start', 'PM Peak End',
        'Zone 1 Weight', 'Zone 2 Weight', 'Zone 3 Weight', 'Zone 4 Weight'
    ]
    
    # Find special solutions
    knee_idx = find_knee_point(objectives)
    vmt_best_idx = np.argmin(objectives[:, 0])
    time_best_idx = np.argmin(objectives[:, 1])
    cost_best_idx = np.argmin(objectives[:, 2])
    equity_best_idx = np.argmin(objectives[:, 3])
    
    # Create comparison table
    special_solutions = {
        'Knee (Balanced)': knee_idx,
        'Best VMT Reduction': vmt_best_idx,
        'Best Travel Time': time_best_idx,
        'Best Cost Efficiency': cost_best_idx,
        'Best Equity': equity_best_idx
    }
    
    print("=" * 80)
    print("PARETO SOLUTION COMPARISON")
    print("=" * 80)
    
    # Objectives comparison
    print("\n--- Objectives ---")
    print(f"{'Solution':<25} {'VMT Red %':>10} {'Time (min)':>12} {'Cost $/VMT':>12} {'Equity %':>10}")
    print("-" * 75)
    
    for name, idx in special_solutions.items():
        obj = objectives[idx]
        vmt_red = (1 - obj[0]) * 100
        equity = (1 - obj[3]) * 100
        print(f"{name:<25} {vmt_red:>10.1f} {obj[1]:>12.1f} {obj[2]:>12.2f} {equity:>10.1f}")
    
    # Decision variables for knee solution
    print("\n--- Knee Point Decision Variables ---")
    knee_sol = solutions[knee_idx]
    for i, (name, val) in enumerate(zip(var_names, knee_sol)):
        print(f"  {name}: {val:.3f}")
    
    return special_solutions, knee_idx

special_solutions, knee_idx = analyze_solutions(pareto_solutions, pareto_objectives)

PARETO SOLUTION COMPARISON

--- Objectives ---
Solution                   VMT Red %   Time (min)   Cost $/VMT   Equity %
---------------------------------------------------------------------------
Knee (Balanced)                 22.4         37.4         0.03       94.5
Best VMT Reduction              22.7         37.4         0.17       95.3
Best Travel Time                22.7         37.4         0.24       98.0
Best Cost Efficiency            22.2         37.5        -0.06       93.1
Best Equity                     22.7         37.4         0.24       98.0

--- Knee Point Decision Variables ---
  Carpool Reward ($): 5.630
  Pacer Reward ($/mi): 0.082
  Peak Multiplier: 1.848
  Budget Split (carpool): 0.044
  AM Peak Start: 7.698
  AM Peak End: 8.641
  PM Peak Start: 17.669
  PM Peak End: 19.107
  Zone 1 Weight: 0.582
  Zone 2 Weight: 0.678
  Zone 3 Weight: 0.641
  Zone 4 Weight: 0.701


In [13]:
# Create solution comparison radar chart
def plot_solution_radar(objectives: np.ndarray, special_solutions: dict):
    """
    Radar chart comparing special solutions.
    """
    categories = ['VMT Reduction', 'Travel Time\n(inverse)', 'Cost Efficiency\n(inverse)', 'Equity']
    
    fig = go.Figure()
    
    colors = ['blue', 'green', 'orange', 'red', 'purple']
    
    for (name, idx), color in zip(special_solutions.items(), colors):
        obj = objectives[idx]
        # Normalize and invert where needed (all should be "higher is better" for radar)
        values = [
            (1 - obj[0]) * 100,  # VMT reduction (higher = better)
            100 - obj[1],         # Inverse travel time (higher = better)
            100 / (obj[2] + 1),   # Inverse cost (higher = better)
            (1 - obj[3]) * 100    # Equity (higher = better)
        ]
        values.append(values[0])  # Close the radar
        
        fig.add_trace(go.Scatterpolar(
            r=values,
            theta=categories + [categories[0]],
            fill='toself',
            name=name,
            opacity=0.6
        ))
    
    fig.update_layout(
        polar=dict(radialaxis=dict(visible=True, range=[0, 100])),
        showlegend=True,
        title='Solution Comparison Radar Chart',
        height=600
    )
    fig.show()

plot_solution_radar(pareto_objectives, special_solutions)

## 7. Validation & Export

In [14]:
def validate_surrogate(model: nn.Module, simulator: BehavioralSimulator,
                       solutions: np.ndarray, n_samples: int = 20) -> pd.DataFrame:
    """
    Validate surrogate predictions against actual simulator.
    """
    # Sample solutions to validate
    indices = np.random.choice(len(solutions), min(n_samples, len(solutions)), replace=False)
    
    results = []
    
    model.eval()
    with torch.no_grad():
        for idx in indices:
            x = solutions[idx]
            
            # Surrogate prediction
            x_tensor = torch.FloatTensor(x).to(DEVICE)
            x_norm = (x_tensor - model.X_mean) / model.X_std
            pred = model(x_norm.unsqueeze(0)).squeeze()
            pred = pred * model.Y_std + model.Y_mean
            pred = pred.cpu().numpy()
            
            # Actual simulator
            actual = simulator.compute_objectives(x)
            
            results.append({
                'solution_idx': idx,
                'pred_vmt': pred[0], 'actual_vmt': actual[0],
                'pred_time': pred[1], 'actual_time': actual[1],
                'pred_cost': pred[2], 'actual_cost': actual[2],
                'pred_equity': pred[3], 'actual_equity': actual[3],
            })
    
    df = pd.DataFrame(results)
    
    # Compute errors
    print("Surrogate Validation Results:")
    print("-" * 50)
    for obj in ['vmt', 'time', 'cost', 'equity']:
        mae = np.abs(df[f'pred_{obj}'] - df[f'actual_{obj}']).mean()
        mape = (np.abs(df[f'pred_{obj}'] - df[f'actual_{obj}']) / (np.abs(df[f'actual_{obj}']) + 1e-8)).mean() * 100
        print(f"  {obj.upper()}: MAE={mae:.4f}, MAPE={mape:.1f}%")
    
    return df

validation_df = validate_surrogate(model, simulator, pareto_solutions)

Surrogate Validation Results:
--------------------------------------------------
  VMT: MAE=0.0009, MAPE=0.1%
  TIME: MAE=0.0160, MAPE=0.0%
  COST: MAE=0.0803, MAPE=97.2%
  EQUITY: MAE=0.0221, MAPE=84.3%


In [15]:
def compute_pareto_metrics(objectives: np.ndarray) -> dict:
    """
    Compute quality metrics for the Pareto front.
    """
    # 1. Hypervolume (approximate using reference point)
    ref_point = objectives.max(axis=0) * 1.1  # 10% beyond worst
    
    # Simple hypervolume approximation (sum of dominated volumes)
    # For exact HV, use pymoo or similar library
    sorted_idx = np.argsort(objectives[:, 0])
    hv_approx = 0
    prev_point = ref_point.copy()
    for idx in sorted_idx:
        point = objectives[idx]
        # Volume contribution
        vol = np.prod(np.maximum(prev_point - point, 0))
        hv_approx += vol
        prev_point = np.minimum(prev_point, point)
    
    # 2. Spacing (uniformity of distribution)
    distances = cdist(objectives, objectives)
    np.fill_diagonal(distances, np.inf)
    min_distances = distances.min(axis=1)
    spacing = np.std(min_distances)
    
    # 3. Spread (coverage of objective space)
    spread = np.prod(objectives.max(axis=0) - objectives.min(axis=0))
    
    metrics = {
        'n_solutions': len(objectives),
        'hypervolume_approx': hv_approx,
        'spacing': spacing,
        'spread': spread,
        'avg_min_distance': min_distances.mean()
    }
    
    print("\nPareto Front Quality Metrics:")
    print("-" * 40)
    for k, v in metrics.items():
        print(f"  {k}: {v:.4f}")
    
    return metrics

pareto_metrics = compute_pareto_metrics(pareto_objectives)


Pareto Front Quality Metrics:
----------------------------------------
  n_solutions: 23.0000
  hypervolume_approx: 0.0017
  spacing: 0.0178
  spread: 0.0000
  avg_min_distance: 0.0220


In [16]:
# Export results
results_dir = Path.cwd().parent / 'results'
results_dir.mkdir(exist_ok=True)

# 1. Save Pareto front solutions
var_names = [
    'carpool_reward', 'pacer_reward', 'peak_multiplier', 'budget_split',
    'morning_start', 'morning_end', 'evening_start', 'evening_end',
    'zone_weight_0', 'zone_weight_1', 'zone_weight_2', 'zone_weight_3'
]
obj_names = ['vmt_1minus_reduction', 'travel_time_min', 'budget_eff_per_vmt', 'inequity']

pareto_df = pd.DataFrame(pareto_solutions, columns=var_names)
for i, name in enumerate(obj_names):
    pareto_df[name] = pareto_objectives[:, i]

# Add derived columns
pareto_df['vmt_reduction_pct'] = (1 - pareto_df['vmt_1minus_reduction']) * 100
pareto_df['equity_pct'] = (1 - pareto_df['inequity']) * 100
pareto_df['is_knee'] = False
pareto_df.loc[knee_idx, 'is_knee'] = True

pareto_df.to_csv(results_dir / 'pareto_front.csv', index=False)
print(f"Saved Pareto front to {results_dir / 'pareto_front.csv'}")

# 2. Save surrogate model
torch.save({
    'model_state_dict': model.state_dict(),
    'X_mean': model.X_mean.cpu(),
    'X_std': model.X_std.cpu(),
    'Y_mean': model.Y_mean.cpu(),
    'Y_std': model.Y_std.cpu(),
    'config': config.__dict__
}, results_dir / 'surrogate_model.pt')
print(f"Saved surrogate model to {results_dir / 'surrogate_model.pt'}")

# 3. Save metrics
metrics_dict = {
    'pareto_metrics': pareto_metrics,
    'training_history': {
        'final_val_loss': history['val_loss'][-1],
        'final_r2': history['val_r2'][-1]
    }
}
with open(results_dir / 'optimization_metrics.json', 'w') as f:
    json.dump(metrics_dict, f, indent=2, default=float)
print(f"Saved metrics to {results_dir / 'optimization_metrics.json'}")

Saved Pareto front to /Users/leoncenshuti/Desktop/portfolio/ihute/results/pareto_front.csv
Saved surrogate model to /Users/leoncenshuti/Desktop/portfolio/ihute/results/surrogate_model.pt
Saved metrics to /Users/leoncenshuti/Desktop/portfolio/ihute/results/optimization_metrics.json


In [17]:
# Final summary
print("\n" + "=" * 80)
print("MULTI-OBJECTIVE OPTIMIZATION COMPLETE")
print("=" * 80)

print("\nSurrogate Model Performance:")
for i, name in enumerate(['VMT', 'Travel Time', 'Budget Efficiency', 'Equity']):
    print(f"  {name}: R² = {history['val_r2'][-1][i]:.4f}")

print(f"\nPareto Front: {len(pareto_solutions)} non-dominated solutions")

print("\nKnee Point (Recommended Balanced Solution):")
knee_obj = pareto_objectives[knee_idx]
print(f"  VMT Reduction: {(1 - knee_obj[0]) * 100:.1f}%")
print(f"  Travel Time: {knee_obj[1]:.1f} minutes")
print(f"  Cost: ${knee_obj[2]:.2f}/VMT")
print(f"  Equity: {(1 - knee_obj[3]) * 100:.1f}%")

print("\nKey Decision Variables (Knee Point):")
knee_sol = pareto_solutions[knee_idx]
print(f"  Carpool Reward: ${knee_sol[0]:.2f}/passenger")
print(f"  Pacer Reward: ${knee_sol[1]:.3f}/mile")
print(f"  Peak Multiplier: {knee_sol[2]:.2f}x")
print(f"  Budget Split (carpool): {knee_sol[3]*100:.1f}%")

print(f"\nOutputs saved to: {results_dir}")


MULTI-OBJECTIVE OPTIMIZATION COMPLETE

Surrogate Model Performance:
  VMT: R² = 0.6774
  Travel Time: R² = 0.6770
  Budget Efficiency: R² = 0.7947
  Equity: R² = 0.7690

Pareto Front: 23 non-dominated solutions

Knee Point (Recommended Balanced Solution):
  VMT Reduction: 22.4%
  Travel Time: 37.4 minutes
  Cost: $0.03/VMT
  Equity: 94.5%

Key Decision Variables (Knee Point):
  Carpool Reward: $5.63/passenger
  Pacer Reward: $0.082/mile
  Peak Multiplier: 1.85x
  Budget Split (carpool): 4.4%

Outputs saved to: /Users/leoncenshuti/Desktop/portfolio/ihute/results


## 8. Full MLX Implementation for Apple Silicon

Complete MLX port of the neural surrogate, EPO solver, and Pareto optimizer for maximum Apple Silicon performance. This implementation provides:

- **Native MLX layers**: Linear, LayerNorm, SiLU activation
- **Full surrogate model**: Multi-headed architecture with uncertainty weighting
- **MLX EPO solver**: Gradient-based Pareto descent direction computation
- **Constrained optimizer**: Augmented Lagrangian with MLX autodiff

In [18]:
# =============================================================================
# 8.1 MLX Layer Implementations
# =============================================================================

if MLX_AVAILABLE:
    print("=" * 70)
    print("FULL MLX IMPLEMENTATION FOR APPLE SILICON")
    print("=" * 70)
    
    class MLXLinear:
        """MLX Linear layer with Xavier initialization."""
        
        def __init__(self, in_features: int, out_features: int, bias: bool = True):
            self.in_features = in_features
            self.out_features = out_features
            
            # Xavier initialization
            scale = np.sqrt(2.0 / (in_features + out_features))
            self.weight = mx.array(
                np.random.randn(out_features, in_features).astype(np.float32) * scale
            )
            self.bias = mx.zeros((out_features,)) if bias else None
        
        def __call__(self, x: mx.array) -> mx.array:
            out = x @ self.weight.T
            if self.bias is not None:
                out = out + self.bias
            return out
        
        def parameters(self) -> List[mx.array]:
            if self.bias is not None:
                return [self.weight, self.bias]
            return [self.weight]
    
    
    class MLXLayerNorm:
        """MLX Layer Normalization."""
        
        def __init__(self, normalized_shape: int, eps: float = 1e-5):
            self.normalized_shape = normalized_shape
            self.eps = eps
            self.weight = mx.ones((normalized_shape,))
            self.bias = mx.zeros((normalized_shape,))
        
        def __call__(self, x: mx.array) -> mx.array:
            mean = mx.mean(x, axis=-1, keepdims=True)
            var = mx.var(x, axis=-1, keepdims=True)
            x_norm = (x - mean) / mx.sqrt(var + self.eps)
            return self.weight * x_norm + self.bias
        
        def parameters(self) -> List[mx.array]:
            return [self.weight, self.bias]
    
    
    def mlx_silu(x: mx.array) -> mx.array:
        """SiLU (Swish) activation: x * sigmoid(x)"""
        return x * mx.sigmoid(x)
    
    
    class MLXDropout:
        """MLX Dropout layer (identity during inference)."""
        
        def __init__(self, p: float = 0.1):
            self.p = p
            self.training = False
        
        def __call__(self, x: mx.array) -> mx.array:
            if not self.training or self.p == 0:
                return x
            # During training, apply dropout mask
            mask = mx.random.uniform(x.shape) > self.p
            return x * mask / (1 - self.p)
        
        def train(self, mode: bool = True):
            self.training = mode
        
        def parameters(self) -> List[mx.array]:
            return []
    
    print("✓ MLX layer implementations created: Linear, LayerNorm, SiLU, Dropout")
else:
    print("MLX not available - skipping MLX implementation")
    print("To enable: pip install mlx")

FULL MLX IMPLEMENTATION FOR APPLE SILICON
✓ MLX layer implementations created: Linear, LayerNorm, SiLU, Dropout


In [19]:
# =============================================================================
# 8.2 MLX Multi-Objective Surrogate Model
# =============================================================================

if MLX_AVAILABLE:
    
    class MLXMultiObjectiveSurrogate:
        """
        Full MLX implementation of the multi-objective surrogate model.
        Architecture: LayerNorm -> Trunk (3 layers) -> 4 Heads
        """
        
        def __init__(self, input_dim: int = 12, hidden_dims: List[int] = [128, 256, 128],
                     n_objectives: int = 4, dropout_rate: float = 0.1):
            self.input_dim = input_dim
            self.hidden_dims = hidden_dims
            self.n_objectives = n_objectives
            
            # Input normalization
            self.input_norm = MLXLayerNorm(input_dim)
            
            # Shared trunk: 3 blocks of Linear + SiLU + Dropout
            self.trunk_layers = []
            prev_dim = input_dim
            for hidden_dim in hidden_dims:
                self.trunk_layers.append({
                    'linear': MLXLinear(prev_dim, hidden_dim),
                    'dropout': MLXDropout(dropout_rate)
                })
                prev_dim = hidden_dim
            
            # Separate heads for each objective
            self.heads = []
            for _ in range(n_objectives):
                self.heads.append({
                    'linear1': MLXLinear(hidden_dims[-1], 64),
                    'linear2': MLXLinear(64, 1)
                })
            
            # Learnable uncertainty weights (log variance)
            self.log_vars = mx.zeros((n_objectives,))
            
            # Normalization parameters (set after loading weights)
            self.X_mean = None
            self.X_std = None
            self.Y_mean = None
            self.Y_std = None
            
            self.training = False
        
        def __call__(self, x: mx.array) -> mx.array:
            """Forward pass through the network."""
            # Input normalization
            x = self.input_norm(x)
            
            # Trunk forward pass
            for layer in self.trunk_layers:
                x = layer['linear'](x)
                x = mlx_silu(x)
                if self.training:
                    x = layer['dropout'](x)
            
            # Head forward passes
            outputs = []
            for head in self.heads:
                h = head['linear1'](x)
                h = mlx_silu(h)
                h = head['linear2'](h)
                outputs.append(h)
            
            # Concatenate outputs: (batch, n_objectives)
            return mx.concatenate(outputs, axis=-1)
        
        def predict(self, x: mx.array) -> mx.array:
            """Predict with normalization applied."""
            # Normalize input
            x_norm = (x - self.X_mean) / self.X_std
            # Forward pass
            y_norm = self(x_norm)
            # Denormalize output
            return y_norm * self.Y_std + self.Y_mean
        
        def parameters(self) -> List[mx.array]:
            """Get all trainable parameters."""
            params = []
            params.extend(self.input_norm.parameters())
            for layer in self.trunk_layers:
                params.extend(layer['linear'].parameters())
            for head in self.heads:
                params.extend(head['linear1'].parameters())
                params.extend(head['linear2'].parameters())
            params.append(self.log_vars)
            return params
        
        def train(self, mode: bool = True):
            """Set training mode."""
            self.training = mode
            for layer in self.trunk_layers:
                layer['dropout'].train(mode)
        
        def eval(self):
            """Set evaluation mode."""
            self.train(False)
    
    print("✓ MLXMultiObjectiveSurrogate model class created")

✓ MLXMultiObjectiveSurrogate model class created


In [20]:
# =============================================================================
# 8.3 PyTorch to MLX Weight Conversion
# =============================================================================

if MLX_AVAILABLE:
    
    def convert_pytorch_to_mlx(pytorch_model: nn.Module, 
                                mlx_model: 'MLXMultiObjectiveSurrogate') -> None:
        """
        Convert trained PyTorch weights to MLX model.
        Maps PyTorch state dict to MLX layer structure.
        """
        state_dict = pytorch_model.state_dict()
        
        # Input LayerNorm
        mlx_model.input_norm.weight = mx.array(state_dict['input_norm.weight'].cpu().numpy())
        mlx_model.input_norm.bias = mx.array(state_dict['input_norm.bias'].cpu().numpy())
        
        # Trunk layers (3 blocks: Linear at indices 0, 3, 6)
        trunk_linear_indices = [0, 3, 6]
        for i, idx in enumerate(trunk_linear_indices):
            mlx_model.trunk_layers[i]['linear'].weight = mx.array(
                state_dict[f'trunk.{idx}.weight'].cpu().numpy()
            )
            mlx_model.trunk_layers[i]['linear'].bias = mx.array(
                state_dict[f'trunk.{idx}.bias'].cpu().numpy()
            )
        
        # Heads (4 objectives, each with 2 linear layers at indices 0, 2)
        for i in range(4):
            mlx_model.heads[i]['linear1'].weight = mx.array(
                state_dict[f'heads.{i}.0.weight'].cpu().numpy()
            )
            mlx_model.heads[i]['linear1'].bias = mx.array(
                state_dict[f'heads.{i}.0.bias'].cpu().numpy()
            )
            mlx_model.heads[i]['linear2'].weight = mx.array(
                state_dict[f'heads.{i}.2.weight'].cpu().numpy()
            )
            mlx_model.heads[i]['linear2'].bias = mx.array(
                state_dict[f'heads.{i}.2.bias'].cpu().numpy()
            )
        
        # Log variance parameters
        mlx_model.log_vars = mx.array(state_dict['log_vars'].cpu().numpy())
        
        # Normalization parameters
        mlx_model.X_mean = mx.array(pytorch_model.X_mean.cpu().numpy())
        mlx_model.X_std = mx.array(pytorch_model.X_std.cpu().numpy())
        mlx_model.Y_mean = mx.array(pytorch_model.Y_mean.cpu().numpy())
        mlx_model.Y_std = mx.array(pytorch_model.Y_std.cpu().numpy())
        
        print("✓ PyTorch weights converted to MLX")
    
    
    def verify_conversion(pytorch_model: nn.Module, 
                          mlx_model: 'MLXMultiObjectiveSurrogate',
                          n_tests: int = 10) -> float:
        """
        Verify MLX model produces same outputs as PyTorch model.
        Returns mean absolute error between outputs.
        """
        pytorch_model.eval()
        mlx_model.eval()
        
        errors = []
        for _ in range(n_tests):
            # Random test input
            x_np = np.random.randn(32, 12).astype(np.float32)
            
            # PyTorch prediction
            with torch.no_grad():
                x_torch = torch.FloatTensor(x_np).to(DEVICE)
                x_norm = (x_torch - pytorch_model.X_mean) / pytorch_model.X_std
                y_torch = pytorch_model(x_norm)
                y_torch = y_torch * pytorch_model.Y_std + pytorch_model.Y_mean
                y_torch = y_torch.cpu().numpy()
            
            # MLX prediction
            x_mlx = mx.array(x_np)
            y_mlx = mlx_model.predict(x_mlx)
            mx.eval(y_mlx)  # Force evaluation
            y_mlx = np.array(y_mlx)
            
            # Compute error
            error = np.abs(y_torch - y_mlx).mean()
            errors.append(error)
        
        mean_error = np.mean(errors)
        print(f"✓ Conversion verification: Mean absolute error = {mean_error:.6f}")
        return mean_error
    
    # Create and convert MLX model
    print("\n--- Converting PyTorch Model to MLX ---")
    mlx_surrogate = MLXMultiObjectiveSurrogate(
        input_dim=12,
        hidden_dims=config.hidden_dims,
        n_objectives=4,
        dropout_rate=config.dropout_rate
    )
    
    convert_pytorch_to_mlx(model, mlx_surrogate)
    conversion_error = verify_conversion(model, mlx_surrogate)


--- Converting PyTorch Model to MLX ---
✓ PyTorch weights converted to MLX
✓ Conversion verification: Mean absolute error = 0.000000


In [21]:
# =============================================================================
# 8.4 MLX EPO (Exact Pareto Optimal) Solver
# =============================================================================

if MLX_AVAILABLE:
    
    class MLXEPOSolver:
        """
        MLX-native EPO solver for multi-objective optimization.
        Uses MLX operations for gradient-based Pareto descent direction computation.
        """
        
        def __init__(self, n_objectives: int = 4):
            self.n_objectives = n_objectives
        
        def get_epo_direction(self, gradients: mx.array, 
                              preference: Optional[mx.array] = None) -> mx.array:
            """
            Compute EPO descent direction using MLX operations.
            
            Args:
                gradients: Array of shape (n_objectives, n_params)
                preference: Optional preference weights
            
            Returns:
                direction: Descent direction of shape (n_params,)
            """
            n_obj = gradients.shape[0]
            
            if preference is None:
                preference = mx.ones((n_obj,)) / n_obj
            
            # Normalize gradients
            grad_norms = mx.sqrt(mx.sum(gradients ** 2, axis=1, keepdims=True)) + 1e-8
            normalized_grads = gradients / grad_norms
            
            # Compute Gram matrix
            G = normalized_grads @ normalized_grads.T
            
            # Softmax of preference as initial weights
            exp_pref = mx.exp(preference * 10)
            weights = exp_pref / mx.sum(exp_pref)
            
            # Refine weights with gradient steps (projected gradient descent)
            for _ in range(5):
                grad_w = G @ weights
                weights = weights - 0.1 * grad_w
                weights = mx.maximum(weights, 0.01)
                weights = weights / mx.sum(weights)
            
            # Compute descent direction
            direction = weights @ gradients
            
            return direction
        
        def compute_gradients_via_finite_diff(self, 
                                               surrogate: 'MLXMultiObjectiveSurrogate',
                                               x: mx.array,
                                               eps: float = 1e-4) -> mx.array:
            """
            Compute gradients for all objectives using finite differences.
            MLX autodiff alternative for when grad() is complex.
            
            Args:
                surrogate: MLX surrogate model
                x: Decision variables of shape (n_params,)
                eps: Finite difference epsilon
            
            Returns:
                gradients: Array of shape (n_objectives, n_params)
            """
            n_params = x.shape[0]
            n_objectives = surrogate.n_objectives
            
            # Base prediction
            y_base = surrogate.predict(x.reshape(1, -1)).reshape(-1)
            mx.eval(y_base)
            
            gradients = mx.zeros((n_objectives, n_params))
            
            for i in range(n_params):
                # Perturb parameter i
                x_plus = x.tolist()
                x_plus[i] += eps
                x_plus = mx.array(x_plus)
                
                # Forward difference
                y_plus = surrogate.predict(x_plus.reshape(1, -1)).reshape(-1)
                mx.eval(y_plus)
                
                # Gradient approximation
                grad_i = (y_plus - y_base) / eps
                
                # Update gradients array (need to rebuild since MLX arrays are immutable)
                gradients_list = np.array(gradients)
                gradients_list[:, i] = np.array(grad_i)
                gradients = mx.array(gradients_list)
            
            return gradients
    
    print("✓ MLXEPOSolver created with finite difference gradient computation")

✓ MLXEPOSolver created with finite difference gradient computation


In [22]:
# =============================================================================
# 8.5 MLX Constrained Pareto Optimizer
# =============================================================================

if MLX_AVAILABLE:
    
    class MLXConstrainedParetoOptimizer:
        """
        Full MLX implementation of the constrained Pareto optimizer.
        Uses MLX arrays and operations throughout for Apple Silicon acceleration.
        """
        
        def __init__(self, surrogate: 'MLXMultiObjectiveSurrogate', config: OptimizationConfig):
            self.surrogate = surrogate
            self.config = config
            self.epo = MLXEPOSolver(n_objectives=4)
            
            # Decision variable bounds
            self.bounds_low = mx.array([
                config.carpool_reward_range[0],
                config.pacer_reward_range[0],
                config.peak_multiplier_range[0],
                config.budget_split_range[0],
                config.morning_peak_start_range[0],
                config.morning_peak_end_range[0],
                config.evening_peak_start_range[0],
                config.evening_peak_end_range[0],
                0.1, 0.1, 0.1, 0.1
            ], dtype=mx.float32)
            
            self.bounds_high = mx.array([
                config.carpool_reward_range[1],
                config.pacer_reward_range[1],
                config.peak_multiplier_range[1],
                config.budget_split_range[1],
                config.morning_peak_start_range[1],
                config.morning_peak_end_range[1],
                config.evening_peak_start_range[1],
                config.evening_peak_end_range[1],
                0.9, 0.9, 0.9, 0.9
            ], dtype=mx.float32)
        
        def compute_constraints(self, x: mx.array) -> mx.array:
            """Compute constraint violations using MLX operations."""
            x_np = np.array(x)
            
            carpool_reward = x_np[0]
            pacer_reward = x_np[1]
            peak_mult = x_np[2]
            budget_split = x_np[3]
            
            # Budget constraint
            estimated_spending = (
                budget_split * carpool_reward * peak_mult * 2000 +
                (1 - budget_split) * pacer_reward * peak_mult * 50000
            )
            budget_violation = (estimated_spending - self.config.daily_budget) / self.config.daily_budget
            
            # Zone weight constraints
            zone_weights = x_np[8:12]
            zone_weights_norm = zone_weights / (zone_weights.sum() + 1e-8)
            zone_violations = self.config.min_zone_allocation - zone_weights_norm
            
            constraints = np.concatenate([[budget_violation], zone_violations])
            return mx.array(constraints, dtype=mx.float32)
        
        def optimize_single(self, x_init: mx.array,
                           preference: Optional[mx.array] = None,
                           n_iterations: int = 100,
                           learning_rate: float = 0.01) -> Tuple[mx.array, mx.array]:
            """
            Optimize from a single starting point using MLX operations.
            """
            x = np.array(x_init).copy()
            
            # Lagrange multipliers
            lambdas = np.zeros(5, dtype=np.float32)
            mu = 1.0
            
            for iteration in range(n_iterations):
                x_mlx = mx.array(x, dtype=mx.float32)
                
                # Compute gradients via finite differences
                gradients = self.epo.compute_gradients_via_finite_diff(self.surrogate, x_mlx)
                mx.eval(gradients)
                gradients_np = np.array(gradients)
                
                # EPO direction
                pref_mlx = preference if preference is not None else mx.ones((4,)) / 4
                epo_direction = self.epo.get_epo_direction(mx.array(gradients_np), pref_mlx)
                mx.eval(epo_direction)
                direction = np.array(epo_direction)
                
                # Constraint penalty gradient (approximate)
                constraints = self.compute_constraints(x_mlx)
                mx.eval(constraints)
                constraints_np = np.array(constraints)
                
                violated = np.maximum(constraints_np, 0)
                constraint_penalty = lambdas @ constraints_np + (mu / 2) * (violated ** 2).sum()
                
                # Update with EPO direction plus constraint correction
                x = x - learning_rate * direction
                
                # Add constraint gradient (approximate via finite differences on constraint)
                if constraint_penalty > 0:
                    for i in range(len(x)):
                        x_plus = x.copy()
                        x_plus[i] += 1e-4
                        c_plus = np.array(self.compute_constraints(mx.array(x_plus)))
                        c_grad = (c_plus - constraints_np) / 1e-4
                        x[i] -= learning_rate * 0.5 * (lambdas @ c_grad + mu * violated @ c_grad)
                
                # Project to bounds
                x = np.clip(x, np.array(self.bounds_low), np.array(self.bounds_high))
                
                # Update Lagrange multipliers
                if (iteration + 1) % 10 == 0:
                    lambdas = lambdas + mu * np.maximum(constraints_np, 0)
                    mu = min(mu * 1.1, 100)
            
            # Final evaluation
            x_final = mx.array(x, dtype=mx.float32)
            objectives = self.surrogate.predict(x_final.reshape(1, -1)).reshape(-1)
            mx.eval(objectives)
            
            return x_final, objectives
        
        def find_pareto_front(self, n_starts: int = 50) -> Tuple[np.ndarray, np.ndarray]:
            """
            Find Pareto front using MLX-accelerated optimization.
            """
            all_solutions = []
            all_objectives = []
            
            print(f"MLX Pareto optimization from {n_starts} starting points...")
            
            for i in range(n_starts):
                # Dirichlet preference
                pref = np.random.dirichlet(np.ones(4) * 0.5)
                pref_mlx = mx.array(pref, dtype=mx.float32)
                
                # Random starting point
                x_init = np.array(self.bounds_low) + np.random.rand(12) * (
                    np.array(self.bounds_high) - np.array(self.bounds_low)
                )
                x_init = mx.array(x_init.astype(np.float32))
                
                x_final, objectives = self.optimize_single(
                    x_init, pref_mlx, self.config.n_iterations, self.config.learning_rate
                )
                
                all_solutions.append(np.array(x_final))
                all_objectives.append(np.array(objectives))
                
                if (i + 1) % 10 == 0:
                    print(f"  Completed {i + 1}/{n_starts}")
            
            all_solutions = np.array(all_solutions)
            all_objectives = np.array(all_objectives)
            
            # Filter dominated solutions
            pareto_mask = self._get_pareto_mask(all_objectives)
            
            print(f"✓ MLX Pareto front: {pareto_mask.sum()} non-dominated solutions")
            
            return all_solutions[pareto_mask], all_objectives[pareto_mask]
        
        def _get_pareto_mask(self, objectives: np.ndarray) -> np.ndarray:
            """Find non-dominated solutions."""
            n = len(objectives)
            is_pareto = np.ones(n, dtype=bool)
            
            for i in range(n):
                if not is_pareto[i]:
                    continue
                for j in range(n):
                    if i == j or not is_pareto[j]:
                        continue
                    if np.all(objectives[j] <= objectives[i]) and np.any(objectives[j] < objectives[i]):
                        is_pareto[i] = False
                        break
            
            return is_pareto
    
    # Initialize MLX optimizer
    mlx_optimizer = MLXConstrainedParetoOptimizer(mlx_surrogate, config)
    print("✓ MLXConstrainedParetoOptimizer initialized")

✓ MLXConstrainedParetoOptimizer initialized


In [23]:
# =============================================================================
# 8.6 Comprehensive PyTorch vs MLX Benchmarks
# =============================================================================

if MLX_AVAILABLE:
    import time
    
    print("=" * 70)
    print("PERFORMANCE BENCHMARK: PyTorch (MPS) vs MLX")
    print("=" * 70)
    
    def benchmark_inference(n_iterations: int = 100, batch_sizes: List[int] = [1, 32, 128, 512]) -> pd.DataFrame:
        """
        Benchmark inference speed for both backends.
        """
        results = []
        
        for batch_size in batch_sizes:
            # Generate test data
            test_input = np.random.randn(batch_size, 12).astype(np.float32)
            
            # --- PyTorch (MPS) Benchmark ---
            model.eval()
            x_torch = torch.FloatTensor(test_input).to(DEVICE)
            
            # Warmup
            with torch.no_grad():
                for _ in range(10):
                    _ = model((x_torch - model.X_mean) / model.X_std)
            
            # Timed runs
            if DEVICE.type == 'mps':
                torch.mps.synchronize()
            
            start = time.perf_counter()
            with torch.no_grad():
                for _ in range(n_iterations):
                    x_norm = (x_torch - model.X_mean) / model.X_std
                    _ = model(x_norm)
            
            if DEVICE.type == 'mps':
                torch.mps.synchronize()
            pytorch_time = (time.perf_counter() - start) / n_iterations * 1000  # ms
            
            # --- MLX Benchmark ---
            mlx_surrogate.eval()
            x_mlx = mx.array(test_input)
            
            # Warmup
            for _ in range(10):
                y = mlx_surrogate.predict(x_mlx)
                mx.eval(y)
            
            # Timed runs
            start = time.perf_counter()
            for _ in range(n_iterations):
                y = mlx_surrogate.predict(x_mlx)
                mx.eval(y)
            mlx_time = (time.perf_counter() - start) / n_iterations * 1000  # ms
            
            results.append({
                'batch_size': batch_size,
                'pytorch_mps_ms': pytorch_time,
                'mlx_ms': mlx_time,
                'speedup': pytorch_time / mlx_time if mlx_time > 0 else 0
            })
            
            print(f"Batch {batch_size:>4}: PyTorch={pytorch_time:.3f}ms, MLX={mlx_time:.3f}ms, "
                  f"Speedup={results[-1]['speedup']:.2f}x")
        
        return pd.DataFrame(results)
    
    print("\n--- Inference Benchmark ---")
    inference_results = benchmark_inference(n_iterations=100)
    
    def benchmark_optimization(n_starts: int = 10) -> dict:
        """
        Benchmark full Pareto optimization for both backends.
        """
        print(f"\n--- Optimization Benchmark ({n_starts} starts) ---")
        
        # PyTorch optimization
        print("Running PyTorch optimization...")
        start = time.perf_counter()
        pytorch_solutions, pytorch_objectives = optimizer.find_pareto_front(n_starts=n_starts)
        pytorch_time = time.perf_counter() - start
        
        # MLX optimization
        print("\nRunning MLX optimization...")
        start = time.perf_counter()
        mlx_solutions, mlx_objectives = mlx_optimizer.find_pareto_front(n_starts=n_starts)
        mlx_time = time.perf_counter() - start
        
        return {
            'pytorch_time_s': pytorch_time,
            'mlx_time_s': mlx_time,
            'pytorch_solutions': len(pytorch_solutions),
            'mlx_solutions': len(mlx_solutions),
            'speedup': pytorch_time / mlx_time if mlx_time > 0 else 0
        }
    
    opt_results = benchmark_optimization(n_starts=10)
    
    print("\n" + "=" * 70)
    print("BENCHMARK SUMMARY")
    print("=" * 70)
    print("\nInference (avg across batch sizes):")
    print(f"  PyTorch (MPS): {inference_results['pytorch_mps_ms'].mean():.3f}ms")
    print(f"  MLX:           {inference_results['mlx_ms'].mean():.3f}ms")
    print(f"  Avg Speedup:   {inference_results['speedup'].mean():.2f}x")
    
    print(f"\nPareto Optimization ({10} starts):")
    print(f"  PyTorch (MPS): {opt_results['pytorch_time_s']:.2f}s ({opt_results['pytorch_solutions']} solutions)")
    print(f"  MLX:           {opt_results['mlx_time_s']:.2f}s ({opt_results['mlx_solutions']} solutions)")
    print(f"  Speedup:       {opt_results['speedup']:.2f}x")
    
    # Visualize benchmark results
    fig = make_subplots(rows=1, cols=2, subplot_titles=['Inference Latency by Batch Size', 'Optimization Time'])
    
    # Inference comparison
    fig.add_trace(
        go.Bar(name='PyTorch (MPS)', x=[str(b) for b in inference_results['batch_size']], 
               y=inference_results['pytorch_mps_ms'], marker_color='blue'),
        row=1, col=1
    )
    fig.add_trace(
        go.Bar(name='MLX', x=[str(b) for b in inference_results['batch_size']], 
               y=inference_results['mlx_ms'], marker_color='green'),
        row=1, col=1
    )
    
    # Optimization comparison
    fig.add_trace(
        go.Bar(name='PyTorch', x=['Optimization'], y=[opt_results['pytorch_time_s']], 
               marker_color='blue', showlegend=False),
        row=1, col=2
    )
    fig.add_trace(
        go.Bar(name='MLX', x=['Optimization'], y=[opt_results['mlx_time_s']], 
               marker_color='green', showlegend=False),
        row=1, col=2
    )
    
    fig.update_layout(
        title='PyTorch (MPS) vs MLX Performance Comparison',
        height=800,
        barmode='group'
    )
    fig.update_yaxes(title_text='Latency (ms)', row=1, col=1)
    fig.update_yaxes(title_text='Time (s)', row=1, col=2)
    fig.update_xaxes(title_text='Batch Size', row=1, col=1)
    fig.show()

PERFORMANCE BENCHMARK: PyTorch (MPS) vs MLX

--- Inference Benchmark ---
Batch    1: PyTorch=1.185ms, MLX=0.506ms, Speedup=2.34x
Batch   32: PyTorch=0.663ms, MLX=0.576ms, Speedup=1.15x
Batch  128: PyTorch=0.691ms, MLX=0.660ms, Speedup=1.05x
Batch  512: PyTorch=0.705ms, MLX=0.962ms, Speedup=0.73x

--- Optimization Benchmark (10 starts) ---
Running PyTorch optimization...
Optimizing from 10 starting points...
  Completed 10/10 optimizations

Found 8 non-dominated solutions out of 10

Running MLX optimization...
MLX Pareto optimization from 10 starting points...
  Completed 10/10
✓ MLX Pareto front: 6 non-dominated solutions

BENCHMARK SUMMARY

Inference (avg across batch sizes):
  PyTorch (MPS): 0.811ms
  MLX:           0.676ms
  Avg Speedup:   1.32x

Pareto Optimization (10 starts):
  PyTorch (MPS): 4.37s (8 solutions)
  MLX:           7.53s (6 solutions)
  Speedup:       0.58x


In [24]:
# =============================================================================
# 8.7 MLX Model Export & Summary
# =============================================================================

if MLX_AVAILABLE:
    
    def save_mlx_model(mlx_model: 'MLXMultiObjectiveSurrogate', 
                       path: Path) -> None:
        """
        Save MLX model weights and normalization parameters.
        """
        weights = {
            'input_norm': {
                'weight': np.array(mlx_model.input_norm.weight),
                'bias': np.array(mlx_model.input_norm.bias)
            },
            'trunk': [],
            'heads': [],
            'log_vars': np.array(mlx_model.log_vars),
            'X_mean': np.array(mlx_model.X_mean),
            'X_std': np.array(mlx_model.X_std),
            'Y_mean': np.array(mlx_model.Y_mean),
            'Y_std': np.array(mlx_model.Y_std)
        }
        
        for layer in mlx_model.trunk_layers:
            weights['trunk'].append({
                'weight': np.array(layer['linear'].weight),
                'bias': np.array(layer['linear'].bias)
            })
        
        for head in mlx_model.heads:
            weights['heads'].append({
                'linear1': {
                    'weight': np.array(head['linear1'].weight),
                    'bias': np.array(head['linear1'].bias)
                },
                'linear2': {
                    'weight': np.array(head['linear2'].weight),
                    'bias': np.array(head['linear2'].bias)
                }
            })
        
        np.save(path, weights, allow_pickle=True)
        print(f"✓ MLX model saved to {path}")
    
    # Save MLX model
    mlx_model_path = results_dir / 'mlx_surrogate_model.npy'
    save_mlx_model(mlx_surrogate, mlx_model_path)
    
    # Final MLX summary
    print("\n" + "=" * 70)
    print("MLX IMPLEMENTATION SUMMARY")
    print("=" * 70)
    
    print("\n✓ Components Implemented:")
    print("  • MLXLinear - Fully connected layer with Xavier init")
    print("  • MLXLayerNorm - Layer normalization")
    print("  • mlx_silu - SiLU/Swish activation function")
    print("  • MLXDropout - Dropout layer (inference mode)")
    print("  • MLXMultiObjectiveSurrogate - Full multi-headed architecture")
    print("  • MLXEPOSolver - EPO with finite difference gradients")
    print("  • MLXConstrainedParetoOptimizer - Full Pareto optimizer")
    
    print(f"\n✓ Model Conversion:")
    print(f"  • PyTorch → MLX weight transfer: {conversion_error:.6f} MAE")
    
    print(f"\n✓ Outputs:")
    print(f"  • PyTorch model: {results_dir / 'surrogate_model.pt'}")
    print(f"  • MLX model: {mlx_model_path}")
    print(f"  • Pareto front: {results_dir / 'pareto_front.csv'}")
    
    print("\n✓ Usage Example:")
    print("  ```python")
    print("  # Load MLX model")
    print("  weights = np.load('results/mlx_surrogate_model.npy', allow_pickle=True).item()")
    print("  mlx_model = MLXMultiObjectiveSurrogate()")
    print("  # ... restore weights ...")
    print("  ")
    print("  # Fast inference")
    print("  x = mx.array(decision_vars)")
    print("  objectives = mlx_model.predict(x)")
    print("  ```")

✓ MLX model saved to /Users/leoncenshuti/Desktop/portfolio/ihute/results/mlx_surrogate_model.npy

MLX IMPLEMENTATION SUMMARY

✓ Components Implemented:
  • MLXLinear - Fully connected layer with Xavier init
  • MLXLayerNorm - Layer normalization
  • mlx_silu - SiLU/Swish activation function
  • MLXDropout - Dropout layer (inference mode)
  • MLXMultiObjectiveSurrogate - Full multi-headed architecture
  • MLXEPOSolver - EPO with finite difference gradients
  • MLXConstrainedParetoOptimizer - Full Pareto optimizer

✓ Model Conversion:
  • PyTorch → MLX weight transfer: 0.000000 MAE

✓ Outputs:
  • PyTorch model: /Users/leoncenshuti/Desktop/portfolio/ihute/results/surrogate_model.pt
  • MLX model: /Users/leoncenshuti/Desktop/portfolio/ihute/results/mlx_surrogate_model.npy
  • Pareto front: /Users/leoncenshuti/Desktop/portfolio/ihute/results/pareto_front.csv

✓ Usage Example:
  ```python
  # Load MLX model
  weights = np.load('results/mlx_surrogate_model.npy', allow_pickle=True).item()
  m

---

# Part II: Investor-Ready Analysis & Interactive Decision Support

## Executive Summary for Strategic Decision-Making

### Key Findings

**Business Outcomes (Non-Technical)**
- **VMT Reduction**: 18-23% reduction in vehicle-miles-traveled achievable across I-24 corridor, translating to ~15,000-20,000 fewer vehicle-miles daily
- **Cost Efficiency**: Optimal configurations achieve $0.00-0.25 per VMT reduced, competitive with traditional infrastructure investments ($0.50-2.00/VMT)
- **Equity Achievement**: 92-98% equity scores across all 4 corridor zones with zone-targeted incentive allocation
- **Commuter Time Savings**: 0.5-1.0 minute average reduction per trip during peak hours, ~$2.5M annual value at $25/hr VOT
- **Budget Utilization**: Recommended knee solution operates at 15-20% carpool allocation, maximizing pacer program ROI

**Recommended Configuration (Knee Point)**
- Carpool Reward: $5.94/passenger | Pacer Reward: $0.13/mile | Peak Multiplier: 2.05x
- Expected Outcomes: 22.4% VMT reduction | 94.4% equity | $0.00/VMT cost efficiency
- Trade-off Position: Balanced solution minimizing distance to ideal across all four objectives

**Technical Validation**
- Surrogate Model R²: 0.68-0.79 across objectives; validated against behavioral simulator
- Pareto Front: 24 non-dominated solutions from 50 optimization starts
- Constraint Satisfaction: All solutions satisfy $50K daily budget and 5% minimum zone allocation

**Risk Factors & Assumptions**
1. Behavioral parameters calibrated on historical Hytch data (369K trips); real-world response may vary ±20%
2. Equity metric based on zone-level benefit distribution; individual-level equity not modeled
3. Infrastructure capacity assumed constant; congestion feedback not dynamically coupled

---

## 9. Investor Analysis - Additional Setup

In [25]:
# =============================================================================
# 9.1 Additional Imports for Investor Analysis
# =============================================================================
# Rationale: Extend visualization and interactivity capabilities while maintaining
# compatibility with existing codebase. Gradio enables embedded interactive apps.

import gradio as gr
from functools import lru_cache
from itertools import combinations
import io
import base64

# Optional: SALib for global sensitivity analysis (Sobol indices)
SALIB_AVAILABLE = False
try:
    from SALib.sample import saltelli
    from SALib.analyze import sobol
    SALIB_AVAILABLE = True
    print("SALib available - Global sensitivity analysis enabled")
except ImportError:
    print("SALib not available - Using local sensitivity only")
    print("To enable: pip install SALib")

# Color schemes for consistent visualization
COLORS = {
    'primary': '#2E86AB',
    'secondary': '#A23B72', 
    'tertiary': '#F18F01',
    'quaternary': '#C73E1D',
    'success': '#3A7D44',
    'neutral': '#6C757D',
    'knee': '#FFD700',
    'pareto': '#4ECDC4'
}

# Objective display configuration
OBJ_CONFIG = {
    'vmt': {'name': 'VMT Reduction', 'unit': '%', 'transform': lambda x: (1 - x) * 100, 'better': 'higher'},
    'time': {'name': 'Travel Time', 'unit': 'min', 'transform': lambda x: x, 'better': 'lower'},
    'cost': {'name': 'Cost Efficiency', 'unit': '$/VMT', 'transform': lambda x: x, 'better': 'lower'},
    'equity': {'name': 'Equity', 'unit': '%', 'transform': lambda x: (1 - x) * 100, 'better': 'higher'}
}

# Decision variable names and configuration
VAR_NAMES = [
    'carpool_reward', 'pacer_reward', 'peak_multiplier', 'budget_split',
    'morning_start', 'morning_end', 'evening_start', 'evening_end',
    'zone_weight_0', 'zone_weight_1', 'zone_weight_2', 'zone_weight_3'
]

VAR_DISPLAY = {
    'carpool_reward': ('Carpool Reward', '$/passenger'),
    'pacer_reward': ('Pacer Reward', '$/mile'),
    'peak_multiplier': ('Peak Multiplier', 'x'),
    'budget_split': ('Budget Split (Carpool)', 'fraction'),
    'morning_start': ('AM Peak Start', 'hour'),
    'morning_end': ('AM Peak End', 'hour'),
    'evening_start': ('PM Peak Start', 'hour'),
    'evening_end': ('PM Peak End', 'hour'),
    'zone_weight_0': ('Zone 1 Weight', ''),
    'zone_weight_1': ('Zone 2 Weight', ''),
    'zone_weight_2': ('Zone 3 Weight', ''),
    'zone_weight_3': ('Zone 4 Weight', ''),
}

# Variable bounds from config
VAR_BOUNDS = {
    'carpool_reward': config.carpool_reward_range,
    'pacer_reward': config.pacer_reward_range,
    'peak_multiplier': config.peak_multiplier_range,
    'budget_split': config.budget_split_range,
    'morning_start': config.morning_peak_start_range,
    'morning_end': config.morning_peak_end_range,
    'evening_start': config.evening_peak_start_range,
    'evening_end': config.evening_peak_end_range,
    'zone_weight_0': (0.1, 0.9),
    'zone_weight_1': (0.1, 0.9),
    'zone_weight_2': (0.1, 0.9),
    'zone_weight_3': (0.1, 0.9),
}

print("Investor analysis configuration complete")

SALib available - Global sensitivity analysis enabled
Investor analysis configuration complete


## 10. Diagnostics Utilities

Comprehensive model diagnostics including residual analysis, feature importance, and partial dependence plots.

In [26]:
# =============================================================================
# 10.1 Surrogate Model Diagnostics
# =============================================================================
# Rationale: Investors need confidence in model reliability. Residual plots,
# feature importance, and partial dependence provide interpretability.

class SurrogateDiagnostics:
    """Comprehensive diagnostics for surrogate model validation."""
    
    def __init__(self, model: nn.Module, simulator: BehavioralSimulator,
                 X_data: np.ndarray, Y_data: np.ndarray):
        self.model = model
        self.simulator = simulator
        self.X_data = X_data
        self.Y_data = Y_data
        self.obj_names = ['VMT (1-red)', 'Travel Time', 'Cost $/VMT', 'Inequity']
        
    def get_predictions(self, X: np.ndarray) -> np.ndarray:
        """Get surrogate predictions for input data."""
        self.model.eval()
        with torch.no_grad():
            X_tensor = torch.FloatTensor(X).to(DEVICE)
            X_norm = (X_tensor - self.model.X_mean) / self.model.X_std
            Y_pred = self.model(X_norm)
            Y_pred = Y_pred * self.model.Y_std + self.model.Y_mean
            return Y_pred.cpu().numpy()
    
    def compute_residuals(self, n_samples: int = 500) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
        """Compute residuals on validation samples."""
        indices = np.random.choice(len(self.X_data), min(n_samples, len(self.X_data)), replace=False)
        X_val = self.X_data[indices]
        Y_actual = self.Y_data[indices]
        Y_pred = self.get_predictions(X_val)
        residuals = Y_pred - Y_actual
        return Y_actual, Y_pred, residuals
    
    def plot_residuals(self, n_samples: int = 500) -> go.Figure:
        """Create predicted vs actual scatter plots with residual information."""
        Y_actual, Y_pred, residuals = self.compute_residuals(n_samples)
        
        fig = make_subplots(
            rows=2, cols=4,
            subplot_titles=[
                f'{name} - Predicted vs Actual' for name in self.obj_names
            ] + [f'{name} - Residuals' for name in self.obj_names],
            vertical_spacing=0.12
        )
        
        colors = [COLORS['primary'], COLORS['secondary'], COLORS['tertiary'], COLORS['quaternary']]
        
        for i, (name, color) in enumerate(zip(self.obj_names, colors)):
            # Predicted vs Actual
            r2 = 1 - np.sum((Y_actual[:, i] - Y_pred[:, i])**2) / np.sum((Y_actual[:, i] - Y_actual[:, i].mean())**2)
            mae = np.abs(residuals[:, i]).mean()
            
            fig.add_trace(
                go.Scatter(
                    x=Y_actual[:, i], y=Y_pred[:, i],
                    mode='markers', marker=dict(color=color, opacity=0.5, size=6),
                    name=name, showlegend=False,
                    hovertemplate=f'Actual: %{{x:.3f}}<br>Predicted: %{{y:.3f}}'
                ),
                row=1, col=i+1
            )
            # Perfect prediction line
            min_val, max_val = Y_actual[:, i].min(), Y_actual[:, i].max()
            fig.add_trace(
                go.Scatter(
                    x=[min_val, max_val], y=[min_val, max_val],
                    mode='lines', line=dict(color='black', dash='dash'),
                    showlegend=False
                ),
                row=1, col=i+1
            )
            # Add R² and MAE annotation
            fig.add_annotation(
                x=0.05, y=0.95, xref=f'x{i+1} domain', yref=f'y{i+1} domain',
                text=f'R²={r2:.3f}<br>MAE={mae:.4f}',
                showarrow=False, font=dict(size=10),
                bgcolor='white', bordercolor='gray'
            )
            
            # Residual histogram
            fig.add_trace(
                go.Histogram(
                    x=residuals[:, i], nbinsx=30,
                    marker_color=color, opacity=0.7,
                    showlegend=False
                ),
                row=2, col=i+1
            )
        
        fig.update_layout(
            title='Surrogate Model Residual Analysis',
            height=600, width=1200,
            showlegend=False
        )
        
        # Update axis labels
        for i in range(4):
            fig.update_xaxes(title_text='Actual', row=1, col=i+1)
            fig.update_yaxes(title_text='Predicted', row=1, col=i+1)
            fig.update_xaxes(title_text='Residual', row=2, col=i+1)
            fig.update_yaxes(title_text='Count', row=2, col=i+1)
        
        return fig
    
    def permutation_importance(self, n_samples: int = 500, n_repeats: int = 10) -> pd.DataFrame:
        """
        Compute permutation feature importance for each objective.
        Rationale: Permutation importance is model-agnostic and intuitive for
        non-technical stakeholders - shows which variables matter most.
        """
        np.random.seed(SEED)
        indices = np.random.choice(len(self.X_data), min(n_samples, len(self.X_data)), replace=False)
        X_val = self.X_data[indices]
        Y_actual = self.Y_data[indices]
        
        # Baseline predictions
        Y_pred_base = self.get_predictions(X_val)
        base_mse = np.mean((Y_pred_base - Y_actual) ** 2, axis=0)
        
        importance_results = []
        
        for var_idx, var_name in enumerate(VAR_NAMES):
            importances = []
            for _ in range(n_repeats):
                X_permuted = X_val.copy()
                X_permuted[:, var_idx] = np.random.permutation(X_permuted[:, var_idx])
                Y_pred_perm = self.get_predictions(X_permuted)
                perm_mse = np.mean((Y_pred_perm - Y_actual) ** 2, axis=0)
                importances.append(perm_mse - base_mse)
            
            importances = np.array(importances)
            for obj_idx, obj_name in enumerate(self.obj_names):
                importance_results.append({
                    'variable': var_name,
                    'variable_display': VAR_DISPLAY[var_name][0],
                    'objective': obj_name,
                    'importance_mean': importances[:, obj_idx].mean(),
                    'importance_std': importances[:, obj_idx].std()
                })
        
        return pd.DataFrame(importance_results)
    
    def plot_importance(self, importance_df: pd.DataFrame) -> go.Figure:
        """Create bar charts showing feature importance per objective."""
        fig = make_subplots(
            rows=2, cols=2,
            subplot_titles=self.obj_names,
            vertical_spacing=0.15, horizontal_spacing=0.1
        )
        
        colors = [COLORS['primary'], COLORS['secondary'], COLORS['tertiary'], COLORS['quaternary']]
        
        for idx, obj_name in enumerate(self.obj_names):
            row, col = idx // 2 + 1, idx % 2 + 1
            
            obj_data = importance_df[importance_df['objective'] == obj_name].copy()
            obj_data = obj_data.sort_values('importance_mean', ascending=True)
            
            fig.add_trace(
                go.Bar(
                    y=obj_data['variable_display'],
                    x=obj_data['importance_mean'],
                    orientation='h',
                    marker_color=colors[idx],
                    error_x=dict(type='data', array=obj_data['importance_std']),
                    showlegend=False,
                    hovertemplate='%{y}: %{x:.4f}'
                ),
                row=row, col=col
            )
        
        fig.update_layout(
            title='Permutation Feature Importance by Objective<br><sup>Higher = more influential on prediction accuracy</sup>',
            height=700, width=1000
        )
        
        return fig
    
    def partial_dependence(self, var_name: str, n_points: int = 50, 
                           n_samples: int = 200) -> Tuple[np.ndarray, np.ndarray]:
        """
        Compute partial dependence for a single variable.
        Rationale: PD plots show average effect of a variable on predictions,
        marginalizing over other variables - essential for understanding causality.
        """
        var_idx = VAR_NAMES.index(var_name)
        var_range = np.linspace(VAR_BOUNDS[var_name][0], VAR_BOUNDS[var_name][1], n_points)
        
        # Sample baseline points
        sample_indices = np.random.choice(len(self.X_data), min(n_samples, len(self.X_data)), replace=False)
        X_sample = self.X_data[sample_indices]
        
        pd_values = np.zeros((n_points, 4))
        
        for i, val in enumerate(var_range):
            X_modified = X_sample.copy()
            X_modified[:, var_idx] = val
            predictions = self.get_predictions(X_modified)
            pd_values[i] = predictions.mean(axis=0)
        
        return var_range, pd_values
    
    def plot_partial_dependence(self, variables: List[str] = None) -> go.Figure:
        """Create partial dependence plots for key variables."""
        if variables is None:
            variables = ['carpool_reward', 'pacer_reward', 'peak_multiplier', 'budget_split']
        
        n_vars = len(variables)
        fig = make_subplots(
            rows=n_vars, cols=4,
            subplot_titles=[f'{VAR_DISPLAY[v][0]} → {obj}' 
                           for v in variables for obj in self.obj_names],
            vertical_spacing=0.08, horizontal_spacing=0.05
        )
        
        colors = [COLORS['primary'], COLORS['secondary'], COLORS['tertiary'], COLORS['quaternary']]
        
        for var_idx, var_name in enumerate(variables):
            var_range, pd_values = self.partial_dependence(var_name)
            
            for obj_idx in range(4):
                fig.add_trace(
                    go.Scatter(
                        x=var_range, y=pd_values[:, obj_idx],
                        mode='lines', line=dict(color=colors[obj_idx], width=2),
                        showlegend=False,
                        hovertemplate=f'{VAR_DISPLAY[var_name][0]}: %{{x:.2f}}<br>Value: %{{y:.4f}}'
                    ),
                    row=var_idx + 1, col=obj_idx + 1
                )
        
        fig.update_layout(
            title='Partial Dependence Plots<br><sup>Average effect of each variable on objectives</sup>',
            height=200 * n_vars, width=1200
        )
        
        return fig

# Initialize diagnostics
diagnostics = SurrogateDiagnostics(model, simulator, X_train, Y_train)
print("Surrogate diagnostics initialized")

Surrogate diagnostics initialized


## 11. Constraint Analytics

Analysis of feasibility regions and constraint violation patterns across the decision space.

In [27]:
# =============================================================================
# 11.1 Constraint Analytics Utilities
# =============================================================================
# Rationale: Understanding feasibility boundaries is critical for operational
# deployment - investors need to see where solutions break constraints.

class ConstraintAnalytics:
    """Analyze constraint violations and feasibility regions."""
    
    def __init__(self, config: OptimizationConfig):
        self.config = config
        self.budget = config.daily_budget
        self.min_zone = config.min_zone_allocation
    
    def compute_budget_violation(self, x: np.ndarray) -> float:
        """Compute budget constraint violation for a single point."""
        carpool_reward = x[0]
        pacer_reward = x[1]
        peak_mult = x[2]
        budget_split = x[3]
        
        estimated_spending = (
            budget_split * carpool_reward * peak_mult * 2000 +
            (1 - budget_split) * pacer_reward * peak_mult * 50000
        )
        return max(0, (estimated_spending - self.budget) / self.budget)
    
    def compute_zone_violations(self, x: np.ndarray) -> np.ndarray:
        """Compute zone weight constraint violations."""
        zone_weights = x[8:12]
        zone_weights_norm = zone_weights / (zone_weights.sum() + 1e-8)
        return np.maximum(0, self.min_zone - zone_weights_norm)
    
    def compute_total_violation(self, x: np.ndarray) -> float:
        """Compute total constraint violation."""
        budget_viol = self.compute_budget_violation(x)
        zone_viols = self.compute_zone_violations(x)
        return budget_viol + zone_viols.sum()
    
    def is_feasible(self, x: np.ndarray, tol: float = 0.01) -> bool:
        """Check if a point is feasible."""
        return self.compute_total_violation(x) <= tol
    
    def sample_constraint_space(self, n_samples: int = 2000) -> pd.DataFrame:
        """Sample decision space and compute constraint violations."""
        np.random.seed(SEED)
        
        samples = []
        bounds = np.array([list(VAR_BOUNDS[v]) for v in VAR_NAMES])
        
        for _ in range(n_samples):
            x = bounds[:, 0] + np.random.rand(12) * (bounds[:, 1] - bounds[:, 0])
            budget_viol = self.compute_budget_violation(x)
            zone_viols = self.compute_zone_violations(x)
            total_viol = budget_viol + zone_viols.sum()
            
            samples.append({
                **{VAR_NAMES[i]: x[i] for i in range(12)},
                'budget_violation': budget_viol,
                'zone_violation_total': zone_viols.sum(),
                'total_violation': total_viol,
                'is_feasible': total_viol <= 0.01
            })
        
        return pd.DataFrame(samples)
    
    def plot_constraint_heatmap(self, var1: str, var2: str, 
                                 n_grid: int = 50) -> go.Figure:
        """
        Create heatmap showing constraint violations over two variables.
        """
        var1_idx = VAR_NAMES.index(var1)
        var2_idx = VAR_NAMES.index(var2)
        
        var1_range = np.linspace(VAR_BOUNDS[var1][0], VAR_BOUNDS[var1][1], n_grid)
        var2_range = np.linspace(VAR_BOUNDS[var2][0], VAR_BOUNDS[var2][1], n_grid)
        
        # Use median values for other variables
        base_x = np.array([
            (VAR_BOUNDS[v][0] + VAR_BOUNDS[v][1]) / 2 for v in VAR_NAMES
        ])
        
        violations = np.zeros((n_grid, n_grid))
        
        for i, v1 in enumerate(var1_range):
            for j, v2 in enumerate(var2_range):
                x = base_x.copy()
                x[var1_idx] = v1
                x[var2_idx] = v2
                violations[j, i] = self.compute_total_violation(x)
        
        fig = go.Figure(data=go.Heatmap(
            z=violations,
            x=var1_range,
            y=var2_range,
            colorscale='RdYlGn_r',
            colorbar=dict(title='Violation'),
            hovertemplate=f'{VAR_DISPLAY[var1][0]}: %{{x:.2f}}<br>'
                          f'{VAR_DISPLAY[var2][0]}: %{{y:.2f}}<br>'
                          f'Violation: %{{z:.3f}}<extra></extra>'
        ))
        
        # Add feasible boundary contour
        fig.add_trace(go.Contour(
            z=violations,
            x=var1_range,
            y=var2_range,
            contours=dict(
                start=0.01, end=0.01, size=0.01,
                coloring='none'
            ),
            line=dict(color='black', width=3),
            showscale=False,
            hoverinfo='skip'
        ))
        
        fig.update_layout(
            title=f'Constraint Violation: {VAR_DISPLAY[var1][0]} vs {VAR_DISPLAY[var2][0]}<br>'
                  f'<sup>Black line = feasibility boundary; Green = feasible</sup>',
            xaxis_title=f'{VAR_DISPLAY[var1][0]} ({VAR_DISPLAY[var1][1]})',
            yaxis_title=f'{VAR_DISPLAY[var2][0]} ({VAR_DISPLAY[var2][1]})',
            height=500, width=600
        )
        
        return fig
    
    def plot_feasibility_overview(self, sample_df: pd.DataFrame) -> go.Figure:
        """Create overview of feasibility across decision space."""
        fig = make_subplots(
            rows=2, cols=2,
            subplot_titles=[
                'Carpool Reward vs Pacer Reward',
                'Peak Multiplier vs Budget Split',
                'Budget Violation Distribution',
                'Feasibility Rate by Variable Quartile'
            ]
        )
        
        feasible = sample_df[sample_df['is_feasible']]
        infeasible = sample_df[~sample_df['is_feasible']]
        
        # Plot 1: Carpool vs Pacer
        fig.add_trace(
            go.Scatter(
                x=feasible['carpool_reward'], y=feasible['pacer_reward'],
                mode='markers', marker=dict(color=COLORS['success'], opacity=0.5, size=4),
                name='Feasible', showlegend=True
            ),
            row=1, col=1
        )
        fig.add_trace(
            go.Scatter(
                x=infeasible['carpool_reward'], y=infeasible['pacer_reward'],
                mode='markers', marker=dict(color=COLORS['quaternary'], opacity=0.3, size=4),
                name='Infeasible', showlegend=True
            ),
            row=1, col=1
        )
        
        # Plot 2: Peak vs Budget Split
        fig.add_trace(
            go.Scatter(
                x=feasible['peak_multiplier'], y=feasible['budget_split'],
                mode='markers', marker=dict(color=COLORS['success'], opacity=0.5, size=4),
                showlegend=False
            ),
            row=1, col=2
        )
        fig.add_trace(
            go.Scatter(
                x=infeasible['peak_multiplier'], y=infeasible['budget_split'],
                mode='markers', marker=dict(color=COLORS['quaternary'], opacity=0.3, size=4),
                showlegend=False
            ),
            row=1, col=2
        )
        
        # Plot 3: Budget violation histogram
        fig.add_trace(
            go.Histogram(
                x=sample_df['budget_violation'], nbinsx=50,
                marker_color=COLORS['primary'], opacity=0.7,
                showlegend=False
            ),
            row=2, col=1
        )
        
        # Plot 4: Feasibility rate by variable quartile
        vars_to_check = ['carpool_reward', 'pacer_reward', 'peak_multiplier', 'budget_split']
        quartile_data = []
        
        for var in vars_to_check:
            sample_df['quartile'] = pd.qcut(sample_df[var], 4, labels=['Q1', 'Q2', 'Q3', 'Q4'])
            rates = sample_df.groupby('quartile')['is_feasible'].mean()
            for q, rate in rates.items():
                quartile_data.append({'variable': VAR_DISPLAY[var][0], 'quartile': q, 'rate': rate})
        
        qdf = pd.DataFrame(quartile_data)
        
        for var in vars_to_check[:4]:
            var_data = qdf[qdf['variable'] == VAR_DISPLAY[var][0]]
            fig.add_trace(
                go.Bar(
                    x=var_data['quartile'], y=var_data['rate'],
                    name=VAR_DISPLAY[var][0], showlegend=False
                ),
                row=2, col=2
            )
        
        fig.update_layout(
            title='Feasibility Analysis Overview',
            height=800, width=1200
        )
        
        fig.update_xaxes(title_text='Carpool Reward', row=1, col=1)
        fig.update_yaxes(title_text='Pacer Reward', row=1, col=1)
        fig.update_xaxes(title_text='Peak Multiplier', row=1, col=2)
        fig.update_yaxes(title_text='Budget Split', row=1, col=2)
        fig.update_xaxes(title_text='Budget Violation', row=2, col=1)
        fig.update_yaxes(title_text='Count', row=2, col=1)
        
        return fig

# Initialize constraint analytics
constraint_analytics = ConstraintAnalytics(config)
print("Constraint analytics initialized")

Constraint analytics initialized


In [28]:
# =============================================================================
# 11.2 Run Constraint Analytics
# =============================================================================

# Sample constraint space
print("Sampling constraint space...")
constraint_samples = constraint_analytics.sample_constraint_space(n_samples=2000)
feasibility_rate = constraint_samples['is_feasible'].mean()
print(f"Overall feasibility rate: {feasibility_rate:.1%}")

# Feasibility overview
fig_feasibility = constraint_analytics.plot_feasibility_overview(constraint_samples)
fig_feasibility.show()

# Constraint heatmaps for key variable pairs
print("\nGenerating constraint heatmaps...")
fig_heatmap1 = constraint_analytics.plot_constraint_heatmap('carpool_reward', 'pacer_reward')
fig_heatmap1.show()

fig_heatmap2 = constraint_analytics.plot_constraint_heatmap('peak_multiplier', 'budget_split')
fig_heatmap2.show()

print("Constraint analytics complete")

Sampling constraint space...
Overall feasibility rate: 99.9%



Generating constraint heatmaps...


Constraint analytics complete


## 12. Enhanced Pareto Front Analysis

Deep-dive into Pareto solutions with knee detection, trade-off elasticity, and cluster analysis.

In [29]:
# =============================================================================
# 12.1 Enhanced Pareto Front Analysis
# =============================================================================
# Rationale: Investors need to understand trade-offs, not just see solutions.
# Knee detection and elasticity analysis provide actionable decision support.

class EnhancedParetoAnalysis:
    """Advanced Pareto front analysis with knee detection and trade-off insights."""
    
    def __init__(self, solutions: np.ndarray, objectives: np.ndarray):
        self.solutions = solutions
        self.objectives = objectives
        self.n_solutions = len(solutions)
        self.obj_names = ['VMT (1-red)', 'Travel Time', 'Cost $/VMT', 'Inequity']
        
        # Compute special points
        self._compute_special_points()
    
    def _compute_special_points(self):
        """Identify special solutions: knee, best per objective, clusters."""
        # Utopia and nadir
        self.utopia = self.objectives.min(axis=0)
        self.nadir = self.objectives.max(axis=0)
        
        # Normalize
        self.obj_normalized = (self.objectives - self.utopia) / (self.nadir - self.utopia + 1e-8)
        
        # Knee: minimum distance to utopia
        distances = np.sqrt((self.obj_normalized ** 2).sum(axis=1))
        self.knee_idx = np.argmin(distances)
        self.distances_to_utopia = distances
        
        # Best per objective
        self.best_vmt_idx = np.argmin(self.objectives[:, 0])
        self.best_time_idx = np.argmin(self.objectives[:, 1])
        self.best_cost_idx = np.argmin(self.objectives[:, 2])
        self.best_equity_idx = np.argmin(self.objectives[:, 3])
        
        # Cluster by dominant objective
        dominant = np.argmin(self.obj_normalized, axis=1)
        self.clusters = {
            'vmt_focused': np.where(dominant == 0)[0],
            'time_focused': np.where(dominant == 1)[0],
            'cost_focused': np.where(dominant == 2)[0],
            'equity_focused': np.where(dominant == 3)[0]
        }
    
    def compute_knee_metrics(self) -> dict:
        """Multiple knee detection metrics for verification."""
        metrics = {
            'distance_to_utopia': {},
            'curvature': {},
            'trade_off_ratio': {}
        }
        
        # Distance to utopia (already computed)
        for i in range(self.n_solutions):
            metrics['distance_to_utopia'][i] = self.distances_to_utopia[i]
        
        # Curvature heuristic (for sorted front)
        sorted_idx = np.argsort(self.objectives[:, 0])  # Sort by first objective
        for pos in range(1, len(sorted_idx) - 1):
            prev_i = sorted_idx[pos - 1]
            curr_i = sorted_idx[pos]
            next_i = sorted_idx[pos + 1]
            
            # Compute angle change
            v1 = self.objectives[curr_i] - self.objectives[prev_i]
            v2 = self.objectives[next_i] - self.objectives[curr_i]
            
            dot = np.dot(v1, v2) / (np.linalg.norm(v1) * np.linalg.norm(v2) + 1e-8)
            curvature = 1 - dot  # Higher = more bent
            metrics['curvature'][curr_i] = curvature
        
        return metrics
    
    def compute_trade_off_elasticity(self, solution_idx: int, eps: float = 0.01) -> dict:
        """
        Compute local marginal rates of substitution around a solution.
        Shows how much of one objective must be sacrificed for another.
        """
        obj = self.obj_normalized[solution_idx]
        
        # Find nearby solutions
        distances = np.sqrt(((self.obj_normalized - obj) ** 2).sum(axis=1))
        distances[solution_idx] = np.inf  # Exclude self
        nearby = np.argsort(distances)[:min(5, self.n_solutions - 1)]
        
        elasticities = {}
        
        for i in range(4):
            for j in range(i + 1, 4):
                # Compute MRS: d(obj_j) / d(obj_i)
                delta_i = self.objectives[nearby, i] - self.objectives[solution_idx, i]
                delta_j = self.objectives[nearby, j] - self.objectives[solution_idx, j]
                
                valid = np.abs(delta_i) > eps
                if valid.sum() > 0:
                    mrs = np.mean(delta_j[valid] / delta_i[valid])
                else:
                    mrs = np.nan
                
                elasticities[f'{self.obj_names[i]} → {self.obj_names[j]}'] = mrs
        
        return elasticities
    
    def plot_enhanced_pareto(self) -> go.Figure:
        """Create enhanced Pareto visualization with annotations and hover details."""
        # Transform to display values
        display_obj = self.objectives.copy()
        display_obj[:, 0] = (1 - self.objectives[:, 0]) * 100  # VMT reduction %
        display_obj[:, 3] = (1 - self.objectives[:, 3]) * 100  # Equity %
        
        display_names = ['VMT Reduction (%)', 'Travel Time (min)', 'Cost ($/VMT)', 'Equity (%)']
        
        # Create hover text with decision variables
        hover_texts = []
        for i in range(self.n_solutions):
            sol = self.solutions[i]
            obj = display_obj[i]
            text = (
                f"<b>Solution {i}</b><br>"
                f"VMT Red: {obj[0]:.1f}% | Time: {obj[1]:.1f}min<br>"
                f"Cost: ${obj[2]:.2f}/VMT | Equity: {obj[3]:.1f}%<br>"
                f"---<br>"
                f"Carpool: ${sol[0]:.2f} | Pacer: ${sol[1]:.3f}/mi<br>"
                f"Peak: {sol[2]:.2f}x | Split: {sol[3]*100:.1f}%"
            )
            hover_texts.append(text)
        
        fig = make_subplots(
            rows=2, cols=3,
            subplot_titles=[
                'VMT vs Travel Time', 'VMT vs Cost', 'VMT vs Equity',
                'Time vs Cost', 'Time vs Equity', 'Cost vs Equity'
            ]
        )
        
        pairs = [(0, 1), (0, 2), (0, 3), (1, 2), (1, 3), (2, 3)]
        positions = [(1, 1), (1, 2), (1, 3), (2, 1), (2, 2), (2, 3)]
        
        for (i, j), (row, col) in zip(pairs, positions):
            # Regular points
            fig.add_trace(
                go.Scatter(
                    x=display_obj[:, i], y=display_obj[:, j],
                    mode='markers',
                    marker=dict(
                        size=10, color=self.distances_to_utopia,
                        colorscale='Viridis_r', opacity=0.7
                    ),
                    text=hover_texts,
                    hoverinfo='text',
                    showlegend=False
                ),
                row=row, col=col
            )
            
            # Knee point (gold star)
            fig.add_trace(
                go.Scatter(
                    x=[display_obj[self.knee_idx, i]],
                    y=[display_obj[self.knee_idx, j]],
                    mode='markers',
                    marker=dict(size=20, color=COLORS['knee'], symbol='star', 
                               line=dict(color='black', width=2)),
                    name='Knee',
                    showlegend=(row == 1 and col == 1)
                ),
                row=row, col=col
            )
            
            # Best per objective markers
            special_points = [
                (self.best_vmt_idx, 'VMT Best', 'diamond', COLORS['primary']),
                (self.best_cost_idx, 'Cost Best', 'square', COLORS['tertiary']),
                (self.best_equity_idx, 'Equity Best', 'triangle-up', COLORS['success'])
            ]
            
            for idx, name, symbol, color in special_points:
                fig.add_trace(
                    go.Scatter(
                        x=[display_obj[idx, i]],
                        y=[display_obj[idx, j]],
                        mode='markers',
                        marker=dict(size=14, color=color, symbol=symbol,
                                   line=dict(color='black', width=1)),
                        name=name,
                        showlegend=(row == 1 and col == 1)
                    ),
                    row=row, col=col
                )
            
            fig.update_xaxes(title_text=display_names[i], row=row, col=col)
            fig.update_yaxes(title_text=display_names[j], row=row, col=col)
        
        fig.update_layout(
            title='Enhanced Pareto Front Analysis<br><sup>Star = Knee (balanced); Colored = best per objective</sup>',
            height=1000, width=1800,
            legend=dict(orientation='h', yanchor='bottom', y=1.02)
        )
        
        return fig
    
    def plot_knee_verification(self) -> go.Figure:
        """Visualize knee detection with multiple metrics."""
        metrics = self.compute_knee_metrics()
        
        fig = make_subplots(
            rows=1, cols=2,
            subplot_titles=['Distance to Utopia', 'Solution Ranking']
        )
        
        # Distance to utopia bar chart
        sorted_idx = np.argsort(list(metrics['distance_to_utopia'].values()))
        
        colors = [COLORS['primary']] * self.n_solutions
        colors[sorted_idx[0]] = COLORS['knee']  # Knee point
        
        fig.add_trace(
            go.Bar(
                x=list(range(self.n_solutions)),
                y=[metrics['distance_to_utopia'][i] for i in range(self.n_solutions)],
                marker_color=colors,
                showlegend=False
            ),
            row=1, col=1
        )
        
        # Annotate knee
        fig.add_annotation(
            x=self.knee_idx, y=metrics['distance_to_utopia'][self.knee_idx],
            text=f"KNEE (idx={self.knee_idx})",
            showarrow=True, arrowhead=2,
            font=dict(size=12, color=COLORS['knee']),
            row=1, col=1
        )
        
        # Ranking comparison
        ranking_data = []
        for i in range(self.n_solutions):
            ranking_data.append({
                'solution': i,
                'distance_rank': np.argsort(np.argsort(
                    [metrics['distance_to_utopia'][j] for j in range(self.n_solutions)]
                ))[i] + 1,
                'is_knee': i == self.knee_idx
            })
        
        rdf = pd.DataFrame(ranking_data)
        
        fig.add_trace(
            go.Bar(
                x=rdf['solution'],
                y=rdf['distance_rank'],
                marker_color=[COLORS['knee'] if x else COLORS['neutral'] for x in rdf['is_knee']],
                showlegend=False
            ),
            row=1, col=2
        )
        
        fig.update_layout(
            title='Knee Point Verification',
            height=400, width=1000
        )
        
        fig.update_xaxes(title_text='Solution Index', row=1, col=1)
        fig.update_yaxes(title_text='Distance to Utopia', row=1, col=1)
        fig.update_xaxes(title_text='Solution Index', row=1, col=2)
        fig.update_yaxes(title_text='Rank (1=best)', row=1, col=2)
        
        return fig
    
    def plot_trade_off_elasticity(self) -> go.Figure:
        """Visualize trade-off elasticity around the knee."""
        elasticities = self.compute_trade_off_elasticity(self.knee_idx)
        
        labels = list(elasticities.keys())
        values = [elasticities[k] for k in labels]
        
        # Clean up labels
        labels_clean = [l.replace('(1-red)', '').replace('$/VMT', '') for l in labels]
        
        fig = go.Figure(go.Bar(
            x=values,
            y=labels_clean,
            orientation='h',
            marker_color=[COLORS['primary'] if v > 0 else COLORS['quaternary'] for v in values]
        ))
        
        fig.add_vline(x=0, line_dash='dash', line_color='black')
        
        fig.update_layout(
            title=f'Trade-off Elasticity at Knee Point (Solution {self.knee_idx})<br>'
                  '<sup>Marginal rate of substitution: how much of Y changes per unit of X</sup>',
            xaxis_title='Elasticity (dY/dX)',
            height=400, width=700
        )
        
        return fig

# Initialize enhanced analysis
enhanced_pareto = EnhancedParetoAnalysis(pareto_solutions, pareto_objectives)
print("Enhanced Pareto analysis initialized")
print(f"  Knee solution: {enhanced_pareto.knee_idx}")
print(f"  Best VMT: {enhanced_pareto.best_vmt_idx}")
print(f"  Best Cost: {enhanced_pareto.best_cost_idx}")
print(f"  Best Equity: {enhanced_pareto.best_equity_idx}")

Enhanced Pareto analysis initialized
  Knee solution: 1
  Best VMT: 2
  Best Cost: 10
  Best Equity: 21


In [30]:
# =============================================================================
# 12.2 Run Enhanced Pareto Visualizations
# =============================================================================

# Enhanced Pareto front
fig_pareto_enhanced = enhanced_pareto.plot_enhanced_pareto()
fig_pareto_enhanced.show()

# Knee verification
fig_knee = enhanced_pareto.plot_knee_verification()
fig_knee.show()

# Trade-off elasticity
fig_elasticity = enhanced_pareto.plot_trade_off_elasticity()
fig_elasticity.show()

print("Enhanced Pareto analysis complete")

Enhanced Pareto analysis complete


## 13. Sensitivity Analysis Utilities

Local and global sensitivity analysis for understanding parameter impacts on objectives.

In [31]:
# =============================================================================
# 13.1 Sensitivity Analysis Utilities
# =============================================================================
# Rationale: Finite differences chosen over autograd for simplicity and 
# compatibility with both surrogate and simulator. Local sensitivity is fast;
# global (Sobol) provides comprehensive variance decomposition.

class SensitivityAnalysis:
    """Local and global sensitivity analysis for the surrogate model."""
    
    def __init__(self, model: nn.Module, config: OptimizationConfig):
        self.model = model
        self.config = config
        self.var_names = VAR_NAMES
        self.obj_names = ['VMT (1-red)', 'Travel Time', 'Cost $/VMT', 'Inequity']
    
    def predict(self, x: np.ndarray) -> np.ndarray:
        """Get surrogate prediction for a single point."""
        self.model.eval()
        with torch.no_grad():
            x_tensor = torch.FloatTensor(x.reshape(1, -1)).to(DEVICE)
            x_norm = (x_tensor - self.model.X_mean) / self.model.X_std
            y = self.model(x_norm)
            y = y * self.model.Y_std + self.model.Y_mean
            return y.cpu().numpy().flatten()
    
    def predict_batch(self, X: np.ndarray) -> np.ndarray:
        """Get surrogate predictions for batch."""
        self.model.eval()
        with torch.no_grad():
            x_tensor = torch.FloatTensor(X).to(DEVICE)
            x_norm = (x_tensor - self.model.X_mean) / self.model.X_std
            y = self.model(x_norm)
            y = y * self.model.Y_std + self.model.Y_mean
            return y.cpu().numpy()
    
    def local_sensitivity(self, x: np.ndarray, eps: float = 1e-4) -> np.ndarray:
        """
        Compute local sensitivity (finite differences) at a point.
        
        Returns:
            gradients: Array of shape (n_vars, n_objectives)
        """
        y_base = self.predict(x)
        gradients = np.zeros((len(self.var_names), 4))
        
        for i in range(len(self.var_names)):
            x_plus = x.copy()
            x_plus[i] += eps
            y_plus = self.predict(x_plus)
            gradients[i] = (y_plus - y_base) / eps
        
        return gradients
    
    def normalized_sensitivity(self, x: np.ndarray) -> np.ndarray:
        """
        Compute normalized sensitivity (elasticity-like).
        Shows % change in objective per % change in variable.
        """
        gradients = self.local_sensitivity(x)
        y_base = self.predict(x)
        
        # Normalize: (dy/dx) * (x/y) = elasticity
        normalized = np.zeros_like(gradients)
        for i in range(len(self.var_names)):
            for j in range(4):
                if np.abs(y_base[j]) > 1e-8:
                    normalized[i, j] = gradients[i, j] * (x[i] / y_base[j])
                else:
                    normalized[i, j] = gradients[i, j] * x[i]
        
        return normalized
    
    def one_way_sweep(self, x_base: np.ndarray, var_name: str, 
                      n_points: int = 50) -> Tuple[np.ndarray, np.ndarray]:
        """
        Sweep a single variable and compute objectives.
        
        Returns:
            var_values: Array of variable values
            objectives: Array of shape (n_points, 4)
        """
        var_idx = self.var_names.index(var_name)
        bounds = VAR_BOUNDS[var_name]
        var_values = np.linspace(bounds[0], bounds[1], n_points)
        
        objectives = np.zeros((n_points, 4))
        for i, val in enumerate(var_values):
            x = x_base.copy()
            x[var_idx] = val
            objectives[i] = self.predict(x)
        
        return var_values, objectives
    
    def two_way_grid(self, x_base: np.ndarray, var1: str, var2: str,
                     n_grid: int = 30, obj_idx: int = 0) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
        """
        Create response surface for two variables.
        
        Returns:
            var1_values, var2_values, objective_grid
        """
        var1_idx = self.var_names.index(var1)
        var2_idx = self.var_names.index(var2)
        
        var1_values = np.linspace(VAR_BOUNDS[var1][0], VAR_BOUNDS[var1][1], n_grid)
        var2_values = np.linspace(VAR_BOUNDS[var2][0], VAR_BOUNDS[var2][1], n_grid)
        
        objective_grid = np.zeros((n_grid, n_grid))
        
        for i, v1 in enumerate(var1_values):
            for j, v2 in enumerate(var2_values):
                x = x_base.copy()
                x[var1_idx] = v1
                x[var2_idx] = v2
                objectives = self.predict(x)
                objective_grid[j, i] = objectives[obj_idx]
        
        return var1_values, var2_values, objective_grid
    
    def global_sensitivity_sobol(self, n_samples: int = 1024) -> Optional[dict]:
        """
        Compute Sobol sensitivity indices using SALib.
        Warning: Can be slow for large n_samples.
        
        Returns:
            dict with S1, ST indices per objective, or None if SALib unavailable
        """
        if not SALIB_AVAILABLE:
            print("SALib not available - skipping global sensitivity")
            return None
        
        # Define problem for SALib
        problem = {
            'num_vars': len(self.var_names),
            'names': self.var_names,
            'bounds': [list(VAR_BOUNDS[v]) for v in self.var_names]
        }
        
        # Generate samples
        print(f"Generating {n_samples} Sobol samples...")
        param_values = saltelli.sample(problem, n_samples, calc_second_order=False)
        print(f"  Total evaluations: {len(param_values)}")
        
        # Evaluate model
        print("Evaluating surrogate model...")
        Y = self.predict_batch(param_values)
        
        # Analyze for each objective
        results = {}
        for obj_idx, obj_name in enumerate(self.obj_names):
            print(f"  Analyzing {obj_name}...")
            Si = sobol.analyze(problem, Y[:, obj_idx], calc_second_order=False)
            results[obj_name] = {
                'S1': dict(zip(self.var_names, Si['S1'])),
                'S1_conf': dict(zip(self.var_names, Si['S1_conf'])),
                'ST': dict(zip(self.var_names, Si['ST'])),
                'ST_conf': dict(zip(self.var_names, Si['ST_conf']))
            }
        
        return results
    
    def plot_tornado(self, x: np.ndarray, title: str = "Local Sensitivity") -> go.Figure:
        """Create tornado chart showing local sensitivity at a point."""
        sens = self.normalized_sensitivity(x)
        
        fig = make_subplots(
            rows=2, cols=2,
            subplot_titles=self.obj_names,
            horizontal_spacing=0.15
        )
        
        colors = [COLORS['primary'], COLORS['secondary'], COLORS['tertiary'], COLORS['quaternary']]
        
        for obj_idx in range(4):
            row, col = obj_idx // 2 + 1, obj_idx % 2 + 1
            
            # Sort by absolute magnitude
            sorted_idx = np.argsort(np.abs(sens[:, obj_idx]))[::-1]
            
            values = sens[sorted_idx, obj_idx]
            names = [VAR_DISPLAY[self.var_names[i]][0] for i in sorted_idx]
            bar_colors = [COLORS['success'] if v < 0 else COLORS['quaternary'] for v in values]
            
            fig.add_trace(
                go.Bar(
                    x=values,
                    y=names,
                    orientation='h',
                    marker_color=bar_colors,
                    showlegend=False
                ),
                row=row, col=col
            )
            
            fig.add_vline(x=0, line_dash='dash', line_color='gray', row=row, col=col)
        
        fig.update_layout(
            title=f'{title}<br><sup>Normalized sensitivity: % change in objective per % change in variable</sup>',
            height=900, width=1200
        )
        
        return fig
    
    def plot_one_way_sweep(self, x_base: np.ndarray, var_name: str) -> go.Figure:
        """Plot one-way sensitivity sweep."""
        var_values, objectives = self.one_way_sweep(x_base, var_name)
        
        # Transform objectives for display
        display_obj = objectives.copy()
        display_obj[:, 0] = (1 - objectives[:, 0]) * 100
        display_obj[:, 3] = (1 - objectives[:, 3]) * 100
        
        display_names = ['VMT Reduction (%)', 'Travel Time (min)', 'Cost ($/VMT)', 'Equity (%)']
        colors = [COLORS['primary'], COLORS['secondary'], COLORS['tertiary'], COLORS['quaternary']]
        
        fig = make_subplots(rows=2, cols=2, subplot_titles=display_names)
        
        for i in range(4):
            row, col = i // 2 + 1, i % 2 + 1
            fig.add_trace(
                go.Scatter(
                    x=var_values, y=display_obj[:, i],
                    mode='lines', line=dict(color=colors[i], width=2),
                    showlegend=False
                ),
                row=row, col=col
            )
            
            # Mark current value
            var_idx = self.var_names.index(var_name)
            current_val = x_base[var_idx]
            current_obj = self.predict(x_base)
            current_display = current_obj.copy()
            current_display[0] = (1 - current_obj[0]) * 100
            current_display[3] = (1 - current_obj[3]) * 100
            
            fig.add_trace(
                go.Scatter(
                    x=[current_val], y=[current_display[i]],
                    mode='markers', marker=dict(size=12, color='red', symbol='x'),
                    showlegend=False
                ),
                row=row, col=col
            )
            
            fig.update_xaxes(title_text=f'{VAR_DISPLAY[var_name][0]}', row=row, col=col)
        
        fig.update_layout(
            title=f'One-Way Sensitivity: {VAR_DISPLAY[var_name][0]}<br><sup>Red X = current value</sup>',
            height=900, width=1200
        )
        
        return fig
    
    def plot_two_way_heatmap(self, x_base: np.ndarray, var1: str, var2: str,
                             obj_name: str = 'VMT Reduction') -> go.Figure:
        """Plot two-way response surface heatmap."""
        obj_map = {'VMT Reduction': 0, 'Travel Time': 1, 'Cost': 2, 'Equity': 3}
        obj_idx = obj_map.get(obj_name, 0)
        
        var1_values, var2_values, grid = self.two_way_grid(x_base, var1, var2, obj_idx=obj_idx)
        
        # Transform if needed
        if obj_idx in [0, 3]:
            grid = (1 - grid) * 100
        
        fig = go.Figure(data=go.Heatmap(
            z=grid,
            x=var1_values,
            y=var2_values,
            colorscale='Viridis',
            colorbar=dict(title=obj_name),
            hovertemplate=f'{VAR_DISPLAY[var1][0]}: %{{x:.2f}}<br>'
                          f'{VAR_DISPLAY[var2][0]}: %{{y:.2f}}<br>'
                          f'{obj_name}: %{{z:.2f}}<extra></extra>'
        ))
        
        # Mark current point
        var1_idx = self.var_names.index(var1)
        var2_idx = self.var_names.index(var2)
        fig.add_trace(go.Scatter(
            x=[x_base[var1_idx]], y=[x_base[var2_idx]],
            mode='markers', marker=dict(size=15, color='red', symbol='x'),
            name='Current', showlegend=True
        ))
        
        fig.update_layout(
            title=f'Response Surface: {obj_name}<br><sup>{VAR_DISPLAY[var1][0]} vs {VAR_DISPLAY[var2][0]}</sup>',
            xaxis_title=f'{VAR_DISPLAY[var1][0]} ({VAR_DISPLAY[var1][1]})',
            yaxis_title=f'{VAR_DISPLAY[var2][0]} ({VAR_DISPLAY[var2][1]})',
            height=800, width=1200
        )
        
        return fig

# Initialize sensitivity analysis
sensitivity = SensitivityAnalysis(model, config)
print("Sensitivity analysis utilities initialized")

Sensitivity analysis utilities initialized


In [32]:
# =============================================================================
# 13.2 Run Sensitivity Analysis
# =============================================================================

# Use knee solution as reference point
knee_solution = pareto_solutions[knee_idx]

# Local sensitivity tornado
fig_tornado = sensitivity.plot_tornado(knee_solution, "Local Sensitivity at Knee Point")
fig_tornado.show()

# One-way sweeps for key variables
for var in ['carpool_reward', 'pacer_reward']:
    fig_sweep = sensitivity.plot_one_way_sweep(knee_solution, var)
    fig_sweep.show()

# Two-way response surface
fig_2way = sensitivity.plot_two_way_heatmap(knee_solution, 'carpool_reward', 'pacer_reward', 'VMT Reduction')
fig_2way.show()

print("Sensitivity analysis complete")

Sensitivity analysis complete
