# KeML Framework for RCFST Columns under Eccentric Loading

This notebook implements a **knowledge-enhanced machine learning (KeML)** framework
for predicting the ultimate strength of **rectangular concrete-filled steel tube (RCFST)**
columns under eccentric compression.

**Before running:**

1. Create a folder called `data` in the same directory as this notebook.
2. Put your database file there, named: `rcfst_database.csv`  
3. The CSV should contain at least these columns (you can rename in the code if needed):

   - `H`, `B`, `t`, `L`, `e`, `fy`, `fc_prime`, `Nu_exp`
   - Optionally: `N_empirical` with predictions from any design equation (e.g. Han, Naser, EC4).
     If not present, the plastic resistance `Npl` will be used as the knowledge term.

The notebook will automatically create:

- `artifacts/` for saved models
- `outputs/figures/` for parity plots
- `outputs/metrics/` for CSV tables of metrics
- `outputs/predictions/` for CSV files of predictions


In [None]:
import os
from pathlib import Path
from dataclasses import dataclass
from typing import Any, Dict, List, Tuple, Optional

import numpy as np
import pandas as pd

import matplotlib.pyplot as plt

from sklearn.model_selection import train_test_split, KFold
from sklearn.preprocessing import StandardScaler
from sklearn.svm import SVR
from sklearn.feature_selection import mutual_info_regression, f_regression
from sklearn.linear_model import LinearRegression
from sklearn.metrics import (
    r2_score,
    mean_absolute_error,
    mean_squared_error
)
from sklearn.neighbors import KNeighborsRegressor
from sklearn.tree import DecisionTreeRegressor
from sklearn.ensemble import RandomForestRegressor
from sklearn.neural_network import MLPRegressor

from xgboost import XGBRegressor
import joblib

# Optional (only needed if you enable tuning later)
import optuna

# ----------------------
# Paths and basic config
# ----------------------

PROJECT_ROOT = Path.cwd()
DATA_DIR     = PROJECT_ROOT / "data"
ARTIFACTS_DIR = PROJECT_ROOT / "artifacts"
OUTPUTS_DIR   = PROJECT_ROOT / "outputs"

for p in [
    ARTIFACTS_DIR,
    ARTIFACTS_DIR / "models",
    ARTIFACTS_DIR / "optuna",
    OUTPUTS_DIR,
    OUTPUTS_DIR / "figures",
    OUTPUTS_DIR / "metrics",
    OUTPUTS_DIR / "predictions",
]:
    os.makedirs(p, exist_ok=True)

DB_CSV_PATH = DATA_DIR / "rcfst_database.csv"

RANDOM_STATE = 42
TEST_SIZE    = 0.2

print("Project root:", PROJECT_ROOT)
print("Data path:", DB_CSV_PATH)


In [None]:
# =======================================
# 1. Data utilities & secondary features
# =======================================

def steel_area_rect_hollow(H: float, B: float, t: float) -> float:
    """Approximate steel area of a rectangular hollow section:
    As = H*B - (H - 2t)*(B - 2t)
    Units must be consistent with your input.
    """
    return H * B - (H - 2.0 * t) * (B - 2.0 * t)

def concrete_area_rect(H: float, B: float, t: float) -> float:
    """Concrete core area inside the steel tube:
    Ac = (H - 2t)*(B - 2t)
    """
    return (H - 2.0 * t) * (B - 2.0 * t)

