In [4]:
# pip install ase pandas openpyxl
import os, re, json
from pathlib import Path
from ase.io import read, write
import pandas as pd

# --------- CONFIG ----------
# Multi-frame XYZ file to split into one CIF per structure.
XYZ_PATH = "/home/phanim/harshitrawat/summer/replay_data/mp_finetuning-just_to_get_file_comb_142_run-84.xyz"
# Destination folder for the CIFs and manifests; None => <folder of XYZ_PATH>/split_cifs_meta
OUT_DIR = None
# Appended to the original file name to form the cleaned copy's name.
CLEAN_SUFFIX = ".cleaned.xyz"
# ---------------------------

p = Path(XYZ_PATH)

# Resolve the output folder (falling back to a sibling of the input) and create it.
OUT_DIR = Path(OUT_DIR) if OUT_DIR is not None else Path(p.parent / "split_cifs_meta")
OUT_DIR.mkdir(parents=True, exist_ok=True)

# The cleaned copy sits next to the original: <name>.xyz -> <name>.xyz.cleaned.xyz
CLEANED = p.with_suffix(p.suffix + CLEAN_SUFFIX)

# ---- 1) CLEAN: convert Fortran 'd' floats to standard ----
#   - '1.23d-04'    -> '1.23e-04'
#   - '0.00000000d' -> '0.00000000'  (stray trailing marker; rare but seen)
#
# Only WHOLE numeric tokens are rewritten. A token must be bounded on both
# sides by the start/end of the line or by a delimiter (whitespace, quote,
# '=', ',', or a bracket). Checking just one neighbouring digit is not enough:
# it would also rewrite string metadata such as 'a1d2' -> 'a1e2' or
# 'slab_2d' -> 'slab_2'.
_TOKEN_START = r'''(?<![^\s"'=,\[\](){}])'''   # start of line or a delimiter before
_TOKEN_END = r'''(?![^\s"'=,\[\](){}])'''      # end of line or a delimiter after

# <mantissa>d<exponent>, e.g. -1.23d-04, 1d5, .5D+2
pat_exp = re.compile(
    _TOKEN_START + r'([+\-]?(?:\d+\.?\d*|\.\d+))[dD]([+\-]?\d+)' + _TOKEN_END
)
# <float>d with no exponent, e.g. 0.00000000d. A decimal point is required so
# that plain words like '3d' are never touched.
pat_trail = re.compile(
    _TOKEN_START + r'([+\-]?(?:\d+\.\d*|\.\d+))[dD]' + _TOKEN_END
)


def _defortranize(line):
    """Return `line` with Fortran-style float tokens rewritten in standard notation."""
    line = pat_exp.sub(r'\1e\2', line)
    return pat_trail.sub(r'\1', line)


with p.open("r", encoding="utf-8") as fin, CLEANED.open("w", encoding="utf-8") as fout:
    for line in fin:
        # Header (comment) lines and per-atom rows are cleaned the same way.
        fout.write(_defortranize(line))

# ---- 2) READ all structures with ASE ----
atoms_list = read(str(CLEANED), index=":")  # extxyz inferred
print(f"Read {len(atoms_list)} structures from {CLEANED}")

# ---- helpers ----
from collections import Counter
import numpy as np

def to_builtin(obj):
    """Return a JSON-friendly copy of `obj` built from plain Python types.

    - None, str, bool, int and float are returned unchanged.
    - bytes are decoded to str.
    - numpy bool / integer / floating scalars become bool / int / float.
    - lists and tuples become lists, converted element by element.
    - dicts get str keys and recursively converted values.
    - numpy arrays are converted with ``.tolist()``.
    - anything else is returned as ``str(obj)``.
    """
    if obj is None:
        return None
    if isinstance(obj, bytes):
        return obj.decode()
    # Builtin scalars are checked before the numpy scalar types on purpose:
    # np.float64 subclasses float and is passed through here untouched.
    if isinstance(obj, (str, bool, int, float)):
        return obj
    for numpy_kind, cast in ((np.bool_, bool), (np.integer, int), (np.floating, float)):
        if isinstance(obj, numpy_kind):
            return cast(obj)
    if isinstance(obj, (list, tuple)):
        return list(map(to_builtin, obj))
    if isinstance(obj, dict):
        return {str(key): to_builtin(val) for key, val in obj.items()}
    if isinstance(obj, np.ndarray):
        return obj.tolist()
    # Unknown type: fall back to its string form.
    return str(obj)

