<a href="https://colab.research.google.com/github/JoRE13/Neonatal-Seizure-Detection/blob/main/REI505M_Final_Project.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Final Project - REI505M

The following notebook contains all code used for the project

In [None]:
# Mount the notebook
from google.colab import drive
drive.mount('/content/drive')

FOLDERNAME = 'Gervigreind/final-project/'
assert FOLDERNAME is not None, "[!] Enter the foldername."

import sys
sys.path.append('/content/drive/MyDrive/{}'.format(FOLDERNAME))

Mounted at /content/drive


## Data Porcessing

In [None]:
avg_seizure_len = np.zeros((79,3))
for i in range(79):
  annot = np.genfromtxt('drive/MyDrive/Gervigreind/final-project/neonat/annotation/id{:d}.txt'.format(i+1),\
                        delimiter='\t',skip_header=False)
  for j in range(3):
    num_seiz_ep = 0
    annotations = annot[:,j]
    for k in range(len(annotations)-1):
      if annotations[k] == 1 and annotations[k+1] == 0: # If there is a zero after a 1 a sizure eepisode has ended
        num_seiz_ep += 1
    if annotations[-1] == 1: # If ends in a seizure
      num_seiz_ep += 1
    if num_seiz_ep == 0:
      avg_seizure_len[i,j] = 0
    else:
      avg_seizure_len[i,j] = np.sum(annotations) / num_seiz_ep

print('Average duration of seizure episodes (expert A, expert B, expert C)\n')
for i in range(79):
  print('Recording ' + str(i+1) + ': ' + str(round(avg_seizure_len[i,0],2))+'s ' + str(round(avg_seizure_len[i,1],2)) + 's ' + str(round(avg_seizure_len[i,2],2))+'s')

Average duration of seizure episodes (expert A, expert B, expert C)

Recording 1: 64.08s 74.79s 27.69s
Recording 2: 32.5s 0.0s 0.0s
Recording 3: 0.0s 0.0s 0.0s
Recording 4: 462.5s 165.86s 329.0s
Recording 5: 665.0s 637.8s 454.86s
Recording 6: 0.0s 0.0s 122.75s
Recording 7: 104.17s 46.19s 101.33s
Recording 8: 32.0s 36.5s 0.0s
Recording 9: 294.0s 130.12s 287.33s
Recording 10: 0.0s 0.0s 0.0s
Recording 11: 33.0s 31.75s 40.0s
Recording 12: 0.0s 0.0s 31.0s
Recording 13: 254.2s 236.83s 224.33s
Recording 14: 47.6s 92.81s 60.89s
Recording 15: 77.42s 95.5s 65.4s
Recording 16: 20.67s 94.38s 22.21s
Recording 17: 42.75s 194.33s 20.0s
Recording 18: 0.0s 0.0s 0.0s
Recording 19: 244.89s 206.08s 218.2s
Recording 20: 44.18s 94.42s 43.92s
Recording 21: 43.0s 42.0s 39.0s
Recording 22: 87.12s 229.83s 91.43s
Recording 23: 141.5s 0.0s 14.11s
Recording 24: 0.0s 295.0s 0.0s
Recording 25: 23.67s 57.0s 18.25s
Recording 26: 0.0s 0.0s 13.71s
Recording 27: 0.0s 0.0s 0.0s
Recording 28: 0.0s 0.0s 0.0s
Recording 29: 0

In [None]:
def z_score_normalize(data):
  """
  Standardices individual signals

  inputs: data is a time series
  """
  return (data - np.mean(data)) / np.std(data)

In [None]:
import numpy as np
from neonat.utils.edf import read_edf
from neonat.utils.filter_signal import filter_signal
from scipy.signal import butter, filtfilt
from scipy.signal import decimate
import pickle

%load_ext autoreload
%autoreload 2

