Packages

In [None]:
import pandas as pd
import torch
import numpy as np
from sklearn.model_selection import KFold
from sklearn.preprocessing import MinMaxScaler, StandardScaler 
from sklearn.metrics import r2_score, mean_absolute_error
from gpytorch.mlls import ExactMarginalLogLikelihood
import gpytorch
import altair as alt
import matplotlib.pyplot as plt
import re
from scipy.stats import norm
import os
from sklearn.cluster import KMeans
from torch.distributions import Normal

Utility

In [None]:
def is_bin_column(col) -> bool:
    """Tell whether a column label identifies a bin column.

    A label qualifies when it is an integer (Python ``int`` or NumPy integer),
    or when its string form consists of digits only (``'0'``, ``'1'``, ...) or
    has the shape ``bin_<digits>`` (``'bin_0'``, ``'bin_1'``, ...).
    """
    if isinstance(col, (int, np.integer)):
        return True

    name = str(col)
    return name.isdigit() or re.fullmatch(r"bin_\d+", name) is not None

GP Model

In [None]:
class ExactGPModel(gpytorch.models.ExactGP):
    """Exact GP with a constant mean and a scaled RBF kernel."""

    def __init__(self, train_x, train_y, likelihood):
        super().__init__(train_x, train_y, likelihood)
        rbf = gpytorch.kernels.RBFKernel()
        self.mean_module = gpytorch.means.ConstantMean()
        self.covar_module = gpytorch.kernels.ScaleKernel(rbf)

    def forward(self, x):
        """Return the multivariate normal built from the mean and kernel at ``x``."""
        return gpytorch.distributions.MultivariateNormal(
            self.mean_module(x),
            self.covar_module(x),
        )

def train_gp(xt_train, yt_train, training_iterations=100, lr=0.2):
    """Fit an ``ExactGPModel`` by minimising the negative marginal log likelihood.

    Args:
        xt_train: Training inputs handed to the model.
        yt_train: Training targets handed to the model.
        training_iterations: Number of Adam steps to run.
        lr: Adam learning rate. Defaults to 0.2, the value that used to be
            hard-coded here, so existing callers behave as before.

    Returns:
        A tuple ``(model, likelihood, losses)``. Model and likelihood are
        returned in eval mode; ``losses`` holds one float per iteration,
        recorded before the optimizer step of that iteration.
    """
    # NOTE(review): the model keeps its default parameter dtype regardless of
    # the dtype of ``xt_train``/``yt_train`` - confirm that callers passing
    # float64 tensors get the precision they expect.
    likelihood = gpytorch.likelihoods.GaussianLikelihood()
    model = ExactGPModel(xt_train, yt_train, likelihood)

    model.train()
    likelihood.train()

    optimizer = torch.optim.Adam(model.parameters(), lr=lr)
    mll = ExactMarginalLogLikelihood(likelihood, model)

    losses = []

    for _ in range(training_iterations):
        optimizer.zero_grad()
        output = model(xt_train)
        loss = -mll(output, yt_train)
        losses.append(loss.item())
        loss.backward()
        optimizer.step()

    return model.eval(), likelihood.eval(), losses

Data & Preprocessing

In [None]:
# Load the two input tables.
dft_data = pd.read_csv("/Users/danielbock/MASTERTHESIS/MASTA/DataArchiv/dft_data_temp_pressure_swingswingswing.csv")
Vext_data = pd.read_csv("/Users/danielbock/MASTERTHESIS/MASTA/DataArchiv/Vext_allTEMP_hist_no_pressure_no_chem_20b_swing.csv")

# Inner join: keep only (structure, temperature) pairs present in both tables.
data = pd.merge(dft_data, Vext_data, how="inner", on=["structure_name", "temperature_kelvin"])
feature_columns = [col for col in data.columns if is_bin_column(col)]

# Restrict to positive loadings at 298 K and 0.1 bar. The explicit copy makes
# the result an independent frame, so the column assignments performed later
# in the notebook do not raise pandas' SettingWithCopyWarning.
data = data[data.beladung_mol_per_kg > 0]
data = data[(data.temperature_kelvin == 298) & (data.pressure_bar == 0.1)].copy()

Feature & Label customization

