Skip to content

Commit

Permalink
CLI: Add the verdi process repair command
Browse files Browse the repository at this point in the history
This command replaces `verdi devel rabbitmq tasks analyze`. This command
was added to the `verdi devel` namespace because it is working around a
problem and it was experimental. Since then, it has proved really
efficient and so should be made more directly available to users in case
of stuck processes.

The implementation is moved to `verdi process repair` and the original
command simply forwards to it, while emitting a message that it is
deprecated.

While the original command would not do anything by default and the
`--fix` flag had to be explicitly specified, this behavior is inverted
for `verdi process repair`. By default it will fix inconsistencies and
the `--dry-run` flag can be used to get the old behavior of just
detecting them.
  • Loading branch information
sphuber committed Nov 30, 2023
1 parent 9006eef commit 3e3d9b9
Show file tree
Hide file tree
Showing 7 changed files with 201 additions and 176 deletions.
74 changes: 74 additions & 0 deletions aiida/cmdline/commands/cmd_process.py
Original file line number Diff line number Diff line change
Expand Up @@ -324,3 +324,77 @@ def _print(communicator, body, sender, subject, correlation_id): # pylint: disa

# Reraise to trigger clicks builtin abort sequence
raise


@verdi_process.command('repair')
@options.DRY_RUN()
@decorators.only_if_daemon_not_running()
@decorators.with_manager
@click.pass_context
def process_repair(ctx, manager, dry_run):
    """Automatically repair all stuck processes.

    N.B.: This command requires the daemon to be stopped.

    This command queries the database to find all "active" processes, meaning those that haven't yet reached a terminal
    state, and cross-references them with the active process tasks in the process queue of RabbitMQ. Any active process
    that does not have a corresponding process task can be considered a zombie, as it will never be picked up by a
    daemon worker to complete it and will effectively be "stuck". Any process task that does not correspond to an active
    process is useless and should be discarded. Finally, duplicate process tasks are also problematic and are discarded.
    """
    # NOTE: the docstring above doubles as the CLI help text, so implementation notes live in comments.
    #   ctx:     click context; ``ctx.obj.profile`` is the profile whose process queue is inspected.
    #   manager: injected by ``decorators.with_manager``; provides the communicator and process controller.
    #   dry_run: if true, only report the inconsistencies and do not change anything.
    from collections import Counter

    from aiida.engine.processes.control import get_active_processes, get_process_tasks, iterate_process_tasks

    active_processes = get_active_processes(project='id')
    process_tasks = get_process_tasks(ctx.obj.profile, manager.get_communicator())

    set_active_processes = set(active_processes)
    set_process_tasks = set(process_tasks)

    echo.echo_info(f'Active processes: {active_processes}')
    echo.echo_info(f'Process tasks: {process_tasks}')

    # Single pass over the tasks instead of calling ``list.count`` for each element.
    duplicate_tasks = {pid for pid, count in Counter(process_tasks).items() if count > 1}
    orphaned_tasks = set_process_tasks.difference(set_active_processes)
    zombie_processes = set_active_processes.difference(set_process_tasks)

    if duplicate_tasks:
        echo.echo_warning('There are duplicates process tasks: ', nl=False)
        echo.echo(duplicate_tasks)

    if orphaned_tasks:
        echo.echo_warning('There are process tasks for terminated processes: ', nl=False)
        echo.echo(orphaned_tasks)

    if zombie_processes:
        echo.echo_warning('There are active processes without process task: ', nl=False)
        echo.echo(zombie_processes)

    state_inconsistent = bool(duplicate_tasks or orphaned_tasks or zombie_processes)

    if not state_inconsistent:
        echo.echo_success('No inconsistencies detected between database and RabbitMQ.')
        return

    if dry_run:
        # Only bail out with the critical message in dry-run mode. Emitting it unconditionally would terminate the
        # command before the repair below is ever reached, since ``echo_critical`` is expected to exit.
        echo.echo_critical('Inconsistencies detected between database and RabbitMQ.')
        return  # Safety net in case ``echo_critical`` does not exit: never modify anything in a dry run.

    # At this point we have either exited because of inconsistencies and ``--dry-run`` was passed, or we returned
    # because there were no inconsistencies, so all that is left is to address inconsistencies
    echo.echo_info('Attempting to fix inconsistencies')

    # Eliminate duplicate tasks and tasks that correspond to terminated process. The first task seen for an active
    # process is kept; any further task with the same pid is a duplicate and is acknowledged as well.
    seen_pids = set()
    for task in iterate_process_tasks(ctx.obj.profile, manager.get_communicator()):
        pid = task.body.get('args', {}).get('pid', None)
        is_duplicate = pid in seen_pids
        seen_pids.add(pid)
        if is_duplicate or pid not in set_active_processes:
            with task.processing() as outcome:
                outcome.set_result(False)
            echo.echo_report(f'Acknowledged task `{pid}`')

    # Revive zombie processes that no longer have a process task
    process_controller = manager.get_process_controller()
    for pid in set_active_processes:
        if pid not in set_process_tasks:
            process_controller.continue_process(pid)
            echo.echo_report(f'Revived process `{pid}`')
