# Title: CNN_LSTM Model (MSC/MPC Features)

This notebook demonstrates how to train and evaluate a CNN_LSTM model on EEG data (processed via MSC/MPC features). It includes steps for data loading, augmentation, dimensionality reduction (PCA), training, confusion matrix visualization, per-class bar charts, and an accuracy comparison with/without PCA.

In [4]:
import scipy.io
import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

from sklearn.decomposition import PCA
from sklearn.metrics import accuracy_score, confusion_matrix, ConfusionMatrixDisplay

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import math

# Create CNN_LSTM directory if it doesn't exist
os.makedirs("CNN_LSTM", exist_ok=True)

# Choose device (GPU if available)
device = (
    "mps" if torch.backends.mps.is_available() 
    else "cuda" if torch.cuda.is_available() 
    else "cpu"
)
print("Using device:", device)



Using device: mps


# Loading EEG data
This section loads the EEG datasets, merges them if necessary, and displays initial information about the data structures. We then define some variables and dictionaries related to the classification labels (words).

In [32]:
print("Loading EEG data...")
import scipy.io
import os
import pandas as pd
import numpy as np

# Load data
data = pd.read_csv('EEG_data_subject4', sep="\t")  # example path
df_imagined = pd.read_csv('EEG_data_subject3', sep="\t")
df_imagined.drop(["Time"], axis=1, inplace=True)
df_inner = pd.read_csv('EEG_data_subject4', sep="\t")
df_inner.drop(["Time"], axis=1, inplace=True)


Loading EEG data...


In [None]:
df_imagined

In [None]:
df_combined = pd.concat([df_imagined, df_inner])

# Convert to numpy
data = np.array(df_imagined)
data = data.transpose()

# Load labels (words)
y_array = np.load("labels_word_list.npy")

print(f"Shape of data_array: {data.shape}, Labels shape: {y_array.shape}")
print("First 10 labels:", y_array[:10])

# Dictionary of words to label indices
num_dict = {
    'date': 0, 'goose': 1, 'spruce': 2, 'knight': 3, 'juice': 4, 'moose': 5, 'night': 6,
    'queen': 7, 'berry': 8, 'hedgehog': 9, 'water': 10, 'daughter': 11, 'gooseberry': 12,
    'waterfowl': 13, 'wilderness': 14, 'relative': 15, 'watermelon': 16, 'caterpillar': 17,
    'environment': 18, 'ambassador': 19
}

# Create reverse mapping
word_dict = {v: k for k, v in num_dict.items()}

# 2) Data Epoching / Segmenting the Data

In [None]:
# We break the EEG data into epochs of 256 samples, you are effectively segmenting the data into 1-second intervals.
print("Epoching data...")
n=int(data.shape[1]/256)
epoched_data=[]
for i in range(n):
    epoched_data.append(data[:,256*i:256*(i+1)])
epoched_data=np.array(epoched_data)


# Shuffling / random permutation
Ensures randomization of the epochs and their labels.

In [None]:
# Apply random permutation
p=np.random.RandomState(seed=42).permutation(len(y_array))
epoched_data=epoched_data[p]
y_array=y_array[p]
print(f"Epoched data shape: {epoched_data.shape}") #(num_epochs,num_channels,epoch_length)
print("Shuffled y_array shape:", y_array.shape)

# Data Augmentation
Creates additional “sub-epochs” by sliding windows within each original epoch.

Here, we create overlapping sub‐epochs (128 samples each, with a certain overlap) to artificially increase the number of training examples. Then we replicate labels accordingly.


In [None]:
print("Augmenting data...")
augmented_data=[]
n=epoched_data.shape[0]
length=128
overlap=32
for i in range(n):
    for j in range(0,length+1,overlap):
        augmented_data.append(epoched_data[i][:,j:j+length])
augmented_data=np.array(augmented_data)
print("Augmented data shape:", augmented_data.shape) #(num_epochs,num_channels,epoch_length)

