# Predictive Maintenance mit SCANIA-Daten – Modeling

**Projekt:** Bachelorarbeit Data Science  
**Thema:** 
**Datengrundlage:** SCANIA Component X Dataset  
**Autor:** Justin Stange-Heiduk  
**Betreuung:** Dr. Martin Prause  
**Ziel:** Modell erstellung XGBoost mit AFT und Random Forest Survival  

---

**Erstellt:** 2025-09-01  
**Letzte Änderung:** 2025-09-25


---

In [1]:
import numpy as np
import pandas as pd
from sksurv.ensemble import RandomSurvivalForest
from sksurv.util import Surv
from sksurv.metrics import concordance_index_censored, integrated_brier_score
import mlflow
import optuna
from optuna.pruners import SuccessiveHalvingPruner
import pickle
from pathlib import Path
import time, sys
from optuna.samplers import GridSampler
from hashlib import sha1
import xgboost as xgb
import scipy
import os


In [3]:
%run CommonFunctions.ipynb

In [3]:
pd.set_option('display.max_columns', None)
pd.set_option('display.max_rows', None)

### 1. Random Survival Forest HPO
### 2. XGBoost mit AFT HPO
### 3. Modellerstellung RSF
### 4. Modelerstellung XGBoost

#### Einlesen der Daten

In [7]:
def prepare_rsf_model_input(df: pd.DataFrame, columns_to_drop: list, frag: float, class_column: str, sampling: bool) -> tuple[pd.DataFrame, np.ndarray]: 
    """ Prepares the input data for the Random Survival Forest model with option to sample a fraction of each class. 
    
    Args: df (pd.DataFrame): The input dataframe containing features and target variables. 
    columns_to_drop (list): List of columns to drop from the dataframe. 
    frag (float): Fraction of data to sample from each class. 
    class_column (str): The name of the column representing the class labels. 
    sampling (bool): Whether to perform sampling or not.
    Returns: tuple[pd.DataFrame, np.ndarray]: A tuple containing the feature dataframe and the structured array for survival analysis. """ 
    df_list = [] 

    if sampling:
        for i in df[class_column].unique(): 
            df_list.append( df[df[class_column] == i].sample(frac=frag, random_state=42)) 
        df = pd.concat(df_list) 

    y_surv = Surv.from_arrays(event=df["event"].astype(bool), time=df["duration"].astype(float)) 
    X = df.drop(columns=columns_to_drop) 
    return X, y_surv 



In [8]:
X_train, y_train_surv = prepare_rsf_model_input(load_df(ordner="04_feature", name = "feature_train_corr_labels").drop(columns=["upper_bound"]), columns_to_drop=["duration", "event", "vehicle_id", "class"], frag=0.01, class_column="class", sampling=True) 

X_val, y_val_surv = prepare_rsf_model_input(load_df(ordner="04_feature", name = "feature_validation_corr_labels").drop(columns=["upper_bound"]), columns_to_drop=["duration", "event", "vehicle_id", "class_label"], frag=1.0, class_column="class_label", sampling=False) 

validation_data = load_df(ordner="04_feature", name = "feature_validation_corr_labels").drop(columns=["upper_bound"])

In [9]:
import os
# Sicherstellen, dass der Ordner existiert
save_dir = "../Data/05_model_input/HPO_RSF"
os.makedirs(save_dir, exist_ok=True)

# X_train und X_val (DataFrames) speichern
X_train.to_parquet(os.path.join(save_dir, "X_train.parquet"), index=False)
X_val.to_parquet(os.path.join(save_dir, "X_val.parquet"), index=False)

# y_train_surv und y_val_surv sind Structured Arrays -> DataFrame
y_train_df = pd.DataFrame({
    "event": y_train_surv["event"].astype(int),
    "duration": y_train_surv["time"].astype(float)
})
y_val_df = pd.DataFrame({
    "event": y_val_surv["event"].astype(int),
    "duration": y_val_surv["time"].astype(float)
})

y_train_df.to_parquet(os.path.join(save_dir, "y_train_surv.parquet"), index=False)
y_val_df.to_parquet(os.path.join(save_dir, "y_val_surv.parquet"), index=False)

# Validation Data auch speichern
validation_data.to_parquet(os.path.join(save_dir, "validation_data.parquet"), index=False)



In [None]:
X_train = pd.read_parquet("../Data/05_model_input/HPO_RSF/X_train.parquet")
X_val   = pd.read_parquet("../Data/05_model_input/HPO_RSF/X_val.parquet")
y_train_df = pd.read_parquet("../Data/05_model_input/HPO_RSF/y_train_surv.parquet")
y_val_df   = pd.read_parquet("../Data/05_model_input/HPO_RSF/y_val_surv.parquet")

