In [1]:
import torch, botorch, gpytorch
print("Torch:", torch.__version__)
print("BoTorch:", botorch.__version__)
print("GPyTorch:", gpytorch.__version__)
from botorch.acquisition.logei import qLogNoisyExpectedImprovement
print("qLogNoisyExpectedImprovement import OK")


Torch: 2.3.0+cu118
BoTorch: 0.8.5
GPyTorch: 1.10


ModuleNotFoundError: No module named 'botorch.acquisition.logei'

In [5]:
# bo_logei_script.py
import os, sys, time, csv
from datetime import datetime
from pathlib import Path
from typing import Dict, Tuple, List

import torch
from torch import Tensor

# ---- BoTorch / GPyTorch ----
# --- replace your imports (위쪽 블록) ---
# ---- imports (위쪽 일부 교체) ----
from botorch.models import SingleTaskGP
from botorch.models.transforms import Standardize, Normalize
from botorch.fit import fit_gpytorch_mll
from gpytorch.mlls import ExactMarginalLogLikelihood

from botorch.acquisition.monte_carlo import qUpperConfidenceBound, qNoisyExpectedImprovement
from botorch.optim import optimize_acqf

# Sampler import: version-agnostic
try:
    from botorch.sampling.samplers import SobolQMCNormalSampler
except Exception:
    from botorch.sampling.normal import SobolQMCNormalSampler  # newer path

# --- logEI 호환 계층: 새 버전이면 진짜 qLogNEI, 아니면 log(qNEI+eps) 래퍼 ---
from botorch.acquisition.acquisition import AcquisitionFunction
import torch




In [4]:


# =========================
# User Config (EDIT HERE)
# =========================
SEED = 123
MAX_ITERS = 20                 # 총 이터레이션 수
N_RANDOM = 5                   # 초기 랜덤 탐색 이터레이션
N_UCB = 5                      # 그 다음 UCB 이터레이션
UCB_BETA = 0.25                # qUCB beta
OBJECTIVE_SENSE = "min"        # "min" or "max"
LOG_CSV = "bo_log.csv"         # 로그 파일 경로

# 파라미터 범위 (예시) → 꼭 실험 범위로 수정
PBONDS: Dict[str, Tuple[float, float]] = {
    "pressure"     : (250.0, 450.0),   # kPa 등
    "velocity"     : (2.0, 40.0),      # mm/s
    "wall_spacing" : (0.20, 1.00),     # mm
    "layer_spacing": (0.10, 0.60),     # mm
}

# =========================
# Helpers
# =========================
torch.manual_seed(SEED)
tkwargs = {"dtype": torch.double, "device": "cpu"}

PARAMS = list(PBONDS.keys())
LB = torch.tensor([PBONDS[p][0] for p in PARAMS], **tkwargs)
UB = torch.tensor([PBONDS[p][1] for p in PARAMS], **tkwargs)
BOUNDS = torch.stack([LB, UB])  # shape [2, d]
D = len(PARAMS)


def ensure_csv(path: str):
    """Create CSV with header if missing."""
    if not Path(path).exists():
        with open(path, "w", newline="", encoding="utf-8") as f:
            w = csv.writer(f)
            header = ["iter", "timestamp"] + PARAMS + ["objective_raw", "objective_for_BO", "acquisition"]
            w.writerow(header)


def log_row(path: str, iteration: int, x: Tensor, y_raw: float, y_bo: float, acq_name: str):
    """Append one row to CSV."""
    ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    x_list = [float(v) for v in x.view(-1).tolist()]
    row = [iteration, ts] + x_list + [float(y_raw), float(y_bo), acq_name]
    with open(path, "a", newline="", encoding="utf-8") as f:
        csv.writer(f).writerow(row)


def sample_random(n: int = 1) -> Tensor:
    """Uniform random sample within bounds."""
    u = torch.rand(n, D, **tkwargs)
    return LB + (UB - LB) * u

