In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import os
import seaborn as sns
import pymc as pm
import arviz as az
from scipy.stats import gaussian_kde, norm
import ipywidgets as widgets
from IPython.display import display, clear_output
import scipy.stats as st
from scipy.stats import gaussian_kde
from typing import Sequence, Tuple, Dict, Optional, Callable

print("Running with PyMC version:", pm.__version__)


In [None]:
# ---------------------------------------------------------------------------
# Input locations
# ---------------------------------------------------------------------------
# Each folder below holds the logged training histories (one file per run)
# of one architecture / training-data combination.

# U-Net and Swin Transformer runs trained on MACS:
unet_macs_loss_dir = r"N:\isipd\projects\p_planetdw\data\methods_test\logs\unet_ae_samples"
swin_macs_loss_dir = r"N:\isipd\projects\p_planetdw\data\methods_test\logs\swin_ae_samples"

# U-Net and Swin Transformer runs trained on PS:
unet_ps_loss_dir = r"N:\isipd\projects\p_planetdw\data\methods_test\logs\unet_ps_samples"
swin_ps_loss_dir = r"N:\isipd\projects\p_planetdw\data\methods_test\logs\swin_ps_samples"

# ---------------------------------------------------------------------------
# Metrics
# ---------------------------------------------------------------------------
# Metrics to analyse (the analysis reads the matching "val_<metric>" columns).
metrics = [
    "loss",
    "accuracy",
    "specificity",
    "sensitivity",
    "IoU",
    "f1_score",
    "Hausdorff_distance",
]

# Metrics for which a larger value means a better model.
maximize_metrics = {"accuracy", "specificity", "sensitivity", "IoU", "f1_score"}

# Metrics bounded in (0, 1); these are modelled on the logit scale.
# Everything else is treated as a positive quantity and modelled on the log scale.
bounded_01_metrics = {"accuracy", "specificity", "sensitivity", "IoU", "f1_score"}

# ---------------------------------------------------------------------------
# Priors for the factorial effects (expressed on the transformed scale)
# ---------------------------------------------------------------------------
# - logit scale: Normal(0, 0.5) is used as a weakly-informative prior
# - log scale  : Normal(0, 0.2); one SD is a multiplicative change of
#                exp(+/-0.2), i.e. roughly +22% / -18%
prior_sd_logit, prior_sd_log = 0.5, 0.2

# ---------------------------------------------------------------------------
# MCMC sampling configuration
# ---------------------------------------------------------------------------
DRAWS, TUNE, CHAINS = 2000, 2000, 4
TARGET_ACCEPT = 0.9
RANDOM_SEED = 42

# Bandwidth handed to the KDE used for the Savage–Dickey ratio (can be adjusted).
SD_KDE_BW = 0.3

# ---------------------------------------------------------------------------
# Overall ranking
# ---------------------------------------------------------------------------
# Explicit utility weights across metrics; None means every metric gets the
# same weight.
metric_weights: Optional[Dict[str, float]] = None

# Output directories (optional; currently the same folders as the inputs).
unet_macs_output_dir = unet_macs_loss_dir
swin_macs_output_dir = swin_macs_loss_dir