In [None]:
epoched_data=augmented_data
print(f"Augmented data shape: {epoched_data.shape}")

In [None]:
epoched_data.shape

In [None]:
# Adjust Label for Augmented Data. i.e Replicate each label the appropriate number of times
# You must replicate each label 5 times

augmented_labels = []
for label in y_array:
    for _ in range(5):  # X= 5 Because from each epoch, we created 5 sub-epochs
        augmented_labels.append(label)

y_array = np.array(augmented_labels)
print(f"Augmented labels Y_shape: {y_array.shape}")    # should now be (5000,)


# Creating continuous data from epoched data

We reshape/concatenate the epochs so we can perform subsequent filtering (alpha, beta, gamma extraction) using MNE functions.


In [None]:
#creating continuous data from epoched data
continuous_data=[]
trials=epoched_data.shape[0]
channels=epoched_data.shape[1]
sample_size=epoched_data.shape[2]
for i in range(channels):
    continuous_data.append(epoched_data[:,i].reshape(trials*sample_size))
continuous_data=np.array(continuous_data)
print("Continuous data shape:", continuous_data.shape)


# **Extracting Alpha, Beta, Gamma Bands with MNE from continuous data**

We apply notch filtering at 60 Hz, downsample, and specifically extract alpha, beta, and gamma bands for each channel, then re‐epoch them.


In [None]:
pip install mne --quiet

In [None]:
# Setup MNE for filtering

import mne
sfreq=128
ch_names=["F3","FC5","AF3","F7","T7","P7","O1","O2","P8","T8","F8","AF4","FC6","F4"] 
info=mne.create_info(ch_names,sfreq=sfreq)
raw=mne.io.RawArray(continuous_data,info)
raw.plot(scalings = 'auto');

print("Filtering EEG bands...")

# Notch filter at 60 Hz
raw.notch_filter(60,picks='all')

# (Optional) downsampling
raw.resample(120, npad='auto')

# Extracting the alpha, beta, gamma from EEG
# Uses MNE‐Python’s FIR filter to isolate each band.

import mne.filter
alpha_continuous=mne.filter.filter_data(continuous_data,128,8,12)
beta_continuous=mne.filter.filter_data(continuous_data,128,12,30)
gamma_continuous=mne.filter.filter_data(continuous_data,128,30,50)

print("alpha_continuous shape:", alpha_continuous.shape)

# Re-epoch the filtered signals

trial_duration=epoched_data.shape[2] #trial duration
n=epoched_data.shape[0]

alpha_epoched=[]
beta_epoched=[]
gamma_epoched=[]

for i in range(n):
    alpha_epoched.append(alpha_continuous[:,i*trial_duration:(i+1)*trial_duration])
    beta_epoched.append(beta_continuous[:,i*trial_duration:(i+1)*trial_duration])
    gamma_epoched.append(gamma_continuous[:,i*trial_duration:(i+1)*trial_duration])

alpha_epoched=np.array(alpha_epoched)
beta_epoched=np.array(beta_epoched)
gamma_epoched=np.array(gamma_epoched)

print("alpha_epoched shape:", alpha_epoched.shape)  #(num_epochs,num_channels,epoch_length)

# Feature Extraction Techniques

# **Type 2: Feature Extraction (MPC & MSC)**

We compute Mean Phase Coherence (MPC) and Magnitude Squared Coherence (MSC) for alpha, beta, gamma bands, and assemble them into a feature vector. This process can be computationally heavy, so a progress indicator (`print(i,end=' ')`) is shown.


In this second technique, the features,  **mean phase coherence (MPC)** is extracted along with **magnitude-squared coherence (MSC)** from the augmented data \citep{panachakel2021decoding}. MPC between two EEG channels is described as a measure of their phase synchronisation. The mean phase coherence (MPC) between two EEG signals with instantaneous phase difference $
    \phi(t)=\phi_1(t)-\phi_2(t)
$ can be estimated via,

