Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-21132][runtime] Don't end input on stop with savepoint #14815

Merged
merged 5 commits into from
Feb 3, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,8 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>>

private final OperatorEventDispatcherImpl operatorEventDispatcher;

private boolean isStoppingBySyncSavepoint;

/**
* Current status of the input stream of the operator chain. Watermarks explicitly generated by
* operators in the chain (i.e. timestamp assigner / watermark extractors), will be blocked and
Expand Down Expand Up @@ -408,7 +410,7 @@ public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
*/
@Override
public void endInput(int inputId) throws Exception {
if (mainOperatorWrapper != null) {
if (mainOperatorWrapper != null && !isStoppingBySyncSavepoint) {
mainOperatorWrapper.endOperatorInput(inputId);
}
}
Expand All @@ -434,7 +436,7 @@ protected void initializeStateAndOpenOperators(
*/
protected void closeOperators(StreamTaskActionExecutor actionExecutor) throws Exception {
if (firstOperatorWrapper != null) {
firstOperatorWrapper.close(actionExecutor);
firstOperatorWrapper.close(actionExecutor, isStoppingBySyncSavepoint);
}
}

Expand Down Expand Up @@ -745,6 +747,10 @@ StreamOperator<?> getTailOperator() {
return (tailOperatorWrapper == null) ? null : tailOperatorWrapper.getStreamOperator();
}

/**
* Records whether this chain is currently being stopped by a synchronous savepoint.
*
* <p>While the flag is {@code true}, {@code endInput(int)} does not forward the end of input to
* the main operator wrapper, and {@code closeOperators} passes the flag on to the first operator
* wrapper's {@code close} call.
*
* @param stoppingBySyncSavepoint the new value of the flag
*/
public void setIsStoppingBySyncSavepoint(boolean stoppingBySyncSavepoint) {
this.isStoppingBySyncSavepoint = stoppingBySyncSavepoint;
}

/** Wrapper class to access the chained sources and their outputs. */
public static class ChainedSource {
private final WatermarkGaugeExposingOutput<StreamRecord<?>> chainedSourceOutput;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public class SourceStreamTask<
* Indicates whether this Task was purposefully finished (by finishTask()), in this case we want
* to ignore exceptions thrown after finishing, to ensure shutdown works smoothly.
*/
private volatile boolean isFinished = false;
private volatile boolean wasStoppedExternally = false;

public SourceStreamTask(Environment env) throws Exception {
this(env, new Object());
Expand Down Expand Up @@ -170,9 +170,18 @@ protected void processInput(MailboxDefaultAction.Controller controller) throws E
.isPresent()) {
mailboxProcessor.reportThrowable(
new CancelTaskException(sourceThreadThrowable));
} else if (!isFinished && sourceThreadThrowable != null) {
} else if (!wasStoppedExternally && sourceThreadThrowable != null) {
mailboxProcessor.reportThrowable(sourceThreadThrowable);
} else if (sourceThreadThrowable != null
|| isCanceled()
|| wasStoppedExternally) {
mailboxProcessor.allActionsCompleted();
} else {
// this is a "true" end of input regardless of whether
// stop-with-savepoint was issued or not
synchronized (lock) {
operatorChain.setIsStoppingBySyncSavepoint(false);
}
mailboxProcessor.allActionsCompleted();
}
});
Expand All @@ -196,7 +205,7 @@ protected void cancelTask() {

@Override
protected void finishTask() throws Exception {
isFinished = true;
wasStoppedExternally = true;
cancelTask();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@
* This class handles the close, endInput and other related logic of a {@link StreamOperator}. It
* also automatically propagates the close operation to the next wrapper that the {@link #next}
* points to, so we can use {@link #next} to link all operator wrappers in the operator chain and
* close all operators only by calling the {@link #close(StreamTaskActionExecutor)} method of the
* header operator wrapper.
* close all operators only by calling the {@link #close(StreamTaskActionExecutor, boolean)} method
* of the header operator wrapper.
*/
@Internal
public class StreamOperatorWrapper<OUT, OP extends StreamOperator<OUT>> {
Expand Down Expand Up @@ -120,8 +120,9 @@ void setNext(StreamOperatorWrapper next) {
* MailboxExecutor#yield()} to take the mails of closing operator and running timers and run
* them.
*/
public void close(StreamTaskActionExecutor actionExecutor) throws Exception {
if (!isHead) {
public void close(StreamTaskActionExecutor actionExecutor, boolean isStoppingBySyncSavepoint)
throws Exception {
if (!isHead && !isStoppingBySyncSavepoint) {
// NOTE: This is only done for the case where the operator is a one-input operator. At
// present, any non-head operator on the operator chain is a one-input operator.
actionExecutor.runThrowing(() -> endOperatorInput(1));
Expand All @@ -131,7 +132,7 @@ public void close(StreamTaskActionExecutor actionExecutor) throws Exception {

// propagate the close operation to the next wrapper
if (next != null) {
next.close(actionExecutor);
next.close(actionExecutor, isStoppingBySyncSavepoint);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> extends Ab
private final ExecutorService channelIOExecutor;

private Long syncSavepointId = null;
private Long activeSyncSavepointId = null;

private long latestAsyncCheckpointStartDelayNanos;

Expand Down Expand Up @@ -427,7 +428,12 @@ protected void processInput(MailboxDefaultAction.Controller controller) throws E
new ResumeWrapper(controller.suspendDefaultAction(timer), timer)));
}

private void resetSynchronousSavepointId() {
private void resetSynchronousSavepointId(long id, boolean succeeded) {
if (!succeeded && activeSyncSavepointId != null && activeSyncSavepointId == id) {
// allow to process further EndOfPartition events
activeSyncSavepointId = null;
operatorChain.setIsStoppingBySyncSavepoint(false);
}
syncSavepointId = null;
}

Expand All @@ -436,6 +442,8 @@ private void setSynchronousSavepointId(long checkpointId) {
syncSavepointId == null,
"at most one stop-with-savepoint checkpoint at a time is allowed");
syncSavepointId = checkpointId;
activeSyncSavepointId = checkpointId;
operatorChain.setIsStoppingBySyncSavepoint(true);
}

@VisibleForTesting
Expand Down Expand Up @@ -988,6 +996,7 @@ public void triggerCheckpointOnBarrier(
@Override
public void abortCheckpointOnBarrier(long checkpointId, CheckpointException cause)
throws IOException {
resetSynchronousSavepointId(checkpointId, false);
subtaskCheckpointCoordinator.abortCheckpointOnBarrier(checkpointId, cause, operatorChain);
}

Expand All @@ -1013,6 +1022,10 @@ private boolean performCheckpoint(
if (advanceToEndOfTime) {
advanceToEndOfEventTime();
}
} else if (activeSyncSavepointId != null
&& activeSyncSavepointId < checkpointMetaData.getCheckpointId()) {
activeSyncSavepointId = null;
operatorChain.setIsStoppingBySyncSavepoint(false);
}

subtaskCheckpointCoordinator.checkpointState(
Expand Down Expand Up @@ -1066,9 +1079,11 @@ public Future<Void> notifyCheckpointCompleteAsync(long checkpointId) {
@Override
public Future<Void> notifyCheckpointAbortAsync(long checkpointId) {
return notifyCheckpointOperation(
() ->
subtaskCheckpointCoordinator.notifyCheckpointAborted(
checkpointId, operatorChain, this::isRunning),
() -> {
resetSynchronousSavepointId(checkpointId, false);
subtaskCheckpointCoordinator.notifyCheckpointAborted(
checkpointId, operatorChain, this::isRunning);
},
String.format("checkpoint %d aborted", checkpointId));
}

Expand Down Expand Up @@ -1097,7 +1112,7 @@ private void notifyCheckpointComplete(long checkpointId) throws Exception {
if (isRunning && isSynchronousSavepointId(checkpointId)) {
finishTask();
// Reset to "notify" the internal synchronous savepoint mailbox loop.
resetSynchronousSavepointId();
resetSynchronousSavepointId(checkpointId, true);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ public void teardown() throws Exception {
@Test
public void testClose() throws Exception {
output.clear();
operatorWrappers.get(0).close(containingTask.getActionExecutor());
operatorWrappers.get(0).close(containingTask.getActionExecutor(), false);

List<Object> expected = new ArrayList<>();
for (int i = 0; i < operatorWrappers.size(); i++) {
Expand Down Expand Up @@ -172,7 +172,7 @@ public void close() throws Exception {
true);

try {
operatorWrapper.close(containingTask.getActionExecutor());
operatorWrapper.close(containingTask.getActionExecutor(), false);
fail("should throw an exception");
} catch (Throwable t) {
Optional<Throwable> optional =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.function.BiConsumerWithException;
import org.apache.flink.util.function.RunnableWithException;
import org.apache.flink.util.function.SupplierWithException;

Expand Down Expand Up @@ -151,7 +152,12 @@
import java.util.function.Consumer;

import static java.util.Arrays.asList;
import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.STRING_TYPE_INFO;
import static org.apache.flink.runtime.checkpoint.CheckpointFailureReason.UNKNOWN_TASK_CHECKPOINT_NOTIFICATION_FAILURE;
import static org.apache.flink.runtime.checkpoint.CheckpointType.SYNC_SAVEPOINT;
import static org.apache.flink.runtime.checkpoint.StateObjectCollection.singleton;
import static org.apache.flink.runtime.state.CheckpointStorageLocationReference.getDefault;
import static org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailbox.MAX_PRIORITY;
import static org.apache.flink.streaming.util.StreamTaskUtil.waitTaskIsRunning;
import static org.apache.flink.util.Preconditions.checkState;
import static org.hamcrest.Matchers.instanceOf;
Expand Down Expand Up @@ -181,13 +187,98 @@ public class StreamTaskTest extends TestLogger {

@Rule public final Timeout timeoutPerTest = Timeout.seconds(30);

// A sync savepoint that is confirmed as completed: the operator is expected NOT to see endInput.
@Test
public void testSyncSavepointCompleted() throws Exception {
testSyncSavepointWithEndInput(StreamTask::notifyCheckpointCompleteAsync, false);
}

// A sync savepoint aborted through abortCheckpointOnBarrier: the operator is expected to see
// endInput.
@Test
public void testSyncSavepointAborted() throws Exception {
testSyncSavepointWithEndInput(
(task, id) ->
task.abortCheckpointOnBarrier(
id,
new CheckpointException(
UNKNOWN_TASK_CHECKPOINT_NOTIFICATION_FAILURE)),
true);
}

// A sync savepoint aborted through notifyCheckpointAbortAsync: the operator is expected to see
// endInput.
@Test
public void testSyncSavepointAbortedAsync() throws Exception {
testSyncSavepointWithEndInput(StreamTask::notifyCheckpointAbortAsync, true);
}

/**
* Test for SyncSavepoint and EndInput interactions. Targets the following scenarios:
*
* <ol>
* <li>Thread1: notify sync savepoint
* <li>Thread2: endInput
* <li>Thread1: confirm/abort/abortAsync
* <li>assert inputEnded: confirmed - no, abort/abortAsync - yes
* </ol>
*/
private void testSyncSavepointWithEndInput(
BiConsumerWithException<StreamTask<?, ?>, Long, IOException> savepointResult,
boolean expectEndInput)
throws Exception {
// Singleton operator chain whose operator records, in a static flag, whether endInput() was
// called on it.
StreamTaskMailboxTestHarness<String> harness =
new StreamTaskMailboxTestHarnessBuilder<>(OneInputStreamTask::new, STRING_TYPE_INFO)
.addInput(STRING_TYPE_INFO)
.setupOutputForSingletonOperatorChain(
new TestBoundedOneInputStreamOperator())
.build();

final long checkpointId = 1L;
// savepointTriggeredLatch releases the helper thread below; inputEndedLatch releases the
// "savepointResult" mail once the helper thread has ended the input.
CountDownLatch savepointTriggeredLatch = new CountDownLatch(1);
CountDownLatch inputEndedLatch = new CountDownLatch(1);

MailboxExecutor executor =
harness.streamTask.getMailboxExecutorFactory().createExecutor(MAX_PRIORITY);
// Mail 1: trigger the synchronous savepoint on the task.
executor.execute(
() -> {
try {
harness.streamTask.triggerCheckpointOnBarrier(
new CheckpointMetaData(checkpointId, checkpointId),
new CheckpointOptions(SYNC_SAVEPOINT, getDefault()),
new CheckpointMetricsBuilder());
} catch (IOException e) {
fail(e.getMessage());
}
},
"triggerCheckpointOnBarrier");
// Helper thread: waits until the savepoint has been triggered, ends the harness input, then
// signals that the input has ended.
// NOTE(review): fail() here is raised on the helper thread rather than on the test thread, so
// it presumably does not fail the test directly (inputEndedLatch would simply never be
// counted down) -- confirm.
new Thread(
() -> {
try {
savepointTriggeredLatch.await();
harness.endInput();
inputEndedLatch.countDown();
} catch (InterruptedException e) {
fail(e.getMessage());
}
})
.start();
// These mails are expected to be executed from within the mail above (from
// triggerCheckpointOnBarrier).
executor.execute(savepointTriggeredLatch::countDown, "savepointTriggeredLatch");
// Mail 3: once the input has ended, deliver the savepoint outcome (complete/abort) to the task.
executor.execute(
() -> {
inputEndedLatch.await();
savepointResult.accept(harness.streamTask, checkpointId);
},
"savepointResult");

// Drive the mailbox step by step until the task's mailbox loop stops running.
while (harness.streamTask.isMailboxLoopRunning()) {
harness.streamTask.runMailboxStep();
}

// The operator's static flag tells whether endInput() actually reached the operator.
Assert.assertEquals(expectEndInput, TestBoundedOneInputStreamOperator.isInputEnded());
}

@Test
public void testCleanUpExceptionSuppressing() throws Exception {
OneInputStreamTaskTestHarness<String, String> testHarness =
new OneInputStreamTaskTestHarness<>(
OneInputStreamTask::new,
BasicTypeInfo.STRING_TYPE_INFO,
BasicTypeInfo.STRING_TYPE_INFO);
OneInputStreamTask::new, STRING_TYPE_INFO, STRING_TYPE_INFO);

testHarness.setupOutputForSingletonOperatorChain();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,15 @@ public class TestBoundedOneInputStreamOperator extends AbstractStreamOperator<St
private static final long serialVersionUID = 1L;

private final String name;
private static volatile boolean inputEnded = false;

/** Creates an operator named {@code "test"} by delegating to the one-argument constructor. */
public TestBoundedOneInputStreamOperator() {
this("test");
}

/**
* Creates an operator with the given name.
*
* <p>Also resets the static {@code inputEnded} flag to {@code false}. Because the flag is static,
* constructing a new instance clears whatever an earlier instance recorded.
*
* @param name label included in the end-of-input record emitted by {@code endInput()}
*/
public TestBoundedOneInputStreamOperator(String name) {
this.name = name;
inputEnded = false;
}

@Override
Expand All @@ -42,6 +48,7 @@ public void processElement(StreamRecord<String> element) {

@Override
public void endInput() {
// Remember in the static flag that end of input was signalled, then emit a marker record.
inputEnded = true;
output("[" + name + "]: End of input");
}

Expand All @@ -59,4 +66,8 @@ public void close() throws Exception {
/** Collects {@code record}, wrapped in a new {@link StreamRecord}, into {@code output}. */
private void output(String record) {
output.collect(new StreamRecord<>(record));
}

/**
* Returns the static flag that {@code endInput()} sets to {@code true} and that the
* {@code String} constructor resets to {@code false}.
*
* @return {@code true} if {@code endInput()} was called on some instance since the last reset
*/
public static boolean isInputEnded() {
return inputEnded;
}
}
Loading