MINOR: Generalize server startup to make way for KIP-500 (#9883)
This patch attempts to generalize server initialization for KIP-500. It adds a `Server` trait, which `KafkaServer` extends for the legacy ZooKeeper server, and a new `KafkaRaftServer` for the KIP-500 server. I have also added stubs for `BrokerServer` and `ControllerServer` to give a clearer idea of how they will be used.

Note that this patch removes `KafkaServerStartable`, which was intended to enable custom startup logic, but was not codified into an official API and is not planned to be supported after KIP-500. 

Reviewers: Ismael Juma <ismael@juma.me.uk>, Colin P. McCabe <cmccabe@apache.org>
hachikuji committed Jan 16, 2021
1 parent 06c9a39 commit 8f063c1
Showing 21 changed files with 385 additions and 178 deletions.
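
The `Server` trait itself is not among the hunks rendered below, so the following is a minimal sketch inferred from the call sites in `Kafka.scala`, assuming only the three lifecycle methods that `main` invokes:

```scala
// Sketch only: inferred from the startup/shutdown call sites in Kafka.scala;
// the actual trait introduced by this commit may carry additional members.
trait Server {
  def startup(): Unit        // bring the server up
  def shutdown(): Unit       // begin a controlled shutdown
  def awaitShutdown(): Unit  // block until shutdown completes
}
```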
clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
@@ -404,13 +404,27 @@ public ConfigDef define(String name, Type type, Importance importance, String do
* @param name The name of the config parameter
* @param type The type of the config
* @param defaultValue The default value to use if this config isn't present
* @param importance
* @param importance The importance of this config (i.e. is this something you will likely need to change?)
* @return This ConfigDef so you can chain calls
*/
public ConfigDef defineInternal(final String name, final Type type, final Object defaultValue, final Importance importance) {
return define(new ConfigKey(name, type, defaultValue, null, importance, "", "", -1, Width.NONE, name, Collections.<String>emptyList(), null, true));
}

/**
* Define a new internal configuration. Internal configurations won't show up in the docs and aren't
* intended for general use.
* @param name The name of the config parameter
* @param type The type of the config
* @param defaultValue The default value to use if this config isn't present
* @param validator The validator to use in checking the correctness of the config
* @param importance The importance of this config (i.e. is this something you will likely need to change?)
* @return This ConfigDef so you can chain calls
*/
public ConfigDef defineInternal(final String name, final Type type, final Object defaultValue, final Validator validator, final Importance importance) {
return define(new ConfigKey(name, type, defaultValue, validator, importance, "", "", -1, Width.NONE, name, Collections.<String>emptyList(), null, true));
}

/**
* Get the configuration keys
* @return a map containing all configuration keys
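For context, the overload added above is exercised by `KafkaConfig.scala` later in this commit. A self-contained sketch of equivalent usage (the key name and allowed values are borrowed from that file; everything else is illustrative):

```scala
import java.util.Collections
import org.apache.kafka.common.config.ConfigDef
import org.apache.kafka.common.config.ConfigDef.{Importance, Type, ValidList}

object InternalConfigExample {
  // An internal config stays out of the generated docs, but its validator
  // still rejects values outside the allowed list at parse time.
  val configDef: ConfigDef = new ConfigDef()
    .defineInternal("process.roles", Type.LIST, Collections.emptyList(),
      ValidList.in("broker", "controller"), Importance.HIGH)
}
```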
44 changes: 38 additions & 6 deletions core/src/main/scala/kafka/Kafka.scala
@@ -20,10 +20,10 @@ package kafka
import java.util.Properties

import joptsimple.OptionParser
import kafka.server.KafkaServerStartable
import kafka.server.{KafkaConfig, KafkaRaftServer, KafkaServer, Server}
import kafka.utils.Implicits._
import kafka.utils.{CommandLineUtils, Exit, Logging}
import org.apache.kafka.common.utils.{Java, LoggingSignalHandler, OperatingSystem, Utils}
import org.apache.kafka.common.utils.{Java, LoggingSignalHandler, OperatingSystem, Time, Utils}

import scala.jdk.CollectionConverters._

@@ -63,10 +63,27 @@ object Kafka extends Logging {
props
}

private def buildServer(props: Properties): Server = {
val config = KafkaConfig.fromProps(props, false)
if (config.processRoles.isEmpty) {
new KafkaServer(
config,
Time.SYSTEM,
threadNamePrefix = None
)
} else {
new KafkaRaftServer(
config,
Time.SYSTEM,
threadNamePrefix = None
)
}
}

def main(args: Array[String]): Unit = {
try {
val serverProps = getPropsFromArgs(args)
val kafkaServerStartable = KafkaServerStartable.fromProps(serverProps)
val server = buildServer(serverProps)

try {
if (!OperatingSystem.IS_WINDOWS && !Java.isIbmJdk)
@@ -78,10 +78,25 @@
}

// attach shutdown handler to catch terminating signals as well as normal termination
Exit.addShutdownHook("kafka-shutdown-hook", kafkaServerStartable.shutdown())
Exit.addShutdownHook("kafka-shutdown-hook", {
try server.shutdown()
catch {
case _: Throwable =>
fatal("Halting Kafka.")
// Calling exit() can lead to deadlock as exit() can be called multiple times. Force exit.
Exit.halt(1)
}
})

try server.startup()
catch {
case _: Throwable =>
// KafkaServer.startup() calls shutdown() in case of exceptions, so we invoke `exit` to set the status code
fatal("Exiting Kafka.")
Exit.exit(1)
}

kafkaServerStartable.startup()
kafkaServerStartable.awaitShutdown()
server.awaitShutdown()
}
catch {
case e: Throwable =>
37 changes: 36 additions & 1 deletion core/src/main/scala/kafka/log/LogConfig.scala
@@ -71,7 +71,7 @@ case class LogConfig(props: java.util.Map[_, _], overriddenConfigs: Set[String]
extends AbstractConfig(LogConfig.configDef, props, false) {
/**
* Important note: Any configuration parameter that is passed along from KafkaConfig to LogConfig
* should also go in [[kafka.server.KafkaServer.copyKafkaConfigToLog]].
* should also go in [[LogConfig.extractLogConfigMap()]].
*/
val segmentSize = getInt(LogConfig.SegmentBytesProp)
val segmentMs = getLong(LogConfig.SegmentMsProp)
@@ -375,4 +375,39 @@ object LogConfig {
MessageDownConversionEnableProp -> KafkaConfig.LogMessageDownConversionEnableProp
)


/**
* Copy the subset of properties that are relevant to Logs. The individual properties
* are listed here since the names are slightly different in each Config class...
*/
def extractLogConfigMap(
kafkaConfig: KafkaConfig
): java.util.Map[String, Object] = {
val logProps = new java.util.HashMap[String, Object]()
logProps.put(SegmentBytesProp, kafkaConfig.logSegmentBytes)
logProps.put(SegmentMsProp, kafkaConfig.logRollTimeMillis)
logProps.put(SegmentJitterMsProp, kafkaConfig.logRollTimeJitterMillis)
logProps.put(SegmentIndexBytesProp, kafkaConfig.logIndexSizeMaxBytes)
logProps.put(FlushMessagesProp, kafkaConfig.logFlushIntervalMessages)
logProps.put(FlushMsProp, kafkaConfig.logFlushIntervalMs)
logProps.put(RetentionBytesProp, kafkaConfig.logRetentionBytes)
logProps.put(RetentionMsProp, kafkaConfig.logRetentionTimeMillis: java.lang.Long)
logProps.put(MaxMessageBytesProp, kafkaConfig.messageMaxBytes)
logProps.put(IndexIntervalBytesProp, kafkaConfig.logIndexIntervalBytes)
logProps.put(DeleteRetentionMsProp, kafkaConfig.logCleanerDeleteRetentionMs)
logProps.put(MinCompactionLagMsProp, kafkaConfig.logCleanerMinCompactionLagMs)
logProps.put(MaxCompactionLagMsProp, kafkaConfig.logCleanerMaxCompactionLagMs)
logProps.put(FileDeleteDelayMsProp, kafkaConfig.logDeleteDelayMs)
logProps.put(MinCleanableDirtyRatioProp, kafkaConfig.logCleanerMinCleanRatio)
logProps.put(CleanupPolicyProp, kafkaConfig.logCleanupPolicy)
logProps.put(MinInSyncReplicasProp, kafkaConfig.minInSyncReplicas)
logProps.put(CompressionTypeProp, kafkaConfig.compressionType)
logProps.put(UncleanLeaderElectionEnableProp, kafkaConfig.uncleanLeaderElectionEnable)
logProps.put(PreAllocateEnableProp, kafkaConfig.logPreAllocateEnable)
logProps.put(MessageFormatVersionProp, kafkaConfig.logMessageFormatVersion.version)
logProps.put(MessageTimestampTypeProp, kafkaConfig.logMessageTimestampType.name)
logProps.put(MessageTimestampDifferenceMaxMsProp, kafkaConfig.logMessageTimestampDifferenceMaxMs: java.lang.Long)
logProps.put(MessageDownConversionEnableProp, kafkaConfig.logMessageDownConversionEnable: java.lang.Boolean)
logProps
}
}
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/log/LogManager.scala
@@ -1187,7 +1187,7 @@ object LogManager {
time: Time,
brokerTopicStats: BrokerTopicStats,
logDirFailureChannel: LogDirFailureChannel): LogManager = {
val defaultProps = KafkaServer.copyKafkaConfigToLog(config)
val defaultProps = LogConfig.extractLogConfigMap(config)

LogConfig.validateValues(defaultProps)
val defaultLogConfig = LogConfig(defaultProps)
4 changes: 2 additions & 2 deletions core/src/main/scala/kafka/metrics/KafkaMetricsReporter.scala
@@ -55,12 +55,12 @@ object KafkaMetricsReporter {
val ReporterStarted: AtomicBoolean = new AtomicBoolean(false)
private var reporters: ArrayBuffer[KafkaMetricsReporter] = null

def startReporters (verifiableProps: VerifiableProperties): Seq[KafkaMetricsReporter] = {
def startReporters(verifiableProps: VerifiableProperties): Seq[KafkaMetricsReporter] = {
ReporterStarted synchronized {
if (!ReporterStarted.get()) {
reporters = ArrayBuffer[KafkaMetricsReporter]()
val metricsConfig = new KafkaMetricsConfig(verifiableProps)
if(metricsConfig.reporters.nonEmpty) {
if (metricsConfig.reporters.nonEmpty) {
metricsConfig.reporters.foreach(reporterType => {
val reporter = CoreUtils.createObject[KafkaMetricsReporter](reporterType)
reporter.init(verifiableProps)
8 changes: 4 additions & 4 deletions core/src/main/scala/kafka/raft/RaftManager.scala
@@ -24,7 +24,7 @@ import java.util.concurrent.CompletableFuture

import kafka.log.{Log, LogConfig, LogManager}
import kafka.raft.KafkaRaftManager.RaftIoThread
import kafka.server.{BrokerTopicStats, KafkaConfig, KafkaServer, LogDirFailureChannel}
import kafka.server.{BrokerTopicStats, KafkaConfig, LogDirFailureChannel}
import kafka.utils.timer.SystemTimer
import kafka.utils.{KafkaScheduler, Logging, ShutdownableThread}
import org.apache.kafka.clients.{ApiVersions, ClientDnsLookup, ManualMetadataUpdater, NetworkClient}
@@ -96,15 +96,15 @@ trait RaftManager[T] {
}

class KafkaRaftManager[T](
nodeId: Int,
config: KafkaConfig,
baseLogDir: String,
recordSerde: RecordSerde[T],
topicPartition: TopicPartition,
config: KafkaConfig,
time: Time,
metrics: Metrics
) extends RaftManager[T] with Logging {

private val nodeId = config.brokerId
private val raftConfig = new RaftConfig(config.originals)
private val logContext = new LogContext(s"[RaftManager $nodeId] ")
this.logIdent = logContext.logPrefix()
@@ -197,7 +197,7 @@
}

private def buildMetadataLog(): KafkaMetadataLog = {
val defaultProps = KafkaServer.copyKafkaConfigToLog(config)
val defaultProps = LogConfig.extractLogConfigMap(config)
LogConfig.validateValues(defaultProps)
val defaultLogConfig = LogConfig(defaultProps)

27 changes: 27 additions & 0 deletions core/src/main/scala/kafka/server/BrokerServer.scala
@@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.server

/**
* Stubbed implementation of the KIP-500 broker which processes state
* from the `@metadata` topic which is replicated through Raft.
*/
class BrokerServer {
def startup(): Unit = ???
def shutdown(): Unit = ???
def awaitShutdown(): Unit = ???
}
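
(The `???` bodies are Scala's built-in `NotImplementedError` placeholder, so these stubs compile but fail fast if anything invokes them before the real implementations land.)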
27 changes: 27 additions & 0 deletions core/src/main/scala/kafka/server/ControllerServer.scala
@@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.server

/**
* Stubbed implementation of the KIP-500 controller which is responsible
* for managing the `@metadata` topic which is replicated through Raft.
*/
class ControllerServer {
def startup(): Unit = ???
def shutdown(): Unit = ???
def awaitShutdown(): Unit = ???
}
21 changes: 21 additions & 0 deletions core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -27,6 +27,7 @@ import kafka.coordinator.transaction.{TransactionLog, TransactionStateManager}
import kafka.log.LogConfig
import kafka.message.{BrokerCompressionCodec, CompressionCodec, ZStdCompressionCodec}
import kafka.security.authorizer.AuthorizerUtils
import kafka.server.KafkaRaftServer.{BrokerRole, ControllerRole, ProcessRole}
import kafka.utils.CoreUtils
import kafka.utils.Implicits._
import org.apache.kafka.clients.CommonClientConfigs
@@ -353,6 +354,7 @@ object KafkaConfig {
val ConnectionSetupTimeoutMsProp = CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG
val ConnectionSetupTimeoutMaxMsProp = CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG
val EnableMetadataQuorumProp = "enable.metadata.quorum"
val ProcessRolesProp = "process.roles"

/************* Authorizer Configuration ***********/
val AuthorizerClassNameProp = "authorizer.class.name"
@@ -1032,6 +1034,7 @@ object KafkaConfig {

// Experimental flag to turn on APIs required for the internal metadata quorum (KIP-500)
.defineInternal(EnableMetadataQuorumProp, BOOLEAN, false, LOW)
.defineInternal(ProcessRolesProp, LIST, Collections.emptyList(), ValidList.in("broker", "controller"), HIGH)

/************* Authorizer Configuration ***********/
.define(AuthorizerClassNameProp, STRING, Defaults.AuthorizerClassName, LOW, AuthorizerClassNameDoc)
@@ -1453,6 +1456,24 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
val brokerIdGenerationEnable: Boolean = getBoolean(KafkaConfig.BrokerIdGenerationEnableProp)
val maxReservedBrokerId: Int = getInt(KafkaConfig.MaxReservedBrokerIdProp)
var brokerId: Int = getInt(KafkaConfig.BrokerIdProp)
val processRoles = parseProcessRoles()

private def parseProcessRoles(): Set[ProcessRole] = {
val roles = getList(KafkaConfig.ProcessRolesProp).asScala.map {
case "broker" => BrokerRole
case "controller" => ControllerRole
case role => throw new ConfigException(s"Unknown process role '$role'" +
" (only 'broker' and 'controller' are allowed roles)")
}

val distinctRoles: Set[ProcessRole] = roles.toSet

if (distinctRoles.size != roles.size) {
throw new ConfigException(s"Duplicate role names found in `${KafkaConfig.ProcessRolesProp}`: $roles")
}

distinctRoles
}

def numNetworkThreads = getInt(KafkaConfig.NumNetworkThreadsProp)
def backgroundThreads = getInt(KafkaConfig.BackgroundThreadsProp)
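With `process.roles` defined, the branch taken in `buildServer` above is driven entirely by the properties file. A hypothetical `server.properties` excerpt (only the `process.roles` key itself comes from this commit):

```properties
# Run this node with both roles; each value is checked by
# ValidList.in("broker", "controller") and duplicates are rejected.
process.roles=broker,controller

# Omitting process.roles entirely keeps the legacy ZooKeeper startup path.
```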
(Diff truncated; the remaining changed files are not shown.)