# Import Data and Train Model


In [1]:
import os
import tempfile
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader, ConcatDataset, SubsetRandomSampler
from torch.utils.data.dataset import random_split
from transformers import get_scheduler, AdamW
from datasets import load_metric
from torchsummary import summary
from kfold_functions import reset_weights
from sklearn.model_selection import KFold
from sklearn import metrics

import numpy as np
import librosa

import matplotlib.pyplot as plt
%matplotlib inline
import pandas as pd;
import numpy as np
import math

from imblearn.over_sampling import SMOTE
from imblearn.under_sampling import RandomUnderSampler

from tqdm.auto import tqdm
import random

seed_number = 69 # 69 for absence and tonic-clonic / 21 for general
# torch.manual_seed(seed_number)
np.random.seed(seed_number)
random.seed(seed_number)

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
seizure_type = "general"
main_folder_path = "melspectrograms/"
path = main_folder_path + seizure_type + "/"

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Import Dataset


In [3]:
class Dataset(Dataset):
    def __init__(self, data, transform=None, target_transform=None):
        self.x, self.y = data
        self.n_samples = len(self.x)

        self.transform = transform
        self.target_transform = target_transform

    def __getitem__(self, idx):
        sample_x, sample_y = self.x[idx], self.y[idx]

        if self.transform:
            sample_x = self.transform(sample_x)

        if self.target_transform:
            sample_y = self.target_transform(sample_y)

        return sample_x.float(), sample_y.float()

    def __len__(self):
        return self.n_samples

In [4]:
train_x = np.load(f"{path}/imb_{seizure_type}_x.npy")
train_y = np.load(f"{path}/imb_{seizure_type}_y.npy")

validation_x = np.load(f"{path}/validation_{seizure_type}_x.npy")
validation_y = np.load(f"{path}/validation_{seizure_type}_y.npy")

test_x = np.load(f"{path}/test_{seizure_type}_x.npy")
test_y = np.load(f"{path}/test_{seizure_type}_y.npy")

In [5]:
print(len(train_y), len(validation_y), len(test_y))
print(f"Total: {len(train_y) + len(validation_y) + len(test_y)}")

3745 1070 534
Total: 5349


In [6]:
train_dataset = Dataset(
    (train_x, train_y),
    transform=torch.tensor,
    target_transform=torch.tensor,
)
validation_dataset = Dataset(
    (validation_x, validation_y),
    transform=torch.tensor,
    target_transform=torch.tensor,
)
test_dataset = Dataset(
    (test_x, test_y),
    transform=torch.tensor,
    target_transform=torch.tensor,
)

# Model

Mobilenet


