# Import required packages

In [2]:
import numpy as np
import random
from scipy import stats
from smt.sampling_methods import LHS as LHS_sampling
import sklearn.preprocessing
from lib import MultiIndex, MPJacn, JNodeWt
import spgl1

import torch
import pyro
import pyro.contrib.gp as gp

from scipy.special import eval_legendre, factorial
import matplotlib.pyplot as plt

# Import LaTeX
from matplotlib import rc
rc('text', usetex=True)
plt.rc('axes', labelsize=25)
plt.rcParams['text.usetex'] = True
plt.rcParams['text.latex.preamble'] = r"\usepackage{amsmath} \usepackage{amssymb}"
plt.rcParams['font.family'] = 'Latin Modern Roman'

In [1]:
def LegendrePoly(x, order):
    norm = np.sqrt(2 / (2*order + 1))
    legendre_poly = eval_legendre(order, x)/norm
    return legendre_poly

def ProdLegendrePoly(x, indx):
    N_LD, dimension = np.shape(x)
    prod = np.ones((N_LD, 1))
    for dim in range(dimension):
        legendre_poly = LegendrePoly(x[:, dim], indx[dim])
        prod = prod*legendre_poly.reshape((-1, 1))
    return prod

def compute_rho_torch_pyro(_xobs, _F, Pif, Pic, _nugget, _kernel):
    # Xf points
    Kf = _kernel.forward(_xobs[Pif, :]) + _nugget*torch.eye(sum(Pif))
    _Ff = _F[Pif]

    # Xc points
    Kc = _kernel.forward(_xobs[Pic, :]) + _nugget*torch.eye(sum(Pic))
    _Fc = _F[Pic]

    # Compute rho
    den = (_Fc.T @ torch.linalg.solve(Kc, _Fc))
    num = (_Ff.T @ torch.linalg.solve(Kf, _Ff))
    rho = 1 - den / num
    if rho < 0 or rho > 1:
        raise ValueError(f"Error in rho: {rho}")
    return rho

def compute_RMS_from_data(_x):
    """
    Compute the root mean square distance from a set of positions.
    """
    squared_distance = []
    try: # d-dimension
        n_obs, dimension = np.shape(_x)
        for i in range(n_obs):
            for j in range(i+1, n_obs):
                dist = 0
                for dim in range(dimension):
                    dist = dist + (_x[i, dim] - _x[j, dim])**2
                squared_distance.append(dist)

        RMS = np.sqrt( (2 / (n_obs * (n_obs-1)))*np.sum(squared_distance) )
        print("RMS computed from {} observations of dimension {}: {}".format(n_obs, dimension, RMS))
        return RMS
    except: # 1-dimension
        n_obs = np.size(_x)
        dimension = 1
        for i in range(n_obs):
            for j in range(i+1, n_obs):
                dist = 0
                dist = dist + (_x[i] - _x[j])**2
                squared_distance.append(dist)

        RMS = np.sqrt( (2 / (n_obs * (n_obs-1)))*np.sum(squared_distance) )
        print("RMS computed from {} observations of dimension {}: {}".format(n_obs, dimension, RMS))
        return RMS

def create_pif_pic(num_obs, Nf, Nc, seed = None):
    if seed != None:
        rng = np.random.default_rng(seed)
    else:
        rng = np.random.default_rng()

    # Pif points
    Pitot = np.full((num_obs), False)
    Pitot[:Nf] = True
    rng.shuffle(Pitot)
    Pif = Pitot

    # Pic points
    Pitot = np.full((num_obs), False)
    Pitot[rng.choice(np.where(Pif == True)[0], size = Nc, replace = False)] = True
    Pic = Pitot

    return Pif, Pic

# Ishigami test case

## Ishigami function parameters

In [9]:
def FGT(_x, a, b):
    x, y, z = _x[:, 0], _x[:, 1], _x[:, 2]
    out = np.sin(x) + a*(np.sin(y))**2 + b*((z)**4)*np.sin(x)
    return out

performance_func = "Ishigami_3D"
a, b = 7, 0.1
analytical_mean = a*0.5
analytical_variance = 0.5 + a**2/8 + b**(2)*np.pi**(8)/18 + b*np.pi**(4)/5

# Parameters of the study
number_of_seeds = 10
number_of_observations_LD_list = [100]

# Bounds of the input space
bounds = np.array([[-np.pi, np.pi], [-np.pi, np.pi], [-np.pi, np.pi]])

# Scaling the inputs
scaler = sklearn.preprocessing.MinMaxScaler((-1, 1))
scaler.fit(bounds.T)

dimension = 3

## Surrogate modeling methods

### SSKRR Algorithm and sparse gPC

In [None]:
# Some useful parameters
nuggets = 10**(np.linspace(np.log10(1e-12), np.log10(1e0), num = 31))

# Save the predictions on the test dataset
Y_SSKRR_TD_list = []
Y_PCE_TD_list = []
Y_TD_list = []

# Total order of the basis
total_order = 10

# Compute the corresponding indexes
indexes, _ = MultiIndex.MultiIndex(total_order, dimension)

# Corresponds to uniform distribution
alpha = [0]*dimension; beta = [0]*dimension

