From a6d6ebb60e05e3d63f580aae9d69b99ec65144ab Mon Sep 17 00:00:00 2001 From: Stas Levin Date: Sun, 2 Apr 2017 14:09:43 +0300 Subject: [PATCH 1/2] [BEAM-1777] In certain circumstances PipelineEnforcement shades pipeline's exceptions. --- .../apache/beam/sdk/testing/TestPipeline.java | 64 ++++++++++++------- .../apache/beam/sdk/io/TFRecordIOTest.java | 7 +- 2 files changed, 45 insertions(+), 26 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java index 6a8335e0117f..f0a550c05743 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java @@ -106,7 +106,7 @@ private static class PipelineRunEnforcement { protected final Pipeline pipeline; - private boolean runInvoked; + protected boolean runAttempted; private PipelineRunEnforcement(final Pipeline pipeline) { this.pipeline = pipeline; @@ -116,12 +116,14 @@ protected void enableAutoRunIfMissing(final boolean enable) { enableAutoRunIfMissing = enable; } - protected void afterPipelineExecution() { - runInvoked = true; + protected void beforePipelineExecution() { + runAttempted = true; } - protected void afterTestCompletion() { - if (!runInvoked && enableAutoRunIfMissing) { + protected void afterPipelineExecution() {} + + protected void afterUserCodeFinished() { + if (!runAttempted && enableAutoRunIfMissing) { pipeline.run().waitUntilFinish(); } } @@ -174,27 +176,38 @@ private boolean isEmptyPipeline(final Pipeline pipeline) { } private void verifyPipelineExecution() { - final List pipelineNodes = recordPipelineNodes(pipeline); - if (runVisitedNodes != null && !runVisitedNodes.equals(pipelineNodes)) { - final boolean hasDanglingPAssert = - FluentIterable.from(pipelineNodes) - .filter(Predicates.not(Predicates.in(runVisitedNodes))) - .anyMatch(isPAssertNode); - if (hasDanglingPAssert) { - throw new AbandonedNodeException("The pipeline contains abandoned PAssert(s)."); - } else { - throw new AbandonedNodeException("The pipeline contains abandoned PTransform(s)."); - } - } else if (runVisitedNodes == null && !enableAutoRunIfMissing) { - if (!isEmptyPipeline(pipeline)) { + if (!isEmptyPipeline(pipeline)) { + if (!runAttempted && !enableAutoRunIfMissing) { throw new PipelineRunMissingException( "The pipeline has not been run (runner: " + pipeline.getOptions().getRunner().getSimpleName() + ")"); + + } else { + final List pipelineNodes = recordPipelineNodes(pipeline); + if (pipelineRunSucceeded() && !visitedAll(pipelineNodes)) { + final boolean hasDanglingPAssert = + FluentIterable.from(pipelineNodes) + .filter(Predicates.not(Predicates.in(runVisitedNodes))) + .anyMatch(isPAssertNode); + if (hasDanglingPAssert) { + throw new AbandonedNodeException("The pipeline contains abandoned PAssert(s)."); + } else { + throw new AbandonedNodeException("The pipeline contains abandoned PTransform(s)."); + } + } } } } + private boolean visitedAll(final List pipelineNodes) { + return runVisitedNodes.equals(pipelineNodes); + } + + private boolean pipelineRunSucceeded() { + return runVisitedNodes != null; + } + @Override protected void afterPipelineExecution() { runVisitedNodes = recordPipelineNodes(pipeline); @@ -202,8 +215,8 @@ protected void afterPipelineExecution() { } @Override - protected void afterTestCompletion() { - super.afterTestCompletion(); + protected void afterUserCodeFinished() { + super.afterUserCodeFinished(); verifyPipelineExecution(); } } @@ -285,7 +298,7 @@ private void setDeducedEnforcementLevel() { public void evaluate() throws Throwable { setDeducedEnforcementLevel(); statement.evaluate(); - enforcement.get().afterTestCompletion(); + enforcement.get().afterUserCodeFinished(); } }; } @@ -301,8 +314,10 @@ public PipelineResult run() { "Is your TestPipeline declaration missing a @Rule annotation? Usage: " + "@Rule public final transient TestPipeline pipeline = TestPipeline.create();"); + final PipelineResult pipelineResult; try { - return super.run(); + enforcement.get().beforePipelineExecution(); + pipelineResult = super.run(); } catch (RuntimeException exc) { Throwable cause = exc.getCause(); if (cause instanceof AssertionError) { @@ -310,9 +325,10 @@ public PipelineResult run() { } else { throw exc; } - } finally { - enforcement.get().afterPipelineExecution(); } + + enforcement.get().afterPipelineExecution(); + return pipelineResult; } /** diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TFRecordIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TFRecordIOTest.java index 70620fb9ec6a..94530ef0776f 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TFRecordIOTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TFRecordIOTest.java @@ -59,6 +59,7 @@ import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.rules.ExpectedException; +import org.junit.rules.RuleChain; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -96,14 +97,16 @@ >>> with open('/tmp/python_foo.tfrecord', 'rb') as f: private static Path tempFolder; - @Rule + public TestPipeline p = TestPipeline.create(); + public ExpectedException expectedException = ExpectedException.none(); + @Rule public TestPipeline p2 = TestPipeline.create(); @Rule - public ExpectedException expectedException = ExpectedException.none(); + public RuleChain ruleChain = RuleChain.outerRule(expectedException).around(p); @BeforeClass public static void setupClass() throws IOException { From caa2239b00e35614a9859675ad757c9033e491d4 Mon Sep 17 00:00:00 2001 From: Stas Levin Date: Wed, 5 Apr 2017 09:22:13 +0300 Subject: [PATCH 2/2] [BEAM-1777] Added some comments. --- .../org/apache/beam/sdk/testing/TestPipeline.java | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java index f0a550c05743..a4ab1963806a 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java @@ -296,7 +296,17 @@ private void setDeducedEnforcementLevel() { @Override public void evaluate() throws Throwable { + setDeducedEnforcementLevel(); + + // statement.evaluate() essentially runs the user code contained in the unit test at hand. + // Exceptions thrown during the execution of the user's test code will propagate here, + // unless the user explicitly handles them with a "catch" clause in his code. If the + // exception is handled by a user's "catch" clause, is does not interrupt the flow and + // we move on to invoking the configured enforcements. + // If the user does not handle a thrown exception, it will propagate here and interrupt + // the flow, preventing the enforcement(s) from being activated. + // The motivation for this is avoiding enforcements over faulty pipelines. statement.evaluate(); enforcement.get().afterUserCodeFinished(); } @@ -327,6 +337,8 @@ public PipelineResult run() { } } + // If we reach this point, the pipeline has been run and no exceptions have been thrown during + // its execution. enforcement.get().afterPipelineExecution(); return pipelineResult; }