Task 0 — Setup & Utilities

### Setup and Utility Functions
This section imports required libraries, defines constants, and provides utility functions for time conversion and contiguous run detection.

In [151]:
import os
import time
import math
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from dataclasses import dataclass
from typing import Dict, List, Tuple

import psycopg2
from sklearn.linear_model import LinearRegression
from sklearn.preprocessing import StandardScaler, MinMaxScaler

plt.switch_backend("agg")  # safe for headless environments

AXES = [f"axis_{i}" for i in range(1, 9)]
META_AXES = [f"axis_{i}" for i in range(9, 15)]
TIME_COL = "Time"
TRAIT_COL = "Trait"

def contiguous_runs(mask: np.ndarray) -> List[Tuple[int,int]]:
    runs, in_run, start = [], False, 0
    for i, val in enumerate(mask):
        if val and not in_run: in_run, start = True, i
        elif not val and in_run: runs.append((start, i-1)); in_run = False
    if in_run: runs.append((start, len(mask)-1))
    return runs

def to_seconds(tseries: pd.Series) -> pd.Series:
    t = pd.to_datetime(tseries)
    return (t - t.iloc[0]).dt.total_seconds()

def median_dt_seconds(tseries: pd.Series) -> float:
    t = pd.to_datetime(tseries)
    dt = t.diff().dt.total_seconds().dropna()
    return float(dt.median() if len(dt) else 1.0)


🟦 Task 1 — Training Data from DB (Neon/Postgres)

### Database Handler
This section defines the database configuration and a handler class to fetch training data snapshots from the Postgres database.

In [152]:
DB_CONFIG = {
    "host": "ep-twilight-poetry-ad4klocb-pooler.c-2.us-east-1.aws.neon.tech",
    "database": "neondb",
    "user": "neondb_owner",
    "password": "npg_w8YtZHyuePS7",
    "port": "5432",
    "sslmode": "require",
}

class DBHandler:
    def __init__(self, cfg: Dict):
        self.cfg = cfg

    def connect(self):
        return psycopg2.connect(**self.cfg)

    def fetch_training_snapshot(self, table: str = "pm_dashboard", limit: int = 50000) -> pd.DataFrame:
        with self.connect() as conn:
            query = f"""
                SELECT timestamp_recorded AS "{TIME_COL}", {", ".join([f'"{a}"' for a in AXES])}
                FROM {table}
                WHERE timestamp_recorded IS NOT NULL
                ORDER BY timestamp_recorded ASC
                LIMIT {limit};
            """
            df = pd.read_sql(query, conn)
        df = df.rename(columns={"timestamp_recorded": TIME_COL})            
        return df


🟦 Task 2 — Regression Models (Univariate)

### Regression Model Classes
Defines data structures and classes for fitting univariate linear regression models to each axis.

In [153]:
@dataclass
class AxisCoeffs:
    axis: str
    slope: float
    intercept: float
    r2: float

class AxisModel:
    def __init__(self, axis: str):
        self.axis = axis
        self.model = LinearRegression()
        self.slope = None
        self.intercept = None

    def fit(self, t_sec: np.ndarray, y: np.ndarray):
        X = t_sec.reshape(-1, 1)
        self.model.fit(X, y)
        self.slope = float(self.model.coef_[0])
        self.intercept = float(self.model.intercept_)
        return self

    def predict(self, t_sec: np.ndarray) -> np.ndarray:
        return self.model.predict(t_sec.reshape(-1, 1))

    def residuals(self, t_sec: np.ndarray, y: np.ndarray) -> np.ndarray:
        return y - self.predict(t_sec)

    def fit_report(self, t_sec: np.ndarray, y: np.ndarray) -> AxisCoeffs:
        yhat = self.predict(t_sec)
        ss_res = np.sum((y - yhat) ** 2)
        ss_tot = np.sum((y - y.mean()) ** 2) if len(y) else 0.0
        r2 = 1 - ss_res/ss_tot if ss_tot > 0 else np.nan
        return AxisCoeffs(self.axis, self.slope, self.intercept, r2)


🟦 Task 3 — Residual Analysis & Threshold Discovery

### Residual Analysis
Analyzes model residuals to determine alert and error thresholds for anomaly detection.

In [154]:
@dataclass
class AxisThresholds:
    axis: str
    z_alert: float
    z_error: float
    MinC: float
    MaxC: float
    T_seconds: float
    dt_s: float
    resid_mu: float
    resid_sigma: float

