In [1]:
from aiida_workgraph import task
from aiida import orm

# define add task
@task()
def add(x, y):
   return x + y

# define multiply calcfunction task
@task.calcfunction()
def multiply(x, y):
   return orm.Float(x + y)

# export the task to html file so that it can be visualized in a browser
add.task()

# visualize the task in jupyter-notebook
# add.task()

NodeGraphWidget(settings={'minmap': False}, style={'width': '80%', 'height': '600px'}, value={'name': 'add0', …

In [2]:
add1 = add.task()
print("Inputs:", add1.inputs, type(add1.inputs))
print("Inputs:", add1.inputs.keys())
print("Outputs:", add1.outputs.keys())


Inputs: InputCollection(node = "add0", sockets = ["x", "y", "_wait"]) <class 'aiida_workgraph.collection.WorkGraphInputSocketCollection'>
Inputs: ['x', 'y', '_wait']
Outputs: ['result', '_wait', '_outputs']


In [3]:
from aiida_workgraph import WorkGraph, task
wg = WorkGraph(name="my_first_workgraph")

# define add calcfunction task
@task.calcfunction()
def add(x, y):
   return x + y

add1 = wg.add_task(add, name="add1")
add2 = wg.add_task(add, name="add2", x=add1.outputs["result"])

# wg.add_link(add1.outputs["result"], add2.inputs["x"])
wg

NodeGraphWidget(settings={'minimap': True}, style={'width': '90%', 'height': '600px'}, value={'name': 'my_firs…

In [4]:
from aiida_workgraph import task


@task(
    outputs=[
        {"identifier": "Any", "name": "sum"},
        {"identifier": "Any", "name": "diff"},
    ]
)
def add_minus(x, y):
    return {"sum": x + y, "difference": x - y}


print("Input ports: ", add_minus.task().inputs.keys())
print("Ouput ports: ", add_minus.task().outputs.keys())
add_minus.task().to_html()

Input ports:  ['x', 'y', '_wait']
Ouput ports:  ['sum', 'diff', '_wait', '_outputs']


In [5]:
from aiida_workgraph import task

@task.calcfunction()
def add(x: int, y: float) -> float:
   return x + y

for input in add.task().inputs:
   print("{:30s}: {:20s}".format(input.name, input.identifier))

metadata                      : Namespace           
metadata.store_provenance     : Any                 
metadata.description          : Any                 
metadata.label                : Any                 
metadata.call_link_label      : Any                 
metadata.disable_cache        : Any                 
x                             : AiiDAInt            
y                             : AiiDAFloat          
_wait                         : Any                 


In [6]:
from aiida_workgraph import task
from aiida import orm

@task.calcfunction(
    inputs=[[orm.Int, "x"], [orm.Float, "y"]],
    outputs=[[orm.Float, "result"]]
)
def add(x, y):
    result = x + y
    return result

TypeError: list indices must be integers or slices, not str

In [None]:
%load_ext aiida
%aiida

The aiida extension is already loaded. To reload it, use:
  %reload_ext aiida


In [None]:
from aiida.engine import calcfunction

@calcfunction
def add(x, y):
    return x + y

@calcfunction
def multiply(x, y):
    return x * y

In [8]:
from aiida_workgraph import WorkGraph

wg = WorkGraph("add_multiply_workflow")
wg.add_task(add, name="add1")
wg.add_task(multiply, name="multiply1", x = wg.tasks["add1"].outputs.result)
# export the workgraph to html file so that it can be visualized in a browser
wg
# visualize the workgraph in jupyter-notebook
# wg

NodeGraphWidget(settings={'minimap': True}, style={'width': '90%', 'height': '600px'}, value={'name': 'add_mul…

In [21]:
from aiida_workgraph import WorkGraph

wg = WorkGraph("add_multiply_workflow")
task_of_add = wg.add_task(add, name="add1")
# wg.add_task(multiply, name="multiply1", x=task_of_add.outputs["result"])
# wg.add_task(multiply, name="multiply1", x=task_of_add.outputs.)
# wg.add_task(multiply, name="multiply1", x=wg.tasks['add1'].outputs["result"])
# export the workgraph to html file so that it can be visualized in a browser
wg
# visualize the workgraph in jupyter-notebook
# wg

SyntaxError: invalid syntax (3304946348.py, line 6)

In [24]:
wg = WorkGraph()
add_task = wg.add_task(add, name="add1", x=0)
multiply_task = wg.add_task(multiply, name="multiply1")
wg.add_link(add_task.outputs.result, multiply_task.inputs.x)


NodeLink(from="add1.result", to="multiply1.x")

In [25]:
from aiida_workgraph import WorkGraph
from aiida.calculations.arithmetic.add import ArithmeticAddCalculation

wg = WorkGraph("test_calcjob")
new = wg.add_task
new(ArithmeticAddCalculation, name="add1")
wg.add_task(ArithmeticAddCalculation, name="add2", x=wg.tasks["add1"].outputs["sum"])

NodeGraphWidget(settings={'minmap': False}, style={'width': '80%', 'height': '600px'}, value={'name': 'add2', …

In [26]:
from aiida_workgraph import WorkGraph
from aiida.engine import calcfunction
from aiida_quantumespresso.calculations.pw import PwCalculation


@calcfunction
def atomization_energy(output_atom, output_mol):
    from aiida.orm import Float

    e = output_atom["energy"] * output_mol["number_of_atoms"] - output_mol["energy"]
    return Float(e)


wg = WorkGraph("atomization_energy")
pw_atom = wg.add_task(PwCalculation, name="pw_atom")
pw_mol = wg.add_task(PwCalculation, name="pw_mol")
# create the task to calculate the atomization energy
wg.add_task(
    atomization_energy,
    name="atomization_energy",
    output_atom=pw_atom.outputs["output_parameters"],
    output_mol=pw_mol.outputs["output_parameters"],
)

wg

NodeGraphWidget(settings={'minimap': True}, style={'width': '90%', 'height': '600px'}, value={'name': 'atomiza…

In [29]:
# Create a WorkGraph which is dynamically generated based on the input
# then we output the result of from the context
from aiida_workgraph import task

@task.graph_builder(outputs = [{"name": "result", "from": "context.result"}])
def add_multiply_if_generator(x, y):
    wg = WorkGraph()
    if x.value > 0:
        add1 = wg.add_task(add, name="add1", x=x, y=y)
        # export the result of add1 to the context, so that context.result = add1.results
        add1.set_context({"result": "result"})
    else:
        multiply1 = wg.add_task(multiply, name="multiply1", x=x, y=y)
        # export the result of multiply1 to the context
        multiply1.set_context({"result": "result"})
    return wg

wg = WorkGraph("if_task")
wg.add_task(add, name="add1")
wg.add_task(add_multiply_if_generator, name="add_multiply_if1", x = wg.tasks["add1"].outputs["result"])
wg.add_task(add, name="add2", x = wg.tasks["add_multiply_if1"].outputs["result"])
# wg

NodeGraphWidget(settings={'minmap': False}, style={'width': '80%', 'height': '600px'}, value={'name': 'add2', …

In [30]:
wg.submit(
    inputs={
        "add1": {"x": 1, "y": 2},
        "add_multiply_if1": {"y": 2},
        "add2": {"y": 2},
    },
    wait=True,
)
# ------------------------- Print the output -------------------------
assert wg.tasks["add2"].outputs["result"].value == 7
print("\nResult of add2 is {} \n\n".format(wg.tasks["add2"].outputs["result"].value))

WorkGraph process created, PK: 1671

Result of add2 is uuid: a01f08aa-e308-464a-bf27-01cb02c50f10 (pk: 1683) value: 7 




In [32]:
from aiida import orm
from aiida_workgraph import task

# explicitly define the output socket name to match the return value of the function
@task.calcfunction(outputs=[{"name": "structures"}])
def scale_structure(structure, scales):
    """Scale the structure by the given scales."""
    atoms = structure.get_ase()
    structures = {}
    for i in range(len(scales)):
        atoms1 = atoms.copy()
        atoms1.set_cell(atoms.cell * scales[i], scale_atoms=True)
        structure = orm.StructureData(ase=atoms1)
        structures[f"s_{i}"] = structure
    return {"structures": structures}

# Output result from context to the output socket
@task.graph_builder(outputs=[{"name": "result", "from": "context.result"}])
def all_scf(structures, scf_inputs):
    """Run the scf calculation for each structure."""
    from aiida_workgraph import WorkGraph
    from aiida_quantumespresso.calculations.pw import PwCalculation
    wg = WorkGraph()
    for key, structure in structures.items():
        pw1 = wg.add_task(PwCalculation, name=f"pw1_{key}", structure=structure)
        pw1.set(scf_inputs)
        # save the output parameters to the context
        pw1.set_context({"output_parameters": f"result.{key}"})
    return wg


@task.calcfunction()
# because this is a calcfunction, and the input datas are dynamic, we need use **datas.
def eos(**datas):
    """Fit the EOS of the data."""
    from ase.eos import EquationOfState

    volumes = []
    energies = []
    for _, data in datas.items():
        volumes.append(data.dict.volume)
        energies.append(data.dict.energy)
        unit = data.dict.energy_units
    #
    eos = EquationOfState(volumes, energies)
    v0, e0, B = eos.fit()
    eos = orm.Dict({"unit": unit, "v0": v0, "e0": e0, "B": B})
    return eos

from aiida_workgraph import WorkGraph

wg = WorkGraph("eos")
scale_structure1 = wg.add_task(scale_structure, name="scale_structure1")
all_scf1 = wg.add_task(all_scf, name="all_scf1", structures=scale_structure1.outputs["structures"])
eos1 = wg.add_task(eos, name="eos1", datas=all_scf1.outputs["result"])

from aiida_workgraph import WorkGraph, task
from aiida_quantumespresso.calculations.pw import PwCalculation

@task.graph_builder(outputs=[{"name": "result", "from": "eos1.result"}])
def eos_workgraph(structure=None, scales=None, scf_inputs=None):
    wg = WorkGraph("eos")
    scale_structure1 = wg.add_task(scale_structure, name="scale_structure1", structure=structure, scales=scales)
    all_scf1 = wg.add_task(all_scf, name="all_scf1", scf_inputs=scf_inputs)
    eos1 = wg.add_task(eos, name="eos1")
    wg.add_link(scale_structure1.outputs["structures"], all_scf1.inputs["structures"])
    wg.add_link(all_scf1.outputs["result"], eos1.inputs["datas"])
    return wg


#-------------------------------------------------------
wg = WorkGraph("relax_eos")
relax_task = wg.add_task(PwCalculation, name="relax1")
eos_wg_task = wg.add_task(eos_workgraph, name="eos1", structure=relax_task.outputs["output_structure"])
wg

NodeGraphWidget(settings={'minimap': True}, style={'width': '90%', 'height': '600px'}, value={'name': 'relax_e…