Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BEAM-3052] ReduceFnRunner: Do not manage EOW hold or timer, set GC hold and timer always #3988

Merged
merged 4 commits into from Nov 13, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -45,7 +45,6 @@
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing;
Expand Down Expand Up @@ -596,28 +595,12 @@ private void processElement(Map<W, W> windowToMergeResult, WindowedValue<InputT>
window, value.getValue(), value.getTimestamp(), StateStyle.RENAMED);

nonEmptyPanes.recordContent(renamedContext.state());

// Make sure we've scheduled the end-of-window or garbage collection timer for this window.
Instant timer = scheduleEndOfWindowOrGarbageCollectionTimer(directContext);
scheduleGarbageCollectionTimer(directContext);

// Hold back progress of the output watermark until we have processed the pane this
// element will be included within. If the element is too late for that, place a hold at
// the end-of-window or garbage collection time to allow empty panes to contribute elements
// which won't be dropped due to lateness by a following computation (assuming the following
// computation uses the same allowed lateness value...)
@Nullable Instant hold = watermarkHold.addHolds(renamedContext);

if (hold != null) {
// Assert that holds have a proximate timer.
boolean holdInWindow = !hold.isAfter(window.maxTimestamp());
boolean timerInWindow = !timer.isAfter(window.maxTimestamp());
checkState(
holdInWindow == timerInWindow,
"set a hold at %s, a timer at %s, which disagree as to whether they are in window %s",
hold,
timer,
directContext.window());
}
// element will be included within. If the element is later than the output watermark, the
// hold will be at GC time.
watermarkHold.addHolds(renamedContext);

// Execute the reduceFn, which will buffer the value as appropriate
reduceFn.processValue(renamedContext);
Expand Down Expand Up @@ -1070,48 +1053,33 @@ public void output(OutputT toOutput) {
}

/**
* Make sure we'll eventually have a timer fire which will tell us to garbage collect
* the window state. For efficiency we may need to do this in two steps rather
* than one. Return the time at which the timer will fire.
* Schedule a timer to garbage collect the window.
*
* <p>The timer:
*
* <ul>
* <li>If allowedLateness is zero then we'll garbage collect at the end of the window.
* For simplicity we'll set our own timer for this situation even though an
* {@link AfterWatermark} trigger may have also set an end-of-window timer.
* ({@code setTimer} is idempotent.)
* <li>If allowedLateness is non-zero then we could just always set a timer for the garbage
* collection time. However if the windows are large (eg hourly) and the allowedLateness is small
* (eg seconds) then we'll end up with nearly twice the number of timers in-flight. So we
* instead set an end-of-window timer and then roll that forward to a garbage collection timer
* when it fires. We use the input watermark to distinguish those cases.
* <li>...must be fired strictly after the expiration of the window.
* <li>...should be as close to the expiration as possible, to have a timely output of
* remaining buffered data, and GC.
* </ul>
*/
private Instant scheduleEndOfWindowOrGarbageCollectionTimer(
ReduceFn<?, ?, ?, W>.Context directContext) {
private void scheduleGarbageCollectionTimer(ReduceFn<?, ?, ?, W>.Context directContext) {
Instant inputWM = timerInternals.currentInputWatermarkTime();
Instant endOfWindow = directContext.window().maxTimestamp();
String which;
Instant timer;
if (endOfWindow.isBefore(inputWM)) {
timer = LateDataUtils.garbageCollectionTime(directContext.window(), windowingStrategy);
which = "garbage collection";
} else {
timer = endOfWindow;
which = "end-of-window";
}
Instant gcTime =
LateDataUtils.garbageCollectionTime(directContext.window(), windowingStrategy);
WindowTracing.trace(
"ReduceFnRunner.scheduleEndOfWindowOrGarbageCollectionTimer: Scheduling {} timer at {} for "
"ReduceFnRunner.scheduleGarbageCollectionTimer: Scheduling at {} for "
+ "key:{}; window:{} where inputWatermark:{}; outputWatermark:{}",
which,
timer,
gcTime,
key,
directContext.window(),
inputWM,
timerInternals.currentOutputWatermarkTime());
checkState(!timer.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE),
"Timer %s is beyond end-of-time", timer);
directContext.timers().setTimer(timer, TimeDomain.EVENT_TIME);
return timer;
checkState(
!gcTime.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE),
"Timer %s is beyond end-of-time",
gcTime);
directContext.timers().setTimer(gcTime, TimeDomain.EVENT_TIME);
}

