Network Control Theory _ Alignment & Control Energy

In [None]:
# # sEEG ↔ Centroidi (B0/EPI/MNI) — Verifica, Allineamento, Control Set, Control Energy
# Pipeline completa:
# 1) Caricamento e QC (bounding box, distanze)
# 2) Verifica midpoint bipolari
# 3) Allineamento (min–max affine + ICP rigido) se necessario
# 4) Costruzione control set (k-NN su B0)
# 5) QC distanze + bundle Excel
# 6) Control Energy (Procedure 1) per banda + salvataggi 
# 7) Post-analisi ed estrazione principali controller per banda

In [76]:
# ## IMPORT LIBRERIE (paper) & PATH BASE
# --- standard/core ---
import os
from pathlib import Path

import numpy as np
import pandas as pd

import scipy as sp
from scipy.io import loadmat
from scipy import stats
from scipy.spatial import distance
from sklearn.cluster import KMeans
from tqdm import tqdm

# opzionali / velocità / grafica: scikit-learn per KDTree 
# (altrimenti fallback NumPy)
try:
    from sklearn.neighbors import KDTree
    HAVE_SKLEARN = True
except Exception:
    HAVE_SKLEARN = False

# --- plotting ---
import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt
plt.rcParams.update({"font.size": 10})
plt.rcParams["svg.fonttype"] = "none"
try:
    import seaborn as sns
except Exception:
    sns = None

# --- neuroimaging ---
try:
    from nilearn import datasets, plotting  # richiede nibabel
except Exception:
    datasets = plotting = None
# carica atlanti/surface e plot cerebrali.

# --- nctpy (dal toolkit del paper, Procedure 1) ---
from nctpy.energies import integrate_u, get_control_inputs
from nctpy.pipelines import ComputeControlEnergy, ComputeOptimizedControlEnergy
from nctpy.metrics import ave_control
from nctpy.utils import (
    matrix_normalization,
    convert_states_str2int,
    normalize_state,
    normalize_weights,
    get_null_p,
    get_fdr_p,
)

from nctpy.plotting import (
    roi_to_vtx,
    null_plot,
    surface_plot,
    add_module_lines,  
)
# ATTENZIONE: null models è un modulo interno al repository degli autori, non un pacchetto PyPI “standard”. 
# Posso scegliere un strada alternativa richiamando surrogati equivalenti da librerie note.

# Path base
try:
    ROOT = Path(__file__).resolve().parent
except NameError:
    # Notebook / VS Code Interactive
    try:
        from IPython import get_ipython
        ip = get_ipython()
        # current working dir of the kernel/session
        ROOT = Path(ip.run_line_magic('pwd', ''))
    except Exception:
        ROOT = Path.cwd()

BASE = ROOT / "NCT_analysis\data"

B0_TXT    = BASE / "centroidsB0_850ROIs.txt"
EPI_TXT   = BASE / "centroidsEPI_850ROIs.txt"
MNI_XLSX  = BASE / "centroidsMNI2mm_850ROIs.xlsx"
SEEG_MONO = BASE / "sEEG_3D_contacts_locs.mat"
SEEG_BI   = BASE / "sEEG_3D_BIPOLARcontacts_locs.mat"

# Cartella per gli output
OUT = ROOT / "output"
OUT.mkdir(exist_ok=True)

# Controllo dei file:
for p in [B0_TXT, EPI_TXT, MNI_XLSX, SEEG_MONO, SEEG_BI]:
    assert p.exists(), f"File mancante: {p}"
print("Tutti i file trovati ✅")

Tutti i file trovati ✅


In [77]:
# ## a) Loader & Utilità geometriche
def _check_xyz_shape(xyz: np.ndarray, name: str):
    if not (isinstance(xyz, np.ndarray) and xyz.ndim == 2 and xyz.shape[1] == 3 and xyz.shape[0] > 0):
        raise ValueError(f"{name}: atteso (N,3) con N>0, trovato {getattr(xyz, 'shape', None)}")

def _to_float_series(s: pd.Series):
    # Supporta e prova a convertire numeri come stringhe con possibile virgola decimale
    return pd.to_numeric(s.astype(str).str.replace(",", ".", regex=False), errors="coerce")

def load_centroids_txt(path: Path):
    """
    Legge i centroidi da TXT con righe tipo: id x y z 
    Accetta spazi/tab/virgole, salta righe vuote o commenti (#).
    Ritorna: ids (N,), xyz (N,3)
    """
    df = pd.read_csv(
        path,
        sep=r"[\s,;]+",
        engine="python",
        header=None,
        comment="#",
        skip_blank_lines=True
    )
    # Converto tutto in numerico e rimuovo colonne/righe interamente vuote (pulizia)
    for c in df.columns:
        df[c] = pd.to_numeric(df[c], errors="coerce")
    df = df.dropna(axis=1, how="all").dropna(how="any")
    if df.shape[1] < 4:
        raise ValueError(f"{path} non ha 4 colonne numeriche (id x y z). Letto shape={df.shape}")
    df = df.iloc[:, :4]
    ids = df.iloc[:, 0].astype(int).to_numpy()
    xyz = df.iloc[:, 1:4].astype(float).to_numpy()
    _check_xyz_shape(xyz, f"Centroidi TXT: {path.name}")
    return ids, xyz

def load_mni_centroids_excel(path: Path):
    """
    Legge centroidi MNI da Excel; cerca colonne 'R/L', 'A/P', 'S/I' o 
    fallback su ultime 3 colonne numeriche.
    Ritorna xyz (N,3)
    """
    df = pd.read_excel(path)
    cols = {c.lower(): c for c in df.columns}
    xcol = cols.get("r/l") or cols.get("x")
    ycol = cols.get("a/p") or cols.get("y")
    zcol = cols.get("s/i") or cols.get("z")
    if not (xcol and ycol and zcol):
        # fallback: prendi le ultime 3 colonne numeriche
        num_cols = [c for c in df.columns if pd.api.types.is_numeric_dtype(df[c])]
        if len(num_cols) < 3:
            raise ValueError(f"Impossibile trovare 3 colonne numeriche in {path}")
        xcol, ycol, zcol = num_cols[-3:]
    xyz = df[[xcol, ycol, zcol]].astype(float).to_numpy()
    _check_xyz_shape(xyz, f"Centroidi MNI: {path.name}")
    return xyz

def parse_contacts_mat(path: Path, preferred_key: str | None = None) -> pd.DataFrame:
    """
    Carica contatti da .mat in modo robusto.
    - Se preferred_key è fornita, la usa (es. 'sEEG_3D_contacts_locs').
    - Altrimenti cerca la prima chiave non 'dunder' con tabella NxK.
    Seleziona le 3 colonne più 'numericabili' come x,y,z e usa la prima non-numerica 
    come label (se esiste).
    Ritorna DataFrame con colonne: label, x, y, z
    """
    data = loadmat(path, squeeze_me=True, struct_as_record=False)
    keys = [k for k in data.keys() if not k.startswith("__")]
    key = preferred_key if (preferred_key in keys) else (keys[0] if keys else None)
    if key is None:
        raise ValueError(f"Nessuna variabile utile trovata in {path}")
    arr = data[key]
    df = pd.DataFrame(arr)

    # valuto quali colonne possono diventare numeriche
    numeric_score = []
    for c in df.columns:
        conv = _to_float_series(df[c])
        numeric_score.append((conv.notna().sum(), c))
    numeric_score.sort(reverse=True)
    num_cols = [c for _, c in numeric_score[:3]]
    xyz = pd.concat([_to_float_series(df[c]) for c in num_cols], axis=1).to_numpy(dtype=float)

    # etichetta = prima colonna non numerica rimanente (se esiste)
    label_cols = [c for c in df.columns if c not in num_cols]
    if label_cols:
        labels = df[label_cols[0]].astype(str).to_numpy()
    else:
        labels = np.array([f"C{i+1}" for i in range(len(df))], dtype=str)

    # rimuovo righe con NaN
    keep = ~np.isnan(xyz).any(axis=1)
    xyz, labels = xyz[keep], labels[keep]
    _check_xyz_shape(xyz, f"Contatti: {path.name}")
    return pd.DataFrame({"label": labels, "x": xyz[:,0], "y": xyz[:,1], "z": xyz[:,2]})

 # ------------------ Utilità geometriche ----------------- 

def bbox(arr: np.ndarray):
    """Restituisce bounding box e conta punti."""
    arr = np.asarray(arr)
    mins, maxs = arr.min(axis=0), arr.max(axis=0)
    return dict(
        min_x=float(mins[0]), max_x=float(maxs[0]),
        min_y=float(mins[1]), max_y=float(maxs[1]),
        min_z=float(mins[2]), max_z=float(maxs[2]),
        n=int(arr.shape[0])
    )

def nearest_dists(points: np.ndarray, centroids: np.ndarray):
    """Distanza e indice del nearest neighbor di ogni point rispetto a centroids."""
    if HAVE_SKLEARN:
        tree = KDTree(centroids)
        d, idx = tree.query(points, k=1)
        return d.ravel(), idx.ravel()
    # Fallback NumPy
    P = points[:, None, :]           # (N,1,3)
    C = centroids[None, :, :]        # (1,M,3)
    d2 = np.sum((P - C)**2, axis=2)  # (N,M)
    idx = np.argmin(d2, axis=1)
    d = np.sqrt(d2[np.arange(points.shape[0]), idx])
    return d, idx


def two_nearest(points: np.ndarray, cloud: np.ndarray):
    """Per ogni point, trova i 2 vicini più prossimi in cloud. 
    Ritorna (dists (N,2), idx (N,2))."""
    if HAVE_SKLEARN:
        tree = KDTree(cloud)
        d, idx = tree.query(points, k=2)
        return d, idx
    # fallback NumPy
    P = points[:, None, :]
    C = cloud[None, :, :]
    d2 = np.sum((P - C) ** 2, axis=2)      # (N, M)
    idx1 = np.argmin(d2, axis=1)           # (N,)
    # maschera per trovare il 2° vicino
    mask = np.ones_like(d2, dtype=bool)
    mask[np.arange(points.shape[0]), idx1] = False
    d2_masked = np.where(mask, d2, np.inf)
    idx2 = np.argmin(d2_masked, axis=1)    # (N,)
    d1 = np.sqrt(d2[np.arange(points.shape[0]), idx1])
    d2v = np.sqrt(d2[np.arange(points.shape[0]), idx2])
    d = np.stack([d1, d2v], axis=1)
    idx = np.stack([idx1, idx2], axis=1)
    return d, idx