for number_of_observations_LD in number_of_observations_LD_list:
    for current_seed in range(number_of_seeds):
        print(f"Current seed: {current_seed}")
        seed = current_seed
        number_of_observations = number_of_observations_LD

        # Define the learning set
        # Number of observatitons of the learning set
        N_LD = number_of_observations

        # Sampling the learning set
        sampling = LHS_sampling(xlimits = bounds, criterion = "maximin", random_state = seed)
        X_LD = sampling(N_LD)
        X_LD_scaled = scaler.transform(X_LD)
        Y_LD = FGT(X_LD, a, b)

        # Define Kappa for the SSKRR algorithm
        kappa_var = np.var(Y_LD)

        # SPGL1 Optimization
        # min |c|_L1 s. t. ||Tc - b||_2 <= epsilon

        print(f"Compute theta for SPGL1 using PCE with total order {total_order} corresponding to {len(indexes)} expansion coefficients")
        theta = np.zeros((N_LD, len(indexes)))
        for obs in range(N_LD):
            theta[obs, :] = MPJacn.MPJacn(X_LD_scaled[obs, :], total_order, alpha, beta, indexes)

        # Convergence criterion for SPGL1
        sigma = 1e-6
        opt_tol = 1e-8

        # SPGL1 solver
        c, resid, grad, info = spgl1.spg_bpdn(theta, np.reshape(Y_LD, (-1,)), sigma, verbosity=3, iter_lim = int(1e5), opt_tol = opt_tol)

        # Plot the SPGL1 solution of the expansion coefficients
        plt.figure()
        plt.plot(abs(c), 'or', markersize=3, mec='k', mew = 0.5)
        plt.xlabel(r"i--th Legendre polynomial"); plt.ylabel(r"$|c_i|$")
        plt.grid(True, color = "black", which="major", ls="--", linewidth = 0.25)
        plt.grid(True, color = "gray", which="minor", ls="--", linewidth = 0.25)
        plt.show()
        plt.close('all')

        plt.figure()
        plt.plot(abs(c), 'or', markersize=3, mec='k', mew = 0.5)
        plt.yscale('log')
        plt.xlabel(r"i--th Legendre polynomial"); plt.ylabel(r"$|c_i|$")
        plt.grid(True, color = "black", which="major", ls="--", linewidth = 0.25)
        plt.grid(True, color = "gray", which="minor", ls="--", linewidth = 0.25)
        plt.show()
        plt.close('all')

        # c_k values and number of basis functions
        ck_value, R_S = c, np.shape(c)[0]

        # Matrix corresponding to the values of the basis at the observations
        e_sparse = theta

        # PCE
        # Norm for moment order of PCE
        norm_moment_order = 1
        for dim in range(dimension):
            norm_moment_order = norm_moment_order*(2**(alpha[dim]+beta[dim]+1)*factorial(alpha[dim])*factorial(beta[dim])/factorial(alpha[dim]+beta[dim]+1))
        norm_moment_order = np.sqrt(norm_moment_order)

        # Mean using PCE
        mean_PCE = ck_value[0] / norm_moment_order
        # Variance using PCE
        var_PCE = np.sum(ck_value[1:]**2) / norm_moment_order**2

        # SSKRR Algorithm
        # Minimize the eigenvalues
        sigma_min = []
        den = np.sum(np.abs(ck_value))
        for k in range(R_S):
            num = np.abs(ck_value[k])
            sigma_temp = kappa_var*(num/den)
            sigma_min.append(sigma_temp)

        sigma_min = np.array(sigma_min).reshape((-1,))

        # Test dataset
        # Compute the error over test_sampling using N_TD sampling
        N_TD = int(1e5)
        np.random.seed(seed)
        X_TD_scaled = np.random.uniform(-1, 1, size=(N_TD, dimension))
        X_TD = scaler.inverse_transform(X_TD_scaled)
        Y_TD = FGT(X_TD, a, b)
        Y_TD_list.append(Y_TD)

        # Compute PCE in the test dataset
        e_PCE = np.zeros((N_TD, R_S))
        for idx in range(R_S):
            e_PCE[:, idx] = ProdLegendrePoly(X_TD_scaled, indexes[idx]).reshape((-1, ))
        Y_PCE = e_PCE @ np.array(ck_value)
        Y_PCE_TD_list.append(Y_PCE)

        RMSE_gPC = np.sqrt(np.mean((Y_PCE - Y_TD)**2))

        # Find the best value of the nugget through grid search
        RMSE_optm_list = []
        for nugget in nuggets:
            # Prediction mean of the GP using the kernel given by the SSKRR algorithm
            K_OBS = e_sparse @ np.diag(sigma_min) @ e_sparse.T
            K_OBS = K_OBS + nugget*np.eye(N_LD)
            K_PREDICT = e_PCE @ np.diag(sigma_min) @ e_sparse.T
            Y_predict_optm = K_PREDICT @ np.linalg.solve(K_OBS, Y_LD)

            # Save the current RMSE on the test set
            RMSE_optm_list.append(np.sqrt(np.mean((Y_TD - Y_predict_optm)**2)))

            # Only keep the best kernel
            if (len(RMSE_optm_list) == 1) or (RMSE_optm_list[-1] < np.min(RMSE_optm_list[:-1])):
                # Value of the minimum of the RMSE
                min_RMSE_SSKRR = RMSE_optm_list[-1]

                # Kappa corresponding to the minimum of the RMSE
                min_nugget = nugget

                # Best prediction corresponding to the minimum of the RMSE
                Y_best_predict_optm = Y_predict_optm

        Y_SSKRR_TD_list.append(Y_best_predict_optm)

        print(f"Log10(RMSE) with SSKRR algorithm: {np.log10(min_RMSE_SSKRR)}")
        print(f"Log10(RMSE) with gPC: {np.log10(RMSE_gPC)}")

### Gaussion process regression surrogate using Gaussian kernel

