In [31]:
import torch
print(torch.cuda.is_available())
print(torch.version.cuda)
print(torch.cuda.get_device_name(0))

True
12.1
NVIDIA GeForce RTX 4060 Laptop GPU


In [32]:
import mne
import numpy as np

import os

import matplotlib.pyplot as plt

from sklearn.utils import resample

In [33]:
directory = './eeg-during-mental-arithmetic-tasks-1.0.0/'

rest_filepaths = []
task_filepaths = []

for filename in os.listdir(directory):
    filepath = os.path.join(directory, filename)
    if filename.endswith('.edf'):
        label = filename.split('_')[-1].split('.')[0]

        if label == '1':
            rest_filepaths.append(filepath)
        else:
            task_filepaths.append(filepath)

In [34]:
import mne
import numpy as np
from sklearn.model_selection import train_test_split

# Example function to read and process data
def process_data(filepath):
    data = mne.io.read_raw_edf(filepath, preload=True)
    data.set_eeg_reference()
    data.filter(l_freq=0.5, h_freq=45)
    
    min_t, max_t = 0, 61.99
    data.crop(tmin=min_t, tmax=max_t)
    
    tmin, tmax = 0, 1.0  # Epoch duration of 1 second
    epochs = mne.make_fixed_length_epochs(data, duration=tmax, overlap = 0.5, preload=True)
    
    return epochs

# Function to process all 'task' labeled files
def process_task_files(filepaths):
    epochs_data = []  # List to store epoch data
    labels = []  # List to store corresponding labels
    
    for filepath in filepaths:
        epochs = process_data(filepath)
        epochs_data.extend(epochs.get_data())
        labels.extend([1] * len(epochs))  # Assign label 1 for 'task' (assuming 'task' label)
    
    return np.array(epochs_data), np.array(labels)

def process_rest_files(filepaths):
    epochs_data = []  # List to store epoch data
    labels = []  # List to store corresponding labels
    
    for filepath in filepaths:
        epochs = process_data(filepath)
        epochs_data.extend(epochs.get_data())
        labels.extend([0] * len(epochs))  # Assign label 1 for 'task' (assuming 'task' label)
    
    return np.array(epochs_data), np.array(labels)


In [35]:
%%capture
rest_epochs_data, rest_labels = process_rest_files(rest_filepaths)

task_epochs_data, task_labels = process_task_files(task_filepaths)

In [36]:
rest_epochs_data.shape, task_epochs_data.shape

((4392, 21, 500), (4392, 21, 500))

In [37]:
rest_labels.shape , task_labels.shape

((4392,), (4392,))

In [38]:
epochs_data_combined = np.concatenate([rest_epochs_data, task_epochs_data], axis=0)

# Concatenate labels
labels_combined = np.concatenate([rest_labels, task_labels], axis=0)

# Shuffle (optional)
# Use the same random seed for synchronizing shuffle across data and labels
random_state = 42
np.random.seed(random_state)
shuffle_indices = np.random.permutation(len(labels_combined))
data = epochs_data_combined[shuffle_indices]
label = labels_combined[shuffle_indices]

In [39]:
print("Epochs shape:", data.shape)
print("Labels shape:", label.shape)

Epochs shape: (8784, 21, 500)
Labels shape: (8784,)


In [40]:
# data = np.load('data.npy')
# label = np.load('label.npy')

# # # Convert numpy arrays to PyTorch tensors
# # data = torch.tensor(data, dtype=torch.float32)
# # label = torch.tensor(label, dtype=torch.long)  # Assuming labels are integers (dtype=torch.long)

# # Print shapes to verify
# print("Epochs shape:", data.shape)
# print("Labels shape:", label.shape)

In [41]:
from torch.utils.data import  TensorDataset

# Choosing Device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")



# # Normalizing Labels to [0, 1, 2, 3]
# y = labels - np.min(labels)
y = label

# Normalizing Input features: z-score(mean=0, std=1)
X = (data - np.mean(data)) / np.std(data)

# Checking the existance of null & inf in the dataset
if np.any(np.isnan(X)) or np.any(np.isinf(X)):
    raise ValueError("Data contains NaNs or infinities after normalization.")
if np.any(np.isnan(y)) or np.any(np.isinf(y)):
    raise ValueError("Labels contain NaNs or infinities.")

# Making the X,y tensors for K-Fold Cross Validation
X_tensor = torch.Tensor(X).unsqueeze(1)
y_tensor = torch.LongTensor(y)

# Spliting  Data: 80% for Train and 20% for Test
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42, stratify=y)