In [None]:
def read_metrics_as_array(directory, metrics):
    """
    Reads the CSV training logs in a directory into one 3D numpy array.

    Every ``*.csv`` file (sorted by file name) is treated as one run. For each
    entry of ``metrics`` two columns are extracted, in this order: the training
    column ``<metric>`` and the validation column ``val_<metric>``.

    Args:
        directory (str): Path to the directory containing CSV files.
        metrics (list): List of metric names to extract from the CSV files.
    Returns:
        tuple: A tuple containing:
            - data_array (np.ndarray): Float array of shape
              (num_files, max_epochs, 2 * len(metrics)). Columns missing from a
              file are NaN. Runs with fewer epochs than the longest run are
              padded with NaN at the end, so runs of different length
              (e.g. stopped early) can be combined.
            - lookup (dict): Maps each column name (``<metric>`` and
              ``val_<metric>``) to its index on the last axis of data_array.
            - files (list): List of file names processed (sorted).
    Raises:
        ValueError: If the directory contains no ``.csv`` file.
    """

    files = sorted(f for f in os.listdir(directory) if f.endswith('.csv'))
    if not files:
        raise ValueError(f"No CSV log files found in {directory!r}.")

    # training column first, validation column second, for every metric
    metric_names = []
    for metric in metrics:
        metric_names.extend((metric, 'val_' + metric))

    per_run = []
    for file in files:
        df = pd.read_csv(os.path.join(directory, file))
        # reindex keeps the requested column order and inserts an all-NaN
        # column for every requested name the file does not contain
        per_run.append(df.reindex(columns=metric_names).to_numpy(dtype=float))

    # Runs may have logged a different number of epochs; np.stack would fail on
    # ragged input, so allocate a NaN-filled cube and copy each run into it.
    max_epochs = max(block.shape[0] for block in per_run)
    data_array = np.full((len(per_run), max_epochs, len(metric_names)), np.nan)
    for run_idx, block in enumerate(per_run):
        data_array[run_idx, :block.shape[0], :] = block

    # lookup metric name -> column index
    lookup = {name: idx for idx, name in enumerate(metric_names)}

    return data_array, lookup, files


# import arrays for each condition
# MACS-trained runs
unet_macs, unet_macs_metric_lookup, unet_macs_file_names = read_metrics_as_array(
    directory=unet_macs_loss_dir, metrics=metrics
)
swin_macs, swin_macs_metric_lookup, swin_macs_file_names = read_metrics_as_array(
    directory=swin_macs_loss_dir, metrics=metrics
)

# PS-trained runs
unet_ps, unet_ps_metric_lookup, unet_ps_file_names = read_metrics_as_array(
    directory=unet_ps_loss_dir, metrics=metrics
)
swin_ps, swin_ps_metric_lookup, swin_ps_file_names = read_metrics_as_array(
    directory=swin_ps_loss_dir, metrics=metrics
)


In [None]:
# Scientific choice:
# We represent each training run by the metric value at the epoch where validation loss is minimal
# (i.e., "checkpoint selected by min val_loss"). This mirrors common early-stopping/model-selection practice.

def get_best_metric_at_min_val_loss(data_array, metric_lookup, metric):
    """
    Return, for every run, the value of ``metric`` at the run's best epoch.

    For each run/file:
      - pick the epoch where val_loss is minimal, ignoring NaN epochs
      - if the run has no usable val_loss (column absent from the lookup, or
        every value NaN because the column was missing in the log), fall back
        to the epoch where loss is minimal
      - return the metric value at that epoch; NaN if the run has no usable
        loss values at all

    Args:
        data_array (np.ndarray): Array indexed as (run, epoch, column).
        metric_lookup (dict): Maps column names to indices on the last axis.
        metric (str): Name of the column whose value is reported.
    Returns:
        np.ndarray of shape (num_runs,)
    Raises:
        ValueError: If neither 'val_loss' nor 'loss' is in ``metric_lookup``.
    """

    # selection criteria in order of preference
    criteria = [name for name in ("val_loss", "loss") if name in metric_lookup]
    if not criteria:
        raise ValueError("Neither 'val_loss' nor 'loss' found in the metric lookup.")

    metric_idx = metric_lookup[metric]

    best_values = []
    for run in data_array:  # run is indexed as (epoch, column)
        values = run[:, metric_idx]
        best_value = np.nan
        for name in criteria:
            losses = run[:, metric_lookup[name]]
            # np.nanargmin raises on an all-NaN slice, so test for usable
            # values first and move on to the next criterion otherwise
            if not np.all(np.isnan(losses)):
                best_value = values[np.nanargmin(losses)]
                break
        best_values.append(best_value)

    return np.array(best_values)


In [None]:
# We model bounded metrics (IoU, F1, sensitivity, ...) on logit scale.
# We model positive metrics (loss, Hausdorff, ...) on log scale.
# This avoids Normal likelihood pathologies at boundaries and ensures positivity.

def base_metric_name(metric_col: str) -> str:
    """Return ``metric_col`` without a leading ``val_`` prefix (unchanged if it has none)."""
    prefix = "val_"
    if metric_col.startswith(prefix):
        return metric_col[len(prefix):]
    return metric_col

