# Model 1: Failure Risk



# In [1]:
"""Failure risk modeling pipeline for Scania sensor data and MaintNet maintenance logs.

This module implements a reproducible, step-by-step pipeline for tabular modeling
that follows the specification in the project brief:

* Merge Scania sensor data with MaintNet work-order statistics per unit.
* Perform lag-based feature engineering and aggregate statistics.
* Address class imbalance via either SMOTE or cost-sensitive learning.
* Train a baseline logistic regression model and an ensemble gradient boosted model
  (LightGBM preferred, CatBoost as a fallback) with optional calibration.
* Provide utilities for model explainability with SHAP at both global and local levels.

The code is organized to make it easy to run each stage individually while maintaining
an end-to-end `run_training_pipeline` entry-point that wires everything together.
"""

from __future__ import annotations

import logging
import pathlib
from dataclasses import dataclass, field
from typing import Dict, Iterable, List, Optional, Tuple

import numpy as np
import pandas as pd
from sklearn.base import ClassifierMixin
from sklearn.calibration import CalibratedClassifierCV
from sklearn.compose import ColumnTransformer
from sklearn.exceptions import NotFittedError
from sklearn.impute import SimpleImputer
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import classification_report
from sklearn.model_selection import train_test_split
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import FunctionTransformer, OneHotEncoder, StandardScaler

try:  # Optional dependency that may not be available in all environments.
    import lightgbm as lgb
except ModuleNotFoundError:  # pragma: no cover - optional dependency guard
    lgb = None  # type: ignore

try:  # Optional dependency that may not be available in all environments.
    from catboost import CatBoostClassifier
except ModuleNotFoundError:  # pragma: no cover - optional dependency guard
    CatBoostClassifier = None  # type: ignore

try:  # Optional dependency that may not be available in all environments.
    import shap
except ModuleNotFoundError:  # pragma: no cover - optional dependency guard
    shap = None  # type: ignore

try:
    from imblearn.over_sampling import SMOTE
except ModuleNotFoundError as exc:  # pragma: no cover - imbalanced-learn may be optional
    raise RuntimeError(
        "The failure risk pipeline requires the `imbalanced-learn` package."
    ) from exc

# Module-level logger shared by every stage of the pipeline. Only this logger's
# threshold is set here (INFO); whether records are actually emitted still depends
# on the handlers configured by the application.
LOGGER = logging.getLogger(__name__)
LOGGER.setLevel(logging.INFO)


@dataclass
class FailureRiskConfig:
    """Configuration for the failure risk modeling pipeline.

    After initialisation:

    * ``scania_path`` / ``maintnet_path`` are converted to ``pathlib.Path``, so plain
      strings are accepted.
    * ``lag_days`` is materialised into a tuple, so a one-shot iterable (e.g. a
      generator) can safely be iterated more than once.

    Raises:
        ValueError: if ``train_size`` / ``validation_size`` cannot yield non-empty
            train, validation and test partitions, or if ``calibration_method`` is
            set to something other than ``"sigmoid"`` or ``"isotonic"``.
    """

    scania_path: pathlib.Path
    maintnet_path: pathlib.Path
    target_column: str = "failure_event"
    unit_id_column: str = "unit_id"
    timestamp_column: str = "timestamp"
    # Fraction of all rows used for training.
    train_size: float = 0.7
    # Fraction of ALL rows used for validation; the remainder
    # (1 - train_size - validation_size) becomes the test set.
    validation_size: float = 0.15
    random_state: int = 42
    # True: oversample with SMOTE; False: use inverse-frequency class weights.
    use_smote: bool = True
    # "isotonic" or "sigmoid"; None (or "") disables calibration.
    calibration_method: Optional[str] = "isotonic"
    text_column: Optional[str] = "technician_remarks"
    text_max_features: int = 100
    # Empty lists mean "infer the columns from dtypes".
    categorical_cols: List[str] = field(default_factory=list)
    numerical_cols: List[str] = field(default_factory=list)
    # Rolling-window lengths, in days, for the lag features.
    lag_days: Iterable[int] = field(default_factory=lambda: (7, 30, 90))

    def __post_init__(self) -> None:
        self.scania_path = pathlib.Path(self.scania_path)
        self.maintnet_path = pathlib.Path(self.maintnet_path)
        # A generator would otherwise be exhausted after the first pass.
        self.lag_days = tuple(self.lag_days)

        if not 0.0 < self.train_size < 1.0:
            raise ValueError(
                f"train_size must be strictly between 0 and 1, got {self.train_size!r}"
            )
        remainder = 1.0 - self.train_size
        # validation_size must leave a non-empty test set out of the remainder.
        if not 0.0 < self.validation_size < remainder:
            raise ValueError(
                "validation_size must be strictly between 0 and 1 - train_size "
                f"({remainder:.4f}), got {self.validation_size!r}"
            )
        if self.calibration_method and self.calibration_method not in ("sigmoid", "isotonic"):
            raise ValueError(
                "calibration_method must be 'sigmoid', 'isotonic' or None, "
                f"got {self.calibration_method!r}"
            )


