In [1]:
%%capture
!pip install mne
!pip install pytorch-lightning

In [2]:
from glob import glob
import scipy.io
import torch.nn as nn
import torch
import numpy as np
import mne

In [4]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [5]:
#Building the block architecture using OOP paradigm
class Block(nn.Module):
    def __init__(self,inplace):
        super().__init__()
        self.conv1=nn.Conv1d(in_channels=inplace,out_channels=32,kernel_size=2,stride=2,padding=0)
        self.conv2=nn.Conv1d(in_channels=inplace,out_channels=32,kernel_size=4,stride=2,padding=1)
        self.conv3=nn.Conv1d(in_channels=inplace,out_channels=32,kernel_size=8,stride=2,padding=3)
        self.relu=nn.ReLU()

    def forward(self,x):
        x1=self.relu(self.conv1(x))
        x2=self.relu(self.conv2(x))
        x3=self.relu(self.conv3(x))
        x=torch.cat([x1,x2,x3],dim=1)
        return x

In [6]:
#Creating an archictecture from all the modules (ChronoNet)
class ChronoNet(nn.Module):
    def __init__(self, channel):
        super().__init__()
        self.block1=Block(channel)
        self.block2=Block(96)
        self.block3=Block(96)
        self.gru1=nn.GRU(input_size=96,hidden_size=32,batch_first=True)
        self.gru2=nn.GRU(input_size=32,hidden_size=32,batch_first=True)
        self.gru3=nn.GRU(input_size=64,hidden_size=32,batch_first=True)
        self.gru4=nn.GRU(input_size=96,hidden_size=32,batch_first=True)
        self.gru_linear=nn.Linear(64,1)
        self.flatten=nn.Flatten()
        self.fcl=nn.Linear(32,1)
        self.relu=nn.ReLU()

    def forward(self,x):
        x=self.block1(x)
        x=self.block2(x)
        x=self.block3(x) #Note that x changes sequentially, feeding the next block
        x=x.permute (0,2,1) #Rearranging the input shape acording to what GRU layer expects
        gru_out1,_=self.gru1(x)
        gru_out2,_=self.gru2(gru_out1)
        gru_out=torch.cat([gru_out1,gru_out2],dim=2)
        gru_out3,_=self.gru3(gru_out)
        gru_out=torch.cat([gru_out1,gru_out2,gru_out3],dim=2)

        #print('gru_out',gru_out.shape) to know the final shape for the linear layer
        linear_out=self.relu(self.gru_linear(gru_out.permute(0,2,1)))
        gru_out4,_=self.gru4(linear_out.permute(0,2,1))
        x=self.flatten(gru_out4)
        x=self.fcl(x)
        return x

In [7]:
#Random test data, to validate the model architecture
input=torch.randn(3,14,512)
input.shape
model=ChronoNet(14)
out=model(input)
out.shape

torch.Size([3, 1])

In [8]:
#Accessing the dataset
IDD='/content/drive/MyDrive/Rest'
TDC='/content/drive/MyDrive/Rest (1)'

In [9]:
#Converting the matlab file to mne
def convertmat2mne(data):
    ch_names = ["AF3", "F7", "F3", "FC5", "T7", "P7", "O1", "O2", "P8", "T8", "FC6", "F4", "F8", "AF4"]
    ch_types = ["eeg"] * 14
    info = mne.create_info(ch_names, ch_types=ch_types, sfreq=128)
    info.set_montage("standard_1020")
    data=mne.io.RawArray(data, info)
    data.set_eeg_reference()
    data.filter(l_freq=1,h_freq=30)
    epochs=mne.make_fixed_length_epochs(data,duration=4,overlap=0)
    return epochs.get_data()

In [10]:
%%capture
#Importing and converting IDD files to mne
idd_subject=[]
for idd in glob(IDD+'/*.mat'):
    data=scipy.io.loadmat(idd)['clean_data']
    data=convertmat2mne(data)
    idd_subject.append(data)

In [11]:
%%capture
#Importing and converting TDC files to mne
tdc_subject=[]
for tdc in glob(TDC+'/*.mat'):
    data=scipy.io.loadmat(tdc)['clean_data']
    data=convertmat2mne(data)
    tdc_subject.append(data)

In [12]:
len(idd_subject),len(tdc_subject)

(7, 7)

