In [21]:
#!pip install torch-scatter -f https://data.pyg.org/whl/torch-2.2.0+cpu.html
#!pip install torch-sparse -f https://data.pyg.org/whl/torch-2.2.0+cpu.html
#!pip install torch-cluster -f https://data.pyg.org/whl/torch-2.2.0+cpu.html
#!pip install torch-spline-conv -f https://data.pyg.org/whl/torch-2.2.0+cpu.html


In [22]:
#!pip install torch-geometric

In [23]:
# -*- coding: utf-8 -*-
"""
Created on Sat Jan 27 18:26:26 2024
Rishan Patel, UCL, Bioelectronics Group.

Building a dataset and testing a GNN model. 

FBCSP works very well. Clearly frequency information is important.
Creating graphs based on full spectrum, raw data PLV. 3 Classes, L,R,Re.
ICA not performed. 
Using 10 frequency bands, band power measures as features 22x10. 
Using a standard Graph Neural Network.

To Do: 
- ICA for Brain Wave Isolation
- Optimising parameters
- Removing randomisation, and barching so that models update over sequential time
- Check accuracies for each class, not aggregate

Notes: 
    
    

https://pytorch-geometric.readthedocs.io/en/latest/get_started/introduction.html#data-handling-of-graphs
https://pytorch-geometric.readthedocs.io/en/latest/generated/torch_geometric.data.Data.html#torch_geometric.data.Data
https://pytorch-geometric.readthedocs.io/en/latest/tutorial/create_dataset.html
"""
import os
from os.path import dirname, join as pjoin
import scipy as sp
import scipy.io as sio
from scipy import signal
import numpy as np
from matplotlib.pyplot import specgram
import matplotlib.pyplot as plt
import numpy as np
import scipy.signal as sig
import networkx as nx
from scipy.signal import welch
from scipy.stats import entropy
from sklearn.feature_selection import mutual_info_classif
from scipy.integrate import simps
from sklearn.model_selection import KFold


import torch as torch
from torch.nn import Linear
import torch.nn.functional as F

In [24]:
from torch_geometric.loader import DataLoader
from torch_geometric.nn import GCNConv, GATConv, GATv2Conv, GAT
from torch_geometric.nn import global_mean_pool

In [25]:
# % Preparing Data
data_dir = os.getcwd()   # return the current working directory

# Define the subject numbers
subject_numbers = [1, 2, 5, 9, 21, 31, 34, 39]   # number each EEG subject with their EEG recordings

# Dictionary to hold the loaded data for each subject
subject_data = {} # create an empty dictionary to store the EED data from all subjects

# Loop through the subject numbers and load the corresponding data
for subject_number in subject_numbers: # go through each subject index, process each their .mat files
    mat_fname = pjoin(data_dir, f'S{subject_number}.mat') # pjoin ensures cross-platform compatibility, constructs the full path to each .mat file
    mat_contents = sio.loadmat(mat_fname) # use scipy.io.loadmat to process .mat files
    subject_data[f'S{subject_number}'] = mat_contents[f'Subject{subject_number}'] # rename
del subject_number,subject_numbers,mat_fname,mat_contents,data_dir # delete all other temporary variables that are no longer useful

In [26]:
# %% Phase-GAT
S1 = subject_data['S1'][:,:] # [:,:] selects all rows and all columns from Subject 1


def plvfcn(eegData):
    numElectrodes = eegData.shape[1]
    numTimeSteps = eegData.shape[0] # T is the NO. of time points
    plvMatrix = np.zeros((numElectrodes, numElectrodes)) 
    # generate a square 2D array with every element initialized to 0
    for electrode1 in range(numElectrodes): # from 0 to numElectrodes
        for electrode2 in range(electrode1 + 1, numElectrodes): # save computational load, from electrode1 + 1 to numElectrodes 
            #avoid repeating symmetric computations and self-PLV (PLV(i, j) == PLV(j, i))
            phase1 = np.angle(sig.hilbert(eegData[:, electrode1])) # select all time steps for electrode 1
            phase2 = np.angle(sig.hilbert(eegData[:, electrode2]))
            # Hilbert Transform gives a complex number at each time point, including 
            # Real part: the original signal, and Imaginary part: the phase-shifted version
            # np.angle() extracts the instantaneous phase (in radians) at each time point from the Hilbert result
            phase_difference = phase2 - phase1
            plv = np.abs(np.sum(np.exp(1j * phase_difference)) / numTimeSteps)
            plvMatrix[electrode1, electrode2] = plv # calculate the upper triangle only
            plvMatrix[electrode2, electrode1] = plv # To complete the symmetric matrix, we mirror it
    return plvMatrix

