In [3]:
import warnings
# Suppress every warning for the rest of the kernel session.
# NOTE(review): a blanket "ignore" also hides diagnostics raised by the
# libraries used further down — confirm this is intended.
warnings.filterwarnings("ignore")

In [4]:
import pandas as pd
import numpy as np
import gc
import time
import warnings
from pathlib import Path
from tqdm.auto import tqdm
from datetime import datetime

from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import (
    accuracy_score, precision_score, recall_score, f1_score, roc_auc_score,
    matthews_corrcoef, brier_score_loss
)
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier
import xgboost as xgb
from tensorflow.keras import models, layers, optimizers
from tensorflow.keras.callbacks import EarlyStopping

import shap
import optuna

# Global configuration
# Run-level switches are grouped first so they are easy to find and tune.
SEED = 42
N_TRIALS = 20
TUNE_MODE = True        # Enable Optuna tuning

# Paths: the working directory is the anchor; results go to a sibling
# "results" folder of that directory (created on first run if absent).
base_dir = Path().resolve()
results_dir = base_dir.parent.joinpath("results")
results_dir.mkdir(exist_ok=True)

print(f"TUNE_MODE: {'ON' if TUNE_MODE else 'OFF'} | Optuna Trials: {N_TRIALS}")

TUNE_MODE: ON | Optuna Trials: 20


In [5]:
# Number of features to keep for each modality
tr_n = 250  # transcriptomic: how many genes from the HVG list are kept
fl_n = 200  # fluxomic: how many of the highest-variance reactions are kept

In [6]:
# Evaluation Function for each model
def evaluate_model(model, X_test, y_test, model_name):
    """Score a fitted binary classifier on a held-out set and print a summary.

    Positive-class probabilities come from ``predict_proba(X_test)[:, 1]`` when
    the model exposes ``predict_proba``; otherwise the raw output of
    ``predict(X_test)`` is treated as the positive-class probability. Hard
    labels are obtained by thresholding the probabilities at 0.5.

    Parameters
    ----------
    model : fitted estimator exposing ``predict_proba`` or ``predict``.
    X_test : feature matrix passed unchanged to the model.
    y_test : true binary labels (0/1).
    model_name : str, used only in the printed summary line.

    Returns
    -------
    tuple
        ``(acc, prec, rec, f1, auc, mcc, brier)``.
    """
    if hasattr(model, "predict_proba"):
        raw_prob = model.predict_proba(X_test)[:, 1]
    else:
        raw_prob = model.predict(X_test)

    # Models that return a column vector of shape (n, 1) are flattened so every
    # metric receives plain 1-D arrays.
    y_prob = np.asarray(raw_prob).ravel()
    y_pred = (y_prob > 0.5).astype(int)

    acc = accuracy_score(y_test, y_pred)
    # zero_division=0 makes the "no positive predicted / no positive present"
    # fallback explicit instead of relying on a (globally silenced) warning.
    prec = precision_score(y_test, y_pred, zero_division=0)
    rec = recall_score(y_test, y_pred, zero_division=0)
    f1 = f1_score(y_test, y_pred, zero_division=0)
    auc = roc_auc_score(y_test, y_prob)
    mcc = matthews_corrcoef(y_test, y_pred)
    brier = brier_score_loss(y_test, y_prob)

    print(f"✅ {model_name}: acc={acc:.3f}, f1={f1:.3f}, auc={auc:.3f}, mcc={mcc:.3f}, brier={brier:.3f}")
    return acc, prec, rec, f1, auc, mcc, brier


In [7]:
# Load Transcriptomics and Fluxomics data
def load_transcriptomic(base_dir, meta):
    """Load the transcriptomic matrix and its binary response labels.

    Reads ``dataset/csv/transcriptomic_hvg.csv`` (relative to the parent of
    ``base_dir``), inner-joins it on the index with ``meta["response"]`` so only
    rows present in both tables remain, and drops every column whose name
    starts with "MT-" (case-insensitive).

    Parameters
    ----------
    base_dir : pathlib.Path
        Directory whose parent contains the ``dataset/csv`` folder.
    meta : pandas.DataFrame
        Table with a ``response`` column holding "Responder"/"Non-responder",
        indexed with the same keys as the expression table.

    Returns
    -------
    (X, y)
        ``X`` is a float32 DataFrame of features, ``y`` an int8 Series named
        ``response`` with 1 = Responder and 0 = Non-responder.

    Raises
    ------
    ValueError
        From the int8 cast if a label is neither "Responder" nor "Non-responder".
    """
    trans = pd.read_csv(base_dir.parent / "dataset/csv/transcriptomic_hvg.csv", index_col=0)
    trans = pd.merge(trans, meta[["response"]], left_index=True, right_index=True, how="inner")

    # Encode labels straight from the merged frame rather than assigning into a
    # column-filtered slice (the pattern that triggers SettingWithCopyWarning).
    y = trans["response"].map({"Responder": 1, "Non-responder": 0}).astype(np.int8)

    keep = ~trans.columns.str.upper().str.startswith("MT-")
    X = trans.loc[:, keep].drop(columns=["response"]).astype(np.float32)
    return X, y


