In [13]:
import typing as t
from enum import Enum
from pathlib import Path
from tempfile import TemporaryDirectory

from aiida import engine, load_profile, orm
from aiida_workgraph import dynamic, namespace, shelljob, task
from aiida_workgraph.utils import get_dict_from_builder
from ase import Atoms

_ = load_profile()

In [None]:
from aiida_common_workflows.plugins import WorkflowFactory
from aiida_common_workflows.workflows.pp.workchain import CommonPostProcessWorkChain
from aiida_common_workflows.workflows.relax.workchain import CommonRelaxWorkChain

CommonRelaxWorkChain._process_class = engine.Process
CommonPostProcessWorkChain._process_class = engine.Process

In [None]:

class AfmCase(Enum):
    EMPIRICAL = "empirical"
    HARTREE = "hartree"
    HARTREE_RHO = "hartree_rho"


@task
def write_afm_params(params: dict) -> orm.SinglefileData:
    with TemporaryDirectory() as tmpdir:
        afm_filepath = Path(tmpdir) / "params.ini"
        with open(afm_filepath, "w") as config_file:
            for key, value in params.items():
                if isinstance(value, (list, tuple)):
                    value = " ".join(map(str, value))
                config_file.write(f"{key} {value}\n")
        return orm.SinglefileData(file=afm_filepath.as_posix())


@task
def write_structure_file(structure: Atoms, filename: str) -> orm.SinglefileData:
    with TemporaryDirectory() as tmpdir:
        geom_filepath = Path(tmpdir) / filename
        structure.write(geom_filepath, format="xyz")
        return orm.SinglefileData(file=geom_filepath.as_posix())


@task.graph
def DftJob(
    engine: str,
    structure: orm.StructureData,
    parameters: t.Annotated[
        dict,
        namespace(
            engines=namespace(
                relax=namespace(
                    code=orm.Code,
                    options=dict,
                ),
            ),
            protocol=str,
            relax_type=str,
        ),
    ],
) -> t.Annotated[
    dict,
    task(CommonRelaxWorkChain).outputs,
]:
    workflow = WorkflowFactory(f"common_workflows.relax.{engine.value}")
    input_generator = workflow.get_input_generator()
    parameters["protocol"] = parameters["protocol"].value
    parameters["relax_type"] = parameters["relax_type"].value
    parameters["engines"]["relax"]["options"] = parameters["engines"]["relax"]["options"].value
    builder = input_generator.get_builder(structure=structure, **parameters)
    return task(builder._process_class)(**get_dict_from_builder(builder))


@task.graph
def PpJob(
    engine: str,
    parent_folder: orm.RemoteData,
    parameters: t.Annotated[
        dict,
        namespace(
            engines=namespace(
                pp=namespace(
                    code=orm.Code,
                    options=dict,
                ),
            ),
            quantity=str,
        ),
    ],
) -> t.Annotated[
    dict,
    task(CommonPostProcessWorkChain).outputs,
]:
    workflow = WorkflowFactory(f"common_workflows.pp.{engine.value}")
    input_generator = workflow.get_input_generator()
    parameters["quantity"] = parameters["quantity"].value
    parameters["engines"]["pp"]["options"] = parameters["engines"]["pp"]["options"].value
    builder = input_generator.get_builder(parent_folder=parent_folder, **parameters)
    return task(builder._process_class)(**get_dict_from_builder(builder))


