In [3]:
import pandas as pd
from sklearn.model_selection import train_test_split
import torch
import gpytorch
from gpytorch.models import ExactGP
from gpytorch.mlls import ExactMarginalLogLikelihood
from gpytorch.kernels import ScaleKernel, RBFKernel
from gpytorch.distributions import MultivariateNormal
from gpytorch.likelihoods import GaussianLikelihood
import tensorflow
from typing import Optional, Tuple

import torch
from linear_operator import to_dense
from gpytorch.constraints import Positive
from gpytorch.kernels import Kernel
from IPython.display import Markdown, display
def printmd(string):
    display(Markdown(string))

In [4]:
# Training data
train_x = torch.tensor([[1.0, 2.0, 3.0],
                        [4.0, 5.0, 6.0],
                        [7.0, 8.0, 9.0],
                        [7.0, 8.0, 9.0]])
y_train = torch.tensor([[2.0],
                        [3.0],
                        [4.0],
                        [5.0]])

# Test data
test_x = torch.tensor([[4.0, 1.0, 2.0],
                       [3.0, 4.0, 5.0]])
y_test = torch.tensor([[1.0],
                       [2.0]])

# Display shapes of the tensors
print(train_x)
print("Shape of X_train:", train_x.shape)
print("Shape of y_train:", y_train.shape)
print("Shape of X_test:", test_x.shape)
print("Shape of y_test:", y_test.shape)

tensor([[1., 2., 3.],
        [4., 5., 6.],
        [7., 8., 9.],
        [7., 8., 9.]])
Shape of X_train: torch.Size([4, 3])
Shape of y_train: torch.Size([4, 1])
Shape of X_test: torch.Size([2, 3])
Shape of y_test: torch.Size([2, 1])


# Fully Additive k(xi, xj) = -1 + ∏ (1+k(x,y)) 

In [13]:

class FullyAdditive(gpytorch.kernels.Kernel):
    def __init__(self, base_kernel: Kernel, num_dims = int, active_dims: Optional[Tuple[int, ...]] = None,  **kwargs):
        super(FullyAdditive, self).__init__(active_dims=active_dims, **kwargs)
        self.base_kernel = base_kernel
        self.num_dims = num_dims
        outputscale_constraint = gpytorch.constraints.Positive()
        #here register one outputscale parameter
        self.register_parameter(name="raw_outputscale", 
            parameter=torch.nn.Parameter(torch.tensor(1.0)))
        outputscale_constraint = gpytorch.constraints.Positive()
        self.register_constraint("raw_outputscale", outputscale_constraint)

    @property
    def outputscale(self):
        return self.raw_outputscale_constraint.transform(self.raw_outputscale)

    @outputscale.setter
    def outputscale(self, value):
        self._set_outputscale(value)


    
    def forward(self, x1, x2, diag=False, **params):
            # Initialize product terms correctly for each pair of inputs
            prod_terms = 1.0  # Start with scalar 1 for multiplication
            #calculate the kernel for each dimension
            for d in range(self.num_dims):
                x1_d = x1[:, d:d+1]  # Isolate the d-th dimension
                x2_d = x2[:, d:d+1]
                # print(x1_d)
                k_d = self.base_kernel(x1_d, x2_d, **params).evaluate()
                # print(k_d)  # Use evaluate() to get the kernel matrix
                prod_terms *= (1 +self.outputscale* k_d) #calculate the multiplication of a sequence of terms
            # print(prod_terms)

            K = -1 + prod_terms 
            # print('k shape',K.shape)

            return  K
    

class MyGP(gpytorch.models.ExactGP):
    def __init__(self, train_x, train_y, likelihood):
        super().__init__(train_x, train_y, likelihood)
        self.mean_module = gpytorch.means.ZeroMean()
        dimensionality = train_x.size(-1)  # Assuming last dimension is the dimensionality 

        #base_kernel = gpytorch.kernels.ScaleKernel(gpytorch.kernels.RBFKernel()) #when using the scaleKernel it gives the sigma f for the base kernel, using only the RBF - it has no ouputscale
        base_kernel = gpytorch.kernels.RBFKernel() 
        num_dims = dimensionality
        self.covar_module = FullyAdditive(base_kernel, num_dims)

    def forward(self, x): 
        mean = self.mean_module(x)
        covar = self.covar_module(x)
        return gpytorch.distributions.MultivariateNormal(mean, covar)


