New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[BEAM-2256] Add the last previous range filter #3093
Conversation
R: @dhalperi |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Minor comments, do you want me to fix-up and merge or for you to fix-up and merge?
* A MongoDB {@link BoundedSource} reading {@link Document} from a given instance. | ||
*/ | ||
@VisibleForTesting | ||
protected static class BoundedMongoDbSource extends BoundedSource<Document> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
package private instead of protected
@@ -294,7 +299,8 @@ public long getEstimatedSizeBytes(PipelineOptions pipelineOptions) { | |||
* @param additionalFilter A custom (user) additional filter to append to the range filters. | |||
* @return A list of filters containing the ranges. | |||
*/ | |||
private static List<String> splitKeysToFilters(List<Document> splitKeys, String | |||
@VisibleForTesting | |||
protected static List<String> splitKeysToFilters(List<Document> splitKeys, String |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
package private instead of protected
|
||
lowestBound = splitKey; | ||
} | ||
return filters; | ||
} | ||
|
||
/** | ||
* Cleanly format range filter, eventually adding an user additional filter. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: adding an user additional filter -> adding the users additional filter
@@ -306,30 +312,44 @@ public long getEstimatedSizeBytes(PipelineOptions pipelineOptions) { | |||
// the range from the beginning up to this split | |||
rangeFilter = String.format("{ $and: [ {\"_id\":{$lte:ObjectId(\"%s\")}}", | |||
splitKey); | |||
filters.add(formatFilter(rangeFilter, additionalFilter)); | |||
} else if (i == splitKeys.size() - 1) { | |||
// this is the last split in the list, the filter defines |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The comment should mention that we have two splits when dealing with the last element.
@@ -294,7 +299,8 @@ public long getEstimatedSizeBytes(PipelineOptions pipelineOptions) { | |||
* @param additionalFilter A custom (user) additional filter to append to the range filters. | |||
* @return A list of filters containing the ranges. | |||
*/ | |||
private static List<String> splitKeysToFilters(List<Document> splitKeys, String | |||
@VisibleForTesting | |||
protected static List<String> splitKeysToFilters(List<Document> splitKeys, String |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
optional: Add a precondition that checks that splitKeys is not empty since this method does not correctly handle the empty splitKeys case.
R: @lukecwik |
@jbonofre I will address the comments I made and will get it merged. |
This has been merged, please close PR. |
Thanks @lukecwik ! |
Be sure to do all of the following to help us incorporate your contribution
quickly and easily:
[BEAM-<Jira issue #>] Description of pull request
mvn clean verify
.<Jira issue #>
in the title with the actual Jira issuenumber, if there is one.
Individual Contributor License Agreement.