# Example
### __Multifold cross-validation for GP regression__

In [None]:
import random
from typing import Any, Dict, List, Tuple, Union

import gpytorch
import matplotlib.pyplot as plt
import numpy as np
import torch
from nptyping import Complex, Float, Int, NDArray, Shape

# Import GP cross validation and metrics library
import rapid_models.gp_diagnostics.cv as gp_cv
import rapid_models.gp_diagnostics.metrics as gp_metrics
# For GP modelling
import rapid_models.gp_models.utils as gputils
from rapid_models.gp_diagnostics.utils.stats import snorm_qq
from rapid_models.gp_models.templates import ExactGPModel

%matplotlib inline


# DNV primary colors 
clr = {
    'Sky blue':'#99d9f0',
    'Land green':'#3f9c35',
    'Sea blue':'#003591',
    'Dark blue':'#0f204b',
    'Cyan':'#009fda',
}

First we generate some dummy data

In [None]:
# Generate data
random.seed(42)
torch.manual_seed(42)
np.random.seed(42) 

N_DIM: int = 2 # the X dimension
N_TRAIN: int = 12

KER_SCALE_TRUE: float = 1.0
KER_LENGTHSCALE_TRUE: torch.Tensor = torch.ones(N_DIM+1)*0.5
KER_LENGTHSCALE_TRUE[-1] = KER_LENGTHSCALE_TRUE[-1]*0.5 # Shorter lengthscale in the t-dimension

N_T_TRAIN: NDArray[Shape['N_TRAIN'], Int] 
N_T_TRAIN = np.random.randint(5, 20, size = N_TRAIN) # Different number of observations in the t-dimension for each observed function 

T_TRAIN: List[NDArray[Shape['*'], Float]]
T_TRAIN = [np.linspace(np.random.uniform(0, 0.1), np.random.uniform(0.9, 1.0), n) for n in N_T_TRAIN]

X_TRAIN: NDArray[Shape['N_TRAIN, N_DIM'], Float] 
X_TRAIN = np.array([
    [0.82, 0.88], 
    [0.84, 0.81],
    [0.7, 0.22], 
    [0.2, 0.6]
])
X_TRAIN = np.append(X_TRAIN, np.random.uniform(size = (N_TRAIN-4, N_DIM)), axis = 0)

from_to: List[int] = [0] + list(N_T_TRAIN.cumsum())
FOLDS_INDICES: List[List[int]] = [list(range(from_to[i], from_to[i+1])) for i in range(N_TRAIN)]

N_XT_TRAIN: int = int(N_T_TRAIN.sum())
N_XT_DIM: int = N_DIM + 1

XT_TRAIN: NDArray[Shape['N_XT_TRAIN, N_XT_DIM'], Float]
XT_TRAIN = np.zeros((N_XT_TRAIN, N_XT_DIM))
for i in range(N_TRAIN):
    XT_TRAIN[FOLDS_INDICES[i], 0:N_DIM] = X_TRAIN[i]
    XT_TRAIN[FOLDS_INDICES[i], -1] = T_TRAIN[i]


In [None]:
# Define kernel and sample training data

# Kernel
ker: gpytorch.kernels.Kernel = gputils.gpytorch_kernel_Matern(
    outputscale=KER_SCALE_TRUE, 
    lengthscale=KER_LENGTHSCALE_TRUE,
)

# Covariance matrix; shape=(N_XT_TRAIN, N_XT_TRAIN)
K: gpytorch.lazy.LazyEvaluatedKernelTensor = ker(torch.tensor(XT_TRAIN, dtype=torch.float))  # type: ignore

# Distribution
normal_rv: gpytorch.distributions.Distribution = gpytorch.distributions.MultivariateNormal(
    mean = torch.zeros(K.shape[0]),  # vector; shape=(N_XT_TRAIN)
    covariance_matrix = K,  # matrix; shape=(N_XT_TRAIN, N_XT_TRAIN)
)