def _load_dataframe(path: pathlib.Path) -> pd.DataFrame:
    """Load a dataframe from CSV, Parquet, or Feather based on the file extension.

    Args:
        path: File to read. Plain strings and other ``os.PathLike`` objects are
            accepted too; they are converted to ``pathlib.Path``. The extension
            match is case-insensitive (``.CSV`` behaves like ``.csv``).

    Returns:
        The loaded dataframe.

    Raises:
        FileNotFoundError: if ``path`` does not exist.
        ValueError: if the extension is not one of .csv, .parquet/.pq, .feather/.ft.
    """
    path = pathlib.Path(path)
    if not path.exists():
        raise FileNotFoundError(f"Data file not found: {path}")

    LOGGER.info("Loading data from %s", path)
    readers = {
        ".csv": pd.read_csv,
        ".parquet": pd.read_parquet,
        ".pq": pd.read_parquet,
        ".feather": pd.read_feather,
        ".ft": pd.read_feather,
    }
    reader = readers.get(path.suffix.lower())
    if reader is None:
        raise ValueError(f"Unsupported file format: {path.suffix}")
    return reader(path)


def load_scania_data(config: FailureRiskConfig) -> pd.DataFrame:
    """Read the Scania sensor table referenced by ``config.scania_path``.

    If ``config.timestamp_column`` is present, it is replaced by its
    ``pd.to_datetime(..., errors="coerce")`` conversion, so unparseable values
    become ``NaT``. Otherwise the frame is returned as loaded.
    """
    sensors = _load_dataframe(config.scania_path)
    stamp_col = config.timestamp_column
    if stamp_col not in sensors.columns:
        return sensors
    sensors[stamp_col] = pd.to_datetime(sensors[stamp_col], errors="coerce")
    return sensors


def load_maintnet_data(config: FailureRiskConfig) -> pd.DataFrame:
    """Read the MaintNet work-order table referenced by ``config.maintnet_path``.

    When the configured timestamp column exists, it is parsed in place with
    ``errors="coerce"``, so values that cannot be parsed turn into ``NaT``.
    """
    work_orders = _load_dataframe(config.maintnet_path)
    has_timestamp = config.timestamp_column in work_orders.columns
    if has_timestamp:
        raw_stamps = work_orders[config.timestamp_column]
        work_orders[config.timestamp_column] = pd.to_datetime(raw_stamps, errors="coerce")
    return work_orders


