In [None]:
"""
Bayesian Optimization Pipeline - Phase 2 (Enhanced)
====================================================
Iterative Bayesian Optimization for Chemical Experiments
With Range Optimization and Discrete Feature Support
"""

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import warnings
import json
import pickle
from datetime import datetime
from pathlib import Path
from dataclasses import dataclass, field
from typing import List, Dict, Optional, Tuple, Any, Union
from itertools import product

# NOTE(review): this silences every warning category for the whole kernel session,
# not only noisy third-party ones — presumably meant to hide optimizer/convergence
# chatter; confirm that hiding all warnings is intended.
warnings.filterwarnings('ignore')

from sklearn.preprocessing import StandardScaler
from sklearn.gaussian_process import GaussianProcessRegressor
from sklearn.gaussian_process.kernels import ConstantKernel, Matern, WhiteKernel
from sklearn.metrics import r2_score, mean_squared_error
from scipy.stats import norm
from scipy.optimize import minimize

# Single seed for the notebook: it seeds NumPy's global RNG here and is also passed
# as `random_state` to the Gaussian process regressor in SurrogateModel.fit.
RANDOM_STATE = 42
np.random.seed(RANDOM_STATE)

In [None]:
# ─────────────────────────────────────────────────────────────────────────────
# Configuration
# ─────────────────────────────────────────────────────────────────────────────

@dataclass
class Phase2Config:
    """Tunable settings for a Phase 2 Bayesian-optimization run."""

    # --- Data source -------------------------------------------------------
    response_column: str
    sheet_name: str = 'data'
    header_row: int = 5
    split_keyword: Optional[str] = "PREDICTED OPTIMUM RUNS"
    stop_feature: Optional[str] = "Batch ID"

    # --- Objective ---------------------------------------------------------
    # maximize_response picks maximization (True) or minimization (False).
    # A (lower, upper) target_range switches to range optimization and takes
    # precedence over maximize_response.
    maximize_response: bool = True
    target_range: Optional[Tuple[float, float]] = None

    # --- Discrete features -------------------------------------------------
    # Maps a feature name to the list of values it may take, e.g.
    # {'Temperature': [100, 150, 200], 'Catalyst': ['A', 'B', 'C']}
    discrete_features: Dict[str, List[Any]] = field(default_factory=dict)

    # --- Suggestions -------------------------------------------------------
    n_suggestions: int = 5
    selection_strategy: str = 'diverse'  # 'diverse' hedges bets; 'greedy' exploits best region
    min_distance: float = 0.1  # Normalized distance; prevents clustered suggestions

    # --- Acquisition function ----------------------------------------------
    exploration_weight: float = 0.01  # xi parameter; small value favors exploitation

    # --- Search space ------------------------------------------------------
    # Fraction of the observed span added on each side of a continuous
    # feature, allowing mild extrapolation beyond the data.
    bounds_margin: float = 0.1

    # --- Acquisition optimizer ---------------------------------------------
    n_optimizer_restarts: int = 25

    # --- Stopping rule -----------------------------------------------------
    patience: int = 3
    min_improvement: float = 0.01  # 1% relative improvement threshold

    # --- Duplicate prevention ----------------------------------------------
    duplicate_threshold: float = 0.05

    # --- Output ------------------------------------------------------------
    output_dir: str = 'bo_phase2_output'
    fresh_start: bool = False

    def __post_init__(self):
        """Reject a malformed ``target_range``; nothing is checked when it is None."""
        requested = self.target_range
        if requested is None:
            return
        if len(requested) != 2:
            raise ValueError("target_range must be a tuple of (lower, upper)")
        if requested[0] >= requested[1]:
            raise ValueError("target_range lower bound must be less than upper bound")

In [None]:
# ─────────────────────────────────────────────────────────────────────────────
# DataLoader
# ─────────────────────────────────────────────────────────────────────────────

