
# Predictive Maintenance for Machinery

End-to-end notebook for data loading, EDA, feature engineering, model training, evaluation, and export for a predictive maintenance classifier.


In [None]:

# Setup & Imports
import os, json, math, joblib
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

from sklearn.model_selection import GroupKFold, train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.metrics import roc_auc_score, average_precision_score, precision_recall_curve, roc_curve, confusion_matrix, f1_score
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier
from sklearn.inspection import permutation_importance
import warnings
warnings.filterwarnings("ignore")

# Reproducibility: one seed, reused by NumPy's global RNG and by every estimator below.
RANDOM_STATE = 42
np.random.seed(RANDOM_STATE)

# Input CSV and the directory that receives the exported model artifacts.
DATA_PATH = "data/sample_sensor_data.csv"
MODEL_DIR = "models"
os.makedirs(MODEL_DIR, exist_ok=True)  # exist_ok: no error if the directory is already there


In [None]:

# Load data
# The "timestamp" column is parsed into datetimes while the CSV is read.
df = pd.read_csv(filepath_or_buffer=DATA_PATH, parse_dates=["timestamp"])
print("Rows:", df.shape[0])
df.head()


In [None]:

# Basic EDA
# Summary statistics for every column, numeric or not.
print(df.describe(include='all'))

# Class balance
failure_flags = df['failure']
print("Failures:", failure_flags.sum(), "(", round(100*failure_flags.mean(),2), "% )")

# Plot a sample machine's signals
# One machine id is drawn with a fixed seed; its rows are put in time order for plotting.
m = df['machine_id'].sample(1, random_state=RANDOM_STATE).iloc[0]
sub = df.loc[df['machine_id'] == m].sort_values("timestamp")

# (column to plot, figure title) -- one figure per signal.
signal_plots = [
    ('sensor_temp', f'Machine {m} - Temperature over time'),
    ('sensor_vibration', f'Machine {m} - Vibration over time'),
]
for signal_col, signal_title in signal_plots:
    plt.figure()
    plt.plot(sub['timestamp'], sub[signal_col])
    plt.title(signal_title)
    plt.xticks(rotation=45)
    plt.tight_layout()
    plt.show()


In [None]:

# Feature Engineering: rolling stats and lags per machine
# Rows are ordered by machine and then by time, so the per-machine rolling/shift
# operations that follow run over chronologically ordered readings.
df = df.sort_values(by=["machine_id", "timestamp"], ascending=True).copy()

def add_rolling_features(g, cols, wins=(3, 6)):
    """Return a copy of ``g`` extended with rolling and lag features.

    For every column ``c`` in ``cols`` and every window ``w`` in ``wins`` the
    result gains ``{c}_rollmean_{w}`` and ``{c}_rollstd_{w}``; each column also
    gains a single ``{c}_lag_1`` (written only when ``wins`` is non-empty).

    Parameters
    ----------
    g : pandas.DataFrame
        Rows to compute over, already in the order the windows should follow.
        The frame is not modified.
    cols : iterable of str
        Names of the columns in ``g`` to derive features from.
    wins : iterable of int, default (3, 6)
        Rolling window lengths, in rows.

    Returns
    -------
    pandas.DataFrame
        A new frame with the columns of ``g`` followed by the derived ones.
    """
    # Work on a copy so the caller's frame (often a groupby slice) is left untouched.
    out = g.copy()
    for c in cols:
        series = g[c]
        for position, w in enumerate(wins):
            # min_periods=1: the statistics are defined from the first row onwards.
            window = series.rolling(window=w, min_periods=1)
            out[f"{c}_rollmean_{w}"] = window.mean()
            # The std of a single observation is NaN; it is reported as 0 instead.
            out[f"{c}_rollstd_{w}"] = window.std().fillna(0)
            if position == 0:
                # The lag does not depend on the window, so it is computed once.
                # It is written right after the first window's statistics, which
                # keeps the resulting column order stable. The first row is NaN.
                out[f"{c}_lag_1"] = series.shift(1)
    return out

feat_cols = ["sensor_temp", "sensor_vibration", "sensor_pressure", "op_load", "op_rpm"]
target = "failure"
group = "machine_id"

# Derive the rolling/lag features one machine at a time. Iterating over the
# groups hands add_rolling_features each machine's full sub-frame, grouping
# column included, without relying on groupby.apply passing that column through
# (newer pandas releases deprecate that behaviour).
df = pd.concat([add_rolling_features(g, feat_cols) for _, g in df.groupby(group)])

