In [1]:
import warnings
# import timm
import gc

# from fastai.vision.all import *
# from fastcore.parallel import *


In [2]:
import torch
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from PIL import Image
import pandas as pd
import os
import numpy as np
from scipy.signal import butter, filtfilt
import librosa
import torch
import torch.nn as nn
import torch.nn.functional as F
# import timm
from tqdm import tqdm,trange,tqdm_notebook
# from multiprocessing import Pool

import matplotlib.pyplot as plt

## Create `DataLoaders`

Instead of processing the parquet files, I'm now using images directly which I have saved here after preprocessing and converting into the spectrogram. 

In [7]:
class CustomDataset(Dataset):
    def __init__(self, csv_path, transform=None,root='..'):
        '''
        csv_path: path to your metadata
        \n
        USE_WAVELET:
        \n
        \t'LPF10'-> for low pass with 10 Hz \n
        \t'LPF20'-> for low pass with 20 Hz \n
        \t'USE_WAVELET'-> use other default wavelets \n
        root: path to 'hms-harmful-brain-activity-classification'
        specMethod: 'mel', 'SL','CWT'
        '''

        self.metadata = pd.read_csv(csv_path)
        self.transform = transform
        self.label_map = {'Seizure': 0, 'GPD': 1, 'LRDA': 2, 'Other': 3, 'GRDA': 4, 'LPD': 5}


        self.root = root


    def __len__(self):
        return len(self.metadata)

    def __getitem__(self, idx):
        
        subSet = self.metadata.iloc[idx]
        eeg_id = subSet['eeg_id']
        offset_seconds = subSet['eeg_label_offset_seconds']
        image_name = f"{eeg_id}-{offset_seconds}.png"
        print(f'{image_name}')
        image_path = os.path.join(self.root_dir, image_name)
        print(f'{image_path}')
        image = Image.open(image_path)

        image = self.transform(image)

        # Extract labels
        labels = np.array([subSet['seizure_vote'], subSet['lpd_vote'], subSet['gpd_vote'],
                           subSet['lrda_vote'], subSet['grda_vote'], subSet['other_vote']], dtype=float)
        total_labels = np.sum(labels)
        labels /= total_labels

        # Extract consensus
        consensus = self.label_map[subSet['expert_consensus']]

        return image, labels, consensus
    

In [8]:
# class Upload_Dataset(Dataset):
#     def __init__(self, csv_file,  root_dir,label_map,transform=None ):
#         self.metadata = pd.read_csv(csv_file)
        
#         self.root_dir = root_dir
#         self.label_map = {'Seizure': 0, 'GPD': 1, 'LRDA': 2, 'Other': 3, 'GRDA': 4, 'LPD': 5}
#         self.transform = transform
        

#     def __len__(self):
#         return len(self.metadata)

#     def __getitem__(self, idx):
#         print('test')
#         subSet = self.metadata.iloc[idx]
#         eeg_id = subSet['eeg_id']
#         offset_seconds = subSet['eeg_label_offset_seconds']
#         image_name = f"{eeg_id}-{offset_seconds}.png"
#         print(f'{image_name}')
#         image_path = os.path.join(self.root_dir, image_name)
#         print(f'{image_path}')
#         image = Image.open(image_path)

#         image = self.transform(image)

#         # Extract labels
#         labels = np.array([subSet['seizure_vote'], subSet['lpd_vote'], subSet['gpd_vote'],
#                            subSet['lrda_vote'], subSet['grda_vote'], subSet['other_vote']], dtype=float)
#         total_labels = np.sum(labels)
#         labels /= total_labels

#         # Extract consensus
#         consensus = self.label_map[subSet['expert_consensus']]

#         return image, labels, consensus

In [10]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

transform = transforms.Compose([
    transforms.ToTensor(),
])

labelMap = {'Seizure': 0, 'GPD': 1, 'LRDA': 2, 'Other': 3, 'GRDA': 4, 'LPD': 5}

batch_size = 1

# train_path = 'hms-harmful-brain-activity-classification-LPF10-MEL/test_mine.csv'
test_path = 'hms-harmful-brain-activity-classification-LPF10-MEL/test_mine.csv'
# val_path = '/kaggle/input/hms-split/val_mine.csv'
# root = '/kaggle/input/hms-Spectrogram'
# filter_method = ''
# tranform_method = ''
# pre_spec = os.path.join(root, f"{filter_method}_{transform_method}")

# train_dataset = Upload_Dataset(csv_path=train_path, transform=transform,root=os.path.join(pre_spec,f"train"), labelMap)
# train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=4)


test_dataset = CustomDataset(csv_path=test_path, transform=transform,root=os.path.join('hms-harmful-brain-activity-classification-LPF10-MEL',f"test"))
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=True, num_workers=2)


# val_dataset = Upload_Dataset(csv_path=val_path, transform=transform,root=os.path.join(pre_spec,f"val"), labelMap)
# val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=True, num_workers=4)

In [11]:
image, labels, consensus = next(iter(test_loader))

Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "/opt/miniconda3/envs/ivp/lib/python3.8/multiprocessing/spawn.py", line 116, in spawn_main
    exitcode = _main(fd, parent_sentinel)
  File "/opt/miniconda3/envs/ivp/lib/python3.8/multiprocessing/spawn.py", line 126, in _main
    self = reduction.pickle.load(from_parent)
AttributeError: Can't get attribute 'CustomDataset' on <module '__main__' (built-in)>


KeyboardInterrupt: 

## Create 'ConvNextModel'

Here we have tried out basic ConvNext Model. Similarly try out rest of the models also with different methods and variantions in parameters.

