Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BEAM-8019] Updates Python SDK to handle remote SDK coders and preserve tags added by remote SDKs and propagate restriction coders. #11185

Merged
merged 7 commits into from Apr 6, 2020

Conversation

chamikaramj
Copy link
Contributor

@chamikaramj chamikaramj commented Mar 20, 2020

These are needed for for runners that need to build a Python object graph from a runner API proto with external transforms (for example, Dataflow).

Testing - cross-language test suite [1] works for Dataflow with these changes (will be enabled separately).

[1] https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/validate_runner_xlang_test.py#L51


Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

  • Choose reviewer(s) and mention them in a comment (R: @username).
  • Format the pull request title like [BEAM-XXX] Fixes bug in ApproximateQuantiles, where you replace BEAM-XXX with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
  • Update CHANGES.md with noteworthy changes.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

See the Contributor Guide for more tips on how to make review process smoother.

Post-Commit Tests Status (on master branch)

Lang SDK Apex Dataflow Flink Gearpump Samza Spark
Go Build Status --- --- Build Status --- --- Build Status
Java Build Status Build Status Build Status
Build Status
Build Status
Build Status
Build Status
Build Status Build Status Build Status
Build Status
Build Status
Python Build Status
Build Status
Build Status
Build Status
--- Build Status
Build Status
Build Status
Build Status
Build Status
--- --- Build Status
XLang --- --- --- Build Status --- --- Build Status

Pre-Commit Tests Status (on master branch)

--- Java Python Go Website
Non-portable Build Status Build Status
Build Status
Build Status Build Status
Portable --- Build Status --- ---

See .test-infra/jenkins/README for trigger phrase, status and link of all Jenkins jobs.

@chamikaramj
Copy link
Contributor Author

cc: @robertwb @lukecwik

Still addressing some unit test failures but sharing for any early comments.

raise ValueError((
'Expected an instance of ElementTypeHolder'
', but got a %s' % typehint))

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yapf -ir path/to/files/...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

]
side_inputs = [si for _, si in sorted(indexed_side_inputs)]
if python_indexed_side_inputs:
# Ordering is important here.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This all seems rather fragile. Would it be possible to just make side_inputs a dict everywhere in the internal SDK representation? (Or is there introspection with the legacy worker code that would make this hard?)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I slightly reverted some of the code here to preserve the old behavior for Python. New changes are basically to preserve input tags for remote SDKs instead of letting Python override tags (which breaks Java).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks. This is still has to work around the existing ugliness, but looks much better now.

Copy link
Contributor Author

@chamikaramj chamikaramj left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

R: @robertwb

Thanks Robert. PTAL.

@chamikaramj
Copy link
Contributor Author

Run XVR_Flink PostCommit

@chamikaramj
Copy link
Contributor Author

Run XVR_Spark PostCommit

@chamikaramj
Copy link
Contributor Author

Run Python2_PVR_Flink PreCommit

@chamikaramj
Copy link
Contributor Author

All tests pass now. Thanks.

Copy link
Contributor