In [None]:
init_nugget_kernel = 0
Y_GPR_list = []
for number_of_observations_LD in number_of_observations_LD_list:
    for current_seed in range(number_of_seeds):
        # Current seed for LHS
        seed = current_seed

        # Define the learning set
        # Number of observatitons of the learning set
        N_LD = number_of_observations_LD

        # Sampling the learning set
        sampling = LHS_sampling(xlimits = bounds, criterion = "maximin", random_state = seed)
        X_LD = sampling(N_LD)
        X_LD_scaled = scaler.transform(X_LD)
        Y_LD = FGT(X_LD, a, b)

        _, dimension = np.shape(X_LD)
        kappa_var = np.var(Y_LD)

        # Test dataset
        # Compute the error over test_sampling using N_TD sampling
        N_TD = int(1e5)
        np.random.seed(seed)
        X_TD_scaled = np.random.uniform(-1, 1, size=(N_TD, dimension))
        X_TD = scaler.inverse_transform(X_TD_scaled)
        Y_TD = FGT(X_TD, a, b)

        # Initial lengthscale
        # One lengthscale per input dimension
        gamma = []
        for d in range(dimension):
            temp = compute_RMS_from_data(X_LD_scaled[:, d])/np.sqrt(2)
            gamma.append(temp)

        # GPR using pyro
        Nf, Nc = (N_LD, int(N_LD/2))
        ite_max = int(2000)
        ite_save = int(100)

        # Pif and Pic
        Pif, Pic = create_pif_pic(N_LD, Nf, Nc, seed = None)

        # Parameters of the algo
        variable_gamma = torch.tensor(gamma, requires_grad=True)
        nugget_optm = torch.tensor([init_nugget_kernel], requires_grad=False)
        nugget_optm_list = []
        nugget_optm_list.append(torch.clone(nugget_optm).detach().numpy())
        variable_gamma_optm = [torch.clone(variable_gamma).detach().numpy()]

        # Choice of the kernel
        kernel = gp.kernels.RBF(input_dim = dimension, variance=torch.tensor(1.), lengthscale=variable_gamma) # RBF

        # Compute the loss and choose the optimizer
        loss = compute_rho_torch_pyro(torch.from_numpy(X_LD_scaled), torch.tensor(Y_LD), Pif, Pic, nugget_optm, kernel)
        optimizer = torch.optim.Adam([kernel.lengthscale_unconstrained, nugget_optm], lr = 3e-4)

        # Clear pyro parameters for new loop
        pyro.clear_param_store()

        # GPR Prediction
        gpr_opt = gp.models.GPRegression(torch.from_numpy(X_LD_scaled), torch.from_numpy(Y_LD), kernel, noise=torch.tensor(0.), jitter = nugget_optm)
        value_torch, _ = gpr_opt(torch.from_numpy(X_TD_scaled), full_cov=False, noiseless=True)
        value_torch = torch.clone(value_torch).detach().numpy()
        RMSE_torch = np.sqrt(np.mean((Y_TD - value_torch)**2))

        rho_optm = [loss]
        print(f"rho_optm: {rho_optm}")
        ite_list = [0]
        RMSE_torch_list_temp = [RMSE_torch]
        for k in range(ite_max):
            optimizer.zero_grad()
            loss = compute_rho_torch_pyro(torch.from_numpy(X_LD_scaled), torch.tensor(Y_LD), Pif, Pic, nugget_optm, kernel)
            loss.backward()
            optimizer.step()
            with torch.no_grad():
                kernel.lengthscale[:] = kernel.lengthscale.clamp(0, 1e6)
                nugget_optm[:] = nugget_optm.clamp(0, 1e6)
                if (k+1)%ite_save == 0:
                    print(f"Learning: Iteration n. {k+1} / {ite_max}")

                    # Print current length scales of the kernel
                    print(kernel.lengthscale)

                    # Store loss value and current optimized parameters
                    rho_optm.append(loss.item())
                    ite_list.append((k+1))
                    variable_gamma_optm.append(torch.clone(kernel.lengthscale).detach().numpy())
                    nugget_optm_list.append(torch.clone(nugget_optm).detach().numpy())

                    # Compute the test error
                    gpr_opt = gp.models.GPRegression(torch.from_numpy(X_LD_scaled), torch.from_numpy(Y_LD), kernel, noise=torch.tensor(0.), jitter = nugget_optm)
                    value_torch, _ = gpr_opt(torch.from_numpy(X_TD_scaled), full_cov=False, noiseless=True)
                    value_torch = torch.clone(value_torch).detach().numpy()
                    RMSE_torch = np.sqrt(np.mean((Y_TD - value_torch)**2))
                    RMSE_torch_list_temp.append(RMSE_torch)

            # Update new batch
            Pif, Pic = create_pif_pic(N_LD, Nf, Nc, seed = None)

        plt.figure()
        plt.plot(ite_list, RMSE_torch_list_temp, '-ok', markersize = 4, markeredgecolor = 'r', markeredgewidth = 1)
        plt.yscale("log")
        plt.xlabel(r"Number of iterations")
        plt.ylabel(r"$\log_{10}(e_{RMSE})$")
        plt.show()
        plt.close('all')

        plt.figure()
        plt.plot(ite_list, rho_optm, '-ok', markersize = 4, markeredgecolor = 'r', markeredgewidth = 1)
        plt.yscale("log")
        plt.xlabel(r"Number of iterations")
        plt.ylabel(r"$\rho$")
        plt.show()
        plt.close('all')

        plt.figure()
        plt.plot(ite_list, nugget_optm_list, '-ok', markersize = 4, markeredgecolor = 'r', markeredgewidth = 1)
        plt.yscale("log")
        plt.xlabel(r"Number of iterations")
        plt.ylabel(r"$\lambda$")
        plt.show()
        plt.close('all')

        # Compute with the best hyperparameters on the test set
        idx_min_RMSE = np.argmin(RMSE_torch_list_temp)
        gamma_min = variable_gamma_optm[idx_min_RMSE]
        kernel_best = gp.kernels.RBF(input_dim = dimension, variance=torch.tensor(1.), lengthscale=torch.from_numpy(gamma_min)) # RBF
        gpr_opt_best = gp.models.GPRegression(torch.from_numpy(X_LD_scaled), torch.from_numpy(Y_LD), kernel_best, noise=torch.tensor(0.), jitter = nugget_optm)
        value_torch_best, _ = gpr_opt_best(torch.from_numpy(X_TD_scaled), full_cov=False, noiseless=True)
        value_torch_best = torch.clone(value_torch_best).detach().numpy()

        Y_GPR_list.append(value_torch_best)

        print(f"Log10(RMSE): {np.log10(RMSE_torch_list_temp[idx_min_RMSE])}")

