In [2]:
import os
import pandas as pd
import numpy as np
import networkx as nx
from scipy.signal import hilbert

In [3]:
bands = {
    'delta': (1, 4),
    'theta': (4, 8),
    'alpha': (8, 13),
    'beta': (13, 30),
    'gamma': (30, 50)
}

Bandpass filter the data

In [4]:
from scipy.signal import butter, filtfilt

def bandpass_filter(data, band, fs = 256, order=4, freq_bands = bands):
    nyq = 0.5 * fs
    band = freq_bands[band]
    lowcut = band[0]
    highcut = band[1]
    low = lowcut / nyq
    high = highcut / nyq
    b, a = butter(order, [low, high], btype='band')
    filtered_data = np.zeros_like(data)

    # Loop through trials and channels
    for trial in range(data.shape[0]):
        for ch in range(data.shape[1]):
            filtered_data[trial, ch, :] = filtfilt(b, a, data[trial, ch, :])
    
    return filtered_data

Getting Data 

In [5]:
csv_1, csv_3, csv_5 = [], [], []

In [None]:
csvs_1, csvs_3, csvs_5 = [], [], []

root_dir = "/Users/anusha/Desktop/all_data"

for folder in os.listdir(root_dir):
    top_folder_path = os.path.join(root_dir, folder)

    if not os.path.isdir(top_folder_path):
        continue

    # Sort and filter only subfolders (ignore files like .DS_Store)
    subfolders = sorted([
        sf for sf in os.listdir(top_folder_path)
        if os.path.isdir(os.path.join(top_folder_path, sf))
    ])

    for i, idx in enumerate([0, 2, 4]):  # 1st, 3rd, and 5th subfolders
        try:
            subfolder_path = os.path.join(top_folder_path, subfolders[idx])
            for file in os.listdir(subfolder_path):
                if file.endswith('.csv'):
                    csv_path = os.path.join(subfolder_path, file)
                    if i == 0:
                        csvs_1.append(csv_path)
                    elif i == 1:
                        csvs_3.append(csv_path)
                    elif i == 2:
                        csvs_5.append(csv_path)
        except IndexError:
            print(f"⚠️  {top_folder_path} has fewer than 5 subfolders.")

CSV to Numpy

In [7]:
import numpy as np

def csv_to_np(path):
    data = np.genfromtxt(path, delimiter = '\t', invalid_raise=False)
    num_cols = data[0,:].shape[0]
    filtered_data = np.array([row for row in data if len(row) == num_cols])
    eeg_data = filtered_data[:, 0:17]
    reset_indices = np.where(eeg_data[:,0] == 0)[0]
    subarrays = [eeg_data[start:start + 128, 1:] for start in reset_indices]
    target_shape = subarrays[0].shape
    for i in range(len(subarrays)):
        if subarrays[i].shape != target_shape:
            del subarrays[i]
    result_3d = np.array(subarrays)
    result = np.transpose(result_3d, (0, 2, 1))
    return result

In [8]:
def get_data(paths):
    d = []

    for csv in paths:
        data = csv_to_np(csv)
        d.append(data)

    return d
        

In [9]:
resting = get_data(csvs_1)
middle = get_data(csvs_3)
fatigue = get_data(csvs_5)

Filtering data

In [10]:
resting_delta = []
for sample in resting:
    resting_delta.append(bandpass_filter(sample, "delta"))

resting_alpha = []
for sample in resting:
    resting_alpha.append(bandpass_filter(sample, "alpha"))

resting_theta = []
for sample in resting:
    resting_theta.append(bandpass_filter(sample, "theta"))

resting_beta = []
for sample in resting:
    resting_beta.append(bandpass_filter(sample, "beta"))

In [11]:
fatigue_delta = []
for sample in fatigue:
    fatigue_delta.append(bandpass_filter(sample, "delta"))

fatigue_beta = []
for sample in fatigue:
    fatigue_beta.append(bandpass_filter(sample, "beta"))

fatigue_alpha = []
for sample in fatigue:
    fatigue_alpha.append(bandpass_filter(sample, "alpha"))

fatigue_theta = []
for sample in fatigue:
    fatigue_theta.append(bandpass_filter(sample, "theta"))

Coherence

In [12]:
import numpy as np