def loadEdfDataset (number_of_recordings = 79, class_type='consensus'):
  """
  Function that reads in the eeg files and their annotations and preproceesses the data.

  Inputs: number_of_recordings is the number of eeg files
          class_type is type of labeling that should be used

  Outputs: recordings is a list containing n segments where each segment is a (21,512) array that is a 16 second eeg segment
           y is a n long list containing the corresponding labels for seegment in recordings
           number_samples_per_rec is the number of segments for each recording.
  """
  recordings = []
  y = []
  number_of_samples_per_rec = np.zeros(number_of_recordings) # to sort the segments by recording

  for recording_id in range(1,number_of_recordings+1):
    file_name = 'drive/MyDrive/Gervigreind/final-project/neonat/eeg/eeg{}.edf'.format(recording_id)
    sampling_freq, downsample, dsf, data, channel_labels, n, start_time, _ = read_edf(file_name, 256)
    sampling_freq = int(sampling_freq)
    annot = np.genfromtxt('drive/MyDrive/Gervigreind/final-project/neonat/annotation/id{:d}.txt'.format(recording_id),\
                        delimiter='\t',skip_header=False)
    duration = int(data.shape[1] / sampling_freq)

    # Filter the data with band filter and downsample the data from 256 Hz sampling rate to 32 Hz i.e. a factor of 8
    n_channels = int(data.shape[0])
    n_samples = int(data.shape[1])
    new_sampling_freq = 32
    factor = 256/32

    data_filtered = np.zeros((n_channels, int(n_samples/factor)))
    for i in range(n_channels):
      single_channel = data[i,:].reshape(1,-1)
      data_filtered[i,:] = decimate(filter_signal(single_channel)[0,:], int(factor)) # filter signal
      data_filtered[i,:] = z_score_normalize(data_filtered[i,:]) # standardize individual signal

    sec = 0
    while (4*sec+16 < duration):
      segment = data_filtered[:, 4*sec*new_sampling_freq:(4*sec+16)*new_sampling_freq] #16 sec segment of recording
      classA = np.sum(annot[4*sec: 4*sec+16, 0])
      classB = np.sum(annot[4*sec: 4*sec+16, 1])
      classC = np.sum(annot[4*sec: 4*sec+16, 2])
      if ( classA + classB + classC == 0): # all classified as 0
        recordings.append(segment)
        y.append(0)
        number_of_samples_per_rec[recording_id-1] += 1
      if(class_type == 'consensus' and classA + classB + classC == 48): # consensus labeling
        recordings.append(segment)
        y.append(1)
        number_of_samples_per_rec[recording_id-1] += 1
      if(class_type == 'majority' and (classA + classB == 32 or classA + classC == 32 or classB + classC == 32)): # majority labeling
        recordings.append(segment)
        y.append(1)
        number_of_samples_per_rec[recording_id-1] += 1
      if(class_type == 'contains' and (classA == 16 or classB == 16 or classC == 16)): # contains labeling
        recordings.append(segment)
        y.append(1)
        number_of_samples_per_rec[recording_id-1] += 1
      sec = sec + 1

  return recordings, y, number_of_samples_per_rec


In [None]:
# Load in the dataset
recordings_consensus, y_consensus, number_of_samples_per_rec_consensus = loadEdfDataset()
#recordings_majority, y_majority, number_of_samples_per_rec_majority = loadEdfDataset(class_type='majority')
#recordings_contains, y_contains, number_of_samples_per_rec_contains = loadEdfDataset(class_type='contains')

In [None]:
pos_seiz_cons = np.sum(y_consensus)
pos_seiz_maj = np.sum(y_majority)
pos_seiz_cont = np.sum(y_contains)
neg_seiz_cons = len(y_consensus) - pos_seiz_cons
neg_seiz_maj = len(y_majority) - pos_seiz_maj
neg_seiz_cont = len(y_contains) - pos_seiz_cont

print('Number of seizure segments in consensus {}'.format(pos_seiz_cons))
print('Number of negative segments in consenus {}\n'.format(neg_seiz_cons))

print('Number of seizure segments in majority {}'.format(pos_seiz_maj))
print('Number of negative segments in majority {}\n'.format(neg_seiz_maj))

print('Number of seizure segments in contains {}'.format(pos_seiz_cont))
print('Number of negative segments in contains {}\n'.format(neg_seiz_cont))

Number of seizure segments in consensus 8563
Number of negative segments in consenus 80106

Number of seizure segments in majority 10847
Number of negative segments in majority 80106

Number of seizure segments in contains 16455
Number of negative segments in contains 80106



## Feature classifier

In [None]:
from neonat.pyeeg import hjorth_mobility_complexity as hmc
from neonat.pyeeg import entropy
from scipy.integrate import simps
from scipy import signal

# Calculate the Hjorth parameters