### Fully tensorized gPC

In [None]:
Y_FT_PCE_list = []

for number_of_observations_LD in number_of_observations_LD_list:
    # Define the learning set
    # Number of observatitons of the learning set
    N_LD = number_of_observations_LD

    # Bounds of the input space
    bounds = np.array([[-np.pi, np.pi], [-np.pi, np.pi], [-np.pi, np.pi]])
    scaler = sklearn.preprocessing.MinMaxScaler((-1, 1))
    scaler.fit(bounds.T)

    # Total order of the basis
    total_order = 10

    # Compute the corresponding indexes
    indexes, _ = MultiIndex.MultiIndex(total_order, dimension)

    # Corresponds to uniform distribution
    alpha = [0, 0, 0]; beta = [0, 0, 0]

    N_NODES = int(np.around((total_order*2 + 3) / 2))
    CHOICE_QUAD = 2 # Correspond to GJL quadrature
    N_LD = N_NODES**dimension
    X_LD_scaled = np.zeros((N_NODES**dimension, dimension))
    quad, weights = JNodeWt.JNodeWt(N_NODES-1, CHOICE_QUAD,0 ,0)
    multi_dim_weights = np.zeros((N_NODES**dimension, dimension))
    count = 0
    for i in range(N_NODES):
        for j in range(N_NODES):
            for k in range(N_NODES):
                X_LD_scaled[count, :] = quad[i], quad[j], quad[k]
                multi_dim_weights[count, :] = weights[i], weights[j], weights[k]
                count = count + 1
    prod_multi_dim_weights = np.prod(multi_dim_weights, axis = 1)
    diag_prod_multi_dim_weights = np.diag(prod_multi_dim_weights)
    X_LD = scaler.inverse_transform(X_LD_scaled)
    Y_LD = FGT(X_LD, a, b)

    indexes, _ = MultiIndex.MultiIndex(total_order, dimension)
    theta = np.zeros((N_LD, len(indexes)))
    alpha = [0, 0, 0]; beta = [0, 0, 0] # Corresponds to uniform distribution
    print(f"Compute theta vector using PCE with total order {total_order} corresponding to {len(indexes)} parameters")
    for obs in range(N_LD):
        theta[obs, :] = MPJacn.MPJacn(X_LD_scaled[obs, :], total_order, alpha, beta, indexes)

    # Compute the expansion coefficients size (P+1, 1)
    c = Y_LD.T @ diag_prod_multi_dim_weights @ theta

    plt.figure()
    plt.plot(abs(c), 'or', markersize=3, mec='k', mew = 0.5)
    plt.xlabel(r"i--th Legendre polynomial"); plt.ylabel(r"$|c_i|$")
    plt.grid(True, color = "black", which="major", ls="--", linewidth = 0.25)
    plt.grid(True, color = "gray", which="minor", ls="--", linewidth = 0.25)
    plt.show()
    plt.close('all')

    plt.figure()
    plt.plot(abs(c), 'or', markersize=3, mec='k', mew = 0.5)
    plt.yscale('log')
    plt.xlabel(r"i--th Legendre polynomial"); plt.ylabel(r"$|c_i|$")
    plt.grid(True, color = "black", which="major", ls="--", linewidth = 0.25)
    plt.grid(True, color = "gray", which="minor", ls="--", linewidth = 0.25)
    plt.show()
    plt.close('all')

    # c_k values and number of basis functions
    ck_value, R_S = c, np.shape(c)[0]

    # Matrix corresponding to the values of the basis at the observations
    e_sparse = theta

    for current_seed in range(number_of_seeds):
        seed = current_seed
        # Test dataset
        # Compute the error over test_sampling using N_TD sampling
        N_TD = int(1e5)
        np.random.seed(seed)
        X_TD_scaled = np.random.uniform(-1, 1, size=(N_TD, dimension))
        X_TD = scaler.inverse_transform(X_TD_scaled)
        Y_TD = FGT(X_TD, a, b)
        # Compute PCE in the test dataset
        e_PCE = np.zeros((N_TD, R_S))
        for idx in range(R_S):
            e_PCE[:, idx] = ProdLegendrePoly(X_TD_scaled, indexes[idx]).reshape((-1, ))
        Y_PCE = e_PCE @ np.array(ck_value)
        Y_FT_PCE_list.append(Y_PCE)
        RMSE_FT_gPC = np.sqrt(np.mean((Y_PCE - Y_TD)**2))

        print(f"Log10(RMSE) with gPC: {np.log10(RMSE_FT_gPC)}")

## Post-processing

### Box-plot

In [16]:
NRMSE_optm_list, RMSE_optm_list = [], []
NRMSE_PCE_list, RMSE_PCE_list = [], []
NRMSE_RBF_list, RMSE_RBF_list = [], []
NRMSE_FullT_list, RMSE_FullT_list = [], []

