From 0c162e1c5913bec909f637607e14a9f1740821f2 Mon Sep 17 00:00:00 2001 From: Manikumar reddy O Date: Thu, 28 Apr 2016 21:05:29 +0530 Subject: [PATCH] KAFKA-2651; Remove deprecated config alteration from TopicCommand --- .../main/scala/kafka/admin/TopicCommand.scala | 40 ++----------------- docs/configuration.html | 10 ++--- docs/ops.html | 13 +++--- docs/upgrade.html | 1 + 4 files changed, 18 insertions(+), 46 deletions(-) diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala index 9f1014f286b7e..ac78cb860ed95 100755 --- a/core/src/main/scala/kafka/admin/TopicCommand.scala +++ b/core/src/main/scala/kafka/admin/TopicCommand.scala @@ -21,7 +21,6 @@ import java.util.Properties import joptsimple._ import kafka.common.{AdminCommandFailedException, Topic, TopicExistsException} import kafka.consumer.{ConsumerConfig, Whitelist} -import kafka.coordinator.GroupCoordinator import kafka.log.{Defaults, LogConfig} import kafka.server.ConfigType import kafka.utils.ZkUtils._ @@ -122,20 +121,6 @@ object TopicCommand extends Logging { opts.options.valueOf(opts.zkConnectOpt))) } topics.foreach { topic => - val configs = AdminUtils.fetchEntityConfig(zkUtils, ConfigType.Topic, topic) - if(opts.options.has(opts.configOpt) || opts.options.has(opts.deleteConfigOpt)) { - println("WARNING: Altering topic configuration from this script has been deprecated and may be removed in future releases.") - println(" Going forward, please use kafka-configs.sh for this functionality") - - val configsToBeAdded = parseTopicConfigsToBeAdded(opts) - val configsToBeDeleted = parseTopicConfigsToBeDeleted(opts) - // compile the final set of configs - configs.putAll(configsToBeAdded) - configsToBeDeleted.foreach(config => configs.remove(config)) - AdminUtils.changeTopicConfig(zkUtils, topic, configs) - println("Updated config for topic \"%s\".".format(topic)) - } - if(opts.options.has(opts.partitionsOpt)) { if (topic == TopicConstants.GROUP_METADATA_TOPIC_NAME) { throw new IllegalArgumentException("The number of partitions for the offsets topic cannot be changed.") @@ -244,18 +229,6 @@ object TopicCommand extends Logging { props } - def parseTopicConfigsToBeDeleted(opts: TopicCommandOptions): Seq[String] = { - if (opts.options.has(opts.deleteConfigOpt)) { - val configsToBeDeleted = opts.options.valuesOf(opts.deleteConfigOpt).map(_.trim()) - val propsToBeDeleted = new Properties - configsToBeDeleted.foreach(propsToBeDeleted.setProperty(_, "")) - LogConfig.validateNames(propsToBeDeleted) - configsToBeDeleted - } - else - Seq.empty - } - def parseReplicaAssignment(replicaAssignmentList: String): Map[Int, List[Int]] = { val partitionList = replicaAssignmentList.split(",") val ret = new mutable.HashMap[Int, List[Int]]() @@ -281,7 +254,7 @@ object TopicCommand extends Logging { val listOpt = parser.accepts("list", "List all available topics.") val createOpt = parser.accepts("create", "Create a new topic.") val deleteOpt = parser.accepts("delete", "Delete a topic") - val alterOpt = parser.accepts("alter", "Alter the number of partitions, replica assignment, and/or configuration for the topic.") + val alterOpt = parser.accepts("alter", "Alter the number of partitions and replica assignment.") val describeOpt = parser.accepts("describe", "List details for the given topics.") val helpOpt = parser.accepts("help", "Print usage information.") val topicOpt = parser.accepts("topic", "The topic to be create, alter or describe. Can also accept a regular " + @@ -290,16 +263,12 @@ object TopicCommand extends Logging { .describedAs("topic") .ofType(classOf[String]) val nl = System.getProperty("line.separator") - val configOpt = parser.accepts("config", "A topic configuration override for the topic being created or altered." + + val configOpt = parser.accepts("config", "A topic configuration override for the topic being created." + "The following is a list of valid configurations: " + nl + LogConfig.configNames.map("\t" + _).mkString(nl) + nl + "See the Kafka documentation for full details on the topic configs.") .withRequiredArg .describedAs("name=value") .ofType(classOf[String]) - val deleteConfigOpt = parser.accepts("delete-config", "A topic configuration override to be removed for an existing topic (see the list of configurations under the --config option).") - .withRequiredArg - .describedAs("name") - .ofType(classOf[String]) val partitionsOpt = parser.accepts("partitions", "The number of partitions for the topic being created or " + "altered (WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected") .withRequiredArg @@ -337,11 +306,10 @@ object TopicCommand extends Logging { CommandLineUtils.checkRequiredArgs(parser, options, topicOpt) // check invalid args - CommandLineUtils.checkInvalidArgs(parser, options, configOpt, allTopicLevelOpts -- Set(alterOpt, createOpt)) - CommandLineUtils.checkInvalidArgs(parser, options, deleteConfigOpt, allTopicLevelOpts -- Set(alterOpt)) + CommandLineUtils.checkInvalidArgs(parser, options, configOpt, allTopicLevelOpts -- Set(createOpt)) CommandLineUtils.checkInvalidArgs(parser, options, partitionsOpt, allTopicLevelOpts -- Set(alterOpt, createOpt)) CommandLineUtils.checkInvalidArgs(parser, options, replicationFactorOpt, allTopicLevelOpts -- Set(createOpt)) - CommandLineUtils.checkInvalidArgs(parser, options, replicaAssignmentOpt, allTopicLevelOpts -- Set(createOpt,alterOpt)) + CommandLineUtils.checkInvalidArgs(parser, options, replicaAssignmentOpt, allTopicLevelOpts -- Set(createOpt, alterOpt)) if(options.has(createOpt)) CommandLineUtils.checkInvalidArgs(parser, options, replicaAssignmentOpt, Set(partitionsOpt, replicationFactorOpt)) CommandLineUtils.checkInvalidArgs(parser, options, reportUnderReplicatedPartitionsOpt, diff --git a/docs/configuration.html b/docs/configuration.html index f9bd1e456db39..b0a04ac2c95a8 100644 --- a/docs/configuration.html +++ b/docs/configuration.html @@ -34,20 +34,20 @@

3.1 Broker Configs

Topic-level configuration -Configurations pertinent to topics have both a global default as well an optional per-topic override. If no per-topic configuration is given the global default is used. The override can be set at topic creation time by giving one or more --config options. This example creates a topic named my-topic with a custom max message size and flush rate: +Configurations pertinent to topics have both a global default as well an optional per-topic override. If no per-topic configuration is given the global default is used. The override can be set at topic creation time by giving one or more --config options of kafka-topics.sh script. This example creates a topic named my-topic with a custom max message size and flush rate:
  > bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic my-topic --partitions 1
         --replication-factor 1 --config max.message.bytes=64000 --config flush.messages=1
 
-Overrides can also be changed or set later using the alter topic command. This example updates the max message size for my-topic: +Overrides can also be changed or set later using kafka-configs.sh script. This example updates the max message size for my-topic:
- > bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic my-topic
-    --config max.message.bytes=128000
+ > bin/kafka-configs.sh  --zookeeper localhost:2181 --entity-type topics --alter --entity-name my-topic
+    --add-config max.message.bytes=128000
 
To remove an override you can do
- > bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic my-topic
+ > bin/kafka-configs.sh  --zookeeper localhost:2181 --entity-type topics --alter --entity-name my-topic
     --delete-config max.message.bytes
 
diff --git a/docs/ops.html b/docs/ops.html index 8b1cc234c6407..ee3dc4bc9eb3f 100644 --- a/docs/ops.html +++ b/docs/ops.html @@ -25,7 +25,7 @@

Adding and removing You have the option of either adding topics manually or having them be created automatically when data is first published to a non-existent topic. If topics are auto-created then you may want to tune the default topic configurations used for auto-created topics.

-Topics are added and modified using the topic tool: +Topics are added and modified using the kafka-topics.sh script (kafka.admin.TopicCommand):

  > bin/kafka-topics.sh --zookeeper zk_host:port/chroot --create --topic my_topic_name
        --partitions 20 --replication-factor 3 --config x=y
@@ -40,7 +40,7 @@ 

Adding and removing

Modifying topics

-You can change the configuration or partitioning of a topic using the same topic tool. +You can change the partitioning of a topic using the same topic tool.

To add partitions you can do

@@ -49,15 +49,18 @@ 

Modifying topi

Be aware that one use case for partitions is to semantically partition data, and adding partitions doesn't change the partitioning of existing data so this may disturb consumers if they rely on that partition. That is if data is partitioned by hash(key) % number_of_partitions then this partitioning will potentially be shuffled by adding partitions but Kafka will not attempt to automatically redistribute data in any way.

+ +You can change the configuration of a topic using the kafka-configs.sh script (kafka.admin.ConfigCommand): To add configs:

- > bin/kafka-topics.sh --zookeeper zk_host:port/chroot --alter --topic my_topic_name --config x=y
+ > bin/kafka-configs.sh  --zookeeper zk_host:port/chroot --entity-type topics --alter --entity-name my_topic_name --add-config x=y
 
To remove a config:
- > bin/kafka-topics.sh --zookeeper zk_host:port/chroot --alter --topic my_topic_name --delete-config x
+ > bin/kafka-configs.sh  --zookeeper zk_host:port/chroot --entity-type topics --alter --entity-name my_topic_name --delete-config x
 
-And finally deleting a topic: + +And finally deleting a topic using kafka-topics.sh script:
  > bin/kafka-topics.sh --zookeeper zk_host:port/chroot --delete --topic my_topic_name
 
diff --git a/docs/upgrade.html b/docs/upgrade.html index b9c4bec012c10..1623624f178f2 100644 --- a/docs/upgrade.html +++ b/docs/upgrade.html @@ -89,6 +89,7 @@
Notable changes in 0.1
  • The new consumer now exposes the configuration parameter exclude.internal.topics to restrict internal topics (such as the consumer offsets topic) from accidentally being included in regular expression subscriptions. By default, it is enabled.
  • The old Scala producer has been deprecated. Users should migrate their code to the Java producer included in the kafka-clients JAR as soon as possible.
  • The new consumer API has been marked stable.
  • +
  • Altering topic configuration functionality from the kafka-topics.sh script (kafka.admin.TopicCommand) has been removed. Please use the kafka-configs.sh script (kafka.admin.ConfigCommand) for this functionality.
  • Upgrading from 0.8.0, 0.8.1.X or 0.8.2.X to 0.9.0.0