In [1]:
import numpy as np
from scipy.stats import pearsonr

import NPI

import matplotlib.pyplot as plt
import seaborn as sns

In [2]:
import torch, torch.nn as nn
from pathlib import Path; import csv
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')

def per_roi_std(signals):  # signals: [T,N]
    return signals.std(axis=0, ddof=1) + 1e-8

def make_snr_vector_uneven(N, center, sigma=0.4, seed=42):
    """
    ROI-wise SNR around a center (5 or 10). Larger sigma = more heterogeneity.
    """
    rng = np.random.default_rng(seed)
    factors = rng.lognormal(mean=0.0, sigma=sigma, size=N)
    factors = np.clip(factors, 0.3, 3.0)          # keep sane
    return center * factors

def add_noise_by_snr(signals, snr_per_roi):
    T, N = signals.shape
    sig_std = per_roi_std(signals)
    noise_std = sig_std / snr_per_roi
    return signals + np.random.randn(T, N) * noise_std

def pearson_r(a, b):
    a = a.reshape(-1); b = b.reshape(-1)
    ma, mb = a.mean(), b.mean()
    num = ((a-ma)*(b-mb)).sum()
    den = np.sqrt(((a-ma)**2).sum() * ((b-mb)**2).sum()) + 1e-12
    return float(num/den)

def log_row(path, row):
    path = Path(path); path.parent.mkdir(parents=True, exist_ok=True)
    write_header = not path.exists()
    with open(path, "a", newline="") as f:
        w = csv.DictWriter(f, fieldnames=row.keys())
        if write_header: w.writeheader()
        w.writerow(row)


In [3]:
class ANN_LSTM(nn.Module):
    def __init__(self, node_dim, hidden=128, num_layers=2, steps=3):
        super().__init__()
        self.node_dim, self.steps = node_dim, steps
        self.enc = nn.Linear(node_dim, hidden)
        self.lstm = nn.LSTM(input_size=hidden, hidden_size=hidden,
                            num_layers=num_layers, batch_first=True)
        self.out = nn.Linear(hidden, node_dim)
        self.to(device)
    def forward(self, x):
        if x.dim() == 1:               # allow (steps*N,)
            x = x.unsqueeze(0)         # -> (1, steps*N)
        B = x.shape[0]
        seq = x.view(B, self.steps, self.node_dim)
        h = torch.relu(self.enc(seq))
        h, _ = self.lstm(h)
        out = self.out(h[:, -1, :])    # (B, N)
        return out[0] if out.shape[0] == 1 else out   # <-- return (N,) for unbatched


In [4]:
import math
class PositionalEncoding(nn.Module):
    def __init__(self, d_model, max_len=512):
        super().__init__()
        pe = torch.zeros(max_len, d_model)
        pos = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0)/d_model))
        pe[:, 0::2] = torch.sin(pos * div); pe[:, 1::2] = torch.cos(pos * div)
        self.register_buffer("pe", pe.unsqueeze(0))
    def forward(self, x):  # x: (B,T,D)
        return x + self.pe[:, :x.size(1), :]

class ANN_Transformer(nn.Module):
    def __init__(self, node_dim, d_model=128, nhead=8, num_layers=3, steps=3):
        super().__init__()
        self.node_dim, self.steps = node_dim, steps
        self.inp = nn.Linear(node_dim, d_model)
        self.pos = PositionalEncoding(d_model, max_len=steps)
        enc = nn.TransformerEncoderLayer(d_model=d_model, nhead=nhead, batch_first=True)
        self.encoder = nn.TransformerEncoder(enc, num_layers=num_layers)
        self.out = nn.Linear(d_model, node_dim)
        self.to(device)
    def forward(self, x):
        if x.dim() == 1:               # allow (steps*N,)
            x = x.unsqueeze(0)         # -> (1, steps*N)
        B = x.shape[0]
        seq = x.view(B, self.steps, self.node_dim)   # (B, T, N)
        h = self.inp(seq)
        h = self.pos(h)
        h = self.encoder(h)
        out = self.out(h[:, -1, :])    # (B, N)
        return out[0] if out.shape[0] == 1 else out   # <-- return (N,) for unbatched               # next-step prediction