class ResidualAnalyzer:
    def __init__(self, dt_s: float, z_alert=1.65, z_error=2.57):
        self.dt_s = dt_s
        self.z_alert = z_alert
        self.z_error = z_error

    def discover(self, axis_model: AxisModel, t_sec: np.ndarray, y: np.ndarray) -> AxisThresholds:
        resid = axis_model.residuals(t_sec, y)
        mu = float(np.mean(resid))
        sigma = float(np.std(resid, ddof=1)) if len(resid) > 1 else 1.0

        # store in model for later
        axis_model.resid_mu = mu
        axis_model.resid_sigma = sigma

        # Convert Z thresholds to current thresholds
        minC = self.z_alert * sigma
        maxC = self.z_error * sigma

        # Compute sustain time (95th percentile run length of positive exceedances)
        mask = resid - mu > minC
        runs = contiguous_runs(mask)
        lengths = [j - i + 1 for i, j in runs] or [3]
        pts_thresh = int(np.quantile(lengths, 0.95))
        T_seconds = max(3, pts_thresh) * self.dt_s

        return AxisThresholds(axis_model.axis,
                              self.z_alert, self.z_error,
                              float(minC), float(maxC),
                              float(T_seconds), self.dt_s,
                              mu, sigma)


🟦 Task 4 — Trainer (Train models on DB data)

### Model Training and Visualization
Contains classes for training models, normalizing data, and generating regression and residual plots.

In [155]:
class Visualizer:
    def __init__(self, outdir: str):
        self.outdir = outdir
        os.makedirs(outdir, exist_ok=True)

    def plot_regression(self, axis: str, t_sec: np.ndarray, y: np.ndarray, yhat: np.ndarray, label="train"):
        plt.figure()
        plt.scatter(t_sec, y, s=3, label="Observed")
        plt.plot(t_sec, yhat, color="red", label="Regression")
        plt.legend()
        plt.xlabel("Time (s)")
        plt.ylabel("Current (A)")
        plt.title(f"{axis} – Regression Fit ({label})")
        plt.tight_layout()
        fname = os.path.join(self.outdir, f"{axis}_regression_{label}.png")
        plt.savefig(fname, dpi=150)
        plt.close()

    def plot_residuals(self, axis: str, t_sec: np.ndarray, resid: np.ndarray, th, label="train"):
        plt.figure()
        plt.plot(t_sec, resid, linewidth=1, label="Residuals")
        if th is not None:
            plt.axhline(th.resid_mu + th.MinC, color="orange", linestyle="--",
                        label=f"Alert (μ+{th.z_alert}σ)")
            plt.axhline(th.resid_mu + th.MaxC, color="red", linestyle="--",
                        label=f"Error (μ+{th.z_error}σ)")
        plt.legend()
        plt.xlabel("Time (s)")
        plt.ylabel("Residual (A)")
        plt.title(f"{axis} – Residuals ({label})")
        plt.tight_layout()
        fname = os.path.join(self.outdir, f"{axis}_residuals_{label}.png")
        plt.savefig(fname, dpi=150)
        plt.close()




class Trainer:
    def __init__(self, db: DBHandler, outdir="pm_outputs_local"):
        self.db = db
        self.outdir = outdir
        os.makedirs(outdir, exist_ok=True)
        self.models = {}
        self.thresholds = {}
        self.scaler_std = None
        self.scaler_mm = None

    def load_training(self, table="robot_readings", limit=50000, normalize=True) -> pd.DataFrame:
        df = self.db.fetch_training_snapshot(table=table, limit=limit)
        df["t_sec"] = to_seconds(df[TIME_COL])

        if normalize:
            # Fit scalers on training data (only once!)
            self.fit_scalers(df)
            df_scaled, _ = self.transform(df)   
            df_scaled["t_sec"] = df["t_sec"]    # keep time column aligned
            df_scaled[TIME_COL] = df[TIME_COL]  # keep raw timestamp for reference
            return df_scaled
        else:
            return df

    def fit(self, df_train: pd.DataFrame):
        t = df_train["t_sec"].to_numpy()
        dt_s = median_dt_seconds(df_train[TIME_COL])
        analyzer = ResidualAnalyzer(dt_s)
        viz = Visualizer(self.outdir)
        coeffs = []

        for axis in AXES:
            y = df_train[axis].to_numpy()
            m = AxisModel(axis).fit(t, y)
            th = analyzer.discover(m, t, y)

            # Save regression + residual plots
            yhat = m.predict(t)
            viz.plot_regression(axis, t, y, yhat)
            resid = m.residuals(t, y)
            viz.plot_residuals(axis, t, resid, th)

            self.models[axis] = m
            self.thresholds[axis] = th
            coeffs.append(m.fit_report(t, y).__dict__)

        pd.DataFrame(coeffs).to_csv(os.path.join(self.outdir, "model_coeffs.csv"), index=False)
        pd.DataFrame([vars(th) for th in self.thresholds.values()]).to_csv(
            os.path.join(self.outdir, "thresholds.csv"), index=False
        )


    def fit_scalers(self, df_train: pd.DataFrame):
        self.scaler_mm = MinMaxScaler().fit(df_train[AXES].values)

    def transform(self, df: pd.DataFrame):
        # Apply MinMax scaling
        mm = pd.DataFrame(self.scaler_mm.transform(df[AXES]), columns=AXES)

        # 🔑 Preserve timestamp column if present
        if TIME_COL in df.columns:
            mm[TIME_COL] = df[TIME_COL].values

        # 🔑 Optionally preserve trait or other metadata
        if TRAIT_COL in df.columns:
            mm[TRAIT_COL] = df[TRAIT_COL].values

        return mm, None

    def normalize_dataframe(self, df: pd.DataFrame, method="minmax") -> pd.DataFrame:
        """
        Normalize a dataframe using previously fitted scalers.
        method = "minmax" or "standard"
        """
        if method == "minmax":
            norm_values = self.scaler_mm.transform(df[AXES])
        elif method == "standard":
            norm_values = self.scaler_std.transform(df[AXES])
        else:
            raise ValueError("method must be 'minmax' or 'standard'")
        
        df_norm = df.copy()
        df_norm[AXES] = norm_values
        return df_norm



