From 365308b52da8ff7fec61d2027b36740acc68b0c2 Mon Sep 17 00:00:00 2001 From: David Arthur Date: Wed, 6 Sep 2023 13:02:13 -0400 Subject: [PATCH] KAFKA-15435 Fix counts in MigrationManifest (#14342) Reviewers: Liu Zeyu , Colin P. McCabe --- .../migration/KRaftMigrationDriver.java | 7 +- .../metadata/migration/MigrationManifest.java | 6 +- .../migration/MigrationManifestTest.java | 102 ++++++++++++++++++ 3 files changed, 110 insertions(+), 5 deletions(-) create mode 100644 metadata/src/test/java/org/apache/kafka/metadata/migration/MigrationManifestTest.java diff --git a/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java index 313c1ce1ec1e..a5aca03126b6 100644 --- a/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java +++ b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java @@ -718,14 +718,15 @@ public void run() throws Exception { // Ignore sending RPCs to the brokers since we're no longer in the state. if (checkDriverState(MigrationDriverState.KRAFT_CONTROLLER_TO_BROKER_COMM)) { if (image.highestOffsetAndEpoch().compareTo(migrationLeadershipState.offsetAndEpoch()) >= 0) { - log.trace("Sending RPCs to broker before moving to dual-write mode using " + + log.info("Sending RPCs to broker before moving to dual-write mode using " + "at offset and epoch {}", image.highestOffsetAndEpoch()); propagator.sendRPCsToBrokersFromMetadataImage(image, migrationLeadershipState.zkControllerEpoch()); // Migration leadership state doesn't change since we're not doing any Zk writes. transitionTo(MigrationDriverState.DUAL_WRITE); } else { - log.trace("Ignoring using metadata image since migration leadership state is at a greater offset and epoch {}", - migrationLeadershipState.offsetAndEpoch()); + log.info("Not sending metadata RPCs with current metadata image since does not contain the offset " + + "that was last written to ZK during the migration. Image offset {} is less than migration " + + "leadership state offset {}", image.highestOffsetAndEpoch(), migrationLeadershipState.offsetAndEpoch()); } } } diff --git a/metadata/src/main/java/org/apache/kafka/metadata/migration/MigrationManifest.java b/metadata/src/main/java/org/apache/kafka/metadata/migration/MigrationManifest.java index 7d174b15e681..fc31fb416d35 100644 --- a/metadata/src/main/java/org/apache/kafka/metadata/migration/MigrationManifest.java +++ b/metadata/src/main/java/org/apache/kafka/metadata/migration/MigrationManifest.java @@ -26,6 +26,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.TreeMap; import java.util.concurrent.TimeUnit; /** @@ -51,7 +52,7 @@ public void acceptBatch(List recordBatch) { batches++; recordBatch.forEach(apiMessageAndVersion -> { MetadataRecordType type = MetadataRecordType.fromId(apiMessageAndVersion.message().apiKey()); - counts.merge(type, 1, (__, count) -> count + 1); + counts.merge(type, 1, Integer::sum); total++; }); } @@ -60,7 +61,8 @@ public MigrationManifest build() { if (endTimeNanos == 0) { endTimeNanos = time.nanoseconds(); } - return new MigrationManifest(total, batches, endTimeNanos - startTimeNanos, counts); + Map orderedCounts = new TreeMap<>(counts); + return new MigrationManifest(total, batches, endTimeNanos - startTimeNanos, orderedCounts); } } diff --git a/metadata/src/test/java/org/apache/kafka/metadata/migration/MigrationManifestTest.java b/metadata/src/test/java/org/apache/kafka/metadata/migration/MigrationManifestTest.java new file mode 100644 index 000000000000..63c548ec4323 --- /dev/null +++ b/metadata/src/test/java/org/apache/kafka/metadata/migration/MigrationManifestTest.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.metadata.migration; + +import org.apache.kafka.common.metadata.ConfigRecord; +import org.apache.kafka.common.metadata.PartitionRecord; +import org.apache.kafka.common.metadata.TopicRecord; +import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.server.common.ApiMessageAndVersion; +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.Collections; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class MigrationManifestTest { + @Test + public void testEmpty() { + Time time = new MockTime(); + MigrationManifest.Builder manifestBuilder = MigrationManifest.newBuilder(time); + MigrationManifest manifest = manifestBuilder.build(); + assertEquals(0L, manifest.durationMs()); + assertEquals( + "0 records were generated in 0 ms across 0 batches. The record types were {}", + manifest.toString()); + } + + @Test + public void testOneBatch() { + Time time = new MockTime(); + MigrationManifest.Builder manifestBuilder = MigrationManifest.newBuilder(time); + manifestBuilder.acceptBatch(Arrays.asList( + new ApiMessageAndVersion(new TopicRecord(), (short) 0), + new ApiMessageAndVersion(new PartitionRecord(), (short) 0), + new ApiMessageAndVersion(new PartitionRecord(), (short) 0), + new ApiMessageAndVersion(new PartitionRecord(), (short) 0), + new ApiMessageAndVersion(new PartitionRecord(), (short) 0), + new ApiMessageAndVersion(new TopicRecord(), (short) 0), + new ApiMessageAndVersion(new PartitionRecord(), (short) 0), + new ApiMessageAndVersion(new PartitionRecord(), (short) 0), + new ApiMessageAndVersion(new PartitionRecord(), (short) 0), + new ApiMessageAndVersion(new PartitionRecord(), (short) 0), + new ApiMessageAndVersion(new PartitionRecord(), (short) 0), + new ApiMessageAndVersion(new ConfigRecord(), (short) 0), + new ApiMessageAndVersion(new ConfigRecord(), (short) 0) + )); + MigrationManifest manifest = manifestBuilder.build(); + assertEquals(0L, manifest.durationMs()); + assertEquals( + "13 records were generated in 0 ms across 1 batches. The record types were {TOPIC_RECORD=2, PARTITION_RECORD=9, CONFIG_RECORD=2}", + manifest.toString() + ); + } + + @Test + public void testManyBatch() { + Time time = new MockTime(); + MigrationManifest.Builder manifestBuilder = MigrationManifest.newBuilder(time); + manifestBuilder.acceptBatch(Arrays.asList( + new ApiMessageAndVersion(new TopicRecord(), (short) 0), + new ApiMessageAndVersion(new PartitionRecord(), (short) 0), + new ApiMessageAndVersion(new PartitionRecord(), (short) 0), + new ApiMessageAndVersion(new PartitionRecord(), (short) 0), + new ApiMessageAndVersion(new PartitionRecord(), (short) 0) + )); + manifestBuilder.acceptBatch(Arrays.asList( + new ApiMessageAndVersion(new TopicRecord(), (short) 0), + new ApiMessageAndVersion(new PartitionRecord(), (short) 0), + new ApiMessageAndVersion(new PartitionRecord(), (short) 0), + new ApiMessageAndVersion(new PartitionRecord(), (short) 0), + new ApiMessageAndVersion(new PartitionRecord(), (short) 0), + new ApiMessageAndVersion(new PartitionRecord(), (short) 0), + new ApiMessageAndVersion(new ConfigRecord(), (short) 0) + )); + manifestBuilder.acceptBatch(Collections.singletonList( + new ApiMessageAndVersion(new ConfigRecord(), (short) 0) + )); + MigrationManifest manifest = manifestBuilder.build(); + assertEquals(0L, manifest.durationMs()); + assertEquals( + "13 records were generated in 0 ms across 3 batches. The record types were {TOPIC_RECORD=2, PARTITION_RECORD=9, CONFIG_RECORD=2}", + manifest.toString() + ); + } +}