def calc_hjorth_features(x):
    """
    Calculates the Hjorth parameters for signal x

    inputs: x contains EEG data from a single channel

    output: Activity, Mobility, Complexity
    """
    Mobility, Complexity = hmc.hjorth(x)
    Activity = np.var(x)
    return Activity, Mobility, Complexity


def band_power(x, fs, low=0.5, high=4, win=1):
    """
    Calculate the power spectrum of a signal
    Input: x contains EEG data from a single channel
            fs sampling rate
            low, high are the frequency band limits
            win is the window length
    Output: Absolute and relative power in the [low,high] band

    Code is based on: https://raphaelvallat.com/bandpower.html
    """

    # Welch's averaged periodogram is based on the Fast Fourier Transform (FFT)
    freqs, psd = signal.welch(x, fs, nperseg=win*fs)
    idx_delta = np.logical_and(freqs >= low, freqs <= high)
    freq_res = freqs[1] - freqs[0]

    # Compute the absolute power by approximating the area under the curve
    abs_power = simps(psd[idx_delta], dx=freq_res)
    #print('Absolute delta power: %.3f uV^2' % abs_power)

    # Relative delta power (expressed as a percentage of total power)
    total_power = simps(psd, dx=freq_res)
    rel_power = abs_power / total_power
    #print('Relative delta power: %.3f' % rel_power)
    return abs_power, rel_power

In [None]:
# Creating the feature based matrices
def calc_features(recordings,y,class_type):
  """
  Creates a feature based training set from the training data

  Inputs: recordings are the raw eeg seegments
          y are the corresponding labels
          class_type is the kind of labeling that was used

  Output: X is an (n,5) array where each row contains the features for corresponding segment of recordings
  """
  sampling_freq = 32
  X = np.zeros((len(recordings),5))
  counter = 0
  for segment in recordings:
    hjort_features = np.apply_along_axis(calc_hjorth_features, arr=segment, axis=1)
    activity = hjort_features[:,0]
    mobility = hjort_features[:,1]
    complexity = hjort_features[:,2]
    band_power_1 = np.apply_along_axis(band_power, arr=segment, low=2, high=4, fs=sampling_freq, axis=1)
    band_power_2 = np.apply_along_axis(band_power, arr=segment, low=4, high=6, fs=sampling_freq, axis=1)
    X[counter] = [np.mean(activity), np.mean(mobility), np.mean(complexity), np.mean(band_power_1[:,0]), np.mean(band_power_2[:,0])]
    counter += 1
    if (counter % 1000 == 0):
      print(counter)
  with open('drive/MyDrive/Gervigreind/final-project/features-{}.pkl'.format(class_type), 'wb') as f:
    pickle.dump(X, f)
  with open('drive/MyDrive/Gervigreind/final-project/labels-{}.pkl'.format(class_type), 'wb') as f:
    pickle.dump(y, f)
  return X


In [None]:
def X_y_k_split(X,y, number_of_samples_per_rec,i, k):
  """
  Returns the i-th split for k-fold cross validation

  Inputs: X is the features array of the eeg segments
          y contains the labels for X
          number_of_samples_per_rec is the number of samples from each recording so we can split the data on a patient basis
          i is thee fold number
          k is the number of folds

  Output: X_train, X_test are the train and test sets
          y_train, y_test are the corresponding labels
  """
  np.random.seed(21) # we always get the same shuffle
  size = int(79/k)
  testing_indiced = np.random.randint(0, 79, size=79)[i*size:(i+1)*size]

  X_train = []
  X_test = []
  y_train = []
  y_test = []

  counter = 0
  for i in range(79):
    num = int(number_of_samples_per_rec[i])
    if i in testing_indiced:
      for j in range(num):
        X_test.append(X[counter+j,:])
        y_test.append(y[counter+j])
    else:
      for j in range(num):
        X_train.append(X[counter+j,:])
        y_train.append(y[counter+j])
    counter += num

  return X_train, X_test, y_train, y_test


In [None]:
from xgboost import XGBClassifier
from sklearn.metrics import classification_report
from sklearn.metrics import roc_auc_score


