In [1]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.preprocessing import (
    RobustScaler,
    OneHotEncoder,
    StandardScaler,
    PowerTransformer,
)
from sklearn.multioutput import MultiOutputRegressor, RegressorChain
from sklearn.impute import SimpleImputer
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.neural_network import MLPRegressor
from sklearn.ensemble import RandomForestRegressor, GradientBoostingRegressor
from sklearn.linear_model import Ridge
from xgboost import XGBRegressor
from lightgbm import LGBMRegressor
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
from scipy.optimize import minimize
import warnings
import joblib
import os
import json
from datetime import datetime
import time
import matplotlib

try:
    from tqdm import tqdm
except ImportError:
    def tqdm(iterable, **kwargs):
        return iterable

In [None]:
plt.style.use("default")
sns.set_palette("husl")

In [2]:
save_directory = "ctle_circuit_generation_with_DNN_models_V2"

In [3]:
class IQROutlierCapper(BaseEstimator, TransformerMixin):
    def __init__(self, lower_bound_mult=1.5, upper_bound_mult=1.5):
        self.lower_bound_mult = lower_bound_mult
        self.upper_bound_mult = upper_bound_mult
        self.lower_bounds_ = {}
        self.upper_bounds_ = {}

    def fit(self, X, y=None):
        X = pd.DataFrame(X) if not isinstance(X, pd.DataFrame) else X
        for col in X.columns:
            if pd.api.types.is_numeric_dtype(X[col]) and not X[col].isnull().all():
                Q1 = X[col].quantile(0.25)
                Q3 = X[col].quantile(0.75)
                IQR = Q3 - Q1
                self.lower_bounds_[col] = Q1 - (IQR * self.lower_bound_mult)
                self.upper_bounds_[col] = Q3 + (IQR * self.upper_bound_mult)
            else:
                self.lower_bounds_[col] = -np.inf
                self.upper_bounds_[col] = np.inf
        return self

    def transform(self, X):
        X = pd.DataFrame(X) if not isinstance(X, pd.DataFrame) else X
        X_copy = X.copy()
        for col in X_copy.columns:
            if (
                col in self.lower_bounds_
                and pd.api.types.is_numeric_dtype(X_copy[col])
                and self.lower_bounds_[col] != -np.inf
                and self.upper_bounds_[col] != np.inf
            ):
                X_copy[col] = X_copy[col].clip(
                    lower=self.lower_bounds_[col], upper=self.upper_bounds_[col]
                )
        return X_copy

In [4]:
data_file = "DataV2.csv"
df = pd.read_csv(data_file)

In [5]:
all_targets = [
    "stage 1 3.5G attenuation",
    "stage 2 3.5G attenuation",
    "stage 1 7G attenuation",
    "stage 2 7G attenuation",
    "stage 1 14G attenuation",
    "stage 2 14G attenuation",
    "stage 1 28G attenuation",
    "stage 2 28G attenuation",
    "eye_maxHeight Vout_1 7G",
    "eye_maxWidth Vout_1 7G",
    "eye_maxHeight Vout_2 7G",
    "eye_maxWidth Vout_2 7G",
    "eye_maxHeight Vout_1 14G",
    "eye_maxWidth Vout_1 14G",
    "eye_maxHeight Vout_2 14G",
    "eye_maxWidth Vout_2 14G",
    "eye_maxHeight Vout_1 28G",
    "eye_maxWidth Vout_1 28G",
    "eye_maxHeight Vout_2 28G",
    "eye_maxWidth Vout_2 28G",
    "eye_maxHeight Vout_1 56G",
    "eye_maxWidth Vout_1 56G",
    "eye_maxHeight Vout_2 56G",
    "eye_maxWidth Vout_2 56G",
]

In [6]:
available_targets = [col for col in all_targets if col in df.columns]

In [7]:
good_targets = []
sparse_targets = []
constant_targets = []
problematic_targets = []
target_analysis = {}

for col in available_targets:
    values = df[col]
    zero_pct = (values == 0).sum() / len(values) * 100
    variance = values.var()
    cv = values.std() / abs(values.mean()) if values.mean() != 0 else float("inf")
    unique_values = values.nunique()

    target_analysis[col] = {
        "zero_pct": zero_pct,
        "variance": variance,
        "cv": cv,
        "mean": values.mean(),
        "std": values.std(),
        "unique_values": unique_values,
        "min": values.min(),
        "max": values.max(),
    }

In [8]:
target_names = available_targets
feature_names = [col for col in df.columns if col not in all_targets]

In [9]:
X = df[feature_names].copy()
y_raw = df[target_names].copy()

In [10]:
y_processed = y_raw.copy()
constant_noise_std = 1e-6

In [11]:
# Handle constant targets by adding small noise
for col in constant_targets:
    if col in y_processed.columns:
        # Add small random noise to constant targets to make them learnable
        noise = np.random.normal(0, constant_noise_std, len(y_processed))
        y_processed[col] = y_processed[col] + noise

In [12]:
# Handle sparse targets with log transformation and offset
sparse_offset = 1e-3
for col in sparse_targets:
    if col in y_processed.columns:
        # Apply log transformation with offset for sparse targets
        min_val = y_processed[col].min()
        if min_val <= 0:
            # Shift to positive values
            y_processed[col] = y_processed[col] - min_val + sparse_offset

        # Apply log1p transformation (log(1+x)) for sparse data
        y_processed[col] = np.log1p(np.abs(y_processed[col]))

In [13]:
# Apply Yeo-Johnson transformation for better normality
power_transformer = PowerTransformer(method="yeo-johnson", standardize=True)

In [14]:
# Only apply to targets with sufficient variance
transformable_targets = [
    col
    for col in target_names
    if col not in constant_targets and y_processed[col].var() > 1e-8
]

