Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BEAM-10962] Add Multiple PubSub reader to Python SDK #12930

Merged
merged 14 commits into from
Nov 25, 2020

Conversation

InigoSJ
Copy link
Contributor

@InigoSJ InigoSJ commented Sep 24, 2020

As discussed over Dev mail, a very common use case in Dataflow / Beam is reading from multiple PubSub topics/subscriptions and flatten them out.

The PR adds the PTransform to do so.

It takes two parameters:

  • source_list: List of topics/subscriptions.
  • with_context: option to return a key-value pair of the form (source, actual message)

Other options from ReadFromPubSub can also be included as kwargs except topic or subscription.


Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

  • Choose reviewer(s) and mention them in a comment (R: @username).
  • Format the pull request title like [BEAM-XXX] Fixes bug in ApproximateQuantiles, where you replace BEAM-XXX with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
  • Update CHANGES.md with noteworthy changes.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

See the Contributor Guide for more tips on how to make review process smoother.

Post-Commit Tests Status (on master branch)

Lang SDK Dataflow Flink Samza Spark Twister2
Go Build Status --- Build Status --- Build Status ---
Java Build Status Build Status
Build Status
Build Status
Build Status
Build Status
Build Status
Build Status Build Status
Build Status
Build Status
Build Status
Python Build Status
Build Status
Build Status
Build Status
Build Status
Build Status
Build Status
Build Status
--- Build Status ---
XLang Build Status --- Build Status --- Build Status ---

Pre-Commit Tests Status (on master branch)

--- Java Python Go Website Whitespace Typescript
Non-portable Build Status Build Status
Build Status
Build Status
Build Status
Build Status Build Status Build Status Build Status
Portable --- Build Status --- --- --- ---

See .test-infra/jenkins/README for trigger phrase, status and link of all Jenkins jobs.

GitHub Actions Tests Status (on master branch)

Build python source distribution and wheels
Python tests
Java tests

See CI.md for more information about GitHub Actions CI.

@InigoSJ
Copy link
Contributor Author

InigoSJ commented Sep 24, 2020

R: @chamikaramj

Adding you as reviewer since you replied in the email thread. Thanks a lot!

Let me know if there's something I should change.

@codecov
Copy link

codecov bot commented Sep 24, 2020

Codecov Report

Merging #12930 (34b92bf) into master (3d6cc0e) will increase coverage by 0.02%.
The diff coverage is 87.50%.

Impacted file tree graph

@@            Coverage Diff             @@
##           master   #12930      +/-   ##
==========================================
+ Coverage   82.48%   82.51%   +0.02%     
==========================================
  Files         455      455              
  Lines       54876    54924      +48     
==========================================
+ Hits        45266    45322      +56     
+ Misses       9610     9602       -8     
Impacted Files Coverage Δ
sdks/python/apache_beam/io/gcp/pubsub.py 91.24% <87.50%> (-1.07%) ⬇️
...hon/apache_beam/runners/worker/bundle_processor.py 94.47% <0.00%> (+0.13%) ⬆️
sdks/python/apache_beam/runners/common.py 89.20% <0.00%> (+0.44%) ⬆️
...ks/python/apache_beam/runners/worker/sdk_worker.py 90.11% <0.00%> (+0.47%) ⬆️
.../python/apache_beam/transforms/periodicsequence.py 98.24% <0.00%> (+1.75%) ⬆️
...ks/python/apache_beam/runners/worker/data_plane.py 91.31% <0.00%> (+1.79%) ⬆️

Continue to review full report at Codecov.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update 70ddf84...17af76c. Read the comment docs.

@chamikaramj
Copy link
Contributor

R: @boyuanzz

Passing to Boyuan who is working on updating PubSub source/sink for Dataflow.

@InigoSJ
Copy link
Contributor Author

InigoSJ commented Sep 28, 2020

I was discussing this PR with Pablo and maybe it would be better to add it within the ReadFromPubSub itself rather than with a different PTransform. I see advantages in both approaches:

Modifying ReadFromPubSub

  • Just one PTransfrom that does both things
  • Less documentation needed

New PTransform

  • Easier to mantain, since MultipleReadFromPubSub would only expand from ReadFromPubSub, all modifications from ReadFromPubSub would directly be added.
  • Easier on different runners: if I'm not mistaken, Dataflow performs some overrides to ReadFromPubSub, so using MultipleReadFromPubSub would not be affected by this (since it expands it). Considering Dataflow is probably the main runner for this operation, we should consider this.
  • Less overhead: Having ReadFromPubSub take both list of topics/subscriptions and single topics/subscriptions may be a bit too much

It should not be hard to move this PR from a different PTransform to inside ReadFromPubSub. So please let me know what do you think about it.

Thanks!

@chamikaramj
Copy link
Contributor

The usual pattern for sources is.

(1) A transform that reads from a given source config
(2) A "ReadAll" transform that reads a PCollection of configs.

Given that PubSub is a native transform for Dataflow though, we cannot really implement (2).

I'm not really sure a composite that just wraps a Flatten adds much value since pipeline authors can easily do that themselves (we can do that for every other transform as well but that will just clutter the API in my opinion).

(2) above will be more useful and will enable new use-cases. But we cannot really do that for Dataflow.

@InigoSJ
Copy link
Contributor Author

