clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java

@@ -514,7 +514,12 @@ public int splitAndReenqueue(ProducerBatch bigBatch) {
         // the split doesn't happen too often.
         CompressionRatioEstimator.setEstimation(bigBatch.topicPartition.topic(), compression.type(),
                 Math.max(1.0f, (float) bigBatch.compressionRatio()));
-        Deque<ProducerBatch> dq = bigBatch.split(this.batchSize);
+        int targetSplitBatchSize = this.batchSize;
+
+        if (bigBatch.isSplitBatch()) {
+            targetSplitBatchSize = Math.max(bigBatch.maxRecordSize, bigBatch.estimatedSizeInBytes() / 2);
+        }
+        Deque<ProducerBatch> dq = bigBatch.split(targetSplitBatchSize);
         int numSplitBatches = dq.size();
         Deque<ProducerBatch> partitionDequeue = getOrCreateDeque(bigBatch.topicPartition);
         while (!dq.isEmpty()) {
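Why the new sizing rule terminates: previously, a batch that was itself the product of a split was re-split at the same configured batch size, so a split batch that never shrank below that threshold could be re-split into an equivalent batch indefinitely. The patched code halves the target size for already-split batches, floored at the batch's largest record, so repeated splits shrink geometrically. Below is a minimal standalone sketch of that rule; the class and method names are hypothetical, and only the Math.max expression mirrors the patch:

// SplitSizeSketch.java -- illustrative sketch, not Kafka's API.
public final class SplitSizeSketch {

    // Sizing rule from the patch: a batch that already resulted from a split
    // is re-split at half its estimated size, floored at its largest record
    // so the target can always hold at least one record.
    static int targetSplitSize(int batchSize, boolean isSplitBatch,
                               int maxRecordSize, int estimatedSizeInBytes) {
        return isSplitBatch
                ? Math.max(maxRecordSize, estimatedSizeInBytes / 2)
                : batchSize;
    }

    public static void main(String[] args) {
        final int batchSize = 1024 * 1024; // configured batch size: 1 MiB
        final int maxRecord = 1024;        // largest single record: 1 KiB
        int estimated = batchSize;         // worst case: a split batch that did not shrink
        int generations = 0;
        while (estimated > maxRecord) {
            estimated = targetSplitSize(batchSize, true, maxRecord, estimated);
            generations++;
        }
        // Sizes halve 1 MiB -> 512 KiB -> ... -> 1 KiB, so this prints 10.
        System.out.println("split generations: " + generations);
    }
}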
clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java

@@ -58,12 +58,15 @@
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.Deque;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.OptionalInt;
+import java.util.PriorityQueue;
 import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
@@ -77,6 +80,7 @@
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;

@@ -1665,4 +1669,125 @@ int randomPartition() {
             return mockRandom == null ? super.randomPartition() : mockRandom.getAndIncrement();
         }
     }
+
+    /**
+     * This test verifies that RecordAccumulator's batch-splitting functionality
+     * correctly handles oversized batches by splitting them down to individual
+     * records when necessary. It ensures that:
+     * 1. The splitting process can reduce batches to single-record size
+     * 2. The process does not enter infinite recursion loops
+     * 3. No records are lost or duplicated during splitting
+     * 4. The correct batch state is maintained throughout the process
+     */
+    @Test
+    public void testSplitAndReenqueuePreventInfiniteRecursion() throws InterruptedException {
+        // Initialize test environment with a large batch size
+        long now = time.milliseconds();
+        int batchSize = 1024 * 1024; // 1MB batch size
+        RecordAccumulator accum = createTestRecordAccumulator(batchSize, 10 * batchSize,
+                Compression.gzip().build(), 10);
+
+        // Create a large producer batch manually (bypassing the accumulator's
+        // normal append process)
+        ByteBuffer buffer = ByteBuffer.allocate(batchSize);
+        MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, Compression.NONE, TimestampType.CREATE_TIME, 0L);
+        ProducerBatch bigBatch = new ProducerBatch(tp1, builder, now, true);
+
+        // Populate the batch with multiple records (100 records of 1KB each)
+        byte[] largeValue = new byte[1024]; // Each record is 1KB
+        for (int i = 0; i < 100; i++) {
+            ByteBuffer keyBytes = ByteBuffer.allocate(4);
+            keyBytes.putInt(i); // Use the loop counter as the key for verification later
+            FutureRecordMetadata result = bigBatch.tryAppend(time.milliseconds(), keyBytes.array(), largeValue,
+                    Record.EMPTY_HEADERS, null, time.milliseconds());
+            assertNotNull(result);
+        }
+        bigBatch.close();
+
+        time.sleep(101L); // Ensure the batch has time to become ready for processing
+
+        // Add the batch to the accumulator for splitting
+        accum.reenqueue(bigBatch, time.milliseconds());
+
+        // Iteratively split batches until we find single-record batches.
+        // This section tests the core batch-splitting functionality.
+        int splitOperations = 0;
+        int maxSplitOperations = 100; // Safety limit to prevent infinite recursion
+        boolean foundSingleRecordBatch = false;
+
+        // Use a comparator that puts the batch with the most records first
+        Comparator<ProducerBatch> reverseComparator = (batch1, batch2) ->
+                Integer.compare(batch2.recordCount, batch1.recordCount);
+
+        while (splitOperations < maxSplitOperations && !foundSingleRecordBatch) {
+            // Get the current batches for this topic-partition
+            Deque<ProducerBatch> tp1Deque = accum.getDeque(tp1);
+            if (tp1Deque.isEmpty()) {
+                break;
+            }
+
+            // Find the batch with the most records
+            PriorityQueue<ProducerBatch> tp1PriorityQue = new PriorityQueue<>(reverseComparator);
+            tp1PriorityQue.addAll(tp1Deque);
+            ProducerBatch batch = tp1PriorityQue.poll();
+            if (batch == null) {
+                break;
+            }
+
+            // If we've found a batch with only one record, we've reached our goal
+            if (batch.recordCount == 1) {
+                foundSingleRecordBatch = true;
+                break;
+            }
+
+            // Remove the batch from the deque before splitting it
+            tp1Deque.remove(batch);
+
+            // Split the batch and track the operation
+            int numSplitBatches = accum.splitAndReenqueue(batch);
+            splitOperations++;
+
+            // If splitting produced no new batches (which should not happen
+            // for a multi-record batch), mark the single-record batch as
+            // complete
+            if (numSplitBatches == 0) {
+                assertEquals(1, batch.recordCount, "Unsplittable batch should have only 1 record");
+                batch.complete(0L, 0L);
+                foundSingleRecordBatch = true;
+            }
+        }
+
+        // Verification section: check that the splitting process worked as expected
+
+        // Verify that we found a single-record batch, proving that splitting
+        // can reach that level
+        assertTrue(foundSingleRecordBatch, "Should eventually produce batches with single records");
+
+        // Verify we didn't hit our safety limit, which would indicate potential
+        // infinite recursion
+        assertTrue(splitOperations < maxSplitOperations,
+                "Should not hit the safety limit, indicating no infinite recursion");
+
+        // Verify all remaining batches have at most one record
+        Deque<ProducerBatch> finalDeque = accum.getDeque(tp1);
+
+        Map<Integer, Boolean> keyFoundMap = new HashMap<>();
+        // Check each batch and verify record integrity
+        for (ProducerBatch batch : finalDeque) {
+            assertTrue(batch.recordCount <= 1, "All remaining batches should have at most 1 record");
+
+            // Extract the record and its key
+            MemoryRecords batchRecords = batch.records();
+            Iterator<Record> recordIterator = batchRecords.records().iterator();
+            Record singleRecord = recordIterator.next();
+
+            // Track keys to ensure no duplicates (putIfAbsent returns null if
+            // the key was not already present)
+            assertNull(keyFoundMap.putIfAbsent(singleRecord.key().getInt(), true),
+                    "Each key should appear exactly once in the split batches");
+        }
+
+        // Verify all original records are accounted for (no data loss)
+        assertEquals(100, keyFoundMap.size(), "All original 100 records should be present after splitting");
+    }
 }
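A rough sanity check on the test's safety limit, assuming the halving rule from the RecordAccumulator hunk above: with a ~1 MiB initial batch of 1 KiB records, the target split size halves each generation, so about log2(1 MiB / 1 KiB) = 10 split generations suffice to reach single-record batches. The limit of 100 operations therefore leaves ample headroom, and hitting it would indicate the looping behavior this patch prevents.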