[FLINK-18094][checkpointing] Unifies the creation of BarrierHandlers and CheckpointedInputGate. #12575
Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community.
Automated Checks
Last check on commit e78203a (Wed Jun 10 08:13:41 UTC 2020)
Warnings:
Mention the bot in a comment to re-run the automated checks.
Review Progress
Please see the Pull Request Review Guide for a full explanation of the review process.
Details
The bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.
Bot commands
The @flinkbot bot supports the following commands:
rkhachatryan
left a comment
Thanks for the fix @AHeise , the code also looks simpler now.
I think it can be improved a bit further, please see my comments below.
Can you also point me to the test that verifies the correctness of the indices?
...k-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java
Outdated
...ing-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnaligner.java
Outdated
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputGateUtil.java
Outdated
pnowojski
left a comment
I've skimmed over the code (I didn't do the full review). Mostly it LGTM. A general remark is that the first couple of commits are missing an explanation in the commit message of what they are doing. They look like they could be kept as individual commits, but it's hard to understand them now without the context that such a commit message could provide.
...ime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
Outdated
...k-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java
The method assumed that the gates have consecutive indexes starting at 0.
In the following commits, this method will be used to fetch information about all channels without explicitly needing to access the channels. Thus, for tests mocks just need to return meaningful InputChannelInfos instead of actually creating the respective channels.
…reateCheckpointedInputGate to createCheckpointedMultipleInputGate.
…ile creating checkpoint handlers. The actual implementations have been lists all along and we assume ordering anyway.
…lInfo. This removes the need to translate the InputChannelInfo back and forth to flattened indexes across all InputGates. All index-based data structures are replaced by maps that associate a certain state to a given InputChannelInfo. For performance reasons, these maps are fully initialized upon construction, such that no nodes need to be added/removed during runtime and only values are updated. Additionally, this commit unifies the creation of BarrierHandlers (similar signature) and removes the error-prone offset handling from CheckpointedInputGate.
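To illustrate the difference the commit message describes, here is a minimal sketch that contrasts a flattened index across gates with a map keyed by channel and filled at construction. It is not the Flink implementation: `ChannelStateSketch`, `ChannelKey`, `flattenedIndex`, and `initBlockedChannels` are hypothetical names, and `ChannelKey` only stands in for `InputChannelInfo` as a (gate index, channel index) pair.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

public class ChannelStateSketch {

    /** Hypothetical stand-in for InputChannelInfo: a (gate index, channel index) pair. */
    static final class ChannelKey {
        final int gateIdx;
        final int channelIdx;

        ChannelKey(int gateIdx, int channelIdx) {
            this.gateIdx = gateIdx;
            this.channelIdx = channelIdx;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof ChannelKey)) {
                return false;
            }
            ChannelKey other = (ChannelKey) o;
            return gateIdx == other.gateIdx && channelIdx == other.channelIdx;
        }

        @Override
        public int hashCode() {
            return Objects.hash(gateIdx, channelIdx);
        }
    }

    /** Index-based style: every lookup needs the channel counts of all preceding gates. */
    static int flattenedIndex(int[] channelsPerGate, ChannelKey key) {
        int offset = 0;
        for (int gate = 0; gate < key.gateIdx; gate++) {
            offset += channelsPerGate[gate];
        }
        return offset + key.channelIdx;
    }

    /** Map-based style: one entry per channel is created up front; later only values change. */
    static Map<ChannelKey, Boolean> initBlockedChannels(int[] channelsPerGate) {
        Map<ChannelKey, Boolean> blocked = new HashMap<>();
        for (int gate = 0; gate < channelsPerGate.length; gate++) {
            for (int channel = 0; channel < channelsPerGate[gate]; channel++) {
                blocked.put(new ChannelKey(gate, channel), Boolean.FALSE);
            }
        }
        return blocked;
    }
}
```

With the map, callers pass the channel key directly and no offset has to be kept consistent between the barrier handler and the input gate.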
Thank you for your feedback. I expanded the commit messages.
rkhachatryan
left a comment
Thanks for this difficult (I guess) re-work @AHeise !:)
The changes LGTM overall, I have only a question about using Map instead of Set.
  /** Flags that indicate whether a channel is currently blocked/buffered. */
- private final boolean[] blockedChannels;
+ private final Map<InputChannelInfo, Boolean> blockedChannels;
Why do we use Map<Key, Boolean> instead of just Set<Key> (also in CheckpointBarrierUnaligner, ThreadSafeUnaligner)?
(I guess we can avoid the penalty of dynamic reallocation by setting set capacity in advance)
Yes, you are right (I tried to explain it in the commit message). Also internally HashSet uses LinkedHashMap, so it's even then slower than using HashMap directly even with dynamic reallocation.
> Also internally HashSet uses LinkedHashMap

Can you explain what you mean?
In my OpenJDK 11 HashSet uses HashMap under the hood, and I believe this is what most implementations do.
Never mind, I misread the code. It's only used while calling LinkedHashSet -> super(HashSet).
Anyways, the current way is used to avoid dynamic reallocations. I can also write a small wrapper Set implementation (similar to EnumSet) if it's too hard to read right now.
Performance-wise, there is no gain from HashSet#contains on an empty set over HashMap#get on a filled map with the same capacity. However, we'd avoid object creations and deletions if we just update the nodes on write access.
I'm not sure if it's not a premature optimization (I think contains is faster for set but put/remove slower; and it all may not be visible).
But I'm also fine with the current approach.
HashSet#contains uses HashMap#containsKey, which fetches the node of the key. Without hash collisions and overflow lists, that is exactly the same. The question is how many collisions we have. It may also depend on whether the respective Set is mostly filled or empty (e.g., blockedChannels should be mostly empty). We might want to potentially invert the current semantics to keep them empty as long as possible.
My main motivation was to avoid adding anything to GC pressure and using mostly a constant data structure as before to not change too much. I think I'd leave it as is for now. We can revise it when we adjust the threading model of Unaligner, which will be a major change on the code anyways.
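For readers following the Map-versus-Set discussion, the sketch below shows one way a small wrapper over a pre-filled `HashMap` could look. It is an illustration only, not code from this PR: `FixedKeyFlags` and its methods are hypothetical names. The assumption is that all keys are known at construction, so writes only replace the value of an existing entry and no map nodes are added or removed afterwards.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical wrapper: a fixed set of keys, each carrying a boolean flag. */
public class FixedKeyFlags<K> {

    private final Map<K, Boolean> flags;

    public FixedKeyFlags(Collection<? extends K> keys) {
        // Size the table so that inserting all keys does not trigger a resize.
        flags = new HashMap<>((int) (keys.size() / 0.75f) + 1);
        for (K key : keys) {
            flags.put(key, Boolean.FALSE);
        }
    }

    /** Updates the flag of a known key; the entry itself is reused. */
    public void set(K key, boolean value) {
        // replace() only touches existing mappings and returns null for unknown keys.
        if (flags.replace(key, value) == null) {
            throw new IllegalArgumentException("Unknown key: " + key);
        }
    }

    public boolean isSet(K key) {
        Boolean value = flags.get(key);
        if (value == null) {
            throw new IllegalArgumentException("Unknown key: " + key);
        }
        return value;
    }

    /** Number of keys, which stays constant after construction. */
    public int size() {
        return flags.size();
    }
}
```

A `HashSet` with a preset capacity would avoid table resizing as well, but `add` and `remove` would still create and discard an entry per call, which is the garbage the comment above wants to avoid. Whether that difference is measurable is left open in the discussion.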
@flinkbot run azure
Thanks for the fix @AHeise . Merging it to master, will probably merge it to release-1.11 after RC2 cut.
Build failure caused by e2e time out issues that we had yesterday/today morning.
What is the purpose of the change
For multi-gate inputs, there existed inconsistent offset handling of BarrierHandlers and CheckpointedInputGates for unaligned checkpoints. This PR unifies the creation of BarrierHandlers and CheckpointedInputGate.
Brief change log
Verifying this change
Covered by existing (modified) tests.
Does this pull request potentially affect one of the following parts:
@Public(Evolving): (yes / no)
Documentation