
# Ensemble Component Ablation

This notebook evaluates four ensemble pairs on the chosen C-MAPSS subset using three scenarios per pair:
- Single model (A)
- Single model (B)
- Fixed-weight averaging (70/30) — the higher-weighted side is chosen via cross-validation on the training split
- Stacking (Ridge, α=1) — meta-learner fitted on out-of-fold predictions

**Outputs**
- A CSV summary with R², MSE, MAE, and execution time (seconds)
- Bar charts for R² / MSE / MAE for each pair


In [None]:

from __future__ import annotations

import os
import time
from dataclasses import dataclass
from typing import Dict, Tuple, List

import numpy as np
import pandas as pd

from sklearn.model_selection import GroupShuffleSplit, GroupKFold
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.metrics import r2_score, mean_squared_error, mean_absolute_error
from sklearn.linear_model import Ridge

from sklearn.ensemble import GradientBoostingRegressor, RandomForestRegressor, HistGradientBoostingRegressor
from sklearn.neighbors import KNeighborsRegressor
from sklearn.neural_network import MLPRegressor

try:
    from lightgbm import LGBMRegressor
except Exception:
    LGBMRegressor = None
try:
    from catboost import CatBoostRegressor
except Exception:
    CatBoostRegressor = None
try:
    from xgboost import XGBRegressor
except Exception:
    XGBRegressor = None

import matplotlib.pyplot as plt

pd.set_option("display.max_columns", 100)
pd.set_option("display.width", 180)


In [None]:

# -------------------
# Configuration
# -------------------
DATA_DIR = "data/CMAPSS"  # location of C-MAPSS files
DATASET = "FD001"         # FD001/FD002/FD003/FD004

RANDOM_STATE = 42
TEST_SIZE = 0.2
N_FOLDS_STACK = 5
N_FOLDS_WEIGHT = 3

OUT_DIR = "ablation_outputs"
os.makedirs(OUT_DIR, exist_ok=True)

INCLUDE_RUL_SCORE = True

ENSEMBLE_PAIRS = [
    ("LGBM", "CAT"),
    ("GBR", "XGB"),
    ("RF",  "HGBR"),
    ("MLP", "KNN"),
]

PARAMS = {
    "LGBM": dict(n_estimators=500, learning_rate=0.1, max_depth=-1, num_leaves=31, subsample=0.8, colsample_bytree=0.8, random_state=RANDOM_STATE),
    "CAT":  dict(iterations=500, learning_rate=0.1, depth=6, random_seed=RANDOM_STATE, verbose=0),
    "GBR":  dict(n_estimators=300, max_depth=7, learning_rate=0.1, subsample=0.8, random_state=RANDOM_STATE),
    "XGB":  dict(objective="reg:squarederror", learning_rate=0.2, max_depth=4, n_estimators=200, subsample=1.0, colsample_bytree=0.8, random_state=RANDOM_STATE, n_jobs=0),
    "RF":   dict(n_estimators=500, random_state=RANDOM_STATE, n_jobs=0),
    "HGBR": dict(random_state=RANDOM_STATE),
    "MLP":  dict(hidden_layer_sizes=(50,50,50), activation="relu", solver="adam", alpha=0.01, learning_rate="constant", max_iter=500, random_state=RANDOM_STATE),
    "KNN":  dict(n_neighbors=14, weights="uniform", metric="manhattan"),
}

NEEDS_SCALING = {"MLP","KNN"}


In [None]:

def load_cmapss_train(dataset: str, data_dir: str) -> pd.DataFrame:
    fname = os.path.join(data_dir, f"train_{dataset}.txt")
    if not os.path.exists(fname):
        raise FileNotFoundError(f"Missing file: {fname}")
    df = pd.read_csv(fname, sep="\s+", header=None)
    cols = ["unit","cycle"] + [f"op{i}" for i in range(1,4)] + [f"s{i}" for i in range(1,22)]
    df = df.iloc[:, :len(cols)]
    df.columns = cols
    return df

def add_rul_labels(df: pd.DataFrame) -> pd.DataFrame:
    max_cycles = df.groupby("unit")["cycle"].max().rename("max_cycle")
    df = df.merge(max_cycles, on="unit", how="left")
    df["RUL"] = df["max_cycle"] - df["cycle"]
    return df.drop(columns=["max_cycle"])

def train_test_group_split(df: pd.DataFrame, test_size: float, random_state: int):
    gss = GroupShuffleSplit(n_splits=1, test_size=test_size, random_state=random_state)
    groups = df["unit"].values
    idx = np.arange(len(df))
    tr, te = next(gss.split(idx, groups=groups))
    return df.iloc[tr].copy(), df.iloc[te].copy()

def rul_score(y_true, y_pred):
    d = y_pred - y_true
    s = np.where(d < 0, np.exp(-d / 10.0), np.exp(d / 13.0))
    return float(np.sum(s))

def compute_metrics(y_true, y_pred, include_rul=False):
    out = {"R2": r2_score(y_true, y_pred),
           "MSE": mean_squared_error(y_true, y_pred),
           "MAE": mean_absolute_error(y_true, y_pred)}
    if include_rul:
        out["RUL_Score"] = rul_score(y_true, y_pred)
    return out

def feature_columns(df: pd.DataFrame):
    return [c for c in df.columns if c not in ("unit","cycle","RUL")]


In [None]:

def make_model(tag: str):
    p = PARAMS[tag]
    if tag == "LGBM":
        if LGBMRegressor is None: raise ImportError("lightgbm not installed.")
        return LGBMRegressor(**p)
    if tag == "CAT":
        if CatBoostRegressor is None: raise ImportError("catboost not installed.")
        return CatBoostRegressor(**p)
    if tag == "GBR": return GradientBoostingRegressor(**p)
    if tag == "XGB":
        if XGBRegressor is None: raise ImportError("xgboost not installed.")
        return XGBRegressor(**p)
    if tag == "RF": return RandomForestRegressor(**p)
    if tag == "HGBR": return HistGradientBoostingRegressor(**p)
    if tag == "MLP": return MLPRegressor(**p)
    if tag == "KNN": return KNeighborsRegressor(**p)
    raise ValueError(tag)

def build_pipeline(tag: str, numeric_features: List[str]):
    model = make_model(tag)
    if tag in NEEDS_SCALING:
        pre = ColumnTransformer([("num", StandardScaler(), numeric_features)], remainder="drop")
        return Pipeline([("pre", pre), ("model", model)])
    return Pipeline([("model", model)])

def timed_fit_predict(pipe: Pipeline, X_train, y_train, X_test):
    t0 = time.perf_counter()
    pipe.fit(X_train, y_train)
    y_pred = pipe.predict(X_test)
    return y_pred, time.perf_counter() - t0


In [None]:

def choose_weights_by_cv(tagA, tagB, X_train, y_train, groups, numeric_cols):
    gkf = GroupKFold(n_splits=N_FOLDS_WEIGHT)
    mseA, mseB = [], []
    for tr, va in gkf.split(X_train, y_train, groups):
        X_tr, X_va = X_train.iloc[tr], X_train.iloc[va]
        y_tr, y_va = y_train[tr], y_train[va]
        pA = build_pipeline(tagA, numeric_cols); pB = build_pipeline(tagB, numeric_cols)
        pA.fit(X_tr, y_tr); pB.fit(X_tr, y_tr)
        mseA.append(mean_squared_error(y_va, pA.predict(X_va)))
        mseB.append(mean_squared_error(y_va, pB.predict(X_va)))
    return (0.7, 0.3) if float(np.mean(mseA)) <= float(np.mean(mseB)) else (0.3, 0.7)

def stacking_predict(tagA, tagB, X_train, y_train, X_test, groups, numeric_cols):
    t0 = time.perf_counter()
    gkf = GroupKFold(n_splits=N_FOLDS_STACK)
    oof_A = np.zeros(len(X_train)); oof_B = np.zeros(len(X_train))
    for tr, va in gkf.split(X_train, y_train, groups):
        X_tr, X_va = X_train.iloc[tr], X_train.iloc[va]; y_tr = y_train[tr]
        pA = build_pipeline(tagA, numeric_cols); pB = build_pipeline(tagB, numeric_cols)
        pA.fit(X_tr, y_tr); pB.fit(X_tr, y_tr)
        oof_A[va] = pA.predict(X_va); oof_B[va] = pB.predict(X_va)
    meta = Ridge(alpha=1.0, random_state=RANDOM_STATE).fit(np.c_[oof_A,oof_B], y_train)
    pA_full = build_pipeline(tagA, numeric_cols); pB_full = build_pipeline(tagB, numeric_cols)
    pA_full.fit(X_train, y_train); pB_full.fit(X_train, y_train)
    Z_test = np.c_[pA_full.predict(X_test), pB_full.predict(X_test)]
    return meta.predict(Z_test), time.perf_counter() - t0


In [None]:

from dataclasses import dataclass
@dataclass
class ScenarioResult:
    pair: str
    scenario: str
    R2: float
    MSE: float
    MAE: float
    ExecTimeSec: float
    RUL_Score: float | None = None