In [None]:
class ConvNext(nn.Module):
    def __init__(self, n_classes, num_input_channels=1, pretrained=True):

        super(ConvNext, self).__init__()

        self.model = timm.create_model("convnext_small_384_in22ft1k", pretrained=False, in_chans=num_input_channels)
        if pretrained:
            self.model.load_state_dict(torch.load("../input/..."))
        self.model.head.fc = nn.Linear(self.model.head.fc.in_features, n_classes)
        
    def forward(self, x):
        x = self.model(x)
        return x

epoch = 10
if torch.cuda.device_count() > 1:
    print("Using", torch.cuda.device_count(), "GPUs!")
    convnext_model = nn.DataParallel(ConvNext(6, 1, False)).to(device)
else:
    convnext_model = ConvNext(6, 1, False).to(device)

loss_history = [[], []]
accuracy_history = [[], []]
acc_epoch_history = [[],[]]
loss_epoch_history = [[],[]]

optimizer = torch.optim.Adam(convnext_model.parameters(), lr=2e-04)

In [None]:
from sklearn.metrics import precision_score, recall_score, f1_score

precision_history = [[], []]
recall_history = [[], []]
f1_history = [[], []]

for e in trange(epoch):
    convnext_model.train()
    print(f"====================== EPOCH {e+1} ======================")
    print("Training.....")
    for i, (data, labels, target) in enumerate(train_loader):
        optimizer.zero_grad()
        data, labels, target = data.to(device), labels.to(device), target.to(device)
        output = convnext_model(data.float())
        labels = labels.float()
        loss = F.kl_div(F.log_softmax(output, dim=1), labels, reduction='batchmean')
        
        loss.backward()
        
        nn.utils.clip_grad_norm_(convnext_model.parameters(), 3)

        loss_history[0].append(loss.item())
        
        optimizer.step()
        
        print(f"MINIBATCH {i+1}/{len(train_loader)} TRAIN LOSS : {loss_history[0][-1]}")
    
    print("Validation.....")
    convnext_model.eval()
    
    with torch.no_grad():
        true_positives = 0
        false_positives = 0
        false_negatives = 0
        for i, (data, labels, target) in enumerate(val_loader):
            data, labels, target = data.to(device), labels.to(device), target.to(device)
            output = convnext_model(data)
            labels = labels.float()
            loss = F.kl_div(F.log_softmax(output, dim=1), labels, reduction='batchmean')
            
            predicted_labels = output.argmax(dim=1)
            true_positives += ((predicted_labels == target) & (predicted_labels == 1)).sum().item()
            false_positives += ((predicted_labels == 1) & (target == 0)).sum().item()
            false_negatives += ((predicted_labels == 0) & (target == 1)).sum().item()

            accuracy = (output.argmax(dim=1) == target).float().mean()
            loss_history[1].append(loss.item())
            accuracy_history[1].append(accuracy)
        
        precision = precision_score(target.cpu(), predicted_labels.cpu())
        recall = recall_score(target.cpu(), predicted_labels.cpu())
        f1 = f1_score(target.cpu(), predicted_labels.cpu())

        precision_history[1].append(precision)
        recall_history[1].append(recall)
        f1_history[1].append(f1)
        
    acc_epoch_history[0].append(sum(accuracy_history[0][-1:-len(train_loader):-1])/len(train_loader))
    acc_epoch_history[1].append(sum(accuracy_history[1][-1:-len(val_loader):-1])/len(val_loader))
    
    loss_epoch_history[0].append(sum(loss_history[0][-1:-len(train_loader):-1])/len(train_loader))
    loss_epoch_history[1].append(sum(loss_history[1][-1:-len(val_loader):-1])/len(val_loader))
    
    print("====================================================")
    print(f"TRAIN ACC : {acc_epoch_history[0][-1]}  TRAIN LOSS : {loss_epoch_history[0][-1]}")
    print(f"VAL ACC : {acc_epoch_history[1][-1]}  VAL LOSS : {loss_epoch_history[1][-1]}")
    print(f"VAL PRECISION: {precision}  RECALL: {recall}  F1: {f1}")
    print("====================================================")
    
    torch.save({
            'epoch': e,
            'model_state_dict': convnext_model.state_dict(),
            'optimizer_state_dict': optimizer.state_dict(),
            'loss': loss_epoch_history[0][-1],
            'acc' : acc_epoch_history[0][-1]
            }, './model_checkpoint.pt')

convnext_model.eval()

with torch.no_grad():
    true_positives = 0
    false_positives = 0
    false_negatives = 0
    for i, (data, labels, target) in enumerate(test_loader):
        data, labels, target = data.to(device), labels.to(device), target.to(device)
        output = convnext_model(data)
        labels = labels.float()
        loss = F.kl_div(F.log_softmax(output, dim=1), labels, reduction='batchmean')

        predicted_labels = output.argmax(dim=1)
        true_positives += ((predicted_labels == target) & (predicted_labels == 1)).sum().item()
        false_positives += ((predicted_labels == 1) & (target == 0)).sum().item()
        false_negatives += ((predicted_labels == 0) & (target == 1)).sum().item()

    precision = true_positives / (true_positives + false_positives)
    recall = true_positives / (true_positives + false_negatives)
    f1 = 2 * (precision * recall) / (precision + recall)

    avg_test_loss = test_loss / len(test_loader)
    test_accuracy = correct / total

print("====================================================")
print(f"Test Loss: {avg_test_loss:.4f}, Test Accuracy: {test_accuracy:.4f}")
print(f"Test Precision: {precision}, Recall: {recall}, F1: {f1}")
print("====================================================")