KAFKA-16181: Use incrementalAlterConfigs when updating broker configs by kafka-configs.sh #15304

Open · wants to merge 2 commits into trunk
72 changes: 44 additions & 28 deletions core/src/main/scala/kafka/admin/ConfigCommand.scala
@@ -18,7 +18,7 @@
package kafka.admin

import java.nio.charset.StandardCharsets
import java.util.concurrent.TimeUnit
import java.util.concurrent.{ExecutionException, TimeUnit}
import java.util.{Collections, Properties}
import joptsimple._
import kafka.server.{DynamicBrokerConfig, DynamicConfig, KafkaConfig}
@@ -29,7 +29,7 @@ import kafka.zk.{AdminZkClient, KafkaZkClient}
import org.apache.kafka.clients.admin.{Admin, AlterClientQuotasOptions, AlterConfigOp, AlterConfigsOptions, ConfigEntry, DescribeClusterOptions, DescribeConfigsOptions, ListTopicsOptions, ScramCredentialInfo, UserScramCredentialDeletion, UserScramCredentialUpsertion, Config => JConfig, ScramMechanism => PublicScramMechanism}
import org.apache.kafka.common.config.{ConfigResource, TopicConfig}
import org.apache.kafka.common.config.types.Password
import org.apache.kafka.common.errors.InvalidConfigurationException
import org.apache.kafka.common.errors.{InvalidConfigurationException, UnsupportedVersionException}
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.quota.{ClientQuotaAlteration, ClientQuotaEntity, ClientQuotaFilter, ClientQuotaFilterComponent}
import org.apache.kafka.common.security.JaasUtils
@@ -349,17 +349,18 @@ object ConfigCommand extends Logging {
}

@nowarn("cat=deprecation")
private[admin] def alterConfig(adminClient: Admin, opts: ConfigCommandOptions): Unit = {
private[admin] def alterConfig(adminClient: Admin, opts: ConfigCommandOptions, useIncrementalAlterConfigs: Boolean = true): Unit = {
val entityTypes = opts.entityTypes
val entityNames = opts.entityNames
val entityTypeHead = entityTypes.head
val entityNameHead = entityNames.head
val configsToBeAddedMap = parseConfigsToBeAdded(opts).asScala.toMap // no need for mutability
val configsToBeAdded = configsToBeAddedMap.map { case (k, v) => (k, new ConfigEntry(k, v)) }
val configsToBeDeleted = parseConfigsToBeDeleted(opts)
var logSucceedMsg = true

entityTypeHead match {
case ConfigType.TOPIC =>
case ConfigType.TOPIC | ConfigType.CLIENT_METRICS | ConfigType.BROKER if useIncrementalAlterConfigs =>
val oldConfig = getResourceConfig(adminClient, entityTypeHead, entityNameHead, includeSynonyms = false, describeAll = false)
.map { entry => (entry.name, entry) }.toMap

@@ -368,14 +369,37 @@
if (invalidConfigs.nonEmpty)
throw new InvalidConfigurationException(s"Invalid config(s): ${invalidConfigs.mkString(",")}")

val configResource = new ConfigResource(ConfigResource.Type.TOPIC, entityNameHead)
val configResourceType = entityTypeHead match {
case ConfigType.TOPIC => ConfigResource.Type.TOPIC
case ConfigType.CLIENT_METRICS => ConfigResource.Type.CLIENT_METRICS
case ConfigType.BROKER => ConfigResource.Type.BROKER
}
val configResource = new ConfigResource(configResourceType, entityNameHead)
val alterOptions = new AlterConfigsOptions().timeoutMs(30000).validateOnly(false)
val alterEntries = (configsToBeAdded.values.map(new AlterConfigOp(_, AlterConfigOp.OpType.SET))
++ configsToBeDeleted.map { k => new AlterConfigOp(new ConfigEntry(k, ""), AlterConfigOp.OpType.DELETE) }
).asJavaCollection
adminClient.incrementalAlterConfigs(Map(configResource -> alterEntries).asJava, alterOptions).all().get(60, TimeUnit.SECONDS)

case ConfigType.BROKER =>
var retryUsingDeprecatedAlterConfigs = false
try {
adminClient.incrementalAlterConfigs(Map(configResource -> alterEntries).asJava, alterOptions).all().get(60, TimeUnit.SECONDS)
} catch {
case e: ExecutionException =>
e.getCause match {
case _: UnsupportedVersionException if entityTypeHead == ConfigType.BROKER =>
retryUsingDeprecatedAlterConfigs = true
logSucceedMsg = false
case _ => throw e
}
case e: Throwable => throw e
}

if (retryUsingDeprecatedAlterConfigs) {
// KIP-1011: Retry using deprecated alterConfigs
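// (brokers without the IncrementalAlterConfigs API, added in Kafka 2.3 by KIP-339, reject it with UnsupportedVersionException)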
alterConfig(adminClient, opts, useIncrementalAlterConfigs = false)
}

case ConfigType.BROKER if !useIncrementalAlterConfigs =>
val oldConfig = getResourceConfig(adminClient, entityTypeHead, entityNameHead, includeSynonyms = false, describeAll = false)
.map { entry => (entry.name, entry) }.toMap

@@ -405,7 +429,7 @@
val alterOptions = new AlterConfigsOptions().timeoutMs(30000).validateOnly(false)
val alterLogLevelEntries = (configsToBeAdded.values.map(new AlterConfigOp(_, AlterConfigOp.OpType.SET))
++ configsToBeDeleted.map { k => new AlterConfigOp(new ConfigEntry(k, ""), AlterConfigOp.OpType.DELETE) }
).asJavaCollection
).asJavaCollection
Contributor comment: This change may not be needed.

adminClient.incrementalAlterConfigs(Map(configResource -> alterLogLevelEntries).asJava, alterOptions).all().get(60, TimeUnit.SECONDS)

case ConfigType.USER | ConfigType.CLIENT =>
@@ -448,28 +472,20 @@
if (unknownConfigs.nonEmpty)
throw new IllegalArgumentException(s"Only connection quota configs can be added for '${ConfigType.IP}' using --bootstrap-server. Unexpected config names: ${unknownConfigs.mkString(",")}")
alterQuotaConfigs(adminClient, entityTypes, entityNames, configsToBeAddedMap, configsToBeDeleted)
case ConfigType.CLIENT_METRICS =>
val oldConfig = getResourceConfig(adminClient, entityTypeHead, entityNameHead, includeSynonyms = false, describeAll = false)
.map { entry => (entry.name, entry) }.toMap

// fail the command if any of the configs to be deleted does not exist
val invalidConfigs = configsToBeDeleted.filterNot(oldConfig.contains)
if (invalidConfigs.nonEmpty)
throw new InvalidConfigurationException(s"Invalid config(s): ${invalidConfigs.mkString(",")}")

val configResource = new ConfigResource(ConfigResource.Type.CLIENT_METRICS, entityNameHead)
val alterOptions = new AlterConfigsOptions().timeoutMs(30000).validateOnly(false)
val alterEntries = (configsToBeAdded.values.map(new AlterConfigOp(_, AlterConfigOp.OpType.SET))
++ configsToBeDeleted.map { k => new AlterConfigOp(new ConfigEntry(k, ""), AlterConfigOp.OpType.DELETE) }
).asJavaCollection
adminClient.incrementalAlterConfigs(Map(configResource -> alterEntries).asJava, alterOptions).all().get(60, TimeUnit.SECONDS)
case _ => throw new IllegalArgumentException(s"Unsupported entity type: $entityTypeHead")
case _ =>
if (useIncrementalAlterConfigs) {
throw new IllegalArgumentException(s"Unsupported entity type: $entityTypeHead")
} else {
throw new IllegalArgumentException(s"Unsupported entity type: $entityTypeHead when updating using deprecated alterConfigs, only ${ConfigType.BROKER} is supported")
}
}

if (entityNameHead.nonEmpty)
println(s"Completed updating config for ${entityTypeHead.dropRight(1)} $entityNameHead.")
else
println(s"Completed updating default config for $entityTypeHead in the cluster.")
if (logSucceedMsg) {
if (entityNameHead.nonEmpty)
println(s"Completed updating config for ${entityTypeHead.dropRight(1)} $entityNameHead.")
else
println(s"Completed updating default config for $entityTypeHead in the cluster.")
}
}

private def alterUserScramCredentialConfigs(adminClient: Admin, user: String, scramConfigsToAddMap: Map[String, ConfigEntry], scramConfigsToDelete: Seq[String]) = {
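For context, the ConfigCommand change above implements the KIP-1011 behavior: kafka-configs.sh (invoked with, for example, --bootstrap-server localhost:9092 --entity-type brokers --entity-name 0 --alter --add-config log.cleaner.threads=2) now tries incrementalAlterConfigs first and falls back to the deprecated alterConfigs only when the broker rejects the newer RPC. Below is a minimal standalone sketch of that detection pattern against the public Admin client; trySetBrokerConfig is a hypothetical helper name, not part of this PR.

import java.util.Collections
import java.util.concurrent.{ExecutionException, TimeUnit}

import org.apache.kafka.clients.admin.{Admin, AlterConfigOp, AlterConfigsOptions, ConfigEntry}
import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.errors.UnsupportedVersionException

object IncrementalAlterConfigsSketch {
  // Hypothetical helper (not in the PR): attempts an incremental SET of a single
  // broker config and returns false when the broker rejects the newer RPC, in
  // which case a caller would retry with the deprecated alterConfigs, as the PR does.
  def trySetBrokerConfig(admin: Admin, brokerId: String, key: String, value: String): Boolean = {
    val resource = new ConfigResource(ConfigResource.Type.BROKER, brokerId)
    val op = new AlterConfigOp(new ConfigEntry(key, value), AlterConfigOp.OpType.SET)
    val ops: java.util.Collection[AlterConfigOp] = Collections.singletonList(op)
    val options = new AlterConfigsOptions().timeoutMs(30000)
    try {
      admin.incrementalAlterConfigs(Collections.singletonMap(resource, ops), options)
        .all().get(60, TimeUnit.SECONDS)
      true
    } catch {
      case e: ExecutionException if e.getCause.isInstanceOf[UnsupportedVersionException] =>
        false // pre-KIP-339 broker: fall back to the deprecated alterConfigs path
    }
  }
}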
1 change: 1 addition & 0 deletions core/src/main/scala/kafka/server/KafkaBroker.scala
@@ -97,6 +97,7 @@ trait KafkaBroker extends Logging {
def credentialProvider: CredentialProvider
def clientToControllerChannelManager: NodeToControllerChannelManager
def tokenCache: DelegationTokenCache
def brokerFeatures: BrokerFeatures

private val metricsGroup = new KafkaMetricsGroup(this.getClass) {
// For backwards compatibility, we need to keep older metrics tied
5 changes: 5 additions & 0 deletions core/src/test/java/kafka/test/ClusterInstance.java
@@ -19,6 +19,8 @@

import kafka.network.SocketServer;
import kafka.server.BrokerFeatures;
import kafka.server.BrokerServer;
import kafka.server.KafkaBroker;
import kafka.test.annotation.ClusterTest;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.common.network.ListenerName;
@@ -28,6 +30,7 @@
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Stream;

public interface ClusterInstance {

@@ -93,6 +96,8 @@ default Optional<ListenerName> controlPlaneListenerName() {
*/
String bootstrapControllers();

Stream<KafkaBroker> brokers();

/**
* A collection of all brokers in the cluster. In ZK-based clusters this will also include the broker which is
* acting as the controller (since ZK controllers serve both broker and controller roles).
@@ -21,6 +21,7 @@
import kafka.server.BrokerFeatures;
import kafka.server.BrokerServer;
import kafka.server.ControllerServer;
import kafka.server.KafkaBroker;
import kafka.test.ClusterConfig;
import kafka.test.ClusterInstance;
import kafka.testkit.KafkaClusterTestKit;
@@ -154,7 +155,7 @@ public String bootstrapControllers() {
@Override
public Collection<SocketServer> brokerSocketServers() {
return brokers()
.map(BrokerServer::socketServer)
.map(KafkaBroker::socketServer)
.collect(Collectors.toList());
}

@@ -178,7 +179,7 @@ public Collection<SocketServer> controllerSocketServers() {
@Override
public SocketServer anyBrokerSocketServer() {
return brokers()
.map(BrokerServer::socketServer)
.map(KafkaBroker::socketServer)
.findFirst()
.orElseThrow(() -> new RuntimeException("No broker SocketServers found"));
}
@@ -195,14 +196,14 @@ public SocketServer anyControllerSocketServer() {
public Map<Integer, BrokerFeatures> brokerFeatures() {
return brokers().collect(Collectors.toMap(
brokerServer -> brokerServer.config().nodeId(),
BrokerServer::brokerFeatures
KafkaBroker::brokerFeatures
));
}

@Override
public String clusterId() {
return controllers().findFirst().map(ControllerServer::clusterId).orElse(
brokers().findFirst().map(BrokerServer::clusterId).orElseThrow(
brokers().findFirst().map(KafkaBroker::clusterId).orElseThrow(
() -> new RuntimeException("No controllers or brokers!"))
);
}
@@ -299,8 +300,8 @@ private BrokerServer findBrokerOrThrow(int brokerId) {
.orElseThrow(() -> new IllegalArgumentException("Unknown brokerId " + brokerId));
}

public Stream<BrokerServer> brokers() {
return clusterReference.get().brokers().values().stream();
public Stream<KafkaBroker> brokers() {
return clusterReference.get().brokers().values().stream().map(b -> b);
}

public Stream<ControllerServer> controllers() {
@@ -20,6 +20,7 @@
import kafka.api.IntegrationTestHarness;
import kafka.network.SocketServer;
import kafka.server.BrokerFeatures;
import kafka.server.KafkaBroker;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import kafka.test.ClusterConfig;
@@ -347,5 +348,9 @@ private KafkaServer findBrokerOrThrow(int brokerId) {
public Stream<KafkaServer> servers() {
return JavaConverters.asJavaCollection(clusterReference.get().servers()).stream();
}

public Stream<KafkaBroker> brokers() {
return JavaConverters.asJavaCollection(clusterReference.get().brokers()).stream();
}
}
}
5 changes: 3 additions & 2 deletions core/src/test/java/kafka/testkit/KafkaClusterTestKit.java
@@ -192,8 +192,9 @@ private KafkaConfig createNodeConfig(TestKitNode node) {
collect(Collectors.joining(","));
props.put(RaftConfig.QUORUM_VOTERS_CONFIG, uninitializedQuorumVotersString);

// reduce log cleaner offset map memory usage
props.put(CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_SIZE_PROP, "2097152");
// Reduce log cleaner offset map memory usage. The dedupe buffer must be greater than
// 1MB per cleaner thread, so set it to 2MB+2 bytes to allow two cleaner threads.
props.put(CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_SIZE_PROP, "2097154");

// Add associated broker node property overrides
if (brokerNode != null) {
Expand Down