# LLM-Assisted Fault Detection & Diagnosis (FDD) Demo

**Goal:** Train a time-series classifier for vehicle sensor fault types (0–9) and generate clear technical explanations using a small open-source LLM.

This notebook is designed to run **top-to-bottom** in Google Colab.


## 1) Setup
Install required libraries.


In [None]:
# Quietly install the notebook's dependencies; gradio and transformers are pinned to exact versions.
# NOTE(review): accelerate, bitsandbytes and sentencepiece are never imported directly below —
# presumably they back the LLM loading cell (device_map / 4-bit weights / tokenizer); confirm.
!pip -q install numpy pandas scikit-learn torch torchvision torchaudio plotly gradio==4.44.0 transformers==4.44.2 accelerate bitsandbytes sentencepiece

## 2) Imports & Config


In [None]:
import os
import json
import math
import numpy as np
import pandas as pd
from dataclasses import dataclass
from typing import Dict, List, Tuple

import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader, WeightedRandomSampler

from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, confusion_matrix, accuracy_score

import plotly.graph_objects as go
import gradio as gr

# Run tensors/models on the GPU when CUDA is available, otherwise on the CPU.
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
# NumPy random generator seeded with 42.
# NOTE(review): not referenced by any cell visible here — confirm it is still needed.
RNG = np.random.default_rng(42)

# Directory that is scanned for the dataset CSV.
DATA_PATH = "/content"  # upload CSV here in Colab
# Dataset file name inside DATA_PATH; when left as None the load cell picks a CSV itself.
CSV_FILE = None  # set to filename after upload (e.g., "vehicle_faults.csv")

# Signal columns fed to the classifier; their order defines the feature axis of every window.
FEATURE_COLS = [
    "ENGINE_RPM",
    "Trq_MeanEff_Engine_Mod[Nm]",
    "p_InMan[Pa]",
    "p_Rail[bar]",
    "v_vehicle[km|h]",
]
# Column holding the integer fault class of each row.
LABEL_COL = "type"


## 3) Load Data
Place your CSV in `/content/` and set `CSV_FILE` accordingly.


In [None]:
# If running in Colab, upload the file via the left sidebar or: from google.colab import files; files.upload()

# Auto-discover the dataset when CSV_FILE was left unset.
if CSV_FILE is None:
    # os.listdir() returns entries in arbitrary order, so sort to make the
    # automatic choice reproducible from run to run.
    candidates = sorted(f for f in os.listdir(DATA_PATH) if f.endswith('.csv'))
    if not candidates:
        raise FileNotFoundError("No CSV found in /content/. Upload your dataset and set CSV_FILE.")
    if len(candidates) > 1:
        # Make the implicit choice visible instead of silently picking one file.
        print(f"Multiple CSV files found: {candidates}. Using the first; set CSV_FILE to override.")
    CSV_FILE = candidates[0]

csv_path = os.path.join(DATA_PATH, CSV_FILE)
print(f"Using CSV: {csv_path}")

df = pd.read_csv(csv_path)

# Fail early with a readable message when a required column is absent.
missing = [c for c in FEATURE_COLS + [LABEL_COL] if c not in df.columns]
if missing:
    raise ValueError(f"Missing columns: {missing}")

# Basic cleaning: drop rows with a missing feature or label, then renumber rows.
df = df.dropna(subset=FEATURE_COLS + [LABEL_COL]).reset_index(drop=True)

print(df.head())
print(df[LABEL_COL].value_counts().sort_index())


## 4) Windowing + Normalization (Training Stats Only)


In [None]:
@dataclass
class WindowConfig:
    """Sliding-window settings used when cutting row-wise signals into windows."""

    # Number of consecutive rows that make up one window.
    window_size: int = 128
    # Row offset between the starts of consecutive windows.
    stride: int = 64

# Shared windowing configuration instance used by the cells below.
win_cfg = WindowConfig()

def _chronological_split(frame: pd.DataFrame, label_col: str, test_size: float = 0.2):
    """Split rows per class along the time axis, keeping row order intact.

    For every class the earliest ``1 - test_size`` share of its rows goes to
    the training split and the remainder to the test split. Both results are
    returned in original row order.
    """
    train_parts, test_parts = [], []
    # groupby keeps the original row order inside each group.
    for _, rows in frame.groupby(label_col, sort=False):
        n_test = int(round(len(rows) * test_size))
        cut = len(rows) - n_test
        train_parts.append(rows.iloc[:cut])
        test_parts.append(rows.iloc[cut:])
    return pd.concat(train_parts).sort_index(), pd.concat(test_parts).sort_index()


