# Ultrametric Rank Correlation: Patient Comparison

Compute ultrametric rank-correlation distances (Spearman, Kendall) between every pair of phases, for each patient and each EEG band.
For each band and method, the plots show the `1 - correlation` phase-distance matrices of all patients side by side.


In [None]:
%matplotlib inline
# Core imports
import numpy as np
import matplotlib.pyplot as plt
from pathlib import Path

# Project helpers
from lrgsglib import *
move_to_rootf(pathname='lrg_eegfc')
from lrg_eegfc import *

# Specific utilities
from lrgsglib.utils.lrg.infocomm import extract_ultrametric_matrix
from lrg_eegfc.utils.corrmat import process_network_for_phase
from lrg_eegfc.utils.datamanag.loaders import load_data_dict
from lrg_eegfc.config.const import PHASE_LABELS, BRAIN_BANDS, PATIENTS_LIST, BRAIN_BAND_LABELS

# Directory that receives every figure written by this notebook
# (created together with any missing parents).
path_figs = Path('data', 'figures', 'new_ultrametric_rank_correlation_pat_comparison')
path_figs.mkdir(parents=True, exist_ok=True)

# Plain-list copies of the project constants, iterated repeatedly below.
patients = list(PATIENTS_LIST)
phase_labels = list(PHASE_LABELS)
bands = list(BRAIN_BANDS.keys())

# Correlation network protocol (consistent with other notebooks)
correlation_protocol = {'threshold': 0, 'spectral_cleaning': False}

# Echo the analysis grid so the run is self-describing.
print(
    f'Patients: {patients}',
    f'Phases: {phase_labels}',
    f'Bands: {bands}',
    sep='\n',
)

from lrgsglib.utils.basic.linalg import ultrametric_rank_correlation


## Load Data

In [None]:
# Load the dataset once for the whole notebook. `data_dict` is indexed below as
# data_dict[patient][phase]; `int_label_map` as int_label_map[patient]['label'].
(data_dict, int_label_map) = load_data_dict()
print('✓ Data loaded')


## Build Ultrametric Matrices per Patient, Phase, Band

In [None]:
def compute_structures_for_patient(patient: str, filter_order: int = 1):
    """Collect ultrametric, linkage and condensed-distance data for one patient.

    For every phase in ``phase_labels`` and every band in ``bands`` the
    patient's recording is passed to ``process_network_for_phase`` using the
    notebook-wide ``correlation_protocol``.

    Parameters
    ----------
    patient : str
        Key into ``data_dict`` and ``int_label_map``.
    filter_order : int, default 1
        Forwarded unchanged to ``process_network_for_phase``.

    Returns
    -------
    tuple of (dict, dict, dict)
        Three ``{band: {phase: value}}`` mappings, in this order:

        * ultrametric matrix from ``extract_ultrametric_matrix``, or ``None``
          if the extraction raised or no network/linkage was produced;
        * the linkage returned by ``process_network_for_phase``, or ``None``
          if no network/linkage was produced;
        * ``dists['condensed']`` when ``dists`` is a dict, otherwise ``None``.
    """
    ultrametrics = {band: {} for band in bands}
    linkages = {band: {} for band in bands}
    condensed = {band: {} for band in bands}

    pin_labels = int_label_map[patient]['label']

    for phase in phase_labels:
        recording = data_dict[patient][phase]
        timeseries = recording['data']

        # The sampling rate may be stored as a scalar or wrapped in an array;
        # take its first element, falling back to 1000.0 when it is absent.
        fs_raw = recording.get('fs', None)
        if fs_raw is None:
            fs = 1000.0
        else:
            fs = float(np.asarray(fs_raw).flat[0])

        for band in bands:
            G, _label_dict, lnkgM, _cl_th, _corr_mat, dists = process_network_for_phase(
                timeseries,
                fs,
                band,
                correlation_protocol,
                pin_labels,
                filter_order=filter_order,
            )

            # Without both a graph and a linkage there is nothing to store.
            if G is None or lnkgM is None:
                ultrametrics[band][phase] = None
                linkages[band][phase] = None
                condensed[band][phase] = None
                continue

            # Best effort: a failed extraction is reported and recorded as None,
            # while the linkage and condensed distances are still kept.
            try:
                ultra = extract_ultrametric_matrix(lnkgM, G.number_of_nodes())
            except Exception as e:
                print(f'[WARN] {patient} {phase} {band}: failed to extract ultrametric ({e})')
                ultra = None

            ultrametrics[band][phase] = ultra
            linkages[band][phase] = lnkgM
            condensed[band][phase] = (
                dists.get('condensed') if isinstance(dists, dict) else None
            )

    return ultrametrics, linkages, condensed