def compute_secondary_features(df: pd.DataFrame) -> pd.DataFrame:
    """Compute secondary features for RCFST columns.

    Assumes primary columns:
      H, B, t, L, e, fy, fc_prime
    """
    out = df.copy()

    H  = out["H"].values
    B  = out["B"].values
    t  = out["t"].values
    L  = out["L"].values
    e  = out["e"].values
    fy = out["fy"].values
    fc = out["fc_prime"].values

    # Areas and plastic axial resistance
    As = steel_area_rect_hollow(H, B, t)
    Ac = concrete_area_rect(H, B, t)

    Asfy = As * fy
    Acfc = Ac * fc
    Npl  = Asfy + Acfc  # simple plastic resistance

    # Geometric slenderness-like quantities
    lam = 2.0 * (3.0 ** 0.5) * L / H      # λ = 2*sqrt(3)*L/H
    e_over_H = e / H
    H_over_t = H / t
    delta = Asfy / np.maximum(Npl, 1e-8)
    alpha = As / np.maximum(Ac, 1e-8)

    # Approximate material stiffness (adjust Es, Ec if you have better values)
    Es = 2.0e5  # MPa
    Ec = 4700.0 * np.sqrt(fc)  # MPa, EC2-style

    # 2nd moments of area as hollow rectangle (about strong axis)
    I_steel = (B * H**3 - (B - 2.0*t) * (H - 2.0*t)**3) / 12.0
    I_conc  = ((B - 2.0*t) * (H - 2.0*t)**3) / 12.0

    EIeff_s = Es * I_steel
    EIeff_c = Ec * I_conc

    # Euler-type elastic buckling resistance (very approximate)
    Ncr = (np.pi**2) * (EIeff_s + 0.6 * EIeff_c) / np.maximum(L**2, 1e-8)

    # Non-dimensional slenderness and reduction factor chi (EC3 / EC4 style)
    lam_bar = np.sqrt(np.maximum(Npl / np.maximum(Ncr, 1e-8), 1e-8))
    alpha_chi = 0.21
    phi = 0.5 * (1.0 + alpha_chi * (lam_bar - 0.2) + lam_bar**2)
    chi = 1.0 / (phi + np.sqrt(np.maximum(phi**2 - lam_bar**2, 1e-8)))

    # Attach to DataFrame
    out["As"] = As
    out["Ac"] = Ac
    out["Asfy"]   = Asfy
    out["Acfc"]   = Acfc
    out["Npl"]    = Npl
    out["lambda"] = lam
    out["e_over_H"] = e_over_H
    out["H_over_t"] = H_over_t
    out["delta"]    = delta
    out["alpha"]    = alpha
    out["EIeff_s"]  = EIeff_s
    out["EIeff_c"]  = EIeff_c
    out["Ncr"]      = Ncr
    out["chi"]      = chi

    return out

def load_and_prepare_database(
    csv_path: Optional[Path] = None,
    use_empirical_column: bool = True,
    empirical_col_name: str = "N_empirical"
) -> Tuple[pd.DataFrame, pd.Series, pd.Series, List[str]]:
    """
    Load the database CSV, compute secondary features, and return:
      X        : feature matrix (DataFrame)
      y        : experimental ultimate strength
      y_emp    : empirical baseline (Han, Naser, EC4, etc.)
      features : list of feature names used

    If `use_empirical_column` is False or the column is missing,
    the baseline is taken as `Npl`.
    """
    path = csv_path or DB_CSV_PATH
    df_raw = pd.read_csv(path)

    # Compute secondary features
    df = compute_secondary_features(df_raw)

    # Experimental target
    y = df["Nu_exp"].astype(float)

    # Knowledge term: empirical design equation or fallback Npl
    if use_empirical_column and empirical_col_name in df.columns:
        y_emp = df[empirical_col_name].astype(float)
    else:
        y_emp = df["Npl"].astype(float)

    # 15-feature example (you can customise this list)
    primary_feats = ["H", "B", "t", "L", "e", "fy", "fc_prime"]
    secondary_feats = [
        "e_over_H", "lambda", "Asfy", "Acfc",
        "alpha", "EIeff_s", "EIeff_c", "Ncr",
    ]
    feature_names = primary_feats + secondary_feats

    X = df[feature_names].astype(float)

    return X, y, y_emp, feature_names

def train_test_split_rcfst(
    X: pd.DataFrame,
    y: pd.Series,
    y_emp: pd.Series,
    test_size: float = TEST_SIZE,
    random_state: int = RANDOM_STATE
):
    """Train-test split, keeping the empirical predictions aligned."""
    X_train, X_test, y_train, y_test, y_emp_train, y_emp_test = train_test_split(
        X, y, y_emp,
        test_size=test_size,
        random_state=random_state
    )
    return X_train, X_test, y_train, y_test, y_emp_train, y_emp_test

# Quick sanity check (will fail if CSV is missing)
if DB_CSV_PATH.exists():
    X_demo, y_demo, y_emp_demo, feats_demo = load_and_prepare_database()
    print("Loaded data with shape:", X_demo.shape)
else:
    print("WARNING: rcfst_database.csv not found yet. Place it under ./data")


In [None]:
# =======================================
# 2. Feature scaling & combined weights
# =======================================