In [15]:
if transformable_targets:
    y_transformed = y_processed.copy()

    # Transform each target individually to handle different distributions
    for col in transformable_targets:
        try:
            col_data = y_processed[col].values.reshape(-1, 1)
            transformed_data = power_transformer.fit_transform(col_data)
            y_transformed[col] = transformed_data.flatten()
        except:
            # If transformation fails, use robust scaling
            scaler = RobustScaler()
            col_data = y_processed[col].values.reshape(-1, 1)
            scaled_data = scaler.fit_transform(col_data)
            y_transformed[col] = scaled_data.flatten()

    y = y_transformed
else:
    y = y_processed

In [16]:
# Store preprocessing info
target_preprocessing_info = {
    "constant_targets": constant_targets,
    "sparse_targets": sparse_targets,
    "good_targets": good_targets,
    "problematic_targets": problematic_targets,
    "transformable_targets": (
        transformable_targets if "transformable_targets" in locals() else []
    ),
    "constant_noise_std": constant_noise_std,
    "sparse_offset": sparse_offset,
}

In [17]:
# Identify numerical and categorical features
numerical_features = [
    col
    for col in feature_names
    if col not in ["Stage 1 Region", "Hard Constraint of neg2 on 0.1G Status"]
]

In [18]:
categorical_features = [
    col
    for col in feature_names
    if col in ["Stage 1 Region", "Hard Constraint of neg2 on 0.1G Status"]
]

In [19]:
print(f"Numerical features: {len(numerical_features)}")
print(f"Categorical features: {len(categorical_features)}")

Numerical features: 13
Categorical features: 2


In [20]:
# Create preprocessing pipelines
numerical_pipeline = Pipeline(
    [
        ("outlier_capper", IQROutlierCapper()),
        ("imputer", SimpleImputer(strategy="median")),
        ("scaler", RobustScaler()),
    ]
)

categorical_pipeline = Pipeline(
    [
        ("imputer", SimpleImputer(strategy="most_frequent")),
        ("onehot", OneHotEncoder(handle_unknown="ignore", sparse_output=False)),
    ]
)

# Combine pipelines
if categorical_features:
    preprocessor = ColumnTransformer(
        [
            ("num", numerical_pipeline, numerical_features),
            ("cat", categorical_pipeline, categorical_features),
        ],
        remainder="drop",
    )
else:
    preprocessor = numerical_pipeline

In [21]:
# REORDER TARGETS FOR CHAINED PREDICTIONS

In [22]:
stage1_targets = [col for col in target_names if "stage 1" in col.lower()]
stage2_targets = [col for col in target_names if "stage 2" in col.lower()]
other_targets = [
    col for col in target_names if col not in stage1_targets + stage2_targets
]

# New order: Stage 1 → Stage 2 → Others
reordered_target_names = stage1_targets + stage2_targets + other_targets

In [23]:
# Reorder the target DataFrame and update target_names
y = y[reordered_target_names]
target_names = reordered_target_names

In [24]:
# TRAIN-TEST SPLIT AND PREPROCESSING

In [25]:
if "Hard Constraint of neg2 on 0.1G Status" in X.columns:
    stratify = X["Hard Constraint of neg2 on 0.1G Status"]
else:
    stratify = None

In [26]:
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.3, random_state=42, stratify=stratify
)

In [27]:
X_train_processed = preprocessor.fit_transform(X_train)
X_test_processed = preprocessor.transform(X_test)

In [28]:
models = {}
model_performance = {}

In [29]:
# Define models optimized for ALL target types with CHAINED PREDICTIONS
models_config = {
    "xgboost_robust": {
        "model": RegressorChain(
            base_estimator=XGBRegressor(
                n_estimators=300,
                max_depth=8,
                learning_rate=0.05,  # Lower learning rate for stability
                subsample=0.8,
                colsample_bytree=0.8,
                reg_alpha=0.5,  # Higher regularization for sparse targets
                reg_lambda=2.0,
                gamma=0.1,  # Minimum split loss for sparse data
                min_child_weight=3,  # Higher minimum child weight
                random_state=42,
                n_jobs=-1,
            ),
            order=None,  # Use default order (Stage 1 → Stage 2 → Others)
            random_state=42,
        ),
        "name": "XGBoost Robust (Chained)",
    },
    "lightgbm_robust": {
        "model": RegressorChain(
            base_estimator=LGBMRegressor(
                n_estimators=300,
                max_depth=8,
                learning_rate=0.05,
                subsample=0.8,
                colsample_bytree=0.8,
                reg_alpha=0.5,
                reg_lambda=2.0,
                min_child_samples=10,  # Higher minimum samples for stability
                min_split_gain=0.1,
                random_state=42,
                n_jobs=-1,
                verbose=-1,
            ),
            order=None,
            random_state=42,
        ),
        "name": "LightGBM Robust (Chained)",
    },
    "random_forest_robust": {
        "model": RegressorChain(
            base_estimator=RandomForestRegressor(
                n_estimators=200,
                max_depth=12,
                min_samples_split=10,  # Higher for sparse data
                min_samples_leaf=5,
                max_features="sqrt",  # Reduce overfitting
                random_state=42,
                n_jobs=-1,
            ),
            order=None,
            random_state=42,
        ),
        "name": "Random Forest Robust (Chained)",
    },
    "mlp_robust": {
        "model": RegressorChain(
            base_estimator=MLPRegressor(
                hidden_layer_sizes=(128, 64, 32),  # Larger network for complex patterns
                activation="relu",
                solver="adam",
                alpha=0.1,  # Higher regularization
                learning_rate_init=0.001,
                max_iter=500,
                early_stopping=True,
                validation_fraction=0.2,
                n_iter_no_change=30,
                random_state=42,
            ),
            order=None,
            random_state=42,
        ),
        "name": "Neural Network Robust (Chained)",
    },
    "gradient_boosting_robust": {
        "model": RegressorChain(
            base_estimator=GradientBoostingRegressor(
                n_estimators=200,
                max_depth=8,
                learning_rate=0.05,
                subsample=0.8,
                min_samples_split=10,
                min_samples_leaf=5,
                random_state=42,
            ),
            order=None,
            random_state=42,
        ),
        "name": "Gradient Boosting Robust (Chained)",
    },
}

