In [1]:
from stpy.kernels import KernelFunction
from stpy.continuous_processes.nystrom_fea import NystromFeatures
from stpy.helpers.helper import interval_torch
import torch
from torch.nn import Sequential, Linear
import torch.optim as optim
import numpy as np
import tqdm
from mtevi import EvidenceRegularizer, EvidentialnetMarginalLikelihood

In [2]:
# kernel = KernelFunction(kernel_name="matern", gamma=0.5, nu = 3.5, d = 1) #TODO check why runtime is slow
kernel = KernelFunction(kernel_name="squared_exponential", gamma=0.5, kappa=6.6,d = 1)

In [3]:
# np.random.seed(10)
# torch.manual_seed(10)

# num_factor = 10

# X = np.concatenate((np.linspace(-3, 6, 20*num_factor), np.linspace(6, 10, 5*num_factor)))
# rand_num = [6,3]

# F = lambda X: np.sin(X*4)**3 + (X**2)/10 -2.8 -0.0274
# eps = lambda X: np.random.randn(*X.shape)*0.1 + 2*np.random.normal(scale=np.abs(7-np.abs(X))*0.05)
# Y = F(X) + eps(X)

# X = np.expand_dims(X, 1)
# dim = len(X[0])

# sparse_idx_A = 21*num_factor
# sparse_idx_B = 24*num_factor

# s = 4*num_factor
# e = 13*num_factor

# X_t = np.concatenate((X[s:e], X[sparse_idx_A:sparse_idx_B]))
# X_v = np.concatenate((X[:s], X[e:sparse_idx_A], X[sparse_idx_B:]))
# Y_t = np.concatenate((Y[s:e], Y[sparse_idx_A:sparse_idx_B]))
# Y_v = np.concatenate((Y[:s], Y[e:sparse_idx_A], Y[sparse_idx_B:]))

# # outliers 30 points
# n_outliers = 5
# # X_outliers = np.linspace(-1,3,n_outliers)
# # random x values between -1 and 3
# X_outliers = np.random.uniform(-1,3,n_outliers)
# density_multiplier = 2
# X_outliers = np.concatenate([X_outliers]*density_multiplier)
# Y_outliers = np.sin(X_outliers*4)**3 + (X_outliers**2)/10 + np.random.randn(*X_outliers.shape)*0.1 + 2*np.random.normal(scale=np.abs(7-np.abs(X_outliers))*0.05) + 5
# X_outliers = np.expand_dims(X_outliers, 1)  # Expand dimensions to match X_t

# X_t = np.concatenate((X_t, X_outliers))
# Y_t = np.concatenate((Y_t, Y_outliers))

# Y_t = np.expand_dims(Y_t, 1)
# Y_v = np.expand_dims(Y_v, 1)

# x_tr = torch.tensor(X_t, dtype=torch.float64)
# y_tr = torch.tensor(Y_t, dtype=torch.float64)
# x_tr.shape, y_tr.shape

In [None]:
np.random.seed(10)
torch.manual_seed(10)

num_factor = 10

X = np.linspace(-3, 10, 100)

F = lambda X: np.sin(X*4)**3 + (X**2)/10 -2.8 -0.0274
eps = lambda X: np.random.randn(*X.shape)*0.1 + 2*np.random.normal(scale=np.abs(7-np.abs(X))*0.05)
Y = F(X) + eps(X)

X = np.expand_dims(X, 1)

# outliers 30 points
n_outliers = 5
X_outliers = np.random.uniform(-3,10,n_outliers)

density_multiplier = 1
X_outliers = np.concatenate([X_outliers]*density_multiplier)
Y_outliers = F(X_outliers) + eps(X_outliers) + 5
X_outliers = np.expand_dims(X_outliers, 1)  # Expand dimensions to match X_t

X_t = np.concatenate((X, X_outliers))
Y_t = np.concatenate((Y, Y_outliers))

Y_t = np.expand_dims(Y_t, 1)

x_tr = torch.tensor(X_t, dtype=torch.float64)
y_tr = torch.tensor(Y_t, dtype=torch.float64)
x_tr.shape, y_tr.shape

