Skip to content

Commit

Permalink
Engine: Add the wait argument to submit
Browse files Browse the repository at this point in the history
For demos and tutorials, often in interactive notebooks, it is often
preferred to use `run` instead of `submit` because in this way the cell
will block until the process is done. The cell blocking will signal to
the user that the process is still running and as soon as it returns it
is immediately clear that the results are ready. With `submit` the cell
returns immediately, but the user will now have to resort to manually
checking when the process is done. Solutions are to instruct the user to
call `verdi process list` manually (which they will have to do
repeatedly) or implement some automated loop that checks for the process
to terminate.

However, using `run` has downsides as well, most notably that the
process will be lost if the notebook gets disconnected. For processes
that are expected to run longer, this can be really problematic, and so
`submit` will have to be used regardless.

Here, the `wait` argument is added to `submit`. Set to `False` by
default to keep current behavior, when set to `True`, the function will
mimic the behavior of `run` and only return when the process has
terminated at which point the node is returned. A `REPORT` log message
is emitted each time the state of the process is checked in intervals
of `wait_interval`.
  • Loading branch information
sphuber committed Oct 27, 2023
1 parent 4203f16 commit 8f5e929
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 44 deletions.
50 changes: 29 additions & 21 deletions aiida/engine/launch.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,13 @@
# For further information please visit http://www.aiida.net #
###########################################################################
"""Top level functions that can be used to launch a Process."""
from typing import Any, Dict, Tuple, Type, Union
from __future__ import annotations

import time
import typing as t

from aiida.common import InvalidOperation
from aiida.common.log import AIIDA_LOGGER
from aiida.manage import manager
from aiida.orm import ProcessNode

Expand All @@ -22,19 +26,18 @@

__all__ = ('run', 'run_get_pk', 'run_get_node', 'submit')

TYPE_RUN_PROCESS = Union[Process, Type[Process], ProcessBuilder] # pylint: disable=invalid-name
TYPE_RUN_PROCESS = t.Union[Process, t.Type[Process], ProcessBuilder] # pylint: disable=invalid-name
# run can also be process function, but it is not clear what type this should be
TYPE_SUBMIT_PROCESS = Union[Process, Type[Process], ProcessBuilder] # pylint: disable=invalid-name
TYPE_SUBMIT_PROCESS = t.Union[Process, t.Type[Process], ProcessBuilder] # pylint: disable=invalid-name
LOGGER = AIIDA_LOGGER.getChild('engine.launch')


def run(process: TYPE_RUN_PROCESS, *args: Any, **inputs: Any) -> Dict[str, Any]:
def run(process: TYPE_RUN_PROCESS, *args: t.Any, **inputs: t.Any) -> dict[str, t.Any]:
"""Run the process with the supplied inputs in a local runner that will block until the process is completed.
:param process: the process class or process function to run
:param inputs: the inputs to be passed to the process
:return: the outputs of the process
"""
if isinstance(process, Process):
runner = process.runner
Expand All @@ -44,14 +47,12 @@ def run(process: TYPE_RUN_PROCESS, *args: Any, **inputs: Any) -> Dict[str, Any]:
return runner.run(process, *args, **inputs)