🟦 Task 5 — Synthetic Test Data (Optional in the event there is no synthetic data CSV file)

### Synthetic Data Generation
Generates synthetic test data based on trained models and thresholds for testing anomaly detection.

In [156]:
class SyntheticDataGenerator:
    def __init__(self, models, thresholds, outdir="pm_outputs_local"):
        self.models = models
        self.thresholds = thresholds
        self.outdir = outdir
        os.makedirs(outdir, exist_ok=True)

    def generate(self, t_start, n_rows, dt_s):
        t_sec = np.arange(n_rows, dtype=float) * dt_s
        times = pd.to_datetime(t_start) + pd.to_timedelta(t_sec, unit="s")
        data = {TRAIT_COL: ["current"]*n_rows, TIME_COL: times}
        rng = np.random.default_rng(42)

        for axis, m in self.models.items():
            trend = m.predict(t_sec)
            th = self.thresholds[axis]
            resid_std = max(0.1, th.MinC/1.65) if th.MinC > 0 else 0.1
            syn = trend + rng.normal(0, resid_std, size=n_rows)
            run_pts = max(3, int(th.T_seconds / th.dt_s))
            if n_rows > 6*run_pts:
                syn[100:100+run_pts] += th.MinC*1.2
                syn[300:300+run_pts] += th.MaxC*1.2
            data[axis] = syn

        for m in META_AXES: data[m] = np.nan
        df = pd.DataFrame(data)
        path = os.path.join(self.outdir, "synthetic_test.csv")
        df.to_csv(path, index=False)
        return df


🟦 Task 6 — Local Streaming Simulation (CSV only)

### Local Streaming Simulation
Simulates streaming of data from a CSV file, yielding one row at a time with a delay.

In [157]:
class LocalStreamer:
    def __init__(self, csv_path, delay_s=0.1):
        self.df = pd.read_csv(csv_path)
        self.idx = 0
        self.delay = delay_s

    def step(self):
        if self.idx >= len(self.df):
            return None
        row = self.df.iloc[self.idx]
        self.idx += 1
        time.sleep(self.delay)
        return row.to_dict()

    def run(self, max_steps=None):
        rows = []
        while True:
            if max_steps and self.idx >= max_steps:
                break
            row = self.step()
            if row is None: break
            rows.append(row)
        return pd.DataFrame(rows)


🟦 Task 7 — Anomaly Detection (local)

### Anomaly Detection
Detects alert and error events in streamed data based on model residuals and learned thresholds.

In [158]:
@dataclass
class Event:
    axis: str
    type: str
    start: float
    end: float
    duration: float
    z_mean: float
    p_value: float

