From 9ef399774afafe2f47fd9770b47d4c7f0e96ae2a Mon Sep 17 00:00:00 2001 From: taegeonum Date: Tue, 6 Nov 2018 15:49:47 +0900 Subject: [PATCH 1/3] nemo-267 --- .../nemo/common/punctuation/Watermark.java | 12 +- .../beam/transform/AbstractDoFnTransform.java | 9 +- .../beam/transform/DoFnTransform.java | 6 + .../GroupByKeyAndWindowDoFnTransform.java | 100 +++++++++++--- .../GroupByKeyAndWindowDoFnTransformTest.java | 127 ++++++++++++++---- 5 files changed, 207 insertions(+), 47 deletions(-) diff --git a/common/src/main/java/org/apache/nemo/common/punctuation/Watermark.java b/common/src/main/java/org/apache/nemo/common/punctuation/Watermark.java index 27ee53b3fa..e676e6e8c9 100644 --- a/common/src/main/java/org/apache/nemo/common/punctuation/Watermark.java +++ b/common/src/main/java/org/apache/nemo/common/punctuation/Watermark.java @@ -24,7 +24,7 @@ /** * Watermark event. */ -public final class Watermark implements Serializable { +public final class Watermark implements Serializable, Comparable { private final long timestamp; public Watermark(final long timestamp) { @@ -47,8 +47,18 @@ public boolean equals(final Object o) { return timestamp == watermark.timestamp; } + @Override + public String toString() { + return String.valueOf(timestamp); + } + @Override public int hashCode() { return Objects.hash(timestamp); } + + @Override + public int compareTo(final Watermark o) { + return Long.compare(timestamp, o.getTimestamp()); + } } diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/AbstractDoFnTransform.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/AbstractDoFnTransform.java index 92a1f1696d..79534e39a3 100644 --- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/AbstractDoFnTransform.java +++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/AbstractDoFnTransform.java @@ -119,7 +119,7 @@ public final DoFn getDoFn() { public final void prepare(final Context context, final OutputCollector> oc) { // deserialize pipeline option final NemoPipelineOptions options = serializedOptions.get().as(NemoPipelineOptions.class); - this.outputCollector = oc; + this.outputCollector = wrapOutputCollector(oc); // create output manager outputManager = new DefaultOutputManager<>(outputCollector, mainOutputTag); @@ -184,6 +184,13 @@ public final void close() { */ abstract DoFn wrapDoFn(final DoFn originalDoFn); + /** + * An abstract function that wraps the original output collector. + * @param oc the original outputCollector. + * @return wrapped output collector. + */ + abstract OutputCollector wrapOutputCollector(final OutputCollector oc); + @Override public abstract void onData(final WindowedValue data); diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransform.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransform.java index 4a57ada904..0028c62560 100644 --- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransform.java +++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransform.java @@ -25,6 +25,7 @@ import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.WindowingStrategy; +import org.apache.nemo.common.ir.OutputCollector; import org.apache.nemo.common.punctuation.Watermark; import java.util.Collection; @@ -79,6 +80,11 @@ protected void beforeClose() { // nothing } + @Override + OutputCollector wrapOutputCollector(final OutputCollector oc) { + return oc; + } + @Override public String toString() { final StringBuilder sb = new StringBuilder(); diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java index 7d20f26536..883b4da577 100644 --- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java +++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java @@ -28,6 +28,7 @@ import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.WindowingStrategy; import org.apache.beam.sdk.values.KV; +import org.apache.nemo.common.ir.OutputCollector; import org.apache.nemo.common.punctuation.Watermark; import org.joda.time.Instant; import org.slf4j.Logger; @@ -48,7 +49,8 @@ public final class GroupByKeyAndWindowDoFnTransform private final Map>> keyToValues; private transient InMemoryTimerInternalsFactory inMemoryTimerInternalsFactory; private transient InMemoryStateInternalsFactory inMemoryStateInternalsFactory; - private long currentOutputWatermark; + private Watermark prevOutputWatermark; + private final Map keyAndWatermarkHoldMap; /** * GroupByKey constructor. @@ -70,7 +72,8 @@ public GroupByKeyAndWindowDoFnTransform(final Map, Coder> outputC options); this.keyToValues = new HashMap<>(); this.reduceFn = reduceFn; - this.currentOutputWatermark = Long.MIN_VALUE; + this.prevOutputWatermark = new Watermark(Long.MIN_VALUE); + this.keyAndWatermarkHoldMap = new HashMap<>(); } /** @@ -96,6 +99,11 @@ protected DoFn wrapDoFn(final DoFn doFn) { getMainOutputTag()); } + @Override + OutputCollector wrapOutputCollector(final OutputCollector oc) { + return new GBKWOutputCollector(oc); + } + /** * It collects data for each key. * The collected data are emitted at {@link GroupByKeyAndWindowDoFnTransform#onWatermark(Watermark)} @@ -122,8 +130,6 @@ public void onData(final WindowedValue> element) { private void processElementsAndTriggerTimers(final Watermark inputWatermark, final Instant processingTime, final Instant synchronizedTime) { - long minOutputTimestampsOfEmittedWindows = Long.MAX_VALUE; - for (final Map.Entry>> entry : keyToValues.entrySet()) { final K key = entry.getKey(); final List> values = entry.getValue(); @@ -139,20 +145,52 @@ private void processElementsAndTriggerTimers(final Watermark inputWatermark, } // Trigger timers - final long minOutputTimestamp = - triggerTimers(key, inputWatermark, processingTime, synchronizedTime); - - minOutputTimestampsOfEmittedWindows = Math.min(minOutputTimestampsOfEmittedWindows, minOutputTimestamp); + triggerTimers(key, inputWatermark, processingTime, synchronizedTime); // Remove values values.clear(); } // Emit watermark to downstream operators - if (minOutputTimestampsOfEmittedWindows != Long.MAX_VALUE - && currentOutputWatermark < minOutputTimestampsOfEmittedWindows) { - currentOutputWatermark = minOutputTimestampsOfEmittedWindows; - getOutputCollector().emitWatermark(new Watermark(minOutputTimestampsOfEmittedWindows)); + emitOutputWatermark(inputWatermark); + } + + /** + * Output watermark + * = max(prev output watermark, + * min(input watermark, watermark holds)). + * @param inputWatermark input watermark + */ + private void emitOutputWatermark(final Watermark inputWatermark) { + + if (keyAndWatermarkHoldMap.isEmpty()) { + return; + } + + // Find min watermark hold + final Watermark watermarkHold = Collections.min(keyAndWatermarkHoldMap.values()); + + if (LOG.isDebugEnabled()) { + LOG.debug("Watermark hold: {}, " + + "inputWatermark: {}, outputWatermark: {}", watermarkHold, inputWatermark, prevOutputWatermark); + } + + if (watermarkHold.getTimestamp() > inputWatermark.getTimestamp()) { + if (inputWatermark.getTimestamp() > prevOutputWatermark.getTimestamp()) { + // progress! + prevOutputWatermark = inputWatermark; + getOutputCollector().emitWatermark(prevOutputWatermark); + } + } else { + // watermark hold < input watermark + if (watermarkHold.getTimestamp() > prevOutputWatermark.getTimestamp()) { + // progress! + prevOutputWatermark = watermarkHold; + // Remove minimum watermark holds + keyAndWatermarkHoldMap.entrySet() + .removeIf(entry -> entry.getValue().getTimestamp() == watermarkHold.getTimestamp()); + getOutputCollector().emitWatermark(prevOutputWatermark); + } } } @@ -179,10 +217,8 @@ protected void beforeClose() { * @param watermark watermark * @param processingTime processing time * @param synchronizedTime synchronized time - * @return the minimum output timestamp. - * If no data is emitted, it returns Long.MAX_VALUE. */ - private long triggerTimers(final K key, + private void triggerTimers(final K key, final Watermark watermark, final Instant processingTime, final Instant synchronizedTime) { @@ -198,10 +234,7 @@ private long triggerTimers(final K key, final List timerDataList = getEligibleTimers(timerInternals); - if (timerDataList.isEmpty()) { - return Long.MAX_VALUE; - } else { - + if (!timerDataList.isEmpty()) { // Trigger timers and emit windowed data final KeyedWorkItem timerWorkItem = KeyedWorkItems.timersWorkItem(key, timerDataList); @@ -217,8 +250,6 @@ private long triggerTimers(final K key, } timerInternals.advanceOutputWatermark(new Instant(keyOutputTimestamp)); - - return keyOutputTimestamp; } } @@ -314,4 +345,31 @@ public TimerInternals timerInternalsForKey(final K key) { return stateAndTimerForKey.timerInternals; } } + + /** + * This class wraps the output collector to track the watermark hold of each key. + */ + final class GBKWOutputCollector implements OutputCollector>>> { + private final OutputCollector>>> outputCollector; + GBKWOutputCollector(final OutputCollector>>> outputCollector) { + this.outputCollector = outputCollector; + } + + @Override + public void emit(final WindowedValue>> output) { + // adds the output timestamp to the watermark hold of each key + // +1 to the output timestamp because if the window is [0-5000), the timestamp is 4999 + keyAndWatermarkHoldMap.put(output.getValue().getKey(), + new Watermark(output.getTimestamp().getMillis() + 1)); + outputCollector.emit(output); + } + @Override + public void emitWatermark(final Watermark watermark) { + outputCollector.emitWatermark(watermark); + } + @Override + public void emit(final String dstVertexId, final T output) { + outputCollector.emit(dstVertexId, output); + } + } } diff --git a/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransformTest.java b/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransformTest.java index b7a960f13a..471dc03dfe 100644 --- a/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransformTest.java +++ b/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransformTest.java @@ -21,8 +21,7 @@ import org.apache.beam.runners.core.SystemReduceFn; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.beam.sdk.transforms.windowing.FixedWindows; -import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.transforms.windowing.*; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.TupleTag; @@ -57,13 +56,19 @@ private void checkOutput(final KV> expected, final KV expectedValue = new ArrayList<>(expected.getValue()); result.getValue().iterator().forEachRemaining(resultValue::add); Collections.sort(resultValue); + Collections.sort(expectedValue); assertEquals(expectedValue, resultValue); } - // [---- window1 --------] [--------------- window2 ---------------] - // ts1 -- ts2 -- ts3 -- watermark -- ts4 -- watermark2 -- ts5 --ts6 --ts7 -- watermark7 + // window size: 2 sec + // interval size: 1 sec + // + // [--------------window2------------------------------] + // [----------------------- window1 --------------------------] + // [-------window0-------] + // ts1 -- ts2 -- ts3 -- w -- ts4 -- w2 -- ts5 --ts6 --ts7 -- w3 -- ts8 --ts9 - --w4 // (1, "hello") // (1, "world") // (2, "hello") @@ -78,13 +83,15 @@ private void checkOutput(final KV> expected, final KV outputTag = new TupleTag<>("main-output"); - final FixedWindows fixedwindows = FixedWindows.of(Duration.standardSeconds(1)); + final SlidingWindows slidingWindows = SlidingWindows.of(Duration.standardSeconds(2)) + .every(Duration.standardSeconds(1)); + final GroupByKeyAndWindowDoFnTransform doFnTransform = new GroupByKeyAndWindowDoFnTransform( NULL_OUTPUT_CODERS, outputTag, Collections.emptyList(), /* additional outputs */ - WindowingStrategy.of(fixedwindows), + WindowingStrategy.of(slidingWindows), emptyList(), /* side inputs */ PipelineOptionsFactory.as(NemoPipelineOptions.class), SystemReduceFn.buffering(NULL_INPUT_CODER)); @@ -99,6 +106,35 @@ public void test() { final Instant ts6 = new Instant(1800); final Instant ts7 = new Instant(1900); final Watermark watermark3 = new Watermark(2100); + final Instant ts8 = new Instant(2200); + final Instant ts9 = new Instant(2300); + final Watermark watermark4 = new Watermark(3000); + + + List sortedWindows = new ArrayList<>(slidingWindows.assignWindows(ts1)); + Collections.sort(sortedWindows, new Comparator() { + @Override + public int compare(IntervalWindow o1, IntervalWindow o2) { + return o1.maxTimestamp().compareTo(o2.maxTimestamp()); + } + }); + + // [0---1000) + final IntervalWindow window0 = sortedWindows.get(0); + // [0---2000) + final IntervalWindow window1 = sortedWindows.get(1); + + sortedWindows.clear(); + sortedWindows = new ArrayList<>(slidingWindows.assignWindows(ts4)); + Collections.sort(sortedWindows, new Comparator() { + @Override + public int compare(IntervalWindow o1, IntervalWindow o2) { + return o1.maxTimestamp().compareTo(o2.maxTimestamp()); + } + }); + + // [1000--3000) + final IntervalWindow window2 = sortedWindows.get(1); final Transform.Context context = mock(Transform.Context.class); @@ -106,35 +142,41 @@ public void test() { doFnTransform.prepare(context, oc); doFnTransform.onData(WindowedValue.of( - KV.of("1", "hello"), ts1, fixedwindows.assignWindow(ts1), PaneInfo.NO_FIRING)); + KV.of("1", "hello"), ts1, slidingWindows.assignWindows(ts1), PaneInfo.NO_FIRING)); doFnTransform.onData(WindowedValue.of( - KV.of("1", "world"), ts2, fixedwindows.assignWindow(ts2), PaneInfo.NO_FIRING)); + KV.of("1", "world"), ts2, slidingWindows.assignWindows(ts2), PaneInfo.NO_FIRING)); doFnTransform.onData(WindowedValue.of( - KV.of("2", "hello"), ts3, fixedwindows.assignWindow(ts3), PaneInfo.NO_FIRING)); + KV.of("2", "hello"), ts3, slidingWindows.assignWindows(ts3), PaneInfo.NO_FIRING)); doFnTransform.onWatermark(watermark); + // output + // 1: ["hello", "world"] + // 2: ["hello"] Collections.sort(oc.outputs, (o1, o2) -> o1.getValue().getKey().compareTo(o2.getValue().getKey())); // windowed result for key 1 - assertEquals(Arrays.asList(fixedwindows.assignWindow(ts1)), oc.outputs.get(0).getWindows()); + assertEquals(Arrays.asList(window0), oc.outputs.get(0).getWindows()); checkOutput(KV.of("1", Arrays.asList("hello", "world")), oc.outputs.get(0).getValue()); // windowed result for key 2 - assertEquals(Arrays.asList(fixedwindows.assignWindow(ts3)), oc.outputs.get(1).getWindows()); + assertEquals(Arrays.asList(window0), oc.outputs.get(1).getWindows()); checkOutput(KV.of("2", Arrays.asList("hello")), oc.outputs.get(1).getValue()); + assertEquals(2, oc.outputs.size()); + assertEquals(1, oc.watermarks.size()); + // check output watermark - assertEquals(fixedwindows.assignWindow(ts1).maxTimestamp().getMillis(), + assertEquals(1000, oc.watermarks.get(0).getTimestamp()); oc.outputs.clear(); oc.watermarks.clear(); doFnTransform.onData(WindowedValue.of( - KV.of("1", "a"), ts4, fixedwindows.assignWindow(ts4), PaneInfo.NO_FIRING)); + KV.of("1", "a"), ts4, slidingWindows.assignWindows(ts4), PaneInfo.NO_FIRING)); // do not emit anything doFnTransform.onWatermark(watermark2); @@ -142,33 +184,70 @@ public void test() { assertEquals(0, oc.watermarks.size()); doFnTransform.onData(WindowedValue.of( - KV.of("2", "a"), ts5, fixedwindows.assignWindow(ts5), PaneInfo.NO_FIRING)); + KV.of("3", "a"), ts5, slidingWindows.assignWindows(ts5), PaneInfo.NO_FIRING)); doFnTransform.onData(WindowedValue.of( - KV.of("3", "a"), ts6, fixedwindows.assignWindow(ts6), PaneInfo.NO_FIRING)); + KV.of("3", "a"), ts6, slidingWindows.assignWindows(ts6), PaneInfo.NO_FIRING)); doFnTransform.onData(WindowedValue.of( - KV.of("2", "b"), ts7, fixedwindows.assignWindow(ts7), PaneInfo.NO_FIRING)); + KV.of("3", "b"), ts7, slidingWindows.assignWindows(ts7), PaneInfo.NO_FIRING)); - // emit windowed value + // emit window1 doFnTransform.onWatermark(watermark3); + // output + // 1: ["hello", "world", "a"] + // 2: ["hello"] + // 3: ["a", "a", "b"] Collections.sort(oc.outputs, (o1, o2) -> o1.getValue().getKey().compareTo(o2.getValue().getKey())); + // windowed result for key 1 - assertEquals(Arrays.asList(fixedwindows.assignWindow(ts4)), oc.outputs.get(0).getWindows()); - checkOutput(KV.of("1", Arrays.asList("a")), oc.outputs.get(0).getValue()); + assertEquals(Arrays.asList(window1), oc.outputs.get(0).getWindows()); + checkOutput(KV.of("1", Arrays.asList("hello", "world", "a")), oc.outputs.get(0).getValue()); // windowed result for key 2 - assertEquals(Arrays.asList(fixedwindows.assignWindow(ts5)), oc.outputs.get(1).getWindows()); - checkOutput(KV.of("2", Arrays.asList("a", "b")), oc.outputs.get(1).getValue()); + assertEquals(Arrays.asList(window1), oc.outputs.get(1).getWindows()); + checkOutput(KV.of("2", Arrays.asList("hello")), oc.outputs.get(1).getValue()); + + // windowed result for key 3 + assertEquals(Arrays.asList(window1), oc.outputs.get(2).getWindows()); + checkOutput(KV.of("3", Arrays.asList("a", "a", "b")), oc.outputs.get(2).getValue()); + + // check output watermark + assertEquals(2000, + oc.watermarks.get(0).getTimestamp()); + + oc.outputs.clear(); + oc.watermarks.clear(); + + + doFnTransform.onData(WindowedValue.of( + KV.of("1", "a"), ts8, slidingWindows.assignWindows(ts8), PaneInfo.NO_FIRING)); + + doFnTransform.onData(WindowedValue.of( + KV.of("3", "b"), ts9, slidingWindows.assignWindows(ts9), PaneInfo.NO_FIRING)); + + // emit window2 + doFnTransform.onWatermark(watermark4); + + // output + // 1: ["a", "a"] + // 3: ["a", "a", "b", "b"] + Collections.sort(oc.outputs, (o1, o2) -> o1.getValue().getKey().compareTo(o2.getValue().getKey())); + + assertEquals(2, oc.outputs.size()); + + // windowed result for key 1 + assertEquals(Arrays.asList(window2), oc.outputs.get(0).getWindows()); + checkOutput(KV.of("1", Arrays.asList("a", "a")), oc.outputs.get(0).getValue()); // windowed result for key 3 - assertEquals(Arrays.asList(fixedwindows.assignWindow(ts6)), oc.outputs.get(2).getWindows()); - checkOutput(KV.of("3", Arrays.asList("a")), oc.outputs.get(2).getValue()); + assertEquals(Arrays.asList(window2), oc.outputs.get(1).getWindows()); + checkOutput(KV.of("3", Arrays.asList("a", "a", "b", "b")), oc.outputs.get(1).getValue()); // check output watermark - assertEquals(fixedwindows.assignWindow(ts4).maxTimestamp().getMillis(), + assertEquals(3000, oc.watermarks.get(0).getTimestamp()); doFnTransform.close(); From 494f10370a2d0294946e8766d7ca4d84fcda029d Mon Sep 17 00:00:00 2001 From: taegeonum Date: Wed, 7 Nov 2018 12:12:19 +0900 Subject: [PATCH 2/3] Address comments --- .../GroupByKeyAndWindowDoFnTransform.java | 37 +++++++++---------- .../GroupByKeyAndWindowDoFnTransformTest.java | 15 ++------ 2 files changed, 21 insertions(+), 31 deletions(-) diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java index 883b4da577..0d8c13d389 100644 --- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java +++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java @@ -150,9 +150,6 @@ private void processElementsAndTriggerTimers(final Watermark inputWatermark, // Remove values values.clear(); } - - // Emit watermark to downstream operators - emitOutputWatermark(inputWatermark); } /** @@ -168,28 +165,26 @@ private void emitOutputWatermark(final Watermark inputWatermark) { } // Find min watermark hold - final Watermark watermarkHold = Collections.min(keyAndWatermarkHoldMap.values()); + final Watermark minWatermarkHold = Collections.min(keyAndWatermarkHoldMap.values()); if (LOG.isDebugEnabled()) { LOG.debug("Watermark hold: {}, " - + "inputWatermark: {}, outputWatermark: {}", watermarkHold, inputWatermark, prevOutputWatermark); + + "inputWatermark: {}, outputWatermark: {}", minWatermarkHold, inputWatermark, prevOutputWatermark); } - if (watermarkHold.getTimestamp() > inputWatermark.getTimestamp()) { - if (inputWatermark.getTimestamp() > prevOutputWatermark.getTimestamp()) { - // progress! - prevOutputWatermark = inputWatermark; - getOutputCollector().emitWatermark(prevOutputWatermark); - } - } else { - // watermark hold < input watermark - if (watermarkHold.getTimestamp() > prevOutputWatermark.getTimestamp()) { - // progress! - prevOutputWatermark = watermarkHold; - // Remove minimum watermark holds + final Watermark outputWatermarkCandidate = new Watermark( + Math.max(prevOutputWatermark.getTimestamp(), + Math.min(minWatermarkHold.getTimestamp(), inputWatermark.getTimestamp()))); + + if (outputWatermarkCandidate.getTimestamp() > prevOutputWatermark.getTimestamp()) { + // progress! + prevOutputWatermark = outputWatermarkCandidate; + // emit watermark + getOutputCollector().emitWatermark(outputWatermarkCandidate); + // Remove minimum watermark holds + if (minWatermarkHold.getTimestamp() == outputWatermarkCandidate.getTimestamp()) { keyAndWatermarkHoldMap.entrySet() - .removeIf(entry -> entry.getValue().getTimestamp() == watermarkHold.getTimestamp()); - getOutputCollector().emitWatermark(prevOutputWatermark); + .removeIf(entry -> entry.getValue().getTimestamp() == minWatermarkHold.getTimestamp()); } } } @@ -197,6 +192,8 @@ private void emitOutputWatermark(final Watermark inputWatermark) { @Override public void onWatermark(final Watermark inputWatermark) { processElementsAndTriggerTimers(inputWatermark, Instant.now(), Instant.now()); + // Emit watermark to downstream operators + emitOutputWatermark(inputWatermark); } /** @@ -208,6 +205,8 @@ protected void beforeClose() { // Finish any pending windows by advancing the input watermark to infinity. processElementsAndTriggerTimers(new Watermark(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()), BoundedWindow.TIMESTAMP_MAX_VALUE, BoundedWindow.TIMESTAMP_MAX_VALUE); + // Emit watermark to downstream operators + emitOutputWatermark(new Watermark(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis())); } /** diff --git a/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransformTest.java b/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransformTest.java index 471dc03dfe..e3fa23eaaa 100644 --- a/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransformTest.java +++ b/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransformTest.java @@ -41,6 +41,7 @@ import static org.junit.Assert.assertEquals; import static org.mockito.Mockito.mock; +// TODO #270: Test different triggers public final class GroupByKeyAndWindowDoFnTransformTest { private final static Coder NULL_INPUT_CODER = null; @@ -112,12 +113,7 @@ public void test() { List sortedWindows = new ArrayList<>(slidingWindows.assignWindows(ts1)); - Collections.sort(sortedWindows, new Comparator() { - @Override - public int compare(IntervalWindow o1, IntervalWindow o2) { - return o1.maxTimestamp().compareTo(o2.maxTimestamp()); - } - }); + Collections.sort(sortedWindows, IntervalWindow::compareTo); // [0---1000) final IntervalWindow window0 = sortedWindows.get(0); @@ -126,12 +122,7 @@ public int compare(IntervalWindow o1, IntervalWindow o2) { sortedWindows.clear(); sortedWindows = new ArrayList<>(slidingWindows.assignWindows(ts4)); - Collections.sort(sortedWindows, new Comparator() { - @Override - public int compare(IntervalWindow o1, IntervalWindow o2) { - return o1.maxTimestamp().compareTo(o2.maxTimestamp()); - } - }); + Collections.sort(sortedWindows, IntervalWindow::compareTo); // [1000--3000) final IntervalWindow window2 = sortedWindows.get(1); From 1b6292cc8a5c28798f460c634077a13a2c546733 Mon Sep 17 00:00:00 2001 From: taegeonum Date: Wed, 7 Nov 2018 13:51:12 +0900 Subject: [PATCH 3/3] add commentsg --- .../beam/transform/GroupByKeyAndWindowDoFnTransform.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java index 0d8c13d389..d22517a274 100644 --- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java +++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java @@ -358,6 +358,8 @@ final class GBKWOutputCollector implements OutputCollector>> output) { // adds the output timestamp to the watermark hold of each key // +1 to the output timestamp because if the window is [0-5000), the timestamp is 4999 + // TODO #270: consider early firing + // TODO #270: This logic may not be applied to early firing outputs keyAndWatermarkHoldMap.put(output.getValue().getKey(), new Watermark(output.getTimestamp().getMillis() + 1)); outputCollector.emit(output);