Loop over lambda:
  • Find best NN hyperparams (n_hidden, n_layers, dropout, lr) via GroupKFold CV on ALL isotopes
    (λ is fixed)
  • Then do leave-one-out over isotopes: train on N−1, test on the held-out. 
    Save metrics + plots.
  • Write a summary.csv of per-isotope metrics (and best params) in each folder.

  Run the cells below

In [None]:
pip install pandas numpy scikit-learn torch optuna matplotlib


In [None]:


import random, json, math, pathlib
import numpy as np
import pandas as pd
import torch, torch.nn as nn
from torch.utils.data import Dataset, DataLoader, Sampler
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import GroupKFold
import optuna
import matplotlib.pyplot as plt

# ----------------------------- CONFIG ---------------------------------
CSV_PATH   = "merged_8_july_d_lowmassremoved.csv"
DEVICE     = "cuda" if torch.cuda.is_available() else "cpu"
EPS        = 1e-12
SEED       = 42
TRIALS     = 1
N_SPLITS   = 5
MAX_EPOCHS = 200
PATIENCE   = 15

FEATURES = [
    'ERG','Radius','n_rms_radius','p_rms_radius','octopole_deformation',
    'n_chem_erg','Sn','Z','S2p_compound','gamma_deformation','ME',
    'BEA_A_daughter','A','Radius_daughter','BEA_A','AM','Radius_compound',
    'Pairing_daughter','BEA_compound','beta_deformation','Theory_thresh',
    'shell_P','pairing_delta','symmetry_S','excitation_energy',
    'Thresh_n3n','Excite_n3n','Thresh_n4n','Excite_n4n'
]
TARGET, SIGMA_EXP, FLUENCE, GAMMA, ISO = (
    "XS","sigma_exp","fluence","gamma","iso_id"
)

random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)


# ----------------------- DATASET & SAMPLER ----------------------------
class XSData(Dataset):
    def __init__(self, df, scaler:StandardScaler, fit_scaler=False):
        if fit_scaler:
            scaler.fit(df[FEATURES])
        self.X = torch.tensor(scaler.transform(df[FEATURES]).astype(np.float32))
        self.y = torch.tensor(df[TARGET].to_numpy(np.float32))
        self.fluence   = torch.tensor(df[FLUENCE].to_numpy(np.float32))
        self.sigma_exp = torch.tensor(df[SIGMA_EXP].fillna(0).to_numpy(np.float32))
        self.gamma     = torch.tensor(df[GAMMA].to_numpy(np.float32))
        self.iso_code  = df[ISO].astype("category").cat.codes.to_numpy()

    def __len__(self): return len(self.y)
    def __getitem__(self,i):
        return {
            "x": self.X[i],
            "y": self.y[i],
            "fluence": self.fluence[i],
            "sigma_exp": self.sigma_exp[i],
            "gamma": self.gamma[i],
            "iso_code": self.iso_code[i]
        }

class IsoBatchSampler(Sampler):
    """One whole isotope per batch."""
    def __init__(self, iso_codes, shuffle=True):
        self.ids = {}
        for idx, iso in enumerate(iso_codes):
            self.ids.setdefault(int(iso), []).append(idx)
        self.keys = list(self.ids.keys())
        self.shuffle = shuffle

    def __iter__(self):
        keys = self.keys.copy()
        if self.shuffle: random.shuffle(keys)
        for k in keys:
            yield self.ids[k]

    def __len__(self): return len(self.keys)


# ------------------------------ MODEL ---------------------------------
class MLP(nn.Module):
    def __init__(self, n_in, n_hidden, n_layers, p_drop):
        super().__init__()
        layers = []
        for l in range(n_layers):
            layers += [
                nn.Linear(n_in if l==0 else n_hidden, n_hidden),
                nn.ReLU(), nn.Dropout(p_drop)
            ]
        layers += [nn.Linear(n_hidden,1)]
        self.net = nn.Sequential(*layers)

    def forward(self,x): return self.net(x).squeeze(1)


# -------------------------- CUSTOM LOSS -------------------------------
def iso_loss(batch, y_hat, lam):
    mse = nn.functional.mse_loss(y_hat, batch["y"])
    if batch["gamma"][0] == 0:
        return mse
    flu        = batch["fluence"]
    sigma_pred = (flu * y_hat).sum() / (flu.sum() + EPS)
    sigma_true = batch["sigma_exp"][0]
    rel_err_sq = ((sigma_pred - sigma_true)/(sigma_true+EPS))**2
    return mse + lam * rel_err_sq


# ------------------------ TRAIN / VALID LOOP --------------------------
def run_epoch(model, loader, opt, lam, train=True):
    model.train(mode=train)
    total, n = 0.0, 0
    for batch in loader:
        batch = {k:(v.to(DEVICE) if torch.is_tensor(v) else v)
                 for k,v in batch.items()}
        if train: opt.zero_grad(set_to_none=True)
        preds = model(batch["x"])
        loss  = iso_loss(batch, preds, lam)
        if train:
            loss.backward()
            opt.step()
        total += loss.item() * len(preds)
        n += len(preds)
    return total / n