InigoSJ commented Sep 29, 2020

The usual pattern for sources is.

(1) A transform that reads from a given source config
(2) A "ReadAll" transform that reads a PCollection of configs.

Given that PubSub is a native transform for Dataflow though, we cannot really implement (2).

I'm not really sure a composite that just wraps a Flatten adds much value since pipeline authors can easily do that themselves (we can do that for every other transform as well but that will just clutter the API in my opinion).

(2) above will be more useful and will enable new use-cases. But we cannot really do that for Dataflow.

I agree with you that adding a similar concept for all IOs would clutter the API. The main idea here is that this use case is widely shared and a lot of users are doing it themselves. This PTransform would speedup a lot their code and it would help organizing the sources better (rather than a wide pipeline graph, the sources are separated by topic/subs and by project).

Anyhow, I do understand your concern, let me know if I should proceed (by fixing the errors) or close the PR.

Thanks again!

@boyuanzz
Copy link
Contributor

boyuanzz commented Oct 1, 2020

Except what Cham has mentioned, another thing is current implementation of MultipleReadFromPubSub only can configure multiple ReadFromPubSub with the same attribute, like the same with_attributes, timestamp_label, id_label, which is not ideal. Given that ReadPubSub is a native transform for Dataflow, having MultipleReadFromPubSub seems like the only solution for now. I'm thinking we could create a PubSubSourceDescriptor which includes topic, subscription and other attributes. And we expose add API from MultipleReadFromPubSub to allow end users to add a new Read.

@aaltay aaltay requested a review from boyuanzz October 8, 2020 18:41
@InigoSJ
Copy link
Contributor Author

InigoSJ commented Oct 9, 2020

Run Python PreCommit

@InigoSJ
Copy link
Contributor Author

InigoSJ commented Oct 9, 2020

retest this please

@InigoSJ
Copy link
Contributor Author

InigoSJ commented Oct 9, 2020

@boyuanzz , I am not aware on how to retry the 3 failing tests, they were successful in the previous commit, but with the new one (that only adds the line to disable Pylint in a block) they failed. Any idea?

Thanks!

@InigoSJ
Copy link
Contributor Author

InigoSJ commented Oct 11, 2020

Run Python PreCommit

1 similar comment
@InigoSJ
Copy link
Contributor Author

InigoSJ commented Oct 12, 2020

Run Python PreCommit

@boyuanzz
Copy link
Contributor

The Python Unit tests is not triggered by Run Python PreCommit. You can click Details and there is a re-run all tasks button to rerun these tests.

sdks/python/apache_beam/io/gcp/pubsub.py Outdated Show resolved Hide resolved
sdks/python/apache_beam/io/gcp/pubsub.py Outdated Show resolved Hide resolved
step_name_base = 'PubSub %s/project:%s' % (source_type, source_project)
read_step_name = '%s/Read %s' % (step_name_base, source_name)

if source_type == 'topics':
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PubSubSource has the similar checking logic as here. We should be able to move this check.

sdks/python/apache_beam/io/gcp/pubsub.py Outdated Show resolved Hide resolved
@InigoSJ
Copy link
Contributor Author

InigoSJ commented Nov 4, 2020

Run Python_PVR_Flink PreCommit

1 similar comment
@InigoSJ
Copy link
Contributor Author

InigoSJ commented Nov 9, 2020

Run Python_PVR_Flink PreCommit

Copy link
Contributor

@boyuanzz boyuanzz left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please address comments and resolve the commit conflicts. The PVR flink should not be related.

sdks/python/apache_beam/io/gcp/pubsub.py Outdated Show resolved Hide resolved
sdks/python/apache_beam/io/gcp/pubsub.py Show resolved Hide resolved
sdks/python/apache_beam/io/gcp/pubsub.py Outdated Show resolved Hide resolved
timestamp_attribute: str = None


class MultipleReadFromPubSub(PTransform):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add a code snippet about how to use this transform and highlight the benefit of using this transform compared to the ReadFromPubSub.

def expand(self, pcol):
sources_pcol = []
for source in self.source_list:
source_split = source.source.split('/')
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of using split, can we use re.match(TOPIC_REGEXP, source.source) and re.match(SUBSCRIPTION_REGEXP, source.source) as well?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I used a new regex (PUBSUB_DESCRIPTOR_REGEXP) that is valid for both, so I could use match.group to check if topic or subscription, let me know what you think.

@boyuanzz
Copy link
Contributor

Run Python_PVR_Flink PreCommit

@boyuanzz
Copy link
Contributor

Please address the conflicting files. @InigoSJ

Copy link
Contributor

@boyuanzz boyuanzz left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks! This PR almost looks good me except one additional comment and merge conflicts.

@@ -20,6 +20,27 @@
Cloud Pub/Sub sources and sinks are currently supported only in streaming
pipelines, during remote execution.

Multiple Read from Pub/Sub
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please move this into the MultipleReadFromPubSub class py doc.

@boyuanzz
Copy link
Contributor

Run Python_PVR_Flink PreCommit

Copy link
Contributor

@boyuanzz boyuanzz left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for your contribution! Before merging, please squash all commits into one commit. You can do so by:

git rebase -i HEAD~14
git push -f

@boyuanzz boyuanzz merged commit 682f2ea into apache:master Nov 25, 2020
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

3 participants