In [1]:
# Imports and Setup
import os
import sys
import json
import numpy as np
from pathlib import Path
from sklearn.ensemble import IsolationForest
from sklearn.svm import OneClassSVM

sys.path.insert(0, str(Path("..") / "src"))

from utils import config
from utils.io import load_cleaned, load_method_ready, results_dir, append_csv_row, save_json
from utils.evaluation import compute_binary_metrics
from utils.plotting import plot_signal, plot_scores 

In [2]:
# Helpers: windowing, thresholding, window-to-point mapping, and detector scoring

def build_windows(series: np.ndarray, win_size: int, stride: int) -> np.ndarray:
    """
    Cut a 1-D signal into overlapping fixed-length windows.

    The input is flattened first, so any array shape is accepted. Row ``k`` of
    the result holds ``series[k * stride : k * stride + win_size]``.

    Parameters
    ----------
    series : array-like
        Signal values; flattened to one dimension before windowing.
    win_size : int
        Number of samples per window.
    stride : int
        Step between the starts of consecutive kept windows.

    Returns
    -------
    np.ndarray
        Array of shape ``(n_windows, win_size)``. It is a view produced by
        ``sliding_window_view`` (no data copy).
    """
    flat_signal = np.asarray(series).reshape(-1)
    every_window = np.lib.stride_tricks.sliding_window_view(flat_signal, win_size)
    # Keep only every `stride`-th window start.
    keep = slice(None, None, stride)
    return every_window[keep]


def threshold_from_train(train_scores: np.ndarray, q: float) -> float:
    """
    Derive a decision threshold as the ``q``-quantile of the training scores.

    Parameters
    ----------
    train_scores : np.ndarray
        Anomaly scores computed on the training data.
    q : float
        Quantile passed to ``np.quantile``.

    Returns
    -------
    float
        The quantile value as a plain Python float.
    """
    cutoff = np.quantile(train_scores, q)
    return float(cutoff)


def windows_pred_to_point_pred(
    win_starts_local: np.ndarray,
    win_size: int,
    pred_win: np.ndarray,
    n_points: int,
) -> np.ndarray:
    """
    Expand per-window binary predictions into per-point binary predictions.

    A point is marked 1 when at least one flagged window covers it (logical OR
    over the covering windows).

    Parameters
    ----------
    win_starts_local : np.ndarray
        Start index of each window, in the coordinates of the output array.
    win_size : int
        Length of every window.
    pred_win : np.ndarray
        Window predictions; any value equal to 0 means "not flagged".
    n_points : int
        Length of the returned point-level array.

    Returns
    -------
    np.ndarray
        Integer array of length ``n_points`` containing 0/1.

    Notes
    -----
    Windows whose start lies outside ``[0, n_points)`` are ignored. A window
    running past the end is clipped at ``n_points``.
    """
    # Difference array: +1 where a flagged window opens, -1 just past its end.
    # The running sum is then the number of flagged windows covering each point.
    coverage_delta = np.zeros(n_points + 1, dtype=np.int32)

    for start, flagged in zip(win_starts_local, pred_win):
        if flagged != 0:
            begin = int(start)
            if begin < 0 or begin >= n_points:
                continue
            stop = min(n_points, begin + win_size)
            coverage_delta[begin] += 1
            coverage_delta[stop] -= 1

    covered = np.cumsum(coverage_delta[:-1]) > 0
    return covered.astype(int)


