From b53567a0fdb4a291e4452c1bf00070fa65cb4856 Mon Sep 17 00:00:00 2001 From: Roman Khachatryan Date: Tue, 2 Feb 2021 19:34:39 +0100 Subject: [PATCH] [hotfix][task] Rename SourceStreamTask.isFinished to wasStoppedExternally --- .../flink/streaming/runtime/tasks/SourceStreamTask.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java index b709299d789d8f..4ebbd0374b97a4 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java @@ -66,7 +66,7 @@ public class SourceStreamTask< * Indicates whether this Task was purposefully finished (by finishTask()), in this case we want * to ignore exceptions thrown after finishing, to ensure shutdown works smoothly. */ - private volatile boolean isFinished = false; + private volatile boolean wasStoppedExternally = false; public SourceStreamTask(Environment env) throws Exception { this(env, new Object()); @@ -170,11 +170,11 @@ protected void processInput(MailboxDefaultAction.Controller controller) throws E .isPresent()) { mailboxProcessor.reportThrowable( new CancelTaskException(sourceThreadThrowable)); - } else if (!isFinished && sourceThreadThrowable != null) { + } else if (!wasStoppedExternally && sourceThreadThrowable != null) { mailboxProcessor.reportThrowable(sourceThreadThrowable); } else if (sourceThreadThrowable != null || isCanceled() - || isFinished) { + || wasStoppedExternally) { mailboxProcessor.allActionsCompleted(); } else { // this is a "true" end of input regardless of whether @@ -205,7 +205,7 @@ protected void cancelTask() { @Override protected void finishTask() throws Exception { - isFinished = true; + wasStoppedExternally = true; cancelTask(); }