In this tutorial, you'll learn how to read and export records.

In [1]:
%load_ext autoreload
%autoreload 2

from typing import cast

import os
import dotenv

from tqdm.auto import tqdm

import datamol as dm
import pandas as pd

import zarr

from openff.toolkit import Molecule
import qcelemental as qcel
from qcportal import PortalClient

# Load the secrets env file; the client cell below reads its username and
# password from `os.environ`. The result is bound to `_` - presumably to keep
# the return value out of the cell output.
_ = dotenv.load_dotenv("../../openfractal_test_secrets.env")

## Initialize the client and list the datasets

In [5]:
# Connect to the test server. The credentials are read from environment
# variables (see the `dotenv.load_dotenv` call in the first cell), so a missing
# variable fails here with a `KeyError` instead of a later authentication error.
client = PortalClient(
    address="https://openfractal-test-pgzbs3yryq-uc.a.run.app",
    username=os.environ["OPENFRACTAL_USER_3_USERNAME"],
    password=os.environ["OPENFRACTAL_USER_3_PASSWORD"],
)

# Last expression of the cell: shows the server information as the cell output.
client.server_info

{'name': 'openfractal-test',
 'manager_heartbeat_frequency': 10,
 'manager_heartbeat_max_missed': 5,
 'version': '0.50b11.post13+gc0062725',
 'api_limits': {'get_records': 1000,
  'add_records': 500,
  'get_dataset_entries': 2000,
  'get_molecules': 1000,
  'add_molecules': 1000,
  'get_managers': 1000,
  'manager_tasks_claim': 200,
  'manager_tasks_return': 10,
  'get_server_stats': 25,
  'get_access_logs': 1000,
  'get_error_logs': 100,
  'get_internal_jobs': 1000},
 'client_version_lower_limit': '0.50b11',
 'client_version_upper_limit': '1',
 'manager_version_lower_limit': '0.50b11',
 'manager_version_upper_limit': '1',
 'motd': ''}

Let's list the available datasets.

In [4]:
# Last expression of the cell: the list of datasets is shown as the cell output.
client.list_datasets()

[]

## End-to-end export to Zarr

The function below is an opinionated end-to-end export pipeline. It's very opinionated and probably not optimal, since many of the outputs are not stored as arrays.

Use it as a guide to export and store only the information that is relevant to your use case.

In [None]:
def _record_to_attrs(entry_name, specification_name, record_dict: dict) -> dict:
    """Collect the attributes stored on a specification group for one record."""
    return {
        "entry_name": entry_name,
        "specification_name": specification_name,
        # Record metadata
        "id": record_dict["id"],
        "record_type": record_dict["record_type"],
        "is_service": record_dict["is_service"],
        "extras": record_dict["extras"],
        "status": record_dict["status"],
        "manager_name": record_dict["manager_name"],
        # Timestamps are stored as ISO 8601 strings.
        "created_on": record_dict["created_on"].isoformat(),
        "modified_on": record_dict["modified_on"].isoformat(),
        "owner_user": record_dict["owner_user"],
        "owner_group": record_dict["owner_group"],
        "compute_history_": record_dict["compute_history_"],
        "task_": record_dict["task_"],
        "service_": record_dict["service_"],
        "comments_": record_dict["comments_"],
        "native_files_": record_dict["native_files_"],
        "molecule_id": record_dict["molecule_id"],
        # For now the specification, the wavefunction and the QM properties are
        # saved as attributes as well. This is obviously NOT IDEAL.
        "specification": record_dict["specification"],
        # NOTE(review): assumes the fetched wavefunction can be serialized as a
        # zarr attribute - confirm on a dataset that actually stores wavefunctions.
        "wavefunction_": record_dict.get("wavefunction_"),
        "properties": record_dict["properties"],
    }