# Zurück in Surv-Format
y_train_surv = Surv.from_arrays(event=y_train_df["event"].astype(bool),
                                time=y_train_df["duration"].astype(float))
y_val_surv   = Surv.from_arrays(event=y_val_df["event"].astype(bool),
                                time=y_val_df["duration"].astype(float))

validation_data = pd.read_parquet("../Data/05_model_input/HPO_RSF/validation_data.parquet")


### 1. Random Survival Forest HPO

#### Sichtbare Logs

In [13]:
def log(msg):
    print(f"[{time.strftime('%H:%M:%S')}] {msg}", flush=True)


#### Kostenfunktion und Klassen-Mapping aus der Survivalkurve

In [14]:
# Kostenmatrix aus deinem Paper (Zeilen = Actual n, Spalten = Predicted m)
COST = np.array([
    [0,   7,   8,   9,   10],
    [200, 0,   7,   8,    9],
    [300, 200, 0,   7,    8],
    [400, 300, 200, 0,    7],
    [500, 400, 300, 200,  0]
], dtype=float)

# Klassengrenzen für RUL in Zeiteinheiten, konsistent zu deinen Labels 0..4
# Beispiel: 4: [0,6), 3: [6,12), 2: [12,24), 1: [24,48), 0: [48, inf)
TAUS = np.array([6.0, 12.0, 24.0, 48.0], dtype=float)

In [15]:
def class_probs_from_S_tau(S_tau: np.ndarray) -> np.ndarray:
    """
    Berechnet p0..p4 direkt aus den Werten S(tau1..tau4).

    Args: 
        S_tau = [S(tau1), S(tau2), S(tau3), S(tau4)].
        
    Return: Wahrscheinlichkeiten p0..p4 für Klassen 0..4
    """
    S1, S2, S3, S4 = S_tau
    p4 = 1.0 - S1
    p3 = S1 - S2
    p2 = S2 - S3
    p1 = S3 - S4
    p0 = S4
    p = np.clip(np.array([p0, p1, p2, p3, p4], dtype=float), 0.0, 1.0)
    s = p.sum()
    return p / s if s > 0 else np.array([1.0, 0.0, 0.0, 0.0, 0.0], dtype=float)


In [17]:
def decide_with_cost_from_rsf_at_taus(
    rsf: RandomSurvivalForest,
    X: pd.DataFrame,
    taus: np.ndarray,
    cost: np.ndarray
) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
    """
    Ermittelt je Instanz die kostenminimale Klasse m̂ und die dazugehörigen Größen.
    Rückgaben:
      pred_class  Länge N
      exp_cost_min Länge N
      probs       Form (N,5) für p0..p4
    """
    surv_fns = rsf.predict_survival_function(X, return_array=False)
    N = len(X)
    pred_class = np.zeros(N, dtype=int)
    exp_cost_min = np.zeros(N, dtype=float)
    probs = np.zeros((N, 5), dtype=float)

    for i, fn in enumerate(surv_fns):
        S_tau = fn(taus)
        p = class_probs_from_S_tau(S_tau)
        exp_vec = cost.T @ p
        m_hat = int(np.argmin(exp_vec))
        pred_class[i] = m_hat
        exp_cost_min[i] = float(exp_vec[m_hat])
        probs[i, :] = p

    return pred_class, exp_cost_min, probs

In [18]:
def evaluate_decision_costs_from_true(
    true_class: pd.Series | np.ndarray,
    pred_class: np.ndarray,
    cost: np.ndarray
) -> tuple[float, float, pd.DataFrame, pd.DataFrame]:
    """
    Berechnet realisierte Kosten Cost[n, m̂] und Konfusion.
    Rückgaben:
      avg_cost, total_cost
    """
    n = np.asarray(true_class, dtype=int)
    m = np.asarray(pred_class, dtype=int)
    assert n.shape == m.shape, "true_class und pred_class müssen gleich lang sein."

    realized = np.array([cost[n_i, m_i] for n_i, m_i in zip(n, m)], dtype=float)
    avg_cost = float(np.mean(realized))
    total_cost = float(np.sum(realized))



    return avg_cost, total_cost

#### RSF-Modell und Metriken

