In [2]:
import numpy as np
import pandas as pd

from sklearn.model_selection import train_test_split
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import OneHotEncoder, StandardScaler, LabelEncoder
from sklearn.pipeline import Pipeline

import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader


# -----------------------------
# 1) Data preparation
# -----------------------------
def prepare_failure_type_data(df: pd.DataFrame):
    """
    Turn the raw maintenance table into scaled train/validation arrays.

    Steps, in order:
    1. Drop the identifier columns ("UDI", "Product ID") and "Target"
       (if present) so they cannot leak into the features.
    2. Label-encode "Failure Type" (cast to str) into integer class indices.
    3. Stratified 80/20 train/validation split with random_state=42.
    4. One-hot encode "Type" (first level dropped), pass the remaining
       columns through unchanged, then standardize every resulting column.
       Both transformers are fitted on the training split only.

    Returns:
        (X_train, X_val, y_train, y_val, preprocessor, scaler, label_encoder)
        where the last three are the fitted ColumnTransformer, StandardScaler
        and LabelEncoder.

    Raises:
        ValueError: if df has no "Failure Type" column.
    """
    target_col = "Failure Type"

    # IDs carry no signal; "Target" would leak the label.
    cleaned = df.drop(columns=["UDI", "Product ID", "Target"], errors="ignore")

    if target_col not in cleaned.columns:
        raise ValueError("Column 'Failure Type' not found in df.")

    # Labels -> integer class indices.
    label_encoder = LabelEncoder()
    y = label_encoder.fit_transform(cleaned[target_col].astype(str).values)
    X = cleaned.drop(columns=[target_col])

    # Split before fitting any feature transformer (no leakage).
    X_train_df, X_val_df, y_train, y_val = train_test_split(
        X,
        y,
        test_size=0.2,
        random_state=42,
        stratify=y,
    )

    # The dense-output keyword is `sparse_output` in newer scikit-learn
    # releases and `sparse` in older ones; try the new spelling first.
    encoder_kwargs = {"drop": "first", "handle_unknown": "ignore"}
    try:
        ohe = OneHotEncoder(sparse_output=False, **encoder_kwargs)
    except TypeError:
        ohe = OneHotEncoder(sparse=False, **encoder_kwargs)

    preprocessor = ColumnTransformer(
        transformers=[("cat", ohe, ["Type"])],
        remainder="passthrough",
    )
    scaler = StandardScaler()

    # Fit on train only, then apply the fitted transformers to validation.
    X_train = scaler.fit_transform(preprocessor.fit_transform(X_train_df))
    X_val = scaler.transform(preprocessor.transform(X_val_df))

    return X_train, X_val, y_train, y_val, preprocessor, scaler, label_encoder


# -----------------------------
# 2) PyTorch Dataset
# -----------------------------
class MaintenanceDataset(Dataset):
    """
    Multiclass Dataset:
    - X: float32 tensor
    - y: long tensor (class indices), shape (N,)

    X and y are copied into tensors at construction time. They must have
    the same number of rows; a mismatch raises ValueError immediately
    instead of surfacing later as an out-of-range index during batching.
    """

    def __init__(self, X, y):
        features = torch.tensor(X, dtype=torch.float32)
        labels = torch.tensor(y, dtype=torch.long)  # for CrossEntropyLoss

        # __len__ is driven by X, so a shorter/longer y would otherwise go
        # unnoticed until some index falls outside one of the two tensors.
        if features.ndim == 0 or labels.ndim == 0:
            raise ValueError("X and y must each have at least one dimension.")
        if features.shape[0] != labels.shape[0]:
            raise ValueError(
                f"X and y must have the same number of samples, "
                f"got {features.shape[0]} and {labels.shape[0]}."
            )

        self.X = features
        self.y = labels

    def __len__(self):
        """Number of samples (rows of X)."""
        return self.X.shape[0]

    def __getitem__(self, idx):
        """Return the (features, label) pair at position idx."""
        return self.X[idx], self.y[idx]