def run_get_node(process: TYPE_RUN_PROCESS, *args: Any, **inputs: Any) -> Tuple[Dict[str, Any], ProcessNode]:
def run_get_node(process: TYPE_RUN_PROCESS, *args: t.Any, **inputs: t.Any) -> tuple[dict[str, t.Any], ProcessNode]:
"""Run the process with the supplied inputs in a local runner that will block until the process is completed.
:param process: the process class, instance, builder or function to run
:param inputs: the inputs to be passed to the process
:return: tuple of the outputs of the process and the process node
"""
if isinstance(process, Process):
runner = process.runner
Expand All @@ -61,14 +62,12 @@ def run_get_node(process: TYPE_RUN_PROCESS, *args: Any, **inputs: Any) -> Tuple[
return runner.run_get_node(process, *args, **inputs)


def run_get_pk(process: TYPE_RUN_PROCESS, *args: Any, **inputs: Any) -> ResultAndPk:
def run_get_pk(process: TYPE_RUN_PROCESS, *args: t.Any, **inputs: t.Any) -> ResultAndPk:
"""Run the process with the supplied inputs in a local runner that will block until the process is completed.
:param process: the process class, instance, builder or function to run
:param inputs: the inputs to be passed to the process
:return: tuple of the outputs of the process and process node pk
"""
if isinstance(process, Process):
runner = process.runner
Expand All @@ -78,22 +77,23 @@ def run_get_pk(process: TYPE_RUN_PROCESS, *args: Any, **inputs: Any) -> ResultAn
return runner.run_get_pk(process, *args, **inputs)


def submit(process: TYPE_SUBMIT_PROCESS, **inputs: Any) -> ProcessNode:
def submit(process: TYPE_SUBMIT_PROCESS, wait: bool = False, wait_interval: int = 5, **inputs: t.Any) -> ProcessNode:
"""Submit the process with the supplied inputs to the daemon immediately returning control to the interpreter.
.. warning: this should not be used within another process. Instead, there one should use the `submit` method of
the wrapping process itself, i.e. use `self.submit`.
.. warning: this should not be used within another process. Instead, there one should use the ``submit`` method of
the wrapping process itself, i.e. use ``self.submit``.
.. warning: submission of processes requires `store_provenance=True`
.. warning: submission of processes requires ``store_provenance=True``.
:param process: the process class, instance or builder to submit
:param inputs: the inputs to be passed to the process
:param wait: when set to ``True``, the submission will be blocking and wait for the process to complete at which
point the function returns the calculation node.
:param wait_interval: the number of seconds to wait between checking the state of the process when ``wait=True``.
:return: the calculation node of the process
"""
# Submitting from within another process requires `self.submit` unless it is a work function, in which case the
# current process in the scope should be an instance of `FunctionProcess`
# Submitting from within another process requires ``self.submit``` unless it is a work function, in which case the
# current process in the scope should be an instance of ``FunctionProcess``.
if is_process_scoped() and not isinstance(Process.current(), FunctionProcess):
raise InvalidOperation('Cannot use top-level `submit` from within another process, use `self.submit` instead')

Expand All @@ -118,8 +118,16 @@ def submit(process: TYPE_SUBMIT_PROCESS, **inputs: Any) -> ProcessNode:

# Do not wait for the future's result, because in the case of a single worker this would cock-block itself
runner.controller.continue_process(process_inited.pid, nowait=False, no_reply=True)
node = process_inited.node

if not wait:
return node

while not node.is_terminated:
time.sleep(wait_interval)
LOGGER.report(f'Process<{node.pk}> has not yet terminated, current state is `{node.process_state}`.')

return process_inited.node
return node


# Allow one to also use run.get_node and run.get_pk as a shortcut, without having to import the functions themselves
Expand Down
6 changes: 6 additions & 0 deletions docs/source/topics/processes/usage.rst
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,12 @@ The function will submit the calculation to the daemon and immediately return co
.. warning::
For a process to be submittable, the class or function needs to be importable in the daemon environment by a) giving it an :ref:`associated entry point<how-to:plugin-codes:entry-points>` or b) :ref:`including its module path<how-to:faq:process-not-importable-daemon>` in the ``PYTHONPATH`` that the daemon workers will have.

.. tip::
Use ``wait=True`` when calling ``submit`` to wait for the process to complete before returning the node.
This can be useful for tutorials and demos in interactive notebooks where the user should not continue before the process is done.
One could of course also use ``run`` (see below), but then the process would be lost if the interpreter gets accidentally shut down.
By using ``submit``, the process is run by the daemon which takes care of saving checkpoints so it can always be restarted in case of problems.

The ``run`` function is called identically:

