Skip to content

Commit

Permalink
KAFKA-16181: Use incrementalAlterConfigs when updating broker configs…
Browse files Browse the repository at this point in the history
… by kafka-configs.sh
  • Loading branch information
dengziming committed Mar 6, 2024
1 parent c5a3247 commit 3c6604b
Show file tree
Hide file tree
Showing 4 changed files with 314 additions and 84 deletions.
72 changes: 44 additions & 28 deletions core/src/main/scala/kafka/admin/ConfigCommand.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package kafka.admin

import java.nio.charset.StandardCharsets
import java.util.concurrent.TimeUnit
import java.util.concurrent.{ExecutionException, TimeUnit}
import java.util.{Collections, Properties}
import joptsimple._
import kafka.server.{DynamicBrokerConfig, DynamicConfig, KafkaConfig}
Expand All @@ -29,7 +29,7 @@ import kafka.zk.{AdminZkClient, KafkaZkClient}
import org.apache.kafka.clients.admin.{Admin, AlterClientQuotasOptions, AlterConfigOp, AlterConfigsOptions, ConfigEntry, DescribeClusterOptions, DescribeConfigsOptions, ListTopicsOptions, ScramCredentialInfo, UserScramCredentialDeletion, UserScramCredentialUpsertion, Config => JConfig, ScramMechanism => PublicScramMechanism}
import org.apache.kafka.common.config.{ConfigResource, TopicConfig}
import org.apache.kafka.common.config.types.Password
import org.apache.kafka.common.errors.InvalidConfigurationException
import org.apache.kafka.common.errors.{InvalidConfigurationException, UnsupportedVersionException}
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.quota.{ClientQuotaAlteration, ClientQuotaEntity, ClientQuotaFilter, ClientQuotaFilterComponent}
import org.apache.kafka.common.security.JaasUtils
Expand Down Expand Up @@ -349,17 +349,18 @@ object ConfigCommand extends Logging {
}