# Fill any NaNs from rolling/lag
# The fill is done within each machine so values never bleed from one machine
# into another. Forward-fill runs first so gaps are filled from earlier
# readings; the back-fill then only covers NaNs at the start of a machine's
# history (e.g. the first lag row).
fill_cols = [c for c in df.columns if c != group]
df[fill_cols] = df.groupby(group)[fill_cols].ffill()
df[fill_cols] = df.groupby(group)[fill_cols].bfill()

# A column that is missing for every row of a machine cannot be filled this way.
n_missing = int(df[fill_cols].isna().sum().sum())
if n_missing:
    print(f"Warning: {n_missing} values are still missing after per-machine filling")

# The machine identifier is the grouping key used for the train/test split, not
# a sensor reading, so it is kept out of the feature matrix along with the
# target and the raw timestamp.
X = df.drop(columns=[target, "timestamp", group])
y = df[target].astype(int).values
groups = df[group].values
feature_names = X.columns.tolist()

print("Feature count:", len(feature_names))
X.head()


In [None]:

# Train/Validation split by machines to reduce leakage
# The split is drawn over machine ids, not rows: every row of a machine lands
# on the same side.
machines = np.unique(groups)
train_machines, test_machines = train_test_split(
    machines,
    test_size=0.25,
    random_state=RANDOM_STATE,
)

# Boolean row mask: True where the row's machine was assigned to training.
is_train = np.isin(groups, train_machines)
X_train = X[is_train]
X_test = X[~is_train]
y_train = y[is_train]
y_test = y[~is_train]

print("Train size:", X_train.shape, "Test size:", X_test.shape)


In [None]:

# Two baselines: Logistic Regression and Random Forest
num_features = feature_names


def make_preprocess():
    """Return a fresh, unfitted preprocessing step that scales the feature columns.

    Each pipeline gets its own instance. A Pipeline fits its steps in place, so
    a single ColumnTransformer shared by several pipelines would hold whatever
    state the most recently fitted pipeline left in it.
    """
    # Columns are addressed by their position in X.
    scaled_positions = [X.columns.get_loc(c) for c in num_features]
    return ColumnTransformer(
        # with_mean=False: scale without centring.
        transformers=[("num", StandardScaler(with_mean=False), scaled_positions)],
        remainder="drop",
    )


logreg = Pipeline([("prep", make_preprocess()),
                   ("clf", LogisticRegression(max_iter=1000, class_weight="balanced", random_state=RANDOM_STATE))])

rf = Pipeline([("prep", make_preprocess()),
               ("clf", RandomForestClassifier(
                   n_estimators=300, max_depth=None, n_jobs=-1, class_weight="balanced_subsample",
                   random_state=RANDOM_STATE
               ))])

models = {"logreg": logreg, "random_forest": rf}

# Fit each model on the training machines and score it on the held-out ones.
metrics = {}
probs = {}
for name, mdl in models.items():
    mdl.fit(X_train, y_train)
    # Column 1 of predict_proba is the probability of the positive class.
    p = mdl.predict_proba(X_test)[:, 1]
    probs[name] = p
    roc = roc_auc_score(y_test, p)
    pr = average_precision_score(y_test, p)
    metrics[name] = {"roc_auc": roc, "pr_auc": pr}
metrics


In [None]:

# ROC and PR curves for the better model
# "Better" here means the highest ROC AUC among the entries of `metrics`.
best = max(metrics, key=lambda k: metrics[k]["roc_auc"])
p = probs[best]

# ROC curve
fpr, tpr, thr = roc_curve(y_test, p)
fig_roc, ax_roc = plt.subplots()
ax_roc.plot(fpr, tpr)
ax_roc.set_title(f"ROC Curve - {best}")
ax_roc.set_xlabel("FPR")
ax_roc.set_ylabel("TPR")
fig_roc.tight_layout()
plt.show()

# Precision-recall curve
prec, rec, thr2 = precision_recall_curve(y_test, p)
fig_pr, ax_pr = plt.subplots()
ax_pr.plot(rec, prec)
ax_pr.set_title(f"Precision-Recall Curve - {best}")
ax_pr.set_xlabel("Recall")
ax_pr.set_ylabel("Precision")
fig_pr.tight_layout()
plt.show()

print("Best model:", best, metrics[best])


In [None]:

