# Bearing Fault – Training & Config Builder

This notebook builds a **production-ready bearing fault config** for a new machine and logs it to **MLflow**.  
It computes features across your dataset, derives **per-feature normalization**, and emits a single YAML used by real-time serving.

---

##  Approach (BDF = Bearing Degradation Factor)
- Use **dynamic harmonic bins**: discover available `ch1_k/ch2_k/ch3_k` columns and compute `rms_k` only where all 3 exist.
- Compute **stateless** features from each spectrum row:
  - `spectral_kurtosis_rms_0` (shape)
  - `spectral_crest_factor_rms_1` (peakedness vs RMS)
  - `total_harmonic_distortion_rms_2` (from a centered 3-bin triplet: `[lower, fundamental, upper]`)
  - `sideband_energy_ratio_rms_3` (sidebands / fundamental)
  - `sideband_modulation_index_rms_4` ((lower+upper)/fundamental)
- Compute **BPFO/BPFI band energies** from bearing geometry; use mean of nearest neighbor bins.
- **Normalize** each feature with **robust min–max** (percentile-clipped) **derived from the dataset**, not fit on live data.
- Fuse normalized features with **weights** → **BDF** in ~[0,1]; flag fault if `BDF ≥ bdf_threshold`.

---

##  What this notebook generates
1. **`bearing_config.yaml`** (saved at the **root** of the MLflow run artifacts), containing:
   - Bearing geometry: `shaft_rpm, ball_diameter, pitch_diameter, num_rolling_elements, contact_angle_deg`
   - Inference knobs: `freq_resolution`, `bdf_threshold`
   - `weights` for each feature
   - `normalization.minmax` for every feature (`min`, `max` from percentile-clipped stats)
   - `feature_list` and small `notes`
2. **MLflow run** under experiment: `bearing_fault_monitoring/<tenant_id>/<machine_id>`
   - Params: IDs, dataset path, approach, freq_resolution, threshold
   - Metrics: BDF distribution summary, BPFO/BPFI (Hz), guessed fundamental bin
   - Artifact: the YAML config above

---

##  What you need to set (Cell 1)
- `tenant_id`, `machine_id`
- `APPROACH` (e.g., `bdf_stateless_v1`)
- `DATASET_PATH` (CSV with `ch1_k/ch2_k/ch3_k` columns)
- `bearing_config` (geometry incl. **shaft RPM**)
- `freq_resolution` (Hz per bin; must match how harmonics were computed)
- `bdf_threshold` and `weights`
- Percentile bounds for robust normalization (e.g., 1–99%)

---

## How to onboard a new equipment
1. **Update Cell 1** with machine IDs, dataset path, geometry, and knobs.
2. **Run all cells**:
   - The notebook auto-discovers available frequency bins and computes features.
   - It derives robust `min`/`max` per feature from the dataset.
   - It writes `bearing_config.yaml` and logs it to MLflow (artifact root).
3. **Review BDF preview stats** (Cell “BDF distribution”) and adjust `bdf_threshold` if needed.
4. **Deploy** by pointing serving to the run’s `bearing_config.yaml`.

---

## Serving expectations (to keep training & inference aligned)
- Use the **same dynamic-bin logic** and **same `freq_resolution`**.
- Compute the **fundamental bin** from `shaft_rpm / 60 / freq_resolution` and take the **nearest available bin**.
- Build the 3-bin **triplet** `[lower, fundamental, upper]` from available bins (repeat edges as needed).
- Normalize each feature using the **saved `normalization.minmax`** (do **not** fit a scaler in real-time).
- Apply `weights` → BDF; compare to `bdf_threshold`.

---

##  Sanity checks
- Columns must follow `ch1_k/ch2_k/ch3_k` naming; missing bins are OK (handled dynamically).
- `freq_resolution` must match your spectral construction; if off, BPFO/BPFI alignment will drift.
- If **RPM varies** at runtime, pass the current RPM into serving; this notebook assumes the training dataset reflects nominal RPM for normalization.

> Result: a **versioned, reproducible config** per machine that my API can load directly for consistent, explainable bearing fault detection.


## Metrics — What they mean & how to use them

- **rows_processed**  
  *What it is:* Number of dataset rows used to derive normalization stats.  
  *Use it for:* A sanity check on data sufficiency. Set a gate (e.g., **≥ 1,000 rows**) before considering a run for promotion. Not a ranking metric.

- **bdf_p50, bdf_p75, bdf_p90**  
  *What they are:* Median and upper-tail quantiles of the Bearing Degradation Factor (BDF) distribution over the dataset.  
  *Use it for:* Understanding spread and where the “healthy” tail sits. For promotion, prefer a **single tail metric** (e.g., `bdf_p95`, lower is better) to minimize false alarms.

- **bdf_mean, bdf_std**  
  *What they are:* Mean and standard deviation of the BDF.  
  *Use it for:* Quick central tendency/spread sanity check. Not ideal for skewed distributions; **not recommended** as promotion criteria.

- **bpfo_hz, bpfi_hz**  
  *What they are:* Ball Pass Frequency Outer/Inner race (Hz), computed from bearing geometry and shaft speed. Deterministic given the config.  
  *Use it for:* Audit that geometry/config are loaded correctly. **Not useful** for comparing runs within the same machine (unchanged across runs).

- **fundamental_bin_guess**  
  *What it is:* The estimated spectral bin for the shaft rotation frequency, computed as  
  \[
  \text{round}\left(\frac{f_r}{\text{freq\_resolution}}\right),\quad f_r = \frac{\text{RPM}}{60}
  \]  
  *Use it for:* Audit of frequency alignment (catches a wrong `freq_resolution` or RPM). **Not for promotion**.

> **Recommendation for promotion (unsupervised):** Track a **single tail metric** like `bdf_p95` (lower is better) and optionally an **alarm_rate** at your serving threshold (lower is better), with `rows_processed` as a minimum data gate.


In [15]:
# --- Cell 1: Setup ---
import os, yaml, math, warnings
import numpy as np
import pandas as pd
from tqdm.auto import tqdm
from typing import Dict, List

import mlflow
from mlflow_base import MLflowBase

warnings.filterwarnings("ignore")

# ====== Fill these for the new machine ======
tenant_id   = "28"
machine_id  = "257"
APPROACH    = "bdf_stateless_v1"        # run name
DATASET_PATH = "iotts.harmonics_257.csv"  # path to your harmonics dataset

# Bearing geometry (example)
bearing_config = {
    "shaft_rpm": 1480,
    "ball_diameter": 8.0,
    "pitch_diameter": 40.0,
    "num_rolling_elements": 8,
    "contact_angle_deg": 0.0
}

# Inference knobs to embed in config
freq_resolution = 50    # Hz per rms_k bin (e.g., rms_0 ~ 0 Hz, rms_1 ~ 50 Hz)
bdf_threshold   = 0.70  # default decision threshold

# Feature weights (must sum to ~1.0)
weights = {
    "spectral_kurtosis_rms_0":         0.20,
    "spectral_crest_factor_rms_1":     0.10,
    "total_harmonic_distortion_rms_2": 0.30,
    "sideband_energy_ratio_rms_3":     0.10,
    "sideband_modulation_index_rms_4": 0.10,
    "bpfo_band_energy":                0.10,   
    "bpfi_band_energy":                0.10,   
}

# Robust normalization percentile bounds (avoid outliers)
LOW_PCT, HIGH_PCT = 1.0, 99.0

# Spectrum size (rms_0 ... rms_29)
N_RMS_BINS = 30

# Compose MLflow experiment name
EXPERIMENT_NAME = f"bearing_fault_monitoring/{tenant_id}/{machine_id}"


