-
-
Notifications
You must be signed in to change notification settings - Fork 3
feat: Add rebalance integration testing via pytest and setup integration testing on CI #53
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
5838e3d
d967e5a
4b9c1e5
80ab376
9986d38
d45e3e6
31ebf8e
313d6e4
aaaa867
ae7904f
79eb4a2
9463596
6690164
189b578
9d152c9
537eaa8
2f3868a
1c83e31
242c047
8dfbc71
a891666
537a7c3
c5dabda
96a4e9c
22ec37a
4090802
86eccaf
b99a662
1498b26
34c98cb
feccd50
fa44ef8
8b91cee
b3c53f2
5f25539
f3261b8
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,112 @@ | ||
| import orjson | ||
| import subprocess | ||
| import time | ||
|
|
||
| from confluent_kafka import Producer | ||
| from pathlib import Path | ||
| from uuid import uuid4 | ||
| from sentry_protos.sentry.v1.taskworker_pb2 import RetryState, TaskActivation | ||
| from google.protobuf.timestamp_pb2 import Timestamp | ||
|
|
||
# Repository root; this module lives at <root>/python/integration_tests/helpers.py
# (see the `from python.integration_tests.helpers import ...` usage in the tests).
TASKBROKER_ROOT = Path(__file__).parent.parent.parent
# Path to the taskbroker consumer binary — debug profile, presumably produced
# by `cargo build` — TODO confirm the build step on CI.
TASKBROKER_BIN = TASKBROKER_ROOT / "target/debug/taskbroker"
# Scratch directory for per-test artifacts (consumer configs, sqlite DBs, logs).
TESTS_OUTPUT_PATH = Path(__file__).parent / ".tests_output"
|
|
||
|
|
||
def check_topic_exists(topic_name: str) -> bool:
    """Return True if *topic_name* exists on the Kafka broker inside the
    ``sentry_kafka`` docker container.

    Raises:
        Exception: if the ``kafka-topics --list`` command cannot be run;
            the underlying error is chained as the cause.
    """
    try:
        check_topic_cmd = [
            "docker",
            "exec",
            "sentry_kafka",
            "kafka-topics",
            "--bootstrap-server",
            "localhost:9092",
            "--list",
        ]
        result = subprocess.run(check_topic_cmd, check=True, capture_output=True, text=True)
        # splitlines() avoids the spurious [''] that split("\n") yields on empty output.
        topics = result.stdout.strip().splitlines()

        return topic_name in topics
    except Exception as e:
        # Chain the cause so the CalledProcessError traceback is preserved.
        raise Exception(f"Failed to check if topic exists: {e}") from e
|
|
||
|
|
||
def create_topic(topic_name: str, num_partitions: int) -> None:
    """Create *topic_name* with *num_partitions* partitions (replication
    factor 1) on the Kafka broker inside the ``sentry_kafka`` container.

    Raises:
        Exception: if topic creation fails; the underlying error is chained
            as the cause.
    """
    try:
        create_topic_cmd = [
            "docker",
            "exec",
            "sentry_kafka",
            "kafka-topics",
            "--bootstrap-server",
            "localhost:9092",
            "--create",
            "--topic",
            topic_name,
            "--partitions",
            str(num_partitions),
            "--replication-factor",
            "1",
        ]
        subprocess.run(create_topic_cmd, check=True)
    except Exception as e:
        # Chain the cause so the CalledProcessError traceback is preserved.
        raise Exception(f"Failed to create topic: {e}") from e
|
|
||
|
|
||
def update_topic_partitions(topic_name: str, num_partitions: int) -> None:
    """Grow *topic_name* to *num_partitions* partitions via
    ``kafka-topics --alter`` inside the ``sentry_kafka`` container.

    Best-effort: ``--alter`` fails if the topic already has the requested
    partition count, so any error is deliberately ignored.
    """
    alter_topic_cmd = [
        "docker",
        "exec",
        "sentry_kafka",
        "kafka-topics",
        "--bootstrap-server",
        "localhost:9092",
        "--alter",
        "--topic",
        topic_name,
        "--partitions",
        str(num_partitions),
    ]
    try:
        subprocess.run(alter_topic_cmd, check=True)
    except Exception:
        # The command fails if the topic already has the correct number of
        # partitions. Try to continue.
        pass
|
|
||
|
|
||
def serialize_task_activation(args: list, kwargs: dict) -> bytes:
    """Build a TaskActivation protobuf for the integration-test task and
    return its serialized bytes.

    *args* and *kwargs* become the task parameters; each activation gets a
    fresh UUID and the current time as its received_at timestamp.
    """
    activation = TaskActivation(
        id=uuid4().hex,
        namespace="integration_tests",
        taskname="integration_tests.say_hello",
        parameters=orjson.dumps({"args": args, "kwargs": kwargs}),
        retry_state=RetryState(
            attempts=0,
            kind="sentry.taskworker.retry.Retry",
            discard_after_attempt=None,
            deadletter_after_attempt=None,
        ),
        received_at=Timestamp(seconds=int(time.time())),
    )
    return activation.SerializeToString()
