[FLINK-32514] Support configuring checkpointing interval during process backlog #22931
@flinkbot run azure
@lindong28 could you please take a look at this PR?
Thanks for the PR. Left some comments below.
...src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumerator.java
flink-core/src/main/java/org/apache/flink/api/connector/source/SplitEnumeratorContext.java
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java (5 threads)
...src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java (2 threads)
...treaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
@flinkbot run azure
Thanks for the update. Left some comments below.
...time/src/main/java/org/apache/flink/runtime/scheduler/DefaultOperatorCoordinatorHandler.java
...src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java
Thanks for the update. Left some comments below.
...rc/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointCoordinatorConfiguration.java (2 threads)
...ming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java (13 threads)
Thanks for the update. Left some comments below.
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java (4 threads)
flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java (2 threads)
...src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumerator.java
@flinkbot run azure
Thanks for the update. Left some comments below.
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
...ntime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinator.java
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/VertexEndOfDataListener.java
Thanks for the update. Left some comments below.
flink-runtime/src/main/java/org/apache/flink/runtime/source/event/EndOfDataEvent.java
...-core/src/test/java/org/apache/flink/util/concurrent/ManuallyTriggeredScheduledExecutor.java
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
...-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointRequestDeciderTest.java
...src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java
...ests/src/test/java/org/apache/flink/test/util/NumberSequenceSourceBlockableByCheckpoint.java (2 threads)
docs/layouts/shortcodes/generated/execution_checkpointing_configuration.html
.../src/main/java/org/apache/flink/streaming/api/environment/ExecutionCheckpointingOptions.java
Thanks for the update. LGTM.
context.lazyInitialize(globalFailureHandler, mainThreadExecutor);
context.lazyInitialize(globalFailureHandler, mainThreadExecutor, checkpointCoordinator);
@lindong28 @yunfengzhou-hub Is initializing the OperatorCoordinatorHolder twice done deliberately here? 🤔
@XComp Thanks for catching this. No, we didn't do it intentionally. I believe it was introduced when Yunfeng rebased the PR. I missed it because I didn't go over the entire PR end-to-end very carefully when there were only minor comments remaining in the last 2 rounds of review.
It seems that the extra invocation of lazyInitialize() would not introduce any visible performance or correctness issue. Maybe one of us can fix it in our next PR.
Thanks for the clarification. Doing it as a hotfix commit in another PR makes sense for the reasons you mentioned. 👍
What is the purpose of the change
This pull request allows a Flink job to change its checkpointing interval depending on whether the job is processing backlog.
Brief change log
execution.checkpointing.interval-during-backlog
SplitEnumeratorContext#setIsProcessingBacklog
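To illustrate how the two items above might fit together, here is a minimal sketch in plain Java. It does not use Flink's classes: `BacklogIntervalSketch`, its constructor, and `effectiveInterval` are hypothetical names made up for this illustration, and the selection rule (use the backlog interval while the backlog flag is set, otherwise the regular interval) is an assumption drawn from the description of this change, not a copy of the actual `CheckpointCoordinator` logic.

```java
import java.time.Duration;
import java.util.Objects;

/**
 * Hypothetical, simplified model of interval selection. Not Flink code.
 * "interval" stands in for the regular checkpointing interval, and
 * "intervalDuringBacklog" for execution.checkpointing.interval-during-backlog.
 */
final class BacklogIntervalSketch {
    private final Duration interval;
    private final Duration intervalDuringBacklog;
    private boolean processingBacklog = false;

    public BacklogIntervalSketch(Duration interval, Duration intervalDuringBacklog) {
        this.interval = Objects.requireNonNull(interval);
        this.intervalDuringBacklog = Objects.requireNonNull(intervalDuringBacklog);
    }

    /** Stands in for a source reporting its status via SplitEnumeratorContext#setIsProcessingBacklog. */
    public void setIsProcessingBacklog(boolean isProcessingBacklog) {
        this.processingBacklog = isProcessingBacklog;
    }

    /** Assumed rule: the backlog interval applies only while backlog is being processed. */
    public Duration effectiveInterval() {
        return processingBacklog ? intervalDuringBacklog : interval;
    }

    public static void main(String[] args) {
        BacklogIntervalSketch sketch =
                new BacklogIntervalSketch(Duration.ofSeconds(30), Duration.ofMinutes(10));
        System.out.println(sketch.effectiveInterval()); // prints PT30S
        sketch.setIsProcessingBacklog(true);
        System.out.println(sketch.effectiveInterval()); // prints PT10M
        sketch.setIsProcessingBacklog(false);
        System.out.println(sketch.effectiveInterval()); // prints PT30S
    }
}
```

In a real job the option would be set through the job configuration and the flag would be set by the source's split enumerator; the 30-second and 10-minute values here are arbitrary.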
Verifying this change
This change added tests and can be verified as follows:
Does this pull request potentially affect one of the following parts:
@Public(Evolving): yes
Documentation