From 8d6e594169bf2bb822e82eb84b2277008268024c Mon Sep 17 00:00:00 2001 From: Markus Unterwaditzer Date: Fri, 9 Feb 2024 13:55:17 +0100 Subject: [PATCH] ref(metrics): Add pause/resume counters [INC-626] (#338) * ref(metrics): Add pause/resume counters [INC-626] As part of the linked incident we think that the ingest consumer paused/resumed in very quick succession. We currently don't have any metrics for that. * add to test --- arroyo/processing/processor.py | 4 ++++ arroyo/utils/metric_defs.py | 8 ++++++++ tests/processing/test_processor.py | 2 ++ 3 files changed, 14 insertions(+) diff --git a/arroyo/processing/processor.py b/arroyo/processing/processor.py index b77101c2..0116a4f1 100644 --- a/arroyo/processing/processor.py +++ b/arroyo/processing/processor.py @@ -81,6 +81,8 @@ class InvalidStateError(RuntimeError): ConsumerCounter = Literal[ "arroyo.consumer.run.count", "arroyo.consumer.invalid_message.count", + "arroyo.consumer.pause", + "arroyo.consumer.resume", ] @@ -421,6 +423,7 @@ def _run_once(self) -> None: elif not self.__is_paused and ( time.time() - self.__backpressure_timestamp > 1 ): + self.__metrics_buffer.incr_counter("arroyo.consumer.pause", 1) logger.debug( "Caught %r while submitting %r, pausing consumer...", e, @@ -438,6 +441,7 @@ def _run_once(self) -> None: else: # Resume if we are currently in a paused state if self.__is_paused: + self.__metrics_buffer.incr_counter("arroyo.consumer.resume", 1) self.__consumer.resume([*self.__consumer.tell().keys()]) self.__is_paused = False diff --git a/arroyo/utils/metric_defs.py b/arroyo/utils/metric_defs.py index f28a5c22..52d95746 100644 --- a/arroyo/utils/metric_defs.py +++ b/arroyo/utils/metric_defs.py @@ -84,6 +84,14 @@ # Time (unitless) spent in shutting down the consumer. This metric's # Consumer latency in seconds. Recorded by the commit offsets strategy. "arroyo.consumer.latency", + # Counter metric for when the underlying rdkafka consumer is being paused. + # + # This flushes internal prefetch buffers. + "arroyo.consumer.pause", + # Counter metric for when the underlying rdkafka consumer is being resumed. + # + # This might cause increased network usage as messages are being re-fetched. + "arroyo.consumer.resume", # Queue size of background queue that librdkafka uses to prefetch messages. "arroyo.consumer.librdkafka.total_queue_size", # Counter metric to measure how often the healthcheck file has been touched. diff --git a/tests/processing/test_processor.py b/tests/processing/test_processor.py index 31cf8719..50119f38 100644 --- a/tests/processing/test_processor.py +++ b/tests/processing/test_processor.py @@ -150,7 +150,9 @@ def test_stream_processor_lifecycle() -> None: (Timing, "arroyo.consumer.shutdown.time"), (Timing, "arroyo.consumer.callback.time"), (Timing, "arroyo.consumer.poll.time"), + (Increment, "arroyo.consumer.pause"), (Increment, "arroyo.consumer.run.count"), + (Increment, "arroyo.consumer.resume"), ]