# Split by rows first, then window within each split.
# A shuffled row-level split (train_test_split with stratify=...) would return
# rows in random order, so every window built afterwards would consist of
# temporally scrambled samples. The split below keeps the class proportions
# but preserves the time order that the windowing step relies on.
train_df, test_df = _chronological_split(df, LABEL_COL, test_size=0.2)

# Compute normalization stats ONLY on training data
train_stats = {
    "mean": train_df[FEATURE_COLS].mean().to_dict(),
    # Small epsilon keeps the later division finite for constant columns.
    "std": (train_df[FEATURE_COLS].std() + 1e-8).to_dict(),
}

# Save stats for later
os.makedirs("artifacts", exist_ok=True)
with open("artifacts/normalization_stats.json", "w") as f:
    json.dump(train_stats, f, indent=2)


def normalize(df_in: pd.DataFrame, stats: Dict) -> pd.DataFrame:
    """Return a copy of ``df_in`` with every feature column z-scored.

    ``stats`` must provide ``"mean"`` and ``"std"`` mappings keyed by column
    name. Columns outside FEATURE_COLS are copied unchanged, and the input
    frame is not modified.
    """
    scaled = df_in.copy()
    means, stds = stats["mean"], stats["std"]
    for col in FEATURE_COLS:
        scaled[col] = scaled[col].sub(means[col]).div(stds[col])
    return scaled


def window_data(df_in: pd.DataFrame, win_cfg: WindowConfig) -> Tuple[np.ndarray, np.ndarray]:
    """Cut a row-wise signal table into fixed-length, possibly overlapping windows.

    Args:
        df_in: Frame containing FEATURE_COLS and LABEL_COL, in time order.
        win_cfg: Window length and stride, both in rows.

    Returns:
        ``(X, y)`` where ``X`` is float32 with shape
        ``[num_windows, window_size, num_features]`` and ``y`` is int64 with
        shape ``[num_windows]`` holding the majority label of each window.
        When the frame is shorter than one window, correctly shaped empty
        arrays are returned.
    """
    data = df_in[FEATURE_COLS].values
    labels = df_in[LABEL_COL].values.astype(int)
    size = win_cfg.window_size
    starts = range(0, len(df_in) - size + 1, win_cfg.stride)

    if len(starts) == 0:
        # Keep the rank stable so downstream code can rely on X.shape[1:].
        return (
            np.empty((0, size, len(FEATURE_COLS)), dtype=np.float32),
            np.empty((0,), dtype=np.int64),
        )

    X_list = [data[start:start + size] for start in starts]
    # majority label in window
    y_list = [int(np.bincount(labels[start:start + size]).argmax()) for start in starts]
    return np.array(X_list, dtype=np.float32), np.array(y_list, dtype=np.int64)

# Scale both splits with the statistics computed from the training split only.
train_df_norm = normalize(train_df, train_stats)
test_df_norm = normalize(test_df, train_stats)

# Cut each normalized split into windows with one label per window.
X_train, y_train = window_data(train_df_norm, win_cfg)
X_test, y_test = window_data(test_df_norm, win_cfg)

print("Train windows:", X_train.shape, "Test windows:", X_test.shape)


## 5) Dataset, Model, and Class Imbalance Handling


In [None]:
class WindowDataset(Dataset):
    """Index-based dataset pairing each window in ``X`` with its label in ``y``."""

    def __init__(self, X, y):
        self.X, self.y = X, y

    def __len__(self):
        # The dataset length is the number of stored windows.
        return len(self.X)

    def __getitem__(self, idx):
        features = self.X[idx]
        target = self.y[idx]
        return features, target


