Combine tasks to scan up to target split size using parquet row group information #204
…information is available
| this.targetSplitSize = targetSplitSize; | ||
| this.splitSizes = new ArrayList<>(offsetList.size()); | ||
| int idx = 0; | ||
| while (idx < offsets.size()) { |
Could you use a for loop instead of a while?
| return combinedTask; | ||
| } | ||
|
|
||
| private long getSplitSize(int idx) { |
If this were left in the for loop, you could handle the last offset outside the loop instead of using the check here:
int lastIndex = offsets.size() - 1;
for (int index = 0; index < lastIndex; index += 1) {
  splitSizes.add(offsets.get(index + 1) - offsets.get(index));
}
splitSizes.add(parentScanTask.length() - offsets.get(lastIndex));
| long currentSize = splitSizes.get(sizeIdx); | ||
| FileScanTask combinedTask; | ||
| sizeIdx++; | ||
| while (hasNext()) { |
I think it would be better for maintaining this over time if this didn't make assumptions about how hasNext is implemented. You should probably copy that check here.
| return combinedTask; | ||
| } | ||
| } | ||
| combinedTask = new SplitScanTask(offsets.get(offsetIdx), currentSize, parentScanTask); |
This doesn't update offsetIdx and it isn't obvious at first why that is okay (because splitSizes is finished). I think it would be better to simplify this logic a little bit to have only one return statement. Like this:
long currentSize = splitSizes.get(sizeIdx);
sizeIdx += 1; // always consume at least one file split
while (sizeIdx < splitSizes.size() && currentSize + splitSizes.get(sizeIdx) <= targetSplitSize) {
  currentSize += splitSizes.get(sizeIdx);
  sizeIdx += 1;
}
combinedTask = new SplitScanTask(offsets.get(offsetIdx), currentSize, parentScanTask);
offsetIdx = sizeIdx;
return combinedTask;
That way, the behavior is always the same for all splits.
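To make the greedy combining rule concrete, here is a minimal sketch that applies the same loop to a plain list of sizes. The class `GreedyCombine` and the method `combinedSizes` are hypothetical names used only for illustration and are not part of the PR; the sketch returns the combined size of each emitted task rather than building `SplitScanTask` objects.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration only; not part of the PR.
class GreedyCombine {
  // Returns the combined size of each task the loop from the review comment would emit.
  static List<Long> combinedSizes(List<Long> splitSizes, long targetSplitSize) {
    List<Long> combined = new ArrayList<>();
    int sizeIdx = 0;
    while (sizeIdx < splitSizes.size()) {
      long currentSize = splitSizes.get(sizeIdx);
      sizeIdx += 1; // always consume at least one split, even if it exceeds the target
      // keep adding splits while the running total stays within the target
      while (sizeIdx < splitSizes.size()
          && currentSize + splitSizes.get(sizeIdx) <= targetSplitSize) {
        currentSize += splitSizes.get(sizeIdx);
        sizeIdx += 1;
      }
      combined.add(currentSize);
    }
    return combined;
  }
}
```

With split sizes 10, 10, 10, 40, 5 and a target of 25, this yields combined sizes 20, 10, 40, 5: the first two splits merge, the third cannot absorb the 40-byte split, and the 40-byte split is emitted alone even though it is larger than the target.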
| long end = hasNext() ? splitOffsets.get(idx) : parentScanTask.length(); | ||
| return new SplitScanTask(start, end - start, parentScanTask); | ||
| if (!hasNext()) { | ||
| throw new NoSuchElementException(); |
+1 for correctly implementing the contract!
| private int offsetIdx = 0; | ||
| private int sizeIdx = 0; | ||
|
|
||
| OffsetsAwareTargetSplitSizeScanTaskIterator( |
Nit: style should be:
OffsetsAwareTargetSplitSizeScanTaskIterator(
    List<Long> offsetList, FileScanTask parentScanTask, long targetSplitSize) {
  ...
}
| long targetSplitSize, List<List<Long>> offsetLenPairs) { | ||
| List<FileScanTask> tasks = Lists.newArrayList( | ||
| new BaseFileScanTask.OffsetsBasedSplitScanTaskIterator(offsetRanges, new MockFileScanTask(fileLen))); | ||
| new BaseFileScanTask.OffsetsAwareTargetSplitSizeScanTaskIterator(offsetRanges, |
Nit: style should be to wrap at the function call and place all arguments on the next line.
| import org.junit.Test; | ||
|
|
||
| public class TestOffsetsBasedSplitScanTaskIterator { | ||
|
|
Nit: unless this was required by lint checks, could we avoid adding newlines? They can cause conflicts.
|
|
||
| OffsetsAwareTargetSplitSizeScanTaskIterator( | ||
| List<Long> offsetList, FileScanTask parentScanTask, long targetSplitSize) { | ||
| this.offsets = ImmutableList.copyOf(offsetList); |
Missed this the first time: why copy the offset list? It shouldn't change.
Just being a bit defensive. Future implementations of the DataFile interface may not create an immutable copy before passing it downstream.
Seems fine since this should be an ImmutableList in the current read path and won't actually copy.
| throw new NoSuchElementException(); | ||
| } | ||
| long currentSize = splitSizes.get(sizeIdx); | ||
| sizeIdx += 1; // always consume at least one file split |
Is offsetIdx needed? It is always set to sizeIdx before the end of next. I think it could be a local variable instead and you could remove size from sizeIdx.
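One way the single-index version could look, as a sketch under stated assumptions: the class `CombiningSplitIterator` is hypothetical, `fileLength` stands in for `parentScanTask.length()`, and each element is a `{start, length}` pair instead of a `SplitScanTask` so that the example is self-contained. The starting position is captured in a local variable, so only one index field remains.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Hypothetical stand-in for the PR's iterator; yields {start, length} pairs.
class CombiningSplitIterator implements Iterator<long[]> {
  private final List<Long> offsets;
  private final List<Long> splitSizes;
  private final long targetSplitSize;
  private int idx = 0; // the only index field; the start position is a local in next()

  CombiningSplitIterator(List<Long> offsets, long fileLength, long targetSplitSize) {
    this.offsets = offsets;
    this.targetSplitSize = targetSplitSize;
    this.splitSizes = new ArrayList<>(offsets.size());
    int lastIndex = offsets.size() - 1;
    for (int index = 0; index < lastIndex; index += 1) {
      splitSizes.add(offsets.get(index + 1) - offsets.get(index));
    }
    if (lastIndex >= 0) {
      // the last split runs from the final offset to the end of the file
      splitSizes.add(fileLength - offsets.get(lastIndex));
    }
  }

  @Override
  public boolean hasNext() {
    return idx < splitSizes.size();
  }

  @Override
  public long[] next() {
    if (!hasNext()) {
      throw new NoSuchElementException();
    }
    int startIdx = idx; // local replacement for the offsetIdx field
    long currentSize = splitSizes.get(idx);
    idx += 1; // always consume at least one file split
    while (idx < splitSizes.size() && currentSize + splitSizes.get(idx) <= targetSplitSize) {
      currentSize += splitSizes.get(idx);
      idx += 1;
    }
    return new long[] {offsets.get(startIdx), currentSize};
  }
}
```

The sketch assumes the offsets are sorted in ascending order and all fall within the file length; it does not validate either condition.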
| offsetIdx = sizeIdx; | ||
| return combinedTask; | ||
| } | ||
|
|
Nit: another blank line
| .toString(); | ||
| } | ||
|
|
||
|
|
Nit: extra blank line
cb3dd11 to 70ee933
Merged. Thanks for fixing this, @samarthjain!