Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BEAM-9001, BEAM-6327] Ensure that all transforms (except for required runner implemented transforms) have an environment id. #11670

Merged
merged 3 commits into from May 13, 2020

Conversation

lukecwik
Copy link
Member


Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

  • Choose reviewer(s) and mention them in a comment (R: @username).
  • Format the pull request title like [BEAM-XXX] Fixes bug in ApproximateQuantiles, where you replace BEAM-XXX with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
  • Update CHANGES.md with noteworthy changes.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

See the Contributor Guide for more tips on how to make review process smoother.

Post-Commit Tests Status (on master branch)

Lang SDK Apex Dataflow Flink Gearpump Samza Spark
Go Build Status --- --- Build Status --- --- Build Status
Java Build Status Build Status Build Status
Build Status
Build Status
Build Status
Build Status
Build Status
Build Status Build Status Build Status
Build Status
Build Status
Python Build Status
Build Status
Build Status
Build Status
--- Build Status
Build Status
Build Status
Build Status
Build Status
--- --- Build Status
XLang --- --- --- Build Status --- --- Build Status

Pre-Commit Tests Status (on master branch)

--- Java Python Go Website
Non-portable Build Status Build Status
Build Status
Build Status Build Status
Portable --- Build Status --- ---

See .test-infra/jenkins/README for trigger phrase, status and link of all Jenkins jobs.

…d runner implemented transforms) have an environment id.
@lukecwik
Copy link
Member Author

lukecwik commented May 11, 2020

Copy link
Contributor

@lostluck lostluck left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Approved for Go.

@lukecwik
Copy link
Member Author

Run Go Flink ValidatesRunner

@lukecwik
Copy link
Member Author

Run Go Flink ValidatesRunner

@lukecwik
Copy link
Member Author

Run Python PreCommit

public class PipelineTrimmer {
private static final Logger LOG = LoggerFactory.getLogger(PipelineTrimmer.class);
/**
* TrivialNativeTransformExpander is used to replace transforms with known URNs with their native
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What does "Trivial" mean here? Are there "Native" transforms that are not "Trivial"?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not currently but the "trivial" is to imply that you don't need anything other then payload on the transform itself and that you don't need to inspect the transform or any of its children when constructing the native transform.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The "trivial" is meant to apply to the expander part.

@@ -375,6 +375,7 @@ func (m *marshaller) addMultiEdge(edge NamedEdge) []string {
payload := &pipepb.WindowIntoPayload{
WindowFn: makeWindowFn(edge.Edge.WindowFn),
}
transformEnvID = m.addDefaultEnv()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be native as well, right ?

Copy link
Member Author

@lukecwik lukecwik May 12, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, Window.intorequires execution of assignWindows that is part of the windowing fn (which could be a well known window fn or a custom user window fn).

It could be lifted into the runner if it understands the windowing fn but that could break fusion since Window.into may occur between ParDos and far away from GroupByKey.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah ok. I read this as GBK by mistake. Can we do this in a common place and skip the two known runner implemented transforms similar to other SDKs ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

Copy link
Contributor

@chamikaramj chamikaramj left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks!

if (knownUrns.contains(
pipeline.getComponents().getTransformsOrThrow(ptransformId).getSpec().getUrn())) {
LOG.debug("Removing descendants of known PTransform {}" + ptransformId);
// Skip over previously removed transforms from the original pipeline.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably we should expand the comment here to describe why this is needed.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

@@ -375,6 +375,7 @@ func (m *marshaller) addMultiEdge(edge NamedEdge) []string {
payload := &pipepb.WindowIntoPayload{
WindowFn: makeWindowFn(edge.Edge.WindowFn),
}
transformEnvID = m.addDefaultEnv()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah ok. I read this as GBK by mistake. Can we do this in a common place and skip the two known runner implemented transforms similar to other SDKs ?

@@ -123,20 +123,16 @@ class Pipeline(object):
should be used to designate new names
(e.g. ``input | "label" >> my_transform``).
"""

# TODO: BEAM-9001 - set environment ID in all transforms and allow runners to
# override.
@classmethod
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add an assert similar to PipelineValidator.java ?

Copy link
Member Author

@lukecwik lukecwik May 13, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where do you suggest the assert go?

The PipelineValidator does the assertion since it is used by the Runner once the entire pipeline is constructed.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably we can introduce a new visitor and update runners to use that similar to following ?
https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py#L557

But this is not a blocker.

@lukecwik
Copy link
Member Author

Run Java PreCommit

@lukecwik
Copy link
Member Author

Run Python PreCommit

1 similar comment
@lukecwik
Copy link
Member Author

Run Python PreCommit

@@ -213,6 +213,7 @@ func (m *marshaller) addScopeTree(s *ScopeTree) string {
transform := &pipepb.PTransform{
UniqueName: s.Scope.Name,
Subtransforms: subtransforms,
EnvironmentId: m.addDefaultEnv(),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, it's now the case that Composite Transforms should have environments?

Copy link
Member Author

@lukecwik lukecwik May 13, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes.

It was to cover the case where the runner knows what the composite transform means such as a combiner and lifts it appropriately.

@lukecwik
Copy link
Member Author

Run Python PreCommit

@aaltay
Copy link
Member

aaltay commented May 13, 2020

Is this ready to be merged? This is the only blocker left on the release.

This change is large for a cherry pick. What would be impacted, what tests need to be run? If we cherry pick this to the release branch there will be a risk of introducing other issues.

Copy link
Contributor

@chamikaramj chamikaramj left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks. LGTM.

@@ -123,20 +123,16 @@ class Pipeline(object):
should be used to designate new names
(e.g. ``input | "label" >> my_transform``).
"""

# TODO: BEAM-9001 - set environment ID in all transforms and allow runners to
# override.
@classmethod
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably we can introduce a new visitor and update runners to use that similar to following ?
https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py#L557

But this is not a blocker.

@lukecwik
Copy link
Member Author

Is this ready to be merged? This is the only blocker left on the release.

This change is large for a cherry pick. What would be impacted, what tests need to be run? If we cherry pick this to the release branch there will be a risk of introducing other issues.

We only would need to cherry pick the Python portion of this change which is one file. The Java/Go portion aren't important for 2.21.

@lukecwik lukecwik merged commit a5b2046 into apache:master May 13, 2020
@aaltay
Copy link
Member

aaltay commented May 13, 2020

We only would need to cherry pick the Python portion of this change which is one file. The Java/Go portion aren't important for 2.21.

Thank you!

ibzib pushed a commit to ibzib/beam that referenced this pull request May 13, 2020
…d runner implemented transforms) have an environment id. (apache#11670)

* [BEAM-9001, BEAM-6327] Ensure that all transforms (except for required runner implemented transforms) have an environment id.

* fixup! Fix native transform expander to not reinsert deleted transforms.

* fixup! Address chamikara's PR comments
mxm pushed a commit to lyft/beam that referenced this pull request Jul 8, 2020
…d runner implemented transforms) have an environment id. (apache#11670)

* [BEAM-9001, BEAM-6327] Ensure that all transforms (except for required runner implemented transforms) have an environment id.

* fixup! Fix native transform expander to not reinsert deleted transforms.

* fixup! Address chamikara's PR comments
yirutang pushed a commit to yirutang/beam that referenced this pull request Jul 23, 2020
…d runner implemented transforms) have an environment id. (apache#11670)

* [BEAM-9001, BEAM-6327] Ensure that all transforms (except for required runner implemented transforms) have an environment id.

* fixup! Fix native transform expander to not reinsert deleted transforms.

* fixup! Address chamikara's PR comments
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

5 participants