133 changes: 8 additions & 125 deletions aiida/cmdline/commands/cmd_rabbitmq.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
"""`verdi devel rabbitmq` commands."""
from __future__ import annotations

import collections
import re
import sys
import typing as t
Expand All @@ -23,11 +22,8 @@
from aiida.cmdline.utils import decorators, echo, echo_tabulate

if t.TYPE_CHECKING:
import kiwipy.rmq
import requests

from aiida.manage.configuration.profile import Profile


@verdi_devel.group('rabbitmq')
def cmd_rabbitmq():
Expand Down Expand Up @@ -132,16 +128,8 @@ def with_client(ctx, wrapped, _, args, kwargs):
return wrapped(*args, **kwargs)


@wrapt.decorator
def with_manager(wrapped, _, args, kwargs):
    """Decorate a function, injecting the global manager as the ``manager`` keyword argument.

    The manager is obtained from :func:`aiida.manage.get_manager` each time the decorated function is called.
    """
    from aiida.manage import get_manager

    kwargs.update(manager=get_manager())
    return wrapped(*args, **kwargs)


@cmd_rabbitmq.command('server-properties')
@with_manager
@decorators.with_manager
def cmd_server_properties(manager):
"""List the server properties."""
import yaml
Expand Down Expand Up @@ -218,7 +206,7 @@ def cmd_queues_delete(client, queues):


