In [3]:
from typing import Any
import torch
import torch.nn as nn
import torch.nn.functional as F
import botorch

from torch import Tensor
from torch.utils.data import TensorDataset, DataLoader

from botorch.optim.optimize import optimize_acqf
from botorch.test_functions.synthetic import SyntheticTestFunction
from botorch.acquisition import AcquisitionFunction
from botorch.models.model import Model
from botorch.posteriors import DeterministicPosterior


class Forrester(SyntheticTestFunction):
    dim = 1
    _bounds = [(0.0, 1.0)]
    _optimal_value: 0.0
    _optimizers = [(0.0,)]

    def evaluate_true(self, X: Tensor) -> Tensor:
        return (6. * X - 2.) ** 2 * torch.sin(12 * X - 4.)


class Ackley(SyntheticTestFunction):
    dim = 10  
    _bounds = [(-1000, 1000)] * dim  
    _optimal_value = 0.0  
    _optimizers = [(0.0,) * dim]  

    def evaluate_true(self, X: Tensor) -> Tensor:
        a = 20
        b = 0.2
        c = 2 * 3.141592653589793
        
        sum_sq_term = torch.sum(X ** 2, dim=-1)
        cos_term = torch.sum(torch.cos(c * X), dim=-1)

        term1 = -a * torch.exp(-b * torch.sqrt(sum_sq_term / self.dim))
        term2 = -torch.exp(cos_term / self.dim)

        return term1 + term2 + a + torch.exp(torch.tensor(1.0))


class Network(Model):
    def __init__(self, input_dim, output_dim, num_layers, num_units):
        super(Network, self).__init__()
        self.layers = nn.ModuleList()

        for i in range(num_layers):
            if not i:
                self.layers.append(nn.Linear(input_dim, num_units))
                self.layers.append(nn.ReLU())
            self.layers.append(nn.Linear(num_units, num_units))
            self.layers.append(nn.ReLU())

        self.layers.append(nn.Linear(num_units, output_dim))

    def forward(self, x):
        for layer in self.layers:
            x = layer(x)
        return x

    def posterior(self, X: Tensor, **kwargs: Any) -> DeterministicPosterior:
        y = self.forward(X).view(-1)
        return DeterministicPosterior(y)


class LFAcquisitionFunction(AcquisitionFunction):
    def __init__(self, model: Model) -> None:
        super().__init__(model)

    def forward(self, X):
        return self.model.posterior(X).mean


def train_model(model, X, Y, W):
    optimizer = torch.optim.Adam(model.parameters(), lr=1e-4, weight_decay=0.0)
    batch_size = 64

    dataset = TensorDataset(X, Y, W)
    loader = DataLoader(dataset, batch_size=batch_size, shuffle=True, num_workers=0, drop_last=False)

    for i in range(500):
        for x, y, w in loader:
            optimizer.zero_grad()
            y_ = model(x)
            loss = nn.BCEWithLogitsLoss(weight=w)(y_, y)
            loss.backward()
            optimizer.step()

    return model


def prepare_data(X, eta=1.0):
    fx = f(X).view(-1)
    tau = torch.quantile(fx, 0.33)

    y = torch.less(fx, tau)
    x1, y1 = X[y], y[y]
    x0, y0 = X, torch.zeros_like(y)
    w1 = (tau - fx)[y]
    w1 = w1 ** eta / torch.mean(w1)
    w0 = 1 - y0.float()
    s1 = x1.size(0)
    s0 = x0.size(0)

    X = torch.cat([x1, x0], dim=0)
    Y = torch.cat([y1, y0], dim=0).float().view(-1, 1)
    W = torch.cat([w1 * (s1 + s0) / s1, w0 * (s1 + s0) / s0], dim=0).view(-1, 1)
    W = W / W.mean()
    return X, Y, W


# f = Forrester()
f = Ackley()

# soboleng = torch.quasirandom.SobolEngine(dimension=1)
soboleng = torch.quasirandom.SobolEngine(dimension=10)

X_obs = soboleng.draw(4)*10

# print(X_obs)