In [None]:
# Label: `beladung_atoms` divided by `volume_kubAng`.
# (An alternative division by `density_bulk` is currently disabled.)
data["beladung_pro_vol"] = data["beladung_atoms"] / data["volume_kubAng"]

# Features: every bin column is multiplied row-wise by `grid.dv` and then
# divided row-wise by `volume_kubAng`.
data[feature_columns] = (
    data[feature_columns]
    .mul(data["grid.dv"], axis=0)
    .div(data["volume_kubAng"], axis=0)
)

Normalize

In [None]:
# Switches read by the cross-validation cell: whether to scale the features
# and/or the labels before the GP is trained.
normalize_feature, normalize_labels = True, True

Fold & Training & Prediction

In [None]:
# 10-fold cross-validation: train a GP on each training split, predict the
# held-out split, and collect the per-row predictions in `results`.
kf = KFold(n_splits=10, shuffle=True, random_state=42)

label = "beladung_pro_vol"
X = data[feature_columns].values
y = data[label].values

ids = data.index.values

split_info = []

for fold, (train_idx, test_idx) in enumerate(kf.split(X), start=1):
    x_train = torch.tensor(X[train_idx], dtype=torch.float64)
    y_train = torch.tensor(y[train_idx], dtype=torch.float64)
    x_test = torch.tensor(X[test_idx], dtype=torch.float64)
    y_test = torch.tensor(y[test_idx], dtype=torch.float64)

    train_ids = ids[train_idx]
    test_ids = ids[test_idx]

    test_df = data.iloc[test_idx].copy()
    test_df["fold"] = fold

    # Scalers are fit on the training split only and then applied to both
    # splits, so the held-out rows do not influence the scaling.
    if normalize_feature:
        feature_transformer = MinMaxScaler()
        feature_transformer.fit(x_train)
        xt_train = torch.tensor(feature_transformer.transform(x_train), dtype=torch.float64)
        xt_test = torch.tensor(feature_transformer.transform(x_test), dtype=torch.float64)
    else:
        xt_train = x_train
        xt_test = x_test

    if normalize_labels:
        label_transformer = MinMaxScaler()  # alternatively StandardScaler()
        label_transformer.fit(y_train.unsqueeze(1))
        yt_train = torch.tensor(label_transformer.transform(y_train.unsqueeze(1)).flatten(), dtype=torch.float64)
        yt_test = torch.tensor(label_transformer.transform(y_test.unsqueeze(1)).flatten(), dtype=torch.float64)
    else:
        yt_train = y_train
        yt_test = y_test

    # Training
    model, likelihood, losses = train_gp(xt_train, yt_train, training_iterations=200)

    # Prediction
    with torch.no_grad():
        prediction = model(xt_test)
        inverse_transformed_prediction = prediction.mean.detach().cpu().numpy()
        # Only undo the label scaling when it was applied for this fold;
        # otherwise `label_transformer` is undefined (or stale from another
        # cell) and the prediction is already in label units.
        if normalize_labels:
            inverse_transformed_prediction = label_transformer.inverse_transform(
                inverse_transformed_prediction.reshape(-1, 1)
            ).ravel()
        # Negative predictions are replaced by zero.
        inverse_transformed_prediction = np.where(
            inverse_transformed_prediction > 0, inverse_transformed_prediction, 0
        )

    test_df[f"{label}_pred"] = inverse_transformed_prediction
    test_df["abs_rel_deviation"] = np.abs(
        (test_df[label] - test_df[f"{label}_pred"]) / test_df[label] * 100
    )
    # `losses` is overwritten on every fold, so keep this fold's final
    # training loss with its rows.
    test_df["final_loss"] = losses[-1]

    split_info.append(test_df)

results = pd.concat(split_info, ignore_index=True)
r2 = r2_score(results[label], results[f"{label}_pred"])
mae = mean_absolute_error(results[label], results[f"{label}_pred"])
median_ape = results["abs_rel_deviation"].median()

# The overall metrics are broadcast to every row of `results`.
results["R2"] = r2
results["MAE"] = mae
results["Median_APE_percent"] = median_ape

Results

In [None]:
# Overall metrics over the concatenated held-out predictions.
cv_true = results[label]
cv_pred = results[f"{label}_pred"]
cv_ape = results["abs_rel_deviation"]