def load_fluxomic(base_dir):
    """Load the fluxomic matrix and its binary response labels.

    Reads ``dataset/csv/fluxomics.csv`` (relative to the parent of
    ``base_dir``). Feature columns are all columns except ``response`` and
    those whose name starts with "EX_".

    Parameters
    ----------
    base_dir : pathlib.Path
        Directory whose parent contains the ``dataset/csv`` folder.

    Returns
    -------
    (X, y)
        ``X`` is a float32 DataFrame of the retained reaction columns (original
        column order), ``y`` an int8 Series named ``response`` with
        1 = Responder and 0 = Non-responder.

    Raises
    ------
    KeyError
        If the CSV has no ``response`` column.
    ValueError
        From the int8 cast if a label is neither "Responder" nor "Non-responder".
    """
    flux = pd.read_csv(base_dir.parent / "dataset/csv/fluxomics.csv", index_col=0)
    rxns = [c for c in flux.columns if c != "response" and not c.startswith("EX_")]

    # Build X and y as separate selections; assigning into a column-subset of
    # `flux` is the chained-assignment pattern pandas warns about.
    y = flux["response"].map({"Responder": 1, "Non-responder": 0}).astype(np.int8)
    X = flux[rxns].astype(np.float32)
    return X, y

In [8]:
# Load metadata, both modalities, and pick the per-modality feature subsets
meta = pd.read_csv(base_dir.parent / "dataset/csv/metadata.csv", index_col=0)
hvg_genes = (
    pd.read_csv(base_dir.parent / "dataset/csv/hvg_genes.csv", header=None)
    .squeeze()
    .tolist()
)

X_trans, y_trans = load_transcriptomic(base_dir, meta)
X_flux, y_flux = load_fluxomic(base_dir)

# Transcriptomic: first `tr_n` genes of the HVG list that exist in the matrix.
# Fluxomic: the `fl_n` reactions with the largest variance.
tr_top = [gene for gene in hvg_genes if gene in X_trans.columns][:tr_n]
flux_variance = X_flux.var().sort_values(ascending=False)
fl_top = flux_variance.index[:fl_n]

X_tr_250 = X_trans.loc[:, tr_top]
X_fl_200 = X_flux.loc[:, fl_top]

# Multimodal table: rows shared by both modalities, labels taken from metadata
X_multi_full = X_trans.merge(X_flux, left_index=True, right_index=True, how="inner")
y_multi = (
    meta.loc[X_multi_full.index, "response"]
    .map({"Responder": 1, "Non-responder": 0})
    .astype(np.int8)
)
X_multi = X_multi_full[list(tr_top) + list(fl_top)]
print("Data loaded and selected.")


Data loaded and selected.


In [9]:
# Optuna Objective Functions with class balance
def class_balance(y):
    """Return the negative-to-positive count ratio of binary labels ``y``.

    ``y`` must support element-wise comparison (NumPy array or pandas Series).
    When there are no positive labels the ratio is undefined and 1.0 is
    returned instead.
    """
    n_neg = (y == 0).sum()
    n_pos = (y == 1).sum()
    if n_pos > 0:
        return n_neg / n_pos
    return 1.0


def tune_logreg(trial, X_train, X_val, y_train, y_val):
    """Optuna objective for logistic regression.

    Samples the inverse regularisation strength ``C`` on a log scale in
    [1e-3, 100], fits a class-weighted L-BFGS model on the training split and
    returns the ROC AUC of its positive-class probabilities on the validation
    split.
    """
    reg_c = trial.suggest_float("C", 1e-3, 100, log=True)
    pos_weight = class_balance(y_train)

    clf = LogisticRegression(
        C=reg_c,
        class_weight={0: 1, 1: pos_weight},
        max_iter=1000,
        solver="lbfgs",
    )
    clf.fit(X_train, y_train)

    val_scores = clf.predict_proba(X_val)[:, 1]
    return roc_auc_score(y_val, val_scores)


