From 007a470418da2cb551cee3ca201be4977014d514 Mon Sep 17 00:00:00 2001 From: Lyn Date: Sun, 10 Dec 2023 11:33:52 -0800 Subject: [PATCH] fix(dlq): Ensure consumer crashes if DLQ limit is reached Previously it was being ignored --- arroyo/dlq.py | 4 +++- tests/test_dlq.py | 26 ++++++++++++++++++++++++-- 2 files changed, 27 insertions(+), 3 deletions(-) diff --git a/arroyo/dlq.py b/arroyo/dlq.py index 4c0e66f3..9a0fc363 100644 --- a/arroyo/dlq.py +++ b/arroyo/dlq.py @@ -356,7 +356,7 @@ def reset_offsets(self, assignment: Mapping[Partition, int]) -> None: def produce(self, message: BrokerValue[TStrategyPayload]) -> None: """ Removes all completed futures, then appends the given future to the list. - Blocks if the list is full. + Blocks if the list is full. If the DLQ limit is exceeded, an exception is raised. """ for values in self.__futures.values(): while len(values) > 0: @@ -373,6 +373,8 @@ def produce(self, message: BrokerValue[TStrategyPayload]) -> None: if should_accept: future = self.__dlq_policy.producer.produce(message) self.__futures[message.partition].append((message, future)) + else: + raise RuntimeError("Dlq limit exceeded") def flush(self, committable: Mapping[Partition, int]) -> None: """ diff --git a/tests/test_dlq.py b/tests/test_dlq.py index 829293fe..a0ff7488 100644 --- a/tests/test_dlq.py +++ b/tests/test_dlq.py @@ -3,6 +3,8 @@ from typing import Generator from unittest.mock import ANY +import pytest + from arroyo.backends.kafka import KafkaPayload from arroyo.backends.local.backend import LocalBroker from arroyo.backends.local.storages.memory import MemoryMessageStorage @@ -113,6 +115,22 @@ def test_dlq_policy_wrapper() -> None: wrapper.flush({partition: 11}) +def test_dlq_policy_wrapper_limit_exceeded() -> None: + broker: LocalBroker[KafkaPayload] = LocalBroker(MemoryMessageStorage()) + broker.create_topic(dlq_topic, 1) + dlq_policy = DlqPolicy( + KafkaDlqProducer(broker.get_producer(), dlq_topic), DlqLimit(0.0, 1), None + ) + partition = Partition(topic, 0) + wrapper = DlqPolicyWrapper(dlq_policy) + wrapper.reset_offsets({partition: 0}) + wrapper.MAX_PENDING_FUTURES = 1 + + message = BrokerValue(KafkaPayload(None, b"", []), partition, 1, datetime.now()) + with pytest.raises(RuntimeError): + wrapper.produce(message) + + def test_invalid_message_pickleable() -> None: exc = InvalidMessage(Partition(Topic("test_topic"), 0), 2) pickled_exc = pickle.dumps(exc) @@ -129,7 +147,11 @@ def test_dlq_limit_state() -> None: # 1 valid message followed by 4 invalid for i in range(4, 9): - assert state.record_invalid_message(BrokerValue(i, partition, i, datetime.now())) + assert state.record_invalid_message( + BrokerValue(i, partition, i, datetime.now()) + ) # Next message should not be accepted - assert not state.record_invalid_message(BrokerValue(9, partition, 9, datetime.now())) + assert not state.record_invalid_message( + BrokerValue(9, partition, 9, datetime.now()) + )