"""
Edge-Oriented Bearing Fault Severity Classification

This script implements a window-based vibration signal processing
pipeline using group-aware machine learning to estimate bearing
fault severity from raw industrial bearing vibration data.

Key components:
- Window segmentation (fixed-length windows)
- Time-domain feature extraction
- Multi-class severity labeling
- Group-aware train/test splitting (prevents leakage)
- Group-based cross-validation
- Random Forest classifier with feature importance analysis

Designed for ML + Embedded Systems portfolio positioning.
"""

In [2]:
import logging
import os
from typing import Callable, Dict, List, Optional

import numpy as np
import pandas as pd
from scipy.io import loadmat
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import classification_report, confusion_matrix
from sklearn.model_selection import GroupShuffleSplit, GroupKFold, cross_val_score

In [4]:
# ===============================
# CONFIGURATION
# ===============================

# Directories scanned for .mat recordings. Every file under NORMAL_DIR is
# labelled 0; files under FAULTY_DIR are labelled from their filename.
NORMAL_DIR = "Dataset/Normal"
FAULTY_DIR = "Dataset/Faulty"

WINDOW_SIZE = 2048   # window length, in samples, used when segmenting signals
RANDOM_STATE = 42    # seed shared by GroupShuffleSplit and the random forest
N_ESTIMATORS = 300   # n_estimators passed to RandomForestClassifier
MAX_DEPTH = 6        # max_depth passed to RandomForestClassifier

In [6]:
# ===============================
# LOGGING
# ===============================

# Configure the root logger to emit INFO and above, then create the
# module-level logger used by the functions and the script section below.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


In [8]:
# ===============================
# SEVERITY MAP
# ===============================

# Maps a filename fragment to an integer severity class. get_severity_label
# lowercases the filename and returns the label of the first key (in the
# insertion order below) that occurs anywhere in it as a substring.
# NOTE(review): the "irNNN" keys presumably encode the fault diameter in
# thousandths of an inch -- confirm against the dataset's naming scheme.
# NOTE(review): "ir014" and "ir021" both map to class 2 -- confirm that
# merging them into one "Moderate" class is intended.
SEVERITY_MAP = {
    "normal": 0,
    "ir007": 1,   # Mild
    "ir014": 2,   # Moderate
    "ir021": 2,   # Moderate
    "ir028": 3,   # Severe
}



In [10]:
# ===============================
# FEATURE EXTRACTION
# ===============================

def extract_features(signal: np.ndarray) -> Dict[str, float]:
    """Extract time-domain features from one vibration window.

    The window is converted to float64 before any arithmetic so that
    integer-typed input (for example raw ADC counts stored as int16) cannot
    wrap around when it is squared or when max and min are subtracted.

    Args:
        signal: Non-empty array of samples belonging to a single window.

    Returns:
        A dict of plain Python floats with the keys, in this order:
        "mean", "std" (population standard deviation, NumPy's default
        ddof=0), "rms", "peak_to_peak" (max minus min) and "energy"
        (sum of squared samples).

    Raises:
        ValueError: If ``signal`` is empty (raised by ``np.max``).
    """
    samples = np.asarray(signal, dtype=np.float64)
    # Squares are needed for both RMS and energy; compute them once.
    squared = samples ** 2
    return {
        "mean": float(np.mean(samples)),
        "std": float(np.std(samples)),
        "rms": float(np.sqrt(np.mean(squared))),
        "peak_to_peak": float(np.max(samples) - np.min(samples)),
        "energy": float(np.sum(squared)),
    }

In [12]:
# ===============================
# LOAD DRIVE END SIGNAL
# ===============================

def load_drive_end_signal(mat_file_path: str) -> np.ndarray:
    """Load the Drive-End accelerometer signal from a MATLAB file.

    The file is read with ``scipy.io.loadmat`` and the first variable whose
    name ends in "_DE_time" is returned as a flattened 1-D array.

    Args:
        mat_file_path: Path to the .mat file.

    Returns:
        The selected variable flattened to one dimension (a copy).

    Raises:
        ValueError: If the file contains no variable ending in "_DE_time".
    """
    data = loadmat(mat_file_path)
    # Take the first matching variable; fail with the file name if there is
    # none, rather than an anonymous "list index out of range".
    de_key = next((key for key in data if key.endswith("_DE_time")), None)
    if de_key is None:
        raise ValueError(
            f"No variable ending in '_DE_time' found in {mat_file_path!r}"
        )
    return data[de_key].flatten()

