Skip to content

Commit

Permalink
MINOR: Install ControllerServer metadata publishers sooner (#14215)
Browse files Browse the repository at this point in the history
This patch is a follow up of #14169 that installs the metadata publishers before blocking on the authorizer future.

Reviewers: Colin P. McCabe <cmccabe@apache.org>
  • Loading branch information
mumrah committed Aug 15, 2023
1 parent c199840 commit 0a531b7
Showing 1 changed file with 25 additions and 26 deletions.
51 changes: 25 additions & 26 deletions core/src/main/scala/kafka/server/ControllerServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -302,32 +302,6 @@ class ControllerServer(
s"${DataPlaneAcceptor.MetricPrefix}RequestHandlerAvgIdlePercent",
DataPlaneAcceptor.ThreadPrefix)

val authorizerFutures: Map[Endpoint, CompletableFuture[Void]] = endpointReadyFutures.futures().asScala.toMap

/**
* Enable the controller endpoint(s). If we are using an authorizer which stores
* ACLs in the metadata log, such as StandardAuthorizer, we will be able to start
* accepting requests from principals included super.users right after this point,
* but we will not be able to process requests from non-superusers until the
* QuorumController declares that we have caught up to the high water mark of the
* metadata log. See @link{QuorumController#maybeCompleteAuthorizerInitialLoad}
* and KIP-801 for details.
*/
val socketServerFuture = socketServer.enableRequestProcessing(authorizerFutures)

// Block here until all the authorizer futures are complete
FutureUtils.waitWithLogging(logger.underlying, logIdent,
"all of the authorizer futures to be completed",
CompletableFuture.allOf(authorizerFutures.values.toSeq: _*), startupDeadline, time)

// Wait for all the SocketServer ports to be open, and the Acceptors to be started.
FutureUtils.waitWithLogging(logger.underlying, logIdent,
"all of the SocketServer Acceptors to be started",
socketServerFuture, startupDeadline, time)

// register this instance for dynamic config changes to the KafkaConfig
config.dynamicConfig.addReconfigurables(this)

// Set up the metadata features publisher.
metadataPublishers.add(featuresPublisher)

Expand Down Expand Up @@ -376,6 +350,31 @@ class ControllerServer(
FutureUtils.waitWithLogging(logger.underlying, logIdent,
"the controller metadata publishers to be installed",
sharedServer.loader.installPublishers(metadataPublishers), startupDeadline, time)

val authorizerFutures: Map[Endpoint, CompletableFuture[Void]] = endpointReadyFutures.futures().asScala.toMap

/**
* Enable the controller endpoint(s). If we are using an authorizer which stores
* ACLs in the metadata log, such as StandardAuthorizer, we will be able to start
* accepting requests from principals included super.users right after this point,
* but we will not be able to process requests from non-superusers until AclPublisher
* publishes metadata from the QuorumController. MetadataPublishers do not publish
* metadata until the controller has caught up to the high watermark.
*/
val socketServerFuture = socketServer.enableRequestProcessing(authorizerFutures)

// Block here until all the authorizer futures are complete
FutureUtils.waitWithLogging(logger.underlying, logIdent,
"all of the authorizer futures to be completed",
CompletableFuture.allOf(authorizerFutures.values.toSeq: _*), startupDeadline, time)

// Wait for all the SocketServer ports to be open, and the Acceptors to be started.
FutureUtils.waitWithLogging(logger.underlying, logIdent,
"all of the SocketServer Acceptors to be started",
socketServerFuture, startupDeadline, time)

// register this instance for dynamic config changes to the KafkaConfig
config.dynamicConfig.addReconfigurables(this)
} catch {
case e: Throwable =>
maybeChangeStatus(STARTING, STARTED)
Expand Down

0 comments on commit 0a531b7

Please sign in to comment.