From 1dc3eb8aa3da081a82b902f0bf6b622031aac0e8 Mon Sep 17 00:00:00 2001
From: David Arthur
Date: Wed, 13 Dec 2023 17:05:31 -0500
Subject: [PATCH 1/9] Rebatch records during the migration, collect more
migration stats
---
.../migration/BufferingBatchConsumer.java | 59 +++++++++++++++++++
.../migration/KRaftMigrationDriver.java | 48 +++++++++------
.../metadata/migration/MigrationManifest.java | 38 ++++++++++--
.../migration/MigrationManifestTest.java | 19 +++---
4 files changed, 134 insertions(+), 30 deletions(-)
create mode 100644 metadata/src/main/java/org/apache/kafka/metadata/migration/BufferingBatchConsumer.java
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/migration/BufferingBatchConsumer.java b/metadata/src/main/java/org/apache/kafka/metadata/migration/BufferingBatchConsumer.java
new file mode 100644
index 000000000000..014313101ac5
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kafka/metadata/migration/BufferingBatchConsumer.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata.migration;
+
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.Consumer;
+
+/**
+ * A record batch consumer that re-batches incoming batches into a given size. It does so by buffering
+ * the records into an array that is later flushed to a downstream consumer. Batches consumed by this class
+ * will not be broken apart, only combined with other batches to reach the minimum batch size. Note that
+ * {@link #close()} must be called after the last batch has been accepted in order to flush any buffered records.
+ */
+public class BufferingBatchConsumer implements Consumer> {
+
+ private final Consumer> delegateConsumer;
+ private final List bufferedBatch;
+ private final int minBatchSize;
+
+ BufferingBatchConsumer(Consumer> delegateConsumer, int minBatchSize) {
+ this.delegateConsumer = delegateConsumer;
+ this.bufferedBatch = new ArrayList<>(minBatchSize);
+ this.minBatchSize = minBatchSize;
+ }
+
+ @Override
+ public void accept(List apiMessageAndVersions) {
+ bufferedBatch.addAll(apiMessageAndVersions);
+ if (bufferedBatch.size() >= minBatchSize) {
+ delegateConsumer.accept(new ArrayList<>(bufferedBatch));
+ bufferedBatch.clear();
+ }
+ }
+
+ public void close() {
+ if (!bufferedBatch.isEmpty()) {
+ delegateConsumer.accept(new ArrayList<>(bufferedBatch));
+ bufferedBatch.clear();
+ }
+ }
+}
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java
index 25a1cf5ba8be..1e37fffaaa60 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java
@@ -89,6 +89,8 @@ public long nextPollTimeMs() {
*/
private final static int METADATA_COMMIT_MAX_WAIT_MS = 300_000;
+ private final static int MIGRATION_MIN_BATCH_SIZE = 1_000;
+
private final Time time;
private final Logger log;
private final int nodeId;
@@ -645,6 +647,29 @@ public void run() throws Exception {
}
}
+ private BufferingBatchConsumer buildMigrationBatchConsumer(
+ MigrationManifest.Builder manifestBuilder
+ ) {
+ return new BufferingBatchConsumer(batch -> {
+ try {
+ if (log.isTraceEnabled()) {
+ batch.forEach(apiMessageAndVersion ->
+ log.trace(recordRedactor.toLoggableString(apiMessageAndVersion.message())));
+ }
+ CompletableFuture> future = zkRecordConsumer.acceptBatch(batch);
+ long batchStart = time.nanoseconds();
+ FutureUtils.waitWithLogging(KRaftMigrationDriver.this.log, "",
+ "the metadata layer to commit " + batch.size() + " migration records",
+ future, Deadline.fromDelay(time, METADATA_COMMIT_MAX_WAIT_MS, TimeUnit.MILLISECONDS), time);
+ long batchEnd = time.nanoseconds();
+ manifestBuilder.acceptBatch(batch, batchEnd - batchStart);
+ } catch (Throwable e) {
+ // This will cause readAllMetadata to throw since this batch consumer is called directly from readAllMetadata
+ throw new RuntimeException(e);
+ }
+ }, MIGRATION_MIN_BATCH_SIZE);
+ }
+
class MigrateMetadataEvent extends MigrationEvent {
@Override
public void run() throws Exception {
@@ -664,23 +689,12 @@ public void run() throws Exception {
super.handleException(t);
}
try {
- zkMigrationClient.readAllMetadata(batch -> {
- try {
- log.info("Migrating {} records from ZK", batch.size());
- if (log.isTraceEnabled()) {
- batch.forEach(apiMessageAndVersion ->
- log.trace(recordRedactor.toLoggableString(apiMessageAndVersion.message())));
- }
- CompletableFuture> future = zkRecordConsumer.acceptBatch(batch);
- FutureUtils.waitWithLogging(KRaftMigrationDriver.this.log, "",
- "the metadata layer to commit migration record batch",
- future, Deadline.fromDelay(time, METADATA_COMMIT_MAX_WAIT_MS, TimeUnit.MILLISECONDS), time);
- manifestBuilder.acceptBatch(batch);
- } catch (Throwable e) {
- // This will cause readAllMetadata to throw since this batch consumer is called directly from readAllMetadata
- throw new RuntimeException(e);
- }
- }, brokersInMetadata::add);
+ BufferingBatchConsumer migrationBatchConsumer = buildMigrationBatchConsumer(manifestBuilder);
+ zkMigrationClient.readAllMetadata(
+ migrationBatchConsumer,
+ brokersInMetadata::add
+ );
+ migrationBatchConsumer.close();
CompletableFuture completeMigrationFuture = zkRecordConsumer.completeMigration();
OffsetAndEpoch offsetAndEpochAfterMigration = FutureUtils.waitWithLogging(
KRaftMigrationDriver.this.log, "",
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/migration/MigrationManifest.java b/metadata/src/main/java/org/apache/kafka/metadata/migration/MigrationManifest.java
index fc31fb416d35..2934222be7b1 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/migration/MigrationManifest.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/migration/MigrationManifest.java
@@ -41,6 +41,8 @@ public static class Builder {
private final Map counts = new HashMap<>();
private int batches = 0;
private int total = 0;
+ private long batchDurationsNs = 0;
+ private long batchSizes = 0;
private long endTimeNanos = 0;
Builder(Time time) {
@@ -48,8 +50,10 @@ public static class Builder {
this.startTimeNanos = time.nanoseconds();
}
- public void acceptBatch(List recordBatch) {
+ public void acceptBatch(List recordBatch, long durationNs) {
batches++;
+ batchDurationsNs += durationNs;
+ batchSizes += recordBatch.size();
recordBatch.forEach(apiMessageAndVersion -> {
MetadataRecordType type = MetadataRecordType.fromId(apiMessageAndVersion.message().apiKey());
counts.merge(type, 1, Integer::sum);
@@ -62,23 +66,29 @@ public MigrationManifest build() {
endTimeNanos = time.nanoseconds();
}
Map orderedCounts = new TreeMap<>(counts);
- return new MigrationManifest(total, batches, endTimeNanos - startTimeNanos, orderedCounts);
+ return new MigrationManifest(total, batches, batchDurationsNs, batchSizes, endTimeNanos - startTimeNanos, orderedCounts);
}
}
private final int totalRecords;
private final int totalBatches;
+ private final long totalBatchDurationsNs;
+ private final long totalBatchSizes;
private final long durationNanos;
private final Map recordTypeCounts;
MigrationManifest(
int totalRecords,
int totalBatches,
+ long totalBatchDurationsNs,
+ long totalBatchSizes,
long durationNanos,
Map recordTypeCounts
) {
this.totalRecords = totalRecords;
this.totalBatches = totalBatches;
+ this.totalBatchDurationsNs = totalBatchDurationsNs;
+ this.totalBatchSizes = totalBatchSizes;
this.durationNanos = durationNanos;
this.recordTypeCounts = Collections.unmodifiableMap(recordTypeCounts);
}
@@ -91,6 +101,20 @@ public long durationMs() {
return TimeUnit.NANOSECONDS.toMillis(durationNanos);
}
+ public double avgBatchDurationMs() {
+ if (totalBatches == 0) {
+ return -1;
+ }
+ return 1.0 * TimeUnit.NANOSECONDS.toMillis(totalBatchDurationsNs) / totalBatches;
+ }
+
+ public double avgBatchSize() {
+ if (totalBatches == 0) {
+ return -1;
+ }
+ return 1.0 * totalBatchSizes / totalBatches;
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) return true;
@@ -98,17 +122,21 @@ public boolean equals(Object o) {
MigrationManifest that = (MigrationManifest) o;
return totalRecords == that.totalRecords &&
totalBatches == that.totalBatches &&
+ totalBatchDurationsNs == that.totalBatchDurationsNs &&
+ totalBatchSizes == that.totalBatchSizes &&
durationNanos == that.durationNanos &&
recordTypeCounts.equals(that.recordTypeCounts);
}
@Override
public int hashCode() {
- return Objects.hash(totalRecords, totalBatches, durationNanos, recordTypeCounts);
+ return Objects.hash(totalRecords, totalBatches, totalBatchDurationsNs, totalBatchSizes, durationNanos, recordTypeCounts);
}
public String toString() {
- return String.format("%d records were generated in %d ms across %d batches. The record types were %s",
- totalRecords, durationMs(), totalBatches, recordTypeCounts);
+ return String.format(
+ "%d records were generated in %d ms across %d batches. The average batch size was %.2f " +
+ "and the average batch commit duration was %.2f ms. The record types were %s",
+ totalRecords, durationMs(), totalBatches, avgBatchSize(), avgBatchDurationMs(), recordTypeCounts);
}
}
\ No newline at end of file
diff --git a/metadata/src/test/java/org/apache/kafka/metadata/migration/MigrationManifestTest.java b/metadata/src/test/java/org/apache/kafka/metadata/migration/MigrationManifestTest.java
index 63c548ec4323..ce4f5a84862f 100644
--- a/metadata/src/test/java/org/apache/kafka/metadata/migration/MigrationManifestTest.java
+++ b/metadata/src/test/java/org/apache/kafka/metadata/migration/MigrationManifestTest.java
@@ -60,11 +60,12 @@ public void testOneBatch() {
new ApiMessageAndVersion(new PartitionRecord(), (short) 0),
new ApiMessageAndVersion(new ConfigRecord(), (short) 0),
new ApiMessageAndVersion(new ConfigRecord(), (short) 0)
- ));
+ ), 20);
+ time.sleep(10);
MigrationManifest manifest = manifestBuilder.build();
- assertEquals(0L, manifest.durationMs());
+ assertEquals(10L, manifest.durationMs());
assertEquals(
- "13 records were generated in 0 ms across 1 batches. The record types were {TOPIC_RECORD=2, PARTITION_RECORD=9, CONFIG_RECORD=2}",
+ "13 records were generated in 10 ms across 1 batches. The average batch size was 13.00 and the average batch commit duration was 0.00 ms. The record types were {TOPIC_RECORD=2, PARTITION_RECORD=9, CONFIG_RECORD=2}",
manifest.toString()
);
}
@@ -79,7 +80,7 @@ public void testManyBatch() {
new ApiMessageAndVersion(new PartitionRecord(), (short) 0),
new ApiMessageAndVersion(new PartitionRecord(), (short) 0),
new ApiMessageAndVersion(new PartitionRecord(), (short) 0)
- ));
+ ), 20_000_000);
manifestBuilder.acceptBatch(Arrays.asList(
new ApiMessageAndVersion(new TopicRecord(), (short) 0),
new ApiMessageAndVersion(new PartitionRecord(), (short) 0),
@@ -88,14 +89,16 @@ public void testManyBatch() {
new ApiMessageAndVersion(new PartitionRecord(), (short) 0),
new ApiMessageAndVersion(new PartitionRecord(), (short) 0),
new ApiMessageAndVersion(new ConfigRecord(), (short) 0)
- ));
+ ), 20_000_000);
manifestBuilder.acceptBatch(Collections.singletonList(
new ApiMessageAndVersion(new ConfigRecord(), (short) 0)
- ));
+ ), 5_000_000);
+ time.sleep(60);
MigrationManifest manifest = manifestBuilder.build();
- assertEquals(0L, manifest.durationMs());
+ assertEquals(60L, manifest.durationMs());
assertEquals(
- "13 records were generated in 0 ms across 3 batches. The record types were {TOPIC_RECORD=2, PARTITION_RECORD=9, CONFIG_RECORD=2}",
+ "13 records were generated in 60 ms across 3 batches. The average batch size was 4.33 and " +
+ "the average batch commit duration was 15.00 ms. The record types were {TOPIC_RECORD=2, PARTITION_RECORD=9, CONFIG_RECORD=2}",
manifest.toString()
);
}
From 153b0e09594272d9724b2725284c80cd6aceb7c1 Mon Sep 17 00:00:00 2001
From: David Arthur
Date: Thu, 14 Dec 2023 10:55:19 -0500
Subject: [PATCH 2/9] add some tests
---
.../migration/BufferingBatchConsumer.java | 10 +-
.../migration/KRaftMigrationDriver.java | 9 +-
.../migration/BufferingBatchConsumerTest.java | 103 ++++++++++++++++++
.../migration/CapturingMigrationClient.java | 33 +++++-
.../CapturingTopicMigrationClient.java | 2 +-
.../migration/KRaftMigrationDriverTest.java | 97 +++++++++++++++--
.../migration/MigrationManifestTest.java | 2 +-
7 files changed, 234 insertions(+), 22 deletions(-)
create mode 100644 metadata/src/test/java/org/apache/kafka/metadata/migration/BufferingBatchConsumerTest.java
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/migration/BufferingBatchConsumer.java b/metadata/src/main/java/org/apache/kafka/metadata/migration/BufferingBatchConsumer.java
index 014313101ac5..a394315e2781 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/migration/BufferingBatchConsumer.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/migration/BufferingBatchConsumer.java
@@ -29,20 +29,20 @@
* will not be broken apart, only combined with other batches to reach the minimum batch size. Note that
* {@link #close()} must be called after the last batch has been accepted in order to flush any buffered records.
*/
-public class BufferingBatchConsumer implements Consumer> {
+public class BufferingBatchConsumer implements Consumer> {
- private final Consumer> delegateConsumer;
- private final List bufferedBatch;
+ private final Consumer> delegateConsumer;
+ private final List bufferedBatch;
private final int minBatchSize;
- BufferingBatchConsumer(Consumer> delegateConsumer, int minBatchSize) {
+ BufferingBatchConsumer(Consumer> delegateConsumer, int minBatchSize) {
this.delegateConsumer = delegateConsumer;
this.bufferedBatch = new ArrayList<>(minBatchSize);
this.minBatchSize = minBatchSize;
}
@Override
- public void accept(List apiMessageAndVersions) {
+ public void accept(List apiMessageAndVersions) {
bufferedBatch.addAll(apiMessageAndVersions);
if (bufferedBatch.size() >= minBatchSize) {
delegateConsumer.accept(new ArrayList<>(bufferedBatch));
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java
index 1e37fffaaa60..1ef10e4d5526 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java
@@ -34,6 +34,7 @@
import org.apache.kafka.queue.KafkaEventQueue;
import org.apache.kafka.raft.LeaderAndEpoch;
import org.apache.kafka.raft.OffsetAndEpoch;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.fault.FaultHandler;
import org.apache.kafka.server.util.Deadline;
import org.apache.kafka.server.util.FutureUtils;
@@ -87,9 +88,9 @@ public long nextPollTimeMs() {
* amount of time. A large value is selected to avoid timeouts in the common case, but prevent us from
* blocking indefinitely.
*/
- private final static int METADATA_COMMIT_MAX_WAIT_MS = 300_000;
+ final static int METADATA_COMMIT_MAX_WAIT_MS = 300_000;
- private final static int MIGRATION_MIN_BATCH_SIZE = 1_000;
+ final static int MIGRATION_MIN_BATCH_SIZE = 1_000;
private final Time time;
private final Logger log;
@@ -650,11 +651,11 @@ public void run() throws Exception {
private BufferingBatchConsumer buildMigrationBatchConsumer(
MigrationManifest.Builder manifestBuilder
) {
- return new BufferingBatchConsumer(batch -> {
+ return new BufferingBatchConsumer(batch -> {
try {
if (log.isTraceEnabled()) {
batch.forEach(apiMessageAndVersion ->
- log.trace(recordRedactor.toLoggableString(apiMessageAndVersion.message())));
+ log.trace(recordRedactor.toLoggableString(apiMessageAndVersion.message())));
}
CompletableFuture> future = zkRecordConsumer.acceptBatch(batch);
long batchStart = time.nanoseconds();
diff --git a/metadata/src/test/java/org/apache/kafka/metadata/migration/BufferingBatchConsumerTest.java b/metadata/src/test/java/org/apache/kafka/metadata/migration/BufferingBatchConsumerTest.java
new file mode 100644
index 000000000000..fd1f0203b4f9
--- /dev/null
+++ b/metadata/src/test/java/org/apache/kafka/metadata/migration/BufferingBatchConsumerTest.java
@@ -0,0 +1,103 @@
+package org.apache.kafka.metadata.migration;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class BufferingBatchConsumerTest {
+
+ @Test
+ public void testEmptyBatches() {
+ List> batches = new ArrayList<>();
+ BufferingBatchConsumer consumer = new BufferingBatchConsumer<>(batches::add, 4);
+ consumer.accept(Collections.emptyList());
+ consumer.accept(Collections.emptyList());
+ assertEquals(batches.size(), 0);
+ consumer.close();
+ assertEquals(batches.size(), 0);
+ }
+
+ @Test
+ public void testOneBatchSameAsMinSize() {
+ List> batches = new ArrayList<>();
+ BufferingBatchConsumer consumer = new BufferingBatchConsumer<>(batches::add, 4);
+ consumer.accept(Arrays.asList(1, 2, 3, 4));
+ assertEquals(batches.size(), 1);
+ assertEquals(batches.get(0), Arrays.asList(1, 2, 3, 4));
+ consumer.close();
+ assertEquals(batches.size(), 1);
+ assertEquals(batches.get(0), Arrays.asList(1, 2, 3, 4));
+ }
+
+ @Test
+ public void testOneBatchSmallerThanMinSize() {
+ List> batches = new ArrayList<>();
+ BufferingBatchConsumer consumer = new BufferingBatchConsumer<>(batches::add, 4);
+ consumer.accept(Arrays.asList(1, 2, 3));
+ assertEquals(batches.size(), 0);
+ consumer.close();
+ assertEquals(batches.size(), 1);
+ assertEquals(batches.get(0), Arrays.asList(1, 2, 3));
+ }
+
+ @Test
+ public void testOneBatchLargerThanMinSize() {
+ List> batches = new ArrayList<>();
+ BufferingBatchConsumer consumer = new BufferingBatchConsumer<>(batches::add, 4);
+ consumer.accept(Arrays.asList(1, 2, 3, 4, 5));
+ assertEquals(batches.size(), 1);
+ assertEquals(batches.get(0), Arrays.asList(1, 2, 3, 4, 5));
+ consumer.close();
+ assertEquals(batches.size(), 1);
+ assertEquals(batches.get(0), Arrays.asList(1, 2, 3, 4, 5));
+ }
+
+ @Test
+ public void testMultiBatchSameAsMinSize() {
+ List> batches = new ArrayList<>();
+ BufferingBatchConsumer consumer = new BufferingBatchConsumer<>(batches::add, 6);
+ consumer.accept(Arrays.asList(1, 2));
+ consumer.accept(Arrays.asList(3, 4));
+ consumer.accept(Arrays.asList(5, 6));
+ assertEquals(batches.size(), 1);
+ assertEquals(batches.get(0), Arrays.asList(1, 2, 3, 4, 5, 6));
+ consumer.close();
+ assertEquals(batches.size(), 1);
+ assertEquals(batches.get(0), Arrays.asList(1, 2, 3, 4, 5, 6));
+ }
+
+ @Test
+ public void testMultiBatchSmallerThanMinSize() {
+ List> batches = new ArrayList<>();
+ BufferingBatchConsumer consumer = new BufferingBatchConsumer<>(batches::add, 6);
+ consumer.accept(Arrays.asList(1, 2));
+ consumer.accept(Arrays.asList(3, 4));
+ consumer.accept(Collections.singletonList(5));
+ assertEquals(batches.size(), 0);
+ consumer.close();
+ assertEquals(batches.size(), 1);
+ assertEquals(batches.get(0), Arrays.asList(1, 2, 3, 4, 5));
+ }
+
+ @Test
+ public void testMultiBatchLargerThanMinSize() {
+ List> batches = new ArrayList<>();
+ BufferingBatchConsumer consumer = new BufferingBatchConsumer<>(batches::add, 6);
+ consumer.accept(Arrays.asList(1, 2));
+ consumer.accept(Arrays.asList(3, 4));
+ consumer.accept(Arrays.asList(5, 6));
+ consumer.accept(Arrays.asList(7, 8));
+ consumer.accept(Arrays.asList(9, 10));
+ assertEquals(batches.size(), 1);
+ assertEquals(batches.get(0), Arrays.asList(1, 2, 3, 4, 5, 6));
+ consumer.close();
+ assertEquals(batches.size(), 2);
+ assertEquals(batches.get(0), Arrays.asList(1, 2, 3, 4, 5, 6));
+ assertEquals(batches.get(1), Arrays.asList(7, 8, 9, 10));
+ }
+}
diff --git a/metadata/src/test/java/org/apache/kafka/metadata/migration/CapturingMigrationClient.java b/metadata/src/test/java/org/apache/kafka/metadata/migration/CapturingMigrationClient.java
index 02dfd540c7cd..503236cf266f 100644
--- a/metadata/src/test/java/org/apache/kafka/metadata/migration/CapturingMigrationClient.java
+++ b/metadata/src/test/java/org/apache/kafka/metadata/migration/CapturingMigrationClient.java
@@ -30,6 +30,20 @@
class CapturingMigrationClient implements MigrationClient {
+ static final MigrationBatchSupplier EMPTY_BATCH_SUPPLIER = new MigrationBatchSupplier() {
+
+ };
+
+ interface MigrationBatchSupplier {
+ default List> recordBatches() {
+ return Collections.emptyList();
+ }
+
+ default List brokerIds() {
+ return Collections.emptyList();
+ }
+ }
+
static Builder newBuilder() {
return new Builder();
}
@@ -40,6 +54,8 @@ public static class Builder {
ConfigMigrationClient configMigrationClient = new CapturingConfigMigrationClient();
AclMigrationClient aclMigrationClient = new CapturingAclMigrationClient();
DelegationTokenMigrationClient delegationTokenMigrationClient = new CapturingDelegationTokenMigrationClient();
+ MigrationBatchSupplier batchSupplier = EMPTY_BATCH_SUPPLIER;
+
public Builder setBrokersInZk(int... brokerIds) {
brokersInZk = IntStream.of(brokerIds).boxed().collect(Collectors.toSet());
@@ -66,13 +82,19 @@ public Builder setDelegationTokenMigrationClient(DelegationTokenMigrationClient
return this;
}
+ public Builder setBatchSupplier(MigrationBatchSupplier batchSupplier) {
+ this.batchSupplier = batchSupplier;
+ return this;
+ }
+
public CapturingMigrationClient build() {
return new CapturingMigrationClient(
brokersInZk,
topicMigrationClient,
configMigrationClient,
aclMigrationClient,
- delegationTokenMigrationClient
+ delegationTokenMigrationClient,
+ batchSupplier
);
}
}
@@ -82,7 +104,7 @@ public CapturingMigrationClient build() {
private final ConfigMigrationClient configMigrationClient;
private final AclMigrationClient aclMigrationClient;
private final DelegationTokenMigrationClient delegationTokenMigrationClient;
-
+ private final MigrationBatchSupplier batchSupplier;
private ZkMigrationLeadershipState state = null;
CapturingMigrationClient(
@@ -90,13 +112,15 @@ public CapturingMigrationClient build() {
TopicMigrationClient topicMigrationClient,
ConfigMigrationClient configMigrationClient,
AclMigrationClient aclMigrationClient,
- DelegationTokenMigrationClient delegationTokenMigrationClient
+ DelegationTokenMigrationClient delegationTokenMigrationClient,
+ MigrationBatchSupplier batchSupplier
) {
this.brokerIds = brokerIdsInZk;
this.topicMigrationClient = topicMigrationClient;
this.configMigrationClient = configMigrationClient;
this.aclMigrationClient = aclMigrationClient;
this.delegationTokenMigrationClient = delegationTokenMigrationClient;
+ this.batchSupplier = batchSupplier;
}
@Override
@@ -165,7 +189,8 @@ public void readAllMetadata(
Consumer> batchConsumer,
Consumer brokerIdConsumer
) {
-
+ batchSupplier.recordBatches().forEach(batchConsumer);
+ batchSupplier.brokerIds().forEach(brokerIdConsumer);
}
@Override
diff --git a/metadata/src/test/java/org/apache/kafka/metadata/migration/CapturingTopicMigrationClient.java b/metadata/src/test/java/org/apache/kafka/metadata/migration/CapturingTopicMigrationClient.java
index 8b8e5acc5f35..e3b7cede7fc4 100644
--- a/metadata/src/test/java/org/apache/kafka/metadata/migration/CapturingTopicMigrationClient.java
+++ b/metadata/src/test/java/org/apache/kafka/metadata/migration/CapturingTopicMigrationClient.java
@@ -48,7 +48,7 @@ public void reset() {
@Override
public void iterateTopics(EnumSet interests, TopicVisitor visitor) {
-
+
}
@Override
diff --git a/metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationDriverTest.java b/metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationDriverTest.java
index f6a57d1da99c..2b29574ab863 100644
--- a/metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationDriverTest.java
+++ b/metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationDriverTest.java
@@ -18,11 +18,13 @@
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.metadata.BrokerRegistrationChangeRecord;
import org.apache.kafka.common.metadata.ConfigRecord;
import org.apache.kafka.common.metadata.FeatureLevelRecord;
import org.apache.kafka.common.metadata.RegisterBrokerRecord;
+import org.apache.kafka.common.metadata.TopicRecord;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.controller.QuorumFeatures;
@@ -53,8 +55,11 @@
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.EnumSet;
@@ -72,6 +77,7 @@
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
+import java.util.stream.Stream;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
@@ -305,7 +311,8 @@ public void testMigrationWithClientException(boolean authException) throws Excep
new CapturingTopicMigrationClient(),
new CapturingConfigMigrationClient(),
new CapturingAclMigrationClient(),
- new CapturingDelegationTokenMigrationClient()) {
+ new CapturingDelegationTokenMigrationClient(),
+ CapturingMigrationClient.EMPTY_BATCH_SUPPLIER) {
@Override
public ZkMigrationLeadershipState claimControllerLeadership(ZkMigrationLeadershipState state) {
if (claimLeaderAttempts.getCount() == 0) {
@@ -447,11 +454,8 @@ public void testShouldNotMoveToNextStateIfControllerNodesAreNotReadyToMigrate(
@Test
public void testSkipWaitForBrokersInDualWrite() throws Exception {
CountingMetadataPropagator metadataPropagator = new CountingMetadataPropagator();
- CapturingMigrationClient migrationClient = new CapturingMigrationClient(Collections.emptySet(),
- new CapturingTopicMigrationClient(),
- new CapturingConfigMigrationClient(),
- new CapturingAclMigrationClient(),
- new CapturingDelegationTokenMigrationClient());
+ CapturingMigrationClient.newBuilder().build();
+ CapturingMigrationClient migrationClient = CapturingMigrationClient.newBuilder().build();
MockFaultHandler faultHandler = new MockFaultHandler("testMigrationClientExpiration");
KRaftMigrationDriver.Builder builder = defaultTestBuilder()
.setZkMigrationClient(migrationClient)
@@ -761,7 +765,7 @@ public CompletableFuture> beginMigration() {
};
CountingMetadataPropagator metadataPropagator = new CountingMetadataPropagator();
CapturingMigrationClient migrationClient = CapturingMigrationClient.newBuilder().setBrokersInZk(1, 2, 3).build();
- MockFaultHandler faultHandler = new MockFaultHandler("testTwoMigrateMetadataEvents");
+ MockFaultHandler faultHandler = new MockFaultHandler("testBeginMigrationOnce");
KRaftMigrationDriver.Builder builder = defaultTestBuilder()
.setZkMigrationClient(migrationClient)
.setZkRecordConsumer(recordConsumer)
@@ -795,4 +799,83 @@ public CompletableFuture> beginMigration() {
assertEquals(1, migrationBeginCalls.get());
}
}
+
+ private List fillBatch(int size) {
+ ApiMessageAndVersion[] batch = new ApiMessageAndVersion[size];
+ Arrays.fill(batch, new ApiMessageAndVersion(new TopicRecord().setName("topic-fill").setTopicId(Uuid.randomUuid()), (short) 0));
+ return Arrays.asList(batch);
+ }
+
+ static Stream batchSizes() {
+ return Stream.of(
+ Arguments.of(Arrays.asList(0, 0, 0, 0), 0, 0),
+ Arguments.of(Arrays.asList(0, 0, 1, 0), 1, 1),
+ Arguments.of(Arrays.asList(1, 1, 1, 1), 1, 4),
+ Arguments.of(Collections.singletonList(KRaftMigrationDriver.MIGRATION_MIN_BATCH_SIZE - 1), 1, 999),
+ Arguments.of(Collections.singletonList(KRaftMigrationDriver.MIGRATION_MIN_BATCH_SIZE), 1, 1000),
+ Arguments.of(Collections.singletonList(KRaftMigrationDriver.MIGRATION_MIN_BATCH_SIZE + 1), 1, 1001),
+ Arguments.of(Arrays.asList(KRaftMigrationDriver.MIGRATION_MIN_BATCH_SIZE, 1), 2, 1001),
+ Arguments.of(Arrays.asList(0, 0, 0, 0), 0, 0),
+ Arguments.of(Arrays.asList(KRaftMigrationDriver.MIGRATION_MIN_BATCH_SIZE, KRaftMigrationDriver.MIGRATION_MIN_BATCH_SIZE, KRaftMigrationDriver.MIGRATION_MIN_BATCH_SIZE), 3, 3000)
+ );
+ }
+ @ParameterizedTest
+ @MethodSource("batchSizes")
+ public void testCoalesceMigrationRecords(List batchSizes, int expectedBatchCount, int expectedRecordCount) throws Exception {
+ List> batchesPassedToController = new ArrayList<>();
+ NoOpRecordConsumer recordConsumer = new NoOpRecordConsumer() {
+ @Override
+ public CompletableFuture> acceptBatch(List recordBatch) {
+ batchesPassedToController.add(recordBatch);
+ return CompletableFuture.completedFuture(null);
+ }
+ };
+ CountingMetadataPropagator metadataPropagator = new CountingMetadataPropagator();
+ CapturingMigrationClient migrationClient = CapturingMigrationClient.newBuilder()
+ .setBrokersInZk(1, 2, 3)
+ .setBatchSupplier(new CapturingMigrationClient.MigrationBatchSupplier() {
+ @Override
+ public List> recordBatches() {
+ List> batches = new ArrayList<>();
+ for (int batchSize : batchSizes) {
+ batches.add(fillBatch(batchSize));
+ }
+ return batches;
+ }
+ })
+ .build();
+ MockFaultHandler faultHandler = new MockFaultHandler("testRebatchMigrationRecords");
+
+ KRaftMigrationDriver.Builder builder = defaultTestBuilder()
+ .setZkMigrationClient(migrationClient)
+ .setZkRecordConsumer(recordConsumer)
+ .setPropagator(metadataPropagator)
+ .setFaultHandler(faultHandler);
+ try (KRaftMigrationDriver driver = builder.build()) {
+ MetadataImage image = MetadataImage.EMPTY;
+ MetadataDelta delta = new MetadataDelta(image);
+
+ driver.start();
+ setupDeltaForMigration(delta, true);
+ delta.replay(ZkMigrationState.PRE_MIGRATION.toRecord().message());
+ delta.replay(zkBrokerRecord(1));
+ delta.replay(zkBrokerRecord(2));
+ delta.replay(zkBrokerRecord(3));
+ MetadataProvenance provenance = new MetadataProvenance(100, 1, 1);
+ image = delta.apply(provenance);
+
+ driver.onControllerChange(new LeaderAndEpoch(OptionalInt.of(3000), 1));
+
+ driver.onMetadataUpdate(delta, image, logDeltaManifestBuilder(provenance,
+ new LeaderAndEpoch(OptionalInt.of(3000), 1)).build());
+ driver.onMetadataUpdate(delta, image, logDeltaManifestBuilder(provenance,
+ new LeaderAndEpoch(OptionalInt.of(3000), 1)).build());
+
+ TestUtils.waitForCondition(() -> driver.migrationState().get(1, TimeUnit.MINUTES).equals(MigrationDriverState.DUAL_WRITE),
+ "Waiting for KRaftMigrationDriver to enter ZK_MIGRATION state");
+
+ assertEquals(expectedBatchCount, batchesPassedToController.size());
+ assertEquals(expectedRecordCount, batchesPassedToController.stream().mapToInt(List::size).sum());
+ }
+ }
}
diff --git a/metadata/src/test/java/org/apache/kafka/metadata/migration/MigrationManifestTest.java b/metadata/src/test/java/org/apache/kafka/metadata/migration/MigrationManifestTest.java
index ce4f5a84862f..165cae2f1650 100644
--- a/metadata/src/test/java/org/apache/kafka/metadata/migration/MigrationManifestTest.java
+++ b/metadata/src/test/java/org/apache/kafka/metadata/migration/MigrationManifestTest.java
@@ -38,7 +38,7 @@ public void testEmpty() {
MigrationManifest manifest = manifestBuilder.build();
assertEquals(0L, manifest.durationMs());
assertEquals(
- "0 records were generated in 0 ms across 0 batches. The record types were {}",
+ "0 records were generated in 0 ms across 0 batches. The average batch size was -1.00 and the average batch commit duration was -1.00 ms. The record types were {}",
manifest.toString());
}
From 0f13561cd52db3a724ebb30b0d16c7434e00db58 Mon Sep 17 00:00:00 2001
From: David Arthur
Date: Thu, 14 Dec 2023 13:26:45 -0500
Subject: [PATCH 3/9] add license
---
.../migration/BufferingBatchConsumer.java | 2 --
.../migration/BufferingBatchConsumerTest.java | 17 +++++++++++++++++
2 files changed, 17 insertions(+), 2 deletions(-)
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/migration/BufferingBatchConsumer.java b/metadata/src/main/java/org/apache/kafka/metadata/migration/BufferingBatchConsumer.java
index a394315e2781..80df2573b3c9 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/migration/BufferingBatchConsumer.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/migration/BufferingBatchConsumer.java
@@ -17,8 +17,6 @@
package org.apache.kafka.metadata.migration;
-import org.apache.kafka.server.common.ApiMessageAndVersion;
-
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
diff --git a/metadata/src/test/java/org/apache/kafka/metadata/migration/BufferingBatchConsumerTest.java b/metadata/src/test/java/org/apache/kafka/metadata/migration/BufferingBatchConsumerTest.java
index fd1f0203b4f9..4caedec20e77 100644
--- a/metadata/src/test/java/org/apache/kafka/metadata/migration/BufferingBatchConsumerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/metadata/migration/BufferingBatchConsumerTest.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.kafka.metadata.migration;
import org.junit.jupiter.api.Test;
From 7ef3f82ebb5a1499846e1808e20ade75b1655e6c Mon Sep 17 00:00:00 2001
From: David Arthur
Date: Thu, 14 Dec 2023 13:30:23 -0500
Subject: [PATCH 4/9] fix typing
---
.../kafka/metadata/migration/KRaftMigrationDriver.java | 6 +++---
1 file changed, 3 insertions(+), 3 deletions(-)
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java
index 1ef10e4d5526..8f257b4c1ae6 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java
@@ -648,10 +648,10 @@ public void run() throws Exception {
}
}
- private BufferingBatchConsumer buildMigrationBatchConsumer(
+ private BufferingBatchConsumer buildMigrationBatchConsumer(
MigrationManifest.Builder manifestBuilder
) {
- return new BufferingBatchConsumer(batch -> {
+ return new BufferingBatchConsumer<>(batch -> {
try {
if (log.isTraceEnabled()) {
batch.forEach(apiMessageAndVersion ->
@@ -690,7 +690,7 @@ public void run() throws Exception {
super.handleException(t);
}
try {
- BufferingBatchConsumer migrationBatchConsumer = buildMigrationBatchConsumer(manifestBuilder);
+ BufferingBatchConsumer migrationBatchConsumer = buildMigrationBatchConsumer(manifestBuilder);
zkMigrationClient.readAllMetadata(
migrationBatchConsumer,
brokersInMetadata::add
From 067b46eec224a79cbc26604eb8c935260bc41b9c Mon Sep 17 00:00:00 2001
From: David Arthur
Date: Thu, 14 Dec 2023 18:39:38 -0500
Subject: [PATCH 5/9] clarify javadoc
---
.../metadata/migration/BufferingBatchConsumer.java | 10 ++++++----
1 file changed, 6 insertions(+), 4 deletions(-)
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/migration/BufferingBatchConsumer.java b/metadata/src/main/java/org/apache/kafka/metadata/migration/BufferingBatchConsumer.java
index 80df2573b3c9..6e2a62abe8f0 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/migration/BufferingBatchConsumer.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/migration/BufferingBatchConsumer.java
@@ -22,10 +22,12 @@
import java.util.function.Consumer;
/**
- * A record batch consumer that re-batches incoming batches into a given size. It does so by buffering
- * the records into an array that is later flushed to a downstream consumer. Batches consumed by this class
- * will not be broken apart, only combined with other batches to reach the minimum batch size. Note that
- * {@link #close()} must be called after the last batch has been accepted in order to flush any buffered records.
+ * A record batch consumer that merges incoming batches into batches of a minimum a given size. It does so
+ * by buffering the records into an array that is later flushed to a downstream consumer. Batches consumed
+ * by this class will not be broken apart, only combined with other batches to reach the minimum batch size.
+ *
+ * Note that {@link #close()} must be called after the last batch has been accepted in order to flush any
+ * buffered records.
*/
public class BufferingBatchConsumer implements Consumer> {
From fa25bc9c263abf35f6d6305f0190524e54a008c6 Mon Sep 17 00:00:00 2001
From: David Arthur
Date: Thu, 14 Dec 2023 20:12:16 -0500
Subject: [PATCH 6/9] rename variable
---
.../kafka/metadata/migration/BufferingBatchConsumer.java | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/migration/BufferingBatchConsumer.java b/metadata/src/main/java/org/apache/kafka/metadata/migration/BufferingBatchConsumer.java
index 6e2a62abe8f0..2a99c597f538 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/migration/BufferingBatchConsumer.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/migration/BufferingBatchConsumer.java
@@ -42,8 +42,8 @@ public class BufferingBatchConsumer implements Consumer> {
}
@Override
- public void accept(List apiMessageAndVersions) {
- bufferedBatch.addAll(apiMessageAndVersions);
+ public void accept(List batch) {
+ bufferedBatch.addAll(batch);
if (bufferedBatch.size() >= minBatchSize) {
delegateConsumer.accept(new ArrayList<>(bufferedBatch));
bufferedBatch.clear();
From 2aee7595cd755b6753a4b3ab4d1c5b7181b6bd3d Mon Sep 17 00:00:00 2001
From: David Arthur
Date: Thu, 14 Dec 2023 21:17:13 -0500
Subject: [PATCH 7/9] remove avg batch size
---
.../metadata/migration/MigrationManifest.java | 23 ++++---------------
.../migration/MigrationManifestTest.java | 8 +++----
2 files changed, 9 insertions(+), 22 deletions(-)
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/migration/MigrationManifest.java b/metadata/src/main/java/org/apache/kafka/metadata/migration/MigrationManifest.java
index 2934222be7b1..39dae38d52cb 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/migration/MigrationManifest.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/migration/MigrationManifest.java
@@ -42,7 +42,6 @@ public static class Builder {
private int batches = 0;
private int total = 0;
private long batchDurationsNs = 0;
- private long batchSizes = 0;
private long endTimeNanos = 0;
Builder(Time time) {
@@ -53,7 +52,6 @@ public static class Builder {
public void acceptBatch(List recordBatch, long durationNs) {
batches++;
batchDurationsNs += durationNs;
- batchSizes += recordBatch.size();
recordBatch.forEach(apiMessageAndVersion -> {
MetadataRecordType type = MetadataRecordType.fromId(apiMessageAndVersion.message().apiKey());
counts.merge(type, 1, Integer::sum);
@@ -66,14 +64,13 @@ public MigrationManifest build() {
endTimeNanos = time.nanoseconds();
}
Map orderedCounts = new TreeMap<>(counts);
- return new MigrationManifest(total, batches, batchDurationsNs, batchSizes, endTimeNanos - startTimeNanos, orderedCounts);
+ return new MigrationManifest(total, batches, batchDurationsNs, endTimeNanos - startTimeNanos, orderedCounts);
}
}
private final int totalRecords;
private final int totalBatches;
private final long totalBatchDurationsNs;
- private final long totalBatchSizes;
private final long durationNanos;
private final Map recordTypeCounts;
@@ -81,14 +78,12 @@ public MigrationManifest build() {
int totalRecords,
int totalBatches,
long totalBatchDurationsNs,
- long totalBatchSizes,
long durationNanos,
Map recordTypeCounts
) {
this.totalRecords = totalRecords;
this.totalBatches = totalBatches;
this.totalBatchDurationsNs = totalBatchDurationsNs;
- this.totalBatchSizes = totalBatchSizes;
this.durationNanos = durationNanos;
this.recordTypeCounts = Collections.unmodifiableMap(recordTypeCounts);
}
@@ -108,13 +103,6 @@ public double avgBatchDurationMs() {
return 1.0 * TimeUnit.NANOSECONDS.toMillis(totalBatchDurationsNs) / totalBatches;
}
- public double avgBatchSize() {
- if (totalBatches == 0) {
- return -1;
- }
- return 1.0 * totalBatchSizes / totalBatches;
- }
-
@Override
public boolean equals(Object o) {
if (this == o) return true;
@@ -123,20 +111,19 @@ public boolean equals(Object o) {
return totalRecords == that.totalRecords &&
totalBatches == that.totalBatches &&
totalBatchDurationsNs == that.totalBatchDurationsNs &&
- totalBatchSizes == that.totalBatchSizes &&
durationNanos == that.durationNanos &&
recordTypeCounts.equals(that.recordTypeCounts);
}
@Override
public int hashCode() {
- return Objects.hash(totalRecords, totalBatches, totalBatchDurationsNs, totalBatchSizes, durationNanos, recordTypeCounts);
+ return Objects.hash(totalRecords, totalBatches, totalBatchDurationsNs, durationNanos, recordTypeCounts);
}
public String toString() {
return String.format(
- "%d records were generated in %d ms across %d batches. The average batch size was %.2f " +
- "and the average batch commit duration was %.2f ms. The record types were %s",
- totalRecords, durationMs(), totalBatches, avgBatchSize(), avgBatchDurationMs(), recordTypeCounts);
+ "%d records were generated in %d ms across %d batches. The average time spent waiting on a " +
+ "batch was %.2f ms. The record types were %s",
+ totalRecords, durationMs(), totalBatches, avgBatchDurationMs(), recordTypeCounts);
}
}
\ No newline at end of file
diff --git a/metadata/src/test/java/org/apache/kafka/metadata/migration/MigrationManifestTest.java b/metadata/src/test/java/org/apache/kafka/metadata/migration/MigrationManifestTest.java
index 165cae2f1650..711073e364a4 100644
--- a/metadata/src/test/java/org/apache/kafka/metadata/migration/MigrationManifestTest.java
+++ b/metadata/src/test/java/org/apache/kafka/metadata/migration/MigrationManifestTest.java
@@ -38,7 +38,7 @@ public void testEmpty() {
MigrationManifest manifest = manifestBuilder.build();
assertEquals(0L, manifest.durationMs());
assertEquals(
- "0 records were generated in 0 ms across 0 batches. The average batch size was -1.00 and the average batch commit duration was -1.00 ms. The record types were {}",
+ "0 records were generated in 0 ms across 0 batches. The average time spent waiting on a batch was -1.00 ms. The record types were {}",
manifest.toString());
}
@@ -65,7 +65,7 @@ public void testOneBatch() {
MigrationManifest manifest = manifestBuilder.build();
assertEquals(10L, manifest.durationMs());
assertEquals(
- "13 records were generated in 10 ms across 1 batches. The average batch size was 13.00 and the average batch commit duration was 0.00 ms. The record types were {TOPIC_RECORD=2, PARTITION_RECORD=9, CONFIG_RECORD=2}",
+ "13 records were generated in 10 ms across 1 batches. The average time spent waiting on a batch was 0.00 ms. The record types were {TOPIC_RECORD=2, PARTITION_RECORD=9, CONFIG_RECORD=2}",
manifest.toString()
);
}
@@ -97,8 +97,8 @@ public void testManyBatch() {
MigrationManifest manifest = manifestBuilder.build();
assertEquals(60L, manifest.durationMs());
assertEquals(
- "13 records were generated in 60 ms across 3 batches. The average batch size was 4.33 and " +
- "the average batch commit duration was 15.00 ms. The record types were {TOPIC_RECORD=2, PARTITION_RECORD=9, CONFIG_RECORD=2}",
+ "13 records were generated in 60 ms across 3 batches. The average time spent waiting on a " +
+ "batch was 15.00 ms. The record types were {TOPIC_RECORD=2, PARTITION_RECORD=9, CONFIG_RECORD=2}",
manifest.toString()
);
}
From 141d979547ce0b7c3b8be4f32272cc5e9042a3c7 Mon Sep 17 00:00:00 2001
From: David Arthur
Date: Fri, 15 Dec 2023 08:34:52 -0500
Subject: [PATCH 8/9] rename close to flush
---
.../metadata/migration/BufferingBatchConsumer.java | 4 ++--
.../metadata/migration/KRaftMigrationDriver.java | 2 +-
.../migration/BufferingBatchConsumerTest.java | 14 +++++++-------
3 files changed, 10 insertions(+), 10 deletions(-)
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/migration/BufferingBatchConsumer.java b/metadata/src/main/java/org/apache/kafka/metadata/migration/BufferingBatchConsumer.java
index 2a99c597f538..c22a5e54f94c 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/migration/BufferingBatchConsumer.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/migration/BufferingBatchConsumer.java
@@ -26,7 +26,7 @@
* by buffering the records into an array that is later flushed to a downstream consumer. Batches consumed
* by this class will not be broken apart, only combined with other batches to reach the minimum batch size.
*
- * Note that {@link #close()} must be called after the last batch has been accepted in order to flush any
+ * Note that {@link #flush()} must be called after the last batch has been accepted in order to flush any
* buffered records.
*/
public class BufferingBatchConsumer implements Consumer> {
@@ -50,7 +50,7 @@ public void accept(List batch) {
}
}
- public void close() {
+ public void flush() {
if (!bufferedBatch.isEmpty()) {
delegateConsumer.accept(new ArrayList<>(bufferedBatch));
bufferedBatch.clear();
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java
index 8f257b4c1ae6..156515f7be81 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java
@@ -695,7 +695,7 @@ public void run() throws Exception {
migrationBatchConsumer,
brokersInMetadata::add
);
- migrationBatchConsumer.close();
+ migrationBatchConsumer.flush();
CompletableFuture completeMigrationFuture = zkRecordConsumer.completeMigration();
OffsetAndEpoch offsetAndEpochAfterMigration = FutureUtils.waitWithLogging(
KRaftMigrationDriver.this.log, "",
diff --git a/metadata/src/test/java/org/apache/kafka/metadata/migration/BufferingBatchConsumerTest.java b/metadata/src/test/java/org/apache/kafka/metadata/migration/BufferingBatchConsumerTest.java
index 4caedec20e77..819920573567 100644
--- a/metadata/src/test/java/org/apache/kafka/metadata/migration/BufferingBatchConsumerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/metadata/migration/BufferingBatchConsumerTest.java
@@ -35,7 +35,7 @@ public void testEmptyBatches() {
consumer.accept(Collections.emptyList());
consumer.accept(Collections.emptyList());
assertEquals(batches.size(), 0);
- consumer.close();
+ consumer.flush();
assertEquals(batches.size(), 0);
}
@@ -46,7 +46,7 @@ public void testOneBatchSameAsMinSize() {
consumer.accept(Arrays.asList(1, 2, 3, 4));
assertEquals(batches.size(), 1);
assertEquals(batches.get(0), Arrays.asList(1, 2, 3, 4));
- consumer.close();
+ consumer.flush();
assertEquals(batches.size(), 1);
assertEquals(batches.get(0), Arrays.asList(1, 2, 3, 4));
}
@@ -57,7 +57,7 @@ public void testOneBatchSmallerThanMinSize() {
BufferingBatchConsumer consumer = new BufferingBatchConsumer<>(batches::add, 4);
consumer.accept(Arrays.asList(1, 2, 3));
assertEquals(batches.size(), 0);
- consumer.close();
+ consumer.flush();
assertEquals(batches.size(), 1);
assertEquals(batches.get(0), Arrays.asList(1, 2, 3));
}
@@ -69,7 +69,7 @@ public void testOneBatchLargerThanMinSize() {
consumer.accept(Arrays.asList(1, 2, 3, 4, 5));
assertEquals(batches.size(), 1);
assertEquals(batches.get(0), Arrays.asList(1, 2, 3, 4, 5));
- consumer.close();
+ consumer.flush();
assertEquals(batches.size(), 1);
assertEquals(batches.get(0), Arrays.asList(1, 2, 3, 4, 5));
}
@@ -83,7 +83,7 @@ public void testMultiBatchSameAsMinSize() {
consumer.accept(Arrays.asList(5, 6));
assertEquals(batches.size(), 1);
assertEquals(batches.get(0), Arrays.asList(1, 2, 3, 4, 5, 6));
- consumer.close();
+ consumer.flush();
assertEquals(batches.size(), 1);
assertEquals(batches.get(0), Arrays.asList(1, 2, 3, 4, 5, 6));
}
@@ -96,7 +96,7 @@ public void testMultiBatchSmallerThanMinSize() {
consumer.accept(Arrays.asList(3, 4));
consumer.accept(Collections.singletonList(5));
assertEquals(batches.size(), 0);
- consumer.close();
+ consumer.flush();
assertEquals(batches.size(), 1);
assertEquals(batches.get(0), Arrays.asList(1, 2, 3, 4, 5));
}
@@ -112,7 +112,7 @@ public void testMultiBatchLargerThanMinSize() {
consumer.accept(Arrays.asList(9, 10));
assertEquals(batches.size(), 1);
assertEquals(batches.get(0), Arrays.asList(1, 2, 3, 4, 5, 6));
- consumer.close();
+ consumer.flush();
assertEquals(batches.size(), 2);
assertEquals(batches.get(0), Arrays.asList(1, 2, 3, 4, 5, 6));
assertEquals(batches.get(1), Arrays.asList(7, 8, 9, 10));
From c56c3f055b4499908f04088f37fddfc819175b60 Mon Sep 17 00:00:00 2001
From: David Arthur
Date: Fri, 15 Dec 2023 14:58:33 -0500
Subject: [PATCH 9/9] PR feedback
---
.../metadata/migration/BufferingBatchConsumer.java | 11 +++++------
1 file changed, 5 insertions(+), 6 deletions(-)
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/migration/BufferingBatchConsumer.java b/metadata/src/main/java/org/apache/kafka/metadata/migration/BufferingBatchConsumer.java
index c22a5e54f94c..ed8a8236aaf0 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/migration/BufferingBatchConsumer.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/migration/BufferingBatchConsumer.java
@@ -32,28 +32,27 @@
public class BufferingBatchConsumer implements Consumer> {
private final Consumer> delegateConsumer;
- private final List bufferedBatch;
private final int minBatchSize;
+ private List bufferedBatch;
BufferingBatchConsumer(Consumer> delegateConsumer, int minBatchSize) {
this.delegateConsumer = delegateConsumer;
- this.bufferedBatch = new ArrayList<>(minBatchSize);
this.minBatchSize = minBatchSize;
+ this.bufferedBatch = new ArrayList<>(minBatchSize);
}
@Override
public void accept(List batch) {
bufferedBatch.addAll(batch);
if (bufferedBatch.size() >= minBatchSize) {
- delegateConsumer.accept(new ArrayList<>(bufferedBatch));
- bufferedBatch.clear();
+ flush();
}
}
public void flush() {
if (!bufferedBatch.isEmpty()) {
- delegateConsumer.accept(new ArrayList<>(bufferedBatch));
- bufferedBatch.clear();
+ delegateConsumer.accept(bufferedBatch);
+ bufferedBatch = new ArrayList<>(minBatchSize);
}
}
}