# Gaussian Process

In this notebook, we will implement a Gaussian Process model from scratch.

In [1]:
# import all packages and set plots to be embedded inline
import numpy as np
import matplotlib.pyplot as plt
from scipy.optimize import minimize
from scipy.optimize import Bounds
from pyDOE import lhs
import seaborn as sns
import pandas as pd

%matplotlib inline

## Test function

In [2]:
def test_function(x):
    """1D Test Function"""
    
    y = (x*6-2)**2*np.sin(x*12-4)
    
    return y

In [44]:
class GaussianProcess:
    """A Gaussian Process class that trains and predicts 
    with a Gaussian Process model"""
    
    def __init__(self, n_restarts, optimizer, bounds):
        """Initialize a Gaussian Process model
        
        Input
        ------
        n_restarts: number of restarts of the local optimizer
        optimizer: algorithm of local optimization
        bounds: Bounds for GP model parameter theta"""
        
        self.n_restarts = n_restarts
        self.optimizer = optimizer
        self.bounds = bounds
        
    def likelihood(self, theta):
        """Likelihood function"""
        
        theta = 10**theta    # Correlation length
        n = self.X.shape[0]       # Number of training instances
        one = np.ones((n,1))      # A vector of ones
        
        # Construct correlation matrix
        Psi = np.zeros((n,n))
        for i in range(n-1):
            for j in range(i+1,n):
                Psi[i,j] = np.exp(-np.sum(theta*(self.X[i,:]-self.X[j,:])**2))
        Psi = Psi + Psi.T + np.eye(n) + np.eye(n)*1e-10
        
        # Mean estimation
        mu = (one.T @ np.linalg.inv(Psi) @ self.y)/ (one.T @ np.linalg.inv(Psi) @ one)
        
        # Variance estimation
        SigmaSqr = (self.y-mu*one).T @ np.linalg.inv(Psi) @ (self.y-mu*one) / n
        
        # Compute log-likelihood
        DetPsi = np.linalg.det(Psi)
        LnLike = -(n/2)*np.log(SigmaSqr) - 0.5*np.log(DetPsi)
        
        # Update attributes
        self.Psi, self.mu, self.SigmaSqr = Psi, mu, SigmaSqr
        
        return -LnLike.flatten()
        
        
    def fit(self, X, y):
        """GP model training
        
        Input
        -----
        X: 2D array of shape (n_samples, n_features)
        y: 2D array of shape (n_samples, 1)
        """
        
        self.X, self.y = X, y
        
        # Generate random starting points (Latin Hypercube)
        lhd = lhs(self.X.shape[1], samples=self.n_restarts)
        # Scale random samples to the given bounds 
        lb, ub = self.bounds[0], self.bounds[1]
        initial_points = (ub-lb)*lhd + lb
        
        # Create A Bounds instance for optimization
        bnds = Bounds(lb,ub)
        
        # Run local optimizer on all points
        self.opt_para, self.opt_func = np.zeros(self.n_restarts), np.zeros(self.n_restarts)
        for i,point in enumerate(initial_points):
            res = minimize(self.likelihood, point, method=self.optimizer,
                bounds=bnds)
            self.opt_para[i] = res.x
            self.opt_func[i] = res.fun
        
        # Locate the optimum results
        self.theta = self.opt_para[np.argmin(self.opt_func)]
        
        # Update GP attributes
        self.LnLike = -self.likelihood(self.theta)
    
    def predict(self, X_test):
        """GP model predicting
        
        Input
        -----
        X_test: Prediction sites"""
        
        theta = 10**self.theta
        n = self.X.shape[0]
        one = np.ones((n,1))
        
        # Construct correlation matrix between test and train data
        psi=np.zeros((n,X_test.shape[0]))
        for i in range(n):
            for j in range(X_test.shape[0]):
                psi[i,j] = np.exp(-np.sum(theta*(self.X[i,:]-X_test[j,:])**2))
        
        # Mean prediction
        f = self.mu + psi.T @ np.linalg.inv(self.Psi) @ (self.y-self.mu*one)
        
        # Variance prediction
        SSqr = self.SigmaSqr*(1 - psi.T @ np.linalg.inv(self.Psi) @ psi)
        
        return f, SSqr

In [45]:
X_train = np.array([0.0, 0.1, 0.2, 0.4, 0.6, 1], ndmin=2).T
y_train = test_function(X_train)

In [46]:
GP = GaussianProcess(n_restarts=30, optimizer='L-BFGS-B', bounds=(-3,2))

In [47]:
GP.fit(X_train, y_train)

In [56]:
X_test = np.array([[0.5],[0.1]])

In [57]:
f, SSqr = GP.predict(X_test)
print(f)
print(SSqr)

[[-1.0029982 ]
 [-0.65657677]]
[[5.49644536e-01 3.95380955e+01]
 [3.95380955e+01 4.43427610e-09]]


In [58]:
test_function(X_test)

array([[ 0.90929743],
       [-0.65657677]])