diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 089b91bb..215e4e5c 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -32,7 +32,7 @@ jobs:
         run: cargo clippy --workspace --all-features --tests -- -D clippy::all
 
   test:
-    name: Test (ubuntu)
+    name: Tests (ubuntu)
     runs-on: ubuntu-latest
 
     steps:
@@ -78,3 +78,14 @@ jobs:
         with:
          command: test
          args: --all
+
+      - name: Install Python Dependencies
+        run: |
+          python -m venv python/.venv
+          . python/.venv/bin/activate
+          pip install -r python/requirements-dev.txt
+
+      - name: Run Python Integration Tests
+        run: |
+          export PYTEST_ADDOPTS=""
+          python -m pytest python/integration_tests -s -vv
diff --git a/.gitignore b/.gitignore
index 84c69769..0e745d47 100644
--- a/.gitignore
+++ b/.gitignore
@@ -8,4 +8,10 @@
 *.sqlite-shm
 *.sqlite-wal
 
+# Python
+**/__pycache__/
+**/.pytest_cache/
+**/integration_tests/.tests_output/
+**/.venv
+
 .VERSION
diff --git a/Makefile b/Makefile
index f9cb7176..38826ecf 100644
--- a/Makefile
+++ b/Makefile
@@ -38,10 +38,23 @@ format: ## Run autofix mode for formatting and lint
 
 # Tests
 
-test:
+unit-test:
 	cargo test
-.PHONY: test
+.PHONY: unit-test
 
+install-py-dev:
+	python -m venv python/.venv
+	. python/.venv/bin/activate && \
+	pip install -r python/requirements-dev.txt
+.PHONY: install-py-dev
+
+integration-test:
+	cargo build
+	python -m venv python/.venv
+	. python/.venv/bin/activate && \
+	python -m pytest python/integration_tests -s -vv
+.PHONY: integration-test
+
 # Help
 
 help: ## this help
