From b46031bf9e679000b807357ff33ca2af116bc401 Mon Sep 17 00:00:00 2001 From: Yun Tang Date: Tue, 16 Jun 2020 19:49:58 +0800 Subject: [PATCH] [FLINK-18238][checkpoint] Emit CancelCheckpointMarker downstream on checkpointState in sync phase of checkpoint on task side --- .../SubtaskCheckpointCoordinatorImpl.java | 4 +- .../SubtaskCheckpointCoordinatorTest.java | 75 +++++++++++++++++++ 2 files changed, 78 insertions(+), 1 deletion(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java index b2fe147fc1e27..0d6d638cfda0e 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java @@ -235,9 +235,11 @@ public void checkpointState( return; } - // Step (0): Record the last triggered checkpointId. + // Step (0): Record the last triggered checkpointId and abort the sync phase of checkpoint if necessary. lastCheckpointId = metadata.getCheckpointId(); if (checkAndClearAbortedStatus(metadata.getCheckpointId())) { + // broadcast cancel checkpoint marker to avoid downstream back-pressure due to checkpoint barrier align. + operatorChain.broadcastEvent(new CancelCheckpointMarker(metadata.getCheckpointId())); LOG.info("Checkpoint {} has been notified as aborted, would not trigger any checkpoint.", metadata.getCheckpointId()); return; } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorTest.java index f7655ea892eac..00ef9a16746d3 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorTest.java @@ -18,6 +18,9 @@ package org.apache.flink.streaming.runtime.tasks; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeutils.base.StringSerializer; import org.apache.flink.core.fs.CloseableRegistry; import org.apache.flink.core.testutils.OneShotLatch; import org.apache.flink.metrics.MetricGroup; @@ -28,7 +31,11 @@ import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter; import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriterImpl; import org.apache.flink.runtime.event.AbstractEvent; +import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker; import org.apache.flink.runtime.io.network.api.writer.NonRecordWriter; +import org.apache.flink.runtime.io.network.api.writer.RecordOrEventCollectingResultPartitionWriter; +import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter; +import org.apache.flink.runtime.io.network.util.TestPooledBufferProvider; import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.operators.testutils.DummyEnvironment; import org.apache.flink.runtime.operators.testutils.MockEnvironment; @@ -39,12 +46,15 @@ import org.apache.flink.runtime.state.SnapshotResult; import org.apache.flink.runtime.state.TestCheckpointStorageWorkerView; import org.apache.flink.runtime.state.TestTaskStateManager; +import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.operators.OperatorSnapshotFutures; +import org.apache.flink.streaming.api.operators.StreamMap; import org.apache.flink.streaming.api.operators.StreamTaskStateInitializer; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker; +import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.tasks.StreamTaskTest.NoOpStreamTask; import org.apache.flink.streaming.util.MockStreamTaskBuilder; @@ -53,6 +63,8 @@ import org.junit.Test; import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; @@ -218,6 +230,69 @@ public void testNotifyCheckpointAbortedBeforeAsyncPhase() throws Exception { assertEquals(0, subtaskCheckpointCoordinator.getAsyncCheckpointRunnableSize()); } + @Test + public void testBroadcastCancelCheckpointMarkerOnAbortingFromCoordinator() throws Exception { + OneInputStreamTaskTestHarness testHarness = + new OneInputStreamTaskTestHarness<>( + OneInputStreamTask::new, + 1, + 1, + BasicTypeInfo.STRING_TYPE_INFO, + BasicTypeInfo.STRING_TYPE_INFO); + + testHarness.setupOutputForSingletonOperatorChain(); + StreamConfig streamConfig = testHarness.getStreamConfig(); + streamConfig.setStreamOperator(new MapOperator()); + + testHarness.invoke(); + testHarness.waitForTaskRunning(); + + MockEnvironment mockEnvironment = MockEnvironment.builder().build(); + SubtaskCheckpointCoordinator subtaskCheckpointCoordinator = new MockSubtaskCheckpointCoordinatorBuilder() + .setEnvironment(mockEnvironment) + .build(); + + TestPooledBufferProvider bufferProvider = new TestPooledBufferProvider(1, 4096); + ArrayList recordOrEvents = new ArrayList<>(); + StreamElementSerializer stringStreamElementSerializer = new StreamElementSerializer<>(StringSerializer.INSTANCE); + ResultPartitionWriter resultPartitionWriter = new RecordOrEventCollectingResultPartitionWriter<>( + recordOrEvents, bufferProvider, stringStreamElementSerializer); + mockEnvironment.addOutputs(Collections.singletonList(resultPartitionWriter)); + + OneInputStreamTask task = testHarness.getTask(); + OperatorChain> operatorChain = new OperatorChain<>( + task, StreamTask.createRecordWriterDelegate(streamConfig, mockEnvironment)); + long checkpointId = 42L; + // notify checkpoint aborted before execution. + subtaskCheckpointCoordinator.notifyCheckpointAborted(checkpointId, operatorChain, () -> true); + subtaskCheckpointCoordinator.checkpointState( + new CheckpointMetaData(checkpointId, System.currentTimeMillis()), + CheckpointOptions.forCheckpointWithDefaultLocation(), + new CheckpointMetrics(), + operatorChain, + () -> true); + + assertEquals(1, recordOrEvents.size()); + Object recordOrEvent = recordOrEvents.get(0); + // ensure CancelCheckpointMarker is broadcast downstream. + assertTrue(recordOrEvent instanceof CancelCheckpointMarker); + assertEquals(checkpointId, ((CancelCheckpointMarker) recordOrEvent).getCheckpointId()); + testHarness.endInput(); + testHarness.waitForTaskCompletion(); + } + + private static class MapOperator extends StreamMap { + private static final long serialVersionUID = 1L; + + public MapOperator() { + super((MapFunction) value -> value); + } + + @Override + public void notifyCheckpointAborted(long checkpointId) throws Exception { + } + } + @Test public void testNotifyCheckpointAbortedDuringAsyncPhase() throws Exception { MockEnvironment mockEnvironment = MockEnvironment.builder().build();