From 75eb7943521b61589073adaee72a65451a14961b Mon Sep 17 00:00:00 2001 From: Sam Whittle Date: Tue, 6 Dec 2016 15:32:17 -0800 Subject: [PATCH] Improve ReduceFnRunner prefetching Add prefetch* methods for prefetching state matching existing methods. Prefetch triggers in processElements. Replace onTimer with batched onTimers method to allow prefetching across timers. --- .../sdk/util/BatchTimerInternals.java | 22 +- .../GroupAlsoByWindowViaWindowSetDoFn.java | 6 +- .../dataflow/sdk/util/PaneInfoTracker.java | 4 + .../sdk/util/ReduceFnContextFactory.java | 9 +- .../dataflow/sdk/util/ReduceFnRunner.java | 490 +++++++++++------- .../dataflow/sdk/util/TriggerRunner.java | 15 +- .../dataflow/sdk/util/WatermarkHold.java | 5 + .../sdk/util/BatchTimerInternalsTest.java | 55 +- .../dataflow/sdk/util/ReduceFnTester.java | 28 +- 9 files changed, 407 insertions(+), 227 deletions(-) diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BatchTimerInternals.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BatchTimerInternals.java index dc41f7db3b..ca34165927 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BatchTimerInternals.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BatchTimerInternals.java @@ -23,6 +23,7 @@ import org.joda.time.Instant; +import java.util.ArrayList; import java.util.HashSet; import java.util.PriorityQueue; import java.util.Set; @@ -124,16 +125,19 @@ private void advance(ReduceFnRunner runner, Instant newTime, TimeDom PriorityQueue timers = queue(domain); boolean shouldFire = false; - do { - TimerData timer = timers.peek(); - // Timers fire if the new time is ahead of the timer - shouldFire = timer != null && newTime.isAfter(timer.getTimestamp()); - if (shouldFire) { - // Remove before firing, so that if the trigger adds another identical + while (true) { + ArrayList firedTimers = new ArrayList(); + while (!timers.isEmpty() && newTime.isAfter(timers.peek().getTimestamp())) { + // Remove before firing, so that if the callback adds another identical // timer we don't remove it. - timers.remove(); - runner.onTimer(timer); + TimerData timer = timers.remove(); + existingTimers.remove(timer); + firedTimers.add(timer); } - } while (shouldFire); + if (firedTimers.isEmpty()) { + break; + } + runner.onTimers(firedTimers); + } } } diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/GroupAlsoByWindowViaWindowSetDoFn.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/GroupAlsoByWindowViaWindowSetDoFn.java index 89a4fcb720..5110ff199f 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/GroupAlsoByWindowViaWindowSetDoFn.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/GroupAlsoByWindowViaWindowSetDoFn.java @@ -20,7 +20,6 @@ import com.google.cloud.dataflow.sdk.transforms.Sum; import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow; import com.google.cloud.dataflow.sdk.util.DoFnRunner.ReduceFnExecutor; -import com.google.cloud.dataflow.sdk.util.TimerInternals.TimerData; import com.google.cloud.dataflow.sdk.util.state.StateInternals; import com.google.cloud.dataflow.sdk.values.KV; @@ -81,9 +80,7 @@ public void processElement(ProcessContext c) throws Exception { c.getPipelineOptions()); reduceFnRunner.processElements(element.elementsIterable()); - for (TimerData timer : element.timersIterable()) { - reduceFnRunner.onTimer(timer); - } + reduceFnRunner.onTimers(element.timersIterable()); reduceFnRunner.persist(); } @@ -101,4 +98,3 @@ public Aggregator getDroppedDueToLatenessAggregator() { return droppedDueToLateness; } } - diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/PaneInfoTracker.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/PaneInfoTracker.java index 0cc6a048c9..934917bb1e 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/PaneInfoTracker.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/PaneInfoTracker.java @@ -51,6 +51,10 @@ public void clear(StateAccessor state) { state.access(PANE_INFO_TAG).clear(); } + public void prefetchPaneInfo(ReduceFn.Context context) { + context.state().access(PaneInfoTracker.PANE_INFO_TAG).readLater(); + } + /** * Return a ({@link ReadableState} for) the pane info appropriate for {@code context}. The pane * info includes the timing for the pane, who's calculation is quite subtle. diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnContextFactory.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnContextFactory.java index dc97c58a28..06ae2e6593 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnContextFactory.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnContextFactory.java @@ -24,7 +24,6 @@ import com.google.cloud.dataflow.sdk.transforms.windowing.PaneInfo; import com.google.cloud.dataflow.sdk.util.TimerInternals.TimerData; import com.google.cloud.dataflow.sdk.util.state.MergingStateAccessor; -import com.google.cloud.dataflow.sdk.util.state.ReadableState; import com.google.cloud.dataflow.sdk.util.state.State; import com.google.cloud.dataflow.sdk.util.state.StateAccessor; import com.google.cloud.dataflow.sdk.util.state.StateContext; @@ -99,7 +98,7 @@ public ReduceFn.ProcessValueContext forValue( } public ReduceFn.OnTriggerContext forTrigger(W window, - ReadableState pane, StateStyle style, OnTriggerCallbacks callbacks) { + PaneInfo pane, StateStyle style, OnTriggerCallbacks callbacks) { return new OnTriggerContextImpl(stateAccessor(window, style), pane, callbacks); } @@ -371,11 +370,11 @@ public Timers timers() { private class OnTriggerContextImpl extends ReduceFn.OnTriggerContext { private final StateAccessorImpl state; - private final ReadableState pane; + private final PaneInfo pane; private final OnTriggerCallbacks callbacks; private final TimersImpl timers; - private OnTriggerContextImpl(StateAccessorImpl state, ReadableState pane, + private OnTriggerContextImpl(StateAccessorImpl state, PaneInfo pane, OnTriggerCallbacks callbacks) { reduceFn.super(); this.state = state; @@ -406,7 +405,7 @@ public StateAccessor state() { @Override public PaneInfo paneInfo() { - return pane.read(); + return pane; } @Override diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java index 9322efcbc1..500d755083 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java @@ -34,13 +34,14 @@ import com.google.cloud.dataflow.sdk.util.ReduceFnContextFactory.StateStyle; import com.google.cloud.dataflow.sdk.util.TimerInternals.TimerData; import com.google.cloud.dataflow.sdk.util.WindowingStrategy.AccumulationMode; -import com.google.cloud.dataflow.sdk.util.state.ReadableState; import com.google.cloud.dataflow.sdk.util.state.StateInternals; import com.google.cloud.dataflow.sdk.util.state.StateNamespaces.WindowNamespace; import com.google.cloud.dataflow.sdk.values.KV; import com.google.cloud.dataflow.sdk.values.PCollection; import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.ImmutableMap; +import com.google.common.base.Function; +import com.google.common.collect.FluentIterable; +import com.google.common.collect.ImmutableSet; import org.joda.time.Duration; import org.joda.time.Instant; @@ -50,6 +51,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Set; @@ -254,6 +256,32 @@ boolean hasNoActiveWindows() { return activeWindows.getActiveAndNewWindows().isEmpty(); } + private Set openWindows(Collection windows) { + Set result = new HashSet<>(); + for (W window : windows) { + ReduceFn.Context directContext = contextFactory.base( + window, StateStyle.DIRECT); + if (!triggerRunner.isClosed(directContext.state())) { + result.add(window); + } + } + return result; + } + + private Collection windowsThatShouldFire(Set windows) throws Exception { + Collection result = new ArrayList<>(); + // Filter out timers that didn't trigger. + for (W window : windows) { + ReduceFn.Context directContext = + contextFactory.base(window, StateStyle.DIRECT); + if (triggerRunner.shouldFire( + directContext.window(), directContext.timers(), directContext.state())) { + result.add(window); + } + } + return result; + } + /** * Incorporate {@code values} into the underlying reduce function, and manage holds, timers, * triggers, and window merging. @@ -279,25 +307,54 @@ boolean hasNoActiveWindows() { * */ public void processElements(Iterable> values) throws Exception { + if (!values.iterator().hasNext()) { + return; + } + + // Determine all the windows for elements. + Set windows = collectWindows(values); // If an incoming element introduces a new window, attempt to merge it into an existing // window eagerly. - Map windowToMergeResult = collectAndMergeWindows(values); + Map windowToMergeResult = mergeWindows(windows); + if (!windowToMergeResult.isEmpty()) { + // Update windows by removing all windows that were merged away and adding + // the windows they were merged to. We add after completing all the + // removals to avoid removing a window that was also added. + List addedWindows = new ArrayList<>(windowToMergeResult.size()); + for (Map.Entry entry : windowToMergeResult.entrySet()) { + windows.remove(entry.getKey()); + addedWindows.add(entry.getValue()); + } + windows.addAll(addedWindows); + } - Set windowsToConsider = new HashSet<>(); + prefetchWindowsForValues(windows); - // Process each element, using the updated activeWindows determined by collectAndMergeWindows. + // All windows that are open before element processing may need to fire. + Set windowsToConsider = openWindows(windows); + + // Process each element, using the updated activeWindows determined by mergeWindows. for (WindowedValue value : values) { - windowsToConsider.addAll(processElement(windowToMergeResult, value)); + processElement(windowToMergeResult, value); } - // Trigger output from any window for which the trigger is ready + // Now that we've processed the elements, see if any of the windows need to fire. + // Prefetch state necessary to determine if the triggers should fire. for (W mergedWindow : windowsToConsider) { - ReduceFn.Context directContext = - contextFactory.base(mergedWindow, StateStyle.DIRECT); - ReduceFn.Context renamedContext = - contextFactory.base(mergedWindow, StateStyle.RENAMED); - triggerRunner.prefetchShouldFire(mergedWindow, directContext.state()); - emitIfAppropriate(directContext, renamedContext); + triggerRunner.prefetchShouldFire( + mergedWindow, contextFactory.base(mergedWindow, StateStyle.DIRECT).state()); + } + // Filter to windows that are firing. + Collection windowsToFire = windowsThatShouldFire(windowsToConsider); + // Prefetch windows that are firing. + for (W window : windowsToFire) { + prefetchEmit(contextFactory.base(window, StateStyle.DIRECT), + contextFactory.base(window, StateStyle.RENAMED)); + } + // Trigger output from firing windows. + for (W window : windowsToFire) { + emit(contextFactory.base(window, StateStyle.DIRECT), + contextFactory.base(window, StateStyle.RENAMED)); } // We're all done with merging and emitting elements so can compress the activeWindow state. @@ -311,52 +368,61 @@ public void persist() { } /** - * Extract the windows associated with the values, and invoke merge. Return a map - * from windows to the merge result window. If a window is not in the domain of - * the result map then it did not get merged into a different window. + * Extract the windows associated with the values. */ - private Map collectAndMergeWindows(Iterable> values) - throws Exception { - // No-op if no merging can take place + private Set collectWindows(Iterable> values) throws Exception { + Set windows = new HashSet<>(); + for (WindowedValue value : values) { + for (BoundedWindow untypedWindow : value.getWindows()) { + @SuppressWarnings("unchecked") + W window = (W) untypedWindow; + windows.add(window); + } + } + return windows; + } + + /** + * Invoke merge for the given windows and return a map from windows to the + * merge result window. Windows that were not merged are not present in the + * map. + */ + private Map mergeWindows(Set windows) throws Exception { if (windowingStrategy.getWindowFn().isNonMerging()) { - return ImmutableMap.of(); + // Return an empty map, indicating that every window is not merged. + return Collections.emptyMap(); } + Map windowToMergeResult = new HashMap<>(); // Collect the windows from all elements (except those which are too late) and // make sure they are already in the active window set or are added as NEW windows. - for (WindowedValue value : values) { - for (BoundedWindow untypedWindow : value.getWindows()) { - @SuppressWarnings("unchecked") - W window = (W) untypedWindow; - - // For backwards compat with pre 1.4 only. - // We may still have ACTIVE windows with multiple state addresses, representing - // a window who's state has not yet been eagerly merged. - // We'll go ahead and merge that state now so that we don't have to worry about - // this legacy case anywhere else. - if (activeWindows.isActive(window)) { - Set stateAddressWindows = activeWindows.readStateAddresses(window); - if (stateAddressWindows.size() > 1) { - // This is a legacy window who's state has not been eagerly merged. - // Do that now. - ReduceFn.OnMergeContext premergeContext = - contextFactory.forPremerge(window); - reduceFn.onMerge(premergeContext); - watermarkHold.onMerge(premergeContext); - activeWindows.merged(window); - } + for (W window : windows) { + // For backwards compat with pre 1.4 only. + // We may still have ACTIVE windows with multiple state addresses, representing + // a window who's state has not yet been eagerly merged. + // We'll go ahead and merge that state now so that we don't have to worry about + // this legacy case anywhere else. + if (activeWindows.isActive(window)) { + Set stateAddressWindows = activeWindows.readStateAddresses(window); + if (stateAddressWindows.size() > 1) { + // This is a legacy window who's state has not been eagerly merged. + // Do that now. + ReduceFn.OnMergeContext premergeContext = + contextFactory.forPremerge(window); + reduceFn.onMerge(premergeContext); + watermarkHold.onMerge(premergeContext); + activeWindows.merged(window); } - - // Add this window as NEW if it is not currently ACTIVE. - // If we had already seen this window and closed its trigger, then the - // window will not be currently ACTIVE. It will then be added as NEW here, - // and fall into the merging logic as usual. - activeWindows.ensureWindowExists(window); } + + // Add this window as NEW if it is not currently ACTIVE. + // If we had already seen this window and closed its trigger, then the + // window will not be currently ACTIVE. It will then be added as NEW here, + // and fall into the merging logic as usual. + activeWindows.ensureWindowExists(window); } // Merge all of the active windows and retain a mapping from source windows to result windows. - Map windowToMergeResult = new HashMap<>(); activeWindows.merge(new OnMergeCallback(windowToMergeResult)); return windowToMergeResult; } @@ -458,38 +524,50 @@ public void onMerge(Collection toBeMerged, W mergeResult) throws Exception { } /** - * Process an element. - * - * @param value the value being processed - * @return the set of windows in which the element was actually processed + * Redirect element windows to the ACTIVE windows they have been merged into. + * The compressed representation (value, {window1, window2, ...}) actually represents + * distinct elements (value, window1), (value, window2), ... + * so if window1 and window2 merge, the resulting window will contain both copies + * of the value. */ - private Collection processElement(Map windowToMergeResult, WindowedValue value) - throws Exception { - // Redirect element windows to the ACTIVE windows they have been merged into. - // The compressed representation (value, {window1, window2, ...}) actually represents - // distinct elements (value, window1), (value, window2), ... - // so if window1 and window2 merge, the resulting window will contain both copies - // of the value. - Collection windows = new ArrayList<>(); - for (BoundedWindow untypedWindow : value.getWindows()) { - @SuppressWarnings("unchecked") - W window = (W) untypedWindow; - W mergeResult = windowToMergeResult.get(window); - if (mergeResult == null) { - mergeResult = window; - } - windows.add(mergeResult); - } + private ImmutableSet toMergedWindows(final Map windowToMergeResult, + final Collection windows) { + return ImmutableSet.copyOf( + FluentIterable.from(windows).transform( + new Function() { + @Override + public W apply(BoundedWindow untypedWindow) { + @SuppressWarnings("unchecked") + W window = (W) untypedWindow; + W mergedWindow = windowToMergeResult.get(window); + // If the element is not present in the map, the window is unmerged. + return (mergedWindow == null) ? window : mergedWindow; + } + } + )); + } + private void prefetchWindowsForValues(Collection windows) { // Prefetch in each of the windows if we're going to need to process triggers for (W window : windows) { - ReduceFn.ProcessValueContext directContext = contextFactory.forValue( - window, value.getValue(), value.getTimestamp(), StateStyle.DIRECT); + ReduceFn.Context directContext = contextFactory.base( + window, StateStyle.DIRECT); triggerRunner.prefetchForValue(window, directContext.state()); } + } + + /** + * Process an element. + * + * @param windowToMergeResult map of windows to merged windows. If a window is + * not present it is unmerged. + * @param value the value being processed + */ + private void processElement(Map windowToMergeResult, WindowedValue value) + throws Exception { + ImmutableSet windows = toMergedWindows(windowToMergeResult, value.getWindows()); // Process the element for each (mergeResultWindow, not closed) window it belongs to. - List triggerableWindows = new ArrayList<>(windows.size()); for (W window : windows) { ReduceFn.ProcessValueContext directContext = contextFactory.forValue( window, value.getValue(), value.getTimestamp(), StateStyle.DIRECT); @@ -504,7 +582,6 @@ private Collection processElement(Map windowToMergeResult, WindowedValu continue; } - triggerableWindows.add(window); activeWindows.ensureWindowIsActive(window); ReduceFn.ProcessValueContext renamedContext = contextFactory.forValue( window, value.getValue(), value.getTimestamp(), StateStyle.RENAMED); @@ -548,102 +625,152 @@ private Collection processElement(Map windowToMergeResult, WindowedValu // cannot take a trigger state from firing to non-firing. // (We don't actually assert this since it is too slow.) } - - return triggerableWindows; } /** - * Called when an end-of-window, garbage collection, or trigger-specific timer fires. + * Enriches TimerData with state necessary for processing a timer as well as + * common queries about a timer. */ - public void onTimer(TimerData timer) throws Exception { - // Which window is the timer for? - checkArgument(timer.getNamespace() instanceof WindowNamespace, - "Expected timer to be in WindowNamespace, but was in %s", timer.getNamespace()); - @SuppressWarnings("unchecked") - WindowNamespace windowNamespace = (WindowNamespace) timer.getNamespace(); - W window = windowNamespace.getWindow(); - ReduceFn.Context directContext = - contextFactory.base(window, StateStyle.DIRECT); - ReduceFn.Context renamedContext = - contextFactory.base(window, StateStyle.RENAMED); + private class EnrichedTimerData { + public final Instant timestamp; + public final ReduceFn.Context directContext; + public final ReduceFn.Context renamedContext; + // If this is an end-of-window timer then we may need to set a garbage collection timer + // if allowed lateness is non-zero. + public final boolean isEndOfWindow; + // If this is a garbage collection timer then we should trigger and + // garbage collect the window. We'll consider any timer at or after the + // end-of-window time to be a signal to garbage collect. + public final boolean isGarbageCollection; + + EnrichedTimerData( + TimerData timer, + ReduceFn.Context directContext, + ReduceFn.Context renamedContext) { + this.timestamp = timer.getTimestamp(); + this.directContext = directContext; + this.renamedContext = renamedContext; + W window = directContext.window(); + this.isEndOfWindow = TimeDomain.EVENT_TIME == timer.getDomain() + && timer.getTimestamp().equals(window.maxTimestamp()); + Instant cleanupTime = garbageCollectionTime(window); + this.isGarbageCollection = !timer.getTimestamp().isBefore(cleanupTime); + } // Has this window had its trigger finish? // - The trigger may implement isClosed as constant false. // - If the window function does not support windowing then all windows will be considered // active. // So we must take conjunction of activeWindows and triggerRunner state. - boolean windowIsActiveAndOpen = - activeWindows.isActive(window) && !triggerRunner.isClosed(directContext.state()); + public boolean windowIsActiveAndOpen() { + return activeWindows.isActive(directContext.window()) + && !triggerRunner.isClosed(directContext.state()); + } + } - if (!windowIsActiveAndOpen) { - WindowTracing.debug( - "ReduceFnRunner.onTimer: Note that timer {} is for non-ACTIVE window {}", timer, window); + public void onTimers(Iterable timers) throws Exception { + if (!timers.iterator().hasNext()) { + return; } - // If this is an end-of-window timer then we may need to set a garbage collection timer - // if allowed lateness is non-zero. - boolean isEndOfWindow = TimeDomain.EVENT_TIME == timer.getDomain() - && timer.getTimestamp().equals(window.maxTimestamp()); - - // If this is a garbage collection timer then we should trigger and garbage collect the window. - // We'll consider any timer at or after the end-of-window time to be a signal to garbage - // collect. - Instant cleanupTime = garbageCollectionTime(window); - boolean isGarbageCollection = TimeDomain.EVENT_TIME == timer.getDomain() - && !timer.getTimestamp().isBefore(cleanupTime); - - if (isGarbageCollection) { - WindowTracing.debug( - "ReduceFnRunner.onTimer: Cleaning up for key:{}; window:{} at {} with " - + "inputWatermark:{}; outputWatermark:{}", - key, window, timer.getTimestamp(), timerInternals.currentInputWatermarkTime(), - timerInternals.currentOutputWatermarkTime()); - - if (windowIsActiveAndOpen) { - // We need to call onTrigger to emit the final pane if required. - // The final pane *may* be ON_TIME if no prior ON_TIME pane has been emitted, - // and the watermark has passed the end of the window. - @Nullable Instant newHold = - onTrigger(directContext, renamedContext, true/* isFinished */, isEndOfWindow); - checkState(newHold == null, - "Hold placed at %s despite isFinished being true.", newHold); + // Create a reusable context for each timer and begin prefetching necessary + // state. + List enrichedTimers = new LinkedList(); + for (TimerData timer : timers) { + checkArgument(timer.getNamespace() instanceof WindowNamespace, + "Expected timer to be in WindowNamespace, but was in %s", timer.getNamespace()); + @SuppressWarnings("unchecked") + WindowNamespace windowNamespace = (WindowNamespace) timer.getNamespace(); + W window = windowNamespace.getWindow(); + ReduceFn.Context directContext = + contextFactory.base(window, StateStyle.DIRECT); + ReduceFn.Context renamedContext = + contextFactory.base(window, StateStyle.RENAMED); + EnrichedTimerData enrichedTimer = new EnrichedTimerData(timer, directContext, renamedContext); + enrichedTimers.add(enrichedTimer); + + // Perform prefetching of state to determine if the trigger should fire. + if (enrichedTimer.isGarbageCollection) { + triggerRunner.prefetchIsClosed(directContext.state()); + } else { + triggerRunner.prefetchShouldFire(directContext.window(), directContext.state()); } + } - // Cleanup flavor B: Clear all the remaining state for this window since we'll never - // see elements for it again. - clearAllState(directContext, renamedContext, windowIsActiveAndOpen); - } else { - WindowTracing.debug( - "ReduceFnRunner.onTimer: Triggering for key:{}; window:{} at {} with " - + "inputWatermark:{}; outputWatermark:{}", - key, window, timer.getTimestamp(), timerInternals.currentInputWatermarkTime(), - timerInternals.currentOutputWatermarkTime()); - if (windowIsActiveAndOpen) { - emitIfAppropriate(directContext, renamedContext); + // For those windows that are active and open, prefetch the triggering or emitting state. + for (EnrichedTimerData timer : enrichedTimers) { + if (timer.windowIsActiveAndOpen()) { + ReduceFn.Context directContext = timer.directContext; + if (timer.isGarbageCollection) { + prefetchOnTrigger(directContext, timer.renamedContext); + } else if (triggerRunner.shouldFire( + directContext.window(), directContext.timers(), directContext.state())) { + prefetchEmit(directContext, timer.renamedContext); + } } + } - if (isEndOfWindow) { - // If the window strategy trigger includes a watermark trigger then at this point - // there should be no data holds, either because we'd already cleared them on an - // earlier onTrigger, or because we just cleared them on the above emitIfAppropriate. - // We could assert this but it is very expensive. - - // Since we are processing an on-time firing we should schedule the garbage collection - // timer. (If getAllowedLateness is zero then the timer event will be considered a - // cleanup event and handled by the above). - // Note we must do this even if the trigger is finished so that we are sure to cleanup - // any final trigger finished bits. - checkState( - windowingStrategy.getAllowedLateness().isLongerThan(Duration.ZERO), - "Unexpected zero getAllowedLateness"); - WindowTracing.debug( - "ReduceFnRunner.onTimer: Scheduling cleanup timer for key:{}; window:{} at {} with " - + "inputWatermark:{}; outputWatermark:{}", - key, directContext.window(), cleanupTime, timerInternals.currentInputWatermarkTime(), + // Perform processing now that everything is prefetched. + for (EnrichedTimerData timer : enrichedTimers) { + ReduceFn.Context directContext = timer.directContext; + ReduceFn.Context renamedContext = timer.renamedContext; + + if (timer.isGarbageCollection) { + WindowTracing.debug("ReduceFnRunner.onTimer: Cleaning up for key:{}; window:{} at {} with " + + "inputWatermark:{}; outputWatermark:{}", + key, directContext.window(), timer.timestamp, + timerInternals.currentInputWatermarkTime(), timerInternals.currentOutputWatermarkTime()); - checkState(!cleanupTime.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE), - "Cleanup time %s is beyond end-of-time", cleanupTime); - directContext.timers().setTimer(cleanupTime, TimeDomain.EVENT_TIME); + + boolean windowIsActiveAndOpen = timer.windowIsActiveAndOpen(); + if (windowIsActiveAndOpen) { + // We need to call onTrigger to emit the final pane if required. + // The final pane *may* be ON_TIME if no prior ON_TIME pane has been emitted, + // and the watermark has passed the end of the window. + @Nullable + Instant newHold = onTrigger( + directContext, renamedContext, true /* isFinished */, timer.isEndOfWindow); + checkState(newHold == null, "Hold placed at %s despite isFinished being true.", newHold); + } + + // Cleanup flavor B: Clear all the remaining state for this window since we'll never + // see elements for it again. + clearAllState(directContext, renamedContext, windowIsActiveAndOpen); + } else { + WindowTracing.debug("ReduceFnRunner.onTimer: Triggering for key:{}; window:{} at {} with " + + "inputWatermark:{}; outputWatermark:{}", + key, directContext.window(), timer.timestamp, + timerInternals.currentInputWatermarkTime(), + timerInternals.currentOutputWatermarkTime()); + if (timer.windowIsActiveAndOpen() + && triggerRunner.shouldFire( + directContext.window(), directContext.timers(), directContext.state())) { + emit(directContext, renamedContext); + } + + if (timer.isEndOfWindow) { + // If the window strategy trigger includes a watermark trigger then at this point + // there should be no data holds, either because we'd already cleared them on an + // earlier onTrigger, or because we just cleared them on the above emit. + // We could assert this but it is very expensive. + + // Since we are processing an on-time firing we should schedule the garbage collection + // timer. (If getAllowedLateness is zero then the timer event will be considered a + // cleanup event and handled by the above). + // Note we must do this even if the trigger is finished so that we are sure to cleanup + // any final trigger finished bits. + checkState(windowingStrategy.getAllowedLateness().isLongerThan(Duration.ZERO), + "Unexpected zero getAllowedLateness"); + Instant cleanupTime = garbageCollectionTime(directContext.window()); + WindowTracing.debug( + "ReduceFnRunner.onTimer: Scheduling cleanup timer for key:{}; window:{} at {} with " + + "inputWatermark:{}; outputWatermark:{}", + key, directContext.window(), cleanupTime, timerInternals.currentInputWatermarkTime(), + timerInternals.currentOutputWatermarkTime()); + checkState(!cleanupTime.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE), + "Cleanup time %s is beyond end-of-time", cleanupTime); + directContext.timers().setTimer(cleanupTime, TimeDomain.EVENT_TIME); + } } } } @@ -652,7 +779,7 @@ public void onTimer(TimerData timer) throws Exception { * Clear all the state associated with {@code context}'s window. * Should only be invoked if we know all future elements for this window will be considered * beyond allowed lateness. - * This is a superset of the clearing done by {@link #emitIfAppropriate} below since: + * This is a superset of the clearing done by {@link #emit} below since: *
    *
  1. We can clear the trigger finished bits since we'll never need to ask if the trigger is * closed again. @@ -678,10 +805,10 @@ private void clearAllState( } else { // If !windowIsActiveAndOpen then !activeWindows.isActive (1) or triggerRunner.isClosed (2). // For (1), if !activeWindows.isActive then the window must be merging and has been - // explicitly removed by emitIfAppropriate. But in that case the trigger must have fired + // explicitly removed by emit. But in that case the trigger must have fired // and been closed, so this case reduces to (2). // For (2), if triggerRunner.isClosed then the trigger was fired and entered the - // closed state. In that case emitIfAppropriate will have cleared all state in + // closed state. In that case emit will have cleared all state in // reduceFn, triggerRunner (except for finished bits), paneInfoTracker and activeWindows. // We also know nonEmptyPanes must have been unconditionally cleared by the trigger. // Since the trigger fired the existing watermark holds must have been cleared, and since @@ -723,17 +850,23 @@ private boolean shouldDiscardAfterFiring(boolean isFinished) { return false; } + private void prefetchEmit(ReduceFn.Context directContext, + ReduceFn.Context renamedContext) { + triggerRunner.prefetchShouldFire(directContext.window(), directContext.state()); + triggerRunner.prefetchOnFire(directContext.window(), directContext.state()); + triggerRunner.prefetchIsClosed(directContext.state()); + prefetchOnTrigger(directContext, renamedContext); + } + /** - * Possibly emit a pane if a trigger is ready to fire or timers require it, and cleanup state. + * Emit if a trigger is ready to fire or timers require it, and cleanup state. */ - private void emitIfAppropriate(ReduceFn.Context directContext, + private void emit( + ReduceFn.Context directContext, ReduceFn.Context renamedContext) throws Exception { - if (!triggerRunner.shouldFire( - directContext.window(), directContext.timers(), directContext.state())) { - // Ignore unless trigger is ready to fire - return; - } + checkState(triggerRunner.shouldFire( + directContext.window(), directContext.timers(), directContext.state())); // Inform the trigger of the transition to see if it is finished triggerRunner.onFire(directContext.window(), directContext.timers(), directContext.state()); @@ -768,7 +901,7 @@ private void emitIfAppropriate(ReduceFn.Context directCon } /** - * Do we need to emit a pane? + * Do we need to emit? */ private boolean needToEmit(boolean isEmpty, boolean isFinished, PaneInfo.Timing timing) { if (!isEmpty) { @@ -786,6 +919,15 @@ private boolean needToEmit(boolean isEmpty, boolean isFinished, PaneInfo.Timing return false; } + private void prefetchOnTrigger( + final ReduceFn.Context directContext, + ReduceFn.Context renamedContext) { + paneInfoTracker.prefetchPaneInfo(directContext); + watermarkHold.prefetchExtract(renamedContext); + nonEmptyPanes.isEmpty(renamedContext.state()).readLater(); + reduceFn.prefetchOnTrigger(directContext.state()); + } + /** * Run the {@link ReduceFn#onTrigger} method and produce any necessary output. * @@ -799,25 +941,17 @@ private Instant onTrigger( throws Exception { Instant inputWM = timerInternals.currentInputWatermarkTime(); - // Prefetch necessary states - ReadableState outputTimestampFuture = - watermarkHold.extractAndRelease(renamedContext, isFinished).readLater(); - ReadableState paneFuture = - paneInfoTracker.getNextPaneInfo(directContext, isFinished).readLater(); - ReadableState isEmptyFuture = - nonEmptyPanes.isEmpty(renamedContext.state()).readLater(); - - reduceFn.prefetchOnTrigger(directContext.state()); - triggerRunner.prefetchOnFire(directContext.window(), directContext.state()); - // Calculate the pane info. - final PaneInfo pane = paneFuture.read(); - // Extract the window hold, and as a side effect clear it. + final PaneInfo pane = paneInfoTracker.getNextPaneInfo(directContext, isFinished).read(); - WatermarkHold.OldAndNewHolds pair = outputTimestampFuture.read(); + // Extract the window hold, and as a side effect clear it. + final WatermarkHold.OldAndNewHolds pair = + watermarkHold.extractAndRelease(renamedContext, isFinished).read(); final Instant outputTimestamp = pair.oldHold; @Nullable Instant newHold = pair.newHold; + final boolean isEmpty = nonEmptyPanes.isEmpty(renamedContext.state()).read(); + if (newHold != null) { // We can't be finished yet. checkState( @@ -849,11 +983,11 @@ private Instant onTrigger( } // Only emit a pane if it has data or empty panes are observable. - if (needToEmit(isEmptyFuture.read(), isFinished, pane.getTiming())) { + if (needToEmit(isEmpty, isFinished, pane.getTiming())) { // Run reduceFn.onTrigger method. final List windows = Collections.singletonList(directContext.window()); ReduceFn.OnTriggerContext renamedTriggerContext = - contextFactory.forTrigger(directContext.window(), paneFuture, StateStyle.RENAMED, + contextFactory.forTrigger(directContext.window(), pane, StateStyle.RENAMED, new OnTriggerCallbacks() { @Override public void output(OutputT toOutput) { diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/TriggerRunner.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/TriggerRunner.java index e59a1c20d7..672d846bb6 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/TriggerRunner.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/TriggerRunner.java @@ -90,30 +90,31 @@ private void clearFinishedBits(ValueState state) { } state.clear(); } + /** Return true if the trigger is closed in the window corresponding to the specified state. */ public boolean isClosed(StateAccessor state) { return readFinishedBits(state.access(FINISHED_BITS_TAG)).isFinished(rootTrigger); } - public void prefetchForValue(W window, StateAccessor state) { + public void prefetchIsClosed(StateAccessor state) { if (isFinishedSetNeeded()) { state.access(FINISHED_BITS_TAG).readLater(); } + } + + public void prefetchForValue(W window, StateAccessor state) { + prefetchIsClosed(state); rootTrigger.getSpec().prefetchOnElement( contextFactory.createStateAccessor(window, rootTrigger)); } public void prefetchOnFire(W window, StateAccessor state) { - if (isFinishedSetNeeded()) { - state.access(FINISHED_BITS_TAG).readLater(); - } + prefetchIsClosed(state); rootTrigger.getSpec().prefetchOnFire(contextFactory.createStateAccessor(window, rootTrigger)); } public void prefetchShouldFire(W window, StateAccessor state) { - if (isFinishedSetNeeded()) { - state.access(FINISHED_BITS_TAG).readLater(); - } + prefetchIsClosed(state); rootTrigger.getSpec().prefetchShouldFire( contextFactory.createStateAccessor(window, rootTrigger)); } diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/WatermarkHold.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/WatermarkHold.java index f23e5eb9e5..06de70fe5f 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/WatermarkHold.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/WatermarkHold.java @@ -443,6 +443,11 @@ public OldAndNewHolds(Instant oldHold, @Nullable Instant newHold) { } } + public void prefetchExtract(final ReduceFn.Context context) { + context.state().access(elementHoldTag).readLater(); + context.state().access(EXTRA_HOLD_TAG).readLater(); + } + /** * Return (a future for) the earliest hold for {@code context}. Clear all the holds after * reading, but add/restore an end-of-window or garbage collection hold if required. diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/util/BatchTimerInternalsTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/util/BatchTimerInternalsTest.java index 25d07d62d7..3228c544bf 100644 --- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/util/BatchTimerInternalsTest.java +++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/util/BatchTimerInternalsTest.java @@ -15,6 +15,8 @@ */ package com.google.cloud.dataflow.sdk.util; +import static org.mockito.Matchers.argThat; + import com.google.cloud.dataflow.sdk.util.TimerInternals.TimerData; import com.google.cloud.dataflow.sdk.util.state.StateNamespace; import com.google.cloud.dataflow.sdk.util.state.StateNamespaceForTest; @@ -24,10 +26,15 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; +import org.mockito.ArgumentMatcher; import org.mockito.Mock; import org.mockito.Mockito; import org.mockito.MockitoAnnotations; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + /** * Tests for {@link BatchTimerInternals}. */ @@ -44,6 +51,37 @@ public void setUp() { MockitoAnnotations.initMocks(this); } + private static class TimersAre extends ArgumentMatcher> { + final List expectedTimers; + TimersAre(List timers) { + expectedTimers = timers; + } + + @Override + public boolean matches(Object actual) { + if (actual == null || !(actual instanceof Iterable)) { + return false; + } + @SuppressWarnings("unchecked") + Iterable timers = (Iterable) actual; + + List actualTimers = new ArrayList(); + for (TimerData timer : timers) { + actualTimers.add(timer); + } + return expectedTimers.equals(actualTimers); + } + + @Override + public String toString() { + return "ordered timers " + expectedTimers.toString(); + } + } + + private static TimersAre timersAre(TimerData... timers) { + return new TimersAre(Arrays.asList(timers)); + } + @Test public void testFiringTimers() throws Exception { BatchTimerInternals underTest = new BatchTimerInternals(new Instant(0)); @@ -54,7 +92,7 @@ public void testFiringTimers() throws Exception { underTest.setTimer(processingTime2); underTest.advanceProcessingTime(mockRunner, new Instant(20)); - Mockito.verify(mockRunner).onTimer(processingTime1); + Mockito.verify(mockRunner).onTimers(argThat(timersAre(processingTime1))); Mockito.verifyNoMoreInteractions(mockRunner); // Advancing just a little shouldn't refire @@ -63,13 +101,13 @@ public void testFiringTimers() throws Exception { // Adding the timer and advancing a little should refire underTest.setTimer(processingTime1); - Mockito.verify(mockRunner).onTimer(processingTime1); underTest.advanceProcessingTime(mockRunner, new Instant(21)); + Mockito.verify(mockRunner, Mockito.times(2)).onTimers(argThat(timersAre(processingTime1))); Mockito.verifyNoMoreInteractions(mockRunner); // And advancing the rest of the way should still have the other timer underTest.advanceProcessingTime(mockRunner, new Instant(30)); - Mockito.verify(mockRunner).onTimer(processingTime2); + Mockito.verify(mockRunner).onTimers(argThat(timersAre(processingTime2))); Mockito.verifyNoMoreInteractions(mockRunner); } @@ -87,13 +125,11 @@ public void testTimerOrdering() throws Exception { underTest.setTimer(watermarkTime2); underTest.advanceInputWatermark(mockRunner, new Instant(30)); - Mockito.verify(mockRunner).onTimer(watermarkTime1); - Mockito.verify(mockRunner).onTimer(watermarkTime2); + Mockito.verify(mockRunner).onTimers(argThat(timersAre(watermarkTime1, watermarkTime2))); Mockito.verifyNoMoreInteractions(mockRunner); underTest.advanceProcessingTime(mockRunner, new Instant(30)); - Mockito.verify(mockRunner).onTimer(processingTime1); - Mockito.verify(mockRunner).onTimer(processingTime2); + Mockito.verify(mockRunner).onTimers(argThat(timersAre(processingTime1, processingTime2))); Mockito.verifyNoMoreInteractions(mockRunner); } @@ -107,10 +143,9 @@ public void testDeduplicate() throws Exception { underTest.setTimer(processingTime); underTest.setTimer(processingTime); underTest.advanceProcessingTime(mockRunner, new Instant(20)); + Mockito.verify(mockRunner).onTimers(argThat(timersAre(processingTime))); underTest.advanceInputWatermark(mockRunner, new Instant(20)); - - Mockito.verify(mockRunner).onTimer(processingTime); - Mockito.verify(mockRunner).onTimer(watermarkTime); + Mockito.verify(mockRunner).onTimers(argThat(timersAre(watermarkTime))); Mockito.verifyNoMoreInteractions(mockRunner); } } diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/util/ReduceFnTester.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/util/ReduceFnTester.java index e7da19f4d4..f206a4cfb9 100644 --- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/util/ReduceFnTester.java +++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/util/ReduceFnTester.java @@ -418,8 +418,10 @@ public WindowedValue apply(TimestampedValue input) { public void fireTimer(W window, Instant timestamp, TimeDomain domain) throws Exception { ReduceFnRunner runner = createRunner(); - runner.onTimer( + ArrayList timers = new ArrayList(1); + timers.add( TimerData.of(StateNamespaces.window(windowFn.windowCoder(), window), timestamp, domain)); + runner.onTimers(timers); runner.persist(); } @@ -760,20 +762,20 @@ private void advanceAndFire( PriorityQueue queue = queue(domain); boolean shouldFire = false; - do { - TimerData timer = queue.peek(); - // Timers fire when the current time progresses past the timer time. - shouldFire = timer != null && currentTime.isAfter(timer.getTimestamp()); - if (shouldFire) { - WindowTracing.trace( - "TestTimerInternals.advanceAndFire: firing {} at {}", timer, currentTime); - // Remove before firing, so that if the trigger adds another identical + while (true) { + ArrayList firedTimers = new ArrayList(); + while (!queue.isEmpty() && currentTime.isAfter(queue.peek().getTimestamp())) { + // Remove before firing, so that if the callback adds another identical // timer we don't remove it. - queue.remove(); - - runner.onTimer(timer); + TimerData timer = queue.remove(); + existingTimers.remove(timer); + firedTimers.add(timer); + } + if (firedTimers.isEmpty()) { + break; } - } while (shouldFire); + runner.onTimers(firedTimers); + } } } }