# -----------------------------
# 3) Build model (multiple architectures)
# -----------------------------
def build_model(input_dim: int,
                num_classes: int,
                hidden_sizes=(64, 32),
                activation="relu",
                dropout=0.0,
                use_batchnorm=False) -> nn.Module:
    """
    Assemble a feed-forward classifier as an nn.Sequential.

    Each entry of `hidden_sizes` contributes, in this order:
    Linear -> [BatchNorm1d if use_batchnorm] -> activation -> [Dropout if dropout > 0].
    The stack ends with Linear(last_width, num_classes), which emits raw
    logits (no Softmax/Sigmoid is applied here).

    - activation: 'relu' | 'leakyrelu' (negative_slope=0.01) | 'elu'
    - Raises ValueError for any other activation name.
    """

    # name -> (module class, constructor kwargs)
    activations = {
        "relu": (nn.ReLU, {}),
        "leakyrelu": (nn.LeakyReLU, {"negative_slope": 0.01}),
        "elu": (nn.ELU, {}),
    }
    if activation not in activations:
        raise ValueError(f"Unknown activation='{activation}'. Choose from {list(activations.keys())}.")
    act_cls, act_kwargs = activations[activation]

    # Consecutive widths give the (in, out) sizes of every hidden Linear.
    widths = [input_dim, *hidden_sizes]
    modules = []

    for fan_in, fan_out in zip(widths, widths[1:]):
        modules.append(nn.Linear(fan_in, fan_out))
        if use_batchnorm:
            modules.append(nn.BatchNorm1d(fan_out))
        modules.append(act_cls(**act_kwargs))
        if dropout and dropout > 0:
            modules.append(nn.Dropout(dropout))

    # Output head: logits only.
    modules.append(nn.Linear(widths[-1], num_classes))
    return nn.Sequential(*modules)


# -----------------------------
# 4) Train / Eval (epoch-level)
# -----------------------------
def train_one_epoch(model: nn.Module, train_loader: DataLoader, criterion, optimizer, device="cpu") -> float:
    """
    Run one optimization pass over `train_loader`.

    Puts the model in training mode, then for every batch: moves the batch
    to `device`, computes the loss, back-propagates and takes one optimizer
    step.

    Returns:
        The arithmetic mean of the per-batch loss values (every batch counts
        equally, regardless of its size).

    Raises:
        ValueError: if `train_loader` yields no batches, since there is then
            no loss to average.
    """
    model.train()
    total_loss = 0.0
    num_batches = 0

    for X_batch, y_batch in train_loader:
        X_batch = X_batch.to(device)
        y_batch = y_batch.to(device)

        logits = model(X_batch)
        loss = criterion(logits, y_batch)

        # Clear gradients left over from the previous step before backprop.
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        total_loss += loss.item()
        num_batches += 1

    if num_batches == 0:
        # Fail with an explicit message rather than a bare ZeroDivisionError.
        raise ValueError("train_loader yielded no batches; cannot compute an average loss.")

    return float(total_loss / num_batches)


def evaluate_accuracy(model: nn.Module, val_loader: DataLoader, device="cpu") -> float:
    """
    Compute top-1 accuracy of `model` over every batch in `val_loader`.

    The model is switched to eval mode (and left in it) and gradients are
    disabled for the pass. The predicted class of each row is the argmax of
    the logits along dim 1.

    Returns:
        correct_predictions / total_samples as a float in [0, 1].

    Raises:
        ValueError: if `val_loader` yields no samples, since accuracy is
            undefined in that case.
    """
    model.eval()
    correct = 0
    total = 0

    with torch.no_grad():
        for X_batch, y_batch in val_loader:
            X_batch = X_batch.to(device)
            y_batch = y_batch.to(device)

            logits = model(X_batch)
            preds = torch.argmax(logits, dim=1)  # class index
            correct += (preds == y_batch).sum().item()
            total += y_batch.size(0)

    if total == 0:
        # Fail with an explicit message rather than a bare ZeroDivisionError.
        raise ValueError("val_loader yielded no samples; accuracy is undefined.")

    return float(correct / total)