In [5]:
def train_NN_masked(model, input_X, target_Y, steps, p_node=0.15,
                    batch_size=64, train_set_proportion=0.8,
                    num_epochs=80, lr=5e-4, l2=5e-5):
    Xtr = torch.tensor(input_X[:int(train_set_proportion*len(input_X))], dtype=torch.float, device=device)
    ytr = torch.tensor(target_Y[:int(train_set_proportion*len(target_Y))], dtype=torch.float, device=device)
    Xte = torch.tensor(input_X[int(train_set_proportion*len(input_X)):], dtype=torch.float, device=device)
    yte = torch.tensor(target_Y[int(train_set_proportion*len(target_Y)):], dtype=torch.float, device=device)
    tr = torch.utils.data.DataLoader(torch.utils.data.TensorDataset(Xtr, ytr), batch_size, shuffle=True)
    te = torch.utils.data.DataLoader(torch.utils.data.TensorDataset(Xte, yte), batch_size, shuffle=False)

    loss = nn.MSELoss(); opt = torch.optim.Adam(model.parameters(), lr=lr, weight_decay=l2)
    train_hist, test_hist = [], []
    N = ytr.shape[1]

    for _ in range(num_epochs):
        model.train()
        for xb, yb in tr:
            # reshape, mask random ROIs across ~60% of timesteps
            B = xb.size(0)
            xv = xb.view(B, steps, N)
            mask = torch.zeros_like(xv, dtype=torch.bool)
            num_nodes = max(1, int(p_node*N))
            t_keep = max(1, int(0.4*steps))
            t_idx = torch.randint(0, steps, (B, t_keep), device=device)
            for b in range(B):
                n_idx = torch.randperm(N, device=device)[:num_nodes]
                mask[b, :, n_idx] = True
                mask[b, t_idx[b], :] = False  # keep some timesteps unmasked
            xc = xv.masked_fill(mask, 0.0).view(B, steps*N)

            pred = model(xc)
            l = loss(pred, yb)
            opt.zero_grad(); l.backward(); opt.step()

        # epoch eval
        model.eval(); tl=0; tn=0
        with torch.no_grad():
            for xb, yb in tr:
                tl += loss(model(xb), yb).item()*yb.size(0); tn += yb.size(0)
            train_hist.append(tl/tn); tl=0; tn=0
            for xb, yb in te:
                tl += loss(model(xb), yb).item()*yb.size(0); tn += yb.size(0)
            test_hist.append(tl/tn)
    return model, train_hist, test_hist


In [6]:
def eval_block(model, X, y, steps, true_signals=None, real_EC=None, SC=None):
    N = y.shape[1]
    out = {}
    model_fc = NPI.model_FC(model, node_num=N, steps=steps); out["model_fc"]=model_fc
    if true_signals is not None:
        emp_fc = NPI.corrcoef(true_signals)
        mask = ~np.eye(N, dtype=bool)
        out["r_fc"] = pearson_r(model_fc[mask], emp_fc[mask])
    ec  = NPI.model_EC(model, X, y, pert_strength=1.0); out["ec"]=ec
    jac = NPI.model_Jacobian(model, X, steps=steps);   out["jac"]=jac
    if real_EC is not None: out["r_ec_real"]=pearson_r(ec, real_EC); out["r_jac_real"]=pearson_r(jac, real_EC)
    if SC is not None:      out["r_ec_sc"]=pearson_r(ec, SC)
    return out


In [7]:
# --- baseline run config (no noise notebook) ---
SEED = 42

import random, numpy as np
try:
    import torch
except ImportError:
    torch = None

random.seed(SEED)
np.random.seed(SEED)
if torch:
    torch.manual_seed(SEED)
    torch.cuda.manual_seed_all(SEED)

In [8]:
def flat_without_diagonal(matrix):

    "Flatten the matrix without including the diagonal"
    
    n = matrix.shape[0]
    flattened = []
    for i in range(n):
        for j in list(range(i)) + list(range(i + 1, n)):
            flattened.append(matrix[i][j])
    
    return np.array(flattened)

### **NPI usage demo**

In [9]:
batch_size              = 50
train_set_proportion    = 0.8
ROI_num                 = 20
using_steps             = 3