for k in range(number_of_seeds):
        Y_TD_current = Y_TD_list[k]
        Y_PCE_current = Y_PCE_TD_list[k]
        Y_GPR_current = Y_GPR_list[k]
        Y_FT_PCE_current = Y_FT_PCE_list[k]
        Y_SSKRR_current = Y_SSKRR_TD_list[k]

        # NRMSE
        NRMSE_PCE_list.append(np.sqrt(np.sum((Y_TD_current - Y_PCE_current)**2) / np.sum((Y_TD_current)**2)))
        NRMSE_optm_list.append(np.sqrt(np.sum((Y_TD_current - Y_SSKRR_current)**2) / np.sum((Y_TD_current)**2)))
        NRMSE_RBF_list.append(np.sqrt(np.sum((Y_TD_current - Y_GPR_current)**2) / np.sum((Y_TD_current)**2)))
        NRMSE_FullT_list.append(np.sqrt(np.sum((Y_TD_current - Y_FT_PCE_current)**2) / np.sum((Y_TD_current)**2)))

        # RMSE
        RMSE_PCE_list.append(np.sqrt(np.mean((Y_TD_current - Y_PCE_current)**2)))
        RMSE_optm_list.append(np.sqrt(np.mean((Y_TD_current - Y_SSKRR_current)**2)))
        RMSE_RBF_list.append(np.sqrt(np.mean((Y_TD_current - Y_GPR_current)**2)))
        RMSE_FullT_list.append(np.sqrt(np.mean((Y_TD_current - Y_FT_PCE_current)**2)))

In [None]:
# Box plot
plt.figure()
plt.boxplot([np.log10(NRMSE_optm_list), np.log10(NRMSE_PCE_list), np.log10(NRMSE_FullT_list), np.log10(NRMSE_RBF_list)])
plt.xticks([1, 2, 3, 4], ["SSKRR", "Sparse gPC", "Fully tensorized gPC", "KRR"])
plt.grid(True, color = "black", which="major", ls="-.", linewidth = 0.5)
plt.ylabel(r"$\log_{10}(e_{\mathrm{NRMSE}})$")
plt.show()
plt.close()

plt.figure()
plt.boxplot([np.log10(RMSE_optm_list), np.log10(RMSE_PCE_list), np.log10(RMSE_FullT_list), np.log10(RMSE_RBF_list)])
plt.xticks([1, 2, 3, 4], ["SSKRR", "Sparse gPC", "Fully tensorized gPC", "KRR"])
plt.grid(True, color = "black", which="major", ls="-.", linewidth = 0.5)
plt.ylabel(r"$\log_{10}(e_{\mathrm{RMSE}})$")
plt.show()
plt.close()

### PDF

In [24]:
Y_TD_current = Y_TD_list[0]
Y_PCE_current = Y_PCE_TD_list[0]
Y_GPR_current = Y_GPR_list[0]
Y_FT_PCE_current = Y_FT_PCE_list[0]
Y_SSKRR_current = Y_SSKRR_TD_list[0]

# Ground Truth
GT_data = Y_TD_current
test_GT = np.linspace(np.min(GT_data), np.max(GT_data), num = int(1e5))
pdf_predict_GT = stats.gaussian_kde(GT_data)(test_GT)
pdf_predict_GT_sorted = np.sort(pdf_predict_GT)

# Sparse gPC
gPC_data = Y_PCE_current
pdf_predict_PCE = stats.gaussian_kde(gPC_data)(test_GT)
pdf_predict_PCE_sorted = np.sort(pdf_predict_PCE)

# SSKRR
optm_data = Y_SSKRR_current
pdf_predict_optm = stats.gaussian_kde(optm_data)(test_GT)
pdf_predict_optm_sorted = np.sort(pdf_predict_optm)

# KRR with Gaussian Kernel
RBF_data = Y_GPR_current
pdf_predict_RBF = stats.gaussian_kde(RBF_data)(test_GT)
pdf_predict_RBF_sorted = np.sort(pdf_predict_RBF)

# Full tensorized gPC
FullT_data = Y_FT_PCE_current
pdf_predict_FullT = stats.gaussian_kde(FullT_data)(test_GT)
pdf_predict_FullT_sorted = np.sort(pdf_predict_FullT)

In [None]:
# Plot PDF
fig = plt.figure()
plt.plot(test_GT, pdf_predict_optm, 'rD-.', linewidth=0.5, alpha = 1, label = "SSKRR", markevery = 1000, markersize = 1.5)
plt.plot(test_GT, pdf_predict_PCE, 'b*-.', linewidth=0.5, alpha = 1, label = " Sparse gPC", markevery = 1000, markersize = 1.5)
plt.plot(test_GT, pdf_predict_FullT, 'c^-.', linewidth=0.5, alpha = 1, label = "Fully tensorized gPC", markevery = 1000, markersize = 1.5)
plt.plot(test_GT, pdf_predict_RBF, 'ms-.', linewidth=0.5, alpha = 1, label = "KRR", markevery = 1000, markersize = 1.5)
plt.plot(test_GT, pdf_predict_GT, 'k', linestyle = 'dotted', linewidth=0.5, alpha = 1, label = "Ground truth")
plt.xticks(fontsize = 14)
plt.yticks(fontsize = 14)
plt.grid(True, color = "black", which="major", ls="-.", linewidth = 0.20)
plt.legend(fontsize = 9, loc="upper right")

# Create zoom-out plot
ax_new = fig.add_axes([0.18, 0.65, 0.2, 0.2]) # the position of zoom-out plot compare to the ratio of zoom-in plot
ax_new.plot(test_GT, pdf_predict_optm, 'rD-.', linewidth=0.5, alpha = 1, label = "SSKRR", markevery = 1000, markersize = 1.5)
ax_new.plot(test_GT, pdf_predict_PCE, 'b*-.', linewidth=0.5, alpha = 1, label = " Sparse gPC", markevery = 1000, markersize = 1.5)
ax_new.plot(test_GT, pdf_predict_FullT, 'c^-.', linewidth=0.5, alpha = 1, label = "Fully tensorized gPC", markevery = 1000, markersize = 1.5)
ax_new.plot(test_GT, pdf_predict_RBF, 'ms-.', linewidth=0.5, alpha = 1, label = "KRR", markevery = 1000, markersize = 1.5)
ax_new.plot(test_GT, pdf_predict_GT, 'k', linestyle = 'dotted', linewidth=0.5, alpha = 1, label = "Ground truth")
ax_new.set_xlim(3, 7)
ax_new.set_ylim(0.081, 0.11)
plt.show()

