diff --git a/sdks/python/apache_beam/io/gcp/pubsub.py b/sdks/python/apache_beam/io/gcp/pubsub.py index 7b838d23fea6..1ba8ac051272 100644 --- a/sdks/python/apache_beam/io/gcp/pubsub.py +++ b/sdks/python/apache_beam/io/gcp/pubsub.py @@ -18,10 +18,13 @@ Cloud Pub/Sub sources and sinks are currently supported only in streaming pipelines, during remote execution. + +This API is currently under development and is subject to change. """ from __future__ import absolute_import +from apache_beam import coders from apache_beam.io.iobase import Read from apache_beam.io.iobase import Write from apache_beam.runners.dataflow.native_io import iobase as dataflow_io @@ -30,7 +33,8 @@ from apache_beam.transforms.display import DisplayDataItem -__all__ = ['ReadStringsFromPubSub', 'WriteStringsToPubSub'] +__all__ = ['ReadStringsFromPubSub', 'WriteStringsToPubSub', + 'PubSubSource', 'PubSubSink'] class ReadStringsFromPubSub(PTransform): @@ -150,3 +154,71 @@ def _decodeUtf8String(encoded_value): def _encodeUtf8String(value): """Encodes a string in utf-8 format to bytes""" return value.encode('utf-8') + + +class PubSubSource(dataflow_io.NativeSource): + """Deprecated: do not use. + + Source for reading from a given Cloud Pub/Sub topic. + + Attributes: + topic: Cloud Pub/Sub topic in the form "/topics//". + subscription: Optional existing Cloud Pub/Sub subscription to use in the + form "projects//subscriptions/". + id_label: The attribute on incoming Pub/Sub messages to use as a unique + record identifier. When specified, the value of this attribute (which can + be any string that uniquely identifies the record) will be used for + deduplication of messages. If not provided, Dataflow cannot guarantee + that no duplicate data will be delivered on the Pub/Sub stream. In this + case, deduplication of the stream will be strictly best effort. + coder: The Coder to use for decoding incoming Pub/Sub messages. + """ + + def __init__(self, topic, subscription=None, id_label=None, + coder=coders.StrUtf8Coder()): + self.topic = topic + self.subscription = subscription + self.id_label = id_label + self.coder = coder + + @property + def format(self): + """Source format name required for remote execution.""" + return 'pubsub' + + def display_data(self): + return {'id_label': + DisplayDataItem(self.id_label, + label='ID Label Attribute').drop_if_none(), + 'topic': + DisplayDataItem(self.topic, + label='Pubsub Topic'), + 'subscription': + DisplayDataItem(self.subscription, + label='Pubsub Subscription').drop_if_none()} + + def reader(self): + raise NotImplementedError( + 'PubSubSource is not supported in local execution.') + + +class PubSubSink(dataflow_io.NativeSink): + """Deprecated: do not use. + + Sink for writing to a given Cloud Pub/Sub topic.""" + + def __init__(self, topic, coder=coders.StrUtf8Coder()): + self.topic = topic + self.coder = coder + + @property + def format(self): + """Sink format name required for remote execution.""" + return 'pubsub' + + def display_data(self): + return {'topic': DisplayDataItem(self.topic, label='Pubsub Topic')} + + def writer(self): + raise NotImplementedError( + 'PubSubSink is not supported in local execution.')