Skip to content

Commit

Permalink
KAFKA-8670; Fix exception for kafka-topics.sh --describe without --to…
Browse files Browse the repository at this point in the history
…pic mentioned (#7094)

If there are **no topics** in a cluster, kafka-topics.sh --describe without a --topic option should return empty list, not throw an exception.

Reviewers: Jason Gustafson <jason@confluent.io>
  • Loading branch information
Tirtha Chatterjee authored and hachikuji committed Jul 18, 2019
1 parent f899489 commit ab8a7ff
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 12 deletions.
26 changes: 14 additions & 12 deletions core/src/main/scala/kafka/admin/TopicCommand.scala
Expand Up @@ -208,7 +208,7 @@ object TopicCommand extends Logging {
override def alterTopic(opts: TopicCommandOptions): Unit = {
val topic = new CommandTopicPartition(opts)
val topics = getTopics(opts.topic, opts.excludeInternalTopics)
ensureTopicExists(topics)
ensureTopicExists(topics, opts.topic)
val topicsInfo = adminClient.describeTopics(topics.asJavaCollection).values()
adminClient.createPartitions(topics.map {topicName =>
if (topic.hasReplicaAssignment) {
Expand Down Expand Up @@ -267,7 +267,7 @@ object TopicCommand extends Logging {

override def deleteTopic(opts: TopicCommandOptions): Unit = {
val topics = getTopics(opts.topic, opts.excludeInternalTopics)
ensureTopicExists(topics)
ensureTopicExists(topics, opts.topic)
adminClient.deleteTopics(topics.asJavaCollection).all().get()
}

Expand Down Expand Up @@ -317,7 +317,7 @@ object TopicCommand extends Logging {
override def alterTopic(opts: TopicCommandOptions): Unit = {
val topics = getTopics(opts.topic, opts.excludeInternalTopics)
val tp = new CommandTopicPartition(opts)
ensureTopicExists(topics, opts.ifExists)
ensureTopicExists(topics, opts.topic, !opts.ifExists)
val adminZkClient = new AdminZkClient(zkClient)
topics.foreach { topic =>
val configs = adminZkClient.fetchEntityConfig(ConfigType.Topic, topic)
Expand Down Expand Up @@ -354,8 +354,7 @@ object TopicCommand extends Logging {

override def describeTopic(opts: TopicCommandOptions): Unit = {
val topics = getTopics(opts.topic, opts.excludeInternalTopics)
val topicOptWithExits = opts.topic.isDefined && opts.ifExists
ensureTopicExists(topics, topicOptWithExits)
ensureTopicExists(topics, opts.topic, !opts.ifExists)
val liveBrokers = zkClient.getAllBrokersInCluster.map(_.id).toSet
val describeOptions = new DescribeOptions(opts, liveBrokers)
val adminZkClient = new AdminZkClient(zkClient)
Expand Down Expand Up @@ -401,7 +400,7 @@ object TopicCommand extends Logging {

override def deleteTopic(opts: TopicCommandOptions): Unit = {
val topics = getTopics(opts.topic, opts.excludeInternalTopics)
ensureTopicExists(topics, opts.ifExists)
ensureTopicExists(topics, opts.topic, !opts.ifExists)
topics.foreach { topic =>
try {
if (Topic.isInternal(topic)) {
Expand Down Expand Up @@ -433,14 +432,17 @@ object TopicCommand extends Logging {
/**
* ensures topic existence and throws exception if topic doesn't exist
*
* @param opts
* @param topics
* @param topicOptWithExists
* @param foundTopics Topics that were found to match the requested topic name.
* @param requestedTopic Name of the topic that was requested.
* @param requireTopicExists Indicates if the topic needs to exist for the operation to be successful.
* If set to true, the command will throw an exception if the topic with the
* requested name does not exist.
*/
private def ensureTopicExists(topics: Seq[String], topicOptWithExists: Boolean = false) = {
if (topics.isEmpty && !topicOptWithExists) {
private def ensureTopicExists(foundTopics: Seq[String], requestedTopic: Option[String], requireTopicExists: Boolean = true) = {
// If no topic name was mentioned, do not need to throw exception.
if (requestedTopic.isDefined && requireTopicExists && foundTopics.isEmpty) {
// If given topic doesn't exist then throw exception
throw new IllegalArgumentException(s"Topics in [${topics.mkString(",")}] does not exist")
throw new IllegalArgumentException(s"Topic '${requestedTopic.get}' does not exist as expected")
}
}

Expand Down
5 changes: 5 additions & 0 deletions core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
Expand Up @@ -416,6 +416,11 @@ class TopicCommandTest extends ZooKeeperTestHarness with Logging with RackAwareT
topicService.describeTopic(describeOpts)
}

// describe all topics
val describeOptsAllTopics = new TopicCommandOptions(Array())
// should not throw any error
topicService.describeTopic(describeOptsAllTopics)

// describe topic that does not exist with --if-exists
val describeOptsWithExists = new TopicCommandOptions(Array("--topic", testTopicName, "--if-exists"))
// should not throw any error
Expand Down

0 comments on commit ab8a7ff

Please sign in to comment.