def merge_datasets(
    scania_df: pd.DataFrame,
    maintnet_df: pd.DataFrame,
    config: FailureRiskConfig,
) -> pd.DataFrame:
    """Left-join MaintNet work-order statistics onto Scania sensor rows by unit id.

    Every Scania row is kept. MaintNet columns are attached by matching
    ``config.unit_id_column``, and rows without a match get NaN.

    When both inputs carry ``config.timestamp_column``, the MaintNet copy is renamed
    to ``<timestamp_column>_maintnet`` before the merge. This lets the sensor
    timestamp keep its configured name, which the lag-feature step looks up.
    Other overlapping columns receive ``_scania`` / ``_maintnet`` suffixes.

    When a timestamp column is present, the result is sorted by unit id and
    timestamp.

    Note: the join uses the unit id only, with no time alignment. If MaintNet holds
    several rows for one unit, each matching Scania row is repeated once per
    MaintNet row; a warning is logged in that case.

    Raises:
        KeyError: if either frame lacks ``config.unit_id_column``.
    """
    unit_col = config.unit_id_column
    ts_col = config.timestamp_column

    for label, frame in (("Scania", scania_df), ("MaintNet", maintnet_df)):
        missing = {unit_col} - set(frame.columns)
        if missing:
            raise KeyError(f"Missing columns in {label} data: {missing}")

    # Without this, pd.merge would suffix BOTH timestamps and the canonical
    # timestamp column would disappear from the merged frame.
    if ts_col in scania_df.columns and ts_col in maintnet_df.columns:
        maintnet_df = maintnet_df.rename(columns={ts_col: f"{ts_col}_maintnet"})

    duplicate_units = int(maintnet_df[unit_col].duplicated().sum())
    if duplicate_units:
        LOGGER.warning(
            "MaintNet data contains %d repeated %s values; matching Scania rows "
            "will be duplicated by the merge.",
            duplicate_units,
            unit_col,
        )

    LOGGER.info("Merging Scania and MaintNet data on %s", unit_col)
    merged = pd.merge(
        scania_df,
        maintnet_df,
        on=unit_col,
        how="left",
        suffixes=("_scania", "_maintnet"),
    )

    if ts_col in merged.columns:
        merged = merged.sort_values(by=[unit_col, ts_col])

    return merged


def _lag_feature_name(base: str, lag_days: int, agg: str) -> str:
    return f"{base}_lag_{lag_days}_days_{agg}"


def engineer_lag_features(
    df: pd.DataFrame,
    config: FailureRiskConfig,
    failure_count_col: str = "failure_count",
    pressure_col: str = "pressure_deviation",
) -> pd.DataFrame:
    """Engineer lag-based features such as rolling failure counts and mean deviation."""

    if config.timestamp_column not in df.columns:
        LOGGER.warning(
            "Timestamp column %s not found; lag features will be skipped.",
            config.timestamp_column,
        )
        return df

    df = df.copy()
    df[config.timestamp_column] = pd.to_datetime(df[config.timestamp_column], errors="coerce")

    for lag in config.lag_days:
        window = f"{lag}D"
        group = df.groupby(config.unit_id_column, group_keys=False)

        if failure_count_col in df.columns:
            df[_lag_feature_name(failure_count_col, lag, "sum")] = group[
                failure_count_col
            ].apply(lambda s: s.rolling(window=window, on=df[config.timestamp_column]).sum())

        if pressure_col in df.columns:
            df[_lag_feature_name(pressure_col, lag, "mean")] = group[pressure_col].apply(
                lambda s: s.rolling(window=window, on=df[config.timestamp_column]).mean()
            )

    return df


def engineer_static_features(
    df: pd.DataFrame, config: FailureRiskConfig
) -> Tuple[pd.DataFrame, List[str], List[str]]:
    """Resolve the categorical and numerical feature columns for ``df``.

    Column sources:

    * If ``config.categorical_cols`` / ``config.numerical_cols`` are non-empty,
      they are used as given.
    * Otherwise the columns are inferred from dtypes: object/category columns are
      categorical, numeric columns are numerical.

    The target and unit-id columns are removed from both lists.
    ``config.text_column`` is removed from the categorical list because text is
    vectorised separately.

    Returns:
        A copy of ``df`` plus the categorical and numerical column lists. The lists
        are always new objects, so the lists stored on ``config`` are never mutated.
    """
    df = df.copy()
    # list(...) copies: removing entries below must not touch config's own lists.
    categorical = list(config.categorical_cols) or df.select_dtypes(
        include=["object", "category"]
    ).columns.tolist()
    numerical = list(config.numerical_cols) or df.select_dtypes(
        include=["number"]
    ).columns.tolist()

    excluded = {config.target_column, config.unit_id_column}
    numerical = [col for col in numerical if col not in excluded]

    # Text is kept out of the categorical list; it gets its own vectoriser later.
    categorical_excluded = set(excluded)
    if config.text_column:
        categorical_excluded.add(config.text_column)
    categorical = [col for col in categorical if col not in categorical_excluded]

    return df, categorical, numerical


