In [1]:
import pandas as pd
import matplotlib.pyplot as plt
import os 
import numpy as np
import tensorflow as tf
import keras
from tensorflow.keras import Model
from tensorflow.keras.layers import Dense, GlobalAveragePooling2D, Lambda
from tensorflow.keras.applications import EfficientNetB2
from tensorflow.keras.layers import Permute
from tensorflow.keras.layers import Layer
import keras_cv

2024-05-10 08:21:56.007499: E external/local_xla/xla/stream_executor/cuda/cuda_dnn.cc:9261] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
2024-05-10 08:21:56.007675: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:607] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
2024-05-10 08:21:56.167575: E external/local_xla/xla/stream_executor/cuda/cuda_blas.cc:1515] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered


# Montages

In [2]:
# Short labels for five electrode groups (presumably the four banana chains
# plus the centre chain). Not referenced by any other cell visible here.
NAMES = 'LL LP RL RP C'.split()

# Electrode columns read from every EEG parquet file, in this order.
IMP_FEATS = (
    'Fp1 F3 C3 P3 F7 T3 T5 O1 '
    'Fp2 F4 C4 P4 F8 T4 T6 O2 '
    'Fz Cz Pz'
).split()

# Electrode name -> position of that electrode inside IMP_FEATS.
IMP_FEATS_index = dict(zip(IMP_FEATS, range(len(IMP_FEATS))))

# Four electrode chains of five electrodes each; consecutive electrodes of a
# chain are subtracted from each other by the data generator.
BANANA = [
    'Fp1 F7 T3 T5 O1'.split(),
    'Fp1 F3 C3 P3 O1'.split(),
    'Fp2 F8 T4 T6 O2'.split(),
    'Fp2 F4 C4 P4 O2'.split(),
]

# Midline chain, differenced the same way as the banana chains.
CENTER = 'Fz Cz Pz'.split()

# Data Loader