def xgb_model(X,y,k,number_of_samples_per_rec):
  """
  Train and tests a XGBBoost classifier with k-fold cross validation

  Inputs: X and y are the features and labels matrices
          k is the number of folds
          number_of_samples_per_rec is the number of samples from each recording so we can split the data on a patient basis

  Outputs: The output is the mean of the following metrics over the random splits:
              precision
              recall
              f1-score
              support
              auc
  """
  precision = np.zeros((2,k))
  recall = np.zeros((2,k))
  f1 = np.zeros((2,k))
  support = np.zeros((2,k))
  auc = np.zeros(k)

  for i in range(k):
    X_train, X_test, y_train, y_test = X_y_k_split(X,y,number_of_samples_per_rec,i,k)

    num_positives = np.sum(y_train)
    num_negatives = len(y_train) - num_positives
    scale_pos_weight = num_negatives / num_positives

    # default values
    xgb_model = XGBClassifier(
        scale_pos_weight=scale_pos_weight,
    )

    xgb_model.fit(X_train, y_train)
    y_pred = xgb_model.predict(X_test)
    c_rep = classification_report(y_test, y_pred, output_dict=True)
    y_pred_proba = xgb_model.predict_proba(X_test)[:, 1]
    auc_score = roc_auc_score(y_test, y_pred_proba)

    precision[0,i] = c_rep['0']['precision']
    precision[1,i] = c_rep['1']['precision']
    recall[0,i] = c_rep['0']['recall']
    recall[1,i] = c_rep['1']['recall']
    f1[0,i] = c_rep['0']['f1-score']
    f1[1,i] = c_rep['1']['f1-score']
    support[0,i] = c_rep['0']['support']
    support[1,i] = c_rep['1']['support']
    auc[i] = auc_score

  return np.mean(precision, axis=1), np.mean(recall, axis=1), np.mean(f1, axis=1), np.mean(support, axis=1), np.mean(auc)


In [None]:
# Load features matrices
with open('drive/MyDrive/Gervigreind/final-project/features-consensus.pkl', 'rb') as input_file:
    X_consensus = pickle.load(input_file)
with open('drive/MyDrive/Gervigreind/final-project/features-majority.pkl', 'rb') as input_file:
    X_majority = pickle.load(input_file)
with open('drive/MyDrive/Gervigreind/final-project/features-contains.pkl', 'rb') as input_file:
    X_contains = pickle.load(input_file)

# Load labels
with open('drive/MyDrive/Gervigreind/final-project/labels-consensus.pkl', 'rb') as input_file:
    y_consensus = pickle.load(input_file)
with open('drive/MyDrive/Gervigreind/final-project/labels-majority.pkl', 'rb') as input_file:
    y_majority = pickle.load(input_file)
with open('drive/MyDrive/Gervigreind/final-project/labels-contains.pkl', 'rb') as input_file:
    y_contains = pickle.load(input_file)

# Load number of samples
with open('drive/MyDrive/Gervigreind/final-project/number-samples-consensus.pkl', 'rb') as input_file:
    number_of_samples_per_rec_cons = pickle.load(input_file)
with open('drive/MyDrive/Gervigreind/final-project/number-samples-majority.pkl', 'rb') as input_file:
    number_of_samples_per_rec_maj = pickle.load(input_file)
with open('drive/MyDrive/Gervigreind/final-project/number-samples-contains.pkl', 'rb') as input_file:
    number_of_samples_per_rec_cont = pickle.load(input_file)


In [None]:
# Train and fit the model for three kinds of labelings
# We do 10-fold cross validation
k = 10
mean_precision_cons, mean_recall_cons, mean_f1_cons, mean_support_cons, mean_auc_cons = xgb_model(X_consensus,y_consensus,k,number_of_samples_per_rec_cons)
mean_precision_maj, mean_recall_maj, mean_f1_maj, mean_support_maj, mean_auc_maj = xgb_model(X_majority,y_majority,k,number_of_samples_per_rec_maj)
mean_precision_cont, mean_recall_cont, mean_f1_cont, mean_support_cont, mean_auc_cont = xgb_model(X_contains,y_contains,k, number_of_samples_per_rec_cont)

In [None]:
# Print the results

print('Results for consensus labeling after {}-fold cross validation'.format(k))
print('          precision --- recall --- f1-score --- support')
print('Label 0:   ' + ' ' + str(mean_precision_cons[0])[0:5] + '       ' + str(mean_recall_cons[0])[0:5] + '       ' + str(mean_f1_cons[0])[0:5] + '        ' + str(mean_support_cons[0])[0:5])
print('Label 1:   ' + ' ' + str(mean_precision_cons[1])[0:5] + '       ' + str(mean_recall_cons[1])[0:5] + '       ' + str(mean_f1_cons[1])[0:5] + '        ' + str(mean_support_cons[1])[0:4])
print('AUC: {}\n'.format(mean_auc_cons))

