From bd0bbdf725cc76121db222e29317a40c69ed8dfa Mon Sep 17 00:00:00 2001 From: Kenneth Knowles Date: Wed, 8 Jun 2016 15:07:52 -0700 Subject: [PATCH 1/7] Base PAssert on GBK instead of side inputs Previously PAssert - hence all RunnableOnService/NeedsRunner tests - required side input support. This created a very steep on ramp for new runners. GroupByKey is a bit more fundamental and most backends will be able to group by key in the global window very quickly. So switching the primitive used to gather all the contents of a PCollection for assertions should make it a bit easier to get early feedback during runner development. --- .../testing/TestDataflowPipelineRunner.java | 3 +- .../org/apache/beam/sdk/testing/PAssert.java | 693 +++++++++--------- .../apache/beam/sdk/testing/PAssertTest.java | 27 - 3 files changed, 333 insertions(+), 390 deletions(-) diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowPipelineRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowPipelineRunner.java index 3e8d9033cc2f..c940e9a5b7d0 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowPipelineRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowPipelineRunner.java @@ -166,7 +166,8 @@ public Optional call() throws Exception { public OutputT apply( PTransform transform, InputT input) { if (transform instanceof PAssert.OneSideInputAssert - || transform instanceof PAssert.TwoSideInputAssert) { + || transform instanceof PAssert.GroupThenAssert + || transform instanceof PAssert.GroupThenAssertForSingleton) { expectedNumberOfAssertions += 1; } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java index c2cd598cce0f..b708d36894eb 100644 --- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java @@ -34,11 +34,14 @@ import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.GroupByKey; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.transforms.Sum; +import org.apache.beam.sdk.transforms.Values; import org.apache.beam.sdk.transforms.View; +import org.apache.beam.sdk.transforms.WithKeys; import org.apache.beam.sdk.transforms.windowing.GlobalWindows; import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.util.CoderUtils; @@ -48,13 +51,13 @@ import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.PDone; -import com.google.common.base.Optional; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; import java.io.Serializable; import java.util.Arrays; import java.util.Collection; @@ -64,16 +67,12 @@ import java.util.NoSuchElementException; /** - * An assertion on the contents of a {@link PCollection} - * incorporated into the pipeline. Such an assertion - * can be checked no matter what kind of {@link PipelineRunner} is - * used. + * An assertion on the contents of a {@link PCollection} incorporated into the pipeline. Such an + * assertion can be checked no matter what kind of {@link PipelineRunner} is used. * - *

Note that the {@code PAssert} call must precede the call - * to {@link Pipeline#run}. + *

Note that the {@code PAssert} call must precede the call to {@link Pipeline#run}. * - *

Examples of use: - *

{@code
+ * 

Examples of use:

{@code
  * Pipeline p = TestPipeline.create();
  * ...
  * PCollection output =
@@ -107,23 +106,42 @@ public class PAssert {
   private PAssert() {}
 
   /**
-   * Constructs an {@link IterableAssert} for the elements of the provided
-   * {@link PCollection}.
+   * Builder interface for assertions applicable to iterables and PCollection contents.
+   */
+  public interface IterableAssert {
+    IterableAssert containsInAnyOrder(T... expectedElements);
+
+    IterableAssert containsInAnyOrder(Iterable expectedElements);
+
+    IterableAssert empty();
+
+    IterableAssert satisfies(SerializableFunction, Void> checkerFn);
+  }
+
+  /**
+   * Builder interface for assertions applicable to a single value.
+   */
+  public interface SingletonAssert {
+    SingletonAssert isEqualTo(T expected);
+
+    SingletonAssert notEqualTo(T notExpected);
+
+    SingletonAssert satisfies(SerializableFunction checkerFn);
+  }
+
+  /**
+   * Constructs an {@link IterableAssert} for the elements of the provided {@link PCollection}.
    */
   public static  IterableAssert that(PCollection actual) {
-    return new IterableAssert<>(
-        new CreateActual>(actual, View.asIterable()),
-         actual.getPipeline())
-         .setCoder(actual.getCoder());
+    return new PCollectionContentsAssert<>(actual);
   }
 
   /**
-   * Constructs an {@link IterableAssert} for the value of the provided
-   * {@link PCollection} which must contain a single {@code Iterable}
-   * value.
+   * Constructs an {@link IterableAssert} for the value of the provided {@link PCollection} which
+   * must contain a single {@code Iterable} value.
    */
-  public static  IterableAssert
-      thatSingletonIterable(PCollection> actual) {
+  public static  IterableAssert thatSingletonIterable(
+      PCollection> actual) {
 
     List> maybeElementCoder = actual.getCoder().getCoderArguments();
     Coder tCoder;
@@ -141,19 +159,7 @@ public static  IterableAssert that(PCollection actual) {
     @SuppressWarnings("unchecked") // Safe covariant cast
     PCollection> actualIterables = (PCollection>) actual;
 
-    return new IterableAssert<>(
-        new CreateActual, Iterable>(
-            actualIterables, View.>asSingleton()),
-        actual.getPipeline())
-        .setCoder(tCoder);
-  }
-
-  /**
-   * Constructs an {@link IterableAssert} for the value of the provided
-   * {@code PCollectionView PCollectionView<Iterable<T>>}.
-   */
-  public static  IterableAssert thatIterable(PCollectionView> actual) {
-    return new IterableAssert<>(new PreExisting>(actual), actual.getPipeline());
+    return new PCollectionSingletonIterableAssert<>(actualIterables);
   }
 
   /**
@@ -161,92 +167,102 @@ public static  IterableAssert thatIterable(PCollectionView> ac
   * {@code PCollection PCollection<T>}, which must be a singleton.
    */
   public static  SingletonAssert thatSingleton(PCollection actual) {
-    return new SingletonAssert<>(
-        new CreateActual(actual, View.asSingleton()), actual.getPipeline())
-        .setCoder(actual.getCoder());
+    return new PCollectionViewAssert<>(actual, View.asSingleton(), actual.getCoder());
   }
 
   /**
    * Constructs a {@link SingletonAssert} for the value of the provided {@link PCollection}.
    *
-   * 

Note that the actual value must be coded by a {@link KvCoder}, - * not just any {@code Coder}. + *

Note that the actual value must be coded by a {@link KvCoder}, not just any + * {@code Coder}. */ - public static SingletonAssert>> - thatMultimap(PCollection> actual) { + public static SingletonAssert>> thatMultimap( + PCollection> actual) { @SuppressWarnings("unchecked") KvCoder kvCoder = (KvCoder) actual.getCoder(); - - return new SingletonAssert<>( - new CreateActual<>(actual, View.asMultimap()), actual.getPipeline()) - .setCoder(MapCoder.of(kvCoder.getKeyCoder(), IterableCoder.of(kvCoder.getValueCoder()))); + return new PCollectionViewAssert<>( + actual, + View.asMultimap(), + MapCoder.of(kvCoder.getKeyCoder(), IterableCoder.of(kvCoder.getValueCoder()))); } /** - * Constructs a {@link SingletonAssert} for the value of the provided {@link PCollection}, - * which must have at most one value per key. + * Constructs a {@link SingletonAssert} for the value of the provided {@link PCollection}, which + * must have at most one value per key. * - *

Note that the actual value must be coded by a {@link KvCoder}, - * not just any {@code Coder}. + *

Note that the actual value must be coded by a {@link KvCoder}, not just any + * {@code Coder}. */ public static SingletonAssert> thatMap(PCollection> actual) { @SuppressWarnings("unchecked") KvCoder kvCoder = (KvCoder) actual.getCoder(); - - return new SingletonAssert<>( - new CreateActual<>(actual, View.asMap()), actual.getPipeline()) - .setCoder(MapCoder.of(kvCoder.getKeyCoder(), kvCoder.getValueCoder())); + return new PCollectionViewAssert<>( + actual, View.asMap(), MapCoder.of(kvCoder.getKeyCoder(), kvCoder.getValueCoder())); } //////////////////////////////////////////////////////////// /** - * An assertion about the contents of a {@link PCollectionView} yielding an {@code Iterable}. + * An {@link IterableAssert} about the contents of a {@link PCollection}. This does not require + * the runner to support side inputs. */ - public static class IterableAssert implements Serializable { - private final Pipeline pipeline; - private final PTransform>> createActual; - private Optional> coder; + private static class PCollectionContentsAssert implements IterableAssert { + private final PCollection actual; - protected IterableAssert( - PTransform>> createActual, Pipeline pipeline) { - this.createActual = createActual; - this.pipeline = pipeline; - this.coder = Optional.absent(); + public PCollectionContentsAssert(PCollection actual) { + this.actual = actual; } /** - * Sets the coder to use for elements of type {@code T}, as needed for internal purposes. + * Checks that the {@code Iterable} contains the expected elements, in any order. * *

Returns this {@code IterableAssert}. */ - public IterableAssert setCoder(Coder coderOrNull) { - this.coder = Optional.fromNullable(coderOrNull); - return this; + @Override + @SafeVarargs + public final PCollectionContentsAssert containsInAnyOrder(T... expectedElements) { + return containsInAnyOrder(Arrays.asList(expectedElements)); } /** - * Gets the coder, which may yet be absent. + * Checks that the {@code Iterable} contains the expected elements, in any order. + * + *

Returns this {@code IterableAssert}. */ - public Coder getCoder() { - if (coder.isPresent()) { - return coder.get(); - } else { - throw new IllegalStateException( - "Attempting to access the coder of an IterableAssert" - + " that has not been set yet."); - } + public PCollectionContentsAssert containsInAnyOrder(Iterable expectedElements) { + return satisfies(new AssertContainsInAnyOrderRelation(), expectedElements); } /** - * Applies a {@link SerializableFunction} to check the elements of the {@code Iterable}. + * Checks that the {@code Iterable} contains elements that match the provided matchers, in any + * order. * *

Returns this {@code IterableAssert}. */ - public IterableAssert satisfies(SerializableFunction, Void> checkerFn) { - pipeline.apply( - "PAssert$" + (assertCount++), - new OneSideInputAssert>(createActual, checkerFn)); + @SafeVarargs + final PCollectionContentsAssert containsInAnyOrder( + SerializableMatcher... elementMatchers) { + return satisfies(SerializableMatchers.containsInAnyOrder(elementMatchers)); + } + + /** + * Checks that the {@link Iterable} is empty. + * + *

Returns this {@link IterableAssert}. + */ + public PCollectionContentsAssert empty() { + return containsInAnyOrder(Collections.emptyList()); + } + + /** + * Applies a {@link SerializableFunction} to check the elements of the {@link PCollection}. + * + *

Returns this {@link IterableAssert}. + */ + @Override + public PCollectionContentsAssert satisfies( + SerializableFunction, Void> checkerFn) { + actual.apply("PAssert$" + (assertCount++), new GroupThenAssert<>(checkerFn)); return this; } @@ -255,17 +271,11 @@ public IterableAssert satisfies(SerializableFunction, Void> check * *

Returns this {@code IterableAssert}. */ - public IterableAssert satisfies( - AssertRelation, Iterable> relation, - final Iterable expectedElements) { - pipeline.apply( - "PAssert$" + (assertCount++), - new TwoSideInputAssert, Iterable>( - createActual, - new CreateExpected>(expectedElements, coder, View.asIterable()), - relation)); - - return this; + private PCollectionContentsAssert satisfies( + AssertRelation, Iterable> relation, Iterable expectedElements) { + return satisfies( + new CheckRelationAgainstExpected>( + relation, expectedElements, IterableCoder.of(actual.getCoder()))); } /** @@ -273,15 +283,14 @@ public IterableAssert satisfies( * *

Returns this {@code IterableAssert}. */ - IterableAssert satisfies(final SerializableMatcher> matcher) { + PCollectionContentsAssert satisfies( + final SerializableMatcher> matcher) { // Safe covariant cast. Could be elided by changing a lot of this file to use // more flexible bounds. @SuppressWarnings({"rawtypes", "unchecked"}) SerializableFunction, Void> checkerFn = - (SerializableFunction) new MatcherCheckerFn<>(matcher); - pipeline.apply( - "PAssert$" + (assertCount++), - new OneSideInputAssert>(createActual, checkerFn)); + (SerializableFunction) new MatcherCheckerFn<>(matcher); + actual.apply("PAssert$" + (assertCount++), new GroupThenAssert<>(checkerFn)); return this; } @@ -299,20 +308,10 @@ public Void apply(T actual) { } } - /** - * Checks that the {@code Iterable} is empty. - * - *

Returns this {@code IterableAssert}. - */ - public IterableAssert empty() { - return satisfies(new AssertContainsInAnyOrderRelation(), Collections.emptyList()); - } - /** * @throws UnsupportedOperationException always - * @deprecated {@link Object#equals(Object)} is not supported on PAssert objects. - * If you meant to test object equality, use a variant of {@link #containsInAnyOrder} - * instead. + * @deprecated {@link Object#equals(Object)} is not supported on PAssert objects. If you meant + * to test object equality, use a variant of {@link #containsInAnyOrder} instead. */ @Deprecated @Override @@ -331,169 +330,140 @@ public int hashCode() { throw new UnsupportedOperationException( String.format("%s.hashCode() is not supported.", IterableAssert.class.getSimpleName())); } + } - /** - * Checks that the {@code Iterable} contains the expected elements, in any - * order. - * - *

Returns this {@code IterableAssert}. - */ - public IterableAssert containsInAnyOrder(Iterable expectedElements) { - return satisfies(new AssertContainsInAnyOrderRelation(), expectedElements); - } + /** + * An {@link IterableAssert} for an iterable that is the sole element of a {@link PCollection}. + * This does not require the runner to support side inputs. + */ + private static class PCollectionSingletonIterableAssert implements IterableAssert { + private final PCollection> actual; + private final Coder elementCoder; - /** - * Checks that the {@code Iterable} contains the expected elements, in any - * order. - * - *

Returns this {@code IterableAssert}. - */ - @SafeVarargs - public final IterableAssert containsInAnyOrder(T... expectedElements) { - return satisfies( - new AssertContainsInAnyOrderRelation(), - Arrays.asList(expectedElements)); + public PCollectionSingletonIterableAssert(PCollection> actual) { + this.actual = actual; + this.elementCoder = (Coder) actual.getCoder().getCoderArguments().get(0); } - /** - * Checks that the {@code Iterable} contains elements that match the provided matchers, - * in any order. - * - *

Returns this {@code IterableAssert}. - */ - @SafeVarargs - final IterableAssert containsInAnyOrder( - SerializableMatcher... elementMatchers) { - return satisfies(SerializableMatchers.containsInAnyOrder(elementMatchers)); + @Override + public PCollectionSingletonIterableAssert containsInAnyOrder(T... expectedElements) { + return containsInAnyOrder(Arrays.asList(expectedElements)); } - } - /** - * An assertion about the single value of type {@code T} - * associated with a {@link PCollectionView}. - */ - public static class SingletonAssert implements Serializable { - private final Pipeline pipeline; - private final CreateActual createActual; - private Optional> coder; - - protected SingletonAssert( - CreateActual createActual, Pipeline pipeline) { - this.pipeline = pipeline; - this.createActual = createActual; - this.coder = Optional.absent(); + @Override + public PCollectionSingletonIterableAssert empty() { + return containsInAnyOrder(Collections.emptyList()); } - /** - * Always throws an {@link UnsupportedOperationException}: users are probably looking for - * {@link #isEqualTo}. - */ - @Deprecated @Override - public boolean equals(Object o) { - throw new UnsupportedOperationException( - String.format( - "tests for Java equality of the %s object, not the PCollection in question. " - + "Call a test method, such as isEqualTo.", - getClass().getSimpleName())); + public PCollectionSingletonIterableAssert containsInAnyOrder(Iterable expectedElements) { + return satisfies(new AssertContainsInAnyOrderRelation(), expectedElements); } - /** - * @throws UnsupportedOperationException always. - * @deprecated {@link Object#hashCode()} is not supported on PAssert objects. 
- */ - @Deprecated @Override - public int hashCode() { - throw new UnsupportedOperationException( - String.format("%s.hashCode() is not supported.", SingletonAssert.class.getSimpleName())); + public PCollectionSingletonIterableAssert satisfies( + SerializableFunction, Void> checkerFn) { + actual.apply("PAssert$" + (assertCount++), new GroupThenAssertForSingleton<>(checkerFn)); + return this; } - /** - * Sets the coder to use for elements of type {@code T}, as needed - * for internal purposes. - * - *

Returns this {@code IterableAssert}. - */ - public SingletonAssert setCoder(Coder coderOrNull) { - this.coder = Optional.fromNullable(coderOrNull); - return this; + private PCollectionSingletonIterableAssert satisfies( + AssertRelation, Iterable> relation, Iterable expectedElements) { + return satisfies( + new CheckRelationAgainstExpected>( + relation, expectedElements, IterableCoder.of(elementCoder))); } + } - /** - * Gets the coder, which may yet be absent. - */ - public Coder getCoder() { - if (coder.isPresent()) { - return coder.get(); - } else { - throw new IllegalStateException( - "Attempting to access the coder of an IterableAssert that has not been set yet."); - } + /** + * An assertion about the contents of a {@link PCollection} when it is viewed as a single value + * of type {@code ViewT}. This requires side input support from the runner. + */ + private static class PCollectionViewAssert implements SingletonAssert { + private final PCollection actual; + private final PTransform, PCollectionView> view; + private final Coder coder; + + protected PCollectionViewAssert( + PCollection actual, + PTransform, PCollectionView> view, + Coder coder) { + this.actual = actual; + this.view = view; + this.coder = coder; } /** - * Applies a {@link SerializableFunction} to check the value of this - * {@code SingletonAssert}'s view. + * Checks that the value of this {@code SingletonAssert}'s view is equal to the expected value. * *

Returns this {@code SingletonAssert}. */ - public SingletonAssert satisfies(SerializableFunction checkerFn) { - pipeline.apply( - "PAssert$" + (assertCount++), new OneSideInputAssert(createActual, checkerFn)); - return this; + public PCollectionViewAssert isEqualTo(ViewT expectedValue) { + return satisfies(new AssertIsEqualToRelation(), expectedValue); } /** - * Applies an {@link AssertRelation} to check the provided relation against the - * value of this assert and the provided expected value. + * Checks that the value of this {@code SingletonAssert}'s view is not equal to the expected + * value. * *

Returns this {@code SingletonAssert}. */ - public SingletonAssert satisfies( - AssertRelation relation, - final T expectedValue) { - pipeline.apply( - "PAssert$" + (assertCount++), - new TwoSideInputAssert( - createActual, - new CreateExpected(Arrays.asList(expectedValue), coder, View.asSingleton()), - relation)); - - return this; + public PCollectionViewAssert notEqualTo(ViewT expectedValue) { + return satisfies(new AssertNotEqualToRelation(), expectedValue); } /** - * Checks that the value of this {@code SingletonAssert}'s view is equal - * to the expected value. + * Applies a {@link SerializableFunction} to check the value of this {@code SingletonAssert}'s + * view. * *

Returns this {@code SingletonAssert}. */ - public SingletonAssert isEqualTo(T expectedValue) { - return satisfies(new AssertIsEqualToRelation(), expectedValue); + @Override + public PCollectionViewAssert satisfies( + SerializableFunction checkerFn) { + actual + .getPipeline() + .apply( + "PAssert$" + (assertCount++), + new OneSideInputAssert(CreateActual.from(actual, view), checkerFn)); + return this; } /** - * Checks that the value of this {@code SingletonAssert}'s view is not equal - * to the expected value. + * Applies an {@link AssertRelation} to check the provided relation against the value of this + * assert and the provided expected value. * *

Returns this {@code SingletonAssert}. */ - public SingletonAssert notEqualTo(T expectedValue) { - return satisfies(new AssertNotEqualToRelation(), expectedValue); + private PCollectionViewAssert satisfies( + AssertRelation relation, final ViewT expectedValue) { + return satisfies(new CheckRelationAgainstExpected(relation, expectedValue, coder)); } /** - * Checks that the value of this {@code SingletonAssert}'s view is equal to - * the expected value. - * - * @deprecated replaced by {@link #isEqualTo} + * Always throws an {@link UnsupportedOperationException}: users are probably looking for + * {@link #isEqualTo}. */ @Deprecated - public SingletonAssert is(T expectedValue) { - return isEqualTo(expectedValue); + @Override + public boolean equals(Object o) { + throw new UnsupportedOperationException( + String.format( + "tests for Java equality of the %s object, not the PCollection in question. " + + "Call a test method, such as isEqualTo.", + getClass().getSimpleName())); } + /** + * @throws UnsupportedOperationException always. + * @deprecated {@link Object#hashCode()} is not supported on {@link PAssert} objects. 
+ */ + @Deprecated + @Override + public int hashCode() { + throw new UnsupportedOperationException( + String.format("%s.hashCode() is not supported.", SingletonAssert.class.getSimpleName())); + } } //////////////////////////////////////////////////////////////////////// @@ -504,8 +474,13 @@ private static class CreateActual private final transient PCollection actual; private final transient PTransform, PCollectionView> actualView; - private CreateActual(PCollection actual, - PTransform, PCollectionView> actualView) { + public static CreateActual from( + PCollection actual, PTransform, PCollectionView> actualView) { + return new CreateActual<>(actual, actualView); + } + + private CreateActual( + PCollection actual, PTransform, PCollectionView> actualView) { this.actual = actual; this.actualView = actualView; } @@ -515,73 +490,146 @@ public PCollectionView apply(PBegin input) { final Coder coder = actual.getCoder(); return actual .apply(Window.into(new GlobalWindows())) - .apply(ParDo.of(new DoFn() { - @Override - public void processElement(ProcessContext context) throws CoderException { - context.output(CoderUtils.clone(coder, context.element())); - } - })) + .apply( + ParDo.of( + new DoFn() { + @Override + public void processElement(ProcessContext context) throws CoderException { + context.output(CoderUtils.clone(coder, context.element())); + } + })) .apply(actualView); } } - private static class CreateExpected - extends PTransform> { - - private final Iterable elements; - private final Optional> coder; - private final transient PTransform, PCollectionView> view; + /** + * A partially applied {@link AssertRelation}, where one value is provided along with a coder to + * serialize/deserialize them. 
+ */ + private static class CheckRelationAgainstExpected implements SerializableFunction { + private final AssertRelation relation; + private final byte[] encodedExpected; + private final Coder coder; - private CreateExpected(Iterable elements, Optional> coder, - PTransform, PCollectionView> view) { - this.elements = elements; + public CheckRelationAgainstExpected(AssertRelation relation, T expected, Coder coder) { + this.relation = relation; this.coder = coder; - this.view = view; + + try { + this.encodedExpected = CoderUtils.encodeToByteArray(coder, expected); + } catch (IOException coderException) { + throw new RuntimeException(coderException); + } } @Override - public PCollectionView apply(PBegin input) { - Create.Values createTransform = Create.of(elements); - if (coder.isPresent()) { - createTransform = createTransform.withCoder(coder.get()); + public Void apply(T actual) { + try { + T expected = CoderUtils.decodeFromByteArray(coder, encodedExpected); + return relation.assertFor(expected).apply(actual); + } catch (IOException coderException) { + throw new RuntimeException(coderException); } - return input.apply(createTransform).apply(view); } } - private static class PreExisting extends PTransform> { + /** + * A transform that gathers the contents of a {@link PCollection} into a single main input + * iterable in the global window. This requires a runner to support {@link GroupByKey} in the + * global window, but not side inputs or other windowing or triggers. 
+ */ + private static class GroupGlobally extends PTransform, PCollection>> + implements Serializable { - private final PCollectionView view; + public GroupGlobally() {} - private PreExisting(PCollectionView view) { - this.view = view; + @Override + public PCollection> apply(PCollection input) { + return input + .apply("GloballyWindow", Window.into(new GlobalWindows())) + .apply("DummyKey", WithKeys.of(0)) + .apply("GroupByKey", GroupByKey.create()) + .apply("GetOnlyValue", Values.>create()); + } + } + + /** + * A transform that applies an assertion-checking function over iterables of {@code ActualT} to + * the entirety of the contents of its input. + */ + public static class GroupThenAssert extends PTransform, PDone> + implements Serializable { + private final SerializableFunction, Void> checkerFn; + + private GroupThenAssert(SerializableFunction, Void> checkerFn) { + this.checkerFn = checkerFn; } @Override - public PCollectionView apply(PBegin input) { - return view; + public PDone apply(PCollection input) { + input + .apply("GroupGlobally", new GroupGlobally()) + .apply( + "RunChecks", + ParDo.of( + new DoFn, Void>() { + @Override + public void processElement(ProcessContext context) { + checkerFn.apply(context.element()); + } + })); + + return PDone.in(input.getPipeline()); } } /** - * An assertion checker that takes a single - * {@link PCollectionView PCollectionView<ActualT>} - * and an assertion over {@code ActualT}, and checks it within a dataflow - * pipeline. + * A transform that applies an assertion-checking function to a single iterable contained as the + * sole element of a {@link PCollection}. 
+ */ + public static class GroupThenAssertForSingleton + extends PTransform>, PDone> implements Serializable { + private final SerializableFunction, Void> checkerFn; + + private GroupThenAssertForSingleton(SerializableFunction, Void> checkerFn) { + this.checkerFn = checkerFn; + } + + @Override + public PDone apply(PCollection> input) { + input + .apply("GroupGlobally", new GroupGlobally>()) + .apply( + "RunChecks", + ParDo.of( + new DoFn>, Void>() { + @Override + public void processElement(ProcessContext context) { + checkerFn.apply(Iterables.getOnlyElement(context.element())); + } + })); + + return PDone.in(input.getPipeline()); + } + } + + /** + * An assertion checker that takes a single {@link PCollectionView + * PCollectionView<ActualT>} and an assertion over {@code ActualT}, and checks it within a + * dataflow pipeline. * - *

Note that the entire assertion must be serializable. If - * you need to make assertions involving multiple inputs - * that are each not serializable, use TwoSideInputAssert. + *

Note that the entire assertion must be serializable. If you need to make assertions + * involving multiple inputs that are each not serializable, use TwoSideInputAssert. * - *

This is generally useful for assertion functions that - * are serializable but whose underlying data may not have a coder. + *

This is generally useful for assertion functions that are serializable but whose underlying + * data may not have a coder. */ - public static class OneSideInputAssert - extends PTransform implements Serializable { + public static class OneSideInputAssert extends PTransform + implements Serializable { private final transient PTransform> createActual; private final SerializableFunction checkerFn; - public OneSideInputAssert( + private OneSideInputAssert( PTransform> createActual, SerializableFunction checkerFn) { this.createActual = createActual; @@ -594,16 +642,18 @@ public PDone apply(PBegin input) { input .apply(Create.of(0).withCoder(VarIntCoder.of())) - .apply(ParDo.named("RunChecks").withSideInputs(actual) - .of(new CheckerDoFn<>(checkerFn, actual))); + .apply( + ParDo.named("RunChecks") + .withSideInputs(actual) + .of(new CheckerDoFn<>(checkerFn, actual))); return PDone.in(input.getPipeline()); } } /** - * A {@link DoFn} that runs a checking {@link SerializableFunction} on the contents of - * a {@link PCollectionView}, and adjusts counters and thrown exceptions for use in testing. + * A {@link DoFn} that runs a checking {@link SerializableFunction} on the contents of a + * {@link PCollectionView}, and adjusts counters and thrown exceptions for use in testing. * *

The input is ignored, but is {@link Integer} to be usable on runners that do not support * null values. @@ -617,8 +667,7 @@ private static class CheckerDoFn extends DoFn { private final PCollectionView actual; private CheckerDoFn( - SerializableFunction checkerFn, - PCollectionView actual) { + SerializableFunction checkerFn, PCollectionView actual) { this.checkerFn = checkerFn; this.actual = actual; } @@ -640,88 +689,11 @@ public void processElement(ProcessContext c) { } } - /** - * An assertion checker that takes a {@link PCollectionView PCollectionView<ActualT>}, - * a {@link PCollectionView PCollectionView<ExpectedT>}, a relation - * over {@code A} and {@code B}, and checks that the relation holds - * within a dataflow pipeline. - * - *

This is useful when either/both of {@code A} and {@code B} - * are not serializable, but have coders (provided - * by the underlying {@link PCollection}s). - */ - public static class TwoSideInputAssert - extends PTransform implements Serializable { - - private final transient PTransform> createActual; - private final transient PTransform> createExpected; - private final AssertRelation relation; - - protected TwoSideInputAssert( - PTransform> createActual, - PTransform> createExpected, - AssertRelation relation) { - this.createActual = createActual; - this.createExpected = createExpected; - this.relation = relation; - } - - @Override - public PDone apply(PBegin input) { - final PCollectionView actual = input.apply("CreateActual", createActual); - final PCollectionView expected = input.apply("CreateExpected", createExpected); - - input - .apply(Create.of(0).withCoder(VarIntCoder.of())) - .apply("RunChecks", ParDo.withSideInputs(actual, expected) - .of(new CheckerDoFn<>(relation, actual, expected))); - - return PDone.in(input.getPipeline()); - } - - /** - * Input is ignored, but is {@link Integer} for runners that do not support null values. 
- */ - private static class CheckerDoFn extends DoFn { - private final Aggregator success = - createAggregator(SUCCESS_COUNTER, new Sum.SumIntegerFn()); - private final Aggregator failure = - createAggregator(FAILURE_COUNTER, new Sum.SumIntegerFn()); - private final AssertRelation relation; - private final PCollectionView actual; - private final PCollectionView expected; - - private CheckerDoFn(AssertRelation relation, - PCollectionView actual, PCollectionView expected) { - this.relation = relation; - this.actual = actual; - this.expected = expected; - } - - @Override - public void processElement(ProcessContext c) { - try { - ActualT actualContents = c.sideInput(actual); - ExpectedT expectedContents = c.sideInput(expected); - relation.assertFor(expectedContents).apply(actualContents); - success.addValue(1); - } catch (Throwable t) { - LOG.error("PAssert failed expectations.", t); - failure.addValue(1); - // TODO: allow for metrics to propagate on failure when running a streaming pipeline - if (!c.getPipelineOptions().as(StreamingOptions.class).isStreaming()) { - throw t; - } - } - } - } - } - ///////////////////////////////////////////////////////////////////////////// /** - * A {@link SerializableFunction} that verifies that an actual value is equal to an - * expected value. + * A {@link SerializableFunction} that verifies that an actual value is equal to an expected + * value. */ private static class AssertIsEqualTo implements SerializableFunction { private T expected; @@ -738,8 +710,8 @@ public Void apply(T actual) { } /** - * A {@link SerializableFunction} that verifies that an actual value is not equal to an - * expected value. + * A {@link SerializableFunction} that verifies that an actual value is not equal to an expected + * value. 
*/ private static class AssertNotEqualTo implements SerializableFunction { private T expected; @@ -756,8 +728,8 @@ public Void apply(T actual) { } /** - * A {@link SerializableFunction} that verifies that an {@code Iterable} contains - * expected items in any order. + * A {@link SerializableFunction} that verifies that an {@code Iterable} contains expected items + * in any order. */ private static class AssertContainsInAnyOrder implements SerializableFunction, Void> { @@ -787,10 +759,9 @@ public Void apply(Iterable actual) { //////////////////////////////////////////////////////////// /** - * A binary predicate between types {@code Actual} and {@code Expected}. - * Implemented as a method {@code assertFor(Expected)} which returns - * a {@code SerializableFunction} - * that should verify the assertion.. + * A binary predicate between types {@code Actual} and {@code Expected}. Implemented as a method + * {@code assertFor(Expected)} which returns a {@code SerializableFunction} that + * should verify the assertion.. */ private static interface AssertRelation extends Serializable { public SerializableFunction assertFor(ExpectedT input); @@ -799,8 +770,7 @@ private static interface AssertRelation extends Serializable /** * An {@link AssertRelation} implementing the binary predicate that two objects are equal. */ - private static class AssertIsEqualToRelation - implements AssertRelation { + private static class AssertIsEqualToRelation implements AssertRelation { @Override public SerializableFunction assertFor(T expected) { return new AssertIsEqualTo(expected); @@ -810,8 +780,7 @@ public SerializableFunction assertFor(T expected) { /** * An {@link AssertRelation} implementing the binary predicate that two objects are not equal. 
*/ - private static class AssertNotEqualToRelation - implements AssertRelation { + private static class AssertNotEqualToRelation implements AssertRelation { @Override public SerializableFunction assertFor(T expected) { return new AssertNotEqualTo(expected); diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java index f540948267ec..fdc871905677 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java @@ -17,9 +17,6 @@ */ package org.apache.beam.sdk.testing; -import static org.apache.beam.sdk.testing.SerializableMatchers.anything; -import static org.apache.beam.sdk.testing.SerializableMatchers.not; - import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -150,30 +147,6 @@ public Void apply(Iterable contents) { pipeline.run(); } - /** - * Basic test of succeeding {@link PAssert} using a {@link SerializableMatcher}. - */ - @Test - @Category(RunnableOnService.class) - public void testBasicMatcherSuccess() throws Exception { - Pipeline pipeline = TestPipeline.create(); - PCollection pcollection = pipeline.apply(Create.of(42)); - PAssert.that(pcollection).containsInAnyOrder(anything()); - pipeline.run(); - } - - /** - * Basic test of failing {@link PAssert} using a {@link SerializableMatcher}. - */ - @Test - @Category(RunnableOnService.class) - public void testBasicMatcherFailure() throws Exception { - Pipeline pipeline = TestPipeline.create(); - PCollection pcollection = pipeline.apply(Create.of(42)); - PAssert.that(pcollection).containsInAnyOrder(not(anything())); - runExpectingAssertionFailure(pipeline); - } - /** * Test that we throw an error at pipeline construction time when the user mistakenly uses * {@code PAssert.thatSingleton().equals()} instead of the test method {@code .isEqualTo}. 
From 1707d2eecf184a2c1a68105e4cbcca31621d26c7 Mon Sep 17 00:00:00 2001 From: Kenneth Knowles Date: Wed, 8 Jun 2016 16:22:34 -0700 Subject: [PATCH 2/7] Fix TriggerExampleTest --- .../examples/cookbook/TriggerExampleTest.java | 61 +++++++++++++------ 1 file changed, 41 insertions(+), 20 deletions(-) diff --git a/examples/java/src/test/java/org/apache/beam/examples/cookbook/TriggerExampleTest.java b/examples/java/src/test/java/org/apache/beam/examples/cookbook/TriggerExampleTest.java index fe75d147c738..cddce7ff2aa2 100644 --- a/examples/java/src/test/java/org/apache/beam/examples/cookbook/TriggerExampleTest.java +++ b/examples/java/src/test/java/org/apache/beam/examples/cookbook/TriggerExampleTest.java @@ -34,6 +34,8 @@ import org.apache.beam.sdk.values.TimestampedValue; import com.google.api.services.bigquery.model.TableRow; +import com.google.common.base.Joiner; +import com.google.common.collect.Lists; import org.joda.time.Duration; import org.joda.time.Instant; @@ -44,7 +46,9 @@ import org.junit.runners.JUnit4; import java.util.Arrays; +import java.util.Collections; import java.util.List; +import java.util.Map; /** * Unit Tests for {@link TriggerExample}. 
@@ -70,21 +74,27 @@ public class TriggerExampleTest { + "0.001,74.8,1,9,3,0.0028,71,1,9,12,0.0099,97.4,1,9,13,0.0121,50.0,1,,,,,0,,,,,0" + ",,,,,0,,,,,0", new Instant(1))); - private static final TableRow OUT_ROW_1 = new TableRow() - .set("trigger_type", "default") - .set("freeway", "5").set("total_flow", 30) - .set("number_of_records", 1) - .set("isFirst", true).set("isLast", true) - .set("timing", "ON_TIME") - .set("window", "[1970-01-01T00:01:00.000Z..1970-01-01T00:02:00.000Z)"); - - private static final TableRow OUT_ROW_2 = new TableRow() - .set("trigger_type", "default") - .set("freeway", "110").set("total_flow", 90) - .set("number_of_records", 2) - .set("isFirst", true).set("isLast", true) - .set("timing", "ON_TIME") - .set("window", "[1970-01-01T00:00:00.000Z..1970-01-01T00:01:00.000Z)"); + private static final TableRow OUT_ROW_1 = + new TableRow() + .set("trigger_type", "default") + .set("freeway", "5") + .set("total_flow", 30) + .set("number_of_records", 1) + .set("isFirst", true) + .set("isLast", true) + .set("timing", "ON_TIME") + .set("window", "[1970-01-01T00:01:00.000Z..1970-01-01T00:02:00.000Z)"); + + private static final TableRow OUT_ROW_2 = + new TableRow() + .set("trigger_type", "default") + .set("freeway", "110") + .set("total_flow", 90) + .set("number_of_records", 2) + .set("isFirst", true) + .set("isLast", true) + .set("timing", "ON_TIME") + .set("window", "[1970-01-01T00:00:00.000Z..1970-01-01T00:01:00.000Z)"); @Test public void testExtractTotalFlow() throws Exception { @@ -112,15 +122,26 @@ public void testTotalFlow () { .apply(Window.>into(FixedWindows.of(Duration.standardMinutes(1)))) .apply(new TotalFlow("default")); - PCollection results = totalFlow.apply(ParDo.of(new FormatResults())); + PCollection results = totalFlow.apply(ParDo.of(new FormatResults())); - - PAssert.that(results).containsInAnyOrder(OUT_ROW_1, OUT_ROW_2); + PAssert.that(results) + .containsInAnyOrder(canonicalFormat(OUT_ROW_1), canonicalFormat(OUT_ROW_2)); 
pipeline.run(); } - static class FormatResults extends DoFn { + // Sort the fields and toString() the values, since TableRow has a bit of a dynamically + // typed API and equals()/hashCode() are not appropriate for matching in tests + static String canonicalFormat(TableRow row) { + List entries = Lists.newArrayListWithCapacity(row.size()); + for (Map.Entry entry : row.entrySet()) { + entries.add(entry.getKey() + ":" + entry.getValue()); + } + Collections.sort(entries); + return Joiner.on(",").join(entries); + } + + static class FormatResults extends DoFn { @Override public void processElement(ProcessContext c) throws Exception { TableRow element = c.element(); @@ -133,7 +154,7 @@ public void processElement(ProcessContext c) throws Exception { .set("isLast", element.get("isLast")) .set("timing", element.get("timing")) .set("window", element.get("window")); - c.output(row); + c.output(canonicalFormat(row)); } } } From 56b922fd740b51f169b843bb78cc830588511006 Mon Sep 17 00:00:00 2001 From: Kenneth Knowles Date: Wed, 8 Jun 2016 18:11:07 -0700 Subject: [PATCH 3/7] Switch Spark streaming tests to custom assertions The current use of PAssert in the streaming tests for the Spark runner works by coincidence. PAssert does not truly support non-global windowing. The switch from side inputs to GBK, with no change in semantics but hopefully an easier on-ramp for new runners, incidentally broke these tests. Soon, PAssert will support windowing, triggers, and unbounded PCollections. Until then, this change writes a slightly custom assertion transform for these tests. 
--- .../streaming/FlattenStreamingTest.java | 7 +---- .../streaming/KafkaStreamingTest.java | 13 ++------ .../SimpleStreamingWordCountTest.java | 18 +++-------- .../streaming/utils/PAssertStreaming.java | 31 ++++++++++++++++++- 4 files changed, 38 insertions(+), 31 deletions(-) diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java index 15b2f3903067..976c7c268507 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java @@ -25,9 +25,7 @@ import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.transforms.Flatten; -import org.apache.beam.sdk.transforms.View; import org.apache.beam.sdk.transforms.windowing.FixedWindows; import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.values.PCollection; @@ -77,13 +75,10 @@ public void testRun() throws Exception { PCollectionList list = PCollectionList.of(windowedW1).and(windowedW2); PCollection union = list.apply(Flatten.pCollections()); - PAssert.thatIterable(union.apply(View.asIterable())) - .containsInAnyOrder(EXPECTED_UNION); + PAssertStreaming.assertContents(union, EXPECTED_UNION); EvaluationResult res = SparkPipelineRunner.create(options).run(p); res.close(); - - PAssertStreaming.assertNoFailures(res); } } diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java index fd75e7435529..53293fbbca2d 100644 --- 
a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java @@ -27,17 +27,14 @@ import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.View; import org.apache.beam.sdk.transforms.windowing.FixedWindows; import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import com.google.common.collect.ImmutableMap; -import com.google.common.collect.ImmutableSet; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; @@ -52,7 +49,6 @@ import java.util.Collections; import java.util.Map; import java.util.Properties; -import java.util.Set; import kafka.serializer.StringDecoder; /** @@ -68,9 +64,7 @@ public class KafkaStreamingTest { private static final Map KAFKA_MESSAGES = ImmutableMap.of( "k1", "v1", "k2", "v2", "k3", "v3", "k4", "v4" ); - private static final Set EXPECTED = ImmutableSet.of( - "k1,v1", "k2,v2", "k3,v3", "k4,v4" - ); + private static final String[] EXPECTED = {"k1,v1", "k2,v2", "k3,v3", "k4,v4"}; private static final long TEST_TIMEOUT_MSEC = 1000L; @BeforeClass @@ -116,13 +110,10 @@ public void testRun() throws Exception { PCollection formattedKV = windowedWords.apply(ParDo.of(new FormatKVFn())); - PAssert.thatIterable(formattedKV.apply(View.asIterable())) - .containsInAnyOrder(EXPECTED); + PAssertStreaming.assertContents(formattedKV, EXPECTED); EvaluationResult res = SparkPipelineRunner.create(options).run(p); res.close(); - - PAssertStreaming.assertNoFailures(res); } @AfterClass diff --git 
a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java index 28133ca9b158..6dc9a084069d 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java @@ -17,6 +17,7 @@ */ package org.apache.beam.runners.spark.translation.streaming; + import org.apache.beam.runners.spark.EvaluationResult; import org.apache.beam.runners.spark.SimpleWordCountTest; import org.apache.beam.runners.spark.SparkPipelineRunner; @@ -26,33 +27,28 @@ import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.beam.sdk.testing.PAssert; -import org.apache.beam.sdk.transforms.View; import org.apache.beam.sdk.transforms.windowing.FixedWindows; import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.values.PCollection; -import com.google.common.collect.ImmutableSet; - import org.joda.time.Duration; import org.junit.Test; +import java.io.Serializable; import java.util.Arrays; import java.util.Collections; import java.util.List; -import java.util.Set; /** * Simple word count streaming test. 
*/ -public class SimpleStreamingWordCountTest { +public class SimpleStreamingWordCountTest implements Serializable { private static final String[] WORDS_ARRAY = { "hi there", "hi", "hi sue bob", "hi sue", "", "bob hi"}; private static final List> WORDS_QUEUE = Collections.>singletonList(Arrays.asList(WORDS_ARRAY)); - private static final Set EXPECTED_COUNT_SET = - ImmutableSet.of("hi: 5", "there: 1", "sue: 2", "bob: 2"); + private static final String[] EXPECTED_COUNTS = {"hi: 5", "there: 1", "sue: 2", "bob: 2"}; private static final long TEST_TIMEOUT_MSEC = 1000L; @Test @@ -71,12 +67,8 @@ public void testRun() throws Exception { PCollection output = windowedWords.apply(new SimpleWordCountTest.CountWords()); - PAssert.thatIterable(output.apply(View.asIterable())) - .containsInAnyOrder(EXPECTED_COUNT_SET); - + PAssertStreaming.assertContents(output, EXPECTED_COUNTS); EvaluationResult res = SparkPipelineRunner.create(options).run(p); res.close(); - - PAssertStreaming.assertNoFailures(res); } } diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/PAssertStreaming.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/PAssertStreaming.java index 3d8fc32aa42a..54e4dae8dcca 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/PAssertStreaming.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/PAssertStreaming.java @@ -17,15 +17,26 @@ */ package org.apache.beam.runners.spark.translation.streaming.utils; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.junit.Assert.assertThat; + import org.apache.beam.runners.spark.EvaluationResult; +import org.apache.beam.sdk.transforms.GroupByKey; +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.transforms.SimpleFunction; +import org.apache.beam.sdk.transforms.Values; +import 
org.apache.beam.sdk.transforms.WithKeys; +import org.apache.beam.sdk.values.PCollection; import org.junit.Assert; +import java.io.Serializable; + /** * Since PAssert doesn't propagate assert exceptions, use Aggregators to assert streaming * success/failure counters. */ -public final class PAssertStreaming { +public final class PAssertStreaming implements Serializable { /** * Copied aggregator names from {@link org.apache.beam.sdk.testing.PAssert}. @@ -40,4 +51,22 @@ public static void assertNoFailures(EvaluationResult res) { int failures = res.getAggregatorValue(FAILURE_COUNTER, Integer.class); Assert.assertEquals("Found " + failures + " failures, see the log for details", 0, failures); } + + public static void assertContents(PCollection actual, final T[] expected) { + // Because PAssert does not support non-global windowing, but all our data is in one window, + // we set up the assertion directly. + actual + .apply(WithKeys.of("dummy")) + .apply(GroupByKey.create()) + .apply(Values.>create()) + .apply( + MapElements.via( + new SimpleFunction, Void>() { + @Override + public Void apply(Iterable input) { + assertThat(input, containsInAnyOrder(expected)); + return null; + } + })); + } } From 18fd6d7eb354633139a3c1bf6f1630c05f74ee3a Mon Sep 17 00:00:00 2001 From: Kenneth Knowles Date: Thu, 9 Jun 2016 09:15:39 -0700 Subject: [PATCH 4/7] Spark runner: Assign windows when re-windowing into global window Previously, window assignment was elided when the window was the global window. But when the source windows are not the global window, this elision is not correct. Now window assignment is run except when both the source *and* the destination window are the global window (which remains a common case in globally windowed batch tests using PAssert). 
--- .../runners/spark/translation/TransformTranslator.java | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java index b462d35f5db7..ebceb6bae632 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java @@ -699,13 +699,16 @@ public void evaluate(Window.Bound transform, EvaluationContext context) { JavaRDDLike, ?> inRDD = (JavaRDDLike, ?>) context.getInputRDD(transform); WindowFn windowFn = WINDOW_FG.get("windowFn", transform); - if (windowFn instanceof GlobalWindows) { + // Avoid running assign windows if both source and destination are global window + if (context.getInput(transform).getWindowingStrategy().getWindowFn() + instanceof GlobalWindows + && windowFn instanceof GlobalWindows) { context.setOutputRDD(transform, inRDD); } else { @SuppressWarnings("unchecked") DoFn addWindowsDoFn = new AssignWindowsDoFn<>(windowFn); DoFnFunction dofn = - new DoFnFunction<>(addWindowsDoFn, context.getRuntimeContext(), null); + new DoFnFunction<>(addWindowsDoFn, context.getRuntimeContext(), null); context.setOutputRDD(transform, inRDD.mapPartitions(dofn)); } } From cb5b8f21e812a246d17100a67fa4f489727d4a66 Mon Sep 17 00:00:00 2001 From: Kenneth Knowles Date: Thu, 9 Jun 2016 13:38:06 -0700 Subject: [PATCH 5/7] fixup! 
Switch Spark streaming tests --- .../spark/translation/streaming/utils/PAssertStreaming.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/PAssertStreaming.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/PAssertStreaming.java index 54e4dae8dcca..f85c4409df09 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/PAssertStreaming.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/PAssertStreaming.java @@ -52,6 +52,11 @@ public static void assertNoFailures(EvaluationResult res) { Assert.assertEquals("Found " + failures + " failures, see the log for details", 0, failures); } + /** + * Adds a pipeline run-time assertion that the contents of {@code actual} are {@code expected}. + * Note that it is oblivious to windowing, so the assertion will apply indiscriminately to all + * windows. + */ public static void assertContents(PCollection actual, final T[] expected) { // Because PAssert does not support non-global windowing, but all our data is in one window, // we set up the assertion directly. From 9f7f86c47d06379842ac6a27b551ef92d85a5191 Mon Sep 17 00:00:00 2001 From: Kenneth Knowles Date: Thu, 9 Jun 2016 13:42:36 -0700 Subject: [PATCH 6/7] fixup! 
Base PAssert on GBK instead of side inputs --- .../org/apache/beam/sdk/testing/PAssert.java | 45 +++++++++++++++++-- 1 file changed, 42 insertions(+), 3 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java index b708d36894eb..3362eeeeca27 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java @@ -109,12 +109,34 @@ private PAssert() {} * Builder interface for assertions applicable to iterables and PCollection contents. */ public interface IterableAssert { + + /** + * Asserts that the iterable in question contains the provided elements. + * + * @return the same {@link IterableAssert} builder for further assertions + */ IterableAssert containsInAnyOrder(T... expectedElements); + /** + * Asserts that the iterable in question contains the provided elements. + * + * @return the same {@link IterableAssert} builder for further assertions + */ IterableAssert containsInAnyOrder(Iterable expectedElements); + /** + * Asserts that the iterable in question is empty. + * + * @return the same {@link IterableAssert} builder for further assertions + */ IterableAssert empty(); + /** + * Applies the provided checking function (presumably containing assertions) to the + * iterable in question. + * + * @return the same {@link IterableAssert} builder for further assertions + */ IterableAssert satisfies(SerializableFunction, Void> checkerFn); } @@ -122,10 +144,28 @@ public interface IterableAssert { * Builder interface for assertions applicable to a single value. */ public interface SingletonAssert { + /** + * Asserts that the value in question is equal to the provided value, according to + * {@link Object#equals}. 
+ * + * @return the same {@link SingletonAssert} builder for further assertions + */ SingletonAssert isEqualTo(T expected); + /** + * Asserts that the value in question is not equal to the provided value, according + * to {@link Object#equals}. + * + * @return the same {@link SingletonAssert} builder for further assertions + */ SingletonAssert notEqualTo(T notExpected); + /** + * Applies the provided checking function (presumably containing assertions) to the + * value in question. + * + * @return the same {@link SingletonAssert} builder for further assertions + */ SingletonAssert satisfies(SerializableFunction checkerFn); } @@ -616,10 +656,9 @@ public void processElement(ProcessContext context) { /** * An assertion checker that takes a single {@link PCollectionView * PCollectionView<ActualT>} and an assertion over {@code ActualT}, and checks it within a - * dataflow pipeline. + * Beam pipeline. * - *

Note that the entire assertion must be serializable. If you need to make assertions - * involving multiple inputs that are each not serializable, use TwoSideInputAssert. + *

Note that the entire assertion must be serializable. * *

This is generally useful for assertion functions that are serializable but whose underlying * data may not have a coder. From 27ad3bd570c6668b8975a6fa2b36d3f2be80eaf8 Mon Sep 17 00:00:00 2001 From: Kenneth Knowles Date: Thu, 9 Jun 2016 13:48:14 -0700 Subject: [PATCH 7/7] fixup! Base PAssert on GBK instead of side inputs --- .../org/apache/beam/sdk/testing/PAssert.java | 66 ++++++------------- 1 file changed, 21 insertions(+), 45 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java index 3362eeeeca27..9db1b3d22a51 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java @@ -183,12 +183,7 @@ public static IterableAssert that(PCollection actual) { public static IterableAssert thatSingletonIterable( PCollection> actual) { - List> maybeElementCoder = actual.getCoder().getCoderArguments(); - Coder tCoder; try { - @SuppressWarnings("unchecked") - Coder tCoderTmp = (Coder) Iterables.getOnlyElement(maybeElementCoder); - tCoder = tCoderTmp; } catch (NoSuchElementException | IllegalArgumentException exc) { throw new IllegalArgumentException( "PAssert.thatSingletonIterable requires a PCollection>" @@ -269,10 +264,23 @@ public final PCollectionContentsAssert containsInAnyOrder(T... expectedElemen * *

Returns this {@code IterableAssert}. */ + @Override public PCollectionContentsAssert containsInAnyOrder(Iterable expectedElements) { return satisfies(new AssertContainsInAnyOrderRelation(), expectedElements); } + @Override + public PCollectionContentsAssert empty() { + return containsInAnyOrder(Collections.emptyList()); + } + + @Override + public PCollectionContentsAssert satisfies( + SerializableFunction, Void> checkerFn) { + actual.apply("PAssert$" + (assertCount++), new GroupThenAssert<>(checkerFn)); + return this; + } + /** * Checks that the {@code Iterable} contains elements that match the provided matchers, in any * order. @@ -285,27 +293,6 @@ final PCollectionContentsAssert containsInAnyOrder( return satisfies(SerializableMatchers.containsInAnyOrder(elementMatchers)); } - /** - * Checks that the {@link Iterable} is empty. - * - *

Returns this {@link IterableAssert}. - */ - public PCollectionContentsAssert empty() { - return containsInAnyOrder(Collections.emptyList()); - } - - /** - * Applies a {@link SerializableFunction} to check the elements of the {@link PCollection}. - * - *

Returns this {@link IterableAssert}. - */ - @Override - public PCollectionContentsAssert satisfies( - SerializableFunction, Void> checkerFn) { - actual.apply("PAssert$" + (assertCount++), new GroupThenAssert<>(checkerFn)); - return this; - } - /** * Applies a {@link SerializableFunction} to check the elements of the {@code Iterable}. * @@ -382,11 +369,15 @@ private static class PCollectionSingletonIterableAssert implements IterableAs public PCollectionSingletonIterableAssert(PCollection> actual) { this.actual = actual; - this.elementCoder = (Coder) actual.getCoder().getCoderArguments().get(0); + + @SuppressWarnings("unchecked") + Coder typedCoder = (Coder) actual.getCoder().getCoderArguments().get(0); + this.elementCoder = typedCoder; } @Override - public PCollectionSingletonIterableAssert containsInAnyOrder(T... expectedElements) { + @SafeVarargs + public final PCollectionSingletonIterableAssert containsInAnyOrder(T... expectedElements) { return containsInAnyOrder(Arrays.asList(expectedElements)); } @@ -433,31 +424,16 @@ protected PCollectionViewAssert( this.coder = coder; } - /** - * Checks that the value of this {@code SingletonAssert}'s view is equal to the expected value. - * - *

Returns this {@code SingletonAssert}. - */ + @Override public PCollectionViewAssert isEqualTo(ViewT expectedValue) { return satisfies(new AssertIsEqualToRelation(), expectedValue); } - /** - * Checks that the value of this {@code SingletonAssert}'s view is not equal to the expected - * value. - * - *

Returns this {@code SingletonAssert}. - */ + @Override public PCollectionViewAssert notEqualTo(ViewT expectedValue) { return satisfies(new AssertNotEqualToRelation(), expectedValue); } - /** - * Applies a {@link SerializableFunction} to check the value of this {@code SingletonAssert}'s - * view. - * - *

Returns this {@code SingletonAssert}. - */ @Override public PCollectionViewAssert satisfies( SerializableFunction checkerFn) {