Skip to content

Commit

Permalink
This closes #2285
Browse files Browse the repository at this point in the history
  • Loading branch information
tgroh committed Mar 22, 2017
2 parents 7e97820 + 75100f8 commit e1dc7a8
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
Expand Down Expand Up @@ -766,22 +767,26 @@ public PipelineOptions getPipelineOptions() {

@Override
public void output(OutputT output) {
context.outputWithTimestamp(output, timestamp);
context.outputWindowedValue(
output, timestamp(), Collections.singleton(window()), PaneInfo.NO_FIRING);
}

@Override
public void outputWithTimestamp(OutputT output, Instant timestamp) {
context.outputWithTimestamp(output, timestamp);
context.outputWindowedValue(
output, timestamp, Collections.singleton(window()), PaneInfo.NO_FIRING);
}

@Override
public <T> void sideOutput(TupleTag<T> tag, T output) {
context.sideOutputWithTimestamp(tag, output, timestamp);
context.sideOutputWindowedValue(
tag, output, timestamp, Collections.singleton(window()), PaneInfo.NO_FIRING);
}

@Override
public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
context.sideOutputWithTimestamp(tag, output, timestamp);
context.sideOutputWindowedValue(
tag, output, timestamp, Collections.singleton(window()), PaneInfo.NO_FIRING);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,12 @@
*/
package org.apache.beam.runners.direct;

import static com.google.common.base.Preconditions.checkState;

import com.google.auto.value.AutoValue;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import java.util.Collections;
import java.util.HashMap;
Expand All @@ -30,6 +31,7 @@
import org.apache.beam.runners.core.KeyedWorkItems;
import org.apache.beam.runners.core.StateNamespace;
import org.apache.beam.runners.core.StateNamespaces;
import org.apache.beam.runners.core.StateNamespaces.WindowNamespace;
import org.apache.beam.runners.core.StateTag;
import org.apache.beam.runners.core.StateTags;
import org.apache.beam.runners.core.TimerInternals.TimerData;
Expand Down Expand Up @@ -228,15 +230,20 @@ public StatefulParDoEvaluator(
@Override
public void processElement(WindowedValue<KeyedWorkItem<K, KV<K, InputT>>> gbkResult)
throws Exception {

BoundedWindow window = Iterables.getOnlyElement(gbkResult.getWindows());

for (WindowedValue<KV<K, InputT>> windowedValue : gbkResult.getValue().elementsIterable()) {
delegateEvaluator.processElement(windowedValue);
}

for (TimerData timer : gbkResult.getValue().timersIterable()) {
delegateEvaluator.onTimer(timer, window);
checkState(
timer.getNamespace() instanceof WindowNamespace,
"Expected Timer %s to be in a %s, but got %s",
timer,
WindowNamespace.class.getSimpleName(),
timer.getNamespace().getClass().getName());
WindowNamespace<?> windowNamespace = (WindowNamespace) timer.getNamespace();
BoundedWindow timerWindow = windowNamespace.getWindow();
delegateEvaluator.onTimer(timer, timerWindow);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@
import org.apache.beam.sdk.values.TimestampedValue;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.hamcrest.Matchers;
import org.joda.time.Duration;
import org.joda.time.Instant;
Expand Down Expand Up @@ -1952,6 +1953,50 @@ public void onTimer(OnTimerContext context) {
pipeline.run();
}

@Test
@Category({RunnableOnService.class, UsesTimersInParDo.class})
public void testTimerReceivedInOriginalWindow() throws Exception {
final String timerId = "foo";

DoFn<KV<String, Integer>, BoundedWindow> fn =
new DoFn<KV<String, Integer>, BoundedWindow>() {

@TimerId(timerId)
private final TimerSpec spec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

@ProcessElement
public void processElement(ProcessContext context, @TimerId(timerId) Timer timer) {
timer.setForNowPlus(Duration.standardSeconds(1));
}

@OnTimer(timerId)
public void onTimer(OnTimerContext context, BoundedWindow window) {
context.output(context.window());
}

public TypeDescriptor<BoundedWindow> getOutputTypeDescriptor() {
return (TypeDescriptor) TypeDescriptor.of(IntervalWindow.class);
}
};

SlidingWindows windowing =
SlidingWindows.of(Duration.standardMinutes(3)).every(Duration.standardMinutes(1));
PCollection<BoundedWindow> output =
pipeline
.apply(Create.timestamped(TimestampedValue.of(KV.of("hello", 24), new Instant(0L))))
.apply(Window.<KV<String, Integer>>into(windowing))
.apply(ParDo.of(fn));

PAssert.that(output)
.containsInAnyOrder(
new IntervalWindow(new Instant(0), Duration.standardMinutes(3)),
new IntervalWindow(
new Instant(0).minus(Duration.standardMinutes(1)), Duration.standardMinutes(3)),
new IntervalWindow(
new Instant(0).minus(Duration.standardMinutes(2)), Duration.standardMinutes(3)));
pipeline.run();
}

/**
* Tests that an event time timer set absolutely for the last possible moment fires and results in
* supplementary output. The test is otherwise identical to {@link #testEventTimeTimerBounded()}.
Expand Down

0 comments on commit e1dc7a8

Please sign in to comment.