
# CNT Topology Validation & Stability Map — Phase IV

**Purpose**  
Turn CNT's *visible topology* into *verified science* by:
1. **Quantitative validation** — compare CNT field dynamics against real networks (EEG, gene co‑expression, etc.).  
2. **Stability mapping** — find collapse margins (\(K_c\), knee, resilience) via Kuramoto/Ising on your graphs.  
3. **Auto‑report** — export a concise markdown/HTML report + CSV artifacts.

**Outputs** (saved under `artifacts_cnt_validation_<timestamp>/`):  
- `metrics_summary.csv` — graph + dynamics metrics per dataset  
- `validation_results.csv` — permutation tests, effect sizes, p‑values  
- `stability_map.csv` — K vs R_mean per dataset  
- `report.md` and `report.html` — one‑pager: methods, tables, highlights

> Tip: You can run the **One‑Click Pipeline** near the bottom once you set input paths.


In [1]:

# %% [setup] Environment & Paths
import os, sys, math, json, glob, time, warnings, pathlib
from pathlib import Path
from datetime import datetime
warnings.filterwarnings("ignore")

import numpy as np
import pandas as pd

try:
    import networkx as nx
except Exception:
    print("Installing networkx…")
    %pip -q install networkx
    import networkx as nx

try:
    import scipy
    from scipy import stats, linalg, signal
except Exception:
    print("Installing scipy…")
    %pip -q install scipy
    import scipy
    from scipy import stats, linalg, signal

# Optional: plotly for 3D previews (not required for headless runs)
try:
    import plotly.graph_objects as go
except Exception:
    pass

# Resolve CNT base dirs (customize if needed)
CNT_LAB_DIR = os.environ.get("CNT_LAB_DIR")
WINDOWS_GUESSES = [r"E:\CNT", r"C:\Users\caleb\CNT_Lab", r"C:\Users\caleb\CNT"]
if CNT_LAB_DIR is None:
    for guess in WINDOWS_GUESSES:
        if Path(guess).exists():
            CNT_LAB_DIR = guess
            break
if CNT_LAB_DIR is None:
    CNT_LAB_DIR = str(Path.cwd())  # fallback

# Timestamped artifact dir (created inside current working dir)
_ts = datetime.utcnow().strftime("%Y%m%d-%H%M%SZ")
ARTIFACTS_DIR = Path(f"artifacts_cnt_validation_{_ts}")
ARTIFACTS_DIR.mkdir(parents=True, exist_ok=True)

print("CNT_LAB_DIR:", CNT_LAB_DIR)
print("Artifacts  :", ARTIFACTS_DIR.resolve())


CNT_LAB_DIR: E:\CNT
Artifacts  : E:\CNT\notebooks\archive\artifacts_cnt_validation_20251104-031414Z


### Data — discover & load
Provide paths to **real** networks (adjacency or correlation matrices) and optional **3D coordinates**.

In [2]:

# %% [data] Discovery & loaders
import itertools
from typing import Optional, Tuple, Dict
from pathlib import Path

def discover_csvs(base_dirs, patterns=("eeg","coexp","gene","corr","adj","connect","matrix")):
    """
    Recursively search for CSV files whose names hint at adjacency/correlation matrices.
    Returns a dict {name: path}.
    """
    hits = {}
    for base in base_dirs:
        base = Path(base)
        if not base.exists(): 
            continue
        for p in base.rglob("*.csv"):
            name = p.name.lower()
            if any(k in name for k in patterns):
                key = p.stem
                if key in hits:
                    key = f"{key}__{abs(hash(str(p)))%99999}"
                hits[key] = str(p)
    return hits

def load_matrix(path: str, header=True) -> np.ndarray:
    """
    Load an NxN numeric matrix from CSV. Accepts header/no-header.
    """
    df = pd.read_csv(path, header=0 if header else None)
    # If first column looks like an index, drop it
    if df.shape[1] == df.shape[0] + 1 and not pd.to_numeric(df.columns[0], errors='coerce').notna().all():
        df = df.iloc[:,1:]
    mat = df.values.astype(float)
    if mat.shape[0] == mat.shape[1]:
        mat = (mat + mat.T) / 2.0
        np.fill_diagonal(mat, 0.0)
    return mat