private void cancelEndOfWindowAndGarbageCollectionTimers(
Expand Down
Expand Up @@ -25,9 +25,9 @@
import org.apache.beam.sdk.state.ReadableState;
import org.apache.beam.sdk.state.WatermarkHoldState;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing;
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.util.WindowTracing;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.joda.time.Duration;
Expand Down Expand Up @@ -84,112 +84,22 @@ public WatermarkHold(TimerInternals timerInternals, WindowingStrategy<?, W> wind

/**
* Add a hold to prevent the output watermark progressing beyond the (possibly adjusted) timestamp
* of the element in {@code context}. We allow the actual hold time to be shifted later by the
* {@link TimestampCombiner}, but no further than the end of the window. The hold will
* remain until cleared by {@link #extractAndRelease}. Return the timestamp at which the hold
* was placed, or {@literal null} if no hold was placed.
* of the element in {@code context}.
*
* <p>In the following we'll write {@code E} to represent an element's timestamp after passing
* through the window strategy's output time function, {@code IWM} for the local input watermark,
* {@code OWM} for the local output watermark, and {@code GCWM} for the garbage collection
* watermark (which is at {@code IWM - getAllowedLateness}). Time progresses from left to right,
* and we write {@code [ ... ]} to denote a bounded window with implied lower bound.
* <p>The target time for the aggregated output is shifted by the {@link WindowFn} and combined
* with a {@link TimestampCombiner} to determine where the output watermark is held.
*
* <p>Note that the GCWM will be the same as the IWM if {@code getAllowedLateness}
* is {@code ZERO}.
* <p>If the target time would be late, then we do not set this hold, but instead add the hold
* to allow a final output at GC time.
*
* <p>Here are the cases we need to handle. They are conceptually considered in the
* sequence written since if getAllowedLateness is ZERO the GCWM is the same as the IWM.
* <ol>
* <li>(Normal)
* <pre>
* |
* [ | E ]
* |
* IWM
* </pre>
* This is, hopefully, the common and happy case. The element is locally on-time and can
* definitely make it to an {@code ON_TIME} pane which we can still set an end-of-window timer
* for. We place an element hold at E, which may contribute to the {@code ON_TIME} pane's
* timestamp (depending on the output time function). Thus the OWM will not proceed past E
* until the next pane fires.
*
* <li>(Discard - no target window)
* <pre>
* | |
* [ E ] | |
* | |
* GCWM <-getAllowedLateness-> IWM
* </pre>
* The element is very locally late. The window has been garbage collected, thus there
* is no target pane E could be assigned to. We discard E.
*
* <li>(Unobservably late)
* <pre>
* | |
* [ | E | ]
* | |
* OWM IWM
* </pre>
* The element is locally late, however we can still treat this case as for 'Normal' above
* since the IWM has not yet passed the end of the window and the element is ahead of the
* OWM. In effect, we get to 'launder' the locally late element and consider it as locally
* on-time because no downstream computation can observe the difference.
*
* <li>(Maybe late 1)
* <pre>
* | |
* [ | E ] |
* | |
* OWM IWM
* </pre>
* The end-of-window timer may have already fired for this window, and thus an {@code ON_TIME}
* pane may have already been emitted. However, if timer firings have been delayed then it
* is possible the {@code ON_TIME} pane has not yet been emitted. We can't place an element
* hold since we can't be sure if it will be cleared promptly. Thus this element *may* find
* its way into an {@code ON_TIME} pane, but if so it will *not* contribute to that pane's
* timestamp. We may however set a garbage collection hold if required.
*
* <li>(Maybe late 2)
* <pre>
* | |
* [ E | | ]
* | |
* OWM IWM
* </pre>
* The end-of-window timer has not yet fired, so this element may still appear in an
* {@code ON_TIME} pane. However the element is too late to contribute to the output
* watermark hold, and thus won't contribute to the pane's timestamp. We can still place an
* end-of-window hold.
*
* <li>(Maybe late 3)
* <pre>
* | |
* [ E | ] |
* | |
* OWM IWM
* </pre>
* As for the (Maybe late 2) case, however we don't even know if the end-of-window timer
* has already fired, or it is about to fire. We can place only the garbage collection hold,
* if required.
*
* <li>(Definitely late)
* <pre>
* | |
* [ E ] | |
* | |
* OWM IWM
* </pre>
* The element is definitely too late to make an {@code ON_TIME} pane. We are too late to
* place an end-of-window hold. We can still place a garbage collection hold if required.
*
* </ol>
* <p>See https://s.apache.org/beam-lateness for the full design of how late data and watermarks
* interact.
*/
@Nullable
public Instant addHolds(ReduceFn<?, ?, ?, W>.ProcessValueContext context) {
Instant hold = addElementHold(context);
if (hold == null) {
hold = addEndOfWindowOrGarbageCollectionHolds(context, false/*paneIsEmpty*/);
hold = addGarbageCollectionHold(context, false /*paneIsEmpty*/);
}
return hold;
}
Expand Down Expand Up @@ -267,95 +177,23 @@ private Instant addElementHold(ReduceFn<?, ?, ?, W>.ProcessValueContext context)
return tooLate ? null : elementHold;
}

/**
* Add an end-of-window hold or, if too late for that, a garbage collection hold (if required).
* Return the {@link Instant} at which hold was added, or {@literal null} if no hold was added.
*/
@Nullable
private Instant addEndOfWindowOrGarbageCollectionHolds(
ReduceFn<?, ?, ?, W>.Context context, boolean paneIsEmpty) {
Instant hold = addEndOfWindowHold(context, paneIsEmpty);
if (hold == null) {
hold = addGarbageCollectionHold(context, paneIsEmpty);
}
return hold;
}

/**
* Attempt to add an 'end-of-window hold'. Return the {@link Instant} at which the hold was added
* (ie the end of window time), or {@literal null} if no end of window hold is possible and we
* should fallback to a garbage collection hold.
*
* <p>We only add the hold if we can be sure a timer will be set (by {@link ReduceFnRunner})
* to clear it. In other words, the input watermark cannot be ahead of the end of window time.
*
* <p>An end-of-window hold is added in two situations:
* <ol>
* <li>An incoming element came in behind the output watermark (so we are too late for placing
* the usual element hold), but it may still be possible to include the element in an
* {@link Timing#ON_TIME} pane. We place the end of window hold to ensure that pane will
* not be considered late by any downstream computation.
* <li>We guarantee an {@link Timing#ON_TIME} pane will be emitted for all windows which saw at
* least one element, even if that {@link Timing#ON_TIME} pane is empty. Thus when elements in
* a pane are processed due to a fired trigger we must set both an end of window timer and an end
* of window hold. Again, the hold ensures the {@link Timing#ON_TIME} pane will not be considered
* late by any downstream computation.
* </ol>
*/
@Nullable
private Instant addEndOfWindowHold(ReduceFn<?, ?, ?, W>.Context context, boolean paneIsEmpty) {
Instant outputWM = timerInternals.currentOutputWatermarkTime();
Instant inputWM = timerInternals.currentInputWatermarkTime();
Instant eowHold = context.window().maxTimestamp();

if (eowHold.isBefore(inputWM)) {
WindowTracing.trace(
"WatermarkHold.addEndOfWindowHold: end-of-window hold at {} is too late for "
+ "end-of-window timer for key:{}; window:{}; inputWatermark:{}; outputWatermark:{}",
eowHold, context.key(), context.window(), inputWM, outputWM);
return null;
}

checkState(outputWM == null || !eowHold.isBefore(outputWM),
"End-of-window hold %s cannot be before output watermark %s",
eowHold, outputWM);
checkState(!eowHold.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE),
"End-of-window hold %s is beyond end-of-time", eowHold);
// If paneIsEmpty then this hold is just for empty ON_TIME panes, so we want to keep
// the hold away from the combining function in elementHoldTag.
// However if !paneIsEmpty then it could make sense to use the elementHoldTag here.
// Alas, onMerge is forced to add an end of window or garbage collection hold without
// knowing whether an element hold is already in place (stopping to check is too expensive).
// This it would end up adding an element hold at the end of the window which could
// upset the elementHoldTag combining function.
context.state().access(EXTRA_HOLD_TAG).add(eowHold);
WindowTracing.trace(
"WatermarkHold.addEndOfWindowHold: end-of-window hold at {} is on time for "
+ "key:{}; window:{}; inputWatermark:{}; outputWatermark:{}",
eowHold, context.key(), context.window(), inputWM, outputWM);
return eowHold;
}

