In [None]:
import numpy as np
from tqdm import tqdm
import os
import torch
from gen_rand_design import gen_rand_design

In [None]:
tkwargs = {'dtype': torch.float64, 'device': 'cpu'}
from botorch.models import SingleTaskGP, ModelListGP
from gpytorch.mlls.exact_marginal_log_likelihood import ExactMarginalLogLikelihood
from botorch import fit_gpytorch_model
from botorch.acquisition.monte_carlo import qExpectedImprovement
from botorch.acquisition.monte_carlo import qUpperConfidenceBound
from botorch.acquisition.monte_carlo import qProbabilityOfImprovement
from botorch.optim import optimize_acqf

In [1]:
def target_function(X):
    ones = np.array([1]*X.shape[0]).reshape(-1,1)
    Z = np.concatenate((ones, X), axis=1)
    M = Z.T @ Z
    try:
        d_opt = np.array(np.linalg.det(M)).reshape(-1,1)
    except:
        d_opt = np.array([-np.infty])
    return torch.tensor(d_opt)

In [2]:
X = gen_rand_design(runs=4, feats=16)
X

NameError: name 'gen_rand_design' is not defined

In [742]:
def get_initial_data(n=1, low=None, high=None):
    x1 = torch.FloatTensor(n,1).uniform_(low[0], high[0])
    x2 = torch.FloatTensor(n,1).uniform_(low[1], high[1])
    x3 = torch.FloatTensor(n,1).uniform_(low[2], high[2])
    x4 = torch.FloatTensor(n,1).uniform_(low[3], high[3])
    x5 = torch.FloatTensor(n,1).uniform_(low[4], high[4])
    x6 = torch.FloatTensor(n,1).uniform_(low[5], high[5])
    x7 = torch.FloatTensor(n,1).uniform_(low[6], high[6])
    x8 = torch.FloatTensor(n,1).uniform_(low[7], high[7])
    x9 = torch.FloatTensor(n,1).uniform_(low[8], high[8])
    x10 = torch.FloatTensor(n,1).uniform_(low[9], high[9])
    x11 = torch.FloatTensor(n,1).uniform_(low[10], high[10])
    x12 = torch.FloatTensor(n,1).uniform_(low[11], high[11])
    X = np.vstack((x1, x2, x3, x4, x5, x6, x7, x8, x9, x10, x11, x12))
    y = target_function(X)
    best_y = y.max().item()
    return x1, x2, x3, x4, x5, x6, x7, x8, x9, x10, x11, x12, y, best_y

In [743]:
def get_initial_data_4(n=1, low=None, high=None):
    x1 = torch.FloatTensor(n,1).uniform_(low[0], high[0])
    x2 = torch.FloatTensor(n,1).uniform_(low[1], high[1])
    x3 = torch.FloatTensor(n,1).uniform_(low[2], high[2])
    x4 = torch.FloatTensor(n,1).uniform_(low[3], high[3])
    X = np.vstack((x1, x2, x3, x4))
    y = target_function(X)
    best_y = y.max().item()
    return x1, x2, x3, x4, y, best_y

In [797]:
def gen_next_point(X, y, best_y, bounds, n_exp, inequality_constraints=None):
    model = SingleTaskGP(X, y)
    mll = ExactMarginalLogLikelihood(model.likelihood, model)
    fit_gpytorch_model(mll)
    EI = qExpectedImprovement(model=model, best_f=best_y)
    PI = qProbabilityOfImprovement(model=model, best_f=best_y)
    UCB = qUpperConfidenceBound(model=model, beta=0.3)
    candidates, _ = optimize_acqf(acq_function=EI,
                                  bounds=bounds,
                                  q=n_exp,
                                  num_restarts=10,
                                  raw_samples=1000,
                                  options={"batch_limit": 5, "maxiter": 200},
                                  inequality_constraints=inequality_constraints)
    return candidates

