In [10]:
from botorch.acquisition.multi_objective.hypervolume_knowledge_gradient import (
    qHypervolumeKnowledgeGradient,
)
from botorch.acquisition.multi_objective.predictive_entropy_search import (
    qMultiObjectivePredictiveEntropySearch,
)
from botorch.sampling import SobolQMCNormalSampler
from botorch.utils.transforms import unnormalize
from gpytorch.mlls import ExactMarginalLogLikelihood


ImportError: cannot import name 'shape_to_str' from 'botorch.logging' (/home/asj53/.conda/envs/research_env/lib/python3.9/site-packages/botorch/logging.py)

In [None]:
import botorch, gpytorch, torch
print(botorch.__version__)   # -> 0.13.0  (or newer)
print(gpytorch.__version__)  # -> 1.11 or 1.12


0.7.0


In [None]:
import torch
from botorch.test_functions.multi_objective import DTLZ2
from botorch.utils.sampling import draw_sobol_samples
import torch
from botorch.models.gp_regression import SingleTaskGP
from botorch.models.pairwise_gp import PairwiseGP, PairwiseLaplaceMarginalLogLikelihood
from botorch.fit import fit_gpytorch_model
from botorch.acquisition.monte_carlo import qExpectedImprovement
from botorch.optim.optimize import optimize_acqf
from botorch.sampling import SobolQMCNormalSampler
from gpytorch.mlls.exact_marginal_log_likelihood import ExactMarginalLogLikelihood

def fit_pref_model(Y: torch.Tensor, comps: torch.Tensor) -> PairwiseGP:
    # detach Y so that no upstream graph remains
    Y_det = Y.detach().clone()
    model = PairwiseGP(
        train_X=Y_det,
        train_targets=comps,
        input_transform=Normalize(d=Y_det.size(-1)),
    )
    mll = PairwiseLaplaceMarginalLogLikelihood(model.likelihood, model)
    fit_gpytorch_model(mll)
    return model

# --- 1. Define the problem and set a seed for reproducibility ---
torch.manual_seed(0)
problem = DTLZ2(dim=5, num_objectives=4)

# --- 2. Generate initial experimental data (X_obs, Y_obs) ---
#    8 quasi-random Sobol points in [0,1]^5
X_obs = draw_sobol_samples(bounds=problem.bounds, n=1, q=8).squeeze(0)       # Tensor of shape [8, 5]
Y_obs = problem(X_obs)                                                       # Tensor of shape [8, 4]
from typing import Optional, Tuple
from botorch.models.transforms.input import Normalize
from botorch.models.transforms.outcome import Standardize
# --- 3. Fit the outcome GP to (X_obs, Y_obs) ---
outcome_model = fit_outcome_model(X_obs, Y_obs)
#    → outcome_model is now a SingleTaskGP trained on those 8 data points

def neg_l1_dist(Y: torch.Tensor, X: Optional[torch.Tensor] = None) -> torch.Tensor:
    """Negative L1 distance from a Pareto optimal points"""
    if len(Y.shape) == 1:
        Y = Y.unsqueeze(0)
    dist = torch.cdist(
        Y, torch.full(Y.shape[-1:], fill_value=0.5, dtype=Y.dtype).unsqueeze(0), p=1
    ).squeeze(-1)
    return -dist
def fit_outcome_model(X: torch.Tensor, Y: torch.Tensor) -> SingleTaskGP:
    """Fit the outcome model f"""
    outcome_model = SingleTaskGP(
        train_X=X,
        train_Y=Y,
        input_transform=Normalize(d=X.shape[-1]),
        outcome_transform=Standardize(m=Y.shape[-1]),
    )
    mll = ExactMarginalLogLikelihood(outcome_model.likelihood, outcome_model)
    fit_gpytorch_model(mll)
    return outcome_model
util_func = neg_l1_dist


def fit_pref_model(Y: torch.Tensor, comps: torch.Tensor) -> PairwiseGP:
    """Fit the preference model g."""
    model = PairwiseGP(Y, comps, input_transform=Normalize(d=Y.shape[-1]))
    mll = PairwiseLaplaceMarginalLogLikelihood(model.likelihood, model)
    fit_gpytorch_model(mll)
    return model


