Skip to content

Commit

Permalink
Use a consistent calculation for GC Time
Browse files Browse the repository at this point in the history
Truncate all garbage collection timestamps to be at the end of the
global window at the latest.

Add a reshuffle test, which was failing when late data arrived.

Update ReifyTimestamps to permit infinite skew. Elements that have
timestamps extracted from them may be late, but that is not the concern
of ReifyTimestamps.
  • Loading branch information
tgroh committed May 11, 2017
1 parent 66b3ce0 commit e70fc86
Show file tree
Hide file tree
Showing 11 changed files with 234 additions and 49 deletions.
Expand Up @@ -159,7 +159,7 @@ public boolean apply(WindowedValue<InputT> input) {
/** Is {@code window} expired w.r.t. the garbage collection watermark? */
private boolean canDropDueToExpiredWindow(BoundedWindow window) {
Instant inputWM = timerInternals.currentInputWatermarkTime();
return window.maxTimestamp().plus(windowingStrategy.getAllowedLateness()).isBefore(inputWM);
return LateDataUtils.garbageCollectionTime(window, windowingStrategy).isBefore(inputWM);
}
}
}
Expand Up @@ -24,15 +24,46 @@
import javax.annotation.Nullable;
import org.apache.beam.runners.core.metrics.CounterCell;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.util.WindowTracing;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.WindowingStrategy;

import org.joda.time.Duration;
import org.joda.time.Instant;