In [3]:
# Class label -> integer class id.
TARS = dict(zip(('Seizure', 'LPD', 'GPD', 'LRDA', 'GRDA', 'Other'), range(6)))
# Inverse lookup: integer class id -> class label.
TARS2 = dict(zip(TARS.values(), TARS.keys()))
# Directory prefix of the EEG parquet files; the trailing slash matters because
# the file name is appended to it by plain string concatenation.
file_path = '/kaggle/input/hms-harmful-brain-activity-classification/test_eegs/'
class DataGenerator(tf.keras.utils.Sequence):
    """Keras ``Sequence`` yielding batches of bipolar-montage EEG signals.

    Each row of ``data`` identifies one recording through its ``eeg_id``
    column. The recording is read from ``{file_path}{eeg_id}.parquet``, the
    central 10,000 rows are kept, and 18 difference signals are built:

    * channels 0-15: the four consecutive-electrode differences of each of the
      four ``BANANA`` chains (chain ``k`` fills channels ``4*k .. 4*k + 3``);
    * channel 17: ``CENTER[0] - CENTER[1]``; channel 16: ``CENTER[1] - CENTER[2]``.

    Every signal is clipped to [-1024, 1024], NaNs are replaced by 0, and the
    result is divided by 32.

    Parameters
    ----------
    data : pandas.DataFrame
        One row per sample; must contain ``eeg_id`` and, unless
        ``mode == 'test'``, the ``targets`` columns.
    batch_size : int
        Maximum number of samples per batch (the last batch may be smaller).
    shuffle : bool
        Reshuffle the sample order at construction and after every epoch.
    mode : str
        ``'test'`` leaves the returned labels at zero; any other value copies
        the ``targets`` columns of each row into the labels.
    IMP_FEATS : list of str
        Electrode columns to read from each parquet file.
    file_path : str
        Prefix prepended to ``{eeg_id}.parquet`` (no separator is inserted, so
        a directory needs its trailing slash).
    BANANA : list of list of str
        Four chains of five electrode names.
    CENTER : list of str
        Three midline electrode names.
    targets : list of str, optional
        Label columns. Defaults to the six ``*_vote`` columns, so the class no
        longer depends on a ``TARGETS`` global defined in a later cell.
    **kwargs
        Forwarded to the Keras base-class constructor.

    Raises
    ------
    TypeError
        If ``file_path`` is not a string.
    """

    # Label columns used when ``targets`` is not given.
    DEFAULT_TARGETS = ('seizure_vote', 'lpd_vote', 'gpd_vote',
                       'lrda_vote', 'grda_vote', 'other_vote')

    # Rows kept from the middle of every recording, and channels produced.
    N_SAMPLES = 10_000
    N_CHANNELS = 18

    def __init__(self, data, batch_size = 32, shuffle = False, mode='train', IMP_FEATS = IMP_FEATS ,
                 file_path = file_path, BANANA = BANANA, CENTER = CENTER, targets = None, **kwargs):
        # Initialise the Keras base class first; skipping this is what triggers
        # the "super not called" warning at predict time.
        super().__init__(**kwargs)

        # Fail at construction instead of printing and leaving `file_path`
        # unset, which only surfaced later as an AttributeError.
        if not isinstance(file_path, str):
            raise TypeError('file_path must be a path to the training file signals in parquet format')

        self.data = data
        self.batch_size = batch_size
        self.shuffle = shuffle
        self.mode = mode
        self.BANANA = BANANA
        self.CENTER = CENTER
        self.IMP_FEATS = IMP_FEATS
        self.file_path = file_path
        self.targets = list(self.DEFAULT_TARGETS if targets is None else targets)
        self.on_epoch_end()

    def __len__(self):
        'Denotes the number of batches per epoch'
        return int(np.ceil(len(self.data) / self.batch_size))

    def on_epoch_end(self):
        'Rebuilds the sample order (shuffled when requested) after each epoch'
        self.indexes = np.arange(len(self.data))
        if self.shuffle:
            np.random.shuffle(self.indexes)

    def __getitem__(self, index):
        'Generate one batch of data'
        indexes = self.indexes[index*self.batch_size:(index+1)*self.batch_size]
        X1, y = self.__data_generation(indexes)
        return X1, y

    @staticmethod
    def _bipolar(eeg, first, second):
        'Difference of two electrode columns, clipped, NaN-filled and scaled'
        diff = eeg[first].values - eeg[second].values
        diff = np.clip(diff, -1024, 1024)
        return np.nan_to_num(diff, nan = 0) / 32.0

    def __data_generation(self, indexes):
        'Builds the (signals, labels) arrays for the given sample positions'

        X1 = np.zeros((len(indexes), self.N_SAMPLES, self.N_CHANNELS, 1), dtype='float32')
        y = np.zeros((len(indexes), len(self.targets)), dtype='float32')

        for j, i in enumerate(indexes):

            row = self.data.iloc[i]
            eeg = pd.read_parquet(f'{self.file_path}{row.eeg_id}.parquet', columns = self.IMP_FEATS)
            if len(eeg) < self.N_SAMPLES:
                # Previously this surfaced as an opaque broadcasting error.
                raise ValueError(
                    f'EEG {row.eeg_id} has {len(eeg)} rows; at least {self.N_SAMPLES} are required')

            # Keep the central window of the recording.
            middle = (len(eeg) - self.N_SAMPLES) // 2
            eeg = eeg.iloc[middle:middle + self.N_SAMPLES]

            # BANANA features: chain k occupies channels 4*k .. 4*k + 3.
            for k in range(4):
                COLS = self.BANANA[k]
                for kk in range(4):
                    X1[j, :, 4 * k + kk, 0] = self._bipolar(eeg, COLS[kk], COLS[kk + 1])

            # CENTER features are written from the last channel backwards
            # (m = 0 -> channel 17, m = 1 -> channel 16). The order is kept
            # as-is because saved weights depend on the channel layout.
            for m in range(2):
                X1[j, :, -1 - m, 0] = self._bipolar(eeg, self.CENTER[m], self.CENTER[m + 1])

            if self.mode != 'test':
                y[j] = row[self.targets].values.astype('float32')

        return X1, y

# CWT Layer

In [4]:
%cd /kaggle/input/cmorlet-tensorflow-3/cmorlet-tensorflow-3/CWT

/kaggle/input/cmorlet-tensorflow-3/cmorlet-tensorflow-3/CWT


In [5]:
import cwt

# Magnitude Layer

