Skip to content

Commit

Permalink
ref(metrics): Add pause/resume counters [INC-626] (#338)
Browse files Browse the repository at this point in the history
* ref(metrics): Add pause/resume counters [INC-626]

As part of the linked incident, we suspect that the ingest consumer was
paused and resumed in very quick succession. We currently don't have any
metrics that would show this.

* add to test
  • Loading branch information
untitaker committed Feb 9, 2024
1 parent a28450e commit 8d6e594
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 0 deletions.
4 changes: 4 additions & 0 deletions arroyo/processing/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ class InvalidStateError(RuntimeError):
ConsumerCounter = Literal[
"arroyo.consumer.run.count",
"arroyo.consumer.invalid_message.count",
"arroyo.consumer.pause",
"arroyo.consumer.resume",
]


Expand Down Expand Up @@ -421,6 +423,7 @@ def _run_once(self) -> None:
elif not self.__is_paused and (
time.time() - self.__backpressure_timestamp > 1
):
self.__metrics_buffer.incr_counter("arroyo.consumer.pause", 1)
logger.debug(
"Caught %r while submitting %r, pausing consumer...",
e,
Expand All @@ -438,6 +441,7 @@ def _run_once(self) -> None:
else:
# Resume if we are currently in a paused state
if self.__is_paused:
self.__metrics_buffer.incr_counter("arroyo.consumer.resume", 1)
self.__consumer.resume([*self.__consumer.tell().keys()])
self.__is_paused = False

Expand Down
8 changes: 8 additions & 0 deletions arroyo/utils/metric_defs.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,14 @@
# Time (unitless) spent in shutting down the consumer. This metric's
# Consumer latency in seconds. Recorded by the commit offsets strategy.
"arroyo.consumer.latency",
# Counter metric for when the underlying rdkafka consumer is being paused.
#
# This flushes internal prefetch buffers.
"arroyo.consumer.pause",
# Counter metric for when the underlying rdkafka consumer is being resumed.
#
# This might cause increased network usage as messages are being re-fetched.
"arroyo.consumer.resume",
# Queue size of background queue that librdkafka uses to prefetch messages.
"arroyo.consumer.librdkafka.total_queue_size",
# Counter metric to measure how often the healthcheck file has been touched.
Expand Down
2 changes: 2 additions & 0 deletions tests/processing/test_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,9 @@ def test_stream_processor_lifecycle() -> None:
(Timing, "arroyo.consumer.shutdown.time"),
(Timing, "arroyo.consumer.callback.time"),
(Timing, "arroyo.consumer.poll.time"),
(Increment, "arroyo.consumer.pause"),
(Increment, "arroyo.consumer.run.count"),
(Increment, "arroyo.consumer.resume"),
]


Expand Down

0 comments on commit 8d6e594

Please sign in to comment.