for i in range(100):
    if i < 4:
        print(f'X: {X_obs[i].numpy()}, f(X): {f(X_obs[i:i+1]).item():.4f}')
        continue

    model = Network(10, 1, 2, 32)

    X, Y, W = prepare_data(X_obs)
    model = train_model(model, X, Y, W)

    acqf = LFAcquisitionFunction(model)

    a = optimize_acqf(acqf, bounds=torch.tensor([[-5.0] * 10, [5.0] * 10]), q=1, num_restarts=5, raw_samples=100)[0]
    
    print(f'f(X): {f(a.unsqueeze(0)).item():.4f}')
    
    # print(f'X: {a.numpy()}, f(X): {f(a.unsqueeze(0)).item():.4f}, model(X): {model(a.unsqueeze(0)).item():.4f}, \
    #         Loss: {nn.BCEWithLogitsLoss(weight=W)(model(X), Y).item():.4f}')

    X_obs = torch.cat([X_obs, a], dim=0)

print(min(f(X_obs).view(-1)).item())
print(max(f(X_obs).view(-1)).item())


X: [0. 0. 0. 0. 0. 0. 0. 0. 0. 0.], f(X): 0.0000
X: [5. 5. 5. 5. 5. 5. 5. 5. 5. 5.], f(X): 12.6424
X: [7.5 2.5 2.5 2.5 7.5 7.5 2.5 7.5 7.5 7.5], f(X): 16.3515
X: [2.5 7.5 7.5 7.5 2.5 2.5 7.5 2.5 2.5 2.5], f(X): 15.1723
f(X): 11.3854
f(X): 12.3289
f(X): 12.1802
f(X): 11.0462
f(X): 12.5962
f(X): 8.3512
f(X): 11.6213
f(X): 9.5183
f(X): 12.2003
f(X): 11.7890
f(X): 11.9702
f(X): 5.8432
f(X): 12.2841
f(X): 6.9348
f(X): 11.6733
f(X): 9.6452
f(X): 8.8385
f(X): 10.9050
f(X): 11.6007
f(X): 10.5345
f(X): 9.0501
f(X): 8.4142
f(X): 5.9912
f(X): 10.2022
f(X): 6.8384


Trying again with a new set of initial conditions.
  return _optimize_acqf_batch(opt_inputs=opt_inputs)


f(X): 12.4513
f(X): 11.8671
f(X): 11.7576
f(X): 9.4977
f(X): 7.7444
f(X): 7.9529
f(X): 10.2402
f(X): 6.0546
f(X): 10.7864
f(X): 11.8035
f(X): 11.3868
f(X): 8.6895
f(X): 12.3738
f(X): 7.3663
f(X): 9.4455
f(X): 8.9964
f(X): 11.7364
f(X): 7.6053
f(X): 10.5188
f(X): 11.7650
f(X): 3.6217
f(X): 7.5133
f(X): 9.3710
f(X): 9.6010
f(X): 7.2266
f(X): 5.5721
f(X): 5.2049
f(X): 8.7001
f(X): 10.1194
f(X): 8.8762
f(X): 8.3530
f(X): 5.0968
f(X): 8.7573
f(X): 9.2478
f(X): 10.4890
f(X): 6.0577
f(X): 6.4926
f(X): 7.8739
f(X): 10.2864
f(X): 7.4390
f(X): 4.7444
f(X): 11.1101
f(X): 11.5754
f(X): 7.1262
f(X): 11.2904
f(X): 9.4231
f(X): 11.7482
f(X): 6.1862


Trying again with a new set of initial conditions.
  return _optimize_acqf_batch(opt_inputs=opt_inputs)


f(X): 9.8969
f(X): 4.4858
f(X): 7.0352
f(X): 11.1396
f(X): 7.3395
f(X): 6.7297
f(X): 8.6213
f(X): 9.3958
f(X): 11.4039
f(X): 9.2523
f(X): 11.5859
f(X): 7.5774
f(X): 7.7134
f(X): 8.1480
f(X): 8.3726
f(X): 11.1145
f(X): 8.1198
f(X): 8.3775
f(X): 6.8731
f(X): 7.6808
f(X): 8.3900
f(X): 8.7036
f(X): 5.3164
9.5367431640625e-07
16.351524353027344


In [2]:
pi = 4
import numpy as np
np.roll(np.arange(10), -pi)

array([4, 5, 6, 7, 8, 9, 0, 1, 2, 3])