|
|
||
|
|
||
def send_messages_to_kafka(topic_name: str, num_messages: int) -> None:
    """Produce *num_messages* serialized TaskActivation messages to
    *topic_name* on the broker at localhost:9092.

    Raises:
        Exception: if producing or flushing fails; the underlying error is
            chained as the cause.
    """
    try:
        producer = Producer({
            'bootstrap.servers': 'localhost:9092',
            'broker.address.family': 'v4'
        })

        for _ in range(num_messages):
            task_message = serialize_task_activation(["foobar"], {})
            producer.produce(topic_name, task_message)
            # Serve delivery callbacks as we go; producing 100k+ messages
            # without polling can overflow the client's local queue and
            # raise BufferError.
            producer.poll(0)

        producer.poll(5)  # trigger delivery reports
        producer.flush()
        print(f"Sent {num_messages} messages to kafka topic {topic_name}")
    except Exception as e:
        # Chain the cause so the original producer error is preserved.
        raise Exception(f"Failed to send messages to kafka: {e}") from e
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,211 @@ | ||
| import random | ||
| import shutil | ||
| import signal | ||
| import sqlite3 | ||
| import subprocess | ||
| import threading | ||
| import time | ||
|
|
||
| from threading import Thread | ||
|
|
||
| import yaml | ||
|
|
||
| from python.integration_tests.helpers import ( | ||
| TASKBROKER_BIN, | ||
| TESTS_OUTPUT_PATH, | ||
| check_topic_exists, | ||
| create_topic, | ||
| update_topic_partitions, | ||
| send_messages_to_kafka, | ||
| ) | ||
|
|
||
|
|
||
def manage_consumer(
    consumer_index: int,
    consumer_path: str,
    config_file_path: str,
    iterations: int,
    min_sleep: int,
    max_sleep: int,
    log_file_path: str,
) -> None:
    """Repeatedly start and gracefully stop one taskbroker consumer.

    Each of *iterations* cycles launches the binary at *consumer_path* with
    *config_file_path*, sleeps a random ``[min_sleep, max_sleep]`` seconds so
    the consumer can join the group and process messages, then sends SIGINT
    to trigger a graceful shutdown — forcing a rebalance among the remaining
    consumers. stdout/stderr are appended to *log_file_path*.
    """
    with open(log_file_path, "a") as log_file:
        print(
            f"Starting consumer {consumer_index}, writing log file to {log_file_path}"
        )
        for i in range(iterations):
            process = subprocess.Popen(
                [consumer_path, "-c", config_file_path],
                stderr=subprocess.STDOUT,
                stdout=log_file,
            )
            time.sleep(random.randint(min_sleep, max_sleep))

            print(
                f"Sending SIGINT to consumer {consumer_index}, {iterations - i - 1} SIGINTs remaining for that consumer"
            )
            process.send_signal(signal.SIGINT)
            try:
                return_code = process.wait(timeout=10)
                # Don't use `assert` for runtime validation (stripped under -O,
                # and it was previously swallowed by a broad except); surface
                # abnormal exits in the test output instead.
                if return_code != 0:
                    print(
                        f"Consumer {consumer_index} exited with non-zero return code {return_code}"
                    )
            except subprocess.TimeoutExpired:
                # Graceful shutdown timed out: force-kill, then wait() to reap
                # the child so we don't leak a zombie process.
                process.kill()
                process.wait()
