From 7b46aee52f3ef8f3b11dff9eb2345e0bf65fdb3d Mon Sep 17 00:00:00 2001 From: Luke Cwik Date: Tue, 23 May 2017 14:39:41 -0700 Subject: [PATCH 1/2] [BEAM-2354] Enable a native pubsub source within the Dataflow Runner --- .../apache_beam/options/pipeline_options.py | 8 ---- .../pipeline_options_validator_test.py | 4 +- .../runners/dataflow/dataflow_runner.py | 38 +++++++++++++++++++ .../runners/dataflow/dataflow_runner_test.py | 11 ++++++ 4 files changed, 51 insertions(+), 10 deletions(-) diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py index 777926ab5297..8598e057e83c 100644 --- a/sdks/python/apache_beam/options/pipeline_options.py +++ b/sdks/python/apache_beam/options/pipeline_options.py @@ -278,14 +278,6 @@ def _add_argparse_args(cls, parser): action='store_true', help='Whether to enable streaming mode.') - # TODO(BEAM-1265): Remove this error, once at least one runner supports - # streaming pipelines. - def validate(self, validator): - errors = [] - if self.view_as(StandardOptions).streaming: - errors.append('Streaming pipelines are not supported.') - return errors - class TypeOptions(PipelineOptions): diff --git a/sdks/python/apache_beam/options/pipeline_options_validator_test.py b/sdks/python/apache_beam/options/pipeline_options_validator_test.py index 28fcbe3e4ab9..69bfd939f852 100644 --- a/sdks/python/apache_beam/options/pipeline_options_validator_test.py +++ b/sdks/python/apache_beam/options/pipeline_options_validator_test.py @@ -302,11 +302,11 @@ def test_validate_dataflow_job_file(self): def test_streaming(self): pipeline_options = PipelineOptions(['--streaming']) - runner = MockRunners.TestDataflowRunner() + runner = MockRunners.OtherRunner() validator = PipelineOptionsValidator(pipeline_options, runner) errors = validator.validate() - self.assertIn('Streaming pipelines are not supported.', errors) + self.assertEqual(len(errors), 0) def test_test_matcher(self): def get_validator(matcher): diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py index a05e582b05f4..061474ac8c5f 100644 --- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py +++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py @@ -532,6 +532,44 @@ def run_CombineValues(self, transform_node): PropertyNames.OUTPUT_NAME: PropertyNames.OUT}) step.add_property(PropertyNames.OUTPUT_INFO, outputs) + def apply_Read(self, transform, pcoll): + """Adds a necessary transform to convert native PubSubSource representations + to the expected type based upon the user specified coder within the + PubSubSource. + """ + + # Import here to avoid adding the dependency for local running scenarios. + # pylint: disable=wrong-import-order, wrong-import-position + from apache_beam import Map + from apache_beam.io.gcp.pubsub import PubSubSource + + # Only add the additional portion if this is a PubSubSource + if not isinstance(transform.source, PubSubSource): + return pvalue.PCollection(pcoll.pipeline) + + class PubSubMessagePayloadTransformer(object): + """Converts from a payload of bytes encoded in the nested context to + the supplied coder type. + """ + def __init__(self, value_coder): + self._input_coder = coders.BytesCoder() + self._value_coder = value_coder + + def transform_value(self, encoded_value): + bytes = self._input_coder.decode(encoded_value) + return self._value_coder.decode(bytes) + + pcoll.element_type = coders.WindowedValueCoder( + coders.BytesCoder(), + coders.coders.GlobalWindowCoder()) + + pcoll = pcoll | 'parse fn' >> Map( + PubSubMessagePayloadTransformer(transform.source.coder).transform_value) + pcoll.element_type = coders.WindowedValueCoder( + transform.source.coder, + coders.coders.GlobalWindowCoder()) + return pcoll + def run_Read(self, transform_node): transform = transform_node.transform step = self._add_step( diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py index ff4b51d5a826..71c3cc9d5aae 100644 --- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py +++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py @@ -27,6 +27,7 @@ import apache_beam.transforms as ptransform from apache_beam.options.pipeline_options import PipelineOptions +from apache_beam.options.pipeline_options import StandardOptions from apache_beam.pipeline import Pipeline, AppliedPTransform from apache_beam.pvalue import PCollection from apache_beam.runners import create_runner @@ -240,6 +241,16 @@ def _test_flatten_input_visitor(self, input_type, output_type, num_inputs): for _ in range(num_inputs): self.assertEqual(inputs[0].element_type, output_type) + def test_pubsub_source_override(self): + remote_runner = DataflowRunner() + options = StandardOptions(self.default_properties + ["--streaming"]) + p = Pipeline(remote_runner, + options=options) + + (p | beam.io.Read(beam.io.PubSubSource("test_topic"))) + remote_runner.job = apiclient.Job(p._options) + super(DataflowRunner, remote_runner).run(p) + print remote_runner.job if __name__ == '__main__': unittest.main() From 27b88ddc434e72c82e15ebfe1e1d237914782351 Mon Sep 17 00:00:00 2001 From: Luke Cwik Date: Tue, 23 May 2017 15:02:04 -0700 Subject: [PATCH 2/2] fixup! Use apply_PTransform --- sdks/python/apache_beam/runners/dataflow/dataflow_runner.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py index 061474ac8c5f..5ac3f33b5545 100644 --- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py +++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py @@ -545,7 +545,7 @@ def apply_Read(self, transform, pcoll): # Only add the additional portion if this is a PubSubSource if not isinstance(transform.source, PubSubSource): - return pvalue.PCollection(pcoll.pipeline) + return self.apply_PTransform(transform, pcoll) class PubSubMessagePayloadTransformer(object): """Converts from a payload of bytes encoded in the nested context to @@ -563,7 +563,7 @@ def transform_value(self, encoded_value): coders.BytesCoder(), coders.coders.GlobalWindowCoder()) - pcoll = pcoll | 'parse fn' >> Map( + pcoll = self.apply_PTransform(transform, pcoll) | 'parse fn' >> Map( PubSubMessagePayloadTransformer(transform.source.coder).transform_value) pcoll.element_type = coders.WindowedValueCoder( transform.source.coder,