In [None]:
import numpy as np
import random
import logging
import tensorflow as tf

# Emit INFO-level (and above) log records for the rest of the notebook.
logging.basicConfig(level=logging.INFO)

from scripts.constants import RANDOM_SEED

# Seed Python's `random`, TensorFlow and NumPy (in that order) with the shared project seed.
for seed_fn in (random.seed, tf.random.set_seed, np.random.seed):
    seed_fn(RANDOM_SEED)

In [None]:
import logging
import os
import h5py
import optuna
import matplotlib.pyplot as plt
from tensorflow.keras.callbacks import EarlyStopping
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.models import Model
from tensorflow.keras.layers import LSTM, Dropout, Dense, Input, Bidirectional

In [None]:
# CUDA test: log the GPU devices TensorFlow reports as physically available.
gpu_devices = tf.config.list_physical_devices('GPU')
logging.info("TF GPU device list: %s", gpu_devices)

In [None]:
# Experiment selector read by the data-loading, evaluation and reporting cells.
# Supported values are 'cross' and 'intra'; the data-loading cell raises on anything else.
TYPE = 'cross'

In [None]:
def _read_split(h5_file, split):
    """Read the `data_1d`, `meshes` and `labels` datasets of `split` fully into memory.

    Returns a 3-tuple of arrays in that order.
    """
    return tuple(h5_file[f'{split}/{key}'][:] for key in ('data_1d', 'meshes', 'labels'))


# Reject unknown experiment types before touching any file.
if TYPE not in ('cross', 'intra'):
    raise Exception('Invalid type')

if TYPE == 'cross':
    # Train on the cross train split and keep the three cross test sets separately.
    cross_hdf5_file_path = os.path.join('..', 'data', 'processed', 'cross.h5')
    with h5py.File(cross_hdf5_file_path, 'r') as h5_file:
        cross_train_1d, cross_train_mesh, cross_train_label = _read_split(h5_file, 'train')

        cross_test1_1d, cross_test1_mesh, cross_test1_label = _read_split(h5_file, 'test1')
        print(cross_test1_1d.shape)

        cross_test2_1d, cross_test2_mesh, cross_test2_label = _read_split(h5_file, 'test2')
        print(cross_test2_1d.shape)

        cross_test3_1d, cross_test3_mesh, cross_test3_label = _read_split(h5_file, 'test3')
        print(cross_test3_1d.shape)

    # The whole intra file (train + val + test, stacked along axis 0) serves as validation data.
    intra_hdf5_file_path = os.path.join('..', 'data', 'processed', 'intra.h5')
    with h5py.File(intra_hdf5_file_path, 'r') as h5_file:
        intra_parts = [_read_split(h5_file, split) for split in ('train', 'val', 'test')]
    intra_combi_1d, intra_combi_mesh, intra_combi_label = (
        np.concatenate(group, axis=0) for group in zip(*intra_parts)
    )

    X_train, Y_train = cross_train_1d, cross_train_label
    X_val, Y_val = intra_combi_1d, intra_combi_label

    # Test sets are stacked in the order test1, test2, test3.
    X_test = np.concatenate([cross_test1_1d, cross_test2_1d, cross_test3_1d], axis=0)
    Y_test = np.concatenate([cross_test1_label, cross_test2_label, cross_test3_label], axis=0)
else:
    # TYPE == 'intra': use the intra file's own train / val / test splits.
    intra_hdf5_file_path = os.path.join('..', 'data', 'processed', 'intra.h5')
    with h5py.File(intra_hdf5_file_path, 'r') as h5_file:
        intra_train_1d, intra_train_mesh, intra_train_label = _read_split(h5_file, 'train')
        intra_val_1d, intra_val_mesh, intra_val_label = _read_split(h5_file, 'val')
        intra_test_1d, intra_test_mesh, intra_test_label = _read_split(h5_file, 'test')

    X_train, Y_train = intra_train_1d, intra_train_label
    X_val, Y_val = intra_val_1d, intra_val_label
    X_test, Y_test = intra_test_1d, intra_test_label