In [16]:
# --- Cell 2: Start MLflow run ---
# Close any existing run (notebook safety)
if mlflow.active_run() is not None:
    mlflow.end_run()

mlbase = MLflowBase(EXPERIMENT_NAME)
run = mlbase.start_run(run_name=APPROACH)
run_id = mlflow.active_run().info.run_id
print("MLflow run:", run_id)

# Log core identifiers and knobs
mlbase.log_params({
    "tenant_id": tenant_id,
    "machine_id": machine_id,
    "approach": APPREOACH if (APPREOACH:=APPROACH) else APPROACH,  # keep param key stable
    "dataset_path": os.path.basename(DATASET_PATH),
    "freq_resolution": freq_resolution,
    "bdf_threshold": bdf_threshold,
})


🏃 View run bdf_stateless_v1 at: https://mlops.zolnoi.app/#/experiments/18/runs/7d9fcf02a1e047058f0300d40af9505d
🧪 View experiment at: https://mlops.zolnoi.app/#/experiments/18
MLflow run: cfa4f75906b24760ad6b6b313da62ffa


In [17]:
# --- Cell 3: Dynamic feature helpers for variable frequency bins ---
import re, math
import numpy as np
import pandas as pd
from scipy import stats
from typing import Dict, List, Tuple

CH_PAT = re.compile(r"^(ch[123])_(\d+)$")

def discover_available_bins(df: pd.DataFrame) -> List[int]:
    """Return sorted list of bin indices k that exist for ALL channels ch1_k, ch2_k, ch3_k."""
    ch_bins = {f"ch{i}": set() for i in [1,2,3]}
    for col in df.columns:
        m = CH_PAT.match(col)
        if not m:
            continue
        ch, k = m.group(1), int(m.group(2))
        ch_bins[ch].add(k)
    common = sorted(list(ch_bins["ch1"] & ch_bins["ch2"] & ch_bins["ch3"]))
    if not common:
        raise ValueError("No common frequency bins found across ch1/ch2/ch3.")
    return common

def calculate_rms_dynamic(df: pd.DataFrame, bins: List[int]) -> Tuple[pd.DataFrame, List[str]]:
    """Compute RMS across 3 channels for the provided bin indices."""
    rms_cols = []
    for k in bins:
        chs = [f"ch1_{k}", f"ch2_{k}", f"ch3_{k}"]
        missing = [c for c in chs if c not in df.columns]
        if missing:
            # Skip bins that somehow don't have all 3 present
            continue
        out_col = f"rms_{k}"
        df[out_col] = np.sqrt(df[chs].pow(2).sum(axis=1) / 3.0)
        rms_cols.append(out_col)
    if not rms_cols:
        raise ValueError("No RMS columns created — check your input data.")
    return df, rms_cols

def nearest_bin(target_k: int, bins: List[int]) -> int:
    """Pick the available bin index closest to target_k."""
    # bins is sorted
    idx = np.searchsorted(bins, target_k)
    candidates = []
    if idx < len(bins):
        candidates.append(bins[idx])
    if idx > 0:
        candidates.append(bins[idx-1])
    # choose nearest by absolute distance
    return min(candidates, key=lambda x: abs(x - target_k))

def neighbor_triplet(center_k: int, bins: List[int]) -> List[int]:
    """Return [lower, center, upper] bin indices from available bins (repeat edges if needed)."""
    # ensure center is in bins (snap to nearest)
    c = nearest_bin(center_k, bins) if center_k not in bins else center_k
    pos = bins.index(c)
    if pos == 0:
        return [bins[0], bins[0], bins[min(1, len(bins)-1)]]
    if pos == len(bins) - 1:
        return [bins[max(len(bins)-2, 0)], bins[-1], bins[-1]]
    return [bins[pos-1], bins[pos], bins[pos+1]]