def corr_to_graph(corr: np.ndarray, threshold: float=0.35, keep_sign=False) -> nx.Graph:
    """
    Build an undirected graph from a correlation matrix using a threshold.
    If keep_sign=True, store edge attribute "sign" as np.sign(corr_ij).
    """
    n = corr.shape[0]
    G = nx.Graph()
    G.add_nodes_from(range(n))
    for i in range(n):
        for j in range(i+1, n):
            c = corr[i,j]
            if abs(c) >= threshold:
                sgn = 1 if c >= 0 else -1
                G.add_edge(i,j, weight=float(abs(c)))
                if keep_sign:
                    G[i][j]['sign'] = sgn
    return G

def adj_to_graph(adj: np.ndarray, weighted=False) -> nx.Graph:
    """
    Build a graph from adjacency matrix; if weighted=False, any nonzero is an edge with weight=1.
    """
    n = adj.shape[0]
    G = nx.Graph()
    G.add_nodes_from(range(n))
    for i in range(n):
        for j in range(i+1, n):
            v = adj[i,j]
            if v != 0:
                w = float(v if weighted else 1.0)
                G.add_edge(i,j, weight=w)
    return G

# Example: Auto-discover candidates (preview first 5)
CANDIDATES = discover_csvs([CNT_LAB_DIR, Path(CNT_LAB_DIR)/'artifacts', Path.cwd()])
print(f"Discovered {len(CANDIDATES)} candidate CSV(s).")
list(itertools.islice(CANDIDATES.items(), 5))


Discovered 74 candidate CSV(s).


[('cnt_correlates_report_20251015-163558',
  'E:\\CNT\\notebooks\\archive\\cnt_correlates_report_20251015-163558.csv'),
 ('cnt_correlates_report_20251015-164130',
  'E:\\CNT\\notebooks\\archive\\cnt_correlates_report_20251015-164130.csv'),
 ('top_gini_genes',
  'E:\\CNT\\notebooks\\archive\\cnt_runs\\3i_atlas_checkin\\20251029-054356Z\\top_gini_genes.csv'),
 ('top_gini_genes__88859',
  'E:\\CNT\\notebooks\\archive\\cnt_runs\\3i_atlas_checkin\\20251029-060737Z\\top_gini_genes.csv'),
 ('cfe_EEGBCI_subj1_motor_20251015-143523',
  'E:\\CNT\\notebooks\\archive\\cnt_mega_out\\tables\\cfe_EEGBCI_subj1_motor_20251015-143523.csv')]

### Metrics & Nulls — compute graph properties and degree‑preserving randomizations

In [3]:

# %% [metrics] Graph metrics & nulls

import itertools
from math import log2

def shannon_entropy(seq):
    if len(seq)==0: return 0.0
    vals, counts = np.unique(seq, return_counts=True)
    p = counts / counts.sum()
    return float(-(p * np.log2(p)).sum())

def graph_metrics(G: nx.Graph) -> dict:
    n = G.number_of_nodes()
    m = G.number_of_edges()
    deg = np.array([d for _,d in G.degree()])
    cc = nx.average_clustering(G, weight='weight' if nx.is_weighted(G) else None)
    # Greedy modularity communities (may be slow for very large graphs)
    try:
        from networkx.algorithms.community import greedy_modularity_communities, modularity
        comms = list(greedy_modularity_communities(G, weight='weight' if nx.is_weighted(G) else None))
        Q = modularity(G, comms, weight='weight' if nx.is_weighted(G) else None)
        n_comms = len(comms)
    except Exception:
        Q, n_comms = np.nan, np.nan
    return {
        "n": n, "m": m,
        "degree_mean": float(deg.mean()) if n>0 else np.nan,
        "degree_std": float(deg.std()) if n>0 else np.nan,
        "degree_entropy": shannon_entropy(deg),
        "clustering_avg": float(cc),
        "modularity_Q": float(Q),
        "communities": float(n_comms) if isinstance(n_comms,(int,float)) else np.nan,
        "density": nx.density(G),
    }