In [30]:
for model_key, config in models_config.items():
    print(f"\n  Training {config['name']}...")
    start_model_time = time.time()

    # Train model
    model = config["model"]
    model.fit(X_train_processed, y_train)

    # Predict
    y_pred = model.predict(X_test_processed)

    # Evaluate
    r2 = r2_score(y_test, y_pred)
    rmse = np.sqrt(mean_squared_error(y_test, y_pred))
    mae = mean_absolute_error(y_test, y_pred)

    # Enhanced evaluation for different target types
    individual_r2 = []
    individual_mae = []
    negative_r2_count = 0
    good_r2_count = 0
    excellent_r2_count = 0

    target_performance = {}

    for i in range(y_test.shape[1]):
        target_name = target_names[i]
        target_r2 = r2_score(y_test.iloc[:, i], y_pred[:, i])
        target_mae = mean_absolute_error(y_test.iloc[:, i], y_pred[:, i])

        individual_r2.append(target_r2)
        individual_mae.append(target_mae)

        # Categorize performance
        if target_r2 < 0:
            negative_r2_count += 1
            performance_category = "negative"
        elif target_r2 > 0.9:
            excellent_r2_count += 1
            performance_category = "excellent"
        elif target_r2 > 0.7:
            good_r2_count += 1
            performance_category = "good"
        else:
            performance_category = "fair"

        target_performance[target_name] = {
            "r2": target_r2,
            "mae": target_mae,
            "category": performance_category,
            "target_type": (
                "constant"
                if target_name in constant_targets
                else (
                    "sparse"
                    if target_name in sparse_targets
                    else "good" if target_name in good_targets else "challenging"
                )
            ),
        }

    training_time = time.time() - start_model_time

    # Store comprehensive results
    models[model_key] = model
    model_performance[model_key] = {
        "name": config["name"],
        "r2": r2,
        "rmse": rmse,
        "mae": mae,
        "individual_r2": individual_r2,
        "individual_mae": individual_mae,
        "target_performance": target_performance,
        "negative_r2_count": negative_r2_count,
        "good_r2_count": good_r2_count,
        "excellent_r2_count": excellent_r2_count,
        "training_time": training_time,
    }

    print(
        f"{config['name']}: R² = {r2:.4f}, RMSE = {rmse:.4f}, Time = {training_time:.1f}s"
    )
    print(
        f"Performance breakdown: {excellent_r2_count} excellent, {good_r2_count} good, {negative_r2_count} negative"
    )

    # Show performance by target type
    type_performance = {}
    for target_type in ["good", "sparse", "constant", "challenging"]:
        type_r2_values = [
            perf["r2"]
            for perf in target_performance.values()
            if perf["target_type"] == target_type
        ]
        if type_r2_values:
            avg_r2 = np.mean(type_r2_values)
            type_performance[target_type] = avg_r2
            print(
                f"{target_type.capitalize()} targets avg R²: {avg_r2:.4f} ({len(type_r2_values)} targets)"
            )

    model_performance[model_key]["type_performance"] = type_performance


  Training XGBoost Robust (Chained)...
XGBoost Robust (Chained): R² = 0.5102, RMSE = 0.3917, Time = 5.2s
Performance breakdown: 10 excellent, 2 good, 9 negative
Challenging targets avg R²: 0.5102 (24 targets)

  Training LightGBM Robust (Chained)...




LightGBM Robust (Chained): R² = 0.5110, RMSE = 0.3907, Time = 4.7s
Performance breakdown: 10 excellent, 2 good, 9 negative
Challenging targets avg R²: 0.5110 (24 targets)

  Training Random Forest Robust (Chained)...
Random Forest Robust (Chained): R² = 0.5095, RMSE = 0.3927, Time = 9.1s
Performance breakdown: 10 excellent, 2 good, 8 negative
Challenging targets avg R²: 0.5095 (24 targets)

  Training Neural Network Robust (Chained)...
Neural Network Robust (Chained): R² = -3669896129105.1060, RMSE = 0.2272, Time = 187.4s
Performance breakdown: 12 excellent, 2 good, 8 negative
Challenging targets avg R²: -3669896129105.1060 (24 targets)

  Training Gradient Boosting Robust (Chained)...
Gradient Boosting Robust (Chained): R² = 0.5068, RMSE = 0.3959, Time = 200.9s
Performance breakdown: 10 excellent, 2 good, 9 negative
Challenging targets avg R²: 0.5068 (24 targets)


In [31]:
model_scores = {}
for model_key, performance in model_performance.items():
    # Composite score considering multiple factors
    base_r2 = performance["r2"]
    excellent_ratio = performance["excellent_r2_count"] / len(target_names)
    good_ratio = performance["good_r2_count"] / len(target_names)
    negative_penalty = performance["negative_r2_count"] / len(target_names)

    # Weighted composite score
    composite_score = (
        base_r2 * 0.4  # Overall R²
        + excellent_ratio * 0.3  # Excellent targets bonus
        + good_ratio * 0.2  # Good targets bonus
        + (1 - negative_penalty) * 0.1  # Negative R² penalty
    )

    model_scores[model_key] = composite_score

    print(f"\n{performance['name']}:")
    print(f"Overall R²: {base_r2:.4f}")
    print(
        f"Excellent targets: {performance['excellent_r2_count']}/{len(target_names)} ({excellent_ratio:.1%})"
    )
    print(
        f"Good targets: {performance['good_r2_count']}/{len(target_names)} ({good_ratio:.1%})"
    )
    print(
        f"Negative R²: {performance['negative_r2_count']}/{len(target_names)} ({negative_penalty:.1%})"
    )
    print(f"Composite score: {composite_score:.4f}")