def safe_formula(atoms):
    """Return a chemical formula string for `atoms`.

    First asks the object for its reduced empirical formula. If that call
    raises or returns an empty value, the formula is assembled from the
    element counts in alphabetical order (a count of 1 is omitted, e.g.
    "Li2O"). Returns "UNKNOWN" when there are no atoms.
    """
    try:
        reduced = atoms.get_chemical_formula(mode="reduce", empirical=True)
    except Exception:
        reduced = None
    if reduced:
        return reduced

    # Fallback: count the symbols and join them alphabetically.
    tally = Counter(atoms.get_chemical_symbols())
    pieces = [
        symbol if tally[symbol] == 1 else f"{symbol}{tally[symbol]}"
        for symbol in sorted(tally)
    ]
    if not pieces:
        return "UNKNOWN"
    return "".join(pieces)

# ---- 3) SPLIT to CIFs + MANIFEST ----
# Each frame is written to its own CIF. One manifest row per frame records the
# CIF path together with every key found in atoms.info.
rows = []
used_names = set()  # CIF filenames already produced in this run
for i, atoms in enumerate(atoms_list):
    info = atoms.info or {}

    # Identifier used in the filename: the frame's mp_id, else its formula.
    mp_id = info.get("mp_id")
    if mp_id is None or str(mp_id).strip() == "":
        mp_id = safe_formula(atoms) or f"noid_{i}"
    else:
        mp_id = str(mp_id)

    # energy per atom may come as string/np types; anything float() cannot
    # convert is treated as missing
    epa = info.get("energy_per_atom", None)
    try:
        epa = float(epa)
    except Exception:
        epa = None

    # filename stem
    if epa is not None:
        stem = f"{mp_id}__Epa{epa:.3f}eV"
    else:
        stem = f"{mp_id}__struct_{i:03d}"
    # Spaces and path separators would change where (or whether) the file is
    # written, so both are replaced with underscores.
    stem = re.sub(r"[\\/]", "_", stem.replace(" ", "_"))

    # Two frames with the same id and the same energy to 3 decimals would
    # otherwise overwrite each other silently; disambiguate with the frame index.
    fname = f"{stem}.cif"
    if fname in used_names:
        fname = f"{stem}__idx{i:03d}.cif"
    used_names.add(fname)
    cif_path = OUT_DIR / fname

    # write CIF
    write(str(cif_path), atoms)

    # build manifest row (flatten + convert types); keys from atoms.info are
    # added last and therefore win over the computed ones on a name clash
    flat = {
        "index": int(i),
        "cif_path": str(cif_path),
        "formula": safe_formula(atoms),
        "natoms": int(len(atoms)),
    }
    for k, v in info.items():
        flat[str(k)] = to_builtin(v)

    rows.append(flat)

# ---- 4) Save manifest (JSON + Excel) ----
df = pd.DataFrame(rows)
df_path_xlsx = OUT_DIR / "split_manifest.xlsx"
df_path_json = OUT_DIR / "split_manifest.json"

# Excel: pandas will coerce lists to strings automatically via openpyxl
df.to_excel(df_path_xlsx, index=False)

# JSON: everything converted to builtin types
with open(df_path_json, "w", encoding="utf-8") as f:
    json.dump([to_builtin(r) for r in rows], f, indent=2, ensure_ascii=False)

print(f"\nDone. Wrote {len(rows)} CIFs to: {OUT_DIR}")
print(f"Manifest: {df_path_xlsx}")
print(f"Manifest JSON: {df_path_json}")


Read 142 structures from /home/phanim/harshitrawat/summer/replay_data/mp_finetuning-just_to_get_file_comb_142_run-84.xyz.cleaned.xyz

Done. Wrote 142 CIFs to: /home/phanim/harshitrawat/summer/replay_data/split_cifs_meta
Manifest: /home/phanim/harshitrawat/summer/replay_data/split_cifs_meta/split_manifest.xlsx
Manifest JSON: /home/phanim/harshitrawat/summer/replay_data/split_cifs_meta/split_manifest.json


In [8]:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""
Label all CIFs in IN_DIR with CHGNet (single-point) and export:
  1) Combined EXTXYZ with per-frame metadata (CHGNet labels)
  2) JSON manifest with the same fields

Fixes & robustness:
- Uses at.get_volume() (avoids .volume() callables)
- Keeps 'stress' as numpy (3x3) for EXTXYZ writer; converts to list only for JSON
- Detaches calculator before writing to avoid key collisions
- JSON-serializes numpy scalars/arrays recursively
- Safe fallback formula if ASE can't compute reduced empirical
- Skips failing frames but continues