def degree_preserving_null(G: nx.Graph, swaps_per_edge=3, seed=1) -> nx.Graph:
    H = G.copy()
    try:
        nx.double_edge_swap(H, nswap=max(1, swaps_per_edge*H.number_of_edges()), max_tries=100*H.number_of_edges(), seed=seed)
    except Exception:
        pass
    return H


### Dynamics — Kuramoto sweep to estimate $K_c$, knee, and resilience

In [4]:

# %% [dynamics] Kuramoto + Kc detection

def simulate_kuramoto(G: nx.Graph, K: float, T: float=50.0, dt: float=0.05, seed: int=0, omega_std: float=1.0):
    """
    Simple Kuramoto on an undirected (possibly weighted) graph.
    Returns order parameter R_mean and the time-series R_t.
    """
    rng = np.random.default_rng(seed)
    n = G.number_of_nodes()
    if n == 0:
        return 0.0, np.array([])
    theta = rng.uniform(0, 2*np.pi, size=n)
    omega = rng.normal(0.0, omega_std, size=n)
    A = nx.to_numpy_array(G, weight='weight' if nx.is_weighted(G) else None)
    deg = A.sum(axis=1)
    t_steps = int(T/dt)
    R_t = np.zeros(t_steps, dtype=float)
    for t in range(t_steps):
        coupling = (A * np.sin(theta[:,None] - theta[None,:])).sum(axis=1)
        theta = theta + dt * (omega + (K * coupling / np.maximum(deg, 1e-9)))
        R = np.abs(np.mean(np.exp(1j*theta)))
        R_t[t] = R
    burn = int(0.3*t_steps)
    return float(R_t[burn:].mean()), R_t

def find_knee(x, y, direction='increasing'):
    """
    Lightweight knee detector using smoothed curve and curvature proxy.
    """
    x = np.asarray(x); y = np.asarray(y)
    from scipy.ndimage import gaussian_filter1d
    ys = gaussian_filter1d(y, sigma=1.0)
    dy = np.gradient(ys, x)
    d2 = np.gradient(dy, x)
    if direction == 'increasing':
        score = (dy - dy.min())/(dy.max()-dy.min()+1e-9) + (d2 - d2.min())/(d2.max()-d2.min()+1e-9)
    else:
        score = (dy.max()-dy)/(dy.max()-dy.min()+1e-9) + (d2.max()-d2)/(d2.max()-d2.min()+1e-9)
    idx = int(np.argmax(score))
    return idx

def sweep_k(G, k_grid=None, **sim_kw):
    if k_grid is None:
        k_grid = np.linspace(0.0, 3.0, 25)
    records = []
    R_means = []
    for K in k_grid:
        R_mean, R_t = simulate_kuramoto(G, K=K, **sim_kw)
        R_means.append(R_mean)
        records.append({"K": float(K), "R_mean": float(R_mean)})
    R = np.array(R_means)
    idx_knee = find_knee(k_grid, R, direction='increasing')
    K_knee = float(k_grid[idx_knee])
    idx_Kc = int(np.argmin(np.abs(R - 0.5)))
    K_c = float(k_grid[idx_Kc])
    return pd.DataFrame(records), K_c, K_knee


### Validation — permutation tests vs degree‑preserving nulls

In [5]:

# %% [validation] Permutation tests

