From c5a3247a1a5aa869a76f1fe60d3e42a79e098e51 Mon Sep 17 00:00:00 2001
From: dengziming
Date: Thu, 1 Feb 2024 20:50:54 +0800
Subject: [PATCH 1/2] MINOR: Improve testkit

---
 core/src/main/scala/kafka/server/KafkaBroker.scala   |  1 +
 core/src/test/java/kafka/test/ClusterInstance.java  |  5 +++++
 .../test/junit/RaftClusterInvocationContext.java    | 13 +++++++------
 .../test/junit/ZkClusterInvocationContext.java      |  5 +++++
 .../java/kafka/testkit/KafkaClusterTestKit.java     |  5 +++--
 .../server/AllocateProducerIdsRequestTest.scala     |  8 ++++----
 .../server/ConsumerGroupHeartbeatRequestTest.scala  |  6 +++---
 .../src/test/scala/unit/kafka/utils/TestUtils.scala |  4 +++-
 8 files changed, 31 insertions(+), 16 deletions(-)

diff --git a/core/src/main/scala/kafka/server/KafkaBroker.scala b/core/src/main/scala/kafka/server/KafkaBroker.scala
index e281087f12f5..8acbb5c2e950 100644
--- a/core/src/main/scala/kafka/server/KafkaBroker.scala
+++ b/core/src/main/scala/kafka/server/KafkaBroker.scala
@@ -97,6 +97,7 @@ trait KafkaBroker extends Logging {
   def credentialProvider: CredentialProvider
   def clientToControllerChannelManager: NodeToControllerChannelManager
   def tokenCache: DelegationTokenCache
+  def brokerFeatures: BrokerFeatures
 
   private val metricsGroup = new KafkaMetricsGroup(this.getClass) {
     // For backwards compatibility, we need to keep older metrics tied
diff --git a/core/src/test/java/kafka/test/ClusterInstance.java b/core/src/test/java/kafka/test/ClusterInstance.java
index 96ce658c7a81..3d0736196953 100644
--- a/core/src/test/java/kafka/test/ClusterInstance.java
+++ b/core/src/test/java/kafka/test/ClusterInstance.java
@@ -19,6 +19,8 @@
 
 import kafka.network.SocketServer;
 import kafka.server.BrokerFeatures;
+import kafka.server.BrokerServer;
+import kafka.server.KafkaBroker;
 import kafka.test.annotation.ClusterTest;
 import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.common.network.ListenerName;
@@ -28,6 +30,7 @@
 import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
+import java.util.stream.Stream;
 
 public interface ClusterInstance {
 
@@ -93,6 +96,8 @@ default Optional<ListenerName> controlPlaneListenerName() {
      */
     String bootstrapControllers();
 
+    Stream<KafkaBroker> brokers();
+
     /**
     * A collection of all brokers in the cluster. In ZK-based clusters this will also include the broker which is
     * acting as the controller (since ZK controllers serve both broker and controller roles).
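For context on the ClusterInstance change above: the new brokers() accessor exposes every broker through the common KafkaBroker trait, so request-level tests can be written once for both ZK- and KRaft-backed clusters. A minimal sketch of how a test might consume it; the helper name and assertion are illustrative, not part of this patch:

    import java.util.stream.Collectors
    import kafka.server.KafkaBroker
    import kafka.test.ClusterInstance
    import scala.jdk.CollectionConverters._

    // Hypothetical helper: collect brokers via the KafkaBroker trait, which both
    // KafkaServer (ZK mode) and BrokerServer (KRaft mode) implement.
    def allBrokerIds(cluster: ClusterInstance): Seq[Int] =
      cluster.brokers().collect(Collectors.toList[KafkaBroker]).asScala.map(_.config.brokerId).toSeq
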
diff --git a/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java b/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java
index aa1cd8d136c6..ea1145e7c388 100644
--- a/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java
+++ b/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java
@@ -21,6 +21,7 @@
 import kafka.server.BrokerFeatures;
 import kafka.server.BrokerServer;
 import kafka.server.ControllerServer;
+import kafka.server.KafkaBroker;
 import kafka.test.ClusterConfig;
 import kafka.test.ClusterInstance;
 import kafka.testkit.KafkaClusterTestKit;
@@ -154,7 +155,7 @@ public String bootstrapControllers() {
     @Override
     public Collection<SocketServer> brokerSocketServers() {
         return brokers()
-            .map(BrokerServer::socketServer)
+            .map(KafkaBroker::socketServer)
             .collect(Collectors.toList());
     }
 
@@ -178,7 +179,7 @@ public Collection<SocketServer> controllerSocketServers() {
     @Override
     public SocketServer anyBrokerSocketServer() {
         return brokers()
-            .map(BrokerServer::socketServer)
+            .map(KafkaBroker::socketServer)
             .findFirst()
             .orElseThrow(() -> new RuntimeException("No broker SocketServers found"));
     }
@@ -195,14 +196,14 @@ public SocketServer anyControllerSocketServer() {
     public Map<Integer, BrokerFeatures> brokerFeatures() {
         return brokers().collect(Collectors.toMap(
             brokerServer -> brokerServer.config().nodeId(),
-            BrokerServer::brokerFeatures
+            KafkaBroker::brokerFeatures
         ));
     }
 
     @Override
     public String clusterId() {
         return controllers().findFirst().map(ControllerServer::clusterId).orElse(
-            brokers().findFirst().map(BrokerServer::clusterId).orElseThrow(
+            brokers().findFirst().map(KafkaBroker::clusterId).orElseThrow(
                 () -> new RuntimeException("No controllers or brokers!"))
         );
     }
@@ -299,8 +300,8 @@ private BrokerServer findBrokerOrThrow(int brokerId) {
                 .orElseThrow(() -> new IllegalArgumentException("Unknown brokerId " + brokerId));
         }
 
-        public Stream<BrokerServer> brokers() {
-            return clusterReference.get().brokers().values().stream();
+        public Stream<KafkaBroker> brokers() {
+            return clusterReference.get().brokers().values().stream().map(b -> b);
         }
 
         public Stream<ControllerServer> controllers() {
diff --git a/core/src/test/java/kafka/test/junit/ZkClusterInvocationContext.java b/core/src/test/java/kafka/test/junit/ZkClusterInvocationContext.java
index ccfceade7bf3..05f3f553bc36 100644
--- a/core/src/test/java/kafka/test/junit/ZkClusterInvocationContext.java
+++ b/core/src/test/java/kafka/test/junit/ZkClusterInvocationContext.java
@@ -20,6 +20,7 @@
 import kafka.api.IntegrationTestHarness;
 import kafka.network.SocketServer;
 import kafka.server.BrokerFeatures;
+import kafka.server.KafkaBroker;
 import kafka.server.KafkaConfig;
 import kafka.server.KafkaServer;
 import kafka.test.ClusterConfig;
@@ -347,5 +348,9 @@ private KafkaServer findBrokerOrThrow(int brokerId) {
         public Stream<KafkaServer> servers() {
             return JavaConverters.asJavaCollection(clusterReference.get().servers()).stream();
         }
+
+        public Stream<KafkaBroker> brokers() {
+            return JavaConverters.asJavaCollection(clusterReference.get().brokers()).stream();
+        }
     }
 }
diff --git a/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java b/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java
index 4c6d1f9ef4ad..71a0932588a0 100644
--- a/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java
+++ b/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java
@@ -192,8 +192,9 @@ private KafkaConfig createNodeConfig(TestKitNode node) {
                 collect(Collectors.joining(","));
             props.put(RaftConfig.QUORUM_VOTERS_CONFIG, uninitializedQuorumVotersString);
 
-            // reduce log cleaner offset map memory usage
-            props.put(CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_SIZE_PROP, "2097152");
+            // reduce log cleaner offset map memory usage; it must be greater than 1MB per cleaner thread,
+            // so set it to 2MB + 2 bytes, which allows us to run 2 cleaner threads.
+            props.put(CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_SIZE_PROP, "2097154");
 
             // Add associated broker node property overrides
             if (brokerNode != null) {
diff --git a/core/src/test/scala/unit/kafka/server/AllocateProducerIdsRequestTest.scala b/core/src/test/scala/unit/kafka/server/AllocateProducerIdsRequestTest.scala
index 5cb59573d1ad..c5b30f8c497e 100644
--- a/core/src/test/scala/unit/kafka/server/AllocateProducerIdsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AllocateProducerIdsRequestTest.scala
@@ -41,13 +41,13 @@ class AllocateProducerIdsRequestTest(cluster: ClusterInstance) {
     val raftCluster = cluster.asInstanceOf[RaftClusterInstance]
     val sourceBroker = raftCluster.brokers.findFirst().get()
 
-    val controllerId = sourceBroker.raftManager.leaderAndEpoch.leaderId().getAsInt
+    val controllerId = sourceBroker.asInstanceOf[BrokerServer].raftManager.leaderAndEpoch.leaderId().getAsInt
     val controllerServer = raftCluster.controllers()
       .filter(_.config.nodeId == controllerId)
       .findFirst()
       .get()
 
-    val allocateResponse = sendAndReceiveAllocateProducerIds(sourceBroker, controllerServer)
+    val allocateResponse = sendAndReceiveAllocateProducerIds(sourceBroker.asInstanceOf[BrokerServer], controllerServer)
     assertEquals(Errors.NONE, allocateResponse.error)
     assertEquals(ProducerIdsBlock.PRODUCER_ID_BLOCK_SIZE, allocateResponse.data.producerIdLen)
     assertTrue(allocateResponse.data.producerIdStart >= 0)
@@ -58,13 +58,13 @@
     val raftCluster = cluster.asInstanceOf[RaftClusterInstance]
     val sourceBroker = raftCluster.brokers.findFirst().get()
 
-    val controllerId = sourceBroker.raftManager.leaderAndEpoch.leaderId().getAsInt
+    val controllerId = sourceBroker.asInstanceOf[BrokerServer].raftManager.leaderAndEpoch.leaderId().getAsInt
     val controllerServer = raftCluster.controllers()
       .filter(_.config.nodeId != controllerId)
       .findFirst()
       .get()
 
-    val allocateResponse = sendAndReceiveAllocateProducerIds(sourceBroker, controllerServer)
+    val allocateResponse = sendAndReceiveAllocateProducerIds(sourceBroker.asInstanceOf[BrokerServer], controllerServer)
     assertEquals(Errors.NOT_CONTROLLER, Errors.forCode(allocateResponse.data.errorCode))
   }
 
diff --git a/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala b/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala
index 250cc83aa636..ba05f52e132c 100644
--- a/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala
@@ -62,7 +62,7 @@ class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) {
     // in this test because it does not use FindCoordinator API.
     TestUtils.createOffsetsTopicWithAdmin(
       admin = admin,
-      brokers = raftCluster.brokers.collect(Collectors.toList[BrokerServer]).asScala,
+      brokers = raftCluster.brokers.collect(Collectors.toList[KafkaBroker]).asScala,
       controllers = raftCluster.controllerServers().asScala.toSeq
     )
 
@@ -151,7 +151,7 @@ class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) {
     // in this test because it does not use FindCoordinator API.
     TestUtils.createOffsetsTopicWithAdmin(
       admin = admin,
-      brokers = raftCluster.brokers.collect(Collectors.toList[BrokerServer]).asScala,
+      brokers = raftCluster.brokers.collect(Collectors.toList[KafkaBroker]).asScala,
       controllers = raftCluster.controllerServers().asScala.toSeq
     )
 
@@ -267,7 +267,7 @@ class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) {
     // in this test because it does not use FindCoordinator API.
     TestUtils.createOffsetsTopicWithAdmin(
       admin = admin,
-      brokers = raftCluster.brokers.collect(Collectors.toList[BrokerServer]).asScala,
+      brokers = raftCluster.brokers.collect(Collectors.toList[KafkaBroker]).asScala,
       controllers = raftCluster.controllerServers().asScala.toSeq
     )
 
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 17ee3139a51f..3b04198d8e04 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -373,7 +373,9 @@ object TestUtils extends Logging {
     props.put(KafkaConfig.DeleteTopicEnableProp, enableDeleteTopic.toString)
     props.put(KafkaConfig.LogDeleteDelayMsProp, "1000")
     props.put(KafkaConfig.ControlledShutdownRetryBackoffMsProp, "100")
-    props.put(CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_SIZE_PROP, "2097152")
+    // reduce log cleaner offset map memory usage; it must be greater than 1MB per cleaner thread,
+    // so set it to 2MB + 2 bytes, which allows us to run 2 cleaner threads.
+    props.put(CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_SIZE_PROP, "2097154")
     props.put(KafkaConfig.OffsetsTopicReplicationFactorProp, "1")
     if (!props.containsKey(KafkaConfig.OffsetsTopicPartitionsProp))
       props.put(KafkaConfig.OffsetsTopicPartitionsProp, "5")

From 3c6604b246f25acc6428d22f7324f11f80c55286 Mon Sep 17 00:00:00 2001
From: dengziming
Date: Thu, 1 Feb 2024 20:56:23 +0800
Subject: [PATCH 2/2] KAFKA-16181: Use incrementalAlterConfigs when updating
 broker configs by kafka-configs.sh

---
 .../scala/kafka/admin/ConfigCommand.scala     |  72 +++--
 .../admin/ConfigCommandIntegrationTest.scala  | 281 ++++++++++++++++--
 .../unit/kafka/admin/ConfigCommandTest.scala  |   4 +-
 docs/upgrade.html                             |  41 +---
 4 files changed, 314 insertions(+), 84 deletions(-)

diff --git a/core/src/main/scala/kafka/admin/ConfigCommand.scala b/core/src/main/scala/kafka/admin/ConfigCommand.scala
index bcbeedc468e4..dcb6923f4840 100644
--- a/core/src/main/scala/kafka/admin/ConfigCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConfigCommand.scala
@@ -18,7 +18,7 @@ package kafka.admin
 
 import java.nio.charset.StandardCharsets
-import java.util.concurrent.TimeUnit
+import java.util.concurrent.{ExecutionException, TimeUnit}
 import java.util.{Collections, Properties}
 import joptsimple._
 import kafka.server.{DynamicBrokerConfig, DynamicConfig, KafkaConfig}
@@ -29,7 +29,7 @@
 import kafka.zk.{AdminZkClient, KafkaZkClient}
 import org.apache.kafka.clients.admin.{Admin, AlterClientQuotasOptions, AlterConfigOp, AlterConfigsOptions, ConfigEntry, DescribeClusterOptions, DescribeConfigsOptions, ListTopicsOptions, ScramCredentialInfo, UserScramCredentialDeletion, UserScramCredentialUpsertion, Config => JConfig, ScramMechanism => PublicScramMechanism}
 import org.apache.kafka.common.config.{ConfigResource, TopicConfig}
 import org.apache.kafka.common.config.types.Password
-import org.apache.kafka.common.errors.InvalidConfigurationException
+import org.apache.kafka.common.errors.{InvalidConfigurationException, UnsupportedVersionException}
 import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.quota.{ClientQuotaAlteration, ClientQuotaEntity, ClientQuotaFilter, ClientQuotaFilterComponent}
 import org.apache.kafka.common.security.JaasUtils
@@ -349,7 +349,7 @@ object ConfigCommand extends Logging {
   }
 
   @nowarn("cat=deprecation")
-  private[admin] def alterConfig(adminClient: Admin, opts: ConfigCommandOptions): Unit = {
+  private[admin] def alterConfig(adminClient: Admin, opts: ConfigCommandOptions, useIncrementalAlterConfigs: Boolean = true): Unit = {
     val entityTypes = opts.entityTypes
     val entityNames = opts.entityNames
     val entityTypeHead = entityTypes.head
@@ -357,9 +357,10 @@ object ConfigCommand extends Logging {
     val configsToBeAddedMap = parseConfigsToBeAdded(opts).asScala.toMap // no need for mutability
     val configsToBeAdded = configsToBeAddedMap.map { case (k, v) => (k, new ConfigEntry(k, v)) }
     val configsToBeDeleted = parseConfigsToBeDeleted(opts)
+    var logSucceedMsg = true
 
     entityTypeHead match {
-      case ConfigType.TOPIC =>
+      case ConfigType.TOPIC | ConfigType.CLIENT_METRICS | ConfigType.BROKER if useIncrementalAlterConfigs =>
         val oldConfig = getResourceConfig(adminClient, entityTypeHead, entityNameHead, includeSynonyms = false, describeAll = false)
           .map { entry => (entry.name, entry) }.toMap
@@ -368,14 +369,37 @@ object ConfigCommand extends Logging {
         if (invalidConfigs.nonEmpty)
           throw new InvalidConfigurationException(s"Invalid config(s): ${invalidConfigs.mkString(",")}")
 
-        val configResource = new ConfigResource(ConfigResource.Type.TOPIC, entityNameHead)
+        val configResourceType = entityTypeHead match {
+          case ConfigType.TOPIC => ConfigResource.Type.TOPIC
+          case ConfigType.CLIENT_METRICS => ConfigResource.Type.CLIENT_METRICS
+          case ConfigType.BROKER => ConfigResource.Type.BROKER
+        }
+        val configResource = new ConfigResource(configResourceType, entityNameHead)
         val alterOptions = new AlterConfigsOptions().timeoutMs(30000).validateOnly(false)
         val alterEntries = (configsToBeAdded.values.map(new AlterConfigOp(_, AlterConfigOp.OpType.SET))
           ++ configsToBeDeleted.map { k => new AlterConfigOp(new ConfigEntry(k, ""), AlterConfigOp.OpType.DELETE) }
         ).asJavaCollection
-        adminClient.incrementalAlterConfigs(Map(configResource -> alterEntries).asJava, alterOptions).all().get(60, TimeUnit.SECONDS)
-      case ConfigType.BROKER =>
+
+        var retryUsingDeprecatedAlterConfigs = false
+        try {
+          adminClient.incrementalAlterConfigs(Map(configResource -> alterEntries).asJava, alterOptions).all().get(60, TimeUnit.SECONDS)
+        } catch {
+          case e: ExecutionException =>
+            e.getCause match {
+              case _: UnsupportedVersionException if entityTypeHead == ConfigType.BROKER =>
+                retryUsingDeprecatedAlterConfigs = true
+                logSucceedMsg = false
+              case _ => throw e
+            }
+          case e: Throwable => throw e
+        }
+
+        if (retryUsingDeprecatedAlterConfigs) {
+          // KIP-1011: the broker does not support incrementalAlterConfigs, so retry using the deprecated alterConfigs
+          alterConfig(adminClient, opts, useIncrementalAlterConfigs = false)
+        }
+
+      case ConfigType.BROKER if !useIncrementalAlterConfigs =>
         val oldConfig = getResourceConfig(adminClient, entityTypeHead, entityNameHead, includeSynonyms = false, describeAll = false)
           .map { entry => (entry.name, entry) }.toMap
@@ -405,7 +429,7 @@ object ConfigCommand extends Logging {
         val alterOptions = new AlterConfigsOptions().timeoutMs(30000).validateOnly(false)
         val alterLogLevelEntries = (configsToBeAdded.values.map(new AlterConfigOp(_, AlterConfigOp.OpType.SET))
           ++ configsToBeDeleted.map { k => new AlterConfigOp(new ConfigEntry(k, ""), AlterConfigOp.OpType.DELETE) }
-        ).asJavaCollection
+        ).asJavaCollection
         adminClient.incrementalAlterConfigs(Map(configResource -> alterLogLevelEntries).asJava, alterOptions).all().get(60, TimeUnit.SECONDS)
 
       case ConfigType.USER | ConfigType.CLIENT =>
@@ -448,28 +472,20 @@ object ConfigCommand extends Logging {
         if (unknownConfigs.nonEmpty)
           throw new IllegalArgumentException(s"Only connection quota configs can be added for '${ConfigType.IP}' using --bootstrap-server. Unexpected config names: ${unknownConfigs.mkString(",")}")
         alterQuotaConfigs(adminClient, entityTypes, entityNames, configsToBeAddedMap, configsToBeDeleted)
-      case ConfigType.CLIENT_METRICS =>
-        val oldConfig = getResourceConfig(adminClient, entityTypeHead, entityNameHead, includeSynonyms = false, describeAll = false)
-          .map { entry => (entry.name, entry) }.toMap
-
-        // fail the command if any of the configs to be deleted does not exist
-        val invalidConfigs = configsToBeDeleted.filterNot(oldConfig.contains)
-        if (invalidConfigs.nonEmpty)
-          throw new InvalidConfigurationException(s"Invalid config(s): ${invalidConfigs.mkString(",")}")
-
-        val configResource = new ConfigResource(ConfigResource.Type.CLIENT_METRICS, entityNameHead)
-        val alterOptions = new AlterConfigsOptions().timeoutMs(30000).validateOnly(false)
-        val alterEntries = (configsToBeAdded.values.map(new AlterConfigOp(_, AlterConfigOp.OpType.SET))
-          ++ configsToBeDeleted.map { k => new AlterConfigOp(new ConfigEntry(k, ""), AlterConfigOp.OpType.DELETE) }
-        ).asJavaCollection
-        adminClient.incrementalAlterConfigs(Map(configResource -> alterEntries).asJava, alterOptions).all().get(60, TimeUnit.SECONDS)
-      case _ => throw new IllegalArgumentException(s"Unsupported entity type: $entityTypeHead")
+      case _ =>
+        if (useIncrementalAlterConfigs) {
+          throw new IllegalArgumentException(s"Unsupported entity type: $entityTypeHead")
+        } else {
+          throw new IllegalArgumentException(s"Unsupported entity type: $entityTypeHead. Only ${ConfigType.BROKER} is supported when updating via the deprecated alterConfigs.")
+        }
     }
 
-    if (entityNameHead.nonEmpty)
-      println(s"Completed updating config for ${entityTypeHead.dropRight(1)} $entityNameHead.")
-    else
-      println(s"Completed updating default config for $entityTypeHead in the cluster.")
+    if (logSucceedMsg) {
+      if (entityNameHead.nonEmpty)
+        println(s"Completed updating config for ${entityTypeHead.dropRight(1)} $entityNameHead.")
+      else
+        println(s"Completed updating default config for $entityTypeHead in the cluster.")
+    }
   }
 
   private def alterUserScramCredentialConfigs(adminClient: Admin, user: String, scramConfigsToAddMap: Map[String, ConfigEntry], scramConfigsToDelete: Seq[String]) = {
diff --git a/core/src/test/scala/integration/kafka/admin/ConfigCommandIntegrationTest.scala b/core/src/test/scala/integration/kafka/admin/ConfigCommandIntegrationTest.scala
index 6335e978533b..980f2bf1463e 100644
--- a/core/src/test/scala/integration/kafka/admin/ConfigCommandIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/admin/ConfigCommandIntegrationTest.scala
@@ -18,39 +18,54 @@ package kafka.admin
 
 import kafka.admin.ConfigCommand.ConfigCommandOptions
 import kafka.cluster.{Broker, EndPoint}
-import kafka.server.{KafkaConfig, QuorumTestHarness}
-import kafka.utils.{Exit, Logging, TestInfoUtils}
+import kafka.server.{KafkaBroker, KafkaConfig}
+import kafka.test.annotation.{ClusterTest, ClusterTestDefaults, Type}
+import kafka.test.junit.ClusterTestExtensions
+import kafka.test.junit.ZkClusterInvocationContext.ZkClusterInstance
+import kafka.test.ClusterInstance
+import kafka.utils.{Exit, TestUtils}
 import kafka.zk.{AdminZkClient, BrokerInfo}
-import org.apache.kafka.common.config.ConfigException
+import org.apache.kafka.clients.admin.{AlterConfigOp, AlterConfigsOptions, AlterConfigsResult, Config}
+import org.apache.kafka.common.KafkaFuture
+import org.apache.kafka.common.config.{ConfigException, ConfigResource}
+import org.apache.kafka.common.errors.{InvalidConfigurationException, UnsupportedVersionException}
+import org.apache.kafka.common.internals.KafkaFutureImpl
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.apache.kafka.server.common.MetadataVersion
 import org.apache.kafka.server.config.ConfigEntityName
 import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.params.ParameterizedTest
-import org.junit.jupiter.params.provider.ValueSource
+import org.junit.jupiter.api.extension.ExtendWith
+import org.junit.jupiter.api.Tag
+import org.mockito.ArgumentMatchers.any
+import org.mockito.Mockito
 
+import java.util.{Collection, Collections, Map => JMap}
+import java.util.concurrent.ExecutionException
+import java.util.stream.Collectors
+import scala.annotation.nowarn
 import scala.collection.Seq
 import scala.jdk.CollectionConverters._
 
-class ConfigCommandIntegrationTest extends QuorumTestHarness with Logging {
+@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
+@ClusterTestDefaults(clusterType = Type.ALL, brokers = 1)
+@Tag("integration")
+class ConfigCommandIntegrationTest(cluster: ClusterInstance) {
 
-  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
-  @ValueSource(strings = Array("zk"))
-  def shouldExitWithNonZeroStatusOnUpdatingUnallowedConfigViaZk(quorum: String): Unit = {
+  @ClusterTest(clusterType = Type.ZK)
+  def shouldExitWithNonZeroStatusOnUpdatingUnallowedConfigViaZk(zkCluster: ClusterInstance): Unit = {
     assertNonZeroStatusExit(Array(
-      "--zookeeper", zkConnect,
+      "--zookeeper", zkCluster.asInstanceOf[ZkClusterInstance].getUnderlying.zkConnect,
       "--entity-name", "1",
       "--entity-type", "brokers",
       "--alter",
       "--add-config", "security.inter.broker.protocol=PLAINTEXT"))
   }
 
-  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
-  @ValueSource(strings = Array("zk"))
-  def shouldExitWithNonZeroStatusOnZkCommandAlterUserQuota(quorum: String): Unit = {
+  @ClusterTest(clusterType = Type.ZK)
+  def shouldExitWithNonZeroStatusOnZkCommandAlterUserQuota(zkCluster: ClusterInstance): Unit = {
     assertNonZeroStatusExit(Array(
-      "--zookeeper", zkConnect,
+      "--zookeeper", zkCluster.asInstanceOf[ZkClusterInstance].getUnderlying.zkConnect,
       "--entity-type", "users",
       "--entity-name", "admin",
       "--alter",
       "--add-config", "consumer_byte_rate=20000"))
@@ -74,10 +89,11 @@ class ConfigCommandIntegrationTest extends QuorumTestHarness with Logging {
     assertEquals(Some(1), exitStatus)
   }
 
-  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
-  @ValueSource(strings = Array("zk"))
-  def testDynamicBrokerConfigUpdateUsingZooKeeper(quorum: String): Unit = {
-    val brokerId = "1"
+  @ClusterTest(clusterType = Type.ZK)
+  def testDynamicBrokerConfigUpdateUsingZooKeeper(zkCluster: ClusterInstance): Unit = {
+    val zkClient = zkCluster.asInstanceOf[ZkClusterInstance].getUnderlying().zkClient
+    val zkConnect = zkCluster.asInstanceOf[ZkClusterInstance].getUnderlying.zkConnect
+    val brokerId = cluster.anyBrokerSocketServer().config.brokerId.toString
     val adminZkClient = new AdminZkClient(zkClient)
     val alterOpts = Array("--zookeeper", zkConnect, "--entity-type", "brokers", "--alter")
"--alter") @@ -109,6 +125,9 @@ class ConfigCommandIntegrationTest extends QuorumTestHarness with Logging { verifyConfig(Map.empty, brokerId) } + // Broker configuration operations using ZooKeeper are only supported if the affected broker(s) are not running. + cluster.shutdownBroker(brokerId.toInt) + // Add config alterAndVerifyConfig(Map("message.max.size" -> "110000"), Some(brokerId)) alterAndVerifyConfig(Map("message.max.size" -> "120000"), None) @@ -162,7 +181,7 @@ class ConfigCommandIntegrationTest extends QuorumTestHarness with Logging { assertThrows(classOf[ConfigException], () => alterConfigWithZk(configs, None, encoderConfigs)) // Dynamic config updates using ZK should fail if broker is running. - registerBrokerInZk(brokerId.toInt) + registerBrokerInZk(zkClient, brokerId.toInt) assertThrows(classOf[IllegalArgumentException], () => alterConfigWithZk(Map("message.max.size" -> "210000"), Some(brokerId))) assertThrows(classOf[IllegalArgumentException], () => alterConfigWithZk(Map("message.max.size" -> "220000"), None)) @@ -170,11 +189,233 @@ class ConfigCommandIntegrationTest extends QuorumTestHarness with Logging { alterAndVerifyConfig(Map("message.max.size" -> "230000"), Some("2")) } - private def registerBrokerInZk(id: Int): Unit = { + private def registerBrokerInZk(zkClient: kafka.zk.KafkaZkClient, id: Int): Unit = { zkClient.createTopLevelPaths() val securityProtocol = SecurityProtocol.PLAINTEXT val endpoint = new EndPoint("localhost", 9092, ListenerName.forSecurityProtocol(securityProtocol), securityProtocol) val brokerInfo = BrokerInfo(Broker(id, Seq(endpoint), rack = None), MetadataVersion.latestTesting, jmxPort = 9192) zkClient.registerBroker(brokerInfo) } + + @ClusterTest + def testUpdateInvalidBrokersConfig(): Unit = { + checkInvalidBrokerConfig(None) + checkInvalidBrokerConfig(Some(cluster.anyBrokerSocketServer().config.brokerId.toString)) + } + + private def checkInvalidBrokerConfig(entityNameOrDefault: Option[String]): Unit = { + for (incremental <- Array(true, false)) { + val entityNameParams = entityNameOrDefault.map(name => Array("--entity-name", name)).getOrElse(Array("--entity-default")) + ConfigCommand.alterConfig(cluster.createAdminClient(), new ConfigCommandOptions( + Array("--bootstrap-server", cluster.bootstrapServers(), + "--alter", + "--add-config", "invalid=2", + "--entity-type", "brokers") + ++ entityNameParams + ), incremental) + + val describeResult = TestUtils.grabConsoleOutput( + ConfigCommand.describeConfig(cluster.createAdminClient(), new ConfigCommandOptions( + Array("--bootstrap-server", cluster.bootstrapServers(), + "--describe", + "--entity-type", "brokers") + ++ entityNameParams + ))) + // We will treat unknown config as sensitive + assertTrue(describeResult.contains("sensitive=true")) + // Sensitive config will not return + assertTrue(describeResult.contains("invalid=null")) + } + } + + @ClusterTest + def testUpdateInvalidTopicConfig(): Unit = { + TestUtils.createTopicWithAdminRaw( + admin = cluster.createAdminClient(), + topic = "test-config-topic", + ) + assertInstanceOf( + classOf[InvalidConfigurationException], + assertThrows( + classOf[ExecutionException], + () => ConfigCommand.alterConfig(cluster.createAdminClient(), new ConfigCommandOptions( + Array("--bootstrap-server", cluster.bootstrapServers(), + "--alter", + "--add-config", "invalid=2", + "--entity-type", "topics", + "--entity-name", "test-config-topic") + ))).getCause + ) + } + + @ClusterTest + def testUpdateAndDeleteBrokersConfig(): Unit = { + checkBrokerConfig(None) + 
+    checkBrokerConfig(Some(cluster.anyBrokerSocketServer().config.brokerId.toString))
+  }
+
+  private def checkBrokerConfig(entityNameOrDefault: Option[String]): Unit = {
+    val entityNameParams = entityNameOrDefault.map(name => Array("--entity-name", name)).getOrElse(Array("--entity-default"))
+    // add -> check -> delete -> check
+    for (incremental <- Array(true, false)) {
+      ConfigCommand.alterConfig(cluster.createAdminClient(), new ConfigCommandOptions(
+        Array("--bootstrap-server", cluster.bootstrapServers(),
+          "--alter",
+          "--add-config", "log.cleaner.threads=2",
+          "--entity-type", "brokers")
+          ++ entityNameParams
+      ), incremental)
+      TestUtils.waitUntilTrue(
+        () => cluster.brokerSocketServers().asScala.forall(broker => broker.config.getInt("log.cleaner.threads") == 2),
+        "Timeout waiting for broker config to propagate to all brokers")
+
+      val describeResult = TestUtils.grabConsoleOutput(
+        ConfigCommand.describeConfig(cluster.createAdminClient(), new ConfigCommandOptions(
+          Array("--bootstrap-server", cluster.bootstrapServers(),
+            "--describe",
+            "--entity-type", "brokers")
+            ++ entityNameParams
+        )))
+      assertTrue(describeResult.contains("log.cleaner.threads=2"))
+      assertTrue(describeResult.contains("sensitive=false"))
+
+      ConfigCommand.alterConfig(cluster.createAdminClient(), new ConfigCommandOptions(
+        Array("--bootstrap-server", cluster.bootstrapServers(),
+          "--alter",
+          "--delete-config", "log.cleaner.threads",
+          "--entity-type", "brokers")
+          ++ entityNameParams
+      ), incremental)
+      TestUtils.waitUntilTrue(
+        () => cluster.brokers().collect(Collectors.toList[KafkaBroker]).asScala.forall(broker => broker.config.getInt("log.cleaner.threads") != 2),
+        "Timeout waiting for broker config to propagate to all brokers")
+
+      assertFalse(TestUtils.grabConsoleOutput(
+        ConfigCommand.describeConfig(cluster.createAdminClient(), new ConfigCommandOptions(
+          Array("--bootstrap-server", cluster.bootstrapServers(),
+            "--describe",
+            "--entity-type", "brokers")
+            ++ entityNameParams
+        ))).contains("log.cleaner.threads"))
+    }
+  }
+
+  @ClusterTest
+  def testUpdateConfigAndDeleteTopicConfig(): Unit = {
+    TestUtils.createTopicWithAdminRaw(
+      admin = cluster.createAdminClient(),
+      topic = "test-config-topic",
+    )
+    // add -> check -> delete -> check
+    ConfigCommand.alterConfig(cluster.createAdminClient(), new ConfigCommandOptions(
+      Array("--bootstrap-server", cluster.bootstrapServers(),
+        "--alter",
+        "--add-config", "segment.bytes=10240000",
+        "--entity-type", "topics",
+        "--entity-name", "test-config-topic")
+    ))
+    TestUtils.waitUntilTrue(
+      () => cluster.brokers().collect(Collectors.toList[KafkaBroker]).asScala.forall(broker => broker.logManager.logsByTopic("test-config-topic").head.config.getInt("segment.bytes") == 10240000),
+      "Timeout waiting for topic config to propagate to all brokers")
+
+    val describeResult = TestUtils.grabConsoleOutput(
+      ConfigCommand.describeConfig(cluster.createAdminClient(), new ConfigCommandOptions(
+        Array("--bootstrap-server", cluster.bootstrapServers(),
+          "--describe",
+          "--entity-type", "topics",
+          "--entity-name", "test-config-topic")
+      )))
+    assertTrue(describeResult.contains("segment.bytes=10240000"))
+    assertTrue(describeResult.contains("sensitive=false"))
+
+    ConfigCommand.alterConfig(cluster.createAdminClient(), new ConfigCommandOptions(
+      Array("--bootstrap-server", cluster.bootstrapServers(),
+        "--alter",
+        "--delete-config", "segment.bytes",
+        "--entity-type", "topics",
+        "--entity-name", "test-config-topic")
+    ))
+    TestUtils.waitUntilTrue(
+      () => cluster.brokers().collect(Collectors.toList[KafkaBroker]).asScala.forall(broker => broker.logManager.logsByTopic("test-config-topic").head.config.getInt("segment.bytes") != 10240000),
+      "Timeout waiting for topic config to propagate to all brokers")
+
+    assertFalse(TestUtils.grabConsoleOutput(
+      ConfigCommand.describeConfig(cluster.createAdminClient(), new ConfigCommandOptions(
+        Array("--bootstrap-server", cluster.bootstrapServers(),
+          "--describe",
+          "--entity-type", "topics",
+          "--entity-name", "test-config-topic")
+      ))).contains("segment.bytes"))
+  }
+
+  @ClusterTest
+  def testCannotUpdateTopicConfigUsingDeprecatedAlterConfigs(): Unit = {
+    TestUtils.createTopicWithAdminRaw(
+      admin = cluster.createAdminClient(),
+      topic = "test-config-topic",
+    )
+    assertThrows(classOf[IllegalArgumentException], () => ConfigCommand.alterConfig(cluster.createAdminClient(), new ConfigCommandOptions(
+      Array("--bootstrap-server", cluster.bootstrapServers(),
+        "--alter",
+        "--add-config", "segment.bytes=10240000",
+        "--entity-type", "topics",
+        "--entity-name", "test-config-topic")
+    ), useIncrementalAlterConfigs = false))
+  }
+
+  @ClusterTest
+  def testUpdateBrokerConfigNotAffectedByInvalidConfig(): Unit = {
+    // Test case from KAFKA-13788
+    ConfigCommand.alterConfig(cluster.createAdminClient(), new ConfigCommandOptions(
+      Array("--bootstrap-server", cluster.bootstrapServers(),
+        "--alter",
+        "--add-config", "log.cleaner.threadzz=2",
+        "--entity-type", "brokers",
+        "--entity-default")
+    ))
+
+    ConfigCommand.alterConfig(cluster.createAdminClient(), new ConfigCommandOptions(
+      Array("--bootstrap-server", cluster.bootstrapServers(),
+        "--alter",
+        "--add-config", "log.cleaner.threads=2",
+        "--entity-type", "brokers",
+        "--entity-default")
+    ))
+    TestUtils.waitUntilTrue(
+      () => cluster.brokerSocketServers().asScala.forall(broker => broker.config.getInt("log.cleaner.threads") == 2),
+      "Timeout waiting for broker config to propagate to all brokers")
+  }
+
+  @nowarn("cat=deprecation")
+  @ClusterTest(clusterType = Type.ZK, metadataVersion = MetadataVersion.IBP_2_2_IV0)
+  def testFallbackToDeprecatedAlterConfigs(): Unit = {
+    val spyAdmin = Mockito.spy(cluster.createAdminClient())
+
+    val mockResult = {
+      val future = new KafkaFutureImpl[Void]()
+      future.completeExceptionally(new UnsupportedVersionException("simulated error"))
+
+      val constructor = classOf[AlterConfigsResult].getDeclaredConstructor(classOf[JMap[ConfigResource, KafkaFuture[Void]]])
+      constructor.setAccessible(true)
+      val result = constructor.newInstance(Collections.singletonMap(new ConfigResource(ConfigResource.Type.BROKER, ""), future))
+      constructor.setAccessible(false)
+      result
+    }
+    Mockito.doReturn(mockResult).when(spyAdmin)
+      .incrementalAlterConfigs(any(classOf[JMap[ConfigResource, Collection[AlterConfigOp]]]), any(classOf[AlterConfigsOptions]))
+
+    ConfigCommand.alterConfig(spyAdmin, new ConfigCommandOptions(
+      Array("--bootstrap-server", cluster.bootstrapServers(),
+        "--alter",
+        "--add-config", "log.cleaner.threads=2",
+        "--entity-type", "brokers",
+        "--entity-default")
+    ))
+
+    Mockito.verify(spyAdmin).alterConfigs(any(classOf[JMap[ConfigResource, Config]]), any(classOf[AlterConfigsOptions]))
+
+    TestUtils.waitUntilTrue(
+      () => cluster.brokerSocketServers().asScala.forall(broker => broker.config.getInt("log.cleaner.threads") == 2),
+      "Timeout waiting for broker config to propagate to all brokers")
+  }
 }
diff --git a/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala b/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
index 085a2fd649a2..0258e0be97ea 100644
--- a/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
@@ -1128,13 +1128,13 @@ class ConfigCommandTest extends Logging {
       describeResult
     }
 
-    override def alterConfigs(configs: util.Map[ConfigResource, Config], options: AlterConfigsOptions): AlterConfigsResult = {
+    override def incrementalAlterConfigs(configs: util.Map[ConfigResource, util.Collection[AlterConfigOp]], options: AlterConfigsOptions): AlterConfigsResult = {
       assertEquals(1, configs.size)
       val entry = configs.entrySet.iterator.next
       val resource = entry.getKey
       val config = entry.getValue
       assertEquals(ConfigResource.Type.BROKER, resource.`type`)
-      config.entries.forEach { e => brokerConfigs.put(e.name, e.value) }
+      config.asScala.map(_.configEntry()).foreach { e => brokerConfigs.put(e.name, e.value) }
       alterResult
     }
   }
diff --git a/docs/upgrade.html b/docs/upgrade.html
index 33009f062e27..c04c3230ff1b 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -104,6 +104,12 @@
Notable changes in 3.7.0
  • All the notable changes are present in the blog post announcing the 3.7.0 release.
  • + kafka-configs.sh now uses the incrementalAlterConfigs API by default to alter broker configurations, instead of the deprecated alterConfigs API,
    and it falls back to the deprecated alterConfigs API if the broker does not support incrementalAlterConfigs, which means the broker is running a version earlier than 2.3.
    This fallback path is itself deprecated and will be removed in the next major release, when incrementalAlterConfigs will always be used.
    See KIP-1011 for more details, and the sketch after this list for the retry pattern.
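Under the hood, the fallback amounts to catching UnsupportedVersionException from incrementalAlterConfigs and rerunning the alteration through the legacy path. A condensed sketch of the pattern, simplified from the ConfigCommand change in this patch (error handling trimmed, names as in the patch):

    try {
      adminClient.incrementalAlterConfigs(Map(configResource -> alterEntries).asJava, alterOptions).all().get(60, TimeUnit.SECONDS)
    } catch {
      case e: ExecutionException => e.getCause match {
        // Brokers older than 2.3 do not implement IncrementalAlterConfigs (KIP-339),
        // so rerun the whole alteration against the deprecated alterConfigs API.
        case _: UnsupportedVersionException =>
          alterConfig(adminClient, opts, useIncrementalAlterConfigs = false)
        case _ => throw e
      }
    }

For example, an invocation such as `kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers --entity-default --alter --add-config log.cleaner.threads=2` (host and port illustrative) now tries incrementalAlterConfigs first and degrades transparently against older brokers.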
  • Upgrading to 3.6.1 from any version 0.8.x through 3.5.x

    @@ -149,7 +155,7 @@
    Upgrading ZooKeeper-based clusters
    - Upgrading KRaft-based clusters
    + Upgrading KRaft-based clusters

    If you are upgrading from a version prior to 3.3.0, please see the note in step 3 below. Once you have changed the metadata.version to the latest version, it will not be possible to downgrade to a version prior to 3.3-IV0.

    For a rolling upgrade:

    @@ -210,39 +216,6 @@
    Notable changes in 3 -

    - Upgrading to 3.5.2 from any version 0.8.x through 3.4.x
    -
    - All upgrade steps remain same as upgrading to 3.5.0
    - Notable changes in 3.5.2
    -
    -   • When migrating producer ID blocks from ZK to KRaft, there could be duplicate producer IDs being given to
    -     transactional or idempotent producers. This can cause long term problems since the producer IDs are
    -     persisted and reused for a long time.
    -     See KAFKA-15552 for more details.
    -   • In 3.5.0 and 3.5.1, there could be an issue that the empty ISR is returned from controller after AlterPartition request
    -     during rolling upgrade. This issue will impact the availability of the topic partition.
    -     See KAFKA-15353 for more details.
    -
    - Upgrading to 3.5.1 from any version 0.8.x through 3.4.x
    -
    - All upgrade steps remain same as upgrading to 3.5.0
    - Notable changes in 3.5.1
    -
    -   • Upgraded the dependency, snappy-java, to a version which is not vulnerable to
    -     CVE-2023-34455.
    -     You can find more information about the CVE at Kafka CVE list.
    -   • Fixed a regression introduced in 3.3.0, which caused security.protocol configuration values to be restricted to
    -     upper case only. After the fix, security.protocol values are case insensitive.
    -     See KAFKA-15053 for details.
    -

    Upgrading to 3.5.0 from any version 0.8.x through 3.4.x

    Upgrading ZooKeeper-based clusters