In [None]:
# seeds = [0, 1, 42]
# for seed in seeds:
#     np.random.seed(seed)
#     torch.manual_seed(seed)
#     if torch.cuda.is_available():
#         torch.cuda.manual_seed_all(seed)


# # load the matching world for this seed (recommended)
# signals_clean = np.loadtxt(f'./RNN_simulation_data/sim_data/dynamics_020nodes_8000steps_{seed:02d}seed.txt')
# SC      = np.loadtxt(f'./RNN_simulation_data/sim_data/SC_020nodes_8000steps_{seed:02d}seed.txt')
# real_EC = np.loadtxt(f'./RNN_simulation_data/sim_data/real_EC_020nodes_8000steps_{seed:02d}seed.txt')

# signals = signals_clean.copy()      # clean reference
# N = signals_clean.shape[1]
# steps = 3

In [None]:
# # N, steps = 20, 3
# dummy = torch.randn(steps*N, device=device)  # 1-D
# print(ANN_LSTM(N, steps=steps)(dummy).shape)         # torch.Size([1, N])
# print(ANN_Transformer(N, steps=steps)(dummy).shape)  # torch.Size([1, N])


torch.Size([20])
torch.Size([20])


In [None]:

seeds = [0, 1, 42]

Path("metrics").mkdir(parents=True, exist_ok=True)
results_csv = "metrics/uneven_snr_5_10_mlp_lstm_mask.csv"

