From 460b1faf4269c6b027271e493b5ce70a5e6eb2a6 Mon Sep 17 00:00:00 2001 From: Michael Knox Date: Fri, 15 Aug 2025 21:52:30 +0000 Subject: [PATCH 1/2] Fix infinite recursion in RecordAccumulator batch splitting Prevent infinite recursion when splitting oversized producer batches by using a smaller batch size limit for already-split batches. When a batch is marked as split, use the maximum of maxRecordSize or half the estimated batch size instead of the default batch size limit. Add test to verify splitting behavior and prevent regression. --- .../producer/internals/RecordAccumulator.java | 7 +- .../internals/RecordAccumulatorTest.java | 125 ++++++++++++++++++ 2 files changed, 131 insertions(+), 1 deletion(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java index 35cda5e51634b..7c45221461469 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java @@ -514,7 +514,12 @@ public int splitAndReenqueue(ProducerBatch bigBatch) { // the split doesn't happen too often. CompressionRatioEstimator.setEstimation(bigBatch.topicPartition.topic(), compression.type(), Math.max(1.0f, (float) bigBatch.compressionRatio())); - Deque dq = bigBatch.split(this.batchSize); + int maxBatchSize = this.batchSize; + + if (bigBatch.isSplitBatch()) { + maxBatchSize = Math.max(bigBatch.maxRecordSize, bigBatch.estimatedSizeInBytes() / 2); + } + Deque dq = bigBatch.split(maxBatchSize); int numSplitBatches = dq.size(); Deque partitionDequeue = getOrCreateDeque(bigBatch.topicPartition); while (!dq.isEmpty()) { diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java index ce01460e6edb0..61bfc4b2c6264 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java @@ -58,10 +58,13 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.Comparator; import java.util.Deque; +import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.PriorityQueue; import java.util.Optional; import java.util.OptionalInt; import java.util.Random; @@ -77,6 +80,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; @@ -1665,4 +1669,125 @@ int randomPartition() { return mockRandom == null ? super.randomPartition() : mockRandom.getAndIncrement(); } } + + /** + * This test verifies that RecordAccumulator's batch splitting functionality + * correctly handles oversized batches + * by splitting them down to individual records when necessary. It ensures that: + * 1. The splitting process can reduce batches to single-record size + * 2. The process does not enter infinite recursion loops + * 3. No records are lost or duplicated during splitting + * 4. The correct batch state is maintained throughout the process + */ + @Test + public void testSplitAndReenqueuePreventInfiniteRecursion() throws InterruptedException { + // Initialize test environment with a large batch size + long now = time.milliseconds(); + int batchSize = 1024 * 1024; // 1MB batch size + RecordAccumulator accum = createTestRecordAccumulator(batchSize, 10 * batchSize, Compression.gzip().build(), + 10); + + // Create a large producer batch manually (bypassing the accumulator's normal + // append process) + ByteBuffer buffer = ByteBuffer.allocate(batchSize); + MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, Compression.NONE, TimestampType.CREATE_TIME, 0L); + ProducerBatch bigBatch = new ProducerBatch(tp1, builder, now, true); + + // Populate the batch with multiple records (100 records of 1KB each) + byte[] largeValue = new byte[1024]; // Each record is 1KB + for (int i = 0; i < 100; i++) { + ByteBuffer keyBytes = ByteBuffer.allocate(4); + keyBytes.putInt(i); // Use the loop counter as the key for verification later + FutureRecordMetadata result = bigBatch.tryAppend(time.milliseconds(), keyBytes.array(), largeValue, + Record.EMPTY_HEADERS, null, time.milliseconds()); + assertNotNull(result); + } + bigBatch.close(); + + time.sleep(101L); // Ensure the batch has time to become ready for processing + + // Add the batch to the accumulator for splitting + accum.reenqueue(bigBatch, time.milliseconds()); + + // Iteratively split batches until we find single-record batches + // This section tests the core batch splitting functionality + int splitOperations = 0; + int maxSplitOperations = 100; // Safety limit to prevent infinite recursion + boolean foundSingleRecordBatch = false; + + // Use a comparator that puts the batch with the most records first + Comparator reverseComparator = (batch1, batch2) -> Integer.compare(batch2.recordCount, + batch1.recordCount); + + while (splitOperations < maxSplitOperations || !foundSingleRecordBatch) { + // Get the current batches for this topic-partition + Deque tp1Deque = accum.getDeque(tp1); + if (tp1Deque.isEmpty()) { + break; + } + + // Find the batch with the most records + PriorityQueue tp1PriorityQue = new PriorityQueue<>(reverseComparator); + tp1PriorityQue.addAll(tp1Deque); + ProducerBatch batch = tp1PriorityQue.poll(); + if (batch == null) { + break; + } + + // If we've found a batch with only one record, we've reached our goal + if (batch.recordCount == 1) { + foundSingleRecordBatch = true; + break; + } + + // Remove the batch from the deque before splitting it + tp1Deque.remove(batch); + + // Split the batch and track the operation + int numSplitBatches = accum.splitAndReenqueue(batch); + splitOperations++; + + // If splitting produced no new batches (shouldn't happen with multi-record + // batches) + // mark the batch as complete + if (numSplitBatches == 0) { + assertEquals(1, batch.recordCount, "Unsplittable batch should have only 1 record"); + batch.complete(0L, 0L); + foundSingleRecordBatch = true; + } + } + + // Verification section: Check that the splitting process worked as expected + + // Verify that we found a single-record batch, proving that splitting can reach + // that level + assertTrue(foundSingleRecordBatch, "Should eventually produce batches with single records"); + + // Verify we didn't hit our safety limit, which would indicate potential + // infinite recursion + assertTrue(splitOperations < maxSplitOperations, + "Should not hit the safety limit, indicating no infinite recursion"); + + // Verify all remaining batches have at most one record + Deque finalDeque = accum.getDeque(tp1); + + Map keyFoundMap = new HashMap<>(); + // Check each batch and verify record integrity + for (ProducerBatch batch : finalDeque) { + assertTrue(batch.recordCount <= 1, "All remaining batches should have at most 1 record"); + + // Extract the record and its key + MemoryRecords batchRecords = batch.records(); + Iterator recordIterator = batchRecords.records().iterator(); + Record singleRecord = recordIterator.next(); + + // Track keys to ensure no duplicates (putIfAbsent returns null if the key + // wasn't present) + assertNull(keyFoundMap.putIfAbsent(singleRecord.key().getInt(), true), + "Each key should appear exactly once in the split batches"); + } + + // Verify all original records are accounted for (no data loss) + assertEquals(keyFoundMap.size(), 100, "All original 100 records should be present after splitting"); + } } From 95ce47261c62ba47e5b045c4bab1f836dbd27f8b Mon Sep 17 00:00:00 2001 From: Michael Knox Date: Wed, 20 Aug 2025 18:03:32 +0000 Subject: [PATCH 2/2] Fix RecordAccumulator test logic and improve variable naming - Fix while loop condition in RecordAccumulatorTest (OR to AND) - Rename maxBatchSize to targetSplitBatchSize for clarity --- .../kafka/clients/producer/internals/RecordAccumulator.java | 6 +++--- .../clients/producer/internals/RecordAccumulatorTest.java | 4 ++-- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java index 7c45221461469..f0c2719db9612 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java @@ -514,12 +514,12 @@ public int splitAndReenqueue(ProducerBatch bigBatch) { // the split doesn't happen too often. CompressionRatioEstimator.setEstimation(bigBatch.topicPartition.topic(), compression.type(), Math.max(1.0f, (float) bigBatch.compressionRatio())); - int maxBatchSize = this.batchSize; + int targetSplitBatchSize = this.batchSize; if (bigBatch.isSplitBatch()) { - maxBatchSize = Math.max(bigBatch.maxRecordSize, bigBatch.estimatedSizeInBytes() / 2); + targetSplitBatchSize = Math.max(bigBatch.maxRecordSize, bigBatch.estimatedSizeInBytes() / 2); } - Deque dq = bigBatch.split(maxBatchSize); + Deque dq = bigBatch.split(targetSplitBatchSize); int numSplitBatches = dq.size(); Deque partitionDequeue = getOrCreateDeque(bigBatch.topicPartition); while (!dq.isEmpty()) { diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java index 61bfc4b2c6264..17d8676df1b35 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java @@ -64,9 +64,9 @@ import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.PriorityQueue; import java.util.Optional; import java.util.OptionalInt; +import java.util.PriorityQueue; import java.util.Random; import java.util.Set; import java.util.concurrent.ExecutionException; @@ -1719,7 +1719,7 @@ public void testSplitAndReenqueuePreventInfiniteRecursion() throws InterruptedEx Comparator reverseComparator = (batch1, batch2) -> Integer.compare(batch2.recordCount, batch1.recordCount); - while (splitOperations < maxSplitOperations || !foundSingleRecordBatch) { + while (splitOperations < maxSplitOperations && !foundSingleRecordBatch) { // Get the current batches for this topic-partition Deque tp1Deque = accum.getDeque(tp1); if (tp1Deque.isEmpty()) {