From baa05e145694faba806c3f1dfdc7160061ae8e45 Mon Sep 17 00:00:00 2001 From: Roman Khachatryan Date: Fri, 29 Jan 2021 16:46:15 +0100 Subject: [PATCH 1/5] [FLINK-21132][runtime] Don't end input on stop with savepoint EndOfInput was used to handle any stopping of the job. When stopping with savepoint, the input is not actually ended. This causes issues with some sinks (e.g. Iceberg). With this change, endInput is not called for stop-with-savepoint. To differentiate stop-with-savepoint from other cases, only checkpoints (RPC/barriers) are considered and not network EOP. That's enough because EOP is only injected after the CP completion (i.e. when the downstream is also notified of the sync savepoint by CP barriers). --- .../runtime/tasks/OperatorChain.java | 10 +- .../runtime/tasks/SourceStreamTask.java | 9 ++ .../runtime/tasks/StreamOperatorWrapper.java | 11 ++- .../streaming/runtime/tasks/StreamTask.java | 25 ++++- .../tasks/StreamOperatorWrapperTest.java | 4 +- .../runtime/tasks/StreamTaskTest.java | 97 ++++++++++++++++++- .../TestBoundedOneInputStreamOperator.java | 11 +++ 7 files changed, 150 insertions(+), 17 deletions(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java index 68514e8f16727..8c59f7542d8eb 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java @@ -120,6 +120,8 @@ public class OperatorChain> private final OperatorEventDispatcherImpl operatorEventDispatcher; + private boolean isStoppingBySyncSavepoint; + /** * Current status of the input stream of the operator chain. Watermarks explicitly generated by * operators in the chain (i.e. timestamp assigner / watermark extractors), will be blocked and @@ -408,7 +410,7 @@ public void prepareSnapshotPreBarrier(long checkpointId) throws Exception { */ @Override public void endInput(int inputId) throws Exception { - if (mainOperatorWrapper != null) { + if (mainOperatorWrapper != null && !isStoppingBySyncSavepoint) { mainOperatorWrapper.endOperatorInput(inputId); } } @@ -434,7 +436,7 @@ protected void initializeStateAndOpenOperators( */ protected void closeOperators(StreamTaskActionExecutor actionExecutor) throws Exception { if (firstOperatorWrapper != null) { - firstOperatorWrapper.close(actionExecutor); + firstOperatorWrapper.close(actionExecutor, isStoppingBySyncSavepoint); } } @@ -745,6 +747,10 @@ StreamOperator getTailOperator() { return (tailOperatorWrapper == null) ? null : tailOperatorWrapper.getStreamOperator(); } + public void setIsStoppingBySyncSavepoint(boolean stoppingBySyncSavepoint) { + this.isStoppingBySyncSavepoint = stoppingBySyncSavepoint; + } + /** Wrapper class to access the chained sources and their's outputs.
*/ public static class ChainedSource { private final WatermarkGaugeExposingOutput> chainedSourceOutput; diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java index 2cb6902ebecd5..b709299d789d8 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java @@ -172,7 +172,16 @@ protected void processInput(MailboxDefaultAction.Controller controller) throws E new CancelTaskException(sourceThreadThrowable)); } else if (!isFinished && sourceThreadThrowable != null) { mailboxProcessor.reportThrowable(sourceThreadThrowable); + } else if (sourceThreadThrowable != null + || isCanceled() + || isFinished) { + mailboxProcessor.allActionsCompleted(); } else { + // this is a "true" end of input regardless of whether + // stop-with-savepoint was issued or not + synchronized (lock) { + operatorChain.setIsStoppingBySyncSavepoint(false); + } mailboxProcessor.allActionsCompleted(); } }); diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamOperatorWrapper.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamOperatorWrapper.java index 8c15174a3fcd8..401bc89ade221 100755 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamOperatorWrapper.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamOperatorWrapper.java @@ -39,8 +39,8 @@ * This class handles the close, endInput and other related logic of a {@link StreamOperator}. It * also automatically propagates the close operation to the next wrapper that the {@link #next} * points to, so we can use {@link #next} to link all operator wrappers in the operator chain and - * close all operators only by calling the {@link #close(StreamTaskActionExecutor)} method of the - * header operator wrapper. + * close all operators only by calling the {@link #close(StreamTaskActionExecutor, boolean)} method + * of the header operator wrapper. */ @Internal public class StreamOperatorWrapper> { @@ -120,8 +120,9 @@ void setNext(StreamOperatorWrapper next) { * MailboxExecutor#yield()} to take the mails of closing operator and running timers and run * them. */ - public void close(StreamTaskActionExecutor actionExecutor) throws Exception { - if (!isHead) { + public void close(StreamTaskActionExecutor actionExecutor, boolean isStoppingBySyncSavepoint) + throws Exception { + if (!isHead && !isStoppingBySyncSavepoint) { // NOTE: This only do for the case where the operator is one-input operator. At present, // any non-head operator on the operator chain is one-input operator. 
actionExecutor.runThrowing(() -> endOperatorInput(1)); @@ -131,7 +132,7 @@ public void close(StreamTaskActionExecutor actionExecutor) throws Exception { // propagate the close operation to the next wrapper if (next != null) { - next.close(actionExecutor); + next.close(actionExecutor, isStoppingBySyncSavepoint); } } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java index 8f3eefc81a83d..7252cad42afbb 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java @@ -235,6 +235,7 @@ public abstract class StreamTask> extends Ab private final ExecutorService channelIOExecutor; private Long syncSavepointId = null; + private Long activeSyncSavepointId = null; private long latestAsyncCheckpointStartDelayNanos; @@ -427,7 +428,12 @@ protected void processInput(MailboxDefaultAction.Controller controller) throws E new ResumeWrapper(controller.suspendDefaultAction(timer), timer))); } - private void resetSynchronousSavepointId() { + private void resetSynchronousSavepointId(long id, boolean succeeded) { + if (!succeeded && activeSyncSavepointId != null && activeSyncSavepointId == id) { + // allow to process further EndOfPartition events + activeSyncSavepointId = null; + operatorChain.setIsStoppingBySyncSavepoint(false); + } syncSavepointId = null; } @@ -436,6 +442,8 @@ private void setSynchronousSavepointId(long checkpointId) { syncSavepointId == null, "at most one stop-with-savepoint checkpoint at a time is allowed"); syncSavepointId = checkpointId; + activeSyncSavepointId = checkpointId; + operatorChain.setIsStoppingBySyncSavepoint(true); } @VisibleForTesting @@ -988,6 +996,7 @@ public void triggerCheckpointOnBarrier( @Override public void abortCheckpointOnBarrier(long checkpointId, CheckpointException cause) throws IOException { + resetSynchronousSavepointId(checkpointId, false); subtaskCheckpointCoordinator.abortCheckpointOnBarrier(checkpointId, cause, operatorChain); } @@ -1013,6 +1022,10 @@ private boolean performCheckpoint( if (advanceToEndOfTime) { advanceToEndOfEventTime(); } + } else if (activeSyncSavepointId != null + && activeSyncSavepointId < checkpointMetaData.getCheckpointId()) { + activeSyncSavepointId = null; + operatorChain.setIsStoppingBySyncSavepoint(false); } subtaskCheckpointCoordinator.checkpointState( @@ -1066,9 +1079,11 @@ public Future notifyCheckpointCompleteAsync(long checkpointId) { @Override public Future notifyCheckpointAbortAsync(long checkpointId) { return notifyCheckpointOperation( - () -> - subtaskCheckpointCoordinator.notifyCheckpointAborted( - checkpointId, operatorChain, this::isRunning), + () -> { + resetSynchronousSavepointId(checkpointId, false); + subtaskCheckpointCoordinator.notifyCheckpointAborted( + checkpointId, operatorChain, this::isRunning); + }, String.format("checkpoint %d aborted", checkpointId)); } @@ -1097,7 +1112,7 @@ private void notifyCheckpointComplete(long checkpointId) throws Exception { if (isRunning && isSynchronousSavepointId(checkpointId)) { finishTask(); // Reset to "notify" the internal synchronous savepoint mailbox loop. 
- resetSynchronousSavepointId(); + resetSynchronousSavepointId(checkpointId, true); } } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamOperatorWrapperTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamOperatorWrapperTest.java index 2919470d2e5a7..640efe0f7fa22 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamOperatorWrapperTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamOperatorWrapperTest.java @@ -133,7 +133,7 @@ public void teardown() throws Exception { @Test public void testClose() throws Exception { output.clear(); - operatorWrappers.get(0).close(containingTask.getActionExecutor()); + operatorWrappers.get(0).close(containingTask.getActionExecutor(), false); List expected = new ArrayList<>(); for (int i = 0; i < operatorWrappers.size(); i++) { @@ -172,7 +172,7 @@ public void close() throws Exception { true); try { - operatorWrapper.close(containingTask.getActionExecutor()); + operatorWrapper.close(containingTask.getActionExecutor(), false); fail("should throw an exception"); } catch (Throwable t) { Optional optional = diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java index 53d91b6827a79..75bdd20a81499 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java @@ -115,6 +115,7 @@ import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.Preconditions; import org.apache.flink.util.TestLogger; +import org.apache.flink.util.function.BiConsumerWithException; import org.apache.flink.util.function.RunnableWithException; import org.apache.flink.util.function.SupplierWithException; @@ -151,7 +152,12 @@ import java.util.function.Consumer; import static java.util.Arrays.asList; +import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.STRING_TYPE_INFO; +import static org.apache.flink.runtime.checkpoint.CheckpointFailureReason.UNKNOWN_TASK_CHECKPOINT_NOTIFICATION_FAILURE; +import static org.apache.flink.runtime.checkpoint.CheckpointType.SYNC_SAVEPOINT; import static org.apache.flink.runtime.checkpoint.StateObjectCollection.singleton; +import static org.apache.flink.runtime.state.CheckpointStorageLocationReference.getDefault; +import static org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailbox.MAX_PRIORITY; import static org.apache.flink.streaming.util.StreamTaskUtil.waitTaskIsRunning; import static org.apache.flink.util.Preconditions.checkState; import static org.hamcrest.Matchers.instanceOf; @@ -181,13 +187,98 @@ public class StreamTaskTest extends TestLogger { @Rule public final Timeout timeoutPerTest = Timeout.seconds(30); + @Test + public void testSyncSavepointCompleted() throws Exception { + testSyncSavepointWithEndInput(StreamTask::notifyCheckpointCompleteAsync, false); + } + + @Test + public void testSyncSavepointAborted() throws Exception { + testSyncSavepointWithEndInput( + (task, id) -> + task.abortCheckpointOnBarrier( + id, + new CheckpointException( + UNKNOWN_TASK_CHECKPOINT_NOTIFICATION_FAILURE)), + true); + } + + @Test + public void testSyncSavepointAbortedAsync() throws Exception { + testSyncSavepointWithEndInput(StreamTask::notifyCheckpointAbortAsync, true); + } + + 
/**
+     * Test for SyncSavepoint and EndInput interactions. Targets the following scenarios:
+     *
+     *   1. Thread1: notify sync savepoint
+     *   2. Thread2: endInput
+     *   3. Thread1: confirm/abort/abortAsync
+     *   4. assert inputEnded: confirmed - no, abort/abortAsync - yes
+ */ + private void testSyncSavepointWithEndInput( + BiConsumerWithException, Long, IOException> savepointResult, + boolean expectEndInput) + throws Exception { + StreamTaskMailboxTestHarness harness = + new StreamTaskMailboxTestHarnessBuilder<>(OneInputStreamTask::new, STRING_TYPE_INFO) + .addInput(STRING_TYPE_INFO) + .setupOutputForSingletonOperatorChain( + new TestBoundedOneInputStreamOperator()) + .build(); + + final long checkpointId = 1L; + CountDownLatch savepointTriggeredLatch = new CountDownLatch(1); + CountDownLatch inputEndedLatch = new CountDownLatch(1); + + MailboxExecutor executor = + harness.streamTask.getMailboxExecutorFactory().createExecutor(MAX_PRIORITY); + executor.execute( + () -> { + try { + harness.streamTask.triggerCheckpointOnBarrier( + new CheckpointMetaData(checkpointId, checkpointId), + new CheckpointOptions(SYNC_SAVEPOINT, getDefault()), + new CheckpointMetricsBuilder()); + } catch (IOException e) { + fail(e.getMessage()); + } + }, + "triggerCheckpointOnBarrier"); + new Thread( + () -> { + try { + savepointTriggeredLatch.await(); + harness.endInput(); + inputEndedLatch.countDown(); + } catch (InterruptedException e) { + fail(e.getMessage()); + } + }) + .start(); + // this mails should be executed from the one above (from triggerCheckpointOnBarrier) + executor.execute(savepointTriggeredLatch::countDown, "savepointTriggeredLatch"); + executor.execute( + () -> { + inputEndedLatch.await(); + savepointResult.accept(harness.streamTask, checkpointId); + }, + "savepointResult"); + + while (harness.streamTask.isMailboxLoopRunning()) { + harness.streamTask.runMailboxStep(); + } + + Assert.assertEquals(expectEndInput, TestBoundedOneInputStreamOperator.isInputEnded()); + } + @Test public void testCleanUpExceptionSuppressing() throws Exception { OneInputStreamTaskTestHarness testHarness = new OneInputStreamTaskTestHarness<>( - OneInputStreamTask::new, - BasicTypeInfo.STRING_TYPE_INFO, - BasicTypeInfo.STRING_TYPE_INFO); + OneInputStreamTask::new, STRING_TYPE_INFO, STRING_TYPE_INFO); testHarness.setupOutputForSingletonOperatorChain(); diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TestBoundedOneInputStreamOperator.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TestBoundedOneInputStreamOperator.java index 35d926615b868..b5f523d746cf2 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TestBoundedOneInputStreamOperator.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TestBoundedOneInputStreamOperator.java @@ -30,9 +30,15 @@ public class TestBoundedOneInputStreamOperator extends AbstractStreamOperator element) { @Override public void endInput() { + inputEnded = true; output("[" + name + "]: End of input"); } @@ -59,4 +66,8 @@ public void close() throws Exception { private void output(String record) { output.collect(new StreamRecord<>(record)); } + + public static boolean isInputEnded() { + return inputEnded; + } } From 06e05bb9401c827b5df4caab0a727eb4e92690bd Mon Sep 17 00:00:00 2001 From: Kezhu Wang Date: Sat, 23 Jan 2021 23:10:22 +0800 Subject: [PATCH 2/5] [FLINK-21132][runtime][tests] Stop with savepoint shouldn't end input --- .../test/checkpointing/SavepointITCase.java | 85 +++++++++++++++++++ 1 file changed, 85 insertions(+) diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java index 
7637b070fc636..c818c8fa5c2de 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java @@ -28,6 +28,7 @@ import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.time.Deadline; import org.apache.flink.api.common.time.Time; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.client.program.ClusterClient; import org.apache.flink.configuration.CheckpointingOptions; @@ -52,6 +53,11 @@ import org.apache.flink.streaming.api.functions.source.RichSourceFunction; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.graph.StreamGraph; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.BoundedOneInput; +import org.apache.flink.streaming.api.operators.ChainingStrategy; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.test.util.MiniClusterWithClientResource; import org.apache.flink.test.util.TestUtils; import org.apache.flink.testutils.EntropyInjectingTestFileSystem; @@ -364,6 +370,85 @@ public void testTriggerSavepointWithCheckpointingDisabled() throws Exception { } } + static class BoundedPassThroughOperator extends AbstractStreamOperator + implements OneInputStreamOperator, BoundedOneInput { + static volatile CountDownLatch progressLatch; + static volatile boolean inputEnded; + + private transient boolean processed; + + BoundedPassThroughOperator() { + chainingStrategy = ChainingStrategy.ALWAYS; + } + + @Override + public void endInput() throws Exception { + inputEnded = true; + } + + @Override + public void processElement(StreamRecord element) throws Exception { + output.collect(element); + if (!processed) { + processed = true; + progressLatch.countDown(); + } + Thread.sleep(1000); + } + + // -------------------------------------------------------------------- + + static CountDownLatch getProgressLatch() { + return progressLatch; + } + + static void resetForTest(int parallelism) { + progressLatch = new CountDownLatch(parallelism); + inputEnded = false; + } + } + + @Test + public void testStopSavepointWithBoundedInput() throws Exception { + final int numTaskManagers = 2; + final int numSlotsPerTaskManager = 2; + + final MiniClusterResourceFactory clusterFactory = + new MiniClusterResourceFactory( + numTaskManagers, numSlotsPerTaskManager, getFileBasedCheckpointsConfig()); + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(1); + + BoundedPassThroughOperator operator = new BoundedPassThroughOperator<>(); + DataStream stream = + env.addSource(new InfiniteTestSource()) + .transform("pass-through", BasicTypeInfo.INT_TYPE_INFO, operator); + + stream.addSink(new DiscardingSink<>()); + + final JobGraph jobGraph = env.getStreamGraph().getJobGraph(); + final JobID jobId = jobGraph.getJobID(); + + MiniClusterWithClientResource cluster = clusterFactory.get(); + cluster.before(); + ClusterClient client = cluster.getClusterClient(); + + try { + BoundedPassThroughOperator.resetForTest(1); + + client.submitJob(jobGraph).get(); + + BoundedPassThroughOperator.getProgressLatch().await(); + + client.stopWithSavepoint(jobId, false, null).get(); + + 
Assert.assertFalse(BoundedPassThroughOperator.inputEnded); + } finally { + cluster.after(); + } + } + @Test public void testSubmitWithUnknownSavepointPath() throws Exception { // Config From bb89788c91e0a3dfbe91b2675b7309c697bfe053 Mon Sep 17 00:00:00 2001 From: Roman Khachatryan Date: Fri, 29 Jan 2021 16:51:53 +0100 Subject: [PATCH 3/5] [FLINK-21132][runtime][tests] Parameterize StopWithSavepoint test with chaining strategy --- .../test/checkpointing/SavepointITCase.java | 58 ++++++++++--------- 1 file changed, 32 insertions(+), 26 deletions(-) diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java index c818c8fa5c2de..9a988380d1b3d 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java @@ -377,8 +377,8 @@ static class BoundedPassThroughOperator extends AbstractStreamOperator private transient boolean processed; - BoundedPassThroughOperator() { - chainingStrategy = ChainingStrategy.ALWAYS; + BoundedPassThroughOperator(ChainingStrategy chainingStrategy) { + this.chainingStrategy = chainingStrategy; } @Override @@ -393,7 +393,6 @@ public void processElement(StreamRecord element) throws Exception { processed = true; progressLatch.countDown(); } - Thread.sleep(1000); } // -------------------------------------------------------------------- @@ -413,39 +412,46 @@ public void testStopSavepointWithBoundedInput() throws Exception { final int numTaskManagers = 2; final int numSlotsPerTaskManager = 2; - final MiniClusterResourceFactory clusterFactory = - new MiniClusterResourceFactory( - numTaskManagers, numSlotsPerTaskManager, getFileBasedCheckpointsConfig()); + for (ChainingStrategy chainingStrategy : ChainingStrategy.values()) { + final MiniClusterResourceFactory clusterFactory = + new MiniClusterResourceFactory( + numTaskManagers, + numSlotsPerTaskManager, + getFileBasedCheckpointsConfig()); - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.setParallelism(1); + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(1); - BoundedPassThroughOperator operator = new BoundedPassThroughOperator<>(); - DataStream stream = - env.addSource(new InfiniteTestSource()) - .transform("pass-through", BasicTypeInfo.INT_TYPE_INFO, operator); + BoundedPassThroughOperator operator = + new BoundedPassThroughOperator<>(chainingStrategy); + DataStream stream = + env.addSource(new InfiniteTestSource()) + .transform("pass-through", BasicTypeInfo.INT_TYPE_INFO, operator); - stream.addSink(new DiscardingSink<>()); + stream.addSink(new DiscardingSink<>()); - final JobGraph jobGraph = env.getStreamGraph().getJobGraph(); - final JobID jobId = jobGraph.getJobID(); + final JobGraph jobGraph = env.getStreamGraph().getJobGraph(); + final JobID jobId = jobGraph.getJobID(); - MiniClusterWithClientResource cluster = clusterFactory.get(); - cluster.before(); - ClusterClient client = cluster.getClusterClient(); + MiniClusterWithClientResource cluster = clusterFactory.get(); + cluster.before(); + ClusterClient client = cluster.getClusterClient(); - try { - BoundedPassThroughOperator.resetForTest(1); + try { + BoundedPassThroughOperator.resetForTest(1); - client.submitJob(jobGraph).get(); + client.submitJob(jobGraph).get(); - BoundedPassThroughOperator.getProgressLatch().await(); + 
BoundedPassThroughOperator.getProgressLatch().await(); - client.stopWithSavepoint(jobId, false, null).get(); + client.stopWithSavepoint(jobId, false, null).get(); - Assert.assertFalse(BoundedPassThroughOperator.inputEnded); - } finally { - cluster.after(); + Assert.assertFalse( + "input ended with chainingStrategy " + chainingStrategy, + BoundedPassThroughOperator.inputEnded); + } finally { + cluster.after(); + } } } From 07d846507f2abb2892ae311ca115754d2a8bd2a8 Mon Sep 17 00:00:00 2001 From: Roman Khachatryan Date: Tue, 2 Feb 2021 19:16:53 +0100 Subject: [PATCH 4/5] [FLINK-21132][runtime][tests] Test StopWith Savepoint against concurrent EndOfInput --- .../test/checkpointing/SavepointITCase.java | 147 +++++++++++++++++- 1 file changed, 141 insertions(+), 6 deletions(-) diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java index 9a988380d1b3d..6215dd3fa1a5d 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java @@ -36,12 +36,14 @@ import org.apache.flink.configuration.MemorySize; import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.core.testutils.OneShotLatch; +import org.apache.flink.runtime.checkpoint.CheckpointException; import org.apache.flink.runtime.client.JobExecutionException; import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; import org.apache.flink.runtime.messages.FlinkJobNotFoundException; +import org.apache.flink.runtime.state.StateSnapshotContext; import org.apache.flink.runtime.testingUtils.TestingUtils; import org.apache.flink.runtime.testtasks.BlockingNoOpInvokable; import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; @@ -86,16 +88,21 @@ import java.nio.file.Path; import java.time.Duration; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Optional; import java.util.Random; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import java.util.stream.Stream; +import static java.util.concurrent.CompletableFuture.allOf; +import static org.apache.flink.runtime.checkpoint.CheckpointFailureReason.CHECKPOINT_COORDINATOR_SHUTDOWN; import static org.apache.flink.test.util.TestUtils.submitJobAndWaitForResult; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -373,6 +380,8 @@ public void testTriggerSavepointWithCheckpointingDisabled() throws Exception { static class BoundedPassThroughOperator extends AbstractStreamOperator implements OneInputStreamOperator, BoundedOneInput { static volatile CountDownLatch progressLatch; + static volatile CountDownLatch snapshotAllowedLatch; + static volatile CountDownLatch snapshotStartedLatch; static volatile boolean inputEnded; private transient boolean processed; @@ -381,6 +390,14 @@ static class BoundedPassThroughOperator extends AbstractStreamOperator this.chainingStrategy = chainingStrategy; } + private static void allowSnapshots() { + 
snapshotAllowedLatch.countDown(); + } + + public static void awaitSnapshotStarted() throws InterruptedException { + snapshotStartedLatch.await(); + } + @Override public void endInput() throws Exception { inputEnded = true; @@ -395,18 +412,94 @@ public void processElement(StreamRecord element) throws Exception { } } + @Override + public void snapshotState(StateSnapshotContext context) throws Exception { + snapshotStartedLatch.countDown(); + snapshotAllowedLatch.await(); + super.snapshotState(context); + } + // -------------------------------------------------------------------- static CountDownLatch getProgressLatch() { return progressLatch; } - static void resetForTest(int parallelism) { + static void resetForTest(int parallelism, boolean allowSnapshots) { progressLatch = new CountDownLatch(parallelism); + snapshotAllowedLatch = new CountDownLatch(allowSnapshots ? 0 : parallelism); + snapshotStartedLatch = new CountDownLatch(parallelism); inputEnded = false; } } + @Test + public void testStopSavepointWithBoundedInputConcurrently() throws Exception { + final int numTaskManagers = 2; + final int numSlotsPerTaskManager = 2; + + while (true) { + + final MiniClusterResourceFactory clusterFactory = + new MiniClusterResourceFactory( + numTaskManagers, + numSlotsPerTaskManager, + getFileBasedCheckpointsConfig()); + + StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(); + env.setParallelism(1); + + // It's only possible to test this with chaining. Without it, JM fails the job before + // the downstream gets the abort notification + BoundedPassThroughOperator operator = + new BoundedPassThroughOperator<>(ChainingStrategy.ALWAYS); + InfiniteTestSource source = new InfiniteTestSource(); + DataStream stream = + env.addSource(source) + .transform("pass-through", BasicTypeInfo.INT_TYPE_INFO, operator); + + stream.addSink(new DiscardingSink<>()); + + final JobGraph jobGraph = env.getStreamGraph().getJobGraph(); + final JobID jobId = jobGraph.getJobID(); + + MiniClusterWithClientResource cluster = clusterFactory.get(); + cluster.before(); + ClusterClient client = cluster.getClusterClient(); + + try { + BoundedPassThroughOperator.resetForTest(1, false); + InfiniteTestSource.resetForTest(); + + client.submitJob(jobGraph).get(); + + BoundedPassThroughOperator.getProgressLatch().await(); + InfiniteTestSource.suspendAll(); // prevent deadlock in cancelAllAndAwait + CompletableFuture stop = client.stopWithSavepoint(jobId, false, null); + BoundedPassThroughOperator.awaitSnapshotStarted(); + InfiniteTestSource.cancelAllAndAwait(); // emulate end of input + BoundedPassThroughOperator.allowSnapshots(); + stop.get(); + Assert.assertTrue("input NOT ended ", BoundedPassThroughOperator.inputEnded); + return; + } catch (Exception e) { + // if sources and the whole job ends before the checkpoint completes + // then coordinator will shut down and savepoint will be aborted - retry + if (!ischeckpointcoordinatorshutdownError(e)) { + throw e; + } + } finally { + cluster.after(); + } + } + } + + private static boolean ischeckpointcoordinatorshutdownError(Throwable throwable) { + return ExceptionUtils.findThrowable(throwable, CheckpointException.class) + .filter(e -> e.getCheckpointFailureReason() == CHECKPOINT_COORDINATOR_SHUTDOWN) + .isPresent(); + } + @Test public void testStopSavepointWithBoundedInput() throws Exception { final int numTaskManagers = 2; @@ -438,7 +531,7 @@ public void testStopSavepointWithBoundedInput() throws Exception { ClusterClient client = cluster.getClusterClient(); try 
{ - BoundedPassThroughOperator.resetForTest(1); + BoundedPassThroughOperator.resetForTest(1, true); client.submitJob(jobGraph).get(); @@ -662,14 +755,31 @@ private static class InfiniteTestSource implements SourceFunction { private static final long serialVersionUID = 1L; private volatile boolean running = true; + private volatile boolean suspended = false; + private static final Collection createdSources = + new CopyOnWriteArrayList<>(); + private transient volatile CompletableFuture completeFuture; + + public InfiniteTestSource() { + createdSources.add(this); + } @Override public void run(SourceContext ctx) throws Exception { - while (running) { - synchronized (ctx.getCheckpointLock()) { - ctx.collect(1); + completeFuture = new CompletableFuture<>(); + createdSources.add(this); + try { + while (running) { + if (!suspended) { + synchronized (ctx.getCheckpointLock()) { + ctx.collect(1); + } + } + Thread.sleep(1); } - Thread.sleep(1); + completeFuture.complete(null); + } catch (Exception e) { + completeFuture.completeExceptionally(e); } } @@ -677,6 +787,31 @@ public void run(SourceContext ctx) throws Exception { public void cancel() { running = false; } + + public void suspend() { + suspended = true; + } + + public static void resetForTest() { + createdSources.clear(); + } + + public CompletableFuture getCompleteFuture() { + return completeFuture; + } + + public static void cancelAllAndAwait() throws ExecutionException, InterruptedException { + createdSources.forEach(InfiniteTestSource::cancel); + allOf( + createdSources.stream() + .map(InfiniteTestSource::getCompleteFuture) + .toArray(CompletableFuture[]::new)) + .get(); + } + + public static void suspendAll() { + createdSources.forEach(InfiniteTestSource::suspend); + } } private static class StatefulCounter extends RichMapFunction From 0527a31f45f1cc18d54ca9e250aa2af38227e1ff Mon Sep 17 00:00:00 2001 From: Roman Khachatryan Date: Tue, 2 Feb 2021 19:34:39 +0100 Subject: [PATCH 5/5] [hotfix][task] Rename SourceStreamTask.isFinished to wasStoppedExternally --- .../flink/streaming/runtime/tasks/SourceStreamTask.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java index b709299d789d8..4ebbd0374b97a 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java @@ -66,7 +66,7 @@ public class SourceStreamTask< * Indicates whether this Task was purposefully finished (by finishTask()), in this case we want * to ignore exceptions thrown after finishing, to ensure shutdown works smoothly. 
*/ - private volatile boolean isFinished = false; + private volatile boolean wasStoppedExternally = false; public SourceStreamTask(Environment env) throws Exception { this(env, new Object()); @@ -170,11 +170,11 @@ protected void processInput(MailboxDefaultAction.Controller controller) throws E .isPresent()) { mailboxProcessor.reportThrowable( new CancelTaskException(sourceThreadThrowable)); - } else if (!isFinished && sourceThreadThrowable != null) { + } else if (!wasStoppedExternally && sourceThreadThrowable != null) { mailboxProcessor.reportThrowable(sourceThreadThrowable); } else if (sourceThreadThrowable != null || isCanceled() - || isFinished) { + || wasStoppedExternally) { mailboxProcessor.allActionsCompleted(); } else { // this is a "true" end of input regardless of whether @@ -205,7 +205,7 @@ protected void cancelTask() { @Override protected void finishTask() throws Exception { - isFinished = true; + wasStoppedExternally = true; cancelTask(); }
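
For illustration only (not part of the patch series): the commit message of patch 1 refers to sinks that treat endInput() as a final, non-retractable commit. A minimal sketch of such an operator follows; the class name, the pending buffer, and the commitFinalResults helper are made-up placeholders, and only the AbstractStreamOperator / OneInputStreamOperator / BoundedOneInput API usage mirrors the operators used in the patches above. With this series applied, endInput() is invoked only on a genuine end of input (bounded sources finishing and EndOfPartition arriving), not when the job is stopped with a savepoint, so the final commit in the sketch no longer fires during stop-with-savepoint.

// Illustrative sketch -- not part of the patch series.
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

class FinalCommitOperator<T> extends AbstractStreamOperator<T>
        implements OneInputStreamOperator<T, T>, BoundedOneInput {

    // Records buffered until a final, non-retractable commit (stand-in for e.g. Iceberg data files).
    private final List<T> pending = new ArrayList<>();

    @Override
    public void processElement(StreamRecord<T> element) {
        pending.add(element.getValue());
        output.collect(element);
    }

    @Override
    public void endInput() {
        // After this patch series, the runtime calls endInput() only on a real end of input
        // (bounded sources finished / EndOfPartition received), not during stop-with-savepoint,
        // so an irreversible commit here is safe again.
        commitFinalResults(pending);
        pending.clear();
    }

    // Hypothetical helper standing in for an external commit (e.g. publishing an Iceberg snapshot).
    private void commitFinalResults(List<T> records) {}
}

Stopping such a job via ClusterClient#stopWithSavepoint(jobId, false, targetDirectory), as the SavepointITCase tests above do, now takes the savepoint without triggering the final commit, while a bounded source that genuinely finishes still triggers it.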