for seed in seeds:
    # set RNGs
    np.random.seed(seed)
    torch.manual_seed(seed)
    if torch.cuda.is_available():
        torch.cuda.manual_seed_all(seed)

    # --- load matching world for THIS seed ---
    signals_clean = np.loadtxt(f'./RNN_simulation_data/sim_data/dynamics_020nodes_8000steps_{seed:02d}seed.txt')
    SC      = np.loadtxt(f'./RNN_simulation_data/sim_data/SC_020nodes_8000steps_{seed:02d}seed.txt')
    real_EC = np.loadtxt(f'./RNN_simulation_data/sim_data/real_EC_020nodes_8000steps_{seed:02d}seed.txt')

    N = signals_clean.shape[1]
    steps = 3

    # ---- run experiments for both SNR centers for THIS seed ----
    for center in [5, 10]:
        snr_vec = make_snr_vector_uneven(N, center=center, sigma=0.4, seed=seed)
        noisy = add_noise_by_snr(signals_clean, snr_vec)
        X, y = NPI.multi2one(noisy, steps)

        # MLP (no mask)
        mlp = NPI.ANN_MLP(input_dim=steps*N, hidden_dim=int(2.5*N), latent_dim=int(0.8*N), output_dim=N)
        mlp, tr, te = NPI.train_NN(mlp, X, y, num_epochs=80, lr=1e-3, l2=5e-5)
        out = eval_block(mlp, X, y, steps, true_signals=signals_clean, real_EC=real_EC, SC=SC)
        log_row(results_csv, {"seed": seed,"model":"MLP","mask":"no","SNR_center":center,
                              "train":tr[-1],"test":te[-1],
                              "r_fc":out.get("r_fc",""),"r_ec_real":out.get("r_ec_real",""),
                              "r_jac_real":out.get("r_jac_real",""),"r_ec_sc":out.get("r_ec_sc","")})

        # MLP (mask)
        mlp_m = NPI.ANN_MLP(input_dim=steps*N, hidden_dim=int(2.5*N), latent_dim=int(0.8*N), output_dim=N)
        mlp_m, tr, te = train_NN_masked(mlp_m, X, y, steps, p_node=0.15, num_epochs=80, lr=1e-3, l2=5e-5)
        out = eval_block(mlp_m, X, y, steps, true_signals=signals_clean, real_EC=real_EC, SC=SC)
        log_row(results_csv, {"seed": seed,"model":"MLP","mask":"yes","SNR_center":center,
                              "train":tr[-1],"test":te[-1],
                              "r_fc":out.get("r_fc",""),"r_ec_real":out.get("r_ec_real",""),
                              "r_jac_real":out.get("r_jac_real",""),"r_ec_sc":out.get("r_ec_sc","")})

        # LSTM (no mask)
        lstm = ANN_LSTM(node_dim=N, hidden=128, num_layers=2, steps=steps)
        lstm, tr, te = NPI.train_NN(lstm, X, y, num_epochs=80, lr=5e-4, l2=5e-5)
        out = eval_block(lstm, X, y, steps, true_signals=signals_clean, real_EC=real_EC, SC=SC)
        log_row(results_csv, {"seed": seed,"model":"LSTM","mask":"no","SNR_center":center,
                              "train":tr[-1],"test":te[-1],
                              "r_fc":out.get("r_fc",""),"r_ec_real":out.get("r_ec_real",""),
                              "r_jac_real":out.get("r_jac_real",""),"r_ec_sc":out.get("r_ec_sc","")})

        # LSTM (mask)
        lstm_m = ANN_LSTM(node_dim=N, hidden=128, num_layers=2, steps=steps)
        lstm_m, tr, te = train_NN_masked(lstm_m, X, y, steps, p_node=0.15, num_epochs=80, lr=5e-4, l2=5e-5)
        out = eval_block(lstm_m, X, y, steps, true_signals=signals_clean, real_EC=real_EC, SC=SC)
        log_row(results_csv, {"seed": seed,"model":"LSTM","mask":"yes","SNR_center":center,
                              "train":tr[-1],"test":te[-1],
                              "r_fc":out.get("r_fc",""),"r_ec_real":out.get("r_ec_real",""),
                              "r_jac_real":out.get("r_jac_real",""),"r_ec_sc":out.get("r_ec_sc","")})

        # Transformer (no mask)
        tfm = ANN_Transformer(node_dim=N, d_model=128, nhead=8, num_layers=3, steps=steps)
        tfm, tr, te = NPI.train_NN(tfm, X, y, num_epochs=80, lr=5e-4, l2=5e-5)
        out = eval_block(tfm, X, y, steps, true_signals=signals_clean, real_EC=real_EC, SC=SC)
        log_row(results_csv, {"seed": seed,"model":"Transformer","mask":"no","SNR_center":center,
                              "train":tr[-1],"test":te[-1],
                              "r_fc":out.get("r_fc",""),"r_ec_real":out.get("r_ec_real",""),
                              "r_jac_real":out.get("r_jac_real",""),"r_ec_sc":out.get("r_ec_sc","")})

        # Transformer (mask)
        tfm_m = ANN_Transformer(node_dim=N, d_model=128, nhead=8, num_layers=3, steps=steps)
        tfm_m, tr, te = train_NN_masked(tfm_m, X, y, steps, p_node=0.15, num_epochs=80, lr=5e-4, l2=5e-5)
        out = eval_block(tfm_m, X, y, steps, true_signals=signals_clean, real_EC=real_EC, SC=SC)
        log_row(results_csv, {"seed": seed,"model":"Transformer","mask":"yes","SNR_center":center,
                              "train":tr[-1],"test":te[-1],
                              "r_fc":out.get("r_fc",""),"r_ec_real":out.get("r_ec_real",""),
                              "r_jac_real":out.get("r_jac_real",""),"r_ec_sc":out.get("r_ec_sc","")})
# end for seed



KeyboardInterrupt: 

In [None]:
# ====== CONTROL FLAGS ======
RUN_OLD_DEMO = False          # <- set to True if you want to see the old single-seed demo again
RUN_GROUP_BASELINE = False    # <- set to True if you want the 50-seed baseline loop
# ===========================


##### train NN

Several ANNs (MLP, CNN, RNN, VAR) are provided in the NPI framework, which can be used as a surrogate brain.

In [None]:
inputs, targets = NPI.multi2one(signals, steps = using_steps)

ANN = NPI.ANN_MLP(input_dim = using_steps * ROI_num, hidden_dim = 2 * ROI_num, latent_dim = int(0.8 * ROI_num), output_dim = ROI_num)
# ANN = NPI.ANN_CNN(in_channels = ROI_num, hidden_channels = 3 * ROI_num, out_channels = ROI_num, data_length = using_steps)
# ANN = NPI.ANN_RNN(input_dim = ROI_num, hidden_dim = int(2.5 * ROI_num), latent_dim = int(2.5 * ROI_num), output_dim = ROI_num, data_length = using_steps)
# ANN = NPI.ANN_VAR(input_dim = using_steps * ROI_num, output_dim = ROI_num)

