Skip to content

Commit

Permalink
[FLINK-22367][streaming] Reset syncSavepointId only if it is equal to…
Browse files Browse the repository at this point in the history
… checkpoint id from event
  • Loading branch information
akalash committed Sep 1, 2021
1 parent 39d834f commit 1f41e4f
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -430,7 +430,10 @@ private void resetSynchronousSavepointId(long id, boolean succeeded) {
activeSyncSavepointId = null;
operatorChain.setIgnoreEndOfInput(false);
}
syncSavepointId = null;

if (syncSavepointId != null && syncSavepointId <= id) {
syncSavepointId = null;
}
}

private void setSynchronousSavepointId(long checkpointId, boolean ignoreEndOfInput) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import org.apache.flink.runtime.shuffle.ShuffleEnvironment;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend;
import org.apache.flink.runtime.state.DoneFuture;
Expand Down Expand Up @@ -1250,6 +1251,42 @@ public void testCheckpointDeclinedOnClosedOperator() throws Throwable {
assertEquals(1, harness.getCheckpointResponder().getDeclineReports().size());
}

@Test
public void testAbortPreviousCheckpointBeforeCompleteTerminateSavepoint() throws Throwable {
// given: Marker that the task is finished.
AtomicBoolean finishTask = new AtomicBoolean();
StreamTaskMailboxTestHarnessBuilder<Integer> builder =
new StreamTaskMailboxTestHarnessBuilder<>(
(env) ->
new OneInputStreamTask<Integer, Integer>(env) {
@Override
protected void finishTask() throws Exception {
super.finishTask();
finishTask.set(true);
}
},
BasicTypeInfo.INT_TYPE_INFO)
.addInput(BasicTypeInfo.INT_TYPE_INFO);
StreamTaskMailboxTestHarness<Integer> harness =
builder.setupOutputForSingletonOperatorChain(
new TestBoundedOneInputStreamOperator())
.build();

// when: Receiving the abort notification of the previous checkpoint before the complete
// notification of the savepoint terminate.
harness.streamTask.notifyCheckpointAbortAsync(1);
harness.streamTask.notifyCheckpointCompleteAsync(2);
harness.streamTask.triggerCheckpointOnBarrier(
new CheckpointMetaData(2, 0),
new CheckpointOptions(
CheckpointType.SAVEPOINT_TERMINATE,
CheckpointStorageLocationReference.getDefault()),
new CheckpointMetricsBuilder());

// then: The task should be finished.
assertEquals(true, finishTask.get());
}

@Test
public void testExecuteMailboxActionsAfterLeavingInputProcessorMailboxLoop() throws Exception {
OneShotLatch latch = new OneShotLatch();
Expand Down

0 comments on commit 1f41e4f

Please sign in to comment.