def band_energy_around_dynamic(df_rms: pd.DataFrame, center_freq_hz: float,
                               freq_res: float, bins: List[int]) -> pd.Series:
    """Mean of available neighbor triplet around nearest bin to center_freq_hz."""
    target_k = int(round(center_freq_hz / freq_res))
    trip = neighbor_triplet(target_k, bins)
    cols = [f"rms_{k}" for k in trip if f"rms_{k}" in df_rms.columns]
    return df_rms[cols].mean(axis=1)

# -------- Stateless spectral & sideband features (unchanged logic) --------
def spectral_kurtosis(full_spectrum: np.ndarray) -> float:
    return float(stats.kurtosis(full_spectrum, fisher=True, bias=False, nan_policy="omit"))

def spectral_crest_factor(full_spectrum: np.ndarray) -> float:
    full_spectrum = np.asarray(full_spectrum, dtype=float)
    peak = np.nanmax(np.abs(full_spectrum))
    rms  = np.sqrt(np.nanmean(full_spectrum**2))
    return float(peak / rms) if rms > 0 else np.nan

def thd_centered_triplet(triplet_vals: np.ndarray) -> float:
    """triplet = [lower, fundamental, upper] values"""
    if triplet_vals is None or len(triplet_vals) != 3:
        return np.nan
    fundamental = float(triplet_vals[1])
    side = math.sqrt(float(triplet_vals[0])**2 + float(triplet_vals[2])**2)
    return float(side / fundamental) if fundamental != 0 else np.nan

def sideband_energy_ratio(triplet_vals: np.ndarray) -> float:
    if triplet_vals is None or len(triplet_vals) != 3:
        return np.nan
    c = float(triplet_vals[1])
    s = math.sqrt(float(triplet_vals[0])**2 + float(triplet_vals[2])**2)
    return float(s / c) if c != 0 else np.nan

def sideband_modulation_index(triplet_vals: np.ndarray) -> float:
    if triplet_vals is None or len(triplet_vals) != 3:
        return np.nan
    c = float(triplet_vals[1])
    v = float(triplet_vals[0]) + float(triplet_vals[2])
    return float(v / c) if c != 0 else np.nan

def bpfo_bpfi_from_geometry(cfg: Dict) -> Tuple[float, float]:
    d = cfg["ball_diameter"]
    D = cfg["pitch_diameter"]
    n = cfg["num_rolling_elements"]
    contact_angle = np.deg2rad(cfg["contact_angle_deg"])
    fr = cfg["shaft_rpm"] / 60.0
    bpfo = fr * n / 2.0 * (1.0 - (d / D) * np.cos(contact_angle))
    bpfi = fr * n / 2.0 * (1.0 + (d / D) * np.cos(contact_angle))
    return float(bpfo), float(bpfi)


In [18]:
# --- Cell 4 : Load data and compute features across dataset (dynamic bins) ---
# 1) Load dataset
df = pd.read_csv(DATASET_PATH)
print("Dataset shape:", df.shape)

# 2) Discover available bins and compute RMS
available_bins = discover_available_bins(df)           # e.g., [0,1,2,3,5,6,...]
print(f"Available bins (count={len(available_bins)}):", available_bins[:10], "...")

df, rms_cols = calculate_rms_dynamic(df, available_bins)
print(f"RMS columns created: {len(rms_cols)}")

# 3) Compute BPFO/BPFI (Hz)
bpfo_hz, bpfi_hz = bpfo_bpfi_from_geometry(bearing_config)
print(f"BPFO ~ {bpfo_hz:.2f} Hz, BPFI ~ {bpfi_hz:.2f} Hz")

# 4) Fundamental bin guess from shaft RPM, snapped to nearest available bin
fr_hz = bearing_config["shaft_rpm"] / 60.0
target_bin = int(round(fr_hz / freq_resolution))
fundamental_bin = nearest_bin(target_bin, available_bins)
print(f"Estimated fundamental_bin: target={target_bin}, using={fundamental_bin}")