def gen_rand_X(problem, n: int) -> torch.Tensor:
    """Generate n quasi-random Sobol points in the design space."""
    return draw_sobol_samples(bounds=problem.bounds, n=1, q=n).squeeze(0)


def generate_random_exp_data(problem, n: int) -> Tuple[torch.Tensor, torch.Tensor]:
    """Generate n observations of (X, Y) Pairs"""
    X = gen_rand_X(problem, n)
    Y = problem(X)
    return X, Y

def generate_random_pref_data(
    outcome_model: SingleTaskGP, n: int) -> Tuple[torch.Tensor, torch.Tensor]:
    X = gen_rand_X(problem, 2 * n)
    # Draw *detached* samples from the GP posterior:
    with torch.no_grad():
        # use .sample() to avoid a reparameterized graph, or
        # use .rsample().detach() if you really need rsample
        Y = outcome_model.posterior(X).sample().squeeze(0)
    util = util_func(Y)
    comps = gen_comps(util)
    return Y, comps


def gen_comps(util: torch.Tensor) -> torch.Tensor:
    """Given an 1-d tensor of utility, create pairwise comparisons between adjacent items."""
    util = util.reshape(-1, 2)
    comps = torch.arange(util.numel()).reshape(-1, 2)
    flip = util[:, 0] < util[:, 1]
    comps[flip, [0]], comps[flip, [1]] = comps[flip, [1]], comps[flip, [0]]

    return comps
# --- 4. Generate an initial set of preference‐comparison data ---
#    Here “n=4” means we’ll sample 2*n = 8 new outcomes and get 4 pairwise comps
Y_pref_baseline, pair_indices = generate_random_pref_data(outcome_model, n=4)
#    Y_pref_baseline: Tensor of shape [8, 4] (the sampled outcome vectors)
#    pair_indices:     Tensor of shape [4, 2] (each row is a pair of indices into Y_pref_baseline)

# --- 5. Fit the preference GP to those comparisons ---
pref_model = fit_pref_model(Y_pref_baseline, pair_indices)
from botorch.acquisition import LearnedObjective

with torch.no_grad():
    # Posterior mean utility for each baseline Y
    baseline_util = pref_model.posterior(Y_pref_baseline).mean.squeeze(-1)
best_f = baseline_util.max()  # scalar Tensor
pref_obj = LearnedObjective(pref_model=pref_model)
# Assume we have:
# - outcome_model: SingleTaskGP fitted to (X_obs, Y_obs)
# - pref_model: PairwiseGP fitted to preference data (Y_obs, pair_indices)
# - X_obs: previously evaluated inputs for the outcome model (n x d tensor)
# - Y_obs: corresponding outcomes (n x k tensor)
# - Y_pref_baseline: set of outcome vectors used as the baseline for pref GP (m x k tensor)

# 3. Configure Monte Carlo sampler for outcome model (f) 

outcome_sampler = SobolQMCNormalSampler(num_samples = 100 )

# 4. Instantiate qNEIUU acquisition function
acqf_qnei_uu = qExpectedImprovement(
    model=outcome_model,     # SingleTaskGP on (X_obs, Y_obs)
    best_f=best_f,           # current best utility
    sampler=outcome_sampler, # QMC sampler for f
    objective=pref_obj,      # LearnedObjective wrapping pref_model
    # posterior_transform=None, X_pending=None, constraints=None, eta=1e-3 are all optional
)

from botorch.optim import optimize_acqf
d = 5
bounds = torch.stack([torch.zeros(d), torch.ones(d)])  # example [0,1]^d
candidates, acq_vals = optimize_acqf(
    acq_function=acqf_qnei_uu,
    bounds=bounds,
    q=1,              # batch size
    num_restarts=8,
    raw_samples=64,
    options={"batch_limit": 4},
    sequential=False, # joint batch optimization
)


