In [5]:
!pip -q install numpy==2.0.2 pandas==2.2.2 scikit-learn==1.6.1 matplotlib==3.9.2 seaborn==0.13.2 joblib==1.4.2

[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m8.3/8.3 MB[0m [31m54.7 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m301.8/301.8 kB[0m [31m19.4 MB/s[0m eta [36m0:00:00[0m
[?25h

In [52]:
%%writefile rt_maint.py
import time
import threading
import queue
from datetime import datetime, timezone
import os
import math
import joblib
import numpy as np
import pandas as pd
import streamlit as st
import matplotlib.pyplot as plt

from sklearn.linear_model import SGDClassifier
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import Pipeline
from sklearn.metrics import precision_recall_curve

# ---------------------------
# Page setup
# ---------------------------
st.set_page_config(page_title="IoT Predictive Maintenance — Realtime", layout="wide")
st.title("🔧 Real-Time IoT Predictive Maintenance Dashboard")

# ---------------------------
# Config
# ---------------------------
FEATURES = ["vibration", "temperature", "pressure", "rpm"]
STREAM_INTERVAL_SEC = 0.6         # simulated sampling interval
WINDOW_SECONDS = 600              # keep ~10 min rolling window
ALERT_COOLDOWN_SEC = 30           # seconds between alerts per sensor
MODEL_PATH = "rt_maint_pipeline.joblib"

# ---------------------------
# Helpers
# ---------------------------
def robust_z(x, series):
    """Median Absolute Deviation z-score (robust to outliers)."""
    med = np.nanmedian(series)
    mad = np.nanmedian(np.abs(series - med)) or 1.0
    return (x - med) / (1.4826 * mad)

def ewma(arr, alpha=0.2):
    if len(arr) == 0:
        return np.nan
    s = arr[0]
    for v in arr[1:]:
        s = alpha * v + (1 - alpha) * s
    return s

def psi(expected, actual, bins=10, eps=1e-9):
    """
    Population Stability Index for drift (lower better; <0.1 good, 0.1-0.25 moderate, >0.25 large).
    """
    expected = pd.Series(expected).dropna()
    actual = pd.Series(actual).dropna()
    if expected.empty or actual.empty:
        return np.nan
    qs = np.linspace(0, 1, bins + 1)
    cuts = np.unique(np.quantile(expected, qs))
    if len(cuts) < 3:
        return 0.0
    e_hist = np.histogram(expected, bins=cuts)[0] / max(len(expected), 1)
    a_hist = np.histogram(actual, bins=cuts)[0] / max(len(actual), 1)
    return float(np.sum((a_hist - e_hist) * np.log((a_hist + eps) / (e_hist + eps))))

def make_baseline(n=2000, seed=42):
    rng = np.random.default_rng(seed)
    base = pd.DataFrame({
        "vibration": rng.normal(0.35, 0.05, n),
        "temperature": rng.normal(58, 2.5, n),
        "pressure": rng.normal(3.2, 0.15, n),
        "rpm": rng.normal(1400, 120, n),
    })
    return base

# ---------- Plotting helper (dual axis) ----------
def plot_dual_axis_time(t, y_left, y_right, title_left="Vibration & Temp (dual axis)"):
    fig, ax1 = plt.subplots()
    ax2 = ax1.twinx()

    ln1 = ax1.plot(t, y_left, color="tab:blue", label="vibration")
    ln2 = ax2.plot(t, y_right, color="orange", label="temperature")

    ax1.set_ylabel("Vibration", color="tab:blue")
    ax2.set_ylabel("Temperature", color="orange")
    ax1.tick_params(axis="y", colors="tab:blue")
    ax2.tick_params(axis="y", colors="orange")
    ax1.grid(True, axis="y", linestyle="--", alpha=0.5)
    ax1.tick_params(axis="x", rotation=30, labelsize=9)

    ax1.set_title(title_left)
    lines = ln1 + ln2
    labels = [l.get_label() for l in lines]
    ax1.legend(lines, labels, loc="best", frameon=True, framealpha=0.85)
    return fig

# ---------------------------
# Session state init
# ---------------------------
if "stream_q" not in st.session_state:
    st.session_state.stream_q = queue.Queue(maxsize=10000)

if "live_df" not in st.session_state:
    st.session_state.live_df = pd.DataFrame(columns=["timestamp", "sensor_id", *FEATURES])

if "alerts" not in st.session_state:
    st.session_state.alerts = []  # list of dicts

if "last_alert_time" not in st.session_state:
    st.session_state.last_alert_time = {}  # per sensor

if "baseline" not in st.session_state:
    st.session_state.baseline = make_baseline()

if "model" not in st.session_state:
    # Online-capable classifier
    model = Pipeline([
        ("scaler", StandardScaler(with_mean=True, with_std=True)),
        ("clf", SGDClassifier(loss="log_loss", alpha=1e-4, learning_rate="optimal", random_state=123))
    ])
    st.session_state.model = model
    st.session_state.model_classes = np.array([0, 1])
    st.session_state.is_model_warm = False

# ---------------------------
# Simulator (background thread)
# ---------------------------
def sensor_simulator(out_q: queue.Queue, stop_event: threading.Event, n_sensors=4):
    rng = np.random.default_rng(2025)
    drift_start = time.time() + 90   # drift for S03 after 1.5 min
    failure_rate = 0.05              # probability of weak label row

    while not stop_event.is_set():
        t_iso = datetime.now(timezone.utc).isoformat()
        for sid in range(1, n_sensors + 1):
            # Base signals
            vibration = rng.normal(0.35, 0.05)
            temperature = rng.normal(58, 2.5)
            pressure = rng.normal(3.2, 0.15)
            rpm = rng.normal(1400, 120)

            # Spikes (demo)
            if rng.random() < 0.15:
                vibration += rng.normal(0.18, 0.05)
                temperature += rng.normal(4.5, 1.0)

            # Controlled drift on S03
            if time.time() > drift_start and sid == 3:
                vibration += 0.10
                temperature += 1.2

            row = {
                "timestamp": t_iso,
                "sensor_id": f"S{sid:02d}",
                "vibration": float(vibration),
                "temperature": float(temperature),
                "pressure": float(pressure),
                "rpm": float(rpm),
            }

            # Occasionally attach a weak label for online learning
            if rng.random() < failure_rate:
                row["label"] = 1 if (row["vibration"] > 0.42 or row["temperature"] > 62.0) else 0

            try:
                out_q.put(row, timeout=0.01)
            except queue.Full:
                pass

        time.sleep(STREAM_INTERVAL_SEC)

if "sim_stop" not in st.session_state:
    st.session_state.sim_stop = threading.Event()
    st.session_state.sim_thread = threading.Thread(
        target=sensor_simulator,
        args=(st.session_state.stream_q, st.session_state.sim_stop),
        daemon=True,
    )
    st.session_state.sim_thread.start()

# ---------------------------
# Sidebar Controls
# ---------------------------
st.sidebar.header("⚙️ Controls")
alert_threshold = st.sidebar.slider("Alert threshold (risk ≥)", 0.10, 0.99, 0.70, 0.01)
anomaly_weight = st.sidebar.slider("Anomaly weight (0→risk from model only, 1→anomaly only)", 0.0, 1.0, 0.40, 0.05)
train_rate = st.sidebar.slider("Online training rate (probability to partial_fit on labeled events)", 0.0, 1.0, 0.35, 0.05)
show_points = st.sidebar.slider("Plot last N points per sensor", 100, 1500, 700, 50)

colA, colB, colC = st.sidebar.columns(3)
if colA.button("💾 Save"):
    joblib.dump(st.session_state.model, MODEL_PATH)
    st.sidebar.success(f"Saved → {MODEL_PATH}")
if colB.button("📂 Load") and os.path.exists(MODEL_PATH):
    st.session_state.model = joblib.load(MODEL_PATH)
    st.sidebar.success(f"Loaded ← {MODEL_PATH}")
if colC.button("🧹 Clear Alerts"):
    st.session_state.alerts = []
    st.session_state.last_alert_time = {}

# ---------------------------
# Rolling store & features
# ---------------------------
def append_live(df, row):
    st.session_state.live_df.loc[len(st.session_state.live_df)] = row

def trim_window():
    df = st.session_state.live_df
    if df.empty:
        return
    ts = pd.to_datetime(df["timestamp"], utc=True, errors="coerce")
    cutoff = pd.Timestamp.now(tz="UTC") - pd.Timedelta(seconds=WINDOW_SECONDS)
    st.session_state.live_df = df[ts >= cutoff].reset_index(drop=True)

def engineer_features(df_win: pd.DataFrame) -> pd.DataFrame:
    if df_win.empty:
        cols = ["sensor_id", "timestamp", *FEATURES, "z_vib", "ewma_vib", "z_tmp", "ewma_tmp"]
        return pd.DataFrame(columns=cols)
    g = df_win.groupby("sensor_id", group_keys=False)
    def fe(gdf):
        vib = gdf["vibration"].values
        tmp = gdf["temperature"].values
        return pd.DataFrame({
            "sensor_id": gdf["sensor_id"].values,
            "timestamp": gdf["timestamp"].values,
            "vibration": gdf["vibration"].values,
            "temperature": gdf["temperature"].values,
            "pressure": gdf["pressure"].values,
            "rpm": gdf["rpm"].values,
            "z_vib": [robust_z(x, vib) for x in vib],
            "ewma_vib": [ewma(vib[:i+1]) for i in range(len(vib))],
            "z_tmp": [robust_z(x, tmp) for x in tmp],
            "ewma_tmp": [ewma(tmp[:i+1]) for i in range(len(tmp))],
        })
    return g.apply(fe)

def score_batch(fe_df: pd.DataFrame) -> pd.DataFrame:
    if fe_df.empty:
        return fe_df.assign(anomaly=[])
    # anomaly score: sigmoid(|z_vib| + |z_tmp|)
    zscore = np.abs(fe_df["z_vib"].astype(float).values) + np.abs(fe_df["z_tmp"].astype(float).values)
    anomaly = 1 / (1 + np.exp(-zscore))
    fe_df = fe_df.assign(anomaly=anomaly)

    # supervised risk (if model is warm)
    feats = ["vibration", "temperature", "pressure", "rpm", "z_vib", "z_tmp", "ewma_vib", "ewma_tmp"]
    X = fe_df[feats].astype(float).values
    if st.session_state.is_model_warm:
        risk_sup = st.session_state.model.predict_proba(X)[:, 1]
    else:
        risk_sup = np.zeros(len(fe_df))
    risk = anomaly_weight * anomaly + (1 - anomaly_weight) * risk_sup
    return fe_df.assign(risk=risk, risk_sup=risk_sup)

def maybe_train_online(fe_df: pd.DataFrame, raw_df: pd.DataFrame):
    if "label" not in raw_df.columns or raw_df["label"].isna().all():
        return
    lab = raw_df[["timestamp", "sensor_id", "label"]].dropna()
    if lab.empty:
        return
    df = fe_df.merge(lab, on=["timestamp", "sensor_id"], how="inner")
    if df.empty:
        return
    # Subsample for online updates
    mask = np.random.rand(len(df)) < train_rate
    df = df[mask]
    if df.empty:
        return

    feats = ["vibration", "temperature", "pressure", "rpm", "z_vib", "z_tmp", "ewma_vib", "ewma_tmp"]
    X = df[feats].astype(float).values
    y = df["label"].astype(int).values

    scaler = st.session_state.model.named_steps["scaler"]
    clf = st.session_state.model.named_steps["clf"]

    # first call needs classes
    Xs = scaler.fit_transform(X)
    clf.partial_fit(Xs, y, classes=st.session_state.model_classes)
    st.session_state.is_model_warm = True

def add_alerts(risk_df: pd.DataFrame):
    if risk_df.empty:
        return
    now = time.time()
    for sid, grp in risk_df.groupby("sensor_id"):
        last = grp.tail(1).iloc[0]
        risk = float(last["risk"])
        if risk >= alert_threshold:
            last_t = st.session_state.last_alert_time.get(sid, 0)
            if now - last_t >= ALERT_COOLDOWN_SEC:
                st.session_state.alerts.append({
                    "time": last["timestamp"],
                    "sensor_id": sid,
                    "risk": round(risk, 3),
                    "vibration": round(float(last["vibration"]), 3),
                    "temperature": round(float(last["temperature"]), 2),
                })
                st.session_state.last_alert_time[sid] = now

# ---------------------------
# Drain → store → features → score → train → alerts
# ---------------------------
batch_rows = []
while not st.session_state.stream_q.empty():
    try:
        batch_rows.append(st.session_state.stream_q.get_nowait())
    except queue.Empty:
        break

if batch_rows:
    raw_batch = pd.DataFrame(batch_rows)
    for _, r in raw_batch.iterrows():
        append_live(st.session_state.live_df, r)
    trim_window()
    fe = engineer_features(st.session_state.live_df)
    scored = score_batch(fe)
    maybe_train_online(fe, raw_batch)
    add_alerts(scored)
else:
    fe = engineer_features(st.session_state.live_df)
    scored = score_batch(fe)

# ---------------------------
# Sidebar KPIs (live)
# ---------------------------
st.sidebar.divider()
st.sidebar.subheader("📊 Live KPIs")

# Always show alert counter
st.sidebar.metric("Active Alerts", len(st.session_state.alerts))

# Show risk KPIs if we have scored data
if not scored.empty:
    latest_per = (
        scored.groupby("sensor_id", as_index=False)
              .tail(1)
              .sort_values("sensor_id")
              .reset_index(drop=True)
    )
    avg_risk = float(latest_per["risk"].mean())
    max_idx = int(latest_per["risk"].idxmax())
    max_risk = float(latest_per.loc[max_idx, "risk"])
    worst_sensor = str(latest_per.loc[max_idx, "sensor_id"])

    st.sidebar.metric("Avg Risk (latest)", f"{avg_risk:.2f}")
    st.sidebar.metric("Max Risk (latest)", f"{max_risk:.2f}")
    st.sidebar.caption(f"Worst sensor right now: **{worst_sensor}**")

    # (Optional) Per-sensor mini KPIs
    with st.sidebar.expander("Per-sensor risk (latest)"):
        for _, row in latest_per.iterrows():
            sid = row["sensor_id"]
            r = float(row["risk"])
            st.write(f"**{sid}** — {r:.2f}")
else:
    st.sidebar.caption("Waiting for data to compute risk KPIs…")
# ---------------------------
# UI Tabs
# ---------------------------
tab1, tab2, tab3, tab4 = st.tabs(["📈 Live Monitor", "🚨 Alerts", "🧠 Model", "🧪 Data Quality & Drift"])

# ---------- Live Monitor ----------
with tab1:
    st.subheader("Live Signals & Risk")
    if scored.empty:
        st.info("Waiting for stream…")
    else:
        latest = scored.groupby("sensor_id", as_index=False).tail(1).sort_values("sensor_id")
        st.dataframe(
            latest[["timestamp", "sensor_id", "vibration", "temperature", "pressure", "rpm", "anomaly", "risk"]]
            .assign(risk=lambda d: d["risk"].round(3), anomaly=lambda d: d["anomaly"].round(3))
        )

        for sid, g in scored.groupby("sensor_id"):
            g = g.tail(show_points)
            st.markdown(f"**Sensor {sid}**")
            c1, c2 = st.columns(2)

            t = pd.to_datetime(g["timestamp"], utc=True, errors="coerce")

            # Left: dual-axis line plot
            with c1:
                fig = plot_dual_axis_time(
                    t,
                    g["vibration"].values,
                    g["temperature"].values,
                    title_left=f"{sid} — Vibration & Temp (dual axis)",
                )
                st.pyplot(fig)

            # Right: risk + threshold
            with c2:
                fig, ax = plt.subplots()
                ax.plot(t, g["risk"], label="risk")
                ax.axhline(alert_threshold, linestyle="--", linewidth=1)
                ax.set_ylim(0, 1.05)
                ax.set_title(f"{sid} — Risk")
                ax.legend(loc="best", frameon=True, framealpha=0.85)
                ax.grid(True, axis="y", linestyle="--", alpha=0.4)
                ax.tick_params(axis="x", rotation=30, labelsize=9)
                st.pyplot(fig)

# ---------- Alerts ----------
with tab2:
    st.subheader("Active Alerts")
    if not st.session_state.alerts:
        st.success("No alerts. All sensors nominal.")
    else:
        st.dataframe(pd.DataFrame(st.session_state.alerts).iloc[::-1].reset_index(drop=True))

# ---------- Model ----------
with tab3:
    st.subheader("Online Model Status")
    st.write(f"Warm-started: **{st.session_state.is_model_warm}**")
    st.caption("The classifier updates online when labeled events trickle in (simulated).")
    # Simple proxy PR curve on the latest window: use top-k risk as positives
    if not scored.empty:
        try:
            y_scores = scored["risk"].values
            k = max(5, int(0.1 * len(y_scores)))
            proxy_y = np.zeros_like(y_scores, dtype=int)
            proxy_y[np.argsort(y_scores)[-k:]] = 1
            P, R, T = precision_recall_curve(proxy_y, y_scores)
            f1 = 2 * (P * R) / np.clip(P + R, 1e-9, None)
            if len(T) > 0:
                best = int(np.nanargmax(f1[:-1]))
                st.info(f"Window-suggested threshold ≈ **{T[best]:.2f}** (proxy F1≈{f1[best]:.2f})")
            fig, ax = plt.subplots()
            ax.plot(R, P)
            ax.set_xlabel("Recall"); ax.set_ylabel("Precision"); ax.set_title("Proxy Precision–Recall (Risk Top-k as Positives)")
            st.pyplot(fig)
        except Exception:
            pass

# ---------- Data Quality & Drift ----------
with tab4:
    st.subheader("Data Quality")
    if st.session_state.live_df.empty:
        st.info("Waiting for stream…")
    else:
        dfq = st.session_state.live_df
        dq_rows = []
        for sid, g in dfq.groupby("sensor_id"):
            last = g.tail(200)
            spikes = ((last["vibration"] - last["vibration"].rolling(5, min_periods=1).mean()).abs() > 0.2).sum()
            dq_rows.append({
                "sensor_id": sid,
                "n_last": len(last),
                "missing_any": int(last[FEATURES].isna().any().any()),
                "flatlines_vibration": int(last["vibration"].nunique() < 5),
                "spikes_vibration": int(spikes > 3),
            })
        st.dataframe(pd.DataFrame(dq_rows))

        st.subheader("Drift (PSI vs Baseline)")
        psi_rows = []
        base = st.session_state.baseline
        cur = dfq.tail(1500)
        for col in FEATURES:
            val = psi(base[col], cur[col])
            psi_rows.append({"feature": col, "psi": round(val if pd.notna(val) else 0.0, 3)})
        st.dataframe(pd.DataFrame(psi_rows))
        st.caption("Heuristic: PSI < 0.10 stable • 0.10–0.25 moderate shift • >0.25 significant shift.")

st.caption("Demo: simulator emits sensor events; the app maintains a rolling window, computes features, blends anomaly + online risk, triggers alerts, and tracks drift & data quality.")


Overwriting rt_maint.py


In [41]:
!pip -q install streamlit pyngrok==7.1.6
!pkill -f streamlit || true
!streamlit run app_churn.py --server.port 8501 &>/content/logs.txt &

^C


In [54]:
from pyngrok import ngrok
import os, time
os.environ["NGROK_TOKEN"] = "2tZ6mqHFZ9n2B4HsTOzAPVA3Jnw_6qB1RFncPLxV8kcYUxNcJ"
ngrok.set_auth_token(os.environ["NGROK_TOKEN"])
time.sleep(2)
print("URL:", ngrok.connect(8501, "http").public_url)


URL: https://5bc312c70faa.ngrok-free.app
