From dd71e19515fe4a8a9f1e4e7d765c2d4a6aa95bac Mon Sep 17 00:00:00 2001 From: sunhaibotb Date: Tue, 25 Jun 2019 11:46:55 +0800 Subject: [PATCH] [FLINK-12967][runtime] Change the processing that the input reaches the end to comply with the InputSelectable contract --- .../io/StreamTwoInputSelectableProcessor.java | 3 +-- .../tasks/StreamTaskSelectiveReadingTest.java | 17 ++++++++++++++--- 2 files changed, 15 insertions(+), 5 deletions(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputSelectableProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputSelectableProcessor.java index 3293ae90f82f2..54ce749a4827a 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputSelectableProcessor.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputSelectableProcessor.java @@ -337,10 +337,9 @@ private void waitForOneInput(StreamTaskInput input) private boolean checkFinished() throws Exception { if (getInput(lastReadInputIndex).isFinished()) { - inputSelection = (lastReadInputIndex == 0) ? InputSelection.SECOND : InputSelection.FIRST; - synchronized (lock) { operatorChain.endInput(getInputId(lastReadInputIndex)); + inputSelection = inputSelector.nextSelection(); } } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskSelectiveReadingTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskSelectiveReadingTest.java index a56dc9b2673a7..1308796f5ddb0 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskSelectiveReadingTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskSelectiveReadingTest.java @@ -23,6 +23,7 @@ import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.BoundedMultiInput; import org.apache.flink.streaming.api.operators.InputSelectable; import org.apache.flink.streaming.api.operators.InputSelection; import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; @@ -239,7 +240,7 @@ public void processElement2(StreamRecord element) { * Test operator for sequential reading. */ public static class SequentialReadingStreamOperator extends AbstractStreamOperator - implements TwoInputStreamOperator, InputSelectable { + implements TwoInputStreamOperator, InputSelectable, BoundedMultiInput { private final String name; @@ -265,13 +266,18 @@ public void processElement1(StreamRecord element) { @Override public void processElement2(StreamRecord element) { output.collect(element.replace("[" + name + "-2]: " + element.getValue())); + } - this.inputSelection = InputSelection.SECOND; + @Override + public void endInput(int inputId) { + if (inputId == 1) { + inputSelection = InputSelection.SECOND; + } } } private static class SpecialRuleReadingStreamOperator extends AbstractStreamOperator - implements TwoInputStreamOperator, InputSelectable { + implements TwoInputStreamOperator, InputSelectable, BoundedMultiInput { private final String name; @@ -338,6 +344,11 @@ public void processElement2(StreamRecord element) { inputSelection = InputSelection.SECOND; } + + @Override + public void endInput(int inputId) { + inputSelection = (inputId == 1) ? InputSelection.SECOND : InputSelection.FIRST; + } } private static class TestReadFinishedInputStreamOperator extends AbstractStreamOperator