def validate_against_nulls(G: nx.Graph, k_grid=None, n_nulls: int=64, seed: int=1):
    """
    Compare real graph's K_c and R(K) curve vs degree-preserving nulls.
    Returns: summary dict and a DataFrame of null results.
    """
    rng = np.random.default_rng(seed)
    sweep_df, K_c_real, K_knee_real = sweep_k(G, k_grid=k_grid, seed=seed)
    near = sweep_df.iloc[(sweep_df['K']-K_c_real).abs().argsort()[:3]]['R_mean'].mean()

    null_rows = []
    for i in range(n_nulls):
        H = degree_preserving_null(G, swaps_per_edge=3, seed=int(seed+i))
        dfH, KcH, KkH = sweep_k(H, k_grid=k_grid, seed=int(seed+i))
        nearH = dfH.iloc[(dfH['K']-K_c_real).abs().argsort()[:3]]['R_mean'].mean()
        null_rows.append({"run": i, "K_c": KcH, "K_knee": KkH, "R_near_Kc(realKc)": float(nearH)})
    null_df = pd.DataFrame(null_rows)
    p_Kc = ( (null_df["K_c"] <= K_c_real).sum() + 1 ) / (len(null_df)+1) if not math.isnan(K_c_real) else np.nan
    p_knee = ( (null_df["K_knee"] <= K_knee_real).sum() + 1 ) / (len(null_df)+1) if not math.isnan(K_knee_real) else np.nan
    p_near = ( (null_df["R_near_Kc(realKc)"] >= near).sum() + 1 ) / (len(null_df)+1) if not math.isnan(near) else np.nan

    summary = {
        "K_c_real": float(K_c_real),
        "K_knee_real": float(K_knee_real),
        "R_near_realKc": float(near),
        "perm_p_Kc_le": float(p_Kc),
        "perm_p_Kknee_le": float(p_knee),
        "perm_p_Rnear_ge": float(p_near),
    }
    return summary, sweep_df, null_df


### 3D Field — edge‑density & memory nodes (optional)

In [6]:

# %% [3D] Field stability scoring (optional)

def load_coords_csv(path: str, cols=('x','y','z')) -> np.ndarray:
    df = pd.read_csv(path)
    for c in cols:
        assert c in df.columns, f"Missing column '{c}' in {path}"
    return df[list(cols)].values.astype(float)

def node_edge_density_score(G: nx.Graph, coords: np.ndarray, radius: float=0.2) -> pd.DataFrame:
    """
    For each node, score local edge density within a ball of "radius" in 3D.
    Returns DataFrame with "node", "density3D", "deg", "betweenness".
    """
    from sklearn.neighbors import KDTree
    N = G.number_of_nodes()
    assert coords.shape[0] == N, "coords must align with G nodes (0..N-1)"
    tree = KDTree(coords)
    deg = dict(G.degree())
    btw = nx.betweenness_centrality(G, normalized=True, weight='weight' if nx.is_weighted(G) else None)
    rows = []
    for i in range(N):
        idx = tree.query_radius(coords[[i]], r=radius)[0]
        S = G.subgraph(idx)
        density = nx.density(S) if S.number_of_nodes()>1 else 0.0
        rows.append({"node": i, "density3D": float(density), "deg": float(deg.get(i,0)), "betweenness": float(btw.get(i,0.0))})
    return pd.DataFrame(rows)

def memory_node_candidates(score_df: pd.DataFrame, top_pct: float=0.05) -> pd.DataFrame:
    # Z-scores
    for col in ["density3D","betweenness","deg"]:
        score_df[f"z_{col}"] = (score_df[col] - score_df[col].mean()) / (score_df[col].std()+1e-9)
    score_df["memory_score"] = score_df["z_density3D"] + 0.7*score_df["z_betweenness"] - 0.3*score_df["z_deg"]
    k = max(1, int(len(score_df)*top_pct))
    return score_df.sort_values("memory_score", ascending=False).head(k).reset_index(drop=True)


### Reporting — CSVs + Markdown/HTML summary

In [7]:

# %% [report] Writers

from datetime import datetime
from pathlib import Path

def write_csv(df: pd.DataFrame, name: str) -> Path:
    p = ARTIFACTS_DIR / name
    df.to_csv(p, index=False)
    print("Saved:", p)
    return p