def build_sampler(N: int) -> "SobolQMCNormalSampler":
    """BoTorch version-agnostic sampler builder."""
    try:
        # newer BoTorch
        from botorch.sampling.normal import SobolQMCNormalSampler as _Sampler
        return _Sampler(sample_shape=torch.Size([N]))
    except Exception:
        # older BoTorch paths/args
        try:
            from botorch.sampling.samplers import SobolQMCNormalSampler as _Sampler
        except Exception:
            from botorch.sampling.normal import SobolQMCNormalSampler as _Sampler
        try:
            return _Sampler(num_samples=N)
        except TypeError:
            return _Sampler(sample_shape=torch.Size([N]))


def ask_user_evaluate(x: Tensor) -> float:
    """Print x and ask user for objective value."""
    print("\n--- Evaluate Candidate ---")
    for i, p in enumerate(PARAMS):
        print(f"{p:>14s} : {float(x[i]):.6g}")
    while True:
        s = input("Enter objective value (float): ").strip()
        try:
            return float(s)
        except ValueError:
            print("⚠️  Not a float. Try again.")


def y_for_bo(y_raw: float) -> float:
    """Convert raw objective to 'maximize' convention for BoTorch."""
    if OBJECTIVE_SENSE.lower().startswith("min"):
        return -float(y_raw)  # minimize → maximize by sign flip
    return float(y_raw)


def fit_model(train_X: Tensor, train_Y: Tensor) -> SingleTaskGP:
    """Fit SingleTaskGP with normalization/standardization."""
    model = SingleTaskGP(
        train_X,
        train_Y,
        input_transform=Normalize(d=D),
        outcome_transform=Standardize(m=1),
    )
    mll = ExactMarginalLogLikelihood(model.likelihood, model)
    fit_gpytorch_mll(mll)
    return model


def next_via_ucb(model: SingleTaskGP, q: int = 1) -> Tensor:
    sampler = build_sampler(512)
    acq = qUpperConfidenceBound(model, beta=UCB_BETA, sampler=sampler)
    cand, _ = optimize_acqf(
        acq_function=acq,
        bounds=BOUNDS,
        q=q,
        num_restarts=10,
        raw_samples=256,
        options={"batch_limit": 5, "maxiter": 200},
    )
    return cand.detach()

def build_lognei(model, X_baseline, num_samples: int = 1024, eps: float = 1e-12):
    try:
        from botorch.acquisition.logei import qLogNoisyExpectedImprovement
        sampler = build_sampler(num_samples)
        return qLogNoisyExpectedImprovement(model=model, X_baseline=X_baseline, sampler=sampler)
    except Exception:
        from botorch.acquisition.monte_carlo import qNoisyExpectedImprovement
        from botorch.acquisition.acquisition import AcquisitionFunction

        sampler = build_sampler(num_samples)
        base_nei = qNoisyExpectedImprovement(model=model, X_baseline=X_baseline, sampler=sampler)

        class LogOfqNEI(AcquisitionFunction):
            def __init__(self, base, eps: float = 1e-12):
                super().__init__(model=base.model)
                self.base = base; self.eps = eps
            def forward(self, X: torch.Tensor) -> torch.Tensor:
                return torch.log(self.base(X).clamp_min(self.eps))

        return LogOfqNEI(base_nei, eps=eps)


def next_via_lognei(model: SingleTaskGP, train_X: torch.Tensor, train_Y: torch.Tensor, q: int = 1) -> torch.Tensor:
    acq = build_lognei(model=model, X_baseline=train_X, num_samples=1024)
    cand, _ = optimize_acqf(
        acq_function=acq,
        bounds=BOUNDS,
        q=q,
        num_restarts=15,
        raw_samples=256,
        options={"batch_limit": 5, "maxiter": 200},
    )
    return cand.detach()



def tensorize_logs(path: str) -> Tuple[Tensor, Tensor]:
    """Load existing CSV (if any) to X,Y tensors for warm start."""
    if not Path(path).exists():
        return None, None
    import pandas as pd
    df = pd.read_csv(path)
    if df.empty:
        return None, None
    X = torch.tensor(df[PARAMS].values, **tkwargs)
    Y_bo = torch.tensor(df["objective_for_BO"].values.reshape(-1, 1), **tkwargs)
    return X, Y_bo