def _fill_missing_text(values: pd.Series) -> pd.Series:
    """Replace missing text with "" and coerce every entry to ``str`` for TF-IDF.

    Defined at module level rather than as a closure so that fitted pipelines
    containing it can be pickled (e.g. with joblib).
    """
    return pd.Series(values).fillna("").astype(str)


def build_preprocessor(
    categorical_cols: Iterable[str],
    numerical_cols: Iterable[str],
    text_column: Optional[str],
    text_max_features: int,
) -> ColumnTransformer:
    """Create a preprocessing pipeline for categorical, numerical, and text columns.

    Branches:

    * Numerical: median imputation, then standard scaling.
    * Categorical: most-frequent imputation, then dense one-hot encoding
      (categories unseen at fit time are ignored).
    * Text (only if ``text_column`` is set): missing values become "", then TF-IDF
      with up to ``text_max_features`` uni/bi-gram terms, each appearing in at
      least 2 documents.

    All other columns are dropped. Empty column lists skip their branch.
    """
    # Materialise once so one-shot iterables are not consumed by a truthiness check.
    numerical_cols = list(numerical_cols)
    categorical_cols = list(categorical_cols)

    transformers = []

    if numerical_cols:
        transformers.append(
            (
                "num",
                Pipeline(
                    steps=[
                        ("imputer", SimpleImputer(strategy="median")),
                        ("scaler", StandardScaler()),
                    ]
                ),
                numerical_cols,
            )
        )

    if categorical_cols:
        transformers.append(
            (
                "cat",
                Pipeline(
                    steps=[
                        ("imputer", SimpleImputer(strategy="most_frequent")),
                        (
                            "onehot",
                            OneHotEncoder(handle_unknown="ignore", sparse_output=False),
                        ),
                    ]
                ),
                categorical_cols,
            )
        )

    if text_column:
        from sklearn.feature_extraction.text import TfidfVectorizer

        transformers.append(
            (
                "text",
                Pipeline(
                    steps=[
                        # Step name kept as "selector" for compatibility; column
                        # selection itself is done by the scalar column spec below.
                        (
                            "selector",
                            FunctionTransformer(_fill_missing_text, validate=False),
                        ),
                        (
                            "tfidf",
                            TfidfVectorizer(
                                max_features=text_max_features,
                                ngram_range=(1, 2),
                                min_df=2,
                            ),
                        ),
                    ]
                ),
                # A scalar column name makes ColumnTransformer pass a 1-D Series,
                # which is what TfidfVectorizer consumes.
                text_column,
            )
        )

    return ColumnTransformer(transformers=transformers, remainder="drop")


def _build_logistic_regression_pipeline(
    preprocessor: ColumnTransformer,
    class_weight: Optional[Dict[str, float]],
) -> Pipeline:
    """Assemble the baseline model: ``preprocessor`` followed by logistic regression.

    The regression uses the lbfgs solver with up to 1000 iterations. ``class_weight``
    is forwarded unchanged; ``None`` gives every class the same weight.
    """
    baseline = LogisticRegression(
        solver="lbfgs",
        max_iter=1000,
        class_weight=class_weight,
    )
    stages = [("preprocessor", preprocessor), ("classifier", baseline)]
    return Pipeline(steps=stages)


