Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-13270: Set JUTE_MAXBUFFER to 4 MB by default #11295

Merged
merged 4 commits into from Sep 6, 2021
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/admin/ConfigCommand.scala
Expand Up @@ -115,7 +115,7 @@ object ConfigCommand extends Config {
val zkClientConfig = ZkSecurityMigrator.createZkClientConfigFromOption(opts.options, opts.zkTlsConfigFile)
.getOrElse(new ZKClientConfig())
val zkClient = KafkaZkClient(zkConnectString, JaasUtils.isZkSaslEnabled || KafkaConfig.zkTlsClientAuthEnabled(zkClientConfig), 30000, 30000,
Int.MaxValue, Time.SYSTEM, zkClientConfig = Some(zkClientConfig))
Int.MaxValue, Time.SYSTEM, zkClientConfig = zkClientConfig, name = "ConfigCommand")
val adminZkClient = new AdminZkClient(zkClient)
try {
if (opts.options.has(opts.alterOpt))
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala
Expand Up @@ -105,7 +105,7 @@ object ZkSecurityMigrator extends Logging {
val zkSessionTimeout = opts.options.valueOf(opts.zkSessionTimeoutOpt).intValue
val zkConnectionTimeout = opts.options.valueOf(opts.zkConnectionTimeoutOpt).intValue
val zkClient = KafkaZkClient(zkUrl, zkAcl, zkSessionTimeout, zkConnectionTimeout,
Int.MaxValue, Time.SYSTEM, zkClientConfig = Some(zkClientConfig))
Int.MaxValue, Time.SYSTEM, zkClientConfig = zkClientConfig, name = "ZkSecurityMigrator")
val enablePathCheck = opts.options.has(opts.enablePathCheckOpt)
val migrator = new ZkSecurityMigrator(zkClient)
migrator.run(enablePathCheck)
Expand Down
20 changes: 10 additions & 10 deletions core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala
Expand Up @@ -95,24 +95,24 @@ object AclAuthorizer {
}
}

