Engine: Dynamically update maximum stack size close to overflow (#6052)
The Python interpreter maintains a stack of frames while executing code, and
this stack has a size limit. As soon as adding a frame to the stack would
exceed this limit, a `RecursionError` is raised. Note that, despite what the
name suggests, the cause does not necessarily involve recursion, although that
is a common trigger; simply creating a deep but non-recursive call stack has
the same effect.
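As an illustration (not part of the commit), the limit can be observed directly by lowering it and counting how many extra frames fit before `RecursionError` is raised; `probe_frame_limit` is a hypothetical helper written for this sketch:

```python
import sys

def probe_frame_limit(limit=100):
    """Lower the recursion limit and count how many nested frames fit before
    the interpreter raises RecursionError. The count comes out slightly below
    the limit because the frames already on the stack also count towards it."""
    previous = sys.getrecursionlimit()
    sys.setrecursionlimit(limit)
    depth = 0

    def descend():
        nonlocal depth
        depth += 1
        descend()

    try:
        descend()
    except RecursionError:
        pass
    finally:
        sys.setrecursionlimit(previous)  # restore the original limit
    return depth

print(probe_frame_limit())  # somewhat less than 100
```

Raising the limit with `sys.setrecursionlimit` is exactly the knob the workaround below turns.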

This `RecursionError` was routinely hit when submitting large numbers of
workflows to the daemon that call one or more process functions. This is
due to the process function being called synchronously in an async
context, namely the workchain, which is executed as a task on the event
loop of the `Runner` in the daemon worker. To make this possible, the
event loop has to be made reentrant, but vanilla `asyncio` does not
support this. `plumpy` circumvents this limitation through the use of
`nest-asyncio`, which makes a running event loop reentrant.

The problem is that when the event loop is reentered, instead of
creating a separate stack for that task, it reuses the current one.
Consequently, each process function adds frames to the current stack
that are not resolved and removed until after its execution has
finished. If many process functions are started before any finish, these
frames accumulate and can ultimately hit the stack limit. Since the task
queue of the event loop is a FIFO, this situation arose very often
because all process function tasks would be created first, before any of
them was finalized.
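The mechanism can be mimicked without `asyncio` (a toy analogy written for this description, not the commit's code): a scheduler whose tasks synchronously re-enter it leaves each caller's frames pinned on the stack until the whole chain unwinds:

```python
import inspect

def run_tasks(queue):
    """Toy re-entrant scheduler: pops tasks FIFO and runs them."""
    while queue:
        task = queue.pop(0)
        task()

def make_task(queue, depths):
    def task():
        depths.append(len(inspect.stack()))  # record current frame count
        run_tasks(queue)  # re-enter the scheduler before this task returns
    return task

queue = []
depths = []
for _ in range(5):
    queue.append(make_task(queue, depths))
run_tasks(queue)
print(depths)  # frames accumulate with each nested re-entry
```

Because every task is enqueued before any runs, each one starts from a strictly deeper stack, just as the FIFO task queue causes for process functions.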

Since an actual solution for this problem is not trivial and this is
causing a lot of problems, a temporary workaround is implemented. Each
time a process function is executed, the current stack size is compared
to the current stack limit. If the stack is more than 80% full, the
limit is increased by 1000 and a warning message is logged. This should
give the created process function tasks some more leeway to be resolved.

Note that the workaround will keep increasing the limit when necessary,
which can and eventually will lead to an actual stack overflow in the
interpreter. The point at which this happens is machine dependent, so it
is difficult to impose an absolute limit.

The function to get the stack size uses a custom implementation instead
of the naive `len(inspect.stack())`, because its performance is three
orders of magnitude better and it scales well for deep stacks, which is
typically the case for AiiDA daemon workers. See
https://stackoverflow.com/questions/34115298 for a discussion of the
implementation and its performance.
sphuber committed Jun 20, 2023
1 parent 3defb8b commit f797b47
Showing 1 changed file with 39 additions and 1 deletion.
40 changes: 39 additions & 1 deletion aiida/engine/processes/functions.py
@@ -13,8 +13,10 @@
import collections
import functools
import inspect
import itertools
import logging
import signal
import sys
import types
import typing as t
from typing import TYPE_CHECKING
@@ -62,6 +64,29 @@
FunctionType = t.TypeVar('FunctionType', bound=t.Callable[..., t.Any]) # pylint: disable=invalid-name


def get_stack_size(size: int = 2) -> int:  # type: ignore[return]
    """Return the stack size for the caller's frame.

    This solution is taken from https://stackoverflow.com/questions/34115298/ as a more performant alternative to the
    naive ``len(inspect.stack())`` solution. This implementation is about three orders of magnitude faster compared to
    the naive solution and it scales especially well for larger stacks, which will usually be the case for the usage
    of ``aiida-core``. However, it does use the internal ``_getframe`` of the ``sys`` standard library. If this ever
    were to stop working, simply switch to using ``len(inspect.stack())``.

    :param size: Hint for the expected stack size.
    :returns: The stack size for the caller's frame.
    """
    frame = sys._getframe(size)  # pylint: disable=protected-access
    try:
        for size in itertools.count(size, 8):  # pylint: disable=redefined-argument-from-local
            frame = frame.f_back.f_back.f_back.f_back.f_back.f_back.f_back.f_back  # type: ignore[assignment,union-attr]
    except AttributeError:
        while frame:
            frame = frame.f_back  # type: ignore[assignment]
            size += 1
        return size - 1
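For reference, a quick standalone check (with the function reproduced verbatim so the snippet runs on its own) that the fast walk agrees with the naive `len(inspect.stack())`:

```python
import inspect
import itertools
import sys

def get_stack_size(size=2):
    """Count frames by following ``f_back`` eight links per iteration, then
    finishing one link at a time once the chain runs out (copy of the above)."""
    frame = sys._getframe(size)
    try:
        for size in itertools.count(size, 8):
            frame = frame.f_back.f_back.f_back.f_back.f_back.f_back.f_back.f_back
    except AttributeError:
        while frame:
            frame = frame.f_back
            size += 1
        return size - 1

def caller():
    # Both measures count the frames from ``caller`` outward, so they match.
    return get_stack_size(), len(inspect.stack())

fast, naive = caller()
assert fast == naive
```

The default `size=2` skips the frames of `get_stack_size` itself and of the `sys._getframe` hint, so the result is the depth as seen by the caller.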


def calcfunction(function: FunctionType) -> FunctionType:
    """
    A decorator to turn a standard python function into a calcfunction.
@@ -139,8 +164,21 @@ def run_get_node(*args, **kwargs) -> tuple[dict[str, t.Any] | None, 'ProcessNode
    :param args: input arguments to construct the FunctionProcess
    :param kwargs: input keyword arguments to construct the FunctionProcess
    :return: tuple of the outputs of the process and the process node
    """
    frame_delta = 1000
    frame_count = get_stack_size()
    stack_limit = sys.getrecursionlimit()
    LOGGER.info('Executing process function, current stack status: %d frames of %d', frame_count, stack_limit)

    # If the current frame count is more than 80% of the stack limit, or comes within 200 frames, increase the
    # stack limit by ``frame_delta``.
    if frame_count > min(0.8 * stack_limit, stack_limit - 200):
        LOGGER.warning(
            'Current stack contains %d frames which is close to the limit of %d. Increasing the limit by %d',
            frame_count, stack_limit, frame_delta
        )
        sys.setrecursionlimit(stack_limit + frame_delta)

    manager = get_manager()
    runner = manager.get_runner()
    inputs = process_class.create_inputs(*args, **kwargs)
