In [None]:
import pandas as pd
import numpy as np
import torch
from sklearn.preprocessing import StandardScaler

from botorch.models import SingleTaskGP, ModelListGP
from botorch.fit import fit_gpytorch_mll
from gpytorch.mlls import ExactMarginalLogLikelihood
from gpytorch.mlls.sum_marginal_log_likelihood import SumMarginalLogLikelihood

from scipy.spatial import Delaunay
import matplotlib.pyplot as plt
from sklearn.metrics import r2_score, mean_absolute_error, mean_squared_error

from botorch.acquisition.multi_objective.monte_carlo import qExpectedHypervolumeImprovement
from botorch.utils.multi_objective.pareto import is_non_dominated
from botorch.utils.multi_objective.hypervolume import Hypervolume
from botorch.utils.multi_objective.box_decompositions import NondominatedPartitioning
from botorch.sampling.normal import SobolQMCNormalSampler

from botorch.utils.transforms import unnormalize

In [None]:
def x_normalizer(X, var_array):
    
    def max_min_scaler(x, x_max, x_min):
        return (x-x_min)/(x_max-x_min)
    x_norm = []
    for x in (X):
           x_norm.append([max_min_scaler(x[i], 
                                         max(var_array[i]), 
                                         min(var_array[i])) for i in range(len(x))])
            
    return x_norm

def x_denormalizer(x_norm, var_array):
    
    def max_min_rescaler(x, x_max, x_min):
        return x*(x_max-x_min)+x_min
    x_original = []
    for x in (x_norm):
           x_original.append([max_min_rescaler(x[i], 
                                         max(var_array[i]), 
                                         min(var_array[i])) for i in range(len(x))])
            
    return x_original

def get_closest_value(given_value, array_list):
    absolute_difference_function = lambda list_value : abs(list_value - given_value)
    closest_value = min(array_list, key=absolute_difference_function)
    return closest_value
    
def get_closest_array(suggested_x, var_list):
    modified_array = []
    for x in suggested_x:
        modified_array.append([get_closest_value(x[i], var_list[i]) for i in range(len(x))])
    return np.array(modified_array)

In [None]:
def make_linspace(start, stop, step):
    num_points = int(round((stop - start) / step)) + 1
    return np.round(np.linspace(start, stop, num_points), 4)  # round for cleaner floats

speed_inorg_var   = make_linspace(0.25, 0.55, 0.01)  # m/min
speed_org_var     = make_linspace(0.25, 0.55, 0.01)  # m/min
inkFL_inorg_var   = make_linspace(80, 200, 1)       # uL/min
inkFL_org_var     = make_linspace(100, 280, 1)      # uL/min
conc_inorg_var    = make_linspace(0.8, 1.6, 0.05)     # M
conc_org_var      = make_linspace(0.4, 1.2, 0.05)     # M
humidity_var      = make_linspace(11, 44, 1)        # %RH
temp_var          = make_linspace(20, 65, 5)        # °C

# Tracking unique values
speed_inorg_num   = len(speed_inorg_var)
speed_org_num     = len(speed_org_var)
inkFL_inorg_num   = len(inkFL_inorg_var)
inkFL_org_num     = len(inkFL_org_var)
conc_inorg_num    = len(conc_inorg_var)
conc_org_num      = len(conc_org_var)
humidity_num      = len(humidity_var)
temp_num          = len(temp_var)

# Pack into var_array for downstream normalization
var_array = [speed_inorg_var, speed_org_var, 
             inkFL_inorg_var, inkFL_org_var,
             conc_inorg_var, conc_org_var,
             humidity_var, temp_var]

x_labels = ['Speed (Inorg) [m/min]', 'Speed (Org) [m/min]', 
            'inkFL (Inorg) [uL/min]', 'inkFL (Org) [uL/min]',
            'Conc. (Inorg) [M]', 'Conc. (Org) [M]',
            'RH [%]', 'Temp [C]']

