# CNN Classification

We'll use EEGNet to perform  binary classification on 32 channel ECog data.

https://arxiv.org/pdf/1611.08024


# 1. Load dataset and define training & test sets

In [32]:
import os
import torch
from torch.utils.data import DataLoader
from sklearn.model_selection import train_test_split
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import torch.optim as optim
from torchvision import transforms
from torch.autograd import Variable
from torch.utils.data import DataLoader, TensorDataset, SubsetRandomSampler
import numpy as np
import optuna


device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

data_dir = '../../../datasets/processed/shuffleboard/'

# inputs
signal_data_path = data_dir + 'normalized_dataset_500hz_500ms_consecutive_buckets.npz'
signal_data = np.load(signal_data_path, allow_pickle=True)

signal_array = signal_data['data']
signals_tensor = torch.tensor(signal_array, dtype=torch.float32)
signals_tensor = signals_tensor.unsqueeze(1)  # Add channel dimension


# Print dataset info 
print("Total Dataset Size:", len(dataset))
print("Shape of the signals:", signal_array.shape)
print("Shape of the labels:", labels_tensor.shape)

Total Dataset Size: 7591
Shape of the signals: (7591, 32, 250)
Shape of the labels: torch.Size([7591])


# Define Model

Torcheeg's implementation of EEGnet:

https://torcheeg.readthedocs.io/en/v1.1.0/generated/torcheeg.models.EEGNet.html#torcheeg.models.EEGNet

In [33]:
import torch
import torch.nn as nn

class Conv2dWithConstraint(nn.Conv2d):
    def __init__(self, *args, max_norm: int = 1, **kwargs):
        self.max_norm = max_norm
        super(Conv2dWithConstraint, self).__init__(*args, **kwargs)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        self.weight.data = torch.renorm(self.weight.data, p=2, dim=0, maxnorm=self.max_norm)
        return super(Conv2dWithConstraint, self).forward(x)


