From 85eeb10ff1f788e52028aafce42f80b2731dc8b8 Mon Sep 17 00:00:00 2001 From: Jan Lukavsky Date: Mon, 10 Feb 2020 11:34:35 +0100 Subject: [PATCH 1/2] [BEAM-9273] Explicitly disable @RequiresTimeSortedInput on unsupported runners --- .../apex/translation/ParDoTranslator.java | 15 +- .../core/construction/DoFnFeatures.java | 82 ++++++ .../core/construction/DoFnFeaturesTest.java | 276 ++++++++++++++++++ .../ParDoMultiOutputTranslator.java | 9 + .../dataflow/BatchStatefulParDoOverrides.java | 4 +- .../dataflow/DataflowPipelineTranslator.java | 10 +- .../beam/runners/dataflow/DataflowRunner.java | 42 ++- .../org/apache/beam/runners/jet/Utils.java | 10 +- .../samza/runtime/SamzaDoFnRunners.java | 3 +- .../ParDoBoundMultiTranslator.java | 12 +- .../batch/ParDoTranslatorBatch.java | 17 +- 11 files changed, 429 insertions(+), 51 deletions(-) create mode 100644 runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/DoFnFeatures.java create mode 100644 runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/DoFnFeaturesTest.java diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoTranslator.java index cf5d11bdf434..fbe858f3b438 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoTranslator.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoTranslator.java @@ -31,13 +31,12 @@ import org.apache.beam.runners.apex.ApexRunner; import org.apache.beam.runners.apex.translation.operators.ApexParDoOperator; import org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems.ProcessElements; +import org.apache.beam.runners.core.construction.DoFnFeatures; import org.apache.beam.runners.core.construction.ParDoTranslation; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.transforms.DoFn; import 
org.apache.beam.sdk.transforms.DoFnSchemaInformation; import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.reflect.DoFnSignature; -import org.apache.beam.sdk.transforms.reflect.DoFnSignatures; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.PValue; @@ -58,15 +57,19 @@ class ParDoTranslator @Override public void translate(ParDo.MultiOutput transform, TranslationContext context) { DoFn doFn = transform.getFn(); - DoFnSignature signature = DoFnSignatures.getSignature(doFn.getClass()); - if (signature.processElement().isSplittable()) { + if (DoFnFeatures.isSplittable(doFn)) { throw new UnsupportedOperationException( String.format( "%s does not support splittable DoFn: %s", ApexRunner.class.getSimpleName(), doFn)); } - - if (signature.timerDeclarations().size() > 0) { + if (DoFnFeatures.requiresTimeSortedInput(doFn)) { + throw new UnsupportedOperationException( + String.format( + "%s doesn't currently support @RequiresTimeSortedInput", + ApexRunner.class.getSimpleName())); + } + if (DoFnFeatures.usesTimers(doFn)) { throw new UnsupportedOperationException( String.format( "Found %s annotations on %s, but %s cannot yet be used with timers in the %s.", diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/DoFnFeatures.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/DoFnFeatures.java new file mode 100644 index 000000000000..7782b33305a5 --- /dev/null +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/DoFnFeatures.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.core.construction; + +import org.apache.beam.sdk.state.BagState; +import org.apache.beam.sdk.state.MapState; +import org.apache.beam.sdk.state.SetState; +import org.apache.beam.sdk.state.State; +import org.apache.beam.sdk.state.ValueState; +import org.apache.beam.sdk.state.WatermarkHoldState; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.reflect.DoFnSignatures; +import org.apache.beam.sdk.values.TypeDescriptor; + +/** + * Features a {@link DoFn} can possess. Each runner might implement a different (sub)set of these + * features. 
+ */ +public class DoFnFeatures { + + public static boolean isSplittable(DoFn doFn) { + return DoFnSignatures.signatureForDoFn(doFn).processElement().isSplittable(); + } + + public static boolean isStateful(DoFn doFn) { + return usesState(doFn) || usesTimers(doFn); + } + + public static boolean usesMapState(DoFn doFn) { + return usesGivenStateClass(doFn, MapState.class); + } + + public static boolean usesSetState(DoFn doFn) { + return usesGivenStateClass(doFn, SetState.class); + } + + public static boolean usesValueState(DoFn doFn) { + return usesGivenStateClass(doFn, ValueState.class) || requiresTimeSortedInput(doFn); + } + + public static boolean usesBagState(DoFn doFn) { + return usesGivenStateClass(doFn, BagState.class) || requiresTimeSortedInput(doFn); + } + + public static boolean usesWatermarkHold(DoFn doFn) { + return usesGivenStateClass(doFn, WatermarkHoldState.class) || requiresTimeSortedInput(doFn); + } + + public static boolean usesTimers(DoFn doFn) { + return DoFnSignatures.signatureForDoFn(doFn).usesTimers() || requiresTimeSortedInput(doFn); + } + + public static boolean usesState(DoFn doFn) { + return DoFnSignatures.signatureForDoFn(doFn).usesState() || requiresTimeSortedInput(doFn); + } + + public static boolean requiresTimeSortedInput(DoFn doFn) { + return DoFnSignatures.signatureForDoFn(doFn).processElement().requiresTimeSortedInput(); + } + + private static boolean usesGivenStateClass(DoFn doFn, Class stateClass) { + return DoFnSignatures.signatureForDoFn(doFn).stateDeclarations().values().stream() + .anyMatch(d -> d.stateType().isSubtypeOf(TypeDescriptor.of(stateClass))); + } + + private DoFnFeatures() {} +} diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/DoFnFeaturesTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/DoFnFeaturesTest.java new file mode 100644 index 000000000000..29ce654d0096 --- /dev/null +++ 
b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/DoFnFeaturesTest.java @@ -0,0 +1,276 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.core.construction; + +import static org.apache.beam.sdk.testing.SerializableMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; + +import java.util.List; +import org.apache.beam.sdk.io.range.OffsetRange; +import org.apache.beam.sdk.state.MapState; +import org.apache.beam.sdk.state.SetState; +import org.apache.beam.sdk.state.StateSpec; +import org.apache.beam.sdk.state.StateSpecs; +import org.apache.beam.sdk.state.TimeDomain; +import org.apache.beam.sdk.state.TimerSpec; +import org.apache.beam.sdk.state.TimerSpecs; +import org.apache.beam.sdk.state.ValueState; +import org.apache.beam.sdk.state.WatermarkHoldState; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; +import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists; +import org.junit.Test; + +/** Test suite for {@link DoFnFeatures}. 
*/ +public class DoFnFeaturesTest { + + private interface FeatureTest { + void test(); + } + + private static class StatelessDoFn extends DoFn implements FeatureTest { + @ProcessElement + public void process(@Element String input) {} + + @Override + public void test() { + assertThat(DoFnFeatures.isSplittable(this), equalTo(false)); + assertThat(DoFnFeatures.isStateful(this), equalTo(false)); + assertThat(DoFnFeatures.usesTimers(this), equalTo(false)); + assertThat(DoFnFeatures.usesState(this), equalTo(false)); + assertThat(DoFnFeatures.usesBagState(this), equalTo(false)); + assertThat(DoFnFeatures.usesMapState(this), equalTo(false)); + assertThat(DoFnFeatures.usesSetState(this), equalTo(false)); + assertThat(DoFnFeatures.usesValueState(this), equalTo(false)); + assertThat(DoFnFeatures.usesWatermarkHold(this), equalTo(false)); + assertThat(DoFnFeatures.requiresTimeSortedInput(this), equalTo(false)); + } + } + + private static class StatefulWithValueState extends DoFn, String> + implements FeatureTest { + @StateId("state") + private final StateSpec> state = StateSpecs.value(); + + @ProcessElement + public void process(@Element KV input) {} + + @Override + public void test() { + assertThat(DoFnFeatures.isSplittable(this), equalTo(false)); + assertThat(DoFnFeatures.isStateful(this), equalTo(true)); + assertThat(DoFnFeatures.usesTimers(this), equalTo(false)); + assertThat(DoFnFeatures.usesState(this), equalTo(true)); + assertThat(DoFnFeatures.usesBagState(this), equalTo(false)); + assertThat(DoFnFeatures.usesMapState(this), equalTo(false)); + assertThat(DoFnFeatures.usesSetState(this), equalTo(false)); + assertThat(DoFnFeatures.usesValueState(this), equalTo(true)); + assertThat(DoFnFeatures.usesWatermarkHold(this), equalTo(false)); + assertThat(DoFnFeatures.requiresTimeSortedInput(this), equalTo(false)); + } + } + + private static class StatefulWithTimers extends DoFn, String> + implements FeatureTest { + @TimerId("timer") + private final TimerSpec spec = 
TimerSpecs.timer(TimeDomain.EVENT_TIME); + + @ProcessElement + public void process(@Element KV input) {} + + @Override + public void test() { + assertThat(DoFnFeatures.isSplittable(this), equalTo(false)); + assertThat(DoFnFeatures.isStateful(this), equalTo(true)); + assertThat(DoFnFeatures.usesTimers(this), equalTo(true)); + assertThat(DoFnFeatures.usesState(this), equalTo(false)); + assertThat(DoFnFeatures.usesBagState(this), equalTo(false)); + assertThat(DoFnFeatures.usesMapState(this), equalTo(false)); + assertThat(DoFnFeatures.usesSetState(this), equalTo(false)); + assertThat(DoFnFeatures.usesValueState(this), equalTo(false)); + assertThat(DoFnFeatures.usesWatermarkHold(this), equalTo(false)); + assertThat(DoFnFeatures.requiresTimeSortedInput(this), equalTo(false)); + } + + @OnTimer("timer") + public void onTimer() {} + } + + private static class StatefulWithTimersAndValueState extends DoFn, String> + implements FeatureTest { + @TimerId("timer") + private final TimerSpec timer = TimerSpecs.timer(TimeDomain.EVENT_TIME); + + @StateId("state") + private final StateSpec> state = StateSpecs.set(); + + @ProcessElement + public void process(@Element KV input) {} + + @Override + public void test() { + assertThat(DoFnFeatures.isSplittable(this), equalTo(false)); + assertThat(DoFnFeatures.isStateful(this), equalTo(true)); + assertThat(DoFnFeatures.usesTimers(this), equalTo(true)); + assertThat(DoFnFeatures.usesState(this), equalTo(true)); + assertThat(DoFnFeatures.usesBagState(this), equalTo(false)); + assertThat(DoFnFeatures.usesMapState(this), equalTo(false)); + assertThat(DoFnFeatures.usesSetState(this), equalTo(true)); + assertThat(DoFnFeatures.usesValueState(this), equalTo(false)); + assertThat(DoFnFeatures.usesWatermarkHold(this), equalTo(false)); + assertThat(DoFnFeatures.requiresTimeSortedInput(this), equalTo(false)); + } + + @OnTimer("timer") + public void onTimer() {} + } + + private static class StatefulWithSetState extends DoFn, String> + implements 
FeatureTest { + @StateId("state") + private final StateSpec> spec = StateSpecs.set(); + + @ProcessElement + public void process(@Element KV input) {} + + @Override + public void test() { + assertThat(DoFnFeatures.isSplittable(this), equalTo(false)); + assertThat(DoFnFeatures.isStateful(this), equalTo(true)); + assertThat(DoFnFeatures.usesTimers(this), equalTo(false)); + assertThat(DoFnFeatures.usesState(this), equalTo(true)); + assertThat(DoFnFeatures.usesBagState(this), equalTo(false)); + assertThat(DoFnFeatures.usesMapState(this), equalTo(false)); + assertThat(DoFnFeatures.usesSetState(this), equalTo(true)); + assertThat(DoFnFeatures.usesValueState(this), equalTo(false)); + assertThat(DoFnFeatures.usesWatermarkHold(this), equalTo(false)); + assertThat(DoFnFeatures.requiresTimeSortedInput(this), equalTo(false)); + } + } + + private static class StatefulWithMapState extends DoFn, String> + implements FeatureTest { + @StateId("state") + private final StateSpec> spec = StateSpecs.map(); + + @ProcessElement + public void process(@Element KV input) {} + + @Override + public void test() { + assertThat(DoFnFeatures.isSplittable(this), equalTo(false)); + assertThat(DoFnFeatures.isStateful(this), equalTo(true)); + assertThat(DoFnFeatures.usesTimers(this), equalTo(false)); + assertThat(DoFnFeatures.usesState(this), equalTo(true)); + assertThat(DoFnFeatures.usesBagState(this), equalTo(false)); + assertThat(DoFnFeatures.usesMapState(this), equalTo(true)); + assertThat(DoFnFeatures.usesSetState(this), equalTo(false)); + assertThat(DoFnFeatures.usesValueState(this), equalTo(false)); + assertThat(DoFnFeatures.usesWatermarkHold(this), equalTo(false)); + assertThat(DoFnFeatures.requiresTimeSortedInput(this), equalTo(false)); + } + } + + private static class StatefulWithWatermarkHoldState extends DoFn, String> + implements FeatureTest { + @StateId("state") + private final StateSpec spec = + StateSpecs.watermarkStateInternal(TimestampCombiner.LATEST); + + @ProcessElement + public 
void process(@Element KV input) {} + + @Override + public void test() { + assertThat(DoFnFeatures.isSplittable(this), equalTo(false)); + assertThat(DoFnFeatures.isStateful(this), equalTo(true)); + assertThat(DoFnFeatures.usesTimers(this), equalTo(false)); + assertThat(DoFnFeatures.usesState(this), equalTo(true)); + assertThat(DoFnFeatures.usesBagState(this), equalTo(false)); + assertThat(DoFnFeatures.usesMapState(this), equalTo(false)); + assertThat(DoFnFeatures.usesSetState(this), equalTo(false)); + assertThat(DoFnFeatures.usesValueState(this), equalTo(false)); + assertThat(DoFnFeatures.usesWatermarkHold(this), equalTo(true)); + assertThat(DoFnFeatures.requiresTimeSortedInput(this), equalTo(false)); + } + } + + private static class RequiresTimeSortedInput extends DoFn, String> + implements FeatureTest { + @ProcessElement + @RequiresTimeSortedInput + public void process(@Element KV input) {} + + @Override + public void test() { + assertThat(DoFnFeatures.isSplittable(this), equalTo(false)); + assertThat(DoFnFeatures.isStateful(this), equalTo(true)); + assertThat(DoFnFeatures.usesTimers(this), equalTo(true)); + assertThat(DoFnFeatures.usesState(this), equalTo(true)); + assertThat(DoFnFeatures.usesBagState(this), equalTo(true)); + assertThat(DoFnFeatures.usesMapState(this), equalTo(false)); + assertThat(DoFnFeatures.usesSetState(this), equalTo(false)); + assertThat(DoFnFeatures.usesValueState(this), equalTo(true)); + assertThat(DoFnFeatures.usesWatermarkHold(this), equalTo(true)); + assertThat(DoFnFeatures.requiresTimeSortedInput(this), equalTo(true)); + } + } + + private static class Splittable extends DoFn, String> implements FeatureTest { + @ProcessElement + public void process(ProcessContext c, RestrictionTracker tracker) {} + + @GetInitialRestriction + public OffsetRange getInitialRange(@Element KV element) { + return new OffsetRange(0L, element.getValue()); + } + + @Override + public void test() { + assertThat(DoFnFeatures.isSplittable(this), equalTo(true)); + 
assertThat(DoFnFeatures.isStateful(this), equalTo(false)); + assertThat(DoFnFeatures.usesTimers(this), equalTo(false)); + assertThat(DoFnFeatures.usesState(this), equalTo(false)); + assertThat(DoFnFeatures.usesBagState(this), equalTo(false)); + assertThat(DoFnFeatures.usesMapState(this), equalTo(false)); + assertThat(DoFnFeatures.usesSetState(this), equalTo(false)); + assertThat(DoFnFeatures.usesValueState(this), equalTo(false)); + assertThat(DoFnFeatures.usesWatermarkHold(this), equalTo(false)); + assertThat(DoFnFeatures.requiresTimeSortedInput(this), equalTo(false)); + } + } + + private final List tests = + Lists.newArrayList( + new StatelessDoFn(), + new StatefulWithValueState(), + new StatefulWithTimers(), + new StatefulWithTimersAndValueState(), + new StatefulWithSetState(), + new StatefulWithMapState(), + new StatefulWithWatermarkHoldState(), + new RequiresTimeSortedInput(), + new Splittable()); + + @Test + public void testAllDoFnFeatures() { + tests.forEach(FeatureTest::test); + } +} diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoMultiOutputTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoMultiOutputTranslator.java index fb5202b44b74..53ffeec2b885 100644 --- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoMultiOutputTranslator.java +++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoMultiOutputTranslator.java @@ -24,7 +24,9 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import org.apache.beam.runners.core.construction.DoFnFeatures; import org.apache.beam.runners.core.construction.ParDoTranslation; +import org.apache.beam.runners.gearpump.GearpumpRunner; import org.apache.beam.runners.gearpump.translators.functions.DoFnFunction; import org.apache.beam.runners.gearpump.translators.utils.TranslatorUtils; import org.apache.beam.sdk.coders.Coder; @@ -80,6 +82,13 @@ public 
void translate(ParDo.MultiOutput transform, TranslationC Map> sideInputMapping = ParDoTranslation.getSideInputMapping(context.getCurrentTransform()); + if (DoFnFeatures.requiresTimeSortedInput(transform.getFn())) { + throw new UnsupportedOperationException( + String.format( + "%s doesn't currently support @RequiresTimeSortedInput annotation", + GearpumpRunner.class.getSimpleName())); + } + JavaStream outputStream = TranslatorUtils.toList(unionStream) .flatMap( diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverrides.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverrides.java index 82cf753832d6..58c73f491dbf 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverrides.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverrides.java @@ -176,7 +176,7 @@ ParDo.SingleOutput, OutputT> getOriginalParDo() { public PCollection expand(PCollection> input) { DoFn, OutputT> fn = originalParDo.getFn(); verifyFnIsStateful(fn); - DataflowRunner.verifyStateSupported(fn); + DataflowRunner.verifyDoFnSupported(fn, false); DataflowRunner.verifyStateSupportForWindowingStrategy(input.getWindowingStrategy()); if (isFnApi) { @@ -212,7 +212,7 @@ static class StatefulMultiOutputParDo public PCollectionTuple expand(PCollection> input) { DoFn, OutputT> fn = originalParDo.getFn(); verifyFnIsStateful(fn); - DataflowRunner.verifyStateSupported(fn); + DataflowRunner.verifyDoFnSupported(fn, false); DataflowRunner.verifyStateSupportForWindowingStrategy(input.getWindowingStrategy()); if (isFnApi) { diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java index 
0d100ad5a89e..fd59284c0698 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java @@ -51,6 +51,7 @@ import java.util.stream.Collectors; import javax.annotation.Nullable; import org.apache.beam.model.pipeline.v1.RunnerApi; +import org.apache.beam.runners.core.construction.DoFnFeatures; import org.apache.beam.runners.core.construction.Environments; import org.apache.beam.runners.core.construction.ParDoTranslation; import org.apache.beam.runners.core.construction.PipelineTranslation; @@ -1224,10 +1225,10 @@ private static void translateFn( Map, Coder> outputCoders, DoFnSchemaInformation doFnSchemaInformation, Map> sideInputMapping) { - DoFnSignature signature = DoFnSignatures.getSignature(fn.getClass()); - if (signature.usesState() || signature.usesTimers()) { - DataflowRunner.verifyStateSupported(fn); + boolean isStateful = DoFnFeatures.isStateful(fn); + if (isStateful) { + DataflowRunner.verifyDoFnSupported(fn, context.getPipelineOptions().isStreaming()); DataflowRunner.verifyStateSupportForWindowingStrategy(windowingStrategy); } @@ -1255,8 +1256,7 @@ private static void translateFn( // Setting USES_KEYED_STATE will cause an ungrouped shuffle, which works // in streaming but does not work in batch - if (context.getPipelineOptions().isStreaming() - && (signature.usesState() || signature.usesTimers())) { + if (context.getPipelineOptions().isStreaming() && isStateful) { stepContext.addInput(PropertyNames.USES_KEYED_STATE, "true"); } } diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index 367f6e089aa4..c28225e8304b 100644 --- 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -60,6 +60,7 @@ import org.apache.beam.model.pipeline.v1.RunnerApi; import org.apache.beam.runners.core.construction.CoderTranslation; import org.apache.beam.runners.core.construction.DeduplicatedFlattenFactory; +import org.apache.beam.runners.core.construction.DoFnFeatures; import org.apache.beam.runners.core.construction.EmptyFlattenAsCreateFactory; import org.apache.beam.runners.core.construction.JavaReadViaImpulse; import org.apache.beam.runners.core.construction.PTransformMatchers; @@ -132,8 +133,6 @@ import org.apache.beam.sdk.transforms.View.CreatePCollectionView; import org.apache.beam.sdk.transforms.WithKeys; import org.apache.beam.sdk.transforms.display.DisplayData; -import org.apache.beam.sdk.transforms.reflect.DoFnSignature; -import org.apache.beam.sdk.transforms.reflect.DoFnSignatures; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.util.CoderUtils; @@ -151,7 +150,6 @@ import org.apache.beam.sdk.values.PInput; import org.apache.beam.sdk.values.PValue; import org.apache.beam.sdk.values.TupleTag; -import org.apache.beam.sdk.values.TypeDescriptor; import org.apache.beam.sdk.values.TypeDescriptors; import org.apache.beam.sdk.values.ValueWithRecordId; import org.apache.beam.sdk.values.WindowingStrategy; @@ -1937,26 +1935,26 @@ static String getContainerImageForJob(DataflowPipelineOptions options) { } } - static void verifyStateSupported(DoFn fn) { - DoFnSignature signature = DoFnSignatures.getSignature(fn.getClass()); - - for (DoFnSignature.StateDeclaration stateDecl : signature.stateDeclarations().values()) { - - // https://issues.apache.org/jira/browse/BEAM-1474 - if (stateDecl.stateType().isSubtypeOf(TypeDescriptor.of(MapState.class))) { - 
throw new UnsupportedOperationException( - String.format( - "%s does not currently support %s", - DataflowRunner.class.getSimpleName(), MapState.class.getSimpleName())); - } - + static void verifyDoFnSupported(DoFn fn, boolean streaming) { + if (DoFnFeatures.usesSetState(fn)) { // https://issues.apache.org/jira/browse/BEAM-1479 - if (stateDecl.stateType().isSubtypeOf(TypeDescriptor.of(SetState.class))) { - throw new UnsupportedOperationException( - String.format( - "%s does not currently support %s", - DataflowRunner.class.getSimpleName(), SetState.class.getSimpleName())); - } + throw new UnsupportedOperationException( + String.format( + "%s does not currently support %s", + DataflowRunner.class.getSimpleName(), SetState.class.getSimpleName())); + } + if (DoFnFeatures.usesMapState(fn)) { + // https://issues.apache.org/jira/browse/BEAM-1474 + throw new UnsupportedOperationException( + String.format( + "%s does not currently support %s", + DataflowRunner.class.getSimpleName(), MapState.class.getSimpleName())); + } + if (streaming && DoFnFeatures.requiresTimeSortedInput(fn)) { + throw new UnsupportedOperationException( + String.format( + "%s does not currently support @RequiresTimeSortedInput in streaming mode.", + DataflowRunner.class.getSimpleName())); } } diff --git a/runners/jet/src/main/java/org/apache/beam/runners/jet/Utils.java b/runners/jet/src/main/java/org/apache/beam/runners/jet/Utils.java index 7489bea643e6..e3943abf8f12 100644 --- a/runners/jet/src/main/java/org/apache/beam/runners/jet/Utils.java +++ b/runners/jet/src/main/java/org/apache/beam/runners/jet/Utils.java @@ -35,6 +35,7 @@ import java.util.stream.Collectors; import java.util.stream.IntStream; import javax.annotation.Nonnull; +import org.apache.beam.runners.core.construction.DoFnFeatures; import org.apache.beam.runners.core.construction.ParDoTranslation; import org.apache.beam.runners.core.construction.TransformInputs; import org.apache.beam.sdk.Pipeline; @@ -46,7 +47,6 @@ import 
org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.reflect.DoFnSignatures; import org.apache.beam.sdk.util.CoderUtils; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.PCollection; @@ -168,11 +168,17 @@ static boolean usesStateOrTimers(AppliedPTransform appliedTransform) { static DoFn getDoFn(AppliedPTransform appliedTransform) { try { DoFn doFn = ParDoTranslation.getDoFn(appliedTransform); - if (DoFnSignatures.signatureForDoFn(doFn).processElement().isSplittable()) { + if (DoFnFeatures.isSplittable(doFn)) { throw new IllegalStateException( "Not expected to directly translate splittable DoFn, should have been overridden: " + doFn); // todo } + if (DoFnFeatures.requiresTimeSortedInput(doFn)) { + throw new UnsupportedOperationException( + String.format( + "%s doesn't current support @RequiresTimeSortedInput annotation.", + JetRunner.class.getSimpleName())); + } return doFn; } catch (IOException e) { throw new RuntimeException(e); diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaDoFnRunners.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaDoFnRunners.java index 6cbf2694ca0f..195d9ff41519 100644 --- a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaDoFnRunners.java +++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaDoFnRunners.java @@ -27,6 +27,7 @@ import org.apache.beam.runners.core.StatefulDoFnRunner; import org.apache.beam.runners.core.StepContext; import org.apache.beam.runners.core.TimerInternals; +import org.apache.beam.runners.core.construction.DoFnFeatures; import org.apache.beam.runners.fnexecution.control.BundleProgressHandler; import org.apache.beam.runners.fnexecution.control.OutputReceiverFactory; import org.apache.beam.runners.fnexecution.control.RemoteBundle; @@ -85,7 +86,7 @@ public static 
DoFnRunner create( final SamzaExecutionContext executionContext = (SamzaExecutionContext) context.getApplicationContainerContext(); - if (signature.usesState()) { + if (DoFnFeatures.isStateful(doFn)) { keyedInternals = new KeyedInternals(stateInternalsFactory, timerInternalsFactory); stateInternals = keyedInternals.stateInternals(); timerInternals = keyedInternals.timerInternals(); diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ParDoBoundMultiTranslator.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ParDoBoundMultiTranslator.java index 805af2cea1de..fd50e1e3c563 100644 --- a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ParDoBoundMultiTranslator.java +++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ParDoBoundMultiTranslator.java @@ -29,6 +29,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; import org.apache.beam.model.pipeline.v1.RunnerApi; +import org.apache.beam.runners.core.construction.DoFnFeatures; import org.apache.beam.runners.core.construction.ParDoTranslation; import org.apache.beam.runners.core.construction.graph.PipelineNode; import org.apache.beam.runners.core.construction.graph.QueryablePipeline; @@ -96,13 +97,16 @@ private static void doTranslate( .collect( Collectors.toMap(e -> e.getKey(), e -> ((PCollection) e.getValue()).getCoder())); - final DoFnSignature signature = DoFnSignatures.getSignature(transform.getFn().getClass()); - final Coder keyCoder = - signature.usesState() ? ((KvCoder) input.getCoder()).getKeyCoder() : null; + boolean isStateful = DoFnFeatures.isStateful(transform.getFn()); + final Coder keyCoder = isStateful ? 
((KvCoder) input.getCoder()).getKeyCoder() : null; - if (signature.processElement().isSplittable()) { + if (DoFnFeatures.isSplittable(transform.getFn())) { throw new UnsupportedOperationException("Splittable DoFn is not currently supported"); } + if (DoFnFeatures.requiresTimeSortedInput(transform.getFn())) { + throw new UnsupportedOperationException( + "@RequiresTimeSortedInput annotation is not currently supported"); + } final MessageStream> inputStream = ctx.getMessageStream(input); final List>> sideInputStreams = diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java index 3509126d0560..ef9709d4548f 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java @@ -24,6 +24,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import org.apache.beam.runners.core.construction.DoFnFeatures; import org.apache.beam.runners.core.construction.ParDoTranslation; import org.apache.beam.runners.spark.structuredstreaming.metrics.MetricsAccumulator; import org.apache.beam.runners.spark.structuredstreaming.metrics.MetricsContainerStepMapAccumulator; @@ -38,8 +39,6 @@ import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.DoFnSchemaInformation; import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.reflect.DoFnSignature; -import org.apache.beam.sdk.transforms.reflect.DoFnSignatures; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.PCollection; @@ -72,17 +71,17 @@ public void translateTransform( // TODO: 
add support of Splittable DoFn DoFn doFn = getDoFn(context); checkState( - !DoFnSignatures.signatureForDoFn(doFn).processElement().isSplittable(), + !DoFnFeatures.isSplittable(doFn), "Not expected to directly translate splittable DoFn, should have been overridden: %s", doFn); // TODO: add support of states and timers - DoFnSignature signature = DoFnSignatures.getSignature(doFn.getClass()); - boolean stateful = - signature.stateDeclarations().size() > 0 - || signature.timerDeclarations().size() > 0 - || signature.timerFamilyDeclarations().size() > 0; - checkState(!stateful, "States and timers are not supported for the moment."); + checkState( + !DoFnFeatures.isStateful(doFn), "States and timers are not supported for the moment."); + + checkState( + !DoFnFeatures.requiresTimeSortedInput(doFn), + "@RequiresTimeSortedInput is not " + "supported for the moment"); DoFnSchemaInformation doFnSchemaInformation = ParDoTranslation.getSchemaInformation(context.getCurrentTransform()); From 9e45f9d8693366d3cf47e26bb031eccef5e2b598 Mon Sep 17 00:00:00 2001 From: Jan Lukavsky Date: Wed, 12 Feb 2020 22:03:59 +0100 Subject: [PATCH 2/2] [BEAM-9273] code review - to be squashed --- .../apex/translation/ParDoTranslator.java | 8 +- .../core/construction/DoFnFeatures.java | 82 ------ .../core/construction/DoFnFeaturesTest.java | 276 ------------------ .../ParDoMultiOutputTranslator.java | 4 +- .../dataflow/BatchStatefulParDoOverrides.java | 4 +- .../dataflow/DataflowPipelineTranslator.java | 3 +- .../beam/runners/dataflow/DataflowRunner.java | 16 +- .../org/apache/beam/runners/jet/Utils.java | 6 +- .../samza/runtime/SamzaDoFnRunners.java | 3 +- .../ParDoBoundMultiTranslator.java | 7 +- .../batch/ParDoTranslatorBatch.java | 8 +- .../transforms/reflect/DoFnSignatures.java | 50 ++++ .../reflect/DoFnSignaturesTest.java | 240 +++++++++++++++ 13 files changed, 322 insertions(+), 385 deletions(-) delete mode 100644 
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/DoFnFeatures.java delete mode 100644 runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/DoFnFeaturesTest.java diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoTranslator.java index fbe858f3b438..9a35a72d3356 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoTranslator.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoTranslator.java @@ -31,12 +31,12 @@ import org.apache.beam.runners.apex.ApexRunner; import org.apache.beam.runners.apex.translation.operators.ApexParDoOperator; import org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems.ProcessElements; -import org.apache.beam.runners.core.construction.DoFnFeatures; import org.apache.beam.runners.core.construction.ParDoTranslation; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.DoFnSchemaInformation; import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.reflect.DoFnSignatures; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.PValue; @@ -58,18 +58,18 @@ class ParDoTranslator public void translate(ParDo.MultiOutput transform, TranslationContext context) { DoFn doFn = transform.getFn(); - if (DoFnFeatures.isSplittable(doFn)) { + if (DoFnSignatures.isSplittable(doFn)) { throw new UnsupportedOperationException( String.format( "%s does not support splittable DoFn: %s", ApexRunner.class.getSimpleName(), doFn)); } - if (DoFnFeatures.requiresTimeSortedInput(doFn)) { + if (DoFnSignatures.requiresTimeSortedInput(doFn)) { throw new UnsupportedOperationException( String.format( "%s doesn't currently support 
@RequiresTimeSortedInput", ApexRunner.class.getSimpleName())); } - if (DoFnFeatures.usesTimers(doFn)) { + if (DoFnSignatures.usesTimers(doFn)) { throw new UnsupportedOperationException( String.format( "Found %s annotations on %s, but %s cannot yet be used with timers in the %s.", diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/DoFnFeatures.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/DoFnFeatures.java deleted file mode 100644 index 7782b33305a5..000000000000 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/DoFnFeatures.java +++ /dev/null @@ -1,82 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.runners.core.construction; - -import org.apache.beam.sdk.state.BagState; -import org.apache.beam.sdk.state.MapState; -import org.apache.beam.sdk.state.SetState; -import org.apache.beam.sdk.state.State; -import org.apache.beam.sdk.state.ValueState; -import org.apache.beam.sdk.state.WatermarkHoldState; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.reflect.DoFnSignatures; -import org.apache.beam.sdk.values.TypeDescriptor; - -/** - * Features a {@link DoFn} can posses. Each runner might implement a different (sub)set of this - * features. - */ -public class DoFnFeatures { - - public static boolean isSplittable(DoFn doFn) { - return DoFnSignatures.signatureForDoFn(doFn).processElement().isSplittable(); - } - - public static boolean isStateful(DoFn doFn) { - return usesState(doFn) || usesTimers(doFn); - } - - public static boolean usesMapState(DoFn doFn) { - return usesGivenStateClass(doFn, MapState.class); - } - - public static boolean usesSetState(DoFn doFn) { - return usesGivenStateClass(doFn, SetState.class); - } - - public static boolean usesValueState(DoFn doFn) { - return usesGivenStateClass(doFn, ValueState.class) || requiresTimeSortedInput(doFn); - } - - public static boolean usesBagState(DoFn doFn) { - return usesGivenStateClass(doFn, BagState.class) || requiresTimeSortedInput(doFn); - } - - public static boolean usesWatermarkHold(DoFn doFn) { - return usesGivenStateClass(doFn, WatermarkHoldState.class) || requiresTimeSortedInput(doFn); - } - - public static boolean usesTimers(DoFn doFn) { - return DoFnSignatures.signatureForDoFn(doFn).usesTimers() || requiresTimeSortedInput(doFn); - } - - public static boolean usesState(DoFn doFn) { - return DoFnSignatures.signatureForDoFn(doFn).usesState() || requiresTimeSortedInput(doFn); - } - - public static boolean requiresTimeSortedInput(DoFn doFn) { - return DoFnSignatures.signatureForDoFn(doFn).processElement().requiresTimeSortedInput(); - } - - 
private static boolean usesGivenStateClass(DoFn doFn, Class stateClass) { - return DoFnSignatures.signatureForDoFn(doFn).stateDeclarations().values().stream() - .anyMatch(d -> d.stateType().isSubtypeOf(TypeDescriptor.of(stateClass))); - } - - private DoFnFeatures() {} -} diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/DoFnFeaturesTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/DoFnFeaturesTest.java deleted file mode 100644 index 29ce654d0096..000000000000 --- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/DoFnFeaturesTest.java +++ /dev/null @@ -1,276 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.runners.core.construction; - -import static org.apache.beam.sdk.testing.SerializableMatchers.equalTo; -import static org.hamcrest.MatcherAssert.assertThat; - -import java.util.List; -import org.apache.beam.sdk.io.range.OffsetRange; -import org.apache.beam.sdk.state.MapState; -import org.apache.beam.sdk.state.SetState; -import org.apache.beam.sdk.state.StateSpec; -import org.apache.beam.sdk.state.StateSpecs; -import org.apache.beam.sdk.state.TimeDomain; -import org.apache.beam.sdk.state.TimerSpec; -import org.apache.beam.sdk.state.TimerSpecs; -import org.apache.beam.sdk.state.ValueState; -import org.apache.beam.sdk.state.WatermarkHoldState; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; -import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists; -import org.junit.Test; - -/** Test suite for {@link DoFnFeatures}. 
*/ -public class DoFnFeaturesTest { - - private interface FeatureTest { - void test(); - } - - private static class StatelessDoFn extends DoFn implements FeatureTest { - @ProcessElement - public void process(@Element String input) {} - - @Override - public void test() { - assertThat(DoFnFeatures.isSplittable(this), equalTo(false)); - assertThat(DoFnFeatures.isStateful(this), equalTo(false)); - assertThat(DoFnFeatures.usesTimers(this), equalTo(false)); - assertThat(DoFnFeatures.usesState(this), equalTo(false)); - assertThat(DoFnFeatures.usesBagState(this), equalTo(false)); - assertThat(DoFnFeatures.usesMapState(this), equalTo(false)); - assertThat(DoFnFeatures.usesSetState(this), equalTo(false)); - assertThat(DoFnFeatures.usesValueState(this), equalTo(false)); - assertThat(DoFnFeatures.usesWatermarkHold(this), equalTo(false)); - assertThat(DoFnFeatures.requiresTimeSortedInput(this), equalTo(false)); - } - } - - private static class StatefulWithValueState extends DoFn, String> - implements FeatureTest { - @StateId("state") - private final StateSpec> state = StateSpecs.value(); - - @ProcessElement - public void process(@Element KV input) {} - - @Override - public void test() { - assertThat(DoFnFeatures.isSplittable(this), equalTo(false)); - assertThat(DoFnFeatures.isStateful(this), equalTo(true)); - assertThat(DoFnFeatures.usesTimers(this), equalTo(false)); - assertThat(DoFnFeatures.usesState(this), equalTo(true)); - assertThat(DoFnFeatures.usesBagState(this), equalTo(false)); - assertThat(DoFnFeatures.usesMapState(this), equalTo(false)); - assertThat(DoFnFeatures.usesSetState(this), equalTo(false)); - assertThat(DoFnFeatures.usesValueState(this), equalTo(true)); - assertThat(DoFnFeatures.usesWatermarkHold(this), equalTo(false)); - assertThat(DoFnFeatures.requiresTimeSortedInput(this), equalTo(false)); - } - } - - private static class StatefulWithTimers extends DoFn, String> - implements FeatureTest { - @TimerId("timer") - private final TimerSpec spec = 
TimerSpecs.timer(TimeDomain.EVENT_TIME); - - @ProcessElement - public void process(@Element KV input) {} - - @Override - public void test() { - assertThat(DoFnFeatures.isSplittable(this), equalTo(false)); - assertThat(DoFnFeatures.isStateful(this), equalTo(true)); - assertThat(DoFnFeatures.usesTimers(this), equalTo(true)); - assertThat(DoFnFeatures.usesState(this), equalTo(false)); - assertThat(DoFnFeatures.usesBagState(this), equalTo(false)); - assertThat(DoFnFeatures.usesMapState(this), equalTo(false)); - assertThat(DoFnFeatures.usesSetState(this), equalTo(false)); - assertThat(DoFnFeatures.usesValueState(this), equalTo(false)); - assertThat(DoFnFeatures.usesWatermarkHold(this), equalTo(false)); - assertThat(DoFnFeatures.requiresTimeSortedInput(this), equalTo(false)); - } - - @OnTimer("timer") - public void onTimer() {} - } - - private static class StatefulWithTimersAndValueState extends DoFn, String> - implements FeatureTest { - @TimerId("timer") - private final TimerSpec timer = TimerSpecs.timer(TimeDomain.EVENT_TIME); - - @StateId("state") - private final StateSpec> state = StateSpecs.set(); - - @ProcessElement - public void process(@Element KV input) {} - - @Override - public void test() { - assertThat(DoFnFeatures.isSplittable(this), equalTo(false)); - assertThat(DoFnFeatures.isStateful(this), equalTo(true)); - assertThat(DoFnFeatures.usesTimers(this), equalTo(true)); - assertThat(DoFnFeatures.usesState(this), equalTo(true)); - assertThat(DoFnFeatures.usesBagState(this), equalTo(false)); - assertThat(DoFnFeatures.usesMapState(this), equalTo(false)); - assertThat(DoFnFeatures.usesSetState(this), equalTo(true)); - assertThat(DoFnFeatures.usesValueState(this), equalTo(false)); - assertThat(DoFnFeatures.usesWatermarkHold(this), equalTo(false)); - assertThat(DoFnFeatures.requiresTimeSortedInput(this), equalTo(false)); - } - - @OnTimer("timer") - public void onTimer() {} - } - - private static class StatefulWithSetState extends DoFn, String> - implements 
FeatureTest { - @StateId("state") - private final StateSpec> spec = StateSpecs.set(); - - @ProcessElement - public void process(@Element KV input) {} - - @Override - public void test() { - assertThat(DoFnFeatures.isSplittable(this), equalTo(false)); - assertThat(DoFnFeatures.isStateful(this), equalTo(true)); - assertThat(DoFnFeatures.usesTimers(this), equalTo(false)); - assertThat(DoFnFeatures.usesState(this), equalTo(true)); - assertThat(DoFnFeatures.usesBagState(this), equalTo(false)); - assertThat(DoFnFeatures.usesMapState(this), equalTo(false)); - assertThat(DoFnFeatures.usesSetState(this), equalTo(true)); - assertThat(DoFnFeatures.usesValueState(this), equalTo(false)); - assertThat(DoFnFeatures.usesWatermarkHold(this), equalTo(false)); - assertThat(DoFnFeatures.requiresTimeSortedInput(this), equalTo(false)); - } - } - - private static class StatefulWithMapState extends DoFn, String> - implements FeatureTest { - @StateId("state") - private final StateSpec> spec = StateSpecs.map(); - - @ProcessElement - public void process(@Element KV input) {} - - @Override - public void test() { - assertThat(DoFnFeatures.isSplittable(this), equalTo(false)); - assertThat(DoFnFeatures.isStateful(this), equalTo(true)); - assertThat(DoFnFeatures.usesTimers(this), equalTo(false)); - assertThat(DoFnFeatures.usesState(this), equalTo(true)); - assertThat(DoFnFeatures.usesBagState(this), equalTo(false)); - assertThat(DoFnFeatures.usesMapState(this), equalTo(true)); - assertThat(DoFnFeatures.usesSetState(this), equalTo(false)); - assertThat(DoFnFeatures.usesValueState(this), equalTo(false)); - assertThat(DoFnFeatures.usesWatermarkHold(this), equalTo(false)); - assertThat(DoFnFeatures.requiresTimeSortedInput(this), equalTo(false)); - } - } - - private static class StatefulWithWatermarkHoldState extends DoFn, String> - implements FeatureTest { - @StateId("state") - private final StateSpec spec = - StateSpecs.watermarkStateInternal(TimestampCombiner.LATEST); - - @ProcessElement - public 
void process(@Element KV input) {} - - @Override - public void test() { - assertThat(DoFnFeatures.isSplittable(this), equalTo(false)); - assertThat(DoFnFeatures.isStateful(this), equalTo(true)); - assertThat(DoFnFeatures.usesTimers(this), equalTo(false)); - assertThat(DoFnFeatures.usesState(this), equalTo(true)); - assertThat(DoFnFeatures.usesBagState(this), equalTo(false)); - assertThat(DoFnFeatures.usesMapState(this), equalTo(false)); - assertThat(DoFnFeatures.usesSetState(this), equalTo(false)); - assertThat(DoFnFeatures.usesValueState(this), equalTo(false)); - assertThat(DoFnFeatures.usesWatermarkHold(this), equalTo(true)); - assertThat(DoFnFeatures.requiresTimeSortedInput(this), equalTo(false)); - } - } - - private static class RequiresTimeSortedInput extends DoFn, String> - implements FeatureTest { - @ProcessElement - @RequiresTimeSortedInput - public void process(@Element KV input) {} - - @Override - public void test() { - assertThat(DoFnFeatures.isSplittable(this), equalTo(false)); - assertThat(DoFnFeatures.isStateful(this), equalTo(true)); - assertThat(DoFnFeatures.usesTimers(this), equalTo(true)); - assertThat(DoFnFeatures.usesState(this), equalTo(true)); - assertThat(DoFnFeatures.usesBagState(this), equalTo(true)); - assertThat(DoFnFeatures.usesMapState(this), equalTo(false)); - assertThat(DoFnFeatures.usesSetState(this), equalTo(false)); - assertThat(DoFnFeatures.usesValueState(this), equalTo(true)); - assertThat(DoFnFeatures.usesWatermarkHold(this), equalTo(true)); - assertThat(DoFnFeatures.requiresTimeSortedInput(this), equalTo(true)); - } - } - - private static class Splittable extends DoFn, String> implements FeatureTest { - @ProcessElement - public void process(ProcessContext c, RestrictionTracker tracker) {} - - @GetInitialRestriction - public OffsetRange getInitialRange(@Element KV element) { - return new OffsetRange(0L, element.getValue()); - } - - @Override - public void test() { - assertThat(DoFnFeatures.isSplittable(this), equalTo(true)); - 
assertThat(DoFnFeatures.isStateful(this), equalTo(false)); - assertThat(DoFnFeatures.usesTimers(this), equalTo(false)); - assertThat(DoFnFeatures.usesState(this), equalTo(false)); - assertThat(DoFnFeatures.usesBagState(this), equalTo(false)); - assertThat(DoFnFeatures.usesMapState(this), equalTo(false)); - assertThat(DoFnFeatures.usesSetState(this), equalTo(false)); - assertThat(DoFnFeatures.usesValueState(this), equalTo(false)); - assertThat(DoFnFeatures.usesWatermarkHold(this), equalTo(false)); - assertThat(DoFnFeatures.requiresTimeSortedInput(this), equalTo(false)); - } - } - - private final List tests = - Lists.newArrayList( - new StatelessDoFn(), - new StatefulWithValueState(), - new StatefulWithTimers(), - new StatefulWithTimersAndValueState(), - new StatefulWithSetState(), - new StatefulWithMapState(), - new StatefulWithWatermarkHoldState(), - new RequiresTimeSortedInput(), - new Splittable()); - - @Test - public void testAllDoFnFeatures() { - tests.forEach(FeatureTest::test); - } -} diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoMultiOutputTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoMultiOutputTranslator.java index 53ffeec2b885..5b538e552f7c 100644 --- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoMultiOutputTranslator.java +++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoMultiOutputTranslator.java @@ -24,7 +24,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import org.apache.beam.runners.core.construction.DoFnFeatures; import org.apache.beam.runners.core.construction.ParDoTranslation; import org.apache.beam.runners.gearpump.GearpumpRunner; import org.apache.beam.runners.gearpump.translators.functions.DoFnFunction; @@ -33,6 +32,7 @@ import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.DoFnSchemaInformation; import 
org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.reflect.DoFnSignatures; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionView; @@ -82,7 +82,7 @@ public void translate(ParDo.MultiOutput transform, TranslationC Map> sideInputMapping = ParDoTranslation.getSideInputMapping(context.getCurrentTransform()); - if (DoFnFeatures.requiresTimeSortedInput(transform.getFn())) { + if (DoFnSignatures.requiresTimeSortedInput(transform.getFn())) { throw new UnsupportedOperationException( String.format( "%s doesn't currently support @RequiresTimeSortedInput annotation", diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverrides.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverrides.java index 58c73f491dbf..3cdd1348ac36 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverrides.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverrides.java @@ -176,7 +176,7 @@ ParDo.SingleOutput, OutputT> getOriginalParDo() { public PCollection expand(PCollection> input) { DoFn, OutputT> fn = originalParDo.getFn(); verifyFnIsStateful(fn); - DataflowRunner.verifyDoFnSupported(fn, false); + DataflowRunner.verifyDoFnSupportedBatch(fn); DataflowRunner.verifyStateSupportForWindowingStrategy(input.getWindowingStrategy()); if (isFnApi) { @@ -212,7 +212,7 @@ static class StatefulMultiOutputParDo public PCollectionTuple expand(PCollection> input) { DoFn, OutputT> fn = originalParDo.getFn(); verifyFnIsStateful(fn); - DataflowRunner.verifyDoFnSupported(fn, false); + DataflowRunner.verifyDoFnSupportedBatch(fn); DataflowRunner.verifyStateSupportForWindowingStrategy(input.getWindowingStrategy()); if (isFnApi) { diff --git 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java index fd59284c0698..57b47c492468 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java @@ -51,7 +51,6 @@ import java.util.stream.Collectors; import javax.annotation.Nullable; import org.apache.beam.model.pipeline.v1.RunnerApi; -import org.apache.beam.runners.core.construction.DoFnFeatures; import org.apache.beam.runners.core.construction.Environments; import org.apache.beam.runners.core.construction.ParDoTranslation; import org.apache.beam.runners.core.construction.PipelineTranslation; @@ -1226,7 +1225,7 @@ private static void translateFn( DoFnSchemaInformation doFnSchemaInformation, Map> sideInputMapping) { - boolean isStateful = DoFnFeatures.isStateful(fn); + boolean isStateful = DoFnSignatures.isStateful(fn); if (isStateful) { DataflowRunner.verifyDoFnSupported(fn, context.getPipelineOptions().isStreaming()); DataflowRunner.verifyStateSupportForWindowingStrategy(windowingStrategy); diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index c28225e8304b..29fdce42de1b 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -60,7 +60,6 @@ import org.apache.beam.model.pipeline.v1.RunnerApi; import org.apache.beam.runners.core.construction.CoderTranslation; import 
org.apache.beam.runners.core.construction.DeduplicatedFlattenFactory; -import org.apache.beam.runners.core.construction.DoFnFeatures; import org.apache.beam.runners.core.construction.EmptyFlattenAsCreateFactory; import org.apache.beam.runners.core.construction.JavaReadViaImpulse; import org.apache.beam.runners.core.construction.PTransformMatchers; @@ -133,6 +132,7 @@ import org.apache.beam.sdk.transforms.View.CreatePCollectionView; import org.apache.beam.sdk.transforms.WithKeys; import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.transforms.reflect.DoFnSignatures; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.util.CoderUtils; @@ -1935,22 +1935,30 @@ static String getContainerImageForJob(DataflowPipelineOptions options) { } } + static void verifyDoFnSupportedBatch(DoFn fn) { + verifyDoFnSupported(fn, false); + } + + static void verifyDoFnSupportedStreaming(DoFn fn) { + verifyDoFnSupported(fn, true); + } + static void verifyDoFnSupported(DoFn fn, boolean streaming) { - if (DoFnFeatures.usesSetState(fn)) { + if (DoFnSignatures.usesSetState(fn)) { // https://issues.apache.org/jira/browse/BEAM-1479 throw new UnsupportedOperationException( String.format( "%s does not currently support %s", DataflowRunner.class.getSimpleName(), SetState.class.getSimpleName())); } - if (DoFnFeatures.usesMapState(fn)) { + if (DoFnSignatures.usesMapState(fn)) { // https://issues.apache.org/jira/browse/BEAM-1474 throw new UnsupportedOperationException( String.format( "%s does not currently support %s", DataflowRunner.class.getSimpleName(), MapState.class.getSimpleName())); } - if (streaming && DoFnFeatures.requiresTimeSortedInput(fn)) { + if (streaming && DoFnSignatures.requiresTimeSortedInput(fn)) { throw new UnsupportedOperationException( String.format( "%s does not currently support @RequiresTimeSortedInput in streaming mode.", diff --git 
a/runners/jet/src/main/java/org/apache/beam/runners/jet/Utils.java b/runners/jet/src/main/java/org/apache/beam/runners/jet/Utils.java index e3943abf8f12..c4707fcfbe37 100644 --- a/runners/jet/src/main/java/org/apache/beam/runners/jet/Utils.java +++ b/runners/jet/src/main/java/org/apache/beam/runners/jet/Utils.java @@ -35,7 +35,6 @@ import java.util.stream.Collectors; import java.util.stream.IntStream; import javax.annotation.Nonnull; -import org.apache.beam.runners.core.construction.DoFnFeatures; import org.apache.beam.runners.core.construction.ParDoTranslation; import org.apache.beam.runners.core.construction.TransformInputs; import org.apache.beam.sdk.Pipeline; @@ -47,6 +46,7 @@ import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.reflect.DoFnSignatures; import org.apache.beam.sdk.util.CoderUtils; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.PCollection; @@ -168,12 +168,12 @@ static boolean usesStateOrTimers(AppliedPTransform appliedTransform) { static DoFn getDoFn(AppliedPTransform appliedTransform) { try { DoFn doFn = ParDoTranslation.getDoFn(appliedTransform); - if (DoFnFeatures.isSplittable(doFn)) { + if (DoFnSignatures.isSplittable(doFn)) { throw new IllegalStateException( "Not expected to directly translate splittable DoFn, should have been overridden: " + doFn); // todo } - if (DoFnFeatures.requiresTimeSortedInput(doFn)) { + if (DoFnSignatures.requiresTimeSortedInput(doFn)) { throw new UnsupportedOperationException( String.format( "%s doesn't current support @RequiresTimeSortedInput annotation.", diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaDoFnRunners.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaDoFnRunners.java index 195d9ff41519..0d8aa081bb0f 100644 --- 
a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaDoFnRunners.java +++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaDoFnRunners.java @@ -27,7 +27,6 @@ import org.apache.beam.runners.core.StatefulDoFnRunner; import org.apache.beam.runners.core.StepContext; import org.apache.beam.runners.core.TimerInternals; -import org.apache.beam.runners.core.construction.DoFnFeatures; import org.apache.beam.runners.fnexecution.control.BundleProgressHandler; import org.apache.beam.runners.fnexecution.control.OutputReceiverFactory; import org.apache.beam.runners.fnexecution.control.RemoteBundle; @@ -86,7 +85,7 @@ public static DoFnRunner create( final SamzaExecutionContext executionContext = (SamzaExecutionContext) context.getApplicationContainerContext(); - if (DoFnFeatures.isStateful(doFn)) { + if (DoFnSignatures.isStateful(doFn)) { keyedInternals = new KeyedInternals(stateInternalsFactory, timerInternalsFactory); stateInternals = keyedInternals.stateInternals(); timerInternals = keyedInternals.timerInternals(); diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ParDoBoundMultiTranslator.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ParDoBoundMultiTranslator.java index fd50e1e3c563..a289cf76bd1b 100644 --- a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ParDoBoundMultiTranslator.java +++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ParDoBoundMultiTranslator.java @@ -29,7 +29,6 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; import org.apache.beam.model.pipeline.v1.RunnerApi; -import org.apache.beam.runners.core.construction.DoFnFeatures; import org.apache.beam.runners.core.construction.ParDoTranslation; import org.apache.beam.runners.core.construction.graph.PipelineNode; import org.apache.beam.runners.core.construction.graph.QueryablePipeline; @@ -97,13 +96,13 @@ private static 
void doTranslate( .collect( Collectors.toMap(e -> e.getKey(), e -> ((PCollection) e.getValue()).getCoder())); - boolean isStateful = DoFnFeatures.isStateful(transform.getFn()); + boolean isStateful = DoFnSignatures.isStateful(transform.getFn()); final Coder keyCoder = isStateful ? ((KvCoder) input.getCoder()).getKeyCoder() : null; - if (DoFnFeatures.isSplittable(transform.getFn())) { + if (DoFnSignatures.isSplittable(transform.getFn())) { throw new UnsupportedOperationException("Splittable DoFn is not currently supported"); } - if (DoFnFeatures.requiresTimeSortedInput(transform.getFn())) { + if (DoFnSignatures.requiresTimeSortedInput(transform.getFn())) { throw new UnsupportedOperationException( "@RequiresTimeSortedInput annotation is not currently supported"); } diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java index ef9709d4548f..e165f9403141 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java @@ -24,7 +24,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import org.apache.beam.runners.core.construction.DoFnFeatures; import org.apache.beam.runners.core.construction.ParDoTranslation; import org.apache.beam.runners.spark.structuredstreaming.metrics.MetricsAccumulator; import org.apache.beam.runners.spark.structuredstreaming.metrics.MetricsContainerStepMapAccumulator; @@ -39,6 +38,7 @@ import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.DoFnSchemaInformation; import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.reflect.DoFnSignatures; import 
org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.PCollection; @@ -71,16 +71,16 @@ public void translateTransform( // TODO: add support of Splittable DoFn DoFn doFn = getDoFn(context); checkState( - !DoFnFeatures.isSplittable(doFn), + !DoFnSignatures.isSplittable(doFn), "Not expected to directly translate splittable DoFn, should have been overridden: %s", doFn); // TODO: add support of states and timers checkState( - !DoFnFeatures.isStateful(doFn), "States and timers are not supported for the moment."); + !DoFnSignatures.isStateful(doFn), "States and timers are not supported for the moment."); checkState( - !DoFnFeatures.requiresTimeSortedInput(doFn), + !DoFnSignatures.requiresTimeSortedInput(doFn), "@RequiresTimeSortedInput is not " + "supported for the moment"); DoFnSchemaInformation doFnSchemaInformation = diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java index 48f5adfed528..eb78ace1d59d 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java @@ -41,12 +41,17 @@ import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.schemas.FieldAccessDescriptor; +import org.apache.beam.sdk.state.BagState; +import org.apache.beam.sdk.state.MapState; +import org.apache.beam.sdk.state.SetState; import org.apache.beam.sdk.state.State; import org.apache.beam.sdk.state.StateSpec; import org.apache.beam.sdk.state.TimeDomain; import org.apache.beam.sdk.state.Timer; import org.apache.beam.sdk.state.TimerMap; import org.apache.beam.sdk.state.TimerSpec; +import org.apache.beam.sdk.state.ValueState; +import org.apache.beam.sdk.state.WatermarkHoldState; import 
org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.DoFn.MultiOutputReceiver; import org.apache.beam.sdk.transforms.DoFn.OutputReceiver; @@ -2026,4 +2031,49 @@ public static TimerSpec getTimerFamilySpecOrThrow( timerFamilyDeclaration.field().getName())); } } + + /** Returns whether the process-element method in the signature of {@code doFn} is splittable. */ public static boolean isSplittable(DoFn doFn) { + return signatureForDoFn(doFn).processElement().isSplittable(); + } + + /** Returns whether {@code doFn} uses state or timers, i.e. {@code usesState(doFn) || usesTimers(doFn)}; both of those also return true when time-sorted input is required. */ public static boolean isStateful(DoFn doFn) { + return usesState(doFn) || usesTimers(doFn); + } + + /** Returns whether any state declaration of {@code doFn} has a type that is a subtype of {@code MapState}. */ public static boolean usesMapState(DoFn doFn) { + return usesGivenStateClass(doFn, MapState.class); + } + + /** Returns whether any state declaration of {@code doFn} has a type that is a subtype of {@code SetState}. */ public static boolean usesSetState(DoFn doFn) { + return usesGivenStateClass(doFn, SetState.class); + } + + /** Returns whether {@code doFn} declares a {@code ValueState} subtype or requires time-sorted input. NOTE(review): presumably the sorting implementation relies on this state kind -- confirm. */ public static boolean usesValueState(DoFn doFn) { + return usesGivenStateClass(doFn, ValueState.class) || requiresTimeSortedInput(doFn); + } + + /** Returns whether {@code doFn} declares a {@code BagState} subtype or requires time-sorted input. NOTE(review): presumably the sorting implementation relies on this state kind -- confirm. */ public static boolean usesBagState(DoFn doFn) { + return usesGivenStateClass(doFn, BagState.class) || requiresTimeSortedInput(doFn); + } + + /** Returns whether {@code doFn} declares a {@code WatermarkHoldState} subtype or requires time-sorted input. NOTE(review): presumably the sorting implementation relies on a watermark hold -- confirm. */ public static boolean usesWatermarkHold(DoFn doFn) { + return usesGivenStateClass(doFn, WatermarkHoldState.class) || requiresTimeSortedInput(doFn); + } + + /** Returns whether the signature of {@code doFn} reports timers, or {@code doFn} requires time-sorted input. */ public static boolean usesTimers(DoFn doFn) { + return signatureForDoFn(doFn).usesTimers() || requiresTimeSortedInput(doFn); + } + + /** Returns whether the signature of {@code doFn} reports state, or {@code doFn} requires time-sorted input. */ public static boolean usesState(DoFn doFn) { + return signatureForDoFn(doFn).usesState() || requiresTimeSortedInput(doFn); + } + + /** Returns whether the process-element method in the signature of {@code doFn} requires time-sorted input. */ public static boolean requiresTimeSortedInput(DoFn doFn) { + return signatureForDoFn(doFn).processElement().requiresTimeSortedInput(); + } + + /** Returns whether any declared state type of {@code doFn} is a subtype of {@code stateClass}; unlike the public helpers above, this does not take time-sorted input into account. */ private static boolean usesGivenStateClass(DoFn doFn, Class stateClass) { + return signatureForDoFn(doFn).stateDeclarations().values().stream() + .anyMatch(d -> d.stateType().isSubtypeOf(TypeDescriptor.of(stateClass))); + } } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java
index fedccf417502..8bdd2a885238 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java @@ -35,10 +35,13 @@ import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.coders.VarLongCoder; +import org.apache.beam.sdk.io.range.OffsetRange; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.schemas.FieldAccessDescriptor; import org.apache.beam.sdk.state.CombiningState; import org.apache.beam.sdk.state.GroupingState; +import org.apache.beam.sdk.state.MapState; +import org.apache.beam.sdk.state.SetState; import org.apache.beam.sdk.state.StateSpec; import org.apache.beam.sdk.state.StateSpecs; import org.apache.beam.sdk.state.TimeDomain; @@ -47,6 +50,7 @@ import org.apache.beam.sdk.state.TimerSpecs; import org.apache.beam.sdk.state.ValueState; import org.apache.beam.sdk.state.WatermarkHoldState; +import org.apache.beam.sdk.testing.SerializableMatchers; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.Sum; import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter; @@ -64,12 +68,15 @@ import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.TimestampParameter; import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.WindowParameter; import org.apache.beam.sdk.transforms.reflect.DoFnSignaturesTestUtils.FakeDoFn; +import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.Row; import org.apache.beam.sdk.values.TypeDescriptor; import org.apache.beam.sdk.values.TypeDescriptors; +import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists; import org.hamcrest.Matcher; import org.hamcrest.Matchers; import org.joda.time.Instant; @@ -1192,6 +1199,239 @@ public void bar( assertThat(params.get(4), instanceOf(TaggedOutputReceiverParameter.class)); } + + /** Fixture contract: each implementing DoFn asserts, in {@code test()}, the DoFnSignatures feature flags expected for itself. */ private interface FeatureTest { + void test(); + } + + /** DoFn with only a process method; every feature query is expected to return false. */ private static class StatelessDoFn extends DoFn implements FeatureTest { + @ProcessElement + public void process(@Element String input) {} + + @Override + public void test() { + assertThat(DoFnSignatures.isSplittable(this), SerializableMatchers.equalTo(false)); + assertThat(DoFnSignatures.isStateful(this), SerializableMatchers.equalTo(false)); + assertThat(DoFnSignatures.usesTimers(this), SerializableMatchers.equalTo(false)); + assertThat(DoFnSignatures.usesState(this), SerializableMatchers.equalTo(false)); + assertThat(DoFnSignatures.usesBagState(this), SerializableMatchers.equalTo(false)); + assertThat(DoFnSignatures.usesMapState(this), SerializableMatchers.equalTo(false)); + assertThat(DoFnSignatures.usesSetState(this), SerializableMatchers.equalTo(false)); + assertThat(DoFnSignatures.usesValueState(this), SerializableMatchers.equalTo(false)); + assertThat(DoFnSignatures.usesWatermarkHold(this), SerializableMatchers.equalTo(false)); + assertThat(DoFnSignatures.requiresTimeSortedInput(this), SerializableMatchers.equalTo(false)); + } + } + + /** Declares one state created with {@code StateSpecs.value()}; expects isStateful, usesState and usesValueState to be true and everything else false. */ private static class StatefulWithValueState extends DoFn, String> + implements FeatureTest { + @StateId("state") + private final StateSpec> state = StateSpecs.value(); + + @ProcessElement + public void process(@Element KV input) {} + + @Override + public void test() { + assertThat(DoFnSignatures.isSplittable(this), SerializableMatchers.equalTo(false)); + assertThat(DoFnSignatures.isStateful(this), SerializableMatchers.equalTo(true)); + assertThat(DoFnSignatures.usesTimers(this), SerializableMatchers.equalTo(false)); + assertThat(DoFnSignatures.usesState(this), SerializableMatchers.equalTo(true));
+ assertThat(DoFnSignatures.usesBagState(this), SerializableMatchers.equalTo(false)); + assertThat(DoFnSignatures.usesMapState(this), SerializableMatchers.equalTo(false)); + assertThat(DoFnSignatures.usesSetState(this), SerializableMatchers.equalTo(false)); + assertThat(DoFnSignatures.usesValueState(this), SerializableMatchers.equalTo(true)); + assertThat(DoFnSignatures.usesWatermarkHold(this), SerializableMatchers.equalTo(false)); + assertThat(DoFnSignatures.requiresTimeSortedInput(this), SerializableMatchers.equalTo(false)); + } + } + + /** Declares one EVENT_TIME timer and no state; expects isStateful and usesTimers to be true while usesState and all state-kind queries are false. */ private static class StatefulWithTimers extends DoFn, String> + implements FeatureTest { + @TimerId("timer") + private final TimerSpec spec = TimerSpecs.timer(TimeDomain.EVENT_TIME); + + @ProcessElement + public void process(@Element KV input) {} + + @Override + public void test() { + assertThat(DoFnSignatures.isSplittable(this), SerializableMatchers.equalTo(false)); + assertThat(DoFnSignatures.isStateful(this), SerializableMatchers.equalTo(true)); + assertThat(DoFnSignatures.usesTimers(this), SerializableMatchers.equalTo(true)); + assertThat(DoFnSignatures.usesState(this), SerializableMatchers.equalTo(false)); + assertThat(DoFnSignatures.usesBagState(this), SerializableMatchers.equalTo(false)); + assertThat(DoFnSignatures.usesMapState(this), SerializableMatchers.equalTo(false)); + assertThat(DoFnSignatures.usesSetState(this), SerializableMatchers.equalTo(false)); + assertThat(DoFnSignatures.usesValueState(this), SerializableMatchers.equalTo(false)); + assertThat(DoFnSignatures.usesWatermarkHold(this), SerializableMatchers.equalTo(false)); + assertThat(DoFnSignatures.requiresTimeSortedInput(this), SerializableMatchers.equalTo(false)); + } + + @OnTimer("timer") + public void onTimer() {} + } + + /** Declares an EVENT_TIME timer plus a state created with {@code StateSpecs.set()}. NOTE(review): despite "ValueState" in the class name, the assertions below expect usesSetState == true and usesValueState == false -- consider renaming the fixture. */ private static class StatefulWithTimersAndValueState extends DoFn, String> + implements FeatureTest { + @TimerId("timer") + private final TimerSpec timer = TimerSpecs.timer(TimeDomain.EVENT_TIME); + + @StateId("state") + private final
StateSpec> state = StateSpecs.set(); + + @ProcessElement + public void process(@Element KV input) {} + + @Override + public void test() { + assertThat(DoFnSignatures.isSplittable(this), SerializableMatchers.equalTo(false)); + assertThat(DoFnSignatures.isStateful(this), SerializableMatchers.equalTo(true)); + assertThat(DoFnSignatures.usesTimers(this), SerializableMatchers.equalTo(true)); + assertThat(DoFnSignatures.usesState(this), SerializableMatchers.equalTo(true)); + assertThat(DoFnSignatures.usesBagState(this), SerializableMatchers.equalTo(false)); + assertThat(DoFnSignatures.usesMapState(this), SerializableMatchers.equalTo(false)); + assertThat(DoFnSignatures.usesSetState(this), SerializableMatchers.equalTo(true)); + assertThat(DoFnSignatures.usesValueState(this), SerializableMatchers.equalTo(false)); + assertThat(DoFnSignatures.usesWatermarkHold(this), SerializableMatchers.equalTo(false)); + assertThat(DoFnSignatures.requiresTimeSortedInput(this), SerializableMatchers.equalTo(false)); + } + + @OnTimer("timer") + public void onTimer() {} + } + + /** Declares one state created with {@code StateSpecs.set()} and no timers; expects isStateful, usesState and usesSetState to be true. */ private static class StatefulWithSetState extends DoFn, String> + implements FeatureTest { + @StateId("state") + private final StateSpec> spec = StateSpecs.set(); + + @ProcessElement + public void process(@Element KV input) {} + + @Override + public void test() { + assertThat(DoFnSignatures.isSplittable(this), SerializableMatchers.equalTo(false)); + assertThat(DoFnSignatures.isStateful(this), SerializableMatchers.equalTo(true)); + assertThat(DoFnSignatures.usesTimers(this), SerializableMatchers.equalTo(false)); + assertThat(DoFnSignatures.usesState(this), SerializableMatchers.equalTo(true)); + assertThat(DoFnSignatures.usesBagState(this), SerializableMatchers.equalTo(false)); + assertThat(DoFnSignatures.usesMapState(this), SerializableMatchers.equalTo(false)); + assertThat(DoFnSignatures.usesSetState(this), SerializableMatchers.equalTo(true)); + assertThat(DoFnSignatures.usesValueState(this),
SerializableMatchers.equalTo(false)); + assertThat(DoFnSignatures.usesWatermarkHold(this), SerializableMatchers.equalTo(false)); + assertThat(DoFnSignatures.requiresTimeSortedInput(this), SerializableMatchers.equalTo(false)); + } + } + + /** Declares one state created with {@code StateSpecs.map()} and no timers; expects isStateful, usesState and usesMapState to be true. */ private static class StatefulWithMapState extends DoFn, String> + implements FeatureTest { + @StateId("state") + private final StateSpec> spec = StateSpecs.map(); + + @ProcessElement + public void process(@Element KV input) {} + + @Override + public void test() { + assertThat(DoFnSignatures.isSplittable(this), SerializableMatchers.equalTo(false)); + assertThat(DoFnSignatures.isStateful(this), SerializableMatchers.equalTo(true)); + assertThat(DoFnSignatures.usesTimers(this), SerializableMatchers.equalTo(false)); + assertThat(DoFnSignatures.usesState(this), SerializableMatchers.equalTo(true)); + assertThat(DoFnSignatures.usesBagState(this), SerializableMatchers.equalTo(false)); + assertThat(DoFnSignatures.usesMapState(this), SerializableMatchers.equalTo(true)); + assertThat(DoFnSignatures.usesSetState(this), SerializableMatchers.equalTo(false)); + assertThat(DoFnSignatures.usesValueState(this), SerializableMatchers.equalTo(false)); + assertThat(DoFnSignatures.usesWatermarkHold(this), SerializableMatchers.equalTo(false)); + assertThat(DoFnSignatures.requiresTimeSortedInput(this), SerializableMatchers.equalTo(false)); + } + } + + /** Declares one state created with {@code StateSpecs.watermarkStateInternal(TimestampCombiner.LATEST)}; expects isStateful, usesState and usesWatermarkHold to be true. */ private static class StatefulWithWatermarkHoldState extends DoFn, String> + implements FeatureTest { + @StateId("state") + private final StateSpec spec = + StateSpecs.watermarkStateInternal(TimestampCombiner.LATEST); + + @ProcessElement + public void process(@Element KV input) {} + + @Override + public void test() { + assertThat(DoFnSignatures.isSplittable(this), SerializableMatchers.equalTo(false)); + assertThat(DoFnSignatures.isStateful(this), SerializableMatchers.equalTo(true)); + assertThat(DoFnSignatures.usesTimers(this), SerializableMatchers.equalTo(false)); + assertThat(DoFnSignatures.usesState(this),
SerializableMatchers.equalTo(true)); + assertThat(DoFnSignatures.usesBagState(this), SerializableMatchers.equalTo(false)); + assertThat(DoFnSignatures.usesMapState(this), SerializableMatchers.equalTo(false)); + assertThat(DoFnSignatures.usesSetState(this), SerializableMatchers.equalTo(false)); + assertThat(DoFnSignatures.usesValueState(this), SerializableMatchers.equalTo(false)); + assertThat(DoFnSignatures.usesWatermarkHold(this), SerializableMatchers.equalTo(true)); + assertThat(DoFnSignatures.requiresTimeSortedInput(this), SerializableMatchers.equalTo(false)); + } + } + + /** Declares no state or timers but annotates process with {@code @RequiresTimeSortedInput}; expects timers, state, bag state, value state and watermark hold to be reported as used, while map and set state are not. */ private static class RequiresTimeSortedInput extends DoFn, String> + implements FeatureTest { + @ProcessElement + @RequiresTimeSortedInput + public void process(@Element KV input) {} + + @Override + public void test() { + assertThat(DoFnSignatures.isSplittable(this), SerializableMatchers.equalTo(false)); + assertThat(DoFnSignatures.isStateful(this), SerializableMatchers.equalTo(true)); + assertThat(DoFnSignatures.usesTimers(this), SerializableMatchers.equalTo(true)); + assertThat(DoFnSignatures.usesState(this), SerializableMatchers.equalTo(true)); + assertThat(DoFnSignatures.usesBagState(this), SerializableMatchers.equalTo(true)); + assertThat(DoFnSignatures.usesMapState(this), SerializableMatchers.equalTo(false)); + assertThat(DoFnSignatures.usesSetState(this), SerializableMatchers.equalTo(false)); + assertThat(DoFnSignatures.usesValueState(this), SerializableMatchers.equalTo(true)); + assertThat(DoFnSignatures.usesWatermarkHold(this), SerializableMatchers.equalTo(true)); + assertThat(DoFnSignatures.requiresTimeSortedInput(this), SerializableMatchers.equalTo(true)); + } + } + + /** Takes a RestrictionTracker in process and defines a {@code @GetInitialRestriction} method; expects only isSplittable to be true. */ private static class Splittable extends DoFn, String> implements FeatureTest { + @ProcessElement + public void process(ProcessContext c, RestrictionTracker tracker) {} + + @GetInitialRestriction + public OffsetRange getInitialRange(@Element KV element) { + return new OffsetRange(0L, element.getValue()); + } + +
@Override + public void test() { + assertThat(DoFnSignatures.isSplittable(this), SerializableMatchers.equalTo(true)); + assertThat(DoFnSignatures.isStateful(this), SerializableMatchers.equalTo(false)); + assertThat(DoFnSignatures.usesTimers(this), SerializableMatchers.equalTo(false)); + assertThat(DoFnSignatures.usesState(this), SerializableMatchers.equalTo(false)); + assertThat(DoFnSignatures.usesBagState(this), SerializableMatchers.equalTo(false)); + assertThat(DoFnSignatures.usesMapState(this), SerializableMatchers.equalTo(false)); + assertThat(DoFnSignatures.usesSetState(this), SerializableMatchers.equalTo(false)); + assertThat(DoFnSignatures.usesValueState(this), SerializableMatchers.equalTo(false)); + assertThat(DoFnSignatures.usesWatermarkHold(this), SerializableMatchers.equalTo(false)); + assertThat(DoFnSignatures.requiresTimeSortedInput(this), SerializableMatchers.equalTo(false)); + } + } + + /** One instance of every fixture DoFn declared above; iterated by testAllDoFnFeatures. */ private final List tests = + Lists.newArrayList( + new StatelessDoFn(), + new StatefulWithValueState(), + new StatefulWithTimers(), + new StatefulWithTimersAndValueState(), + new StatefulWithSetState(), + new StatefulWithMapState(), + new StatefulWithWatermarkHoldState(), + new RequiresTimeSortedInput(), + new Splittable()); + + /** Runs the self-check {@code test()} of every fixture in {@code tests}. */ @Test + public void testAllDoFnFeatures() { + tests.forEach(FeatureTest::test); + } + private Matcher mentionsTimers() { return anyOf(containsString("timer"), containsString("Timer")); }