In [1]:
import os
import numpy as np
from numpy.linalg import lstsq

# Atom count per residue, keyed by three-letter residue name.
# fit_and_predict_U looks up the residue here and passes the value as `n_atoms`
# to _read_feature_from_ef, which reads that many leading data rows from each
# .ef file; an unknown residue name is rejected with a KeyError.
# NOTE(review): the counts look like capped-residue totals including hydrogens
# (ACE/NME entries are present) -- TODO confirm against the structures used to
# generate the .ef files.
AMINO_ACID_ATOM_COUNTS = {
    "ALA": 12, "ARG": 26, "ASN": 16, "ASP": 14, "CYS": 13, "GLN": 19, "GLU": 17,
    "GLY": 9, "HID": 19, "ILE": 21, "LEU": 21, "LYS": 24, "MET": 19, "PHE": 22,
    "PRO": 16, "SER": 13, "THR": 16, "TRP": 26, "TYR": 23, "VAL": 18, "ACE": 7, "NME": 7,
}

def _read_feature_from_ef(ef_file: str, n_atoms: int, aggregate: bool = True):
    """
    Read features from a single .ef file.
    If aggregate=True, return 5 aggregated features (summed over n_atoms):
      [sum(E_paral^2), sum(E_verti^2), sum(E_paral*E_verti), sum(|E_paral|), sum(|E_verti|)]
    If aggregate=False, return 5*n_atoms features concatenated per atom (same as the original implementation).
    """
    ef_data = np.loadtxt(ef_file, skiprows=1)
    ef_data = np.atleast_2d(ef_data)

    if ef_data.shape[0] < n_atoms:
        raise ValueError(f"Not enough lines in EF file: expected {n_atoms}, got {ef_data.shape[0]} -> {ef_file}")

    # Column indices: 11 -> |E_parallel|, 12 -> |E_vertical|
    E_paral = ef_data[:n_atoms, 11]
    E_verti = ef_data[:n_atoms, 12]

    paral_sq = E_paral**2
    verti_sq = E_verti**2
    prod = E_paral * E_verti
    paral_abs = np.abs(E_paral)
    verti_abs = np.abs(E_verti)

    if aggregate:
        # Aggregate into 5 molecule-level features (sum by default; switch to mean if preferred)
        return np.array([
            paral_sq.sum(),
            verti_sq.sum(),
            prod.sum(),
            paral_abs.sum(),
            verti_abs.sum(),
        ])
    else:
        # Original per-atom concatenation
        return np.concatenate([paral_sq, verti_sq, prod, paral_abs, verti_abs])

