From 5e9fcebc07725de368391914781e5b4d5f9c4a19 Mon Sep 17 00:00:00 2001 From: Kenneth Knowles Date: Fri, 19 May 2017 12:57:41 -0700 Subject: [PATCH] Remove Pipeline reference from TransformHierarchy This change removes a direct dependency cycle between Pipeline and TransformHierarchy. There is still an indirect cycle through PValues, but that is slightly less problematic. --- .../translation/ApexPipelineTranslator.java | 4 +- .../apex/translation/TranslationContext.java | 5 +- .../core/construction/SdkComponents.java | 14 +++--- .../core/construction/SdkComponentsTest.java | 7 +-- .../runners/direct/DirectGraphVisitor.java | 3 +- .../direct/KeyedPValueTrackingVisitor.java | 5 +- .../flink/FlinkBatchPipelineTranslator.java | 2 +- .../beam/runners/flink/FlinkRunner.java | 10 +--- .../FlinkStreamingPipelineTranslator.java | 4 +- .../dataflow/DataflowPipelineTranslator.java | 6 +-- .../beam/runners/dataflow/DataflowRunner.java | 2 +- .../beam/runners/spark/SparkRunner.java | 2 +- .../streaming/TrackStreamingSourcesTest.java | 14 +++++- .../java/org/apache/beam/sdk/Pipeline.java | 50 +++++++++++++++---- .../beam/sdk/runners/TransformHierarchy.java | 6 +-- .../sdk/runners/TransformHierarchyTest.java | 2 +- 16 files changed, 82 insertions(+), 54 deletions(-) diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ApexPipelineTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ApexPipelineTranslator.java index 32e470f6029a..bda074b0a29b 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ApexPipelineTranslator.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ApexPipelineTranslator.java @@ -49,7 +49,7 @@ * into Apex logical plan {@link DAG}. */ @SuppressWarnings({"rawtypes", "unchecked"}) -public class ApexPipelineTranslator implements Pipeline.PipelineVisitor { +public class ApexPipelineTranslator extends Pipeline.PipelineVisitor.Defaults { private static final Logger LOG = LoggerFactory.getLogger(ApexPipelineTranslator.class); /** @@ -110,7 +110,7 @@ public void visitPrimitiveTransform(TransformHierarchy.Node node) { throw new UnsupportedOperationException( "no translator registered for " + transform); } - translationContext.setCurrentTransform(node); + translationContext.setCurrentTransform(node.toAppliedPTransform(getPipeline())); translator.translate(transform, translationContext); } diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TranslationContext.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TranslationContext.java index a5e30281a9e3..aff3863624c4 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TranslationContext.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TranslationContext.java @@ -36,7 +36,6 @@ import org.apache.beam.runners.apex.translation.utils.CoderAdapterStreamCodec; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.runners.AppliedPTransform; -import org.apache.beam.sdk.runners.TransformHierarchy; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.windowing.Window; @@ -77,8 +76,8 @@ public InputT getViewInput(PCollectionView view) { this.pipelineOptions = pipelineOptions; } - public void setCurrentTransform(TransformHierarchy.Node treeNode) { - this.currentTransform = treeNode.toAppliedPTransform(); + public void setCurrentTransform(AppliedPTransform transform) { + this.currentTransform = transform; } public ApexPipelineOptions getPipelineOptions() { diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java index eb29b9a3ae7a..5714fc510481 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java @@ -62,10 +62,10 @@ static SdkComponents create() { return new SdkComponents(); } - public static RunnerApi.Pipeline translatePipeline(Pipeline p) { + public static RunnerApi.Pipeline translatePipeline(Pipeline pipeline) { final SdkComponents components = create(); final Collection rootIds = new HashSet<>(); - p.traverseTopologically( + pipeline.traverseTopologically( new PipelineVisitor.Defaults() { private final ListMultimap> children = ArrayListMultimap.create(); @@ -77,9 +77,10 @@ public void leaveCompositeTransform(Node node) { rootIds.add(components.getExistingPTransformId(pipelineRoot)); } } else { - children.put(node.getEnclosingNode(), node.toAppliedPTransform()); + children.put(node.getEnclosingNode(), node.toAppliedPTransform(getPipeline())); try { - components.registerPTransform(node.toAppliedPTransform(), children.get(node)); + components.registerPTransform( + node.toAppliedPTransform(getPipeline()), children.get(node)); } catch (IOException e) { throw new RuntimeException(e); } @@ -88,10 +89,11 @@ public void leaveCompositeTransform(Node node) { @Override public void visitPrimitiveTransform(Node node) { - children.put(node.getEnclosingNode(), node.toAppliedPTransform()); + children.put(node.getEnclosingNode(), node.toAppliedPTransform(getPipeline())); try { components.registerPTransform( - node.toAppliedPTransform(), Collections.>emptyList()); + node.toAppliedPTransform(getPipeline()), + Collections.>emptyList()); } catch (IOException e) { throw new IllegalStateException(e); } diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SdkComponentsTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SdkComponentsTest.java index 7424886d1009..55702ea22390 100644 --- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SdkComponentsTest.java +++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SdkComponentsTest.java @@ -97,17 +97,12 @@ public void translatePipeline() { final RunnerApi.Pipeline pipelineProto = SdkComponents.translatePipeline(pipeline); pipeline.traverseTopologically( - new PipelineVisitor() { + new PipelineVisitor.Defaults() { Set transforms = new HashSet<>(); Set> pcollections = new HashSet<>(); Set>> coders = new HashSet<>(); Set> windowingStrategies = new HashSet<>(); - @Override - public CompositeBehavior enterCompositeTransform(Node node) { - return CompositeBehavior.ENTER_TRANSFORM; - } - @Override public void leaveCompositeTransform(Node node) { if (node.isRootNode()) { diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraphVisitor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraphVisitor.java index 1ee8ceb9a7e5..01204e3049dd 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraphVisitor.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraphVisitor.java @@ -41,6 +41,7 @@ * input after the upstream transform has produced and committed output. */ class DirectGraphVisitor extends PipelineVisitor.Defaults { + private Map> producers = new HashMap<>(); private ListMultimap> primitiveConsumers = @@ -101,7 +102,7 @@ public void visitValue(PValue value, TransformHierarchy.Node producer) { private AppliedPTransform getAppliedTransform(TransformHierarchy.Node node) { @SuppressWarnings({"rawtypes", "unchecked"}) - AppliedPTransform application = node.toAppliedPTransform(); + AppliedPTransform application = node.toAppliedPTransform(getPipeline()); return application; } diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitor.java index 347f313af85a..f9b2daeaa1df 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitor.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitor.java @@ -44,7 +44,7 @@ */ // TODO: Handle Key-preserving transforms when appropriate and more aggressively make PTransforms // unkeyed -class KeyedPValueTrackingVisitor implements PipelineVisitor { +class KeyedPValueTrackingVisitor extends PipelineVisitor.Defaults { private static final Set> PRODUCES_KEYED_OUTPUTS = ImmutableSet.of( @@ -90,9 +90,6 @@ public void leaveCompositeTransform(TransformHierarchy.Node node) { } } - @Override - public void visitPrimitiveTransform(TransformHierarchy.Node node) {} - @Override public void visitValue(PValue value, TransformHierarchy.Node producer) { boolean inputsAreKeyed = true; diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPipelineTranslator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPipelineTranslator.java index 854b67460ae2..50910b5bab5d 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPipelineTranslator.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPipelineTranslator.java @@ -112,7 +112,7 @@ public void visitPrimitiveTransform(TransformHierarchy.Node node) { BatchTransformTranslator typedTranslator = (BatchTransformTranslator) translator; // create the applied PTransform on the batchContext - batchContext.setCurrentTransform(node.toAppliedPTransform()); + batchContext.setCurrentTransform(node.toAppliedPTransform(getPipeline())); typedTranslator.translateNode(typedTransform, batchContext); } diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java index 80ef7bb32acd..ca12615be03c 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java @@ -38,7 +38,6 @@ import org.apache.beam.sdk.runners.TransformHierarchy; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.View; -import org.apache.beam.sdk.values.PValue; import org.apache.flink.api.common.JobExecutionResult; import org.apache.flink.client.program.DetachedEnvironment; import org.slf4j.Logger; @@ -199,10 +198,7 @@ private void logWarningIfPCollectionViewHasNonDeterministicKeyCoder(Pipeline pip // have just recorded the full names during apply time. if (!ptransformViewsWithNonDeterministicKeyCoders.isEmpty()) { final SortedSet ptransformViewNamesWithNonDeterministicKeyCoders = new TreeSet<>(); - pipeline.traverseTopologically(new Pipeline.PipelineVisitor() { - @Override - public void visitValue(PValue value, TransformHierarchy.Node producer) { - } + pipeline.traverseTopologically(new Pipeline.PipelineVisitor.Defaults() { @Override public void visitPrimitiveTransform(TransformHierarchy.Node node) { @@ -218,10 +214,6 @@ public CompositeBehavior enterCompositeTransform(TransformHierarchy.Node node) { } return CompositeBehavior.ENTER_TRANSFORM; } - - @Override - public void leaveCompositeTransform(TransformHierarchy.Node node) { - } }); LOG.warn("Unable to use indexed implementation for View.AsMap and View.AsMultimap for {} " diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java index 53a1fa160ad1..8da68c5fc11e 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java @@ -188,7 +188,7 @@ public void visitValue(PValue value, TransformHierarchy.Node producer) { StreamTransformTranslator typedTranslator = (StreamTransformTranslator) translator; // create the applied PTransform on the streamingContext - streamingContext.setCurrentTransform(node.toAppliedPTransform()); + streamingContext.setCurrentTransform(node.toAppliedPTransform(getPipeline())); typedTranslator.translateNode(typedTransform, streamingContext); } @@ -203,7 +203,7 @@ public void visitValue(PValue value, TransformHierarchy.Node producer) { @SuppressWarnings("unchecked") StreamTransformTranslator typedTranslator = (StreamTransformTranslator) translator; - streamingContext.setCurrentTransform(node.toAppliedPTransform()); + streamingContext.setCurrentTransform(node.toAppliedPTransform(getPipeline())); return typedTranslator.canTranslate(typedTransform, streamingContext); } diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java index 840bda846ed6..6d7a0f847ca2 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java @@ -431,18 +431,18 @@ public void visitPrimitiveTransform(TransformHierarchy.Node node) { transform, node.getFullName()); LOG.debug("Translating {}", transform); - currentTransform = node.toAppliedPTransform(); + currentTransform = node.toAppliedPTransform(getPipeline()); translator.translate(transform, this); currentTransform = null; } @Override public void visitValue(PValue value, TransformHierarchy.Node producer) { - producers.put(value, producer.toAppliedPTransform()); + producers.put(value, producer.toAppliedPTransform(getPipeline())); LOG.debug("Checking translation of {}", value); if (!producer.isCompositeNode()) { // Primitive transforms are the only ones assigned step names. - asOutputReference(value, producer.toAppliedPTransform()); + asOutputReference(value, producer.toAppliedPTransform(getPipeline())); } } diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index 2ef87374956d..cce6ce79b883 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -729,7 +729,7 @@ private void logWarningIfPCollectionViewHasNonDeterministicKeyCoder(Pipeline pip if (!ptransformViewsWithNonDeterministicKeyCoders.isEmpty()) { final SortedSet ptransformViewNamesWithNonDeterministicKeyCoders = new TreeSet<>(); pipeline.traverseTopologically( - new PipelineVisitor() { + new PipelineVisitor.Defaults() { @Override public void visitValue(PValue value, TransformHierarchy.Node producer) {} diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java index 8c02f0f0e90d..9e2426ef8381 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java @@ -404,7 +404,7 @@ > void doVisitTransform( @SuppressWarnings("unchecked") TransformEvaluator evaluator = translate(node, transform, transformClass); LOG.info("Evaluating {}", transform); - AppliedPTransform appliedTransform = node.toAppliedPTransform(); + AppliedPTransform appliedTransform = node.toAppliedPTransform(getPipeline()); ctxt.setCurrentTransform(appliedTransform); evaluator.evaluate(transform, ctxt); ctxt.setCurrentTransform(null); diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/TrackStreamingSourcesTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/TrackStreamingSourcesTest.java index 33a636ad65f8..e8a59510cdb9 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/TrackStreamingSourcesTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/TrackStreamingSourcesTest.java @@ -147,6 +147,12 @@ private void assertSourceIds(List streamingSources) { assertThat(streamingSources, containsInAnyOrder(expected)); } + @Override + public void enterPipeline(Pipeline p) { + super.enterPipeline(p); + evaluator.enterPipeline(p); + } + @Override public CompositeBehavior enterCompositeTransform(TransformHierarchy.Node node) { return evaluator.enterCompositeTransform(node); @@ -156,7 +162,7 @@ public CompositeBehavior enterCompositeTransform(TransformHierarchy.Node node) { public void visitPrimitiveTransform(TransformHierarchy.Node node) { PTransform transform = node.getTransform(); if (transform.getClass() == transformClassToAssert) { - AppliedPTransform appliedTransform = node.toAppliedPTransform(); + AppliedPTransform appliedTransform = node.toAppliedPTransform(getPipeline()); ctxt.setCurrentTransform(appliedTransform); //noinspection unchecked Dataset dataset = ctxt.borrowDataset((PTransform) transform); @@ -166,6 +172,12 @@ public void visitPrimitiveTransform(TransformHierarchy.Node node) { evaluator.visitPrimitiveTransform(node); } } + + @Override + public void leavePipeline(Pipeline p) { + super.leavePipeline(p); + evaluator.leavePipeline(p); + } } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java index 83496a5302ad..bdf8a12aa2bf 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java @@ -17,6 +17,7 @@ */ package org.apache.beam.sdk; +import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkState; import com.google.common.annotations.VisibleForTesting; @@ -205,7 +206,7 @@ private void checkNoMoreMatches(final List overrides) { public CompositeBehavior enterCompositeTransform(Node node) { if (!node.isRootNode()) { for (PTransformOverride override : overrides) { - if (override.getMatcher().matches(node.toAppliedPTransform())) { + if (override.getMatcher().matches(node.toAppliedPTransform(getPipeline()))) { matched.put(node, override); } } @@ -227,7 +228,7 @@ public void leaveCompositeTransform(Node node) { @Override public void visitPrimitiveTransform(Node node) { for (PTransformOverride override : overrides) { - if (override.getMatcher().matches(node.toAppliedPTransform())) { + if (override.getMatcher().matches(node.toAppliedPTransform(getPipeline()))) { matched.put(node, override); } } @@ -238,7 +239,7 @@ public void visitPrimitiveTransform(Node node) { private void replace(final PTransformOverride override) { final Set matches = new HashSet<>(); final Set freedNodes = new HashSet<>(); - transforms.visit( + traverseTopologically( new PipelineVisitor.Defaults() { @Override public CompositeBehavior enterCompositeTransform(Node node) { @@ -247,7 +248,8 @@ public CompositeBehavior enterCompositeTransform(Node node) { freedNodes.add(node); return CompositeBehavior.ENTER_TRANSFORM; } - if (!node.isRootNode() && override.getMatcher().matches(node.toAppliedPTransform())) { + if (!node.isRootNode() + && override.getMatcher().matches(node.toAppliedPTransform(getPipeline()))) { matches.add(node); // This node will be freed. When we visit any of its children, they will also be freed freedNodes.add(node); @@ -259,7 +261,7 @@ public CompositeBehavior enterCompositeTransform(Node node) { public void visitPrimitiveTransform(Node node) { if (freedNodes.contains(node.getEnclosingNode())) { freedNodes.add(node); - } else if (override.getMatcher().matches(node.toAppliedPTransform())) { + } else if (override.getMatcher().matches(node.toAppliedPTransform(getPipeline()))) { matches.add(node); freedNodes.add(node); } @@ -334,8 +336,14 @@ public void setCoderRegistry(CoderRegistry coderRegistry) { @Internal public interface PipelineVisitor { /** - * Called for each composite transform after all topological predecessors have been visited - * but before any of its component transforms. + * Called before visiting anything values or transforms, as many uses of a visitor require + * access to the {@link Pipeline} object itself. + */ + void enterPipeline(Pipeline p); + + /** + * Called for each composite transform after all topological predecessors have been visited but + * before any of its component transforms. * *

The return value controls whether or not child transforms are visited. */ @@ -359,6 +367,11 @@ public interface PipelineVisitor { */ void visitValue(PValue value, TransformHierarchy.Node producer); + /** + * Called when all values and transforms in a {@link Pipeline} have been visited. + */ + void leavePipeline(Pipeline pipeline); + /** * Control enum for indicating whether or not a traversal should process the contents of * a composite transform or not. @@ -373,6 +386,18 @@ enum CompositeBehavior { * User implementations can override just those methods they are interested in. */ class Defaults implements PipelineVisitor { + + private Pipeline pipeline; + + protected Pipeline getPipeline() { + return pipeline; + } + + @Override + public void enterPipeline(Pipeline pipeline) { + this.pipeline = checkNotNull(pipeline); + } + @Override public CompositeBehavior enterCompositeTransform(TransformHierarchy.Node node) { return CompositeBehavior.ENTER_TRANSFORM; @@ -386,6 +411,11 @@ public void visitPrimitiveTransform(TransformHierarchy.Node node) { } @Override public void visitValue(PValue value, TransformHierarchy.Node producer) { } + + @Override + public void leavePipeline(Pipeline pipeline) { + this.pipeline = null; + } } } @@ -406,7 +436,9 @@ public void visitValue(PValue value, TransformHierarchy.Node producer) { } */ @Internal public void traverseTopologically(PipelineVisitor visitor) { + visitor.enterPipeline(this); transforms.visit(visitor); + visitor.leavePipeline(this); } /** @@ -444,7 +476,7 @@ OutputT applyTransform(String name, InputT input, ///////////////////////////////////////////////////////////////////////////// // Below here are internal operations, never called by users. - private final TransformHierarchy transforms = new TransformHierarchy(this); + private final TransformHierarchy transforms = new TransformHierarchy(); private Set usedFullNames = new HashSet<>(); private CoderRegistry coderRegistry; private final List unstableNames = new ArrayList<>(); @@ -495,7 +527,7 @@ void applyReplacement( PTransformOverrideFactory replacementFactory) { PTransformReplacement replacement = replacementFactory.getReplacementTransform( - (AppliedPTransform) original.toAppliedPTransform()); + (AppliedPTransform) original.toAppliedPTransform(this)); if (replacement.getTransform() == original.getTransform()) { return; } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java index fac558bc8432..2f0e8efd7de8 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java @@ -56,7 +56,6 @@ public class TransformHierarchy { private static final Logger LOG = LoggerFactory.getLogger(TransformHierarchy.class); - private final Pipeline pipeline; private final Node root; private final Map unexpandedInputs; private final Map producers; @@ -65,8 +64,7 @@ public class TransformHierarchy { // Maintain a stack based on the enclosing nodes private Node current; - public TransformHierarchy(Pipeline pipeline) { - this.pipeline = pipeline; + public TransformHierarchy() { producers = new HashMap<>(); producerInput = new HashMap<>(); unexpandedInputs = new HashMap<>(); @@ -453,7 +451,7 @@ public Map, PValue> getOutputs() { /** * Returns the {@link AppliedPTransform} representing this {@link Node}. */ - public AppliedPTransform toAppliedPTransform() { + public AppliedPTransform toAppliedPTransform(Pipeline pipeline) { return AppliedPTransform.of( getFullName(), inputs, outputs, (PTransform) getTransform(), pipeline); } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java index 125e15902d17..1197d1b04eb6 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java @@ -79,7 +79,7 @@ public class TransformHierarchyTest implements Serializable { @Before public void setup() { - hierarchy = new TransformHierarchy(pipeline); + hierarchy = new TransformHierarchy(); } @Test