In [27]:
def compute_plv(subject_data):
    idx = ['L', 'R']
    numElectrodes = subject_data['L'][0,1].shape[1] # ['L'][0,1] accesses one single EEG trial (row 0 and column 1)
                                                    # .shape[1] gets the second dimension: No. of electrodes
    plv = {field: np.zeros((numElectrodes, numElectrodes, subject_data.shape[1])) for field in idx}
    # This creates an empty 3D numpy array for each of 'L' and 'R':
    # plv shape: (electrodes × electrodes × trials)
    # example: if 16 electrodes and 40 trials, shape is (16, 16, 40)
    # subject_data.shape[1]: the number of trials per class, stores PLV matrices for each trial

    
    for i, field in enumerate(idx):
        for j in range(subject_data.shape[1]):
            x = subject_data[field][0, j]
            plv[field][:, :, j] = plvfcn(x)
    l, r = plv['L'], plv['R']
    # Unpacks the full PLV arrays for each class
    # l: shape (electrodes, electrodes, num_trials_L)
    # r: shape (electrodes, electrodes, num_trials_R)
    
    yl, yr = np.zeros((subject_data.shape[1], 1)), np.ones((subject_data.shape[1], 1)) 
    # shape: No. of rows = subject_data.shape[1], No. of colume = 1
    
    img = np.concatenate((l, r), axis=2) 
    # concatenate all PLV matrices (left and right trials) along the third dimension
    y = np.concatenate((yl, yr), axis=0)
    y = torch.tensor(y, dtype=torch.long) 
    # In machine learning, tensors are multi-dimensional arrays used to represent data, e.g. vector, matrix...
    # it converts the numpy label array into a PyTorch tensor with integer (long) data type, 
    # ready for model training
    return img, y

plv, y = compute_plv(S1) # S1 is the EEG data from Subject 1
                         # This line runs the function and stores the PLV matrices and labels

In [28]:
def create_graphs(plv, threshold):
    '''plv: a 3D numpy array of Phase Locking Value (PLV) matrices with shape (num_electrodes, num_electrodes, num_trials);
       threshold: a float (e.g. 0.5), used to decide whether to add an edge between nodes (electrodes)'''
    graphs = []
    for i in range(plv.shape[2]): # plv.shape[2] is the number of trials
        G = nx.Graph() # create an empty **undirected graph** using NetworkX
        G.add_nodes_from(range(plv.shape[0])) # add a node for each electrode
        for u in range(plv.shape[0]):
            for v in range(plv.shape[0]):
                if u != v and plv[u, v, i] > threshold: # u != v skips self-connections
                    # plv[u, v, i] > threshold checks if the plv value is strong enough
                    G.add_edge(u, v, weight=plv[u, v, i])
        graphs.append(G)
    return graphs
threshold = 0.5
graphs = create_graphs(plv, threshold)

In [29]:
plv.shape[0]

22

In [30]:
numElectrodes = S1['L'][0,1].shape[1] # S1['L'][0,1] obtains the second EEG trial for Left-hand imagery
                                      # .shape[1] accesses the No. of electrodes in that EEG trial
adj = np.zeros([numElectrodes, numElectrodes, len(graphs)]) # No. of graphs = No. of trials
for i, G in enumerate(graphs): # iterate through each graph G in the graphs list, where i is the index of the graph
    adj[:, :, i] = nx.to_numpy_array(G)
    # This gives a 2D matrix of shape (numElectrodes, numElectrodes) with:
    # 0 if no edge between two electrodes
    # PLV weight (e.g., 0.76) if there is an edge between them

#% Initialize an empty list to store edge indices
edge_indices = [] # % Edge indices are a list of source and target nodes in a graph. Think of it like the adjacency matrix

