From c2c9c8e3c948ec3004cef0c43b67ed8f6359bcbf Mon Sep 17 00:00:00 2001 From: Vikas Kedigehalli Date: Mon, 12 Jun 2017 22:32:39 -0700 Subject: [PATCH 1/4] Python streaming Create override as a composite of Impulse and a DoFn --- .../runners/dataflow/dataflow_runner.py | 68 +++++++++++++++++++ .../runners/dataflow/dataflow_runner_test.py | 16 +++++ sdks/python/apache_beam/transforms/core.py | 11 +-- 3 files changed, 91 insertions(+), 4 deletions(-) diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py index cc9274ec40c7..d843dca5dd25 100644 --- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py +++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py @@ -31,6 +31,7 @@ from apache_beam import error from apache_beam import coders from apache_beam import pvalue +from apache_beam.coders import typecoders from apache_beam.internal import pickler from apache_beam.internal.gcp import json_value from apache_beam.pvalue import AsSideInput @@ -370,6 +371,73 @@ def _add_singleton_step(self, label, full_label, tag, input_step): PropertyNames.OUTPUT_NAME: PropertyNames.OUT}]) return step + def apply_Create(self, transform, pcoll): + standard_options = pcoll.pipeline._options.view_as(StandardOptions) # pylint: disable=protected-access + if standard_options.streaming: + # Imported here to avoid circular dependencies. + # pylint: disable=wrong-import-order, wrong-import-position + from apache_beam.transforms.core import DoFn, PTransform + from apache_beam.transforms.core import Windowing + from apache_beam.transforms.window import GlobalWindows + + class DecodeAndEmitDoFn(DoFn): + """A DoFn which stores encoded versions of elements. + + It also stores a Coder to decode and emit those elements. + TODO: BEAM-2422 - Make this a SplittableDoFn. + """ + + def __init__(self, encoded_values, coder): + self.encoded_values = encoded_values + self.coder = coder + + def process(self, unused_element): + for encoded_value in self.encoded_values: + yield self.coder.decode(encoded_value) + + class Impulse(PTransform): + """The Dataflow specific override for the impulse primitive.""" + + def expand(self, pbegin): + assert isinstance(pbegin, pvalue.PBegin) + return pvalue.PCollection(pbegin.pipeline) + + def get_windowing(self, inputs): + return Windowing(GlobalWindows()) + + def infer_output_type(self, unused_input_type): + return bytes + + values_coder = typecoders.registry.get_coder(transform.get_output_type()) + encoded_values = map(values_coder.encode, transform.value) + return (pcoll + | 'Impulse' >> Impulse() + | 'Decode Values' >> beam.ParDo( + DecodeAndEmitDoFn(encoded_values, values_coder) + .with_output_types(transform.get_output_type()))) + else: + return self.apply_PTransform(transform, pcoll) + + def run_Impulse(self, transform_node): + standard_options = ( + transform_node.outputs[None].pipeline.options.view_as(StandardOptions)) + if standard_options.streaming: + step = self._add_step( + TransformNames.READ, transform_node.full_label, transform_node) + step.add_property(PropertyNames.FORMAT, 'pubsub') + step.add_property(PropertyNames.PUBSUB_SUBSCRIPTION, '_starting_signal/') + + step.encoding = self._get_encoded_output_coder(transform_node) + step.add_property( + PropertyNames.OUTPUT_INFO, + [{PropertyNames.USER_NAME: ( + '%s.%s' % ( + transform_node.full_label, PropertyNames.OUT)), + PropertyNames.ENCODING: step.encoding, + PropertyNames.OUTPUT_NAME: PropertyNames.OUT}]) + else: + ValueError('Impulse source for batch pipelines has not been defined.') + def run_Flatten(self, transform_node): step = self._add_step(TransformNames.FLATTEN, transform_node.full_label, transform_node) diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py index 74fd01df7bc2..89af1aff23e0 100644 --- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py +++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py @@ -111,6 +111,22 @@ def test_remote_runner_translation(self): remote_runner.job = apiclient.Job(p._options) super(DataflowRunner, remote_runner).run(p) + def test_streaming_create_translation(self): + remote_runner = DataflowRunner() + self.default_properties.append("--streaming") + p = Pipeline(remote_runner, PipelineOptions(self.default_properties)) + p | ptransform.Create([1]) # pylint: disable=expression-not-assigned + remote_runner.job = apiclient.Job(p._options) + super(DataflowRunner, remote_runner).run(p) + job_dict = json.loads(str(remote_runner.job)) + self.assertEqual(len(job_dict[u'steps']), 2) + + self.assertEqual(job_dict[u'steps'][0][u'kind'], u'ParallelRead') + self.assertEqual( + job_dict[u'steps'][0][u'properties'][u'pubsub_subscription'], + '_starting_signal/') + self.assertEqual(job_dict[u'steps'][1][u'kind'], u'ParallelDo') + def test_remote_runner_display_data(self): remote_runner = DataflowRunner() p = Pipeline(remote_runner, diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py index c30136de2a43..801821909cf9 100644 --- a/sdks/python/apache_beam/transforms/core.py +++ b/sdks/python/apache_beam/transforms/core.py @@ -1444,15 +1444,18 @@ def infer_output_type(self, unused_input_type): return Any return Union[[trivial_inference.instance_to_type(v) for v in self.value]] + def get_output_type(self): + return (self.get_type_hints().simple_output_type(self.label) or + self.infer_output_type(None)) + def expand(self, pbegin): from apache_beam.io import iobase assert isinstance(pbegin, pvalue.PBegin) self.pipeline = pbegin.pipeline - ouput_type = (self.get_type_hints().simple_output_type(self.label) or - self.infer_output_type(None)) - coder = typecoders.registry.get_coder(ouput_type) + coder = typecoders.registry.get_coder(self.get_output_type()) source = self._create_source_from_iterable(self.value, coder) - return pbegin.pipeline | iobase.Read(source).with_output_types(ouput_type) + return (pbegin.pipeline + | iobase.Read(source).with_output_types(self.get_output_type())) def get_windowing(self, unused_inputs): return Windowing(GlobalWindows()) From d4b2bef8d9ddef993a8e3728e3bf1b7b86123f02 Mon Sep 17 00:00:00 2001 From: Vikas Kedigehalli Date: Thu, 15 Jun 2017 22:35:12 -0700 Subject: [PATCH 2/4] Use PTransformOverride to override Create --- .../runners/dataflow/dataflow_runner.py | 62 ++++------------- .../runners/dataflow/dataflow_runner_test.py | 2 + .../dataflow/native_io/streaming_create.py | 67 +++++++++++++++++++ .../runners/dataflow/ptransform_overrides.py | 53 +++++++++++++++ 4 files changed, 136 insertions(+), 48 deletions(-) create mode 100644 sdks/python/apache_beam/runners/dataflow/native_io/streaming_create.py create mode 100644 sdks/python/apache_beam/runners/dataflow/ptransform_overrides.py diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py index d843dca5dd25..6d84908d37e5 100644 --- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py +++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py @@ -31,7 +31,6 @@ from apache_beam import error from apache_beam import coders from apache_beam import pvalue -from apache_beam.coders import typecoders from apache_beam.internal import pickler from apache_beam.internal.gcp import json_value from apache_beam.pvalue import AsSideInput @@ -40,6 +39,7 @@ from apache_beam.runners.dataflow.internal.clients import dataflow as dataflow_api from apache_beam.runners.dataflow.internal.names import PropertyNames from apache_beam.runners.dataflow.internal.names import TransformNames +from apache_beam.runners.dataflow.ptransform_overrides import CreatePTransformOverride from apache_beam.runners.runner import PValueCache from apache_beam.runners.runner import PipelineResult from apache_beam.runners.runner import PipelineRunner @@ -70,6 +70,16 @@ class DataflowRunner(PipelineRunner): BATCH_ENVIRONMENT_MAJOR_VERSION = '6' STREAMING_ENVIRONMENT_MAJOR_VERSION = '1' + # A list of PTransformOverride objects to be applied before running a pipeline + # using DataflowRunner. + # Currently this only works for overrides where the input and output types do + # not change. + # For internal SDK use only. This should not be updated by Beam pipeline + # authors. + _PTRANSFORM_OVERRIDES = [ + CreatePTransformOverride(), + ] + def __init__(self, cache=None): # Cache of CloudWorkflowStep protos generated while the runner # "executes" a pipeline. @@ -230,6 +240,9 @@ def run(self, pipeline): 'Google Cloud Dataflow runner not available, ' 'please install apache_beam[gcp]') + # Performing configured PTransform overrides. + pipeline.replace_all(DataflowRunner._PTRANSFORM_OVERRIDES) + # Add setup_options for all the BeamPlugin imports setup_options = pipeline._options.view_as(SetupOptions) plugins = BeamPlugin.get_all_plugin_paths() @@ -371,53 +384,6 @@ def _add_singleton_step(self, label, full_label, tag, input_step): PropertyNames.OUTPUT_NAME: PropertyNames.OUT}]) return step - def apply_Create(self, transform, pcoll): - standard_options = pcoll.pipeline._options.view_as(StandardOptions) # pylint: disable=protected-access - if standard_options.streaming: - # Imported here to avoid circular dependencies. - # pylint: disable=wrong-import-order, wrong-import-position - from apache_beam.transforms.core import DoFn, PTransform - from apache_beam.transforms.core import Windowing - from apache_beam.transforms.window import GlobalWindows - - class DecodeAndEmitDoFn(DoFn): - """A DoFn which stores encoded versions of elements. - - It also stores a Coder to decode and emit those elements. - TODO: BEAM-2422 - Make this a SplittableDoFn. - """ - - def __init__(self, encoded_values, coder): - self.encoded_values = encoded_values - self.coder = coder - - def process(self, unused_element): - for encoded_value in self.encoded_values: - yield self.coder.decode(encoded_value) - - class Impulse(PTransform): - """The Dataflow specific override for the impulse primitive.""" - - def expand(self, pbegin): - assert isinstance(pbegin, pvalue.PBegin) - return pvalue.PCollection(pbegin.pipeline) - - def get_windowing(self, inputs): - return Windowing(GlobalWindows()) - - def infer_output_type(self, unused_input_type): - return bytes - - values_coder = typecoders.registry.get_coder(transform.get_output_type()) - encoded_values = map(values_coder.encode, transform.value) - return (pcoll - | 'Impulse' >> Impulse() - | 'Decode Values' >> beam.ParDo( - DecodeAndEmitDoFn(encoded_values, values_coder) - .with_output_types(transform.get_output_type()))) - else: - return self.apply_PTransform(transform, pcoll) - def run_Impulse(self, transform_node): standard_options = ( transform_node.outputs[None].pipeline.options.view_as(StandardOptions)) diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py index 89af1aff23e0..819d4713c11c 100644 --- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py +++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py @@ -117,6 +117,8 @@ def test_streaming_create_translation(self): p = Pipeline(remote_runner, PipelineOptions(self.default_properties)) p | ptransform.Create([1]) # pylint: disable=expression-not-assigned remote_runner.job = apiclient.Job(p._options) + # Performing configured PTransform overrides here. + p.replace_all(DataflowRunner._PTRANSFORM_OVERRIDES) super(DataflowRunner, remote_runner).run(p) job_dict = json.loads(str(remote_runner.job)) self.assertEqual(len(job_dict[u'steps']), 2) diff --git a/sdks/python/apache_beam/runners/dataflow/native_io/streaming_create.py b/sdks/python/apache_beam/runners/dataflow/native_io/streaming_create.py new file mode 100644 index 000000000000..6929378f1600 --- /dev/null +++ b/sdks/python/apache_beam/runners/dataflow/native_io/streaming_create.py @@ -0,0 +1,67 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""Create transform for streaming.""" + +from apache_beam import pvalue +from apache_beam import DoFn +from apache_beam import ParDo +from apache_beam import PTransform +from apache_beam import Windowing +from apache_beam.transforms.window import GlobalWindows + + +class StreamingCreate(PTransform): + """A specialized implementation for ``Create`` transform in streaming mode.""" + + def __init__(self, values, coder): + self.coder = coder + self.encoded_values = map(coder.encode, values) + + class DecodeAndEmitDoFn(DoFn): + """A DoFn which stores encoded versions of elements. + + It also stores a Coder to decode and emit those elements. + TODO: BEAM-2422 - Make this a SplittableDoFn. + """ + + def __init__(self, encoded_values, coder): + self.encoded_values = encoded_values + self.coder = coder + + def process(self, unused_element): + for encoded_value in self.encoded_values: + yield self.coder.decode(encoded_value) + + class Impulse(PTransform): + """The Dataflow specific override for the impulse primitive.""" + + def expand(self, pbegin): + assert isinstance(pbegin, pvalue.PBegin) + return pvalue.PCollection(pbegin.pipeline) + + def get_windowing(self, inputs): + return Windowing(GlobalWindows()) + + def infer_output_type(self, unused_input_type): + return bytes + + def expand(self, pbegin): + return (pbegin + | 'Impulse' >> self.Impulse() + | 'Decode Values' >> ParDo( + self.DecodeAndEmitDoFn(self.encoded_values, self.coder))) diff --git a/sdks/python/apache_beam/runners/dataflow/ptransform_overrides.py b/sdks/python/apache_beam/runners/dataflow/ptransform_overrides.py new file mode 100644 index 000000000000..a45cf33cb7ce --- /dev/null +++ b/sdks/python/apache_beam/runners/dataflow/ptransform_overrides.py @@ -0,0 +1,53 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""Ptransform overrides for DataflowRunner.""" + +from apache_beam.coders import typecoders +from apache_beam.pipeline import PTransformOverride + + +class CreatePTransformOverride(PTransformOverride): + """A ``PTransformOverride`` for ``Create`` in streaming mode.""" + + def get_matcher(self): + return self.is_streaming_create + + @staticmethod + def is_streaming_create(applied_ptransform): + # Imported here to avoid circular dependencies. + # pylint: disable=wrong-import-order, wrong-import-position + from apache_beam import Create + from apache_beam.options.pipeline_options import StandardOptions + + if isinstance(applied_ptransform.transform, Create): + standard_options = (applied_ptransform + .outputs[None] + .pipeline + .options + .view_as(StandardOptions)) + return standard_options.streaming + else: + return False + + def get_replacement_transform(self, ptransform): + # Imported here to avoid circular dependencies. + # pylint: disable=wrong-import-order, wrong-import-position + from apache_beam.runners.dataflow.native_io.streaming_create import \ + StreamingCreate + coder = typecoders.registry.get_coder(ptransform.get_output_type()) + return StreamingCreate(ptransform.value, coder) From c3846ea1ddd2ff776f09c6bdb175c436923de117 Mon Sep 17 00:00:00 2001 From: Vikas Kedigehalli Date: Fri, 16 Jun 2017 13:28:49 -0700 Subject: [PATCH 3/4] Address comments --- .../runners/dataflow/native_io/streaming_create.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/runners/dataflow/native_io/streaming_create.py b/sdks/python/apache_beam/runners/dataflow/native_io/streaming_create.py index 6929378f1600..8c6c8d6d5299 100644 --- a/sdks/python/apache_beam/runners/dataflow/native_io/streaming_create.py +++ b/sdks/python/apache_beam/runners/dataflow/native_io/streaming_create.py @@ -26,7 +26,11 @@ class StreamingCreate(PTransform): - """A specialized implementation for ``Create`` transform in streaming mode.""" + """A specialized implementation for ``Create`` transform in streaming mode. + + Note: There is no unbounded source API in python to wrap the Create source, + so we map this to composite of Impulse primitive and an SDF. + """ def __init__(self, values, coder): self.coder = coder @@ -51,7 +55,8 @@ class Impulse(PTransform): """The Dataflow specific override for the impulse primitive.""" def expand(self, pbegin): - assert isinstance(pbegin, pvalue.PBegin) + assert isinstance(pbegin, pvalue.PBegin), ( + 'Input to Impulse transform must be a PBegin but found %s' % pbegin) return pvalue.PCollection(pbegin.pipeline) def get_windowing(self, inputs): From 685d1fac0b0b2f7dacd60dc6d2986808c1c54ce1 Mon Sep 17 00:00:00 2001 From: Vikas Kedigehalli Date: Fri, 16 Jun 2017 16:26:19 -0700 Subject: [PATCH 4/4] Use _options instead of options to prevent a warning --- sdks/python/apache_beam/runners/dataflow/dataflow_runner.py | 2 +- .../apache_beam/runners/dataflow/ptransform_overrides.py | 3 +-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py index 6d84908d37e5..ce46ea9a23f7 100644 --- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py +++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py @@ -386,7 +386,7 @@ def _add_singleton_step(self, label, full_label, tag, input_step): def run_Impulse(self, transform_node): standard_options = ( - transform_node.outputs[None].pipeline.options.view_as(StandardOptions)) + transform_node.outputs[None].pipeline._options.view_as(StandardOptions)) if standard_options.streaming: step = self._add_step( TransformNames.READ, transform_node.full_label, transform_node) diff --git a/sdks/python/apache_beam/runners/dataflow/ptransform_overrides.py b/sdks/python/apache_beam/runners/dataflow/ptransform_overrides.py index a45cf33cb7ce..680a4b7de5c2 100644 --- a/sdks/python/apache_beam/runners/dataflow/ptransform_overrides.py +++ b/sdks/python/apache_beam/runners/dataflow/ptransform_overrides.py @@ -37,8 +37,7 @@ def is_streaming_create(applied_ptransform): if isinstance(applied_ptransform.transform, Create): standard_options = (applied_ptransform .outputs[None] - .pipeline - .options + .pipeline._options .view_as(StandardOptions)) return standard_options.streaming else: