# Loose EEGNet
This notebook implements a neural network based on the EEGNet paper, following the paper as closely as I could understand it.



---



# Import/Install all necessary packages and libraries

In [None]:
!pip install mne



In [None]:
import numpy as np

from sklearn.metrics import roc_auc_score, precision_score, recall_score, accuracy_score
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.autograd import Variable, gradcheck
from torch.utils.data import TensorDataset, DataLoader

import pandas as pd

from matplotlib import pyplot

import mne
from mne.io import concatenate_raws, read_raw_fif
import mne.viz

import math

from os import walk

# Check for GPU availability and set device

In [None]:
# Run on the GPU when CUDA is usable, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Load and format the data

In [None]:
# Location of the epoched recording (a path inside a mounted Google Drive).
data_file = '/content/drive/My Drive/data/P_01.fif'

# Read the epochs from disk; verbose='error' keeps MNE's log output down to errors.
epochs = mne.read_epochs(data_file, verbose='error')
# Print the summary: number of events, time window, and per-condition counts.
print(epochs)

<EpochsFIF  |   484 events (all good), 0 - 1.49609 sec, baseline off, ~90.9 MB, data loaded,
 'FN': 126
 'FP': 133
 'FU': 123
 'NN': 37
 'NP': 36
 'NU': 29>


In [None]:
# One subset of the epochs per pairwise contrast between conditions.
epochs_UN = epochs['FU', 'FN']  # Unpleasant vs. Neutral
epochs_UP = epochs['FU', 'FP']  # Unpleasant vs. Pleasant
epochs_NP = epochs['FN', 'FP']  # Neutral vs. Pleasant

# The rest of the notebook classifies the unpleasant-vs-neutral subset.
print(epochs_UN)
data_UN = epochs_UN.get_data()
# The last column of the events array is used as the class label of each epoch.
labels_UN = epochs_UN.events[:, -1]
print(labels_UN.shape[0])

<EpochsFIF  |   249 events (all good), 0 - 1.49609 sec, baseline off, ~46.9 MB, data loaded,
 'FN': 126
 'FU': 123>
249


In [None]:
# Seed PyTorch's CPU and CUDA generators so weight initialisation and dropout
# are repeatable between runs.
torch.manual_seed(3)
torch.cuda.manual_seed(3)

# Hold out 30% of the epochs for testing; random_state fixes the split itself.
(train_data_UN,
 test_data_UN,
 labels_train_UN,
 labels_test_UN) = train_test_split(
    data_UN,
    labels_UN,
    test_size=0.3,
    random_state=42,
)

In [None]:
# Sanity check: label counts per split and the size of the last data axis.
print(labels_train_UN.shape, labels_test_UN.shape, train_data_UN.shape[-1])

# Number of epochs in each split.
chunk_train = len(labels_train_UN)
chunk_test = len(labels_test_UN)

# Axis 1 and axis 2 of the training array are used as the channel and
# time-point counts when sizing the network.
channels, timepoints = train_data_UN.shape[1], train_data_UN.shape[2]


(174,) (75,) 384


In [None]:
BATCH_SIZE = 8
# The whole test split is evaluated as one batch.
BATCH_SIZE2 = chunk_test

eeg_data_scaler = StandardScaler()

# StandardScaler works on 2-D input, so every array is flattened to
# (rows, size of last axis), scaled, and reshaped back to its original shape.
# The scaler is fitted on the TRAINING data only. The test data is transformed
# with those same training statistics so that both splits go through an
# identical mapping (re-fitting on the test split would standardise it with
# different means/variances than the data the model was trained on).
X_train = eeg_data_scaler.fit_transform(
    train_data_UN.reshape(-1, train_data_UN.shape[-1])
).reshape(train_data_UN.shape)
X_test = eeg_data_scaler.transform(
    test_data_UN.reshape(-1, test_data_UN.shape[-1])
).reshape(test_data_UN.shape)

# Binarise the labels: values > 0 become class 1, everything else class 0.
# NOTE(review): this relies on exactly one of the two event codes being > 0 —
# confirm against the event ids stored in the file.
labels_train_UN = np.array([1 if x > 0 else 0 for x in labels_train_UN])
labels_test_UN = np.array([1 if x > 0 else 0 for x in labels_test_UN])

# BCELoss needs float targets with the same shape as the model output, i.e. (N, 1).
labels_train_UN = labels_train_UN.reshape((chunk_train, 1))
labels_train_UN = labels_train_UN.astype(np.float32)
X_actual = torch.from_numpy(labels_train_UN)

labels_test_UN = labels_test_UN.reshape((chunk_test, 1))
labels_test_UN = labels_test_UN.astype(np.float32)
X_test_actual = torch.from_numpy(labels_test_UN)

# Insert a singleton axis at position 1 so the data matches Conv2d's
# (N, C_in, H, W) input layout with a single input channel.
X_train = torch.from_numpy(X_train)
X_train = X_train.unsqueeze(1)
X_test = torch.from_numpy(X_test)
X_test = X_test.unsqueeze(1)

# Number of batches per pass over each split (the last batch may be smaller).
train_batches = math.ceil(chunk_train / BATCH_SIZE)
test_batches = math.ceil(chunk_test / BATCH_SIZE2)
print(X_train.shape, X_actual.shape, X_test.shape, train_batches, test_batches)

torch.Size([174, 1, 64, 384]) torch.Size([174, 1]) torch.Size([75, 1, 64, 384]) 22 1


In [None]:
# Pair every input tensor with its label tensor.
train_set = TensorDataset(X_train, X_actual)
test_set = TensorDataset(X_test, X_test_actual)

# Neither loader shuffles, so batches come out in the same order on every pass.
train_set_loader = DataLoader(
    dataset=train_set,
    batch_size=BATCH_SIZE,
    shuffle=False,
)
test_set_loader = DataLoader(
    dataset=test_set,
    batch_size=BATCH_SIZE2,
    shuffle=False,
)

# Build the model and train
A short breakdown of the paper:
> EEGNet is a CNN for classification and interpretation of EEG-based BCIs

> Benefits of EEGNet:
> 1. Can be applied to different BCI paradigms (MI, ERP, SSVEP)
> 2. Can be trained with very limited data
> 3. Can produce neurophysiologically interpretable features

