diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java index b709299d789d8..4ebbd0374b97a 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java @@ -66,7 +66,7 @@ public class SourceStreamTask< * Indicates whether this Task was purposefully finished (by finishTask()), in this case we want * to ignore exceptions thrown after finishing, to ensure shutdown works smoothly. */ - private volatile boolean isFinished = false; + private volatile boolean wasStoppedExternally = false; public SourceStreamTask(Environment env) throws Exception { this(env, new Object()); @@ -170,11 +170,11 @@ protected void processInput(MailboxDefaultAction.Controller controller) throws E .isPresent()) { mailboxProcessor.reportThrowable( new CancelTaskException(sourceThreadThrowable)); - } else if (!isFinished && sourceThreadThrowable != null) { + } else if (!wasStoppedExternally && sourceThreadThrowable != null) { mailboxProcessor.reportThrowable(sourceThreadThrowable); } else if (sourceThreadThrowable != null || isCanceled() - || isFinished) { + || wasStoppedExternally) { mailboxProcessor.allActionsCompleted(); } else { // this is a "true" end of input regardless of whether @@ -205,7 +205,7 @@ protected void cancelTask() { @Override protected void finishTask() throws Exception { - isFinished = true; + wasStoppedExternally = true; cancelTask(); }