In [None]:
class BiLSTM:
    """Two-layer bidirectional LSTM classifier built with the Keras functional API.

    The network is: input -> Bi-LSTM (sequence output) -> Bi-LSTM (last output)
    -> Dense -> Dropout -> softmax Dense. The built model is available as
    ``self.model``.
    """

    def __init__(self, window_size, lstm1_cells, lstm2_cells, output_dense1_nodes, output_dense1_activation, depth,
                 output_dropout_ratio, number_classes=4, num_sensors=248):
        """Store the hyper-parameters and build the Keras model.

        Args:
            window_size: Number of time steps in each input sequence.
            lstm1_cells: Units of the first LSTM (wrapped in ``Bidirectional``).
            lstm2_cells: Units of the second LSTM (wrapped in ``Bidirectional``).
            output_dense1_nodes: Units of the hidden dense layer.
            output_dense1_activation: Activation of the hidden dense layer.
            depth: Accepted so that parameter dictionaries containing it can be
                unpacked into the constructor. It is stored but does NOT change
                the architecture: ``get_model`` always builds two Bi-LSTM layers.
            output_dropout_ratio: Dropout rate applied after the hidden dense layer.
            number_classes: Size of the softmax output layer (default 4).
            num_sensors: Number of features per time step (default 248).
        """
        self.number_classes = number_classes
        self.num_sensors = num_sensors

        self.window_size = window_size
        self.depth = depth

        self.lstm1_cells = lstm1_cells
        self.lstm2_cells = lstm2_cells

        self.output_dense1_nodes = output_dense1_nodes
        self.output_dense1_activation = output_dense1_activation
        self.output_dropout_ratio = output_dropout_ratio

        self.model = self.get_model()

    def get_model(self):
        """Build and return the (uncompiled) Keras ``Model`` for the stored hyper-parameters."""
        # Input: one sequence of `window_size` steps with `num_sensors` features each.
        input_layer = Input(shape=(self.window_size, self.num_sensors), name="input_sequence")

        # Bi-LSTM: the first layer returns the full sequence so the second can consume it;
        # the second returns only its final output.
        lstm1 = Bidirectional(LSTM(self.lstm1_cells, return_sequences=True, name="lstm1"))(input_layer)
        lstm2 = Bidirectional(LSTM(self.lstm2_cells, return_sequences=False, name="lstm2"))(lstm1)

        # Output head: dense -> dropout -> softmax over the classes.
        output_dense1 = Dense(self.output_dense1_nodes, activation=self.output_dense1_activation, name="output_dense1")(lstm2)
        output_dropout = Dropout(self.output_dropout_ratio, name="output_dropout")(output_dense1)
        output_dense2 = Dense(self.number_classes, activation="softmax", name="output_dense2")(output_dropout)

        return Model(inputs=input_layer, outputs=output_dense2)

In [None]:
# Locked: hyper-parameters held fixed and passed to BiLSTM alongside the tuned ones.
locked_params = dict(window_size=32, depth=1)

In [None]:
# Name of the PostgreSQL database holding the tuning study. It is pinned to the 'cross'
# study on purpose; to follow the TYPE setting instead use f'tuning_bilstm_type_{TYPE}'.lower().
study_db_name = 'tuning_bilstm_type_cross'
study_instance = f'{study_db_name}_intra_val_TEST'

# The database password must not live in the notebook: read it from the environment.
# The value is inserted into the URL as-is, so percent-encode any reserved characters.
db_password = os.environ.get('OPTUNA_DB_PASSWORD')
if not db_password:
    raise RuntimeError(
        "Set the OPTUNA_DB_PASSWORD environment variable to the PostgreSQL password "
        "before running this cell."
    )
db_url = f'postgresql://postgres:{db_password}@localhost:5432/{study_db_name}'

# load_if_exists=True attaches to the stored study when one with this name already exists.
study = optuna.create_study(study_name=study_instance, storage=db_url, load_if_exists=True, direction='minimize')

