Skip to content

Commit

Permalink
This closes #1669: Preliminaries for timers in the direct runner
Browse files Browse the repository at this point in the history
  Hold output watermark according to pending timers
  Allow setting timer by ID in DirectTimerInternals
  Add UsesTestStream for use with JUnit @Category
  Add static Window.withOutputTimeFn to match build method
  Use informative Instant formatter in WatermarkHold
  Add informative Instant formatter to BoundedWindow
  • Loading branch information
kennknowles committed Dec 21, 2016
2 parents ff39516 + dfe2e62 commit 57d9bbd
Show file tree
Hide file tree
Showing 7 changed files with 144 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,9 @@ private Instant shift(Instant timestamp, W window) {
Instant shifted = windowingStrategy.getOutputTimeFn().assignOutputTime(timestamp, window);
checkState(!shifted.isBefore(timestamp),
"OutputTimeFn moved element from %s to earlier time %s for window %s",
timestamp, shifted, window);
BoundedWindow.formatTimestamp(timestamp),
BoundedWindow.formatTimestamp(shifted),
window);
checkState(timestamp.isAfter(window.maxTimestamp())
|| !shifted.isAfter(window.maxTimestamp()),
"OutputTimeFn moved element from %s to %s which is beyond end of "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ private DirectTimerInternals(
/**
 * Records a request to set the timer identified by {@code timerId} within {@code namespace}.
 *
 * <p>The request is packaged as a {@link TimerData} and handed to {@code timerUpdateBuilder};
 * nothing else happens in this method.
 *
 * @param namespace the state namespace the timer belongs to
 * @param timerId the identifier of the timer within {@code namespace}
 * @param target the instant the timer is set for
 * @param timeDomain the time domain of the timer
 */
@Override
public void setTimer(StateNamespace namespace, String timerId, Instant target,
    TimeDomain timeDomain) {
  // The former unconditional UnsupportedOperationException is gone: it made the statement
  // below unreachable, and setting a timer by ID is supported by delegating to the builder.
  timerUpdateBuilder.setTimer(TimerData.of(timerId, namespace, target, timeDomain));
}

@Deprecated
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,13 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.MoreObjects;
import com.google.common.collect.ComparisonChain;
import com.google.common.collect.HashBasedTable;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Ordering;
import com.google.common.collect.SortedMultiset;
import com.google.common.collect.Table;
import com.google.common.collect.TreeMultiset;
import java.io.Serializable;
import java.util.ArrayList;
Expand Down Expand Up @@ -56,6 +58,7 @@
import org.apache.beam.sdk.util.TimeDomain;
import org.apache.beam.sdk.util.TimerInternals;
import org.apache.beam.sdk.util.TimerInternals.TimerData;
import org.apache.beam.sdk.util.state.StateNamespace;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TaggedPValue;
import org.joda.time.Instant;
Expand Down Expand Up @@ -208,6 +211,16 @@ public static WatermarkUpdate fromTimestamps(Instant oldTime, Instant currentTim
private static class AppliedPTransformInputWatermark implements Watermark {
private final Collection<? extends Watermark> inputWatermarks;
private final SortedMultiset<CommittedBundle<?>> pendingElements;

// This tracks only the quantity of timers at each timestamp, for quickly getting the cross-key
// minimum
private final SortedMultiset<Instant> pendingTimers;

// Entries in this table represent the authoritative timestamp for which
// a per-key-and-StateNamespace timer is set.
private final Map<StructuralKey<?>, Table<StateNamespace, String, TimerData>> existingTimers;

// This per-key sorted set allows quick retrieval of timers that should fire for a key
private final Map<StructuralKey<?>, NavigableSet<TimerData>> objectTimers;

private AtomicReference<Instant> currentWatermark;
Expand All @@ -217,11 +230,15 @@ public AppliedPTransformInputWatermark(Collection<? extends Watermark> inputWate
// The ordering must order elements by timestamp, and must not compare two distinct elements
// as equal. This is built on the assumption that any element added as a pending element will
// be consumed without modifications.
//
// The same logic is applied for pending timers
Ordering<CommittedBundle<?>> pendingBundleComparator =
new BundleByElementTimestampComparator().compound(Ordering.arbitrary());
this.pendingElements =
TreeMultiset.create(pendingBundleComparator);
this.pendingTimers = TreeMultiset.create();
this.objectTimers = new HashMap<>();
this.existingTimers = new HashMap<>();
currentWatermark = new AtomicReference<>(BoundedWindow.TIMESTAMP_MIN_VALUE);
}

Expand Down Expand Up @@ -270,23 +287,64 @@ private synchronized void removePending(CommittedBundle<?> completed) {
pendingElements.remove(completed);
}

/**
 * Returns the least timestamp currently held in {@code pendingTimers}, or
 * {@link BoundedWindow#TIMESTAMP_MAX_VALUE} when no timers are pending.
 */
private synchronized Instant getEarliestTimerTimestamp() {
  // With nothing pending, report the maximum timestamp instead of reading an empty multiset.
  return pendingTimers.isEmpty()
      ? BoundedWindow.TIMESTAMP_MAX_VALUE
      : pendingTimers.firstEntry().getElement();
}

private synchronized void updateTimers(TimerUpdate update) {
NavigableSet<TimerData> keyTimers = objectTimers.get(update.key);
if (keyTimers == null) {
keyTimers = new TreeSet<>();
objectTimers.put(update.key, keyTimers);
}
for (TimerData timer : update.setTimers) {
Table<StateNamespace, String, TimerData> existingTimersForKey =
existingTimers.get(update.key);
if (existingTimersForKey == null) {
existingTimersForKey = HashBasedTable.create();
existingTimers.put(update.key, existingTimersForKey);
}

for (TimerData timer : update.getSetTimers()) {
if (TimeDomain.EVENT_TIME.equals(timer.getDomain())) {
@Nullable
TimerData existingTimer =
existingTimersForKey.get(timer.getNamespace(), timer.getTimerId());

if (existingTimer == null) {
pendingTimers.add(timer.getTimestamp());
keyTimers.add(timer);
} else if (!existingTimer.equals(timer)) {
keyTimers.remove(existingTimer);
keyTimers.add(timer);
} // else the timer is already set identically, so noop

existingTimersForKey.put(timer.getNamespace(), timer.getTimerId(), timer);
}
}

for (TimerData timer : update.getDeletedTimers()) {
if (TimeDomain.EVENT_TIME.equals(timer.getDomain())) {
keyTimers.add(timer);
@Nullable
TimerData existingTimer =
existingTimersForKey.get(timer.getNamespace(), timer.getTimerId());

if (existingTimer != null) {
pendingTimers.remove(existingTimer.getTimestamp());
keyTimers.remove(existingTimer);
existingTimersForKey.remove(existingTimer.getNamespace(), existingTimer.getTimerId());
}
}
}
for (TimerData timer : update.deletedTimers) {

for (TimerData timer : update.getCompletedTimers()) {
if (TimeDomain.EVENT_TIME.equals(timer.getDomain())) {
keyTimers.remove(timer);
pendingTimers.remove(timer.getTimestamp());
}
}
// We don't keep references to timers that have been fired and delivered via #getFiredTimers()
}

private synchronized Map<StructuralKey<?>, List<TimerData>> extractFiredEventTimeTimers() {
Expand All @@ -311,11 +369,12 @@ public synchronized String toString() {
* {@link #refresh()} for more information.
*/
private static class AppliedPTransformOutputWatermark implements Watermark {
private final Watermark inputWatermark;
private final AppliedPTransformInputWatermark inputWatermark;
private final PerKeyHolds holds;
private AtomicReference<Instant> currentWatermark;

public AppliedPTransformOutputWatermark(AppliedPTransformInputWatermark inputWatermark) {
public AppliedPTransformOutputWatermark(
AppliedPTransformInputWatermark inputWatermark) {
this.inputWatermark = inputWatermark;
holds = new PerKeyHolds();
currentWatermark = new AtomicReference<>(BoundedWindow.TIMESTAMP_MIN_VALUE);
Expand Down Expand Up @@ -352,7 +411,10 @@ public Instant get() {
@Override
public synchronized WatermarkUpdate refresh() {
Instant oldWatermark = currentWatermark.get();
Instant newWatermark = INSTANT_ORDERING.min(inputWatermark.get(), holds.getMinHold());
Instant newWatermark = INSTANT_ORDERING.min(
inputWatermark.get(),
inputWatermark.getEarliestTimerTimestamp(),
holds.getMinHold());
newWatermark = INSTANT_ORDERING.max(oldWatermark, newWatermark);
currentWatermark.set(newWatermark);
return WatermarkUpdate.fromTimestamps(oldWatermark, newWatermark);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.testing;

/**
 * Category tag for tests that use {@link TestStream}, which is not a part of the Beam model
 * but a special feature currently only implemented by the direct runner.
 *
 * <p>This is a marker interface with no members. It is meant to be listed in a JUnit
 * {@code @Category} annotation, for example
 * {@code @Category({NeedsRunner.class, UsesTestStream.class})}.
 */
public interface UsesTestStream {}
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,42 @@
public abstract class BoundedWindow {
// The min and max timestamps that won't overflow when they are converted to
// usec.

/**
 * The minimum value for any Beam timestamp. Often referred to as "-infinity".
 *
 * <p>This value and {@link #TIMESTAMP_MAX_VALUE} are chosen so that their
 * microseconds-since-epoch can be safely represented with a {@code long}.
 *
 * <p>It is built from {@code Long.MIN_VALUE} microseconds converted to milliseconds.
 */
public static final Instant TIMESTAMP_MIN_VALUE =
new Instant(TimeUnit.MICROSECONDS.toMillis(Long.MIN_VALUE));

/**
 * The maximum value for any Beam timestamp. Often referred to as "+infinity".
 *
 * <p>This value and {@link #TIMESTAMP_MIN_VALUE} are chosen so that their
 * microseconds-since-epoch can be safely represented with a {@code long}.
 *
 * <p>It is built from {@code Long.MAX_VALUE} microseconds converted to milliseconds.
 */
public static final Instant TIMESTAMP_MAX_VALUE =
new Instant(TimeUnit.MICROSECONDS.toMillis(Long.MAX_VALUE));

/**
 * Renders an {@link Instant} as text, appending a Beam-specific label when the timestamp is one
 * of the distinguished values {@link #TIMESTAMP_MIN_VALUE} or {@link #TIMESTAMP_MAX_VALUE}, or
 * the maximum timestamp of the global window.
 *
 * @param timestamp the instant to format
 * @return {@code timestamp.toString()}, followed by a parenthesized label for the special values
 */
public static String formatTimestamp(Instant timestamp) {
  final String label;
  if (timestamp.equals(TIMESTAMP_MIN_VALUE)) {
    label = " (TIMESTAMP_MIN_VALUE)";
  } else if (timestamp.equals(TIMESTAMP_MAX_VALUE)) {
    label = " (TIMESTAMP_MAX_VALUE)";
  } else if (timestamp.equals(GlobalWindow.INSTANCE.maxTimestamp())) {
    label = " (end of global window)";
  } else {
    // An ordinary timestamp gets no annotation.
    label = "";
  }
  return timestamp.toString() + label;
}

/**
* Returns the inclusive upper bound of timestamps for values in this window.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,15 @@ public static <T> Bound<T> withAllowedLateness(Duration allowedLateness) {
return new Bound(null).withAllowedLateness(allowedLateness);
}

/**
 * <b><i>(Experimental)</i></b> Replaces the default {@link OutputTimeFn}, which controls the
 * output timestamp of values output from a {@link GroupByKey} operation.
 *
 * @param outputTimeFn the {@link OutputTimeFn} to use instead of the default
 * @return the result of calling {@code withOutputTimeFn(outputTimeFn)} on a {@link Bound}
 *     created with {@code new Bound(null)}
 */
@Experimental(Kind.OUTPUT_TIME)
public static <T> Bound<T> withOutputTimeFn(OutputTimeFn<?> outputTimeFn) {
  // The raw Bound type is kept on purpose so the call resolves exactly as the single-expression
  // form new Bound(null).withOutputTimeFn(...) does.
  Bound start = new Bound(null);
  return start.withOutputTimeFn(outputTimeFn);
}

/**
* A {@code PTransform} that windows the elements of a {@code PCollection<T>},
* into finite windows according to a user-specified {@code WindowFn}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public class TestStreamTest implements Serializable {
@Rule public transient ExpectedException thrown = ExpectedException.none();

@Test
@Category(NeedsRunner.class)
@Category({NeedsRunner.class, UsesTestStream.class})
public void testLateDataAccumulating() {
Instant instant = new Instant(0);
TestStream<Integer> source = TestStream.create(VarIntCoder.of())
Expand Down Expand Up @@ -136,7 +136,7 @@ public Void apply(Iterable<Integer> input) {
}

@Test
@Category(NeedsRunner.class)
@Category({NeedsRunner.class, UsesTestStream.class})
public void testProcessingTimeTrigger() {
TestStream<Long> source = TestStream.create(VarLongCoder.of())
.addElements(TimestampedValue.of(1L, new Instant(1000L)),
Expand All @@ -159,7 +159,7 @@ public void testProcessingTimeTrigger() {
}

@Test
@Category(NeedsRunner.class)
@Category({NeedsRunner.class, UsesTestStream.class})
public void testDiscardingMode() {
TestStream<String> stream =
TestStream.create(StringUtf8Coder.of())
Expand Down Expand Up @@ -208,7 +208,7 @@ public void testDiscardingMode() {
}

@Test
@Category(NeedsRunner.class)
@Category({NeedsRunner.class, UsesTestStream.class})
public void testFirstElementLate() {
Instant lateElementTimestamp = new Instant(-1_000_000);
TestStream<String> stream =
Expand Down Expand Up @@ -238,7 +238,7 @@ public void testFirstElementLate() {
}

@Test
@Category(NeedsRunner.class)
@Category({NeedsRunner.class, UsesTestStream.class})
public void testElementsAtAlmostPositiveInfinity() {
Instant endOfGlobalWindow = GlobalWindow.INSTANCE.maxTimestamp();
TestStream<String> stream = TestStream.create(StringUtf8Coder.of())
Expand All @@ -261,7 +261,7 @@ public void testElementsAtAlmostPositiveInfinity() {
}

@Test
@Category(NeedsRunner.class)
@Category({NeedsRunner.class, UsesTestStream.class})
public void testMultipleStreams() {
TestStream<String> stream = TestStream.create(StringUtf8Coder.of())
.addElements("foo", "bar")
Expand Down

0 comments on commit 57d9bbd

Please sign in to comment.