In [None]:
# tensorflow version 2.10
# cudnn version 8.2.0.51
# python version 3.8

# These environment variables must be set BEFORE TensorFlow is imported:
# TF_CPP_MIN_LOG_LEVEL is consulted when the native library is loaded and
# CUDA_VISIBLE_DEVICES when CUDA is initialised (which the GPU listing below
# triggers). Setting them afterwards, as this cell used to, comes too late.
import os

os.environ["CUDA_VISIBLE_DEVICES"]="0,1"
os.environ['TF_CPP_MIN_LOG_LEVEL']='3'

import tensorflow as tf
from tensorflow.keras import Model
from keras.regularizers import l2
from tensorflow.keras.layers import Conv3D
from tensorflow.keras.layers import BatchNormalization
from tensorflow.keras.layers import MaxPool3D
from tensorflow.keras.layers import Flatten
from tensorflow.keras.layers import Dense
from tensorflow.keras.layers import Dropout
from tensorflow.keras.layers import Activation
from tensorflow.keras.layers import Input
from tensorflow.keras.layers import TimeDistributed
from tensorflow.keras.optimizers import SGD
from tensorflow.keras.applications import InceptionV3
# NOTE(review): star import hides which names this notebook takes from the
# project module (at least `video_generator`); prefer explicit names.
from video_data_generator import *
from keras import backend as K
import matplotlib.pyplot as plt
# Imported explicitly because later cells use `np` directly; previously it was
# only available if the star import above happened to re-export it.
import numpy as np
import pandas as pd

# Sanity check: should list only the GPUs selected via CUDA_VISIBLE_DEVICES.
print(tf.config.list_physical_devices('GPU'))

In [None]:
def recall_m(y_true, y_pred):
    """Recall over the tensors passed in: true positives / actual positives.

    Both tensors are clipped to [0, 1] and rounded before counting, and
    ``K.epsilon()`` is added to the denominator to avoid division by zero.
    """
    hit_mask = K.round(K.clip(y_true * y_pred, 0, 1))
    actual_mask = K.round(K.clip(y_true, 0, 1))
    return K.sum(hit_mask) / (K.sum(actual_mask) + K.epsilon())

def precision_m(y_true, y_pred):
    """Precision over the tensors passed in: true positives / predicted positives.

    Both tensors are clipped to [0, 1] and rounded before counting, and
    ``K.epsilon()`` is added to the denominator to avoid division by zero.
    """
    hit_mask = K.round(K.clip(y_true * y_pred, 0, 1))
    predicted_mask = K.round(K.clip(y_pred, 0, 1))
    return K.sum(hit_mask) / (K.sum(predicted_mask) + K.epsilon())

def f1_m(y_true, y_pred):
    """F1 score: harmonic mean of ``precision_m`` and ``recall_m``.

    ``K.epsilon()`` in the denominator keeps the result finite when both
    precision and recall are zero.
    """
    p = precision_m(y_true, y_pred)
    r = recall_m(y_true, y_pred)
    return 2*((p*r)/(p+r+K.epsilon()))

def scheduler(epoch, lr):
    """Learning-rate schedule for ``tf.keras.callbacks.LearningRateScheduler``.

    The rate is left unchanged on epochs that are multiples of 10
    (0, 10, 20, ...) and multiplied by ``exp(-0.1)`` on every other epoch.

    NOTE(review): if the intent was "decay once every 10 epochs", the
    condition is inverted -- confirm with the author before changing it.

    Args:
        epoch: Zero-based epoch index supplied by the callback.
        lr: Current learning rate.

    Returns:
        The learning rate to use for this epoch, as a plain Python float
        (rather than a ``tf.Tensor``) so the callback accepts it regardless
        of Keras version.
    """
    import math

    current = float(lr)
    if epoch % 10 == 0:
        return current
    return current * math.exp(-0.1)