print(f"R²                        : {r2_score(cv_true, cv_pred):.4f}")
print(f"MAE                       : {mean_absolute_error(cv_true, cv_pred):.4f}")
print(f"Median APE                : {cv_ape.median():.2f}%")
print(f"Mean APE                  : {cv_ape.mean():.2f}%")
# `losses` is reassigned on every fold of the cross-validation loop, so this
# is the final loss of the most recent training run only.
print(f"Final Loss                : {losses[-1]:.4f}")

count = (cv_ape > 20).sum()
print(f"Abs rel dev > 20%         : {count} out of {len(results)}")
print(f"Max abs rel dev           : {cv_ape.max():.2f}%")

Result by Fold

In [None]:
# Per-fold metrics over the held-out predictions.
for fold, group in results.groupby("fold"):
    fold_ape = group["abs_rel_deviation"]

    print(f"\nFold {fold}")
    print(f"R²           : {r2_score(group[label], group[f'{label}_pred']):.4f}")
    print(f"Median APE   : {fold_ape.median():.2f}%")
    print(f"Mean APE     : {fold_ape.mean():.2f}%")
    print(f"Max ARD      : {fold_ape.max():.2f}%")
    # The global `losses` list only holds the most recent training run, so it
    # is the same number for every fold. Use a per-fold `final_loss` column
    # when `results` has one; otherwise print the global value and say so.
    if "final_loss" in group.columns:
        print(f"Final Loss   : {group['final_loss'].iloc[0]:.4f}")
    else:
        print(f"Final Loss   : {losses[-1]:.4f} (last training run, not fold-specific)")

Acquisition Function

In [None]:
def log_expected_improvement(mean, var, best_f, xi=0.0):
    """Return the floored log expected improvement (maximisation) per point.

    Args:
        mean: Tensor of predictive means.
        var: Tensor of predictive variances; negative entries are treated as 0.
        best_f: Best value observed so far (scalar or tensor broadcastable
            against ``mean``).
        xi: Margin subtracted from the improvement ``mean - best_f``.

    Returns:
        Tensor ``log(max(EI, 1e-10))`` with the shape of ``mean``.
    """
    # Slightly negative variances (numerical noise) would give NaN under sqrt.
    std = torch.sqrt(torch.clamp(var, min=0.0))
    std_safe = torch.clamp(std, min=1e-9)
    z = (mean - best_f - xi) / std_safe

    normal = Normal(torch.zeros_like(z), torch.ones_like(z))
    cdf = normal.cdf(z)
    pdf = torch.exp(normal.log_prob(z))

    # Scale with the same clamped std that defines z. With the raw std a
    # zero-variance point whose mean exceeds best_f would get EI = 0 instead
    # of its improvement mean - best_f - xi.
    ei = std_safe * (z * cdf + pdf)
    # Floor EI before the log so that underflowed values stay finite.
    ei_safe = torch.clamp(ei, min=1e-10)
    return torch.log(ei_safe)

def ucb(mean, var, beta=2.0):
    """Upper confidence bound: ``mean + beta * sqrt(var)``."""
    exploration_bonus = beta * torch.sqrt(var)
    return mean + exploration_bonus

In [None]:
def initial_indices_kmeans(df, feature_columns, n_initial, random_state=0):
    """Choose ``n_initial`` spread-out rows of ``df`` with KMeans.

    The rows are clustered into ``n_initial`` groups on ``feature_columns``
    and, per cluster, the member closest to the centroid is taken. If a
    cluster is empty, the shortfall is filled with rows drawn at random
    (seeded by ``random_state``) from the rows not yet picked.

    Returns the index labels of the chosen rows; when ``df`` has at most
    ``n_initial`` rows, its whole index is returned.
    """
    features = df[feature_columns].values
    if len(df) <= n_initial:
        return df.index

    kmeans = KMeans(n_clusters=n_initial, n_init=1, random_state=random_state)
    assignments = kmeans.fit_predict(features)

    picked = []
    for cluster_id, centroid in enumerate(kmeans.cluster_centers_):
        member_pos = np.flatnonzero(assignments == cluster_id)
        if member_pos.size == 0:
            continue

        distances = np.linalg.norm(features[member_pos] - centroid, axis=1)
        picked.append(df.index[member_pos[np.argmin(distances)]])

    shortfall = n_initial - len(picked)
    if shortfall > 0:
        remaining = df.index.difference(picked)
        rng = np.random.default_rng(random_state)
        picked.extend(list(rng.choice(remaining, size=shortfall, replace=False)))
    return pd.Index(picked)