# Create the GP model
likelihood = gpytorch.likelihoods.GaussianLikelihood()

model = MyGP(train_x, y_train.squeeze(-1), likelihood)
# Set up optimizer and marginal log likelihood
optimizer = torch.optim.Adam(model.parameters(), lr=0.1)
mll = gpytorch.mlls.ExactMarginalLogLikelihood(likelihood, model)
# Training loop
training_iter = 50
for i in range(training_iter):
    optimizer.zero_grad()
    output = model(train_x)
    #print(y_train.squeeze().size())
    loss = -mll(output, y_train.squeeze(-1))
    loss = loss.mean() 
    loss.backward()
    optimizer.step()


# Viewing model parameters after training
for param_name, param in model.named_parameters():
    print(f'Parameter name: {param_name:42} value = {param.data}')


#evaluating there is a problem when the test_y and test_x have float numbers
with torch.no_grad():
    model.eval()  # Set the model to evaluation mode (mode is for computing predictions through the model posterior.)
    output = model(test_x)  # Make predictions on new data 
    

# Step 4: Postprocess Predictions (if needed)
predicted_means = output.mean.numpy() 
print(predicted_means)
printmd('\n\n**Printing all model constraints...**\n')
for constraint_name, constraint in model.named_constraints():
    print(f'Constraint name: {constraint_name:55} constraint = {constraint}')

Parameter name: likelihood.noise_covar.raw_noise           value = tensor([-0.7339])
Parameter name: covar_module.raw_outputscale               value = 0.5714855194091797
Parameter name: covar_module.base_kernel.raw_lengthscale   value = tensor([[5.3620]])
[1.9297421 2.6502414]




**Printing all model constraints...**


Constraint name: likelihood.noise_covar.raw_noise_constraint             constraint = GreaterThan(1.000E-04)
Constraint name: covar_module.raw_outputscale_constraint                 constraint = Positive()
Constraint name: covar_module.base_kernel.raw_lengthscale_constraint     constraint = Positive()


# DP

In [14]:
class DPkernel(gpytorch.kernels.Kernel):
    def __init__(self, base_kernel, num_dims, q_additivity, **kwargs):
        super().__init__(**kwargs)
        self.base_kernel = base_kernel
        self.num_dims = num_dims
        self.q_additivity = q_additivity
        self.register_parameter(
            name="raw_outputscale", 
            parameter=torch.nn.Parameter(torch.zeros(1, self.q_additivity))
        )
        self.outputscale_constraint = gpytorch.constraints.Positive()
        self.register_constraint("raw_outputscale", self.outputscale_constraint)

    @property
    def outputscale(self):
        return self.outputscale_constraint.transform(self.raw_outputscale).squeeze()

    @outputscale.setter
    def outputscale(self, value):
        if not torch.is_tensor(value):
            value = torch.tensor(value, device=self.raw_outputscale.device)
        self.initialize(raw_outputscale=self.outputscale_constraint.inverse_transform(value))

    def forward(self, x1, x2, diag=False, **params):
    # Determine sizes based on input matrices
        x1_size = x1.size(0)
        x2_size = x2.size(0)
        
        # Initialize matrices based on input sizes
        result = torch.zeros(x1_size, x2_size, device=x1.device) #initialize the result matrix
        sum_order_b = torch.zeros(x1_size, x2_size, device=x1.device) # initialize the matrix for the matrix for a single order
        kernels =[] # list were the z1, z2,... would be stored

        # print(f"Initial x1 shape: {x1.shape}, x2 shape: {x2.shape}")
        
        #calculations for first order
        #calcualte the kernels for each dimentions
        for d in range(self.num_dims):
            x1_d = x1[:, d:d+1]
            x2_d = x2[:, d:d+1]
            k_d = self.base_kernel(x1_d, x2_d).evaluate()
            kernels.append(k_d) #save them in order in the kernels list
            # print(f"Kernel k_d at dim {d} shape: {k_d.shape}, sum_order_b shape: {sum_order_b.shape}")

            sum_order_b += k_d # add each one dimension kernels to one matrix for first order

        result += sum_order_b * self.outputscale[0] #add the first order kernel miltiplied by first outputscale

        #calculations for higher dimensions
        for i in range(1, self.q_additivity):
            temp_sum = torch.zeros(x1_size, x2_size, device=x1.device)
            new_kernels = []
            for j in range(self.num_dims):
                sum_order_b = sum_order_b - kernels[j]
                k_d = kernels[j] * sum_order_b
                new_kernels.append(k_d)
                temp_sum += k_d

            sum_order_b = temp_sum
            kernels = new_kernels
            result += sum_order_b * self.outputscale[i]

        return result




   
