diff --git a/faust/transport/drivers/aiokafka.py b/faust/transport/drivers/aiokafka.py index f4bd46471..be922a85f 100644 --- a/faust/transport/drivers/aiokafka.py +++ b/faust/transport/drivers/aiokafka.py @@ -852,11 +852,14 @@ def _log_slow_processing_commit(self, msg: str, *args: Any) -> None: current_value=app.conf.broker_commit_livelock_soft_timeout, ) - def _make_slow_processing_error(self, msg: str, causes: Iterable[str]) -> str: + def _make_slow_processing_error( + self, msg: str, causes: Iterable[str], setting: str, current_value: float + ) -> str: return " ".join( [ msg, - SLOW_PROCESSING_EXPLAINED, + SLOW_PROCESSING_EXPLAINED + % {"setting": setting, "current_value": current_value}, text.enumeration(causes, start=2, sep="\n\n"), ] ) @@ -870,10 +873,8 @@ def _log_slow_processing( current_value: float, ) -> None: return self.log.error( - self._make_slow_processing_error(msg, causes), + self._make_slow_processing_error(msg, causes, setting, current_value), *args, - setting=setting, - current_value=current_value, ) async def position(self, tp: TP) -> Optional[int]: diff --git a/tests/unit/transport/drivers/test_aiokafka.py b/tests/unit/transport/drivers/test_aiokafka.py index 2c70354eb..d34753854 100644 --- a/tests/unit/transport/drivers/test_aiokafka.py +++ b/tests/unit/transport/drivers/test_aiokafka.py @@ -8,6 +8,7 @@ import pytest from aiokafka.errors import CommitFailedError, IllegalStateError, KafkaError from aiokafka.structs import OffsetAndMetadata, TopicPartition +from mode.utils import text from mode.utils.futures import done_future from mode.utils.mocks import ANY, AsyncMock, MagicMock, Mock, call, patch from opentracing.ext import tags @@ -18,6 +19,10 @@ from faust.sensors.monitor import Monitor from faust.transport.drivers import aiokafka as mod from faust.transport.drivers.aiokafka import ( + SLOW_PROCESSING_CAUSE_AGENT, + SLOW_PROCESSING_CAUSE_STREAM, + SLOW_PROCESSING_EXPLAINED, + SLOW_PROCESSING_STREAM_IDLE_SINCE_START, TOPIC_LENGTH_MAX, AIOKafkaConsumerThread, Consumer, @@ -386,6 +391,29 @@ def test_state(self, *, cthread, now): assert cthread.time_started == now +class Test_Log_Slow_Processing(Test_verify_event_path_base): + def test_log_slow_processing_stream( + self, cthread: AIOKafkaConsumerThread, tp: TP, logger + ): + cthread._log_slow_processing_stream( + SLOW_PROCESSING_STREAM_IDLE_SINCE_START, tp, "3 seconds ago" + ) + logger.error.assert_called_with( + SLOW_PROCESSING_STREAM_IDLE_SINCE_START + + " " + + SLOW_PROCESSING_EXPLAINED + % {"setting": "stream_processing_timeout", "current_value": 300.0} + + " " + + text.enumeration( + [SLOW_PROCESSING_CAUSE_STREAM, SLOW_PROCESSING_CAUSE_AGENT], + start=2, + sep="\n\n", + ), + tp, + "3 seconds ago", + ) + + @pytest.mark.skip("Needs fixing") class Test_VEP_no_fetch_since_start(Test_verify_event_path_base): def test_just_started(self, *, cthread, now, tp, logger):