# Rosenbrock test function

## Rosenbrock function parameters

In [14]:
def FGT(_x):
    dimension = np.shape(_x)[1]
    rosenbrock = 0
    for d in range(dimension-1):
        rosenbrock = rosenbrock + (100*(_x[:, d+1] - _x[:, d]**2)**2 + (1-_x[:, d])**2)
    return rosenbrock

performance_func = "Rosenbroke_10D"

# Parameters of the study
number_of_seeds = 10
number_of_observations_LD_list = [400]

# Bounds of the input space
dimension = 10
bounds = np.zeros((dimension, 2))
bounds[:, 0], bounds[:, 1] = -2, 2
# Scaling the inputs
scaler = sklearn.preprocessing.MinMaxScaler((-1, 1))
scaler.fit(bounds.T)

MinMaxScaler(feature_range=(-1, 1))

## Surrogate modeling methods

### SSKRR Algorithm and sparse gPC

In [None]:
# Some useful parameters
nuggets = 10**(np.linspace(np.log10(1e-12), np.log10(1e0), num = 31))

# Save the predictions on the test dataset
Y_SSKRR_TD_list = []
Y_PCE_TD_list = []
Y_TD_list = []

# Total order of the basis
total_order = 4

# Compute the corresponding indexes
indexes, _ = MultiIndex.MultiIndex(total_order, dimension)

# Corresponds to uniform distribution
alpha = [0]*dimension; beta = [0]*dimension

for number_of_observations_LD in number_of_observations_LD_list:
    for current_seed in range(number_of_seeds):
        print(f"Current seed: {current_seed}")
        seed = current_seed
        number_of_observations = number_of_observations_LD

        # Define the learning set
        # Number of observatitons of the learning set
        N_LD = number_of_observations

        # Sampling the learning set
        sampling = LHS_sampling(xlimits = bounds, criterion = "maximin", random_state = seed)
        X_LD = sampling(N_LD)
        X_LD_scaled = scaler.transform(X_LD)
        Y_LD = FGT(X_LD)

        _, dimension = np.shape(X_LD)
        kappa_var = np.var(Y_LD)

        # SPGL1 Optimization
        # min |c|_L1 s. t. ||Tc - b||_2 <= epsilon

        print(f"Compute theta for SPGL1 using PCE with total order {total_order} corresponding to {len(indexes)} expansion coefficients")
        theta = np.zeros((N_LD, len(indexes)))
        for obs in range(N_LD):
            theta[obs, :] = MPJacn.MPJacn(X_LD_scaled[obs, :], total_order, alpha, beta, indexes)

        # Convergence criterion for SPGL1
        sigma = 1e-6
        opt_tol = 1e-7

        # SPGL1 solver
        c, resid, grad, info = spgl1.spg_bpdn(theta, np.reshape(Y_LD, (-1,)), sigma, verbosity=3, iter_lim = int(1e5), opt_tol = opt_tol)

        # Plot the SPGL1 solution of the expansion coefficients
        plt.figure()
        plt.plot(abs(c), 'or', markersize=3, mec='k', mew = 0.5)
        plt.xlabel(r"i--th Legendre polynomial"); plt.ylabel(r"$|c_i|$")
        plt.grid(True, color = "black", which="major", ls="--", linewidth = 0.25)
        plt.grid(True, color = "gray", which="minor", ls="--", linewidth = 0.25)
        plt.show()
        plt.close('all')

        plt.figure()
        plt.plot(abs(c), 'or', markersize=3, mec='k', mew = 0.5)
        plt.yscale('log')
        plt.xlabel(r"i--th Legendre polynomial"); plt.ylabel(r"$|c_i|$")
        plt.grid(True, color = "black", which="major", ls="--", linewidth = 0.25)
        plt.grid(True, color = "gray", which="minor", ls="--", linewidth = 0.25)
        plt.show()
        plt.close('all')

        # c_k values and number of basis functions
        ck_value, R_S = c, np.shape(c)[0]

        # Matrix corresponding to the values of the basis at the observations
        e_sparse = theta

        # PCE
        # Norm for moment order of PCE
        norm_moment_order = 1
        for dim in range(dimension):
            norm_moment_order = norm_moment_order*(2**(alpha[dim]+beta[dim]+1)*factorial(alpha[dim])*factorial(beta[dim])/factorial(alpha[dim]+beta[dim]+1))
        norm_moment_order = np.sqrt(norm_moment_order)

        # Mean using PCE
        mean_PCE = ck_value[0] / norm_moment_order
        # Variance using PCE
        var_PCE = np.sum(ck_value[1:]**2) / norm_moment_order**2

        # SSKRR Algorithm
        # Minimize the eigenvalues
        sigma_min = []
        den = np.sum(np.abs(ck_value))
        for k in range(R_S):
            num = np.abs(ck_value[k])
            sigma_temp = kappa_var*(num/den)
            sigma_min.append(sigma_temp)

        sigma_min = np.array(sigma_min).reshape((-1,))

        # Test dataset
        # Compute the error over test_sampling using N_TD sampling
        N_TD = int(1e5)
        np.random.seed(seed)
        X_TD_scaled = np.random.uniform(-1, 1, size=(N_TD, dimension))
        X_TD = scaler.inverse_transform(X_TD_scaled)
        Y_TD = FGT(X_TD)
        Y_TD_list.append(Y_TD)

        # Compute PCE in the test dataset
        e_PCE = np.zeros((N_TD, R_S))
        for idx in range(R_S):
            e_PCE[:, idx] = ProdLegendrePoly(X_TD_scaled, indexes[idx]).reshape((-1, ))
        Y_PCE = e_PCE @ np.array(ck_value)
        Y_PCE_TD_list.append(Y_PCE)

        RMSE_gPC = np.sqrt(np.mean((Y_PCE - Y_TD)**2))

        # Find the best value of the nugget through grid search
        RMSE_optm_list = []
        for nugget in nuggets:
            # Prediction mean of the GP using the kernel given by the SSKRR algorithm
            K_OBS = e_sparse @ np.diag(sigma_min) @ e_sparse.T
            K_OBS = K_OBS + nugget*np.eye(N_LD)
            K_PREDICT = e_PCE @ np.diag(sigma_min) @ e_sparse.T
            Y_predict_optm = K_PREDICT @ np.linalg.solve(K_OBS, Y_LD)

            # Save the current RMSE on the test set
            RMSE_optm_list.append(np.sqrt(np.mean((Y_TD - Y_predict_optm)**2)))

            # Only keep the best kernel
            if (len(RMSE_optm_list) == 1) or (RMSE_optm_list[-1] < np.min(RMSE_optm_list[:-1])):
                # Value of the minimum of the RMSE
                min_RMSE_SSKRR = RMSE_optm_list[-1]

                # Kappa corresponding to the minimum of the RMSE
                min_nugget = nugget

                # Best prediction corresponding to the minimum of the RMSE
                Y_best_predict_optm = Y_predict_optm

        Y_SSKRR_TD_list.append(Y_best_predict_optm)

        print(f"Log10(RMSE) with SSKRR algorithm: {np.log10(min_RMSE_SSKRR)}")
        print(f"Log10(RMSE) with gPC: {np.log10(RMSE_gPC)}")

