From 530ef12db34c1145a6a34749f1f6814ef9c8618f Mon Sep 17 00:00:00 2001 From: Thomas Groh Date: Tue, 11 Apr 2017 18:17:02 -0700 Subject: [PATCH 1/2] Free PTransform Names if they are being Replaced Naming is based on what's in the graph, not what once was there. --- .../java/org/apache/beam/sdk/Pipeline.java | 26 +++++-- .../org/apache/beam/sdk/PipelineTest.java | 77 +++++++++++++++++++ 2 files changed, 97 insertions(+), 6 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java index 11d781d61309..3b919e5c4ac1 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java @@ -30,7 +30,6 @@ import org.apache.beam.sdk.coders.CoderRegistry; import org.apache.beam.sdk.io.Read; import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.options.PipelineOptions.CheckEnabled; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.runners.PTransformOverride; import org.apache.beam.sdk.runners.PTransformOverrideFactory; @@ -230,25 +229,43 @@ public void visitPrimitiveTransform(Node node) { private void replace(final PTransformOverride override) { final Collection matches = new ArrayList<>(); + final Set freedNames = new HashSet<>(); transforms.visit( new PipelineVisitor.Defaults() { + Node clearingNamesFor = null; + @Override public CompositeBehavior enterCompositeTransform(Node node) { + if (clearingNamesFor != null) { + freedNames.add(node.getFullName()); + return CompositeBehavior.ENTER_TRANSFORM; + } if (!node.isRootNode() && override.getMatcher().matches(node.toAppliedPTransform())) { matches.add(node); // This node will be replaced. It should not be visited. 
- return CompositeBehavior.DO_NOT_ENTER_TRANSFORM; + clearingNamesFor = node; + freedNames.add(node.getFullName()); } return CompositeBehavior.ENTER_TRANSFORM; } + public void leaveCompositeTransform(Node node) { + if (clearingNamesFor == node) { + clearingNamesFor = null; + } + } + @Override public void visitPrimitiveTransform(Node node) { - if (override.getMatcher().matches(node.toAppliedPTransform())) { + if (clearingNamesFor != null) { + freedNames.add(node.getFullName()); + } else if (override.getMatcher().matches(node.toAppliedPTransform())) { matches.add(node); + freedNames.add(node.getFullName()); } } }); + this.usedFullNames.removeAll(freedNames); for (Node match : matches) { applyReplacement(match, override.getOverrideFactory()); } @@ -486,9 +503,6 @@ private OutputT applyInternal( void applyReplacement( Node original, PTransformOverrideFactory replacementFactory) { - // Names for top-level transforms have been assigned. Any new collisions are within a node - // and its replacement. 
- getOptions().setStableUniqueNames(CheckEnabled.OFF); PTransform replacement = replacementFactory.getReplacementTransform((TransformT) original.getTransform()); if (replacement == original.getTransform()) { diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/PipelineTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/PipelineTest.java index 0a5746b8e858..6ce016d8af14 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/PipelineTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/PipelineTest.java @@ -19,6 +19,7 @@ import static org.hamcrest.Matchers.anyOf; import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.hasItem; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.isA; import static org.hamcrest.Matchers.not; @@ -29,8 +30,10 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import org.apache.beam.sdk.Pipeline.PipelineExecutionException; import org.apache.beam.sdk.Pipeline.PipelineVisitor; import org.apache.beam.sdk.io.CountingInput; @@ -51,6 +54,7 @@ import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.testing.ValidatesRunner; import org.apache.beam.sdk.transforms.AppliedPTransform; +import org.apache.beam.sdk.transforms.Count; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.Flatten; import org.apache.beam.sdk.transforms.MapElements; @@ -384,6 +388,79 @@ public boolean matches(AppliedPTransform application) { new UnboundedCountingInputOverride()))); } + @Test + public void testReplacedNames() { + final PCollection originalInput = pipeline.apply(Create.of("foo", "bar", "baz")); + class OriginalTransform extends PTransform, PCollection> { + @Override + public PCollection expand(PCollection input) { + return 
input.apply("custom_name", Count.globally()); + } + } + class ReplacementTransform extends PTransform, PCollection> { + @Override + public PCollection expand(PCollection input) { + return input.apply("custom_name", Count.globally()); + } + } + class ReplacementOverrideFactory + implements PTransformOverrideFactory< + PCollection, PCollection, OriginalTransform> { + + @Override + public PTransform, PCollection> getReplacementTransform( + OriginalTransform transform) { + return new ReplacementTransform(); + } + + @Override + public PCollection getInput(Map, PValue> inputs, Pipeline p) { + return originalInput; + } + + @Override + public Map mapOutputs( + Map, PValue> outputs, PCollection newOutput) { + return Collections.singletonMap( + newOutput, + ReplacementOutput.of( + TaggedPValue.ofExpandedValue( + Iterables.getOnlyElement(outputs.values())), + TaggedPValue.ofExpandedValue(newOutput))); + } + } + + class OriginalMatcher implements PTransformMatcher { + @Override + public boolean matches(AppliedPTransform application) { + return application.getTransform() instanceof OriginalTransform; + } + } + + originalInput.apply("original_application", new OriginalTransform()); + pipeline.replaceAll( + Collections.singletonList( + PTransformOverride.of(new OriginalMatcher(), new ReplacementOverrideFactory()))); + final Set names = new HashSet<>(); + pipeline.traverseTopologically( + new PipelineVisitor.Defaults() { + @Override + public void leaveCompositeTransform(Node node) { + if (!node.isRootNode()) { + names.add(node.getFullName()); + } + } + + @Override + public void visitPrimitiveTransform(Node node) { + names.add(node.getFullName()); + } + }); + + assertThat(names, hasItem("original_application/custom_name")); + assertThat(names, not(hasItem("original_application/custom_name2"))); + } + static class BoundedCountingInputOverride implements PTransformOverrideFactory, BoundedCountingInput> { @Override From 9d42c8fa45021617ee2063da22c04c06638cca0a Mon Sep 17 00:00:00 2001 
From: Thomas Groh Date: Fri, 14 Apr 2017 09:00:23 -0700 Subject: [PATCH 2/2] fixup! Free PTransform Names if they are being Replaced --- .../java/org/apache/beam/sdk/Pipeline.java | 32 ++++++++----------- 1 file changed, 13 insertions(+), 19 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java index 3b919e5c4ac1..791166e2a8e1 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java @@ -228,44 +228,38 @@ public void visitPrimitiveTransform(Node node) { } private void replace(final PTransformOverride override) { - final Collection matches = new ArrayList<>(); - final Set freedNames = new HashSet<>(); + final Set matches = new HashSet<>(); + final Set freedNodes = new HashSet<>(); transforms.visit( new PipelineVisitor.Defaults() { - Node clearingNamesFor = null; - @Override public CompositeBehavior enterCompositeTransform(Node node) { - if (clearingNamesFor != null) { - freedNames.add(node.getFullName()); + if (!node.isRootNode() && freedNodes.contains(node.getEnclosingNode())) { + // This node will be freed because its parent will be freed. + freedNodes.add(node); return CompositeBehavior.ENTER_TRANSFORM; } if (!node.isRootNode() && override.getMatcher().matches(node.toAppliedPTransform())) { matches.add(node); - // This node will be replaced. It should not be visited. - clearingNamesFor = node; - freedNames.add(node.getFullName()); + // This node will be freed. 
When we visit any of its children, they will also be freed. + freedNodes.add(node); } return CompositeBehavior.ENTER_TRANSFORM; } - public void leaveCompositeTransform(Node node) { - if (clearingNamesFor == node) { - clearingNamesFor = null; - } - } - @Override public void visitPrimitiveTransform(Node node) { - if (clearingNamesFor != null) { - freedNames.add(node.getFullName()); + if (freedNodes.contains(node.getEnclosingNode())) { + freedNodes.add(node); } else if (override.getMatcher().matches(node.toAppliedPTransform())) { matches.add(node); - freedNames.add(node.getFullName()); + freedNodes.add(node); } } }); - this.usedFullNames.removeAll(freedNames); + for (Node freedNode : freedNodes) { + usedFullNames.remove(freedNode.getFullName()); + } for (Node match : matches) { applyReplacement(match, override.getOverrideFactory()); }