# Imperative Workflow


## Why an “imperative” API?

The default WorkGraph engines ask you to **declare** every task and dependency *before* execution.
The *imperative* engine in **aiida‑workgraph** lets you write an ordinary Python `async` function instead. Each time you call a task inside that function, the engine schedules it immediately and updates the WorkGraph on the fly.

*Benefits*

* **Natural control‑flow** – use native `while`, `for`, `if/else`, exceptions.
* **Incremental graphs** – dependencies are inferred automatically.
* **Rapid prototyping** – no context variables or DSL constructs required.

If you need full static inspection before running, the declarative API is still available; you can even mix both styles in one project.
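The idea that dependencies are inferred automatically can be illustrated with a toy recorder in plain Python. This is only a sketch of the concept: `ToyGraph` and `Placeholder` are hypothetical names invented here and are not part of aiida-workgraph. Calling a task records a node, and passing one task's placeholder to another task records an edge.

```python
from dataclasses import dataclass, field


@dataclass
class Placeholder:
    """Stands in for the not-yet-computed output of a recorded task."""
    task_name: str


@dataclass
class ToyGraph:
    """Hypothetical recorder; not the aiida-workgraph data model."""
    tasks: list = field(default_factory=list)
    edges: list = field(default_factory=list)

    def call(self, name, *args):
        # Calling a task records a node immediately ...
        self.tasks.append(name)
        # ... and every Placeholder argument becomes an incoming edge.
        for arg in args:
            if isinstance(arg, Placeholder):
                self.edges.append((arg.task_name, name))
        return Placeholder(name)


graph = ToyGraph()
a = graph.call("add", 3, 4)
b = graph.call("multiply", 1, a)  # passing `a` implies add -> multiply

print(graph.tasks)  # ['add', 'multiply']
print(graph.edges)  # [('add', 'multiply')]
```

No edge was declared by hand; the graph structure follows from how values are passed between calls.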

## Two mental models for building workflows


| Aspect                                       | *WorkGraph “design-time” zone* | *Imperative async function* |
| :------------------------------------------- | :--------------------------------------------------------------------- | :-------------------------------------------------------------------- |
| How the graph is produced                    | User **declares** every task and edge up-front; the full DAG exists before execution. | Graph **emerges at run time** as normal Python control-flow is evaluated. |
| Can users inspect / validate before running? | Yes – they can traverse, statically analyse, export HTML or visualise the whole DAG.  | Not until the first run (the engine can surface partial DAGs on-the-fly). |
| Expressiveness for loops / branching         | Needs explicit constructs (`active_while_zone`, context vars).         | Natural Python `while`, `for`, `if`; no extra concepts.             |
| Scheduling & re-runs                         | Easy for a scheduler to plan because every task is known.              | Scheduler must cope with tasks appearing late; re-runs need replay or state capture. |
| Cognitive load                               | Higher upfront (DSL, manual dependency wiring).                        | Lower for Python users; they “just write a function”.               |
| Implementation complexity                    | Mostly synchronous; little or no `async` knowledge required.           | Requires `async`/`await` or background threads so that the runner stays non-blocking. |
| Performance knobs                            | Static optimisations (parallelism, caching) are computable before run. | Need adaptive scheduling; late-bound tasks may surprise resource planners. |

## The simplest flow


```python
from aiida_workgraph import task
from aiida_workgraph.engine.imperative.imperative import WorkGraphImperativeEngine
from aiida.engine import run, submit
from aiida import orm, load_profile

load_profile()

@task.pythonjob()
def add(x, y):
    return x + y

@task.pythonjob()
def multiply(x, y):
    return x * y

async def add_multiply(x, y):
    a = add(x, y)
    b = multiply(1, a.result)   # chain directly on the result
    return {"sum": a.result, "product": b.result}

results = run(
    WorkGraphImperativeEngine,
    inputs={"workgraph_data": {
        "name": "add_multiply",
        "flow": add_multiply,
        "function_inputs": {"x": orm.Int(3), "y": orm.Int(4)}
    }},
)
print("=" * 40)
print("\nResults:")
print(results)
```


```text
07/03/2025 04:18:51 PM <193192> aiida.orm.nodes.process.workflow.workchain.WorkChainNode: [REPORT] [138140|WorkGraphImperativeEngine|continue_workgraph]: tasks ready to run: 
07/03/2025 04:18:51 PM <193192> aiida.orm.nodes.process.workflow.workchain.WorkChainNode: [REPORT] [138140|WorkGraphImperativeEngine|on_wait]: Process status: Waiting for child processes: _flow
07/03/2025 04:18:51 PM <193192> aiida.orm.nodes.process.workflow.workchain.WorkChainNode: [REPORT] [138140|WorkGraphImperativeEngine|continue_workgraph]: tasks ready to run: add

kwargs:  {'x': <Int: uuid: f338cac2-39f8-4831-81bd-05b32d852001 (pk: 138138) value: 3>, 'y': <Int: uuid: cc823646-5da9-484e-8e5f-57d72706e771 (pk: 138139) value: 4>}

07/03/2025 04:18:52 PM <193192> aiida.orm.nodes.process.workflow.workchain.WorkChainNode: [REPORT] [138140|WorkGraphImperativeEngine|continue_workgraph]: tasks ready to run: 
07/03/2025 04:18:52 PM <193192> aiida.orm.nodes.process.workflow.workchain.WorkChainNode: [REPORT] [138140|WorkGraphImperativeEngine|update_normal_task_state]: Task: _flow finished.
07/03/2025 04:18:52 PM <193192> aiida.orm.nodes.process.workflow.workchain.WorkChainNode: [REPORT] [138140|WorkGraphImperativeEngine|continue_workgraph]: tasks ready to run: 
07/03/2025 04:18:52 PM <193192> aiida.orm.nodes.process.workflow.workchain.WorkChainNode: [REPORT] [138140|WorkGraphImperativeEngine|on_wait]: Process status: Waiting for child processes: 138144
07/03/2025 04:18:55 PM <193192> aiida.orm.nodes.process.workflow.workchain.WorkChainNode: [REPORT] [138140|WorkGraphImperativeEngine|update_task_state]: Task: add, type: PythonJob, finished.
07/03/2025 04:18:55 PM <193192> aiida.orm.nodes.process.workflow.workchain.WorkCha

kwargs:  {'x': 1, 'y': <Int: uuid: c839c5db-4100-4904-862f-e67a95ea1819 (pk: 138149) value: 7>}

07/03/2025 04:18:56 PM <193192> aiida.orm.nodes.process.workflow.workchain.WorkChainNode: [REPORT] [138140|WorkGraphImperativeEngine|on_wait]: Process status: Waiting for child processes: 138154
07/03/2025 04:18:58 PM <193192> aiida.orm.nodes.process.workflow.workchain.WorkChainNode: [REPORT] [138140|WorkGraphImperativeEngine|update_task_state]: Task: multiply, type: PythonJob, finished.
07/03/2025 04:18:59 PM <193192> aiida.orm.nodes.process.workflow.workchain.WorkChainNode: [REPORT] [138140|WorkGraphImperativeEngine|continue_workgraph]: tasks ready to run: 
07/03/2025 04:18:59 PM <193192> aiida.orm.nodes.process.workflow.workchain.WorkChainNode: [REPORT] [138140|WorkGraphImperativeEngine|finalize]: Finalize workgraph.

graph results: {'sum': {'socket_name': 'result', 'task_name': 'add'}, 'product': {'socket_name': 'result', 'task_name': 'multiply'}}

Results:
{'sum': <Int: uuid: c839c5db-4100-4904-862f-e67a95ea1819 (pk: 138149) value: 7>, 'product': <Int: uuid: cd4f7f61-8070-4d1b-ae0e-d109ca8d22a4 (pk: 138158) value: 7>}
```



### What happens under the hood?

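1. The engine creates a fresh WorkGraph called **add\_multiply** and starts the async function as the internal `_flow` task.
2. `add()` is scheduled; `a` holds a placeholder for its outputs.
3. `multiply()` is registered right away but only starts once its dependency (`a.result`) is available.
4. The async function itself returns as soon as both tasks are registered (the `_flow finished` line in the log). Once every scheduled task has completed, the engine marks the WorkGraph **FINISHED** and `run` returns `results`.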

> **Tip** You can use the GUI to inspect the evolving DAG.
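The ordering described above, where the flow function returns before its tasks finish, can be reproduced with plain `asyncio`. This is a minimal sketch under stated assumptions, not the engine's implementation: `schedule`, `run_task`, and the `events` list are hypothetical helpers, and `asyncio.Task` handles stand in for the output placeholders.

```python
import asyncio

events = []  # chronological trace of what happened


async def run_task(name, func, *args):
    # Resolve inputs first: awaiting an upstream handle is the dependency.
    resolved = [await a if isinstance(a, asyncio.Task) else a for a in args]
    await asyncio.sleep(0.01)  # simulate work
    events.append(f"{name} finished")
    return func(*resolved)


def schedule(name, func, *args):
    # Scheduling happens at call time; the caller gets a handle back at once.
    events.append(f"{name} scheduled")
    return asyncio.create_task(run_task(name, func, *args))


async def flow(x, y):
    a = schedule("add", lambda p, q: p + q, x, y)
    b = schedule("multiply", lambda p, q: p * q, 1, a)
    events.append("flow returned")
    return {"sum": a, "product": b}


async def main():
    handles = await flow(3, 4)
    # The toy "engine" finalizes only after every scheduled task is done.
    return {key: await handle for key, handle in handles.items()}


results = asyncio.run(main())
print(events)
print(results)  # {'sum': 7, 'product': 7}
```

The trace lists both "scheduled" entries and "flow returned" before any "finished" entry, mirroring the log order in which `_flow` finishes ahead of `add` and `multiply`.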

---

## Conditional branches (`if/else`)

```python
from aiida_workgraph.engine.imperative.imperative import wait_for

@task.pythonjob()
def add(x, y):
    import time; time.sleep(2)  # simulate work
    return x + y

@task.pythonjob()
def multiply(x, y):
    import time; time.sleep(2)
    return x * y

async def add_then_branch(x, y):
    a = add(x, y)
    await wait_for(a)           # don't read the result too early

    if a.result.value > 10:
        m = multiply(1, a.result)
    else:
        m = multiply(-1, a.result)

    return {"sum": a.result, "multiply": m.result}

results = run(
    WorkGraphImperativeEngine,
    inputs={"workgraph_data": {
        "name": "add_multiply",
        "flow": add_then_branch,
        "function_inputs": {"x": orm.Int(3), "y": orm.Int(4)}
    }},
)
print("=" * 40)
print("\nResults:")
print(results)
```


```text
07/03/2025 04:19:00 PM <193192> aiida.orm.nodes.process.workflow.workchain.WorkChainNode: [REPORT] [138161|WorkGraphImperativeEngine|continue_workgraph]: tasks ready to run: 
07/03/2025 04:19:00 PM <193192> aiida.orm.nodes.process.workflow.workchain.WorkChainNode: [REPORT] [138161|WorkGraphImperativeEngine|on_wait]: Process status: Waiting for child processes: _flow
07/03/2025 04:19:00 PM <193192> aiida.orm.nodes.process.workflow.workchain.WorkChainNode: [REPORT] [138161|WorkGraphImperativeEngine|continue_workgraph]: tasks ready to run: add

kwargs:  {'x': <Int: uuid: 6c2e1767-30de-4748-a2eb-54e2f474e5c3 (pk: 138159) value: 3>, 'y': <Int: uuid: bca0323f-014b-477a-8245-a46ca27262ea (pk: 138160) value: 4>}
node state: RUNNING
node state: RUNNING

07/03/2025 04:19:07 PM <193192> aiida.orm.nodes.process.workflow.workchain.WorkChainNode: [REPORT] [138161|WorkGraphImperativeEngine|update_task_state]: Task: add, type: PythonJob, finished.
07/03/2025 04:19:07 PM <193192> aiida.orm.nodes.process.workflow.workchain.WorkChainNode: [REPORT] [138161|WorkGraphImperativeEngine|continue_workgraph]: tasks ready to run: 
07/03/2025 04:19:07 PM <193192> aiida.orm.nodes.process.workflow.workchain.WorkChainNode: [REPORT] [138161|WorkGraphImperativeEngine|on_wait]: Process status: Waiting for child processes: _flow
07/03/2025 04:19:12 PM <193192> aiida.orm.nodes.process.workflow.workchain.WorkChainNode: [REPORT] [138161|WorkGraphImperativeEngine|continue_workgraph]: tasks ready to run: multiply

kwargs:  {'x': -1, 'y': <Int: uuid: 0273d040-a2a2-42e0-8a3d-7217716b8023 (pk: 138169) value: 7>}

07/03/2025 04:19:13 PM <193192> aiida.orm.nodes.process.workflow.workchain.WorkChainNode: [REPORT] [138161|WorkGraphImperativeEngine|update_normal_task_state]: Task: _flow finished.
07/03/2025 04:19:13 PM <193192> aiida.orm.nodes.process.workflow.workchain.WorkChainNode: [REPORT] [138161|WorkGraphImperativeEngine|continue_workgraph]: tasks ready to run: 
07/03/2025 04:19:13 PM <193192> aiida.orm.nodes.process.workflow.workchain.WorkChainNode: [REPORT] [138161|WorkGraphImperativeEngine|on_wait]: Process status: Waiting for child processes: 138175
07/03/2025 04:19:19 PM <193192> aiida.orm.nodes.process.workflow.workchain.WorkChainNode: [REPORT] [138161|WorkGraphImperativeEngine|update_task_state]: Task: multiply, type: PythonJob, finished.
07/03/2025 04:19:19 PM <193192> aiida.orm.nodes.process.workflow.workchain.WorkChainNode: [REPORT] [138161|WorkGraphImperativeEngine|continue_workgraph]: tasks ready to run: 
07/03/2025 04:19:20 PM <193192> aiida.orm.nodes.process.workflow.workchain.Wo

graph results: {'sum': {'socket_name': 'result', 'task_name': 'add'}, 'multiply': {'socket_name': 'result', 'task_name': 'multiply'}}

Results:
{'sum': <Int: uuid: 0273d040-a2a2-42e0-8a3d-7217716b8023 (pk: 138169) value: 7>, 'multiply': <Int: uuid: 1ec7ae31-6dd2-47f6-ad6b-bd700c5c809f (pk: 138179) value: -7>}
```



The `if` statement is ordinary Python; the engine only schedules the branch that is actually taken.  The skipped branch never appears in the WorkGraph.
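The same behaviour can be mimicked without AiiDA to see why the skipped branch leaves no trace. The sketch below is a plain `asyncio` analogy; `toy_task` and the `scheduled` list are hypothetical and only record which tasks were ever created.

```python
import asyncio

scheduled = []  # names of the tasks that were actually created


async def toy_task(name, func, *args):
    scheduled.append(name)
    await asyncio.sleep(0)  # yield control, as a real job would
    return func(*args)


async def add_then_branch(x, y):
    total = await toy_task("add", lambda p, q: p + q, x, y)
    # The concrete value is known here, so a plain `if` can decide.
    if total > 10:
        product = await toy_task("multiply_by_1", lambda p, q: p * q, 1, total)
    else:
        product = await toy_task("multiply_by_minus_1", lambda p, q: p * q, -1, total)
    return {"sum": total, "multiply": product}


result = asyncio.run(add_then_branch(3, 4))
print(result)     # {'sum': 7, 'multiply': -7}
print(scheduled)  # ['add', 'multiply_by_minus_1']
```

Because `3 + 4` is not greater than 10, only the `else` branch creates a task, matching the `-7` result in the run above.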


## Loops (`while`)


```python
from aiida_workgraph import task
from aiida_workgraph.engine.imperative.imperative import WorkGraphImperativeEngine, wait_for
from aiida.engine import run, submit
from aiida import orm, load_profile

load_profile()

@task.pythonjob()
def add(x, y):
    return x + y

@task.pythonjob()
def multiply(x, y):
    return x * y


async def keep_doubling(x, y):
    outputs = add(x, y)
    await wait_for(outputs)

    while outputs.result.value < 10:
        outputs1 = add(outputs.result, 1)
        outputs = multiply(outputs1.result, 2)
        await wait_for(outputs)     # make sure we can read new value

    return {"sum": outputs.result}

results = run(
    WorkGraphImperativeEngine,
    inputs={"workgraph_data": {
        "name": "keep_doubling",
        "flow": keep_doubling,
        "function_inputs": {"x": orm.Int(3), "y": orm.Int(4)}
    }},
)
print("=" * 40)
print("\nResults:")
print(results)
```

```text
07/03/2025 04:19:21 PM <193192> aiida.orm.nodes.process.workflow.workchain.WorkChainNode: [REPORT] [138182|WorkGraphImperativeEngine|continue_workgraph]: tasks ready to run: 
07/03/2025 04:19:21 PM <193192> aiida.orm.nodes.process.workflow.workchain.WorkChainNode: [REPORT] [138182|WorkGraphImperativeEngine|on_wait]: Process status: Waiting for child processes: _flow
07/03/2025 04:19:21 PM <193192> aiida.orm.nodes.process.workflow.workchain.WorkChainNode: [REPORT] [138182|WorkGraphImperativeEngine|continue_workgraph]: tasks ready to run: add

kwargs:  {'x': <Int: uuid: 368ff873-6e77-4f3c-b64f-442590df9081 (pk: 138180) value: 3>, 'y': <Int: uuid: 6b4245dd-2814-409c-a4b6-f7b0550b9a28 (pk: 138181) value: 4>}
node state: RUNNING

07/03/2025 04:19:24 PM <193192> aiida.orm.nodes.process.workflow.workchain.WorkChainNode: [REPORT] [138182|WorkGraphImperativeEngine|update_task_state]: Task: add, type: PythonJob, finished.
07/03/2025 04:19:24 PM <193192> aiida.orm.nodes.process.workflow.workchain.WorkChainNode: [REPORT] [138182|WorkGraphImperativeEngine|continue_workgraph]: tasks ready to run: 
07/03/2025 04:19:24 PM <193192> aiida.orm.nodes.process.workflow.workchain.WorkChainNode: [REPORT] [138182|WorkGraphImperativeEngine|on_wait]: Process status: Waiting for child processes: _flow
07/03/2025 04:19:27 PM <193192> aiida.orm.nodes.process.workflow.workchain.WorkChainNode: [REPORT] [138182|WorkGraphImperativeEngine|continue_workgraph]: tasks ready to run: add1

kwargs:  {'x': <Int: uuid: 4346f59d-d5c8-4750-980a-69896fb672fa (pk: 138190) value: 7>, 'y': 1}

07/03/2025 04:19:28 PM <193192> aiida.orm.nodes.process.workflow.workchain.WorkChainNode: [REPORT] [138182|WorkGraphImperativeEngine|continue_workgraph]: tasks ready to run: 

node state: PLANNED

07/03/2025 04:19:31 PM <193192> aiida.orm.nodes.process.workflow.workchain.WorkChainNode: [REPORT] [138182|WorkGraphImperativeEngine|update_task_state]: Task: add1, type: PythonJob, finished.
07/03/2025 04:19:31 PM <193192> aiida.orm.nodes.process.workflow.workchain.WorkChainNode: [REPORT] [138182|WorkGraphImperativeEngine|continue_workgraph]: tasks ready to run: multiply

kwargs:  {'x': <Int: uuid: 24016f19-6810-475f-8af1-aaf75d44d6d6 (pk: 138202) value: 8>, 'y': 2}

07/03/2025 04:19:32 PM <193192> aiida.orm.nodes.process.workflow.workchain.WorkChainNode: [REPORT] [138182|WorkGraphImperativeEngine|on_wait]: Process status: Waiting for child processes: _flow, 138207

node state: RUNNING

07/03/2025 04:19:35 PM <193192> aiida.orm.nodes.process.workflow.workchain.WorkChainNode: [REPORT] [138182|WorkGraphImperativeEngine|update_task_state]: Task: multiply, type: PythonJob, finished.
07/03/2025 04:19:35 PM <193192> aiida.orm.nodes.process.workflow.workchain.WorkChainNode: [REPORT] [138182|WorkGraphImperativeEngine|continue_workgraph]: tasks ready to run: 
07/03/2025 04:19:35 PM <193192> aiida.orm.nodes.process.workflow.workchain.WorkChainNode: [REPORT] [138182|WorkGraphImperativeEngine|on_wait]: Process status: Waiting for child processes: _flow
07/03/2025 04:19:39 PM <193192> aiida.orm.nodes.process.workflow.workchain.WorkChainNode: [REPORT] [138182|WorkGraphImperativeEngine|update_normal_task_state]: Task: _flow finished.
07/03/2025 04:19:40 PM <193192> aiida.orm.nodes.process.workflow.workchain.WorkChainNode: [REPORT] [138182|WorkGraphImperativeEngine|continue_workgraph]: tasks ready to run: 
07/03/2025 04:19:40 PM <193192> aiida.orm.nodes.process.workflow.workchain.Wor

graph results: {'sum': {'socket_name': 'result', 'task_name': 'multiply'}}

Results:
{'sum': <Int: uuid: 9b8af3b6-6649-47bd-9341-2255c6ba85a5 (pk: 138211) value: 16>}
```



Every iteration adds two more tasks to the WorkGraph.  Because the graph grows dynamically you can observe it expanding live.
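To estimate how large the graph will get before launching real jobs, the loop's arithmetic can be replayed in plain Python. `trace_keep_doubling` is a hypothetical helper written for this illustration; it mirrors the `add` then `multiply` sequence of `keep_doubling` and counts one task for the initial `add` plus two per iteration.

```python
def trace_keep_doubling(x, y, limit=10):
    """Replay the loop's arithmetic and count the tasks it would create."""
    value = x + y        # the initial add(x, y)
    task_count = 1
    iterations = 0
    while value < limit:
        value = (value + 1) * 2   # add(value, 1), then multiply(..., 2)
        task_count += 2
        iterations += 1
    return value, iterations, task_count


print(trace_keep_doubling(3, 4))  # (16, 1, 3)
print(trace_keep_doubling(0, 1))  # (10, 2, 5)
```

For the inputs used above (3 and 4) this gives a final value of 16 after one iteration and three tasks, which agrees with the `add`, `add1`, and `multiply` entries in the log.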

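> **Performance note** Every task call creates a separate job with its own scheduling and provenance overhead (the timestamps above show a few seconds per `PythonJob`), so avoid scheduling thousands of tiny jobs; batch small units of work into one task instead.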


## Waiting for results

`wait_for(task_socket)` suspends the *flow* until the referenced task has reached a terminal state.  This is essential in loops or branches where you need the **value**, not just the future placeholder.  Under the hood it performs an asynchronous poll so your runner remains free to execute other coroutines.
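An asynchronous poll of this kind can be sketched with `asyncio` alone. The helper below is not the library's `wait_for`; `wait_until_terminal`, its parameters, and the terminal state names `FAILED` and `KILLED` are assumptions made for illustration. The point is that `await asyncio.sleep(...)` between checks hands control back to the event loop, so other coroutines keep running.

```python
import asyncio


async def wait_until_terminal(get_state, interval=0.01, timeout=1.0):
    """Poll `get_state()` until it reports a terminal state (hypothetical helper)."""
    terminal = {"FINISHED", "FAILED", "KILLED"}  # assumed state names
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout
    while (state := get_state()) not in terminal:
        if loop.time() > deadline:
            raise TimeoutError(f"still {state} after {timeout} s")
        await asyncio.sleep(interval)  # yields to other coroutines
    return state


async def main():
    job = {"state": "RUNNING"}

    async def finish_later():
        # Stands in for a job that completes in the background.
        await asyncio.sleep(0.03)
        job["state"] = "FINISHED"

    worker = asyncio.create_task(finish_later())
    final = await wait_until_terminal(lambda: job["state"])
    await worker
    return final


final_state = asyncio.run(main())
print(final_state)  # FINISHED
```

The background coroutine only gets a chance to flip the state because the poller sleeps asynchronously; a blocking `time.sleep` in its place would stall the loop until the timeout.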

---


## Conclusion

The imperative API combines the **clarity of Python** with the **provenance guarantees of AiiDA**:

* Write workflows as natural coroutines.
* Leverage familiar control structures instead of special DSL constructs.
* Still obtain a fully queryable, shareable WorkGraph.

When you need static analysis or pre‑execution validation, drop back to the declarative zones; both styles interoperate because they share the same engine under the hood.