# Iterate over the adjacency matrices
for i in range(adj.shape[2]): # adj.shape[2] is the number of EEG trials, one trial creates one adjacent-matrix
    # Initialize lists to store source and target nodes
    source_nodes = []
    target_nodes = []
    # In PyG, we represent edges as pairs
    
    # Iterate through each element of the adjacency matrix
    for row in range(adj.shape[0]):
        for col in range(adj.shape[1]):
            # Check if there's an edge
            if adj[row, col, i] >= threshold:
                # Add source and target nodes to the lists
                source_nodes.append(row)
                target_nodes.append(col)
                # If the PLV between node u and v is ≥ threshold (e.g. 0.5), we consider this a valid edge.
            else:
                # If no edge exists, add placeholder zeros to maintain size
                source_nodes.append(0)
                target_nodes.append(0)
    
    # Create edge index as a LongTensor
    edge_index = torch.tensor([source_nodes, target_nodes], dtype=torch.long)
    # torch.tensor creates a tensor called edge_index from source and target nodes,
    # which is the standard format for edge lists in PyTorch Geometric.
    
    # Append edge index to the list
    edge_indices.append(edge_index)

# Stack all edge indices along a new axis to create a 2D tensor
edge_indices = torch.stack(edge_indices, dim=-1)

del col,edge_index,i,row,source_nodes,target_nodes


In [31]:
#%

def aggregate_eeg_data(S1,band): #%% This is to get the feat vector
    """
    Aggregate EEG data for each class.

    Parameters:
        S1 (dict): Dictionary containing EEG data for each class. Keys are class labels, 
                   values are arrays of shape (2, num_samples, num_channels), where the first dimension
                   corresponds to EEG data (index 0) and frequency data (index 1).

    Returns:
        l (ndarray): Aggregated EEG data for class 'L'.
        r (ndarray): Aggregated EEG data for class 'R'.
    """
    idx = ['L', 'R']
    numElectrodes = S1['L'][0,1].shape[1];
    max_sizes = {field: 0 for field in idx} # field = L or R, referring to two classes
    # Initialize a dictionary to store the maximum time length of EEG signals for each class ('L' and 'R').

    # Find the maximum size of EEG data for each class
    for field in idx:
        for i in range(S1[field].shape[1]):
            max_sizes[field] = max(max_sizes[field], S1[field][0, i].shape[0])
            # max_sizes[field] stores the sizes in time samples of each trial for each class (field)
            # S1[field][0,i].shape[0] indexes the time sample dimension of each trial i, and each class (field)
            # its consistently comparing old max to the latest variable in loop and seeing if its bigger than whats stored
            # Go through every trial of 'L' and 'R'
            # Find the longest time length across all trials of each class
            # Store it in max_sizes['L'] and max_sizes['R']

    # Initialize arrays to store aggregated EEG data
    l = np.zeros((max_sizes['L'], numElectrodes, S1['L'].shape[1]))
    r = np.zeros((max_sizes['R'], numElectrodes, S1['R'].shape[1]))

    # Loop through each sample
    # outer loop: over trial index i, inner loop: over classes 'L' and 'R'
    for i in range(S1['L'].shape[1]):
        for j, field in enumerate(idx):
            x = S1[field][0, i]  # EEG data for the current sample
            # Resize x to match the maximum size
            resized_x = np.zeros((max_sizes[field], 22))
            resized_x[:x.shape[0], :] = x
            # Add the resized EEG data to the respective array
            if field == 'L':
                l[:, :, i] += resized_x
            elif field == 'R':
                r[:, :, i] += resized_x

    l = l[..., np.newaxis]
    l = np.copy(l) * np.ones(len(band)-1) # expand the data along a new dimension: frequency bands

    r = r[..., np.newaxis]
    r = np.copy(r) * np.ones(len(band)-1)
    
    return l, r

band = list(range(8, 41, 4))
l,r = aggregate_eeg_data(S1,band)
l,r = np.transpose(l,[1,0,2,3]),np.transpose(r,[1,0,2,3])
# from (time, electrodes, trials, bands) to (electrodes, time, trials, bands)

In [32]:
fs = 256 # fs: sampling frequency, 256 EEG signals are sampled per second

