From 8f063c15da51d77a40aa63f110e48c5eeeaf5dfc Mon Sep 17 00:00:00 2001
From: Jason Gustafson
Date: Fri, 15 Jan 2021 17:30:29 -0800
Subject: [PATCH] MINOR: Generalize server startup to make way for KIP-500
 (#9883)

This patch generalizes server initialization to make way for KIP-500. It
adds a `Server` trait, which the legacy Zookeeper-based `KafkaServer`
extends, and a new `KafkaRaftServer` implementation for the KIP-500 server.
It also adds stubs for the Raft-based broker (`BrokerServer`) and
controller (`ControllerServer`) to give a clearer idea of how this will be
used. Note that this patch removes `KafkaServerStartable`, which was
intended to enable custom startup logic but was never codified into an
official API and is not planned to be supported after KIP-500.

Reviewers: Ismael Juma, Colin P. McCabe
---
 .../apache/kafka/common/config/ConfigDef.java |  16 ++-
 core/src/main/scala/kafka/Kafka.scala         |  44 +++++++-
 core/src/main/scala/kafka/log/LogConfig.scala |  37 ++++++-
 .../src/main/scala/kafka/log/LogManager.scala |   2 +-
 .../kafka/metrics/KafkaMetricsReporter.scala  |   4 +-
 .../main/scala/kafka/raft/RaftManager.scala   |   8 +-
 .../scala/kafka/server/BrokerServer.scala     |  27 +++++
 .../scala/kafka/server/ControllerServer.scala |  27 +++++
 .../main/scala/kafka/server/KafkaConfig.scala |  21 ++++
 .../scala/kafka/server/KafkaRaftServer.scala  | 103 ++++++++++++++++++
 .../main/scala/kafka/server/KafkaServer.scala |  93 +++-------------
 .../kafka/server/KafkaServerStartable.scala   |  75 -------------
 core/src/main/scala/kafka/server/Server.scala |  73 +++++++++++++
 .../scala/kafka/server/ZkAdminManager.scala   |   4 +-
 .../scala/kafka/tools/TestRaftServer.scala    |   3 +-
 .../kafka/utils/VerifiableProperties.scala    |   8 ++
 .../DynamicBrokerReconfigurationTest.scala    |   2 +-
 .../scala/unit/kafka/log/LogConfigTest.scala  |   4 +-
 .../KafkaMetricReporterClusterIdTest.scala    |   4 +-
 .../server/KafkaMetricsReporterTest.scala     |   4 +-
 .../unit/kafka/server/ServerMetricsTest.scala |   4 +-
 21 files changed, 385 insertions(+), 178 deletions(-)
 create mode 100644 core/src/main/scala/kafka/server/BrokerServer.scala
 create mode 100644 core/src/main/scala/kafka/server/ControllerServer.scala
 create mode 100644 core/src/main/scala/kafka/server/KafkaRaftServer.scala
 delete mode 100644 core/src/main/scala/kafka/server/KafkaServerStartable.scala
 create mode 100644 core/src/main/scala/kafka/server/Server.scala

diff --git a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
index 0df9335e7c9c..21a95253c027 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
@@ -404,13 +404,27 @@ public ConfigDef define(String name, Type type, Importance importance, String do
      * @param name          The name of the config parameter
      * @param type          The type of the config
      * @param defaultValue  The default value to use if this config isn't present
-     * @param importance
+     * @param importance    The importance of this config (i.e. is this something you will likely need to change?)
      * @return This ConfigDef so you can chain calls
      */
     public ConfigDef defineInternal(final String name, final Type type, final Object defaultValue, final Importance importance) {
         return define(new ConfigKey(name, type, defaultValue, null, importance, "", "", -1, Width.NONE, name, Collections.emptyList(), null, true));
     }
 
+    /**
+     * Define a new internal configuration. Internal configurations won't show up in the docs and aren't
+     * intended for general use.
+     * @param name          The name of the config parameter
+     * @param type          The type of the config
+     * @param defaultValue  The default value to use if this config isn't present
+     * @param validator     The validator to use in checking the correctness of the config
+     * @param importance    The importance of this config (i.e. is this something you will likely need to change?)
+     * @return This ConfigDef so you can chain calls
+     */
+    public ConfigDef defineInternal(final String name, final Type type, final Object defaultValue, final Validator validator, final Importance importance) {
+        return define(new ConfigKey(name, type, defaultValue, validator, importance, "", "", -1, Width.NONE, name, Collections.emptyList(), null, true));
+    }
+
     /**
      * Get the configuration keys
      * @return a map containing all configuration keys
diff --git a/core/src/main/scala/kafka/Kafka.scala b/core/src/main/scala/kafka/Kafka.scala
index f01df6682219..1e1f345cf336 100755
--- a/core/src/main/scala/kafka/Kafka.scala
+++ b/core/src/main/scala/kafka/Kafka.scala
@@ -20,10 +20,10 @@ package kafka
 import java.util.Properties
 
 import joptsimple.OptionParser
-import kafka.server.KafkaServerStartable
+import kafka.server.{KafkaConfig, KafkaRaftServer, KafkaServer, Server}
 import kafka.utils.Implicits._
 import kafka.utils.{CommandLineUtils, Exit, Logging}
-import org.apache.kafka.common.utils.{Java, LoggingSignalHandler, OperatingSystem, Utils}
+import org.apache.kafka.common.utils.{Java, LoggingSignalHandler, OperatingSystem, Time, Utils}
 
 import scala.jdk.CollectionConverters._
 
@@ -63,10 +63,27 @@ object Kafka extends Logging {
     props
   }
 
+  private def buildServer(props: Properties): Server = {
+    val config = KafkaConfig.fromProps(props, false)
+    if (config.processRoles.isEmpty) {
+      new KafkaServer(
+        config,
+        Time.SYSTEM,
+        threadNamePrefix = None
+      )
+    } else {
+      new KafkaRaftServer(
+        config,
+        Time.SYSTEM,
+        threadNamePrefix = None
+      )
+    }
+  }
+
   def main(args: Array[String]): Unit = {
     try {
       val serverProps = getPropsFromArgs(args)
-      val kafkaServerStartable = KafkaServerStartable.fromProps(serverProps)
+      val server = buildServer(serverProps)
 
       try {
         if (!OperatingSystem.IS_WINDOWS && !Java.isIbmJdk)
@@ -78,10 +95,25 @@ object Kafka extends Logging {
       }
 
       // attach shutdown handler to catch terminating signals as well as normal termination
-      Exit.addShutdownHook("kafka-shutdown-hook", kafkaServerStartable.shutdown())
+      Exit.addShutdownHook("kafka-shutdown-hook", {
+        try server.shutdown()
+        catch {
+          case _: Throwable =>
+            fatal("Halting Kafka.")
+            // Calling exit() can lead to deadlock as exit() can be called multiple times. Force exit.
+            Exit.halt(1)
+        }
+      })
+
+      try server.startup()
+      catch {
+        case _: Throwable =>
+          // KafkaServer.startup() calls shutdown() in case of exceptions, so we invoke `exit` to set the status code
+          fatal("Exiting Kafka.")
+          Exit.exit(1)
+      }
 
-      kafkaServerStartable.startup()
-      kafkaServerStartable.awaitShutdown()
+      server.awaitShutdown()
     }
     catch {
       case e: Throwable =>
diff --git a/core/src/main/scala/kafka/log/LogConfig.scala b/core/src/main/scala/kafka/log/LogConfig.scala
index c0a6d9f0c2d7..4299534bc663 100755
--- a/core/src/main/scala/kafka/log/LogConfig.scala
+++ b/core/src/main/scala/kafka/log/LogConfig.scala
@@ -71,7 +71,7 @@ case class LogConfig(props: java.util.Map[_, _], overriddenConfigs: Set[String]
   extends AbstractConfig(LogConfig.configDef, props, false) {
   /**
    * Important note: Any configuration parameter that is passed along from KafkaConfig to LogConfig
-   * should also go in [[kafka.server.KafkaServer.copyKafkaConfigToLog]].
+   * should also go in [[LogConfig.extractLogConfigMap()]].
    */
   val segmentSize = getInt(LogConfig.SegmentBytesProp)
   val segmentMs = getLong(LogConfig.SegmentMsProp)
@@ -375,4 +375,39 @@ object LogConfig {
     MessageDownConversionEnableProp -> KafkaConfig.LogMessageDownConversionEnableProp
   )
+
+  /**
+   * Copy the subset of properties that are relevant to Logs. The individual properties
+   * are listed here since the names are slightly different in each Config class...
+   */
+  def extractLogConfigMap(
+    kafkaConfig: KafkaConfig
+  ): java.util.Map[String, Object] = {
+    val logProps = new java.util.HashMap[String, Object]()
+    logProps.put(SegmentBytesProp, kafkaConfig.logSegmentBytes)
+    logProps.put(SegmentMsProp, kafkaConfig.logRollTimeMillis)
+    logProps.put(SegmentJitterMsProp, kafkaConfig.logRollTimeJitterMillis)
+    logProps.put(SegmentIndexBytesProp, kafkaConfig.logIndexSizeMaxBytes)
+    logProps.put(FlushMessagesProp, kafkaConfig.logFlushIntervalMessages)
+    logProps.put(FlushMsProp, kafkaConfig.logFlushIntervalMs)
+    logProps.put(RetentionBytesProp, kafkaConfig.logRetentionBytes)
+    logProps.put(RetentionMsProp, kafkaConfig.logRetentionTimeMillis: java.lang.Long)
+    logProps.put(MaxMessageBytesProp, kafkaConfig.messageMaxBytes)
+    logProps.put(IndexIntervalBytesProp, kafkaConfig.logIndexIntervalBytes)
+    logProps.put(DeleteRetentionMsProp, kafkaConfig.logCleanerDeleteRetentionMs)
+    logProps.put(MinCompactionLagMsProp, kafkaConfig.logCleanerMinCompactionLagMs)
+    logProps.put(MaxCompactionLagMsProp, kafkaConfig.logCleanerMaxCompactionLagMs)
+    logProps.put(FileDeleteDelayMsProp, kafkaConfig.logDeleteDelayMs)
+    logProps.put(MinCleanableDirtyRatioProp, kafkaConfig.logCleanerMinCleanRatio)
+    logProps.put(CleanupPolicyProp, kafkaConfig.logCleanupPolicy)
+    logProps.put(MinInSyncReplicasProp, kafkaConfig.minInSyncReplicas)
+    logProps.put(CompressionTypeProp, kafkaConfig.compressionType)
+    logProps.put(UncleanLeaderElectionEnableProp, kafkaConfig.uncleanLeaderElectionEnable)
+    logProps.put(PreAllocateEnableProp, kafkaConfig.logPreAllocateEnable)
+    logProps.put(MessageFormatVersionProp, kafkaConfig.logMessageFormatVersion.version)
+    logProps.put(MessageTimestampTypeProp, kafkaConfig.logMessageTimestampType.name)
+    logProps.put(MessageTimestampDifferenceMaxMsProp, kafkaConfig.logMessageTimestampDifferenceMaxMs: java.lang.Long)
+    logProps.put(MessageDownConversionEnableProp, kafkaConfig.logMessageDownConversionEnable: java.lang.Boolean)
+    logProps
+  }
 }
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index 30b6b6a42496..f5f77e03492b 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -1187,7 +1187,7 @@ object LogManager {
             time: Time,
             brokerTopicStats: BrokerTopicStats,
             logDirFailureChannel: LogDirFailureChannel): LogManager = {
-    val defaultProps = KafkaServer.copyKafkaConfigToLog(config)
+    val defaultProps = LogConfig.extractLogConfigMap(config)
 
     LogConfig.validateValues(defaultProps)
     val defaultLogConfig = LogConfig(defaultProps)
diff --git a/core/src/main/scala/kafka/metrics/KafkaMetricsReporter.scala b/core/src/main/scala/kafka/metrics/KafkaMetricsReporter.scala
index 814cdb2b91b7..30baad3464c3 100755
--- a/core/src/main/scala/kafka/metrics/KafkaMetricsReporter.scala
+++ b/core/src/main/scala/kafka/metrics/KafkaMetricsReporter.scala
@@ -55,12 +55,12 @@ object KafkaMetricsReporter {
   val ReporterStarted: AtomicBoolean = new AtomicBoolean(false)
   private var reporters: ArrayBuffer[KafkaMetricsReporter] = null
 
-  def startReporters (verifiableProps: VerifiableProperties): Seq[KafkaMetricsReporter] = {
+  def startReporters(verifiableProps: VerifiableProperties): Seq[KafkaMetricsReporter] = {
     ReporterStarted synchronized {
       if (!ReporterStarted.get()) {
         reporters = ArrayBuffer[KafkaMetricsReporter]()
         val metricsConfig = new KafkaMetricsConfig(verifiableProps)
-        if(metricsConfig.reporters.nonEmpty) {
+        if (metricsConfig.reporters.nonEmpty) {
           metricsConfig.reporters.foreach(reporterType => {
             val reporter = CoreUtils.createObject[KafkaMetricsReporter](reporterType)
             reporter.init(verifiableProps)
diff --git a/core/src/main/scala/kafka/raft/RaftManager.scala b/core/src/main/scala/kafka/raft/RaftManager.scala
index fe199ad584e6..cc9a82ca1892 100644
--- a/core/src/main/scala/kafka/raft/RaftManager.scala
+++ b/core/src/main/scala/kafka/raft/RaftManager.scala
@@ -24,7 +24,7 @@ import java.util.concurrent.CompletableFuture
 
 import kafka.log.{Log, LogConfig, LogManager}
 import kafka.raft.KafkaRaftManager.RaftIoThread
-import kafka.server.{BrokerTopicStats, KafkaConfig, KafkaServer, LogDirFailureChannel}
+import kafka.server.{BrokerTopicStats, KafkaConfig, LogDirFailureChannel}
 import kafka.utils.timer.SystemTimer
 import kafka.utils.{KafkaScheduler, Logging, ShutdownableThread}
 import org.apache.kafka.clients.{ApiVersions, ClientDnsLookup, ManualMetadataUpdater, NetworkClient}
@@ -96,15 +96,15 @@ trait RaftManager[T] {
 }
 
 class KafkaRaftManager[T](
-  nodeId: Int,
+  config: KafkaConfig,
   baseLogDir: String,
   recordSerde: RecordSerde[T],
   topicPartition: TopicPartition,
-  config: KafkaConfig,
   time: Time,
   metrics: Metrics
 ) extends RaftManager[T] with Logging {
 
+  private val nodeId = config.brokerId
   private val raftConfig = new RaftConfig(config.originals)
   private val logContext = new LogContext(s"[RaftManager $nodeId] ")
   this.logIdent = logContext.logPrefix()
@@ -197,7 +197,7 @@ class KafkaRaftManager[T](
   }
 
   private def buildMetadataLog(): KafkaMetadataLog = {
-    val defaultProps = KafkaServer.copyKafkaConfigToLog(config)
+    val defaultProps = LogConfig.extractLogConfigMap(config)
     LogConfig.validateValues(defaultProps)
     val defaultLogConfig = LogConfig(defaultProps)
 
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala
new file mode 100644
index 000000000000..90f95ed2c430
--- /dev/null
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server
+
+/**
+ * Stubbed implementation of the KIP-500 broker, which processes state
+ * from the `@metadata` topic replicated through Raft.
+ */
+class BrokerServer {
+  def startup(): Unit = ???
+  def shutdown(): Unit = ???
+  def awaitShutdown(): Unit = ???
+}
diff --git a/core/src/main/scala/kafka/server/ControllerServer.scala b/core/src/main/scala/kafka/server/ControllerServer.scala
new file mode 100644
index 000000000000..b648e7732847
--- /dev/null
+++ b/core/src/main/scala/kafka/server/ControllerServer.scala
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server
+
+/**
+ * Stubbed implementation of the KIP-500 controller, which is responsible
+ * for managing the `@metadata` topic replicated through Raft.
+ */
+class ControllerServer {
+  def startup(): Unit = ???
+  def shutdown(): Unit = ???
+  def awaitShutdown(): Unit = ???
+}
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 895612e2f7d0..7eeb65992ad4 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -27,6 +27,7 @@ import kafka.coordinator.transaction.{TransactionLog, TransactionStateManager}
 import kafka.log.LogConfig
 import kafka.message.{BrokerCompressionCodec, CompressionCodec, ZStdCompressionCodec}
 import kafka.security.authorizer.AuthorizerUtils
+import kafka.server.KafkaRaftServer.{BrokerRole, ControllerRole, ProcessRole}
 import kafka.utils.CoreUtils
 import kafka.utils.Implicits._
 import org.apache.kafka.clients.CommonClientConfigs
@@ -353,6 +354,7 @@ object KafkaConfig {
   val ConnectionSetupTimeoutMsProp = CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG
   val ConnectionSetupTimeoutMaxMsProp = CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG
   val EnableMetadataQuorumProp = "enable.metadata.quorum"
+  val ProcessRolesProp = "process.roles"
 
   /************* Authorizer Configuration ***********/
   val AuthorizerClassNameProp = "authorizer.class.name"
@@ -1032,6 +1034,7 @@ object KafkaConfig {
 
       // Experimental flag to turn on APIs required for the internal metadata quorum (KIP-500)
       .defineInternal(EnableMetadataQuorumProp, BOOLEAN, false, LOW)
+      .defineInternal(ProcessRolesProp, LIST, Collections.emptyList(), ValidList.in("broker", "controller"), HIGH)
 
       /************* Authorizer Configuration ***********/
       .define(AuthorizerClassNameProp, STRING, Defaults.AuthorizerClassName, LOW, AuthorizerClassNameDoc)
@@ -1453,6 +1456,24 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
   val brokerIdGenerationEnable: Boolean = getBoolean(KafkaConfig.BrokerIdGenerationEnableProp)
   val maxReservedBrokerId: Int = getInt(KafkaConfig.MaxReservedBrokerIdProp)
   var brokerId: Int = getInt(KafkaConfig.BrokerIdProp)
+  val processRoles = parseProcessRoles()
+
+  private def parseProcessRoles(): Set[ProcessRole] = {
+    val roles = getList(KafkaConfig.ProcessRolesProp).asScala.map {
+      case "broker" => BrokerRole
+      case "controller" => ControllerRole
+      case role => throw new ConfigException(s"Unknown process role '$role'" +
+        " (only 'broker' and 'controller' are allowed roles)")
+    }
+
+    val distinctRoles: Set[ProcessRole] = roles.toSet
+
+    if (distinctRoles.size != roles.size) {
+      throw new ConfigException(s"Duplicate role names found in `${KafkaConfig.ProcessRolesProp}`: $roles")
+    }
+
+    distinctRoles
+  }
 
   def numNetworkThreads = getInt(KafkaConfig.NumNetworkThreadsProp)
   def backgroundThreads = getInt(KafkaConfig.BackgroundThreadsProp)
diff --git a/core/src/main/scala/kafka/server/KafkaRaftServer.scala b/core/src/main/scala/kafka/server/KafkaRaftServer.scala
new file mode 100644
index 000000000000..a63abfe4b9a8
--- /dev/null
+++ b/core/src/main/scala/kafka/server/KafkaRaftServer.scala
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server
+
+import kafka.metrics.{KafkaMetricsReporter, KafkaYammerMetrics}
+import kafka.raft.KafkaRaftManager
+import kafka.server.KafkaRaftServer.{BrokerRole, ControllerRole}
+import kafka.utils.{CoreUtils, Logging, Mx4jLoader, VerifiableProperties}
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.utils.{AppInfoParser, Time}
+import org.apache.kafka.raft.internals.StringSerde
+
+/**
+ * This class implements the KIP-500 server which relies on a self-managed
+ * Raft quorum for maintaining cluster metadata. It is responsible for
+ * constructing the controller and/or broker based on the `process.roles`
+ * configuration and for managing their basic lifecycle (startup and shutdown).
+ *
+ * Note that this server is a work in progress and relies on stubbed
+ * implementations of the controller [[ControllerServer]] and broker
+ * [[BrokerServer]].
+ */
+class KafkaRaftServer(
+  config: KafkaConfig,
+  time: Time,
+  threadNamePrefix: Option[String]
+) extends Server with Logging {
+
+  KafkaMetricsReporter.startReporters(VerifiableProperties(config.originals))
+  KafkaYammerMetrics.INSTANCE.configure(config.originals)
+
+  private val metrics = Server.initializeMetrics(
+    config,
+    time,
+    clusterId = "FIXME"
+  )
+
+  private val raftManager = new KafkaRaftManager(
+    config,
+    config.logDirs.head,
+    new StringSerde,
+    KafkaRaftServer.MetadataPartition,
+    time,
+    metrics
+  )
+
+  private val broker: Option[BrokerServer] = if (config.processRoles.contains(BrokerRole)) {
+    Some(new BrokerServer())
+  } else {
+    None
+  }
+
+  private val controller: Option[ControllerServer] = if (config.processRoles.contains(ControllerRole)) {
+    Some(new ControllerServer())
+  } else {
+    None
+  }
+
+  override def startup(): Unit = {
+    Mx4jLoader.maybeLoad()
+    raftManager.startup()
+    controller.foreach(_.startup())
+    broker.foreach(_.startup())
+    AppInfoParser.registerAppInfo(Server.MetricsPrefix, config.brokerId.toString, metrics, time.milliseconds())
+  }
+
+  override def shutdown(): Unit = {
+    broker.foreach(_.shutdown())
+    raftManager.shutdown()
+    controller.foreach(_.shutdown())
+    CoreUtils.swallow(AppInfoParser.unregisterAppInfo(Server.MetricsPrefix, config.brokerId.toString, metrics), this)
+  }
+
+  override def awaitShutdown(): Unit = {
+    broker.foreach(_.awaitShutdown())
+    controller.foreach(_.awaitShutdown())
+  }
+
+}
+
+object KafkaRaftServer {
+  val MetadataTopic = "@metadata"
+  val MetadataPartition = new TopicPartition(MetadataTopic, 0)
+
+  sealed trait ProcessRole
+  case object BrokerRole extends ProcessRole
+  case object ControllerRole extends ProcessRole
+}
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 168844b30127..b1748ef10713 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -19,7 +19,6 @@ package kafka.server
 
 import java.io.{File, IOException}
 import java.net.{InetAddress, SocketTimeoutException}
-import java.util
 import java.util.concurrent._
 import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
@@ -29,16 +28,16 @@ import kafka.common.{GenerateBrokerIdException, InconsistentBrokerIdException, I
 import kafka.controller.KafkaController
 import kafka.coordinator.group.GroupCoordinator
 import kafka.coordinator.transaction.TransactionCoordinator
-import kafka.log.{LogConfig, LogManager}
+import kafka.log.LogManager
 import kafka.metrics.{KafkaMetricsGroup, KafkaMetricsReporter, KafkaYammerMetrics, LinuxIoMetricsCollector}
 import kafka.network.SocketServer
 import kafka.security.CredentialProvider
 import kafka.utils._
 import kafka.zk.{BrokerInfo, KafkaZkClient}
-import org.apache.kafka.clients.{ApiVersions, ClientDnsLookup, CommonClientConfigs, ManualMetadataUpdater, NetworkClient, NetworkClientUtils}
+import org.apache.kafka.clients.{ApiVersions, ClientDnsLookup, ManualMetadataUpdater, NetworkClient, NetworkClientUtils}
 import org.apache.kafka.common.internals.ClusterResourceListeners
 import org.apache.kafka.common.message.ControlledShutdownRequestData
-import org.apache.kafka.common.metrics.{JmxReporter, Metrics, MetricsReporter, _}
+import org.apache.kafka.common.metrics.{Metrics, MetricsReporter}
 import org.apache.kafka.common.network._
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests.{ControlledShutdownRequest, ControlledShutdownResponse}
@@ -54,43 +53,6 @@ import scala.collection.{Map, Seq, mutable}
 import scala.jdk.CollectionConverters._
 
 object KafkaServer {
-  // Copy the subset of properties that are relevant to Logs
-  // I'm listing out individual properties here since the names are slightly different in each Config class...
-  private[kafka] def copyKafkaConfigToLog(kafkaConfig: KafkaConfig): java.util.Map[String, Object] = {
-    val logProps = new util.HashMap[String, Object]()
-    logProps.put(LogConfig.SegmentBytesProp, kafkaConfig.logSegmentBytes)
-    logProps.put(LogConfig.SegmentMsProp, kafkaConfig.logRollTimeMillis)
-    logProps.put(LogConfig.SegmentJitterMsProp, kafkaConfig.logRollTimeJitterMillis)
-    logProps.put(LogConfig.SegmentIndexBytesProp, kafkaConfig.logIndexSizeMaxBytes)
-    logProps.put(LogConfig.FlushMessagesProp, kafkaConfig.logFlushIntervalMessages)
-    logProps.put(LogConfig.FlushMsProp, kafkaConfig.logFlushIntervalMs)
-    logProps.put(LogConfig.RetentionBytesProp, kafkaConfig.logRetentionBytes)
-    logProps.put(LogConfig.RetentionMsProp, kafkaConfig.logRetentionTimeMillis: java.lang.Long)
-    logProps.put(LogConfig.MaxMessageBytesProp, kafkaConfig.messageMaxBytes)
-    logProps.put(LogConfig.IndexIntervalBytesProp, kafkaConfig.logIndexIntervalBytes)
-    logProps.put(LogConfig.DeleteRetentionMsProp, kafkaConfig.logCleanerDeleteRetentionMs)
-    logProps.put(LogConfig.MinCompactionLagMsProp, kafkaConfig.logCleanerMinCompactionLagMs)
-    logProps.put(LogConfig.MaxCompactionLagMsProp, kafkaConfig.logCleanerMaxCompactionLagMs)
-    logProps.put(LogConfig.FileDeleteDelayMsProp, kafkaConfig.logDeleteDelayMs)
-    logProps.put(LogConfig.MinCleanableDirtyRatioProp, kafkaConfig.logCleanerMinCleanRatio)
-    logProps.put(LogConfig.CleanupPolicyProp, kafkaConfig.logCleanupPolicy)
-    logProps.put(LogConfig.MinInSyncReplicasProp, kafkaConfig.minInSyncReplicas)
-    logProps.put(LogConfig.CompressionTypeProp, kafkaConfig.compressionType)
-    logProps.put(LogConfig.UncleanLeaderElectionEnableProp, kafkaConfig.uncleanLeaderElectionEnable)
-    logProps.put(LogConfig.PreAllocateEnableProp, kafkaConfig.logPreAllocateEnable)
-    logProps.put(LogConfig.MessageFormatVersionProp, kafkaConfig.logMessageFormatVersion.version)
-    logProps.put(LogConfig.MessageTimestampTypeProp, kafkaConfig.logMessageTimestampType.name)
-    logProps.put(LogConfig.MessageTimestampDifferenceMaxMsProp, kafkaConfig.logMessageTimestampDifferenceMaxMs: java.lang.Long)
-    logProps.put(LogConfig.MessageDownConversionEnableProp, kafkaConfig.logMessageDownConversionEnable: java.lang.Boolean)
-    logProps
-  }
-
-  private[server] def metricConfig(kafkaConfig: KafkaConfig): MetricConfig = {
-    new MetricConfig()
-      .samples(kafkaConfig.metricNumSamples)
-      .recordLevel(Sensor.RecordingLevel.forName(kafkaConfig.metricRecordingLevel))
-      .timeWindow(kafkaConfig.metricSampleWindowMs, TimeUnit.MILLISECONDS)
-  }
 
   def zkClientConfigFromKafkaConfig(config: KafkaConfig, forceZkSslClientEnable: Boolean = false) =
     if (!config.zkSslClientEnable && !forceZkSslClientEnable)
@@ -121,22 +83,21 @@ object KafkaServer {
  * Represents the lifecycle of a single Kafka broker. Handles all functionality required
  * to start up and shutdown a single Kafka node.
  */
-class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNamePrefix: Option[String] = None,
-                  kafkaMetricsReporters: Seq[KafkaMetricsReporter] = List()) extends Logging with KafkaMetricsGroup {
+class KafkaServer(
+  val config: KafkaConfig,
+  time: Time = Time.SYSTEM,
+  threadNamePrefix: Option[String] = None,
+) extends Server with Logging with KafkaMetricsGroup {
 
   private val startupComplete = new AtomicBoolean(false)
   private val isShuttingDown = new AtomicBoolean(false)
   private val isStartingUp = new AtomicBoolean(false)
 
   private var shutdownLatch = new CountDownLatch(1)
-
-  //properties for MetricsContext
-  private val metricsPrefix: String = "kafka.server"
-  private val KAFKA_CLUSTER_ID: String = "kafka.cluster.id"
-  private val KAFKA_BROKER_ID: String = "kafka.broker.id"
-
-  private var logContext: LogContext = null
+  private val kafkaMetricsReporters: Seq[KafkaMetricsReporter] =
+    KafkaMetricsReporter.startReporters(VerifiableProperties(config.originals))
 
   var kafkaYammerMetrics: KafkaYammerMetrics = null
   var metrics: Metrics = null
@@ -217,7 +178,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
    * Start up API for bringing up a single instance of the Kafka server.
    * Instantiates the LogManager, the SocketServer and the request handlers - KafkaRequestHandlers
    */
-  def startup(): Unit = {
+  override def startup(): Unit = {
     try {
       info("starting")
 
@@ -269,16 +230,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
         /* create and configure metrics */
         kafkaYammerMetrics = KafkaYammerMetrics.INSTANCE
         kafkaYammerMetrics.configure(config.originals)
-
-        val jmxReporter = new JmxReporter()
-        jmxReporter.configure(config.originals)
-
-        val reporters = new util.ArrayList[MetricsReporter]
-        reporters.add(jmxReporter)
-
-        val metricConfig = KafkaServer.metricConfig(config)
-        val metricsContext = createKafkaMetricsContext()
-        metrics = new Metrics(metricConfig, reporters, time, true, metricsContext)
+        metrics = Server.initializeMetrics(config, time, clusterId)
 
         /* register broker metrics */
         _brokerTopicStats = new BrokerTopicStats
@@ -421,7 +373,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
         shutdownLatch = new CountDownLatch(1)
         startupComplete.set(true)
         isStartingUp.set(false)
-        AppInfoParser.registerAppInfo(metricsPrefix, config.brokerId.toString, metrics, time.milliseconds())
+        AppInfoParser.registerAppInfo(Server.MetricsPrefix, config.brokerId.toString, metrics, time.milliseconds())
         info("started")
       }
     }
@@ -441,22 +393,13 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
   }
 
   private[server] def notifyMetricsReporters(metricsReporters: Seq[AnyRef]): Unit = {
-    val metricsContext = createKafkaMetricsContext()
+    val metricsContext = Server.createKafkaMetricsContext(clusterId, config)
     metricsReporters.foreach {
       case x: MetricsReporter => x.contextChange(metricsContext)
      case _ => //do nothing
     }
   }
 
-  private[server] def createKafkaMetricsContext() : KafkaMetricsContext = {
-    val contextLabels = new util.HashMap[String, Object]
-    contextLabels.put(KAFKA_CLUSTER_ID, clusterId)
-    contextLabels.put(KAFKA_BROKER_ID, config.brokerId.toString)
-    contextLabels.putAll(config.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX))
-    val metricsContext = new KafkaMetricsContext(metricsPrefix, contextLabels)
-    metricsContext
-  }
-
   protected def createReplicaManager(isShuttingDown: AtomicBoolean): ReplicaManager = {
     new ReplicaManager(config, metrics, time, zkClient, kafkaScheduler, logManager, isShuttingDown,
       quotaManagers, brokerTopicStats, metadataCache, logDirFailureChannel, alterIsrManager)
@@ -691,7 +634,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
    * Shutdown API for shutting down a single instance of the Kafka server.
    * Shuts down the LogManager, the SocketServer and the log cleaner scheduler thread
    */
-  def shutdown(): Unit = {
+  override def shutdown(): Unit = {
     try {
       info("shutting down")
 
@@ -776,7 +719,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
 
       startupComplete.set(false)
       isShuttingDown.set(false)
-      CoreUtils.swallow(AppInfoParser.unregisterAppInfo(metricsPrefix, config.brokerId.toString, metrics), this)
+      CoreUtils.swallow(AppInfoParser.unregisterAppInfo(Server.MetricsPrefix, config.brokerId.toString, metrics), this)
       shutdownLatch.countDown()
       info("shut down completed")
     }
@@ -792,7 +735,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
   /**
    * After calling shutdown(), use this API to wait until the shutdown is complete
    */
-  def awaitShutdown(): Unit = shutdownLatch.await()
+  override def awaitShutdown(): Unit = shutdownLatch.await()
 
   def getLogManager: LogManager = logManager
 
diff --git a/core/src/main/scala/kafka/server/KafkaServerStartable.scala b/core/src/main/scala/kafka/server/KafkaServerStartable.scala
deleted file mode 100644
index d4f4c152db0a..000000000000
--- a/core/src/main/scala/kafka/server/KafkaServerStartable.scala
+++ /dev/null
@@ -1,75 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.server
-
-import java.util.Properties
-
-import kafka.metrics.KafkaMetricsReporter
-import kafka.utils.{Exit, Logging, VerifiableProperties}
-
-import scala.collection.Seq
-
-object KafkaServerStartable {
-  def fromProps(serverProps: Properties): KafkaServerStartable = {
-    fromProps(serverProps, None)
-  }
-
-  def fromProps(serverProps: Properties, threadNamePrefix: Option[String]): KafkaServerStartable = {
-    val reporters = KafkaMetricsReporter.startReporters(new VerifiableProperties(serverProps))
-    new KafkaServerStartable(KafkaConfig.fromProps(serverProps, false), reporters, threadNamePrefix)
-  }
-}
-
-class KafkaServerStartable(val staticServerConfig: KafkaConfig, reporters: Seq[KafkaMetricsReporter], threadNamePrefix: Option[String] = None) extends Logging {
-  private val server = new KafkaServer(staticServerConfig, kafkaMetricsReporters = reporters, threadNamePrefix = threadNamePrefix)
-
-  def this(serverConfig: KafkaConfig) = this(serverConfig, Seq.empty)
-
-  def startup(): Unit = {
-    try server.startup()
-    catch {
-      case _: Throwable =>
-        // KafkaServer.startup() calls shutdown() in case of exceptions, so we invoke `exit` to set the status code
-        fatal("Exiting Kafka.")
-        Exit.exit(1)
-    }
-  }
-
-  def shutdown(): Unit = {
-    try server.shutdown()
-    catch {
-      case _: Throwable =>
-        fatal("Halting Kafka.")
-        // Calling exit() can lead to deadlock as exit() can be called multiple times. Force exit.
-        Exit.halt(1)
-    }
-  }
-
-  /**
-   * Allow setting broker state from the startable.
-   * This is needed when a custom kafka server startable want to emit new states that it introduces.
-   */
-  def setServerState(newState: Byte): Unit = {
-    server.brokerState.newState(newState)
-  }
-
-  def awaitShutdown(): Unit = server.awaitShutdown()
-
-}
diff --git a/core/src/main/scala/kafka/server/Server.scala b/core/src/main/scala/kafka/server/Server.scala
new file mode 100644
index 000000000000..e22160b7b147
--- /dev/null
+++ b/core/src/main/scala/kafka/server/Server.scala
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server
+
+import java.util.concurrent.TimeUnit
+
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.common.metrics.{JmxReporter, KafkaMetricsContext, MetricConfig, Metrics, MetricsReporter, Sensor}
+import org.apache.kafka.common.utils.Time
+
+trait Server {
+  def startup(): Unit
+  def shutdown(): Unit
+  def awaitShutdown(): Unit
+}
+
+object Server {
+
+  def initializeMetrics(
+    config: KafkaConfig,
+    time: Time,
+    clusterId: String
+  ): Metrics = {
+    val jmxReporter = new JmxReporter()
+    jmxReporter.configure(config.originals)
+
+    val reporters = new java.util.ArrayList[MetricsReporter]
+    reporters.add(jmxReporter)
+
+    val metricConfig = buildMetricsConfig(config)
+    val metricsContext = createKafkaMetricsContext(clusterId, config)
+    new Metrics(metricConfig, reporters, time, true, metricsContext)
+  }
+
+  def buildMetricsConfig(
+    kafkaConfig: KafkaConfig
+  ): MetricConfig = {
+    new MetricConfig()
+      .samples(kafkaConfig.metricNumSamples)
+      .recordLevel(Sensor.RecordingLevel.forName(kafkaConfig.metricRecordingLevel))
+      .timeWindow(kafkaConfig.metricSampleWindowMs, TimeUnit.MILLISECONDS)
+  }
+
+  val MetricsPrefix: String = "kafka.server"
+  private val ClusterIdLabel: String = "kafka.cluster.id"
+  private val BrokerIdLabel: String = "kafka.broker.id"
+
+  private[server] def createKafkaMetricsContext(
+    clusterId: String,
+    config: KafkaConfig
+  ): KafkaMetricsContext = {
+    val contextLabels = new java.util.HashMap[String, Object]
+    contextLabels.put(ClusterIdLabel, clusterId)
+    contextLabels.put(BrokerIdLabel, config.brokerId.toString)
+    contextLabels.putAll(config.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX))
+    val metricsContext = new KafkaMetricsContext(MetricsPrefix, contextLabels)
+    metricsContext
+  }
+}
diff --git a/core/src/main/scala/kafka/server/ZkAdminManager.scala b/core/src/main/scala/kafka/server/ZkAdminManager.scala
index 701b619ca4c6..acb1592a10af 100644
--- a/core/src/main/scala/kafka/server/ZkAdminManager.scala
+++ b/core/src/main/scala/kafka/server/ZkAdminManager.scala
@@ -116,7 +116,7 @@ class ZkAdminManager(val config: KafkaConfig,
                      configs: Properties,
                      assignments: Map[Int, Seq[Int]]): Unit = {
     metadataAndConfigs.get(topicName).foreach { result =>
-      val logConfig = LogConfig.fromProps(KafkaServer.copyKafkaConfigToLog(config), configs)
+      val logConfig = LogConfig.fromProps(LogConfig.extractLogConfigMap(config), configs)
       val createEntry = createTopicConfigEntry(logConfig, configs, includeSynonyms = false, includeDocumentation = false)(_, _)
       val topicConfigs = logConfig.values.asScala.map { case (k, v) =>
         val entry = createEntry(k, v)
@@ -410,7 +410,7 @@ class ZkAdminManager(val config: KafkaConfig,
       if (metadataCache.contains(topic)) {
         // Consider optimizing this by caching the configs or retrieving them from the `Log` when possible
         val topicProps = adminZkClient.fetchEntityConfig(ConfigType.Topic, topic)
-        val logConfig = LogConfig.fromProps(KafkaServer.copyKafkaConfigToLog(config), topicProps)
+        val logConfig = LogConfig.fromProps(LogConfig.extractLogConfigMap(config), topicProps)
         createResponseConfig(allConfigs(logConfig), createTopicConfigEntry(logConfig, topicProps, includeSynonyms, includeDocumentation))
       } else {
         new DescribeConfigsResponseData.DescribeConfigsResult().setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code)
diff --git a/core/src/main/scala/kafka/tools/TestRaftServer.scala b/core/src/main/scala/kafka/tools/TestRaftServer.scala
index 49a759ae1c2b..9ac3f0f4904a 100644
--- a/core/src/main/scala/kafka/tools/TestRaftServer.scala
+++ b/core/src/main/scala/kafka/tools/TestRaftServer.scala
@@ -70,11 +70,10 @@ class TestRaftServer(
     socketServer.startup(startProcessingRequests = false)
 
     raftManager = new KafkaRaftManager[Array[Byte]](
-      config.brokerId,
+      config,
       config.logDirs.head,
       new ByteArraySerde,
       partition,
-      config,
       time,
       metrics
     )
diff --git a/core/src/main/scala/kafka/utils/VerifiableProperties.scala b/core/src/main/scala/kafka/utils/VerifiableProperties.scala
index bbacd95dc3b3..878398ae0ffa 100755
--- a/core/src/main/scala/kafka/utils/VerifiableProperties.scala
+++ b/core/src/main/scala/kafka/utils/VerifiableProperties.scala
@@ -22,7 +22,15 @@ import java.util.Collections
 import scala.collection._
 import kafka.message.{CompressionCodec, NoCompressionCodec}
 import scala.jdk.CollectionConverters._
+import kafka.utils.Implicits._
 
+object VerifiableProperties {
+  def apply(map: java.util.Map[String, AnyRef]): VerifiableProperties = {
+    val props = new Properties()
+    props ++= map.asScala
+    new VerifiableProperties(props)
+  }
+}
 
 class VerifiableProperties(val props: Properties) extends Logging {
   private val referenceSet = mutable.HashSet[String]()
diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
index e34bd09faf95..e748679fb2e4 100644
--- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
+++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -584,7 +584,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
     }
 
     // Verify that configs of existing logs have been updated
-    val newLogConfig = LogConfig(KafkaServer.copyKafkaConfigToLog(servers.head.config))
+    val newLogConfig = LogConfig(LogConfig.extractLogConfigMap(servers.head.config))
     TestUtils.waitUntilTrue(() => servers.head.logManager.currentDefaultConfig == newLogConfig,
       "Config not updated in LogManager")
 
diff --git a/core/src/test/scala/unit/kafka/log/LogConfigTest.scala b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala
index da7d21f4df7f..9c79027780a0 100644
--- a/core/src/test/scala/unit/kafka/log/LogConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala
@@ -17,7 +17,7 @@
 
 package kafka.log
 
-import kafka.server.{KafkaConfig, KafkaServer, ThrottledReplicaListValidator}
+import kafka.server.{KafkaConfig, ThrottledReplicaListValidator}
 import kafka.utils.TestUtils
 import org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM
 import org.apache.kafka.common.config.ConfigDef.Type.INT
@@ -55,7 +55,7 @@ class LogConfigTest {
     kafkaProps.put(KafkaConfig.LogRetentionTimeHoursProp, "2")
 
     val kafkaConfig = KafkaConfig.fromProps(kafkaProps)
-    val logProps = KafkaServer.copyKafkaConfigToLog(kafkaConfig)
+    val logProps = LogConfig.extractLogConfigMap(kafkaConfig)
     assertEquals(2 * millisInHour, logProps.get(LogConfig.SegmentMsProp))
     assertEquals(2 * millisInHour, logProps.get(LogConfig.SegmentJitterMsProp))
     assertEquals(2 * millisInHour, logProps.get(LogConfig.RetentionMsProp))
diff --git a/core/src/test/scala/unit/kafka/server/KafkaMetricReporterClusterIdTest.scala b/core/src/test/scala/unit/kafka/server/KafkaMetricReporterClusterIdTest.scala
index 62d99c08b53e..3e11290037c4 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaMetricReporterClusterIdTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaMetricReporterClusterIdTest.scala
@@ -76,7 +76,7 @@ object KafkaMetricReporterClusterIdTest {
 }
 
 class KafkaMetricReporterClusterIdTest extends ZooKeeperTestHarness {
-  var server: KafkaServerStartable = null
+  var server: KafkaServer = null
   var config: KafkaConfig = null
 
   @Before
@@ -88,7 +88,7 @@ class KafkaMetricReporterClusterIdTest extends ZooKeeperTestHarness {
     props.setProperty(KafkaConfig.BrokerIdGenerationEnableProp, "true")
     props.setProperty(KafkaConfig.BrokerIdProp, "-1")
     config = KafkaConfig.fromProps(props)
-    server = KafkaServerStartable.fromProps(props, threadNamePrefix = Option(this.getClass.getName))
+    server = new KafkaServer(config, threadNamePrefix = Option(this.getClass.getName))
     server.startup()
   }
 
diff --git a/core/src/test/scala/unit/kafka/server/KafkaMetricsReporterTest.scala b/core/src/test/scala/unit/kafka/server/KafkaMetricsReporterTest.scala
index 4e0c9f862afe..7b363c4020aa 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaMetricsReporterTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaMetricsReporterTest.scala
@@ -60,7 +60,7 @@ object KafkaMetricsReporterTest {
 }
 
 class KafkaMetricsReporterTest extends ZooKeeperTestHarness {
-  var server: KafkaServerStartable = null
+  var server: KafkaServer = null
   var config: KafkaConfig = null
 
   @Before
@@ -71,7 +71,7 @@ class KafkaMetricsReporterTest extends ZooKeeperTestHarness {
     props.setProperty(KafkaConfig.BrokerIdGenerationEnableProp, "true")
     props.setProperty(KafkaConfig.BrokerIdProp, "-1")
     config = KafkaConfig.fromProps(props)
-    server = KafkaServerStartable.fromProps(props, threadNamePrefix = Option(this.getClass.getName))
+    server = new KafkaServer(config, threadNamePrefix = Option(this.getClass.getName))
     server.startup()
   }
 
diff --git a/core/src/test/scala/unit/kafka/server/ServerMetricsTest.scala b/core/src/test/scala/unit/kafka/server/ServerMetricsTest.scala
index 42229636c7c5..5e97d21ca67c 100755
--- a/core/src/test/scala/unit/kafka/server/ServerMetricsTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerMetricsTest.scala
@@ -33,7 +33,7 @@ class ServerMetricsTest {
     for (recordingLevel <- recordingLevels) {
       props.put(KafkaConfig.MetricRecordingLevelProp, recordingLevel.name)
       val config = KafkaConfig.fromProps(props)
-      val metricConfig = KafkaServer.metricConfig(config)
+      val metricConfig = Server.buildMetricsConfig(config)
       assertEquals(recordingLevel, metricConfig.recordLevel)
     }
 
@@ -41,7 +41,7 @@ class ServerMetricsTest {
     assertThrows(classOf[IllegalArgumentException], () => {
      props.put(KafkaConfig.MetricRecordingLevelProp, illegalName)
       val config = KafkaConfig.fromProps(props)
-      KafkaServer.metricConfig(config)
+      Server.buildMetricsConfig(config)
     })
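
As a quick orientation for reviewers, here is a minimal sketch (not part of the patch) of how the new `Server` abstraction is selected and driven, mirroring `Kafka.buildServer` above. The object name and property values are hypothetical; a real configuration also needs listeners, log dirs, and (for the legacy path) `zookeeper.connect`, and note that `process.roles` is still registered via `defineInternal` at this point.

    import java.util.Properties

    import kafka.server.{KafkaConfig, KafkaRaftServer, KafkaServer, Server}
    import org.apache.kafka.common.utils.Time

    object ServerSelectionSketch {
      def main(args: Array[String]): Unit = {
        val props = new Properties()
        // Hypothetical config: an empty `process.roles` selects the legacy
        // Zookeeper-based KafkaServer, while "broker", "controller", or
        // "broker,controller" selects the new KafkaRaftServer.
        props.setProperty("process.roles", "broker,controller")
        // ... other required configs (listeners, log.dirs, etc.) omitted ...

        val config = KafkaConfig.fromProps(props, false)
        val server: Server =
          if (config.processRoles.isEmpty)
            new KafkaServer(config, Time.SYSTEM, threadNamePrefix = None)
          else
            new KafkaRaftServer(config, Time.SYSTEM, threadNamePrefix = None)

        // The Server trait gives both implementations the same lifecycle.
        server.startup()
        server.awaitShutdown()
      }
    }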