[BEAM-8543] Dataflow streaming timers are not strictly time ordered when set earlier mid-bundle #11924

Merged
merged 7 commits into from Jul 29, 2020
Changes from 2 commits
1 change: 1 addition & 0 deletions CHANGES.md
@@ -101,6 +101,7 @@
* Upgrade Sphinx to 3.0.3 for building PyDoc.
* Added a PTransform for image annotation using Google Cloud AI image processing service
([BEAM-9646](https://issues.apache.org/jira/browse/BEAM-9646))
* Dataflow streaming timers are not strictly time ordered when set earlier mid-bundle ([BEAM-8543](https://issues.apache.org/jira/browse/BEAM-8543)).

## Breaking Changes

@@ -24,10 +24,12 @@
import com.google.api.services.dataflow.model.SideInputInfo;
import java.io.Closeable;
import java.io.IOException;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicLong;
@@ -522,6 +524,7 @@ public void start(

this.cachedFiredTimers = null;
this.cachedFiredUserTimers = null;
this.toBeFiredTimersOrdered.clear();
}

public void flushState() {
@@ -564,10 +567,13 @@ public <W extends BoundedWindow> TimerData getNextFiredTimer(Coder<W> windowCode
// Lazily initialized
private Iterator<TimerData> cachedFiredUserTimers = null;

private PriorityQueue<TimerData> toBeFiredTimersOrdered =
new PriorityQueue<>(Comparator.comparing(TimerData::getTimestamp));

public <W extends BoundedWindow> TimerData getNextFiredUserTimer(Coder<W> windowCoder) {
if (cachedFiredUserTimers == null) {
cachedFiredUserTimers =
FluentIterable.<Timer>from(StreamingModeExecutionContext.this.getFiredTimers())
FluentIterable.from(StreamingModeExecutionContext.this.getFiredTimers())
.filter(
timer ->
WindmillTimerInternals.isUserTimer(timer)
@@ -577,12 +583,21 @@ public <W extends BoundedWindow> TimerData getNextFiredUserTimer(Coder<W> window
WindmillTimerInternals.windmillTimerToTimerData(
WindmillNamespacePrefix.USER_NAMESPACE_PREFIX, timer, windowCoder))
.iterator();

cachedFiredUserTimers.forEachRemaining(toBeFiredTimersOrdered::add);
Member

Do we even need cachedFiredUserTimers? It seems obsolete if we populate the priority queue. The name is also wrong - even before this PR it wasn't a cache. It is a lazily initialized iterator. Instead, we should have a lazily initialized priority queue (like you do) and just a flag to say whether the incoming timers have been loaded yet.

Contributor Author

done
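
The suggestion in this thread (one lazily initialized priority queue plus a "loaded" flag, instead of a separate iterator field) could look roughly like the sketch below. This is an illustration only: `LazyOrderedTimers`, the `Supplier` source, and the use of plain `Long` timestamps in place of `TimerData` are assumptions made for the example, not the code that was merged.

```java
import java.util.PriorityQueue;
import java.util.function.Supplier;

// Hypothetical sketch: these names are not from Beam. Timers are modeled as
// plain Long timestamps so the example stays self-contained.
final class LazyOrderedTimers {
  // Stand-in for the source of fired timers delivered with the work item.
  private final Supplier<Iterable<Long>> firedTimers;
  // Natural ordering of Long gives earliest-timestamp-first.
  private final PriorityQueue<Long> ordered = new PriorityQueue<>();
  // Records whether the incoming timers have been loaded into the queue yet.
  private boolean loaded = false;

  LazyOrderedTimers(Supplier<Iterable<Long>> firedTimers) {
    this.firedTimers = firedTimers;
  }

  // Returns the earliest remaining timer, or null when none are left.
  Long next() {
    if (!loaded) {
      for (Long t : firedTimers.get()) {
        ordered.add(t);
      }
      loaded = true;
    }
    return ordered.poll();
  }

  // Called at the start of a new work item: drop leftovers and reload lazily.
  void reset() {
    ordered.clear();
    loaded = false;
  }
}
```

With this shape there is a single source of truth for "what fires next", and the flag makes the lazy load explicit rather than relying on a null check of an iterator field.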

}

Instant currentInputWatermark = userTimerInternals.currentInputWatermarkTime();
if (userTimerInternals.hasTimerBefore(currentInputWatermark)) {
while (!toBeFiredTimersOrdered.isEmpty()) {
userTimerInternals.setTimer(toBeFiredTimersOrdered.poll());
}
}
Contributor

@kennknowles for comment. This doesn't look right to me: I don't think we should be modifying the WindmillTimerInternals here. I think we just want to merge the timer modifications made while processing the work item into this priority queue. Note that if timers are deleted, we need to detect that as well and remove them from the priority queue.

Member

Yea I don't actually understand what this block is for.

FWIW, to do timer deletion/reset cheaply without building a bespoke data structure, just keep a map from timer id to its firing time or a tombstone. This way, whenever a timer comes up in the priority queue, you look up its actual time in the map. If it is now set for a different time, don't fire it. If it is obsolete (tombstoned), don't fire it.

Contributor Author

@reuvenlax done
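
For readers following this thread, here is a minimal sketch of the map-plus-tombstone idea described above. It is not the code that landed in the PR: `TombstoneTimerQueue`, its methods, the string timer ids, and the `long` timestamps are all hypothetical stand-ins for `TimerData` and the Windmill internals. The queue is never edited on reset or delete; stale entries are simply skipped when they surface.

```java
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
import java.util.PriorityQueue;

// Hypothetical sketch: none of these names come from Beam.
final class TombstoneTimerQueue {
  // Sentinel stored in the map for a deleted timer
  // (assumes no real timer is ever set for this value).
  private static final long TOMBSTONE = Long.MIN_VALUE;

  // A queue entry remembers the firing time the timer had when it was enqueued.
  private static final class Entry {
    final String id;
    final long fireAt;

    Entry(String id, long fireAt) {
      this.id = id;
      this.fireAt = fireAt;
    }
  }

  private final PriorityQueue<Entry> queue =
      new PriorityQueue<>(Comparator.comparingLong((Entry e) -> e.fireAt));
  // Latest known firing time (or TOMBSTONE) per timer id.
  private final Map<String, Long> current = new HashMap<>();

  // Setting or resetting a timer only appends; older entries become stale.
  void set(String id, long fireAt) {
    current.put(id, fireAt);
    queue.add(new Entry(id, fireAt));
  }

  // Deleting marks the id as obsolete without searching the queue.
  void delete(String id) {
    current.put(id, TOMBSTONE);
  }

  // Returns the id of the next timer due at or before the watermark, or null.
  String pollNextDue(long watermark) {
    while (!queue.isEmpty() && queue.peek().fireAt <= watermark) {
      Entry e = queue.poll();
      Long actual = current.get(e.id);
      if (actual != null && actual == e.fireAt) {
        current.remove(e.id); // fired: later duplicates of this entry are stale
        return e.id;
      }
      // Otherwise the entry was reset to another time, deleted, or already fired.
    }
    return null;
  }
}
```

In this sketch, a timer that is reset to an earlier time mid-bundle fires at the earlier time, and its original entry is discarded when it later reaches the head of the queue. Deletion and reset both stay O(log n) or better because nothing is removed from the middle of the heap.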


if (!cachedFiredUserTimers.hasNext()) {
if (toBeFiredTimersOrdered.isEmpty()) {
return null;
}
TimerData nextTimer = cachedFiredUserTimers.next();
TimerData nextTimer = toBeFiredTimersOrdered.poll();
// User timers must be explicitly deleted when delivered, to release the implied hold
userTimerInternals.deleteTimer(nextTimer);
return nextTimer;
@@ -227,6 +227,18 @@ public void persistTo(Windmill.WorkItemCommitRequest.Builder outputBuilder) {
timers.clear();
}

public boolean hasTimerBefore(Instant time) {
for (Cell<String, StateNamespace, Boolean> cell : timerStillPresent.cellSet()) {
TimerData timerData = timers.get(cell.getRowKey(), cell.getColumnKey());
if (cell.getValue()) {
if (timerData.getTimestamp().isBefore(time)) {
return true;
}
}
}
return false;
}

private boolean needsWatermarkHold(TimerData timerData) {
// If it is a user timer or a system timer with outputTimestamp different than timestamp
return WindmillNamespacePrefix.USER_NAMESPACE_PREFIX.equals(prefix)
@@ -139,6 +139,7 @@
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterators;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
import org.hamcrest.Matchers;
@@ -3938,10 +3939,11 @@ public void testEventTimeTimerOrdering() throws Exception {
ValidatesRunner.class,
UsesTimersInParDo.class,
UsesStatefulParDo.class,
UsesUnboundedPCollections.class,
UsesStrictTimerOrdering.class
})
public void testEventTimeTimerOrderingWithCreate() throws Exception {
final int numTestElements = 100;
final int numTestElements = 5;
rehmanmuradali marked this conversation as resolved.
final Instant now = new Instant(1500000000000L);

List<TimestampedValue<KV<String, String>>> elements = new ArrayList<>();
@@ -4009,6 +4011,7 @@ public void onTimer(

List<TimestampedValue<String>> flush = new ArrayList<>();
Instant flushTime = context.timestamp();
int bagSize = Iterators.size(bagState.read().iterator());
for (TimestampedValue<String> val : bagState.read()) {
if (!val.getTimestamp().isAfter(flushTime)) {
flush.add(val);
@@ -4018,7 +4021,7 @@ public void onTimer(
context.output(
Joiner.on(":").join(flush.stream().map(TimestampedValue::getValue).iterator()));
Instant newMinStamp = flushTime.plus(1);
if (flush.size() < numTestElements) {
if (flush.size() < numTestElements && bagSize > 0) {
timer.set(newMinStamp);
}
}
@@ -4040,7 +4043,8 @@ public void onTimer(
}
};

PCollection<String> output = pipeline.apply(transform).apply(ParDo.of(fn));
PCollection<String> output =
pipeline.apply(transform).setIsBoundedInternal(IsBounded.UNBOUNDED).apply(ParDo.of(fn));
rehmanmuradali marked this conversation as resolved.
List<String> expected =
IntStream.rangeClosed(0, numTestElements)
.mapToObj(expandFn(numTestElements))