# --------------------- OPTUNA OBJECTIVE -------------------------------
def make_objective(df, lam):
    def objective(trial):
        hp = dict(
            n_hidden = trial.suggest_int("n_hidden", 32,256,32),
            n_layers = trial.suggest_int("n_layers", 1,4),
            dropout  = trial.suggest_float("dropout",0.0,0.5),
            lr       = trial.suggest_loguniform("lr",1e-4,1e-2)
        )
        gkf = GroupKFold(n_splits=N_SPLITS)
        cv_losses = []
        for tr_idx, va_idx in gkf.split(df, groups=df[ISO]):
            scaler = StandardScaler()
            ds_tr = XSData(df.iloc[tr_idx], scaler, fit_scaler=True)
            ds_va = XSData(df.iloc[va_idx], scaler, fit_scaler=False)
            dl_tr = DataLoader(ds_tr, batch_sampler=IsoBatchSampler(ds_tr.iso_code))
            dl_va = DataLoader(ds_va, batch_sampler=IsoBatchSampler(ds_va.iso_code,shuffle=False))
            model = MLP(len(FEATURES), hp["n_hidden"], hp["n_layers"], hp["dropout"]).to(DEVICE)
            opt   = torch.optim.Adam(model.parameters(), lr=hp["lr"])
            best_va = float("inf"); patience=0
            for ep in range(MAX_EPOCHS):
                run_epoch(model, dl_tr, opt, lam, train=True)
                v = run_epoch(model, dl_va, opt, lam, train=False)
                if v < best_va:
                    best_va = v; patience = 0
                else:
                    patience += 1
                    if patience >= PATIENCE:
                        break
            cv_losses.append(best_va)
        return np.mean(cv_losses)
    return objective


# ------------------------------- MAIN ---------------------------------
if __name__=="__main__":
    df_all = pd.read_csv(CSV_PATH)
    iso_list = df_all[ISO].unique()

    for lam in np.round(np.arange(0,1.01,0.1),1):
        # 1) prepare folder
        out = pathlib.Path(f"results_lam{lam:.1f}")
        out.mkdir(parents=True, exist_ok=True)

        # 2) hyperparam search on ALL isotopes
        study = optuna.create_study(direction="minimize",
                                   study_name=f"lam{lam:.1f}",
                                   storage=f"sqlite:///{out/'study.db'}",
                                   load_if_exists=True)
        study.optimize(make_objective(df_all, lam),
                       n_trials=TRIALS, show_progress_bar=True)
        best = study.best_trial.params
        # save best params
        with open(out/"best_params.json","w") as fh:
            json.dump({"lam":lam,**best}, fh, indent=2)

        # 3) leave-one-out test
        records = []
        for iso in iso_list:
            df_tr = df_all[df_all[ISO]!=iso].reset_index(drop=True)
            df_te = df_all[df_all[ISO]==iso].reset_index(drop=True)

            # train on df_tr
            scaler = StandardScaler()
            ds_tr = XSData(df_tr, scaler, fit_scaler=True)
            dl_tr = DataLoader(ds_tr, batch_sampler=IsoBatchSampler(ds_tr.iso_code))
            model = MLP(len(FEATURES),
                        best["n_hidden"], best["n_layers"], best["dropout"]).to(DEVICE)
            opt   = torch.optim.Adam(model.parameters(), lr=best["lr"])
            for ep in range(MAX_EPOCHS):
                _ = run_epoch(model, dl_tr, opt, lam, train=True)

            # eval on df_te
            ds_te = XSData(df_te, scaler, fit_scaler=False)
            dl_te = DataLoader(ds_te, batch_sampler=IsoBatchSampler(ds_te.iso_code,shuffle=False))
            model.eval()
            preds, flu, sigma_true = [], None, None
            with torch.no_grad():
                for b in dl_te:
                    b = {k:(v.to(DEVICE) if torch.is_tensor(v) else v) 
                         for k,v in b.items()}
                    outp = model(b["x"]).cpu()
                    preds.append(outp)
                    flu = b["fluence"].cpu()
                    sigma_true = float(b["sigma_exp"][0].cpu())
            preds = torch.cat(preds)
            mse = float(nn.functional.mse_loss(preds, ds_te.y).cpu())
            sigma_pred = float((flu*preds).sum() / (flu.sum()+EPS))
            rel_err = (sigma_pred - sigma_true)/(sigma_true+EPS)

            # save per-iso plots
            # 3a) scatter True vs Pred
            plt.figure(figsize=(4,4))
            plt.scatter(ds_te.y, preds, s=10)
            mn, mx = min(ds_te.y.min(), preds.min()), max(ds_te.y.max(), preds.max())
            plt.plot([mn,mx],[mn,mx],"--",lw=1)
            plt.xlabel("True XS"); plt.ylabel("Pred XS")
            plt.title(f"{iso} XS scatter")
            plt.tight_layout()
            plt.savefig(out/f"{iso}_scatter.png")
            plt.close()

            # 3b) ERG vs curves
            plt.figure()
            plt.plot(df_te["ERG"], df_te["XS"],    label="True")
            plt.plot(df_te["ERG"], preds.numpy(),  label="Pred")
            plt.xlabel("ERG"); plt.ylabel("XS")
            plt.title(f"{iso} ERG vs XS")
            plt.legend()
            plt.tight_layout()
            plt.savefig(out/f"{iso}_erg_xs.png")
            plt.close()

            # record metrics
            rec = {
                "iso_id": iso,
                "mse": mse,
                "sigma1g_pred": sigma_pred,
                "sigma1g_true": sigma_true,
                "rel_err": rel_err,
                **best, "lam": lam
            }
            records.append(rec)

        # 4) summary CSV
        pd.DataFrame(records).to_csv(out/"summary.csv", index=False)
        print(f"Done λ={lam:.1f} → folder: {out}")