@robertwb robertwb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some initial comments. It would be helpful if you broke the separate fixes into separate commits (e.g. via git commit -p, or at least enumerated them in the description. "Some generalizations" doesn't give much context as to what the specific changes you're trying to make here are. Thanks.


def proto(self):
return self._proto
coder_count = 0
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see this referenced half a dozen times below. Perhaps encapsulate the increment+return in a method _unique_id or similar?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was just added for logging/debugging. Removed.

class RunnerAPICoderHolder(Coder):
class ElementTypeHolder(typehints.TypeConstraint):
"""A dummy element type for external coders that cannot be parsed in Python"""
def __init__(self, coder, context):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/coder/coder_proto/ to be clear what it is?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

kind_str = 'kind:external' + str(ExternalCoder.coder_count)
ExternalCoder.coder_count = ExternalCoder.coder_count + 1
component_encodings = []
if coder_proto.spec.urn == 'beam:coder:kv:v1':
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about correctly handling kind:stream? Perhaps others? It seems it would be preferable to instead try to only use external_coder for the unknown leafs rather than duplicate this special-dataflow logic here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This does not seems to be needed for now (at least to get the test suite working). So I added a TODO here. Also, note that 'kind:extenal' was used to represent Java only Void coder here, so we'll probably continue to need that even if we handle 'kind:stream' here. Lemme know if you think this is inadequate for this PR.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I anticipate kind:stream will be needed to handle GBK of unknown types. Others may be needed for other cases, or in the future, and it seems risky to enumerate them here and in the dataflow runner. There may also be cases where we have to go more than one level deep. We should try to return the same thing the external SDK would have returned just to be safe, and that means wrapping only the leaves as external coders. I think that'll clean stuff up as well (e.g. no need for _coerce_to_kv_type_from_external_type).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cleaned this up. Now we only handle unknown types here.


def to_runner_api_parameter(self, context):
if self.element_type_holder.coder.component_coder_ids:
raise NotImplementedError
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why raise NotImplementedError here instead of returning this in place of the empty tuple below?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

for component_coder_id in coder_proto.component_coder_ids:
component_coder_proto = (
old_context.coders.get_id_to_proto_map()[component_coder_id])
new_context.coders.get_id_to_proto_map()[component_coder_id] = (
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

new_context.put_proto?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

self, new_context, old_context, coder_proto):
for component_coder_id in coder_proto.component_coder_ids:
component_coder_proto = (
old_context.coders.get_id_to_proto_map()[component_coder_id])
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You could add a get_proto(id) method.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

self._recursively_add_external_coders(
context, self.element_type.context, coder_proto)
else:
coder_id = context.coder_id_from_element_type(self.element_type)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you make this work generically rather than having an isinstance(self.element_type, ElementTypeHolder) check above?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe this'd require populating the map above with actual Coders as well as their protos.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

@@ -208,9 +233,23 @@ def from_runner_api(proto, context):
# deserialization. It will be populated soon after this call, in
# Pipeline.from_runner_api(). This brief period is the only time that
# PCollection.pipeline is allowed to be None.

try:
element_type = context.element_type_from_coder_id(proto.coder_id)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have this method do the ExternalCoder stuff rather than catching an exception here. Possibly it would not even be needed if the coders map is properly populated beforehand.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

@@ -380,7 +381,12 @@ def visit_transform(self, transform_node):
# pylint: disable=wrong-import-order, wrong-import-position
from apache_beam import Flatten
if isinstance(transform_node.transform, Flatten):
output_pcoll = transform_node.outputs[None]
from apache_beam.runners.portability.fn_api_runner.translations \
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you want to use this, let's put it in a shared utils file.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Implementation seems to be pretty straightforward. So just added a version to this file.

import only_element
output_pcoll = (
transform_node.outputs[None] if None in transform_node.outputs
else only_element(transform_node.outputs.values()))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

only_element is fine to use everywhere, no need to branch on the key being None.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

@chamikaramj chamikaramj changed the title [BEAM-8019] Some generalizations to support cross-language transforms. [BEAM-8019] Updates Python SDK to handle pass-through coders from remote SDKs during expansion Mar 27, 2020
@chamikaramj chamikaramj changed the title [BEAM-8019] Updates Python SDK to handle pass-through coders from remote SDKs during expansion [BEAM-8019] Updates Python SDK to handle remote SDK coders and preserve tags added by remote SDKs Mar 27, 2020
Copy link
Contributor Author

@chamikaramj chamikaramj left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks. PTAL.

I updated the description but let me know if you need additional details.

class RunnerAPICoderHolder(Coder):
class ElementTypeHolder(typehints.TypeConstraint):
"""A dummy element type for external coders that cannot be parsed in Python"""
def __init__(self, coder, context):
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.


def proto(self):
return self._proto
coder_count = 0
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was just added for logging/debugging. Removed.


def to_runner_api_parameter(self, context):
if self.element_type_holder.coder.component_coder_ids:
raise NotImplementedError
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

self, new_context, old_context, coder_proto):
for component_coder_id in coder_proto.component_coder_ids:
component_coder_proto = (
old_context.coders.get_id_to_proto_map()[component_coder_id])
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