$
    \lambda = \frac{1}{N}\Bigg|\sum_{n=0}^{N-1}e^{j(\hat{\phi}_i(n))}\Bigg|
$
where $
    {(\hat{\phi}_i(n))}_{n=0}^{N-1}
$ is the estimation of $
    \phi(t)
$,
where N is the number of samples,and the instantaneous phases are computed using Hilbert transform.
Where as, if a pair of signals are in spectral domain, MSC computes the linear relationship between them. Hamming window is used for this process. Let the auto-spectral densities and the cross-spectral density  of $
    x(t)
$ and $
    y(t)
$  be denoted by $
    P_{xx}(f)
$, $
    P_{yy}(f)
$  and $
    P_{xy}(f)
$ respectively at frequency f. The MSC between them is given by:

$
    \gamma _{xy}^{2} (f) = {{\left\vert {P_{xy} (f)} \right\vert^{2} } \over {P_{xx} (f)P_{yy} (f)}} 
$

In [None]:
#calculation of hilbert array from augmented array

**MSC (Magnitude Squared Coherence)**:  Measures the strength of correlation between two EEG signals in the frequency domain.

**MPC (Mean Phase Coherence)**:  Measures how well two EEG signals stay in phase over time.

In [None]:
from scipy.signal import hilbert , welch, csd, coherence

def msc(arr1, arr2):
    #Magnitude‐Squared Coherence (MSC)
    #Measures the linear correlation in the frequency domain.
    fs = 128
    f, Cxy = coherence(arr1, arr2, fs=fs, window="hamm", nperseg=8)
    return np.mean(Cxy)

def mpc(arr1, arr2):
    #Mean Phase Coherence (MPC)
    # Measures phase synchronization between two signals.
    imag_1 = np.imag(hilbert(arr1))
    imag_2 = np.imag(hilbert(arr2))
    phase_1 = np.arctan2(imag_1, arr1, out=np.zeros_like(imag_1), where=arr1!=0)
    phase_2 = np.arctan2(imag_2, arr2, out=np.zeros_like(imag_2), where=arr2!=0)
    phase_diff = (phase_1 - phase_2)
    return np.linalg.norm(np.sum(np.exp(1j*phase_diff))) / len(arr1)

print("Calculating coherence matrices ...")

n = alpha_epoched.shape[0]
m = alpha_epoched.shape[1]
l=alpha_epoched.shape[2]

# Initialize matrices
mpc_alpha = np.zeros([n,m,m])
mpc_beta  = np.zeros([n,m,m])
mpc_gamma = np.zeros([n,m,m])

msc_alpha = np.zeros([n,m,m])
msc_beta  = np.zeros([n,m,m])
msc_gamma = np.zeros([n,m,m])

for i in range(n):
    for j in range(m):
        for k in range(m):
            mpc_alpha[i][j][k] = mpc(alpha_epoched[i][j], alpha_epoched[i][k])
            msc_alpha[i][j][k] = msc(alpha_epoched[i][j], alpha_epoched[i][k])

            mpc_beta[i][j][k]  = mpc(beta_epoched[i][j],  beta_epoched[i][k])
            msc_beta[i][j][k]  = msc(beta_epoched[i][j],  beta_epoched[i][k])

            mpc_gamma[i][j][k] = mpc(gamma_epoched[i][j], gamma_epoched[i][k])
            msc_gamma[i][j][k] = msc(gamma_epoched[i][j], gamma_epoched[i][k])
    print(i, end=' ')

print("\nMPC/MSC calculations complete!")

# **Creating the Feature Vector**

We pack MPC (above the diagonal) and MSC (below the diagonal) for alpha, beta, gamma into a single 3D array, then flatten.


In [None]:
mpc_alpha_copy=mpc_alpha
mpc_beta_copy=mpc_beta
mpc_gamma_copy=mpc_gamma

msc_alpha_copy=msc_alpha
msc_beta_copy=msc_beta
msc_gamma_copy=msc_gamma

