Skip to content

[BEAM-3744] Python PubSub API Fixes and Tests#5952

Merged
charlesccychen merged 5 commits intoapache:masterfrom
udim:pubsub-api-2
Jul 26, 2018
Merged

[BEAM-3744] Python PubSub API Fixes and Tests#5952
charlesccychen merged 5 commits intoapache:masterfrom
udim:pubsub-api-2

Conversation

@udim
Copy link
Member

@udim udim commented Jul 14, 2018

Fixes attributes and adds an integration test for Dataflow.


Follow this checklist to help us incorporate your contribution quickly and easily:

  • Format the pull request title like [BEAM-XXX] Fixes bug in ApproximateQuantiles, where you replace BEAM-XXX with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

It will help us expedite review of your Pull Request if you tag someone (e.g. @username) to look at it.

Post-Commit Tests Status (on master branch)

Lang SDK Apex Dataflow Flink Gearpump Samza Spark
Go Build Status --- --- --- --- --- ---
Java Build Status Build Status Build Status Build Status Build Status Build Status Build Status
Python Build Status --- Build Status
Build Status
--- --- --- ---

- Add WriteToPubSub, which can write messages with attributes.
- [BEAM-4536] Fix and re-enable with_attributes support for reads and
write.
- Add pubsub_integration_test:
  - Does reads and writes with and without attributes.
  - Uses timestamp attributes and label ids features.
  - Only runs using TestDataflowRunner for now (DirectRunner-based
  testing TBD).
- Deprecate ReadStringsFromPubSub, WriteStringsToPubSub.
- Update examples to use newer (non-deprecated) PubSub PTransforms.
- Misc: Rename payload -> data, since payload is data+attributes,
according to the official google-cloud-pubsub client.
Copy link
Contributor

@charlesccychen charlesccychen left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks!

def expand(self, pcoll):
if self.with_attributes:
pcoll = pcoll | 'ToProtobuf' >> Map(self.to_proto)
pcoll.element_type = six.binary_type
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this valid? Shouldn't the output type be some protobuf type?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After some investigation I believe that this is correct.
The output of this mapping is a string (uses the proto object's SerializeToString()).
On Dataflow, this string is passed to a runner harness written in Java, so it must be a serialized protobuf to be understood.

@charlesccychen
Copy link
Contributor

Copy link
Contributor

@charlesccychen charlesccychen left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, LGTM after minor nits.

if transform.source.timestamp_attribute is not None:
step.add_property(PropertyNames.PUBSUB_TIMESTAMP_ATTRIBUTE,
transform.source.timestamp_attribute)
logging.info('pubsub source')
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

debugging comment?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oops. Removed

if transform.source.with_attributes:
# Setting this property signals Dataflow runner to return full
# PubsubMessages instead of just the payload.
# PubsubMessages instead of just data.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"the payload data" for clarity? This is the official doc for PubsubMessage: https://cloud.google.com/pubsub/docs/reference/rest/v1/PubsubMessage

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed

Copy link
Member Author

@udim udim left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks!

if transform.source.timestamp_attribute is not None:
step.add_property(PropertyNames.PUBSUB_TIMESTAMP_ATTRIBUTE,
transform.source.timestamp_attribute)
logging.info('pubsub source')
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oops. Removed

if transform.source.with_attributes:
# Setting this property signals Dataflow runner to return full
# PubsubMessages instead of just the payload.
# PubsubMessages instead of just data.
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed

@charlesccychen
Copy link
Contributor

Thank you! This LGTM.

@charlesccychen charlesccychen merged commit 6afa12f into apache:master Jul 26, 2018
@aaltay
Copy link
Member

aaltay commented Jul 31, 2018

Thank you! I appreciate this.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants