In [None]:
!pip install -U optuna
#!pip install -U pfp-api-client
#!pip install pfcc_extras
!pip install -U pfp-api-client matlantis-features
!pip install pfcc-extras-v0.11.1.zip
!pip install pfcc-ase-extras-v0.3.0.zip
#In addition, please install `pfcc_extras`.

In [None]:
import os
import numpy as np
import pandas as pd
from ase import Atoms
from ase.io import read, write
from ase.md.npt import NPT
from ase.md.velocitydistribution import MaxwellBoltzmannDistribution, Stationary
from ase.md import MDLogger
from ase.units import fs, GPa
from ase.geometry import set_distance, set_angle, set_dihedral
from ase.constraints import FixAtoms
from time import perf_counter
import optuna


In [None]:
# -----------------------------
# 计算器：这里你换成 VASP/CP2K/Matlantis
# -----------------------------
from pfp_api_client.pfp.estimator import Estimator, EstimatorCalcMode
from pfp_api_client.pfp.calculators.ase_calculator import ASECalculator

# estimator = Estimator(calc_mode=EstimatorCalcMode.CRYSTAL, model_version="latest")
estimator = Estimator(calc_mode=EstimatorCalcMode.CRYSTAL_U0, model_version="v3.0.0")
calculator = ASECalculator(estimator)

In [None]:

for fname in input_files:
    # 根据输入文件名创建专属输出目录
    base = os.path.splitext(os.path.basename(fname))[0]
    structure_dir = os.path.join("output", base)
    os.makedirs(structure_dir, exist_ok=True)

    # 输出文件路径
    output_filename = f"{base}_NVT_300K"
    log_filename    = os.path.join(structure_dir, output_filename + ".log")
    traj_filename   = os.path.join(structure_dir, output_filename + ".traj")
    com_output_file = os.path.join(structure_dir, "com.data")

    # 读入结构并初始化
    atoms = read(fname)
    atoms.pbc = True
    atoms.calc = calculator  # 请确保之前定义了 calculator 对象
    print(f"\n=== Running MD for {fname}, outputs in {structure_dir} ===")
    print("atoms =", atoms)

    # 随机速度 & 零总动量
    MaxwellBoltzmannDistribution(atoms, temperature_K=temperature, force_temp=True)
    Stationary(atoms)

    # 打印函数
    def print_dyn(dyn, atoms):
        imd        = dyn.get_number_of_steps()
        etot       = atoms.get_total_energy()
        temp_K     = atoms.get_temperature()
        stress     = atoms.get_stress(include_ideal_gas=True) / units.GPa
        stress_ave = stress[:3].mean()
        elapsed    = perf_counter() - start_time
        print(f"{imd:6d}  {etot:8.3f}  {temp_K:7.2f}  {stress_ave:6.2f}  "
              f"{stress[0]:6.2f} {stress[1]:6.2f} {stress[2]:6.2f}   {elapsed:7.3f}")

    # 写 POSCAR
    def write_poscar(dyn, atoms):
        step = dyn.get_number_of_steps()
        filename = os.path.join(structure_dir, f"POSCAR_{step:06d}.vasp")
        write(filename, atoms, format="vasp")

    # 记录吸附物质心
    def get_adsorbate_com(atoms, z_thr):
        pos  = atoms.get_positions()
        mask = pos[:, 2] > z_thr
        if not mask.any():
            raise ValueError(f"No atoms with z > {z_thr}")
        return atoms[mask].get_center_of_mass()

    def print_com(atoms=atoms):
        com_ads = get_adsorbate_com(atoms, z_threshold)
        print(f"Adsorbate COM: {com_ads}")
        with open(com_output_file, "a") as f:
            f.write(f"{com_ads[0]:.6f} {com_ads[1]:.6f} {com_ads[2]:.6f}\n")

    # --- Warm-up: NVT Berendsen ---
    dyn1 = NVTBerendsen(atoms,
                        timestep   = time_step * units.fs,
                        temperature_K = temperature,
                        taut       = tau_berendsen * units.fs,
                        trajectory = traj_filename,
                        loginterval= num_interval)
    dyn1.attach(lambda: print_dyn(dyn1, atoms), interval=num_interval)
    dyn1.attach(MDLogger(dyn1, atoms, log_filename, header=True, stress=True,
                         peratom=True, mode="w"), interval=num_interval)
    dyn1.attach(print_com, interval=500)
    dyn1.attach(lambda: write_poscar(dyn1, atoms), interval=save_interval)

    print("\n--- Warmup phase (Berendsen) ---")
    start_time = perf_counter()
    print(" step   Etot(eV)    T(K)   stress(mean,xx,yy)(GPa)  elapsed(s)")
    dyn1.run(warmup_steps)

    # --- Production: NPT Nose–Hoover ---
    dyn2 = NPT(atoms,
               timestep       = time_step * units.fs,
               temperature_K  = temperature,
               externalstress = external_stress * units.GPa,
               ttime          = ttime * units.fs,
               pfactor        = None,
               trajectory     = traj_filename,
               loginterval    = num_interval)
    dyn2.attach(lambda: print_dyn(dyn2, atoms), interval=num_interval)
    dyn2.attach(MDLogger(dyn2, atoms, log_filename, header=False, stress=True,
                         peratom=True, mode="a"), interval=num_interval)
    dyn2.attach(print_com, interval=500)
    dyn2.attach(lambda: write_poscar(dyn2, atoms), interval=save_interval)

    print("\n--- Production phase (Nose–Hoover) ---")
    start_time = perf_counter()
    print(" step   Etot(eV)    T(K)   stress(mean,xx,yy)(GPa)  elapsed(s)")
    dyn2.run(prod_steps)

    # 最终结构保存为 CIF
    write(os.path.join(structure_dir, "final.cif"), atoms)
    print(f"Finished {fname}, results in {structure_dir}")


