Skip to content

Commit

Permalink
KAFKA-12454: Add ERROR logging on kafka-log-dirs when given brokerIds…
Browse files Browse the repository at this point in the history
… do not exist in current kafka cluster (#10304)

When non-existent brokerIds value are given, the kafka-log-dirs tool will have a timeout error:

Exception in thread "main" java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment. Call: describeLogDirs
at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
at kafka.admin.LogDirsCommand$.describe(LogDirsCommand.scala:50)
at kafka.admin.LogDirsCommand$.main(LogDirsCommand.scala:36)
at kafka.admin.LogDirsCommand.main(LogDirsCommand.scala)
Caused by: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment. Call: describeLogDirs

When the brokerId entered by the user does not exist, an error message indicating that the node is not present should be printed.

Reviewers: David Jacot <djacot@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>
  • Loading branch information
wenbingshen committed Mar 18, 2021
1 parent 8ef1619 commit bca29e2
Show file tree
Hide file tree
Showing 2 changed files with 99 additions and 12 deletions.
35 changes: 23 additions & 12 deletions core/src/main/scala/kafka/admin/LogDirsCommand.scala
Expand Up @@ -21,7 +21,7 @@ import java.io.PrintStream
import java.util.Properties

import kafka.utils.{CommandDefaultOptions, CommandLineUtils, Json}
import org.apache.kafka.clients.admin.{Admin, AdminClientConfig, DescribeLogDirsResult, LogDirDescription}
import org.apache.kafka.clients.admin.{Admin, AdminClientConfig, LogDirDescription}
import org.apache.kafka.common.utils.Utils

import scala.jdk.CollectionConverters._
Expand All @@ -39,19 +39,30 @@ object LogDirsCommand {
def describe(args: Array[String], out: PrintStream): Unit = {
val opts = new LogDirsCommandOptions(args)
val adminClient = createAdminClient(opts)
val topicList = opts.options.valueOf(opts.topicListOpt).split(",").filter(!_.isEmpty)
val brokerList = Option(opts.options.valueOf(opts.brokerListOpt)) match {
case Some(brokerListStr) => brokerListStr.split(',').filter(!_.isEmpty).map(_.toInt)
case None => adminClient.describeCluster().nodes().get().asScala.map(_.id()).toArray
}
try {
val topicList = opts.options.valueOf(opts.topicListOpt).split(",").filter(_.nonEmpty)
val clusterBrokers = adminClient.describeCluster().nodes().get().asScala.map(_.id()).toSet
val (existingBrokers, nonExistingBrokers) = Option(opts.options.valueOf(opts.brokerListOpt)) match {
case Some(brokerListStr) =>
val inputBrokers = brokerListStr.split(',').filter(_.nonEmpty).map(_.toInt).toSet
(inputBrokers.intersect(clusterBrokers), inputBrokers.diff(clusterBrokers))
case None => (clusterBrokers, Set.empty)
}

out.println("Querying brokers for log directories information")
val describeLogDirsResult: DescribeLogDirsResult = adminClient.describeLogDirs(brokerList.map(Integer.valueOf).toSeq.asJava)
val logDirInfosByBroker = describeLogDirsResult.allDescriptions.get().asScala.map { case (k, v) => k -> v.asScala }
if (nonExistingBrokers.nonEmpty) {
out.println(s"ERROR: The given brokers do not exist from --broker-list: ${nonExistingBrokers.mkString(",")}." +
s" Current existent brokers: ${clusterBrokers.mkString(",")}")
} else {
out.println("Querying brokers for log directories information")
val describeLogDirsResult = adminClient.describeLogDirs(existingBrokers.map(Integer.valueOf).toSeq.asJava)
val logDirInfosByBroker = describeLogDirsResult.allDescriptions.get().asScala.map { case (k, v) => k -> v.asScala }

out.println(s"Received log directory information from brokers ${brokerList.mkString(",")}")
out.println(formatAsJson(logDirInfosByBroker, topicList.toSet))
adminClient.close()
out.println(s"Received log directory information from brokers ${existingBrokers.mkString(",")}")
out.println(formatAsJson(logDirInfosByBroker, topicList.toSet))
}
} finally {
adminClient.close()
}
}

private def formatAsJson(logDirInfosByBroker: Map[Integer, Map[String, LogDirDescription]], topicSet: Set[String]): String = {
Expand Down
76 changes: 76 additions & 0 deletions core/src/test/scala/unit/kafka/admin/LogDirsCommandTest.scala
@@ -0,0 +1,76 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.admin

import java.io.{ByteArrayOutputStream, PrintStream}
import java.nio.charset.StandardCharsets

import kafka.integration.KafkaServerTestHarness
import kafka.server.KafkaConfig
import kafka.utils.TestUtils
import org.junit.jupiter.api.Assertions.assertTrue
import org.junit.jupiter.api.Test

import scala.collection.Seq

class LogDirsCommandTest extends KafkaServerTestHarness {

def generateConfigs: Seq[KafkaConfig] = {
TestUtils.createBrokerConfigs(1, zkConnect)
.map(KafkaConfig.fromProps)
}

@Test
def checkLogDirsCommandOutput(): Unit = {
val byteArrayOutputStream = new ByteArrayOutputStream
val printStream = new PrintStream(byteArrayOutputStream, false, StandardCharsets.UTF_8.name())
//input exist brokerList
LogDirsCommand.describe(Array("--bootstrap-server", brokerList, "--broker-list", "0", "--describe"), printStream)
val existingBrokersContent = new String(byteArrayOutputStream.toByteArray, StandardCharsets.UTF_8)
val existingBrokersLineIter = existingBrokersContent.split("\n").iterator

assertTrue(existingBrokersLineIter.hasNext)
assertTrue(existingBrokersLineIter.next().contains(s"Querying brokers for log directories information"))

//input nonexistent brokerList
byteArrayOutputStream.reset()
LogDirsCommand.describe(Array("--bootstrap-server", brokerList, "--broker-list", "0,1,2", "--describe"), printStream)
val nonExistingBrokersContent = new String(byteArrayOutputStream.toByteArray, StandardCharsets.UTF_8)
val nonExistingBrokersLineIter = nonExistingBrokersContent.split("\n").iterator

assertTrue(nonExistingBrokersLineIter.hasNext)
assertTrue(nonExistingBrokersLineIter.next().contains(s"ERROR: The given brokers do not exist from --broker-list: 1,2. Current existent brokers: 0"))

//input duplicate ids
byteArrayOutputStream.reset()
LogDirsCommand.describe(Array("--bootstrap-server", brokerList, "--broker-list", "0,0,1,2,2", "--describe"), printStream)
val duplicateBrokersContent = new String(byteArrayOutputStream.toByteArray, StandardCharsets.UTF_8)
val duplicateBrokersLineIter = duplicateBrokersContent.split("\n").iterator

assertTrue(duplicateBrokersLineIter.hasNext)
assertTrue(duplicateBrokersLineIter.next().contains(s"ERROR: The given brokers do not exist from --broker-list: 1,2. Current existent brokers: 0"))

//use all brokerList for current cluster
byteArrayOutputStream.reset()
LogDirsCommand.describe(Array("--bootstrap-server", brokerList, "--describe"), printStream)
val allBrokersContent = new String(byteArrayOutputStream.toByteArray, StandardCharsets.UTF_8)
val allBrokersLineIter = allBrokersContent.split("\n").iterator

assertTrue(allBrokersLineIter.hasNext)
assertTrue(allBrokersLineIter.next().contains(s"Querying brokers for log directories information"))
}
}

0 comments on commit bca29e2

Please sign in to comment.