diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java index 66385c116ffb4..570f524636661 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java @@ -159,7 +159,7 @@ public boolean apply(WindowedValue input) { /** Is {@code window} expired w.r.t. the garbage collection watermark? */ private boolean canDropDueToExpiredWindow(BoundedWindow window) { Instant inputWM = timerInternals.currentInputWatermarkTime(); - return window.maxTimestamp().plus(windowingStrategy.getAllowedLateness()).isBefore(inputWM); + return LateDataUtils.garbageCollectionTime(window, windowingStrategy).isBefore(inputWM); } } } diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataUtils.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataUtils.java index 732e60c6c6571..8a2b7c6508b00 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataUtils.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataUtils.java @@ -24,15 +24,46 @@ import javax.annotation.Nullable; import org.apache.beam.runners.core.metrics.CounterCell; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.util.WindowTracing; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.WindowingStrategy; - +import org.joda.time.Duration; +import org.joda.time.Instant; /** * Utils to handle late data. */ public class LateDataUtils { + private LateDataUtils() {} + + /** + * Return when {@code window} should be garbage collected. If the window's expiration time is on + * or after the end of the global window, it will be truncated to the end of the global window. + */ + public static Instant garbageCollectionTime( + BoundedWindow window, WindowingStrategy windowingStrategy) { + return garbageCollectionTime(window, windowingStrategy.getAllowedLateness()); + } + + /** + * Return when {@code window} should be garbage collected. If the window's expiration time is on + * or after the end of the global window, it will be truncated to the end of the global window. + */ + public static Instant garbageCollectionTime(BoundedWindow window, Duration allowedLateness) { + + // If the end of the window + allowed lateness is beyond the "end of time" aka the end of the + // global window, then we truncate it. The conditional is phrased like it is because the + // addition of EOW + allowed lateness might even overflow the maximum allowed Instant + if (GlobalWindow.INSTANCE + .maxTimestamp() + .minus(allowedLateness) + .isBefore(window.maxTimestamp())) { + return GlobalWindow.INSTANCE.maxTimestamp(); + } else { + return window.maxTimestamp().plus(allowedLateness); + } + } /** * Returns an {@code Iterable>} that only contains non-late input elements. diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java index d2ed835ba4634..62d519f6f8e5d 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java @@ -48,7 +48,6 @@ import org.apache.beam.sdk.state.TimeDomain; import org.apache.beam.sdk.transforms.windowing.AfterWatermark; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing; import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; @@ -663,7 +662,7 @@ private class EnrichedTimerData { W window = directContext.window(); this.isEndOfWindow = TimeDomain.EVENT_TIME == timer.getDomain() && timer.getTimestamp().equals(window.maxTimestamp()); - Instant cleanupTime = garbageCollectionTime(window); + Instant cleanupTime = LateDataUtils.garbageCollectionTime(window, windowingStrategy); this.isGarbageCollection = !timer.getTimestamp().isBefore(cleanupTime); } @@ -769,9 +768,11 @@ public void onTimers(Iterable timers) throws Exception { // cleanup event and handled by the above). // Note we must do this even if the trigger is finished so that we are sure to cleanup // any final trigger finished bits. - checkState(windowingStrategy.getAllowedLateness().isLongerThan(Duration.ZERO), + checkState( + windowingStrategy.getAllowedLateness().isLongerThan(Duration.ZERO), "Unexpected zero getAllowedLateness"); - Instant cleanupTime = garbageCollectionTime(directContext.window()); + Instant cleanupTime = + LateDataUtils.garbageCollectionTime(directContext.window(), windowingStrategy); WindowTracing.debug( "ReduceFnRunner.onTimer: Scheduling cleanup timer for key:{}; window:{} at {} with " + "inputWatermark:{}; outputWatermark:{}", @@ -957,6 +958,7 @@ private Instant onTrigger( // Extract the window hold, and as a side effect clear it. final WatermarkHold.OldAndNewHolds pair = watermarkHold.extractAndRelease(renamedContext, isFinished).read(); + // TODO: This isn't accurate if the elements are late. See BEAM-2262 final Instant outputTimestamp = pair.oldHold; @Nullable Instant newHold = pair.newHold; @@ -972,11 +974,12 @@ private Instant onTrigger( if (newHold.isAfter(directContext.window().maxTimestamp())) { // The hold must be for garbage collection, which can't have happened yet. checkState( - newHold.isEqual(garbageCollectionTime(directContext.window())), - "new hold %s should be at garbage collection for window %s plus %s", - newHold, - directContext.window(), - windowingStrategy.getAllowedLateness()); + newHold.isEqual( + LateDataUtils.garbageCollectionTime(directContext.window(), windowingStrategy)), + "new hold %s should be at garbage collection for window %s plus %s", + newHold, + directContext.window(), + windowingStrategy.getAllowedLateness()); } else { // The hold must be for the end-of-window, which can't have happened yet. checkState( @@ -1042,7 +1045,7 @@ private Instant scheduleEndOfWindowOrGarbageCollectionTimer( String which; Instant timer; if (endOfWindow.isBefore(inputWM)) { - timer = garbageCollectionTime(directContext.window()); + timer = LateDataUtils.garbageCollectionTime(directContext.window(), windowingStrategy); which = "garbage collection"; } else { timer = endOfWindow; @@ -1072,28 +1075,10 @@ private void cancelEndOfWindowAndGarbageCollectionTimers( timerInternals.currentOutputWatermarkTime()); Instant eow = directContext.window().maxTimestamp(); directContext.timers().deleteTimer(eow, TimeDomain.EVENT_TIME); - Instant gc = garbageCollectionTime(directContext.window()); + Instant gc = LateDataUtils.garbageCollectionTime(directContext.window(), windowingStrategy); if (gc.isAfter(eow)) { directContext.timers().deleteTimer(gc, TimeDomain.EVENT_TIME); } } - /** - * Return when {@code window} should be garbage collected. If the window's expiration time is on - * or after the end of the global window, it will be truncated to the end of the global window. - */ - private Instant garbageCollectionTime(W window) { - - // If the end of the window + allowed lateness is beyond the "end of time" aka the end of the - // global window, then we truncate it. The conditional is phrased like it is because the - // addition of EOW + allowed lateness might even overflow the maximum allowed Instant - if (GlobalWindow.INSTANCE - .maxTimestamp() - .minus(windowingStrategy.getAllowedLateness()) - .isBefore(window.maxTimestamp())) { - return GlobalWindow.INSTANCE.maxTimestamp(); - } else { - return window.maxTimestamp().plus(windowingStrategy.getAllowedLateness()); - } - } } diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java index 85423c0431b46..7ca305e51d0a5 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java @@ -949,7 +949,7 @@ public Timer align(Duration period) { */ private Instant minTargetAndGcTime(Instant target) { if (TimeDomain.EVENT_TIME.equals(spec.getTimeDomain())) { - Instant windowExpiry = window.maxTimestamp().plus(allowedLateness); + Instant windowExpiry = LateDataUtils.garbageCollectionTime(window, allowedLateness); if (target.isAfter(windowExpiry)) { return windowExpiry; } diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java index 28a9deef87950..c68a94319e9aa 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java @@ -104,7 +104,7 @@ public void processElement(WindowedValue input) { } private boolean isLate(BoundedWindow window) { - Instant gcTime = window.maxTimestamp().plus(windowingStrategy.getAllowedLateness()); + Instant gcTime = LateDataUtils.garbageCollectionTime(window, windowingStrategy); Instant inputWM = cleanupTimer.currentInputWatermarkTime(); return gcTime.isBefore(inputWM); } @@ -208,7 +208,7 @@ public Instant currentInputWatermarkTime() { @Override public void setForWindow(BoundedWindow window) { - Instant gcTime = window.maxTimestamp().plus(windowingStrategy.getAllowedLateness()); + Instant gcTime = LateDataUtils.garbageCollectionTime(window, windowingStrategy); // make sure this fires after any window.maxTimestamp() timers gcTime = gcTime.plus(GC_DELAY_MS); timerInternals.setTimer(StateNamespaces.window(windowCoder, window), @@ -222,7 +222,7 @@ public boolean isForWindow( Instant timestamp, TimeDomain timeDomain) { boolean isEventTimer = timeDomain.equals(TimeDomain.EVENT_TIME); - Instant gcTime = window.maxTimestamp().plus(windowingStrategy.getAllowedLateness()); + Instant gcTime = LateDataUtils.garbageCollectionTime(window, windowingStrategy); gcTime = gcTime.plus(GC_DELAY_MS); return isEventTimer && GC_TIMER_ID.equals(timerId) && gcTime.equals(timestamp); } diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java index 64f5d9b165cc0..13e4c43d66a67 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java @@ -365,8 +365,7 @@ private Instant addGarbageCollectionHold( ReduceFn.Context context, boolean paneIsEmpty) { Instant outputWM = timerInternals.currentOutputWatermarkTime(); Instant inputWM = timerInternals.currentInputWatermarkTime(); - Instant eow = context.window().maxTimestamp(); - Instant gcHold = eow.plus(windowingStrategy.getAllowedLateness()); + Instant gcHold = LateDataUtils.garbageCollectionTime(context.window(), windowingStrategy); if (!windowingStrategy.getAllowedLateness().isLongerThan(Duration.ZERO)) { WindowTracing.trace( @@ -387,6 +386,12 @@ private Instant addGarbageCollectionHold( return null; } + if (!gcHold.isBefore(BoundedWindow.TIMESTAMP_MAX_VALUE)) { + // If the garbage collection hold is past the timestamp we can represent, instead truncate + // to the maximum timestamp that is not positive infinity. This ensures all windows will + // eventually be garbage collected. + gcHold = BoundedWindow.TIMESTAMP_MAX_VALUE.minus(Duration.millis(1L)); + } checkState(!gcHold.isBefore(inputWM), "Garbage collection hold %s cannot be before input watermark %s", gcHold, inputWM); diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/LateDataUtilsTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/LateDataUtilsTest.java new file mode 100644 index 0000000000000..f0f315d39328f --- /dev/null +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/LateDataUtilsTest.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.core; + +import static org.hamcrest.Matchers.equalTo; +import static org.junit.Assert.assertThat; + +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.FixedWindows; +import org.apache.beam.sdk.transforms.windowing.GlobalWindow; +import org.apache.beam.sdk.transforms.windowing.IntervalWindow; +import org.apache.beam.sdk.values.WindowingStrategy; +import org.hamcrest.Matchers; +import org.joda.time.Duration; +import org.joda.time.Instant; +import org.joda.time.ReadableInstant; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Tests for {@link LateDataUtils}. + */ +@RunWith(JUnit4.class) +public class LateDataUtilsTest { + @Test + public void beforeEndOfGlobalWindowSame() { + FixedWindows windowFn = FixedWindows.of(Duration.standardMinutes(5)); + Duration allowedLateness = Duration.standardMinutes(2); + WindowingStrategy strategy = + WindowingStrategy.globalDefault() + .withWindowFn(windowFn) + .withAllowedLateness(allowedLateness); + + IntervalWindow window = windowFn.assignWindow(new Instant(10)); + assertThat( + LateDataUtils.garbageCollectionTime(window, strategy), + equalTo(window.maxTimestamp().plus(allowedLateness))); + } + + @Test + public void garbageCollectionTimeAfterEndOfGlobalWindow() { + FixedWindows windowFn = FixedWindows.of(Duration.standardMinutes(5)); + WindowingStrategy strategy = + WindowingStrategy.globalDefault() + .withWindowFn(windowFn); + + IntervalWindow window = windowFn.assignWindow(new Instant(BoundedWindow.TIMESTAMP_MAX_VALUE)); + assertThat( + window.maxTimestamp(), + Matchers.greaterThan(GlobalWindow.INSTANCE.maxTimestamp())); + assertThat( + LateDataUtils.garbageCollectionTime(window, strategy), + equalTo(GlobalWindow.INSTANCE.maxTimestamp())); + } + + @Test + public void garbageCollectionTimeAfterEndOfGlobalWindowWithLateness() { + FixedWindows windowFn = FixedWindows.of(Duration.standardMinutes(5)); + Duration allowedLateness = Duration.millis(Long.MAX_VALUE); + WindowingStrategy strategy = + WindowingStrategy.globalDefault() + .withWindowFn(windowFn) + .withAllowedLateness(allowedLateness); + + IntervalWindow window = windowFn.assignWindow(new Instant(-100)); + assertThat( + window.maxTimestamp().plus(allowedLateness), + Matchers.greaterThan(GlobalWindow.INSTANCE.maxTimestamp())); + assertThat( + LateDataUtils.garbageCollectionTime(window, strategy), + equalTo(GlobalWindow.INSTANCE.maxTimestamp())); + } +} diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ReifyTimestamps.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ReifyTimestamps.java index 0b1ab25886727..990f235456f47 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ReifyTimestamps.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ReifyTimestamps.java @@ -21,6 +21,7 @@ import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.TimestampedValue; +import org.joda.time.Duration; /** * {@link PTransform PTransforms} for reifying the timestamp of values and reemitting the original @@ -63,6 +64,11 @@ public void processElement(ProcessContext context) { private static class ExtractTimestampedValueDoFn extends DoFn>, KV> { + @Override + public Duration getAllowedTimestampSkew() { + return Duration.millis(Long.MAX_VALUE); + } + @ProcessElement public void processElement(ProcessContext context) { KV> kv = context.element(); diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reshuffle.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reshuffle.java index 5394826e8551f..3b7122c656f04 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reshuffle.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reshuffle.java @@ -71,22 +71,27 @@ public PCollection> expand(PCollection> input) { .withTimestampCombiner(TimestampCombiner.EARLIEST) .withAllowedLateness(Duration.millis(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis())); - return input.apply(rewindow) + return input + .apply(rewindow) .apply("ReifyOriginalTimestamps", ReifyTimestamps.inValues()) .apply(GroupByKey.>create()) // Set the windowing strategy directly, so that it doesn't get counted as the user having // set allowed lateness. .setWindowingStrategyInternal(originalStrategy) - .apply("ExpandIterable", ParDo.of( - new DoFn>>, KV>>() { - @ProcessElement - public void processElement(ProcessContext c) { - K key = c.element().getKey(); - for (TimestampedValue value : c.element().getValue()) { - c.output(KV.of(key, value)); - } - } - })) - .apply("RestoreOriginalTimestamps", ReifyTimestamps.extractFromValues()); + .apply( + "ExpandIterable", + ParDo.of( + new DoFn>>, KV>>() { + @ProcessElement + public void processElement(ProcessContext c) { + K key = c.element().getKey(); + for (TimestampedValue value : c.element().getValue()) { + c.output(KV.of(key, value)); + } + } + })) + .apply( + "RestoreOriginalTimestamps", + ReifyTimestamps.extractFromValues()); } } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ReifyTimestampsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ReifyTimestampsTest.java index 181433eaafc18..e8728423743dd 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ReifyTimestampsTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ReifyTimestampsTest.java @@ -101,4 +101,40 @@ public void verifyTimestampsEqualValue(ProcessContext context) { pipeline.run(); } + + @Test + @Category(ValidatesRunner.class) + public void extractFromValuesWhenValueTimestampedLaterSucceeds() { + PCollection>> preified = + pipeline.apply( + Create.timestamped( + TimestampedValue.of( + KV.of("foo", TimestampedValue.of(0, new Instant((0)))), new Instant(100)), + TimestampedValue.of( + KV.of("foo", TimestampedValue.of(1, new Instant(1))), new Instant(101L)), + TimestampedValue.of( + KV.of("bar", TimestampedValue.of(2, new Instant(2))), new Instant(102L)), + TimestampedValue.of( + KV.of("baz", TimestampedValue.of(3, new Instant(3))), new Instant(103L)))); + + PCollection> timestamped = + preified.apply(ReifyTimestamps.extractFromValues()); + + PAssert.that(timestamped) + .containsInAnyOrder(KV.of("foo", 0), KV.of("foo", 1), KV.of("bar", 2), KV.of("baz", 3)); + + timestamped.apply( + "AssertElementTimestamps", + ParDo.of( + new DoFn, Void>() { + @ProcessElement + public void verifyTimestampsEqualValue(ProcessContext context) { + assertThat( + new Instant(context.element().getValue().longValue()), + equalTo(context.timestamp())); + } + })); + + pipeline.run(); + } } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ReshuffleTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ReshuffleTest.java index 1038fd63810bf..3cd7cf9cc69f1 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ReshuffleTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ReshuffleTest.java @@ -27,8 +27,11 @@ import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.VarIntCoder; +import org.apache.beam.sdk.coders.VarLongCoder; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.testing.TestStream; +import org.apache.beam.sdk.testing.UsesTestStream; import org.apache.beam.sdk.testing.ValidatesRunner; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.FixedWindows; @@ -263,4 +266,28 @@ public void testReshuffleAfterSlidingWindows() { pipeline.run(); } + + @Test + @Category({ValidatesRunner.class, UsesTestStream.class}) + public void testReshuffleWithTimestampsStreaming() { + TestStream stream = + TestStream.create(VarLongCoder.of()) + .advanceWatermarkTo(new Instant(0L).plus(Duration.standardDays(48L))) + .addElements( + TimestampedValue.of(0L, new Instant(0L)), + TimestampedValue.of(1L, new Instant(0L).plus(Duration.standardDays(48L))), + TimestampedValue.of( + 2L, BoundedWindow.TIMESTAMP_MAX_VALUE.minus(Duration.standardDays(48L)))) + .advanceWatermarkToInfinity(); + PCollection> input = + pipeline + .apply(stream).apply(WithKeys.of("")) + .apply( + Window.>into(FixedWindows.of(Duration.standardMinutes(10L)))); + + PCollection> reshuffled = input.apply(Reshuffle.of()); + PAssert.that(reshuffled.apply(Values.create())).containsInAnyOrder(0L, 1L, 2L); + + pipeline.run(); + } }