In [None]:
# parameter_space = ParameterSpace([ContinuousParameter('x1', 0, 1),
#                                   ContinuousParameter('x2', 0, 1),
#                                   ContinuousParameter('x3', 0, 1),
#                                   ContinuousParameter('x4', 0, 1),
#                                   ContinuousParameter('x5', 0, 1),
#                                   ContinuousParameter('x6', 0, 1),
#                                   ContinuousParameter('x7', 0, 1),
#                                   ContinuousParameter('x8', 0, 1),
#                                  ])

## why is the parameter space different in the round 1 of the Joule code?

In [None]:
def check_clausius_clapeyron(humidity_vals, temp_vals):
    """
    Return boolean mask for valid points satisfying Clausius-Clapeyron constraints.
    Simplified: Ensures RH and Temp do not imply >100% saturation.
    """
    # Saturation vapor pressure approximation (T in Celsius)
    # Tetens formula: es(T) = 0.6108 * exp((17.27*T)/(T + 237.3)) [kPa]
    # RH = (actual / saturation) * 100
    # Check that vapor pressure does not exceed saturation vapor pressure
    valid_mask = []
    for RH, T in zip(humidity_vals, temp_vals):
        es = 0.6108 * np.exp((17.27 * T) / (T + 237.3))
        e = RH / 100.0 * es  # approximate check
        valid_mask.append(e <= es)
    return np.array(valid_mask)

In [None]:
#Load your real LHS experimental results (replace with real path)
dataset_final = pd.read_csv('path/to/your/real_lhs_results.csv')

#Extract input features (first 8 columns assumed to be process parameters) ===
X = dataset_final.iloc[:, 0:8].values  # shape: (8, 8)

#Extract targets
y = dataset_final[['PCE', 'Stability', 'Repeatability']].values  # shape: (8, 3)

#Standardize input and output
X_scaled = x_normalizer(X, var_array)
Y_scaled = StandardScaler().fit_transform(y) ## is this the right way to normalize this? do i need to normalize this?

#Convert to torch tensors
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
train_X = torch.tensor(X_scaled, dtype=torch.float32, device=device)
train_Y = torch.tensor(Y_scaled, dtype=torch.float32, device=device)

#Set seeds for reproducibility
np.random.seed(0)
torch.manual_seed(0)


In [None]:
# # compute pareto front
ref_pt = np.array([min(dataset_LHS['Jsc'].values), min(dataset_LHS['Voc'].values), min(dataset_LHS['FF'].values)])
# hv = Hypervolume(ref_point=ref_pt)

# is_feas = (train_Y <= 0).all(dim=-1)
# feas_train_obj = train_Y[is_feas]
# if feas_train_obj.shape[0] > 0:
#     pareto_mask = is_non_dominated(feas_train_obj)
#     pareto_y = feas_train_obj[pareto_mask]
#     # compute hypervolume
#     volume = hv.compute(pareto_y)
# else:
#     volume = 0.0

# compute pareto front
current_pareto_mask = is_non_dominated(train_Y)
current_pareto_y = train_Y[current_pareto_mask]
current_pareto_ind = []
for ind, boo in enumerate(current_pareto_mask.tolist()):
    if boo == True:
        current_pareto_ind.append(indices_collected[ind]) 

current_non_pareto_mask = ~current_pareto_mask
current_non_pareto_y = np.array(y_train[current_non_pareto_mask])

hv = Hypervolume(ref_point= torch.tensor(data = ref_pt))
volume = np.round(hv.compute(current_pareto_y), 2)

In [None]:
#copied from SIPS code for Pareto visiualization, may need to modify later
tri = Delaunay(current_pareto_y)

plt.figure(figsize=(10,10))

ax = plt.axes(projection='3d')

ax.scatter3D(np.array(current_pareto_y)[:,0], np.array(current_pareto_y)[:,1], np.array(current_pareto_y)[:,2], c = 'g', s = 10, depthshade = False, label='Pareto optimal')

ax.plot_trisurf(np.array(current_pareto_y)[:,0], np.array(current_pareto_y)[:,1], np.array(current_pareto_y)[:,2], triangles = tri.simplices, color ='g', alpha = 0.15, linewidth=0.2, antialiased=False)


