From 201dad64c7fa599593d86196d4b0a6877c6b2fbc Mon Sep 17 00:00:00 2001 From: Samarth Jain Date: Mon, 3 Jun 2019 15:37:27 -0700 Subject: [PATCH 1/4] Combine tasks to scan up to target split size when parquet row group information is available --- .../org/apache/iceberg/BaseFileScanTask.java | 77 +++++++++++++++---- ...TestOffsetsBasedSplitScanTaskIterator.java | 35 ++++++--- 2 files changed, 86 insertions(+), 26 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/BaseFileScanTask.java b/core/src/main/java/org/apache/iceberg/BaseFileScanTask.java index b90805c737a0..20eb487caefb 100644 --- a/core/src/main/java/org/apache/iceberg/BaseFileScanTask.java +++ b/core/src/main/java/org/apache/iceberg/BaseFileScanTask.java @@ -22,8 +22,10 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Objects; import com.google.common.collect.ImmutableList; +import java.util.ArrayList; import java.util.Iterator; import java.util.List; +import java.util.NoSuchElementException; import org.apache.iceberg.expressions.Expression; import org.apache.iceberg.expressions.ResidualEvaluator; @@ -71,12 +73,12 @@ public Expression residual() { } @Override - public Iterable split(long splitSize) { + public Iterable split(long targetSplitSize) { if (file.format().isSplittable()) { if (file.splitOffsets() != null) { - return () -> new OffsetsBasedSplitScanTaskIterator(file.splitOffsets(), this); + return () -> new OffsetsAwareTargetSplitSizeScanTaskIterator(file.splitOffsets(), this, targetSplitSize); } else { - return () -> new FixedSizeSplitScanTaskIterator(splitSize, this); + return () -> new FixedSizeSplitScanTaskIterator(targetSplitSize, this); } } return ImmutableList.of(this); @@ -91,31 +93,76 @@ public String toString() { .toString(); } + + /** + * This iterator returns {@link FileScanTask} using guidance provided by split offsets. + */ @VisibleForTesting - static final class OffsetsBasedSplitScanTaskIterator implements Iterator { - private final List splitOffsets; + static final class OffsetsAwareTargetSplitSizeScanTaskIterator implements Iterator { + private final List offsets; + private final List splitSizes; private final FileScanTask parentScanTask; - private int idx = 0; - - OffsetsBasedSplitScanTaskIterator(List splitOffsets, FileScanTask fileScanTask) { - this.splitOffsets = splitOffsets; - this.parentScanTask = fileScanTask; + private final long targetSplitSize; + private int offsetIdx = 0; + private int sizeIdx = 0; + + OffsetsAwareTargetSplitSizeScanTaskIterator( + List offsetList, + FileScanTask parentScanTask, + long targetSplitSize + ) { + this.offsets = ImmutableList.copyOf(offsetList); + this.parentScanTask = parentScanTask; + this.targetSplitSize = targetSplitSize; + this.splitSizes = new ArrayList<>(offsetList.size()); + int idx = 0; + while (idx < offsets.size()) { + splitSizes.add(getSplitSize(idx)); + idx++; + } } @Override public boolean hasNext() { - return idx < splitOffsets.size(); + return sizeIdx < splitSizes.size(); } @Override public FileScanTask next() { - long start = splitOffsets.get(idx); - idx++; - long end = hasNext() ? splitOffsets.get(idx) : parentScanTask.length(); - return new SplitScanTask(start, end - start, parentScanTask); + if (!hasNext()) { + throw new NoSuchElementException(); + } + // We always pick the current split even if it potentially exceeds the target split size + long currentSize = splitSizes.get(sizeIdx); + FileScanTask combinedTask; + sizeIdx++; + while (hasNext()) { + if (currentSize + splitSizes.get(sizeIdx) <= targetSplitSize) { + currentSize += splitSizes.get(sizeIdx); + sizeIdx++; + } else { + combinedTask = new SplitScanTask(offsets.get(offsetIdx), currentSize, parentScanTask); + offsetIdx = sizeIdx; + return combinedTask; + } + } + combinedTask = new SplitScanTask(offsets.get(offsetIdx), currentSize, parentScanTask); + return combinedTask; + } + + private long getSplitSize(int idx) { + long nextOffset = (idx + 1) < offsets.size() ? offsets.get(idx + 1) : parentScanTask.length(); + return nextOffset - offsets.get(idx); } } + /** + * This iterator returns {@link FileScanTask} that generate tasks that scan fixed amount of data are generated using + * the guidance use split + * offset + * information + * available + */ @VisibleForTesting static final class FixedSizeSplitScanTaskIterator implements Iterator { private long offset; diff --git a/core/src/test/java/org/apache/iceberg/TestOffsetsBasedSplitScanTaskIterator.java b/core/src/test/java/org/apache/iceberg/TestOffsetsBasedSplitScanTaskIterator.java index 0b70ebbf83c3..00f20203b99c 100644 --- a/core/src/test/java/org/apache/iceberg/TestOffsetsBasedSplitScanTaskIterator.java +++ b/core/src/test/java/org/apache/iceberg/TestOffsetsBasedSplitScanTaskIterator.java @@ -25,26 +25,39 @@ import org.junit.Test; public class TestOffsetsBasedSplitScanTaskIterator { + @Test public void testSplits() { // case when the last row group has more than one byte - verify(asList(4L, 10L, 15L, 18L, 30L, 45L), 48L, asList( - asList(4L, 6L), asList(10L, 5L), asList(15L, 3L), asList(18L, 12L), asList(30L, 15L), - asList(45L, 3L))); + verify(asList(4L, 10L, 15L, 18L, 30L, 45L), 48L, 20, asList( + asList(4L, 14L), asList(18L, 12L), asList(30L, 18L))); // case when the last row group has 1 byte - verify(asList(4L, 10L, 15L, 18L, 30L, 45L), 46L, asList( - asList(4L, 6L), asList(10L, 5L), asList(15L, 3L), asList(18L, 12L), asList(30L, 15L), - asList(45L, 1L))); + verify(asList(4L, 10L, 15L, 18L, 30L, 45L), 46L, 20, asList( + asList(4L, 14L), asList(18L, 12L), asList(30L, 16L))); + + // case when every row group is of target split size + verify(asList(4L, 24L, 44L, 64L, 84L, 104L), 124L, 20, asList( + asList(4L, 20L), asList(24L, 20L), asList(44L, 20L), + asList(64L, 20L), asList(84L, 20L), asList(104L, 20L))); + + // case when every row group except last one is of target split size + verify(asList(4L, 24L, 44L, 64L, 84L, 104L), 108L, 20, asList( + asList(4L, 20L), asList(24L, 20L), asList(44L, 20L), + asList(64L, 20L), asList(84L, 20L), asList(104L, 4L))); - // case when there is only one row group - verify(asList(4L), 48L, asList( - asList(4L, 44L))); + // case when target split size is smaller than splits determined by offset boundaries + verify(asList(4L, 24L, 44L, 64L, 84L, 104L), 108L, 2, asList( + asList(4L, 20L), asList(24L, 20L), asList(44L, 20L), + asList(64L, 20L), asList(84L, 20L), asList(104L, 4L))); } - private static void verify(List offsetRanges, long fileLen, List> offsetLenPairs) { + private static void verify(List offsetRanges, long fileLen, + long targetSplitSize, List> offsetLenPairs) { List tasks = Lists.newArrayList( - new BaseFileScanTask.OffsetsBasedSplitScanTaskIterator(offsetRanges, new MockFileScanTask(fileLen))); + new BaseFileScanTask.OffsetsAwareTargetSplitSizeScanTaskIterator(offsetRanges, + new MockFileScanTask(fileLen), + targetSplitSize)); Assert.assertEquals("Number of tasks don't match", offsetLenPairs.size(), tasks.size()); for (int i = 0; i < tasks.size(); i++) { FileScanTask task = tasks.get(i); From 4173d53fb61c2e6a534e40899ee5439e0d2bc2b1 Mon Sep 17 00:00:00 2001 From: Samarth Jain Date: Mon, 3 Jun 2019 15:44:07 -0700 Subject: [PATCH 2/4] Cleanup --- .../src/main/java/org/apache/iceberg/BaseFileScanTask.java | 7 ------- 1 file changed, 7 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/BaseFileScanTask.java b/core/src/main/java/org/apache/iceberg/BaseFileScanTask.java index 20eb487caefb..fb59f48acf37 100644 --- a/core/src/main/java/org/apache/iceberg/BaseFileScanTask.java +++ b/core/src/main/java/org/apache/iceberg/BaseFileScanTask.java @@ -156,13 +156,6 @@ private long getSplitSize(int idx) { } } - /** - * This iterator returns {@link FileScanTask} that generate tasks that scan fixed amount of data are generated using - * the guidance use split - * offset - * information - * available - */ @VisibleForTesting static final class FixedSizeSplitScanTaskIterator implements Iterator { private long offset; From 457e2fb337e6ff022e9137f08481727f616431be Mon Sep 17 00:00:00 2001 From: Samarth Jain Date: Mon, 3 Jun 2019 16:28:25 -0700 Subject: [PATCH 3/4] Address code review comments --- .../org/apache/iceberg/BaseFileScanTask.java | 38 ++++++------------- ...TestOffsetsBasedSplitScanTaskIterator.java | 6 +-- 2 files changed, 14 insertions(+), 30 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/BaseFileScanTask.java b/core/src/main/java/org/apache/iceberg/BaseFileScanTask.java index fb59f48acf37..848cc53c8dc9 100644 --- a/core/src/main/java/org/apache/iceberg/BaseFileScanTask.java +++ b/core/src/main/java/org/apache/iceberg/BaseFileScanTask.java @@ -107,19 +107,16 @@ static final class OffsetsAwareTargetSplitSizeScanTaskIterator implements Iterat private int sizeIdx = 0; OffsetsAwareTargetSplitSizeScanTaskIterator( - List offsetList, - FileScanTask parentScanTask, - long targetSplitSize - ) { + List offsetList, FileScanTask parentScanTask, long targetSplitSize) { this.offsets = ImmutableList.copyOf(offsetList); this.parentScanTask = parentScanTask; this.targetSplitSize = targetSplitSize; - this.splitSizes = new ArrayList<>(offsetList.size()); - int idx = 0; - while (idx < offsets.size()) { - splitSizes.add(getSplitSize(idx)); - idx++; + this.splitSizes = new ArrayList<>(offsets.size()); + int lastIndex = offsets.size() - 1; + for (int index = 0; index < lastIndex; index++) { + splitSizes.add(offsets.get(index + 1) - offsets.get(index)); } + splitSizes.add(parentScanTask.length() - offsets.get(lastIndex)); } @Override @@ -132,28 +129,17 @@ public FileScanTask next() { if (!hasNext()) { throw new NoSuchElementException(); } - // We always pick the current split even if it potentially exceeds the target split size long currentSize = splitSizes.get(sizeIdx); - FileScanTask combinedTask; - sizeIdx++; - while (hasNext()) { - if (currentSize + splitSizes.get(sizeIdx) <= targetSplitSize) { - currentSize += splitSizes.get(sizeIdx); - sizeIdx++; - } else { - combinedTask = new SplitScanTask(offsets.get(offsetIdx), currentSize, parentScanTask); - offsetIdx = sizeIdx; - return combinedTask; - } + sizeIdx += 1; // always consume at least one file split + while (sizeIdx < splitSizes.size() && currentSize + splitSizes.get(sizeIdx) <= targetSplitSize) { + currentSize += splitSizes.get(sizeIdx); + sizeIdx += 1; } - combinedTask = new SplitScanTask(offsets.get(offsetIdx), currentSize, parentScanTask); + FileScanTask combinedTask = new SplitScanTask(offsets.get(offsetIdx), currentSize, parentScanTask); + offsetIdx = sizeIdx; return combinedTask; } - private long getSplitSize(int idx) { - long nextOffset = (idx + 1) < offsets.size() ? offsets.get(idx + 1) : parentScanTask.length(); - return nextOffset - offsets.get(idx); - } } @VisibleForTesting diff --git a/core/src/test/java/org/apache/iceberg/TestOffsetsBasedSplitScanTaskIterator.java b/core/src/test/java/org/apache/iceberg/TestOffsetsBasedSplitScanTaskIterator.java index 00f20203b99c..f5524c89c0b8 100644 --- a/core/src/test/java/org/apache/iceberg/TestOffsetsBasedSplitScanTaskIterator.java +++ b/core/src/test/java/org/apache/iceberg/TestOffsetsBasedSplitScanTaskIterator.java @@ -25,7 +25,6 @@ import org.junit.Test; public class TestOffsetsBasedSplitScanTaskIterator { - @Test public void testSplits() { // case when the last row group has more than one byte @@ -55,9 +54,8 @@ public void testSplits() { private static void verify(List offsetRanges, long fileLen, long targetSplitSize, List> offsetLenPairs) { List tasks = Lists.newArrayList( - new BaseFileScanTask.OffsetsAwareTargetSplitSizeScanTaskIterator(offsetRanges, - new MockFileScanTask(fileLen), - targetSplitSize)); + new BaseFileScanTask.OffsetsAwareTargetSplitSizeScanTaskIterator( + offsetRanges, new MockFileScanTask(fileLen), targetSplitSize)); Assert.assertEquals("Number of tasks don't match", offsetLenPairs.size(), tasks.size()); for (int i = 0; i < tasks.size(); i++) { FileScanTask task = tasks.get(i); From 70ee9335dd9823c5f566b1bad90b4ab510b6ca3f Mon Sep 17 00:00:00 2001 From: Samarth Jain Date: Mon, 3 Jun 2019 21:38:11 -0700 Subject: [PATCH 4/4] offsetIdx doesn't need to be an instance variable --- core/src/main/java/org/apache/iceberg/BaseFileScanTask.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/BaseFileScanTask.java b/core/src/main/java/org/apache/iceberg/BaseFileScanTask.java index 848cc53c8dc9..77ae7a65162f 100644 --- a/core/src/main/java/org/apache/iceberg/BaseFileScanTask.java +++ b/core/src/main/java/org/apache/iceberg/BaseFileScanTask.java @@ -93,7 +93,6 @@ public String toString() { .toString(); } - /** * This iterator returns {@link FileScanTask} using guidance provided by split offsets. */ @@ -103,7 +102,6 @@ static final class OffsetsAwareTargetSplitSizeScanTaskIterator implements Iterat private final List splitSizes; private final FileScanTask parentScanTask; private final long targetSplitSize; - private int offsetIdx = 0; private int sizeIdx = 0; OffsetsAwareTargetSplitSizeScanTaskIterator( @@ -129,6 +127,7 @@ public FileScanTask next() { if (!hasNext()) { throw new NoSuchElementException(); } + int offsetIdx = sizeIdx; long currentSize = splitSizes.get(sizeIdx); sizeIdx += 1; // always consume at least one file split while (sizeIdx < splitSizes.size() && currentSize + splitSizes.get(sizeIdx) <= targetSplitSize) { @@ -136,7 +135,6 @@ public FileScanTask next() { sizeIdx += 1; } FileScanTask combinedTask = new SplitScanTask(offsets.get(offsetIdx), currentSize, parentScanTask); - offsetIdx = sizeIdx; return combinedTask; }