def is_bounded_01(metric_col: str) -> bool:
    """True if the metric (with any ``val_`` prefix removed) is listed in ``bounded_01_metrics``."""
    metric_name = base_metric_name(metric_col)
    return metric_name in bounded_01_metrics

def higher_is_better(metric_col: str) -> bool:
    """True if the metric (with any ``val_`` prefix removed) is listed in ``maximize_metrics``."""
    return base_metric_name(metric_col) in maximize_metrics

def transform_y(y: np.ndarray, metric_col: str, eps: float = 1e-6) -> Tuple[np.ndarray, Callable[[np.ndarray], np.ndarray], str]:
    """
    Map metric values onto the scale they are modelled on.

    Metrics for which ``is_bounded_01`` is true are clipped to
    [eps, 1 - eps] and logit-transformed; all other metrics are clipped to
    [eps, inf) and log-transformed.

    Returns:
      y_transformed,
      inverse_transform (maps transformed values back to the original scale),
      transform_name ("logit" or "log")
    """
    if not is_bounded_01(metric_col):
        positive = np.clip(y, eps, None)
        return np.log(positive), (lambda z: np.exp(z)), "log"

    interior = np.clip(y, eps, 1 - eps)
    odds = interior / (1 - interior)
    return np.log(odds), (lambda z: 1 / (1 + np.exp(-z))), "logit"


In [None]:
def rank_probabilities_from_draws(draws: np.ndarray, group_names, higher_better: bool) -> pd.DataFrame:
    """
    Turn posterior draws of group performance into rank probabilities.

    draws: (S, G) array, one row per posterior draw and one column per group
           (ORIGINAL scale preferred)
    group_names: G labels, in the column order of ``draws``
    higher_better: True if larger = better, else smaller = better

    Returns a DataFrame with one row per group, sorted by expected rank, with
    columns "group", "Pr(rank=k)" for k = 1..G, "E[rank]" and "Pr(best)".
    """
    n_draws, n_groups = draws.shape
    oriented = draws if higher_better else -draws

    # column indices from best to worst, separately for every draw
    best_to_worst = np.argsort(-oriented, axis=1)

    # invert that permutation: ranks[s, j] is the 1-based rank of group j in draw s
    ranks = np.empty_like(best_to_worst)
    draw_rows = np.arange(n_draws)[:, None]
    ranks[draw_rows, best_to_worst] = np.arange(1, n_groups + 1)

    table = {"group": list(group_names)}
    for k in range(1, n_groups + 1):
        table[f"Pr(rank={k})"] = (ranks == k).mean(axis=0).tolist()
    table["E[rank]"] = ranks.mean(axis=0).tolist()
    table["Pr(best)"] = table["Pr(rank=1)"]

    result = pd.DataFrame(table)
    return result.sort_values("E[rank]").reset_index(drop=True)


In [None]:
# BF10 = prior_density(Δ=0) / posterior_density(Δ=0)
# Requires idata.prior draws for the effect parameter.

def _density_at_zero(draws_1d: np.ndarray, bw=SD_KDE_BW) -> float:
    """
    Gaussian-KDE estimate of the density of ``draws_1d`` at 0.

    The input is flattened first; ``bw`` is forwarded to
    ``scipy.stats.gaussian_kde`` as its ``bw_method``.
    """
    flat_draws = np.ravel(np.asarray(draws_1d))
    density = gaussian_kde(flat_draws, bw_method=bw)
    at_zero = density.evaluate(0.0)
    return float(at_zero[0])

