Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BEAM-4536] Remove with_attributes keyword from ReadFromPubSub. #5605

Merged
merged 1 commit into from
Jun 12, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 5 additions & 9 deletions sdks/python/apache_beam/io/gcp/pubsub.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ class ReadFromPubSub(PTransform):
# Implementation note: This ``PTransform`` is overridden by Directrunner.

def __init__(self, topic=None, subscription=None, id_label=None,
with_attributes=False, timestamp_attribute=None):
timestamp_attribute=None):
"""Initializes ``ReadFromPubSub``.

Args:
Expand All @@ -118,12 +118,8 @@ def __init__(self, topic=None, subscription=None, id_label=None,
deduplication of messages. If not provided, we cannot guarantee
that no duplicate data will be delivered on the Pub/Sub stream. In this
case, deduplication of the stream will be strictly best effort.
with_attributes:
True - output elements will be :class:`~PubsubMessage` objects.
False - output elements will be of type ``str`` (message payload only).
timestamp_attribute: Message value to use as element timestamp. If None,
uses message publishing time as the timestamp.
Note that this argument doesn't require with_attributes=True.

Timestamp values should be in one of two formats:

Expand All @@ -135,12 +131,13 @@ def __init__(self, topic=None, subscription=None, id_label=None,
units smaller than milliseconds) may be ignored.
"""
super(ReadFromPubSub, self).__init__()
self.with_attributes = with_attributes
# TODO(BEAM-4536): Add with_attributes to kwargs once fixed.
self.with_attributes = False
self._source = _PubSubSource(
topic=topic,
subscription=subscription,
id_label=id_label,
with_attributes=with_attributes,
with_attributes=self.with_attributes,
timestamp_attribute=timestamp_attribute)

def expand(self, pvalue):
Expand Down Expand Up @@ -174,8 +171,7 @@ def __init__(self, topic=None, subscription=None, id_label=None):

def expand(self, pvalue):
p = (pvalue.pipeline
| ReadFromPubSub(self.topic, self.subscription, self.id_label,
with_attributes=False)
| ReadFromPubSub(self.topic, self.subscription, self.id_label)
| 'DecodeString' >> Map(lambda b: b.decode('utf-8')))
p.element_type = text_type
return p
Expand Down
27 changes: 17 additions & 10 deletions sdks/python/apache_beam/io/gcp/pubsub_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ def test_expand_with_topic(self):
p.options.view_as(StandardOptions).streaming = True
pcoll = (p
| ReadFromPubSub('projects/fakeprj/topics/a_topic',
None, 'a_label', with_attributes=False,
None, 'a_label',
timestamp_attribute=None)
| beam.Map(lambda x: x))
self.assertEqual(str, pcoll.element_type)
Expand All @@ -87,7 +87,7 @@ def test_expand_with_subscription(self):
pcoll = (p
| ReadFromPubSub(
None, 'projects/fakeprj/subscriptions/a_subscription',
'a_label', with_attributes=False, timestamp_attribute=None)
'a_label', timestamp_attribute=None)
| beam.Map(lambda x: x))
self.assertEqual(str, pcoll.element_type)

Expand All @@ -107,16 +107,17 @@ def test_expand_with_subscription(self):
def test_expand_with_no_topic_or_subscription(self):
with self.assertRaisesRegexp(
ValueError, "Either a topic or subscription must be provided."):
ReadFromPubSub(None, None, 'a_label', with_attributes=False,
ReadFromPubSub(None, None, 'a_label',
timestamp_attribute=None)

def test_expand_with_both_topic_and_subscription(self):
with self.assertRaisesRegexp(
ValueError, "Only one of topic or subscription should be provided."):
ReadFromPubSub('a_topic', 'a_subscription', 'a_label',
with_attributes=False, timestamp_attribute=None)
timestamp_attribute=None)

def test_expand_with_other_options(self):
# TODO(BEAM-4536): Reenable test when bug is fixed.
def _test_expand_with_other_options(self):
p = TestPipeline()
p.options.view_as(StandardOptions).streaming = True
pcoll = (p
Expand Down Expand Up @@ -291,8 +292,9 @@ def create_client_message(payload, message_id, attributes, publish_time):
@unittest.skipIf(pubsub is None, 'GCP dependencies are not installed')
class TestReadFromPubSub(unittest.TestCase):

# TODO(BEAM-4536): Reenable test when bug is fixed.
@mock.patch('google.cloud.pubsub')
def test_read_messages_success(self, mock_pubsub):
def _test_read_messages_success(self, mock_pubsub):
payload = 'payload'
message_id = 'message_id'
publish_time = '2018-03-12T13:37:01.234567Z'
Expand Down Expand Up @@ -353,8 +355,9 @@ def test_read_payload_success(self, mock_pubsub):
assert_that(pcoll, equal_to(expected_data))
p.run()

# TODO(BEAM-4536): Reenable test when bug is fixed.
@mock.patch('google.cloud.pubsub')
def test_read_messages_timestamp_attribute_milli_success(self, mock_pubsub):
def _test_read_messages_timestamp_attribute_milli_success(self, mock_pubsub):
payload = 'payload'
message_id = 'message_id'
attributes = {'time': '1337'}
Expand All @@ -380,8 +383,10 @@ def test_read_messages_timestamp_attribute_milli_success(self, mock_pubsub):
assert_that(pcoll, equal_to(expected_data), reify_windows=True)
p.run()

# TODO(BEAM-4536): Reenable test when bug is fixed.
@mock.patch('google.cloud.pubsub')
def test_read_messages_timestamp_attribute_rfc3339_success(self, mock_pubsub):
def _test_read_messages_timestamp_attribute_rfc3339_success(self,
mock_pubsub):
payload = 'payload'
message_id = 'message_id'
attributes = {'time': '2018-03-12T13:37:01.234567Z'}
Expand All @@ -407,8 +412,9 @@ def test_read_messages_timestamp_attribute_rfc3339_success(self, mock_pubsub):
assert_that(pcoll, equal_to(expected_data), reify_windows=True)
p.run()

# TODO(BEAM-4536): Reenable test when bug is fixed.
@mock.patch('google.cloud.pubsub')
def test_read_messages_timestamp_attribute_fail_missing(self, mock_pubsub):
def _test_read_messages_timestamp_attribute_fail_missing(self, mock_pubsub):
payload = 'payload'
message_id = 'message_id'
attributes = {'time': '1337'}
Expand All @@ -428,8 +434,9 @@ def test_read_messages_timestamp_attribute_fail_missing(self, mock_pubsub):
with self.assertRaisesRegexp(KeyError, r'Timestamp.*nonexistent'):
p.run()

# TODO(BEAM-4536): Reenable test when bug is fixed.
@mock.patch('google.cloud.pubsub')
def test_read_messages_timestamp_attribute_fail_parse(self, mock_pubsub):
def _test_read_messages_timestamp_attribute_fail_parse(self, mock_pubsub):
payload = 'payload'
message_id = 'message_id'
attributes = {'time': '1337 unparseable'}
Expand Down