class AnomalyDetector:
    def __init__(self, models, thresholds, outdir="pm_outputs_local"):
        self.models = models
        self.thresholds = thresholds
        self.events = []
        self.outdir = outdir
        os.makedirs(outdir, exist_ok=True)

    def detect(self, df: pd.DataFrame):
        t_sec = to_seconds(df[TIME_COL]).to_numpy()
        for axis, m in self.models.items():
            y = df[axis].to_numpy()
            resid = y - m.predict(t_sec)
            th = self.thresholds[axis]

            # Z-scores relative to training μ,σ
            z = (resid - th.resid_mu) / max(th.resid_sigma, 1e-9)
            run_pts = max(3, int(th.T_seconds/th.dt_s))

            # Alerts
            for i,j in contiguous_runs(z >= th.z_alert):
                if (j-i+1) >= run_pts:
                    z_mean = float(np.mean(z[i:j+1]))
                    p = 1.0 - 0.5*(1 + math.erf(abs(z_mean) / math.sqrt(2)))
                    self.events.append(Event(axis,"ALERT",t_sec[i],t_sec[j],
                                             (j-i+1)*th.dt_s,z_mean,p))
            # Errors
            for i,j in contiguous_runs(z >= th.z_error):
                if (j-i+1) >= run_pts:
                    z_mean = float(np.mean(z[i:j+1]))
                    p = 1.0 - 0.5*(1 + math.erf(abs(z_mean) / math.sqrt(2)))
                    self.events.append(Event(axis,"ERROR",t_sec[i],t_sec[j],
                                             (j-i+1)*th.dt_s,z_mean,p))

    def save(self):
        if not self.events: return None
        df = pd.DataFrame([e.__dict__ for e in self.events])
        path = os.path.join(self.outdir,"events.csv")
        df.to_csv(path,index=False)
        return path


🟦 Task 8 — End-to-End Driver

### End-to-End Pipeline Driver
Runs the full pipeline: loads data, trains models, generates or loads test data, streams data, detects anomalies, and generates verification plots.

In [159]:
def verify_on_synthetic(models, thresholds, df_verif, outdir="pm_outputs_local"):
    viz = Visualizer(outdir)
    t_sec = to_seconds(df_verif[TIME_COL]).to_numpy()

    for axis, m in models.items():
        y = df_verif[axis].to_numpy()
        yhat = m.predict(t_sec)
        resid = y - yhat

        th = thresholds[axis]

        # Save regression and residual plots for verification data
        viz.plot_regression(axis, t_sec, y, yhat, label="verify")
        viz.plot_residuals(axis, t_sec, resid, th, label="verify")


def run_end_to_end_local(
    training_table="robot_readings",
    training_limit=20000,
    outdir="pm_outputs_local",
    synthetic_rows=1000,
    stream_steps=500,
    test_csv_file=None
):
    db = DBHandler(DB_CONFIG)
    trainer = Trainer(db, outdir)
    
    # 1️⃣ Load & normalize training data
    df_train = trainer.load_training(training_table, training_limit, normalize=True)
    print(df_train.columns)
    if df_train.empty:
        raise RuntimeError("No training data in DB")

    # 2️⃣ Train models on normalized data
    trainer.fit(df_train)

    # 3️⃣ Prepare test data
    dt_s = median_dt_seconds(df_train[TIME_COL])
    if test_csv_file is not None:
        test_csv_path = os.path.join('./data', test_csv_file)
        df_test = pd.read_csv(test_csv_path)
        df_test_scaled, _ = trainer.transform(df_test)  # 🔑 normalize test set
        df_test_scaled[TIME_COL] = df_test[TIME_COL]    # keep timestamp
    else:
        gen = SyntheticDataGenerator(trainer.models, trainer.thresholds, outdir)
        df_test = gen.generate(df_train[TIME_COL].iloc[-1], synthetic_rows, dt_s)
        df_test_scaled, _ = trainer.transform(df_test)

    # 4️⃣ Stream & detect anomalies
    streamer = LocalStreamer(test_csv_path, delay_s=0.01)
    df_streamed = streamer.run(max_steps=stream_steps)
    df_streamed_scaled, _ = trainer.transform(df_streamed)

    detector = AnomalyDetector(trainer.models, trainer.thresholds, outdir)
    detector.detect(df_streamed_scaled)  # detection on normalized residuals
    ev_path = detector.save()
    print("✅ Done. Events saved to", ev_path)

    verify_on_synthetic(trainer.models, trainer.thresholds, df_streamed_scaled, outdir)
    print("✅ Verification plots generated")

🟦 Task 9 — Run

### Run the End-to-End Pipeline
This cell executes the full pipeline using the specified training table and test CSV file.

In [160]:
run_end_to_end_local(
    training_table="pm_dashboard",
    training_limit=36000,
    outdir="pm_outputs_local",
    stream_steps=36000,
    test_csv_file="synthetic_unnormalized_with_anomalies.csv"
)


  df = pd.read_sql(query, conn)


Index(['axis_1', 'axis_2', 'axis_3', 'axis_4', 'axis_5', 'axis_6', 'axis_7',
       'axis_8', 'Time', 't_sec'],
      dtype='object')




✅ Done. Events saved to pm_outputs_local\events.csv
✅ Verification plots generated
