Skip to content

Commit

Permalink
Make custom remote control commands available in CLI (#8489)
Browse files Browse the repository at this point in the history
* Make custom remote control commands available in CLI

* fixup (remove accidentally commited todo comments)

* Avoid breaking test_worker by modifying os.environ

* Reset global state after each preload test
  • Loading branch information
tobinus committed Feb 28, 2024
1 parent f8c952d commit 582e169
Show file tree
Hide file tree
Showing 4 changed files with 206 additions and 58 deletions.
89 changes: 69 additions & 20 deletions celery/bin/control.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""The ``celery control``, ``. inspect`` and ``. status`` programs."""
from functools import partial
from typing import Literal

import click
from kombu.utils.json import dumps
Expand Down Expand Up @@ -39,18 +40,69 @@ def _consume_arguments(meta, method, args):
args[:] = args[i:]


def _compile_arguments(action, args):
meta = Panel.meta[action]
def _compile_arguments(command, args):
meta = Panel.meta[command]
arguments = {}
if meta.args:
arguments.update({
k: v for k, v in _consume_arguments(meta, action, args)
k: v for k, v in _consume_arguments(meta, command, args)
})
if meta.variadic:
arguments.update({meta.variadic: args})
return arguments


_RemoteControlType = Literal['inspect', 'control']


def _verify_command_name(type_: _RemoteControlType, command: str) -> None:
choices = _get_commands_of_type(type_)

if command not in choices:
command_listing = ", ".join(choices)
raise click.UsageError(
message=f'Command {command} not recognized. Available {type_} commands: {command_listing}',
)


def _list_option(type_: _RemoteControlType):
def callback(ctx: click.Context, param, value) -> None:
if not value:
return
choices = _get_commands_of_type(type_)

formatter = click.HelpFormatter()

with formatter.section(f'{type_.capitalize()} Commands'):
command_list = []
for command_name, info in choices.items():
if info.signature:
command_preview = f'{command_name} {info.signature}'
else:
command_preview = command_name
command_list.append((command_preview, info.help))
formatter.write_dl(command_list)
ctx.obj.echo(formatter.getvalue(), nl=False)
ctx.exit()

return click.option(
'--list',
is_flag=True,
help=f'List available {type_} commands and exit.',
expose_value=False,
is_eager=True,
callback=callback,
)


def _get_commands_of_type(type_: _RemoteControlType) -> dict:
command_name_info_pairs = [
(name, info) for name, info in Panel.meta.items()
if info.type == type_ and info.visible
]
return dict(sorted(command_name_info_pairs))


@click.command(cls=CeleryCommand)
@click.option('-t',
'--timeout',
Expand Down Expand Up @@ -96,10 +148,8 @@ def status(ctx, timeout, destination, json, **kwargs):

@click.command(cls=CeleryCommand,
context_settings={'allow_extra_args': True})
@click.argument("action", type=click.Choice([
name for name, info in Panel.meta.items()
if info.type == 'inspect' and info.visible
]))
@click.argument('command')
@_list_option('inspect')
@click.option('-t',
'--timeout',
cls=CeleryOption,
Expand All @@ -121,19 +171,19 @@ def status(ctx, timeout, destination, json, **kwargs):
help='Use json as output format.')
@click.pass_context
@handle_preload_options
def inspect(ctx, action, timeout, destination, json, **kwargs):
"""Inspect the worker at runtime.
def inspect(ctx, command, timeout, destination, json, **kwargs):
"""Inspect the workers by sending them the COMMAND inspect command.
Availability: RabbitMQ (AMQP) and Redis transports.
"""
_verify_command_name('inspect', command)
callback = None if json else partial(_say_remote_command_reply, ctx,
show_reply=True)
arguments = _compile_arguments(action, ctx.args)
arguments = _compile_arguments(command, ctx.args)
inspect = ctx.obj.app.control.inspect(timeout=timeout,
destination=destination,
callback=callback)
replies = inspect._request(action,
**arguments)
replies = inspect._request(command, **arguments)

if not replies:
raise CeleryCommandException(
Expand All @@ -153,10 +203,8 @@ def inspect(ctx, action, timeout, destination, json, **kwargs):

@click.command(cls=CeleryCommand,
context_settings={'allow_extra_args': True})
@click.argument("action", type=click.Choice([
name for name, info in Panel.meta.items()
if info.type == 'control' and info.visible
]))
@click.argument('command')
@_list_option('control')
@click.option('-t',
'--timeout',
cls=CeleryOption,
Expand All @@ -178,16 +226,17 @@ def inspect(ctx, action, timeout, destination, json, **kwargs):
help='Use json as output format.')
@click.pass_context
@handle_preload_options
def control(ctx, action, timeout, destination, json):
"""Workers remote control.
def control(ctx, command, timeout, destination, json):
"""Send the COMMAND control command to the workers.
Availability: RabbitMQ (AMQP), Redis, and MongoDB transports.
"""
_verify_command_name('control', command)
callback = None if json else partial(_say_remote_command_reply, ctx,
show_reply=True)
args = ctx.args
arguments = _compile_arguments(action, args)
replies = ctx.obj.app.control.broadcast(action, timeout=timeout,
arguments = _compile_arguments(command, args)
replies = ctx.obj.app.control.broadcast(command, timeout=timeout,
destination=destination,
callback=callback,
reply=True,
Expand Down
69 changes: 31 additions & 38 deletions t/unit/app/test_preload_cli.py
Original file line number Diff line number Diff line change
@@ -1,34 +1,41 @@
import contextlib
from typing import Tuple
from unittest.mock import patch

import pytest
from click.testing import CliRunner

from celery.bin.celery import celery


def test_preload_options(isolated_cli_runner: CliRunner):
# Verify commands like shell and purge can accept preload options.
# Projects like Pyramid-Celery's ini option should be valid preload
# options.

# TODO: Find a way to run these separate invoke and assertions
# such that order does not matter. Currently, running
# the "t.unit.bin.proj.pyramid_celery_app" first seems
# to result in cache or memoization of the option.
# As a result, the expected exception is not raised when
# the invoke on "t.unit.bin.proj.app" is run as a second
# call.
@pytest.fixture(autouse=True)
def reset_command_params_between_each_test():
with contextlib.ExitStack() as stack:
for command in celery.commands.values():
# We only need shallow copy -- preload options are appended to the list,
# existing options are kept as-is
params_copy = command.params[:]
patch_instance = patch.object(command, "params", params_copy)
stack.enter_context(patch_instance)

res_without_preload = isolated_cli_runner.invoke(
celery,
["-A", "t.unit.bin.proj.app", "purge", "-f", "--ini", "some_ini.ini"],
catch_exceptions=True,
)
yield

assert "No such option: --ini" in res_without_preload.stdout
assert res_without_preload.exit_code == 2

@pytest.mark.parametrize(
"subcommand_with_params",
[
("purge", "-f"),
("shell",),
]
)
def test_preload_options(subcommand_with_params: Tuple[str, ...], isolated_cli_runner: CliRunner):
# Verify commands like shell and purge can accept preload options.
# Projects like Pyramid-Celery's ini option should be valid preload
# options.
res_without_preload = isolated_cli_runner.invoke(
celery,
["-A", "t.unit.bin.proj.app", "shell", "--ini", "some_ini.ini"],
catch_exceptions=True,
["-A", "t.unit.bin.proj.app", *subcommand_with_params, "--ini", "some_ini.ini"],
catch_exceptions=False,
)

assert "No such option: --ini" in res_without_preload.stdout
Expand All @@ -39,25 +46,11 @@ def test_preload_options(isolated_cli_runner: CliRunner):
[
"-A",
"t.unit.bin.proj.pyramid_celery_app",
"purge",
"-f",
*subcommand_with_params,
"--ini",
"some_ini.ini",
],
catch_exceptions=True,
catch_exceptions=False,
)

assert res_with_preload.exit_code == 0

res_with_preload = isolated_cli_runner.invoke(
celery,
[
"-A",
"t.unit.bin.proj.pyramid_celery_app",
"shell",
"--ini",
"some_ini.ini",
],
catch_exceptions=True,
)
assert res_with_preload.exit_code == 0
assert res_with_preload.exit_code == 0, res_with_preload.stdout
24 changes: 24 additions & 0 deletions t/unit/bin/proj/app_with_custom_cmds.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
from celery import Celery
from celery.worker.control import control_command, inspect_command


@control_command(
args=[('a', int), ('b', int)],
signature='a b',
)
def custom_control_cmd(state, a, b):
"""Ask the workers to reply with a and b."""
return {'ok': f'Received {a} and {b}'}


@inspect_command(
args=[('x', int)],
signature='x',
)
def custom_inspect_cmd(state, x):
"""Ask the workers to reply with x."""
return {'ok': f'Received {x}'}


app = Celery(set_as_current=False)
app.config_from_object('t.integration.test_worker_config')
82 changes: 82 additions & 0 deletions t/unit/bin/test_control.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
import os
import re
from unittest.mock import patch

import pytest
from click.testing import CliRunner

from celery.bin.celery import celery
from celery.platforms import EX_UNAVAILABLE

_GLOBAL_OPTIONS = ['-A', 't.unit.bin.proj.app_with_custom_cmds', '--broker', 'memory://']
_INSPECT_OPTIONS = ['--timeout', '0'] # Avoid waiting for the zero workers to reply


@pytest.fixture(autouse=True)
def clean_os_environ():
# Celery modifies os.environ when given the CLI option --broker memory://
# This interferes with other tests, so we need to reset os.environ
with patch.dict(os.environ, clear=True):
yield


@pytest.mark.parametrize(
('celery_cmd', 'custom_cmd'),
[
('inspect', ('custom_inspect_cmd', '123')),
('control', ('custom_control_cmd', '123', '456')),
],
)
def test_custom_remote_command(celery_cmd, custom_cmd, isolated_cli_runner: CliRunner):
res = isolated_cli_runner.invoke(
celery,
[*_GLOBAL_OPTIONS, celery_cmd, *_INSPECT_OPTIONS, *custom_cmd],
catch_exceptions=False,
)
assert res.exit_code == EX_UNAVAILABLE, (res, res.stdout)
assert res.stdout.strip() == 'Error: No nodes replied within time constraint'


@pytest.mark.parametrize(
('celery_cmd', 'remote_cmd'),
[
# Test nonexistent commands
('inspect', 'this_command_does_not_exist'),
('control', 'this_command_does_not_exist'),
# Test commands that exist, but are of the wrong type
('inspect', 'custom_control_cmd'),
('control', 'custom_inspect_cmd'),
],
)
def test_unrecognized_remote_command(celery_cmd, remote_cmd, isolated_cli_runner: CliRunner):
res = isolated_cli_runner.invoke(
celery,
[*_GLOBAL_OPTIONS, celery_cmd, *_INSPECT_OPTIONS, remote_cmd],
catch_exceptions=False,
)
assert res.exit_code == 2, (res, res.stdout)
assert f'Error: Command {remote_cmd} not recognized. Available {celery_cmd} commands: ' in res.stdout


_expected_inspect_regex = (
'\n custom_inspect_cmd x\\s+Ask the workers to reply with x\\.\n'
)
_expected_control_regex = (
'\n custom_control_cmd a b\\s+Ask the workers to reply with a and b\\.\n'
)


@pytest.mark.parametrize(
('celery_cmd', 'expected_regex'),
[
('inspect', re.compile(_expected_inspect_regex, re.MULTILINE)),
('control', re.compile(_expected_control_regex, re.MULTILINE)),
],
)
def test_listing_remote_commands(celery_cmd, expected_regex, isolated_cli_runner: CliRunner):
res = isolated_cli_runner.invoke(
celery,
[*_GLOBAL_OPTIONS, celery_cmd, '--list'],
)
assert res.exit_code == 0, (res, res.stdout)
assert expected_regex.search(res.stdout)

0 comments on commit 582e169

Please sign in to comment.