print("MSC Copy Shape",msc_alpha_copy.shape,"\nMPC Copy Shape", mpc_alpha.shape)

files=[mpc_alpha,mpc_beta,mpc_gamma,msc_alpha,msc_beta,msc_gamma]
file_names=["mpc_alpha","mpc_beta","mpc_gamma","msc_alpha","msc_beta","msc_gamma"]

#Saving the files
part="01"
file_type="inner"
aug_type="non-aug"

for i in range(len(files)):
    path=''+part+'/'+part+'_'+file_type+'/'+aug_type+'/'+file_names[i]+'.npy'
    # np.save(path, files[i])  # Uncomment if you want to save to disk
#loading the files

# Example re-loading (disabled by default)
for i in range(len(files)):
    path=''+part+'/'+part+'_'+file_type+'/'+aug_type+'/'+file_names[i]+'.npy'
    # files[i] = np.load(path) # Uncomment if you want to load from disk

mpc_alpha = files[0]
mpc_beta  = files[1]
mpc_gamma = files[2]
msc_alpha = files[3]
msc_beta  = files[4]
msc_gamma = files[5]

print("MSC Shape",msc_alpha.shape,"\nMPC Shape", mpc_alpha.shape,"\n")

#Creating feature vectors
print("\nCreating feature vectors...")

n_1=msc_alpha.shape[0]
m_1=msc_alpha.shape[1]
x_array_2=np.zeros([n_1,m_1,m_1,3])


print("mpc_alpha shape:", mpc_alpha.shape)
print("mpc_beta shape:", mpc_beta.shape)
print("mpc_gamma shape:", mpc_gamma.shape)

print("msc_alpha shape:", msc_alpha.shape)
print("msc_beta shape:", msc_beta.shape)
print("msc_gamma shape:", msc_gamma.shape)

for i in range(n_1):
    for j in range(m_1):
        for k in range(m_1):
            if j<k:
                x_array_2[i][j][k]=[mpc_alpha[i][j][k],mpc_beta[i][j][k],mpc_gamma[i][j][k]]
            elif j>k:
                x_array_2[i][j][k]=[msc_alpha[i][j][k],msc_beta[i][j][k],msc_gamma[i][j][k]]



# Reshape to 2D
a=x_array_2.reshape(n_1,m_1*m_1*3)

# remove the zero diagonal
a=a[a!=0.0]
a.shape
x_array_2=a.reshape(n_1,m_1*m_1*3-m_1*3)

print("Final feature shape (x_array_2):", x_array_2.shape)

# Save X, Y if desired
# np.save(f"{part}/{part}_{file_type}_X.npy", x_array_2)
# np.save(f"{part}/{part}_{file_type}_Y.npy", y_array)

x_array = x_array_2

# Reshaping x_array for a CNN_LSTM


In [None]:
import numpy as np



N, channels, time = epoched_data.shape  # e.g. (1000, 14, 128)


# Prepare data for CNN_LSTM model
print("Preparing data for CNN_LSTM model...")
X_seq = np.transpose(epoched_data, (0, 2, 1))  # => (N, 128, 14)


print("X_seq shape:", X_seq.shape)       # (1000, 128, 14)
print("y_array shape:", y_array.shape)   # (1000,)

In [None]:
import numpy as np

N, F = x_array.shape
seq_len = 1
input_dim = F

# Reshape from (N, F) -> (N, 1, F)
X_seq = x_array.reshape(N, seq_len, input_dim)

print("X_seq shape:", X_seq.shape)  # (N, 1, F)


**In order to reduce the dimension of the feature vector Principal component analysis (PCA) was used.**

# **Principal component analysis (PCA) / Standardization**
Standard scaling is also used to zero‐mean/unit‐variance the features.
We apply PCA to capture 95% variance. This yields the “with PCA” scenario. We will compare it to a “without PCA” scenario later.


In [None]:
from sklearn.decomposition import PCA

