Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
KAFKA-16007 Merge batch records during ZK migration (#15007)
To avoid creating lots of small KRaft batches during the ZK migration, this patch adds a mechanism to merge batches into sizes of at least 1000. This has the effect of reducing the number of batches sent to Raft which reduces the amount of time spent blocking. Since migrations use metadata transactions, the batch boundaries for migrated records are not significant. Even in light of that, this implementation does not break up existing batches. It will only combine them into a larger batch to meet the minimum size. Reviewers: José Armando García Sancio <jsancio@apache.org>
- Loading branch information
Showing
8 changed files
with
362 additions
and
44 deletions.
There are no files selected for viewing
58 changes: 58 additions & 0 deletions
58
metadata/src/main/java/org/apache/kafka/metadata/migration/BufferingBatchConsumer.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,58 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.kafka.metadata.migration; | ||
|
||
import java.util.ArrayList; | ||
import java.util.List; | ||
import java.util.function.Consumer; | ||
|
||
/** | ||
* A record batch consumer that merges incoming batches into batches of a minimum a given size. It does so | ||
* by buffering the records into an array that is later flushed to a downstream consumer. Batches consumed | ||
* by this class will not be broken apart, only combined with other batches to reach the minimum batch size. | ||
* </p> | ||
* Note that {@link #flush()} must be called after the last batch has been accepted in order to flush any | ||
* buffered records. | ||
*/ | ||
public class BufferingBatchConsumer<T> implements Consumer<List<T>> { | ||
|
||
private final Consumer<List<T>> delegateConsumer; | ||
private final int minBatchSize; | ||
private List<T> bufferedBatch; | ||
|
||
BufferingBatchConsumer(Consumer<List<T>> delegateConsumer, int minBatchSize) { | ||
this.delegateConsumer = delegateConsumer; | ||
this.minBatchSize = minBatchSize; | ||
this.bufferedBatch = new ArrayList<>(minBatchSize); | ||
} | ||
|
||
@Override | ||
public void accept(List<T> batch) { | ||
bufferedBatch.addAll(batch); | ||
if (bufferedBatch.size() >= minBatchSize) { | ||
flush(); | ||
} | ||
} | ||
|
||
public void flush() { | ||
if (!bufferedBatch.isEmpty()) { | ||
delegateConsumer.accept(bufferedBatch); | ||
bufferedBatch = new ArrayList<>(minBatchSize); | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
120 changes: 120 additions & 0 deletions
120
metadata/src/test/java/org/apache/kafka/metadata/migration/BufferingBatchConsumerTest.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,120 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.kafka.metadata.migration; | ||
|
||
import org.junit.jupiter.api.Test; | ||
|
||
import java.util.ArrayList; | ||
import java.util.Arrays; | ||
import java.util.Collections; | ||
import java.util.List; | ||
|
||
import static org.junit.jupiter.api.Assertions.assertEquals; | ||
|
||
public class BufferingBatchConsumerTest { | ||
|
||
@Test | ||
public void testEmptyBatches() { | ||
List<List<Integer>> batches = new ArrayList<>(); | ||
BufferingBatchConsumer<Integer> consumer = new BufferingBatchConsumer<>(batches::add, 4); | ||
consumer.accept(Collections.emptyList()); | ||
consumer.accept(Collections.emptyList()); | ||
assertEquals(batches.size(), 0); | ||
consumer.flush(); | ||
assertEquals(batches.size(), 0); | ||
} | ||
|
||
@Test | ||
public void testOneBatchSameAsMinSize() { | ||
List<List<Integer>> batches = new ArrayList<>(); | ||
BufferingBatchConsumer<Integer> consumer = new BufferingBatchConsumer<>(batches::add, 4); | ||
consumer.accept(Arrays.asList(1, 2, 3, 4)); | ||
assertEquals(batches.size(), 1); | ||
assertEquals(batches.get(0), Arrays.asList(1, 2, 3, 4)); | ||
consumer.flush(); | ||
assertEquals(batches.size(), 1); | ||
assertEquals(batches.get(0), Arrays.asList(1, 2, 3, 4)); | ||
} | ||
|
||
@Test | ||
public void testOneBatchSmallerThanMinSize() { | ||
List<List<Integer>> batches = new ArrayList<>(); | ||
BufferingBatchConsumer<Integer> consumer = new BufferingBatchConsumer<>(batches::add, 4); | ||
consumer.accept(Arrays.asList(1, 2, 3)); | ||
assertEquals(batches.size(), 0); | ||
consumer.flush(); | ||
assertEquals(batches.size(), 1); | ||
assertEquals(batches.get(0), Arrays.asList(1, 2, 3)); | ||
} | ||
|
||
@Test | ||
public void testOneBatchLargerThanMinSize() { | ||
List<List<Integer>> batches = new ArrayList<>(); | ||
BufferingBatchConsumer<Integer> consumer = new BufferingBatchConsumer<>(batches::add, 4); | ||
consumer.accept(Arrays.asList(1, 2, 3, 4, 5)); | ||
assertEquals(batches.size(), 1); | ||
assertEquals(batches.get(0), Arrays.asList(1, 2, 3, 4, 5)); | ||
consumer.flush(); | ||
assertEquals(batches.size(), 1); | ||
assertEquals(batches.get(0), Arrays.asList(1, 2, 3, 4, 5)); | ||
} | ||
|
||
@Test | ||
public void testMultiBatchSameAsMinSize() { | ||
List<List<Integer>> batches = new ArrayList<>(); | ||
BufferingBatchConsumer<Integer> consumer = new BufferingBatchConsumer<>(batches::add, 6); | ||
consumer.accept(Arrays.asList(1, 2)); | ||
consumer.accept(Arrays.asList(3, 4)); | ||
consumer.accept(Arrays.asList(5, 6)); | ||
assertEquals(batches.size(), 1); | ||
assertEquals(batches.get(0), Arrays.asList(1, 2, 3, 4, 5, 6)); | ||
consumer.flush(); | ||
assertEquals(batches.size(), 1); | ||
assertEquals(batches.get(0), Arrays.asList(1, 2, 3, 4, 5, 6)); | ||
} | ||
|
||
@Test | ||
public void testMultiBatchSmallerThanMinSize() { | ||
List<List<Integer>> batches = new ArrayList<>(); | ||
BufferingBatchConsumer<Integer> consumer = new BufferingBatchConsumer<>(batches::add, 6); | ||
consumer.accept(Arrays.asList(1, 2)); | ||
consumer.accept(Arrays.asList(3, 4)); | ||
consumer.accept(Collections.singletonList(5)); | ||
assertEquals(batches.size(), 0); | ||
consumer.flush(); | ||
assertEquals(batches.size(), 1); | ||
assertEquals(batches.get(0), Arrays.asList(1, 2, 3, 4, 5)); | ||
} | ||
|
||
@Test | ||
public void testMultiBatchLargerThanMinSize() { | ||
List<List<Integer>> batches = new ArrayList<>(); | ||
BufferingBatchConsumer<Integer> consumer = new BufferingBatchConsumer<>(batches::add, 6); | ||
consumer.accept(Arrays.asList(1, 2)); | ||
consumer.accept(Arrays.asList(3, 4)); | ||
consumer.accept(Arrays.asList(5, 6)); | ||
consumer.accept(Arrays.asList(7, 8)); | ||
consumer.accept(Arrays.asList(9, 10)); | ||
assertEquals(batches.size(), 1); | ||
assertEquals(batches.get(0), Arrays.asList(1, 2, 3, 4, 5, 6)); | ||
consumer.flush(); | ||
assertEquals(batches.size(), 2); | ||
assertEquals(batches.get(0), Arrays.asList(1, 2, 3, 4, 5, 6)); | ||
assertEquals(batches.get(1), Arrays.asList(7, 8, 9, 10)); | ||
} | ||
} |
Oops, something went wrong.