# Run your expensive evaluation (or simulator) to get true outcomes
Y_new = problem(candidates)  # or f_true(candidates)
# Append new points to your observed data
X_obs = torch.cat([X_obs, candidates], dim=0)   # now (n+q)×d
Y_obs = torch.cat([Y_obs, Y_new],      dim=0)   # now (n+q)×k
# Example: compare each new Y_new[i] against a random existing outcome
# (you’d replace this with your actual DM interface)
pairs = []
for i in range(Y_new.shape[0]):
    # pick a random baseline index
    j = torch.randint(0, Y_pref_baseline.shape[0], (1,)).item()
    # create a pair [baseline_idx, new_idx]
    pairs.append([j, Y_pref_baseline.shape[0] + i])
pairs = torch.tensor(pairs, dtype=torch.long)  # shape (q, 2)

# Augment the preference dataset
Y_pref_baseline = torch.cat([Y_pref_baseline, Y_new], dim=0)   # (m+q)×k
pair_indices     = torch.cat([pair_indices, pairs],     dim=0)  # (#pairs_old+q)×2



In [40]:
# Refit outcome GP on the expanded (X_obs, Y_obs)
outcome_model = SingleTaskGP(
    train_X=X_obs,
    train_Y=Y_obs,
    # … same transforms as before …
)
fit_gpytorch_model(ExactMarginalLogLikelihood(outcome_model.likelihood, outcome_model))

fit_gpytorch_model(PairwiseLaplaceMarginalLogLikelihood(pref_model.likelihood, pref_model))


PairwiseLaplaceMarginalLogLikelihood(
  (likelihood): PairwiseProbitLikelihood()
  (model): PairwiseGP(
    (input_transform): Normalize()
    (likelihood): PairwiseProbitLikelihood()
    (mean_module): ConstantMean()
    (covar_module): ScaleKernel(
      (base_kernel): RBFKernel(
        (lengthscale_prior): GammaPrior()
        (raw_lengthscale_constraint): Positive()
      )
      (outputscale_prior): SmoothedBoxPrior()
      (raw_outputscale_constraint): Positive()
    )
  )
)

In [45]:
import torch
from botorch.test_functions.multi_objective import DTLZ2
from botorch.utils.sampling import draw_sobol_samples
from botorch.models.gp_regression import SingleTaskGP
from botorch.models.pairwise_gp import PairwiseGP, PairwiseLaplaceMarginalLogLikelihood
from botorch.models.transforms.input import Normalize
from botorch.models.transforms.outcome import Standardize
from botorch.fit import fit_gpytorch_model
from gpytorch.mlls.exact_marginal_log_likelihood import ExactMarginalLogLikelihood
from botorch.acquisition.monte_carlo import qExpectedImprovement
from botorch.acquisition.objective import LearnedObjective
from botorch.optim import optimize_acqf
from botorch.sampling import SobolQMCNormalSampler
from typing import Optional, Tuple

# -- User‐defined utilities --------------------------------------------------

def neg_l1_dist(Y: torch.Tensor, X: Optional[torch.Tensor] = None) -> torch.Tensor:
    if Y.dim() == 1:
        Y = Y.unsqueeze(0)
    dist = torch.cdist(
        Y, torch.full((1, Y.size(-1)), 0.5, dtype=Y.dtype), p=1
    ).squeeze(-1)
    return -dist

def gen_comps(util: torch.Tensor) -> torch.Tensor:
    """Pairwise indices so that the higher‐utility item comes first."""
    util = util.view(-1, 2)
    pairs = torch.arange(util.numel()).view(-1, 2)
    swap = util[:, 0] < util[:, 1]
    pairs[swap] = pairs[swap].flip(1)
    return pairs

def gen_rand_X(problem, n: int) -> torch.Tensor:
    return draw_sobol_samples(bounds=problem.bounds, n=1, q=n).squeeze(0)

def generate_random_pref_data(
    outcome_model: SingleTaskGP, n: int
) -> Tuple[torch.Tensor, torch.Tensor]:
    X = gen_rand_X(problem, 2 * n)
    Y = outcome_model.posterior(X).rsample().squeeze(0)
    comps = gen_comps(neg_l1_dist(Y))
    return Y, comps

# -- Model‐fitting helpers ---------------------------------------------------

def fit_outcome_model(X: torch.Tensor, Y: torch.Tensor) -> SingleTaskGP:
    m = SingleTaskGP(
        train_X=X, train_Y=Y,
        input_transform=Normalize(d=X.size(-1)),
        outcome_transform=Standardize(m=Y.size(-1)),
    )
    mll = ExactMarginalLogLikelihood(m.likelihood, m)
    fit_gpytorch_model(mll)
    return m