class EEGNet(nn.Module):
    r'''
    A compact convolutional neural network (EEGNet). For more details, please refer to the following information.

    - Paper: Lawhern V J, Solon A J, Waytowich N R, et al. EEGNet: a compact convolutional neural network for EEG-based brain-computer interfaces[J]. Journal of neural engineering, 2018, 15(5): 056013.
    - URL: https://arxiv.org/abs/1611.08024
    - Related Project: https://github.com/braindecode/braindecode/tree/master/braindecode
    '''
    def __init__(self,
                 chunk_size: int = 151,
                 num_electrodes: int = 60,
                 F1: int = 8,
                 F2: int = 16,
                 D: int = 2,
                 num_classes: int = 2,
                 kernel_1: int = 64,
                 kernel_2: int = 16,
                 dropout: float = 0.25):
        super(EEGNet, self).__init__()
        self.F1 = F1
        self.F2 = F2
        self.D = D
        self.chunk_size = chunk_size
        self.num_classes = num_classes
        self.num_electrodes = num_electrodes
        self.kernel_1 = kernel_1
        self.kernel_2 = kernel_2
        self.dropout = dropout

        self.block1 = nn.Sequential(
            nn.Conv2d(1, self.F1, (1, self.kernel_1), stride=1, padding=(0, self.kernel_1 // 2), bias=False),
            nn.BatchNorm2d(self.F1, momentum=0.01, affine=True, eps=1e-3),
            Conv2dWithConstraint(self.F1,
                                 self.F1 * self.D, (self.num_electrodes, 1),
                                 max_norm=1,
                                 stride=1,
                                 padding=(0, 0),
                                 groups=self.F1,
                                 bias=False), nn.BatchNorm2d(self.F1 * self.D, momentum=0.01, affine=True, eps=1e-3),
            nn.ELU(), nn.AvgPool2d((1, 4), stride=4), nn.Dropout(p=dropout))

        self.block2 = nn.Sequential(
            nn.Conv2d(self.F1 * self.D,
                      self.F1 * self.D, (1, self.kernel_2),
                      stride=1,
                      padding=(0, self.kernel_2 // 2),
                      bias=False,
                      groups=self.F1 * self.D),
            nn.Conv2d(self.F1 * self.D, self.F2, 1, padding=(0, 0), groups=1, bias=False, stride=1),
            nn.BatchNorm2d(self.F2, momentum=0.01, affine=True, eps=1e-3), nn.ELU(), nn.AvgPool2d((1, 8), stride=8),
            nn.Dropout(p=dropout))

        self.lin = nn.Linear(self.feature_dim(), num_classes, bias=False)

    def feature_dim(self):
        with torch.no_grad():
            mock_eeg = torch.zeros(1, 1, self.num_electrodes, self.chunk_size)

            mock_eeg = self.block1(mock_eeg)
            mock_eeg = self.block2(mock_eeg)

        return self.F2 * mock_eeg.shape[3]

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        r'''
        Args:
            x (torch.Tensor): EEG signal representation, the ideal input shape is :obj:`[n, 60, 151]`. Here, :obj:`n` corresponds to the batch size, :obj:`60` corresponds to :obj:`num_electrodes`, and :obj:`151` corresponds to :obj:`chunk_size`.

        Returns:
            torch.Tensor[number of sample, number of classes]: the predicted probability that the samples belong to the classes.
        '''
        x = self.block1(x)
        x = self.block2(x)
        x = x.flatten(start_dim=1)
        x = self.lin(x)

        return x

# Train Models

In [38]:
import torch.optim as optim
from torch.utils.data import DataLoader, random_split
from sklearn.metrics import accuracy_score, precision_score, recall_score, confusion_matrix, classification_report
import optuna
import os
import warnings

# Don't want to see all of those warnings ... 
optuna.logging.set_verbosity(optuna.logging.CRITICAL)


# features where the output from from extraction was 32 values (one per channel) before we took the average value over channels
features = [
    "envelopes",
    "rms",
    "variance",
    "std_dev",
    "spectral_edge_density",
    "derivatives",
    "centroids",
    "phases",
    "beta_band_power",
    "average_signal_shapes",
    "analytic_signals",
    "fft_results",
    "magnitudes",
    "average_distance",
    "average_peak_height",
    "peak_counts",
    "spectral_entropy",
    "evolution_rate"
]

for feature_name in features:

    # output labels 
    labels_tensor = torch.load(os.path.join(data_dir, '{}_labels_500ms_500hz_tensor.pt'.format(feature_name)))
    labels_tensor = labels_tensor.long()


    dataset = TensorDataset(signals_tensor, labels_tensor)

    print("--- Training {} Model ---".format(feature_name))
    print(f"Distribution of Labels: Percentage of 1's: {labels_tensor.float().mean()*100:.2f}%, Percentage of 0's: {100 - labels_tensor.float().mean()*100:.2f}%")


    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    total_size = len(dataset)
    train_size = int(0.7 * total_size)
    val_size = int(0.15 * total_size)
    test_size = total_size - train_size - val_size

    train_dataset, val_dataset, test_dataset = random_split(dataset, [train_size, val_size, test_size])

    def create_dataloaders(batch_size):
        return (
            DataLoader(train_dataset, batch_size=batch_size, shuffle=True),
            DataLoader(val_dataset, batch_size=batch_size),
            DataLoader(test_dataset, batch_size=batch_size)
        )

    def create_model(params):
        return EEGNet(
            chunk_size=250,
            num_electrodes=32,
            num_classes=2,
            dropout=params['dropout']
        ).to(device)

    def objective(trial):
        params = {
            'dropout': trial.suggest_discrete_uniform('dropout', 0.1, 0.7, 0.1),
            'num_epochs': trial.suggest_int('num_epochs', 20, 100, step=10)
        }

        model = create_model(params)
        criterion = nn.CrossEntropyLoss()
        optimizer = optim.Adam(model.parameters(), lr=0.001)

        train_loader, val_loader, _ = create_dataloaders(32)

        best_val_accuracy = 0
        for epoch in range(params['num_epochs']):
            model.train()
            for inputs, labels in train_loader:
                inputs, labels = inputs.to(device), labels.to(device)
                optimizer.zero_grad()
                outputs = model(inputs)
                loss = criterion(outputs, labels)
                loss.backward()
                optimizer.step()

            model.eval()
            correct = total = 0
            with torch.no_grad():
                for inputs, labels in val_loader:
                    inputs, labels = inputs.to(device), labels.to(device)
                    outputs = model(inputs)
                    _, predicted = outputs.max(1)
                    total += labels.size(0)
                    correct += predicted.eq(labels).sum().item()

            val_accuracy = 100. * correct / total
            if val_accuracy > best_val_accuracy:
                best_val_accuracy = val_accuracy
                torch.save(model.state_dict(), '{}_best_model_in_trial.pth'.format(feature_name))

            trial.report(val_accuracy, epoch)
            if trial.should_prune():
                raise optuna.exceptions.TrialPruned()

        return best_val_accuracy

    study = optuna.create_study(direction='maximize')
    study.optimize(objective, n_trials=50)

    best_params = study.best_params


    # Evaluate best feature model on test set 
    best_model = create_model(best_params)
    best_model.load_state_dict( torch.load('{}_best_model_in_trial.pth'.format(feature_name)))
    best_model.eval()

   
    all_labels = []
    all_predictions = []

    with torch.no_grad():
        for inputs, labels in DataLoader(test_dataset, batch_size=32):
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = best_model(inputs)
            _, predicted = outputs.max(1)
            all_labels.extend(labels.cpu().numpy())
            all_predictions.extend(predicted.cpu().numpy())

    cm = confusion_matrix(all_labels, all_predictions)

    
    report = classification_report(all_labels, all_predictions, target_names=['0', '1'], digits=2)
    print("\nClassification Report:")
    print(report)

    
    print("\nConfusion Matrix:")
    print("----------------")
    print("TN | FP")
    print("---+---")
    print(f"{cm[0][0]:2d} | {cm[0][1]:2d}")
    print("---+---")
    print(f"{cm[1][0]:2d} | {cm[1][1]:2d}")
    print("FN | TP")
    
    model_save_path = f'best_{feature_name}_model.pth'
    torch.save(best_model.state_dict(), model_save_path)
    print(f'\nBest model saved to {model_save_path}')
    print("----\n")

--- Training envelopes Model ---
Distribution of Labels: Percentage of 1's: 49.39%, Percentage of 0's: 50.61%

Classification Report:
              precision    recall  f1-score   support

           0       0.75      1.00      0.86       559
           1       0.99      0.68      0.81       581

    accuracy                           0.84      1140
   macro avg       0.87      0.84      0.83      1140
weighted avg       0.87      0.84      0.83      1140


Confusion Matrix:
----------------
TN | FP
---+---
557 |  2
---+---
186 | 395
FN | TP

Best model saved to best_envelopes_model.pth
----

--- Training rms Model ---
Distribution of Labels: Percentage of 1's: 49.39%, Percentage of 0's: 50.61%

Classification Report:
              precision    recall  f1-score   support

           0       0.93      0.96      0.94       557
           1       0.96      0.93      0.94       583

    accuracy                           0.94      1140
   macro avg       0.94      0.94      0.94      1140


# Summary of Results


| Feature Model | Accuracy |
|---------------|----------|
| rms | 0.94 |
| average_signal_shapes | 0.93 |
| analytic_signals | 0.86 |
| envelopes | 0.84 |
| magnitudes | 0.74 |
| derivatives | 0.73 |
| fft_results | 0.70 |
| spectral_edge_density | 0.57 |
| spectral_entropy | 0.56 |
| variance | 0.54 |
| std_dev | 0.54 |
| beta_band_power | 0.53 |
| peak_counts | 0.52 |
| phases | 0.52 |
| evolution_rate | 0.51 |
| average_distance | 0.49 |
| centroids | 0.49 |

### Label Distributions

For each feature model, the distribution of labels was balanced.

### Excellent

The EEGNet architecture performs very well for predicting increases/decreases associated with the rms and average_signal_shapes features, with 94% and 96% accuracy respectively after basic hyper-parameter tuning. 

### Good

We saw good performance with analytic_signals (86%) and envelopes (84%). 

With the analytic_signals model, there were a significant amount false positive degrading its performance, and for the envelopes model, the amount of false negatives degraded its performance.


### Moderate

magnitudes	0.74

derivatives	0.73

fft_results	0.70

### Poor

The following features performed close or at the performance of random guessing:

spectral_edge_density	0.57

spectral_entropy	0.56

variance	0.54

std_dev	0.54

beta_band_power	0.53

peak_counts	0.52

phases	0.52

evolution_rate	0.51

average_distance	0.49

centroids	0.49


### Next Steps 
1. One option is to continue hyper-parameter tuning because I've done very basic tuning.
2. Feature engineer the input signals. At the moment, each input signal is a 500ms block of concatenated 250ms blocks; maybe we can try different ways of creating the inputs other than concatenation, which may yield better performance. 
3. Explore other models