In [19]:
def fit_rsf(X: pd.DataFrame, duration: pd.Series, event: pd.Series, **params) -> RandomSurvivalForest:
    """
    Trainiere einen RandomSurvivalForest mit den gegebenen Hyperparametern.

    Args:
        X (pd.DataFrame): Die Eingabedaten.
        duration (pd.Series): Die Überlebenszeiten.
        event (pd.Series): Die Ereignisdaten.
        **params: Zusätzliche Hyperparameter für das Modell.

    Return:
        RandomSurvivalForest: Das trainierte Modell.
    """
    
    y = Surv.from_arrays(event=event.astype(bool), time=duration.astype(float))
    rsf = RandomSurvivalForest(
        n_estimators=params.get("n_estimators"),
        max_depth=params.get("max_depth"),
        max_features=params.get("max_features"),
        min_samples_split=params.get("min_samples_split"),
        min_samples_leaf=params.get("min_samples_leaf"),
        n_jobs=-1,
        random_state=42,
        verbose=0
    )
    log(f"→ starte FIT mit Parametern: {params}")
    rsf.fit(X, y)
    log("✓ FIT fertig")
    return rsf

#### MLflow initialisieren

In [27]:
def setup_mlflow(experiment_name: str, tracking_uri: str | None = None) -> None:
    """
    Setzt MLflow konsistent auf.
    - tracking_uri kann sein:
      * None           -> nutzt ./mlruns (wird angelegt)
      * lokaler Pfad   -> z.B. '/workspace/mlruns' (wird angelegt)
      * file-URI       -> z.B. 'file:///workspace/mlruns' (wird angelegt)
      * Remote/DB      -> z.B. 'http://...', 'https://...', 'sqlite:///mlflow.db' (kein Ordner nötig)
    """
    if tracking_uri is None:
        root = Path("../mlruns")
        root.mkdir(parents=True, exist_ok=True)
        mlflow.set_tracking_uri(root.resolve().as_uri())
    else:
        parsed = urlparse(tracking_uri)
        if parsed.scheme in ("", "file"):
            # Rohpfad oder file-URI -> lokalen Ordner anlegen
            root = Path(parsed.path if parsed.scheme == "file" else tracking_uri)
            root.mkdir(parents=True, exist_ok=True)
            mlflow.set_tracking_uri(root.resolve().as_uri())
        else:
            # http(s), sqlite, postgresql, ...
            mlflow.set_tracking_uri(tracking_uri)

    mlflow.set_experiment(experiment_name)
    print("MLflow tracking URI:", mlflow.get_tracking_uri())

#### Hyperparameter-Suche mit Optuna und MLflow-Logging

In [21]:
def progress_callback_totalcost(study: optuna.Study, trial: optuna.trial.FrozenTrial):
    """
    Konsolenfeedback pro Trial und Logging der bisherigen Best-Gesamtkosten.
    """
    try:
        print(f"[Trial {trial.number:03d}] state={trial.state.name} value={trial.value:.4f} best={study.best_value:.4f}")
        print("#"*20)
        mlflow.log_metric("best_total_cost_so_far", study.best_value, step=trial.number)

    except Exception:
        pass

In [22]:
# 2) Helper: baut params direkt aus GRID
def build_params_from_grid(trial, grid: dict) -> dict:
    """
    Liest alle Keys aus 'grid' und erzeugt passende trial.suggest_categorical()-Aufrufe.
    Einzelelement-Listen werden als Konstante gesetzt (keine Suggestion).
    """
    params = {}
    for name, choices in grid.items():
        if isinstance(choices, (list, tuple)) and len(choices) > 1:
            params[name] = trial.suggest_categorical(name, list(choices))
        elif isinstance(choices, (list, tuple)) and len(choices) == 1:
            params[name] = choices[0]
        else:
            # Falls jemand mal einen konstanten Wert statt Liste einträgt
            params[name] = choices
    return params


In [23]:
def hash_params(params: dict) -> str:
    return str(sorted(params.items()))

