From c5a49a6af102e29c35f95b0217ff9009e3bf23bd Mon Sep 17 00:00:00 2001 From: Robert Bradshaw Date: Thu, 24 Aug 2017 11:01:20 -0700 Subject: [PATCH 1/2] Wrap unknown coders in LengthPrefixCoder. --- sdks/python/apache_beam/coders/coders.py | 10 ++ .../runners/portability/fn_api_runner.py | 94 +++++++++++++++++-- 2 files changed, 95 insertions(+), 9 deletions(-) diff --git a/sdks/python/apache_beam/coders/coders.py b/sdks/python/apache_beam/coders/coders.py index e204369b3103..10fb07b6a34f 100644 --- a/sdks/python/apache_beam/coders/coders.py +++ b/sdks/python/apache_beam/coders/coders.py @@ -707,6 +707,16 @@ def __eq__(self, other): def __hash__(self): return hash(self._coders) + def to_runner_api_parameter(self, context): + if self.is_kv_coder(): + return urns.KV_CODER, None, self.coders() + else: + return super(TupleCoder, self).to_runner_api_parameter(context) + + @Coder.register_urn(urns.KV_CODER, None) + def from_runner_api_parameter(unused_payload, components, unused_context): + return TupleCoder(components) + class TupleSequenceCoder(FastCoder): """Coder of homogeneous tuple objects.""" diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner.py b/sdks/python/apache_beam/runners/portability/fn_api_runner.py index 7c0c06fe1110..41c49d401de4 100644 --- a/sdks/python/apache_beam/runners/portability/fn_api_runner.py +++ b/sdks/python/apache_beam/runners/portability/fn_api_runner.py @@ -122,7 +122,7 @@ def process(self, source): class _GroupingBuffer(object): """Used to accumulate groupded (shuffled) results.""" def __init__(self, pre_grouped_coder, post_grouped_coder): - self._key_coder = pre_grouped_coder.value_coder().key_coder() + self._key_coder = pre_grouped_coder.key_coder() self._pre_grouped_coder = pre_grouped_coder self._post_grouped_coder = post_grouped_coder self._table = collections.defaultdict(list) @@ -249,13 +249,75 @@ def deduplicate_read(self): # Now define the "optimization" phases. + safe_coders = {} def expand_gbk(stages): """Transforms each GBK into a write followed by a read. """ + good_coder_urns = set(beam.coders.Coder._known_urns.keys()) - set([ + urns.PICKLED_CODER]) + coders = pipeline_components.coders + + for coder_id, coder_proto in coders.items(): + if coder_proto.spec.spec.urn == urns.BYTES_CODER: + bytes_coder_id = coder_id + else: + bytes_coder_id = unique_name(coders, 'bytes_coder') + pipeline_components.coders[bytes_coder_id].CopyFrom( + beam.coders.BytesCoder().to_runner_api(None)) + + coder_substitutions = {} + def wrap_unknown_coders(coder_id, with_bytes): + if (coder_id, with_bytes) not in coder_substitutions: + wrapped_coder_id = None + coder_proto = coders[coder_id] + if coder_proto.spec.spec.urn == urns.LENGTH_PREFIX_CODER: + coder_substitutions[coder_id, with_bytes] = ( + bytes_coder_id if with_bytes else coder_id) + elif (coder_proto.spec.spec.urn in good_coder_urns + and not coder_proto.component_coder_ids): + # Use as is. + coder_substitutions[coder_id, with_bytes] = coder_id + else: + wrapped_coder_id = unique_name( + coders, coder_id + ("_bytes" if with_bytes else "_len_prefix")) + if coder_proto.spec.spec.urn in good_coder_urns: + # Must have components. + coders[wrapped_coder_id].CopyFrom(coder_proto) + coders[wrapped_coder_id].component_coder_ids[:] = [ + wrap_unknown_coders(c, with_bytes) + for c in coder_proto.component_coder_ids] + coder_substitutions[coder_id, with_bytes] = wrapped_coder_id + else: + # Not a known coder. + if with_bytes: + coder_substitutions[coder_id, with_bytes] = bytes_coder_id + else: + len_prefix_coder_proto = beam_runner_api_pb2.Coder( + spec=beam_runner_api_pb2.SdkFunctionSpec( + spec=beam_runner_api_pb2.FunctionSpec( + urn=urns.LENGTH_PREFIX_CODER)), + component_coder_ids=[coder_id]) + coders[wrapped_coder_id].CopyFrom(len_prefix_coder_proto) + coder_substitutions[coder_id, with_bytes] = wrapped_coder_id + # This operation is idempotent. + if wrapped_coder_id in coders: + coder_substitutions[wrapped_coder_id, with_bytes] = wrapped_coder_id + return coder_substitutions[coder_id, with_bytes] + + def fix_pcoll_coder(pcoll): + new_coder_id = wrap_unknown_coders(pcoll.coder_id, False) + safe_coders[new_coder_id] = wrap_unknown_coders(pcoll.coder_id, True) + pcoll.coder_id = new_coder_id + for stage in stages: assert len(stage.transforms) == 1 transform = stage.transforms[0] if transform.spec.urn == urns.GROUP_BY_KEY_ONLY_TRANSFORM: + for pcoll_id in transform.inputs.values(): + fix_pcoll_coder(pipeline_components.pcollections[pcoll_id]) + for pcoll_id in transform.outputs.values(): + fix_pcoll_coder(pipeline_components.pcollections[pcoll_id]) + # This is used later to correlate the read and write. param = str("group:%s" % stage.name) gbk_write = Stage( @@ -547,9 +609,9 @@ def process(stage): logging.debug('Stages: %s', [str(s) for s in stages]) # Return the (possibly mutated) context and ordered set of stages. - return pipeline_components, stages + return pipeline_components, stages, safe_coders - def run_stages(self, pipeline_components, stages, direct=True): + def run_stages(self, pipeline_components, stages, safe_coders, direct=True): if direct: controller = FnApiRunner.DirectController() @@ -559,13 +621,15 @@ def run_stages(self, pipeline_components, stages, direct=True): try: pcoll_buffers = collections.defaultdict(list) for stage in stages: - self.run_stage(controller, pipeline_components, stage, pcoll_buffers) + self.run_stage( + controller, pipeline_components, stage, pcoll_buffers, safe_coders) finally: controller.close() return maptask_executor_runner.WorkerRunnerResult(PipelineState.DONE) - def run_stage(self, controller, pipeline_components, stage, pcoll_buffers): + def run_stage( + self, controller, pipeline_components, stage, pcoll_buffers, safe_coders): coders = pipeline_context.PipelineContext(pipeline_components).coders data_operation_spec = controller.data_operation_spec() @@ -666,10 +730,10 @@ def extract_endpoints(stage): original_gbk_transform] input_pcoll = only_element(transform_proto.inputs.values()) output_pcoll = only_element(transform_proto.outputs.values()) - pre_gbk_coder = coders[ - pipeline_components.pcollections[input_pcoll].coder_id] - post_gbk_coder = coders[ - pipeline_components.pcollections[output_pcoll].coder_id] + pre_gbk_coder = coders[safe_coders[ + pipeline_components.pcollections[input_pcoll].coder_id]] + post_gbk_coder = coders[safe_coders[ + pipeline_components.pcollections[output_pcoll].coder_id]] pcoll_buffers[pcoll_id] = _GroupingBuffer( pre_gbk_coder, post_gbk_coder) pcoll_buffers[pcoll_id].append(output.data) @@ -1000,3 +1064,15 @@ def close(self): def only_element(iterable): element, = iterable return element + + +def unique_name(existing, prefix): + if prefix in existing: + counter = 0 + while True: + counter += 1 + prefix_counter = prefix + "_%s" % counter + if prefix_counter not in existing: + return prefix_counter + else: + return prefix \ No newline at end of file From a2bf3e3b0bb005f8d8efe73a5ef7cb0cba16ce5d Mon Sep 17 00:00:00 2001 From: Robert Bradshaw Date: Fri, 25 Aug 2017 15:11:40 -0700 Subject: [PATCH 2/2] Further simplifications. --- .../runners/portability/fn_api_runner.py | 49 ++++++++++--------- 1 file changed, 27 insertions(+), 22 deletions(-) diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner.py b/sdks/python/apache_beam/runners/portability/fn_api_runner.py index 41c49d401de4..c9b3d9a5e831 100644 --- a/sdks/python/apache_beam/runners/portability/fn_api_runner.py +++ b/sdks/python/apache_beam/runners/portability/fn_api_runner.py @@ -250,6 +250,7 @@ def deduplicate_read(self): # Now define the "optimization" phases. safe_coders = {} + def expand_gbk(stages): """Transforms each GBK into a write followed by a read. """ @@ -260,12 +261,14 @@ def expand_gbk(stages): for coder_id, coder_proto in coders.items(): if coder_proto.spec.spec.urn == urns.BYTES_CODER: bytes_coder_id = coder_id + break else: bytes_coder_id = unique_name(coders, 'bytes_coder') pipeline_components.coders[bytes_coder_id].CopyFrom( beam.coders.BytesCoder().to_runner_api(None)) coder_substitutions = {} + def wrap_unknown_coders(coder_id, with_bytes): if (coder_id, with_bytes) not in coder_substitutions: wrapped_coder_id = None @@ -273,34 +276,36 @@ def wrap_unknown_coders(coder_id, with_bytes): if coder_proto.spec.spec.urn == urns.LENGTH_PREFIX_CODER: coder_substitutions[coder_id, with_bytes] = ( bytes_coder_id if with_bytes else coder_id) - elif (coder_proto.spec.spec.urn in good_coder_urns - and not coder_proto.component_coder_ids): - # Use as is. - coder_substitutions[coder_id, with_bytes] = coder_id - else: - wrapped_coder_id = unique_name( - coders, coder_id + ("_bytes" if with_bytes else "_len_prefix")) - if coder_proto.spec.spec.urn in good_coder_urns: - # Must have components. + elif coder_proto.spec.spec.urn in good_coder_urns: + wrapped_components = [wrap_unknown_coders(c, with_bytes) + for c in coder_proto.component_coder_ids] + if wrapped_components == list(coder_proto.component_coder_ids): + # Use as is. + coder_substitutions[coder_id, with_bytes] = coder_id + else: + wrapped_coder_id = unique_name( + coders, + coder_id + ("_bytes" if with_bytes else "_len_prefix")) coders[wrapped_coder_id].CopyFrom(coder_proto) coders[wrapped_coder_id].component_coder_ids[:] = [ wrap_unknown_coders(c, with_bytes) for c in coder_proto.component_coder_ids] coder_substitutions[coder_id, with_bytes] = wrapped_coder_id + else: + # Not a known coder. + if with_bytes: + coder_substitutions[coder_id, with_bytes] = bytes_coder_id else: - # Not a known coder. - if with_bytes: - coder_substitutions[coder_id, with_bytes] = bytes_coder_id - else: - len_prefix_coder_proto = beam_runner_api_pb2.Coder( - spec=beam_runner_api_pb2.SdkFunctionSpec( - spec=beam_runner_api_pb2.FunctionSpec( - urn=urns.LENGTH_PREFIX_CODER)), - component_coder_ids=[coder_id]) - coders[wrapped_coder_id].CopyFrom(len_prefix_coder_proto) - coder_substitutions[coder_id, with_bytes] = wrapped_coder_id + wrapped_coder_id = unique_name(coders, coder_id + "_len_prefix") + len_prefix_coder_proto = beam_runner_api_pb2.Coder( + spec=beam_runner_api_pb2.SdkFunctionSpec( + spec=beam_runner_api_pb2.FunctionSpec( + urn=urns.LENGTH_PREFIX_CODER)), + component_coder_ids=[coder_id]) + coders[wrapped_coder_id].CopyFrom(len_prefix_coder_proto) + coder_substitutions[coder_id, with_bytes] = wrapped_coder_id # This operation is idempotent. - if wrapped_coder_id in coders: + if wrapped_coder_id: coder_substitutions[wrapped_coder_id, with_bytes] = wrapped_coder_id return coder_substitutions[coder_id, with_bytes] @@ -1075,4 +1080,4 @@ def unique_name(existing, prefix): if prefix_counter not in existing: return prefix_counter else: - return prefix \ No newline at end of file + return prefix