diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java index 9205bce1a83a..42a8833fea7b 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java @@ -17,7 +17,8 @@ */ package org.apache.beam.runners.flink.translation.functions; -import java.util.Collections; +import com.google.common.collect.Lists; +import java.util.List; import java.util.Map; import org.apache.beam.runners.core.DoFnRunner; import org.apache.beam.runners.core.DoFnRunners; @@ -97,13 +98,14 @@ public void mapPartition( new FlinkDoFnFunction.MultiDoFnOutputManager((Collector) out, outputMap); } + List> additionalOutputTags = Lists.newArrayList(outputMap.keySet()); + DoFnRunner doFnRunner = DoFnRunners.simpleRunner( serializedOptions.getPipelineOptions(), doFn, new FlinkSideInputReader(sideInputs, runtimeContext), outputManager, mainOutputTag, - // see SimpleDoFnRunner, just use it to limit number of additional outputs - Collections.>emptyList(), + additionalOutputTags, new FlinkNoOpStepContext(), windowingStrategy); diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java index 6517bf2c1a9f..b07576893d01 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java @@ -19,8 +19,9 @@ import static org.apache.flink.util.Preconditions.checkArgument; -import java.util.Collections; +import com.google.common.collect.Lists; import java.util.Iterator; +import java.util.List; import java.util.Map; import org.apache.beam.runners.core.DoFnRunner; import org.apache.beam.runners.core.DoFnRunners; @@ -114,13 +115,14 @@ public void reduce( timerInternals.advanceProcessingTime(Instant.now()); timerInternals.advanceSynchronizedProcessingTime(Instant.now()); + List> additionalOutputTags = Lists.newArrayList(outputMap.keySet()); + DoFnRunner, OutputT> doFnRunner = DoFnRunners.simpleRunner( serializedOptions.getPipelineOptions(), dofn, new FlinkSideInputReader(sideInputs, runtimeContext), outputManager, mainOutputTag, - // see SimpleDoFnRunner, just use it to limit number of additional outputs - Collections.>emptyList(), + additionalOutputTags, new FlinkNoOpStepContext() { @Override public StateInternals stateInternals() { diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java index d2ab7e1cabbd..e47304690b1c 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java @@ -19,6 +19,7 @@ import static org.apache.flink.util.Preconditions.checkArgument; +import com.google.common.base.Joiner; import com.google.common.base.Optional; import com.google.common.collect.Iterables; import java.io.DataInputStream; @@ -129,6 +130,8 @@ public class DoFnOperator protected transient long currentInputWatermark; + protected transient long currentSideInputWatermark; + protected transient long currentOutputWatermark; private transient StateTag>> pushedBackTag; @@ -197,6 +200,7 @@ public void open() throws Exception { super.open(); setCurrentInputWatermark(BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis()); + setCurrentSideInputWatermark(BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis()); setCurrentOutputWatermark(BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis()); sideInputReader = NullSideInputReader.of(sideInputs); @@ -308,6 +312,21 @@ public void open() throws Exception { @Override public void close() throws Exception { super.close(); + + // sanity check: these should have been flushed out by +Inf watermarks + if (pushbackStateInternals != null) { + BagState> pushedBack = + pushbackStateInternals.state(StateNamespaces.global(), pushedBackTag); + + Iterable> pushedBackContents = pushedBack.read(); + if (pushedBackContents != null) { + if (!Iterables.isEmpty(pushedBackContents)) { + String pushedBackString = Joiner.on(",").join(pushedBackContents); + throw new RuntimeException( + "Leftover pushed-back data: " + pushedBackString + ". This indicates a bug."); + } + } + } doFnInvoker.invokeTeardown(); } @@ -457,36 +476,56 @@ public void processWatermark1(Watermark mark) throws Exception { } pushbackDoFnRunner.finishBundle(); } + + // We do the check here because we are guaranteed to at least get the +Inf watermark on the + // main input when the job finishes. + if (currentSideInputWatermark >= BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()) { + // this means we will never see any more side input + // we also do the check here because we might have received the side-input MAX watermark + // before receiving any main-input data + emitAllPushedBackData(); + } } @Override public void processWatermark2(Watermark mark) throws Exception { - if (mark.getTimestamp() == BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()) { + setCurrentSideInputWatermark(mark.getTimestamp()); + if (mark.getTimestamp() >= BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()) { // this means we will never see any more side input - pushbackDoFnRunner.startBundle(); + emitAllPushedBackData(); - BagState> pushedBack = - pushbackStateInternals.state(StateNamespaces.global(), pushedBackTag); + // maybe output a new watermark + processWatermark1(new Watermark(currentInputWatermark)); + } + } - Iterable> pushedBackContents = pushedBack.read(); - if (pushedBackContents != null) { - for (WindowedValue elem : pushedBackContents) { + /** + * Emits all pushed-back data. This should be used once we know that there will not be + * any future side input, i.e. that there is no point in waiting. + */ + private void emitAllPushedBackData() throws Exception { + pushbackDoFnRunner.startBundle(); - // we need to set the correct key in case the operator is - // a (keyed) window operator - setKeyContextElement1(new StreamRecord<>(elem)); + BagState> pushedBack = + pushbackStateInternals.state(StateNamespaces.global(), pushedBackTag); - doFnRunner.processElement(elem); - } + Iterable> pushedBackContents = pushedBack.read(); + if (pushedBackContents != null) { + for (WindowedValue elem : pushedBackContents) { + + // we need to set the correct key in case the operator is + // a (keyed) window operator + setKeyContextElement1(new StreamRecord<>(elem)); + + doFnRunner.processElement(elem); } + } - setPushedBackWatermark(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()); + pushedBack.clear(); - pushbackDoFnRunner.finishBundle(); + setPushedBackWatermark(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()); - // maybe output a new watermark - processWatermark1(new Watermark(currentInputWatermark)); - } + pushbackDoFnRunner.finishBundle(); } @Override @@ -610,6 +649,10 @@ private void setCurrentInputWatermark(long currentInputWatermark) { this.currentInputWatermark = currentInputWatermark; } + private void setCurrentSideInputWatermark(long currentInputWatermark) { + this.currentSideInputWatermark = currentInputWatermark; + } + private void setCurrentOutputWatermark(long currentOutputWatermark) { this.currentOutputWatermark = currentOutputWatermark; }