> The model architecture:
> 1. Fit *conv1_neurons* 2D convolutional filters of size (1, sampling rate//2)
> 2. Use a depthwise convolution of size (# of channels, 1)
> 3. Add a separable convolution of size (1, 16)
> 4. Flatten the data and feed it through a classification layer







In [None]:
# hyperparameters
freq, avg1stride, avg2stride = 256, (1, 4), (1, 8)
convstride = 1 # stride for each conv2D
conv1_neurons = 4
conv2_neurons = 8
conv3_neurons = 12
flat1_out = 12
kern1size = freq // 2

In [None]:
# Track the (height, width) of the feature map through the network so the
# final Linear layer can be sized. Unpadded convolution: (size - kernel) / stride + 1.

# After the (1, kern1size) convolution: height unchanged, width shrinks.
conv1outx = channels
conv1outy = (timepoints - kern1size) / convstride + 1

# After the (channels, 1) convolution, followed by the first average pool.
conv2outx = (conv1outx - channels) / convstride + 1
conv2outy = conv1outy
conv2outx = conv2outx // avg1stride[0]
conv2outy = conv2outy // avg1stride[1]

# After the (1, 16) convolution, followed by the second average pool.
conv3outx = conv2outx
conv3outy = (conv2outy - 16) / convstride + 1
conv3outx = conv3outx // avg2stride[0]
conv3outy = conv3outy // avg2stride[1]

# Flattened feature count fed to the classifier: height * width * filters.
flat1_in = int(conv3outx * conv3outy * conv3_neurons)

In [None]:
CNNPoor = nn.Sequential(
    # Stage 1: convolution along the time axis with a (1, kern1size) kernel.
    nn.Conv2d(in_channels=1, out_channels=conv1_neurons, kernel_size=(1, kern1size), bias=False),
    nn.ELU(),
    nn.BatchNorm2d(num_features=conv1_neurons),

    # Stage 2: convolution spanning all channels, then pooling and dropout.
    nn.Conv2d(in_channels=conv1_neurons, out_channels=conv2_neurons, kernel_size=(channels, 1), bias=False),
    nn.ELU(),
    nn.BatchNorm2d(num_features=conv2_neurons),
    nn.AvgPool2d(kernel_size=avg1stride),
    nn.Dropout(p=0.5),

    # Stage 3: (1, 16) convolution along time, then pooling and dropout.
    nn.Conv2d(in_channels=conv2_neurons, out_channels=conv3_neurons, kernel_size=(1, 16), bias=False),
    nn.ELU(),
    nn.BatchNorm2d(num_features=conv3_neurons),
    nn.AvgPool2d(kernel_size=avg2stride),
    nn.Dropout(p=0.5),

    # Classifier: flatten, one linear unit, sigmoid to get a probability.
    nn.Flatten(),
    nn.Linear(in_features=flat1_in, out_features=1),
    nn.Sigmoid(),
)

# Move the parameters to the selected device; as the last expression of the
# cell this also displays the model summary.
CNNPoor.to(device)

Sequential(
  (0): Conv2d(1, 4, kernel_size=(1, 128), stride=(1, 1), bias=False)
  (1): ELU(alpha=1.0)
  (2): BatchNorm2d(4, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (3): Conv2d(4, 8, kernel_size=(64, 1), stride=(1, 1), bias=False)
  (4): ELU(alpha=1.0)
  (5): BatchNorm2d(8, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (6): AvgPool2d(kernel_size=(1, 4), stride=(1, 4), padding=0)
  (7): Dropout(p=0.5, inplace=False)
  (8): Conv2d(8, 12, kernel_size=(1, 16), stride=(1, 1), bias=False)
  (9): ELU(alpha=1.0)
  (10): BatchNorm2d(12, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (11): AvgPool2d(kernel_size=(1, 8), stride=(1, 8), padding=0)
  (12): Dropout(p=0.5, inplace=False)
  (13): Flatten()
  (14): Linear(in_features=72, out_features=1, bias=True)
  (15): Sigmoid()
)

In [None]:
# Binary cross-entropy on probabilities (the model already ends in a Sigmoid).
loss_function = nn.BCELoss()
# Adam over every parameter of the model.
optimizer = optim.Adam(params=CNNPoor.parameters(), lr=1e-3)

In [None]:
def test(model, device, test_loader):
    """Evaluate ``model`` on ``test_loader`` and print loss and classification metrics.

    Args:
        model: network returning one probability per sample; its output is
            scored against the labels with the notebook-level ``loss_function``.
        device: torch device each batch is moved to before the forward pass.
        test_loader: iterable of ``(data, labels)`` batches that exposes
            ``.dataset`` (used for the sample count).

    Returns:
        None. Prints the mean per-batch loss, the accuracy, and sklearn's
        accuracy / precision / recall computed once over all predictions.
    """
    model.eval()
    tot_loss = 0
    batch_preds, batch_labels = [], []
    with torch.no_grad():
        for (data, labels) in test_loader:
            data, labels = data.to(device), labels.to(device)

            classification = model(data.float())
            loss = loss_function(classification, labels)
            tot_loss += loss.item()

            # Threshold the probabilities at 0.5 and keep CPU copies: sklearn
            # converts its inputs to NumPy, which fails for CUDA tensors.
            pred = torch.round(classification)
            batch_preds.append(pred.reshape(-1).cpu())
            batch_labels.append(labels.reshape(-1).cpu())

    # Count the batches actually seen instead of relying on a notebook global.
    num_batches = len(batch_preds)
    if num_batches == 0:
        print("\nTest set is empty; nothing to evaluate.\n")
        return

    y_pred = torch.cat(batch_preds).numpy()
    y_true = torch.cat(batch_labels).numpy()
    correct = int((y_pred == y_true).sum())

    print("\nTest set: Average loss: {:.6f}, Accuracy: {:.6f}".format(tot_loss / num_batches,
                                                                      correct / len(test_loader.dataset)))
    # Metrics are computed once over the full set of predictions; averaging
    # per-batch precision/recall does not give the dataset-level values when
    # there is more than one batch.
    print("sklearn accuracy: {:.6f} precision: {:.6f} recall: {:.6f}\n".format(accuracy_score(y_true, y_pred),
                                                                               precision_score(y_true, y_pred),
                                                                               recall_score(y_true, y_pred)))

In [None]:
def train(model, device, train_loader, optimizer, epoch):
    """Run one training pass over ``train_loader`` and print a summary.

    Args:
        model: network returning one probability per sample; its output is
            scored against the labels with the notebook-level ``loss_function``.
        device: torch device each batch is moved to before the forward pass.
        train_loader: iterable of ``(data, labels)`` batches that exposes
            ``.dataset`` (used for the sample count).
        optimizer: optimizer that updates ``model``'s parameters.
        epoch: epoch index, used only in the printed summary.

    Returns:
        None. Prints the epoch number, the mean per-batch loss and the accuracy.
    """
    model.train()
    correct = 0
    num_batches = 0
    tot_loss = 0
    for (data, labels) in train_loader:
        data, labels = data.to(device), labels.to(device)

        optimizer.zero_grad()

        classification = model(data.float())
        loss = loss_function(classification, labels)

        loss.backward()
        optimizer.step()

        # Threshold the probabilities at 0.5 to obtain hard predictions.
        pred = torch.round(classification)
        correct += pred.eq(labels.view_as(pred)).sum().item()
        tot_loss += loss.item()

        num_batches += 1

    # Report once the loader is exhausted, using the number of batches actually
    # processed, so the summary does not depend on a notebook-level batch count
    # matching this particular loader.
    if num_batches == 0:
        return

    print("Epoch: {}".format(epoch))
    print("\tAverage loss: {:.6f}".format(tot_loss / num_batches))
    print("\tAccuracy: {:.6f}".format(correct / len(train_loader.dataset)))

In [None]:
# Alternate one training pass with one evaluation pass for 13 epochs.
for epoch in range(13):
    train(
        model=CNNPoor,
        device=device,
        train_loader=train_set_loader,
        optimizer=optimizer,
        epoch=epoch,
    )
    test(model=CNNPoor, device=device, test_loader=test_set_loader)

Epoch: 0
	Average loss: 0.719555
	Accuracy: 0.494253

Test set: Average loss: 0.686913, Accuracy: 0.506667
sklearn accuracy: 0.506667 precision: 0.529412 recall: 0.461538

Epoch: 1
	Average loss: 0.653402
	Accuracy: 0.643678

Test set: Average loss: 0.647121, Accuracy: 0.626667
sklearn accuracy: 0.626667 precision: 0.627907 recall: 0.692308

Epoch: 2
	Average loss: 0.646683
	Accuracy: 0.597701

Test set: Average loss: 0.630558, Accuracy: 0.666667
sklearn accuracy: 0.666667 precision: 0.645833 recall: 0.794872

Epoch: 3
	Average loss: 0.573064
	Accuracy: 0.695402

Test set: Average loss: 0.607657, Accuracy: 0.613333
sklearn accuracy: 0.613333 precision: 0.631579 recall: 0.615385

Epoch: 4
	Average loss: 0.489218
	Accuracy: 0.787356

Test set: Average loss: 0.573788, Accuracy: 0.680000
sklearn accuracy: 0.680000 precision: 0.666667 recall: 0.769231

Epoch: 5
	Average loss: 0.448292
	Accuracy: 0.804598

Test set: Average loss: 0.556812, Accuracy: 0.706667
sklearn accuracy: 0.706667 precis