From e70fc86a7c475a3497c5c6a9a1b95c65a3fd8a5b Mon Sep 17 00:00:00 2001 From: Thomas Groh Date: Thu, 11 May 2017 09:26:30 -0700 Subject: [PATCH] Use a consistent calculation for GC Time Truncate all garbage collection timestamps to be at the end of the global window at the latest. Add a reshuffle test, which was failing when late data arrived. Update ReifyTimestamps to permit infinite skew. Elements that have timestamps extracted from them may be late, but that is not the concern of ReifyTimestamps. --- .../core/LateDataDroppingDoFnRunner.java | 2 +- .../beam/runners/core/LateDataUtils.java | 33 ++++++- .../beam/runners/core/ReduceFnRunner.java | 43 +++------ .../beam/runners/core/SimpleDoFnRunner.java | 2 +- .../beam/runners/core/StatefulDoFnRunner.java | 6 +- .../beam/runners/core/WatermarkHold.java | 9 +- .../beam/runners/core/LateDataUtilsTest.java | 90 +++++++++++++++++++ .../beam/sdk/transforms/ReifyTimestamps.java | 6 ++ .../apache/beam/sdk/transforms/Reshuffle.java | 29 +++--- .../sdk/transforms/ReifyTimestampsTest.java | 36 ++++++++ .../beam/sdk/transforms/ReshuffleTest.java | 27 ++++++ 11 files changed, 234 insertions(+), 49 deletions(-) create mode 100644 runners/core-java/src/test/java/org/apache/beam/runners/core/LateDataUtilsTest.java diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java index 66385c116ffb4..570f524636661 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java @@ -159,7 +159,7 @@ public boolean apply(WindowedValue input) { /** Is {@code window} expired w.r.t. the garbage collection watermark? */ private boolean canDropDueToExpiredWindow(BoundedWindow window) { Instant inputWM = timerInternals.currentInputWatermarkTime(); - return window.maxTimestamp().plus(windowingStrategy.getAllowedLateness()).isBefore(inputWM); + return LateDataUtils.garbageCollectionTime(window, windowingStrategy).isBefore(inputWM); } } } diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataUtils.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataUtils.java index 732e60c6c6571..8a2b7c6508b00 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataUtils.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataUtils.java @@ -24,15 +24,46 @@ import javax.annotation.Nullable; import org.apache.beam.runners.core.metrics.CounterCell; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.util.WindowTracing; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.WindowingStrategy; - +import org.joda.time.Duration; +import org.joda.time.Instant; /** * Utils to handle late data. */ public class LateDataUtils { + private LateDataUtils() {} + + /** + * Return when {@code window} should be garbage collected. If the window's expiration time is on + * or after the end of the global window, it will be truncated to the end of the global window. + */ + public static Instant garbageCollectionTime( + BoundedWindow window, WindowingStrategy windowingStrategy) { + return garbageCollectionTime(window, windowingStrategy.getAllowedLateness()); + } + + /** + * Return when {@code window} should be garbage collected. If the window's expiration time is on + * or after the end of the global window, it will be truncated to the end of the global window. + */ + public static Instant garbageCollectionTime(BoundedWindow window, Duration allowedLateness) { + + // If the end of the window + allowed lateness is beyond the "end of time" aka the end of the + // global window, then we truncate it. The conditional is phrased like it is because the + // addition of EOW + allowed lateness might even overflow the maximum allowed Instant + if (GlobalWindow.INSTANCE + .maxTimestamp() + .minus(allowedLateness) + .isBefore(window.maxTimestamp())) { + return GlobalWindow.INSTANCE.maxTimestamp(); + } else { + return window.maxTimestamp().plus(allowedLateness); + } + } /** * Returns an {@code Iterable>} that only contains non-late input elements. diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java index d2ed835ba4634..62d519f6f8e5d 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java @@ -48,7 +48,6 @@ import org.apache.beam.sdk.state.TimeDomain; import org.apache.beam.sdk.transforms.windowing.AfterWatermark; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing; import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; @@ -663,7 +662,7 @@ private class EnrichedTimerData { W window = directContext.window(); this.isEndOfWindow = TimeDomain.EVENT_TIME == timer.getDomain() && timer.getTimestamp().equals(window.maxTimestamp()); - Instant cleanupTime = garbageCollectionTime(window); + Instant cleanupTime = LateDataUtils.garbageCollectionTime(window, windowingStrategy); this.isGarbageCollection = !timer.getTimestamp().isBefore(cleanupTime); } @@ -769,9 +768,11 @@ public void onTimers(Iterable timers) throws Exception { // cleanup event and handled by the above). // Note we must do this even if the trigger is finished so that we are sure to cleanup // any final trigger finished bits. - checkState(windowingStrategy.getAllowedLateness().isLongerThan(Duration.ZERO), + checkState( + windowingStrategy.getAllowedLateness().isLongerThan(Duration.ZERO), "Unexpected zero getAllowedLateness"); - Instant cleanupTime = garbageCollectionTime(directContext.window()); + Instant cleanupTime = + LateDataUtils.garbageCollectionTime(directContext.window(), windowingStrategy); WindowTracing.debug( "ReduceFnRunner.onTimer: Scheduling cleanup timer for key:{}; window:{} at {} with " + "inputWatermark:{}; outputWatermark:{}", @@ -957,6 +958,7 @@ private Instant onTrigger( // Extract the window hold, and as a side effect clear it. final WatermarkHold.OldAndNewHolds pair = watermarkHold.extractAndRelease(renamedContext, isFinished).read(); + // TODO: This isn't accurate if the elements are late. See BEAM-2262 final Instant outputTimestamp = pair.oldHold; @Nullable Instant newHold = pair.newHold; @@ -972,11 +974,12 @@ private Instant onTrigger( if (newHold.isAfter(directContext.window().maxTimestamp())) { // The hold must be for garbage collection, which can't have happened yet. checkState( - newHold.isEqual(garbageCollectionTime(directContext.window())), - "new hold %s should be at garbage collection for window %s plus %s", - newHold, - directContext.window(), - windowingStrategy.getAllowedLateness()); + newHold.isEqual( + LateDataUtils.garbageCollectionTime(directContext.window(), windowingStrategy)), + "new hold %s should be at garbage collection for window %s plus %s", + newHold, + directContext.window(), + windowingStrategy.getAllowedLateness()); } else { // The hold must be for the end-of-window, which can't have happened yet. checkState( @@ -1042,7 +1045,7 @@ private Instant scheduleEndOfWindowOrGarbageCollectionTimer( String which; Instant timer; if (endOfWindow.isBefore(inputWM)) { - timer = garbageCollectionTime(directContext.window()); + timer = LateDataUtils.garbageCollectionTime(directContext.window(), windowingStrategy); which = "garbage collection"; } else { timer = endOfWindow; @@ -1072,28 +1075,10 @@ private void cancelEndOfWindowAndGarbageCollectionTimers( timerInternals.currentOutputWatermarkTime()); Instant eow = directContext.window().maxTimestamp(); directContext.timers().deleteTimer(eow, TimeDomain.EVENT_TIME); - Instant gc = garbageCollectionTime(directContext.window()); + Instant gc = LateDataUtils.garbageCollectionTime(directContext.window(), windowingStrategy); if (gc.isAfter(eow)) { directContext.timers().deleteTimer(gc, TimeDomain.EVENT_TIME); } } - /** - * Return when {@code window} should be garbage collected. If the window's expiration time is on - * or after the end of the global window, it will be truncated to the end of the global window. - */ - private Instant garbageCollectionTime(W window) { - - // If the end of the window + allowed lateness is beyond the "end of time" aka the end of the - // global window, then we truncate it. The conditional is phrased like it is because the - // addition of EOW + allowed lateness might even overflow the maximum allowed Instant - if (GlobalWindow.INSTANCE - .maxTimestamp() - .minus(windowingStrategy.getAllowedLateness()) - .isBefore(window.maxTimestamp())) { - return GlobalWindow.INSTANCE.maxTimestamp(); - } else { - return window.maxTimestamp().plus(windowingStrategy.getAllowedLateness()); - } - } } diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java index 85423c0431b46..7ca305e51d0a5 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java @@ -949,7 +949,7 @@ public Timer align(Duration period) { */ private Instant minTargetAndGcTime(Instant target) { if (TimeDomain.EVENT_TIME.equals(spec.getTimeDomain())) { - Instant windowExpiry = window.maxTimestamp().plus(allowedLateness); + Instant windowExpiry = LateDataUtils.garbageCollectionTime(window, allowedLateness); if (target.isAfter(windowExpiry)) { return windowExpiry; } diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java index 28a9deef87950..c68a94319e9aa 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java @@ -104,7 +104,7 @@ public void processElement(WindowedValue input) { } private boolean isLate(BoundedWindow window) { - Instant gcTime = window.maxTimestamp().plus(windowingStrategy.getAllowedLateness()); + Instant gcTime = LateDataUtils.garbageCollectionTime(window, windowingStrategy); Instant inputWM = cleanupTimer.currentInputWatermarkTime(); return gcTime.isBefore(inputWM); } @@ -208,7 +208,7 @@ public Instant currentInputWatermarkTime() { @Override public void setForWindow(BoundedWindow window) { - Instant gcTime = window.maxTimestamp().plus(windowingStrategy.getAllowedLateness()); + Instant gcTime = LateDataUtils.garbageCollectionTime(window, windowingStrategy); // make sure this fires after any window.maxTimestamp() timers gcTime = gcTime.plus(GC_DELAY_MS); timerInternals.setTimer(StateNamespaces.window(windowCoder, window), @@ -222,7 +222,7 @@ public boolean isForWindow( Instant timestamp, TimeDomain timeDomain) { boolean isEventTimer = timeDomain.equals(TimeDomain.EVENT_TIME); - Instant gcTime = window.maxTimestamp().plus(windowingStrategy.getAllowedLateness()); + Instant gcTime = LateDataUtils.garbageCollectionTime(window, windowingStrategy); gcTime = gcTime.plus(GC_DELAY_MS); return isEventTimer && GC_TIMER_ID.equals(timerId) && gcTime.equals(timestamp); } diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java index 64f5d9b165cc0..13e4c43d66a67 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java @@ -365,8 +365,7 @@ private Instant addGarbageCollectionHold( ReduceFn.Context context, boolean paneIsEmpty) { Instant outputWM = timerInternals.currentOutputWatermarkTime(); Instant inputWM = timerInternals.currentInputWatermarkTime(); - Instant eow = context.window().maxTimestamp(); - Instant gcHold = eow.plus(windowingStrategy.getAllowedLateness()); + Instant gcHold = LateDataUtils.garbageCollectionTime(context.window(), windowingStrategy); if (!windowingStrategy.getAllowedLateness().isLongerThan(Duration.ZERO)) { WindowTracing.trace( @@ -387,6 +386,12 @@ private Instant addGarbageCollectionHold( return null; } + if (!gcHold.isBefore(BoundedWindow.TIMESTAMP_MAX_VALUE)) { + // If the garbage collection hold is past the timestamp we can represent, instead truncate + // to the maximum timestamp that is not positive infinity. This ensures all windows will + // eventually be garbage collected. + gcHold = BoundedWindow.TIMESTAMP_MAX_VALUE.minus(Duration.millis(1L)); + } checkState(!gcHold.isBefore(inputWM), "Garbage collection hold %s cannot be before input watermark %s", gcHold, inputWM); diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/LateDataUtilsTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/LateDataUtilsTest.java new file mode 100644 index 0000000000000..f0f315d39328f --- /dev/null +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/LateDataUtilsTest.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.core; + +import static org.hamcrest.Matchers.equalTo; +import static org.junit.Assert.assertThat; + +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.FixedWindows; +import org.apache.beam.sdk.transforms.windowing.GlobalWindow; +import org.apache.beam.sdk.transforms.windowing.IntervalWindow; +import org.apache.beam.sdk.values.WindowingStrategy; +import org.hamcrest.Matchers; +import org.joda.time.Duration; +import org.joda.time.Instant; +import org.joda.time.ReadableInstant; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Tests for {@link LateDataUtils}. + */ +@RunWith(JUnit4.class) +public class LateDataUtilsTest { + @Test + public void beforeEndOfGlobalWindowSame() { + FixedWindows windowFn = FixedWindows.of(Duration.standardMinutes(5)); + Duration allowedLateness = Duration.standardMinutes(2); + WindowingStrategy strategy = + WindowingStrategy.globalDefault() + .withWindowFn(windowFn) + .withAllowedLateness(allowedLateness); + + IntervalWindow window = windowFn.assignWindow(new Instant(10)); + assertThat( + LateDataUtils.garbageCollectionTime(window, strategy), + equalTo(window.maxTimestamp().plus(allowedLateness))); + } + + @Test + public void garbageCollectionTimeAfterEndOfGlobalWindow() { + FixedWindows windowFn = FixedWindows.of(Duration.standardMinutes(5)); + WindowingStrategy strategy = + WindowingStrategy.globalDefault() + .withWindowFn(windowFn); + + IntervalWindow window = windowFn.assignWindow(new Instant(BoundedWindow.TIMESTAMP_MAX_VALUE)); + assertThat( + window.maxTimestamp(), + Matchers.greaterThan(GlobalWindow.INSTANCE.maxTimestamp())); + assertThat( + LateDataUtils.garbageCollectionTime(window, strategy), + equalTo(GlobalWindow.INSTANCE.maxTimestamp())); + } + + @Test + public void garbageCollectionTimeAfterEndOfGlobalWindowWithLateness() { + FixedWindows windowFn = FixedWindows.of(Duration.standardMinutes(5)); + Duration allowedLateness = Duration.millis(Long.MAX_VALUE); + WindowingStrategy strategy = + WindowingStrategy.globalDefault() + .withWindowFn(windowFn) + .withAllowedLateness(allowedLateness); + + IntervalWindow window = windowFn.assignWindow(new Instant(-100)); + assertThat( + window.maxTimestamp().plus(allowedLateness), + Matchers.greaterThan(GlobalWindow.INSTANCE.maxTimestamp())); + assertThat( + LateDataUtils.garbageCollectionTime(window, strategy), + equalTo(GlobalWindow.INSTANCE.maxTimestamp())); + } +} diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ReifyTimestamps.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ReifyTimestamps.java index 0b1ab25886727..990f235456f47 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ReifyTimestamps.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ReifyTimestamps.java @@ -21,6 +21,7 @@ import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.TimestampedValue; +import org.joda.time.Duration; /** * {@link PTransform PTransforms} for reifying the timestamp of values and reemitting the original @@ -63,6 +64,11 @@ public void processElement(ProcessContext context) { private static class ExtractTimestampedValueDoFn extends DoFn>, KV> { + @Override + public Duration getAllowedTimestampSkew() { + return Duration.millis(Long.MAX_VALUE); + } + @ProcessElement public void processElement(ProcessContext context) { KV> kv = context.element(); diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reshuffle.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reshuffle.java index 5394826e8551f..3b7122c656f04 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reshuffle.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reshuffle.java @@ -71,22 +71,27 @@ public PCollection> expand(PCollection> input) { .withTimestampCombiner(TimestampCombiner.EARLIEST) .withAllowedLateness(Duration.millis(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis())); - return input.apply(rewindow) + return input + .apply(rewindow) .apply("ReifyOriginalTimestamps", ReifyTimestamps.inValues()) .apply(GroupByKey.>create()) // Set the windowing strategy directly, so that it doesn't get counted as the user having // set allowed lateness. .setWindowingStrategyInternal(originalStrategy) - .apply("ExpandIterable", ParDo.of( - new DoFn>>, KV>>() { - @ProcessElement - public void processElement(ProcessContext c) { - K key = c.element().getKey(); - for (TimestampedValue value : c.element().getValue()) { - c.output(KV.of(key, value)); - } - } - })) - .apply("RestoreOriginalTimestamps", ReifyTimestamps.extractFromValues()); + .apply( + "ExpandIterable", + ParDo.of( + new DoFn>>, KV>>() { + @ProcessElement + public void processElement(ProcessContext c) { + K key = c.element().getKey(); + for (TimestampedValue value : c.element().getValue()) { + c.output(KV.of(key, value)); + } + } + })) + .apply( + "RestoreOriginalTimestamps", + ReifyTimestamps.extractFromValues()); } } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ReifyTimestampsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ReifyTimestampsTest.java index 181433eaafc18..e8728423743dd 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ReifyTimestampsTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ReifyTimestampsTest.java @@ -101,4 +101,40 @@ public void verifyTimestampsEqualValue(ProcessContext context) { pipeline.run(); } + + @Test + @Category(ValidatesRunner.class) + public void extractFromValuesWhenValueTimestampedLaterSucceeds() { + PCollection>> preified = + pipeline.apply( + Create.timestamped( + TimestampedValue.of( + KV.of("foo", TimestampedValue.of(0, new Instant((0)))), new Instant(100)), + TimestampedValue.of( + KV.of("foo", TimestampedValue.of(1, new Instant(1))), new Instant(101L)), + TimestampedValue.of( + KV.of("bar", TimestampedValue.of(2, new Instant(2))), new Instant(102L)), + TimestampedValue.of( + KV.of("baz", TimestampedValue.of(3, new Instant(3))), new Instant(103L)))); + + PCollection> timestamped = + preified.apply(ReifyTimestamps.extractFromValues()); + + PAssert.that(timestamped) + .containsInAnyOrder(KV.of("foo", 0), KV.of("foo", 1), KV.of("bar", 2), KV.of("baz", 3)); + + timestamped.apply( + "AssertElementTimestamps", + ParDo.of( + new DoFn, Void>() { + @ProcessElement + public void verifyTimestampsEqualValue(ProcessContext context) { + assertThat( + new Instant(context.element().getValue().longValue()), + equalTo(context.timestamp())); + } + })); + + pipeline.run(); + } } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ReshuffleTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ReshuffleTest.java index 1038fd63810bf..3cd7cf9cc69f1 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ReshuffleTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ReshuffleTest.java @@ -27,8 +27,11 @@ import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.VarIntCoder; +import org.apache.beam.sdk.coders.VarLongCoder; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.testing.TestStream; +import org.apache.beam.sdk.testing.UsesTestStream; import org.apache.beam.sdk.testing.ValidatesRunner; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.FixedWindows; @@ -263,4 +266,28 @@ public void testReshuffleAfterSlidingWindows() { pipeline.run(); } + + @Test + @Category({ValidatesRunner.class, UsesTestStream.class}) + public void testReshuffleWithTimestampsStreaming() { + TestStream stream = + TestStream.create(VarLongCoder.of()) + .advanceWatermarkTo(new Instant(0L).plus(Duration.standardDays(48L))) + .addElements( + TimestampedValue.of(0L, new Instant(0L)), + TimestampedValue.of(1L, new Instant(0L).plus(Duration.standardDays(48L))), + TimestampedValue.of( + 2L, BoundedWindow.TIMESTAMP_MAX_VALUE.minus(Duration.standardDays(48L)))) + .advanceWatermarkToInfinity(); + PCollection> input = + pipeline + .apply(stream).apply(WithKeys.of("")) + .apply( + Window.>into(FixedWindows.of(Duration.standardMinutes(10L)))); + + PCollection> reshuffled = input.apply(Reshuffle.of()); + PAssert.that(reshuffled.apply(Values.create())).containsInAnyOrder(0L, 1L, 2L); + + pipeline.run(); + } }