In [1]:
x1_init,\
x2_init,\
x3_init,\
x4_init,\
y_init, best_y_init = get_initial_data_4(n=1, low=[-1,-1,-1,-1], high=[1,1,1,1])

NameError: name 'get_initial_data_4' is not defined

In [801]:
X_init = torch.tensor(np.vstack((x1_init, x2_init, x3_init, x4_init))).T
bounds = torch.Tensor([[-1,-1,-1,-1], [1,1,1,1]])

In [803]:
n_runs=250
History_Matrix = []
for i in tqdm(range(n_runs)):
    #print(f"Nr. of optimization run: {i+1}")
    new_candidates = gen_next_point(X_init, y_init, best_y_init, bounds=bounds, n_exp=1)
    X = np.vstack((new_candidates[:,0],
                   new_candidates[:,1],
                   new_candidates[:,2],
                   new_candidates[:,3],
                   new_candidates[:,4],
                   new_candidates[:,5],
                   new_candidates[:,6],
                   new_candidates[:,7],
                   new_candidates[:,8],
                   new_candidates[:,9],
                   new_candidates[:,10],
                   new_candidates[:,11]))
    new_result = target_function(X)
    #print(f"New candidate is: {new_candidates}")

    X_init = torch.cat([X_init, new_candidates])
    y_init = torch.cat([y_init, new_result])
    best_y_init = y_init.max().item()
    #print(f"Best point performs this way: {best_y_init}")
    History_Matrix.append([i, best_y_init, new_candidates])

In [804]:
n_runs=10
History_Matrix = []
for i in tqdm(range(n_runs)):
    #print(f"Nr. of optimization run: {i+1}")
    new_candidates = gen_next_point(X_init, y_init, best_y_init, bounds=bounds, n_exp=1)
    X = np.vstack((new_candidates[:,0],
                   new_candidates[:,1],
                   new_candidates[:,2],
                   new_candidates[:,3]))
    new_result = target_function(X)
    #print(f"New candidate is: {new_candidates}")

    X_init = torch.cat([X_init, new_candidates])
    y_init = torch.cat([y_init, new_result])
    best_y_init = y_init.max().item()
    #print(f"Best point performs this way: {best_y_init}")
    History_Matrix.append([i, best_y_init, new_candidates])

100%|██████████| 10/10 [00:04<00:00,  2.27it/s]


In [805]:
History_Matrix = np.array(History_Matrix, dtype=object)
epoch_max_det_id = History_Matrix[:, 1].argmax()
Best_design = History_Matrix[epoch_max_det_id, 2]
Best_criterion = History_Matrix[epoch_max_det_id, 1]

In [806]:
Best_criterion

15.999999999999998

In [807]:
Best_design.T

tensor([[ 1.],
        [ 1.],
        [-1.],
        [-1.]])

In [354]:
x1 = torch.FloatTensor(1,1).uniform_(-1, 1)
x2 = torch.FloatTensor(1,1).uniform_(-1, 1)
x = np.vstack((x1, x2))
x

array([[ 0.11435723],
       [-0.01103151]], dtype=float32)

In [322]:
(torch.FloatTensor(1,1).uniform_(-1, 1))

tensor([[-0.8926]])

In [360]:
x_iter = np.array([torch.FloatTensor(1,1).uniform_(-1,1) for n in range(2)], dtype=object)

  x_iter = np.array([torch.FloatTensor(1,1).uniform_(-1,1) for n in range(2)], dtype=object)


In [361]:
x_iter

array([tensor([[0.1906]]), tensor([[-0.8332]])], dtype=object)

## BO sklearn

In [1]:
from scipy.stats import norm
import random
import numpy as np
from sklearn.gaussian_process import GaussianProcessRegressor
from sklearn.gaussian_process.kernels import ConstantKernel
from sklearn.gaussian_process.kernels import WhiteKernel
from sklearn.gaussian_process.kernels import RBF
from sklearn.gaussian_process.kernels import Matern