def standardize_features(
    X_train: pd.DataFrame,
    X_test: pd.DataFrame
) -> Tuple[np.ndarray, np.ndarray, StandardScaler]:
    """Standardize features using StandardScaler fitted on training data only."""
    scaler = StandardScaler()
    X_train_scaled = scaler.fit_transform(X_train.values)
    X_test_scaled  = scaler.transform(X_test.values)
    return X_train_scaled, X_test_scaled, scaler

def compute_combined_feature_weights(
    X: pd.DataFrame,
    y: pd.Series
) -> pd.DataFrame:
    """
    Compute combined feature weights using:
      - SVR (linear) coefficients
      - XGBRegressor feature importances
      - mutual_info_regression
      - f_regression

    Each set of scores is normalized and then averaged; the combined weight
    is finally rescaled by its maximum.
    """
    feature_names = X.columns.tolist()
    X_val = X.values
    y_val = y.values

    # 1) SVR with linear kernel
    svr = SVR(kernel="linear")
    svr.fit(X_val, y_val)
    svr_imp = np.abs(svr.coef_).ravel()

    # 2) XGBoost gains
    xgb = XGBRegressor(
        n_estimators=200,
        max_depth=4,
        learning_rate=0.05,
        subsample=0.8,
        colsample_bytree=0.8,
        objective="reg:squarederror",
        random_state=0
    )
    xgb.fit(X_val, y_val)
    xgb_imp = xgb.feature_importances_

    # 3) Mutual information
    mi = mutual_info_regression(X_val, y_val, random_state=0)

    # 4) f_regression
    f_vals, _ = f_regression(X_val, y_val)
    f_vals = np.nan_to_num(f_vals, nan=0.0)

    def normalize(v):
        s = v.sum()
        return v / s if s > 0 else np.zeros_like(v)

    w_svr = normalize(svr_imp)
    w_xgb = normalize(xgb_imp)
    w_mi  = normalize(mi)
    w_f   = normalize(f_vals)

    mean_w = (w_svr + w_xgb + w_mi + w_f) / 4.0
    cwv = mean_w / np.max(mean_w)  # combined weight value

    df_w = pd.DataFrame({
        "feature": feature_names,
        "w_SVR": w_svr,
        "w_XGB": w_xgb,
        "w_MI":  w_mi,
        "w_F":   w_f,
        "mean_w": mean_w,
        "CWV": cwv
    }).sort_values("CWV", ascending=False).reset_index(drop=True)

    return df_w

def drop_highly_correlated_features(
    X: pd.DataFrame,
    cwv_df: pd.DataFrame,
    corr_threshold: float = 0.75
) -> Tuple[pd.DataFrame, List[str]]:
    """
    Drop highly correlated features based on a correlation threshold,
    keeping the more important one according to CWV.
    """
    corr = X.corr().abs()
    ordered = cwv_df["feature"].tolist()

    to_keep = set(ordered)
    to_drop = set()

    for i, fi in enumerate(ordered):
        if fi in to_drop:
            continue
        for fj in ordered[i+1:]:
            if fj in to_drop:
                continue
            if corr.loc[fi, fj] > corr_threshold:
                wi = cwv_df.loc[cwv_df["feature"] == fi, "CWV"].values[0]
                wj = cwv_df.loc[cwv_df["feature"] == fj, "CWV"].values[0]
                if wi >= wj:
                    to_drop.add(fj)
                else:
                    to_drop.add(fi)
                    break

    retained = list(to_keep - to_drop)
    retained = [f for f in ordered if f in retained]

    X_red = X[retained].copy()
    return X_red, retained


In [None]:
# =======================================
# 3. KeML residual-learning regressor
# =======================================

