KAFKA-3158: ConsumerGroupCommand should tell whether group is actually dead #1429
Conversation
This patch differentiates between when a consumer group is rebalancing or dead and reports the appropriate error message.
if (consumerSummaries.isEmpty)
  println(s"Consumer group `${group}` does not exist or is rebalancing.")
if (consumerSummaries == None)
  println(s"Consumer Group `${group}` does not exist")
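For reference, the distinction this patch introduces can be sketched in a self-contained toy example (the `ConsumerSummary` and `report` names here are hypothetical, simplified stand-ins, not the actual AdminClient types): a `None` result means the group does not exist (dead), while `Some` of an empty list means it exists but is rebalancing.

```scala
// Toy sketch (ConsumerSummary and report are hypothetical stand-ins for the
// real AdminClient types): an Option[List[...]] distinguishes a dead group
// (None) from one that exists but is mid-rebalance (Some(Nil)).
case class ConsumerSummary(memberId: String)

def report(group: String, summaries: Option[List[ConsumerSummary]]): String =
  summaries match {
    case None      => s"Consumer group `$group` does not exist."
    case Some(Nil) => s"Consumer group `$group` is rebalancing."
    case Some(ms)  => s"Consumer group `$group` has ${ms.size} member(s)."
  }

println(report("my-group", None))       // dead / nonexistent case
println(report("my-group", Some(Nil)))  // rebalancing case
```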
Can you change "Group" to "group" in the print message to be consistent with previous format?
Also a period at the end would be nice.
LGTM, it seems to be following @hachikuji's suggestion on the JIRA.
Thanks @vahidhashemian
@@ -106,7 +106,7 @@ class AdminClientTest extends IntegrationTestHarness with Logging {
val consumerSummaries = client.describeConsumerGroup(groupId)
assertEquals(1, consumerSummaries.size)
assertEquals(Set(tp, tp2), consumerSummaries.head.assignment.toSet)
assertEquals(Set(tp, tp2), consumerSummaries.get.head.assignment.toSet)
Maybe it makes sense to add a couple test cases for dead and unstable groups?
@hachikuji Did you mean to just add a function like testDescribeConsumerGroupForDeadGroup(), or to add a whole new test file for ConsumerGroupCommand.scala? I think the testDescribeConsumerGroupForNonExistentGroup() function already checks the dead-group scenario, so I'm unsure what you meant exactly.
Ah, you're right, we do have the dead case covered. I was puzzled because I was expecting an update in this patch, but I guess `isEmpty` works for `Option` as well. So the only uncovered case is a group which is in the middle of a rebalance. This is harder to test without mocking. The only thing I can think of is to send a JoinGroup request directly through `NetworkClient` and then immediately call describeGroup(). That seems like a lot of work for a small gain, so maybe it's fine without it.
It's a bit nicer to do:

assertEquals(Some(Set(tp, tp2)), consumerSummaries.map(_.head.assignment.toSet))

The error message in case `None` is returned will be nicer if you do it like the above.
@ijuma I'm not sure what you meant by the `None` return case, since the `None` case is handled in the describeGroup function in ConsumerGroupCommand.scala, while this file tests the nonexistent-group and existent-group scenarios via the describeConsumerGroup function in AdminClient.scala.
@imandhan Did you see the code I proposed above? It's generally bad form to call `Option.get` because it gives an error that's not very helpful. If your test fails due to a regression that causes `None` to be returned, it will throw a NoSuchElementException. The code I suggested will say something like: "Expected Some(Set(...)), actual None".
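The difference between the two assertion styles can be shown with a self-contained sketch (toy types; `assertEquals` here is a hypothetical stand-in for the JUnit method, not the real one):

```scala
// Toy sketch: why mapping over the Option beats calling .get in an assertion.
def assertEquals(expected: Any, actual: Any): Unit =
  assert(expected == actual, s"Expected $expected, actual $actual")

// Simulate a regression where describeConsumerGroup returns None.
val consumerSummaries: Option[List[Set[String]]] = None

// consumerSummaries.get.head would throw a bare NoSuchElementException.
// Mapping keeps the comparison total and yields a readable failure message:
val message =
  try {
    assertEquals(Some(Set("tp", "tp2")), consumerSummaries.map(_.head))
    "passed"
  } catch {
    case e: AssertionError => e.getMessage
  }

println(message)  // "... Expected Some(Set(tp, tp2)), actual None"
```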
Oh, I get what you meant by the last part of the comment now :) Yup, and so I made the change above in line 109.
…y dead This patch differentiates between when a consumer group is rebalancing or dead and reports the appropriate error message.
val partitionOffsets = topicPartitions.flatMap { topicPartition =>
  Option(consumer.committed(new TopicPartition(topicPartition.topic, topicPartition.partition))).map { offsetAndMetadata =>
    topicPartition -> offsetAndMetadata.offset

val consumerSummaries = adminClient.describeConsumerGroup(group) match {
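As context for the flatMap above: the Java consumer's `committed(...)` returns null when a partition has no committed offset, so wrapping the result in `Option(...)` turns null into `None`, and flatMap then drops those partitions. A self-contained sketch with hypothetical, simplified stand-ins for the consumer types:

```scala
// Sketch of the Option(nullable).map + flatMap pattern from the snippet above.
// TopicPartition, OffsetAndMetadata, and committed are simplified stand-ins.
case class TopicPartition(topic: String, partition: Int)
case class OffsetAndMetadata(offset: Long)

// Stand-in for consumer.committed: null means no committed offset.
def committed(tp: TopicPartition): OffsetAndMetadata =
  if (tp.partition == 0) OffsetAndMetadata(42L) else null

val topicPartitions = List(TopicPartition("t", 0), TopicPartition("t", 1))

val partitionOffsets = topicPartitions.flatMap { tp =>
  // Option(null) == None, so partitions without offsets are skipped, not NPEs.
  Option(committed(tp)).map(om => tp -> om.offset)
}

println(partitionOffsets)  // only partition 0 survives
```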
Do we need this variable anymore?
Removed
Thanks for the PR. A couple of comments (one from me and one from Jason) and the rest looks good.
…AFKA-3158 Signed-off-by: Ishita Mandhan <imandha@us.ibm.com>
Thanks for the PR, LGTM. Will merge to trunk once the tests pass.
…y dead This patch differentiates between when a consumer group is rebalancing or dead and reports the appropriate error message. Author: Ishita Mandhan <imandha@us.ibm.com> Reviewers: Vahid Hashemian <vahidhashemian@us.ibm.com>, Jason Gustafson <jason@confluent.io>, Ismael Juma <ismael@juma.me.uk> Closes apache#1429 from imandhan/KAFKA-3158