From 874d7aca41981089fb82f9447c0aabc417d4f9ef Mon Sep 17 00:00:00 2001
From: Robert Bradshaw
Date: Wed, 8 Nov 2017 16:24:38 -0800
Subject: [PATCH 1/2] Remove obsolete dependence of FnApiRunner on
 MapTaskExecutorRunner.

---
 .../runners/portability/fn_api_runner.py      | 234 +-----------------
 .../runners/portability/fn_api_runner_test.py |  10 +-
 2 files changed, 11 insertions(+), 233 deletions(-)

diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner.py b/sdks/python/apache_beam/runners/portability/fn_api_runner.py
index 838ce1ea78d4..174019baa124 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner.py
@@ -40,8 +40,7 @@
 from apache_beam.portability.api import beam_fn_api_pb2_grpc
 from apache_beam.portability.api import beam_runner_api_pb2
 from apache_beam.runners import pipeline_context
-from apache_beam.runners.portability import maptask_executor_runner
-from apache_beam.runners.runner import PipelineState
+from apache_beam.runners import runner
 from apache_beam.runners.worker import bundle_processor
 from apache_beam.runners.worker import data_plane
 from apache_beam.runners.worker import operation_specs
@@ -185,7 +184,7 @@ def items(self):
       yield encoded_window, output_stream.get()
 
 
-class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner):
+class FnApiRunner(runner.PipelineRunner):
 
   def __init__(self, use_grpc=False, sdk_harness_factory=None):
     super(FnApiRunner, self).__init__()
@@ -195,19 +194,13 @@ def __init__(self, use_grpc=False, sdk_harness_factory=None):
       raise ValueError('GRPC must be used if a harness factory is provided.')
     self._sdk_harness_factory = sdk_harness_factory
 
-  def has_metrics_support(self):
-    return False
-
   def _next_uid(self):
     self._last_uid += 1
     return str(self._last_uid)
 
   def run(self, pipeline):
-    MetricsEnvironment.set_metrics_supported(self.has_metrics_support())
-    if pipeline._verify_runner_api_compatible():
-      return self.run_via_runner_api(pipeline.to_runner_api())
-    else:
-      return super(FnApiRunner, self).run(pipeline)
+    MetricsEnvironment.set_metrics_supported(False)
+    return self.run_via_runner_api(pipeline.to_runner_api())
 
   def run_via_runner_api(self, pipeline_proto):
     return self.run_stages(*self.create_stages(pipeline_proto))
@@ -680,7 +673,7 @@ def run_stages(self, pipeline_components, stages, safe_coders):
     finally:
       controller.close()
 
-    return RunnerResult(PipelineState.DONE, metrics_by_stage)
+    return RunnerResult(runner.PipelineState.DONE, metrics_by_stage)
 
   def run_stage(
       self, controller, pipeline_components, stage, pcoll_buffers, safe_coders):
@@ -817,218 +810,6 @@ def extract_endpoints(stage):
         raise NotImplementedError(pcoll_id)
     return result
 
