-
Notifications
You must be signed in to change notification settings - Fork 13.8k
[FLINK-17861][task][checkpointing] Split channel state handles sent to JM #12292
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[FLINK-17861][task][checkpointing] Split channel state handles sent to JM #12292
Conversation
Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community Automated ChecksLast check on commit 96bfb4f (Fri May 22 09:08:06 UTC 2020) Warnings:
Mention the bot in a comment to re-run the automated checks. Review Progress
Please see the Pull Request Review Guide for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required Bot commandsThe @flinkbot bot supports the following commands:
|
96bfb4f
to
14a727c
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Another approach would be to move the underlying state handle one level up (OperatorSubtaskState) and not store references to it in channelStateHandles. It would be more effective (less data duplication) but also more error-prone (implicit structure), less flexible (re-scaling), and require more changes.
Could you elaborate a bit more? What's the alternative? How would it avoid more data duplication? Are we still duplicating data with this PR?
.../test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateCheckpointWriterTest.java
Show resolved
Hide resolved
.../src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateCheckpointWriter.java
Show resolved
Hide resolved
.../src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateCheckpointWriter.java
Outdated
Show resolved
Hide resolved
if (underlying instanceof ByteStreamStateHandle) { | ||
ByteStreamStateHandle byteHandle = (ByteStreamStateHandle) underlying; | ||
return buildHandle.apply( | ||
e.getKey(), | ||
new ByteStreamStateHandle(randomUUID().toString(), serializer.extractAndMerge(byteHandle.getData(), e.getValue())), | ||
singletonList(serializer.getHeaderLength())); | ||
} else { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It doesn't look like a generic/error prone solution. It looks like something should be extracted to the StreamStateHandle
. boolean StreamStateHandle#isDirectlyHoldingData()
? Or we are missing some kind of different abstraction of shared state handles.
Why is this issue new for spilled channel state? What's different for operators state?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added StreamStateHandle.asBytesIfInMemory
that returns Optional<byte[]>
(better names are welcomed ;)
Why is this issue new for spilled channel state? What's different for operators state?
Operators don't share state handles with each other so they don't have this problem.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it's better now, but I still can not put a finger on a feeling that something might not be entirely correct.
Why do we have multiple handles per channel, instead of a single handle that has multiple channels?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is only one InputChannelStateHandle
per channel.
Putting multiple channels could make recovery (rescaling) more difficult.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
After an offline discussion, we decided to keep it for now as it is. Could you create a JIRA ticket to revisit this issue later (linked to how to handle rescaling) and linked it as TODO
here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure. Created a JIRA ticket: https://issues.apache.org/jira/browse/FLINK-17972
14a727c
to
1d9ab48
Compare
Thanks for the feedback @pnowojski , I've addressed the issues (except this one). Answering your question:
Current structure is the following (this PR doesn't change it):
The underlying An alternative would be something like
|
c9a38fe
to
3814750
Compare
My questions probing in the direction of single channel state handle seems to be going in a very similar direction as the alternate solution that you proposed: moving the shared
|
…tRecordingOffsets 1. Set writer position in NetworkBuffer passed to ChannelStateCheckpointWriter.write 2. Reduce state size to fit in the configured MemoryCheckpointOutputStream
… deserializer Motivation: 1. add a method that deserializes and then serializes data 2. simplify
…o JM Before this commit, if Unaligned checkpoints are enabled, channel state is written as state handles. Each channel has a handle and each such handle references the same underlying streamStateHandle (this is done to have a single file per subtask). But, if the state is less then state.backend.fs.memory-threshold, the data is sent directly to JM as a byteStreamHandle. This causes each channel state handle to hold the whole subtask state. This change solves this by extracting relevant potions of the underlying handles if they are byteStreamHandles.
3814750
to
7cbe552
Compare
@flinkbot run azure |
Merging from #12362 . Build was successful there, here e2e timed out because of getting artefacts from mvn cache took very long time |
What is the purpose of the change
If Unaligned checkpoints are enabled, channel state is written as state handles. Each channel has a handle and each such handle references the same underlying
streamStateHandle
(this is done to have a single file per subtask).But, if the state is less then
state.backend.fs.memory-threshold
, the data is sent directly to JM as a byteStreamHandle. This causes each channel state handle to hold the whole subtask state.This PR solves this by extracting relevant potions of the underlying handles if they are
byteStreamHandle
s.Another approach would be to move the underlying state handle one level up (
OperatorSubtaskState
) and not store references to it inchannelStateHandles
. It would be more effective (less data duplication) but also more error-prone (implicit structure), less flexible (re-scaling), and require more changes.Verifying this change
This change is already covered by existing tests, such as:
ChannelStateCheckpointWriterTest.testRecordingOffsets
(the test was corrected)ChannelPersistenceITCase
Does this pull request potentially affect one of the following parts:
@Public(Evolving)
: noDocumentation