In [None]:
# Sequential selection loop: start from a few rows, then repeatedly fit a GP
# on the rows selected so far and move the candidate with the highest
# acquisition score from `candidates` to `selected`.
candidates = data.copy()
n_candidates = int(len(candidates))

max_iter = 100
patience = 10

init = "kmeans"

acq = "ucb"

xi_mode = "anneal"
xi0 = 0.05
xi_min = 0.005

beta_mode = "anneal"
beta0 = 3.0
beta_min = 1.0

# Reject an unknown acquisition name before any GP is trained.
if acq not in ("ei", "ucb"):
    raise ValueError("acq must be 'ei' or 'ucb'")

n_initial = max(3, min(10, n_candidates - 1))

feature_transformer = MinMaxScaler()
label_transformer = MinMaxScaler()

# NOTE(review): both scalers are fit on the whole pool, i.e. the label scaler
# also sees labels of rows that have not been selected yet - confirm this is
# acceptable for the experiment.
feature_transformer.fit(candidates[feature_columns].values)
label_transformer.fit(candidates[[label]].values)

if init == "kmeans":
    initial_indices = initial_indices_kmeans(candidates, feature_columns, n_initial, random_state=0)
else:
    initial_indices = candidates.sample(n=n_initial, replace=False, random_state=0).index

print("Initial selections:")
for idx in initial_indices:
    print(f"  Index={idx}, Structure={candidates.loc[idx].structure_name}, {label}={candidates.loc[idx][label]:.2e}")

selected = candidates.loc[initial_indices].copy()
candidates = candidates.drop(initial_indices).copy()

# best[k] is the highest label among the selected rows after k picks.
best = [float(selected[label].max())]

for i in range(max_iter):

    # Stop once the last `patience` entries of `best` are all equal.
    if len(best) >= patience and len(np.unique(np.round(best[-patience:], 12))) == 1:
        print(f"Early stopping at iteration {i} (no improvement in last {patience}).")
        break

    # With nothing left to score, the scaler and the GP would be called on
    # zero rows; stop cleanly instead.
    if candidates.empty:
        print(f"Stopping at iteration {i}: no candidates left.")
        break

    train_x = torch.tensor(
        feature_transformer.transform(selected[feature_columns].values),
        dtype=torch.float32
    )
    train_y = torch.tensor(
        label_transformer.transform(selected[[label]].values),
        dtype=torch.float32
    ).flatten()

    test_x = torch.tensor(
        feature_transformer.transform(candidates[feature_columns].values),
        dtype=torch.float32
    )

    model, likelihood, _ = train_gp(train_x, train_y, 250)

    with torch.no_grad():
        pred = model(test_x)
        mean, var = pred.mean, pred.variance

    best_f = train_y.max()

    if acq == "ei":
        # Annealed margin: decays by 5% per iteration down to xi_min.
        if xi_mode == "anneal":
            xi = max(xi_min, xi0 * (0.95 ** i))
        else:
            xi = xi0

        score = log_expected_improvement(mean, var, best_f, xi=xi)
    else:
        # acq == "ucb" (validated above). Annealed exploration weight:
        # decays by 3% per iteration down to beta_min.
        if beta_mode == "anneal":
            beta = max(beta_min, beta0 * (0.97 ** i))
        else:
            beta = beta0

        score = ucb(mean, var, beta=beta)

    # Position (not index label) of the best-scoring candidate.
    pick = torch.argmax(score).item()

    current_best = float(selected[label].max())
    picked_row = candidates.iloc[pick]
    picked_true = float(picked_row[label])
    picked_name = picked_row["structure_name"]

    print(
        f"Iter {i:03d} | best={current_best:.3e} | pick={picked_name} | true={picked_true:.3e} | "
        f"acq={acq} " +
        (f"(xi={xi:.3g})" if acq == "ei" else f"(beta={beta:.3g})")
    )

    selected = pd.concat([selected, candidates.iloc[[pick]]])
    candidates = candidates.drop(candidates.index[pick])

    best.append(float(selected[label].max()))

print(f"Best value after {len(best)-1} iterations: {best[-1]:.6g}")