ax.scatter3D(current_non_pareto_y[:,0], current_non_pareto_y[:,1], current_non_pareto_y[:,2], c = 'b', s = 10, depthshade = False, label='NOT Pareto optimal')
ax.scatter3D(np.array(y_pool)[:,0], np.array(y_pool)[:,1], np.array(y_pool)[:,2], c = 'gray', s = 10, alpha = 0.10, depthshade = False, label='NOT discovered')


ax.scatter3D(ref_pt[0], ref_pt[1], ref_pt[2], c = 'r', s = 20, depthshade = False, label='Reference Pt')




ax.set_xlabel('Jsc', fontsize=16)
ax.set_ylabel('Voc', fontsize=16)
ax.set_zlabel('FF', fontsize=16)

plt.title('Batch ' +str(b)+' | ' + 'Undiscovered:'+str(len(indices_pool)) +' | ' + 'Collected:'+str(len(indices_collected)) +' | ' + 'Pareto front: '+str(percent)+'%' +' | ' + 'HyperVol: '+str(volume), fontsize = 14)

plt.legend(numpoints=1, fontsize = 14, loc = 'center left')

ax.view_init(25, -61)

plt.savefig('SIPS_Batch'+str(b)+'.png')

In [None]:
gp_models = []
for i in range(train_Y.shape[1]):

    likelihood = gpytorch.likelihoods.GaussianLikelihood()
    gp = SingleTaskGP(train_X, train_Y[:, i:i+1])
    mll = ExactMarginalLogLikelihood(gp.likelihood, gp)
    fit_gpytorch_mll(mll)
    gp_models.append(gp)

# Wrap the models into a multi-output model
model = ModelListGP(*gp_models)

In [None]:
model.eval()
with torch.no_grad():
    preds_std = torch.cat([gp.posterior(train_X).mean for gp in model.models], dim=-1)  # shape (N, 3)
preds = Y_scaler.inverse_transform(preds_std.cpu().numpy())  # shape (N, 3)
targets = Y_scaler.inverse_transform(train_Y.cpu().numpy())

labels = ['PCE', 'Stability', 'Repeatability']

for i in range(3):
    y_true = targets[:, i]
    y_pred = preds[:, i]

    plt.figure(figsize=(6, 6))
    ax = plt.gca()
    ax.scatter(y_true, y_pred, c='blue', alpha=0.7, s=50)
    ax.plot([min(y_true), max(y_true)], [min(y_true), max(y_true)], 'k--', lw=2)

    r2 = r2_score(y_true, y_pred)
    mae = mean_absolute_error(y_true, y_pred)
    rmse = mean_squared_error(y_true, y_pred, squared=False)

    ax.set_xlabel(f'True {labels[i]}', fontsize=14)
    ax.set_ylabel(f'Predicted {labels[i]}', fontsize=14)
    ax.set_title(f'Parity Plot: {labels[i]}', fontsize=16)
    ax.text(0.05, 0.9, f'$R^2$ = {r2:.3f}\\nMAE = {mae:.3f}\\nRMSE = {rmse:.3f}',
            transform=ax.transAxes, fontsize=12, verticalalignment='top')
    ax.set_aspect('equal', 'box')
    plt.grid(True)
    plt.tight_layout()
    plt.show()


In [None]:
# Construct partitioning

partitioning = NondominatedPartitioning(ref_point=torch.tensor(ref_pt, device=device), Y=train_Y) #only need this for EHVI, built in to NEHVI
# partitioning = NondominatedPartitioning(ref_point=torch.tensor([1.1, 1.1, 1.1], device=device),  # in standardized space
#                                         Y=train_Y)

# Create acquisition function
sampler = SobolQMCNormalSampler(num_samples=128)
acq_func = qNoisyExpectedHypervolumeImprovement(
    model=model,
    ref_point=ref_pt,
    X_baseline=train_X,
    #partitioning=partitioning,
    sampler=sampler,
    prune_baseline=True,
    objective=IdentityMCMultiOutputObjective(outcomes=[0, 1, 2])
)