XGBoost Robust (Chained):
Overall R²: 0.5102
Excellent targets: 10/24 (41.7%)
Good targets: 2/24 (8.3%)
Negative R²: 9/24 (37.5%)
Composite score: 0.4083

LightGBM Robust (Chained):
Overall R²: 0.5110
Excellent targets: 10/24 (41.7%)
Good targets: 2/24 (8.3%)
Negative R²: 9/24 (37.5%)
Composite score: 0.4086

Random Forest Robust (Chained):
Overall R²: 0.5095
Excellent targets: 10/24 (41.7%)
Good targets: 2/24 (8.3%)
Negative R²: 8/24 (33.3%)
Composite score: 0.4121

Neural Network Robust (Chained):
Overall R²: -3669896129105.1060
Excellent targets: 12/24 (50.0%)
Good targets: 2/24 (8.3%)
Negative R²: 8/24 (33.3%)
Composite score: -1467958451641.8093

Gradient Boosting Robust (Chained):
Overall R²: 0.5068
Excellent targets: 10/24 (41.7%)
Good targets: 2/24 (8.3%)
Negative R²: 9/24 (37.5%)
Composite score: 0.4069


In [32]:
best_model_name = max(model_scores.keys(), key=lambda x: model_scores[x])
best_composite_score = model_scores[best_model_name]

best_model = models[best_model_name]

In [33]:
print(f"\nBest model: {model_performance[best_model_name]['name']}")
print(f"R² = {model_performance[best_model_name]['r2']:.4f}")
print(f"RMSE = {model_performance[best_model_name]['rmse']:.4f}")
print(
    f"Negative R² count = {model_performance[best_model_name]['negative_r2_count']}"
)


Best model: Random Forest Robust (Chained)
R² = 0.5095
RMSE = 0.3927
Negative R² count = 8


In [34]:
# BUILD REVERSE MODEL

In [35]:
# Prepare data for reverse modeling with ALL targets
# Use only numerical circuit parameters for reverse prediction
numerical_params = [
    col
    for col in feature_names
    if col not in ["Stage 1 Region", "Hard Constraint of neg2 on 0.1G Status"]
]

In [36]:
X_reverse = df[numerical_params].copy()  # Circuit parameters to predict
y_reverse = y.copy()  # Use preprocessed targets as input

In [37]:
# Handle missing values
X_reverse = X_reverse.fillna(X_reverse.median())
y_reverse = y_reverse.fillna(y_reverse.median())

In [38]:
# Remove extreme outliers that could destabilize reverse model
for col in numerical_params:
    Q1 = X_reverse[col].quantile(0.01)
    Q99 = X_reverse[col].quantile(0.99)
    X_reverse[col] = X_reverse[col].clip(lower=Q1, upper=Q99)

In [39]:
# Train-test split for reverse model
X_rev_train, X_rev_test, y_rev_train, y_rev_test = train_test_split(
    X_reverse, y_reverse, test_size=0.3, random_state=42
)

In [40]:
# Create reverse preprocessors
reverse_X_scaler = RobustScaler()  # For circuit parameters
reverse_y_scaler = RobustScaler()  # For performance targets

In [41]:
# Preprocess
X_rev_train_scaled = reverse_X_scaler.fit_transform(X_rev_train)
X_rev_test_scaled = reverse_X_scaler.transform(X_rev_test)
y_rev_train_scaled = reverse_y_scaler.fit_transform(y_rev_train)
y_rev_test_scaled = reverse_y_scaler.transform(y_rev_test)

In [42]:
try:
    # For RegressorChain, access the base_estimator attribute
    best_forward_estimator = models[best_model_name].base_estimator
except AttributeError:
    # Fallback for other model types
    try:
        best_forward_estimator = models[best_model_name].estimators_[0]
    except:
        best_forward_estimator = None

In [43]:
# Create reverse model using the same architecture as the best forward model
if best_forward_estimator is not None and best_model_name == "xgboost_robust":
    # Use XGBoost with similar parameters but optimized for reverse modeling
    reverse_base_model = XGBRegressor(
        n_estimators=best_forward_estimator.n_estimators,
        max_depth=min(
            best_forward_estimator.max_depth + 2, 12
        ),  # Slightly deeper for inverse complexity
        learning_rate=max(
            best_forward_estimator.learning_rate * 0.8, 0.01
        ),  # Slightly lower learning rate
        subsample=best_forward_estimator.subsample,
        colsample_bytree=best_forward_estimator.colsample_bytree,
        reg_alpha=best_forward_estimator.reg_alpha
        * 1.5,  # Higher regularization for stability
        reg_lambda=best_forward_estimator.reg_lambda * 1.5,
        gamma=best_forward_estimator.gamma,
        min_child_weight=best_forward_estimator.min_child_weight,
        random_state=42,
        n_jobs=-1,
    )
