[BEAM-2937] Basic PGBK combiner lifting. #4290
R: @tvalentyn
LGTM, although I don't have a lot of context on model internals that this CL touches.
At sdks/python/apache_beam/runners/portability/fn_api_runner.py:173:
def __init__(self, use_grpc=False, sdk_harness_factory=None):
Do we want to document in the docstring what objects are expected to be passed as sdk_harness_factory?
@BeamTransformFactory.register_urn(
    urns.PRECOMBINE_TRANSFORM, beam_runner_api_pb2.CombinePayload)
def create(factory, transform_id, transform_proto, payload, consumers):
  # TODO: Combine side inputs.
Do we want to link a JIRA with a description of a TODO?
We're actually talking about removing this in Java, so I don't think it's worth surfacing there for now. The SDK API doesn't even support this yet; this is mostly a marker in case it ever gets added. Changed to a normal comment.
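For readers without context on the transform being registered above: PreCombine is the first stage of combiner lifting, where values are partially combined per key before the GroupByKey so that less data crosses the shuffle. The following is only a minimal plain-Python sketch of that idea. `SumCombineFn`, `pre_combine`, `group_by_key`, and `lifted_combine_per_key` are hypothetical names for illustration, not the code in this PR and not Beam's API.

```python
from collections import defaultdict


class SumCombineFn(object):
    """Toy stand-in for a combine function (hypothetical, not Beam's class)."""

    def create_accumulator(self):
        return 0

    def add_input(self, accumulator, element):
        return accumulator + element

    def merge_accumulators(self, accumulators):
        return sum(accumulators)

    def extract_output(self, accumulator):
        return accumulator


def pre_combine(bundle, combine_fn):
    """PreCombine: fold one bundle of (key, value) pairs into (key, accumulator)."""
    accumulators = {}
    for key, value in bundle:
        if key not in accumulators:
            accumulators[key] = combine_fn.create_accumulator()
        accumulators[key] = combine_fn.add_input(accumulators[key], value)
    return list(accumulators.items())


def group_by_key(pairs):
    """GBK: collect every value seen for each key."""
    grouped = defaultdict(list)
    for key, value in pairs:
        grouped[key].append(value)
    return dict(grouped)


def lifted_combine_per_key(bundles, combine_fn):
    """... -> PreCombine -> GBK -> MergeAccumulators -> ExtractOutput -> ..."""
    partials = [kv for bundle in bundles for kv in pre_combine(bundle, combine_fn)]
    grouped = group_by_key(partials)
    merged = {k: combine_fn.merge_accumulators(accs) for k, accs in grouped.items()}
    return {k: combine_fn.extract_output(acc) for k, acc in merged.items()}
```

In this sketch only one accumulator per key per bundle reaches `group_by_key`, which is the point of the lifting; side inputs are ignored entirely, in line with the comment under discussion.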
... -> PreCombine -> GBK -> MergeAccumulators -> ExtractOutput -> ...
"""
def new_id(existing_ids, prefix=''):
This looks similar to unique_name at the end of the file. Shall we dedup them or, if not, move the helpers closer together?
Good call. Done.
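As an illustration of what a single shared helper could look like, here is a rough sketch. It assumes the helper only needs to return an id that is not already in a collection of existing ids; the `_N` suffix scheme and the choice to leave the collection unmodified are assumptions made for this example and may not match the helper in the file.

```python
def unique_name(existing, prefix):
    """Return `prefix` if unused, else the first free `prefix_N` (N = 1, 2, ...).

    Hypothetical sketch: `existing` is any container supporting `in`, and it
    is not modified here; the caller records the returned name.
    """
    if prefix not in existing:
        return prefix
    counter = 1
    while True:
        candidate = '%s_%d' % (prefix, counter)
        if candidate not in existing:
            return candidate
        counter += 1
```

With one helper like this, the call site that needed `new_id(existing_ids, prefix)` could call `unique_name(existing_ids, prefix)` instead.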
windowed_input_coder = pipeline_components.coders[
    input_pcoll.coder_id]
window_coder_id = windowed_input_coder.component_coder_ids[1]
What determines the order of objects in windowed_input_coder.component_coder_ids?
Perhaps this should be encapsulated in a helper method that is aware of the [1] and [0] indexes?
This is the (albeit undocumented) spec of the various coder urns.
I've changed this to tuple unpacking which, though technically the same, I think makes things more understandable.
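A small sketch of the tuple-unpacking form, for comparison with the indexed lookups. It assumes, as the thread states, that a windowed coder lists its element coder id first and its window coder id second. `Coder` here is a hypothetical namedtuple standing in for the proto message, and `windowed_coder_components` is an illustrative helper name, not something in the PR.

```python
from collections import namedtuple

# Hypothetical stand-in for the coder proto; only the field used here.
Coder = namedtuple('Coder', ['component_coder_ids'])


def windowed_coder_components(coders, windowed_coder_id):
    """Return (element_coder_id, window_coder_id) for a windowed coder.

    Unpacking names both positions at once, and raises ValueError if the
    coder does not have exactly two components.
    """
    element_coder_id, window_coder_id = (
        coders[windowed_coder_id].component_coder_ids)
    return element_coder_id, window_coder_id


# Example data with made-up ids.
coders = {
    'windowed_kv': Coder(['kv', 'global_window']),
    'kv': Coder(['key', 'value']),
}
input_coder_id, window_coder_id = windowed_coder_components(
    coders, 'windowed_kv')
```

One practical difference from `[0]` and `[1]` indexing is that unpacking fails loudly when the component list has an unexpected length, instead of silently ignoring extra entries.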
windowed_input_coder = pipeline_components.coders[
    input_pcoll.coder_id]
window_coder_id = windowed_input_coder.component_coder_ids[1]
input_coder = pipeline_components.coders[
Do we want to add a helper variable here, something like:
input_coder_id = windowed_input_coder.component_coder_ids[0]
input_coder = pipeline_components.coders[input_coder_id]
I did this as part of the above change :).
transform.spec.payload, beam_runner_api_pb2.CombinePayload)

input_pcoll = pipeline_components.pcollections[only_element(
    transform.inputs.values())]
Does transform.inputs.values() always return one value?
It does for COMBINE_PER_KEY_TRANSFORM.
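To make the single-input assumption concrete, here is a minimal sketch of how a helper named `only_element` could enforce it. This is an illustration written for this thread, assuming the helper simply unpacks a one-item iterable; the definition in the file may differ.

```python
def only_element(iterable):
    """Return the sole item of `iterable`.

    Single-target unpacking raises ValueError if there are zero items or
    more than one, so a transform with an unexpected number of inputs
    fails here rather than being silently mishandled.
    """
    element, = iterable
    return element


# Example with a plain dict standing in for transform.inputs (made-up ids).
inputs = {'input': 'pcoll_1'}
pcoll_id = only_element(inputs.values())
```

Under this sketch, passing a transform with two inputs would raise `ValueError` at the call site shown in the diff.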
Thanks, PTAL.
Thanks
Follow this checklist to help us incorporate your contribution quickly and easily:
Format the pull request title like [BEAM-XXX] Fixes bug in ApproximateQuantiles, where you replace BEAM-XXX with the appropriate JIRA issue.
Run mvn clean verify to make sure basic checks pass. A more thorough check will be performed on your pull request automatically.