Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reimplement Pub/Sub Lite's I/O using UnboundedSource. #22612

Merged
merged 8 commits into from
Aug 10, 2022

Conversation

dpcollins-google
Copy link
Contributor

SDF is not well supported by the default mode of dataflow, and UnboundedSource is supported by all modes.


Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

  • Choose reviewer(s) and mention them in a comment (R: @username).
  • Mention the appropriate issue in your description (for example: addresses #123), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment fixes #<ISSUE NUMBER> instead.
  • Update CHANGES.md with noteworthy changes.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

See the Contributor Guide for more tips on how to make review process smoother.

To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md

GitHub Actions Tests Status (on master branch)

Build python source distribution and wheels
Python tests
Java tests
Go tests

See CI.md for more information about GitHub Actions CI.

SDF is not well supported by the default mode of dataflow, and UnboundedSource is supported by all modes.
@dpcollins-google
Copy link
Contributor Author

R: @chamikaramj

@github-actions
Copy link
Contributor

github-actions bot commented Aug 5, 2022

Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control

SDF is not well supported by the default mode of dataflow, and UnboundedSource is supported by all modes.
Copy link
Contributor

@chamikaramj chamikaramj left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks.

if (!subscriber.state().equals(State.RUNNING)) {
throw new IOException("Subscriber failed: ", subscriber.failureCause());
}
if (advanced) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the subscriber already set to the first record the first time the start() is called ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is if the record is ready- this seems to be the correct behavior and match KafkaUnboundedReader?

@codecov
Copy link

codecov bot commented Aug 8, 2022

Codecov Report

Merging #22612 (17fb9c0) into master (54b0784) will increase coverage by 0.01%.
The diff coverage is 63.15%.

❗ Current head 17fb9c0 differs from pull request most recent head 6ccf85d. Consider uploading reports for the commit 6ccf85d to get more accurate results

@@            Coverage Diff             @@
##           master   #22612      +/-   ##
==========================================
+ Coverage   74.19%   74.20%   +0.01%     
==========================================
  Files         706      708       +2     
  Lines       93229    93462     +233     
==========================================
+ Hits        69168    69355     +187     
- Misses      22793    22832      +39     
- Partials     1268     1275       +7     
Flag Coverage Δ
go 51.52% <56.97%> (+0.02%) ⬆️
python 83.61% <72.56%> (+0.04%) ⬆️

Flags with carried forward coverage won't be shown. Click here to find out more.

Impacted Files Coverage Δ
sdks/go/pkg/beam/core/runtime/exec/input.go 51.96% <ø> (+0.99%) ⬆️
sdks/go/pkg/beam/runners/direct/direct.go 65.76% <0.00%> (-0.60%) ⬇️
sdks/go/pkg/beam/validate.go 42.10% <0.00%> (ø)
...on/apache_beam/examples/complete/juliaset/setup.py 0.00% <0.00%> (ø)
...m/examples/inference/pytorch_image_segmentation.py 0.00% <ø> (ø)
...ython/apache_beam/examples/kafkataxi/kafka_taxi.py 0.00% <ø> (ø)
sdks/python/apache_beam/examples/sql_taxi.py 0.00% <ø> (ø)
...python/apache_beam/examples/wordcount_xlang_sql.py 0.00% <ø> (ø)
sdks/python/apache_beam/io/azure/blobstorageio.py 26.88% <0.00%> (ø)
...thon/apache_beam/ml/inference/pytorch_inference.py 0.00% <0.00%> (ø)
... and 32 more

📣 Codecov can now indicate which changes are the most critical in Pull Requests. Learn more

SDF is not well supported by the default mode of dataflow, and UnboundedSource is supported by all modes.
@chamikaramj
Copy link
Contributor

Probably someone from Dataflow streaming team should take a quick look as well. @scwhittle will you be able to check ?

Copy link
Contributor

@scwhittle scwhittle left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

UnboundedSource implementation looks good to me but I'd prefer someone more familiar with Java to review as well

SDF is not well supported by the default mode of dataflow, and UnboundedSource is supported by all modes.
@chamikaramj
Copy link
Contributor

Run PostCommit_Java_Dataflow

@chamikaramj
Copy link
Contributor

Run PostCommit_Java_DataflowV2

@chamikaramj
Copy link
Contributor

Thanks Sam. I think we can merge this when tests pass. LGTM.

Based on an offline discussion with Daniel, we'll be adding a transform override in a follow-up PR to keep SDF version the default while overriding with UnboundedSource-based version for production (Runner v1) Dataflow.

@dpcollins-google
Copy link
Contributor Author

Run Java_GCP_IO_Direct PreCommit

@chamikaramj
Copy link
Contributor

Run PostCommit_Java_Dataflow

@dpcollins-google
Copy link
Contributor Author

Run PostCommit_Java_Dataflow

@dpcollins-google
Copy link
Contributor Author

Run PostCommit_Java_DataflowV2

@chamikaramj
Copy link
Contributor

Run Java_GCP_IO_Direct PreCommit

@dpcollins-google
Copy link
Contributor Author

Run PostCommit_Java_DataflowV2

@dpcollins-google
Copy link
Contributor Author

Run PostCommit_Java_Dataflow

@chamikaramj
Copy link
Contributor

Pls fix spotless.

@@ -18,7 +18,6 @@
17:22:48 package·org.apache.beam.sdk.io.gcp.pubsublite.internal;
17:22:48
17:22:48 import·static·org.junit.Assert.assertEquals;
17:22:48 -import·static·org.junit.Assert.assertThrows;
17:22:48 import·static·org.mockito.Mockito.verify;

@dpcollins-google
Copy link
Contributor Author

Run PostCommit_Java_DataflowV2

@dpcollins-google
Copy link
Contributor Author

Run PostCommit_Java_Dataflow

@dpcollins-google
Copy link
Contributor Author

Pls fix spotless.

@@ -18,7 +18,6 @@ 17:22:48 package·org.apache.beam.sdk.io.gcp.pubsublite.internal; 17:22:48 17:22:48 import·static·org.junit.Assert.assertEquals; 17:22:48 -import·static·org.junit.Assert.assertThrows; 17:22:48 import·static·org.mockito.Mockito.verify;

Done.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

3 participants