From 3b143f5f90c133fb9ad7e74a01bb6bdbd57a00a6 Mon Sep 17 00:00:00 2001 From: Sam Whittle Date: Thu, 10 Nov 2016 12:59:49 -0800 Subject: [PATCH 1/5] Improvements to ReduceFnRunner prefetching: - add prefetch* methods for prefetching state matching existing methods - replace onTimer with batched onTimers method to allow prefetching across timers - prefetch triggers in processElements --- .../GroupAlsoByWindowViaWindowSetDoFn.java | 11 +- .../beam/runners/core/PaneInfoTracker.java | 6 + .../runners/core/ReduceFnContextFactory.java | 9 +- .../beam/runners/core/ReduceFnRunner.java | 311 +++++++++++------- .../beam/runners/core/WatermarkHold.java | 5 + .../triggers/TriggerStateMachineRunner.java | 14 +- .../beam/runners/core/ReduceFnTester.java | 4 +- .../GroupAlsoByWindowEvaluatorFactory.java | 5 +- .../beam/sdk/transforms/DoFnTester.java | 6 +- .../util/state/InMemoryTimerInternals.java | 22 +- .../beam/sdk/util/state/TimerCallback.java | 9 +- .../state/InMemoryTimerInternalsTest.java | 54 ++- 12 files changed, 295 insertions(+), 161 deletions(-) diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java index dde883c6d60c..a234947f5476 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java @@ -27,7 +27,6 @@ import org.apache.beam.sdk.util.KeyedWorkItem; import org.apache.beam.sdk.util.SystemDoFnInternal; import org.apache.beam.sdk.util.TimerInternals; -import org.apache.beam.sdk.util.TimerInternals.TimerData; import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.util.state.StateInternals; import org.apache.beam.sdk.util.state.StateInternalsFactory; @@ -73,9 +72,9 @@ private GroupAlsoByWindowViaWindowSetDoFn( @Override public void processElement(ProcessContext c) throws Exception { - KeyedWorkItem element = c.element(); + KeyedWorkItem keyedWorkItem = c.element(); - K key = c.element().key(); + K key = keyedWorkItem.key(); TimerInternals timerInternals = c.windowingInternals().timerInternals(); StateInternals stateInternals = stateInternalsFactory.stateInternalsForKey(key); @@ -92,10 +91,8 @@ public void processElement(ProcessContext c) throws Exception { reduceFn, c.getPipelineOptions()); - reduceFnRunner.processElements(element.elementsIterable()); - for (TimerData timer : element.timersIterable()) { - reduceFnRunner.onTimer(timer); - } + reduceFnRunner.processElements(keyedWorkItem.elementsIterable()); + reduceFnRunner.onTimers(keyedWorkItem.timersIterable()); reduceFnRunner.persist(); } diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/PaneInfoTracker.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/PaneInfoTracker.java index 114f5e6313a4..f904bc6e571b 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/PaneInfoTracker.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/PaneInfoTracker.java @@ -55,6 +55,12 @@ public void clear(StateAccessor state) { state.access(PANE_INFO_TAG).clear(); } + @SuppressFBWarnings(value = "RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT", + justification = "prefetch side effect") + public void prefetchPaneInfo(ReduceFn.Context context) { + context.state().access(PaneInfoTracker.PANE_INFO_TAG).readLater(); + } + /** * Return a ({@link ReadableState} for) the pane info appropriate for {@code context}. The pane * info includes the timing for the pane, who's calculation is quite subtle. diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java index 500c6e7f827a..9578810f228f 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java @@ -36,7 +36,6 @@ import org.apache.beam.sdk.util.WindowingInternals; import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.util.state.MergingStateAccessor; -import org.apache.beam.sdk.util.state.ReadableState; import org.apache.beam.sdk.util.state.State; import org.apache.beam.sdk.util.state.StateAccessor; import org.apache.beam.sdk.util.state.StateContext; @@ -104,7 +103,7 @@ public ReduceFn.ProcessValueContext forValue( } public ReduceFn.OnTriggerContext forTrigger(W window, - ReadableState pane, StateStyle style, OnTriggerCallbacks callbacks) { + PaneInfo pane, StateStyle style, OnTriggerCallbacks callbacks) { return new OnTriggerContextImpl(stateAccessor(window, style), pane, callbacks); } @@ -376,11 +375,11 @@ public Timers timers() { private class OnTriggerContextImpl extends ReduceFn.OnTriggerContext { private final StateAccessorImpl state; - private final ReadableState pane; + private final PaneInfo pane; private final OnTriggerCallbacks callbacks; private final TimersImpl timers; - private OnTriggerContextImpl(StateAccessorImpl state, ReadableState pane, + private OnTriggerContextImpl(StateAccessorImpl state, PaneInfo pane, OnTriggerCallbacks callbacks) { reduceFn.super(); this.state = state; @@ -411,7 +410,7 @@ public StateAccessor state() { @Override public PaneInfo paneInfo() { - return pane.read(); + return pane; } @Override diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java index 66fb27c4a28f..04a533b9b847 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java @@ -22,11 +22,14 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableMap; +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Set; @@ -58,7 +61,6 @@ import org.apache.beam.sdk.util.WindowingInternals; import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.util.WindowingStrategy.AccumulationMode; -import org.apache.beam.sdk.util.state.ReadableState; import org.apache.beam.sdk.util.state.StateInternals; import org.apache.beam.sdk.util.state.StateNamespaces.WindowNamespace; import org.apache.beam.sdk.util.state.TimerCallback; @@ -292,6 +294,10 @@ boolean hasNoActiveWindows() { * */ public void processElements(Iterable> values) throws Exception { + if (!values.iterator().hasNext()) { + return; + } + // If an incoming element introduces a new window, attempt to merge it into an existing // window eagerly. Map windowToMergeResult = collectAndMergeWindows(values); @@ -303,14 +309,34 @@ public void processElements(Iterable> values) throws Excep windowsToConsider.addAll(processElement(windowToMergeResult, value)); } - // Trigger output from any window for which the trigger is ready - for (W mergedWindow : windowsToConsider) { - ReduceFn.Context directContext = - contextFactory.base(mergedWindow, StateStyle.DIRECT); - ReduceFn.Context renamedContext = - contextFactory.base(mergedWindow, StateStyle.RENAMED); - triggerRunner.prefetchShouldFire(mergedWindow, directContext.state()); - emitIfAppropriate(directContext, renamedContext); + if (!windowsToConsider.isEmpty()) { + // Prefetch state necessary to determine if the triggers should fire. + for (W mergedWindow : windowsToConsider) { + ReduceFn.Context directContext = + contextFactory.base(mergedWindow, StateStyle.DIRECT); + triggerRunner.prefetchShouldFire(mergedWindow, directContext.state()); + } + + // Filter out timers that didn't trigger and prefetch state for triggering. + for (Iterator it = windowsToConsider.iterator(); it.hasNext(); ) { + W mergedWindow = it.next(); + ReduceFn.Context directContext = + contextFactory.base(mergedWindow, StateStyle.DIRECT); + if (triggerRunner.shouldFire( + directContext.window(), directContext.timers(), directContext.state())) { + ReduceFn.Context renamedContext = + contextFactory.base(mergedWindow, StateStyle.RENAMED); + prefetchEmitPane(directContext, renamedContext); + } else { + it.remove(); + } + } + + // Trigger output from any window for which the trigger is ready. + for (W mergedWindow : windowsToConsider) { + emitPane(contextFactory.base(mergedWindow, StateStyle.DIRECT), + contextFactory.base(mergedWindow, StateStyle.RENAMED)); + } } // We're all done with merging and emitting elements so can compress the activeWindow state. @@ -565,98 +591,146 @@ private Collection processElement(Map windowToMergeResult, WindowedValu return triggerableWindows; } - /** - * Called when an end-of-window, garbage collection, or trigger-specific timer fires. - */ - public void onTimer(TimerData timer) throws Exception { - // Which window is the timer for? - checkArgument(timer.getNamespace() instanceof WindowNamespace, - "Expected timer to be in WindowNamespace, but was in %s", timer.getNamespace()); - @SuppressWarnings("unchecked") - WindowNamespace windowNamespace = (WindowNamespace) timer.getNamespace(); - W window = windowNamespace.getWindow(); - ReduceFn.Context directContext = - contextFactory.base(window, StateStyle.DIRECT); - ReduceFn.Context renamedContext = - contextFactory.base(window, StateStyle.RENAMED); + private class TimerContext { + public final Instant timestamp; + public final ReduceFn.Context directContext; + public final ReduceFn.Context renamedContext; + // If this is an end-of-window timer then we may need to set a garbage collection timer + // if allowed lateness is non-zero. + public final boolean isEndOfWindow; + // If this is a garbage collection timer then we should trigger and + // garbage collect the window. We'll consider any timer at or after the + // end-of-window time to be a signal to garbage collect. + public final boolean isGarbageCollection; + + TimerContext( + TimerData timer, + ReduceFn.Context directContext, + ReduceFn.Context renamedContext) { + this.timestamp = timer.getTimestamp(); + this.directContext = directContext; + this.renamedContext = renamedContext; + W window = directContext.window(); + this.isEndOfWindow = TimeDomain.EVENT_TIME == timer.getDomain() + && timer.getTimestamp().equals(window.maxTimestamp()); + Instant cleanupTime = garbageCollectionTime(window); + this.isGarbageCollection = !timer.getTimestamp().isBefore(cleanupTime); + } // Has this window had its trigger finish? // - The trigger may implement isClosed as constant false. // - If the window function does not support windowing then all windows will be considered // active. // So we must take conjunction of activeWindows and triggerRunner state. - boolean windowIsActiveAndOpen = - activeWindows.isActive(window) && !triggerRunner.isClosed(directContext.state()); + public boolean windowIsActiveAndOpen() { + return activeWindows.isActive(directContext.window()) + && !triggerRunner.isClosed(directContext.state()); + } + } - if (!windowIsActiveAndOpen) { - WindowTracing.debug( - "ReduceFnRunner.onTimer: Note that timer {} is for non-ACTIVE window {}", timer, window); + public void onTimers(Iterable timers) throws Exception { + if (!timers.iterator().hasNext()) { + return; } - // If this is an end-of-window timer then we may need to set a garbage collection timer - // if allowed lateness is non-zero. - boolean isEndOfWindow = TimeDomain.EVENT_TIME == timer.getDomain() - && timer.getTimestamp().equals(window.maxTimestamp()); - - // If this is a garbage collection timer then we should trigger and garbage collect the window. - // We'll consider any timer at or after the end-of-window time to be a signal to garbage - // collect. - Instant cleanupTime = garbageCollectionTime(window); - boolean isGarbageCollection = TimeDomain.EVENT_TIME == timer.getDomain() - && !timer.getTimestamp().isBefore(cleanupTime); - - if (isGarbageCollection) { - WindowTracing.debug( - "ReduceFnRunner.onTimer: Cleaning up for key:{}; window:{} at {} with " - + "inputWatermark:{}; outputWatermark:{}", - key, window, timer.getTimestamp(), timerInternals.currentInputWatermarkTime(), - timerInternals.currentOutputWatermarkTime()); - - if (windowIsActiveAndOpen) { - // We need to call onTrigger to emit the final pane if required. - // The final pane *may* be ON_TIME if no prior ON_TIME pane has been emitted, - // and the watermark has passed the end of the window. - @Nullable Instant newHold = - onTrigger(directContext, renamedContext, true/* isFinished */, isEndOfWindow); - checkState(newHold == null, - "Hold placed at %s despite isFinished being true.", newHold); + // Create a reusable context for each timer and begin prefetching necessary + // state. + List timerContexts = new LinkedList(); + for (TimerData timer : timers) { + checkArgument(timer.getNamespace() instanceof WindowNamespace, + "Expected timer to be in WindowNamespace, but was in %s", timer.getNamespace()); + @SuppressWarnings("unchecked") + WindowNamespace windowNamespace = (WindowNamespace) timer.getNamespace(); + W window = windowNamespace.getWindow(); + ReduceFn.Context directContext = + contextFactory.base(window, StateStyle.DIRECT); + ReduceFn.Context renamedContext = + contextFactory.base(window, StateStyle.RENAMED); + TimerContext timerContext = new TimerContext(timer, directContext, renamedContext); + timerContexts.add(timerContext); + + // Perform prefetching of state to determine if the trigger should fire. + if (!timerContext.isGarbageCollection) { + triggerRunner.prefetchIsClosed(directContext.state()); + } else { + triggerRunner.prefetchShouldFire(directContext.window(), directContext.state()); } + } - // Cleanup flavor B: Clear all the remaining state for this window since we'll never - // see elements for it again. - clearAllState(directContext, renamedContext, windowIsActiveAndOpen); - } else { - WindowTracing.debug( - "ReduceFnRunner.onTimer: Triggering for key:{}; window:{} at {} with " - + "inputWatermark:{}; outputWatermark:{}", - key, window, timer.getTimestamp(), timerInternals.currentInputWatermarkTime(), - timerInternals.currentOutputWatermarkTime()); - if (windowIsActiveAndOpen) { - emitIfAppropriate(directContext, renamedContext); + // For those windows that are active and open, prefetch the triggering or emitting state. + for (TimerContext timerContext : timerContexts) { + if (timerContext.windowIsActiveAndOpen()) { + ReduceFn.Context directContext = timerContext.directContext; + if (timerContext.isGarbageCollection) { + prefetchOnTrigger(directContext, timerContext.renamedContext); + } else if (triggerRunner.shouldFire( + directContext.window(), directContext.timers(), directContext.state())) { + prefetchEmitPane(directContext, timerContext.renamedContext); + } } + } - if (isEndOfWindow) { - // If the window strategy trigger includes a watermark trigger then at this point - // there should be no data holds, either because we'd already cleared them on an - // earlier onTrigger, or because we just cleared them on the above emitIfAppropriate. - // We could assert this but it is very expensive. - - // Since we are processing an on-time firing we should schedule the garbage collection - // timer. (If getAllowedLateness is zero then the timer event will be considered a - // cleanup event and handled by the above). - // Note we must do this even if the trigger is finished so that we are sure to cleanup - // any final trigger finished bits. - checkState( - windowingStrategy.getAllowedLateness().isLongerThan(Duration.ZERO), - "Unexpected zero getAllowedLateness"); - WindowTracing.debug( - "ReduceFnRunner.onTimer: Scheduling cleanup timer for key:{}; window:{} at {} with " - + "inputWatermark:{}; outputWatermark:{}", - key, directContext.window(), cleanupTime, timerInternals.currentInputWatermarkTime(), + // Perform processing now that everything is prefetched. + for (TimerContext timerContext : timerContexts) { + ReduceFn.Context directContext = timerContext.directContext; + ReduceFn.Context renamedContext = timerContext.renamedContext; + + if (timerContext.isGarbageCollection) { + WindowTracing.debug("ReduceFnRunner.onTimer: Cleaning up for key:{}; window:{} at {} with " + + "inputWatermark:{}; outputWatermark:{}", + key, directContext.window(), timerContext.timestamp, + timerInternals.currentInputWatermarkTime(), timerInternals.currentOutputWatermarkTime()); - checkState(!cleanupTime.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE), - "Cleanup time %s is beyond end-of-time", cleanupTime); - directContext.timers().setTimer(cleanupTime, TimeDomain.EVENT_TIME); + + boolean windowIsActiveAndOpen = timerContext.windowIsActiveAndOpen(); + if (windowIsActiveAndOpen) { + // We need to call onTrigger to emit the final pane if required. + // The final pane *may* be ON_TIME if no prior ON_TIME pane has been emitted, + // and the watermark has passed the end of the window. + @Nullable + Instant newHold = onTrigger( + directContext, renamedContext, true /* isFinished */, timerContext.isEndOfWindow); + checkState(newHold == null, "Hold placed at %s despite isFinished being true.", newHold); + } + + // Cleanup flavor B: Clear all the remaining state for this window since we'll never + // see elements for it again. + clearAllState(directContext, renamedContext, windowIsActiveAndOpen); + } else { + WindowTracing.debug("ReduceFnRunner.onTimer: Triggering for key:{}; window:{} at {} with " + + "inputWatermark:{}; outputWatermark:{}", + key, directContext.window(), timerContext.timestamp, + timerInternals.currentInputWatermarkTime(), + timerInternals.currentOutputWatermarkTime()); + if (timerContext.windowIsActiveAndOpen() + && triggerRunner.shouldFire( + directContext.window(), directContext.timers(), directContext.state())) { + emitPane(directContext, renamedContext); + } + + if (timerContext.isEndOfWindow) { + // If the window strategy trigger includes a watermark trigger then at this point + // there should be no data holds, either because we'd already cleared them on an + // earlier onTrigger, or because we just cleared them on the above emit. + // We could assert this but it is very expensive. + + // Since we are processing an on-time firing we should schedule the garbage collection + // timer. (If getAllowedLateness is zero then the timer event will be considered a + // cleanup event and handled by the above). + // Note we must do this even if the trigger is finished so that we are sure to cleanup + // any final trigger finished bits. + checkState(windowingStrategy.getAllowedLateness().isLongerThan(Duration.ZERO), + "Unexpected zero getAllowedLateness"); + Instant cleanupTime = garbageCollectionTime(directContext.window()); + WindowTracing.debug( + "ReduceFnRunner.onTimer: Scheduling cleanup timer for key:{}; window:{} at {} with " + + "inputWatermark:{}; outputWatermark:{}", + key, directContext.window(), cleanupTime, timerInternals.currentInputWatermarkTime(), + timerInternals.currentOutputWatermarkTime()); + checkState(!cleanupTime.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE), + "Cleanup time %s is beyond end-of-time", cleanupTime); + directContext.timers().setTimer(cleanupTime, TimeDomain.EVENT_TIME); + } } } } @@ -665,7 +739,7 @@ public void onTimer(TimerData timer) throws Exception { * Clear all the state associated with {@code context}'s window. * Should only be invoked if we know all future elements for this window will be considered * beyond allowed lateness. - * This is a superset of the clearing done by {@link #emitIfAppropriate} below since: + * This is a superset of the clearing done by {@link #emitPane} below since: *
    *
  1. We can clear the trigger finished bits since we'll never need to ask if the trigger is * closed again. @@ -691,10 +765,10 @@ private void clearAllState( } else { // If !windowIsActiveAndOpen then !activeWindows.isActive (1) or triggerRunner.isClosed (2). // For (1), if !activeWindows.isActive then the window must be merging and has been - // explicitly removed by emitIfAppropriate. But in that case the trigger must have fired + // explicitly removed by emit. But in that case the trigger must have fired // and been closed, so this case reduces to (2). // For (2), if triggerRunner.isClosed then the trigger was fired and entered the - // closed state. In that case emitIfAppropriate will have cleared all state in + // closed state. In that case emit will have cleared all state in // reduceFn, triggerRunner (except for finished bits), paneInfoTracker and activeWindows. // We also know nonEmptyPanes must have been unconditionally cleared by the trigger. // Since the trigger fired the existing watermark holds must have been cleared, and since @@ -736,17 +810,23 @@ private boolean shouldDiscardAfterFiring(boolean isFinished) { return false; } + private void prefetchEmitPane(ReduceFn.Context directContext, + ReduceFn.Context renamedContext) { + triggerRunner.prefetchShouldFire(directContext.window(), directContext.state()); + triggerRunner.prefetchOnFire(directContext.window(), directContext.state()); + triggerRunner.prefetchIsClosed(directContext.state()); + prefetchOnTrigger(directContext, renamedContext); + } + /** - * Possibly emit a pane if a trigger is ready to fire or timers require it, and cleanup state. + * Emit a pane if a trigger is ready to fire or timers require it, and cleanup state. */ - private void emitIfAppropriate(ReduceFn.Context directContext, + private void emitPane( + ReduceFn.Context directContext, ReduceFn.Context renamedContext) throws Exception { - if (!triggerRunner.shouldFire( - directContext.window(), directContext.timers(), directContext.state())) { - // Ignore unless trigger is ready to fire - return; - } + checkState(triggerRunner.shouldFire( + directContext.window(), directContext.timers(), directContext.state())); // Inform the trigger of the transition to see if it is finished triggerRunner.onFire(directContext.window(), directContext.timers(), directContext.state()); @@ -783,7 +863,7 @@ private void emitIfAppropriate(ReduceFn.Context directCon /** * Do we need to emit a pane? */ - private boolean needToEmit(boolean isEmpty, boolean isFinished, PaneInfo.Timing timing) { + private boolean needToEmitPane(boolean isEmpty, boolean isFinished, PaneInfo.Timing timing) { if (!isEmpty) { // The pane has elements. return true; @@ -799,6 +879,17 @@ private boolean needToEmit(boolean isEmpty, boolean isFinished, PaneInfo.Timing return false; } + @SuppressFBWarnings(value = "RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT", justification = + "prefetch side effect") + private void prefetchOnTrigger( + final ReduceFn.Context directContext, + ReduceFn.Context renamedContext) { + paneInfoTracker.prefetchPaneInfo(directContext); + watermarkHold.prefetchExtract(renamedContext); + nonEmptyPanes.isEmpty(renamedContext.state()).readLater(); + reduceFn.prefetchOnTrigger(directContext.state()); + } + /** * Run the {@link ReduceFn#onTrigger} method and produce any necessary output. * @@ -812,25 +903,17 @@ private Instant onTrigger( throws Exception { Instant inputWM = timerInternals.currentInputWatermarkTime(); - // Prefetch necessary states - ReadableState outputTimestampFuture = - watermarkHold.extractAndRelease(renamedContext, isFinished).readLater(); - ReadableState paneFuture = - paneInfoTracker.getNextPaneInfo(directContext, isFinished).readLater(); - ReadableState isEmptyFuture = - nonEmptyPanes.isEmpty(renamedContext.state()).readLater(); - - reduceFn.prefetchOnTrigger(directContext.state()); - triggerRunner.prefetchOnFire(directContext.window(), directContext.state()); - // Calculate the pane info. - final PaneInfo pane = paneFuture.read(); - // Extract the window hold, and as a side effect clear it. + final PaneInfo pane = paneInfoTracker.getNextPaneInfo(directContext, isFinished).read(); - WatermarkHold.OldAndNewHolds pair = outputTimestampFuture.read(); + // Extract the window hold, and as a side effect clear it. + final WatermarkHold.OldAndNewHolds pair = + watermarkHold.extractAndRelease(renamedContext, isFinished).read(); final Instant outputTimestamp = pair.oldHold; @Nullable Instant newHold = pair.newHold; + final boolean isEmpty = nonEmptyPanes.isEmpty(renamedContext.state()).read(); + if (newHold != null) { // We can't be finished yet. checkState( @@ -862,11 +945,11 @@ private Instant onTrigger( } // Only emit a pane if it has data or empty panes are observable. - if (needToEmit(isEmptyFuture.read(), isFinished, pane.getTiming())) { + if (needToEmitPane(isEmpty, isFinished, pane.getTiming())) { // Run reduceFn.onTrigger method. final List windows = Collections.singletonList(directContext.window()); ReduceFn.OnTriggerContext renamedTriggerContext = - contextFactory.forTrigger(directContext.window(), paneFuture, StateStyle.RENAMED, + contextFactory.forTrigger(directContext.window(), pane, StateStyle.RENAMED, new OnTriggerCallbacks() { @Override public void output(OutputT toOutput) { diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java index 7d0b608f8cd9..6cac34a51144 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java @@ -445,6 +445,11 @@ public OldAndNewHolds(Instant oldHold, @Nullable Instant newHold) { } } + public void prefetchExtract(final ReduceFn.Context context) { + context.state().access(elementHoldTag).readLater(); + context.state().access(EXTRA_HOLD_TAG).readLater(); + } + /** * Return (a future for) the earliest hold for {@code context}. Clear all the holds after * reading, but add/restore an end-of-window or garbage collection hold if required. diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineRunner.java index 9f03216f5b9b..2f277eb5bce3 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineRunner.java @@ -99,25 +99,25 @@ public boolean isClosed(StateAccessor state) { return readFinishedBits(state.access(FINISHED_BITS_TAG)).isFinished(rootTrigger); } - public void prefetchForValue(W window, StateAccessor state) { + public void prefetchIsClosed(StateAccessor state) { if (isFinishedSetNeeded()) { state.access(FINISHED_BITS_TAG).readLater(); } + } + + public void prefetchForValue(W window, StateAccessor state) { + prefetchIsClosed(state); rootTrigger.getSpec().prefetchOnElement( contextFactory.createStateAccessor(window, rootTrigger)); } public void prefetchOnFire(W window, StateAccessor state) { - if (isFinishedSetNeeded()) { - state.access(FINISHED_BITS_TAG).readLater(); - } + prefetchIsClosed(state); rootTrigger.getSpec().prefetchOnFire(contextFactory.createStateAccessor(window, rootTrigger)); } public void prefetchShouldFire(W window, StateAccessor state) { - if (isFinishedSetNeeded()) { - state.access(FINISHED_BITS_TAG).readLater(); - } + prefetchIsClosed(state); rootTrigger.getSpec().prefetchShouldFire( contextFactory.createStateAccessor(window, rootTrigger)); } diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java index f5ab8ea2362f..74b354631455 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java @@ -509,8 +509,10 @@ public WindowedValue apply(TimestampedValue input) { public void fireTimer(W window, Instant timestamp, TimeDomain domain) throws Exception { ReduceFnRunner runner = createRunner(); - runner.onTimer( + ArrayList timers = new ArrayList(1); + timers.add( TimerData.of(StateNamespaces.window(windowFn.windowCoder(), window), timestamp, domain)); + runner.onTimers(timers); runner.persist(); } diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java index e5c5e4b96cae..2ffa40f9875b 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java @@ -46,7 +46,6 @@ import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.KeyedWorkItem; import org.apache.beam.sdk.util.TimerInternals; -import org.apache.beam.sdk.util.TimerInternals.TimerData; import org.apache.beam.sdk.util.WindowTracing; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingInternals; @@ -181,9 +180,7 @@ public void processElement(WindowedValue> element) throws Ex // Drop any elements within expired windows reduceFnRunner.processElements( dropExpiredWindows(key, workItem.elementsIterable(), timerInternals)); - for (TimerData timer : workItem.timersIterable()) { - reduceFnRunner.onTimer(timer); - } + reduceFnRunner.onTimers(workItem.timersIterable()); reduceFnRunner.persist(); } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java index 79957193986c..79b52257a44a 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java @@ -468,8 +468,10 @@ public AggregateT getAggregatorValue(Aggregator agg) private static TimerCallback collectInto(final List firedTimers) { return new TimerCallback() { @Override - public void onTimer(TimerInternals.TimerData timer) throws Exception { - firedTimers.add(timer); + public void onTimers(Iterable timers) throws Exception { + for (TimerInternals.TimerData timer : timers) { + firedTimers.add(timer); + } } }; } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/InMemoryTimerInternals.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/InMemoryTimerInternals.java index a3bb45a4cc14..f1ddaac437f0 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/InMemoryTimerInternals.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/InMemoryTimerInternals.java @@ -21,6 +21,7 @@ import static com.google.common.base.Preconditions.checkState; import com.google.common.base.MoreObjects; +import java.util.ArrayList; import java.util.HashSet; import java.util.PriorityQueue; import java.util.Set; @@ -235,13 +236,20 @@ private void advanceAndFire( throws Exception { checkNotNull(timerCallback); PriorityQueue queue = queue(domain); - while (!queue.isEmpty() && currentTime.isAfter(queue.peek().getTimestamp())) { - // Remove before firing, so that if the callback adds another identical - // timer we don't remove it. - TimerData timer = queue.remove(); - WindowTracing.trace( - "InMemoryTimerInternals.advanceAndFire: firing {} at {}", timer, currentTime); - timerCallback.onTimer(timer); + while (true) { + ArrayList firedTimers = new ArrayList(); + while (!queue.isEmpty() && currentTime.isAfter(queue.peek().getTimestamp())) { + // Remove before firing, so that if the callback adds another identical + // timer we don't remove it. + TimerData timer = queue.remove(); + firedTimers.add(timer); + WindowTracing.trace( + "InMemoryTimerInternals.advanceAndFire: firing {} at {}", timer, currentTime); + } + if (firedTimers.isEmpty()) { + break; + } + timerCallback.onTimers(firedTimers); } } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/TimerCallback.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/TimerCallback.java index 6598e300f256..dfdfd5b41672 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/TimerCallback.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/TimerCallback.java @@ -19,16 +19,17 @@ import org.apache.beam.sdk.util.TimerInternals; + /** - * A callback that processes a {@link TimerInternals.TimerData TimerData}. + * A callback that processes an Iterable of {@link TimerInternals.TimerData TimerData}. */ public interface TimerCallback { - /** Processes the {@link TimerInternals.TimerData TimerData}. */ - void onTimer(TimerInternals.TimerData timer) throws Exception; + /** Processes an Iterable of {@link TimerInternals.TimerData TimerData}. */ + void onTimers(Iterable timers) throws Exception; TimerCallback NO_OP = new TimerCallback() { @Override - public void onTimer(TimerInternals.TimerData timer) throws Exception { + public void onTimers(Iterable timers) throws Exception { // Nothing } }; diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/state/InMemoryTimerInternalsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/state/InMemoryTimerInternalsTest.java index 951803a8f659..a3a774952de5 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/state/InMemoryTimerInternalsTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/state/InMemoryTimerInternalsTest.java @@ -17,6 +17,11 @@ */ package org.apache.beam.sdk.util.state; +import static org.mockito.Matchers.argThat; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; import org.apache.beam.sdk.util.TimeDomain; import org.apache.beam.sdk.util.TimerInternals.TimerData; import org.joda.time.Instant; @@ -24,6 +29,7 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; +import org.mockito.ArgumentMatcher; import org.mockito.Mock; import org.mockito.Mockito; import org.mockito.MockitoAnnotations; @@ -44,6 +50,37 @@ public void setUp() { MockitoAnnotations.initMocks(this); } + private static class TimersAre extends ArgumentMatcher> { + final List expectedTimers; + TimersAre(List timers) { + expectedTimers = timers; + } + + @Override + public boolean matches(Object actual) { + if (actual == null || !(actual instanceof Iterable)) { + return false; + } + @SuppressWarnings("unchecked") + Iterable timers = (Iterable) actual; + + List actualTimers = new ArrayList(); + for (TimerData timer : timers) { + actualTimers.add(timer); + } + return expectedTimers.equals(actualTimers); + } + + @Override + public String toString() { + return "ordered timers " + expectedTimers.toString(); + } + } + + private static TimersAre timersAre(TimerData... timers) { + return new TimersAre(Arrays.asList(timers)); + } + @Test public void testFiringTimers() throws Exception { InMemoryTimerInternals underTest = new InMemoryTimerInternals(); @@ -54,7 +91,7 @@ public void testFiringTimers() throws Exception { underTest.setTimer(processingTime2); underTest.advanceProcessingTime(timerCallback, new Instant(20)); - Mockito.verify(timerCallback).onTimer(processingTime1); + Mockito.verify(timerCallback).onTimers(argThat(timersAre(processingTime1))); Mockito.verifyNoMoreInteractions(timerCallback); // Advancing just a little shouldn't refire @@ -63,13 +100,13 @@ public void testFiringTimers() throws Exception { // Adding the timer and advancing a little should refire underTest.setTimer(processingTime1); - Mockito.verify(timerCallback).onTimer(processingTime1); + Mockito.verify(timerCallback).onTimers(argThat(timersAre(processingTime1))); underTest.advanceProcessingTime(timerCallback, new Instant(21)); Mockito.verifyNoMoreInteractions(timerCallback); // And advancing the rest of the way should still have the other timer underTest.advanceProcessingTime(timerCallback, new Instant(30)); - Mockito.verify(timerCallback).onTimer(processingTime2); + Mockito.verify(timerCallback).onTimers(argThat(timersAre(processingTime2))); Mockito.verifyNoMoreInteractions(timerCallback); } @@ -87,13 +124,11 @@ public void testTimerOrdering() throws Exception { underTest.setTimer(watermarkTime2); underTest.advanceInputWatermark(timerCallback, new Instant(30)); - Mockito.verify(timerCallback).onTimer(watermarkTime1); - Mockito.verify(timerCallback).onTimer(watermarkTime2); + Mockito.verify(timerCallback).onTimers(argThat(timersAre(watermarkTime1, watermarkTime2))); Mockito.verifyNoMoreInteractions(timerCallback); underTest.advanceProcessingTime(timerCallback, new Instant(30)); - Mockito.verify(timerCallback).onTimer(processingTime1); - Mockito.verify(timerCallback).onTimer(processingTime2); + Mockito.verify(timerCallback).onTimers(argThat(timersAre(processingTime1, processingTime2))); Mockito.verifyNoMoreInteractions(timerCallback); } @@ -107,10 +142,9 @@ public void testDeduplicate() throws Exception { underTest.setTimer(processingTime); underTest.setTimer(processingTime); underTest.advanceProcessingTime(timerCallback, new Instant(20)); + Mockito.verify(timerCallback).onTimers(argThat(timersAre(processingTime))); underTest.advanceInputWatermark(timerCallback, new Instant(20)); - - Mockito.verify(timerCallback).onTimer(processingTime); - Mockito.verify(timerCallback).onTimer(watermarkTime); + Mockito.verify(timerCallback).onTimers(argThat(timersAre(watermarkTime))); Mockito.verifyNoMoreInteractions(timerCallback); } } From 184e3ca34c48d6c79ecd6ba2396226bae1c08af8 Mon Sep 17 00:00:00 2001 From: Sam Whittle Date: Tue, 22 Nov 2016 12:10:35 -0800 Subject: [PATCH 2/5] Address comment by extracting functions from processElements to make it more readable Pull out window set calculation from processElement return value to ProcessElements so we can prefetch at once and simplify logic. --- .../beam/runners/core/ReduceFnRunner.java | 148 +++++++++++------- 1 file changed, 89 insertions(+), 59 deletions(-) diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java index 04a533b9b847..3d10b65025e7 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java @@ -18,17 +18,16 @@ package org.apache.beam.runners.core; import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkState; import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.ImmutableMap; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; -import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -269,6 +268,32 @@ boolean hasNoActiveWindows() { return activeWindows.getActiveAndNewWindows().isEmpty(); } + private Set openWindows(Collection windows) { + Set result = new HashSet<>(); + for (W window : windows) { + ReduceFn.Context directContext = contextFactory.base( + window, StateStyle.DIRECT); + if (!triggerRunner.isClosed(directContext.state())) { + result.add(window); + } + } + return result; + } + + private Collection windowsThatShouldFire(Set windows) throws Exception { + Collection result = new ArrayList<>(); + // Filter out timers that didn't trigger. + for (W window : windows) { + ReduceFn.Context directContext = + contextFactory.base(window, StateStyle.DIRECT); + if (triggerRunner.shouldFire( + directContext.window(), directContext.timers(), directContext.state())) { + result.add(window); + } + } + return result; + } + /** * Incorporate {@code values} into the underlying reduce function, and manage holds, timers, * triggers, and window merging. @@ -301,42 +326,33 @@ public void processElements(Iterable> values) throws Excep // If an incoming element introduces a new window, attempt to merge it into an existing // window eagerly. Map windowToMergeResult = collectAndMergeWindows(values); + prefetchWindowsForValues(windowToMergeResult.values()); - Set windowsToConsider = new HashSet<>(); + // All windows that are open before element processing may need to fire. + Set windowsToConsider = openWindows(windowToMergeResult.values()); // Process each element, using the updated activeWindows determined by collectAndMergeWindows. for (WindowedValue value : values) { - windowsToConsider.addAll(processElement(windowToMergeResult, value)); + processElement(windowToMergeResult, value); } - if (!windowsToConsider.isEmpty()) { - // Prefetch state necessary to determine if the triggers should fire. - for (W mergedWindow : windowsToConsider) { - ReduceFn.Context directContext = - contextFactory.base(mergedWindow, StateStyle.DIRECT); - triggerRunner.prefetchShouldFire(mergedWindow, directContext.state()); - } - - // Filter out timers that didn't trigger and prefetch state for triggering. - for (Iterator it = windowsToConsider.iterator(); it.hasNext(); ) { - W mergedWindow = it.next(); - ReduceFn.Context directContext = - contextFactory.base(mergedWindow, StateStyle.DIRECT); - if (triggerRunner.shouldFire( - directContext.window(), directContext.timers(), directContext.state())) { - ReduceFn.Context renamedContext = - contextFactory.base(mergedWindow, StateStyle.RENAMED); - prefetchEmitPane(directContext, renamedContext); - } else { - it.remove(); - } - } - - // Trigger output from any window for which the trigger is ready. - for (W mergedWindow : windowsToConsider) { - emitPane(contextFactory.base(mergedWindow, StateStyle.DIRECT), - contextFactory.base(mergedWindow, StateStyle.RENAMED)); - } + // Now that we've processed the elements, see if any of the windows need to fire. + // Prefetch state necessary to determine if the triggers should fire. + for (W mergedWindow : windowsToConsider) { + triggerRunner.prefetchShouldFire( + mergedWindow, contextFactory.base(mergedWindow, StateStyle.DIRECT).state()); + } + // Filter to windows that are firing. + Collection windowsToFire = windowsThatShouldFire(windowsToConsider); + // Prefetch windows that are firing. + for (W window : windowsToFire) { + prefetchEmitPane(contextFactory.base(window, StateStyle.DIRECT), + contextFactory.base(window, StateStyle.RENAMED)); + } + // Trigger output from firing windows. + for (W window : windowsToFire) { + emitPane(contextFactory.base(window, StateStyle.DIRECT), + contextFactory.base(window, StateStyle.RENAMED)); } // We're all done with merging and emitting elements so can compress the activeWindow state. @@ -351,14 +367,23 @@ public void persist() { /** * Extract the windows associated with the values, and invoke merge. Return a map - * from windows to the merge result window. If a window is not in the domain of - * the result map then it did not get merged into a different window. + * from windows to the merge result window. Windows that were not merged are present + * in the map with equal key and value. */ private Map collectAndMergeWindows(Iterable> values) throws Exception { + Map windowToMergeResult = new HashMap<>(); + // No-op if no merging can take place if (windowingStrategy.getWindowFn().isNonMerging()) { - return ImmutableMap.of(); + for (WindowedValue value : values) { + for (BoundedWindow untypedWindow : value.getWindows()) { + @SuppressWarnings("unchecked") + W window = (W) untypedWindow; + windowToMergeResult.put(window, window); + } + } + return windowToMergeResult; } // Collect the windows from all elements (except those which are too late) and @@ -367,6 +392,8 @@ private Map collectAndMergeWindows(Iterable> values) for (BoundedWindow untypedWindow : value.getWindows()) { @SuppressWarnings("unchecked") W window = (W) untypedWindow; + // This will be overwritten by the merge callback if merging takes place. + windowToMergeResult.put(window, window); // For backwards compat with pre 1.4 only. // We may still have ACTIVE windows with multiple state addresses, representing @@ -395,7 +422,7 @@ private Map collectAndMergeWindows(Iterable> values) } // Merge all of the active windows and retain a mapping from source windows to result windows. - Map windowToMergeResult = new HashMap<>(); + activeWindows.merge(new OnMergeCallback(windowToMergeResult)); return windowToMergeResult; } @@ -497,38 +524,44 @@ public void onMerge(Collection toBeMerged, W mergeResult) throws Exception { } /** - * Process an element. - * - * @param value the value being processed - * @return the set of windows in which the element was actually processed + * Redirect element windows to the ACTIVE windows they have been merged into. + * The compressed representation (value, {window1, window2, ...}) actually represents + * distinct elements (value, window1), (value, window2), ... + * so if window1 and window2 merge, the resulting window will contain both copies + * of the value. */ - private Collection processElement(Map windowToMergeResult, WindowedValue value) - throws Exception { - // Redirect element windows to the ACTIVE windows they have been merged into. - // The compressed representation (value, {window1, window2, ...}) actually represents - // distinct elements (value, window1), (value, window2), ... - // so if window1 and window2 merge, the resulting window will contain both copies - // of the value. - Collection windows = new ArrayList<>(); - for (BoundedWindow untypedWindow : value.getWindows()) { + private Set toMergedWindows(Map windowToMergeResult, + Collection windows) { + Set mergedWindows = new HashSet<>(); + for (BoundedWindow untypedWindow : windows) { @SuppressWarnings("unchecked") W window = (W) untypedWindow; W mergeResult = windowToMergeResult.get(window); - if (mergeResult == null) { - mergeResult = window; - } - windows.add(mergeResult); + checkNotNull(mergeResult); + mergedWindows.add(mergeResult); } + return mergedWindows; + } + private void prefetchWindowsForValues(Collection windows) { // Prefetch in each of the windows if we're going to need to process triggers for (W window : windows) { - ReduceFn.ProcessValueContext directContext = contextFactory.forValue( - window, value.getValue(), value.getTimestamp(), StateStyle.DIRECT); + ReduceFn.Context directContext = contextFactory.base( + window, StateStyle.DIRECT); triggerRunner.prefetchForValue(window, directContext.state()); } + } + + /** + * Process an element. + * + * @param value the value being processed + */ + private void processElement(Map windowToMergeResult, WindowedValue value) + throws Exception { + Set windows = toMergedWindows(windowToMergeResult, value.getWindows()); // Process the element for each (mergeResultWindow, not closed) window it belongs to. - List triggerableWindows = new ArrayList<>(windows.size()); for (W window : windows) { ReduceFn.ProcessValueContext directContext = contextFactory.forValue( window, value.getValue(), value.getTimestamp(), StateStyle.DIRECT); @@ -543,7 +576,6 @@ private Collection processElement(Map windowToMergeResult, WindowedValu continue; } - triggerableWindows.add(window); activeWindows.ensureWindowIsActive(window); ReduceFn.ProcessValueContext renamedContext = contextFactory.forValue( window, value.getValue(), value.getTimestamp(), StateStyle.RENAMED); @@ -587,8 +619,6 @@ private Collection processElement(Map windowToMergeResult, WindowedValu // cannot take a trigger state from firing to non-firing. // (We don't actually assert this since it is too slow.) } - - return triggerableWindows; } private class TimerContext { From 102a75ef56852e827f36815ce453b97936acfe24 Mon Sep 17 00:00:00 2001 From: Sam Whittle Date: Tue, 22 Nov 2016 12:29:28 -0800 Subject: [PATCH 3/5] Remove unnecessary RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT suppressions --- .../java/org/apache/beam/runners/core/PaneInfoTracker.java | 5 ----- .../java/org/apache/beam/runners/core/ReduceFnRunner.java | 3 --- .../java/org/apache/beam/runners/core/WatermarkHold.java | 3 --- 3 files changed, 11 deletions(-) diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/PaneInfoTracker.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/PaneInfoTracker.java index f904bc6e571b..69a4cfd5b9ef 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/PaneInfoTracker.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/PaneInfoTracker.java @@ -20,7 +20,6 @@ import static com.google.common.base.Preconditions.checkState; import com.google.common.annotations.VisibleForTesting; -import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import org.apache.beam.sdk.transforms.windowing.AfterWatermark; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.transforms.windowing.PaneInfo.PaneInfoCoder; @@ -55,8 +54,6 @@ public void clear(StateAccessor state) { state.access(PANE_INFO_TAG).clear(); } - @SuppressFBWarnings(value = "RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT", - justification = "prefetch side effect") public void prefetchPaneInfo(ReduceFn.Context context) { context.state().access(PaneInfoTracker.PANE_INFO_TAG).readLater(); } @@ -77,8 +74,6 @@ public ReadableState getNextPaneInfo( return new ReadableState() { @Override - @SuppressFBWarnings(value = "RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT", - justification = "prefetch side effect") public ReadableState readLater() { previousPaneFuture.readLater(); return this; diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java index 3d10b65025e7..8779c59c63e8 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java @@ -22,7 +22,6 @@ import static com.google.common.base.Preconditions.checkState; import com.google.common.annotations.VisibleForTesting; -import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -909,8 +908,6 @@ private boolean needToEmitPane(boolean isEmpty, boolean isFinished, PaneInfo.Tim return false; } - @SuppressFBWarnings(value = "RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT", justification = - "prefetch side effect") private void prefetchOnTrigger( final ReduceFn.Context directContext, ReduceFn.Context renamedContext) { diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java index 6cac34a51144..7f1afcc0a79c 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java @@ -20,7 +20,6 @@ import static com.google.common.base.Preconditions.checkState; import com.google.common.annotations.VisibleForTesting; -import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.io.Serializable; import javax.annotation.Nullable; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; @@ -470,8 +469,6 @@ public ReadableState extractAndRelease( final WatermarkHoldState extraHoldState = context.state().access(EXTRA_HOLD_TAG); return new ReadableState() { @Override - @SuppressFBWarnings(value = "RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT", - justification = "") public ReadableState readLater() { elementHoldState.readLater(); extraHoldState.readLater(); From 4d86a1f71e457afe0b870a0fd9379fa5ac3305b8 Mon Sep 17 00:00:00 2001 From: Sam Whittle Date: Tue, 22 Nov 2016 13:14:47 -0800 Subject: [PATCH 4/5] Change HashMap per element to immutable map since they are generally small --- .../beam/runners/core/ReduceFnRunner.java | 29 +++++++++++-------- 1 file changed, 17 insertions(+), 12 deletions(-) diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java index 8779c59c63e8..deede04f6af8 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java @@ -22,6 +22,9 @@ import static com.google.common.base.Preconditions.checkState; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Function; +import com.google.common.collect.FluentIterable; +import com.google.common.collect.ImmutableSet; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -529,17 +532,19 @@ public void onMerge(Collection toBeMerged, W mergeResult) throws Exception { * so if window1 and window2 merge, the resulting window will contain both copies * of the value. */ - private Set toMergedWindows(Map windowToMergeResult, - Collection windows) { - Set mergedWindows = new HashSet<>(); - for (BoundedWindow untypedWindow : windows) { - @SuppressWarnings("unchecked") - W window = (W) untypedWindow; - W mergeResult = windowToMergeResult.get(window); - checkNotNull(mergeResult); - mergedWindows.add(mergeResult); - } - return mergedWindows; + private ImmutableSet toMergedWindows(final Map windowToMergeResult, + final Collection windows) { + return ImmutableSet.copyOf( + FluentIterable.from(windows).transform( + new Function() { + @Override + public W apply(BoundedWindow untypedWindow) { + @SuppressWarnings("unchecked") + W window = (W) untypedWindow; + return checkNotNull(windowToMergeResult.get(window)); + } + } + )); } private void prefetchWindowsForValues(Collection windows) { @@ -558,7 +563,7 @@ private void prefetchWindowsForValues(Collection windows) { */ private void processElement(Map windowToMergeResult, WindowedValue value) throws Exception { - Set windows = toMergedWindows(windowToMergeResult, value.getWindows()); + ImmutableSet windows = toMergedWindows(windowToMergeResult, value.getWindows()); // Process the element for each (mergeResultWindow, not closed) window it belongs to. for (W window : windows) { From 07b44e7e2b4ea5629510577f5d1f93f8a70a50a4 Mon Sep 17 00:00:00 2001 From: Sam Whittle Date: Mon, 28 Nov 2016 12:41:45 -0800 Subject: [PATCH 5/5] Address review comments --- .../beam/runners/core/ReduceFnRunner.java | 181 ++++++++++-------- 1 file changed, 100 insertions(+), 81 deletions(-) diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java index deede04f6af8..6448b4058116 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java @@ -18,7 +18,6 @@ package org.apache.beam.runners.core; import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkState; import com.google.common.annotations.VisibleForTesting; @@ -325,13 +324,27 @@ public void processElements(Iterable> values) throws Excep return; } + // Determine all the windows for elements. + Set windows = collectWindows(values); // If an incoming element introduces a new window, attempt to merge it into an existing // window eagerly. - Map windowToMergeResult = collectAndMergeWindows(values); - prefetchWindowsForValues(windowToMergeResult.values()); + Map windowToMergeResult = mergeWindows(windows); + if (!windowToMergeResult.isEmpty()) { + // Update windows by removing all windows that were merged away and adding + // the windows they were merged to. We add after completing all the + // removals to avoid removing a window that was also added. + List addedWindows = new ArrayList<>(windowToMergeResult.size()); + for (Map.Entry entry : windowToMergeResult.entrySet()) { + windows.remove(entry.getKey()); + addedWindows.add(entry.getValue()); + } + windows.addAll(addedWindows); + } + + prefetchWindowsForValues(windows); // All windows that are open before element processing may need to fire. - Set windowsToConsider = openWindows(windowToMergeResult.values()); + Set windowsToConsider = openWindows(windows); // Process each element, using the updated activeWindows determined by collectAndMergeWindows. for (WindowedValue value : values) { @@ -348,12 +361,12 @@ public void processElements(Iterable> values) throws Excep Collection windowsToFire = windowsThatShouldFire(windowsToConsider); // Prefetch windows that are firing. for (W window : windowsToFire) { - prefetchEmitPane(contextFactory.base(window, StateStyle.DIRECT), + prefetchEmit(contextFactory.base(window, StateStyle.DIRECT), contextFactory.base(window, StateStyle.RENAMED)); } // Trigger output from firing windows. for (W window : windowsToFire) { - emitPane(contextFactory.base(window, StateStyle.DIRECT), + emit(contextFactory.base(window, StateStyle.DIRECT), contextFactory.base(window, StateStyle.RENAMED)); } @@ -368,63 +381,61 @@ public void persist() { } /** - * Extract the windows associated with the values, and invoke merge. Return a map - * from windows to the merge result window. Windows that were not merged are present - * in the map with equal key and value. + * Extract the windows associated with the values. */ - private Map collectAndMergeWindows(Iterable> values) - throws Exception { - Map windowToMergeResult = new HashMap<>(); - - // No-op if no merging can take place - if (windowingStrategy.getWindowFn().isNonMerging()) { - for (WindowedValue value : values) { - for (BoundedWindow untypedWindow : value.getWindows()) { - @SuppressWarnings("unchecked") + private Set collectWindows(Iterable> values) throws Exception { + Set windows = new HashSet<>(); + for (WindowedValue value : values) { + for (BoundedWindow untypedWindow : value.getWindows()) { + @SuppressWarnings("unchecked") W window = (W) untypedWindow; - windowToMergeResult.put(window, window); - } + windows.add(window); } - return windowToMergeResult; + } + return windows; + } + + /** + * Invoke merge for the given windows and return a map from windows to the + * merge result window. Windows that were not merged are not present in the + * map. + */ + private Map mergeWindows(Set windows) throws Exception { + if (windowingStrategy.getWindowFn().isNonMerging()) { + // Return an empty map, indicating that every window is not merged. + return Collections.emptyMap(); } + Map windowToMergeResult = new HashMap<>(); // Collect the windows from all elements (except those which are too late) and // make sure they are already in the active window set or are added as NEW windows. - for (WindowedValue value : values) { - for (BoundedWindow untypedWindow : value.getWindows()) { - @SuppressWarnings("unchecked") - W window = (W) untypedWindow; - // This will be overwritten by the merge callback if merging takes place. - windowToMergeResult.put(window, window); - - // For backwards compat with pre 1.4 only. - // We may still have ACTIVE windows with multiple state addresses, representing - // a window who's state has not yet been eagerly merged. - // We'll go ahead and merge that state now so that we don't have to worry about - // this legacy case anywhere else. - if (activeWindows.isActive(window)) { - Set stateAddressWindows = activeWindows.readStateAddresses(window); - if (stateAddressWindows.size() > 1) { - // This is a legacy window who's state has not been eagerly merged. - // Do that now. - ReduceFn.OnMergeContext premergeContext = - contextFactory.forPremerge(window); - reduceFn.onMerge(premergeContext); - watermarkHold.onMerge(premergeContext); - activeWindows.merged(window); - } + for (W window : windows) { + // For backwards compat with pre 1.4 only. + // We may still have ACTIVE windows with multiple state addresses, representing + // a window who's state has not yet been eagerly merged. + // We'll go ahead and merge that state now so that we don't have to worry about + // this legacy case anywhere else. + if (activeWindows.isActive(window)) { + Set stateAddressWindows = activeWindows.readStateAddresses(window); + if (stateAddressWindows.size() > 1) { + // This is a legacy window who's state has not been eagerly merged. + // Do that now. + ReduceFn.OnMergeContext premergeContext = + contextFactory.forPremerge(window); + reduceFn.onMerge(premergeContext); + watermarkHold.onMerge(premergeContext); + activeWindows.merged(window); } - - // Add this window as NEW if it is not currently ACTIVE. - // If we had already seen this window and closed its trigger, then the - // window will not be currently ACTIVE. It will then be added as NEW here, - // and fall into the merging logic as usual. - activeWindows.ensureWindowExists(window); } + + // Add this window as NEW if it is not currently ACTIVE. + // If we had already seen this window and closed its trigger, then the + // window will not be currently ACTIVE. It will then be added as NEW here, + // and fall into the merging logic as usual. + activeWindows.ensureWindowExists(window); } // Merge all of the active windows and retain a mapping from source windows to result windows. - activeWindows.merge(new OnMergeCallback(windowToMergeResult)); return windowToMergeResult; } @@ -541,7 +552,9 @@ private ImmutableSet toMergedWindows(final Map windowToMergeResult, public W apply(BoundedWindow untypedWindow) { @SuppressWarnings("unchecked") W window = (W) untypedWindow; - return checkNotNull(windowToMergeResult.get(window)); + W mergedWindow = windowToMergeResult.get(window); + // If the element is not present in the map, the window is unmerged. + return (mergedWindow == null) ? window : mergedWindow; } } )); @@ -559,6 +572,8 @@ private void prefetchWindowsForValues(Collection windows) { /** * Process an element. * + * @param windowToMergeResult map of windows to merged windows. If a window is + * not present it is unmerged. * @param value the value being processed */ private void processElement(Map windowToMergeResult, WindowedValue value) @@ -625,7 +640,11 @@ private void processElement(Map windowToMergeResult, WindowedValue } } - private class TimerContext { + /** + * Enriches TimerData with state necessary for processing a timer as well as + * common queries about a timer. + */ + private class EnrichedTimerData { public final Instant timestamp; public final ReduceFn.Context directContext; public final ReduceFn.Context renamedContext; @@ -637,7 +656,7 @@ private class TimerContext { // end-of-window time to be a signal to garbage collect. public final boolean isGarbageCollection; - TimerContext( + EnrichedTimerData( TimerData timer, ReduceFn.Context directContext, ReduceFn.Context renamedContext) { @@ -669,7 +688,7 @@ public void onTimers(Iterable timers) throws Exception { // Create a reusable context for each timer and begin prefetching necessary // state. - List timerContexts = new LinkedList(); + List enrichedTimers = new LinkedList(); for (TimerData timer : timers) { checkArgument(timer.getNamespace() instanceof WindowNamespace, "Expected timer to be in WindowNamespace, but was in %s", timer.getNamespace()); @@ -680,11 +699,11 @@ public void onTimers(Iterable timers) throws Exception { contextFactory.base(window, StateStyle.DIRECT); ReduceFn.Context renamedContext = contextFactory.base(window, StateStyle.RENAMED); - TimerContext timerContext = new TimerContext(timer, directContext, renamedContext); - timerContexts.add(timerContext); + EnrichedTimerData enrichedTimer = new EnrichedTimerData(timer, directContext, renamedContext); + enrichedTimers.add(enrichedTimer); // Perform prefetching of state to determine if the trigger should fire. - if (!timerContext.isGarbageCollection) { + if (enrichedTimer.isGarbageCollection) { triggerRunner.prefetchIsClosed(directContext.state()); } else { triggerRunner.prefetchShouldFire(directContext.window(), directContext.state()); @@ -692,38 +711,38 @@ public void onTimers(Iterable timers) throws Exception { } // For those windows that are active and open, prefetch the triggering or emitting state. - for (TimerContext timerContext : timerContexts) { - if (timerContext.windowIsActiveAndOpen()) { - ReduceFn.Context directContext = timerContext.directContext; - if (timerContext.isGarbageCollection) { - prefetchOnTrigger(directContext, timerContext.renamedContext); + for (EnrichedTimerData timer : enrichedTimers) { + if (timer.windowIsActiveAndOpen()) { + ReduceFn.Context directContext = timer.directContext; + if (timer.isGarbageCollection) { + prefetchOnTrigger(directContext, timer.renamedContext); } else if (triggerRunner.shouldFire( directContext.window(), directContext.timers(), directContext.state())) { - prefetchEmitPane(directContext, timerContext.renamedContext); + prefetchEmit(directContext, timer.renamedContext); } } } // Perform processing now that everything is prefetched. - for (TimerContext timerContext : timerContexts) { - ReduceFn.Context directContext = timerContext.directContext; - ReduceFn.Context renamedContext = timerContext.renamedContext; + for (EnrichedTimerData timer : enrichedTimers) { + ReduceFn.Context directContext = timer.directContext; + ReduceFn.Context renamedContext = timer.renamedContext; - if (timerContext.isGarbageCollection) { + if (timer.isGarbageCollection) { WindowTracing.debug("ReduceFnRunner.onTimer: Cleaning up for key:{}; window:{} at {} with " + "inputWatermark:{}; outputWatermark:{}", - key, directContext.window(), timerContext.timestamp, + key, directContext.window(), timer.timestamp, timerInternals.currentInputWatermarkTime(), timerInternals.currentOutputWatermarkTime()); - boolean windowIsActiveAndOpen = timerContext.windowIsActiveAndOpen(); + boolean windowIsActiveAndOpen = timer.windowIsActiveAndOpen(); if (windowIsActiveAndOpen) { // We need to call onTrigger to emit the final pane if required. // The final pane *may* be ON_TIME if no prior ON_TIME pane has been emitted, // and the watermark has passed the end of the window. @Nullable Instant newHold = onTrigger( - directContext, renamedContext, true /* isFinished */, timerContext.isEndOfWindow); + directContext, renamedContext, true /* isFinished */, timer.isEndOfWindow); checkState(newHold == null, "Hold placed at %s despite isFinished being true.", newHold); } @@ -733,16 +752,16 @@ public void onTimers(Iterable timers) throws Exception { } else { WindowTracing.debug("ReduceFnRunner.onTimer: Triggering for key:{}; window:{} at {} with " + "inputWatermark:{}; outputWatermark:{}", - key, directContext.window(), timerContext.timestamp, + key, directContext.window(), timer.timestamp, timerInternals.currentInputWatermarkTime(), timerInternals.currentOutputWatermarkTime()); - if (timerContext.windowIsActiveAndOpen() + if (timer.windowIsActiveAndOpen() && triggerRunner.shouldFire( directContext.window(), directContext.timers(), directContext.state())) { - emitPane(directContext, renamedContext); + emit(directContext, renamedContext); } - if (timerContext.isEndOfWindow) { + if (timer.isEndOfWindow) { // If the window strategy trigger includes a watermark trigger then at this point // there should be no data holds, either because we'd already cleared them on an // earlier onTrigger, or because we just cleared them on the above emit. @@ -844,7 +863,7 @@ private boolean shouldDiscardAfterFiring(boolean isFinished) { return false; } - private void prefetchEmitPane(ReduceFn.Context directContext, + private void prefetchEmit(ReduceFn.Context directContext, ReduceFn.Context renamedContext) { triggerRunner.prefetchShouldFire(directContext.window(), directContext.state()); triggerRunner.prefetchOnFire(directContext.window(), directContext.state()); @@ -853,9 +872,9 @@ private void prefetchEmitPane(ReduceFn.Context directCont } /** - * Emit a pane if a trigger is ready to fire or timers require it, and cleanup state. + * Emit if a trigger is ready to fire or timers require it, and cleanup state. */ - private void emitPane( + private void emit( ReduceFn.Context directContext, ReduceFn.Context renamedContext) throws Exception { @@ -895,9 +914,9 @@ private void emitPane( } /** - * Do we need to emit a pane? + * Do we need to emit? */ - private boolean needToEmitPane(boolean isEmpty, boolean isFinished, PaneInfo.Timing timing) { + private boolean needToEmit(boolean isEmpty, boolean isFinished, PaneInfo.Timing timing) { if (!isEmpty) { // The pane has elements. return true; @@ -977,7 +996,7 @@ private Instant onTrigger( } // Only emit a pane if it has data or empty panes are observable. - if (needToEmitPane(isEmpty, isFinished, pane.getTiming())) { + if (needToEmit(isEmpty, isFinished, pane.getTiming())) { // Run reduceFn.onTrigger method. final List windows = Collections.singletonList(directContext.window()); ReduceFn.OnTriggerContext renamedTriggerContext =