-  # This is the "old" way of executing pipelines.
-  # TODO(robertwb): Remove once runner API supports side inputs.
-
-  def _map_task_registration(self, map_task, state_handler,
-                             data_operation_spec):
-    input_data, side_input_data, runner_sinks, process_bundle_descriptor = (
-        self._map_task_to_protos(map_task, data_operation_spec))
-    # Side inputs will be accessed over the state API.
-    for key, elements_data in side_input_data.items():
-      state_key = beam_fn_api_pb2.StateKey.MultimapSideInput(key=key)
-      state_handler.Clear(state_key)
-      state_handler.Append(state_key, [elements_data])
-    return beam_fn_api_pb2.InstructionRequest(
-        instruction_id=self._next_uid(),
-        register=beam_fn_api_pb2.RegisterRequest(
-            process_bundle_descriptor=[process_bundle_descriptor])
-    ), runner_sinks, input_data
-
-  def _map_task_to_protos(self, map_task, data_operation_spec):
-    input_data = {}
-    side_input_data = {}
-    runner_sinks = {}
-
-    context = pipeline_context.PipelineContext()
-    transform_protos = {}
-    used_pcollections = {}
-
-    def uniquify(*names):
-      # An injective mapping from string* to string.
-      return ':'.join("%s:%d" % (name, len(name)) for name in names)
-
-    def pcollection_id(op_ix, out_ix):
-      if (op_ix, out_ix) not in used_pcollections:
-        used_pcollections[op_ix, out_ix] = uniquify(
-            map_task[op_ix][0], 'out', str(out_ix))
-      return used_pcollections[op_ix, out_ix]
-
-    def get_inputs(op):
-      if hasattr(op, 'inputs'):
-        inputs = op.inputs
-      elif hasattr(op, 'input'):
-        inputs = [op.input]
-      else:
-        inputs = []
-      return {'in%s' % ix: pcollection_id(*input)
-              for ix, input in enumerate(inputs)}
-
-    def get_outputs(op_ix):
-      op = map_task[op_ix][1]
-      return {tag: pcollection_id(op_ix, out_ix)
-              for out_ix, tag in enumerate(getattr(op, 'output_tags', ['out']))}
-
-    for op_ix, (stage_name, operation) in enumerate(map_task):
-      transform_id = uniquify(stage_name)
-
-      if isinstance(operation, operation_specs.WorkerInMemoryWrite):
-        # Write this data back to the runner.
-        target_name = only_element(get_inputs(operation).keys())
-        runner_sinks[(transform_id, target_name)] = operation
-        transform_spec = beam_runner_api_pb2.FunctionSpec(
-            urn=bundle_processor.DATA_OUTPUT_URN,
-            payload=data_operation_spec.SerializeToString() \
-                if data_operation_spec is not None else None)
-
-      elif isinstance(operation, operation_specs.WorkerRead):
-        # A Read from an in-memory source is done over the data plane.
-        if (isinstance(operation.source.source,
-                       maptask_executor_runner.InMemorySource)
-            and isinstance(operation.source.source.default_output_coder(),
-                           WindowedValueCoder)):
-          target_name = only_element(get_outputs(op_ix).keys())
-          input_data[(transform_id, target_name)] = self._reencode_elements(
-              operation.source.source.read(None),
-              operation.source.source.default_output_coder())
-          transform_spec = beam_runner_api_pb2.FunctionSpec(
-              urn=bundle_processor.DATA_INPUT_URN,
-              payload=data_operation_spec.SerializeToString() \
-                  if data_operation_spec is not None else None)
-
-        else:
-          # Otherwise serialize the source and execute it there.
-          # TODO: Use SDFs with an initial impulse.
-          # The Dataflow runner harness strips the base64 encoding. do the same
-          # here until we get the same thing back that we sent in.
-          source_bytes = base64.b64decode(
-              pickler.dumps(operation.source.source))
-          transform_spec = beam_runner_api_pb2.FunctionSpec(
-              urn=bundle_processor.PYTHON_SOURCE_URN,
-              payload=source_bytes)
-
-      elif isinstance(operation, operation_specs.WorkerDoFn):
-        # Record the contents of each side input for access via the state api.
-        side_input_extras = []
-        for si in operation.side_inputs:
-          assert isinstance(si.source, iobase.BoundedSource)
-          element_coder = si.source.default_output_coder()
-          # TODO(robertwb): Actually flesh out the ViewFn API.
-          side_input_extras.append((si.tag, element_coder))
-          side_input_data[
-              bundle_processor.side_input_tag(transform_id, si.tag)] = (
-                  self._reencode_elements(
-                      si.source.read(si.source.get_range_tracker(None, None)),
-                      element_coder))
-        augmented_serialized_fn = pickler.dumps(
-            (operation.serialized_fn, side_input_extras))
-        transform_spec = beam_runner_api_pb2.FunctionSpec(
-            urn=bundle_processor.PYTHON_DOFN_URN,
-            payload=augmented_serialized_fn)
-
-      elif isinstance(operation, operation_specs.WorkerFlatten):
-        # Flatten is nice and simple.
-        transform_spec = beam_runner_api_pb2.FunctionSpec(
-            urn=bundle_processor.IDENTITY_DOFN_URN)
-
-      else:
-        raise NotImplementedError(operation)
-
-      transform_protos[transform_id] = beam_runner_api_pb2.PTransform(
-          unique_name=stage_name,
-          spec=transform_spec,
-          inputs=get_inputs(operation),
-          outputs=get_outputs(op_ix))
-
-    pcollection_protos = {
-        name: beam_runner_api_pb2.PCollection(
-            unique_name=name,
-            coder_id=context.coders.get_id(
-                map_task[op_id][1].output_coders[out_id]))
-        for (op_id, out_id), name in used_pcollections.items()
-    }
-    # Must follow creation of pcollection_protos to capture used coders.
-    context_proto = context.to_runner_api()
-    process_bundle_descriptor = beam_fn_api_pb2.ProcessBundleDescriptor(
-        id=self._next_uid(),
-        transforms=transform_protos,
-        pcollections=pcollection_protos,
-        coders=dict(context_proto.coders.items()),
-        windowing_strategies=dict(context_proto.windowing_strategies.items()),
-        environments=dict(context_proto.environments.items()))
-    return input_data, side_input_data, runner_sinks, process_bundle_descriptor
-
-  def _run_map_task(
-      self, map_task, control_handler, state_handler, data_plane_handler,
-      data_operation_spec):
-    registration, sinks, input_data = self._map_task_registration(
-        map_task, state_handler, data_operation_spec)
-    control_handler.push(registration)
-    process_bundle = beam_fn_api_pb2.InstructionRequest(
-        instruction_id=self._next_uid(),
-        process_bundle=beam_fn_api_pb2.ProcessBundleRequest(
-            process_bundle_descriptor_reference=registration.register.
-            process_bundle_descriptor[0].id))
-
-    for (transform_id, name), elements in input_data.items():
-      data_out = data_plane_handler.output_stream(
-          process_bundle.instruction_id, beam_fn_api_pb2.Target(
-              primitive_transform_reference=transform_id, name=name))
-      data_out.write(elements)
-      data_out.close()
-
-    control_handler.push(process_bundle)
-    while True:
-      result = control_handler.pull()
-      if result.instruction_id == process_bundle.instruction_id:
-        if result.error:
-          raise RuntimeError(result.error)
-        expected_targets = [
-            beam_fn_api_pb2.Target(primitive_transform_reference=transform_id,
-                                   name=output_name)
-            for (transform_id, output_name), _ in sinks.items()]
-        for output in data_plane_handler.input_elements(
-            process_bundle.instruction_id, expected_targets):
-          target_tuple = (
-              output.target.primitive_transform_reference, output.target.name)
-          if target_tuple not in sinks:
-            # Unconsumed output.
-            continue
-          sink_op = sinks[target_tuple]
-          coder = sink_op.output_coders[0]
-          input_stream = create_InputStream(output.data)
-          elements = []
-          while input_stream.size() > 0:
-            elements.append(coder.get_impl().decode_from_stream(
-                input_stream, True))
-          if not sink_op.write_windowed_values:
-            elements = [e.value for e in elements]
-          for e in elements:
-            sink_op.output_buffer.append(e)
-        return
-
-  def execute_map_tasks(self, ordered_map_tasks, direct=False):
-    if direct:
-      controller = FnApiRunner.DirectController()
-    else:
-      controller = FnApiRunner.GrpcController()
-
-    try:
-      for _, map_task in ordered_map_tasks:
-        logging.info('Running %s', map_task)
-        self._run_map_task(
-            map_task, controller.control_handler, controller.state_handler,
-            controller.data_plane_handler, controller.data_operation_spec())
-    finally:
-      controller.close()
-
-  @staticmethod
-  def _reencode_elements(elements, element_coder):
-    output_stream = create_OutputStream()
-    for element in elements:
-      element_coder.get_impl().encode_to_stream(element, output_stream, True)
-    return output_stream.get()
-
   # These classes are used to interact with the worker.
 
   class StateServicer(beam_fn_api_pb2_grpc.BeamFnStateServicer):