In [None]:
NUM_RESTARTS = 20
RAW_SAMPLES = 512

MAX_ATTEMPTS = 100
valid_candidates = None
attempts = 0

while attempts < MAX_ATTEMPTS:
    attempts += 1

    #Optimize acquisition function in normalized space
    candidates, _ = optimize_acqf(
        acq_function=acq_func,
        bounds=torch.tensor([[0.0] * 8, [1.0] * 8], device=device),
        q=5,
        num_restarts=20,
        raw_samples=512,
        options={"batch_limit": 5, "maxiter": 200},
    )

    #Convert to original space and snap to valid grid
    X_cont = x_denormalizer(unnormalize(candidates, bounds).cpu().numpy(), var_array)
    X_snapped = get_closest_array(X_cont, var_array)

    #Check CC constraint
    RH_vals = X_snapped[:, 6]   # RH
    Temp_vals = X_snapped[:, 7] # Temp
    cc_mask = check_clausius_clapeyron(RH_vals, Temp_vals)

    if cc_mask.all():
        valid_candidates = X_snapped
        print(f"Found valid candidates after {attempts} attempts.")
        break

print("Final candidate batch:" if valid_candidates is not None else "No valid batch found.")

In [None]:
X_new_norm = torch.tensor(x_normalizer(X_snapped, var_array), dtype=torch.float32, device=device)
model.eval()
with torch.no_grad():
    Y_new_std = torch.cat([gp.posterior(X_new_norm).mean for gp in model.models], dim=-1)
Y_new = Y_scaler.inverse_transform(Y_new_std.cpu().numpy())
df_new_batch = pd.DataFrame(X_snapped, columns=x_labels)
df_new_batch[['PCE', 'Stability', 'Repeatability']] = Y_new


In [None]:
#copied from old emukit code. this will eventually plot target variable vs process condition...

f_obj =  objective_model.model.predict       

device_eff = perov_df['PCE']

fig = plt.figure(figsize = (9, 9), dpi=150)
ax = fig.add_subplot(111)

fs = 20
exp_cond = np.arange(device_eff.size)
#print(exp_cond)
exp_eff = np.transpose(device_eff)

ax.scatter(exp_cond, exp_eff, #facecolor = 'none',
            edgecolor = 'navy', s = 20, alpha = 0.6, label = 'experiment')


all_cond = device_eff

X_sorted = x_normalizer(perov_df.iloc[:, 0:-1].values)
y_pred, y_uncer = f_obj(X_sorted)
y_pred = -y_pred[:,-1]
y_uncer = np.sqrt(y_uncer[:, -1])

ax.scatter(np.arange(len(X_sorted)), y_pred,
                s = 50, facecolors='none', alpha = 0.6, edgecolor = 'red', label = 'predicted')
ax.errorbar(np.arange(len(X_sorted)), y_pred, yerr = y_uncer,  
                 ms = 0, ls = '', capsize = 2, alpha = 0.6, 
                 color = 'red', zorder = 0)


ax.scatter(np.arange(len(X_new))+len(exp_cond), y_pred_new,
                s = 50, facecolors='none', alpha = 0.6, edgecolor = 'darkgreen', label = 'suggested')
ax.errorbar(np.arange(len(X_new))+len(exp_cond), y_pred_new, yerr = y_uncer_new,  
                 ms = 0, ls = '', capsize = 2, alpha = 0.6, 
                 color = 'darkgreen', zorder = 0)


ax.set_ylabel('Current Best Efficiency', fontsize = 20)
ax.set_xlabel('Process Condition', fontsize = 20)

ax.set_ylim(-1, 25)
ax.set_xlim(-1, 48)
ax.set_xticks(np.arange(0,41,10))
ax.legend(fontsize = fs*0.7)

ax.tick_params(direction='in', length=5, width=1, labelsize = fs*.8, grid_alpha = 0.5)
ax.grid(True, linestyle='-.')
plt.subplots_adjust(wspace = 0.4)
plt.legend(fontsize = fs*0.7)
plt.show()