In [13]:
#Creating labels for the files
control_epochs_labels=[len(i)*[0] for i in tdc_subject] #label all epochs in each control file zero
patient_epochs_labels=[len(i)*[1] for i in idd_subject] #label all epochs in each patient file one
len(control_epochs_labels), len(patient_epochs_labels)

(7, 7)

In [14]:
#We need to combine both data and the labels, to form a dataset for the NN
data_list=tdc_subject+idd_subject #one dataset from TDC and IDD data
label_list=control_epochs_labels+patient_epochs_labels #one dataset from TDC and IDD labels
len(data_list), len(label_list)

(14, 14)

In [15]:
#Grouping the data - Avoid grouping based on epochs or the machine learning model would have seen the unseen data, group based on participants
groups=[[i]*len(j) for i,j in enumerate(data_list)]
len(groups)

14

In [16]:
#Create 5 Fold cross-validation loop and train the model for each loop
from sklearn.model_selection import GroupKFold, LeaveOneGroupOut
from sklearn.preprocessing import StandardScaler
gkf=GroupKFold()
from sklearn.base import TransformerMixin, BaseEstimator
from sklearn.preprocessing import StandardScaler

#Implementing a scaler for a 3D matrix
class StandardScaler3D(BaseEstimator, TransformerMixin):
    # batch, sequence, channels
    def __init__(self):
        self.scaler = StandardScaler()

    def fit(self, X, y=None):
        # Reshape to (batch*sequence, channels) and fit the scaler
        self.scaler.fit(X.reshape(-1, X.shape[2]))
        return self

    def transform(self, X):
        # Reshape to (batch*sequence, channels), transform, and reshape back
        X_scaled = self.scaler.transform(X.reshape(-1, X.shape[2]))
        return X_scaled.reshape(X.shape)


In [17]:
#Reshape data because CNN expects data in a particular shape. First check shape by converting list to array
data_array=np.vstack(data_list)
label_array=np.hstack(label_list)
group_array=np.hstack(groups)
data_array=np.moveaxis(data_array,1,2)

data_array.shape, label_array.shape, group_array.shape

((420, 512, 14), (420,), (420,))

In [18]:
#Splitting the train and val data for each group
accuracy=[]
for train_index,val_index in gkf.split(data_array,label_array,groups=group_array):
    train_features,train_labels=data_array[train_index],label_array[train_index]
    val_features,val_labels=data_array[val_index],label_array[val_index]
    scaler=StandardScaler3D()
    train_features=scaler.fit_transform(train_features)
    val_features=scaler.fit_transform(val_features)
    train_features=np.moveaxis(train_features,1,2) #rearranging to fit model input expectation
    val_features=np.moveaxis(val_features,1,2)

    break

In [19]:
train_features.shape, val_features.shape

((330, 14, 512), (90, 14, 512))

In [20]:
#Converting the arrays to tensors
train_features = torch.Tensor(train_features)
val_features = torch.Tensor(val_features)
train_labels = torch.Tensor(train_labels)
val_labels = torch.Tensor(val_labels)

In [21]:
len(val_features),len(val_labels)

(90, 90)

In [22]:
#Libraries needed for training
from pytorch_lightning import LightningModule,Trainer
import torchmetrics
from torch.utils.data import TensorDataset,DataLoader

