From 10165f64f497696e107694e7f079d8efdf671885 Mon Sep 17 00:00:00 2001 From: Enoch Tang Date: Tue, 26 Nov 2024 11:37:55 -0500 Subject: [PATCH 1/6] add sentry run CLI command for creating taskbroker tasks --- src/sentry/runner/commands/run.py | 18 ++++++++++++++++++ src/sentry/taskdemo/__init__.py | 13 +++++++++++++ 2 files changed, 31 insertions(+) create mode 100644 src/sentry/taskdemo/__init__.py diff --git a/src/sentry/runner/commands/run.py b/src/sentry/runner/commands/run.py index 1c17cbf8d85712..4c782b8f19f3b6 100644 --- a/src/sentry/runner/commands/run.py +++ b/src/sentry/runner/commands/run.py @@ -253,6 +253,24 @@ def taskworker(rpc_host: str, max_task_count: int, **options: Any) -> None: raise SystemExit(exitcode) +@run.command() +@log_options() +@configuration +@click.option( + "--num-messages", + type=int, + help="Number of messages to send to the kafka topic", + default=100, + show_default=True, +) +def taskbroker_create_tasks(num_messages: int) -> None: + from sentry.taskdemo import demotasks, say_hello + + for i in range(num_messages): + say_hello.delay("hello world") + click.echo(message=f"Successfully sent {num_messages} messages to {demotasks.topic.name} topic") + + @run.command() @click.option( "--pidfile", diff --git a/src/sentry/taskdemo/__init__.py b/src/sentry/taskdemo/__init__.py new file mode 100644 index 00000000000000..39b932eb7e20e2 --- /dev/null +++ b/src/sentry/taskdemo/__init__.py @@ -0,0 +1,13 @@ +from __future__ import annotations + +import logging + +from sentry.taskworker.registry import taskregistry + +logger = logging.getLogger(__name__) +demotasks = taskregistry.create_namespace(name="demos") + + +@demotasks.register(name="demos.say_hello") +def say_hello(name): + print(f"{name}") # noqa From b7fc8bcf9e7b911bc1ef85ed4afaf07d008f6705 Mon Sep 17 00:00:00 2001 From: Enoch Tang Date: Tue, 26 Nov 2024 12:24:31 -0500 Subject: [PATCH 2/6] make importing task variable --- src/sentry/runner/commands/run.py | 37 +++++++++++++++++++++++++------ 1 file changed, 30 insertions(+), 7 deletions(-) diff --git a/src/sentry/runner/commands/run.py b/src/sentry/runner/commands/run.py index 4c782b8f19f3b6..ebf0b10955b876 100644 --- a/src/sentry/runner/commands/run.py +++ b/src/sentry/runner/commands/run.py @@ -257,18 +257,41 @@ def taskworker(rpc_host: str, max_task_count: int, **options: Any) -> None: @log_options() @configuration @click.option( - "--num-messages", + "--repeat", type=int, help="Number of messages to send to the kafka topic", default=100, show_default=True, ) -def taskbroker_create_tasks(num_messages: int) -> None: - from sentry.taskdemo import demotasks, say_hello - - for i in range(num_messages): - say_hello.delay("hello world") - click.echo(message=f"Successfully sent {num_messages} messages to {demotasks.topic.name} topic") +@click.option( + "--task-function", + type=str, + help="The function name of the task to execute located in the module", + required=True, +) +@click.option( + "--path", + type=str, + help="The path to the task module", + required=True, +) +def taskbroker_send_tasks( + path: str, + task_function: str, + repeat: int, +) -> None: + import importlib + + try: + module = importlib.import_module(path) + func = getattr(module, task_function) + except Exception as e: + click.echo(f"Error: {e}") + raise click.Abort() + + for i in range(repeat): + func.delay("hello world") + click.echo(message=f"Successfully sent {repeat} messages.") @run.command() From a7022c09f704f2f1b1912edce06eb636b2d32bf6 Mon Sep 17 00:00:00 2001 From: Enoch Tang Date: Tue, 26 Nov 2024 13:12:06 -0500 Subject: [PATCH 3/6] use args and kwargs --- src/sentry/runner/commands/run.py | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) diff --git a/src/sentry/runner/commands/run.py b/src/sentry/runner/commands/run.py index ebf0b10955b876..c9a29ff3e5ce77 100644 --- a/src/sentry/runner/commands/run.py +++ b/src/sentry/runner/commands/run.py @@ -263,6 +263,16 @@ def taskworker(rpc_host: str, max_task_count: int, **options: Any) -> None: default=100, show_default=True, ) +@click.option( + "--kwargs", + type=str, + help="Task function keyword arguments", +) +@click.option( + "--args", + type=str, + help="Task function arguments", +) @click.option( "--task-function", type=str, @@ -278,6 +288,8 @@ def taskworker(rpc_host: str, max_task_count: int, **options: Any) -> None: def taskbroker_send_tasks( path: str, task_function: str, + args: str, + kwargs: str, repeat: int, ) -> None: import importlib @@ -289,8 +301,10 @@ def taskbroker_send_tasks( click.echo(f"Error: {e}") raise click.Abort() - for i in range(repeat): - func.delay("hello world") + for _ in range(repeat): + task_args = [] if not args else eval(args) + task_kwargs = {} if not kwargs else eval(kwargs) + func.delay(*task_args, **task_kwargs) click.echo(message=f"Successfully sent {repeat} messages.") From a751e92bd26719b987de2d46181aa6ff1d64d3d3 Mon Sep 17 00:00:00 2001 From: Enoch Tang Date: Tue, 26 Nov 2024 15:34:13 -0500 Subject: [PATCH 4/6] clean up --- src/sentry/runner/commands/run.py | 24 ++++++++---------------- src/sentry/taskdemo/__init__.py | 4 ++-- 2 files changed, 10 insertions(+), 18 deletions(-) diff --git a/src/sentry/runner/commands/run.py b/src/sentry/runner/commands/run.py index c9a29ff3e5ce77..41a2acbd5d97e7 100644 --- a/src/sentry/runner/commands/run.py +++ b/src/sentry/runner/commands/run.py @@ -260,7 +260,7 @@ def taskworker(rpc_host: str, max_task_count: int, **options: Any) -> None: "--repeat", type=int, help="Number of messages to send to the kafka topic", - default=100, + default=1, show_default=True, ) @click.option( @@ -274,36 +274,28 @@ def taskworker(rpc_host: str, max_task_count: int, **options: Any) -> None: help="Task function arguments", ) @click.option( - "--task-function", + "--task-function-path", type=str, - help="The function name of the task to execute located in the module", - required=True, -) -@click.option( - "--path", - type=str, - help="The path to the task module", + help="The path to the function name of the task to execute", required=True, ) def taskbroker_send_tasks( - path: str, - task_function: str, + task_function_path: str, args: str, kwargs: str, repeat: int, ) -> None: - import importlib + from sentry.utils.imports import import_string try: - module = importlib.import_module(path) - func = getattr(module, task_function) + func = import_string(task_function_path) except Exception as e: click.echo(f"Error: {e}") raise click.Abort() + task_args = [] if not args else eval(args) + task_kwargs = {} if not kwargs else eval(kwargs) for _ in range(repeat): - task_args = [] if not args else eval(args) - task_kwargs = {} if not kwargs else eval(kwargs) func.delay(*task_args, **task_kwargs) click.echo(message=f"Successfully sent {repeat} messages.") diff --git a/src/sentry/taskdemo/__init__.py b/src/sentry/taskdemo/__init__.py index 39b932eb7e20e2..91f89f271b689e 100644 --- a/src/sentry/taskdemo/__init__.py +++ b/src/sentry/taskdemo/__init__.py @@ -9,5 +9,5 @@ @demotasks.register(name="demos.say_hello") -def say_hello(name): - print(f"{name}") # noqa +def say_hello(name, age): + print(f"{name} is {age} years old") # noqa From 1b1f60dd921ba7c1ea6bbfcc0196849102091b3d Mon Sep 17 00:00:00 2001 From: Enoch Tang Date: Tue, 26 Nov 2024 15:37:57 -0500 Subject: [PATCH 5/6] remove task demo to taskworker subdir --- src/sentry/taskdemo/__init__.py | 13 ------------- src/sentry/taskworker/tasks/examples.py | 13 +++++++++++++ 2 files changed, 13 insertions(+), 13 deletions(-) delete mode 100644 src/sentry/taskdemo/__init__.py create mode 100644 src/sentry/taskworker/tasks/examples.py diff --git a/src/sentry/taskdemo/__init__.py b/src/sentry/taskdemo/__init__.py deleted file mode 100644 index 91f89f271b689e..00000000000000 --- a/src/sentry/taskdemo/__init__.py +++ /dev/null @@ -1,13 +0,0 @@ -from __future__ import annotations - -import logging - -from sentry.taskworker.registry import taskregistry - -logger = logging.getLogger(__name__) -demotasks = taskregistry.create_namespace(name="demos") - - -@demotasks.register(name="demos.say_hello") -def say_hello(name, age): - print(f"{name} is {age} years old") # noqa diff --git a/src/sentry/taskworker/tasks/examples.py b/src/sentry/taskworker/tasks/examples.py new file mode 100644 index 00000000000000..cec19ef87ce188 --- /dev/null +++ b/src/sentry/taskworker/tasks/examples.py @@ -0,0 +1,13 @@ +from __future__ import annotations + +import logging + +from sentry.taskworker.registry import taskregistry + +logger = logging.getLogger(__name__) +exampletasks = taskregistry.create_namespace(name="examples") + + +@exampletasks.register(name="examples.say_hello") +def say_hello(name): + print(f"Hello {name}") # noqa From 36396ee44108b4045b907b51f25566b6a07054b4 Mon Sep 17 00:00:00 2001 From: Enoch Tang Date: Wed, 27 Nov 2024 12:57:52 -0500 Subject: [PATCH 6/6] fix typing --- src/sentry/taskworker/tasks/examples.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/sentry/taskworker/tasks/examples.py b/src/sentry/taskworker/tasks/examples.py index cec19ef87ce188..2e077d70e9b88b 100644 --- a/src/sentry/taskworker/tasks/examples.py +++ b/src/sentry/taskworker/tasks/examples.py @@ -9,5 +9,5 @@ @exampletasks.register(name="examples.say_hello") -def say_hello(name): +def say_hello(name: str) -> None: print(f"Hello {name}") # noqa