# Gaussian Process 

### Data Training

In [1]:
import sys
import os

# Add the parent directory to the sys.path
parent_dir = os.path.abspath(os.path.join(os.getcwd(), '..'))
sys.path.append(parent_dir)

import numpy as np
from elliptic_files.train_elliptic import samples_param
from elliptic_files.FEM_Solver import FEMSolver

obs, nthetas = 6, 100
thetas  = samples_param(nthetas,nparam=2)
fem_solver = FEMSolver(np.zeros(2),vert=50)
obs_points = np.linspace(0.2,0.8,obs).reshape(-1,1)
training_data = np.zeros((nthetas,obs ))

for i,theta in enumerate(thetas):
    fem_solver.theta = theta
    fem_solver.solve()
    training_data[i,:] = fem_solver.eval_at_points(obs_points).reshape(1, -1)

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
import jax.numpy as jnp
from jax.scipy.linalg import solve_triangular
from scipy.optimize import minimize
import jax

jax.config.update("jax_enable_x64", True)
class KernelFunction:
    def __init__(self,kernel_type="squared_exponential"):

        # Supported kernels
        supported_covariances = {
            "squared_exponential": self.squared_exponential_cov,
            "grad_squared_exponential": self.grad_squared_exponential_cov
        }

        if kernel_type in supported_covariances:
            self.covariance = supported_covariances[kernel_type]
        else:
            raise ValueError(f"Kernel type '{kernel_type}' is not supported.")
        
    def euclidean_distance_matrix(self,x, y):
        x_sq = jnp.sum(x ** 2, axis=1, keepdims=True)
        y_sq = jnp.sum(y ** 2, axis=1, keepdims=True).T
        xy = jnp.dot(x, y.T)
        dist_sq = x_sq - 2 * xy + y_sq
        return jnp.sqrt(jnp.maximum(dist_sq, 0.0))

    def squared_exponential_cov(self,r, sigma, l):
        return (sigma**2) * jnp.exp(-0.5 * (r/l) ** 2)

    def grad_squared_exponential_cov(self, r, sigma, l):
        return ((sigma/l)**2) *r * jnp.exp(-0.5 * (r/l) ** 2)
    
    def compute_covariance(self,X,params,Y = None):
        Y = X if Y is None else Y
        d = self.euclidean_distance_matrix(X, Y)
        return self.covariance(d,*params)

class GaussianProcess:
    def __init__(self,X_train,Y_train, prior_mean =0, kernel = "squared_exponential"):

        self.X_train = jnp.array(X_train, dtype=jnp.float64)
        self.Y_train = jnp.array(Y_train, dtype=jnp.float64).reshape(-1)
        self.spatial_obs = Y_train.shape[-1]
        self.param_obs = X_train.shape[0]
        self.dim_total = self.spatial_obs*self.param_obs

        self.prior_mean = prior_mean
        self.kernel = KernelFunction(kernel_type=kernel)
        self.opt_params = None  # Store optimized parameters

    def observed_kernel(self,params):
        # Step 1: Compute covariance matrix
        cov_matrix_ob = self.kernel.compute_covariance(self.X_train,params)
        cov_matrix = jnp.kron(cov_matrix_ob, jnp.eye(self.spatial_obs, dtype=jnp.float64)) +  1e-10 * jnp.eye(self.dim_total , dtype=jnp.float64)
        # Step 2: Cholesky decomposition
        L = jnp.linalg.cholesky(cov_matrix)
        idt = jnp.eye(L.shape[0])
        # Step 3: Solve the triangular system directly
        z = solve_triangular(L, idt, lower=True)
        z_t = solve_triangular(L, idt, lower=True, trans=1)
        return L, jnp.dot(z_t, z)

    def neg_log_likelihood(self, params):
        params = jnp.array(params, dtype=jnp.float64)
        
        L, cov_inv = self.observed_kernel(params)

        # Compute log determinant of K via Cholesky: logdet(K) = 2 * sum(log(diag(L)))
        logdet_K = 2.0 * jnp.sum(jnp.log(jnp.diag(L)))

        return 0.5 * (logdet_K + jnp.dot( self.Y_train.T,jnp.dot(cov_inv, self.Y_train)) + self.dim_total * jnp.log(2 * jnp.pi))

    def nll_grad(self, params):
        """Compute the gradient of the negative log-likelihood"""
        return jax.grad(self.neg_log_likelihood)(params)
    
    def optimize_nll(self,init_params = jnp.array([1.0, 1.0])):
        res = minimize(self.neg_log_likelihood,init_params,
                        method="L-BFGS-B",jac=self.nll_grad,bounds=[(1e-5, None)] * len(init_params))
        self.opt_params = res.x  # Store optimized params
        _,self.obs_cov_inv = self.observed_kernel(self.opt_params)
    

    def predict_mean(self, x_test):
        cov_train_test_ind = self.kernel.compute_covariance(x_test,self.opt_params,Y = self.X_train)
        cov_train_test = jnp.kron(cov_train_test_ind, jnp.eye(self.spatial_obs))  
        return self.prior_mean + cov_train_test @ (self.obs_cov_inv @ (self.Y_train - self.prior_mean))
    
    
    def predict_var(self, x_test):
        cov_test_train_ind = self.kernel.compute_covariance(x_test, self.opt_params, Y=self.X_train)
        cov_test_train = np.kron(cov_test_train_ind, np.eye(self.spatial_obs))

        #cov_train_test_ind = self.kernel.compute_covariance(self.X_train, self.opt_params, Y=x_test)

        cov_test_ind = self.kernel.compute_covariance(x_test, self.opt_params)
        cov_test = np.kron(cov_test_ind, np.eye(self.spatial_obs))

        pred_var = cov_test - cov_test_train @ (self.obs_cov_inv @ cov_test_train.T)

        return pred_var


