[BEAM-7850] Makes environment ID a top level attribute of PTransform.#10183
Conversation
|
The proto changes makes sense to me. The one bit I'm not sure of is how this will look for cross-language UDFs, but I think that will still look very different than the current SdfFunctionSpecs, and possibly will be modeled as "side" PTransforms which this would be in line with. |
|
Thanks. Yeah, design for cross-language UDFs should be done separately and SdkFunctionSpec is inadequate for this. |
There was a problem hiding this comment.
All other uses of SdkFunctionSpec are part of a transform but finding the envrionment for the windowing strategy that the window_fn is part of when looking at a PCollection is difficult since one needs to find the environment that the upstream assign windows transform is part of. It is possible but annoying. Also this is different then coders which don't have an environment since both the upstream and downstream transforms need to understand the encoding and runners have the length prefixing technique to convert coders from unknown ones to known ones.
For now I'm for not adding an environment id here and forcing that graph traversal for the assign windows transform to find the environment and in the future to truly support custom window fns its likely we will have to come up with techniques like the coder length prefixing to make it so that they can be treated opaquely. (note that rest of the change looks good)
@chamikaramj @robertwb what do you think?
There was a problem hiding this comment.
That makes sense and I'm actually running into this issue while updating SDKs. We probably have to add environment_id to WindowingStrategy since it's not directly attached to PTransform.
There was a problem hiding this comment.
The windowing strategy cannot be coerced into an environment-agnostic one the same way a coder can be, as it involves actually executing user code as oppose to specifying constraints on its behavior. (In some sense, it is in a very real sense a cross-language UDF.)
Having to traverse the graph is ugly, but I suppose possible for now.
There was a problem hiding this comment.
After hearing about some of the difficulties that Cham is running into. I would go either way on whether we add an environment id here or remove it completely.
There was a problem hiding this comment.
Added an environment_id to WindowingStrategy for now (retaining the current behavior). This can be further simplified in the future if needed.
16d484b to
6f57355
Compare
66d8153 to
a636ea0
Compare
|
R: @robertwb I'm trying to get the last set of failing tests to work but I think this is ready to be reviewed now. |
robertwb
left a comment
There was a problem hiding this comment.
Thanks for this cleanup.
There was a problem hiding this comment.
Nit: for consistency, order this as above.
There was a problem hiding this comment.
Seems it'd be better to have a prohibited list. Or possibly call these sdkTransformsWithEnvironment as "known" means lots of different things, and Flatten, GBK, ... would certainly seem to belong to a known this list.
There was a problem hiding this comment.
Renamed to sdkTransformsWithEnvironment since we are trying to maintain current behavior in this PR. In the future I think we should set environment ID in all transforms by default and let runners override for naively implemented transforms. Created https://issues.apache.org/jira/browse/BEAM-9001 for this.
sdks/python/apache_beam/pipeline.py
Outdated
There was a problem hiding this comment.
As for Java, known is probably not the right adjective here.
There was a problem hiding this comment.
This is not needed anymore. Removed.
sdks/python/apache_beam/pipeline.py
Outdated
There was a problem hiding this comment.
(On that note, a set of environment-less ones probably makes more sense, as it's more likely to be a fixed set.)
There was a problem hiding this comment.
Ditto. Renamed the variable and created https://issues.apache.org/jira/browse/BEAM-9001.
There was a problem hiding this comment.
Ditto. Renamed the variable and created https://issues.apache.org/jira/browse/BEAM-9001.
sdks/python/apache_beam/pipeline.py
Outdated
There was a problem hiding this comment.
Why is this needed given the above?
(Also, if it is needed, should it be nested for clarity, i.e.
if not environment_id:
if transform_urn and transform_urn in KNOWN_SDK_TRANSFORMS:
environment_id = context.default_environment_id()
elif ...:
There was a problem hiding this comment.
Only one is needed. Removed the other.
There was a problem hiding this comment.
These are specific to a runner, shouldn't be here.
There was a problem hiding this comment.
Moved back to original location.
There was a problem hiding this comment.
Moved back to original location.
There was a problem hiding this comment.
This feels a bit out of place here, I'd move it back to where it was.
There was a problem hiding this comment.
Moved back to original location.
dedbe59 to
31eddd1
Compare
|
Heads up, in #10393 I updated the Go proto compiler version and rebuilt the generated code.You'll probably need to update the generated Go code before merging this (and make sure you update the protoc-gen-go version to the most recent one this time). If it says the generated code is using ProtoVersion3 instead of 2, then your change is good to go. |
31eddd1 to
dc3fd94
Compare
|
Run Python2_PVR_Flink PreCommit |
|
Run Flink ValidatesRunner |
|
Run Python PreCommit |
chamikaramj
left a comment
There was a problem hiding this comment.
Thanks. PTAL.
There was a problem hiding this comment.
Added an environment_id to WindowingStrategy for now (retaining the current behavior). This can be further simplified in the future if needed.
There was a problem hiding this comment.
Renamed to sdkTransformsWithEnvironment since we are trying to maintain current behavior in this PR. In the future I think we should set environment ID in all transforms by default and let runners override for naively implemented transforms. Created https://issues.apache.org/jira/browse/BEAM-9001 for this.
sdks/python/apache_beam/pipeline.py
Outdated
There was a problem hiding this comment.
This is not needed anymore. Removed.
sdks/python/apache_beam/pipeline.py
Outdated
There was a problem hiding this comment.
Ditto. Renamed the variable and created https://issues.apache.org/jira/browse/BEAM-9001.
sdks/python/apache_beam/pipeline.py
Outdated
There was a problem hiding this comment.
Ditto. Renamed the variable and created https://issues.apache.org/jira/browse/BEAM-9001.
sdks/python/apache_beam/pipeline.py
Outdated
There was a problem hiding this comment.
Only one is needed. Removed the other.
There was a problem hiding this comment.
Moved back to original location.
There was a problem hiding this comment.
Moved back to original location.
There was a problem hiding this comment.
Moved back to original location.
|
Run PythonLint PreCommit |
|
Run Flink ValidatesRunner |
|
Run Spark ValidatesRunner |
|
Run Dataflow ValidatesRunner |
robertwb
left a comment
There was a problem hiding this comment.
Looks good to me, thanks!
sdks/python/apache_beam/runners/portability/fn_api_runner_transforms.py
Outdated
Show resolved
Hide resolved
Removes SDKFunctionSpec and replaces all usages of it with FunctionSpec.
084d4d9 to
edff83f
Compare
Removes SDKFunctionSpec and replaces all usages of it with FunctionSpec.
Maintains the current behavior as much as possible. For example, added a list of known transform for which we'll be setting the environment (will leave the default value, empty string, for others). If needed, this behavior can be changed in a future change.
Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
R: @username).[BEAM-XXX] Fixes bug in ApproximateQuantiles, where you replaceBEAM-XXXwith the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.See the Contributor Guide for more tips on how to make review process smoother.
Post-Commit Tests Status (on master branch)
Pre-Commit Tests Status (on master branch)
See .test-infra/jenkins/README for trigger phrase, status and link of all Jenkins jobs.