Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 35 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -463,3 +463,38 @@ jobs:
- name: Run failed tasks integration test
run: |
make test-failed-tasks

multi-topic-integration-test:
name: Multi-topic integration test
runs-on: ubuntu-latest

steps:
- uses: actions/checkout@ee0669bd1cc54295c223e0bb666b733df41de1c5 # v2

- name: Install cmake
uses: lukka/get-cmake@28983e0d3955dba2bb0a6810caae0c6cf268ec0c # latest

- uses: actions-rs/toolchain@16499b5e05bf2e26879000db0c1d13f7e13fa3af # pin@v1
with:
toolchain: stable
profile: minimal
override: true

- uses: swatinem/rust-cache@81d053bdb0871dcd3f10763c8cc60d0adc41762b # pin@v1
with:
key: ${{ github.job }}

- uses: astral-sh/setup-uv@5a7eac68fb9809dea845d802897dc5c723910fa3 # v7.1.3
with:
version: '0.8.2'
# we just cache the venv-dir directly in action-setup-venv
enable-cache: false

- uses: getsentry/action-setup-venv@5a80476d175edf56cb205b08bc58986fa99d1725 # v3.2.0
with:
cache-dependency-path: uv.lock
install-cmd: uv sync --frozen --only-dev --active

- name: Run multi-topic integration test
run: |
make test-multi-topic
5 changes: 5 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,11 @@ test-upkeep-retry: build reset-kafka ## Run the upkeep retry integration test
rm -r integration_tests/.tests_output/test_upkeep_retry
.PHONY: test-upkeep-retry

test-multi-topic: build reset-kafka ## Run the multi-topic consumption integration test
python -m pytest integration_tests/integration_tests/test_multi_topic.py -s
rm -r integration_tests/.tests_output/test_multi_topic
.PHONY: test-multi-topic

test-upkeep-expiry: build reset-kafka ## Run the upkeep expiry integration test
python -m pytest integration_tests/integration_tests/test_upkeep_expiry.py -s
rm -r integration_tests/.tests_output/test_upkeep_expiry
Expand Down
133 changes: 133 additions & 0 deletions integration_tests/integration_tests/test_multi_topic.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
import signal
import subprocess
import time

import yaml

from integration_tests.helpers import (
TASKBROKER_BIN,
TESTS_OUTPUT_ROOT,
TaskbrokerConfig,
create_topic,
get_available_ports,
get_num_tasks_in_sqlite,
send_generic_messages_to_topic,
)

TEST_OUTPUT_PATH = TESTS_OUTPUT_ROOT / "test_multi_topic"


def test_multi_topic_consumption() -> None:
"""
Verify that a single taskbroker configured (new `kafka_topics` format) with
two consumable topics consumes from BOTH into its one sqlite store.

Each consumed topic gets its own rdkafka consumer (own group.id); both
pipelines write to the shared store. We produce N messages to each of two
topics and assert the broker ends up with 2*N tasks in sqlite.
"""
num_messages_per_topic = 1_000
num_partitions = 4
timeout = 60
curr_time = int(time.time())

topic_a = f"multitopic-a-{curr_time}"
topic_b = f"multitopic-b-{curr_time}"
retry_topic = f"multitopic-retry-{curr_time}"
dlq_topic = f"multitopic-dlq-{curr_time}"

# Pre-create the topics so the test exercises consumption, not topic
# creation. (create_missing_topics stays off.)
for topic in (topic_a, topic_b, retry_topic, dlq_topic):
create_topic(topic, num_partitions)

TEST_OUTPUT_PATH.mkdir(parents=True, exist_ok=True)
db_name = f"db_multi_topic_{curr_time}"
db_path = str(TEST_OUTPUT_PATH / f"{db_name}.sqlite")
config_filename = f"config_multi_topic_{curr_time}.yml"
grpc_port = get_available_ports(1)[0]

# New multi-topic config: two consumable topics + produce-only retry/dlq,
# all on a single cluster.
config_dict = {
"db_name": db_name,
"db_path": db_path,
"max_pending_count": 100_000,
"grpc_port": grpc_port,
"kafka_auto_offset_reset": "earliest",
"kafka_deadletter_topic": dlq_topic,
"kafka_retry_topic": retry_topic,
"kafka_clusters": {
"default": {"address": "127.0.0.1:9092"},
},
"kafka_topics": {
topic_a: {"cluster": "default", "consumer_group": f"{topic_a}-grp"},
topic_b: {"cluster": "default", "consumer_group": f"{topic_b}-grp"},
retry_topic: {
"cluster": "default",
"consumer_group": f"{retry_topic}-grp",
"produce_only": True,
},
dlq_topic: {
"cluster": "default",
"consumer_group": f"{dlq_topic}-grp",
"produce_only": True,
},
},
}

config_path = str(TEST_OUTPUT_PATH / config_filename)
with open(config_path, "w") as f:
yaml.safe_dump(config_dict, f)

# A TaskbrokerConfig instance is only needed for the sqlite-counting helper,
# which reads db_name/db_path.
query_config = TaskbrokerConfig(
db_name=db_name,
db_path=db_path,
max_pending_count=100_000,
kafka_topic=topic_a,
kafka_deadletter_topic=dlq_topic,
kafka_consumer_group=f"{topic_a}-grp",
kafka_auto_offset_reset="earliest",
grpc_port=grpc_port,
)

log_path = str(TEST_OUTPUT_PATH / f"taskbroker_multi_topic_{curr_time}.log")
expected_total = num_messages_per_topic * 2

send_generic_messages_to_topic(topic_a, num_messages_per_topic)
send_generic_messages_to_topic(topic_b, num_messages_per_topic)

process = None
try:
with open(log_path, "a") as log_file:
process = subprocess.Popen(
[str(TASKBROKER_BIN), "-c", config_path],
stderr=subprocess.STDOUT,
stdout=log_file,
)
time.sleep(3) # give the broker time to start both consumers

written = 0
end = time.time() + timeout
while time.time() < end:
written = get_num_tasks_in_sqlite(query_config)
if written >= expected_total:
break
# the broker should still be alive while consuming
assert process.poll() is None, "taskbroker exited early"
time.sleep(1)

assert written == expected_total, (
f"expected {expected_total} tasks in sqlite "
f"({num_messages_per_topic} from each of two topics), got {written}"
)
finally:
if process is not None:
process.send_signal(signal.SIGINT)
try:
assert process.wait(timeout=10) == 0
except Exception:
process.kill()
raise
Loading
Loading