From 1686805d84316f2a8438c642214cdabe5b579381 Mon Sep 17 00:00:00 2001 From: Aljoscha Krettek Date: Wed, 12 Jul 2017 14:42:37 +0200 Subject: [PATCH 1/3] [BEAM-2571] Change DoFnOperator to use Long.MAX_VALUE as max watermark This is in line with what Flink does and what BoundedSourceWrapper and UnboundedSourceWrapper do. --- .../translation/wrappers/streaming/DoFnOperator.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java index 350f32367a84c..751eba1fd432d 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java @@ -334,7 +334,7 @@ public void close() throws Exception { protected final long getPushbackWatermarkHold() { // if we don't have side inputs we never hold the watermark if (sideInputs.isEmpty()) { - return BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis(); + return Long.MAX_VALUE; } try { @@ -353,7 +353,7 @@ private void checkInitPushedBackWatermark() { BagState> pushedBack = pushbackStateInternals.state(StateNamespaces.global(), pushedBackTag); - long min = BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis(); + long min = Long.MAX_VALUE; for (WindowedValue value : pushedBack.read()) { min = Math.min(min, value.getTimestamp().getMillis()); } @@ -426,7 +426,7 @@ public final void processElement2( } pushedBack.clear(); - long min = BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis(); + long min = Long.MAX_VALUE; for (WindowedValue pushedBackValue : newPushedBack) { min = Math.min(min, pushedBackValue.getTimestamp().getMillis()); pushedBack.add(pushedBackValue); @@ -524,7 +524,7 @@ private void emitAllPushedBackData() throws Exception { pushedBack.clear(); - setPushedBackWatermark(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()); + setPushedBackWatermark(Long.MAX_VALUE); pushbackDoFnRunner.finishBundle(); } From 5c4a95aa63da23027b619a50928ebebe5beb05c2 Mon Sep 17 00:00:00 2001 From: Aljoscha Krettek Date: Wed, 12 Jul 2017 14:39:58 +0200 Subject: [PATCH 2/3] [BEAM-2571] Clarify pushedback variable name in DoFnOperator --- .../flink/translation/wrappers/streaming/DoFnOperator.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java index 751eba1fd432d..8da8de5ae7509 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java @@ -470,15 +470,15 @@ public void processWatermark1(Watermark mark) throws Exception { setCurrentInputWatermark(mark.getTimestamp()); // hold back by the pushed back values waiting for side inputs - long actualInputWatermark = Math.min(getPushbackWatermarkHold(), mark.getTimestamp()); + long pushedBackInputWatermark = Math.min(getPushbackWatermarkHold(), mark.getTimestamp()); - timerService.advanceWatermark(actualInputWatermark); + timerService.advanceWatermark(pushedBackInputWatermark); Instant watermarkHold = stateInternals.watermarkHold(); long combinedWatermarkHold = Math.min(watermarkHold.getMillis(), getPushbackWatermarkHold()); - long potentialOutputWatermark = Math.min(currentInputWatermark, combinedWatermarkHold); + long potentialOutputWatermark = Math.min(pushedBackInputWatermark, combinedWatermarkHold); if (potentialOutputWatermark > currentOutputWatermark) { setCurrentOutputWatermark(potentialOutputWatermark); From ade506526b4ff56eb4ed15e9eea04d1d3345bc13 Mon Sep 17 00:00:00 2001 From: Aljoscha Krettek Date: Wed, 12 Jul 2017 15:38:06 +0200 Subject: [PATCH 3/3] [BEAM-2571] Respect watermark contract in Flink DoFnOperator In Flink, a watermark T specifies that there will be no elements with a timestamp <= T in the future. In Beam, a watermark T specifies that there will not be element with a timestamp < T in the future. This leads to problems when the watermark is exactly "on the timer timestamp", most prominently, this happened with Triggers, where Flink would fire the Trigger too early and the Trigger would determine (based on the watermark) that it is not yet time to fire the window while Flink thought it was time. This also adds a test that specifially tests the edge case. --- .../wrappers/streaming/DoFnOperator.java | 13 +- .../flink/streaming/DoFnOperatorTest.java | 117 +++++++++++++++++- 2 files changed, 128 insertions(+), 2 deletions(-) diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java index 8da8de5ae7509..8884ce10c77cf 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java @@ -472,7 +472,7 @@ public void processWatermark1(Watermark mark) throws Exception { // hold back by the pushed back values waiting for side inputs long pushedBackInputWatermark = Math.min(getPushbackWatermarkHold(), mark.getTimestamp()); - timerService.advanceWatermark(pushedBackInputWatermark); + timerService.advanceWatermark(toFlinkRuntimeWatermark(pushedBackInputWatermark)); Instant watermarkHold = stateInternals.watermarkHold(); @@ -500,6 +500,17 @@ public void processWatermark2(Watermark mark) throws Exception { } } + /** + * Converts a Beam watermark to a Flink watermark. This is only relevant when considering what + * event-time timers to fire: in Beam, a watermark {@code T} says there will not be any elements + * with a timestamp {@code < T} in the future. A Flink watermark {@code T} says there will not be + * any elements with a timestamp {@code <= T} in the future. We correct this by subtracting + * {@code 1} from a Beam watermark before passing to any relevant Flink runtime components. + */ + private static long toFlinkRuntimeWatermark(long beamWatermark) { + return beamWatermark - 1; + } + /** * Emits all pushed-back data. This should be used once we know that there will not be * any future side input, i.e. that there is no point in waiting. diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java index ad9d236c53484..4d2a91254b6f6 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java @@ -33,6 +33,7 @@ import org.apache.beam.runners.core.StatefulDoFnRunner; import org.apache.beam.runners.flink.FlinkPipelineOptions; import org.apache.beam.runners.flink.translation.types.CoderTypeInformation; +import org.apache.beam.runners.flink.translation.types.CoderTypeSerializer; import org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.KvCoder; @@ -197,6 +198,118 @@ public void testMultiOutputOutput() throws Exception { testHarness.close(); } + /** + * This test specifically verifies that we correctly map Flink watermarks to Beam watermarks. In + * Beam, a watermark {@code T} guarantees there will not be elements with a timestamp + * {@code < T} in the future. In Flink, a watermark {@code T} guarantees there will not be + * elements with a timestamp {@code <= T} in the future. We have to make sure to take this into + * account when firing timers. + * + *

This not test the timer API in general or processing-time timers because there are generic + * tests for this in {@code ParDoTest}. + */ + @Test + public void testWatermarkContract() throws Exception { + + final Instant timerTimestamp = new Instant(1000); + final String outputMessage = "Timer fired"; + + WindowingStrategy windowingStrategy = + WindowingStrategy.of(FixedWindows.of(new Duration(10_000))); + + DoFn fn = new DoFn() { + private static final String EVENT_TIMER_ID = "eventTimer"; + + @TimerId(EVENT_TIMER_ID) + private final TimerSpec eventTimer = TimerSpecs.timer(TimeDomain.EVENT_TIME); + + @ProcessElement + public void processElement(ProcessContext context, @TimerId(EVENT_TIMER_ID) Timer timer) { + timer.set(timerTimestamp); + } + + @OnTimer(EVENT_TIMER_ID) + public void onEventTime(OnTimerContext context) { + assertEquals( + "Timer timestamp must match set timestamp.", timerTimestamp, context.timestamp()); + context.outputWithTimestamp(outputMessage, context.timestamp()); + } + }; + + WindowedValue.FullWindowedValueCoder inputCoder = + WindowedValue.getFullCoder( + VarIntCoder.of(), + windowingStrategy.getWindowFn().windowCoder()); + + WindowedValue.FullWindowedValueCoder outputCoder = + WindowedValue.getFullCoder( + StringUtf8Coder.of(), + windowingStrategy.getWindowFn().windowCoder()); + + + TupleTag outputTag = new TupleTag<>("main-output"); + + DoFnOperator doFnOperator = new DoFnOperator<>( + fn, + "stepName", + inputCoder, + outputTag, + Collections.>emptyList(), + new DoFnOperator.DefaultOutputManagerFactory(), + windowingStrategy, + new HashMap>(), /* side-input mapping */ + Collections.>emptyList(), /* side inputs */ + PipelineOptionsFactory.as(FlinkPipelineOptions.class), + VarIntCoder.of() /* key coder */); + + OneInputStreamOperatorTestHarness, WindowedValue> testHarness = + new KeyedOneInputStreamOperatorTestHarness<>( + doFnOperator, + new KeySelector, Integer>() { + @Override + public Integer getKey(WindowedValue integerWindowedValue) throws Exception { + return integerWindowedValue.getValue(); + } + }, + new CoderTypeInformation<>(VarIntCoder.of())); + + testHarness.setup(new CoderTypeSerializer<>(outputCoder)); + + testHarness.open(); + + testHarness.processWatermark(0); + + IntervalWindow window1 = new IntervalWindow(new Instant(0), Duration.millis(10_000)); + + // this should register a timer + testHarness.processElement( + new StreamRecord<>(WindowedValue.of(13, new Instant(0), window1, PaneInfo.NO_FIRING))); + + assertThat( + this.stripStreamRecordFromWindowedValue(testHarness.getOutput()), + emptyIterable()); + + // this does not yet fire the timer (in vanilla Flink it would) + testHarness.processWatermark(timerTimestamp.getMillis()); + + assertThat( + this.stripStreamRecordFromWindowedValue(testHarness.getOutput()), + emptyIterable()); + + testHarness.getOutput().clear(); + + // this must fire the timer + testHarness.processWatermark(timerTimestamp.getMillis() + 1); + + assertThat( + this.stripStreamRecordFromWindowedValue(testHarness.getOutput()), + contains( + WindowedValue.of( + outputMessage, new Instant(timerTimestamp), window1, PaneInfo.NO_FIRING))); + + testHarness.close(); + } + @Test public void testLateDroppingForStatefulFn() throws Exception { @@ -394,11 +507,13 @@ public String getKey( // this should trigger both the window.maxTimestamp() timer and the GC timer // this tests that the GC timer fires after the user timer + // we have to add 1 here because Flink timers fire when watermark >= timestamp while Beam + // timers fire when watermark > timestamp testHarness.processWatermark( window1.maxTimestamp() .plus(windowingStrategy.getAllowedLateness()) .plus(StatefulDoFnRunner.TimeInternalsCleanupTimer.GC_DELAY_MS) - .getMillis()); + .getMillis() + 1); assertThat( this.>stripStreamRecordFromWindowedValue(testHarness.getOutput()),