[FLINK-12916][tests] Retry cancelWithSavepoint on cancellation barrier in AbstractOperatorRestoreTestBase #8820
Conversation
Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot; I help the community track review progress. Please see the Pull Request Review Guide for a full explanation of the review process. The bot tracks review progress through labels, which are applied in the order of the review items. For consensus, approval by a Flink committer or PMC member is required.
@flinkbot attention @tillrohrmann, since you first introduced the retry logic.
Force-pushed from 4f0d62c to 4b0f85f.
Force-pushed from fc31bba to a7abc03.
```java
private static final Pattern PATTERN_CANCEL_WITH_SAVEPOINT_TOLERATED_EXCEPTIONS = Pattern.compile(
    Stream.of(
        TRIGGER_SAVEPOINT_FAILURE.message(),
```
I'm not sold on having such a generic exception be added to the set of tolerated exceptions.
IMO, it's okay to trigger the savepoint again when we receive such a general failure while triggering a savepoint. This test targets migration and restore with savepoints; it does not verify why a savepoint failed. My previous commit set the timeout to 300 seconds, but after rebasing onto the latest code the timeout reverted to the previous 10000 seconds. I'd like to ignore this error and change the default timeout to 300 seconds. What do you think?
```java
        NOT_ALL_REQUIRED_TASKS_RUNNING.message(),
        CHECKPOINT_DECLINED_TASK_NOT_READY.message(),
        // If the task is already in state RUNNING while the stream task is not yet running,
        // the stream task would then broadcast the cancellation barrier.
        CHECKPOINT_DECLINED_ON_CANCELLATION_BARRIER.message())
```
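The check sketched in the diff boils down to joining the tolerated failure messages into one regex alternation and testing a failure's message against it. A minimal, self-contained sketch with plain strings standing in for the `CheckpointFailureReason` messages (the wordings marked as assumed below are placeholders; only the cancellation-barrier message is taken from the error log in this thread):

```java
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class ToleratedExceptions {
    // Stand-in messages; in the PR these come from CheckpointFailureReason constants.
    static final Pattern TOLERATED = Pattern.compile(
            Stream.of(
                    "Failed to trigger savepoint",                       // assumed wording
                    "Not all required tasks are currently running",      // assumed wording
                    "Checkpoint was declined (tasks not ready)",         // assumed wording
                    "Task received cancellation from one of its inputs") // from the log below
                    .map(Pattern::quote)               // escape regex metacharacters in the messages
                    .collect(Collectors.joining("|"))); // one alternation over all tolerated messages

    // True if the failure's message matches any tolerated message.
    static boolean isTolerated(Throwable t) {
        return t.getMessage() != null && TOLERATED.matcher(t.getMessage()).find();
    }
}
```

Quoting each message before joining keeps parentheses and other metacharacters in the messages from being interpreted as regex syntax.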
Here's the thing, this case shouldn't be possible in the first place. For a cancel-with-savepoint, we
- disable the checkpoint coordinator
- trigger a savepoint, and
- once the savepoint completes (successfully!), cancel all tasks.
Given that we only cancel tasks if the savepoint has completed, and the savepoint can only complete if all tasks are running, I don't see how we could ever be in a situation where we try to cancel while not all tasks are in a running state.
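The three steps above can be sketched as a future chain. `disableCheckpointCoordinator`, `triggerSavepoint`, and `cancelAllTasks` are hypothetical stand-ins, not Flink's real scheduler API:

```java
import java.util.concurrent.CompletableFuture;

public class CancelWithSavepointSketch {
    // Records the order of operations so the sequencing is visible.
    static final StringBuilder trace = new StringBuilder();

    // Hypothetical hooks; Flink's real implementation differs.
    static void disableCheckpointCoordinator() { trace.append("disable;"); }

    static CompletableFuture<String> triggerSavepoint() {
        trace.append("savepoint;");
        return CompletableFuture.completedFuture("savepoint-path");
    }

    static void cancelAllTasks() { trace.append("cancel;"); }

    static CompletableFuture<String> cancelWithSavepoint() {
        disableCheckpointCoordinator();       // step 1: no new periodic checkpoints
        return triggerSavepoint()             // step 2: take the savepoint
                .thenApply(path -> {          // step 3: cancel only after success
                    cancelAllTasks();
                    return path;
                });
        // If triggerSavepoint() completed exceptionally, thenApply would be skipped,
        // so tasks would never be cancelled on a failed savepoint.
    }
}
```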
As this test uses EXACTLY_ONCE to run checkpoints, we can analyze all the cases that might create a `CancelCheckpointMarker`, using the failed instance from mvn-1.log. I paste the error log below and will use it for the analysis:
```
23:53:31,033 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom Source (1/1) (ea4ff5f207089adad2fb67617ba00a36) switched from DEPLOYING to RUNNING.
23:53:31,036 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 382 @ 1562889211036 for job 5b085f6a3ad6cb824d085deabaee3baf.
23:53:31,038 INFO org.apache.flink.runtime.state.heap.HeapKeyedStateBackend - Initializing heap keyed state backend with stream factory.
23:53:31,040 INFO org.apache.flink.runtime.state.heap.HeapKeyedStateBackend - Initializing heap keyed state backend with stream factory.
23:53:31,042 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Decline checkpoint 382 by task 88f46280796f75b61dc22d811ebf8911 of job 5b085f6a3ad6cb824d085deabaee3baf at 9ec8cafd-f53e-41f8-b1c0-5531d7ff133f @ localhost (dataPort=-1).
23:53:31,042 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Discarding checkpoint 382 of job 5b085f6a3ad6cb824d085deabaee3baf.
org.apache.flink.runtime.checkpoint.CheckpointException: Task received cancellation from one of its inputs
	at org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler.notifyAbortOnCancellationBarrier(CheckpointBarrierHandler.java:96)
	at org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner.processCancellationBarrier(CheckpointBarrierAligner.java:223)
	at org.apache.flink.streaming.runtime.io.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:161)
	at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.pollNextNullable(StreamTaskNetworkInput.java:102)
	at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.pollNextNullable(StreamTaskNetworkInput.java:47)
	at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:135)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.performDefaultAction(StreamTask.java:276)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:298)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:403)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:685)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:515)
	at java.lang.Thread.run(Thread.java:748)
23:53:31,042 INFO org.apache.flink.runtime.state.heap.HeapKeyedStateBackend - Initializing heap keyed state backend with stream factory.
23:53:31,042 INFO org.apache.flink.runtime.state.heap.HeapKeyedStateBackend - Initializing heap keyed state backend with stream factory.
23:53:31,049 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job Flink Streaming Job (5b085f6a3ad6cb824d085deabaee3baf) switched from state RUNNING to CANCELLING.
```
`CheckpointBarrierAligner`:
- Receiving a new checkpoint in `CheckpointBarrierHandler` while older checkpoints have not completed (see code). This is impossible in this case, as checkpoint 382 is the first checkpoint to trigger.
- Processing end of partition. This should not happen in this case, as for the migration job `IntegerTupleSource` would never end by itself (see code).
- Buffered checkpoint size over the limit. This should not happen, as the default `task.checkpoint.alignment.max-size` is `-1`, which means no limit.

Other cases in `CheckpointBarrierAligner` would only process an already created `CheckpointBarrierHandler`, not create the first one.
`StreamTask`: this should be the only place that can cause this case. If the task receives the checkpoint action while `isRunning` in `StreamTask` is still false (see code), that would also create the first `CheckpointBarrierHandler`. If you look at the error log above, you will find a lot of `"Initializing heap keyed state backend with stream factory"` lines before and after that checkpoint failure. This log is printed in `initializeState()` (see code) of `StreamTask`. Note again that the field `isRunning` is still false while `initializeState()` is being called; in other words, it is possible that the `StreamTask` still has `isRunning` set to false when it receives a checkpoint action request.
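The race described above can be illustrated with a toy lifecycle model: `isRunning` only flips to true after state initialization, so a checkpoint trigger that arrives in between is declined. This is a contrived single-threaded sketch, not Flink's real `StreamTask`:

```java
public class StreamTaskRaceSketch {
    // Toy model of the StreamTask lifecycle flag; not Flink's real class.
    static class ToyTask {
        volatile boolean isRunning = false;

        void initializeState() {
            // Heavy state-backend setup happens here; the repeated
            // "Initializing heap keyed state backend" log lines come from this
            // phase, and isRunning is still false throughout.
        }

        // Mirrors the guard described above: while not running, the task declines,
        // a CancelCheckpointMarker is broadcast downstream, and downstream tasks
        // report CHECKPOINT_DECLINED_ON_CANCELLATION_BARRIER.
        String triggerCheckpoint() {
            return isRunning ? "ACK" : "DECLINE";
        }

        void run() {
            isRunning = true; // only from this point do checkpoint triggers succeed
        }
    }
}
```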
In a nutshell, we should ignore the `CHECKPOINT_DECLINED_ON_CANCELLATION_BARRIER` case and trigger the savepoint again.
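Retrying when cancel-with-savepoint fails for a tolerated reason can be sketched as a bounded retry loop; `retryOnTolerated` is a hypothetical helper, not the test base's real method:

```java
import java.util.function.Predicate;
import java.util.function.Supplier;

public class RetrySketch {
    // Hypothetical helper: re-runs the action while it fails with a tolerated
    // exception, up to maxAttempts. Non-tolerated failures surface immediately.
    static <T> T retryOnTolerated(Supplier<T> action, Predicate<Throwable> tolerated, int maxAttempts) {
        if (maxAttempts <= 0) {
            throw new IllegalArgumentException("maxAttempts must be positive");
        }
        RuntimeException last = null;
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            try {
                return action.get();
            } catch (RuntimeException e) {
                if (!tolerated.test(e)) {
                    throw e; // unexpected failure: do not retry
                }
                last = e;    // tolerated, e.g. declined on a cancellation barrier: retry
            }
        }
        throw last; // tolerated failures exhausted all attempts
    }
}
```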
Ok, so `CHECKPOINT_DECLINED_ON_CANCELLATION_BARRIER` does not signal the cancellation of a task (which is what I assumed), but that a checkpoint was being canceled?
Yes, it represents the checkpoint failing due to receiving a `CancelCheckpointMarker`.
Force-pushed from a7abc03 to 27b5913.
Force-pushed from 27b5913 to 0f35675.
What is the purpose of the change
Add retry logic for the checkpoint failure reason on a cancellation barrier.
Brief change log
- `AbstractOperatorRestoreTestBase`

Verifying this change
This change is already covered by existing tests, such as `AbstractOperatorRestoreTestBase`.

Does this pull request potentially affect one of the following parts:
- `@Public(Evolving)`: no

Documentation