From a95eda80e4f314feef9949fee2aa13b5399afc57 Mon Sep 17 00:00:00 2001 From: Eugene Kirpichov Date: Wed, 30 Nov 2016 12:55:45 -0800 Subject: [PATCH] Category for tests using splittable DoFn --- runners/apex/pom.xml | 1 + runners/flink/runner/pom.xml | 1 + runners/google-cloud-dataflow-java/pom.xml | 1 + runners/spark/pom.xml | 1 + .../beam/sdk/testing/UsesSplittableParDo.java | 25 ++++++++++++++++++ .../sdk/transforms}/SplittableDoFnTest.java | 26 +++++++------------ 6 files changed, 39 insertions(+), 16 deletions(-) create mode 100644 sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesSplittableParDo.java rename {runners/direct-java/src/test/java/org/apache/beam/runners/direct => sdks/java/core/src/test/java/org/apache/beam/sdk/transforms}/SplittableDoFnTest.java (95%) diff --git a/runners/apex/pom.xml b/runners/apex/pom.xml index 84185b8ec592..983781d3803f 100644 --- a/runners/apex/pom.xml +++ b/runners/apex/pom.xml @@ -186,6 +186,7 @@ org.apache.beam.sdk.testing.RunnableOnService org.apache.beam.sdk.testing.UsesStatefulParDo + org.apache.beam.sdk.testing.UsesSplittableParDo none true diff --git a/runners/flink/runner/pom.xml b/runners/flink/runner/pom.xml index 18bf6a72a7de..3e3dd7ec0ac5 100644 --- a/runners/flink/runner/pom.xml +++ b/runners/flink/runner/pom.xml @@ -54,6 +54,7 @@ org.apache.beam.sdk.testing.RunnableOnService org.apache.beam.sdk.testing.UsesStatefulParDo + org.apache.beam.sdk.testing.UsesSplittableParDo none true diff --git a/runners/google-cloud-dataflow-java/pom.xml b/runners/google-cloud-dataflow-java/pom.xml index 59276e486022..85474995c1d2 100644 --- a/runners/google-cloud-dataflow-java/pom.xml +++ b/runners/google-cloud-dataflow-java/pom.xml @@ -78,6 +78,7 @@ runnable-on-service-tests org.apache.beam.sdk.testing.UsesStatefulParDo + org.apache.beam.sdk.testing.UsesSplittableParDo org.apache.beam.sdk.transforms.FlattenTest diff --git a/runners/spark/pom.xml b/runners/spark/pom.xml index da7a72a37a5b..dc000bfa8574 100644 --- a/runners/spark/pom.xml +++ b/runners/spark/pom.xml @@ -73,6 +73,7 @@ org.apache.beam.sdk.testing.RunnableOnService org.apache.beam.sdk.testing.UsesStatefulParDo + org.apache.beam.sdk.testing.UsesSplittableParDo 1 false true diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesSplittableParDo.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesSplittableParDo.java new file mode 100644 index 000000000000..209936f3614f --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesSplittableParDo.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.testing; + +import org.apache.beam.sdk.transforms.ParDo; + +/** + * Category tag for validation tests which utilize splittable {@link ParDo}. + */ +public interface UsesSplittableParDo {} diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/SplittableDoFnTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java similarity index 95% rename from runners/direct-java/src/test/java/org/apache/beam/runners/direct/SplittableDoFnTest.java rename to sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java index f9e833f3fb5b..82bd3a3c14c9 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/SplittableDoFnTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.beam.runners.direct; +package org.apache.beam.sdk.transforms; import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkState; @@ -32,14 +32,10 @@ import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.RunnableOnService; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.testing.TestStream; -import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.GroupByKey; -import org.apache.beam.sdk.transforms.Keys; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.View; +import org.apache.beam.sdk.testing.UsesSplittableParDo; import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; import org.apache.beam.sdk.transforms.windowing.FixedWindows; import org.apache.beam.sdk.transforms.windowing.IntervalWindow; @@ -56,14 +52,12 @@ import org.joda.time.Instant; import org.joda.time.MutableDateTime; import org.junit.Test; +import org.junit.experimental.categories.Category; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; /** - * Tests for splittable {@link DoFn} behavior. */ @RunWith(JUnit4.class) public class SplittableDoFnTest { @@ -155,9 +149,9 @@ public void process(ProcessContext c) { } @Test + @Category({RunnableOnService.class, UsesSplittableParDo.class}) public void testPairWithIndexBasic() { Pipeline p = TestPipeline.create(); - p.getOptions().setRunner(DirectRunner.class); PCollection> res = p.apply(Create.of("a", "bb", "ccccc")) .apply(ParDo.of(new PairStringWithIndexToLength())) @@ -179,11 +173,11 @@ public void testPairWithIndexBasic() { } @Test + @Category({RunnableOnService.class, UsesSplittableParDo.class}) public void testPairWithIndexWindowedTimestamped() { // Tests that Splittable DoFn correctly propagates windowing strategy, windows and timestamps // of elements in the input collection. Pipeline p = TestPipeline.create(); - p.getOptions().setRunner(DirectRunner.class); MutableDateTime mutableNow = Instant.now().toMutableDateTime(); mutableNow.setMillisOfSecond(0); @@ -267,9 +261,9 @@ public OffsetRangeTracker newTracker(OffsetRange range) { } @Test + @Category({RunnableOnService.class, UsesSplittableParDo.class}) public void testSideInputsAndOutputs() throws Exception { Pipeline p = TestPipeline.create(); - p.getOptions().setRunner(DirectRunner.class); PCollectionView sideInput = p.apply("side input", Create.of("foo")).apply(View.asSingleton()); @@ -294,9 +288,9 @@ public void testSideInputsAndOutputs() throws Exception { } @Test + @Category({RunnableOnService.class, UsesSplittableParDo.class}) public void testLateData() throws Exception { Pipeline p = TestPipeline.create(); - p.getOptions().setRunner(DirectRunner.class); Instant base = Instant.now(); @@ -389,9 +383,9 @@ public void tearDown() { } @Test + @Category({RunnableOnService.class, UsesSplittableParDo.class}) public void testLifecycleMethods() throws Exception { Pipeline p = TestPipeline.create(); - p.getOptions().setRunner(DirectRunner.class); PCollection res = p.apply(Create.of("a", "b", "c")).apply(ParDo.of(new SDFWithLifecycle()));