/**
* Utils to handle late data.
*/
public class LateDataUtils {
private LateDataUtils() {}

/**
* Return when {@code window} should be garbage collected. If the window's expiration time is on
* or after the end of the global window, it will be truncated to the end of the global window.
*/
public static Instant garbageCollectionTime(
BoundedWindow window, WindowingStrategy windowingStrategy) {
return garbageCollectionTime(window, windowingStrategy.getAllowedLateness());
}

/**
* Return when {@code window} should be garbage collected. If the window's expiration time is on
* or after the end of the global window, it will be truncated to the end of the global window.
*/
public static Instant garbageCollectionTime(BoundedWindow window, Duration allowedLateness) {

// If the end of the window + allowed lateness is beyond the "end of time" aka the end of the
// global window, then we truncate it. The conditional is phrased like it is because the
// addition of EOW + allowed lateness might even overflow the maximum allowed Instant
if (GlobalWindow.INSTANCE
.maxTimestamp()
.minus(allowedLateness)
.isBefore(window.maxTimestamp())) {
return GlobalWindow.INSTANCE.maxTimestamp();
} else {
return window.maxTimestamp().plus(allowedLateness);
}
}

/**
* Returns an {@code Iterable<WindowedValue<InputT>>} that only contains non-late input elements.
Expand Down
Expand Up @@ -48,7 +48,6 @@
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing;
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
Expand Down Expand Up @@ -663,7 +662,7 @@ private class EnrichedTimerData {
W window = directContext.window();
this.isEndOfWindow = TimeDomain.EVENT_TIME == timer.getDomain()
&& timer.getTimestamp().equals(window.maxTimestamp());
Instant cleanupTime = garbageCollectionTime(window);
Instant cleanupTime = LateDataUtils.garbageCollectionTime(window, windowingStrategy);
this.isGarbageCollection = !timer.getTimestamp().isBefore(cleanupTime);
}

Expand Down Expand Up @@ -769,9 +768,11 @@ public void onTimers(Iterable<TimerData> timers) throws Exception {
// cleanup event and handled by the above).
// Note we must do this even if the trigger is finished so that we are sure to cleanup
// any final trigger finished bits.
checkState(windowingStrategy.getAllowedLateness().isLongerThan(Duration.ZERO),
checkState(
windowingStrategy.getAllowedLateness().isLongerThan(Duration.ZERO),
"Unexpected zero getAllowedLateness");
Instant cleanupTime = garbageCollectionTime(directContext.window());
Instant cleanupTime =
LateDataUtils.garbageCollectionTime(directContext.window(), windowingStrategy);
WindowTracing.debug(
"ReduceFnRunner.onTimer: Scheduling cleanup timer for key:{}; window:{} at {} with "
+ "inputWatermark:{}; outputWatermark:{}",
Expand Down Expand Up @@ -957,6 +958,7 @@ private Instant onTrigger(
// Extract the window hold, and as a side effect clear it.
final WatermarkHold.OldAndNewHolds pair =
watermarkHold.extractAndRelease(renamedContext, isFinished).read();
// TODO: This isn't accurate if the elements are late. See BEAM-2262
final Instant outputTimestamp = pair.oldHold;
@Nullable Instant newHold = pair.newHold;

Expand All @@ -972,11 +974,12 @@ private Instant onTrigger(
if (newHold.isAfter(directContext.window().maxTimestamp())) {
// The hold must be for garbage collection, which can't have happened yet.
checkState(
newHold.isEqual(garbageCollectionTime(directContext.window())),
"new hold %s should be at garbage collection for window %s plus %s",
newHold,
directContext.window(),
windowingStrategy.getAllowedLateness());
newHold.isEqual(
LateDataUtils.garbageCollectionTime(directContext.window(), windowingStrategy)),
"new hold %s should be at garbage collection for window %s plus %s",
newHold,
directContext.window(),
windowingStrategy.getAllowedLateness());
} else {
// The hold must be for the end-of-window, which can't have happened yet.
checkState(
Expand Down Expand Up @@ -1042,7 +1045,7 @@ private Instant scheduleEndOfWindowOrGarbageCollectionTimer(
String which;
Instant timer;
if (endOfWindow.isBefore(inputWM)) {
timer = garbageCollectionTime(directContext.window());
timer = LateDataUtils.garbageCollectionTime(directContext.window(), windowingStrategy);
which = "garbage collection";
} else {
timer = endOfWindow;
Expand Down Expand Up @@ -1072,28 +1075,10 @@ private void cancelEndOfWindowAndGarbageCollectionTimers(
timerInternals.currentOutputWatermarkTime());
Instant eow = directContext.window().maxTimestamp();
directContext.timers().deleteTimer(eow, TimeDomain.EVENT_TIME);
Instant gc = garbageCollectionTime(directContext.window());
Instant gc = LateDataUtils.garbageCollectionTime(directContext.window(), windowingStrategy);
if (gc.isAfter(eow)) {
directContext.timers().deleteTimer(gc, TimeDomain.EVENT_TIME);
}
}

/**
* Return when {@code window} should be garbage collected. If the window's expiration time is on
* or after the end of the global window, it will be truncated to the end of the global window.
*/
private Instant garbageCollectionTime(W window) {

// If the end of the window + allowed lateness is beyond the "end of time" aka the end of the
// global window, then we truncate it. The conditional is phrased like it is because the
// addition of EOW + allowed lateness might even overflow the maximum allowed Instant
if (GlobalWindow.INSTANCE
.maxTimestamp()
.minus(windowingStrategy.getAllowedLateness())
.isBefore(window.maxTimestamp())) {
return GlobalWindow.INSTANCE.maxTimestamp();
} else {
return window.maxTimestamp().plus(windowingStrategy.getAllowedLateness());
}
}
}
Expand Up @@ -949,7 +949,7 @@ public Timer align(Duration period) {
*/
private Instant minTargetAndGcTime(Instant target) {
if (TimeDomain.EVENT_TIME.equals(spec.getTimeDomain())) {
Instant windowExpiry = window.maxTimestamp().plus(allowedLateness);
Instant windowExpiry = LateDataUtils.garbageCollectionTime(window, allowedLateness);
if (target.isAfter(windowExpiry)) {
return windowExpiry;
}
Expand Down
Expand Up @@ -104,7 +104,7 @@ public void processElement(WindowedValue<InputT> input) {
}

private boolean isLate(BoundedWindow window) {
Instant gcTime = window.maxTimestamp().plus(windowingStrategy.getAllowedLateness());
Instant gcTime = LateDataUtils.garbageCollectionTime(window, windowingStrategy);
Instant inputWM = cleanupTimer.currentInputWatermarkTime();
return gcTime.isBefore(inputWM);
}
Expand Down Expand Up @@ -208,7 +208,7 @@ public Instant currentInputWatermarkTime() {

@Override
public void setForWindow(BoundedWindow window) {
Instant gcTime = window.maxTimestamp().plus(windowingStrategy.getAllowedLateness());
Instant gcTime = LateDataUtils.garbageCollectionTime(window, windowingStrategy);
// make sure this fires after any window.maxTimestamp() timers
gcTime = gcTime.plus(GC_DELAY_MS);
timerInternals.setTimer(StateNamespaces.window(windowCoder, window),
Expand All @@ -222,7 +222,7 @@ public boolean isForWindow(
Instant timestamp,
TimeDomain timeDomain) {
boolean isEventTimer = timeDomain.equals(TimeDomain.EVENT_TIME);
Instant gcTime = window.maxTimestamp().plus(windowingStrategy.getAllowedLateness());
Instant gcTime = LateDataUtils.garbageCollectionTime(window, windowingStrategy);
gcTime = gcTime.plus(GC_DELAY_MS);
return isEventTimer && GC_TIMER_ID.equals(timerId) && gcTime.equals(timestamp);
}
Expand Down
Expand Up @@ -365,8 +365,7 @@ private Instant addGarbageCollectionHold(
ReduceFn<?, ?, ?, W>.Context context, boolean paneIsEmpty) {
Instant outputWM = timerInternals.currentOutputWatermarkTime();
Instant inputWM = timerInternals.currentInputWatermarkTime();
Instant eow = context.window().maxTimestamp();
Instant gcHold = eow.plus(windowingStrategy.getAllowedLateness());
Instant gcHold = LateDataUtils.garbageCollectionTime(context.window(), windowingStrategy);

if (!windowingStrategy.getAllowedLateness().isLongerThan(Duration.ZERO)) {
WindowTracing.trace(
Expand All @@ -387,6 +386,12 @@ private Instant addGarbageCollectionHold(
return null;
}

if (!gcHold.isBefore(BoundedWindow.TIMESTAMP_MAX_VALUE)) {
// If the garbage collection hold is past the timestamp we can represent, instead truncate
// to the maximum timestamp that is not positive infinity. This ensures all windows will
// eventually be garbage collected.
gcHold = BoundedWindow.TIMESTAMP_MAX_VALUE.minus(Duration.millis(1L));
}
checkState(!gcHold.isBefore(inputWM),
"Garbage collection hold %s cannot be before input watermark %s",
gcHold, inputWM);
Expand Down
@@ -0,0 +1,90 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.beam.runners.core;

import static org.hamcrest.Matchers.equalTo;
import static org.junit.Assert.assertThat;

import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.hamcrest.Matchers;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.joda.time.ReadableInstant;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

/**
* Tests for {@link LateDataUtils}.
*/
@RunWith(JUnit4.class)
public class LateDataUtilsTest {
@Test
public void beforeEndOfGlobalWindowSame() {
FixedWindows windowFn = FixedWindows.of(Duration.standardMinutes(5));
Duration allowedLateness = Duration.standardMinutes(2);
WindowingStrategy<?, ?> strategy =
WindowingStrategy.globalDefault()
.withWindowFn(windowFn)
.withAllowedLateness(allowedLateness);

IntervalWindow window = windowFn.assignWindow(new Instant(10));
assertThat(
LateDataUtils.garbageCollectionTime(window, strategy),
equalTo(window.maxTimestamp().plus(allowedLateness)));
}

@Test
public void garbageCollectionTimeAfterEndOfGlobalWindow() {
FixedWindows windowFn = FixedWindows.of(Duration.standardMinutes(5));
WindowingStrategy<?, ?> strategy =
WindowingStrategy.globalDefault()
.withWindowFn(windowFn);

IntervalWindow window = windowFn.assignWindow(new Instant(BoundedWindow.TIMESTAMP_MAX_VALUE));
assertThat(
window.maxTimestamp(),
Matchers.<ReadableInstant>greaterThan(GlobalWindow.INSTANCE.maxTimestamp()));
assertThat(
LateDataUtils.garbageCollectionTime(window, strategy),
equalTo(GlobalWindow.INSTANCE.maxTimestamp()));
}

@Test
public void garbageCollectionTimeAfterEndOfGlobalWindowWithLateness() {
FixedWindows windowFn = FixedWindows.of(Duration.standardMinutes(5));
Duration allowedLateness = Duration.millis(Long.MAX_VALUE);
WindowingStrategy<?, ?> strategy =
WindowingStrategy.globalDefault()
.withWindowFn(windowFn)
.withAllowedLateness(allowedLateness);

IntervalWindow window = windowFn.assignWindow(new Instant(-100));
assertThat(
window.maxTimestamp().plus(allowedLateness),
Matchers.<ReadableInstant>greaterThan(GlobalWindow.INSTANCE.maxTimestamp()));
assertThat(
LateDataUtils.garbageCollectionTime(window, strategy),
equalTo(GlobalWindow.INSTANCE.maxTimestamp()));
}
}
Expand Up @@ -21,6 +21,7 @@
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TimestampedValue;
import org.joda.time.Duration;

/**
* {@link PTransform PTransforms} for reifying the timestamp of values and reemitting the original
Expand Down Expand Up @@ -63,6 +64,11 @@ public void processElement(ProcessContext context) {

private static class ExtractTimestampedValueDoFn<K, V>
extends DoFn<KV<K, TimestampedValue<V>>, KV<K, V>> {
@Override
public Duration getAllowedTimestampSkew() {
return Duration.millis(Long.MAX_VALUE);
}

@ProcessElement
public void processElement(ProcessContext context) {
KV<K, TimestampedValue<V>> kv = context.element();
Expand Down
Expand Up @@ -71,22 +71,27 @@ public PCollection<KV<K, V>> expand(PCollection<KV<K, V>> input) {
.withTimestampCombiner(TimestampCombiner.EARLIEST)
.withAllowedLateness(Duration.millis(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()));

return input.apply(rewindow)
return input
.apply(rewindow)
.apply("ReifyOriginalTimestamps", ReifyTimestamps.<K, V>inValues())
.apply(GroupByKey.<K, TimestampedValue<V>>create())
// Set the windowing strategy directly, so that it doesn't get counted as the user having
// set allowed lateness.
.setWindowingStrategyInternal(originalStrategy)
.apply("ExpandIterable", ParDo.of(
new DoFn<KV<K, Iterable<TimestampedValue<V>>>, KV<K, TimestampedValue<V>>>() {
@ProcessElement
public void processElement(ProcessContext c) {
K key = c.element().getKey();
for (TimestampedValue<V> value : c.element().getValue()) {
c.output(KV.of(key, value));
}
}
}))
.apply("RestoreOriginalTimestamps", ReifyTimestamps.<K, V>extractFromValues());
.apply(
"ExpandIterable",
ParDo.of(
new DoFn<KV<K, Iterable<TimestampedValue<V>>>, KV<K, TimestampedValue<V>>>() {
@ProcessElement
public void processElement(ProcessContext c) {
K key = c.element().getKey();
for (TimestampedValue<V> value : c.element().getValue()) {
c.output(KV.of(key, value));
}
}
}))
.apply(
"RestoreOriginalTimestamps",
ReifyTimestamps.<K, V>extractFromValues());
}
}

0 comments on commit e70fc86

Please sign in to comment.