In [38]:
def rsf_objective_prunable_total_cost(
    trial: optuna.Trial,
    X_tr: pd.DataFrame, y_tr_surv,            # Train für Fit
    X_val: pd.DataFrame, y_val_class: pd.Series,  # Val für Entscheidung und echte Klasse
    grid: dict,
    TRIED_HASHES: set, 
) -> float:
    """
    Mehrstufiges Objective mit Successive Halving.
    Ressource = n_estimators. Auswahlkriterium = realisierte Gesamtkosten auf Val.
    """

    N = len(X_tr)

    min_leaf_fracs = [0.005, 0.01, 0.02]
    GRID = {
    "max_depth": [int(np.log2(N)), int(np.log2(N)) + 4],
    "max_features": ["sqrt", 0.4],
    "min_samples_leaf": [int(N * f) for f in min_leaf_fracs],
    "n_estimators":      [8, 16, 32, 64]
    }
    GRID["min_samples_split"] = [2 * v for v in GRID["min_samples_leaf"]]

    # Suchraum der Hyperparameter (ohne n_estimators, das ist unsere Stufen-Ressource)
    params = {
        "max_depth":        trial.suggest_categorical("max_depth", [int(np.log2(N)), int(np.log2(N)) + 4]),
        "max_features":     trial.suggest_categorical("max_features", ["sqrt", 0.4]),
        "min_samples_leaf": trial.suggest_categorical("min_samples_leaf",  [int(N * f) for f in min_leaf_fracs]),
    }
    params["min_samples_split"] = trial.suggest_categorical("min_samples_split", [2 * v for v in GRID["min_samples_leaf"]])
    


    # Hash berechnen
    params_hash = hash_params(params)

    # Trial überspringen, wenn schon getestet
    if params_hash in TRIED_HASHES:
        print(f"[SKIP] Trial {trial.number} übersprungen – bekannte Param-Kombi: {params}")
        raise optuna.exceptions.TrialPruned()

    TRIED_HASHES.add(params_hash)


    rung_trees = tuple(sorted(grid.get("n_estimators")))

    best_full_cost = np.inf
    best_n_trees   = None

    with mlflow.start_run(nested=True):
        mlflow.log_params(params)
        mlflow.set_tag("rungs", str(rung_trees))
        

        for step, n_trees in enumerate(rung_trees, start=1): 
            rsf = fit_rsf(
                X_tr,
                pd.Series(y_tr_surv["time"],  dtype=float),
                pd.Series(y_tr_surv["event"], dtype=bool),
                n_estimators=n_trees,
                **params
            )
            # Safety: Feature-Ausrichtung
            X_eval = X_val.loc[:, rsf.feature_names_in_] if hasattr(rsf, "feature_names_in_") else X_val

            # FULL-Entscheidung & realisierte Kosten
            pred_full, _, _ = decide_with_cost_from_rsf_at_taus(rsf, X_eval, TAUS, COST)
            avg_full, total_full = evaluate_decision_costs_from_true(
                true_class=y_val_class, pred_class=pred_full, cost=COST
            )

            print(f"[{time.strftime('%H:%M:%S')}] [rung {step}/{len(rung_trees)} FULL] "
                  f"n_eval={len(X_eval)}  total_cost={total_full:.2f}  avg_cost={avg_full:.4f}")

            # Logging
            mlflow.log_metric("val_total_cost_full", total_full, step=step)
            mlflow.log_metric("val_avg_cost_full",   avg_full,   step=step)
            mlflow.log_metric("val_n_eval_full",     len(X_eval), step=step)
            

            # Bestes FULL-Ergebnis über die Rungen merken
            if total_full < best_full_cost:
                best_full_cost = float(total_full)
                best_n_trees   = int(n_trees)
                best_full_params = dict(params)
                best_full_params["n_estimators"] = best_n_trees
                trial.set_user_attr("best_n_estimators", best_n_trees)
                trial.set_user_attr("full_params", best_full_params)
                mlflow.log_metric("best_so_far_total_cost_full", best_full_cost, step=step)
                mlflow.set_tag("best_so_far_n_estimators", best_n_trees)

            # Pruning basiert ebenfalls auf FULL (du wolltest nur FULL)
            if step < len(rung_trees):
                trial.report(total_full, step=step)
                if trial.should_prune():
                    raise optuna.TrialPruned()
    
        
        return float(best_full_cost)

