Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public void processRecordAttributes(int idx, RecordAttributes recordAttributes)
getCastedOperator().getInputs().get(idx).processRecordAttributes(recordAttributes);
}

private MultipleInputStreamOperator<OUT> getCastedOperator() {
protected MultipleInputStreamOperator<OUT> getCastedOperator() {
return (MultipleInputStreamOperator<OUT>) operator;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
import org.apache.flink.streaming.util.MultiInputStreamOperatorTestHarness;
import org.apache.flink.util.function.ThrowingConsumer;

import java.util.List;
Expand All @@ -48,10 +48,10 @@
* async processing, please use methods of test harness instead of operator.
*/
public class AsyncKeyedMultiInputStreamOperatorTestHarness<K, OUT>
extends AbstractStreamOperatorTestHarness<OUT> {
extends MultiInputStreamOperatorTestHarness<OUT> {

/** The executor service for async state processing. */
private ExecutorService executor;
private final ExecutorService executor;

public static <K, OUT> AsyncKeyedMultiInputStreamOperatorTestHarness<K, OUT> create(
StreamOperatorFactory<OUT> operatorFactory,
Expand Down Expand Up @@ -108,23 +108,31 @@ public void setKeySelector(int idx, KeySelector<?, K> keySelector) {
config.serializeAllConfigs();
}

@Override
@SuppressWarnings({"rawtypes", "unchecked"})
public void processElement(int idx, StreamRecord<?> element) throws Exception {
Input input = getCastedOperator().getInputs().get(idx);
ThrowingConsumer<StreamRecord<?>, Exception> inputProcessor =
RecordProcessorUtils.getRecordProcessor(input);
execute(executor, (ignore) -> inputProcessor.accept(element)).get();
}

@Override
@SuppressWarnings("rawtypes")
public void processWatermark(int idx, Watermark mark) throws Exception {
Input input = getCastedOperator().getInputs().get(idx);
execute(executor, (ignore) -> input.processWatermark(mark)).get();
}

@Override
@SuppressWarnings("rawtypes")
public void processWatermarkStatus(int idx, WatermarkStatus watermarkStatus) throws Exception {
Input input = getCastedOperator().getInputs().get(idx);
execute(executor, (ignore) -> input.processWatermarkStatus(watermarkStatus)).get();
}

@Override
@SuppressWarnings("rawtypes")
public void processRecordAttributes(int idx, RecordAttributes recordAttributes)
throws Exception {
Input input = getCastedOperator().getInputs().get(idx);
Expand All @@ -137,16 +145,7 @@ public void drainStateRequests() throws Exception {

@Override
public void close() throws Exception {
execute(
executor,
(ignore) -> {
super.close();
})
.get();
execute(executor, (ignore) -> super.close()).get();
executor.shutdown();
}

private MultipleInputStreamOperator<OUT> getCastedOperator() {
return (MultipleInputStreamOperator<OUT>) operator;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.ClosureCleaner;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.Input;
import org.apache.flink.streaming.api.operators.MultipleInputStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
Expand All @@ -34,7 +35,7 @@
import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.util.function.ThrowingConsumer;

import java.util.ArrayList;
Expand All @@ -54,13 +55,15 @@
* async processing, please use methods of test harness instead of operator.
*/
public class AsyncKeyedOneInputStreamOperatorTestHarness<K, IN, OUT>
extends AbstractStreamOperatorTestHarness<OUT> {
extends OneInputStreamOperatorTestHarness<IN, OUT> {

/** Empty if the {@link #operator} is not {@link MultipleInputStreamOperator}. */
private final List<Input> inputs = new ArrayList<>();
private final List<Input<IN>> inputs = new ArrayList<>();

private long currentWatermark;

/** The executor service for async state processing. */
private ExecutorService executor;
private final ExecutorService executor;

public static <K, IN, OUT> AsyncKeyedOneInputStreamOperatorTestHarness<K, IN, OUT> create(
OneInputStreamOperator<IN, OUT> operator,
Expand Down Expand Up @@ -120,6 +123,7 @@ protected AsyncKeyedOneInputStreamOperatorTestHarness(
}

@Override
@SuppressWarnings({"rawtypes", "unchecked"})
public void setup(TypeSerializer<OUT> outputSerializer) {
super.setup(outputSerializer);
if (operator instanceof MultipleInputStreamOperator) {
Expand All @@ -128,10 +132,7 @@ public void setup(TypeSerializer<OUT> outputSerializer) {
}
}

public OneInputStreamOperator<IN, OUT> getOneInputOperator() {
return (OneInputStreamOperator<IN, OUT>) this.operator;
}

@Override
public void processElement(StreamRecord<IN> element) throws Exception {
processElementInternal(element).get();
}
Expand All @@ -140,6 +141,7 @@ public void processElement(StreamRecord<IN> element) throws Exception {
* Submit an element processing in an executor thread. This method is mainly used for internal
* testing, please use {@link #processElement} for common operator testing.
*/
@SuppressWarnings({"rawtypes", "unchecked"})
public CompletableFuture<Void> processElementInternal(StreamRecord<IN> element)
throws Exception {
if (inputs.isEmpty()) {
Expand All @@ -160,22 +162,24 @@ public CompletableFuture<Void> processElementInternal(StreamRecord<IN> element)
}
}

@Override
public void processWatermark(long watermark) throws Exception {
processWatermarkInternal(watermark).get();
}

/** For internal testing. */
public CompletableFuture<Void> processWatermarkInternal(long watermark) throws Exception {
public CompletableFuture<Void> processWatermarkInternal(long watermark) {
return processWatermarkInternal(new Watermark(watermark));
}

@Override
public void processWatermarkStatus(WatermarkStatus status) throws Exception {
processWatermarkStatusInternal(status).get();
}

/** For internal testing. */
public CompletableFuture<Void> processWatermarkStatusInternal(WatermarkStatus status)
throws Exception {
@SuppressWarnings("rawtypes")
public CompletableFuture<Void> processWatermarkStatusInternal(WatermarkStatus status) {
if (inputs.isEmpty()) {
return execute(
executor, (ignore) -> getOneInputOperator().processWatermarkStatus(status));
Expand All @@ -186,12 +190,22 @@ public CompletableFuture<Void> processWatermarkStatusInternal(WatermarkStatus st
}
}

@Override
public void processWatermark(Watermark mark) throws Exception {
processWatermarkInternal(mark).get();
}

@Override
public void endInput() throws Exception {
if (operator instanceof BoundedOneInput) {
execute(executor, (ignore) -> ((BoundedOneInput) operator).endInput()).get();
}
}

/** For internal testing. */
public CompletableFuture<Void> processWatermarkInternal(Watermark mark) throws Exception {
@SuppressWarnings("rawtypes")
public CompletableFuture<Void> processWatermarkInternal(Watermark mark) {
currentWatermark = mark.getTimestamp();
if (inputs.isEmpty()) {
return execute(executor, (ignore) -> getOneInputOperator().processWatermark(mark));
} else {
Expand All @@ -206,6 +220,7 @@ public void processLatencyMarker(LatencyMarker marker) throws Exception {
}

/** For internal testing. */
@SuppressWarnings("rawtypes")
public CompletableFuture<Void> processLatencyMarkerInternal(LatencyMarker marker) {
if (inputs.isEmpty()) {
return execute(
Expand All @@ -217,11 +232,18 @@ public CompletableFuture<Void> processLatencyMarkerInternal(LatencyMarker marker
}
}

@Override
public void processRecordAttributes(RecordAttributes recordAttributes) throws Exception {
processRecordAttributesInternal(recordAttributes).get();
}

@Override
public long getCurrentWatermark() {
return currentWatermark;
}

/** For internal testing. */
@SuppressWarnings("rawtypes")
public CompletableFuture<Void> processRecordAttributesInternal(
RecordAttributes recordAttributes) {
if (inputs.isEmpty()) {
Expand All @@ -246,12 +268,7 @@ public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {

@Override
public void close() throws Exception {
execute(
executor,
(ignore) -> {
super.close();
})
.get();
execute(executor, (ignore) -> super.close()).get();
executor.shutdown();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.ClosureCleaner;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.operators.BoundedMultiInput;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
Expand All @@ -30,7 +31,7 @@
import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
import org.apache.flink.streaming.util.TwoInputStreamOperatorTestHarness;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.ThrowingConsumer;

Expand All @@ -48,15 +49,15 @@
* async processing, please use methods of test harness instead of operator.
*/
public class AsyncKeyedTwoInputStreamOperatorTestHarness<K, IN1, IN2, OUT>
extends AbstractStreamOperatorTestHarness<OUT> {
extends TwoInputStreamOperatorTestHarness<IN1, IN2, OUT> {

private final TwoInputStreamOperator<IN1, IN2, OUT> twoInputOperator;

private ThrowingConsumer<StreamRecord<IN1>, Exception> processor1;
private ThrowingConsumer<StreamRecord<IN2>, Exception> processor2;

/** The executor service for async state processing. */
private ExecutorService executor;
private final ExecutorService executor;

public static <K, IN1, IN2, OUT>
AsyncKeyedTwoInputStreamOperatorTestHarness<K, IN1, IN2, OUT> create(
Expand Down Expand Up @@ -132,62 +133,85 @@ private ThrowingConsumer<StreamRecord<IN2>, Exception> getRecordProcessor2() {
return processor2;
}

@Override
public void processElement1(StreamRecord<IN1> element) throws Exception {
execute(executor, (ignore) -> getRecordProcessor1().accept(element)).get();
}

@Override
public void processElement1(IN1 value, long timestamp) throws Exception {
processElement1(new StreamRecord<>(value, timestamp));
}

@Override
public void processElement2(StreamRecord<IN2> element) throws Exception {
execute(executor, (ignore) -> getRecordProcessor2().accept(element)).get();
}

@Override
public void processElement2(IN2 value, long timestamp) throws Exception {
processElement2(new StreamRecord<>(value, timestamp));
}

@Override
public void processWatermark1(Watermark mark) throws Exception {
execute(executor, (ignore) -> twoInputOperator.processWatermark1(mark)).get();
}

@Override
public void processWatermark2(Watermark mark) throws Exception {
execute(executor, (ignore) -> twoInputOperator.processWatermark2(mark)).get();
}

@Override
public void processBothWatermarks(Watermark mark) throws Exception {
execute(executor, (ignore) -> twoInputOperator.processWatermark1(mark)).get();
execute(executor, (ignore) -> twoInputOperator.processWatermark2(mark)).get();
}

@Override
public void processWatermarkStatus1(WatermarkStatus watermarkStatus) throws Exception {
execute(executor, (ignore) -> twoInputOperator.processWatermarkStatus1(watermarkStatus))
.get();
}

@Override
public void processWatermarkStatus2(WatermarkStatus watermarkStatus) throws Exception {
execute(executor, (ignore) -> twoInputOperator.processWatermarkStatus2(watermarkStatus))
.get();
}

@Override
public void processRecordAttributes1(RecordAttributes recordAttributes) throws Exception {
execute(executor, (ignore) -> twoInputOperator.processRecordAttributes1(recordAttributes))
.get();
}

@Override
public void processRecordAttributes2(RecordAttributes recordAttributes) throws Exception {
execute(executor, (ignore) -> twoInputOperator.processRecordAttributes2(recordAttributes))
.get();
}

public void endInput1() throws Exception {
if (operator instanceof BoundedMultiInput) {
execute(executor, (ignore) -> ((BoundedMultiInput) operator).endInput(1)).get();
}
}

public void endInput2() throws Exception {
if (operator instanceof BoundedMultiInput) {
execute(executor, (ignore) -> ((BoundedMultiInput) operator).endInput(2)).get();
}
}

public void drainStateRequests() throws Exception {
execute(executor, (ignore) -> drain(operator)).get();
}

@Override
public void close() throws Exception {
execute(
executor,
(ignore) -> {
super.close();
})
.get();
execute(executor, (ignore) -> super.close()).get();
executor.shutdown();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public static <OUT> void drain(StreamOperator<OUT> operator) {

public static CompletableFuture<Void> execute(
ExecutorService executor, ThrowingConsumer<Void, Exception> processor) {
CompletableFuture<Void> future = new CompletableFuture();
CompletableFuture<Void> future = new CompletableFuture<>();
executor.execute(
() -> {
try {
Expand Down