# Single-fidelity NN surrogate model

In this notebook, we construct, train, and validate a single-fidelity neural network (NN) surrogate model of lung function.

We begin by importing the necessary modules.

In [None]:
from IPython.display import clear_output

In [None]:
!pip install keras-tuner --upgrade

clear_output()

In [None]:
# General imports
import numpy as np
import matplotlib.pyplot as plt
from matplotlib import colors as mcolors
from sklearn.metrics import mean_squared_error
from sklearn.metrics import r2_score
colors = dict(mcolors.BASE_COLORS, **mcolors.CSS4_COLORS)
%matplotlib inline

# TensorFlow imports
import pathlib
import pandas as pd
import seaborn as sns
import tensorflow as tf
import keras_tuner
from tensorflow import keras
from tensorflow.keras import layers

print(tf.__version__)

np.random.seed(20)

2.12.0


## Import high-fidelity dataset

Next, we load the dataset generated from high-fidelity simulations. Each sample (observation) of the dataset consists of an input and an output array of the form:

\begin{array}{cc}
\text{Input} & \text{Output}\\
[ c, \beta, c_1, c_3, k, K_s ] & [ \mathrm{C_{rs}}, \mathrm{R} ]
\end{array}

where the input contains our six model parameters ($c$, $\beta$, $c_1$, and $c_3$ from the tissue constitutive model, the permeability $k$, and the chest wall stiffness $K_s$) and the output contains the two variables that represent the lung response (respiratory system compliance $\mathrm{C_{rs}}$ and airways resistance $\mathrm{R}$).

- For the surrogate model training, we have available 20 samples.

- For the surrogate model testing, we have available 100 samples.

In [None]:
# Getting files from repository
!wget https://raw.githubusercontent.com/josebarahonay/datos/master/training_input_data_20_hf.npy
!wget https://raw.githubusercontent.com/josebarahonay/datos/master/training_output_data_20_hf.npy
!wget https://raw.githubusercontent.com/josebarahonay/datos/master/testing_input_data_100_hf.npy
!wget https://raw.githubusercontent.com/josebarahonay/datos/master/testing_output_data_100_hf.npy

clear_output()

In [None]:
# Loading high fidelity training datasets
X_hf = np.load('training_input_data_20_hf.npy')
Y_hf = np.load('training_output_data_20_hf.npy')

# Loading high fidelity testing datasets
X_hf_100 = np.load('testing_input_data_100_hf.npy')
Y_hf_100 = np.load('testing_output_data_100_hf.npy')

# Checking size of the HF datasets
print("Training X HF size :", X_hf.shape)
print("Training Y HF size :", Y_hf.shape)
print("")
print("Testing X HF size :", X_hf_100.shape)
print("Testing Y HF size :", Y_hf_100.shape)

Checking size of HF and LF datasets:

Training X HF size : (20, 6)
Training Y HF size : (20, 2)

Testing X HF size : (100, 6)
Testing Y HF size : (100, 2)


## Data preprocessing for the single-fidelity neural network (NN) model

After loading the data, we divide the training dataset into internal training and validation sets. As there are only a few observations, we want to use most of them to train the surrogate model. We therefore specify a training size of 95%, which leaves a single observation (1 of 20) for internal model validation. The testing dataset remains untouched for the entire analysis and is only used to evaluate performance after the surrogate model has been trained.

In [None]:
from sklearn.model_selection import train_test_split
# Next, the high-fidelity training dataset is split into internal training and validation sets
X_hf_train, X_hf_test = train_test_split(X_hf, train_size=0.95, shuffle=True, random_state=1) # split the input data
Y_hf_train, Y_hf_test = train_test_split(Y_hf, train_size=0.95, shuffle=True, random_state=1) # split the output data (same random_state, so rows stay paired)

X_hf_test_100 = X_hf_100
Y_hf_test_100 = Y_hf_100
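
As a minimal sketch of the split above, the snippet below uses placeholder arrays (`X_demo` and `Y_demo` are hypothetical names with the same shapes as the training set, filled with random values rather than simulation data). It illustrates that a 95% training size on 20 observations leaves one observation for validation, and that a single `train_test_split` call on both arrays yields the same partition as separate calls with a shared `random_state`.

```python
import numpy as np
from sklearn.model_selection import train_test_split

rng = np.random.default_rng(0)
X_demo = rng.random((20, 6))  # placeholder inputs, same shape as X_hf
Y_demo = rng.random((20, 2))  # placeholder outputs, same shape as Y_hf

# One call splits inputs and outputs with the same shuffled indices
X_tr, X_val, Y_tr, Y_val = train_test_split(
    X_demo, Y_demo, train_size=0.95, shuffle=True, random_state=1
)

# A separate call with the same random_state gives the same partition
X_tr_sep, X_val_sep = train_test_split(
    X_demo, train_size=0.95, shuffle=True, random_state=1
)

print(X_tr.shape, X_val.shape)           # (19, 6) (1, 6)
print(np.array_equal(X_val, X_val_sep))  # True
```

Passing both arrays to one call is usually the less error-prone form, since the pairing of rows no longer depends on remembering to reuse the same seed.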

Given that the six model parameters have considerable differences in their scale, it is recommended to normalize the input dataset:

In [None]:
from sklearn.preprocessing import StandardScaler

## Normalize input data
scaler = StandardScaler()

X_hf_train = scaler.fit_transform(X_hf_train)
X_hf_test = scaler.transform(X_hf_test)
X_hf_test_100 = scaler.transform(X_hf_test_100)
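
To make the normalization step concrete, here is a small sketch on placeholder data (the arrays and the column scales are invented for illustration). It shows that `StandardScaler` computes the mean and standard deviation from the training rows only and then reuses those statistics for any other set, which is equivalent to applying $(x - \mu_{train}) / \sigma_{train}$ column by column.

```python
import numpy as np
from sklearn.preprocessing import StandardScaler

rng = np.random.default_rng(0)
# Placeholder inputs whose columns differ by several orders of magnitude
scales = np.array([1e-2, 1.0, 10.0, 100.0, 1e-3, 1e3])
X_tr = rng.random((19, 6)) * scales
X_new = rng.random((5, 6)) * scales

scaler = StandardScaler().fit(X_tr)   # statistics come from the training rows only
X_tr_std = scaler.transform(X_tr)
X_new_std = scaler.transform(X_new)   # reuses the training mean and standard deviation

# Equivalent manual computation: (x - mean) / std, column by column
manual = (X_new - X_tr.mean(axis=0)) / X_tr.std(axis=0)

print(np.allclose(X_tr_std.mean(axis=0), 0.0))  # True
print(np.allclose(X_tr_std.std(axis=0), 1.0))   # True
print(np.allclose(X_new_std, manual))           # True
```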

## ML surrogate model construction

After the data is prepared, we construct and train the single-fidelity NN model.

Specifically, we seek an approximation of the form

\begin{equation}
    \boldsymbol{y} \approx NN(\boldsymbol{x}, \boldsymbol{\theta}_{NN}),  
\end{equation}

where $\boldsymbol{x}$ is the already described vector of inputs, and $NN$ is a neural network with architecture to be defined and parameters $\boldsymbol{\theta}_{NN}$, also called weights.
    
Note that we build two separate models: one to predict the respiratory-system compliance and one to predict the airways resistance.

### Grid search of hyperparameters

To obtain an optimal architecture for our model predictions, we perform a grid search over the following hyperparameters:

- Number of layers
- Number of neurons per layer
- Learning rate α
- Number of epochs

We use a LeakyReLU activation function between layers.
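
For reference, the LeakyReLU activation can be sketched in a few lines of NumPy. The function name `leaky_relu` and the `negative_slope` value of 0.3 are assumptions made for this illustration; the slope actually applied depends on the Keras version and layer settings in use.

```python
import numpy as np

def leaky_relu(x, negative_slope=0.3):
    """Identity for x >= 0, a small linear slope for x < 0."""
    x = np.asarray(x, dtype=float)
    return np.where(x >= 0.0, x, negative_slope * x)

values = leaky_relu([-2.0, -0.5, 0.0, 1.5])
print(values)  # negative inputs are scaled by 0.3, non-negative inputs pass through
```

Unlike the plain ReLU, the negative branch keeps a nonzero gradient, so units are less likely to stop updating during training.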

**Note**: *This procedure and the following code blocks may take several hours to run*.

To optimize each model, we use the mean squared error loss function

\begin{equation}
    \mathcal{L}(\boldsymbol{\theta}_{NN})=\dfrac{1}{M_H}\sum\limits_{i=1}^{M_H}\left(NN(\boldsymbol{x}_i, \boldsymbol{\theta}_{NN})-\hat{y}_i\right)^2,
\end{equation}

where $M_H$ is the number of training observations and $\hat{y}_i$ is the target output of the $i$-th training observation.
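
The loss can be checked by hand on a toy example. The sketch below uses four made-up target values and predictions (not taken from the dataset) and compares the manual computation with `sklearn.metrics.mean_squared_error`.

```python
import numpy as np
from sklearn.metrics import mean_squared_error

y_true = np.array([1.0, 2.0, 3.0, 4.0])   # hypothetical target outputs
y_pred = np.array([1.5, 2.0, 2.0, 4.5])   # hypothetical network predictions

# Mean of the squared residuals over the M_H observations
mse_manual = np.mean((y_pred - y_true) ** 2)

print(mse_manual)                           # 0.375
print(mean_squared_error(y_true, y_pred))   # 0.375
```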

In [None]:
input_dim = X_hf_train.shape[1]
output_dim = 1

# NN surrogate model
# Through 'hp', we vary the number of neurons per layer and the learning rate.
# Outside the model, we vary the number of hidden layers and the number of epochs.
# For simplicity, we show only the model with 3 hidden layers:

def build_model(hp):
  model = keras.Sequential([
    layers.Dense(hp.Choice('units', [30, 60, 120]), activation='LeakyReLU', input_shape=[input_dim]),
    layers.Dense(hp.Choice('units', [30, 60, 120]), activation='LeakyReLU'),
    layers.Dense(hp.Choice('units', [30, 60, 120]), activation='LeakyReLU'),
    layers.Dense(output_dim)
  ])

  optimizer = tf.keras.optimizers.Adam(
                  learning_rate=hp.Choice("learning_rate", values=[0.001, 0.01, 0.1]))

  model.compile(loss='mse',
                optimizer=optimizer,
                metrics=[tf.keras.metrics.MeanSquaredError()])
  return model

tuner = keras_tuner.GridSearch(build_model, objective='val_loss')
tuner.search_space_summary()

Search space summary
Default search space size: 2
units (Choice)
{'default': 30, 'conditions': [], 'values': [30, 60, 120], 'ordered': True}
learning_rate (Choice)
{'default': 0.001, 'conditions': [], 'values': [0.001, 0.01, 0.1], 'ordered': True}
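
The search space summarized above contains two hyperparameters with three values each, so a full grid has $3 \times 3 = 9$ combinations. As a rough sketch of what the grid search iterates over (this is plain Python, not the KerasTuner internals), the combinations can be enumerated with `itertools.product`:

```python
from itertools import product

units_choices = [30, 60, 120]
learning_rates = [0.001, 0.01, 0.1]

# Every (units, learning_rate) pair is one trial of the grid search
grid = list(product(units_choices, learning_rates))

for trial_id, (units, lr) in enumerate(grid):
    print(f"trial {trial_id}: units={units}, learning_rate={lr}")

print(len(grid))  # 9
```

Each trial trains one network for the full number of epochs, so the total cost grows multiplicatively with every hyperparameter added to the grid.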


In [None]:
# Perform grid-search on the airways resistance NN model
tuner.search(X_hf_train, Y_hf_train[:,0], epochs=5000, validation_data=(X_hf_test_100, Y_hf_test_100[:,0]))
best_model = tuner.get_best_models()[0]
tuner.results_summary()
best_model.summary()

Trial 9 Complete [00h 06m 24s]
val_loss: 3.6223807334899902

Best val_loss So Far: 0.3237500488758087
Total elapsed time: 00h 35m 18s
Results summary
Results in ./untitled_project
Showing 10 best trials
Objective(name="val_loss", direction="min")

Trial 0007 summary
Hyperparameters:
units: 120
learning_rate: 0.01
Score: 0.3237500488758087

Trial 0002 summary
Hyperparameters:
units: 30
learning_rate: 0.1
Score: 0.35352760553359985

Trial 0005 summary
Hyperparameters:
units: 60
learning_rate: 0.1
Score: 0.3706447184085846

Trial 0001 summary
Hyperparameters:
units: 30
learning_rate: 0.01
Score: 0.5023677945137024

Trial 0004 summary
Hyperparameters:
units: 60
learning_rate: 0.01
Score: 0.5194442272186279

Trial 0006 summary
Hyperparameters:
units: 120
learning_rate: 0.001
Score: 0.5532745718955994

Trial 0003 summary
Hyperparameters:
units: 60
learning_rate: 0.001
Score: 0.569047212600708

Trial 0000 summary
Hyperparameters:
units: 30
learning_rate: 0.001
Score: 0.949256956577301

Trial 

In [None]:
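# Perform grid-search on the respiratory-system compliance NN model.
# A new tuner is created because the one above has already exhausted its grid;
# calling search() on it again would only return the resistance results.
# 'compliance_grid_search' is an arbitrary project name chosen for illustration.
tuner_c = keras_tuner.GridSearch(build_model, objective='val_loss', project_name='compliance_grid_search')
tuner_c.search(X_hf_train, Y_hf_train[:,1], epochs=5000, validation_data=(X_hf_test_100, Y_hf_test_100[:,1]))
best_model = tuner_c.get_best_models()[0]
tuner_c.results_summary()
best_model.summary()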

Results summary
Results in ./untitled_project
Showing 10 best trials
Objective(name="val_loss", direction="min")

Trial 0007 summary
Hyperparameters:
units: 120
learning_rate: 0.01
Score: 0.3237500488758087

Trial 0002 summary
Hyperparameters:
units: 30
learning_rate: 0.1
Score: 0.35352760553359985

Trial 0005 summary
Hyperparameters:
units: 60
learning_rate: 0.1
Score: 0.3706447184085846

Trial 0001 summary
Hyperparameters:
units: 30
learning_rate: 0.01
Score: 0.5023677945137024

Trial 0004 summary
Hyperparameters:
units: 60
learning_rate: 0.01
Score: 0.5194442272186279

Trial 0006 summary
Hyperparameters:
units: 120
learning_rate: 0.001
Score: 0.5532745718955994

Trial 0003 summary
Hyperparameters:
units: 60
learning_rate: 0.001
Score: 0.569047212600708

Trial 0000 summary
Hyperparameters:
units: 30
learning_rate: 0.001
Score: 0.949256956577301

Trial 0008 summary
Hyperparameters:
units: 120
learning_rate: 0.1
Score: 3.6223807334899902
Model: "sequential"
____________________________

After performing the grid search, we obtain the following optimal hyperparameters:

- Number of layers = 3
- Neurons per layer = 60
- Learning rate = 0.01
- Epochs = 10000

Next, we construct and train the NN model.

In [None]:
# We construct a NN model with the optimal hyperparameters.
def build_model_6(alpha):
  model = keras.Sequential([
    layers.Dense(30*2, activation='LeakyReLU', input_shape=[input_dim]),
    layers.Dense(30*2, activation='LeakyReLU'),
    layers.Dense(30*2, activation='LeakyReLU'),
    layers.Dense(output_dim)
  ])

  optimizer = tf.keras.optimizers.Adam(
                  learning_rate=alpha)

  model.compile(loss='mse',
                optimizer=optimizer,
                metrics=['mae', 'mse'])
  return model



In [None]:
# Testing model construction
model = build_model_6(0.01)
model.summary()
example_result = model.predict(X_hf_test)
example_result



Model: "sequential_1"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 dense_4 (Dense)             (None, 60)                420       
                                                                 
 dense_5 (Dense)             (None, 60)                3660      
                                                                 
 dense_6 (Dense)             (None, 60)                3660      
                                                                 
 dense_7 (Dense)             (None, 1)                 61        
                                                                 
Total params: 7,801
Trainable params: 7,801
Non-trainable params: 0
_________________________________________________________________


array([[0.00671797]], dtype=float32)
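
The parameter counts in the summary above can be verified by hand: a dense layer with $n_{in}$ inputs and $n_{out}$ units has $n_{in} \cdot n_{out}$ weights plus $n_{out}$ biases. The helper `dense_params` below is a hypothetical function written for this check.

```python
def dense_params(n_in, n_out):
    """Weights (n_in * n_out) plus one bias per output unit."""
    return n_in * n_out + n_out

layer_sizes = [6, 60, 60, 60, 1]  # inputs, three hidden layers, one output
per_layer = [dense_params(a, b) for a, b in zip(layer_sizes[:-1], layer_sizes[1:])]

print(per_layer)       # [420, 3660, 3660, 61]
print(sum(per_layer))  # 7801
```

With 7,801 trainable parameters and at most 19 training observations, the model is heavily over-parameterized, which is worth keeping in mind when reading the test scores.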

In [None]:
# Display training progress by printing a single dot for each completed epoch
class PrintDot(keras.callbacks.Callback):
  def on_epoch_end(self, epoch, logs):
    if epoch % 100 == 0: print('')
    print('.', end='')

EPOCHS = 10000

### RESISTANCE model ###
model_r = build_model_6(0.01)
### COMPLIANCE model ###
model_c = build_model_6(0.01)

## Fit (optimize) the models
model_r.fit(X_hf_train, Y_hf_train[:,0],epochs=EPOCHS, verbose=0,callbacks=[PrintDot()])
print('Resistance model optimized')
model_c.fit(X_hf_train, Y_hf_train[:,1],epochs=EPOCHS, verbose=0,callbacks=[PrintDot()])
print('Compliance model optimized')


....................................................................................................
....................................................................................................
....................................................................................................
....................................................................................................
....................................................................................................
....................................................................................................
....................................................................................................
....................................................................................................
....................................................................................................
..........................................................................................

After training, we evaluate the model performance with the $R^2$ score and save the model predictions on the HF testing set.

In [None]:
# Predictions
r_test_predictions = model_r.predict(X_hf_test_100)
c_test_predictions = model_c.predict(X_hf_test_100)

# Performance
print(r2_score(Y_hf_test_100[:,0], r_test_predictions[:,0]))
print(r2_score(Y_hf_test_100[:,1], c_test_predictions[:,0]))

# Export values
np.save('NN_C_50percent_updated.npy', r_test_predictions)
np.save('NN_R_50percent_updated.npy', c_test_predictions)

0.9175173847459407
0.6685578704031272
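
The two values printed above are coefficients of determination, $R^2 = 1 - SS_{res}/SS_{tot}$. As a small sketch on made-up numbers (not the notebook's predictions), the definition can be compared against `r2_score`:

```python
import numpy as np
from sklearn.metrics import r2_score

y_true = np.array([1.0, 2.0, 3.0, 4.0])   # hypothetical reference values
y_pred = np.array([1.5, 2.0, 2.0, 4.5])   # hypothetical predictions

ss_res = np.sum((y_true - y_pred) ** 2)          # residual sum of squares
ss_tot = np.sum((y_true - y_true.mean()) ** 2)   # total sum of squares
r2_manual = 1.0 - ss_res / ss_tot

print(r2_manual)                  # approximately 0.7
print(r2_score(y_true, y_pred))   # same value from scikit-learn
```

A score of 1 means perfect prediction, while 0 corresponds to always predicting the mean of the reference values.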


## Cross-Validation (in testing set)

Additionally, we perform a cross-validation using different training sizes.

**Note**: *The following code block may take several hours to run*.
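
The structure of this procedure (repeated random sub-sampling of the training pool, with evaluation on a fixed test set) can be sketched quickly with a cheap stand-in model. In the snippet below, `LinearRegression` replaces the neural network and all data are synthetic, so only the loop structure and the aggregation of the RMSE over seeds carry over.

```python
import numpy as np
from sklearn.linear_model import LinearRegression
from sklearn.metrics import mean_squared_error
from sklearn.model_selection import train_test_split

rng = np.random.default_rng(0)
X_pool = rng.random((20, 6))                      # placeholder training pool
w_true = rng.random(6)
y_pool = X_pool @ w_true + 0.01 * rng.standard_normal(20)
X_test = rng.random((100, 6))                     # placeholder fixed test set
y_test = X_test @ w_true

seeds = np.arange(1, 100 + 1, 10)                 # ten seeds: 1, 11, ..., 91
summary = {}
for split in [0.5, 0.75, 0.95]:
    rmse_per_seed = []
    for seed in seeds:
        # Draw a random training subset of the requested size
        X_tr, _, y_tr, _ = train_test_split(
            X_pool, y_pool, train_size=split, shuffle=True, random_state=int(seed)
        )
        model = LinearRegression().fit(X_tr, y_tr)   # stand-in for the NN
        mse = mean_squared_error(y_test, model.predict(X_test))
        rmse_per_seed.append(np.sqrt(mse))
    # Mean and spread of the test RMSE over the ten random subsets
    summary[split] = (np.mean(rmse_per_seed), np.std(rmse_per_seed))

for split, (mean_rmse, std_rmse) in summary.items():
    print(f"train split {split}: RMSE = {mean_rmse:.4f} +/- {std_rmse:.4f}")
```

Averaging over several seeds separates the effect of the training size from the luck of a particular subset, and the standard deviation gives a rough measure of that variability.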

In [None]:
split_list_1 = [0.5, 0.55, 0.6, 0.65, 0.7]
split_list_2 = [0.75, 0.8, 0.85, 0.9, 0.95]
random_list = np.arange(1, 100+1,10)
RMSE_list_R_1 = []
RMSE_list_C_1 = []
std_R_1 = []
std_C_1 = []

for split in split_list_1:

  print("Train split: ", split)
  RMSE_inner_list_R = []
  RMSE_inner_list_C = []

  for random_number in random_list:

    # Split
    X_hf_train, X_hf_test = train_test_split(X_hf, train_size=split, shuffle=True, random_state=random_number) # split the input data
    Y_hf_train, Y_hf_test = train_test_split(Y_hf, train_size=split, shuffle=True, random_state=random_number) # split the output data (same seed keeps rows paired)

    X_hf_test_100 = X_hf_100
    Y_hf_test_100 = Y_hf_100

    # Normalize
    scaler = StandardScaler()

    X_hf_train = scaler.fit_transform(X_hf_train)
    X_hf_test = scaler.transform(X_hf_test)
    X_hf_test_100 = scaler.transform(X_hf_test_100)

    # Train
    input_dim = X_hf_train.shape[1]
    output_dim = 1

    ### RESISTANCE model ###
    model_r = build_model_6(0.0001)
    ### COMPLIANCE model ###
    model_c = build_model_6(0.0001)

    EPOCHS = 2000

    ## Fit (optimize) the models
    model_r.fit(X_hf_train, Y_hf_train[:,0],epochs=EPOCHS, verbose=0,callbacks=[PrintDot()])
    print('Resistance model optimized')
    model_c.fit(X_hf_train, Y_hf_train[:,1],epochs=EPOCHS, verbose=0,callbacks=[PrintDot()])
    print('Compliance model optimized')

    # Evaluate
    r_test_predictions = model_r.predict(X_hf_test_100)
    c_test_predictions = model_c.predict(X_hf_test_100)

    # Predictions MSE
    print("H-F predictions")
    mse_R = mean_squared_error(Y_hf_test_100[:,0], r_test_predictions)
    print('\nMSE R: {}'.format(mse_R))
    mse_C = mean_squared_error(Y_hf_test_100[:,1], c_test_predictions)
    print('\nMSE C: {}'.format(mse_C))
    RMSE_inner_list_R.append(np.sqrt(mse_R))
    RMSE_inner_list_C.append(np.sqrt(mse_C))
  RMSE_list_R_1.append(np.average(RMSE_inner_list_R))
  RMSE_list_C_1.append(np.average(RMSE_inner_list_C))
  print(RMSE_inner_list_R)
  print(RMSE_inner_list_C)
  std_R_1.append(np.std(RMSE_inner_list_R))
  std_C_1.append(np.std(RMSE_inner_list_C))

#clear_output()

# The stored values are RMSEs averaged over the random seeds
for i in range(len(split_list_1)):
  print("Train split: ", split_list_1[i], "||    RMSE R: ", RMSE_list_R_1[i], "||    RMSE C: ", RMSE_list_C_1[i], "||    std R: ", std_R_1[i], "||    std C: ", std_C_1[i])

Train split:  0.5

....................................................................................................
....................................................................................................
....................................................................................................
....................................................................................................
....................................................................................................
....................................................................................................
....................................................................................................
....................................................................................................
....................................................................................................
........................................................................

In [None]:
# Export values
np.save('RMSE_R_1_nn.npy', RMSE_list_R_1)
np.save('RMSE_C_1_nn.npy', RMSE_list_C_1)
np.save('std_R_1_nn.npy', std_R_1)
np.save('std_C_1_nn.npy', std_C_1)

In [None]:
split_list_1 = [0.5, 0.55, 0.6, 0.65, 0.7]
split_list_2 = [0.75, 0.8, 0.85, 0.9, 0.95]
random_list = np.arange(1, 100+1,10)
RMSE_list_R_2 = []
RMSE_list_C_2 = []
std_R_2 = []
std_C_2 = []

for split in split_list_2:

  print("Train split: ", split)
  RMSE_inner_list_R = []
  RMSE_inner_list_C = []

  for random_number in random_list:

    # Split
    X_hf_train, X_hf_test = train_test_split(X_hf, train_size=split, shuffle=True, random_state=random_number) # split the input data
    Y_hf_train, Y_hf_test = train_test_split(Y_hf, train_size=split, shuffle=True, random_state=random_number) # split the output data (same seed keeps rows paired)

    X_hf_test_100 = X_hf_100
    Y_hf_test_100 = Y_hf_100

    # Normalize
    scaler = StandardScaler()

    X_hf_train = scaler.fit_transform(X_hf_train)
    X_hf_test = scaler.transform(X_hf_test)
    X_hf_test_100 = scaler.transform(X_hf_test_100)

    # Train
    input_dim = X_hf_train.shape[1]
    output_dim = 1

    ### RESISTANCE model ###
    model_r = build_model_6(0.0001)
    ### COMPLIANCE model ###
    model_c = build_model_6(0.0001)

    EPOCHS = 2000

    ## Fit (optimize) the models
    model_r.fit(X_hf_train, Y_hf_train[:,0],epochs=EPOCHS, verbose=0,callbacks=[PrintDot()])
    print('Resistance model optimized')
    model_c.fit(X_hf_train, Y_hf_train[:,1],epochs=EPOCHS, verbose=0,callbacks=[PrintDot()])
    print('Compliance model optimized')

    # Evaluate
    r_test_predictions = model_r.predict(X_hf_test_100)
    c_test_predictions = model_c.predict(X_hf_test_100)

    # Predictions MSE
    print("H-F predictions")
    mse_R = mean_squared_error(Y_hf_test_100[:,0], r_test_predictions)
    print('\nMSE R: {}'.format(mse_R))
    mse_C = mean_squared_error(Y_hf_test_100[:,1], c_test_predictions)
    print('\nMSE C: {}'.format(mse_C))
    RMSE_inner_list_R.append(np.sqrt(mse_R))
    RMSE_inner_list_C.append(np.sqrt(mse_C))
  RMSE_list_R_2.append(np.average(RMSE_inner_list_R))
  RMSE_list_C_2.append(np.average(RMSE_inner_list_C))
  print(RMSE_inner_list_R)
  print(RMSE_inner_list_C)
  std_R_2.append(np.std(RMSE_inner_list_R))
  std_C_2.append(np.std(RMSE_inner_list_C))

#clear_output()

# The stored values are RMSEs averaged over the random seeds
for i in range(len(split_list_2)):
  print("Train split: ", split_list_2[i], "||    RMSE R: ", RMSE_list_R_2[i], "||    RMSE C: ", RMSE_list_C_2[i], "||    std R: ", std_R_2[i], "||    std C: ", std_C_2[i])

In [None]:
# Export values
np.save('RMSE_R_2_nn.npy', RMSE_list_R_2)
np.save('RMSE_C_2_nn.npy', RMSE_list_C_2)
np.save('std_R_2_nn.npy', std_R_2)
np.save('std_C_2_nn.npy', std_C_2)