In [1]:
import os
import numpy as np
import pandas as pd
from tqdm import tqdm
from typing import List
from sklearn.metrics import f1_score, precision_score, recall_score
from sklearn.preprocessing import StandardScaler
import matplotlib.pyplot as plt
import seaborn as sns

# Random seed constant.
# NOTE(review): no cell visible here reads `seed` — confirm whether it is still needed.
seed=42


In [2]:
def calculate_mz(values: np.ndarray) -> np.ndarray:
    """
    Compute absolute Median Z-scores (robust z-scores based on the MAD).

    Each value is scored by its absolute distance from the median, divided by
    a robust estimate of spread. The result is therefore unsigned: points far
    below the median score as high as points far above it.

    Parameters
    ----------
    values : np.ndarray
        Input values (scaled or raw). Statistics are taken over all elements,
        so a column vector of shape (n, 1) is scored the same as a 1D array.
        NaN entries are ignored when estimating the median and the spread.

    Returns
    -------
    np.ndarray
        Array of non-negative robust z-scores with the same shape as
        ``values``. Positions that are NaN in the input are NaN in the output.

    Notes
    -----
    The spread is ``1.486 * MAD``. When the MAD is zero (more than half of the
    values equal the median) that estimate degenerates, so the mean absolute
    deviation is used instead (``1.253314 * MeanAD``). Without this fallback
    every value that differs from the median, however slightly, would receive
    an enormous score.
    """
    epsilon = 1e-9  # keeps the division finite when the spread is exactly zero

    # NaN-aware statistics: a single missing reading must not turn every
    # score into NaN.
    median = np.nanmedian(values)
    abs_dev = np.abs(values - median)
    mad = np.nanmedian(abs_dev)

    if mad > 0:
        sigma = 1.486 * mad
    else:
        # MAD collapsed to zero (or is undefined); fall back to the mean
        # absolute deviation so the scores stay on a z-score-like scale.
        sigma = 1.253314 * np.nanmean(abs_dev)

    return abs_dev / (sigma + epsilon)

In [3]:
def evaluate_all_buildings_mz(
    all_files: List[str],
    path: str,
    anomaly_type: int,
    z_thresholds: List[float] = None
) -> pd.DataFrame:
    """
    Sweep Median Z-score thresholds per building and keep the best F1.

    For every CSV the "modified" column is standardised, scored with
    ``calculate_mz`` and thresholded at each candidate value. Rows whose
    "labels" entry equals ``anomaly_type`` are the positive class; every
    other row is treated as negative.

    Parameters
    ----------
    all_files : list of str
        CSV file names (one per building) located inside ``path``.
    path : str
        Directory that holds the CSV files.
    anomaly_type : int
        Label value that marks the anomalies being evaluated.
    z_thresholds : list of float, optional
        Candidate score thresholds. When omitted, 0.5 to 4.0 in steps of 0.5.

    Returns
    -------
    pd.DataFrame
        One row per file with columns ``building_file``, ``f1_score``,
        ``precision``, ``recall`` and ``best_threshold``. If no threshold
        yields an F1 above zero the scores stay 0 and ``best_threshold``
        is None.
    """
    thresholds = z_thresholds
    if thresholds is None:
        thresholds = [step / 10 for step in range(5, 45, 5)]  # 0.5 to 4.0

    rows = []
    progress = tqdm(all_files, desc=f"Evaluating MZ for Anomaly Type {anomaly_type}")

    for file_name in progress:
        frame = pd.read_csv(os.path.join(path, file_name))

        # Binary ground truth: 1 where the label matches the requested type.
        truth = np.where(frame["labels"].values == anomaly_type, 1, 0)

        features = frame["modified"].values.reshape(-1, 1)
        scaled = StandardScaler().fit_transform(features)
        scores = calculate_mz(scaled).reshape(-1)

        best = {"f1_score": 0, "precision": 0, "recall": 0, "best_threshold": None}

        for cutoff in thresholds:
            flagged = (scores > cutoff).astype(int)
            candidate_f1 = f1_score(truth, flagged, zero_division=0)

            # Only a strictly better F1 replaces the incumbent, so ties keep
            # the earliest threshold in the sweep.
            if not candidate_f1 > best["f1_score"]:
                continue

            best = {
                "f1_score": candidate_f1,
                "precision": precision_score(truth, flagged, zero_division=0),
                "recall": recall_score(truth, flagged, zero_division=0),
                "best_threshold": cutoff,
            }

        rows.append({"building_file": file_name, **best})

    return pd.DataFrame(rows)

In [4]:
def run_mz_evaluation_for_all_anomaly_types(
    base_path: str,
    anomaly_types: List[int] = [2, 3, 4],
    output_dir: str = "mz_scores"
) -> None:
    """
    Run Median Z-score evaluation for all anomaly types and save summaries.

    For each anomaly type ``N`` the CSV files inside ``<base_path>/TYPE<N>``
    are evaluated with ``evaluate_all_buildings_mz`` and the resulting summary
    is written to ``<output_dir>/mz_type<N>_scores.csv``.

    Parameters
    ----------
    base_path : str
        Directory containing folders like TYPE2/, TYPE3/, etc.
    anomaly_types : list of int
        Anomaly types to evaluate. The default list is only iterated, never
        modified.
    output_dir : str
        Where to save the evaluation result CSVs. Created if it is missing.

    Raises
    ------
    FileNotFoundError
        If a ``TYPE<N>`` folder does not exist under ``base_path``.
    """
    os.makedirs(output_dir, exist_ok=True)

    for anom_type in anomaly_types:
        print(f"\n📏 Running MZ-score evaluation for anomaly type {anom_type}")
        path = os.path.join(base_path, f"TYPE{anom_type}")

        # os.listdir returns entries in arbitrary order; sorting keeps the row
        # order of the saved summary reproducible across machines and runs.
        files = sorted(f for f in os.listdir(path) if f.endswith(".csv"))

        summary = evaluate_all_buildings_mz(
            all_files=files,
            path=path,
            anomaly_type=anom_type
        )

        out_path = os.path.join(output_dir, f"mz_type{anom_type}_scores.csv")
        summary.to_csv(out_path, index=False)
        print(f"✅ Saved results to {out_path}")