private[authorizer] def zkClientConfigFromKafkaConfigAndMap(kafkaConfig: KafkaConfig, configMap: mutable.Map[String, _<:Any]): Option[ZKClientConfig] = {
private[authorizer] def zkClientConfigFromKafkaConfigAndMap(kafkaConfig: KafkaConfig, configMap: mutable.Map[String, _<:Any]): ZKClientConfig = {
val zkSslClientEnable = configMap.get(AclAuthorizer.configPrefix + KafkaConfig.ZkSslClientEnableProp).
map(_.toString).getOrElse(kafkaConfig.zkSslClientEnable.toString).toBoolean
if (!zkSslClientEnable)
None
new ZKClientConfig
else {
// start with the base config from the Kafka configuration
// be sure to force creation since the zkSslClientEnable property in the kafkaConfig could be false
val zkClientConfig = KafkaServer.zkClientConfigFromKafkaConfig(kafkaConfig, true)
// add in any prefixed overlays
KafkaConfig.ZkSslConfigToSystemPropertyMap.foreach{ case (kafkaProp, sysProp) => {
val prefixedValue = configMap.get(AclAuthorizer.configPrefix + kafkaProp)
if (prefixedValue.isDefined)
zkClientConfig.get.setProperty(sysProp,
KafkaConfig.ZkSslConfigToSystemPropertyMap.forKeyValue { (kafkaProp, sysProp) => {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: We could remove the extra block defined by the last { on this line.

configMap.get(AclAuthorizer.configPrefix + kafkaProp).foreach { prefixedValue =>
zkClientConfig.setProperty(sysProp,
if (kafkaProp == KafkaConfig.ZkSslEndpointIdentificationAlgorithmProp)
(prefixedValue.get.toString.toUpperCase == "HTTPS").toString
(prefixedValue.toString.toUpperCase == "HTTPS").toString
else
prefixedValue.get.toString)
prefixedValue.toString)
}
}}
zkClientConfig
}
Expand Down Expand Up @@ -178,8 +178,8 @@ class AclAuthorizer extends Authorizer with Logging {
// createChrootIfNecessary=true is necessary in case we are running in a KRaft cluster
// because such a cluster will not create any chroot path in ZooKeeper (it doesn't connect to ZooKeeper)
zkClient = KafkaZkClient(zkUrl, kafkaConfig.zkEnableSecureAcls, zkSessionTimeOutMs, zkConnectionTimeoutMs,
zkMaxInFlightRequests, time, "kafka.security", "AclAuthorizer", name=Some("ACL authorizer"),
zkClientConfig = zkClientConfig, createChrootIfNecessary = true)
zkMaxInFlightRequests, time, name = "ACL authorizer", zkClientConfig = zkClientConfig,
metricGroup = "kafka.security", metricType = "AclAuthorizer", createChrootIfNecessary = true)
zkClient.createAclPaths()

extendedAclSupport = kafkaConfig.interBrokerProtocolVersion >= KAFKA_2_0_IV1
Expand Down
49 changes: 21 additions & 28 deletions core/src/main/scala/kafka/server/KafkaConfig.scala
Expand Up @@ -336,7 +336,7 @@ object KafkaConfig {
ZkSslCrlEnableProp -> "zookeeper.ssl.crl",
ZkSslOcspEnableProp -> "zookeeper.ssl.ocsp")

private[kafka] def getZooKeeperClientProperty(clientConfig: ZKClientConfig, kafkaPropName: String): Option[String] = {
private[kafka] def zooKeeperClientProperty(clientConfig: ZKClientConfig, kafkaPropName: String): Option[String] = {
Option(clientConfig.getProperty(ZkSslConfigToSystemPropertyMap(kafkaPropName)))
}

Expand All @@ -345,7 +345,7 @@ object KafkaConfig {
kafkaPropName match {
case ZkSslEndpointIdentificationAlgorithmProp => (kafkaPropValue.toString.toUpperCase == "HTTPS").toString
case ZkSslEnabledProtocolsProp | ZkSslCipherSuitesProp => kafkaPropValue match {
case list: java.util.List[_] => list.asInstanceOf[java.util.List[_]].asScala.mkString(",")
case list: java.util.List[_] => list.asScala.mkString(",")
case _ => kafkaPropValue.toString
}
case _ => kafkaPropValue.toString
Expand All @@ -354,10 +354,10 @@ object KafkaConfig {

// For ZooKeeper TLS client authentication to be enabled the client must (at a minimum) configure itself as using TLS
// with both a client connection socket and a key store location explicitly set.
private[kafka] def zkTlsClientAuthEnabled(zkClientConfig: ZKClientConfig) = {
getZooKeeperClientProperty(zkClientConfig, ZkSslClientEnableProp).getOrElse("false") == "true" &&
getZooKeeperClientProperty(zkClientConfig, ZkClientCnxnSocketProp).isDefined &&
getZooKeeperClientProperty(zkClientConfig, ZkSslKeyStoreLocationProp).isDefined
private[kafka] def zkTlsClientAuthEnabled(zkClientConfig: ZKClientConfig): Boolean = {
zooKeeperClientProperty(zkClientConfig, ZkSslClientEnableProp).map(_ == "true").getOrElse(false) &&
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: We could use exists here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

True, even better: contains. :)

zooKeeperClientProperty(zkClientConfig, ZkClientCnxnSocketProp).isDefined &&
zooKeeperClientProperty(zkClientConfig, ZkSslKeyStoreLocationProp).isDefined
}

/** ********* General Configuration ***********/
Expand Down Expand Up @@ -1443,7 +1443,7 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
// Need to translate any system property value from true/false (String) to true/false (Boolean)
val actuallyProvided = originals.containsKey(propKey)
if (actuallyProvided) getBoolean(propKey) else {
val sysPropValue = KafkaConfig.getZooKeeperClientProperty(zkClientConfigViaSystemProperties, propKey)
val sysPropValue = KafkaConfig.zooKeeperClientProperty(zkClientConfigViaSystemProperties, propKey)
sysPropValue match {
case Some("true") => true
case Some(_) => false
Expand All @@ -1456,35 +1456,27 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
// Use the system property if it exists and the Kafka config value was defaulted rather than actually provided
val actuallyProvided = originals.containsKey(propKey)
if (actuallyProvided) getString(propKey) else {
val sysPropValue = KafkaConfig.getZooKeeperClientProperty(zkClientConfigViaSystemProperties, propKey)
sysPropValue match {
case Some(_) => sysPropValue.get
KafkaConfig.zooKeeperClientProperty(zkClientConfigViaSystemProperties, propKey) match {
case Some(v) => v
case _ => getString(propKey) // not specified so use the default value
}
}
}

private def zkOptionalStringConfigOrSystemProperty(propKey: String): Option[String] = {
Option(getString(propKey)) match {
case config: Some[String] => config
case _ => KafkaConfig.getZooKeeperClientProperty(zkClientConfigViaSystemProperties, propKey)
Option(getString(propKey)).orElse {
KafkaConfig.zooKeeperClientProperty(zkClientConfigViaSystemProperties, propKey)
}
}
private def zkPasswordConfigOrSystemProperty(propKey: String): Option[Password] = {
Option(getPassword(propKey)) match {
case config: Some[Password] => config
case _ => {
val sysProp = KafkaConfig.getZooKeeperClientProperty (zkClientConfigViaSystemProperties, propKey)
if (sysProp.isDefined) Some (new Password (sysProp.get) ) else None
}
Option(getPassword(propKey)).orElse {
KafkaConfig.zooKeeperClientProperty(zkClientConfigViaSystemProperties, propKey).map(new Password(_))
}
}
private def zkListConfigOrSystemProperty(propKey: String): Option[util.List[String]] = {
Option(getList(propKey)) match {
case config: Some[util.List[String]] => config
case _ => {
val sysProp = KafkaConfig.getZooKeeperClientProperty(zkClientConfigViaSystemProperties, propKey)
if (sysProp.isDefined) Some(sysProp.get.split("\\s*,\\s*").toList.asJava) else None
Option(getList(propKey)).orElse {
KafkaConfig.zooKeeperClientProperty(zkClientConfigViaSystemProperties, propKey).map { sysProp =>
sysProp.split("\\s*,\\s*").toBuffer.asJava
}
}
}
Expand All @@ -1505,12 +1497,13 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
// Need to translate any system property value from true/false to HTTPS/<blank>
val kafkaProp = KafkaConfig.ZkSslEndpointIdentificationAlgorithmProp
val actuallyProvided = originals.containsKey(kafkaProp)
if (actuallyProvided) getString(kafkaProp) else {
val sysPropValue = KafkaConfig.getZooKeeperClientProperty(zkClientConfigViaSystemProperties, kafkaProp)
sysPropValue match {
if (actuallyProvided)
getString(kafkaProp)
else {
KafkaConfig.zooKeeperClientProperty(zkClientConfigViaSystemProperties, kafkaProp) match {
case Some("true") => "HTTPS"
case Some(_) => ""
case _ => getString(kafkaProp) // not specified so use the default value
case None => getString(kafkaProp) // not specified so use the default value
}
}
}
Expand Down
15 changes: 7 additions & 8 deletions core/src/main/scala/kafka/server/KafkaServer.scala
Expand Up @@ -56,11 +56,9 @@ import scala.jdk.CollectionConverters._

object KafkaServer {

def zkClientConfigFromKafkaConfig(config: KafkaConfig, forceZkSslClientEnable: Boolean = false) =
if (!config.zkSslClientEnable && !forceZkSslClientEnable)
None
else {
val clientConfig = new ZKClientConfig()
def zkClientConfigFromKafkaConfig(config: KafkaConfig, forceZkSslClientEnable: Boolean = false): ZKClientConfig = {
val clientConfig = new ZKClientConfig
if (config.zkSslClientEnable || forceZkSslClientEnable) {
KafkaConfig.setZooKeeperClientProperty(clientConfig, KafkaConfig.ZkSslClientEnableProp, "true")
config.zkClientCnxnSocketClassName.foreach(KafkaConfig.setZooKeeperClientProperty(clientConfig, KafkaConfig.ZkClientCnxnSocketProp, _))
config.zkSslKeyStoreLocation.foreach(KafkaConfig.setZooKeeperClientProperty(clientConfig, KafkaConfig.ZkSslKeyStoreLocationProp, _))
Expand All @@ -75,8 +73,9 @@ object KafkaServer {
KafkaConfig.setZooKeeperClientProperty(clientConfig, KafkaConfig.ZkSslEndpointIdentificationAlgorithmProp, config.ZkSslEndpointIdentificationAlgorithm)
KafkaConfig.setZooKeeperClientProperty(clientConfig, KafkaConfig.ZkSslCrlEnableProp, config.ZkSslCrlEnable.toString)
KafkaConfig.setZooKeeperClientProperty(clientConfig, KafkaConfig.ZkSslOcspEnableProp, config.ZkSslOcspEnable.toString)
Some(clientConfig)
}
clientConfig
}

val MIN_INCREMENTAL_FETCH_SESSION_EVICTION_MS: Long = 120000
}
Expand Down Expand Up @@ -144,7 +143,7 @@ class KafkaServer(
var metadataCache: ZkMetadataCache = null
var quotaManagers: QuotaFactory.QuotaManagers = null

val zkClientConfig: ZKClientConfig = KafkaServer.zkClientConfigFromKafkaConfig(config).getOrElse(new ZKClientConfig())
val zkClientConfig: ZKClientConfig = KafkaServer.zkClientConfigFromKafkaConfig(config)
private var _zkClient: KafkaZkClient = null
private var configRepository: ZkConfigRepository = null

Expand Down Expand Up @@ -454,7 +453,7 @@ class KafkaServer(
s"verification of the JAAS login file failed ${JaasUtils.zkSecuritySysConfigString}")

_zkClient = KafkaZkClient(config.zkConnect, secureAclsEnabled, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs,
config.zkMaxInFlightRequests, time, name = Some("Kafka server"), zkClientConfig = Some(zkClientConfig),
config.zkMaxInFlightRequests, time, name = "Kafka server", zkClientConfig = zkClientConfig,
createChrootIfNecessary = true)
_zkClient.createTopLevelPaths()
}
Expand Down
27 changes: 21 additions & 6 deletions core/src/main/scala/kafka/zk/KafkaZkClient.scala
Expand Up @@ -17,7 +17,6 @@
package kafka.zk

import java.util.Properties

import com.yammer.metrics.core.MetricName
import kafka.api.LeaderAndIsr
import kafka.cluster.Broker
Expand All @@ -38,6 +37,7 @@ import org.apache.kafka.common.{KafkaException, TopicPartition, Uuid}
import org.apache.zookeeper.KeeperException.{Code, NodeExistsException}
import org.apache.zookeeper.OpResult.{CreateResult, ErrorResult, SetDataResult}
import org.apache.zookeeper.client.ZKClientConfig
import org.apache.zookeeper.common.ZKConfig
import org.apache.zookeeper.data.{ACL, Stat}
import org.apache.zookeeper.{CreateMode, KeeperException, ZooKeeper}

Expand Down Expand Up @@ -1940,18 +1940,33 @@ object KafkaZkClient {
connectionTimeoutMs: Int,
maxInFlightRequests: Int,
time: Time,
name: String,
zkClientConfig: ZKClientConfig,
metricGroup: String = "kafka.server",
metricType: String = "SessionExpireListener",
name: Option[String] = None,
zkClientConfig: Option[ZKClientConfig] = None,
createChrootIfNecessary: Boolean = false
): KafkaZkClient = {

/* ZooKeeper 3.6.0 changed the default configuration for JUTE_MAXBUFFER from 4 MB to 1 MB.
* This causes a regression if Kafka tries to retrieve a large amount of data across many
* znodes – in such a case the ZooKeeper client will repeatedly emit a message of the form
* "java.io.IOException: Packet len <####> is out of range".
*
* We restore the 3.4.x/3.5.x behavior unless the caller has set the property (note that ZKConfig
* auto configures itself if certain system properties have been set).
*
* See https://github.com/apache/zookeeper/pull/1129 for the details on why the behavior
* changed in 3.6.0.
*/
if (zkClientConfig.getProperty(ZKConfig.JUTE_MAXBUFFER) == null)
zkClientConfig.setProperty(ZKConfig.JUTE_MAXBUFFER, ((4096 * 1024).toString))

if (createChrootIfNecessary) {
val chrootIndex = connectString.indexOf("/")
if (chrootIndex > 0) {
val zkConnWithoutChrootForChrootCreation = connectString.substring(0, chrootIndex)
val zkClientForChrootCreation = KafkaZkClient(zkConnWithoutChrootForChrootCreation, isSecure, sessionTimeoutMs,
connectionTimeoutMs, maxInFlightRequests, time, metricGroup, metricType, name, zkClientConfig)
val zkClientForChrootCreation = apply(zkConnWithoutChrootForChrootCreation, isSecure, sessionTimeoutMs,
connectionTimeoutMs, maxInFlightRequests, time, name, zkClientConfig, metricGroup, metricType)
try {
val chroot = connectString.substring(chrootIndex)
if (!zkClientForChrootCreation.pathExists(chroot)) {
Expand All @@ -1963,7 +1978,7 @@ object KafkaZkClient {
}
}
val zooKeeperClient = new ZooKeeperClient(connectString, sessionTimeoutMs, connectionTimeoutMs, maxInFlightRequests,
time, metricGroup, metricType, name, zkClientConfig)
time, metricGroup, metricType, zkClientConfig, name)
new KafkaZkClient(zooKeeperClient, isSecure, time)
}

Expand Down
25 changes: 4 additions & 21 deletions core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
Expand Up @@ -61,24 +61,10 @@ class ZooKeeperClient(connectString: String,
time: Time,
metricGroup: String,
metricType: String,
name: Option[String],
zkClientConfig: Option[ZKClientConfig]) extends Logging with KafkaMetricsGroup {

def this(connectString: String,
sessionTimeoutMs: Int,
connectionTimeoutMs: Int,
maxInFlightRequests: Int,
time: Time,
metricGroup: String,
metricType: String) = {
this(connectString, sessionTimeoutMs, connectionTimeoutMs, maxInFlightRequests, time, metricGroup, metricType, None,
None)
}
private[zookeeper] val clientConfig: ZKClientConfig,
name: String) extends Logging with KafkaMetricsGroup {

this.logIdent = name match {
case Some(n) => s"[ZooKeeperClient $n] "
case _ => "[ZooKeeperClient] "
}
this.logIdent = s"[ZooKeeperClient $name] "
private val initializationLock = new ReentrantReadWriteLock()
private val isConnectedOrExpiredLock = new ReentrantLock()
private val isConnectedOrExpiredCondition = isConnectedOrExpiredLock.newCondition()
Expand Down Expand Up @@ -109,13 +95,10 @@ class ZooKeeperClient(connectString: String,
}
}

private val clientConfig = zkClientConfig getOrElse new ZKClientConfig()

info(s"Initializing a new session to $connectString.")
// Fail-fast if there's an error during construction (so don't call initialize, which retries forever)
@volatile private var zooKeeper = new ZooKeeper(connectString, sessionTimeoutMs, ZooKeeperClientWatcher,
clientConfig)
private[zookeeper] def getClientConfig = clientConfig

newGauge("SessionState", () => connectionState.toString)

Expand Down Expand Up @@ -436,7 +419,7 @@ class ZooKeeperClient(connectString: String,
}, delayMs, period = -1L, unit = TimeUnit.MILLISECONDS)
}

private def threadPrefix: String = name.map(n => n.replaceAll("\\s", "") + "-").getOrElse("")
private def threadPrefix: String = name.replaceAll("\\s", "") + "-"

// package level visibility for testing only
private[zookeeper] object ZooKeeperClientWatcher extends Watcher {
Expand Down
2 changes: 1 addition & 1 deletion core/src/test/scala/integration/kafka/api/SaslSetup.scala
Expand Up @@ -195,7 +195,7 @@ trait SaslSetup {
val zkClientConfig = new ZKClientConfig()
val zkClient = KafkaZkClient(
zkConnect, JaasUtils.isZkSaslEnabled || KafkaConfig.zkTlsClientAuthEnabled(zkClientConfig), 30000, 30000,
Int.MaxValue, Time.SYSTEM, zkClientConfig = Some(zkClientConfig))
Int.MaxValue, Time.SYSTEM, name = "SaslSetup", zkClientConfig = zkClientConfig)
val adminZkClient = new AdminZkClient(zkClient)

val entityType = ConfigType.User
Expand Down