In [1]:
# Importing the libraries and data files

import numpy as np
import matplotlib.pyplot as plt
import random
from sklearn.preprocessing import StandardScaler

import tensorflow as tf

import tensorflow.data as tf_data
import tensorflow.strings as tf_strings
from sklearn.metrics import accuracy_score
from tqdm import tqdm


import keras
import keras_nlp
from keras import layers
#from keras import ops
from keras.layers import TextVectorization
import tensorflow.keras.backend as K

from tensorflow.python.keras import metrics
from sklearn.metrics import confusion_matrix
from sklearn.metrics import classification_report


Using TensorFlow backend


In [2]:
eeg_non_seizure_data = np.load('./eeg_data_non_pre_seizure.npy')
eeg_pre_seizure_data = np.load('./eeg_data_pre_seizure_chl60_pad_60.npy')
eeg_pre_seizure_data_zero_padding = np.load('./eeg_data_pre_seizure_chl60_pad_0.npy')
signal_labels = ['EEG Fp1-REF', 'EEG Fp2-REF', 'EEG F3-REF', 'EEG F4-REF', 'EEG C3-REF', 'EEG C4-REF', 'EEG P3-REF', 'EEG P4-REF', 'EEG O1-REF', 'EEG O2-REF', 'EEG F7-REF', 'EEG F8-REF', 'EEG T3-REF', 'EEG T4-REF', 'EEG T5-REF', 'EEG T6-REF', 'EEG Fz-REF', 'EEG Cz-REF', 'EEG Pz-REF', 'ECG EKG-REF', 'Resp Effort-REF']

In [3]:
def vizualize_eeg_channels(signal,offset=0.0005):
    # Set the number of channels and the offset between each channel
    np.random.seed(42)
    n_channels = len(signal)
    offset = 0.0005
    
    # Set the left and right margins for the window in which we want to plot the data at
    
    
    # Create the plot
    fig, ax = plt.subplots(figsize=(20, 10))
    
    # Plot each signal with an offset
    for i in range(n_channels-2): # Remove the EKG and Effor
      ax.plot(signal[i] + i * offset, label=signal_labels[i])
    
    # Set the y-axis labels to the signal labels
    ax.set_yticks(np.arange(n_channels-2) * offset)
    ax.set_yticklabels(signal_labels[:-2])
    
    # Set the x-axis label
    ax.set_xlabel("Time")
    
    # Add a legend
    #plt.legend()
    
    # Show the plot
    plt.show()

# Pairwise data curation

In [4]:
X_eeg_pre_seizure_data=eeg_pre_seizure_data[:,:19]

In [5]:
X_eeg_non_seizure_data=eeg_non_seizure_data[:232,:19]

In [6]:
def get_second_wise_stats_patient_channel(X,patient):
    patient_embedding=[]
    for channel in range(19):
        mean=np.mean(np.array(np.split(X[patient][channel],np.arange(int(X.shape[2]/256))*256)[1:]), axis=1)
        std=np.mean(np.array(np.split(X[patient][channel],np.arange(int(X.shape[2]/256))*256)[1:]), axis=1)
        min=np.min(np.array(np.split(X[patient][channel],np.arange(int(X.shape[2]/256))*256)[1:]), axis=1)
        max=np.max(np.array(np.split(X[patient][channel],np.arange(int(X.shape[2]/256))*256)[1:]), axis=1)
        median=np.median(np.array(np.split(X[patient][channel],np.arange(int(X.shape[2]/256))*256)[1:]), axis=1)
        info=np.array([mean,std,min,max,median])
        patient_embedding.append(info)
    patient_embedding=np.array(patient_embedding)
    return np.concatenate(patient_embedding)

In [7]:
X_reduced_len_eeg_pre_seizure_data=[]
for i in tqdm(range(len(X_eeg_pre_seizure_data))):
    X_reduced_len_eeg_pre_seizure_data.append(get_second_wise_stats_patient_channel(X_eeg_pre_seizure_data,i))
X_reduced_len_eeg_pre_seizure_data=np.array(X_reduced_len_eeg_pre_seizure_data)

100%|██████████| 232/232 [00:01<00:00, 136.51it/s]