In [37]:
def run_rsf_study_totalcost(
    X_train: pd.DataFrame, y_train_surv,
    X_val: pd.DataFrame, y_val_class: pd.Series,
    experiment_name: str = "RSF_HPO_TOTALCOST",
    tracking_uri: str | None = None
) -> optuna.Study:
    """
    Startet die Optuna-Studie mit Successive Halving und wählt Hyperparameter
    strikt nach minimalen REALISIERTEN Gesamtkosten auf dem Validationset.
    """

    TRIED_HASHES = set()

    setup_mlflow(experiment_name, tracking_uri)
    N = len(X_train)
    min_leaf_fracs = [0.005, 0.01, 0.02]
    GRID = {
    "max_depth": [int(np.log2(N)), int(np.log2(N)) + 4],
    "max_features": ["sqrt", 0.4],
    "min_samples_leaf": [int(N * f) for f in min_leaf_fracs],
    "n_estimators":      [8, 16, 32, 64]
    }
    GRID["min_samples_split"] = [2 * v for v in GRID["min_samples_leaf"]]
    

    n_combos = int(np.prod([len(v) for k, v in GRID.items() if k != "n_estimators"]))

    sampler = GridSampler(search_space=GRID)
    pruner = SuccessiveHalvingPruner(min_resource=1, reduction_factor=10)
    study = optuna.create_study(direction="minimize", sampler=sampler, pruner=pruner)

    with mlflow.start_run(run_name="rsf_hpo_total_cost_sh"):
        study.optimize(
            lambda t: rsf_objective_prunable_total_cost(t, X_train, y_train_surv, X_val, y_val_class, grid=GRID, TRIED_HASHES=TRIED_HASHES),
            n_trials=n_combos,
            show_progress_bar=True,
            callbacks=[progress_callback_totalcost],
        )
        mlflow.log_params(study.best_trial.user_attrs["full_params"])

    return study

#### Finales Training mit Bestparametern und Logging

In [275]:
def train_final_rsf_and_log(
    X_trval: pd.DataFrame, y_trval_surv: Surv, best_params: dict,
    experiment_name: str = "RSF_Cost_Tuning", model_name: str = "rsf_best.pkl"
):
    """
    Train the final RandomSurvivalForest model and log the results.

    Args:
        X_trval (pd.DataFrame): The training features for the final model.
        y_trval_surv (Surv): The training survival data for the final model.
        best_params (dict): The best hyperparameters from the Optuna study.
        experiment_name (str): The name of the MLflow experiment.
        model_name (str): The name of the model file to save.

    Returns:
        RandomSurvivalForest: The trained RandomSurvivalForest model.
    """
    setup_mlflow(experiment_name)
    with mlflow.start_run(run_name="rsf_final_train"):
        mlflow.log_params(best_params)
        rsf = fit_rsf(X_trval, pd.Series(y_trval_surv["time"]), pd.Series(y_trval_surv["event"]), **best_params)
        # Modell als Artefakt speichern
        out_path = Path("artifacts"); out_path.mkdir(exist_ok=True, parents=True)
        model_file = out_path / model_name
        with open(model_file, "wb") as f:
            pickle.dump(rsf, f)
        mlflow.log_artifact(str(model_file))
    return rsf

#### Anwendung

In [39]:
# y_train_surv bereits als structured array vorhanden
study = run_rsf_study_totalcost(
    X_train=X_train,
    y_train_surv=y_train_surv,
    X_val=X_val,
    y_val_class=validation_data["class_label"],   # deine vorhandenen Klassenlabels
    experiment_name="RSF_HPO_TOTALCOST"
)

best_params = study.best_trial.user_attrs["full_params"]
print("Beste Parameter nach realisierten Gesamtkosten:", best_params)
print("Beste Gesamtkosten:", study.best_value)


[I 2025-09-04 06:38:40,645] A new study created in memory with name: no-name-cb1ceb31-3bf1-4d57-9c4c-0488292800ed


MLflow tracking URI: file:///workspace/mlruns


  0%|          | 0/36 [00:00<?, ?it/s]

[06:38:41] → starte FIT mit Parametern: {'n_estimators': 8, 'max_depth': 13, 'max_features': 'sqrt', 'min_samples_leaf': 56, 'min_samples_split': 224}
[06:38:45] ✓ FIT fertig
[06:38:46] [rung 1/4 FULL] n_eval=5046  total_cost=52160.00  avg_cost=10.3369
[06:38:46] → starte FIT mit Parametern: {'n_estimators': 16, 'max_depth': 13, 'max_features': 'sqrt', 'min_samples_leaf': 56, 'min_samples_split': 224}
[06:38:51] ✓ FIT fertig
[06:38:53] [rung 2/4 FULL] n_eval=5046  total_cost=51818.00  avg_cost=10.2691
[06:38:53] → starte FIT mit Parametern: {'n_estimators': 32, 'max_depth': 13, 'max_features': 'sqrt', 'min_samples_leaf': 56, 'min_samples_split': 224}
[06:39:01] ✓ FIT fertig
[06:39:03] [rung 3/4 FULL] n_eval=5046  total_cost=50616.00  avg_cost=10.0309
[06:39:04] → starte FIT mit Parametern: {'n_estimators': 64, 'max_depth': 13, 'max_features': 'sqrt', 'min_samples_leaf': 56, 'min_samples_split': 224}
[06:39:21] ✓ FIT fertig
[06:39:23] [rung 4/4 FULL] n_eval=5046  total_cost=49299.00  av

