diff --git a/core/src/main/scala/kafka/server/ControllerServer.scala b/core/src/main/scala/kafka/server/ControllerServer.scala index 8832e8878aad..9f5aeb322ca1 100644 --- a/core/src/main/scala/kafka/server/ControllerServer.scala +++ b/core/src/main/scala/kafka/server/ControllerServer.scala @@ -32,6 +32,7 @@ import kafka.utils.{CoreUtils, Logging, PasswordEncoder} import kafka.zk.{KafkaZkClient, ZkMigrationClient} import org.apache.kafka.common.config.ConfigException import org.apache.kafka.common.message.ApiMessageType.ListenerType +import org.apache.kafka.common.metrics.Metrics import org.apache.kafka.common.security.scram.internals.ScramMechanism import org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache import org.apache.kafka.common.utils.LogContext @@ -182,7 +183,8 @@ class ControllerServer( tokenCache = new DelegationTokenCache(ScramMechanism.mechanismNames) credentialProvider = new CredentialProvider(ScramMechanism.mechanismNames, tokenCache) socketServer = new SocketServer(config, - metrics, + // metrics will be null when restarting a controller; this will only happen in tests. + if (metrics == null) new Metrics() else metrics, time, credentialProvider, apiVersionManager) diff --git a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala index a9c1d28d982b..e50671d63f76 100644 --- a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala +++ b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala @@ -80,7 +80,7 @@ class KRaftClusterTest { } @Test - def testCreateClusterAndRestartNode(): Unit = { + def testCreateClusterAndRestartBrokerNode(): Unit = { val cluster = new KafkaClusterTestKit.Builder( new TestKitNodes.Builder(). setNumBrokerNodes(1). 
@@ -96,6 +96,32 @@ class KRaftClusterTest { } } + @Test + def testCreateClusterAndRestartControllerNode(): Unit = { + val cluster = new KafkaClusterTestKit.Builder( + new TestKitNodes.Builder(). + setNumBrokerNodes(1). + setNumControllerNodes(2).build()).build() + try { + cluster.format() + cluster.startup() + val controller = cluster.controllers().values().iterator().asScala.filter(_.controller.isActive).next() + val port = controller.socketServer.boundPort(controller.config.controllerListeners.head.listenerName) + + // shutdown active controller + controller.shutdown() + // Rewrite the `listeners` config so that the restarted controller's socket server binds to the same port + val config = controller.sharedServer.controllerConfig.props + config.asInstanceOf[java.util.HashMap[String,String]].put(KafkaConfig.ListenersProp, s"CONTROLLER://localhost:$port") + controller.sharedServer.controllerConfig.updateCurrentConfig(new KafkaConfig(config)) + + // restart controller + controller.startup() + } finally { + cluster.close() + } + } + @Test def testCreateClusterAndWaitForBrokerInRunningState(): Unit = { val cluster = new KafkaClusterTestKit.Builder( diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java index 1ae8d38f92a5..4c7a5b73cdd0 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java +++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java @@ -2028,8 +2028,11 @@ public CompletableFuture finalizedFeatures( if (lastCommittedOffset == -1) { return CompletableFuture.completedFuture(new FinalizedControllerFeatures(Collections.emptyMap(), -1)); } + // It's possible for a standby controller to receive an ApiVersionRequest, and a standby controller does not + // have any timeline snapshot; in that case we use SnapshotRegistry.LATEST_EPOCH. + long epoch = isActive() ? 
lastCommittedOffset : SnapshotRegistry.LATEST_EPOCH; return appendReadEvent("getFinalizedFeatures", context.deadlineNs(), - () -> featureControl.finalizedFeatures(lastCommittedOffset)); + () -> featureControl.finalizedFeatures(epoch)); } @Override