ANN, training_loss, testing_loss = NPI.train_NN(ANN, inputs, targets, batch_size, train_set_proportion, num_epochs = 80, lr = 2e-4, l2 = 5e-5)

In [None]:
plt.plot(training_loss, label = 'Training loss')
plt.plot(testing_loss, label = 'Testing loss')
plt.legend(loc = 'upper right')
plt.show()

##### calculate empirical FC & model FC

In [None]:
empirical_FC = NPI.corrcoef(signals)
model_FC = NPI.model_FC(ANN, node_num = ROI_num, steps = using_steps)

##### calculate NPI Inferred EC

In [None]:
NPI_EC = NPI.model_EC(ANN, inputs, targets, pert_strength = 1.0)
np.fill_diagonal(NPI_EC, 0)

##### calculate model Jacobian

In [None]:
NPI_Jacobian = NPI.model_Jacobian(ANN, inputs, steps = using_steps)
np.fill_diagonal(NPI_Jacobian, 0)

##### empirical FC - model FC comparison

In [None]:
fig, (ax1, ax2) = plt.subplots(1, 2, figsize = (12, 6))
sns.heatmap(empirical_FC, ax = ax1, vmin = -0.5, vmax = 0.5, cmap = 'RdBu_r', cbar = False, square = True, xticklabels = False, yticklabels = False)
sns.heatmap(model_FC, ax = ax2, vmin = -1.0, vmax = 1.0, cmap = 'RdBu_r', cbar = False, square = True, xticklabels = False, yticklabels = False)
ax1.set_title('Empirical FC'); ax2.set_title('Model FC')
plt.show()

In [None]:
r_value, p_value = pearsonr(flat_without_diagonal(model_FC), flat_without_diagonal(empirical_FC))

plt.figure(figsize = (4.8, 4.8))
plt.scatter(flat_without_diagonal(model_FC), flat_without_diagonal(empirical_FC))

plt.xlim(-1.05, 1.05); plt.xticks([-1.0, 0.0, 1.0]); plt.xlabel('model FC')
plt.ylim(-0.55, 0.55); plt.yticks([-0.5, 0.0, 0.5]); plt.ylabel('empirical FC')
plt.text(-0.95, 0.45, 'r = {:.2f}, p = {:.0e}'.format(r_value, p_value))

ax = plt.gca()
ax.spines['right'].set_visible(False); ax.spines['top'].set_visible(False)

plt.show()

##### real EC - NPI EC comparison

In [None]:
fig, (ax1, ax2) = plt.subplots(1, 2, figsize = (12, 6))
sns.heatmap(real_EC, ax = ax1, vmin = -0.2, vmax = 0.2, cmap = 'RdBu_r', cbar = False, square = True, xticklabels = False, yticklabels = False)
sns.heatmap(NPI_EC, ax = ax2, vmin = -0.2, vmax = 0.2, cmap = 'RdBu_r', cbar = False, square = True, xticklabels = False, yticklabels = False)
ax1.set_title('Real EC'); ax2.set_title('NPI Inferred EC')
plt.show()

In [None]:
r_value, p_value = pearsonr(flat_without_diagonal(NPI_EC), flat_without_diagonal(real_EC))

plt.figure(figsize = (4.8, 4.8))
plt.scatter(flat_without_diagonal(NPI_EC), flat_without_diagonal(real_EC))

plt.xlim(-0.25, 0.25); plt.xticks([-0.2, 0.0, 0.2]); plt.xlabel('NPI Inferred EC')
plt.ylim(-0.25, 0.25); plt.yticks([-0.2, 0.0, 0.2]); plt.ylabel('Real EC')
plt.text(-0.2, 0.2, 'r = {:.2f}, p = {:.0e}'.format(r_value, p_value))

ax = plt.gca()
ax.spines['right'].set_visible(False); ax.spines['top'].set_visible(False)

plt.show()

##### real EC - model Jacobian comparison