def tune_rf(trial, X_train, X_val, y_train, y_val):
    """Optuna objective for a random forest.

    Samples tree count, depth, split/leaf minimums and the ``max_features``
    rule, fits a class-weighted forest on the training split and returns the
    ROC AUC of its positive-class probabilities on the validation split.
    """
    pos_weight = class_balance(y_train)

    # Sampling order is kept fixed: n_estimators, max_depth, min_samples_split,
    # min_samples_leaf, max_features.
    search = dict(
        n_estimators=trial.suggest_int("n_estimators", 200, 800, step=200),
        max_depth=trial.suggest_int("max_depth", 6, 16),
        min_samples_split=trial.suggest_int("min_samples_split", 2, 6),
        min_samples_leaf=trial.suggest_int("min_samples_leaf", 1, 4),
        max_features=trial.suggest_categorical("max_features", ["sqrt", "log2"]),
    )

    forest = RandomForestClassifier(
        class_weight={0: 1, 1: pos_weight},
        n_jobs=-1,
        random_state=SEED,
        **search,
    )
    forest.fit(X_train, y_train)

    val_scores = forest.predict_proba(X_val)[:, 1]
    return roc_auc_score(y_val, val_scores)


def tune_xgb(trial, X_train, X_val, y_train, y_val):
    """Optuna objective for an XGBoost classifier.

    Fixed settings (binary logistic objective, AUC eval metric, histogram tree
    method, positive-class weight) are combined with sampled learning rate,
    depth, row/column subsampling and tree count. The model is fitted on the
    training split and scored by ROC AUC on the validation split.
    """
    pos_weight = class_balance(y_train)

    fixed = dict(
        objective="binary:logistic",
        eval_metric="auc",
        tree_method="hist",
        scale_pos_weight=pos_weight,
    )
    # Sampling order is kept fixed: learning_rate, max_depth, subsample,
    # colsample_bytree, n_estimators.
    sampled = dict(
        learning_rate=trial.suggest_float("learning_rate", 0.01, 0.3, log=True),
        max_depth=trial.suggest_int("max_depth", 4, 12),
        subsample=trial.suggest_float("subsample", 0.6, 1.0),
        colsample_bytree=trial.suggest_float("colsample_bytree", 0.6, 1.0),
        n_estimators=trial.suggest_int("n_estimators", 200, 1000, step=200),
    )

    booster = xgb.XGBClassifier(
        **fixed, **sampled, n_jobs=-1, random_state=SEED, verbosity=0
    )
    booster.fit(X_train, y_train)

    val_scores = booster.predict_proba(X_val)[:, 1]
    return roc_auc_score(y_val, val_scores)


def tune_ann(trial, X_train, X_val, y_train, y_val):
    """Optuna objective for a small feed-forward network.

    Samples the learning rate, dropout rate and the widths of two hidden
    layers, trains a Dense(n1) -> Dropout -> Dense(n2) -> sigmoid network with
    per-sample weights that up-weight the positive class, and returns the ROC
    AUC on the validation split. Early stopping (patience 5, best weights
    restored) monitors the same validation split.

    Parameters
    ----------
    trial : optuna trial used to sample hyper-parameters.
    X_train, y_train : training features / binary labels.
    X_val, y_val : validation features / binary labels.

    Returns
    -------
    float
        Validation ROC AUC of the trained network.
    """
    # Local import keeps this fix self-contained. Keras keeps every model built
    # in the process in global state, so without this call memory grows with
    # each trial of each study.
    from tensorflow.keras import backend
    backend.clear_session()

    w = class_balance(y_train)
    lr = trial.suggest_float("lr", 1e-4, 1e-2, log=True)
    dropout = trial.suggest_float("dropout", 0.2, 0.6)
    n1 = trial.suggest_int("n1", 32, 128, step=32)
    n2 = trial.suggest_int("n2", 16, 64, step=16)

    model = models.Sequential([
        layers.Input(shape=(X_train.shape[1],)),
        layers.Dense(n1, activation="relu"),
        layers.Dropout(dropout),
        layers.Dense(n2, activation="relu"),
        layers.Dense(1, activation="sigmoid")
    ])
    model.compile(optimizer=optimizers.Adam(learning_rate=lr),
                  loss="binary_crossentropy", metrics=["accuracy"])

    early = EarlyStopping(patience=5, restore_best_weights=True)
    # Positive samples get weight w (neg/pos ratio), negatives weight 1.
    sample_weights = np.where(np.asarray(y_train) == 1, w, 1)
    model.fit(X_train, y_train, sample_weight=sample_weights,
              validation_data=(X_val, y_val),
              epochs=30, batch_size=32, verbose=0, callbacks=[early])

    # verbose=0 avoids one progress bar per trial; ravel() turns the (n, 1)
    # sigmoid output into a 1-D score vector.
    preds = model.predict(X_val, verbose=0).ravel()
    return roc_auc_score(y_val, preds)