In [5]:
# Base path to folders like TYPE2/, TYPE3/, etc.
# The original location stays the default so existing runs are unchanged;
# set the BDG2_MODIFIED_DIR environment variable to point the notebook at
# another copy of the data without editing this cell.
base_injected_path = os.environ.get(
    "BDG2_MODIFIED_DIR",
    "/data1/home/nitinvetcha/Ashwin_KM_Code/tsfm_learning/Anomaly_Injection_IISc/injection_code/src/OUTPUT_BDG2/MODIFIED",
)

# Run mz evaluation for all anomaly types
run_mz_evaluation_for_all_anomaly_types(
    base_path=base_injected_path,
    anomaly_types=[2, 3, 4],
    output_dir="bdg2_mz_scores"
)


📏 Running MZ-score evaluation for anomaly type 2


Evaluating MZ for Anomaly Type 2: 100%|██████████| 1553/1553 [02:01<00:00, 12.74it/s]


✅ Saved results to bdg2_mz_scores/mz_type2_scores.csv

📏 Running MZ-score evaluation for anomaly type 3


Evaluating MZ for Anomaly Type 3: 100%|██████████| 1553/1553 [02:28<00:00, 10.45it/s]


✅ Saved results to bdg2_mz_scores/mz_type3_scores.csv

📏 Running MZ-score evaluation for anomaly type 4


Evaluating MZ for Anomaly Type 4: 100%|██████████| 1553/1553 [02:17<00:00, 11.30it/s]

✅ Saved results to bdg2_mz_scores/mz_type4_scores.csv





In [6]:
def plot_and_save_score_boxplots(
    score_dir: str,
    output_dir: str = "plots",
    anomaly_types: list = [2, 3, 4],
    prefix: str = "isoforest"
) -> None:
    """
    Draw one boxplot per anomaly type comparing F1, Precision and Recall.

    Each anomaly type ``N`` is read from
    ``<score_dir>/<prefix>_type<N>_scores.csv`` and rendered to
    ``<output_dir>/<prefix>_boxplot_type<N>.png``. Types whose score file is
    missing, empty, or holds only null scores are reported and skipped.

    Parameters
    ----------
    score_dir : str
        Directory where the per-anomaly-type score CSVs are stored.
    output_dir : str, default "plots"
        Directory where plots will be saved. Created if it is missing.
    anomaly_types : list of int, default [2, 3, 4]
        Anomaly types to plot.
    prefix : str, default "isoforest"
        Prefix of the CSV filenames, e.g. "isoforest_type2_scores.csv"; it is
        reused as the prefix of the PNG filenames.

    Returns
    -------
    None
    """
    # CSV column name -> label shown on the x axis (order defines box order).
    metric_labels = {
        "f1_score": "F1 Score",
        "precision": "Precision",
        "recall": "Recall",
    }
    metric_columns = list(metric_labels)

    os.makedirs(output_dir, exist_ok=True)

    for anomaly_type in anomaly_types:
        print(f"\n📊 Plotting results for anomaly type {anomaly_type}...")

        csv_path = os.path.join(score_dir, f"{prefix}_type{anomaly_type}_scores.csv")
        if not os.path.exists(csv_path):
            print(f"⚠️ File not found: {csv_path}")
            continue

        scores = pd.read_csv(csv_path)

        nothing_to_plot = scores.empty or scores[metric_columns].isnull().all().all()
        if nothing_to_plot:
            print(f"⚠️ No valid score data for anomaly type {anomaly_type}. Skipping...")
            continue

        # Long format: one row per (metric, score) pair, as seaborn expects.
        long_form = (
            scores[metric_columns]
            .rename(columns=metric_labels)
            .melt(var_name="Metric", value_name="Score")
        )

        fig, ax = plt.subplots(figsize=(10, 6))
        sns.boxplot(data=long_form, x="Metric", y="Score", palette="Set2", ax=ax)
        ax.set_title(f"Distribution of F1, Precision, and Recall — Anomaly Type {anomaly_type}")
        ax.grid(True, linestyle='--', alpha=0.6)
        fig.tight_layout()

        png_path = os.path.join(output_dir, f"{prefix}_boxplot_type{anomaly_type}.png")
        fig.savefig(png_path)
        plt.close(fig)  # release the figure before the next anomaly type

        print(f"✅ Saved boxplot to {png_path}")

In [7]:
# Plot and save boxplots for all anomaly types.
# `score_dir` is the directory the evaluation cell above wrote its CSVs to, and
# prefix="mz" matches the "mz_type{N}_scores.csv" names produced there; the
# function's default prefix is "isoforest", so it has to be overridden here.
plot_and_save_score_boxplots(
    score_dir="bdg2_mz_scores",
    output_dir="bdg2_mz_plots",
    anomaly_types=[2, 3, 4],
    prefix="mz"
)


📊 Plotting results for anomaly type 2...
✅ Saved boxplot to bdg2_mz_plots/mz_boxplot_type2.png

📊 Plotting results for anomaly type 3...
✅ Saved boxplot to bdg2_mz_plots/mz_boxplot_type3.png

📊 Plotting results for anomaly type 4...
✅ Saved boxplot to bdg2_mz_plots/mz_boxplot_type4.png