In [None]:
fig, (ax1, ax2) = plt.subplots(1, 2, figsize = (12, 6))
sns.heatmap(real_EC, ax = ax1, vmin = -0.2, vmax = 0.2, cmap = 'RdBu_r', cbar = False, square = True, xticklabels = False, yticklabels = False)
sns.heatmap(NPI_Jacobian, ax = ax2, vmin = -0.2, vmax = 0.2, cmap = 'RdBu_r', cbar = False, square = True, xticklabels = False, yticklabels = False)
ax1.set_title('Real EC'); ax2.set_title('Model Jacobian')
plt.show()

In [None]:
r_value, p_value = pearsonr(flat_without_diagonal(NPI_Jacobian), flat_without_diagonal(real_EC))

plt.figure(figsize = (4.8, 4.8))
plt.scatter(flat_without_diagonal(NPI_Jacobian), flat_without_diagonal(real_EC))

plt.xlim(-0.25, 0.25); plt.xticks([-0.2, 0.0, 0.2]); plt.xlabel('Model Jacobian')
plt.ylim(-0.25, 0.25); plt.yticks([-0.2, 0.0, 0.2]); plt.ylabel('Real EC')
plt.text(-0.2, 0.2, 'r = {:.2f}, p = {:.0e}'.format(r_value, p_value))

ax = plt.gca()
ax.spines['right'].set_visible(False); ax.spines['top'].set_visible(False)

plt.show()

##### SC - NPI EC comparison

In [None]:
fig, (ax1, ax2) = plt.subplots(1, 2, figsize = (12, 6))
sns.heatmap(SC, ax = ax1, vmin = -0.7, vmax = 0.7, cmap = 'RdBu_r', cbar = False, square = True, xticklabels = False, yticklabels = False)
sns.heatmap(NPI_EC, ax = ax2, vmin = -0.2, vmax = 0.2, cmap = 'RdBu_r', cbar = False, square = True, xticklabels = False, yticklabels = False)
ax1.set_title('SC'); ax2.set_title('NPI Inferred EC')
plt.show()

In [None]:
r_value, p_value = pearsonr(flat_without_diagonal(NPI_EC), flat_without_diagonal(SC))

plt.figure(figsize = (4.8, 4.8))
plt.scatter(flat_without_diagonal(NPI_EC), flat_without_diagonal(SC))

plt.xlim(-0.25, 0.25); plt.xticks([-0.2, 0.0, 0.2]); plt.xlabel('NPI Inferred EC')
plt.ylim(-0.75, 0.75); plt.yticks([-0.7, 0.0, 0.7]); plt.ylabel('SC')
plt.text(-0.2, 0.6, 'r = {:.2f}, p = {:.0e}'.format(r_value, p_value))

ax = plt.gca()
ax.spines['right'].set_visible(False); ax.spines['top'].set_visible(False)

plt.show()

##### Group-level analysis

In [None]:
NPIFC_empiricalFC_corr = []
NPIEC_realEC_corr = []
NPIEC_SC_corr = []

for i in range(50):

    signals = np.loadtxt('./RNN_simulation_data/sim_data/dynamics_020nodes_8000steps_{:02d}seed.txt'.format(i))
    SC      = np.loadtxt('./RNN_simulation_data/sim_data/SC_020nodes_8000steps_{:02d}seed.txt'.format(i))
    real_EC = np.loadtxt('./RNN_simulation_data/sim_data/real_EC_020nodes_8000steps_{:02d}seed.txt'.format(i))

    inputs, targets = NPI.multi2one(signals, steps = 3)
    ANN = NPI.ANN_MLP(input_dim = using_steps * ROI_num, hidden_dim = 2 * ROI_num, latent_dim = int(0.8 * ROI_num), output_dim = ROI_num)
    ANN, training_loss, testing_loss = NPI.train_NN(ANN, inputs, targets, num_epochs = 80, lr = 2e-4, l2 = 5e-5)

    empirical_FC = NPI.corrcoef(signals)
    model_FC = NPI.model_FC(ANN, node_num = ROI_num, steps = 3)
    NPIFC_empiricalFC_corr.append(pearsonr(flat_without_diagonal(model_FC), flat_without_diagonal(empirical_FC))[0])

    NPI_EC = NPI.model_EC(ANN, inputs, targets, pert_strength = 1.0)
    np.fill_diagonal(NPI_EC, 0)
    NPIEC_realEC_corr.append(pearsonr(flat_without_diagonal(NPI_EC), flat_without_diagonal(real_EC))[0])
    NPIEC_SC_corr.append(pearsonr(flat_without_diagonal(NPI_EC), flat_without_diagonal(SC))[0])