def main():
    ensure_csv(LOG_CSV)

    # Warm-start from existing log (optional)
    X_all, Y_all = tensorize_logs(LOG_CSV)
    if X_all is None:
        X_all = torch.empty(0, D, **tkwargs)
        Y_all = torch.empty(0, 1, **tkwargs)
        iter_start = 1
    else:
        iter_start = int(X_all.shape[0]) + 1
        print(f"✅ Resuming from {LOG_CSV}: {iter_start-1} rows loaded.")

    # BO Loop
    for it in range(iter_start, MAX_ITERS + 1):
        if it <= N_RANDOM:
            acq_name = "RANDOM"
            x_next = sample_random(1).squeeze(0)
        else:
            # need at least 2 points to fit GP sensibly
            if X_all.shape[0] < 2:
                x_next = sample_random(1).squeeze(0)
                acq_name = "RANDOM"
            else:
                # Fit model on current data
                try:
                    model = fit_model(X_all, Y_all)
                except Exception as e:
                    print(f"⚠️ GP fit failed ({e}). Fallback to RANDOM.")
                    x_next = sample_random(1).squeeze(0)
                    acq_name = "RANDOM"
                else:
                    if it <= N_RANDOM + N_UCB:
                        acq_name = f"qUCB(beta={UCB_BETA})"
                        x_next = next_via_ucb(model).squeeze(0)
                    else:
                        acq_name = "qLogNEI"  # logEI phase
                        x_next = next_via_lognei(model, X_all, Y_all).squeeze(0)

        # Evaluate (user input)
        y_raw = ask_user_evaluate(x_next)
        y_bo = y_for_bo(y_raw)

        # Update data
        X_all = torch.cat([X_all, x_next.view(1, -1)], dim=0)
        Y_all = torch.cat([Y_all, torch.tensor([[y_bo]], **tkwargs)], dim=0)

        # Log to CSV
        log_row(LOG_CSV, it, x_next, y_raw, y_bo, acq_name)
        print(f"📎 Logged iter {it} ({acq_name}) → raw={y_raw:.6g}  for_BO={y_bo:.6g}")

    print("\n🎉 BO finished.")
    print(f"Log saved to: {Path(LOG_CSV).resolve()}")


if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        print("\nInterrupted by user.")
        sys.exit(0)



--- Evaluate Candidate ---
      pressure : 323.779
      velocity : 2.50791
  wall_spacing : 0.673424
 layer_spacing : 0.14632
Enter objective value (float): 11
📎 Logged iter 1 (RANDOM) → raw=11  for_BO=-11

--- Evaluate Candidate ---
      pressure : 344.49
      velocity : 21.8372
  wall_spacing : 0.684066
 layer_spacing : 0.365648
Enter objective value (float): 15
📎 Logged iter 2 (RANDOM) → raw=15  for_BO=-15

--- Evaluate Candidate ---
      pressure : 439.709
      velocity : 4.15367
  wall_spacing : 0.86286
 layer_spacing : 0.146902
Enter objective value (float): 12
📎 Logged iter 3 (RANDOM) → raw=12  for_BO=-12

--- Evaluate Candidate ---
      pressure : 289.193
      velocity : 10.3728
  wall_spacing : 0.355701
 layer_spacing : 0.307292
Enter objective value (float): 13
📎 Logged iter 4 (RANDOM) → raw=13  for_BO=-13

--- Evaluate Candidate ---
      pressure : 398.614
      velocity : 14.3575
  wall_spacing : 0.693765
 layer_spacing : 0.191451
Enter objective value (float): 16

TypeError: __init__() missing 1 required positional argument: 'sample_shape'

In [6]:
import botorch, torch, gpytorch
print("BoTorch:", botorch.__version__)
print("Torch:", torch.__version__)
print("GPyTorch:", gpytorch.__version__)


BoTorch: 0.8.5
Torch: 2.3.0+cu118
GPyTorch: 1.10


In [13]:
pip install --upgrade botorch gpytorch


Collecting gpytorch
  Using cached gpytorch-1.13-py3-none-any.whl (277 kB)
Note: you may need to restart the kernel to use updated packages.