def _build_lightgbm_pipeline(
    preprocessor: ColumnTransformer,
    random_state: int,
    class_weight: Optional[Dict[str, float]],
) -> Pipeline:
    """Chain ``preprocessor`` with a LightGBM gradient-boosted classifier.

    The model uses 500 trees and learning rate 0.05. Per tree it samples 90% of
    the rows (re-drawn every iteration) and 80% of the columns.

    Raises:
        RuntimeError: if ``lightgbm`` could not be imported.
    """
    if lgb is None:
        raise RuntimeError(
            "LightGBM is not installed. Please install `lightgbm` to use this model."
        )

    classifier = lgb.LGBMClassifier(
        n_estimators=500,
        learning_rate=0.05,
        subsample=0.9,
        # LightGBM only performs row subsampling (bagging) when the bagging
        # frequency is positive; without this, subsample=0.9 is silently ignored.
        subsample_freq=1,
        colsample_bytree=0.8,
        random_state=random_state,
        class_weight=class_weight,
    )

    return Pipeline(
        steps=[
            ("preprocessor", preprocessor),
            ("classifier", classifier),
        ]
    )


def _build_catboost_pipeline(
    preprocessor: ColumnTransformer,
    random_state: int,
    class_weight: Optional[Dict[str, float]],
) -> Pipeline:
    """Chain ``preprocessor`` with a CatBoost classifier (the LightGBM fallback).

    The model uses 800 iterations, learning rate 0.03, depth 6, a Logloss eval
    metric and silent training. ``class_weight`` is passed as CatBoost's
    ``class_weights``.

    Raises:
        RuntimeError: if ``catboost`` could not be imported.
    """
    if CatBoostClassifier is not None:
        booster_params = dict(
            iterations=800,
            learning_rate=0.03,
            depth=6,
            eval_metric="Logloss",
            verbose=False,
            random_seed=random_state,
            class_weights=class_weight,
        )
        booster = CatBoostClassifier(**booster_params)
        return Pipeline(steps=[("preprocessor", preprocessor), ("classifier", booster)])

    raise RuntimeError(
        "CatBoost is not installed. Please install `catboost` to use this model."
    )


def _compute_class_weight(y: pd.Series) -> Dict[int, float]:
    """Map each class label to the reciprocal of its relative frequency in ``y``.

    Example: a 3:1 split between labels 0 and 1 yields ``{0: 4/3, 1: 4.0}``.
    """
    shares = y.value_counts(normalize=True)
    return shares.rdiv(1.0).to_dict()


def resample_with_smote(
    X: pd.DataFrame, y: pd.Series, random_state: int
) -> Tuple[pd.DataFrame, pd.Series]:
    """Oversample minority classes with SMOTE and return the rebalanced data.

    SMOTE interpolates between neighbouring samples, so ``X`` is expected to be
    numeric and free of missing values (an imbalanced-learn requirement).
    """
    LOGGER.info("Applying SMOTE oversampling")
    sampler = SMOTE(random_state=random_state)
    features_balanced, target_balanced = sampler.fit_resample(X, y)
    return features_balanced, target_balanced


def calibrate_model(
    model: Pipeline,
    method: str,
    X_train: pd.DataFrame,
    y_train: pd.Series,
) -> CalibratedClassifierCV:
    """Calibrate probability outputs using Platt scaling or isotonic regression.

    Args:
        model: Estimator (typically a pipeline) to calibrate.
        method: ``"sigmoid"`` (Platt scaling) or ``"isotonic"``.
        X_train: Features used for calibration.
        y_train: Labels used for calibration.

    Returns:
        The fitted ``CalibratedClassifierCV``.

    Note:
        With ``cv=3``, scikit-learn clones ``model`` and refits each clone on two of
        three folds of ``X_train``; the calibrator is fitted on the held-out fold.
        An already-fitted ``model`` is therefore NOT reused as-is.
    """
    import inspect

    LOGGER.info("Calibrating model with %s calibration", method)
    # scikit-learn 1.2 renamed `base_estimator` to `estimator` and 1.4 removed the
    # old keyword; use whichever the installed version accepts.
    accepted = inspect.signature(CalibratedClassifierCV.__init__).parameters
    estimator_kw = "estimator" if "estimator" in accepted else "base_estimator"
    calibrator = CalibratedClassifierCV(method=method, cv=3, **{estimator_kw: model})
    calibrator.fit(X_train, y_train)
    return calibrator


