From d2b3f48c81e2f3378b09e3b30f85a3a10214c49c Mon Sep 17 00:00:00 2001 From: Kenneth Knowles Date: Wed, 1 Feb 2017 18:25:42 -0800 Subject: [PATCH] Drop late data in Flink runner --- .../wrappers/streaming/DoFnOperator.java | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java index ac85b3c090da..4e52380c793a 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java @@ -33,6 +33,7 @@ import org.apache.beam.runners.core.DoFnRunner; import org.apache.beam.runners.core.DoFnRunners; import org.apache.beam.runners.core.ExecutionContext; +import org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetDoFn; import org.apache.beam.runners.core.OldDoFn; import org.apache.beam.runners.core.PushbackSideInputDoFnRunner; import org.apache.beam.runners.core.SideInputHandler; @@ -244,6 +245,8 @@ public Aggregator createAggregatorFor sideInputReader = sideInputHandler; } + ExecutionContext.StepContext stepContext = createStepContext(); + DoFnRunner doFnRunner = DoFnRunners.simpleRunner( serializedOptions.getPipelineOptions(), oldDoFn, @@ -251,10 +254,24 @@ public Aggregator createAggregatorFor outputManagerFactory.create(output), mainOutputTag, sideOutputTags, - createStepContext(), + stepContext, aggregatorFactory, windowingStrategy); + if (oldDoFn instanceof GroupAlsoByWindowViaWindowSetDoFn) { + // When the doFn is this, we know it came from WindowDoFnOperator and + // InputT = KeyedWorkItem + // OutputT = KV + // + // for some K, V + + doFnRunner = DoFnRunners.lateDataDroppingRunner( + (DoFnRunner) doFnRunner, + stepContext, + windowingStrategy, + ((GroupAlsoByWindowViaWindowSetDoFn) oldDoFn).getDroppedDueToLatenessAggregator()); + } + pushbackDoFnRunner = PushbackSideInputDoFnRunner.create(doFnRunner, sideInputs, sideInputHandler);