In [6]:
@keras.saving.register_keras_serializable()
class Magnitude_of_Complex_CWT(Layer):
    """
    Custom Keras layer to compute the magnitude of a complex tensor obtained from CWT.

    The input is indexed as a rank-4 tensor whose last axis holds the real
    part at index 0 and the imaginary part at index 1. The output is
    ``sqrt(real**2 + imag**2)`` with a trailing axis of size 1 appended.

    Args:
        magnitude: When True (default) the magnitude is computed. When False
            the layer is a pass-through and returns its input unchanged
            (previously it printed a message and returned ``None``, which
            broke any graph built with it).
        **kwargs: Standard Keras ``Layer`` keyword arguments (name, dtype, ...).
    """
    def __init__(self, magnitude = True, **kwargs):
        super().__init__(**kwargs)
        self.magnitude = magnitude

    def call(self, inputs):
        # Disabled layer: hand the tensor through untouched.
        if not self.magnitude:
            return inputs

        real_part = inputs[:, :, :, 0]
        imag_part = inputs[:, :, :, 1]
        result = tf.math.sqrt(tf.math.square(real_part) + tf.math.square(imag_part))
        # Re-add the axis removed by the two slices above.
        return tf.expand_dims(result, axis=-1)

    def get_config(self):
        # Start from the base config so name/dtype/trainable survive a
        # save/load round trip, then add this layer's own argument.
        config = super().get_config()
        config.update({"magnitude": self.magnitude})
        return config

# Scale Layer

In [7]:
@keras.saving.register_keras_serializable()
class scale_to_255(Layer):
    """
    Layer that min-max rescales its input to the range [0, 255].

    The minimum and maximum are taken over the WHOLE input tensor (batch axis
    included), so the scaling of one sample depends on the other samples in
    the same batch. This is kept unchanged because previously saved weights
    were produced with this behaviour.

    Args:
        scale: When True (default) the rescaling is applied. When False the
            layer is a pass-through and returns its input unchanged
            (previously it printed a message and returned ``None``, which
            broke any graph built with it).
        **kwargs: Standard Keras ``Layer`` keyword arguments (name, dtype, ...).
    """
    def __init__(self, scale = True, **kwargs):
        super().__init__(**kwargs)
        self.scale = scale

    def call(self, x):
        # Disabled layer: hand the tensor through untouched.
        if not self.scale:
            return x

        min_val = tf.reduce_min(x)
        max_val = tf.reduce_max(x)
        # The small epsilon avoids division by zero for a constant input.
        return 255.0 * (x - min_val) / (max_val - min_val + 1e-7)

    def get_config(self):
        # Start from the base config so name/dtype/trainable survive a
        # save/load round trip, then add this layer's own argument.
        config = super().get_config()
        config.update({"scale": self.scale})
        return config

# Initialize the 3 custom Layers for Training

## CWT

In [8]:
# Single complex-Morlet CWT layer instance, reused for every EEG channel when
# the model is built. Exact parameter semantics are defined by the external
# `cwt` module and should be confirmed there.
cwt_layer = cwt.ComplexMorletCWT(
    # Signal and analysed frequency band
    fs=200,            # sampling frequency
    lower_freq=0.5,    # lower frequency
    upper_freq=40,     # upper frequency
    n_scales=40,       # number of scales
    # Wavelet
    wavelet_width=7,   # wavelet width
    trainable=False,   # wavelet width is not trainable
    # Output sampling (presumably time stride and border trimming; confirm in `cwt`)
    stride=16,
    border_crop=1,
)

## Scale and Magnitude

In [9]:
# Shared post-processing layers, created once and reused by the model builder.
# The arguments below are the classes' defaults, written out explicitly.
mos = Magnitude_of_Complex_CWT(magnitude=True)   # complex CWT output -> magnitude
scale = scale_to_255(scale=True)                 # min-max rescale to [0, 255]

# Load the Model

In [10]:
# Image classifier built from the "efficientnetv2_b2_imagenet" KerasCV preset
# with a 6-way output, one unit per entry of TARS. It is created at module
# level and reused as the backbone inside build_model_MK().
eff_net_model = keras_cv.models.ImageClassifier.from_preset("efficientnetv2_b2_imagenet", num_classes = 6)

Attaching 'config.json' from model 'keras/efficientnetv2/keras/efficientnetv2_b2_imagenet/2' to your Kaggle notebook...
Attaching 'config.json' from model 'keras/efficientnetv2/keras/efficientnetv2_b2_imagenet/2' to your Kaggle notebook...
Attaching 'model.weights.h5' from model 'keras/efficientnetv2/keras/efficientnetv2_b2_imagenet/2' to your Kaggle notebook...


