- `conda activate mri`
  - (created in `0_setup.ipynb`)

---

- `jupyter lab` => open this file

---

- Selected Jupyter kernel (`ms_classification`)
  - (created in `0_setup.ipynb`)

---

---

# Imports

In [1]:
import math, numpy as np, pandas as pd
from scipy.stats import betabinom, binomtest, chi2

---

# Read in Conformal Prediction Result

In [2]:
df_combined = pd.read_pickle('x4_cal-test_combos__100x_cp__per_variant_test_data__cp_instance_col.pkl')
counts_df = pd.read_csv('x4_cal-test_combos__100x_cp__per_variant_test_data__ms_vs_healthy_scan_cnt_per_config_run.csv')

---

---

---

# Coverage in Conformal Prediction

In the context of **Conformal Prediction (CP)**, **coverage** refers to the proportion of predictions where the *true label* is included within the prediction set produced by the conformal predictor. It directly measures how often the method successfully contains the correct answer.

---

## üìê Formal Definition

Let:
- $ Y_i $: the true label for instance $ i $,
- $ \Gamma_i $: the prediction set for instance  $i$.

Then, **coverage** is calculated as:

$$
\text{Coverage} = \frac{1}{N} \sum_{i=1}^N \mathbf{1}\{ Y_i \in \Gamma_i \}
$$

Where:
- $ N $ is the total number of test instances.
- $ \mathbf{1}\{\cdot\} $ is the indicator function, equal to 1 if the condition is true and 0 otherwise.

---

## üìñ Context in Conformal Prediction

- If a conformal predictor is calibrated at significance level $ \alpha $, it **guarantees coverage of at least $ 1 - \alpha $** on average, assuming exchangeability between calibration and test data.
- **Example**: If $ \alpha = 0.1 $, the prediction sets should include the true label at least **90% of the time**.

---

## üìä Types of Coverage

- **Marginal Coverage**:  
  The overall coverage across the entire population or dataset.

- **Conditional Coverage**:  
  Coverage conditioned on specific subgroups or covariates (e.g., per-class or per-region coverage).

- **Empirical Coverage**:  
  The actual observed coverage on a finite test set, used to assess whether the theoretical guarantee holds in practice.

---

---


# Beta‚ÄìBinomial Distribution: Cumulative Probability of Covered Counts


- Distribution ‚áí Probability of Observing ‚â§ a Given # of Covered Points



---

---

## Formal Definition:

---

Formally, let  
$$
C_j = \#\{\text{covered points in one conformal split}\}
$$  

with    

$$
C_j \sim \text{Beta‚Äìbinomial}(n_{\text{val}},\; a=n_{\text{cal}}+1-\lceil(n_{\text{cal}}+1)\alpha\rceil,\; b=\lceil(n_{\text{cal}}+1)\alpha\rceil)
$$  

under the null hypothesis that calibration and test sets are exchangeable with true miscoverage rate $\alpha$. Then, for an observed count $k_{\text{obs}}$, the **p-value** is:

$$
\text{p\_value} = \Pr(C_j \le k_{\text{obs}}) = \sum_{i=0}^{k_{\text{obs}}} \binom{n_{\text{val}}}{i} \cdot \frac{B(i + a,\; n_{\text{val}} - i + b)}{B(a, b)}.
$$

It represents the probability‚Äîunder the exact Beta‚Äìbinomial distribution implied by conformal prediction assumptions‚Äîof observing **no more than** $k_{\text{obs}}$ covered points in a single calibration/test split.


---

## Python Implementation:

---

In [3]:
# ------------------------------------------------------------------
# per‚Äërun one‚Äësided p‚Äëvalue (Beta‚Äìbinomial null, Angelopoulos‚ÄØ&‚ÄØBates (2021, ¬ßC))
# ------------------------------------------------------------------
def beta_binom_p(n_cal: int, n_val: int, alpha: float, k_cov: int) -> float:
    """One‚Äësided Beta‚Äìbinomial CDF  P(C <= k_cov)."""
    l = math.ceil((n_cal + 1) * alpha)
    a, b = (n_cal + 1) - l, l
    return betabinom.cdf(k_cov, n_val, a, b)


---

## Inuitive Explanation:

---

The **p-value** tells us how surprising our result is, assuming everything went as it should.

In this case, it‚Äôs the chance of getting **as few or fewer correct predictions** as we actually saw, under the assumption that:

- The calibration and test data are similar (exchangeable),
- The conformal method was run correctly at a target error rate of $\alpha$.

If this p-value is very small, it means the result (e.g., low coverage) is unusually bad‚Äîeven under ideal conditions‚Äîand **suggests something might be wrong**.