diff --git a/python/__init__.py b/python/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/python/integration_tests/helpers.py b/python/integration_tests/helpers.py
new file mode 100644
index 00000000..bcc7f491
--- /dev/null
+++ b/python/integration_tests/helpers.py
@@ -0,0 +1,112 @@
+import orjson
+import subprocess
+import time
+
+from confluent_kafka import Producer
+from pathlib import Path
+from uuid import uuid4
+from sentry_protos.sentry.v1.taskworker_pb2 import RetryState, TaskActivation
+from google.protobuf.timestamp_pb2 import Timestamp
+
+TASKBROKER_ROOT = Path(__file__).parent.parent.parent
+TASKBROKER_BIN = TASKBROKER_ROOT / "target/debug/taskbroker"
+TESTS_OUTPUT_PATH = Path(__file__).parent / ".tests_output"
+
+
+def check_topic_exists(topic_name: str) -> bool:
+    try:
+        check_topic_cmd = [
+            "docker",
+            "exec",
+            "sentry_kafka",
+            "kafka-topics",
+            "--bootstrap-server",
+            "localhost:9092",
+            "--list",
+        ]
+        result = subprocess.run(check_topic_cmd, check=True, capture_output=True, text=True)
+        topics = result.stdout.strip().split("\n")
+
+        return topic_name in topics
+    except Exception as e:
+        raise Exception(f"Failed to check if topic exists: {e}")
+
+
+def create_topic(topic_name: str, num_partitions: int) -> None:
+    try:
+        create_topic_cmd = [
+            "docker",
+            "exec",
+            "sentry_kafka",
+            "kafka-topics",
+            "--bootstrap-server",
+            "localhost:9092",
+            "--create",
+            "--topic",
+            topic_name,
+            "--partitions",
+            str(num_partitions),
+            "--replication-factor",
+            "1",
+        ]
+        subprocess.run(create_topic_cmd, check=True)
+    except Exception as e:
+        raise Exception(f"Failed to create topic: {e}")
+
+
+def update_topic_partitions(topic_name: str, num_partitions: int) -> None:
+    try:
+        create_topic_cmd = [
+            "docker",
+            "exec",
+            "sentry_kafka",
+            "kafka-topics",
+            "--bootstrap-server",
+            "localhost:9092",
+            "--alter",
+            "--topic",
+            topic_name,
+            "--partitions",
+            str(num_partitions),
+        ]
+        subprocess.run(create_topic_cmd, check=True)
+    except Exception:
+        # Command fails if the topic already has the correct number of partitions. Try to continue.
+        pass
+
+
+def serialize_task_activation(args: list, kwargs: dict) -> bytes:
+    retry_state = RetryState(
+        attempts=0,
+        kind="sentry.taskworker.retry.Retry",
+        discard_after_attempt=None,
+        deadletter_after_attempt=None,
+    )
+    pending_task_payload = TaskActivation(
+        id=uuid4().hex,
+        namespace="integration_tests",
+        taskname="integration_tests.say_hello",
+        parameters=orjson.dumps({"args": args, "kwargs": kwargs}),
+        retry_state=retry_state,
+        received_at=Timestamp(seconds=int(time.time())),
+    ).SerializeToString()
+
+    return pending_task_payload
+
+
+def send_messages_to_kafka(topic_name: str, num_messages: int) -> None:
+    try:
+        producer = Producer({
+            'bootstrap.servers': 'localhost:9092',
+            'broker.address.family': 'v4'
+        })
+
+        for _ in range(num_messages):
+            task_message = serialize_task_activation(["foobar"], {})
+            producer.produce(topic_name, task_message)
+
+        producer.poll(5)  # trigger delivery reports
+        producer.flush()
+        print(f"Sent {num_messages} messages to kafka topic {topic_name}")
+    except Exception as e:
+        raise Exception(f"Failed to send messages to kafka: {e}")
diff --git a/python/integration_tests/test_consumer_rebalancing.py b/python/integration_tests/test_consumer_rebalancing.py
new file mode 100644
index 00000000..15121762
--- /dev/null
+++ b/python/integration_tests/test_consumer_rebalancing.py
@@ -0,0 +1,211 @@
+import random
+import shutil
+import signal
+import sqlite3
+import subprocess
+import threading
+import time
+
+from threading import Thread
+
+import yaml
+
+from python.integration_tests.helpers import (
+    TASKBROKER_BIN,
+    TESTS_OUTPUT_PATH,
+    check_topic_exists,
+    create_topic,
+    update_topic_partitions,
+    send_messages_to_kafka,
+)
+
+
+def manage_consumer(
+    consumer_index: int,
+    consumer_path: str,
+    config_file_path: str,
+    iterations: int,
+    min_sleep: int,
+    max_sleep: int,
+    log_file_path: str,
+) -> None:
+    with open(log_file_path, "a") as log_file:
+        print(
+            f"Starting consumer {consumer_index}, writing log file to {log_file_path}"
+        )
+        for i in range(iterations):
+            process = subprocess.Popen(
+                [consumer_path, "-c", config_file_path],
+                stderr=subprocess.STDOUT,
+                stdout=log_file,
+            )
+            time.sleep(random.randint(min_sleep, max_sleep))
+            print(
+                f"Sending SIGINT to consumer {consumer_index}, {iterations - i - 1} SIGINTs remaining for that consumer"
+            )
+            process.send_signal(signal.SIGINT)
+            try:
+                return_code = process.wait(timeout=10)
+                assert return_code == 0
+            except Exception:
+                process.kill()
+
+
+def test_tasks_written_once_during_rebalancing() -> None:
+    # Test configuration
+    consumer_path = str(TASKBROKER_BIN)
+    num_consumers = 8
+    num_messages = 100_000
+    num_restarts = 16
+    num_partitions = 32
+    min_restart_duration = 1
+    max_restart_duration = 10
+    topic_name = "task-worker"
+    curr_time = int(time.time())
+
+    print(
+        f"""
+Running test with the following configuration:
+    num of consumers: {num_consumers},
+    num of messages: {num_messages},
+    num of restarts: {num_restarts},
+    num of partitions: {num_partitions},
+    min restart duration: {min_restart_duration} seconds,
+    max restart duration: {max_restart_duration} seconds,
+    topic name: {topic_name}
+    random seed value: 42
+    """
+    )
+    random.seed(42)
+
+    # Ensure topic has correct number of partitions
+    if not check_topic_exists(topic_name):
+        print(
+            f"{topic_name} topic does not exist, creating it with {num_partitions} partitions"
+        )
+        create_topic(topic_name, num_partitions)
+    else:
+        print(
+            f"{topic_name} topic already exists, making sure it has {num_partitions} partitions"
+        )
+        update_topic_partitions(topic_name, num_partitions)
+
+    # Create config files for consumers
+    print("Creating config files for consumers")
+    TESTS_OUTPUT_PATH.mkdir(exist_ok=True)
+    consumer_configs = {}
+    for i in range(num_consumers):
+        db_name = f"db_{i}_{curr_time}"
+        consumer_configs[f"config_{i}.yml"] = {
+            "db_name": db_name,
+            "db_path": str(TESTS_OUTPUT_PATH / f"{db_name}.sqlite"),
+            "max_pending_count": 16384,
+            "kafka_topic": topic_name,
+            "kafka_consumer_group": topic_name,
+            "kafka_auto_offset_reset": "earliest",
+            "grpc_port": 50051 + i,
+        }
+
+    for filename, config in consumer_configs.items():
+        with open(str(TESTS_OUTPUT_PATH / filename), "w") as f:
+            yaml.safe_dump(config, f)
+
+    try:
+        send_messages_to_kafka(topic_name, num_messages)
+        threads: list[Thread] = []
+        for i in range(num_consumers):
+            thread = threading.Thread(
+                target=manage_consumer,
+                args=(
+                    i,
+                    consumer_path,
+                    str(TESTS_OUTPUT_PATH / f"config_{i}.yml"),
+                    num_restarts,
+                    min_restart_duration,
+                    max_restart_duration,
+                    str(TESTS_OUTPUT_PATH / f"consumer_{i}_{curr_time}.log"),
+                ),
+            )
+            thread.start()
+            threads.append(thread)
+
+        for t in threads:
+            t.join()
+
+    except Exception as e:
+        raise Exception(f"Error running taskbroker: {e}")
+
+    # Validate that all tasks were written once during rebalancing
+    attach_db_stmt = "".join(
+        [
+            f"ATTACH DATABASE '{config['db_path']}' AS {config['db_name']};\n"
+            for config in consumer_configs.values()
+        ]
+    )
+    from_stmt = "\n        UNION ALL\n".join(
+        [
+            f"        SELECT * FROM {config['db_name']}.inflight_taskactivations"
+            for config in consumer_configs.values()
+        ]
+    )
+    query = f"""    SELECT
+        partition,
+        (max(offset) - min(offset)) + 1 AS expected,
+        count(*) AS actual,
+        (max(offset) - min(offset)) + 1 - count(*) AS diff
+    FROM (
+{from_stmt}
+    )
+    GROUP BY partition
+    ORDER BY partition;"""
+
+    con = sqlite3.connect(consumer_configs["config_0.yml"]["db_path"])
+    cur = con.cursor()
+    cur.executescript(attach_db_stmt)
+    row_count = cur.execute(query).fetchall()
+    print("\n======== Verify number of rows based on max and min offset ========")
+    print("Query:")
+    print(query)
+    print("Result:")
+    print(
+        f"{'Partition'.rjust(16)}{'Expected'.rjust(16)}{'Actual'.rjust(16)}{'Diff'.rjust(16)}"
+    )
+    for partition, expected_row_count, actual_row_count, diff in row_count:
+        print(
+            f"{str(partition).rjust(16)}{str(expected_row_count).rjust(16)}{str(actual_row_count).rjust(16)}{str(diff).rjust(16)}"
+        )
+
+    query = f"""    SELECT partition, offset, count(*) as count
+    FROM (
+{from_stmt}
+    )
+    GROUP BY partition, offset
+    HAVING count > 1"""
+    res = cur.execute(query).fetchall()
+    print("\n======== Verify all (partition, offset) are unique ========")
+    print("Query:")
+    print(query)
+    print("Result:")
+    print(f"{'Partition'.rjust(16)}{'Offset'.rjust(16)}{'count'.rjust(16)}")
+    for partition, offset, count in res:
+        print(
+            f"{str(partition).rjust(16)}{str(offset).rjust(16)}{str(count).rjust(16)}"
+        )
+
+    if not all([row[3] == 0 for row in row_count]):
+        print(
+            "Test failed! Got duplicate/missing kafka messages in sqlite, dumping logs"
+        )
+        for i in range(num_consumers):
+            print(f"=== consumer {i} log ===")
+            with open(
+                str(TESTS_OUTPUT_PATH / f"consumer_{i}_{curr_time}.log"), "r"
+            ) as f:
+                print(f.read())
+
+    # Clean up test output files
+    print(f"Cleaning up test output files in {TESTS_OUTPUT_PATH}")
+    shutil.rmtree(TESTS_OUTPUT_PATH)
+
+    if not all([row[3] == 0 for row in row_count]):
+        assert False
diff --git a/python/pyproject.toml b/python/pyproject.toml
new file mode 100644
index 00000000..4a65d254
--- /dev/null
+++ b/python/pyproject.toml
@@ -0,0 +1,8 @@
+[build-system]
+requires = ["setuptools>=42", "wheel"]
+build-backend = "setuptools.build_meta"
+
+[tool.pytest.ini_options]
+testpaths = ["integration_tests"]
+python_files = ["test_*.py"]
+python_functions = ["test_*"]
diff --git a/python/requirements-dev.txt b/python/requirements-dev.txt
new file mode 100644
index 00000000..1853c2d8
--- /dev/null
+++ b/python/requirements-dev.txt
@@ -0,0 +1,6 @@
+confluent_kafka>=2.3.0
+orjson>=3.10.10
+protobuf>=5.28.3
+pytest>=8.3.3
+pyyaml>=6.0.2
+sentry-protos>=0.1.37
diff --git a/src/consumer/kafka.rs b/src/consumer/kafka.rs
index bf77b0bb..7ec232db 100644
--- a/src/consumer/kafka.rs
+++ b/src/consumer/kafka.rs
@@ -32,7 +32,7 @@ use tokio::{
         mpsc::{self, unbounded_channel, UnboundedReceiver, UnboundedSender},
         oneshot,
     },
