Skip to content

Commit

Permalink
Merge pull request #6564 from udim/pubsub-0-35-4
Browse files (browse the repository at this point in the history)
[BEAM-5513] Upgrade Python SDK to PubSub 0.35.4
  • Loading branch information
charlesccychen committed Oct 5, 2018
2 parents e4965ad + 7ed8f70 commit 324f0b3
Show file tree
Hide file tree
Showing 15 changed files with 350 additions and 481 deletions.
Original file line number | Diff line number | Diff line change
Expand Up @@ -72,15 +72,15 @@ def setUp(self):

# Set up PubSub environment.
from google.cloud import pubsub
self.pubsub_client = pubsub.Client(project=self.project)
unique_topic_name = self.INPUT_TOPIC + _unique_id
unique_subscrition_name = self.INPUT_SUB + _unique_id
self.input_topic = self.pubsub_client.topic(unique_topic_name)
self.input_sub = self.input_topic.subscription(unique_subscrition_name)
self.pub_client = pubsub.PublisherClient()
self.input_topic = self.pub_client.create_topic(
self.pub_client.topic_path(self.project, self.INPUT_TOPIC + _unique_id))

self.input_topic.create()
test_utils.wait_for_topics_created([self.input_topic])
self.input_sub.create()
self.sub_client = pubsub.SubscriberClient()
self.input_sub = self.sub_client.create_subscription(
self.sub_client.subscription_path(self.project,
self.INPUT_SUB + _unique_id),
self.input_topic.name)

# Set up BigQuery environment
from google.cloud import bigquery
Expand All @@ -95,14 +95,15 @@ def _inject_pubsub_game_events(self, topic, message_count):
"""Inject game events as test data to PubSub."""

logging.debug('Injecting %d game events to topic %s',
message_count, topic.full_name)
message_count, topic.name)

for _ in range(message_count):
topic.publish(self.INPUT_EVENT % self._test_timestamp)
self.pub_client.publish(topic.name,
self.INPUT_EVENT % self._test_timestamp)

def _cleanup_pubsub(self):
test_utils.cleanup_subscriptions([self.input_sub])
test_utils.cleanup_topics([self.input_topic])
test_utils.cleanup_subscriptions(self.sub_client, [self.input_sub])
test_utils.cleanup_topics(self.pub_client, [self.input_topic])

def _cleanup_dataset(self):
self.dataset.delete()
Expand All @@ -123,9 +124,9 @@ def test_game_stats_it(self):

# TODO(mariagh): Add teams table verifier once game_stats.py is fixed.

extra_opts = {'subscription': self.input_sub.full_name,
extra_opts = {'subscription': self.input_sub.name,
'dataset': self.dataset.name,
'topic': self.input_topic.full_name,
'topic': self.input_topic.name,
'fixed_window_duration': 1,
'user_activity_window_duration': 1,
'wait_until_finish_duration':
Expand All @@ -143,8 +144,6 @@ def test_game_stats_it(self):
self.dataset.name, self.OUTPUT_TABLE_TEAMS)

# Generate input data and inject to PubSub.
test_utils.wait_for_subscriptions_created([self.input_topic,
self.input_sub])
self._inject_pubsub_game_events(self.input_topic, self.DEFAULT_INPUT_COUNT)

# Get pipeline options from command argument: --test-pipeline-options,
Expand Down
Original file line number | Diff line number | Diff line change
Expand Up @@ -73,15 +73,16 @@ def setUp(self):

# Set up PubSub environment.
from google.cloud import pubsub
self.pubsub_client = pubsub.Client(project=self.project)
unique_topic_name = self.INPUT_TOPIC + _unique_id
unique_subscrition_name = self.INPUT_SUB + _unique_id
self.input_topic = self.pubsub_client.topic(unique_topic_name)
self.input_sub = self.input_topic.subscription(unique_subscrition_name)

self.input_topic.create()
test_utils.wait_for_topics_created([self.input_topic])
self.input_sub.create()
self.pub_client = pubsub.PublisherClient()
self.input_topic = self.pub_client.create_topic(
self.pub_client.topic_path(self.project, self.INPUT_TOPIC + _unique_id))

self.sub_client = pubsub.SubscriberClient()
self.input_sub = self.sub_client.create_subscription(
self.sub_client.subscription_path(self.project,
self.INPUT_SUB + _unique_id),
self.input_topic.name)

# Set up BigQuery environment
from google.cloud import bigquery
Expand All @@ -96,14 +97,15 @@ def _inject_pubsub_game_events(self, topic, message_count):
"""Inject game events as test data to PubSub."""

logging.debug('Injecting %d game events to topic %s',
message_count, topic.full_name)
message_count, topic.name)

for _ in range(message_count):
topic.publish(self.INPUT_EVENT % self._test_timestamp)
self.pub_client.publish(topic.name,
self.INPUT_EVENT % self._test_timestamp)

def _cleanup_pubsub(self):
test_utils.cleanup_subscriptions([self.input_sub])
test_utils.cleanup_topics([self.input_topic])
test_utils.cleanup_subscriptions(self.sub_client, [self.input_sub])
test_utils.cleanup_topics(self.pub_client, [self.input_topic])