# Per-patient containers, each keyed as [patient][band][phase].
ultra_by_pat, linkage_by_pat, condensed_by_pat = {}, {}, {}

for pat in patients:
    print(f'Computing ultrametric structures for {pat} ...')
    U, Z, D = compute_structures_for_patient(pat)
    ultra_by_pat[pat], linkage_by_pat[pat], condensed_by_pat[pat] = U, Z, D

print('✓ Ultrametric structures computed for all patients')


## Phase Distance Matrices per Patient, Band, Method

In [None]:
# results[method][band][patient] -> (n_phases x n_phases) matrix of
# 1 - rank correlation between the ultrametric matrices of two phases.
methods = ['spearman', 'kendall']
results = {m: {b: {} for b in bands} for m in methods}
n = len(phase_labels)

for method in methods:
    print(f'Method: {method}')
    for band in bands:
        for pat in patients:
            ultra_by_phase = ultra_by_pat[pat][band]

            # Start from all-NaN (marks "not comparable") with a zero diagonal.
            dm = np.full((n, n), np.nan, dtype=float)
            np.fill_diagonal(dm, 0.0)

            for i, phase_a in enumerate(phase_labels):
                ultra_a = ultra_by_phase.get(phase_a)
                for j, phase_b in enumerate(phase_labels):
                    if i == j:
                        continue
                    ultra_b = ultra_by_phase.get(phase_b)
                    # A pair is comparable only when both ultrametric matrices
                    # exist and have the same shape; otherwise the cell stays NaN.
                    comparable = (
                        ultra_a is not None
                        and ultra_b is not None
                        and ultra_a.shape == ultra_b.shape
                    )
                    if comparable:
                        dm[i, j] = 1.0 - ultrametric_rank_correlation(
                            ultra_a, ultra_b, method=method
                        )

            results[method][band][pat] = dm
    print('  ✓ done')

print('✓ All phase-distance matrices computed')


## Plot: Side-by-Side Patients per Band and Method

In [None]:
def plot_band_method_side_by_side(method: str, band: str):
    """Plot the phase-distance matrices of all patients in one row and save it.

    One heat-map panel is drawn per entry of ``patients`` from
    ``results[method][band]``. All panels share a colour scale running from 0
    to the largest finite value found across patients; NaN cells are shown in
    light gray. The figure is written to ``path_figs / method / f'{band}.png'``
    at 200 dpi and then displayed.

    If the largest finite value across patients is 0, a ``[SKIP]`` message is
    printed and no figure is created.

    Parameters
    ----------
    method : str
        Key into ``results``; also used for the colour-bar label and the
        output sub-directory name.
    band : str
        Key into ``results[method]``; also used as the output file stem.
    """
    # Shared upper colour limit: the largest finite entry over all patients,
    # never below 0.0.
    peaks = [0.0]
    for pat in patients:
        mat = results[method][band].get(pat)
        if mat is None or not np.isfinite(mat).any():
            continue
        peaks.append(np.nanmax(mat))
    vmax = max(peaks)

    if vmax == 0.0:
        print(f'[SKIP] No finite values for {method} / {band}')
        return

    n_panels = len(patients)
    fig, axes = plt.subplots(
        1,
        n_panels,
        figsize=(4*n_panels+2, 4),
        constrained_layout=True,
    )
    # With a single panel, subplots returns a bare Axes; wrap it so it can be zipped.
    if n_panels == 1:
        axes = [axes]

    # Work on a copy so the "bad value" colour does not alter the shared colormap.
    cmap = plt.cm.viridis.copy()
    cmap.set_bad(color='lightgray')

    tick_positions = range(len(phase_labels))
    for panel, pat in zip(axes, patients):
        im = panel.imshow(
            results[method][band][pat],
            vmin=0.0,
            vmax=vmax,
            cmap=cmap,
            aspect='equal',
        )
        panel.set_title(f"{pat}")
        panel.set_xticks(tick_positions)
        panel.set_yticks(tick_positions)
        panel.set_xticklabels(phase_labels, rotation=45, ha='right')
        panel.set_yticklabels(phase_labels)
        for spine in panel.spines.values():
            spine.set_visible(False)

    # All panels use the same vmin/vmax, so the last image can drive the colour bar.
    cbar = fig.colorbar(im, ax=axes, shrink=0.85)
    cbar.set_label(f'1 - {method} correlation')

    outdir = path_figs / method
    outdir.mkdir(parents=True, exist_ok=True)
    outfile = outdir / f'{band}.png'
    fig.savefig(outfile, dpi=200, bbox_inches='tight')
    plt.show()
    print(f'Saved: {outfile}')

# One figure per (method, band) combination, methods in the outer order.
for method, band in ((m, b) for m in methods for b in bands):
    plot_band_method_side_by_side(method, band)

print('✓ All figures generated')