def render_markdown_report(summaries: list, metrics_rows: list, stability_rows: list, outfile_md="report.md"):
    dt = datetime.utcnow().strftime("%Y-%m-%d %H:%M UTC")
    md = []
    md.append("# CNT Topology Validation — Auto Report\n")
    md.append(f"_Generated: {dt}_\n")
    md.append("\n## Highlights\n")
    for sm in summaries:
        s = sm["summary"]
        line = "- **{label}** — K_c={Kc:.3f}, knee={Kk:.3f}, p(K_c)={pKc:.3f}, p(knee)={pKk:.3f}, p(R_near)={pRn:.3f}".format(
            label=sm["label"], Kc=s["K_c_real"], Kk=s["K_knee_real"], pKc=s["perm_p_Kc_le"], pKk=s["perm_p_Kknee_le"], pRn=s["perm_p_Rnear_ge"]
        )
        md.append(line)
    md.append("\n## Methods\n- Kuramoto sweep on input graphs; degree-preserving nulls for permutation tests.\n"
              "- Graph metrics: degree entropy, clustering, modularity, density.\n"
              "- Optional 3D memory-node scoring via local edge density.\n")
    md.append("\n## Files\n")
    md.append("- `metrics_summary.csv` — graph properties per dataset")
    md.append("- `validation_results.csv` — permutation results per dataset")
    md.append("- `stability_map.csv` — K vs R_mean per dataset\n")
    text = "\n".join(md)
    p = ARTIFACTS_DIR / outfile_md
    p.write_text(text, encoding="utf-8")
    print("Saved:", p)
    html = "<html><head><meta charset='utf-8'><title>CNT Validation Report</title></head><body><pre style='font-family:ui-monospace,Consolas,monospace'>{}</pre></body></html>".format(text)
    ph = ARTIFACTS_DIR / "report.html"
    ph.write_text(html, encoding="utf-8")
    print("Saved:", ph)
    return p, ph


### One‑Click Pipeline — set your inputs and run

In [8]:

# %% [pipeline] Configure your inputs here

# Example inputs (edit these). Provide either correlation matrices (will threshold) or adjacency matrices.
INPUTS = [
    # {"label": "EEG_example", "path": r"C:\Users\caleb\CNT_Lab\artifacts\tables\eeg_adj.csv", "kind": "adj", "weighted": False},
    # {"label": "GENE_example", "path": r"C:\Users\caleb\CNT_Lab\artifacts\tables\gene_corr.csv", "kind": "corr", "threshold": 0.35, "keep_sign": False},
]

# Optional 3D coordinates (x,y,z) per dataset label (for the same node order)
COORDS = {
    # "GENE_example": r"C:\Users\caleb\CNT_Lab\artifacts\tables\gene_coords.csv"
}

# Kuramoto sweep parameters
K_GRID = np.linspace(0.0, 3.0, 25)
SIM_KW = dict(T=40.0, dt=0.05, seed=42, omega_std=1.0)
N_NULLS = 64


In [9]:

# %% [pipeline] Run
metrics_rows = []
validation_rows = []
stability_rows = []
summaries = []

if not INPUTS:
    print("⚠️ No INPUTS configured yet. Edit the INPUTS list above and re-run this cell.")
