Skip to content

Commit

Permalink
KAFKA-16181: Use incrementalAlterConfigs when updating broker configs…
Browse files Browse the repository at this point in the history
… by kafka-configs.sh
  • Loading branch information
dengziming committed Feb 1, 2024
1 parent 6ba4a19 commit efc3eaf
Show file tree
Hide file tree
Showing 4 changed files with 253 additions and 57 deletions.
46 changes: 23 additions & 23 deletions core/src/main/scala/kafka/admin/ConfigCommand.scala
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import kafka.zk.{AdminZkClient, KafkaZkClient}
import org.apache.kafka.clients.admin.{Admin, AlterClientQuotasOptions, AlterConfigOp, AlterConfigsOptions, ConfigEntry, DescribeClusterOptions, DescribeConfigsOptions, ListTopicsOptions, ScramCredentialInfo, UserScramCredentialDeletion, UserScramCredentialUpsertion, Config => JConfig, ScramMechanism => PublicScramMechanism}
import org.apache.kafka.common.config.{ConfigResource, TopicConfig}
import org.apache.kafka.common.config.types.Password
import org.apache.kafka.common.errors.InvalidConfigurationException
import org.apache.kafka.common.errors.{InvalidConfigurationException, UnsupportedVersionException}
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.quota.{ClientQuotaAlteration, ClientQuotaEntity, ClientQuotaFilter, ClientQuotaFilterComponent}
import org.apache.kafka.common.security.JaasUtils
Expand Down Expand Up @@ -340,7 +340,7 @@ object ConfigCommand extends Logging {

try {
if (opts.options.has(opts.alterOpt))
alterConfig(adminClient, opts)
alterConfig(adminClient, opts, true)
else if (opts.options.has(opts.describeOpt))
describeConfig(adminClient, opts)
} finally {
Expand All @@ -349,7 +349,7 @@ object ConfigCommand extends Logging {
}

@nowarn("cat=deprecation")
private[admin] def alterConfig(adminClient: Admin, opts: ConfigCommandOptions): Unit = {
private[admin] def alterConfig(adminClient: Admin, opts: ConfigCommandOptions, useIncrementalAlterConfigs: Boolean): Unit = {
val entityTypes = opts.entityTypes
val entityNames = opts.entityNames
val entityTypeHead = entityTypes.head
Expand All @@ -359,7 +359,7 @@ object ConfigCommand extends Logging {
val configsToBeDeleted = parseConfigsToBeDeleted(opts)

entityTypeHead match {
case ConfigType.TOPIC =>
case ConfigType.TOPIC | ConfigType.CLIENT_METRICS | ConfigType.BROKER if useIncrementalAlterConfigs =>
val oldConfig = getResourceConfig(adminClient, entityTypeHead, entityNameHead, includeSynonyms = false, describeAll = false)
.map { entry => (entry.name, entry) }.toMap

Expand All @@ -368,14 +368,29 @@ object ConfigCommand extends Logging {
if (invalidConfigs.nonEmpty)
throw new InvalidConfigurationException(s"Invalid config(s): ${invalidConfigs.mkString(",")}")

val configResource = new ConfigResource(ConfigResource.Type.TOPIC, entityNameHead)
val configResourceType = entityTypeHead match {
case ConfigType.TOPIC => ConfigResource.Type.TOPIC
case ConfigType.CLIENT_METRICS =>ConfigResource.Type.CLIENT_METRICS
case ConfigType.BROKER => ConfigResource.Type.BROKER
}
val configResource = new ConfigResource(configResourceType, entityNameHead)
val alterOptions = new AlterConfigsOptions().timeoutMs(30000).validateOnly(false)
val alterEntries = (configsToBeAdded.values.map(new AlterConfigOp(_, AlterConfigOp.OpType.SET))
++ configsToBeDeleted.map { k => new AlterConfigOp(new ConfigEntry(k, ""), AlterConfigOp.OpType.DELETE) }
).asJavaCollection
adminClient.incrementalAlterConfigs(Map(configResource -> alterEntries).asJava, alterOptions).all().get(60, TimeUnit.SECONDS)

case ConfigType.BROKER =>
var retryUsingDeprecatedAlterConfigs = false
adminClient.incrementalAlterConfigs(Map(configResource -> alterEntries).asJava, alterOptions).all().whenComplete((_, e) => {
if (e.isInstanceOf[UnsupportedVersionException] && entityTypeHead == ConfigType.BROKER) {
retryUsingDeprecatedAlterConfigs = true
}
}).get(60, TimeUnit.SECONDS)
if (retryUsingDeprecatedAlterConfigs) {
// KIP-1011: Retry using deprecated alterConfigs
alterConfig(adminClient, opts, false)
}

case ConfigType.BROKER if !useIncrementalAlterConfigs=>
val oldConfig = getResourceConfig(adminClient, entityTypeHead, entityNameHead, includeSynonyms = false, describeAll = false)
.map { entry => (entry.name, entry) }.toMap

Expand Down Expand Up @@ -405,7 +420,7 @@ object ConfigCommand extends Logging {
val alterOptions = new AlterConfigsOptions().timeoutMs(30000).validateOnly(false)
val alterLogLevelEntries = (configsToBeAdded.values.map(new AlterConfigOp(_, AlterConfigOp.OpType.SET))
++ configsToBeDeleted.map { k => new AlterConfigOp(new ConfigEntry(k, ""), AlterConfigOp.OpType.DELETE) }
).asJavaCollection
).asJavaCollection
adminClient.incrementalAlterConfigs(Map(configResource -> alterLogLevelEntries).asJava, alterOptions).all().get(60, TimeUnit.SECONDS)

case ConfigType.USER | ConfigType.CLIENT =>
Expand Down Expand Up @@ -448,21 +463,6 @@ object ConfigCommand extends Logging {
if (unknownConfigs.nonEmpty)
throw new IllegalArgumentException(s"Only connection quota configs can be added for '${ConfigType.IP}' using --bootstrap-server. Unexpected config names: ${unknownConfigs.mkString(",")}")
alterQuotaConfigs(adminClient, entityTypes, entityNames, configsToBeAddedMap, configsToBeDeleted)
case ConfigType.CLIENT_METRICS =>
val oldConfig = getResourceConfig(adminClient, entityTypeHead, entityNameHead, includeSynonyms = false, describeAll = false)
.map { entry => (entry.name, entry) }.toMap

// fail the command if any of the configs to be deleted does not exist
val invalidConfigs = configsToBeDeleted.filterNot(oldConfig.contains)
if (invalidConfigs.nonEmpty)
throw new InvalidConfigurationException(s"Invalid config(s): ${invalidConfigs.mkString(",")}")

val configResource = new ConfigResource(ConfigResource.Type.CLIENT_METRICS, entityNameHead)
val alterOptions = new AlterConfigsOptions().timeoutMs(30000).validateOnly(false)
val alterEntries = (configsToBeAdded.values.map(new AlterConfigOp(_, AlterConfigOp.OpType.SET))
++ configsToBeDeleted.map { k => new AlterConfigOp(new ConfigEntry(k, ""), AlterConfigOp.OpType.DELETE) }
).asJavaCollection
adminClient.incrementalAlterConfigs(Map(configResource -> alterEntries).asJava, alterOptions).all().get(60, TimeUnit.SECONDS)
case _ => throw new IllegalArgumentException(s"Unsupported entity type: $entityTypeHead")
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,39 +18,47 @@ package kafka.admin

import kafka.admin.ConfigCommand.ConfigCommandOptions
import kafka.cluster.{Broker, EndPoint}
import kafka.server.{KafkaConfig, QuorumTestHarness}
import kafka.utils.{Exit, Logging, TestInfoUtils}
import kafka.server.{KafkaBroker, KafkaConfig}
import kafka.test.annotation.{ClusterTest, ClusterTestDefaults, Type}
import kafka.test.junit.ClusterTestExtensions
import kafka.test.junit.ZkClusterInvocationContext.ZkClusterInstance
import kafka.test.ClusterInstance
import kafka.utils.{Exit, TestUtils}
import kafka.zk.{AdminZkClient, BrokerInfo}
import org.apache.kafka.common.config.ConfigException
import org.apache.kafka.common.errors.InvalidConfigurationException
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.server.config.ConfigEntityName
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
import org.junit.jupiter.api.extension.ExtendWith
import org.junit.jupiter.api.Tag

import java.util.concurrent.ExecutionException
import java.util.stream.Collectors
import scala.collection.Seq
import scala.jdk.CollectionConverters._

class ConfigCommandIntegrationTest extends QuorumTestHarness with Logging {
@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
@ClusterTestDefaults(clusterType = Type.ALL, brokers = 1)
@Tag("integration")
class ConfigCommandIntegrationTest2(cluster: ClusterInstance) {

@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk"))
def shouldExitWithNonZeroStatusOnUpdatingUnallowedConfigViaZk(quorum: String): Unit = {
@ClusterTest(clusterType = Type.ZK)
def shouldExitWithNonZeroStatusOnUpdatingUnallowedConfigViaZk(zkCluster: ClusterInstance): Unit = {
assertNonZeroStatusExit(Array(
"--zookeeper", zkConnect,
"--zookeeper", zkCluster.asInstanceOf[ZkClusterInstance].getUnderlying.zkConnect,
"--entity-name", "1",
"--entity-type", "brokers",
"--alter",
"--add-config", "security.inter.broker.protocol=PLAINTEXT"))
}

@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk"))
def shouldExitWithNonZeroStatusOnZkCommandAlterUserQuota(quorum: String): Unit = {
@ClusterTest(clusterType = Type.ZK)
def shouldExitWithNonZeroStatusOnZkCommandAlterUserQuota(zkCluster: ClusterInstance): Unit = {
assertNonZeroStatusExit(Array(
"--zookeeper", zkConnect,
"--zookeeper", zkCluster.asInstanceOf[ZkClusterInstance].getUnderlying.zkConnect,
"--entity-type", "users",
"--entity-name", "admin",
"--alter", "--add-config", "consumer_byte_rate=20000"))
Expand All @@ -74,10 +82,11 @@ class ConfigCommandIntegrationTest extends QuorumTestHarness with Logging {
assertEquals(Some(1), exitStatus)
}

@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk"))
def testDynamicBrokerConfigUpdateUsingZooKeeper(quorum: String): Unit = {
val brokerId = "1"
@ClusterTest(clusterType = Type.ZK)
def testDynamicBrokerConfigUpdateUsingZooKeeper(zkCluster: ClusterInstance): Unit = {
val zkClient = zkCluster.asInstanceOf[ZkClusterInstance].getUnderlying().zkClient
val zkConnect = zkCluster.asInstanceOf[ZkClusterInstance].getUnderlying.zkConnect
val brokerId = cluster.anyBrokerSocketServer().config.brokerId.toString
val adminZkClient = new AdminZkClient(zkClient)
val alterOpts = Array("--zookeeper", zkConnect, "--entity-type", "brokers", "--alter")

Expand Down Expand Up @@ -109,6 +118,9 @@ class ConfigCommandIntegrationTest extends QuorumTestHarness with Logging {
verifyConfig(Map.empty, brokerId)
}

// Broker configuration operations using ZooKeeper are only supported if the affected broker(s) are not running.
cluster.shutdownBroker(brokerId.toInt)

// Add config
alterAndVerifyConfig(Map("message.max.size" -> "110000"), Some(brokerId))
alterAndVerifyConfig(Map("message.max.size" -> "120000"), None)
Expand Down Expand Up @@ -162,19 +174,198 @@ class ConfigCommandIntegrationTest extends QuorumTestHarness with Logging {
assertThrows(classOf[ConfigException], () => alterConfigWithZk(configs, None, encoderConfigs))

// Dynamic config updates using ZK should fail if broker is running.
registerBrokerInZk(brokerId.toInt)
registerBrokerInZk(zkClient, brokerId.toInt)
assertThrows(classOf[IllegalArgumentException], () => alterConfigWithZk(Map("message.max.size" -> "210000"), Some(brokerId)))
assertThrows(classOf[IllegalArgumentException], () => alterConfigWithZk(Map("message.max.size" -> "220000"), None))

// Dynamic config updates using ZK should for a different broker that is not running should succeed
alterAndVerifyConfig(Map("message.max.size" -> "230000"), Some("2"))
}

private def registerBrokerInZk(id: Int): Unit = {
private def registerBrokerInZk(zkClient: kafka.zk.KafkaZkClient, id: Int): Unit = {
zkClient.createTopLevelPaths()
val securityProtocol = SecurityProtocol.PLAINTEXT
val endpoint = new EndPoint("localhost", 9092, ListenerName.forSecurityProtocol(securityProtocol), securityProtocol)
val brokerInfo = BrokerInfo(Broker(id, Seq(endpoint), rack = None), MetadataVersion.latestTesting, jmxPort = 9192)
zkClient.registerBroker(brokerInfo)
}

@ClusterTest
def testUpdateInvalidBrokersConfig(): Unit = {
checkInvalidBrokerConfig(None)
checkInvalidBrokerConfig(Some(cluster.anyBrokerSocketServer().config.brokerId.toString))
}

private def checkInvalidBrokerConfig(entityNameOrDefault: Option[String]): Unit = {
for (incremental <- Array(true, false)) {
val entityNameParams = entityNameOrDefault.map(name => Array("--entity-name", name)).getOrElse(Array("--entity-default"))
ConfigCommand.alterConfig(cluster.createAdminClient(), new ConfigCommandOptions(
Array("--bootstrap-server", s"${cluster.bootstrapServers()}",
"--alter",
"--add-config", "invalid=2",
"--entity-type", "brokers")
++ entityNameParams
), incremental)

val describeResult = TestUtils.grabConsoleOutput(
ConfigCommand.describeConfig(cluster.createAdminClient(), new ConfigCommandOptions(
Array("--bootstrap-server", s"${cluster.bootstrapServers()}",
"--describe",
"--entity-type", "brokers")
++ entityNameParams
)))
// We will treat unknown config as sensitive
assertTrue(describeResult.contains("sensitive=true"))
// Sensitive config will not return
assertTrue(describeResult.contains("invalid=null"))
}
}

@ClusterTest
def testUpdateInvalidTopicConfig(): Unit = {
TestUtils.createTopicWithAdminRaw(
admin = cluster.createAdminClient(),
topic = "test-config-topic",
)
assertInstanceOf(
classOf[InvalidConfigurationException],
assertThrows(
classOf[ExecutionException],
() => ConfigCommand.alterConfig(cluster.createAdminClient(), new ConfigCommandOptions(
Array("--bootstrap-server", s"${cluster.bootstrapServers()}",
"--alter",
"--add-config", "invalid=2",
"--entity-type", "topics",
"--entity-name", "test-config-topic")
), true)).getCause
)
}

@ClusterTest
def testUpdateAndDeleteBrokersConfig(): Unit = {
checkBrokerConfig(None)
checkBrokerConfig(Some(cluster.anyBrokerSocketServer().config.brokerId.toString))
}

private def checkBrokerConfig(entityNameOrDefault: Option[String]): Unit = {
val entityNameParams = entityNameOrDefault.map(name => Array("--entity-name", name)).getOrElse(Array("--entity-default"))
ConfigCommand.alterConfig(cluster.createAdminClient(), new ConfigCommandOptions(
Array("--bootstrap-server", s"${cluster.bootstrapServers()}",
"--alter",
"--add-config", "log.cleaner.threads=2",
"--entity-type", "brokers")
++ entityNameParams
), true)
TestUtils.waitUntilTrue(
() => cluster.brokerSocketServers().asScala.forall(broker => broker.config.getInt("log.cleaner.threads") == 2),
"Timeout waiting for topic config propagating to broker")

val describeResult = TestUtils.grabConsoleOutput(
ConfigCommand.describeConfig(cluster.createAdminClient(), new ConfigCommandOptions(
Array("--bootstrap-server", s"${cluster.bootstrapServers()}",
"--describe",
"--entity-type", "brokers")
++ entityNameParams
)))
assertTrue(describeResult.contains("log.cleaner.threads=2"))
assertTrue(describeResult.contains("sensitive=false"))

ConfigCommand.alterConfig(cluster.createAdminClient(), new ConfigCommandOptions(
Array("--bootstrap-server", s"${cluster.bootstrapServers()}",
"--alter",
"--delete-config", "log.cleaner.threads",
"--entity-type", "brokers")
++ entityNameParams
), true)
TestUtils.waitUntilTrue(
() => cluster.brokers().collect(Collectors.toList[KafkaBroker]).asScala.forall(broker => broker.config.getInt("log.cleaner.threads") != 2),
"Timeout waiting for topic config propagating to broker")

assertFalse(TestUtils.grabConsoleOutput(
ConfigCommand.describeConfig(cluster.createAdminClient(), new ConfigCommandOptions(
Array("--bootstrap-server", s"${cluster.bootstrapServers()}",
"--describe",
"--entity-type", "brokers")
++ entityNameParams
))).contains("log.cleaner.threads"))
}

@ClusterTest
def testUpdateConfigAndDeleteTopicConfig(): Unit = {
TestUtils.createTopicWithAdminRaw(
admin = cluster.createAdminClient(),
topic = "test-config-topic",
)
ConfigCommand.alterConfig(cluster.createAdminClient(), new ConfigCommandOptions(
Array("--bootstrap-server", s"${cluster.bootstrapServers()}",
"--alter",
"--add-config", "segment.bytes=10240000",
"--entity-type", "topics",
"--entity-name", "test-config-topic")
), true)
TestUtils.waitUntilTrue(
() => cluster.brokers().collect(Collectors.toList[KafkaBroker]).asScala.forall(broker => broker.logManager.logsByTopic("test-config-topic").head.config.getInt("segment.bytes") == 10240000),
"Timeout waiting for topic config propagating to broker")

val describeResult = TestUtils.grabConsoleOutput(
ConfigCommand.describeConfig(cluster.createAdminClient(), new ConfigCommandOptions(
Array("--bootstrap-server", s"${cluster.bootstrapServers()}",
"--describe",
"--entity-type", "topics",
"--entity-name", "test-config-topic")
)))
assertTrue(describeResult.contains("segment.bytes=10240000"))
assertTrue(describeResult.contains("sensitive=false"))

ConfigCommand.alterConfig(cluster.createAdminClient(), new ConfigCommandOptions(
Array("--bootstrap-server", s"${cluster.bootstrapServers()}",
"--alter",
"--delete-config", "segment.bytes",
"--entity-type", "topics",
"--entity-name", "test-config-topic")
), true)
TestUtils.waitUntilTrue(
() => cluster.brokers().collect(Collectors.toList[KafkaBroker]).asScala.forall(broker => broker.logManager.logsByTopic("test-config-topic").head.config.getInt("segment.bytes") != 10240000),
"Timeout waiting for topic config propagating to broker")

assertFalse(TestUtils.grabConsoleOutput(
ConfigCommand.describeConfig(cluster.createAdminClient(), new ConfigCommandOptions(
Array("--bootstrap-server", s"${cluster.bootstrapServers()}",
"--describe",
"--entity-type", "topics",
"--entity-name", "test-config-topic")
))).contains("segment.bytes"))
}

@ClusterTest
def testUpdateBrokerConfigNotAffectedByInvalidConfig(): Unit = {
// Test case from KAFKA-13788
ConfigCommand.alterConfig(cluster.createAdminClient(), new ConfigCommandOptions(
Array("--bootstrap-server", s"${cluster.bootstrapServers()}",
"--alter",
"--add-config", "log.cleaner.threadzz=2",
"--entity-type", "brokers",
"--entity-default")
), true)

ConfigCommand.alterConfig(cluster.createAdminClient(), new ConfigCommandOptions(
Array("--bootstrap-server", s"${cluster.bootstrapServers()}",
"--alter",
"--add-config", "log.cleaner.threads=2",
"--entity-type", "brokers",
"--entity-default")
), true)
}

// TODO this test doesn't make sense because we can't produce `UnsupportedVersionException` by setting inter.broker.protocol.version
@ClusterTest(clusterType=Type.ZK, metadataVersion = MetadataVersion.IBP_3_2_IV0)
def testFallbackToDeprecatedAlterConfigs(): Unit = {
ConfigCommand.alterConfig(cluster.createAdminClient(), new ConfigCommandOptions(
Array("--bootstrap-server", s"${cluster.bootstrapServers()}",
"--alter",
"--add-config", "log.cleaner.threads=2",
"--entity-type", "brokers",
"--entity-default")
), true)
}
}
Loading

0 comments on commit efc3eaf

Please sign in to comment.