diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala index 3cd1575e2c..310934fa68 100644 --- a/core/src/main/scala/kafka/server/BrokerServer.scala +++ b/core/src/main/scala/kafka/server/BrokerServer.scala @@ -333,14 +333,7 @@ class BrokerServer( sharedServer.metadataLoaderFaultHandler) // AutoMQ for Kafka inject start - - if (config.elasticStreamEnabled) { - if (!ElasticLogManager.init(config, clusterId, this)) { - throw new UnsupportedOperationException("Elastic stream client failed to be configured. Please check your configuration.") - } - } else { - warn("Elastic stream is disabled. This node will store data locally.") - } + initElasticLogManager() // AutoMQ for Kafka inject end val networkListeners = new ListenerCollection() @@ -656,4 +649,15 @@ class BrokerServer( retryTimeout ) } + + protected def initElasticLogManager(): Unit = { + if (config.elasticStreamEnabled) { + if (!ElasticLogManager.init(config, clusterId, this)) { + throw new UnsupportedOperationException("Elastic stream client failed to be configured. Please check your configuration.") + } + } else { + warn("Elastic stream is disabled. This node will store data locally.") + } + } + } diff --git a/core/src/main/scala/kafka/server/KafkaRaftServer.scala b/core/src/main/scala/kafka/server/KafkaRaftServer.scala index 908a51d80b..299c7a27b1 100644 --- a/core/src/main/scala/kafka/server/KafkaRaftServer.scala +++ b/core/src/main/scala/kafka/server/KafkaRaftServer.scala @@ -75,20 +75,13 @@ class KafkaRaftServer( ) private val broker: Option[BrokerServer] = if (config.processRoles.contains(BrokerRole)) { - Some(new BrokerServer( - sharedServer, - offlineDirs - )) + Some(brokerServer()) } else { None } private val controller: Option[ControllerServer] = if (config.processRoles.contains(ControllerRole)) { - Some(new ControllerServer( - sharedServer, - KafkaRaftServer.configSchema, - bootstrapMetadata, - )) + Some(controllerServer()) } else { None } @@ -114,6 +107,24 @@ class KafkaRaftServer( broker.foreach(_.awaitShutdown()) controller.foreach(_.awaitShutdown()) } + + // AutoMQ for Kafka inject start + protected def brokerServer(): BrokerServer = { + new BrokerServer( + sharedServer, + offlineDirs + ) + } + + protected def controllerServer(): ControllerServer = { + new ControllerServer( + sharedServer, + KafkaRaftServer.configSchema, + bootstrapMetadata, + ) + } + // AutoMQ for Kafka inject end + } object KafkaRaftServer {