From 7105d925c51a49798849f01f1d7e0b4f3d4f51ad Mon Sep 17 00:00:00 2001 From: Thomas Weise Date: Wed, 19 Oct 2016 19:11:54 -0700 Subject: [PATCH] BEAM-783 Add test to cover side inputs and outputs. --- .../ParDoBoundMultiTranslator.java | 14 ++- .../functions/ApexFlattenOperator.java | 3 +- .../translators/ParDoBoundTranslatorTest.java | 96 ++++++++++++++++++- 3 files changed, 107 insertions(+), 6 deletions(-) diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ParDoBoundMultiTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ParDoBoundMultiTranslator.java index 13f07c1b1140d..9135dd86063f5 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ParDoBoundMultiTranslator.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ParDoBoundMultiTranslator.java @@ -68,9 +68,19 @@ public void translate(ParDo.BoundMulti transform, TranslationCo Map, PCollection> outputs = output.getAll(); Map, OutputPort> ports = Maps.newHashMapWithExpectedSize(outputs.size()); - int i = 0; for (Map.Entry, PCollection> outputEntry : outputs.entrySet()) { - ports.put(outputEntry.getValue(), operator.sideOutputPorts[i++]); + if (outputEntry.getKey() == transform.getMainOutputTag()) { + ports.put(outputEntry.getValue(), operator.output); + } else { + int portIndex = 0; + for (TupleTag tag : transform.getSideOutputTags().getAll()) { + if (tag == outputEntry.getKey()) { + ports.put(outputEntry.getValue(), operator.sideOutputPorts[portIndex]); + break; + } + portIndex++; + } + } } context.addOperator(operator, ports); context.addStream(context.getInput(), operator.input); diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexFlattenOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexFlattenOperator.java index dd8fcd168724c..703b1f4f40731 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexFlattenOperator.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexFlattenOperator.java @@ -34,7 +34,7 @@ public class ApexFlattenOperator extends BaseOperator { private static final Logger LOG = LoggerFactory.getLogger(ApexFlattenOperator.class); - private boolean traceTuples = true; + private boolean traceTuples = false; private long inputWM1; private long inputWM2; @@ -121,4 +121,5 @@ public void process(ApexStreamTuple> tuple) { @OutputPortFieldAnnotation(optional = true) public final transient DefaultOutputPort>> out = new DefaultOutputPort>>(); + } diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslatorTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslatorTest.java index ad22acde5cf67..72b429977892f 100644 --- a/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslatorTest.java +++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslatorTest.java @@ -26,6 +26,8 @@ import com.google.common.collect.Lists; import com.google.common.collect.Sets; +import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.HashSet; import java.util.List; @@ -48,9 +50,11 @@ import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.Sum; +import org.apache.beam.sdk.transforms.View; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionTuple; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.TupleTagList; @@ -67,6 +71,8 @@ @RunWith(JUnit4.class) public class ParDoBoundTranslatorTest { private static final Logger LOG = LoggerFactory.getLogger(ParDoBoundTranslatorTest.class); + private static final long SLEEP_MILLIS = 500; + private static final long TIMEOUT_MILLIS = 30000; @Test public void test() throws Exception { @@ -94,13 +100,13 @@ public void test() throws Exception { Assert.assertNotNull(om); Assert.assertEquals(om.getOperator().getClass(), ApexParDoOperator.class); - long timeout = System.currentTimeMillis() + 30000; + long timeout = System.currentTimeMillis() + TIMEOUT_MILLIS; while (System.currentTimeMillis() < timeout) { if (EmbeddedCollector.RESULTS.containsAll(expected)) { break; } LOG.info("Waiting for expected results."); - Thread.sleep(1000); + Thread.sleep(SLEEP_MILLIS); } Assert.assertEquals(Sets.newHashSet(expected), EmbeddedCollector.RESULTS); } @@ -119,11 +125,12 @@ public void processElement(ProcessContext c) throws Exception { } } - @SuppressWarnings("serial") private static class EmbeddedCollector extends OldDoFn { + private static final long serialVersionUID = 1L; protected static final HashSet RESULTS = new HashSet<>(); public EmbeddedCollector() { + RESULTS.clear(); } @Override @@ -175,6 +182,7 @@ public void testContainsInAnyOrder() throws Exception { Pipeline pipeline = Pipeline.create(options); PCollection pcollection = pipeline.apply(Create.of(1, 2, 3, 4)); PAssert.that(pcollection).containsInAnyOrder(2, 1, 4, 3); + // TODO: good candidate to terminate fast based on processed assertion vs. for auto-shutdown pipeline.run(); } @@ -203,4 +211,86 @@ public void testSerialization() throws Exception { Assert.assertNotNull("Serialization", KryoCloneUtils.cloneObject(operator)); } + + @Test + public void testMultiOutputParDoWithSideInputs() throws Exception { + ApexPipelineOptions options = PipelineOptionsFactory.create().as(ApexPipelineOptions.class); + options.setRunner(ApexRunner.class); // non-blocking run + Pipeline pipeline = Pipeline.create(options); + + List inputs = Arrays.asList(3, -42, 666); + final TupleTag mainOutputTag = new TupleTag("main"); + final TupleTag sideOutputTag = new TupleTag("sideOutput"); + + PCollectionView sideInput1 = pipeline + .apply("CreateSideInput1", Create.of(11)) + .apply("ViewSideInput1", View.asSingleton()); + PCollectionView sideInputUnread = pipeline + .apply("CreateSideInputUnread", Create.of(-3333)) + .apply("ViewSideInputUnread", View.asSingleton()); + PCollectionView sideInput2 = pipeline + .apply("CreateSideInput2", Create.of(222)) + .apply("ViewSideInput2", View.asSingleton()); + + PCollectionTuple outputs = pipeline + .apply(Create.of(inputs)) + .apply(ParDo.withSideInputs(sideInput1) + .withSideInputs(sideInputUnread) + .withSideInputs(sideInput2) + .withOutputTags(mainOutputTag, TupleTagList.of(sideOutputTag)) + .of(new TestMultiOutputWithSideInputsFn( + Arrays.asList(sideInput1, sideInput2), + Arrays.>asList()))); + + outputs.get(mainOutputTag).apply(ParDo.of(new EmbeddedCollector())); + ApexRunnerResult result = (ApexRunnerResult) pipeline.run(); + + HashSet expected = Sets.newHashSet("processing: 3: [11, 222]", + "processing: -42: [11, 222]", "processing: 666: [11, 222]"); + long timeout = System.currentTimeMillis() + TIMEOUT_MILLIS; + while (System.currentTimeMillis() < timeout) { + if (EmbeddedCollector.RESULTS.containsAll(expected)) { + break; + } + LOG.info("Waiting for expected results."); + Thread.sleep(SLEEP_MILLIS); + } + result.cancel(); + Assert.assertEquals(Sets.newHashSet(expected), EmbeddedCollector.RESULTS); + } + + private static class TestMultiOutputWithSideInputsFn extends OldDoFn { + private static final long serialVersionUID = 1L; + + final List> sideInputViews = new ArrayList<>(); + final List> sideOutputTupleTags = new ArrayList<>(); + + public TestMultiOutputWithSideInputsFn(List> sideInputViews, + List> sideOutputTupleTags) { + this.sideInputViews.addAll(sideInputViews); + this.sideOutputTupleTags.addAll(sideOutputTupleTags); + } + + @Override + public void processElement(ProcessContext c) throws Exception { + outputToAllWithSideInputs(c, "processing: " + c.element()); + } + + private void outputToAllWithSideInputs(ProcessContext c, String value) { + if (!sideInputViews.isEmpty()) { + List sideInputValues = new ArrayList<>(); + for (PCollectionView sideInputView : sideInputViews) { + sideInputValues.add(c.sideInput(sideInputView)); + } + value += ": " + sideInputValues; + } + c.output(value); + for (TupleTag sideOutputTupleTag : sideOutputTupleTags) { + c.sideOutput(sideOutputTupleTag, + sideOutputTupleTag.getId() + ": " + value); + } + } + + } + }