Spark 3.4: Adaptive split size
aokolnychyi committed Jun 6, 2023
1 parent 04b3e6b commit 4a7468e
Showing 7 changed files with 135 additions and 2 deletions.
9 changes: 9 additions & 0 deletions core/src/main/java/org/apache/iceberg/TableProperties.java
@@ -207,6 +207,15 @@ private TableProperties() {}
public static final String SPLIT_SIZE = "read.split.target-size";
public static final long SPLIT_SIZE_DEFAULT = 128 * 1024 * 1024; // 128 MB

public static final String ADAPTIVE_SPLIT_SIZE_ENABLED = "read.split.adaptive-size.enabled";
public static final boolean ADAPTIVE_SPLIT_SIZE_ENABLED_DEFAULT = false;

public static final String ADAPTIVE_SPLIT_MIN_SIZE_BYTES = "read.split.min-adaptive-size-bytes";
public static final long ADAPTIVE_SPLIT_MIN_SIZE_BYTES_DEFAULT = 16 * 1024 * 1024; // 16 MB

public static final String ADAPTIVE_SPLIT_MAX_SIZE_BYTES = "read.split.max-adaptive-size-bytes";
public static final long ADAPTIVE_SPLIT_MAX_SIZE_BYTES_DEFAULT = 512 * 1024 * 1024; // 512 MB

public static final String METADATA_SPLIT_SIZE = "read.split.metadata-target-size";
public static final long METADATA_SPLIT_SIZE_DEFAULT = 32 * 1024 * 1024; // 32 MB

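These table-level defaults can be overridden per table through the standard properties API. A minimal sketch, assuming `table` is an org.apache.iceberg.Table already loaded from a catalog; the 32 MB / 256 MB bounds are illustrative, not part of the diff:

import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;

// Enable adaptive split sizing and tighten the bounds for this table.
table
    .updateProperties()
    .set(TableProperties.ADAPTIVE_SPLIT_SIZE_ENABLED, "true")
    .set(TableProperties.ADAPTIVE_SPLIT_MIN_SIZE_BYTES, String.valueOf(32L * 1024 * 1024)) // illustrative
    .set(TableProperties.ADAPTIVE_SPLIT_MAX_SIZE_BYTES, String.valueOf(256L * 1024 * 1024)) // illustrative
    .commit();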
28 changes: 28 additions & 0 deletions core/src/main/java/org/apache/iceberg/util/TableScanUtil.java
@@ -246,6 +246,34 @@ public static <T extends ScanTask> List<T> mergeTasks(List<T> tasks) {
return mergedTasks;
}

public static long computeSplitSize(
long scanSize, int parallelism, long minSplitSize, long maxSplitSize) {

Preconditions.checkArgument(
minSplitSize < maxSplitSize,
"Min split size (%s) must be < max split size (%s)",
minSplitSize,
maxSplitSize);

// aim for a split per slot by default
int splitCount = parallelism;
long splitSize = scanSize / splitCount;

// if the target split size is too big, try to plan for 2x, 3x, etc. the parallelism
// to keep all available cluster resources busy and avoid idle slots
while (splitSize > maxSplitSize) {
splitCount += parallelism;
splitSize = scanSize / splitCount;
}

// split packing won't be perfect, so increase the target split size by 10%
// to avoid creating just a few extra tasks beyond the parallelism
long adjustedSplitSize = (long) (1.1 * splitSize);

// ensure the split size is big enough
return Math.max(minSplitSize, adjustedSplitSize);
}

private static void validatePlanningArguments(long splitSize, int lookback, long openFileCost) {
Preconditions.checkArgument(splitSize > 0, "Split size must be > 0: %s", splitSize);
Preconditions.checkArgument(lookback > 0, "Split planning lookback must be > 0: %s", lookback);
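In effect, computeSplitSize picks the smallest multiple of the parallelism that brings the raw split size under the maximum, then pads it by 10%. A minimal sketch of a call with the figures exercised in the unit test that follows:

import org.apache.iceberg.util.TableScanUtil;

// 750 GiB scan on 80 slots: one split per slot would be ~9.4 GiB, so the
// loop grows the split count in steps of 80 until 19 * 80 = 1520 splits
// bring the raw size to ~505 MiB (under the 512 MiB max); the 10% bump
// then yields a target of ~556 MiB.
long scanSize = 750L * 1024 * 1024 * 1024;
long splitSize =
    TableScanUtil.computeSplitSize(
        scanSize, 80, 16L * 1024 * 1024, 512L * 1024 * 1024);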
47 changes: 47 additions & 0 deletions core/src/test/java/org/apache/iceberg/util/TestTableScanUtil.java
@@ -78,6 +78,53 @@ private DeleteFile[] deleteFilesWithSizes(long... sizes) {
.toArray(DeleteFile[]::new);
}

@Test
public void testAdaptiveSplitSize() {
long scanSize = 750L * 1024 * 1024 * 1024; // 750 GB
long minSplitSize = 16 * 1024 * 1024; // 16 MB
long maxSplitSize = 512 * 1024 * 1024; // 512 MB

// targeting a split per slot with 80 slots would mean splits with 9+ GB of data each;
// that's why the utility targets 19 * 80 = 1520 splits, the smallest multiple of the
// parallelism that produces a split size no larger than the maximum
int smallParallelism = 80;
long expectedSplitSizeSmallParallelism = (long) (1.1 * (scanSize / 1520));
Assert.assertEquals(
"Split size must match",
expectedSplitSizeSmallParallelism,
TableScanUtil.computeSplitSize(scanSize, smallParallelism, minSplitSize, maxSplitSize));

// targeting a split per slot with 5K slots would produce well-sized splits
int reasonableParallelism = 5_000;
long expectedSplitSizeReasonableParallelism = (long) (1.1 * (scanSize / reasonableParallelism));
Assert.assertEquals(
"Split size must match",
expectedSplitSizeReasonableParallelism,
TableScanUtil.computeSplitSize(
scanSize, reasonableParallelism, minSplitSize, maxSplitSize));

// targeting a split per slot with 100K slots would produce tiny splits
// that's why the min split size is used
int bigParallelism = 100_000;
Assert.assertEquals(
"Split size must match",
minSplitSize,
TableScanUtil.computeSplitSize(scanSize, bigParallelism, minSplitSize, maxSplitSize));
}

@Test
public void testInvalidMinMaxAdaptiveSplitSize() {
long scanSize = 750L * 1024 * 1024 * 1024; // 750 GB
int parallelism = 100;
long minSplitSize = 512 * 1024 * 1024; // 512 MB
long maxSplitSize = 16 * 1024 * 1024; // 16 MB

Assertions.assertThatThrownBy(
() -> TableScanUtil.computeSplitSize(scanSize, parallelism, minSplitSize, maxSplitSize))
.isInstanceOf(IllegalArgumentException.class)
.hasMessageContaining("Min split size (536870912) must be < max split size (16777216)");
}

@Test
public void testPlanTaskWithDeleteFiles() {
List<FileScanTask> testFiles =
spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java
@@ -267,4 +267,29 @@ public boolean aggregatePushDownEnabled() {
.defaultValue(SparkSQLProperties.AGGREGATE_PUSH_DOWN_ENABLED_DEFAULT)
.parse();
}

public boolean adaptiveSplitSizeEnabled() {
return confParser
.booleanConf()
.sessionConf(SparkSQLProperties.ADAPTIVE_SPLIT_SIZE_ENABLED)
.tableProperty(TableProperties.ADAPTIVE_SPLIT_SIZE_ENABLED)
.defaultValue(TableProperties.ADAPTIVE_SPLIT_SIZE_ENABLED_DEFAULT)
.parse();
}

public long minAdaptiveSplitSize() {
return confParser
.longConf()
.tableProperty(TableProperties.ADAPTIVE_SPLIT_MIN_SIZE_BYTES)
.defaultValue(TableProperties.ADAPTIVE_SPLIT_MIN_SIZE_BYTES_DEFAULT)
.parse();
}

public long maxAdaptiveSplitSize() {
return confParser
.longConf()
.tableProperty(TableProperties.ADAPTIVE_SPLIT_MAX_SIZE_BYTES)
.defaultValue(TableProperties.ADAPTIVE_SPLIT_MAX_SIZE_BYTES_DEFAULT)
.parse();
}
}
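As the confParser chains show, the enabled flag can be set per session or per table (the session key is defined in SparkSQLProperties below), while the min/max bounds are read from table properties only. A minimal sketch of the session-level toggle, assuming an active SparkSession:

import org.apache.spark.sql.SparkSession;

// Turn on adaptive split sizing for every Iceberg scan in this session.
SparkSession spark = SparkSession.builder().getOrCreate();
spark.conf().set("spark.sql.iceberg.planning.adaptive-split-size.enabled", "true");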
spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java
@@ -48,6 +48,10 @@ private SparkSQLProperties() {}
"spark.sql.iceberg.planning.preserve-data-grouping";
public static final boolean PRESERVE_DATA_GROUPING_DEFAULT = false;

// Controls whether to estimate the split size adaptively based on the scan size and parallelism
public static final String ADAPTIVE_SPLIT_SIZE_ENABLED =
"spark.sql.iceberg.planning.adaptive-split-size.enabled";

// Controls whether to push down aggregate (MAX/MIN/COUNT) to Iceberg
public static final String AGGREGATE_PUSH_DOWN_ENABLED =
"spark.sql.iceberg.aggregate-push-down.enabled";
spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPartitioningAwareScan.java
@@ -198,7 +198,7 @@ protected synchronized List<ScanTaskGroup<T>> taskGroups() {
CloseableIterable<ScanTaskGroup<T>> plannedTaskGroups =
TableScanUtil.planTaskGroups(
CloseableIterable.withNoopClose(tasks()),
-            scan.targetSplitSize(),
+            targetSplitSize(),
scan.splitLookback(),
scan.splitOpenFileCost());
this.taskGroups = Lists.newArrayList(plannedTaskGroups);
@@ -212,7 +212,7 @@ protected synchronized List<ScanTaskGroup<T>> taskGroups() {
List<ScanTaskGroup<T>> plannedTaskGroups =
TableScanUtil.planTaskGroups(
tasks(),
-            scan.targetSplitSize(),
+            targetSplitSize(),
scan.splitLookback(),
scan.splitOpenFileCost(),
groupingKeyType());
@@ -232,6 +232,18 @@ protected synchronized List<ScanTaskGroup<T>> taskGroups() {
return taskGroups;
}

private long targetSplitSize() {
if (readConf().adaptiveSplitSizeEnabled()) {
long scanSize = tasks().stream().mapToLong(ScanTask::sizeBytes).sum();
int parallelism = sparkContext().defaultParallelism();
long minSplitSize = readConf().minAdaptiveSplitSize();
long maxSplitSize = readConf().maxAdaptiveSplitSize();
return TableScanUtil.computeSplitSize(scanSize, parallelism, minSplitSize, maxSplitSize);
} else {
return scan.targetSplitSize();
}
}

// only task groups can be reset while resetting tasks
// the set of scanned specs and grouping key type must never change
protected void resetTasks(List<T> filteredTasks) {
spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java
@@ -80,10 +80,18 @@ abstract class SparkScan implements Scan, SupportsReportStatistics {
this.branch = readConf.branch();
}

protected JavaSparkContext sparkContext() {
return sparkContext;
}

protected Table table() {
return table;
}

protected SparkReadConf readConf() {
return readConf;
}

protected String branch() {
return branch;
}