In [None]:
# -----------------------------
# 参数
# -----------------------------
temperature = 298.15  # K
time_step = 1.0       # fs
prod_steps = 300000   # ~300 ps
save_interval = 1000  # 每1000步保存一次
surface_file = "surface.cif"
mol_file = "2-ketone.cif"

# -----------------------------
# 阶段1: MD采样
# -----------------------------
def run_md_sampling():
    slab = read(surface_file)
    mol = read(mol_file)
    slab.calc = calculator
    mol.calc = calculator

    # 吸附：直接放在中心，初始高度2 Å
    from ase.build import add_adsorbate
    xy_pos = np.mean(slab.get_positions()[:, :2], axis=0)
    add_adsorbate(slab, mol, height=2.0, position=xy_pos)
    atoms = slab
    atoms.pbc = True
    atoms.calc = calculator

    # 初速度
    MaxwellBoltzmannDistribution(atoms, temperature_K=temperature, force_temp=True)
    Stationary(atoms)

    # NPT 动力学
    dyn = NPT(atoms, timestep=time_step*fs,
              temperature_K=temperature,
              externalstress=0.1e-6*GPa,
              ttime=100*fs,
              pfactor=None,
              trajectory="md.traj",
              logfile="md.log",
              loginterval=100)

    os.makedirs("structs", exist_ok=True)

    def save_structure():
        step = dyn.get_number_of_steps()
        if step % save_interval == 0:
            fname = f"structs/POSCAR_{step:06d}.vasp"
            write(fname, atoms, format="vasp")

    dyn.attach(save_structure, interval=save_interval)
    print("🚀 Starting MD sampling ...")
    dyn.run(prod_steps)
    print("✅ MD sampling finished.")

# -----------------------------
# 阶段2: 第一性原理能量计算 (示例用NNP代替)
# -----------------------------
def dft_screening():
    results = []
    for fname in sorted(os.listdir("structs")):
        if fname.endswith(".vasp"):
            atoms = read(os.path.join("structs", fname))
            atoms.calc = calculator
            energy = atoms.get_potential_energy()
            results.append({"struct": fname, "energy": energy})
    df = pd.DataFrame(results)
    df.to_csv("energies.csv", index=False)
    best = df.sort_values("energy").iloc[0]
    print(f"🌟 Lowest energy structure: {best['struct']} (E={best['energy']:.3f} eV)")
    return best["struct"]

# -----------------------------
# 阶段3: Optuna 精调碳链
# -----------------------------
def run_optuna(best_struct):
    atoms = read(os.path.join("structs", best_struct))
    atoms.calc = calculator
    E_ref = atoms.get_potential_energy()

    # 假设碳链是 index 10-13，可以根据实际 cif 检查
    chain_indices = [10, 11, 12, 13]

    def atoms_to_json(atoms):
        import io
        from ase.io import write
        f = io.StringIO()
        write(f, atoms, format="json")
        return f.getvalue()

    def json_to_atoms(s):
        import io
        from ase.io import read
        return read(io.StringIO(s), format="json")

    def objective(trial):
        mol = json_to_atoms(trial.study.user_attrs["slab"])
        c1, c2, c3, c4 = trial.study.user_attrs["chain_indices"]

        new_len   = trial.suggest_float("C2-C3_length", 1.45, 1.65)
        new_angle = trial.suggest_float("C1-C2-C3_angle", 105, 125)
        new_dihed = trial.suggest_float("C1-C2-C3-C4_torsion", -30, 30)

        set_distance(mol, c2, c3, new_len, fix=0)
        set_angle(mol, c1, c2, c3, new_angle, mask=[1,1,1])
        set_dihedral(mol, c1, c2, c3, c4, new_dihed, mask=[1,1,1,1])

        mol.calc = calculator
        E_tot = mol.get_potential_energy()
        return E_tot - trial.study.user_attrs["E_ref"]

    study = optuna.create_study(direction="minimize")
    study.set_user_attr("slab", atoms_to_json(atoms))
    study.set_user_attr("E_ref", E_ref)
    study.set_user_attr("chain_indices", chain_indices)

    print("🔍 Starting Optuna refinement ...")
    study.optimize(objective, n_trials=200)

    print(f"✅ Best trial: #{study.best_trial.number}")
    print(f"   ΔE = {study.best_value:.3f} eV")
    print(f"   Params = {study.best_trial.params}")

    best_atoms = json_to_atoms(study.best_trial.user_attrs["slab"])
    write("best_refined.cif", best_atoms)
    return best_atoms

# -----------------------------
# 主程序
# -----------------------------
if __name__ == "__main__":
    run_md_sampling()
    best_struct = dft_screening()
    run_optuna(best_struct)