In [7]:
class SZModel(nn.Module):
    def __init__(self, l1=128):
        super(SZModel, self).__init__()
        self.conv1 = nn.Conv2d(1, 16, (3, 3), stride=(2, 2), padding=(1, 1), bias=False)
        self.bn1 = nn.BatchNorm2d(
            16, eps=0.001, momentum=0.1, affine=True, track_running_stats=True
        )
        self.hardswish = nn.Hardswish()
        self.conv1_2 = nn.Conv2d(
            16, 16, (3, 3), stride=(2, 2), padding=(1, 1), bias=False
        )
        self.ReLU = nn.ReLU()

        self.block1 = nn.Sequential(
            nn.Conv2d(
                16, 16, (3, 3), stride=(1, 1), padding=(1, 1), groups=16, bias=False
            ),
            nn.BatchNorm2d(
                16, eps=0.001, momentum=0.1, affine=True, track_running_stats=True
            ),
            nn.ReLU(),
            nn.Conv2d(16, 16, (1, 1), stride=(1, 1), padding=(0, 0), bias=False),
            nn.BatchNorm2d(
                16, eps=0.001, momentum=0.1, affine=True, track_running_stats=True
            ),
        )

        self.conv2 = nn.Conv2d(
            16, 16, (3, 3), stride=(2, 2), padding=(1, 1), bias=False
        )
        self.bn2 = nn.BatchNorm2d(
            16, eps=0.001, momentum=0.1, affine=True, track_running_stats=True
        )
        self.hardswish = nn.Hardswish()

        self.block2 = nn.Sequential(
            nn.Conv2d(
                16, 16, (3, 3), stride=(1, 1), padding=(1, 1), groups=16, bias=False
            ),
            nn.BatchNorm2d(
                16, eps=0.001, momentum=0.1, affine=True, track_running_stats=True
            ),
            nn.ReLU(),
            nn.Conv2d(16, 16, (1, 1), stride=(1, 1), padding=(0, 0), bias=False),
            nn.BatchNorm2d(
                16, eps=0.001, momentum=0.1, affine=True, track_running_stats=True
            ),
        )

        self.globalaveragepool = nn.AdaptiveAvgPool2d(1)
        self.flatten = nn.Flatten()
        self.classif_block = nn.Sequential(
            nn.Linear(16, l1),
            nn.Hardswish(),
            nn.Dropout(p=0.3, inplace=True),
            nn.Linear(l1, 2),
        )

    def forward(self, x):
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.hardswish(x)
        x = self.conv1_2(x)
        x = self.ReLU(x)

        x = self.block1(x)

        x = self.conv2(x)
        x = self.bn2(x)
        x = self.hardswish(x)

        x = self.block2(x)

        x = self.globalaveragepool(x)
        x = self.flatten(x)

        x = self.classif_block(x)
        return x

In [8]:
model = SZModel()
model = model.to(device)
# model
summary(model, (1, 40, 26), batch_size=-1, device="cuda")