In [11]:
def build_model_MK(n_samples = 10000, n_channels = 18, image_size = 512, learning_rate = 1e-3):
    """Build and compile the EEG -> scalogram -> EfficientNet classifier.

    Pipeline, per sample:

    1. each of the ``n_channels`` montage signals is sliced out of the input
       and passed through the shared ``cwt_layer`` and then ``mos``
       (magnitude of the complex CWT output);
    2. the per-channel scalograms are concatenated along axis 2, the first two
       non-batch axes are swapped, and the result is resized to
       ``image_size x image_size`` with nearest-neighbour interpolation;
    3. the image is rescaled to [0, 255] by ``scale``, copied into three
       identical channels, and fed to ``eff_net_model``.

    Relies on the module-level ``cwt_layer``, ``mos``, ``scale`` and
    ``eff_net_model`` objects.

    Args:
        n_samples: Number of time samples per channel in the input.
        n_channels: Number of montage channels in the input.
        image_size: Height and width of the image given to the backbone.
        learning_rate: Learning rate of the Adam optimizer used in ``compile``.

    Returns:
        A compiled ``tf.keras.Model`` mapping inputs of shape
        ``(n_samples, n_channels, 1)`` to the output of ``eff_net_model``,
        with KL-divergence loss.

    With the default arguments the graph is built in the same layer order as
    the former hand-unrolled version, so existing weight files still apply.
    """
    # The trailing axis of size 1 is the channel axis expected by cwt_layer.
    inp = tf.keras.Input(shape = (n_samples, n_channels, 1))

    # One CWT + magnitude per montage channel, in channel order.
    scalograms = []
    for channel in range(n_channels):
        scalogram = cwt_layer(inp[:, :, channel])
        scalograms.append(mos(scalogram))

    x1 = tf.keras.layers.Concatenate(axis = 2)(scalograms)

    x2 = Permute((2,1,3))(x1)
    x3 = tf.keras.layers.Resizing(height=image_size, width=image_size, interpolation='nearest')(x2)
    x4 = scale(x3)
    # The backbone is fed 3 channels, so the single-channel image is repeated.
    x5 = tf.keras.layers.Concatenate(axis=3)([x4,x4,x4])

    out = eff_net_model(x5)

    final_model = Model(inputs = inp, outputs = out)

    opt = tf.keras.optimizers.Adam(learning_rate=learning_rate)
    loss = tf.keras.losses.KLDivergence()
    final_model.compile(loss=loss, optimizer = opt)

    return final_model

In [12]:
# Build the full graph with its default configuration, then overwrite the
# model's variables with the weights stored in the saved `.keras` file.
model = build_model_MK()
model.load_weights("/kaggle/input/effnet-keras-hms-models/Eff_Net_v10_f1.keras")

  trackable.load_own_variables(weights_store.get(inner_path))


# Make Prediction on Test Data

## Read Test Data

In [13]:
# Load the test metadata table and show its size and first rows.
test = pd.read_csv('/kaggle/input/hms-harmful-brain-activity-classification/test.csv')
print(f'Test shape: {test.shape}')
test.head()

Test shape: (1, 3)


Unnamed: 0,spectrogram_id,eeg_id,patient_id
0,853520,3911565283,6885


## Build the Test Data Generator

In [14]:
# Batched, unshuffled generator over the test rows. mode='test' leaves the
# label array at zeros.
test_gen = DataGenerator(
    test,
    batch_size = 64,
    shuffle = False,
    mode = 'test',
)

## Predictions

In [15]:
# Run inference over every batch of the generator. `pred` is later written
# into the six vote columns of the submission, one row per test row.
pred = model.predict(test_gen, verbose=1)

  self._warn_if_super_not_called()


[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m5s[0m 5s/step


# Submit to CSV file

In [16]:
# Submission columns, one per predicted class.
TARGETS = [
    'seizure_vote',
    'lpd_vote',
    'gpd_vote',
    'lrda_vote',
    'grda_vote',
    'other_vote',
]

# Predictions become the vote columns; the EEG id is placed in front of them.
sub = pd.DataFrame(pred, columns=TARGETS)
sub.insert(0, 'eeg_id', test.eeg_id.values)

sub.to_csv('/kaggle/working/submission.csv', index=False)
print('Submissionn shape', sub.shape)
sub.head()

Submissionn shape (1, 7)


Unnamed: 0,eeg_id,seizure_vote,lpd_vote,gpd_vote,lrda_vote,grda_vote,other_vote
0,3911565283,0.77015,0.005387,0.006731,0.006325,0.011388,0.200018
