diff --git a/src/sentry/runner/commands/run.py b/src/sentry/runner/commands/run.py index 1c17cbf8d85712..41a2acbd5d97e7 100644 --- a/src/sentry/runner/commands/run.py +++ b/src/sentry/runner/commands/run.py @@ -253,6 +253,53 @@ def taskworker(rpc_host: str, max_task_count: int, **options: Any) -> None: raise SystemExit(exitcode) +@run.command() +@log_options() +@configuration +@click.option( + "--repeat", + type=int, + help="Number of messages to send to the kafka topic", + default=1, + show_default=True, +) +@click.option( + "--kwargs", + type=str, + help="Task function keyword arguments", +) +@click.option( + "--args", + type=str, + help="Task function arguments", +) +@click.option( + "--task-function-path", + type=str, + help="The path to the function name of the task to execute", + required=True, +) +def taskbroker_send_tasks( + task_function_path: str, + args: str, + kwargs: str, + repeat: int, +) -> None: + from sentry.utils.imports import import_string + + try: + func = import_string(task_function_path) + except Exception as e: + click.echo(f"Error: {e}") + raise click.Abort() + task_args = [] if not args else eval(args) + task_kwargs = {} if not kwargs else eval(kwargs) + + for _ in range(repeat): + func.delay(*task_args, **task_kwargs) + click.echo(message=f"Successfully sent {repeat} messages.") + + @run.command() @click.option( "--pidfile", diff --git a/src/sentry/taskworker/tasks/examples.py b/src/sentry/taskworker/tasks/examples.py new file mode 100644 index 00000000000000..2e077d70e9b88b --- /dev/null +++ b/src/sentry/taskworker/tasks/examples.py @@ -0,0 +1,13 @@ +from __future__ import annotations + +import logging + +from sentry.taskworker.registry import taskregistry + +logger = logging.getLogger(__name__) +exampletasks = taskregistry.create_namespace(name="examples") + + +@exampletasks.register(name="examples.say_hello") +def say_hello(name: str) -> None: + print(f"Hello {name}") # noqa