# Sample training data
Y_TRAIN: torch.Tensor = normal_rv.sample()  # vector; shape=(N_XT_TRAIN)
YT_TRAIN: List[torch.Tensor] = [Y_TRAIN[t_idx] for t_idx in FOLDS_INDICES]
pass


In [None]:
fig, axs = plt.subplots(ncols = 2, figsize = (20, 8), gridspec_kw={'width_ratios': [1, 2]})
axs = axs.ravel() 

# y(t) 
ax = axs[1]
colors = [clr['Dark blue'], clr['Sea blue'], clr['Land green'], clr['Cyan']]
for i in range (4):
    ax.plot(T_TRAIN[i], YT_TRAIN[i], marker = '.', markersize = 15, label = r'$(x_1, x_2) = ({:.2f}, {:.2f})$'.format(X_TRAIN[i][0], X_TRAIN[i][1]), color = colors[i]) 
for i in range (4, N_TRAIN):
    ax.plot(T_TRAIN[i], YT_TRAIN[i], marker = '.', markersize = 15, color = clr['Sky blue'], alpha = 0.5) 

ax.set_xlabel('t', fontsize = 16)
ax.set_ylabel('y', fontsize = 16)
ax.set_title(r'$y(x_1, x_2, t)$', fontsize = 16)
ax.legend()

# x1, x2 - space
ax = axs[0]
ax.set_xlim(0, 1)
ax.set_ylim(0, 1)
ax.set_xlabel(r'$x_1$', fontsize = 16)
ax.set_ylabel(r'$x_2$', fontsize = 16)
ax.set_title(r'$(x_1, x_2)$', fontsize = 16)

for i in range(4):
    ax.scatter(X_TRAIN[i,0], X_TRAIN[i,1], marker = 'x', s = 60, color = clr['Dark blue'])
ax.scatter(X_TRAIN[:,0], X_TRAIN[:,1], marker = 'x', s = 60, color = clr['Dark blue'])

fig.tight_layout()

## LOO example

## 1) Fit a GP to some data

In [None]:
# Training data

X_Train = torch.tensor(T_TRAIN[2][::2].reshape(-1, 1), dtype=torch.float)
Y_Train = torch.tensor(YT_TRAIN[2][::2], dtype=torch.float) 

# fig, ax = plt.subplots(figsize = (16, 8))

# ax.plot(X_Train, Y_Train, marker = '.', markersize = 15, color = clr['Dark blue']) 
# ax.set_xlabel('t', fontsize = 16)
# ax.set_ylabel('y', fontsize = 16)
# ax.set_title(r'Training data', fontsize = 16)

In [None]:
# Fit GP: Create and fit GP model to training data

# Kernel
ker: gpytorch.kernels.Kernel = gputils.gpytorch_kernel_Matern(
    outputscale=KER_SCALE_TRUE, 
    lengthscale=KER_LENGTHSCALE_TRUE[-1].reshape(1)
    )
# Covariance matrix
K: gpytorch.lazy.LazyEvaluatedKernelTensor = ker(X_Train)  # type: ignore
# Create and fit GP model to training data
model: ExactGPModel = ExactGPModel(
                X_Train, Y_Train,                                                                   # Training data
                gputils.gpytorch_mean_constant(0.0, fixed = True),                                  # Mean function
                ker,                                                                                # Kernel
                gputils.gpytorch_likelihood_gaussian(variance = 1e-6, fixed = False),               # Likelihood
                '', '') # Name and path for save/load


In [None]:
# Evaluate GP: Draw 20 samples from GP model

# 20 samples = 20 sampled time series of 100 data points each
N_SAMPLES: int = 20  # number of samples to be drawn
N_SAMPLE_POINTS: int = 100  # number of data points in each sample

# Input: time (single dimension input data)
t: torch.Tensor = torch.linspace(0, 1, N_SAMPLE_POINTS).reshape(-1, 1)  # time; shape=(N_SAMPLE_POINTS, 1)