In [8]:
X_reduced_len_eeg_non_seizure_data=[]
for i in tqdm(range(len(X_eeg_non_seizure_data))):
    X_reduced_len_eeg_non_seizure_data.append(get_second_wise_stats_patient_channel(X_eeg_non_seizure_data,i))
X_reduced_len_eeg_non_seizure_data=np.array(X_reduced_len_eeg_non_seizure_data)

100%|██████████| 232/232 [00:01<00:00, 135.22it/s]


In [9]:
X_reduced_len_eeg_pre_seizure_data.shape

(232, 95, 59)

In [10]:
X_reduced_len_eeg_non_seizure_data.shape

(232, 95, 59)

In [11]:
def scale_eeg(X):
    return np.array([np.array([(X[index][channel]-np.min(X[index][channel]))/(np.max(X[index][channel])-np.min(X[index][channel])+0.000001) for channel in range(X.shape[1])]) for index in range(len(X))])

In [12]:
X_reduced_len_eeg_pre_seizure_data=scale_eeg(X_reduced_len_eeg_pre_seizure_data)

In [13]:
X_reduced_len_eeg_non_seizure_data=scale_eeg(X_reduced_len_eeg_non_seizure_data)

In [14]:
X_reduced_len_eeg_pre_seizure_data=np.moveaxis(X_reduced_len_eeg_pre_seizure_data,-1,-2)

In [15]:
X_reduced_len_eeg_non_seizure_data=np.moveaxis(X_reduced_len_eeg_non_seizure_data,-1,-2)

In [16]:
print(X_reduced_len_eeg_pre_seizure_data.shape)
print(X_reduced_len_eeg_non_seizure_data.shape)

(232, 59, 95)
(232, 59, 95)


In [17]:
def unison_shuffled_copies(a, b):
    assert len(a) == len(b)
    p = np.random.permutation(len(a))
    return a[p], b[p]

In [18]:
def get_pairs(num_pairs_per_class,class0_data,class1_data):

    data=[]
    labels=[]
    
    for i in range(num_pairs_per_class):
        get_class=np.random.randint(2)
        if get_class==0:
            sample_pair=class0_data[np.random.choice(len(class0_data),2)]
            data.append(sample_pair)
            labels.append(np.array([0,1]))
        else:
            sample_pair=class1_data[np.random.choice(len(class1_data),2)]
            data.append(sample_pair)
            labels.append(np.array([0,1]))
            
    for i in range(num_pairs_per_class):
       class0_sample=class0_data[np.random.choice(len(class0_data))]
       class1_sample=class1_data[np.random.choice(len(class1_data))]
       sample_pair=np.array([class0_sample,class1_sample])
       data.append(sample_pair)
       labels.append(np.array([1,0]))

    data,labels=unison_shuffled_copies(np.array(data), np.array(labels))

    return data,labels

In [19]:
X,y=get_pairs(10000,X_reduced_len_eeg_pre_seizure_data,X_reduced_len_eeg_non_seizure_data)

In [20]:
X.shape

(20000, 2, 59, 95)

In [21]:
y.shape

(20000, 2)

In [22]:
from sklearn.model_selection import train_test_split

X_train, X_test, y_train, y_test = train_test_split(X, y, shuffle=True, test_size=0.1)

In [23]:
print(X_train.shape, y_train.shape)
print(X_test.shape, y_test.shape)

(18000, 2, 59, 95) (18000, 2)
(2000, 2, 59, 95) (2000, 2)


In [24]:
#import keras.ops as ops
class TransformerEncoder(layers.Layer):
    def __init__(self, embed_dim, dense_dim, num_heads, **kwargs):
        super().__init__(**kwargs)
        self.embed_dim = embed_dim
        self.dense_dim = dense_dim
        self.num_heads = num_heads
        self.attention = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim
        )
        self.dense_proj = keras.Sequential(
            [
                layers.Dense(dense_dim, activation="relu"),
                layers.Dense(embed_dim),
            ]
        )
        self.layernorm_1 = layers.LayerNormalization()
        self.layernorm_2 = layers.LayerNormalization()
        self.supports_masking = True

    def call(self, inputs, mask=None):
        if mask is not None:
            padding_mask = tf.cast(mask[:,tf.newaxis, tf.newaxis, :], dtype='int32')
        else:
            padding_mask = None

        attention_output = self.attention(
            query=inputs, value=inputs, key=inputs, attention_mask=padding_mask
        )
        proj_input = self.layernorm_1(inputs + attention_output)
        proj_output = self.dense_proj(proj_input)
        return self.layernorm_2(proj_input + proj_output)


    def get_config(self):
        config = super().get_config()
        config.update(
            {
                "embed_dim": self.embed_dim,
                "dense_dim": self.dense_dim,
                "num_heads": self.num_heads,
            }
        )
        return config