@dataclass
class KeMLRegressor:
    """Knowledge-enhanced ML regressor.

    Model form:
        y = y_emp + δ(x, y_emp)
        δ(x, y_emp) = f_L(x, y_emp) + f_n(x, y_emp)

    where:
      - y_emp: empirical prediction (knowledge)
      - f_L  : linear residual (LinearRegression)
      - f_n  : nonlinear residual (any regressor, e.g., SVR/RF/XGB/ANN)
    """
    base_model: Any
    fit_base_model: bool = True

    def __post_init__(self):
        self.lin_ = LinearRegression()
        self._fitted = False

    def _stack_features(self, X: np.ndarray, y_emp: np.ndarray) -> np.ndarray:
        y_emp_col = np.asarray(y_emp).reshape(-1, 1)
        return np.hstack([X, y_emp_col])

    def fit(self, X: np.ndarray, y: np.ndarray, y_emp: np.ndarray):
        X = np.asarray(X)
        y = np.asarray(y).ravel()
        y_emp = np.asarray(y_emp).ravel()

        Z = self._stack_features(X, y_emp)
        delta = y - y_emp

        # 1) Fit linear component
        self.lin_.fit(Z, delta)
        delta_lin = self.lin_.predict(Z)

        # 2) Fit nonlinear residual
        delta_nl = delta - delta_lin
        if self.fit_base_model and self.base_model is not None:
            self.base_model.fit(Z, delta_nl)

        self._fitted = True
        return self

    def predict(self, X: np.ndarray, y_emp: np.ndarray) -> np.ndarray:
        if not self._fitted:
            raise RuntimeError("KeMLRegressor must be fitted before predicting.")

        X = np.asarray(X)
        y_emp = np.asarray(y_emp).ravel()
        Z = self._stack_features(X, y_emp)

        delta_lin = self.lin_.predict(Z)
        if self.base_model is not None:
            delta_nl = self.base_model.predict(Z)
        else:
            delta_nl = 0.0

        return y_emp + delta_lin + delta_nl


In [None]:
# =======================================
# 4. Original models & Optuna tuning
# =======================================

def build_original_models() -> Dict[str, Any]:
    """Return a dictionary of original ML models (no knowledge term)."""
    models = {
        "KNN": KNeighborsRegressor(
            n_neighbors=3,
            weights="distance"
        ),
        "SVR": SVR(
            kernel="rbf",
            C=100.0,
            gamma="scale",
            epsilon=0.1
        ),
        "DT": DecisionTreeRegressor(
            max_depth=20,
            random_state=0
        ),
        "ANN": MLPRegressor(
            hidden_layer_sizes=(80, 80),
            activation="relu",
            solver="adam",
            learning_rate_init=0.01,
            max_iter=500,
            random_state=0
        ),
        "RF": RandomForestRegressor(
            n_estimators=200,
            max_depth=30,
            random_state=0,
            n_jobs=-1
        ),
        "XGB": XGBRegressor(
            n_estimators=300,
            max_depth=5,
            learning_rate=0.05,
            subsample=0.8,
            colsample_bytree=0.8,
            objective="reg:squarederror",
            random_state=0
        )
    }
    return models

# -------------------
# Optuna for XGB (optional)
# -------------------

def objective_xgb(trial: optuna.Trial, X, y) -> float:
    params = {
        "n_estimators": trial.suggest_int("n_estimators", 50, 400),
        "max_depth": trial.suggest_int("max_depth", 2, 10),
        "learning_rate": trial.suggest_float("learning_rate", 0.01, 0.3, log=True),
        "subsample": trial.suggest_float("subsample", 0.5, 1.0),
        "colsample_bytree": trial.suggest_float("colsample_bytree", 0.5, 1.0),
        "min_child_weight": trial.suggest_float("min_child_weight", 1.0, 10.0),
        "gamma": trial.suggest_float("gamma", 0.0, 5.0),
    }

    kf = KFold(n_splits=5, shuffle=True, random_state=0)
    rmses = []
    for tr_idx, val_idx in kf.split(X):
        X_tr, X_val = X[tr_idx], X[val_idx]
        y_tr, y_val = y[tr_idx], y[val_idx]

        model = XGBRegressor(
            objective="reg:squarederror",
            random_state=0,
            **params
        )
        model.fit(X_tr, y_tr)
        y_pred = model.predict(X_val)
        rmse = np.sqrt(mean_squared_error(y_val, y_pred))
        rmses.append(rmse)

    return float(np.mean(rmses))

def tune_xgb_with_optuna(
    X,
    y,
    n_trials: int = 50,
    study_name: str = "xgb_keml_tuning"
) -> XGBRegressor:
    """Run Optuna tuning for XGBRegressor and return the best-fitted model."""
    study = optuna.create_study(
        study_name=study_name,
        direction="minimize"
    )
    study.optimize(lambda trial: objective_xgb(trial, X, y), n_trials=n_trials)

    best_params = study.best_params
    best_model = XGBRegressor(
        objective="reg:squarederror",
        random_state=0,
        **best_params
    )
    best_model.fit(X, y)

    # Save study and model
    optuna_path = ARTIFACTS_DIR / "optuna" / f"{study_name}_study.pkl"
    model_path  = ARTIFACTS_DIR / "models"  / f"{study_name}_best_xgb.joblib"
    joblib.dump(study, optuna_path)
    joblib.dump(best_model, model_path)

    print("Best XGB params:", best_params)
    return best_model