/**
* Attempt to add a 'garbage collection hold' if it is required. Return the {@link Instant} at
* which the hold was added (ie the end of window time plus allowed lateness),
* or {@literal null} if no hold was added.
*
* <p>We only add the hold if it is distinct from what would be added by
* {@link #addEndOfWindowHold}. In other words, {@link WindowingStrategy#getAllowedLateness}
* must be non-zero.
* which the hold was added (ie the end of window time plus allowed lateness), or {@literal null}
* if no hold was added.
*
* <p>A garbage collection hold is added in two situations:
*
* <ol>
* <li>An incoming element came in behind the output watermark, and was too late for placing
* the usual element hold or an end of window hold. Place the garbage collection hold so that
* we can guarantee when the pane is finally triggered its output will not be dropped due to
* excessive lateness by any downstream computation.
* <li>The {@link WindowingStrategy#getClosingBehavior()} is
* {@link ClosingBehavior#FIRE_ALWAYS}, and thus we guarantee a final pane will be emitted
* for all windows which saw at least one element. Again, the garbage collection hold guarantees
* that any empty final pane can be given a timestamp which will not be considered beyond
* allowed lateness by any downstream computation.
* <li>An incoming element has a timestamp earlier than the output watermark, and was too late
* for placing the usual element hold or an end of window hold. Place the garbage collection
* hold so that we can guarantee when the pane is finally triggered its output will not be
* dropped due to excessive lateness by any downstream computation.
* <li>The {@link WindowingStrategy#getClosingBehavior()} is {@link
* ClosingBehavior#FIRE_ALWAYS}, and thus we guarantee a final pane will be emitted for all
* windows which saw at least one element. Again, the garbage collection hold guarantees
* that any empty final pane can be given a timestamp which will not be considered beyond
* allowed lateness by any downstream computation.
* </ol>
*
* <p>We use {@code paneIsEmpty} to distinguish cases 1 and 2.
Expand All @@ -367,12 +205,11 @@ private Instant addGarbageCollectionHold(
Instant inputWM = timerInternals.currentInputWatermarkTime();
Instant gcHold = LateDataUtils.garbageCollectionTime(context.window(), windowingStrategy);

if (!windowingStrategy.getAllowedLateness().isLongerThan(Duration.ZERO)) {
if (gcHold.isBefore(inputWM)) {
WindowTracing.trace(
"WatermarkHold.addGarbageCollectionHold: garbage collection hold at {} is unnecessary "
+ "since no allowed lateness for key:{}; window:{}; inputWatermark:{}; "
+ "outputWatermark:{}",
gcHold, context.key(), context.window(), inputWM, outputWM);
"{}.addGarbageCollectionHold: gc hold would be before the input watermark "
+ "for key:{}; window: {}; inputWatermark: {}; outputWatermark: {}",
getClass().getSimpleName(), context.key(), context.window(), inputWM, outputWM);
return null;
}

Expand Down Expand Up @@ -432,7 +269,7 @@ public void onMerge(ReduceFn<?, ?, ?, W>.OnMergeContext context) {
// the hold depends on the min of the element timestamps.
// At least one merged window must be non-empty for the merge to have been triggered.
StateMerging.clear(context.state(), EXTRA_HOLD_TAG);
addEndOfWindowOrGarbageCollectionHolds(context, false /*paneIsEmpty*/);
addGarbageCollectionHold(context, false /*paneIsEmpty*/);
}

/**
Expand Down Expand Up @@ -497,7 +334,7 @@ public OldAndNewHolds read() {
oldHold = extraHold;
}
if (oldHold == null || oldHold.isAfter(context.window().maxTimestamp())) {
// If no hold (eg because all elements came in behind the output watermark), or
// If no hold (eg because all elements came in before the output watermark), or
// the hold was for garbage collection, take the end of window as the result.
WindowTracing.debug(
"WatermarkHold.extractAndRelease.read: clipping from {} to end of window "
Expand All @@ -514,9 +351,7 @@ public OldAndNewHolds read() {

@Nullable Instant newHold = null;
if (!isFinished) {
// Only need to leave behind an end-of-window or garbage collection hold
// if future elements will be processed.
newHold = addEndOfWindowOrGarbageCollectionHolds(context, true /*paneIsEmpty*/);
newHold = addGarbageCollectionHold(context, true /*paneIsEmpty*/);
}

return new OldAndNewHolds(oldHold, newHold);
Expand Down