print('Results for majority labeling after {}-fold cross validation'.format(k))
print('          precision --- recall --- f1-score --- support')
print('Label 0:   ' + ' ' + str(mean_precision_maj[0])[0:5] + '       ' + str(mean_recall_maj[0])[0:5] + '       ' + str(mean_f1_maj[0])[0:5] + '        ' + str(mean_support_maj[0])[0:5])
print('Label 1:   ' + ' ' + str(mean_precision_maj[1])[0:5] + '       ' + str(mean_recall_maj[1])[0:5] + '       ' + str(mean_f1_maj[1])[0:5] + '        ' + str(mean_support_maj[1])[0:4])
print('AUC: {}\n'.format(mean_auc_maj))

print('Results for contains labeling after {}-fold cross validation'.format(k))
print('          precision --- recall --- f1-score --- support')
print('Label 0:   ' + ' ' + str(mean_precision_cont[0])[0:5] + '       ' + str(mean_recall_cont[0])[0:5] + '       ' + str(mean_f1_cont[0])[0:5] + '        ' + str(mean_support_cont[0])[0:5])
print('Label 1:   ' + ' ' + str(mean_precision_cont[1])[0:5] + '       ' + str(mean_recall_cont[1])[0:5] + '       ' + str(mean_f1_cont[1])[0:5] + '        ' + str(mean_support_cont[1])[0:4])
print('AUC: {}\n'.format(mean_auc_cont))

Results for consensus labeling after 10-fold cross validation
          precision --- recall --- f1-score --- support
Label 0:    0.968       0.851       0.905        7104.
Label 1:    0.270       0.656       0.361        564.
AUC: 0.8290790430455368

Results for majority labeling after 10-fold cross validation
          precision --- recall --- f1-score --- support
Label 0:    0.961       0.817       0.882        7104.
Label 1:    0.284       0.680       0.392        711.
AUC: 0.8150817586111223

Results for contains labeling after 10-fold cross validation
          precision --- recall --- f1-score --- support
Label 0:    0.927       0.776       0.843        7104.
Label 1:    0.333       0.634       0.428        1160
AUC: 0.7615874038793173



# Neural Network hluti

In [None]:
import random

def eeg_split(recordings, y, number_of_samples_per_rec, random_seed):
  """
  Splits the raw eeg data and labels into training, validation and testing sets on a patient basis.

  Inputs: recordings is the raw eeg data
          y is the corresponding labels
          number_of_samples_per_rec is the number of samples from each recording so we can split the data on a patient basis

  Output: eeg_train, eeg_test, eeg_val are the train, test and validation sets
          y_train, y_test, y_val are the corresponding labels
  """
  np.random.seed(random_seed)
  testing_indiced = np.random.randint(0, 79, size=16)[:8]
  val_indiced = np.random.randint(0, 79, size=16)[8:]

  eeg_train = []
  eeg_test = []
  eeg_val = []
  y_train = []
  y_test = []
  y_val = []


  counter = 0
  for i in range(79):
    num = int(number_of_samples_per_rec[i])
    if i in testing_indiced:
      for j in range(num):
        eeg_test.append(recordings[counter+j])
        y_test.append(y[counter+j])
    elif i in val_indiced:
      for j in range(num):
        eeg_val.append(recordings[counter+j])
        y_val.append(y[counter+j])
    else:
      for j in range(num):
        eeg_train.append(recordings[counter+j])
        y_train.append(y[counter+j])
    counter += num
  return eeg_train, eeg_test, eeg_val, y_train, y_test, y_val

def downsample(eeg_train, y_train):
  """
  Downsamples the majority class of the training data

  Inputs: eeg_train is the training data
          y_train is the corresponding labels

  Output: eeg_downsampled is the downsampled training data
          y_downsampled is the corresponding labels
  """
  majority_indices = [i for i, label in enumerate(y_train) if label == 0]
  minority_indices = [i for i, label in enumerate(y_train) if label == 1]

  random.seed(21)
  downsampled_majority_indices = random.sample(majority_indices, len(minority_indices))

  selected_indices = downsampled_majority_indices + minority_indices

  random.shuffle(selected_indices)

  eeg_downsampled = [eeg_train[i] for i in selected_indices]
  y_downsampled = [y_train[i] for i in selected_indices]

  return eeg_downsampled, y_downsampled

