From 4298fc656d1c91115a8d7fd61753adb01298b571 Mon Sep 17 00:00:00 2001
From: Luke Chen
Date: Mon, 24 Apr 2023 17:46:42 +0800
Subject: [PATCH] buildable

---
 .../java/org/apache/kafka/clients/ApiVersions.java  |  2 +-
 .../org/apache/kafka/clients/NetworkClient.java     |  6 +++---
 .../kafka/common/requests/ApiVersionsResponse.java  | 13 ++++++++-----
 .../apache/kafka/clients/NodeApiVersionsTest.java   | 10 +++++-----
 config/kraft/controller.properties                  |  3 +--
 .../kafka/admin/BrokerApiVersionsCommand.scala      |  2 +-
 .../main/scala/kafka/server/ApiVersionManager.scala |  7 ++++---
 .../main/scala/kafka/server/ControllerServer.scala  |  3 ++-
 .../src/main/scala/kafka/tools/TestRaftServer.scala |  2 +-
 .../scala/unit/kafka/network/SocketServerTest.scala |  2 +-
 .../unit/kafka/server/ControllerApisTest.scala      |  2 +-
 .../scala/unit/kafka/server/KafkaApisTest.scala     |  2 +-
 .../jmh/metadata/KRaftMetadataRequestBenchmark.java |  2 +-
 .../jmh/metadata/MetadataRequestBenchmark.java      |  2 +-
 .../metadata/migration/KRaftMigrationDriver.java    |  2 +-
 .../apache/kafka/controller/QuorumFeaturesTest.java |  2 +-
 .../migration/KRaftMigrationDriverTest.java         |  6 ++++--
 17 files changed, 37 insertions(+), 31 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/ApiVersions.java b/clients/src/main/java/org/apache/kafka/clients/ApiVersions.java
index 92aa153de6fe..2ae7e5484a3f 100644
--- a/clients/src/main/java/org/apache/kafka/clients/ApiVersions.java
+++ b/clients/src/main/java/org/apache/kafka/clients/ApiVersions.java
@@ -64,7 +64,7 @@ public synchronized byte maxUsableProduceMagic() {
         return maxUsableProduceMagic;
     }
 
-    // check all nodes are ZK Migration ready
+    // check if all nodes are ZK Migration ready
     public boolean isAllNodeZkMigrationReady() {
         return nodeApiVersions.values().stream().allMatch(ver -> ver.zkMigrationEnabled());
     }
diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index 603c8f5fbec2..b88401c75bbc 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -960,9 +960,9 @@ private void handleApiVersionsResponse(List<ClientResponse> responses,
             apiVersionsResponse.data().zkMigrationReady());
         apiVersions.update(node, nodeVersionInfo);
         this.connectionStates.ready(node);
-        log.info("!!! Node {} has finalized features epoch: {}, finalized features: {}, supported features: {}, ZK migration ready: {}, API versions: {}.",
+        log.debug("Node {} has finalized features epoch: {}, finalized features: {}, supported features: {}, ZK migration ready: {}, API versions: {}.",
             node, apiVersionsResponse.data().finalizedFeaturesEpoch(), apiVersionsResponse.data().finalizedFeatures(),
-            apiVersionsResponse.data().supportedFeatures(), nodeVersionInfo);
+            apiVersionsResponse.data().supportedFeatures(), apiVersionsResponse.data().zkMigrationReady(), nodeVersionInfo);
     }
 
     /**
@@ -1004,7 +1004,7 @@ private void handleInitiateApiVersionRequests(long now) {
             Map.Entry<String, ApiVersionsRequest.Builder> entry = iter.next();
             String node = entry.getKey();
             if (selector.isChannelReady(node) && inFlightRequests.canSendMore(node)) {
-                log.info("!!! Initiating API versions fetch from node {}.", node);
+                log.debug("Initiating API versions fetch from node {}.", node);
                 // We transition the connection to the CHECKING_API_VERSIONS state only when
                 // the ApiVersionsRequest is queued up to be sent out. Without this, the client
                 // could remain in the CHECKING_API_VERSIONS state forever if the channel does
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
index 2c657d39c49c..cd0a2fa219dc 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
@@ -119,7 +119,8 @@ public static ApiVersionsResponse defaultApiVersionsResponse(
         return createApiVersionsResponse(
             throttleTimeMs,
             filterApis(RecordVersion.current(), listenerType, true),
-            Features.emptySupportedFeatures()
+            Features.emptySupportedFeatures(),
+            false
         );
     }
 
@@ -131,7 +132,8 @@ public static ApiVersionsResponse defaultApiVersionsResponse(
         return createApiVersionsResponse(
             throttleTimeMs,
             filterApis(RecordVersion.current(), listenerType, enableUnstableLastVersion),
-            Features.emptySupportedFeatures()
+            Features.emptySupportedFeatures(),
+            false
         );
     }
 
@@ -139,13 +141,14 @@ public static ApiVersionsResponse createApiVersionsResponse(
         int throttleTimeMs,
         ApiVersionCollection apiVersions
     ) {
-        return createApiVersionsResponse(throttleTimeMs, apiVersions, Features.emptySupportedFeatures());
+        return createApiVersionsResponse(throttleTimeMs, apiVersions, Features.emptySupportedFeatures(), false);
     }
 
     public static ApiVersionsResponse createApiVersionsResponse(
         int throttleTimeMs,
         ApiVersionCollection apiVersions,
-        Features<SupportedVersionRange> latestSupportedFeatures
+        Features<SupportedVersionRange> latestSupportedFeatures,
+        boolean zkMigrationEnabled
     ) {
         return createApiVersionsResponse(
             throttleTimeMs,
@@ -153,7 +156,7 @@ public static ApiVersionsResponse createApiVersionsResponse(
             latestSupportedFeatures,
             Collections.emptyMap(),
             UNKNOWN_FINALIZED_FEATURES_EPOCH,
-            false);
+            zkMigrationEnabled);
     }
 
     public static ApiVersionsResponse createApiVersionsResponse(
diff --git a/clients/src/test/java/org/apache/kafka/clients/NodeApiVersionsTest.java b/clients/src/test/java/org/apache/kafka/clients/NodeApiVersionsTest.java
index f379366ac160..6fa6d6242e2f 100644
--- a/clients/src/test/java/org/apache/kafka/clients/NodeApiVersionsTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/NodeApiVersionsTest.java
@@ -39,7 +39,7 @@ public class NodeApiVersionsTest {
 
     @Test
     public void testUnsupportedVersionsToString() {
-        NodeApiVersions versions = new NodeApiVersions(new ApiVersionCollection(), Collections.emptyList());
+        NodeApiVersions versions = new NodeApiVersions(new ApiVersionCollection(), Collections.emptyList(), false);
         StringBuilder bld = new StringBuilder();
         String prefix = "(";
         for (ApiKeys apiKey : ApiKeys.zkBrokerApis()) {
@@ -68,7 +68,7 @@ public void testVersionsToString() {
                     .setMaxVersion((short) 10001));
             } else versionList.add(ApiVersionsResponse.toApiVersion(apiKey));
         }
-        NodeApiVersions versions = new NodeApiVersions(versionList, Collections.emptyList());
+        NodeApiVersions versions = new NodeApiVersions(versionList, Collections.emptyList(), false);
         StringBuilder bld = new StringBuilder();
         String prefix = "(";
         for (ApiKeys apiKey : ApiKeys.values()) {
@@ -125,7 +125,7 @@ public void testLatestUsableVersionOutOfRangeHigh() {
 
     @Test
     public void testUsableVersionCalculationNoKnownVersions() {
-        NodeApiVersions versions = new NodeApiVersions(new ApiVersionCollection(), Collections.emptyList());
+        NodeApiVersions versions = new NodeApiVersions(new ApiVersionCollection(), Collections.emptyList(), false);
         assertThrows(UnsupportedVersionException.class,
             () -> versions.latestUsableVersion(ApiKeys.FETCH));
     }
@@ -147,7 +147,7 @@ public void testUsableVersionLatestVersions(ApiMessageType.ListenerType scope) {
                 .setApiKey((short) 100)
                 .setMinVersion((short) 0)
                 .setMaxVersion((short) 1));
-        NodeApiVersions versions = new NodeApiVersions(versionList, Collections.emptyList());
+        NodeApiVersions versions = new NodeApiVersions(versionList, Collections.emptyList(), false);
         for (ApiKeys apiKey: ApiKeys.apisForListener(scope)) {
             assertEquals(apiKey.latestVersion(), versions.latestUsableVersion(apiKey));
         }
     }
@@ -157,7 +157,7 @@ public void testUsableVersionLatestVersions(ApiMessageType.ListenerType scope) {
     @ParameterizedTest
     @EnumSource(ApiMessageType.ListenerType.class)
     public void testConstructionFromApiVersionsResponse(ApiMessageType.ListenerType scope) {
         ApiVersionsResponse apiVersionsResponse = ApiVersionsResponse.defaultApiVersionsResponse(scope);
-        NodeApiVersions versions = new NodeApiVersions(apiVersionsResponse.data().apiKeys(), Collections.emptyList());
+        NodeApiVersions versions = new NodeApiVersions(apiVersionsResponse.data().apiKeys(), Collections.emptyList(), false);
 
         for (ApiVersion apiVersionKey : apiVersionsResponse.data().apiKeys()) {
             ApiVersion apiVersion = versions.apiVersion(ApiKeys.forId(apiVersionKey.apiKey()));
diff --git a/config/kraft/controller.properties b/config/kraft/controller.properties
index a465cce8c970..4c495a28c796 100644
--- a/config/kraft/controller.properties
+++ b/config/kraft/controller.properties
@@ -27,8 +27,7 @@ process.roles=controller
 node.id=1
 
 # The connect string for the controller quorum
-#controller.quorum.voters=1@localhost:9093
-controller.quorum.voters=1@localhost:9093,4@localhost:9094,5@localhost:9095
+controller.quorum.voters=1@localhost:9093
 
 ############################# Socket Server Settings #############################
 
 # The address the socket server listens on.
diff --git a/core/src/main/scala/kafka/admin/BrokerApiVersionsCommand.scala b/core/src/main/scala/kafka/admin/BrokerApiVersionsCommand.scala
index 990ac6f9448d..c882a88cc1a0 100644
--- a/core/src/main/scala/kafka/admin/BrokerApiVersionsCommand.scala
+++ b/core/src/main/scala/kafka/admin/BrokerApiVersionsCommand.scala
@@ -158,7 +158,7 @@ object BrokerApiVersionsCommand {
     private def getNodeApiVersions(node: Node): NodeApiVersions = {
       val response = send(node, new ApiVersionsRequest.Builder()).asInstanceOf[ApiVersionsResponse]
       Errors.forCode(response.data.errorCode).maybeThrow()
-      new NodeApiVersions(response.data.apiKeys, response.data.supportedFeatures)
+      new NodeApiVersions(response.data.apiKeys, response.data.supportedFeatures, response.data.zkMigrationReady)
     }
 
     /**
diff --git a/core/src/main/scala/kafka/server/ApiVersionManager.scala b/core/src/main/scala/kafka/server/ApiVersionManager.scala
index 21e96b69f049..0204d94c976f 100644
--- a/core/src/main/scala/kafka/server/ApiVersionManager.scala
+++ b/core/src/main/scala/kafka/server/ApiVersionManager.scala
@@ -65,7 +65,8 @@ class SimpleApiVersionManager(
 
   def this(
     listenerType: ListenerType,
-    enableUnstableLastVersion: Boolean
+    enableUnstableLastVersion: Boolean,
+    zkMigrationEnabled: Boolean
   ) = {
     this(
       listenerType,
@@ -79,7 +80,7 @@ class SimpleApiVersionManager(
   private val apiVersions = ApiVersionsResponse.collectApis(enabledApis.asJava, enableUnstableLastVersion)
 
   override def apiVersionResponse(requestThrottleMs: Int): ApiVersionsResponse = {
-    ApiVersionsResponse.createApiVersionsResponse(requestThrottleMs, apiVersions, brokerFeatures)
+    ApiVersionsResponse.createApiVersionsResponse(requestThrottleMs, apiVersions, brokerFeatures, zkMigrationEnabled)
   }
 }
 
@@ -89,7 +90,7 @@ class DefaultApiVersionManager(
   features: BrokerFeatures,
   metadataCache: MetadataCache,
   val enableUnstableLastVersion: Boolean,
-  val zkMigrationEnabled: Boolean
+  val zkMigrationEnabled: Boolean = false
) extends ApiVersionManager {
 
   val enabledApis = ApiKeys.apisForListener(listenerType).asScala
diff --git a/core/src/main/scala/kafka/server/ControllerServer.scala b/core/src/main/scala/kafka/server/ControllerServer.scala
index 1308795f77c6..4115e72ec9d4 100644
--- a/core/src/main/scala/kafka/server/ControllerServer.scala
+++ b/core/src/main/scala/kafka/server/ControllerServer.scala
@@ -175,7 +175,8 @@ class ControllerServer(
 
       val apiVersionManager = new SimpleApiVersionManager(
         ListenerType.CONTROLLER,
-        config.unstableApiVersionsEnabled
+        config.unstableApiVersionsEnabled,
+        config.migrationEnabled
       )
 
       tokenCache = new DelegationTokenCache(ScramMechanism.mechanismNames)
diff --git a/core/src/main/scala/kafka/tools/TestRaftServer.scala b/core/src/main/scala/kafka/tools/TestRaftServer.scala
index e3e2a0e28262..2f480053a6a3 100644
--- a/core/src/main/scala/kafka/tools/TestRaftServer.scala
+++ b/core/src/main/scala/kafka/tools/TestRaftServer.scala
@@ -74,7 +74,7 @@ class TestRaftServer(
     tokenCache = new DelegationTokenCache(ScramMechanism.mechanismNames)
     credentialProvider = new CredentialProvider(ScramMechanism.mechanismNames, tokenCache)
 
-    val apiVersionManager = new SimpleApiVersionManager(ListenerType.CONTROLLER, true)
+    val apiVersionManager = new SimpleApiVersionManager(ListenerType.CONTROLLER, true, false)
     socketServer = new SocketServer(config, metrics, time, credentialProvider, apiVersionManager)
 
     val metaProperties = MetaProperties(
diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
index 28b797f7c167..5bab866de3e2 100644
--- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
+++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
@@ -77,7 +77,7 @@ class SocketServerTest {
   // Clean-up any metrics left around by previous tests
   TestUtils.clearYammerMetrics()
 
-  private val apiVersionManager = new SimpleApiVersionManager(ListenerType.ZK_BROKER, true)
+  private val apiVersionManager = new SimpleApiVersionManager(ListenerType.ZK_BROKER, true, false)
   val server = new SocketServer(config, metrics, Time.SYSTEM, credentialProvider, apiVersionManager)
   server.enableRequestProcessing(Map.empty).get(1, TimeUnit.MINUTES)
   val sockets = new ArrayBuffer[Socket]
diff --git a/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala b/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
index ffa56de63db3..e55c8fbaf130 100644
--- a/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
@@ -154,7 +154,7 @@ class ControllerApisTest {
       new KafkaConfig(props),
       MetaProperties("JgxuGe9URy-E-ceaL04lEw", nodeId = nodeId),
       Seq.empty,
-      new SimpleApiVersionManager(ListenerType.CONTROLLER, true)
+      new SimpleApiVersionManager(ListenerType.CONTROLLER, true, false)
     )
   }
 
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 69052491acc1..8007c873ccc6 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -184,7 +184,7 @@ class KafkaApisTest {
     } else {
       ApiKeys.apisForListener(listenerType).asScala.toSet
     }
-    val apiVersionManager = new SimpleApiVersionManager(listenerType, enabledApis, BrokerFeatures.defaultSupportedFeatures(), true)
+    val apiVersionManager = new SimpleApiVersionManager(listenerType, enabledApis, BrokerFeatures.defaultSupportedFeatures(), true, false)
 
     new KafkaApis(
       requestChannel = requestChannel,
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/KRaftMetadataRequestBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/KRaftMetadataRequestBenchmark.java
index 8e997385fc36..25e182b895c3 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/KRaftMetadataRequestBenchmark.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/KRaftMetadataRequestBenchmark.java
@@ -197,7 +197,7 @@ private KafkaApis createKafkaApis() {
             setClusterId("clusterId").
             setTime(Time.SYSTEM).
             setTokenManager(null).
-            setApiVersionManager(new SimpleApiVersionManager(ApiMessageType.ListenerType.BROKER, false)).
+            setApiVersionManager(new SimpleApiVersionManager(ApiMessageType.ListenerType.BROKER, false, false)).
             build();
     }
 
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java
index 86c7ff01f412..1685105e9d11 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java
@@ -199,7 +199,7 @@ private KafkaApis createKafkaApis() {
             setClusterId("clusterId").
             setTime(Time.SYSTEM).
             setTokenManager(null).
-            setApiVersionManager(new SimpleApiVersionManager(ApiMessageType.ListenerType.ZK_BROKER, false)).
+            setApiVersionManager(new SimpleApiVersionManager(ApiMessageType.ListenerType.ZK_BROKER, false, false)).
             build();
     }
 
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java
index 96066ffc99b6..fad3a4bd9773 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java
@@ -403,7 +403,7 @@ public void run() throws Exception {
             switch (migrationState) {
                 case WAIT_FOR_CONTROLLER_QUORUM:
                     if (isControllerQuorumReadyForMigration()) {
-                        log.debug("Controller Quorum is ready for Zk to KRaft migration");
+                        log.info("Controller Quorum is ready for Zk to KRaft migration");
                         // Note that leadership would not change here. Hence we do not need to
                         // `apply` any leadership state change.
                         transitionTo(MigrationDriverState.WAIT_FOR_BROKERS);
diff --git a/metadata/src/test/java/org/apache/kafka/controller/QuorumFeaturesTest.java b/metadata/src/test/java/org/apache/kafka/controller/QuorumFeaturesTest.java
index 7cd6e6c5cbd5..07b464f7d4c8 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/QuorumFeaturesTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/QuorumFeaturesTest.java
@@ -89,7 +89,7 @@ private static NodeApiVersions nodeApiVersions(List<Map.Entry<String, VersionRange>>
                 setMinVersion(entry.getValue().min()).
                 setMaxVersion(entry.getValue().max()));
         });
-        return new NodeApiVersions(Collections.emptyList(), features);
+        return new NodeApiVersions(Collections.emptyList(), features, false);
     }
 
     @Test
diff --git a/metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationDriverTest.java b/metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationDriverTest.java
index 0fb5d8c6fcb6..be12e1cc8e57 100644
--- a/metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationDriverTest.java
+++ b/metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationDriverTest.java
@@ -283,7 +283,8 @@ public void testOnlySendNeededRPCsToBrokers() throws Exception {
             migrationClient,
             metadataPropagator,
             metadataPublisher -> { },
-            new MockFaultHandler("test")
+            new MockFaultHandler("test"),
+            null
         );
 
         MetadataImage image = MetadataImage.EMPTY;
@@ -365,7 +366,8 @@ public ZkMigrationLeadershipState claimControllerLeadership(ZkMigrationLeadershi
             migrationClient,
             metadataPropagator,
             metadataPublisher -> { },
-            faultHandler
+            faultHandler,
+            null
        )) {
            MetadataImage image = MetadataImage.EMPTY;
            MetadataDelta delta = new MetadataDelta(image);
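
Reviewer note (not part of the patch): the hunks above thread a single zkMigrationEnabled / zkMigrationReady flag from each node's ApiVersionsResponse into NodeApiVersions, so that ApiVersions.isAllNodeZkMigrationReady() can gate the ZK-to-KRaft migration. The sketch below is illustrative only; it uses just the constructor and method shapes visible in this diff, and the node ids "3000" and "3001" are made-up placeholders.

    import java.util.Collections;
    import org.apache.kafka.clients.ApiVersions;
    import org.apache.kafka.clients.NodeApiVersions;

    public class ZkMigrationReadinessSketch {
        public static void main(String[] args) {
            ApiVersions apiVersions = new ApiVersions();

            // One node advertised zkMigrationReady=true in its ApiVersionsResponse,
            // the other did not; the new third constructor argument carries that flag.
            apiVersions.update("3000", new NodeApiVersions(Collections.emptyList(), Collections.emptyList(), true));
            apiVersions.update("3001", new NodeApiVersions(Collections.emptyList(), Collections.emptyList(), false));

            // Prints "false": every known node must report readiness before the
            // migration driver may leave WAIT_FOR_CONTROLLER_QUORUM.
            System.out.println(apiVersions.isAllNodeZkMigrationReady());
        }
    }

In NetworkClient.handleApiVersionsResponse the flag is taken directly from apiVersionsResponse.data().zkMigrationReady(), so the check reflects whatever each node advertised on its most recent ApiVersions exchange.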