class Token_and_PositionalEmbedding(layers.Layer):
    #def __init__(self, sequence_length, vocab_size, embed_dim, **kwargs):
    def __init__(self, sequence_length, **kwargs):
        super().__init__(**kwargs)
       
        #self.position_embeddings = layers.Embedding(
        #    input_dim=sequence_length, output_dim=embed_dim
        #)

        self.position_embeddings = keras_nlp.layers.SinePositionEncoding(
           sequence_length, **kwargs
        )

        self.sequence_length = sequence_length
        #self.vocab_size = vocab_size
        #self.embed_dim = embed_dim

    def call(self, inputs):
    
        positional_encoding = self.position_embeddings(inputs)

        return positional_encoding

    def compute_mask(self, inputs, mask=None):
        if mask is None:
            return None
        else:
            return tf.math.not_equal(inputs,0)

    def get_config(self):
        config = super().get_config()
        config.update(
            {
                "sequence_length": self.sequence_length,
                "vocab_size": self.vocab_size,
                "embed_dim": self.embed_dim,
            }
        )
        return config


In [25]:
def weighted_binary_crossentropy(target, output, weights):
    target = tf.convert_to_tensor(target)
    output = tf.convert_to_tensor(output)
    weights = tf.convert_to_tensor(weights, dtype=target.dtype)

    epsilon_ = tf.constant(tf.keras.backend.epsilon(), output.dtype.base_dtype)
    output = tf.clip_by_value(output, epsilon_, 1.0 - epsilon_)

    # Compute cross-entropy from probabilities.
    bce = weights[1] * target * tf.math.log(output + epsilon_)
    bce += weights[0] * (1 - target) * tf.math.log(1 - output + epsilon_)
    return -bce

class WeightedBinaryCrossentropy:
    def __init__(
        self,
        label_smoothing=0.0,
        weights = [1.0, 1.0],
        axis=-1,
        name="weighted_binary_crossentropy",
        fn = None,
    ):
        """Initializes `WeightedBinaryCrossentropy` instance.
        Args:
          from_logits: Whether to interpret `y_pred` as a tensor of
            [logit](https://en.wikipedia.org/wiki/Logit) values. By default, we
            assume that `y_pred` contains probabilities (i.e., values in [0,
            1]).
          label_smoothing: Float in [0, 1]. When 0, no smoothing occurs. When >
            0, we compute the loss between the predicted labels and a smoothed
            version of the true labels, where the smoothing squeezes the labels
            towards 0.5.  Larger values of `label_smoothing` correspond to
            heavier smoothing.
          axis: The axis along which to compute crossentropy (the features
            axis).  Defaults to -1.
          name: Name for the op. Defaults to 'weighted_binary_crossentropy'.
        """
        super().__init__()
        self.weights = weights # tf.convert_to_tensor(weights)
        self.label_smoothing = label_smoothing
        self.name = name
        self.fn = weighted_binary_crossentropy if fn is None else fn

    def __call__(self, y_true, y_pred):
        y_pred = tf.convert_to_tensor(y_pred)
        y_true = tf.cast(y_true, y_pred.dtype)
        self.label_smoothing = tf.convert_to_tensor(self.label_smoothing, dtype=y_pred.dtype)

        def _smooth_labels():
            return y_true * (1.0 - self.label_smoothing) + 0.5 * self.label_smoothing

        y_true = tf.__internal__.smart_cond.smart_cond(self.label_smoothing, _smooth_labels, lambda: y_true)

        return tf.reduce_mean(self.fn(y_true, y_pred, self.weights),axis=-1)

    def get_config(self):
        config = {"name": self.name, "weights": self.weights, "fn": self.fn}

        # base_config = super().get_config()
        return dict(list(config.items()))

    @classmethod
    def from_config(cls, config):
        """Instantiates a `Loss` from its config (output of `get_config()`).
        Args:
            config: Output of `get_config()`.
        """
        if saving_lib.saving_v3_enabled():
            fn_name = config.pop("fn", None)
            if fn_name:
                config["fn"] = get(fn_name)
        return cls(**config)