# Converting to Tensor
X_train = torch.Tensor(X_train).unsqueeze(1).to(device)
X_test = torch.Tensor(X_test).unsqueeze(1).to(device)
y_train = torch.LongTensor(y_train).to(device)
y_test = torch.LongTensor(y_test).to(device)

# Creating Tensor Dataset
train_dataset = TensorDataset(X_train, y_train)
test_dataset = TensorDataset(X_test, y_test)

# Printing the sizes
print("Size of X_train:", X_train.size())
print("Size of X_test:", X_test.size())
print("Size of y_train:", y_train.size())
print("Size of y_test:", y_test.size())


Size of X_train: torch.Size([7027, 1, 21, 500])
Size of X_test: torch.Size([1757, 1, 21, 500])
Size of y_train: torch.Size([7027])
Size of y_test: torch.Size([1757])


# New Model


In [42]:
import torch 
import torch.nn as nn

In [43]:
class PatchEmbeddings(nn.Module):
    """Split image into patches and then embed them.
    Parameters
    ----------
    img_size : int, Size of the image (it is a square).

    patch_size : int, Size of the patch (it is a square).

    in_chans : int, Number of input channels.

    embed_dim : int, The emmbedding dimension.

    Attributes
    ----------
    n_patches : int, Number of patches inside of our image.

    proj : nn.Conv2d, Convolutional layer that does both the splitting into patches
        and their embedding.
    """
    def __init__(self, img_size=(21,500), patch_size = (1,16),  in_chans = 1, embed_dim = 768):
        super().__init__()
        
        self.n_patches = (img_size[0]//patch_size[0]) * (img_size[1]//patch_size[1])
        self.proj = nn.Conv2d(in_chans,
                              embed_dim,
                              kernel_size=patch_size,
                              stride = (patch_size[0], patch_size[1]))
        
    def forward(self, x):
        x = self.proj(x)
        x= x.flatten(2)
        x=x.transpose(1,2)
        
        return x
    

In [44]:
class Attention(nn.Module):
    """Attention mechanism.

    Parameters
    ----------
    dim : int The input and out dimension of per token features.

    n_heads : int Number of attention heads.

    qkv_bias : bool If True then we include bias to the query, key and value projections.

    attn_p : float Dropout probability applied to the query, key and value tensors.

    proj_p : float Dropout probability applied to the output tensor.


    Attributes
    ----------
    scale : float Normalizing constant for the dot product.

    qkv : nn.Linear Linear projection for the query, key and value.

    proj : nn.Linear Linear mapping that takes in the concatenated output of all attention
        heads and maps it into a new space.

    attn_drop, proj_drop : nn.Dropout Dropout layers.
    """
    def __init__(self, dim, n_heads = 12, qkv_bias= True, attn_p=0., proj_p= 0.):
        super().__init__()
        self.n_heads = n_heads
        self.dim = dim
        self.head_dim = dim // n_heads
        self.scale = self.head_dim ** -0.
        
        self.qkv = nn.Linear(dim, dim*3, bias=qkv_bias)
        self.attn_drop = nn.Dropout(attn_p)
        self.proj = nn.Linear(dim, dim)
        self.proj_drop = nn.Dropout(proj_p)
    
    def forward(self,x):
        n_samples, n_tokens, dim = x.shape
        
        if dim != self.dim:
            raise ValueError(f'Input dim {dim} is not equal to the model dim {self.dim}')
        qkv = self.qkv(x)
        qkv = qkv.reshape(n_samples, n_tokens, 3, self.n_heads, self.head_dim)
        qkv = qkv.permute(2,0,3,1,4)
        
        q, k, v = qkv[0], qkv[1], qkv[2]
        k_t = k.transpose(-2,-1)
        dp = (q @ k_t)*self.scale
        attn = dp.softmax(dim=-1)
        attn = self.attn_drop(attn)
        
        weighted_avg = attn @ v 
        weighted_avg = weighted_avg.transpose(1,2)
        weighted_avg = weighted_avg.flatten(2)
        
        x=self.proj(weighted_avg)
        x= self.proj_drop(x)
        
        return x

In [45]:
class MLP(nn.Module):
    """Multilayer perceptron.

    Parameters
    ----------
    in_features : int Number of input features.

    hidden_features : int Number of nodes in the hidden layer.

    out_features : int Number of output features.

    p : float Dropout probability.

    Attributes
    ----------
    fc : nn.Linear The First linear layer.

    act : nn.GELU GELU activation function.

    fc2 : nn.Linear The second linear layer.

    drop : nn.Dropout Dropout layer.
    """
    def __init__(self, in_features, hidden_features, out_features, p=0.):
        super().__init__()
        self.fc1 = nn.Linear(in_features, hidden_features)
        self.act = nn.GELU()
        self.fc2 = nn.Linear(hidden_features,out_features)
        self.drop = nn.Dropout(p)
        
    def forward(self,x):
        x = self.fc1(x)
        x = self.act(x)
        x = self.drop(x)
        x = self.fc2(x)
        x = self.drop(x)
        
        return x

In [46]:
class Block(nn.Module):
    """Transformer block.

    Parameters
    ----------
    dim : int Embeddinig dimension.

    n_heads : int Number of attention heads.

    mlp_ratio : float Determines the hidden dimension size of the `MLP` module with respect
        to `dim`.

    qkv_bias : bool If True then we include bias to the query, key and value projections.

    p, attn_p : float Dropout probability.

    Attributes
    ----------
    norm1, norm2 : LayerNorm Layer normalization.

    attn : Attention Attention module.

    mlp : MLP MLP module.
    """
    def __init__(self, dim, n_heads, mlp_ratio = 4.0, qkv_bias = True, p=0., attn_p=0.):
        super().__init__()
        self.norm1 = nn.LayerNorm(dim, eps=1e-6) 
        self.attn = Attention(dim,
                              n_heads = n_heads,
                              qkv_bias=qkv_bias,
                              attn_p=attn_p,
                              proj_p=p
                              )
        self.norm2 = nn.LayerNorm(dim, eps=1e-6) 
        hidden_features = int(dim*mlp_ratio)
        self.mlp = MLP(
            in_features=dim,
            hidden_features = hidden_features,
            out_features=dim
        )
    
    def forward(self,x):
        x = x + self.attn(self.norm1(x))
        x = x+ self.mlp(self.norm2(x))
        
        return x
        

In [47]:
class VisionTransformer(nn.Module):
    """Simplified implementation of the Vision transformer.

    Parameters
    ----------
    img_size : int Both height and the width of the image (it is a square).

    patch_size : int oth height and the width of the patch (it is a square).

    in_chans : int Number of input channels.

    n_classes : int Number of classes.

    embed_dim : int Dimensionality of the token/patch embeddings.

    depth : int Number of blocks.

    n_heads : int Number of attention heads.

    mlp_ratio : float Determines the hidden dimension of the `MLP` module.

    qkv_bias : bool If True then we include bias to the query, key and value projections.

    p, attn_p : float Dropout probability.

    Attributes
    ----------
    patch_embed : PatchEmbed Instance of `PatchEmbed` layer.

    cls_token : nn.Parameter Learnable parameter that will represent the first token in the sequence.
        It has `embed_dim` elements.

    pos_emb : nn.Parameter Positional embedding of the cls token + all the patches.
        It has `(n_patches + 1) * embed_dim` elements.

    pos_drop : nn.Dropout Dropout layer.

    blocks : nn.ModuleList List of `Block` modules.

    norm : nn.LayerNorm Layer normalization.
    """
    def __init__(
        self,
        img_size=(21,500),
        patch_size = (1,16),
        in_chans = 1,
        n_classes = 2,
        embed_dim = 768,
        depth = 12,
        n_heads = 12,
        mlp_ratio = 4,
        qkv_bias = True,
        p = 0.0,
        attn_p = 0.0,
    ):
        super().__init__()
        
        self.patch_embed = PatchEmbeddings(
            img_size=img_size,
            patch_size = patch_size, 
            in_chans = in_chans, 
            embed_dim = embed_dim
        )
        self.cls_token = nn.Parameter(torch.zeros(1,1,embed_dim))
        self.pos_embed = nn.Parameter(torch.zeros(1,1+self.patch_embed.n_patches,embed_dim))      
        self.pos_drop = nn.Dropout(p=p)
        
        self.blocks = nn.ModuleList(
            [
                Block(
                    dim = embed_dim,
                    n_heads = n_heads,
                    mlp_ratio = mlp_ratio,
                    qkv_bias = qkv_bias,
                    p = p,
                    attn_p = attn_p,
                )
                for _ in range(depth)
            ]
        )
        
        self.norm = nn.LayerNorm(embed_dim, eps = 1e-6)
        self.head = nn.Linear(embed_dim, n_classes)
        
        
    def forward(self,x):
        n_samples = x.shape[0]
        x = self.patch_embed(x)
        
        cls_token = self.cls_token.expand(n_samples, -1,-1)
        x = torch.cat((cls_token,x), dim=1)
        x= x+ self.pos_embed
        x = self.pos_drop(x)
        
        for block in self.blocks:
            x = block(x)
            
        x = self.norm(x)
        
        cls_token_final = x[:,0] # only cls token
        x = self.head(cls_token_final)
        
        return x
        
    

In [48]:
import torch.optim as optim

from sklearn.utils import shuffle

In [50]:
custom_config = {
    "patch_size": (1, 10),
    "in_chans": 1,
    "img_size": (21,500),
    "n_classes": 2,
    "embed_dim": 768,
    "depth": 12,
    "n_heads": 12,
    "mlp_ratio": 4,
    "p":0.5,
    "attn_p":0.5
}

ViT_model = VisionTransformer(**custom_config).to(device)
learning_rate = 0.001
optimizer = optim.Adam(ViT_model.parameters(), lr=learning_rate)

# Loss Function
criterion = nn.CrossEntropyLoss()

num_epochs = 500
batch_size = 16
for epoch in range(num_epochs):
    ViT_model.train()
    X_train, y_train = shuffle(X_train, y_train)
    running_loss = 0.0
    correct = 0
    total = 0
    # print(epoch)
    for i in range(0, len(X_train), batch_size):
        inputs = X_train[i:i+batch_size].to(device)
        labels = y_train[i:i+batch_size].to(device)

        # print(epoch , i)
        
        optimizer.zero_grad()
        outputs = ViT_model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        running_loss += loss.item() * inputs.size(0)
        _, predicted = torch.max(outputs, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

    epoch_loss = running_loss / len(X_train)
    epoch_accuracy = correct / total
    print(f"Epoch {epoch+1}/{num_epochs}, Loss: {epoch_loss:.4f}, Accuracy: {(epoch_accuracy*100):.2f}%")
average_loss = running_loss / len(X_train)
print("Average Loss:", average_loss)

# Saving model
torch.save(ViT_model, 'ViT_model.pth')

OutOfMemoryError: CUDA out of memory. Tried to allocate 810.00 MiB. GPU 0 has a total capacity of 8.00 GiB of which 0 bytes is free. Of the allocated memory 13.94 GiB is allocated by PyTorch, and 335.26 MiB is reserved by PyTorch but unallocated. If reserved but unallocated memory is large try setting PYTORCH_CUDA_ALLOC_CONF=expandable_segments:True to avoid fragmentation.  See documentation for Memory Management  (https://pytorch.org/docs/stable/notes/cuda.html#environment-variables)

In [None]:
custom_config = {
    "patch_size": (1, 100),
    "in_chans": 1,
    "img_size": (21,500),
    "n_classes": 2,
    "embed_dim": 768,
    "depth": 12,
    "n_heads": 12,
    "mlp_ratio": 4,
    "p":0.2,
    "attn_p":0.2
}

ViT_model = VisionTransformer(**custom_config).to(device)
ViT_model = torch.load('ViT_model.pth')

  ViT_model = torch.load('ViT_model.pth')


In [None]:

ViT_model.eval()
correct = 0
total = 0
with torch.no_grad():
    for i in range(len(X_test)):
        inputs = X_test[i:i+1].to(device)
        labels = y_test[i:i+1].to(device)
        outputs = ViT_model(inputs)
        _, predicted = torch.max(outputs.data, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

accuracy = (correct / total)*100
print(f"Test Accuracy: {accuracy:.2f}%")

Test Accuracy: 50.06%


: 

In [None]:
import pandas as pd
ViT_model.eval()
y_pred = []
y_true = []
classes = ['rest', 'task']

with torch.no_grad():
    for inputs, labels in zip(X_test, y_test):
        outputs = ViT_model(inputs.unsqueeze(0))  # Forward pass
        _, predicted = torch.max(outputs.data, 1)
        y_pred.append(predicted.item())
        y_true.append(labels.item())

cf_matrix = ViT_model(y_true, y_pred)
cf_matrix = cf_matrix.astype('float') / cf_matrix.sum(axis=1)[:, np.newaxis]

# Create DataFrame for visualization
df_cm = pd.DataFrame(cf_matrix, index=classes, columns=classes)

# Plot confusion matrix
plt.figure(figsize=(10, 7))
sn.heatmap(df_cm, annot=True, cmap='Blues', fmt='.2f')
plt.xlabel('Predicted labels')
plt.ylabel('True labels')
plt.title('Confusion Matrix')
plt.savefig('confusion_matrix_eegnet.png')
plt.show()
