Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,19 @@ private void progressToStabilized(Temporal firstChangeEventTimestamp) {

private void triggerTransitionToSubsequentState() {
progressToPhase(new Transitioning(clock, this));
transitionContext.transitionToSubsequentState();
try {
transitionContext.transitionToSubsequentState();
} catch (Throwable t) {
LOG.warn(
"Failed to transition to subsequent state for job {}. "
+ "Resetting to Idling phase to allow future transition attempts.",
getJobId(),
t);
// Reset phase directly (bypassing progressToPhase guard) so the manager
// can respond to future resource changes and retry the transition.
phase = new Idling(clock, this);
throw t;
}
}

private void progressToPhase(Phase newPhase) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,42 @@ void testRevokedChangeInStabilizedPhase() {
assertPhaseWithoutStateTransition(ctx, testInstance, Idling.class);
}

@Test
void testManagerResetsToIdlingWhenTransitionToSubsequentStateFails() {
final TestingStateTransitionManagerContext ctx =
TestingStateTransitionManagerContext.stableContext().withDesiredResources();
// Make transitionToSubsequentState() throw to simulate e.g. OOM during
// ExecutionGraph creation (FLINK-38997).
ctx.failOnTransition(new RuntimeException("Simulated failure during state transition"));
final DefaultStateTransitionManager testInstance =
ctx.createTestInstanceThatPassedCooldownPhase();

assertPhaseWithoutStateTransition(ctx, testInstance, Idling.class);

// Trigger a change event to move to Stabilizing
testInstance.onChange();
assertPhaseWithoutStateTransition(ctx, testInstance, Stabilizing.class);

// onTrigger would normally transition to Transitioning, but the context throws
try {
testInstance.onTrigger();
} catch (RuntimeException expected) {
// expected: the exception from transitionToSubsequentState propagates
}

// The manager should have recovered to Idling, NOT stuck in Transitioning
assertThat(testInstance.getPhase()).isInstanceOf(Idling.class);

// Now stop failing and verify the manager can still process events
ctx.stopFailingOnTransition();
ctx.clearStateTransition();
ctx.withDesiredResources();
testInstance.onChange();
assertPhaseWithoutStateTransition(ctx, testInstance, Stabilizing.class);
testInstance.onTrigger();
assertFinalStateTransitionHappened(ctx, testInstance);
}

@Test
void testScheduledTaskBeingIgnoredAfterStateChanged() {
final TestingStateTransitionManagerContext ctx =
Expand Down Expand Up @@ -459,6 +495,7 @@ private static class TestingStateTransitionManagerContext

// internal state used for assertions
private final AtomicBoolean transitionTriggered = new AtomicBoolean();
private RuntimeException transitionFailure = null;
private final SortedMap<Instant, List<ScheduledTask<Object>>> scheduledTasks =
new TreeMap<>();

Expand Down Expand Up @@ -507,6 +544,14 @@ public TestingStateTransitionManagerContext withSufficientResources() {
return this;
}

public void failOnTransition(RuntimeException failure) {
this.transitionFailure = failure;
}

public void stopFailingOnTransition() {
this.transitionFailure = null;
}

// ///////////////////////////////////////////////
// StateTransitionManager.Context interface methods
// ///////////////////////////////////////////////
Expand All @@ -524,6 +569,9 @@ public boolean hasDesiredResources() {
@Override
public void transitionToSubsequentState() {
transitionTriggered.set(true);
if (transitionFailure != null) {
throw transitionFailure;
}
}

@Override
Expand Down