
KAFKA-6884: Consumer group command should use new admin client #5032

Conversation

@asasvari (Contributor) commented May 17, 2018


  • ConsumerGroupCommand is updated to use the new AdminClient.

Testing strategy:

  • Executed existing unit tests: DescribeConsumerGroupTest, DeleteConsumerGroupTest, DeleteConsumerGroupsTest, ResetConsumerGroupOffsetTest, ListConsumerGroupTest
  • Started a kafka-console-consumer and manually verified bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group 1

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@asasvari (Contributor Author)

retest this please

@asasvari asasvari force-pushed the KAFKA-6884_ConsumerGroupCommand_should_use_new_AdminClient branch from a601f0f to 106f122 Compare May 22, 2018 13:45
@asasvari (Contributor Author)

retest this please

@hachikuji hachikuji self-assigned this May 24, 2018
@asasvari (Contributor Author)

@viktorsomogyi can you take a look? In the meantime, I am trying to fix the failing tests.

@asasvari asasvari force-pushed the KAFKA-6884_ConsumerGroupCommand_should_use_new_AdminClient branch from 106f122 to e308e9d Compare May 24, 2018 16:07
@vahidhashemian (Contributor) left a comment:

Thanks for the PR. I did a quick look and left some comments.

val (partitionsToResetWithCommittedOffset, partitionsToResetWithoutCommittedOffset) =
partitionsToReset.partition(currentCommittedOffsets.keySet.contains(_))

val preparedOffsetsForPartitionsWithCommittedOffset = partitionsToResetWithCommittedOffset.map { topicPartition =>
(topicPartition, new OffsetAndMetadata(currentCommittedOffsets.get(topicPartition) match {
case Some(offset) => offset
case offset => offset.offset()
Contributor:

This makes the next check (case _) unreachable. Perhaps use case offset if offset != null or refactor the block to better fit the Java map get semantics.
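The refactoring the reviewer suggests can be sketched as follows. This is an illustrative sketch only, with stand-in names rather than the PR's actual code: when a lookup follows Java Map semantics and returns null for a missing key, wrapping it in Option makes every match case reachable and the null handling explicit.

```scala
object NullSafeGetSketch {
  // Wrap the Java-style lookup (null for a missing key) in Option,
  // so callers never pattern-match on an unreachable case.
  def committedOffset(offsets: java.util.Map[String, java.lang.Long],
                      partition: String): Option[Long] =
    Option(offsets.get(partition)).map(_.longValue()) // None when key is absent

  def main(args: Array[String]): Unit = {
    val offsets = new java.util.HashMap[String, java.lang.Long]()
    offsets.put("topic-0", 42L)
    println(committedOffset(offsets, "topic-0")) // Some(42)
    println(committedOffset(offsets, "topic-9")) // None
  }
}
```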

import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer, OffsetAndMetadata}
import org.apache.kafka.common.errors.BrokerNotAvailableException
import org.apache.kafka.common.{KafkaException, Node, TopicPartition}
import org.apache.kafka.common.errors.{BrokerNotAvailableException, CoordinatorNotAvailableException, TimeoutException}
Contributor:

Unused import TimeoutException?

import org.apache.kafka.common.errors.BrokerNotAvailableException
import org.apache.kafka.common.{KafkaException, Node, TopicPartition}
import org.apache.kafka.common.errors.{BrokerNotAvailableException, CoordinatorNotAvailableException, TimeoutException}
import org.apache.kafka.common.{KafkaException, KafkaFuture, Node, TopicPartition}
Contributor:

Unused import KafkaFuture?

@@ -267,6 +270,7 @@ object ConsumerGroupCommand extends Logging {
val offsetsOptPresent = opts.options.has(opts.offsetsOpt)
val subActions = Seq(membersOptPresent, offsetsOptPresent, stateOptPresent).count(_ == true)


Contributor:

nit: extra blank line

adminClient.listAllConsumerGroupsFlattened().map(_.groupId)
val result = adminClient.listConsumerGroups()
val listings = result.valid().get()
listings.map(_.groupId()).toList
Contributor:

nit: Any reason for not putting all this in one line?
adminClient.listConsumerGroups().valid().get().map(_.groupId()).toList

@@ -640,10 +659,10 @@ object ConsumerGroupCommand extends Logging {
if (consumer != null) consumer.close()
}

private def createAdminClient(): AdminClient = {
private def createAdminClient(): org.apache.kafka.clients.admin.AdminClient = {
Contributor:

nit: You could use an import like below
import org.apache.kafka.clients.admin.{AdminClient => NewAdminClient}
and use only NewAdminClient here (i.e. private def createAdminClient(): NewAdminClient).

Also few lines below (NewAdminClient.create(props))

case currentState =>
printError(s"Assignments can only be reset if the group '$groupId' is inactive, but the current state is $currentState.")
val t = opts.options.valueOf(opts.timeoutMsOpt).intValue()
val opt = new DescribeConsumerGroupsOptions().timeoutMs(t)
Contributor:

nit: Maybe a one-liner can for these two lines?
val opt = new DescribeConsumerGroupsOptions().timeoutMs(opts.options.valueOf(opts.timeoutMsOpt).intValue())

Contributor Author:

In my opinion it is more readable and easier to debug this way.
Extracted to a new function (it is used multiple times).

@@ -789,13 +820,13 @@ object ConsumerGroupCommand extends Logging {
case (topicPartition, newOffset) => (topicPartition, new OffsetAndMetadata(newOffset))
}
} else if (opts.options.has(opts.resetToCurrentOpt)) {
val currentCommittedOffsets = adminClient.listGroupOffsets(groupId)
val currentCommittedOffsets = adminClient.listConsumerGroupOffsets(groupId).partitionsToOffsetAndMetadata().get()
Contributor:

nit: a few blocks above the same thing has been done in two lines, i.e.

val consumerGroupOffsets = adminClient.listConsumerGroupOffsets(groupId)
val currentCommittedOffsets = consumerGroupOffsets.partitionsToOffsetAndMetadata().get()

They should ideally match.

@ijuma (Contributor) commented May 26, 2018

Shall we take the chance and remove the methods from the internal Scala AdminClient?

@asasvari asasvari force-pushed the KAFKA-6884_ConsumerGroupCommand_should_use_new_AdminClient branch from e308e9d to b447bee Compare May 28, 2018 08:01
@asasvari (Contributor Author)

@ijuma I checked that in order to safely delete the methods, we need to adjust other tests, such as LegacyAdminClientTest, too. I would rather remove those methods from the internal Scala AdminClient in a follow-up JIRA. Doing so would keep this PR relatively small and easy to review.

import org.I0Itec.zkclient.exception.ZkNoNodeException
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.clients.admin.{AdminClient => NewAdminClient}
Contributor:

@asasvari Other commands import o.a.k.clients.admin.AdminClient as JAdminClient. I believe being consistent in naming helps others understand and maintain this piece of code.

case e : ExecutionException if e.getCause.isInstanceOf[KafkaException] =>
printError(s"listGroup failed. Error message: " + e.getMessage)
if (e.getCause != null) {
throw new KafkaException(e.getCause.getCause.getMessage, e.getCause.getCause) // authentication exception
Contributor:

Are we keeping backward compatibility here by throwing the cause of cause? Is it always an AuthenticationException?

import scala.collection.JavaConverters._
import scala.collection.{Seq, Set, mutable}
import scala.concurrent.ExecutionException
import scala.util.{Failure, Success, Try}

object ConsumerGroupCommand extends Logging {
Contributor:

Would you please add tests that list, describe, etc. work with an empty string as the group id? Currently it is possible to use an empty string as a group id with KafkaConsumer.assign() (but not with KafkaConsumer.subscribe()).

Contributor Author:

  • According to GroupCoordinator, an empty group id is invalid.
  • Anyway, I tried to create an addConsumerGroupExecutor(numConsumers = 1, group = "") in DescribeConsumerGroupTest; the join fails with an INVALID_GROUP_ID error:
[2018-05-29 15:35:48,754] ERROR [Consumer clientId=consumer-1, groupId=] Attempt to join group failed due to fatal error: The configured groupId is invalid (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:538)

Can you give me some hints @viktorsomogyi ?

val props = if (opts.options.has(opts.commandConfigOpt)) Utils.loadProps(opts.options.valueOf(opts.commandConfigOpt)) else new Properties()
props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, opts.options.valueOf(opts.bootstrapServerOpt))
AdminClient.create(props)
org.apache.kafka.clients.admin.AdminClient.create(props)
Contributor:

NewAdminClient.create(props)?

@asasvari asasvari force-pushed the KAFKA-6884_ConsumerGroupCommand_should_use_new_AdminClient branch from b447bee to cc552e0 Compare May 28, 2018 14:18
@asasvari (Contributor Author)

@hachikuji I rebased the patch onto the latest commit b7513214860008070568eae2f28b7eb581166d72. I can see there is a commit "MINOR: Remove MaxPermSize from gradle.properties" (7f19df29ac60ec9aef1dbd14704a2dd045b1cfbf) that affects my patch: timeout-related tests such as testDescribeGroupOffsetsWithShortInitializationTimeout in DescribeConsumerGroupTest now fail with "The consumer group command should fail due to low initialization timeout". It looks like the retry backoff changes "disable" the --timeout argument. I verified that if I revert commit 7f19df2, the tests pass. Can you give me some hints on why this could happen?

@ijuma (Contributor) commented May 29, 2018

@asasvari, it's fine to remove the methods in a separate PR.

@ijuma (Contributor) commented May 29, 2018

Also, to clarify, @asasvari is stating that the retry backoff changes (7f19df2) are causing some of the tests to fail, not the MaxPermSize changes.

@asasvari asasvari force-pushed the KAFKA-6884_ConsumerGroupCommand_should_use_new_AdminClient branch from cc552e0 to 025e753 Compare May 31, 2018 12:55
@asasvari (Contributor Author) commented Jun 1, 2018

retest this please

@asasvari asasvari force-pushed the KAFKA-6884_ConsumerGroupCommand_should_use_new_AdminClient branch from 025e753 to e83f906 Compare June 1, 2018 15:31
@asasvari (Contributor Author) commented Jun 4, 2018

retest this please

@asasvari asasvari force-pushed the KAFKA-6884_ConsumerGroupCommand_should_use_new_AdminClient branch from e83f906 to d5ebc65 Compare June 4, 2018 14:42
@asasvari (Contributor Author) commented Jun 4, 2018

@ijuma, @hachikuji I addressed most of the review comments above. Can you or a committer take a look at the changes?

The JDK 8 tests have timed out; it does not look related to my changes (ConsumerGroupCommand tests passed).

16:17:32 kafka.api.PlaintextConsumerTest > testLowMaxFetchSizeForRequestAndPartition STARTED
17:43:26 Build timed out (after 180 minutes). Marking the build as aborted.
17:43:27 Build was aborted
17:43:27 [FINDBUGS] Skipping publisher since build result is ABORTED

@asasvari (Contributor Author) commented Jun 6, 2018

retest this please

@vahidhashemian (Contributor) left a comment:

I left some minor comments inline.

import org.I0Itec.zkclient.exception.ZkNoNodeException
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.clients.admin.{AbstractOptions, DeleteConsumerGroupsOptions, DescribeConsumerGroupsOptions, ListConsumerGroupOffsetsOptions, ListConsumerGroupsOptions, AdminClient => JAdminClient}
Contributor:

Quoting @ijuma from another PR: "The Scala AdminClient is going away and there is no need to have this rename in classes that don't need it."
You could do import org.apache.kafka.clients.admin and use admin.AdminClient to avoid the name clash.

@@ -42,8 +42,11 @@ import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.kafka.common.utils.Utils

import scala.collection.JavaConversions._
Contributor:

JavaConversions is deprecated as of Scala 2.12. You should use JavaConverters instead.
In a recent PR all usages of JavaConversions were removed.
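A small sketch of what the reviewer asks for, explicit JavaConverters decorators (asScala / asJava) instead of the deprecated implicit JavaConversions. The group names below are made-up examples:

```scala
import scala.collection.JavaConverters._

object ConvertersSketch {
  def main(args: Array[String]): Unit = {
    val javaList = java.util.Arrays.asList("group-1", "group-2")
    val scalaList = javaList.asScala.toList // explicit Java -> Scala conversion
    println(scalaList)                      // List(group-1, group-2)

    val backToJava = scalaList.asJava       // explicit Scala -> Java conversion
    println(backToJava.size())              // 2
  }
}
```

Unlike JavaConversions, nothing converts silently; each crossing of the Java/Scala boundary is visible at the call site.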

)
val groupId = opts.options.valueOf(opts.groupOpt)
val consumerGroups = adminClient.describeConsumerGroups(
util.Arrays.asList(groupId),
Contributor:

nit: This can be done as List(groupId).asJava using JavaConverters (no need to import java.util). There are a couple of other instances of this.

}

(Some(consumerGroupSummary.state), assignments)
val assignments = rowsWithConsumer ++ rowsWithoutConsumer
(Some(state.toString), Some(assignments))
Contributor:

nit: a one liner works too: (Some(state.toString), Some(rowsWithConsumer ++ rowsWithoutConsumer))

val consumerGroups = adminClient.describeConsumerGroups(
util.Arrays.asList(groupId),
withTimeoutMs(new DescribeConsumerGroupsOptions))
.describedGroups()
Contributor:

personal opinion: this is more readable written like this

val consumerGroups = adminClient.describeConsumerGroups(
  util.Arrays.asList(groupId),
  withTimeoutMs(new DescribeConsumerGroupsOptions)
).describedGroups()

Noticed this in a few places.

val (partitionsToResetWithCommittedOffset, partitionsToResetWithoutCommittedOffset) =
partitionsToReset.partition(currentCommittedOffsets.keySet.contains(_))

val preparedOffsetsForPartitionsWithCommittedOffset = partitionsToResetWithCommittedOffset.map { topicPartition =>
(topicPartition, new OffsetAndMetadata(currentCommittedOffsets.get(topicPartition) match {
case Some(offset) => offset
case offset if offset != null=> offset.offset()
Contributor:

nit: space before =>

@@ -19,10 +19,11 @@ package kafka.admin
import kafka.utils.TestUtils
import org.apache.kafka.clients.consumer.RoundRobinAssignor
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.TimeoutException
import org.apache.kafka.common.errors.{CoordinatorNotAvailableException, TimeoutException}
Contributor:

nit: unused CoordinatorNotAvailableException?

@@ -12,7 +12,7 @@
*/
package kafka.api

import java.io.FileOutputStream
import java.io.{BufferedWriter, FileOutputStream, FileWriter, PrintWriter}
Contributor:

nit: unused added imports

@asasvari asasvari force-pushed the KAFKA-6884_ConsumerGroupCommand_should_use_new_AdminClient branch from d5ebc65 to cd8d0fe Compare June 8, 2018 07:15
@asasvari (Contributor Author) commented Jun 8, 2018

@vahidhashemian thanks for the review!

@asasvari (Contributor Author) commented Jun 8, 2018

retest this please

@asasvari asasvari force-pushed the KAFKA-6884_ConsumerGroupCommand_should_use_new_AdminClient branch from c9dfdb9 to 253051a Compare June 12, 2018 10:00
@hachikuji left a comment:

Thanks for the patch! Left a few small comments.


try {
val listings = result.all().get().asScala
listings.map(_.groupId()).toList


nit: no need for parenthesis for getters like groupId

listings.map(_.groupId()).toList
}
catch {
case e : ExecutionException if e.getCause.isInstanceOf[KafkaException] =>


It's a little unclear why we have the custom exception handling here. Previously we just raised all errors? Should we raise the cause in all cases?

Contributor Author:

The new KafkaAdminClient works a little differently from the old one. I have seen that certain kinds of exceptions (e.g. SaslAuthenticationException) are now nested, and I wanted to keep similar behaviour.

"--describe",
"--group", "test.group",
"--command-config", propsFile.getAbsolutePath)
"--describe",


nit: was this intentional?

Map.empty
}
} catch {
case e : ExecutionException if e.getCause.isInstanceOf[CoordinatorNotAvailableException] =>


Again it's a little unclear why COORDINATOR_NOT_AVAILABLE is handled specially. Note that with #5278, the admin client should handle this error internally.

@@ -456,6 +519,13 @@ object ConsumerGroupCommand extends Logging {
}
}

private def getPartitionsToOffsetAndMetadata(groupId: String) = {


Maybe a simpler name is getCommittedOffsets?

val rowsWithConsumer = if (offsetsAndMetadataByTopic.isEmpty) {
List[PartitionAssignmentState]()
} else {
consumerGroup.members().asScala.filter(!_.assignment().topicPartitions().isEmpty).toList.sortWith(


nit: unneeded parenthesis for getters

val result = deletedGroups.mapValues { f =>
Try(f.get()) match {
case _: Success[_] => Errors.NONE
case Failure(ee: ExecutionException) if ee.getMessage.contains("TimeoutException") => Errors.COORDINATOR_NOT_AVAILABLE


This is a bit weird. Can you explain the reasoning?

Contributor Author:

I tried to follow the logic of the previous version of this method:

val successfullyDeleted = result.filter {
case (_, error) => error == Errors.NONE
}.keySet
if (successfullyDeleted.size == result.size)
println(s"Deletion of requested consumer groups (${successfullyDeleted.mkString("'", "', '", "'")}) was successful.")
else {
printError("Deletion of some consumer groups failed:")
result.foreach {
case (group, error) if error != Errors.NONE => println(s"* Group '$group' could not be deleted due to: ${error.toString}")

KafkaAdminClient now returns futures as part of DeleteConsumerGroupsResult when you try to delete with deleteConsumerGroups(). Please let me know if you have an idea how to simplify it.


I think the part that is unclear to me is why we are checking for TimeoutException in the exception message. It seems like a brittle solution. Can we make it simple and just treat all exceptions as failures?
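The simplification the reviewer suggests can be sketched like this. It is a hedged sketch with stand-in types (not the PR's DeleteConsumerGroupsResult handling): classify each group's result with Try and treat any exception uniformly as a failure, rather than string-matching on "TimeoutException".

```scala
import scala.util.{Failure, Success, Try}

object DeleteResultSketch {
  sealed trait Outcome
  case object Deleted extends Outcome
  final case class FailedToDelete(cause: Throwable) extends Outcome

  // Any Failure becomes FailedToDelete, regardless of the exception message.
  def classify(results: Map[String, Try[Unit]]): Map[String, Outcome] =
    results.map {
      case (group, Success(_)) => group -> Deleted
      case (group, Failure(t)) => group -> FailedToDelete(t) // no message inspection
    }

  def main(args: Array[String]): Unit = {
    val outcomes = classify(Map(
      "group-a" -> Success(()),
      "group-b" -> Failure(new RuntimeException("timed out"))))
    println(outcomes("group-a")) // Deleted
  }
}
```

The design point is that the classification depends only on whether the future failed, not on the wording of the error, which is the brittleness the reviewer is objecting to.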

if (verbose) consumer.assignment else List())
})
)
private[admin] def collectGroupMembers(verbose: Boolean): (Option[String], Option[Seq[MemberAssignmentState]]) = {


Seems misaligned? Same for collectGroupState.



import kafka.utils._
import kafka.utils.Implicits._
import org.I0Itec.zkclient.exception.ZkNoNodeException


Did we need this import? There are a few more unneeded imports to clean up as well.

@vahidhashemian (Contributor) left a comment:

Thank you for the updates. I left a few minor comments.

import javax.xml.datatype.DatatypeFactory
import joptsimple.{OptionParser, OptionSpec}
import kafka.common.{OffsetMetadataAndError, TopicAndPartition}
Contributor:

unused imports? Also an extra blank line below



import kafka.utils._
import kafka.utils.Implicits._
import org.I0Itec.zkclient.exception.ZkNoNodeException
Contributor:

unused import?

import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer, OffsetAndMetadata}
import org.apache.kafka.common.errors.{BrokerNotAvailableException, CoordinatorNotAvailableException, SaslAuthenticationException}
Contributor:

BrokerNotAvailableException import does not seem to be used.

import scala.collection.JavaConverters._
import scala.collection.{Seq, Set, mutable}
Contributor:

unused/duplicate imports? I think this line can be safely removed.

printError(s"listGroup failed. Error message: " + e.getMessage)
if (e.getCause.getCause.isInstanceOf[SaslAuthenticationException]) {
throw e.getCause.getCause
}
Contributor:

I'm seeing duplicate / inconsistent error reporting, for example when the broker cannot be found:

Error: listGroup failed. Error message: org.apache.kafka.common.KafkaException: Failed to find brokers to send ListGroups
Error: Executing consumer group command failed due to Failed to find brokers to send ListGroups

or

Error: listGroup failed. Error message: org.apache.kafka.common.errors.ClusterAuthorizationException: Error listing groups on devstack1:9092 (id: 0 rack: null)
Error: Executing consumer group command failed due to Error listing groups on devstack1:9092 (id: 0 rack: null)

I think this can be improved.

Contributor Author:

  • I removed extra logging call.
  • Regarding the consistency of messages: the returned error depends on the broker's reply to the listGroups operation sent by the new KafkaAdminClient ("Failed to find brokers" / "Error listing groups on"). So it is not 100% deterministic; what kind of error message is returned by a broker depends on multiple factors. Can you suggest how to handle the specific examples / different exceptions?
  • I agree that "Executing consumer group command failed due to Failed to find " sounds a bit weird. I will replace it with something like: Executing consumer group command failed with error: "<ERROR_MESSAGE>"

Map(topicPartition -> Some(offset.offset())),
Some(MISSING_COLUMN_VALUE),
Some(MISSING_COLUMN_VALUE),
Some(MISSING_COLUMN_VALUE))
Contributor:

Incorrect indentation on these three lines?

@asasvari asasvari force-pushed the KAFKA-6884_ConsumerGroupCommand_should_use_new_AdminClient branch from b955820 to 77fe7a6 Compare June 26, 2018 09:44
@hachikuji left a comment:

Thanks for the updates. Left a few more comments.

listings.map(_.groupId).toList
}
catch {
case e : ExecutionException if e.getCause.isInstanceOf[KafkaException] =>


I'm not sure how precisely we need to maintain the error reporting. I'm wondering if we can just raise e.getCause for all instances of ExecutionException.

Contributor Author:

In this "special" case, it is not enough to raise e.getCause.

When there is a SASL authentication failure, listings.map(_.groupId).toList throws:

concurrent.ExecutionException
--> org.apache.kafka.common.KafkaException: Failed to find brokers to send ListGroups
    --> org.apache.kafka.common.errors.SaslAuthenticationException: Authentication failed due to invalid credentials with SASL mechanism SCRAM-SHA-256

KafkaAdminClient throws an ExecutionException when we try to get all results of the returned futures of listGroups (called from testConsumerGroupServiceWithAuthenticationFailure). The cause of the ExecutionException is a org.apache.kafka.common.KafkaException "Failed to find brokers to send ListGroups". Its cause is the SaslAuthenticationException if there is an authentication failure.
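The nesting described above can be reproduced in miniature. This sketch uses stand-in exception classes, not Kafka's real ones, purely to show why e.getCause alone only reaches the middle wrapper while the actionable authentication failure sits one level deeper:

```scala
import java.util.concurrent.ExecutionException

object ExceptionChainSketch {
  class SaslAuthenticationException(msg: String) extends RuntimeException(msg)
  class KafkaException(msg: String, cause: Throwable) extends RuntimeException(msg, cause)

  def main(args: Array[String]): Unit = {
    // Build the three-level chain: ExecutionException -> KafkaException -> auth error.
    val auth = new SaslAuthenticationException("Authentication failed due to invalid credentials")
    val wrapper = new KafkaException("Failed to find brokers to send ListGroups", auth)
    val e = new ExecutionException(wrapper)

    println(e.getCause.getMessage)          // the KafkaException wrapper message
    println(e.getCause.getCause.getMessage) // the underlying authentication failure
  }
}
```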

@@ -126,7 +128,21 @@ object ConsumerGroupCommand extends Logging {
private var consumer: KafkaConsumer[String, String] = _

def listGroups(): List[String] = {
adminClient.listAllConsumerGroupsFlattened().map(_.groupId)
val result = adminClient.listConsumerGroups(
withTimeoutMs(new ListConsumerGroupsOptions()))


nit: unneeded parenthesis for empty constructor

}
} catch {
case _: Throwable =>
printError(s"Coordinator not available for the group '$groupId'.")


This is definitely one possible case, but are we sure exceptions can't be thrown in other cases? Also, we probably should pass the exception to printError rather than swallowing it.

collectConsumerAssignment(group, Some(consumerGroupSummary.coordinator), Seq(topicPartition),
Map(topicPartition -> Some(offset)), Some(MISSING_COLUMN_VALUE),
Some(MISSING_COLUMN_VALUE), Some(MISSING_COLUMN_VALUE))
def collectGroupOffsets(): (Option[String], Option[Seq[PartitionAssignmentState]]) = {


nit: probably useful to have a docstring describing the return value.


val state = consumerGroup.state()
val committedOffsets = getCommittedOffsets(groupId).asScala.toMap
var assignedTopicPartitions = Array[TopicPartition]()


Maybe better to use a ListBuffer or something like that.

withTimeoutMs(new DescribeConsumerGroupsOptions())
).describedGroups().get(groupId).get()

val state = consumerGroup.state()


nit: unneeded parenthesis. there are a few of these

val rowsWithConsumer = if (committedOffsets.isEmpty) {
List[PartitionAssignmentState]()
} else {
consumerGroup.members.asScala.filter(!_.assignment.topicPartitions.isEmpty).toList.sortWith(


nit: the alignment is a little weird. How about this?

        consumerGroup.members.asScala.filter(!_.assignment.topicPartitions.isEmpty).toSeq
          .sortWith(_.assignment.topicPartitions.size > _.assignment.topicPartitions.size)
          .flatMap { consumerSummary =>
            val topicPartitions = consumerSummary.assignment.topicPartitions.asScala 
            ...

printError(s"Assignments can only be reset if the group '$groupId' is inactive, but the current state is $currentState.")
Map.empty
}
private def withTimeoutMs [T <: AbstractOptions[T] ] (options : T) = {


nit: unneeded space after AbstractOptions[T]
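For context on the helper being nitpicked, here is an illustrative sketch with stand-in classes that mimic the admin client's fluent options types: the F-bounded type parameter lets one method set the timeout on any options subclass and return it with its concrete type preserved.

```scala
object TimeoutHelperSketch {
  // F-bounded base: timeoutMs returns the concrete subtype, not the base class.
  abstract class AbstractOptions[T <: AbstractOptions[T]] { self: T =>
    private var timeout: Option[Int] = None
    def timeoutMs(ms: Int): T = { timeout = Some(ms); self }
    def timeoutValue: Option[Int] = timeout
  }
  final class DescribeGroupsOptions extends AbstractOptions[DescribeGroupsOptions]

  // One generic helper works for every options type.
  def withTimeoutMs[T <: AbstractOptions[T]](options: T, ms: Int): T =
    options.timeoutMs(ms)

  def main(args: Array[String]): Unit = {
    val opts = withTimeoutMs(new DescribeGroupsOptions, 5000)
    println(opts.timeoutValue) // Some(5000)
  }
}
```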

val currentOffset = currentCommittedOffsets.getOrElse(topicPartition,
throw new IllegalArgumentException(s"Cannot shift offset for partition $topicPartition since there is no current committed offset"))
val currentOffset = currentCommittedOffsets.asScala.getOrElse(topicPartition,
throw new IllegalArgumentException(s"Cannot shift offset for partition $topicPartition since there is no current committed offset")).offset()


nit: it's a little clearer to use map before the call to getOrElse.
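A sketch of that nit with stand-in types: projecting with map before getOrElse keeps the happy path first and the failure fallback last.

```scala
object MapBeforeGetOrElseSketch {
  final case class OffsetAndMetadata(offset: Long)

  def currentOffset(committed: Map[String, OffsetAndMetadata], tp: String): Long =
    committed.get(tp)
      .map(_.offset) // project the committed offset first
      .getOrElse(throw new IllegalArgumentException(
        s"Cannot shift offset for partition $tp since there is no current committed offset"))

  def main(args: Array[String]): Unit = {
    println(currentOffset(Map("topic-0" -> OffsetAndMetadata(7L)), "topic-0")) // 7
  }
}
```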


@asasvari asasvari force-pushed the KAFKA-6884_ConsumerGroupCommand_should_use_new_AdminClient branch from 7949612 to 6f42eda Compare July 4, 2018 10:25
@asasvari
Contributor Author

asasvari commented Jul 9, 2018

retest this please

@asasvari
Contributor Author

retest this please

@asasvari
Contributor Author

Jenkins jobs failed without an obvious reason. See:

I could not reproduce the issue surfaced in previous runs. There, the problem was that a CoordinatorLoadInProgressException was thrown by the admin client when listConsumerGroups was called. I added a retry mechanism to waitUntilTrue in TestUtils, but the test failed again. In the latest PR I added more information via logging to see where the exception originated (was the RetriableException actually retried?).

@hachikuji Please let me know if you have additional hints on how to handle / track down sporadic test failures (e.g. when a broker returns CoordinatorLoadInProgressException).

@asasvari
Contributor Author

retest this please


@hachikuji hachikuji left a comment

Sorry for the delay. Left a couple comments and a question about the error handling.

      listings.map(_.groupId).toList
    }
    catch {
      case e : ExecutionException if e.getCause.isInstanceOf[KafkaException] =>


nit: unneeded space before the colon

    catch {
      case e : ExecutionException if e.getCause.isInstanceOf[KafkaException] =>
        val kafkaException = e.getCause
        if (kafkaException.getCause != null) {


Seems a little arbitrary to go one additional level. I'd suggest we just raise the cause of the ExecutionException.

By the way, this seems to be the only case where we worry about the ExecutionException. For other admin APIs, we just let it propagate. It would be nice to be consistent if possible. Going forward, I think we expect retriable errors to be handled internally in the AdminClient, which means only fatal errors and timeouts will reach this level. It may be useful to catch the timeouts and print a helpful message, but for anything else, we probably just want to raise the exception and fail the command. At least that seems like a conservative option until we've had a chance to refine the expected AdminClient error handling. So maybe we should remove this try/catch. What do you think?
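A hedged sketch of that policy — using `java.util.concurrent.TimeoutException` as a stand-in for the Kafka timeout exception, and a hypothetical `call` parameter in place of the real admin API call: unwrap the `ExecutionException`, print a helpful message for timeouts, and let everything else propagate.

```scala
import java.util.concurrent.{ExecutionException, TimeoutException}

// Hypothetical wrapper around an admin API call such as listing groups
def listGroups(call: () => List[String]): List[String] =
  try call()
  catch {
    case e: ExecutionException => e.getCause match {
      case t: TimeoutException =>
        // timeouts get a helpful message before failing the command
        Console.err.println(s"Timed out waiting for a response: ${t.getMessage}")
        throw t
      case cause => throw cause // fatal errors propagate unchanged
    }
  }
```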

Contributor Author

Thanks for the clarification, I removed the mentioned try-catch blocks. In general, I find it easier to read helpful error messages than stack traces, but you are right: the KafkaAdminClient class has the @InterfaceStability.Evolving annotation. Error handling can be refined in a separate PR.

        throw kafkaException

      case e: Exception =>
        throw e


If we're just re-throwing the exception, then we can leave this case out.

    } catch {
      case ex: Throwable =>
        val exceptionMessage = ex.getMessage
        printError(s"Coordinator not available for the group '$groupId'. Error message: '$exceptionMessage'")


This seems a little dangerous if we're catching Throwable. Unexpected errors may not have a useful message and we would probably need the trace anyway. Similar to the comment above, maybe we can leave out this try/catch and let the exception propagate?

@hachikuji hachikuji left a comment

Thanks, looking pretty good. Just a couple more comments.

    * @param msg error message
    * @param waitTime maximum time to wait and retest the condition before failing the test
    * @param pause delay between condition checks
    * @param maxRetry maximum number of retries to check the given condition if a retriable exception is thrown


nit: maxRetries?

    }
    catch {
      case _: RetriableException if retry < maxRetry => {
        info("Retrying")


Maybe more useful to log the exception at DEBUG level? Also, maybe we should pause between retries?
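One way to sketch both suggestions together — with a local `RetriableException` standing in for `org.apache.kafka.common.errors.RetriableException`, and the DEBUG logging shown as a comment since no logger is wired up here:

```scala
// Stand-in for Kafka's RetriableException
class RetriableException(msg: String) extends RuntimeException(msg)

def retryOnRetriable[T](maxRetries: Int, pauseMs: Long)(body: => T): T = {
  var attempt = 0
  while (true) {
    try return body
    catch {
      case e: RetriableException if attempt < maxRetries =>
        attempt += 1
        // debug(s"Attempt $attempt failed with a retriable error, retrying", e)
        Thread.sleep(pauseMs) // pause between retries instead of spinning
    }
  }
  throw new IllegalStateException("unreachable")
}

// Fails twice with a retriable error, then succeeds on the third attempt
var calls = 0
val value = retryOnRetriable(maxRetries = 3, pauseMs = 1) {
  calls += 1
  if (calls < 3) throw new RetriableException("load in progress") else 42
}
```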

    val result = deletedGroups.mapValues { f =>
      Try(f.get) match {
        case _: Success[_] => Errors.NONE
        case Failure(ee: ExecutionException) => Errors.forException(ee.getCause)


I'm not sure it's reasonable to assume we will have an ApiException. My concern is that we will lose information if the exception gets mapped to UNKNOWN_ERROR. I'm wondering if we could skip the translation to error code and use the exception directly.

Contributor Author

deleteGroups() returns a Map[String, Errors]:

    def deleteGroups(): Map[String, Errors] = {

I can change it to return Map[String, Throwable] and update all related tests (such as DeleteConsumerGroupsTest, which expects certain errors to happen, e.g. Errors.GROUP_ID_NOT_FOUND). If I do this, the output for a delete-group failure would change a bit. For example:

    Error: Deletion of some consumer groups failed:
    * Group 'test.group' could not be deleted due to: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.GroupNotEmptyException: The group is not empty

I will update the PR soon.

@hachikuji hachikuji left a comment

Thanks, LGTM

@asasvari
Contributor Author

retest this please

@asasvari
Contributor Author

@hachikuji thanks for the review!

@hachikuji hachikuji merged commit 8ec8ec5 into apache:trunk Jul 17, 2018
@hachikuji

@asasvari Thanks, merged to trunk. Note I made a trivial tweak to waitUntil.

By the way, I created https://issues.apache.org/jira/browse/KAFKA-7173, which seems to be the last use of the old AdminClient. Not sure you have any interest, but thought I'd mention it.

@asasvari asasvari deleted the KAFKA-6884_ConsumerGroupCommand_should_use_new_AdminClient branch October 19, 2022 21:06