From 280a6a8f729cb382616ad65f71860b61277cbd6f Mon Sep 17 00:00:00 2001 From: Kenneth Knowles Date: Mon, 19 Dec 2016 20:40:11 -0800 Subject: [PATCH 1/6] Add informative Instant formatter to BoundedWindow --- .../beam/sdk/transforms/windowing/BoundedWindow.java | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/BoundedWindow.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/BoundedWindow.java index 3654074174c17..6da249581339f 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/BoundedWindow.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/BoundedWindow.java @@ -39,6 +39,18 @@ public abstract class BoundedWindow { public static final Instant TIMESTAMP_MAX_VALUE = new Instant(TimeUnit.MICROSECONDS.toMillis(Long.MAX_VALUE)); + public static String formatTimestamp(Instant timestamp) { + if (timestamp.equals(TIMESTAMP_MIN_VALUE)) { + return timestamp.toString() + " (TIMESTAMP_MIN_VALUE)"; + } else if (timestamp.equals(TIMESTAMP_MAX_VALUE)) { + return timestamp.toString() + " (TIMESTAMP_MAX_VALUE)"; + } else if (timestamp.equals(GlobalWindow.INSTANCE.maxTimestamp())) { + return timestamp.toString() + " (end of global window)"; + } else { + return timestamp.toString(); + } + } + /** * Returns the inclusive upper bound of timestamps for values in this window. 
*/ From fa4958a6140eb00ceee08b2468f7d88f17538794 Mon Sep 17 00:00:00 2001 From: Kenneth Knowles Date: Mon, 19 Dec 2016 20:40:47 -0800 Subject: [PATCH 2/6] Use informative Instant formatter in WatermarkHold --- .../beam/runners/core/WatermarkHold.java | 4 +++- .../transforms/windowing/BoundedWindow.java | 19 +++++++++++++++++++ 2 files changed, 22 insertions(+), 1 deletion(-) diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java index 7f1afcc0a79cc..5e5f44d9138eb 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java @@ -207,7 +207,9 @@ private Instant shift(Instant timestamp, W window) { Instant shifted = windowingStrategy.getOutputTimeFn().assignOutputTime(timestamp, window); checkState(!shifted.isBefore(timestamp), "OutputTimeFn moved element from %s to earlier time %s for window %s", - timestamp, shifted, window); + BoundedWindow.formatTimestamp(timestamp), + BoundedWindow.formatTimestamp(shifted), + window); checkState(timestamp.isAfter(window.maxTimestamp()) || !shifted.isAfter(window.maxTimestamp()), "OutputTimeFn moved element from %s to %s which is beyond end of " diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/BoundedWindow.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/BoundedWindow.java index 6da249581339f..74223b586715f 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/BoundedWindow.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/BoundedWindow.java @@ -34,11 +34,30 @@ public abstract class BoundedWindow { // The min and max timestamps that won't overflow when they are converted to // usec. + + /** + * The minimum value for any Beam timestamp. Often referred to as "-infinity". + * + *

This value and {@link #TIMESTAMP_MAX_VALUE} are chosen so that their + * microseconds-since-epoch can be safely represented with a {@code long}. + */ public static final Instant TIMESTAMP_MIN_VALUE = new Instant(TimeUnit.MICROSECONDS.toMillis(Long.MIN_VALUE)); + + /** + * The maximum value for any Beam timestamp. Often referred to as "+infinity". + * + *

This value and {@link #TIMESTAMP_MIN_VALUE} are chosen so that their + * microseconds-since-epoch can be safely represented with a {@code long}. + */ public static final Instant TIMESTAMP_MAX_VALUE = new Instant(TimeUnit.MICROSECONDS.toMillis(Long.MAX_VALUE)); + /** + * Formats an {@link Instant} timestamp with additional Beam-specific metadata, such as indicating + * whether the timestamp is the end of the global window or one of the distinguished values {@link + * #TIMESTAMP_MIN_VALUE} or {@link #TIMESTAMP_MAX_VALUE}. + */ public static String formatTimestamp(Instant timestamp) { if (timestamp.equals(TIMESTAMP_MIN_VALUE)) { return timestamp.toString() + " (TIMESTAMP_MIN_VALUE)"; From 8188040d930b1fa49efd4ed7d5f821d05d6f28ef Mon Sep 17 00:00:00 2001 From: Kenneth Knowles Date: Tue, 20 Dec 2016 13:57:55 -0800 Subject: [PATCH 3/6] Add static Window.withOutputTimeFn to match build method --- .../org/apache/beam/sdk/transforms/windowing/Window.java | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java index 0c430d0ddc50d..1241abe57c909 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java @@ -222,6 +222,15 @@ public static Bound withAllowedLateness(Duration allowedLateness) { return new Bound(null).withAllowedLateness(allowedLateness); } + /** + * (Experimental) Override the default {@link OutputTimeFn}, to control + * the output timestamp of values output from a {@link GroupByKey} operation.
+ */ + @Experimental(Kind.OUTPUT_TIME) + public static Bound withOutputTimeFn(OutputTimeFn outputTimeFn) { + return new Bound(null).withOutputTimeFn(outputTimeFn); + } + /** * A {@code PTransform} that windows the elements of a {@code PCollection}, * into finite windows according to a user-specified {@code WindowFn}. From 4d71924ccda9dae97c7cc9535a9780df9457cc3f Mon Sep 17 00:00:00 2001 From: Kenneth Knowles Date: Tue, 20 Dec 2016 14:20:07 -0800 Subject: [PATCH 4/6] Add UsesTestStream for use with JUnit @Category --- .../beam/sdk/testing/UsesTestStream.java | 24 +++++++++++++++++++ .../beam/sdk/testing/TestStreamTest.java | 12 +++++----- 2 files changed, 30 insertions(+), 6 deletions(-) create mode 100644 sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesTestStream.java diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesTestStream.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesTestStream.java new file mode 100644 index 0000000000000..8debb465a6036 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesTestStream.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.testing; + +/** + * Category tag for tests that use {@link TestStream}, which is not a part of the Beam model + * but a special feature currently only implemented by the direct runner. + */ +public interface UsesTestStream {} diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestStreamTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestStreamTest.java index 64aeca3cfe213..c12e9f3324992 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestStreamTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestStreamTest.java @@ -69,7 +69,7 @@ public class TestStreamTest implements Serializable { @Rule public transient ExpectedException thrown = ExpectedException.none(); @Test - @Category(NeedsRunner.class) + @Category({NeedsRunner.class, UsesTestStream.class}) public void testLateDataAccumulating() { Instant instant = new Instant(0); TestStream source = TestStream.create(VarIntCoder.of()) @@ -136,7 +136,7 @@ public Void apply(Iterable input) { } @Test - @Category(NeedsRunner.class) + @Category({NeedsRunner.class, UsesTestStream.class}) public void testProcessingTimeTrigger() { TestStream source = TestStream.create(VarLongCoder.of()) .addElements(TimestampedValue.of(1L, new Instant(1000L)), @@ -159,7 +159,7 @@ public void testProcessingTimeTrigger() { } @Test - @Category(NeedsRunner.class) + @Category({NeedsRunner.class, UsesTestStream.class}) public void testDiscardingMode() { TestStream stream = TestStream.create(StringUtf8Coder.of()) @@ -208,7 +208,7 @@ public void testDiscardingMode() { } @Test - @Category(NeedsRunner.class) + @Category({NeedsRunner.class, UsesTestStream.class}) public void testFirstElementLate() { Instant lateElementTimestamp = new Instant(-1_000_000); TestStream stream = @@ -238,7 +238,7 @@ public void testFirstElementLate() { } @Test - @Category(NeedsRunner.class) + @Category({NeedsRunner.class, UsesTestStream.class}) public void 
testElementsAtAlmostPositiveInfinity() { Instant endOfGlobalWindow = GlobalWindow.INSTANCE.maxTimestamp(); TestStream stream = TestStream.create(StringUtf8Coder.of()) @@ -261,7 +261,7 @@ public void testElementsAtAlmostPositiveInfinity() { } @Test - @Category(NeedsRunner.class) + @Category({NeedsRunner.class, UsesTestStream.class}) public void testMultipleStreams() { TestStream stream = TestStream.create(StringUtf8Coder.of()) .addElements("foo", "bar") From 7f14c463acd2ae5b86ac81a9528ac4aa7dff765f Mon Sep 17 00:00:00 2001 From: Kenneth Knowles Date: Wed, 7 Dec 2016 20:18:44 -0800 Subject: [PATCH 5/6] Allow setting timer by ID in DirectTimerInternals --- .../runners/direct/DirectTimerInternals.java | 2 +- .../beam/runners/direct/WatermarkManager.java | 25 +++++++++++++++++++ 2 files changed, 26 insertions(+), 1 deletion(-) diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectTimerInternals.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectTimerInternals.java index 5ca276de56c0f..80e0721582742 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectTimerInternals.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectTimerInternals.java @@ -49,7 +49,7 @@ private DirectTimerInternals( @Override public void setTimer(StateNamespace namespace, String timerId, Instant target, TimeDomain timeDomain) { - throw new UnsupportedOperationException("Setting timer by ID not yet supported."); + timerUpdateBuilder.setTimer(TimerData.of(timerId, namespace, target, timeDomain)); } @Deprecated diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java index 7bed75107f4ac..f7bafd16ca55f 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java +++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java @@ -23,11 +23,13 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.MoreObjects; import com.google.common.collect.ComparisonChain; +import com.google.common.collect.HashBasedTable; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; import com.google.common.collect.Ordering; import com.google.common.collect.SortedMultiset; +import com.google.common.collect.Table; import com.google.common.collect.TreeMultiset; import java.io.Serializable; import java.util.ArrayList; @@ -56,6 +58,7 @@ import org.apache.beam.sdk.util.TimeDomain; import org.apache.beam.sdk.util.TimerInternals; import org.apache.beam.sdk.util.TimerInternals.TimerData; +import org.apache.beam.sdk.util.state.StateNamespace; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.TaggedPValue; import org.joda.time.Instant; @@ -210,6 +213,10 @@ private static class AppliedPTransformInputWatermark implements Watermark { private final SortedMultiset> pendingElements; private final Map, NavigableSet> objectTimers; + // Entries in this table represent the authoritative timestamp for which + // a per-key-and-StateNamespace timer is set. 
+ private final Map, Table> existingTimers; + private AtomicReference currentWatermark; public AppliedPTransformInputWatermark(Collection inputWatermarks) { @@ -222,6 +229,7 @@ public AppliedPTransformInputWatermark(Collection inputWate this.pendingElements = TreeMultiset.create(pendingBundleComparator); this.objectTimers = new HashMap<>(); + this.existingTimers = new HashMap<>(); currentWatermark = new AtomicReference<>(BoundedWindow.TIMESTAMP_MIN_VALUE); } @@ -276,14 +284,31 @@ private synchronized void updateTimers(TimerUpdate update) { keyTimers = new TreeSet<>(); objectTimers.put(update.key, keyTimers); } + Table existingTimersForKey = + existingTimers.get(update.key); + if (existingTimersForKey == null) { + existingTimersForKey = HashBasedTable.create(); + existingTimers.put(update.key, existingTimersForKey); + } + for (TimerData timer : update.setTimers) { if (TimeDomain.EVENT_TIME.equals(timer.getDomain())) { + @Nullable + TimerData existingTimer = + existingTimersForKey.get(timer.getNamespace(), timer.getTimerId()); + + if (existingTimer != null) { + keyTimers.remove(existingTimer); + } keyTimers.add(timer); + existingTimersForKey.put(timer.getNamespace(), timer.getTimerId(), timer); } } + for (TimerData timer : update.deletedTimers) { if (TimeDomain.EVENT_TIME.equals(timer.getDomain())) { keyTimers.remove(timer); + existingTimersForKey.remove(timer.getNamespace(), timer.getTimerId()); } } // We don't keep references to timers that have been fired and delivered via #getFiredTimers() From dfe2e62d103595583e3ca4594cc03885fe1bba16 Mon Sep 17 00:00:00 2001 From: Kenneth Knowles Date: Tue, 20 Dec 2016 13:37:40 -0800 Subject: [PATCH 6/6] Hold output watermark according to pending timers --- .../beam/runners/direct/WatermarkManager.java | 59 +++++++++++++++---- 1 file changed, 48 insertions(+), 11 deletions(-) diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java index f7bafd16ca55f..248fafdb64938 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java @@ -211,12 +211,18 @@ public static WatermarkUpdate fromTimestamps(Instant oldTime, Instant currentTim private static class AppliedPTransformInputWatermark implements Watermark { private final Collection inputWatermarks; private final SortedMultiset> pendingElements; - private final Map, NavigableSet> objectTimers; + + // This tracks only the quantity of timers at each timestamp, for quickly getting the cross-key + // minimum + private final SortedMultiset pendingTimers; // Entries in this table represent the authoritative timestamp for which // a per-key-and-StateNamespace timer is set. private final Map, Table> existingTimers; + // This per-key sorted set allows quick retrieval of timers that should fire for a key + private final Map, NavigableSet> objectTimers; + private AtomicReference currentWatermark; public AppliedPTransformInputWatermark(Collection inputWatermarks) { @@ -224,10 +230,13 @@ public AppliedPTransformInputWatermark(Collection inputWate // The ordering must order elements by timestamp, and must not compare two distinct elements // as equal. This is built on the assumption that any element added as a pending element will // be consumed without modifications. 
+ // + // The same logic is applied for pending timers Ordering> pendingBundleComparator = new BundleByElementTimestampComparator().compound(Ordering.arbitrary()); this.pendingElements = TreeMultiset.create(pendingBundleComparator); + this.pendingTimers = TreeMultiset.create(); this.objectTimers = new HashMap<>(); this.existingTimers = new HashMap<>(); currentWatermark = new AtomicReference<>(BoundedWindow.TIMESTAMP_MIN_VALUE); @@ -278,6 +287,14 @@ private synchronized void removePending(CommittedBundle completed) { pendingElements.remove(completed); } + private synchronized Instant getEarliestTimerTimestamp() { + if (pendingTimers.isEmpty()) { + return BoundedWindow.TIMESTAMP_MAX_VALUE; + } else { + return pendingTimers.firstEntry().getElement(); + } + } + private synchronized void updateTimers(TimerUpdate update) { NavigableSet keyTimers = objectTimers.get(update.key); if (keyTimers == null) { @@ -291,27 +308,43 @@ private synchronized void updateTimers(TimerUpdate update) { existingTimers.put(update.key, existingTimersForKey); } - for (TimerData timer : update.setTimers) { + for (TimerData timer : update.getSetTimers()) { + if (TimeDomain.EVENT_TIME.equals(timer.getDomain())) { + @Nullable + TimerData existingTimer = + existingTimersForKey.get(timer.getNamespace(), timer.getTimerId()); + + if (existingTimer == null) { + pendingTimers.add(timer.getTimestamp()); + keyTimers.add(timer); + } else if (!existingTimer.equals(timer)) { + keyTimers.remove(existingTimer); + keyTimers.add(timer); + } // else the timer is already set identically, so noop + + existingTimersForKey.put(timer.getNamespace(), timer.getTimerId(), timer); + } + } + + for (TimerData timer : update.getDeletedTimers()) { if (TimeDomain.EVENT_TIME.equals(timer.getDomain())) { @Nullable TimerData existingTimer = existingTimersForKey.get(timer.getNamespace(), timer.getTimerId()); if (existingTimer != null) { + pendingTimers.remove(existingTimer.getTimestamp()); keyTimers.remove(existingTimer); + 
existingTimersForKey.remove(existingTimer.getNamespace(), existingTimer.getTimerId()); } - keyTimers.add(timer); - existingTimersForKey.put(timer.getNamespace(), timer.getTimerId(), timer); } } - for (TimerData timer : update.deletedTimers) { + for (TimerData timer : update.getCompletedTimers()) { if (TimeDomain.EVENT_TIME.equals(timer.getDomain())) { - keyTimers.remove(timer); - existingTimersForKey.remove(timer.getNamespace(), timer.getTimerId()); + pendingTimers.remove(timer.getTimestamp()); } } - // We don't keep references to timers that have been fired and delivered via #getFiredTimers() } private synchronized Map, List> extractFiredEventTimeTimers() { @@ -336,11 +369,12 @@ public synchronized String toString() { * {@link #refresh()} for more information. */ private static class AppliedPTransformOutputWatermark implements Watermark { - private final Watermark inputWatermark; + private final AppliedPTransformInputWatermark inputWatermark; private final PerKeyHolds holds; private AtomicReference currentWatermark; - public AppliedPTransformOutputWatermark(AppliedPTransformInputWatermark inputWatermark) { + public AppliedPTransformOutputWatermark( + AppliedPTransformInputWatermark inputWatermark) { this.inputWatermark = inputWatermark; holds = new PerKeyHolds(); currentWatermark = new AtomicReference<>(BoundedWindow.TIMESTAMP_MIN_VALUE); @@ -377,7 +411,10 @@ public Instant get() { @Override public synchronized WatermarkUpdate refresh() { Instant oldWatermark = currentWatermark.get(); - Instant newWatermark = INSTANT_ORDERING.min(inputWatermark.get(), holds.getMinHold()); + Instant newWatermark = INSTANT_ORDERING.min( + inputWatermark.get(), + inputWatermark.getEarliestTimerTimestamp(), + holds.getMinHold()); newWatermark = INSTANT_ORDERING.max(oldWatermark, newWatermark); currentWatermark.set(newWatermark); return WatermarkUpdate.fromTimestamps(oldWatermark, newWatermark);