diff --git a/clients/python/src/examples/tasks.py b/clients/python/src/examples/tasks.py index 190fa0d7..8fd3ab56 100644 --- a/clients/python/src/examples/tasks.py +++ b/clients/python/src/examples/tasks.py @@ -135,7 +135,7 @@ def task_that_produces( def producer_factory() -> KafkaProducer: return KafkaProducer({"bootstrap.servers": bootstrap_servers}) - producer = TaskProducer(producer_factory) + producer = TaskProducer("test.producer", producer_factory) production_count = random.randint(1, 50) if random_count else production_count for i in range(production_count): logger.debug(f"Producing message {i} onto topic {destination_topic}...") diff --git a/clients/python/src/taskbroker_client/worker/producer.py b/clients/python/src/taskbroker_client/worker/producer.py index d1e0a3b3..4b1c0f8d 100644 --- a/clients/python/src/taskbroker_client/worker/producer.py +++ b/clients/python/src/taskbroker_client/worker/producer.py @@ -8,6 +8,7 @@ from arroyo.types import BrokerValue, Topic from taskbroker_client.constants import TASK_PRODUCER_MAX_PENDING_FUTURES +from taskbroker_client.metrics import MetricsBackend, NoOpMetricsBackend from taskbroker_client.types import ProducerProtocol # This is global as TaskWorker needs to be able to call TaskProducer.collect_futures() @@ -29,12 +30,22 @@ class TaskProducer: Otherwise, the activation will be retried. Args: + name: Unique identifying name of this TaskProducer. Used in metric tags. producer_factory: Callable that returns a producer object. + metrics_backend: Application metrics backend this producer should use. + Defaults to NoOpMetricsBackend. """ - def __init__(self, producer_factory: Callable[[], ProducerProtocol]) -> None: + def __init__( + self, + name: str, + producer_factory: Callable[[], ProducerProtocol], + metrics_backend: MetricsBackend | None = None, + ) -> None: + self.name = name self._producer_factory = producer_factory self._inner_producer: ProducerProtocol | None = None + self.metrics = metrics_backend if metrics_backend is not None else NoOpMetricsBackend() def _get(self) -> ProducerProtocol: if self._inner_producer is None: @@ -43,6 +54,11 @@ def _get(self) -> ProducerProtocol: def track_future(self, future: ProducerFuture[BrokerValue[KafkaPayload]]) -> None: _pending_futures.append(future) + self.metrics.gauge( + "task.producer.pending.futures", + len(_pending_futures), + tags={"producer_name": self.name}, + ) @staticmethod def collect_futures() -> set[ProducerFuture[BrokerValue[KafkaPayload]]]: diff --git a/clients/python/tests/worker/test_producer.py b/clients/python/tests/worker/test_producer.py index c3cb7ec6..43ab3be0 100644 --- a/clients/python/tests/worker/test_producer.py +++ b/clients/python/tests/worker/test_producer.py @@ -49,7 +49,7 @@ def clear_pending_futures() -> Iterator[None]: def test_producer_tracks_futures() -> None: - producer = TaskProducer(partial(get_dummy_producer, use_simple_futures=True)) + producer = TaskProducer("test.producer", partial(get_dummy_producer, use_simple_futures=True)) producer.produce(Topic("test"), make_kafka_payload()) assert len(_pending_futures) == 1 future = next(iter(TaskProducer.collect_futures())) @@ -58,7 +58,7 @@ def test_producer_tracks_futures() -> None: def test_producer_executes_callbacks() -> None: - producer = TaskProducer(partial(get_dummy_producer, use_simple_futures=False)) + producer = TaskProducer("test.producer", partial(get_dummy_producer, use_simple_futures=False)) received: list[Future[BrokerValue[KafkaPayload]]] = [] def callback(future: Future[BrokerValue[KafkaPayload]]) -> None: @@ -73,7 +73,7 @@ def callback(future: Future[BrokerValue[KafkaPayload]]) -> None: def test_producer_rejects_callbacks_for_simple_futures() -> None: - producer = TaskProducer(partial(get_dummy_producer, use_simple_futures=True)) + producer = TaskProducer("test.producer", partial(get_dummy_producer, use_simple_futures=True)) def callback(future: Future[BrokerValue[KafkaPayload]]) -> None: pass @@ -83,7 +83,7 @@ def callback(future: Future[BrokerValue[KafkaPayload]]) -> None: def test_pending_futures_max_len() -> None: - producer = TaskProducer(partial(get_dummy_producer, use_simple_futures=True)) + producer = TaskProducer("test.producer", partial(get_dummy_producer, use_simple_futures=True)) for _ in range(10001): producer.produce(Topic("test"), make_kafka_payload()) assert len(_pending_futures) == 10000