diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index 35cda5e51634b..f0c2719db9612 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -514,7 +514,14 @@ public int splitAndReenqueue(ProducerBatch bigBatch) {
         // the split doesn't happen too often.
         CompressionRatioEstimator.setEstimation(bigBatch.topicPartition.topic(), compression.type(),
                                                 Math.max(1.0f, (float) bigBatch.compressionRatio()));
-        Deque<ProducerBatch> dq = bigBatch.split(this.batchSize);
+        int targetSplitBatchSize = this.batchSize;
+
+        // A batch that is itself the product of an earlier split is re-split to half of its estimated
+        // size, but never below its largest record, rather than to the configured batch size.
+        if (bigBatch.isSplitBatch()) {
+            targetSplitBatchSize = Math.max(bigBatch.maxRecordSize, bigBatch.estimatedSizeInBytes() / 2);
+        }
+        Deque<ProducerBatch> dq = bigBatch.split(targetSplitBatchSize);
         int numSplitBatches = dq.size();
         Deque<ProducerBatch> partitionDequeue = getOrCreateDeque(bigBatch.topicPartition);
         while (!dq.isEmpty()) {
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
index ce01460e6edb0..17d8676df1b35 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
@@ -58,12 +58,15 @@
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.Deque;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.OptionalInt;
+import java.util.PriorityQueue;
 import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
@@ -77,6 +80,7 @@
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
 
@@ -1665,4 +1669,125 @@ int randomPartition() {
             return mockRandom == null ? super.randomPartition() : mockRandom.getAndIncrement();
         }
     }
+
+    /**
+     * This test verifies that RecordAccumulator's batch splitting functionality
+     * correctly handles oversized batches
+     * by splitting them down to individual records when necessary. It ensures that:
+     * 1. The splitting process can reduce batches to single-record size
+     * 2. The process does not enter infinite recursion loops
+     * 3. No records are lost or duplicated during splitting
+     * 4. The correct batch state is maintained throughout the process
+     */
+    @Test
+    public void testSplitAndReenqueuePreventInfiniteRecursion() throws InterruptedException {
+        // Initialize test environment with a large batch size
+        long now = time.milliseconds();
+        int batchSize = 1024 * 1024; // 1MB batch size
+        RecordAccumulator accum = createTestRecordAccumulator(batchSize, 10 * batchSize, Compression.gzip().build(),
+                10);
+
+        // Create a large producer batch manually (bypassing the accumulator's normal
+        // append process)
+        ByteBuffer buffer = ByteBuffer.allocate(batchSize);
+        MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, Compression.NONE, TimestampType.CREATE_TIME, 0L);
+        ProducerBatch bigBatch = new ProducerBatch(tp1, builder, now, true);
+
+        // Populate the batch with multiple records (100 records of 1KB each)
+        byte[] largeValue = new byte[1024]; // Each record is 1KB
+        for (int i = 0; i < 100; i++) {
+            ByteBuffer keyBytes = ByteBuffer.allocate(4);
+            keyBytes.putInt(i); // Use the loop counter as the key for verification later
+            FutureRecordMetadata result = bigBatch.tryAppend(time.milliseconds(), keyBytes.array(), largeValue,
+                    Record.EMPTY_HEADERS, null, time.milliseconds());
+            assertNotNull(result);
+        }
+        bigBatch.close();
+
+        time.sleep(101L); // Ensure the batch has time to become ready for processing
+
+        // Add the batch to the accumulator for splitting
+        accum.reenqueue(bigBatch, time.milliseconds());
+
+        // Iteratively split batches until we find single-record batches
+        // This section tests the core batch splitting functionality
+        int splitOperations = 0;
+        int maxSplitOperations = 100; // Safety limit to prevent infinite recursion
+        boolean foundSingleRecordBatch = false;
+
+        // Use a comparator that puts the batch with the most records first
+        Comparator<ProducerBatch> reverseComparator = (batch1, batch2) -> Integer.compare(batch2.recordCount,
+                batch1.recordCount);
+
+        while (splitOperations < maxSplitOperations && !foundSingleRecordBatch) {
+            // Get the current batches for this topic-partition
+            Deque<ProducerBatch> tp1Deque = accum.getDeque(tp1);
+            if (tp1Deque.isEmpty()) {
+                break;
+            }
+
+            // Find the batch with the most records
+            PriorityQueue<ProducerBatch> tp1PriorityQue = new PriorityQueue<>(reverseComparator);
+            tp1PriorityQue.addAll(tp1Deque);
+            ProducerBatch batch = tp1PriorityQue.poll();
+            if (batch == null) {
+                break;
+            }
+
+            // If we've found a batch with only one record, we've reached our goal
+            if (batch.recordCount == 1) {
+                foundSingleRecordBatch = true;
+                break;
+            }
+
+            // Remove the batch from the deque before splitting it
+            tp1Deque.remove(batch);
+
+            // Split the batch and track the operation
+            int numSplitBatches = accum.splitAndReenqueue(batch);
+            splitOperations++;
+
+            // If splitting produced no new batches (shouldn't happen with multi-record
+            // batches)
+            // mark the batch as complete
+            if (numSplitBatches == 0) {
+                assertEquals(1, batch.recordCount, "Unsplittable batch should have only 1 record");
+                batch.complete(0L, 0L);
+                foundSingleRecordBatch = true;
+            }
+        }
+
+        // Verification section: Check that the splitting process worked as expected
+
+        // Verify that we found a single-record batch, proving that splitting can reach
+        // that level
+        assertTrue(foundSingleRecordBatch, "Should eventually produce batches with single records");
+
+        // Verify we didn't hit our safety limit, which would indicate potential
+        // infinite recursion
+        assertTrue(splitOperations < maxSplitOperations,
+                "Should not hit the safety limit, indicating no infinite recursion");
+
+        // Verify all remaining batches have at most one record
+        Deque<ProducerBatch> finalDeque = accum.getDeque(tp1);
+
+        Map<Integer, Boolean> keyFoundMap = new HashMap<>();
+        // Check each batch and verify record integrity
+        for (ProducerBatch batch : finalDeque) {
+            assertTrue(batch.recordCount <= 1, "All remaining batches should have at most 1 record");
+
+            // Extract the record and its key
+            MemoryRecords batchRecords = batch.records();
+            Iterator<Record> recordIterator = batchRecords.records().iterator();
+            Record singleRecord = recordIterator.next();
+
+            // Track keys to ensure no duplicates (putIfAbsent returns null if the key
+            // wasn't present)
+            assertNull(keyFoundMap.putIfAbsent(singleRecord.key().getInt(), true),
+                    "Each key should appear exactly once in the split batches");
+        }
+
+        // Verify all original records are accounted for (no data loss)
+        assertEquals(100, keyFoundMap.size(), "All original 100 records should be present after splitting");
+    }
 }