In [1]:
# Importacion de bivliotecas 
import os
import json
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import networkx as nx
from scipy.signal import coherence, butter, filtfilt, hilbert
from itertools import combinations
from networkx.algorithms import community
import warnings
warnings.filterwarnings("ignore")


In [2]:
# Garantiza que los resultados aleatorios (por ejemplo el grafico spring_layout) sean reproducibles.
RANDOM_SEED = 42
np.random.seed(RANDOM_SEED)


In [3]:
# Definición de bandas de frecuencia EEG
BANDS = {
    "delta": (1, 4),
    "theta": (4, 8),
    "alpha": (8, 13),
    "beta": (13, 30),
    "gamma": (30, 45)
}

In [4]:
# Crear carpeta si no existe
def ensure_dir(path):
    os.makedirs(path, exist_ok=True)


In [5]:
# Función para cargar el CSV automáticamente
def load_csv_detect(data_path):
    df = pd.read_csv(data_path, index_col=None)

    # Caso 1: filas = canales
    if df.shape[0] < df.shape[1]:
        data = df.to_numpy(dtype=float)
        ch_names = list(df.index.astype(str))

        if df.dtypes[0] == object:
            try:
                names = df.iloc[:,0].astype(str).values
                data = df.iloc[:,1:].to_numpy(dtype=float)
                ch_names = list(names)
            except:
                ch_names = [f"Ch{i}" for i in range(data.shape[0])]
    else:
        # Caso 2: filas = muestras → transponer
        if all(np.issubdtype(dt, np.number) for dt in df.dtypes):
            data = df.to_numpy(dtype=float).T
            ch_names = list(df.columns.astype(str))
        else:
            data = df.select_dtypes(include=[np.number]).to_numpy(dtype=float).T
            ch_names = list(df.select_dtypes(include=[np.number]).columns.astype(str))

    return {"data": data, "ch_names": ch_names}



In [6]:
# Filtro 
def load_csv_detect(data_path):
    df = pd.read_csv(data_path, index_col=None)

    # Caso 1: filas = canales
    if df.shape[0] < df.shape[1]:
        data = df.to_numpy(dtype=float)
        ch_names = list(df.index.astype(str))

        if df.dtypes[0] == object:
            try:
                names = df.iloc[:,0].astype(str).values
                data = df.iloc[:,1:].to_numpy(dtype=float)
                ch_names = list(names)
            except:
                ch_names = [f"Ch{i}" for i in range(data.shape[0])]
    else:
        # Caso 2: filas = muestras → transponer
        if all(np.issubdtype(dt, np.number) for dt in df.dtypes):
            data = df.to_numpy(dtype=float).T
            ch_names = list(df.columns.astype(str))
        else:
            data = df.select_dtypes(include=[np.number]).to_numpy(dtype=float).T
            ch_names = list(df.select_dtypes(include=[np.number]).columns.astype(str))

    return {"data": data, "ch_names": ch_names}



In [7]:
#Matriz de coherencia

def compute_coherence_matrix(data, fs, fmin, fmax, nperseg=None):
    n_channels = data.shape[0]
    nperseg = nperseg or int(fs*2)

    coh = np.zeros((n_channels, n_channels))

    for i in range(n_channels):
        for j in range(i, n_channels):
            f, Cxy = coherence(data[i], data[j], fs=fs, nperseg=nperseg)
            mask = (f >= fmin) & (f <= fmax)
            coh[i, j] = np.mean(Cxy[mask]) if mask.any() else np.mean(Cxy)
            coh[j, i] = coh[i, j]

    np.fill_diagonal(coh, 0)
    return coh


In [8]:
#Matriz PLV
def compute_plv_matrix(data):
    analytic = hilbert(data, axis=1)
    phases = np.angle(analytic)
    n_channels = phases.shape[0]

    plv = np.zeros((n_channels, n_channels))

    for i in range(n_channels):
        for j in range(i+1, n_channels):
            phase_diff = phases[i] - phases[j]
            val = np.abs(np.mean(np.exp(1j*phase_diff)))
            plv[i,j] = val
            plv[j,i] = val

    np.fill_diagonal(plv, 0)
    return plv