In [None]:
# Report the best trial of the study: objective value, tuned parameters and user attributes.
print("Best trial:")
trial = study.best_trial
best_params, best_user_atrr = trial.params, trial.user_attrs

summary_lines = (
    f"  Best loss: {trial.value}",
    f"  Best params: {best_params}",
    f"\n  Best user attrs: {best_user_atrr}",
)
print("\n".join(summary_lines))

In [None]:
# Training-only settings are consumed by compile()/fit(), not by the BiLSTM constructor,
# so they are stripped from the tuned parameters before building the architecture.
ignored_keys = ['learning_rate', 'decay', 'batch_size']
filtered_params = {
    name: value
    for name, value in best_params.items()
    if name not in ignored_keys
}

# Build one model from tuned + locked parameters and show its layer summary.
bilstm_object = BiLSTM(**filtered_params, **locked_params)
bilstm_model = bilstm_object.model

bilstm_model.summary()

In [None]:
window_size = locked_params['window_size']


def _to_model_layout(samples):
    """Move the last axis of `samples` to position 1, then append a trailing singleton axis."""
    return np.expand_dims(np.moveaxis(samples, -1, 1), -1)


# NOTE: this rebinds X_train / X_val / X_test, so running the cell a second time
# applies the axis move and the extra axis again.
X_train, X_val, X_test = (_to_model_layout(split) for split in (X_train, X_val, X_test))

In [None]:
# Show the shape of every feature/label array that feeds training and evaluation.
for array_name, array in (
    ("X_train", X_train),
    ("Y_train", Y_train),
    ("X_val", X_val),
    ("Y_val", Y_val),
    ("X_test", X_test),
    ("Y_test", Y_test),
):
    print(f"{array_name}.shape = {array.shape!r}")

In [None]:
from tensorflow.keras.metrics import Precision, Recall
from tensorflow_addons.metrics import F1Score  # deprecated but not yet in conda tf version

batch_size = best_params['batch_size']

# Number of independent training runs; each run builds and trains a fresh model.
NUM_LOOPS = 2
histories = []  # one Keras History per run
models = []     # the trained model of each run
escbs = []      # the EarlyStopping callback of each run (later read for its best_epoch)

for _ in range(NUM_LOOPS):
    bilstm_object = BiLSTM(**filtered_params, **locked_params)
    bilstm_model = bilstm_object.model

    # Fresh metric objects per run so no state is shared between models. The explicit
    # names fix the keys under which the values appear in history.history.
    metrics = [
        "accuracy",
        Precision(name='precision'),
        Recall(name='recall'),
        # Take the class count from the model instead of repeating the literal.
        F1Score(average='macro', num_classes=bilstm_object.number_classes),
    ]

    # Stop after 3 epochs without val_loss improvement and roll back to the best weights.
    escb = EarlyStopping(monitor='val_loss', mode='min', patience=3, restore_best_weights=True, verbose=True)
    bilstm_model.compile(
        optimizer=Adam(learning_rate=best_params['learning_rate'], decay=best_params['decay']),
        loss="categorical_crossentropy",
        metrics=metrics,
    )

    history = bilstm_model.fit(
        X_train,
        Y_train,
        batch_size=batch_size,
        epochs=50,
        callbacks=[escb],  # fit() documents `callbacks` as a list of callback instances
        validation_data=(X_val, Y_val),
        shuffle=True,
    )
    escbs.append(escb)
    histories.append(history)
    models.append(bilstm_model)

In [None]:
# Metrics tracked per run; the validation counterpart of each lives under the 'val_' prefix.
tracked_metrics = ('accuracy', 'precision', 'recall', 'f1_score', 'loss')
val_at_best = {metric: [] for metric in tracked_metrics}
train_at_best = {metric: [] for metric in tracked_metrics}

# For every run, pick the metric values at the epoch the EarlyStopping callback recorded as best.
for history, escb in zip(histories, escbs):
    best_epoch = escb.best_epoch
    for metric in tracked_metrics:
        val_at_best[metric].append(history.history[f'val_{metric}'][best_epoch])
        train_at_best[metric].append(history.history[metric][best_epoch])