# Mean and covariance matrix
model.eval_mode()
m: torch.Tensor  # mean vector; shape=(N_SAMPLE_POINTS)
v: torch.Tensor  # covariance matrix; shape=(N_SAMPLE_POINTS, N_SAMPLE_POINTS)
m, v = model.predict(t, latent=True, full_cov=True, CG_tol = 0.001)

# Add a jitter to covariance matrix
w: Union[NDArray[Shape['N_SAMPLE_POINTS'], Float], NDArray[Shape['N_SAMPLE_POINTS'], Complex]]  # Eigenvalues of covariance matrix
w, _ = np.linalg.eig(v)
min_eigval: Union[float, complex] = w.min()
print(min_eigval)
jitter: float = abs(min_eigval*2)
w, _ = np.linalg.eig(v + torch.eye(t.shape[0])*jitter)
print(w.min())
v_with_jitter: torch.Tensor = v + torch.eye(t.shape[0])*jitter

# Distribution (incl. jitter)
normal_rv: gpytorch.distributions.Distribution = gpytorch.distributions.MultivariateNormal(
    mean=m,  # mean vector; shape=(N_SAMPLE_POINTS)
    covariance_matrix=v_with_jitter,  # covariance matrix with jitter; shape=(N_SAMPLE_POINTS, N_SAMPLE_POINTS)
)

# Draw 20 samples from the distribution
samples: NDArray[Shape['N_SAMPLES, N_SAMPLE_POINTS'], Float]
samples = np.array([normal_rv.sample().numpy() for _ in range(N_SAMPLES)])

In [None]:
fig, ax = plt.subplots(figsize = (16, 8))

ax.scatter(X_Train, Y_Train, marker = '.', s = 150, color = clr['Dark blue']) 
ax.plot(t.flatten(), m, color = clr['Dark blue'])
ax.fill_between(t.flatten(), m - 2*v.diagonal()**0.5, m + 2*v.diagonal()**0.5, color = clr['Cyan'], alpha = 0.1)
ax.set_xlabel('t', fontsize = 16)
ax.set_ylabel('y', fontsize = 16)
ax.set_title(r'GP fitted to {} (noiseless) observations $y(t_1), y(t_2), ...$'.format(Y_Train.shape[0]), fontsize = 16)

ax.plot(t, samples.T, color = 'red', alpha = 0.3, linewidth = 0.3)
fig.tight_layout()

## 2) Compute LOO errors

In [None]:
# Compute Leave-One-Out (LOO) residuals for GP regression 
# (residual = observed - predicted)

N_Y_TRAIN: int = Y_Train.shape[0]  # number of observations

mean: Union[NDArray[Shape['N_Y_TRAIN'], Float], None]  # Mean of CV residuals
cov: Union[NDArray[Shape['N_Y_TRAIN, N_Y_TRAIN'], Float], None]  # Covariance of CV residuals
residuals_transformed: Union[NDArray[Shape['N_Y_TRAIN'], Float], None]  # The residuals transformed to the standard normal space

# To compute the LOO residuals we only need the covariance matrix 
# and the training observations (converted to numpy arrays)
mean, cov, residuals_transformed = gp_cv.loo(
    K.numpy(),  # covariance matrix
    Y_Train.numpy(),  # training observations
    noise_variance = 0.,  # Gaussian noise
    )

# Assert that results are not None
# Note:  This is necessary as loo() returns (None, None, None) in case 
#        the lower triangular cholesky factor could not successfully be computed.
# @TODO: We should consider to change this behaviour in loo() 
#        and i.e. rather raise an exception instead. This would create a cleaner API.
#        CLAROS, 2022-11-02
assert mean is not None
assert cov is not None
assert residuals_transformed is not None

# 'residuals_transformed' is the residuals transformed to the standard normal space.
# We will see that this is not the same as normalizing the individual residuals (which will remain correlated)
residuals_scaled: NDArray[Shape['N_Y_TRAIN'], Float] = mean / cov.diagonal()**0.5