Þetta downsamplar majority klasann í training settinu

In [None]:
from tensorflow.keras.utils import to_categorical

# Load the data
eeg_train, eeg_test, eeg_val, y_train, y_test, y_val = eeg_split(recordings_consensus, y_consensus, number_of_samples_per_rec_consensus,21)
eeg_downsampled, y_downsampled = downsample(eeg_train, y_train)

# Turn into (n_samples, n_channels, n_measurements) np array and labels to (n_samples,) np arrays
training_data = np.array(eeg_downsampled, dtype=np.float32)
validation_data = np.array(eeg_val, dtype=np.float32)
test_data = np.array(eeg_test, dtype=np.float32)
training_label = np.array(y_downsampled, dtype=np.int32)
validation_label = np.array(y_val, dtype=np.int32)
test_label = np.array(y_test, dtype=np.int32)

# We want the data on the format (n_samples,n_measurements, n_channels)
training_data = np.transpose(training_data, (0, 2, 1))
validation_data = np.transpose(validation_data, (0, 2, 1))
test_data = np.transpose(test_data, (0, 2, 1))

# Confirm the data shapes
print('Training data shape {}'.format(training_data.shape))
print('Training label shape {}'.format(training_label.shape))
print('Validation data shape {}'.format(validation_data.shape))
print('Validation label shape {}'.format(validation_label.shape))
print('Test data shape {}'.format(test_data.shape))
print('Test label shape {}'.format(test_label.shape))

Training data shape (13516, 512, 21)
Training label shape (13516,)
Validation data shape (10767, 512, 21)
Validation label shape (10767,)
Test data shape (9145, 512, 21)
Test label shape (9145,)


### Baseline 1d conv network

In [None]:
import numpy as np
import tensorflow as tf
from tensorflow.keras import layers, models, regularizers, callbacks


def ConvBlock(input_channels, output_channels, kernel_size):
  """
  Creates a ConvBlock.
  """
  return models.Sequential([
        layers.Conv1D(filters=output_channels, kernel_size=kernel_size, padding="same"),
        layers.BatchNormalization(),
        layers.ReLU()
    ])

In [None]:
def baseline_cnn_model():
  """
  Creates a baseline CNN model.
  """
  inputs = layers.Input(shape=(512,21))
  conv_block = ConvBlock(1, 32, 16)(inputs)  # ConvBlock1
  global_avg_pool = layers.GlobalAveragePooling1D()(conv_block)  # Global Average Pooling
  outputs = layers.Dense(1, activation="sigmoid")(global_avg_pool)

  model = models.Model(inputs=inputs, outputs=outputs)
  return model

early_stopping = callbacks.EarlyStopping(monitor='val_loss', patience=5, restore_best_weights=True)
lr_reduction = callbacks.ReduceLROnPlateau(monitor='val_loss', patience=3, factor=0.5, min_lr=1e-6)

model = baseline_cnn_model()
model.compile(optimizer="adam", loss="binary_crossentropy", metrics=["accuracy","auc","recall"])

# Model summary
model.summary()

model.fit(training_data, training_label, validation_data=(validation_data, validation_label), epochs=10, batch_size=32, callbacks=[early_stopping, lr_reduction])


