Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add options for transport tasks #4583

Merged
merged 3 commits into from Nov 22, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
25 changes: 13 additions & 12 deletions aiida/engine/processes/calcjobs/tasks.py
Expand Up @@ -22,6 +22,7 @@
from aiida.engine.daemon import execmanager
from aiida.engine.utils import exponential_backoff_retry, interruptable_task
from aiida.schedulers.datastructures import JobState
from aiida.manage.configuration import get_config

from ..process import ProcessState

Expand All @@ -31,8 +32,8 @@
RETRIEVE_COMMAND = 'retrieve'
KILL_COMMAND = 'kill'

TRANSPORT_TASK_RETRY_INITIAL_INTERVAL = 20
TRANSPORT_TASK_MAXIMUM_ATTEMTPS = 5
RETRY_INTERVAL_OPTION = 'transport.task_retry_initial_interval'
MAX_ATTEMPTS_OPTION = 'transport.task_maximum_attempts'

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -63,8 +64,8 @@ def task_upload_job(process, transport_queue, cancellable):
logger.warning(f'CalcJob<{node.pk}> already marked as SUBMITTING, skipping task_update_job')
raise Return

initial_interval = TRANSPORT_TASK_RETRY_INITIAL_INTERVAL
max_attempts = TRANSPORT_TASK_MAXIMUM_ATTEMTPS
initial_interval = get_config().get_option(RETRY_INTERVAL_OPTION)
max_attempts = get_config().get_option(MAX_ATTEMPTS_OPTION)

authinfo = node.computer.get_authinfo(node.user)

Expand Down Expand Up @@ -124,8 +125,8 @@ def task_submit_job(node, transport_queue, cancellable):
logger.warning(f'CalcJob<{node.pk}> already marked as WITHSCHEDULER, skipping task_submit_job')
raise Return(node.get_job_id())

initial_interval = TRANSPORT_TASK_RETRY_INITIAL_INTERVAL
max_attempts = TRANSPORT_TASK_MAXIMUM_ATTEMTPS
initial_interval = get_config().get_option(RETRY_INTERVAL_OPTION)
max_attempts = get_config().get_option(MAX_ATTEMPTS_OPTION)

authinfo = node.computer.get_authinfo(node.user)

Expand Down Expand Up @@ -172,8 +173,8 @@ def task_update_job(node, job_manager, cancellable):
logger.warning(f'CalcJob<{node.pk}> already marked as RETRIEVING, skipping task_update_job')
raise Return(True)

initial_interval = TRANSPORT_TASK_RETRY_INITIAL_INTERVAL
max_attempts = TRANSPORT_TASK_MAXIMUM_ATTEMTPS
initial_interval = get_config().get_option(RETRY_INTERVAL_OPTION)
max_attempts = get_config().get_option(MAX_ATTEMPTS_OPTION)

authinfo = node.computer.get_authinfo(node.user)
job_id = node.get_job_id()
Expand Down Expand Up @@ -233,8 +234,8 @@ def task_retrieve_job(node, transport_queue, retrieved_temporary_folder, cancell
logger.warning(f'CalcJob<{node.pk}> already marked as PARSING, skipping task_retrieve_job')
raise Return

initial_interval = TRANSPORT_TASK_RETRY_INITIAL_INTERVAL
max_attempts = TRANSPORT_TASK_MAXIMUM_ATTEMTPS
initial_interval = get_config().get_option(RETRY_INTERVAL_OPTION)
max_attempts = get_config().get_option(MAX_ATTEMPTS_OPTION)

authinfo = node.computer.get_authinfo(node.user)

Expand Down Expand Up @@ -291,8 +292,8 @@ def task_kill_job(node, transport_queue, cancellable):
:raises: Return if the tasks was successfully completed
:raises: TransportTaskException if after the maximum number of retries the transport task still excepted
"""
initial_interval = TRANSPORT_TASK_RETRY_INITIAL_INTERVAL
max_attempts = TRANSPORT_TASK_MAXIMUM_ATTEMTPS
initial_interval = get_config().get_option(RETRY_INTERVAL_OPTION)
max_attempts = get_config().get_option(MAX_ATTEMPTS_OPTION)

if node.get_state() in [CalcJobState.UPLOADING, CalcJobState.SUBMITTING]:
logger.warning(f'CalcJob<{node.pk}> killed, it was in the {node.get_state()} state')
Expand Down
3 changes: 2 additions & 1 deletion aiida/engine/utils.py
Expand Up @@ -154,7 +154,8 @@ def exponential_backoff_retry(fct, initial_interval=10.0, max_attempts=5, logger
This coroutine will loop ``max_attempts`` times, calling the ``fct`` function, breaking immediately when the call
finished without raising an exception, at which point the returned result will be raised, wrapped in a
``tornado.gen.Result`` instance. If an exception is caught, the function will yield a ``tornado.gen.sleep`` with a
time interval equal to the ``initial_interval`` multiplied by ``2*N`` where ``N`` is the number of excepted calls.
time interval equal to the ``initial_interval`` multiplied by ``2 ** (N - 1)`` where ``N`` is the number of
excepted calls.

:param fct: the function to call, which will be turned into a coroutine first if it is not already
:param initial_interval: the time to wait after the first caught exception before calling the coroutine again
Expand Down
16 changes: 16 additions & 0 deletions aiida/manage/configuration/options.py
Expand Up @@ -185,6 +185,22 @@
'description': 'Boolean whether to print AiiDA deprecation warnings',
'global_only': False,
},
'transport.task_retry_initial_interval': {
'key': 'task_retry_initial_interval',
'valid_type': 'int',
'valid_values': None,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would be good here if we could validate for >0

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I could imagine merging valid_type/valid_values into something akin to a jsonschema or just allowing it to be callable.

e.g.

{
 "a": {
  "validator": {"type": "integer", "minimum": 1}
 },
 "b": {
  "validator": {"type": "string", "enum": ["value1", "value2"]}
 }
}

or

{
 "a": {
  "validator": lambda v: isinstance(v, int) and v > 0
 },
 "b": {
  "validator": lambda v: isinstance(v, str) and v in ("value1", "value2")
 }
}

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Extending the validation is definitely a good idea, but perhaps we should open a new issue/PR for this? Note that the valid_type is also used to parse the option value, so I'm not sure if we can just merge valid_type/valid_values into one validator function.

'default': 20,
'description': 'Initial time interval for the exponential backoff mechanism.',
'global_only': False,
},
'transport.task_maximum_attempts': {
'key': 'task_maximum_attempts',
'valid_type': 'int',
'valid_values': None,
'default': 5,
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Something I also noticed is that some defaults are just set in the option dict, and some are set by global variables at the top of the module. We should probably be consistent in this. I'd be in favour of removing the global variables, I think the defaults are easy enough to find.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeh makes sense if they are not used anywhere else

'description': 'Maximum number of transport task attempts before a Process is Paused.',
'global_only': False,
},
}


Expand Down