In [1]:
from __future__ import annotations


from pathlib import Path
from typing import Optional, Tuple
import numpy as np
import joblib
import matplotlib.pyplot as plt

In [2]:
# --- Configuration -----------------------------------------------------------
# Project root; `~` is expanded to the user's home directory and the result is
# made absolute. The relative model paths below are joined onto it.
ROOT_DIR = Path("~/Uni-stuff/semester-2/applied_Ml/reef_zmsc").expanduser().resolve()
# Serialized model artifacts, relative to ROOT_DIR.
STAGE1_REL = Path("data/autolabeling_fixed/models/classifier.joblib")
STAGE2_REL = Path("data/two_stage_detector/models/two_stage_pipeline.joblib")
# NOTE: placeholder path — point this at the real feature matrix before running.
FEATURES_PATH = Path("/path/to/X.npy").expanduser().resolve() # .npy, or .npz with key 'X', 'features' or 'arr_0'
# Relative path: resolve() anchors it to the working directory at execution time.
OUTDIR = Path("evaluation/consistency_reports").resolve()
STRENGTHS = (0.0, 0.05, 0.1, 0.2, 0.3) # augmentation strengths to sweep; 0.0 = none, 0.3 = moderate perturbation
SEED = 42  # seed for the numpy Generator used for subsampling and augmentation
MAX_SAMPLES: Optional[int] = 5000 # row cap before evaluation; set to None to use all rows


# Derived paths (absolute locations of the two model files)
STAGE1_PATH = (ROOT_DIR / STAGE1_REL).resolve()
STAGE2_PATH = (ROOT_DIR / STAGE2_REL).resolve()

In [5]:
def load_features(path: Path) -> np.ndarray:
    """Load a 2-D feature matrix from a ``.npy`` file or a ``.npz`` archive.

    For ``.npz`` archives the first key present among ``'X'``, ``'features'``
    and ``'arr_0'`` is used.

    Args:
        path: Location of the feature file. The suffix is matched
            case-insensitively (``.NPY`` / ``.NPZ`` are accepted).

    Returns:
        The features as a ``float32`` array of shape ``[N, D]``. No copy is
        made when the stored array is already ``float32``.

    Raises:
        FileNotFoundError: ``path`` does not exist.
        KeyError: a ``.npz`` archive contains none of the expected keys.
        ValueError: the suffix is unsupported or the array is not 2-D.
    """
    if not path.exists():
        raise FileNotFoundError(f"Features file not found: {path}")

    # np.load sniffs the format from the file contents, so only our own
    # dispatch needs to be case-insensitive.
    suffix = path.suffix.lower()
    if suffix == ".npy":
        X = np.load(path)
    elif suffix == ".npz":
        # np.load returns an NpzFile that keeps the archive open; the context
        # manager closes the handle once the array has been read into memory.
        with np.load(path) as data:
            for key in ("X", "features", "arr_0"):
                if key in data:
                    X = data[key]
                    break
            else:
                raise KeyError(".npz missing one of keys: 'X', 'features', 'arr_0'")
    else:
        raise ValueError("Features must be a .npy or .npz file")

    if X.ndim != 2:
        raise ValueError(f"Expected 2D feature array [N, D], got {X.shape}")
    return X.astype(np.float32, copy=False)

def safe_softmax(logits: np.ndarray) -> np.ndarray:
    """Row-wise softmax that passes through inputs which already look like probabilities.

    Args:
        logits: 2-D array ``[N, C]`` of scores or probabilities.

    Returns:
        ``logits`` itself (same object, no copy) when every entry lies in
        ``[0, 1]`` within a 1e-6 tolerance and every row sums to 1 within 1e-3;
        otherwise a new array holding the max-shifted softmax of each row.

    Raises:
        ValueError: ``logits`` is not 2-D.
    """
    if logits.ndim != 2:
        raise ValueError("logits must be [N, C]")

    tol = 1e-6
    within_unit_interval = np.all(logits >= -tol) and np.all(logits <= 1 + tol)
    # Row sums are only inspected once the range check has passed.
    if within_unit_interval and np.allclose(
        logits.sum(axis=1, keepdims=True), 1.0, atol=1e-3
    ):
        return logits

    # Shift by the per-row maximum so the exponentials cannot overflow.
    shifted = logits - logits.max(axis=1, keepdims=True)
    exps = np.exp(shifted)
    denom = exps.sum(axis=1, keepdims=True) + 1e-12
    return exps / denom

