In [1]:
## ==============================================================
## Neurodevelopmental Brain–Heart Dynamics Pipeline (Simplified)
## --------------------------------------------------------------
## This streamlined version preserves all computational logic while
## removing redundancy and improving readability.
##
## 1. Load EEG–ECG correlations, HEP features, and directional BHI indices.
## 2. Merge and normalize all multimodal features.
## 3. Run SparsePCA to extract latent components.
## 4. Construct developmental divergence/convergence models.
## 5. Compute similarity matrices, detect outliers, and visualize.
## 6. Perform Mantel correlation analyses (age–similarity coupling).
##
## ==============================================================

In [None]:
# ==============================================================
# === Imports ===
# ==============================================================
import os
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
from scipy.io import loadmat
from scipy.stats import pearsonr
from sklearn.preprocessing import StandardScaler
from sklearn.impute import SimpleImputer
from sklearn.decomposition import SparsePCA
from skbio.stats.distance import mantel


# ==============================================================
# === Utility Functions ===
# ==============================================================

def normalize_matrix(mtx):
    """Scale matrix to [0,1], preserving zeros on diagonal."""
    mask = ~np.eye(len(mtx), dtype=bool)
    mmin, mmax = mtx[mask].min(), mtx[mask].max()
    mtx = (mtx - mmin) / (mmax - mmin)
    np.fill_diagonal(mtx, 0)
    return mtx

def compute_similarity(features):
    """Compute pairwise similarity (1 - normalized Euclidean distance)."""
    n = len(features)
    dist = np.linalg.norm(features[:, None] - features, axis=2)
    return 1 - normalize_matrix(dist)

def remove_outliers(sim_mtx, threshold_sigma=2):
    """Remove subjects with high mean dissimilarity (>mean+2σ)."""
    row_means = sim_mtx.sum(1) / (len(sim_mtx) - 1)
    thr = row_means.mean() + threshold_sigma * row_means.std()
    out_idx = np.where(row_means > thr)[0]
    clean = np.delete(sim_mtx, out_idx, axis=(0, 1))
    return clean, out_idx

def plot_heatmap(mtx, title, age_positions=None, age_labels=None, vmin=0.3, vmax=0.9):
    """Standardized similarity heatmap."""
    plt.figure(figsize=(8, 8))
    sns.heatmap(mtx, cmap='RdYlBu_r', square=True, vmin=vmin, vmax=vmax,
                cbar_kws={'label': 'Similarity', 'shrink': 0.35},
                xticklabels=[], yticklabels=[])
    plt.title(title, fontsize=13)
    if age_positions and age_labels:
        plt.xticks(age_positions, age_labels)
        plt.yticks(age_positions, age_labels)
    plt.tick_params(axis='both', direction='out', length=8, width=1.2)
    plt.tight_layout()
    plt.show()

def run_sparse_pca(df, alpha=1, n_components=5, plot=True):
    """Run SparsePCA and return components and transformed scores."""
    X = SimpleImputer(strategy='mean').fit_transform(df)
    X = StandardScaler().fit_transform(X)
    spca = SparsePCA(n_components=n_components, alpha=alpha, random_state=0)
    X_trans = spca.fit_transform(X)

    if plot:
        var = np.var(X_trans, 0)
        ratio = var / np.var(X, 0).sum()
        cum = np.cumsum(ratio)
        plt.plot(range(1, n_components+1), ratio, 'o-', label='Explained')
        plt.plot(range(1, n_components+1), cum, 's--', label='Cumulative')
        plt.axhline(0.8, color='r', ls='--')
        plt.xlabel("Component"); plt.ylabel("Variance Ratio")
        plt.title(f"SparsePCA Scree Plot (α={alpha})")
        plt.legend(); plt.tight_layout(); plt.show()
    return spca, X_trans


# ==============================================================
# === Divergence / Convergence Matrices ===
# ==============================================================

def build_age_matrices(ages):
    """Return normalized divergence & convergence matrices."""
    n = len(ages)
    avg = (ages[:, None] + ages[None, :]) / 2
    div = ages.max() - avg
    conv = avg
    return normalize_matrix(div), normalize_matrix(conv)


# ==============================================================
# === Mantel Analysis Helper ===
# ==============================================================

def mantel_compare(sim, div, conv, label):
    """Run Mantel tests for divergence/convergence associations."""
    r_div, p_div, _ = mantel(sim, div, method='spearman', permutations=5000)
    r_con, p_con, _ = mantel(sim, conv, method='spearman', permutations=5000)
    print(f"{label} — Divergence: r={r_div:.3f}, p={p_div:.4f} | Convergence: r={r_con:.3f}, p={p_con:.4f}")
    return (r_div, p_div), (r_con, p_con)