def summary_line(name, d):
    return f"{name}: N={d.size}, median={np.median(d):.3f}, mean={np.mean(d):.3f}, min={np.min(d):.3f}, max={np.max(d):.3f}"


In [78]:
# ## b) Allineamento: min-max affine (pre-asse) + ICP rigido
def minmax_affine(X: np.ndarray, T: np.ndarray):
    """
    Allinea grossolanamente X a T mappando il bounding box per asse:
    Y = X * scale + offset, dove scale/offset sono vettori (3,)
    """
    X = np.asarray(X, float)
    T = np.asarray(T, float)
    minX, maxX = X.min(axis=0), X.max(axis=0)
    minT, maxT = T.min(axis=0), T.max(axis=0)
    scale = (maxT - minT) / (maxX - minX + 1e-12)
    offset = minT - minX * scale
    Y = X * scale + offset
    return Y, scale, offset


def kabsch_rigid(A: np.ndarray, B: np.ndarray):
    """
    Stima trasformazione rigida (R,t) che porta A → B dato un matching punto-a-punto.
    R è 3x3 ortonormale; t è (3,)
    """
    A = np.asarray(A, float); B = np.asarray(B, float)
    cA = A.mean(axis=0); cB = B.mean(axis=0)
    AA = A - cA; BB = B - cB
    H = AA.T @ BB
    U, S, Vt = np.linalg.svd(H)
    R = Vt.T @ U.T
    # gestisci riflessione
    if np.linalg.det(R) < 0:
        Vt[-1, :] *= -1
        R = Vt.T @ U.T
    t = cB - R @ cA
    return R, t


def icp_rigid(X: np.ndarray, T: np.ndarray, max_iter=50, tol=1e-5):
    """
    ICP semplice: stima R,t tali che R*X + t ≈ T.
    Inizializza con identità (si consiglia di passare X già grossolanamente allineato con minmax_affine).
    """
    Xc = X.copy()
    prev_err = None
    for it in range(max_iter):
        # 1) nearest nei target
        d, idx = nearest_dists(Xc, T)
        matched_T = T[idx]
        # 2) kabsch su corrispondenze
        R, t = kabsch_rigid(Xc, matched_T)
        Xc_new = (R @ Xc.T).T + t
        # 3) errore e stop
        err = np.median(np.linalg.norm(Xc_new - matched_T, axis=1))
        Xc = Xc_new
        if prev_err is not None and abs(prev_err - err) < tol:
            break
        prev_err = err
    return Xc, dict(median_err=float(prev_err or err), it=it+1)


Caricamento dati, Quick Check, Verifica spazi, Midpoint bipolari, Allineamento

In [79]:
THR_ALIGN = 10.0  # soglia euristica (mm) per dire "stesso spazio": 
                  # se mediana distanza <= 10 → "ok stesso spazio"

# 1) Caricamento dati
b0_ids, b0_xyz  = load_centroids_txt(B0_TXT)    # recupero anche gli ID (coerenti con l'ordine del connettoma strutturale)
epi_ids, epi_xyz = load_centroids_txt(EPI_TXT)
mni_xyz   = load_mni_centroids_excel(MNI_XLSX)

mono_df = parse_contacts_mat(SEEG_MONO, preferred_key="sEEG_3D_contacts_locs")
bi_df   = parse_contacts_mat(SEEG_BI,   preferred_key="sEEG_3D_BIPOLARcontacts_locs")
mono_pts = mono_df[["x", "y", "z"]].to_numpy()
bi_pts   = bi_df[["x", "y", "z"]].to_numpy()

# 2) Bounding boxes (QC rapido)
bb_tbl = pd.DataFrame([
    {"set": "B0 centroids",   **bbox(b0_xyz)},
    {"set": "EPI centroids",  **bbox(epi_xyz)},
    {"set": "MNI centroids",  **bbox(mni_xyz)},
    {"set": "sEEG monopolar", **bbox(mono_pts)},
    {"set": "sEEG bipolar",   **bbox(bi_pts)},
])
print("\n# Bounding boxes (unità dei file):")
print(bb_tbl.to_string(index=False))
bb_tbl.to_csv(OUT / "bbox_summary.csv", index=False)

# 3) Verifica spazio coordinate: distanze ai centroidi (pre-allineamento)
d_mono_b0, _ = nearest_dists(mono_pts, b0_xyz)
d_bi_b0,   _ = nearest_dists(bi_pts,   b0_xyz)
d_mono_mni, _ = nearest_dists(mono_pts, mni_xyz)
d_bi_mni,   _ = nearest_dists(bi_pts,   mni_xyz)
d_mono_epi, _ = nearest_dists(mono_pts, epi_xyz)  # facoltativo/diagnostico
d_bi_epi,   _ = nearest_dists(bi_pts,   epi_xyz)

print("\n# Distanze nearest-centroid (pre-allineamento):")
print(summary_line("Monopolar vs B0",  d_mono_b0))
print(summary_line("Bipolar   vs B0",  d_bi_b0))
print(summary_line("Monopolar vs MNI", d_mono_mni))
print(summary_line("Bipolar   vs MNI", d_bi_mni))
print(summary_line("Monopolar vs EPI", d_mono_epi))
print(summary_line("Bipolar   vs EPI", d_bi_epi))

aligned_b0  = (np.median(d_mono_b0) <= THR_ALIGN) and (np.median(d_bi_b0) <= THR_ALIGN)
aligned_mni = (np.median(d_mono_mni) <= THR_ALIGN) and (np.median(d_bi_mni) <= THR_ALIGN)
print("\n# Giudizio euristico di spazio:")
print(f"Stesso spazio dei centroidi B0?  {'SI' if aligned_b0  else 'NO'}")
print(f"Stesso spazio dei centroidi MNI? {'SI' if aligned_mni else 'NO'}")

# 4) Verifica: i bipolari sono midpoint dei 2 monopolari più vicini?
d2, idx2 = two_nearest(bi_pts, mono_pts)  # per ogni bi → due mono più vicini
midpoints = (mono_pts[idx2[:, 0]] + mono_pts[idx2[:, 1]]) / 2.0
dist_mid = np.linalg.norm(bi_pts - midpoints, axis=1)     # quanto il bipolare sta vicino al midpoint?
edge_len = np.linalg.norm(mono_pts[idx2[:, 0]] - mono_pts[idx2[:, 1]], axis=1)  # distanza tra i due mono
ratio = dist_mid / (edge_len + 1e-12)                     # normalizzato alla spaziatura
mid_df = pd.DataFrame({
    "bipolar_label": bi_df["label"],
    "mono1_idx": idx2[:, 0] + 1,  # +1 se vuoi numerazione 1-based
    "mono2_idx": idx2[:, 1] + 1,
    "dist_to_midpoint": dist_mid,
    "mono_pair_distance": edge_len,
    "midpoint_ratio": ratio
})
mid_df.to_csv(OUT / "check_bipolar_midpoint.csv", index=False)
print("\n# Bipolari ~ midpoint di 2 monopolar più vicini:")
print(f"median dist to midpoint = {np.median(dist_mid):.3f} ; "
      f"median ratio = {np.median(ratio):.3f} (≈0 se esatto midpoint; ~0.5 se sul contatto)")

# 5) Se NON allineati, eseguo allineamento verso B0 e/o MNI
def align_to_target(name: str, target_xyz: np.ndarray):
    """
    Allinea mono/bi ai centroidi 'target_xyz' in due step:
      (A) affine min–max per asse
      (B) ICP rigido di rifinitura
    Salva CSV e ritorna mediana distanza prima/dopo.
    """
    # prima delle trasformazioni
    d_mono_pre, _ = nearest_dists(mono_pts, target_xyz)
    d_bi_pre,   _ = nearest_dists(bi_pts,   target_xyz)

    # (A) min–max affine
    mono_mm, s_mono, o_mono = minmax_affine(mono_pts, target_xyz)
    bi_mm,   s_bi,   o_bi   = minmax_affine(bi_pts,   target_xyz)

    # (B) ICP rigido (su output affine)
    mono_icp, info_mono = icp_rigid(mono_mm, target_xyz, max_iter=50, tol=1e-5)
    bi_icp,   info_bi   = icp_rigid(bi_mm,   target_xyz, max_iter=50, tol=1e-5)

    # distanze finali
    d_mono_post, _ = nearest_dists(mono_icp, target_xyz)
    d_bi_post,   _ = nearest_dists(bi_icp,   target_xyz)

    # salvataggi
    pd.DataFrame(mono_icp, columns=["x","y","z"]).to_csv(OUT / f"sEEG_monopolar_aligned_to_{name}.csv", index=False)
    pd.DataFrame(bi_icp,   columns=["x","y","z"]).to_csv(OUT / f"sEEG_bipolar_aligned_to_{name}.csv",   index=False)

    # breve log
    print(f"\n# Allineamento verso {name}")
    print(f"Monopolar pre  → median nn-dist = {np.median(d_mono_pre):.3f}")
    print(f"Monopolar post → median nn-dist = {np.median(d_mono_post):.3f} (ICP iters={info_mono['it']}, med.err={info_mono['median_err']:.3f})")
    print(f"Bipolar   pre  → median nn-dist = {np.median(d_bi_pre):.3f}")
    print(f"Bipolar   post → median nn-dist = {np.median(d_bi_post):.3f} (ICP iters={info_bi['it']}, med.err={info_bi['median_err']:.3f})")

    return dict(
        mono_pre=np.median(d_mono_pre), mono_post=np.median(d_mono_post),
        bi_pre=np.median(d_bi_pre),     bi_post=np.median(d_bi_post)
    )

