From 23c39a171e07a5d8172c874b6fb165b04dcb592b Mon Sep 17 00:00:00 2001 From: Luke Chen Date: Mon, 24 Apr 2023 17:31:20 +0800 Subject: [PATCH 1/8] KAFKA-14909: verify all nodes are zkMigrationReady before transition to next state --- .../java/org/apache/kafka/clients/ApiVersions.java | 4 ++++ .../java/org/apache/kafka/clients/NetworkClient.java | 5 +++-- .../org/apache/kafka/clients/NodeApiVersions.java | 11 +++++++++-- config/kraft/controller.properties | 4 ++-- .../main/scala/kafka/server/ControllerServer.scala | 3 ++- .../metadata/migration/KRaftMigrationDriver.java | 12 ++++++++---- 6 files changed, 28 insertions(+), 11 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/ApiVersions.java b/clients/src/main/java/org/apache/kafka/clients/ApiVersions.java index a09d58166b36..92aa153de6fe 100644 --- a/clients/src/main/java/org/apache/kafka/clients/ApiVersions.java +++ b/clients/src/main/java/org/apache/kafka/clients/ApiVersions.java @@ -64,4 +64,8 @@ public synchronized byte maxUsableProduceMagic() { return maxUsableProduceMagic; } + // check all nodes are ZK Migration ready + public boolean isAllNodeZkMigrationReady() { + return nodeApiVersions.values().stream().allMatch(ver -> ver.zkMigrationEnabled()); + } } diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java index 433440ce9332..d1ddf84472fe 100644 --- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java @@ -956,10 +956,11 @@ private void handleApiVersionsResponse(List responses, } NodeApiVersions nodeVersionInfo = new NodeApiVersions( apiVersionsResponse.data().apiKeys(), - apiVersionsResponse.data().supportedFeatures()); + apiVersionsResponse.data().supportedFeatures(), + apiVersionsResponse.data().zkMigrationReady()); apiVersions.update(node, nodeVersionInfo); this.connectionStates.ready(node); - log.debug("Node {} has finalized features epoch: {}, finalized features: {}, supported features: {}, API versions: {}.", + log.info("!!! Node {} has finalized features epoch: {}, finalized features: {}, supported features: {}, ZK migration ready: {}, API versions: {}.", node, apiVersionsResponse.data().finalizedFeaturesEpoch(), apiVersionsResponse.data().finalizedFeatures(), apiVersionsResponse.data().supportedFeatures(), nodeVersionInfo); } diff --git a/clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java b/clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java index a3aaa88fee19..44c1fe4d8159 100644 --- a/clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java +++ b/clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java @@ -48,6 +48,8 @@ public class NodeApiVersions { private final Map supportedFeatures; + private final boolean zkMigrationEnabled; + /** * Create a NodeApiVersions object with the current ApiVersions. * @@ -76,7 +78,7 @@ public static NodeApiVersions create(Collection overrides) { } if (!exists) apiVersions.add(ApiVersionsResponse.toApiVersion(apiKey)); } - return new NodeApiVersions(apiVersions, Collections.emptyList()); + return new NodeApiVersions(apiVersions, Collections.emptyList(), false); } @@ -95,7 +97,7 @@ public static NodeApiVersions create(short apiKey, short minVersion, short maxVe .setMaxVersion(maxVersion))); } - public NodeApiVersions(Collection nodeApiVersions, Collection nodeSupportedFeatures) { + public NodeApiVersions(Collection nodeApiVersions, Collection nodeSupportedFeatures, boolean zkMigrationEnabled) { for (ApiVersion nodeApiVersion : nodeApiVersions) { if (ApiKeys.hasId(nodeApiVersion.apiKey())) { ApiKeys nodeApiKey = ApiKeys.forId(nodeApiVersion.apiKey()); @@ -112,6 +114,7 @@ public NodeApiVersions(Collection nodeApiVersions, Collection allSupportedApiVersions() { public Map supportedFeatures() { return supportedFeatures; } + + public boolean zkMigrationEnabled() { + return zkMigrationEnabled; + } } diff --git a/config/kraft/controller.properties b/config/kraft/controller.properties index 9e8ad62054e2..a465cce8c970 100644 --- a/config/kraft/controller.properties +++ b/config/kraft/controller.properties @@ -27,8 +27,8 @@ process.roles=controller node.id=1 # The connect string for the controller quorum -controller.quorum.voters=1@localhost:9093 - +#controller.quorum.voters=1@localhost:9093 +controller.quorum.voters=1@localhost:9093,4@localhost:9094,5@localhost:9095 ############################# Socket Server Settings ############################# # The address the socket server listens on. diff --git a/core/src/main/scala/kafka/server/ControllerServer.scala b/core/src/main/scala/kafka/server/ControllerServer.scala index 022fd9d899ab..1308795f77c6 100644 --- a/core/src/main/scala/kafka/server/ControllerServer.scala +++ b/core/src/main/scala/kafka/server/ControllerServer.scala @@ -270,7 +270,8 @@ class ControllerServer( "zk migration", fatal = false, () => {} - ) + ), + sharedServer.raftManager.apiVersions ) migrationDriver.start() migrationSupport = Some(ControllerMigrationSupport(zkClient, migrationDriver, propagator)) diff --git a/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java index 1534e1b9509e..96066ffc99b6 100644 --- a/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java +++ b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java @@ -16,6 +16,8 @@ */ package org.apache.kafka.metadata.migration; +import org.apache.kafka.clients.ApiVersions; +import org.apache.kafka.clients.NodeApiVersions; import org.apache.kafka.common.acl.AccessControlEntry; import org.apache.kafka.common.metadata.ConfigRecord; import org.apache.kafka.common.metadata.MetadataRecordType; @@ -81,6 +83,7 @@ public class KRaftMigrationDriver implements MetadataPublisher { private volatile MigrationDriverState migrationState; private volatile ZkMigrationLeadershipState migrationLeadershipState; private volatile MetadataImage image; + private volatile ApiVersions apiVersions; public KRaftMigrationDriver( int nodeId, @@ -88,7 +91,8 @@ public KRaftMigrationDriver( MigrationClient zkMigrationClient, LegacyPropagator propagator, Consumer initialZkLoadHandler, - FaultHandler faultHandler + FaultHandler faultHandler, + ApiVersions apiVersions ) { this.nodeId = nodeId; this.zkRecordConsumer = zkRecordConsumer; @@ -104,6 +108,7 @@ public KRaftMigrationDriver( this.leaderAndEpoch = LeaderAndEpoch.UNKNOWN; this.initialZkLoadHandler = initialZkLoadHandler; this.faultHandler = faultHandler; + this.apiVersions = apiVersions; } public void start() { @@ -134,8 +139,7 @@ private void initializeMigrationState() { } private boolean isControllerQuorumReadyForMigration() { - // TODO implement this - return true; + return this.apiVersions.isAllNodeZkMigrationReady(); } private boolean imageDoesNotContainAllBrokers(MetadataImage image, Set brokerIds) { @@ -383,7 +387,7 @@ public void run() throws Exception { } else { // Apply the new KRaft state apply("KRaftLeaderEvent is active", state -> state.withNewKRaftController(nodeId, leaderAndEpoch.epoch())); - // Before becoming the controller fo ZkBrokers, we need to make sure the + // Before becoming the controller for ZkBrokers, we need to make sure the // Controller Quorum can handle migration. transitionTo(MigrationDriverState.WAIT_FOR_CONTROLLER_QUORUM); } From 7f0285235821e3085e020f4dea41b4862a57bc2c Mon Sep 17 00:00:00 2001 From: Luke Chen Date: Sun, 23 Apr 2023 19:03:58 +0800 Subject: [PATCH 2/8] add zkMigrationEnabled --- .../apache/kafka/clients/NetworkClient.java | 2 +- .../common/requests/ApiVersionsResponse.java | 19 +++++++++++++------ .../clients/admin/KafkaAdminClientTest.java | 3 ++- .../requests/ApiVersionsResponseTest.java | 12 ++++++++---- .../kafka/server/ApiVersionManager.scala | 15 ++++++++++----- 5 files changed, 34 insertions(+), 17 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java index d1ddf84472fe..603c8f5fbec2 100644 --- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java @@ -1004,7 +1004,7 @@ private void handleInitiateApiVersionRequests(long now) { Map.Entry entry = iter.next(); String node = entry.getKey(); if (selector.isChannelReady(node) && inFlightRequests.canSendMore(node)) { - log.debug("Initiating API versions fetch from node {}.", node); + log.info("!!! Initiating API versions fetch from node {}.", node); // We transition the connection to the CHECKING_API_VERSIONS state only when // the ApiVersionsRequest is queued up to be sent out. Without this, the client // could remain in the CHECKING_API_VERSIONS state forever if the channel does diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java index ac4562bf8733..2c657d39c49c 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java @@ -152,7 +152,8 @@ public static ApiVersionsResponse createApiVersionsResponse( apiVersions, latestSupportedFeatures, Collections.emptyMap(), - UNKNOWN_FINALIZED_FEATURES_EPOCH); + UNKNOWN_FINALIZED_FEATURES_EPOCH, + false); } public static ApiVersionsResponse createApiVersionsResponse( @@ -163,7 +164,8 @@ public static ApiVersionsResponse createApiVersionsResponse( long finalizedFeaturesEpoch, NodeApiVersions controllerApiVersions, ListenerType listenerType, - boolean enableUnstableLastVersion + boolean enableUnstableLastVersion, + boolean zkMigrationEnabled ) { ApiVersionCollection apiKeys; if (controllerApiVersions != null) { @@ -186,7 +188,8 @@ public static ApiVersionsResponse createApiVersionsResponse( apiKeys, latestSupportedFeatures, finalizedFeatures, - finalizedFeaturesEpoch + finalizedFeaturesEpoch, + zkMigrationEnabled ); } @@ -195,7 +198,8 @@ public static ApiVersionsResponse createApiVersionsResponse( ApiVersionCollection apiVersions, Features latestSupportedFeatures, Map finalizedFeatures, - long finalizedFeaturesEpoch + long finalizedFeaturesEpoch, + boolean zkMigrationEnabled ) { return new ApiVersionsResponse( createApiVersionsResponseData( @@ -204,7 +208,8 @@ public static ApiVersionsResponse createApiVersionsResponse( apiVersions, latestSupportedFeatures, finalizedFeatures, - finalizedFeaturesEpoch + finalizedFeaturesEpoch, + zkMigrationEnabled ) ); } @@ -294,7 +299,8 @@ private static ApiVersionsResponseData createApiVersionsResponseData( final ApiVersionCollection apiKeys, final Features latestSupportedFeatures, final Map finalizedFeatures, - final long finalizedFeaturesEpoch + final long finalizedFeaturesEpoch, + final boolean zkMigrationEnabled ) { final ApiVersionsResponseData data = new ApiVersionsResponseData(); data.setThrottleTimeMs(throttleTimeMs); @@ -303,6 +309,7 @@ private static ApiVersionsResponseData createApiVersionsResponseData( data.setSupportedFeatures(createSupportedFeatureKeys(latestSupportedFeatures)); data.setFinalizedFeatures(createFinalizedFeatureKeys(finalizedFeatures)); data.setFinalizedFeaturesEpoch(finalizedFeaturesEpoch); + data.setZkMigrationReady(zkMigrationEnabled); return data; } diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java index c6f886c5d539..27ae3471f4ae 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java @@ -634,7 +634,8 @@ private static ApiVersionsResponse prepareApiVersionsResponseForDescribeFeatures ApiVersionsResponse.filterApis(RecordVersion.current(), ApiMessageType.ListenerType.ZK_BROKER), convertSupportedFeaturesMap(defaultFeatureMetadata().supportedFeatures()), Collections.singletonMap("test_feature_1", (short) 2), - defaultFeatureMetadata().finalizedFeaturesEpoch().get() + defaultFeatureMetadata().finalizedFeaturesEpoch().get(), + false ); } return new ApiVersionsResponse( diff --git a/clients/src/test/java/org/apache/kafka/common/requests/ApiVersionsResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/ApiVersionsResponseTest.java index b34304000ae6..f1931bc58823 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/ApiVersionsResponseTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/ApiVersionsResponseTest.java @@ -121,7 +121,8 @@ public void shouldCreateApiResponseOnlyWithKeysSupportedByMagicValue() { ApiVersionsResponse.UNKNOWN_FINALIZED_FEATURES_EPOCH, null, ListenerType.ZK_BROKER, - true + true, + false ); verifyApiKeysForMagic(response, RecordBatch.MAGIC_VALUE_V1); assertEquals(10, response.throttleTimeMs()); @@ -141,7 +142,8 @@ public void shouldReturnFeatureKeysWhenMagicIsCurrentValueAndThrottleMsIsDefault 10L, null, ListenerType.ZK_BROKER, - true + true, + false ); verifyApiKeysForMagic(response, RecordBatch.MAGIC_VALUE_V1); @@ -169,7 +171,8 @@ public void shouldReturnAllKeysWhenMagicIsCurrentValueAndThrottleMsIsDefaultThro ApiVersionsResponse.UNKNOWN_FINALIZED_FEATURES_EPOCH, null, ListenerType.ZK_BROKER, - true + true, + false ); assertEquals(new HashSet<>(ApiKeys.zkBrokerApis()), apiKeysInResponse(response)); assertEquals(AbstractResponse.DEFAULT_THROTTLE_TIME, response.throttleTimeMs()); @@ -188,7 +191,8 @@ public void testMetadataQuorumApisAreDisabled() { ApiVersionsResponse.UNKNOWN_FINALIZED_FEATURES_EPOCH, null, ListenerType.ZK_BROKER, - true + true, + false ); // Ensure that APIs needed for the KRaft mode are not exposed through ApiVersions until we are ready for them diff --git a/core/src/main/scala/kafka/server/ApiVersionManager.scala b/core/src/main/scala/kafka/server/ApiVersionManager.scala index 92e6d25cecb3..21e96b69f049 100644 --- a/core/src/main/scala/kafka/server/ApiVersionManager.scala +++ b/core/src/main/scala/kafka/server/ApiVersionManager.scala @@ -49,7 +49,8 @@ object ApiVersionManager { forwardingManager, supportedFeatures, metadataCache, - config.unstableApiVersionsEnabled + config.unstableApiVersionsEnabled, + config.migrationEnabled ) } } @@ -58,7 +59,8 @@ class SimpleApiVersionManager( val listenerType: ListenerType, val enabledApis: collection.Set[ApiKeys], brokerFeatures: Features[SupportedVersionRange], - val enableUnstableLastVersion: Boolean + val enableUnstableLastVersion: Boolean, + val zkMigrationEnabled: Boolean ) extends ApiVersionManager { def this( @@ -69,7 +71,8 @@ class SimpleApiVersionManager( listenerType, ApiKeys.apisForListener(listenerType).asScala, BrokerFeatures.defaultSupportedFeatures(), - enableUnstableLastVersion + enableUnstableLastVersion, + zkMigrationEnabled ) } @@ -85,7 +88,8 @@ class DefaultApiVersionManager( forwardingManager: Option[ForwardingManager], features: BrokerFeatures, metadataCache: MetadataCache, - val enableUnstableLastVersion: Boolean + val enableUnstableLastVersion: Boolean, + val zkMigrationEnabled: Boolean ) extends ApiVersionManager { val enabledApis = ApiKeys.apisForListener(listenerType).asScala @@ -103,7 +107,8 @@ class DefaultApiVersionManager( finalizedFeatures.epoch, controllerApiVersions.orNull, listenerType, - enableUnstableLastVersion + enableUnstableLastVersion, + zkMigrationEnabled ) } } From 4298fc656d1c91115a8d7fd61753adb01298b571 Mon Sep 17 00:00:00 2001 From: Luke Chen Date: Mon, 24 Apr 2023 17:46:42 +0800 Subject: [PATCH 3/8] buildable --- .../java/org/apache/kafka/clients/ApiVersions.java | 2 +- .../org/apache/kafka/clients/NetworkClient.java | 6 +++--- .../kafka/common/requests/ApiVersionsResponse.java | 13 ++++++++----- .../apache/kafka/clients/NodeApiVersionsTest.java | 10 +++++----- config/kraft/controller.properties | 3 +-- .../kafka/admin/BrokerApiVersionsCommand.scala | 2 +- .../main/scala/kafka/server/ApiVersionManager.scala | 7 ++++--- .../main/scala/kafka/server/ControllerServer.scala | 3 ++- .../src/main/scala/kafka/tools/TestRaftServer.scala | 2 +- .../scala/unit/kafka/network/SocketServerTest.scala | 2 +- .../unit/kafka/server/ControllerApisTest.scala | 2 +- .../scala/unit/kafka/server/KafkaApisTest.scala | 2 +- .../jmh/metadata/KRaftMetadataRequestBenchmark.java | 2 +- .../jmh/metadata/MetadataRequestBenchmark.java | 2 +- .../metadata/migration/KRaftMigrationDriver.java | 2 +- .../apache/kafka/controller/QuorumFeaturesTest.java | 2 +- .../migration/KRaftMigrationDriverTest.java | 6 ++++-- 17 files changed, 37 insertions(+), 31 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/ApiVersions.java b/clients/src/main/java/org/apache/kafka/clients/ApiVersions.java index 92aa153de6fe..2ae7e5484a3f 100644 --- a/clients/src/main/java/org/apache/kafka/clients/ApiVersions.java +++ b/clients/src/main/java/org/apache/kafka/clients/ApiVersions.java @@ -64,7 +64,7 @@ public synchronized byte maxUsableProduceMagic() { return maxUsableProduceMagic; } - // check all nodes are ZK Migration ready + // check if all nodes are ZK Migration ready public boolean isAllNodeZkMigrationReady() { return nodeApiVersions.values().stream().allMatch(ver -> ver.zkMigrationEnabled()); } diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java index 603c8f5fbec2..b88401c75bbc 100644 --- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java @@ -960,9 +960,9 @@ private void handleApiVersionsResponse(List responses, apiVersionsResponse.data().zkMigrationReady()); apiVersions.update(node, nodeVersionInfo); this.connectionStates.ready(node); - log.info("!!! Node {} has finalized features epoch: {}, finalized features: {}, supported features: {}, ZK migration ready: {}, API versions: {}.", + log.debug("Node {} has finalized features epoch: {}, finalized features: {}, supported features: {}, ZK migration ready: {}, API versions: {}.", node, apiVersionsResponse.data().finalizedFeaturesEpoch(), apiVersionsResponse.data().finalizedFeatures(), - apiVersionsResponse.data().supportedFeatures(), nodeVersionInfo); + apiVersionsResponse.data().supportedFeatures(), apiVersionsResponse.data().zkMigrationReady(), nodeVersionInfo); } /** @@ -1004,7 +1004,7 @@ private void handleInitiateApiVersionRequests(long now) { Map.Entry entry = iter.next(); String node = entry.getKey(); if (selector.isChannelReady(node) && inFlightRequests.canSendMore(node)) { - log.info("!!! Initiating API versions fetch from node {}.", node); + log.debug("Initiating API versions fetch from node {}.", node); // We transition the connection to the CHECKING_API_VERSIONS state only when // the ApiVersionsRequest is queued up to be sent out. Without this, the client // could remain in the CHECKING_API_VERSIONS state forever if the channel does diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java index 2c657d39c49c..cd0a2fa219dc 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java @@ -119,7 +119,8 @@ public static ApiVersionsResponse defaultApiVersionsResponse( return createApiVersionsResponse( throttleTimeMs, filterApis(RecordVersion.current(), listenerType, true), - Features.emptySupportedFeatures() + Features.emptySupportedFeatures(), + false ); } @@ -131,7 +132,8 @@ public static ApiVersionsResponse defaultApiVersionsResponse( return createApiVersionsResponse( throttleTimeMs, filterApis(RecordVersion.current(), listenerType, enableUnstableLastVersion), - Features.emptySupportedFeatures() + Features.emptySupportedFeatures(), + false ); } @@ -139,13 +141,14 @@ public static ApiVersionsResponse createApiVersionsResponse( int throttleTimeMs, ApiVersionCollection apiVersions ) { - return createApiVersionsResponse(throttleTimeMs, apiVersions, Features.emptySupportedFeatures()); + return createApiVersionsResponse(throttleTimeMs, apiVersions, Features.emptySupportedFeatures(), false); } public static ApiVersionsResponse createApiVersionsResponse( int throttleTimeMs, ApiVersionCollection apiVersions, - Features latestSupportedFeatures + Features latestSupportedFeatures, + boolean zkMigrationEnabled ) { return createApiVersionsResponse( throttleTimeMs, @@ -153,7 +156,7 @@ public static ApiVersionsResponse createApiVersionsResponse( latestSupportedFeatures, Collections.emptyMap(), UNKNOWN_FINALIZED_FEATURES_EPOCH, - false); + zkMigrationEnabled); } public static ApiVersionsResponse createApiVersionsResponse( diff --git a/clients/src/test/java/org/apache/kafka/clients/NodeApiVersionsTest.java b/clients/src/test/java/org/apache/kafka/clients/NodeApiVersionsTest.java index f379366ac160..6fa6d6242e2f 100644 --- a/clients/src/test/java/org/apache/kafka/clients/NodeApiVersionsTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/NodeApiVersionsTest.java @@ -39,7 +39,7 @@ public class NodeApiVersionsTest { @Test public void testUnsupportedVersionsToString() { - NodeApiVersions versions = new NodeApiVersions(new ApiVersionCollection(), Collections.emptyList()); + NodeApiVersions versions = new NodeApiVersions(new ApiVersionCollection(), Collections.emptyList(), false); StringBuilder bld = new StringBuilder(); String prefix = "("; for (ApiKeys apiKey : ApiKeys.zkBrokerApis()) { @@ -68,7 +68,7 @@ public void testVersionsToString() { .setMaxVersion((short) 10001)); } else versionList.add(ApiVersionsResponse.toApiVersion(apiKey)); } - NodeApiVersions versions = new NodeApiVersions(versionList, Collections.emptyList()); + NodeApiVersions versions = new NodeApiVersions(versionList, Collections.emptyList(), false); StringBuilder bld = new StringBuilder(); String prefix = "("; for (ApiKeys apiKey : ApiKeys.values()) { @@ -125,7 +125,7 @@ public void testLatestUsableVersionOutOfRangeHigh() { @Test public void testUsableVersionCalculationNoKnownVersions() { - NodeApiVersions versions = new NodeApiVersions(new ApiVersionCollection(), Collections.emptyList()); + NodeApiVersions versions = new NodeApiVersions(new ApiVersionCollection(), Collections.emptyList(), false); assertThrows(UnsupportedVersionException.class, () -> versions.latestUsableVersion(ApiKeys.FETCH)); } @@ -147,7 +147,7 @@ public void testUsableVersionLatestVersions(ApiMessageType.ListenerType scope) { .setApiKey((short) 100) .setMinVersion((short) 0) .setMaxVersion((short) 1)); - NodeApiVersions versions = new NodeApiVersions(versionList, Collections.emptyList()); + NodeApiVersions versions = new NodeApiVersions(versionList, Collections.emptyList(), false); for (ApiKeys apiKey: ApiKeys.apisForListener(scope)) { assertEquals(apiKey.latestVersion(), versions.latestUsableVersion(apiKey)); } @@ -157,7 +157,7 @@ public void testUsableVersionLatestVersions(ApiMessageType.ListenerType scope) { @EnumSource(ApiMessageType.ListenerType.class) public void testConstructionFromApiVersionsResponse(ApiMessageType.ListenerType scope) { ApiVersionsResponse apiVersionsResponse = ApiVersionsResponse.defaultApiVersionsResponse(scope); - NodeApiVersions versions = new NodeApiVersions(apiVersionsResponse.data().apiKeys(), Collections.emptyList()); + NodeApiVersions versions = new NodeApiVersions(apiVersionsResponse.data().apiKeys(), Collections.emptyList(), false); for (ApiVersion apiVersionKey : apiVersionsResponse.data().apiKeys()) { ApiVersion apiVersion = versions.apiVersion(ApiKeys.forId(apiVersionKey.apiKey())); diff --git a/config/kraft/controller.properties b/config/kraft/controller.properties index a465cce8c970..4c495a28c796 100644 --- a/config/kraft/controller.properties +++ b/config/kraft/controller.properties @@ -27,8 +27,7 @@ process.roles=controller node.id=1 # The connect string for the controller quorum -#controller.quorum.voters=1@localhost:9093 -controller.quorum.voters=1@localhost:9093,4@localhost:9094,5@localhost:9095 +controller.quorum.voters=1@localhost:9093 ############################# Socket Server Settings ############################# # The address the socket server listens on. diff --git a/core/src/main/scala/kafka/admin/BrokerApiVersionsCommand.scala b/core/src/main/scala/kafka/admin/BrokerApiVersionsCommand.scala index 990ac6f9448d..c882a88cc1a0 100644 --- a/core/src/main/scala/kafka/admin/BrokerApiVersionsCommand.scala +++ b/core/src/main/scala/kafka/admin/BrokerApiVersionsCommand.scala @@ -158,7 +158,7 @@ object BrokerApiVersionsCommand { private def getNodeApiVersions(node: Node): NodeApiVersions = { val response = send(node, new ApiVersionsRequest.Builder()).asInstanceOf[ApiVersionsResponse] Errors.forCode(response.data.errorCode).maybeThrow() - new NodeApiVersions(response.data.apiKeys, response.data.supportedFeatures) + new NodeApiVersions(response.data.apiKeys, response.data.supportedFeatures, response.data.zkMigrationReady) } /** diff --git a/core/src/main/scala/kafka/server/ApiVersionManager.scala b/core/src/main/scala/kafka/server/ApiVersionManager.scala index 21e96b69f049..0204d94c976f 100644 --- a/core/src/main/scala/kafka/server/ApiVersionManager.scala +++ b/core/src/main/scala/kafka/server/ApiVersionManager.scala @@ -65,7 +65,8 @@ class SimpleApiVersionManager( def this( listenerType: ListenerType, - enableUnstableLastVersion: Boolean + enableUnstableLastVersion: Boolean, + zkMigrationEnabled: Boolean ) = { this( listenerType, @@ -79,7 +80,7 @@ class SimpleApiVersionManager( private val apiVersions = ApiVersionsResponse.collectApis(enabledApis.asJava, enableUnstableLastVersion) override def apiVersionResponse(requestThrottleMs: Int): ApiVersionsResponse = { - ApiVersionsResponse.createApiVersionsResponse(requestThrottleMs, apiVersions, brokerFeatures) + ApiVersionsResponse.createApiVersionsResponse(requestThrottleMs, apiVersions, brokerFeatures, zkMigrationEnabled) } } @@ -89,7 +90,7 @@ class DefaultApiVersionManager( features: BrokerFeatures, metadataCache: MetadataCache, val enableUnstableLastVersion: Boolean, - val zkMigrationEnabled: Boolean + val zkMigrationEnabled: Boolean = false ) extends ApiVersionManager { val enabledApis = ApiKeys.apisForListener(listenerType).asScala diff --git a/core/src/main/scala/kafka/server/ControllerServer.scala b/core/src/main/scala/kafka/server/ControllerServer.scala index 1308795f77c6..4115e72ec9d4 100644 --- a/core/src/main/scala/kafka/server/ControllerServer.scala +++ b/core/src/main/scala/kafka/server/ControllerServer.scala @@ -175,7 +175,8 @@ class ControllerServer( val apiVersionManager = new SimpleApiVersionManager( ListenerType.CONTROLLER, - config.unstableApiVersionsEnabled + config.unstableApiVersionsEnabled, + config.migrationEnabled ) tokenCache = new DelegationTokenCache(ScramMechanism.mechanismNames) diff --git a/core/src/main/scala/kafka/tools/TestRaftServer.scala b/core/src/main/scala/kafka/tools/TestRaftServer.scala index e3e2a0e28262..2f480053a6a3 100644 --- a/core/src/main/scala/kafka/tools/TestRaftServer.scala +++ b/core/src/main/scala/kafka/tools/TestRaftServer.scala @@ -74,7 +74,7 @@ class TestRaftServer( tokenCache = new DelegationTokenCache(ScramMechanism.mechanismNames) credentialProvider = new CredentialProvider(ScramMechanism.mechanismNames, tokenCache) - val apiVersionManager = new SimpleApiVersionManager(ListenerType.CONTROLLER, true) + val apiVersionManager = new SimpleApiVersionManager(ListenerType.CONTROLLER, true, false) socketServer = new SocketServer(config, metrics, time, credentialProvider, apiVersionManager) val metaProperties = MetaProperties( diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala index 28b797f7c167..5bab866de3e2 100644 --- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala +++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala @@ -77,7 +77,7 @@ class SocketServerTest { // Clean-up any metrics left around by previous tests TestUtils.clearYammerMetrics() - private val apiVersionManager = new SimpleApiVersionManager(ListenerType.ZK_BROKER, true) + private val apiVersionManager = new SimpleApiVersionManager(ListenerType.ZK_BROKER, true, false) val server = new SocketServer(config, metrics, Time.SYSTEM, credentialProvider, apiVersionManager) server.enableRequestProcessing(Map.empty).get(1, TimeUnit.MINUTES) val sockets = new ArrayBuffer[Socket] diff --git a/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala b/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala index ffa56de63db3..e55c8fbaf130 100644 --- a/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala +++ b/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala @@ -154,7 +154,7 @@ class ControllerApisTest { new KafkaConfig(props), MetaProperties("JgxuGe9URy-E-ceaL04lEw", nodeId = nodeId), Seq.empty, - new SimpleApiVersionManager(ListenerType.CONTROLLER, true) + new SimpleApiVersionManager(ListenerType.CONTROLLER, true, false) ) } diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala index 69052491acc1..8007c873ccc6 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala @@ -184,7 +184,7 @@ class KafkaApisTest { } else { ApiKeys.apisForListener(listenerType).asScala.toSet } - val apiVersionManager = new SimpleApiVersionManager(listenerType, enabledApis, BrokerFeatures.defaultSupportedFeatures(), true) + val apiVersionManager = new SimpleApiVersionManager(listenerType, enabledApis, BrokerFeatures.defaultSupportedFeatures(), true, false) new KafkaApis( requestChannel = requestChannel, diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/KRaftMetadataRequestBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/KRaftMetadataRequestBenchmark.java index 8e997385fc36..25e182b895c3 100644 --- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/KRaftMetadataRequestBenchmark.java +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/KRaftMetadataRequestBenchmark.java @@ -197,7 +197,7 @@ private KafkaApis createKafkaApis() { setClusterId("clusterId"). setTime(Time.SYSTEM). setTokenManager(null). - setApiVersionManager(new SimpleApiVersionManager(ApiMessageType.ListenerType.BROKER, false)). + setApiVersionManager(new SimpleApiVersionManager(ApiMessageType.ListenerType.BROKER, false, false)). build(); } diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java index 86c7ff01f412..1685105e9d11 100644 --- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java @@ -199,7 +199,7 @@ private KafkaApis createKafkaApis() { setClusterId("clusterId"). setTime(Time.SYSTEM). setTokenManager(null). - setApiVersionManager(new SimpleApiVersionManager(ApiMessageType.ListenerType.ZK_BROKER, false)). + setApiVersionManager(new SimpleApiVersionManager(ApiMessageType.ListenerType.ZK_BROKER, false, false)). build(); } diff --git a/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java index 96066ffc99b6..fad3a4bd9773 100644 --- a/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java +++ b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java @@ -403,7 +403,7 @@ public void run() throws Exception { switch (migrationState) { case WAIT_FOR_CONTROLLER_QUORUM: if (isControllerQuorumReadyForMigration()) { - log.debug("Controller Quorum is ready for Zk to KRaft migration"); + log.info("Controller Quorum is ready for Zk to KRaft migration"); // Note that leadership would not change here. Hence we do not need to // `apply` any leadership state change. transitionTo(MigrationDriverState.WAIT_FOR_BROKERS); diff --git a/metadata/src/test/java/org/apache/kafka/controller/QuorumFeaturesTest.java b/metadata/src/test/java/org/apache/kafka/controller/QuorumFeaturesTest.java index 7cd6e6c5cbd5..07b464f7d4c8 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/QuorumFeaturesTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/QuorumFeaturesTest.java @@ -89,7 +89,7 @@ private static NodeApiVersions nodeApiVersions(List> setMinVersion(entry.getValue().min()). setMaxVersion(entry.getValue().max())); }); - return new NodeApiVersions(Collections.emptyList(), features); + return new NodeApiVersions(Collections.emptyList(), features, false); } @Test diff --git a/metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationDriverTest.java b/metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationDriverTest.java index 0fb5d8c6fcb6..be12e1cc8e57 100644 --- a/metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationDriverTest.java +++ b/metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationDriverTest.java @@ -283,7 +283,8 @@ public void testOnlySendNeededRPCsToBrokers() throws Exception { migrationClient, metadataPropagator, metadataPublisher -> { }, - new MockFaultHandler("test") + new MockFaultHandler("test"), + null ); MetadataImage image = MetadataImage.EMPTY; @@ -365,7 +366,8 @@ public ZkMigrationLeadershipState claimControllerLeadership(ZkMigrationLeadershi migrationClient, metadataPropagator, metadataPublisher -> { }, - faultHandler + faultHandler, + null )) { MetadataImage image = MetadataImage.EMPTY; MetadataDelta delta = new MetadataDelta(image); From 019e27674bb86f5752954f22766ee1b6d0e52df5 Mon Sep 17 00:00:00 2001 From: Luke Chen Date: Mon, 24 Apr 2023 20:43:08 +0800 Subject: [PATCH 4/8] KAFKA-14909: add tests --- .../apache/kafka/clients/ApiVersionsTest.java | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/clients/src/test/java/org/apache/kafka/clients/ApiVersionsTest.java b/clients/src/test/java/org/apache/kafka/clients/ApiVersionsTest.java index 89065536435c..ee11446f402b 100644 --- a/clients/src/test/java/org/apache/kafka/clients/ApiVersionsTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/ApiVersionsTest.java @@ -24,6 +24,8 @@ import java.util.Collections; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; public class ApiVersionsTest { @@ -55,4 +57,21 @@ public void testMaxUsableProduceMagicWithRaftController() { .setMaxVersion((short) 2)))); assertEquals(RecordBatch.CURRENT_MAGIC_VALUE, apiVersions.maxUsableProduceMagic()); } + + @Test + public void testZkMigrationReady() { + ApiVersions apiVersions = new ApiVersions(); + + apiVersions.update("0", NodeApiVersions.create()); + assertFalse(apiVersions.isAllNodeZkMigrationReady()); + + apiVersions.update("0", new NodeApiVersions(Collections.emptyList(), Collections.emptyList(), true)); + assertTrue(apiVersions.isAllNodeZkMigrationReady()); + + apiVersions.update("1", new NodeApiVersions(Collections.emptyList(), Collections.emptyList(), true)); + assertTrue(apiVersions.isAllNodeZkMigrationReady()); + + apiVersions.update("2", NodeApiVersions.create()); + assertFalse(apiVersions.isAllNodeZkMigrationReady()); + } } From c5f490bf99439b36ea774e95bf9b9b67c8682733 Mon Sep 17 00:00:00 2001 From: Luke Chen Date: Tue, 25 Apr 2023 13:29:28 +0800 Subject: [PATCH 5/8] refctor and add more tests --- checkstyle/import-control-metadata.xml | 3 + .../org/apache/kafka/clients/ApiVersions.java | 4 - .../apache/kafka/clients/ApiVersionsTest.java | 19 -- config/kraft/controller.properties | 1 + .../scala/kafka/server/ControllerServer.scala | 2 +- .../kafka/controller/QuorumFeatures.java | 23 +++ .../migration/KRaftMigrationDriver.java | 33 ++- .../kafka/controller/QuorumFeaturesTest.java | 26 +++ .../migration/KRaftMigrationDriverTest.java | 193 +++++++++++++----- 9 files changed, 216 insertions(+), 88 deletions(-) diff --git a/checkstyle/import-control-metadata.xml b/checkstyle/import-control-metadata.xml index f803edaf3b61..8ec45c5941a3 100644 --- a/checkstyle/import-control-metadata.xml +++ b/checkstyle/import-control-metadata.xml @@ -153,6 +153,9 @@ + + + diff --git a/clients/src/main/java/org/apache/kafka/clients/ApiVersions.java b/clients/src/main/java/org/apache/kafka/clients/ApiVersions.java index 2ae7e5484a3f..a09d58166b36 100644 --- a/clients/src/main/java/org/apache/kafka/clients/ApiVersions.java +++ b/clients/src/main/java/org/apache/kafka/clients/ApiVersions.java @@ -64,8 +64,4 @@ public synchronized byte maxUsableProduceMagic() { return maxUsableProduceMagic; } - // check if all nodes are ZK Migration ready - public boolean isAllNodeZkMigrationReady() { - return nodeApiVersions.values().stream().allMatch(ver -> ver.zkMigrationEnabled()); - } } diff --git a/clients/src/test/java/org/apache/kafka/clients/ApiVersionsTest.java b/clients/src/test/java/org/apache/kafka/clients/ApiVersionsTest.java index ee11446f402b..89065536435c 100644 --- a/clients/src/test/java/org/apache/kafka/clients/ApiVersionsTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/ApiVersionsTest.java @@ -24,8 +24,6 @@ import java.util.Collections; import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertTrue; public class ApiVersionsTest { @@ -57,21 +55,4 @@ public void testMaxUsableProduceMagicWithRaftController() { .setMaxVersion((short) 2)))); assertEquals(RecordBatch.CURRENT_MAGIC_VALUE, apiVersions.maxUsableProduceMagic()); } - - @Test - public void testZkMigrationReady() { - ApiVersions apiVersions = new ApiVersions(); - - apiVersions.update("0", NodeApiVersions.create()); - assertFalse(apiVersions.isAllNodeZkMigrationReady()); - - apiVersions.update("0", new NodeApiVersions(Collections.emptyList(), Collections.emptyList(), true)); - assertTrue(apiVersions.isAllNodeZkMigrationReady()); - - apiVersions.update("1", new NodeApiVersions(Collections.emptyList(), Collections.emptyList(), true)); - assertTrue(apiVersions.isAllNodeZkMigrationReady()); - - apiVersions.update("2", NodeApiVersions.create()); - assertFalse(apiVersions.isAllNodeZkMigrationReady()); - } } diff --git a/config/kraft/controller.properties b/config/kraft/controller.properties index 4c495a28c796..9e8ad62054e2 100644 --- a/config/kraft/controller.properties +++ b/config/kraft/controller.properties @@ -28,6 +28,7 @@ node.id=1 # The connect string for the controller quorum controller.quorum.voters=1@localhost:9093 + ############################# Socket Server Settings ############################# # The address the socket server listens on. diff --git a/core/src/main/scala/kafka/server/ControllerServer.scala b/core/src/main/scala/kafka/server/ControllerServer.scala index 4115e72ec9d4..01f76b220a7c 100644 --- a/core/src/main/scala/kafka/server/ControllerServer.scala +++ b/core/src/main/scala/kafka/server/ControllerServer.scala @@ -272,7 +272,7 @@ class ControllerServer( fatal = false, () => {} ), - sharedServer.raftManager.apiVersions + quorumFeatures ) migrationDriver.start() migrationSupport = Some(ControllerMigrationSupport(zkClient, migrationDriver, propagator)) diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumFeatures.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumFeatures.java index 58a7aea2af39..b4a25df89fc9 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/QuorumFeatures.java +++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumFeatures.java @@ -128,4 +128,27 @@ VersionRange localSupportedFeature(String featureName) { boolean isControllerId(int nodeId) { return quorumNodeIds.contains(nodeId); } + + // check if all controller nodes are ZK Migration ready + public boolean isAllControllersZkMigrationReady() { + List missingApiVers = new ArrayList<>(); + List zkMigrationNotReady = new ArrayList<>(); + for (int id : quorumNodeIds) { + NodeApiVersions nodeVersions = apiVersions.get(Integer.toString(id)); + if (nodeVersions == null) { + missingApiVers.add(String.valueOf(id)); + } else if (!nodeVersions.zkMigrationEnabled()) { + zkMigrationNotReady.add(String.valueOf(id)); + } + } + + boolean isReady = missingApiVers.isEmpty() && zkMigrationNotReady.isEmpty(); + if (!isReady) { + String missingApiVersionMsg = missingApiVers.isEmpty() ? "" : "Missing apiVersion from nodes: " + missingApiVers; + String zkMigrationNotReadyMsg = zkMigrationNotReady.isEmpty() ? "" : "Nodes don't enable `zookeeper.metadata.migration.enable`: " + zkMigrationNotReady; + log.debug("Not all controller nodes ZK migration are ready. {}. {}", zkMigrationNotReadyMsg, missingApiVersionMsg); + } + + return isReady; + } } diff --git a/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java index fad3a4bd9773..23926df45dd1 100644 --- a/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java +++ b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java @@ -16,14 +16,13 @@ */ package org.apache.kafka.metadata.migration; -import org.apache.kafka.clients.ApiVersions; -import org.apache.kafka.clients.NodeApiVersions; import org.apache.kafka.common.acl.AccessControlEntry; import org.apache.kafka.common.metadata.ConfigRecord; import org.apache.kafka.common.metadata.MetadataRecordType; import org.apache.kafka.common.resource.ResourcePattern; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Time; +import org.apache.kafka.controller.QuorumFeatures; import org.apache.kafka.image.MetadataDelta; import org.apache.kafka.image.MetadataImage; import org.apache.kafka.image.MetadataProvenance; @@ -83,7 +82,7 @@ public class KRaftMigrationDriver implements MetadataPublisher { private volatile MigrationDriverState migrationState; private volatile ZkMigrationLeadershipState migrationLeadershipState; private volatile MetadataImage image; - private volatile ApiVersions apiVersions; + private volatile QuorumFeatures quorumFeatures; public KRaftMigrationDriver( int nodeId, @@ -92,13 +91,14 @@ public KRaftMigrationDriver( LegacyPropagator propagator, Consumer initialZkLoadHandler, FaultHandler faultHandler, - ApiVersions apiVersions + QuorumFeatures quorumFeatures, + Time time ) { this.nodeId = nodeId; this.zkRecordConsumer = zkRecordConsumer; this.zkMigrationClient = zkMigrationClient; this.propagator = propagator; - this.time = Time.SYSTEM; + this.time = time; LogContext logContext = new LogContext("[KRaftMigrationDriver id=" + nodeId + "] "); this.log = logContext.logger(KRaftMigrationDriver.class); this.migrationState = MigrationDriverState.UNINITIALIZED; @@ -108,9 +108,22 @@ public KRaftMigrationDriver( this.leaderAndEpoch = LeaderAndEpoch.UNKNOWN; this.initialZkLoadHandler = initialZkLoadHandler; this.faultHandler = faultHandler; - this.apiVersions = apiVersions; + this.quorumFeatures = quorumFeatures; } + public KRaftMigrationDriver( + int nodeId, + ZkRecordConsumer zkRecordConsumer, + MigrationClient zkMigrationClient, + LegacyPropagator propagator, + Consumer initialZkLoadHandler, + FaultHandler faultHandler, + QuorumFeatures quorumFeatures + ) { + this(nodeId, zkRecordConsumer, zkMigrationClient, propagator, initialZkLoadHandler, faultHandler, quorumFeatures, Time.SYSTEM); + } + + public void start() { eventQueue.prepend(new PollEvent()); } @@ -139,7 +152,11 @@ private void initializeMigrationState() { } private boolean isControllerQuorumReadyForMigration() { - return this.apiVersions.isAllNodeZkMigrationReady(); + if (!this.quorumFeatures.isAllControllersZkMigrationReady()) { + log.info("Still waiting for all controller nodes ready to begin the migration."); + return false; + } + return true; } private boolean imageDoesNotContainAllBrokers(MetadataImage image, Set brokerIds) { @@ -403,7 +420,7 @@ public void run() throws Exception { switch (migrationState) { case WAIT_FOR_CONTROLLER_QUORUM: if (isControllerQuorumReadyForMigration()) { - log.info("Controller Quorum is ready for Zk to KRaft migration"); + log.debug("Controller Quorum is ready for Zk to KRaft migration"); // Note that leadership would not change here. Hence we do not need to // `apply` any leadership state change. transitionTo(MigrationDriverState.WAIT_FOR_BROKERS); diff --git a/metadata/src/test/java/org/apache/kafka/controller/QuorumFeaturesTest.java b/metadata/src/test/java/org/apache/kafka/controller/QuorumFeaturesTest.java index 07b464f7d4c8..550f3f630e2e 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/QuorumFeaturesTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/QuorumFeaturesTest.java @@ -100,4 +100,30 @@ public void testIsControllerId() { assertTrue(quorumFeatures.isControllerId(2)); assertFalse(quorumFeatures.isControllerId(3)); } + + @Test + public void testZkMigrationReady() { + ApiVersions apiVersions = new ApiVersions(); + QuorumFeatures quorumFeatures = new QuorumFeatures(0, apiVersions, LOCAL, Arrays.asList(0, 1, 2)); + + // create apiVersion with zkMigrationEnabled flag set for node 0, the other 2 nodes have no apiVersions info + apiVersions.update("0", new NodeApiVersions(Collections.emptyList(), Collections.emptyList(), true)); + assertFalse(quorumFeatures.isAllControllersZkMigrationReady()); + + // create apiVersion with zkMigrationEnabled flag set for node 1, the other 1 node have no apiVersions info + apiVersions.update("1", new NodeApiVersions(Collections.emptyList(), Collections.emptyList(), true)); + assertFalse(quorumFeatures.isAllControllersZkMigrationReady()); + + // create apiVersion with zkMigrationEnabled flag disabled for node 2, should still be not ready + apiVersions.update("2", NodeApiVersions.create()); + assertFalse(quorumFeatures.isAllControllersZkMigrationReady()); + + // update zkMigrationEnabled flag to enabled for node 2, should be ready now + apiVersions.update("2", new NodeApiVersions(Collections.emptyList(), Collections.emptyList(), true)); + assertTrue(quorumFeatures.isAllControllersZkMigrationReady()); + + // create apiVersion with zkMigrationEnabled flag disabled for a non-controller, and expect we fill filter it out + apiVersions.update("3", NodeApiVersions.create()); + assertTrue(quorumFeatures.isAllControllersZkMigrationReady()); + } } diff --git a/metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationDriverTest.java b/metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationDriverTest.java index be12e1cc8e57..afa299d9023e 100644 --- a/metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationDriverTest.java +++ b/metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationDriverTest.java @@ -16,6 +16,9 @@ */ package org.apache.kafka.metadata.migration; +import org.apache.kafka.clients.ApiVersions; +import org.apache.kafka.clients.NodeApiVersions; +import org.apache.kafka.common.Node; import org.apache.kafka.common.Uuid; import org.apache.kafka.common.acl.AccessControlEntry; import org.apache.kafka.common.config.ConfigResource; @@ -23,6 +26,9 @@ import org.apache.kafka.common.metadata.ConfigRecord; import org.apache.kafka.common.metadata.RegisterBrokerRecord; import org.apache.kafka.common.resource.ResourcePattern; +import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.controller.QuorumFeatures; import org.apache.kafka.image.MetadataDelta; import org.apache.kafka.image.MetadataImage; import org.apache.kafka.image.MetadataProvenance; @@ -37,11 +43,13 @@ import org.apache.kafka.server.fault.MockFaultHandler; import org.apache.kafka.test.TestUtils; import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -54,7 +62,34 @@ import java.util.function.BiConsumer; import java.util.function.Consumer; +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static java.util.concurrent.TimeUnit.NANOSECONDS; + public class KRaftMigrationDriverTest { + List controllerNodes = Arrays.asList( + new Node(4, "host4", 0), + new Node(5, "host5", 0), + new Node(6, "host6", 0) + ); + ApiVersions apiVersions = new ApiVersions(); + QuorumFeatures quorumFeatures = QuorumFeatures.create(4, + apiVersions, + QuorumFeatures.defaultFeatureMap(), + controllerNodes); + Time mockTime = new MockTime(1) { + public long nanoseconds() { + // We poll the event for each 1 sec, make it happen for each 10 ms to speed up the test + return System.nanoTime() - NANOSECONDS.convert(990, MILLISECONDS); + } + }; + + @BeforeEach + public void setup() { + apiVersions.update("4", new NodeApiVersions(Collections.emptyList(), Collections.emptyList(), true)); + apiVersions.update("5", new NodeApiVersions(Collections.emptyList(), Collections.emptyList(), true)); + apiVersions.update("6", new NodeApiVersions(Collections.emptyList(), Collections.emptyList(), true)); + } + static class NoOpRecordConsumer implements ZkRecordConsumer { @Override public void beginMigration() { @@ -277,65 +312,65 @@ CompletableFuture enqueueMetadataChangeEventWithFuture( public void testOnlySendNeededRPCsToBrokers() throws Exception { CountingMetadataPropagator metadataPropagator = new CountingMetadataPropagator(); CapturingMigrationClient migrationClient = new CapturingMigrationClient(new HashSet<>(Arrays.asList(1, 2, 3))); - KRaftMigrationDriver driver = new KRaftMigrationDriver( + try (KRaftMigrationDriver driver = new KRaftMigrationDriver( 3000, new NoOpRecordConsumer(), migrationClient, metadataPropagator, metadataPublisher -> { }, new MockFaultHandler("test"), - null - ); - - MetadataImage image = MetadataImage.EMPTY; - MetadataDelta delta = new MetadataDelta(image); - - driver.start(); - delta.replay(zkBrokerRecord(1)); - delta.replay(zkBrokerRecord(2)); - delta.replay(zkBrokerRecord(3)); - MetadataProvenance provenance = new MetadataProvenance(100, 1, 1); - image = delta.apply(provenance); - - // Publish a delta with this node (3000) as the leader - LeaderAndEpoch newLeader = new LeaderAndEpoch(OptionalInt.of(3000), 1); - driver.onControllerChange(newLeader); - driver.onMetadataUpdate(delta, image, new LogDeltaManifest(provenance, newLeader, 1, 100, 42)); - - TestUtils.waitForCondition(() -> driver.migrationState().get(1, TimeUnit.MINUTES).equals(MigrationDriverState.DUAL_WRITE), - "Waiting for KRaftMigrationDriver to enter DUAL_WRITE state"); - - Assertions.assertEquals(1, metadataPropagator.images); - Assertions.assertEquals(0, metadataPropagator.deltas); - - delta = new MetadataDelta(image); - delta.replay(new ConfigRecord() - .setResourceType(ConfigResource.Type.BROKER.id()) - .setResourceName("1") - .setName("foo") - .setValue("bar")); - provenance = new MetadataProvenance(120, 1, 2); - image = delta.apply(provenance); - enqueueMetadataChangeEventWithFuture(driver, delta, image, provenance).get(1, TimeUnit.MINUTES); - - Assertions.assertEquals(1, migrationClient.capturedConfigs.size()); - Assertions.assertEquals(1, metadataPropagator.images); - Assertions.assertEquals(0, metadataPropagator.deltas); - - delta = new MetadataDelta(image); - delta.replay(new BrokerRegistrationChangeRecord() - .setBrokerId(1) - .setBrokerEpoch(0) - .setFenced(BrokerRegistrationFencingChange.NONE.value()) - .setInControlledShutdown(BrokerRegistrationInControlledShutdownChange.IN_CONTROLLED_SHUTDOWN.value())); - provenance = new MetadataProvenance(130, 1, 3); - image = delta.apply(provenance); - enqueueMetadataChangeEventWithFuture(driver, delta, image, provenance).get(1, TimeUnit.MINUTES); - - Assertions.assertEquals(1, metadataPropagator.images); - Assertions.assertEquals(1, metadataPropagator.deltas); - - driver.close(); + quorumFeatures, + mockTime + )) { + + MetadataImage image = MetadataImage.EMPTY; + MetadataDelta delta = new MetadataDelta(image); + + driver.start(); + delta.replay(zkBrokerRecord(1)); + delta.replay(zkBrokerRecord(2)); + delta.replay(zkBrokerRecord(3)); + MetadataProvenance provenance = new MetadataProvenance(100, 1, 1); + image = delta.apply(provenance); + + // Publish a delta with this node (3000) as the leader + LeaderAndEpoch newLeader = new LeaderAndEpoch(OptionalInt.of(3000), 1); + driver.onControllerChange(newLeader); + driver.onMetadataUpdate(delta, image, new LogDeltaManifest(provenance, newLeader, 1, 100, 42)); + + TestUtils.waitForCondition(() -> driver.migrationState().get(1, TimeUnit.MINUTES).equals(MigrationDriverState.DUAL_WRITE), + "Waiting for KRaftMigrationDriver to enter DUAL_WRITE state"); + + Assertions.assertEquals(1, metadataPropagator.images); + Assertions.assertEquals(0, metadataPropagator.deltas); + + delta = new MetadataDelta(image); + delta.replay(new ConfigRecord() + .setResourceType(ConfigResource.Type.BROKER.id()) + .setResourceName("1") + .setName("foo") + .setValue("bar")); + provenance = new MetadataProvenance(120, 1, 2); + image = delta.apply(provenance); + enqueueMetadataChangeEventWithFuture(driver, delta, image, provenance).get(1, TimeUnit.MINUTES); + + Assertions.assertEquals(1, migrationClient.capturedConfigs.size()); + Assertions.assertEquals(1, metadataPropagator.images); + Assertions.assertEquals(0, metadataPropagator.deltas); + + delta = new MetadataDelta(image); + delta.replay(new BrokerRegistrationChangeRecord() + .setBrokerId(1) + .setBrokerEpoch(0) + .setFenced(BrokerRegistrationFencingChange.NONE.value()) + .setInControlledShutdown(BrokerRegistrationInControlledShutdownChange.IN_CONTROLLED_SHUTDOWN.value())); + provenance = new MetadataProvenance(130, 1, 3); + image = delta.apply(provenance); + enqueueMetadataChangeEventWithFuture(driver, delta, image, provenance).get(1, TimeUnit.MINUTES); + + Assertions.assertEquals(1, metadataPropagator.images); + Assertions.assertEquals(1, metadataPropagator.deltas); + } } @ParameterizedTest @@ -367,7 +402,8 @@ public ZkMigrationLeadershipState claimControllerLeadership(ZkMigrationLeadershi metadataPropagator, metadataPublisher -> { }, faultHandler, - null + quorumFeatures, + mockTime )) { MetadataImage image = MetadataImage.EMPTY; MetadataDelta delta = new MetadataDelta(image); @@ -385,8 +421,8 @@ public ZkMigrationLeadershipState claimControllerLeadership(ZkMigrationLeadershi driver.onMetadataUpdate(delta, image, new LogDeltaManifest(provenance, new LeaderAndEpoch(OptionalInt.of(3000), 1), 1, 100, 42)); Assertions.assertTrue(claimLeaderAttempts.await(1, TimeUnit.MINUTES)); - TestUtils.waitForCondition(() -> driver.migrationState().get(1, TimeUnit.MINUTES).equals(MigrationDriverState.ZK_MIGRATION), - "Waiting for KRaftMigrationDriver to enter ZK_MIGRATION state"); + TestUtils.waitForCondition(() -> driver.migrationState().get(1, TimeUnit.MINUTES).equals(MigrationDriverState.DUAL_WRITE), + "Waiting for KRaftMigrationDriver to enter DUAL_WRITE state"); if (authException) { Assertions.assertEquals(MigrationClientAuthException.class, faultHandler.firstException().getCause().getClass()); @@ -395,4 +431,49 @@ public ZkMigrationLeadershipState claimControllerLeadership(ZkMigrationLeadershi } } } + + @Test + public void testShouldNotMoveToNextStateIfControllerNodesAreNotReadyToMigrate() throws Exception { + CountingMetadataPropagator metadataPropagator = new CountingMetadataPropagator(); + CapturingMigrationClient migrationClient = new CapturingMigrationClient(new HashSet<>(Arrays.asList(1))); + apiVersions.remove("6"); + + try (KRaftMigrationDriver driver = new KRaftMigrationDriver( + 3000, + new NoOpRecordConsumer(), + migrationClient, + metadataPropagator, + metadataPublisher -> { }, + new MockFaultHandler("test"), + quorumFeatures, + mockTime + )) { + + MetadataImage image = MetadataImage.EMPTY; + MetadataDelta delta = new MetadataDelta(image); + + driver.start(); + delta.replay(zkBrokerRecord(1)); + MetadataProvenance provenance = new MetadataProvenance(100, 1, 1); + image = delta.apply(provenance); + + // Publish a delta with this node (3000) as the leader + LeaderAndEpoch newLeader = new LeaderAndEpoch(OptionalInt.of(3000), 1); + driver.onControllerChange(newLeader); + driver.onMetadataUpdate(delta, image, new LogDeltaManifest(provenance, newLeader, 1, 100, 42)); + + // Current apiVersions are missing the controller node 6, should stay at WAIT_FOR_CONTROLLER_QUORUM state + TestUtils.waitForCondition(() -> driver.migrationState().get(1, TimeUnit.MINUTES).equals(MigrationDriverState.WAIT_FOR_CONTROLLER_QUORUM), + "Waiting for KRaftMigrationDriver to enter WAIT_FOR_CONTROLLER_QUORUM state"); + + // Current apiVersions of node 6 has no zkMigrationReady set, should still stay at WAIT_FOR_CONTROLLER_QUORUM state + apiVersions.update("6", NodeApiVersions.create()); + driver.migrationState().get(1, TimeUnit.MINUTES).equals(MigrationDriverState.WAIT_FOR_CONTROLLER_QUORUM); + + // all controller nodes are zkMigrationReady, should be able to move to next state + apiVersions.update("6", new NodeApiVersions(Collections.emptyList(), Collections.emptyList(), true)); + TestUtils.waitForCondition(() -> driver.migrationState().get(1, TimeUnit.MINUTES).equals(MigrationDriverState.DUAL_WRITE), + "Waiting for KRaftMigrationDriver to enter DUAL_WRITE state"); + } + } } \ No newline at end of file From a90e00b7499d2ba03348af93225183081ef66cfb Mon Sep 17 00:00:00 2001 From: Luke Chen Date: Wed, 26 Apr 2023 10:54:25 +0800 Subject: [PATCH 6/8] KAFKA-14909: refactor --- .../org/apache/kafka/controller/QuorumFeatures.java | 10 +++++----- .../metadata/migration/KRaftMigrationDriver.java | 6 ++++-- .../apache/kafka/controller/QuorumFeaturesTest.java | 13 ++++++++----- 3 files changed, 17 insertions(+), 12 deletions(-) diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumFeatures.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumFeatures.java index b4a25df89fc9..f9d2954873e8 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/QuorumFeatures.java +++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumFeatures.java @@ -130,7 +130,7 @@ boolean isControllerId(int nodeId) { } // check if all controller nodes are ZK Migration ready - public boolean isAllControllersZkMigrationReady() { + public Optional reasonAllControllersZkMigrationNotReady() { List missingApiVers = new ArrayList<>(); List zkMigrationNotReady = new ArrayList<>(); for (int id : quorumNodeIds) { @@ -144,11 +144,11 @@ public boolean isAllControllersZkMigrationReady() { boolean isReady = missingApiVers.isEmpty() && zkMigrationNotReady.isEmpty(); if (!isReady) { - String missingApiVersionMsg = missingApiVers.isEmpty() ? "" : "Missing apiVersion from nodes: " + missingApiVers; - String zkMigrationNotReadyMsg = zkMigrationNotReady.isEmpty() ? "" : "Nodes don't enable `zookeeper.metadata.migration.enable`: " + zkMigrationNotReady; - log.debug("Not all controller nodes ZK migration are ready. {}. {}", zkMigrationNotReadyMsg, missingApiVersionMsg); + String zkMigrationNotReadyMsg = zkMigrationNotReady.isEmpty() ? "" : "Nodes don't enable `zookeeper.metadata.migration.enable`: " + zkMigrationNotReady + "."; + String missingApiVersionMsg = missingApiVers.isEmpty() ? "" : " Missing apiVersion from nodes: " + missingApiVers; + return Optional.of(zkMigrationNotReadyMsg + missingApiVersionMsg); } - return isReady; + return Optional.empty(); } } diff --git a/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java index 23926df45dd1..468e0042d038 100644 --- a/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java +++ b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java @@ -46,6 +46,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; @@ -152,8 +153,9 @@ private void initializeMigrationState() { } private boolean isControllerQuorumReadyForMigration() { - if (!this.quorumFeatures.isAllControllersZkMigrationReady()) { - log.info("Still waiting for all controller nodes ready to begin the migration."); + Optional notReadyMsg = this.quorumFeatures.reasonAllControllersZkMigrationNotReady(); + if (notReadyMsg.isPresent()) { + log.info("Still waiting for all controller nodes ready to begin the migration. due to:" + notReadyMsg.get()); return false; } return true; diff --git a/metadata/src/test/java/org/apache/kafka/controller/QuorumFeaturesTest.java b/metadata/src/test/java/org/apache/kafka/controller/QuorumFeaturesTest.java index 550f3f630e2e..4e669aecafc0 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/QuorumFeaturesTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/QuorumFeaturesTest.java @@ -108,22 +108,25 @@ public void testZkMigrationReady() { // create apiVersion with zkMigrationEnabled flag set for node 0, the other 2 nodes have no apiVersions info apiVersions.update("0", new NodeApiVersions(Collections.emptyList(), Collections.emptyList(), true)); - assertFalse(quorumFeatures.isAllControllersZkMigrationReady()); + assertTrue(quorumFeatures.reasonAllControllersZkMigrationNotReady().isPresent()); + assertTrue(quorumFeatures.reasonAllControllersZkMigrationNotReady().get().contains("Missing apiVersion from nodes: [1, 2]")); // create apiVersion with zkMigrationEnabled flag set for node 1, the other 1 node have no apiVersions info apiVersions.update("1", new NodeApiVersions(Collections.emptyList(), Collections.emptyList(), true)); - assertFalse(quorumFeatures.isAllControllersZkMigrationReady()); + assertTrue(quorumFeatures.reasonAllControllersZkMigrationNotReady().isPresent()); + assertTrue(quorumFeatures.reasonAllControllersZkMigrationNotReady().get().contains("Missing apiVersion from nodes: [2]")); // create apiVersion with zkMigrationEnabled flag disabled for node 2, should still be not ready apiVersions.update("2", NodeApiVersions.create()); - assertFalse(quorumFeatures.isAllControllersZkMigrationReady()); + assertTrue(quorumFeatures.reasonAllControllersZkMigrationNotReady().isPresent()); + assertTrue(quorumFeatures.reasonAllControllersZkMigrationNotReady().get().contains("Nodes don't enable `zookeeper.metadata.migration.enable`: [2]")); // update zkMigrationEnabled flag to enabled for node 2, should be ready now apiVersions.update("2", new NodeApiVersions(Collections.emptyList(), Collections.emptyList(), true)); - assertTrue(quorumFeatures.isAllControllersZkMigrationReady()); + assertFalse(quorumFeatures.reasonAllControllersZkMigrationNotReady().isPresent()); // create apiVersion with zkMigrationEnabled flag disabled for a non-controller, and expect we fill filter it out apiVersions.update("3", NodeApiVersions.create()); - assertTrue(quorumFeatures.isAllControllersZkMigrationReady()); + assertFalse(quorumFeatures.reasonAllControllersZkMigrationNotReady().isPresent()); } } From 88b88308ac0206b58cdbeb17487c26e5be926763 Mon Sep 17 00:00:00 2001 From: Luke Chen Date: Thu, 27 Apr 2023 10:35:38 +0800 Subject: [PATCH 7/8] fix tests --- .../kafka/metadata/migration/KRaftMigrationDriver.java | 8 +++++--- .../metadata/migration/KRaftMigrationDriverTest.java | 1 + 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java index 6b293ed47b8e..44a67b222c87 100644 --- a/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java +++ b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java @@ -460,9 +460,11 @@ public void run() throws Exception { transitionTo(MigrationDriverState.INACTIVE); break; case PRE_MIGRATION: - // Base case when starting the migration - log.debug("Controller Quorum is ready for Zk to KRaft migration. Now waiting for ZK brokers."); - transitionTo(MigrationDriverState.WAIT_FOR_BROKERS); + if (isControllerQuorumReadyForMigration()) { + // Base case when starting the migration + log.debug("Controller Quorum is ready for Zk to KRaft migration. Now waiting for ZK brokers."); + transitionTo(MigrationDriverState.WAIT_FOR_BROKERS); + } break; case MIGRATION: if (!migrationLeadershipState.zkMigrationComplete()) { diff --git a/metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationDriverTest.java b/metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationDriverTest.java index e160cf8441a6..4bf58f99cc2b 100644 --- a/metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationDriverTest.java +++ b/metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationDriverTest.java @@ -470,6 +470,7 @@ public void testShouldNotMoveToNextStateIfControllerNodesAreNotReadyToMigrate() MetadataDelta delta = new MetadataDelta(image); driver.start(); + delta.replay(ZkMigrationState.PRE_MIGRATION.toRecord().message()); delta.replay(zkBrokerRecord(1)); MetadataProvenance provenance = new MetadataProvenance(100, 1, 1); image = delta.apply(provenance); From 427e6fdd43c12da1c80d97fa39c4f576ee96dac3 Mon Sep 17 00:00:00 2001 From: Luke Chen Date: Thu, 27 Apr 2023 15:19:56 +0800 Subject: [PATCH 8/8] fix test --- .../main/java/org/apache/kafka/controller/QuorumFeatures.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumFeatures.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumFeatures.java index f9d2954873e8..19431d2a06d7 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/QuorumFeatures.java +++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumFeatures.java @@ -134,6 +134,9 @@ public Optional reasonAllControllersZkMigrationNotReady() { List missingApiVers = new ArrayList<>(); List zkMigrationNotReady = new ArrayList<>(); for (int id : quorumNodeIds) { + if (nodeId == id) { + continue; // No need to check local node because the KraftMigrationDriver will be created only when migration config set + } NodeApiVersions nodeVersions = apiVersions.get(Integer.toString(id)); if (nodeVersions == null) { missingApiVers.add(String.valueOf(id));