def bandpass(data: np.ndarray, edges: list[float], sample_rate: float, poles: int = 5):
    '''apply the Butterworth bandpass filter to the input EEG signal, in order to remove the un-useful frequency sections:
       data: the raw EEG time-series signal (1D or 2D NumPy array)
       edges: a two-element list, [low_cutoff, high_cutoff] in Hz
       sample_rate: sampling frequency (e.g. 256 Hz)
       poles: the filter order (default is 5). Higher = sharper filter'''
    sos = sig.butter(poles, edges, 'bandpass', fs=sample_rate, output='sos')
    filtered_data = sig.sosfiltfilt(sos, data)
    return filtered_data

for i in range(l.shape[3]): # l.shape[3] is the No. of bands
    bp = [band[i],band[i+1]] # define the low and high frequency cutoffs
    for j in range(l.shape[2]): # l.shape[2] is the No. of trials
        l[:,:,j,i] = bandpass(l[:,:,j,i],bp,sample_rate=fs)
        r[:,:,j,i] = bandpass(r[:,:,j,i],bp,sample_rate=fs)

In [33]:
#% Convert data from 22x1288x150xF to 22xFx150 where nodes x features x sample
# features are BP of the band. 

def bandpower(data,low,high):
    '''a common EEG feature that quantifies the strength of brain rhythms, in a particular frequency range'''

    fs = 256
    # Define window length (2s)
    win = 2* fs
    freqs, psd = signal.welch(data, fs, nperseg=win)
    # Apply Welch’s method to estimate the power spectral density
    
    # Find intersecting values in frequency vector
    idx_delta = np.logical_and(freqs >= low, freqs <= high)
    
    # # Plot the power spectral density and fill the delta area
    # plt.figure(figsize=(7, 4))
    # plt.plot(freqs, psd, lw=2, color='k')
    # plt.fill_between(freqs, psd, where=idx_delta, color='skyblue')
    # plt.xlabel('Frequency (Hz)')
    # plt.ylabel('Power spectral density (uV^2 / Hz)')
    # plt.xlim([0, 40])
    # plt.ylim([0, psd.max() * 1.1])
    # plt.title("Welch's periodogram")
    
    # Frequency resolution
    freq_res = freqs[1] - freqs[0]  # = 1 / 4 = 0.25
    # for [8, 12] bandpass, we find which bins fall inside it by [8.0, 8.25, 8.5, ..., 12.0]
    
    # Compute the absolute power by approximating the area under the curve
    power = simps(psd[idx_delta], dx=freq_res) # Simpson's rule: numerical intergral
    
    return power

In [34]:
#print("Shape of l:", l.shape)

In [35]:
#from tqdm import tqdm

#def bandpowercalc(l, band, fs):
    #x = np.zeros([l.shape[0], l.shape[3], l.shape[2]])
    #for i in tqdm(range(l.shape[0]), desc="Subject"):
        #for j in range(l.shape[2]):  # sample
            #for k in range(l.shape[3]):  # band
                #data = l[i,:,j,k]
                #low = band[k]
                #high = band[k+1]
                #x[i,k,j] = bandpower(data, low, high)
    #return x


#l_test = l[:1]
#r_test = r[:1]

#l_feat = bandpowercalc(l_test, band, fs)
#r_feat = bandpowercalc(r_test, band, fs)

#x = np.concatenate([l_feat, r_feat], axis=2)
#x = torch.tensor(x, dtype=torch.float32)

In [36]:
def bandpowercalc(l,band,fs):  
    '''transforms the filtered EEG signals into node-wide bandpower features'''
    
    x = np.zeros([l.shape[0],l.shape[3],l.shape[2]])
    for i in range(l.shape[0]): #node
        for j in range(l.shape[2]): #sample
            for k in range(0,l.shape[3]): #band 
                # in this case, the loop order does not matter
                data = l[i,:,j,k]
                low = band[k]
                high = band[k+1] # define low and high band edges
                x[i,k,j] = bandpower(data,low,high)

    return x

In [37]:
# could overload memory
#l = bandpowercalc(l,band,fs)
#r = bandpowercalc(r,band,fs)

#x = np.concatenate([l,r],axis=2)
#x = torch.tensor(x,dtype=torch.float32)

#del r,l,S1,i,j,G,bp 



# try batch-wise bandpower features
x_list = []  # store tensors batch by batch

