diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/util/MultiInputStreamOperatorTestHarness.java b/flink-runtime/src/test/java/org/apache/flink/streaming/util/MultiInputStreamOperatorTestHarness.java index 454e0e60d5a5a..058fc57ee8bbc 100644 --- a/flink-runtime/src/test/java/org/apache/flink/streaming/util/MultiInputStreamOperatorTestHarness.java +++ b/flink-runtime/src/test/java/org/apache/flink/streaming/util/MultiInputStreamOperatorTestHarness.java @@ -69,7 +69,7 @@ public void processRecordAttributes(int idx, RecordAttributes recordAttributes) getCastedOperator().getInputs().get(idx).processRecordAttributes(recordAttributes); } - private MultipleInputStreamOperator getCastedOperator() { + protected MultipleInputStreamOperator getCastedOperator() { return (MultipleInputStreamOperator) operator; } } diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/util/asyncprocessing/AsyncKeyedMultiInputStreamOperatorTestHarness.java b/flink-runtime/src/test/java/org/apache/flink/streaming/util/asyncprocessing/AsyncKeyedMultiInputStreamOperatorTestHarness.java index 82a6d424c4f86..566424d7d21fd 100644 --- a/flink-runtime/src/test/java/org/apache/flink/streaming/util/asyncprocessing/AsyncKeyedMultiInputStreamOperatorTestHarness.java +++ b/flink-runtime/src/test/java/org/apache/flink/streaming/util/asyncprocessing/AsyncKeyedMultiInputStreamOperatorTestHarness.java @@ -30,7 +30,7 @@ import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus; -import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness; +import org.apache.flink.streaming.util.MultiInputStreamOperatorTestHarness; import org.apache.flink.util.function.ThrowingConsumer; import java.util.List; @@ -48,10 +48,10 @@ * async processing, please use methods of test harness instead of operator. */ public class AsyncKeyedMultiInputStreamOperatorTestHarness - extends AbstractStreamOperatorTestHarness { + extends MultiInputStreamOperatorTestHarness { /** The executor service for async state processing. */ - private ExecutorService executor; + private final ExecutorService executor; public static AsyncKeyedMultiInputStreamOperatorTestHarness create( StreamOperatorFactory operatorFactory, @@ -108,6 +108,8 @@ public void setKeySelector(int idx, KeySelector keySelector) { config.serializeAllConfigs(); } + @Override + @SuppressWarnings({"rawtypes", "unchecked"}) public void processElement(int idx, StreamRecord element) throws Exception { Input input = getCastedOperator().getInputs().get(idx); ThrowingConsumer, Exception> inputProcessor = @@ -115,16 +117,22 @@ public void processElement(int idx, StreamRecord element) throws Exception { execute(executor, (ignore) -> inputProcessor.accept(element)).get(); } + @Override + @SuppressWarnings("rawtypes") public void processWatermark(int idx, Watermark mark) throws Exception { Input input = getCastedOperator().getInputs().get(idx); execute(executor, (ignore) -> input.processWatermark(mark)).get(); } + @Override + @SuppressWarnings("rawtypes") public void processWatermarkStatus(int idx, WatermarkStatus watermarkStatus) throws Exception { Input input = getCastedOperator().getInputs().get(idx); execute(executor, (ignore) -> input.processWatermarkStatus(watermarkStatus)).get(); } + @Override + @SuppressWarnings("rawtypes") public void processRecordAttributes(int idx, RecordAttributes recordAttributes) throws Exception { Input input = getCastedOperator().getInputs().get(idx); @@ -137,16 +145,7 @@ public void drainStateRequests() throws Exception { @Override public void close() throws Exception { - execute( - executor, - (ignore) -> { - super.close(); - }) - .get(); + execute(executor, (ignore) -> super.close()).get(); executor.shutdown(); } - - private MultipleInputStreamOperator getCastedOperator() { - return (MultipleInputStreamOperator) operator; - } } diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/util/asyncprocessing/AsyncKeyedOneInputStreamOperatorTestHarness.java b/flink-runtime/src/test/java/org/apache/flink/streaming/util/asyncprocessing/AsyncKeyedOneInputStreamOperatorTestHarness.java index fbca7135f3841..6128a5b915a0f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/streaming/util/asyncprocessing/AsyncKeyedOneInputStreamOperatorTestHarness.java +++ b/flink-runtime/src/test/java/org/apache/flink/streaming/util/asyncprocessing/AsyncKeyedOneInputStreamOperatorTestHarness.java @@ -23,6 +23,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.ClosureCleaner; import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.streaming.api.operators.BoundedOneInput; import org.apache.flink.streaming.api.operators.Input; import org.apache.flink.streaming.api.operators.MultipleInputStreamOperator; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; @@ -34,7 +35,7 @@ import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus; -import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness; +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; import org.apache.flink.util.function.ThrowingConsumer; import java.util.ArrayList; @@ -54,13 +55,15 @@ * async processing, please use methods of test harness instead of operator. */ public class AsyncKeyedOneInputStreamOperatorTestHarness - extends AbstractStreamOperatorTestHarness { + extends OneInputStreamOperatorTestHarness { /** Empty if the {@link #operator} is not {@link MultipleInputStreamOperator}. */ - private final List inputs = new ArrayList<>(); + private final List> inputs = new ArrayList<>(); + + private long currentWatermark; /** The executor service for async state processing. */ - private ExecutorService executor; + private final ExecutorService executor; public static AsyncKeyedOneInputStreamOperatorTestHarness create( OneInputStreamOperator operator, @@ -120,6 +123,7 @@ protected AsyncKeyedOneInputStreamOperatorTestHarness( } @Override + @SuppressWarnings({"rawtypes", "unchecked"}) public void setup(TypeSerializer outputSerializer) { super.setup(outputSerializer); if (operator instanceof MultipleInputStreamOperator) { @@ -128,10 +132,7 @@ public void setup(TypeSerializer outputSerializer) { } } - public OneInputStreamOperator getOneInputOperator() { - return (OneInputStreamOperator) this.operator; - } - + @Override public void processElement(StreamRecord element) throws Exception { processElementInternal(element).get(); } @@ -140,6 +141,7 @@ public void processElement(StreamRecord element) throws Exception { * Submit an element processing in an executor thread. This method is mainly used for internal * testing, please use {@link #processElement} for common operator testing. */ + @SuppressWarnings({"rawtypes", "unchecked"}) public CompletableFuture processElementInternal(StreamRecord element) throws Exception { if (inputs.isEmpty()) { @@ -160,22 +162,24 @@ public CompletableFuture processElementInternal(StreamRecord element) } } + @Override public void processWatermark(long watermark) throws Exception { processWatermarkInternal(watermark).get(); } /** For internal testing. */ - public CompletableFuture processWatermarkInternal(long watermark) throws Exception { + public CompletableFuture processWatermarkInternal(long watermark) { return processWatermarkInternal(new Watermark(watermark)); } + @Override public void processWatermarkStatus(WatermarkStatus status) throws Exception { processWatermarkStatusInternal(status).get(); } /** For internal testing. */ - public CompletableFuture processWatermarkStatusInternal(WatermarkStatus status) - throws Exception { + @SuppressWarnings("rawtypes") + public CompletableFuture processWatermarkStatusInternal(WatermarkStatus status) { if (inputs.isEmpty()) { return execute( executor, (ignore) -> getOneInputOperator().processWatermarkStatus(status)); @@ -186,12 +190,22 @@ public CompletableFuture processWatermarkStatusInternal(WatermarkStatus st } } + @Override public void processWatermark(Watermark mark) throws Exception { processWatermarkInternal(mark).get(); } + @Override + public void endInput() throws Exception { + if (operator instanceof BoundedOneInput) { + execute(executor, (ignore) -> ((BoundedOneInput) operator).endInput()).get(); + } + } + /** For internal testing. */ - public CompletableFuture processWatermarkInternal(Watermark mark) throws Exception { + @SuppressWarnings("rawtypes") + public CompletableFuture processWatermarkInternal(Watermark mark) { + currentWatermark = mark.getTimestamp(); if (inputs.isEmpty()) { return execute(executor, (ignore) -> getOneInputOperator().processWatermark(mark)); } else { @@ -206,6 +220,7 @@ public void processLatencyMarker(LatencyMarker marker) throws Exception { } /** For internal testing. */ + @SuppressWarnings("rawtypes") public CompletableFuture processLatencyMarkerInternal(LatencyMarker marker) { if (inputs.isEmpty()) { return execute( @@ -217,11 +232,18 @@ public CompletableFuture processLatencyMarkerInternal(LatencyMarker marker } } + @Override public void processRecordAttributes(RecordAttributes recordAttributes) throws Exception { processRecordAttributesInternal(recordAttributes).get(); } + @Override + public long getCurrentWatermark() { + return currentWatermark; + } + /** For internal testing. */ + @SuppressWarnings("rawtypes") public CompletableFuture processRecordAttributesInternal( RecordAttributes recordAttributes) { if (inputs.isEmpty()) { @@ -246,12 +268,7 @@ public void prepareSnapshotPreBarrier(long checkpointId) throws Exception { @Override public void close() throws Exception { - execute( - executor, - (ignore) -> { - super.close(); - }) - .get(); + execute(executor, (ignore) -> super.close()).get(); executor.shutdown(); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/util/asyncprocessing/AsyncKeyedTwoInputStreamOperatorTestHarness.java b/flink-runtime/src/test/java/org/apache/flink/streaming/util/asyncprocessing/AsyncKeyedTwoInputStreamOperatorTestHarness.java index d3f34a1b19c14..4128f073afd79 100644 --- a/flink-runtime/src/test/java/org/apache/flink/streaming/util/asyncprocessing/AsyncKeyedTwoInputStreamOperatorTestHarness.java +++ b/flink-runtime/src/test/java/org/apache/flink/streaming/util/asyncprocessing/AsyncKeyedTwoInputStreamOperatorTestHarness.java @@ -22,6 +22,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.ClosureCleaner; import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.streaming.api.operators.BoundedMultiInput; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; import org.apache.flink.streaming.api.watermark.Watermark; @@ -30,7 +31,7 @@ import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus; -import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness; +import org.apache.flink.streaming.util.TwoInputStreamOperatorTestHarness; import org.apache.flink.util.Preconditions; import org.apache.flink.util.function.ThrowingConsumer; @@ -48,7 +49,7 @@ * async processing, please use methods of test harness instead of operator. */ public class AsyncKeyedTwoInputStreamOperatorTestHarness - extends AbstractStreamOperatorTestHarness { + extends TwoInputStreamOperatorTestHarness { private final TwoInputStreamOperator twoInputOperator; @@ -56,7 +57,7 @@ public class AsyncKeyedTwoInputStreamOperatorTestHarness private ThrowingConsumer, Exception> processor2; /** The executor service for async state processing. */ - private ExecutorService executor; + private final ExecutorService executor; public static AsyncKeyedTwoInputStreamOperatorTestHarness create( @@ -132,62 +133,85 @@ private ThrowingConsumer, Exception> getRecordProcessor2() { return processor2; } + @Override public void processElement1(StreamRecord element) throws Exception { execute(executor, (ignore) -> getRecordProcessor1().accept(element)).get(); } + @Override public void processElement1(IN1 value, long timestamp) throws Exception { processElement1(new StreamRecord<>(value, timestamp)); } + @Override public void processElement2(StreamRecord element) throws Exception { execute(executor, (ignore) -> getRecordProcessor2().accept(element)).get(); } + @Override public void processElement2(IN2 value, long timestamp) throws Exception { processElement2(new StreamRecord<>(value, timestamp)); } + @Override public void processWatermark1(Watermark mark) throws Exception { execute(executor, (ignore) -> twoInputOperator.processWatermark1(mark)).get(); } + @Override public void processWatermark2(Watermark mark) throws Exception { execute(executor, (ignore) -> twoInputOperator.processWatermark2(mark)).get(); } + @Override + public void processBothWatermarks(Watermark mark) throws Exception { + execute(executor, (ignore) -> twoInputOperator.processWatermark1(mark)).get(); + execute(executor, (ignore) -> twoInputOperator.processWatermark2(mark)).get(); + } + + @Override public void processWatermarkStatus1(WatermarkStatus watermarkStatus) throws Exception { execute(executor, (ignore) -> twoInputOperator.processWatermarkStatus1(watermarkStatus)) .get(); } + @Override public void processWatermarkStatus2(WatermarkStatus watermarkStatus) throws Exception { execute(executor, (ignore) -> twoInputOperator.processWatermarkStatus2(watermarkStatus)) .get(); } + @Override public void processRecordAttributes1(RecordAttributes recordAttributes) throws Exception { execute(executor, (ignore) -> twoInputOperator.processRecordAttributes1(recordAttributes)) .get(); } + @Override public void processRecordAttributes2(RecordAttributes recordAttributes) throws Exception { execute(executor, (ignore) -> twoInputOperator.processRecordAttributes2(recordAttributes)) .get(); } + public void endInput1() throws Exception { + if (operator instanceof BoundedMultiInput) { + execute(executor, (ignore) -> ((BoundedMultiInput) operator).endInput(1)).get(); + } + } + + public void endInput2() throws Exception { + if (operator instanceof BoundedMultiInput) { + execute(executor, (ignore) -> ((BoundedMultiInput) operator).endInput(2)).get(); + } + } + public void drainStateRequests() throws Exception { execute(executor, (ignore) -> drain(operator)).get(); } @Override public void close() throws Exception { - execute( - executor, - (ignore) -> { - super.close(); - }) - .get(); + execute(executor, (ignore) -> super.close()).get(); executor.shutdown(); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/util/asyncprocessing/AsyncProcessingTestUtil.java b/flink-runtime/src/test/java/org/apache/flink/streaming/util/asyncprocessing/AsyncProcessingTestUtil.java index a88146551e3c8..0c5252e5e4001 100644 --- a/flink-runtime/src/test/java/org/apache/flink/streaming/util/asyncprocessing/AsyncProcessingTestUtil.java +++ b/flink-runtime/src/test/java/org/apache/flink/streaming/util/asyncprocessing/AsyncProcessingTestUtil.java @@ -42,7 +42,7 @@ public static void drain(StreamOperator operator) { public static CompletableFuture execute( ExecutorService executor, ThrowingConsumer processor) { - CompletableFuture future = new CompletableFuture(); + CompletableFuture future = new CompletableFuture<>(); executor.execute( () -> { try {