In [None]:
# Dataset locations, relative to the notebook's working directory.
test_dir = '../../seizure_dataset/360p_splited/test'
train_dir = '../../seizure_dataset/360p_splited/train'

# Geometry of one input clip: `length` frames of height x width x channel.
width = 112
height = 112
channel = 3
batch_size = 32
length = 16
# Presumably one entry per class under train_dir -- TODO confirm there are no
# stray files. os.listdir() returns entries in arbitrary order, so sort to
# make the list deterministic.
classes = sorted(os.listdir(train_dir))

input_shape = [length, height, width, channel]

train_gen = video_generator(train_dir, input_shape, batch_size)

# To avoid leaking RAM, validate on a single fixed batch drawn once from the
# test set (third generator argument is 64 here) instead of streaming the
# generator during training.
test_gen = video_generator(test_dir, input_shape, 64)
test_data = next(test_gen)

In [None]:
# Build and compile the network under a MirroredStrategy scope so that the
# variables are mirrored across the visible GPUs.
strategy = tf.distribute.MirroredStrategy()
with strategy.scope():
    weight_decay = 0.005

    input_ = Input(shape=input_shape)
    base_cnn_model = InceptionV3(include_top=False)
    # Freeze the InceptionV3 backbone through its own handle rather than via
    # model.layers[1], which silently targets the wrong layer if the graph
    # changes.
    base_cnn_model.trainable = False

    # Apply the frozen backbone to every frame, then mix the per-frame
    # feature maps across time with 3D convolutions.
    temporal_analysis = TimeDistributed(base_cnn_model)(input_)
    # NOTE(review): both Conv3D layers have no activation, so they are
    # stacked linear maps -- confirm this is intended.
    conv3d_analysis = Conv3D(256, (3, 3, 3), padding = "same",
                            kernel_regularizer=l2(weight_decay))(temporal_analysis)
    conv3d_analysis = Conv3D(256, (3, 3, 3), padding = "same",
                            kernel_regularizer=l2(weight_decay))(conv3d_analysis)
    pooling = MaxPool3D((2, 2, 2), strides=(2, 2, 2), padding='same')(conv3d_analysis)
    flat = Flatten()(pooling)

    dense= Dense(512, activation="relu",
                   kernel_regularizer=l2(weight_decay))(flat)
    drop = Dropout(0.5)(dense)
    output = Dense(len(classes), activation="softmax",
                   kernel_regularizer=l2(weight_decay))(drop)

    model = Model(input_, output)
    # Printed after freezing so the trainable / non-trainable parameter
    # counts reflect what will actually be trained.
    model.summary()

    lr = 3e-4
    opt = tf.keras.optimizers.Adam(learning_rate=lr)
    model.compile(loss="categorical_crossentropy", optimizer=opt,
                  metrics=[tf.keras.metrics.CategoricalAccuracy(),
                           f1_m, precision_m, recall_m]
                  )

    # The checkpoint callback writes into models/; create the directory up
    # front so the first save does not fail on a missing directory.
    os.makedirs('models', exist_ok=True)
    call = [tf.keras.callbacks.LearningRateScheduler(scheduler, verbose=1),
           tf.keras.callbacks.ModelCheckpoint('models/best_model.h5',monitor= "val_f1_m",
                                              save_best_only = True,mode = "max"),]

In [None]:
# Train for 100 epochs of 200 generator steps each, validating after every
# epoch on the fixed `test_data` batch. Batches are prefetched by 20 worker
# processes into a queue of up to 200 entries.
history = model.fit(
    train_gen,
    validation_data=test_data,
    validation_freq=1,
    epochs=100,
    steps_per_epoch=200,
    callbacks=call,
    max_queue_size=200,
    workers=20,
    use_multiprocessing=True,
)

In [None]:
# Persist the final-epoch model; make sure the target directory exists first.
os.makedirs('models', exist_ok=True)
model.save('models/IncepV3_Conv3D_oct_26.h5')