In [3]:

from scipy.stats import qmc

class LD_cube:
    
    def __init__(self, range_para, dimen = 1):
        
        self.dimen = dimen
        self.range_para = self.para_range_setter(range_para)
        self.points, self.V = self.para_to_points()
        
    def para_range_setter(self, range_para):
        
        if not isinstance(range_para[0],list) and self.dimen == 1:
            range_para = [range_para]
        elif not isinstance(range_para[0],list) and isinstance(self.dimen,int):
            range_para = [range_para] * self.dimen        
        
        return range_para
    
    def para_to_points(self):
                    
        samples =  self.generator()
                
        V = 1
        for j in range(self.dimen):
            samples[:,j] = (self.range_para[j][1]-self.range_para[j][0])*samples[:,j] + self.range_para[j][0]
            V = V * (self.range_para[j][1]-self.range_para[j][0])
        
        
        return samples, V
    
class Halton_cube(LD_cube):

    def __init__(self, range_para, n, dimen = 1):
        
        self.n = n
        super().__init__(range_para, dimen)
    
    def generator(self):
    
        sampler = qmc.Halton(d=self.dimen, scramble=False)
        
        return sampler.random(n=self.n)
    
class data:
    """
    Atrributes:
    k_dm: dimension of unkonwn parameters
    d_tr: number of training points
    n_ob: number of observation points
    
    U: d_tr*k_dm matrix, unknown parameters
    Y: d_tr*n_ob matrix, observational data
    xx: spatial points corresponding to observations
    
    """
    
    def __init__(self, U, Y, k_dm, xx = None):
        
        self.k_dm = k_dm
        self.d_tr = int(U.size/k_dm)
        self.n_ob = int(Y.size/self.d_tr)
        
        self.U = U
        self.Y = Y
        
        self.yy = Y.reshape(-1,1)
        self.xx = xx

In [4]:
import numpy as np
from scipy.spatial import distance_matrix