elif best_forward_estimator is not None and best_model_name == "lightgbm_robust":
    # Use LightGBM with similar parameters
    reverse_base_model = LGBMRegressor(
        n_estimators=best_forward_estimator.n_estimators,
        max_depth=min(best_forward_estimator.max_depth + 2, 12),
        learning_rate=max(best_forward_estimator.learning_rate * 0.8, 0.01),
        subsample=best_forward_estimator.subsample,
        colsample_bytree=best_forward_estimator.colsample_bytree,
        reg_alpha=best_forward_estimator.reg_alpha * 1.5,
        reg_lambda=best_forward_estimator.reg_lambda * 1.5,
        min_child_samples=best_forward_estimator.min_child_samples,
        min_split_gain=best_forward_estimator.min_split_gain,
        random_state=42,
        n_jobs=-1,
        verbose=-1,
    )
elif best_forward_estimator is not None and best_model_name == "random_forest_robust":
    # Use Random Forest with similar parameters
    reverse_base_model = RandomForestRegressor(
        n_estimators=best_forward_estimator.n_estimators,
        max_depth=(
            min(best_forward_estimator.max_depth + 2, 15)
            if best_forward_estimator.max_depth
            else None
        ),
        min_samples_split=best_forward_estimator.min_samples_split,
        min_samples_leaf=best_forward_estimator.min_samples_leaf,
        max_features=best_forward_estimator.max_features,
        random_state=42,
        n_jobs=-1,
    )
elif best_forward_estimator is not None and best_model_name == "mlp_robust":
    # Use MLP with similar parameters but adapted for reverse modeling
    reverse_base_model = MLPRegressor(
        hidden_layer_sizes=best_forward_estimator.hidden_layer_sizes,
        activation=best_forward_estimator.activation,
        solver=best_forward_estimator.solver,
        alpha=best_forward_estimator.alpha * 1.5,  # Higher regularization
        learning_rate_init=best_forward_estimator.learning_rate_init * 0.8,
        max_iter=best_forward_estimator.max_iter,
        early_stopping=best_forward_estimator.early_stopping,
        validation_fraction=best_forward_estimator.validation_fraction,
        n_iter_no_change=best_forward_estimator.n_iter_no_change,
        random_state=42,
    )
elif (
    best_forward_estimator is not None and best_model_name == "gradient_boosting_robust"
):
    # Use Gradient Boosting with similar parameters
    reverse_base_model = GradientBoostingRegressor(
        n_estimators=best_forward_estimator.n_estimators,
        max_depth=min(best_forward_estimator.max_depth + 2, 10),
        learning_rate=max(best_forward_estimator.learning_rate * 0.8, 0.01),
        subsample=best_forward_estimator.subsample,
        min_samples_split=best_forward_estimator.min_samples_split,
        min_samples_leaf=best_forward_estimator.min_samples_leaf,
        random_state=42,
    )
else:
    # Fallback to XGBoost if unknown model type or couldn't extract estimator
    print(f"     ⚠️  Using XGBoost fallback for reverse model")
    reverse_base_model = XGBRegressor(
        n_estimators=300,
        max_depth=8,
        learning_rate=0.05,
        subsample=0.8,
        colsample_bytree=0.8,
        reg_alpha=0.5,
        reg_lambda=2.0,
        gamma=0.1,
        min_child_weight=3,
        random_state=42,
        n_jobs=-1,
    )

In [44]:
# Wrap in MultiOutputRegressor for multiple target prediction
reverse_model = MultiOutputRegressor(reverse_base_model)

In [45]:
# Train: y_reverse (performance) -> X_reverse (parameters)
reverse_model.fit(y_rev_train_scaled, X_rev_train_scaled)

In [46]:
# Evaluate reverse model
X_pred_scaled = reverse_model.predict(y_rev_test_scaled)
X_pred = reverse_X_scaler.inverse_transform(X_pred_scaled)

In [47]:
reverse_r2 = r2_score(X_rev_test, X_pred)
reverse_mae = mean_absolute_error(X_rev_test, X_pred)
reverse_rmse = np.sqrt(mean_squared_error(X_rev_test, X_pred))

In [48]:
print(
    f"\n Reverse model trained using {model_performance[best_model_name]['name']} architecture!"
)
print(f"R² = {reverse_r2:.4f}")
print(f"MAE = {reverse_mae:.4f}")
print(f"RMSE = {reverse_rmse:.4f}")
print(
    f"Architecture consistency: Forward ↔ Reverse both use {model_performance[best_model_name]['name']}"
)


 Reverse model trained using Random Forest Robust (Chained) architecture!
R² = 0.8001
MAE = 20.5102
RMSE = 70.0006
Architecture consistency: Forward ↔ Reverse both use Random Forest Robust (Chained)


In [49]:
model_performance["reverse"] = {
    "r2": reverse_r2,
    "mae": reverse_mae,
    "rmse": reverse_rmse,
    "param_names": numerical_params,
    "target_names": target_names,
    "architecture": model_performance[best_model_name]["name"],
    "base_model_type": best_model_name,
    "architecture_consistency": f"Uses same architecture as best forward model ({model_performance[best_model_name]['name']})",
}

In [50]:
# INVERSE DESIGN OPTIMIZATION

In [51]:
FEATURES_TO_OPTIMIZE = ["fW", "current", "ind", "Rd", "Cs", "Rs"]

In [52]:
features_to_optimize = [
    feat for feat in FEATURES_TO_OPTIMIZE if feat in numerical_params
]

In [53]:
# Setup parameter bounds based on data ranges
parameter_bounds = {}
for feat in features_to_optimize:
    min_val = df[feat].min()
    max_val = df[feat].max()
    # Add some margin to bounds
    margin = (max_val - min_val) * 0.1
    parameter_bounds[feat] = (max(0, min_val - margin), max_val + margin)
    print(
        f"  {feat}: [{parameter_bounds[feat][0]:.2e}, {parameter_bounds[feat][1]:.2e}]"
    )

  fW: [1.00e-07, 1.09e-05]
  current: [3.00e-04, 2.70e-03]
  ind: [0.00e+00, 3.30e-09]
  Rd: [0.00e+00, 1.65e+03]
  Cs: [0.00e+00, 1.10e-12]
  Rs: [0.00e+00, 1.65e+03]