def windows_scores_to_point_scores_max(
    win_starts_local: np.ndarray,
    win_size: int,
    scores_win: np.ndarray,
    n_points: int,
) -> np.ndarray:
    """
    Turn window-level scores into a continuous point-level score curve.

    The score of point ``i`` is the largest score among the windows that cover
    it, i.e. windows whose start lies in ``[i - win_size + 1, i]``. Points not
    covered by any window are NaN.

    Parameters
    ----------
    win_starts_local : np.ndarray
        Start index of each window, in the coordinates of the output array.
    win_size : int
        Length of every window.
    scores_win : np.ndarray
        One score per window.
    n_points : int
        Length of the returned point-level array.

    Returns
    -------
    np.ndarray
        Float array of length ``n_points``.

    Notes
    -----
    Windows whose start lies outside ``[0, n_points)`` are ignored, and a NaN
    window score is treated as if that window were absent. If two windows
    share a start index, the later one in the input overwrites the earlier.
    """
    from collections import deque

    # Scatter every window score onto the index where that window begins.
    score_at_start = np.full(n_points, np.nan, dtype=float)
    for start, score in zip(win_starts_local, scores_win):
        start = int(start)
        if start < 0 or start >= n_points:
            continue
        score_at_start[start] = score

    point_scores = np.full(n_points, np.nan, dtype=float)

    # Monotonic deque of window starts: scores strictly decrease from front to
    # back, so the front is always the maximum among starts still in range.
    candidates = deque()
    for point in range(n_points):
        incoming = score_at_start[point]
        if not np.isnan(incoming):
            # Anything at the back that the new score ties or beats can never
            # be the maximum again.
            while candidates and score_at_start[candidates[-1]] <= incoming:
                candidates.pop()
            candidates.append(point)

        # Discard starts whose window ended before this point.
        oldest_allowed = max(0, point - win_size + 1)
        while candidates and candidates[0] < oldest_allowed:
            candidates.popleft()

        if candidates:
            point_scores[point] = score_at_start[candidates[0]]

    return point_scores


def fit_score_iforest(X_train: np.ndarray, X_test: np.ndarray) -> tuple[np.ndarray, np.ndarray]:
    """
    Fit an Isolation Forest on the training windows and score both splits.

    The model is built from ``config.IFOREST_PARAMS``. The sign of
    ``decision_function`` is flipped so that a higher score means more
    anomalous.

    Returns
    -------
    tuple[np.ndarray, np.ndarray]
        ``(train_scores, test_scores)``.
    """
    detector = IsolationForest(**config.IFOREST_PARAMS)
    detector.fit(X_train)
    # Score the training split first, then the test split, with the same model.
    anomaly_scores = tuple(
        -detector.decision_function(split) for split in (X_train, X_test)
    )
    return anomaly_scores


def fit_score_ocsvm(X_train: np.ndarray, X_test: np.ndarray) -> tuple[np.ndarray, np.ndarray]:
    """
    Fit a One-Class SVM on the training windows and score both splits.

    The model is built from ``config.OCSVM_PARAMS``. The sign of
    ``decision_function`` is flipped so that a higher score means more
    anomalous.

    Returns
    -------
    tuple[np.ndarray, np.ndarray]
        ``(train_scores, test_scores)``.
    """
    detector = OneClassSVM(**config.OCSVM_PARAMS)
    detector.fit(X_train)

    def anomaly_score(windows: np.ndarray) -> np.ndarray:
        # Negate so that larger values indicate stronger anomalies.
        return -detector.decision_function(windows)

    scores_on_train = anomaly_score(X_train)
    scores_on_test = anomaly_score(X_test)
    return scores_on_train, scores_on_test

In [3]:
# Main Execution Loop

base_out = results_dir("shallow_learning")
csv_path = base_out / "shallow_learning_results.csv"
# Start every run from an empty summary table so that re-executing this cell
# does not append to the rows written by a previous run.
csv_path.unlink(missing_ok=True)

win_size = config.WINDOW_SIZE
stride = config.STRIDE
q = config.BASELINE_THR_QUANTILE
margin = config.PLOT_ZOOM_MARGIN

rng = np.random.default_rng(config.RANDOM_SEED)
# NOTE: the cap is named after the One-Class SVM, but the subsample drawn below
# is the training set for BOTH detectors, so they are fit on identical data.
max_train = config.OCSVM_MAX_TRAIN_WINDOWS

# (key used in file names and the CSV "method" column, label used in plot
# titles, scoring function). Both detectors share the evaluation pipeline
# below; add a row here to evaluate another window-level scorer.
detectors = (
    ("iforest", "IForest", fit_score_iforest),
    ("ocsvm", "OCSVM", fit_score_ocsvm),
)

