### Notebook 4: Gaussian Process for Surrogate Modeling

In this notebook, we will demonstrate how to use active learning to make GP model accurately approximate a given limit state. This type of analysis is very useful for probabilistic reliability analysis, where the surrogate model (i.e., a GP model) is tasked with making accurate decisions on whether the values of the system response are greater/lower than the given threshold (i.e., the limit state).

This active learning scheme enrich the training dataset sequentially by using a so-called **U** formula:

\begin{equation}
U = \frac{|\mu - G|}{\sigma},
\end{equation}

where $\mu$ and $\sigma$ stand for the GP prediction mean and standard deviation, respective, and $G$ denotes the value of the limit state. By identifying the candidate sample with the minimum **U** value, we are able to greatly improve the GP predictions in the vicinity of the limit state.

> GPflow depends on both TensorFlow and TensorFlow Probability, which require very specific versions to be compatible. For example:
>
> **`!pip install gpflow tensorflow~=2.10.0 tensorflow-probability~=0.18.0`**
>
> For more details, please refer to the [official documents](https://gpflow.github.io/GPflow/2.9.1/installation.html).

### 0. Import libraries

In [None]:
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import os
from scipy.stats import qmc
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import mean_squared_error

import gpflow
import gpflow.utilities.model_utils as util
import tensorflow as tf

### 1. Problem Setup

We select the following test function in this case study:

\begin{equation}
y(x_1, x_2) = 20-(x_1-x_2)^2-8(x_1+x_2-4)^3, \; x_1 \in [-5, 5], x_2 \in [-5, 5]
\end{equation}

The failure boundary is defined as $y(x_1, x_2) = 0$.

Later on, we will train a Gaussian Process model to capture the failure boundary. This is crucial for accurate and robust risk analysis. To minimize the number of employed training samples, we will use an active learning strategy to allocate new samples in the vicinity of the failure boundary. 

Let's first plot this function to gain some intuition.

In [None]:
def Test_2D(X):
    """2D Test Function"""
    y = 20 - (X[:,0]-X[:,1])**2 - 8*(X[:,0]+X[:,1]-4)**3
    return y

In [None]:
# Load LHS package
from pyDOE import lhs

# Test data
X1 = np.linspace(-5, 5, 100)
X2 = np.linspace(-5, 5, 100)
X1, X2 = np.meshgrid(X1, X2)
X_test = np.hstack((X1.reshape(-1,1), X2.reshape(-1,1)))
y_test = Test_2D(X_test)

In [None]:
# Visualize the limit state function
fig, ax = plt.subplots(figsize=(7,5))
ax.contour(X_test[:,0].reshape(100,-1), X_test[:,1].reshape(100,-1), 
           y_test.reshape(100,-1), levels = [0], 
           colors='r',linestyles='--',linewidths=2)
ax.set_xlabel(r'$X_1$', fontsize=15)
ax.set_ylabel(r'$X_2$', fontsize=15)
ax.set_title('Limit-State Function', fontsize=15)
ax.tick_params(axis='both', which='major', labelsize=12);

### 2. Generate Training Dataset

#### 2.1 Train an initial model

As usual, we use **Latin Hypercube Sampling** approach to generate training samples. We use 15 samples to train an initial GP model. Remember that it is a good practice to normalize the inputs before submitting to GP training.

In [None]:
# Generate training data
sample_num = 15
lb, ub = np.array([-5, -5]), np.array([5, 5])
X_train = (ub-lb)*qmc.LatinHypercube(d=2, seed=42).random(sample_num) + lb

# Compute labels
y_train = Test_2D(X_train).reshape(-1,1)

# Scale input
scaler = MinMaxScaler()
X_train_scaled = scaler.fit_transform(X_train)

Next, let's create a couple of helper functions to streamline the analysis.

In [None]:
def config_GP(lengthscale, X_train, y_train):
    """
    Initialize the kernel parameters.

    Args:
    -----
    lengthscale: initial length scale values for GP model.
    X_train: training input data.
    y_train: training output data.

    Returns:
    --------
    model: configured GP model using GPflow.
    """  
    
    # Set up the kernel
    kernel = gpflow.kernels.SquaredExponential(variance=np.var(y_train), lengthscales=lengthscale)

    # Set up the model
    model = gpflow.models.GPR(
        (X_train, y_train.reshape(-1, 1)),
        kernel=kernel,
        mean_function=gpflow.functions.Polynomial(0),
        noise_variance=1e-5
    )
    
    gpflow.set_trainable(model.likelihood.variance, False)
    
    return model

In [None]:
def init_kernel_params(dim, n_restarts=5):
    """
    Initialize the kernel parameters.

    Args:
    -----
    dim: input dimension. One length scale for each dimension.
    n_restarts: number of random restarts for length scale optimization.

    Returns:
    --------
    length_scales: random guesses of the length scale parameters.
    """  
    
    lb, ub = -2, 2
    lhd = qmc.LatinHypercube(d=dim, seed=42).random(n_restarts)
    length_scales = (ub-lb)*lhd + lb
    length_scales = 10**length_scales

    return length_scales

In [None]:
def fit(X_train, y_train, n_restarts=5):
    
    models = []
    log_likelihoods = []
    init_length_scales = init_kernel_params(dim=X_train.shape[1], n_restarts=n_restarts)
    
    with tf.device("CPU:0"):
    
        for i, length_scale in enumerate(init_length_scales):
    
            # Init model
            model = config_GP(length_scale, X_train, y_train)
    
            # Training
            opt = gpflow.optimizers.Scipy()
            opt.minimize(model.training_loss, model.trainable_variables, options=dict(maxiter=100))
    
            # Record keeping
            models.append(model)
            log_likelihoods.append(model.log_marginal_likelihood().numpy())
    
    # Select the model with the highest log-marginal likelihood
    best_model_index = np.argmax(log_likelihoods)
    best_model = models[best_model_index]
    
    print(f"Best model log-marginal likelihood: {log_likelihoods[best_model_index]}")

    return best_model

Let's fit an initial GP model.

In [None]:
GP = fit(X_train_scaled, y_train)

Then, we assess the performance of the fitted model.

In [None]:
# GP model predicting
f_mean, _ = GP.predict_f(scaler.transform(X_test))
f_mean = f_mean.numpy().flatten()

# Post-processing - Contour plot
fig, ax = plt.subplots(figsize=(7,5))

# Reference results
ax.contour(X_test[:,0].reshape(100,-1), 
           X_test[:,1].reshape(100,-1), 
           y_test.reshape(100,-1), levels = [0],
           colors='r',linestyles='--',linewidths=2)

# GP predictions
ax.contour(X_test[:,0].reshape(100,-1), 
           X_test[:,1].reshape(100,-1), 
           f_mean.reshape(100, -1), levels = [0],
           colors='k',linestyles='-',linewidths=2)

ax.plot(X_train[:,0], X_train[:,1],'bo', 
        markerfacecolor='b', markersize=10)

ax.set_xlabel(r'$X_1$', fontsize=15)
ax.set_ylabel(r'$X_2$', fontsize=15)
ax.set_title('Limit-State Function', fontsize=15)
ax.tick_params(axis='both', which='major', labelsize=12);

Obviously, the obtained limit state (or called stability margin) is far from correct. In the following, we use **U-learning** to gradually refine the GP approximation accuracy. 

#### 2.2 Acquisition function

In [None]:
def acquisition(model, candidate, limit_state_value=0, diagnose=False):

    # Compute prediction variance
    f_mean, f_var = model.predict_f(candidate, full_cov=False)
    f_mean = f_mean.numpy().flatten()
    f_var = f_var.numpy().flatten()

    # Calculate U values
    U_values = np.abs(f_mean-limit_state_value)/np.sqrt(f_var)
    target = np.min(U_values)

    # Locate promising sample
    index = np.argmin(U_values)

    # Select promising sample
    sample = candidate[[index],:]
    reduced_candidate = np.delete(candidate, obj=index, axis=0)

    # For diagnose purposes
    diagnostics = U_values

    if diagnose is True:
        return target, sample, candidate, reduced_candidate, diagnostics
    else:
        return target, sample, candidate, reduced_candidate

#### 2.3 Iterations

Before we start iterating, we need to generate a pool of candidate samples. We will maintain this candidate pool for the subsequent iterations. At each iteration, we pick one sample from the candidate pool. This sample should yield the maximum expected prediction error value among all the candidate samples.

In [None]:
# Start active learning
iteration = 1
U_history = []

# Generate candidate samples (in normalized scale)
Pool = np.random.rand(10000, 1)

Now we are ready for the first iteration.

**The following three cells can be excuted multiple times to manually control the iteration flow**. The first cell identify the sample with the minimum U value. The second and third cell visually summarizes the results from the current iteration and add newly identified samples to the current training dataset.

In [None]:
# 1-GP model training and predicting
GP = fit(X_train_scaled, y_train, n_restarts=5)
f_mean, f_var = GP.predict_f(scaler.transform(X_test), full_cov=False)
f_upper = f_mean + 3*np.sqrt(f_var)
f_lower = f_mean - 3*np.sqrt(f_var)

# 2-Active learning
target, sample, org_pool, Pool, U = acquisition(model=GP, candidate=Pool, 
                                                limit_state_value=0, diagnose=True)

# Record keeping
U_history.append(target)

# 3-Display iteration info
summary = 'Iteration summary:'
iter_number = 'Current iteration: {}'.format(str(iteration))

Iteration_summary = 'Iteration {}:'.format(str(iteration)) \
                    + os.linesep \
                    + 'Current min U is {}'.format(str(target)) 

print(Iteration_summary)

In [None]:
# 4-Iteration assessment
fig, ax = plt.subplots(figsize=(7,5))

display_y = [y_test, f_mean, f_upper, f_lower]
colors = ['r', 'k', 'k', 'k']
linestyles = ['-', '-', '--', '--']

for i in range(4):
    ax.contour(X_test[:,0].reshape(100,-1), X_test[:,1].reshape(100,-1),
               display_y[i].reshape(100,-1), levels = [0],
               colors=colors[i], linestyles=linestyles[i], linewidths=2)

ax.plot(X_train[:sample_num, 0], X_train[:sample_num, 1],'bo', 
        markerfacecolor='b', markersize=10)

# 5-Enrich training dataset
org_sample = scaler.inverse_transform(sample)  # Sample in original scale
X_train = np.vstack((X_train, org_sample))
y_train = np.vstack((y_train, Test_2D(org_sample)))
iteration += 1

# Newly enriched samples
ax.plot(scaler.inverse_transform(sample)[sample_num:, 0], X_train[sample_num:, 1],'ro', 
        markerfacecolor='r', markersize=10)

ax.set_xlabel(r'$X_1$', fontsize=15)
ax.set_ylabel(r'$X_2$', fontsize=15)
ax.set_title('Limit-State Function', fontsize=15)
ax.tick_params(axis='both', which='major', labelsize=12);

In [None]:
# U evolution history
fig, ax = plt.subplots(figsize=(7,5))

threshold = 1.65
ax.plot(np.arange(1, iteration), U_history, 'k-o', lw=2)
ax.plot([0, 32.3], [threshold, threshold], 'r--', lw=2)
ax.tick_params(axis='both', which='major', labelsize=12)
ax.set_xlabel('Iteration', fontsize=15)
ax.set_ylabel('Min U', fontsize=15)
ax.set_title('Evolution', fontsize=15)
ax.set_xlim([0.8, 32.2]);