diff --git a/common/src/main/java/org/apache/nemo/common/punctuation/Watermark.java b/common/src/main/java/org/apache/nemo/common/punctuation/Watermark.java index 27ee53b3fa..e676e6e8c9 100644 --- a/common/src/main/java/org/apache/nemo/common/punctuation/Watermark.java +++ b/common/src/main/java/org/apache/nemo/common/punctuation/Watermark.java @@ -24,7 +24,7 @@ /** * Watermark event. */ -public final class Watermark implements Serializable { +public final class Watermark implements Serializable, Comparable { private final long timestamp; public Watermark(final long timestamp) { @@ -47,8 +47,18 @@ public boolean equals(final Object o) { return timestamp == watermark.timestamp; } + @Override + public String toString() { + return String.valueOf(timestamp); + } + @Override public int hashCode() { return Objects.hash(timestamp); } + + @Override + public int compareTo(final Watermark o) { + return Long.compare(timestamp, o.getTimestamp()); + } } diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/AbstractDoFnTransform.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/AbstractDoFnTransform.java index 3585ea2fd6..6a8f8d49a9 100644 --- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/AbstractDoFnTransform.java +++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/AbstractDoFnTransform.java @@ -159,7 +159,7 @@ protected final void checkAndFinishBundle() { public final void prepare(final Context context, final OutputCollector> oc) { // deserialize pipeline option final NemoPipelineOptions options = serializedOptions.get().as(NemoPipelineOptions.class); - this.outputCollector = oc; + this.outputCollector = wrapOutputCollector(oc); this.bundleMillis = options.getMaxBundleTimeMills(); this.bundleSize = options.getMaxBundleSize(); @@ -227,6 +227,13 @@ public final void close() { */ abstract DoFn wrapDoFn(final DoFn originalDoFn); + /** + * An abstract function that wraps the original output collector. + * @param oc the original outputCollector. + * @return wrapped output collector. + */ + abstract OutputCollector wrapOutputCollector(final OutputCollector oc); + @Override public abstract void onData(final WindowedValue data); diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransform.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransform.java index 433f9df1ff..18368c601e 100644 --- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransform.java +++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransform.java @@ -25,6 +25,7 @@ import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.WindowingStrategy; +import org.apache.nemo.common.ir.OutputCollector; import org.apache.nemo.common.punctuation.Watermark; import java.util.Collection; @@ -83,6 +84,11 @@ protected void beforeClose() { // nothing } + @Override + OutputCollector wrapOutputCollector(final OutputCollector oc) { + return oc; + } + @Override public String toString() { final StringBuilder sb = new StringBuilder(); diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java index 372784629a..84b6835b95 100644 --- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java +++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java @@ -28,6 +28,7 @@ import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.WindowingStrategy; import org.apache.beam.sdk.values.KV; +import org.apache.nemo.common.ir.OutputCollector; import org.apache.nemo.common.punctuation.Watermark; import org.joda.time.Instant; import org.slf4j.Logger; @@ -48,7 +49,8 @@ public final class GroupByKeyAndWindowDoFnTransform private final Map>> keyToValues; private transient InMemoryTimerInternalsFactory inMemoryTimerInternalsFactory; private transient InMemoryStateInternalsFactory inMemoryStateInternalsFactory; - private long currentOutputWatermark; + private Watermark prevOutputWatermark; + private final Map keyAndWatermarkHoldMap; /** * GroupByKey constructor. @@ -70,7 +72,8 @@ public GroupByKeyAndWindowDoFnTransform(final Map, Coder> outputC options); this.keyToValues = new HashMap<>(); this.reduceFn = reduceFn; - this.currentOutputWatermark = Long.MIN_VALUE; + this.prevOutputWatermark = new Watermark(Long.MIN_VALUE); + this.keyAndWatermarkHoldMap = new HashMap<>(); } /** @@ -96,6 +99,11 @@ protected DoFn wrapDoFn(final DoFn doFn) { getMainOutputTag()); } + @Override + OutputCollector wrapOutputCollector(final OutputCollector oc) { + return new GBKWOutputCollector(oc); + } + /** * It collects data for each key. * The collected data are emitted at {@link GroupByKeyAndWindowDoFnTransform#onWatermark(Watermark)} @@ -126,8 +134,6 @@ public void onData(final WindowedValue> element) { private void processElementsAndTriggerTimers(final Watermark inputWatermark, final Instant processingTime, final Instant synchronizedTime) { - long minOutputTimestampsOfEmittedWindows = Long.MAX_VALUE; - for (final Map.Entry>> entry : keyToValues.entrySet()) { final K key = entry.getKey(); final List> values = entry.getValue(); @@ -143,20 +149,47 @@ private void processElementsAndTriggerTimers(final Watermark inputWatermark, } // Trigger timers - final long minOutputTimestamp = - triggerTimers(key, inputWatermark, processingTime, synchronizedTime); - - minOutputTimestampsOfEmittedWindows = Math.min(minOutputTimestampsOfEmittedWindows, minOutputTimestamp); + triggerTimers(key, inputWatermark, processingTime, synchronizedTime); // Remove values values.clear(); } + } - // Emit watermark to downstream operators - if (minOutputTimestampsOfEmittedWindows != Long.MAX_VALUE - && currentOutputWatermark < minOutputTimestampsOfEmittedWindows) { - currentOutputWatermark = minOutputTimestampsOfEmittedWindows; - getOutputCollector().emitWatermark(new Watermark(minOutputTimestampsOfEmittedWindows)); + /** + * Output watermark + * = max(prev output watermark, + * min(input watermark, watermark holds)). + * @param inputWatermark input watermark + */ + private void emitOutputWatermark(final Watermark inputWatermark) { + + if (keyAndWatermarkHoldMap.isEmpty()) { + return; + } + + // Find min watermark hold + final Watermark minWatermarkHold = Collections.min(keyAndWatermarkHoldMap.values()); + + if (LOG.isDebugEnabled()) { + LOG.debug("Watermark hold: {}, " + + "inputWatermark: {}, outputWatermark: {}", minWatermarkHold, inputWatermark, prevOutputWatermark); + } + + final Watermark outputWatermarkCandidate = new Watermark( + Math.max(prevOutputWatermark.getTimestamp(), + Math.min(minWatermarkHold.getTimestamp(), inputWatermark.getTimestamp()))); + + if (outputWatermarkCandidate.getTimestamp() > prevOutputWatermark.getTimestamp()) { + // progress! + prevOutputWatermark = outputWatermarkCandidate; + // emit watermark + getOutputCollector().emitWatermark(outputWatermarkCandidate); + // Remove minimum watermark holds + if (minWatermarkHold.getTimestamp() == outputWatermarkCandidate.getTimestamp()) { + keyAndWatermarkHoldMap.entrySet() + .removeIf(entry -> entry.getValue().getTimestamp() == minWatermarkHold.getTimestamp()); + } } } @@ -164,6 +197,8 @@ private void processElementsAndTriggerTimers(final Watermark inputWatermark, public void onWatermark(final Watermark inputWatermark) { checkAndInvokeBundle(); processElementsAndTriggerTimers(inputWatermark, Instant.now(), Instant.now()); + // Emit watermark to downstream operators + emitOutputWatermark(inputWatermark); checkAndFinishBundle(); } @@ -176,6 +211,8 @@ protected void beforeClose() { // Finish any pending windows by advancing the input watermark to infinity. processElementsAndTriggerTimers(new Watermark(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()), BoundedWindow.TIMESTAMP_MAX_VALUE, BoundedWindow.TIMESTAMP_MAX_VALUE); + // Emit watermark to downstream operators + emitOutputWatermark(new Watermark(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis())); } /** @@ -185,10 +222,8 @@ protected void beforeClose() { * @param watermark watermark * @param processingTime processing time * @param synchronizedTime synchronized time - * @return the minimum output timestamp. - * If no data is emitted, it returns Long.MAX_VALUE. */ - private long triggerTimers(final K key, + private void triggerTimers(final K key, final Watermark watermark, final Instant processingTime, final Instant synchronizedTime) { @@ -204,10 +239,7 @@ private long triggerTimers(final K key, final List timerDataList = getEligibleTimers(timerInternals); - if (timerDataList.isEmpty()) { - return Long.MAX_VALUE; - } else { - + if (!timerDataList.isEmpty()) { // Trigger timers and emit windowed data final KeyedWorkItem timerWorkItem = KeyedWorkItems.timersWorkItem(key, timerDataList); @@ -223,8 +255,6 @@ private long triggerTimers(final K key, } timerInternals.advanceOutputWatermark(new Instant(keyOutputTimestamp)); - - return keyOutputTimestamp; } } @@ -320,4 +350,33 @@ public TimerInternals timerInternalsForKey(final K key) { return stateAndTimerForKey.timerInternals; } } + + /** + * This class wraps the output collector to track the watermark hold of each key. + */ + final class GBKWOutputCollector implements OutputCollector>>> { + private final OutputCollector>>> outputCollector; + GBKWOutputCollector(final OutputCollector>>> outputCollector) { + this.outputCollector = outputCollector; + } + + @Override + public void emit(final WindowedValue>> output) { + // adds the output timestamp to the watermark hold of each key + // +1 to the output timestamp because if the window is [0-5000), the timestamp is 4999 + // TODO #270: consider early firing + // TODO #270: This logic may not be applied to early firing outputs + keyAndWatermarkHoldMap.put(output.getValue().getKey(), + new Watermark(output.getTimestamp().getMillis() + 1)); + outputCollector.emit(output); + } + @Override + public void emitWatermark(final Watermark watermark) { + outputCollector.emitWatermark(watermark); + } + @Override + public void emit(final String dstVertexId, final T output) { + outputCollector.emit(dstVertexId, output); + } + } } diff --git a/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransformTest.java b/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransformTest.java index b7a960f13a..e3fa23eaaa 100644 --- a/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransformTest.java +++ b/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransformTest.java @@ -21,8 +21,7 @@ import org.apache.beam.runners.core.SystemReduceFn; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.beam.sdk.transforms.windowing.FixedWindows; -import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.transforms.windowing.*; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.TupleTag; @@ -42,6 +41,7 @@ import static org.junit.Assert.assertEquals; import static org.mockito.Mockito.mock; +// TODO #270: Test different triggers public final class GroupByKeyAndWindowDoFnTransformTest { private final static Coder NULL_INPUT_CODER = null; @@ -57,13 +57,19 @@ private void checkOutput(final KV> expected, final KV expectedValue = new ArrayList<>(expected.getValue()); result.getValue().iterator().forEachRemaining(resultValue::add); Collections.sort(resultValue); + Collections.sort(expectedValue); assertEquals(expectedValue, resultValue); } - // [---- window1 --------] [--------------- window2 ---------------] - // ts1 -- ts2 -- ts3 -- watermark -- ts4 -- watermark2 -- ts5 --ts6 --ts7 -- watermark7 + // window size: 2 sec + // interval size: 1 sec + // + // [--------------window2------------------------------] + // [----------------------- window1 --------------------------] + // [-------window0-------] + // ts1 -- ts2 -- ts3 -- w -- ts4 -- w2 -- ts5 --ts6 --ts7 -- w3 -- ts8 --ts9 - --w4 // (1, "hello") // (1, "world") // (2, "hello") @@ -78,13 +84,15 @@ private void checkOutput(final KV> expected, final KV outputTag = new TupleTag<>("main-output"); - final FixedWindows fixedwindows = FixedWindows.of(Duration.standardSeconds(1)); + final SlidingWindows slidingWindows = SlidingWindows.of(Duration.standardSeconds(2)) + .every(Duration.standardSeconds(1)); + final GroupByKeyAndWindowDoFnTransform doFnTransform = new GroupByKeyAndWindowDoFnTransform( NULL_OUTPUT_CODERS, outputTag, Collections.emptyList(), /* additional outputs */ - WindowingStrategy.of(fixedwindows), + WindowingStrategy.of(slidingWindows), emptyList(), /* side inputs */ PipelineOptionsFactory.as(NemoPipelineOptions.class), SystemReduceFn.buffering(NULL_INPUT_CODER)); @@ -99,6 +107,25 @@ public void test() { final Instant ts6 = new Instant(1800); final Instant ts7 = new Instant(1900); final Watermark watermark3 = new Watermark(2100); + final Instant ts8 = new Instant(2200); + final Instant ts9 = new Instant(2300); + final Watermark watermark4 = new Watermark(3000); + + + List sortedWindows = new ArrayList<>(slidingWindows.assignWindows(ts1)); + Collections.sort(sortedWindows, IntervalWindow::compareTo); + + // [0---1000) + final IntervalWindow window0 = sortedWindows.get(0); + // [0---2000) + final IntervalWindow window1 = sortedWindows.get(1); + + sortedWindows.clear(); + sortedWindows = new ArrayList<>(slidingWindows.assignWindows(ts4)); + Collections.sort(sortedWindows, IntervalWindow::compareTo); + + // [1000--3000) + final IntervalWindow window2 = sortedWindows.get(1); final Transform.Context context = mock(Transform.Context.class); @@ -106,35 +133,41 @@ public void test() { doFnTransform.prepare(context, oc); doFnTransform.onData(WindowedValue.of( - KV.of("1", "hello"), ts1, fixedwindows.assignWindow(ts1), PaneInfo.NO_FIRING)); + KV.of("1", "hello"), ts1, slidingWindows.assignWindows(ts1), PaneInfo.NO_FIRING)); doFnTransform.onData(WindowedValue.of( - KV.of("1", "world"), ts2, fixedwindows.assignWindow(ts2), PaneInfo.NO_FIRING)); + KV.of("1", "world"), ts2, slidingWindows.assignWindows(ts2), PaneInfo.NO_FIRING)); doFnTransform.onData(WindowedValue.of( - KV.of("2", "hello"), ts3, fixedwindows.assignWindow(ts3), PaneInfo.NO_FIRING)); + KV.of("2", "hello"), ts3, slidingWindows.assignWindows(ts3), PaneInfo.NO_FIRING)); doFnTransform.onWatermark(watermark); + // output + // 1: ["hello", "world"] + // 2: ["hello"] Collections.sort(oc.outputs, (o1, o2) -> o1.getValue().getKey().compareTo(o2.getValue().getKey())); // windowed result for key 1 - assertEquals(Arrays.asList(fixedwindows.assignWindow(ts1)), oc.outputs.get(0).getWindows()); + assertEquals(Arrays.asList(window0), oc.outputs.get(0).getWindows()); checkOutput(KV.of("1", Arrays.asList("hello", "world")), oc.outputs.get(0).getValue()); // windowed result for key 2 - assertEquals(Arrays.asList(fixedwindows.assignWindow(ts3)), oc.outputs.get(1).getWindows()); + assertEquals(Arrays.asList(window0), oc.outputs.get(1).getWindows()); checkOutput(KV.of("2", Arrays.asList("hello")), oc.outputs.get(1).getValue()); + assertEquals(2, oc.outputs.size()); + assertEquals(1, oc.watermarks.size()); + // check output watermark - assertEquals(fixedwindows.assignWindow(ts1).maxTimestamp().getMillis(), + assertEquals(1000, oc.watermarks.get(0).getTimestamp()); oc.outputs.clear(); oc.watermarks.clear(); doFnTransform.onData(WindowedValue.of( - KV.of("1", "a"), ts4, fixedwindows.assignWindow(ts4), PaneInfo.NO_FIRING)); + KV.of("1", "a"), ts4, slidingWindows.assignWindows(ts4), PaneInfo.NO_FIRING)); // do not emit anything doFnTransform.onWatermark(watermark2); @@ -142,33 +175,70 @@ public void test() { assertEquals(0, oc.watermarks.size()); doFnTransform.onData(WindowedValue.of( - KV.of("2", "a"), ts5, fixedwindows.assignWindow(ts5), PaneInfo.NO_FIRING)); + KV.of("3", "a"), ts5, slidingWindows.assignWindows(ts5), PaneInfo.NO_FIRING)); doFnTransform.onData(WindowedValue.of( - KV.of("3", "a"), ts6, fixedwindows.assignWindow(ts6), PaneInfo.NO_FIRING)); + KV.of("3", "a"), ts6, slidingWindows.assignWindows(ts6), PaneInfo.NO_FIRING)); doFnTransform.onData(WindowedValue.of( - KV.of("2", "b"), ts7, fixedwindows.assignWindow(ts7), PaneInfo.NO_FIRING)); + KV.of("3", "b"), ts7, slidingWindows.assignWindows(ts7), PaneInfo.NO_FIRING)); - // emit windowed value + // emit window1 doFnTransform.onWatermark(watermark3); + // output + // 1: ["hello", "world", "a"] + // 2: ["hello"] + // 3: ["a", "a", "b"] Collections.sort(oc.outputs, (o1, o2) -> o1.getValue().getKey().compareTo(o2.getValue().getKey())); + // windowed result for key 1 - assertEquals(Arrays.asList(fixedwindows.assignWindow(ts4)), oc.outputs.get(0).getWindows()); - checkOutput(KV.of("1", Arrays.asList("a")), oc.outputs.get(0).getValue()); + assertEquals(Arrays.asList(window1), oc.outputs.get(0).getWindows()); + checkOutput(KV.of("1", Arrays.asList("hello", "world", "a")), oc.outputs.get(0).getValue()); // windowed result for key 2 - assertEquals(Arrays.asList(fixedwindows.assignWindow(ts5)), oc.outputs.get(1).getWindows()); - checkOutput(KV.of("2", Arrays.asList("a", "b")), oc.outputs.get(1).getValue()); + assertEquals(Arrays.asList(window1), oc.outputs.get(1).getWindows()); + checkOutput(KV.of("2", Arrays.asList("hello")), oc.outputs.get(1).getValue()); + + // windowed result for key 3 + assertEquals(Arrays.asList(window1), oc.outputs.get(2).getWindows()); + checkOutput(KV.of("3", Arrays.asList("a", "a", "b")), oc.outputs.get(2).getValue()); + + // check output watermark + assertEquals(2000, + oc.watermarks.get(0).getTimestamp()); + + oc.outputs.clear(); + oc.watermarks.clear(); + + + doFnTransform.onData(WindowedValue.of( + KV.of("1", "a"), ts8, slidingWindows.assignWindows(ts8), PaneInfo.NO_FIRING)); + + doFnTransform.onData(WindowedValue.of( + KV.of("3", "b"), ts9, slidingWindows.assignWindows(ts9), PaneInfo.NO_FIRING)); + + // emit window2 + doFnTransform.onWatermark(watermark4); + + // output + // 1: ["a", "a"] + // 3: ["a", "a", "b", "b"] + Collections.sort(oc.outputs, (o1, o2) -> o1.getValue().getKey().compareTo(o2.getValue().getKey())); + + assertEquals(2, oc.outputs.size()); + + // windowed result for key 1 + assertEquals(Arrays.asList(window2), oc.outputs.get(0).getWindows()); + checkOutput(KV.of("1", Arrays.asList("a", "a")), oc.outputs.get(0).getValue()); // windowed result for key 3 - assertEquals(Arrays.asList(fixedwindows.assignWindow(ts6)), oc.outputs.get(2).getWindows()); - checkOutput(KV.of("3", Arrays.asList("a")), oc.outputs.get(2).getValue()); + assertEquals(Arrays.asList(window2), oc.outputs.get(1).getWindows()); + checkOutput(KV.of("3", Arrays.asList("a", "a", "b", "b")), oc.outputs.get(1).getValue()); // check output watermark - assertEquals(fixedwindows.assignWindow(ts4).maxTimestamp().getMillis(), + assertEquals(3000, oc.watermarks.get(0).getTimestamp()); doFnTransform.close();