for i in range(l.shape[0]):  # loop over subjects
    # extract one subject's data
    l_i = l[i:i+1]
    r_i = r[i:i+1]
    
    # compute features only for that subject
    l_feat = bandpowercalc(l_i, band, fs)
    r_feat = bandpowercalc(r_i, band, fs)
    
    # concatenate features
    x_i = np.concatenate([l_feat, r_feat], axis=2)
    
    # convert to tensor and append
    x_i = torch.tensor(x_i, dtype=torch.float32)
    x_list.append(x_i)

# stack all mini-tensors together
x = torch.cat(x_list, dim=0)


In [38]:
#%
from torch_geometric.data import Data

data_list = [] # create an empty Python list to hold one Data object per EEG trial.
for i in range(np.size(adj,2)): # iterate through each trial
    data_list.append(Data(x=x[:, :, i], edge_index=edge_indices[:,:,i], y=y[i, 0]))


# def split(data_list, train_p, val_p, test_p):
#     num_samples = len(data_list)
#     a = num_samples // 3
#     class_splits = [
#         int(train_p * a), int(train_p * a), int(train_p * a),
#         int(val_p * a), int(val_p * a), int(val_p * a),
#         int(test_p * a), int(test_p * a), int(test_p * a)
#     ]
#     indices = list(range(num_samples))
#     #np.random.shuffle(indices)
    
#     train_indices = indices[:sum(class_splits[:3])]
#     val_indices = indices[sum(class_splits[:3]):sum(class_splits[:6])]
#     test_indices = indices[sum(class_splits[:6]):]

#     train_data = [data_list[i] for i in train_indices]
#     val_data = [data_list[i] for i in val_indices]
#     test_data = [data_list[i] for i in test_indices]

#     return train_data, val_data, test_data


# train,val,test = split(data_list,0.5,0,0.5)



In [39]:
#% split data so its sequentially 1,2,3
# now the data_list is grouped by class: [L, L, L, ..., R, R, R, ...]

datal =[] # store left-hand samples
datar =[] # store right-hand samples
size = len(data_list) # the total No. of trials
idx = size//2
c = [0,idx,idx*2,idx*3]

datal = data_list[c[0]:c[1]]
datar = data_list[c[1]:c[2]]

data_list = []

for i in range(idx):
    x = [datal[i],datar[i]] #datare[i]]
    data_list.extend(x)
    # ends up with [L, R, L, R, ...]


size = len(data_list)

In [1]:
# Initialize KFold with 5 splits
kf = KFold(n_splits=2, shuffle=True, random_state=42)
# use the K-Fold Cross-Validation to evaluate the Graph Neural Network (GNN)
# the dataset is split into K equal parts, called 'fold'; if K=2, we have two folds:
# fold 1: used for training in the first run, testing in the second;
# fold 2: used for testing in the first run, training in the second;


# List to store highest test accuracies of each fold
highest_test_accuracies = []

