Generate X_pool

In [None]:
import itertools
import csv

# Concentration levels shared by every ion: 0, 10, ..., 200 (21 levels).
values = list(range(0, 201, 10))

# Header uses the ion names so that the LHS cell can select the pool columns
# by name ('Pb', 'Cd', 'Cu', 'Fe', 'K').
ion_names = ['Pb', 'Cd', 'Cu', 'Fe', 'K']

with open('x_pool.csv', 'w', newline='') as f:
    writer = csv.writer(f)
    writer.writerow(ion_names)
    # Full Cartesian product of the levels: 21^5 = 4,084,101 rows.
    writer.writerows(itertools.product(values, repeat=len(ion_names)))

print('x_pool.csv generated')

Generate 192 initial data points using LHS

In [None]:
import pandas as pd
import numpy as np
from skopt.sampler import Lhs
from skopt.space import Space
from scipy.spatial.distance import cdist
import warnings
warnings.filterwarnings('ignore')

# Number of initial points to draw.
sample_num = 192

# Fixed seed so the initial design can be regenerated exactly.
lhs_seed = 42

# Allowed concentration levels per ion: 0, 10, ..., 200 (21 levels each).
ion_names = ['Pb', 'Cd', 'Cu', 'Fe', 'K']
ion_levels = {ion: np.linspace(0, 200, 21, dtype=int) for ion in ion_names}

# Read the pool from the working directory, i.e. the file the pool-generation
# cell writes, instead of a machine-specific absolute path.
df_pool = pd.read_csv('x_pool.csv')

# The pool may have been written with generic column names (x1..x5). In that
# case label the first five columns with the ion names so the selection below
# does not raise KeyError.
if not set(ion_names).issubset(df_pool.columns):
    df_pool = df_pool.rename(columns=dict(zip(df_pool.columns[:5], ion_names)))

X_pool = df_pool[ion_names].values

print("生成拉丁超立方样本")
# Latin hypercube design in the unit cube; it is mapped to concentrations later.
space = Space([(0.0, 1.0)] * 5)
sampler = Lhs(criterion='maximin')
lhs_points = sampler.generate(space.dimensions, n_samples=sample_num, random_state=lhs_seed)
lhs_array = np.array(lhs_points)

def continuous_to_discrete_point(cont_point, ion_levels):
    """
    Snap a continuous point in [0, 1]^5 to the nearest discrete concentration combination.

    cont_point: sequence of 5 values in [0, 1], ordered Pb, Cd, Cu, Fe, K.
    ion_levels: mapping from ion name to an array of allowed levels; the first
        and last entries are used as the range minimum and maximum.
    Returns: [Pb_val, Cd_val, Cu_val, Fe_val, K_val]
    """
    snapped = []
    for idx, name in enumerate(('Pb', 'Cd', 'Cu', 'Fe', 'K')):
        levels = ion_levels[name]
        lowest, highest = levels[0], levels[-1]
        # Rescale the unit-interval coordinate to the real concentration range.
        target = cont_point[idx] * (highest - lowest) + lowest
        # Pick the allowed level closest to the rescaled value (first one on ties).
        nearest_pos = np.argmin(np.abs(levels - target))
        snapped.append(levels[nearest_pos])
    return snapped

print("映射到离散浓度")
# Snap every LHS point onto the discrete concentration grid.
discrete_candidates = [
    continuous_to_discrete_point(point, ion_levels) for point in lhs_array
]

df_candidates = pd.DataFrame(discrete_candidates, columns=['Pb', 'Cd', 'Cu', 'Fe', 'K'])
print(f"生成 {len(df_candidates)} 个目标离散点")