def export_dataset_to_zarr(
    client: PortalClient,
    dataset_name: str,
    dataset_type: str,
    output_file: str,
    chunksize: int = 1_000,
    progress: bool = True,
    progress_leave: bool = False,
) -> zarr.Group:
    """Export the completed records of a dataset to a Zarr hierarchy.

    The records are pulled from the server in chunks of `chunksize` entry names
    and written with the following layout:

        /molecules/<molecule_hash>
            attrs: `smiles`, `molecule_hash`
        /molecules/<molecule_hash>/conformations
            the molecule geometry
        /molecules/<molecule_hash>/specifications/<specification_name>
            attrs: record metadata, `specification`, `wavefunction_`, `properties`

    Args:
        client: Client connected to the server hosting the dataset.
        dataset_name: Name of the dataset to export.
        dataset_type: Type of the dataset (for example `"singlepoint"`).
        output_file: Path passed to `zarr.open`. It is opened with mode `"w"`,
            so existing content at that location is replaced.
        chunksize: Number of entry names requested per call to the server.
        progress: Whether to display a progress bar over the chunks.
        progress_leave: Whether to keep the progress bar once the export is done.

    Returns:
        The root group of the export. Its store has already been closed when the
        function returns.

    Raises:
        ValueError: If `chunksize` is lower than 1, or if a specification with the
            same name has already been exported for a molecule.
    """
    # This could be easily parallelized if we want to.
    # Parallelization will not work if using ZIP.

    # Validate before anything is opened: mode "w" below replaces existing output.
    if chunksize < 1:
        raise ValueError(f"`chunksize` must be a positive integer, got {chunksize}.")

    # Get the dataset
    ds = client.get_dataset(dataset_type=dataset_type, dataset_name=dataset_name)

    # Read the entry names once instead of on every chunk.
    entry_names = ds.entry_names

    root = zarr.open(output_file, mode="w")
    root = cast(zarr.Group, root)

    try:
        molecules_group = root.create_group("/molecules")

        for start in tqdm(
            range(0, len(entry_names), chunksize),
            disable=not progress,
            leave=progress_leave,
        ):
            # Pull completed records for those entry names
            chunk_entry_names = entry_names[start : start + chunksize]
            records = ds.iterate_records(
                entry_names=chunk_entry_names, status="complete", force_refetch=True
            )

            for entry_name, specification_name, record in records:
                # These bare attribute accesses are kept for their side effect:
                # they fetch the molecule, the wavefunction and the compute
                # history so that they are part of `record.dict()` below.
                record.molecule
                record.wavefunction
                record.compute_history_  # consider not fetching it

                record_dict = record.dict()

                # Get infos about the molecule (3D system)
                mol = record_dict["molecule_"]
                smiles = mol["extras"][
                    "canonical_isomeric_explicit_hydrogen_mapped_smiles"
                ]
                molecule_hash = mol["identifiers"]["molecule_hash"]
                conformations = mol["geometry"]

                # One group per molecule, shared by all of its specifications.
                molecule_group = molecules_group.require_group(molecule_hash)
                molecule_group.attrs.update(
                    {"smiles": smiles, "molecule_hash": molecule_hash}
                )
                molecule_group["conformations"] = conformations

                specifications_group = molecule_group.require_group("specifications")

                # A specification must only be exported once per molecule.
                if specification_name in specifications_group:
                    raise ValueError(
                        f"The specification '{specification_name}' already exists."
                    )
                specification_group = specifications_group.create_group(
                    specification_name
                )

                # Write all the attributes of the record in a single update.
                specification_group.attrs.update(
                    _record_to_attrs(entry_name, specification_name, record_dict)
                )
    finally:
        # Cleanup (only needed when zip file). Done in `finally` so that the
        # store is also closed when the export fails midway.
        root.store.close()

    return root

In [92]:
# Export next to the notebook: a relative path keeps the tutorial runnable on
# any machine. Change `output_file` to store the export somewhere else.
root = export_dataset_to_zarr(
    client=client,
    dataset_name="dataset_1",
    dataset_type="singlepoint",
    output_file="export_test.zarr",
    chunksize=1_000,
)

  0%|          | 0/1 [00:00<?, ?it/s]