[FLINK-35159] Transition ExecutionGraph to RUNNING after slot assignment #24680
zentol merged 2 commits into apache:master
meh, stop-with-savepoint failures need to be able to transition back into executing without recreating the EG :/
Blocking edges aren't supported by the AdaptiveScheduler in the first place, so there's no point in testing what happens when a savepoint is triggered for a job with blocking edges. This wasn't caught earlier because the test wasn't very good to start with.
dmvk left a comment
LGTM 👍 Thanks for digging up the root cause!
I was initially a bit worried about the testing part, but it feels like hardening testNotPossibleSlotAssignmentTransitionsToWaitingForResources should be enough, because the smoke tests should cover that this didn't break anything.
...me/src/test/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraphTest.java
    }

    @Test
    void testSavepointFailsWhenBlockingEdgeExists() throws Exception {
do you know why / when this was introduced? it indeed doesn't seem to be related to the AS; is this tested somewhere else?
This was added in FLINK-34371 and is covered by tests in the DefaultScheduler:
d4e0084#diff-b4bc1cd606feb86850a18371520b5dd63d02090b8567fdf80730d1d7dd6e693d
    getGraph(new StateTrackingMockExecutionGraph()));
    getGraph(executionGraph));

    assertThat(executionGraph.getState()).isEqualTo(JobStatus.INITIALIZING);
iiuc, the graph would be running before the change; makes sense
the graph would be running before the change
yes
Transitioning the ExecutionGraph to running implicitly starts periodic checkpoint triggering. This should only occur after all initialization steps have completed.
The existing code could leak the CheckpointCoordinator if '#handleExecutionGraphCreation' fails to 'tryToAssignSlots'. This eventually leads to a checkpoint being triggered for the old ExecutionGraph, whose operator holders were never initialized, and the resulting exception crashes the JM.
We now force this to happen in the Executing constructor, at which point all the initialization steps must already be complete, and we can be sure that the EG gets transitioned into a terminal state when an error occurs (implicitly disabling checkpointing again).

We now do this only after slots have been assigned, right before the transition into Executing. At that point we either transition into executing (where we can be sure we'll transition into a terminal state eventually) or fail hard if an error occurs.
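The ordering problem described above can be illustrated with a small toy model. This is only a sketch under stated assumptions: `ToyExecutionGraph`, `ToyJobStatus`, `handleCreationOld` and `handleCreationNew` are hypothetical names invented for illustration and are not Flink classes or the code changed in this PR. The only behavior taken from the description is that transitioning to RUNNING starts periodic checkpointing as a side effect, and that slot assignment can fail.

```java
import java.util.function.BooleanSupplier;

// Toy model only: hypothetical stand-ins, not Flink's JobStatus or ExecutionGraph.
enum ToyJobStatus { INITIALIZING, RUNNING }

final class ToyExecutionGraph {
    private ToyJobStatus state = ToyJobStatus.INITIALIZING;
    private boolean periodicCheckpointsStarted = false;

    // Models the side effect from the description:
    // switching to RUNNING implicitly starts periodic checkpoint triggering.
    void transitionToRunning() {
        state = ToyJobStatus.RUNNING;
        periodicCheckpointsStarted = true;
    }

    ToyJobStatus getState() { return state; }

    boolean periodicCheckpointsStarted() { return periodicCheckpointsStarted; }
}

final class SlotAssignmentOrderingSketch {

    // Problematic ordering: RUNNING first, slot assignment second.
    // If assignment fails, the discarded graph keeps checkpointing.
    static boolean handleCreationOld(ToyExecutionGraph graph, BooleanSupplier tryToAssignSlots) {
        graph.transitionToRunning();
        return tryToAssignSlots.getAsBoolean();
    }

    // Ordering after the change: RUNNING only once slots are assigned.
    static boolean handleCreationNew(ToyExecutionGraph graph, BooleanSupplier tryToAssignSlots) {
        if (!tryToAssignSlots.getAsBoolean()) {
            return false; // graph stays INITIALIZING, no checkpointing started
        }
        graph.transitionToRunning();
        return true;
    }

    public static void main(String[] args) {
        BooleanSupplier failingAssignment = () -> false;

        ToyExecutionGraph oldGraph = new ToyExecutionGraph();
        handleCreationOld(oldGraph, failingAssignment);
        System.out.println("old: " + oldGraph.getState()
                + ", checkpointing=" + oldGraph.periodicCheckpointsStarted());

        ToyExecutionGraph newGraph = new ToyExecutionGraph();
        handleCreationNew(newGraph, failingAssignment);
        System.out.println("new: " + newGraph.getState()
                + ", checkpointing=" + newGraph.periodicCheckpointsStarted());
    }
}
```

With a failing slot assignment, the old ordering leaves the toy graph in RUNNING with checkpointing active, while the new ordering leaves it in INITIALIZING, which is the same condition the hardened test asserts on the real ExecutionGraph.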