# Nearest pool row (Euclidean distance) for every target point.
# The distances are computed a few candidates at a time: one cdist call over
# all candidates would allocate a (n_candidates x n_pool) float64 matrix,
# which is several GB for the full pool.
print("计算距离")
candidate_values = df_candidates.values
# Convert once so cdist does not re-copy the pool on every chunk.
pool_values = np.ascontiguousarray(X_pool, dtype=np.float64)
chunk_rows = 8
closest_indices = np.empty(len(candidate_values), dtype=np.intp)
for start in range(0, len(candidate_values), chunk_rows):
    stop = start + chunk_rows
    chunk_dist = cdist(candidate_values[start:stop], pool_values, metric='euclidean')
    # Index of the closest pool row for each candidate in this chunk.
    closest_indices[start:stop] = np.argmin(chunk_dist, axis=1)

# Keep the selected rows in order, dropping duplicate pool indices.
unique_indices = list(dict.fromkeys(closest_indices))
if len(unique_indices) < sample_num:
    # Several LHS points snapped to the same pool row; report it instead of
    # silently returning fewer samples than requested.
    print(f"Warning: only {len(unique_indices)} unique samples found, expected {sample_num}")
df_initial = df_pool.iloc[unique_indices[:sample_num]].copy()  # first sample_num rows

print("\n前5个选中的样本：")
print(df_initial.head())

# Save the result.
df_initial.to_csv('initial_192_samples.csv', index=False)
print("\n已保存到 'initial_192_samples.csv'")

Iteratively generate next 16 samples

In [None]:
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
from tqdm import tqdm
import warnings
warnings.filterwarnings('ignore')

# Prefer the GPU when one is available.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
print(f"Using device: {device}")

# -----------------------------
# 1. Load data
# -----------------------------
# Wet-lab results for the previous 192/208/224 samples (experiments already executed).
# NOTE(review): read with header=None, so the file is presumably header-less — confirm.
train_df = pd.read_csv(r"previous_result.csv", header=None)
feature_block = train_df.iloc[:, 1:6]   # columns 1..5 are used as inputs
target_block = train_df.iloc[:, -3:]    # last three columns are used as targets
X_train_np = feature_block.values.astype(np.float32)
y_train_np = target_block.values.astype(np.float32)

X_train = torch.tensor(X_train_np, device=device)
y_train = torch.tensor(y_train_np, device=device)


def _column_stats(t):
    """Return the per-column mean and standard deviation (plus 1e-8) of a 2-D tensor."""
    return t.mean(dim=0, keepdim=True), t.std(dim=0, keepdim=True) + 1e-8


# The same statistics are reused later to normalise the pool and to
# de-normalise the predictions.
x_mean, x_std = _column_stats(X_train)
y_mean, y_std = _column_stats(y_train)

X_train_norm = (X_train - x_mean) / x_std
y_train_norm = (y_train - y_mean) / y_std

# -----------------------------
# 2. def MLP model
# -----------------------------
class MLP(nn.Module):
    """Fully connected regressor: two ReLU hidden layers followed by a linear output layer."""

    def __init__(self, input_dim=5, output_dim=3, hidden_dim=64):
        """Build the layer stack.

        input_dim: number of input features.
        output_dim: number of regression targets.
        hidden_dim: width of both hidden layers.
        """
        super().__init__()
        widths = [input_dim, hidden_dim, hidden_dim]
        layers = []
        # Hidden part: Linear -> ReLU for each consecutive pair of widths.
        for fan_in, fan_out in zip(widths[:-1], widths[1:]):
            layers += [nn.Linear(fan_in, fan_out), nn.ReLU()]
        # Linear output head, no activation.
        layers.append(nn.Linear(hidden_dim, output_dim))
        self.net = nn.Sequential(*layers)

    def forward(self, x):
        """Apply the layer stack to ``x`` and return the raw outputs."""
        return self.net(x)