class kernel_function:
    def __init__(self, dimen):
        
        self.dimen = dimen
        self.K = self.cov
    
    def parameter_formalizer(self, para):
        
        para = np.atleast_2d(para).reshape(-1,1)
        
        return para
    
    def input_formalizer(self, x):
        
        return np.atleast_2d(x).reshape(-1,self.dimen)
    
    def weighted_dist_mat(self, l, x1, x2 = None):
        
        if x2 is None:
            x2 = np.copy(x1)
            
        x1 = np.atleast_2d(x1).reshape(-1,self.dimen)
        x2 = np.atleast_2d(x2).reshape(-1,self.dimen)
 
        return distance_matrix(x1/l,x2/l)
 
    def cov(self, x1, x2, n_ob):
        
        raise NotImplementedError(
            'Function cov not implemented. Please initilize a specific kernel function.')
        
    def cov_3d(self, x1, x2, n_ob):
        cov_mat = self.cov(x1,x2)
        a,b = np.shape(cov_mat)
        
        return np.broadcast_to(cov_mat,(n_ob,a,b))
    
    def cov_kron(self, x1, x2, n_ob, Spatial_cov = None):
        
        if Spatial_cov is None:
            Spatial_cov = np.eye(n_ob)
            
        cov_mat = self.cov(x1,x2)
        
        return np.kron(Spatial_cov,cov_mat)
    
class kernel_squared_exponential(kernel_function):
    
    def __init__(self, sigma_square, l, dimen = 1):
        
        self.sigma_square = sigma_square      
        self.l = self.parameter_formalizer(l)  
        self.dimen = dimen
        
        super().__init__(dimen)
        
    def cov(self, x1, x2 = None):
        
        d = self.weighted_dist_mat(self.l, x1, x2)
        
        return self.sigma_square * np.exp(-d**2/2)
    
    def cov_prime(self, x1, x2):
        
        return (-(x1.T-x2.T)/(self.l)**2) * self.cov(x1,x2)


In [5]:
import numpy as np
import jax.numpy as jnp

# Load the data using NumPy
u_Theta = np.loadtxt("2D_trainingset_N10.txt")


# Number of spatial oberservation points
dy = 6
X = np.linspace(0,1,dy)

# Convert to jax.numpy
#u_Theta = jnp.array(data_np)

N = 10
Theta_train = Halton_cube([-1,1], N+1, 2).points[1:,:]

print(Theta_train.shape,u_Theta.shape,X.shape)
tr_data = data(Theta_train, u_Theta.T, 2, X)


(10, 2) (6, 10) (6,)


In [6]:
obs, nthetas = 6, 10
thetas  = samples_param(nthetas,nparam=2)
fem_solver = FEMSolver(np.zeros(2),vert=50)
obs_points = np.linspace(0.2,0.8,obs).reshape(-1,1)
training_data = np.zeros((nthetas,obs ))

for i,theta in enumerate(thetas):
    fem_solver.theta = theta
    fem_solver.solve()
    training_data[i,:] = fem_solver.eval_at_points(obs_points).reshape(1, -1)

In [7]:
training_data.shape

(10, 6)

In [8]:
class Gp_regression:
    def __init__(self, kernel, tr_data, normalizer = 10**(-10), prior_mean = 0):
        
        self.k_dm = tr_data.k_dm
        self.d_tr = tr_data.d_tr
        self.n_ob = tr_data.n_ob
        
        self.mean = prior_mean
        self.k = kernel
        self.tr_data = tr_data
                
        self.normalizer = normalizer
                
        self.K_inv_y, self.K_tr_tr = self.inverse_precomputer()

       
    def inverse_precomputer(self):
        
        K_tr_tr = self.k.cov(self.tr_data.U)   
        K_tr_tr = np.kron(K_tr_tr, np.eye(self.n_ob)) + self.normalizer * np.eye(self.n_ob*self.d_tr)

        K_inv_y = np.linalg.solve(K_tr_tr, self.tr_data.yy - self.mean)
        
        return K_inv_y, K_tr_tr
    
    
    def predict_mean(self, test_u):
        
        K_test_tr = self.k.cov(test_u,self.tr_data.U)
        K_test_tr = np.kron(K_test_tr, np.eye(self.n_ob))#self.extend(K_test_tr)
            
        pred_mean = self.mean + K_test_tr @ self.K_inv_y
                
        return pred_mean
    
    
    def predict_var(self, test_u):
        
        K_test_tr = np.kron(self.k.cov(test_u, self.tr_data.U),np.eye(self.n_ob))
        K_tr_test = np.kron(self.k.cov(self.tr_data.U, test_u),np.eye(self.n_ob)) 
        
        pred_var = np.kron(self.k.cov(test_u), np.eye(self.n_ob))- K_test_tr @ np.linalg.solve(self.K_tr_tr, K_tr_test)
        
        return pred_var
    
    def predict_vars(self, test_u):

        pred_vars = np.zeros((len(test_u),self.n_ob,self.n_ob))
        
        i = 0
        for u in test_u:
            pred_vars[i,:,:] = self.predict_var(u)
            i = i + 1
        
        return pred_vars

    def pred_mean_prime(self, test_u, X = None):
        
        cov_prime = self.k.cov_prime(test_u,self.tr_data.U)
        mean_prime = np.kron(cov_prime, np.eye(self.n_ob)) @ self.K_inv_y
        
        return mean_prime
    
    def pred_var_prime(self, test_u):
        
        pred_var_prime = np.zeros((self.k_dm,self.n_ob,self.n_ob))
        cov_prime = self.k.cov_prime(test_u,self.tr_data.U)
        
        K_tr_test = np.kron(self.k.cov(self.tr_data.U, test_u), np.eye(self.n_ob))
        var_prime = np.kron(cov_prime, np.eye(self.n_ob)) @ np.linalg.solve(self.K_tr_tr, K_tr_test) 
        
        for i in range(self.k_dm):
            pred_var_prime[i,:,:] = var_prime[i::self.k_dm,:]

        return -2 * pred_var_prime
    
    
    
    
