In [13]:
import numpy as np
import pandas as pd
from scipy.stats import norm
from sklearn.gaussian_process import GaussianProcessRegressor
from sklearn.gaussian_process.kernels import RBF, WhiteKernel, ConstantKernel as C


In [14]:
EPS = 1e-9

def _expected_improvement(mu, sigma, y_best, xi=0.0):
    sigma_safe = sigma + EPS
    imp        = mu - y_best - xi
    z          = imp / sigma_safe
    ei         = imp * norm.cdf(z) + sigma_safe * norm.pdf(z)
    ei[sigma == 0] = 0.0
    return ei

def propose_next_experiment(
    X_train,
    y_train,
    bounds,
    n_candidates=1000,
    xi=0.0,
    random_state=None,
    return_all=False
):
    rng = np.random.default_rng(random_state)
    cols = list(bounds)
    d = len(cols)
    if X_train.shape[1] != d:
        raise ValueError("X_train dimension does not match number of bounds")

    # Step 1: Monte‑Carlo sampling in parameter space
    cand_mat = np.column_stack([
        rng.uniform(lo, hi, n_candidates) for lo, hi in bounds.values()
    ])
    cand_df = pd.DataFrame(cand_mat, columns=cols)

    # Step 2: Fit Gaussian Process surrogate
    kernel = (
        C(1.0, (1e-3, 1e3))
        * RBF(length_scale=np.ones(d), length_scale_bounds=(1e-2, 1e2))
        + WhiteKernel(noise_level=1e-6, noise_level_bounds=(1e-10, 1e-2))
    )
    gp = GaussianProcessRegressor(
    kernel=kernel,
    n_restarts_optimizer=5,
    alpha=0.0,
    normalize_y=True,
    random_state=42,  
)
    gp.fit(X_train, y_train)
    
    mu, sigma = gp.predict(cand_mat, return_std=True)

    # Step 3: Calculate Expected Improvement (EI)
    y_best = y_train.max()
    ei = _expected_improvement(mu, sigma, y_best, xi=xi)
    best_idx = np.argmax(ei)

    best_params = cand_df.iloc[best_idx].to_dict()
    best_ei = ei[best_idx]

    if return_all:
        cand_df = cand_df.assign(mu=mu, sigma=sigma, EI=ei)
        return cand_df, best_params, best_ei
    return best_params, best_ei


In [15]:

df = pd.read_csv("data/aryltrifluoromethylation_cytosine.csv")


key_cols = [
    "Cytosine_Conc_(M)",
    "CF3SO2Na_loading_(equiv.)",
    "(NH4)2S2O8_loading_(equiv.)",
    "Residence_time_(min)",
    "Light_intensity_(W)",
]

# random select 10 rows as initial experiment data
rng = np.random.default_rng(42)
seed_idx = rng.choice(len(df), size=10, replace=False)
      
X_train = df.loc[seed_idx, key_cols].to_numpy()
y_train = df.loc[seed_idx, "Yield_(%)"].to_numpy() # when get the real y, replace y_train with the real y

# define the bounds of the parameter space
bounds = {c: (df[c].min(), df[c].max()) for c in key_cols}


In [16]:
def explore_exploit_score(mu, sigma,
                          mu_min, mu_max,
                          sigma_min, sigma_max):
    """
    mu: predicted mean
    sigma: predicted standard deviation
    mu_min: minimum predicted mean
    mu_max: maximum predicted mean
    sigma_min: minimum predicted standard deviation
    sigma_max: maximum predicted standard deviation
    """
    mu_norm    = (mu    - mu_min   ) / (mu_max    - mu_min    + 1e-12)
    sigma_norm = (sigma - sigma_min) / (sigma_max - sigma_min + 1e-12)
    return mu_norm - sigma_norm


In [17]:
def run_experiment(x, mode='random', true_func=None):
    if mode == 'random':
        return float(np.random.uniform(60, 85))   # by default, the yield is between 60 and 85
    elif mode == 'manual':
        return float(input(f"Input real yield for {x}: "))
    elif mode == 'simulate' and true_func is not None:
        noise = np.random.normal(0, 2)
        return float(true_func(x) + noise)
    else:
        raise ValueError("Invalid mode or missing true_func!")

In [18]:
# X_loop: first 10 rows, y_loop: corresponding yields
# bounds: dict of (min, max) for each variable
X_loop = X_train[:10].copy()
y_loop = y_train[:10].copy()

cand_df, next_params, next_ei = propose_next_experiment(
    X_loop, y_loop, bounds, random_state=42, return_all=True)

best_idx   = cand_df['EI'].idxmax()
best_mu    = cand_df.loc[best_idx, 'mu']
best_sigma = cand_df.loc[best_idx, 'sigma']

score = explore_exploit_score(
    best_mu, best_sigma,
    mu_min=cand_df['mu'].min(),  mu_max=cand_df['mu'].max(),
    sigma_min=cand_df['sigma'].min(), sigma_max=cand_df['sigma'].max()
)

print("Step 1")
for k, v in next_params.items():
    print(f"  {k:<32}: {v:.4g}")
print(f"  Pred μ={best_mu:.2f}, σ={best_sigma:.2f}, EI={next_ei:.2f}")
print(f"  Explore–Exploit score: {score:+.2f}")

# run experiment (replace with actual lab call)
y_new = run_experiment(np.array(list(next_params.values())), mode='random')
print(f"  Experiment yield: {y_new:.2f}\n")

# update training set
new_x = np.array([[next_params[c] for c in bounds]])  # shape (1, d)
X_loop = np.vstack([X_loop, new_x])
y_loop = np.append(y_loop, y_new)

Step 1
  Cytosine_Conc_(M)               : 0.06181
  CF3SO2Na_loading_(equiv.)       : 2.239
  (NH4)2S2O8_loading_(equiv.)     : 0.8799
  Residence_time_(min)            : 4.774
  Light_intensity_(W)             : 174.6
  Pred μ=55.82, σ=6.47, EI=1.21
  Explore–Exploit score: +0.39
  Experiment yield: 80.90



