<a href="https://colab.research.google.com/github/Menon-Vineet/Books/blob/main/IMS_Bearing_Anomaly_Colab_Seamless.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# NASA IMS Bearing Anomaly Detection — Colab-Seamless Notebook

**Author:** Vineet Menon (el jefe)  
**Date:** 2026-01-20

This notebook is designed to run **seamlessly on Google Colab (Python 3.12)**.

What I do here:
- Download and parse the **NASA IMS Bearings** dataset (run-to-failure vibration snapshots)
- Convert each 1-second vibration file into **compact statistical features**
- Train and compare **classical unsupervised anomaly models** (scikit-learn)
- Train a **BiLSTM Autoencoder** (PyTorch) to produce reconstruction-error anomaly scores

Why no PyCaret?
- PyCaret’s Python 3.12 support is still inconsistent across environments (especially Colab).
- I implement the same core models directly in scikit-learn so the notebook remains reliable and shareable.


## 0. Install dependencies (run once)

**Colab tip:** After running the install cell, use **Runtime → Disconnect and delete runtime**, reconnect, then run the notebook from the top.
This avoids “half-loaded” binary packages.


In [None]:
%%capture
!pip install -U pip setuptools wheel
!pip install --no-cache-dir numpy==1.26.4 pandas==2.1.4
!pip install --no-cache-dir scipy==1.11.4 scikit-learn==1.4.2
!pip install --no-cache-dir matplotlib tqdm joblib
!pip install --no-cache-dir torch --index-url https://download.pytorch.org/whl/cpu


## 1. Imports and configuration

In [None]:
from __future__ import annotations

import zipfile
import shutil
import urllib.request
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional

import numpy as np
import pandas as pd
from tqdm.auto import tqdm
import matplotlib.pyplot as plt

from sklearn.preprocessing import MinMaxScaler
from sklearn.ensemble import IsolationForest
from sklearn.neighbors import LocalOutlierFactor
from sklearn.svm import OneClassSVM
from sklearn.covariance import EllipticEnvelope

import torch
from torch import nn

SEED: int = 1234
np.random.seed(SEED)
torch.manual_seed(SEED)

PROJECT_DIR = Path.cwd() / "ims_anomaly_project"
DATA_DIR = PROJECT_DIR / "data"
PLOTS_DIR = PROJECT_DIR / "plots"
ARTIFACTS_DIR = PROJECT_DIR / "artifacts"

for d in (DATA_DIR, PLOTS_DIR, ARTIFACTS_DIR):
    d.mkdir(parents=True, exist_ok=True)

print("Project directory:", PROJECT_DIR)


## 2. Dataset utilities (download, parse, featurize)

In [None]:
NASA_BEARINGS_ZIP_URL: str = "https://phm-datasets.s3.amazonaws.com/NASA/4.+Bearings.zip"

def download_file(url: str, dst: Path, overwrite: bool = False) -> Path:
    \"\"\"Download a file to disk.\"\"\"
    dst.parent.mkdir(parents=True, exist_ok=True)
    if dst.exists() and not overwrite:
        return dst
    print(f"I am downloading: {url}")
    urllib.request.urlretrieve(url, dst)
    return dst

def unzip(zip_path: Path, extract_to: Path, overwrite: bool = False) -> Path:
    \"\"\"Extract a ZIP archive.\"\"\"
    if extract_to.exists() and overwrite:
        shutil.rmtree(extract_to)
    extract_to.mkdir(parents=True, exist_ok=True)
    with zipfile.ZipFile(zip_path, "r") as zf:
        zf.extractall(extract_to)
    return extract_to

def parse_ims_filename_to_datetime(filename: str) -> datetime:
    \"\"\"Parse IMS timestamps from filenames like '2003.10.22.12.06.24'.\"\"\"
    stem = Path(filename).stem
    return datetime.strptime(stem, "%Y.%m.%d.%H.%M.%S")

def find_test_folder(root: Path, test_name: str) -> Path:
    \"\"\"Find test folder by name (e.g., '1st_test', '2nd_test', '3rd_test').\"\"\"
    matches = list(root.rglob(test_name))
    if not matches:
        raise FileNotFoundError(f"I could not find {test_name} under {root}")
    return matches[0]

def load_ims_raw_signals(
    test_folder: Path,
    channel_col: int = 0,
    max_files: Optional[int] = 800,
) -> pd.DataFrame:
    \"\"\"Load raw vibration snapshots into a DataFrame.\"\"\"
    files = sorted([p for p in test_folder.iterdir() if p.is_file()])
    if max_files is not None:
        files = files[:max_files]

    idx: List[pd.Timestamp] = []
    sigs: List[np.ndarray] = []

    for fp in tqdm(files, desc=f"Loading raw signals (col={channel_col})"):
        ts = parse_ims_filename_to_datetime(fp.name)
        arr = pd.read_csv(fp, header=None, delim_whitespace=True).iloc[:, channel_col].to_numpy(dtype=float)
        idx.append(pd.Timestamp(ts))
        sigs.append(arr)

    df = pd.DataFrame({"signal": sigs}, index=pd.to_datetime(idx)).sort_index()
    df.index.name = "timestamp"
    return df