In [10]:
# Build Models (Logistic Regression, Random Forest, XGBoost and ANN)
def build_models(X_train, y_train):
    """
    Build all models, using class weights (balanced) for both tuned and untuned cases.

    The positive class is weighted by the neg/pos ratio of ``y_train`` in every
    model. With ``TUNE_MODE`` off, fixed default hyper-parameters are used.
    With ``TUNE_MODE`` on, ``X_train`` is split 80/20 (stratified) and one
    Optuna study of ``N_TRIALS`` trials is run per model; unfitted estimators
    are then created from the best parameters found.

    Parameters
    ----------
    X_train : training feature matrix (already scaled by the caller).
    y_train : binary training labels.

    Returns
    -------
    dict
        ``{name: (estimator_or_None, params)}`` for "LogReg", "RandomForest",
        "XGBoost" and "ANN". The ANN entry carries ``None`` as estimator; the
        caller builds the network from ``params``.
    """
    w = class_balance(y_train)   # Compute class ratio
    print(f"Class balance ratio: neg/pos = {w:.2f}")

    if not TUNE_MODE:
        # -------------------------
        # Default models (no Optuna)
        # -------------------------
        models_out = {
            "LogReg": (
                LogisticRegression(
                    C=0.5, solver="lbfgs", max_iter=1000, class_weight={0: 1, 1: w}
                ),
                {"C": 0.5, "class_weight": {0: 1, 1: w}}
            ),
            "RandomForest": (
                RandomForestClassifier(
                    n_estimators=500, max_depth=10,
                    class_weight={0: 1, 1: w},
                    n_jobs=-1, random_state=SEED
                ),
                {"n_estimators": 500, "max_depth": 10, "class_weight": {0: 1, 1: w}}
            ),
            "XGBoost": (
                xgb.XGBClassifier(
                    learning_rate=0.05, max_depth=8, n_estimators=800,
                    subsample=0.9, colsample_bytree=0.8,
                    scale_pos_weight=w, n_jobs=-1, verbosity=0, random_state=SEED
                ),
                {
                    "learning_rate": 0.05,
                    "max_depth": 8,
                    "n_estimators": 800,
                    "scale_pos_weight": w
                }
            ),
            # ANN placeholder — weights applied during training
            "ANN": (None, {"lr": 0.001, "dropout": 0.5, "n1": 64, "n2": 32})
        }
        return models_out

    # -------------------------
    # Optuna tuning case
    # -------------------------
    X_t, X_v, y_t, y_v = train_test_split(X_train, y_train, test_size=0.2, stratify=y_train, random_state=SEED)
    print("Running Optuna tuning for all models...")
    models_out = {}

    tuners = {
        "LogReg": tune_logreg,
        "RandomForest": tune_rf,
        "XGBoost": tune_xgb,
        "ANN": tune_ann,
    }
    for name, tuner in tuners.items():
        # Seed the sampler so the sequence of suggested hyper-parameters is
        # repeatable across runs (the default sampler is unseeded).
        sampler = optuna.samplers.TPESampler(seed=SEED)
        study = optuna.create_study(direction="maximize", sampler=sampler)
        study.optimize(
            # Bind `tuner` at definition time instead of relying on the loop variable.
            lambda trial, tuner=tuner: tuner(trial, X_t, X_v, y_t, y_v),
            n_trials=N_TRIALS, show_progress_bar=True
        )
        print(f"✅ {name} best AUC={study.best_value:.3f} | params={study.best_params}")

        if name == "LogReg":
            model = LogisticRegression(
                **study.best_params, solver="lbfgs", max_iter=1000, class_weight={0: 1, 1: w}
            )
        elif name == "RandomForest":
            model = RandomForestClassifier(**study.best_params, random_state=SEED, n_jobs=-1, class_weight={0: 1, 1: w})
        elif name == "XGBoost":
            # best_params only holds the sampled values; re-supply the fixed
            # settings used by tune_xgb so the refit model matches what was tuned.
            model = xgb.XGBClassifier(
                **study.best_params,
                objective="binary:logistic", eval_metric="auc", tree_method="hist",
                n_jobs=-1, random_state=SEED, verbosity=0, scale_pos_weight=w
            )
        else:
            model = None  # ANN uses params separately

        models_out[name] = (model, study.best_params)

    return models_out


