##### Create dictionary of labels:urchan from Channels.csv

In [1]:
import pandas as pd

df = pd.DataFrame(pd.read_csv("Channels.csv"))

channels_dict=df.set_index('labels')['urchan']
channels_dict

labels
Fp1     1
Fp2     2
F3      3
F4      4
C3      5
       ..
PO8    63
Fpz    64
CPz    65
POz    66
Oz     67
Name: urchan, Length: 66, dtype: int64

##### Actual code

In [2]:
import mne
import os
from pathlib import Path
import cupy
import numpy as np

In [3]:
data_dir = Path("data")
import numpy as np

data_all = []
labels_all = []

os.environ['MNE_USE_CUDA'] = 'true' 
mne.utils.set_config('MNE_USE_CUDA', 'true') 
mne.cuda.init_cuda(ignore_config=True)  

Now using CUDA device 0
Enabling CUDA with 10.83 GB available memory


In [4]:
MIN_POINTS = 100  #Minimum number of time points per trial

for file_path in data_dir.glob("*.set"):
    file_name = file_path.stem
    if "PREP" not in file_name:
        number = int(file_name.split('_')[0])
        label = 1 if number % 2 == 1 else 0  #1=Parkinson's, 0=Non-Parkinson's
        raw = mne.io.read_raw_eeglab(file_path, preload=True) 
        #Bandpass filter
        raw.filter(1., 30., fir_design='firwin', n_jobs='cuda')

        #Apply ICA
        ica = mne.preprocessing.ICA(n_components=15, random_state=22, max_iter=1000, method='picard')
        ica.fit(raw)
        raw = ica.apply(raw)

        #Convert to numpy format
        data = raw.get_data()#(n_channels, n_times)
        data = data.T

        if data.shape[0] >= MIN_POINTS:
            data_all.append(data)
            labels_all.append(label)


print("Valid Trials:",len(data_all))


#minimum length of filtered data
target_length = min(data.shape[0] for data in data_all)

#Standardize the shape of each array in data_all
for i, data in enumerate(data_all):
    if data.shape[0] > target_length:
        #Shorten long arrays
        data_all[i] = data[:target_length, :]
    elif data.shape[0] < target_length:
        #Lengthen short arrays
        padding = target_length - data.shape[0]
        data_all[i] = np.pad(data, ((0, padding), (0, 0)), mode="constant")

data_all = np.stack(data_all)#(n_trials, n_times, n_channels)
labels_all = np.array(labels_all)

# Check the shapes of the final dataset
print(f"Final dataset shape: {data_all.shape} (trials, time points, channels)")
print(f"Labels shape: {labels_all.shape}")

Filtering raw data in 1 contiguous segment
Setting up band-pass filter from 1 - 30 Hz

FIR filter parameters
---------------------
Designing a one-pass, zero-phase, non-causal bandpass filter:
- Windowed time-domain design (firwin) method
- Hamming window with 0.0194 passband ripple and 53 dB stopband attenuation
- Lower passband edge: 1.00
- Lower transition bandwidth: 1.00 Hz (-6 dB cutoff frequency: 0.50 Hz)
- Upper passband edge: 30.00 Hz
- Upper transition bandwidth: 7.50 Hz (-6 dB cutoff frequency: 33.75 Hz)
- Filter length: 1651 samples (3.302 s)

Using CUDA for FFT FIR filtering
Fitting ICA to data using 66 channels (please be patient, this may take a while)
Selecting by number: 15 components
Fitting ICA took 2.8s.
Applying ICA to Raw instance
    Transforming to ICA space (15 components)
    Zeroing out 0 ICA components
    Projecting back using 66 PCA components
Filtering raw data in 1 contiguous segment
Setting up band-pass filter from 1 - 30 Hz

FIR filter parameters
------

  ica.fit(raw)


Fitting ICA to data using 66 channels (please be patient, this may take a while)
Selecting by number: 15 components
Fitting ICA took 3.0s.
Applying ICA to Raw instance
    Transforming to ICA space (15 components)
    Zeroing out 0 ICA components
    Projecting back using 66 PCA components
Filtering raw data in 1 contiguous segment
Setting up band-pass filter from 1 - 30 Hz

FIR filter parameters
---------------------
Designing a one-pass, zero-phase, non-causal bandpass filter:
- Windowed time-domain design (firwin) method
- Hamming window with 0.0194 passband ripple and 53 dB stopband attenuation
- Lower passband edge: 1.00
- Lower transition bandwidth: 1.00 Hz (-6 dB cutoff frequency: 0.50 Hz)
- Upper passband edge: 30.00 Hz
- Upper transition bandwidth: 7.50 Hz (-6 dB cutoff frequency: 33.75 Hz)
- Filter length: 1651 samples (3.302 s)

Using CUDA for FFT FIR filtering
Fitting ICA to data using 66 channels (please be patient, this may take a while)
Selecting by number: 15 components

In [5]:
import torch
from sklearn.decomposition import PCA
from sklearn.model_selection import train_test_split
from torch.utils.data import TensorDataset, DataLoader

