# CWT preprocessing

Load a pretrained model, apply feature engineering to the test EEGs, and predict on the local test set.

In [4]:
import numpy as np
import pandas as pd
import tensorflow as tf
import keras
import sys
import os
import pywt
import time


# ----------------------------------------
# Flags for working on different hardware.
# Uncomment the flag matching the current environment. A flag that stays
# commented out is simply undefined, which the sections below rely on.
flag_kaggle = True
# flag_FW = True
# flag_LN = True

# Each section reads a flag that may not exist. Only the resulting NameError
# is suppressed, so any other failure still surfaces instead of being hidden.
try:
    if flag_kaggle:
        sys.path.insert(0, '/kaggle/input/hms-lib')
        base_dir = '/kaggle/input/hms-harmful-brain-activity-classification'
except NameError:
    pass

try:
    if flag_FW:
        sys.path.insert(0, '../lib')
        base_dir = '../../kaggle_data/hms'
        devset_dir = '../data'
except NameError:
    pass

try:
    if flag_LN:
        sys.path.insert(0, '../lib')
        base_dir = '../../data/hms'
        devset_dir = '../data'
except NameError:
    pass
# ----------------------------------------


from lib_banana import banana
from lib_pooling import poolingOverlap


# Directory holding the test EEG files.
test_path = '/kaggle/input/hms-toy-test-eegs'

# os.listdir returns entries in arbitrary order; sorting makes the row order
# of the preprocessed data (and therefore of the predictions) reproducible.
test_files = sorted(os.listdir(test_path))
test_size = len(test_files)
# Bare expression: displays the file count when run as a notebook cell.
test_size


2024-03-10 12:53:32.184100: I tensorflow/core/platform/cpu_feature_guard.cc:182] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: SSE4.1 SSE4.2 AVX AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


## Preprocessing test set

In [None]:
# Continuous wavelet transform settings.
waveletname = 'morl'
scales = np.arange(1, 50)
sampling_period = 1

# Shape of one preprocessed sample: (dim1, dim2, n_channels).
n_channels = 5
dim1 = len(scales)
pool_window = 5
dim2 = 2000 // pool_window

# Center 10 s adjusted by pooling window.
start2 = 4000 // pool_window
end2 = 6000 // pool_window

# Uninitialised output buffer, one slot per test file.
sgrams = np.empty(shape=(test_size, dim1, dim2, n_channels))

# item: [eeg_id, eeg_sub_id, idx in sgrams (1st index), target,
#       seizure_vote, lpd_vote, gpd_vote, lrda_vote,
#       grda_vote, other_vote]
# Starts with zero rows; rows are appended during preprocessing.
items = np.empty((0, 10), dtype=float)

In [None]:
t1 = time.perf_counter()

# Column indices (into the frame returned by banana) of each electrode chain.
# One output channel is produced per chain, in this order.
chain_columns = (
    (0, 1, 2, 3),      # Left temporal chain.
    (4, 5, 6, 7),      # Right temporal chain.
    (8, 9, 10, 11),    # Left parasagittal chain.
    (12, 13, 14, 15),  # Right parasagittal chain.
    (16, 17),          # Central chain.
)

# Number of samples per EEG that the accumulator is sized for.
n_samples = 10000
# n_samples = 6000  # keeping 30 s to reduce runtime.

new_rows = []
for i, file in enumerate(test_files):
    eeg_full = pd.read_parquet(f'{test_path}/{file}')
    # eeg_full = eeg_full.interpolate(limit_direction='both') # <<<<< Interpolation
    eeg = banana(eeg_full, filter=False)

    # Averaging each chain in the banana montage.
    for channel, cols in enumerate(chain_columns):
        # Accumulate in a float64 buffer, exactly as the per-chain code did.
        coeff = np.zeros((dim1, n_samples))
        for col in cols:
            coeff_, freq = pywt.cwt(eeg.iloc[:, col], scales, waveletname,
                                    sampling_period=sampling_period)
            coeff = coeff + coeff_

        coeff = coeff / len(cols)
        coeff = poolingOverlap(coeff, (1, pool_window), stride=None,
                               method='mean', pad=False)
        # Standardise over the whole pooled scalogram, then keep only the
        # centre window [start2, end2).
        coeff = (coeff - np.mean(coeff)) / np.std(coeff)
        # Slice assignment copies the values into sgrams.
        sgrams[i, :, :, channel] = coeff[:, start2:end2]

    # The id comes from the name of the file being processed, so each row
    # carries its own eeg_id (file names are expected to look like '<id>.<ext>').
    eeg_id = int(file.split('.')[0])
    # Set unknowns to zero, to reuse code.
    new_rows.append([eeg_id, 0, i, 0, 0, 0, 0, 0, 0, 0])