In [54]:
# Define realistic attenuation target ranges (based on finalBaseline)
attenuation_ranges = {
    "stage 1 3.5G attenuation": (-30, -6.577),
    "stage 2 3.5G attenuation": (-30, -6.577),
    "stage 1 7G attenuation": (-35, -9.067),
    "stage 2 7G attenuation": (-35, -9.067),
    "stage 1 14G attenuation": (-40, -13.14),
    "stage 2 14G attenuation": (-40, -13.14),
    "stage 1 28G attenuation": (-18.86, 0),
    "stage 2 28G attenuation": (-18.86, 0),
}

In [55]:
# Filter to only include targets that exist in our model
available_attenuation_targets = [
    target for target in attenuation_ranges.keys() if target in target_names
]

In [56]:
# Identify eye metric targets for multi-objective optimization
available_eye_targets = [
    target
    for target in target_names
    if ("eye_maxHeight" in target or "eye_maxWidth" in target)
]

In [57]:
# Create constant features template (median values for non-optimized features)
constant_features_template = {}
for feat in feature_names:
    if feat in features_to_optimize:
        constant_features_template[feat] = df[
            feat
        ].median()  # Will be overridden during optimization
    elif feat in ["Stage 1 Region", "Hard Constraint of neg2 on 0.1G Status"]:
        # Set categorical features to most common values
        constant_features_template[feat] = (
            df[feat].mode()[0] if not df[feat].empty else 0
        )
    else:
        constant_features_template[feat] = df[feat].median()

In [58]:
# Define multi-objective optimization function
def objective_function(
    x,
    target_attenuations,
    target_indices,
    eye_indices,
    attenuation_weight=0.7,
    eye_weight=0.3,
):
    """
    Multi-objective optimization function for inverse design

    Objectives:
    1. Minimize error for specified attenuation targets (minimize)
    2. Maximize eye_maxHeight and eye_maxWidth metrics (maximize)

    Args:
        x: Array of parameter values to optimize
        target_attenuations: Desired attenuation values
        target_indices: Indices of attenuation targets in the full target array
        eye_indices: Indices of eye metrics in the full target array
        attenuation_weight: Weight for attenuation error minimization (0-1)
        eye_weight: Weight for eye metrics maximization (0-1)

    Returns:
        Composite score (lower is better)
    """
    try:
        # Create input DataFrame with optimized parameters
        input_params = constant_features_template.copy()
        for i, feat in enumerate(features_to_optimize):
            input_params[feat] = x[i]

        # Convert to DataFrame and ensure correct order
        input_df = pd.DataFrame([input_params])
        input_df = input_df[feature_names]

        # Preprocess
        input_processed = preprocessor.transform(input_df)

        # Predict all targets using the best model
        all_predictions = best_model.predict(input_processed).flatten()

        # Objective 1: Minimize attenuation error (MSE)
        attenuation_predictions = all_predictions[target_indices]
        attenuation_mse = np.mean((attenuation_predictions - target_attenuations) ** 2)

        # Normalize attenuation error (typical range: 0-100)
        normalized_attenuation_error = attenuation_mse / 100.0

        # Objective 2: Maximize eye metrics (convert to minimization problem)
        if len(eye_indices) > 0:
            eye_predictions = all_predictions[eye_indices]
            # For maximization, we minimize the negative sum
            # Normalize eye metrics (typical range: 0-1 for eye metrics)
            normalized_eye_sum = np.sum(np.maximum(eye_predictions, 0)) / len(
                eye_indices
            )
            # Convert to minimization: higher eye values = lower cost
            eye_cost = 1.0 - np.tanh(normalized_eye_sum)  # tanh to bound between 0-1
        else:
            eye_cost = 0.0

        # Composite objective (weighted sum)
        composite_score = (
            attenuation_weight * normalized_attenuation_error + eye_weight * eye_cost
        )

        return composite_score

    except Exception as e:
        # Return large penalty for invalid parameters
        return 1e6

In [59]:
num_optimization_tests = 50  # Number of random optimization tests
optimization_results = []

