In [1]:
import os
import warnings
os.environ["KMP_DUPLICATE_LIB_OK"]="TRUE"
#os.environ['CUDA_LAUNCH_BLOCKING'] = str(1)
#os.environ["TORCH_USE_CUDA_DSA"]= str(0)
warnings.filterwarnings('ignore') 


In [2]:
import copy
import torch
import torch.nn as nn
from torch.nn import functional as F
from torch.utils.data import Dataset
import torch.optim as optim
from torch.autograd import Variable
from tqdm import tqdm
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import time
import math
from collections import OrderedDict
import random
from torchsummary import summary
from torchvision import transforms
from torch.utils.data import Dataset
import sys
import torch
import numpy as np
from tqdm import trange
from torch.utils.data import DataLoader
from sklearn.metrics import accuracy_score
from sklearn.model_selection import train_test_split
import mne
from sklearn.preprocessing import StandardScaler
import logging
from torch.nn.utils.rnn import pack_sequence, pad_packed_sequence
import statistics
import torch.optim.lr_scheduler as lr_scheduler
from scipy.special import softmax
from sklearn.model_selection import StratifiedKFold

In [3]:
logging.getLogger('mne').setLevel(logging.WARNING)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
import import_ipynb
#from Model import net
from Dataloader2 import EEGDataset

importing Jupyter notebook from Dataloader2.ipynb


In [4]:
def Accuracy(y_pred, y, train_count):
    max_values, _ = torch.max(y_pred, dim=1, keepdim=True)
    mask = y_pred == max_values
    y_pred = mask.int()
    correct_num = torch.sum(torch.all(torch.eq(y, y_pred), dim=1)).item()
    accuracy = correct_num / train_count
    return accuracy * 100

In [5]:
def test(model, test_path, test_class, verbose=True):
    

    x_test = mne.read_epochs(test_path, preload=False).get_data(picks='eeg');
    normals = []
    scaler = StandardScaler()
    for idx in range(len(x_test)):
        normals.append(scaler.fit_transform(x_test[idx]))
    normals = torch.tensor(normals).cuda().float()
    result = torch.argmax(model(normals), axis=1)
    unique_elements, counts = torch.unique(result, return_counts=True)

    votes = np.zeros([4])
    for i in range(len(unique_elements)):
        votes[unique_elements[i]] = counts[i]


    if(verbose):
        print(f"Test Accuracy: {(votes[test_class] / result.shape[0]) * 100}")
    return votes


In [6]:
class net(nn.Module):
    def __init__(self, T, C, input_size, hidden_size, num_layers, spatial_num, dropout, pool):
        super(net, self).__init__()
        
        self.T = T
        self.C = C
        self.spatial_num = spatial_num
        self.dropout = dropout
        self.pool = pool

        self.input_size = input_size
        self.hidden_size = hidden_size
        self.num_layers = num_layers

        self.cell_count = self.T // self.input_size

        self.fcn_in = (spatial_num * self.hidden_size)

        self._lstm = nn.LSTM(self.input_size, self.hidden_size, self.num_layers, batch_first=True)

        self.lstm = nn.ModuleList([self._lstm for i in range(self.C)])

        self.cnn_block = nn.Sequential(nn.Conv2d(1, self.spatial_num, (self.C, 1)),
                                       nn.BatchNorm2d(self.spatial_num),
                                       nn.ELU(),



                                       nn.Dropout(self.dropout))

        
        self.fcn = nn.Sequential(nn.Linear(self.fcn_in, 128), 
                                 nn.ReLU(),
                                 nn.Linear(128, 16),
                                 nn.Dropout(self.dropout),
                                 nn.ReLU(),
                                 nn.Linear(16, 4))

        #self.fcn = nn.Linear(self.fcn_in, 4)
        self.results = nn.Softmax(dim=1)
    def forward(self, x):
        x = x.reshape(-1, 1, 19, 3000)
        self.N = x.shape[0]
        x = x.reshape(self.N, self.C, self.cell_count, self.input_size)
        _x = None

        for index, cell in enumerate(self.lstm):
            cell_out, _ = cell(x[:, index, :, :], None)
            last_layer_out = cell_out[:, -1, :]
            
            last_layer_out = last_layer_out.unsqueeze(0)
            if _x is None:
                _x = last_layer_out
            else:
                _x = torch.cat((_x, last_layer_out), dim=0)
            

        x = _x.permute(1, 0, 2).unsqueeze(1)

        x = self.cnn_block(x)


        x = x.reshape(self.N, -1)

        x = self.fcn(x)
        x = self.results(x)

        return x