-    task::{self, JoinError, JoinSet},
+    task::{self, JoinSet},
     time::{self, sleep, MissedTickBehavior},
 };
 use tokio_stream::wrappers::UnboundedReceiverStream;
@@ -60,46 +60,32 @@ pub async fn start_consumer(
         .subscribe(topics)
         .expect("Can't subscribe to specified topics");
 
-    select! {
-        res = handle_os_signals(event_sender.clone()) => {
-            info!("Received shutdown signal, shutting down consumer");
-            match res {
-                Ok(res) => Ok(res),
-                Err(e) => Err(anyhow!("Error in OS signals handler: {}", e)),
-            }
-        }
-        res = handle_consumer_client(consumer.clone(), client_shutdown_receiver) => {
-            info!("Consumer client shutdown");
-            match res {
-                Ok(res) => Ok(res),
-                Err(e) => Err(anyhow!("Error in consumer client: {}", e)),
-            }
-        }
-        res = handle_events(consumer, event_receiver, client_shutdown_sender, spawn_actors) => {
-            info!("Events handler shutdown");
-            res
-        }
-    }
+    handle_os_signals(event_sender.clone());
+    handle_consumer_client(consumer.clone(), client_shutdown_receiver);
+    handle_events(
+        consumer,
+        event_receiver,
+        client_shutdown_sender,
+        spawn_actors,
+    )
+    .await
 }
 