def rms(x: np.ndarray) -> float:
    return float(np.sqrt(np.mean(np.square(x), dtype=float)))

def peak_to_peak(x: np.ndarray) -> float:
    return float(np.max(x) - np.min(x))

def featurize_snapshot(x: np.ndarray) -> Dict[str, float]:
    \"\"\"Convert one raw snapshot into compact statistical features.\"\"\"
    return {
        "rms": rms(x),
        "mean": float(np.mean(x)),
        "std": float(np.std(x)),
        "p2p": peak_to_peak(x),
    }

def featurize_timeseries(raw_df: pd.DataFrame) -> pd.DataFrame:
    feats = [featurize_snapshot(sig) for sig in tqdm(raw_df["signal"].to_list(), desc="Featurizing")]
    return pd.DataFrame(feats, index=raw_df.index)

def plot_series(df: pd.DataFrame, columns: List[str], title: str, save_name: Optional[str] = None) -> None:
    plt.figure(figsize=(12, 4))
    for c in columns:
        plt.plot(df.index, df[c], label=c)
    plt.title(title)
    plt.xlabel("timestamp")
    plt.legend()
    plt.tight_layout()
    if save_name:
        out = PLOTS_DIR / f"{save_name}.png"
        plt.savefig(out, dpi=150)
        print("Saved:", out)
    plt.show()


## 3. Download + load data

In [None]:
zip_path = DATA_DIR / "NASA_4_Bearings.zip"
extract_root = DATA_DIR / "NASA_4_Bearings"

download_file(NASA_BEARINGS_ZIP_URL, zip_path, overwrite=False)
unzip(zip_path, extract_root, overwrite=False)

TEST_NAME = "2nd_test"
CHANNEL_COL = 0
MAX_FILES = 800

test_folder = find_test_folder(extract_root, TEST_NAME)
print("Using test folder:", test_folder)

raw_df = load_ims_raw_signals(test_folder, channel_col=CHANNEL_COL, max_files=MAX_FILES)
feat_df = featurize_timeseries(raw_df)

print("Features shape:", feat_df.shape)
feat_df.head()


## 4. Quick sanity plot

In [None]:
plot_series(feat_df, ["rms"], title=f"{TEST_NAME} - RMS over time", save_name="rms_over_time")

## 5. Time-aware train/test split

In [None]:
TRAIN_FRACTION = 0.6

n = len(feat_df)
cut = int(n * TRAIN_FRACTION)

train_df = feat_df.iloc[:cut].copy()
test_df = feat_df.iloc[cut:].copy()

print("Train rows:", len(train_df), "| Test rows:", len(test_df))


## 6. Classical anomaly detection (scikit-learn)

In [None]:
def fit_and_score_sklearn(
    model_name: str,
    train_df: pd.DataFrame,
    test_df: pd.DataFrame,
    score_percentile: float = 95.0,
) -> pd.DataFrame:
    """Train an unsupervised model on train_df, then score test_df."""
    Xtr = train_df.to_numpy()
    Xte = test_df.to_numpy()

    if model_name == "iforest":
        model = IsolationForest(n_estimators=400, random_state=SEED, contamination="auto")
        model.fit(Xtr)
        scores = -model.score_samples(Xte)

    elif model_name == "lof":
        model = LocalOutlierFactor(n_neighbors=35, novelty=True)
        model.fit(Xtr)
        scores = -model.score_samples(Xte)

    elif model_name == "ocsvm":
        model = OneClassSVM(kernel="rbf", nu=0.05, gamma="scale")
        model.fit(Xtr)
        scores = -model.score_samples(Xte)

    elif model_name == "mcd":
        model = EllipticEnvelope(contamination=0.05, random_state=SEED)
        model.fit(Xtr)
        scores = -model.score_samples(Xte)

    else:
        raise ValueError(f"Unknown model_name: {model_name}")

    thresh = float(np.percentile(scores, score_percentile))
    flags = (scores >= thresh).astype(int)

    out = test_df.copy()
    out["Anomaly_Score"] = scores
    out["Anomaly"] = flags
    return out

MODELS = ["iforest", "lof", "ocsvm", "mcd"]
results = {m: fit_and_score_sklearn(m, train_df, test_df) for m in MODELS}

MODEL_TO_PLOT = "iforest"
scored = results[MODEL_TO_PLOT].join(test_df[["rms"]], how="left")

plot_series(scored, ["rms", "Anomaly_Score"], title=f"{MODEL_TO_PLOT} - RMS vs Anomaly_Score", save_name=f"{MODEL_TO_PLOT}_score_vs_rms")
print("Anomaly rate:", scored["Anomaly"].mean() * 100, "%")
scored.head()


## 7. BiLSTM Autoencoder (PyTorch)

In [None]:
@dataclass(frozen=True)
class WindowConfig:
    window: int = 32
    stride: int = 1