# Let's store a copy of x_array BEFORE PCA for the no-PCA scenario
x_array_no_pca = x_array.copy()

print("Applying PCA for feature reduction...")
pca = PCA(0.95)
pca.fit(x_array)
x_array_pca = pca.transform(x_array)

print("After PCA, shape:", x_array_pca.shape)
print("Without PCA, shape:", x_array_no_pca.shape)
print("Labels shape:", y_array.shape)

# We'll do 2 separate runs below:
# 1) x_array_pca => "with PCA"
# 2) x_array_no_pca => "without PCA"


# **Building a PyTorch Dataset & DataLoader**

We create a custom dataset class for our (X, y) data and split into train/val/test sets.  
We’ll define a helper function to build the dataset for either PCA or no‐PCA inputs.


In [None]:
data_flattened = data.reshape(data.shape[0], -1)  # Shape: (5000, 14 * 128)

# Apply PCA
pca = PCA(n_components=71)
data_pca = pca.fit_transform(data_flattened)  # Shape: (5000, 71)

# Reshape data for CNN-LSTM input (num_samples, sequence_length, num_features)
data_seq = data.reshape(data.shape[0], data.shape[2], data.shape[1])  # Shape: (5000, 128, 14)
data_pca_seq = data_pca.reshape(data_pca.shape[0], 1, data_pca.shape[1])  # Shape: (5000, 1, 71)

# Split the data into training, validation, and test sets
X_train, X_test, y_train, y_test = train_test_split(data_seq, labels, test_size=0.2, random_state=42)
X_train, X_val, y_train, y_val = train_test_split(X_train, y_train, test_size=0.125, random_state=42)

X_train_pca, X_test_pca, y_train_pca, y_test_pca = train_test_split(data_pca_seq, labels, test_size=0.2, random_state=42)
X_train_pca, X_val_pca, y_train_pca, y_val_pca = train_test_split(X_train_pca, y_train_pca, test_size=0.125, random_state=42)

In [None]:
pip install torch torchvision torchaudio --quiet

# Define the CNN-LSTM Model

In [None]:
class CNN_LSTM(nn.Module):
    def __init__(self, input_dim, hidden_dim, num_layers, num_classes, kernel_size=3, stride=1, padding=1):
        super(CNN_LSTM, self).__init__()
        
        # CNN layers
        self.cnn = nn.Sequential(
            nn.Conv1d(in_channels=input_dim, out_channels=32, kernel_size=kernel_size, stride=stride, padding=padding),
            nn.ReLU(),
            nn.MaxPool1d(kernel_size=2, stride=2),
            nn.Conv1d(in_channels=32, out_channels=64, kernel_size=kernel_size, stride=stride, padding=padding),
            nn.ReLU(),
            nn.MaxPool1d(kernel_size=2, stride=2)
        )
        
        # LSTM layers
        self.lstm = nn.LSTM(input_size=64, hidden_size=hidden_dim, num_layers=num_layers, batch_first=True)
        
        # Fully connected layer
        self.fc = nn.Linear(hidden_dim, num_classes)
    
    def forward(self, x):
        # CNN
        x = self.cnn(x)
        
        # Permute for LSTM input (batch_size, seq_len, features)
        x = x.permute(0, 2, 1)
        
        # LSTM
        h_lstm, _ = self.lstm(x)
        
        # Fully connected layer
        out = self.fc(h_lstm[:, -1, :])
        
        return out

## Define the Dataset and DataLoader



In [None]:
class EEGDataset(Dataset):
    def __init__(self, data, labels):
        self.data = data
        self.labels = labels
    
    def __len__(self):
        return len(self.data)
    
    def __getitem__(self, idx):
        return self.data[idx], self.labels[idx]

# Create datasets
train_dataset = EEGDataset(X_train, y_train)
val_dataset = EEGDataset(X_val, y_val)
test_dataset = EEGDataset(X_test, y_test)

