Skip to content

[BEAM-3565] Add FusedPipeline#toPipeline#4777

Merged
tgroh merged 2 commits intoapache:masterfrom
tgroh:fused_pipeline_network_topo
Mar 23, 2018
Merged

[BEAM-3565] Add FusedPipeline#toPipeline#4777
tgroh merged 2 commits intoapache:masterfrom
tgroh:fused_pipeline_network_topo

Conversation

@tgroh
Copy link
Member

@tgroh tgroh commented Mar 1, 2018

The FusedPipeline is the physical plan corresponding with an original, logical Pipeline. Converting it back into a proto allows that plan to be manipulated with existing libraries, and for runners to interact with that plan for runner-specific implementation reasons.


Follow this checklist to help us incorporate your contribution quickly and easily:

  • Make sure there is a JIRA issue filed for the change (usually before you start working on it). Trivial changes like typos do not require a JIRA issue. Your pull request should address just this issue, without pulling in other changes.
  • Format the pull request title like [BEAM-XXX] Fixes bug in ApproximateQuantiles, where you replace BEAM-XXX with the appropriate JIRA issue.
  • Write a pull request description that is detailed enough to understand:
    • What the pull request does
    • Why it does it
    • How it does it
    • Why this approach
  • Each commit in the pull request should have a meaningful subject line and body.
  • Run mvn clean verify to make sure basic checks pass. A more thorough check will be performed on your pull request automatically.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

@tgroh
Copy link
Member Author

tgroh commented Mar 1, 2018

@bsidhom

@tgroh tgroh force-pushed the fused_pipeline_network_topo branch 3 times, most recently from 40cd510 to 025f9f7 Compare March 6, 2018 18:22
*/
public Components asComponents(Components base) {
Builder newComponents = base.toBuilder().clearTransforms();
private Map<String, PTransform> getTopLevelTransforms(Components base) {
Copy link
Member

@lukecwik lukecwik Mar 6, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

getRunnerExecutedPrimitiveTransforms?

If we go with my suggestion of treating executablestage as a primitive, then this can be called getPrimitiveTransforms or something along those lines. Even asComponents would still make sense.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

getExecutableTransforms is what I am calling this now.

All of the transforms are primitives, but there are non-executable transforms (within a stage) that get merged into the components as well.


/**
* Return a {@link Components} like the {@code base} components, but with the only transforms
* equal to this fused pipeline.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

but with the only transforms equal to this fused pipeline. -> but with the set of transforms required to be executed by a runner.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

* PCollections} are not yet modelled by {@link QueryablePipeline}, so the input {@link
* Components} should be treatable as though each node is a primitive.
*/
static QueryablePipeline forComponents(Components components) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would make sense to make ExecutableStage a primitive and have its payload be the subtransforms, coders, pcollections, ...

This will address some of the coder issues and the duality where you want the runner based pipeline representation to differ from the SDK based pipeline segment which will now be completely embedded inside the executable stage.

Copy link
Member Author

@tgroh tgroh Mar 13, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#4844 performs a lot of this change, without embedding the components within the stage; it will still require us to create a partial components, but should ultimately cause this construction to go through the same path as the original queryablePipeline

@tgroh tgroh force-pushed the fused_pipeline_network_topo branch from 025f9f7 to 2641890 Compare March 13, 2018 23:36
@lukecwik
Copy link
Member

Please address all the comments.

@tgroh tgroh force-pushed the fused_pipeline_network_topo branch 3 times, most recently from e2ba80f to 361211e Compare March 20, 2018 21:26
@tgroh
Copy link
Member Author

tgroh commented Mar 21, 2018

PTAL

@tgroh tgroh force-pushed the fused_pipeline_network_topo branch from 361211e to 6e82d2e Compare March 21, 2018 18:13
* <p>The only composites will be the stages returned by {@link #getFusedStages()}.
* <p>The transforms that are present in the returned map are the union of the results of {@link
* #getRunnerExecutedTransforms()} and {@link #getFusedStages()}, where each {@link
* ExecutableStage}.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

where each executable stage?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

RM'd

/**
* Return a {@link Components} like the {@code base} components, but with the only transforms
* equal to this fused pipeline.
* Return a {@link Components} like the {@code base} components, but with the set of transforms to
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comment seems like it should apply toPipeline and not the private method getExecutableStages

Also, Return a {@link Pipeline}

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comment still seems out of date since it refers to base components.

/** The {@link PTransform PTransforms} that a runner is responsible for executing. */
public abstract Set<PTransformNode> getRunnerExecutedTransforms();

public RunnerApi.Pipeline toPipeline(Components initialComponents) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We were given the Pipeline when we constructed the FusedStage via GreedyPipelineFuser, why do we need initialComponents to be passed in again here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Made into a property.

* Get the PCollections which are not consumed by any {@link PTransformNode} in this {@link
* QueryablePipeline}.
*/
private Set<PCollectionNode> getLeafPCollections() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This isn't used anywhere, what is the intent of adding it right now?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pulled in accidentally, I think.

fused.getFusedStages().size() == 2,
"Unexpected number of fused stages %s",
fused.getFusedStages());
RunnerApi.Pipeline fusedProto = fused.toPipeline(protoPipeline.getComponents());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fusedProto -> fusedProtoPipeline?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

PCollection.class.getSimpleName(), fusedProto.getRootTransformIds(i)),
producedPCollections,
hasItems(rootTransform.getInputsMap().values().toArray(new String[0])));
for (String consumed : consumedPCollections) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't all roots have zero consumed PCollections so this is just checking that no two roots produce the same PCollection.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the Pipeline root transform ID's is really the top-level transforms (transforms not contained in any other transform)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see, not DAG roots.

