Skip to content

Commit

Permalink
Fix string formatting error when logging slow processing
Browse files Browse the repository at this point in the history
Fixes crash in faust-streaming#153
  • Loading branch information
forsberg committed May 21, 2021
1 parent bef8710 commit ffa35fb
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 5 deletions.
11 changes: 6 additions & 5 deletions faust/transport/drivers/aiokafka.py
Expand Up @@ -852,11 +852,14 @@ def _log_slow_processing_commit(self, msg: str, *args: Any) -> None:
current_value=app.conf.broker_commit_livelock_soft_timeout,
)

def _make_slow_processing_error(self, msg: str, causes: Iterable[str]) -> str:
def _make_slow_processing_error(
self, msg: str, causes: Iterable[str], setting: str, current_value: float
) -> str:
return " ".join(
[
msg,
SLOW_PROCESSING_EXPLAINED,
SLOW_PROCESSING_EXPLAINED
% {"setting": setting, "current_value": current_value},
text.enumeration(causes, start=2, sep="\n\n"),
]
)
Expand All @@ -870,10 +873,8 @@ def _log_slow_processing(
current_value: float,
) -> None:
return self.log.error(
self._make_slow_processing_error(msg, causes),
self._make_slow_processing_error(msg, causes, setting, current_value),
*args,
setting=setting,
current_value=current_value,
)

async def position(self, tp: TP) -> Optional[int]:
Expand Down
28 changes: 28 additions & 0 deletions tests/unit/transport/drivers/test_aiokafka.py
Expand Up @@ -8,6 +8,7 @@
import pytest
from aiokafka.errors import CommitFailedError, IllegalStateError, KafkaError
from aiokafka.structs import OffsetAndMetadata, TopicPartition
from mode.utils import text
from mode.utils.futures import done_future
from mode.utils.mocks import ANY, AsyncMock, MagicMock, Mock, call, patch
from opentracing.ext import tags
Expand All @@ -18,6 +19,10 @@
from faust.sensors.monitor import Monitor
from faust.transport.drivers import aiokafka as mod
from faust.transport.drivers.aiokafka import (
SLOW_PROCESSING_CAUSE_AGENT,
SLOW_PROCESSING_CAUSE_STREAM,
SLOW_PROCESSING_EXPLAINED,
SLOW_PROCESSING_STREAM_IDLE_SINCE_START,
TOPIC_LENGTH_MAX,
AIOKafkaConsumerThread,
Consumer,
Expand Down Expand Up @@ -386,6 +391,29 @@ def test_state(self, *, cthread, now):
assert cthread.time_started == now


class Test_Log_Slow_Processing(Test_verify_event_path_base):
def test_log_slow_processing_stream(
self, cthread: AIOKafkaConsumerThread, tp: TP, logger
):
cthread._log_slow_processing_stream(
SLOW_PROCESSING_STREAM_IDLE_SINCE_START, tp, "3 seconds ago"
)
logger.error.assert_called_with(
SLOW_PROCESSING_STREAM_IDLE_SINCE_START
+ " "
+ SLOW_PROCESSING_EXPLAINED
% {"setting": "stream_processing_timeout", "current_value": 300.0}
+ " "
+ text.enumeration(
[SLOW_PROCESSING_CAUSE_STREAM, SLOW_PROCESSING_CAUSE_AGENT],
start=2,
sep="\n\n",
),
tp,
"3 seconds ago",
)


@pytest.mark.skip("Needs fixing")
class Test_VEP_no_fetch_since_start(Test_verify_event_path_base):
def test_just_started(self, *, cthread, now, tp, logger):
Expand Down

0 comments on commit ffa35fb

Please sign in to comment.