From 84fef898e91b953371054f70640a96ad74643750 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Author=3A=20=E6=B3=A2=E7=89=B9?= Date: Fri, 26 May 2017 17:40:27 +0800 Subject: [PATCH] ReduceFnRunner.onTrigger: skip storeCurrentPaneInfo() if trigger isFinished. --- .../java/org/apache/beam/runners/core/ReduceFnRunner.java | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java index 62d519f6f8e5..b5c3e3ecc016 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java @@ -948,7 +948,7 @@ private void prefetchOnTrigger( private Instant onTrigger( final ReduceFn.Context directContext, ReduceFn.Context renamedContext, - boolean isFinished, boolean isEndOfWindow) + final boolean isFinished, boolean isEndOfWindow) throws Exception { Instant inputWM = timerInternals.currentInputWatermarkTime(); @@ -1005,9 +1005,11 @@ private Instant onTrigger( @Override public void output(OutputT toOutput) { // We're going to output panes, so commit the (now used) PaneInfo. - // TODO: This is unnecessary if the trigger isFinished since the saved + // This is unnecessary if the trigger isFinished since the saved // state will be immediately deleted. - paneInfoTracker.storeCurrentPaneInfo(directContext, pane); + if (!isFinished) { + paneInfoTracker.storeCurrentPaneInfo(directContext, pane); + } // Output the actual value. outputter.outputWindowedValue(