Customize IN_DIR/OUT paths below.
"""

import json
from pathlib import Path
from typing import Dict, Any, List
from collections import Counter

import numpy as np
from tqdm import tqdm
from ase.io import read, write
from ase import Atoms

# === Input: folder holding one CIF per structure ===
IN_DIR = Path("/home/phanim/harshitrawat/summer/replay_data/split_cifs_meta")

# === Outputs: created as siblings of IN_DIR (same parent folder) ===
OUT_EXTXYZ = IN_DIR.with_name("replay_labeled_by_chgnet.extxyz")
OUT_JSON = IN_DIR.with_name("replay_labeled_by_chgnet.json")

# === CHGNet import ===
# CHGNetCalculator (the ASE calculator interface) is imported from chgnet.model.dynamics.
from chgnet.model.dynamics import CHGNetCalculator

# ---- config ----
DEVICE: str = "cuda"                  # tried first; main() retries on "cpu" if this fails
HEAD_NAME: str = "chgnet_universal"   # stored under the "head" key of every frame
CONFIG_WEIGHT: float = 1.0            # stored under the "config_weight" key
PRETRAINED: bool = True               # stored under the "pretrained" key
CALC_ID_START: int = 0                # calc_id given to the first labeled frame
IONIC_STEP: int = 0                   # stored under the "ionic_step" key
VERBOSE: bool = True                  # print a note when falling back to cpu
# ----------------


def to_builtin(x):
    """Convert `x` into plain Python objects that ``json.dump`` accepts.

    None and builtin scalars/strings pass through unchanged. bytes are decoded
    (undecodable bytes are dropped). numpy scalars become bool/int/float,
    numpy arrays become nested lists, and lists/tuples/dicts are converted
    recursively (dict keys are stringified). Any other object is returned as
    ``str(x)``.
    """
    if x is None:
        return x
    # Builtin scalars first: np.float64 subclasses float and must pass through as-is.
    if isinstance(x, (str, float, int, bool)):
        return x
    if isinstance(x, bytes):
        return x.decode(errors="ignore")

    # Containers
    if isinstance(x, dict):
        return {str(name): to_builtin(item) for name, item in x.items()}
    if isinstance(x, (tuple, list)):
        return [to_builtin(item) for item in x]
    if isinstance(x, np.ndarray):
        return x.tolist()

    # numpy scalars
    if isinstance(x, np.bool_):
        return bool(x)
    if isinstance(x, np.integer):
        return int(x)
    if isinstance(x, np.floating):
        return float(x)

    return str(x)


def safe_formula(atoms: Atoms) -> str:
    """Return a formula string for `atoms`.

    Uses the reduced empirical formula reported by `atoms` when that call
    succeeds and yields a non-empty value. Otherwise the formula is built from
    the element counts in alphabetical order (counts of 1 are omitted).
    Returns "UNKNOWN" if there are no atoms.
    """
    try:
        reduced = atoms.get_chemical_formula(mode="reduce", empirical=True)
    except Exception:
        reduced = None
    if reduced:
        return reduced

    # Fallback: alphabetical element tally.
    tally = Counter(atoms.get_chemical_symbols())
    pieces = [
        symbol if tally[symbol] == 1 else f"{symbol}{tally[symbol]}"
        for symbol in sorted(tally)
    ]
    if not pieces:
        return "UNKNOWN"
    return "".join(pieces)


def atoms_to_labels(at: Atoms, calc_id: int):
    """
    Run a single-point calculation on `at` and collect the resulting labels.

    `at` must already have a calculator attached (the caller sets `at.calc`).

    Args:
        at: structure to label.
        calc_id: running identifier stored in the frame metadata.

    Returns:
        info  : dict with frame-level metadata (numpy stress kept as 3x3 array)
        forces: (N,3) numpy
        mag   : (N,) numpy. The calculator's predicted magnetic moments when it
                provides them; otherwise the moments stored on the structure;
                otherwise zeros.
        stress: (3,3) numpy (zeros for non-periodic / zero-volume cells or if
                the stress calculation fails)
    """
    natoms = len(at)

    # Potential energy (eV)
    energy = float(at.get_potential_energy(apply_constraint=False))
    epa = energy / natoms if natoms > 0 else float("nan")

    # Forces (eV/Å)
    forces = at.get_forces(apply_constraint=False)  # (N,3) numpy

    # Stress (eV/Å^3) as (3,3) numpy; guard for zero-volume / non-PBC
    try:
        vol = at.get_volume()
        if vol and vol > 1e-12 and any(at.get_pbc()):
            stress_3x3 = at.get_stress(voigt=False)  # (3,3)
        else:
            stress_3x3 = np.zeros((3, 3), float)
    except Exception:
        # Best effort: a failing stress evaluation must not drop the frame.
        stress_3x3 = np.zeros((3, 3), float)

    # Magmoms (per-atom). Prefer the calculator's prediction so the label
    # reflects the model. Structures read from CIF carry no moments, so reading
    # only at.arrays would always give zeros.
    mag = None
    try:
        predicted = np.asarray(at.get_magnetic_moments(), dtype=float)
        if predicted.shape == (natoms,):
            mag = predicted
    except Exception:
        # Calculator missing or unable to provide magmoms: use the fallbacks below.
        mag = None
    if mag is None:
        if "initial_magmoms" in at.arrays:
            mag = np.array(at.get_initial_magnetic_moments(), dtype=float)
        elif "magmoms" in at.arrays:
            mag = np.array(at.arrays["magmoms"], dtype=float)
        else:
            mag = np.zeros((natoms,), dtype=float)

    # Frame-level info (keep stress as numpy array for writer).
    # Note: every *_per_atom field below is filled with the same value (epa).
    info: Dict[str, Any] = {
        "energy": energy,
        "energy_per_atom": epa,
        "ef_per_atom": epa,
        "ef_per_atom_relaxed": epa,
        "e_per_atom_relaxed": epa,
        "bandgap": None,  # CHGNet doesn't output this; kept for schema compat
        "pretrained": PRETRAINED,
        "config_weight": CONFIG_WEIGHT,
        "head": HEAD_NAME,
        "calc_id": int(calc_id),
        "ionic_step": int(IONIC_STEP),
        "stress": np.array(stress_3x3, dtype=float),  # IMPORTANT: numpy for extxyz
        "pbc": " ".join("T" if b else "F" for b in at.get_pbc()),
        "REF_energy": energy,
        "formula": safe_formula(at),
        "natoms": natoms,
    }
    return info, forces, mag, stress_3x3


def main():
    """Label every CIF in IN_DIR with CHGNet and write the EXTXYZ + JSON outputs.

    Steps:
      1. Build the calculator on DEVICE, retrying on cpu if construction fails.
      2. For each ``*.cif`` in IN_DIR (sorted by path): read it, run a
         single-point via ``atoms_to_labels`` and store the labels on the
         Atoms object and in a manifest row.
      3. Write all labeled frames to OUT_EXTXYZ and the manifest to OUT_JSON.

    A structure that raises at any point is reported with a ``[WARN]`` line
    and skipped; the remaining structures are still processed.
    """
    # Calculator
    try:
        calc = CHGNetCalculator(device=DEVICE)
    except Exception:
        calc = CHGNetCalculator(device="cpu")
        if VERBOSE:
            print("CHGNet will run on cpu")

    # Collect CIFs
    cif_paths: List[Path] = sorted([p for p in IN_DIR.glob("*.cif") if p.is_file()])
    print(f"Found {len(cif_paths)} CIFs in {IN_DIR}")

    frames: List[Atoms] = []
    manifest: List[Dict[str, Any]] = []
    calc_id = CALC_ID_START

    for path in tqdm(cif_paths, desc="Labeling with CHGNet"):
        try:
            at: Atoms = read(path)

            # Ensure periodic for solids if the cell has volume
            try:
                vol = at.get_volume()
                if vol and vol > 1e-12:
                    at.set_pbc([True, True, True])
            except Exception:
                pass

            # Run single-point
            at.calc = calc
            info, forces, mag, stress = atoms_to_labels(at, calc_id=calc_id)

            # Embed arrays/info for EXTXYZ (stress kept as numpy (3,3)).
            # None-valued entries (e.g. "bandgap") are left out of at.info:
            # the extxyz writer emits a None value as a bare key, which is
            # parsed back as boolean True. They stay in the JSON manifest.
            at.info.update({k: v for k, v in info.items() if v is not None})
            at.arrays["forces"] = np.array(forces, dtype=float)
            at.arrays["magmoms"] = np.array(mag, dtype=float)

            # Detach calculator to avoid writer key collisions
            at.calc = None
            frames.append(at)

            # Manifest row (JSON builtins; convert stress to list here)
            row = {k: v for k, v in info.items() if k != "stress"}
            row["stress"] = np.asarray(info["stress"]).tolist()
            row["filename"] = path.name
            row["elements"] = "-".join(sorted(set(at.get_chemical_symbols())))
            manifest.append(to_builtin(row))

            # Only successfully labeled frames consume a calc_id.
            calc_id += 1

        except Exception as e:
            print(f"[WARN] Failed on {path.name}: {e}")

    # Write outputs (the EXTXYZ is only written when at least one frame succeeded)
    if frames:
        write(str(OUT_EXTXYZ), frames, format="extxyz")
    with open(OUT_JSON, "w", encoding="utf-8") as f:
        json.dump([to_builtin(r) for r in manifest], f, indent=2, ensure_ascii=False)

    print("\n=== DONE ===")
    print(f"EXTXYZ : {OUT_EXTXYZ}")
    print(f"JSON   : {OUT_JSON}")
    print(f"Frames : {len(frames)} / {len(cif_paths)}")


if __name__ == "__main__":
    main()


CHGNet v0.3.0 initialized with 412,525 parameters
CHGNet will run on cpu
Found 142 CIFs in /home/phanim/harshitrawat/summer/replay_data/split_cifs_meta


Labeling with CHGNet:   5%|▍         | 7/142 [00:00<00:17,  7.50it/s]

Labeling with CHGNet: 100%|██████████| 142/142 [00:22<00:00,  6.18it/s]


=== DONE ===
EXTXYZ : /home/phanim/harshitrawat/summer/replay_data/replay_labeled_by_chgnet.extxyz
JSON   : /home/phanim/harshitrawat/summer/replay_data/replay_labeled_by_chgnet.json
Frames : 142 / 142





In [9]:
from ase.io import read, write

# Inputs: the CHGNet-labeled replay set and the CHGNet-labeled T3 set
replay_path = "/home/phanim/harshitrawat/summer/replay_data/replay_labeled_by_chgnet.extxyz"
t3_path     = "/home/phanim/harshitrawat/summer/T1_T2_T3_data/T3_chgnet_labeled.extxyz"

# Output: T3 frames followed by the replay frames
out_path    = "/home/phanim/harshitrawat/summer/T1_T2_T3_data/T3withreplay_chgnet_labeled.extxyz"

# Load every frame (index=":") from each input, replay first
replay_frames, t3_frames = [read(src, index=":") for src in (replay_path, t3_path)]

for label, loaded in (("Replay frames", replay_frames), ("T3 frames    ", t3_frames)):
    print(f"{label}: {len(loaded)}")

# T3 goes first, replay is appended at the end
all_frames = [*t3_frames, *replay_frames]

# Save the merged set as a single EXTXYZ
write(out_path, all_frames, format="extxyz")

print(f"Done. Wrote {len(all_frames)} frames to {out_path}")


Replay frames: 142
T3 frames    : 1612
Done. Wrote 1754 frames to /home/phanim/harshitrawat/summer/T1_T2_T3_data/T3withreplay_chgnet_labeled.extxyz


In [10]:
from ase.io import read, write

# Input files: the CHGNet-labeled replay set and the CHGNet-labeled T2 set.
# The T2 data gets its own t2_* names so that this cell neither overwrites the
# t3_path / t3_frames globals of the T3 merge nor reports T2 data as "T3".
replay_path = "/home/phanim/harshitrawat/summer/replay_data/replay_labeled_by_chgnet.extxyz"
t2_path     = "/home/phanim/harshitrawat/summer/T1_T2_T3_data/T2_chgnet_labeled.extxyz"

# Output file
out_path    = "/home/phanim/harshitrawat/summer/T1_T2_T3_data/T2withreplay_chgnet_labeled.extxyz"

# Read all frames from both extxyz files
replay_frames = read(replay_path, index=":")
t2_frames     = read(t2_path, index=":")

print(f"Replay frames: {len(replay_frames)}")
print(f"T2 frames    : {len(t2_frames)}")

# Concatenate: T2 frames first, replay frames appended
all_frames = t2_frames + replay_frames

# Write combined file
write(out_path, all_frames, format="extxyz")

print(f"Done. Wrote {len(all_frames)} frames to {out_path}")


Replay frames: 142
T3 frames    : 705
Done. Wrote 847 frames to /home/phanim/harshitrawat/summer/T1_T2_T3_data/T2withreplay_chgnet_labeled.extxyz
