From 6ecbd0038f04115a6413dce57f47f42bd6a716e1 Mon Sep 17 00:00:00 2001 From: Thomas Groh Date: Wed, 18 May 2016 13:37:13 -0700 Subject: [PATCH 1/3] Add CrashingRunner for use in TestPipeline CrashingRunner is a PipelineRunner that crashes on calls to run() with an IllegalArgumentException. As a runner is currently required to construct a Pipeline object, this allows removal of all Pipeline Runners from the core SDK while retaining tests that depend only on the graph construction behavior. --- .../beam/sdk/testing/CrashingRunner.java | 72 ++++++++++++++++++ .../apache/beam/sdk/testing/TestPipeline.java | 10 ++- .../beam/sdk/testing/CrashingRunnerTest.java | 76 +++++++++++++++++++ .../beam/sdk/testing/TestPipelineTest.java | 13 +++- 4 files changed, 168 insertions(+), 3 deletions(-) create mode 100644 sdks/java/core/src/main/java/org/apache/beam/sdk/testing/CrashingRunner.java create mode 100644 sdks/java/core/src/test/java/org/apache/beam/sdk/testing/CrashingRunnerTest.java diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/CrashingRunner.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/CrashingRunner.java new file mode 100644 index 000000000000..6cf679357b8f --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/CrashingRunner.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.testing; + +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.PipelineResult; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.runners.AggregatorRetrievalException; +import org.apache.beam.sdk.runners.AggregatorValues; +import org.apache.beam.sdk.runners.PipelineRunner; +import org.apache.beam.sdk.transforms.Aggregator; + +/** + * A {@link PipelineRunner} that applies no overrides and throws an exception on calls to + * {@link Pipeline#run()}. For use in {@link TestPipeline} to construct but not execute pipelines. + */ +class CrashingRunner extends PipelineRunner{ + + public static CrashingRunner fromOptions(PipelineOptions opts) { + return new CrashingRunner(); + } + + @Override + public PipelineResult run(Pipeline pipeline) { + throw new IllegalArgumentException(String.format("Cannot call #run(Pipeline) on an instance " + + "of %s. %s should only be used as the default to construct a Transform Hierarchy " + + "using %s, and cannot execute Pipelines. Instead, specify a %s " + + "by providing Pipeline Options in the environment variable '%s'.", + getClass().getSimpleName(), + getClass().getSimpleName(), + TestPipeline.class.getSimpleName(), + PipelineRunner.class.getSimpleName(), + TestPipeline.PROPERTY_BEAM_TEST_PIPELINE_OPTIONS)); + } + + private static class TestPipelineResult implements PipelineResult { + private TestPipelineResult() { + // Should never be instantiated by the enclosing class + throw new AssertionError(String.format("Forbidden to instantiate %s", + getClass().getSimpleName())); + } + + @Override + public State getState() { + throw new AssertionError(String.format("Forbidden to instantiate %s", + getClass().getSimpleName())); + } + + @Override + public AggregatorValues getAggregatorValues(Aggregator aggregator) + throws AggregatorRetrievalException { + throw new AssertionError(String.format("Forbidden to instantiate %s", + getClass().getSimpleName())); + } + } +} diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java index a4921d56be0a..4618e33a7204 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java @@ -84,7 +84,8 @@ * containing the message from the {@link PAssert} that failed. */ public class TestPipeline extends Pipeline { - private static final String PROPERTY_BEAM_TEST_PIPELINE_OPTIONS = "beamTestPipelineOptions"; + static final String PROPERTY_BEAM_TEST_PIPELINE_OPTIONS = "beamTestPipelineOptions"; + static final String PROPERTY_USE_DEFAULT_DUMMY_RUNNER = "beamUseDummyRunner"; private static final ObjectMapper MAPPER = new ObjectMapper(); /** @@ -145,8 +146,13 @@ public static PipelineOptions testingPipelineOptions() { .as(TestPipelineOptions.class); options.as(ApplicationNameOptions.class).setAppName(getAppName()); - // If no options were specified, use a test credential object on all pipelines. + // If no options were specified, set some reasonable defaults if (Strings.isNullOrEmpty(beamTestPipelineOptions)) { + // If there are no provided options, check to see if a dummy runner should be used. + String useDefaultDummy = System.getProperty(PROPERTY_USE_DEFAULT_DUMMY_RUNNER); + if (!Strings.isNullOrEmpty(useDefaultDummy) && Boolean.valueOf(useDefaultDummy)) { + options.setRunner(CrashingRunner.class); + } options.as(GcpOptions.class).setGcpCredential(new TestCredential()); } options.setStableUniqueNames(CheckEnabled.ERROR); diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/CrashingRunnerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/CrashingRunnerTest.java new file mode 100644 index 000000000000..041a73ae2d26 --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/CrashingRunnerTest.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.testing; + +import static org.junit.Assert.assertTrue; + +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.PipelineResult; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.runners.PipelineRunner; +import org.apache.beam.sdk.transforms.Create; + +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Tests for {@link CrashingRunner}. + */ +@RunWith(JUnit4.class) +public class CrashingRunnerTest { + @Rule + public ExpectedException thrown = ExpectedException.none(); + + @Test + public void fromOptionsCreatesInstance() { + PipelineOptions opts = PipelineOptionsFactory.create(); + opts.setRunner(CrashingRunner.class); + PipelineRunner runner = PipelineRunner.fromOptions(opts); + + assertTrue("Should have created a CrashingRunner", runner instanceof CrashingRunner); + } + + @Test + public void applySucceeds() { + PipelineOptions opts = PipelineOptionsFactory.create(); + opts.setRunner(CrashingRunner.class); + + Pipeline p = Pipeline.create(opts); + p.apply(Create.of(1, 2, 3)); + } + + @Test + public void runThrows() { + PipelineOptions opts = PipelineOptionsFactory.create(); + opts.setRunner(CrashingRunner.class); + + Pipeline p = Pipeline.create(opts); + p.apply(Create.of(1, 2, 3)); + + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("Cannot call #run"); + thrown.expectMessage(TestPipeline.PROPERTY_BEAM_TEST_PIPELINE_OPTIONS); + + p.run(); + } +} diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestPipelineTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestPipelineTest.java index 8af4ff25bb06..5ba2bd004824 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestPipelineTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestPipelineTest.java @@ -17,8 +17,8 @@ */ package org.apache.beam.sdk.testing; -import static org.hamcrest.CoreMatchers.startsWith; import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.startsWith; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertThat; @@ -36,6 +36,7 @@ import org.hamcrest.Description; import org.junit.Rule; import org.junit.Test; +import org.junit.rules.ExpectedException; import org.junit.rules.TestRule; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -49,6 +50,7 @@ @RunWith(JUnit4.class) public class TestPipelineTest { @Rule public TestRule restoreSystemProperties = new RestoreSystemProperties(); + @Rule public ExpectedException thrown = ExpectedException.none(); @Test public void testCreationUsingDefaults() { @@ -139,6 +141,15 @@ public void testMatcherSerializationDeserialization() { assertEquals(m2, newOpts.getOnSuccessMatcher()); } + @Test + public void testRunWithDummyEnvironmentVariableFails() { + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("Cannot call #run"); + System.getProperties() + .setProperty(TestPipeline.PROPERTY_USE_DEFAULT_DUMMY_RUNNER, Boolean.toString(true)); + TestPipeline.create().run(); + } + /** * TestMatcher is a matcher designed for testing matcher serialization/deserialization. */ From 13ba54726ef73c04f80837e5c3b58cd290022796 Mon Sep 17 00:00:00 2001 From: Thomas Groh Date: Thu, 19 May 2016 13:41:49 -0700 Subject: [PATCH 2/3] fixup! Add CrashingRunner for use in TestPipeline --- .../org/apache/beam/sdk/testing/CrashingRunner.java | 8 ++++---- .../org/apache/beam/sdk/testing/TestPipelineTest.java | 10 +++++++--- 2 files changed, 11 insertions(+), 7 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/CrashingRunner.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/CrashingRunner.java index 6cf679357b8f..28eafea66958 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/CrashingRunner.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/CrashingRunner.java @@ -39,9 +39,9 @@ public static CrashingRunner fromOptions(PipelineOptions opts) { @Override public PipelineResult run(Pipeline pipeline) { throw new IllegalArgumentException(String.format("Cannot call #run(Pipeline) on an instance " - + "of %s. %s should only be used as the default to construct a Transform Hierarchy " + + "of %s. %s should only be used as the default to construct a Pipeline " + "using %s, and cannot execute Pipelines. Instead, specify a %s " - + "by providing Pipeline Options in the environment variable '%s'.", + + "by providing PipelineOptions in the environment variable '%s'.", getClass().getSimpleName(), getClass().getSimpleName(), TestPipeline.class.getSimpleName(), @@ -52,13 +52,13 @@ public PipelineResult run(Pipeline pipeline) { private static class TestPipelineResult implements PipelineResult { private TestPipelineResult() { // Should never be instantiated by the enclosing class - throw new AssertionError(String.format("Forbidden to instantiate %s", + throw new UnsupportedOperationException(String.format("Forbidden to instantiate %s", getClass().getSimpleName())); } @Override public State getState() { - throw new AssertionError(String.format("Forbidden to instantiate %s", + throw new UnsupportedOperationException(String.format("Forbidden to instantiate %s", getClass().getSimpleName())); } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestPipelineTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestPipelineTest.java index 5ba2bd004824..b741e2ed0e2c 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestPipelineTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestPipelineTest.java @@ -29,6 +29,7 @@ import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.runners.DirectPipelineRunner; +import org.apache.beam.sdk.transforms.Create; import com.fasterxml.jackson.databind.ObjectMapper; @@ -143,11 +144,14 @@ public void testMatcherSerializationDeserialization() { @Test public void testRunWithDummyEnvironmentVariableFails() { - thrown.expect(IllegalArgumentException.class); - thrown.expectMessage("Cannot call #run"); System.getProperties() .setProperty(TestPipeline.PROPERTY_USE_DEFAULT_DUMMY_RUNNER, Boolean.toString(true)); - TestPipeline.create().run(); + TestPipeline pipeline = TestPipeline.create(); + pipeline.apply(Create.of(1, 2, 3)); + + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("Cannot call #run"); + pipeline.run(); } /** From 9ce86ca5dd31d921809408f71a60b2c8e0664036 Mon Sep 17 00:00:00 2001 From: Thomas Groh Date: Thu, 19 May 2016 15:57:03 -0700 Subject: [PATCH 3/3] fixup! Add CrashingRunner for use in TestPipeline --- .../main/java/org/apache/beam/sdk/testing/CrashingRunner.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/CrashingRunner.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/CrashingRunner.java index 28eafea66958..975faccc3fa0 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/CrashingRunner.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/CrashingRunner.java @@ -30,7 +30,7 @@ * A {@link PipelineRunner} that applies no overrides and throws an exception on calls to * {@link Pipeline#run()}. For use in {@link TestPipeline} to construct but not execute pipelines. */ -class CrashingRunner extends PipelineRunner{ +public class CrashingRunner extends PipelineRunner{ public static CrashingRunner fromOptions(PipelineOptions opts) { return new CrashingRunner();