# Example usage in a GP model
class MyGP(gpytorch.models.ExactGP):
    def __init__(self, train_x, train_y, likelihood):
        super(MyGP, self).__init__(train_x, train_y, likelihood)
        self.mean_module = gpytorch.means.ZeroMean()
        self.base_kernel = gpytorch.kernels.RBFKernel()
        self.covar_module = DPkernel(base_kernel=self.base_kernel, num_dims=train_x.size(-1), q_additivity=train_x.size(-1))

    def forward(self, x):
        mean_x = self.mean_module(x)
        covar_x = self.covar_module(x, x)  # Make sure to pass x twice
        return gpytorch.distributions.MultivariateNormal(mean_x, covar_x)

# Create the GP model
likelihood = gpytorch.likelihoods.GaussianLikelihood()

model = MyGP(train_x, y_train.squeeze(-1), likelihood)
# Set up optimizer and marginal log likelihood
optimizer = torch.optim.Adam(model.parameters(), lr=0.1)
mll = gpytorch.mlls.ExactMarginalLogLikelihood(likelihood, model)


# Training loop
training_iter = 50
for i in range(training_iter):
    optimizer.zero_grad()
    output = model(train_x)
    # print(output)
    loss = -mll(output, y_train.squeeze())
    # print(loss)
    loss = loss.mean() 
    loss.backward()
    optimizer.step()



# Viewing model parameters after training
for param_name, param in model.named_parameters():
    print(f'Parameter name: {param_name:42} value = {param.data}')


#evaluating there is a problem when the test_y and test_x have float numbers
with torch.no_grad():
    model.eval()  # Set the model to evaluation mode (mode is for computing predictions through the model posterior.)
    output = model(test_x)  # Make predictions on new data 

# Step 4: Postprocess Predictions (if needed)
predicted_means = output.mean.numpy() 
print(predicted_means)
for constraint_name, constraint in model.named_constraints():
    print(f'Constraint name: {constraint_name:55} constraint = {constraint}')

Parameter name: likelihood.noise_covar.raw_noise           value = tensor([-0.7604])
Parameter name: base_kernel.raw_lengthscale                value = tensor([[5.4945]])
Parameter name: covar_module.raw_outputscale               value = tensor([[ 1.5825, -0.3505, -1.1057]])
[2.0288777 2.687921 ]
Constraint name: likelihood.noise_covar.raw_noise_constraint             constraint = GreaterThan(1.000E-04)
Constraint name: base_kernel.raw_lengthscale_constraint                  constraint = Positive()
Constraint name: covar_module.raw_outputscale_constraint                 constraint = Positive()


In [7]:
#in both i didint have to cancel the sigma f for RBF kernel function as it doesnt have a outputscale. Adding the output scale is using the ScaleKernel()

# just don't understand why do the optimized parameters are sometimes negative if the constraints are positive

 