Skip to content

Commit

Permalink
[hotfix] Add ProcessOperator tests that verify side outputs work
Browse files Browse the repository at this point in the history
  • Loading branch information
aljoscha committed Sep 24, 2017
1 parent 4afca4b commit cb615f1
Show file tree
Hide file tree
Showing 3 changed files with 152 additions and 1 deletion.
Expand Up @@ -31,9 +31,12 @@
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.TestHarnessUtil;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import org.apache.flink.util.TestLogger;

import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;

import java.util.concurrent.ConcurrentLinkedQueue;

Expand All @@ -44,6 +47,9 @@
*/
public class KeyedProcessOperatorTest extends TestLogger {

@Rule
public ExpectedException expectedException = ExpectedException.none();

@Test
public void testTimestampAndWatermarkQuerying() throws Exception {

Expand Down Expand Up @@ -277,6 +283,93 @@ public void testSnapshotAndRestore() throws Exception {
testHarness.close();
}

@Test
public void testNullOutputTagRefusal() throws Exception {
KeyedProcessOperator<Integer, Integer, String> operator =
new KeyedProcessOperator<>(new NullOutputTagEmittingProcessFunction());

OneInputStreamOperatorTestHarness<Integer, String> testHarness =
new KeyedOneInputStreamOperatorTestHarness<>(
operator, new IdentityKeySelector<>(), BasicTypeInfo.INT_TYPE_INFO);

testHarness.setup();
testHarness.open();

testHarness.setProcessingTime(17);
try {
expectedException.expect(IllegalArgumentException.class);
testHarness.processElement(new StreamRecord<>(5));
} finally {
testHarness.close();
}
}

/**
* This also verifies that the timestamps ouf side-emitted records is correct.
*/
@Test
public void testSideOutput() throws Exception {
KeyedProcessOperator<Integer, Integer, String> operator =
new KeyedProcessOperator<>(new SideOutputProcessFunction());

OneInputStreamOperatorTestHarness<Integer, String> testHarness =
new KeyedOneInputStreamOperatorTestHarness<>(
operator, new IdentityKeySelector<>(), BasicTypeInfo.INT_TYPE_INFO);

testHarness.setup();
testHarness.open();

testHarness.processElement(new StreamRecord<>(42, 17L /* timestamp */));

ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();

expectedOutput.add(new StreamRecord<>("IN:42", 17L /* timestamp */));

TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());

ConcurrentLinkedQueue<StreamRecord<Integer>> expectedIntSideOutput = new ConcurrentLinkedQueue<>();
expectedIntSideOutput.add(new StreamRecord<>(42, 17L /* timestamp */));
ConcurrentLinkedQueue<StreamRecord<Integer>> intSideOutput =
testHarness.getSideOutput(SideOutputProcessFunction.INTEGER_OUTPUT_TAG);
TestHarnessUtil.assertOutputEquals(
"Side output was not correct.",
expectedIntSideOutput,
intSideOutput);

ConcurrentLinkedQueue<StreamRecord<Long>> expectedLongSideOutput = new ConcurrentLinkedQueue<>();
expectedLongSideOutput.add(new StreamRecord<>(42L, 17L /* timestamp */));
ConcurrentLinkedQueue<StreamRecord<Long>> longSideOutput =
testHarness.getSideOutput(SideOutputProcessFunction.LONG_OUTPUT_TAG);
TestHarnessUtil.assertOutputEquals(
"Side output was not correct.",
expectedLongSideOutput,
longSideOutput);

testHarness.close();
}

private static class NullOutputTagEmittingProcessFunction extends ProcessFunction<Integer, String> {

@Override
public void processElement(Integer value, Context ctx, Collector<String> out) throws Exception {
ctx.output(null, value);
}
}

private static class SideOutputProcessFunction extends ProcessFunction<Integer, String> {

static final OutputTag<Integer> INTEGER_OUTPUT_TAG = new OutputTag<Integer>("int-out") {};
static final OutputTag<Long> LONG_OUTPUT_TAG = new OutputTag<Long>("long-out") {};

@Override
public void processElement(Integer value, Context ctx, Collector<String> out) throws Exception {
out.collect("IN:" + value);

ctx.output(INTEGER_OUTPUT_TAG, value);
ctx.output(LONG_OUTPUT_TAG, value.longValue());
}
}

private static class IdentityKeySelector<T> implements KeySelector<T, T> {
private static final long serialVersionUID = 1L;

Expand Down
Expand Up @@ -25,6 +25,7 @@
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.TestHarnessUtil;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import org.apache.flink.util.TestLogger;

import org.junit.Rule;
Expand Down Expand Up @@ -117,6 +118,49 @@ public void testNullOutputTagRefusal() throws Exception {
}
}

/**
* This also verifies that the timestamps ouf side-emitted records is correct.
*/
@Test
public void testSideOutput() throws Exception {
ProcessOperator<Integer, String> operator =
new ProcessOperator<>(new SideOutputProcessFunction());

OneInputStreamOperatorTestHarness<Integer, String> testHarness =
new OneInputStreamOperatorTestHarness<>(operator);

testHarness.setup();
testHarness.open();

testHarness.processElement(new StreamRecord<>(42, 17L /* timestamp */));

ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();

expectedOutput.add(new StreamRecord<>("IN:42", 17L /* timestamp */));

TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());

ConcurrentLinkedQueue<StreamRecord<Integer>> expectedIntSideOutput = new ConcurrentLinkedQueue<>();
expectedIntSideOutput.add(new StreamRecord<>(42, 17L /* timestamp */));
ConcurrentLinkedQueue<StreamRecord<Integer>> intSideOutput =
testHarness.getSideOutput(SideOutputProcessFunction.INTEGER_OUTPUT_TAG);
TestHarnessUtil.assertOutputEquals(
"Side output was not correct.",
expectedIntSideOutput,
intSideOutput);

ConcurrentLinkedQueue<StreamRecord<Long>> expectedLongSideOutput = new ConcurrentLinkedQueue<>();
expectedLongSideOutput.add(new StreamRecord<>(42L, 17L /* timestamp */));
ConcurrentLinkedQueue<StreamRecord<Long>> longSideOutput =
testHarness.getSideOutput(SideOutputProcessFunction.LONG_OUTPUT_TAG);
TestHarnessUtil.assertOutputEquals(
"Side output was not correct.",
expectedLongSideOutput,
longSideOutput);

testHarness.close();
}