# Esegui allineamenti solo se necessario
if not aligned_b0:
    res_b0 = align_to_target("B0", b0_xyz)
else:
    print("\nContatti già nel sistema B0 (entro soglia). Non eseguo allineamento verso B0.")

if not aligned_mni:
    res_mni = align_to_target("MNI", mni_xyz)
else:
    print("Contatti già nel sistema MNI (entro soglia). Non eseguo allineamento verso MNI.")

print("\n== FINE VERIFICA/ALLINEAMENTO ==")
print(f"Output salvati in: {OUT.resolve()}")


# Come leggere l'output:
# output/bbox_summary.csv → range degli assi per ogni set 
#                           (fa vedere subito se le scale/origini sono comparabili).

# stampa a console delle distanze: se la mediana sEEG→B0 (o →MNI) è ≤ ~10 
#                                   → di solito stesso spazio.

# output/check_bipolar_midpoint.csv → per ogni bipolare:
#   dist_to_midpoint: distanza dal punto medio dei 2 monopolari più vicini; ideale ≈ 0.
#   midpoint_ratio = dist_to_midpoint / distanza_mono_pair: normalizzato; ideale ≈ 0.

# Se serve l’allineamento: 
#   →  sEEG_monopolar_aligned_to_B0.csv / sEEG_bipolar_aligned_to_B0.csv (e le versioni _to_MNI)
#       con le coordinate allineate. Le mediane delle distanze “post” dovrebbero scendere marcatamente.



# Bounding boxes (unità dei file):
           set      min_x     max_x      min_y     max_y     min_z     max_z   n
  B0 centroids  23.039216 87.866667   8.184080 85.152672 20.052356 70.195122 850
 EPI centroids  21.476190 73.230769   7.707317 71.888889  8.741071 51.130435 850
 MNI centroids  12.014925 77.220000  11.221311 96.903846 13.945545 75.250000 850
sEEG monopolar -33.867000  1.591000 -33.723000 13.342000 13.132000 71.684000  86
  sEEG bipolar -33.478000  1.528500 -33.182500 13.298500 15.075500 69.966000  77

# Distanze nearest-centroid (pre-allineamento):
Monopolar vs B0: N=86, median=61.512, mean=60.995, min=48.321, max=73.800
Bipolar   vs B0: N=77, median=61.617, mean=60.975, min=48.648, max=73.229
Monopolar vs MNI: N=86, median=56.967, mean=55.654, min=40.674, max=68.487
Bipolar   vs MNI: N=77, median=57.054, mean=55.643, min=41.324, max=67.601
Monopolar vs EPI: N=86, median=60.419, mean=59.719, min=48.557, max=73.131
Bipolar   vs EPI: N=77, median=60.708, mean=59.693, min=

COSTRUZIONE CONTROL SET (ROI più vicine)

In [80]:
# Obiettivo: associare a ciascun contatto sEEG la ROI più vicina (centroide B0) 
# (K=1, distanza euclidea minima) e costruire il control set mantenendo i duplicati
# (uno-per-contatto). Userò il control set MONO nella control energy.

# --- Parametri principali  ---
K_NEAREST = 1            # quante ROI vicine devono essere prese per ogni contatto (k-NN)
MAX_DIST = None          # soglia massima di distanza (stesse unità dei centroidi). Se None, nessun filtro.

# --- Si preferiscono i contatti allineati in B0 salvati ---
MONO_ALIGNED_CSV = OUT / "sEEG_monopolar_aligned_to_B0.csv"
BI_ALIGNED_CSV   = OUT / "sEEG_bipolar_aligned_to_B0.csv"

# --- Helper: k-NN (usa KDTree caricato sopra; altrimenti fallback NumPy) ---
def knn_indices_dists(points: np.ndarray, refs: np.ndarray, k: int):
    """
    Dato un insieme di punti (points) e i centroidi (refs), 
    restituisce (d, idx) con shape (N, k): per ogni point[i], distanze e indici dei k
    centroidi più vicini in 'refs'.
    d = distanze ai k più vicini
    idx = indici di quei centroidi
    """
    if 'HAVE_SKLEARN' in globals() and HAVE_SKLEARN:
        tree = KDTree(refs)
        d, idx = tree.query(points, k=k)
        return d, idx
    # Fallback O(NM): va benissimo per N e M ~ 10^3
    P = points[:, None, :]             # (N,1,3)
    R = refs[None, :, :]               # (1,M,3)
    d2 = np.sum((P - R)**2, axis=2)    # (N,M)
    idx_sorted = np.argsort(d2, axis=1)[:, :k]
    rows = np.arange(points.shape[0])[:, None]
    d = np.sqrt(d2[rows, idx_sorted])
    return d, idx_sorted

######################################################################################################
##################################### COSTRUZIONE CONTROL SET ########################################
######################################################################################################
# --- 1) Recupero ID+centroidi B0 (gli ID servono per allinearsi al connettoma) ---
#    Nel blocco precedente ho fatto: "b0_ids, b0_xyz = load_centroids_txt(B0_TXT)",
#    dove recupero anche gli ID (coerenti con l'ordine del connettoma strutturale).

# --- 2) Considero i contatti sEEG (monopolar & bipolar) memorizzati nel blocco precedente
# mono_df = parse_contacts_mat(SEEG_MONO, preferred_key="sEEG_3D_contacts_locs")
# bi_df   = parse_contacts_mat(SEEG_BI,   preferred_key="sEEG_3D_BIPOLARcontacts_locs")
# mono_pts = mono_df[["x","y","z"]].to_numpy()
# bi_pts   = bi_df[["x","y","z"]].to_numpy()

# --- 3) Scelgo quali coordinate dei contatti usare (allineate se presenti) ---
def pick_contacts_for_control(mono_pts, bi_pts):
    """
    Restituisce (mono_use, bi_use):
    - Se esistono CSV allineati_to_B0 -> li carica e li usa
    - Altrimenti usa i punti in memoria (mono_pts/bi_pts)
    """
    # MONOPOLARI
    if MONO_ALIGNED_CSV.exists():
        df = pd.read_csv(MONO_ALIGNED_CSV)
        mono_use = df[["x","y","z"]].to_numpy()
        print(f"[control set] Uso MONO allineati: {MONO_ALIGNED_CSV.name}")
    else:
        mono_use = mono_pts
        print("[control set] ATTENZIONE: non trovo MONO allineati; uso i MONO correnti.")
    # BIPOLARI
    if BI_ALIGNED_CSV.exists():
        df = pd.read_csv(BI_ALIGNED_CSV)
        bi_use = df[["x","y","z"]].to_numpy()
        print(f"[control set] Uso BI allineati: {BI_ALIGNED_CSV.name}")
    else:
        bi_use = bi_pts
        print("[control set] ATTENZIONE: non trovo BI allineati; uso i BI correnti.")
    return mono_use, bi_use

# Si assume che 'mono_pts' e 'bi_pts' siano in memoria dal blocco precedente
mono_for_ctrl, bi_for_ctrl = pick_contacts_for_control(mono_pts, bi_pts)

# --- 4) Funzione: costruzione della tabella contatto → ROI k-NN (con soglia MAX_DIST) ---
def build_contact_to_roi(points: np.ndarray,
                         contact_labels: list[str] | None,
                         roi_xyz: np.ndarray,
                         roi_ids: np.ndarray,
                         k: int = 1,
                         max_dist: float | None = None,
                         tag: str = "type") -> pd.DataFrame:
    """
    la funzione calcola i k centroidi B0 più vicini a ciascun contatto sEEG (knn_indeces_dists)
    points:      (N,3) contatti (idealmente in B0)
    contact_labels: lista etichette (se None, creo C1..CN)
    roi_xyz:     (M,3) centroidi B0
    roi_ids:     (M,)  ID ROI coerenti con connettoma
    k:           numero di vicini da tenere
    max_dist:    soglia distanza per filtrare coppie troppo lontane (None = nessun filtro)
    tag:         "monopolar" / "bipolar" per etichettare le righe
    """
    if points is None or points.size == 0:
        return pd.DataFrame(columns=["type","contact_index","contact_label","roi_id","roi_rank","distance"])
    # etichette contatto: se non fornite, creo C1..CN
    if (contact_labels is None) or (len(contact_labels) != points.shape[0]):
        contact_labels = [f"C{i+1}" for i in range(points.shape[0])]

    # k-nearest neighbors
    d, idx = knn_indices_dists(points, roi_xyz, k=k)  # shape (N,k) ciascuno

    rows = []
    for i in range(points.shape[0]):
        for r in range(k):
            dist_ir = float(d[i, r])
            if (max_dist is not None) and (dist_ir > max_dist):
                continue
            roi_id = int(roi_ids[idx[i, r]])  # ID coerente col connettoma
            rows.append({
                "type": tag,
                "contact_index": int(i+1),           # 1-based per leggibilità
                "contact_label": contact_labels[i],  # es. A'3, B'7, ecc.
                "roi_id": roi_id,
                "roi_rank": int(r+1),                # 1=più vicina
                "distance": dist_ir
            })
    return pd.DataFrame(rows)

# --- 5) Etichette dei contatti (se i DataFrame dai .mat esistono) ---
mono_labels = mono_df["label"].tolist() if 'mono_df' in globals() else None
bi_labels   = bi_df["label"].tolist()   if 'bi_df'   in globals() else None

# --- 5) Costruisco le mappe per monopolar e bipolar ---
df_map_mono = build_contact_to_roi(
    points=mono_for_ctrl,
    contact_labels=mono_labels,
    roi_xyz=b0_xyz,
    roi_ids=b0_ids,
    k=K_NEAREST,
    max_dist=MAX_DIST,
    tag="monopolar"
)
df_map_bi = build_contact_to_roi(
    points=bi_for_ctrl,
    contact_labels=bi_labels,
    roi_xyz=b0_xyz,
    roi_ids=b0_ids,
    k=K_NEAREST,
    max_dist=MAX_DIST,
    tag="bipolar"
)