class Spc_Gpr(Gp_regression):
    
    def __init__(self, kernel, tr_data, kernel_x, normalizer = 10**(-10), prior_mean = 0):
        
        self.kx = kernel_x
        self.xx = tr_data.xx
        
        super().__init__(kernel, tr_data, normalizer, prior_mean)
        
        
    def inverse_precomputer(self):
        
        K_tr_tr = self.k.cov(self.tr_data.U)         
        K_tr_tr = np.kron(K_tr_tr, self.kx.cov(self.xx)) + self.normalizer * np.eye(self.n_ob*self.d_tr)
        
        K_inv_y = np.linalg.solve(K_tr_tr, self.tr_data.yy - self.mean)
        
        return K_inv_y, K_tr_tr
    
    
    def predict_mean(self, test_u, X = None):
        
        if X is None: X = self.xx
        
        K_test_tr = self.k.cov(test_u,self.tr_data.U)
        K_test_tr = np.kron(K_test_tr, self.kx.cov(X, self.xx) )#self.extend(K_test_tr)
            
        pred_mean = self.mean + K_test_tr @ self.K_inv_y
                
        return pred_mean.reshape(len(X), -1)
    
    
    def predict_var(self, test_u, X = None):
        
        if X is None: X = self.xx
        
        K_test_tr = np.kron(self.k.cov(test_u, self.tr_data.U),self.kx.cov(X, self.xx))
        K_tr_test = np.kron(self.k.cov(self.tr_data.U, test_u),self.kx.cov(self.xx, X)) 
        
        pred_var = np.kron(self.k.cov(test_u), self.kx.cov(X))- K_test_tr @ np.linalg.solve(self.K_tr_tr, K_tr_test)
        
        return pred_var

    def pred_mean_prime(self, test_u, X = None):
        
        if X is None: X = self.xx
        
        cov_prime = self.k.cov_prime(test_u,self.tr_data.U)
        mean_prime = np.kron(cov_prime, self.kx.cov(X, self.xx)) @ self.K_inv_y
        
        
        return mean_prime
    
    def pred_var_prime(self, test_u, X = None):
        
        if X is None: X = self.xx
        
        pred_var_prime = np.zeros((self.k_dm,self.n_ob,self.n_ob))
        cov_prime = self.k.cov_prime(test_u,self.tr_data.U)
        
        K_tr_test = np.kron(self.k.cov(self.tr_data.U, test_u),self.kx.cov(self.xx, X))
        var_prime = np.kron(cov_prime, self.kx.cov(X, self.xx)) @ np.linalg.solve(self.K_tr_tr, K_tr_test) 
        
        for i in range(self.k_dm):
            pred_var_prime[i,:,:] = var_prime[i::self.k_dm,:]

        return -2 * pred_var_prime


In [9]:
tr_data = data(thetas, training_data, 2, obs_points.reshape(-1,))