# Append all rows in one step instead of re-allocating items per file.
if new_rows:
    items = np.concatenate([items, np.array(new_rows, dtype=float)])

t2 = time.perf_counter()
print(f'Time for preprocessing {test_size} files: {np.round(t2-t1,3)} s.')

In [None]:
# Restart the timer; the elapsed time is printed after prediction.
t1 = time.perf_counter()

# Names of the six vote columns.
TARGETS = [
    'seizure_vote',
    'lpd_vote',
    'gpd_vote',
    'lrda_vote',
    'grda_vote',
    'other_vote',
]

#
# Test Data generator
#
# for predictions in Kaggle.
# 

class TestDataGenerator(keras.utils.Sequence):
    """Serve preprocessed samples to Keras in batches, inputs only.

    Batches contain no labels, so the generator is meant for prediction.
    """

    def __init__(self, items, data, batch_size=32, n_classes=6, shuffle=False):
        """Store the data and build the initial sample order.

        Args:
            items: one row per sample, laid out as
                [eeg_id, eeg_sub_id, idx of offset, target, ...]; element 2
                of a row is that sample's index along the first axis of
                ``data``.
            data: 4-D array indexed as (sample, dim 1, dim 2, channel).
            batch_size: maximum number of samples per batch.
            n_classes: stored on the instance; not used by this class.
            shuffle: stored on the instance; the order is never shuffled.
        """
        # Let the Keras base class set up its own state.
        super().__init__()
        self.data = data
        self.items = items
        self.dim = (self.data.shape[1], self.data.shape[2])
        # Take the channel count from the data instead of hard-coding it.
        self.n_channels = self.data.shape[3]
        self.batch_size = batch_size
        self.len = self.data.shape[0]
        self.n_classes = n_classes
        self.shuffle = shuffle
        self.on_epoch_end()

    def __len__(self):
        """Return the number of batches per epoch (last one may be partial)."""
        return int(np.ceil(self.len / self.batch_size))

    def __getitem__(self, index):
        """Return batch number ``index`` as an array of inputs."""
        # Positions of the samples belonging to this batch.
        indexes = self.indexes[index*self.batch_size:(index+1)*self.batch_size]

        return self.__data_generation(indexes)

    def get_dim(self):
        """Return (dim 1, dim 2, n_channels), the shape for the input layer."""
        return (self.dim[0], self.dim[1], self.n_channels)

    def on_epoch_end(self):
        """Reset the sample order to 0..len-1."""
        self.indexes = np.arange(self.len)

    def __data_generation(self, indexes):
        """Assemble the batch for the given sample positions.

        Returns:
            Array of shape (len(indexes), dim 1, dim 2, n_channels).
        """
        true_size = len(indexes)
        X = np.empty((true_size, *self.dim, self.n_channels))

        for i, idx in enumerate(indexes):
            item = self.items[idx]
            # print(item)  # Uncomment for testing.
            # Element 2 of the item is the sample's index into data.
            X[i, :, :, :] = self.data[np.int32(item[2]), :, :, :]

        return X

# Keyword arguments forwarded to the generator.
params = dict(batch_size=32, n_classes=6)

# Batch generator over the preprocessed test data.
test_generator = TestDataGenerator(items, sgrams, **params)


In [None]:
# Saved model file and a separate checkpoint file with weights.
model_file = '/kaggle/input/hms-model-cwt-v1/model_cwt_031001_057.keras'
checkpoint_filepath = '/kaggle/input/hms-model-cwt-v1/checkpoint2.model.keras'
# Load the saved model, then load the weights stored in the checkpoint
# file into it.
loaded_model = keras.models.load_model(model_file)
loaded_model.load_weights(checkpoint_filepath)
# Run prediction over every batch produced by the test generator.
y_pred = loaded_model.predict(test_generator)

In [None]:
# Report the time elapsed since t1 was last set, rounded to 3 decimals.
t2 = time.perf_counter()
print(f'Time for predicting {test_size} files: {np.round(t2-t1,3)} s.')