from gen_rand_design import gen_rand_design
from Jcb import calc_basis_matrix

In [2]:
def surrogate(model, X):
    return model.predict(X, return_std=True)

In [12]:
def acq_pi(X, Xsamples, model, beta=None):
    yhat, _ = surrogate(model, X)
    best = max(yhat) # calculate the best so far

    mu, std = surrogate(model, Xsamples)
    mu = mu.reshape(-1,1)[:, 0]
    gamma = (mu - best) / (std + 1E-9)
    probs = norm.cdf(gamma) # add a very small number to std to avoid dividing with 0
    return probs

def acq_ei(X, Xsamples, model, beta=None):
    yhat, _ = surrogate(model, X)
    best = max(yhat) # calculate the best so far

    mu, std = surrogate(model, Xsamples)
    mu = mu.reshape(-1,1)[:, 0]
    gamma = (mu-best)/(std + 1E-9) # add a very small number to std to avoid dividing with 0
    probs = std * (gamma * norm.cdf(gamma)) + norm.pdf(gamma)
    return probs

def acq_ucb(X, Xsamples, model, beta=1):
    yhat, _ = surrogate(model, X)
    mu, std = surrogate(model, Xsamples)
    mu = mu.reshape(-1, 1)[:, 0]
    probs = mu + np.sqrt(beta)*std
    return probs

def opt_acq(X, y, model, acq, low, high, beta=0.5):
    Xsamples = get_random_X(low=low, high=high, samples=10000)
    scores = acq(X, Xsamples, model, beta=beta)
    ix = np.argmax(scores)
    return Xsamples[ix]

def get_random_X(low, high, samples):
    ALL = np.array(np.meshgrid(*[np.arange(low[k], high[k], 0.1) for k in range(len(low))])).T.reshape(-1,len(low))
    idx = random.sample(range(0, ALL.shape[0]-1), samples)
    return ALL[idx]

In [13]:
def objective(X, runs, feats, optimality='A', J_cb=None):
    ones = np.array([1]*runs).reshape(-1,1)
    X = X.reshape(runs, feats)
    Zetta = np.concatenate((ones, X), axis=1) if J_cb is None else np.concatenate((ones, X @ J_cb), axis=1)
    M = Zetta.T @ Zetta
    if optimality == 'A':
        try:
            cr = np.trace(np.linalg.inv(M))
        except np.linalg.LinAlgError:
            cr = np.infty
        return np.array([cr])
    elif optimality == 'D':
        try:
            cr = np.linalg.det(M)
        except np.linalg.LinAlgError:
            cr = np.infty
        return np.array([-cr])
    else:
        raise ValueError(f"Invalid criterion {optimality}. "
                         "Criterion should be one of 'D', or 'A'.")

In [14]:
runs = 4
feats = 8
J_cb = calc_basis_matrix(x_basis=feats, b_basis=2)
X = gen_rand_design(runs, feats)
X_flat = X.flatten().reshape(1, X.shape[0]*X.shape[1])
y = objective(X=X_flat, runs=runs, feats=feats, optimality='A', J_cb=J_cb)

In [15]:
kernel=None
model = GaussianProcessRegressor(kernel=kernel)

def random_start(runs, feats, model=model, J_cb=None):
    data = gen_rand_design(runs=runs, feats=feats)
    data_flat = data.flatten().reshape(1, runs*feats)
    target = objective(data_flat, runs=runs, feats=feats, optimality='A', J_cb=J_cb)
    model.fit(data_flat, target)
    return data_flat, target, model

In [16]:
X_init, y_init, model = random_start(runs=4, feats=8, model=GaussianProcessRegressor(kernel=None), J_cb=J_cb)

In [None]:
x_next = opt_acq(X_init, y_init, model=model, acq=acq_ei, low=[-1]*feats, high=[1]*feats, beta=None)