-pub async fn handle_os_signals(
-    event_sender: UnboundedSender<(Event, SyncSender<()>)>,
-) -> Result<(), JoinError> {
+pub fn handle_os_signals(event_sender: UnboundedSender<(Event, SyncSender<()>)>) {
     let guard = elegant_departure::get_shutdown_guard();
     tokio::spawn(async move {
         let _ = guard.wait().await;
         info!("Cancellation token received, shutting down consumer");
         let (rendezvous_sender, _) = sync_channel(0);
         let _ = event_sender.send((Event::Shutdown, rendezvous_sender));
-    })
-    .await
+    });
 }
 
 #[instrument(skip(consumer, shutdown))]
-pub async fn handle_consumer_client(
+pub fn handle_consumer_client(
     consumer: Arc<StreamConsumer<KafkaContext>>,
     shutdown: oneshot::Receiver<()>,
-) -> Result<(), JoinError> {
+) {
     task::spawn_blocking(|| {
         Handle::current().block_on(async move {
            select! {
@@ -114,8 +100,7 @@ pub async fn handle_consumer_client(
             }
             debug!("Shutdown complete");
         });
-    })
-    .await
+    });
 }
 
 #[derive(Debug)]
@@ -791,7 +776,6 @@ pub async fn commit(
     _rendezvous_guard: oneshot::Sender<()>,
 ) -> Result<(), Error> {
     while let Some(msgs) = receiver.recv().await {
-        debug!("Storing offsets");
         let mut highwater_mark = HighwaterMark::new();
        msgs.0.iter().for_each(|msg| highwater_mark.track(msg));
         consumer.store_offsets(&highwater_mark.into()).unwrap();