[Flink 29627][streaming] Fix duplicate key exception during recovery more than 1 committable. #21101
Left some comments to improve the test setup, hopefully. I'll take another more in-depth look tomorrow, but I think we can finish it up by EOD tomorrow.
Please also adjust the commit message formatting i.e. [FLINK-29627].
    assertThat(subtaskCommittableManagerCheckpoint2.getSubtaskId())
            .isEqualTo(subtaskId);

    int i = 0;
Since expectedNumberOfPendingRequestsPerCommittable and committableIterator always have the same size, you can zip them (Guava's Streams.zip). It also makes the loop look nicer.
I have implemented your suggestion.
However, Streams.zip requires a BiFunction as its third argument, and that function has to return something.
In my case it returns a Pair of a CheckpointCommittableManagerImpl and a List<Integer> expectedNumberOfPendingRequestsPerCommittable element, taken from the respective streams.
In my opinion it is slightly less readable than the previous version with the while loop, but I'm OK with Streams.zip, since we don't have to maintain the index.
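For readers unfamiliar with the pattern, the trade-off described above can be sketched roughly as follows. This is a stdlib-only illustration, not the code from the PR: `ZipSketch`, its local `zip` helper (written to mirror the shape of Guava's `Streams.zip(streamA, streamB, function)`), and the plain integer lists standing in for the committable managers are all assumptions made for the example. `Map.entry` plays the role of the Pair mentioned in the comment.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.stream.Stream;

// Hypothetical sketch: plain lists stand in for the committable managers.
final class ZipSketch {

    // Local stand-in with the same shape as Guava's Streams.zip:
    // pairs elements by position and stops at the shorter input.
    static <A, B, R> Stream<R> zip(
            Stream<A> left, Stream<B> right, BiFunction<? super A, ? super B, R> combiner) {
        Iterator<A> l = left.iterator();
        Iterator<B> r = right.iterator();
        List<R> out = new ArrayList<>();
        while (l.hasNext() && r.hasNext()) {
            out.add(combiner.apply(l.next(), r.next()));
        }
        return out.stream();
    }

    // The combiner has to return something, so each (actual, expected)
    // couple is wrapped in a Map.Entry and checked afterwards.
    static boolean allMatch(
            List<List<Integer>> actualPerManager, List<List<Integer>> expectedPerManager) {
        return zip(
                        actualPerManager.stream(),
                        expectedPerManager.stream(),
                        (actual, expected) -> Map.entry(actual, expected))
                .allMatch(pair -> pair.getKey().equals(pair.getValue()));
    }
}
```

Compared with an index-based while loop, no counter has to be kept in sync with the iterator, at the cost of the intermediate pair object.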
            .map(CommitRequestImpl::getCommittable)
            .collect(Collectors.toList()))
            .containsExactly((Integer) expectedPendingRequestCount);
    } else if (expectedPendingRequestCount instanceof Integer[]) {
Can't you use containsExactlyElementsOf and always pass a list of integers?
Thanks for the hint. I was looking for an alternative method in the API but must have missed that one; I was focused on finding one that also checks the order, as containsExactly does.
I've implemented your proposal, which made the assertPendingRequests method much simpler.
However, assertCommittableCollector now has to accept List<List<Integer>> expectedNumbersOfPendingRequestsPerCommittable as an argument.
The reason is that the testCommittableCollectorV2SerDe test has two committable managers with one committable each, whereas the testCommittablesForSameSubtaskIdV2SerDe test has one committable manager with two committables after recovery.
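The two shapes of expected data described above can be illustrated with a small stdlib-only sketch. Everything here is hypothetical: `ExpectedShapeSketch`, the integer values, and the helper bodies are assumptions, and `List.equals` merely stands in for the order-sensitive check that AssertJ's `containsExactlyElementsOf` performs in the real test.

```java
import java.util.List;

// Hypothetical sketch: each inner list holds the committables of one manager.
final class ExpectedShapeSketch {

    // Order-sensitive comparison; stands in for AssertJ's containsExactlyElementsOf.
    static void assertPendingRequests(List<Integer> actual, List<Integer> expected) {
        if (!actual.equals(expected)) {
            throw new AssertionError("expected " + expected + " but was " + actual);
        }
    }

    // One expected list per manager, so the argument is a list of lists.
    static void assertCommittableCollector(
            List<List<Integer>> actualPerManager, List<List<Integer>> expectedPerManager) {
        if (actualPerManager.size() != expectedPerManager.size()) {
            throw new AssertionError("manager count differs");
        }
        for (int i = 0; i < expectedPerManager.size(); i++) {
            assertPendingRequests(actualPerManager.get(i), expectedPerManager.get(i));
        }
    }

    public static void main(String[] args) {
        // Two managers with one committable each.
        assertCommittableCollector(
                List.of(List.of(1), List.of(2)), List.of(List.of(1), List.of(2)));
        // One manager holding two committables after recovery.
        assertCommittableCollector(List.of(List.of(1, 2)), List.of(List.of(1, 2)));
    }
}
```

Always passing a list, even for a single committable, is what removes the need for the earlier `instanceof Integer[]` branch.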
2bd6daf to 9817234
fapaul left a comment
LGTM % rename one of the variables for the inline comment.
Can you also create the backports for 1.16/1.15?
    CommittableCollector<Integer> committableCollector) {
    int expectedNumberOfSubtasks,
    CommittableCollector<Integer> committableCollector,
    List<List<Integer>> expectedNumbersOfPendingRequestsPerCommittable) {
Nit: The variable name is misleading. Shouldn't this be the committablesPerSubtaskPerCheckpoint or something like this? expectedNumbers implies it is referring to the number but we are matching the content of the committable.
done
… more than 1 committable.

Recovering more than one committable causes an `IllegalStateException` and prevents the cluster from starting. When we recover the `CheckpointCommittableManager`, we deserialize `SubtaskCommittableManager` instances from the recovery state and put them into a `Map<Integer, SubtaskCommittableManager<CommT>>`. The key of this map is the subtaskId of the recovered manager. However, this fails if we have to recover more than one committable. The fix is to call `SubtaskCommittableManager::merge` if we have already deserialized a manager for this subtaskId.

Signed-off-by: Krzysztof Chmielewski <krzysiek.chmielewski@gmail.com>
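The failure mode and the fix can be sketched with plain Java collections. This is an illustration under assumptions, not Flink's implementation: `SubtaskState` is a hypothetical stand-in for `SubtaskCommittableManager`, and the sketch assumes the map is populated with something like `Collectors.toMap`, which throws `IllegalStateException` on a duplicate key unless a merge function is supplied.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical sketch; SubtaskState is not Flink's SubtaskCommittableManager.
final class MergeOnRecoverySketch {

    static final class SubtaskState {
        final int subtaskId;
        final List<Integer> committables;

        SubtaskState(int subtaskId, List<Integer> committables) {
            this.subtaskId = subtaskId;
            this.committables = new ArrayList<>(committables);
        }

        // Combines two recovered states of the same subtask into one.
        SubtaskState merge(SubtaskState other) {
            List<Integer> merged = new ArrayList<>(committables);
            merged.addAll(other.committables);
            return new SubtaskState(subtaskId, merged);
        }
    }

    // No merge function: a second state with the same subtaskId
    // makes Collectors.toMap throw IllegalStateException.
    static Map<Integer, SubtaskState> recoverWithoutMerge(List<SubtaskState> recovered) {
        return recovered.stream()
                .collect(Collectors.toMap(s -> s.subtaskId, Function.identity()));
    }

    // With a merge function: states sharing a subtaskId are combined instead.
    static Map<Integer, SubtaskState> recoverWithMerge(List<SubtaskState> recovered) {
        return recovered.stream()
                .collect(
                        Collectors.toMap(
                                s -> s.subtaskId, Function.identity(), SubtaskState::merge));
    }
}
```

In this sketch, recovering two states for subtask 0 fails in `recoverWithoutMerge` and yields a single entry holding both committables in `recoverWithMerge`.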
9817234 to ab99181
@flinkbot run azure
What is the purpose of the change

Fixing FLINK-29627, where recovering more than one Committable caused an `IllegalStateException` and prevented the cluster from starting. The implemented fix merges committables from the same subtaskId during recovery for the Sink V2 architecture.

Brief change log

- `CheckpointSimpleVersionedSerializer::deserialize` now calls `SubtaskCommittableManager::merge` for committables with the same subtaskId.

Verifying this change

- `CommittableCollectorSerializerTest::testCommittablesForSameSubtaskIdV2SerDe` verifies deserialization of multiple committables for the same subtaskId.
- `CommitterOperatorTest::testStateRestore` includes committables for the same subtaskId.

Does this pull request potentially affect one of the following parts:

- `@Public(Evolving)`: (no)

Documentation