-
Notifications
You must be signed in to change notification settings - Fork 13.8k
[FLINK-4341] Let idle consumer subtasks emit max value watermarks and fail on resharding #2414
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
… fail on resharding This no longer allows the Kinesis consumer to transparently handle resharding. This is a short-term workaround until we have a min-watermark notification service available in the JobManager.
Thank you for opening a pull request to fix the issue. I think we also need to cover another case: What happens when the number of shards has been reduced in a resharding and some fetchers are now without a shard? I think in that case, the worker also needs to emit a final Long.MAX_VALUE, and it has to fail once it gets a shard assigned again. |
Ah yes, correct. I'll update this soon. |
To include the missing case @rmetzger mentioned, it turns out the complete fix is actually more complicated than I expected to perform correct case determination after every reshard, and perhaps might require a little bit of rework on the current shard discovery mechanism to get it right. Heads-up notice that this will probably need a re-review. Sorry for the delay, I'm currently still on it, hopefully will update the PR by the end of today ;) I'll notify when it's ready. |
Thank you for the pull request. I'll merge it to master and the release-1.1 branch. |
… fail on resharding This no longer allows the Kinesis consumer to transparently handle resharding. This is a short-term workaround until we have a min-watermark notification service available in the JobManager. This closes #2414
Thanks @rmetzger ! |
… fail on resharding This no longer allows the Kinesis consumer to transparently handle resharding. This is a short-term workaround until we have a min-watermark notification service available in the JobManager. This closes apache#2414
This is a short-term fix, until the min-watermark service for the JobManager described in the JIRA discussion is available.
The way this fix works is that we let idle subtasks that initially don't get assigned shards emit a
Long.MAX_VALUE
watermark. Also, to avoid messing up the watermarks on resharding, we only deliberately fail hard if the new shards are assigned to idle subtasks. So, if all subtasks are not initially idle on startup (i.e., when total number of shards >= consumer parallelism), the Kinesis consumer can still transparently handle resharding like before without failing.I've tested exactly-once with our manual tests (with and w/o resharding) and the fix works nicely, still retaining exactly-once guarantee despite non-transparency.
However, I can't reproduce the unbounded state & akka frame size exceeding with window operators w/o this change (perhaps the window I'm testing with is too simple?), so I'm not sure if that issue is also correctly fixed with this change; I'll need a bit of help to let us clarify this.
This change should also go into the 1.1.2 bugfix release branch.
R: @rmetzger and @aljoscha for review. Thanks in advance!