for dataset_name in config.DATASETS:
    # Load point labels + split
    _, labels, meta = load_cleaned(dataset_name)
    train_end = int(meta["train_end"])
    y_test = labels[train_end:]

    # Load method-ready arrays
    mr = load_method_ready(dataset_name)
    test_raw = mr["test_raw"]
    train_robust = mr["train_robust"]
    test_robust = mr["test_robust"]

    test_win_labels = mr["test_win_labels"]
    test_win_starts = mr["test_win_starts"]

    # Window starts in test coordinates
    win_starts_local = test_win_starts - train_end
    n_test_points = len(test_raw)

    # Build window features
    X_train_all = build_windows(train_robust, win_size, stride)
    X_test = build_windows(test_robust, win_size, stride)

    # The test windows are rebuilt here while their starts and labels come from
    # the method-ready bundle. If the two disagree in length, the window->point
    # mapping would silently truncate (it zips starts with predictions), so
    # fail loudly instead of producing misaligned metrics.
    if not (len(win_starts_local) == len(test_win_labels) == len(X_test)):
        raise ValueError(
            f"{dataset_name}: window count mismatch "
            f"(starts={len(win_starts_local)}, labels={len(test_win_labels)}, "
            f"features={len(X_test)}); check WINDOW_SIZE/STRIDE against the "
            f"method-ready data"
        )

    # Subsample training windows
    if len(X_train_all) > max_train:
        idx = rng.choice(len(X_train_all), size=max_train, replace=False)
        X_train = X_train_all[idx]
    else:
        X_train = X_train_all

    # Output folder
    out_dir = results_dir("shallow_learning", dataset_name)

    # One overview plot per dataset
    plot_signal(
        test_raw,
        true_labels=y_test,
        title=f"{dataset_name} - Test (overview)",
        save_path=out_dir / "overview_signal.png",
        max_points=5000,
    )

    # Zoom window around the labelled anomaly (test coords), clipped to the
    # bounds of the test split.
    a0 = int(meta["anomaly_start"]) - train_end
    a1 = int(meta["anomaly_end"]) - train_end
    z0 = max(0, a0 - margin)
    z1 = min(n_test_points, a1 + margin)

    for method, label, fit_score in detectors:
        # Window-level scores; the threshold is learned from training scores only.
        tr_scores_win, te_scores_win = fit_score(X_train, X_test)
        thr = threshold_from_train(tr_scores_win, q)

        pred_win = (te_scores_win >= thr).astype(int)

        # Map window-level outputs back onto individual test points.
        pred_point = windows_pred_to_point_pred(
            win_starts_local, win_size, pred_win, n_test_points
        )
        score_point = windows_scores_to_point_scores_max(
            win_starts_local, win_size, te_scores_win, n_test_points
        )

        metrics_point = compute_binary_metrics(y_test, pred_point)
        metrics_win = compute_binary_metrics(test_win_labels, pred_win)

        # Point-level metrics go to the shared CSV; the JSON additionally keeps
        # the window-level metrics and the window counts.
        row = {"dataset": dataset_name, "method": method, "threshold": thr, "q": q, **metrics_point}
        append_csv_row(csv_path, row)

        save_json(
            out_dir / f"{method}_metrics.json",
            {
                **row,
                "window_metrics": metrics_win,
                "n_train_windows_fit": int(len(X_train)),
                "n_test_windows": int(len(X_test)),
            },
        )
        np.save(out_dir / f"{method}_pred.npy", pred_point)
        np.save(out_dir / f"{method}_scores.npy", score_point)

        plot_signal(
            test_raw[z0:z1], y_test[z0:z1], pred_point[z0:z1],
            title=f"{dataset_name} - {label} (zoom)",
            save_path=out_dir / f"{method}_signal_zoom.png",
            x_offset=z0,
        )
        plot_scores(
            score_point[z0:z1],
            threshold=thr,
            true_labels=y_test[z0:z1],
            title=f"{dataset_name} - {label} scores (zoom)",
            save_path=out_dir / f"{method}_scores_zoom.png",
            x_offset=z0,
        )