In [None]:
@task.graph
def AfmWorkflow(
    engine: str,
    case: AfmCase,
    structure: orm.StructureData,
    afm_params: dict,
    relax: bool = False,
    dft_params: t.Annotated[
        dict[str, dict],
        namespace(
            geom=t.Annotated[
                dict,
                DftJob.inputs.parameters,
            ],
            tip=t.Annotated[
                dict,
                DftJob.inputs.parameters,
            ],
        ),
    ] = None,
    pp_params: t.Annotated[
        dict[str, dict],
        namespace(
            hartree_potential=t.Annotated[
                dict,
                PpJob.inputs.parameters,
            ],
            charge_density=t.Annotated[
                dict,
                PpJob.inputs.parameters,
            ],
        ),
    ] = None,
    tip: orm.StructureData = None,
) -> t.Annotated[dict, dynamic(t.Any)]:
    """AFM simulation workflow."""
    if relax:
        assert dft_params, "Missing DFT parameters"
        geom_dft_params = dft_params.get("geom", {})
        dft_job = DftJob(
            engine=engine,
            structure=structure,
            parameters=geom_dft_params,
        )
        structure = dft_job.relaxed_structure
    else:
        assert structure, "Missing structure"

    geometry_file = write_structure_file(structure, "geo.xyz").result

    assert afm_params, "Missing AFM parameters"
    afm_params_file = write_afm_params(params=afm_params).result

    ljff = shelljob(
        command="ppafm-generate-ljff",
        nodes={
            "geometry": geometry_file,
            "parameters": afm_params_file,
        },
        arguments=[
            "-i",
            "geo.xyz",
            "-f",
            "npy",
        ],
        outputs=["FFLJ.npz"],
    )

    scan_nodes = {
        "parameters": afm_params_file,
        "ljff_data": ljff.FFLJ_npz,
    }

    metadata = {
        "options": {
            "use_symlinks": True,
        }
    }

    if case != AfmCase.EMPIRICAL.name:
        if not relax:
            assert dft_params, "Missing DFT parameters"
            geom_dft_params = dft_params.get("geom", {})
            geom_dft_params["relax_type"] = orm.Str("none")
            dft_job = DftJob(
                engine=engine,
                structure=structure,
                parameters=geom_dft_params,
            )

        assert pp_params, "Missing post-processing parameters"
        hartree_params = pp_params.get("hartree_potential", {})
        assert hartree_params, "Missing Hartree potential post-processing parameters"
        hartree_task = PpJob(
            engine=engine,
            parent_folder=dft_job.remote_folder,
            parameters=hartree_params,
        )

        if case == AfmCase.HARTREE.name:
            elff = shelljob(
                command="ppafm-generate-elff",
                metadata=metadata,
                nodes={
                    "parameters": afm_params_file,
                    "ljff_data": ljff.FFLJ_npz,
                    "hartree_data": hartree_task.remote_folder,
                },
                filenames={
                    "hartree_data": "hartree",
                },
                arguments=[
                    "-i",
                    "hartree/aiida.fileout",
                    "-F",
                    "cube",
                    "-f",
                    "npy",
                ],
                outputs=["FFel.npz"],
            )

            scan_nodes["elff_data"] = elff.FFel_npz

        # Experimental feature, not fully tested
        elif case == AfmCase.HARTREE_RHO.name:
            charge_params = pp_params.get("charge_density", {})
            assert charge_params, "Missing charge density post-processing parameters"
            rho_job = PpJob(
                engine=engine,
                parent_folder=dft_job.remote_folder,
                parameters=charge_params,
            )

            assert tip, "Missing tip structure"
            tip_dft_params = dft_params.get("tip", {})
            assert tip_dft_params, "Missing tip DFT parameters"
            tip_dft_job = DftJob(
                engine=engine,
                structure=tip,
                parameters=tip_dft_params,
            )

            tip_rho_job = PpJob(
                engine=engine,
                parent_folder=tip_dft_job.remote_folder,
                parameters=charge_params,
            )

            conv_rho = shelljob(
                command="ppafm-conv-rho",
                nodes={
                    "geom_density": rho_job.remote_folder,
                    "tip_density": tip_rho_job.remote_folder,
                },
                filenames={
                    "geom_density": "structure",
                    "tip_density": "tip",
                },
                arguments=[
                    "-s",
                    "structure/aiida.fileout",
                    "-t",
                    "tip/aiida.fileout",
                    "-B",
                    "1.0",
                    "-E",
                ],
            )

            charge_elff = shelljob(
                command="ppafm-generate-elff",
                nodes={
                    "conv_rho_data": conv_rho.remote_folder,
                    "hartree_data": hartree_task.remote_folder,
                    "tip_density": tip_rho_job.remote_folder,
                },
                filenames={
                    "conv_rho_data": "conv_rho",
                    "hartree_data": "hartree",
                    "tip_density": "tip",
                },
                arguments=[
                    "-i",
                    "hartree/aiida.fileout",
                    "-tip-dens",
                    "tip/aiida.fileout",
                    "--Rcode",
                    "0.7",
                    "-E",
                    "--doDensity",
                ],
                outputs=["FFel.npz"],
            )

            dftd3 = shelljob(
                command="ppafm-generate-dftd3",
                nodes={
                    "hartree_data": hartree_task.remote_folder,
                },
                filenames={
                    "hartree_data": "hartree",
                },
                arguments=[
                    "-i",
                    "hartree/aiida.fileout",
                    "--df_name",
                    "PBE",
                ],
            )

            elff = shelljob(
                command="ppafm-generate-elff",
                nodes={
                    "hartree_data": hartree_task.remote_folder,
                    "charge_elff_data": charge_elff.FFel_npz,
                },
                filenames={
                    "hartree_data": "hartree",
                },
                arguments=[
                    "-i",
                    "hartree/aiida.fileout",
                    "-f",
                    "npy",
                ],
                outputs=["FFel.npz"],
            )

        else:
            raise ValueError(f"Unsupported case: {case}")

    scan = shelljob(
        command="ppafm-relaxed-scan",
        metadata=metadata,
        nodes=scan_nodes,
        arguments=[
            "-f",
            "npy",
        ],
        outputs=["Q0.00K0.35"],
    )

    results = shelljob(
        command="ppafm-plot-results",
        metadata=metadata,
        nodes={
            "parameters": afm_params_file,
            "scan_dir": scan.Q0_00K0_35,
        },
        filenames={
            "scan_dir": "Q0.00K0.35",
        },
        arguments=[
            "--df",
            "--cbar",
            "--save_df",
            "-f",
            "npy",
        ],
        outputs=["Q0.00K0.35"],
    )

    return results