class CNNGRU(nn.Module):
    """1-D CNN feature extractor followed by a GRU and a small MLP classifier.

    Input is ``[batch, time, features]``; output is ``[batch, num_classes]``
    raw logits. The two max-pool layers shorten the time axis by a factor of
    four before the GRU sees it.
    """

    def __init__(self, num_features: int, num_classes: int = 10):
        super().__init__()
        # Layers are kept in one flat Sequential so the state_dict keys
        # (conv.0, conv.2, ...) stay stable for saved checkpoints.
        conv_layers = [
            nn.Conv1d(num_features, 64, kernel_size=5, padding=2),
            nn.ReLU(),
            nn.BatchNorm1d(64),
            nn.MaxPool1d(2),
            nn.Conv1d(64, 128, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.BatchNorm1d(128),
            nn.MaxPool1d(2),
        ]
        self.conv = nn.Sequential(*conv_layers)
        self.gru = nn.GRU(128, 128, batch_first=True)
        head_layers = [
            nn.Linear(128, 128),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.Linear(128, num_classes),
        ]
        self.fc = nn.Sequential(*head_layers)

    def forward(self, x):
        # Conv1d expects channels first: [B, T, F] -> [B, F, T]
        channels_first = x.permute(0, 2, 1)
        features = self.conv(channels_first)
        # The GRU (batch_first) expects time second: [B, C, T] -> [B, T, C]
        sequence = features.permute(0, 2, 1)
        gru_out, _ = self.gru(sequence)
        # Classify from the GRU output at the final timestep.
        final_step = gru_out[:, -1]
        return self.fc(final_step)


# Wrap the windowed arrays so DataLoader can index them.
train_ds = WindowDataset(X_train, y_train)
test_ds = WindowDataset(X_test, y_test)

# Handle class imbalance: class weights + weighted sampling
# Per-class window counts (at least 10 bins); weights are inverse counts,
# rescaled so they sum to the number of bins. Bins with zero windows are
# treated as count 1 to avoid division by zero.
class_counts = np.bincount(y_train, minlength=10)
class_weights = 1.0 / np.maximum(class_counts, 1)
class_weights = class_weights / class_weights.sum() * len(class_weights)

# Each training window is drawn (with replacement) with a probability
# proportional to the weight of its class.
weights_per_sample = class_weights[y_train]
sampler = WeightedRandomSampler(weights_per_sample, num_samples=len(weights_per_sample), replacement=True)

train_loader = DataLoader(train_ds, batch_size=64, sampler=sampler)
test_loader = DataLoader(test_ds, batch_size=64, shuffle=False)

model = CNNGRU(num_features=len(FEATURE_COLS)).to(DEVICE)
# NOTE(review): the same inverse-frequency weights are applied twice — once by
# the sampler above and again inside the loss — which may over-emphasise rare
# classes; confirm this double compensation is intended.
criterion = nn.CrossEntropyLoss(weight=torch.tensor(class_weights, dtype=torch.float32).to(DEVICE))
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)


## 6) Train


In [None]:
def train_epoch(model, loader):
    """Run one optimisation pass over ``loader`` and return the mean sample loss.

    Uses the module-level ``optimizer``, ``criterion`` and ``DEVICE``. The
    result is the batch-size-weighted loss sum divided by
    ``len(loader.dataset)``.
    """
    model.train()
    running_loss = 0.0
    for batch_x, batch_y in loader:
        batch_x, batch_y = batch_x.to(DEVICE), batch_y.to(DEVICE)
        optimizer.zero_grad()
        batch_loss = criterion(model(batch_x), batch_y)
        batch_loss.backward()
        optimizer.step()
        # Weight by batch size so a smaller final batch does not skew the mean.
        running_loss += batch_loss.item() * batch_x.size(0)
    n_samples = len(loader.dataset)
    return running_loss / n_samples


# Number of full passes over the training loader.
EPOCHS = 5
# Train for EPOCHS passes, printing the mean training loss after each one.
for epoch in range(1, EPOCHS + 1):
    loss = train_epoch(model, train_loader)
    print(f"Epoch {epoch}/{EPOCHS} - loss: {loss:.4f}")


## 7) Evaluate


In [None]:
def predict(model, loader):
    """Return ``(predictions, labels)`` as 1-D NumPy arrays for every sample in ``loader``.

    The model is put in eval mode and run without gradient tracking;
    predictions are the arg-max class of the logits.
    """
    model.eval()
    batch_preds, batch_labels = [], []
    with torch.no_grad():
        for Xb, yb in loader:
            scores = model(Xb.to(DEVICE))
            batch_preds.append(torch.argmax(scores, dim=1).cpu().numpy())
            batch_labels.append(yb.numpy())
    return np.concatenate(batch_preds), np.concatenate(batch_labels)


# Score the trained model on the held-out test windows.
preds, labels = predict(model, test_loader)
print("Accuracy:", accuracy_score(labels, preds))
print(classification_report(labels, preds, digits=3))