# -----------------------------
# 5) Full experiment runner (compare architectures)
# -----------------------------
def run_experiment(X_train, y_train, X_val, y_val,
                   num_classes: int,
                   config: dict,
                   epochs=20,
                   batch_size=256,
                   lr=1e-3,
                   weight_decay=0.0,
                   device="cpu"):
    """
    Train one architecture config for `epochs` epochs and report its best epoch.

    Args:
        X_train, y_train, X_val, y_val: feature matrices / label vectors;
            X_train must expose `.shape[1]` (used as the model input width).
        num_classes: size of the output layer.
        config: architecture options read with defaults:
            "hidden_sizes" (64, 32), "activation" "relu", "dropout" 0.0,
            "use_batchnorm" False. Other keys are ignored.
        epochs, batch_size, lr, weight_decay: training hyper-parameters
            (Adam optimizer, CrossEntropyLoss).
        device: device the model and batches are moved to.

    Returns:
        (model, best_val_acc, history)
        - model: the trained network, carrying the weights of the epoch
          that achieved `best_val_acc`. Restoring them keeps the returned
          model consistent with the returned score; otherwise the caller
          would receive last-epoch weights next to an earlier epoch's score.
        - best_val_acc: highest validation accuracy seen over all epochs
          (0.0 if `epochs` is 0).
        - history: list of (epoch, train_loss, val_acc), one per epoch.
    """
    train_ds = MaintenanceDataset(X_train, y_train)
    val_ds = MaintenanceDataset(X_val, y_val)

    train_loader = DataLoader(train_ds, batch_size=batch_size, shuffle=True)
    val_loader = DataLoader(val_ds, batch_size=batch_size, shuffle=False)

    model = build_model(
        input_dim=X_train.shape[1],
        num_classes=num_classes,
        hidden_sizes=config.get("hidden_sizes", (64, 32)),
        activation=config.get("activation", "relu"),
        dropout=config.get("dropout", 0.0),
        use_batchnorm=config.get("use_batchnorm", False),
    ).to(device)

    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=lr, weight_decay=weight_decay)

    best_val_acc = 0.0
    best_state = None  # snapshot of the weights at the best epoch so far
    history = []

    for epoch in range(1, epochs + 1):
        train_loss = train_one_epoch(model, train_loader, criterion, optimizer, device=device)
        val_acc = evaluate_accuracy(model, val_loader, device=device)

        # `best_state is None` guarantees a snapshot even when the very
        # first epoch scores exactly 0.0. Ties keep the earliest epoch.
        if best_state is None or val_acc > best_val_acc:
            best_val_acc = max(best_val_acc, val_acc)
            # Clone: state_dict() tensors alias the live parameters, which
            # the optimizer keeps updating in later epochs.
            best_state = {
                key: value.detach().clone()
                for key, value in model.state_dict().items()
            }

        history.append((epoch, train_loss, val_acc))

    if best_state is not None:
        model.load_state_dict(best_state)

    return model, best_val_acc, history


# -----------------------------
# 6) Example usage: load CSV + compare multiple architectures
# -----------------------------
if __name__ == "__main__":
    # Load the raw table and build the scaled train/validation arrays.
    df = pd.read_csv("predictive_maintenance.csv")
    (X_train, X_val, y_train, y_val,
     preprocessor, scaler, label_enc) = prepare_failure_type_data(df)
    num_classes = len(label_enc.classes_)

    # Use the GPU when one is visible to PyTorch.
    device = "cpu"
    if torch.cuda.is_available():
        device = "cuda"

    # Candidate architectures; "name" is only used for reporting.
    configs = [
        dict(name="MLP_ReLU_64-32",
             hidden_sizes=(64, 32), activation="relu",
             dropout=0.0, use_batchnorm=False),
        dict(name="MLP_ReLU_128-64-32 + Dropout",
             hidden_sizes=(128, 64, 32), activation="relu",
             dropout=0.2, use_batchnorm=False),
        dict(name="MLP_LeakyReLU_64-32 + BN",
             hidden_sizes=(64, 32), activation="leakyrelu",
             dropout=0.0, use_batchnorm=True),
        dict(name="MLP_ELU_128-64 + Dropout+BN",
             hidden_sizes=(128, 64), activation="elu",
             dropout=0.3, use_batchnorm=True),
    ]

    # Training hyper-parameters shared by every run.
    train_settings = dict(
        epochs=25,
        batch_size=256,
        lr=1e-3,
        weight_decay=1e-4,  # L2 regularization
    )

    results = []         # (name, best validation accuracy) per config
    best_overall = None  # (name, best validation accuracy, model) of the leader

    for cfg in configs:
        model, best_val_acc, history = run_experiment(
            X_train, y_train, X_val, y_val,
            num_classes=num_classes,
            config=cfg,
            device=device,
            **train_settings,
        )
        results.append((cfg["name"], best_val_acc))

        # Strict ">" keeps the earlier config when two runs tie.
        is_new_best = best_overall is None or best_val_acc > best_overall[1]
        if is_new_best:
            best_overall = (cfg["name"], best_val_acc, model)

        print(f"{cfg['name']}: best val acc = {best_val_acc:.4f}")

    # Summary: class names in encoder order, then the winning config.
    print("\nClasses (Failure Type):")
    print(list(label_enc.classes_))

    print("\nBest architecture:")
    print(f"{best_overall[0]} with best val acc = {best_overall[1]:.4f}")


MLP_ReLU_64-32: best val acc = 0.9700
MLP_ReLU_128-64-32 + Dropout: best val acc = 0.9735
MLP_LeakyReLU_64-32 + BN: best val acc = 0.9760
MLP_ELU_128-64 + Dropout+BN: best val acc = 0.9720

Classes (Failure Type):
['Heat Dissipation Failure', 'No Failure', 'Overstrain Failure', 'Power Failure', 'Random Failures', 'Tool Wear Failure']

Best architecture:
MLP_LeakyReLU_64-32 + BN with best val acc = 0.9760
