
KAFKA-9525: add enforceRebalance method to Consumer API #8087

Merged 2 commits into apache:trunk on Mar 1, 2020

Conversation

@ableegoldman (Contributor)

As described in KIP-568.

Waiting on acceptance of the KIP to write the tests, on the off chance something changes. But rest assured unit tests are coming ⚡️

Will also kick off the existing Streams system tests that leverage this new API (e.g., version probing, sometimes broker bounce).
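
For context, a minimal sketch (not part of this PR's diff) of how an application might use the new KIP-568 method once it is available on the Consumer interface; the `subscriptionUserDataChanged()` helper is hypothetical and stands in for whatever external condition the application tracks:

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class EnforceRebalanceExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "example-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("example-topic"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                System.out.println("Polled " + records.count() + " records");

                // If a change happened that is only visible in the assignor's subscription
                // userdata, request a new rebalance; the rejoin happens on the next poll.
                if (subscriptionUserDataChanged()) {
                    consumer.enforceRebalance();
                }
            }
        }
    }

    // Hypothetical application-specific check, for illustration only.
    private static boolean subscriptionUserDataChanged() {
        return false;
    }
}
```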

@ableegoldman (Contributor Author)

The KIP has just been accepted: call for review @guozhangwang @hachikuji

@guozhangwang (Contributor)

test this please

@guozhangwang (Contributor) left a comment

Made a pass over the PR. Just a wild question: could there be scenarios where some partitions are claimed as owned by more than one instance in the joined subscriptions, and if so, do we already handle that gracefully?

Comment on lines 2262 to 2267
* this method can be used to trigger a reassignment. The kind of system change that would merit calling this
* method is one that would also be reflected in the {@link org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Subscription Subscription}
* userdata that is passed to the assignor so it can make assignment decisions based on more than group membership
* and metadata. If your assignor does not use this userdata, or you do not use a custom
Contributor

The "kind of ... more than group membership and metadata" phrasing is a bit unclear about what it's trying to convey. Or are you trying to say: "E.g. if you want to trigger a rebalance with newly encoded subscription metadata, but none of the membership or subscribed topics or patterns have changed, so the consumer group would not automatically rebalance, you can use this function"?

Contributor Author

The general motivation behind this was just to avoid users calling this unnecessarily or for the wrong reasons: basically, users should not use this to try to manually intervene in the group membership, as that is (and should be) handled entirely internally.

The partition assignor only has two types of info with which to determine the assignment: one is the cluster metadata and the other is the subscription of each member. If there is a membership/cluster metadata change, the consumer will rebalance on its own. And if nothing changes, there's no reason to trigger a rebalance, as the assignment should be the same. However, if there is a change that affects the userdata encoded in the subscription, the consumer will not know about this and therefore won't trigger a rebalance -- unless you tell it to.

Does that make sense? Again, the purpose is just to avoid users calling this except for the specific reason it exists, a change affecting subscription userdata. I'll try to think of a better way to phrase this in the javadocs without adding 1000 lines to the PR -- suggestions welcome :)
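
To make the "userdata" channel concrete, here is a hedged sketch of a custom assignor that encodes some local state into its Subscription; the class name and the "lag" field are illustrative only, not anything from this PR:

```java
import java.nio.ByteBuffer;
import java.util.Set;
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;

public abstract class LagAwareAssignor implements ConsumerPartitionAssignor {
    private volatile long localLag; // hypothetical application state

    // Called by the consumer when it (re)joins the group; whatever is returned here
    // is what enforceRebalance() lets the application refresh on demand.
    @Override
    public ByteBuffer subscriptionUserData(Set<String> topics) {
        ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES);
        buffer.putLong(localLag);
        buffer.flip();
        return buffer;
    }

    @Override
    public String name() {
        return "lag-aware";
    }

    // assign(...) is left to the concrete subclass; it could decode each member's
    // lag from Subscription#userData() and weight the assignment accordingly.
}
```

After updating the state that feeds subscriptionUserData, the application would call enforceRebalance() so the group re-runs the assignment with the new value.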

Contributor

Yeah that makes sense :) I think we are on the same page, and I agree it's a bit hard to convey the message in a straightforward phrasing. I do not have a strong suggestion here, and I think your last sentence actually is good: "E.g. if there is a change that affects the userdata encoded in the subscription", consumers would not automatically rebalance and the user may want to use this.

}
}
}

private void enforceRebalance() {
consumer.unsubscribe();
subscribeConsumer();
Contributor

Previously in unsubscribe we made sure all tasks were closed before re-subscribing; now we are relying on the consumer's response to let us know which (subset of) tasks are to be closed. So although task-migrated is triggered by fencing, if the heartbeat thread has not gotten the error code and reset the group info, we would not call onPartitionsLost before re-joining the group. In that case, could we possibly have more than one instance claiming to own some of the same partitions? Worth checking the consumer.assign function to see if the code already handles this case.

Generally speaking I do not know whether it would expose any edge cases. Better to run our system tests a couple of times to make sure?

Contributor Author

That's a fair point, I think we actually do want to use the old method of unsubscribing in the specific case of a TaskMigratedException, and this new API only for version probing, KIP-441, etc.

I don't think it would be incorrect to do this, just inefficient -- if the heartbeat thread hasn't reset the group by the next poll call, the consumer will call onPartitionsRevoked (eager) or nothing (cooperative) and send a JoinGroup. On the response it will realize it dropped out and then invoke onPartitionsLost.

Contributor Author

@guozhangwang Actually, looking at onLeavePrepare it seems that if the heartbeat thread hasn't reset the group, we won't call onPartitionsLost even on unsubscribe.
You partially addressed this here, but what happens if we aren't in a rebalance yet? I suppose we could actually combine the old method and the new by calling enforceRebalance and then unsubscribe, but that feels a bit hacky. WDYT?
Looking at this again, I'm not sure I understand/remember why it's correct to invoke onPartitionsLost instead of onPartitionsRevoked if a rebalance is in progress during unsubscribe -- can you explain that a bit further?

Contributor

There's always a race condition because the fencing error can be reported by the brokers in two ways: 1) heartbeat, 2) commit (including transactional send-offsets). Either one could be ahead of the other in getting the error code.

When we get task-migrated it means 2) got the error; at that time 1) may not have gotten it yet, so the local state may still be "stable" and the generation not reset yet. In that case here we would still revokeAll and commit, which is doomed to fail again; otherwise we would lostAll (note the PR you referenced just makes sure that "otherwise" case happens). In my other PR I also realized this from some system test failures and made the change to always "handleLostAll" before re-subscribing:

https://github.com/apache/kafka/pull/8058/files#diff-045aeaddb4232a85a8560186b4901e69R763

In some sense this is to be conservative and make sure that when re-subscribing we do not encode any "owned partitions".
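
A minimal sketch of the conservative handling described above, assuming the usual StreamThread collaborators (taskManager, consumer, subscribeConsumer, log); this illustrates the idea rather than the exact code in either PR:

```java
// Illustrative only: on fencing, treat all tasks as lost before leaving and re-joining,
// so the next JoinGroup does not encode any "owned partitions".
private void handleTaskMigrated(final TaskMigratedException e) {
    log.warn("Detected the thread was fenced; closing all tasks as lost and rejoining the group", e);
    taskManager.handleLostAll(); // close everything as "lost" without attempting to commit
    consumer.unsubscribe();      // leave the group and clear the current assignment
    subscribeConsumer();         // re-subscribe; the actual rejoin happens on the next poll
}
```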

Contributor

I'm also a bit concerned with this change, as we are calling onJoinPrepare before onLeavePrepare, which changes the existing sequence for no specific reason.

@guozhangwang (Contributor), Feb 21, 2020

The current condition is only generation() == Generation.NO_GENERATION || rebalanceInProgress(), and enforceRebalance only sets rebalanceNeeded to false, so after that calling unsubscribe still does not guarantee we trigger onPartitionsLost instead of onPartitionsRevoked, right?

That's why I think even with my previous fix it is not safe to say we'd always trigger lost, and hence I added the taskManager#handleAllLost. What's your concern with adding this call before enforceRebalance here?

@ableegoldman (Contributor Author), Feb 21, 2020

Right, yeah, I was confusing MemberState.REBALANCING with needsRejoin. Calling enforceRebalance first wouldn't help.
I guess my preference would be to fix this in the consumer client somehow, i.e. make sure it invokes the correct callback during unsubscribe. For example, if we get a TaskMigratedException due to a failed commit, the consumer will reset the generation itself. But I'm not sure we can rely on it resetting the generation for any arbitrary cause of TaskMigratedException -- which can happen for reasons outside of consumer calls failing, IIUC.
If we can't fix it in the consumer client, then I would agree we should catch TaskMigratedException and call handleLostAll followed by unsubscribe (and resubscribe).

Contributor Author

I guess my real preference would be to consider adding a second new Consumer API as part of this KIP, which acts similarly to calling unsubscribe -> resubscribe but always invokes onPartitionsLost. Something like rejoinGroupAsZombie or rejoinAfterDroppingOut to force the consumer to acknowledge that it has dropped out and lost all its partitions.
Alternatively, we could add a Consumer API that basically just calls resetGeneration, and then leave it up to the user to call unsubscribe or enforceRebalance themselves. But I'm not sure if this is exposing too much through the Consumer interface.

Contributor

Even if ConsumerCoordinator already reset the generation on all occasions when throwing the consumer-fenced exception, that is still not safe, since we could still be fenced on the producer side.

Personally I'm okay with either calling handleAllLost before unsubscribe, or adding a boolean parameter as in enforceRebalance(boolean resetGeneration); it seems @ableegoldman is preferring the latter here, so I'm giving it to you :) (feels like we are making the new API more and more advanced-usage-only now)

Contributor Author

Yeah, I'd rather handle it within Streams or add a new API altogether than start adding semi-orthogonal parameters to this API. Since we can handle this in a reasonable way within Streams (for now), I'm fine with doing so, and we can always revisit the issue later.

@abbccdda (Contributor) left a comment

The API change LGTM overall, we just need to add some unit tests to KafkaConsumer and ConsumerCoordinator

* of the current rebalance. Only when it is not possible to validate whether the latest metadata was used based
* on the received assignment should you consider retrying based on the return value.
* <p>
* You should not need to call this during normal processing, as the consumer group will manage itself
Contributor

s/should not need to/don't need to

@@ -293,6 +293,11 @@ boolean tryToCompleteRestoration() {
* @throws TaskMigratedException if the task producer got fenced (EOS only)
*/
void handleRevocation(final Collection<TopicPartition> revokedPartitions) {
if (activeTaskIds().isEmpty()) {
Contributor Author

@guozhangwang @abbccdda We are still at risk of the race condition on a TaskMigratedException where we call onRevoked instead of onLost -- when we directly call handleLostAll we'll remove all active tasks, and then invoking handleRevocation will throw the IllegalStateException below.

Contributor

I just discussed this with @abbccdda while we were discussing another PR (#8168), and it turns out that handleRevocation should actually not throw IllegalStateException at all: with KIP-429 the handleAssignment triggered from the streams assignor should already have closed the tasks, so when handleRevocation is triggered we would not have those tasks to suspend. We should remove that throw statement and replace it with an info log only.
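
A hedged sketch of the suggested change (the surrounding method shape and the log message are assumptions, not the exact Streams code):

```java
void handleRevocation(final Collection<TopicPartition> revokedPartitions) {
    if (activeTaskIds().isEmpty()) {
        // With KIP-429 the handleAssignment call triggered by the streams assignor may
        // have closed these tasks already, so having nothing to suspend is expected
        // rather than an illegal state.
        log.info("No active tasks to suspend for revoked partitions {}", revokedPartitions);
        return;
    }
    // ... suspend (and later commit) the revoked active tasks as before ...
}
```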

@guozhangwang (Contributor) left a comment

LGTM!

@guozhangwang (Contributor) left a comment

One minor comment about unit test, otherwise LGTM!

@@ -1951,6 +1951,19 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
};
}

class CountingRebalanceListener implements ConsumerRebalanceListener {
Contributor

nit: maybe extract the MockRebalanceListener out of ConsumerCoordinatorTest so it can be shared by multiple test classes?

Contributor Author

Ack good call
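
For reference, a minimal sketch of the kind of counting listener under discussion; the field names are illustrative, and the helper actually added in the PR may differ:

```java
import java.util.Collection;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;

class CountingRebalanceListener implements ConsumerRebalanceListener {
    int revokedCount = 0;
    int assignedCount = 0;
    int lostCount = 0;

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        revokedCount++;
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        assignedCount++;
    }

    @Override
    public void onPartitionsLost(Collection<TopicPartition> partitions) {
        lostCount++;
    }
}
```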

@guozhangwang (Contributor)

@ableegoldman Could you trigger a system test before we merge?

@ableegoldman (Contributor Author)

I actually did run all the system tests a few days ago and don't think I've changed anything since then, but to be safe, here is a fresh run:
https://jenkins.confluent.io/job/system-test-kafka-branch-builder/3781/

@guozhangwang (Contributor)

@ableegoldman https://jenkins.confluent.io/job/system-test-kafka-branch-builder/3781/ fails on 12 tests; some of them are upgrade ones so maybe not related, but I'm not sure about the others.

@ableegoldman (Contributor Author)

They all hit the "Cannot perform operation after producer has been closed" bug; rebased to get the fix and kicked off yet another round:
https://jenkins.confluent.io/job/system-test-kafka-branch-builder/3790/

@ableegoldman (Contributor Author) commented Feb 29, 2020

All tests passed except version probing, which failed for reasons unrelated to this PR.

We must have recently updated the "triggering a rebalance" message logged in StreamThread but missed updating the system test that looks for it. I'm a little concerned because it should have been failing since the 20th, which was when I was on call, but I never received any notice of a failure in the system tests whatsoever.

Fixed the test and kicked off another build of just the version probing test: https://jenkins.confluent.io/job/system-test-kafka-branch-builder/3793

@ableegoldman (Contributor Author)

With the log message fix the version probing test now passes (ran with 10 repeats). Should be ready for merge @guozhangwang

@guozhangwang (Contributor)

Thanks for digging into the system tests!

@guozhangwang merged commit a1f2ece into apache:trunk on Mar 1, 2020
@mjsax added the kip (Requires or implements a KIP) label on Jun 12, 2020
@ableegoldman deleted the 9525 branch on June 26, 2020
Labels: consumer, kip (Requires or implements a KIP), streams
5 participants