From 5467e95fc401db25713ef0982efe27a050657800 Mon Sep 17 00:00:00 2001 From: Robert Bradshaw Date: Wed, 21 Jun 2017 18:09:48 -0700 Subject: [PATCH 1/2] Java Dataflow runner harness compatibility. --- .../runners/portability/fn_api_runner.py | 6 ++++- .../apache_beam/runners/worker/sdk_worker.py | 26 ++++++++++++++----- 2 files changed, 25 insertions(+), 7 deletions(-) diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner.py b/sdks/python/apache_beam/runners/portability/fn_api_runner.py index dabb7d687db5..6074c2425f12 100644 --- a/sdks/python/apache_beam/runners/portability/fn_api_runner.py +++ b/sdks/python/apache_beam/runners/portability/fn_api_runner.py @@ -17,6 +17,7 @@ """A PipelineRunner using the SDK harness. """ +import base64 import collections import json import logging @@ -204,11 +205,14 @@ def get_outputs(op_ix): else: # Otherwise serialize the source and execute it there. # TODO: Use SDFs with an initial impulse. + # The Java runner harness strips the base64 encoding. do the same + # here until we get the same thign back that we sent in. transform_spec = beam_runner_api_pb2.FunctionSpec( urn=sdk_worker.PYTHON_SOURCE_URN, parameter=proto_utils.pack_Any( wrappers_pb2.BytesValue( - value=pickler.dumps(operation.source.source)))) + value=base64.b64decode( + pickler.dumps(operation.source.source))))) elif isinstance(operation, operation_specs.WorkerDoFn): # Record the contents of each side input for access via the state api. 
diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker.py b/sdks/python/apache_beam/runners/worker/sdk_worker.py index fd7ecc4325a8..684d2de87abd 100644 --- a/sdks/python/apache_beam/runners/worker/sdk_worker.py +++ b/sdks/python/apache_beam/runners/worker/sdk_worker.py @@ -21,6 +21,7 @@ from __future__ import division from __future__ import print_function +import base64 import collections import json import logging @@ -195,7 +196,7 @@ def pack_function_spec_data(value, urn, id=None): # pylint: enable=redefined-builtin -# TODO(vikasrk): move this method to ``coders.py`` in the SDK. +# TODO(vikasrk): Consistently use same format everywhere. def load_compressed(compressed_data): """Returns a decompressed and deserialized python object.""" # Note: SDK uses ``pickler.dumps`` to serialize certain python objects @@ -259,6 +260,10 @@ def process_requests(): try: response = self.worker.do_instruction(work_request) except Exception: # pylint: disable=broad-except + logging.error( + 'Error processing instruction %s', + work_request.instruction_id, + exc_info=True) response = beam_fn_api_pb2.InstructionResponse( instruction_id=work_request.instruction_id, error=traceback.format_exc()) @@ -319,10 +324,10 @@ def initial_source_split(self, request, unused_instruction_id=None): return response def create_execution_tree(self, descriptor): - if descriptor.primitive_transform: - return self.create_execution_tree_from_fn_api(descriptor) - else: + if descriptor.transforms: return self.create_execution_tree_from_runner_api(descriptor) + else: + return self.create_execution_tree_from_fn_api(descriptor) def create_execution_tree_from_runner_api(self, descriptor): # TODO(robertwb): Figure out the correct prefix to use for output counters @@ -551,7 +556,15 @@ def create_operation(self, transform_id, consumers): return creator(self, transform_id, transform_proto, parameter, consumers) def get_coder(self, coder_id): - return self.context.coders.get_by_id(coder_id) + coder_proto = 
self.descriptor.codersyyy[coder_id] + if coder_proto.spec.spec.urn: + return self.context.coders.get_by_id(coder_id) + else: + # No URN, assume cloud object encoding json bytes. + return operation_specs.get_coder_from_spec( + json.loads( + proto_utils.unpack_Any(coder_proto.spec.spec.parameter, + wrappers_pb2.BytesValue).value)) def get_output_coders(self, transform_proto): return { @@ -618,7 +631,8 @@ def create(factory, transform_id, transform_proto, grpc_port, consumers): @BeamTransformFactory.register_urn(PYTHON_SOURCE_URN, wrappers_pb2.BytesValue) def create(factory, transform_id, transform_proto, parameter, consumers): - source = pickler.loads(parameter.value) + # The Java runner harness strips the base64 encoding. + source = pickler.loads(base64.b64encode(parameter.value)) spec = operation_specs.WorkerRead( iobase.SourceBundle(1.0, source, None, None), [WindowedValueCoder(source.default_output_coder())]) From 5110b91eb19ecc16c222cb1d37cde235793e998f Mon Sep 17 00:00:00 2001 From: Robert Bradshaw Date: Thu, 22 Jun 2017 12:33:26 -0700 Subject: [PATCH 2/2] fixup: comments --- sdks/python/apache_beam/runners/portability/fn_api_runner.py | 4 ++-- sdks/python/apache_beam/runners/worker/sdk_worker.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner.py b/sdks/python/apache_beam/runners/portability/fn_api_runner.py index 6074c2425f12..a27e29369698 100644 --- a/sdks/python/apache_beam/runners/portability/fn_api_runner.py +++ b/sdks/python/apache_beam/runners/portability/fn_api_runner.py @@ -205,8 +205,8 @@ def get_outputs(op_ix): else: # Otherwise serialize the source and execute it there. # TODO: Use SDFs with an initial impulse. - # The Java runner harness strips the base64 encoding. do the same - # here until we get the same thign back that we sent in. + # The Dataflow runner harness strips the base64 encoding. Do the same + # here until we get the same thing back that we sent in. 
transform_spec = beam_runner_api_pb2.FunctionSpec( urn=sdk_worker.PYTHON_SOURCE_URN, parameter=proto_utils.pack_Any( diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker.py b/sdks/python/apache_beam/runners/worker/sdk_worker.py index 684d2de87abd..a2c9f424bbf3 100644 --- a/sdks/python/apache_beam/runners/worker/sdk_worker.py +++ b/sdks/python/apache_beam/runners/worker/sdk_worker.py @@ -631,7 +631,7 @@ def create(factory, transform_id, transform_proto, grpc_port, consumers): @BeamTransformFactory.register_urn(PYTHON_SOURCE_URN, wrappers_pb2.BytesValue) def create(factory, transform_id, transform_proto, parameter, consumers): - # The Java runner harness strips the base64 encoding. + # The Dataflow runner harness strips the base64 encoding. source = pickler.loads(base64.b64encode(parameter.value)) spec = operation_specs.WorkerRead( iobase.SourceBundle(1.0, source, None, None),