diff --git a/bin/send_tasks.py b/bin/send_tasks.py
new file mode 100644
index 00000000000000..1b29aee48d4f3b
--- /dev/null
+++ b/bin/send_tasks.py
@@ -0,0 +1,24 @@
+import click
+
+from sentry.taskdemo import say_hello
+
+
+def produce_activations(num_activations: int) -> None:
+    for i in range(num_activations):
+        say_hello.delay(f"{i}")
+
+
+@click.command()
+@click.option(
+    "--num-activations",
+    type=int,
+    default=1,
+    show_default=True,
+    help="Number of task activations to send to kafka",
+)
+def main(num_activations: int) -> None:
+    produce_activations(num_activations)
+
+
+if __name__ == "__main__":
+    main()
diff --git a/src/sentry/runner/commands/run.py b/src/sentry/runner/commands/run.py
index 1c17cbf8d85712..4d9e7891d3323e 100644
--- a/src/sentry/runner/commands/run.py
+++ b/src/sentry/runner/commands/run.py
@@ -4,7 +4,8 @@
 import os
 import signal
 from multiprocessing import cpu_count
+from threading import Thread
 from typing import Any
 
 import click
@@ -253,6 +253,230 @@ def taskworker(rpc_host: str, max_task_count: int, **options: Any) -> None:
     raise SystemExit(exitcode)
 
 
+@run.command()
+@click.option("--consumer-path", type=str, help="Path to the taskbroker binary")
+@click.option(
+    "--num-consumers",
+    type=int,
+    help="Number of consumers to run in the consumer group",
+    default=8,
+    show_default=True,
+)
+@click.option(
+    "--num-messages",
+    type=int,
+    help="Number of messages to send to the kafka topic",
+    default=80_000,
+    show_default=True,
+)
+@click.option(
+    "--num-restarts",
+    type=int,
+    help="Number of restarts for each consumer",
+    default=24,
+    show_default=True,
+)
+@click.option(
+    "--min-restart-duration",
+    type=int,
+    help="Minimum number of seconds between restarts for each consumer",
+    default=1,
+    show_default=True,
+)
+@click.option(
+    "--max-restart-duration",
+    type=int,
+    help="Maximum number of seconds between restarts for each consumer",
+    default=30,
+    show_default=True,
+)
+@log_options()
+@configuration
+def taskbroker_integration_test(
+    consumer_path: str,
+    num_consumers: int,
+    num_messages: int,
+    num_restarts: int,
+    min_restart_duration: int,
+    max_restart_duration: int,
+) -> None:
+    import random
+    import sqlite3
+    import subprocess
+    import threading
+    import time
+    from pathlib import Path
+
+    import yaml
+
+    def manage_consumer(
+        consumer_index: int,
+        consumer_path: str,
+        config_file: str,
+        iterations: int,
+        min_sleep: int,
+        max_sleep: int,
+        log_file_path: str,
+    ) -> None:
+        with open(log_file_path, "a") as log_file:
+            print(f"Starting consumer {consumer_index}, writing log file to {log_file_path}")
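+            # Each iteration simulates a deploy or crash: run the consumer for
+            # a random interval, interrupt it, and expect a clean shutdown. A
+            # non-zero exit code or a hung shutdown gets a hard kill instead.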
+            for i in range(iterations):
+                config_file_path = f"../taskbroker/tests/{config_file}"
+                process = subprocess.Popen([consumer_path, "-c", config_file_path], stderr=log_file)
+                time.sleep(random.randint(min_sleep, max_sleep))
+                print(
+                    f"Sending SIGINT to consumer {consumer_index}, {iterations - i - 1} SIGINTs remaining"
+                )
+                process.send_signal(signal.SIGINT)
+                try:
+                    return_code = process.wait(timeout=10)
+                    assert return_code == 0
+                except Exception:
+                    process.kill()
+
+    if num_consumers < 1:
+        print("Number of consumers must be greater than 0")
+        return
+
+    # First check if the task-worker topic exists
+    print("Checking if task-worker topic already exists")
+    check_topic_cmd = [
+        "docker",
+        "exec",
+        "sentry_kafka",
+        "kafka-topics",
+        "--bootstrap-server",
+        "localhost:9092",
+        "--list",
+    ]
+    result = subprocess.run(check_topic_cmd, check=True, capture_output=True, text=True)
+    topics = result.stdout.strip().split("\n")
+    # Create the task-worker Kafka topic with 32 partitions if it is missing
+    if "task-worker" not in topics:
+        print("task-worker topic does not exist, creating it with 32 partitions")
+        create_topic_cmd = [
+            "docker",
+            "exec",
+            "sentry_kafka",
+            "kafka-topics",
+            "--bootstrap-server",
+            "localhost:9092",
+            "--create",
+            "--topic",
+            "task-worker",
+            "--partitions",
+            "32",
+            "--replication-factor",
+            "1",
+        ]
+        subprocess.run(create_topic_cmd, check=True)
+    else:
+        print("task-worker topic already exists, making sure it has 32 partitions")
+        try:
+            alter_topic_cmd = [
+                "docker",
+                "exec",
+                "sentry_kafka",
+                "kafka-topics",
+                "--bootstrap-server",
+                "localhost:9092",
+                "--alter",
+                "--topic",
+                "task-worker",
+                "--partitions",
+                "32",
+            ]
+            subprocess.run(alter_topic_cmd, check=True)
+        except Exception:
+            # --alter fails if the topic already has 32 partitions; that is fine.
+            pass
+
+    # Create config files for consumers
+    print("Creating config files for consumers in taskbroker/tests")
+    consumer_configs = {
+        f"config_{i}.yml": {
+            "db_path": f"db_{i}_{int(time.time())}.sqlite",
+            "kafka_topic": "task-worker",
+            "kafka_consumer_group": "task-worker-integration-test",
+            "kafka_auto_offset_reset": "earliest",
+            "grpc_port": 50051 + i,
+        }
+        for i in range(num_consumers)
+    }
+
+    test_dir = Path("../taskbroker/tests")
+    test_dir.mkdir(parents=True, exist_ok=True)
+
+    for filename, config in consumer_configs.items():
+        with open(test_dir / filename, "w") as f:
+            yaml.safe_dump(config, f)
+
+    # Produce test messages to the task-worker topic
+    from sentry.taskdemo import say_hello
+
+    for i in range(num_messages):
+        print(f"Sending message: {i}", end="\r")
+        say_hello.delay("hello world")
+    print(f"\nDone: sent {num_messages} messages")
+
+    threads: list[Thread] = []
+    for i in range(num_consumers):
+        thread = threading.Thread(
+            target=manage_consumer,
+            args=(
+                i,
+                consumer_path,
+                f"config_{i}.yml",
+                num_restarts,
+                min_restart_duration,
+                max_restart_duration,
+                f"consumer_{i}.log",
+            ),
+        )
+        thread.start()
+        threads.append(thread)
+
+    for t in threads:
+        t.join()
+
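+    # Cross-check the consumers' sqlite databases: attach them all, then for
+    # each partition compare the span of stored offsets (max - min + 1) with
+    # the number of stored rows. A non-zero delta means offsets were dropped
+    # or double-stored somewhere across the restarts.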
+    attach_db_stmt = "".join(
+        [
+            f"attach '{config['db_path']}' as {config['db_path'].replace('.sqlite', '')};\n"
+            for config in consumer_configs.values()
+        ]
+    )
+    from_stmt = "\nUNION ALL\n".join(
+        [
+            f"    SELECT * FROM {config['db_path'].replace('.sqlite', '')}.inflight_taskactivations"
+            for config in consumer_configs.values()
+        ]
+    )
+    query = f"""
+    SELECT
+        partition,
+        (max(offset) - min(offset)) + 1 AS offset_diff,
+        count(*) AS occ,
+        (max(offset) - min(offset)) + 1 - count(offset) AS delta
+    FROM (
+    {from_stmt}
+    )
+    GROUP BY partition
+    ORDER BY partition;
+    """
+
+    con = sqlite3.connect(consumer_configs["config_0.yml"]["db_path"])
+    cur = con.cursor()
+    cur.executescript(attach_db_stmt)
+    res = cur.execute(query)
+    # Assert that each value in the delta (fourth) column is 0
+    assert all(row[3] == 0 for row in res.fetchall())
+    print("Taskbroker integration test completed successfully.")
+
+
 @run.command()
 @click.option(
     "--pidfile",
diff --git a/src/sentry/taskdemo/__init__.py b/src/sentry/taskdemo/__init__.py
new file mode 100644
index 00000000000000..d330b1e2d1d5f6
--- /dev/null
+++ b/src/sentry/taskdemo/__init__.py
@@ -0,0 +1,14 @@
+from __future__ import annotations
+
+import logging
+
+from sentry.taskworker.registry import taskregistry
+
+logger = logging.getLogger(__name__)
+demotasks = taskregistry.create_namespace(name="demos")
+
+
+@demotasks.register(name="demos.say_hello")
+def say_hello(name: str) -> None:
+    # TODO: use logger.info("hello %s", name) once logging works in the worker process
+    print(f"{name}")  # noqa
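
Note: for reference, here is a standalone sketch of the per-partition gap check the test runs at the end. The table and column names mirror the inflight_taskactivations schema the query above assumes; the data is made up to show a dropped offset being caught.

# Standalone sketch of the offset-gap check (illustrative data only; the real
# test runs the same query as a UNION across every consumer's attached db).
import sqlite3

con = sqlite3.connect(":memory:")
cur = con.cursor()
cur.execute("CREATE TABLE inflight_taskactivations (partition INTEGER, offset INTEGER)")

# Partition 0 holds a contiguous offset range; partition 1 is missing offset 12.
rows = [(0, 10), (0, 11), (0, 12), (1, 10), (1, 11), (1, 13)]
cur.executemany("INSERT INTO inflight_taskactivations VALUES (?, ?)", rows)

query = """
SELECT
    partition,
    (max(offset) - min(offset)) + 1 - count(offset) AS delta
FROM inflight_taskactivations
GROUP BY partition
ORDER BY partition;
"""
for partition, delta in cur.execute(query):
    print(partition, delta)  # prints "0 0" then "1 1": partition 1 lost an offset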