def fit_pref_model(Y: torch.Tensor, comps: torch.Tensor) -> PairwiseGP:
    m = PairwiseGP(Y, comps, input_transform=Normalize(d=Y.size(-1)))
    mll = PairwiseLaplaceMarginalLogLikelihood(m.likelihood, m)
    fit_gpytorch_model(mll)
    return m

# -- BOPE Loop ---------------------------------------------------------------

# 1) Problem setup
torch.manual_seed(0)
problem = DTLZ2(dim=5, num_objectives=4)
util_func = neg_l1_dist

# 2) Initialization
#    a) Initial experiments
X_obs, Y_obs = gen_rand_X(problem, 8), None
Y_obs = problem(X_obs)

#    b) Fit initial outcome GP
outcome_model = fit_outcome_model(X_obs, Y_obs)

#    c) Initial preference data (e.g., 4 comparisons)
Y_pref, pair_indices = generate_random_pref_data(outcome_model, n=4)
pref_model = fit_pref_model(Y_pref, pair_indices)

# 3) BOPE iteration parameters
N_ITER = 5               # total BOPE cycles
BATCH_SIZE = 1           # q
NUM_OUT_SAMPLES = 100    # f fantasies
NUM_PREF_SAMPLES = 10    # g fantasies
NUM_RESTARTS = 8
RAW_SAMPLES = 64
BATCH_LIMIT = 4
d = problem.dim

for it in range(1, N_ITER + 1):
    print(f"\n=== BOPE Iteration {it} ===")

    # 4) Compute current best utility baseline
    with torch.no_grad():
        u_mean = pref_model.posterior(Y_pref).mean.squeeze(-1)
    best_f = u_mean.max()

    # 5) Build LearnedObjective and sampler
    pref_obj = LearnedObjective(pref_model=pref_model, sample_shape=torch.Size([NUM_PREF_SAMPLES]))
    sampler = SobolQMCNormalSampler(sample_shape=torch.Size([NUM_OUT_SAMPLES]))

    # 6) Instantiate qExpectedImprovement for EIUU
    acqf = qExpectedImprovement(
        model=outcome_model,
        best_f=best_f,
        sampler=sampler,
        objective=pref_obj,
        prune_baseline=True,
    )

    # 7) Optimize acquisition to get new X
    bounds = torch.stack([torch.zeros(d), torch.ones(d)])
    X_next, acq_val = optimize_acqf(
        acq_function=acqf,
        bounds=bounds,
        q=BATCH_SIZE,
        num_restarts=NUM_RESTARTS,
        raw_samples=RAW_SAMPLES,
        options={"batch_limit": BATCH_LIMIT},
        sequential=False,
    )
    print(f"Acq value: {acq_val.item():.4f}")

    # 8) Evaluate true function
    Y_next = problem(X_next)
    print(f"Y_next: {Y_next}")

    # 9) Augment outcome data
    X_obs = torch.cat([X_obs, X_next], dim=0)
    Y_obs = torch.cat([Y_obs, Y_next], dim=0)

    # 10) Refit outcome GP
    outcome_model = fit_outcome_model(X_obs, Y_obs)

    # 11) Gather new preference pairs for Y_next
    #     (replace with real DM queries)
    new_pairs = []
    for i in range(Y_next.size(0)):
        j = torch.randint(0, Y_pref.size(0), (1,)).item()
        new_pairs.append([j, Y_pref.size(0) + i])
    new_pairs = torch.tensor(new_pairs, dtype=torch.long)

    # 12) Augment preference data and refit
    Y_pref = torch.cat([Y_pref, Y_next], dim=0)
    pair_indices = torch.cat([pair_indices, new_pairs], dim=0)
    pref_model = fit_pref_model(Y_pref, pair_indices)

print("\nBOPE loop complete.")


RuntimeError: Trying to backward through the graph a second time (or directly access saved tensors after they have already been freed). Saved intermediate values of the graph are freed when you call .backward() or autograd.grad(). Specify retain_graph=True if you need to backward through the graph a second time or if you need to access saved tensors after calling backward.