In [5]:
class ShallowModel(Sequential):
    def __init__(self, input_dim, emb_dim ,output_dim):
        super(ShallowModel, self).__init__()
        
        self.embedding = NystromFeatures(kernel, m=emb_dim, approx="nothing")
        self.emb_fit = False
        m = self.embedding.get_m()
        self.dense = Linear(m, output_dim, bias=False).double()
    
    def phi(self, x):
        if not self.emb_fit:
            # careful: only fit on first batch
            self.embedding.fit_gp(x,None, eps=0.1)
            self.emb_fit = True
            
        return self.embedding.embed(x)

    def forward(self, x):
        h = self.phi(x)
        y = self.dense(h)
        return y

model = ShallowModel(input_dim=x_tr.shape[1], emb_dim=x_tr.shape[0], output_dim=Y_t.shape[1])
huber_model = ShallowModel(input_dim=x_tr.shape[1], emb_dim=x_tr.shape[0], output_dim=Y_t.shape[1])

class AminiModel(ShallowModel):
    def __init__(self, input_dim, emb_dim, output_dim):
        super(AminiModel, self).__init__(input_dim, emb_dim, output_dim * 4)
        
    def forward(self, x):
        h = self.phi(x)
        output = self.dense(h)
        mu, logv, logalpha, logbeta = torch.chunk(output, 4, dim=-1)
        v = torch.nn.functional.softplus(logv)
        alpha = torch.nn.functional.softplus(logalpha) + 1
        beta = torch.nn.functional.softplus(logbeta)
        return torch.cat([mu, v, alpha, beta], dim=-1)

amini_model = AminiModel(input_dim=x_tr.shape[1], emb_dim=x_tr.shape[0], output_dim=Y_t.shape[1])

In [None]:

optimizer = optim.Adam(amini_model.parameters(), lr=0.01)

objective = EvidentialnetMarginalLikelihood()
reg = EvidenceRegularizer(factor=0.0001)

def amini_loss(x,y,amini_model):
    output = amini_model(x)
    gamma, nu, alpha, beta = torch.chunk(output, 4, dim=-1)
    y = y.double()
    nll = (objective(gamma,nu,alpha,beta,y)).mean()
    reg_loss = (reg(gamma, nu, alpha, beta, y)).mean()
    return nll + reg_loss
    

#set_postfix loss print
pbar = tqdm.tqdm(range(1000))
for i in pbar:
    optimizer.zero_grad()
    loss = amini_loss(x_tr, y_tr, amini_model)
    loss.backward()
    optimizer.step()
    pbar.set_postfix(loss=f'{loss.item():.4f}')


In [None]:
optimizer = optim.Adam(model.parameters(), lr=0.01)

def loss_fn(x,y,model):
    y_hat = model(x)
    # likelihood + prior
    return torch.mean((y_hat - y)**2) + 0.002*torch.sum(model.dense.weight**2)

#set_postfix loss print
pbar = tqdm.tqdm(range(1000))
for i in pbar:
    optimizer.zero_grad()
    loss = loss_fn(x_tr,y_tr,model)
    loss.backward()
    optimizer.step()
    pbar.set_postfix(loss=f'{loss.item():.4f}')


In [None]:
optimizer = optim.Adam(huber_model.parameters(), lr=0.01)

def huber_loss(x,y,model, beta=1.):
    y_hat = model(x)
    return torch.nn.SmoothL1Loss(beta=beta)(y_hat, y)

#set_postfix loss print
pbar = tqdm.tqdm(range(1000))
for i in pbar:
    optimizer.zero_grad()
    loss = huber_loss(x_tr,y_tr,huber_model)
    loss.backward()
    optimizer.step()
    pbar.set_postfix(loss=f'{loss.item():.4f}')


In [None]:
import botorch
from botorch.models import SingleTaskGP, FixedNoiseGP
from gpytorch.mlls import ExactMarginalLogLikelihood
import gpytorch

# Convert data to torch tensors
X_t_tensor = torch.tensor(X_t, dtype=torch.double)
Y_t_tensor = torch.tensor(Y_t, dtype=torch.double)

#gpytorch fixed kernel parameters
mean_module = gpytorch.means.ConstantMean()
covar_module = gpytorch.kernels.ScaleKernel(gpytorch.kernels.RBFKernel())

# set values for the covariance function
covar_module.base_kernel.lengthscale = 0.5
covar_module.outputscale = 1.0

bo_model = FixedNoiseGP(X_t_tensor, Y_t_tensor, torch.ones_like(Y_t_tensor)*0.1, mean_module=mean_module, covar_module=covar_module)

# Fit the model
mll = ExactMarginalLogLikelihood(bo_model.likelihood, bo_model)
# botorch.fit_gpytorch_mll(mll)