def savage_dickey_bf10(idata: az.InferenceData, var_name: str, bw=SD_KDE_BW) -> float:
    """
    Savage–Dickey BF10 for H1: var != 0 vs H0: var = 0
    BF10 = p(var=0 | prior) / p(var=0 | posterior)

    Both densities are KDE estimates computed from the draws of ``var_name``
    in the ``prior`` and ``posterior`` groups of ``idata``.

    Args:
        idata: InferenceData holding both a ``posterior`` and a ``prior`` group.
        var_name: Name of the (scalar) effect parameter.
        bw: Bandwidth setting forwarded to the KDE.
    Returns:
        The Bayes factor as a float. ``inf`` if the estimated posterior density
        at zero is exactly 0 (posterior mass so far from zero that the KDE
        underflows); ``nan`` if both density estimates are 0.
    Raises:
        ValueError: If ``idata`` has no ``prior`` group.
    """
    if not hasattr(idata, "prior"):
        raise ValueError(
            "idata has no prior group. Ensure you ran pm.sample_prior_predictive(var_names=[...]) "
            "and extended idata with it."
        )

    post = az.extract(idata, group="posterior", var_names=[var_name]).to_numpy().ravel()
    prior = az.extract(idata, group="prior", var_names=[var_name]).to_numpy().ravel()

    prior0 = _density_at_zero(prior, bw=bw)
    post0  = _density_at_zero(post,  bw=bw)

    # Both values are plain Python floats, so dividing by an underflowed
    # posterior density would raise ZeroDivisionError instead of giving inf.
    if post0 == 0.0:
        return float("inf") if prior0 > 0.0 else float("nan")

    return prior0 / post0


In [None]:
def fit_factorial_model_with_bf(
    df: pd.DataFrame,
    dataset_col: str,
    arch_col: str,
    metric_col: str,
    draws=DRAWS,
    tune=TUNE,
    chains=CHAINS,
    target_accept=TARGET_ACCEPT,
    seed=RANDOM_SEED,
    bw=SD_KDE_BW
):
    """
    Fit a 2x2 factorial regression (dataset x architecture) to one metric and
    report effect sizes, directional probabilities and Savage–Dickey BF10.

    The metric is transformed with ``transform_y`` (logit or log scale) and
    modelled with a Student-t likelihood:

        mu = intercept + beta_dataset*d + beta_arch*a + beta_interaction*d*a

    where ``d`` and ``a`` are 0/1 dummies (level 1 is the second category of
    ``pd.Categorical`` for that column). Because of the dummy coding together
    with the interaction term:
      - beta_dataset is the dataset difference at the architecture coded 0,
      - beta_arch is the architecture difference at the dataset coded 0,
      - beta_interaction is the difference-in-differences.
    The "effect" labels in the report state the reference level explicitly.

    Args:
        df: Long-format data with one row per run.
        dataset_col, arch_col: Names of the two factor columns (2 levels each).
        metric_col: Name of the metric column.
        draws, tune, chains, target_accept, seed: Forwarded to ``pm.sample``
            (``seed`` is also used for the prior draws).
        bw: KDE bandwidth setting for the Savage–Dickey density estimates.
    Returns:
        (idata, report): the InferenceData (posterior plus prior draws of the
        three effect parameters) and a DataFrame with one row per effect and
        columns effect, mean, sd, hdi_3%, hdi_97%, Pr(>0), BF10, scale, prior.
        All effect summaries are on the transformed scale.
    Raises:
        ValueError: If either factor does not have exactly two levels after
            rows with missing values are dropped.
    """
    data = df[[dataset_col, arch_col, metric_col]].dropna().copy()

    dcat = pd.Categorical(data[dataset_col])
    acat = pd.Categorical(data[arch_col])

    if len(dcat.categories) != 2 or len(acat.categories) != 2:
        raise ValueError("Factorial model expects exactly 2 dataset levels and 2 architecture levels.")

    d0, d1 = list(dcat.categories)
    a0, a1 = list(acat.categories)

    # 0/1 coding
    d = (dcat.codes == 1).astype(int)     # 1 for dataset level d1, 0 for d0
    a = (acat.codes == 1).astype(int)     # 1 for architecture level a1, 0 for a0
    da = d * a

    y_raw = data[metric_col].to_numpy()
    y_t, _, tname = transform_y(y_raw, metric_col)

    # meaningful prior scale (on transformed scale)
    effect_sd = prior_sd_logit if tname == "logit" else prior_sd_log

    effect_vars = ["beta_dataset", "beta_arch", "beta_interaction"]

    with pm.Model():
        d_data = pm.Data("d", d)
        a_data = pm.Data("a", a)
        da_data = pm.Data("da", da)

        intercept = pm.Normal("intercept", 0.0, 1.5)

        beta_dataset = pm.Normal("beta_dataset", 0.0, effect_sd)
        beta_arch = pm.Normal("beta_arch", 0.0, effect_sd)
        beta_interaction = pm.Normal("beta_interaction", 0.0, effect_sd)

        # robust likelihood; the degrees of freedom used are (nu + 1)
        sigma = pm.HalfNormal("sigma", 1.0)
        nu = pm.Exponential("nu", 1/30) + 1

        mu = intercept + beta_dataset*d_data + beta_arch*a_data + beta_interaction*da_data

        pm.StudentT("y", nu=nu, mu=mu, sigma=sigma, observed=y_t)

        idata = pm.sample(
            draws=draws,
            tune=tune,
            chains=chains,
            target_accept=target_accept,
            random_seed=seed,
            return_inferencedata=True
        )

        # prior draws for Savage–Dickey (must include the effect vars)
        prior = pm.sample_prior_predictive(
            var_names=effect_vars,
            random_seed=seed
        )
        idata.extend(prior)

    # effect size + uncertainty + BF10 (on transformed scale)
    summ = az.summary(idata, var_names=effect_vars, hdi_prob=0.94).reset_index()
    summ = summ.rename(columns={"index":"param"})

    # directional probability and Bayes factor per effect
    for v in effect_vars:
        draws_v = az.extract(idata, group="posterior", var_names=[v]).to_numpy().ravel()
        summ.loc[summ["param"] == v, "Pr(>0)"] = (draws_v > 0).mean()
        summ.loc[summ["param"] == v, "BF10"] = savage_dickey_bf10(idata, v, bw=bw)

    # Label effects with category names. With dummy coding plus an interaction
    # the two "main" coefficients are simple effects at the other factor's
    # reference level, so that level is spelled out in the label.
    label_map = {
        "beta_dataset": f"dataset effect ({d1} - {d0}) at {arch_col}={a0}",
        "beta_arch": f"architecture effect ({a1} - {a0}) at {dataset_col}={d0}",
        "beta_interaction": "interaction (difference-in-differences)"
    }
    summ["effect"] = summ["param"].map(label_map)

    report = summ.loc[:, ["effect","mean","sd","hdi_3%","hdi_97%","Pr(>0)","BF10"]].copy()
    report["scale"] = tname
    report["prior"] = f"Normal(0, {effect_sd}) on {tname} scale"

    return idata, report


