Skip to content

Commit

Permalink
[FLINK-28474][checkpoint] Fix the bug that ChannelStateWriteResult might not fail after checkpoint abort
Browse files Browse the repository at this point in the history
  • Loading branch information
1996fanrui committed Jul 9, 2022
1 parent 5a25b64 commit 9b618c5
Show file tree
Hide file tree
Showing 4 changed files with 64 additions and 1 deletion.
Expand Up @@ -18,6 +18,7 @@
package org.apache.flink.runtime.checkpoint.channel;

import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.state.InputChannelStateHandle;
Expand Down Expand Up @@ -72,7 +73,7 @@ public void fail(Throwable e) {
resultSubpartitionStateHandles.completeExceptionally(e);
}

boolean isDone() {
/**
 * Reports whether both state-handle futures of this result have finished.
 *
 * @return {@code true} only when {@code inputChannelStateHandles} and {@code
 *     resultSubpartitionStateHandles} are both done
 */
public boolean isDone() {
    // Guard clause: the second future is only consulted once the first one is done.
    if (!inputChannelStateHandles.isDone()) {
        return false;
    }
    return resultSubpartitionStateHandles.isDone();
}
}
Expand Down
Expand Up @@ -18,6 +18,7 @@
package org.apache.flink.runtime.checkpoint.channel;

import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.state.CheckpointStateOutputStream;
Expand Down Expand Up @@ -223,6 +224,12 @@ public ChannelStateWriteResult getAndRemoveWriteResult(long checkpointId) {
return result;
}

/**
 * Looks up the write result currently registered in {@code results} for the given checkpoint
 * without removing it. Intended for tests only.
 *
 * @param checkpointId id of the checkpoint whose write result is requested
 * @return whatever {@code results} holds for {@code checkpointId}, possibly {@code null}
 */
@VisibleForTesting
public ChannelStateWriteResult getWriteResult(long checkpointId) {
    final ChannelStateWriteResult registered = results.get(checkpointId);
    return registered;
}

/** Opens this writer by starting its {@code executor}. */
public void open() {
executor.start();
}
Expand Down
Expand Up @@ -316,6 +316,10 @@ public void checkpointState(
// broadcast cancel checkpoint marker to avoid downstream back-pressure due to
// checkpoint barrier align.
operatorChain.broadcastEvent(new CancelCheckpointMarker(metadata.getCheckpointId()));
channelStateWriter.abort(
metadata.getCheckpointId(),
new CancellationException("checkpoint aborted via notification"),
true);
LOG.info(
"Checkpoint {} has been notified as aborted, would not trigger any checkpoint.",
metadata.getCheckpointId());
Expand Down
Expand Up @@ -81,11 +81,15 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

import static org.apache.flink.runtime.checkpoint.CheckpointOptions.unaligned;
import static org.apache.flink.runtime.checkpoint.CheckpointType.CHECKPOINT;
import static org.apache.flink.runtime.state.ChannelPersistenceITCase.getStreamFactoryFactory;
import static org.apache.flink.runtime.state.CheckpointStorageLocationReference.getDefault;
import static org.apache.flink.shaded.guava30.com.google.common.util.concurrent.MoreExecutors.newDirectExecutorService;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

Expand Down Expand Up @@ -568,6 +572,53 @@ public void snapshotState(
}
}

/**
 * Checks that a channel-state write result created for a checkpoint that was already notified
 * as aborted is neither leaked in the writer nor left incomplete.
 *
 * <p>Sequence: the abort notification arrives first, then {@code initInputsCheckpoint}
 * registers a fresh (not yet done) write result, and finally {@code checkpointState} is called
 * for the aborted checkpoint. Afterwards the writer must no longer hold the result, and both of
 * the result's futures must be completed exceptionally.
 */
@Test
public void testChannelStateWriteResultLeakAndNotFailAfterCheckpointAborted() throws Exception {
    MockEnvironment mockEnvironment = MockEnvironment.builder().build();
    final OperatorChain<?, ?> operatorChain = getOperatorChain(mockEnvironment);
    String taskName = "test";
    ChannelStateWriterImpl writer =
            new ChannelStateWriterImpl(taskName, 0, getStreamFactoryFactory());
    writer.open();
    SubtaskCheckpointCoordinator coordinator =
            new SubtaskCheckpointCoordinatorImpl(
                    new TestCheckpointStorageWorkerView(100),
                    taskName,
                    StreamTaskActionExecutor.IMMEDIATE,
                    newDirectExecutorService(),
                    new DummyEnvironment(),
                    (unused1, unused2) -> {},
                    (unused1, unused2) -> CompletableFuture.completedFuture(null),
                    128,
                    writer,
                    true,
                    (callable, duration) -> () -> {});
    int checkpointId = 1;
    // The abort notification is delivered before the checkpoint is started on this subtask.
    coordinator.notifyCheckpointAborted(checkpointId, operatorChain, () -> true);

    // Starting the inputs checkpoint registers a write result that is still pending.
    coordinator.initInputsCheckpoint(
            checkpointId, unaligned(CheckpointType.CHECKPOINT, getDefault()));
    ChannelStateWriter.ChannelStateWriteResult writeResult =
            writer.getWriteResult(checkpointId);
    assertNotNull(writeResult);
    assertFalse(writeResult.isDone());
    assertFalse(writeResult.getInputChannelStateHandles().isCompletedExceptionally());
    assertFalse(writeResult.getResultSubpartitionStateHandles().isCompletedExceptionally());

    coordinator.checkpointState(
            new CheckpointMetaData(checkpointId, System.currentTimeMillis()),
            CheckpointOptions.forCheckpointWithDefaultLocation(),
            new CheckpointMetricsBuilder(),
            operatorChain,
            false,
            () -> true);
    // The aborted checkpoint's result must have been removed from the writer.
    assertNull(writer.getWriteResult(checkpointId));

    // The result may be failed asynchronously, so wait (bounded) for both futures to finish
    // instead of sleeping for a fixed interval. exceptionally(...) swallows the expected
    // failure so that only a timeout makes this wait itself throw.
    CompletableFuture.allOf(
                    writeResult.getInputChannelStateHandles(),
                    writeResult.getResultSubpartitionStateHandles())
            .exceptionally(ignored -> null)
            .get(10, TimeUnit.SECONDS);

    assertTrue(writeResult.isDone());
    assertTrue(writeResult.getInputChannelStateHandles().isCompletedExceptionally());
    assertTrue(writeResult.getResultSubpartitionStateHandles().isCompletedExceptionally());
}

private OperatorChain<?, ?> getOperatorChain(MockEnvironment mockEnvironment) throws Exception {
return new RegularOperatorChain<>(
new MockStreamTaskBuilder(mockEnvironment).build(), new NonRecordWriter<>());
Expand Down

0 comments on commit 9b618c5

Please sign in to comment.