# --- 6) Salvo le mappe complete (k=1 -> 1 riga per contatto) ---
map_mono_path = OUT / "controlset_map_monopolar_k1.csv"
map_bi_path   = OUT / "controlset_map_bipolar_k1.csv"
df_map_mono.to_csv(map_mono_path, index=False)
df_map_bi.to_csv(map_bi_path,   index=False)
"ogni riga = 1 contatto (mono o bi); "
"colonne: type, contact_index, contact_label, roi_id, roi_rank, distance (mm)"

# --- 7) Estraggo la top-1 per contatto ---
mono_top1 = (df_map_mono
             .sort_values(["contact_index","roi_rank"])
             .groupby("contact_index", as_index=False)
             .first())

# (opzionale) audit anche per bipolari
bi_top1   = (df_map_bi
             .sort_values(["contact_index","roi_rank"])
             .groupby("contact_index", as_index=False)
             .first())
"qui per sicurezza faccio un groupby(contact_index) e tengo solo la riga con roi_rank più piccolo."
"In teoria con k=1 non serve, ma se in futuro cambio k > 1, è utile e robusto."

# Salvo le mappe per-contatto (comode per QC)
mono_top1_path = OUT / "contact2roi_MONO_top1.csv"
bi_top1_path   = OUT / "contact2roi_BI_top1.csv"
mono_top1.to_csv(mono_top1_path, index=False)
bi_top1.to_csv(bi_top1_path,     index=False)

# --- 8) costruzione CONTROL SET PER-CONTATTO (CON DUPLICATI), 1 riga per contatto ---
# Ordine dei contatti dato da contact_index; una riga per contatto
# estraggo roi_id e ottengo una lista di ID ROI (con duplicati, se più contatti mappano alla stessa ROI)
mono_ids_per_contact = mono_top1.sort_values("contact_index")["roi_id"].astype(int).tolist()
bi_ids_per_contact   = bi_top1.sort_values("contact_index")["roi_id"].astype(int).tolist()

def write_ids_txt(path, ids):
    with open(path, "w") as f:
        for rid in ids:
            f.write(f"{rid}\n")

ctl_mono_percontact_path = OUT / "control_set_ROIs_B0_MONO_top1_perContact.txt"
ctl_bi_percontact_path   = OUT / "control_set_ROIs_B0_BI_top1_perContact.txt"
write_ids_txt(ctl_mono_percontact_path, mono_ids_per_contact)
write_ids_txt(ctl_bi_percontact_path,   bi_ids_per_contact)
"contengono:"
"ogni riga = ID di 1 ROI (intero da 1 a 850) associata al contatto sEEG più vicino corrispondente."
"il numero di righe corrisponde al n° di contatti sEEG considerati:"
"- MONO: attesi 86 contatti (righe)"
"- BI:   attesi 77 contatti (righe)"

# --- Check: devo avere 86 righe nel MONO (una per contatto) ---
n_contacts_mono = mono_top1["contact_index"].nunique()
print(f"[CHECK] MONO per-contatto: {len(mono_ids_per_contact)} righe (contatti unici={n_contacts_mono})")
if len(mono_ids_per_contact) != 86:
    print("[WARNING] Attese 86 righe. Se sono meno, verifica MAX_DIST (deve essere None) o contatti inattivi.")

# --- Check: devo avere 77 righe nel BI (una per contatto) ---
n_contacts_bi = bi_top1["contact_index"].nunique()
print(f"[CHECK] BI per-contatto: {len(bi_ids_per_contact)} righe (contatti unici={n_contacts_bi})")
if len(bi_ids_per_contact) != 77:
    print("[WARNING] Attese 77 righe. Se sono meno, verifica MAX_DIST (deve essere None) o contatti inattivi.")

print("\n== MAPPE & CONTROL SET (PER-CONTATTO, CON DUPLICATI) COSTRUITI ==")
print(f"- Mappe complete:      {map_mono_path.name} | {map_bi_path.name}")
print(f"- Mappe top-1:         {mono_top1_path.name} | {bi_top1_path.name}")
print(f"- Control set MONO PC: {ctl_mono_percontact_path.name} (#righe={len(mono_ids_per_contact)}, contatti unici={n_contacts_mono})")
print(f"- Control set BI   PC: {ctl_bi_percontact_path.name}   (#righe={len(bi_ids_per_contact)}, contatti unici={n_contacts_bi})")
if len(mono_ids_per_contact) != 86:
    print("[WARNING] Attese 86 righe per i MONO. Se sono meno, verifica che MAX_DIST=None e i contatti disponibili.")
print(">> Per la control energy usa:", ctl_mono_percontact_path.name)

# Nota importante:
# Gli ID salvati nel control set (uno per riga) devono corrispondere all'ordine delle ROI nel mio connettoma
# (connectome850_2b0.xlsx). Verificare che "b0_ids" sia la stessa indicizzazione usata per la matrice A.

# Risultati:
# file output/control_set_ROIs_B0_*.txt contiene la lista di ID ROI da usare come nodi di controllo (matrice B nella NCT).
# I CSV controlset_map_*.csv permettono di ispezionare quali ROI sono state associate ad ogni contatto (utile per QC o per cambiare k/soglia).


[control set] Uso MONO allineati: sEEG_monopolar_aligned_to_B0.csv
[control set] Uso BI allineati: sEEG_bipolar_aligned_to_B0.csv
[CHECK] MONO per-contatto: 86 righe (contatti unici=86)
[CHECK] BI per-contatto: 77 righe (contatti unici=77)

