In [None]:
import tensorflow as tf
from tensorflow.keras.models import load_model
import sys
sys.path.insert(0,'../')

import flammkuchen as fl
import os
from neural_networks.src.dataloader import DataLoader
from utils.params import Params
import matplotlib.pyplot as plt
import sounddevice as sd
import numpy as np
import seaborn as sns
from skimage.metrics import structural_similarity as ssim

%load_ext autoreload
%autoreload 2
%matplotlib inline

## Specify paths

In [None]:
PATH_DATA = r"../../Data/coughvid_database/"
PATH_GA_MODEL_NSA = r"../neural_networks/GeneticAlgorithm/Results_GA/GA_20211115-105541_objective2/best_models_per_generation/gen17_Fitness2.4656.hdf5"

PATH_PARAMS = r"../neural_networks/params.json"

### Load Data

In [None]:
params = Params(PATH_PARAMS)

data_loader = DataLoader(params=params, nb_classes=4)
coughs_coughDB, coughs_signals = data_loader.get_cough_database_data(PATH_DATA, n_samples=5000)

## Load AE model and convert Audio to NSA spectrograms

In [None]:
model_ae = load_model("../neural_networks/autoencoder_audio2nsa.hdf5")

coughs_coughDB_ae_nsa = model_ae.predict(coughs_coughDB)

In [None]:
n = 8

plt.title("CoughDB original Audio")
plt.imshow(coughs_coughDB[n])
plt.show()
plt.title("CoughDB generated NSA")
plt.imshow(coughs_coughDB_ae_nsa[n])
plt.show()

## Load NSA model and predict generated NSA data

In [None]:
def plot_predictions(predictions):
    fig, ax = plt.subplots(2, 2)
    ax = ax.flatten()
    
    events = ['Cough', 'Dry swallow', 'Throat clear', 'No event']
    for i, event in enumerate(events): 
        ax[i].hist(predictions[..., i], bins=5, histtype='barstacked', density=True);
        ax[i].set_xlim([0, 1])
        ax[i].set_title(event)
        
    plt.tight_layout()
    plt.show()
    
    print(f"Total number of samples: {len(predictions)}")
    counts = np.zeros(4)
    for pred in predictions:
        counts[np.where(np.max(pred)==pred)] += 1
        
    
    for count, event in zip(counts, events):
        print(f"{event}: {int(count)}")

In [None]:
model_ga = load_model(PATH_GA_MODEL_NSA, custom_objects={'leaky_relu': tf.nn.leaky_relu, 'relu6': tf.nn.relu6})

In [None]:
model_ga.summary()

In [None]:
# Predict
predictions_pre_fine_tuning = model_ga.predict(coughs_coughDB_ae_nsa)
plot_predictions(predictions_pre_fine_tuning)

## Transfer Learning (Fine tuning) the model

In [None]:
path = 'experiments_coughDB/'
if not os.path.isdir(path):
    os.makedirs(path)

N = [50, 100, 200, 500, 1000]
for n in N:
    os.makedirs(path + str(n), exist_ok=True)
    
    model_ga = load_model(PATH_GA_MODEL_NSA)
    fine_tuned_model = model_ga
    
    # freeze layers
    for layer in fine_tuned_model.layers[:-1]:
        layer.trainable = False   
        
    # define fine tuning training data
    X_fine_tuning = coughs_coughDB_ae_nsa[:n]  
    Y_fine_tuning = np.array([[1, 0, 0, 0] for _ in range(len(X_fine_tuning))])
    
    mc = tf.keras.callbacks.ModelCheckpoint(path + str(n) + "/model_{epoch:02d}.h5", verbose=True)
    
    # train the last unfreezed layer for some epochs
    fine_tuned_model.compile(optimizer=tf.keras.optimizers.Adam(0.001), loss=tf.keras.losses.CategoricalCrossentropy(), metrics=["accuracy"])
    history_1 = fine_tuned_model.fit(X_fine_tuning, Y_fine_tuning, shuffle=True, epochs=10, callbacks=[mc], verbose=False)
    
    # unfreeze all layers
    for layer in fine_tuned_model.layers:
        layer.trainable = True
        
    # train the whole network again
    history_2 = fine_tuned_model.fit(X_fine_tuning, Y_fine_tuning, shuffle=True, epochs=75, callbacks=[mc], initial_epoch=10, verbose=False)

In [None]:
params.signal_type = "NSA"
data_loader_nsa = DataLoader(params=params, nb_classes=4)
#(X_train_old, Y_train_old), (X_val_old, Y_val_old) = data_loader.get_train_val_data()
X_test_old, Y_test_old = data_loader_nsa.get_test_data()

In [None]:
all_test_accs_old = []
all_test_accs_coughDB = []

N = [50]
for n in N:
    test_accs_old = []
    test_accs_coughDB = []
    for file in os.listdir(path + str(n)):
        if "h5" in file:
            print(file)
            model = load_model(path + str(n) + "/" + file, custom_objects={'leaky_relu': tf.nn.leaky_relu, 'relu6': tf.nn.relu6})
            
            _, test_acc_old = model.evaluate(X_test_old[..., None], Y_test_old)
            _, test_acc_coughDB = model.evaluate(coughs_coughDB_ae_nsa[n::], np.array([[1, 0, 0, 0] for _ in range(len(coughs_coughDB_ae_nsa[n::]))]))
            test_accs_old.append(test_acc_old)
            test_accs_coughDB.append(test_acc_coughDB)
            
    all_test_accs_old.append(test_accs_old)
    all_test_accs_coughDB.append(test_accs_coughDB)
            
    plt.plot(test_accs_coughDB, label="coughDB")
    plt.plot(test_accs_old, label="Rainbow passage")
    plt.xlabel("Epochs")
    plt.ylabel("Test Accuracy")
    plt.legend()
    plt.title(str(n))
    plt.show()

In [None]:
n = 50
best_fine_tuned_model = load_model("experiments_coughDB/" + str(n) + "/model_48.h5", custom_objects={'leaky_relu': tf.nn.leaky_relu, 'relu6': tf.nn.relu6})

predictions_post_fine_tuning = best_fine_tuned_model.predict(coughs_coughDB_ae_nsa[n::])
plot_predictions(predictions_post_fine_tuning)

## Save GA model performance results on the COUGHVID dataset

In [None]:
d = dict(predictions_pre_fine_tuning=predictions_pre_fine_tuning, 
         predictions_post_fine_tuning=predictions_post_fine_tuning, 
         test_accs_old=all_test_accs_old, 
         test_accs_coughDB=all_test_accs_coughDB)

fl.save("results_fine_tuning_coughDB.vfp", d)