In [9]:
#Umbral (threshold)

def threshold_matrix(mat, percentile=75):
    vals = mat.flatten()
    vals = vals[~np.isnan(vals)]
    th = np.percentile(vals, percentile)

    mat_bin = (mat >= th).astype(int)
    mat_thresh = mat.copy()
    mat_thresh[mat < th] = 0

    return mat_thresh, mat_bin, th


In [10]:
#Métricas del grafo
def graph_metrics_from_adj(adj):
    G = nx.from_numpy_array(adj)

    deg = dict(G.degree(weight='weight'))
    betw = nx.betweenness_centrality(G)
    clust = nx.clustering(G, weight='weight') if len(G) > 0 else {}

    try:
        comms = list(community.greedy_modularity_communities(G))
        modularity = nx.community.modularity(G, comms)
    except:
        comms, modularity = [], None

    return {
        "graph": G,
        "degree": deg,
        "betweenness": betw,
        "clustering": clust,
        "communities": comms,
        "modularity": modularity
    }


In [11]:
#Graficar heatmap
def plot_heatmap(mat, ch_names, title, outpath):
    plt.figure(figsize=(6,5))
    df = pd.DataFrame(mat, index=ch_names, columns=ch_names)
    plt.imshow(df, aspect='equal')
    plt.colorbar()
    plt.title(title)
    plt.tight_layout()
    plt.savefig(outpath)
    plt.close()


In [12]:
#Graficar grafo en 2D
def plot_graph_2d(G, ch_names, outpath, node_size_map=None):
    plt.figure(figsize=(6,6))
    pos = nx.spring_layout(G, seed=RANDOM_SEED)

    node_sizes = [node_size_map.get(i, 100) for i in range(len(G.nodes()))] if node_size_map else 100

    nx.draw(G, pos, with_labels=True, labels={i: ch_names[i] for i in range(len(ch_names))},
            node_size=node_sizes)

    plt.tight_layout()
    plt.savefig(outpath)
    plt.close()


In [13]:
#Ejecuta todo el proceso de análisis EEG de inicio a fin.
def run_pipeline_jupyter(
    data_path,
    fs=256,
    method="coherence",
    band="alpha",
    outdir="./outputs_jupyter",
    threshold_percentile=75
):

    print("Cargando archivo…")
    info = load_csv_detect(data_path)

    data_arr = info["data"]
    ch_names = info["ch_names"]

    ensure_dir(outdir)

    print("Aplicando filtro banda…")
    fmin, fmax = BANDS[band]
    data_bp = bandpass(data_arr, fs, fmin, fmax)

    print("Calculando conectividad…")
    if method == "coherence":
        conn = compute_coherence_matrix(data_bp, fs, fmin, fmax)
    else:
        conn = compute_plv_matrix(data_bp)

    plot_heatmap(conn, ch_names, f"Conectividad {method}", f"{outdir}/heatmap_continuous.png")

    print("Aplicando threshold…")
    conn_thresh, conn_bin, th = threshold_matrix(conn, percentile=threshold_percentile)

    plot_heatmap(conn_thresh, ch_names, "Thresholded", f"{outdir}/heatmap_thresholded.png")

    print("Calculando métricas…")
    metrics = graph_metrics_from_adj(conn_thresh)

    degree = metrics["degree"]
    node_size_map = {i: (degree[i]+1)*80 for i in degree}

    plot_graph_2d(metrics["graph"], ch_names, f"{outdir}/graph2d.png", node_size_map=node_size_map)

    pd.DataFrame(conn_thresh, index=ch_names, columns=ch_names).to_csv(f"{outdir}/adjacency_thresholded.csv")
    pd.DataFrame(conn, index=ch_names, columns=ch_names).to_csv(f"{outdir}/adjacency_continuous.csv")

    with open(f"{outdir}/metrics.json", "w") as f:
        json.dump(metrics, f, indent=2)

    print("Pipeline completado. Resultados guardados en:", outdir)

    return metrics
