Skip to content

Commit

Permalink
Fix Output Windows in OnTimerContext
Browse files Browse the repository at this point in the history
When a User timer is delivered, output elements produced by that timer
firing should be placed within the same window as the timer is in.

Deliver timers in the window of thier namespace in the DirectRunner.

Test that timer deliveries maintain the window the timer was emitted in.
  • Loading branch information
tgroh committed Mar 22, 2017
1 parent 7e97820 commit 75100f8
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
Expand Down Expand Up @@ -766,22 +767,26 @@ public PipelineOptions getPipelineOptions() {

@Override
public void output(OutputT output) {
context.outputWithTimestamp(output, timestamp);
context.outputWindowedValue(
output, timestamp(), Collections.singleton(window()), PaneInfo.NO_FIRING);
}

@Override
public void outputWithTimestamp(OutputT output, Instant timestamp) {
context.outputWithTimestamp(output, timestamp);
context.outputWindowedValue(
output, timestamp, Collections.singleton(window()), PaneInfo.NO_FIRING);
}

@Override
public <T> void sideOutput(TupleTag<T> tag, T output) {
context.sideOutputWithTimestamp(tag, output, timestamp);
context.sideOutputWindowedValue(
tag, output, timestamp, Collections.singleton(window()), PaneInfo.NO_FIRING);
}

@Override
public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
context.sideOutputWithTimestamp(tag, output, timestamp);
context.sideOutputWindowedValue(
tag, output, timestamp, Collections.singleton(window()), PaneInfo.NO_FIRING);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,12 @@
*/
package org.apache.beam.runners.direct;

import static com.google.common.base.Preconditions.checkState;

import com.google.auto.value.AutoValue;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import java.util.Collections;
import java.util.HashMap;
Expand All @@ -30,6 +31,7 @@
import org.apache.beam.runners.core.KeyedWorkItems;
import org.apache.beam.runners.core.StateNamespace;
import org.apache.beam.runners.core.StateNamespaces;
import org.apache.beam.runners.core.StateNamespaces.WindowNamespace;
import org.apache.beam.runners.core.StateTag;
import org.apache.beam.runners.core.StateTags;
import org.apache.beam.runners.core.TimerInternals.TimerData;
Expand Down Expand Up @@ -228,15 +230,20 @@ public StatefulParDoEvaluator(
@Override
public void processElement(WindowedValue<KeyedWorkItem<K, KV<K, InputT>>> gbkResult)
throws Exception {

BoundedWindow window = Iterables.getOnlyElement(gbkResult.getWindows());

for (WindowedValue<KV<K, InputT>> windowedValue : gbkResult.getValue().elementsIterable()) {
delegateEvaluator.processElement(windowedValue);
}

for (TimerData timer : gbkResult.getValue().timersIterable()) {
delegateEvaluator.onTimer(timer, window);
checkState(
timer.getNamespace() instanceof WindowNamespace,
"Expected Timer %s to be in a %s, but got %s",
timer,
WindowNamespace.class.getSimpleName(),
timer.getNamespace().getClass().getName());
WindowNamespace<?> windowNamespace = (WindowNamespace) timer.getNamespace();
BoundedWindow timerWindow = windowNamespace.getWindow();
delegateEvaluator.onTimer(timer, timerWindow);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@
import org.apache.beam.sdk.values.TimestampedValue;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.hamcrest.Matchers;
import org.joda.time.Duration;
import org.joda.time.Instant;
Expand Down Expand Up @@ -1952,6 +1953,50 @@ public void onTimer(OnTimerContext context) {
pipeline.run();
}

@Test
@Category({RunnableOnService.class, UsesTimersInParDo.class})
public void testTimerReceivedInOriginalWindow() throws Exception {
final String timerId = "foo";

DoFn<KV<String, Integer>, BoundedWindow> fn =
new DoFn<KV<String, Integer>, BoundedWindow>() {

@TimerId(timerId)
private final TimerSpec spec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

@ProcessElement
public void processElement(ProcessContext context, @TimerId(timerId) Timer timer) {
timer.setForNowPlus(Duration.standardSeconds(1));
}

@OnTimer(timerId)
public void onTimer(OnTimerContext context, BoundedWindow window) {
context.output(context.window());
}

public TypeDescriptor<BoundedWindow> getOutputTypeDescriptor() {
return (TypeDescriptor) TypeDescriptor.of(IntervalWindow.class);
}
};

SlidingWindows windowing =
SlidingWindows.of(Duration.standardMinutes(3)).every(Duration.standardMinutes(1));
PCollection<BoundedWindow> output =
pipeline
.apply(Create.timestamped(TimestampedValue.of(KV.of("hello", 24), new Instant(0L))))
.apply(Window.<KV<String, Integer>>into(windowing))
.apply(ParDo.of(fn));

PAssert.that(output)
.containsInAnyOrder(
new IntervalWindow(new Instant(0), Duration.standardMinutes(3)),
new IntervalWindow(
new Instant(0).minus(Duration.standardMinutes(1)), Duration.standardMinutes(3)),
new IntervalWindow(
new Instant(0).minus(Duration.standardMinutes(2)), Duration.standardMinutes(3)));
pipeline.run();
}

/**
* Tests that an event time timer set absolutely for the last possible moment fires and results in
* supplementary output. The test is otherwise identical to {@link #testEventTimeTimerBounded()}.
Expand Down

0 comments on commit 75100f8

Please sign in to comment.