From 0454a1897c645f674754bc9ef69dc7bab2b3c3ba Mon Sep 17 00:00:00 2001 From: Kenneth Knowles Date: Wed, 1 Feb 2017 18:25:42 -0800 Subject: [PATCH] Drop late data in Flink runner --- .../wrappers/streaming/DoFnOperator.java | 20 +++++++++++++++++-- 1 file changed, 18 insertions(+), 2 deletions(-) diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java index de0264a8f705..c1d33f750081 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java @@ -33,6 +33,7 @@ import org.apache.beam.runners.core.DoFnRunner; import org.apache.beam.runners.core.DoFnRunners; import org.apache.beam.runners.core.ExecutionContext; +import org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn; import org.apache.beam.runners.core.PushbackSideInputDoFnRunner; import org.apache.beam.runners.core.SideInputHandler; import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions; @@ -234,6 +235,8 @@ public Aggregator createAggregatorFor doFnInvoker.invokeSetup(); + ExecutionContext.StepContext stepContext = createStepContext(); + DoFnRunner doFnRunner = DoFnRunners.simpleRunner( serializedOptions.getPipelineOptions(), doFn, @@ -241,13 +244,26 @@ public Aggregator createAggregatorFor outputManager, mainOutputTag, sideOutputTags, - createStepContext(), + stepContext, aggregatorFactory, windowingStrategy); + if (doFn instanceof GroupAlsoByWindowViaWindowSetNewDoFn) { + // When the doFn is this, we know it came from WindowDoFnOperator and + // InputT = KeyedWorkItem + // OutputT = KV + // + // for some K, V + + doFnRunner = DoFnRunners.lateDataDroppingRunner( + (DoFnRunner) doFnRunner, + stepContext, + windowingStrategy, + ((GroupAlsoByWindowViaWindowSetNewDoFn) doFn).getDroppedDueToLatenessAggregator()); + } + pushbackDoFnRunner = PushbackSideInputDoFnRunner.create(doFnRunner, sideInputs, sideInputHandler); - } @Override