Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 73 additions & 1 deletion sdks/python/apache_beam/io/gcp/pubsub.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,13 @@

Cloud Pub/Sub sources and sinks are currently supported only in streaming
pipelines, during remote execution.

This API is currently under development and is subject to change.
"""

from __future__ import absolute_import

from apache_beam import coders
from apache_beam.io.iobase import Read
from apache_beam.io.iobase import Write
from apache_beam.runners.dataflow.native_io import iobase as dataflow_io
Expand All @@ -30,7 +33,8 @@
from apache_beam.transforms.display import DisplayDataItem


__all__ = ['ReadStringsFromPubSub', 'WriteStringsToPubSub']
__all__ = ['ReadStringsFromPubSub', 'WriteStringsToPubSub',
'PubSubSource', 'PubSubSink']


class ReadStringsFromPubSub(PTransform):
Expand Down Expand Up @@ -150,3 +154,71 @@ def _decodeUtf8String(encoded_value):
def _encodeUtf8String(value):
"""Encodes a string in utf-8 format to bytes"""
return value.encode('utf-8')


class PubSubSource(dataflow_io.NativeSource):
    """Deprecated: do not use.

    Source for reading from a given Cloud Pub/Sub topic.

    Attributes:
      topic: Cloud Pub/Sub topic in the form "/topics/<project>/<topic>".
      subscription: Optional existing Cloud Pub/Sub subscription to use in the
        form "projects/<project>/subscriptions/<subscription>".
      id_label: The attribute on incoming Pub/Sub messages to use as a unique
        record identifier. When specified, the value of this attribute (which
        can be any string that uniquely identifies the record) will be used
        for deduplication of messages. If not provided, Dataflow cannot
        guarantee that no duplicate data will be delivered on the Pub/Sub
        stream. In this case, deduplication of the stream will be strictly
        best effort.
      coder: The Coder to use for decoding incoming Pub/Sub messages.
    """

    def __init__(self, topic, subscription=None, id_label=None,
                 coder=coders.StrUtf8Coder()):
        # Store the configuration as-is; it is serialized for remote
        # execution rather than acted on locally (see reader() below).
        self.topic = topic
        self.subscription = subscription
        self.id_label = id_label
        self.coder = coder

    @property
    def format(self):
        """Source format name required for remote execution."""
        return 'pubsub'

    def display_data(self):
        """Returns display metadata describing this source's configuration."""
        items = {
            'topic': DisplayDataItem(self.topic, label='Pubsub Topic'),
            # Optional settings are dropped from display data when unset.
            'subscription': DisplayDataItem(
                self.subscription, label='Pubsub Subscription').drop_if_none(),
            'id_label': DisplayDataItem(
                self.id_label, label='ID Label Attribute').drop_if_none(),
        }
        return items

    def reader(self):
        # This native source is only executable by the remote (Dataflow)
        # runner; there is no local reader implementation.
        raise NotImplementedError(
            'PubSubSource is not supported in local execution.')


class PubSubSink(dataflow_io.NativeSink):
    """Deprecated: do not use.

    Sink for writing to a given Cloud Pub/Sub topic.

    Attributes:
      topic: Cloud Pub/Sub topic to write to.
      coder: The Coder to use for encoding outgoing Pub/Sub messages.
    """

    def __init__(self, topic, coder=coders.StrUtf8Coder()):
        # Configuration only; serialized for remote execution (see writer()).
        self.topic = topic
        self.coder = coder

    @property
    def format(self):
        """Sink format name required for remote execution."""
        return 'pubsub'

    def display_data(self):
        """Returns display metadata describing this sink's configuration."""
        topic_item = DisplayDataItem(self.topic, label='Pubsub Topic')
        return {'topic': topic_item}

    def writer(self):
        # This native sink is only executable by the remote (Dataflow)
        # runner; there is no local writer implementation.
        raise NotImplementedError(
            'PubSubSink is not supported in local execution.')