feat: Major classes for Spark continuous streaming #396

Merged
jiangmichaellll merged 31 commits into master from jiangmichael-spark-continuous-processing
Dec 10, 2020

Conversation

jiangmichaellll
Contributor

This implements the major classes for Spark continuous streaming.

@jiangmichaellll jiangmichaellll requested review from a team as code owners December 3, 2020 04:48
@google-cla google-cla bot added the cla: yes This human has signed the Contributor License Agreement. label Dec 3, 2020
@product-auto-label product-auto-label bot added the api: pubsublite Issues related to the googleapis/java-pubsublite API. label Dec 3, 2020
Base automatically changed from jiangmichael-spark-utils to master December 3, 2020 23:34
private final MultiPartitionCommitter committer;
private PslSourceOffset startOffset;

public PslContinuousReader(PslDataSourceOptions options) {
Collaborator

This doesn't belong here; it is wiring code. Remove this constructor.

Contributor Author

I would like to keep this constructor so that the code in PslDataSource stays very simple.

AdminServiceClient adminServiceClient,
CursorServiceClient cursorServiceClient,
MultiPartitionCommitter committer) {
this.options = options;
Collaborator

Don't take the options; take the things you need from them.

Contributor Author

@jiangmichaellll jiangmichaellll Dec 4, 2020

The options object is needed at the lower level (PslContinuousInputPartition), which uses many of its parameters (subscription path, flow control settings, credentials), so I would prefer not to disassemble it early.
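A minimal sketch of the trade-off discussed in this thread, using hypothetical stand-in types (`SketchOptions`, `SketchCommitter`, `SketchReader`, and `SketchWiring` are illustrations, not classes from this PR). It assumes one possible middle ground: the reader receives ready-made collaborators and does no wiring itself, a separate factory holds the wiring, and the options object is still passed through for lower-level components that need many of its fields.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical factory: all wiring lives here, outside the reader.
final class SketchWiring {
  static SketchReader newReader(SketchOptions options, List<Long> committedSink) {
    // The committer is built by the wiring code and handed to the reader.
    SketchCommitter committer = offset -> committedSink.add(offset);
    return new SketchReader(options, committer);
  }

  public static void main(String[] args) {
    List<Long> sink = new ArrayList<>();
    SketchReader reader = newReader(new SketchOptions("example-subscription", 1024L), sink);
    reader.commit(42L);
    System.out.println(reader.subscriptionPath() + " committed " + sink);
  }
}

// Hypothetical stand-in for an options object; not PslDataSourceOptions.
final class SketchOptions {
  final String subscriptionPath;
  final long maxBytesOutstanding;

  SketchOptions(String subscriptionPath, long maxBytesOutstanding) {
    this.subscriptionPath = subscriptionPath;
    this.maxBytesOutstanding = maxBytesOutstanding;
  }
}

// Hypothetical collaborator that the reader depends on.
interface SketchCommitter {
  void commit(long offset);
}

// The reader only stores what it is given; it constructs nothing.
final class SketchReader {
  private final SketchOptions options; // kept to hand to lower-level partitions
  private final SketchCommitter committer;

  SketchReader(SketchOptions options, SketchCommitter committer) {
    this.options = options;
    this.committer = committer;
  }

  String subscriptionPath() {
    return options.subscriptionPath;
  }

  void commit(long offset) {
    committer.commit(offset);
  }
}
```

With this split, a test can pass a fake `SketchCommitter` directly to the reader's constructor, while production code goes through the factory.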

assert PslSourceOffset.class.isAssignableFrom(start.get().getClass())
: "start offset is not assignable to PslSourceOffset.";
startOffset = (PslSourceOffset) start.get();
return;
Collaborator

Do these methods need to be thread safe?

Contributor Author

@jiangmichaellll jiangmichaellll Dec 4, 2020

No, they are called serially in this order: deserializeOffset (if the write-ahead log has an offset from a previous query), setStartOffset, getStartOffset, planInputPartitions, then the actual processing, then mergeOffsets and commit.
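The call order described in this comment can be sketched as follows. This is an illustration under stated assumptions, not the Spark API or the code in this PR: `SketchOffset`, `SketchLifecycleReader`, and `SketchLifecycleDriver` are hypothetical stand-ins that only borrow the method names from the comment, and `mergeOffsets` is simplified to take the maximum of the partition offsets. The reader's fields are plain (no locks, no `volatile`) because the caller is assumed to invoke the methods one after another from a single thread.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;

// Hypothetical driver: runs one pass through the lifecycle, strictly in order.
final class SketchLifecycleDriver {
  static SketchLifecycleReader runOnce(Optional<String> loggedOffset) {
    SketchLifecycleReader reader = new SketchLifecycleReader();
    // deserializeOffset is only called if a previous query logged an offset.
    Optional<SketchOffset> start = loggedOffset.map(reader::deserializeOffset);
    reader.setStartOffset(start);
    SketchOffset begin = reader.getStartOffset();
    reader.planInputPartitions();
    // ... partitions would read data here; pretend one partition advanced by 5 ...
    SketchOffset end =
        reader.mergeOffsets(Collections.singletonList(new SketchOffset(begin.value + 5)));
    reader.commit(end);
    return reader;
  }

  public static void main(String[] args) {
    System.out.println(runOnce(Optional.of("10")).calls);
  }
}

// Hypothetical offset type; stands in for PslSourceOffset.
final class SketchOffset {
  final long value;

  SketchOffset(long value) {
    this.value = value;
  }
}

// Hypothetical reader that records which methods were called, in order.
final class SketchLifecycleReader {
  final List<String> calls = new ArrayList<>();
  private SketchOffset startOffset;
  private SketchOffset committed;

  SketchOffset deserializeOffset(String text) {
    calls.add("deserializeOffset");
    return new SketchOffset(Long.parseLong(text));
  }

  void setStartOffset(Optional<SketchOffset> start) {
    calls.add("setStartOffset");
    startOffset = start.orElse(new SketchOffset(0L));
  }

  SketchOffset getStartOffset() {
    calls.add("getStartOffset");
    return startOffset;
  }

  int planInputPartitions() {
    calls.add("planInputPartitions");
    return 1; // a single partition, for illustration
  }

  // Simplification: the merged offset is the largest partition offset.
  SketchOffset mergeOffsets(List<SketchOffset> partitionOffsets) {
    calls.add("mergeOffsets");
    long max = startOffset.value;
    for (SketchOffset o : partitionOffsets) {
      max = Math.max(max, o.value);
    }
    return new SketchOffset(max);
  }

  void commit(SketchOffset end) {
    calls.add("commit");
    committed = end;
  }

  SketchOffset committed() {
    return committed;
  }
}
```

If the methods could be called concurrently, the unsynchronized `startOffset` and `committed` fields in this sketch would no longer be safe.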

@jiangmichaellll jiangmichaellll changed the base branch from master to jiangmichael-spark-offsets December 4, 2020 07:04
Contributor Author

jiangmichaellll commented Dec 7, 2020

There is one additional known issue in this PR: the buffering pull subscriber keeps an unbounded message cache internally, so it does not respect flow control. Please ignore this for this PR. I have another PR that addresses it by making the cache bounded.

EDIT: #408 is the PR that makes it bounded, but let's finish this one before I assign that.
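To illustrate what "bounded" means here, the following is a generic sketch and not the approach taken in #408, which is not shown on this page. `SketchBoundedBuffer` is a hypothetical class built on the standard `ArrayBlockingQueue`: once the fixed capacity is reached, `offer` refuses new messages instead of letting the cache grow, so the producer has to slow down or retry.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical bounded message cache; not the subscriber from this PR or from #408.
final class SketchBoundedBuffer {
  private final BlockingQueue<String> queue;

  SketchBoundedBuffer(int capacity) {
    this.queue = new ArrayBlockingQueue<>(capacity);
  }

  // Returns false instead of growing when the buffer is already full.
  boolean tryAdd(String message) {
    return queue.offer(message);
  }

  // Returns the oldest buffered message, or null if the buffer is empty.
  String poll() {
    return queue.poll();
  }

  int size() {
    return queue.size();
  }

  public static void main(String[] args) {
    SketchBoundedBuffer buffer = new SketchBoundedBuffer(2);
    System.out.println(buffer.tryAdd("a"));
    System.out.println(buffer.tryAdd("b"));
    System.out.println(buffer.tryAdd("c")); // rejected: capacity is 2
  }
}
```

An unbounded cache corresponds to a queue with no capacity limit, where `tryAdd` would always succeed regardless of how far the consumer has fallen behind.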

@AutoValue.Builder
public abstract static class Builder {

public abstract Builder credentialsKey(String credentialsKey);
Collaborator

Nit: these methods should be named with a "set" prefix (for example, setCredentialsKey).

Contributor Author

Done
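A small sketch of the naming convention requested in this thread. In the PR the builder implementation is generated by AutoValue; the hand-written `SketchDataSourceOptions` below is a hypothetical stand-in used only to show the "set"-prefixed method name, and `setCredentialsKey` is an assumed name derived from the `credentialsKey` method in the reviewed snippet.

```java
// Hand-written stand-in for an AutoValue-style value class with a builder.
final class SketchDataSourceOptions {
  private final String credentialsKey;

  private SketchDataSourceOptions(String credentialsKey) {
    this.credentialsKey = credentialsKey;
  }

  // Getter keeps the bare property name.
  String credentialsKey() {
    return credentialsKey;
  }

  static Builder builder() {
    return new Builder();
  }

  static final class Builder {
    private String credentialsKey;

    // Builder setter uses the "set" prefix and returns the builder for chaining.
    Builder setCredentialsKey(String credentialsKey) {
      this.credentialsKey = credentialsKey;
      return this;
    }

    SketchDataSourceOptions build() {
      return new SketchDataSourceOptions(credentialsKey);
    }
  }

  public static void main(String[] args) {
    SketchDataSourceOptions options =
        SketchDataSourceOptions.builder().setCredentialsKey("example-key").build();
    System.out.println(options.credentialsKey());
  }
}
```

The prefix makes it easy to tell builder setters apart from the value class's getters at a call site.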

Collaborator

@dpcollins-google dpcollins-google left a comment

Rebase this, fix the few remaining comments, and then ping me for approval.

Base automatically changed from jiangmichael-spark-offsets to master December 9, 2020 06:22

codecov bot commented Dec 10, 2020

Codecov Report

Merging #396 (fabdebe) into master (63c34f0) will decrease coverage by 0.46%.
The diff coverage is 37.96%.

@@             Coverage Diff              @@
##             master     #396      +/-   ##
============================================
- Coverage     72.10%   71.63%   -0.47%     
- Complexity      845      871      +26     
============================================
  Files           158      163       +5     
  Lines          4399     4573     +174     
  Branches        222      226       +4     
============================================
+ Hits           3172     3276     +104     
- Misses         1104     1168      +64     
- Partials        123      129       +6     
Impacted Files                                         Coverage Δ                Complexity Δ
.../pubsublite/spark/PslContinuousInputPartition.java  0.00% <0.00%> (ø)         0.00 <0.00> (?)
...m/google/cloud/pubsublite/spark/PslDataSource.java  0.00% <0.00%> (ø)         0.00 <0.00> (?)
...e/cloud/pubsublite/spark/PslDataSourceOptions.java  11.11% <6.45%> (+11.11%)  2.00 <0.00> (+2.00)
...le/cloud/pubsublite/spark/PslContinuousReader.java  42.42% <42.42%> (ø)       10.00 <10.00> (?)
...blite/spark/PslContinuousInputPartitionReader.java  65.62% <65.62%> (ø)       6.00 <6.00> (?)
.../pubsublite/spark/MultiPartitionCommitterImpl.java  80.00% <80.00%> (ø)       6.00 <6.00> (?)
... and 4 more

Legend: Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Last update 63c34f0...2950e08.

@jiangmichaellll jiangmichaellll merged commit 0c0d928 into master Dec 10, 2020
@jiangmichaellll jiangmichaellll deleted the jiangmichael-spark-continuous-processing branch December 10, 2020 22:39

3 participants