@cmd_tasks.command('list')
@with_manager
@decorators.with_manager
@decorators.only_if_daemon_not_running()
@click.pass_context
def cmd_tasks_list(ctx, manager):
Expand All @@ -228,72 +216,18 @@ def cmd_tasks_list(ctx, manager):
only be seen when they are not currently with a daemon worker, this command can only be run when the daemon is not
running.
"""
from aiida.engine.processes.control import get_process_tasks

for pk in get_process_tasks(ctx.obj.profile, manager.get_communicator()):
echo.echo(pk)


def get_active_processes() -> list[int]:
    """Return the pks of all active processes.

    A process counts as active when the ``process_state`` attribute of its node is one of:

        * ``created``
        * ``waiting``
        * ``running``

    :returns: A list of process pks that are marked as active in the database.
    """
    from aiida.engine import ProcessState
    from aiida.orm import ProcessNode, QueryBuilder

    active_states = [ProcessState.CREATED.value, ProcessState.WAITING.value, ProcessState.RUNNING.value]
    state_filter = {'attributes.process_state': {'in': active_states}}

    # Project only the ``id`` so the flattened result is a plain list of pks.
    builder = QueryBuilder().append(ProcessNode, filters=state_filter, project='id')
    return builder.all(flat=True)  # type: ignore[return-value]


def iterate_process_tasks(
    profile: Profile, communicator: kiwipy.rmq.RmqCommunicator
) -> collections.abc.Iterator[kiwipy.rmq.RmqIncomingTask]:
    """Iterate over the process tasks in the RabbitMQ launch queue of the given profile.

    :param profile: The profile whose ``rmq_prefix`` determines the name of the launch queue.
    :param communicator: The communicator used to access the task queue.
    :returns: An iterator that yields each task found in the launch queue.
    """
    from aiida.manage.external.rmq import get_launch_queue_name

    queue_name = get_launch_queue_name(profile.rmq_prefix)
    yield from communicator.task_queue(queue_name)


def get_process_tasks(profile: Profile, communicator: kiwipy.rmq.RmqCommunicator) -> list[int]:
    """Return the list of process pks that have a process task in the RabbitMQ process queue.

    :param profile: The profile whose process queue is inspected.
    :param communicator: The communicator used to access the task queue.
    :returns: A list of process pks that have a corresponding process task with RabbitMQ. A pk appears once for
        every task that references it, so duplicates are possible.
    """
    collected = []

    for task in iterate_process_tasks(profile, communicator):
        try:
            pid = task.body.get('args', {})['pid']
        except KeyError:
            # Tasks whose arguments carry no ``pid`` are skipped.
            continue
        collected.append(pid)

    return collected


@cmd_tasks.command('analyze')
@click.option('--fix', is_flag=True, help='Attempt to fix the inconsistencies if any are detected.')
@with_manager
@decorators.only_if_daemon_not_running()
@decorators.deprecated_command('Use `verdi process repair` instead.')
@click.pass_context
def cmd_tasks_analyze(ctx, manager, fix):
def cmd_tasks_analyze(ctx, fix):
"""Perform analysis of process tasks.
This command will perform a query of the database to find all "active" processes, meaning those that haven't yet
Expand All @@ -305,59 +239,8 @@ def cmd_tasks_analyze(ctx, manager, fix):
Use ``-v INFO`` to be more verbose and print more information.
"""
active_processes = get_active_processes()
process_tasks = get_process_tasks(ctx.obj.profile, manager.get_communicator())

set_active_processes = set(active_processes)
set_process_tasks = set(process_tasks)

echo.echo_info(f'Active processes: {active_processes}')
echo.echo_info(f'Process tasks: {process_tasks}')

state_inconsistent = False

if len(process_tasks) != len(set_process_tasks):
state_inconsistent = True
echo.echo_warning('There are duplicates process tasks: ', nl=False)
echo.echo(set(x for x in process_tasks if process_tasks.count(x) > 1))

if set_process_tasks.difference(set_active_processes):
state_inconsistent = True
echo.echo_warning('There are process tasks for terminated processes: ', nl=False)
echo.echo(set_process_tasks.difference(set_active_processes))

if set_active_processes.difference(set_process_tasks):
state_inconsistent = True
echo.echo_warning('There are active processes without process task: ', nl=False)
echo.echo(set_active_processes.difference(set_process_tasks))

if state_inconsistent and not fix:
echo.echo_critical(
'Inconsistencies detected between database and RabbitMQ. Run again with `--fix` to address problems.'
)

if not state_inconsistent:
echo.echo_success('No inconsistencies detected between database and RabbitMQ.')
return

# At this point we have either exited because of inconsistencies and ``--fix`` was not passed, or we returned
# because there were no inconsistencies, so all that is left is to address inconsistencies
echo.echo_info('Attempting to fix inconsistencies')

# Eliminate duplicate tasks and tasks that correspond to terminated process
for task in iterate_process_tasks(ctx.obj.profile, manager.get_communicator()):
pid = task.body.get('args', {}).get('pid', None)
if pid not in set_active_processes:
with task.processing() as outcome:
outcome.set_result(False)
echo.echo_report(f'Acknowledged task `{pid}`')

# Revive zombie processes that no longer have a process task
process_controller = manager.get_process_controller()
for pid in set_active_processes:
if pid not in set_process_tasks:
process_controller.continue_process(pid)
echo.echo_report(f'Revived process `{pid}`')
from .cmd_process import process_repair
ctx.invoke(process_repair, dry_run=not fix)


@cmd_tasks.command('revive')
Expand Down
8 changes: 8 additions & 0 deletions aiida/cmdline/utils/decorators.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,14 @@
__all__ = ('with_dbenv', 'dbenv', 'only_if_daemon_running')


@decorator
def with_manager(wrapped, _, args, kwargs):
    """Decorate a function, injecting the global manager as the ``manager`` keyword argument.

    The manager is obtained from :func:`aiida.manage.get_manager` each time the decorated function is called.
    """
    from aiida.manage import get_manager

    kwargs.update(manager=get_manager())
    return wrapped(*args, **kwargs)


def load_backend_if_not_loaded():
"""Load the database backend environment for the currently loaded profile.
Expand Down
38 changes: 36 additions & 2 deletions aiida/engine/processes/control.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
"""Functions to control and interact with running processes."""
from __future__ import annotations

