From 11094142329bae1c434120e41436da5ce63bc976 Mon Sep 17 00:00:00 2001 From: Eugene Kirpichov Date: Tue, 14 Mar 2017 18:23:19 -0700 Subject: [PATCH 1/3] Simplifies and improves exception unwrapping in TestFlinkRunner --- .../beam/runners/flink/TestFlinkRunner.java | 39 +++++++------------ 1 file changed, 13 insertions(+), 26 deletions(-) diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/TestFlinkRunner.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/TestFlinkRunner.java index ef56b55dc9bd..8f50105a55b9 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/TestFlinkRunner.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/TestFlinkRunner.java @@ -56,36 +56,23 @@ public static TestFlinkRunner create(boolean streaming) { public PipelineResult run(Pipeline pipeline) { try { return delegate.run(pipeline); - } catch (Throwable e) { + } catch (Throwable t) { // Special case hack to pull out assertion errors from PAssert; instead there should // probably be a better story along the lines of UserCodeException. - Throwable cause = e; - Throwable oldCause = e; - PipelineExecutionException executionException = null; - do { - - // find UserCodeException and throw PipelineExecutionException - if (cause instanceof UserCodeException) { - executionException = new PipelineExecutionException(cause.getCause()); - } - - if (cause.getCause() == null) { - break; - } - - oldCause = cause; - cause = cause.getCause(); - - } while (!oldCause.equals(cause)); - if (cause instanceof AssertionError) { - throw (AssertionError) cause; - } else { - if (executionException != null) { - throw executionException; - } else { - throw e; + UserCodeException innermostUserCodeException = null; + Throwable current = t; + for (; current.getCause() != null; current = current.getCause()) { + if (current instanceof UserCodeException) { + innermostUserCodeException = ((UserCodeException) current); } } + if (innermostUserCodeException != null) { + current = innermostUserCodeException.getCause(); + } + if (current instanceof AssertionError) { + throw (AssertionError) current; + } + throw new PipelineExecutionException(current); } } From 56fa07c4fd1fd8281a66e9c27b98c9ce79f700f6 Mon Sep 17 00:00:00 2001 From: Eugene Kirpichov Date: Tue, 14 Mar 2017 14:31:55 -0700 Subject: [PATCH 2/3] Bump dataflow.container_version to 0314 --- runners/google-cloud-dataflow-java/pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/runners/google-cloud-dataflow-java/pom.xml b/runners/google-cloud-dataflow-java/pom.xml index ff796815094e..255c32672e3b 100644 --- a/runners/google-cloud-dataflow-java/pom.xml +++ b/runners/google-cloud-dataflow-java/pom.xml @@ -33,7 +33,7 @@ jar - beam-master-20170307 + beam-master-20170314 6 From 36158f7889634e9c24cce7df5556e992705b6826 Mon Sep 17 00:00:00 2001 From: Eugene Kirpichov Date: Mon, 13 Mar 2017 15:19:00 -0700 Subject: [PATCH 3/3] [BEAM-1260] Another shot at capturing PAssert site This reverts commit 47592f66222e0b8a82d4c94d14cfba38044658f4. --- .../org/apache/beam/sdk/testing/PAssert.java | 209 ++++++++++-------- .../apache/beam/sdk/testing/PAssertTest.java | 44 ++++ 2 files changed, 160 insertions(+), 93 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java index 259633554b77..f6a409a3446b 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java @@ -286,7 +286,7 @@ public static IterableAssert that(PCollection actual) { * with the specified reason. */ public static IterableAssert that(String reason, PCollection actual) { - return new PCollectionContentsAssert<>(reason, actual); + return new PCollectionContentsAssert<>(actual, PAssertionSite.capture(reason)); } /** @@ -316,7 +316,8 @@ public static IterableAssert thatSingletonIterable( @SuppressWarnings("unchecked") // Safe covariant cast PCollection> actualIterables = (PCollection>) actual; - return new PCollectionSingletonIterableAssert<>(reason, actualIterables); + return new PCollectionSingletonIterableAssert<>( + actualIterables, PAssertionSite.capture(reason)); } /** @@ -333,7 +334,9 @@ public static SingletonAssert thatSingleton(PCollection actual) { * a singleton. */ public static SingletonAssert thatSingleton(String reason, PCollection actual) { - return new PCollectionViewAssert<>(actual, View.asSingleton(), actual.getCoder(), reason); + return new PCollectionViewAssert<>( + actual, View.asSingleton(), actual.getCoder(), PAssertionSite.capture(reason) + ); } /** @@ -362,7 +365,7 @@ public static SingletonAssert>> thatMultimap( actual, View.asMultimap(), MapCoder.of(kvCoder.getKeyCoder(), IterableCoder.of(kvCoder.getValueCoder())), - reason); + PAssertionSite.capture(reason)); } /** @@ -388,12 +391,36 @@ public static SingletonAssert> thatMap( @SuppressWarnings("unchecked") KvCoder kvCoder = (KvCoder) actual.getCoder(); return new PCollectionViewAssert<>( - actual, View.asMap(), MapCoder.of(kvCoder.getKeyCoder(), kvCoder.getValueCoder()), - reason); + actual, + View.asMap(), + MapCoder.of(kvCoder.getKeyCoder(), kvCoder.getValueCoder()), + PAssertionSite.capture(reason)); } //////////////////////////////////////////////////////////// + private static class PAssertionSite implements Serializable { + private final String message; + private final StackTraceElement[] creationStackTrace; + + static PAssertionSite capture(String message) { + return new PAssertionSite(message, new Throwable().getStackTrace()); + } + + PAssertionSite(String message, StackTraceElement[] creationStackTrace) { + this.message = message; + this.creationStackTrace = creationStackTrace; + } + + public AssertionError wrap(Throwable t) { + AssertionError res = + new AssertionError( + message.isEmpty() ? t.getMessage() : (message + ": " + t.getMessage()), t); + res.setStackTrace(creationStackTrace); + return res; + } + } + /** * An {@link IterableAssert} about the contents of a {@link PCollection}. This does not require * the runner to support side inputs. @@ -402,21 +429,21 @@ private static class PCollectionContentsAssert implements IterableAssert { private final PCollection actual; private final AssertionWindows rewindowingStrategy; private final SimpleFunction>, Iterable> paneExtractor; - private final String reason; + private final PAssertionSite site; - public PCollectionContentsAssert(String reason, PCollection actual) { - this(actual, IntoGlobalWindow.of(), PaneExtractors.allPanes(), reason); + public PCollectionContentsAssert(PCollection actual, PAssertionSite site) { + this(actual, IntoGlobalWindow.of(), PaneExtractors.allPanes(), site); } public PCollectionContentsAssert( PCollection actual, AssertionWindows rewindowingStrategy, SimpleFunction>, Iterable> paneExtractor, - String reason) { + PAssertionSite site) { this.actual = actual; this.rewindowingStrategy = rewindowingStrategy; this.paneExtractor = paneExtractor; - this.reason = reason; + this.site = site; } @Override @@ -451,7 +478,7 @@ private PCollectionContentsAssert withPane( Coder windowCoder = (Coder) actual.getWindowingStrategy().getWindowFn().windowCoder(); return new PCollectionContentsAssert<>( - actual, IntoStaticWindows.of(windowCoder, window), paneExtractor, reason); + actual, IntoStaticWindows.of(windowCoder, window), paneExtractor, site); } /** @@ -472,7 +499,7 @@ public final PCollectionContentsAssert containsInAnyOrder(T... expectedElemen */ @Override public PCollectionContentsAssert containsInAnyOrder(Iterable expectedElements) { - return satisfies(new AssertContainsInAnyOrderRelation(reason), expectedElements); + return satisfies(new AssertContainsInAnyOrderRelation(), expectedElements); } @Override @@ -486,7 +513,7 @@ public PCollectionContentsAssert satisfies( SerializableFunction, Void> checkerFn) { actual.apply( nextAssertionName(), - new GroupThenAssert<>(checkerFn, rewindowingStrategy, paneExtractor)); + new GroupThenAssert<>(checkerFn, rewindowingStrategy, paneExtractor, site)); return this; } @@ -525,25 +552,23 @@ PCollectionContentsAssert satisfies( // more flexible bounds. @SuppressWarnings({"rawtypes", "unchecked"}) SerializableFunction, Void> checkerFn = - (SerializableFunction) new MatcherCheckerFn<>(reason, matcher); + (SerializableFunction) new MatcherCheckerFn<>(matcher); actual.apply( "PAssert$" + (assertCount++), - new GroupThenAssert<>(checkerFn, rewindowingStrategy, paneExtractor)); + new GroupThenAssert<>(checkerFn, rewindowingStrategy, paneExtractor, site)); return this; } private static class MatcherCheckerFn implements SerializableFunction { - private final String reason; - private final SerializableMatcher matcher; + private SerializableMatcher matcher; - public MatcherCheckerFn(String reason, SerializableMatcher matcher) { - this.reason = reason; + public MatcherCheckerFn(SerializableMatcher matcher) { this.matcher = matcher; } @Override public Void apply(T actual) { - assertThat(reason, actual, matcher); + assertThat(actual, matcher); return null; } } @@ -582,11 +607,12 @@ private static class PCollectionSingletonIterableAssert implements IterableAs private final AssertionWindows rewindowingStrategy; private final SimpleFunction>>, Iterable>> paneExtractor; - private final String reason; + private final PAssertionSite site; - public PCollectionSingletonIterableAssert(String reason, PCollection> actual) { - this(actual, IntoGlobalWindow.>of(), PaneExtractors.>onlyPane(), - reason); + public PCollectionSingletonIterableAssert( + PCollection> actual, PAssertionSite site) { + this( + actual, IntoGlobalWindow.>of(), PaneExtractors.>onlyPane(), site); } public PCollectionSingletonIterableAssert( @@ -594,7 +620,7 @@ public PCollectionSingletonIterableAssert( AssertionWindows rewindowingStrategy, SimpleFunction>>, Iterable>> paneExtractor, - String reason) { + PAssertionSite site) { this.actual = actual; @SuppressWarnings("unchecked") @@ -603,7 +629,7 @@ public PCollectionSingletonIterableAssert( this.rewindowingStrategy = rewindowingStrategy; this.paneExtractor = paneExtractor; - this.reason = reason; + this.site = site; } @Override @@ -639,7 +665,7 @@ private PCollectionSingletonIterableAssert withPanes( Coder windowCoder = (Coder) actual.getWindowingStrategy().getWindowFn().windowCoder(); return new PCollectionSingletonIterableAssert<>( - actual, IntoStaticWindows.>of(windowCoder, window), paneExtractor, reason); + actual, IntoStaticWindows.>of(windowCoder, window), paneExtractor, site); } @Override @@ -655,7 +681,7 @@ public PCollectionSingletonIterableAssert empty() { @Override public PCollectionSingletonIterableAssert containsInAnyOrder(Iterable expectedElements) { - return satisfies(new AssertContainsInAnyOrderRelation(reason), expectedElements); + return satisfies(new AssertContainsInAnyOrderRelation(), expectedElements); } @Override @@ -663,7 +689,7 @@ public PCollectionSingletonIterableAssert satisfies( SerializableFunction, Void> checkerFn) { actual.apply( "PAssert$" + (assertCount++), - new GroupThenAssertForSingleton<>(checkerFn, rewindowingStrategy, paneExtractor)); + new GroupThenAssertForSingleton<>(checkerFn, rewindowingStrategy, paneExtractor, site)); return this; } @@ -686,15 +712,16 @@ private static class PCollectionViewAssert implements SingletonAss private final SimpleFunction>, Iterable> paneExtractor; private final Coder coder; - private final String reason; + private final PAssertionSite site; protected PCollectionViewAssert( PCollection actual, PTransform, PCollectionView> view, Coder coder, - String reason) { - this(actual, view, IntoGlobalWindow.of(), PaneExtractors.onlyPane(), coder, - reason); + PAssertionSite site) { + this( + actual, view, IntoGlobalWindow.of(), PaneExtractors.onlyPane(), coder, site + ); } private PCollectionViewAssert( @@ -703,13 +730,13 @@ private PCollectionViewAssert( AssertionWindows rewindowActuals, SimpleFunction>, Iterable> paneExtractor, Coder coder, - String reason) { + PAssertionSite site) { this.actual = actual; this.view = view; this.rewindowActuals = rewindowActuals; this.paneExtractor = paneExtractor; this.coder = coder; - this.reason = reason; + this.site = site; } @Override @@ -737,17 +764,17 @@ private PCollectionViewAssert inPane( (Coder) actual.getWindowingStrategy().getWindowFn().windowCoder(), window), paneExtractor, coder, - reason); + site); } @Override public PCollectionViewAssert isEqualTo(ViewT expectedValue) { - return satisfies(new AssertIsEqualToRelation(reason), expectedValue); + return satisfies(new AssertIsEqualToRelation(), expectedValue); } @Override public PCollectionViewAssert notEqualTo(ViewT expectedValue) { - return satisfies(new AssertNotEqualToRelation(reason), expectedValue); + return satisfies(new AssertNotEqualToRelation(), expectedValue); } @Override @@ -760,7 +787,8 @@ public PCollectionViewAssert satisfies( new OneSideInputAssert( CreateActual.from(actual, rewindowActuals, paneExtractor, view), rewindowActuals.windowDummy(), - checkerFn)); + checkerFn, + site)); return this; } @@ -983,14 +1011,17 @@ public static class GroupThenAssert extends PTransform, PDone> private final SerializableFunction, Void> checkerFn; private final AssertionWindows rewindowingStrategy; private final SimpleFunction>, Iterable> paneExtractor; + private final PAssertionSite site; private GroupThenAssert( SerializableFunction, Void> checkerFn, AssertionWindows rewindowingStrategy, - SimpleFunction>, Iterable> paneExtractor) { + SimpleFunction>, Iterable> paneExtractor, + PAssertionSite site) { this.checkerFn = checkerFn; this.rewindowingStrategy = rewindowingStrategy; this.paneExtractor = paneExtractor; + this.site = site; } @Override @@ -999,7 +1030,7 @@ public PDone expand(PCollection input) { .apply("GroupGlobally", new GroupGlobally(rewindowingStrategy)) .apply("GetPane", MapElements.via(paneExtractor)) .setCoder(IterableCoder.of(input.getCoder())) - .apply("RunChecks", ParDo.of(new GroupedValuesCheckerDoFn<>(checkerFn))); + .apply("RunChecks", ParDo.of(new GroupedValuesCheckerDoFn<>(checkerFn, site))); return PDone.in(input.getPipeline()); } @@ -1015,15 +1046,18 @@ public static class GroupThenAssertForSingleton private final AssertionWindows rewindowingStrategy; private final SimpleFunction>>, Iterable>> paneExtractor; + private final PAssertionSite site; private GroupThenAssertForSingleton( SerializableFunction, Void> checkerFn, AssertionWindows rewindowingStrategy, SimpleFunction>>, Iterable>> - paneExtractor) { + paneExtractor, + PAssertionSite site) { this.checkerFn = checkerFn; this.rewindowingStrategy = rewindowingStrategy; this.paneExtractor = paneExtractor; + this.site = site; } @Override @@ -1032,7 +1066,7 @@ public PDone expand(PCollection> input) { .apply("GroupGlobally", new GroupGlobally>(rewindowingStrategy)) .apply("GetPane", MapElements.via(paneExtractor)) .setCoder(IterableCoder.of(input.getCoder())) - .apply("RunChecks", ParDo.of(new SingletonCheckerDoFn<>(checkerFn))); + .apply("RunChecks", ParDo.of(new SingletonCheckerDoFn<>(checkerFn, site))); return PDone.in(input.getPipeline()); } @@ -1053,14 +1087,17 @@ public static class OneSideInputAssert extends PTransform> createActual; private final transient PTransform, PCollection> windowToken; private final SerializableFunction checkerFn; + private final PAssertionSite site; private OneSideInputAssert( PTransform> createActual, PTransform, PCollection> windowToken, - SerializableFunction checkerFn) { + SerializableFunction checkerFn, + PAssertionSite site) { this.createActual = createActual; this.windowToken = windowToken; this.checkerFn = checkerFn; + this.site = site; } @Override @@ -1072,7 +1109,7 @@ public PDone expand(PBegin input) { .apply("WindowToken", windowToken) .apply( "RunChecks", - ParDo.withSideInputs(actual).of(new SideInputCheckerDoFn<>(checkerFn, actual))); + ParDo.withSideInputs(actual).of(new SideInputCheckerDoFn<>(checkerFn, actual, site))); return PDone.in(input.getPipeline()); } @@ -1092,17 +1129,21 @@ private static class SideInputCheckerDoFn extends DoFn { private final Aggregator failure = createAggregator(FAILURE_COUNTER, Sum.ofIntegers()); private final PCollectionView actual; + private final PAssertionSite site; private SideInputCheckerDoFn( - SerializableFunction checkerFn, PCollectionView actual) { + SerializableFunction checkerFn, + PCollectionView actual, + PAssertionSite site) { this.checkerFn = checkerFn; this.actual = actual; + this.site = site; } @ProcessElement public void processElement(ProcessContext c) { ActualT actualContents = c.sideInput(actual); - doChecks(actualContents, checkerFn, success, failure); + doChecks(site, actualContents, checkerFn, success, failure); } } @@ -1119,14 +1160,17 @@ private static class GroupedValuesCheckerDoFn extends DoFn failure = createAggregator(FAILURE_COUNTER, Sum.ofIntegers()); + private final PAssertionSite site; - private GroupedValuesCheckerDoFn(SerializableFunction checkerFn) { + private GroupedValuesCheckerDoFn( + SerializableFunction checkerFn, PAssertionSite site) { this.checkerFn = checkerFn; + this.site = site; } @ProcessElement public void processElement(ProcessContext c) { - doChecks(c.element(), checkerFn, success, failure); + doChecks(site, c.element(), checkerFn, success, failure); } } @@ -1144,19 +1188,23 @@ private static class SingletonCheckerDoFn extends DoFn failure = createAggregator(FAILURE_COUNTER, Sum.ofIntegers()); + private final PAssertionSite site; - private SingletonCheckerDoFn(SerializableFunction checkerFn) { + private SingletonCheckerDoFn( + SerializableFunction checkerFn, PAssertionSite site) { this.checkerFn = checkerFn; + this.site = site; } @ProcessElement public void processElement(ProcessContext c) { ActualT actualContents = Iterables.getOnlyElement(c.element()); - doChecks(actualContents, checkerFn, success, failure); + doChecks(site, actualContents, checkerFn, success, failure); } } private static void doChecks( + PAssertionSite site, ActualT actualContents, SerializableFunction checkerFn, Aggregator successAggregator, @@ -1165,9 +1213,8 @@ private static void doChecks( checkerFn.apply(actualContents); successAggregator.addValue(1); } catch (Throwable t) { - LOG.error("PAssert failed expectations.", t); failureAggregator.addValue(1); - throw t; + throw site.wrap(t); } } @@ -1178,17 +1225,15 @@ private static void doChecks( * value. */ private static class AssertIsEqualTo implements SerializableFunction { - private final String reason; - private final T expected; + private T expected; - public AssertIsEqualTo(String reason, T expected) { - this.reason = reason; + public AssertIsEqualTo(T expected) { this.expected = expected; } @Override public Void apply(T actual) { - assertThat(reason, actual, equalTo(expected)); + assertThat(actual, equalTo(expected)); return null; } } @@ -1198,17 +1243,15 @@ public Void apply(T actual) { * value. */ private static class AssertNotEqualTo implements SerializableFunction { - private String reason; private T expected; - public AssertNotEqualTo(String reason, T expected) { - this.reason = reason; + public AssertNotEqualTo(T expected) { this.expected = expected; } @Override public Void apply(T actual) { - assertThat(reason, actual, not(equalTo(expected))); + assertThat(actual, not(equalTo(expected))); return null; } } @@ -1219,27 +1262,25 @@ public Void apply(T actual) { */ private static class AssertContainsInAnyOrder implements SerializableFunction, Void> { - private final String reason; - private final T[] expected; + private T[] expected; @SafeVarargs - public AssertContainsInAnyOrder(String reason, T... expected) { - this.reason = reason; + public AssertContainsInAnyOrder(T... expected) { this.expected = expected; } @SuppressWarnings("unchecked") - public AssertContainsInAnyOrder(String reason, Collection expected) { - this(reason, (T[]) expected.toArray()); + public AssertContainsInAnyOrder(Collection expected) { + this((T[]) expected.toArray()); } - public AssertContainsInAnyOrder(String reason, Iterable expected) { - this(reason, Lists.newArrayList(expected)); + public AssertContainsInAnyOrder(Iterable expected) { + this(Lists.newArrayList(expected)); } @Override public Void apply(Iterable actual) { - assertThat(reason, actual, containsInAnyOrder(expected)); + assertThat(actual, containsInAnyOrder(expected)); return null; } } @@ -1259,15 +1300,9 @@ private interface AssertRelation extends Serializable { * An {@link AssertRelation} implementing the binary predicate that two objects are equal. */ private static class AssertIsEqualToRelation implements AssertRelation { - private final String reason; - - public AssertIsEqualToRelation(String reason) { - this.reason = reason; - } - @Override public SerializableFunction assertFor(T expected) { - return new AssertIsEqualTo(reason, expected); + return new AssertIsEqualTo(expected); } } @@ -1275,15 +1310,9 @@ public SerializableFunction assertFor(T expected) { * An {@link AssertRelation} implementing the binary predicate that two objects are not equal. */ private static class AssertNotEqualToRelation implements AssertRelation { - private final String reason; - - public AssertNotEqualToRelation(String reason) { - this.reason = reason; - } - @Override public SerializableFunction assertFor(T expected) { - return new AssertNotEqualTo(reason, expected); + return new AssertNotEqualTo(expected); } } @@ -1293,15 +1322,9 @@ public SerializableFunction assertFor(T expected) { */ private static class AssertContainsInAnyOrderRelation implements AssertRelation, Iterable> { - private final String reason; - - public AssertContainsInAnyOrderRelation(String reason) { - this.reason = reason; - } - @Override public SerializableFunction, Void> assertFor(Iterable expectedElements) { - return new AssertContainsInAnyOrder(reason, expectedElements); + return new AssertContainsInAnyOrder(expectedElements); } } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java index 1603db55b1d0..dab457a609b1 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java @@ -25,6 +25,7 @@ import static org.junit.Assert.fail; import com.fasterxml.jackson.annotation.JsonCreator; +import com.google.common.base.Throwables; import com.google.common.collect.Iterables; import java.io.IOException; import java.io.InputStream; @@ -450,6 +451,49 @@ public void testEmptyFalseDefaultReasonString() throws Exception { assertThat(message, containsString("Expected: iterable over [] in any order")); } + @Test + @Category(RunnableOnService.class) + public void testAssertionSiteIsCapturedWithMessage() throws Exception { + PCollection vals = pipeline.apply(CountingInput.upTo(5L)); + assertThatCollectionIsEmptyWithMessage(vals); + + Throwable thrown = runExpectingAssertionFailure(pipeline); + + assertThat( + thrown.getMessage(), + containsString("Should be empty")); + assertThat( + thrown.getMessage(), + containsString("Expected: iterable over [] in any order")); + String stacktrace = Throwables.getStackTraceAsString(thrown); + assertThat(stacktrace, containsString("testAssertionSiteIsCapturedWithMessage")); + assertThat(stacktrace, containsString("assertThatCollectionIsEmptyWithMessage")); + } + + @Test + @Category(RunnableOnService.class) + public void testAssertionSiteIsCapturedWithoutMessage() throws Exception { + PCollection vals = pipeline.apply(CountingInput.upTo(5L)); + assertThatCollectionIsEmptyWithoutMessage(vals); + + Throwable thrown = runExpectingAssertionFailure(pipeline); + + assertThat( + thrown.getMessage(), + containsString("Expected: iterable over [] in any order")); + String stacktrace = Throwables.getStackTraceAsString(thrown); + assertThat(stacktrace, containsString("testAssertionSiteIsCapturedWithoutMessage")); + assertThat(stacktrace, containsString("assertThatCollectionIsEmptyWithoutMessage")); + } + + private static void assertThatCollectionIsEmptyWithMessage(PCollection vals) { + PAssert.that("Should be empty", vals).empty(); + } + + private static void assertThatCollectionIsEmptyWithoutMessage(PCollection vals) { + PAssert.that(vals).empty(); + } + private static Throwable runExpectingAssertionFailure(Pipeline pipeline) { // We cannot use thrown.expect(AssertionError.class) because the AssertionError // is first caught by JUnit and causes a test failure.