In [26]:
embed_dim = X.shape[3]
latent_dim = 1024
num_heads = 6
signal_len = X.shape[2]
max_hamiltonian_len = 2
hamiltonian_cycle = y_train
def get_model():
    signal_p1 = keras.Input(shape=(signal_len,embed_dim), dtype="float64")
    signal_p2 = keras.Input(shape=(signal_len,embed_dim), dtype="float64")

    pos_emb_1_p1= Token_and_PositionalEmbedding(signal_len)(signal_p1)
    pos_emb_1_p2= Token_and_PositionalEmbedding(signal_len)(signal_p2)

    signal_p1=tf.keras.layers.Add()([signal_p1, pos_emb_1_p1])
    signal_p2=tf.keras.layers.Add()([signal_p2, pos_emb_1_p2])

    signal_lstm_p1 = layers.LSTM(units=256, return_sequences=True, activation="tanh")(signal_p1)
    signal_lstm_p2 = layers.LSTM(units=256, return_sequences=True, activation="tanh")(signal_p2)

    pos_emb_2_p1= Token_and_PositionalEmbedding(signal_len)(signal_lstm_p1)
    pos_emb_2_p2= Token_and_PositionalEmbedding(signal_len)(signal_lstm_p2)
    
    signal_pos_p1=tf.keras.layers.Add()([signal_lstm_p1, pos_emb_2_p1])
    signal_pos_p2=tf.keras.layers.Add()([signal_lstm_p2, pos_emb_2_p2])
    
    x_att_p1= keras.layers.MultiHeadAttention(num_heads,embed_dim)(query=signal_pos_p1,value=signal_pos_p1,key=signal_pos_p1)
    x_att_p2= keras.layers.MultiHeadAttention(num_heads,embed_dim)(query=signal_pos_p2,value=signal_pos_p2,key=signal_pos_p2)

    x_res1_p1=tf.keras.layers.Add()([x_att_p1, signal_pos_p1])
    x_ln1_p1=layers.LayerNormalization()(x_res1_p1)

    x_res1_p2=tf.keras.layers.Add()([x_att_p2, signal_pos_p2])
    x_ln1_p2=layers.LayerNormalization()(x_res1_p2)
    
    x_2_p1 = layers.Dense(latent_dim, activation="relu")(x_ln1_p1)
    x_2_p1=layers.Dense(embed_dim)(x_2_p1)

    x_2_p2 = layers.Dense(latent_dim, activation="relu")(x_ln1_p2)
    x_2_p2=layers.Dense(embed_dim)(x_2_p2)

    x_res2_p1=tf.keras.layers.Add()([x_att_p1, x_ln1_p1])
    x_ln2_p1=layers.LayerNormalization()(x_res2_p1)

    x_res2_p2=tf.keras.layers.Add()([x_att_p2, x_ln1_p2])
    x_ln2_p2=layers.LayerNormalization()(x_res2_p2)
    
    x_3_p1 = keras.layers.Flatten()(x_ln2_p1)
    x_3_p2 = keras.layers.Flatten()(x_ln2_p2)

    concat=keras.layers.Concatenate()([x_3_p1,x_3_p2])
    
    output = keras.layers.Dense(2, activation='sigmoid')(concat)
    model = keras.Model(inputs=[signal_p1,signal_p2], outputs=output)
    return model

transformer_model = get_model()

In [27]:
model=get_model()

In [28]:
model.summary()

Model: "model_1"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_7 (InputLayer)           [(None, 59, 95)]     0           []                               
                                                                                                  
 input_8 (InputLayer)           [(None, 59, 95)]     0           []                               
                                                                                                  
 lstm_2 (LSTM)                  (None, 59, 256)      360448      ['input_7[0][0]']                
                                                                                                  
 lstm_3 (LSTM)                  (None, 59, 256)      360448      ['input_8[0][0]']                
                                                                                            