def _cleanup_dataset(self):
self.dataset.delete()
Expand Down Expand Up @@ -131,9 +133,9 @@ def test_leader_board_it(self):
teams_query,
self.DEFAULT_EXPECTED_CHECKSUM)

extra_opts = {'subscription': self.input_sub.full_name,
extra_opts = {'subscription': self.input_sub.name,
'dataset': self.dataset.name,
'topic': self.input_topic.full_name,
'topic': self.input_topic.name,
'team_window_duration': 1,
'wait_until_finish_duration':
self.WAIT_UNTIL_FINISH_DURATION,
Expand All @@ -151,8 +153,6 @@ def test_leader_board_it(self):
self.dataset.name, self.OUTPUT_TABLE_TEAMS)

# Generate input data and inject to PubSub.
test_utils.wait_for_subscriptions_created([self.input_topic,
self.input_sub])
self._inject_pubsub_game_events(self.input_topic, self.DEFAULT_INPUT_COUNT)

# Get pipeline options from command argument: --test-pipeline-options,
Expand Down
45 changes: 22 additions & 23 deletions sdks/python/apache_beam/examples/streaming_wordcount_it_test.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -52,31 +52,31 @@ def setUp(self):

# Set up PubSub environment.
from google.cloud import pubsub
self.pubsub_client = pubsub.Client(project=self.project)
self.input_topic = self.pubsub_client.topic(INPUT_TOPIC + self.uuid)
self.output_topic = self.pubsub_client.topic(OUTPUT_TOPIC + self.uuid)
self.input_sub = self.input_topic.subscription(INPUT_SUB + self.uuid)
self.output_sub = self.output_topic.subscription(OUTPUT_SUB + self.uuid)

self.input_topic.create()
self.output_topic.create()
test_utils.wait_for_topics_created([self.input_topic, self.output_topic])
self.input_sub.create()
self.output_sub.create()
self.pub_client = pubsub.PublisherClient()
self.input_topic = self.pub_client.create_topic(
self.pub_client.topic_path(self.project, INPUT_TOPIC + self.uuid))
self.output_topic = self.pub_client.create_topic(
self.pub_client.topic_path(self.project, OUTPUT_TOPIC + self.uuid))

self.sub_client = pubsub.SubscriberClient()
self.input_sub = self.sub_client.create_subscription(
self.sub_client.subscription_path(self.project, INPUT_SUB + self.uuid),
self.input_topic.name)
self.output_sub = self.sub_client.create_subscription(
self.sub_client.subscription_path(self.project, OUTPUT_SUB + self.uuid),
self.output_topic.name)

def _inject_numbers(self, topic, num_messages):
"""Inject numbers as test data to PubSub."""
logging.debug('Injecting %d numbers to topic %s',
num_messages, topic.full_name)
logging.debug('Injecting %d numbers to topic %s', num_messages, topic.name)
for n in range(num_messages):
topic.publish(str(n))

def _cleanup_pubsub(self):
test_utils.cleanup_subscriptions([self.input_sub, self.output_sub])
test_utils.cleanup_topics([self.input_topic, self.output_topic])
self.pub_client.publish(self.input_topic.name, str(n))

def tearDown(self):
self._cleanup_pubsub()
test_utils.cleanup_subscriptions(self.sub_client,
[self.input_sub, self.output_sub])
test_utils.cleanup_topics(self.pub_client,
[self.input_topic, self.output_topic])

@attr('IT')
def test_streaming_wordcount_it(self):
Expand All @@ -86,17 +86,16 @@ def test_streaming_wordcount_it(self):
# Set extra options to the pipeline for test purpose
state_verifier = PipelineStateMatcher(PipelineState.RUNNING)
pubsub_msg_verifier = PubSubMessageMatcher(self.project,
OUTPUT_SUB + self.uuid,
self.output_sub.name,
expected_msg,
timeout=400)
extra_opts = {'input_subscription': self.input_sub.full_name,
'output_topic': self.output_topic.full_name,
extra_opts = {'input_subscription': self.input_sub.name,
'output_topic': self.output_topic.name,
'wait_until_finish_duration': WAIT_UNTIL_FINISH_DURATION,
'on_success_matcher': all_of(state_verifier,
pubsub_msg_verifier)}

# Generate input data and inject to PubSub.
test_utils.wait_for_subscriptions_created([self.input_sub])
self._inject_numbers(self.input_topic, DEFAULT_INPUT_NUMBERS)

# Get pipeline options from command argument: --test-pipeline-options,
Expand Down
13 changes: 6 additions & 7 deletions sdks/python/apache_beam/io/gcp/pubsub.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -38,11 +38,10 @@
from apache_beam.transforms.display import DisplayDataItem
from apache_beam.utils.annotations import deprecated

# The protobuf library is only used for running on Dataflow.
try:
from google.cloud.proto.pubsub.v1 import pubsub_pb2
from google.cloud import pubsub
except ImportError:
pubsub_pb2 = None
pubsub = None

__all__ = ['PubsubMessage', 'ReadFromPubSub', 'ReadStringsFromPubSub',
'WriteStringsToPubSub', 'WriteToPubSub']
Expand Down Expand Up @@ -92,7 +91,7 @@ def _from_proto_str(proto_msg):
Returns:
A new PubsubMessage object.
"""
msg = pubsub_pb2.PubsubMessage()
msg = pubsub.types.pubsub_pb2.PubsubMessage()
msg.ParseFromString(proto_msg)
# Convert ScalarMapContainer to dict.
attributes = dict((key, msg.attributes[key]) for key in msg.attributes)
Expand All @@ -109,17 +108,17 @@ def _to_proto_str(self):
https://cloud.google.com/pubsub/docs/reference/rpc/google.pubsub.v1#google.pubsub.v1.PubsubMessage
containing the payload of this object.
"""
msg = pubsub_pb2.PubsubMessage()
msg = pubsub.types.pubsub_pb2.PubsubMessage()
msg.data = self.data
for key, value in self.attributes.iteritems():
msg.attributes[key] = value
return msg.SerializeToString()

@staticmethod
def _from_message(msg):
"""Construct from ``google.cloud.pubsub.message.Message``.
"""Construct from ``google.cloud.pubsub_v1.subscriber.message.Message``.
https://google-cloud-python.readthedocs.io/en/latest/pubsub/subscriber/api/message.html
https://googleapis.github.io/google-cloud-python/latest/pubsub/subscriber/api/message.html
"""
# Convert ScalarMapContainer to dict.
attributes = dict((key, msg.attributes[key]) for key in msg.attributes)
Expand Down
39 changes: 21 additions & 18 deletions sdks/python/apache_beam/io/gcp/pubsub_integration_test.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -100,21 +100,25 @@ def setUp(self):

# Set up PubSub environment.
from google.cloud import pubsub
self.pubsub_client = pubsub.Client(project=self.project)
self.input_topic = self.pubsub_client.topic(INPUT_TOPIC + self.uuid)
self.output_topic = self.pubsub_client.topic(OUTPUT_TOPIC + self.uuid)
self.input_sub = self.input_topic.subscription(INPUT_SUB + self.uuid)
self.output_sub = self.output_topic.subscription(OUTPUT_SUB + self.uuid)

self.input_topic.create()
self.output_topic.create()
test_utils.wait_for_topics_created([self.input_topic, self.output_topic])
self.input_sub.create()
self.output_sub.create()
self.pub_client = pubsub.PublisherClient()
self.input_topic = self.pub_client.create_topic(
self.pub_client.topic_path(self.project, INPUT_TOPIC + self.uuid))
self.output_topic = self.pub_client.create_topic(
self.pub_client.topic_path(self.project, OUTPUT_TOPIC + self.uuid))

self.sub_client = pubsub.SubscriberClient()
self.input_sub = self.sub_client.create_subscription(
self.sub_client.subscription_path(self.project, INPUT_SUB + self.uuid),
self.input_topic.name)
self.output_sub = self.sub_client.create_subscription(
self.sub_client.subscription_path(self.project, OUTPUT_SUB + self.uuid),
self.output_topic.name)

def tearDown(self):
test_utils.cleanup_subscriptions([self.input_sub, self.output_sub])
test_utils.cleanup_topics([self.input_topic, self.output_topic])
test_utils.cleanup_subscriptions(self.sub_client,
[self.input_sub, self.output_sub])
test_utils.cleanup_topics(self.pub_client,
[self.input_topic, self.output_topic])

def _test_streaming(self, with_attributes):
"""Runs IT pipeline with message verifier.
Expand All @@ -139,21 +143,20 @@ def _test_streaming(self, with_attributes):
strip_attributes = [self.ID_LABEL, self.TIMESTAMP_ATTRIBUTE]
pubsub_msg_verifier = PubSubMessageMatcher(
self.project,
OUTPUT_SUB + self.uuid,
self.output_sub.name,
expected_messages,
timeout=MESSAGE_MATCHER_TIMEOUT_S,
with_attributes=with_attributes,
strip_attributes=strip_attributes)
extra_opts = {'input_subscription': self.input_sub.full_name,
'output_topic': self.output_topic.full_name,
extra_opts = {'input_subscription': self.input_sub.name,
'output_topic': self.output_topic.name,
'wait_until_finish_duration': TEST_PIPELINE_DURATION_MS,
'on_success_matcher': all_of(state_verifier,
pubsub_msg_verifier)}

# Generate input data and inject to PubSub.
test_utils.wait_for_subscriptions_created([self.input_sub])
for msg in self.INPUT_MESSAGES[self.runner_name]:
self.input_topic.publish(msg.data, **msg.attributes)
self.pub_client.publish(self.input_topic.name, msg.data, **msg.attributes)

# Get pipeline options from command argument: --test-pipeline-options,
# and start pipeline job by calling pipeline main function.
Expand Down
Loading

0 comments on commit 324f0b3

Please sign in to comment.