From d3e894f588d6991e1fd095c3ea5c79760dad7066 Mon Sep 17 00:00:00 2001 From: Thomas Groh Date: Tue, 18 Apr 2017 17:25:59 -0700 Subject: [PATCH 1/3] Ensure all Read outputs are consumed in Dataflow Apply a no-op ParDo to the output of any Read PTransform that is not otherwise consumed. --- .../core/construction/UnconsumedReads.java | 68 ++++++++++++ .../construction/UnconsumedReadsTest.java | 105 ++++++++++++++++++ .../beam/runners/dataflow/DataflowRunner.java | 4 + .../runners/dataflow/DataflowRunnerTest.java | 24 ++++ 4 files changed, 201 insertions(+) create mode 100644 runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnconsumedReads.java create mode 100644 runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/UnconsumedReadsTest.java diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnconsumedReads.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnconsumedReads.java new file mode 100644 index 000000000000..d904deb96333 --- /dev/null +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnconsumedReads.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.core.construction; + +import java.util.HashSet; +import java.util.Set; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.Pipeline.PipelineVisitor; +import org.apache.beam.sdk.io.Read; +import org.apache.beam.sdk.runners.TransformHierarchy.Node; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PValue; + +/** + * Utilities for ensuring that all {@link Read} {@link PTransform PTransforms} are consumed by some + * {@link PTransform}. + */ +public class UnconsumedReads { + public static void ensureAllReadsConsumed(Pipeline pipeline) { + final Set<PCollection<?>> unconsumed = new HashSet<>(); + pipeline.traverseTopologically( + new PipelineVisitor.Defaults() { + @Override + public void visitPrimitiveTransform(Node node) { + unconsumed.removeAll(node.getInputs().values()); + } + + @Override + public void visitValue(PValue value, Node producer) { + if (producer.getTransform() instanceof Read.Bounded + || producer.getTransform() instanceof Read.Unbounded) { + unconsumed.add((PCollection<?>) value); + } + } + }); + for (PCollection unconsumedPCollection : unconsumed) { + consume(unconsumedPCollection); + } + } + + private static void consume(PCollection unconsumedPCollection) { + unconsumedPCollection.apply(ParDo.of(new NoOpDoFn())); + } + + private static class NoOpDoFn extends DoFn { + @ProcessElement + public void doNothing(ProcessContext context) {} + } +} diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/UnconsumedReadsTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/UnconsumedReadsTest.java new file mode 100644 index 000000000000..1966a93ec4d9 ---
/dev/null +++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/UnconsumedReadsTest.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.core.construction; + +import static org.junit.Assert.assertThat; + +import java.util.HashSet; +import java.util.Set; +import org.apache.beam.sdk.Pipeline.PipelineVisitor; +import org.apache.beam.sdk.io.CountingSource; +import org.apache.beam.sdk.io.Read; +import org.apache.beam.sdk.io.Read.Bounded; +import org.apache.beam.sdk.io.Read.Unbounded; +import org.apache.beam.sdk.runners.TransformHierarchy.Node; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Flatten; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionList; +import org.apache.beam.sdk.values.PValue; +import org.hamcrest.Matchers; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Tests for {@link UnconsumedReads}. 
+ */ +@RunWith(JUnit4.class) +public class UnconsumedReadsTest { + @Rule public TestPipeline pipeline = TestPipeline.create().enableAbandonedNodeEnforcement(false); + + @Test + public void matcherProducesUnconsumedValueBoundedRead() { + Bounded<Long> transform = Read.from(CountingSource.upTo(20L)); + PCollection<Long> output = pipeline.apply(transform); + UnconsumedReads.ensureAllReadsConsumed(pipeline); + validateConsumed(); + } + + @Test + public void matcherProducesUnconsumedValueUnboundedRead() { + Unbounded<Long> transform = Read.from(CountingSource.unbounded()); + PCollection<Long> output = pipeline.apply(transform); + UnconsumedReads.ensureAllReadsConsumed(pipeline); + validateConsumed(); + } + + @Test + public void doesNotConsumeAlreadyConsumedRead() { + Unbounded<Long> transform = Read.from(CountingSource.unbounded()); + final PCollection<Long> output = pipeline.apply(transform); + final Flatten.PCollections<Long> consumer = Flatten.pCollections(); + PCollectionList.of(output).apply(consumer); + UnconsumedReads.ensureAllReadsConsumed(pipeline); + pipeline.traverseTopologically( + new PipelineVisitor.Defaults() { + @Override + public void visitPrimitiveTransform(Node node) { + // The output should only be consumed by a single consumer + if (node.getInputs().values().contains(output)) { + assertThat(node.getTransform(), Matchers.<PTransform<?, ?>>is(consumer)); + } + } + }); + } + + private void validateConsumed() { + final Set<PValue> consumedOutputs = new HashSet<PValue>(); + final Set<PValue> allReadOutputs = new HashSet<PValue>(); + pipeline.traverseTopologically( + new PipelineVisitor.Defaults() { + @Override + public void visitPrimitiveTransform(Node node) { + consumedOutputs.addAll(node.getInputs().values()); + } + + @Override + public void visitValue(PValue value, Node producer) { + if (producer.getTransform() instanceof Read.Bounded + || producer.getTransform() instanceof Read.Unbounded) { + allReadOutputs.add(value); + } + } + }); + assertThat(consumedOutputs, Matchers.hasItems(allReadOutputs.toArray(new PValue[0]))); + } +} diff --git
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index 4eec6b8e6c3a..026e175d3e71 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -64,6 +64,7 @@ import org.apache.beam.runners.core.construction.PTransformReplacements; import org.apache.beam.runners.core.construction.ReplacementOutputs; import org.apache.beam.runners.core.construction.SingleInputOutputOverrideFactory; +import org.apache.beam.runners.core.construction.UnconsumedReads; import org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource; import org.apache.beam.runners.dataflow.BatchViewOverrides.BatchCombineGloballyAsSingletonViewFactory; import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.JobSpecification; @@ -690,6 +691,9 @@ private static Map getEnvironmentVersion(DataflowPipelineOptions @VisibleForTesting void replaceTransforms(Pipeline pipeline) { boolean streaming = options.isStreaming() || containsUnboundedPCollection(pipeline); + // Ensure all outputs of all reads are consumed before potentially replacing any + // Read PTransforms + UnconsumedReads.ensureAllReadsConsumed(pipeline); pipeline.replaceAll(getOverrides(streaming)); } diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java index 79a96e7cbce2..36704bc1c293 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java @@ -23,6 +23,7 @@ import 
static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasItem; import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.startsWith; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -57,6 +58,7 @@ import java.util.Collections; import java.util.LinkedList; import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.regex.Pattern; import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions; import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; @@ -65,11 +67,13 @@ import org.apache.beam.sdk.coders.BigEndianIntegerCoder; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.io.TextIO; +import org.apache.beam.sdk.io.TextIO.Read; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptions.CheckEnabled; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.options.ValueProvider; import org.apache.beam.sdk.runners.TransformHierarchy; +import org.apache.beam.sdk.runners.TransformHierarchy.Node; import org.apache.beam.sdk.testing.ExpectedLogs; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; @@ -331,6 +335,26 @@ public void testTextIOWithRuntimeParameters() throws IOException { .apply(TextIO.Write.to(options.getOutput()).withoutValidation()); } + /** + * Tests that all reads are consumed by at least one {@link PTransform}. 
+ */ + @Test + public void testUnconsumedReads() throws IOException { + DataflowPipelineOptions dataflowOptions = buildPipelineOptions(); + RuntimeTestOptions options = dataflowOptions.as(RuntimeTestOptions.class); + Pipeline p = buildDataflowPipeline(dataflowOptions); + final PCollection unconsumed = p.apply(Read.from(options.getInput()).withoutValidation()); + DataflowRunner.fromOptions(dataflowOptions).replaceTransforms(p); + final AtomicBoolean unconsumedSeenAsInput = new AtomicBoolean(); + p.traverseTopologically(new PipelineVisitor.Defaults() { + @Override + public void visitPrimitiveTransform(Node node) { + unconsumedSeenAsInput.compareAndSet(false, node.getInputs().values().contains(unconsumed)); + } + }); + assertThat(unconsumedSeenAsInput.get(), is(true)); + } + @Test public void testRunReturnDifferentRequestId() throws IOException { DataflowPipelineOptions options = buildPipelineOptions(); From aaa2895e02b9a5521772c75389c716a89b712e34 Mon Sep 17 00:00:00 2001 From: Thomas Groh Date: Wed, 19 Apr 2017 09:08:28 -0700 Subject: [PATCH 2/3] fixup!
Ensure all Read outputs are consumed in Dataflow --- .../java/org/apache/beam/runners/dataflow/DataflowRunner.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index 026e175d3e71..2912fa771438 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -64,8 +64,8 @@ import org.apache.beam.runners.core.construction.PTransformReplacements; import org.apache.beam.runners.core.construction.ReplacementOutputs; import org.apache.beam.runners.core.construction.SingleInputOutputOverrideFactory; -import org.apache.beam.runners.core.construction.UnconsumedReads; import org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource; +import org.apache.beam.runners.core.construction.UnconsumedReads; import org.apache.beam.runners.dataflow.BatchViewOverrides.BatchCombineGloballyAsSingletonViewFactory; import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.JobSpecification; import org.apache.beam.runners.dataflow.StreamingViewOverrides.StreamingCreatePCollectionViewFactory; From 89a66027d5bdd359224b74a2c71964d30df794bf Mon Sep 17 00:00:00 2001 From: Thomas Groh Date: Wed, 19 Apr 2017 09:22:10 -0700 Subject: [PATCH 3/3] fixup! 
Ensure all Read outputs are consumed in Dataflow --- .../runners/core/construction/UnconsumedReads.java | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnconsumedReads.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnconsumedReads.java index d904deb96333..c191eeb8617d 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnconsumedReads.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnconsumedReads.java @@ -52,13 +52,17 @@ public void visitValue(PValue value, Node producer) { } } }); + int i = 0; for (PCollection unconsumedPCollection : unconsumed) { - consume(unconsumedPCollection); + consume(unconsumedPCollection, i); + i++; } } - private static void consume(PCollection unconsumedPCollection) { - unconsumedPCollection.apply(ParDo.of(new NoOpDoFn())); + private static void consume(PCollection unconsumedPCollection, int uniq) { + // Multiple applications should never break due to stable unique names. + String uniqueName = "DropInputs" + (uniq == 0 ? "" : uniq); + unconsumedPCollection.apply(uniqueName, ParDo.of(new NoOpDoFn())); } private static class NoOpDoFn extends DoFn {