In [None]:
# Function for plotting residuals 
def plotres(ax, res, lbl = ''):
    """
    Compute QQ plot of residuals 'res' and plot to ax
    """
    q_sample, q_snorm, q_snorm_upper, q_snorm_lower = snorm_qq(res)
    ax.scatter(q_snorm, q_sample, marker = 'o', facecolors='none', edgecolors='k')
    ax.plot(q_snorm_upper, q_sample, 'k--')
    ax.plot(q_snorm_lower, q_sample, 'k--')
    ax.set_xlabel('Theoretical quantiles')
    ax.set_ylabel('Sample quantiles')
    ax.set_title('Normal Q-Q Plot of {}'.format(lbl))
    line_min = min(q_snorm.min(), q_sample.min())*1.1
    line_max = max(q_snorm.max(), q_sample.max())*1.1
    ax.plot([line_min, line_max], [line_min, line_max], 'k')
    ax.set_xlim(line_min, line_max)
    ax.set_ylim(line_min, line_max)

In [None]:
# Plot residuals 
fig, axs = plt.subplots(ncols = 2, figsize = (18, 8))
axs = axs.ravel()

plotres(axs[0], residuals_transformed, 'Transformed LOO Residuals')
plotres(axs[1], residuals_scaled, 'Standardized LOO Residuals')

In [None]:
metrics: Union[Dict[str, Any], None]
metrics = gp_metrics.evaluate_GP(K.numpy(), Y_Train.numpy(), noise_variance = 0) 

assert metrics is not None

for key in ['log_marginal_likelihood', 'log_pseudo_likelihood', 'MSE']:
    print(key, metrics[key])

# Multifold CV example

In [None]:
fig, axs = plt.subplots(ncols = 2, figsize = (20, 8), gridspec_kw={'width_ratios': [1, 2]})
axs = axs.ravel() 

# y(t) 
ax = axs[1]
colors = [clr['Dark blue'], clr['Sea blue'], clr['Land green'], clr['Cyan']]
i = 2
ax.plot(T_TRAIN[i], YT_TRAIN[i], marker = '.', markersize = 15, label = r'$(x_1, x_2) = ({:.2f}, {:.2f})$'.format(X_TRAIN[i][0], X_TRAIN[i][1]), color = clr['Land green']) 
for i in [0, 1]:
    ax.plot(T_TRAIN[i], YT_TRAIN[i], marker = '.', markersize = 15, label = r'$(x_1, x_2) = ({:.2f}, {:.2f})$'.format(X_TRAIN[i][0], X_TRAIN[i][1]), color = clr['Dark blue']) 
for i in range (3, N_TRAIN):
    ax.plot(T_TRAIN[i], YT_TRAIN[i], marker = '.', markersize = 15, color = clr['Sky blue'], alpha = 0.5) 

ax.set_xlabel('t', fontsize = 16)
ax.set_ylabel('y', fontsize = 16)
ax.set_title(r'$y(x_1, x_2, t)$', fontsize = 16)
ax.legend()

# x1, x2 - space
ax = axs[0]
ax.set_xlim(0, 1)
ax.set_ylim(0, 1)
ax.set_xlabel(r'$x_1$', fontsize = 16)
ax.set_ylabel(r'$x_2$', fontsize = 16)
ax.set_title(r'$(x_1, x_2)$', fontsize = 16)

i = 2
ax.scatter(X_TRAIN[i,0], X_TRAIN[i,1], marker = 'o', facecolors='none', edgecolors=clr['Land green'], s = 140)

for i in [0, 1]:
    ax.scatter(X_TRAIN[i,0], X_TRAIN[i,1], marker = 'o', facecolors='none', edgecolors=clr['Dark blue'], s = 140)

ax.scatter(X_TRAIN[:,0], X_TRAIN[:,1], marker = 'x', s = 60, color = clr['Cyan'])

fig.tight_layout()

## 1) Compute GP covariance matrix from training inputs 

In [None]:
# Training data
X_Train = torch.tensor(XT_TRAIN, dtype=torch.float)
Y_Train = torch.tensor(torch.cat(YT_TRAIN), dtype=torch.float) 

