Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ private void processElementOrdered(BoundedWindow window, WindowedValue<InputT> v
if (value.getTimestamp().isBefore(minStamp)) {
minStamp = value.getTimestamp();
minStampState.write(minStamp);
setupFlushTimerAndWatermarkHold(namespace, minStamp);
setupFlushTimerAndWatermarkHold(namespace, window, minStamp);
}
} else {
reportDroppedElement(value, window);
Expand Down Expand Up @@ -206,6 +206,9 @@ public void onTimer(
if (timerId.equals(SORT_FLUSH_TIMER)) {
onSortFlushTimer(window, stepContext.timerInternals().currentInputWatermarkTime());
} else if (cleanupTimer.isForWindow(timerId, window, timestamp, timeDomain)) {
if (requiresTimeSortedInput) {
onSortFlushTimer(window, BoundedWindow.TIMESTAMP_MAX_VALUE);
}
stateCleaner.clearForWindow(window);
// There should invoke the onWindowExpiration of DoFn
} else {
Expand Down Expand Up @@ -252,18 +255,36 @@ private void onSortFlushTimer(BoundedWindow window, Instant timestamp) {
keep.forEach(sortBuffer::add);
minStampState.write(newMinStamp);
if (newMinStamp.isBefore(BoundedWindow.TIMESTAMP_MAX_VALUE)) {
setupFlushTimerAndWatermarkHold(namespace, newMinStamp);
setupFlushTimerAndWatermarkHold(namespace, window, newMinStamp);
} else {
clearWatermarkHold(namespace);
}
}

private void setupFlushTimerAndWatermarkHold(StateNamespace namespace, Instant flush) {
/**
* Setup timer for flush time @{code flush}. The time is adjusted to respect allowed lateness and
* window garbage collection time. Setup watermark hold for the flush time.
*
* <p>Note that this is equivalent to {@link org.apache.beam.sdk.state.Timer#withOutputTimestamp}
* and should be reworked to use that feature once that is stable.
*/
private void setupFlushTimerAndWatermarkHold(
StateNamespace namespace, BoundedWindow window, Instant flush) {
Instant flushWithLateness = flush.plus(windowingStrategy.getAllowedLateness());
Instant windowGcTime = window.maxTimestamp().plus(windowingStrategy.getAllowedLateness());
if (flushWithLateness.isAfter(windowGcTime)) {
flushWithLateness = windowGcTime;
}
WatermarkHoldState watermark = stepContext.stateInternals().state(namespace, watermarkHold);
stepContext
.timerInternals()
.setTimer(
namespace, SORT_FLUSH_TIMER, SORT_FLUSH_TIMER, flush, flush, TimeDomain.EVENT_TIME);
namespace,
SORT_FLUSH_TIMER,
SORT_FLUSH_TIMER,
flushWithLateness,
flush,
TimeDomain.EVENT_TIME);
watermark.clear();
watermark.add(flush);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ private void testGarbageCollect(boolean ordered) throws Exception {

if (ordered) {
// move forward in time so that the input might get flushed
advanceInputWatermark(timerInternals, elementTime.plus(1), runner);
advanceInputWatermark(timerInternals, elementTime.plus(ALLOWED_LATENESS + 1), runner);
}

assertEquals(1, (int) stateInternals.state(windowNamespace(WINDOW_1), stateTag).read());
Expand All @@ -194,8 +194,9 @@ private void testGarbageCollect(boolean ordered) throws Exception {
KV.of("hello", 1), elementTime.plus(WINDOW_SIZE), WINDOW_2, PaneInfo.NO_FIRING));

if (ordered) {
// move forward in time to so that the input might get flushed
advanceInputWatermark(timerInternals, elementTime.plus(1 + WINDOW_SIZE), runner);
// move forward in time so that the input might get flushed
advanceInputWatermark(
timerInternals, elementTime.plus(ALLOWED_LATENESS + 1 + WINDOW_SIZE), runner);
}

assertEquals(2, (int) stateInternals.state(windowNamespace(WINDOW_2), stateTag).read());
Expand All @@ -204,13 +205,7 @@ private void testGarbageCollect(boolean ordered) throws Exception {
// the cleanup timer is set to window.maxTimestamp() + allowed lateness + 1
// to ensure that state is still available when a user timer for window.maxTimestamp() fires
advanceInputWatermark(
timerInternals,
WINDOW_1
.maxTimestamp()
.plus(ALLOWED_LATENESS)
.plus(StatefulDoFnRunner.TimeInternalsCleanupTimer.GC_DELAY_MS)
.plus(1), // so the watermark is past the GC horizon, not on it
runner);
timerInternals, elementTime.plus(ALLOWED_LATENESS + 1 + WINDOW_SIZE), runner);

assertTrue(
stateInternals.isEmptyForTesting(
Expand Down Expand Up @@ -260,8 +255,8 @@ private void testOutput(
WindowedValue.of(KV.of("hello", 2), elementTime.minus(1), WINDOW_1, PaneInfo.NO_FIRING));

if (ordered) {
// move forward in time to so that the input might get flushed
advanceInputWatermark(timerInternals, elementTime.plus(1), runner);
// move forward in time so that the input might get flushed
advanceInputWatermark(timerInternals, elementTime.plus(ALLOWED_LATENESS + 1), runner);
}

assertEquals(3, (int) stateInternals.state(windowNamespace(WINDOW_1), stateTag).read());
Expand Down Expand Up @@ -297,8 +292,8 @@ private void testOutput(
WindowedValue.of(KV.of("hello", 3), elementTime.minus(2), WINDOW_2, PaneInfo.NO_FIRING));

if (ordered) {
// move forward in time to so that the input might get flushed
advanceInputWatermark(timerInternals, elementTime.plus(1), runner);
// move forward in time so that the input might get flushed
advanceInputWatermark(timerInternals, elementTime.plus(ALLOWED_LATENESS + 1), runner);
}

assertEquals(6, (int) stateInternals.state(windowNamespace(WINDOW_2), stateTag).read());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,4 @@
* Category tag for validation tests which utilize{@link DoFn.RequiresTimeSortedInput} in stateful
* {@link ParDo}.
*/
public @interface UsesRequiresTimeSortedInput {}
public interface UsesRequiresTimeSortedInput extends UsesTimersInParDo {}
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
Expand Down Expand Up @@ -2409,6 +2410,39 @@ public void testRequiresTimeSortedInputWithTestStream() {
testTimeSortedInput(numElements, pipeline.apply(stream.advanceWatermarkToInfinity()));
}

@Test
@Category({
ValidatesRunner.class,
UsesStatefulParDo.class,
UsesRequiresTimeSortedInput.class,
UsesStrictTimerOrdering.class,
UsesTestStream.class
})
public void testRequiresTimeSortedInputWithLateDataAndAllowedLateness() {
// generate list long enough to rule out random shuffle in sorted order
int numElements = 1000;
List<Long> eventStamps =
LongStream.range(0, numElements)
.mapToObj(i -> numElements - i)
.collect(Collectors.toList());
TestStream.Builder<Long> input = TestStream.create(VarLongCoder.of());
for (Long stamp : eventStamps) {
input = input.addElements(TimestampedValue.of(stamp, Instant.ofEpochMilli(stamp)));
if (stamp == 100) {
// advance watermark when we have 100 remaining elements
// all the rest are going to be late elements
input = input.advanceWatermarkTo(Instant.ofEpochMilli(stamp));
}
}
testTimeSortedInput(
numElements,
pipeline
.apply(input.advanceWatermarkToInfinity())
.apply(
Window.<Long>into(new GlobalWindows())
.withAllowedLateness(Duration.millis(5000))));
}

@Test
@Category({
ValidatesRunner.class,
Expand Down