New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[BEAM-2927] Python support for portable side inputs over Fn API #4781
Conversation
200a039
to
e51bdb9
Compare
R: @tvalentyn |
Nice |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks; the only major comment - how well is the new code covered by existing tests? Do we need to add test coverage?
At sdks/python/apache_beam/typehints/typehints.py:716:
Internally, KV[X, Y] proxies to Tuple[X, Y]. A KV type-hint accepts only
nit: drop accepts only.
from apache_beam.transforms.core import ParDo | ||
|
||
class SideInputVisitor(PipelineVisitor): | ||
"""Ensures input `PCollection` used as a side inputs have a `KV` type. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: grammar
Ensures input PCollection
used as a side input has a KV
type. ?
Also: s/SDk/SDK
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
pipeline, | ||
element_type=typehints.KV[ | ||
str, side_input.pvalue.element_type]) | ||
parent = transform_node.parent or pipeline._root_transform() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Curious why is root_transform not defined as a default parent for transforms that don't have a parent?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That is a good question, but I did run into this case. (Probably a bug, filed BEAM-3871 for follow-up.)
new_side_inputs = [] | ||
for ix, side_input in enumerate(transform_node.side_inputs): | ||
access_pattern = side_input._side_input_data().access_pattern | ||
if access_pattern == common_urns.ITERABLE_SIDE_INPUT: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do the branches here currently have any test coverage?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks like external test coverage for DataflowRunner is pretty sparse... I added a unit test of this explicitly (and also tested manually against the Dataflow runner).
@@ -1098,3 +1098,36 @@ def is_consistent_with(sub, base): | |||
# Nothing but object lives above any type constraints. | |||
return base == object | |||
return issubclass(sub, base) | |||
|
|||
|
|||
def coerce_to_kv_type(element_type, label=None): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do the branches here currently have any test coverage?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, they do.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the review, and sorry for the delay getting back to this. PTAL.
pipeline, | ||
element_type=typehints.KV[ | ||
str, side_input.pvalue.element_type]) | ||
parent = transform_node.parent or pipeline._root_transform() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That is a good question, but I did run into this case. (Probably a bug, filed BEAM-3871 for follow-up.)
from apache_beam.transforms.core import ParDo | ||
|
||
class SideInputVisitor(PipelineVisitor): | ||
"""Ensures input `PCollection` used as a side inputs have a `KV` type. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
@@ -1098,3 +1098,36 @@ def is_consistent_with(sub, base): | |||
# Nothing but object lives above any type constraints. | |||
return base == object | |||
return issubclass(sub, base) | |||
|
|||
|
|||
def coerce_to_kv_type(element_type, label=None): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, they do.
new_side_inputs = [] | ||
for ix, side_input in enumerate(transform_node.side_inputs): | ||
access_pattern = side_input._side_input_data().access_pattern | ||
if access_pattern == common_urns.ITERABLE_SIDE_INPUT: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks like external test coverage for DataflowRunner is pretty sparse... I added a unit test of this explicitly (and also tested manually against the Dataflow runner).
3d74d37
to
99f1ebf
Compare
Revert #4781 which broke Python postsubmits
This reverts commit 1f681bb.
This reverts commit 1f681bb.
DESCRIPTION HERE
Follow this checklist to help us incorporate your contribution quickly and easily:
[BEAM-XXX] Fixes bug in ApproximateQuantiles
, where you replaceBEAM-XXX
with the appropriate JIRA issue.mvn clean verify
to make sure basic checks pass. A more thorough check will be performed on your pull request automatically.