# 5) Compute band energies around BPFO & BPFI using dynamic bins
df["bpfo_band_energy"] = band_energy_around_dynamic(df[rms_cols], bpfo_hz, freq_resolution, available_bins)
df["bpfi_band_energy"] = band_energy_around_dynamic(df[rms_cols], bpfi_hz, freq_resolution, available_bins)

# 6) Build per-row stateless features
feat_cols = [
    "spectral_kurtosis_rms_0",
    "spectral_crest_factor_rms_1",
    "total_harmonic_distortion_rms_2",
    "sideband_energy_ratio_rms_3",
    "sideband_modulation_index_rms_4",
]

for col in feat_cols:
    df[col] = np.nan

# We will read the spectrum in the **available bins order** for stability:
ordered_rms_cols = [f"rms_{k}" for k in available_bins]
rms_arr = df[ordered_rms_cols].values  # shape (N, len(available_bins))

# Build the triplet indices once
trip_bins = neighbor_triplet(fundamental_bin, available_bins)
trip_cols = [f"rms_{k}" for k in trip_bins]

for i in tqdm(range(len(df)), total=len(df), desc="Computing stateless features"):
    # Get full spectrum in available-bin order
    row_spec = rms_arr[i, :]  # aligns with ordered_rms_cols
    # Triplet values for sideband/THD features
    trip_vals = df.loc[df.index[i], trip_cols].values.astype(float)

    df.at[i, "spectral_kurtosis_rms_0"]         = spectral_kurtosis(row_spec)
    df.at[i, "spectral_crest_factor_rms_1"]     = spectral_crest_factor(row_spec)
    df.at[i, "total_harmonic_distortion_rms_2"] = thd_centered_triplet(trip_vals)
    df.at[i, "sideband_energy_ratio_rms_3"]     = sideband_energy_ratio(trip_vals)
    df.at[i, "sideband_modulation_index_rms_4"] = sideband_modulation_index(trip_vals)

print("Features ready.")


Dataset shape: (14491, 94)
Available bins (count=15): [0, 1, 2, 3, 4, 5, 6, 7, 8, 9] ...
RMS columns created: 15
BPFO ~ 78.93 Hz, BPFI ~ 118.40 Hz
Estimated fundamental_bin: target=0, using=0


Computing stateless features:   0%|          | 0/14491 [00:00<?, ?it/s]

Features ready.


In [19]:
# --- Cell 5: Build robust min-max normalization (pct-clip) ---
def robust_minmax(series: pd.Series, low=LOW_PCT, high=HIGH_PCT) -> Dict[str, float]:
    """Return robust min/max using percentiles; fallback to true min/max if needed."""
    s = pd.to_numeric(series, errors="coerce").dropna()
    if len(s) == 0:
        return {"min": 0.0, "max": 1.0}
    lo = float(np.percentile(s, low))
    hi = float(np.percentile(s, high))
    if hi <= lo:  # degenerate
        lo = float(np.nanmin(s))
        hi = float(np.nanmax(s))
        if hi <= lo:
            # fully degenerate: single-valued series
            return {"min": float(lo), "max": float(lo + 1e-9)}
    return {"min": lo, "max": hi}

all_feature_keys = list(weights.keys())
norm_minmax = {}
for key in all_feature_keys:
    if key not in df.columns:
        print(f"[WARN] Feature missing in dataframe: {key}")
        norm_minmax[key] = {"min": 0.0, "max": 1.0}
        continue
    norm_minmax[key] = robust_minmax(df[key], low=LOW_PCT, high=HIGH_PCT)

# Quick sanity print
pd.DataFrame(norm_minmax).T


Unnamed: 0,min,max
spectral_kurtosis_rms_0,14.722719,14.990559
spectral_crest_factor_rms_1,3.855954,3.872324
total_harmonic_distortion_rms_2,1.0,1.000009
sideband_energy_ratio_rms_3,1.0,1.000009
sideband_modulation_index_rms_4,1.0,1.004351
bpfo_band_energy,0.0,0.534112
bpfi_band_energy,0.0,0.534112