# -----------------------------
# 3. model train (Deep Ensemble)
# -----------------------------
def train_model(X, y, epochs=2000, lr=1e-3):
    """Train one MLP on the full batch (X, y) with Adam and an MSE loss.

    Args:
        X: 2-D input tensor, one row per sample.
        y: 2-D target tensor, one row per sample.
        epochs: number of full-batch gradient steps.
        lr: Adam learning rate.

    Returns:
        The trained MLP, placed on the module-level ``device``.
    """
    # Size the network from the data rather than relying on the MLP defaults,
    # so the function keeps working if the number of features or targets changes.
    model = MLP(input_dim=X.shape[1], output_dim=y.shape[1]).to(device)
    optimizer = torch.optim.Adam(model.parameters(), lr=lr)
    loss_fn = nn.MSELoss()

    for _ in range(epochs):
        optimizer.zero_grad()
        loss = loss_fn(model(X), y)
        loss.backward()
        optimizer.step()
    return model

print("Training ensemble models...")
ensemble = []
M = 7  # number of ensemble members
ensemble_seed = 0
for i in range(M):
    print(f"Training model {i+1}/{M}")
    # Fixed but distinct seed per member: each member still gets a different
    # weight initialisation (the source of ensemble disagreement), while the
    # run as a whole can be repeated.
    torch.manual_seed(ensemble_seed + i)
    model = train_model(X_train_norm, y_train_norm, epochs=2500, lr=1e-3)
    # Members are only used for inference from here on.
    model.eval()
    ensemble.append(model)

# -----------------------------
# 4. load and predict x_pool
# -----------------------------
print("Loading x_pool...")

# x_pool.csv may or may not start with a header row (the pool-generation cell
# writes one). Reading a header row as data makes the float32 conversion fail,
# so check the first row: any non-numeric field means it is a header.
pool_path = "x_pool.csv"
first_row = pd.read_csv(pool_path, header=None, nrows=1)
has_header = pd.to_numeric(first_row.iloc[0], errors="coerce").isna().any()
x_pool_df = pd.read_csv(pool_path, header=0 if has_header else None)
x_pool_np = x_pool_df.iloc[:, :5].values.astype(np.float32)
x_pool = torch.tensor(x_pool_np).to(device)

# Normalise the pool with the training-set statistics.
x_pool_norm = (x_pool - x_mean) / x_std

# Predict in batches to avoid running out of memory.
batch_size = 20000
n_samples = x_pool.shape[0]
all_preds = []

with torch.no_grad():
    for model in ensemble:
        preds = []
        for i in tqdm(range(0, n_samples, batch_size), desc="Predicting with model"):
            batch = x_pool_norm[i:i+batch_size]
            out_norm = model(batch)
            out = out_norm * y_std + y_mean  # undo the target standardisation
            preds.append(out.cpu().numpy())
        all_preds.append(np.vstack(preds))

# Stack the members on a new last axis: (n_samples, n_outputs, n_models).
all_preds = np.stack(all_preds, axis=-1)

# Uncertainty = std across ensemble members, summed over the outputs.
# NOTE(review): the sum is taken in de-normalised target units, so targets
# with a larger scale weigh more — confirm this is intended.
uncertainties = np.std(all_preds, axis=-1).sum(axis=1)  # shape: (n_samples,)

# -----------------------------
# 5. choose next 16 samples
# -----------------------------
n_next = 16

# Compositions that already have measurements. Selecting them again would
# spend a wet-lab experiment on a known point, so they are skipped below.
labelled = {tuple(row) for row in X_train_np.tolist()}

# Walk the pool from most to least uncertain and keep the first n_next
# compositions that have not been measured yet.
top_16_idx = []
for idx in np.argsort(uncertainties)[::-1]:
    if tuple(x_pool_np[idx].tolist()) in labelled:
        continue
    top_16_idx.append(idx)
    if len(top_16_idx) == n_next:
        break
top_16_idx = np.array(top_16_idx, dtype=int)
selected_points = x_pool_np[top_16_idx]

feature_names = ['Pb', 'Cd', 'Cu', 'Fe', 'K']
selected_df = pd.DataFrame(selected_points, columns=feature_names)
selected_df.to_csv("next_16_samples_AL_GPU.csv", index=False)
print("✅ Saved next_16_samples_AL_GPU.csv")