# NOTE(review): no explicit `labels=` is passed, so the matrix presumably only
# covers classes that occur in `labels` or `preds` — confirm row/column
# indices are not read as class ids when a class is absent.
cm = confusion_matrix(labels, preds)
print("Confusion matrix:\n", cm)


## 8) Save Model & Artifacts


In [None]:
# All artifacts go to a directory relative to the current working directory.
artifacts_dir = "artifacts"
os.makedirs(artifacts_dir, exist_ok=True)

# Save model
torch.save(model.state_dict(), os.path.join(artifacts_dir, "fdd_cnn_gru.pt"))

# Save window config
with open(os.path.join(artifacts_dir, "window_config.json"), "w") as f:
    json.dump({"window_size": win_cfg.window_size, "stride": win_cfg.stride}, f, indent=2)

# Save feature list
with open(os.path.join(artifacts_dir, "feature_cols.json"), "w") as f:
    json.dump(FEATURE_COLS, f, indent=2)

# Report the real location rather than assuming the working directory is /content.
print(f"Saved artifacts to {os.path.abspath(artifacts_dir)}")


## 9) Explainability: Knowledge Base + Anomaly Summary


In [None]:
# Knowledge base mapping each integer fault class (0-9) to a short name and a
# one-sentence definition; used when building explanations and UI labels.
FAULT_KB = {
    0: {"name": "healthy", "definition": "Signals are within expected ranges with normal variability and dynamics."},
    1: {"name": "gain", "definition": "Sensor gain error causing proportional scaling of readings."},
    2: {"name": "offset", "definition": "Sensor bias causing constant shift in readings."},
    3: {"name": "noise", "definition": "High-frequency random fluctuations dominate the signal."},
    4: {"name": "stuck-at", "definition": "Signal remains nearly constant regardless of system changes."},
    5: {"name": "drift", "definition": "Slow, continuous deviation over time from expected baseline."},
    6: {"name": "hard-over", "definition": "Signal saturates at a fixed high/low limit."},
    7: {"name": "spike", "definition": "Brief sharp excursions from normal values."},
    8: {"name": "delay", "definition": "Sensor response lags behind related signals."},
    9: {"name": "packet loss", "definition": "Intermittent missing or repeated values due to transmission loss."},
}


def _linear_slope(y):
    x = np.arange(len(y))
    A = np.vstack([x, np.ones(len(y))]).T
    m, _ = np.linalg.lstsq(A, y, rcond=None)[0]
    return float(m)


def compute_anomaly_summary(window: np.ndarray) -> Dict[str, Dict[str, float]]:
    """Summarise one window with simple per-signal statistics.

    Args:
        window: Array of shape ``[T, F]`` whose columns follow FEATURE_COLS.

    Returns:
        A dict with one entry per feature column (mean, std, fitted slope,
        mean/std of first differences, count of samples more than three
        standard deviations from the mean, and a flat-line flag) plus a
        ``"delay_estimate"`` entry holding the cross-correlation lag, in
        samples, between engine RPM and vehicle speed.
    """
    # window shape: [T, F]
    summary = {}
    for i, col in enumerate(FEATURE_COLS):
        series = window[:, i]
        mean = series.mean()
        std = series.std()
        diffs = np.diff(series)
        spikes = int(np.sum(np.abs(series - mean) > 3 * std))
        flatline = float(std < 0.01)
        summary[col] = {
            "mean": float(mean),
            "std": float(std),
            "slope": _linear_slope(series),
            "delta_mean": float(diffs.mean()) if len(diffs) else 0.0,
            "delta_std": float(diffs.std()) if len(diffs) else 0.0,
            "spike_count": spikes,
            "flatline_flag": flatline,
        }
    # crude delay estimate between engine RPM and vehicle speed
    rpm = window[:, FEATURE_COLS.index("ENGINE_RPM")]
    spd = window[:, FEATURE_COLS.index("v_vehicle[km|h]")]
    if rpm.max() == rpm.min() or spd.max() == spd.min():
        # A constant signal yields an all-zero cross-correlation; argmax would
        # then pick index 0 and report a bogus lag of -(T - 1). Report no lag.
        lag = 0
    else:
        corr = np.correlate(rpm - rpm.mean(), spd - spd.mean(), mode='full')
        # Index len(rpm) - 1 of the 'full' output corresponds to zero lag.
        lag = int(np.argmax(corr) - (len(rpm) - 1))
    summary["delay_estimate"] = {"rpm_vs_speed_lag": lag}
    return summary