In [None]:
# =======================================
# 5. Evaluation helpers (metrics + plots)
# =======================================

def regression_metrics(y_true, y_pred) -> Dict[str, float]:
    y_true = np.asarray(y_true).ravel()
    y_pred = np.asarray(y_pred).ravel()

    r2 = r2_score(y_true, y_pred)
    mae = mean_absolute_error(y_true, y_pred)
    rmse = np.sqrt(mean_squared_error(y_true, y_pred))
    mape = np.mean(np.abs((y_pred - y_true) / y_true)) * 100.0

    return {
        "R2": float(r2),
        "MAE": float(mae),
        "RMSE": float(rmse),
        "MAPE": float(mape),
    }

def save_metrics_table(
    metrics_dict: Dict[str, Dict[str, float]],
    filename: str
):
    df = pd.DataFrame(metrics_dict).T
    path = OUTPUTS_DIR / "metrics" / filename
    df.to_csv(path, index=True)
    print("Saved metrics to:", path)
    return path

def parity_plot(
    y_true,
    y_pred,
    title: str,
    fname: str,
    split: str = ""
):
    y_true = np.asarray(y_true)
    y_pred = np.asarray(y_pred)

    fig, ax = plt.subplots()
    ax.scatter(y_true, y_pred, s=20, alpha=0.7)

    # y = x and ±20% bounds
    min_val = min(y_true.min(), y_pred.min())
    max_val = max(y_true.max(), y_pred.max())
    x_line = np.linspace(min_val, max_val, 200)
    ax.plot(x_line, x_line, "k-", label="y = x")
    ax.plot(x_line, 0.8 * x_line, "b--", label="±20% bounds")
    ax.plot(x_line, 1.2 * x_line, "b--")

    ax.set_xlabel("Experimental Nu")
    ax.set_ylabel("Predicted Nu")
    full_title = title if not split else f"{title} ({split})"
    ax.set_title(full_title)
    ax.legend()

    fig_path = OUTPUTS_DIR / "figures" / fname
    fig.tight_layout()
    fig.savefig(fig_path, dpi=300)
    plt.close(fig)
    print("Saved parity plot to:", fig_path)
    return fig_path

def save_predictions(
    specimen_ids,
    y_true,
    y_pred,
    model_name: str,
    split: str
):
    df = pd.DataFrame({
        "specimen_id": specimen_ids,
        "Nu_exp": np.asarray(y_true).ravel(),
        f"Nu_pred_{model_name}_{split}": np.asarray(y_pred).ravel(),
    })
    path = OUTPUTS_DIR / "predictions" / f"pred_{model_name}_{split}.csv"
    df.to_csv(path, index=False)
    print("Saved predictions to:", path)
    return path


In [None]:
# =======================================
# 6. Main pipeline: ORIGINAL vs KeML
# =======================================

# 6.1 Load database
if not DB_CSV_PATH.exists():
    raise FileNotFoundError(
        f"Database CSV not found at {DB_CSV_PATH}. Please create 'data/rcfst_database.csv' first."
    )

X, y, y_emp, feature_names = load_and_prepare_database()
print("Feature names:", feature_names)
print("X shape:", X.shape)
print("y shape:", y.shape)
print("y_emp shape:", y_emp.shape)

# 6.2 Combined weights + optional correlation-based feature pruning
weights_df = compute_combined_feature_weights(X, y)
display(weights_df)

X_reduced, retained_features = drop_highly_correlated_features(X, weights_df, corr_threshold=0.75)
print("Retained features after correlation filtering:", retained_features)
print("X_reduced shape:", X_reduced.shape)

# 6.3 Train-test split (use reduced features)
X_train, X_test, y_train, y_test, y_emp_train, y_emp_test = train_test_split_rcfst(
    X_reduced, y, y_emp
)

specimen_ids_train = X_train.index.to_numpy()
specimen_ids_test  = X_test.index.to_numpy()

