From c91697eefde71d04d2b7382a9938b07d507b26a6 Mon Sep 17 00:00:00 2001 From: Scott Wegner Date: Fri, 20 Apr 2018 12:01:06 -0700 Subject: [PATCH] Subdivide CombineTests into subclasses. Gradle parallelizes test execution at the TestClass level. For ValidatesRunner tests, each test case can have significant overhead. For example, under Dataflow each TestPipeline launcheds a Dataflow job which takes at least 3 minutes due to spinning up VMs. We can achieve better overall execution time by splitting tests into scenario classes for parallelization. --- .../beam/sdk/transforms/CombineTest.java | 1416 +++++++++-------- 1 file changed, 717 insertions(+), 699 deletions(-) diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java index 7c8c4bac885db..839472b61942c 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java @@ -90,13 +90,14 @@ import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; +import org.junit.experimental.runners.Enclosed; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; /** * Tests for {@link Combine} transforms. */ -@RunWith(JUnit4.class) +@RunWith(Enclosed.class) public class CombineTest implements Serializable { // This test is Serializable, just so that it's easy to have // anonymous inner classes inside the non-static test methods. @@ -155,427 +156,781 @@ private void runTestSimpleCombineWithContext(List> table, pipeline.run(); } - @Test - @Category(ValidatesRunner.class) - @SuppressWarnings({"rawtypes", "unchecked"}) - public void testSimpleCombine() { - runTestSimpleCombine(Arrays.asList( - KV.of("a", 1), - KV.of("a", 1), - KV.of("a", 4), - KV.of("b", 1), - KV.of("b", 13) - ), 20, Arrays.asList(KV.of("a", "114"), KV.of("b", "113"))); - } + /** Tests validating basic Combine transform scenarios. */ + @RunWith(JUnit4.class) + public class BasicTests { + @Test + @Category(ValidatesRunner.class) + @SuppressWarnings({"rawtypes", "unchecked"}) + public void testSimpleCombine() { + runTestSimpleCombine(Arrays.asList( + KV.of("a", 1), + KV.of("a", 1), + KV.of("a", 4), + KV.of("b", 1), + KV.of("b", 13) + ), 20, Arrays.asList(KV.of("a", "114"), KV.of("b", "113"))); + } - @Test - @Category(ValidatesRunner.class) - @SuppressWarnings({"rawtypes", "unchecked"}) - public void testSimpleCombineWithContext() { - runTestSimpleCombineWithContext(Arrays.asList( - KV.of("a", 1), - KV.of("a", 1), - KV.of("a", 4), - KV.of("b", 1), - KV.of("b", 13) - ), 20, - Arrays.asList(KV.of("a", "20:114"), KV.of("b", "20:113")), - new String[] {"20:111134"}); - } + @Test + @Category(ValidatesRunner.class) + public void testSimpleCombineEmpty() { + runTestSimpleCombine(EMPTY_TABLE, 0, Collections.emptyList()); + } - @Test - @Category(NeedsRunner.class) - public void testSimpleCombineWithContextEmpty() { - runTestSimpleCombineWithContext(EMPTY_TABLE, 0, Collections.emptyList(), new String[] {}); - } + @Test + @Category(ValidatesRunner.class) + public void testBasicCombine() { + runTestBasicCombine(Arrays.asList( + KV.of("a", 1), + KV.of("a", 1), + KV.of("a", 4), + KV.of("b", 1), + KV.of("b", 13) + ), ImmutableSet.of(1, 13, 4), Arrays.asList( + KV.of("a", (Set) ImmutableSet.of(1, 4)), + KV.of("b", (Set) ImmutableSet.of(1, 13)))); + } - @Test - @Category(NeedsRunner.class) - public void testSimpleCombineEmpty() { - runTestSimpleCombine(EMPTY_TABLE, 0, Collections.emptyList()); - } + @Test + @Category(ValidatesRunner.class) + public void testBasicCombineEmpty() { + runTestBasicCombine(EMPTY_TABLE, ImmutableSet.of(), Collections.emptyList()); + } - @SuppressWarnings("unchecked") - private void runTestBasicCombine(List> table, - Set globalUnique, - List>> perKeyUnique) { - PCollection> input = createInput(pipeline, table); + // Checks that Min, Max, Mean, Sum (operations that pass-through to Combine) have good names. + @Test + public void testCombinerNames() { + Combine.PerKey min = Min.integersPerKey(); + Combine.PerKey max = Max.integersPerKey(); + Combine.PerKey mean = Mean.perKey(); + Combine.PerKey sum = Sum.integersPerKey(); + + assertThat(min.getName(), equalTo("Combine.perKey(MinInteger)")); + assertThat(max.getName(), equalTo("Combine.perKey(MaxInteger)")); + assertThat(mean.getName(), equalTo("Combine.perKey(Mean)")); + assertThat(sum.getName(), equalTo("Combine.perKey(SumInteger)")); + } - PCollection> unique = - input.apply(Values.create()).apply(Combine.globally(new UniqueInts())); + @Test + @Category(ValidatesRunner.class) + public void testHotKeyCombining() { + PCollection> input = copy(createInput(pipeline, Arrays.asList( + KV.of("a", 1), + KV.of("a", 1), + KV.of("a", 4), + KV.of("b", 1), + KV.of("b", 13) + )), 10); + + CombineFn mean = new MeanInts(); + PCollection> coldMean = input.apply("ColdMean", + Combine.perKey(mean).withHotKeyFanout(0)); + PCollection> warmMean = input.apply("WarmMean", + Combine.perKey(mean).withHotKeyFanout(hotKeyFanout)); + PCollection> hotMean = input.apply("HotMean", + Combine.perKey(mean).withHotKeyFanout(5)); + PCollection> splitMean = input.apply("SplitMean", + Combine.perKey(mean).withHotKeyFanout(splitHotKeyFanout)); + + List> expected = Arrays.asList(KV.of("a", 2.0), KV.of("b", 7.0)); + PAssert.that(coldMean).containsInAnyOrder(expected); + PAssert.that(warmMean).containsInAnyOrder(expected); + PAssert.that(hotMean).containsInAnyOrder(expected); + PAssert.that(splitMean).containsInAnyOrder(expected); + + pipeline.run(); + } - PCollection>> uniquePerKey = - input.apply(Combine.perKey(new UniqueInts())); + @Test + @Category(ValidatesRunner.class) + public void testHotKeyCombiningWithAccumulationMode() { + PCollection input = pipeline.apply(Create.of(1, 2, 3, 4, 5)); + + PCollection output = input + .apply(Window.into(new GlobalWindows()) + .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1))) + .accumulatingFiredPanes() + .withAllowedLateness(new Duration(0), ClosingBehavior.FIRE_ALWAYS)) + .apply(Sum.integersGlobally().withoutDefaults().withFanout(2)) + .apply(ParDo.of(new GetLast())); + + PAssert.that(output) + .satisfies( + input1 -> { + assertThat(input1, hasItem(15)); + return null; + }); + + pipeline.run(); + } - PAssert.that(unique).containsInAnyOrder(globalUnique); - PAssert.that(uniquePerKey).containsInAnyOrder(perKeyUnique); + @Test + @Category(NeedsRunner.class) + public void testBinaryCombineFn() { + PCollection> input = copy(createInput(pipeline, Arrays.asList( + KV.of("a", 1), + KV.of("a", 1), + KV.of("a", 4), + KV.of("b", 1), + KV.of("b", 13) + )), 2); + PCollection> intProduct = + input.apply("IntProduct", Combine.perKey(new TestProdInt())); + PCollection> objProduct = + input.apply("ObjProduct", Combine.perKey(new TestProdObj())); + + List> expected = Arrays.asList(KV.of("a", 16), KV.of("b", 169)); + PAssert.that(intProduct).containsInAnyOrder(expected); + PAssert.that(objProduct).containsInAnyOrder(expected); + + pipeline.run(); + } - pipeline.run(); - } + @Test + public void testBinaryCombineFnWithNulls() { + testCombineFn(new NullCombiner(), Arrays.asList(3, 3, 5), 45); + testCombineFn(new NullCombiner(), Arrays.asList(null, 3, 5), 30); + testCombineFn(new NullCombiner(), Arrays.asList(3, 3, null), 18); + testCombineFn(new NullCombiner(), Arrays.asList(null, 3, null), 12); + testCombineFn(new NullCombiner(), Arrays.asList(null, null, null), 8); + } - @Test - @Category(ValidatesRunner.class) - public void testBasicCombine() { - runTestBasicCombine(Arrays.asList( - KV.of("a", 1), - KV.of("a", 1), - KV.of("a", 4), - KV.of("b", 1), - KV.of("b", 13) - ), ImmutableSet.of(1, 13, 4), Arrays.asList( - KV.of("a", (Set) ImmutableSet.of(1, 4)), - KV.of("b", (Set) ImmutableSet.of(1, 13)))); - } + @Test + public void testCombineGetName() { + assertEquals("Combine.globally(SumInts)", Combine.globally(new SumInts()).getName()); + assertEquals( + "Combine.GloballyAsSingletonView", + Combine.globally(new SumInts()).asSingletonView().getName()); + assertEquals("Combine.perKey(Test)", Combine.perKey(new TestCombineFn()).getName()); + assertEquals( + "Combine.perKeyWithFanout(Test)", + Combine.perKey(new TestCombineFn()).withHotKeyFanout(10).getName()); + } - @Test - @Category(NeedsRunner.class) - public void testBasicCombineEmpty() { - runTestBasicCombine(EMPTY_TABLE, ImmutableSet.of(), Collections.emptyList()); - } + @Test + public void testDisplayData() { + UniqueInts combineFn = new UniqueInts() { + @Override + public void populateDisplayData(DisplayData.Builder builder) { + builder.add(DisplayData.item("fnMetadata", "foobar")); + } + }; + Combine.Globally combine = Combine.globally(combineFn) + .withFanout(1234); + DisplayData displayData = DisplayData.from(combine); + + assertThat(displayData, hasDisplayItem("combineFn", combineFn.getClass())); + assertThat(displayData, hasDisplayItem("emitDefaultOnEmptyInput", true)); + assertThat(displayData, hasDisplayItem("fanout", 1234)); + assertThat(displayData, includesDisplayDataFor("combineFn", combineFn)); + } - private void runTestAccumulatingCombine(List> table, - Double globalMean, - List> perKeyMeans) { - PCollection> input = createInput(pipeline, table); + @Test + public void testDisplayDataForWrappedFn() { + UniqueInts combineFn = new UniqueInts() { + @Override + public void populateDisplayData(DisplayData.Builder builder) { + builder.add(DisplayData.item("foo", "bar")); + } + }; + Combine.PerKey combine = Combine.perKey(combineFn); + DisplayData displayData = DisplayData.from(combine); - PCollection mean = input.apply(Values.create()).apply(Combine.globally(new MeanInts())); + assertThat(displayData, hasDisplayItem("combineFn", combineFn.getClass())); + assertThat(displayData, hasDisplayItem(hasNamespace(combineFn.getClass()))); + } - PCollection> meanPerKey = input.apply(Combine.perKey(new MeanInts())); + @Test + @Category(ValidatesRunner.class) + public void testCombinePerKeyPrimitiveDisplayData() { + DisplayDataEvaluator evaluator = DisplayDataEvaluator.create(); - PAssert.that(mean).containsInAnyOrder(globalMean); - PAssert.that(meanPerKey).containsInAnyOrder(perKeyMeans); + CombineTest.UniqueInts combineFn = new CombineTest.UniqueInts(); + PTransform>, ? extends POutput> combine = + Combine.perKey(combineFn); - pipeline.run(); - } + Set displayData = evaluator.displayDataForPrimitiveTransforms(combine, + KvCoder.of(VarIntCoder.of(), VarIntCoder.of())); - @Test - @Category(NeedsRunner.class) - public void testFixedWindowsCombine() { - PCollection> input = - pipeline - .apply( - Create.timestamped( - TimestampedValue.of(KV.of("a", 1), new Instant(0L)), - TimestampedValue.of(KV.of("a", 1), new Instant(1L)), - TimestampedValue.of(KV.of("a", 4), new Instant(6L)), - TimestampedValue.of(KV.of("b", 1), new Instant(7L)), - TimestampedValue.of(KV.of("b", 13), new Instant(8L))) - .withCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of()))) - .apply(Window.into(FixedWindows.of(Duration.millis(2)))); - - PCollection sum = - input.apply(Values.create()).apply(Combine.globally(new SumInts()).withoutDefaults()); + assertThat("Combine.perKey should include the combineFn in its primitive transform", + displayData, hasItem(hasDisplayItem("combineFn", combineFn.getClass()))); + } - PCollection> sumPerKey = input.apply(Combine.perKey(new TestCombineFn())); + @Test + @Category(ValidatesRunner.class) + public void testCombinePerKeyWithHotKeyFanoutPrimitiveDisplayData() { + int hotKeyFanout = 2; + DisplayDataEvaluator evaluator = DisplayDataEvaluator.create(); - PAssert.that(sum).containsInAnyOrder(2, 5, 13); - PAssert.that(sumPerKey) - .containsInAnyOrder( - Arrays.asList(KV.of("a", "11"), KV.of("a", "4"), KV.of("b", "1"), KV.of("b", "13"))); - pipeline.run(); - } + CombineTest.UniqueInts combineFn = new CombineTest.UniqueInts(); + PTransform>, PCollection>>> combine = + Combine.>perKey(combineFn).withHotKeyFanout(hotKeyFanout); - @Test - @Category(NeedsRunner.class) - public void testFixedWindowsCombineWithContext() { - PCollection> perKeyInput = - pipeline - .apply( - Create.timestamped( - TimestampedValue.of(KV.of("a", 1), new Instant(0L)), - TimestampedValue.of(KV.of("a", 1), new Instant(1L)), - TimestampedValue.of(KV.of("a", 4), new Instant(6L)), - TimestampedValue.of(KV.of("b", 1), new Instant(7L)), - TimestampedValue.of(KV.of("b", 13), new Instant(8L))) - .withCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of()))) - .apply(Window.into(FixedWindows.of(Duration.millis(2)))); + Set displayData = evaluator.displayDataForPrimitiveTransforms(combine, + KvCoder.of(VarIntCoder.of(), VarIntCoder.of())); - PCollection globallyInput = perKeyInput.apply(Values.create()); + assertThat("Combine.perKey.withHotKeyFanout should include the combineFn in its primitive " + + "transform", displayData, hasItem(hasDisplayItem("combineFn", combineFn.getClass()))); + assertThat("Combine.perKey.withHotKeyFanout(int) should include the fanout in its primitive " + + "transform", displayData, hasItem(hasDisplayItem("fanout", hotKeyFanout))); + } - PCollection sum = globallyInput - .apply("Sum", Combine.globally(new SumInts()).withoutDefaults()); + /** + * Tests creation of a per-key {@link Combine} via a Java 8 lambda. + */ + @Test + @Category(ValidatesRunner.class) + public void testCombinePerKeyLambda() { + + PCollection> output = pipeline + .apply(Create.of(KV.of("a", 1), KV.of("b", 2), KV.of("a", 3), KV.of("c", 4))) + .apply(Combine.perKey(integers -> { + int sum = 0; + for (int i : integers) { + sum += i; + } + return sum; + })); + + PAssert.that(output).containsInAnyOrder( + KV.of("a", 4), + KV.of("b", 2), + KV.of("c", 4)); + pipeline.run(); + } - PCollectionView globallySumView = sum.apply(View.asSingleton()); + /** + * Tests creation of a per-key {@link Combine} via a Java 8 method reference. + */ + @Test + @Category(ValidatesRunner.class) + public void testCombinePerKeyInstanceMethodReference() { + + PCollection> output = pipeline + .apply(Create.of(KV.of("a", 1), KV.of("b", 2), KV.of("a", 3), KV.of("c", 4))) + .apply(Combine.perKey(new Summer()::sum)); + + PAssert.that(output).containsInAnyOrder( + KV.of("a", 4), + KV.of("b", 2), + KV.of("c", 4)); + pipeline.run(); + } - PCollection> combinePerKeyWithContext = - perKeyInput.apply( - Combine.perKey(new TestCombineFnWithContext(globallySumView)) - .withSideInputs(globallySumView)); + /** + * Tests that we can serialize {@link Combine.CombineFn CombineFns} constructed from a lambda. + * Lambdas can be problematic because the {@link Class} object is synthetic and cannot be + * deserialized. + */ + @Test + public void testLambdaSerialization() { + SerializableFunction, Object> combiner = xs -> Iterables.getFirst(xs, 0); - PCollection combineGloballyWithContext = globallyInput - .apply(Combine.globally(new TestCombineFnWithContext(globallySumView)) - .withoutDefaults() - .withSideInputs(globallySumView)); + boolean lambdaClassSerializationThrows; + try { + SerializableUtils.clone(combiner.getClass()); + lambdaClassSerializationThrows = false; + } catch (IllegalArgumentException e) { + // Expected + lambdaClassSerializationThrows = true; + } + Assume.assumeTrue("Expected lambda class serialization to fail. " + + "If it's fixed, we can remove special behavior in Combine.", + lambdaClassSerializationThrows); - PAssert.that(sum).containsInAnyOrder(2, 5, 13); - PAssert.that(combinePerKeyWithContext) - .containsInAnyOrder( - Arrays.asList( - KV.of("a", "2:11"), KV.of("a", "5:4"), KV.of("b", "5:1"), KV.of("b", "13:13"))); - PAssert.that(combineGloballyWithContext).containsInAnyOrder("2:11", "5:14", "13:13"); - pipeline.run(); + + Combine.Globally combine = Combine.globally(combiner); + SerializableUtils.clone(combine); // should not throw. + } + + @Test + public void testLambdaDisplayData() { + Combine.Globally combine = Combine.globally(xs -> Iterables.getFirst(xs, 0)); + DisplayData displayData = DisplayData.from(combine); + MatcherAssert.assertThat(displayData.items(), not(empty())); + } } - @Test - @Category(NeedsRunner.class) - public void testSlidingWindowsCombine() { - PCollection input = - pipeline - .apply( - Create.timestamped( - TimestampedValue.of("a", new Instant(1L)), - TimestampedValue.of("b", new Instant(2L)), - TimestampedValue.of("c", new Instant(3L)))) - .apply(Window.into(SlidingWindows.of(Duration.millis(3)).every(Duration.millis(1L)))); - PCollection> combined = - input.apply( - Combine.globally( - new CombineFn, List>() { - @Override - public List createAccumulator() { - return new ArrayList<>(); - } + /** Tests validating CombineWithContext behaviors. */ + @RunWith(JUnit4.class) + public class CombineWithContextTests { + @Test + @Category(ValidatesRunner.class) + @SuppressWarnings({"rawtypes", "unchecked"}) + public void testSimpleCombineWithContext() { + runTestSimpleCombineWithContext(Arrays.asList( + KV.of("a", 1), + KV.of("a", 1), + KV.of("a", 4), + KV.of("b", 1), + KV.of("b", 13) + ), 20, + Arrays.asList(KV.of("a", "20:114"), KV.of("b", "20:113")), + new String[] {"20:111134"}); + } - @Override - public List addInput(List accumulator, String input) { - accumulator.add(input); - return accumulator; - } + @Test + @Category(ValidatesRunner.class) + public void testSimpleCombineWithContextEmpty() { + runTestSimpleCombineWithContext(EMPTY_TABLE, 0, Collections.emptyList(), new String[] {}); + } + } - @Override - public List mergeAccumulators(Iterable> accumulators) { - // Mutate all of the accumulators. Instances should be used in only one - // place, and not - // reused after merging. - List cur = createAccumulator(); - for (List accumulator : accumulators) { - accumulator.addAll(cur); - cur = accumulator; - } - return cur; - } + /** Tests validating windowing behaviors. */ + @RunWith(JUnit4.class) + public class WindowingTests { + @Test + @Category(ValidatesRunner.class) + public void testFixedWindowsCombine() { + PCollection> input = + pipeline + .apply( + Create.timestamped( + TimestampedValue.of(KV.of("a", 1), new Instant(0L)), + TimestampedValue.of(KV.of("a", 1), new Instant(1L)), + TimestampedValue.of(KV.of("a", 4), new Instant(6L)), + TimestampedValue.of(KV.of("b", 1), new Instant(7L)), + TimestampedValue.of(KV.of("b", 13), new Instant(8L))) + .withCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of()))) + .apply(Window.into(FixedWindows.of(Duration.millis(2)))); + + PCollection sum = + input.apply(Values.create()).apply(Combine.globally(new SumInts()).withoutDefaults()); + + PCollection> sumPerKey = input.apply(Combine.perKey(new TestCombineFn())); + + PAssert.that(sum).containsInAnyOrder(2, 5, 13); + PAssert.that(sumPerKey) + .containsInAnyOrder( + Arrays.asList(KV.of("a", "11"), KV.of("a", "4"), KV.of("b", "1"), KV.of("b", "13"))); + pipeline.run(); + } + + @Test + @Category(ValidatesRunner.class) + public void testFixedWindowsCombineWithContext() { + PCollection> perKeyInput = + pipeline + .apply( + Create.timestamped( + TimestampedValue.of(KV.of("a", 1), new Instant(0L)), + TimestampedValue.of(KV.of("a", 1), new Instant(1L)), + TimestampedValue.of(KV.of("a", 4), new Instant(6L)), + TimestampedValue.of(KV.of("b", 1), new Instant(7L)), + TimestampedValue.of(KV.of("b", 13), new Instant(8L))) + .withCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of()))) + .apply(Window.into(FixedWindows.of(Duration.millis(2)))); + + PCollection globallyInput = perKeyInput.apply(Values.create()); + + PCollection sum = globallyInput + .apply("Sum", Combine.globally(new SumInts()).withoutDefaults()); + + PCollectionView globallySumView = sum.apply(View.asSingleton()); + + PCollection> combinePerKeyWithContext = + perKeyInput.apply( + Combine.perKey(new TestCombineFnWithContext(globallySumView)) + .withSideInputs(globallySumView)); + + PCollection combineGloballyWithContext = globallyInput + .apply(Combine.globally(new TestCombineFnWithContext(globallySumView)) + .withoutDefaults() + .withSideInputs(globallySumView)); + + PAssert.that(sum).containsInAnyOrder(2, 5, 13); + PAssert.that(combinePerKeyWithContext) + .containsInAnyOrder( + Arrays.asList( + KV.of("a", "2:11"), KV.of("a", "5:4"), KV.of("b", "5:1"), KV.of("b", "13:13"))); + PAssert.that(combineGloballyWithContext).containsInAnyOrder("2:11", "5:14", "13:13"); + pipeline.run(); + } - @Override - public List extractOutput(List accumulator) { - List result = new ArrayList<>(accumulator); - Collections.sort(result); - return result; + @Test + @Category(ValidatesRunner.class) + public void testSlidingWindowsCombine() { + PCollection input = + pipeline + .apply( + Create.timestamped( + TimestampedValue.of("a", new Instant(1L)), + TimestampedValue.of("b", new Instant(2L)), + TimestampedValue.of("c", new Instant(3L)))) + .apply(Window.into(SlidingWindows.of(Duration.millis(3)).every(Duration.millis(1L)))); + PCollection> combined = + input.apply( + Combine.globally( + new CombineFn, List>() { + @Override + public List createAccumulator() { + return new ArrayList<>(); + } + + @Override + public List addInput(List accumulator, String input) { + accumulator.add(input); + return accumulator; + } + + @Override + public List mergeAccumulators(Iterable> accumulators) { + // Mutate all of the accumulators. Instances should be used in only one + // place, and not + // reused after merging. + List cur = createAccumulator(); + for (List accumulator : accumulators) { + accumulator.addAll(cur); + cur = accumulator; } - }) - .withoutDefaults()); + return cur; + } + + @Override + public List extractOutput(List accumulator) { + List result = new ArrayList<>(accumulator); + Collections.sort(result); + return result; + } + }) + .withoutDefaults()); + + PAssert.that(combined) + .containsInAnyOrder( + ImmutableList.of("a"), + ImmutableList.of("a", "b"), + ImmutableList.of("a", "b", "c"), + ImmutableList.of("b", "c"), + ImmutableList.of("c")); + + pipeline.run(); + } - PAssert.that(combined) - .containsInAnyOrder( - ImmutableList.of("a"), - ImmutableList.of("a", "b"), - ImmutableList.of("a", "b", "c"), - ImmutableList.of("b", "c"), - ImmutableList.of("c")); + @Test + @Category(ValidatesRunner.class) + public void testSlidingWindowsCombineWithContext() { + // [a: 1, 1], [a: 4; b: 1], [b: 13] + PCollection> perKeyInput = + pipeline + .apply( + Create.timestamped( + TimestampedValue.of(KV.of("a", 1), new Instant(2L)), + TimestampedValue.of(KV.of("a", 1), new Instant(3L)), + TimestampedValue.of(KV.of("a", 4), new Instant(8L)), + TimestampedValue.of(KV.of("b", 1), new Instant(9L)), + TimestampedValue.of(KV.of("b", 13), new Instant(10L))) + .withCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of()))) + .apply(Window.into(SlidingWindows.of(Duration.millis(2)))); + + PCollection globallyInput = perKeyInput.apply(Values.create()); + + PCollection sum = globallyInput.apply("Sum", Sum.integersGlobally().withoutDefaults()); + + PCollectionView globallySumView = sum.apply(View.asSingleton()); + + PCollection> combinePerKeyWithContext = + perKeyInput.apply( + Combine.perKey(new TestCombineFnWithContext(globallySumView)) + .withSideInputs(globallySumView)); + + PCollection combineGloballyWithContext = globallyInput + .apply(Combine.globally(new TestCombineFnWithContext(globallySumView)) + .withoutDefaults() + .withSideInputs(globallySumView)); + + PAssert.that(sum).containsInAnyOrder(1, 2, 1, 4, 5, 14, 13); + PAssert.that(combinePerKeyWithContext) + .containsInAnyOrder( + Arrays.asList( + KV.of("a", "1:1"), + KV.of("a", "2:11"), + KV.of("a", "1:1"), + KV.of("a", "4:4"), + KV.of("a", "5:4"), + KV.of("b", "5:1"), + KV.of("b", "14:113"), + KV.of("b", "13:13"))); + PAssert.that(combineGloballyWithContext).containsInAnyOrder( + "1:1", "2:11", "1:1", "4:4", "5:14", "14:113", "13:13"); + pipeline.run(); + } - pipeline.run(); - } + @Test + @Category(ValidatesRunner.class) + public void testGlobalCombineWithDefaultsAndTriggers() { + PCollection input = pipeline.apply(Create.of(1, 1)); + + PCollection output = input + .apply(Window.into(new GlobalWindows()) + .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1))) + .accumulatingFiredPanes() + .withAllowedLateness(new Duration(0), ClosingBehavior.FIRE_ALWAYS)) + .apply(Sum.integersGlobally()) + .apply(ParDo.of(new FormatPaneInfo())); + + // The actual elements produced are nondeterministic. Could be one, could be two. + // But it should certainly have a final element with the correct final sum. + PAssert.that(output) + .satisfies( + input1 -> { + assertThat(input1, hasItem("2: true")); + return null; + }); + + pipeline.run(); + } - @Test - @Category(NeedsRunner.class) - public void testSlidingWindowsCombineWithContext() { - // [a: 1, 1], [a: 4; b: 1], [b: 13] - PCollection> perKeyInput = - pipeline - .apply( - Create.timestamped( - TimestampedValue.of(KV.of("a", 1), new Instant(2L)), - TimestampedValue.of(KV.of("a", 1), new Instant(3L)), - TimestampedValue.of(KV.of("a", 4), new Instant(8L)), - TimestampedValue.of(KV.of("b", 1), new Instant(9L)), - TimestampedValue.of(KV.of("b", 13), new Instant(10L))) - .withCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of()))) - .apply(Window.into(SlidingWindows.of(Duration.millis(2)))); + @Test + @Category(ValidatesRunner.class) + public void testSessionsCombine() { + PCollection> input = + pipeline + .apply( + Create.timestamped( + TimestampedValue.of(KV.of("a", 1), new Instant(0L)), + TimestampedValue.of(KV.of("a", 1), new Instant(4L)), + TimestampedValue.of(KV.of("a", 4), new Instant(7L)), + TimestampedValue.of(KV.of("b", 1), new Instant(10L)), + TimestampedValue.of(KV.of("b", 13), new Instant(16L))) + .withCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of()))) + .apply(Window.into(Sessions.withGapDuration(Duration.millis(5)))); + + PCollection sum = + input.apply(Values.create()).apply(Combine.globally(new SumInts()).withoutDefaults()); + + PCollection> sumPerKey = input.apply(Combine.perKey(new TestCombineFn())); + + PAssert.that(sum).containsInAnyOrder(7, 13); + PAssert.that(sumPerKey) + .containsInAnyOrder(Arrays.asList(KV.of("a", "114"), KV.of("b", "1"), KV.of("b", "13"))); + pipeline.run(); + } - PCollection globallyInput = perKeyInput.apply(Values.create()); + @Test + @Category(ValidatesRunner.class) + public void testSessionsCombineWithContext() { + PCollection> perKeyInput = + pipeline.apply( + Create.timestamped( + TimestampedValue.of(KV.of("a", 1), new Instant(0L)), + TimestampedValue.of(KV.of("a", 1), new Instant(4L)), + TimestampedValue.of(KV.of("a", 4), new Instant(7L)), + TimestampedValue.of(KV.of("b", 1), new Instant(10L)), + TimestampedValue.of(KV.of("b", 13), new Instant(16L))) + .withCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of()))); + + PCollection globallyInput = perKeyInput.apply(Values.create()); + + PCollection fixedWindowsSum = + globallyInput + .apply("FixedWindows", Window.into(FixedWindows.of(Duration.millis(5)))) + .apply("Sum", Combine.globally(new SumInts()).withoutDefaults()); + + PCollectionView globallyFixedWindowsView = + fixedWindowsSum.apply(View.asSingleton().withDefaultValue(0)); + + PCollection> sessionsCombinePerKey = + perKeyInput + .apply( + "PerKey Input Sessions", Window.into(Sessions.withGapDuration(Duration.millis(5)))) + .apply( + Combine.perKey( + new TestCombineFnWithContext(globallyFixedWindowsView)) + .withSideInputs(globallyFixedWindowsView)); + + PCollection sessionsCombineGlobally = + globallyInput + .apply( + "Globally Input Sessions", + Window.into(Sessions.withGapDuration(Duration.millis(5)))) + .apply( + Combine.globally(new TestCombineFnWithContext(globallyFixedWindowsView)) + .withoutDefaults() + .withSideInputs(globallyFixedWindowsView)); + + PAssert.that(fixedWindowsSum).containsInAnyOrder(2, 4, 1, 13); + PAssert.that(sessionsCombinePerKey) + .containsInAnyOrder( + Arrays.asList(KV.of("a", "1:114"), KV.of("b", "1:1"), KV.of("b", "0:13"))); + PAssert.that(sessionsCombineGlobally).containsInAnyOrder("1:1114", "0:13"); + pipeline.run(); + } - PCollection sum = globallyInput.apply("Sum", Sum.integersGlobally().withoutDefaults()); + @Test + @Category(ValidatesRunner.class) + public void testWindowedCombineEmpty() { + PCollection mean = + pipeline + .apply(Create.empty(BigEndianIntegerCoder.of())) + .apply(Window.into(FixedWindows.of(Duration.millis(1)))) + .apply(Combine.globally(new MeanInts()).withoutDefaults()); - PCollectionView globallySumView = sum.apply(View.asSingleton()); + PAssert.that(mean).empty(); - PCollection> combinePerKeyWithContext = - perKeyInput.apply( - Combine.perKey(new TestCombineFnWithContext(globallySumView)) - .withSideInputs(globallySumView)); + pipeline.run(); + } - PCollection combineGloballyWithContext = globallyInput - .apply(Combine.globally(new TestCombineFnWithContext(globallySumView)) - .withoutDefaults() - .withSideInputs(globallySumView)); + @Test + @Category(ValidatesRunner.class) + public void testCombineGloballyAsSingletonView() { + final PCollectionView view = pipeline + .apply("CreateEmptySideInput", Create.empty(BigEndianIntegerCoder.of())) + .apply(Sum.integersGlobally().asSingletonView()); + + PCollection output = pipeline + .apply("CreateVoidMainInput", Create.of((Void) null)) + .apply("OutputSideInput", ParDo.of(new DoFn() { + @ProcessElement + public void processElement(ProcessContext c) { + c.output(c.sideInput(view)); + } + }).withSideInputs(view)); + + PAssert.thatSingleton(output).isEqualTo(0); + pipeline.run(); + } - PAssert.that(sum).containsInAnyOrder(1, 2, 1, 4, 5, 14, 13); - PAssert.that(combinePerKeyWithContext) - .containsInAnyOrder( - Arrays.asList( - KV.of("a", "1:1"), - KV.of("a", "2:11"), - KV.of("a", "1:1"), - KV.of("a", "4:4"), - KV.of("a", "5:4"), - KV.of("b", "5:1"), - KV.of("b", "14:113"), - KV.of("b", "13:13"))); - PAssert.that(combineGloballyWithContext).containsInAnyOrder( - "1:1", "2:11", "1:1", "4:4", "5:14", "14:113", "13:13"); - pipeline.run(); - } + @Test + @Category(ValidatesRunner.class) + public void testWindowedCombineGloballyAsSingletonView() { + FixedWindows windowFn = FixedWindows.of(Duration.standardMinutes(1)); + final PCollectionView view = + pipeline + .apply( + "CreateSideInput", + Create.timestamped( + TimestampedValue.of(1, new Instant(100)), + TimestampedValue.of(3, new Instant(100)))) + .apply("WindowSideInput", Window.into(windowFn)) + .apply("CombineSideInput", Sum.integersGlobally().asSingletonView()); + + TimestampedValue nonEmptyElement = TimestampedValue.of(null, new Instant(100)); + TimestampedValue emptyElement = TimestampedValue.atMinimumTimestamp(null); + PCollection output = + pipeline + .apply( + "CreateMainInput", + Create.timestamped(nonEmptyElement, emptyElement).withCoder(VoidCoder.of())) + .apply("WindowMainInput", Window.into(windowFn)) + .apply( + "OutputSideInput", + ParDo.of( + new DoFn() { + @ProcessElement + public void processElement(ProcessContext c) { + c.output(c.sideInput(view)); + } + }) + .withSideInputs(view)); + + PAssert.that(output).containsInAnyOrder(4, 0); + PAssert.that(output) + .inWindow(windowFn.assignWindow(nonEmptyElement.getTimestamp())) + .containsInAnyOrder(4); + PAssert.that(output) + .inWindow(windowFn.assignWindow(emptyElement.getTimestamp())) + .containsInAnyOrder(0); + pipeline.run(); + } - private static class FormatPaneInfo extends DoFn { - @ProcessElement - public void processElement(ProcessContext c) { - c.output(c.element() + ": " + c.pane().isLast()); + /** + * Tests creation of a global {@link Combine} via Java 8 lambda. + */ + @Test + @Category(ValidatesRunner.class) + public void testCombineGloballyLambda() { + + PCollection output = pipeline + .apply(Create.of(1, 2, 3, 4)) + .apply(Combine.globally(integers -> { + int sum = 0; + for (int i : integers) { + sum += i; + } + return sum; + })); + + PAssert.that(output).containsInAnyOrder(10); + pipeline.run(); } - } - @Test - @Category(ValidatesRunner.class) - public void testGlobalCombineWithDefaultsAndTriggers() { - PCollection input = pipeline.apply(Create.of(1, 1)); - - PCollection output = input - .apply(Window.into(new GlobalWindows()) - .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1))) - .accumulatingFiredPanes() - .withAllowedLateness(new Duration(0), ClosingBehavior.FIRE_ALWAYS)) - .apply(Sum.integersGlobally()) - .apply(ParDo.of(new FormatPaneInfo())); - - // The actual elements produced are nondeterministic. Could be one, could be two. - // But it should certainly have a final element with the correct final sum. - PAssert.that(output) - .satisfies( - input1 -> { - assertThat(input1, hasItem("2: true")); - return null; - }); + /** + * Tests creation of a global {@link Combine} via a Java 8 method reference. + */ + @Test + @Category(ValidatesRunner.class) + public void testCombineGloballyInstanceMethodReference() { - pipeline.run(); - } + PCollection output = pipeline + .apply(Create.of(1, 2, 3, 4)) + .apply(Combine.globally(new Summer()::sum)); - @Test - @Category(ValidatesRunner.class) - public void testSessionsCombine() { - PCollection> input = - pipeline - .apply( - Create.timestamped( - TimestampedValue.of(KV.of("a", 1), new Instant(0L)), - TimestampedValue.of(KV.of("a", 1), new Instant(4L)), - TimestampedValue.of(KV.of("a", 4), new Instant(7L)), - TimestampedValue.of(KV.of("b", 1), new Instant(10L)), - TimestampedValue.of(KV.of("b", 13), new Instant(16L))) - .withCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of()))) - .apply(Window.into(Sessions.withGapDuration(Duration.millis(5)))); - - PCollection sum = - input.apply(Values.create()).apply(Combine.globally(new SumInts()).withoutDefaults()); + PAssert.that(output).containsInAnyOrder(10); + pipeline.run(); + } + } - PCollection> sumPerKey = input.apply(Combine.perKey(new TestCombineFn())); + /** Tests validating accumulation scenarios. */ + @RunWith(JUnit4.class) + public class AccumulationTests { + @Test + @Category(ValidatesRunner.class) + public void testAccumulatingCombine() { + runTestAccumulatingCombine(Arrays.asList( + KV.of("a", 1), + KV.of("a", 1), + KV.of("a", 4), + KV.of("b", 1), + KV.of("b", 13) + ), 4.0, Arrays.asList(KV.of("a", 2.0), KV.of("b", 7.0))); + } - PAssert.that(sum).containsInAnyOrder(7, 13); - PAssert.that(sumPerKey) - .containsInAnyOrder(Arrays.asList(KV.of("a", "114"), KV.of("b", "1"), KV.of("b", "13"))); - pipeline.run(); + @Test + @Category(ValidatesRunner.class) + public void testAccumulatingCombineEmpty() { + runTestAccumulatingCombine(EMPTY_TABLE, 0.0, Collections.emptyList()); + } } - @Test - @Category(NeedsRunner.class) - public void testSessionsCombineWithContext() { - PCollection> perKeyInput = - pipeline.apply( - Create.timestamped( - TimestampedValue.of(KV.of("a", 1), new Instant(0L)), - TimestampedValue.of(KV.of("a", 1), new Instant(4L)), - TimestampedValue.of(KV.of("a", 4), new Instant(7L)), - TimestampedValue.of(KV.of("b", 1), new Instant(10L)), - TimestampedValue.of(KV.of("b", 13), new Instant(16L))) - .withCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of()))); + @SuppressWarnings("unchecked") + private void runTestBasicCombine(List> table, + Set globalUnique, + List>> perKeyUnique) { + PCollection> input = createInput(pipeline, table); - PCollection globallyInput = perKeyInput.apply(Values.create()); + PCollection> unique = + input.apply(Values.create()).apply(Combine.globally(new UniqueInts())); + + PCollection>> uniquePerKey = + input.apply(Combine.perKey(new UniqueInts())); + + PAssert.that(unique).containsInAnyOrder(globalUnique); + PAssert.that(uniquePerKey).containsInAnyOrder(perKeyUnique); - PCollection fixedWindowsSum = - globallyInput - .apply("FixedWindows", Window.into(FixedWindows.of(Duration.millis(5)))) - .apply("Sum", Combine.globally(new SumInts()).withoutDefaults()); - - PCollectionView globallyFixedWindowsView = - fixedWindowsSum.apply(View.asSingleton().withDefaultValue(0)); - - PCollection> sessionsCombinePerKey = - perKeyInput - .apply( - "PerKey Input Sessions", Window.into(Sessions.withGapDuration(Duration.millis(5)))) - .apply( - Combine.perKey( - new TestCombineFnWithContext(globallyFixedWindowsView)) - .withSideInputs(globallyFixedWindowsView)); - - PCollection sessionsCombineGlobally = - globallyInput - .apply( - "Globally Input Sessions", - Window.into(Sessions.withGapDuration(Duration.millis(5)))) - .apply( - Combine.globally(new TestCombineFnWithContext(globallyFixedWindowsView)) - .withoutDefaults() - .withSideInputs(globallyFixedWindowsView)); - - PAssert.that(fixedWindowsSum).containsInAnyOrder(2, 4, 1, 13); - PAssert.that(sessionsCombinePerKey) - .containsInAnyOrder( - Arrays.asList(KV.of("a", "1:114"), KV.of("b", "1:1"), KV.of("b", "0:13"))); - PAssert.that(sessionsCombineGlobally).containsInAnyOrder("1:1114", "0:13"); pipeline.run(); } - @Test - @Category(NeedsRunner.class) - public void testWindowedCombineEmpty() { - PCollection mean = - pipeline - .apply(Create.empty(BigEndianIntegerCoder.of())) - .apply(Window.into(FixedWindows.of(Duration.millis(1)))) - .apply(Combine.globally(new MeanInts()).withoutDefaults()); + private void runTestAccumulatingCombine(List> table, + Double globalMean, + List> perKeyMeans) { + PCollection> input = createInput(pipeline, table); - PAssert.that(mean).empty(); + PCollection mean = input.apply(Values.create()).apply(Combine.globally(new MeanInts())); - pipeline.run(); - } + PCollection> meanPerKey = input.apply(Combine.perKey(new MeanInts())); - @Test - @Category(NeedsRunner.class) - public void testAccumulatingCombine() { - runTestAccumulatingCombine(Arrays.asList( - KV.of("a", 1), - KV.of("a", 1), - KV.of("a", 4), - KV.of("b", 1), - KV.of("b", 13) - ), 4.0, Arrays.asList(KV.of("a", 2.0), KV.of("b", 7.0))); - } + PAssert.that(mean).containsInAnyOrder(globalMean); + PAssert.that(meanPerKey).containsInAnyOrder(perKeyMeans); - @Test - @Category(NeedsRunner.class) - public void testAccumulatingCombineEmpty() { - runTestAccumulatingCombine(EMPTY_TABLE, 0.0, Collections.emptyList()); + pipeline.run(); } - // Checks that Min, Max, Mean, Sum (operations that pass-through to Combine) have good names. - @Test - public void testCombinerNames() { - Combine.PerKey min = Min.integersPerKey(); - Combine.PerKey max = Max.integersPerKey(); - Combine.PerKey mean = Mean.perKey(); - Combine.PerKey sum = Sum.integersPerKey(); - - assertThat(min.getName(), equalTo("Combine.perKey(MinInteger)")); - assertThat(max.getName(), equalTo("Combine.perKey(MaxInteger)")); - assertThat(mean.getName(), equalTo("Combine.perKey(Mean)")); - assertThat(sum.getName(), equalTo("Combine.perKey(SumInteger)")); + private static class FormatPaneInfo extends DoFn { + @ProcessElement + public void processElement(ProcessContext c) { + c.output(c.element() + ": " + c.pane().isLast()); + } } private static final SerializableFunction hotKeyFanout = @@ -584,36 +939,6 @@ public void testCombinerNames() { private static final SerializableFunction splitHotKeyFanout = input -> Math.random() < 0.5 ? 3 : 0; - @Test - @Category(ValidatesRunner.class) - public void testHotKeyCombining() { - PCollection> input = copy(createInput(pipeline, Arrays.asList( - KV.of("a", 1), - KV.of("a", 1), - KV.of("a", 4), - KV.of("b", 1), - KV.of("b", 13) - )), 10); - - CombineFn mean = new MeanInts(); - PCollection> coldMean = input.apply("ColdMean", - Combine.perKey(mean).withHotKeyFanout(0)); - PCollection> warmMean = input.apply("WarmMean", - Combine.perKey(mean).withHotKeyFanout(hotKeyFanout)); - PCollection> hotMean = input.apply("HotMean", - Combine.perKey(mean).withHotKeyFanout(5)); - PCollection> splitMean = input.apply("SplitMean", - Combine.perKey(mean).withHotKeyFanout(splitHotKeyFanout)); - - List> expected = Arrays.asList(KV.of("a", 2.0), KV.of("b", 7.0)); - PAssert.that(coldMean).containsInAnyOrder(expected); - PAssert.that(warmMean).containsInAnyOrder(expected); - PAssert.that(hotMean).containsInAnyOrder(expected); - PAssert.that(splitMean).containsInAnyOrder(expected); - - pipeline.run(); - } - private static class GetLast extends DoFn { @ProcessElement public void processElement(ProcessContext c) { @@ -623,60 +948,6 @@ public void processElement(ProcessContext c) { } } - @Test - @Category(NeedsRunner.class) - public void testHotKeyCombiningWithAccumulationMode() { - PCollection input = pipeline.apply(Create.of(1, 2, 3, 4, 5)); - - PCollection output = input - .apply(Window.into(new GlobalWindows()) - .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1))) - .accumulatingFiredPanes() - .withAllowedLateness(new Duration(0), ClosingBehavior.FIRE_ALWAYS)) - .apply(Sum.integersGlobally().withoutDefaults().withFanout(2)) - .apply(ParDo.of(new GetLast())); - - PAssert.that(output) - .satisfies( - input1 -> { - assertThat(input1, hasItem(15)); - return null; - }); - - pipeline.run(); - } - - @Test - @Category(NeedsRunner.class) - public void testBinaryCombineFn() { - PCollection> input = copy(createInput(pipeline, Arrays.asList( - KV.of("a", 1), - KV.of("a", 1), - KV.of("a", 4), - KV.of("b", 1), - KV.of("b", 13) - )), 2); - PCollection> intProduct = - input.apply("IntProduct", Combine.perKey(new TestProdInt())); - PCollection> objProduct = - input.apply("ObjProduct", Combine.perKey(new TestProdObj())); - - List> expected = Arrays.asList(KV.of("a", 16), KV.of("b", 169)); - PAssert.that(intProduct).containsInAnyOrder(expected); - PAssert.that(objProduct).containsInAnyOrder(expected); - - pipeline.run(); - } - - @Test - public void testBinaryCombineFnWithNulls() { - testCombineFn(new NullCombiner(), Arrays.asList(3, 3, 5), 45); - testCombineFn(new NullCombiner(), Arrays.asList(null, 3, 5), 30); - testCombineFn(new NullCombiner(), Arrays.asList(3, 3, null), 18); - testCombineFn(new NullCombiner(), Arrays.asList(null, 3, null), 12); - testCombineFn(new NullCombiner(), Arrays.asList(null, null, null), 8); - } - private static final class TestProdInt extends Combine.BinaryCombineIntegerFn { @Override public int apply(int left, int right) { @@ -706,148 +977,6 @@ public Integer apply(Integer left, Integer right) { } } - @Test - @Category(NeedsRunner.class) - public void testCombineGloballyAsSingletonView() { - final PCollectionView view = pipeline - .apply("CreateEmptySideInput", Create.empty(BigEndianIntegerCoder.of())) - .apply(Sum.integersGlobally().asSingletonView()); - - PCollection output = pipeline - .apply("CreateVoidMainInput", Create.of((Void) null)) - .apply("OutputSideInput", ParDo.of(new DoFn() { - @ProcessElement - public void processElement(ProcessContext c) { - c.output(c.sideInput(view)); - } - }).withSideInputs(view)); - - PAssert.thatSingleton(output).isEqualTo(0); - pipeline.run(); - } - - @Test - @Category(NeedsRunner.class) - public void testWindowedCombineGloballyAsSingletonView() { - FixedWindows windowFn = FixedWindows.of(Duration.standardMinutes(1)); - final PCollectionView view = - pipeline - .apply( - "CreateSideInput", - Create.timestamped( - TimestampedValue.of(1, new Instant(100)), - TimestampedValue.of(3, new Instant(100)))) - .apply("WindowSideInput", Window.into(windowFn)) - .apply("CombineSideInput", Sum.integersGlobally().asSingletonView()); - - TimestampedValue nonEmptyElement = TimestampedValue.of(null, new Instant(100)); - TimestampedValue emptyElement = TimestampedValue.atMinimumTimestamp(null); - PCollection output = - pipeline - .apply( - "CreateMainInput", - Create.timestamped(nonEmptyElement, emptyElement).withCoder(VoidCoder.of())) - .apply("WindowMainInput", Window.into(windowFn)) - .apply( - "OutputSideInput", - ParDo.of( - new DoFn() { - @ProcessElement - public void processElement(ProcessContext c) { - c.output(c.sideInput(view)); - } - }) - .withSideInputs(view)); - - PAssert.that(output).containsInAnyOrder(4, 0); - PAssert.that(output) - .inWindow(windowFn.assignWindow(nonEmptyElement.getTimestamp())) - .containsInAnyOrder(4); - PAssert.that(output) - .inWindow(windowFn.assignWindow(emptyElement.getTimestamp())) - .containsInAnyOrder(0); - pipeline.run(); - } - - @Test - public void testCombineGetName() { - assertEquals("Combine.globally(SumInts)", Combine.globally(new SumInts()).getName()); - assertEquals( - "Combine.GloballyAsSingletonView", - Combine.globally(new SumInts()).asSingletonView().getName()); - assertEquals("Combine.perKey(Test)", Combine.perKey(new TestCombineFn()).getName()); - assertEquals( - "Combine.perKeyWithFanout(Test)", - Combine.perKey(new TestCombineFn()).withHotKeyFanout(10).getName()); - } - - @Test - public void testDisplayData() { - UniqueInts combineFn = new UniqueInts() { - @Override - public void populateDisplayData(DisplayData.Builder builder) { - builder.add(DisplayData.item("fnMetadata", "foobar")); - } - }; - Combine.Globally combine = Combine.globally(combineFn) - .withFanout(1234); - DisplayData displayData = DisplayData.from(combine); - - assertThat(displayData, hasDisplayItem("combineFn", combineFn.getClass())); - assertThat(displayData, hasDisplayItem("emitDefaultOnEmptyInput", true)); - assertThat(displayData, hasDisplayItem("fanout", 1234)); - assertThat(displayData, includesDisplayDataFor("combineFn", combineFn)); - } - - @Test - public void testDisplayDataForWrappedFn() { - UniqueInts combineFn = new UniqueInts() { - @Override - public void populateDisplayData(DisplayData.Builder builder) { - builder.add(DisplayData.item("foo", "bar")); - } - }; - Combine.PerKey combine = Combine.perKey(combineFn); - DisplayData displayData = DisplayData.from(combine); - - assertThat(displayData, hasDisplayItem("combineFn", combineFn.getClass())); - assertThat(displayData, hasDisplayItem(hasNamespace(combineFn.getClass()))); - } - - @Test - @Category(ValidatesRunner.class) - public void testCombinePerKeyPrimitiveDisplayData() { - DisplayDataEvaluator evaluator = DisplayDataEvaluator.create(); - - CombineTest.UniqueInts combineFn = new CombineTest.UniqueInts(); - PTransform>, ? extends POutput> combine = - Combine.perKey(combineFn); - - Set displayData = evaluator.displayDataForPrimitiveTransforms(combine, - KvCoder.of(VarIntCoder.of(), VarIntCoder.of())); - - assertThat("Combine.perKey should include the combineFn in its primitive transform", - displayData, hasItem(hasDisplayItem("combineFn", combineFn.getClass()))); - } - - @Test - @Category(ValidatesRunner.class) - public void testCombinePerKeyWithHotKeyFanoutPrimitiveDisplayData() { - int hotKeyFanout = 2; - DisplayDataEvaluator evaluator = DisplayDataEvaluator.create(); - - CombineTest.UniqueInts combineFn = new CombineTest.UniqueInts(); - PTransform>, PCollection>>> combine = - Combine.>perKey(combineFn).withHotKeyFanout(hotKeyFanout); - - Set displayData = evaluator.displayDataForPrimitiveTransforms(combine, - KvCoder.of(VarIntCoder.of(), VarIntCoder.of())); - - assertThat("Combine.perKey.withHotKeyFanout should include the combineFn in its primitive " - + "transform", displayData, hasItem(hasDisplayItem("combineFn", combineFn.getClass()))); - assertThat("Combine.perKey.withHotKeyFanout(int) should include the fanout in its primitive " - + "transform", displayData, hasItem(hasDisplayItem("fanout", hotKeyFanout))); - } //////////////////////////////////////////////////////////////////////////// // Test classes, for different kinds of combining fns. @@ -1254,115 +1383,4 @@ public int sum(Iterable integers) { return sum; } } - - /** - * Tests creation of a global {@link Combine} via Java 8 lambda. - */ - @Test - @Category(NeedsRunner.class) - public void testCombineGloballyLambda() { - - PCollection output = pipeline - .apply(Create.of(1, 2, 3, 4)) - .apply(Combine.globally(integers -> { - int sum = 0; - for (int i : integers) { - sum += i; - } - return sum; - })); - - PAssert.that(output).containsInAnyOrder(10); - pipeline.run(); - } - - /** - * Tests creation of a global {@link Combine} via a Java 8 method reference. - */ - @Test - @Category(NeedsRunner.class) - public void testCombineGloballyInstanceMethodReference() { - - PCollection output = pipeline - .apply(Create.of(1, 2, 3, 4)) - .apply(Combine.globally(new Summer()::sum)); - - PAssert.that(output).containsInAnyOrder(10); - pipeline.run(); - } - - /** - * Tests creation of a per-key {@link Combine} via a Java 8 lambda. - */ - @Test - @Category(NeedsRunner.class) - public void testCombinePerKeyLambda() { - - PCollection> output = pipeline - .apply(Create.of(KV.of("a", 1), KV.of("b", 2), KV.of("a", 3), KV.of("c", 4))) - .apply(Combine.perKey(integers -> { - int sum = 0; - for (int i : integers) { - sum += i; - } - return sum; - })); - - PAssert.that(output).containsInAnyOrder( - KV.of("a", 4), - KV.of("b", 2), - KV.of("c", 4)); - pipeline.run(); - } - - /** - * Tests creation of a per-key {@link Combine} via a Java 8 method reference. - */ - @Test - @Category(NeedsRunner.class) - public void testCombinePerKeyInstanceMethodReference() { - - PCollection> output = pipeline - .apply(Create.of(KV.of("a", 1), KV.of("b", 2), KV.of("a", 3), KV.of("c", 4))) - .apply(Combine.perKey(new Summer()::sum)); - - PAssert.that(output).containsInAnyOrder( - KV.of("a", 4), - KV.of("b", 2), - KV.of("c", 4)); - pipeline.run(); - } - - /** - * Tests that we can serialize {@link Combine.CombineFn CombineFns} constructed from a lambda. - * Lambdas can be problematic because the {@link Class} object is synthetic and cannot be - * deserialized. - */ - @Test - public void testLambdaSerialization() { - SerializableFunction, Object> combiner = xs -> Iterables.getFirst(xs, 0); - - boolean lambdaClassSerializationThrows; - try { - SerializableUtils.clone(combiner.getClass()); - lambdaClassSerializationThrows = false; - } catch (IllegalArgumentException e) { - // Expected - lambdaClassSerializationThrows = true; - } - Assume.assumeTrue("Expected lambda class serialization to fail. " - + "If it's fixed, we can remove special behavior in Combine.", - lambdaClassSerializationThrows); - - - Combine.Globally combine = Combine.globally(combiner); - SerializableUtils.clone(combine); // should not throw. - } - - @Test - public void testLambdaDisplayData() { - Combine.Globally combine = Combine.globally(xs -> Iterables.getFirst(xs, 0)); - DisplayData displayData = DisplayData.from(combine); - MatcherAssert.assertThat(displayData.items(), not(empty())); - } }