From 42153717b0bcd94a40dcc89b3b124163a3139ec5 Mon Sep 17 00:00:00 2001 From: Luke Cwik Date: Wed, 24 May 2017 18:14:59 -0700 Subject: [PATCH 1/2] [BEAM-2354] Add a ReadStringsFromPubSub/WriteStringsToPubSub PTransform --- .../apache_beam/examples/streaming_wordcap.py | 3 +- .../examples/streaming_wordcount.py | 6 +- sdks/python/apache_beam/io/gcp/pubsub.py | 79 ++++++++++++++++--- sdks/python/apache_beam/io/gcp/pubsub_test.py | 57 +++++++++++-- 4 files changed, 123 insertions(+), 22 deletions(-) diff --git a/sdks/python/apache_beam/examples/streaming_wordcap.py b/sdks/python/apache_beam/examples/streaming_wordcap.py index ce43e1f09849..55b8db519c0e 100644 --- a/sdks/python/apache_beam/examples/streaming_wordcap.py +++ b/sdks/python/apache_beam/examples/streaming_wordcap.py @@ -44,8 +44,7 @@ def run(argv=None): with beam.Pipeline(argv=pipeline_args) as p: # Read the text file[pattern] into a PCollection. - lines = p | beam.io.Read( - beam.io.PubSubSource(known_args.input_topic)) + lines = p | beam.io.ReadStringsFromPubSub(known_args.input_topic) # Capitalize the characters in each line. transformed = (lines diff --git a/sdks/python/apache_beam/examples/streaming_wordcount.py b/sdks/python/apache_beam/examples/streaming_wordcount.py index e9d5dbefa835..ed8b5d08dc62 100644 --- a/sdks/python/apache_beam/examples/streaming_wordcount.py +++ b/sdks/python/apache_beam/examples/streaming_wordcount.py @@ -47,8 +47,7 @@ def run(argv=None): with beam.Pipeline(argv=pipeline_args) as p: # Read the text file[pattern] into a PCollection. - lines = p | 'read' >> beam.io.Read( - beam.io.PubSubSource(known_args.input_topic)) + lines = p | beam.io.ReadStringsFromPubSub(known_args.input_topic) # Capitalize the characters in each line. transformed = (lines @@ -63,8 +62,7 @@ def run(argv=None): # Write to PubSub. # pylint: disable=expression-not-assigned - transformed | 'pubsub_write' >> beam.io.Write( - beam.io.PubSubSink(known_args.output_topic)) + transformed | beam.io.WriteStringsToPubSub(known_args.output_topic) if __name__ == '__main__': diff --git a/sdks/python/apache_beam/io/gcp/pubsub.py b/sdks/python/apache_beam/io/gcp/pubsub.py index 103fce0b8714..f18d10ee6bde 100644 --- a/sdks/python/apache_beam/io/gcp/pubsub.py +++ b/sdks/python/apache_beam/io/gcp/pubsub.py @@ -22,14 +22,67 @@ from __future__ import absolute_import -from apache_beam import coders +from apache_beam.io.iobase import Read +from apache_beam.io.iobase import Write from apache_beam.runners.dataflow.native_io import iobase as dataflow_io +from apache_beam.transforms import PTransform +from apache_beam.transforms import ParDo from apache_beam.transforms.display import DisplayDataItem -__all__ = ['PubSubSink', 'PubSubSource'] +__all__ = ['ReadStringsFromPubSub', 'WriteStringsToPubSub'] -class PubSubSource(dataflow_io.NativeSource): + +class ReadStringsFromPubSub(PTransform): + """A ``PTransform`` for reading utf-8 string payloads from Cloud Pub/Sub.""" + + def __init__(self, topic, subscription=None, id_label=None): + """Initializes ``ReadStringsFromPubSub``. + + Attributes: + topic: Cloud Pub/Sub topic in the form "/topics//". + subscription: Optional existing Cloud Pub/Sub subscription to use in the + form "projects//subscriptions/". + id_label: The attribute on incoming Pub/Sub messages to use as a unique + record identifier. When specified, the value of this attribute (which + can be any string that uniquely identifies the record) will be used for + deduplication of messages. If not provided, we cannot guarantee + that no duplicate data will be delivered on the Pub/Sub stream. In this + case, deduplication of the stream will be strictly best effort. + """ + super(ReadStringsFromPubSub, self).__init__() + self._source = _PubSubSource( + topic, + subscription=subscription, + id_label=id_label) + + def expand(self, pvalue): + pcoll = pvalue.pipeline | Read(self._source) + pcoll.element_type = bytes + pcoll = pcoll | 'decode string' >> ParDo(_decodeUtf8String) + pcoll.element_type = unicode + return pcoll + + +class WriteStringsToPubSub(PTransform): + """A ``PTransform`` for writing utf-8 string payloads to Cloud Pub/Sub.""" + + def __init__(self, topic): + """Initializes ``WriteStringsToPubSub``. + + Attributes: + topic: Cloud Pub/Sub topic in the form "/topics//". + """ + super(WriteStringsToPubSub, self).__init__() + self._sink = _PubSubSink(topic) + + def expand(self, pcoll): + pcoll = pcoll | 'encode string' >> ParDo(_encodeUtf8String) + pcoll.element_type = bytes + return pcoll | Write(self._sink) + + +class _PubSubSource(dataflow_io.NativeSource): """Source for reading from a given Cloud Pub/Sub topic. Attributes: @@ -42,15 +95,12 @@ class PubSubSource(dataflow_io.NativeSource): deduplication of messages. If not provided, Dataflow cannot guarantee that no duplicate data will be delivered on the Pub/Sub stream. In this case, deduplication of the stream will be strictly best effort. - coder: The Coder to use for decoding incoming Pub/Sub messages. """ - def __init__(self, topic, subscription=None, id_label=None, - coder=coders.StrUtf8Coder()): + def __init__(self, topic, subscription=None, id_label=None): self.topic = topic self.subscription = subscription self.id_label = id_label - self.coder = coder @property def format(self): @@ -73,12 +123,11 @@ def reader(self): 'PubSubSource is not supported in local execution.') -class PubSubSink(dataflow_io.NativeSink): +class _PubSubSink(dataflow_io.NativeSink): """Sink for writing to a given Cloud Pub/Sub topic.""" - def __init__(self, topic, coder=coders.StrUtf8Coder()): + def __init__(self, topic): self.topic = topic - self.coder = coder @property def format(self): @@ -91,3 +140,13 @@ def display_data(self): def writer(self): raise NotImplementedError( 'PubSubSink is not supported in local execution.') + + +def _decodeUtf8String(encoded_value): + """Decodes a string in utf-8 format from bytes""" + return encoded_value.decode('utf-8') + + +def _encodeUtf8String(value): + """Encodes a string in utf-8 format to bytes""" + return value.encode('utf-8') diff --git a/sdks/python/apache_beam/io/gcp/pubsub_test.py b/sdks/python/apache_beam/io/gcp/pubsub_test.py index 1642a95a59e7..f39d74311647 100644 --- a/sdks/python/apache_beam/io/gcp/pubsub_test.py +++ b/sdks/python/apache_beam/io/gcp/pubsub_test.py @@ -22,16 +22,53 @@ import hamcrest as hc -from apache_beam.io.gcp.pubsub import PubSubSink -from apache_beam.io.gcp.pubsub import PubSubSource +from apache_beam.io.gcp.pubsub import _decodeUtf8String +from apache_beam.io.gcp.pubsub import _encodeUtf8String +from apache_beam.io.gcp.pubsub import _PubSubSink +from apache_beam.io.gcp.pubsub import _PubSubSource +from apache_beam.io.gcp.pubsub import ReadStringsFromPubSub +from apache_beam.io.gcp.pubsub import WriteStringsToPubSub +from apache_beam.testing.test_pipeline import TestPipeline from apache_beam.transforms.display import DisplayData from apache_beam.transforms.display_test import DisplayDataItemMatcher -class TestPubSubSource(unittest.TestCase): +class TestReadStringsFromPubSub(unittest.TestCase): + def test_expand(self): + p = TestPipeline() + pcoll = p | ReadStringsFromPubSub('a_topic', 'a_subscription', 'a_label') + # Ensure that the output type is str + self.assertEqual(unicode, pcoll.element_type) + + # Ensure that the type on the intermediate read output PCollection is bytes + read_pcoll = pcoll.producer.inputs[0] + self.assertEqual(bytes, read_pcoll.element_type) + + # Ensure that the properties passed through correctly + source = read_pcoll.producer.transform.source + self.assertEqual('a_topic', source.topic) + self.assertEqual('a_subscription', source.subscription) + self.assertEqual('a_label', source.id_label) + + +class TestWriteStringsToPubSub(unittest.TestCase): + def test_expand(self): + p = TestPipeline() + pdone = p | ReadStringsFromPubSub('baz') | WriteStringsToPubSub('a_topic') + + # Ensure that the properties passed through correctly + sink = pdone.producer.transform.sink + self.assertEqual('a_topic', sink.topic) + # Ensure that the type on the intermediate payload transformer output + # PCollection is bytes + write_pcoll = pdone.producer.inputs[0] + self.assertEqual(bytes, write_pcoll.element_type) + + +class TestPubSubSource(unittest.TestCase): def test_display_data(self): - source = PubSubSource('a_topic', 'a_subscription', 'a_label') + source = _PubSubSource('a_topic', 'a_subscription', 'a_label') dd = DisplayData.create_from(source) expected_items = [ DisplayDataItemMatcher('topic', 'a_topic'), @@ -41,7 +78,7 @@ def test_display_data(self): hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items)) def test_display_data_no_subscription(self): - source = PubSubSource('a_topic') + source = _PubSubSource('a_topic') dd = DisplayData.create_from(source) expected_items = [ DisplayDataItemMatcher('topic', 'a_topic')] @@ -51,7 +88,7 @@ def test_display_data_no_subscription(self): class TestPubSubSink(unittest.TestCase): def test_display_data(self): - sink = PubSubSink('a_topic') + sink = _PubSubSink('a_topic') dd = DisplayData.create_from(sink) expected_items = [ DisplayDataItemMatcher('topic', 'a_topic')] @@ -59,6 +96,14 @@ def test_display_data(self): hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items)) +class TestEncodeDecodeUtf8String(unittest.TestCase): + def test_encode(self): + self.assertEqual(b'test_data', _encodeUtf8String('test_data')) + + def test_decode(self): + self.assertEqual('test_data', _decodeUtf8String(b'test_data')) + + if __name__ == '__main__': logging.getLogger().setLevel(logging.INFO) unittest.main() From 9615be0256bce7ca2ac9da4964165d029fa0cadf Mon Sep 17 00:00:00 2001 From: Luke Cwik Date: Thu, 25 May 2017 17:16:33 -0700 Subject: [PATCH 2/2] fixup! Address PR comments and also fix one existing usage of PubSubSink --- .../apache_beam/examples/streaming_wordcap.py | 3 +-- sdks/python/apache_beam/io/gcp/pubsub.py | 16 ++++++++-------- sdks/python/apache_beam/io/gcp/pubsub_test.py | 10 +++++----- .../runners/dataflow/dataflow_runner.py | 8 ++++---- 4 files changed, 18 insertions(+), 19 deletions(-) diff --git a/sdks/python/apache_beam/examples/streaming_wordcap.py b/sdks/python/apache_beam/examples/streaming_wordcap.py index 55b8db519c0e..19f9e5f77ed8 100644 --- a/sdks/python/apache_beam/examples/streaming_wordcap.py +++ b/sdks/python/apache_beam/examples/streaming_wordcap.py @@ -52,8 +52,7 @@ def run(argv=None): # Write to PubSub. # pylint: disable=expression-not-assigned - transformed | beam.io.Write( - beam.io.PubSubSink(known_args.output_topic)) + transformed | beam.io.WriteStringsToPubSub(known_args.output_topic) if __name__ == '__main__': diff --git a/sdks/python/apache_beam/io/gcp/pubsub.py b/sdks/python/apache_beam/io/gcp/pubsub.py index f18d10ee6bde..7b838d23fea6 100644 --- a/sdks/python/apache_beam/io/gcp/pubsub.py +++ b/sdks/python/apache_beam/io/gcp/pubsub.py @@ -51,7 +51,7 @@ def __init__(self, topic, subscription=None, id_label=None): case, deduplication of the stream will be strictly best effort. """ super(ReadStringsFromPubSub, self).__init__() - self._source = _PubSubSource( + self._source = _PubSubPayloadSource( topic, subscription=subscription, id_label=id_label) @@ -74,7 +74,7 @@ def __init__(self, topic): topic: Cloud Pub/Sub topic in the form "/topics//". """ super(WriteStringsToPubSub, self).__init__() - self._sink = _PubSubSink(topic) + self._sink = _PubSubPayloadSink(topic) def expand(self, pcoll): pcoll = pcoll | 'encode string' >> ParDo(_encodeUtf8String) @@ -82,8 +82,8 @@ def expand(self, pcoll): return pcoll | Write(self._sink) -class _PubSubSource(dataflow_io.NativeSource): - """Source for reading from a given Cloud Pub/Sub topic. +class _PubSubPayloadSource(dataflow_io.NativeSource): + """Source for the payload of a message as bytes from a Cloud Pub/Sub topic. Attributes: topic: Cloud Pub/Sub topic in the form "/topics//". @@ -120,11 +120,11 @@ def display_data(self): def reader(self): raise NotImplementedError( - 'PubSubSource is not supported in local execution.') + 'PubSubPayloadSource is not supported in local execution.') -class _PubSubSink(dataflow_io.NativeSink): - """Sink for writing to a given Cloud Pub/Sub topic.""" +class _PubSubPayloadSink(dataflow_io.NativeSink): + """Sink for the payload of a message as bytes to a Cloud Pub/Sub topic.""" def __init__(self, topic): self.topic = topic @@ -139,7 +139,7 @@ def display_data(self): def writer(self): raise NotImplementedError( - 'PubSubSink is not supported in local execution.') + 'PubSubPayloadSink is not supported in local execution.') def _decodeUtf8String(encoded_value): diff --git a/sdks/python/apache_beam/io/gcp/pubsub_test.py b/sdks/python/apache_beam/io/gcp/pubsub_test.py index f39d74311647..322d08a34cb3 100644 --- a/sdks/python/apache_beam/io/gcp/pubsub_test.py +++ b/sdks/python/apache_beam/io/gcp/pubsub_test.py @@ -24,8 +24,8 @@ from apache_beam.io.gcp.pubsub import _decodeUtf8String from apache_beam.io.gcp.pubsub import _encodeUtf8String -from apache_beam.io.gcp.pubsub import _PubSubSink -from apache_beam.io.gcp.pubsub import _PubSubSource +from apache_beam.io.gcp.pubsub import _PubSubPayloadSink +from apache_beam.io.gcp.pubsub import _PubSubPayloadSource from apache_beam.io.gcp.pubsub import ReadStringsFromPubSub from apache_beam.io.gcp.pubsub import WriteStringsToPubSub from apache_beam.testing.test_pipeline import TestPipeline @@ -68,7 +68,7 @@ def test_expand(self): class TestPubSubSource(unittest.TestCase): def test_display_data(self): - source = _PubSubSource('a_topic', 'a_subscription', 'a_label') + source = _PubSubPayloadSource('a_topic', 'a_subscription', 'a_label') dd = DisplayData.create_from(source) expected_items = [ DisplayDataItemMatcher('topic', 'a_topic'), @@ -78,7 +78,7 @@ def test_display_data(self): hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items)) def test_display_data_no_subscription(self): - source = _PubSubSource('a_topic') + source = _PubSubPayloadSource('a_topic') dd = DisplayData.create_from(source) expected_items = [ DisplayDataItemMatcher('topic', 'a_topic')] @@ -88,7 +88,7 @@ def test_display_data_no_subscription(self): class TestPubSubSink(unittest.TestCase): def test_display_data(self): - sink = _PubSubSink('a_topic') + sink = _PubSubPayloadSink('a_topic') dd = DisplayData.create_from(sink) expected_items = [ DisplayDataItemMatcher('topic', 'a_topic')] diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py index a05e582b05f4..046d3d585388 100644 --- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py +++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py @@ -595,8 +595,8 @@ def run_Read(self, transform_node): standard_options = ( transform_node.inputs[0].pipeline.options.view_as(StandardOptions)) if not standard_options.streaming: - raise ValueError('PubSubSource is currently available for use only in ' - 'streaming pipelines.') + raise ValueError('PubSubPayloadSource is currently available for use ' + 'only in streaming pipelines.') step.add_property(PropertyNames.PUBSUB_TOPIC, transform.source.topic) if transform.source.subscription: step.add_property(PropertyNames.PUBSUB_SUBSCRIPTION, @@ -677,8 +677,8 @@ def run__NativeWrite(self, transform_node): standard_options = ( transform_node.inputs[0].pipeline.options.view_as(StandardOptions)) if not standard_options.streaming: - raise ValueError('PubSubSink is currently available for use only in ' - 'streaming pipelines.') + raise ValueError('PubSubPayloadSink is currently available for use ' + 'only in streaming pipelines.') step.add_property(PropertyNames.PUBSUB_TOPIC, transform.sink.topic) else: raise ValueError(