def _split_train_valid_test(
    X: pd.DataFrame, y: pd.Series, config: FailureRiskConfig
) -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame, pd.Series, pd.Series, pd.Series]:
    """Split into stratified train / validation / test partitions.

    ``config.train_size`` and ``config.validation_size`` are both fractions of the
    full data; the remainder becomes the test set.
    """
    LOGGER.info("Performing train/validation split with train size %.2f", config.train_size)
    X_train, X_temp, y_train, y_temp = train_test_split(
        X, y, train_size=config.train_size, random_state=config.random_state, stratify=y
    )

    # Re-express the validation share relative to the held-out remainder.
    validation_ratio = config.validation_size / (1 - config.train_size)
    X_valid, X_test, y_valid, y_test = train_test_split(
        X_temp,
        y_temp,
        test_size=1 - validation_ratio,
        random_state=config.random_state,
        stratify=y_temp,
    )
    return X_train, X_valid, X_test, y_train, y_valid, y_test


def _with_smote(pipeline: Pipeline, random_state: int) -> Pipeline:
    """Return ``pipeline`` rebuilt with a SMOTE step right after its first step.

    The first step is the preprocessor, so SMOTE receives numeric, imputed
    features. imbalanced-learn's Pipeline (a subclass of scikit-learn's) applies
    samplers only while fitting, never when predicting.
    """
    from imblearn.pipeline import Pipeline as ImbPipeline

    steps = list(pipeline.steps)
    steps.insert(1, ("smote", SMOTE(random_state=random_state)))
    return ImbPipeline(steps=steps)


def _log_holdout_performance(
    models: Dict[str, ClassifierMixin], X_test: pd.DataFrame, y_test: pd.Series
) -> None:
    """Log a classification report for every model on the test partition."""
    for name, model in models.items():
        try:
            y_pred = model.predict(X_test)
        except NotFittedError:
            LOGGER.warning("Model %s is not fitted; skipping evaluation.", name)
            continue
        LOGGER.info("Model %s performance:\n%s", name, classification_report(y_test, y_pred))