# Test data
test_x = torch.tensor(np.linspace(-3, 10, 1000).reshape(-1, 1), dtype=torch.double)

# Make predictions
bo_model.eval()
with torch.no_grad():
    posterior = bo_model.posterior(test_x)
    pred_y = posterior.mean.squeeze().numpy()
    std_y = posterior.variance.sqrt().squeeze().numpy()

bo_model.state_dict()

In [None]:
import matplotlib.pyplot as plt

x_fine = torch.linspace(-3, 10, 1000).unsqueeze(-1).double()
plt.figure(figsize=(10,10))
plt.plot(X_t, Y_t, 'r.')
plt.plot(X_outliers, Y_outliers, 'rx')
plt.plot(x_fine, model(x_fine).detach().numpy(),'g--', lw=3, label='GP Model')
plt.plot(x_fine, huber_model(x_fine).detach().numpy(),'c--', lw=3, label='Huber GP Model')
plt.plot(x_fine, pred_y, 'k--', lw=3, label='BoTorch Model')
plt.plot(x_fine, amini_model(x_fine).detach().numpy()[:,0],'m--', lw=3, label='Amini Model')
plt.plot(x_fine, F(x_fine).detach().numpy(),'b-')
plt.legend()
ax = plt.gca()
ax.set_ylim(-5, 10)

In [11]:
# model parameters
# list(model.parameters())

In [12]:
from sklearn.base import BaseEstimator
import torch
import torch.nn as nn
import torch.optim as optim

class PyTorchEstimator(BaseEstimator):
    def __init__(self, model, criterion, optimizer_cls, epochs=10, batch_size=5000, lr=0.01):
        self.model = model
        self.criterion = criterion
        self.optimizer_cls = optimizer_cls
        self.epochs = epochs
        self.batch_size = batch_size
        self.lr = lr
    
    def fit(self, X, y):
        # Set model to training mode
        self.model.train()
        
        # Prepare data
        X_tensor = torch.tensor(X, dtype=torch.float64)
        y_tensor = torch.tensor(y, dtype=torch.float64)
        
        # Initialize optimizer
        optimizer = self.optimizer_cls(self.model.parameters(), lr=self.lr)
        
        #set_postfix loss print
        pbar = tqdm.tqdm(range(1000))
        for i in pbar:
            optimizer.zero_grad()
            loss = self.criterion(X_tensor, y_tensor, self.model)
            loss.backward()
            optimizer.step()
            pbar.set_postfix(loss=f'{loss.item():.4f}')

    def predict(self, X):
        # Set model to evaluation mode
        self.model.eval()
        
        with torch.no_grad():
            X_tensor = torch.tensor(X, dtype=torch.float64)
            outputs = self.model(X_tensor)
            if outputs.shape[1] == 4:
                return outputs[:,0].numpy().flatten()
        
        return outputs.numpy()


In [13]:
# random shuffle X_t, Y_t 
perm = torch.randperm(X_t_tensor.size(0))
X_t = X_t_tensor[perm]
Y_t = Y_t_tensor[perm]



In [None]:
from sklearn.model_selection import cross_val_score
from sklearn.model_selection import cross_val_predict
import numpy as np

# Initialize model, criterion, and optimizer class
criterion = amini_loss
optimizer_cls = optim.Adam

# Wrap the model with our custom estimator
amini_estimator = PyTorchEstimator(model=amini_model, criterion=criterion, optimizer_cls=optimizer_cls, epochs=1000)

# Run cross-validation
amini_scores = cross_val_score(amini_estimator, X_t, Y_t, cv=10, scoring='neg_mean_squared_error', n_jobs=5)
print("Amini CV scores:", amini_scores)

pred = cross_val_predict(amini_estimator, X_t, Y_t, cv=10, n_jobs=5)

In [None]:
plt.figure(figsize=(10,10))
# sort X_t
sorted_idx = np.argsort(X_t.flatten())
sorted_idx

plt.plot(X_t[sorted_idx], Y_t[sorted_idx], 'r.', label='Noisy Observations')
plt.plot(X_t[sorted_idx], pred[sorted_idx], 'b-', label='Predicted')
plt.plot(X_t[sorted_idx], F(X_t[sorted_idx]), 'g-', label='True')
plt.legend()
# mse = (pred[sorted_idx] - F(X_t[sorted_idx]).numpy())**2