In [29]:
from keras.callbacks import Callback
from sklearn.metrics import confusion_matrix, f1_score, precision_score, recall_score
class Metrics(Callback):
    def on_train_begin(self, logs={}):
        self.val_f1s = []
        self.val_recalls = []
        self.val_precisions = []
     
    def on_epoch_end(self, epoch, logs={}):
        val_predict = (np.asarray(self.model.predict(self.model.validation_data[0]))).round()
        val_targ = self.model.validation_data[1]
        _val_f1 = f1_score(val_targ, val_predict)
        _val_recall = recall_score(val_targ, val_predict)
        _val_precision = precision_score(val_targ, val_predict)
        self.val_f1s.append(_val_f1)
        self.val_recalls.append(_val_recall)
        self.val_precisions.append(_val_precision)
        print ( "— val_f1: %f — val_precision: %f — val_recall %f" %(_val_f1, _val_precision, _val_recall))
        return
 
metrics = Metrics()

In [30]:
@tf.autograph.experimental.do_not_convert
def f1_score(y_true, y_pred):
    def recall(y_true, y_pred):
        true_positives = K.sum(K.round(K.clip(y_true * y_pred, 0, 1)))
        possible_positives = K.sum(K.round(K.clip(y_true, 0, 1)))
        recall = true_positives / (possible_positives + K.epsilon())
        return recall

    def precision(y_true, y_pred):
        true_positives = K.sum(K.round(K.clip(y_true * y_pred, 0, 1)))
        predicted_positives = K.sum(K.round(K.clip(y_pred, 0, 1)))
        precision = true_positives / (predicted_positives + K.epsilon())
        return precision

    precision = precision(y_true, y_pred)
    recall = recall(y_true, y_pred)
    f1 = 2 * ((precision * recall) / (precision + recall + K.epsilon()))
    return f1


In [31]:
epochs = 50 # This should be at least 30 for convergence
wbce = WeightedBinaryCrossentropy(weights = [1.0, 1.0])

#model.compile(
#    keras.optimizers.Adam(learning_rate=0.000001), loss=wbce, metrics=['accuracy']
#)

model.compile(
    keras.optimizers.Adam(learning_rate=0.0001), loss="binary_crossentropy", metrics=['accuracy']
)

#model.fit(X_train, y_train, validation_data=(val_data,val_target), epochs= epochs, callbacks=[metrics])

model.fit([X_train[:,0],X_train[:,1]], y_train, epochs= epochs, shuffle=True, validation_split=0.05)

Epoch 1/50
Epoch 2/50
Epoch 3/50
Epoch 4/50
Epoch 5/50
Epoch 6/50
Epoch 7/50
Epoch 8/50
Epoch 9/50
Epoch 10/50
Epoch 11/50
Epoch 12/50
Epoch 13/50
Epoch 14/50
Epoch 15/50
Epoch 16/50
Epoch 17/50
Epoch 18/50
Epoch 19/50
Epoch 20/50
Epoch 21/50
Epoch 22/50
Epoch 23/50
Epoch 24/50
Epoch 25/50
Epoch 26/50
Epoch 27/50
Epoch 28/50
Epoch 29/50
Epoch 30/50
Epoch 31/50
Epoch 32/50
Epoch 33/50
Epoch 34/50
Epoch 35/50
Epoch 36/50
Epoch 37/50
Epoch 38/50
Epoch 39/50
Epoch 40/50
Epoch 41/50
Epoch 42/50
Epoch 43/50
Epoch 44/50
Epoch 45/50
Epoch 46/50
Epoch 47/50
Epoch 48/50
Epoch 49/50
Epoch 50/50


<keras.callbacks.History at 0x19506823c10>

In [35]:
y_pred=model.predict([X_test[:,0],X_test[:,1]])



In [36]:
from sklearn.metrics import average_precision_score
average_precision_score(y_test, y_pred)

0.9999397490829787

In [37]:
from sklearn.metrics import confusion_matrix
confusion_matrix(y_test[:,1],1*(y_pred[:,1]>0.5))

array([[1010,    4],
       [   1,  985]], dtype=int64)