def run_ablation_for_pair(tagA, tagB, df_train):
    df = add_rul_labels(df_train.copy())
    X_cols = feature_columns(df)
    df_tr, df_te = train_test_group_split(df, test_size=TEST_SIZE, random_state=RANDOM_STATE)
    X_tr, X_te = df_tr[X_cols], df_te[X_cols]
    y_tr, y_te = df_tr["RUL"].values, df_te["RUL"].values
    groups_tr = df_tr["unit"].values
    numeric_cols = X_cols

    results = []

    pipeA = build_pipeline(tagA, numeric_cols)
    predA, tA = timed_fit_predict(pipeA, X_tr, y_tr, X_te)
    metA = compute_metrics(y_te, predA, include_rul=INCLUDE_RUL_SCORE)
    results.append(ScenarioResult(pair=f"{tagA}+{tagB}", scenario=f"{tagA} (Single)", R2=metA['R2'], MSE=metA['MSE'], MAE=metA['MAE'], ExecTimeSec=tA, RUL_Score=metA.get('RUL_Score')))

    pipeB = build_pipeline(tagB, numeric_cols)
    predB, tB = timed_fit_predict(pipeB, X_tr, y_tr, X_te)
    metB = compute_metrics(y_te, predB, include_rul=INCLUDE_RUL_SCORE)
    results.append(ScenarioResult(pair=f"{tagA}+{tagB}", scenario=f"{tagB} (Single)", R2=metB['R2'], MSE=metB['MSE'], MAE=metB['MAE'], ExecTimeSec=tB, RUL_Score=metB.get('RUL_Score')))

    wA, wB = choose_weights_by_cv(tagA, tagB, X_tr, y_tr, groups_tr, numeric_cols)
    predW = wA*predA + wB*predB
    tW = tA + tB
    metW = compute_metrics(y_te, predW, include_rul=INCLUDE_RUL_SCORE)
    results.append(ScenarioResult(pair=f"{tagA}+{tagB}", scenario="Weighted Average (7:3)", R2=metW['R2'], MSE=metW['MSE'], MAE=metW['MAE'], ExecTimeSec=tW, RUL_Score=metW.get('RUL_Score')))

    predS, tS = stacking_predict(tagA, tagB, X_tr, y_tr, X_te, groups_tr, numeric_cols)
    metS = compute_metrics(y_te, predS, include_rul=INCLUDE_RUL_SCORE)
    results.append(ScenarioResult(pair=f"{tagA}+{tagB}", scenario="Stacking", R2=metS['R2'], MSE=metS['MSE'], MAE=metS['MAE'], ExecTimeSec=tS, RUL_Score=metS.get('RUL_Score')))

    return results

def run_full_ablation(dataset=DATASET):
    df_train = load_cmapss_train(dataset, DATA_DIR)
    all_results = []
    for tagA, tagB in ENSEMBLE_PAIRS:
        all_results.extend(run_ablation_for_pair(tagA, tagB, df_train))
    rows = []
    for r in all_results:
        row = dict(EnsemblePair=r.pair, Scenario=r.scenario, R2=r.R2, MSE=r.MSE, MAE=r.MAE, ExecTimeSec=r.ExecTimeSec)
        if INCLUDE_RUL_SCORE: row["RUL_Score"] = r.RUL_Score
        rows.append(row)
    return pd.DataFrame(rows)


In [None]:

print("Ready. Call run_full_ablation() to compute results on the selected subset.")


In [None]:

# Execute and save
try:
    results_df = run_full_ablation(DATASET)
    display(results_df.head(20))
    csv_path = os.path.join(OUT_DIR, f"ablation_results_{DATASET}.csv")
    results_df.to_csv(csv_path, index=False)
    print("Saved:", csv_path)
except FileNotFoundError as e:
    print(str(e))
    print("→ Check DATA_DIR and C-MAPSS files.")


In [None]:

def plot_metric_bars(df: pd.DataFrame, pair: str, metric: str):
    subset = df[df["EnsemblePair"] == pair]
    labels = subset["Scenario"].tolist()
    values = subset[metric].tolist()
    plt.figure(figsize=(8,5))
    plt.bar(labels, values)
    plt.title(f"{pair} — {metric}")
    plt.ylabel(metric); plt.xlabel("Scenario")
    plt.xticks(rotation=20); plt.tight_layout()
    p = os.path.join(OUT_DIR, f"{pair.replace('+','_')}_{metric}.png")
    plt.savefig(p, dpi=160); plt.show(); print("Saved:", p)


In [None]:

if 'results_df' in globals():
    for pair in sorted(results_df["EnsemblePair"].unique()):
        for metric in ["R2","MSE","MAE"]:
            plot_metric_bars(results_df, pair, metric)
else:
    print("Run the ablation cell first to generate results.")