In [6]:
# Train-test split
X_train, X_test, y_train, y_test = train_test_split(data_all, labels_all, test_size=0.2, random_state=42)
X_train_tensor = torch.from_numpy(X_train).float()
X_test_tensor = torch.from_numpy(X_test).float()
y_train_tensor = torch.from_numpy(y_train).long()
y_test_tensor = torch.from_numpy(y_test).long()


# Create TensorDatasets
train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
test_dataset = TensorDataset(X_test_tensor, y_test_tensor)

# Create DataLoaders
train_loader = DataLoader(TensorDataset(X_train_tensor, y_train_tensor), batch_size=32, shuffle=True)
test_loader = DataLoader(TensorDataset(X_test_tensor, y_test_tensor), batch_size=32, shuffle=False)


In [7]:
def train(model, train_dataloader, optimizer, criterion, print_freq=10):
    model.train()
    train_loss = 0
    
    for batch_index, (data, target) in enumerate(train_dataloader):
        data, target = data.cuda(), target.cuda()
        optimizer.zero_grad()
        
        output = model(data)
        loss = criterion(output, target)
        loss.backward()
        optimizer.step()
        
        train_loss += loss.item()
        
        if batch_index % print_freq == 0:
            print(f"Batch {batch_index}, Loss: {loss.item():.4f}")
    
    return train_loss / len(train_dataloader)

def test(model, test_dataloader, criterion):
    model.eval()

    test_loss = 0
    correct = 0

    with torch.no_grad():
        for batch_index, (data, target) in enumerate(test_dataloader):
            data, target = data.cuda(), target.cuda()

            output = model(data)
            test_loss += criterion(output, target).item()

            pred = output.argmax(dim=1, keepdim=True)
            correct += pred.eq(target.view_as(pred)).sum().item()

    test_loss /= len(test_dataloader.dataset)
    test_accuracy = correct / len(test_dataloader.dataset)

    return test_loss, test_accuracy

def train_model(model, train_dataloader, test_dataloader, optimizer, criterion, num_epochs):
    loss_results = []
    accuracy_results = []
    for epoch in range(num_epochs):
        train_loss = train(model, train_dataloader, optimizer, criterion)
        test_loss, test_accuracy = test(model, test_dataloader, criterion)

        print(
            f"Epoch: {epoch + 1} | Train loss: {train_loss:.5f} |",
            f"Test loss: {test_loss:.5f} | Test accuracy: {test_accuracy:.5f}"
        )
        accuracy_results.append([epoch + 1, test_accuracy])
        loss_results.append([epoch + 1, test_loss])
    return accuracy_results, loss_results

In [8]:
import torch.nn as nn
import torch.nn.functional as F

class EEGClassifier(nn.Module):
    def __init__(self, input_channels=66):
        super(EEGClassifier, self).__init__()
        self.conv1 = nn.Conv1d(input_channels, 32, kernel_size=3, padding=1)
        self.conv2 = nn.Conv1d(32, 64, kernel_size=3, padding=1)
        self.pool = nn.MaxPool1d(2)
        self.flatten = nn.Flatten()        
        self.adaptive_pool = nn.AdaptiveAvgPool1d(100)

        self.fc1 = nn.Linear(64 * 100, 128)
        self.fc2 = nn.Linear(128, 64)
        self.fc3 = nn.Linear(64, 64)
        self.fc4 = nn.Linear(64, 64)
        self.fc5 = nn.Linear(64, 2)
        
        self.dropout = nn.Dropout(0.3)

    def forward(self, x):
        x = x.permute(0, 2, 1)#New shape: (batch_size, channels, time_points)
        x = F.relu(self.conv1(x))
        x = self.pool(x)
        x = F.relu(self.conv2(x))
        x = self.pool(x)
        x = self.adaptive_pool(x)

        x = self.flatten(x)
        x = F.relu(self.fc1(x))
        x = self.dropout(x)
        x = F.relu(self.fc2(x))
        x = self.dropout(x)
        x = F.relu(self.fc3(x))
        x = self.dropout(x)
        x = F.relu(self.fc4(x))
        x = self.dropout(x)
        x = self.fc5(x)
        
        return x


device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = EEGClassifier(input_channels=66).to(device)
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
criterion = nn.CrossEntropyLoss()


train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)


accuracy_results, loss_results = train_model(model=model,train_dataloader=train_loader,test_dataloader=test_loader,optimizer=optimizer,criterion=criterion,num_epochs=100)