.. include:: include/snippets/launch/launch_run.py
Expand Down
45 changes: 22 additions & 23 deletions tests/engine/test_launch.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@
from aiida import orm
from aiida.common import exceptions
from aiida.engine import CalcJob, Process, WorkChain, calcfunction, launch
from aiida.plugins import CalculationFactory

ArithmeticAddCalculation = CalculationFactory('core.arithmetic.add') # pylint: disable=invalid-name


@calcfunction
Expand Down Expand Up @@ -66,6 +69,19 @@ def add(self):
self.out('result', orm.Int(self.inputs.term_a + self.inputs.term_b).store())


@pytest.mark.usefixtures('started_daemon_client')
def test_submit_wait(aiida_local_code_factory):
"""Test the ``wait`` argument of :meth:`aiida.engine.launch.submit`."""
builder = ArithmeticAddCalculation.get_builder()
builder.code = aiida_local_code_factory('core.arithmetic.add', '/bin/bash')
builder.x = orm.Int(1)
builder.y = orm.Int(1)
builder.metadata = {'options': {'resources': {'num_machines': 1, 'num_mpiprocs_per_machine': 1}}}
node = launch.submit(builder, wait=True, wait_interval=0.1)
assert node.is_finished, node.process_state
assert node.is_finished_ok, node.exit_code


@pytest.mark.requires_rmq
class TestLaunchers:
"""Class to test process launchers."""
Expand Down Expand Up @@ -157,6 +173,9 @@ def init_profile(self, aiida_localhost): # pylint: disable=unused-argument
from aiida.common.folders import CALC_JOB_DRY_RUN_BASE_PATH
assert Process.current() is None
self.computer = aiida_localhost
self.code = orm.InstalledCode(
default_calc_job_plugin='core.arithmetic.add', computer=self.computer, filepath_executable='/bin/bash'
).store()
yield
assert Process.current() is None
# Make sure to clean the test directory that will be generated by the dry-run
Expand All @@ -168,16 +187,8 @@ def init_profile(self, aiida_localhost): # pylint: disable=unused-argument

def test_launchers_dry_run(self):
"""All launchers should work with `dry_run=True`, even `submit` which forwards to `run`."""
from aiida.plugins import CalculationFactory

ArithmeticAddCalculation = CalculationFactory('core.arithmetic.add') # pylint: disable=invalid-name

code = orm.InstalledCode(
default_calc_job_plugin='core.arithmetic.add', computer=self.computer, filepath_executable='/bin/true'
).store()

inputs = {
'code': code,
'code': self.code,
'x': orm.Int(1),
'y': orm.Int(1),
'metadata': {
Expand Down Expand Up @@ -210,16 +221,8 @@ def test_launchers_dry_run(self):

def test_launchers_dry_run_no_provenance(self):
"""Test the launchers in `dry_run` mode with `store_provenance=False`."""
from aiida.plugins import CalculationFactory

ArithmeticAddCalculation = CalculationFactory('core.arithmetic.add') # pylint: disable=invalid-name

code = orm.InstalledCode(
default_calc_job_plugin='core.arithmetic.add', computer=self.computer, filepath_executable='/bin/true'
).store()

inputs = {
'code': code,
'code': self.code,
'x': orm.Int(1),
'y': orm.Int(1),
'metadata': {
Expand Down Expand Up @@ -263,10 +266,6 @@ def test_calcjob_dry_run_no_provenance(self):
"""
import tempfile

code = orm.InstalledCode(
default_calc_job_plugin='core.arithmetic.add', computer=self.computer, filepath_executable='/bin/true'
).store()

with tempfile.NamedTemporaryFile('w+') as handle:
handle.write('dummy_content')
handle.flush()
Expand All @@ -275,7 +274,7 @@ def test_calcjob_dry_run_no_provenance(self):
file_two = orm.SinglefileData(file=handle.name)

inputs = {
'code': code,
'code': self.code,
'single_file': single_file,
'files': {
'file_one': file_one,
Expand Down

0 comments on commit 8f5e929

Please sign in to comment.