In [None]:

# We model on transformed scale, but report group means on ORIGINAL scale.

def fit_group_model_with_ranking(
    df: pd.DataFrame,
    group_col: str,
    metric_col: str,
    draws=DRAWS,
    tune=TUNE,
    chains=CHAINS,
    target_accept=TARGET_ACCEPT,
    seed=RANDOM_SEED
):
    """
    Fit a hierarchical model of per-group means for one metric and rank the groups.

    The metric is transformed with ``transform_y`` (logit or log scale). Each
    group gets its own mean (drawn from a shared Normal(mu0, tau)) and its own
    scale, with a Student-t likelihood. Group means are mapped back to the
    original scale as the deterministic ``mu_orig`` and ranked draw by draw.

    Args:
        df: Long-format data with one row per run.
        group_col: Name of the grouping column.
        metric_col: Name of the metric column.
        draws, tune, chains, target_accept, seed: Forwarded to ``pm.sample``.
    Returns:
        (idata, perf_table, rank_table, mu_draws, tname):
          - idata: InferenceData returned by ``pm.sample``
          - perf_table: per-group posterior summary of ``mu_orig``
            (mean, sd, hdi_3%, hdi_97%) merged with the rank probabilities,
            sorted by expected rank
          - rank_table: output of ``rank_probabilities_from_draws``
          - mu_draws: array (samples, groups) of ``mu_orig`` draws, columns in
            the order of the group categories
          - tname: name of the transform used ("logit" or "log")
    Raises:
        RuntimeError: If the summary of ``mu_orig`` does not have exactly one
            row per group.
    """
    data = df[[group_col, metric_col]].dropna().copy()
    gcat = pd.Categorical(data[group_col])
    g_idx = gcat.codes
    groups = list(gcat.categories)
    G = len(groups)

    y_raw = data[metric_col].to_numpy()
    y_t, inv, tname = transform_y(y_raw, metric_col)

    coords = {"group": groups}

    with pm.Model(coords=coords):
        g = pm.Data("g", g_idx)
        y_obs = pm.Data("y_obs", y_t)

        # hierarchical priors (transformed scale)
        mu0 = pm.Normal("mu0", 0.0, 1.5)
        tau = pm.HalfNormal("tau", 1.0)

        mu = pm.Normal("mu", mu0, tau, dims="group")
        sigma = pm.HalfNormal("sigma", 1.0, dims="group")

        # the degrees of freedom used are (nu + 1)
        nu = pm.Exponential("nu", 1/30) + 1

        pm.StudentT("y", nu=nu, mu=mu[g], sigma=sigma[g], observed=y_obs)

        # deterministic group means on original scale
        pm.Deterministic("mu_orig", inv(mu), dims="group")

        idata = pm.sample(
            draws=draws,
            tune=tune,
            chains=chains,
            target_accept=target_accept,
            random_seed=seed,
            return_inferencedata=True
        )

    # posterior draws for ranking (original scale)
    mu_draws = (
        idata.posterior["mu_orig"]
        .stack(sample=("chain","draw"))
        .transpose("sample","group")
        .values
    )

    hib = higher_is_better(metric_col)

    rank_table = rank_probabilities_from_draws(mu_draws, groups, higher_better=hib)

    # summary table (original scale)
    summ = az.summary(idata, var_names=["mu_orig"], hdi_prob=0.94)
    if len(summ) != G:
        raise RuntimeError(
            f"Expected {G} summary rows for mu_orig, got {len(summ)}."
        )
    # The summary has one row per entry of the "group" dimension, in coordinate
    # order. Its row labels are built from the coordinate values (group names),
    # not integer positions, so attach the names by position instead of parsing
    # an index out of the label.
    summ = summ.reset_index(drop=True)
    summ["group"] = groups

    perf_table = (
        summ.loc[:, ["group","mean","sd","hdi_3%","hdi_97%"]]
        .merge(rank_table, on="group", how="left")
        .sort_values("E[rank]")
        .reset_index(drop=True)
    )

    return idata, perf_table, rank_table, mu_draws, tname