else:
    for cfg in INPUTS:
        label = cfg["label"]
        path = cfg["path"]
        kind = cfg.get("kind","corr")
        print(f"\n=== {label} ===")
        print("Loading:", path)
        mat = load_matrix(path, header=True)

        if kind == "corr":
            thr = cfg.get("threshold", 0.35)
            keep_sign = cfg.get("keep_sign", False)
            G = corr_to_graph(mat, threshold=thr, keep_sign=keep_sign)
        else:
            G = adj_to_graph(mat, weighted=cfg.get("weighted", False))

        # Metrics
        gm = graph_metrics(G); gm["label"] = label
        metrics_rows.append(gm)

        # Validation vs nulls
        summary, sweep_df, null_df = validate_against_nulls(G, k_grid=K_GRID, n_nulls=N_NULLS, seed=123)
        summaries.append({"label": label, "summary": summary})

        # Stability map rows
        s = sweep_df.copy(); s["label"] = label
        stability_rows.append(s)

        # Validation rows
        nd = null_df.copy(); nd["label"] = label
        validation_rows.append(nd)

        # Optional 3D stability scoring
        if label in COORDS:
            try:
                coords = load_coords_csv(COORDS[label])
                score_df = node_edge_density_score(G, coords, radius=0.2)
                mem_df = memory_node_candidates(score_df, top_pct=0.05)
                write_csv(score_df, f"{label}_3d_density_scores.csv")
                write_csv(mem_df, f"{label}_memory_nodes_top5pct.csv")
            except Exception as e:
                print("3D scoring skipped:", e)

    # Write artifacts
    metrics_df = pd.DataFrame(metrics_rows)
    validation_df = pd.concat(validation_rows, ignore_index=True) if validation_rows else pd.DataFrame()
    stability_df = pd.concat(stability_rows, ignore_index=True) if stability_rows else pd.DataFrame()
    write_csv(metrics_df, "metrics_summary.csv")
    if not validation_df.empty:
        write_csv(validation_df, "validation_results.csv")
    if not stability_df.empty:
        write_csv(stability_df, "stability_map.csv")

    # Report
    render_markdown_report(summaries, metrics_rows, stability_rows, outfile_md="report.md")


⚠️ No INPUTS configured yet. Edit the INPUTS list above and re-run this cell.


### (Optional) Smoke Test — run on a synthetic small‑world graph

In [10]:

# %% [test] Quick smoke test (no real data required)

G_test = nx.watts_strogatz_graph(200, k=8, p=0.15, seed=1)
gm_test = graph_metrics(G_test)
print("Graph metrics:", gm_test)

sweep_df, Kc, Kk = sweep_k(G_test, k_grid=nDiscovered 74 candidate CSV(s).
[('cnt_correlates_report_20251015-163558',
  'E:\\CNT\\notebooks\\archive\\cnt_correlates_report_20251015-163558.csv'),
 ('cnt_correlates_report_20251015-164130',
  'E:\\CNT\\notebooks\\archive\\cnt_correlates_report_20251015-164130.csv'),
 ('top_gini_genes',
  'E:\\CNT\\notebooks\\archive\\cnt_runs\\3i_atlas_checkin\\20251029-054356Z\\top_gini_genes.csv'),
 ('top_gini_genes__88859',
  'E:\\CNT\\notebooks\\archive\\cnt_runs\\3i_atlas_checkin\\20251029-060737Z\\top_gini_genes.csv'),
 ('cfe_EEGBCI_subj1_motor_20251015-143523',
  'E:\\CNT\\notebooks\\archive\\cnt_mega_out\\tables\\cfe_EEGBCI_subj1_motor_20251015-143523.csv')]p.linspace(0,3,20), seed=1, T=20.0, dt=0.05)
print(f"Estimated K_c ~ {Kc:.3f}, knee ~ {Kk:.3f}")
write_csv(sweep_df, "SMOKETEST_stability_map.csv")


Graph metrics: {'n': 200, 'm': 800, 'degree_mean': 8.0, 'degree_std': 1.0816653826391966, 'degree_entropy': 2.121956571486788, 'clustering_avg': 0.4112683982683983, 'modularity_Q': 0.62972109375, 'communities': 5.0, 'density': 0.04020100502512563}
Estimated K_c ~ 0.000, knee ~ 0.789
Saved: artifacts_cnt_validation_20251104-031414Z\SMOKETEST_stability_map.csv


WindowsPath('artifacts_cnt_validation_20251104-031414Z/SMOKETEST_stability_map.csv')


---
**Notes**  
- `degree_preserving_null` uses `double_edge_swap` which may not fully randomize for small graphs; increase `swaps_per_edge` if needed.  
- Kuramoto settings (`T`, `dt`, `omega_std`) can be tuned per dataset; report captures your exact config.  
- For Ising dynamics, add a parallel sweep block using Glauber/Metropolis; Kuramoto alone suffices for synchronization thresholds.

**License**  
MIT — Use freely in CNT Lab. Please cite Cognitive Nexus Theory (CNT) work by Telos.