train_dataset_pca = EEGDataset(X_train_pca, y_train_pca)
val_dataset_pca = EEGDataset(X_val_pca, y_val_pca)
test_dataset_pca = EEGDataset(X_test_pca, y_test_pca)

# Create dataloaders
batch_size = 32
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

train_loader_pca = DataLoader(train_dataset_pca, batch_size=batch_size, shuffle=True)
val_loader_pca = DataLoader(val_dataset_pca, batch_size=batch_size, shuffle=False)
test_loader_pca = DataLoader(test_dataset_pca, batch_size=batch_size, shuffle=False)

# Training Loop

In [None]:
def train_model(model, train_loader, val_loader, criterion, optimizer, num_epochs=10):
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.to(device)
    
    for epoch in range(num_epochs):
        model.train()
        running_loss = 0.0
        for inputs, labels in train_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            
            # Zero the parameter gradients
            optimizer.zero_grad()
            
            # Forward pass
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            
            # Backward pass and optimize
            loss.backward()
            optimizer.step()
            
            running_loss += loss.item()
        
        # Validation
        model.eval()
        val_loss = 0.0
        correct = 0
        total = 0
        with torch.no_grad():
            for inputs, labels in val_loader:
                inputs, labels = inputs.to(device), labels.to(device)
                outputs = model(inputs)
                loss = criterion(outputs, labels)
                val_loss += loss.item()
                
                _, predicted = torch.max(outputs.data, 1)
                total += labels.size(0)
                correct += (predicted == labels).sum().item()
        
        print(f'Epoch {epoch+1}/{num_epochs}, Loss: {running_loss/len(train_loader)}, Val Loss: {val_loss/len(val_loader)}, Val Acc: {100 * correct / total}%')

# Initialize the model
input_dim = X_train.shape[2]  # Number of features (channels)
hidden_dim = 128
num_layers = 2
num_classes = len(np.unique(labels))

model = CNN_LSTM(input_dim, hidden_dim, num_layers, num_classes)

# Define loss function and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Train the model
train_model(model, train_loader, val_loader, criterion, optimizer, num_epochs=20)

# **Evaluate the Model on Test Data**



In [None]:
def evaluate_model(model, test_loader):
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.to(device)
    model.eval()
    
    correct = 0
    total = 0
    with torch.no_grad():
        for inputs, labels in test_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
    
    print(f'Test Accuracy: {100 * correct / total}%')

# Evaluate the model
evaluate_model(model, test_loader)

# Repeat for PCA Data

In [None]:
# Initialize the model for PCA data
input_dim_pca = X_train_pca.shape[2]  # Number of features after PCA
model_pca = CNN_LSTM(input_dim_pca, hidden_dim, num_layers, num_classes)

# Define loss function and optimizer
criterion_pca = nn.CrossEntropyLoss()
optimizer_pca = optim.Adam(model_pca.parameters(), lr=0.001)

# Train the model on PCA data
train_model(model_pca, train_loader_pca, val_loader_pca, criterion_pca, optimizer_pca, num_epochs=20)

# Evaluate the model on PCA data
evaluate_model(model_pca, test_loader_pca)

# **Compare Result**



In [None]:
# Confusion Matrix for Non-PCA Data
def plot_confusion_matrix(model, test_loader):
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.to(device)
    model.eval()
    
    y_true = []
    y_pred = []
    with torch.no_grad():
        for inputs, labels in test_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            _, predicted = torch.max(outputs.data, 1)
            y_true.extend(labels.cpu().numpy())
            y_pred.extend(predicted.cpu().numpy())
    
    cm = confusion_matrix(y_true, y_pred)
    disp = ConfusionMatrixDisplay(confusion_matrix=cm)
    disp.plot()
    plt.show()

# Plot confusion matrix for Non-PCA data
plot_confusion_matrix(model, test_loader)

# Plot confusion matrix for PCA data
plot_confusion_matrix(model_pca, test_loader_pca)