import collections
import concurrent
import typing as t

Expand All @@ -12,6 +13,7 @@
from aiida.common.exceptions import AiidaException
from aiida.common.log import AIIDA_LOGGER
from aiida.engine.daemon.client import DaemonException, get_daemon_client
from aiida.manage.configuration.profile import Profile
from aiida.manage.manager import get_manager
from aiida.orm import ProcessNode, QueryBuilder
from aiida.tools.query.calculation import CalculationQueryBuilder
Expand All @@ -23,17 +25,49 @@ class ProcessTimeoutException(AiidaException):
"""Raised when action to communicate with a process times out."""


def get_active_processes(paused: bool = False) -> list[ProcessNode]:
def get_active_processes(paused: bool = False, project: str | list[str] = '*') -> list[ProcessNode] | list[t.Any]:
"""Return all active processes, i.e., those with a process state of created, waiting or running.
:param paused: Boolean, if True, filter for processes that are paused.
:param project: Single or list of properties to project. By default projects the entire node.
:return: A list of process nodes of active processes.
"""
filters = CalculationQueryBuilder().get_filters(process_state=('created', 'waiting', 'running'), paused=paused)
builder = QueryBuilder().append(ProcessNode, filters=filters)
builder = QueryBuilder().append(ProcessNode, filters=filters, project=project)
return builder.all(flat=True)


def iterate_process_tasks(
    profile: Profile, communicator: kiwipy.rmq.RmqCommunicator
) -> collections.abc.Iterator[kiwipy.rmq.RmqIncomingTask]:
    """Iterate over the process tasks in the RabbitMQ launch queue of the given profile.

    :param profile: The profile whose ``rmq_prefix`` determines the name of the launch queue.
    :param communicator: The communicator used to access the task queue.
    :returns: An iterator that yields each task found in the launch queue.
    """
    from aiida.manage.external.rmq import get_launch_queue_name

    queue_name = get_launch_queue_name(profile.rmq_prefix)
    yield from communicator.task_queue(queue_name)


def get_process_tasks(profile: Profile, communicator: kiwipy.rmq.RmqCommunicator) -> list[int]:
    """Return the list of process pks that have a process task in the RabbitMQ process queue.

    :param profile: The profile whose process queue is inspected.
    :param communicator: The communicator used to access the task queue.
    :returns: A list of process pks that have a corresponding process task with RabbitMQ. A pk appears once for
        every task that references it, so duplicates are possible.
    """
    collected = []

    for task in iterate_process_tasks(profile, communicator):
        try:
            pid = task.body.get('args', {})['pid']
        except KeyError:
            # Tasks whose arguments carry no ``pid`` are skipped.
            continue
        collected.append(pid)

    return collected


def revive_processes(processes: list[ProcessNode], *, wait: bool = False) -> None:
"""Revive processes that seem stuck and are no longer reachable.
Expand Down
1 change: 1 addition & 0 deletions docs/source/reference/command_line.rst
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,7 @@ Below is a list with all available subcommands.
list Show a list of running or terminated processes.
pause Pause running processes.
play Play (unpause) paused processes.
repair Automatically repair all stuck processes.
report Show the log report for one or multiple processes.
show Show details for one or multiple processes.
status Print the status of one or multiple processes.
Expand Down
Loading

0 comments on commit 3e3d9b9

Please sign in to comment.