[FLINK-5096] Make the RollingSink rescalable. #2845
Could not find any functional issues, only had some comments regarding test readability.
testHarness3.processElement(new StreamRecord<>("test4", 0L));
checkFs(outDir, 3, 1, 0, 0);

// intentionally we snapshot them in the reverse order so that the states are shuffled
3->1->2 is not a reverse order
testHarness.setProcessingTime(0L);

testHarness.processElement(new StreamRecord<>("test1", 1L)); // we have a bucket size of 5 bytes
The comment should include the conclusion the reader is supposed to arrive at: that every record goes into a separate file.
testHarness1.initializeState(mergedSnapshot);
testHarness1.open();

// because we do not have a pending for part-2-0
This comment needs some context, as in: X happened "because we do not have a pending for part-2-0".
testHarness2.setup();
testHarness2.open();

testHarness1.processElement(new StreamRecord<>("test1", 1L));
Is there a particular reason why you use 1L here instead of 0L like in testScalingDown?
testHarness2.snapshot(0, 0)
);

testHarness1 = createRescalingTestSink(outDir, 2, 0);
What happens here would be easier to understand if you said which sink receives which states.
Integrates the RollingSink with the new state abstractions so that its parallelism can change after resuming execution from a savepoint.
Thanks for the review, @zentol! I integrated your comments and rebased onto master.
);

//with the above state reshuffling, we expect the new testHarness1
// to take the state of the previous testHarness3 and testHarness2
doesn't harness1 receive the state of 3 and 1?
What do you think about having the testHarness2 process 5 elements? This way one could always accurately deduce which of the subsequent harnesses got which state.
Yes, you are right.
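To make the assignment discussed above easier to follow, here is a minimal sketch of how merged per-subtask states could be handed out after scaling down. It assumes the merged snapshot is split into contiguous, near-equal chunks in snapshot order, which matches the outcome agreed on in this thread (the first new sink receives the states of the old harnesses 3 and 1). The class `EvenSplitSketch`, the method `redistribute`, and the state labels are hypothetical and only illustrate the idea; this is not the Flink implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class EvenSplitSketch {

    // Hypothetical helper: splits the merged states into contiguous chunks,
    // one chunk per new subtask. Earlier subtasks get one extra state when
    // the states do not divide evenly. This is an assumed model, not Flink code.
    static <T> List<List<T>> redistribute(List<T> mergedStates, int newParallelism) {
        if (newParallelism <= 0) {
            throw new IllegalArgumentException("newParallelism must be positive");
        }
        int base = mergedStates.size() / newParallelism;
        int remainder = mergedStates.size() % newParallelism;

        List<List<T>> result = new ArrayList<>();
        int next = 0;
        for (int subtask = 0; subtask < newParallelism; subtask++) {
            int count = base + (subtask < remainder ? 1 : 0);
            result.add(new ArrayList<>(mergedStates.subList(next, next + count)));
            next += count;
        }
        return result;
    }

    public static void main(String[] args) {
        // Snapshots were taken in the order 3 -> 1 -> 2, as noted in the review.
        List<String> merged = Arrays.asList(
                "state-of-harness3", "state-of-harness1", "state-of-harness2");

        // Scale down from 3 subtasks to 2.
        System.out.println(redistribute(merged, 2));
        // prints [[state-of-harness3, state-of-harness1], [state-of-harness2]]
    }
}
```

Under this assumption, having each old harness process a distinct number of elements, as suggested above, makes it possible to tell from the output files alone which new sink restored which state.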
// to take the state of the previous testHarness3 and testHarness2
// while the new testHarness2 will take that of the previous testHarness1

testHarness1 = createRescalingTestSink(outDir, 2, 0);
Let's give the new harnesses new names (i.e., 4 and 5).
Just 2 more comments; I can also fix them while merging if you agree with them.
Hi @zentol! Thanks for the review. I integrated the comments.
Great, will start merging this.
Thanks!
Integrates the RollingSink with the new state abstractions so that its parallelism can change after resuming execution from a savepoint. This closes apache#2845.
This PR integrates the RollingSink with the new state abstractions, so that the sink can
resume execution from a checkpoint with a different parallelism without weakening its
exactly-once guarantees.
R @zentol