diff --git a/checkstyle/import-control-metadata.xml b/checkstyle/import-control-metadata.xml
index f803edaf3b61..8ec45c5941a3 100644
--- a/checkstyle/import-control-metadata.xml
+++ b/checkstyle/import-control-metadata.xml
@@ -153,6 +153,9 @@
+
+
+
diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index 433440ce9332..b88401c75bbc 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -956,12 +956,13 @@ private void handleApiVersionsResponse(List<ClientResponse> responses,
}
NodeApiVersions nodeVersionInfo = new NodeApiVersions(
apiVersionsResponse.data().apiKeys(),
- apiVersionsResponse.data().supportedFeatures());
+ apiVersionsResponse.data().supportedFeatures(),
+ apiVersionsResponse.data().zkMigrationReady());
apiVersions.update(node, nodeVersionInfo);
this.connectionStates.ready(node);
- log.debug("Node {} has finalized features epoch: {}, finalized features: {}, supported features: {}, API versions: {}.",
+ log.debug("Node {} has finalized features epoch: {}, finalized features: {}, supported features: {}, ZK migration ready: {}, API versions: {}.",
node, apiVersionsResponse.data().finalizedFeaturesEpoch(), apiVersionsResponse.data().finalizedFeatures(),
- apiVersionsResponse.data().supportedFeatures(), nodeVersionInfo);
+ apiVersionsResponse.data().supportedFeatures(), apiVersionsResponse.data().zkMigrationReady(), nodeVersionInfo);
}
/**
diff --git a/clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java b/clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java
index a3aaa88fee19..44c1fe4d8159 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java
@@ -48,6 +48,8 @@ public class NodeApiVersions {
private final Map<String, SupportedVersionRange> supportedFeatures;
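+ // Whether the node reports `zookeeper.metadata.migration.enable` as set, via the zkMigrationReady field of its ApiVersionsResponse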
+ private final boolean zkMigrationEnabled;
+
/**
* Create a NodeApiVersions object with the current ApiVersions.
*
@@ -76,7 +78,7 @@ public static NodeApiVersions create(Collection<ApiVersion> overrides) {
}
if (!exists) apiVersions.add(ApiVersionsResponse.toApiVersion(apiKey));
}
- return new NodeApiVersions(apiVersions, Collections.emptyList());
+ return new NodeApiVersions(apiVersions, Collections.emptyList(), false);
}
@@ -95,7 +97,7 @@ public static NodeApiVersions create(short apiKey, short minVersion, short maxVe
.setMaxVersion(maxVersion)));
}
- public NodeApiVersions(Collection<ApiVersion> nodeApiVersions, Collection<SupportedFeatureKey> nodeSupportedFeatures) {
+ public NodeApiVersions(Collection<ApiVersion> nodeApiVersions, Collection<SupportedFeatureKey> nodeSupportedFeatures, boolean zkMigrationEnabled) {
for (ApiVersion nodeApiVersion : nodeApiVersions) {
if (ApiKeys.hasId(nodeApiVersion.apiKey())) {
ApiKeys nodeApiKey = ApiKeys.forId(nodeApiVersion.apiKey());
@@ -112,6 +114,7 @@ public NodeApiVersions(Collection<ApiVersion> nodeApiVersions, Collection<SupportedFeatureKey> nodeSupportedFeatures, boolean zkMigrationEnabled) {
+ this.zkMigrationEnabled = zkMigrationEnabled;
}
@@ ... @@ public Map<ApiKeys, ApiVersion> allSupportedApiVersions() {
public Map<String, SupportedVersionRange> supportedFeatures() {
return supportedFeatures;
}
+
+ public boolean zkMigrationEnabled() {
+ return zkMigrationEnabled;
+ }
}
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
index ac4562bf8733..cd0a2fa219dc 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
@@ -119,7 +119,8 @@ public static ApiVersionsResponse defaultApiVersionsResponse(
return createApiVersionsResponse(
throttleTimeMs,
filterApis(RecordVersion.current(), listenerType, true),
- Features.emptySupportedFeatures()
+ Features.emptySupportedFeatures(),
+ false
);
}
@@ -131,7 +132,8 @@ public static ApiVersionsResponse defaultApiVersionsResponse(
return createApiVersionsResponse(
throttleTimeMs,
filterApis(RecordVersion.current(), listenerType, enableUnstableLastVersion),
- Features.emptySupportedFeatures()
+ Features.emptySupportedFeatures(),
+ false
);
}
@@ -139,20 +141,22 @@ public static ApiVersionsResponse createApiVersionsResponse(
int throttleTimeMs,
ApiVersionCollection apiVersions
) {
- return createApiVersionsResponse(throttleTimeMs, apiVersions, Features.emptySupportedFeatures());
+ return createApiVersionsResponse(throttleTimeMs, apiVersions, Features.emptySupportedFeatures(), false);
}
public static ApiVersionsResponse createApiVersionsResponse(
int throttleTimeMs,
ApiVersionCollection apiVersions,
- Features<SupportedVersionRange> latestSupportedFeatures
+ Features<SupportedVersionRange> latestSupportedFeatures,
+ boolean zkMigrationEnabled
) {
return createApiVersionsResponse(
throttleTimeMs,
apiVersions,
latestSupportedFeatures,
Collections.emptyMap(),
- UNKNOWN_FINALIZED_FEATURES_EPOCH);
+ UNKNOWN_FINALIZED_FEATURES_EPOCH,
+ zkMigrationEnabled);
}
public static ApiVersionsResponse createApiVersionsResponse(
@@ -163,7 +167,8 @@ public static ApiVersionsResponse createApiVersionsResponse(
long finalizedFeaturesEpoch,
NodeApiVersions controllerApiVersions,
ListenerType listenerType,
- boolean enableUnstableLastVersion
+ boolean enableUnstableLastVersion,
+ boolean zkMigrationEnabled
) {
ApiVersionCollection apiKeys;
if (controllerApiVersions != null) {
@@ -186,7 +191,8 @@ public static ApiVersionsResponse createApiVersionsResponse(
apiKeys,
latestSupportedFeatures,
finalizedFeatures,
- finalizedFeaturesEpoch
+ finalizedFeaturesEpoch,
+ zkMigrationEnabled
);
}
@@ -195,7 +201,8 @@ public static ApiVersionsResponse createApiVersionsResponse(
ApiVersionCollection apiVersions,
Features<SupportedVersionRange> latestSupportedFeatures,
Map<String, Short> finalizedFeatures,
- long finalizedFeaturesEpoch
+ long finalizedFeaturesEpoch,
+ boolean zkMigrationEnabled
) {
return new ApiVersionsResponse(
createApiVersionsResponseData(
@@ -204,7 +211,8 @@ public static ApiVersionsResponse createApiVersionsResponse(
apiVersions,
latestSupportedFeatures,
finalizedFeatures,
- finalizedFeaturesEpoch
+ finalizedFeaturesEpoch,
+ zkMigrationEnabled
)
);
}
@@ -294,7 +302,8 @@ private static ApiVersionsResponseData createApiVersionsResponseData(
final ApiVersionCollection apiKeys,
final Features<SupportedVersionRange> latestSupportedFeatures,
final Map<String, Short> finalizedFeatures,
- final long finalizedFeaturesEpoch
+ final long finalizedFeaturesEpoch,
+ final boolean zkMigrationEnabled
) {
final ApiVersionsResponseData data = new ApiVersionsResponseData();
data.setThrottleTimeMs(throttleTimeMs);
@@ -303,6 +312,7 @@ private static ApiVersionsResponseData createApiVersionsResponseData(
data.setSupportedFeatures(createSupportedFeatureKeys(latestSupportedFeatures));
data.setFinalizedFeatures(createFinalizedFeatureKeys(finalizedFeatures));
data.setFinalizedFeaturesEpoch(finalizedFeaturesEpoch);
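+ // Advertise this node's ZK migration readiness so peers (e.g. the KRaftMigrationDriver) can check quorum readiness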
+ data.setZkMigrationReady(zkMigrationEnabled);
return data;
}
diff --git a/clients/src/test/java/org/apache/kafka/clients/NodeApiVersionsTest.java b/clients/src/test/java/org/apache/kafka/clients/NodeApiVersionsTest.java
index f379366ac160..6fa6d6242e2f 100644
--- a/clients/src/test/java/org/apache/kafka/clients/NodeApiVersionsTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/NodeApiVersionsTest.java
@@ -39,7 +39,7 @@ public class NodeApiVersionsTest {
@Test
public void testUnsupportedVersionsToString() {
- NodeApiVersions versions = new NodeApiVersions(new ApiVersionCollection(), Collections.emptyList());
+ NodeApiVersions versions = new NodeApiVersions(new ApiVersionCollection(), Collections.emptyList(), false);
StringBuilder bld = new StringBuilder();
String prefix = "(";
for (ApiKeys apiKey : ApiKeys.zkBrokerApis()) {
@@ -68,7 +68,7 @@ public void testVersionsToString() {
.setMaxVersion((short) 10001));
} else versionList.add(ApiVersionsResponse.toApiVersion(apiKey));
}
- NodeApiVersions versions = new NodeApiVersions(versionList, Collections.emptyList());
+ NodeApiVersions versions = new NodeApiVersions(versionList, Collections.emptyList(), false);
StringBuilder bld = new StringBuilder();
String prefix = "(";
for (ApiKeys apiKey : ApiKeys.values()) {
@@ -125,7 +125,7 @@ public void testLatestUsableVersionOutOfRangeHigh() {
@Test
public void testUsableVersionCalculationNoKnownVersions() {
- NodeApiVersions versions = new NodeApiVersions(new ApiVersionCollection(), Collections.emptyList());
+ NodeApiVersions versions = new NodeApiVersions(new ApiVersionCollection(), Collections.emptyList(), false);
assertThrows(UnsupportedVersionException.class,
() -> versions.latestUsableVersion(ApiKeys.FETCH));
}
@@ -147,7 +147,7 @@ public void testUsableVersionLatestVersions(ApiMessageType.ListenerType scope) {
.setApiKey((short) 100)
.setMinVersion((short) 0)
.setMaxVersion((short) 1));
- NodeApiVersions versions = new NodeApiVersions(versionList, Collections.emptyList());
+ NodeApiVersions versions = new NodeApiVersions(versionList, Collections.emptyList(), false);
for (ApiKeys apiKey: ApiKeys.apisForListener(scope)) {
assertEquals(apiKey.latestVersion(), versions.latestUsableVersion(apiKey));
}
@@ -157,7 +157,7 @@ public void testUsableVersionLatestVersions(ApiMessageType.ListenerType scope) {
@EnumSource(ApiMessageType.ListenerType.class)
public void testConstructionFromApiVersionsResponse(ApiMessageType.ListenerType scope) {
ApiVersionsResponse apiVersionsResponse = ApiVersionsResponse.defaultApiVersionsResponse(scope);
- NodeApiVersions versions = new NodeApiVersions(apiVersionsResponse.data().apiKeys(), Collections.emptyList());
+ NodeApiVersions versions = new NodeApiVersions(apiVersionsResponse.data().apiKeys(), Collections.emptyList(), false);
for (ApiVersion apiVersionKey : apiVersionsResponse.data().apiKeys()) {
ApiVersion apiVersion = versions.apiVersion(ApiKeys.forId(apiVersionKey.apiKey()));
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index c6f886c5d539..27ae3471f4ae 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -634,7 +634,8 @@ private static ApiVersionsResponse prepareApiVersionsResponseForDescribeFeatures
ApiVersionsResponse.filterApis(RecordVersion.current(), ApiMessageType.ListenerType.ZK_BROKER),
convertSupportedFeaturesMap(defaultFeatureMetadata().supportedFeatures()),
Collections.singletonMap("test_feature_1", (short) 2),
- defaultFeatureMetadata().finalizedFeaturesEpoch().get()
+ defaultFeatureMetadata().finalizedFeaturesEpoch().get(),
+ false
);
}
return new ApiVersionsResponse(
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/ApiVersionsResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/ApiVersionsResponseTest.java
index b34304000ae6..f1931bc58823 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/ApiVersionsResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/ApiVersionsResponseTest.java
@@ -121,7 +121,8 @@ public void shouldCreateApiResponseOnlyWithKeysSupportedByMagicValue() {
ApiVersionsResponse.UNKNOWN_FINALIZED_FEATURES_EPOCH,
null,
ListenerType.ZK_BROKER,
- true
+ true,
+ false
);
verifyApiKeysForMagic(response, RecordBatch.MAGIC_VALUE_V1);
assertEquals(10, response.throttleTimeMs());
@@ -141,7 +142,8 @@ public void shouldReturnFeatureKeysWhenMagicIsCurrentValueAndThrottleMsIsDefault
10L,
null,
ListenerType.ZK_BROKER,
- true
+ true,
+ false
);
verifyApiKeysForMagic(response, RecordBatch.MAGIC_VALUE_V1);
@@ -169,7 +171,8 @@ public void shouldReturnAllKeysWhenMagicIsCurrentValueAndThrottleMsIsDefaultThro
ApiVersionsResponse.UNKNOWN_FINALIZED_FEATURES_EPOCH,
null,
ListenerType.ZK_BROKER,
- true
+ true,
+ false
);
assertEquals(new HashSet<>(ApiKeys.zkBrokerApis()), apiKeysInResponse(response));
assertEquals(AbstractResponse.DEFAULT_THROTTLE_TIME, response.throttleTimeMs());
@@ -188,7 +191,8 @@ public void testMetadataQuorumApisAreDisabled() {
ApiVersionsResponse.UNKNOWN_FINALIZED_FEATURES_EPOCH,
null,
ListenerType.ZK_BROKER,
- true
+ true,
+ false
);
// Ensure that APIs needed for the KRaft mode are not exposed through ApiVersions until we are ready for them
diff --git a/core/src/main/scala/kafka/admin/BrokerApiVersionsCommand.scala b/core/src/main/scala/kafka/admin/BrokerApiVersionsCommand.scala
index 990ac6f9448d..c882a88cc1a0 100644
--- a/core/src/main/scala/kafka/admin/BrokerApiVersionsCommand.scala
+++ b/core/src/main/scala/kafka/admin/BrokerApiVersionsCommand.scala
@@ -158,7 +158,7 @@ object BrokerApiVersionsCommand {
private def getNodeApiVersions(node: Node): NodeApiVersions = {
val response = send(node, new ApiVersionsRequest.Builder()).asInstanceOf[ApiVersionsResponse]
Errors.forCode(response.data.errorCode).maybeThrow()
- new NodeApiVersions(response.data.apiKeys, response.data.supportedFeatures)
+ new NodeApiVersions(response.data.apiKeys, response.data.supportedFeatures, response.data.zkMigrationReady)
}
/**
diff --git a/core/src/main/scala/kafka/server/ApiVersionManager.scala b/core/src/main/scala/kafka/server/ApiVersionManager.scala
index 92e6d25cecb3..0204d94c976f 100644
--- a/core/src/main/scala/kafka/server/ApiVersionManager.scala
+++ b/core/src/main/scala/kafka/server/ApiVersionManager.scala
@@ -49,7 +49,8 @@ object ApiVersionManager {
forwardingManager,
supportedFeatures,
metadataCache,
- config.unstableApiVersionsEnabled
+ config.unstableApiVersionsEnabled,
+ config.migrationEnabled
)
}
}
@@ -58,25 +59,28 @@ class SimpleApiVersionManager(
val listenerType: ListenerType,
val enabledApis: collection.Set[ApiKeys],
brokerFeatures: Features[SupportedVersionRange],
- val enableUnstableLastVersion: Boolean
+ val enableUnstableLastVersion: Boolean,
+ val zkMigrationEnabled: Boolean
) extends ApiVersionManager {
def this(
listenerType: ListenerType,
- enableUnstableLastVersion: Boolean
+ enableUnstableLastVersion: Boolean,
+ zkMigrationEnabled: Boolean
) = {
this(
listenerType,
ApiKeys.apisForListener(listenerType).asScala,
BrokerFeatures.defaultSupportedFeatures(),
- enableUnstableLastVersion
+ enableUnstableLastVersion,
+ zkMigrationEnabled
)
}
private val apiVersions = ApiVersionsResponse.collectApis(enabledApis.asJava, enableUnstableLastVersion)
override def apiVersionResponse(requestThrottleMs: Int): ApiVersionsResponse = {
- ApiVersionsResponse.createApiVersionsResponse(requestThrottleMs, apiVersions, brokerFeatures)
+ ApiVersionsResponse.createApiVersionsResponse(requestThrottleMs, apiVersions, brokerFeatures, zkMigrationEnabled)
}
}
@@ -85,7 +89,8 @@ class DefaultApiVersionManager(
forwardingManager: Option[ForwardingManager],
features: BrokerFeatures,
metadataCache: MetadataCache,
- val enableUnstableLastVersion: Boolean
+ val enableUnstableLastVersion: Boolean,
+ val zkMigrationEnabled: Boolean = false
) extends ApiVersionManager {
val enabledApis = ApiKeys.apisForListener(listenerType).asScala
@@ -103,7 +108,8 @@ class DefaultApiVersionManager(
finalizedFeatures.epoch,
controllerApiVersions.orNull,
listenerType,
- enableUnstableLastVersion
+ enableUnstableLastVersion,
+ zkMigrationEnabled
)
}
}
diff --git a/core/src/main/scala/kafka/server/ControllerServer.scala b/core/src/main/scala/kafka/server/ControllerServer.scala
index 022fd9d899ab..01f76b220a7c 100644
--- a/core/src/main/scala/kafka/server/ControllerServer.scala
+++ b/core/src/main/scala/kafka/server/ControllerServer.scala
@@ -175,7 +175,8 @@ class ControllerServer(
val apiVersionManager = new SimpleApiVersionManager(
ListenerType.CONTROLLER,
- config.unstableApiVersionsEnabled
+ config.unstableApiVersionsEnabled,
+ config.migrationEnabled
)
tokenCache = new DelegationTokenCache(ScramMechanism.mechanismNames)
@@ -270,7 +271,8 @@ class ControllerServer(
"zk migration",
fatal = false,
() => {}
- )
+ ),
+ quorumFeatures
)
migrationDriver.start()
migrationSupport = Some(ControllerMigrationSupport(zkClient, migrationDriver, propagator))
diff --git a/core/src/main/scala/kafka/tools/TestRaftServer.scala b/core/src/main/scala/kafka/tools/TestRaftServer.scala
index e3e2a0e28262..2f480053a6a3 100644
--- a/core/src/main/scala/kafka/tools/TestRaftServer.scala
+++ b/core/src/main/scala/kafka/tools/TestRaftServer.scala
@@ -74,7 +74,7 @@ class TestRaftServer(
tokenCache = new DelegationTokenCache(ScramMechanism.mechanismNames)
credentialProvider = new CredentialProvider(ScramMechanism.mechanismNames, tokenCache)
- val apiVersionManager = new SimpleApiVersionManager(ListenerType.CONTROLLER, true)
+ val apiVersionManager = new SimpleApiVersionManager(ListenerType.CONTROLLER, true, false)
socketServer = new SocketServer(config, metrics, time, credentialProvider, apiVersionManager)
val metaProperties = MetaProperties(
diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
index 28b797f7c167..5bab866de3e2 100644
--- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
+++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
@@ -77,7 +77,7 @@ class SocketServerTest {
// Clean-up any metrics left around by previous tests
TestUtils.clearYammerMetrics()
- private val apiVersionManager = new SimpleApiVersionManager(ListenerType.ZK_BROKER, true)
+ private val apiVersionManager = new SimpleApiVersionManager(ListenerType.ZK_BROKER, true, false)
val server = new SocketServer(config, metrics, Time.SYSTEM, credentialProvider, apiVersionManager)
server.enableRequestProcessing(Map.empty).get(1, TimeUnit.MINUTES)
val sockets = new ArrayBuffer[Socket]
diff --git a/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala b/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
index ffa56de63db3..e55c8fbaf130 100644
--- a/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
@@ -154,7 +154,7 @@ class ControllerApisTest {
new KafkaConfig(props),
MetaProperties("JgxuGe9URy-E-ceaL04lEw", nodeId = nodeId),
Seq.empty,
- new SimpleApiVersionManager(ListenerType.CONTROLLER, true)
+ new SimpleApiVersionManager(ListenerType.CONTROLLER, true, false)
)
}
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 69052491acc1..8007c873ccc6 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -184,7 +184,7 @@ class KafkaApisTest {
} else {
ApiKeys.apisForListener(listenerType).asScala.toSet
}
- val apiVersionManager = new SimpleApiVersionManager(listenerType, enabledApis, BrokerFeatures.defaultSupportedFeatures(), true)
+ val apiVersionManager = new SimpleApiVersionManager(listenerType, enabledApis, BrokerFeatures.defaultSupportedFeatures(), true, false)
new KafkaApis(
requestChannel = requestChannel,
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/KRaftMetadataRequestBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/KRaftMetadataRequestBenchmark.java
index 8e997385fc36..25e182b895c3 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/KRaftMetadataRequestBenchmark.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/KRaftMetadataRequestBenchmark.java
@@ -197,7 +197,7 @@ private KafkaApis createKafkaApis() {
setClusterId("clusterId").
setTime(Time.SYSTEM).
setTokenManager(null).
- setApiVersionManager(new SimpleApiVersionManager(ApiMessageType.ListenerType.BROKER, false)).
+ setApiVersionManager(new SimpleApiVersionManager(ApiMessageType.ListenerType.BROKER, false, false)).
build();
}
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java
index 86c7ff01f412..1685105e9d11 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java
@@ -199,7 +199,7 @@ private KafkaApis createKafkaApis() {
setClusterId("clusterId").
setTime(Time.SYSTEM).
setTokenManager(null).
- setApiVersionManager(new SimpleApiVersionManager(ApiMessageType.ListenerType.ZK_BROKER, false)).
+ setApiVersionManager(new SimpleApiVersionManager(ApiMessageType.ListenerType.ZK_BROKER, false, false)).
build();
}
diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumFeatures.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumFeatures.java
index 58a7aea2af39..19431d2a06d7 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/QuorumFeatures.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumFeatures.java
@@ -128,4 +128,30 @@ VersionRange localSupportedFeature(String featureName) {
boolean isControllerId(int nodeId) {
return quorumNodeIds.contains(nodeId);
}
+
+ // Check whether every controller node in the quorum is ready for ZK migration; if not, return the reason
+ public Optional<String> reasonAllControllersZkMigrationNotReady() {
+ List<String> missingApiVers = new ArrayList<>();
+ List<String> zkMigrationNotReady = new ArrayList<>();
+ for (int id : quorumNodeIds) {
+ if (nodeId == id) {
+ continue; // No need to check the local node: the KRaftMigrationDriver is only created when the migration config is set
+ }
+ NodeApiVersions nodeVersions = apiVersions.get(Integer.toString(id));
+ if (nodeVersions == null) {
+ missingApiVers.add(String.valueOf(id));
+ } else if (!nodeVersions.zkMigrationEnabled()) {
+ zkMigrationNotReady.add(String.valueOf(id));
+ }
+ }
+
+ boolean isReady = missingApiVers.isEmpty() && zkMigrationNotReady.isEmpty();
+ if (!isReady) {
+ String zkMigrationNotReadyMsg = zkMigrationNotReady.isEmpty() ? "" : "Nodes that have not enabled `zookeeper.metadata.migration.enable`: " + zkMigrationNotReady + ".";
+ String missingApiVersionMsg = missingApiVers.isEmpty() ? "" : " Missing apiVersion from nodes: " + missingApiVers;
+ return Optional.of(zkMigrationNotReadyMsg + missingApiVersionMsg);
+ }
+
+ return Optional.empty();
+ }
}
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java
index 879fc0958507..44a67b222c87 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java
@@ -22,6 +22,7 @@
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.controller.QuorumFeatures;
import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.image.MetadataProvenance;
@@ -47,6 +48,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.RejectedExecutionException;
@@ -91,6 +93,7 @@ public class KRaftMigrationDriver implements MetadataPublisher {
private volatile MigrationDriverState migrationState;
private volatile ZkMigrationLeadershipState migrationLeadershipState;
private volatile MetadataImage image;
+ private volatile QuorumFeatures quorumFeatures;
private volatile boolean firstPublish;
public KRaftMigrationDriver(
@@ -99,15 +102,17 @@ public KRaftMigrationDriver(
MigrationClient zkMigrationClient,
LegacyPropagator propagator,
Consumer<MetadataPublisher> initialZkLoadHandler,
- FaultHandler faultHandler
+ FaultHandler faultHandler,
+ QuorumFeatures quorumFeatures,
+ Time time
) {
this.nodeId = nodeId;
this.zkRecordConsumer = zkRecordConsumer;
this.zkMigrationClient = zkMigrationClient;
this.propagator = propagator;
- this.time = Time.SYSTEM;
+ this.time = time;
this.logContext = new LogContext("[KRaftMigrationDriver id=" + nodeId + "] ");
- this.log = this.logContext.logger(KRaftMigrationDriver.class);
+ this.log = logContext.logger(KRaftMigrationDriver.class);
this.migrationState = MigrationDriverState.UNINITIALIZED;
this.migrationLeadershipState = ZkMigrationLeadershipState.EMPTY;
this.eventQueue = new KafkaEventQueue(Time.SYSTEM, logContext, "controller-" + nodeId + "-migration-driver-");
@@ -116,8 +121,22 @@ public KRaftMigrationDriver(
this.leaderAndEpoch = LeaderAndEpoch.UNKNOWN;
this.initialZkLoadHandler = initialZkLoadHandler;
this.faultHandler = faultHandler;
+ this.quorumFeatures = quorumFeatures;
}
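+ // Convenience constructor that uses the system clock; tests inject a MockTime through the primary constructor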
+ public KRaftMigrationDriver(
+ int nodeId,
+ ZkRecordConsumer zkRecordConsumer,
+ MigrationClient zkMigrationClient,
+ LegacyPropagator propagator,
+ Consumer<MetadataPublisher> initialZkLoadHandler,
+ FaultHandler faultHandler,
+ QuorumFeatures quorumFeatures
+ ) {
+ this(nodeId, zkRecordConsumer, zkMigrationClient, propagator, initialZkLoadHandler, faultHandler, quorumFeatures, Time.SYSTEM);
+ }
+
public void start() {
eventQueue.prepend(new PollEvent());
}
@@ -149,6 +168,15 @@ private void recoverMigrationStateFromZK() {
transitionTo(MigrationDriverState.INACTIVE);
}
+ private boolean isControllerQuorumReadyForMigration() {
+ Optional<String> notReadyMsg = this.quorumFeatures.reasonAllControllersZkMigrationNotReady();
+ if (notReadyMsg.isPresent()) {
+ log.info("Still waiting for all controller nodes to be ready to begin the migration. Not ready due to: " + notReadyMsg.get());
+ return false;
+ }
+ return true;
+ }
+
private boolean imageDoesNotContainAllBrokers(MetadataImage image, Set<Integer> brokerIds) {
for (BrokerRegistration broker : image.cluster().brokers().values()) {
if (broker.isMigratingZkBroker()) {
@@ -432,9 +460,11 @@ public void run() throws Exception {
transitionTo(MigrationDriverState.INACTIVE);
break;
case PRE_MIGRATION:
- // Base case when starting the migration
- log.debug("Controller Quorum is ready for Zk to KRaft migration. Now waiting for ZK brokers.");
- transitionTo(MigrationDriverState.WAIT_FOR_BROKERS);
+ if (isControllerQuorumReadyForMigration()) {
+ // Base case when starting the migration
+ log.debug("Controller Quorum is ready for Zk to KRaft migration. Now waiting for ZK brokers.");
+ transitionTo(MigrationDriverState.WAIT_FOR_BROKERS);
+ }
break;
case MIGRATION:
if (!migrationLeadershipState.zkMigrationComplete()) {
diff --git a/metadata/src/test/java/org/apache/kafka/controller/QuorumFeaturesTest.java b/metadata/src/test/java/org/apache/kafka/controller/QuorumFeaturesTest.java
index 7cd6e6c5cbd5..4e669aecafc0 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/QuorumFeaturesTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/QuorumFeaturesTest.java
@@ -89,7 +89,7 @@ private static NodeApiVersions nodeApiVersions(List<Map.Entry<String, VersionRange>> features) {
setMinVersion(entry.getValue().min()).
setMaxVersion(entry.getValue().max()));
});
- return new NodeApiVersions(Collections.emptyList(), features);
+ return new NodeApiVersions(Collections.emptyList(), features, false);
}
@Test
@@ -100,4 +100,33 @@ public void testIsControllerId() {
assertTrue(quorumFeatures.isControllerId(2));
assertFalse(quorumFeatures.isControllerId(3));
}
+
+ @Test
+ public void testZkMigrationReady() {
+ ApiVersions apiVersions = new ApiVersions();
+ QuorumFeatures quorumFeatures = new QuorumFeatures(0, apiVersions, LOCAL, Arrays.asList(0, 1, 2));
+
+ // create apiVersion with zkMigrationEnabled flag set for node 0, the other 2 nodes have no apiVersions info
+ apiVersions.update("0", new NodeApiVersions(Collections.emptyList(), Collections.emptyList(), true));
+ assertTrue(quorumFeatures.reasonAllControllersZkMigrationNotReady().isPresent());
+ assertTrue(quorumFeatures.reasonAllControllersZkMigrationNotReady().get().contains("Missing apiVersion from nodes: [1, 2]"));
+
+ // create apiVersion with zkMigrationEnabled flag set for node 1; the remaining node has no apiVersions info
+ apiVersions.update("1", new NodeApiVersions(Collections.emptyList(), Collections.emptyList(), true));
+ assertTrue(quorumFeatures.reasonAllControllersZkMigrationNotReady().isPresent());
+ assertTrue(quorumFeatures.reasonAllControllersZkMigrationNotReady().get().contains("Missing apiVersion from nodes: [2]"));
+
+ // create apiVersion with zkMigrationEnabled flag disabled for node 2; the quorum should still not be ready
+ apiVersions.update("2", NodeApiVersions.create());
+ assertTrue(quorumFeatures.reasonAllControllersZkMigrationNotReady().isPresent());
+ assertTrue(quorumFeatures.reasonAllControllersZkMigrationNotReady().get().contains("Nodes don't enable `zookeeper.metadata.migration.enable`: [2]"));
+
+ // update zkMigrationEnabled flag to enabled for node 2, should be ready now
+ apiVersions.update("2", new NodeApiVersions(Collections.emptyList(), Collections.emptyList(), true));
+ assertFalse(quorumFeatures.reasonAllControllersZkMigrationNotReady().isPresent());
+
+ // create apiVersion with zkMigrationEnabled flag disabled for a non-controller node, and expect we will filter it out
+ apiVersions.update("3", NodeApiVersions.create());
+ assertFalse(quorumFeatures.reasonAllControllersZkMigrationNotReady().isPresent());
+ }
}
diff --git a/metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationDriverTest.java b/metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationDriverTest.java
index 5d927359a5e0..4bf58f99cc2b 100644
--- a/metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationDriverTest.java
+++ b/metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationDriverTest.java
@@ -16,6 +16,9 @@
*/
package org.apache.kafka.metadata.migration;
+import org.apache.kafka.clients.ApiVersions;
+import org.apache.kafka.clients.NodeApiVersions;
+import org.apache.kafka.common.Node;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.config.ConfigResource;
@@ -23,6 +26,9 @@
import org.apache.kafka.common.metadata.ConfigRecord;
import org.apache.kafka.common.metadata.RegisterBrokerRecord;
import org.apache.kafka.common.resource.ResourcePattern;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.controller.QuorumFeatures;
import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.image.MetadataProvenance;
@@ -37,6 +43,7 @@
import org.apache.kafka.server.fault.MockFaultHandler;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
@@ -55,7 +62,34 @@
import java.util.function.BiConsumer;
import java.util.function.Consumer;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
+
public class KRaftMigrationDriverTest {
+ List<Node> controllerNodes = Arrays.asList(
+ new Node(4, "host4", 0),
+ new Node(5, "host5", 0),
+ new Node(6, "host6", 0)
+ );
+ ApiVersions apiVersions = new ApiVersions();
+ QuorumFeatures quorumFeatures = QuorumFeatures.create(4,
+ apiVersions,
+ QuorumFeatures.defaultFeatureMap(),
+ controllerNodes);
+ Time mockTime = new MockTime(1) {
+ @Override
+ public long nanoseconds() {
+ // The driver polls once per second; shift the clock so each poll deadline arrives within ~10 ms to speed up the test
+ return System.nanoTime() - NANOSECONDS.convert(990, MILLISECONDS);
+ }
+ };
+
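+ // Mark every controller node (4, 5, 6) as ZK-migration-ready before each test; individual tests override these entries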
+ @BeforeEach
+ public void setup() {
+ apiVersions.update("4", new NodeApiVersions(Collections.emptyList(), Collections.emptyList(), true));
+ apiVersions.update("5", new NodeApiVersions(Collections.emptyList(), Collections.emptyList(), true));
+ apiVersions.update("6", new NodeApiVersions(Collections.emptyList(), Collections.emptyList(), true));
+ }
+
static class NoOpRecordConsumer implements ZkRecordConsumer {
@Override
public void beginMigration() {
@@ -292,65 +326,66 @@ CompletableFuture<Void> enqueueMetadataChangeEventWithFuture(
public void testOnlySendNeededRPCsToBrokers() throws Exception {
CountingMetadataPropagator metadataPropagator = new CountingMetadataPropagator();
CapturingMigrationClient migrationClient = new CapturingMigrationClient(new HashSet<>(Arrays.asList(1, 2, 3)));
- KRaftMigrationDriver driver = new KRaftMigrationDriver(
+ try (KRaftMigrationDriver driver = new KRaftMigrationDriver(
3000,
new NoOpRecordConsumer(),
migrationClient,
metadataPropagator,
metadataPublisher -> { },
- new MockFaultHandler("test")
- );
-
- MetadataImage image = MetadataImage.EMPTY;
- MetadataDelta delta = new MetadataDelta(image);
-
- driver.start();
- delta.replay(ZkMigrationState.PRE_MIGRATION.toRecord().message());
- delta.replay(zkBrokerRecord(1));
- delta.replay(zkBrokerRecord(2));
- delta.replay(zkBrokerRecord(3));
- MetadataProvenance provenance = new MetadataProvenance(100, 1, 1);
- image = delta.apply(provenance);
-
- // Publish a delta with this node (3000) as the leader
- LeaderAndEpoch newLeader = new LeaderAndEpoch(OptionalInt.of(3000), 1);
- driver.onControllerChange(newLeader);
- driver.onMetadataUpdate(delta, image, new LogDeltaManifest(provenance, newLeader, 1, 100, 42));
-
- TestUtils.waitForCondition(() -> driver.migrationState().get(1, TimeUnit.MINUTES).equals(MigrationDriverState.DUAL_WRITE),
- "Waiting for KRaftMigrationDriver to enter DUAL_WRITE state");
-
- Assertions.assertEquals(1, metadataPropagator.images);
- Assertions.assertEquals(0, metadataPropagator.deltas);
-
- delta = new MetadataDelta(image);
- delta.replay(new ConfigRecord()
- .setResourceType(ConfigResource.Type.BROKER.id())
- .setResourceName("1")
- .setName("foo")
- .setValue("bar"));
- provenance = new MetadataProvenance(120, 1, 2);
- image = delta.apply(provenance);
- enqueueMetadataChangeEventWithFuture(driver, delta, image, provenance).get(1, TimeUnit.MINUTES);
-
- Assertions.assertEquals(1, migrationClient.capturedConfigs.size());
- Assertions.assertEquals(1, metadataPropagator.images);
- Assertions.assertEquals(0, metadataPropagator.deltas);
-
- delta = new MetadataDelta(image);
- delta.replay(new BrokerRegistrationChangeRecord()
- .setBrokerId(1)
- .setBrokerEpoch(0)
- .setFenced(BrokerRegistrationFencingChange.NONE.value())
- .setInControlledShutdown(BrokerRegistrationInControlledShutdownChange.IN_CONTROLLED_SHUTDOWN.value()));
- provenance = new MetadataProvenance(130, 1, 3);
- image = delta.apply(provenance);
- enqueueMetadataChangeEventWithFuture(driver, delta, image, provenance).get(1, TimeUnit.MINUTES);
-
- Assertions.assertEquals(1, metadataPropagator.images);
- Assertions.assertEquals(1, metadataPropagator.deltas);
-
- driver.close();
+ new MockFaultHandler("test"),
+ quorumFeatures,
+ mockTime
+ )) {
+
+ MetadataImage image = MetadataImage.EMPTY;
+ MetadataDelta delta = new MetadataDelta(image);
+
+ driver.start();
+ delta.replay(ZkMigrationState.PRE_MIGRATION.toRecord().message());
+ delta.replay(zkBrokerRecord(1));
+ delta.replay(zkBrokerRecord(2));
+ delta.replay(zkBrokerRecord(3));
+ MetadataProvenance provenance = new MetadataProvenance(100, 1, 1);
+ image = delta.apply(provenance);
+
+ // Publish a delta with this node (3000) as the leader
+ LeaderAndEpoch newLeader = new LeaderAndEpoch(OptionalInt.of(3000), 1);
+ driver.onControllerChange(newLeader);
+ driver.onMetadataUpdate(delta, image, new LogDeltaManifest(provenance, newLeader, 1, 100, 42));
+
+ TestUtils.waitForCondition(() -> driver.migrationState().get(1, TimeUnit.MINUTES).equals(MigrationDriverState.DUAL_WRITE),
+ "Waiting for KRaftMigrationDriver to enter DUAL_WRITE state");
+
+ Assertions.assertEquals(1, metadataPropagator.images);
+ Assertions.assertEquals(0, metadataPropagator.deltas);
+
+ delta = new MetadataDelta(image);
+ delta.replay(new ConfigRecord()
+ .setResourceType(ConfigResource.Type.BROKER.id())
+ .setResourceName("1")
+ .setName("foo")
+ .setValue("bar"));
+ provenance = new MetadataProvenance(120, 1, 2);
+ image = delta.apply(provenance);
+ enqueueMetadataChangeEventWithFuture(driver, delta, image, provenance).get(1, TimeUnit.MINUTES);
+
+ Assertions.assertEquals(1, migrationClient.capturedConfigs.size());
+ Assertions.assertEquals(1, metadataPropagator.images);
+ Assertions.assertEquals(0, metadataPropagator.deltas);
+
+ delta = new MetadataDelta(image);
+ delta.replay(new BrokerRegistrationChangeRecord()
+ .setBrokerId(1)
+ .setBrokerEpoch(0)
+ .setFenced(BrokerRegistrationFencingChange.NONE.value())
+ .setInControlledShutdown(BrokerRegistrationInControlledShutdownChange.IN_CONTROLLED_SHUTDOWN.value()));
+ provenance = new MetadataProvenance(130, 1, 3);
+ image = delta.apply(provenance);
+ enqueueMetadataChangeEventWithFuture(driver, delta, image, provenance).get(1, TimeUnit.MINUTES);
+
+ Assertions.assertEquals(1, metadataPropagator.images);
+ Assertions.assertEquals(1, metadataPropagator.deltas);
+ }
}
@ParameterizedTest
@@ -381,7 +416,9 @@ public ZkMigrationLeadershipState claimControllerLeadership(ZkMigrationLeadershi
migrationClient,
metadataPropagator,
metadataPublisher -> { },
- faultHandler
+ faultHandler,
+ quorumFeatures,
+ mockTime
)) {
MetadataImage image = MetadataImage.EMPTY;
MetadataDelta delta = new MetadataDelta(image);
@@ -400,8 +437,8 @@ public ZkMigrationLeadershipState claimControllerLeadership(ZkMigrationLeadershi
driver.onMetadataUpdate(delta, image, new LogDeltaManifest(provenance,
new LeaderAndEpoch(OptionalInt.of(3000), 1), 1, 100, 42));
Assertions.assertTrue(claimLeaderAttempts.await(1, TimeUnit.MINUTES));
- TestUtils.waitForCondition(() -> driver.migrationState().get(1, TimeUnit.MINUTES).equals(MigrationDriverState.ZK_MIGRATION),
- "Waiting for KRaftMigrationDriver to enter ZK_MIGRATION state");
+ TestUtils.waitForCondition(() -> driver.migrationState().get(1, TimeUnit.MINUTES).equals(MigrationDriverState.DUAL_WRITE),
+ "Waiting for KRaftMigrationDriver to enter DUAL_WRITE state");
if (authException) {
Assertions.assertEquals(MigrationClientAuthException.class, faultHandler.firstException().getCause().getClass());
@@ -412,6 +449,52 @@ public ZkMigrationLeadershipState claimControllerLeadership(ZkMigrationLeadershi
}
@Test
+ public void testShouldNotMoveToNextStateIfControllerNodesAreNotReadyToMigrate() throws Exception {
+ CountingMetadataPropagator metadataPropagator = new CountingMetadataPropagator();
+ CapturingMigrationClient migrationClient = new CapturingMigrationClient(new HashSet<>(Arrays.asList(1)));
+ apiVersions.remove("6");
+
+ try (KRaftMigrationDriver driver = new KRaftMigrationDriver(
+ 3000,
+ new NoOpRecordConsumer(),
+ migrationClient,
+ metadataPropagator,
+ metadataPublisher -> { },
+ new MockFaultHandler("test"),
+ quorumFeatures,
+ mockTime
+ )) {
+
+ MetadataImage image = MetadataImage.EMPTY;
+ MetadataDelta delta = new MetadataDelta(image);
+
+ driver.start();
+ delta.replay(ZkMigrationState.PRE_MIGRATION.toRecord().message());
+ delta.replay(zkBrokerRecord(1));
+ MetadataProvenance provenance = new MetadataProvenance(100, 1, 1);
+ image = delta.apply(provenance);
+
+ // Publish a delta with this node (3000) as the leader
+ LeaderAndEpoch newLeader = new LeaderAndEpoch(OptionalInt.of(3000), 1);
+ driver.onControllerChange(newLeader);
+ driver.onMetadataUpdate(delta, image, new LogDeltaManifest(provenance, newLeader, 1, 100, 42));
+
+ // Current apiVersions are missing the controller node 6, should stay at WAIT_FOR_CONTROLLER_QUORUM state
+ TestUtils.waitForCondition(() -> driver.migrationState().get(1, TimeUnit.MINUTES).equals(MigrationDriverState.WAIT_FOR_CONTROLLER_QUORUM),
+ "Waiting for KRaftMigrationDriver to enter WAIT_FOR_CONTROLLER_QUORUM state");
+
+ // Node 6's apiVersions have no zkMigrationReady set, so the driver should still stay in WAIT_FOR_CONTROLLER_QUORUM state
+ apiVersions.update("6", NodeApiVersions.create());
+ Assertions.assertEquals(MigrationDriverState.WAIT_FOR_CONTROLLER_QUORUM, driver.migrationState().get(1, TimeUnit.MINUTES));
+
+ // All controller nodes are now zkMigrationReady, so the driver can move to the next state
+ apiVersions.update("6", new NodeApiVersions(Collections.emptyList(), Collections.emptyList(), true));
+ TestUtils.waitForCondition(() -> driver.migrationState().get(1, TimeUnit.MINUTES).equals(MigrationDriverState.DUAL_WRITE),
+ "Waiting for KRaftMigrationDriver to enter DUAL_WRITE state");
+ }
+ }
+
+ @Test
public void testSkipWaitForBrokersInDualWrite() throws Exception {
CountingMetadataPropagator metadataPropagator = new CountingMetadataPropagator();
CapturingMigrationClient migrationClient = new CapturingMigrationClient(Collections.emptySet());
@@ -422,7 +505,9 @@ public void testSkipWaitForBrokersInDualWrite() throws Exception {
migrationClient,
metadataPropagator,
metadataPublisher -> { },
- faultHandler
+ faultHandler,
+ quorumFeatures,
+ mockTime
)) {
MetadataImage image = MetadataImage.EMPTY;
MetadataDelta delta = new MetadataDelta(image);