In [1]:
import numpy as np
from sys import stdout
import h5py
import torch

In [2]:
aqm = h5py.File("/mnt/petrelfs/linchen/FoundationalModel/data/aqm/AQM-sol.hdf5", "r")
AQMmol_ids = list(aqm.keys()) 

In [4]:
import ase
from ase import atoms





In [13]:
from typing import Dict, List, Optional, Tuple
import dataclasses
from mace import data, modules
import logging
from dataclasses import dataclass

from mace.data.utils import config_from_atoms_list

@dataclasses.dataclass
class SubsetCollection:
    train: data.Configurations
    valid: data.Configurations
    tests: List[Tuple[str, data.Configurations]]

def get_dataset_from_h5(
    train_path: str,
    valid_path: str,
    valid_fraction: float,
    config_type_weights: Dict,
    test_path: str = None,
    seed: int = 1234,
    keep_isolated_atoms: bool = False,
    h5_positions_key: str = 'atXYZ',
    h5_numbers_key: str = "atNUM",
    h5_energy_key: str = 'ePBE0+MBD',
    h5_forces_key: str = 'totFOR', 
    ) -> Tuple[SubsetCollection, Optional[Dict[int, float]]]:
    """Load training and test dataset from xyz file"""
    atomic_energies_dict, all_train_configs, heads = load_from_h5(
        file_path=train_path,
        config_type_weights=config_type_weights,
        h5_positions_key=h5_positions_key,
        h5_numbers_key=h5_numbers_key,
        h5_energy_key=h5_energy_key,
        h5_forces_key=h5_forces_key,
    )
    logging.info(
        f"Loaded {len(all_train_configs)} training configurations from '{train_path}'"
    )
    if valid_path is not None:
        _, valid_configs, _ = load_from_h5(
            file_path=valid_path,
            config_type_weights=config_type_weights,
            h5_positions_key=h5_positions_key,
            h5_numbers_key=h5_numbers_key,
            h5_energy_key=h5_energy_key,
            h5_forces_key=h5_forces_key,
        )
        logging.info(
            f"Loaded {len(valid_configs)} validation configurations from '{valid_path}'"
        )
        train_configs = all_train_configs
    else:
        logging.info(
            "Using random %s%% of training set for validation", 100 * valid_fraction
        )
        train_configs, valid_configs = [], []
        for head in heads:
            all_train_configs_head = [
                config for config in all_train_configs if config.head == head
            ]
            train_configs_head, valid_configs_head = data.random_train_valid_split(
                all_train_configs_head, valid_fraction, seed
            )
            train_configs.extend(train_configs_head)
            valid_configs.extend(valid_configs_head)

    test_configs = []
    if test_path is not None:
        _, all_test_configs, _ = load_from_h5(
            file_path=test_path,
            config_type_weights=config_type_weights,
            h5_positions_key=h5_positions_key,
            h5_numbers_key=h5_numbers_key,
            h5_energy_key=h5_energy_key,
            h5_forces_key=h5_forces_key,
        )
        # create list of tuples (config_type, list(Atoms))
        test_configs = data.test_config_types(all_test_configs)
        logging.info(
            f"Loaded {len(all_test_configs)} test configurations from '{test_path}'"
        )
    return (
        SubsetCollection(train=train_configs, valid=valid_configs, tests=test_configs),
        atomic_energies_dict,
        heads,
    )

from mace.tools import AtomicNumberTable

Vector = np.ndarray  # [3,]
Positions = np.ndarray  # [..., 3]
Forces = np.ndarray  # [..., 3]
Stress = np.ndarray  # [6, ], [3,3], [9, ]
Virials = np.ndarray  # [6, ], [3,3], [9, ]
Charges = np.ndarray  # [..., 1]
Cell = np.ndarray  # [3,3]
Pbc = tuple  # (3,)

DEFAULT_CONFIG_TYPE = "Default"
DEFAULT_CONFIG_TYPE_WEIGHTS = {DEFAULT_CONFIG_TYPE: 1.0}

@dataclass
class Configuration:
    atomic_numbers: np.ndarray
    positions: Positions  # Angstrom
    energy: Optional[float] = None  # eV
    forces: Optional[Forces] = None  # eV/Angstrom
    stress: Optional[Stress] = None  # eV/Angstrom^3
    virials: Optional[Virials] = None  # eV
    dipole: Optional[Vector] = None  # Debye
    charges: Optional[Charges] = None  # atomic unit
    cell: Optional[Cell] = None
    pbc: Optional[Pbc] = None

    weight: float = 1.0  # weight of config in loss
    energy_weight: float = 1.0  # weight of config energy in loss
    forces_weight: float = 1.0  # weight of config forces in loss
    stress_weight: float = 1.0  # weight of config stress in loss
    virials_weight: float = 1.0  # weight of config virial in loss
    config_type: Optional[str] = DEFAULT_CONFIG_TYPE  # config_type of config
    head: Optional[str] = "Default"  # head used to compute the config


