From 0d2914d159296d18949fe28dc07441467fa36171 Mon Sep 17 00:00:00 2001 From: Ben Stopford Date: Thu, 22 Oct 2015 13:27:13 +0100 Subject: [PATCH 1/2] KAFKA-2338: add 'force' option to avoid console prompts --- core/src/main/scala/kafka/admin/ConfigCommand.scala | 12 +++++++----- core/src/main/scala/kafka/admin/TopicCommand.scala | 13 +++++++++---- 2 files changed, 16 insertions(+), 9 deletions(-) diff --git a/core/src/main/scala/kafka/admin/ConfigCommand.scala b/core/src/main/scala/kafka/admin/ConfigCommand.scala index 614e3fe3b46c..5fa8b9bf586c 100644 --- a/core/src/main/scala/kafka/admin/ConfigCommand.scala +++ b/core/src/main/scala/kafka/admin/ConfigCommand.scala @@ -69,7 +69,7 @@ object ConfigCommand { val configsToBeDeleted = parseConfigsToBeDeleted(opts) val entityType = opts.options.valueOf(opts.entityType) val entityName = opts.options.valueOf(opts.entityName) - warnOnMaxMessagesChange(configsToBeAdded) + warnOnMaxMessagesChange(configsToBeAdded, opts.options.has(opts.forceOpt)) // compile the final set of configs val configs = AdminUtils.fetchEntityConfig(zkUtils, entityType, entityName) @@ -85,14 +85,15 @@ object ConfigCommand { } } - def warnOnMaxMessagesChange(configs: Properties): Unit = { + def warnOnMaxMessagesChange(configs: Properties, force :Boolean): Unit = { val maxMessageBytes = configs.get(LogConfig.MaxMessageBytesProp) match { case n: String => n.toInt case _ => -1 } if (maxMessageBytes > Defaults.MaxMessageSize){ error(TopicCommand.longMessageSizeWarning(maxMessageBytes)) - TopicCommand.askToProceed + if (!force) + TopicCommand.askToProceed } } @@ -107,14 +108,14 @@ object ConfigCommand { for (entityName <- entityNames) { val configs = AdminUtils.fetchEntityConfig(zkUtils, entityType, entityName) println("Configs for %s:%s are %s" - .format(entityType, entityName, configs.map(kv => kv._1 + "=" + kv._2).mkString(","))) + .format(entityType, entityName, configs.map(kv => kv._1 + "=" + kv._2).mkString(","))) } } private[admin] def parseConfigsToBeAdded(opts: ConfigCommandOptions): Properties = { val configsToBeAdded = opts.options.valuesOf(opts.addConfig).map(_.split("""\s*=\s*""")) require(configsToBeAdded.forall(config => config.length == 2), - "Invalid entity config: all configs to be added must be in the format \"key=val\".") + "Invalid entity config: all configs to be added must be in the format \"key=val\".") val props = new Properties configsToBeAdded.foreach(pair => props.setProperty(pair(0).trim, pair(1).trim)) if (props.containsKey(LogConfig.MessageFormatVersionProp)) { @@ -164,6 +165,7 @@ object ConfigCommand { .ofType(classOf[String]) .withValuesSeparatedBy(',') val helpOpt = parser.accepts("help", "Print usage information.") + val forceOpt = parser.accepts("force", "Suppress console prompts etc") val options = parser.parse(args : _*) val allOpts: Set[OptionSpec[_]] = Set(alterOpt, describeOpt, entityType, entityName, addConfig, deleteConfig, helpOpt) diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala index 9f1014f286b7..1fa65724d639 100755 --- a/core/src/main/scala/kafka/admin/TopicCommand.scala +++ b/core/src/main/scala/kafka/admin/TopicCommand.scala @@ -97,13 +97,13 @@ object TopicCommand extends Logging { try { if (opts.options.has(opts.replicaAssignmentOpt)) { val assignment = parseReplicaAssignment(opts.options.valueOf(opts.replicaAssignmentOpt)) - warnOnMaxMessagesChange(configs, assignment.valuesIterator.next().length) + warnOnMaxMessagesChange(configs, assignment.valuesIterator.next().length, opts.options.has(opts.forceOpt)) AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, assignment, configs, update = false) } else { CommandLineUtils.checkRequiredArgs(opts.parser, opts.options, opts.partitionsOpt, opts.replicationFactorOpt) val partitions = opts.options.valueOf(opts.partitionsOpt).intValue val replicas = opts.options.valueOf(opts.replicationFactorOpt).intValue - warnOnMaxMessagesChange(configs, replicas) + warnOnMaxMessagesChange(configs, replicas, opts.options.has(opts.forceOpt)) val rackAwareMode = if (opts.options.has(opts.disableRackAware)) RackAwareMode.Disabled else RackAwareMode.Enforced AdminUtils.createTopic(zkUtils, topic, partitions, replicas, configs, rackAwareMode) @@ -326,6 +326,9 @@ object TopicCommand extends Logging { "if set when creating topics, the action will only execute if the topic does not already exist") val disableRackAware = parser.accepts("disable-rack-aware", "Disable rack aware replica assignment") + + val forceOpt = parser.accepts("force", "Suppress console prompts etc") + val options = parser.parse(args : _*) val allTopicLevelOpts: Set[OptionSpec[_]] = Set(alterOpt, createOpt, describeOpt, listOpt, deleteOpt) @@ -354,7 +357,7 @@ object TopicCommand extends Logging { CommandLineUtils.checkInvalidArgs(parser, options, ifNotExistsOpt, allTopicLevelOpts -- Set(createOpt)) } } - def warnOnMaxMessagesChange(configs: Properties, replicas: Integer): Unit = { + def warnOnMaxMessagesChange(configs: Properties, replicas: Integer, force: Boolean): Unit = { val maxMessageBytes = configs.get(LogConfig.MaxMessageBytesProp) match { case n: String => n.toInt case _ => -1 @@ -362,7 +365,8 @@ object TopicCommand extends Logging { if (maxMessageBytes > Defaults.MaxMessageSize) if (replicas > 1) { error(longMessageSizeWarning(maxMessageBytes)) - askToProceed + if (!force) + askToProceed } else warn(shortMessageSizeWarning(maxMessageBytes)) @@ -405,3 +409,4 @@ object TopicCommand extends Logging { s"- Default Consumer fetch.message.max.bytes: ${ConsumerConfig.FetchSize}\n\n" } } + From b4c677e146f5704daced421b8112320cf7f9672f Mon Sep 17 00:00:00 2001 From: Ben Stopford Date: Thu, 22 Oct 2015 16:41:03 +0100 Subject: [PATCH 2/2] KAFKA-2338: tidy text and formatting --- core/src/main/scala/kafka/admin/ConfigCommand.scala | 4 ++-- core/src/main/scala/kafka/admin/TopicCommand.scala | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/kafka/admin/ConfigCommand.scala b/core/src/main/scala/kafka/admin/ConfigCommand.scala index 5fa8b9bf586c..eaddd84e538d 100644 --- a/core/src/main/scala/kafka/admin/ConfigCommand.scala +++ b/core/src/main/scala/kafka/admin/ConfigCommand.scala @@ -85,7 +85,7 @@ object ConfigCommand { } } - def warnOnMaxMessagesChange(configs: Properties, force :Boolean): Unit = { + def warnOnMaxMessagesChange(configs: Properties, force: Boolean): Unit = { val maxMessageBytes = configs.get(LogConfig.MaxMessageBytesProp) match { case n: String => n.toInt case _ => -1 @@ -165,7 +165,7 @@ object ConfigCommand { .ofType(classOf[String]) .withValuesSeparatedBy(',') val helpOpt = parser.accepts("help", "Print usage information.") - val forceOpt = parser.accepts("force", "Suppress console prompts etc") + val forceOpt = parser.accepts("force", "Suppress console prompts") val options = parser.parse(args : _*) val allOpts: Set[OptionSpec[_]] = Set(alterOpt, describeOpt, entityType, entityName, addConfig, deleteConfig, helpOpt) diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala index 1fa65724d639..029adea1b58c 100755 --- a/core/src/main/scala/kafka/admin/TopicCommand.scala +++ b/core/src/main/scala/kafka/admin/TopicCommand.scala @@ -327,7 +327,7 @@ object TopicCommand extends Logging { val disableRackAware = parser.accepts("disable-rack-aware", "Disable rack aware replica assignment") - val forceOpt = parser.accepts("force", "Suppress console prompts etc") + val forceOpt = parser.accepts("force", "Suppress console prompts") val options = parser.parse(args : _*)