Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@
*/
package org.apache.beam.runners.flink.translation.functions;

import java.util.Collections;
import com.google.common.collect.Lists;
import java.util.List;
import java.util.Map;
import org.apache.beam.runners.core.DoFnRunner;
import org.apache.beam.runners.core.DoFnRunners;
Expand Down Expand Up @@ -97,13 +98,14 @@ public void mapPartition(
new FlinkDoFnFunction.MultiDoFnOutputManager((Collector) out, outputMap);
}

List<TupleTag<?>> additionalOutputTags = Lists.newArrayList(outputMap.keySet());

DoFnRunner<InputT, OutputT> doFnRunner = DoFnRunners.simpleRunner(
serializedOptions.getPipelineOptions(), doFn,
new FlinkSideInputReader(sideInputs, runtimeContext),
outputManager,
mainOutputTag,
// see SimpleDoFnRunner, just use it to limit number of additional outputs
Collections.<TupleTag<?>>emptyList(),
additionalOutputTags,
new FlinkNoOpStepContext(),
windowingStrategy);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,9 @@

import static org.apache.flink.util.Preconditions.checkArgument;

import java.util.Collections;
import com.google.common.collect.Lists;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.beam.runners.core.DoFnRunner;
import org.apache.beam.runners.core.DoFnRunners;
Expand Down Expand Up @@ -114,13 +115,14 @@ public void reduce(
timerInternals.advanceProcessingTime(Instant.now());
timerInternals.advanceSynchronizedProcessingTime(Instant.now());

List<TupleTag<?>> additionalOutputTags = Lists.newArrayList(outputMap.keySet());

DoFnRunner<KV<K, V>, OutputT> doFnRunner = DoFnRunners.simpleRunner(
serializedOptions.getPipelineOptions(), dofn,
new FlinkSideInputReader(sideInputs, runtimeContext),
outputManager,
mainOutputTag,
// see SimpleDoFnRunner, just use it to limit number of additional outputs
Collections.<TupleTag<?>>emptyList(),
additionalOutputTags,
new FlinkNoOpStepContext() {
@Override
public StateInternals stateInternals() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import static org.apache.flink.util.Preconditions.checkArgument;

import com.google.common.base.Joiner;
import com.google.common.base.Optional;
import com.google.common.collect.Iterables;
import java.io.DataInputStream;
Expand Down Expand Up @@ -129,6 +130,8 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>

protected transient long currentInputWatermark;

protected transient long currentSideInputWatermark;

protected transient long currentOutputWatermark;

private transient StateTag<BagState<WindowedValue<InputT>>> pushedBackTag;
Expand Down Expand Up @@ -197,6 +200,7 @@ public void open() throws Exception {
super.open();

setCurrentInputWatermark(BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis());
setCurrentSideInputWatermark(BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis());
setCurrentOutputWatermark(BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis());

sideInputReader = NullSideInputReader.of(sideInputs);
Expand Down Expand Up @@ -308,6 +312,21 @@ public void open() throws Exception {
@Override
public void close() throws Exception {
super.close();

// sanity check: these should have been flushed out by +Inf watermarks
if (pushbackStateInternals != null) {
BagState<WindowedValue<InputT>> pushedBack =
pushbackStateInternals.state(StateNamespaces.global(), pushedBackTag);

Iterable<WindowedValue<InputT>> pushedBackContents = pushedBack.read();
if (pushedBackContents != null) {
if (!Iterables.isEmpty(pushedBackContents)) {
String pushedBackString = Joiner.on(",").join(pushedBackContents);
throw new RuntimeException(
"Leftover pushed-back data: " + pushedBackString + ". This indicates a bug.");
}
}
}
doFnInvoker.invokeTeardown();
}

Expand Down Expand Up @@ -457,36 +476,56 @@ public void processWatermark1(Watermark mark) throws Exception {
}
pushbackDoFnRunner.finishBundle();
}

// We do the check here because we are guaranteed to at least get the +Inf watermark on the
// main input when the job finishes.
if (currentSideInputWatermark >= BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()) {
// this means we will never see any more side input
// we also do the check here because we might have received the side-input MAX watermark
// before receiving any main-input data
emitAllPushedBackData();
}
}

@Override
public void processWatermark2(Watermark mark) throws Exception {
if (mark.getTimestamp() == BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()) {
setCurrentSideInputWatermark(mark.getTimestamp());
if (mark.getTimestamp() >= BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()) {
// this means we will never see any more side input
pushbackDoFnRunner.startBundle();
emitAllPushedBackData();

BagState<WindowedValue<InputT>> pushedBack =
pushbackStateInternals.state(StateNamespaces.global(), pushedBackTag);
// maybe output a new watermark
processWatermark1(new Watermark(currentInputWatermark));
}
}

Iterable<WindowedValue<InputT>> pushedBackContents = pushedBack.read();
if (pushedBackContents != null) {
for (WindowedValue<InputT> elem : pushedBackContents) {
/**
* Emits all pushed-back data. This should be used once we know that there will not be
* any future side input, i.e. that there is no point in waiting.
*/
private void emitAllPushedBackData() throws Exception {
pushbackDoFnRunner.startBundle();

// we need to set the correct key in case the operator is
// a (keyed) window operator
setKeyContextElement1(new StreamRecord<>(elem));
BagState<WindowedValue<InputT>> pushedBack =
pushbackStateInternals.state(StateNamespaces.global(), pushedBackTag);

doFnRunner.processElement(elem);
}
Iterable<WindowedValue<InputT>> pushedBackContents = pushedBack.read();
if (pushedBackContents != null) {
for (WindowedValue<InputT> elem : pushedBackContents) {

// we need to set the correct key in case the operator is
// a (keyed) window operator
setKeyContextElement1(new StreamRecord<>(elem));

doFnRunner.processElement(elem);
}
}

setPushedBackWatermark(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis());
pushedBack.clear();

pushbackDoFnRunner.finishBundle();
setPushedBackWatermark(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis());

// maybe output a new watermark
processWatermark1(new Watermark(currentInputWatermark));
}
pushbackDoFnRunner.finishBundle();
}

@Override
Expand Down Expand Up @@ -610,6 +649,10 @@ private void setCurrentInputWatermark(long currentInputWatermark) {
this.currentInputWatermark = currentInputWatermark;
}

private void setCurrentSideInputWatermark(long currentInputWatermark) {
this.currentSideInputWatermark = currentInputWatermark;
}

private void setCurrentOutputWatermark(long currentOutputWatermark) {
this.currentOutputWatermark = currentOutputWatermark;
}
Expand Down