Skip to content

Commit

Permalink
fix(dlq): RunTaskWithMultiprocessing supports forwarding downstream i…
Browse files Browse the repository at this point in the history
…nvalid message (#301)

Invalid messages raised from strategies downstream of RunTaskWithMultiprocessing are not correctly handled currently. Currently messages from that batch will be re-submitted to the next step multiple times. This change ensures they are correctly re-raised and avoids the duplicate messages downstream.
  • Loading branch information
lynnagara committed Nov 8, 2023
1 parent 112667a commit 54f4369
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 4 deletions.
27 changes: 23 additions & 4 deletions arroyo/processing/strategies/run_task_with_multiprocessing.py
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,6 @@ def parallel_run_task_worker_apply(
output_block: SharedMemory,
start_index: int = 0,
) -> ParallelRunTaskResult[TResult]:

valid_messages_transformed: MessageBatch[
Union[InvalidMessage, Message[Union[FilteredPayload, TResult]]]
] = MessageBatch(output_block)
Expand Down Expand Up @@ -614,6 +613,13 @@ def __check_for_results_impl(self, timeout: Optional[float] = None) -> None:

try:
self.__next_step.poll()
except InvalidMessage as e:
# For the next invocation of __check_for_results, start at this message
result.valid_messages_transformed.reset_iterator(idx)
self.__invalid_messages.append(e)
raise e

try:
self.__next_step.submit(message)

except MessageRejected:
Expand All @@ -624,6 +630,12 @@ def __check_for_results_impl(self, timeout: Optional[float] = None) -> None:
"arroyo.strategies.run_task_with_multiprocessing.batch.backpressure"
)
raise NextStepTimeoutError()
except InvalidMessage as e:
# For the next invocation of __check_for_results, skip over this message
# since we do not want to re-submit it.
result.valid_messages_transformed.reset_iterator(idx + 1)
self.__invalid_messages.append(e)
raise e

if result.next_index_to_process != len(input_batch):
self.__metrics.increment(
Expand Down Expand Up @@ -770,14 +782,21 @@ def terminate(self) -> None:
self.__next_step.terminate()

def join(self, timeout: Optional[float] = None) -> None:
start_join = time.time()
deadline = time.time() + timeout if timeout is not None else None
self.__forward_invalid_offsets()

logger.debug("Waiting for %s batches...", len(self.__processes))

self.__check_for_results(
timeout=timeout,
)
while True:
elapsed = time.time() - start_join
try:
self.__check_for_results(
timeout=timeout - elapsed if timeout is not None else None,
)
break
except InvalidMessage:
raise

logger.debug("Waiting for %s...", self.__pool)
self.__pool.terminate()
Expand Down
24 changes: 24 additions & 0 deletions tests/processing/strategies/test_run_task_with_multiprocessing.py
Original file line number Diff line number Diff line change
Expand Up @@ -655,3 +655,27 @@ def test_multiprocessing_with_invalid_message() -> None:
strategy.close()
with pytest.raises(InvalidMessage):
strategy.join(timeout=3)


def test_reraise_invalid_message() -> None:
next_step = Mock()
partition = Partition(Topic("test"), 0)
offset = 5
next_step.poll.side_effect = InvalidMessage(partition, offset)

strategy = RunTaskWithMultiprocessing(
run_multiply_times_two,
next_step,
num_processes=2,
max_batch_size=1,
max_batch_time=60,
)

strategy.submit(Message(Value(KafkaPayload(None, b"x" * 10, []), {})))

with pytest.raises(InvalidMessage):
strategy.poll()

next_step.poll.reset_mock(side_effect=True)
strategy.close()
strategy.join()

0 comments on commit 54f4369

Please sign in to comment.