In [None]:
# We define a composite score:
#  - for each metric, ensure "higher is better" by sign-flip if necessary
#  - standardise per metric so scales don't dominate
#  - weighted average across metrics (equal weights by default)
#
# Then compute posterior rank probabilities from composite draws.

def overall_ranking_across_metrics(
    mu_draws_by_metric: Dict[str, np.ndarray],
    higher_is_better_by_metric: Dict[str, bool],
    group_names: Sequence[str],
    weights: Optional[Dict[str, float]] = None,
):
    """
    Combine per-metric posterior draws into one composite score and rank the groups.

    For every metric the draws are sign-flipped if lower is better, divided by
    their pooled standard deviation (over all draws and groups) so that no
    metric dominates through its scale, and added to the composite with the
    metric's weight. The draws are not centred: a per-metric constant shift
    would move all groups equally and cannot change the ranking within a draw.

    Args:
        mu_draws_by_metric: metric name -> array (samples, groups) of group
            mean draws; columns must follow the order of ``group_names``.
        higher_is_better_by_metric: metric name -> True if larger is better.
        group_names: Group labels, one per column.
        weights: Optional metric name -> weight. ``None`` gives equal weights.
            Entries for metrics that are not in ``mu_draws_by_metric`` are
            ignored; the weights of the metrics actually used are normalised
            to sum to 1.
    Returns:
        (composite, overall_rank_table, weights): the composite draws of shape
        (S, G) where S is the smallest number of draws over all metrics, the
        rank-probability table, and the normalised weights that were applied.
    Raises:
        ValueError: If no metric is supplied, the used weights sum to zero, or
            a draws array does not have one column per group.
        KeyError: If ``weights`` is given but lacks an entry for a metric.
    """
    metric_list = list(mu_draws_by_metric.keys())
    if not metric_list:
        raise ValueError("mu_draws_by_metric is empty; there is nothing to rank.")

    if weights is None:
        raw_weights = {m: 1.0 for m in metric_list}
    else:
        missing = [m for m in metric_list if m not in weights]
        if missing:
            raise KeyError(f"weights has no entry for metric(s): {missing}")
        # keep only the metrics that are actually combined, so that the
        # normalisation below is not diluted by unrelated entries
        raw_weights = {m: weights[m] for m in metric_list}

    wsum = sum(raw_weights.values())
    if wsum == 0:
        raise ValueError("Metric weights sum to zero; cannot normalise them.")
    weights = {m: raw_weights[m] / wsum for m in metric_list}

    # align draws
    S = min(mu_draws_by_metric[m].shape[0] for m in metric_list)
    G = len(group_names)

    composite = np.zeros((S, G))
    for m in metric_list:
        mu = mu_draws_by_metric[m][:S, :]
        if mu.shape[1] != G:
            raise ValueError(
                f"Draws for metric {m!r} have {mu.shape[1]} columns but {G} group names were given."
            )
        score = mu if higher_is_better_by_metric[m] else -mu

        # standardize to avoid scale dominance (epsilon guards against a zero SD)
        sd = score.std() + 1e-12
        z = score / sd

        composite += weights[m] * z

    overall_rank_table = rank_probabilities_from_draws(composite, group_names, higher_better=True)

    return composite, overall_rank_table, weights