def train_models(
    df: pd.DataFrame,
    config: FailureRiskConfig,
) -> Dict[str, ClassifierMixin]:
    """Train baseline and boosted models according to the pipeline specification.

    Steps:

    1. Stratified train / validation / test split.
    2. Feature columns are resolved on the training partition.
    3. A logistic-regression baseline is trained, plus LightGBM. CatBoost is
       trained instead when LightGBM is unavailable or raises ``RuntimeError``.
    4. If ``config.calibration_method`` is set, each model also gets a calibrated
       variant built on the validation partition.
    5. A classification report is logged for every model on the test partition.

    Class imbalance:

    * With ``config.use_smote``, SMOTE runs inside every model pipeline, after
      preprocessing.
    * Otherwise inverse-frequency class weights are passed to the classifiers.

    Each pipeline gets its own clone of the preprocessor, so fitted state is never
    shared between models.

    Returns:
        Mapping of model name to fitted model. Calibrated variants are stored under
        ``"<name>_calibrated"``.

    Raises:
        KeyError: if ``config.target_column`` is missing from ``df``.
    """
    from sklearn.base import clone

    if config.target_column not in df.columns:
        raise KeyError(f"Target column `{config.target_column}` not found in data frame")

    X = df.drop(columns=[config.target_column])
    y = df[config.target_column].astype(int)
    X_train, X_valid, X_test, y_train, y_valid, y_test = _split_train_valid_test(X, y, config)

    df_train = X_train.assign(**{config.target_column: y_train})
    df_train, categorical_cols, numerical_cols = engineer_static_features(df_train, config)

    # A configured but absent text column would make the ColumnTransformer fail at fit.
    text_column = config.text_column
    if text_column and text_column not in df_train.columns:
        LOGGER.warning(
            "Text column %s not found in training data; text features will be skipped.",
            text_column,
        )
        text_column = None

    preprocessor = build_preprocessor(
        categorical_cols=categorical_cols,
        numerical_cols=numerical_cols,
        text_column=text_column,
        text_max_features=config.text_max_features,
    )

    class_weight = None
    if config.use_smote:
        # SMOTE needs numeric, NaN-free input, so it runs after the preprocessor.
        LOGGER.info("Applying SMOTE oversampling after preprocessing in each model pipeline")
    else:
        class_weight = _compute_class_weight(y_train)
        LOGGER.info("Using cost-sensitive class weights: %s", class_weight)

    def finalize(pipeline: Pipeline) -> Pipeline:
        return _with_smote(pipeline, config.random_state) if config.use_smote else pipeline

    training_features = df_train.drop(columns=[config.target_column])
    training_target = df_train[config.target_column]

    LOGGER.info("Training logistic regression baseline model")
    logistic_pipeline = finalize(
        _build_logistic_regression_pipeline(clone(preprocessor), class_weight)
    )
    logistic_pipeline.fit(training_features, training_target)
    models: Dict[str, ClassifierMixin] = {"logistic_regression": logistic_pipeline}

    try:
        LOGGER.info("Training LightGBM model")
        lightgbm_pipeline = finalize(
            _build_lightgbm_pipeline(
                preprocessor=clone(preprocessor),
                random_state=config.random_state,
                class_weight=class_weight,
            )
        )
        lightgbm_pipeline.fit(training_features, training_target)
        models["lightgbm"] = lightgbm_pipeline
    except RuntimeError as err:
        LOGGER.warning("LightGBM training skipped: %s", err)

    if "lightgbm" not in models and CatBoostClassifier is not None:
        LOGGER.info("Training CatBoost model as fallback")
        catboost_pipeline = finalize(
            _build_catboost_pipeline(
                preprocessor=clone(preprocessor),
                random_state=config.random_state,
                class_weight=class_weight,
            )
        )
        catboost_pipeline.fit(training_features, training_target)
        models["catboost"] = catboost_pipeline

    # NOTE: calibrate_model refits clones on validation folds. With SMOTE enabled,
    # each training fold needs more minority samples than SMOTE's k_neighbors.
    if config.calibration_method:
        for name, model in list(models.items()):
            LOGGER.info("Calibrating %s model", name)
            models[f"{name}_calibrated"] = calibrate_model(
                model=model,
                method=config.calibration_method,
                X_train=X_valid,
                y_train=y_valid,
            )

    LOGGER.info("Training complete. Evaluating models on the holdout set.")
    _log_holdout_performance(models, X_test, y_test)
    return models


def _fitted_estimator(model: ClassifierMixin) -> ClassifierMixin:
    """Return the fitted estimator whose predictions SHAP should explain.

    ``CalibratedClassifierCV`` keeps its constructor argument unfitted and stores
    fitted clones in ``calibrated_classifiers_``. The first clone is used; it sits
    under ``estimator`` (scikit-learn >= 1.2) or ``base_estimator`` (older).

    If the calibrator has not been fitted, the unfitted template is returned.
    Any non-calibrated model is returned unchanged.
    """
    if not isinstance(model, CalibratedClassifierCV):
        return model

    fitted_folds = getattr(model, "calibrated_classifiers_", None)
    holders = [fitted_folds[0]] if fitted_folds else []
    holders.append(model)
    for holder in holders:
        for attr in ("estimator", "base_estimator"):
            inner = getattr(holder, attr, None)
            # Some scikit-learn versions keep a placeholder string in the deprecated slot.
            if inner is not None and not isinstance(inner, str):
                return inner
    return model


