# Norcia Baseline – Phase 3: Baseline Analysis

This notebook compares pre-event vs background windows, computes feature statistics, and derives a seismic score.

Inputs: `norcia_outputs/features.csv` from Phase 2.

Outputs: `norcia_outputs/feature_stats.csv`, `norcia_outputs/seismic_scores.csv`, `norcia_outputs/phase3_summary.md`.


In [None]:
from __future__ import annotations

from dataclasses import dataclass
from pathlib import Path
import numpy as np
import pandas as pd

@dataclass
class ScoreConfig:
    method: str
    features: list[str]

def cohen_d(sample_a: np.ndarray, sample_b: np.ndarray) -> float:
    if sample_a.size == 0 or sample_b.size == 0:
        return np.nan
    mean_a = np.nanmean(sample_a)
    mean_b = np.nanmean(sample_b)
    var_a = np.nanvar(sample_a, ddof=1)
    var_b = np.nanvar(sample_b, ddof=1)
    pooled = np.sqrt(((sample_a.size - 1) * var_a + (sample_b.size - 1) * var_b) / (
        sample_a.size + sample_b.size - 2
    ))
    if pooled == 0 or not np.isfinite(pooled):
        return np.nan
    return float((mean_a - mean_b) / pooled)

def mann_whitney_pvalue(sample_a: np.ndarray, sample_b: np.ndarray) -> float:
    try:
        from scipy.stats import mannwhitneyu

        return float(
            mannwhitneyu(sample_a, sample_b, alternative="two-sided").pvalue
        )
    except Exception:
        return np.nan

def ttest_pvalue(sample_a: np.ndarray, sample_b: np.ndarray) -> float:
    try:
        from scipy.stats import ttest_ind

        return float(ttest_ind(sample_a, sample_b, equal_var=False).pvalue)
    except Exception:
        return np.nan

def auc_from_scores(labels: np.ndarray, scores: np.ndarray) -> float:
    labels = labels.astype(int)
    positive_scores = scores[labels == 1]
    negative_scores = scores[labels == 0]
    if positive_scores.size == 0 or negative_scores.size == 0:
        return np.nan
    combined = np.concatenate([positive_scores, negative_scores])
    ranks = pd.Series(combined).rank(method="average").to_numpy()
    rank_pos = np.sum(ranks[: len(positive_scores)])
    n_pos = len(positive_scores)
    n_neg = len(negative_scores)
    return float((rank_pos - n_pos * (n_pos + 1) / 2) / (n_pos * n_neg))

def score_windows(features: pd.DataFrame, config: ScoreConfig) -> pd.Series:
    score_features = [f for f in config.features if f in features.columns]
    background = features[features["window_type"] == "background"]
    if background.empty or not score_features:
        return pd.Series(np.nan, index=features.index)

    if config.method == "single":
        feature = score_features[0]
        mean = background[feature].mean()
        std = background[feature].std(ddof=0)
        if std == 0 or not np.isfinite(std):
            return features[feature] - mean
        return (features[feature] - mean) / std

    if config.method == "mahalanobis":
        matrix = background[score_features].to_numpy()
        center = np.nanmean(matrix, axis=0)
        matrix = np.nan_to_num(matrix)
        cov = np.cov(matrix, rowvar=False)
        cov += np.eye(cov.shape[0]) * 1e-6
        inv_cov = np.linalg.pinv(cov)
        values = features[score_features].to_numpy()
        values = np.nan_to_num(values)
        diffs = values - center
        distances = np.einsum("ij,jk,ik->i", diffs, inv_cov, diffs)
        return pd.Series(np.sqrt(distances), index=features.index)

    means = background[score_features].mean()
    stds = background[score_features].std(ddof=0).replace(0, np.nan)
    zscores = (features[score_features] - means) / stds
    return zscores.mean(axis=1, skipna=True)

def summarize_feature_stats(features: pd.DataFrame) -> pd.DataFrame:
    candidates = features.select_dtypes(include=[np.number]).columns
    candidates = [c for c in candidates if c not in {"hours_before_mainshock"}]
    pre = features[features["window_type"] == "pre_event"]
    background = features[features["window_type"] == "background"]

    rows = []
    for feature in candidates:
        sample_a = pre[feature].dropna().to_numpy()
        sample_b = background[feature].dropna().to_numpy()
        rows.append(
            {
                "feature": feature,
                "pre_mean": np.nanmean(sample_a) if sample_a.size else np.nan,
                "background_mean": np.nanmean(sample_b) if sample_b.size else np.nan,
                "pre_n": sample_a.size,
                "background_n": sample_b.size,
                "mann_whitney_p": mann_whitney_pvalue(sample_a, sample_b),
                "ttest_p": ttest_pvalue(sample_a, sample_b),
                "cohen_d": cohen_d(sample_a, sample_b),
            }
        )
    return pd.DataFrame(rows).sort_values("mann_whitney_p")

def write_summary(
    output_dir: Path,
    stats: pd.DataFrame,
    seismic_scores: pd.DataFrame,
    score_config: ScoreConfig,
) -> None:
    pre = seismic_scores[seismic_scores["window_type"] == "pre_event"]
    background = seismic_scores[seismic_scores["window_type"] == "background"]
    labels = np.concatenate(
        [np.ones(len(pre), dtype=int), np.zeros(len(background), dtype=int)]
    )
    scores = np.concatenate([pre["seismic_score"].to_numpy(), background["seismic_score"].to_numpy()])
    auc = auc_from_scores(labels, scores)

    lines = [
        "# Norcia Baseline Phase 3 Summary",
        "",
        f"Seismic score method: **{score_config.method}**",
        f"Score features: {', '.join(score_config.features)}",
        "",
        f"AUC (pre-event vs background): {auc:.3f}" if np.isfinite(auc) else "AUC: unavailable",
        "",
        "## Top Features by Mann-Whitney p-value",
    ]

    top = stats.head(10)
    for _, row in top.iterrows():
        lines.append(
            f"- {row['feature']}: p={row['mann_whitney_p']:.3e}, d={row['cohen_d']:.2f}"
        )

    summary_path = output_dir / "phase3_summary.md"
    summary_path.write_text("\n".join(lines))

features_path = Path("norcia_outputs/features.csv")
output_dir = Path("norcia_outputs")
output_dir.mkdir(parents=True, exist_ok=True)

features = pd.read_csv(features_path)

score_config = ScoreConfig(
    method="composite",
    features=["n_events", "cumulative_moment", "mean_pga", "mean_snr"],
)

seismic_score = score_windows(features, score_config)
scores = features[["window_id", "window_type"]].copy()
if "hours_before_mainshock" in features.columns:
    scores["hours_before_mainshock"] = features["hours_before_mainshock"]
scores["seismic_score"] = seismic_score
scores_path = output_dir / "seismic_scores.csv"
scores.to_csv(scores_path, index=False)

stats = summarize_feature_stats(features)
stats_path = output_dir / "feature_stats.csv"
stats.to_csv(stats_path, index=False)

write_summary(output_dir, stats, scores, score_config)

print(f"✓ Seismic scores written to {scores_path}")
print(f"✓ Feature stats written to {stats_path}")
print(f"✓ Summary written to {output_dir / 'phase3_summary.md'}")