def fit_and_predict_U(work_dir: str = "example", residue_name: str = "ALA", n_train: int = 1000):
    """
    Fit five polarizability parameters by linear least squares and write predictions.

    Inputs are read from ``<work_dir>/<residue_name>/``:
      * ``raw_realU`` -- text file; each usable line is ``name real_U [...]``
        (lines with fewer than two whitespace-separated fields are ignored).
      * ``ef/<name>.ef`` -- one electric-field file per sample.

    The first `n_train` entries of ``raw_realU`` (``data[:n_train]``) form the
    training set. For each, 5 aggregated features are computed and the model
    ``real_U ~= feats5 @ alpha`` (no intercept term) is solved with
    ``numpy.linalg.lstsq``. The fitted `alpha` is then applied to every sample.

    Outputs are written to the same directory:
      * ``fit_alpha`` -- the 5 fitted parameters on one row, with a header.
      * ``raw_realU_fitU`` -- one row per usable sample:
        name, real_U, fit_U, 5 features, 5 alphas.

    Samples whose EF file is missing or cannot be parsed are skipped with a
    printed warning. Each EF file is read only once: features computed for the
    training samples are reused when writing predictions.

    Parameters
    ----------
    work_dir : str
        Root directory containing one sub-directory per residue.
    residue_name : str
        Key into AMINO_ACID_ATOM_COUNTS; also the sub-directory name.
    n_train : int
        Number of leading samples used for fitting.

    Returns
    -------
    numpy.ndarray
        The fitted parameter vector `alpha` (length 5).

    Raises
    ------
    KeyError
        If `residue_name` is not in AMINO_ACID_ATOM_COUNTS.
    RuntimeError
        If ``raw_realU`` yields no entries, or no training sample is usable.
    ValueError
        If the second field of a ``raw_realU`` line is not a number.
    """
    if residue_name not in AMINO_ACID_ATOM_COUNTS:
        raise KeyError(f"Residue {residue_name} not found in AMINO_ACID_ATOM_COUNTS")
    n_atoms = AMINO_ACID_ATOM_COUNTS[residue_name]

    residue_dir = os.path.join(work_dir, residue_name)
    ef_dir = os.path.join(residue_dir, "ef")
    raw_realU_file = os.path.join(residue_dir, "raw_realU")
    output_alpha_file = os.path.join(residue_dir, "fit_alpha")
    output_combined_file = os.path.join(residue_dir, "raw_realU_fitU")

    def _load_features(name):
        """Return the 5 aggregated features for sample `name`, or None after printing a warning."""
        ef_file = os.path.join(ef_dir, name + ".ef")
        if not os.path.isfile(ef_file):
            print(f"⚠️ Missing EF file, skipping: {ef_file}")
            return None
        try:
            return _read_feature_from_ef(ef_file, n_atoms, aggregate=True)  # 5D
        except Exception as e:
            # Deliberate best-effort: one bad file must not abort the whole run.
            print(f"⚠️ Failed to read EF file, skipping: {ef_file}, error: {e}")
            return None

    # Read name and reference U
    data = []
    with open(raw_realU_file, "r") as f:
        for line in f:
            parts = line.split()  # blank lines give [] and are skipped below
            if len(parts) < 2:
                continue
            data.append((parts[0], float(parts[1])))

    if not data:
        raise RuntimeError("Failed to read valid entries from raw_realU")

    train_data = data[:n_train]
    # Report the real training-set size; it is smaller than n_train when fewer samples exist.
    print(f"Read {len(data)} samples for {residue_name}; first {len(train_data)} used for fitting")

    # ---------------- Fitting stage (5D features) ----------------
    # Cache keyed by position in `data` (train_data is a prefix of it) so the
    # prediction stage does not re-read files or repeat warnings.
    feature_cache = {}
    A_rows, U_rows = [], []
    for idx, (name, real_U) in enumerate(train_data):
        feats5 = _load_features(name)
        feature_cache[idx] = feats5
        if feats5 is None:
            continue
        A_rows.append(feats5)
        U_rows.append(real_U)

    if not A_rows:
        raise RuntimeError("No usable training samples; cannot perform fitting")

    A_all = np.array(A_rows)
    U_all = np.array(U_rows)

    print("Starting least-squares fitting...")
    alpha, _, rank, _ = lstsq(A_all, U_all, rcond=None)  # alpha is a length-5 vector

    # A rank-deficient system (too few or linearly dependent samples) still
    # returns a minimum-norm solution, but the parameters are not uniquely determined.
    if rank < A_all.shape[1]:
        print(
            f"⚠️ Design matrix is rank-deficient (rank {rank} < {A_all.shape[1]}, "
            f"{A_all.shape[0]} training samples); fitted parameters are not unique"
        )

    # Save alpha
    np.savetxt(
        output_alpha_file, alpha.reshape(1, -1),
        header="alpha_paral_sq alpha_verti_sq alpha_prod alpha_paral_abs alpha_verti_abs"
    )
    print(f"✅ Polarizability parameters saved to: {output_alpha_file}")

    # ---------------- Prediction and merged output ----------------
    header_cols = [
        "name", "real_U", "fit_U",
        "feat_paral_sq", "feat_verti_sq", "feat_prod", "feat_paral_abs", "feat_verti_abs",
        "alpha_paral_sq", "alpha_verti_sq", "alpha_prod", "alpha_paral_abs", "alpha_verti_abs"
    ]
    # alpha is identical on every row, so format it once.
    alpha_text = " ".join([f"{a:.6f}" for a in alpha.tolist()])

    with open(output_combined_file, "w") as f:
        f.write("# " + " ".join(header_cols) + "\n")

        for idx, (name, real_U) in enumerate(data):
            if idx in feature_cache:
                feats5 = feature_cache[idx]  # already loaded (or already warned about)
            else:
                feats5 = _load_features(name)
            if feats5 is None:
                continue

            U_fit = float(feats5 @ alpha)

            # Per-line output: name, real_U, fit_U, 5 features, 5 alpha parameters
            f.write(
                f"{name} {real_U:.6f} {U_fit:.6f} "
                + " ".join([f"{v:.6f}" for v in feats5.tolist()])
                + " "
                + alpha_text
                + "\n"
            )

    print(f"✅ All results written to: {output_combined_file}")
    return alpha


In [None]:
# Root directory holding one sub-directory per residue; fit_and_predict_U reads
# `<work_dir>/<residue>/raw_realU` and `<work_dir>/<residue>/ef/*.ef` and writes
# `fit_alpha` and `raw_realU_fitU` next to them.
# NOTE(review): this is an absolute, machine-specific path -- consider moving it
# to a configuration cell or reading it from an environment variable.
work_dir = "/mnt/xyz_folder"
# Run the fit for each listed residue, using at most the first 1000 samples for fitting.
for amino in ["ALA"]:
    fit_and_predict_U(work_dir, residue_name=amino, n_train=1000)