# Kernel
ker: gpytorch.kernels.Kernel = gputils.gpytorch_kernel_Matern(
    outputscale=KER_SCALE_TRUE, 
    lengthscale=KER_LENGTHSCALE_TRUE,
    )

# Covariance matrix
K: gpytorch.lazy.LazyEvaluatedKernelTensor = ker(X_Train)  # type: ignore


## 2) Define folds

In [None]:
# Here, the folds are naturally given from the training data, as a "single observation" corresponds to 
# observing the function y(t), at some times t_1, t_2, ..., for one specific value of (x_1, x_2)
folds_startstop = np.array([0] + [Y.shape[0] for Y in YT_TRAIN]).cumsum()
folds = [list(range(folds_startstop[i], folds_startstop[i+1])) for i in range(len(folds_startstop)-1)]

print('There are a total of {} folds with the following indices:'.format(len(folds)))
display(folds)


## 3) Compute CV residuals

In [None]:
# To compute the LOO residuals we only need the covariance matrix and the training observations (converted to numpy arrays)
mean, cov, residuals_transformed = gp_cv.multifold(K.numpy(), Y_Train.numpy(), folds, noise_variance = 0) 

# 'residuals_transformed' is the residuals transformed to the standard normal space
# we will see that this is not the same normalizing the individual residuals (which will remain correlated)
residuals_scaled = mean / cov.diagonal()**0.5

In [None]:
# Plot residuals 
fig, axs = plt.subplots(ncols = 2, figsize = (18, 8))
axs = axs.ravel()

plotres(axs[0], residuals_transformed, 'Transformed LOO Residuals')
plotres(axs[1], residuals_scaled, 'Standardized LOO Residuals')

In [None]:
# Color by each fold 
fig, ax = plt.subplots(figsize = (12, 8))

q_sample, q_snorm, q_snorm_upper, q_snorm_lower = snorm_qq(residuals_transformed)
ax.plot(q_snorm_upper, q_sample, 'k--')
ax.plot(q_snorm_lower, q_sample, 'k--')
ax.set_xlabel('Theoretical quantiles', fontsize = 15)
ax.set_ylabel('Sample quantiles', fontsize = 15)
ax.set_title('Normal Q-Q Plot of Transformed LOO Residuals', fontsize = 20)
line_min = min(q_snorm.min(), q_sample.min())*1.1
line_max = max(q_snorm.max(), q_sample.max())*1.1
ax.plot([line_min, line_max], [line_min, line_max], 'k')
ax.set_xlim(line_min, line_max)
ax.set_ylim(line_min, line_max)

for i in range(len(folds)):
    ax.scatter(q_snorm[folds[i]], q_sample[folds[i]], marker = 'o', label = 'Fold_{}'.format(i))

ax.legend()

In [None]:
fig, ax = plt.subplots(figsize = (12, 8))

y_obs = Y_Train
y_pred_mean = y_obs - mean
y_pred_std = cov.diagonal()**0.5

ax.set_xlabel('Observed', fontsize = 15)
ax.set_ylabel('Predicted', fontsize = 15)

line_min = min(y_obs.min(), y_pred_mean.min())*1.1
line_max = max(y_obs.max(), y_pred_mean.max())*1.1
ax.plot([line_min, line_max], [line_min, line_max], 'k')
ax.set_xlim(line_min, line_max)
ax.set_ylim(line_min, line_max)

ax.errorbar(y_obs, y_pred_mean, yerr=2*y_pred_std, fmt='o', color = 'k')

for i in range(len(folds)):
    ax.errorbar(y_obs[folds[i]], y_pred_mean[folds[i]], yerr=2*y_pred_std[folds[i]], fmt='o', label = 'Fold_{}'.format(i))

ax.legend()

In [None]:
fig, ax = plt.subplots(figsize = (12, 8))

y_obs = Y_Train
y_pred_mean = y_obs - mean
y_pred_std = cov.diagonal()**0.5

ax.set_xlabel('Observed', fontsize = 15)
ax.set_ylabel('Predicted', fontsize = 15)