In [None]:
from aiida_pseudo.groups.family.pseudo import PseudoPotentialFamily

structure = orm.StructureData()
structure.set_cell(
    [
        [14.9412827110, 0.0, 0.0],
        [0.0, 14.5091262213, 0.0],
        [0.0, 0.0, 10.0820001747],
    ]
)
structure.set_pbc((False, False, False))
structure.append_atom(symbols="C", position=(8.2766055186, 6.1177492411, 5.0595246063))
structure.append_atom(symbols="C", position=(8.8582363932, 7.3839854920, 5.0363959612))
structure.append_atom(symbols="C", position=(8.0523006568, 8.5208073256, 5.0178902940))
structure.append_atom(symbols="C", position=(6.6646772714, 8.3913773689, 5.0224782237))
structure.append_atom(symbols="C", position=(6.0830463468, 7.1251409580, 5.0456056988))
structure.append_atom(symbols="C", position=(6.8889822232, 5.9883191444, 5.0641114560))
structure.append_atom(symbols="H", position=(8.9093056269, 5.2253257080, 5.0739448106))
structure.append_atom(symbols="H", position=(9.9475591624, 7.4856249613, 5.0328185108))
structure.append_atom(symbols="H", position=(8.5089059407, 9.5148483491, 4.9998780126))
structure.append_atom(symbols="H", position=(6.0319773431, 9.2838011020, 5.0080584394))
structure.append_atom(symbols="H", position=(4.9937235476, 7.0235011287, 5.0491811392))
structure.append_atom(symbols="H", position=(6.4323774593, 4.9942778709, 5.0821221574))

