From 159003267a0e05c8c90cb5bbb3565980514da9d5 Mon Sep 17 00:00:00 2001 From: Dan Halperin Date: Tue, 30 May 2017 16:12:23 -0700 Subject: [PATCH 1/2] Flink*DoFnFunction: fix check for single-output dofns Fixes Findbugs and (presumably) increases efficiency --- .../runners/flink/translation/functions/FlinkDoFnFunction.java | 2 +- .../flink/translation/functions/FlinkStatefulDoFnFunction.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java index 42a8833fea7b..ab2ac6b0ea2f 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java @@ -90,7 +90,7 @@ public void mapPartition( RuntimeContext runtimeContext = getRuntimeContext(); DoFnRunners.OutputManager outputManager; - if (outputMap == null) { + if (outputMap.size() == 1) { outputManager = new FlinkDoFnFunction.DoFnOutputManager(out); } else { // it has some additional outputs diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java index b07576893d01..11d4fee48957 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java @@ -91,7 +91,7 @@ public void reduce( RuntimeContext runtimeContext = getRuntimeContext(); DoFnRunners.OutputManager outputManager; - if (outputMap == null) { + if (outputMap.size() == 1) { outputManager = new FlinkDoFnFunction.DoFnOutputManager(out); } else { // it has some additional Outputs From e0434d352ed30d32a502a76f1ec5317027468cb4 Mon Sep 17 00:00:00 2001 From: Dan Halperin Date: Wed, 31 May 2017 10:50:46 -0700 Subject: [PATCH 2/2] Add RawUnion code to FlinkDoFnFunction --- .../flink/translation/functions/FlinkDoFnFunction.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java index ab2ac6b0ea2f..d8ed622ffb89 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java @@ -146,7 +146,9 @@ static class DoFnOutputManager @Override @SuppressWarnings("unchecked") public void output(TupleTag tag, WindowedValue output) { - collector.collect(output); + collector.collect( + WindowedValue.of(new RawUnionValue(0 /* single output */, output.getValue()), + output.getTimestamp(), output.getWindows(), output.getPane())); } }