Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-12983: reset needsJoinPrepare flag before rejoining the group #10986

Merged

Conversation

ableegoldman
Copy link
Contributor

@ableegoldman ableegoldman commented Jul 7, 2021

The #onJoinPrepare callback is not always invoked before a member (re)joins the group, but only once when it first enters the rebalance. This means that any updates or events that occur during the join phase can be lost in the internal state: for example, clearing the SubscriptionState (and thus the "ownedPartitions" that are used for cooperative rebalancing) after losing its memberId during a rebalance.

We should reset the needsJoinPrepare flag inside the resetStateAndRejoin() method. Should be cherrypicked back to 2.8 at least

@dajac
Copy link
Contributor

dajac commented Jul 7, 2021

@ableegoldman Thanks for the patch. The change makes sense to me. I wonder if we could add a unit test which would fail without it though. This would avoid regressing in the future. What do you think?

@hachikuji
Copy link
Contributor

hachikuji commented Jul 7, 2021

@ableegoldman Thanks for the patch. I think the original idea behind the implementation was to ensure that each rebalance triggered only one call to onPartitionsRevoked. It sounds like this needs some refinement for the cooperative rebalance logic. I guess the main difference is that we could now have a call to onPartitionsLost if the memberId is lost after the initial call to onJoinPrepare? It might be nice to ensure that we can keep the same behavior for eager rebalancing.

@guozhangwang
Copy link
Contributor

@hachikuji I think the key idea behind this fix is that, if a rebalance failed with e.g. memberId lost, then conceptually we would just started a new rebalance in which we would call onJoinPrepare and in which we may call onRepartitionsRevoked again. This behavior would be the same for eager or cooperative.

Personally I think this fix is fine -- @ableegoldman if you could just add a unit test for the case of memberId lost during a first rebalance, and check that we would re-triggered onJoinPrepare again?

@hachikuji
Copy link
Contributor

To clarify, from the perspective of the eager protocol, how would this case look? Would we get multiple calls to onPartitionsRevoked with the same set of partitions or something else?

@ableegoldman
Copy link
Contributor Author

@hachikuji in the EAGER case, after the first onJoinPrepare / onPartitionsRevoked, the subscription would have been cleared. So any subsequent invocations of onPartitionsRevoked would be with an empty set of partitions

@everyone, I was having trouble getting a unit test that would actually verify this behavior but I wanted to kick off discussion on the fix ASAP (for obvious reasons) so I opened the PR without one. I do intended to add a test, I just haven't had time to pursue that yet. Suggestions welcome :P

@ableegoldman
Copy link
Contributor Author

Ok I realize we actually do have a test that reproduces this already: ConsumerCoordinatorTest.testRebalanceWithMetadataChange. This test sets up a case where a change in topic metadata triggers a rebalance after a member had joined the group, after which the change is reverted so that the metadata is ultimately the same. Then a NOT_COORDINATOR response is sent to fail the initial JoinGroup, and the test just verifies that the member attempts to rejoin until successful. It also verifies things like the number of times each rebalance callback is invoked, and the set of partitions that the callbacks receive.
This test actually only failed in the COOPERATIVE case, which confirms that the behavior remains correct for the EAGER case. When following the COOPERATIVE protocol, the test was formerly assuming that the member would retain all partitions despite actually having its generation and memberId cleared when the initial JoinGroup is failed. So it was technically asserting the wrong behavior beforehand; just fixing this gives us a unit test for this patch after all.

@ableegoldman ableegoldman force-pushed the 12983-always-invoke-onJoinPrepare branch from 5911911 to 0483d07 Compare July 13, 2021 03:30
@ableegoldman
Copy link
Contributor Author

Now ready for review @dajac @hachikuji @guozhangwang

Copy link
Contributor

@dajac dajac left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, thanks.

@ableegoldman
Copy link
Contributor Author

ableegoldman commented Jul 13, 2021

Two test failures, both ConsumerBounceTest.testCloseDuringRebalance(). This test is already known to be flaky and failed with the same error that has been reported before (KAFKA-8529), so I think we can conclude that this was unrelated.

@ableegoldman ableegoldman merged commit 1f64df9 into apache:trunk Jul 13, 2021
ableegoldman pushed a commit that referenced this pull request Jul 13, 2021
…10986)

The #onJoinPrepare callback is not always invoked before a member (re)joins the group, but only once when it first enters the rebalance. This means that any updates or events that occur during the join phase can be lost in the internal state: for example, clearing the SubscriptionState (and thus the "ownedPartitions" that are used for cooperative rebalancing) after losing its memberId during a rebalance. We should reset the needsJoinPrepare flag inside the resetStateAndRejoin() method. 

Reviewers: Guozhang Wang <guozhang@apache.org>, Jason Gustafson <jason@confluent.io>, David Jacot <djacot@confluent.io>
@ableegoldman
Copy link
Contributor Author

Merged to trunk and cherrypicked to 2.8 & 3.0 (cc @kkonstantine)

ableegoldman pushed a commit that referenced this pull request Jul 13, 2021
…10986)

The #onJoinPrepare callback is not always invoked before a member (re)joins the group, but only once when it first enters the rebalance. This means that any updates or events that occur during the join phase can be lost in the internal state: for example, clearing the SubscriptionState (and thus the "ownedPartitions" that are used for cooperative rebalancing) after losing its memberId during a rebalance. We should reset the needsJoinPrepare flag inside the resetStateAndRejoin() method.

Reviewers: Guozhang Wang <guozhang@apache.org>, Jason Gustafson <jason@confluent.io>, David Jacot <djacot@confluent.io>
ableegoldman pushed a commit to ableegoldman/kafka that referenced this pull request Sep 17, 2021
…pache#10986)

The #onJoinPrepare callback is not always invoked before a member (re)joins the group, but only once when it first enters the rebalance. This means that any updates or events that occur during the join phase can be lost in the internal state: for example, clearing the SubscriptionState (and thus the "ownedPartitions" that are used for cooperative rebalancing) after losing its memberId during a rebalance. We should reset the needsJoinPrepare flag inside the resetStateAndRejoin() method.

Reviewers: Guozhang Wang <guozhang@apache.org>, Jason Gustafson <jason@confluent.io>, David Jacot <djacot@confluent.io>
mjsax pushed a commit to confluentinc/kafka that referenced this pull request Sep 17, 2021
…pache#10986)

The #onJoinPrepare callback is not always invoked before a member (re)joins the group, but only once when it first enters the rebalance. This means that any updates or events that occur during the join phase can be lost in the internal state: for example, clearing the SubscriptionState (and thus the "ownedPartitions" that are used for cooperative rebalancing) after losing its memberId during a rebalance. We should reset the needsJoinPrepare flag inside the resetStateAndRejoin() method.

Reviewers: Guozhang Wang <guozhang@apache.org>, Jason Gustafson <jason@confluent.io>, David Jacot <djacot@confluent.io>
xdgrulez pushed a commit to xdgrulez/kafka that referenced this pull request Dec 22, 2021
…pache#10986)

The #onJoinPrepare callback is not always invoked before a member (re)joins the group, but only once when it first enters the rebalance. This means that any updates or events that occur during the join phase can be lost in the internal state: for example, clearing the SubscriptionState (and thus the "ownedPartitions" that are used for cooperative rebalancing) after losing its memberId during a rebalance. We should reset the needsJoinPrepare flag inside the resetStateAndRejoin() method. 

Reviewers: Guozhang Wang <guozhang@apache.org>, Jason Gustafson <jason@confluent.io>, David Jacot <djacot@confluent.io>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
4 participants