|
|
||
|
|
||
def test_tasks_written_once_during_rebalancing() -> None:
    """Verify each Kafka message lands in sqlite exactly once across rebalances.

    Spawns ``num_consumers`` taskbroker processes that are each restarted
    ``num_restarts`` times at random intervals while ``num_messages`` messages
    sit in the topic, then checks — across all consumer databases — that every
    (partition, offset) pair was written exactly once.
    """
    # Test configuration
    consumer_path = str(TASKBROKER_BIN)
    num_consumers = 8
    num_messages = 100_000
    num_restarts = 16
    num_partitions = 32
    min_restart_duration = 1
    max_restart_duration = 10
    topic_name = "task-worker"
    # Fixed seed so the restart schedule is reproducible; keep the printed
    # value and the seed in one variable so they cannot drift apart.
    random_seed = 42
    curr_time = int(time.time())

    print(
        f"""
Running test with the following configuration:
        num of consumers: {num_consumers},
        num of messages: {num_messages},
        num of restarts: {num_restarts},
        num of partitions: {num_partitions},
        min restart duration: {min_restart_duration} seconds,
        max restart duration: {max_restart_duration} seconds,
        topic name: {topic_name}
        random seed value: {random_seed}
    """
    )
    random.seed(random_seed)

    # Ensure topic has correct number of partitions
    if not check_topic_exists(topic_name):
        print(
            f"{topic_name} topic does not exist, creating it with {num_partitions} partitions"
        )
        create_topic(topic_name, num_partitions)
    else:
        print(
            f"{topic_name} topic already exists, making sure it has {num_partitions} partitions"
        )
        update_topic_partitions(topic_name, num_partitions)

    # Create config files for consumers
    print("Creating config files for consumers")
    TESTS_OUTPUT_PATH.mkdir(exist_ok=True)
    consumer_configs = {}
    for i in range(num_consumers):
        db_name = f"db_{i}_{curr_time}"
        consumer_configs[f"config_{i}.yml"] = {
            "db_name": db_name,
            "db_path": str(TESTS_OUTPUT_PATH / f"{db_name}.sqlite"),
            "max_pending_count": 16384,
            "kafka_topic": topic_name,
            "kafka_consumer_group": topic_name,
            "kafka_auto_offset_reset": "earliest",
            # Each consumer needs its own grpc port to avoid bind conflicts.
            "grpc_port": 50051 + i,
        }

    for filename, config in consumer_configs.items():
        with open(str(TESTS_OUTPUT_PATH / filename), "w") as f:
            yaml.safe_dump(config, f)

    try:
        send_messages_to_kafka(topic_name, num_messages)
        threads: list[Thread] = []
        for i in range(num_consumers):
            thread = threading.Thread(
                target=manage_consumer,
                args=(
                    i,
                    consumer_path,
                    str(TESTS_OUTPUT_PATH / f"config_{i}.yml"),
                    num_restarts,
                    min_restart_duration,
                    max_restart_duration,
                    str(TESTS_OUTPUT_PATH / f"consumer_{i}_{curr_time}.log"),
                ),
            )
            thread.start()
            threads.append(thread)

        for t in threads:
            t.join()

    except Exception as e:
        raise Exception(f"Error running taskbroker: {e}") from e

    # Validate that all tasks were written once during rebalancing
    attach_db_stmt = "".join(
        [
            f"ATTACH DATABASE '{config['db_path']}' AS {config['db_name']};\n"
            for config in consumer_configs.values()
        ]
    )
    from_stmt = "\n    UNION ALL\n".join(
        [
            f"    SELECT * FROM {config['db_name']}.inflight_taskactivations"
            for config in consumer_configs.values()
        ]
    )
    query = f"""    SELECT
        partition,
        (max(offset) - min(offset)) + 1 AS expected,
        count(*) AS actual,
        (max(offset) - min(offset)) + 1 - count(*) AS diff
    FROM (
{from_stmt}
    )
    GROUP BY partition
    ORDER BY partition;"""

    con = sqlite3.connect(consumer_configs["config_0.yml"]["db_path"])
    cur = con.cursor()
    cur.executescript(attach_db_stmt)
    row_count = cur.execute(query).fetchall()
    print("\n======== Verify number of rows based on max and min offset ========")
    print("Query:")
    print(query)
    print("Result:")
    print(
        f"{'Partition'.rjust(16)}{'Expected'.rjust(16)}{'Actual'.rjust(16)}{'Diff'.rjust(16)}"
    )
    for partition, expected_row_count, actual_row_count, diff in row_count:
        print(
            f"{str(partition).rjust(16)}{str(expected_row_count).rjust(16)}{str(actual_row_count).rjust(16)}{str(diff).rjust(16)}"
        )

    query = f"""    SELECT partition, offset, count(*) as count
    FROM (
{from_stmt}
    )
    GROUP BY partition, offset
    HAVING count > 1"""
    duplicate_rows = cur.execute(query).fetchall()
    print("\n======== Verify all (partition, offset) are unique ========")
    print("Query:")
    print(query)
    print("Result:")
    print(f"{'Partition'.rjust(16)}{'Offset'.rjust(16)}{'count'.rjust(16)}")
    for partition, offset, count in duplicate_rows:
        print(
            f"{str(partition).rjust(16)}{str(offset).rjust(16)}{str(count).rjust(16)}"
        )

    # Close the connection before deleting the sqlite files it has attached.
    con.close()

    # Fail on any gap in the per-partition offset range AND on any duplicated
    # (partition, offset) pair — previously the duplicate query was only
    # printed, never asserted on.
    test_failed = any(row[3] != 0 for row in row_count) or bool(duplicate_rows)
    if test_failed:
        print(
            "Test failed! Got duplicate/missing kafka messages in sqlite, dumping logs"
        )
        for i in range(num_consumers):
            print(f"=== consumer {i} log ===")
            with open(
                str(TESTS_OUTPUT_PATH / f"consumer_{i}_{curr_time}.log"), "r"
            ) as f:
                print(f.read())

    # Clean up test output files
    print(f"Cleaning up test output files in {TESTS_OUTPUT_PATH}")
    shutil.rmtree(TESTS_OUTPUT_PATH)

    assert not test_failed, "Duplicate or missing kafka messages found in sqlite"
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,8 @@ | ||
# Minimal packaging config so the integration tests can be run via pytest.
[build-system]
requires = ["setuptools>=42", "wheel"]
build-backend = "setuptools.build_meta"

# Pytest discovery: collect test_*.py files / test_* functions from the
# integration_tests directory only.
[tool.pytest.ini_options]
testpaths = ["integration_tests"]
python_files = ["test_*.py"]
python_functions = ["test_*"]
Uh oh!
There was an error while loading. Please reload this page.