# Average each metric across runs.
val_avg = {metric: np.mean(val_at_best[metric]) for metric in tracked_metrics}
train_avg = {metric: np.mean(train_at_best[metric]) for metric in tracked_metrics}

# Expose the per-run lists and the averages under individual names.
best_epoch_accuracy, best_epoch_precision, best_epoch_recall, best_epoch_f1, best_epoch_loss = (
    val_at_best[metric] for metric in tracked_metrics
)
train_best_epoch_accuracy, train_best_epoch_precision, train_best_epoch_recall, train_best_epoch_f1, train_best_epoch_loss = (
    train_at_best[metric] for metric in tracked_metrics
)
final_avg_accuracy, final_avg_precision, final_avg_recall, final_avg_f1, final_avg_loss = (
    val_avg[metric] for metric in tracked_metrics
)
train_final_avg_accuracy, train_final_avg_precision, train_final_avg_recall, train_final_avg_f1, train_final_avg_loss = (
    train_avg[metric] for metric in tracked_metrics
)

# Report order (loss first) differs from the storage order above.
report_rows = (
    ('Loss', 'loss'),
    ('Accuracy', 'accuracy'),
    ('Precision', 'precision'),
    ('Recall', 'recall'),
    ('F1 Score', 'f1_score'),
)

print("Validation Metrics at Best Epochs:")
for label, metric in report_rows:
    print(f"Average Validation {label}: {round(val_avg[metric], 4)}")
print()

print("Training Metrics at Best Epochs:")
for label, metric in report_rows:
    print(f"Average Training {label}: {round(train_avg[metric], 4)}")

In [None]:
# Plot the training and validation curves of every run: accuracy first, then loss.
for i, history in enumerate(histories):
    print(f"HISTORY {i}")
    for metric in ('accuracy', 'loss'):
        # Explicit figure/axes per plot so each curve pair is self-contained.
        fig, ax = plt.subplots()
        ax.plot(history.history[metric])
        ax.plot(history.history[f'val_{metric}'])
        ax.set_title(f'model {metric}')
        ax.set_ylabel(metric)
        ax.set_xlabel('epoch')
        ax.legend(['train', 'val'], loc='upper left')
        plt.show()
        # Release the figure so repeated runs do not accumulate open figures.
        plt.close(fig)

In [None]:
# One list of `model.evaluate` outputs per test set; each list gets one entry per trained model.
test1_results = []
test2_results = []
test3_results = []
intra_results = []

if TYPE == 'cross':
    # X_test / Y_test are test1, test2 and test3 stacked along axis 0. Recover the three
    # parts from the actual sample count of each set instead of assuming equal sizes.
    split_points = np.cumsum([cross_test1_1d.shape[0], cross_test2_1d.shape[0]])
    X_test_parts = np.split(X_test, split_points, axis=0)
    Y_test_parts = np.split(Y_test, split_points, axis=0)
    cross_result_lists = (test1_results, test2_results, test3_results)

for model in models:
    if TYPE == 'cross':
        for results, X_part, Y_part in zip(cross_result_lists, X_test_parts, Y_test_parts):
            results.append(model.evaluate(
                X_part,
                Y_part,
                batch_size=batch_size,
            ))
    elif TYPE == 'intra':
        intra_results.append(model.evaluate(
            X_test,
            Y_test,
            batch_size=batch_size,
        ))

In [None]:
def _print_result_summary(results):
    """Print the metric names, then mean / std / max / min of `results` taken over axis 0."""
    print('     ', bilstm_model.metrics_names)
    for label, reducer in (('mean: ', np.mean), ('stdv: ', np.std), ('max:  ', np.max), ('min:  ', np.min)):
        print(f'{label}{reducer(results, axis=0)}')


if TYPE == 'cross':
    # One summary per cross test set, numbered from 1.
    for test_number, test_results in enumerate([test1_results, test2_results, test3_results], start=1):
        print('\ntest result: ', test_number)
        _print_result_summary(test_results)
else:
    _print_result_summary(intra_results)