In [7]:
class EEGClassifier(nn.Module):
    def __init__(self, num_classes):
        super(EEGClassifier, self).__init__()

        # CNN layers
        self.conv1 = nn.Conv1d(19, 32, kernel_size=3, stride=1, padding=1)
        self.relu1 = nn.ReLU()
        self.maxpool1 = nn.MaxPool1d(kernel_size=2, stride=2)
        self.conv2 = nn.Conv1d(32, 64, kernel_size=3, stride=1, padding=1)
        self.relu2 = nn.ReLU()
        self.maxpool2 = nn.MaxPool1d(kernel_size=2, stride=2)

        # LSTM layer
        self.lstm = nn.LSTM(64, 128, batch_first=True)

        # Fully connected layer
        self.fc = nn.Linear(128, num_classes)
        self.results = nn.Softmax(dim=1)
        
        

    def forward(self, x):
        x = x.reshape(-1, 19, 3000)
        # x: (batch_size, 19, 3000)

        # CNN forward pass
        #x = x.permute(0, 2, 1)  # (batch_size, 3000, 19)
        x = self.conv1(x)
        x = self.relu1(x)
        x = self.maxpool1(x)
        x = self.conv2(x)
        x = self.relu2(x)
        x = self.maxpool2(x)

        # Reshape for LSTM input
        x = x.permute(0, 2, 1)  # (batch_size, sequence_length, 64)

        # LSTM forward pass
        output, _ = self.lstm(x)

        # Take the output from the last time step of the LSTM
        output = output[:, -1, :]

        # Fully connected layer
        output = self.fc(output)
        output = self.results(output)

        return output

In [8]:
MNE_Data = EEGDataset(root_dir=r"C:\Users\admin\Desktop\MNE Data")
labels = MNE_Data.labels
#MNE_Data = EEGDataset(root_dir=r"D:\TEST MNE")
test_path = r"C:\Users\admin\Desktop\TEST\amirifateme.fif"
test_class = 0

In [9]:
train_dataloader = DataLoader(MNE_Data, batch_size=1, shuffle=True)

In [10]:
#model = net(T = 3000, C = 19, input_size = 3000, hidden_size = 100, num_layers=2, spatial_num= 300, dropout=0.5, pool=1).to(device)
model = EEGClassifier(num_classes=4).cuda()
num_params = sum(p.numel() for p in model.parameters())
print("Number of parameters:", num_params)

Number of parameters: 107908


In [11]:
criterion = nn.CrossEntropyLoss(weight = torch.Tensor([5.3125, 2.8333, 3.5417, 5.6667]).cuda())

optimizer = optim.Adam(model.parameters(), lr=0.001)
#optimizer = optim.Adagrad(model.parameters(), lr=0.001)


scheduler = lr_scheduler.ExponentialLR(optimizer, gamma=0.1)
epochs = 30
kf = StratifiedKFold(n_splits=5)

In [12]:
for train_indices, val_indices in kf.split(MNE_Data, labels):
    train_dataset = torch.utils.data.Subset(MNE_Data, train_indices)
    val_dataset = torch.utils.data.Subset(MNE_Data, val_indices)

    train_dataloader = torch.utils.data.DataLoader(train_dataset, batch_size=1, shuffle=True)
    val_dataloader = torch.utils.data.DataLoader(val_dataset, batch_size=1, shuffle=False)

    test_log = []
    log = []

    for epoch in trange(epochs):

        model.train()   
        for index, data in enumerate(train_dataloader):
                
                x, y = data
                y = y.to(torch.float64)
                x = x.reshape(-1, 1, 19, 3000).float()
                #x = x[torch.randperm(x.shape[0])]
                y = F.one_hot(torch.tensor(torch.tensor([y.item()]).to(torch.int64)), num_classes=4).expand(x.shape[0], -1).float()
                
                train_count = x.shape[0]
                x, y = x.to(device), y.to(device)
                y_pred = model(x)
                loss = criterion(y_pred, y)
                
                optimizer.zero_grad()
                loss.backward()
                optimizer.step()
                #scheduler.step()
                torch.cuda.empty_cache()


        model.eval()
        val_loss = 0
        val_log = []
        with torch.no_grad():
            for batch_idx, (data, target) in enumerate(val_dataloader):
                        data = data.reshape(-1, 1, 19, 3000).float().cuda()
                        target = F.one_hot(torch.tensor(torch.tensor([target.item()]).to(torch.int64)), num_classes=4).expand(data.shape[0], -1).float().cuda()
                        

                        output = model(data)

                        

                        loss = criterion(output, target)
                        
                        val_loss += loss.item()

                        val_log.append(Accuracy(output, target, target.shape[0]))

                        
        print('Epoch: {}, Validation Loss: {:.4f}, Validation Accuracy: {:.2f}%'.format(
                    epoch, val_loss / len(val_dataloader), statistics.mean(val_log)))

        


    


  3%|▎         | 1/30 [01:22<39:46, 82.28s/it]

Epoch: 0, Validation Loss: 5.5169, Validation Accuracy: 35.12%


  7%|▋         | 2/30 [02:42<37:56, 81.31s/it]

Epoch: 1, Validation Loss: 5.5155, Validation Accuracy: 17.76%


 10%|█         | 3/30 [04:02<36:14, 80.53s/it]

Epoch: 2, Validation Loss: 5.5160, Validation Accuracy: 18.72%


 10%|█         | 3/30 [04:12<37:54, 84.26s/it]


KeyboardInterrupt: 

In [None]:
model(x.cuda())


tensor([[-0.1114, -0.0381, -0.0061, -0.0525],
        [-0.1099, -0.0433,  0.0025, -0.0599],
        [-0.1069, -0.0431,  0.0041, -0.0574],
        ...,
        [-0.1115, -0.0247, -0.0065, -0.0502],
        [-0.1090, -0.0361, -0.0055, -0.0549],
        [-0.1021, -0.0300, -0.0165, -0.0510]], device='cuda:0',
       grad_fn=<AddmmBackward0>)