def make_windows(x: np.ndarray, cfg: WindowConfig) -> np.ndarray:
    """Create overlapping windows from (T, F) into (N, window, F)."""
    T = x.shape[0]
    if T < cfg.window:
        raise ValueError("Not enough rows to build windows.")
    return np.stack([x[i:i+cfg.window] for i in range(0, T - cfg.window + 1, cfg.stride)], axis=0)

class BiLSTMAutoencoder(nn.Module):
    """A compact BiLSTM autoencoder for sequence reconstruction."""

    def __init__(self, n_features: int, hidden: int = 32) -> None:
        super().__init__()
        self.encoder = nn.LSTM(n_features, hidden, batch_first=True, bidirectional=True)
        self.decoder = nn.LSTM(2 * hidden, hidden, batch_first=True, bidirectional=True)
        self.proj = nn.Linear(2 * hidden, n_features)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        z, _ = self.encoder(x)
        y, _ = self.decoder(z)
        return self.proj(y)

def train_autoencoder(model: nn.Module, x_train: np.ndarray, epochs: int = 8, lr: float = 1e-3, batch_size: int = 64) -> List[float]:
    device = torch.device("cpu")
    model.to(device).train()
    opt = torch.optim.Adam(model.parameters(), lr=lr)
    loss_fn = nn.SmoothL1Loss()

    ds = torch.utils.data.TensorDataset(torch.tensor(x_train, dtype=torch.float32))
    dl = torch.utils.data.DataLoader(ds, batch_size=batch_size, shuffle=True)

    losses: List[float] = []
    for ep in range(1, epochs + 1):
        batch_losses = []
        for (xb,) in dl:
            xb = xb.to(device)
            opt.zero_grad()
            recon = model(xb)
            loss = loss_fn(recon, xb)
            loss.backward()
            opt.step()
            batch_losses.append(float(loss.detach().cpu().item()))
        mean_loss = float(np.mean(batch_losses))
        losses.append(mean_loss)
        print(f"Epoch {ep:02d}/{epochs} | loss={mean_loss:.6f}")
    return losses

def reconstruction_scores(model: nn.Module, x: np.ndarray, batch_size: int = 128) -> np.ndarray:
    device = torch.device("cpu")
    model.to(device).eval()
    ds = torch.utils.data.TensorDataset(torch.tensor(x, dtype=torch.float32))
    dl = torch.utils.data.DataLoader(ds, batch_size=batch_size, shuffle=False)

    scores: List[float] = []
    loss_fn = nn.SmoothL1Loss(reduction="none")

    with torch.no_grad():
        for (xb,) in dl:
            xb = xb.to(device)
            recon = model(xb)
            per_elem = loss_fn(recon, xb)
            per_win = per_elem.mean(dim=(1, 2)).detach().cpu().numpy()
            scores.extend(per_win.tolist())
    return np.array(scores, dtype=float)


### 7.1 Train + score the BiLSTM AE

In [None]:
scaler = MinMaxScaler()
train_scaled = scaler.fit_transform(train_df)
test_scaled = scaler.transform(test_df)

cfg = WindowConfig(window=32, stride=1)

train_w = make_windows(train_scaled, cfg)
test_w = make_windows(test_scaled, cfg)

print("Train windows:", train_w.shape, "| Test windows:", test_w.shape)

ae = BiLSTMAutoencoder(n_features=train_w.shape[-1], hidden=32)
losses = train_autoencoder(ae, train_w, epochs=8, lr=1e-3, batch_size=64)

plt.figure(figsize=(8, 3))
plt.plot(losses)
plt.title("BiLSTM AE training loss")
plt.xlabel("epoch")
plt.ylabel("loss")
plt.tight_layout()
plt.show()

train_scores = reconstruction_scores(ae, train_w)
test_scores = reconstruction_scores(ae, test_w)

THRESH_PCTL = 90
thr = float(np.percentile(train_scores, THRESH_PCTL))
flags = (test_scores >= thr).astype(int)
print(f"Threshold (p{THRESH_PCTL}) = {thr:.6f} | flagged={int(flags.sum())}/{len(flags)}")


### 7.2 Align window scores to timestamps and plot

In [None]:
aligned_index = test_df.index[cfg.window - 1:]
ae_df = pd.DataFrame({"Reconstruction_Score": test_scores, "Anomaly": flags}, index=aligned_index)

plot_df = ae_df.join(test_df[["rms"]].iloc[cfg.window - 1:], how="left")

plot_series(plot_df, ["rms", "Reconstruction_Score"], title="BiLSTM AE - RMS vs Reconstruction Score", save_name="bilstm_recon_vs_rms")
print("Sample anomalies:")
ae_df[ae_df["Anomaly"] == 1].head(10)


## 8. Notes and next steps

- Add more channels and richer features (spectral bands, envelope, crest factor)
- Evaluate early-warning quality (lead time before failure)
- Persist models/scaler for reuse (`joblib.dump`, `torch.save`)

This notebook stays intentionally lightweight so it runs reliably in Colab.
