Description
Search before asking
- I searched in the issues and found nothing similar.
Fluss version
main (development)
Please describe the bug 🐞
Problem Description:
When using the idempotent producer, a batch's response can be lost (e.g., due to `request.timeout.ms` expiry) even though the batch itself was successfully written to the server. If subsequent batches with higher sequence numbers are acknowledged before the timed-out batch is retried, the retried batch receives `OutOfOrderSequenceException` indefinitely, causing an infinite retry loop.
Root Cause:
The server validates incoming batch sequences in `WriterAppendInfo.maybeValidateDataBatch()`, which requires `nextBatchSeq == lastBatchSeq + 1`. The server's `lastBatchSeq` is advanced as each batch is committed.
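A minimal sketch of that server-side check, with the class and field names taken from the issue text (the real Fluss implementation differs in detail and throws `OutOfOrderSequenceException` rather than the stand-in used here):

```java
// Illustrative model of the server-side sequence validation; names are
// assumptions based on the issue text, not the actual Fluss code.
final class WriterAppendInfoSketch {
    private int lastBatchSeq; // last committed batch sequence for this writer

    WriterAppendInfoSketch(int lastBatchSeq) {
        this.lastBatchSeq = lastBatchSeq;
    }

    void maybeValidateDataBatch(int nextBatchSeq) {
        // The server only accepts the exact next sequence number.
        if (nextBatchSeq != lastBatchSeq + 1) {
            // Stand-in for OutOfOrderSequenceException.
            throw new IllegalStateException(
                    "Out of order sequence: expected " + (lastBatchSeq + 1)
                            + " but got " + nextBatchSeq);
        }
        lastBatchSeq = nextBatchSeq;
    }
}
```

Because the check compares against the *current* `lastBatchSeq`, a retried old batch can never pass once newer batches have been committed.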
Consider the following scenario:
- Client sends batch1 (seq=0), batch2 (seq=1), ..., batch5 (seq=4).
- All 5 batches are successfully written on the server. Server's `lastBatchSeq = 4`.
- The response for batch1 is lost (network issue, `request.timeout.ms` expiry).
- Responses for batch2~batch5 return successfully. Client's `lastAckedBatchSequence = 4`.
- Client enqueues batch6 (seq=5), which is sent and acknowledged successfully. Server's `lastBatchSeq = 5`.
- Client now retries batch1 (seq=0).
- Server receives batch1 (seq=0), but `lastBatchSeq = 5`. Since `0 != 5 + 1`, the server throws `OutOfOrderSequenceException`.
- Client receives `OutOfOrderSequenceException` for batch1. In `IdempotenceManager.canRetry()`, since `batch1.batchSequence()` (0) is NOT `lastAckedBatchSequence` (4) + 1, `canRetry()` returns `true`, and the batch is retried again.
- This creates an infinite retry loop: batch1 will keep being retried and keep receiving `OutOfOrderSequenceException` indefinitely.
The client's `canRetry()` logic (in `IdempotenceManager`) is intended to handle the case where the failed batch is not the "next expected" batch. However, it doesn't account for the scenario where the server has already advanced far beyond the retried batch's sequence number due to successful writes of subsequent batches.
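The current retry decision can be reduced to the following sketch (names assumed from the issue text; the real `IdempotenceManager.canRetry()` involves more state):

```java
// Simplified model of the client's current retry decision.
final class CanRetrySketch {
    // Retry whenever the failed batch is not the "next expected" batch
    // after the last acknowledged one.
    static boolean canRetry(int batchSequence, int lastAckedBatchSequence) {
        return batchSequence != lastAckedBatchSequence + 1;
    }
}
```

In the scenario above, `canRetry(0, 4)` is `true`, so batch1 is resent, the server rejects it again, and the cycle never terminates.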
Furthermore, the server-side deduplication window in WriterStateEntry only retains the last 5 batches (NUM_BATCHES_TO_RETAIN = 5). Even if batch1 was originally written, once more than 5 subsequent batches have been committed, it will slide out of the deduplication cache. Therefore, the server can no longer identify the retried batch1 as a duplicate and cannot return DUPLICATE_SEQUENCE — it simply returns OutOfOrderSequenceException each time.
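The sliding-out behavior can be illustrated with a bounded window over recent batch sequences (a model of the described behavior, not the actual `WriterStateEntry` code):

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Illustrative model of a bounded deduplication window that retains only
// the last NUM_BATCHES_TO_RETAIN committed batches.
final class DedupWindowSketch {
    static final int NUM_BATCHES_TO_RETAIN = 5;

    private final Deque<Integer> recentBatchSequences = new ArrayDeque<>();

    void addBatch(int batchSequence) {
        if (recentBatchSequences.size() == NUM_BATCHES_TO_RETAIN) {
            recentBatchSequences.removeFirst(); // oldest entry slides out
        }
        recentBatchSequences.addLast(batchSequence);
    }

    boolean isDuplicate(int batchSequence) {
        return recentBatchSequences.contains(batchSequence);
    }
}
```

After sequences 0 through 5 are committed, the window holds only 1..5, so the retried seq=0 is no longer recognizable as a duplicate and cannot be answered with `DUPLICATE_SEQUENCE`.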
Expected Behavior:
When a batch's response is lost (timeout) but the batch has already been successfully written on the server, and subsequent batches with higher sequence numbers have been acknowledged, the client should be able to detect that the timed-out batch has already been successfully committed (i.e., its sequence number is less than or equal to `lastAckedBatchSequence`) and complete the batch successfully without entering an infinite retry loop.
Solution
When the client receives OutOfOrderSequenceException for a retried batch, it should check whether the batch's sequence number is less than or equal to lastAckedBatchSequence. If so, the batch has already been successfully committed (its ack was simply lost), and the client should complete the batch as a success rather than retrying.
Specifically, in `IdempotenceManager.canRetry()` (or the `handleFailedBatch()` flow), an additional check should be added:
- If the error is `OutOfOrderSequenceException` and `batch.batchSequence() <= lastAckedBatchSequence`, the batch should be treated as already successfully committed and completed without error.
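The proposed check could look roughly like this (a sketch only; the method name and how it is wired into `handleFailedBatch()` are assumptions, not the actual Fluss API):

```java
// Hedged sketch of the proposed fix.
final class ProposedFixSketch {
    /**
     * Returns true if a failed batch should be completed as a success
     * because its ack was lost but the write itself already succeeded.
     */
    static boolean isAlreadyCommitted(
            boolean isOutOfOrderSequenceError,
            int batchSequence,
            int lastAckedBatchSequence) {
        // Higher sequences were acked, so this batch must have been
        // committed on the server; only its response was lost.
        return isOutOfOrderSequenceError
                && batchSequence <= lastAckedBatchSequence;
    }
}
```

For the scenario above, `isAlreadyCommitted(true, 0, 4)` is `true`, so batch1 would be completed successfully instead of being retried forever. This relies on the idempotent producer's invariant that the server acknowledges sequences in order, so an acked seq=4 implies seq=0 was committed.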
Are you willing to submit a PR?
- I'm willing to submit a PR!