Skip to content

Commit

Permalink
This closes #1602
Browse files Browse the repository at this point in the history
  • Loading branch information
kennknowles committed Dec 13, 2016
2 parents 9f3b063 + 8d89bfc commit b278088
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -71,32 +71,23 @@ public void startBundle() {
*/
public Iterable<WindowedValue<InputT>> processElementInReadyWindows(WindowedValue<InputT> elem) {
  if (views.isEmpty()) {
    // When there are no side inputs, we can preserve the compressed representation.
    processElement(elem);
    return Collections.emptyList();
  }
  // Accumulates the single-window values whose window is not ready; these are handed back
  // to the caller rather than processed.
  ImmutableList.Builder<WindowedValue<InputT>> pushedBack = ImmutableList.builder();
  for (WindowedValue<InputT> windowElem : elem.explodeWindows()) {
    // Each exploded value is expected to carry exactly one window.
    BoundedWindow mainInputWindow = Iterables.getOnlyElement(windowElem.getWindows());
    if (isReady(mainInputWindow)) {
      // When there are any side inputs, we have to process the element in each window
      // individually, to disambiguate access to per-window side inputs.
      // The value is processed exactly once here; it must not be re-processed afterwards
      // in a re-compressed multi-window form.
      processElement(windowElem);
    } else {
      // Remember the window as not ready and push the per-window value back.
      notReadyWindows.add(mainInputWindow);
      pushedBack.add(windowElem);
    }
  }
  // Empty when every window was ready.
  return pushedBack.build();
}

private boolean isReady(BoundedWindow mainInputWindow) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@
*/
package org.apache.beam.runners.core;

import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.emptyIterable;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;
import static org.mockito.Mockito.when;
Expand Down Expand Up @@ -130,7 +130,7 @@ public void processElementSideInputNotReadyMultipleWindows() {
PaneInfo.ON_TIME_AND_ONLY_FIRING);
Iterable<WindowedValue<Integer>> multiWindowPushback =
runner.processElementInReadyWindows(multiWindow);
assertThat(multiWindowPushback, contains(multiWindow));
assertThat(multiWindowPushback, equalTo(multiWindow.explodeWindows()));
assertThat(underlying.inputElems, Matchers.<WindowedValue<Integer>>emptyIterable());
}

Expand Down Expand Up @@ -165,10 +165,8 @@ public void processElementSideInputNotReadySomeWindows() {
underlying.inputElems,
containsInAnyOrder(
WindowedValue.of(
2,
new Instant(-2),
ImmutableList.of(littleWindow, bigWindow),
PaneInfo.NO_FIRING)));
2, new Instant(-2), ImmutableList.of(littleWindow), PaneInfo.NO_FIRING),
WindowedValue.of(2, new Instant(-2), ImmutableList.of(bigWindow), PaneInfo.NO_FIRING)));
}

@Test
Expand All @@ -191,8 +189,9 @@ public void processElementSideInputReadyAllWindows() {
Iterable<WindowedValue<Integer>> multiWindowPushback =
runner.processElementInReadyWindows(multiWindow);
assertThat(multiWindowPushback, emptyIterable());
assertThat(underlying.inputElems,
containsInAnyOrder(ImmutableList.of(multiWindow).toArray()));
assertThat(
underlying.inputElems,
containsInAnyOrder(ImmutableList.copyOf(multiWindow.explodeWindows()).toArray()));
}

@Test
Expand All @@ -212,6 +211,7 @@ public void processElementNoSideInputs() {
Iterable<WindowedValue<Integer>> multiWindowPushback =
runner.processElementInReadyWindows(multiWindow);
assertThat(multiWindowPushback, emptyIterable());
// Should preserve the compressed representation when there are no side inputs.
assertThat(underlying.inputElems, containsInAnyOrder(multiWindow));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.util.TimeDomain;
import org.apache.beam.sdk.util.Timer;
Expand All @@ -88,6 +89,7 @@
import org.apache.beam.sdk.values.TupleTagList;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.joda.time.MutableDateTime;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
Expand Down Expand Up @@ -724,6 +726,49 @@ public void testParDoReadingFromUnknownSideInput() {
pipeline.run();
}

/**
 * A {@link DoFn} that outputs each input element followed by {@code ":"} and the value read
 * from the side input view supplied at construction.
 */
private static class FnWithSideInputs extends DoFn<String, String> {
  /** The side input view read for every processed element. */
  private final PCollectionView<Integer> sideInputView;

  private FnWithSideInputs(PCollectionView<Integer> view) {
    sideInputView = view;
  }

  @ProcessElement
  public void processElement(ProcessContext context) {
    // Read the element first, then the side input, and emit them joined by a colon.
    String element = context.element();
    Integer sideValue = context.sideInput(sideInputView);
    context.output(element + ":" + sideValue);
  }
}

@Test
@Category(RunnableOnService.class)
public void testSideInputsWithMultipleWindows() {
  // Checks that a runner can execute a DoFn that reads a side input when the main input
  // element belongs to several windows at once. Side inputs are per-window, so the runner
  // has to process the element separately for each window.
  Pipeline testPipeline = TestPipeline.create();

  // Use the current time with the millisecond-of-second field cleared as the element
  // timestamp (presumably so the expected window boundaries fall on whole seconds).
  MutableDateTime truncatedNow = Instant.now().toMutableDateTime();
  truncatedNow.setMillisOfSecond(0);
  Instant now = truncatedNow.toInstant();

  Duration windowSize = Duration.standardSeconds(5);
  Duration windowPeriod = Duration.standardSeconds(1);
  SlidingWindows windowFn = SlidingWindows.of(windowSize).every(windowPeriod);

  PCollectionView<Integer> view =
      testPipeline.apply(Create.of(1)).apply(View.<Integer>asSingleton());
  PCollection<String> res =
      testPipeline
          .apply(Create.timestamped(TimestampedValue.of("a", now)))
          .apply(Window.<String>into(windowFn))
          .apply(ParDo.of(new FnWithSideInputs(view)).withSideInputs(view));

  // Assert the output in the windows starting 0, 1, 2 and 3 seconds before the timestamp.
  // NOTE(review): with a 5s size and 1s period the element likely also falls into the
  // window starting 4 seconds before; confirm that leaving it unchecked is intentional.
  int secondsBack = 0;
  while (secondsBack < 4) {
    Instant windowStart = now.minus(Duration.standardSeconds(secondsBack));
    Instant windowEnd = windowStart.plus(windowSize);
    IntervalWindow window = new IntervalWindow(windowStart, windowEnd);
    PAssert.that(res).inWindow(window).containsInAnyOrder("a:1");
    secondsBack++;
  }

  testPipeline.run();
}

@Test
@Category(NeedsRunner.class)
public void testParDoWithErrorInStartBatch() {
Expand Down

0 comments on commit b278088

Please sign in to comment.