In [20]:
# --- Cell 6 (optional): Check BDF distribution with normalized features ---
def normalize_value_minmax(x: float, stats: Dict[str, float]) -> float:
    vmin, vmax = stats.get("min"), stats.get("max")
    if vmax is None or vmin is None or vmax == vmin:
        return 0.0
    return float((x - vmin) / (vmax - vmin))

def compute_bdf_row(row: pd.Series, weights: Dict[str, float], norm_cfg: Dict[str, Dict[str, float]]) -> float:
    total = 0.0
    wsum  = 0.0
    for feat, w in weights.items():
        if feat in row.index:
            x = float(row[feat])
            xnorm = normalize_value_minmax(x, norm_cfg.get(feat, {"min":0.0, "max":1.0}))
            total += xnorm * w
            wsum  += w
    return float(total / wsum) if wsum > 0 else 0.0

bdf_preview = df[all_feature_keys].apply(lambda r: compute_bdf_row(r, weights, norm_minmax), axis=1)
print("BDF preview stats:")
print(bdf_preview.describe(percentiles=[0.5, 0.75, 0.9, 0.95]))

# You can adjust bdf_threshold after inspecting this distribution if needed.


BDF preview stats:
count    5129.000000
mean        0.497670
std         0.484589
min         0.157118
50%         0.488031
75%         0.578475
90%         0.629341
95%         0.662067
max        31.950813
dtype: float64


In [21]:
# --- Cell 7: Build YAML config and log as artifact (root) ---
final_config = {
    # Machine-specific geometry & runtime knobs
    "shaft_rpm":            float(bearing_config["shaft_rpm"]),
    "ball_diameter":        float(bearing_config["ball_diameter"]),
    "pitch_diameter":       float(bearing_config["pitch_diameter"]),
    "num_rolling_elements": int(bearing_config["num_rolling_elements"]),
    "contact_angle_deg":    float(bearing_config["contact_angle_deg"]),

    # Inference knobs
    "freq_resolution": float(freq_resolution),
    "bdf_threshold":   float(bdf_threshold),

    # Model fusion weights (match serving exactly)
    "weights": {k: float(v) for k, v in weights.items()},

    # Normalization stats for each feature (min-max)
    "normalization": {
        "minmax": {k: {"min": float(v["min"]), "max": float(v["max"])} for k, v in norm_minmax.items()}
    }
}

# Save locally at artifacts root
os.makedirs("artifacts", exist_ok=True)
yaml_path = os.path.join("artifacts", "bearing_config.yaml")
with open(yaml_path, "w", encoding="utf-8") as f:
    yaml.safe_dump(final_config, f, sort_keys=False, default_flow_style=False)
print(f"Saved YAML -> {yaml_path}")

# Log to MLflow (artifact root)
mlbase.log_artifact(run_id=run_id, local_path=yaml_path)
print("Logged YAML to MLflow (artifact root).")


Saved YAML -> artifacts\bearing_config.yaml
Logged YAML to MLflow (artifact root).


In [22]:
# --- Cell 8: Log summary metrics & end run ---
metrics = {
    "rows_processed":        float(len(df)),
    "bdf_p50":               float(np.nanpercentile(bdf_preview, 50)),
    "bdf_p75":               float(np.nanpercentile(bdf_preview, 75)),
    "bdf_p90":               float(np.nanpercentile(bdf_preview, 90)),
    "bdf_mean":              float(np.nanmean(bdf_preview)),
    "bdf_std":               float(np.nanstd(bdf_preview)),
    "bpfo_hz":               float(bpfo_hz),
    "bpfi_hz":               float(bpfi_hz),
    "fundamental_bin_guess": float(fundamental_bin),
}
mlbase.log_metrics(metrics)

print("Done. You can now use this run’s bearing_config.yaml in serving.")


Done. You can now use this run’s bearing_config.yaml in serving.