In [27]:
#Class for training the model
class ChronoModel(LightningModule):
    def __init__(self):
        super(ChronoModel, self).__init__()
        self.model = ChronoNet(14)  # Network to be used for training
        self.lr = 1e-3  # learning rate
        self.bs = 12  # batch size
        self.worker = 2  # number of workers for data loading
        self.acc = torchmetrics.Accuracy(task="binary")  # To evaluate the model's performance
        self.criterion = nn.BCEWithLogitsLoss()  # Loss function suitable for binary classification
        self.training_step_outputs = []
        self.validation_step_outputs = []

    def forward(self, x):
        x = self.model(x)  # Apply the network to the data for training
        return x

    def configure_optimizers(self):
        return torch.optim.Adam(self.parameters(), lr=self.lr)

    def train_dataloader(self):
        dataset = TensorDataset(train_features, train_labels)
        dataloader = DataLoader(dataset, batch_size=self.bs, num_workers=self.worker, shuffle=True)
        return dataloader

    def training_step(self, batch, batch_idx):
        signal, label = batch
        out = self(signal.float())  # you can also use self.forward(signal.float())
        loss = self.criterion(out.flatten(), label.float().flatten())
        acc = self.acc(out.flatten(), label.long().flatten())
        self.log('train_loss', loss, prog_bar=True)
        self.log('train_acc', acc, prog_bar=True)
        self.training_step_outputs.append({"loss": loss, "acc": acc})
        return {'loss': loss, 'acc': acc}

    def on_train_epoch_end(self):
        acc = torch.stack([output["loss"] for output in self.training_step_outputs]).mean().detach().cpu().numpy().round(2)
        loss = torch.stack([output["acc"] for output in self.training_step_outputs]).mean().detach().cpu().numpy().round(2)
        self.log('train_loss', loss)

        # Clear the stored outputs to free memory
        self.training_step_outputs.clear()
        print('train acc loss', acc, loss)

    def val_dataloader(self):
        dataset = TensorDataset(val_features, val_labels)
        dataloader = DataLoader(dataset, batch_size=self.bs, num_workers=self.worker, shuffle=False)
        return dataloader

    def validation_step(self, batch, batch_idx):
        signal, label = batch
        out = self(signal.float())
        loss = self.criterion(out.flatten(), label.float().flatten())
        acc = self.acc(out.flatten(), label.long().flatten())
        self.log('val_loss', loss, prog_bar=True)
        self.log('val_acc', acc, prog_bar=True)
        self.validation_step_outputs.append({"loss": loss, "acc": acc})
        return {'loss': loss, 'acc': acc} # Return a dictionary containing loss and accuracy

    def on_validation_epoch_end(self):
      loss=torch.stack([output["loss"] for output in self.validation_step_outputs]).mean().detach().cpu().numpy().round(2)
      acc=torch.stack([output["acc"] for output in self.validation_step_outputs]).mean().detach().cpu().numpy().round(2)
      self.log('val_loss', loss)
      print('val acc loss', acc, loss)
      self.validation_step_outputs.clear()

In [29]:
#Using the Training Class
model=ChronoModel() #create an instance of the model

In [30]:
#Create an instance of the trainer
trainer=Trainer(max_epochs=10) #You can specify CPU,GPU or TPU Usage

INFO:pytorch_lightning.utilities.rank_zero:GPU available: True (cuda), used: True
INFO:pytorch_lightning.utilities.rank_zero:TPU available: False, using: 0 TPU cores
INFO:pytorch_lightning.utilities.rank_zero:HPU available: False, using: 0 HPUs


In [31]:
trainer.fit(model) #fit the trainer to the model

INFO:pytorch_lightning.accelerators.cuda:LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]
INFO:pytorch_lightning.callbacks.model_summary:
  | Name      | Type              | Params | Mode 
--------------------------------------------------------
0 | model     | ChronoNet         | 133 K  | train
1 | acc       | BinaryAccuracy    | 0      | train
2 | criterion | BCEWithLogitsLoss | 0      | train
--------------------------------------------------------
133 K     Trainable params
0         Non-trainable params
133 K     Total params
0.534     Total estimated model params size (MB)


Sanity Checking: |          | 0/? [00:00<?, ?it/s]

val acc loss 1.0 0.61


Training: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

val acc loss 0.31 0.73
train acc loss 0.69 0.55


Validation: |          | 0/? [00:00<?, ?it/s]

val acc loss 1.0 0.22
train acc loss 0.56 0.69


Validation: |          | 0/? [00:00<?, ?it/s]

val acc loss 1.0 0.03
train acc loss 0.12 0.98


Validation: |          | 0/? [00:00<?, ?it/s]

val acc loss 0.92 0.22
train acc loss 0.02 1.0


Validation: |          | 0/? [00:00<?, ?it/s]

val acc loss 0.97 0.09
train acc loss 0.01 1.0


Validation: |          | 0/? [00:00<?, ?it/s]

val acc loss 0.99 0.05
train acc loss 0.01 1.0


Validation: |          | 0/? [00:00<?, ?it/s]

val acc loss 1.0 0.01
train acc loss 0.01 1.0


Validation: |          | 0/? [00:00<?, ?it/s]

val acc loss 1.0 0.01
train acc loss 0.01 1.0


Validation: |          | 0/? [00:00<?, ?it/s]

val acc loss 1.0 0.0
train acc loss 0.0 1.0


Validation: |          | 0/? [00:00<?, ?it/s]

INFO:pytorch_lightning.utilities.rank_zero:`Trainer.fit` stopped: `max_epochs=10` reached.


val acc loss 1.0 0.0
train acc loss 0.0 1.0
