In [None]:
import numpy as np
import matplotlib.pyplot as plt
from src.data_util import load_augmented_example
from src.evaluation import DAP_SAP_MAP_kde, hist_AP, plot_SAP_MAP

import tensorflow as tf

from sklearn.model_selection import train_test_split
import glob
import json
import os

import seaborn as sns
import pandas as pd

from sklearn.decomposition import PCA
from sklearn.preprocessing import StandardScaler

## Load training and validation data

![](images/hist_pig_samples_aug_v2.png)

The model is tuned using the augmented prepared data (hatched orange/blue) and the prepared original data (blue) and is validated using the validation data (green).

In [None]:
pigs = ["P_{0:02d}_PulHyp".format(i) for i in range(1, 11)]
print(pigs)

load_path = "/data/PulHypStudie_Check_npz_v2_SNR20/"

In [None]:
X_train, y_train, clrs_pig_train = load_augmented_example(
    load_path, pigs, sample_skip=500, load_samples="upwards"
)

In [None]:
X_valid, y_valid, clrs_pig_valid = load_augmented_example(
    load_path, pigs, sample_skip=500, load_samples="downwards"
)

In [None]:
print(
    X_train.shape,
    X_valid.shape,
    y_train.shape,
    y_valid.shape,
    clrs_pig_train.shape,
    clrs_pig_valid.shape,
)

**PCA**

In [None]:
scaler = StandardScaler()
X_scaled = scaler.fit_transform(
    X_valid.reshape(X_valid.shape[0], X_valid.shape[1] * X_valid.shape[2])
)

pca = PCA(n_components=2)
X_pca = pca.fit_transform(X_scaled)

In [None]:
pigs_int = [int(c[0][4:6]) for c in clrs_pig_valid]

cmap = plt.get_cmap("viridis", np.max(pigs_int) - np.min(pigs_int) + 1)

plt.figure(figsize=(8, 6))
plt.title("Blockwise normalization")
scatter = plt.scatter(
    X_pca[:, 0], X_pca[:, 1], c=pigs_int, cmap=cmap, edgecolor="k", s=25
)

c_bar = plt.colorbar(scatter, ticks=np.arange(np.min(pigs_int), np.max(pigs_int) + 1))
c_bar.set_label("Pig")
c_bar.set_ticks(np.arange(np.min(pigs_int), np.max(pigs_int) + 1))

plt.xlabel("1st principal component")
plt.ylabel("2nd principal component")
plt.show()

## Hyperparameter Tuning 

