Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-4948: Wait for offset commit in test to fix transient failure #3246

Closed
wants to merge 2 commits into from

Conversation

rajinisivaram
Copy link
Contributor

DescribeConsumerGroupTest#testDescribeExistingGroupWithNoMembersWithNewConsumer shuts down the consumer executor thread and then checks that the assignments returned by describeGroup contain the consume group with no members. But if the executor thread is shut down before any offsets are committed, the assignments returned by describeGroup doesn't contain the group at all. This PR waits for an offset commit by waiting for the group to appear in describeGroup assignments prior to shutting down the executor.

@rajinisivaram
Copy link
Contributor Author

@ijuma Can you take a look, please? Thank you...

// Wait for an offset commit before shutting down the group executor.
TestUtils.waitUntilTrue(() => {
val (_, assignments) = consumerGroupService.describeGroup()
!assignments.toSeq.flatMap(_.filter(_.group == group)).isEmpty
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you can say something along the lines of assignments.exists(_.exists(_.group == group)) (I didn't try to compile it).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ijuma Thank you, updated. I will merge once the builds complete.

Copy link
Contributor

@ijuma ijuma left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, one minor comment, LGTM otherwise.

@asfbot
Copy link

asfbot commented Jun 6, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/4949/
Test PASSed (JDK 8 and Scala 2.12).

@asfbot
Copy link

asfbot commented Jun 6, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.11/4965/
Test FAILed (JDK 7 and Scala 2.11).

@rajinisivaram
Copy link
Contributor Author

Test failure is unrelated, merging to trunk and 0.11.0.

asfgit pushed a commit that referenced this pull request Jun 6, 2017
`DescribeConsumerGroupTest#testDescribeExistingGroupWithNoMembersWithNewConsumer` shuts down the consumer executor thread and then checks that the assignments returned by `describeGroup` contain the consume group with no members. But if the executor thread is shut down before any offsets are committed, the assignments returned by `describeGroup` doesn't contain the group at all. This PR waits for an offset commit by waiting for the group to appear in `describeGroup` assignments prior to shutting down the executor.

Author: Rajini Sivaram <rajinisivaram@googlemail.com>

Reviewers: Ismael Juma <ismael@juma.me.uk>

Closes #3246 from rajinisivaram/KAFKA-4948

(cherry picked from commit 3de6839)
Signed-off-by: Rajini Sivaram <rajinisivaram@googlemail.com>
@asfgit asfgit closed this in 3de6839 Jun 6, 2017
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
3 participants