In [None]:
kernel = kernel_squared_exponential(1,1, dimen = 2)
Gpr = Gp_regression(kernel,tr_data)

def neg_log_marginal_likelihood(hyp):  # ML_GPR

    kernel = kernel_squared_exponential(hyp[0], hyp[1], dimen = 2)
    Gpr = Gp_regression(kernel,tr_data)
    
    K = Gpr.K_tr_tr

    eig_v = np.linalg.eigvals(K)

    L = np.linalg.cholesky(K)
    y = tr_data.yy
    nlml = sum(np.log(np.diag(L))) + 0.5*np.matmul(y.T,np.linalg.solve(K,y))[0] + 0.5*N*np.log(2*np.pi)

    return nlml

neg_log_marginal_likelihood(np.array([1.0, 1.0]))



array([-39.94413384])

In [18]:
from scipy.optimize import minimize

hyp = np.array([1,1])
bound = [[10**(-10), 100] for i in range(len(hyp))]
hyp1 = minimize(neg_log_marginal_likelihood, hyp, method = 'L-BFGS-B', bounds = bound)
print("Optimized hyperparameter:",hyp1.x)

kernel = kernel_squared_exponential(*hyp1.x, dimen = 2)
Gpr = Gp_regression(kernel,tr_data)

Optimized hyperparameter: [1.57410073 4.23240861]


In [19]:
Gpr.predict_mean(np.array([[0.098, 0.430]]))

array([[0.42518855],
       [0.66964511],
       [0.92114373],
       [1.18377489],
       [1.45010828],
       [1.69917492]])

In [22]:
Gpr.predict_var(np.array([[0.098, 0.430]]))

array([[8.26246152e-07, 0.00000000e+00, 0.00000000e+00, 0.00000000e+00,
        0.00000000e+00, 0.00000000e+00],
       [0.00000000e+00, 8.26246152e-07, 0.00000000e+00, 0.00000000e+00,
        0.00000000e+00, 0.00000000e+00],
       [0.00000000e+00, 0.00000000e+00, 8.26246153e-07, 0.00000000e+00,
        0.00000000e+00, 0.00000000e+00],
       [0.00000000e+00, 0.00000000e+00, 0.00000000e+00, 8.26246153e-07,
        0.00000000e+00, 0.00000000e+00],
       [0.00000000e+00, 0.00000000e+00, 0.00000000e+00, 0.00000000e+00,
        8.26246153e-07, 0.00000000e+00],
       [0.00000000e+00, 0.00000000e+00, 0.00000000e+00, 0.00000000e+00,
        0.00000000e+00, 8.26246153e-07]])

In [12]:
gp = GaussianProcess(thetas,training_data)

gp.neg_log_likelihood(jnp.array([1.0, 1.0]))

Array(6.00279282, dtype=float64)

In [13]:
gp = GaussianProcess(thetas,training_data)

gp.optimize_nll()

In [14]:
gp.opt_params

array([1.33301301, 4.32873114])

In [20]:
gp.predict_mean(jnp.array([[0.098, 0.430]]))

Array([0.42521981, 0.66970029, 0.92122171, 1.18386563, 1.4501936 ,
       1.69923526], dtype=float64)

In [21]:
gp.predict_var(jnp.array([[0.098, 0.430]]))

Array([[8.16826559e-07, 0.00000000e+00, 0.00000000e+00, 0.00000000e+00,
        0.00000000e+00, 0.00000000e+00],
       [0.00000000e+00, 8.09484865e-07, 0.00000000e+00, 0.00000000e+00,
        0.00000000e+00, 0.00000000e+00],
       [0.00000000e+00, 0.00000000e+00, 8.00558080e-07, 0.00000000e+00,
        0.00000000e+00, 0.00000000e+00],
       [0.00000000e+00, 0.00000000e+00, 0.00000000e+00, 8.18160566e-07,
        0.00000000e+00, 0.00000000e+00],
       [0.00000000e+00, 0.00000000e+00, 0.00000000e+00, 0.00000000e+00,
        8.12329220e-07, 0.00000000e+00],
       [0.00000000e+00, 0.00000000e+00, 0.00000000e+00, 0.00000000e+00,
        0.00000000e+00, 7.98481281e-07]], dtype=float64)