Copy link
Member

@lukecwik lukecwik left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please address minor comment nits.

/**
* Return a {@link Components} like the {@code base} components, but with the only transforms
* equal to this fused pipeline.
* Return a {@link Components} like the {@code base} components, but with the set of transforms to
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comment still seems out of date since it refers to base components.

* PTransforms} present in the original Pipeline that this {@link FusedPipeline} was created from,
* plus all of the {@link ExecutableStage ExecutableStages} contained within this {@link
* FusedPipeline}. The Root Transform IDs will contain all of the runner executed transforms and
* all of the ExecutableStages contained within the Pipeline.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ExecutableStages -> {@link ExecutableStage executable stages}

* <p>The {@link Components} of the returned pipeline will contain all of the {@link PTransform
* PTransforms} present in the original Pipeline that this {@link FusedPipeline} was created from,
* plus all of the {@link ExecutableStage ExecutableStages} contained within this {@link
* FusedPipeline}. The Root Transform IDs will contain all of the runner executed transforms and
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The upper casing on Root Transform IDs is strange, would you rather link the Pipeline root transform ids method?

PCollection.class.getSimpleName(), fusedProto.getRootTransformIds(i)),
producedPCollections,
hasItems(rootTransform.getInputsMap().values().toArray(new String[0])));
for (String consumed : consumedPCollections) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see, not DAG roots.

tgroh added 2 commits March 21, 2018 16:46
Any pipeline node should be identifiable within the components the
pipeline was built from, and this allows cleaner comparisons based on
the id.
Given a Pipeline Components, this constructs the fused representation,
including a shallow topologically ordered set of root transforms.
@tgroh tgroh force-pushed the fused_pipeline_network_topo branch from e74aa23 to d254651 Compare March 21, 2018 23:46
@tgroh
Copy link
Member Author

tgroh commented Mar 21, 2018

Done to all

@bsidhom bsidhom mentioned this pull request Mar 22, 2018
@tgroh tgroh merged commit 3978b21 into apache:master Mar 23, 2018
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants