From 7f763d327fbde2e9849a6548efee4ce165b6fcb1 Mon Sep 17 00:00:00 2001
From: David Arthur
Date: Fri, 15 Dec 2023 19:33:15 -0500
Subject: [PATCH] KAFKA-16007 Merge batch records during ZK migration (#15007)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

To avoid creating lots of small KRaft batches during the ZK migration, this
patch adds a mechanism that merges the migrated records into batches of at
least 1,000 records. This reduces the number of batches sent to Raft, which
in turn reduces the amount of time the migration spends blocked on commits.

Since migrations use metadata transactions, the batch boundaries of the
migrated records are not significant. Even so, this implementation does not
break up existing batches; it only combines them into a larger batch to meet
the minimum size.

Reviewers: José Armando García Sancio
---
 .../migration/BufferingBatchConsumer.java     |  58 +++++++++
 .../migration/KRaftMigrationDriver.java       |  51 +++++---
 .../metadata/migration/MigrationManifest.java |  25 +++-
 .../migration/BufferingBatchConsumerTest.java | 120 ++++++++++++++++++
 .../migration/CapturingMigrationClient.java   |  33 ++++-
 .../CapturingTopicMigrationClient.java        |   2 +-
 .../migration/KRaftMigrationDriverTest.java   |  97 +++++++++++++-
 .../migration/MigrationManifestTest.java      |  21 +--
 8 files changed, 363 insertions(+), 44 deletions(-)
 create mode 100644 metadata/src/main/java/org/apache/kafka/metadata/migration/BufferingBatchConsumer.java
 create mode 100644 metadata/src/test/java/org/apache/kafka/metadata/migration/BufferingBatchConsumerTest.java
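
As an illustration of the coalescing contract (an editorial sketch, not part
of the patch; it mirrors testMultiBatchLargerThanMinSize in
BufferingBatchConsumerTest below, using a toy minimum of 6 instead of
MIGRATION_MIN_BATCH_SIZE = 1000):

    List<List<Integer>> emitted = new ArrayList<>();
    BufferingBatchConsumer<Integer> consumer = new BufferingBatchConsumer<>(emitted::add, 6);
    consumer.accept(Arrays.asList(1, 2));   // buffered; 2 < 6
    consumer.accept(Arrays.asList(3, 4));   // buffered; 4 < 6
    consumer.accept(Arrays.asList(5, 6));   // reaches 6, emits [1, 2, 3, 4, 5, 6]
    consumer.accept(Arrays.asList(7, 8));   // buffered again
    consumer.flush();                       // emits the remainder [7, 8]

Incoming batches are never split: per the parameterized test below, a single
batch of MIGRATION_MIN_BATCH_SIZE + 1 records passes through as one batch of
1001 records.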
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/migration/BufferingBatchConsumer.java b/metadata/src/main/java/org/apache/kafka/metadata/migration/BufferingBatchConsumer.java
new file mode 100644
index 000000000000..ed8a8236aaf0
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kafka/metadata/migration/BufferingBatchConsumer.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata.migration;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.Consumer;
+
+/**
+ * A record batch consumer that merges incoming batches into batches of at least a given minimum size. It does
+ * so by buffering the records into an array that is later flushed to a downstream consumer. Batches consumed
+ * by this class will not be broken apart, only combined with other batches to reach the minimum batch size.
+ * <p>
+ * Note that {@link #flush()} must be called after the last batch has been accepted in order to flush any
+ * buffered records.
+ */
+public class BufferingBatchConsumer<T> implements Consumer<List<T>> {
+
+    private final Consumer<List<T>> delegateConsumer;
+    private final int minBatchSize;
+    private List<T> bufferedBatch;
+
+    BufferingBatchConsumer(Consumer<List<T>> delegateConsumer, int minBatchSize) {
+        this.delegateConsumer = delegateConsumer;
+        this.minBatchSize = minBatchSize;
+        this.bufferedBatch = new ArrayList<>(minBatchSize);
+    }
+
+    @Override
+    public void accept(List<T> batch) {
+        bufferedBatch.addAll(batch);
+        if (bufferedBatch.size() >= minBatchSize) {
+            flush();
+        }
+    }
+
+    public void flush() {
+        if (!bufferedBatch.isEmpty()) {
+            delegateConsumer.accept(bufferedBatch);
+            bufferedBatch = new ArrayList<>(minBatchSize);
+        }
+    }
+}
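
In the driver change that follows, this consumer is threaded between the ZK
reader and the Raft-committing record consumer. Schematically (an editorial
condensation of the MigrateMetadataEvent hunk below; error handling and
logging omitted):

    BufferingBatchConsumer<ApiMessageAndVersion> consumer = buildMigrationBatchConsumer(manifestBuilder);
    zkMigrationClient.readAllMetadata(consumer, brokersInMetadata::add);
    consumer.flush();  // emit any final batch smaller than MIGRATION_MIN_BATCH_SIZE
    CompletableFuture<OffsetAndEpoch> done = zkRecordConsumer.completeMigration();

The flush() call must precede completeMigration(); otherwise a trailing
sub-minimum batch would stay in the buffer and never reach Raft.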
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java
index 25a1cf5ba8be..156515f7be81 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java
@@ -34,6 +34,7 @@
 import org.apache.kafka.queue.KafkaEventQueue;
 import org.apache.kafka.raft.LeaderAndEpoch;
 import org.apache.kafka.raft.OffsetAndEpoch;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
 import org.apache.kafka.server.fault.FaultHandler;
 import org.apache.kafka.server.util.Deadline;
 import org.apache.kafka.server.util.FutureUtils;
@@ -87,7 +88,9 @@ public long nextPollTimeMs() {
      * amount of time. A large value is selected to avoid timeouts in the common case, but prevent us from
      * blocking indefinitely.
      */
-    private final static int METADATA_COMMIT_MAX_WAIT_MS = 300_000;
+    final static int METADATA_COMMIT_MAX_WAIT_MS = 300_000;
+
+    final static int MIGRATION_MIN_BATCH_SIZE = 1_000;
 
     private final Time time;
     private final Logger log;
@@ -645,6 +648,29 @@ public void run() throws Exception {
         }
     }
 
+    private BufferingBatchConsumer<ApiMessageAndVersion> buildMigrationBatchConsumer(
+        MigrationManifest.Builder manifestBuilder
+    ) {
+        return new BufferingBatchConsumer<>(batch -> {
+            try {
+                if (log.isTraceEnabled()) {
+                    batch.forEach(apiMessageAndVersion ->
+                        log.trace(recordRedactor.toLoggableString(apiMessageAndVersion.message())));
+                }
+                CompletableFuture<?> future = zkRecordConsumer.acceptBatch(batch);
+                long batchStart = time.nanoseconds();
+                FutureUtils.waitWithLogging(KRaftMigrationDriver.this.log, "",
+                    "the metadata layer to commit " + batch.size() + " migration records",
+                    future, Deadline.fromDelay(time, METADATA_COMMIT_MAX_WAIT_MS, TimeUnit.MILLISECONDS), time);
+                long batchEnd = time.nanoseconds();
+                manifestBuilder.acceptBatch(batch, batchEnd - batchStart);
+            } catch (Throwable e) {
+                // This will cause readAllMetadata to throw since this batch consumer is called directly from readAllMetadata
+                throw new RuntimeException(e);
+            }
+        }, MIGRATION_MIN_BATCH_SIZE);
+    }
+
     class MigrateMetadataEvent extends MigrationEvent {
         @Override
         public void run() throws Exception {
@@ -664,23 +690,12 @@ public void run() throws Exception {
                 super.handleException(t);
             }
             try {
-                zkMigrationClient.readAllMetadata(batch -> {
-                    try {
-                        log.info("Migrating {} records from ZK", batch.size());
-                        if (log.isTraceEnabled()) {
-                            batch.forEach(apiMessageAndVersion ->
-                                log.trace(recordRedactor.toLoggableString(apiMessageAndVersion.message())));
-                        }
-                        CompletableFuture<?> future = zkRecordConsumer.acceptBatch(batch);
-                        FutureUtils.waitWithLogging(KRaftMigrationDriver.this.log, "",
-                            "the metadata layer to commit migration record batch",
-                            future, Deadline.fromDelay(time, METADATA_COMMIT_MAX_WAIT_MS, TimeUnit.MILLISECONDS), time);
-                        manifestBuilder.acceptBatch(batch);
-                    } catch (Throwable e) {
-                        // This will cause readAllMetadata to throw since this batch consumer is called directly from readAllMetadata
-                        throw new RuntimeException(e);
-                    }
-                }, brokersInMetadata::add);
+                BufferingBatchConsumer<ApiMessageAndVersion> migrationBatchConsumer = buildMigrationBatchConsumer(manifestBuilder);
+                zkMigrationClient.readAllMetadata(
+                    migrationBatchConsumer,
+                    brokersInMetadata::add
+                );
+                migrationBatchConsumer.flush();
                 CompletableFuture<OffsetAndEpoch> completeMigrationFuture = zkRecordConsumer.completeMigration();
                 OffsetAndEpoch offsetAndEpochAfterMigration = FutureUtils.waitWithLogging(
                     KRaftMigrationDriver.this.log, "",
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/migration/MigrationManifest.java b/metadata/src/main/java/org/apache/kafka/metadata/migration/MigrationManifest.java
index fc31fb416d35..39dae38d52cb 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/migration/MigrationManifest.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/migration/MigrationManifest.java
@@ -41,6 +41,7 @@ public static class Builder {
         private final Map<MetadataRecordType, Integer> counts = new HashMap<>();
         private int batches = 0;
         private int total = 0;
+        private long batchDurationsNs = 0;
         private long endTimeNanos = 0;
 
         Builder(Time time) {
@@ -48,8 +49,9 @@ public static class Builder {
             this.startTimeNanos = time.nanoseconds();
         }
 
-        public void acceptBatch(List<ApiMessageAndVersion> recordBatch) {
+        public void acceptBatch(List<ApiMessageAndVersion> recordBatch, long durationNs) {
             batches++;
+            batchDurationsNs += durationNs;
             recordBatch.forEach(apiMessageAndVersion -> {
                 MetadataRecordType type = MetadataRecordType.fromId(apiMessageAndVersion.message().apiKey());
                 counts.merge(type, 1, Integer::sum);
@@ -62,23 +64,26 @@ public MigrationManifest build() {
                 endTimeNanos = time.nanoseconds();
             }
             Map<MetadataRecordType, Integer> orderedCounts = new TreeMap<>(counts);
-            return new MigrationManifest(total, batches, endTimeNanos - startTimeNanos, orderedCounts);
+            return new MigrationManifest(total, batches, batchDurationsNs, endTimeNanos - startTimeNanos, orderedCounts);
         }
     }
 
     private final int totalRecords;
     private final int totalBatches;
+    private final long totalBatchDurationsNs;
     private final long durationNanos;
     private final Map<MetadataRecordType, Integer> recordTypeCounts;
 
     MigrationManifest(
         int totalRecords,
         int totalBatches,
+        long totalBatchDurationsNs,
         long durationNanos,
         Map<MetadataRecordType, Integer> recordTypeCounts
     ) {
         this.totalRecords = totalRecords;
         this.totalBatches = totalBatches;
+        this.totalBatchDurationsNs = totalBatchDurationsNs;
         this.durationNanos = durationNanos;
         this.recordTypeCounts = Collections.unmodifiableMap(recordTypeCounts);
     }
@@ -91,6 +96,13 @@ public long durationMs() {
         return TimeUnit.NANOSECONDS.toMillis(durationNanos);
     }
 
+    public double avgBatchDurationMs() {
+        if (totalBatches == 0) {
+            return -1;
+        }
+        return 1.0 * TimeUnit.NANOSECONDS.toMillis(totalBatchDurationsNs) / totalBatches;
+    }
+
     @Override
     public boolean equals(Object o) {
         if (this == o) return true;
@@ -98,17 +110,20 @@ public boolean equals(Object o) {
         MigrationManifest that = (MigrationManifest) o;
         return totalRecords == that.totalRecords &&
             totalBatches == that.totalBatches &&
+            totalBatchDurationsNs == that.totalBatchDurationsNs &&
             durationNanos == that.durationNanos &&
             recordTypeCounts.equals(that.recordTypeCounts);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(totalRecords, totalBatches, durationNanos, recordTypeCounts);
+        return Objects.hash(totalRecords, totalBatches, totalBatchDurationsNs, durationNanos, recordTypeCounts);
     }
 
     public String toString() {
-        return String.format("%d records were generated in %d ms across %d batches. The record types were %s",
-            totalRecords, durationMs(), totalBatches, recordTypeCounts);
+        return String.format(
+            "%d records were generated in %d ms across %d batches. The average time spent waiting on a " +
+            "batch was %.2f ms. The record types were %s",
+            totalRecords, durationMs(), totalBatches, avgBatchDurationMs(), recordTypeCounts);
     }
 }
\ No newline at end of file
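
A worked example of the new metric, using the values from
MigrationManifestTest.testManyBatch at the end of this patch: three batches
block for 20 ms, 20 ms, and 5 ms, so avgBatchDurationMs() reports
(20 + 20 + 5) / 3 = 15.00 ms. This is independent of durationMs(), which
still measures wall-clock time for the whole migration, and an empty
manifest reports -1 as a sentinel rather than dividing by zero.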
diff --git a/metadata/src/test/java/org/apache/kafka/metadata/migration/BufferingBatchConsumerTest.java b/metadata/src/test/java/org/apache/kafka/metadata/migration/BufferingBatchConsumerTest.java
new file mode 100644
index 000000000000..819920573567
--- /dev/null
+++ b/metadata/src/test/java/org/apache/kafka/metadata/migration/BufferingBatchConsumerTest.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata.migration;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class BufferingBatchConsumerTest {
+
+    @Test
+    public void testEmptyBatches() {
+        List<List<Integer>> batches = new ArrayList<>();
+        BufferingBatchConsumer<Integer> consumer = new BufferingBatchConsumer<>(batches::add, 4);
+        consumer.accept(Collections.emptyList());
+        consumer.accept(Collections.emptyList());
+        assertEquals(batches.size(), 0);
+        consumer.flush();
+        assertEquals(batches.size(), 0);
+    }
+
+    @Test
+    public void testOneBatchSameAsMinSize() {
+        List<List<Integer>> batches = new ArrayList<>();
+        BufferingBatchConsumer<Integer> consumer = new BufferingBatchConsumer<>(batches::add, 4);
+        consumer.accept(Arrays.asList(1, 2, 3, 4));
+        assertEquals(batches.size(), 1);
+        assertEquals(batches.get(0), Arrays.asList(1, 2, 3, 4));
+        consumer.flush();
+        assertEquals(batches.size(), 1);
+        assertEquals(batches.get(0), Arrays.asList(1, 2, 3, 4));
+    }
+
+    @Test
+    public void testOneBatchSmallerThanMinSize() {
+        List<List<Integer>> batches = new ArrayList<>();
+        BufferingBatchConsumer<Integer> consumer = new BufferingBatchConsumer<>(batches::add, 4);
+        consumer.accept(Arrays.asList(1, 2, 3));
+        assertEquals(batches.size(), 0);
+        consumer.flush();
+        assertEquals(batches.size(), 1);
+        assertEquals(batches.get(0), Arrays.asList(1, 2, 3));
+    }
+
+    @Test
+    public void testOneBatchLargerThanMinSize() {
+        List<List<Integer>> batches = new ArrayList<>();
+        BufferingBatchConsumer<Integer> consumer = new BufferingBatchConsumer<>(batches::add, 4);
+        consumer.accept(Arrays.asList(1, 2, 3, 4, 5));
+        assertEquals(batches.size(), 1);
+        assertEquals(batches.get(0), Arrays.asList(1, 2, 3, 4, 5));
+        consumer.flush();
+        assertEquals(batches.size(), 1);
+        assertEquals(batches.get(0), Arrays.asList(1, 2, 3, 4, 5));
+    }
+
+    @Test
+    public void testMultiBatchSameAsMinSize() {
+        List<List<Integer>> batches = new ArrayList<>();
+        BufferingBatchConsumer<Integer> consumer = new BufferingBatchConsumer<>(batches::add, 6);
+        consumer.accept(Arrays.asList(1, 2));
+        consumer.accept(Arrays.asList(3, 4));
+        consumer.accept(Arrays.asList(5, 6));
+        assertEquals(batches.size(), 1);
+        assertEquals(batches.get(0), Arrays.asList(1, 2, 3, 4, 5, 6));
+        consumer.flush();
+        assertEquals(batches.size(), 1);
+        assertEquals(batches.get(0), Arrays.asList(1, 2, 3, 4, 5, 6));
+    }
+
+    @Test
+    public void testMultiBatchSmallerThanMinSize() {
+        List<List<Integer>> batches = new ArrayList<>();
+        BufferingBatchConsumer<Integer> consumer = new BufferingBatchConsumer<>(batches::add, 6);
+        consumer.accept(Arrays.asList(1, 2));
+        consumer.accept(Arrays.asList(3, 4));
+        consumer.accept(Collections.singletonList(5));
+        assertEquals(batches.size(), 0);
+        consumer.flush();
+        assertEquals(batches.size(), 1);
+        assertEquals(batches.get(0), Arrays.asList(1, 2, 3, 4, 5));
+    }
+
+    @Test
+    public void testMultiBatchLargerThanMinSize() {
+        List<List<Integer>> batches = new ArrayList<>();
+        BufferingBatchConsumer<Integer> consumer = new BufferingBatchConsumer<>(batches::add, 6);
+        consumer.accept(Arrays.asList(1, 2));
+        consumer.accept(Arrays.asList(3, 4));
+        consumer.accept(Arrays.asList(5, 6));
+        consumer.accept(Arrays.asList(7, 8));
+        consumer.accept(Arrays.asList(9, 10));
+        assertEquals(batches.size(), 1);
+        assertEquals(batches.get(0), Arrays.asList(1, 2, 3, 4, 5, 6));
+        consumer.flush();
+        assertEquals(batches.size(), 2);
+        assertEquals(batches.get(0), Arrays.asList(1, 2, 3, 4, 5, 6));
+        assertEquals(batches.get(1), Arrays.asList(7, 8, 9, 10));
+    }
+}
diff --git a/metadata/src/test/java/org/apache/kafka/metadata/migration/CapturingMigrationClient.java b/metadata/src/test/java/org/apache/kafka/metadata/migration/CapturingMigrationClient.java
index 02dfd540c7cd..503236cf266f 100644
--- a/metadata/src/test/java/org/apache/kafka/metadata/migration/CapturingMigrationClient.java
+++ b/metadata/src/test/java/org/apache/kafka/metadata/migration/CapturingMigrationClient.java
@@ -30,6 +30,20 @@
 
 class CapturingMigrationClient implements MigrationClient {
 
+    static final MigrationBatchSupplier EMPTY_BATCH_SUPPLIER = new MigrationBatchSupplier() {
+
+    };
+
+    interface MigrationBatchSupplier {
+        default List<List<ApiMessageAndVersion>> recordBatches() {
+            return Collections.emptyList();
+        }
+
+        default List<Integer> brokerIds() {
+            return Collections.emptyList();
+        }
+    }
+
     static Builder newBuilder() {
         return new Builder();
     }
@@ -40,6 +54,8 @@ public static class Builder {
         ConfigMigrationClient configMigrationClient = new CapturingConfigMigrationClient();
         AclMigrationClient aclMigrationClient = new CapturingAclMigrationClient();
         DelegationTokenMigrationClient delegationTokenMigrationClient = new CapturingDelegationTokenMigrationClient();
+        MigrationBatchSupplier batchSupplier = EMPTY_BATCH_SUPPLIER;
+
         public Builder setBrokersInZk(int... brokerIds) {
             brokersInZk = IntStream.of(brokerIds).boxed().collect(Collectors.toSet());
             return this;
@@ -66,13 +82,19 @@ public Builder setDelegationTokenMigrationClient(DelegationTokenMigrationClient
             return this;
         }
 
+        public Builder setBatchSupplier(MigrationBatchSupplier batchSupplier) {
+            this.batchSupplier = batchSupplier;
+            return this;
+        }
+
         public CapturingMigrationClient build() {
             return new CapturingMigrationClient(
                 brokersInZk,
                 topicMigrationClient,
                 configMigrationClient,
                 aclMigrationClient,
-                delegationTokenMigrationClient
+                delegationTokenMigrationClient,
+                batchSupplier
             );
         }
     }
@@ -82,7 +104,7 @@ public CapturingMigrationClient build() {
     private final ConfigMigrationClient configMigrationClient;
     private final AclMigrationClient aclMigrationClient;
     private final DelegationTokenMigrationClient delegationTokenMigrationClient;
-
+    private final MigrationBatchSupplier batchSupplier;
     private ZkMigrationLeadershipState state = null;
 
     CapturingMigrationClient(
@@ -90,13 +112,15 @@ public CapturingMigrationClient build() {
         TopicMigrationClient topicMigrationClient,
         ConfigMigrationClient configMigrationClient,
         AclMigrationClient aclMigrationClient,
-        DelegationTokenMigrationClient delegationTokenMigrationClient
+        DelegationTokenMigrationClient delegationTokenMigrationClient,
+        MigrationBatchSupplier batchSupplier
     ) {
         this.brokerIds = brokerIdsInZk;
         this.topicMigrationClient = topicMigrationClient;
         this.configMigrationClient = configMigrationClient;
         this.aclMigrationClient = aclMigrationClient;
         this.delegationTokenMigrationClient = delegationTokenMigrationClient;
+        this.batchSupplier = batchSupplier;
     }
 
     @Override
@@ -165,7 +189,8 @@ public void readAllMetadata(
         Consumer<List<ApiMessageAndVersion>> batchConsumer,
         Consumer<Integer> brokerIdConsumer
     ) {
-
+        batchSupplier.recordBatches().forEach(batchConsumer);
+        batchSupplier.brokerIds().forEach(brokerIdConsumer);
     }
 
     @Override
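
With the batch supplier in place, a test can feed canned record batches
through the capturing client. A condensed editorial variant of
testCoalesceMigrationRecords further down (the topic name is arbitrary):

    CapturingMigrationClient client = CapturingMigrationClient.newBuilder()
        .setBatchSupplier(new CapturingMigrationClient.MigrationBatchSupplier() {
            @Override
            public List<List<ApiMessageAndVersion>> recordBatches() {
                // readAllMetadata() replays these batches to the batch consumer
                return Collections.singletonList(Collections.singletonList(new ApiMessageAndVersion(
                    new TopicRecord().setName("topic-fill").setTopicId(Uuid.randomUuid()), (short) 0)));
            }
        })
        .build();
    client.readAllMetadata(batch -> assertEquals(1, batch.size()), brokerId -> { });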
diff --git a/metadata/src/test/java/org/apache/kafka/metadata/migration/CapturingTopicMigrationClient.java b/metadata/src/test/java/org/apache/kafka/metadata/migration/CapturingTopicMigrationClient.java
index 8b8e5acc5f35..e3b7cede7fc4 100644
--- a/metadata/src/test/java/org/apache/kafka/metadata/migration/CapturingTopicMigrationClient.java
+++ b/metadata/src/test/java/org/apache/kafka/metadata/migration/CapturingTopicMigrationClient.java
@@ -48,7 +48,7 @@ public void reset() {
 
     @Override
     public void iterateTopics(EnumSet<TopicVisitorInterest> interests, TopicVisitor visitor) {
-
+
     }
 
     @Override
diff --git a/metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationDriverTest.java b/metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationDriverTest.java
index f6a57d1da99c..2b29574ab863 100644
--- a/metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationDriverTest.java
+++ b/metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationDriverTest.java
@@ -18,11 +18,13 @@
 import org.apache.kafka.common.TopicIdPartition;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.config.ConfigResource;
 import org.apache.kafka.common.metadata.BrokerRegistrationChangeRecord;
 import org.apache.kafka.common.metadata.ConfigRecord;
 import org.apache.kafka.common.metadata.FeatureLevelRecord;
 import org.apache.kafka.common.metadata.RegisterBrokerRecord;
+import org.apache.kafka.common.metadata.TopicRecord;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.controller.QuorumFeatures;
@@ -53,8 +55,11 @@
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
 import org.junit.jupiter.params.provider.ValueSource;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.EnumSet;
@@ -72,6 +77,7 @@
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
+import java.util.stream.Stream;
 
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static java.util.concurrent.TimeUnit.NANOSECONDS;
@@ -305,7 +311,8 @@ public void testMigrationWithClientException(boolean authException) throws Excep
             new CapturingTopicMigrationClient(),
             new CapturingConfigMigrationClient(),
             new CapturingAclMigrationClient(),
-            new CapturingDelegationTokenMigrationClient()) {
+            new CapturingDelegationTokenMigrationClient(),
+            CapturingMigrationClient.EMPTY_BATCH_SUPPLIER) {
             @Override
             public ZkMigrationLeadershipState claimControllerLeadership(ZkMigrationLeadershipState state) {
                 if (claimLeaderAttempts.getCount() == 0) {
@@ -447,11 +454,8 @@ public void testShouldNotMoveToNextStateIfControllerNodesAreNotReadyToMigrate(
     @Test
     public void testSkipWaitForBrokersInDualWrite() throws Exception {
         CountingMetadataPropagator metadataPropagator = new CountingMetadataPropagator();
-        CapturingMigrationClient migrationClient = new CapturingMigrationClient(Collections.emptySet(),
-            new CapturingTopicMigrationClient(),
-            new CapturingConfigMigrationClient(),
-            new CapturingAclMigrationClient(),
-            new CapturingDelegationTokenMigrationClient());
+        CapturingMigrationClient.newBuilder().build();
+        CapturingMigrationClient migrationClient = CapturingMigrationClient.newBuilder().build();
         MockFaultHandler faultHandler = new MockFaultHandler("testMigrationClientExpiration");
         KRaftMigrationDriver.Builder builder = defaultTestBuilder()
             .setZkMigrationClient(migrationClient)
@@ -761,7 +765,7 @@ public CompletableFuture<?> beginMigration() {
         };
         CountingMetadataPropagator metadataPropagator = new CountingMetadataPropagator();
         CapturingMigrationClient migrationClient = CapturingMigrationClient.newBuilder().setBrokersInZk(1, 2, 3).build();
-        MockFaultHandler faultHandler = new MockFaultHandler("testTwoMigrateMetadataEvents");
+        MockFaultHandler faultHandler = new MockFaultHandler("testBeginMigrationOnce");
         KRaftMigrationDriver.Builder builder = defaultTestBuilder()
             .setZkMigrationClient(migrationClient)
             .setZkRecordConsumer(recordConsumer)
@@ -795,4 +799,83 @@ public CompletableFuture<?> beginMigration() {
             assertEquals(1, migrationBeginCalls.get());
         }
     }
+
+    private List<ApiMessageAndVersion> fillBatch(int size) {
+        ApiMessageAndVersion[] batch = new ApiMessageAndVersion[size];
+        Arrays.fill(batch, new ApiMessageAndVersion(new TopicRecord().setName("topic-fill").setTopicId(Uuid.randomUuid()), (short) 0));
+        return Arrays.asList(batch);
+    }
+
+    static Stream<Arguments> batchSizes() {
+        return Stream.of(
+            Arguments.of(Arrays.asList(0, 0, 0, 0), 0, 0),
+            Arguments.of(Arrays.asList(0, 0, 1, 0), 1, 1),
+            Arguments.of(Arrays.asList(1, 1, 1, 1), 1, 4),
+            Arguments.of(Collections.singletonList(KRaftMigrationDriver.MIGRATION_MIN_BATCH_SIZE - 1), 1, 999),
+            Arguments.of(Collections.singletonList(KRaftMigrationDriver.MIGRATION_MIN_BATCH_SIZE), 1, 1000),
+            Arguments.of(Collections.singletonList(KRaftMigrationDriver.MIGRATION_MIN_BATCH_SIZE + 1), 1, 1001),
+            Arguments.of(Arrays.asList(KRaftMigrationDriver.MIGRATION_MIN_BATCH_SIZE, 1), 2, 1001),
+            Arguments.of(Arrays.asList(0, 0, 0, 0), 0, 0),
+            Arguments.of(Arrays.asList(KRaftMigrationDriver.MIGRATION_MIN_BATCH_SIZE, KRaftMigrationDriver.MIGRATION_MIN_BATCH_SIZE, KRaftMigrationDriver.MIGRATION_MIN_BATCH_SIZE), 3, 3000)
+        );
+    }
+
+    @ParameterizedTest
+    @MethodSource("batchSizes")
+    public void testCoalesceMigrationRecords(List<Integer> batchSizes, int expectedBatchCount, int expectedRecordCount) throws Exception {
+        List<List<ApiMessageAndVersion>> batchesPassedToController = new ArrayList<>();
+        NoOpRecordConsumer recordConsumer = new NoOpRecordConsumer() {
+            @Override
+            public CompletableFuture<?> acceptBatch(List<ApiMessageAndVersion> recordBatch) {
+                batchesPassedToController.add(recordBatch);
+                return CompletableFuture.completedFuture(null);
+            }
+        };
+        CountingMetadataPropagator metadataPropagator = new CountingMetadataPropagator();
+        CapturingMigrationClient migrationClient = CapturingMigrationClient.newBuilder()
+            .setBrokersInZk(1, 2, 3)
+            .setBatchSupplier(new CapturingMigrationClient.MigrationBatchSupplier() {
+                @Override
+                public List<List<ApiMessageAndVersion>> recordBatches() {
+                    List<List<ApiMessageAndVersion>> batches = new ArrayList<>();
+                    for (int batchSize : batchSizes) {
+                        batches.add(fillBatch(batchSize));
+                    }
+                    return batches;
+                }
+            })
+            .build();
+        MockFaultHandler faultHandler = new MockFaultHandler("testRebatchMigrationRecords");
+
+        KRaftMigrationDriver.Builder builder = defaultTestBuilder()
+            .setZkMigrationClient(migrationClient)
+            .setZkRecordConsumer(recordConsumer)
+            .setPropagator(metadataPropagator)
+            .setFaultHandler(faultHandler);
+        try (KRaftMigrationDriver driver = builder.build()) {
+            MetadataImage image = MetadataImage.EMPTY;
+            MetadataDelta delta = new MetadataDelta(image);
+
+            driver.start();
+            setupDeltaForMigration(delta, true);
+            delta.replay(ZkMigrationState.PRE_MIGRATION.toRecord().message());
+            delta.replay(zkBrokerRecord(1));
+            delta.replay(zkBrokerRecord(2));
+            delta.replay(zkBrokerRecord(3));
+            MetadataProvenance provenance = new MetadataProvenance(100, 1, 1);
+            image = delta.apply(provenance);
+
+            driver.onControllerChange(new LeaderAndEpoch(OptionalInt.of(3000), 1));
+            driver.onMetadataUpdate(delta, image, logDeltaManifestBuilder(provenance,
+                new LeaderAndEpoch(OptionalInt.of(3000), 1)).build());
+
+            driver.onMetadataUpdate(delta, image, logDeltaManifestBuilder(provenance,
+                new LeaderAndEpoch(OptionalInt.of(3000), 1)).build());
+
+            TestUtils.waitForCondition(() -> driver.migrationState().get(1, TimeUnit.MINUTES).equals(MigrationDriverState.DUAL_WRITE),
+                "Waiting for KRaftMigrationDriver to enter DUAL_WRITE state");
+
+            assertEquals(expectedBatchCount, batchesPassedToController.size());
+            assertEquals(expectedRecordCount, batchesPassedToController.stream().mapToInt(List::size).sum());
+        }
+    }
 }
diff --git a/metadata/src/test/java/org/apache/kafka/metadata/migration/MigrationManifestTest.java b/metadata/src/test/java/org/apache/kafka/metadata/migration/MigrationManifestTest.java
index 63c548ec4323..711073e364a4 100644
--- a/metadata/src/test/java/org/apache/kafka/metadata/migration/MigrationManifestTest.java
+++ b/metadata/src/test/java/org/apache/kafka/metadata/migration/MigrationManifestTest.java
@@ -38,7 +38,7 @@ public void testEmpty() {
         MigrationManifest manifest = manifestBuilder.build();
         assertEquals(0L, manifest.durationMs());
         assertEquals(
-            "0 records were generated in 0 ms across 0 batches. The record types were {}",
+            "0 records were generated in 0 ms across 0 batches. The average time spent waiting on a batch was -1.00 ms. The record types were {}",
             manifest.toString());
     }
 
@@ -60,11 +60,12 @@ public void testOneBatch() {
             new ApiMessageAndVersion(new PartitionRecord(), (short) 0),
             new ApiMessageAndVersion(new ConfigRecord(), (short) 0),
             new ApiMessageAndVersion(new ConfigRecord(), (short) 0)
-        ));
+        ), 20);
+        time.sleep(10);
         MigrationManifest manifest = manifestBuilder.build();
-        assertEquals(0L, manifest.durationMs());
+        assertEquals(10L, manifest.durationMs());
         assertEquals(
-            "13 records were generated in 0 ms across 1 batches. The record types were {TOPIC_RECORD=2, PARTITION_RECORD=9, CONFIG_RECORD=2}",
+            "13 records were generated in 10 ms across 1 batches. The average time spent waiting on a batch was 0.00 ms. The record types were {TOPIC_RECORD=2, PARTITION_RECORD=9, CONFIG_RECORD=2}",
             manifest.toString()
         );
     }
@@ -79,23 +80,25 @@ public void testManyBatch() {
             new ApiMessageAndVersion(new PartitionRecord(), (short) 0),
             new ApiMessageAndVersion(new PartitionRecord(), (short) 0),
             new ApiMessageAndVersion(new PartitionRecord(), (short) 0)
-        ));
+        ), 20_000_000);
         manifestBuilder.acceptBatch(Arrays.asList(
             new ApiMessageAndVersion(new TopicRecord(), (short) 0),
             new ApiMessageAndVersion(new PartitionRecord(), (short) 0),
             new ApiMessageAndVersion(new PartitionRecord(), (short) 0),
             new ApiMessageAndVersion(new PartitionRecord(), (short) 0),
             new ApiMessageAndVersion(new PartitionRecord(), (short) 0),
             new ApiMessageAndVersion(new PartitionRecord(), (short) 0),
             new ApiMessageAndVersion(new ConfigRecord(), (short) 0)
-        ));
+        ), 20_000_000);
         manifestBuilder.acceptBatch(Collections.singletonList(
             new ApiMessageAndVersion(new ConfigRecord(), (short) 0)
-        ));
+        ), 5_000_000);
+        time.sleep(60);
         MigrationManifest manifest = manifestBuilder.build();
-        assertEquals(0L, manifest.durationMs());
+        assertEquals(60L, manifest.durationMs());
         assertEquals(
-            "13 records were generated in 0 ms across 3 batches. The record types were {TOPIC_RECORD=2, PARTITION_RECORD=9, CONFIG_RECORD=2}",
+            "13 records were generated in 60 ms across 3 batches. The average time spent waiting on a " +
+            "batch was 15.00 ms. The record types were {TOPIC_RECORD=2, PARTITION_RECORD=9, CONFIG_RECORD=2}",
             manifest.toString()
         );
     }