private static class NullOutputTagEmittingProcessFunction extends ProcessFunction<Integer, String> {

@Override
Expand All @@ -125,6 +169,20 @@ public void processElement(Integer value, Context ctx, Collector<String> out) th
}
}

private static class SideOutputProcessFunction extends ProcessFunction<Integer, String> {

static final OutputTag<Integer> INTEGER_OUTPUT_TAG = new OutputTag<Integer>("int-out") {};
static final OutputTag<Long> LONG_OUTPUT_TAG = new OutputTag<Long>("long-out") {};

@Override
public void processElement(Integer value, Context ctx, Collector<String> out) throws Exception {
out.collect("IN:" + value);

ctx.output(INTEGER_OUTPUT_TAG, value);
ctx.output(LONG_OUTPUT_TAG, value.longValue());
}
}

private static class QueryingProcessFunction extends ProcessFunction<Integer, String> {

private static final long serialVersionUID = 1L;
Expand Down
Expand Up @@ -57,7 +57,7 @@ public static <OUT> List<OUT> getRawElementsFromOutput(Queue<Object> output) {
/**
* Compare the two queues containing operator/task output by converting them to an array first.
*/
public static void assertOutputEquals(String message, Queue<Object> expected, Queue<Object> actual) {
public static <T> void assertOutputEquals(String message, Queue<T> expected, Queue<T> actual) {
Assert.assertArrayEquals(message,
expected.toArray(),
actual.toArray());
Expand Down

0 comments on commit cb615f1

Please sign in to comment.