def coherence(signal, FS):
    """
    signal: (channels, timesteps), averaged across trials
    FS: sampling rate
    Returns: (channels x channels) phase coherence matrix over full band
    """
    signal = np.mean(signal, axis = 0)
    n_channels, n_samples = signal.shape

    # FFT and normalize by magnitude
    spect = np.fft.fft(signal, axis=1)
    spect = spect / np.abs(spect)

    # Take only positive frequencies (avoid redundancy)
    spect = spect[:, :n_samples // 2]
    angles = np.angle(spect)

    # Compute mean phase difference for each pair
    pc_matrix = np.zeros((n_channels, n_channels))

    for i in range(n_channels):
        for j in range(i + 1, n_channels):
            phase_diff = angles[i] - angles[j]
            # Inter-trial phase coherence = length of average vector
            pc = np.abs(np.mean(np.exp(1j * phase_diff)))
            pc_matrix[i, j] = pc_matrix[j, i] = pc

    return pc_matrix


Phase Lag Index

In [13]:
def phase_lag_index(data, FS = 256):
    data = np.mean(data, axis = 0)
    n_channels, n_samples = data.shape
    pli_matrix = np.zeros((n_channels, n_channels))

    # Get analytic signal via Hilbert transform
    analytic_signal = hilbert(data, axis=1)
    phase_data = np.angle(analytic_signal)

    for i in range(n_channels):
        for j in range(i + 1, n_channels):
            phase_diff = phase_data[i] - phase_data[j]
            pli = np.abs(np.mean(np.sign(np.sin(phase_diff))))
            pli_matrix[i, j] = pli_matrix[j, i] = pli

    return pli_matrix

Phase Locking Value

In [14]:
import numpy as np
from scipy.signal import hilbert

def phase_locking_value(data):

    n_channels, n_times = data.shape
    plv_matrix = np.zeros((n_channels, n_channels))

    # Apply Hilbert transform to get analytic signal (complex-valued)
    analytic_signal = hilbert(data, axis=1)
    phases = np.angle(analytic_signal)

    for i in range(n_channels):
        for j in range(i + 1, n_channels):
            phase_diff = phases[i] - phases[j]
            plv = np.abs(np.sum(np.exp(1j * phase_diff)) / n_times)
            plv_matrix[i, j] = plv_matrix[j, i] = plv

    np.fill_diagonal(plv_matrix, 1)  # Diagonal = 1 (PLV with itself)
    return plv_matrix

Matrix to Graph

In [15]:
def matrix_to_graph(matrix):
    n_channels = matrix.shape[0]
    G = nx.Graph()

    for i in range(n_channels):
        for j in range(i + 1, n_channels):
            weight = matrix[i, j]
            G.add_edge(i, j, weight=weight)

    return G

Node Strengths - Coherence

In [17]:
def node_strengths(eeg_data, FS = 256):
    """
    eeg_data: (trials, channels, timesteps)
    FS: sampling rate
    Returns: node strengths per channel (summed connectivity)
    """  
    pc_matrix = coherence(eeg_data, FS)
    np.fill_diagonal(pc_matrix, 0)
    return np.sum(pc_matrix, axis=1)

Betweenness Centrality - Phase Lag Index

In [16]:
def betweenness_centrality(eeg_data, FS = 256):
    matrix = phase_lag_index(eeg_data)
    G = matrix_to_graph(matrix)
    centrality_dict = nx.betweenness_centrality(G, weight='weight')
    centrality_array = np.array([centrality_dict[node] for node in sorted(centrality_dict)])
    return centrality_array

Clustering Coefficient - Phase Lag Index

In [18]:
def clustering_coefficient(eeg_data, FS = 256):
    matrix = phase_lag_index(eeg_data)
    G = matrix_to_graph(matrix)
    clustering_dict = nx.clustering(G, weight='weight')
    clustering_array = np.array([clustering_dict[node] for node in sorted(clustering_dict)])
    return clustering_array

Clustering Coefficient - Phase Lag Index

In [19]:
def clustering_coefficient_plv(eeg_data, FS = 256):
    matrix = phase_locking_value(eeg_data)
    G = matrix_to_graph(matrix)
    clustering_dict = nx.clustering(G, weight='weight')
    clustering_array = np.array([clustering_dict[node] for node in sorted(clustering_dict)])
    return clustering_array

Make Channel Dominant

In [20]:
def make_channel_dominant(data, n_channels = 16):
    output = [[] for _ in range(n_channels)]
    for sample in data:
        for ch in range(n_channels):
            output[ch].append(sample[ch])
    return data

Phase Lag Index = Delta

In [22]:
resting_delta_bc = []
for sample in resting_delta:
    resting_delta_bc.append(betweenness_centrality(sample))

fatigue_delta_bc = []
for sample in fatigue_delta:
    fatigue_delta_bc.append(betweenness_centrality(sample))

In [23]:
resting_delta_bc = make_channel_dominant(resting_delta_bc)
fatigue_delta_bc = make_channel_dominant(fatigue_delta_bc)

In [21]:
"""import matplotlib.pyplot as plt

n_channels = len(resting_delta_bc)

for ch in range(n_channels):
    plt.figure(figsize=(8, 6))
    
    # Scatter plot for resting condition (blue points)
    plt.scatter(range(len(resting_delta_bc[ch])), resting_delta_bc[ch], label='Resting', color='blue', alpha=0.6)
    
    # Scatter plot for fatigued condition (red points)
    plt.scatter(range(len(fatigue_delta_bc[ch])), fatigue_delta_bc[ch], label='Fatigued', color='red', alpha=0.6)
    
    plt.title(f'Channel {ch+1} - Betweenness Centrality - Delta')
    plt.xlabel('Sample Index')
    plt.ylabel('Node Strength')
    plt.legend()
    plt.grid(True)
    plt.tight_layout()"""

"import matplotlib.pyplot as plt\n\nn_channels = len(resting_delta_bc)\n\nfor ch in range(n_channels):\n    plt.figure(figsize=(8, 6))\n    \n    # Scatter plot for resting condition (blue points)\n    plt.scatter(range(len(resting_delta_bc[ch])), resting_delta_bc[ch], label='Resting', color='blue', alpha=0.6)\n    \n    # Scatter plot for fatigued condition (red points)\n    plt.scatter(range(len(fatigue_delta_bc[ch])), fatigue_delta_bc[ch], label='Fatigued', color='red', alpha=0.6)\n    \n    plt.title(f'Channel {ch+1} - Betweenness Centrality - Delta')\n    plt.xlabel('Sample Index')\n    plt.ylabel('Node Strength')\n    plt.legend()\n    plt.grid(True)\n    plt.tight_layout()"

Phase Lag Index - Theta

In [24]:
resting_theta_bc = []
for sample in resting_theta:
    resting_theta_bc.append(betweenness_centrality(sample))

fatigue_theta_bc = []
for sample in fatigue_theta:
    fatigue_theta_bc.append(betweenness_centrality(sample))

In [25]:
resting_theta_bc = make_channel_dominant(resting_theta_bc)
fatigue_theta_bc = make_channel_dominant(fatigue_theta_bc)

In [26]:
"""import matplotlib.pyplot as plt

n_channels = len(resting_theta_bc)

for ch in range(n_channels):
    plt.figure(figsize=(8, 6))
    
    # Scatter plot for resting condition (blue points)
    plt.scatter(range(len(resting_theta_bc[ch])), resting_theta_bc[ch], label='Resting', color='blue', alpha=0.6)
    
    # Scatter plot for fatigued condition (red points)
    plt.scatter(range(len(fatigue_theta_bc[ch])), fatigue_theta_bc[ch], label='Fatigued', color='red', alpha=0.6)
    
    plt.title(f'Channel {ch+1} - Betweenness Centrality - Delta')
    plt.xlabel('Sample Index')
    plt.ylabel('Node Strength')
    plt.legend()
    plt.grid(True)
    plt.tight_layout()

    plt.show()"""

"import matplotlib.pyplot as plt\n\nn_channels = len(resting_theta_bc)\n\nfor ch in range(n_channels):\n    plt.figure(figsize=(8, 6))\n    \n    # Scatter plot for resting condition (blue points)\n    plt.scatter(range(len(resting_theta_bc[ch])), resting_theta_bc[ch], label='Resting', color='blue', alpha=0.6)\n    \n    # Scatter plot for fatigued condition (red points)\n    plt.scatter(range(len(fatigue_theta_bc[ch])), fatigue_theta_bc[ch], label='Fatigued', color='red', alpha=0.6)\n    \n    plt.title(f'Channel {ch+1} - Betweenness Centrality - Delta')\n    plt.xlabel('Sample Index')\n    plt.ylabel('Node Strength')\n    plt.legend()\n    plt.grid(True)\n    plt.tight_layout()\n\n    plt.show()"

Clustering Coefficient - Delta Band - PLI

In [27]:
resting_delta_cc = []
for sample in resting_delta:
    resting_delta_cc.append(clustering_coefficient(sample))

fatigue_delta_cc = []
for sample in fatigue_delta:
    fatigue_delta_cc.append(clustering_coefficient(sample))

In [28]:
resting_delta_cc = make_channel_dominant(resting_delta_cc)
fatigue_delta_cc = make_channel_dominant(fatigue_delta_cc)

In [None]:
"""import matplotlib.pyplot as plt

n_channels = len(resting_delta_cc)

for ch in range(n_channels):
    plt.figure(figsize=(8, 6))
    
    # Scatter plot for resting condition (blue points)
    plt.scatter(range(len(resting_delta_cc[ch])), resting_delta_cc[ch], label='Resting', color='blue', alpha=0.6)
    
    # Scatter plot for fatigued condition (red points)
    plt.scatter(range(len(fatigue_delta_cc[ch])), fatigue_delta_cc[ch], label='Fatigued', color='red', alpha=0.6)
    
    plt.title(f'Channel {ch+1} - Clustering Coefficient - Delta')
    plt.xlabel('Sample Index')
    plt.ylabel('Node Strength')
    plt.legend()
    plt.grid(True)
    plt.tight_layout()

    plt.show()"""