Batch 0, Loss: 0.6904
Epoch: 1 | Train loss: 0.68996 | Test loss: 0.04332 | Test accuracy: 0.50000
Batch 0, Loss: 0.6868
Epoch: 2 | Train loss: 0.69204 | Test loss: 0.04333 | Test accuracy: 0.50000
Batch 0, Loss: 0.6962
Epoch: 3 | Train loss: 0.69744 | Test loss: 0.04333 | Test accuracy: 0.50000
Batch 0, Loss: 0.6931
Epoch: 4 | Train loss: 0.69499 | Test loss: 0.04333 | Test accuracy: 0.50000
Batch 0, Loss: 0.6928
Epoch: 5 | Train loss: 0.69198 | Test loss: 0.04332 | Test accuracy: 0.50000
Batch 0, Loss: 0.6941
Epoch: 6 | Train loss: 0.69762 | Test loss: 0.04332 | Test accuracy: 0.50000
Batch 0, Loss: 0.6920
Epoch: 7 | Train loss: 0.69494 | Test loss: 0.04332 | Test accuracy: 0.50000
Batch 0, Loss: 0.6912
Epoch: 8 | Train loss: 0.69244 | Test loss: 0.04332 | Test accuracy: 0.50000
Batch 0, Loss: 0.6875
Epoch: 9 | Train loss: 0.69129 | Test loss: 0.04332 | Test accuracy: 0.50000
Batch 0, Loss: 0.6955
Epoch: 10 | Train loss: 0.69366 | Test loss: 0.04332 | Test accuracy: 0.50000
Batch 0, 

In [15]:
from sklearn.linear_model import LogisticRegression
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import cross_validate, StratifiedKFold
from sklearn.metrics import classification_report
def preprocess_data(X_train, X_test):
    #Flatten the data
    X_train_flat = X_train.reshape(X_train.shape[0], -1)
    X_test_flat = X_test.reshape(X_test.shape[0], -1)
    
    #Scale the data
    scaler = StandardScaler()
    X_train_scaled = scaler.fit_transform(X_train_flat)
    X_test_scaled = scaler.transform(X_test_flat)
    
    return X_train_scaled, X_test_scaled

def train_logistic_regression(X_train_scaled, y_train, X_test_scaled, y_test):
    lr_model = LogisticRegression(max_iter=1000, class_weight='balanced')
    lr_model.fit(X_train_scaled, y_train)
    
    y_pred = lr_model.predict(X_test_scaled)
    print("\nLogistic Regression Results:")
    print(classification_report(y_test, y_pred))
    return lr_model

def train_logistic_regression_with_cv(X, y, cv_folds=5):
    lr_model = LogisticRegression(max_iter=1000, class_weight='balanced')
    
    cv = StratifiedKFold(n_splits=cv_folds, shuffle=True, random_state=42)
    
    scores = cross_validate(
        lr_model, X, y, cv=cv, scoring=['accuracy', 'precision','f1','recall'],
        return_train_score=True, return_estimator=True
    )
    
    print("Cross-Validation Results:")
    for key in scores.keys():
        print(key,":",scores[key])
    #Return the model with the best performance on validation data
    best_model_index = np.argmax(scores['test_accuracy'])
    best_model = scores['estimator'][best_model_index]
    
    return best_model

In [16]:
def run_comparison(X_train, X_test, y_train, y_test):
    #Logistic Regression
    print("Training Logistic Regression...")
    X_train_scaled, X_test_scaled = preprocess_data(X_train, X_test)
    lr_model = train_logistic_regression(X_train_scaled, y_train, X_test_scaled, y_test)
    
    #Logistic Regression with cv
    print("Training Logistic Regression with cv...")
    X_combined = np.vstack((X_train_scaled, X_test_scaled))
    y_combined = np.hstack((y_train, y_test))
    best_model= train_logistic_regression_with_cv(X_combined, y_combined, cv_folds=15)
    y_pred = best_model.predict(X_test_scaled)
    print("\nTest Set Results:")
    print(classification_report(y_test, y_pred))
        
    
    return lr_model, best_model

In [17]:
lr_model, best_model = run_comparison(X_train, X_test, y_train, y_test)

Training Logistic Regression...

Logistic Regression Results:
              precision    recall  f1-score   support

           0       0.57      1.00      0.73         8
           1       1.00      0.25      0.40         8

    accuracy                           0.62        16
   macro avg       0.79      0.62      0.56        16
weighted avg       0.79      0.62      0.56        16

Training Logistic Regression with cv...


  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))


Cross-Validation Results:
fit_time : [10.95226598 21.97943115 15.32769108  4.82355475  4.83686161  4.51948833
  4.5274229   4.60337353  8.82789016  4.65933967  4.63182545  4.95996404
  4.87358165  4.52346754  4.55280423]
score_time : [0.05252767 0.05406165 0.02045727 0.02092981 0.01794028 0.01912308
 0.01808333 0.01847029 0.01906061 0.0189352  0.01853585 0.01794004
 0.01893497 0.01744914 0.01893687]
estimator : [LogisticRegression(class_weight='balanced', max_iter=1000), LogisticRegression(class_weight='balanced', max_iter=1000), LogisticRegression(class_weight='balanced', max_iter=1000), LogisticRegression(class_weight='balanced', max_iter=1000), LogisticRegression(class_weight='balanced', max_iter=1000), LogisticRegression(class_weight='balanced', max_iter=1000), LogisticRegression(class_weight='balanced', max_iter=1000), LogisticRegression(class_weight='balanced', max_iter=1000), LogisticRegression(class_weight='balanced', max_iter=1000), LogisticRegression(class_weight='balanced', 