# Choose decision threshold based on F1 (could also be cost-based)
def best_threshold_by_f1(y_true, p):
    """Pick the score threshold with the highest F1 on the precision-recall curve.

    Parameters
    ----------
    y_true : array-like
        True binary labels.
    p : array-like
        Predicted scores for the positive class.

    Returns
    -------
    tuple
        ``(threshold, f1)`` for the best point on the curve. The threshold is
        0.5 when the best point is the final curve point, which has no
        threshold of its own.
    """
    precision, recall, thresholds = precision_recall_curve(y_true, p)
    # The small constant keeps the division defined when precision + recall is 0.
    f1_scores = (2 * precision * recall) / (precision + recall + 1e-12)
    best_idx = np.nanargmax(f1_scores)
    # precision/recall hold one more entry than thresholds, so the last index
    # has no matching threshold.
    if best_idx < len(thresholds):
        chosen_thr = thresholds[best_idx]
    else:
        chosen_thr = 0.5
    return chosen_thr, f1_scores[best_idx]

# NOTE(review): the threshold is tuned on the same test rows that the F1 and the
# confusion matrix below are computed on, so these numbers are likely
# optimistic -- confirm whether a separate validation split is wanted.
thr_opt, f1_opt = best_threshold_by_f1(y_test, p)

# Scores at or above the threshold are predicted as failures (1), the rest as 0.
at_or_above = p >= thr_opt
y_pred = at_or_above.astype(int)
cm = confusion_matrix(y_test, y_pred)

thr_rounded = round(float(thr_opt),4)
f1_rounded = round(float(f1_opt),4)
print("Chosen threshold:", thr_rounded, "F1:", f1_rounded)
print("Confusion matrix (TN, FP, FN, TP):")
print(cm.ravel())


In [None]:

# Permutation importance for interpretability (on a subset to speed up)
best_model = models[best]

# Draw the row subset from a dedicated, seeded generator so that re-running this
# cell always selects the same rows, whatever ran before it.
n_test_rows = X_test.shape[0]
subset_rng = np.random.default_rng(RANDOM_STATE)
sub_idx = subset_rng.choice(n_test_rows, size=min(2000, n_test_rows), replace=False)

# X_test is a DataFrame, so the integer positions must go through .iloc; plain
# X_test[sub_idx] would be treated as a column lookup. y_test is a NumPy array
# and is indexed directly.
r = permutation_importance(
    best_model,
    X_test.iloc[sub_idx],
    y_test[sub_idx],
    n_repeats=5,
    random_state=RANDOM_STATE,
    scoring="roc_auc",
)

# Mean importance per feature over the repeats, largest first.
imp = (
    pd.DataFrame({"feature": feature_names, "importance": r.importances_mean})
    .sort_values("importance", ascending=False)
)
imp.head(15)


In [None]:

# Save artifacts
# Two files are written to MODEL_DIR: the fitted pipeline and a JSON sidecar
# holding the model name, its test metrics and the chosen decision threshold.
model_path = os.path.join(MODEL_DIR, "pd_maint_model.joblib")
meta_path = os.path.join(MODEL_DIR, "pd_maint_model.meta.json")

joblib.dump(best_model, model_path)

export_meta = {"best_model": best, "metrics": metrics[best], "threshold": float(thr_opt)}
with open(meta_path, "w") as f:
    json.dump(export_meta, f, indent=2)

print("Saved model and metadata to 'models/'")


In [None]:

# Inference helper
def predict_failure(probe_df):
    """Score rows with the saved model and return the scores plus its metadata.

    Parameters
    ----------
    probe_df : pandas.DataFrame
        Rows to score. It must contain every column listed in the notebook-level
        ``feature_names``; other columns are ignored.

    Returns
    -------
    tuple
        ``(p, meta)`` where ``p`` holds the positive-class probability for each
        row and ``meta`` is the dictionary read from the metadata JSON file.
    """
    # NOTE: joblib.load unpickles the file, which can execute arbitrary code --
    # only load model artifacts from a trusted location.
    mdl = joblib.load(os.path.join(MODEL_DIR, "pd_maint_model.joblib"))
    # The context manager closes the metadata file as soon as it has been read.
    with open(os.path.join(MODEL_DIR, "pd_maint_model.meta.json")) as meta_file:
        meta = json.load(meta_file)
    # Select the training columns in training order; column 1 of predict_proba
    # is the positive class.
    p = mdl.predict_proba(probe_df[feature_names])[:, 1]
    return p, meta

# Example: take the last 5 rows of test set
# Rows whose machine is not among the training machines, ordered by machine and
# time; the final five of that ordering are scored.
held_out_rows = ~np.isin(df['machine_id'].values, train_machines)
probe = df[held_out_rows].copy()
probe = probe.sort_values(["machine_id","timestamp"]).tail(5)

# The label and the raw timestamp are removed before the frame goes to the helper.
probe_inputs = probe.drop(columns=["failure","timestamp"])
proba, meta = predict_failure(probe_inputs)

# Identify each scored row by machine and time next to its predicted probability.
out = probe[["machine_id","timestamp"]].assign(predicted_failure_proba=proba)
out, meta