In [60]:
for test_idx in range(num_optimization_tests):
    # Use different random seed for each test to ensure diversity
    np.random.seed(42 + test_idx * 7)  # Different seed for each test

    # Generate random target attenuations within realistic ranges
    desired_attenuations = {}
    target_attenuations_array = []
    target_indices = []
    eye_indices = []

    for target in available_attenuation_targets:
        if target in attenuation_ranges:
            min_atten, max_atten = attenuation_ranges[target]
            desired_value = np.random.uniform(min_atten, max_atten)
            desired_attenuations[target] = desired_value
            target_attenuations_array.append(desired_value)
            target_indices.append(target_names.index(target))

    # Get eye metric indices for multi-objective optimization
    for target in available_eye_targets:
        eye_indices.append(target_names.index(target))

    target_attenuations_array = np.array(target_attenuations_array)

    # DIVERSE INITIAL GUESSES - Use random starting points instead of median
    initial_guess = []
    for feat in features_to_optimize:
        min_bound, max_bound = parameter_bounds[feat]
        # Generate random initial guess within bounds
        random_start = np.random.uniform(min_bound, max_bound)
        initial_guess.append(random_start)

    # Setup bounds for scipy optimization
    scipy_bounds = [parameter_bounds[feat] for feat in features_to_optimize]

    # Try multiple optimization methods for diversity
    optimization_methods = ["L-BFGS-B", "TNC", "SLSQP"]
    method_choice = optimization_methods[test_idx % len(optimization_methods)]

    try:
        # Run multi-objective optimization with diverse methods
        optimization_result = minimize(
            fun=objective_function,
            x0=initial_guess,
            args=(target_attenuations_array, target_indices, eye_indices),
            method=method_choice,
            bounds=scipy_bounds,
            options={"maxiter": 300, "ftol": 1e-8},
        )

        if optimization_result.success:
            # Get optimized parameters
            optimized_params = {}
            for i, feat in enumerate(features_to_optimize):
                optimized_params[feat] = optimization_result.x[i]

            # ADD CONTROLLED DIVERSITY: Add small random perturbations to avoid identical solutions
            # This ensures we get diverse parameter combinations for testing
            diversity_factor = 0.05  # 5% variation
            for feat in features_to_optimize:
                min_bound, max_bound = parameter_bounds[feat]
                param_range = max_bound - min_bound
                # Add random perturbation within 5% of parameter range
                perturbation = (
                    np.random.uniform(-diversity_factor, diversity_factor) * param_range
                )
                perturbed_value = optimized_params[feat] + perturbation
                # Ensure still within bounds
                optimized_params[feat] = np.clip(perturbed_value, min_bound, max_bound)

            # Validate the result by forward prediction
            input_params = constant_features_template.copy()
            input_params.update(optimized_params)

            input_df = pd.DataFrame([input_params])
            input_df = input_df[feature_names]
            input_processed = preprocessor.transform(input_df)
            final_predictions = best_model.predict(input_processed).flatten()

            # Extract attenuation predictions
            final_attenuation_predictions = final_predictions[target_indices]

            # Calculate errors for attenuation targets
            individual_errors = np.abs(
                final_attenuation_predictions - target_attenuations_array
            )
            mse_error = np.mean(
                (final_attenuation_predictions - target_attenuations_array) ** 2
            )
            mae_error = np.mean(individual_errors)
            max_error = np.max(individual_errors)

            # Calculate eye metrics performance
            eye_performance = {}
            if len(eye_indices) > 0:
                eye_predictions = final_predictions[eye_indices]
                eye_performance = {
                    "eye_metrics_sum": np.sum(eye_predictions),
                    "eye_metrics_avg": np.mean(eye_predictions),
                    "eye_metrics_min": np.min(eye_predictions),
                    "eye_metrics_max": np.max(eye_predictions),
                }

            # Store detailed results
            result_entry = {
                "test_id": test_idx + 1,
                "success": True,
                "mse_error": mse_error,
                "mae_error": mae_error,
                "max_error": max_error,
                "optimization_iterations": optimization_result.nit,
                "optimization_function_calls": optimization_result.nfev,
            }

            # Add eye metrics performance
            result_entry.update(eye_performance)

            # Add desired targets
            for target, value in desired_attenuations.items():
                result_entry[f"desired_{target}"] = value

            # Add predicted targets
            for i, target in enumerate(available_attenuation_targets):
                result_entry[f"predicted_{target}"] = final_attenuation_predictions[i]

            # Add optimized parameters
            for feat, value in optimized_params.items():
                result_entry[feat] = value

            # Add constant parameters
            for feat, value in constant_features_template.items():
                if feat not in optimized_params:
                    result_entry[f"constant_{feat}"] = value

            optimization_results.append(result_entry)

        else:
            result_entry = {
                "test_id": test_idx + 1,
                "success": False,
                "failure_reason": optimization_result.message,
                "mse_error": np.nan,
                "mae_error": np.nan,
                "max_error": np.nan,
            }

            # Add desired targets
            for target, value in desired_attenuations.items():
                result_entry[f"desired_{target}"] = value

            optimization_results.append(result_entry)

    except Exception as e:
        result_entry = {
            "test_id": test_idx + 1,
            "success": False,
            "failure_reason": str(e),
            "mse_error": np.nan,
            "mae_error": np.nan,
            "max_error": np.nan,
        }

        # Add desired targets
        for target, value in desired_attenuations.items():
            result_entry[f"desired_{target}"] = value

        optimization_results.append(result_entry)

  optimization_result = minimize(
  optimization_result = minimize(
  optimization_result = minimize(
  optimization_result = minimize(
  optimization_result = minimize(
  optimization_result = minimize(
  optimization_result = minimize(
  optimization_result = minimize(
  optimization_result = minimize(
  optimization_result = minimize(
  optimization_result = minimize(
  optimization_result = minimize(
  optimization_result = minimize(
  optimization_result = minimize(
  optimization_result = minimize(
  optimization_result = minimize(
  optimization_result = minimize(


In [61]:
successful_optimizations = [r for r in optimization_results if r["success"]]
success_rate = len(successful_optimizations) / len(optimization_results) * 100

In [62]:
print(f"\nInverse Design Optimization Summary:")
print(f"Total tests: {len(optimization_results)}")
print(f"Successful optimizations: {len(successful_optimizations)}")
print(f"Success rate: {success_rate:.1f}%")


Inverse Design Optimization Summary:
Total tests: 50
Successful optimizations: 34
Success rate: 68.0%


In [63]:
if successful_optimizations:
    mse_errors = [r["mse_error"] for r in successful_optimizations]
    mae_errors = [r["mae_error"] for r in successful_optimizations]
    max_errors = [r["max_error"] for r in successful_optimizations]

    print(f"Average MSE: {np.mean(mse_errors):.6f}")
    print(f"Average MAE: {np.mean(mae_errors):.3f} dB")
    print(f"Average Max Error: {np.mean(max_errors):.3f} dB")
    print(f"Best MSE: {np.min(mse_errors):.6f}")
    print(f"Best MAE: {np.min(mae_errors):.3f} dB")

Average MSE: 475.169794
Average MAE: 19.748 dB
Average Max Error: 32.384 dB
Best MSE: 194.358825
Best MAE: 13.088 dB


In [64]:
num_random_tests = 30  # Additional random combinations
random_test_results = []

In [65]:
for test_idx in range(num_random_tests):
    # Use different random seed for each random test
    np.random.seed(1000 + test_idx * 13)

    # Generate completely random parameters within bounds
    random_params = {}
    for feat in features_to_optimize:
        min_bound, max_bound = parameter_bounds[feat]
        random_params[feat] = np.random.uniform(min_bound, max_bound)

    # Create full parameter set
    full_params = constant_features_template.copy()
    full_params.update(random_params)

    # Forward prediction to get performance
    input_df = pd.DataFrame([full_params])
    input_df = input_df[feature_names]
    input_processed = preprocessor.transform(input_df)
    predicted_performance = best_model.predict(input_processed).flatten()

    # Store random test result
    random_result = {
        "test_id": f"random_{test_idx + 1}",
        "test_type": "random_generation",
        "success": True,
    }

    # Add random parameters
    for feat, value in random_params.items():
        random_result[feat] = value

    # Add constant parameters
    for feat, value in constant_features_template.items():
        if feat not in random_params:
            random_result[f"constant_{feat}"] = value

    # Add predicted performance for all targets
    for i, target in enumerate(target_names):
        random_result[f"predicted_{target}"] = predicted_performance[i]

    # Add some synthetic "desired" values for consistency
    for target in available_attenuation_targets:
        if target in attenuation_ranges:
            min_atten, max_atten = attenuation_ranges[target]
            random_result[f"desired_{target}"] = np.random.uniform(min_atten, max_atten)

    random_test_results.append(random_result)

In [66]:
# Combine optimization results with random results
all_test_results = optimization_results + random_test_results

In [68]:
# Save combined results
optimization_results_df = pd.DataFrame(all_test_results)
optimization_results_file = f"{save_directory}/inverse_optimization_results.csv"
optimization_results_df.to_csv(optimization_results_file, index=False)
print(f"Combined results saved to: {optimization_results_file}")

Combined results saved to: ctle_circuit_generation_with_DNN_models_V2/inverse_optimization_results.csv


In [69]:
# Create a separate file with ONLY the diverse parameter combinations for easy testing
diverse_params_data = []
for result in all_test_results:
    if result.get("success", False):
        param_entry = {"test_id": result.get("test_id", "unknown")}

        # Add the optimized/random parameters
        for feat in features_to_optimize:
            if feat in result:
                param_entry[feat] = result[feat]

        # Add some constant parameters for completeness
        for feat in ["Stage 1 Region", "Hard Constraint of neg2 on 0.1G Status"]:
            if f"constant_{feat}" in result:
                param_entry[feat] = result[f"constant_{feat}"]
            elif feat in constant_features_template:
                param_entry[feat] = constant_features_template[feat]

        diverse_params_data.append(param_entry)

diverse_params_df = pd.DataFrame(diverse_params_data)
diverse_params_file = f"{save_directory}/diverse_parameter_combinations.csv"
diverse_params_df.to_csv(diverse_params_file, index=False)
print(f"Diverse parameters only: {diverse_params_file}")

Diverse parameters only: ctle_circuit_generation_with_DNN_models_V2/diverse_parameter_combinations.csv


In [154]:
# SAVE COMPLETE SYSTEM

In [70]:
os.makedirs(save_directory, exist_ok=True)

In [71]:
# Save models
for name, model in models.items():
    joblib.dump(model, f"{save_directory}/forward_model_{name}.joblib")

# Save preprocessor
joblib.dump(preprocessor, f"{save_directory}/preprocessor.joblib")

# Save reverse model components
joblib.dump(reverse_model, f"{save_directory}/reverse_model.joblib")
joblib.dump(reverse_X_scaler, f"{save_directory}/reverse_x_scaler.joblib")
joblib.dump(reverse_y_scaler, f"{save_directory}/reverse_y_scaler.joblib")

# Save comprehensive metadata
metadata = {
    "dataset_shape": df.shape,
    "target_categories": {
        "good_targets": good_targets,
        "sparse_targets": sparse_targets,
        "constant_targets": constant_targets,
        "problematic_targets": problematic_targets,
    },
    "target_preprocessing_info": target_preprocessing_info,
    "model_performance": model_performance,
    "model_scores": model_scores,
    "best_model_name": best_model_name,
    "best_composite_score": best_composite_score,
    "feature_names": feature_names,
    "target_names": target_names,
    "numerical_params": numerical_params,
    "total_targets_used": len(target_names),
    "optimization_results": {
        "optimization_tests": len(optimization_results),
        "random_tests": (
            len(random_test_results) if "random_test_results" in locals() else 0
        ),
        "total_tests": (
            len(all_test_results)
            if "all_test_results" in locals()
            else len(optimization_results)
        ),
        "successful_tests": len(successful_optimizations),
        "success_rate": success_rate,
        "features_optimized": features_to_optimize,
        "parameter_bounds": parameter_bounds,
        "attenuation_targets": available_attenuation_targets,
        "average_mae": np.mean(mae_errors) if successful_optimizations else None,
        "best_mae": np.min(mae_errors) if successful_optimizations else None,
        "diversity_enhancements": [
            "Random initial guesses for each optimization",
            "Multiple optimization methods (L-BFGS-B, TNC, SLSQP)",
            "Controlled parameter perturbations (±5%)",
            "Additional random parameter generation",
            "Different random seeds for each test",
        ],
    },
    "timestamp": datetime.now().isoformat(),
    "system_version": "2.1_with_inverse_optimization",
}

with open(f"{save_directory}/metadata.json", "w") as f:
    json.dump(metadata, f, indent=2, default=str)