diff --git a/metadata/src/main/java/org/apache/kafka/metadata/migration/BufferingBatchConsumer.java b/metadata/src/main/java/org/apache/kafka/metadata/migration/BufferingBatchConsumer.java
new file mode 100644
index 000000000000..ed8a8236aaf0
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kafka/metadata/migration/BufferingBatchConsumer.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata.migration;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.Consumer;
+
+/**
+ * A record batch consumer that merges incoming batches into batches of a minimum a given size. It does so
+ * by buffering the records into an array that is later flushed to a downstream consumer. Batches consumed
+ * by this class will not be broken apart, only combined with other batches to reach the minimum batch size.
+ *
+ * Note that {@link #flush()} must be called after the last batch has been accepted in order to flush any
+ * buffered records.
+ */
+public class BufferingBatchConsumer<T> implements Consumer<List<T>> {
+
+ private final Consumer<List<T>> delegateConsumer;
+ private final int minBatchSize;
+ private List<T> bufferedBatch;
+
+ BufferingBatchConsumer(Consumer<List<T>> delegateConsumer, int minBatchSize) {
+ this.delegateConsumer = delegateConsumer;
+ this.minBatchSize = minBatchSize;
+ this.bufferedBatch = new ArrayList<>(minBatchSize);
+ }
+
+ @Override
+ public void accept(List<T> batch) {
+ bufferedBatch.addAll(batch);
+ if (bufferedBatch.size() >= minBatchSize) {
+ flush();
+ }
+ }
+
+ public void flush() {
+ if (!bufferedBatch.isEmpty()) {
+ delegateConsumer.accept(bufferedBatch);
+ bufferedBatch = new ArrayList<>(minBatchSize);
+ }
+ }
+}
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java
index 94183701742a..518ceea13874 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java
@@ -34,6 +34,7 @@
import org.apache.kafka.queue.KafkaEventQueue;
import org.apache.kafka.raft.LeaderAndEpoch;
import org.apache.kafka.raft.OffsetAndEpoch;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.fault.FaultHandler;
import org.apache.kafka.server.util.Deadline;
import org.apache.kafka.server.util.FutureUtils;
@@ -87,7 +88,9 @@ public long nextPollTimeMs() {
* amount of time. A large value is selected to avoid timeouts in the common case, but prevent us from
* blocking indefinitely.
*/
- private final static int METADATA_COMMIT_MAX_WAIT_MS = 300_000;
+ final static int METADATA_COMMIT_MAX_WAIT_MS = 300_000;
+
+ final static int MIGRATION_MIN_BATCH_SIZE = 1_000;
private final Time time;
private final Logger log;
@@ -644,6 +647,29 @@ public void run() throws Exception {
}
}
+ private BufferingBatchConsumer<ApiMessageAndVersion> buildMigrationBatchConsumer(
+ MigrationManifest.Builder manifestBuilder
+ ) {
+ return new BufferingBatchConsumer<>(batch -> {
+ try {
+ if (log.isTraceEnabled()) {
+ batch.forEach(apiMessageAndVersion ->
+ log.trace(recordRedactor.toLoggableString(apiMessageAndVersion.message())));
+ }
+ CompletableFuture<?> future = zkRecordConsumer.acceptBatch(batch);
+ long batchStart = time.nanoseconds();
+ FutureUtils.waitWithLogging(KRaftMigrationDriver.this.log, "",
+ "the metadata layer to commit " + batch.size() + " migration records",
+ future, Deadline.fromDelay(time, METADATA_COMMIT_MAX_WAIT_MS, TimeUnit.MILLISECONDS), time);
+ long batchEnd = time.nanoseconds();
+ manifestBuilder.acceptBatch(batch, batchEnd - batchStart);
+ } catch (Throwable e) {
+ // This will cause readAllMetadata to throw since this batch consumer is called directly from readAllMetadata
+ throw new RuntimeException(e);
+ }
+ }, MIGRATION_MIN_BATCH_SIZE);
+ }
+
class MigrateMetadataEvent extends MigrationEvent {
@Override
public void run() throws Exception {
@@ -663,23 +689,12 @@ public void run() throws Exception {
super.handleException(t);
}
try {
- zkMigrationClient.readAllMetadata(batch -> {
- try {
- log.info("Migrating {} records from ZK", batch.size());
- if (log.isTraceEnabled()) {
- batch.forEach(apiMessageAndVersion ->
- log.trace(recordRedactor.toLoggableString(apiMessageAndVersion.message())));
- }
- CompletableFuture<?> future = zkRecordConsumer.acceptBatch(batch);
- FutureUtils.waitWithLogging(KRaftMigrationDriver.this.log, "",
- "the metadata layer to commit migration record batch",
- future, Deadline.fromDelay(time, METADATA_COMMIT_MAX_WAIT_MS, TimeUnit.MILLISECONDS), time);
- manifestBuilder.acceptBatch(batch);
- } catch (Throwable e) {
- // This will cause readAllMetadata to throw since this batch consumer is called directly from readAllMetadata
- throw new RuntimeException(e);
- }
- }, brokersInMetadata::add);
+ BufferingBatchConsumer<ApiMessageAndVersion> migrationBatchConsumer = buildMigrationBatchConsumer(manifestBuilder);
+ zkMigrationClient.readAllMetadata(
+ migrationBatchConsumer,
+ brokersInMetadata::add
+ );
+ migrationBatchConsumer.flush();
CompletableFuture<OffsetAndEpoch> completeMigrationFuture = zkRecordConsumer.completeMigration();
OffsetAndEpoch offsetAndEpochAfterMigration = FutureUtils.waitWithLogging(
KRaftMigrationDriver.this.log, "",
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/migration/MigrationManifest.java b/metadata/src/main/java/org/apache/kafka/metadata/migration/MigrationManifest.java
index fc31fb416d35..39dae38d52cb 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/migration/MigrationManifest.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/migration/MigrationManifest.java
@@ -41,6 +41,7 @@ public static class Builder {
private final Map<MetadataRecordType, Integer> counts = new HashMap<>();
private int batches = 0;
private int total = 0;
+ private long batchDurationsNs = 0;
private long endTimeNanos = 0;
Builder(Time time) {
@@ -48,8 +49,9 @@ public static class Builder {
this.startTimeNanos = time.nanoseconds();
}
- public void acceptBatch(List<ApiMessageAndVersion> recordBatch) {
+ public void acceptBatch(List<ApiMessageAndVersion> recordBatch, long durationNs) {
batches++;
+ batchDurationsNs += durationNs;
recordBatch.forEach(apiMessageAndVersion -> {
MetadataRecordType type = MetadataRecordType.fromId(apiMessageAndVersion.message().apiKey());
counts.merge(type, 1, Integer::sum);
@@ -62,23 +64,26 @@ public MigrationManifest build() {
endTimeNanos = time.nanoseconds();
}
Map<MetadataRecordType, Integer> orderedCounts = new TreeMap<>(counts);
- return new MigrationManifest(total, batches, endTimeNanos - startTimeNanos, orderedCounts);
+ return new MigrationManifest(total, batches, batchDurationsNs, endTimeNanos - startTimeNanos, orderedCounts);
}
}
private final int totalRecords;
private final int totalBatches;
+ private final long totalBatchDurationsNs;
private final long durationNanos;
private final Map<MetadataRecordType, Integer> recordTypeCounts;
MigrationManifest(
int totalRecords,
int totalBatches,
+ long totalBatchDurationsNs,
long durationNanos,
Map<MetadataRecordType, Integer> recordTypeCounts
) {
this.totalRecords = totalRecords;
this.totalBatches = totalBatches;
+ this.totalBatchDurationsNs = totalBatchDurationsNs;
this.durationNanos = durationNanos;
this.recordTypeCounts = Collections.unmodifiableMap(recordTypeCounts);
}
@@ -91,6 +96,13 @@ public long durationMs() {
return TimeUnit.NANOSECONDS.toMillis(durationNanos);
}
+ public double avgBatchDurationMs() {
+ if (totalBatches == 0) {
+ return -1;
+ }
+ return 1.0 * TimeUnit.NANOSECONDS.toMillis(totalBatchDurationsNs) / totalBatches;
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) return true;
@@ -98,17 +110,20 @@ public boolean equals(Object o) {
MigrationManifest that = (MigrationManifest) o;
return totalRecords == that.totalRecords &&
totalBatches == that.totalBatches &&
+ totalBatchDurationsNs == that.totalBatchDurationsNs &&
durationNanos == that.durationNanos &&
recordTypeCounts.equals(that.recordTypeCounts);
}
@Override
public int hashCode() {
- return Objects.hash(totalRecords, totalBatches, durationNanos, recordTypeCounts);
+ return Objects.hash(totalRecords, totalBatches, totalBatchDurationsNs, durationNanos, recordTypeCounts);
}
public String toString() {
- return String.format("%d records were generated in %d ms across %d batches. The record types were %s",
- totalRecords, durationMs(), totalBatches, recordTypeCounts);
+ return String.format(
+ "%d records were generated in %d ms across %d batches. The average time spent waiting on a " +
+ "batch was %.2f ms. The record types were %s",
+ totalRecords, durationMs(), totalBatches, avgBatchDurationMs(), recordTypeCounts);
}
}
\ No newline at end of file
diff --git a/metadata/src/test/java/org/apache/kafka/metadata/migration/BufferingBatchConsumerTest.java b/metadata/src/test/java/org/apache/kafka/metadata/migration/BufferingBatchConsumerTest.java
new file mode 100644
index 000000000000..819920573567
--- /dev/null
+++ b/metadata/src/test/java/org/apache/kafka/metadata/migration/BufferingBatchConsumerTest.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata.migration;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class BufferingBatchConsumerTest {
+
+ @Test
+ public void testEmptyBatches() {
+ List<List<Integer>> batches = new ArrayList<>();
+ BufferingBatchConsumer<Integer> consumer = new BufferingBatchConsumer<>(batches::add, 4);
+ consumer.accept(Collections.emptyList());
+ consumer.accept(Collections.emptyList());
+ assertEquals(batches.size(), 0);
+ consumer.flush();
+ assertEquals(batches.size(), 0);
+ }
+
+ @Test
+ public void testOneBatchSameAsMinSize() {
+ List<List<Integer>> batches = new ArrayList<>();
+ BufferingBatchConsumer<Integer> consumer = new BufferingBatchConsumer<>(batches::add, 4);
+ consumer.accept(Arrays.asList(1, 2, 3, 4));
+ assertEquals(batches.size(), 1);
+ assertEquals(batches.get(0), Arrays.asList(1, 2, 3, 4));
+ consumer.flush();
+ assertEquals(batches.size(), 1);
+ assertEquals(batches.get(0), Arrays.asList(1, 2, 3, 4));
+ }
+
+ @Test
+ public void testOneBatchSmallerThanMinSize() {
+ List<List<Integer>> batches = new ArrayList<>();
+ BufferingBatchConsumer<Integer> consumer = new BufferingBatchConsumer<>(batches::add, 4);
+ consumer.accept(Arrays.asList(1, 2, 3));
+ assertEquals(batches.size(), 0);
+ consumer.flush();
+ assertEquals(batches.size(), 1);
+ assertEquals(batches.get(0), Arrays.asList(1, 2, 3));
+ }
+
+ @Test
+ public void testOneBatchLargerThanMinSize() {
+ List<List<Integer>> batches = new ArrayList<>();
+ BufferingBatchConsumer<Integer> consumer = new BufferingBatchConsumer<>(batches::add, 4);
+ consumer.accept(Arrays.asList(1, 2, 3, 4, 5));
+ assertEquals(batches.size(), 1);
+ assertEquals(batches.get(0), Arrays.asList(1, 2, 3, 4, 5));
+ consumer.flush();
+ assertEquals(batches.size(), 1);
+ assertEquals(batches.get(0), Arrays.asList(1, 2, 3, 4, 5));
+ }
+
+ @Test
+ public void testMultiBatchSameAsMinSize() {
+ List<List<Integer>> batches = new ArrayList<>();
+ BufferingBatchConsumer<Integer> consumer = new BufferingBatchConsumer<>(batches::add, 6);
+ consumer.accept(Arrays.asList(1, 2));
+ consumer.accept(Arrays.asList(3, 4));
+ consumer.accept(Arrays.asList(5, 6));
+ assertEquals(batches.size(), 1);
+ assertEquals(batches.get(0), Arrays.asList(1, 2, 3, 4, 5, 6));
+ consumer.flush();
+ assertEquals(batches.size(), 1);
+ assertEquals(batches.get(0), Arrays.asList(1, 2, 3, 4, 5, 6));
+ }
+
+ @Test
+ public void testMultiBatchSmallerThanMinSize() {
+ List<List<Integer>> batches = new ArrayList<>();
+ BufferingBatchConsumer<Integer> consumer = new BufferingBatchConsumer<>(batches::add, 6);
+ consumer.accept(Arrays.asList(1, 2));
+ consumer.accept(Arrays.asList(3, 4));
+ consumer.accept(Collections.singletonList(5));
+ assertEquals(batches.size(), 0);
+ consumer.flush();
+ assertEquals(batches.size(), 1);
+ assertEquals(batches.get(0), Arrays.asList(1, 2, 3, 4, 5));
+ }
+
+ @Test
+ public void testMultiBatchLargerThanMinSize() {
+ List<List<Integer>> batches = new ArrayList<>();
+ BufferingBatchConsumer<Integer> consumer = new BufferingBatchConsumer<>(batches::add, 6);
+ consumer.accept(Arrays.asList(1, 2));
+ consumer.accept(Arrays.asList(3, 4));
+ consumer.accept(Arrays.asList(5, 6));
+ consumer.accept(Arrays.asList(7, 8));
+ consumer.accept(Arrays.asList(9, 10));
+ assertEquals(batches.size(), 1);
+ assertEquals(batches.get(0), Arrays.asList(1, 2, 3, 4, 5, 6));
+ consumer.flush();
+ assertEquals(batches.size(), 2);
+ assertEquals(batches.get(0), Arrays.asList(1, 2, 3, 4, 5, 6));
+ assertEquals(batches.get(1), Arrays.asList(7, 8, 9, 10));
+ }
+}
diff --git a/metadata/src/test/java/org/apache/kafka/metadata/migration/CapturingMigrationClient.java b/metadata/src/test/java/org/apache/kafka/metadata/migration/CapturingMigrationClient.java
index 02dfd540c7cd..503236cf266f 100644
--- a/metadata/src/test/java/org/apache/kafka/metadata/migration/CapturingMigrationClient.java
+++ b/metadata/src/test/java/org/apache/kafka/metadata/migration/CapturingMigrationClient.java
@@ -30,6 +30,20 @@
class CapturingMigrationClient implements MigrationClient {
+ static final MigrationBatchSupplier EMPTY_BATCH_SUPPLIER = new MigrationBatchSupplier() {
+
+ };
+
+ interface MigrationBatchSupplier {
+ default List<List<ApiMessageAndVersion>> recordBatches() {
+ return Collections.emptyList();
+ }
+
+ default List<Integer> brokerIds() {
+ return Collections.emptyList();
+ }
+ }
+
static Builder newBuilder() {
return new Builder();
}
@@ -40,6 +54,8 @@ public static class Builder {
ConfigMigrationClient configMigrationClient = new CapturingConfigMigrationClient();
AclMigrationClient aclMigrationClient = new CapturingAclMigrationClient();
DelegationTokenMigrationClient delegationTokenMigrationClient = new CapturingDelegationTokenMigrationClient();
+ MigrationBatchSupplier batchSupplier = EMPTY_BATCH_SUPPLIER;
+
public Builder setBrokersInZk(int... brokerIds) {
brokersInZk = IntStream.of(brokerIds).boxed().collect(Collectors.toSet());
@@ -66,13 +82,19 @@ public Builder setDelegationTokenMigrationClient(DelegationTokenMigrationClient
return this;
}
+ public Builder setBatchSupplier(MigrationBatchSupplier batchSupplier) {
+ this.batchSupplier = batchSupplier;
+ return this;
+ }
+
public CapturingMigrationClient build() {
return new CapturingMigrationClient(
brokersInZk,
topicMigrationClient,
configMigrationClient,
aclMigrationClient,
- delegationTokenMigrationClient
+ delegationTokenMigrationClient,
+ batchSupplier
);
}
}
@@ -82,7 +104,7 @@ public CapturingMigrationClient build() {
private final ConfigMigrationClient configMigrationClient;
private final AclMigrationClient aclMigrationClient;
private final DelegationTokenMigrationClient delegationTokenMigrationClient;
-
+ private final MigrationBatchSupplier batchSupplier;
private ZkMigrationLeadershipState state = null;
CapturingMigrationClient(
@@ -90,13 +112,15 @@ public CapturingMigrationClient build() {
TopicMigrationClient topicMigrationClient,
ConfigMigrationClient configMigrationClient,
AclMigrationClient aclMigrationClient,
- DelegationTokenMigrationClient delegationTokenMigrationClient
+ DelegationTokenMigrationClient delegationTokenMigrationClient,
+ MigrationBatchSupplier batchSupplier
) {
this.brokerIds = brokerIdsInZk;
this.topicMigrationClient = topicMigrationClient;
this.configMigrationClient = configMigrationClient;
this.aclMigrationClient = aclMigrationClient;
this.delegationTokenMigrationClient = delegationTokenMigrationClient;
+ this.batchSupplier = batchSupplier;
}
@Override
@@ -165,7 +189,8 @@ public void readAllMetadata(
Consumer<List<ApiMessageAndVersion>> batchConsumer,
Consumer<Integer> brokerIdConsumer
) {
-
+ batchSupplier.recordBatches().forEach(batchConsumer);
+ batchSupplier.brokerIds().forEach(brokerIdConsumer);
}
@Override
diff --git a/metadata/src/test/java/org/apache/kafka/metadata/migration/CapturingTopicMigrationClient.java b/metadata/src/test/java/org/apache/kafka/metadata/migration/CapturingTopicMigrationClient.java
index 8b8e5acc5f35..e3b7cede7fc4 100644
--- a/metadata/src/test/java/org/apache/kafka/metadata/migration/CapturingTopicMigrationClient.java
+++ b/metadata/src/test/java/org/apache/kafka/metadata/migration/CapturingTopicMigrationClient.java
@@ -48,7 +48,7 @@ public void reset() {
@Override
public void iterateTopics(EnumSet<TopicVisitorInterest> interests, TopicVisitor visitor) {
-
+
}
@Override
diff --git a/metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationDriverTest.java b/metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationDriverTest.java
index e2d4ec49d1e8..19c7c7600c1f 100644
--- a/metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationDriverTest.java
+++ b/metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationDriverTest.java
@@ -21,10 +21,12 @@
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.metadata.BrokerRegistrationChangeRecord;
import org.apache.kafka.common.metadata.ConfigRecord;
import org.apache.kafka.common.metadata.RegisterBrokerRecord;
+import org.apache.kafka.common.metadata.TopicRecord;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.controller.QuorumFeatures;
@@ -55,8 +57,11 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.EnumSet;
@@ -74,6 +79,7 @@
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
+import java.util.stream.Stream;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
@@ -319,7 +325,8 @@ public void testMigrationWithClientException(boolean authException) throws Excep
new CapturingTopicMigrationClient(),
new CapturingConfigMigrationClient(),
new CapturingAclMigrationClient(),
- new CapturingDelegationTokenMigrationClient()) {
+ new CapturingDelegationTokenMigrationClient(),
+ CapturingMigrationClient.EMPTY_BATCH_SUPPLIER) {
@Override
public ZkMigrationLeadershipState claimControllerLeadership(ZkMigrationLeadershipState state) {
if (claimLeaderAttempts.getCount() == 0) {
@@ -411,11 +418,8 @@ public void testShouldNotMoveToNextStateIfControllerNodesAreNotReadyToMigrate()
@Test
public void testSkipWaitForBrokersInDualWrite() throws Exception {
CountingMetadataPropagator metadataPropagator = new CountingMetadataPropagator();
- CapturingMigrationClient migrationClient = new CapturingMigrationClient(Collections.emptySet(),
- new CapturingTopicMigrationClient(),
- new CapturingConfigMigrationClient(),
- new CapturingAclMigrationClient(),
- new CapturingDelegationTokenMigrationClient());
+ CapturingMigrationClient.newBuilder().build();
+ CapturingMigrationClient migrationClient = CapturingMigrationClient.newBuilder().build();
MockFaultHandler faultHandler = new MockFaultHandler("testMigrationClientExpiration");
KRaftMigrationDriver.Builder builder = defaultTestBuilder()
.setZkMigrationClient(migrationClient)
public CompletableFuture<?> beginMigration() {
};
CountingMetadataPropagator metadataPropagator = new CountingMetadataPropagator();
CapturingMigrationClient migrationClient = CapturingMigrationClient.newBuilder().setBrokersInZk(1, 2, 3).build();
- MockFaultHandler faultHandler = new MockFaultHandler("testTwoMigrateMetadataEvents");
+ MockFaultHandler faultHandler = new MockFaultHandler("testBeginMigrationOnce");
KRaftMigrationDriver.Builder builder = defaultTestBuilder()
.setZkMigrationClient(migrationClient)
.setZkRecordConsumer(recordConsumer)
@@ -754,4 +758,82 @@ public CompletableFuture> beginMigration() {
assertEquals(1, migrationBeginCalls.get());
}
}
+
+ private List<ApiMessageAndVersion> fillBatch(int size) {
+ ApiMessageAndVersion[] batch = new ApiMessageAndVersion[size];
+ Arrays.fill(batch, new ApiMessageAndVersion(new TopicRecord().setName("topic-fill").setTopicId(Uuid.randomUuid()), (short) 0));
+ return Arrays.asList(batch);
+ }
+
+ static Stream<Arguments> batchSizes() {
+ return Stream.of(
+ Arguments.of(Arrays.asList(0, 0, 0, 0), 0, 0),
+ Arguments.of(Arrays.asList(0, 0, 1, 0), 1, 1),
+ Arguments.of(Arrays.asList(1, 1, 1, 1), 1, 4),
+ Arguments.of(Collections.singletonList(KRaftMigrationDriver.MIGRATION_MIN_BATCH_SIZE - 1), 1, 999),
+ Arguments.of(Collections.singletonList(KRaftMigrationDriver.MIGRATION_MIN_BATCH_SIZE), 1, 1000),
+ Arguments.of(Collections.singletonList(KRaftMigrationDriver.MIGRATION_MIN_BATCH_SIZE + 1), 1, 1001),
+ Arguments.of(Arrays.asList(KRaftMigrationDriver.MIGRATION_MIN_BATCH_SIZE, 1), 2, 1001),
+ Arguments.of(Arrays.asList(0, 0, 0, 0), 0, 0),
+ Arguments.of(Arrays.asList(KRaftMigrationDriver.MIGRATION_MIN_BATCH_SIZE, KRaftMigrationDriver.MIGRATION_MIN_BATCH_SIZE, KRaftMigrationDriver.MIGRATION_MIN_BATCH_SIZE), 3, 3000)
+ );
+ }
+ @ParameterizedTest
+ @MethodSource("batchSizes")
+ public void testCoalesceMigrationRecords(List<Integer> batchSizes, int expectedBatchCount, int expectedRecordCount) throws Exception {
+ List<List<ApiMessageAndVersion>> batchesPassedToController = new ArrayList<>();
+ NoOpRecordConsumer recordConsumer = new NoOpRecordConsumer() {
+ @Override
+ public CompletableFuture<?> acceptBatch(List<ApiMessageAndVersion> recordBatch) {
+ batchesPassedToController.add(recordBatch);
+ return CompletableFuture.completedFuture(null);
+ }
+ };
+ CountingMetadataPropagator metadataPropagator = new CountingMetadataPropagator();
+ CapturingMigrationClient migrationClient = CapturingMigrationClient.newBuilder()
+ .setBrokersInZk(1, 2, 3)
+ .setBatchSupplier(new CapturingMigrationClient.MigrationBatchSupplier() {
+ @Override
+ public List<List<ApiMessageAndVersion>> recordBatches() {
+ List<List<ApiMessageAndVersion>> batches = new ArrayList<>();
+ for (int batchSize : batchSizes) {
+ batches.add(fillBatch(batchSize));
+ }
+ return batches;
+ }
+ })
+ .build();
+ MockFaultHandler faultHandler = new MockFaultHandler("testRebatchMigrationRecords");
+
+ KRaftMigrationDriver.Builder builder = defaultTestBuilder()
+ .setZkMigrationClient(migrationClient)
+ .setZkRecordConsumer(recordConsumer)
+ .setPropagator(metadataPropagator)
+ .setFaultHandler(faultHandler);
+ try (KRaftMigrationDriver driver = builder.build()) {
+ MetadataImage image = MetadataImage.EMPTY;
+ MetadataDelta delta = new MetadataDelta(image);
+
+ driver.start();
+ delta.replay(ZkMigrationState.PRE_MIGRATION.toRecord().message());
+ delta.replay(zkBrokerRecord(1));
+ delta.replay(zkBrokerRecord(2));
+ delta.replay(zkBrokerRecord(3));
+ MetadataProvenance provenance = new MetadataProvenance(100, 1, 1);
+ image = delta.apply(provenance);
+
+ driver.onControllerChange(new LeaderAndEpoch(OptionalInt.of(3000), 1));
+
+ driver.onMetadataUpdate(delta, image, logDeltaManifestBuilder(provenance,
+ new LeaderAndEpoch(OptionalInt.of(3000), 1)).build());
+ driver.onMetadataUpdate(delta, image, logDeltaManifestBuilder(provenance,
+ new LeaderAndEpoch(OptionalInt.of(3000), 1)).build());
+
+ TestUtils.waitForCondition(() -> driver.migrationState().get(1, TimeUnit.MINUTES).equals(MigrationDriverState.DUAL_WRITE),
+ "Waiting for KRaftMigrationDriver to enter ZK_MIGRATION state");
+
+ assertEquals(expectedBatchCount, batchesPassedToController.size());
+ assertEquals(expectedRecordCount, batchesPassedToController.stream().mapToInt(List::size).sum());
+ }
+ }
}
diff --git a/metadata/src/test/java/org/apache/kafka/metadata/migration/MigrationManifestTest.java b/metadata/src/test/java/org/apache/kafka/metadata/migration/MigrationManifestTest.java
index 63c548ec4323..711073e364a4 100644
--- a/metadata/src/test/java/org/apache/kafka/metadata/migration/MigrationManifestTest.java
+++ b/metadata/src/test/java/org/apache/kafka/metadata/migration/MigrationManifestTest.java
@@ -38,7 +38,7 @@ public void testEmpty() {
MigrationManifest manifest = manifestBuilder.build();
assertEquals(0L, manifest.durationMs());
assertEquals(
- "0 records were generated in 0 ms across 0 batches. The record types were {}",
+ "0 records were generated in 0 ms across 0 batches. The average time spent waiting on a batch was -1.00 ms. The record types were {}",
manifest.toString());
}
@@ -60,11 +60,12 @@ public void testOneBatch() {
new ApiMessageAndVersion(new PartitionRecord(), (short) 0),
new ApiMessageAndVersion(new ConfigRecord(), (short) 0),
new ApiMessageAndVersion(new ConfigRecord(), (short) 0)
- ));
+ ), 20);
+ time.sleep(10);
MigrationManifest manifest = manifestBuilder.build();
- assertEquals(0L, manifest.durationMs());
+ assertEquals(10L, manifest.durationMs());
assertEquals(
- "13 records were generated in 0 ms across 1 batches. The record types were {TOPIC_RECORD=2, PARTITION_RECORD=9, CONFIG_RECORD=2}",
+ "13 records were generated in 10 ms across 1 batches. The average time spent waiting on a batch was 0.00 ms. The record types were {TOPIC_RECORD=2, PARTITION_RECORD=9, CONFIG_RECORD=2}",
manifest.toString()
);
}
@@ -79,7 +80,7 @@ public void testManyBatch() {
new ApiMessageAndVersion(new PartitionRecord(), (short) 0),
new ApiMessageAndVersion(new PartitionRecord(), (short) 0),
new ApiMessageAndVersion(new PartitionRecord(), (short) 0)
- ));
+ ), 20_000_000);
manifestBuilder.acceptBatch(Arrays.asList(
new ApiMessageAndVersion(new TopicRecord(), (short) 0),
new ApiMessageAndVersion(new PartitionRecord(), (short) 0),
@@ -88,14 +89,16 @@ public void testManyBatch() {
new ApiMessageAndVersion(new PartitionRecord(), (short) 0),
new ApiMessageAndVersion(new PartitionRecord(), (short) 0),
new ApiMessageAndVersion(new ConfigRecord(), (short) 0)
- ));
+ ), 20_000_000);
manifestBuilder.acceptBatch(Collections.singletonList(
new ApiMessageAndVersion(new ConfigRecord(), (short) 0)
- ));
+ ), 5_000_000);
+ time.sleep(60);
MigrationManifest manifest = manifestBuilder.build();
- assertEquals(0L, manifest.durationMs());
+ assertEquals(60L, manifest.durationMs());
assertEquals(
- "13 records were generated in 0 ms across 3 batches. The record types were {TOPIC_RECORD=2, PARTITION_RECORD=9, CONFIG_RECORD=2}",
+ "13 records were generated in 60 ms across 3 batches. The average time spent waiting on a " +
+ "batch was 15.00 ms. The record types were {TOPIC_RECORD=2, PARTITION_RECORD=9, CONFIG_RECORD=2}",
manifest.toString()
);
}