In [None]:
categories = ['NPI FC -\n real FC', 'NPI EC -\n real EC', 'NPI EC -\n SC']
values = [np.mean(NPIFC_empiricalFC_corr), np.mean(NPIEC_realEC_corr), np.mean(NPIEC_SC_corr)]
stds = [np.std(NPIFC_empiricalFC_corr), np.std(NPIEC_realEC_corr), np.std(NPIEC_SC_corr)]

plt.figure(figsize = (3, 3))
plt.bar(categories, values, yerr = stds, capsize = 5)
plt.ylabel('Pearson Correlation')
plt.ylim(0, 1)

ax = plt.gca()
ax.spines['right'].set_visible(False); ax.spines['top'].set_visible(False)

plt.show()

In [None]:
import os, numpy as np, pandas as pd

def pick(*names):
    g = globals()
    for n in names:
        if n in g:
            return g[n]
    raise NameError(f"Could not find any of: {names}")

def upper_tri(m):
    m = np.asarray(m)
    i = np.triu_indices_from(m, k=1)
    return m[i]

def fc_corr_from_any(emp_fc, mdl_fc):
    a = upper_tri(emp_fc)
    b = upper_tri(mdl_fc)
    return float(np.corrcoef(a, b)[0, 1])

# Try common names used in FC comparison cells
emp_fc = pick("empirical_FC", "empirical_fc", "FC_empirical", "FC_emp")
mdl_fc = pick("model_FC", "model_fc", "FC_model", "fc_model")

fc_corr = fc_corr_from_any(emp_fc, mdl_fc)

row = {
    "demo": "RNN",          # change to "WBM" if you're in that notebook
    "snr": "baseline",
    "mode": "none",
    "seed": SEED,
    "metric": "fc_corr",
    "value": fc_corr,
}
csv_path = "results_snr_sweep.csv"
pd.DataFrame([row]).to_csv(csv_path, mode="a", header=not os.path.exists(csv_path), index=False)
print("Appended to", csv_path, row)


In [None]:
# === Plots for inference from uneven-SNR experiments ===
import pandas as pd
import matplotlib.pyplot as plt
from pathlib import Path

df = pd.read_csv("metrics/uneven_snr_5_10_mlp_lstm_mask.csv")
df = df.dropna()

# Canonical order for models/masks
order = [("MLP","no"),("MLP","yes"),
         ("LSTM","no"),("LSTM","yes"),
         ("Transformer","no"),("Transformer","yes")]

metrics = ["r_fc","r_ec_real","r_jac_real","r_ec_sc"]

# Save dir
plot_dir = Path("metrics/graphs"); plot_dir.mkdir(parents=True, exist_ok=True)

for snr in sorted(df["SNR_center"].unique()):
    sub = df[df["SNR_center"]==snr]
    for metric in metrics:
        if metric not in sub.columns: continue
        plt.figure(figsize=(8,4))
        means = []
        stds  = []
        labels = []
        for model,mask in order:
            vals = sub[(sub["model"]==model)&(sub["mask"]==mask)][metric].dropna()
            if len(vals)==0: continue
            means.append(vals.mean()); stds.append(vals.std(ddof=1)); labels.append(f"{model}|{mask}")
        x = range(len(means))
        plt.bar(x, means, yerr=stds, capsize=4)
        plt.xticks(x, labels, rotation=30, ha="right")
        plt.ylabel(metric)
        plt.title(f"{metric} @ SNR {snr} (mean ± std over seeds)")
        plt.tight_layout()
        outpath = plot_dir / f"{metric}_SNR{snr}.png"
        plt.savefig(outpath, dpi=200)
        plt.show()
        print("Saved:", outpath)


In [None]:
print("✅ All experiments complete. Plots saved in metrics/graphs/")