def fallback_explanation(pred_class: int, confidence: float, summary: Dict[str, Dict[str, float]]) -> str:
    """Build a deterministic, template-based explanation without the LLM.

    The text lists up to six evidence bullets derived from ``summary``, a
    reason sentence quoting the fault definition from FAULT_KB, and a fixed
    list of recommended checks.
    """
    fault = FAULT_KB[pred_class]

    def _describe(name, stats):
        # The delay entry has a different schema from the per-signal entries.
        if name == "delay_estimate":
            return f"Estimated lag (RPM vs speed): {stats['rpm_vs_speed_lag']} samples"
        return (
            f"{name}: mean={stats['mean']:.2f}, std={stats['std']:.2f}, slope={stats['slope']:.4f}, spikes={stats['spike_count']}"
        )

    evidence = [_describe(name, stats) for name, stats in summary.items()]
    bullets = "\n".join(f"- {line}" for line in evidence[:6])
    reason = (
        f"Predicted class {pred_class} ({fault['name']}) with confidence {confidence:.2f}. "
        f"This aligns with the definition: {fault['definition']}"
    )
    checks = "Recommended checks: verify sensor wiring, compare against redundant signals, and inspect ECU logs."
    return f"Evidence:\n{bullets}\n\nReason: {reason}\n\n{checks}"


## 10) LLM Explainability (Open-Source, No Paid API)
We use a small instruction-tuned model that can run in Colab. The prompt includes **only**: predicted class, confidence, anomaly summary, and fault definition.


In [None]:
from transformers import AutoTokenizer, AutoModelForCausalLM

# Hugging Face model id of the small chat model used for explanations.
LLM_MODEL = "TinyLlama/TinyLlama-1.1B-Chat-v1.0"

llm_tokenizer = AutoTokenizer.from_pretrained(LLM_MODEL)

if torch.cuda.is_available():
    # 4-bit bitsandbytes quantization needs a CUDA device; request it through
    # quantization_config instead of the deprecated bare load_in_4bit kwarg.
    from transformers import BitsAndBytesConfig

    llm_model = AutoModelForCausalLM.from_pretrained(
        LLM_MODEL,
        device_map="auto",
        quantization_config=BitsAndBytesConfig(load_in_4bit=True),
    )
else:
    # CPU-only runtime: load unquantized weights so this cell (and a
    # top-to-bottom run of the notebook) still works without a GPU.
    llm_model = AutoModelForCausalLM.from_pretrained(LLM_MODEL)


def build_llm_prompt(pred_class: int, confidence: float, summary: Dict[str, Dict[str, float]]) -> str:
    """Compose the instruction prompt sent to the LLM.

    The prompt contains fixed instructions followed by a JSON document with
    the predicted class, the confidence rounded to four decimals, the anomaly
    summary, and the FAULT_KB entry for the predicted class.
    """
    payload = {
        "predicted_class": pred_class,
        "confidence": round(float(confidence), 4),
        "anomaly_summary": summary,
        "fault_definition": FAULT_KB[pred_class],
    }
    header = "You are a vehicle diagnostics assistant. Use ONLY the provided JSON. Produce:"
    requirements = [
        "- 3 to 6 evidence bullet points (signal-based)",
        "- 1 short reason paragraph",
        "- recommended next checks",
    ]
    instructions = "\n".join([header, *requirements])
    return f"{instructions}\n\nJSON:\n{json.dumps(payload, indent=2)}"


def generate_llm_explanation(pred_class: int, confidence: float, summary: Dict[str, Dict[str, float]]) -> str:
    """Generate a free-text explanation with the loaded LLM.

    Args:
        pred_class: Predicted fault class id (key of FAULT_KB).
        confidence: Probability assigned to ``pred_class``.
        summary: Output of ``compute_anomaly_summary`` for the window.

    Returns:
        Only the text generated by the model, without the prompt.

    Raises:
        RuntimeError: If the model produced no text, so callers that fall
            back on any exception can use the template explanation instead.
    """
    prompt = build_llm_prompt(pred_class, confidence, summary)
    # NOTE(review): the prompt is sent as plain text; this chat-tuned model
    # presumably responds better when wrapped with the tokenizer's chat
    # template — consider apply_chat_template after confirming.
    inputs = llm_tokenizer(prompt, return_tensors="pt").to(llm_model.device)
    prompt_length = inputs["input_ids"].shape[1]
    outputs = llm_model.generate(
        **inputs,
        max_new_tokens=300,
        do_sample=True,
        temperature=0.4,
    )
    # The returned sequence starts with the prompt tokens. Slicing them off
    # removes the instructions AND the JSON payload; splitting the decoded
    # text on "JSON:" would leave the whole payload in the output.
    generated_tokens = outputs[0][prompt_length:]
    text = llm_tokenizer.decode(generated_tokens, skip_special_tokens=True).strip()
    if not text:
        raise RuntimeError("LLM returned an empty explanation")
    return text


