[BEAM-7824] Sets a default environment for Dataflow runner #9165
Run Python PostCommit
Run Python 2 PostCommit
@@ -952,7 +969,8 @@ def _get_required_container_version(job_type=None):
   current version of the SDK.
   """
   if 'dev' in beam_version.__version__:
-    if job_type == 'FNAPI_BATCH' or job_type == 'FNAPI_STREAMING':
+    if (job_type == JOB_TYPE_PYTHON_BATCH or
python-batch or python-fnapi-streaming?
Reverted.
@@ -927,7 +944,7 @@ def get_default_container_image_for_current_sdk(job_type):
       % str(sys.version_info[0:2]))

   # TODO(tvalentyn): Use enumerated type instead of strings for job types.
-  if job_type == 'FNAPI_BATCH' or job_type == 'FNAPI_STREAMING':
+  if job_type == JOB_TYPE_PYTHON_BATCH or job_type == JOB_TYPE_FNAPI_STREAMING:
FNAPI
Reverted.
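The TODO in the hunk above suggests an enumerated type for job types, which would make a mix-up like the one flagged here harder to write. A minimal sketch, assuming a hypothetical `JobType` enum and `uses_fnapi` helper (neither is in this PR; only the `'FNAPI_BATCH'` and `'FNAPI_STREAMING'` strings come from the diff, and the `PYTHON_*` values are assumed):

```python
from enum import Enum


class JobType(Enum):
  """Hypothetical job type enum; not part of the PR."""
  # Assumed values, not taken from the diff.
  PYTHON_BATCH = 'PYTHON_BATCH'
  PYTHON_STREAMING = 'PYTHON_STREAMING'
  # These two strings appear in the original code.
  FNAPI_BATCH = 'FNAPI_BATCH'
  FNAPI_STREAMING = 'FNAPI_STREAMING'


def uses_fnapi(job_type):
  """Returns True if the given JobType is one of the FnAPI job types."""
  return job_type in (JobType.FNAPI_BATCH, JobType.FNAPI_STREAMING)
```

With an enum, a misspelled member fails with an `AttributeError` at the point of use instead of silently comparing unequal.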
from apache_beam.portability.api import beam_runner_api_pb2
default_container_image = (
    apiclient.get_default_container_image_for_current_sdk(
        apiclient.get_job_type(options)))
This value is only used for FnApi. Additionally, FnApi doesn't have different containers for streaming vs. batch. It's too bad we have to go through the (irrelevant for portability) job type to get this.
Updated function to take a bool (use_fnapi) instead of job type.
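As a rough illustration of the change described here, the lookup could depend only on a boolean. This is a sketch, not the code in the PR: the function name `default_container_image`, the `version_info` parameter, and the image name templates are hypothetical placeholders.

```python
import sys

# Hypothetical image name templates, for illustration only.
_FNAPI_IMAGE_TEMPLATE = 'example.invalid/python%d%d-fnapi'
_LEGACY_IMAGE_TEMPLATE = 'example.invalid/python%d%d'


def default_container_image(use_fnapi, version_info=None):
  """Picks an image name from the FnAPI flag and the Python version.

  Batch vs. streaming plays no role, matching the review comment that
  FnAPI does not use different containers for the two.
  """
  major, minor = (version_info or sys.version_info)[0:2]
  template = _FNAPI_IMAGE_TEMPLATE if use_fnapi else _LEGACY_IMAGE_TEMPLATE
  return template % (major, minor)
```

The optional `version_info` argument exists only so the sketch can be exercised without depending on the running interpreter.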
Thanks Robert. PTAL.
Looks good, just one minor comment.
  def proto(self):
    """Runner API payload for a `PTransform`"""
    return self._proto

  def to_runner_api(self, context, has_parts=False):
    id_to_proto_map = self._context.environments.get_id_to_proto_map()
This seems a bit fragile in that we're assuming that if the ids match, the protos match (which could be bad for auto-generated names like env0). Could you add a check for this?
It also seems that we're copying too much (every environment from the context, not just the one(s) referenced from this proto), but perhaps there's no good way to get around that. Could you at least add a TODO referencing the JIRA about making environment a top-level attribute of transforms?
Added a check for environments.
Created https://issues.apache.org/jira/browse/BEAM-7850 for making Environment a top level attribute of PTransform.
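The requested check could look roughly like the following. This is a sketch under assumptions: plain dicts stand in for the id-to-proto maps, strings stand in for Environment protos, and `merge_environments` is a hypothetical helper, not the function added in the PR.

```python
def merge_environments(target, source):
  """Copies id -> proto entries from source into target.

  Raises ValueError if an id is present in both maps with different
  protos, instead of assuming that matching ids imply matching protos.
  """
  for env_id, env_proto in source.items():
    if env_id in target:
      if target[env_id] != env_proto:
        raise ValueError(
            'Environment id %r maps to different protos: %r vs. %r'
            % (env_id, target[env_id], env_proto))
    else:
      target[env_id] = env_proto
  return target
```

An id that is already present with an equal proto is accepted silently, so the merge stays idempotent when the same context is copied more than once.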
Force-pushed from 1568d2f to 80a99ed.
Thanks.
Run Python_PVR_Flink PreCommit
Flink failure seems to be unrelated.