----------------------------------------------------------------
        Layer (type)               Output Shape         Param #
            Conv2d-1           [-1, 16, 20, 13]             144
       BatchNorm2d-2           [-1, 16, 20, 13]              32
         Hardswish-3           [-1, 16, 20, 13]               0
            Conv2d-4            [-1, 16, 10, 7]           2,304
              ReLU-5            [-1, 16, 10, 7]               0
            Conv2d-6            [-1, 16, 10, 7]             144
       BatchNorm2d-7            [-1, 16, 10, 7]              32
              ReLU-8            [-1, 16, 10, 7]               0
            Conv2d-9            [-1, 16, 10, 7]             256
      BatchNorm2d-10            [-1, 16, 10, 7]              32
           Conv2d-11             [-1, 16, 5, 4]           2,304
      BatchNorm2d-12             [-1, 16, 5, 4]              32
        Hardswish-13             [-1, 16, 5, 4]               0
           Conv2d-14             [-1, 1

# K-Fold Cross Validation


In [9]:
# https://github.com/christianversloot/machine-learning-articles/blob/main/how-to-use-k-fold-cross-validation-with-pytorch.md

In [10]:
n_folds = 10
n_fold_epochs = 200
k_fold_batch_size = 512

loss_fn = nn.CrossEntropyLoss()

acc_results = {}  # For accuracy
cm_results = {}  # For confusion matrix
metric_results = {}
actual_labels = {}
predicted_labels = {}
roc_auc_results = {}
gmeans_results = {}

dataset = ConcatDataset([train_dataset, validation_dataset, test_dataset])

a, b, c = 0, 0, 0
for j in dataset:
    if j[1] == (1.0):
        b += 1
    else:
        a += 1
    c += 1
print(f"Background: {a}, {seizure_type} Seizure: {b}, Total: {c}")

kfold = KFold(n_splits=n_folds, shuffle=True)

Background: 4100, general Seizure: 1249, Total: 5349



In [11]:
for fold, (train_ids, test_ids) in enumerate(kfold.split(dataset)):
    # Print
    print(f"FOLD {fold}")
    print("--------------------------------")

    # Sample elements randomly from a given list of ids, no replacement.
    train_subsampler = SubsetRandomSampler(train_ids)
    test_subsampler = SubsetRandomSampler(test_ids)

    # Define data loaders for training and testing data in this fold
    trainloader = DataLoader(
        dataset, batch_size=k_fold_batch_size, sampler=train_subsampler
    )
    testloader = DataLoader(dataset, batch_size=1, sampler=test_subsampler)

    print(f"Total before SMOTE: {len(train_subsampler) + len(test_subsampler)}")
    print(f"Length of trainloader before SMOTE: {len(train_subsampler)}")
    # Copy trainloader to another variable
    trainloader_copy = trainloader
    del trainloader

    # Get trainloader and apply smote
    train_x, train_y = [], []
    for i, data in enumerate(trainloader_copy):
        inputs, targets = data
        for i in range(len(inputs)):
            train_x.append(inputs[i].cpu().numpy())
            train_y.append(targets[i].cpu().numpy())
    train_x = np.array(train_x)
    train_y = np.array(train_y)

    one_dim = []
    for i in train_x:
        one_dim.append(i)
    one_dim = np.array(one_dim)

    # Flatten to make it one dim
    shape = one_dim.shape
    one_dim_reshaped = np.reshape(one_dim, (shape[0], shape[1] * shape[2]))

    # Apply Smote
    x, y = SMOTE(sampling_strategy="not majority", random_state=42).fit_resample(
        one_dim_reshaped, train_y
    )

    # Reshape back to 2D
    x = np.reshape(x, (x.shape[0], shape[1], shape[2]))

    one_channel = []
    for i in x:
        one_channel.append([i])
    x = np.array(one_channel)

    # Back into dataloader
    train_dataset = Dataset(
        (x, y),
        transform=torch.tensor,
        target_transform=torch.tensor,
    )

    # Make test set one channel
    test_x, test_y = [], []

    testloader_copy = testloader
    del testloader

    for i, data in enumerate(testloader_copy):
        inputs, targets = data
        for i in range(len(inputs)):
            test_x.append(inputs[i].cpu().numpy())
            test_y.append(targets[i].cpu().numpy())
    test_x = np.array(test_x)
    test_y = np.array(test_y)

    test_one_channel = []
    for i in test_x:
        test_one_channel.append([i])
    test_x = np.array(test_one_channel)

    test_dataset = Dataset(
        (test_x, test_y),
        transform=torch.tensor,
        target_transform=torch.tensor,
    )

    trainloader = DataLoader(train_dataset, batch_size=k_fold_batch_size, shuffle=True)
    testloader = DataLoader(test_dataset, batch_size=1, shuffle=True)
    print(f"Length of trainloader: {len(train_dataset)}")
    print(f"Length of testloader: {len(test_subsampler)}")
    print(f"Total: {len(train_dataset) + len(test_subsampler)}")

    # Init the neural network
    network = SZModel()
    network.to(device)
    network.apply(reset_weights)

    # Initialize optimizer
    optimizer = torch.optim.Adam(network.parameters(), lr=5e-5)

    # Run the training loop for defined number of epochs
    network.train()
    for epoch in range(0, n_fold_epochs):

        # Print epoch
        # print(f"Starting epoch {epoch+1}")

        # Set current loss value
        current_loss = 0.0

        # Iterate over the DataLoader for training data
        for i, data in enumerate(trainloader, 0):

            # Get inputs
            inputs, targets = data
            inputs, targets = inputs.to(device), targets.to(device)
            targets = targets.to(torch.int64)

            # Zero the gradients
            optimizer.zero_grad()

            # Perform forward pass
            outputs = network(inputs)

            # Compute loss
            loss = loss_fn(outputs, targets)

            # Perform backward pass
            loss.backward()

            # Perform optimization
            optimizer.step()

            # Print statistics
            current_loss += loss.item()
            if i % 500 == 499:
                print("Loss after mini-batch %5d: %.3f" % (i + 1, current_loss / 500))
                current_loss = 0.0

    # Process is complete.
    print("Training process has finished")

    # Print about testing
    print("Starting testing")
    print(" ")

    # Saving the model
    save_path = f"model/kfold/{seizure_type}/{seizure_type}-fold-{fold}.pth"
    torch.save(network.state_dict(), save_path)

    # Evaluationfor this fold
    correct, total = 0, 0
    actual_test, predicted_test = [], []
    tp_fn_tn_fp = [0, 0, 0, 0]
    network.eval()
    with torch.no_grad():
        # Iterate over the test data and generate predictions
        for i, data in enumerate(testloader, 0):

            # Get inputs
            inputs, targets = data
            inputs, targets = inputs.to(device), targets.to(device)
            targets = targets.to(torch.int64)

            # Generate outputs
            outputs = network(inputs)

            # Set total and correct
            _, predicted = torch.max(outputs.data, 1)
            total += targets.size(0)
            correct += (predicted == targets).sum().item()

            outputs = nn.functional.softmax(outputs, dim=1)

            # print(targets)
            # print(outputs)
            # print(" ")

            actual_test.append(targets[0].cpu().numpy())
            predicted_test.append(outputs[0][1].cpu().numpy())

            # print(outputs, targets)
            if targets[0] == torch.tensor([1]).to(device):
                if torch.round(outputs[0][1], decimals=4) > torch.round(
                    outputs[0][0], decimals=4
                ):
                    tp_fn_tn_fp[0] += 1
                else:
                    tp_fn_tn_fp[1] += 1
            elif targets[0] == torch.tensor([0]).to(device):
                if torch.round(outputs[0][0], decimals=4) > torch.round(
                    outputs[0][1], decimals=4
                ):
                    tp_fn_tn_fp[2] += 1
                else:
                    tp_fn_tn_fp[3] += 1

        # print(tp_fn_tn_fp)
        # Calculate Results
        tp, fn, tn, fp = (
            tp_fn_tn_fp[0],
            tp_fn_tn_fp[1],
            tp_fn_tn_fp[2],
            tp_fn_tn_fp[3],
        )

        sensitivity = round((tp) / (tp + fn), 5)
        specificity = round((tn) / (tn + fp), 5)
        try:
            precision = round((tp) / (tp + fp), 5)
        except:
            precision = 0
        accuracy = round((tp + tn) / (tp + tn + fp + fn), 5)

        y_score = []
        for i in predicted_test:
            y_score.append(i.item())

        y_true = []
        for i in actual_test:
            y_true.append(i.item())

        print(y_true)
        print(y_score)
        fpr, tpr, thresholds = metrics.roc_curve(y_true=y_true, y_score=y_score)
        roc_auc = metrics.auc(fpr, tpr)
        roc_auc_score = metrics.roc_auc_score(y_true=y_true, y_score=y_score)

        print("Roc Auc")
        print(roc_auc_score)

        # Print Results
        print("Accuracy for fold %d: %d %%" % (fold, 100.0 * correct / total))
        print(f"tp_fn_tn_fp for fold {fold} is {tp_fn_tn_fp}")
        print(
            f"sensitivity {sensitivity}, specificity {specificity}, precision {precision}"
        )
        print(f"roc_auc {roc_auc}")
        print(f"gmean {math.sqrt(sensitivity * specificity)}")
        print("--------------------------------")

        # Store Results
        acc_results[fold] = 100.0 * (correct / total)
        cm_results[fold] = tp_fn_tn_fp
        metric_results[fold] = [sensitivity, specificity, precision]
        predicted_labels[fold] = predicted_test
        actual_labels[fold] = actual_test
        roc_auc_results[fold] = roc_auc
        gmeans_results[fold] = math.sqrt(sensitivity * specificity)

# Print fold results
print(f"K-FOLD CROSS VALIDATION RESULTS FOR {n_folds} FOLDS")
print("--------------------------------")
sum = 0.0
for key, value in acc_results.items():
    print(f"Fold {key}:")
    sum += value
    print(f"    (tp_fn_tn_fp) {cm_results[key]}")
    print(
        f"  sensitivity {metric_results[key][0]*100}%, specificity {metric_results[key][1]*100}%, precision {metric_results[key][2]*100}%, accuracy {round(value, 2)}%"
    )
    print(" ")
    print(f"  roc_auc {roc_auc_results[key]}")
    print(f"  gmean {gmeans_results[key]}")
print(f"Average Accuracy: {sum/len(acc_results.items())} %")

FOLD 0
--------------------------------
Total before SMOTE: 5349
Length of trainloader before SMOTE: 4814
Length of trainloader: 7386
Length of testloader: 535
Total: 7921
Training process has finished
Starting testing
 


IndexError: too many indices for array: array is 0-dimensional, but 1 were indexed

# Create Tables and Save


In [47]:
rows = []
for key, value in acc_results.items():
    row = [
        key,
        round(value, 2),
        round(metric_results[key][0] * 100, 2),
        round(metric_results[key][1] * 100, 2),
        round(metric_results[key][2] * 100, 2),
        round(roc_auc_results[key] * 100, 2),
        round(gmeans_results[key] * 100, 2),
        f"{cm_results[key][0]} / {cm_results[key][1]} / {cm_results[key][2]} / {cm_results[key][3]}",
    ]
    rows.append(row)

columns = [
    "fold",
    "accuracy",
    "sensitivity",
    "specificity",
    "precision",
    "roc_auc",
    "gmean",
]


df = pd.DataFrame(
    rows,
    columns=[
        "fold",
        "accuracy",
        "sensitivity",
        "specificity",
        "precision",
        "roc_auc",
        "gmean",
        "tp / fn /tn / fp",
    ],
)

acc_avg, sen_avg, spe_avg, pre_avg, roc_avg, gmean_avg = 0, 0, 0, 0, 0, 0
for key, value in acc_results.items():
    acc_avg += value
    sen_avg += metric_results[key][0]
    spe_avg += metric_results[key][1]
    pre_avg += metric_results[key][2]
    roc_avg += roc_auc_results[key]
    gmean_avg += gmeans_results[key]
acc_avg /= len(acc_results.items())
sen_avg /= len(acc_results.items())
spe_avg /= len(acc_results.items())
pre_avg /= len(acc_results.items())
roc_avg /= len(acc_results.items())
gmean_avg /= len(acc_results.items())
avg_row = [
    "Average",
    round(acc_avg, 2),
    round(sen_avg * 100, 2),
    round(spe_avg * 100, 2),
    round(pre_avg * 100, 2),
    round(roc_avg * 100, 2),
    round(gmean_avg * 100, 2),
    " ",
]

df.loc[len(df)] = avg_row

In [48]:
df

Unnamed: 0,fold,accuracy,sensitivity,specificity,precision,roc_auc,gmean,tp / fn /tn / fp
0,0,89.35,80.47,92.14,76.3,92.2,86.11,103 / 25 / 375 / 32
1,1,90.47,79.31,93.56,77.31,92.1,86.14,92 / 24 / 392 / 27
2,2,91.03,74.42,96.3,86.49,90.29,84.66,96 / 33 / 391 / 15
3,3,91.4,70.49,97.58,89.58,91.1,82.94,86 / 36 / 403 / 10
4,4,91.03,69.34,98.49,94.06,91.82,82.64,95 / 42 / 392 / 6
5,5,89.35,76.87,93.52,79.84,91.16,84.78,103 / 31 / 375 / 26
6,6,82.8,81.75,83.13,59.88,90.34,82.44,103 / 23 / 340 / 69
7,7,92.15,75.44,96.67,86.0,91.52,85.4,86 / 28 / 407 / 14
8,8,83.55,81.89,84.07,61.54,89.26,82.97,104 / 23 / 343 / 65
9,9,91.01,66.38,97.85,89.53,88.34,80.59,77 / 39 / 409 / 9


In [49]:
df.to_csv(f"results/10_fold/{seizure_type}_2_results.csv", index=False)