line_min = min(y_obs.min(), y_pred_mean.min())*1.1
line_max = max(y_obs.max(), y_pred_mean.max())*1.1
ax.plot([line_min, line_max], [line_min, line_max], 'k')
ax.set_xlim(line_min, line_max)
ax.set_ylim(line_min, line_max)

ax.errorbar(y_obs, y_pred_mean, yerr=2*y_pred_std, fmt='o', color = 'k')

for i in range(len(folds)):
    ax.errorbar(y_obs[folds[i]], y_pred_mean[folds[i]], yerr=2*y_pred_std[folds[i]], fmt='o', label = 'Fold_{}'.format(i))

ax.legend()

# Use the 'metrics.evaluate_GP()' function to compute a set of relevant evaluation metrics

In [None]:
metrics = gp_metrics.evaluate_GP(K.numpy(), Y_Train.numpy(), folds, noise_variance = 0) 

for key in ['log_marginal_likelihood', 'log_pseudo_likelihood', 'MSE']:
    print(key, metrics[key])

In [None]:
%timeit gp_metrics.evaluate_GP(K.numpy(), Y_Train.numpy(), folds, noise_variance = 0) 

# LOO for the same data (not correct)

In [None]:
metrics = gp_metrics.evaluate_GP(K.numpy(), Y_Train.numpy(), folds = None, noise_variance = 0) 

for key in ['log_marginal_likelihood', 'log_pseudo_likelihood', 'MSE']:
    print(key, metrics[key])

In [None]:
# To compute the LOO residuals we only need the covariance matrix and the training observations (converted to numpy arrays)
mean, cov, residuals_transformed = gp_cv.loo(K.numpy(), Y_Train.numpy(), noise_variance = 0) 

# 'residuals_transformed' is the residuals transformed to the standard normal space
# we will see that this is not the same normalizing the individual residuals (which will remain correlated)
residuals_scaled = mean / cov.diagonal()**0.5

In [None]:
# Color by each fold 
fig, ax = plt.subplots(figsize = (12, 8))

q_sample, q_snorm, q_snorm_upper, q_snorm_lower = snorm_qq(residuals_transformed)
ax.plot(q_snorm_upper, q_sample, 'k--')
ax.plot(q_snorm_lower, q_sample, 'k--')
ax.set_xlabel('Theoretical quantiles', fontsize = 15)
ax.set_ylabel('Sample quantiles', fontsize = 15)
ax.set_title('Normal Q-Q Plot of Transformed LOO Residuals', fontsize = 20)
line_min = min(q_snorm.min(), q_sample.min())*1.1
line_max = max(q_snorm.max(), q_sample.max())*1.1
ax.plot([line_min, line_max], [line_min, line_max], 'k')
ax.set_xlim(line_min, line_max)
ax.set_ylim(line_min, line_max)

for i in range(len(folds)):
    ax.scatter(q_snorm[folds[i]], q_sample[folds[i]], marker = 'o', label = 'Fold_{}'.format(i))

ax.legend()

In [None]:
mean_multi, cov_multi, residuals_transformed_multi = gp_cv.multifold(K.numpy(), Y_Train.numpy(), folds, noise_variance = 0) 
residuals_scaled_multi = mean_multi / cov_multi.diagonal()**0.5
mean_loo, cov_loo, residuals_transformed_loo = gp_cv.loo(K.numpy(), Y_Train.numpy(), noise_variance = 0) 
residuals_scaled_loo = mean_loo / cov_loo.diagonal()**0.5


In [None]:
residuals_transformed_loo - residuals_transformed_multi
mean_multi - mean_loo
cov_multi - cov_loo
residuals_scaled_multi - residuals_scaled_loo

In [None]:
# Plot residuals 
fig, axs = plt.subplots(ncols = 2, figsize = (18, 8))
axs = axs.ravel()

plotres(axs[0], residuals_scaled_multi, 'residuals_scaled_multi')
plotres(axs[1], residuals_scaled_loo, 'residuals_scaled_loo')