In [None]:

# For each metric:
#  1) extract per-run "best checkpoint" values for 4 combinations
#  2) build dataframe with factors (group/dataset/arch)
#  3) plot distribution
#  4) fit:
#     - hierarchical group model (ranking)
#     - factorial model (effects + Savage–Dickey BF10)
#  5) store posterior draws for overall multi-metric ranking

# One ipywidgets Output per metric is collected here and shown as a tab at the end.
tabs = []
tab_titles = []

# store posterior group-mean draws per metric for overall ranking
# (keys are the "val_<metric>" column names used inside the loop)
mu_draws_by_metric: Dict[str, np.ndarray] = {}
hib_by_metric: Dict[str, bool] = {}
# Fixed category order for the "group" column; the same list is later passed
# as group_names to the overall ranking.
group_order = ["U-Net | Aerial", "Swin U-Net | Aerial", "U-Net | PS", "Swin U-Net | PS"]  # consistent order

for metric in metrics:
    # always analyse the validation column of the metric
    full_metric = 'val_' + metric if not metric.startswith('val_') else metric
    out = widgets.Output()

    # everything printed / displayed / plotted inside this context is captured
    # by the metric's Output widget
    with out:
        clear_output(wait=True)
        print(f'Processing: {full_metric} at point of best loss\n')

        # per-run values at best checkpoint
        unet_macs_best = get_best_metric_at_min_val_loss(unet_macs, unet_macs_metric_lookup, full_metric)
        swin_macs_best = get_best_metric_at_min_val_loss(swin_macs, swin_macs_metric_lookup, full_metric)
        unet_ps_best   = get_best_metric_at_min_val_loss(unet_ps,   unet_ps_metric_lookup,   full_metric)
        swin_ps_best   = get_best_metric_at_min_val_loss(swin_ps,   swin_ps_metric_lookup,   full_metric)

        # long dataframe with factorial coding: one row per run, with the
        # combined group label plus the two separate factors
        # (the MACS-trained runs carry the dataset label "Aerial")
        data = pd.concat([
            pd.DataFrame({full_metric: unet_macs_best, "group": "U-Net | Aerial",      "dataset": "Aerial", "arch": "U-Net"}),
            pd.DataFrame({full_metric: swin_macs_best, "group": "Swin U-Net | Aerial", "dataset": "Aerial", "arch": "Swin"}),
            pd.DataFrame({full_metric: unet_ps_best,   "group": "U-Net | PS",          "dataset": "PS",     "arch": "U-Net"}),
            pd.DataFrame({full_metric: swin_ps_best,   "group": "Swin U-Net | PS",     "dataset": "PS",     "arch": "Swin"}),
        ], ignore_index=True)

        # enforce group order
        data["group"] = pd.Categorical(data["group"], categories=group_order, ordered=True)

        # visualize run-to-run distribution
        sns.kdeplot(data=data, x=full_metric, hue="group")
        plt.title(f"Distribution across runs: {full_metric}")
        plt.show()

        # 1) hierarchical group model -> rank probabilities
        idata_g, perf_table, rank_table, mu_draws, scale_name = fit_group_model_with_ranking(
            data, group_col="group", metric_col=full_metric
        )

        print("\n--- Hierarchical group model (group means on original scale) ---")
        print(f"Modelled on {scale_name} scale; reporting mu_orig on original scale.\n")
        display(perf_table)

        # store for overall ranking
        mu_draws_by_metric[full_metric] = mu_draws
        hib_by_metric[full_metric] = higher_is_better(full_metric)

        # 2) factorial model -> effect size + uncertainty + BF10 (Savage–Dickey)
        idata_f, factorial_report = fit_factorial_model_with_bf(
            data, dataset_col="dataset", arch_col="arch", metric_col=full_metric
        )

        print("\n--- Factorial effects (transformed scale) + Savage–Dickey BF10 ---")
        display(factorial_report)

    tabs.append(out)
    tab_titles.append(full_metric)