a = pred[sorted_idx]
b = F(X_t[sorted_idx]).numpy().squeeze()
mse = (a-b)**2
plt.title(f'MSE: {np.mean(mse):.4f}')

In [None]:
# Initialize model, criterion, and optimizer class
criterion = loss_fn
optimizer_cls = optim.Adam

# Wrap the model with our custom estimator
gp_estimator = PyTorchEstimator(model=model, criterion=criterion, optimizer_cls=optimizer_cls, epochs=1000)

# Run cross-validation
gp_scores = cross_val_score(gp_estimator, X_t, Y_t, cv=10, scoring='neg_mean_squared_error', n_jobs=5)
print("Normal GP CV scores:", gp_scores)

pred = cross_val_predict(gp_estimator, X_t, Y_t, cv=10, n_jobs=5)
a = pred[sorted_idx]
b = F(X_t[sorted_idx]).numpy().squeeze()
mse = (a-b)**2

In [None]:

plt.figure(figsize=(10,10))
# sort X_t
sorted_idx = np.argsort(X_t.flatten())
sorted_idx

plt.plot(X_t[sorted_idx], Y_t[sorted_idx], 'r.', label='Noisy Observations')
plt.plot(X_t[sorted_idx], pred[sorted_idx], 'b-', label='Predicted')
plt.plot(X_t[sorted_idx], F(X_t[sorted_idx]), 'g-', label='True')
plt.legend()
plt.title(f'MSE: {np.mean(mse):.4f}')

In [None]:
# Initialize model, criterion, and optimizer class
criterion = huber_loss
optimizer_cls = optim.Adam

# Wrap the model with our custom estimator
huber_gp_estimator = PyTorchEstimator(model=huber_model, criterion=criterion, optimizer_cls=optimizer_cls, epochs=1000)

# Run cross-validation
huber_scores = cross_val_score(huber_gp_estimator, X_t, Y_t, cv=10, scoring='neg_mean_squared_error', n_jobs=5)

print("Huber GP CV scores:", huber_scores)

pred = cross_val_predict(huber_gp_estimator, X_t, Y_t, cv=10, n_jobs=5)
((F(X_t).numpy().squeeze() - pred)**2).mean()


In [None]:

plt.figure(figsize=(10,10))
# sort X_t
sorted_idx = np.argsort(X_t.flatten())
sorted_idx

plt.plot(X_t[sorted_idx], Y_t[sorted_idx], 'r.', label='Noisy Observations')
plt.plot(X_t[sorted_idx], pred[sorted_idx], 'b-', label='Predicted')
plt.plot(X_t[sorted_idx], F(X_t[sorted_idx]), 'g-', label='True')
plt.legend()
mse = (pred[sorted_idx] - F(X_t[sorted_idx]).numpy().squeeze())**2
plt.title(f'MSE: {np.mean(mse):.4f}')

In [None]:
#random forest
from sklearn.ensemble import RandomForestRegressor

rf_scores = cross_val_score(RandomForestRegressor(), X_t, Y_t, cv=10, scoring='neg_mean_squared_error', n_jobs=5)
print("Random Forest CV scores:", rf_scores)

pred = cross_val_predict(RandomForestRegressor(), X_t, Y_t, cv=10, n_jobs=5)
((F(X_t) - pred)**2).shape
X_t.shape

In [None]:

plt.figure(figsize=(10,10))
# sort X_t
sorted_idx = np.argsort(X_t.flatten())
sorted_idx

plt.plot(X_t[sorted_idx], Y_t[sorted_idx], 'r.', label='Noisy Observations')
plt.plot(X_t[sorted_idx], pred[sorted_idx], 'b-', label='Predicted')
plt.plot(X_t[sorted_idx], F(X_t[sorted_idx]), 'g-', label='True')
plt.legend()
mse = (pred[sorted_idx] - F(X_t[sorted_idx]).numpy().squeeze())**2
plt.title(f'MSE: {np.mean(mse):.4f}')

In [None]:
import seaborn as sns
import pandas as pd

data_dict = {
    'Amini': amini_scores,
    'Normal GP': gp_scores,
    'Huber GP': huber_scores,
    'Random Forest': rf_scores 
}

df = pd.DataFrame(data_dict) * -1 # flip sign to get MSE
sns.boxplot(data=df, width=0.2)

In [23]:
# optimization of hyperparameters

In [24]:
gamma = torch.tensor(0.1, requires_grad=True)