Configurations = List[Configuration]

def load_from_h5(
    file_path: str,
    config_type_weights: Dict,
    h5_positions_key: str = 'atXYZ',
    h5_numbers_key: str = "atNUM",
    h5_energy_key: str = 'ePBE0+MBD',
    h5_forces_key: str = 'totFOR',
) -> Tuple[Dict[int, float], Configurations]:
    atoms_list = atoms_from_hdf5(
        file_path, 
        positions_key=h5_positions_key,
        numbers_key=h5_numbers_key,
        energy_key=h5_energy_key,
        forces_key=h5_forces_key
    )


    atomic_energies_dict = {}

    heads = set()
    for atoms in atoms_list:
        heads.add(atoms.info.get("head", "Default"))
    heads = list(heads)

    configs = config_from_atoms_list(
        atoms_list,
        config_type_weights=config_type_weights,
        energy_key="energy",
        forces_key="forces",
        stress_key="stress",
        virials_key="virials",
        dipole_key="dipole",
        charges_key="charges",
        head_key="head",
    )
    return atomic_energies_dict, configs, heads

def atoms_from_hdf5(file_path, positions_key='atXYZ', numbers_key="atNUM", energy_key='ePBE0+MBD', forces_key='totFOR'):
    aqm = h5py.File(file_path, "r")
    atoms_list = []
    AQMmol_ids = list(aqm.keys())

    for molid in AQMmol_ids:
        AQMconf_ids = list(aqm[molid].keys())
        for confid in AQMconf_ids:
            curr_cfg = aqm[molid][confid]
            atoms = ase.Atoms(positions=np.array(curr_cfg[positions_key]), numbers=np.array(curr_cfg[numbers_key]))
            atoms.info['energy'] = np.array(curr_cfg[energy_key]).item()
            atoms.arrays['forces'] = np.array(curr_cfg[forces_key])
            # atoms.info['head'] = "Default"
            atoms_list.append(atoms)

    return atoms_list

In [14]:
collections, atomic_energies_dict, heads = get_dataset_from_h5(train_path="/mnt/petrelfs/linchen/FoundationalModel/data/aqm/AQM-gas.hdf5", valid_path=None, valid_fraction=0.05, config_type_weights = {"Default": 1.0})

In [15]:
collections.train[0]

