From e18a8cc8b8e75104b7440d7b8635c56253bf0f1e Mon Sep 17 00:00:00 2001 From: Kenneth Knowles Date: Thu, 17 Nov 2016 10:14:07 -0800 Subject: [PATCH 1/5] Separate ParDoTest cases and make them more flexible A number of excessively rigid name tests preclude runner expansion of ParDo. This change makes them into independent unit tests for better signal and makes them more accurate to the intent - the name should have the relevant information, but may have other content. --- .../apache/beam/sdk/transforms/ParDoTest.java | 55 +++++++++---------- 1 file changed, 27 insertions(+), 28 deletions(-) diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java index 26f5570054b2d..3c3e266df1557 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java @@ -27,6 +27,7 @@ import static org.apache.beam.sdk.util.StringUtils.jsonStringToByteArray; import static org.hamcrest.Matchers.allOf; import static org.hamcrest.Matchers.anyOf; +import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.collection.IsIterableContainingInAnyOrder.containsInAnyOrder; import static org.hamcrest.collection.IsIterableContainingInOrder.contains; @@ -826,39 +827,37 @@ public void testParDoWithErrorInFinishBatch() { } @Test - public void testParDoGetName() { + public void testParDoOutputNameBasedOnDoFnWithTrimmedSuffix() { Pipeline p = TestPipeline.create(); + PCollection output = p.apply(Create.of(1)).apply(ParDo.of(new TestOldDoFn())); + assertThat(output.getName(), containsString("ParDo(Test)")); + } - PCollection input = - p.apply(Create.of(Arrays.asList(3, -42, 666))) - .setName("MyInput"); - - { - PCollection output1 = input.apply(ParDo.of(new TestOldDoFn())); - assertEquals("ParDo(Test).out", output1.getName()); - } - - { - PCollection output2 = input.apply("MyParDo", ParDo.of(new TestOldDoFn())); - assertEquals("MyParDo.out", output2.getName()); - } - - { - PCollection output4 = input.apply("TestOldDoFn", ParDo.of(new TestOldDoFn())); - assertEquals("TestOldDoFn.out", output4.getName()); - } + @Test + public void testParDoOutputNameBasedOnLabel() { + Pipeline p = TestPipeline.create(); + PCollection output = + p.apply(Create.of(1)).apply("MyParDo", ParDo.of(new TestOldDoFn())); + assertThat(output.getName(), containsString("MyParDo")); + } - { - PCollection output5 = input.apply(ParDo.of(new StrangelyNamedDoer())); - assertEquals("ParDo(StrangelyNamedDoer).out", - output5.getName()); - } + @Test + public void testParDoOutputNameBasedDoFnWithoutMatchingSuffix() { + Pipeline p = TestPipeline.create(); + PCollection output = p.apply(Create.of(1)).apply(ParDo.of(new StrangelyNamedDoer())); + assertThat(output.getName(), containsString("ParDo(StrangelyNamedDoer)")); + } - assertEquals("ParDo(Printing)", ParDo.of(new PrintingDoFn()).getName()); + @Test + public void testParDoTransformNameBasedDoFnWithTrimmedSuffix() { + assertThat(ParDo.of(new PrintingDoFn()).getName(), containsString("ParDo(Printing)")); + } - assertEquals( - "ParMultiDo(SideOutputDummy)", - ParDo.of(new SideOutputDummyFn(null)).withOutputTags(null, null).getName()); + @Test + public void testParDoMultiNameBasedDoFnWithTrimmerSuffix() { + assertThat( + ParDo.of(new SideOutputDummyFn(null)).withOutputTags(null, null).getName(), + containsString("ParMultiDo(SideOutputDummy)")); } @Test From e400d49ee539a02d91d1261332f3eb0220eab1ba Mon Sep 17 00:00:00 2001 From: Kenneth Knowles Date: Thu, 17 Nov 2016 11:07:32 -0800 Subject: [PATCH 2/5] Do not override type descriptor in WithTimestamps --- .../java/org/apache/beam/sdk/transforms/WithTimestamps.java | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithTimestamps.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithTimestamps.java index 00ac8e4a392db..64e7c450dfa30 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithTimestamps.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithTimestamps.java @@ -99,9 +99,8 @@ public Duration getAllowedTimestampSkew() { @Override public PCollection apply(PCollection input) { - return input - .apply("AddTimestamps", ParDo.of(new AddTimestampsDoFn(fn, allowedTimestampSkew))) - .setTypeDescriptorInternal(input.getTypeDescriptor()); + return input.apply( + "AddTimestamps", ParDo.of(new AddTimestampsDoFn(fn, allowedTimestampSkew))); } private static class AddTimestampsDoFn extends DoFn { From 75f0e2a0c9ad2745b7b47b447609ccf2b5108cb0 Mon Sep 17 00:00:00 2001 From: Kenneth Knowles Date: Thu, 17 Nov 2016 15:50:17 -0800 Subject: [PATCH 3/5] Delegate getAggregators() in various DoFn adapters --- .../java/org/apache/beam/sdk/transforms/DoFn.java | 7 ++++++- .../org/apache/beam/sdk/transforms/DoFnAdapters.java | 6 ++++++ .../java/org/apache/beam/sdk/transforms/OldDoFn.java | 7 ++++++- .../org/apache/beam/sdk/transforms/OldDoFnTest.java | 11 +++++++---- 4 files changed, 25 insertions(+), 6 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java index 9978ef4c6288b..221d942ec4a76 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java @@ -28,6 +28,8 @@ import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; +import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.Map; import javax.annotation.Nullable; @@ -236,7 +238,6 @@ private void setupDelegateAggregator( aggregator.setDelegate(delegate); } - } /** @@ -298,6 +299,10 @@ public Duration getAllowedTimestampSkew() { protected Map> aggregators = new HashMap<>(); + Collection> getAggregators() { + return Collections.>unmodifiableCollection(aggregators.values()); + } + /** * Protects aggregators from being created after initialization. */ diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java index a3466bbe4ef78..1a74ae740db3a 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java @@ -18,6 +18,7 @@ package org.apache.beam.sdk.transforms; import java.io.IOException; +import java.util.Collection; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.Combine.CombineFn; import org.apache.beam.sdk.transforms.DoFn.Context; @@ -243,6 +244,11 @@ protected TypeDescriptor getOutputTypeDescriptor() { return fn.getOutputTypeDescriptor(); } + @Override + Collection> getAggregators() { + return fn.getAggregators(); + } + @Override public Duration getAllowedTimestampSkew() { return fn.getAllowedTimestampSkew(); diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java index f16e0b37bed58..9bf9003a6af5e 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java @@ -675,6 +675,11 @@ protected TypeDescriptor getInputTypeDescriptor() { return OldDoFn.this.getInputTypeDescriptor(); } + @Override + Collection> getAggregators() { + return OldDoFn.this.getAggregators(); + } + @Override protected TypeDescriptor getOutputTypeDescriptor() { return OldDoFn.this.getOutputTypeDescriptor(); @@ -683,7 +688,7 @@ protected TypeDescriptor getOutputTypeDescriptor() { /** * A {@link ProcessContext} for an {@link OldDoFn} that implements - * {@link OldDoFn.RequiresWindowAcccess}, via a context for a proper {@link DoFn}. + * {@link RequiresWindowAccess}, via a context for a proper {@link DoFn}. */ private class AdaptedRequiresWindowAccessProcessContext extends AdaptedProcessContext { diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/OldDoFnTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/OldDoFnTest.java index e7ae135c5683d..07e3078e3f1ce 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/OldDoFnTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/OldDoFnTest.java @@ -24,7 +24,6 @@ import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertThat; -import com.google.common.collect.ImmutableMap; import java.io.Serializable; import java.util.Map; import org.apache.beam.sdk.AggregatorValues; @@ -37,6 +36,7 @@ import org.apache.beam.sdk.transforms.Max.MaxIntegerFn; import org.apache.beam.sdk.transforms.Sum.SumIntegerFn; import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.values.PCollection; import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -216,14 +216,17 @@ public void testAggregators() throws Exception { Pipeline pipeline = TestPipeline.create(); CountOddsFn countOdds = new CountOddsFn(); - pipeline + PCollection output = pipeline .apply(Create.of(1, 3, 5, 7, 2, 4, 6, 8, 10, 12, 14, 20, 42, 68, 100)) .apply(ParDo.of(countOdds)); PipelineResult result = pipeline.run(); AggregatorValues values = result.getAggregatorValues(countOdds.aggregator); - assertThat(values.getValuesAtSteps(), - equalTo((Map) ImmutableMap.of("ParDo(CountOdds)", 4))); + + Map valuesMap = values.getValuesAtSteps(); + + assertThat(valuesMap.size(), equalTo(1)); + assertThat(valuesMap.get(output.getProducingTransformInternal().getFullName()), equalTo(4)); } private static class CountOddsFn extends OldDoFn { From 805c5ca4e51d3e9a1157c1327703a57bf6cbed77 Mon Sep 17 00:00:00 2001 From: Kenneth Knowles Date: Thu, 17 Nov 2016 11:08:48 -0800 Subject: [PATCH 4/5] Use getNewFn for coder inferences in ParDo --- .../src/main/java/org/apache/beam/sdk/transforms/ParDo.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java index ac1bccbbc828d..215ae6a675c83 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java @@ -1094,7 +1094,7 @@ public Coder getDefaultOutputCoder( Coder inputCoder = ((PCollection) input).getCoder(); return input.getPipeline().getCoderRegistry().getDefaultCoder( output.getTypeDescriptor(), - getOldFn().getInputTypeDescriptor(), + getNewFn().getInputTypeDescriptor(), inputCoder); } From 470a1c3b368d7897b95d33aeb47b67eba9f3126f Mon Sep 17 00:00:00 2001 From: Kenneth Knowles Date: Wed, 16 Nov 2016 15:43:47 -0800 Subject: [PATCH 5/5] DirectRunner: Expand ParDo.Bound into ParDo.BoundMulti --- .../beam/runners/direct/DirectRunner.java | 3 +- .../runners/direct/ParDoEvaluatorFactory.java | 56 +-- .../direct/ParDoMultiEvaluatorHooks.java | 55 --- ...ry.java => ParDoMultiOverrideFactory.java} | 22 +- .../direct/ParDoSingleEvaluatorHooks.java | 58 --- .../ParDoSingleViaMultiOverrideFactory.java | 66 +++ .../direct/TransformEvaluatorRegistry.java | 7 +- .../direct/ParDoMultiEvaluatorHooksTest.java | 439 ------------------ .../direct/ParDoSingleEvaluatorHooksTest.java | 335 ------------- 9 files changed, 98 insertions(+), 943 deletions(-) delete mode 100644 runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorHooks.java rename runners/direct-java/src/main/java/org/apache/beam/runners/direct/{ParDoOverrideFactory.java => ParDoMultiOverrideFactory.java} (73%) delete mode 100644 runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorHooks.java create mode 100644 runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleViaMultiOverrideFactory.java delete mode 100644 runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorHooksTest.java delete mode 100644 runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorHooksTest.java diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java index cce73c3563960..0060e847f8257 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java @@ -87,7 +87,8 @@ public class DirectRunner .put(GroupByKey.class, new DirectGroupByKeyOverrideFactory()) .put(TestStream.class, new DirectTestStreamFactory()) .put(Write.Bound.class, new WriteWithShardingFactory()) - .put(ParDo.Bound.class, new ParDoOverrideFactory()) + .put(ParDo.Bound.class, new ParDoSingleViaMultiOverrideFactory()) + .put(ParDo.BoundMulti.class, new ParDoMultiOverrideFactory()) .put( GBKIntoKeyedWorkItems.class, new DirectGBKIntoKeyedWorkItemsOverrideFactory()) diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java index ee4987f2a28e0..f12600042d4c4 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java @@ -24,49 +24,22 @@ import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; import org.apache.beam.sdk.transforms.AppliedPTransform; import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.ParDo.BoundMulti; import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.POutput; +import org.apache.beam.sdk.values.PCollectionTuple; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -/** - * A {@link TransformEvaluatorFactory} for {@link ParDo}-like primitive {@link PTransform - * PTransforms}, parameterized by some {@link TransformHooks transform-specific handling}. - */ -final class ParDoEvaluatorFactory< - InputT, - OutputT, - TransformOutputT extends POutput, - TransformT extends PTransform, TransformOutputT>> - implements TransformEvaluatorFactory { - interface TransformHooks< - InputT, - OutputT, - TransformOutputT extends POutput, - TransformT extends PTransform, TransformOutputT>> { - /** Returns the {@link DoFn} contained in the given {@link ParDo} transform. */ - DoFn getDoFn(TransformT transform); - - /** Configures and creates a {@link ParDoEvaluator} for the given {@link DoFn}. */ - ParDoEvaluator createParDoEvaluator( - EvaluationContext evaluationContext, - AppliedPTransform, TransformOutputT, TransformT> application, - DirectStepContext stepContext, - DoFn fnLocal); - } +/** A {@link TransformEvaluatorFactory} for {@link ParDo.BoundMulti}. */ +final class ParDoEvaluatorFactory implements TransformEvaluatorFactory { private static final Logger LOG = LoggerFactory.getLogger(ParDoEvaluatorFactory.class); private final LoadingCache, DoFnLifecycleManager> fnClones; private final EvaluationContext evaluationContext; - private final TransformHooks hooks; - ParDoEvaluatorFactory( - EvaluationContext evaluationContext, - TransformHooks hooks) { + ParDoEvaluatorFactory(EvaluationContext evaluationContext) { this.evaluationContext = evaluationContext; - this.hooks = hooks; fnClones = CacheBuilder.newBuilder() .build( @@ -95,7 +68,8 @@ public void cleanup() throws Exception { @SuppressWarnings({"unchecked", "rawtypes"}) private TransformEvaluator createEvaluator( - AppliedPTransform, TransformOutputT, TransformT> application, + AppliedPTransform, PCollectionTuple, BoundMulti> + application, CommittedBundle inputBundle) throws Exception { String stepName = evaluationContext.getStepName(application); @@ -104,12 +78,20 @@ private TransformEvaluator createEvaluator( .getExecutionContext(application, inputBundle.getKey()) .getOrCreateStepContext(stepName, stepName); - DoFnLifecycleManager fnManager = - fnClones.getUnchecked(hooks.getDoFn(application.getTransform())); + DoFnLifecycleManager fnManager = fnClones.getUnchecked(application.getTransform().getNewFn()); try { + ParDo.BoundMulti transform = application.getTransform(); return DoFnLifecycleManagerRemovingTransformEvaluator.wrapping( - hooks.createParDoEvaluator( - evaluationContext, application, stepContext, (DoFn) fnManager.get()), + ParDoEvaluator.create( + evaluationContext, + stepContext, + application, + application.getInput().getWindowingStrategy(), + fnManager.get(), + transform.getSideInputs(), + transform.getMainOutputTag(), + transform.getSideOutputTags().getAll(), + application.getOutput().getAll()), fnManager); } catch (Exception e) { try { diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorHooks.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorHooks.java deleted file mode 100644 index f30f2090d8264..0000000000000 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorHooks.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.direct; - -import org.apache.beam.runners.direct.DirectExecutionContext.DirectStepContext; -import org.apache.beam.sdk.transforms.AppliedPTransform; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollectionTuple; - -/** Support for {@link ParDo.BoundMulti} in {@link ParDoEvaluatorFactory}. */ -class ParDoMultiEvaluatorHooks - implements ParDoEvaluatorFactory.TransformHooks< - InputT, OutputT, PCollectionTuple, ParDo.BoundMulti> { - @Override - public DoFn getDoFn(ParDo.BoundMulti transform) { - return transform.getNewFn(); - } - - @Override - public ParDoEvaluator createParDoEvaluator( - EvaluationContext evaluationContext, - AppliedPTransform, PCollectionTuple, ParDo.BoundMulti> - application, - DirectStepContext stepContext, - DoFn fnLocal) { - ParDo.BoundMulti transform = application.getTransform(); - return ParDoEvaluator.create( - evaluationContext, - stepContext, - application, - application.getInput().getWindowingStrategy(), - fnLocal, - transform.getSideInputs(), - transform.getMainOutputTag(), - transform.getSideOutputTags().getAll(), - application.getOutput().getAll()); - } -} diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java similarity index 73% rename from runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoOverrideFactory.java rename to runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java index 27941f86ff052..6cc3e6ee1867d 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoOverrideFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java @@ -19,35 +19,33 @@ import org.apache.beam.runners.core.SplittableParDo; import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.DoFnAdapters; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.reflect.DoFnSignature; import org.apache.beam.sdk.transforms.reflect.DoFnSignatures; import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionTuple; /** * A {@link PTransformOverrideFactory} that provides overrides for applications of a {@link ParDo} * in the direct runner. Currently overrides applications of Splittable DoFn. */ -class ParDoOverrideFactory +class ParDoMultiOverrideFactory implements PTransformOverrideFactory< - PCollection, PCollection, ParDo.Bound> { + PCollection, PCollectionTuple, ParDo.BoundMulti> { + @Override @SuppressWarnings("unchecked") - public PTransform, PCollection> override( - ParDo.Bound transform) { - ParDo.Bound that = (ParDo.Bound) transform; - DoFn fn = DoFnAdapters.getDoFn(that.getFn()); - if (fn == null) { - // This is an OldDoFn, hence not splittable. - return transform; - } + public PTransform, PCollectionTuple> override( + ParDo.BoundMulti transform) { + + DoFn fn = transform.getNewFn(); DoFnSignature signature = DoFnSignatures.getSignature(fn.getClass()); if (!signature.processElement().isSplittable()) { return transform; + } else { + return new SplittableParDo(fn); } - return new SplittableParDo(fn); } } diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorHooks.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorHooks.java deleted file mode 100644 index 6d284c26865dd..0000000000000 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorHooks.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.direct; - -import com.google.common.collect.ImmutableMap; -import java.util.Collections; -import org.apache.beam.runners.direct.DirectExecutionContext.DirectStepContext; -import org.apache.beam.sdk.transforms.AppliedPTransform; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.TupleTag; - -/** Support for {@link ParDo.Bound} in {@link ParDoEvaluatorFactory}. */ -class ParDoSingleEvaluatorHooks - implements ParDoEvaluatorFactory.TransformHooks< - InputT, OutputT, PCollection, ParDo.Bound> { - @Override - public DoFn getDoFn(ParDo.Bound transform) { - return transform.getNewFn(); - } - - @Override - public ParDoEvaluator createParDoEvaluator( - EvaluationContext evaluationContext, - AppliedPTransform, PCollection, ParDo.Bound> - application, - DirectStepContext stepContext, - DoFn fnLocal) { - TupleTag mainOutputTag = new TupleTag<>("out"); - ParDo.Bound transform = application.getTransform(); - return ParDoEvaluator.create( - evaluationContext, - stepContext, - application, - application.getInput().getWindowingStrategy(), - fnLocal, - transform.getSideInputs(), - mainOutputTag, - Collections.>emptyList(), - ImmutableMap., PCollection>of(mainOutputTag, application.getOutput())); - } -} diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleViaMultiOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleViaMultiOverrideFactory.java new file mode 100644 index 0000000000000..ee3dfc5c877c3 --- /dev/null +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleViaMultiOverrideFactory.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.direct; + +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.TupleTagList; + +/** + * A {@link PTransformOverrideFactory} that overrides single-output {@link ParDo} to implement + * it in terms of multi-output {@link ParDo}. + */ +class ParDoSingleViaMultiOverrideFactory + implements PTransformOverrideFactory< + PCollection, PCollection, ParDo.Bound> { + @Override + @SuppressWarnings("unchecked") + public PTransform, PCollection> override( + ParDo.Bound transform) { + return new ParDoSingleViaMulti(transform); + } + + static class ParDoSingleViaMulti + extends PTransform, PCollection> { + private static final String MAIN_OUTPUT_TAG = "main"; + + private final ParDo.Bound underlyingParDo; + + public ParDoSingleViaMulti(ParDo.Bound underlyingParDo) { + this.underlyingParDo = underlyingParDo; + } + + @Override + public PCollection apply(PCollection input) { + + // Output tags for ParDo need only be unique up to applied transform + TupleTag mainOutputTag = new TupleTag(MAIN_OUTPUT_TAG); + + PCollectionTuple output = + input.apply( + ParDo.of(underlyingParDo.getNewFn()) + .withSideInputs(underlyingParDo.getSideInputs()) + .withOutputTags(mainOutputTag, TupleTagList.empty())); + + return output.get(mainOutputTag); + } + } +} diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java index 51502f7760a47..0514c3af9d2f5 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java @@ -49,12 +49,7 @@ public static TransformEvaluatorRegistry defaultRegistry(EvaluationContext ctxt) ImmutableMap., TransformEvaluatorFactory>builder() .put(Read.Bounded.class, new BoundedReadEvaluatorFactory(ctxt)) .put(Read.Unbounded.class, new UnboundedReadEvaluatorFactory(ctxt)) - .put( - ParDo.Bound.class, - new ParDoEvaluatorFactory<>(ctxt, new ParDoSingleEvaluatorHooks<>())) - .put( - ParDo.BoundMulti.class, - new ParDoEvaluatorFactory<>(ctxt, new ParDoMultiEvaluatorHooks<>())) + .put(ParDo.BoundMulti.class, new ParDoEvaluatorFactory<>(ctxt)) .put(FlattenPCollectionList.class, new FlattenEvaluatorFactory(ctxt)) .put(ViewEvaluatorFactory.WriteView.class, new ViewEvaluatorFactory(ctxt)) .put(Window.Bound.class, new WindowEvaluatorFactory(ctxt)) diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorHooksTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorHooksTest.java deleted file mode 100644 index 6302d37566858..0000000000000 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorHooksTest.java +++ /dev/null @@ -1,439 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.direct; - -import static org.hamcrest.Matchers.containsInAnyOrder; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.not; -import static org.hamcrest.Matchers.nullValue; -import static org.junit.Assert.assertThat; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -import java.io.Serializable; -import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; -import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle; -import org.apache.beam.runners.direct.WatermarkManager.TimerUpdate; -import org.apache.beam.sdk.coders.StringUtf8Coder; -import org.apache.beam.sdk.testing.TestPipeline; -import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.ParDo.BoundMulti; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.GlobalWindow; -import org.apache.beam.sdk.transforms.windowing.IntervalWindow; -import org.apache.beam.sdk.transforms.windowing.OutputTimeFns; -import org.apache.beam.sdk.transforms.windowing.PaneInfo; -import org.apache.beam.sdk.util.TimeDomain; -import org.apache.beam.sdk.util.Timer; -import org.apache.beam.sdk.util.TimerInternals.TimerData; -import org.apache.beam.sdk.util.TimerSpec; -import org.apache.beam.sdk.util.TimerSpecs; -import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.util.state.BagState; -import org.apache.beam.sdk.util.state.StateNamespace; -import org.apache.beam.sdk.util.state.StateNamespaces; -import org.apache.beam.sdk.util.state.StateSpec; -import org.apache.beam.sdk.util.state.StateSpecs; -import org.apache.beam.sdk.util.state.StateTag; -import org.apache.beam.sdk.util.state.StateTags; -import org.apache.beam.sdk.util.state.WatermarkHoldState; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollectionTuple; -import org.apache.beam.sdk.values.TupleTag; -import org.apache.beam.sdk.values.TupleTagList; -import org.hamcrest.Matchers; -import org.joda.time.Duration; -import org.joda.time.Instant; -import org.junit.Ignore; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -/** - * Tests for {@link ParDoMultiEvaluatorHooks}. - */ -@RunWith(JUnit4.class) -public class ParDoMultiEvaluatorHooksTest implements Serializable { - private transient BundleFactory bundleFactory = ImmutableListBundleFactory.create(); - - @Test - public void testParDoMultiInMemoryTransformEvaluator() throws Exception { - TestPipeline p = TestPipeline.create(); - - PCollection input = p.apply(Create.of("foo", "bara", "bazam")); - - TupleTag> mainOutputTag = new TupleTag>() {}; - final TupleTag elementTag = new TupleTag<>(); - final TupleTag lengthTag = new TupleTag<>(); - - BoundMulti> pardo = - ParDo.of( - new DoFn>() { - @ProcessElement - public void processElement(ProcessContext c) { - c.output(KV.of(c.element(), c.element().length())); - c.sideOutput(elementTag, c.element()); - c.sideOutput(lengthTag, c.element().length()); - } - }) - .withOutputTags(mainOutputTag, TupleTagList.of(elementTag).and(lengthTag)); - PCollectionTuple outputTuple = input.apply(pardo); - - CommittedBundle inputBundle = - bundleFactory.createBundle(input).commit(Instant.now()); - - PCollection> mainOutput = outputTuple.get(mainOutputTag); - PCollection elementOutput = outputTuple.get(elementTag); - PCollection lengthOutput = outputTuple.get(lengthTag); - - EvaluationContext evaluationContext = mock(EvaluationContext.class); - UncommittedBundle> mainOutputBundle = - bundleFactory.createBundle(mainOutput); - UncommittedBundle elementOutputBundle = bundleFactory.createBundle(elementOutput); - UncommittedBundle lengthOutputBundle = bundleFactory.createBundle(lengthOutput); - - when(evaluationContext.createBundle(mainOutput)).thenReturn(mainOutputBundle); - when(evaluationContext.createBundle(elementOutput)).thenReturn(elementOutputBundle); - when(evaluationContext.createBundle(lengthOutput)).thenReturn(lengthOutputBundle); - - DirectExecutionContext executionContext = - new DirectExecutionContext(null, null, null, null); - when(evaluationContext.getExecutionContext(mainOutput.getProducingTransformInternal(), - inputBundle.getKey())).thenReturn(executionContext); - AggregatorContainer container = AggregatorContainer.create(); - AggregatorContainer.Mutator mutator = container.createMutator(); - when(evaluationContext.getAggregatorContainer()).thenReturn(container); - when(evaluationContext.getAggregatorMutator()).thenReturn(mutator); - - TransformEvaluator evaluator = - new ParDoEvaluatorFactory<>(evaluationContext, new ParDoMultiEvaluatorHooks<>()) - .forApplication( - mainOutput.getProducingTransformInternal(), inputBundle); - - evaluator.processElement(WindowedValue.valueInGlobalWindow("foo")); - evaluator.processElement( - WindowedValue.timestampedValueInGlobalWindow("bara", new Instant(1000))); - evaluator.processElement( - WindowedValue.valueInGlobalWindow("bazam", PaneInfo.ON_TIME_AND_ONLY_FIRING)); - - TransformResult result = evaluator.finishBundle(); - assertThat( - result.getOutputBundles(), - Matchers.>containsInAnyOrder( - lengthOutputBundle, mainOutputBundle, elementOutputBundle)); - assertThat(result.getWatermarkHold(), equalTo(BoundedWindow.TIMESTAMP_MAX_VALUE)); - assertThat(result.getAggregatorChanges(), equalTo(mutator)); - - assertThat( - mainOutputBundle.commit(Instant.now()).getElements(), - Matchers.>>containsInAnyOrder( - WindowedValue.valueInGlobalWindow(KV.of("foo", 3)), - WindowedValue.timestampedValueInGlobalWindow(KV.of("bara", 4), new Instant(1000)), - WindowedValue.valueInGlobalWindow( - KV.of("bazam", 5), PaneInfo.ON_TIME_AND_ONLY_FIRING))); - assertThat( - elementOutputBundle.commit(Instant.now()).getElements(), - Matchers.>containsInAnyOrder( - WindowedValue.valueInGlobalWindow("foo"), - WindowedValue.timestampedValueInGlobalWindow("bara", new Instant(1000)), - WindowedValue.valueInGlobalWindow("bazam", PaneInfo.ON_TIME_AND_ONLY_FIRING))); - assertThat( - lengthOutputBundle.commit(Instant.now()).getElements(), - Matchers.>containsInAnyOrder( - WindowedValue.valueInGlobalWindow(3), - WindowedValue.timestampedValueInGlobalWindow(4, new Instant(1000)), - WindowedValue.valueInGlobalWindow(5, PaneInfo.ON_TIME_AND_ONLY_FIRING))); - } - - @Test - public void testParDoMultiUndeclaredSideOutput() throws Exception { - TestPipeline p = TestPipeline.create(); - - PCollection input = p.apply(Create.of("foo", "bara", "bazam")); - - TupleTag> mainOutputTag = new TupleTag>() {}; - final TupleTag elementTag = new TupleTag<>(); - final TupleTag lengthTag = new TupleTag<>(); - - BoundMulti> pardo = - ParDo.of( - new DoFn>() { - @ProcessElement - public void processElement(ProcessContext c) { - c.output(KV.of(c.element(), c.element().length())); - c.sideOutput(elementTag, c.element()); - c.sideOutput(lengthTag, c.element().length()); - } - }) - .withOutputTags(mainOutputTag, TupleTagList.of(elementTag)); - PCollectionTuple outputTuple = input.apply(pardo); - - CommittedBundle inputBundle = - bundleFactory.createBundle(input).commit(Instant.now()); - - PCollection> mainOutput = outputTuple.get(mainOutputTag); - PCollection elementOutput = outputTuple.get(elementTag); - - EvaluationContext evaluationContext = mock(EvaluationContext.class); - UncommittedBundle> mainOutputBundle = - bundleFactory.createBundle(mainOutput); - UncommittedBundle elementOutputBundle = bundleFactory.createBundle(elementOutput); - - when(evaluationContext.createBundle(mainOutput)).thenReturn(mainOutputBundle); - when(evaluationContext.createBundle(elementOutput)).thenReturn(elementOutputBundle); - - DirectExecutionContext executionContext = - new DirectExecutionContext(null, null, null, null); - when(evaluationContext.getExecutionContext(mainOutput.getProducingTransformInternal(), - inputBundle.getKey())).thenReturn(executionContext); - AggregatorContainer container = AggregatorContainer.create(); - AggregatorContainer.Mutator mutator = container.createMutator(); - when(evaluationContext.getAggregatorContainer()).thenReturn(container); - when(evaluationContext.getAggregatorMutator()).thenReturn(mutator); - - TransformEvaluator evaluator = - new ParDoEvaluatorFactory<>(evaluationContext, new ParDoMultiEvaluatorHooks<>()) - .forApplication( - mainOutput.getProducingTransformInternal(), inputBundle); - - evaluator.processElement(WindowedValue.valueInGlobalWindow("foo")); - evaluator.processElement( - WindowedValue.timestampedValueInGlobalWindow("bara", new Instant(1000))); - evaluator.processElement( - WindowedValue.valueInGlobalWindow("bazam", PaneInfo.ON_TIME_AND_ONLY_FIRING)); - - TransformResult result = evaluator.finishBundle(); - assertThat( - result.getOutputBundles(), - Matchers.>containsInAnyOrder(mainOutputBundle, elementOutputBundle)); - assertThat(result.getWatermarkHold(), equalTo(BoundedWindow.TIMESTAMP_MAX_VALUE)); - assertThat(result.getAggregatorChanges(), equalTo(mutator)); - - assertThat( - mainOutputBundle.commit(Instant.now()).getElements(), - Matchers.>>containsInAnyOrder( - WindowedValue.valueInGlobalWindow(KV.of("foo", 3)), - WindowedValue.timestampedValueInGlobalWindow(KV.of("bara", 4), new Instant(1000)), - WindowedValue.valueInGlobalWindow( - KV.of("bazam", 5), PaneInfo.ON_TIME_AND_ONLY_FIRING))); - assertThat( - elementOutputBundle.commit(Instant.now()).getElements(), - Matchers.>containsInAnyOrder( - WindowedValue.valueInGlobalWindow("foo"), - WindowedValue.timestampedValueInGlobalWindow("bara", new Instant(1000)), - WindowedValue.valueInGlobalWindow("bazam", PaneInfo.ON_TIME_AND_ONLY_FIRING))); - } - - /** - * This test ignored, as today testing of GroupByKey is all the state that needs testing. - * This should be ported to state when ready. - */ - @Ignore("State is not supported until BEAM-25. GroupByKey tests the needed functionality.") - @Test - public void finishBundleWithStatePutsStateInResult() throws Exception { - TestPipeline p = TestPipeline.create(); - - PCollection input = p.apply(Create.of("foo", "bara", "bazam")); - - TupleTag> mainOutputTag = new TupleTag>() {}; - final TupleTag elementTag = new TupleTag<>(); - - final StateTag> watermarkTag = - StateTags.watermarkStateInternal("myId", OutputTimeFns.outputAtEndOfWindow()); - final StateTag> bagTag = StateTags.bag("myBag", StringUtf8Coder.of()); - final StateNamespace windowNs = - StateNamespaces.window(GlobalWindow.Coder.INSTANCE, GlobalWindow.INSTANCE); - BoundMulti> pardo = - ParDo.of( - new DoFn>() { - private static final String STATE_ID = "my-state-id"; - - @StateId(STATE_ID) - private final StateSpec> bagSpec = - StateSpecs.bag(StringUtf8Coder.of()); - - @ProcessElement - public void processElement( - ProcessContext c, @StateId(STATE_ID) BagState bagState) { - bagState.add(c.element()); - } - }) - .withOutputTags(mainOutputTag, TupleTagList.of(elementTag)); - PCollectionTuple outputTuple = input.apply(pardo); - - CommittedBundle inputBundle = - bundleFactory.createBundle(input).commit(Instant.now()); - - PCollection> mainOutput = outputTuple.get(mainOutputTag); - PCollection elementOutput = outputTuple.get(elementTag); - - EvaluationContext evaluationContext = mock(EvaluationContext.class); - UncommittedBundle> mainOutputBundle = - bundleFactory.createBundle(mainOutput); - UncommittedBundle elementOutputBundle = bundleFactory.createBundle(elementOutput); - - when(evaluationContext.createBundle(mainOutput)).thenReturn(mainOutputBundle); - when(evaluationContext.createBundle(elementOutput)).thenReturn(elementOutputBundle); - - DirectExecutionContext executionContext = new DirectExecutionContext(null, - StructuralKey.of("myKey", StringUtf8Coder.of()), - null, - null); - when(evaluationContext.getExecutionContext(mainOutput.getProducingTransformInternal(), - inputBundle.getKey())).thenReturn(executionContext); - AggregatorContainer container = AggregatorContainer.create(); - AggregatorContainer.Mutator mutator = container.createMutator(); - when(evaluationContext.getAggregatorContainer()).thenReturn(container); - when(evaluationContext.getAggregatorMutator()).thenReturn(mutator); - - TransformEvaluator evaluator = - new ParDoEvaluatorFactory<>(evaluationContext, new ParDoMultiEvaluatorHooks<>()) - .forApplication( - mainOutput.getProducingTransformInternal(), inputBundle); - - evaluator.processElement(WindowedValue.valueInGlobalWindow("foo")); - evaluator.processElement( - WindowedValue.timestampedValueInGlobalWindow("bara", new Instant(1000))); - evaluator.processElement( - WindowedValue.valueInGlobalWindow("bazam", PaneInfo.ON_TIME_AND_ONLY_FIRING)); - - TransformResult result = evaluator.finishBundle(); - assertThat( - result.getOutputBundles(), - Matchers.>containsInAnyOrder(mainOutputBundle, elementOutputBundle)); - assertThat(result.getWatermarkHold(), equalTo(new Instant(20205L))); - assertThat(result.getState(), not(nullValue())); - assertThat( - result.getState().state(StateNamespaces.global(), watermarkTag).read(), - equalTo(new Instant(20205L))); - assertThat( - result.getState().state(windowNs, bagTag).read(), - containsInAnyOrder("foo", "bara", "bazam")); - } - - /** - * This test ignored, as today testing of GroupByKey is all the state that needs testing. - * This should be ported to state when ready. - */ - @Ignore("State is not supported until BEAM-25. GroupByKey tests the needed functionality.") - @Test - public void finishBundleWithStateAndTimersPutsTimersInResult() throws Exception { - TestPipeline p = TestPipeline.create(); - - PCollection input = p.apply(Create.of("foo", "bara", "bazam")); - - TupleTag> mainOutputTag = new TupleTag>() {}; - final TupleTag elementTag = new TupleTag<>(); - - final TimerData addedTimer = - TimerData.of( - StateNamespaces.window( - IntervalWindow.getCoder(), - new IntervalWindow( - new Instant(0).plus(Duration.standardMinutes(5)), - new Instant(1) - .plus(Duration.standardMinutes(5)) - .plus(Duration.standardHours(1)))), - new Instant(54541L), - TimeDomain.EVENT_TIME); - final TimerData deletedTimer = - TimerData.of( - StateNamespaces.window( - IntervalWindow.getCoder(), - new IntervalWindow(new Instant(0), new Instant(0).plus(Duration.standardHours(1)))), - new Instant(3400000), - TimeDomain.SYNCHRONIZED_PROCESSING_TIME); - - BoundMulti> pardo = - ParDo.of( - new DoFn>() { - private static final String EVENT_TIME_TIMER = "event-time-timer"; - private static final String SYNC_PROC_TIME_TIMER = "sync-proc-time-timer"; - - @TimerId(EVENT_TIME_TIMER) - TimerSpec myTimerSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME); - - @TimerId(SYNC_PROC_TIME_TIMER) - TimerSpec syncProcTimerSpec = - TimerSpecs.timer(TimeDomain.SYNCHRONIZED_PROCESSING_TIME); - - @ProcessElement - public void processElement( - ProcessContext c, - @TimerId(EVENT_TIME_TIMER) Timer eventTimeTimer, - @TimerId(SYNC_PROC_TIME_TIMER) Timer syncProcTimeTimer) { - - eventTimeTimer.setForNowPlus(Duration.standardMinutes(5)); - syncProcTimeTimer.cancel(); - } - }) - .withOutputTags(mainOutputTag, TupleTagList.of(elementTag)); - PCollectionTuple outputTuple = input.apply(pardo); - - CommittedBundle inputBundle = - bundleFactory.createBundle(input).commit(Instant.now()); - - PCollection> mainOutput = outputTuple.get(mainOutputTag); - PCollection elementOutput = outputTuple.get(elementTag); - - EvaluationContext evaluationContext = mock(EvaluationContext.class); - UncommittedBundle> mainOutputBundle = - bundleFactory.createBundle(mainOutput); - UncommittedBundle elementOutputBundle = bundleFactory.createBundle(elementOutput); - - when(evaluationContext.createBundle(mainOutput)).thenReturn(mainOutputBundle); - when(evaluationContext.createBundle(elementOutput)).thenReturn(elementOutputBundle); - - DirectExecutionContext executionContext = new DirectExecutionContext(null, - StructuralKey.of("myKey", StringUtf8Coder.of()), - null, null); - when(evaluationContext.getExecutionContext(mainOutput.getProducingTransformInternal(), - inputBundle.getKey())).thenReturn(executionContext); - AggregatorContainer container = AggregatorContainer.create(); - AggregatorContainer.Mutator mutator = container.createMutator(); - when(evaluationContext.getAggregatorContainer()).thenReturn(container); - when(evaluationContext.getAggregatorMutator()).thenReturn(mutator); - - TransformEvaluator evaluator = - new ParDoEvaluatorFactory<>(evaluationContext, new ParDoMultiEvaluatorHooks<>()) - .forApplication( - mainOutput.getProducingTransformInternal(), inputBundle); - - evaluator.processElement(WindowedValue.valueInGlobalWindow("foo")); - evaluator.processElement( - WindowedValue.timestampedValueInGlobalWindow("bara", new Instant(1000))); - evaluator.processElement( - WindowedValue.valueInGlobalWindow("bazam", PaneInfo.ON_TIME_AND_ONLY_FIRING)); - - TransformResult result = evaluator.finishBundle(); - assertThat( - result.getTimerUpdate(), - equalTo( - TimerUpdate.builder(StructuralKey.of("myKey", StringUtf8Coder.of())) - .setTimer(addedTimer) - .setTimer(addedTimer) - .setTimer(addedTimer) - .deletedTimer(deletedTimer) - .deletedTimer(deletedTimer) - .deletedTimer(deletedTimer) - .build())); - } -} diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorHooksTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorHooksTest.java deleted file mode 100644 index 10cd7c5b8126d..0000000000000 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorHooksTest.java +++ /dev/null @@ -1,335 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.direct; - -import static org.hamcrest.Matchers.containsInAnyOrder; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.not; -import static org.hamcrest.Matchers.nullValue; -import static org.junit.Assert.assertThat; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -import java.io.Serializable; -import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; -import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle; -import org.apache.beam.runners.direct.WatermarkManager.TimerUpdate; -import org.apache.beam.sdk.coders.StringUtf8Coder; -import org.apache.beam.sdk.testing.TestPipeline; -import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.GlobalWindow; -import org.apache.beam.sdk.transforms.windowing.IntervalWindow; -import org.apache.beam.sdk.transforms.windowing.PaneInfo; -import org.apache.beam.sdk.util.TimeDomain; -import org.apache.beam.sdk.util.Timer; -import org.apache.beam.sdk.util.TimerInternals.TimerData; -import org.apache.beam.sdk.util.TimerSpec; -import org.apache.beam.sdk.util.TimerSpecs; -import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.util.state.BagState; -import org.apache.beam.sdk.util.state.StateNamespace; -import org.apache.beam.sdk.util.state.StateNamespaces; -import org.apache.beam.sdk.util.state.StateSpec; -import org.apache.beam.sdk.util.state.StateSpecs; -import org.apache.beam.sdk.util.state.StateTag; -import org.apache.beam.sdk.util.state.StateTags; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.TupleTag; -import org.hamcrest.Matchers; -import org.joda.time.Duration; -import org.joda.time.Instant; -import org.junit.Ignore; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -/** - * Tests for {@link ParDoSingleEvaluatorHooks}. - */ -@RunWith(JUnit4.class) -public class ParDoSingleEvaluatorHooksTest implements Serializable { - private transient BundleFactory bundleFactory = ImmutableListBundleFactory.create(); - - @Test - public void testParDoInMemoryTransformEvaluator() throws Exception { - TestPipeline p = TestPipeline.create(); - - PCollection input = p.apply(Create.of("foo", "bara", "bazam")); - PCollection collection = - input.apply( - ParDo.of( - new DoFn() { - @ProcessElement - public void processElement(ProcessContext c) { - c.output(c.element().length()); - } - })); - CommittedBundle inputBundle = - bundleFactory.createBundle(input).commit(Instant.now()); - - EvaluationContext evaluationContext = mock(EvaluationContext.class); - UncommittedBundle outputBundle = bundleFactory.createBundle(collection); - when(evaluationContext.createBundle(collection)).thenReturn(outputBundle); - DirectExecutionContext executionContext = - new DirectExecutionContext(null, null, null, null); - when(evaluationContext.getExecutionContext(collection.getProducingTransformInternal(), - inputBundle.getKey())).thenReturn(executionContext); - AggregatorContainer container = AggregatorContainer.create(); - AggregatorContainer.Mutator mutator = container.createMutator(); - when(evaluationContext.getAggregatorContainer()).thenReturn(container); - when(evaluationContext.getAggregatorMutator()).thenReturn(mutator); - - org.apache.beam.runners.direct.TransformEvaluator evaluator = - new ParDoEvaluatorFactory<>(evaluationContext, new ParDoSingleEvaluatorHooks<>()) - .forApplication( - collection.getProducingTransformInternal(), inputBundle); - - evaluator.processElement(WindowedValue.valueInGlobalWindow("foo")); - evaluator.processElement( - WindowedValue.timestampedValueInGlobalWindow("bara", new Instant(1000))); - evaluator.processElement( - WindowedValue.valueInGlobalWindow("bazam", PaneInfo.ON_TIME_AND_ONLY_FIRING)); - - TransformResult result = evaluator.finishBundle(); - assertThat(result.getOutputBundles(), Matchers.>contains(outputBundle)); - assertThat(result.getWatermarkHold(), equalTo(BoundedWindow.TIMESTAMP_MAX_VALUE)); - assertThat(result.getAggregatorChanges(), equalTo(mutator)); - - assertThat( - outputBundle.commit(Instant.now()).getElements(), - Matchers.>containsInAnyOrder( - WindowedValue.valueInGlobalWindow(3), - WindowedValue.timestampedValueInGlobalWindow(4, new Instant(1000)), - WindowedValue.valueInGlobalWindow(5, PaneInfo.ON_TIME_AND_ONLY_FIRING))); - } - - @Test - public void testSideOutputToUndeclaredSideOutputSucceeds() throws Exception { - TestPipeline p = TestPipeline.create(); - - PCollection input = p.apply(Create.of("foo", "bara", "bazam")); - final TupleTag sideOutputTag = new TupleTag() {}; - PCollection collection = - input.apply( - ParDo.of( - new DoFn() { - @ProcessElement - public void processElement(ProcessContext c) { - c.sideOutput(sideOutputTag, c.element().length()); - } - })); - CommittedBundle inputBundle = - bundleFactory.createBundle(input).commit(Instant.now()); - - EvaluationContext evaluationContext = mock(EvaluationContext.class); - UncommittedBundle outputBundle = bundleFactory.createBundle(collection); - when(evaluationContext.createBundle(collection)).thenReturn(outputBundle); - DirectExecutionContext executionContext = - new DirectExecutionContext(null, null, null, null); - when(evaluationContext.getExecutionContext(collection.getProducingTransformInternal(), - inputBundle.getKey())).thenReturn(executionContext); - AggregatorContainer container = AggregatorContainer.create(); - AggregatorContainer.Mutator mutator = container.createMutator(); - when(evaluationContext.getAggregatorContainer()).thenReturn(container); - when(evaluationContext.getAggregatorMutator()).thenReturn(mutator); - - TransformEvaluator evaluator = - new ParDoEvaluatorFactory<>(evaluationContext, new ParDoSingleEvaluatorHooks<>()) - .forApplication( - collection.getProducingTransformInternal(), inputBundle); - - evaluator.processElement(WindowedValue.valueInGlobalWindow("foo")); - evaluator.processElement( - WindowedValue.timestampedValueInGlobalWindow("bara", new Instant(1000))); - evaluator.processElement( - WindowedValue.valueInGlobalWindow("bazam", PaneInfo.ON_TIME_AND_ONLY_FIRING)); - - TransformResult result = evaluator.finishBundle(); - assertThat( - result.getOutputBundles(), Matchers.>containsInAnyOrder(outputBundle)); - assertThat(result.getWatermarkHold(), equalTo(BoundedWindow.TIMESTAMP_MAX_VALUE)); - assertThat(result.getAggregatorChanges(), equalTo(mutator)); - } - - /** - * This test ignored, as today testing of GroupByKey is all the state that needs testing. - * This should be ported to state when ready. - */ - @Ignore("State is not supported until BEAM-25. GroupByKey tests the needed functionality.") - @Test - public void finishBundleWithStatePutsStateInResult() throws Exception { - TestPipeline p = TestPipeline.create(); - - PCollection input = p.apply(Create.of("foo", "bara", "bazam")); - - final StateTag> bagTag = StateTags.bag("myBag", StringUtf8Coder.of()); - final StateNamespace windowNs = - StateNamespaces.window(GlobalWindow.Coder.INSTANCE, GlobalWindow.INSTANCE); - ParDo.Bound> pardo = - ParDo.of( - new DoFn>() { - private static final String STATE_ID = "my-state-id"; - - @StateId(STATE_ID) - private final StateSpec> bagSpec = - StateSpecs.bag(StringUtf8Coder.of()); - - @ProcessElement - public void processElement( - ProcessContext c, @StateId(STATE_ID) BagState bagState) { - bagState.add(c.element()); - } - }); - PCollection> mainOutput = input.apply(pardo); - - CommittedBundle inputBundle = - bundleFactory.createBundle(input).commit(Instant.now()); - - EvaluationContext evaluationContext = mock(EvaluationContext.class); - UncommittedBundle> mainOutputBundle = - bundleFactory.createBundle(mainOutput); - - when(evaluationContext.createBundle(mainOutput)).thenReturn(mainOutputBundle); - - DirectExecutionContext executionContext = new DirectExecutionContext(null, - StructuralKey.of("myKey", StringUtf8Coder.of()), - null, null); - when(evaluationContext.getExecutionContext(mainOutput.getProducingTransformInternal(), - inputBundle.getKey())) - .thenReturn(executionContext); - AggregatorContainer container = AggregatorContainer.create(); - AggregatorContainer.Mutator mutator = container.createMutator(); - when(evaluationContext.getAggregatorContainer()).thenReturn(container); - when(evaluationContext.getAggregatorMutator()).thenReturn(mutator); - - org.apache.beam.runners.direct.TransformEvaluator evaluator = - new ParDoEvaluatorFactory<>(evaluationContext, new ParDoSingleEvaluatorHooks<>()) - .forApplication( - mainOutput.getProducingTransformInternal(), inputBundle); - - evaluator.processElement(WindowedValue.valueInGlobalWindow("foo")); - evaluator.processElement( - WindowedValue.timestampedValueInGlobalWindow("bara", new Instant(1000))); - evaluator.processElement( - WindowedValue.valueInGlobalWindow("bazam", PaneInfo.ON_TIME_AND_ONLY_FIRING)); - - TransformResult result = evaluator.finishBundle(); - assertThat(result.getWatermarkHold(), equalTo(new Instant(124438L))); - assertThat(result.getState(), not(nullValue())); - assertThat( - result.getState().state(windowNs, bagTag).read(), - containsInAnyOrder("foo", "bara", "bazam")); - } - - /** - * This test ignored, as today testing of GroupByKey is all the state that needs testing. - * This should be ported to state when ready. - */ - @Ignore("State is not supported until BEAM-25. GroupByKey tests the needed functionality.") - @Test - public void finishBundleWithStateAndTimersPutsTimersInResult() throws Exception { - TestPipeline p = TestPipeline.create(); - - PCollection input = p.apply(Create.of("foo", "bara", "bazam")); - - // TODO: this timer data is absolute, but the new API only support relative settings. - // It will require adjustments when @Ignore is removed - final TimerData addedTimer = - TimerData.of( - StateNamespaces.window( - IntervalWindow.getCoder(), - new IntervalWindow( - new Instant(0).plus(Duration.standardMinutes(5)), - new Instant(1) - .plus(Duration.standardMinutes(5)) - .plus(Duration.standardHours(1)))), - new Instant(54541L), - TimeDomain.EVENT_TIME); - final TimerData deletedTimer = - TimerData.of( - StateNamespaces.window( - IntervalWindow.getCoder(), - new IntervalWindow(new Instant(0), new Instant(0).plus(Duration.standardHours(1)))), - new Instant(3400000), - TimeDomain.SYNCHRONIZED_PROCESSING_TIME); - - ParDo.Bound> pardo = - ParDo.of( - new DoFn>() { - private static final String EVENT_TIME_TIMER = "event-time-timer"; - private static final String SYNC_PROC_TIME_TIMER = "sync-proc-time-timer"; - - @TimerId(EVENT_TIME_TIMER) - TimerSpec myTimerSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME); - - @TimerId(SYNC_PROC_TIME_TIMER) - TimerSpec syncProcTimerSpec = - TimerSpecs.timer(TimeDomain.SYNCHRONIZED_PROCESSING_TIME); - - @ProcessElement - public void processElement( - ProcessContext c, - @TimerId(EVENT_TIME_TIMER) Timer eventTimeTimer, - @TimerId(SYNC_PROC_TIME_TIMER) Timer syncProcTimeTimer) { - eventTimeTimer.setForNowPlus(Duration.standardMinutes(5)); - syncProcTimeTimer.cancel(); - } - }); - PCollection> mainOutput = input.apply(pardo); - - StructuralKey key = StructuralKey.of("myKey", StringUtf8Coder.of()); - CommittedBundle inputBundle = - bundleFactory.createBundle(input).commit(Instant.now()); - - EvaluationContext evaluationContext = mock(EvaluationContext.class); - UncommittedBundle> mainOutputBundle = - bundleFactory.createBundle(mainOutput); - - when(evaluationContext.createBundle(mainOutput)).thenReturn(mainOutputBundle); - - DirectExecutionContext executionContext = new DirectExecutionContext(null, - key, - null, - null); - when(evaluationContext.getExecutionContext(mainOutput.getProducingTransformInternal(), - inputBundle.getKey())) - .thenReturn(executionContext); - AggregatorContainer container = AggregatorContainer.create(); - AggregatorContainer.Mutator mutator = container.createMutator(); - when(evaluationContext.getAggregatorContainer()).thenReturn(container); - when(evaluationContext.getAggregatorMutator()).thenReturn(mutator); - - TransformEvaluator evaluator = - new ParDoEvaluatorFactory<>(evaluationContext, new ParDoSingleEvaluatorHooks<>()) - .forApplication( - mainOutput.getProducingTransformInternal(), inputBundle); - - evaluator.processElement(WindowedValue.valueInGlobalWindow("foo")); - - TransformResult result = evaluator.finishBundle(); - assertThat(result.getTimerUpdate(), - equalTo(TimerUpdate.builder(StructuralKey.of("myKey", StringUtf8Coder.of())) - .setTimer(addedTimer) - .deletedTimer(deletedTimer) - .build())); - } -}