In [6]:
def feature_augmentations(
    X: np.ndarray, strength: float, rng: np.random.Generator
) -> np.ndarray:
    """Apply small random perturbations to a feature matrix.

    With ``sigma = 0.05 * strength`` the following are applied, in this order
    (the order fixes how ``rng`` is consumed):

    1. multiplicative jitter drawn from N(1, sigma) per element,
    2. additive noise drawn from N(0, sigma) per element,
    3. a "tilt": extra N(0, sigma) noise on a random subset of columns. The
       subset size is ``int(D * min(0.1, 0.4 * strength))`` but never fewer
       than one column.

    Args:
        X: 2-D feature array ``[N, D]``; it is not modified.
        strength: Perturbation strength; ``0.0`` yields sigma = 0.
        rng: Generator supplying all random draws.

    Returns:
        A new ``float32`` array with the same shape as ``X``.
    """
    feats = X.astype(np.float32, copy=False)
    n_rows, n_cols = feats.shape
    noise_scale = 0.05 * strength

    def _gauss(mean: float, shape) -> np.ndarray:
        # Every noise term shares the same scale and is cast down to float32.
        return rng.normal(mean, noise_scale, size=shape).astype(np.float32)

    # Multiplicative jitter first, then additive noise.
    perturbed = feats * _gauss(1.0, (n_rows, n_cols))
    perturbed = perturbed + _gauss(0.0, (n_rows, n_cols))

    # Tilt: at least one column always receives an extra noise term.
    tilt_fraction = min(0.1, 0.4 * strength)
    n_tilt = max(1, int(n_cols * tilt_fraction))
    tilt_cols = rng.choice(n_cols, size=n_tilt, replace=False)
    perturbed[:, tilt_cols] += _gauss(0.0, (n_rows, n_tilt))

    return perturbed


