
[BEAM-11266] Python IO MongoDB: add bucket_auto aggregation option for bundling in Atlas. #13350

Merged

Conversation

nikie (Contributor) commented Nov 15, 2020

This fixes BEAM-11266, allowing ReadFromMongoDB to be used with MongoDB Atlas by optionally using the $bucketAuto MongoDB aggregation instead of splitVector:

  pipeline | ReadFromMongoDB(uri='mongodb+srv://user:pwd@cluster0.mongodb.net',
                             db='testdb',
                             coll='input',
                             bucket_auto=True)

This enhancement is based on the solution provided by Susumu Asaga in a comment on BEAM-4567, which concerns the Java MongoDB connector.
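For reference, $bucketAuto is a standard MongoDB aggregation stage and is available on Atlas, where splitVector is not permitted. A minimal sketch of the kind of pipeline involved follows; the exact stage parameters used by the connector are not shown in this thread, so treat the values here as illustrative:

```python
# Illustrative $bucketAuto aggregation pipeline, built as plain dicts
# (no server needed). bucket_count is hypothetical; the connector derives
# it from the collection size and the desired bundle size.
bucket_count = 8

pipeline = [
    {
        "$bucketAuto": {
            "groupBy": "$_id",        # split the collection along _id
            "buckets": bucket_count,  # ask for roughly equal-sized buckets
        }
    }
]

# Each result document has the shape:
#   {"_id": {"min": <first _id>, "max": <last _id>}, "count": <n>}
# which gives the connector _id ranges to use as bundle boundaries.
```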


Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

  • Choose reviewer(s) and mention them in a comment (R: @username).
  • Format the pull request title like [BEAM-XXX] Fixes bug in ApproximateQuantiles, where you replace BEAM-XXX with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
  • Update CHANGES.md with noteworthy changes.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

See the Contributor Guide for more tips on how to make the review process smoother.


nikie (Contributor, Author) commented Nov 15, 2020

Hi! Could someone review this, please?
R: @chamikaramj
R: @aaltay
R: @y1chi

chamikaramj (Contributor)

@y1chi can you take a look ?

iemejia (Member) commented Nov 16, 2020

@nikie, in case you have extra cycles to create a PR for the Java connector, I will be glad to take a look.

  # single document not splittable
  return []
  size = self.estimate_size()
  bucket_count = size // desired_chunk_size
y1chi (Contributor) commented Nov 16, 2020

The split function will likely be called recursively for dynamic rebalancing: a range with start_pos and end_pos can be further split upon backend request, so it might not be reasonable to always use the total collection size divided by desired_chunk_size to calculate the bucket count. Is it possible to get only the buckets within the given _id range? We could probably use the average document size times the number of documents to estimate the size of the range being split.
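One way to realize this suggestion (a sketch of the idea, not the code under review): prepend a $match on the _id range so that $bucketAuto only buckets documents within the split being re-split. The function and parameter names here are hypothetical:

```python
def bucket_auto_pipeline(start_pos, end_pos, bucket_count):
    """Build an aggregation that buckets only documents in [start_pos, end_pos)."""
    id_range = {}
    if start_pos is not None:
        id_range["$gte"] = start_pos
    if end_pos is not None:
        id_range["$lt"] = end_pos

    pipeline = []
    if id_range:
        # Restrict bucketing to the range being re-split, so recursive
        # splits never produce buckets outside [start_pos, end_pos).
        pipeline.append({"$match": {"_id": id_range}})
    pipeline.append({"$bucketAuto": {"groupBy": "$_id", "buckets": bucket_count}})
    return pipeline
```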

nikie (Contributor, Author) replied:

Thanks, @y1chi!
I will look more closely at how it works in the Java SDK and will try to filter by _id ranges.

  if size % desired_chunk_size != 0:
    bucket_count += 1
  with beam.io.mongodbio.MongoClient(self.uri, **self.spec) as client:
    buckets = list(
y1chi (Contributor) commented:

The returned buckets should be guaranteed to stay within the start_pos/end_pos _id range; otherwise the same document could be read multiple times.

nikie (Contributor, Author) replied:

Fixed: the returned buckets now cover exactly the requested range.

nikie (Contributor, Author) commented Nov 17, 2020

> @nikie in case you have the extra cycles to create a PR for the Java connector I will be glad to take a look

@iemejia, the Java connector already has this feature implemented: the withBucketAuto method.

iemejia (Member) commented Nov 18, 2020

Thanks @nikie, I had forgotten this was already fixed (and didn't check 🥵)!

Commit: [BEAM-11266] Python IO MongoDB: add bucket_auto aggregation option for bundling in Atlas.

Bucket_auto mode:
- Respects dynamic rebalancing by filtering on the _id range in each split.
- Respects custom filter by merging it with the _id range filter, so
  that splits hold similar number of docs actually matching the filter
  (not possible in splitVector mode).
- Estimates bundle size for non-initial splits by counting docs with
  filters applied and using 'avgObjSize' from MongoDB collstats.
- Uses a bundle generator shared with splitVector mode, making it clear
  that both modes cover the same cases.

Misc:
- Refactor _merge_id_filter to use '$and' only if necessary.
- Fix an off-by-one issue with the single-document-not-splittable checks
  for both bucket_auto and splitVector modes (unit test added;
  before the fix the if branches were unreachable).

Unit tests:
- Increase coverage and sanity.
- Refactor collection mock filter and projection handling.

Integration tests:
- Add read cases: splitVector/bucket_auto * filter/no-filter.
- Add checks for expected docs count.
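The "_merge_id_filter uses '$and' only if necessary" refactor mentioned in the commit message can be sketched roughly as follows. This is a hypothetical reconstruction, not the PR's actual code; the function name mirrors the one in the commit message:

```python
def merge_id_filter(id_filter, custom_filter=None):
    """Merge an _id range filter with a user-supplied filter.

    Wraps the two in $and only when a custom filter is actually present.
    When there is no custom filter, the _id filter is returned as-is,
    since MongoDB already treats top-level keys as an implicit AND.
    """
    if not custom_filter:
        return id_filter
    # $and is needed here because the custom filter may itself constrain
    # _id, and a plain dict merge would silently drop one constraint.
    return {"$and": [id_filter, custom_filter]}
```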
nikie (Contributor, Author) commented Nov 22, 2020

@y1chi
I have implemented your suggested changes and more (see the last commit message for more details):

  • auto-bucketing respects not only the _id range but also the custom filter, for both document counting and the aggregation (this might feel like overhead, but it should produce more precise splits);
  • improved unit and integration tests.
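The bundle-size estimate described in the commit message (the matching document count times 'avgObjSize' from MongoDB collstats) can be sketched as below. The function names are illustrative, not the connector's actual API:

```python
def estimate_split_size(doc_count, avg_obj_size):
    """Estimate a split's size in bytes from the count of documents
    matching the filters and the collection's average document size
    (the 'avgObjSize' field of MongoDB collstats)."""
    return doc_count * avg_obj_size


def bucket_count_for(split_size_bytes, desired_chunk_size_bytes):
    """Number of buckets needed so each bundle is at most the desired
    chunk size: ceiling division, with a floor of one bucket."""
    return max(1, -(-split_size_bytes // desired_chunk_size_bytes))
```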

Java's MongoDBIO works differently:

  • there is a numSplits option which controls the number of auto buckets (10 by default) and the number of splitVector buckets if set;
  • it estimates the desired bundle size only for splitVector mode when numSplits is not provided, and recalculates the bundle size from numSplits when it is provided;
  • it does not apply the custom filter during auto bucketing; it only filters the actual reads according to the split buckets;
  • does not have start/stop logic for dynamic rebalancing.

y1chi (Contributor) commented Nov 23, 2020

Run Python MongoDBIO_IT

y1chi (Contributor) left a review:

LGTM, thanks for the contribution!

y1chi (Contributor) commented Nov 23, 2020

@chamikaramj Cham, do you mind helping merge the PR?

aaltay merged commit 67339a9 into apache:master on Nov 24, 2020