# One panel per metric family. Each panel lists the (history key, legend
# label) pairs to draw. The last panel shows precision AND recall, matching
# its title (recall was previously missing from the plot).
fig, axes = plt.subplots(2, 2, figsize=(15, 15))
panels = [
    (axes[0, 0], 'model accuracy', 'accuracy',
     [('categorical_accuracy', 'train'),
      ('val_categorical_accuracy', 'val')]),
    (axes[0, 1], 'model loss', 'loss',
     [('loss', 'train'),
      ('val_loss', 'val')]),
    (axes[1, 0], 'model f-1 score', 'f-1 score',
     [('f1_m', 'train'),
      ('val_f1_m', 'val')]),
    (axes[1, 1], 'model precision/recall', 'precision/recall',
     [('precision_m', 'train precision'),
      ('val_precision_m', 'val precision'),
      ('recall_m', 'train recall'),
      ('val_recall_m', 'val recall')]),
]

for ax, title, ylabel, curves in panels:
    for key, label in curves:
        ax.plot(history.history[key], label=label)
    ax.set_title(title)
    ax.set_ylabel(ylabel)
    ax.set_xlabel('epoch')
    ax.legend(loc='upper left')
# fig.savefig("Training_loss.png")


## Validation of model performance

In [None]:
# Estimate accuracy on a few freshly drawn test batches and collect the
# per-sample predicted / true class indices for the cells below.
import numpy as np
from tqdm import tqdm

eval_batch_size = 64
test_gen = video_generator(test_dir, input_shape, eval_batch_size)

all_supports = []   # per-batch accuracy (fraction of correct predictions)
pred = []           # predicted class index for every evaluated sample
true = []           # ground-truth class index for every evaluated sample

for _ in tqdm(range(5)):
    temp = next(test_gen)          # temp[0]: inputs, temp[1]: one-hot labels
    res = model.predict(temp[0])

    # Class index = position of the largest score along the class axis.
    batch_pred = np.asarray(res).argmax(axis=1)
    batch_true = np.asarray(temp[1]).argmax(axis=1)
    pred.extend(batch_pred)
    true.extend(batch_true)

    support = int((batch_pred == batch_true).sum())
    # Divide by the number of samples actually returned rather than a
    # hard-coded 64, so a short batch does not skew the accuracy.
    all_supports.append(support / len(res))

print('average accuracy over {} batches: {:.4f}'.format(
    len(all_supports), sum(all_supports) / len(all_supports)))

In [None]:
from sklearn.metrics import precision_recall_fscore_support

# Micro-averaged scores over every sample collected by the evaluation loop;
# the tuple is left as the cell's last expression so it is displayed.
micro_scores = precision_recall_fscore_support(true, pred, average='micro')
micro_scores

In [None]:
# Scatter of true vs. predicted class index: correct predictions are green
# circles on the exact grid points, wrong ones are red crosses.
import numpy as np

true_idx = np.asarray(true)
pred_idx = np.asarray(pred)
is_correct = true_idx == pred_idx
wrong = int((~is_correct).sum())

# Each wrong point gets one random offset, applied to both coordinates (as
# before), so overlapping mistakes spread out and stay distinguishable.
jitter = np.random.uniform(-0.1, 0.1, size=wrong)

# Two scatter calls in total instead of one call per sample.
plt.scatter(true_idx[is_correct], pred_idx[is_correct],
            color='green', marker='o')
plt.scatter(true_idx[~is_correct] + jitter, pred_idx[~is_correct] + jitter,
            color='red', marker='x')

# Guard against an empty prediction list so the title never divides by zero.
wrong_pct = 100*wrong/len(pred) if len(pred) else 0.0
plt.title('Wrong pred: {}/{} ({}%)'.format(wrong, len(pred), wrong_pct))
plt.xlabel('true class index')
plt.ylabel('predicted class index')
plt.show()