@@ -1158,11 +939,14 @@ def close(self):
       self.data_server.stop(5).wait()
 
 
-class RunnerResult(maptask_executor_runner.WorkerRunnerResult):
+class RunnerResult(runner.PipelineResult):
   def __init__(self, state, metrics_by_stage):
     super(RunnerResult, self).__init__(state)
     self._metrics_by_stage = metrics_by_stage
 
+  def wait_until_finish(self, duration=None):
+    pass
+
 
 def only_element(iterable):
   element, = iterable
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py b/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
index ea9ed1a3bf6d..249eeceae1d3 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
@@ -32,6 +32,8 @@
 DEFAULT_SAMPLING_PERIOD_MS = 0
 
 
+# Inherit good model test coverage from
+# maptask_executor_runner_test.MapTaskExecutorRunnerTest.
 class FnApiRunnerTest(
     maptask_executor_runner_test.MapTaskExecutorRunnerTest):
 
@@ -39,14 +41,6 @@ def create_pipeline(self):
     return beam.Pipeline(
         runner=fn_api_runner.FnApiRunner(use_grpc=False))
 
-  def test_combine_per_key(self):
-    # TODO(BEAM-1348): Enable once Partial GBK is supported in fn API.
-    pass
-
-  def test_combine_per_key(self):
-    # TODO(BEAM-1348): Enable once Partial GBK is supported in fn API.
-    pass
-
   def test_pardo_side_inputs(self):
     def cross_product(elem, sides):
       for side in sides:

From 0fd28bc0216231b2a59e37a662b873de8fd527f2 Mon Sep 17 00:00:00 2001
From: Robert Bradshaw
Date: Wed, 8 Nov 2017 16:32:27 -0800
Subject: [PATCH 2/2] lint

---
 sdks/python/apache_beam/runners/portability/fn_api_runner.py | 2 --
 1 file changed, 2 deletions(-)

diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner.py b/sdks/python/apache_beam/runners/portability/fn_api_runner.py
index 174019baa124..674d5233bc2c 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner.py
@@ -17,7 +17,6 @@
 """A PipelineRunner using the SDK harness.
""" -import base64 import collections import copy import logging @@ -43,7 +42,6 @@ from apache_beam.runners import runner from apache_beam.runners.worker import bundle_processor from apache_beam.runners.worker import data_plane -from apache_beam.runners.worker import operation_specs from apache_beam.runners.worker import sdk_worker from apache_beam.transforms import trigger from apache_beam.transforms.window import GlobalWindows