-
Notifications
You must be signed in to change notification settings - Fork 87
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[FLINK-24807][iteration] Fix the issues in checkpoints with iteration #25
[FLINK-24807][iteration] Fix the issues in checkpoints with iteration #25
Conversation
f14e74c
to
29ce569
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @gaoyunhaii for opening this pr. I have some following minor comments below.
BTW I just think that we could not support Batch
execution mode. So I think it would be nice to tell the user when they set the execution mode to Batch
. I am sorry that I find it so late. But I think you could open a separate pr for this.
Thanks
flink-ml-iteration/src/main/java/org/apache/flink/iteration/operator/ReplayOperator.java
Show resolved
Hide resolved
flink-ml-iteration/src/main/java/org/apache/flink/iteration/operator/ReplayOperator.java
Outdated
Show resolved
Hide resolved
flink-ml-iteration/src/test/java/org/apache/flink/iteration/operator/ReplayOperatorTest.java
Show resolved
Hide resolved
flink-ml-iteration/src/test/java/org/apache/flink/iteration/operator/ReplayOperatorTest.java
Outdated
Show resolved
Hide resolved
.../main/java/org/apache/flink/iteration/operator/perround/AbstractPerRoundWrapperOperator.java
Show resolved
Hide resolved
.../main/java/org/apache/flink/iteration/operator/perround/AbstractPerRoundWrapperOperator.java
Show resolved
Hide resolved
.../main/java/org/apache/flink/iteration/operator/perround/AbstractPerRoundWrapperOperator.java
Show resolved
Hide resolved
...-ml-tests/src/test/java/org/apache/flink/test/iteration/BoundedPerRoundCheckpointITCase.java
Outdated
Show resolved
Hide resolved
.../src/test/java/org/apache/flink/test/iteration/operators/TwoInputReducePerRoundOperator.java
Outdated
Show resolved
Hide resolved
…rminating Currently the HeadCoordinator would emit CoordinatorCheckpointEvent to the tasks so that the GloballyAlignedEvent would not be interleave with the checkpoint barrier. Howver, if the tasks are finished and we continue emitting the event, the checkpoint would fail due to there are failed operator events. To address this issue, we would stop emitting CoordinatorCheckpointEvent after the head operator is terminating, namely it received the GloballyAlignedEvent marking terminating.
…e barrier feed back first
29ce569
to
00ec475
Compare
We have met some more issues in supporting checkpoints with iteration:
Specially for the second point, Currently the HeadCoordinator would emit CoordinatorCheckpointEvent to the tasks so that the GloballyAlignedEvent would not be interleave with the checkpoint barrier. However, if the tasks are finished and we continue emitting the event, the checkpoint would fail due to there are failed operator events. To address this issue, we would stop emitting CoordinatorCheckpointEvent after the head operator is terminating, namely it received the GloballyAlignedEvent marking terminating.
The third point is required since operators like
withBroadcast
might rely on raw operator state to snapshot the cached records. This is necessary since the normal operator state always resides in the memory, which might be not enough.