## 11) Inference Helpers


In [None]:
def predict_window(window: np.ndarray) -> Tuple[int, float]:
    """Classify one ``[T, F]`` window with the module-level ``model``.

    Returns the arg-max class id and its softmax probability.
    """
    model.eval()
    # Add a leading batch dimension of size one.
    batch = torch.tensor(window, dtype=torch.float32).unsqueeze(0).to(DEVICE)
    with torch.no_grad():
        probs = torch.softmax(model(batch), dim=1).cpu().numpy()[0]
    return int(np.argmax(probs)), float(np.max(probs))


def run_explain(window: np.ndarray, use_llm: bool = True) -> str:
    """Predict the fault for ``window`` and return a textual explanation.

    With ``use_llm`` set, the LLM explanation is attempted first; any
    exception is printed and the template-based fallback is returned instead.
    """
    pred_class, confidence = predict_window(window)
    summary = compute_anomaly_summary(window)
    if not use_llm:
        return fallback_explanation(pred_class, confidence, summary)
    try:
        return generate_llm_explanation(pred_class, confidence, summary)
    except Exception as exc:
        # Best effort: the demo must still answer when the LLM is unavailable.
        print("LLM failed, using fallback:", exc)
        return fallback_explanation(pred_class, confidence, summary)


## 12) Gradio Demo UI
Features:
- Upload CSV and select a window
- Or paste manual CSV rows
- Plotly signal visualization
- Predicted fault + confidence
- Explanation with LLM or fallback


In [None]:
def build_plot(window: np.ndarray) -> go.Figure:
    """Return a Plotly line chart with one trace per feature column of ``window``."""
    steps = np.arange(window.shape[0])
    fig = go.Figure()
    for idx, name in enumerate(FEATURE_COLS):
        trace = go.Scatter(x=steps, y=window[:, idx], mode='lines', name=name)
        fig.add_trace(trace)
    fig.update_layout(title="Sensor Window", template="plotly_white", height=350)
    return fig


# Per-session cache for the UI: maps an uploaded file's path to the
# (windows, labels) arrays computed from it, so each file is windowed once.
UI_CACHE = {}


def load_csv_windows(file_path: str) -> Tuple[np.ndarray, np.ndarray]:
    """Read a CSV, normalise it with the training statistics and window it.

    The label column is optional: files uploaded for inference usually have
    no labels, so a placeholder label of 0 is used when it is absent.

    Raises:
        ValueError: If any of FEATURE_COLS is missing from the file.
    """
    df_local = pd.read_csv(file_path)
    missing = [c for c in FEATURE_COLS if c not in df_local.columns]
    if missing:
        raise ValueError(f"Missing columns: {missing}")
    if LABEL_COL in df_local.columns:
        df_local = df_local.dropna(subset=FEATURE_COLS + [LABEL_COL])
    else:
        # window_data needs a label column; the returned labels are dummies here.
        df_local = df_local.dropna(subset=FEATURE_COLS).assign(**{LABEL_COL: 0})
    df_local_norm = normalize(df_local, train_stats)
    return window_data(df_local_norm, win_cfg)


