From 40c18bbde7d21367da1ffd58d5b101c805037e66 Mon Sep 17 00:00:00 2001 From: Ben McKerry <110857332+bmckerry@users.noreply.github.com> Date: Fri, 5 Jun 2026 11:38:26 -0400 Subject: [PATCH 1/2] fix(TaskProducer): bounded queue of pending futures --- clients/python/src/taskbroker_client/constants.py | 7 +++++++ .../src/taskbroker_client/worker/producer.py | 15 ++++++++++++--- clients/python/tests/worker/test_producer.py | 7 +++++++ clients/python/tests/worker/test_worker.py | 2 +- 4 files changed, 27 insertions(+), 4 deletions(-) diff --git a/clients/python/src/taskbroker_client/constants.py b/clients/python/src/taskbroker_client/constants.py index ce79b934..66f113d5 100644 --- a/clients/python/src/taskbroker_client/constants.py +++ b/clients/python/src/taskbroker_client/constants.py @@ -70,6 +70,13 @@ to drain pending produce futures on shutdown before sending SIGKILL. """ +TASK_PRODUCER_MAX_PENDING_FUTURES = 10_000 +""" +Maximum number of pending futures that can be in the TaskProducer module's +`_pending_futures` deque. This deque is a global, so it is shared between all instances +of TaskProducer. +""" + class CompressionType(Enum): """ diff --git a/clients/python/src/taskbroker_client/worker/producer.py b/clients/python/src/taskbroker_client/worker/producer.py index 2e741d9b..d1e0a3b3 100644 --- a/clients/python/src/taskbroker_client/worker/producer.py +++ b/clients/python/src/taskbroker_client/worker/producer.py @@ -1,3 +1,4 @@ +from collections import deque from collections.abc import Callable from concurrent.futures import Future from typing import Any, Sequence @@ -6,11 +7,16 @@ from arroyo.backends.kafka import KafkaPayload from arroyo.types import BrokerValue, Topic +from taskbroker_client.constants import TASK_PRODUCER_MAX_PENDING_FUTURES from taskbroker_client.types import ProducerProtocol # This is global as TaskWorker needs to be able to call TaskProducer.collect_futures() # without a reference to a task's specific instance of TaskProducer. 
-_pending_futures: set[ProducerFuture[BrokerValue[KafkaPayload]]] = set() +# Has a maxlen to prevent unbounded future growth if TaskProducer.collect_futures() +# is never called; once full, the oldest pending futures are silently dropped. +_pending_futures: deque[ProducerFuture[BrokerValue[KafkaPayload]]] = deque( + maxlen=TASK_PRODUCER_MAX_PENDING_FUTURES +) class TaskProducer: @@ -21,6 +27,9 @@ class TaskProducer: producer futures tracked by TaskProducer, and will only register the task activation as a success if all producer futures from that activation were successful. Otherwise, the activation will be retried. + + Args: + producer_factory: Callable that returns a producer object. """ def __init__(self, producer_factory: Callable[[], ProducerProtocol]) -> None: @@ -33,13 +42,13 @@ def _get(self) -> ProducerProtocol: return self._inner_producer def track_future(self, future: ProducerFuture[BrokerValue[KafkaPayload]]) -> None: - _pending_futures.add(future) + _pending_futures.append(future) @staticmethod def collect_futures() -> set[ProducerFuture[BrokerValue[KafkaPayload]]]: futures = _pending_futures.copy() _pending_futures.clear() - return futures + return set(futures) def produce( self, diff --git a/clients/python/tests/worker/test_producer.py b/clients/python/tests/worker/test_producer.py index 10a16a1a..c3cb7ec6 100644 --- a/clients/python/tests/worker/test_producer.py +++ b/clients/python/tests/worker/test_producer.py @@ -80,3 +80,10 @@ def callback(future: Future[BrokerValue[KafkaPayload]]) -> None: with pytest.raises(RuntimeError, match="SimpleProducerFuture"): producer.produce(Topic("test"), make_kafka_payload(), callbacks=[callback]) + + +def test_pending_futures_max_len() -> None: + producer = TaskProducer(partial(get_dummy_producer, use_simple_futures=True)) + for _ in range(10001): + producer.produce(Topic("test"), make_kafka_payload()) + assert len(_pending_futures) == 10000 diff --git a/clients/python/tests/worker/test_worker.py b/clients/python/tests/worker/test_worker.py index 44e4e20a..bf583bce 100644 
--- a/clients/python/tests/worker/test_worker.py +++ b/clients/python/tests/worker/test_worker.py @@ -1452,7 +1452,7 @@ def test_child_process_clears_pending_futures_when_task_fails( ) -> None: leftover_future: Future[BrokerValue[KafkaPayload]] = Future() leftover_future.set_result(_make_broker_value()) - _pending_futures.add(leftover_future) + _pending_futures.append(leftover_future) assert len(_pending_futures) == 1 todo: queue.Queue[InflightTaskActivation] = queue.Queue() From 383993e539fbc368ee16562cc4b4bc9832ae20f0 Mon Sep 17 00:00:00 2001 From: Ben McKerry <110857332+bmckerry@users.noreply.github.com> Date: Fri, 5 Jun 2026 13:49:39 -0400 Subject: [PATCH 2/2] feat(TaskProducer): add metrics on queue size --- clients/python/src/examples/tasks.py | 2 +- .../src/taskbroker_client/worker/producer.py | 18 +++++++++++++++++- clients/python/tests/worker/test_producer.py | 8 ++++---- 3 files changed, 22 insertions(+), 6 deletions(-) diff --git a/clients/python/src/examples/tasks.py b/clients/python/src/examples/tasks.py index 190fa0d7..8fd3ab56 100644 --- a/clients/python/src/examples/tasks.py +++ b/clients/python/src/examples/tasks.py @@ -135,7 +135,7 @@ def task_that_produces( def producer_factory() -> KafkaProducer: return KafkaProducer({"bootstrap.servers": bootstrap_servers}) - producer = TaskProducer(producer_factory) + producer = TaskProducer("test.producer", producer_factory) production_count = random.randint(1, 50) if random_count else production_count for i in range(production_count): logger.debug(f"Producing message {i} onto topic {destination_topic}...") diff --git a/clients/python/src/taskbroker_client/worker/producer.py b/clients/python/src/taskbroker_client/worker/producer.py index d1e0a3b3..4b1c0f8d 100644 --- a/clients/python/src/taskbroker_client/worker/producer.py +++ b/clients/python/src/taskbroker_client/worker/producer.py @@ -8,6 +8,7 @@ from arroyo.types import BrokerValue, Topic from taskbroker_client.constants import 
TASK_PRODUCER_MAX_PENDING_FUTURES +from taskbroker_client.metrics import MetricsBackend, NoOpMetricsBackend from taskbroker_client.types import ProducerProtocol # This is global as TaskWorker needs to be able to call TaskProducer.collect_futures() @@ -29,12 +30,22 @@ class TaskProducer: Otherwise, the activation will be retried. Args: + name: Unique identifying name of this TaskProducer. Used in metric tags. producer_factory: Callable that returns a producer object. + metrics_backend: Application metrics backend this producer should use. + Defaults to NoOpMetricsBackend. """ - def __init__(self, producer_factory: Callable[[], ProducerProtocol]) -> None: + def __init__( + self, + name: str, + producer_factory: Callable[[], ProducerProtocol], + metrics_backend: MetricsBackend | None = None, + ) -> None: + self.name = name self._producer_factory = producer_factory self._inner_producer: ProducerProtocol | None = None + self.metrics = metrics_backend if metrics_backend is not None else NoOpMetricsBackend() def _get(self) -> ProducerProtocol: if self._inner_producer is None: @@ -43,6 +54,11 @@ def _get(self) -> ProducerProtocol: def track_future(self, future: ProducerFuture[BrokerValue[KafkaPayload]]) -> None: _pending_futures.append(future) + self.metrics.gauge( + "task.producer.pending.futures", + len(_pending_futures), + tags={"producer_name": self.name}, + ) @staticmethod def collect_futures() -> set[ProducerFuture[BrokerValue[KafkaPayload]]]: diff --git a/clients/python/tests/worker/test_producer.py b/clients/python/tests/worker/test_producer.py index c3cb7ec6..43ab3be0 100644 --- a/clients/python/tests/worker/test_producer.py +++ b/clients/python/tests/worker/test_producer.py @@ -49,7 +49,7 @@ def clear_pending_futures() -> Iterator[None]: def test_producer_tracks_futures() -> None: - producer = TaskProducer(partial(get_dummy_producer, use_simple_futures=True)) + producer = TaskProducer("test.producer", partial(get_dummy_producer, use_simple_futures=True)) 
producer.produce(Topic("test"), make_kafka_payload()) assert len(_pending_futures) == 1 future = next(iter(TaskProducer.collect_futures())) @@ -58,7 +58,7 @@ def test_producer_tracks_futures() -> None: def test_producer_executes_callbacks() -> None: - producer = TaskProducer(partial(get_dummy_producer, use_simple_futures=False)) + producer = TaskProducer("test.producer", partial(get_dummy_producer, use_simple_futures=False)) received: list[Future[BrokerValue[KafkaPayload]]] = [] def callback(future: Future[BrokerValue[KafkaPayload]]) -> None: @@ -73,7 +73,7 @@ def callback(future: Future[BrokerValue[KafkaPayload]]) -> None: def test_producer_rejects_callbacks_for_simple_futures() -> None: - producer = TaskProducer(partial(get_dummy_producer, use_simple_futures=True)) + producer = TaskProducer("test.producer", partial(get_dummy_producer, use_simple_futures=True)) def callback(future: Future[BrokerValue[KafkaPayload]]) -> None: pass @@ -83,7 +83,7 @@ def callback(future: Future[BrokerValue[KafkaPayload]]) -> None: def test_pending_futures_max_len() -> None: - producer = TaskProducer(partial(get_dummy_producer, use_simple_futures=True)) + producer = TaskProducer("test.producer", partial(get_dummy_producer, use_simple_futures=True)) for _ in range(10001): producer.produce(Topic("test"), make_kafka_payload()) assert len(_pending_futures) == 10000