In [None]:
# Main Training Loop
# For every modality: stratified 80/20 train/test split, standardisation fitted
# on the training part only, model construction (optionally tuned), fitting,
# test-set evaluation, and SHAP attribution for the multimodal XGBoost model.
results = []
modalities = {
    "Transcriptomic (250)": (X_tr_250, y_trans),
    "Fluxomic (200)": (X_fl_200, y_flux),
    "Multimodal (250+200)": (X_multi, y_multi)
}

for label, (X, y) in modalities.items():
    print(f"\n==============================\n{label}\n==============================")
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, stratify=y, random_state=SEED)
    scaler = StandardScaler()
    X_train_scaled = scaler.fit_transform(X_train)
    X_test_scaled = scaler.transform(X_test)

    models_dict = build_models(X_train_scaled, y_train)

    for model_name, (model, best_params) in models_dict.items():
        print(f"\n Training {model_name} ...")

        if model_name == "ANN":
            params = best_params if best_params else {"lr": 0.001, "dropout": 0.5, "n1": 64, "n2": 32}

            # Early stopping restores the weights that score best on its
            # validation data, so that data must not be the test split.
            # A stratified slice of the training data is used instead.
            X_fit, X_val, y_fit, y_val = train_test_split(
                X_train_scaled, y_train, test_size=0.2, stratify=y_train, random_state=SEED
            )
            w = class_balance(y_fit)

            ann = models.Sequential([
                layers.Input(shape=(X_train_scaled.shape[1],)),
                layers.Dense(params["n1"], activation="relu"),
                layers.Dropout(params["dropout"]),
                layers.Dense(params["n2"], activation="relu"),
                layers.Dense(1, activation="sigmoid")
            ])
            ann.compile(optimizer=optimizers.Adam(learning_rate=params["lr"]),
                        loss="binary_crossentropy", metrics=["accuracy"])
            early_stop = EarlyStopping(patience=5, restore_best_weights=True)
            # Positive samples get weight w (neg/pos ratio), negatives weight 1.
            sample_weights = np.where(y_fit == 1, w, 1)
            ann.fit(X_fit, y_fit, validation_data=(X_val, y_val),
                    epochs=30, batch_size=32, verbose=0,
                    sample_weight=sample_weights, callbacks=[early_stop])

            # Same scoring path as the other models; evaluate_model falls back
            # to .predict() for estimators without predict_proba.
            acc, prec, rec, f1, auc, mcc, brier = evaluate_model(ann, X_test_scaled, y_test, model_name)
        else:
            model.fit(X_train_scaled, y_train)
            acc, prec, rec, f1, auc, mcc, brier = evaluate_model(model, X_test_scaled, y_test, model_name)

        result = {
            "Modality": label,
            "Model": model_name,
            "Acc": acc, "Prec": prec, "Rec": rec, "F1": f1,
            "AUC": auc, "MCC": mcc, "Brier": brier
        }

        if best_params:
            result["Best_Params"] = str(best_params)

        # SHAP for multimodal XGBoost: share of total mean |SHAP| per modality,
        # computed on at most 100 test rows.
        if label == "Multimodal (250+200)" and model_name == "XGBoost":
            print("Computing SHAP contributions...")
            sample_X = pd.DataFrame(X_test_scaled, columns=X.columns).sample(n=min(100, len(X_test)), random_state=SEED)
            explainer = shap.TreeExplainer(model)
            shap_vals = explainer.shap_values(sample_X)
            shap_df = pd.DataFrame({
                "Feature": sample_X.columns,
                "MeanAbsSHAP": np.abs(shap_vals).mean(axis=0)
            })
            shap_df["Modality"] = ["Fluxomic" if f in X_flux.columns else "Transcriptomic" for f in shap_df["Feature"]]
            contrib = shap_df.groupby("Modality")["MeanAbsSHAP"].sum()
            perc = 100 * contrib / contrib.sum()
            result["SHAP_Tr_%"] = perc.get("Transcriptomic", 0)
            result["SHAP_Fl_%"] = perc.get("Fluxomic", 0)
            print(f"SHAP → Tr: {perc.get('Transcriptomic',0):.1f}%, Fl: {perc.get('Fluxomic',0):.1f}%")

        results.append(result)
        gc.collect()

In [None]:
# Collect the per-model records into one table and persist it with a
# minute-resolution timestamp in the file name.
results_df = pd.DataFrame(results)
file = results_dir / f"model_results_optuna_extmetrics_{datetime.now().strftime('%Y%m%d_%H%M')}.csv"
results_df.to_csv(file, index=False)
print(f"\n Results saved to {file}")

# Show the table grouped by modality, best AUC first within each group.
ranked_results = results_df.sort_values(by=["Modality", "AUC"], ascending=[True, False])
display(ranked_results)