In [40]:
save_df(pd.DataFrame([best_params]), ordner="05_model_input", name="rsf_best_params_totalcost")

### 2. XGBoosting mit AFT HPO

#### Daten vorbereiten

In [14]:
def prepare_rsf_model_input(df: pd.DataFrame, columns_to_drop) -> pd.DataFrame: 
    """ Prepares the input data for the XGBoost model with aft. 
    
    Args: 
    df (pd.DataFrame): The input dataframe containing features and target variables. 
    columns_to_drop (list): List of columns to drop from the dataframe. 

    Return:
    pd.DataFrame: The feature dataframe for XGBoost. 
    """

    y = {
        "lower_bound": df["duration"].astype(float),
        "upper_bound":  df["upper_bound"].astype(float),
    }

    x = df.drop(columns=columns_to_drop)

    d = xgb.DMatrix(data=x, label_lower_bound=y["lower_bound"],
                     label_upper_bound=y["upper_bound"])

    return d


In [15]:
dtrain = prepare_rsf_model_input(load_df(ordner="04_feature", name = "feature_train_corr_labels"), columns_to_drop=["duration", "event", "vehicle_id", "class", "upper_bound"])
dval   = prepare_rsf_model_input(load_df(ordner="04_feature", name = "feature_validation_corr_labels"), columns_to_drop=["duration", "event", "vehicle_id", "class_label", "upper_bound"])
labels_val = load_df(ordner="04_feature", name = "feature_validation_corr_labels")["class_label"]


In [None]:
os.makedirs("../data/05_model_input", exist_ok=True)

xdtrain.save_binary("../data/05_model_input/AFT/xdtrain.buffer")
xdval.save_binary("../data/05_model_input/AFT/xdval.buffer")

# Label separat als NumPy/Parquet speichern
label_val.to_frame("class_label").to_parquet("../data/05_model_input/AFT/label_val.parquet", index=False)

#### Loss + Params

In [16]:
def build_aft_params(trial: optuna.Trial) -> dict:
    return {
        "tree_method": "hist",
        "device": "cuda",
        "objective": "survival:aft",
        "aft_loss_distribution": "normal",
        "aft_loss_distribution_scale": trial.suggest_float("aft_loss_distribution_scale", 0.5, 2.0),
        "eta": trial.suggest_float("eta", 0.01, 0.3),
        "max_depth": trial.suggest_int("max_depth", 3, 10),
        "min_child_weight": trial.suggest_float("min_child_weight", 1, 50),
        "lambda": trial.suggest_float("lambda", 1e-3, 10.0, log=True),
        "alpha": trial.suggest_float("alpha", 1e-3, 10.0, log=True),
        "subsample": trial.suggest_float("subsample", 0.5, 1.0),
        "colsample_bytree": trial.suggest_float("colsample_bytree", 0.5, 1.0),
        "booster": trial.suggest_categorical("booster", ["gbtree", "dart"]),
        "verbosity": 0
    }


#### Evaluation über erwartete Kosten

In [17]:
def predict_survival_prob(dval: xgb.DMatrix, booster: xgb.Booster, sigma: float) -> np.ndarray:
    mu = booster.predict(dval)  # (n_samples, 2): mean + std
    sigma = max(sigma, 1e-3)

    # Survivalfunktion S(t) für alle TAUS auswerten
    S = np.stack([
        1.0 - scipy.stats.norm.cdf(tau, loc=mu, scale=sigma)
        for tau in TAUS
    ], axis=1)

    return S  # shape: (n_samples, len(TAUS))


#### Optuna Objective

In [18]:
def aft_objective(trial: optuna.Trial,
                  y_val_class: pd.Series,
                  dtrain: xgb.DMatrix, dval: xgb.DMatrix
                  ) -> float:

    params = build_aft_params(trial)

    
    with mlflow.start_run(nested=True):
        mlflow.log_params(params)

        booster = xgb.train(
            params=params,
            dtrain=dtrain,
            evals=[(dtrain, "train"), (dval, "eval")],
            num_boost_round=trial.suggest_int("n_estimators", 50, 1000, step=50),
            early_stopping_rounds=50,
            verbose_eval=False
        )

        S_tau = predict_survival_prob(dval, booster, sigma=params["aft_loss_distribution_scale"])
        total_cost = total_expected_cost_at_surv(S_tau, TAUS, y_val_class, COST)
        mlflow.log_metric("val_total_expected_cost", total_cost)
        mlflow.log_metric("num_boost_round", booster.best_iteration)

        return total_cost


