Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-12454: Add ERROR logging on kafka-log-dirs when given brokerIds do not exist in current kafka cluster #10304

Merged
merged 14 commits into from Mar 18, 2021
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
32 changes: 21 additions & 11 deletions core/src/main/scala/kafka/admin/LogDirsCommand.scala
Expand Up @@ -39,19 +39,29 @@ object LogDirsCommand {
def describe(args: Array[String], out: PrintStream): Unit = {
val opts = new LogDirsCommandOptions(args)
val adminClient = createAdminClient(opts)
val topicList = opts.options.valueOf(opts.topicListOpt).split(",").filter(!_.isEmpty)
val brokerList = Option(opts.options.valueOf(opts.brokerListOpt)) match {
case Some(brokerListStr) => brokerListStr.split(',').filter(!_.isEmpty).map(_.toInt)
case None => adminClient.describeCluster().nodes().get().asScala.map(_.id()).toArray
}
val topicList = opts.options.valueOf(opts.topicListOpt).split(",").filter(_.nonEmpty)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the resource declaration should be followed by try block.

val adminClient = createAdminClient(opts)
try {

}

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good.It has been modified in the latest submission.

try {
val clusterBrokers = adminClient.describeCluster().nodes().get().asScala.map(_.id()).toSet
val (existingBrokers, nonExistingBrokers) = Option(opts.options.valueOf(opts.brokerListOpt)) match {
case Some(brokerListStr) =>
val inputBrokers = brokerListStr.split(',').filter(_.nonEmpty).map(_.toInt).toSet
(inputBrokers, inputBrokers.diff(clusterBrokers))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As the variable is called existingBrokers , we should find out the "true" existent brokers. In short, it should return inputBrokers.intersect(clusterBrokers) rather than inputBrokers

case None => (clusterBrokers, Set.empty)
}

out.println("Querying brokers for log directories information")
val describeLogDirsResult: DescribeLogDirsResult = adminClient.describeLogDirs(brokerList.map(Integer.valueOf).toSeq.asJava)
val logDirInfosByBroker = describeLogDirsResult.allDescriptions.get().asScala.map { case (k, v) => k -> v.asScala }
if (nonExistingBrokers.nonEmpty) {
out.println(s"ERROR: The given node(s) does not exist from broker-list: ${nonExistingBrokers.mkString(",")}. Current cluster exist node(s): ${clusterBrokers.mkString(",")}")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Should we say --broker-list instead of broker-list? Also, should we say broker(s) instead of node(s) to be consistent with the message below?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea.I will act right away.

} else {
out.println("Querying brokers for log directories information")
val describeLogDirsResult: DescribeLogDirsResult = adminClient.describeLogDirs(existingBrokers.map(Integer.valueOf).toSeq.asJava)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: DescribeLogDirsResult can be removed.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I forgot this, I will change it right away

val logDirInfosByBroker = describeLogDirsResult.allDescriptions.get().asScala.map { case (k, v) => k -> v.asScala }

out.println(s"Received log directory information from brokers ${brokerList.mkString(",")}")
out.println(formatAsJson(logDirInfosByBroker, topicList.toSet))
adminClient.close()
out.println(s"Received log directory information from brokers ${existingBrokers.mkString(",")}")
out.println(formatAsJson(logDirInfosByBroker, topicList.toSet))
}
} finally {
adminClient.close()
}
}

private def formatAsJson(logDirInfosByBroker: Map[Integer, Map[String, LogDirDescription]], topicSet: Set[String]): String = {
Expand Down
52 changes: 52 additions & 0 deletions core/src/test/scala/unit/kafka/admin/LogDirsCommandTest.scala
@@ -0,0 +1,52 @@
package unit.kafka.admin
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We must add the licence header here. You can copy it from another file.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, after checking the compilation report, I have realized this problem and I have made changes.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the package name should be kafka.admin rather than package unit.kafka.admin


import java.io.{ByteArrayOutputStream, PrintStream}
import java.nio.charset.StandardCharsets

import kafka.admin.LogDirsCommand
import kafka.integration.KafkaServerTestHarness
import kafka.server.KafkaConfig
import kafka.utils.TestUtils
import org.junit.jupiter.api.Assertions.assertTrue
import org.junit.jupiter.api.Test

import scala.collection.Seq

class LogDirsCommandTest extends KafkaServerTestHarness {

def generateConfigs: Seq[KafkaConfig] = {
TestUtils.createBrokerConfigs(1, zkConnect)
.map(KafkaConfig.fromProps)
}

@Test
def checkLogDirsCommandOutput(): Unit = {
val byteArrayOutputStream = new ByteArrayOutputStream
val printStream = new PrintStream(byteArrayOutputStream, false, StandardCharsets.UTF_8.name())
//input exist brokerList
LogDirsCommand.describe(Array("--bootstrap-server", brokerList, "--broker-list", "0", "--describe"), printStream)
val existBrokersContent = new String(byteArrayOutputStream.toByteArray, StandardCharsets.UTF_8)
val existBrokersLineIter = existBrokersContent.split("\n").iterator

assertTrue(existBrokersLineIter.hasNext)
assertTrue(existBrokersLineIter.next().contains(s"Querying brokers for log directories information"))

//input nonExist brokerList
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how about using nonexistent instead of nonExist?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It has been modified.

byteArrayOutputStream.reset()
LogDirsCommand.describe(Array("--bootstrap-server", brokerList, "--broker-list", "0,1,2", "--describe"), printStream)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you input duplicate ids and the check the output does not include duplicates?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@chia7712 I have added the duplicate ids test and it passed.

val nonExistBrokersContent = new String(byteArrayOutputStream.toByteArray, StandardCharsets.UTF_8)
val nonExistBrokersLineIter = nonExistBrokersContent.split("\n").iterator

assertTrue(nonExistBrokersLineIter.hasNext)
assertTrue(nonExistBrokersLineIter.next().contains(s"ERROR: The given node(s) does not exist from broker-list: 1,2. Current cluster exist node(s): 0"))

//use all brokerList for current cluster
byteArrayOutputStream.reset()
LogDirsCommand.describe(Array("--bootstrap-server", brokerList, "--describe"), printStream)
val allBrokersContent = new String(byteArrayOutputStream.toByteArray, StandardCharsets.UTF_8)
val allBrokersLineIter = allBrokersContent.split("\n").iterator

assertTrue(allBrokersLineIter.hasNext)
assertTrue(allBrokersLineIter.next().contains(s"Querying brokers for log directories information"))
}
}