Configuration(atomic_numbers=array([6, 1, 1, 1, 8, 6, 6, 6, 6, 6, 6, 1, 1, 6, 7, 7, 6, 6, 6, 6, 1, 1,
       1, 6, 1, 1, 1, 7, 6, 1, 6, 7, 6, 7, 6, 6, 6, 7, 1, 6, 6, 1, 1, 1,
       6, 1, 1, 1, 8, 1, 6, 9, 9, 9, 6, 1, 6, 1, 1, 1, 1, 1, 1, 1, 1]), positions=array([[ 1.0700636e+01,  2.3041720e+00, -9.1733000e-01],
       [ 1.1663771e+01,  2.7014630e+00, -5.8402300e-01],
       [ 1.0879353e+01,  1.4294920e+00, -1.5624940e+00],
       [ 1.0180701e+01,  3.0755490e+00, -1.5067480e+00],
       [ 9.9625250e+00,  1.9519610e+00,  2.3677800e-01],
       [ 8.7116390e+00,  1.4286700e+00,  8.3497000e-02],
       [ 8.1082210e+00,  1.2104870e+00, -1.1688280e+00],
       [ 6.8242850e+00,  6.6826300e-01, -1.2257470e+00],
       [ 6.1209910e+00,  3.1718200e-01, -6.1006000e-02],
       [ 6.7337320e+00,  5.6064300e-01,  1.1834540e+00],
       [ 8.0109360e+00,  1.1069670e+00,  1.2607700e+00],
       [ 8.4709450e+00,  1.2967830e+00,  2.2261000e+00],
       [ 6.1936280e+00,  3.4659500e-01,  2.1026690e+00],
  

In [19]:
import h5py
from tqdm import tqdm
import numpy as np

# ds_path = '/mnt/petrelfs/linchen/FoundationalModel/data/ani-2x/final_h5/ANI-2x-wB97X-def2TZVPP.h5'

# su = 0
# with h5py.File(ds_path) as h5:
#     for num_atoms, properties in tqdm(h5.items()):        #Iterate thorugh like a dictionary
#         coordinates = properties['coordinates']     #Output of properties is of type h5py Dataset
#         species = properties['species']
#         print(coordinates)
#         su += np.array(coordinates).shape[0]
# print(su)

ds_path = '/mnt/petrelfs/linchen/FoundationalModel/data/ani-2x/final_h5/ANI-2x-wB97X-631Gd.h5'
with h5py.File(ds_path) as h5:
    for num_atoms, properties in tqdm(h5.items()):        #Iterate thorugh like a dictionary
        coordinates = np.array(properties['coordinates'])   #Output of properties is of type h5py Dataset
        species = np.array(properties['species'])
        energies = np.array(properties['energies'])
        forces = np.array(properties['forces'])
        # print(coordinates)
        # print(species)
        # print(energies)
        # print(forces)
        break

for c, s, e, f in zip(coordinates, species, energies, forces):
    print(c.shape)
    print(s.shape)
    print(e)
    print(f.shape)
    break



  0%|          | 0/59 [00:00<?, ?it/s]

(2, 3)
(2,)
-113.278573616
(2, 3)





In [29]:
from typing import Dict, List, Optional, Tuple
import dataclasses
from mace import data, modules
import logging
from dataclasses import dataclass

from mace.data.utils import config_from_atoms_list

@dataclasses.dataclass
class SubsetCollection:
    train: data.Configurations
    valid: data.Configurations
    tests: List[Tuple[str, data.Configurations]]

def get_dataset_from_h5(
    train_path: str,
    valid_path: str,
    valid_fraction: float,
    config_type_weights: Dict,
    test_path: str = None,
    seed: int = 1234,
    keep_isolated_atoms: bool = False,
    h5_positions_key: str = 'atXYZ',
    h5_numbers_key: str = "atNUM",
    h5_energy_key: str = 'ePBE0+MBD',
    h5_forces_key: str = 'totFOR', 
    ) -> Tuple[SubsetCollection, Optional[Dict[int, float]]]:
    """Load training and test dataset from xyz file"""
    atomic_energies_dict, all_train_configs, heads = load_from_h5(
        file_path=train_path,
        config_type_weights=config_type_weights,
        h5_positions_key=h5_positions_key,
        h5_numbers_key=h5_numbers_key,
        h5_energy_key=h5_energy_key,
        h5_forces_key=h5_forces_key,
    )
    logging.info(
        f"Loaded {len(all_train_configs)} training configurations from '{train_path}'"
    )
    if valid_path is not None:
        _, valid_configs, _ = load_from_h5(
            file_path=valid_path,
            config_type_weights=config_type_weights,
            h5_positions_key=h5_positions_key,
            h5_numbers_key=h5_numbers_key,
            h5_energy_key=h5_energy_key,
            h5_forces_key=h5_forces_key,
        )
        logging.info(
            f"Loaded {len(valid_configs)} validation configurations from '{valid_path}'"
        )
        train_configs = all_train_configs
    else:
        logging.info(
            "Using random %s%% of training set for validation", 100 * valid_fraction
        )
        train_configs, valid_configs = [], []
        for head in heads:
            all_train_configs_head = [
                config for config in all_train_configs if config.head == head
            ]
            train_configs_head, valid_configs_head = data.random_train_valid_split(
                all_train_configs_head, valid_fraction, seed
            )
            train_configs.extend(train_configs_head)
            valid_configs.extend(valid_configs_head)

    test_configs = []
    if test_path is not None:
        _, all_test_configs, _ = load_from_h5(
            file_path=test_path,
            config_type_weights=config_type_weights,
            h5_positions_key=h5_positions_key,
            h5_numbers_key=h5_numbers_key,
            h5_energy_key=h5_energy_key,
            h5_forces_key=h5_forces_key,
        )
        # create list of tuples (config_type, list(Atoms))
        test_configs = data.test_config_types(all_test_configs)
        logging.info(
            f"Loaded {len(all_test_configs)} test configurations from '{test_path}'"
        )
    return (
        SubsetCollection(train=train_configs, valid=valid_configs, tests=test_configs),
        atomic_energies_dict,
        heads,
    )

from mace.tools import AtomicNumberTable

Vector = np.ndarray  # [3,]
Positions = np.ndarray  # [..., 3]
Forces = np.ndarray  # [..., 3]
Stress = np.ndarray  # [6, ], [3,3], [9, ]
Virials = np.ndarray  # [6, ], [3,3], [9, ]
Charges = np.ndarray  # [..., 1]
Cell = np.ndarray  # [3,3]
Pbc = tuple  # (3,)

DEFAULT_CONFIG_TYPE = "Default"
DEFAULT_CONFIG_TYPE_WEIGHTS = {DEFAULT_CONFIG_TYPE: 1.0}

@dataclass
class Configuration:
    atomic_numbers: np.ndarray
    positions: Positions  # Angstrom
    energy: Optional[float] = None  # eV
    forces: Optional[Forces] = None  # eV/Angstrom
    stress: Optional[Stress] = None  # eV/Angstrom^3
    virials: Optional[Virials] = None  # eV
    dipole: Optional[Vector] = None  # Debye
    charges: Optional[Charges] = None  # atomic unit
    cell: Optional[Cell] = None
    pbc: Optional[Pbc] = None

    weight: float = 1.0  # weight of config in loss
    energy_weight: float = 1.0  # weight of config energy in loss
    forces_weight: float = 1.0  # weight of config forces in loss
    stress_weight: float = 1.0  # weight of config stress in loss
    virials_weight: float = 1.0  # weight of config virial in loss
    config_type: Optional[str] = DEFAULT_CONFIG_TYPE  # config_type of config
    head: Optional[str] = "Default"  # head used to compute the config


Configurations = List[Configuration]

def load_from_h5(
    file_path: str,
    config_type_weights: Dict,
    h5_positions_key: str = 'atXYZ',
    h5_numbers_key: str = "atNUM",
    h5_energy_key: str = 'ePBE0+MBD',
    h5_forces_key: str = 'totFOR',
) -> Tuple[Dict[int, float], Configurations]:
    atoms_list = atoms_from_hdf5(
        file_path, 
        positions_key=h5_positions_key,
        numbers_key=h5_numbers_key,
        energy_key=h5_energy_key,
        forces_key=h5_forces_key
    )


    atomic_energies_dict = {}

    heads = set()
    for atoms in atoms_list:
        heads.add(atoms.info.get("head", "Default"))
    heads = list(heads)

    configs = config_from_atoms_list(
        atoms_list,
        config_type_weights=config_type_weights,
        energy_key="energy",
        forces_key="forces",
        stress_key="stress",
        virials_key="virials",
        dipole_key="dipole",
        charges_key="charges",
        head_key="head",
    )
    return atomic_energies_dict, configs, heads

def atoms_from_hdf5_ani(file_path, positions_key='coordinates', numbers_key="species", energy_key='energies', forces_key='forces'):
    file_path = '/mnt/petrelfs/linchen/FoundationalModel/data/ani-2x/final_h5/ANI-2x-wB97X-631Gd.h5'
    h5 = h5py.File(file_path, "r")
    atoms_list = []
    for num_atoms, properties in tqdm(h5.items()):        #Iterate thorugh like a dictionary
        coordinates = np.array(properties['coordinates'])     #Output of properties is of type h5py Dataset
        species = np.array(properties['species'])
        energies = np.array(properties['energies'])
        forces = np.array(properties['forces'])
        for c, s, e, f in zip(coordinates, species, energies, forces):
            atoms = ase.Atoms(positions=c, numbers=s)
            atoms.info['energy'] = e * ase.units.Hartree # convert to eV
            atoms.arrays['forces'] = f  * ase.units.Hartree # convert to eV
            atoms_list.append(atoms)
    return atoms_list

def atoms_from_hdf5(file_path, positions_key='atXYZ', numbers_key="atNUM", energy_key='ePBE0+MBD', forces_key='totFOR'):
    aqm = h5py.File(file_path, "r")
    atoms_list = []
    AQMmol_ids = list(aqm.keys())

    for molid in AQMmol_ids:
        AQMconf_ids = list(aqm[molid].keys())
        for confid in AQMconf_ids:
            curr_cfg = aqm[molid][confid]
            atoms = ase.Atoms(positions=np.array(curr_cfg[positions_key]), numbers=np.array(curr_cfg[numbers_key]))
            atoms.info['energy'] = np.array(curr_cfg[energy_key]).item()
            atoms.arrays['forces'] = np.array(curr_cfg[forces_key])
            # atoms.info['head'] = "Default"
            atoms_list.append(atoms)

    return atoms_list


In [30]:
import ase
al = atoms_from_hdf5_ani("shit")

  0%|          | 0/59 [00:00<?, ?it/s]


In [34]:
al[0].arrays


{'numbers': array([6, 8]),
 'positions': array([[ 0.        ,  0.        , -0.66948843],
        [ 0.        ,  0.        ,  0.50212127]]),
 'forces': array([[-0.        , -0.        ,  4.07651101],
        [-0.        , -0.        , -4.07651101]])}

In [7]:
import ase
from ase.io import extxyz

identifier = "/mnt/petrelfs/linchen/FoundationalModel/data/oc20/s2ef_train_2M/s2ef_train_2M/18.extxyz.xz"
data_iter = ase.io.iread(f"{identifier}", index=":")

In [9]:
data = next(data_iter)

KeyboardInterrupt: 

In [22]:
import os
import lzma
from tqdm import tqdm

# Directory containing the .xz files
directory = '/mnt/petrelfs/linchen/FoundationalModel/data/oc20/s2ef_train_2M/s2ef_train_2M'

# Decompress each .xz file in the directory
for filename in tqdm(os.listdir(directory)):
    if filename.endswith(".xz"):
        file_path = os.path.join(directory, filename)
        with lzma.open(file_path) as compressed_file:
            decompressed_data = compressed_file.read()
        output_file_path = os.path.splitext(file_path)[0]
        with open(output_file_path, 'wb') as output_file:
            output_file.write(decompressed_data)
        os.remove(file_path)



  0%|          | 0/800 [00:00<?, ?it/s]

100%|██████████| 800/800 [14:47<00:00,  1.11s/it]


In [11]:
import ase
from ase.io import extxyz

identifier = "/mnt/petrelfs/linchen/FoundationalModel/data/oc20/s2ef_train_2M/s2ef_train_2M/181.extxyz"
datas = ase.io.read(f"{identifier}", index=":")

In [23]:
type(datas)

list

In [26]:
datas[0].info['energy']
datas[0].arrays['forces']
datas[0].numbers

array([40, 40, 40, 40, 40, 40, 40, 40, 40, 40, 40, 40, 40, 40, 40, 40, 40,
       40, 40, 40, 40, 40, 40, 40, 40, 40, 40, 40, 40, 40, 40, 40, 40, 40,
       40, 40, 40, 40, 40, 40, 40, 40, 40, 40, 40, 40, 40, 40, 40, 40, 40,
       40, 40, 40, 40, 40, 40, 40, 40, 40, 50, 50, 50, 50, 50, 50, 50, 50,
       50, 50, 50, 50, 50, 50, 50, 50, 50, 50, 50, 50, 50, 50, 50, 50, 50,
       50, 50, 50, 50, 50, 50, 50, 50, 50, 50, 50,  6,  6,  1,  1,  1,  8,
        8])

In [28]:
def atoms_from_oc20(file_path, positions_key='coordinates', numbers_key="species", energy_key='energies', forces_key='forces'):
    atoms_list = []
    for filename in tqdm(os.listdir(file_path)):
        if filename.endswith(".extxyz"):
            identifier = os.path.join(file_path, filename)
            atoms_list += ase.io.read(identifier, index=":")

    return atoms_list

In [29]:
atom_list = atoms_from_oc20("/mnt/petrelfs/linchen/FoundationalModel/data/oc20/s2ef_train_2M/s2ef_train_2M")

  2%|▎         | 20/800 [01:44<1:08:05,  5.24s/it]


KeyboardInterrupt: 

In [3]:
import os
from tqdm import tqdm
import ase.io
from multiprocessing import Pool, cpu_count



def atoms_from_oc20(file_path):
    filenames = [f for f in os.listdir(file_path) if f.endswith(".extxyz")]
    identifiers = [os.path.join(file_path, f) for f in filenames if f.endswith(".extxyz")]

    def read_atoms_file(identifier):
        return ase.io.read(identifier, index=":")
    
    with Pool(16) as pool:
        results = list(tqdm(pool.imap(read_atoms_file, identifiers), total=len(identifiers)))
    
    # Flatten the list of lists
    atoms_list = [atom for sublist in results for atom in sublist]
    
    return atoms_list

In [4]:
atom_list = atoms_from_oc20("/mnt/petrelfs/linchen/FoundationalModel/data/oc20/s2ef_train_2M/s2ef_train_2M")

  0%|          | 0/400 [00:00<?, ?it/s]

100%|██████████| 400/400 [15:55<00:00,  2.39s/it]


In [5]:
atom_list[0]

Atoms(symbols='C2H4OZn48Zr16', pbc=True, cell=[[11.91010939, 0.0, 0.0], [5.95505469, 10.31445729, 4.43373582], [0.0, 0.0, 31.03615074]], forces=..., tags=..., constraint=FixAtoms(indices=[0, 1, 2, 3, 4, 8, 9, 10, 11, 12, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 29, 31, 33, 36, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 53, 55, 57, 60]), calculator=SinglePointCalculator(...))