### Gaussian process regression with Gaussian kernel

In [None]:
init_nugget_kernel = 0
Y_GPR_list = []
for number_of_observations_LD in number_of_observations_LD_list:
    for current_seed in range(number_of_seeds):
        # Current seed for LHS
        seed = current_seed

        # Define the learning set
        # Number of observatitons of the learning set
        N_LD = number_of_observations_LD

        # Sampling the learning set
        sampling = LHS_sampling(xlimits = bounds, criterion = "maximin", random_state = seed)
        X_LD = sampling(N_LD)
        X_LD_scaled = scaler.transform(X_LD)
        Y_LD = FGT(X_LD)

        _, dimension = np.shape(X_LD)
        kappa_var = np.var(Y_LD)
        # Test dataset
        # Compute the error over test_sampling using N_TD sampling
        N_TD = int(1e5)
        np.random.seed(seed)
        X_TD_scaled = np.random.uniform(-1, 1, size=(N_TD, dimension))
        X_TD = scaler.inverse_transform(X_TD_scaled)
        Y_TD = FGT(X_TD)

        # Initial lengthscale
        # One lengthscale per input dimension
        gamma = []
        for d in range(dimension):
            temp = compute_RMS_from_data(X_LD_scaled[:, d])/np.sqrt(2)
            gamma.append(temp)

        # GPR using pyro
        Nf, Nc = (N_LD, int(N_LD/2))
        ite_max = int(10000)
        ite_save = int(100)

        # Pif and Pic
        Pif, Pic = create_pif_pic(N_LD, Nf, Nc, seed = None)

        # Parameters of the algo
        variable_gamma = torch.tensor(gamma, requires_grad=True)
        nugget_optm = torch.tensor([init_nugget_kernel], requires_grad=False)
        nugget_optm_list = []
        nugget_optm_list.append(torch.clone(nugget_optm).detach().numpy())
        variable_gamma_optm = [torch.clone(variable_gamma).detach().numpy()]

        # Choice of the kernel
        kernel = gp.kernels.RBF(input_dim = dimension, variance=torch.tensor(1.), lengthscale=variable_gamma) # RBF

        # Compute the loss and choose the optimizer
        loss = compute_rho_torch_pyro(torch.from_numpy(X_LD_scaled), torch.tensor(Y_LD), Pif, Pic, nugget_optm, kernel)
        optimizer = torch.optim.Adam([kernel.lengthscale_unconstrained, nugget_optm], lr = 3e-4)

        # Clear pyro parameters for new loop
        pyro.clear_param_store()

        # GPR Prediction
        gpr_opt = gp.models.GPRegression(torch.from_numpy(X_LD_scaled), torch.from_numpy(Y_LD), kernel, noise=torch.tensor(0.), jitter = nugget_optm)
        value_torch, _ = gpr_opt(torch.from_numpy(X_TD_scaled), full_cov=False, noiseless=True)
        value_torch = torch.clone(value_torch).detach().numpy()
        RMSE_torch = np.sqrt(np.mean((Y_TD - value_torch)**2))
        rho_optm = [loss]
        print(f"rho_optm: {rho_optm}")
        ite_list = [0]
        RMSE_torch_list_temp = [RMSE_torch]
        for k in range(ite_max):
            optimizer.zero_grad()
            loss = compute_rho_torch_pyro(torch.from_numpy(X_LD_scaled), torch.tensor(Y_LD), Pif, Pic, nugget_optm, kernel)
            loss.backward()
            optimizer.step()
            with torch.no_grad():
                kernel.lengthscale[:] = kernel.lengthscale.clamp(0, 1e6)
                nugget_optm[:] = nugget_optm.clamp(0, 1e6)
                if (k+1)%ite_save == 0:
                    print(f"Learning: Iteration n. {k+1} / {ite_max}")

                    # Print current length scales of the kernel
                    print(kernel.lengthscale)

                    # Store loss value and current optimized parameters
                    rho_optm.append(loss.item())
                    ite_list.append((k+1))
                    variable_gamma_optm.append(torch.clone(kernel.lengthscale).detach().numpy())
                    nugget_optm_list.append(torch.clone(nugget_optm).detach().numpy())

                    # Compute the test error
                    gpr_opt = gp.models.GPRegression(torch.from_numpy(X_LD_scaled), torch.from_numpy(Y_LD), kernel, noise=torch.tensor(0.), jitter = nugget_optm)
                    value_torch, _ = gpr_opt(torch.from_numpy(X_TD_scaled), full_cov=False, noiseless=True)
                    value_torch = torch.clone(value_torch).detach().numpy()
                    RMSE_torch = np.sqrt(np.mean((Y_TD - value_torch)**2))
                    RMSE_torch_list_temp.append(RMSE_torch)

            # Update new batch
            Pif, Pic = create_pif_pic(N_LD, Nf, Nc, seed = None)

        plt.figure()
        plt.plot(ite_list, RMSE_torch_list_temp, '-ok', markersize = 4, markeredgecolor = 'r', markeredgewidth = 1)
        plt.yscale("log")
        plt.xlabel(r"Number of iterations")
        plt.ylabel(r"$\log_{10}(e_{RMSE})$")
        plt.show()
        plt.close('all')

        plt.figure()
        plt.plot(ite_list, rho_optm, '-ok', markersize = 4, markeredgecolor = 'r', markeredgewidth = 1)
        plt.yscale("log")
        plt.xlabel(r"Number of iterations")
        plt.ylabel(r"$\rho$")
        plt.show()
        plt.close('all')

        plt.figure()
        plt.plot(ite_list, nugget_optm_list, '-ok', markersize = 4, markeredgecolor = 'r', markeredgewidth = 1)
        plt.yscale("log")
        plt.xlabel(r"Number of iterations")
        plt.ylabel(r"$\lambda$")
        plt.show()
        plt.close('all')

        # Compute with the best hyperparameters on the test set
        idx_min_RMSE = np.argmin(RMSE_torch_list_temp)
        gamma_min = variable_gamma_optm[idx_min_RMSE]
        kernel_best = gp.kernels.RBF(input_dim = dimension, variance=torch.tensor(1.), lengthscale=torch.from_numpy(gamma_min)) # RBF
        gpr_opt_best = gp.models.GPRegression(torch.from_numpy(X_LD_scaled), torch.from_numpy(Y_LD), kernel_best, noise=torch.tensor(0.), jitter = nugget_optm)
        value_torch_best, _ = gpr_opt_best(torch.from_numpy(X_TD_scaled), full_cov=False, noiseless=True)
        value_torch_best = torch.clone(value_torch_best).detach().numpy()

        Y_GPR_list.append(value_torch_best)

        print(f"Log10(RMSE): {np.log10(RMSE_torch_list_temp[idx_min_RMSE])}")