@nowarn("cat=deprecation")
private[admin] def alterConfig(adminClient: Admin, opts: ConfigCommandOptions): Unit = {
private[admin] def alterConfig(adminClient: Admin, opts: ConfigCommandOptions, useIncrementalAlterConfigs: Boolean = true): Unit = {
val entityTypes = opts.entityTypes
val entityNames = opts.entityNames
val entityTypeHead = entityTypes.head
val entityNameHead = entityNames.head
val configsToBeAddedMap = parseConfigsToBeAdded(opts).asScala.toMap // no need for mutability
val configsToBeAdded = configsToBeAddedMap.map { case (k, v) => (k, new ConfigEntry(k, v)) }
val configsToBeDeleted = parseConfigsToBeDeleted(opts)
var logSucceedMsg = true

entityTypeHead match {
case ConfigType.TOPIC =>
case ConfigType.TOPIC | ConfigType.CLIENT_METRICS | ConfigType.BROKER if useIncrementalAlterConfigs =>
val oldConfig = getResourceConfig(adminClient, entityTypeHead, entityNameHead, includeSynonyms = false, describeAll = false)
.map { entry => (entry.name, entry) }.toMap

Expand All @@ -368,14 +369,37 @@ object ConfigCommand extends Logging {
if (invalidConfigs.nonEmpty)
throw new InvalidConfigurationException(s"Invalid config(s): ${invalidConfigs.mkString(",")}")

val configResource = new ConfigResource(ConfigResource.Type.TOPIC, entityNameHead)
val configResourceType = entityTypeHead match {
case ConfigType.TOPIC => ConfigResource.Type.TOPIC
case ConfigType.CLIENT_METRICS =>ConfigResource.Type.CLIENT_METRICS
case ConfigType.BROKER => ConfigResource.Type.BROKER
}
val configResource = new ConfigResource(configResourceType, entityNameHead)
val alterOptions = new AlterConfigsOptions().timeoutMs(30000).validateOnly(false)
val alterEntries = (configsToBeAdded.values.map(new AlterConfigOp(_, AlterConfigOp.OpType.SET))
++ configsToBeDeleted.map { k => new AlterConfigOp(new ConfigEntry(k, ""), AlterConfigOp.OpType.DELETE) }
).asJavaCollection
adminClient.incrementalAlterConfigs(Map(configResource -> alterEntries).asJava, alterOptions).all().get(60, TimeUnit.SECONDS)

case ConfigType.BROKER =>
var retryUsingDeprecatedAlterConfigs = false
try {
adminClient.incrementalAlterConfigs(Map(configResource -> alterEntries).asJava, alterOptions).all().get(60, TimeUnit.SECONDS)
} catch {
case e: ExecutionException =>
e.getCause match {
case _: UnsupportedVersionException if entityTypeHead == ConfigType.BROKER =>
retryUsingDeprecatedAlterConfigs = true
logSucceedMsg = false
case _ => throw e
}
case e: Throwable => throw e
}

if (retryUsingDeprecatedAlterConfigs) {
// KIP-1011: Retry using deprecated alterConfigs
alterConfig(adminClient, opts, useIncrementalAlterConfigs = false)
}

case ConfigType.BROKER if !useIncrementalAlterConfigs=>
val oldConfig = getResourceConfig(adminClient, entityTypeHead, entityNameHead, includeSynonyms = false, describeAll = false)
.map { entry => (entry.name, entry) }.toMap

Expand Down Expand Up @@ -405,7 +429,7 @@ object ConfigCommand extends Logging {
val alterOptions = new AlterConfigsOptions().timeoutMs(30000).validateOnly(false)
val alterLogLevelEntries = (configsToBeAdded.values.map(new AlterConfigOp(_, AlterConfigOp.OpType.SET))
++ configsToBeDeleted.map { k => new AlterConfigOp(new ConfigEntry(k, ""), AlterConfigOp.OpType.DELETE) }
).asJavaCollection
).asJavaCollection
adminClient.incrementalAlterConfigs(Map(configResource -> alterLogLevelEntries).asJava, alterOptions).all().get(60, TimeUnit.SECONDS)

case ConfigType.USER | ConfigType.CLIENT =>
Expand Down Expand Up @@ -448,28 +472,20 @@ object ConfigCommand extends Logging {
if (unknownConfigs.nonEmpty)
throw new IllegalArgumentException(s"Only connection quota configs can be added for '${ConfigType.IP}' using --bootstrap-server. Unexpected config names: ${unknownConfigs.mkString(",")}")
alterQuotaConfigs(adminClient, entityTypes, entityNames, configsToBeAddedMap, configsToBeDeleted)
case ConfigType.CLIENT_METRICS =>
val oldConfig = getResourceConfig(adminClient, entityTypeHead, entityNameHead, includeSynonyms = false, describeAll = false)
.map { entry => (entry.name, entry) }.toMap

// fail the command if any of the configs to be deleted does not exist
val invalidConfigs = configsToBeDeleted.filterNot(oldConfig.contains)
if (invalidConfigs.nonEmpty)
throw new InvalidConfigurationException(s"Invalid config(s): ${invalidConfigs.mkString(",")}")

val configResource = new ConfigResource(ConfigResource.Type.CLIENT_METRICS, entityNameHead)
val alterOptions = new AlterConfigsOptions().timeoutMs(30000).validateOnly(false)
val alterEntries = (configsToBeAdded.values.map(new AlterConfigOp(_, AlterConfigOp.OpType.SET))
++ configsToBeDeleted.map { k => new AlterConfigOp(new ConfigEntry(k, ""), AlterConfigOp.OpType.DELETE) }
).asJavaCollection
adminClient.incrementalAlterConfigs(Map(configResource -> alterEntries).asJava, alterOptions).all().get(60, TimeUnit.SECONDS)
case _ => throw new IllegalArgumentException(s"Unsupported entity type: $entityTypeHead")
case _ =>
if (useIncrementalAlterConfigs) {
throw new IllegalArgumentException(s"Unsupported entity type: $entityTypeHead")
} else {
throw new IllegalArgumentException(s"Unsupported entity type: $entityTypeHead when updating using deprecated alterConfigs, only ${ConfigType.BROKER} is supported")
}
}

if (entityNameHead.nonEmpty)
println(s"Completed updating config for ${entityTypeHead.dropRight(1)} $entityNameHead.")
else
println(s"Completed updating default config for $entityTypeHead in the cluster.")
if (logSucceedMsg) {
if (entityNameHead.nonEmpty)
println(s"Completed updating config for ${entityTypeHead.dropRight(1)} $entityNameHead.")
else
println(s"Completed updating default config for $entityTypeHead in the cluster.")
}
}

private def alterUserScramCredentialConfigs(adminClient: Admin, user: String, scramConfigsToAddMap: Map[String, ConfigEntry], scramConfigsToDelete: Seq[String]) = {
Expand Down
Loading

0 comments on commit 3c6604b

Please sign in to comment.