def ui_predict_from_csv(file_obj, window_index: int, use_llm: bool):
    """Gradio handler: classify and explain one window of an uploaded CSV.

    Args:
        file_obj: Value of the file component; either a path string or an
            object exposing the path as ``.name``. ``None`` if nothing was
            uploaded.
        window_index: Requested window; clipped to the available range.
        use_llm: Whether to try the LLM explanation before the fallback.

    Returns:
        ``(figure, prediction_text, explanation_text)``. On bad input the
        figure is ``None`` and the prediction text carries the message.
    """
    if file_obj is None:
        return None, "Please upload a CSV file.", ""
    key = getattr(file_obj, "name", file_obj)
    try:
        if key not in UI_CACHE:
            UI_CACHE[key] = load_csv_windows(key)
    except Exception as exc:
        # Report load/parse problems in the UI, as the manual-CSV handler does.
        return None, f"Error: {exc}", ""
    Xw, _ = UI_CACHE[key]
    if len(Xw) == 0:
        return None, "No windows generated from file.", ""
    requested = 0 if window_index is None else window_index
    idx = int(np.clip(requested, 0, len(Xw) - 1))
    window = Xw[idx]
    pred_class, confidence = predict_window(window)
    explanation = run_explain(window, use_llm=use_llm)
    fig = build_plot(window)
    return fig, f"Predicted: {pred_class} ({FAULT_KB[pred_class]['name']}) | confidence={confidence:.2f}", explanation


def parse_manual_csv(text: str) -> np.ndarray:
    """Parse pasted CSV text into one normalised float32 window.

    Rows with a missing feature value are dropped (matching the file-upload
    path) so NaNs never reach the model. The first ``window_size`` remaining
    rows form the window.

    Raises:
        ValueError: If a feature column is missing or fewer than
            ``window_size`` complete rows remain.
    """
    from io import StringIO
    df_manual = pd.read_csv(StringIO(text))
    missing = [c for c in FEATURE_COLS if c not in df_manual.columns]
    if missing:
        raise ValueError(f"Missing columns in manual CSV: {missing}")
    df_manual = df_manual[FEATURE_COLS].dropna()
    if len(df_manual) < win_cfg.window_size:
        raise ValueError(f"Need at least {win_cfg.window_size} rows for one window")
    df_manual = normalize(df_manual, train_stats)
    window = df_manual.values[: win_cfg.window_size].astype(np.float32)
    return window


def ui_predict_manual(text: str, use_llm: bool):
    """Gradio handler: classify and explain a window pasted as CSV text.

    Returns ``(figure, prediction_text, explanation_text)``; when the text
    cannot be parsed the figure is ``None`` and the message is reported in
    the prediction text.
    """
    try:
        window = parse_manual_csv(text)
    except Exception as exc:
        return None, f"Error: {exc}", ""

    pred_class, confidence = predict_window(window)
    explanation = run_explain(window, use_llm=use_llm)
    fig = build_plot(window)
    fault_name = FAULT_KB[pred_class]['name']
    headline = f"Predicted: {pred_class} ({fault_name}) | confidence={confidence:.2f}"
    return fig, headline, explanation


# Build the two-tab demo UI; both tabs share the "use LLM" checkbox.
with gr.Blocks(theme=gr.themes.Soft()) as demo:
    gr.Markdown("# FDD Demo: LLM-Assisted Fault Explanation")
    gr.Markdown("Upload a CSV or paste rows to get fault predictions and explanations.")

    with gr.Row():
        use_llm = gr.Checkbox(value=True, label="Use LLM for explanation (fallback if unavailable)")

    # Tab 1: pick a window from an uploaded CSV file.
    with gr.Tab("Upload CSV"):
        file_in = gr.File(file_types=[".csv"], label="Upload CSV")
        # The slider range is fixed at 0-100; ui_predict_from_csv clips the
        # value to the number of windows actually available in the file.
        window_index = gr.Slider(0, 100, value=0, step=1, label="Window index")
        btn = gr.Button("Predict")
        plot_out = gr.Plot()
        pred_out = gr.Textbox(label="Prediction")
        exp_out = gr.Textbox(label="Explanation", lines=8)

        btn.click(ui_predict_from_csv, inputs=[file_in, window_index, use_llm], outputs=[plot_out, pred_out, exp_out])

    # Tab 2: paste raw CSV rows (header included) that form a single window.
    with gr.Tab("Manual CSV Window"):
        gr.Markdown("Paste CSV with columns: " + ", ".join(FEATURE_COLS))
        manual_text = gr.Textbox(lines=10, label="CSV Rows")
        btn2 = gr.Button("Predict")
        plot_out2 = gr.Plot()
        pred_out2 = gr.Textbox(label="Prediction")
        exp_out2 = gr.Textbox(label="Explanation", lines=8)

        btn2.click(ui_predict_manual, inputs=[manual_text, use_llm], outputs=[plot_out2, pred_out2, exp_out2])


# Launch the app without requesting a public share link.
demo.launch(share=False)