# assemble the captured outputs into a tab widget, one tab per metric
tab_widget = widgets.Tab(children=tabs)
for i, title in enumerate(tab_titles):
    tab_widget.set_title(i, title)

display(tab_widget)


In [None]:
# This produces a single posterior ranking of the 4 combinations "overall".
# IMPORTANT: "overall" depends on the utility you define (weights + standardization).
# Default: equal weights.

# Pool the per-metric posterior draws into one composite score per combination
# and rank the combinations on it.
composite_draws, overall_rank_table, used_weights = overall_ranking_across_metrics(
    mu_draws_by_metric,
    hib_by_metric,
    group_order,
    weights=metric_weights,
)

# banner
print(
    "\n===============================",
    "OVERALL MULTI-METRIC RANKING",
    "===============================\n",
    sep="\n",
)

display(overall_rank_table)

# normalised weights that were actually applied
print("\nWeights used:", used_weights, sep="\n")

# reading guide for the table above
print(
    "\nInterpretation:",
    "- Pr(rank=1) is the posterior probability a combination is best overall under this utility.",
    "- E[rank] is the expected rank (lower = better).",
    "- If Pr(rank=1) is not dominant and ranks are spread, the 'overall winner' is uncertain.",
    sep="\n",
)


In [None]:

def interpret_bf10(bf10: float) -> str:
    """
    Translate a Bayes factor BF10 into a descriptive evidence category.

    Common descriptive categories (Jeffreys-style heuristics). Values below 1
    favour H0 (no effect), values of 1 or more favour H1 (effect).

    Args:
        bf10: Bayes factor for H1 over H0; ``inf`` is accepted.
    Returns:
        A short description of the strength of evidence. NaN and negative
        inputs are not valid Bayes factors and are reported as undefined
        instead of being assigned an evidence category.
    """
    # NaN fails every "<" comparison below and would otherwise fall through to
    # the last category ("extreme evidence for H1"); negatives would be read
    # as strong evidence for H0.
    if np.isnan(bf10) or bf10 < 0:
        return "undefined (BF10 must be a non-negative number)"

    # (exclusive upper bound, label), checked in ascending order
    categories = (
        (1/10, "strong evidence for H0 (no effect)"),
        (1/3, "moderate evidence for H0"),
        (1, "anecdotal evidence for H0"),
        (3, "anecdotal evidence for H1 (effect)"),
        (10, "moderate evidence for H1"),
        (30, "strong evidence for H1"),
        (100, "very strong evidence for H1"),
    )
    for upper_bound, label in categories:
        if bf10 < upper_bound:
            return label
    return "extreme evidence for H1"


# Example usage:
# interpret_bf10(12.5)