def predict_with_model(model, X: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
    """Predict with an sklearn-style estimator and report a confidence per row.

    The first available source is used:

    1. ``predict_proba`` — used as-is;
    2. ``decision_function`` — scores are passed through ``safe_softmax``
       (1-D scores ``s`` are first expanded to two columns ``[-s, s]``);
    3. ``predict`` only — labels are cast to ``int`` and the confidence is a
       constant 0.5.

    Returns:
        ``(y_pred, p_max)``. For cases 1 and 2 ``y_pred`` is the arg-max
        column index (``int``) and ``p_max`` the row maximum (``float32``).
    """
    has_proba = hasattr(model, "predict_proba")

    # Neither probability source is available: plain labels, flat confidence.
    if not has_proba and not hasattr(model, "decision_function"):
        labels = model.predict(X)
        flat_conf = np.full(X.shape[0], 0.5, dtype=np.float32)
        return labels.astype(int), flat_conf

    if has_proba:
        probs = model.predict_proba(X)
    else:
        scores = model.decision_function(X)
        if scores.ndim == 1:
            # Binary case: build a two-column score matrix before the softmax.
            scores = np.stack([-scores, scores], axis=1)
        probs = safe_softmax(scores)

    y_pred = np.argmax(probs, axis=1).astype(int)
    confidence = probs.max(axis=1).astype(np.float32)
    return y_pred, confidence

In [7]:
def evaluate_consistency(
    model,
    X: np.ndarray,
    strengths=STRENGTHS,
    seed: int = SEED,
    max_samples: Optional[int] = MAX_SAMPLES,
    title_prefix: str = "Stage1",
    outdir: Path | None = OUTDIR,
):
    """Measure how stable a model's predictions are under feature-space noise.

    Predictions on ``X`` are compared with predictions on perturbed copies of
    ``X`` (see ``feature_augmentations``) for each strength; the agreement rate
    is printed and plotted. A second figure shows the base-prediction
    confidence split by whether the prediction survived a fresh perturbation
    at a mid-range strength.

    Args:
        model: Estimator accepted by ``predict_with_model``.
        X: Feature array ``[N, D]``.
        strengths: Iterable of augmentation strengths; must not be empty.
        seed: Seed for the numpy Generator (subsampling and augmentation).
        max_samples: If set and ``X`` has more rows, a random subset of this
            size is evaluated instead.
        title_prefix: Used in plot titles, log lines and (lower-cased) in the
            output file names.
        outdir: Directory for the two PNG files. ``None`` falls back to the
            module-level ``OUTDIR``.

    Returns:
        dict with ``strengths`` and ``consistencies`` (parallel lists) plus
        ``curve_path`` and ``hist_path`` of the saved figures.

    Raises:
        ValueError: ``strengths`` is empty.
    """
    # Previously the `outdir` argument was ignored and the global OUTDIR was
    # always used; honour the caller's choice and keep OUTDIR as the fallback.
    out_dir = Path(outdir) if outdir is not None else OUTDIR

    # Materialise once: supports any iterable and lets us index / len() safely.
    strengths = list(strengths)
    if not strengths:
        # Fail before any work is done (used to surface as a late IndexError).
        raise ValueError("strengths must contain at least one value")

    rng = np.random.default_rng(seed)
    if max_samples is not None and X.shape[0] > max_samples:
        sel = rng.choice(X.shape[0], size=max_samples, replace=False)
        X = X[sel]

    # Base predictions on the unperturbed features
    y0, p0 = predict_with_model(model, X)

    consistencies = []
    for s in strengths:
        X_aug = feature_augmentations(X, strength=s, rng=rng)
        y1, _ = predict_with_model(model, X_aug)
        cons = float((y0 == y1).mean())
        consistencies.append(cons)
        print(f"{title_prefix} | strength={s:.2f} -> consistency={cons:.4f}")

    out_dir.mkdir(parents=True, exist_ok=True)
    file_stem = title_prefix.lower()

    # Plot consistency curve. Figures are deliberately left open (as before)
    # so that interactive / inline backends can still display them.
    fig_curve, ax_curve = plt.subplots(figsize=(7, 5))
    ax_curve.plot(strengths, consistencies, marker="o")
    ax_curve.set_xlabel("Augmentation strength (feature space)")
    ax_curve.set_ylabel("Consistency (agreement rate)")
    ax_curve.set_title(f"{title_prefix}: Consistency vs Aug Strength")
    ax_curve.grid(True, alpha=0.3)
    curve_path = out_dir / f"{file_stem}_consistency_curve.png"
    fig_curve.savefig(curve_path, dpi=160, bbox_inches="tight")
    print(f"Saved: {curve_path}")

    # Confidence histogram at a mid strength (third entry, or the last one if
    # fewer were given). This is a fresh random perturbation, independent of
    # the one drawn for the same strength in the loop above.
    mid_idx = min(len(strengths) - 1, 2)
    s_mid = strengths[mid_idx]
    X_mid = feature_augmentations(X, strength=s_mid, rng=rng)
    y_mid, _ = predict_with_model(model, X_mid)
    agree_mid = (y0 == y_mid)

    fig_hist, ax_hist = plt.subplots(figsize=(7, 5))
    ax_hist.hist(p0[agree_mid], bins=30, alpha=0.7, label="consistent")
    ax_hist.hist(p0[~agree_mid], bins=30, alpha=0.7, label="inconsistent")
    ax_hist.set_xlabel("Base prediction confidence (max prob)")
    ax_hist.set_ylabel("Count")
    ax_hist.set_title(f"{title_prefix}: Confidence by Consistency (s={s_mid:.2f})")
    ax_hist.legend()
    ax_hist.grid(True, alpha=0.3)
    hist_path = out_dir / f"{file_stem}_confidence_hist.png"
    fig_hist.savefig(hist_path, dpi=160, bbox_inches="tight")
    print(f"Saved: {hist_path}")

    return {
        "strengths": strengths,
        "consistencies": consistencies,
        "curve_path": curve_path,
        "hist_path": hist_path,
    }

In [None]:
# Driver: load both models and the feature matrix, then run the consistency
# evaluation for Stage 1 and, when it could be loaded, for the two-stage pipeline.
if __name__ == "__main__":
    print("Stage 1 model:", STAGE1_PATH)
    print("Two-Stage model:", STAGE2_PATH)
    print("Features:", FEATURES_PATH)
    print("Outdir:", OUTDIR)

    # Load models
    # NOTE(security): joblib.load unpickles the file and can execute arbitrary
    # code — only load model files from a trusted source.
    print("Loading Stage 1 model...")
    stage1 = joblib.load(STAGE1_PATH)

    # The two-stage pipeline is optional: a load failure is reported as a
    # warning and that evaluation is skipped below.
    stage2 = None
    try:
        print("Loading Two-Stage pipeline...")
        stage2 = joblib.load(STAGE2_PATH)
    except Exception as e:
        print("[WARN] Could not load Two-Stage pipeline:", e)

    # Load features
    print("Loading features...")
    X = load_features(FEATURES_PATH)
    print("Features shape:", X.shape)

    # Evaluate Stage 1 (errors here propagate; Stage 1 is mandatory)
    print("=== Evaluating Stage 1 ===")
    evaluate_consistency(
        model=stage1,
        X=X,
        strengths=STRENGTHS,
        seed=SEED,
        max_samples=MAX_SAMPLES,
        title_prefix="Stage1",
        outdir=OUTDIR,
    )

    # Evaluate Two-Stage (optional)
    if stage2 is not None:
        print("=== Evaluating Two-Stage ===")
        # Best-effort: any failure is downgraded to a warning rather than
        # aborting the run.
        try:
            evaluate_consistency(
                model=stage2,
                X=X,
                strengths=STRENGTHS,
                seed=SEED,
                max_samples=MAX_SAMPLES,
                title_prefix="TwoStage",
                outdir=OUTDIR,
            )
        except Exception as e:
            print("[WARN] Skipping Two-Stage evaluation:", e)

