New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[FLINK-5985] Report no task states for stateless tasks on checkpointing #3523
[FLINK-5985] Report no task states for stateless tasks on checkpointing #3523
Conversation
The changes look reasonable :) |
I could only try the backported version on the topology that caused the problem initally (that is running 1.2.0) |
@gyfora if the effort is reasonable, it would be great to try this out on your topology. As soon as you give your +1, I could merge this change :-) |
Im gonna try to cherry-pick this on 1.2 and run it today |
Great, thanks! |
There seems to have been some changes in the StreamTask and some tests so I couldn't rebase this nicely. Do you have a minute to take a look and maybe push a branch with the backport please? That would help me a lot. |
Sure, I just quickly prepared a backport here: https://github.com/StefanRRichter/flink/tree/FLINK-5985-backport-to-1.2 |
Hm, doesnt seem to work for the first try. What I did is I updated the client with the new jar based on your backport branch. Redeployed the job with a savepoint (to get the new Flink version), took a savepoint and tried to redeploy with the changed topology. I still seem to get the same error. Is it possible that the previous checkpoints have an effect on this? In any case I will double check tomorrow morning and try to do the test again. |
It also doesnt seem to work starting from a clean state and then savepoint redeploy with changed topology so maybe I am really screwing up something |
@StefanRRichter It seems to work correctly locally, I am trying to see what went wrong with my yarn tests, but this shouldnt block you |
Ah, the reason is probably that I didnt change my job jar, and this relies on changes in the rocks backend |
Ok, then the mystery is finally solved :-) Thanks again for reporting this problem and your additional testing efforts! |
On a second thought, shouldnt the older rocks backend version still work? (I guess that still returns a Done future with null value) |
af45e89
to
20ef549
Compare
Yes, it should still work because the changes on |
Hm, one potential pitfall that I see is operator chaining, in case your stateless operators are chained together with stateful ones. But then again, you said it works locally? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this fix is good.
The StreamTask
could use some cleanup to make tests easier (less whiteboxing needed). We should do that in a separate refactoring, either as a followup to this, or as a preparation for a modified version of this.
What do you think?
* happens by translating an empty {@link SubtaskState} into reporting 'null' to #acknowledgeCheckpoint. | ||
*/ | ||
@Test | ||
public void testEmptySubtaskStateLeadsToStatelessAcknowledgment() throws Exception { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the fact that this test requires extensive whiteboxing means we should move the whole CheckpointOperation
to a separate class and make it work independent of StreamTask
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, we could separate the classes. CheckpointOperation
is already a static inner class anyways. I would suggest to do this in a followup.
I think that verifying the possibility to reconfigure a job with respect to stateless operators warrants an ITCase. Can we extend the |
@StephanEwen, I have added the IT case. Please have another look. |
Can we slightly adapt the test to target more the typical use case:
Otherwise this looks good. |
4cb4900
to
6382484
Compare
6382484
to
f9a9ecf
Compare
Thanks for the review @StephanEwen. I updated the test as suggested. Merging this now. |
This PR fixes [FLINK-5985]. The solution is based on acknowledging
null
instead of some empty SubtaskState toCheckpointCoordinator#acknowledgeCheckpoint(...)
, so that noTaskState
is registered under theJobVertexID
of a stateless task in the checkpoint.