def _shap_inputs(model: ClassifierMixin, X: pd.DataFrame) -> Tuple[object, object]:
    """Return ``(classifier, features)``: the final estimator and ``X`` as it sees it."""
    estimator = _fitted_estimator(model)
    if isinstance(estimator, Pipeline):
        classifier = estimator.named_steps["classifier"]
        transformed = estimator.named_steps["preprocessor"].transform(X)
    else:
        classifier = estimator
        transformed = X
    # Densify sparse output (e.g. from TF-IDF) so row indexing and plotting behave
    # like a regular ndarray.
    if hasattr(transformed, "toarray"):
        transformed = transformed.toarray()
    return classifier, transformed


def _positive_class(explanation: object) -> object:
    """Keep only class index 1 when SHAP returns one explanation per class.

    Assumes binary classification, where index 1 is the failure class.
    """
    if np.ndim(getattr(explanation, "values", None)) == 3:
        return explanation[:, :, 1]
    return explanation


def explain_global(model: ClassifierMixin, X: pd.DataFrame) -> Optional[object]:
    """Compute global SHAP values for ``model`` on ``X`` and draw a summary plot.

    Model handling:

    * Calibrated models are explained through a fitted base estimator, i.e. the
      uncalibrated scores.
    * Pipelines are explained on their preprocessed features.

    Returns:
        The SHAP explanation, or ``None`` when SHAP is not installed.
    """
    if shap is None:
        LOGGER.warning("SHAP is not installed; skipping explainability.")
        return None

    LOGGER.info("Computing SHAP global explanations")
    classifier, transformed_data = _shap_inputs(model, X)
    explainer = shap.Explainer(classifier, transformed_data)
    shap_values = _positive_class(explainer(transformed_data))
    shap.summary_plot(shap_values, transformed_data, show=False)
    return shap_values


def explain_local(
    model: ClassifierMixin,
    X: pd.DataFrame,
    instance_indices: Iterable[int],
) -> List[object]:
    """Generate SHAP force plots for specific instances to aid maintenance decisions.

    ``instance_indices`` are positional row numbers in ``X``. Any iterable is
    accepted; it is consumed exactly once. Model unwrapping follows the same rules
    as ``explain_global``.

    Returns:
        One force plot per requested index, or ``[]`` when SHAP is not installed.
    """
    if shap is None:
        LOGGER.warning("SHAP is not installed; skipping explainability.")
        return []

    # Materialise first: logging a generator would otherwise exhaust it.
    indices = list(instance_indices)
    LOGGER.info("Computing SHAP local explanations for instances: %s", indices)

    classifier, transformed_data = _shap_inputs(model, X)
    explainer = shap.Explainer(classifier, transformed_data)
    shap_values = _positive_class(explainer(transformed_data))

    force_plots = []
    for idx in indices:
        # DataFrame[idx] would select a column, so use positional row access.
        if isinstance(transformed_data, pd.DataFrame):
            row = transformed_data.iloc[idx]
        else:
            row = transformed_data[idx]
        force_plot = shap.force_plot(
            base_value=shap_values.base_values[idx],
            shap_values=shap_values.values[idx],
            features=row,
            matplotlib=True,
            show=False,
        )
        force_plots.append(force_plot)

    return force_plots


def run_training_pipeline(config: FailureRiskConfig) -> Dict[str, ClassifierMixin]:
    """Run every stage end to end and return the trained models.

    Stages, in order:

    1. Load Scania data.
    2. Load MaintNet data.
    3. Merge the two by unit.
    4. Add lag features.
    5. Train (and optionally calibrate) the models.
    """
    LOGGER.info("Starting failure risk training pipeline")
    sensors = load_scania_data(config)
    work_orders = load_maintnet_data(config)
    features = engineer_lag_features(merge_datasets(sensors, work_orders, config), config)
    fitted = train_models(features, config)
    LOGGER.info("Pipeline completed successfully")
    return fitted


# Public API: the names exported by `from <module> import *`. The loaders,
# merge step and private helpers stay importable by name but are not listed here.
__all__ = [
    "FailureRiskConfig",
    "run_training_pipeline",
    "engineer_lag_features",
    "train_models",
    "explain_global",
    "explain_local",
]