# 6.4 Standardize features
X_train_scaled, X_test_scaled, scaler = standardize_features(X_train, X_test)
scaler_path = ARTIFACTS_DIR / "models" / "scaler.joblib"
joblib.dump(scaler, scaler_path)
print("Saved scaler to:", scaler_path)

# 6.5 Original ML models (no knowledge)
orig_models = build_original_models()
orig_metrics_train = {}
orig_metrics_test  = {}

for name, model in orig_models.items():
    print(f"\nTraining ORIGINAL model: {name}")
    model.fit(X_train_scaled, y_train)

    y_pred_tr = model.predict(X_train_scaled)
    y_pred_te = model.predict(X_test_scaled)

    m_tr = regression_metrics(y_train, y_pred_tr)
    m_te = regression_metrics(y_test,  y_pred_te)

    orig_metrics_train[f"{name}_orig_train"] = m_tr
    orig_metrics_test[f"{name}_orig_test"]   = m_te

    # Save model
    model_path = ARTIFACTS_DIR / "models" / f"{name}_orig.joblib"
    joblib.dump(model, model_path)

    # Plots + predictions
    parity_plot(y_train, y_pred_tr, f"{name} ORIGINAL", f"parity_{name}_orig_train.png", "train")
    parity_plot(y_test,  y_pred_te, f"{name} ORIGINAL", f"parity_{name}_orig_test.png",  "test")

    save_predictions(specimen_ids_train, y_train, y_pred_tr, f"{name}_orig", "train")
    save_predictions(specimen_ids_test,  y_test,  y_pred_te,  f"{name}_orig", "test")


save_metrics_table(orig_metrics_train, "metrics_original_train.csv")
save_metrics_table(orig_metrics_test,  "metrics_original_test.csv")

# 6.6 (OPTIONAL) Hyperparameter tuning for XGB
do_tuning = False  # set True to run Optuna tuning (can take time)

if do_tuning:
    best_xgb = tune_xgb_with_optuna(
        X_train_scaled,
        y_train.values,
        n_trials=50,
        study_name="xgb_rcfst"
    )
    orig_models["XGB"] = best_xgb

# 6.7 KeML models (knowledge-enhanced)
keml_models = {}
keml_metrics_train = {}
keml_metrics_test  = {}

for name, base_model in orig_models.items():
    print(f"\nTraining KeML model (base = {name})")
    keml = KeMLRegressor(base_model=base_model)
    keml.fit(X_train_scaled, y_train.values, y_emp_train.values)

    y_pred_tr_k = keml.predict(X_train_scaled, y_emp_train.values)
    y_pred_te_k = keml.predict(X_test_scaled,  y_emp_test.values)

    m_tr_k = regression_metrics(y_train, y_pred_tr_k)
    m_te_k = regression_metrics(y_test,  y_pred_te_k)

    keml_metrics_train[f"{name}_KeML_train"] = m_tr_k
    keml_metrics_test[f"{name}_KeML_test"]   = m_te_k

    keml_models[name] = keml

    model_path = ARTIFACTS_DIR / "models" / f"{name}_KeML.joblib"
    joblib.dump(keml, model_path)

    parity_plot(y_train, y_pred_tr_k, f"{name} KeML", f"parity_{name}_keml_train.png", "train")
    parity_plot(y_test,  y_pred_te_k, f"{name} KeML", f"parity_{name}_keml_test.png",  "test")

    save_predictions(specimen_ids_train, y_train, y_pred_tr_k, f"{name}_KeML", "train")
    save_predictions(specimen_ids_test,  y_test,  y_pred_te_k,  f"{name}_KeML", "test")


save_metrics_table(keml_metrics_train, "metrics_keml_train.csv")
save_metrics_table(keml_metrics_test,  "metrics_keml_test.csv")


# 6.8 Combined comparison table (TEST only)
df_orig_test = pd.DataFrame(orig_metrics_test).T
df_keml_test = pd.DataFrame(keml_metrics_test).T

df_orig_test["model_type"] = "original"
df_keml_test["model_type"] = "KeML"

df_all = pd.concat([df_orig_test, df_keml_test], axis=0)
display(df_all)

combined_path = OUTPUTS_DIR / "metrics" / "metrics_combined_test.csv"
df_all.to_csv(combined_path, index=True)
print("Saved combined test metrics to:", combined_path)
