Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 39 additions & 15 deletions core/src/main/java/org/apache/iceberg/BaseFileScanTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Objects;
import com.google.common.collect.ImmutableList;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.ResidualEvaluator;

Expand Down Expand Up @@ -71,12 +73,12 @@ public Expression residual() {
}

@Override
public Iterable<FileScanTask> split(long splitSize) {
public Iterable<FileScanTask> split(long targetSplitSize) {
if (file.format().isSplittable()) {
if (file.splitOffsets() != null) {
return () -> new OffsetsBasedSplitScanTaskIterator(file.splitOffsets(), this);
return () -> new OffsetsAwareTargetSplitSizeScanTaskIterator(file.splitOffsets(), this, targetSplitSize);
} else {
return () -> new FixedSizeSplitScanTaskIterator(splitSize, this);
return () -> new FixedSizeSplitScanTaskIterator(targetSplitSize, this);
}
}
return ImmutableList.of(this);
Expand All @@ -91,29 +93,51 @@ public String toString() {
.toString();
}

/**
* This iterator returns {@link FileScanTask} using guidance provided by split offsets.
*/
@VisibleForTesting
static final class OffsetsBasedSplitScanTaskIterator implements Iterator<FileScanTask> {
private final List<Long> splitOffsets;
static final class OffsetsAwareTargetSplitSizeScanTaskIterator implements Iterator<FileScanTask> {
private final List<Long> offsets;
private final List<Long> splitSizes;
private final FileScanTask parentScanTask;
private int idx = 0;

OffsetsBasedSplitScanTaskIterator(List<Long> splitOffsets, FileScanTask fileScanTask) {
this.splitOffsets = splitOffsets;
this.parentScanTask = fileScanTask;
private final long targetSplitSize;
private int sizeIdx = 0;

OffsetsAwareTargetSplitSizeScanTaskIterator(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: style should be:

    OffsetsAwareTargetSplitSizeScanTaskIterator(
        List<Long> offsetList, FileScanTask parentScanTask, long targetSplitSize) {
      ...
    }

List<Long> offsetList, FileScanTask parentScanTask, long targetSplitSize) {
this.offsets = ImmutableList.copyOf(offsetList);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missed this the first time: why copy the offset list? It shouldn't change.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just being a bit defensive. Future implementations of the DataFile interface may not create an immutable copy before passing it downstream.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems fine since this should be an ImmutableList in the current read path and won't actually copy.

this.parentScanTask = parentScanTask;
this.targetSplitSize = targetSplitSize;
this.splitSizes = new ArrayList<>(offsets.size());
int lastIndex = offsets.size() - 1;
for (int index = 0; index < lastIndex; index++) {
splitSizes.add(offsets.get(index + 1) - offsets.get(index));
}
splitSizes.add(parentScanTask.length() - offsets.get(lastIndex));
}

@Override
public boolean hasNext() {
return idx < splitOffsets.size();
return sizeIdx < splitSizes.size();
}

@Override
public FileScanTask next() {
long start = splitOffsets.get(idx);
idx++;
long end = hasNext() ? splitOffsets.get(idx) : parentScanTask.length();
return new SplitScanTask(start, end - start, parentScanTask);
if (!hasNext()) {
throw new NoSuchElementException();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 for correctly implementing the contract!

}
int offsetIdx = sizeIdx;
long currentSize = splitSizes.get(sizeIdx);
sizeIdx += 1; // always consume at least one file split
Copy link
Contributor

@rdblue rdblue Jun 3, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is offsetIdx needed? It is always set to sizeIdx before the end of next. I think it could be a local variable instead and you could remove size from sizeIdx.

while (sizeIdx < splitSizes.size() && currentSize + splitSizes.get(sizeIdx) <= targetSplitSize) {
currentSize += splitSizes.get(sizeIdx);
sizeIdx += 1;
}
FileScanTask combinedTask = new SplitScanTask(offsets.get(offsetIdx), currentSize, parentScanTask);
return combinedTask;
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: another blank line

}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,23 +28,34 @@ public class TestOffsetsBasedSplitScanTaskIterator {
@Test
public void testSplits() {
// case when the last row group has more than one byte
verify(asList(4L, 10L, 15L, 18L, 30L, 45L), 48L, asList(
asList(4L, 6L), asList(10L, 5L), asList(15L, 3L), asList(18L, 12L), asList(30L, 15L),
asList(45L, 3L)));
verify(asList(4L, 10L, 15L, 18L, 30L, 45L), 48L, 20, asList(
asList(4L, 14L), asList(18L, 12L), asList(30L, 18L)));

// case when the last row group has 1 byte
verify(asList(4L, 10L, 15L, 18L, 30L, 45L), 46L, asList(
asList(4L, 6L), asList(10L, 5L), asList(15L, 3L), asList(18L, 12L), asList(30L, 15L),
asList(45L, 1L)));
verify(asList(4L, 10L, 15L, 18L, 30L, 45L), 46L, 20, asList(
asList(4L, 14L), asList(18L, 12L), asList(30L, 16L)));

// case when there is only one row group
verify(asList(4L), 48L, asList(
asList(4L, 44L)));
// case when every row group is of target split size
verify(asList(4L, 24L, 44L, 64L, 84L, 104L), 124L, 20, asList(
asList(4L, 20L), asList(24L, 20L), asList(44L, 20L),
asList(64L, 20L), asList(84L, 20L), asList(104L, 20L)));

// case when every row group except last one is of target split size
verify(asList(4L, 24L, 44L, 64L, 84L, 104L), 108L, 20, asList(
asList(4L, 20L), asList(24L, 20L), asList(44L, 20L),
asList(64L, 20L), asList(84L, 20L), asList(104L, 4L)));

// case when target split size is smaller than splits determined by offset boundaries
verify(asList(4L, 24L, 44L, 64L, 84L, 104L), 108L, 2, asList(
asList(4L, 20L), asList(24L, 20L), asList(44L, 20L),
asList(64L, 20L), asList(84L, 20L), asList(104L, 4L)));
}

private static void verify(List<Long> offsetRanges, long fileLen, List<List<Long>> offsetLenPairs) {
private static void verify(List<Long> offsetRanges, long fileLen,
long targetSplitSize, List<List<Long>> offsetLenPairs) {
List<FileScanTask> tasks = Lists.newArrayList(
new BaseFileScanTask.OffsetsBasedSplitScanTaskIterator(offsetRanges, new MockFileScanTask(fileLen)));
new BaseFileScanTask.OffsetsAwareTargetSplitSizeScanTaskIterator(
offsetRanges, new MockFileScanTask(fileLen), targetSplitSize));
Assert.assertEquals("Number of tasks don't match", offsetLenPairs.size(), tasks.size());
for (int i = 0; i < tasks.size(); i++) {
FileScanTask task = tasks.get(i);
Expand Down