# Set base configurations

In [1]:
seed_value= 42

import random
random.seed(seed_value)

import numpy as np
np.random.seed(seed_value)

import tensorflow as tf
tf.random.set_seed(seed_value)
tf.keras.utils.set_random_seed(seed_value)

In [2]:
import os

os.environ["CUDA_VISIBLE_DEVICES"]="0"
print(tf.__version__)
print(tf.config.list_physical_devices('GPU'))

2.9.0
[PhysicalDevice(name='/physical_device:GPU:0', device_type='GPU')]


# Import

In [3]:
from tensorflow.keras.layers import Input, Dense, Dropout, GlobalAveragePooling2D, Conv2D, Flatten, UpSampling2D
from tensorflow.keras.layers import MaxPooling2D, AveragePooling2D, BatchNormalization, Conv2DTranspose, concatenate
from tensorflow.keras.layers import Conv1D, MaxPooling1D, GRU, Add, ReLU
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping, ReduceLROnPlateau
from tensorflow.keras.models import load_model, Model
from tensorflow.keras.utils import to_categorical
from tensorflow.keras import regularizers, optimizers, losses
from livelossplot.inputs.tf_keras import PlotLossesCallback

# Initialization

In [4]:
num_epochs  = 200
batch_size  = 32
cls_num     = 8
shape       = (1000, 13)
lr          = 0.0001
opt         = optimizers.Adam(learning_rate=lr)
los         = losses.BinaryCrossentropy()
mtr         = ['accuracy']

In [5]:
address = '../Dataset/Main-1000'

files = os.listdir(address)
random.shuffle(files)
l = len(files)

data_train = files[:int(0.8*l)]
data_test = files[int(0.8*l):int(0.9*l)]
data_val = files[int(0.9*l):]

steps_per_train = len(data_train)//batch_size
steps_per_test = len(data_test)//batch_size
steps_per_val = len(data_val)//batch_size

# Data generator

In [6]:
class DataGenerator(tf.keras.utils.Sequence):
    def __init__(self, address, data, shape, batch_size, cls_num, shuffle=True):
        self.address    = address
        self.data       = data
        self.shape      = shape
        self.batch_size = batch_size
        self.cls_num    = cls_num
        self.shuffle    = shuffle

        self.on_epoch_end()


    def __len__(self):
        return int(np.floor(len(self.data) / self.batch_size))


    def __getitem__(self, index):
        indexes       = self.indexes[index*self.batch_size:(index+1)*self.batch_size]
        list_IDs_temp = [self.data[k] for k in indexes]
        x, y          = self.__data_generation(list_IDs_temp)
        return x, y


    def on_epoch_end(self):
        self.indexes = np.arange(len(self.data))
        if self.shuffle == True:
            np.random.shuffle(self.indexes)        
        

    def __data_generation(self, list_IDs_temp):
        x = np.empty((self.batch_size, int(self.shape[0]), int(self.shape[1])))
        y = np.empty((self.batch_size), dtype=int)

        for i, ID in enumerate(list_IDs_temp):
            raw_data = np.load(os.path.join(self.address, ID))
            sig = np.transpose(raw_data['signals'])
            
            for idx, row in enumerate(sig):
                x[i, :, idx] = (row - row.min()) / (row.max() - row.min())
                
            y[i]  = int(str(raw_data['label'])[0])

        return x, to_categorical(y, num_classes=self.cls_num)


train_gen = DataGenerator(address, data_train, shape, batch_size, cls_num)
test_gen  = DataGenerator(address, data_test , shape, batch_size, cls_num)
val_gen   = DataGenerator(address, data_val  , shape, batch_size, cls_num)

In [7]:
test_sample = next(iter(test_gen))
x_sample = test_sample[0]
y_sample = test_sample[1]

print('Label: ', y_sample[0])
print('Signal max: ', x_sample[1].max())
print('Signal min: ', x_sample[1].min())
print('Single signal shape: ', x_sample[1][:, 0].shape)

Label:  [0. 0. 0. 0. 0. 1. 0. 0.]
Signal max:  1.0
Signal min:  0.0
Single signal shape:  (1000,)


# Model

In [8]:
def get_model(myshape, cls_num):
  inputs = Input(shape=myshape)
  
  x = Conv1D(filters=128, kernel_size=8, strides=1, activation='relu')(inputs)
  x = MaxPooling1D(2)(x)
  x = BatchNormalization()(x)
  x = Dropout(0.2)(x)
  
  x = Conv1D(filters=128, kernel_size=8, strides=1, activation='relu')(x)
  x = MaxPooling1D(2)(x)
  x = BatchNormalization()(x)
  x = Dropout(0.2)(x)
  
  temporal_1 = GRU(128, return_sequences=True)(x)
  temporal_1 = GRU(64, return_sequences=False)(temporal_1)
  
  temporal_2 = GRU(64, return_sequences=False)(x)
  
  adding = Add()([temporal_1, temporal_2])
  adding = ReLU()(adding)
  adding = Dropout(0.2)(adding)
  adding = Flatten()(adding)
  
  outputs = Dense(cls_num, activation='softmax')(adding)

  return Model(inputs=inputs, outputs=outputs)

model = get_model(shape, cls_num)
model.compile(optimizer=opt, loss=los, metrics=mtr)
model.summary()

Model: "model"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_1 (InputLayer)           [(None, 1000, 13)]   0           []                               
                                                                                                  
 conv1d (Conv1D)                (None, 993, 128)     13440       ['input_1[0][0]']                
                                                                                                  
 max_pooling1d (MaxPooling1D)   (None, 496, 128)     0           ['conv1d[0][0]']                 
                                                                                                  
 batch_normalization (BatchNorm  (None, 496, 128)    512         ['max_pooling1d[0][0]']          
 alization)                                                                                   

# Train

In [9]:
def callback():
  mymonitor = 'val_loss'
  mymode    = 'min'

  main_chk  = ModelCheckpoint(filepath='checkpoints/base', monitor=mymonitor, mode=mymode, verbose=1, save_best_only=True)
  early_st  = EarlyStopping(monitor=mymonitor, mode=mymode, patience=10, verbose=1)
  rduce_lr  = ReduceLROnPlateau(monitor=mymonitor, mode=mymode, factor=0.5, patience=5, verbose=1, min_lr=0.00001)
  tr_plot   = PlotLossesCallback()

  return [main_chk, early_st, rduce_lr, tr_plot]

history = model.fit(train_gen,
                    validation_data=val_gen,
                    batch_size=batch_size,
                    epochs=num_epochs,
                    verbose=1,
                    callbacks=callback(),
                    steps_per_epoch  = steps_per_train,
                    validation_steps = steps_per_val)

accuracy
	training         	 (min:    0.122, max:    0.915, cur:    0.914)
	validation       	 (min:    0.116, max:    0.728, cur:    0.696)
Loss
	training         	 (min:    0.076, max:    0.585, cur:    0.077)
	validation       	 (min:    0.156, max:    0.629, cur:    0.169)
lr
	lr               	 (min:    0.000, max:    0.000, cur:    0.000)
Epoch 118: early stopping


# Evaluation

In [10]:
testmodel = load_model('checkpoints/base', compile=True)
tst_loss , tst_acc = testmodel.evaluate(test_gen, steps = steps_per_test)