tip = orm.StructureData()
tip.set_cell(
    [
        [20.0, 0.0, 0.0],
        [0.0, 20.0, 0.0],
        [0.0, 0.0, 20.0],
    ]
)
tip.set_pbc((False, False, False))
tip.append_atom(symbols="C", position=(0.0, 0.0, 1.15))
tip.append_atom(symbols="O", position=(0.0, 0.0, 0.0))

kpoints = orm.KpointsData()
kpoints.set_kpoints_mesh([1, 1, 1])

pseudo_family = t.cast(PseudoPotentialFamily, orm.load_group(4))
C_pp = pseudo_family.get_pseudo("C")
H_pp = pseudo_family.get_pseudo("H")
O_pp = pseudo_family.get_pseudo("O")

In [None]:
afm_params = {
    "PBC": False,
    "tip": "s",
    "klat": 0.3490127886809,
    "krad": 21.913190531846,
    "gridA": [14.9412827110, 0.0000000000, 0.0000000000],
    "gridB": [0.0000000000, 14.5091262213, 0.0000000000],
    "gridC": [0.0000000000, 0.0000000000, 10.0820001747],
    "sigma": 0.7,
    "charge": 0.0,
    "r0Probe": [0.0, 0.0, 2.97],
    "scanMax": [14.9412827110, 14.5091262213, 11],
    "scanMin": [0.0, 0.0, 8],
    "scanStep": [0.1, 0.1, 0.1],
    "Amplitude": 1.4,
    "probeType": "O",
    "f0Cantilever": 22352.5,
    "gridN": [-1, -1, -1],
}

In [None]:
dft_params = {
    "geom": {
        "engines": {
            "relax": {
                "code": orm.load_code("pw-7.4@localhost"),
                "options": {
                    "resources": {
                        "num_machines": 1,
                    },
                    "max_wallclock_seconds": 43200,
                },
            },
        },
        "protocol": "fast",
        "relax_type": "positions",
    },
    "tip": {
        "engines": {
            "relax": {
                "code": orm.load_code("pw-7.4@localhost"),
                "options": {
                    "resources": {
                        "num_machines": 1,
                    },
                    "max_wallclock_seconds": 43200,
                },
            }
        },
        "protocol": "fast",
        "relax_type": "none",
    },
}

In [None]:
pp_params = {
    "hartree_potential": {
        "engines": {
            "pp": {
                "code": orm.load_code("pp-7.4@localhost"),
                "options": {
                    "resources": {
                        "num_machines": 1,
                    },
                    "max_wallclock_seconds": 43200,
                },
            }
        },
        "quantity": "potential",
    },
    "charge_density": {
        "engines": {
            "pp": {
                "code": orm.load_code("pp-7.4@localhost"),
                "options": {
                    "resources": {
                        "num_machines": 1,
                    },
                    "max_wallclock_seconds": 43200,
                },
            }
        },
        "quantity": "charge_density",
    },
}


In [None]:
wg = AfmWorkflow.build(
    engine="quantum_espresso",
    case=AfmCase.HARTREE.name,
    structure=structure,
    afm_params=afm_params,
    relax=False,
    dft_params=dft_params,
    pp_params=pp_params,
    tip=tip,
)

In [None]:
wg

In [None]:
wg.run()

In [None]:
import base64

from IPython.display import HTML

fd: orm.FolderData
if wg.outputs.Q0_00K0_35.value:
    fd = wg.outputs.Q0_00K0_35.value
else:
    node = orm.load_node(wg.pk)
    fd = node.outputs.Q0_00K0_35

imgs = []

png_folder = "Amp1.40"

for obj in fd.list_objects(png_folder):
    if obj.name.endswith(".png"):
        with fd.open(f"{png_folder}/{obj.name}", "rb") as handle:
            data = handle.read()
            data64 = base64.b64encode(data).decode("utf-8")
            imgs.append(f"""
                <img
                    src="data:image/png;base64,{data64}"
                    style="max-width:150px; margin:5px;"
                />
            """)

HTML("".join(imgs))