-
Notifications
You must be signed in to change notification settings - Fork 182
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[FLINK-19692] Write all assigned key groups into the keyed raw stream with a Header. #168
Conversation
This commit exposes the list of key groups that can be written into the raw keyed stream.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the fixes @igalshilman! Overall the changes looks very good to me.
Had one comment regarding one extra UT, other than that this LGTM.
final Iterable<Integer> assignedKeyGroupIds = | ||
checkpointedStreamOperations.keyGroupList(keyedStateOutputStream); | ||
// the underlying checkpointed raw stream, requires that all key groups assigned | ||
// to this operator must be written to the underlying stream. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: I'm wondering if it makes sense to add a TODO here to help remind us in the future that after FLINK-19748 (allow skipping key groups) is merged, we may choose to revert writing empty key groups?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think that it would be better to write down the empty groups anyways, because they are presented to us on restore.
And having a header there, would help us to differentiate between different versions.
But I can definitely add a TODO: revist this after 19748 is merged.
would that work for you?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don’t have a strong opinion on whether or not the empty key groups should stay there in the long term, so fine by me to keep this as is without the TODO comment to revisit 👍
@@ -79,6 +81,21 @@ public void roundTripWithSpill() throws Exception { | |||
roundTrip(1_000_000, 0); | |||
} | |||
|
|||
@Test | |||
public void testHeader() throws IOException { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As I understand it, this test verifies the header serde round trip, in the case that the header was written.
As a counterpart, could you add a test that verifies Header.skipHeaderSilently
is effectively a no-op if the header was missing in the input stream?
i.e.,
another variant of this test where the line UnboundedFeedbackLogger.Header.writeHeader(out);
is removed should be passing as well.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes definitely. Would add that.
Thanks for addressing my comments @igalshilman! |
This PR allows creating and restoring from savepoints that contain elements in the feedback log, taken as of this PR and moving forward.
Restoring from savepoints taken prior to this PR (with elements in the feedback log) requires a followup work.
This PR changes the on the wire checkpoint format for the feedback log.
For each key group written to the raw keyed stream (empty or not), we now write the following header:
<statefun version: int><statefun magic header: int>
We start with the version 0 and a magic header that is a constant random number. Writing the version and the magic header are made for safety and format evolution.
The version number
0
is carefully chosen as a workaround to FLINK-19692. This is because the InternalTimerManager reads that first 0 (that we wrote as a version) and it will not try to read anything else from the stream.