# Iterate over each fold
for fold, (train_idx, test_idx) in enumerate(kf.split(data_list)):
    # Create training and testing sets
    train = [data_list[i] for i in train_idx]
    test = [data_list[i] for i in test_idx]
    # create lists of data objects (PyTorch Geometric graphs) using the split indices
    
    # Print the number of samples in train and test sets for each fold
    print(f"Fold {fold + 1}")
    print(f"Train size: {len(train)}")
    print(f"Test size: {len(test)}")
    print("=" * 20)
    
    torch.manual_seed(12345) # torch.manual_seed(seed), use the same sequence of random numbers every time when running this script

    train_loader = DataLoader(train, batch_size=8, shuffle=False)
    test_loader = DataLoader(test, batch_size=8, shuffle=False)

    for step, data in enumerate(train_loader):
        print(f'Step {step + 1}:')
        print('=======')
        print(f'Number of graphs in the current batch: {data.num_graphs}')
        print(data)
        print()
        # print out the contents of the first few training batches — helpful for debugging.
    
    # class GCN(torch.nn.Module):
    #     def __init__(self, hidden_channels):
    #         super(GCN, self).__init__()
    #         torch.manual_seed(12345)
    #         self.conv1 = GCNConv(8, hidden_channels)  # num node features
    #         self.conv2 = GCNConv(hidden_channels, hidden_channels)
    #         self.conv3 = GCNConv(hidden_channels, hidden_channels)
    #         self.lin = Linear(hidden_channels, 2)  # num of classes
    
    # defines a Graph Attention Network with 3 graph attention layers + 1 linear output layer
    class GAT(torch.nn.Module):
        def __init__(self, hidden_channels,heads):
            super(GAT, self).__init__()
            torch.manual_seed(12345)
            
            # Graph Convolution = Aggregating features from a node’s neighbors
            # Each node updates its features based on:
            # Its own current features
            # The features of its neighbors
            # The connection strengths (edge weights or attention scores)
            
            self.conv1 = GATv2Conv(8, hidden_channels, heads=heads, concat=True)  # num node features
            # input: 8 features for each node
            # output: each node will get (hidden_channels * heads) features
            
            self.conv2 = GATv2Conv(hidden_channels*heads, hidden_channels, heads=heads, concat=True)
            self.conv3 = GATv2Conv(hidden_channels*heads, hidden_channels, heads=heads,concat=True)
            self.lin = Linear(hidden_channels*heads, 2)  # two outputs, num of classes: 2

        def forward(self, x, edge_index, batch):
            # 1. Obtain node embeddings 
            x = self.conv1(x, edge_index)
            x = x.relu()
            x = self.conv2(x, edge_index)
            x = x.relu()
            x = self.conv3(x, edge_index)

            # 2. Readout layer
            x = global_mean_pool(x, batch)  # [batch_size, hidden_channels]
            # pools all node embeddings in each graph into a single graph embedding by averaging, 
            # to get the feature for each single graph

            # 3. Apply a final classifier
            x = F.dropout(x, p=0.5, training=self.training)
            x = self.lin(x)
            
            return x

    model = GAT(hidden_channels=64,heads=2)
    
    # use Adam optimizer and cross-entropy loss for classification (left vs right motor imagery)
    optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
    criterion = torch.nn.CrossEntropyLoss()

    def train():
        model.train()
        for data in train_loader:  # Iterate in batches over the training dataset.
            out = model(data.x, data.edge_index, data.batch)  # Perform a single forward pass.
            loss = criterion(out, data.y)  # Compute the loss.
            loss.backward()  # Derive gradients, 'backpropagation'
            optimizer.step()  # Update parameters based on gradients.
            optimizer.zero_grad()  # Clear gradients.
            # loops over all batches in the training set to:
            # 1. compute output
            # 2. calculate loss
            # 3. backpropagate
            # 4. update weights theta

    def test(loader):
        model.eval()
        correct = 0
        with torch.no_grad():
            for data in loader:  # Iterate in batches over the training/test dataset.
                out = model(data.x, data.edge_index, data.batch)  
                pred = out.argmax(dim=1)  # Use the class with highest probability.
                correct += int((pred == data.y).sum())  # Check against ground-truth labels.
        return correct / len(loader.dataset)  # Derive ratio of correct predictions.
    # evaluate model accuracy (no gradient update):
    # 1. take predictions out
    # 2. choose the most likely class argmax
    # 3. compare with ground-truth data.y


    optimal = [0, 0, 0] 
    for epoch in range(1, 10): # run the training set for 200 epochs on the current fold
        train()
        train_acc = test(train_loader)
        test_acc = test(test_loader)
        av_acc = np.mean([train_acc, test_acc])
        
        if test_acc > optimal[2]:
            optimal[0] = av_acc
            optimal[1] = train_acc
            optimal[2] = test_acc
        
        print(f'Opt_Av: {optimal[0]:.4f}, Opt_Train: {optimal[1]:.4f}, Opt_Test: {optimal[2]:.4f}')

    # Store the highest test accuracy of the current fold
    highest_test_accuracies.append(optimal[2])
    
    import gc
    
    # after each fold finishes
    del model, optimizer, criterion, train_loader, test_loader
    torch.cuda.empty_cache()
    gc.collect()


# Calculate and print the mean of the highest test accuracies across all folds
mean_highest_accuracy = np.mean(highest_test_accuracies)
print(f'Mean Highest Test Accuracy Across 5 Folds: {mean_highest_accuracy:.4f}')

NameError: name 'KFold' is not defined