Epoch 1/10
[1m423/423[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m28s[0m 59ms/step - accuracy: 0.7464 - auc: 0.8406 - loss: 0.5734 - recall: 0.8618 - val_accuracy: 0.9838 - val_auc: 0.9183 - val_loss: 0.3646 - val_recall: 0.7222 - learning_rate: 0.0010
Epoch 2/10
[1m423/423[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m41s[0m 60ms/step - accuracy: 0.8575 - auc: 0.9322 - loss: 0.4063 - recall: 0.8110 - val_accuracy: 0.9837 - val_auc: 0.9332 - val_loss: 0.3142 - val_recall: 0.7685 - learning_rate: 0.0010
Epoch 3/10
[1m423/423[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m39s[0m 55ms/step - accuracy: 0.8770 - auc: 0.9488 - loss: 0.3417 - recall: 0.8332 - val_accuracy: 0.9811 - val_auc: 0.9093 - val_loss: 0.2982 - val_recall: 0.7426 - learning_rate: 0.0010
Epoch 4/10
[1m423/423[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m24s[0m 58ms/step - accuracy: 0.8875 - auc: 0.9562 - loss: 0.3111 - recall: 0.8507 - val_accuracy: 0.8567 - val_auc: 0.9368 - val_loss: 0.4432 - val_recall:

<keras.src.callbacks.history.History at 0x7fe2fe196a40>

In [None]:
from sklearn.metrics import classification_report, roc_auc_score, confusion_matrix

def model_eval(model, test_data, test_label):
    """
    Evaluates a model on test_data.

    Inputs:
        model: The model to evaluate.
        test_data: The data for evaluation.
        test_label: The true labels (not one-hot encoded).
    """
    predictions = model.predict(test_data)
    predicted_labels = (predictions > 0.5).astype(int)
    true_labels = test_label

    # Print the classification report
    report = classification_report(true_labels, predicted_labels, target_names=["Non seizure", "Seizure"])
    print(report)

    # Print the AUC
    auc_score = roc_auc_score(true_labels, predictions)
    print(f"Test AUC: {auc_score:.4f}\n")

    # Print the confusion matrix
    cm = confusion_matrix(true_labels, predicted_labels)
    print(f"Confusion matrix:\n{cm}")


In [None]:
model_eval(model, test_data, test_label)

### Simple 2-dimensional convnet

In [None]:
# Reshape the data for 2d cnn
training_data = training_data.reshape(training_data.shape[0],training_data.shape[1],training_data.shape[2],1)
validation_data = validation_data.reshape(validation_data.shape[0], validation_data.shape[1], validation_data.shape[2], 1)
test_data = test_data.reshape(test_data.shape[0],test_data.shape[1],test_data.shape[2],1)

print('Training data shape {}'.format(training_data.shape))
print('Training label shape {}'.format(training_label.shape))
print('Validation data shape {}'.format(validation_data.shape))
print('Validation label shape {}'.format(validation_label.shape))
print('Test data shape {}'.format(test_data.shape))
print('Test label shape {}'.format(test_label.shape))

Training data shape (13516, 512, 21, 1)
Training label shape (13516, 2)
Validation data shape (10767, 512, 21, 1)
Validation label shape (10767, 2)
Test data shape (9145, 512, 21, 1)
Test label shape (9145, 2)


In [None]:
# We create a 2d cnn similar to our baseline 1d cnn
input_shape = (512, 21, 1)

model_a = models.Sequential([
    layers.Conv2D(32, (3, 3), activation='relu', padding='same', input_shape=input_shape),
    layers.MaxPooling2D((2, 2), padding='same'),
    layers.Flatten(),
    layers.Dense(2, activation='softmax')
])

# Compile the model
model_a.compile(optimizer='adam',
                loss='categorical_crossentropy',
                metrics=['accuracy', 'auc'])

# Model summary
model_a.summary()

# Train the model
history_a = model_a.fit(training_data, training_label, epochs=10, batch_size=32,
                        validation_data=(validation_data, validation_label))
model_eval(model_a, test_data, test_label)

  super().__init__(activity_regularizer=activity_regularizer, **kwargs)


Epoch 1/10
[1m423/423[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m126s[0m 291ms/step - accuracy: 0.6874 - auc: 0.7413 - loss: 0.7951 - val_accuracy: 0.8726 - val_auc: 0.9189 - val_loss: 0.4472
Epoch 2/10
[1m423/423[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m142s[0m 292ms/step - accuracy: 0.8477 - auc: 0.9234 - loss: 0.3641 - val_accuracy: 0.9169 - val_auc: 0.9484 - val_loss: 0.3328
Epoch 3/10
[1m423/423[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m143s[0m 339ms/step - accuracy: 0.8818 - auc: 0.9522 - loss: 0.2880 - val_accuracy: 0.9480 - val_auc: 0.9654 - val_loss: 0.2474
Epoch 4/10
[1m423/423[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m179s[0m 285ms/step - accuracy: 0.9188 - auc: 0.9752 - loss: 0.2111 - val_accuracy: 0.7897 - val_auc: 0.8523 - val_loss: 0.5648
Epoch 5/10
[1m423/423[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m146s[0m 295ms/step - accuracy: 0.9215 - auc: 0.9770 - loss: 0.1988 - val_accuracy: 0.8965 - val_auc: 0.9366 - val_loss: 0.3820
Epoch 6/10

## Final CNN

In [None]:
from keras import regularizers
from sklearn.utils import class_weight
from sklearn.metrics import precision_recall_curve, roc_curve
import matplotlib.pyplot as plt

def improved_cnn_model():
    """
    Function for our improved CNN model.

    output: model is a keras cnn model
    """
    inputs = layers.Input(shape=(512, 21))

    x = layers.Conv1D(64, 16, padding='same', activation='relu',
                      kernel_regularizer=regularizers.l2(0.001))(inputs)
    x = layers.BatchNormalization()(x)
    x = layers.MaxPooling1D(2)(x)
    x = layers.Dropout(0.5)(x)

    x = layers.Conv1D(128, 8, padding='same', activation='relu',
                      kernel_regularizer=regularizers.l2(0.001))(x)
    x = layers.BatchNormalization()(x)
    x = layers.MaxPooling1D(2)(x)
    x = layers.Dropout(0.5)(x)

    x = layers.Conv1D(256, 4, padding='same', activation='relu',
                      kernel_regularizer=regularizers.l2(0.001))(x)
    x = layers.BatchNormalization()(x)
    x = layers.GlobalAveragePooling1D()(x)

    outputs = layers.Dense(1, activation='sigmoid')(x)

    model = models.Model(inputs, outputs)
    return model

model = improved_cnn_model()
optimizer = keras.optimizers.Adam(learning_rate=1e-4)
model.compile(optimizer=optimizer,
              loss='binary_crossentropy',
              metrics=['accuracy', 'AUC', 'Recall', 'Precision'])

early_stopping = callbacks.EarlyStopping(monitor='val_loss', patience=5, restore_best_weights=True)
lr_reduction = callbacks.ReduceLROnPlateau(monitor='val_loss', patience=3, factor=0.5, min_lr=1e-6)

training_labels = training_label.argmax(axis=1)
validation_labels = validation_label.argmax(axis=1)
test_labels = test_label.argmax(axis=1)

# Adjust the weights for better recall
class_weights = {0: 1.0, 1: 3.0}

history = model.fit(
    training_data, training_labels,
    validation_data=(validation_data, validation_labels),
    epochs=50,
    batch_size=64,
    class_weight=class_weights,
    callbacks=[early_stopping, lr_reduction],
    verbose=1
)

y_test_probs = model.predict(test_data).ravel()
y_val_probs = model.predict(validation_data).ravel()
y_val_true = validation_labels
precision, recall, thresholds = precision_recall_curve(y_val_true, y_val_probs)

# Plot precision and recall against thresholds
plt.figure(figsize=(10, 6))
plt.plot(thresholds, precision[:-1], label='Precision', linewidth=2)
plt.plot(thresholds, recall[:-1], label='Recall', linewidth=2)
plt.xlabel('Threshold', fontsize=14)
plt.ylabel('Score', fontsize=14)
plt.title('Precision and Recall vs Threshold', fontsize=16)
plt.legend(fontsize=12)
plt.grid(True)
plt.show()

desired_recall = 0.90
recall_diff = recall - desired_recall
threshold_indices = np.where(recall_diff >= 0)[0]
if len(threshold_indices) > 0:
    threshold_index = threshold_indices[-1]
else:
    threshold_index = np.argmin(np.abs(recall_diff))
optimal_threshold = thresholds[threshold_index]
print(f"Optimal Threshold for Recall {desired_recall}: {optimal_threshold:.4f}")

y_test_pred = (y_test_probs >= optimal_threshold).astype(int)

# Evaluate performance
print("\nClassification Report on Test Set:")
print(classification_report(test_labels, y_test_pred, digits=4))
print("Confusion Matrix:")
print(confusion_matrix(test_labels, y_test_pred))
print(f"AUC: {roc_auc_score(test_labels, y_test_probs):.4f}")


fpr, tpr, roc_thresholds = roc_curve(test_labels, y_test_probs)
plt.figure(figsize=(8, 6))
plt.plot(fpr, tpr, label=f'AUC = {roc_auc_score(test_labels, y_test_probs):.4f}', linewidth=2)
plt.plot([0, 1], [0, 1], 'k--', linewidth=2)
plt.xlabel('False Positive Rate', fontsize=14)
plt.ylabel('True Positive Rate', fontsize=14)
plt.title('ROC Curve', fontsize=16)
plt.legend(fontsize=12)
plt.grid(True)
plt.show()