== MAPPE & CONTROL SET (PER-CONTATTO, CON DUPLICATI) COSTRUITI ==
- Mappe complete:      controlset_map_monopolar_k1.csv | controlset_map_bipolar_k1.csv
- Mappe top-1:         contact2roi_MONO_top1.csv | contact2roi_BI_top1.csv
- Control set MONO PC: control_set_ROIs_B0_MONO_top1_perContact.txt (#righe=86, contatti unici=86)
- Control set BI   PC: control_set_ROIs_B0_BI_top1_perContact.txt   (#righe=77, contatti unici=77)
>> Per la control energy usa: control_set_ROIs_B0_MONO_top1_perContact.txt


In [81]:
# ============ QC COMPLETO CONTROL SET: distanze, midpoint, integrità + crea bundle Excel =============
# il seguente blocco calcola statistiche globali e per tipo con percentili, 
# calcola e salva outlier globali e outlier per tipo,
# crea un istogramma delle distanze,
# esegue un QC sui midpoint dei bipolari,
# controlla che il control set MONO per-contatto abbia 86 righe, sia compatibile con B0 e col connettoma
# crea il file Excel unico con tutti i .csv importanti

# Parametri-soglia
THR_TOP1_MM = 6.0       # (mm) soglia distanza contatto - ROI
THR_MIDPOINT_MM = 2.0   # (mm) soglia distanza bipolare - midpoint

# Carico mappe mono/bi
map_mono_path = OUT / "controlset_map_monopolar_k1.csv"
map_bi_path   = OUT / "controlset_map_bipolar_k1.csv"

# --- Helper: carica mappa e restituisce solo la ROI top-1 (roi_rank==1 se esiste) ---
# una sola funzione per leggere le mappe e tenere solo roi_rank==1
def load_top1_df(path: Path, label: str) -> pd.DataFrame | None:
    if not path.exists():
        print(f"[QC] {label}: file non trovato ({path.name})")
        return None
    df = pd.read_csv(path)

    if "roi_rank" in df.columns:
        top1 = df[df["roi_rank"] == 1].copy()
        if top1.empty:
            print(f"[QC] {label}: nessuna riga roi_rank==1 (uso l'intero df)")
            top1 = df.copy()
    else:
        top1 = df.copy()

    top1["type"] = label
    return top1

def stats_dict(x):
    x = np.asarray(x, dtype=float)
    return {
        "N": int(x.size),
        "median": float(np.median(x)),
        "mean": float(np.mean(x)),
        "p75": float(np.percentile(x, 75)),
        "p90": float(np.percentile(x, 90)),
        "p95": float(np.percentile(x, 95)),
        "max": float(np.max(x)),
    }

# --- 1) QC distanze contatto–ROI (mono + bi) ---
top1_m = load_top1_df(map_mono_path, "monopolar")
top1_b = load_top1_df(map_bi_path,   "bipolar")
frames = [d for d in (top1_m, top1_b) if d is not None]

if frames:
    top1 = pd.concat(frames, ignore_index=True)

    print("\n# QC distanza (ROI più vicina per contatto)")
    print("GLOBAL:", stats_dict(top1["distance"].to_numpy()))
    for t_lab, g in top1.groupby("type"):
        print(f"{t_lab.upper()}:", stats_dict(g["distance"].to_numpy()))

    # quota entro soglia THR_TOP1_MM (globale)
    frac_within = (top1["distance"] <= THR_TOP1_MM).mean()
    print(f"\nFrazione di contatti con ROI più vicina entro {THR_TOP1_MM} mm (GLOBAL): {frac_within:.1%}")

    # outlier globali
    outliers_global = top1[top1["distance"] > THR_TOP1_MM].sort_values("distance", ascending=False)
    out_path_global = OUT / f"controlset_top1_outliers_gt_{THR_TOP1_MM}mm.csv"
    outliers_global.to_csv(out_path_global, index=False)
    print(f"Outlier globali (> {THR_TOP1_MM} mm) salvati in: {out_path_global}")

    # outlier per tipo (come nel QC light)
    for t_lab, g in top1.groupby("type"):
        arr = g["distance"].to_numpy(dtype=float)
        stats = dict(
            N=int(arr.size),
            median=float(np.median(arr)),
            mean=float(np.mean(arr)),
            p90=float(np.percentile(arr, 90)),
            max=float(np.max(arr)),
        )
        frac_t = float((arr <= THR_TOP1_MM).mean())
        print(f"[QC] {t_lab} top-1 → {stats} ; quota ≤ {THR_TOP1_MM} mm = {frac_t:.1%}")

        outliers_t = g[g["distance"] > THR_TOP1_MM].sort_values("distance", ascending=False)
        if not outliers_t.empty:
            outp = OUT / f"{t_lab}_top1_outliers_gt_{THR_TOP1_MM}mm.csv"
            outliers_t.to_csv(outp, index=False)
            print(f"[QC]  Outlier {t_lab} salvati in: {outp}")

    # istogramma globale distanze
    plt.figure()
    plt.hist(top1["distance"], bins=20)
    plt.title("Distanza ROI più vicina (top-1)")
    plt.xlabel("Distanza euclidea [mm]")
    plt.ylabel("Conteggio contatti")
    plt.tight_layout()
    plt.savefig(OUT / "distance_nearest_ROI.png", dpi=300, bbox_inches="tight")
    plt.close()
else:
    print("[QC] ATTENZIONE: nessuna mappa mono/bi trovata. Salto QC distanze.")

# --- 2) QC midpoint bipolari ---
mid_path = OUT / "check_bipolar_midpoint.csv"
if mid_path.exists():
    mid = pd.read_csv(mid_path)
    med_dist  = float(np.median(mid["dist_to_midpoint"]))
    med_ratio = float(np.median(mid["midpoint_ratio"]))
    print(f"\n[QC] Midpoint bipolari → median dist={med_dist:.3f} mm ; median ratio={med_ratio:.3f}")
    bad_mid = mid[mid["dist_to_midpoint"] > THR_MIDPOINT_MM]
    print(f"[QC] Bipolari con dist_to_midpoint > {THR_MIDPOINT_MM} mm: {len(bad_mid)}")
    if not bad_mid.empty:
        outp = OUT / f"midpoint_outliers_gt_{THR_MIDPOINT_MM}mm.csv"
        bad_mid.to_csv(outp, index=False)
        print(f"[QC]  Outlier midpoint salvati in: {outp}")
else:
    print("\n[QC] ATTENZIONE: check_bipolar_midpoint.csv non trovato (salta questo check).")

# --- 3) Integrità control set MONO per-contatto ---
ctrl_path = OUT / "control_set_ROIs_B0_MONO_top1_perContact.txt"
if ctrl_path.exists():
    ctrl_ids = np.loadtxt(ctrl_path, dtype=int, ndmin=1)  # mantiene ordine e duplicati
    n_total  = int(ctrl_ids.size)
    n_unique = len(set(ctrl_ids.tolist()))
    n_dupes  = n_total - n_unique
    print(f"\n[QC] Control set (MONO per-contatto): {ctrl_path.name} → #righe={n_total} ; #unici={n_unique} ; duplicati={n_dupes}")
    if n_total != 86:
        print("[QC]  WARNING: attese 86 righe. Se sono meno, verifica MAX_DIST=None o contatti mancanti.")

    # Verifica presenza in B0
    b0_df  = pd.read_csv(B0_TXT, sep=r"[\s,;]+", engine="python",
                         header=None, comment="#", skip_blank_lines=True)
    b0_ids = b0_df.iloc[:, 0].astype(int).to_numpy()
    missing = sorted(set(ctrl_ids) - set(b0_ids))
    if len(missing) == 0:
        print("[QC]  Tutti gli ID presenti in B0 ✅")
    else:
        print(f"[QC]  ATTENZIONE: ID non presenti in B0: {missing[:10]}{' ...' if len(missing) > 10 else ''}")

    # Coerenza con la dimensione del connectome
    conn = BASE / "connectome850_2b0.xlsx"
    if conn.exists():
        try:
            A_tmp = pd.read_excel(conn, header=None)
            nA = A_tmp.shape[0]
            if A_tmp.shape[0] != A_tmp.shape[1]:
                print(f"[QC]  ATTENZIONE: connectome non quadrato: shape={A_tmp.shape}")
            bad = [int(i) for i in ctrl_ids if not (1 <= i <= nA)]
            if bad:
                print(f"[QC]  ATTENZIONE: ID fuori range [1..{nA}]: {bad[:10]}{' ...' if len(bad) > 10 else ''}")
            else:
                print(f"[QC]  Range ID compatibile con connectome (N={nA}) ✅")
        except Exception as e:
            print(f"[QC]  Nota: non riesco a leggere {conn.name} ({e})")
else:
    print("\n[QC] ATTENZIONE: control_set_ROIs_B0_MONO_top1_perContact.txt non trovato.")

# --- 4) Bundle Excel con output principali (come nel blocco originale) ---
with pd.ExcelWriter(OUT / "nct_outputs_bundle.xlsx", engine="openpyxl") as writer:
    for p, name in [
        (OUT/"sEEG_monopolar_aligned_to_MNI.csv", "mono_MNI"),
        (OUT/"sEEG_bipolar_aligned_to_MNI.csv",   "bi_MNI"),
        (OUT/"sEEG_monopolar_aligned_to_B0.csv",  "mono_B0"),
        (OUT/"sEEG_bipolar_aligned_to_B0.csv",    "bi_B0"),
        (OUT/"bbox_summary.csv",                  "bbox"),
        (OUT/"check_bipolar_midpoint.csv",        "midpoint_QC"),
    ]:
        if p.exists():
            pd.read_csv(p).to_excel(writer, sheet_name=name, index=False)
print("\nCreato bundle Excel QC:", OUT / "nct_outputs_bundle.xlsx")
print("[QC] Fine QC completo ✅")




# QC distanza (ROI più vicina per contatto)
GLOBAL: {'N': 163, 'median': 3.1179013495002184, 'mean': 4.3606404403643655, 'p75': 5.334933307124771, 'p90': 9.054064650602902, 'p95': 11.193904067730934, 'max': 15.289535775049597}
BIPOLAR: {'N': 77, 'median': 3.1889018017358635, 'mean': 4.461605892017394, 'p75': 5.338793143566348, 'p90': 9.03706437112547, 'p95': 10.960992178744535, 'max': 15.289535775049597}
MONOPOLAR: {'N': 86, 'median': 3.005098189196934, 'mean': 4.2702411406285155, 'p75': 5.3095441252747815, 'p90': 9.00682530492946, 'p95': 11.076415912739773, 'max': 14.481755895596129}

Frazione di contatti con ROI più vicina entro 6.0 mm (GLOBAL): 77.9%
Outlier globali (> 6.0 mm) salvati in: c:\Users\barba\Visual Studio - Github\NCT\output\controlset_top1_outliers_gt_6.0mm.csv
[QC] bipolar top-1 → {'N': 77, 'median': 3.1889018017358635, 'mean': 4.461605892017394, 'p90': 9.03706437112547, 'max': 15.289535775049597} ; quota ≤ 6.0 mm = 76.6%
[QC]  Outlier bipolar salvati in: c:\Users\bar

CONTROL ENERGY - PROCEDURE 1_paper Parkes et al. -

In [82]:
# Control Energy (Procedure 1 Parkes et al.) adattata ai miei dati:
"""
Adattamento minimo della Procedure 1 (Parkes et al., 2024) al mio setup:
- A (adjacent matrix) da data/connectome850_2b0.xlsx (NxN)
- B (control set) da output/control_set_ROIs_B0_BI_top1_perContact.txt (indici 1-based)
- Stati: Vis -> Default (se labels disponibili) altrimenti fallback KMeans
"""
# - Stati:      iniziale = ROI MONO del mio control set ; target = complemento
# - B (controllo): partial control (1 sui miei nodi, 1e-3 sugli altri)
# - Sistema:    continuous-time, normalizzazione c=1
# Salva figure e risultati in: output/control_energy/ 
# -----------------------------------------------------------

CTL_TXT   = OUT / "control_set_ROIs_B0_BI_top1_perContact.txt"   #  control set
CONN_XLSX = BASE / "connectome850_2b0.xlsx"                    #  connettoma strutturale

# -- Caricamento CONTROL SET (lista di indici 1-based, con duplicati e ordinati) --
try:
    ctl_ids = np.loadtxt(CTL_TXT, dtype=int, ndmin=1)  # mantiene duplicati e ordine
except Exception:
    # fallback robusto se il file avesse righe/spazi strani
    ctl_ids = []
    with open(CTL_TXT, "r") as f:
        for line in f:
            line = line.strip()
            if line:
                ctl_ids.append(int(float(line)))
    ctl_ids = np.array(ctl_ids, dtype=int)

print(f"[CHECK] Control set (BI per-contatto): letti {ctl_ids.size} indici da {CTL_TXT.name}")
if ctl_ids.size != 77:
    print("[WARNING] Il control set non ha 77 righe. Controlla l’estrazione per-contatto e MAX_DIST=None.")

# -- Caricamento CONNETTOMA da Excel→ numpy, fix semplice se non quadrato --
#   - header=None per evitare che la prima riga diventi header
#   - coerce numeric, drop righe/colonne completamente NaN
# Next, we load a structural connectome as our adjacency matrix:
# # directory where data is stored 
# datadir = '/path/to/data'   
# adjacency_file = 'structural_connectome.npy' 
# # load adjacency matrix 
# adjacency = np.load(os.path.join(datadir, adjacency_file))

def load_adjacency_from_excel(path: Path) -> np.ndarray:
    df = pd.read_excel(path, header=None)
    # Forza numerico, elimina colonne/righe completamente vuote
    for c in df.columns:
        df[c] = pd.to_numeric(df[c], errors="coerce")
    df = df.dropna(axis=0, how="all").dropna(axis=1, how="all")
    A = df.to_numpy(dtype=float)

    # Se non quadrata, tenta una correzione semplice (drop prima riga/col)
    if A.shape[0] != A.shape[1]:
        # prova drop col0
        if A.shape[1] - 1 == A.shape[0]:
            A = A[:, 1:]
        # prova drop row0
        elif A.shape[0] - 1 == A.shape[1]:
            A = A[1:, :]
        # prova drop row0 e col0
        elif A.shape[0] - 1 == A.shape[1] - 1:
            A = A[1:, 1:]
    assert A.shape[0] == A.shape[1], f"Matrice non quadrata anche dopo fix: {A.shape}"
    # Sostituisci eventuali NaN con 0
    A = np.nan_to_num(A, copy=False)
    return A

A = load_adjacency_from_excel(CONN_XLSX)
N = A.shape[0]  #n_nodes
print(f"[CHECK] Connettoma caricato: shape={A.shape}")
# -- PARCELLATION: 800 vs 850 e coerenza con control set --
if N == 850:
    print("[CHECK] Parcellazione = 850 ✅")
elif N == 800:
    print("[CHECK] Parcellazione = 800 ⚠️ verifica control set")
else:
    print(f"[CHECK] Parcellazione non standard (N={N}) ⚠️")
# range e duplicati nel control set
if ctl_ids.min() < 1 or ctl_ids.max() > N:
    raise ValueError(f"[CHECK] ID control set fuori range 1..{N} (min={ctl_ids.min()}, max={ctl_ids.max()}).")

# pulizia A: simmetria e diagonale
if not np.isfinite(A).all():
    raise ValueError("[CHECK] A contiene NaN/±inf.")
# simmetria (per reti non dirette ci si aspetta A≈A^T)
asym = np.linalg.norm(A - A.T, ord='fro') / max(1.0, np.linalg.norm(A, ord='fro'))
if asym > 1e-6:
    print(f"[CHECK] A non perfettamente simmetrica (relFro={asym:.2e}) → simmetrizzo.")
    A = (A + A.T) / 2.0
# diagonale: molte pipeline NCT usano diag=0 (niente self-loops)
n_diag_nz = int(np.sum(np.abs(np.diag(A)) > 1e-12))
if n_diag_nz > 0:
    print(f"[CHECK] Diagonale con {n_diag_nz} valori ≠0 → azzero.")
    np.fill_diagonal(A, 0.0)

# -- Salvataggi “puliti” e report sintetico --
pd.DataFrame(A).to_csv(OUT / "A_struct_clean.csv", index=False, header=False)
pd.DataFrame([{
    "N": N,
    "control_set_size": int(ctl_ids.size),
    "control_set_min": int(ctl_ids.min()),
    "control_set_max": int(ctl_ids.max()),
    "diag_nonzero_before": n_diag_nz,
    "asymmetry_relFro": float(asym),
}]).to_csv(OUT / "control_energy_qc_summary.csv", index=False)

# -- Preparazione vettore b e matrice B per la NCT --
# b: vettore di controllo (1 se ROI controllata, 0 altrimenti), 1-based → 0-based
b = np.zeros(N, dtype=float)
b[ctl_ids - 1] = 1.0    # le righe duplicate non cambiano il risultato (assegnazioni ripetute a 1)
pd.DataFrame(b, columns=["b"]).to_csv(OUT / "B_vector_diag.csv", index=False)
# matrice B diag (NxN) se si preferisce lo stile B=diag(b)
B = np.diag(b)
pd.DataFrame(B).to_csv(OUT/'B_diag_matrix.csv', index=False, header=False)


[CHECK] Control set (BI per-contatto): letti 77 indici da control_set_ROIs_B0_BI_top1_perContact.txt
[CHECK] Connettoma caricato: shape=(850, 850)
[CHECK] Parcellazione = 850 ✅
[CHECK] Diagonale con 833 valori ≠0 → azzero.


In [83]:
# ============================ Time System — passo 1 =========================================
# 1. Define a time system. Determine whether to model the linear dynamical system in discrete 
# or continuous time:
# system = "discrete" # or system = "continuous"
# --- come nel paper: continuo ---
system = "continuous"

# ============================ Normalizzazione di A — passo 2 =========================================
# 2. Normalize the adjacency matrix. Once a time system has been determined, normalize the 
# adjacency matrix, A:
# adjacency_norm = matrix_normalization(A=adjacency, system=system, c=1)
A_norm = matrix_normalization(A, system=system, c=1)
print("A_norm pronto (system=continuous, c=1).")

# ========================= Definizione degli stati del control task — passo 3 ============================
# 3. Definizione degli stati iniziale e target (x0, xf) dal control set
def load_roi_list_1based_to_mask(txt_path: Path, n_nodes: int) -> np.ndarray:
    ids = []
    with open(txt_path, "r") as f:
        for line in f:
            parts = line.replace(",", " ").split()
            if parts:
                try:
                    idx1 = int(float(parts[0]))
                    i0 = idx1 - 1
                    if 0 <= i0 < n_nodes:
                        ids.append(i0)
                except ValueError:
                    pass
    ids = sorted(set(ids))
    mask = np.zeros(n_nodes, dtype=bool)
    mask[ids] = True
    return mask

initial_mask = load_roi_list_1based_to_mask(CTL_TXT, N) # 77 ROI da BI per-contatto
n_ctrl = int(initial_mask.sum())
print(f"[STATE] #ROI controllate (da BI per-contatto, deduplicate) = {n_ctrl} / {N}")

target_mask  = ~initial_mask    # è solo una mask per distinguere nodi controllati vs non controllati
                                # (utile per B e per i plot)
strength = A.sum(axis=0)    # Strength (diagnostica)
print("  initial mean strength: {:.2f}".format(np.mean(strength[initial_mask])))
print("  target  mean strength: {:.2f}".format(np.mean(strength[target_mask])))

# Caricamento da MATLAB degli stati x0 = x_interictal & xf = x_ictal
STATE_MATE = BASE / "states_NCT_from_sEEG.mat"
mat_states = loadmat(STATE_MATE)

x_interictal = np.asarray(mat_states["x_interictal_80_150"], dtype=float).ravel()
x_ictal      = np.asarray(mat_states["x_ictal_80_150"],      dtype=float).ravel()

print("x_interictal shape:", x_interictal.shape)
print("x_ictal      shape:", x_ictal.shape)

assert x_interictal.shape[0] == N == A.shape[0]
assert x_ictal.shape[0] == N == A.shape[0]

# --- Normalizzazione come nel paper: stato di norma unitaria ---
initial_state_raw = x_interictal # stato iniziale grezzo (intercritico, baseline)
target_state_raw  = x_ictal      # stato target grezzo (ictale)

initial_state = normalize_state(x_interictal)
target_state  = normalize_state(x_ictal)

# salvo gli stati usati (opzionale)
np.save(OUT / "initial_state_850_sEEG.npy", np.asarray(initial_state, dtype=float))
np.save(OUT / "target_state_850_sEEG.npy",  np.asarray(target_state, dtype=float))
print("[STATE] Stati iniziale e target presi da PSD sEEG (non-binari, normalizzati).")

"adesso, initial state = x_interictal normalizzato ; target state = x_ictal normalizzato;"
"non c'è più alcun complemento: sono proprio i pattern di PSD z-scored normalizzati calcolati in MATLAB"

# == Partial control (1 su miei nodi, 1e-3 sugli altri): con questo scelgo dove applicare i control signals
control_set = np.zeros((N, N), dtype=float)
control_set[initial_mask, initial_mask] = 1.0 # controllo completo sui nodi iniziali
control_set[~initial_mask, ~initial_mask] = 1e-7 # controllo minimo sui nodi non iniziali (mettere a 0)

# == Heatmap dell’adjacency riordinata per stato (opzionale)
if sns is not None:
    order = np.r_[np.where(initial_mask)[0], np.where(~initial_mask)[0]]
    f, ax = plt.subplots(figsize=(7, 7))
    sns.heatmap(A[order, :][:, order], ax=ax, square=True, cmap="inferno",
                cbar_kws={"label": "connection strength", "shrink": 0.8})
    ax.set_title("Adjacency (ordinata: control-set prima)")
    f.tight_layout()
    f.savefig((OUT / "control_energy" / "adjacency_heatmap.png"), dpi=300,
              bbox_inches="tight", pad_inches=0.01)
    plt.close(f)

# ============================ Calcolo Control Energy — passo 4 ==============================
# Calcolo dei control signals u(t) & state trajectory x(t).
# parameteri e risoluzione 
time_horizon = 1             # come nel paper (unità arbitraria)
rho = 1                      # optimal control (peso uguale a traiettoria e input)
S = np.eye(N)  # S=trajectory_constraints: vincola tutti i nodi (optimal control)

# ottengo la state trajectory x(t) & i control signals u(t)
state_trajectory, control_signals, numerical_error = get_control_inputs(
    A_norm=A_norm,
    T=time_horizon,
    B=control_set,
    x0=initial_state,
    xf=target_state,
    system=system,
    rho=rho,
    S=S,
)

# Check inversion / reconstruction errors
thr = 1e-8  #threshold
print("inversion error = {:.2E} (<{:.2E}={:})".format(numerical_error[0], thr, numerical_error[0] < thr))
print("reconstruction error = {:.2E} (<{:.2E}={:})".format(numerical_error[1], thr, numerical_error[1] < thr))

# ============================ Plot x(t) & u(t) — passo 5 ============================
# Nota: nctpy ritorna di solito matrici shape (T, N). Il paper usa righe=tempo, colonne=nodi.
mask_init = initial_state_raw > np.median(initial_state_raw) # initial = nodi con PSD interictale alta (sopra la mediana del pattern interictale)
mask_target = target_state_raw > np.median(target_state_raw) # target = nodi con PSD ictale alta (sopra la mediana del pattern ictale)
mask_byst = ~(mask_init | mask_target)  # bystander = tutto il resto
# queste maschere servono solo per suddividere l'energia per gruppi funzionali e colorare i plot.

T_steps = state_trajectory.shape[0]           # es. 1001
t = np.linspace(0, time_horizon, T_steps)     # asse del tempo [0, T]

f, ax = plt.subplots(3, 2, figsize=(7, 7))

# A | control signals, x0, "initial-dominant"
if np.any(mask_init):
    ax[0, 0].plot(t, control_signals[:, mask_init], linewidth=0.75)
ax[0, 0].set_title("A | control signals, x0")

# B | neural activity, x0
if np.any(mask_init):
    ax[0, 1].plot(t, state_trajectory[:, mask_init], linewidth=0.75)
ax[0, 1].set_title("B | neural activity, x0")

# C | control signals, xf
if np.any(mask_target):
    ax[1, 0].plot(t, control_signals[:, mask_target], linewidth=0.75)
ax[1, 0].set_title("C | control signals, xf")

# D | neural activity, xf
if np.any(mask_target):
    ax[1, 1].plot(t, state_trajectory[:, mask_target], linewidth=0.75)
ax[1, 1].set_title("D | neural activity, xf")

# E | control signals, bystanders
if np.any(mask_byst):
    ax[2, 0].plot(t, control_signals[:, mask_byst], linewidth=0.75)
ax[2, 0].set_title("E | control signals, bystanders")

# F | neural activity, bystanders
if np.any(mask_byst):
    ax[2, 1].plot(t, state_trajectory[:, mask_byst], linewidth=0.75)
ax[2, 1].set_title("F | neural activity, bystanders")

for cax in ax.reshape(-1):
    cax.set_ylabel("activity")
    cax.set_xlabel("time (a.u.)")
    cax.set_xlim(t[0], t[-1])
    # tick a 0 e T (coerenti con l’asse del tempo esplicito)
    cax.set_xticks([t[0], t[-1]])    
    cax.set_xticklabels([0, time_horizon])

f.tight_layout()
(OUT / "control_energy").mkdir(exist_ok=True)
f.savefig(OUT / "control_energy" / "plot_xu_850.svg", dpi=600, bbox_inches="tight", pad_inches=0.01)
plt.close(f)

# ============================ Control energy — passo 6 =====================================
# integro i control signals
def integrate_u_compat(u, T):
    """
    Compat fallback per integrare l'energia:
    restituisce il vettore per-nodo ∫ u(t)^2 dt usando Simpson se disponibile
    oppure trapezi come fallback.
    u: array shape (T_steps, N)
    """
    try:
        # prova ad usare nctpy.energies.integrate_u
        return integrate_u(u)
    except Exception:
        # fallback robusto (trapezi): differenze minime con molti step
        
        dt = T / (u.shape[0] - 1)
        return np.trapezoid(u**2, dx=dt, axis=0)

# (A) Calcolo node-level control energy integrando i control signals:
node_energy = integrate_u_compat(control_signals, time_horizon)  # shape (N,)
print("node_energy shape:", node_energy.shape)
# salvataggio
np.save(OUT / "node_energy_850.npy", np.asarray(node_energy, dtype=float))
print("[SAVE] node_energy_850.npy salvato in", OUT)

# (B) Riassumo nodal energy:
energy = float(np.sum(node_energy))
print("Total control energy:", round(energy, 4))

# ============================ Salvataggi e riepilogo ===================================
RESULT = OUT / "control_energy"
RESULT.mkdir(exist_ok=True)
pd.DataFrame({"node": np.arange(1, N + 1), "node_energy": node_energy}).to_csv(
    RESULT / "node_energy_850.csv", index=False
)
with open(RESULT / "summary.txt", "w") as fsum:
    fsum.write(
        f"N={N}\n"
        f"energy={energy:.10f}\n"
        f"inversion_err={numerical_error[0]:.4e}\n"
        f"recon_err={numerical_error[1]:.4e}\n"
        f"control_set_file={CTL_TXT.name}\n"
    )
print(f"[DONE] Risultati scritti in: {RESULT}")




A_norm pronto (system=continuous, c=1).
[STATE] #ROI controllate (da BI per-contatto, deduplicate) = 50 / 850
  initial mean strength: 17071.86
  target  mean strength: 16975.72
x_interictal shape: (850,)
x_ictal      shape: (850,)
[STATE] Stati iniziale e target presi da PSD sEEG (non-binari, normalizzati).
inversion error = 1.49E-10 (<1.00E-08=True)
reconstruction error = 3.12E-09 (<1.00E-08=True)
node_energy shape: (850,)
[SAVE] node_energy_850.npy salvato in c:\Users\barba\Visual Studio - Github\NCT\output
Total control energy: 35.9815
[DONE] Risultati scritti in: c:\Users\barba\Visual Studio - Github\NCT\output\control_energy


In [85]:
# Siccome nel .mat file ho coppie di stati in riferimento a diverse bande di frequenza,
# posso ripetere il calcolo del control energy per tutte le bande disponibili.
# Per farlo, creo un ciclo che itera sulle coppie di stati nel .mat file,
# e salva i risultati in cartelle separate per ogni banda.
# Caricamento da MATLAB degli stati x0 = x_interictal & xf = x_ictal
STATE_MATE = BASE / "states_NCT_from_sEEG.mat"
mat_states = loadmat(STATE_MATE)

# elenco coppie di stati dal .mat file
BANDS = {
    "80_150": ("x_interictal_80_150", "x_ictal_80_150"),
    "80_250": ("x_interictal_80_250", "x_ictal_80_250"),
    "150_250": ("x_interictal_150_250", "x_ictal_150_250"),
    "250_500": ("x_interictal_250_500", "x_ictal_250_500"),
    "13_30": ("x_interictal_13_30", "x_ictal_13_30"),
}

# definisco una funzione che esegua tutta la pipeline per una banda:
def run_control_energy_for_band(
    band_id: str,
    name_x0: str,
    name_xf: str,
    mat_states,
    A_norm,
    control_set,
    time_horizon: float,
    system: str,
    rho: float,
    S: np.ndarray,
    initial_mask: np.ndarray,
    OUT: Path
):
    """
    Esegue tutta la pipeline NCT per una singola banda:
    - carica x0, xf dal .mat
    - normalizza
    - calcola control signals & traiettoria
    - calcola energia per nodo
    - salva CSV/figure specifiche per quella banda
    """
    print(f"\n=== Banda {band_id} Hz ===")
    
    # 1) prendo gli stati grezzi dal .mat (ROI-level, 850 x 1)
    x0_raw = np.asarray(mat_states[name_x0], dtype=float).ravel()
    xf_raw = np.asarray(mat_states[name_xf], dtype=float).ravel()

    assert x0_raw.shape[0] == A_norm.shape[0] == control_set.shape[0]

    # 2) normalizzazione a norma unitaria (come Parkes)
    x0 = normalize_state(x0_raw)
    xf = normalize_state(xf_raw)

    # 3) controllo ottimo
    state_traj, control_signals, num_err = get_control_inputs(
        A_norm=A_norm,
        T=time_horizon,
        B=control_set,
        x0=x0,
        xf=xf,
        system=system,
        rho=rho,
        S=S,
    )

    # Check inversion / reconstruction errors
    thr = 1e-8  #threshold
    print("inversion error = {:.2E} (<{:.2E}={:})".format(num_err[0], thr, num_err[0] < thr))
    print("reconstruction error = {:.2E} (<{:.2E}={:})".format(num_err[1], thr, num_err[1] < thr))

    # 4) energia per nodo
    node_energy = integrate_u_compat(control_signals, time_horizon)   # shape (N,)
    total_E = float(node_energy.sum())
    ctrl_E  = float(node_energy[initial_mask].sum())
    nonctrl_E = float(node_energy[~initial_mask].sum())

    print(f"  inversion_err      = {num_err[0]:.2e}")
    print(f"  reconstruction_err = {num_err[1]:.2e}")
    print(f"  total energy       = {total_E:.4f}")
    print(f"    ↳ controllers    = {ctrl_E:.4f}")
    print(f"    ↳ non-controllers = {nonctrl_E:.4f}")

    # 5) salvataggi per quella banda
    result_dir = OUT / "control_energy"
    result_dir.mkdir(exist_ok=True)
        # energia per nodo (.npy + .csv)
    np.save(result_dir / f"node_energy_850_{band_id}.npy", node_energy)

    pd.DataFrame({
        "node": np.arange(1, len(node_energy) + 1),
        "node_energy": node_energy,
        "is_controller": initial_mask.astype(int),
    }).to_csv(result_dir / f"node_energy_850_{band_id}.csv", index=False)

    # summary testuale (.txt)
    with open(result_dir / f"summary_{band_id}.txt", "w") as f:
        f.write(
            f"band={band_id}\n"
            f"energy={total_E:.10f}\n"
            f"energy_controllers={ctrl_E:.10f}\n"
            f"energy_noncontrollers={nonctrl_E:.10f}\n"
            f"inversion_err={num_err[0]:.4e}\n"
            f"recon_err={num_err[1]:.4e}\n"
        )


# ----- Propagazione nel tempo: energia istantanea controller vs non- -----
# Per verificare se l'energia si propaga (la dinamica), posso guardare il profilo temporale dell'energia
# Dopo aver chiamato get_control_inputs (hao control_signals shape (T_steps, N)), posso:
    dt = time_horizon / (control_signals.shape[0] - 1)
    t = np.linspace(0, time_horizon, control_signals.shape[0])

    # energia istantanea separata per gruppo (controllers vs non)
    E_t_ctrl    = np.sum(control_signals[:, initial_mask]**2, axis=1) * dt
    E_t_nonctrl = np.sum(control_signals[:, ~initial_mask]**2, axis=1) * dt

    plt.figure(figsize=(6,4))
    plt.plot(t, E_t_ctrl, label="controllers")
    plt.plot(t, E_t_nonctrl, label="non-controllers")
    plt.xlabel("time (a.u.)")
    plt.ylabel("instantaneous energy (controllers vs non)")
    plt.legend()
    plt.tight_layout()
    plt.savefig(OUT / "control_energy" / f"energy_time_{band_id}.png", dpi=300)
    plt.close()

# Interpretazione:
# - se all'inizio l’energia è concentrata sui controllers e poi cresce sui non-controllers, 
#   => sto osservando la propagazione lungo la rete; 
# - se invece l’energia rimane quasi tutta sui controllers,
#   => la dinamica non propaga molto l’input di controllo (localizzazione senza coinvolgimento del resto della rete)

    # --- HEATMAP DI |u(t)| ---
    plt.figure(figsize=(7,5))
    plt.imshow(control_signals.T, aspect="auto", interpolation="nearest",
            extent=[t[0], t[-1], 0, N])
    plt.colorbar(label="u(t)")
    plt.xlabel("time (a.u.)")
    plt.ylabel("node index")
    plt.title(f"Control signals per node — {band_id} Hz")
    plt.tight_layout()
    plt.savefig(OUT / "control_energy" / f"u_heatmap_{band_id}.png", dpi=300)
    plt.close()

# ritorno i risultati per analisi cross-banda
    return {
        "band": band_id,
        "total_E": total_E,
        "ctrl_E": ctrl_E,
        "nonctrl_E": nonctrl_E,
        "errors": num_err,
        "node_energy": node_energy,
        "state_traj": state_traj,
        "u": control_signals,
        "x0_raw": x0_raw,
        "xf_raw": xf_raw,
    }


# ============== Loop su tutte le bande ===============
results = []
for band_id, (name_x0, name_xf) in BANDS.items():
    res = run_control_energy_for_band(
        band_id=band_id,
        name_x0=name_x0,
        name_xf=name_xf,
        mat_states=mat_states,
        A_norm=A_norm,
        control_set=control_set,
        time_horizon=time_horizon,
        system=system,
        rho=rho,
        S=S,
        initial_mask=initial_mask,
        OUT=OUT,
    )
    results.append(res)

    # Riepilogo generale in un CSV
summary_df = pd.DataFrame(
    [
        {
            "band": r["band"],
            "total_E": r["total_E"],
            "ctrl_E": r["ctrl_E"],
            "nonctrl_E": r["nonctrl_E"],
            "inv_err": r["errors"][0],
            "rec_err": r["errors"][1],
        }
        for r in results
    ]
)
summary_df.to_csv(OUT / "control_energy" / "summary_all_bands.csv", index=False)
print("[DONE] summary_all_bands.csv scritto.")

# così per ogni banda ho un file node_energy_850_<banda>.csv con l'energia per ROI/nodo
# e un summary_<banda>.txt + summary_all_bands.csv che dice il totale dell'energia e quanta nei nodi controllori.

# ======================================================================
#        Estrazione e visualizzazione dei principali controller
# ======================================================================

# Carico gli ID delle ROI (B0_TXT)
b0_ids_vec = pd.read_csv(
    B0_TXT, sep=r"[\s,;]+", engine="python", header=None, comment="#"
).iloc[:, 0].astype(int).to_numpy()
assert len(b0_ids_vec) == N

top_k = 100  # quante ROI considero come "principali" per banda

result_dir = OUT / "control_energy"
result_dir.mkdir(exist_ok=True)

for res in results:
    band = res["band"]
    E = res["node_energy"]  # energia per ROI in quella banda

    # ---- (a) top nodi tra TUTTE le ROI ----
    idx_sorted = np.argsort(E)[::-1]  # indici ordinati per energia decrescente
    idx_top = idx_sorted[:top_k]

    df_top_all = pd.DataFrame(
        {
            "rank": np.arange(1, len(idx_top) + 1),
            "roi_index": idx_top + 1,  # indice 1-based
            "roi_id": b0_ids_vec[idx_top],  # ID usato nel connectome
            "is_controller": initial_mask[idx_top],  # True se ROI ha contatti sEEG
            "node_energy": E[idx_top],
        }
    )
    df_top_all.to_csv(
        result_dir / f"top_nodes_all_{band}.csv",
        index=False,
    )

    # ---- (b) top nodi SOLO tra i controller (ROI con sEEG) ----
    idx_ctrl = np.where(initial_mask)[0]  # indici dei controller forti
    E_ctrl = E[idx_ctrl]
    order_ctrl_local = np.argsort(E_ctrl)[::-1]
    idx_top_ctrl = idx_ctrl[order_ctrl_local[:top_k]]

    df_top_ctrl = pd.DataFrame(
        {
            "rank": np.arange(1, len(idx_top_ctrl) + 1),
            "roi_index": idx_top_ctrl + 1,
            "roi_id": b0_ids_vec[idx_top_ctrl],
            "node_energy": E[idx_top_ctrl],
        }
    )
    df_top_ctrl.to_csv(
        result_dir / f"top_nodes_controllers_{band}.csv",
        index=False,
    )

    # ---- (c) bar-plot dei principali controller ----
    plt.figure(figsize=(8, 4))
    plt.bar(np.arange(len(idx_top_ctrl)), E[idx_top_ctrl])
    plt.xticks(
        np.arange(len(idx_top_ctrl)),
        [str(b0_ids_vec[i]) for i in idx_top_ctrl],
        rotation=90,
    )
    plt.ylabel("node control energy")
    plt.xlabel("ROI ID (controllers)")
    plt.title(f"Top {len(idx_top_ctrl)} controller nodes — banda {band} Hz")
    plt.tight_layout()
    plt.savefig(
        result_dir / f"top_controllers_bar_{band}.png",
        dpi=300,
    )
    plt.close()

print("[DONE] top_nodes_* e barplot dei controller principali per ogni banda.")

# per ogni banda ottengo:
# top_nodes_all_<band>.csv → i nodi con energia più alta nell’intera rete,
# top_nodes_controllers_<band>.csv → tra le 77 ROI con contatti sEEG, quali sono i controller principali,
# top_controllers_bar_<band>.png → grafico a barre dell’energia dei controller principali.


=== Banda 80_150 Hz ===
inversion error = 1.49E-10 (<1.00E-08=True)
reconstruction error = 3.12E-09 (<1.00E-08=True)
  inversion_err      = 1.49e-10
  reconstruction_err = 3.12e-09
  total energy       = 35.9815
    ↳ controllers    = 34.1677
    ↳ non-controllers = 1.8139

=== Banda 80_250 Hz ===
inversion error = 1.41E-10 (<1.00E-08=True)
reconstruction error = 3.05E-09 (<1.00E-08=True)
  inversion_err      = 1.41e-10
  reconstruction_err = 3.05e-09
  total energy       = 35.9593
    ↳ controllers    = 34.1414
    ↳ non-controllers = 1.8179

=== Banda 150_250 Hz ===
inversion error = 1.30E-10 (<1.00E-08=True)
reconstruction error = 2.91E-09 (<1.00E-08=True)
  inversion_err      = 1.30e-10
  reconstruction_err = 2.91e-09
  total energy       = 35.5421
    ↳ controllers    = 33.6525
    ↳ non-controllers = 1.8896

=== Banda 250_500 Hz ===
inversion error = 1.22E-10 (<1.00E-08=True)
reconstruction error = 4.00E-09 (<1.00E-08=True)
  inversion_err      = 1.22e-10
  reconstruction_err = 

Post-analisi energia + export CSV→XLSX

In [86]:
# ============================ Analisi energia per gruppo ===================================
# maschere: initial / target / bystanders
mask_init = initial_state_raw > np.median(initial_state_raw) # 
mask_targ = target_state_raw > np.median(target_state_raw)
mask_byst = ~(mask_init | mask_targ)

print("Energia iniziale:", float(node_energy[mask_init].sum()))
print("Energia target  :", float(node_energy[mask_targ].sum()))
print("Energia bystanders:", float(node_energy[mask_byst].sum()))
print("Energia totale:", float(node_energy.sum()))

# (opzionale ma utile) mappa ROI-ID → energia e gruppo, così ho un CSV pronto
b0_ids_vec = pd.read_csv(
    B0_TXT, sep=r"[\s,;]+", engine="python", header=None, comment="#"
).iloc[:, 0].astype(int).to_numpy()
assert len(b0_ids_vec) == len(node_energy), "Numero ID B0 diverso da #ROI dell’energia"

group = np.full(len(node_energy), "bystander", dtype=object)
group[mask_init] = "initial"
group[mask_targ] = "target"

dfE = pd.DataFrame({"roi_id": b0_ids_vec, "node_energy": node_energy, "group": group})
dfE.to_csv(OUT / "node_energy_by_group.csv", index=False)
print("[SAVE] node_energy_by_group.csv salvato in", OUT)

# istogramma
plt.figure()
plt.hist(node_energy, bins=30)
plt.xlabel("Node control energy")
plt.ylabel("Conteggio ROI")
plt.title("Distribuzione energia di controllo (850 ROI)")
plt.tight_layout()
plt.savefig(OUT / "hist_node_energy.png", dpi=300, bbox_inches="tight")
plt.close()
# ============================================================

#conversione file: un .xlsx per ogni .csv nella cartella output
for csv in OUT.glob("*.csv"):
    try:
        df = pd.read_csv(csv)  # se servisse, aggiungo: sep="," , encoding="utf-8"
        xlsx_path = csv.with_suffix(".xlsx")
        df.to_excel(xlsx_path, index=False, engine="openpyxl")
        print(f"Creato: {xlsx_path.name}")
    except Exception as e:
        print(f"Errore su {csv.name}: {e}")


Energia iniziale: 17.372077768890428
Energia target  : 18.340027276125035
Energia bystanders: 16.936328053630444
Energia totale: 35.98154033573532
[SAVE] node_energy_by_group.csv salvato in c:\Users\barba\Visual Studio - Github\NCT\output
Creato: A_struct_clean.xlsx
Creato: bbox_summary.xlsx
Creato: bipolar_top1_outliers_gt_6.0mm.xlsx
Creato: bi_k1_top1_outliers_gt_6.0mm.xlsx
Creato: bi_k3_top1_outliers_gt_6.0mm.xlsx
Creato: B_diag_matrix.xlsx
Creato: B_vector_diag.xlsx
Creato: check_bipolar_midpoint.xlsx
Creato: contact2roi_BI_top1.xlsx
Creato: contact2roi_MONO_top1.xlsx
Creato: contacts_vs_B0_MNI_nearest.xlsx
Creato: controlset_map_bipolar_k1.xlsx
Creato: controlset_map_monopolar_k1.xlsx
Creato: controlset_map_monopolar_k3.xlsx
Creato: controlset_top1_outliers_gt_6.0mm.xlsx
Creato: control_energy_qc_summary.xlsx
Creato: monopolar_top1_outliers_gt_6.0mm.xlsx
Creato: mono_k1_top1_outliers_gt_6.0mm.xlsx
Creato: mono_k3_top1_outliers_gt_6.0mm.xlsx
Creato: node_energy_by_group.xlsx
Creato