In [14]:
# ===============================
# WINDOWING
# ===============================

def split_into_windows(signal: np.ndarray, window_size: int) -> List[np.ndarray]:
    """Split a signal into fixed-length, non-overlapping windows.

    Only full windows are kept so every window yields the same feature
    dimensions; trailing samples that do not fill a window are dropped.
    A signal whose length is an exact multiple of ``window_size`` keeps its
    final window.

    Args:
        signal: Sequence of samples supporting ``len()`` and slicing.
        window_size: Number of samples per window; must be positive.

    Returns:
        A list of ``len(signal) // window_size`` consecutive slices of
        ``signal``, each of length ``window_size`` (empty if the signal is
        shorter than one window).

    Raises:
        ValueError: If ``window_size`` is not positive.
    """
    if window_size <= 0:
        raise ValueError("window_size must be a positive integer")

    # Integer division counts exactly the windows that fit completely.
    n_windows = len(signal) // window_size
    return [
        signal[i * window_size:(i + 1) * window_size]
        for i in range(n_windows)
    ]

In [16]:
# ===============================
# SEVERITY LABELING
# ===============================

def get_severity_label(filename: str) -> Optional[int]:
    """Map a filename to its severity label.

    The filename is lowercased and compared against the keys of
    ``SEVERITY_MAP``; the label of the first key (in the map's insertion
    order) that occurs as a substring is returned.

    Args:
        filename: File name to classify.

    Returns:
        The integer severity label, or ``None`` when no key matches. A
        warning is logged in that case so the caller can skip the file.
    """
    name = filename.lower()
    for key, label in SEVERITY_MAP.items():
        if key in name:
            return label
    # Lazy %-style arguments: the message is only formatted if it is emitted.
    logger.warning("Skipping file with unknown severity: %s", filename)
    return None

In [18]:
# ===============================
# DIRECTORY PROCESSING
# ===============================

def process_directory(
    directory: str,
    label_function: Callable[[str], Optional[int]],
    window_size: Optional[int] = None,
) -> List[Dict]:
    """Process all .mat files in a directory and return feature rows.

    Each accepted file is loaded, cut into fixed-length windows, and turned
    into one feature row per window.

    Args:
        directory: Directory to scan (not recursive).
        label_function: Called with each .mat filename; returns the integer
            label for the file, or ``None`` to skip it.
        window_size: Samples per window. Defaults to the module-level
            ``WINDOW_SIZE`` when omitted.

    Returns:
        A list of dicts, one per window, holding the extracted features plus
        "label" (the file's label) and "group" (the filename without its
        extension, shared by all windows of that file).
    """
    if window_size is None:
        window_size = WINDOW_SIZE

    rows = []

    # os.listdir returns entries in arbitrary order; sorting keeps the row
    # order, and hence any seeded model fit on it, reproducible.
    for filename in sorted(os.listdir(directory)):
        # Case-insensitive so that e.g. ".MAT" files are not silently skipped.
        if not filename.lower().endswith(".mat"):
            continue

        label = label_function(filename)
        if label is None:
            continue

        signal = load_drive_end_signal(os.path.join(directory, filename))
        # Same group for every window of this file; computed once per file.
        group = os.path.splitext(filename)[0]

        for window in split_into_windows(signal, window_size):
            features = extract_features(window)
            features["label"] = label
            features["group"] = group
            rows.append(features)

    return rows


def build_dataset(normal_dir: str, faulty_dir: str) -> pd.DataFrame:
    """Assemble the full feature table from both recording directories.

    Files in ``normal_dir`` are all given label 0; files in ``faulty_dir``
    are labelled by ``get_severity_label``. Normal rows come first in the
    resulting frame, followed by the faulty rows.
    """
    normal_rows = process_directory(normal_dir, lambda _: 0)
    faulty_rows = process_directory(faulty_dir, get_severity_label)
    return pd.DataFrame(normal_rows + faulty_rows)