for component_coder_id in coder_proto.component_coder_ids:
component_coder_proto = (
old_context.coders.get_id_to_proto_map()[component_coder_id])
new_context.coders.get_id_to_proto_map()[component_coder_id] = (
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

@@ -208,9 +233,23 @@ def from_runner_api(proto, context):
# deserialization. It will be populated soon after this call, in
# Pipeline.from_runner_api(). This brief period is the only time that
# PCollection.pipeline is allowed to be None.

try:
element_type = context.element_type_from_coder_id(proto.coder_id)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

self._recursively_add_external_coders(
context, self.element_type.context, coder_proto)
else:
coder_id = context.coder_id_from_element_type(self.element_type)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

import only_element
output_pcoll = (
transform_node.outputs[None] if None in transform_node.outputs
else only_element(transform_node.outputs.values()))
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

@@ -380,7 +381,12 @@ def visit_transform(self, transform_node):
# pylint: disable=wrong-import-order, wrong-import-position
from apache_beam import Flatten
if isinstance(transform_node.transform, Flatten):
output_pcoll = transform_node.outputs[None]
from apache_beam.runners.portability.fn_api_runner.translations \
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Implementation seems to be pretty straightforward. So just added a version to this file.

kind_str = 'kind:external' + str(ExternalCoder.coder_count)
ExternalCoder.coder_count = ExternalCoder.coder_count + 1
component_encodings = []
if coder_proto.spec.urn == 'beam:coder:kv:v1':
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This does not seems to be needed for now (at least to get the test suite working). So I added a TODO here. Also, note that 'kind:extenal' was used to represent Java only Void coder here, so we'll probably continue to need that even if we handle 'kind:stream' here. Lemme know if you think this is inadequate for this PR.

Copy link
Contributor

@robertwb robertwb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, I've finished going through the full set of changes.

@@ -370,7 +370,8 @@ def from_runner_api(cls, coder_proto, context):
except Exception:
if context.allow_proto_holders:
# ignore this typing scenario for now, since it can't be easily tracked
return RunnerAPICoderHolder(coder_proto) # type: ignore
return ExternalCoder(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd rather not do this for arbitrary exceptions, instead let's do this iff coder_proto.spec.urn is not in cls._known_urns.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks. I think it'd be cleaner to just put this if instead of the try, and not have a broadly-catching "except Exception" block.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

kind_str = 'kind:external' + str(ExternalCoder.coder_count)
ExternalCoder.coder_count = ExternalCoder.coder_count + 1
component_encodings = []
if coder_proto.spec.urn == 'beam:coder:kv:v1':
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I anticipate kind:stream will be needed to handle GBK of unknown types. Others may be needed for other cases, or in the future, and it seems risky to enumerate them here and in the dataflow runner. There may also be cases where we have to go more than one level deep. We should try to return the same thing the external SDK would have returned just to be safe, and that means wrapping only the leaves as external coders. I think that'll clean stuff up as well (e.g. no need for _coerce_to_kv_type_from_external_type).

@@ -1102,6 +1105,10 @@ def transform_to_runner_api(transform, # type: Optional[ptransform.PTransform]
(transform_urn in Pipeline.sdk_transforms_with_environment())):
environment_id = context.default_environment_id()

def _may_be_preserve_tag(new_tag, pc, input_tags_to_preserve):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe is one word.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

# type: (str) -> bool
# As per named_inputs() above.
return tag.startswith('side')
return re.match(SIDE_INPUT_REGEX, tag)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This'll break if a java user names their side inputs coincidentally something that matches.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I changed the Python side input prefix from 'side' to 'python_side_input' (SIDE_INPUT_PREFIX) to make this tighter. This is still not 100% fail safe (an external SDK might still start side input tags with 'python_side_input') but I don't think this will be an issue in practice.

@@ -1110,7 +1117,8 @@ def transform_to_runner_api(transform, # type: Optional[ptransform.PTransform]
for part in self.parts
],
inputs={
tag: context.pcollections.get_id(pc)
_may_be_preserve_tag(tag, pc, self.input_tags_to_preserve):
context.pcollections.get_id(pc)
for tag,
pc in sorted(self.named_inputs().items())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While we're here, did yapf not like this on the previous line? Maybe write (tag, pc) (unless it tries to split that too).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

# parsed in Python SDK.
if isinstance(element_type, ElementTypeHolder):
coder = self._get_encoding_for_external_environment(element_type)
if window_value:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't duplicate this logic, instead augment _get_typehint_based_encoding (well, really coders.registry.get_coder) to do the natural thing for ElementTypeHolders.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

@@ -925,8 +970,11 @@ def run_ParDo(self, transform_node, options):
(transform_proto.spec.urn == common_urns.primitives.PAR_DO.urn or
use_unified_worker)):
# Patch side input ids to be unique across a given pipeline.
if (label_renames and
transform_proto.spec.urn == common_urns.primitives.PAR_DO.urn):
# This should not be done for external transforms since external SDKs may
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems cleaner to simply not populate the label_renames map in this case rather than add a branch here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

@@ -995,10 +1044,13 @@ def run_ParDo(self, transform_node, options):
# The assumption here is that all outputs will have the same typehint
# and coder as the main output. This is certainly the case right now
# but conceivably it could change in the future.
encoding = self._get_encoded_output_coder(
transform_node,
output_tag=side_tag) if external_transform else step.encoding
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need to branch on external_transform her?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't have to. Previously we were using same encoding for all outputs for Python but seems like we can just generalize this without issues.

# type: (str, message.Message) -> str
if id in self._id_to_proto:
if not ignore_duplicates and id in self._id_to_proto:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If ignore_duplicates=True, should we ensure that the protos are the same?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

@@ -1383,22 +1384,69 @@ def from_runner_api_parameter(payload, components, context):
write_state_threshold=int(payload))


class RunnerAPICoderHolder(Coder):
class ElementTypeHolder(typehints.TypeConstraint):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be an ExternalElementType?

Even better, perhaps this should be a CoderElementType that holds a Coder (external or not), and then an ExternalCoder would be a Coder that holds a proto.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

@robertwb robertwb changed the title [BEAM-8019] Updates Python SDK to handle remote SDK coders and preserve tags added by remote SDKs [BEAM-8019] Updates Python SDK to handle remote SDK coders and preserve tags added by remote SDKs and propagate restriction coders. Mar 27, 2020
@chamikaramj chamikaramj force-pushed the xlang_generalizations branch 2 times, most recently from e88eba0 to e5dfdc9 Compare April 2, 2020 01:25
Copy link
Contributor Author

@chamikaramj chamikaramj left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks Robert. PTAL.

@@ -370,7 +370,8 @@ def from_runner_api(cls, coder_proto, context):
except Exception:
if context.allow_proto_holders:
# ignore this typing scenario for now, since it can't be easily tracked
return RunnerAPICoderHolder(coder_proto) # type: ignore
return ExternalCoder(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

@@ -1383,22 +1384,69 @@ def from_runner_api_parameter(payload, components, context):
write_state_threshold=int(payload))


class RunnerAPICoderHolder(Coder):
class ElementTypeHolder(typehints.TypeConstraint):
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

@@ -1102,6 +1105,10 @@ def transform_to_runner_api(transform, # type: Optional[ptransform.PTransform]
(transform_urn in Pipeline.sdk_transforms_with_environment())):
environment_id = context.default_environment_id()

def _may_be_preserve_tag(new_tag, pc, input_tags_to_preserve):
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

payload = (
proto_utils.parse_Bytes(
proto.spec.payload, beam_runner_api_pb2.ParDoPayload))
for tag, si in payload.side_inputs.items():
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

side_inputs = [si for _, si in sorted(indexed_side_inputs)]

input_tags_to_preserve = {}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

@@ -1110,7 +1117,8 @@ def transform_to_runner_api(transform, # type: Optional[ptransform.PTransform]
for part in self.parts
],
inputs={
tag: context.pcollections.get_id(pc)
_may_be_preserve_tag(tag, pc, self.input_tags_to_preserve):
context.pcollections.get_id(pc)
for tag,
pc in sorted(self.named_inputs().items())
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

side_input_tags = []
if common_urns.primitives.PAR_DO.urn == proto.spec.urn:
# Preserving side input tags.
from apache_beam.utils import proto_utils
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moved proto_utils but left beam_runner_api_pb2 to prevent adding a dependency that to all runners (seems like this is imported similarly in other places in this file I suspect for the same reason).

raise Exception(
'Expected to receive an object of type ElementTypeHolder '
'but received %r' % element)
from apache_beam.coders.coders import ExternalCoder
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

If output_tag is not specified, we assume all outputs to have the same
encoding.
"""
from apache_beam.transforms.core import RunnerAPIPTransformHolder
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

kind_str = 'kind:external' + str(ExternalCoder.coder_count)
ExternalCoder.coder_count = ExternalCoder.coder_count + 1
component_encodings = []
if coder_proto.spec.urn == 'beam:coder:kv:v1':
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cleaned this up. Now we only handle unknown types here.

@chamikaramj
Copy link
Contributor Author

Run Python PreCommit

@chamikaramj
Copy link
Contributor Author

Run Portable_Python PreCommit

Copy link
Contributor

@robertwb robertwb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Getting there; pretty minor comments at this point.

@@ -370,7 +370,8 @@ def from_runner_api(cls, coder_proto, context):
except Exception:
if context.allow_proto_holders:
# ignore this typing scenario for now, since it can't be easily tracked
return RunnerAPICoderHolder(coder_proto) # type: ignore
return ExternalCoder(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks. I think it'd be cleaner to just put this if instead of the try, and not have a broadly-catching "except Exception" block.

class CoderElementType(typehints.TypeConstraint):
"""An element type that just holds a coder proto."""
def __init__(self, coder_proto, context):
self.coder_proto = coder_proto
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Logically the ElementType should hold the ExternalCoder, and the ExternalCoder should hold the proto. (This will also help some issues where coders are requested manually and then not respected.)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

for ix, si in enumerate(self.side_inputs)
}
side_inputs = {(SIDE_INPUT_PREFIX + '%s') % ix: si.pvalue
for ix,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Put (ix, si) to help yapf do the right thing.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

@@ -1107,6 +1111,10 @@ def transform_to_runner_api(transform, # type: Optional[ptransform.PTransform]
(transform_urn in Pipeline.sdk_transforms_with_environment())):
environment_id = context.default_environment_id()

def _maybe_preserve_tag(new_tag, pc, input_tags_to_preserve):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a TODO(BEAM-1833) to get rid of this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

]
side_inputs = [si for _, si in sorted(indexed_side_inputs)]
if python_indexed_side_inputs:
# Ordering is important here.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks. This is still has to work around the existing ugliness, but looks much better now.

if isinstance(transform, RunnerAPIPTransformHolder):
# For external transforms that are ParDos, we have to set side-inputs
# manually and preserve input tags.
transform.side_inputs = [pvalue.AsMultiMap(pc) for pc in side_inputs]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's out of date: https://github.com/apache/beam/blob/master/model/pipeline/src/main/proto/beam_runner_api.proto#L395

We should be able to use pvalue.SideInputData.from_runner_api here.

for tag,
id in proto.inputs.items()
}
else:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where does transform.side_inputs in this path?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's done in respective from_runner_api methods. For example,
https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/core.py#L1361

But RunnerAPIPTransformHolder is a holder type that is constructed directly so setting it here seemed like the best option (we may be able to move this to constructor but that would require passing in additional parameters to get access to inputs etc. since using Python only pvalue.SideInputData.from_runner_api is not an option).

None if None in transform_node.outputs.keys() else only_element(
transform_node.outputs.keys()))
None if None in transform_node.outputs.keys() else
DataflowRunner._only_element(transform_node.outputs.keys()))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The conditional is a tautology; just return _only_element(transform_node.outputs.keys()).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. I assume this is because there can be only one output here.

only_element
if len(transform_node.outputs) == 1:
output_tag = only_element(transform_node.outputs.keys())
external_transform = isinstance(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: is_external_transform.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

@@ -1180,6 +1180,28 @@ def get_yielded_type(type_hint):
raise ValueError('%s is not iterable' % type_hint)


def _coerce_to_kv_type_from_external_type(element_type_holder):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we still need this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't. Removed :)

Copy link
Contributor Author

@chamikaramj chamikaramj left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks. PTAL.

@@ -370,7 +370,8 @@ def from_runner_api(cls, coder_proto, context):
except Exception:
if context.allow_proto_holders:
# ignore this typing scenario for now, since it can't be easily tracked
return RunnerAPICoderHolder(coder_proto) # type: ignore
return ExternalCoder(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

class CoderElementType(typehints.TypeConstraint):
"""An element type that just holds a coder proto."""
def __init__(self, coder_proto, context):
self.coder_proto = coder_proto
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

for ix, si in enumerate(self.side_inputs)
}
side_inputs = {(SIDE_INPUT_PREFIX + '%s') % ix: si.pvalue
for ix,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

@@ -1107,6 +1111,10 @@ def transform_to_runner_api(transform, # type: Optional[ptransform.PTransform]
(transform_urn in Pipeline.sdk_transforms_with_environment())):
environment_id = context.default_environment_id()

def _maybe_preserve_tag(new_tag, pc, input_tags_to_preserve):
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

for tag,
id in proto.inputs.items()
}
else:
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's done in respective from_runner_api methods. For example,
https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/core.py#L1361

But RunnerAPIPTransformHolder is a holder type that is constructed directly so setting it here seemed like the best option (we may be able to move this to constructor but that would require passing in additional parameters to get access to inputs etc. since using Python only pvalue.SideInputData.from_runner_api is not an option).

if isinstance(transform, RunnerAPIPTransformHolder):
# For external transforms that are ParDos, we have to set side-inputs
# manually and preserve input tags.
transform.side_inputs = [pvalue.AsMultiMap(pc) for pc in side_inputs]
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unfortunately 'pvalue.SideInputData.from_runner_api' is Python only and fails for external side inputs: https://github.com/apache/beam/blob/master/sdks/python/apache_beam/pvalue.py#L463

So enumerated the two supported types here and added a TODO.

None if None in transform_node.outputs.keys() else only_element(
transform_node.outputs.keys()))
None if None in transform_node.outputs.keys() else
DataflowRunner._only_element(transform_node.outputs.keys()))
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. I assume this is because there can be only one output here.

only_element
if len(transform_node.outputs) == 1:
output_tag = only_element(transform_node.outputs.keys())
external_transform = isinstance(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

@@ -1180,6 +1180,28 @@ def get_yielded_type(type_hint):
raise ValueError('%s is not iterable' % type_hint)


def _coerce_to_kv_type_from_external_type(element_type_holder):
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't. Removed :)

Copy link
Contributor

@robertwb robertwb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you. LGTM.

@robertwb robertwb merged commit 58c51bf into apache:master Apr 6, 2020
@chamikaramj
Copy link
Contributor Author

Thanks Robert.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

2 participants