diff --git a/runners/spark/pom.xml b/runners/spark/pom.xml index 10fa94699572..6958b2891e81 100644 --- a/runners/spark/pom.xml +++ b/runners/spark/pom.xml @@ -72,7 +72,10 @@ test - org.apache.beam.sdk.testing.RunnableOnService + + org.apache.beam.sdk.testing.RunnableOnService, + org.apache.beam.runners.spark.UsesCheckpointRecovery + org.apache.beam.sdk.testing.UsesStatefulParDo, org.apache.beam.sdk.testing.UsesTimersInParDo, @@ -349,6 +352,9 @@ org.apache.maven.plugins maven-surefire-plugin + + org.apache.beam.runners.spark.UsesCheckpointRecovery + 1 false diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/UsesCheckpointRecovery.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/UsesCheckpointRecovery.java new file mode 100644 index 000000000000..da63d3e3e8f7 --- /dev/null +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/UsesCheckpointRecovery.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.spark; + +/** + * Category tag for tests that validate Spark checkpoint recovery. + */ +public interface UsesCheckpointRecovery {} diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java index ce502d6bf799..5c1963dc89c3 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java @@ -38,6 +38,7 @@ import org.apache.beam.runners.spark.ReuseSparkContextRule; import org.apache.beam.runners.spark.SparkPipelineResult; import org.apache.beam.runners.spark.TestSparkPipelineOptions; +import org.apache.beam.runners.spark.UsesCheckpointRecovery; import org.apache.beam.runners.spark.aggregators.AggregatorsAccumulator; import org.apache.beam.runners.spark.coders.CoderHelpers; import org.apache.beam.runners.spark.metrics.MetricsAccumulator; @@ -82,6 +83,7 @@ import org.junit.BeforeClass; import org.junit.Rule; import org.junit.Test; +import org.junit.experimental.categories.Category; /** @@ -140,6 +142,7 @@ public void close() { } } @Test + @Category(UsesCheckpointRecovery.class) public void testWithResume() throws Exception { // write to Kafka produce(ImmutableMap.of(