In [24]:
# ===============================
# MAIN EXECUTION
# ===============================

if __name__ == "__main__":
    # Script entry point: build the windowed feature table, report
    # group-based cross-validation scores, then train on a group-aware
    # training split and evaluate on the held-out groups.

    logger.info("Building dataset...")
    df = build_dataset(NORMAL_DIR, FAULTY_DIR)

    # An empty frame has no "label"/"group" columns; fail here with an
    # actionable message instead of a KeyError on the next line.
    if df.empty:
        raise RuntimeError(
            f"No feature rows were produced from {NORMAL_DIR!r} and "
            f"{FAULTY_DIR!r}; check that they contain usable .mat files."
        )

    logger.info("Class distribution:\n%s", df["label"].value_counts())

    # Features only; "group" identifies the source file of each window and
    # is used solely to keep a file's windows on one side of every split.
    X = df.drop(columns=["label", "group"])
    y = df["label"]
    groups = df["group"]

    # ===============================
    # GROUP-AWARE SPLIT
    # ===============================

    gss = GroupShuffleSplit(
        n_splits=1,
        test_size=0.25,
        random_state=RANDOM_STATE,
    )

    # n_splits=1, so the generator yields exactly one (train, test) pair.
    train_idx, test_idx = next(gss.split(X, y, groups))

    X_train, X_test = X.iloc[train_idx], X.iloc[test_idx]
    y_train, y_test = y.iloc[train_idx], y.iloc[test_idx]

    # ===============================
    # ESTIMATOR (UNFITTED)
    # ===============================

    estimator = RandomForestClassifier(
        n_estimators=N_ESTIMATORS,
        max_depth=MAX_DEPTH,
        class_weight="balanced",
        random_state=RANDOM_STATE,
    )

    # ===============================
    # GROUP-BASED CROSS-VALIDATION
    # ===============================

    # NOTE(review): cross-validation runs on the full X/y, i.e. it also
    # includes the groups held out above. The final model below is still
    # trained on the training split only, but the CV figures are not
    # independent of the test groups -- confirm this is the intended report.
    logger.info("Performing group-based cross-validation...")
    gkf = GroupKFold(n_splits=3)

    cv_scores = cross_val_score(
        estimator,
        X,
        y,
        cv=gkf,
        groups=groups,
        scoring="f1_macro",
    )

    logger.info("CV F1 scores: %s", cv_scores)
    logger.info("Mean CV F1: %.4f", np.mean(cv_scores))

    # ===============================
    # FINAL MODEL TRAINING
    # ===============================

    logger.info("Training final model on training split...")
    model = estimator.fit(X_train, y_train)
    y_pred = model.predict(X_test)

    print("\n=== Test Set Evaluation ===")
    print(classification_report(y_test, y_pred))
    print("Confusion Matrix:")
    print(confusion_matrix(y_test, y_pred))

    # ===============================
    # FEATURE IMPORTANCE
    # ===============================

    # Pair each importance with its feature column name, largest first.
    importances = pd.Series(
        model.feature_importances_,
        index=X.columns
    ).sort_values(ascending=False)

    print("\nFeature Importance:")
    print(importances)

INFO:__main__:Building dataset...
INFO:__main__:Class distribution:
label
0    828
2    472
1    237
3    235
Name: count, dtype: int64
INFO:__main__:Performing group-based cross-validation...
INFO:__main__:CV F1 scores: [1.         1.         0.99578052]
INFO:__main__:Mean CV F1: 0.9986
INFO:__main__:Training final model on training split...



=== Test Set Evaluation ===
              precision    recall  f1-score   support

           0       1.00      1.00      1.00       236
           1       1.00      1.00      1.00       118
           2       1.00      1.00      1.00        59
           3       1.00      1.00      1.00        59

    accuracy                           1.00       472
   macro avg       1.00      1.00      1.00       472
weighted avg       1.00      1.00      1.00       472

Confusion Matrix:
[[236   0   0   0]
 [  0 118   0   0]
 [  0   0  59   0]
 [  0   0   0  59]]

Feature Importance:
energy          0.265434
rms             0.237707
std             0.233124
peak_to_peak    0.184449
mean            0.079286
dtype: float64