class DataLoader:
    """Handles data loading and feature classification."""

    def load_excel(self, file_path: str, sheet_name: str, header_row: int) -> pd.DataFrame:
        """
        Read one worksheet of an Excel workbook into a DataFrame.

        Args:
            file_path: Path of the workbook (opened with the openpyxl engine).
            sheet_name: Worksheet to read.
            header_row: Row index passed to pandas as ``header`` (column-name row).

        Returns:
            The parsed worksheet.
        """
        # The context manager releases the workbook handle even if parsing fails.
        with pd.ExcelFile(file_path, engine='openpyxl') as xls:
            return pd.read_excel(xls, sheet_name=sheet_name, header=header_row)

    def split_at_keyword(self, df: pd.DataFrame, keyword: str,
                         column: str = 'Run') -> Tuple[pd.DataFrame, pd.DataFrame]:
        """
        Split ``df`` at the first row whose ``column`` value equals ``keyword``.

        Args:
            df: Frame to split.
            keyword: Marker value separating the two sections.
            column: Column searched for the marker.

        Returns:
            ``(rows_before_marker, rows_after_marker)``; the marker row itself is
            dropped. When the marker is absent, ``(copy of df, empty DataFrame)``.

        Raises:
            KeyError: If ``column`` is not a column of ``df``.
        """
        # Work with row *positions*: index labels only coincide with positions on a
        # default RangeIndex, and iloc below is strictly positional.
        marker_positions = np.flatnonzero((df[column] == keyword).to_numpy())
        if marker_positions.size:
            pos = int(marker_positions[0])
            return df.iloc[:pos], df.iloc[pos + 1:]
        return df.copy(), pd.DataFrame()

    def clean_data(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        Drop the row labelled 0 (if present) and all-NaN rows, then renumber rows.

        NOTE(review): row 0 is presumably a units/description row under the header —
        confirm against the workbook layout.
        """
        df = df.drop(index=0, errors='ignore')
        df = df.dropna(how='all')
        return df.reset_index(drop=True)

    def classify_features(self, df: pd.DataFrame, feature_cols: List[str],
                         discrete_config: Dict[str, List[Any]]
                         ) -> Tuple[List[str], List[str], Dict[str, List[Any]]]:
        """
        Classify features as discrete or continuous.

        A feature is discrete exactly when it appears in ``discrete_config``;
        every other numeric feature is continuous. Non-numeric features that are
        not listed in ``discrete_config`` are ignored.

        Side effect: for each non-numeric discrete feature a ``<name>_encoded``
        column is added to ``df`` in place, holding the position of each value in
        its allowed list (values outside the list become NaN).

        Args:
            df: DataFrame with features (mutated as described above).
            feature_cols: List of feature column names.
            discrete_config: Dict mapping feature names to allowed values.

        Returns:
            discrete_cols: Discrete feature names (numeric ones first).
            continuous_cols: Continuous feature names.
            discrete_values: Dict mapping discrete features to their allowed values.
        """
        numeric_features = df[feature_cols].select_dtypes(include=[np.number]).columns.tolist()

        discrete_cols = []
        continuous_cols = []
        discrete_values = {}

        for col in numeric_features:
            if col in discrete_config:
                # User specified this as discrete
                discrete_cols.append(col)
                discrete_values[col] = discrete_config[col]

                # Observed values outside the allowed list are reported, not rejected
                observed_values = set(df[col].dropna().unique())
                allowed_values = set(discrete_config[col])
                unexpected = observed_values - allowed_values
                if unexpected:
                    print(f"  Warning: Feature '{col}' has observed values {unexpected} "
                          f"not in allowed list {discrete_config[col]}")
            else:
                continuous_cols.append(col)

        # Handle non-numeric discrete features (e.g., categorical strings)
        for col in feature_cols:
            if col in discrete_config and col not in numeric_features:
                discrete_cols.append(col)
                discrete_values[col] = discrete_config[col]
                # Ordinal code = position of the value in the user-supplied list
                df[col + '_encoded'] = df[col].map(
                    {v: i for i, v in enumerate(discrete_config[col])}
                )

        return discrete_cols, continuous_cols, discrete_values

    def get_feature_columns(self, df: pd.DataFrame, stop_feature: Optional[str],
                           response_column: str) -> List[str]:
        """
        Select the columns that should be treated as features.

        When ``stop_feature`` is a column of ``df``, the features are all columns
        to its left; otherwise they are all columns except ``response_column``.
        Index-like columns ('run', 'index', 'unnamed: 0', case-insensitive) are
        removed in both cases.
        """
        columns = df.columns.tolist()

        if stop_feature and stop_feature in columns:
            feature_list = columns[:columns.index(stop_feature)]
        else:
            feature_list = [c for c in columns if c != response_column]

        # str() keeps non-string headers (e.g. integer column labels) from crashing .lower()
        index_like = ('run', 'index', 'unnamed: 0')
        return [f for f in feature_list if str(f).lower() not in index_like]

In [None]:
# ─────────────────────────────────────────────────────────────────────────────
# SearchSpace
# ─────────────────────────────────────────────────────────────────────────────

class SearchSpace:
    """
    Describes where the optimizer may look: padded ranges for continuous
    features and the full cross-product of allowed levels for discrete ones.

    Discrete features are enumerated rather than relaxed because:
    - GPs struggle with discontinuous functions
    - Categorical boundaries are sharp in chemistry
    - Enumeration is tractable for typical discrete feature combinations
    """

    def __init__(self, X: pd.DataFrame, discrete_features: List[str],
                 discrete_values: Dict[str, List[Any]],
                 continuous_features: List[str], bounds_margin: float = 0.1):
        """
        Args:
            X: Observed feature values; only the continuous columns are read.
            discrete_features: Names of the discrete features.
            discrete_values: Allowed levels for each discrete feature.
            continuous_features: Names of the continuous features.
            bounds_margin: Fraction of the observed span added below the minimum
                and above the maximum of every continuous feature.
        """
        self.discrete_features = discrete_features
        self.discrete_values = discrete_values
        self.continuous_features = continuous_features

        self._bounds = {}
        self._discrete_combinations = []

        # Continuous features: observed span widened by the margin on both sides
        for name in continuous_features:
            lowest = X[name].min()
            highest = X[name].max()
            pad = (highest - lowest) * bounds_margin
            self._bounds[name] = {
                'min': lowest - pad,
                'max': highest + pad,
                'observed_min': lowest,
                'observed_max': highest,
                'type': 'continuous'
            }

        # Discrete features: keep the level list for reference
        for name in discrete_features:
            levels = discrete_values[name]
            self._bounds[name] = {
                'allowed_values': levels,
                'n_levels': len(levels),
                'type': 'discrete'
            }

        # Cross-product of all levels; a single empty assignment when nothing is discrete
        if not discrete_features:
            self._discrete_combinations = [{}]
        else:
            level_lists = [discrete_values[name] for name in discrete_features]
            self._discrete_combinations = [
                dict(zip(discrete_features, levels)) for levels in product(*level_lists)
            ]

    def get_continuous_bounds(self) -> List[Tuple[float, float]]:
        """Return ``(min, max)`` for each continuous feature, in declaration order."""
        pairs = []
        for name in self.continuous_features:
            entry = self._bounds[name]
            pairs.append((entry['min'], entry['max']))
        return pairs

    def get_bounds_df(self) -> pd.DataFrame:
        """Tabulate the search space, one row per feature."""
        continuous_keys = ('min', 'max', 'observed_min', 'observed_max')
        records = []
        for name, entry in self._bounds.items():
            if entry['type'] != 'continuous':
                records.append({
                    'feature': name,
                    'type': 'discrete',
                    'allowed_values': str(entry['allowed_values']),
                    'n_levels': entry['n_levels']
                })
                continue
            record = {'feature': name, 'type': 'continuous'}
            for key in continuous_keys:
                record[key] = entry[key]
            records.append(record)
        return pd.DataFrame(records)

    @property
    def bounds(self) -> Dict:
        """Per-feature bounds dictionary (the internal object, not a copy)."""
        return self._bounds

    @property
    def discrete_combinations(self) -> List[Dict]:
        """Every assignment of allowed levels to the discrete features."""
        return self._discrete_combinations

    @property
    def n_discrete_combinations(self) -> int:
        """Number of enumerated discrete assignments."""
        return len(self._discrete_combinations)

In [None]:
# ─────────────────────────────────────────────────────────────────────────────
# SurrogateModel
# ─────────────────────────────────────────────────────────────────────────────

class SurrogateModel:
    """
    Gaussian-process regressor wrapped with input/output standardization.

    Design choices:
    - Matern 5/2 kernel: twice differentiable, handles non-smooth responses
      better than RBF while remaining smooth enough for optimization
    - StandardScaler on X and y: improves numerical stability and
      makes length scale interpretation consistent across features
    - WhiteKernel: explicitly models observation noise
    """

    def __init__(self):
        """Create unfitted scalers; the GP itself is built in ``fit``."""
        self._is_fitted = False
        self.model = None
        self.scaler_X = StandardScaler()
        self.scaler_y = StandardScaler()

    def fit(self, X: np.ndarray, y: np.ndarray):
        """
        Standardize ``X`` and ``y`` and fit a fresh GP on the scaled data.

        Args:
            X: 2-D feature array (``X.shape[1]`` sets the number of length scales).
            y: 1-D response array (reshaped to a column for scaling).

        Returns:
            ``self``, to allow chaining.
        """
        X_scaled = self.scaler_X.fit_transform(X)
        y_column = y.reshape(-1, 1)
        y_scaled = self.scaler_y.fit_transform(y_column).ravel()

        # One length scale per input dimension (anisotropic Matern 5/2)
        n_dims = X.shape[1]
        signal = ConstantKernel(1.0, (1e-3, 1e3)) * Matern(
            length_scale=np.ones(n_dims),
            length_scale_bounds=(1e-3, 1e3),
            nu=2.5,
        )
        noise = WhiteKernel(noise_level=0.1, noise_level_bounds=(1e-5, 1e1))

        self.model = GaussianProcessRegressor(
            kernel=signal + noise,
            n_restarts_optimizer=10,
            normalize_y=False,  # y is already standardized by scaler_y
            random_state=RANDOM_STATE,
        )
        self.model.fit(X_scaled, y_scaled)
        self._is_fitted = True
        return self

    def predict(self, X: np.ndarray, return_std: bool = True):
        """
        Predict in the original response units.

        Returns:
            ``(mean, std)`` when ``return_std`` is True, otherwise only ``mean``.
        """
        X_scaled = self.scaler_X.transform(X)

        if not return_std:
            mean_scaled = self.model.predict(X_scaled)
            return self.scaler_y.inverse_transform(mean_scaled.reshape(-1, 1)).ravel()

        mean_scaled, std_scaled = self.model.predict(X_scaled, return_std=True)
        mean = self.scaler_y.inverse_transform(mean_scaled.reshape(-1, 1)).ravel()
        # A standard deviation only needs the scale factor, not the mean shift
        std = std_scaled * self.scaler_y.scale_[0]
        return mean, std

    def score(self, X: np.ndarray, y: np.ndarray) -> Dict[str, float]:
        """Return R^2 and RMSE of the mean prediction against ``y``."""
        predicted = self.predict(X, return_std=False)
        rmse = np.sqrt(mean_squared_error(y, predicted))
        return {
            'r2': r2_score(y, predicted),
            'rmse': rmse
        }

    @property
    def is_fitted(self) -> bool:
        """True once ``fit`` has completed."""
        return self._is_fitted

In [None]:
# ─────────────────────────────────────────────────────────────────────────────
# AcquisitionOptimizer
# ─────────────────────────────────────────────────────────────────────────────

class AcquisitionOptimizer:
    """
    Optimizes acquisition function for experiment suggestions.
    
    NEW: Supports three optimization modes:
    1. Maximization (maximize_response=True, target_range=None)
    2. Minimization (maximize_response=False, target_range=None)
    3. Range optimization (target_range=(lower, upper))
    
    For range optimization, uses Expected Improvement for Target (EIT)
    which rewards predictions that fall within the target range.
    """
    
    def __init__(self, surrogate: SurrogateModel, search_space: SearchSpace,
                 config: Phase2Config):
        self.surrogate = surrogate
        self.search_space = search_space
        self.config = config
        self._best_y = None
    
    def set_best_y(self, best_y: float):
        self._best_y = float(best_y)
    
    def expected_improvement(self, X: np.ndarray, xi: float = 0.01) -> np.ndarray:
        """
        Standard EI for maximization/minimization.
        EI(x) = (mu - f_best - xi) * Phi(Z) + sigma * phi(Z)
        """
        X = np.array(X, dtype=np.float64)
        mu, sigma = self.surrogate.predict(X, return_std=True)
        mu = np.array(mu, dtype=np.float64)
        sigma = np.array(sigma, dtype=np.float64)
        sigma = np.maximum(sigma, 1e-9)
        
        if self.config.maximize_response:
            improvement = mu - self._best_y - xi
        else:
            improvement = self._best_y - mu - xi
        
        Z = improvement / sigma
        ei = improvement * norm.cdf(Z) + sigma * norm.pdf(Z)
        ei[sigma < 1e-9] = 0.0
        return ei
    
    def expected_improvement_for_range(self, X: np.ndarray, xi: float = 0.01) -> np.ndarray:
        """
        NEW: Expected Improvement for Target Range (EIT).
        
        Computes the probability-weighted expected "closeness" to the target range.
        
        For a target range [L, U]:
        - If prediction is below L: reward for moving toward L
        - If prediction is above U: reward for moving toward U  
        - If prediction is in [L, U]: reward based on probability of being in range
        
        This formulation:
        EIT(x) = P(L <= y <= U | x) + alpha * (phi(Z_L) + phi(Z_U)) * sigma
        
        Where the second term encourages exploration at the boundaries.
        """
        X = np.array(X, dtype=np.float64)
        mu, sigma = self.surrogate.predict(X, return_std=True)
        mu = np.array(mu, dtype=np.float64)
        sigma = np.array(sigma, dtype=np.float64)
        sigma = np.maximum(sigma, 1e-9)
        
        L, U = self.config.target_range
        
        # Standardized distances to bounds
        Z_L = (L - mu) / sigma  # Distance to lower bound
        Z_U = (U - mu) / sigma  # Distance to upper bound
        
        # Probability of being in range: P(L <= y <= U)
        prob_in_range = norm.cdf(Z_U) - norm.cdf(Z_L)
        
        # Expected improvement toward range
        # If below range: expected improvement toward L
        # If above range: expected improvement toward U
        
        # Component 1: Probability of being in the target range
        eit = prob_in_range.copy()
        
        # Component 2: For points outside range, add expected movement toward range
        # Below range: encourage moving up toward L
        below_range = mu < L
        if np.any(below_range):
            improvement_to_L = (L - mu[below_range] - xi)
            Z_improve = improvement_to_L / sigma[below_range]
            ei_below = improvement_to_L * norm.cdf(Z_improve) + sigma[below_range] * norm.pdf(Z_improve)
            # Scale to be comparable with probability
            ei_below = ei_below / (U - L + 1e-9)
            eit[below_range] += ei_below * (1 - prob_in_range[below_range])
        
        # Above range: encourage moving down toward U
        above_range = mu > U
        if np.any(above_range):
            improvement_to_U = (mu[above_range] - U - xi)
            Z_improve = improvement_to_U / sigma[above_range]
            ei_above = improvement_to_U * norm.cdf(Z_improve) + sigma[above_range] * norm.pdf(Z_improve)
            # Scale to be comparable with probability
            ei_above = ei_above / (U - L + 1e-9)
            eit[above_range] += ei_above * (1 - prob_in_range[above_range])
        
        # Component 3: Exploration bonus at boundaries (small)
        exploration_bonus = xi * (norm.pdf(Z_L) + norm.pdf(Z_U)) * sigma / (U - L + 1e-9)
        eit += exploration_bonus
        
        eit[sigma < 1e-9] = 0.0
        return eit
    
    def acquisition_function(self, X: np.ndarray, xi: float = 0.01) -> np.ndarray:
        """
        NEW: Unified acquisition function that selects appropriate method
        based on configuration.
        """
        if self.config.target_range is not None:
            return self.expected_improvement_for_range(X, xi)
        else:
            return self.expected_improvement(X, xi)
    
    def _construct_full_x(self, continuous_x: np.ndarray, discrete_values: Dict) -> np.ndarray:
        """Combine continuous values with fixed discrete values."""
        full_x = []
        cont_idx = 0
        for col in self.search_space.continuous_features + self.search_space.discrete_features:
            if col in self.search_space.discrete_features:
                full_x.append(float(discrete_values[col]))
            else:
                full_x.append(float(continuous_x[cont_idx]))
                cont_idx += 1
        return np.array(full_x, dtype=np.float64)
    
    def _negative_acq(self, x: np.ndarray, discrete_values: Dict) -> float:
        """Negative acquisition function for minimization."""
        x = np.array(x, dtype=np.float64)
        full_x = self._construct_full_x(x, discrete_values)
        acq = self.acquisition_function(full_x.reshape(1, -1), xi=self.config.exploration_weight)
        return -float(acq[0])
    
    def _compute_distance(self, point: np.ndarray, reference_points: np.ndarray) -> float:
        """Compute minimum distance from point to any reference point."""
        point = np.array(point, dtype=np.float64).flatten()
        reference_points = np.array(reference_points, dtype=np.float64)
        
        if reference_points.size == 0:
            return float('inf')
        
        if reference_points.ndim == 1:
            reference_points = reference_points.reshape(1, -1)
        
        diff = reference_points - point
        distances = np.sqrt(np.sum(diff ** 2, axis=1))
        return float(np.min(distances))
    
    def optimize_single(self, discrete_values: Dict, 
                        existing_points: Optional[np.ndarray] = None) -> Tuple[Optional[np.ndarray], float]:
        """Find best point for given discrete combination using multi-start L-BFGS-B."""
        bounds = self.search_space.get_continuous_bounds()
        
        # Handle case with no continuous features
        if not bounds:
            full_x = self._construct_full_x(np.array([]), discrete_values)
            acq = self.acquisition_function(full_x.reshape(1, -1), xi=self.config.exploration_weight)[0]
            return full_x, float(acq)
        
        # Convert bounds to float
        bounds = [(float(b[0]), float(b[1])) for b in bounds]
        
        best_x = None
        best_acq = float('-inf')
        
        for _ in range(self.config.n_optimizer_restarts):
            x0 = np.array([np.random.uniform(b[0], b[1]) for b in bounds], dtype=np.float64)
            
            try:
                result = minimize(
                    lambda x: self._negative_acq(x, discrete_values),
                    x0, method='L-BFGS-B', bounds=bounds
                )
                
                candidate_acq = -float(result.fun)
                
                if candidate_acq > best_acq:
                    candidate_x = self._construct_full_x(result.x, discrete_values)
                    
                    # Enforce diversity constraint
                    if existing_points is not None and existing_points.size > 0:
                        min_dist = self._compute_distance(candidate_x, existing_points)
                        if min_dist < self.config.min_distance:
                            continue
                    
                    best_x = candidate_x
                    best_acq = candidate_acq
            except Exception:
                continue
        
        return best_x, best_acq
    
    def find_next_points(self, n_points: int, existing_X: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
        """Find n_points suggestions using configured strategy."""
        # Ensure proper array format
        if existing_X is not None and existing_X.size > 0:
            all_points = np.array(existing_X, dtype=np.float64).copy()
            if all_points.ndim == 1:
                all_points = all_points.reshape(1, -1)
        else:
            all_points = None
        
        if self.config.selection_strategy == 'greedy':
            suggestions, acq_values = self._greedy_selection(n_points, all_points)
        else:
            suggestions, acq_values = self._diverse_selection(n_points, all_points)
        
        # Fallback if no suggestions generated
        if len(suggestions) == 0:
            print("  Warning: Acquisition optimization failed. Using uncertainty-based exploration.")
            suggestions, acq_values = self._fallback_selection(n_points, all_points)
        
        return suggestions, acq_values
    
    def _greedy_selection(self, n_points: int, existing_points: Optional[np.ndarray]) -> Tuple[np.ndarray, np.ndarray]:
        """Select top n points by acquisition value."""
        candidates = []
        
        for discrete_combo in self.search_space.discrete_combinations:
            best_x, best_acq = self.optimize_single(discrete_combo, existing_points)
            if best_x is not None:
                candidates.append((np.array(best_x, dtype=np.float64), float(best_acq)))
        
        # Sort by acquisition value (descending)
        candidates.sort(key=lambda x: x[1], reverse=True)
        
        suggestions, acq_values = [], []
        
        for x, acq in candidates:
            # Check duplicate threshold
            if existing_points is not None and existing_points.size > 0:
                min_dist = self._compute_distance(x, existing_points)
                if min_dist < self.config.duplicate_threshold:
                    continue
            
            suggestions.append(x.tolist())
            acq_values.append(acq)
            
            if len(suggestions) >= n_points:
                break
        
        # Return empty arrays with correct shape if no suggestions
        n_features = len(self.search_space.continuous_features) + len(self.search_space.discrete_features)
        if len(suggestions) == 0:
            return np.array([]).reshape(0, n_features), np.array([])
        
        return np.array(suggestions, dtype=np.float64), np.array(acq_values, dtype=np.float64)
    
    def _diverse_selection(self, n_points: int, existing_points: Optional[np.ndarray]) -> Tuple[np.ndarray, np.ndarray]:
        """Distribute suggestions across discrete combinations with spacing."""
        suggestions, acq_values = [], []
        
        # Initialize points to avoid
        if existing_points is not None and existing_points.size > 0:
            points_to_avoid = np.array(existing_points, dtype=np.float64).copy()
            if points_to_avoid.ndim == 1:
                points_to_avoid = points_to_avoid.reshape(1, -1)
        else:
            points_to_avoid = None
        
        n_combos = self.search_space.n_discrete_combinations
        combo_counts = {i: 0 for i in range(n_combos)}
        
        for _ in range(n_points):
            # Round-robin across discrete combinations
            combo_idx = min(combo_counts, key=combo_counts.get)
            discrete_combo = self.search_space.discrete_combinations[combo_idx]
            
            best_x, best_acq = self.optimize_single(discrete_combo, points_to_avoid)
            
            if best_x is not None:
                best_x = np.array(best_x, dtype=np.float64)
                suggestions.append(best_x.tolist())
                acq_values.append(float(best_acq))
                combo_counts[combo_idx] += 1
                
                # Add to avoidance set for diversity
                if points_to_avoid is not None:
                    points_to_avoid = np.vstack([points_to_avoid, best_x.reshape(1, -1)])
                else:
                    points_to_avoid = best_x.reshape(1, -1)
            else:
                combo_counts[combo_idx] = float('inf')
        
        # Return empty arrays with correct shape if no suggestions
        n_features = len(self.search_space.continuous_features) + len(self.search_space.discrete_features)
        if len(suggestions) == 0:
            return np.array([]).reshape(0, n_features), np.array([])
        
        return np.array(suggestions, dtype=np.float64), np.array(acq_values, dtype=np.float64)
    
    def _fallback_selection(self, n_points: int, existing_points: Optional[np.ndarray]) -> Tuple[np.ndarray, np.ndarray]:
        """
        Fallback when acquisition optimization fails: sample points with highest uncertainty.
        """
        n_candidates = 1000
        candidates = []
        
        # Generate random candidates across search space
        for discrete_combo in self.search_space.discrete_combinations:
            n_per_combo = n_candidates // max(1, self.search_space.n_discrete_combinations)
            
            for _ in range(n_per_combo):
                x = []
                for col in self.search_space.continuous_features:
                    bounds = self.search_space.bounds[col]
                    low = float(bounds['min'])
                    high = float(bounds['max'])
                    x.append(np.random.uniform(low, high))
                for col in self.search_space.discrete_features:
                    x.append(float(discrete_combo[col]))
                candidates.append(x)
        
        candidates = np.array(candidates, dtype=np.float64)
        
        # Get uncertainty for all candidates
        _, sigma = self.surrogate.predict(candidates, return_std=True)
        sigma = np.array(sigma, dtype=np.float64)
        
        # Sort by uncertainty (descending)
        sorted_idx = np.argsort(sigma)[::-1]
        
        suggestions, uncertainty_values = [], []
        
        for idx in sorted_idx:
            candidate = candidates[idx].copy()
            
            # Check not too close to existing data
            if existing_points is not None and existing_points.size > 0:
                min_dist = self._compute_distance(candidate, existing_points)
                if min_dist < self.config.duplicate_threshold:
                    continue
            
            # Check not too close to already suggested points
            if len(suggestions) > 0:
                suggestions_arr = np.array(suggestions, dtype=np.float64)
                min_dist = self._compute_distance(candidate, suggestions_arr)
                if min_dist < self.config.min_distance:
                    continue
            
            suggestions.append(candidate.tolist())
            uncertainty_values.append(float(sigma[idx]))
            
            if len(suggestions) >= n_points:
                break
        
        n_features = len(self.search_space.continuous_features) + len(self.search_space.discrete_features)
        if len(suggestions) == 0:
            return np.array([]).reshape(0, n_features), np.array([])
        
        return np.array(suggestions, dtype=np.float64), np.array(uncertainty_values, dtype=np.float64)

In [None]:
# ─────────────────────────────────────────────────────────────────────────────
# ConvergenceTracker
# ─────────────────────────────────────────────────────────────────────────────

class ConvergenceTracker:
    """
    Tracks optimization progress with patience-based stopping.

    In range-optimization mode (``config.target_range`` set) progress is
    measured by closeness to the target range instead of the raw best value.

    NOTE(review): ``record_iteration`` judges improvement against the previous
    record, while ``_update_no_improvement_count`` (used by ``load_history``)
    judges it against the best value seen so far. With slow, sub-threshold gains
    the two can yield different ``no_improvement_count`` values after a reload -
    confirm which baseline is intended.
    """

    def __init__(self, config: Phase2Config):
        """Start with an empty history and a zero no-improvement streak."""
        self.config = config
        self.history = []
        self.no_improvement_count = 0

    def load_history(self, history: List[Dict]):
        """Adopt a previously recorded history (same list object) and recompute the streak."""
        self.history = history
        self._update_no_improvement_count()

    def _compute_range_score(self, response: float) -> float:
        """
        Score a response so that larger is always better.

        Without a target range: the response itself when maximizing, its
        negation when minimizing. With a target range: 0 inside the range,
        otherwise minus the distance to the nearest bound.
        """
        if self.config.target_range is None:
            return response if self.config.maximize_response else -response

        L, U = self.config.target_range
        if L <= response <= U:
            return 0.0  # Perfect - in range
        elif response < L:
            return -(L - response)  # Below range (negative)
        else:
            return -(response - U)  # Above range (negative)

    def _is_improvement(self, current: float, previous: float) -> bool:
        """
        Check if ``current`` is better than ``previous`` by more than the threshold.

        Range mode: the range score must rise by more than ``min_improvement``
        (an absolute amount). Otherwise: the gain in the optimization direction
        must exceed ``min_improvement * |previous|``.
        """
        if self.config.target_range is not None:
            # For range optimization: closer to 0 is better
            current_score = self._compute_range_score(current)
            previous_score = self._compute_range_score(previous)
            return current_score > previous_score + self.config.min_improvement

        # Scale the threshold by |previous|: multiplying a negative `previous` by
        # (1 +/- min_improvement) moves the threshold the wrong way and would let
        # a slightly worse value count as an improvement.
        required_gain = self.config.min_improvement * abs(previous)
        if self.config.maximize_response:
            return (current - previous) > required_gain
        return (previous - current) > required_gain

    def _update_no_improvement_count(self):
        """Recompute the current no-improvement streak from ``self.history``."""
        if len(self.history) < 2:
            self.no_improvement_count = 0
            return

        count = 0
        best_so_far = self.history[0]['best_response']

        for record in self.history[1:]:
            if self._is_improvement(record['best_response'], best_so_far):
                best_so_far = record['best_response']
                count = 0
            else:
                count += 1

        self.no_improvement_count = count

    def record_iteration(self, iteration: int, best_response: float,
                         n_experiments: int, suggestions: pd.DataFrame,
                         in_range_count: int = 0, total_count: int = 0):
        """
        Append one iteration record and update the no-improvement streak.

        Args:
            iteration: Iteration number stored in the record.
            best_response: Best response reported for this iteration.
            n_experiments: Number of experiments available at this iteration.
            suggestions: Suggested experiments; only its length is stored.
            in_range_count: Number of responses inside the target range
                (stored in range mode only).
            total_count: Denominator for the in-range percentage
                (0 yields a percentage of 0).

        From the second record on, the record also carries ``improvement``
        (range-score difference in range mode, otherwise the gain relative to
        ``|previous best|``, 0 when that is 0) and the boolean ``improved``.
        """
        record = {
            'iteration': iteration,
            'timestamp': datetime.now().isoformat(),
            'best_response': best_response,
            'n_experiments': n_experiments,
            'n_suggestions': len(suggestions)
        }

        # Range-specific metrics
        if self.config.target_range is not None:
            record['in_range_count'] = in_range_count
            record['in_range_pct'] = (in_range_count / total_count * 100) if total_count > 0 else 0
            record['range_score'] = self._compute_range_score(best_response)

        if len(self.history) > 0:
            prev_best = self.history[-1]['best_response']
            improved = self._is_improvement(best_response, prev_best)

            if self.config.target_range is not None:
                record['improvement'] = self._compute_range_score(best_response) - self._compute_range_score(prev_best)
            else:
                if self.config.maximize_response:
                    record['improvement'] = (best_response - prev_best) / abs(prev_best) if prev_best != 0 else 0
                else:
                    record['improvement'] = (prev_best - best_response) / abs(prev_best) if prev_best != 0 else 0

            record['improved'] = improved
            self.no_improvement_count = 0 if improved else self.no_improvement_count + 1

        self.history.append(record)

    @property
    def should_stop(self) -> bool:
        """True once the no-improvement streak has reached ``config.patience``."""
        return self.no_improvement_count >= self.config.patience

    @property
    def current_iteration(self) -> int:
        """Number of iterations recorded so far."""
        return len(self.history)

    @property
    def best_response(self) -> Optional[float]:
        """Best recorded response under the configured objective, or None with no history."""
        if not self.history:
            return None

        if self.config.target_range is not None:
            # For range optimization, return the response closest to range
            scores = [(h['best_response'], self._compute_range_score(h['best_response']))
                      for h in self.history]
            return max(scores, key=lambda x: x[1])[0]
        else:
            if self.config.maximize_response:
                return max(h['best_response'] for h in self.history)
            return min(h['best_response'] for h in self.history)

    def get_history_df(self) -> pd.DataFrame:
        """Return the history as a DataFrame, one row per recorded iteration."""
        return pd.DataFrame(self.history)

In [None]:
# ─────────────────────────────────────────────────────────────────────────────
# Checkpoint Manager
# ─────────────────────────────────────────────────────────────────────────────

class Phase2Checkpoint:
    """Persists pipeline state between iterations.

    Two files are kept inside ``output_dir``:

    * ``phase2_checkpoint.pkl``  - pickle of the complete state dict; this is
      the only file that ``load`` reads back.
    * ``phase2_checkpoint.json`` - JSON copy of the state without the
      ``history`` entry; written alongside the pickle but never read here.
    """

    def __init__(self, output_dir: str):
        """Create the checkpoint directory (including missing parents).

        Parameters
        ----------
        output_dir : str
            Directory in which both checkpoint files are stored.
        """
        self.output_dir = Path(output_dir)
        # parents=True: a nested directory such as 'results/run_01' must not
        # fail with FileNotFoundError just because 'results' is missing.
        self.output_dir.mkdir(parents=True, exist_ok=True)
        self.checkpoint_file = self.output_dir / 'phase2_checkpoint.pkl'
        self.json_file = self.output_dir / 'phase2_checkpoint.json'

    def save(self, state: Dict[str, Any]):
        """Write ``state`` to the pickle file and a JSON copy without 'history'."""
        with open(self.checkpoint_file, 'wb') as f:
            pickle.dump(state, f)

        json_state = {key: value for key, value in state.items() if key != 'history'}
        with open(self.json_file, 'w') as f:
            # default=str turns values json cannot encode natively into strings.
            json.dump(json_state, f, indent=2, default=str)

    def load(self) -> Optional[Dict[str, Any]]:
        """Return the pickled state, or None when no checkpoint file exists.

        NOTE: unpickling can execute arbitrary code. Only load checkpoint
        files that were produced by this pipeline in a trusted location.
        """
        if not self.checkpoint_file.exists():
            return None
        with open(self.checkpoint_file, 'rb') as f:
            return pickle.load(f)

    def exists(self) -> bool:
        """True when the pickle checkpoint file is present."""
        return self.checkpoint_file.exists()

    def clear(self):
        """Delete both checkpoint files if they are present."""
        for path in (self.checkpoint_file, self.json_file):
            if path.exists():
                path.unlink()

In [None]:
# ─────────────────────────────────────────────────────────────────────────────
# Plotter (Enhanced)
# ─────────────────────────────────────────────────────────────────────────────

class Phase2Plotter:
    """Generates diagnostic visualizations.

    Every plot is saved as a PNG inside ``output_dir``, displayed with
    ``plt.show()`` and then closed, so repeated iterations do not leave a
    growing number of open figures behind.
    """

    def __init__(self, output_dir: str):
        """Create the output directory (including missing parents)."""
        self.output_dir = Path(output_dir)
        # parents=True so nested output directories work as well.
        self.output_dir.mkdir(parents=True, exist_ok=True)

    def plot_convergence(self, tracker: ConvergenceTracker, config: Phase2Config):
        """Plot the best response per recorded iteration.

        Does nothing when the tracker history is empty. When a target range is
        configured, the range is drawn as two dashed lines with a shaded band.
        The figure is saved as ``convergence_plot.png``.
        """
        history_df = tracker.get_history_df()
        if len(history_df) < 1:
            return

        fig, ax = plt.subplots(figsize=(8, 5))
        # The label guarantees ax.legend() below always has an entry to show,
        # also in max/min mode where no target-range line is drawn.
        ax.plot(history_df['iteration'], history_df['best_response'], 'bo-',
                linewidth=2, markersize=8, label='Best Response')

        # NEW: Add target range visualization
        if config.target_range is not None:
            L, U = config.target_range
            ax.axhline(y=L, color='g', linestyle='--', alpha=0.7, label=f'Target Range [{L}, {U}]')
            ax.axhline(y=U, color='g', linestyle='--', alpha=0.7)
            ax.fill_between(history_df['iteration'], L, U, alpha=0.1, color='green')
            ax.set_title('Optimization Progress (Target Range)')
        else:
            ax.set_title(f'Optimization Progress ({"Max" if config.maximize_response else "Min"})')

        ax.set_xlabel('Iteration')
        ax.set_ylabel('Best Response')
        ax.grid(True, alpha=0.3)
        ax.legend()

        plt.tight_layout()
        plt.savefig(self.output_dir / 'convergence_plot.png', dpi=150, bbox_inches='tight')
        plt.show()
        plt.close(fig)  # release the figure once it has been saved and shown

    def plot_gp_1d(self, surrogate: SurrogateModel, X: np.ndarray, y: np.ndarray,
                   feature_names: List[str], feature_idx: int, search_space: SearchSpace,
                   config: Phase2Config):
        """1D slice through GP (other features at mean).

        The feature at ``feature_idx`` is swept over its search-space bounds
        while every other column is held at its mean over ``X``. Features that
        are not listed in ``search_space.continuous_features`` are skipped.
        The figure is saved as ``gp_1d_<feature name>.png``.
        """
        feature_name = feature_names[feature_idx]
        if feature_name not in search_space.continuous_features:
            return

        fig, ax = plt.subplots(figsize=(8, 5))

        n_test = 100
        X_mean = X.mean(axis=0)
        bounds = search_space.bounds[feature_name]
        test_values = np.linspace(bounds['min'], bounds['max'], n_test)

        # Start from the mean point repeated n_test times, then overwrite the
        # swept column.
        X_test = np.tile(X_mean, (n_test, 1))
        X_test[:, feature_idx] = test_values

        mu, sigma = surrogate.predict(X_test, return_std=True)

        # Band of +/- 2 standard deviations around the predicted mean.
        ax.fill_between(test_values, mu - 2*sigma, mu + 2*sigma, alpha=0.3, label='95% CI')
        ax.plot(test_values, mu, 'b-', linewidth=2, label='GP Mean')
        ax.scatter(X[:, feature_idx], y, c='red', s=50, zorder=5, edgecolors='black', label='Observed')

        # NEW: Add target range visualization
        if config.target_range is not None:
            L, U = config.target_range
            ax.axhline(y=L, color='g', linestyle='--', alpha=0.7, label='Target Range')
            ax.axhline(y=U, color='g', linestyle='--', alpha=0.7)
            ax.fill_between(test_values, L, U, alpha=0.1, color='green')

        ax.set_xlabel(feature_name)
        ax.set_ylabel('Response')
        ax.legend()
        ax.grid(True, alpha=0.3)

        plt.tight_layout()
        plt.savefig(self.output_dir / f'gp_1d_{feature_name}.png', dpi=150, bbox_inches='tight')
        plt.show()
        plt.close(fig)  # release the figure once it has been saved and shown

    def plot_gp_2d(self, surrogate: SurrogateModel, X: np.ndarray, y: np.ndarray,
                   feature_names: List[str], idx1: int, idx2: int, search_space: SearchSpace,
                   config: Phase2Config):
        """2D contour of GP surface.

        Draws the GP mean (left) and GP standard deviation (right) on a 50x50
        grid spanning the bounds of the two features at ``idx1`` and ``idx2``;
        all other columns are held at their mean over ``X``. Nothing is drawn
        unless both features are continuous. The figure is saved as
        ``gp_2d_<feature 1>_<feature 2>.png``.
        """
        feat_1, feat_2 = feature_names[idx1], feature_names[idx2]

        if feat_1 not in search_space.continuous_features or \
           feat_2 not in search_space.continuous_features:
            return

        fig, axes = plt.subplots(1, 2, figsize=(12, 5))

        n_grid = 50
        x1_range = np.linspace(search_space.bounds[feat_1]['min'],
                               search_space.bounds[feat_1]['max'], n_grid)
        x2_range = np.linspace(search_space.bounds[feat_2]['min'],
                               search_space.bounds[feat_2]['max'], n_grid)
        X1, X2 = np.meshgrid(x1_range, x2_range)

        X_mean = X.mean(axis=0)
        X_test = np.tile(X_mean, (n_grid * n_grid, 1))
        X_test[:, idx1] = X1.ravel()
        X_test[:, idx2] = X2.ravel()

        mu, sigma = surrogate.predict(X_test, return_std=True)
        Mu = mu.reshape(n_grid, n_grid)
        Sigma = sigma.reshape(n_grid, n_grid)

        # Mean surface
        cs1 = axes[0].contourf(X1, X2, Mu, levels=20, cmap='viridis')
        axes[0].scatter(X[:, idx1], X[:, idx2], c='red', s=50, edgecolors='white')
        axes[0].set_xlabel(feat_1)
        axes[0].set_ylabel(feat_2)
        axes[0].set_title('GP Mean')
        plt.colorbar(cs1, ax=axes[0])

        # NEW: Add contour lines for target range if specified
        if config.target_range is not None:
            L, U = config.target_range
            axes[0].contour(X1, X2, Mu, levels=[L, U], colors='green', linewidths=2, linestyles='--')

        # Uncertainty surface
        cs2 = axes[1].contourf(X1, X2, Sigma, levels=20, cmap='YlOrRd')
        axes[1].scatter(X[:, idx1], X[:, idx2], c='blue', s=50, edgecolors='white')
        axes[1].set_xlabel(feat_1)
        axes[1].set_ylabel(feat_2)
        axes[1].set_title('GP Uncertainty')
        plt.colorbar(cs2, ax=axes[1])

        plt.tight_layout()
        plt.savefig(self.output_dir / f'gp_2d_{feat_1}_{feat_2}.png', dpi=150, bbox_inches='tight')
        plt.show()
        plt.close(fig)  # release the figure once it has been saved and shown

In [None]:
# ─────────────────────────────────────────────────────────────────────────────
# ExperimentSuggester
# ─────────────────────────────────────────────────────────────────────────────

class ExperimentSuggester:
    """Formats and exports experiment suggestions."""

    def __init__(self, search_space: SearchSpace, config: Phase2Config):
        """Keep references to the search space and the pipeline configuration."""
        self.search_space = search_space
        self.config = config

    def format_suggestions(self, suggested_X: np.ndarray, acq_values: np.ndarray,
                           feature_names: List[str], surrogate: SurrogateModel,
                           discrete_values: Dict[str, List[Any]]) -> pd.DataFrame:
        """Build a ranked table of suggested experiments.

        Parameters
        ----------
        suggested_X : np.ndarray
            Suggested points, one row per suggestion, columns ordered as
            ``feature_names``.
        acq_values : np.ndarray
            Acquisition value of each suggested point (any array-like is
            accepted).
        feature_names : list of str
            Column names for ``suggested_X``.
        surrogate : SurrogateModel
            Fitted model; its ``predict(..., return_std=True)`` supplies the
            predicted response and uncertainty columns.
        discrete_values : dict
            Allowed values per discrete feature, used to snap each discrete
            column to its nearest allowed value.

        Returns
        -------
        pd.DataFrame
            Empty when there are no suggestions. Otherwise one row per
            suggestion sorted by descending acquisition value, with a leading
            1-based ``rank`` column, the feature columns,
            ``predicted_response``, ``uncertainty``, ``acquisition_value`` and,
            when a target range is configured, ``prob_in_range``.
        """
        if len(suggested_X) == 0:
            return pd.DataFrame()

        # Predictions are taken at the points as received, i.e. before the
        # rounding and snapping applied to the displayed columns below.
        mu, sigma = surrogate.predict(suggested_X, return_std=True)

        df = pd.DataFrame(suggested_X, columns=feature_names)

        # Round continuous features
        for col in self.search_space.continuous_features:
            df[col] = df[col].round(4)

        # For discrete features, snap to nearest allowed value
        for col in self.search_space.discrete_features:
            if col in discrete_values:
                allowed = discrete_values[col]
                # The lambda is consumed immediately by apply(), so it always
                # sees the ``allowed`` list of the current column.
                df[col] = df[col].apply(lambda x: min(allowed, key=lambda v: abs(float(v) - x)))

        df['predicted_response'] = mu.round(4)
        df['uncertainty'] = sigma.round(4)
        # np.asarray lets callers pass a plain list as well as an ndarray.
        df['acquisition_value'] = np.asarray(acq_values).round(6)

        # NEW: Add range probability if target_range is specified
        if self.config.target_range is not None:
            L, U = self.config.target_range
            # P(L <= response <= U) under a normal with mean mu and std sigma;
            # sigma is floored at 1e-9 to avoid dividing by zero.
            safe_sigma = np.maximum(sigma, 1e-9)
            Z_L = (L - mu) / safe_sigma
            Z_U = (U - mu) / safe_sigma
            prob_in_range = norm.cdf(Z_U) - norm.cdf(Z_L)
            df['prob_in_range'] = prob_in_range.round(4)

        df = df.sort_values('acquisition_value', ascending=False).reset_index(drop=True)
        df.insert(0, 'rank', range(1, len(df) + 1))

        return df

    def export_csv(self, suggestions_df: pd.DataFrame, iteration: int) -> str:
        """Write the suggestions to ``<output_dir>/suggestions_iter<iteration>.csv``.

        The output directory (including missing parents) is created if needed.
        Returns the path of the written file as a string.
        """
        output_path = Path(self.config.output_dir)
        # parents=True so a nested output_dir does not raise FileNotFoundError.
        output_path.mkdir(parents=True, exist_ok=True)
        filename = output_path / f'suggestions_iter{iteration}.csv'
        suggestions_df.to_csv(filename, index=False)
        return str(filename)

In [None]:
# ─────────────────────────────────────────────────────────────────────────────
# Results Container
# ─────────────────────────────────────────────────────────────────────────────

@dataclass
class Phase2Results:
    """Container for the outputs of one pipeline iteration (returned by ``Phase2Pipeline.run``)."""
    # Iteration number this result belongs to.
    iteration: int
    # Table of suggested experiments produced for this iteration.
    suggestions: pd.DataFrame
    # Best response value found in the data loaded for this iteration.
    current_best: float
    # Best response taken from the restored checkpoint history; None when no
    # checkpoint was loaded.
    previous_best: Optional[float]
    # Whether current_best counts as an improvement over previous_best
    # (False when previous_best is None).
    improved: bool
    # False once the convergence tracker reports that the stopping criterion is met.
    should_continue: bool
    # Tracker's count of iterations without improvement.
    no_improvement_count: int
    # Surrogate model scores on the loaded data; the pipeline reads the keys
    # 'r2' and 'rmse' from this dict.
    model_metrics: Dict[str, float]
    # Path of the exported suggestions CSV.
    suggestions_file: str
    # Range optimization metrics: filled only when a target range is
    # configured, otherwise left as None.
    in_range_count: Optional[int] = None
    in_range_percentage: Optional[float] = None
    # One of "maximize", "minimize", or "range".
    optimization_mode: str = "maximize"  # "maximize", "minimize", or "range"

In [None]:
# ─────────────────────────────────────────────────────────────────────────────
# Main Pipeline
# ─────────────────────────────────────────────────────────────────────────────

class Phase2Pipeline:
    """
    Bayesian Optimization pipeline for chemical experiments.

    One call to ``run`` performs a single iteration: optionally restore the
    checkpoint, load the data file, fit the GP surrogate, determine the
    current best response, generate and export suggestions, update the
    convergence tracker, save a checkpoint and draw the diagnostic plots.

    NEW FEATURES:
    1. Range optimization: Optimize response toward a target range [L, U]
    2. Discrete features: User-specified discrete features with allowed values
    """

    def __init__(self, config: Phase2Config):
        """Create the output directory and the helper objects used by ``run``."""
        self.config = config
        self._output_dir = Path(config.output_dir)
        # parents=True so a nested output_dir (e.g. 'results/run_01') works.
        self._output_dir.mkdir(parents=True, exist_ok=True)

        self._loader = DataLoader()
        self._checkpoint_mgr = Phase2Checkpoint(config.output_dir)
        self._plotter = Phase2Plotter(config.output_dir)
        self._tracker = ConvergenceTracker(config)

        # Populated by _load_data / run.
        self._X = None
        self._y = None
        self._feature_names = []
        self._discrete_features = []
        self._continuous_features = []
        self._discrete_values = {}
        self._search_space = None
        self._surrogate = None
        self._iteration = 0

    def run(self, data_file: str) -> Phase2Results:
        """Run one BO iteration.

        Parameters
        ----------
        data_file : str
            Path of the Excel file holding the experimental data.

        Returns
        -------
        Phase2Results
            Suggestions, best-response bookkeeping, model metrics and the
            path of the exported suggestions CSV.

        Raises
        ------
        ValueError
            If the data file contains no row with a response value.
        """
        print("=" * 60)
        print("Phase 2: Bayesian Optimization (Enhanced)")
        print("=" * 60)

        # Print optimization mode
        if self.config.target_range is not None:
            print(f"  Mode: Range Optimization [{self.config.target_range[0]}, {self.config.target_range[1]}]")
            opt_mode = "range"
        elif self.config.maximize_response:
            print("  Mode: Maximization")
            opt_mode = "maximize"
        else:
            print("  Mode: Minimization")
            opt_mode = "minimize"

        # Print discrete features config
        if self.config.discrete_features:
            print(f"  Discrete Features: {list(self.config.discrete_features.keys())}")

        # Load checkpoint if continuing
        previous_best = None
        if not self.config.fresh_start and self._checkpoint_mgr.exists():
            self._load_checkpoint()
            previous_best = self._tracker.best_response
            print(f"  Loaded checkpoint (iteration {self._iteration})")

        self._load_data(data_file)

        # Fail early with a clear message instead of producing NaN statistics
        # and an obscure error further down; checked before the iteration
        # counter is advanced so a failed run does not consume an iteration.
        if len(self._y) == 0:
            raise ValueError(
                f"No rows with a value in response column "
                f"'{self.config.response_column}' were found in {data_file}"
            )

        self._iteration += 1

        print(f"\n  Iteration: {self._iteration}")
        print(f"  Experiments: {len(self._X)}")

        # Build search space
        self._search_space = SearchSpace(
            self._X, self._discrete_features, self._discrete_values,
            self._continuous_features, self.config.bounds_margin
        )

        # Fit surrogate
        self._fit_surrogate()

        # Find current best and range metrics
        current_best, in_range_count, in_range_pct = self._find_current_best()
        print(f"  Current best: {current_best:.4f}")
        if self.config.target_range is not None:
            print(f"  In range: {in_range_count}/{len(self._y)} ({in_range_pct:.1f}%)")

        # Generate suggestions
        suggestions_df = self._generate_suggestions(current_best)

        # Export
        suggester = ExperimentSuggester(self._search_space, self.config)
        suggestions_file = suggester.export_csv(suggestions_df, self._iteration)
        print(f"  Suggestions saved: {suggestions_file}")

        # Update tracker
        self._tracker.record_iteration(
            self._iteration, current_best, len(self._X), suggestions_df,
            in_range_count=in_range_count or 0, total_count=len(self._y)
        )

        # Check improvement
        improved = False
        if previous_best is not None:
            improved = self._tracker._is_improvement(current_best, previous_best)

        # Save checkpoint
        self._save_checkpoint(current_best)

        # Generate plots
        self._generate_plots()

        # Print status
        self._print_status(current_best, previous_best, improved, in_range_count, in_range_pct)

        return Phase2Results(
            iteration=self._iteration,
            suggestions=suggestions_df,
            current_best=current_best,
            previous_best=previous_best,
            improved=improved,
            should_continue=not self._tracker.should_stop,
            no_improvement_count=self._tracker.no_improvement_count,
            model_metrics=self._surrogate.score(self._X.values, self._y.values),
            suggestions_file=suggestions_file,
            in_range_count=in_range_count,
            in_range_percentage=in_range_pct,
            optimization_mode=opt_mode
        )

    def _find_current_best(self) -> Tuple[float, Optional[int], Optional[float]]:
        """Pick the best observed response for the configured optimization mode.

        Returns ``(current_best, in_range_count, in_range_pct)``. The two
        range metrics are None unless a target range is configured.
        """
        if self.config.target_range is None:
            if self.config.maximize_response:
                return self._y.max(), None, None
            return self._y.min(), None, None

        L, U = self.config.target_range
        in_range_mask = (self._y >= L) & (self._y <= U)
        # Plain int/float keep the values friendly to printing and JSON export.
        in_range_count = int(in_range_mask.sum())
        in_range_pct = in_range_count / len(self._y) * 100

        if in_range_count > 0:
            # Among the in-range responses, take the one nearest the range centre.
            in_range_responses = self._y[in_range_mask]
            target_center = (L + U) / 2
            current_best = in_range_responses.iloc[(in_range_responses - target_center).abs().argmin()]
        else:
            # Nothing is in range: take the response nearest to either bound.
            distances = np.minimum(np.abs(self._y - L), np.abs(self._y - U))
            current_best = self._y.iloc[distances.argmin()]

        return current_best, in_range_count, in_range_pct

    def _load_data(self, data_file: str):
        """Load the Excel data and populate ``_X``, ``_y`` and the feature lists.

        Rows whose response value is missing are dropped.
        """
        print(f"\n  Loading: {data_file}")

        df = self._loader.load_excel(data_file, self.config.sheet_name, self.config.header_row)

        if self.config.split_keyword:
            df, _ = self._loader.split_at_keyword(df, self.config.split_keyword)

        df = self._loader.clean_data(df)

        feature_cols = self._loader.get_feature_columns(
            df, self.config.stop_feature, self.config.response_column
        )

        # NEW: Use discrete_features from config
        self._discrete_features, self._continuous_features, self._discrete_values = \
            self._loader.classify_features(df, feature_cols, self.config.discrete_features)

        # Column order used everywhere downstream: continuous first, then discrete.
        self._feature_names = self._continuous_features + self._discrete_features
        self._X = df[self._feature_names].copy()
        self._y = df[self.config.response_column].copy()

        # Drop missing response
        valid = ~self._y.isnull()
        self._X = self._X[valid].reset_index(drop=True)
        self._y = self._y[valid].reset_index(drop=True)

        print(f"  Features: {len(self._feature_names)} "
              f"({len(self._continuous_features)} continuous, {len(self._discrete_features)} discrete)")

        if self._discrete_features:
            for feat in self._discrete_features:
                print(f"    - {feat}: {self._discrete_values[feat]}")

    def _fit_surrogate(self):
        """Fit a fresh surrogate model on the loaded data and print its scores."""
        print("\n  Fitting GP model...")
        self._surrogate = SurrogateModel()
        self._surrogate.fit(self._X.values, self._y.values)
        metrics = self._surrogate.score(self._X.values, self._y.values)
        print(f"  R²: {metrics['r2']:.4f}, RMSE: {metrics['rmse']:.4f}")

    def _generate_suggestions(self, current_best: float) -> pd.DataFrame:
        """Optimize the acquisition function and return the formatted suggestions."""
        print(f"\n  Generating {self.config.n_suggestions} suggestions ({self.config.selection_strategy})...")

        optimizer = AcquisitionOptimizer(self._surrogate, self._search_space, self.config)
        optimizer.set_best_y(current_best)

        suggested_X, acq_values = optimizer.find_next_points(
            self.config.n_suggestions, self._X.values
        )

        suggester = ExperimentSuggester(self._search_space, self.config)
        return suggester.format_suggestions(
            suggested_X, acq_values, self._feature_names,
            self._surrogate, self._discrete_values
        )

    def _save_checkpoint(self, current_best: float):
        """Persist the iteration counter, tracker state and feature metadata."""
        state = {
            'iteration': self._iteration,
            'timestamp': datetime.now().isoformat(),
            'n_experiments': len(self._X),
            'best_response': current_best,
            'no_improvement_count': self._tracker.no_improvement_count,
            'should_stop': self._tracker.should_stop,
            'features': self._feature_names,
            'discrete_features': self._discrete_features,
            'continuous_features': self._continuous_features,
            'discrete_values': self._discrete_values,
            'history': self._tracker.history,
            # NEW: Save range config
            'target_range': self.config.target_range
        }
        self._checkpoint_mgr.save(state)

    def _load_checkpoint(self):
        """Restore the iteration counter and tracker history from the checkpoint.

        Only 'iteration' and 'history' are read back; the other saved keys
        are not used here.
        """
        state = self._checkpoint_mgr.load()
        if state:
            self._iteration = state['iteration']
            self._tracker.load_history(state.get('history', []))

    def _generate_plots(self):
        """Draw the convergence plot plus 1D/2D GP slices for continuous features."""
        print("\n  Generating plots...")

        if len(self._tracker.history) > 0:
            self._plotter.plot_convergence(self._tracker, self.config)

        # 1D plots for first 3 continuous features
        for feat in self._continuous_features[:3]:
            feat_idx = self._feature_names.index(feat)
            self._plotter.plot_gp_1d(
                self._surrogate, self._X.values, self._y.values,
                self._feature_names, feat_idx, self._search_space, self.config
            )

        # 2D plot if 2+ continuous features
        if len(self._continuous_features) >= 2:
            idx1 = self._feature_names.index(self._continuous_features[0])
            idx2 = self._feature_names.index(self._continuous_features[1])
            self._plotter.plot_gp_2d(
                self._surrogate, self._X.values, self._y.values,
                self._feature_names, idx1, idx2, self._search_space, self.config
            )

    def _print_status(self, current_best: float, previous_best: Optional[float],
                      improved: bool, in_range_count: Optional[int] = None,
                      in_range_pct: Optional[float] = None):
        """Print the end-of-iteration summary and the stop/continue verdict."""
        print("\n" + "-" * 60)
        print("Status")
        print("-" * 60)

        if self.config.target_range is not None:
            L, U = self.config.target_range
            print(f"  Target Range: [{L}, {U}]")
            if in_range_count is not None:
                print(f"  In Range: {in_range_count} experiments ({in_range_pct:.1f}%)")

        if previous_best is not None:
            arrow = "↑" if improved else "→"
            print(f"  Best: {previous_best:.4f} {arrow} {current_best:.4f}")
            print(f"  Improved: {'Yes' if improved else 'No'}")
        else:
            print(f"  Best: {current_best:.4f} (first iteration)")

        print(f"  No improvement: {self._tracker.no_improvement_count}/{self.config.patience}")

        if self._tracker.should_stop:
            print("\n  ⚠ STOPPING CRITERION MET")
        else:
            print("\n  ✓ Continue with suggested experiments")
        print("-" * 60)

    def reset(self):
        """Clear checkpoint and start fresh."""
        self._checkpoint_mgr.clear()
        self._tracker = ConvergenceTracker(self.config)
        self._iteration = 0
        print("  Pipeline reset.")

    @property
    def iteration(self) -> int:
        """Number of the most recently started iteration (0 before the first run)."""
        return self._iteration

    @property
    def history(self) -> pd.DataFrame:
        """Tracker history as a DataFrame."""
        return self._tracker.get_history_df()

    @property
    def bounds(self) -> pd.DataFrame:
        """Search-space bounds as a DataFrame; empty before a search space exists."""
        if self._search_space:
            return self._search_space.get_bounds_df()
        return pd.DataFrame()

In [None]:
# ─────────────────────────────────────────────────────────────────────────────
# Convenience Function (Enhanced)
# ─────────────────────────────────────────────────────────────────────────────

def run_bo_iteration(data_file: str, response_column: str, 
                     n_suggestions: int = 5, maximize: bool = True,
                     target_range: Optional[Tuple[float, float]] = None,
                     discrete_features: Optional[Dict[str, List[Any]]] = None,
                     strategy: str = 'diverse', output_dir: str = 'bo_results',
                     bounds_margin: float = 0.1,
                     sheet_name: str = 'data',
                     header_row: int = 5,
                     stop_feature: Optional[str] = None,
                     split_keyword: Optional[str] = "PREDICTED OPTIMUM RUNS",
                     exploration_weight: float = 0.01,
                     min_distance: float = 0.1,
                     duplicate_threshold: float = 0.05,
                     n_optimizer_restarts: int = 25,
                     patience: int = 3,
                     min_improvement: float = 0.01,
                     fresh_start: bool = False,
                     **kwargs) -> Phase2Results:
    """
    Build a Phase2Config from keyword arguments and run one BO iteration.

    Parameters
    ----------
    data_file : str
        Excel file with the experimental data.
    response_column : str
        Column holding the response variable that is optimized.
    n_suggestions : int, default=5
        How many experiment suggestions to produce.
    maximize : bool, default=True
        True maximizes the response, False minimizes it. Has no effect when
        target_range is given.
    target_range : tuple of (float, float), optional (NEW)
        (lower_bound, upper_bound) the response should fall into. When set,
        the optimization aims for this range and the maximize flag is
        overridden.
    discrete_features : dict, optional (NEW)
        Maps feature names to their lists of allowed values, e.g.
        {'Temperature': [100, 150, 200], 'Catalyst': [1, 2, 3]}.
        Every feature not named here is treated as continuous.
    strategy : str, default='diverse'
        Selection strategy: 'diverse' spreads suggestions across the search
        space, 'greedy' concentrates on the best predicted region.
    output_dir : str, default='bo_results'
        Where the suggestions CSV, plots and checkpoints are written.
    bounds_margin : float, default=0.1
        Fraction by which continuous feature bounds are widened beyond the
        observed range, permitting mild extrapolation.
    sheet_name : str, default='data'
        Excel sheet that contains the data.
    header_row : int, default=5
        0-indexed row number of the column headers.
    stop_feature : str, optional
        Column name marking the end of the feature list (later columns are
        not features).
    split_keyword : str, optional
        Keyword in the 'Run' column separating training data from
        predictions.
    exploration_weight : float, default=0.01
        Xi parameter of the acquisition function; larger values favour
        exploration.
    min_distance : float, default=0.1
        Minimum normalized distance between suggested points (diversity).
    duplicate_threshold : float, default=0.05
        Minimum distance a suggestion must keep from existing data points so
        that duplicates are avoided.
    n_optimizer_restarts : int, default=25
        Random restarts used when optimizing the acquisition function.
    patience : int, default=3
        Iterations without improvement tolerated before stopping.
    min_improvement : float, default=0.01
        Smallest relative improvement (default 1%) that counts as progress.
    fresh_start : bool, default=False
        True ignores an existing checkpoint and starts from scratch.
    **kwargs
        Extra arguments forwarded to Phase2Config.

    Returns
    -------
    Phase2Results
        Results object with these fields:
        - iteration: current iteration number
        - suggestions: DataFrame of suggested experiments
        - current_best: best response value in the current data
        - previous_best: best response of the previous iteration (if any)
        - improved: whether an improvement was achieved
        - should_continue: whether the optimization should go on
        - no_improvement_count: consecutive iterations without improvement
        - model_metrics: dict with 'r2' and 'rmse' of the GP model
        - suggestions_file: path of the saved suggestions CSV
        - in_range_count: experiments inside the target range (range mode)
        - in_range_percentage: share of experiments in range (range mode)
        - optimization_mode: 'maximize', 'minimize', or 'range'

    Examples
    --------
    # Standard maximization
    >>> results = run_bo_iteration('experiments.xlsx', 'Yield', maximize=True)

    # Standard minimization
    >>> results = run_bo_iteration('experiments.xlsx', 'Impurity', maximize=False)

    # Range optimization (NEW) - keep Yield between 5.0 and 7.0
    >>> results = run_bo_iteration(
    ...     'experiments.xlsx', 'Yield',
    ...     target_range=(5.0, 7.0)
    ... )

    # With discrete features (NEW)
    >>> results = run_bo_iteration(
    ...     'experiments.xlsx', 'Yield',
    ...     discrete_features={
    ...         'Temperature': [100, 150, 200, 250],
    ...         'Catalyst_Type': [1, 2, 3]
    ...     }
    ... )

    # Combined: range optimization with discrete features
    >>> results = run_bo_iteration(
    ...     'experiments.xlsx', 'Yield',
    ...     target_range=(5.0, 7.0),
    ...     discrete_features={'Temperature': [100, 150, 200]},
    ...     bounds_margin=0.15,
    ...     n_suggestions=10
    ... )
    """
    # Translate this function's argument names into Phase2Config field names.
    settings = {
        'response_column': response_column,
        'sheet_name': sheet_name,
        'header_row': header_row,
        'split_keyword': split_keyword,
        'stop_feature': stop_feature,
        'maximize_response': maximize,
        'target_range': target_range,
        'discrete_features': discrete_features or {},
        'n_suggestions': n_suggestions,
        'selection_strategy': strategy,
        'min_distance': min_distance,
        'exploration_weight': exploration_weight,
        'bounds_margin': bounds_margin,
        'n_optimizer_restarts': n_optimizer_restarts,
        'patience': patience,
        'min_improvement': min_improvement,
        'duplicate_threshold': duplicate_threshold,
        'output_dir': output_dir,
        'fresh_start': fresh_start,
    }
    # Double unpacking keeps the TypeError for a kwargs key that repeats one
    # of the fields above, rather than silently overriding it.
    config = Phase2Config(**settings, **kwargs)
    pipeline = Phase2Pipeline(config)
    return pipeline.run(data_file)


# ─────────────────────────────────────────────────────────────────────────────
# Example Usage
# ─────────────────────────────────────────────────────────────────────────────

if __name__ == "__main__":
    # Example 1: Standard maximization (original behavior)
    config_maximize = Phase2Config(
        response_column='Yield',
        sheet_name='data',
        header_row=5,
        stop_feature='Batch ID',
        maximize_response=True,
        n_suggestions=5,
        selection_strategy='diverse',
        patience=3,
        output_dir='bo_phase2_output'
    )

    # Example 2: Range optimization (NEW)
    # Optimize yield to be between 5.0 and 7.0
    config_range = Phase2Config(
        response_column='Yield',
        sheet_name='data',
        header_row=5,
        stop_feature='Batch ID',
        target_range=(5.0, 7.0),  # NEW: Target range
        n_suggestions=5,
        selection_strategy='diverse',
        patience=3,
        output_dir='bo_phase2_range_output'
    )

    # Example 3: With discrete features (NEW)
    # Define discrete features and their allowed values
    config_discrete = Phase2Config(
        response_column='Yield',
        sheet_name='data',
        header_row=5,
        stop_feature='Batch ID',
        maximize_response=True,
        discrete_features={  # NEW: Discrete features configuration
            'Temperature': [100, 150, 200, 250],
            'Catalyst_Loading': [0.5, 1.0, 1.5, 2.0],
            'Solvent_Type': [1, 2, 3]  # Encoded categorical
        },
        bounds_margin=0.15,  # Customizable margin for continuous features
        n_suggestions=5,
        selection_strategy='diverse',
        patience=3,
        output_dir='bo_phase2_discrete_output'
    )

    # Example 4: Combined - Range optimization with discrete features (NEW)
    config_combined = Phase2Config(
        response_column='Yield',
        sheet_name='data',
        header_row=5,
        stop_feature='Batch ID',
        target_range=(5.0, 7.0),  # Target range
        discrete_features={  # Discrete features
            'Temperature': [100, 150, 200],
            'Catalyst_Type': [1, 2, 3]
        },
        bounds_margin=0.1,
        n_suggestions=5,
        selection_strategy='diverse',
        patience=3,
        output_dir='bo_phase2_combined_output'
    )

    # ─────────────────────────────────────────────────────────────────────────
    # Run the pipeline
    # ─────────────────────────────────────────────────────────────────────────
    
    # Uncomment the configuration you want to use:
    # pipeline = Phase2Pipeline(config_maximize)
    # pipeline = Phase2Pipeline(config_range)
    # pipeline = Phase2Pipeline(config_discrete)
    # pipeline = Phase2Pipeline(config_combined)
    
    # Example run:
    # results = pipeline.run('experiments.xlsx')
    
    # Print results:
    # print("\n" + "=" * 60)
    # print("RESULTS SUMMARY")
    # print("=" * 60)
    # print(f"\nOptimization Mode: {results.optimization_mode}")
    # print(f"Iteration: {results.iteration}")
    # print(f"Current Best: {results.current_best:.4f}")
    # 
    # if results.previous_best is not None:
    #     print(f"Previous Best: {results.previous_best:.4f}")
    #     print(f"Improved: {results.improved}")
    # 
    # if results.in_range_count is not None:
    #     print(f"In Range: {results.in_range_count} ({results.in_range_percentage:.1f}%)")
    # 
    # print(f"\nModel Performance:")
    # print(f"  R²: {results.model_metrics['r2']:.4f}")
    # print(f"  RMSE: {results.model_metrics['rmse']:.4f}")
    # 
    # print(f"\nSuggestions saved to: {results.suggestions_file}")
    # print(f"Continue optimization: {results.should_continue}")
    # print(f"No improvement count: {results.no_improvement_count}")
    # 
    # print("\nSuggested Experiments:")
    # print(results.suggestions.to_string(index=False))

    # ─────────────────────────────────────────────────────────────────────────
    # Quick convenience-function examples (one-call usage via run_bo_iteration)
    # ─────────────────────────────────────────────────────────────────────────
    
    # Standard maximization:
    # results = run_bo_iteration('experiments.xlsx', 'Yield', maximize=True)
    
    # Standard minimization:
    # results = run_bo_iteration('experiments.xlsx', 'Impurity', maximize=False)
    
    # Range optimization (NEW):
    # results = run_bo_iteration(
    #     'experiments.xlsx', 
    #     'Yield', 
    #     target_range=(5.0, 7.0)
    # )
    
    # With discrete features (NEW):
    # results = run_bo_iteration(
    #     'experiments.xlsx', 
    #     'Yield',
    #     discrete_features={
    #         'Temperature': [100, 150, 200],
    #         'Pressure': [1.0, 2.0, 3.0, 4.0]
    #     }
    # )
    
    # Full example with all new features:
    # results = run_bo_iteration(
    #     data_file='experiments.xlsx',
    #     response_column='Yield',
    #     target_range=(5.0, 7.0),
    #     discrete_features={
    #         'Temperature': [100, 150, 200],
    #         'Catalyst_Type': [1, 2, 3]
    #     },
    #     bounds_margin=0.15,
    #     n_suggestions=10,
    #     strategy='diverse',
    #     output_dir='my_optimization_results',
    #     patience=5
    # )
    
    # Status message only: every Phase2Pipeline / run_bo_iteration call in the
    # example section above is commented out, so nothing is optimised until the
    # reader enables one of them.
    print("Pipeline ready. Uncomment the desired configuration and run.")