#### Cost Funktion

In [19]:
def total_expected_cost_at_surv(S_tau: np.ndarray, taus: np.ndarray,
                               true_classes: np.ndarray, cost: np.ndarray) -> float:
    total_cost = 0.0
    for i, s in enumerate(S_tau):
        p = class_probs_from_S_tau(s)
        exp_cost = cost.T @ p
        m_hat = int(np.argmin(exp_cost))
        total_cost += cost[int(true_classes[i]), m_hat]
    return total_cost


#### Anwendung

In [22]:
study = optuna.create_study(direction="minimize")


study.optimize(lambda trial: aft_objective(trial, y_val_class=validation_data["class_label"], dtrain=dtrain, dval=dval),
               n_trials=1000,
               callbacks=[progress_callback_totalcost],
               show_progress_bar=True)

# Logge das beste Ergebnis
best_trial = study.best_trial
best_params = best_trial.params
mlflow.log_params(best_trial.params)
mlflow.log_metric("best_val_total_expected_cost", best_trial.value)


[I 2025-09-08 07:40:24,122] A new study created in memory with name: no-name-4bb07b5b-f77b-4e8d-aad6-836aa43849a4


  0%|          | 0/1000 [00:00<?, ?it/s]

[W 2025-09-08 07:40:24,203] Trial 0 failed with parameters: {'aft_loss_distribution_scale': 1.938020287930645, 'eta': 0.08852303489892008, 'max_depth': 6, 'min_child_weight': 13.498496112311686, 'lambda': 3.5514991119621855, 'alpha': 0.001783294608300569, 'subsample': 0.8310880476691367, 'colsample_bytree': 0.5689057034183203, 'booster': 'dart'} because of the following error: MlflowException('Could not find experiment with ID 0').
Traceback (most recent call last):
  File "/usr/local/lib/python3.12/dist-packages/optuna/study/_optimize.py", line 201, in _run_trial
    value_or_values = func(trial)
                      ^^^^^^^^^^^
  File "/tmp/ipykernel_1282/691840537.py", line 4, in <lambda>
    study.optimize(lambda trial: aft_objective(trial, y_val_class=validation_data["class_label"], dtrain=dtrain, dval=dval),
                                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/tmp/ipykernel_1282/1070268166.py", line 

MlflowException: Could not find experiment with ID 0

In [119]:
best_params = best_trial.params

In [121]:
print("Beste Parameter nach realisierten Gesamtkosten:", best_params)

Beste Parameter nach realisierten Gesamtkosten: {'aft_loss_distribution': 'logistic', 'aft_loss_distribution_scale': 0.5, 'learning_rate': 0.28708886146175144, 'max_depth': 14, 'min_child_weight': 96, 'lambda': 0.00028905635749732183, 'alpha': 0.020669977889093343, 'subsample': 0.9859106682012534, 'colsample_bytree': 0.7390509265179644, 'n_estimators': 450}


In [120]:
save_df(pd.DataFrame([best_params]), ordner="05_model_input", name="atf_best_params_totalcost")


### 3. Modellerstellung RSF

#### Daten vorbereiten

In [None]:
X_train, y_train_surv = prepare_rsf_model_input(load_df(ordner="04_feature", name = "feature_train_corr_labels").drop(columns=["upper_bound"]), columns_to_drop=["duration", "event", "vehicle_id", "class"], frag=1, class_column="class", sampling=True) 

X_val, y_val_surv = prepare_rsf_model_input(load_df(ordner="04_feature", name = "feature_validation_corr_labels").drop(columns=["upper_bound"]), columns_to_drop=["duration", "event", "vehicle_id", "class_label"], frag=1.0, class_column="class_label", sampling=False) 

validation_data = load_df(ordner="04_feature", name = "feature_validation_corr_labels").drop(columns=["upper_bound"])

In [None]:

# Sicherstellen, dass der Ordner existiert
save_dir = "../Data/05_model_input/RSF"
os.makedirs(save_dir, exist_ok=True)

# X_train und X_val (DataFrames) speichern
X_train.to_parquet(os.path.join(save_dir, "X_train.parquet"), index=False)
X_val.to_parquet(os.path.join(save_dir, "X_val.parquet"), index=False)

# y_train_surv und y_val_surv sind Structured Arrays -> DataFrame
y_train_df = pd.DataFrame({
    "event": y_train_surv["event"].astype(int),
    "duration": y_train_surv["time"].astype(float)
})
y_val_df = pd.DataFrame({
    "event": y_val_surv["event"].astype(int),
    "duration": y_val_surv["time"].astype(float)
})

y_train_df.to_parquet(os.path.join(save_dir, "y_train_surv.parquet"), index=False)
y_val_df.to_parquet(os.path.join(save_dir, "y_val_surv.parquet"), index=False)

# Validation Data auch speichern
validation_data.to_parquet(os.path.join(save_dir, "validation_data.parquet"), index=False)



### 4. Modellerstellung XGBosst mit AFT

#### Daten vorbereiten

In [23]:
xdtrain = prepare_rsf_model_input(load_df(ordner="04_feature", name = "feature_train_corr_labels"), columns_to_drop=["duration", "event", "vehicle_id", "class", "upper_bound"])
xdval   = prepare_rsf_model_input(load_df(ordner="04_feature", name = "feature_validation_corr_labels"), columns_to_drop=["duration", "event", "vehicle_id", "class_label", "upper_bound"])

#### Parameter vorbereiten

In [13]:
def prepare_params_df() -> tuple[dict, int]:
    """
    Liest die besten Parameter aus den HPOs und gibt sie als DataFrame zurück.

    return: pd.DataFrame mit den besten Parametern.
    """

    best_params = load_df(ordner="05_model_input", name="atf_best_params_totalcost").iloc[0].to_dict()

    params = {
        "tree_method": "hist",             # oder "gpu_hist" je nach Version
        "device": "cuda",
        "objective": "survival:aft",
        "aft_loss_distribution": best_params["aft_loss_distribution"],     # 'logistic'
        "aft_loss_distribution_scale": float(best_params["aft_loss_distribution_scale"]),
        "eta": float(best_params["learning_rate"]),
        "max_depth": int(best_params["max_depth"]),
        "min_child_weight": int(best_params["min_child_weight"]),
        "reg_lambda": float(best_params["lambda"]),
        "reg_alpha": float(best_params["alpha"]),
        "subsample": float(best_params["subsample"]),
        "colsample_bytree": float(best_params["colsample_bytree"]),
        # booster kannst du weglassen oder explizit auf "gbtree" setzen
        "verbosity": 1
    }

    num_boost_round = int(best_params["n_estimators"])

    return params, num_boost_round


In [None]:
def train_final_aft_and_log(
    xdtrain: xgb.DMatrix, xdval: xgb.DMatrix, experiment_name: str = "XGB_AFT_final"
):
    """
    Train the final XGBoost AFT model and log the results.

    Args:
        xdtrain (xgb.DMatrix): The training data for the final model.
        xdval (xgb.DMatrix): The validation data for early stopping.
        best_params (dict): The best hyperparameters from the Optuna study.
        num_boost_round (int): The number of boosting rounds.
        experiment_name (str): The name of the MLflow experiment.

    Returns:
        xgb.Booster: The trained XGBoost Booster model.
    """
    params, num_boost_round =prepare_params_df()
    setup_mlflow(experiment_name)
    early_stopping_rounds = 50  # Du kannst das anpassen oder entfernen, wenn nicht gewünscht
    with mlflow.start_run(run_name="XGB_AFT_final"):
        booster = xgb.train(
            params=params,
            dtrain=xdtrain,
            num_boost_round=num_boost_round,
            evals=[(xdval, "val")],              # für Early Stopping; entferne diese Zeile + early_stopping_rounds, falls unerwünscht
            early_stopping_rounds=early_stopping_rounds,
            verbose_eval=50
        )

    # In MLflow loggen (Artefakt)
    mlflow.xgboost.log_model(
        xgb_model=booster,
        artifact_path="model",
        registered_model_name="AFT_final_model"  # optional: Modellregistry
    )

    # Lokal im MLflow-Format speichern
    local_path = "data/06_models/AFT_final_model"
    mlflow.xgboost.save_model(xgb_model=booster, path=local_path)

Training fertig. Modell lokal gespeichert unter: data/06_models/XGB_AFT_final_model


In [28]:
train_final_aft_and_log(xdtrain, xdval)

2025/09/08 07:48:25 INFO mlflow.tracking.fluent: Experiment with name 'XGB_AFT_final' does not exist. Creating a new experiment.


MLflow tracking URI: file:///workspace/mlruns
[0]	val-aft-nloglik:0.67713
[50]	val-aft-nloglik:16.08798


Registered model 'AFT_final_model' already exists. Creating a new version of this model...
Created version '3' of model 'AFT_final_model'.