(e.g., maybe the test set is too different from the calibration data, or the conformal method wasn't applied properly)


---

# Calculate Coverage Guarantee Statistics

(per experiment run & per experimental setup)

---

## Definitions

In [4]:
# ------------------------------------------------------------------
# ‚àí2 Œ£ ln p  ~  œá¬≤_{2R};  returns p‚Äëvalue of Fisher‚Äôs global test
# ------------------------------------------------------------------
def fisher_p(pvec, eps: float = 1e-16) -> float:
    """Fisher‚Äôs global p‚Äëvalue for an array‚Äëlike of p‚Äëvalues."""
    stat = -2.0 * np.sum(np.log(np.clip(pvec, eps, None)))
    return 1.0 - chi2.cdf(stat, 2 * len(pvec))

# ---------------------------------------------------------------------
# 0.  helper shared by both pathways
# ---------------------------------------------------------------------
def _summarise_p_series(p, sig_level):
    """Return a dict of summary stats for a Series of p‚Äëvalues."""
    return {
        "median_p" : p.median(),
        "q25_p"    : p.quantile(0.25),
        "q75_p"    : p.quantile(0.75),
        "prop_sig" : (p < sig_level).mean(),
        "binom_p"  : binomtest((p < sig_level).sum(),
                               n=len(p),
                               p=sig_level,
                               alternative="greater").pvalue,
        "fisher_p" : fisher_p(p.to_numpy())
    }

# ---------------------------------------------------------------------
# 1¬†¬†global (marginal) conformal ‚Äî¬†class_conditional == False
# ---------------------------------------------------------------------
def assess_results_global(
        cp_df:       pd.DataFrame,
        slices_per_scan: int,
        alpha:       float = 0.10,
        sig_level:   float = 0.05) -> tuple[pd.DataFrame, pd.DataFrame]:
    """
    Compute per‚Äërun and aggregated coverage statistics for **non**‚Äëclass‚Äë
    conditional conformal prediction.

    *cp_df* must contain only rows where `class_conditional == False`.
    """
    assert (cp_df.class_conditional == False).all(), \
        "Input df must contain ONLY class_conditional == False rows."

    # ---------------- per‚Äërun coverage & p‚Äëvalue ---------------------
    runs = (cp_df
            .groupby(["variant_test_data", "cal_test", "run"])["verdict"]
            .agg(n_cov="sum", n_val="count")
            .reset_index())
    runs["coverage"] = runs.n_cov / runs.n_val
    runs["p_value"]  = runs.apply(
        lambda r: beta_binom_p(42 * slices_per_scan, # 42 cal scans
                               int(r.n_val),
                               alpha,
                               int(r.n_cov)),
        axis=1)

    # ---------------- across‚Äëruns summary ---------------------------
    summary = (runs
               .groupby(["variant_test_data", "cal_test"])
               .p_value.apply(lambda s: pd.Series(
                   _summarise_p_series(s, sig_level)))
               .reset_index())

    return runs, summary

# ---------------------------------------------------------------------
# 2¬†¬†class‚Äëconditional conformal ‚Äî¬†class_conditional == True
# ---------------------------------------------------------------------
def assess_results_class_cond(
        cp_df:       pd.DataFrame,
        counts_df:   pd.DataFrame,
        slices_per_scan: int,
        alpha:       float = 0.10,
        sig_level:   float = 0.05,
    ) -> tuple[pd.DataFrame, pd.DataFrame]:
    """
    Per‚Äëclass coverage statistics for **class‚Äëconditional** conformal
    prediction.  The returned *per_class* dataframe has one row per
    (variant, cal_test, run, true class).

    *cp_df* must contain only rows where `class_conditional == True`.
    """
    assert (cp_df.class_conditional == True).all(), \
        "Input df must contain ONLY class_conditional == True rows."

    # ---- per‚Äëclass hit counts --------------------------------------
    per_class = (cp_df
                 .groupby(["variant_test_data",
                           "cal_test",
                           "run",
                           "class"])["verdict"]
                 .agg(n_cov="sum", n_val="count")
                 .reset_index())
    
    # ---- per‚Äëclass coverage ----------------------------------------
    per_class['coverage'] = per_class.n_cov / per_class.n_val
    
    # ---- attach cal and test scan counts (‚Üí slice counts) ----------
    calib = (counts_df[["cal_test", "run",
                        "cal_num_ms_scans",  "cal_num_healthy_scans",
                        "test_num_ms_scans", "test_num_healthy_scans"]]
             .rename(columns={"cal_num_ms_scans"      : "n_cal_1_scan",
                              "cal_num_healthy_scans" : "n_cal_0_scan",
                              "test_num_ms_scans"     : "n_test_1_scan",
                              "test_num_healthy_scans": "n_test_0_scan"}))

    per_class = per_class.merge(calib, on=["cal_test", "run"], how="left")
    per_class["n_cal_cls"] = np.where(
        per_class["class"] == 1,
        per_class.n_cal_1_scan * slices_per_scan,
        per_class.n_cal_0_scan * slices_per_scan)
    per_class['n_test_cls'] = np.where(
        per_class['class'] == 1,
        per_class.n_test_1_scan * slices_per_scan,
        per_class.n_test_0_scan * slices_per_scan)
    per_class = per_class.drop(columns=['n_cal_1_scan', 'n_cal_0_scan', 
                                        'n_test_1_scan', 'n_test_0_scan'])
    
    # ---- compute class‚Äëspecific p‚Äëvalues ---------------------------
    per_class["p_value"] = per_class.apply(
        lambda r: beta_binom_p(int(r.n_cal_cls),
                               int(r.n_val),
                               alpha,
                               int(r.n_cov)),
        axis=1)

    # ---- aggregated summary ---------------------------------------
    summary = (per_class
               .groupby(["variant_test_data",
                         "cal_test",
                         "class"])
               .p_value.apply(lambda s: pd.Series(
                   _summarise_p_series(s, sig_level)))
               .reset_index()
               .rename(columns={'level_2': 'statistic'})
               .rename(columns={'level_3': 'statistic'}))

    return per_class, summary

---

## Limitations and `slices_per_scan` Considerations

---

###‚ÄØWhy slice data from the same scan may not be exchangeable

All 43 slices from the same MRI share the same anatomy, scanner coil, motion artefacts, etc.  


That can induce a cluster correlation: permuting slice‚ÄØ7 of Scan‚ÄØA with slice‚ÄØ7 of Scan‚ÄØB may change the joint distribution in a way a true exchangeable process would not.
Hence slice‚Äëlevel exchangeability could be violated.

Consequences:

- If we plug the full slice count  
ùëõ<sub>cal</sub> = 42 √ó 43  
into the Beta‚Äëbinomial CDF, we are pretending to have 1‚ÄØ806 i.i.d. calibration draws.  
The resulting p‚Äëvalue is __anti‚Äëconservative__ (too small): we flag coverage failures more often than the nominal level.

###‚ÄØUsing the scan count instead

Setting ùëõ<sub>cal</sub> = 42 is a pragmatic fix: we treat each scan as one effective calibration draw.

- Pro:
  - It is invariably more conservative than slice‚Äëlevel counting (p‚Äëvalues become larger).
  - The test now protects against the gross under‚Äëcoverage that would still be visible at scan level.
  
  
- Con:
  - It is not exact either: scans have unequal numbers of hard‚Äëto‚Äësegment slices; MS and healthy scans generate very different slice distributions.

So the scan‚Äëlevel p‚Äëvalue is best viewed as a conservative heuristic, not a finite‚Äësample guarantee.

---

---

---

## Peform Calculations

---

Treat each slice as a calibration point

In [6]:
cc_mask = (df_combined.class_conditional==True)

df_runs_slc_lvl, df_summary_slc_lvl = \
    assess_results_global(df_combined[~cc_mask], 
                          slices_per_scan=43)

df_runs_slc_lvl_cc, df_summary_slc_lvl_cc = \
    assess_results_class_cond(df_combined[cc_mask], 
                              counts_df, 
                              slices_per_scan=43)

---

Treat each scan as a calibration point (more conservative)

In [7]:
cc_mask = (df_combined.class_conditional==True)

df_runs_scn_lvl, df_summary_scn_lvl = \
    assess_results_global(df_combined[~cc_mask], 
                          slices_per_scan=1)

df_runs_scn_lvl_cc, df_summary_scn_lvl_cc = \
    assess_results_class_cond(df_combined[cc_mask], 
                              counts_df, 
                              slices_per_scan=1)

---

## Write Files

In [8]:
df_runs_slc_lvl.to_csv(      'conformal_coverage_guarantees__runs__slice_level.csv', index=False)
df_runs_slc_lvl_cc.to_csv(   'conformal_coverage_guarantees__runs__slice_level__class_conditional.csv', index=False)
df_summary_slc_lvl.to_csv(   'conformal_coverage_guarantees__summary__slice_level.csv', index=False)
df_summary_slc_lvl_cc.to_csv('conformal_coverage_guarantees__summary__slice_level__class_conditional.csv', index=False)
df_runs_scn_lvl.to_csv(      'conformal_coverage_guarantees__runs__scan_level.csv', index=False)
df_runs_scn_lvl_cc.to_csv(   'conformal_coverage_guarantees__runs__scan_level__class_conditional.csv', index=False)
df_summary_scn_lvl.to_csv(   'conformal_coverage_guarantees__summary__scan_level.csv', index=False)
df_summary_scn_lvl_cc.to_csv('conformal_coverage_guarantees__summary__scan_level__class_conditional.csv', index=False)

---

## Read Files

In [None]:
df_runs_slc_lvl = pd.read_csv(      'conformal_coverage_guarantees__runs__slice_level.csv')
df_runs_slc_lvl_cc = pd.read_csv(   'conformal_coverage_guarantees__runs__slice_level__class_conditional.csv')
df_summary_slc_lvl = pd.read_csv(   'conformal_coverage_guarantees__summary__slice_level.csv')
df_summary_slc_lvl_cc = pd.read_csv('conformal_coverage_guarantees__summary__slice_level__class_conditional.csv')
df_runs_scn_lvl = pd.read_csv(      'conformal_coverage_guarantees__runs__scan_level.csv')
df_runs_scn_lvl_cc = pd.read_csv(   'conformal_coverage_guarantees__runs__scan_level__class_conditional.csv')
df_summary_scn_lvl = pd.read_csv(   'conformal_coverage_guarantees__summary__scan_level.csv')
df_summary_scn_lvl_cc = pd.read_csv('conformal_coverage_guarantees__summary__scan_level__class_conditional.csv')

---

---

---

---

---