## Post-processing

### Box-plot

In [None]:
NRMSE_optm_list, RMSE_optm_list = [], []
NRMSE_PCE_list, RMSE_PCE_list = [], []
NRMSE_RBF_list, RMSE_RBF_list, = [], []

for k in range(number_of_seeds):
        Y_TD_current = Y_TD_list[k]
        Y_PCE_current = Y_PCE_TD_list[k]
        Y_GPR_current = Y_GPR_list[k]
        Y_SSKRR_current = Y_SSKRR_TD_list[k]

        # NRMSE
        NRMSE_PCE_list.append(np.sqrt(np.sum((Y_TD_current - Y_PCE_current)**2) / np.sum((Y_TD_current)**2)))
        NRMSE_optm_list.append(np.sqrt(np.sum((Y_TD_current - Y_SSKRR_current)**2) / np.sum((Y_TD_current)**2)))
        NRMSE_RBF_list.append(np.sqrt(np.sum((Y_TD_current - Y_GPR_current)**2) / np.sum((Y_TD_current)**2)))

        # RMSE
        RMSE_PCE_list.append(np.sqrt(np.mean((Y_TD_current - Y_PCE_current)**2)))
        RMSE_optm_list.append(np.sqrt(np.mean((Y_TD_current - Y_SSKRR_current)**2)))
        RMSE_RBF_list.append(np.sqrt(np.mean((Y_TD_current - Y_GPR_current)**2)))

### PDF

In [None]:
Y_TD_current = Y_TD_list[0]
Y_PCE_current = Y_PCE_TD_list[0]
Y_GPR_current = Y_GPR_list[0]
Y_SSKRR_current = Y_SSKRR_TD_list[0]

# Ground Truth
GT_data = Y_TD_current
test_GT = np.linspace(np.min(GT_data), np.max(GT_data), num = int(1e5))
pdf_predict_GT = stats.gaussian_kde(GT_data)(test_GT)
pdf_predict_GT_sorted = np.sort(pdf_predict_GT)

# Sparse gPC
gPC_data = Y_PCE_current
pdf_predict_PCE = stats.gaussian_kde(gPC_data)(test_GT)
pdf_predict_PCE_sorted = np.sort(pdf_predict_PCE)

# SSKRR
optm_data = Y_SSKRR_current
pdf_predict_optm = stats.gaussian_kde(optm_data)(test_GT)
pdf_predict_optm_sorted = np.sort(pdf_predict_optm)

# KRR with Gaussian Kernel
RBF_data = Y_GPR_current
pdf_predict_RBF = stats.gaussian_kde(RBF_data)(test_GT)
pdf_predict_RBF_sorted = np.sort(pdf_predict_RBF)

# Full tensorized gPC
FullT_data = Y_FT_PCE_current
pdf_predict_FullT = stats.gaussian_kde(FullT_data)(test_GT)
pdf_predict_FullT_sorted = np.sort(pdf_predict_FullT)