Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion clients/python/src/examples/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ def task_that_produces(
def producer_factory() -> KafkaProducer:
return KafkaProducer({"bootstrap.servers": bootstrap_servers})

producer = TaskProducer(producer_factory)
producer = TaskProducer("test.producer", producer_factory)
production_count = random.randint(1, 50) if random_count else production_count
for i in range(production_count):
logger.debug(f"Producing message {i} onto topic {destination_topic}...")
Expand Down
18 changes: 17 additions & 1 deletion clients/python/src/taskbroker_client/worker/producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from arroyo.types import BrokerValue, Topic

from taskbroker_client.constants import TASK_PRODUCER_MAX_PENDING_FUTURES
from taskbroker_client.metrics import MetricsBackend, NoOpMetricsBackend
from taskbroker_client.types import ProducerProtocol

# This is global as TaskWorker needs to be able to call TaskProducer.collect_futures()
Expand All @@ -29,12 +30,22 @@ class TaskProducer:
Otherwise, the activation will be retried.

Args:
name: Unique identifying name of this TaskProducer. Used in metric tags.
producer_factory: Callable that returns a producer object.
metrics_backend: Application metrics backend this producer should use.
Defaults to NoOpMetricsBackend.
"""

def __init__(self, producer_factory: Callable[[], ProducerProtocol]) -> None:
def __init__(
    self,
    name: str,
    producer_factory: Callable[[], ProducerProtocol],
    metrics_backend: MetricsBackend | None = None,
) -> None:
    """Store the producer's configuration; no producer object is built here.

    Args:
        name: Identifying name for this instance, kept on ``self.name``.
        producer_factory: Zero-argument callable returning a producer
            object. It is only stored by this constructor, not invoked.
        metrics_backend: Backend used for metrics emitted by this
            instance. A ``NoOpMetricsBackend`` is substituted when
            ``None`` is passed.
    """
    # Resolve the optional backend first so self.metrics is never None.
    if metrics_backend is None:
        metrics_backend = NoOpMetricsBackend()
    self.metrics = metrics_backend

    self.name = name
    # Starts unset; the factory is kept alongside it for later use.
    self._inner_producer: ProducerProtocol | None = None
    self._producer_factory = producer_factory

def _get(self) -> ProducerProtocol:
if self._inner_producer is None:
Expand All @@ -43,6 +54,11 @@ def _get(self) -> ProducerProtocol:

def track_future(self, future: ProducerFuture[BrokerValue[KafkaPayload]]) -> None:
    """Queue ``future`` as pending and report the resulting queue size.

    Args:
        future: The producer future to append to ``_pending_futures``.
    """
    _pending_futures.append(future)

    # The reported value is the size of the whole ``_pending_futures``
    # collection (not a per-instance count); only the tag carries this
    # instance's name.
    pending_count = len(_pending_futures)
    metric_tags = {"producer_name": self.name}
    self.metrics.gauge(
        "task.producer.pending.futures",
        pending_count,
        tags=metric_tags,
    )
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Gauge stale after queue cleared

Medium Severity

`task.producer.pending.futures` is updated only in `track_future`, when a future is appended; `collect_futures` clears `_pending_futures` without recording the gauge again. As a result, after an activation finishes — even once the queue has been emptied — dashboards can still show the last non-zero size and trigger false queue-pressure alerts.

Additional Locations (1)
Fix in Cursor Fix in Web

Reviewed by Cursor Bugbot for commit 383993e. Configure here.

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Producer tag omits other producers

Medium Severity

The gauge is tagged with `producer_name` but reports `len(_pending_futures)`, the length of a global deque shared by every `TaskProducer`. Only the instance that called `track_future` updates the series for its own tag, so futures tracked by another producer can leave that series stale, under-reporting the true queue pressure for that name.

Fix in Cursor Fix in Web

Reviewed by Cursor Bugbot for commit 383993e. Configure here.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

`_pending_futures` is global out of necessity, but there should generally be only one `TaskProducer` instance per worker replica.


@staticmethod
def collect_futures() -> set[ProducerFuture[BrokerValue[KafkaPayload]]]:
Expand Down
8 changes: 4 additions & 4 deletions clients/python/tests/worker/test_producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ def clear_pending_futures() -> Iterator[None]:


def test_producer_tracks_futures() -> None:
producer = TaskProducer(partial(get_dummy_producer, use_simple_futures=True))
producer = TaskProducer("test.producer", partial(get_dummy_producer, use_simple_futures=True))
producer.produce(Topic("test"), make_kafka_payload())
assert len(_pending_futures) == 1
future = next(iter(TaskProducer.collect_futures()))
Expand All @@ -58,7 +58,7 @@ def test_producer_tracks_futures() -> None:


def test_producer_executes_callbacks() -> None:
producer = TaskProducer(partial(get_dummy_producer, use_simple_futures=False))
producer = TaskProducer("test.producer", partial(get_dummy_producer, use_simple_futures=False))
received: list[Future[BrokerValue[KafkaPayload]]] = []

def callback(future: Future[BrokerValue[KafkaPayload]]) -> None:
Expand All @@ -73,7 +73,7 @@ def callback(future: Future[BrokerValue[KafkaPayload]]) -> None:


def test_producer_rejects_callbacks_for_simple_futures() -> None:
producer = TaskProducer(partial(get_dummy_producer, use_simple_futures=True))
producer = TaskProducer("test.producer", partial(get_dummy_producer, use_simple_futures=True))

def callback(future: Future[BrokerValue[KafkaPayload]]) -> None:
pass
Expand All @@ -83,7 +83,7 @@ def callback(future: Future[BrokerValue[KafkaPayload]]) -> None:


def test_pending_futures_max_len() -> None:
producer = TaskProducer(partial(get_dummy_producer, use_simple_futures=True))
producer = TaskProducer("test.producer", partial(get_dummy_producer, use_simple_futures=True))
for _ in range(10001):
producer.produce(Topic("test"), make_kafka_payload())
assert len(_pending_futures) == 10000
Loading