- [Keras Tuner](https://www.tensorflow.org/tutorials/keras/keras_tuner)

In [None]:
import keras_tuner as kt
from tensorflow import keras

In [None]:
def build_model(hp):
    output_dim = 3

    model = keras.Sequential()
    # input layer
    model.add(keras.layers.Input(shape=(64, 1024, 1)))

    # tune the number of hidden layers and units in each.
    for i in range(1, hp.Int("num_layers", 4, 7)):
        print(f"Init layer {i=}")
        hp_units = hp.Int("units_" + str(i), min_value=2, max_value=16, step=4)
        hp_kernel = hp.Int("kernel_" + str(i), min_value=2, max_value=9, step=1)
        # stride dim (0,1)
        hp_strides_0 = hp.Int("units_0_" + str(i), min_value=1, max_value=4, step=1)
        hp_strides_1 = hp.Int("units_1_" + str(i), min_value=2, max_value=4, step=1)
        hp_activation = hp.Choice(
            "activation_" + str(i), values=["relu", "elu", "tanh"]
        )
        hp_dropout = hp.Float("dropout_" + str(i), 0, 1.0, step=0.1)

        # create layer
        model.add(
            keras.layers.Conv2D(
                hp_units,
                hp_kernel,
                strides=(hp_strides_0, hp_strides_1),
                padding="same",
            )
        )
        model.add(keras.layers.BatchNormalization())
        model.add(tf.keras.layers.Activation(hp_activation))
        model.add(keras.layers.Dropout(hp_dropout))

    model.add(keras.layers.Flatten())
    # output layer.
    model.add(keras.layers.Dense(output_dim, activation="linear"))

    hp_learning_rate = hp.Choice("learning_rate", values=[1e-2, 1e-3, 1e-4, 1e-5])

    model.compile(
        optimizer=keras.optimizers.Adam(learning_rate=hp_learning_rate),
        loss=keras.losses.MeanAbsoluteError(),
        # loss=keras.losses.MeanSquaredError(),
        metrics=["accuracy"],
    )
    print(model.summary())

    return model

In [None]:
tuner = kt.Hyperband(
    build_model,
    objective="val_accuracy",
    max_epochs=50,
    factor=2,
    directory="mapper_tuning",
    project_name="mapper_tuning_1",
)
# mapper_tuning_1 -> tuned on all pigs with MAE
# mapper_tuning_2 -> tuned on 9/10 pigs with MAE
# mapper_tuning_3 -> tuned on all pigs with MSE
# mapper_tuning_4 -> tuned on 9/10 pigs with MSE

**Tune model on full data**

In [None]:
stop_early = tf.keras.callbacks.EarlyStopping(monitor="val_accuracy", patience=10)
tuner.search(
    X_train,
    y_train,
    epochs=50,
    batch_size=8,
    validation_data=(X_valid, y_valid),
    callbacks=[stop_early],
)

**Train the model with the parameters from the hpt on the first nine pigs**

In [None]:
from glob import glob

m_idx = len(glob("src/weights/*.h5")) + 1

In [None]:
pigs = ["P_{0:02d}_PulHyp".format(i) for i in range(1, 10)]
print(pigs)

load_path = "/data/PulHypStudie_Check_npz_v2_SNR20/"
X_expt10, y_expt10, clrs_pig_expt10 = load_augmented_example(
    load_path, pigs, sample_skip=500, load_samples="upwards"
)
X_valid_expt10, y_valid_expt10, clrs_pig_valid_expt10 = load_augmented_example(
    load_path, pigs, sample_skip=500, load_samples="downwards"
)

In [None]:
best_hps = tuner.get_best_hyperparameters(num_trials=1)[0]
print(best_hps.values)
model = tuner.hypermodel.build(best_hps)

In [None]:
best_hps.values

In [None]:
history = model.fit(
    X_expt10,
    y_expt10,
    epochs=50,
    batch_size=8,
    validation_data=(X_valid_expt10, y_valid_expt10),
)

In [None]:
model.save_weights(f"src/weights/mapper_model_{m_idx}.weights.h5")

**Test model performance with trained model**

In [None]:
# load pig 10 as the test pig
pigs_test = ["P_10_PulHyp"]
X_10, y_10, clrs_pig_10 = load_augmented_example(
    load_path, pigs_test, sample_skip=6552, load_samples="downwards"
)

In [None]:
# predict EIT data of pig 10
y_pred = model.predict(X_10)

In [None]:
# scale all AP values to the initial scale
dap_factor = 180
sap_factor = 180
map_factor = 160

Y_true = np.empty(y_pred.shape)
Y_pred = np.empty(y_pred.shape)

Y_true[:, 0] = y_10[:, 0] * dap_factor  # dap normalization
Y_true[:, 1] = y_10[:, 1] * sap_factor  # sap normalization
Y_true[:, 2] = y_10[:, 2] * map_factor  # map normalization

Y_pred[:, 0] = y_pred[:, 0] * dap_factor  # dap normalization
Y_pred[:, 1] = y_pred[:, 1] * sap_factor  # sap normalization
Y_pred[:, 2] = y_pred[:, 2] * map_factor  # map normalization

np.savez(
    f"src/results/result_mapper_{m_idx}.npz",
    Y_true=Y_true,
    Y_pred=Y_pred,
)

**Error estimation**

In [None]:
import pandas as pd
import seaborn as sns

In [None]:
DAP_err = Y_pred[:, 0] - Y_true[:, 0]
SAP_err = Y_pred[:, 1] - Y_true[:, 1]
MAP_err = Y_pred[:, 2] - Y_true[:, 2]

In [None]:
DF_err = pd.DataFrame({"DAP": DAP_err, "SAP": SAP_err, "MAP": MAP_err})
DF_err.to_csv(f"src/results/mapper_{m_idx}.csv", index=False)

In [None]:
sns.histplot(DF_err)
plt.savefig(f"src/results/histplot_{m_idx}.png")

In [None]:
sns.boxplot(DF_err)
plt.grid()
plt.ylim([-50, 50])
plt.savefig(f"src/results/boxplot_{m_idx}.png")

In [None]:
model.loss(Y_true, Y_pred)

In [None]:
plt.plot(history.history["accuracy"], label="Training")
plt.plot(history.history["val_accuracy"], label="Validation")
plt.grid()
plt.ylim([0, 1.0])
plt.xlabel("Epoch")
plt.ylabel("Accuracy")
plt.legend(loc="lower right")
plt.savefig(f"src/results/accrcy_{m_idx}.png")
plt.show()

___

## Export

**export results**

In [None]:
from plotLaTeX import HistPlot, BoxPlot

In [None]:
hist = HistPlot(bins=25)

In [None]:
hist.add_histdata(DAP_err, "DAP")
hist.add_histdata(SAP_err, "SAP")
hist.add_histdata(MAP_err, "MAP")

In [None]:
hist.add_axis_labels(xlabel="Absolute AP deviation (mmHg)", ylabel="Count")

In [None]:
hist.export(f_name="hist_results.csv")

In [None]:
box = BoxPlot()

In [None]:
box.add_data(DAP_err, "DAP")
box.add_data(SAP_err, "SAP")
box.add_data(MAP_err, "MAP")

In [None]:
box.add_axis_labels(xlabel="AP", ylabel="Absolute Error (mmHg)")

In [None]:
box.LaTeXcode()

In [None]:
DAP_rerr = (Y_pred[:, 0] - Y_true[:, 0]) / Y_true[:, 0] * 100
SAP_rerr = (Y_pred[:, 1] - Y_true[:, 1]) / Y_true[:, 1] * 100
MAP_rerr = (Y_pred[:, 2] - Y_true[:, 2]) / Y_true[:, 2] * 100

In [None]:
box_r = BoxPlot()

In [None]:
box_r.add_data(DAP_rerr, "DAP")
box_r.add_data(SAP_rerr, "SAP")
box_r.add_data(MAP_rerr, "MAP")

In [None]:
box_r.add_axis_labels(xlabel="AP", ylabel="Relative Error")

In [None]:
box_r.LaTeXcode()

In [None]:
DF_rerr = pd.DataFrame({"DAP": DAP_rerr, "SAP": SAP_rerr, "MAP": MAP_rerr})

In [None]:
sns.boxplot(DF_rerr)
plt.grid()
plt.ylim([-50, 50])

In [None]:
df = DF_rerr

summary = {}
for column in df.columns:
    q1 = df[column].quantile(0.25)
    q3 = df[column].quantile(0.75)
    median = df[column].median()
    iqr = q3 - q1  # Interquartilsabstand
    lower_whisker = df[column][df[column] >= (q1 - 1.5 * iqr)].min()
    upper_whisker = df[column][df[column] <= (q3 + 1.5 * iqr)].max()

    summary[column] = {
        "Q1": q1,
        "Median": median,
        "Q3": q3,
        "Lower Whisker": lower_whisker,
        "Upper Whisker": upper_whisker,
    }

summary_df = pd.DataFrame(summary).T
print(summary_df)