In [1]:
from google.colab import drive
import zipfile
import os

In [2]:
drive.mount('/content/drive')

Mounted at /content/drive


In [3]:
!pip install pyedflib

Collecting pyedflib
  Downloading pyEDFlib-0.1.38-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (6.3 kB)
Downloading pyEDFlib-0.1.38-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (2.7 MB)
[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/2.7 MB[0m [31m?[0m eta [36m-:--:--[0m[2K   [91m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[90m╺[0m [32m2.6/2.7 MB[0m [31m78.6 MB/s[0m eta [36m0:00:01[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m2.7/2.7 MB[0m [31m51.1 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: pyedflib
Successfully installed pyedflib-0.1.38


In [4]:
import pyedflib
import numpy as np
import torch
from torch.utils.data import Dataset, DataLoader, random_split
from scipy.fft import fft

In [5]:
!pip install pywavelets

Collecting pywavelets
  Downloading pywavelets-1.7.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (9.0 kB)
Downloading pywavelets-1.7.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (4.5 MB)
[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/4.5 MB[0m [31m?[0m eta [36m-:--:--[0m[2K   [91m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[90m╺[0m[90m━━━━━━━━━━[0m [32m3.3/4.5 MB[0m [31m99.4 MB/s[0m eta [36m0:00:01[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m4.5/4.5 MB[0m [31m73.6 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: pywavelets
Successfully installed pywavelets-1.7.0


In [6]:
import pyedflib
import numpy as np
import torch
from torch.utils.data import Dataset
from scipy.signal import butter, lfilter, resample
import pywt
import scipy.stats as stats

In [7]:
!pip install joblib



In [30]:
import os
import numpy as np
import torch
from torch.utils.data import Dataset
import pyedflib
import pywt

class EEGDataset(Dataset):
    def __init__(self, folder_path, labels_dict, target_length=160000, wavelet='sym2', wavelet_level=3):
        self.folder_path = folder_path
        self.labels_dict = labels_dict
        self.file_list = [f for f in os.listdir(folder_path) if f.endswith(".edf")]
        self.target_length = target_length
        self.wavelet = wavelet
        self.wavelet_level = wavelet_level

    def __len__(self):
        return len(self.file_list)

    def __getitem__(self, idx):
        filename = self.file_list[idx]
        file_path = os.path.join(self.folder_path, filename)

        # Load the EDF file only when needed
        signals = self.load_edf_file_raw(file_path)

        # Preprocessing the signals using wavelet transform
        signals = self.preprocess_eeg_signal_wavelet(signals, wavelet=self.wavelet, level=self.wavelet_level)

        # Padding or truncating the signals to ensure all have the same length
        signals = self.pad_or_truncate(signals, self.target_length)

        # Extracting the file number from the filename
        file_number = filename.replace('.edf', '')

        # Get the label for the current file
        label = self.labels_dict.get(file_number, -1)  # Default to -1 if no label found

        # Convert signals and label to tensors
        signals_tensor = torch.Tensor(signals)
        label_tensor = torch.LongTensor([label]).squeeze()

        return signals_tensor, label_tensor

    def load_edf_file_raw(self, file_path):
        f = pyedflib.EdfReader(file_path)
        n = f.signals_in_file
        signals = np.zeros((n, f.getNSamples()[0]))  # Creating array for raw data

        # Reading all channels
        for i in range(n):
            signals[i, :] = f.readSignal(i)

        f.close()
        return signals

    def preprocess_eeg_signal_wavelet(self, raw_signal, wavelet='db4', level=3):
        coeffs = []
        for channel in raw_signal:
            # Performing a multilevel wavelet decomposition for each channel
            channel_coeffs = pywt.wavedec(channel, wavelet, level=level)
            coeffs.append(channel_coeffs)

        # Reconstruct signals from selected approximation and detail coefficients
        reconstructed_signal = []
        for channel_coeffs in coeffs:
            reconstructed_channel = pywt.waverec([channel_coeffs[0]] + [None] * (len(channel_coeffs) - 1), wavelet)
            reconstructed_signal.append(reconstructed_channel)

        return np.array(reconstructed_signal)

    def pad_or_truncate(self, signal, target_length):
        if signal.shape[1] > target_length:
            return signal[:, :target_length]
        else:
            padding = np.zeros((signal.shape[0], target_length - signal.shape[1]))
            return np.concatenate((signal, padding), axis=1)


In [8]:
import pandas as pd

csv_path = '/content/drive/MyDrive/Harshitha/annotation.xlsx'
labels_df = pd.read_excel(csv_path)

def map_labels(row):
    if row['dementia'] == 1:
        return 2  # Dementia
    elif row['mci'] == 1:
        return 1  # MCI
    else:
        return 0  # Normal

labels_df['Label'] = labels_df.apply(map_labels, axis=1)

labels_dict = dict(zip(labels_df['serial'].astype(str).str.zfill(5), labels_df['Label']))

In [31]:
folder_path = '/content/drive/MyDrive/Harshitha/final_edf'

# Creating the EEGDataset instance for lazy loading
eeg_dataset = EEGDataset(
    folder_path=folder_path,
    labels_dict=labels_dict,
    target_length=160000,
    wavelet='sym2',
    wavelet_level=3
)

dataset_size = len(eeg_dataset)
test_split = 0.2
test_size = int(dataset_size * test_split)
train_size = dataset_size - test_size

train_dataset, test_dataset = random_split(eeg_dataset, [train_size, test_size])

train_dataloader = DataLoader(train_dataset, batch_size=32, shuffle=True)
test_dataloader = DataLoader(test_dataset, batch_size=32, shuffle=False)

In [32]:
import torch.nn as nn

class EEG_CNN(nn.Module):
    def __init__(self, num_classes):
        super(EEG_CNN, self).__init__()

        self.conv1 = nn.Conv1d(in_channels=21, out_channels=32, kernel_size=3, stride=1, padding=1)
        self.conv2 = nn.Conv1d(32, 64, kernel_size=3, stride=1, padding=1)
        self.conv3 = nn.Conv1d(64, 128, kernel_size=3, stride=1, padding=1)
        self.pool = nn.MaxPool1d(kernel_size=2, stride=2)
        self.fc1 = None
        self.fc2 = nn.Linear(128, num_classes)
        self.dropout = nn.Dropout(p=0.5)
        self.relu = nn.ReLU()

    def forward(self, x):
        # Forward pass through the CNN
        x = self.relu(self.conv1(x))
        x = self.pool(x)
        x = self.relu(self.conv2(x))
        x = self.pool(x)
        x = self.relu(self.conv3(x))
        x = self.pool(x)

        x = x.view(x.size(0), -1)  # Flatten the output

        if self.fc1 is None:
            print(f"Flattened output size: {x.size(1)}")
            self.fc1 = nn.Linear(x.size(1), 128)

        x = self.relu(self.fc1(x))
        x = self.dropout(x)
        x = self.fc2(x)
        return x

In [33]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.cuda.amp import GradScaler, autocast

def train_model(model, train_loader, num_epochs=10):
    criterion = nn.CrossEntropyLoss()

    optimizer = optim.Adam(model.parameters(), lr=0.001)

    scaler = GradScaler()

    for epoch in range(num_epochs):
        model.train()
        running_loss = 0.0
        correct = 0
        total = 0

        # Loop over batches of data
        for inputs, labels in train_loader:
            optimizer.zero_grad()

            # Forward pass with autocast for mixed precision
            with autocast():
                outputs = model(inputs)
                loss = criterion(outputs, labels)

            # Backward pass and optimization with mixed precision scaling
            scaler.scale(loss).backward()
            scaler.step(optimizer)
            scaler.update()

            # Calculate statistics
            running_loss += loss.item() * inputs.size(0)
            _, predicted = torch.max(outputs, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

        # Calculation of epoch metrics
        epoch_loss = running_loss / len(train_loader.dataset)
        epoch_accuracy = 100 * correct / total

        # Get the current learning rate
        current_lr = optimizer.param_groups[0]['lr']

        print(f'Epoch {epoch+1}/{num_epochs}, '
              f'Loss: {epoch_loss:.4f}, '
              f'Accuracy: {epoch_accuracy:.2f}%, '
              f'LR: {current_lr:.6f}')

    return model

In [34]:
num_classes = 3  # (Normal, MCI, Dementia)
input_sequence_length = 160000
model = EEG_CNN(num_classes=num_classes)

# Train the model
train_model(model, train_dataloader, num_epochs=10)

  scaler = GradScaler()
  with autocast():


Flattened output size: 2560000
Epoch 1/10, Loss: 1.5635, Accuracy: 29.17%, LR: 0.001000
Epoch 2/10, Loss: 1.2136, Accuracy: 34.17%, LR: 0.001000
Epoch 3/10, Loss: 1.0207, Accuracy: 50.83%, LR: 0.001000
Epoch 4/10, Loss: 0.8748, Accuracy: 59.17%, LR: 0.001000
Epoch 5/10, Loss: 0.8579, Accuracy: 60.83%, LR: 0.001000
Epoch 6/10, Loss: 0.7755, Accuracy: 70.00%, LR: 0.001000
Epoch 7/10, Loss: 0.7159, Accuracy: 74.17%, LR: 0.001000
Epoch 8/10, Loss: 0.6814, Accuracy: 75.83%, LR: 0.001000
Epoch 9/10, Loss: 0.6584, Accuracy: 74.17%, LR: 0.001000
Epoch 10/10, Loss: 0.5544, Accuracy: 86.67%, LR: 0.001000


EEG_CNN(
  (conv1): Conv1d(21, 32, kernel_size=(3,), stride=(1,), padding=(1,))
  (conv2): Conv1d(32, 64, kernel_size=(3,), stride=(1,), padding=(1,))
  (conv3): Conv1d(64, 128, kernel_size=(3,), stride=(1,), padding=(1,))
  (pool): MaxPool1d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  (fc2): Linear(in_features=128, out_features=3, bias=True)
  (dropout): Dropout(p=0.5, inplace=False)
  (relu): ReLU()
  (fc1): Linear(in_features=2560000, out_features=128, bias=True)
)

In [35]:
!pip install torchmetrics

Collecting torchmetrics
  Downloading torchmetrics-1.4.2-py3-none-any.whl.metadata (19 kB)
Collecting lightning-utilities>=0.8.0 (from torchmetrics)
  Downloading lightning_utilities-0.11.7-py3-none-any.whl.metadata (5.2 kB)
Downloading torchmetrics-1.4.2-py3-none-any.whl (869 kB)
[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/869.2 kB[0m [31m?[0m eta [36m-:--:--[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m869.2/869.2 kB[0m [31m26.9 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading lightning_utilities-0.11.7-py3-none-any.whl (26 kB)
Installing collected packages: lightning-utilities, torchmetrics
Successfully installed lightning-utilities-0.11.7 torchmetrics-1.4.2


In [41]:
from torchmetrics import Precision, Recall, F1Score
from sklearn.preprocessing import label_binarize

def evaluate_model(model, dataloader, num_classes=3):
    model.eval()
    all_preds = []
    all_probs = []
    all_labels = []

    with torch.no_grad():
        for inputs, labels in dataloader:
            outputs = model(inputs)
            _, preds = torch.max(outputs, 1)

            all_probs.append(outputs.cpu())
            all_preds.append(preds.cpu())
            all_labels.append(labels.cpu())

    all_preds = torch.cat(all_preds)
    all_labels = torch.cat(all_labels)
    all_probs = torch.cat(all_probs)

    # Computation of precision, recall, F1-score
    precision_value = Precision(task='multiclass', num_classes=num_classes, average='macro')(all_preds, all_labels)
    recall_value = Recall(task='multiclass', num_classes=num_classes, average='macro')(all_preds, all_labels)
    f1_value = F1Score(task='multiclass', num_classes=num_classes, average='macro')(all_preds, all_labels)

    print(f"Precision: {precision_value.item():.4f}")
    print(f"Recall: {recall_value.item():.4f}")
    print(f"F1 Score: {f1_value.item():.4f}")

# Evaluatation of the model on the test set
evaluate_model(model, test_dataloader, num_classes=3)


Precision: 0.3168
Recall: 0.3611
F1 Score: 0.3317
