From cb5061e7149519cb18673f4c572757dce3cc7bd1 Mon Sep 17 00:00:00 2001 From: Thomas Weise Date: Sun, 9 Jul 2017 11:57:43 -0700 Subject: [PATCH] BEAM-2575 ApexRunner doesn't emit watermarks for additional outputs --- .../operators/ApexParDoOperator.java | 21 ++++++++++++------- .../runners/apex/examples/WordCountTest.java | 8 +++++-- 2 files changed, 20 insertions(+), 9 deletions(-) diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java index 809ca2a166c5..c3cbab2c5498 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java @@ -359,10 +359,7 @@ private void processWatermark(ApexStreamTuple.WatermarkTuple mark) { } } if (sideInputs.isEmpty()) { - if (traceTuples) { - LOG.debug("\nemitting watermark {}\n", mark); - } - output.emit(mark); + outputWatermark(mark); return; } @@ -370,10 +367,20 @@ private void processWatermark(ApexStreamTuple.WatermarkTuple mark) { Math.min(pushedBackWatermark.get(), currentInputWatermark); if (potentialOutputWatermark > currentOutputWatermark) { currentOutputWatermark = potentialOutputWatermark; - if (traceTuples) { - LOG.debug("\nemitting watermark {}\n", currentOutputWatermark); + outputWatermark(ApexStreamTuple.WatermarkTuple.of(currentOutputWatermark)); + } + } + + private void outputWatermark(ApexStreamTuple.WatermarkTuple mark) { + if (traceTuples) { + LOG.debug("\nemitting {}\n", mark); + } + output.emit(mark); + if (!additionalOutputPortMapping.isEmpty()) { + for (DefaultOutputPort> additionalOutput : + additionalOutputPortMapping.values()) { + additionalOutput.emit(mark); } - output.emit(ApexStreamTuple.WatermarkTuple.of(currentOutputWatermark)); } } diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/WordCountTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/WordCountTest.java index e76096ef78d1..ba757468b031 100644 --- a/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/WordCountTest.java +++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/WordCountTest.java @@ -123,11 +123,15 @@ public void testWordCountExample() throws Exception { options.setInputFile(new File(inputFile).getAbsolutePath()); String outputFilePrefix = "target/wordcountresult.txt"; options.setOutput(outputFilePrefix); - WordCountTest.main(TestPipeline.convertToArgs(options)); File outFile1 = new File(outputFilePrefix + "-00000-of-00002"); File outFile2 = new File(outputFilePrefix + "-00001-of-00002"); - Assert.assertTrue(outFile1.exists() && outFile2.exists()); + Assert.assertTrue(!outFile1.exists() || outFile1.delete()); + Assert.assertTrue(!outFile2.exists() || outFile2.delete()); + + WordCountTest.main(TestPipeline.convertToArgs(options)); + + Assert.assertTrue("result files exist", outFile1.exists() && outFile2.exists()); HashSet results = new HashSet<>(); results.addAll(FileUtils.readLines(outFile1)); results.addAll(FileUtils.readLines(outFile2));