Skip to content

Commit

Permalink
KAFKA-15435 Fix counts in MigrationManifest (#14342)
Browse files Browse the repository at this point in the history
Reviewers: Liu Zeyu <zeyu.luke@gmail.com>, Colin P. McCabe <cmccabe@apache.org>
  • Loading branch information
mumrah committed Sep 8, 2023
1 parent 99bc91b commit 365308b
Show file tree
Hide file tree
Showing 3 changed files with 110 additions and 5 deletions.
Expand Up @@ -718,14 +718,15 @@ public void run() throws Exception {
// Ignore sending RPCs to the brokers since we're no longer in the state.
if (checkDriverState(MigrationDriverState.KRAFT_CONTROLLER_TO_BROKER_COMM)) {
if (image.highestOffsetAndEpoch().compareTo(migrationLeadershipState.offsetAndEpoch()) >= 0) {
log.trace("Sending RPCs to broker before moving to dual-write mode using " +
log.info("Sending RPCs to broker before moving to dual-write mode using " +
"at offset and epoch {}", image.highestOffsetAndEpoch());
propagator.sendRPCsToBrokersFromMetadataImage(image, migrationLeadershipState.zkControllerEpoch());
// Migration leadership state doesn't change since we're not doing any Zk writes.
transitionTo(MigrationDriverState.DUAL_WRITE);
} else {
log.trace("Ignoring using metadata image since migration leadership state is at a greater offset and epoch {}",
migrationLeadershipState.offsetAndEpoch());
log.info("Not sending metadata RPCs with current metadata image since does not contain the offset " +
"that was last written to ZK during the migration. Image offset {} is less than migration " +
"leadership state offset {}", image.highestOffsetAndEpoch(), migrationLeadershipState.offsetAndEpoch());
}
}
}
Expand Down
Expand Up @@ -26,6 +26,7 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;

/**
Expand All @@ -51,7 +52,7 @@ public void acceptBatch(List<ApiMessageAndVersion> recordBatch) {
batches++;
recordBatch.forEach(apiMessageAndVersion -> {
MetadataRecordType type = MetadataRecordType.fromId(apiMessageAndVersion.message().apiKey());
counts.merge(type, 1, (__, count) -> count + 1);
counts.merge(type, 1, Integer::sum);
total++;
});
}
Expand All @@ -60,7 +61,8 @@ public MigrationManifest build() {
if (endTimeNanos == 0) {
endTimeNanos = time.nanoseconds();
}
return new MigrationManifest(total, batches, endTimeNanos - startTimeNanos, counts);
Map<MetadataRecordType, Integer> orderedCounts = new TreeMap<>(counts);
return new MigrationManifest(total, batches, endTimeNanos - startTimeNanos, orderedCounts);
}
}

Expand Down
@@ -0,0 +1,102 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kafka.metadata.migration;

import org.apache.kafka.common.metadata.ConfigRecord;
import org.apache.kafka.common.metadata.PartitionRecord;
import org.apache.kafka.common.metadata.TopicRecord;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.junit.jupiter.api.Test;

import java.util.Arrays;
import java.util.Collections;

import static org.junit.jupiter.api.Assertions.assertEquals;

public class MigrationManifestTest {
@Test
public void testEmpty() {
Time time = new MockTime();
MigrationManifest.Builder manifestBuilder = MigrationManifest.newBuilder(time);
MigrationManifest manifest = manifestBuilder.build();
assertEquals(0L, manifest.durationMs());
assertEquals(
"0 records were generated in 0 ms across 0 batches. The record types were {}",
manifest.toString());
}

@Test
public void testOneBatch() {
Time time = new MockTime();
MigrationManifest.Builder manifestBuilder = MigrationManifest.newBuilder(time);
manifestBuilder.acceptBatch(Arrays.asList(
new ApiMessageAndVersion(new TopicRecord(), (short) 0),
new ApiMessageAndVersion(new PartitionRecord(), (short) 0),
new ApiMessageAndVersion(new PartitionRecord(), (short) 0),
new ApiMessageAndVersion(new PartitionRecord(), (short) 0),
new ApiMessageAndVersion(new PartitionRecord(), (short) 0),
new ApiMessageAndVersion(new TopicRecord(), (short) 0),
new ApiMessageAndVersion(new PartitionRecord(), (short) 0),
new ApiMessageAndVersion(new PartitionRecord(), (short) 0),
new ApiMessageAndVersion(new PartitionRecord(), (short) 0),
new ApiMessageAndVersion(new PartitionRecord(), (short) 0),
new ApiMessageAndVersion(new PartitionRecord(), (short) 0),
new ApiMessageAndVersion(new ConfigRecord(), (short) 0),
new ApiMessageAndVersion(new ConfigRecord(), (short) 0)
));
MigrationManifest manifest = manifestBuilder.build();
assertEquals(0L, manifest.durationMs());
assertEquals(
"13 records were generated in 0 ms across 1 batches. The record types were {TOPIC_RECORD=2, PARTITION_RECORD=9, CONFIG_RECORD=2}",
manifest.toString()
);
}

@Test
public void testManyBatch() {
Time time = new MockTime();
MigrationManifest.Builder manifestBuilder = MigrationManifest.newBuilder(time);
manifestBuilder.acceptBatch(Arrays.asList(
new ApiMessageAndVersion(new TopicRecord(), (short) 0),
new ApiMessageAndVersion(new PartitionRecord(), (short) 0),
new ApiMessageAndVersion(new PartitionRecord(), (short) 0),
new ApiMessageAndVersion(new PartitionRecord(), (short) 0),
new ApiMessageAndVersion(new PartitionRecord(), (short) 0)
));
manifestBuilder.acceptBatch(Arrays.asList(
new ApiMessageAndVersion(new TopicRecord(), (short) 0),
new ApiMessageAndVersion(new PartitionRecord(), (short) 0),
new ApiMessageAndVersion(new PartitionRecord(), (short) 0),
new ApiMessageAndVersion(new PartitionRecord(), (short) 0),
new ApiMessageAndVersion(new PartitionRecord(), (short) 0),
new ApiMessageAndVersion(new PartitionRecord(), (short) 0),
new ApiMessageAndVersion(new ConfigRecord(), (short) 0)
));
manifestBuilder.acceptBatch(Collections.singletonList(
new ApiMessageAndVersion(new ConfigRecord(), (short) 0)
));
MigrationManifest manifest = manifestBuilder.build();
assertEquals(0L, manifest.durationMs());
assertEquals(
"13 records were generated in 0 ms across 3 batches. The record types were {TOPIC_RECORD=2, PARTITION_RECORD=9, CONFIG_RECORD=2}",
manifest.toString()
);
}
}

0 comments on commit 365308b

Please sign in to comment.