
Add adaptive split size #7688

Draft: wants to merge 1 commit into base: main
Conversation

danielcweeks (Contributor, PR author):
This PR adds adaptive split planning to help address issues where small tables are collapsed into a single task due to split combining. It does this by reducing the split size to target a minimum parallelism, based on coarse-grained table-level stats.

There are a number of cases this approach cannot account for, including the difference between the filtered files/size and the full table size. The most benefit is for unpartitioned tables that either have multiple small files or can be split into smaller chunks to achieve higher parallelism.

Alternatives/additions to this approach would be to modify the bin packing algorithm to distribute across the minimum number of bins based on a reduced split size and only combine once the minimum parallelism can be achieved. This is a more complicated approach and still relies on knowing the right size to split on.

Additionally, we don't know at the time of calculating the split size whether we have offsets, so we cannot simply reduce the split size to zero (or near zero) because fixed split planning will then create too many splits.
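For illustration, here is a minimal, self-contained sketch of the idea (not the PR's actual code); the 16 MB floor, the exact guard conditions, and the class/method names are assumptions made up for this example:

import java.util.OptionalLong;

public class AdaptiveSplitSizeSketch {

  // Shrink the split size for small tables so the scan can fan out to roughly
  // minParallelism tasks; leave large tables alone.
  static OptionalLong adaptiveSplitSize(
      long totalFiles, long totalSizeBytes, long tableSplitSize, int minParallelism) {
    // A large table already yields enough splits at the configured size: do nothing.
    if (totalFiles > minParallelism && totalSizeBytes >= tableSplitSize * minParallelism) {
      return OptionalLong.empty();
    }

    // Otherwise target totalSize / minParallelism, bounded below by an assumed
    // 16 MB floor and above by the configured table split size.
    long floor = 16L * 1024 * 1024;
    long target = Math.max(floor, totalSizeBytes / minParallelism);
    return OptionalLong.of(Math.min(target, tableSplitSize));
  }

  public static void main(String[] args) {
    // A 512 MB unpartitioned table in 4 files, 128 MB default split size, want ~8 tasks:
    System.out.println(adaptiveSplitSize(4, 512L * 1024 * 1024, 128L * 1024 * 1024, 8));
    // prints OptionalLong[67108864], i.e. 64 MB splits instead of 128 MB
  }
}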

@github-actions github-actions bot added the core label May 23, 2023
@danielcweeks danielcweeks marked this pull request as draft May 23, 2023 03:22
@danielcweeks danielcweeks changed the title Add adaptive split planning Add adaptive split size May 25, 2023
@@ -187,7 +193,10 @@ public long targetSplitSize() {
long tableValue =
PropertyUtil.propertyAsLong(
table().properties(), TableProperties.SPLIT_SIZE, TableProperties.SPLIT_SIZE_DEFAULT);
return PropertyUtil.propertyAsLong(context.options(), TableProperties.SPLIT_SIZE, tableValue);

long splitSize = adaptiveSplitSize(tableValue).orElse(tableValue);
Reviewer (Contributor):
Shouldn't the table split size override the adaptive size? If so, we would not want to always pass in a table value.

danielcweeks (Author):
No, because the table's split size is typically just going to be the default. Small tables won't align well with that. Even if it is set to a smaller value, it may still collapse too much.

Reviewer (Contributor):
I don't quite understand. I thought most tables don't have a split size explicitly set and would use the Iceberg default, TableProperties.SPLIT_SIZE_DEFAULT. Is that not the case?

What I meant was that if the split size is explicitly set (not common, I think) then we should always use it because it is static.

Reviewer (Member):
I agree with @rdblue. I thought that by default this table property isn't included in the metadata, so the field should be empty. If the property is explicitly set, we should use the table value.

@@ -256,4 +265,95 @@ private static Schema lazyColumnProjection(TableScanContext context, Schema sche
public ThisT metricsReporter(MetricsReporter reporter) {
return newRefinedScan(table(), schema(), context().reportWith(reporter));
}

private Optional<Long> adaptiveSplitSize(long tableSplitSize) {
if (!PropertyUtil.propertyAsBoolean(
Reviewer (Contributor):
If this can access table properties, why read tableSplitSize outside and pass it in?

danielcweeks (Author):
We can move that in; it was just already being defined outside of this method.

return Optional.empty();
}

int minParallelism =
Reviewer (Contributor):
I think it is probably a good idea to have a context option for this so we can easily pass Spark's parallelism or the Flink operator's parallelism.

Preconditions.checkArgument(minParallelism > 0, "Minimum parallelism must be a positive value");

Snapshot snapshot =
Stream.of(context.snapshotId(), context.toSnapshotId())
Reviewer (Contributor):
I don't know that we want to use the toSnapshotId. I'd need to think through how adaptive split planning would work for incremental reads, and in the meantime it's probably easier to just return Optional.empty instead.

danielcweeks (Author):
Yeah, I wasn't sure about incremental reads, but this would at least get closer to the coarse-grained table stats (e.g., older snapshots will typically have less data).

return Optional.empty();
}

if (totalFiles > minParallelism && totalSize >= tableSplitSize * minParallelism) {
Reviewer (Contributor):
Do you think we would want to use partition stats instead of total size / total files in the future?

danielcweeks (Author):
That might work, and I think we can always enhance this once more stats are available. There's a lot of complexity that gets introduced in terms of filters, projection, partitions, etc., but I think there's a lot of opportunity to improve there as well. For example, we also have column types and record counts, which could lead to even smaller split sizes when we know that little of the data will be read per task.

However, it might also be more beneficial to look at enhancing bin packing because it has more insight into when/how these splits get combined.

TableProperties.DEFAULT_FILE_FORMAT_DEFAULT));

if (!fileFormat.isSplittable()) {
return Optional.of(totalSize / totalFiles);
Reviewer (Contributor):
IIUC, after the check above, we have a case where individual files are not enough to get the parallelism we want. So this is trying to prevent any combining, right?

Why not do that directly with a "do not combine" flag?

danielcweeks (Author):
I don't think we want to do that because there's a big difference between lots of small files and just a few small files. We still want to combine and turning that off could have bad consequences when there are lots of small files.


if (totalFiles <= 1) {
// For a table with a single small file, default to the smallest of
// the configured table split size or the format block size
Reviewer (Contributor):
I think this would be simpler using just the default split size (not the configured one that I think should override).

danielcweeks (Author):
The issue is that if a file is large and has multiple row groups, we only get a single task. In most cases these will either be the same value or the data will be small enough that it won't matter. The only outlier is when there are actually multiple row groups.

// target split size chosen to provide the most parallelism
long targetSplitSize = Math.min(minSplitSize, rowGroupSize);

return Optional.of(targetSplitSize);
Reviewer (Contributor):
Do you also plan to increase the target size if there are too many splits? Or is that not needed?

danielcweeks (Author):
I don't think that's generally going to be the case. Combining still happens and I'm not clear there's a situation where too many splits will be produced.
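To make the single-file row-group case discussed above (Math.min(minSplitSize, rowGroupSize)) concrete, here is a tiny worked example; the 512 MB file and 128 MB row-group figures are assumptions, not values from the PR or its tests:

class SingleFileSplitExample {
  public static void main(String[] args) {
    long fileSize = 512L * 1024 * 1024;      // one data file containing 4 row groups
    long minSplitSize = 512L * 1024 * 1024;  // configured/default table split size
    long rowGroupSize = 128L * 1024 * 1024;  // format block (row-group) size

    // Taking the smaller of the two lets the planner cut the single file at
    // row-group boundaries: 4 splits instead of 1 task for the whole file.
    long targetSplitSize = Math.min(minSplitSize, rowGroupSize);
    System.out.println(fileSize / targetSplitSize); // prints 4
  }
}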

@aokolnychyi (Contributor):
I'd like to take a look on Monday too.

Map<String, String> summary = snapshot.summary();
long totalFiles =
PropertyUtil.propertyAsLong(summary, SnapshotSummary.TOTAL_DATA_FILES_PROP, 0);
long totalSize = PropertyUtil.propertyAsLong(summary, SnapshotSummary.TOTAL_FILE_SIZE_PROP, 0);
Reviewer (Contributor):
I don't think looking at the total snapshot size or even partition stats will be that representative. In my view, knowing the amount of data we scan in a particular query and the number of slots in the cluster is critical. That's why I thought we would implement this feature at a higher level. One way to do that is #7714, where we know the amount of data to scan because we first plan files and then split/combine them into task groups.

I also think it is critical to be bi-directional; we should cover both small and large tables. A task in Spark costs 1+ sec to create. Whenever we scan huge tables, we see a huge difference between a 128 MB split size and, say, 512 MB or 1 GB. However, the challenge right now is that the same table can be accessed in a variety of ways, and setting the split size in each query is not realistic.

Relying on a table property for parallelism seems like shifting the complexity of tuning the split size. It varies from query to query and from cluster to cluster.

danielcweeks (Author):
I think we might be able to combine these two approaches in a reasonable way that's more generalizable.

I don't think looking at the total snapshot size or even partition stats will be that representative. In my view, knowing the amount of data we scan in a particular query and the number of slots in the cluster is critical. That's why I thought we would implement this feature at a higher level.

I agree with this. However, the most common places where this is a problem are really simple cases of unpartitioned tables with very little data. This approach only takes effect if the table size is less than roughly minParallelism * splitSize (with the default 128 MB split size and a minimum parallelism of, say, 8, that is about 1 GB), so pretty much anything over a couple GB wouldn't be affected.

Whenever we scan huge tables, we see a huge difference between 128MB and let's say 512MB or 1GB split size.

We've seen this in a lot of cases, and you may even want to adjust to higher split sizes if you're projecting fewer or smaller columns, because the calculated splits are based on the whole row-group size, but processing a few int columns can be much faster than processing string columns.

Relying on a table property for parallelism seems like shifting the complexity of tuning the split size. It varies from query to query and from cluster to cluster.

I agree here as well, but I was hoping for a solution that wouldn't be Spark-specific. I'm wondering if we can put most of the split-size adjustment logic here and pass the relevant information (scan size, parallelism, etc.) through via the scan context. That way we can leverage those properties in other engines.
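As a sketch of that direction (assumed, not part of this PR), an engine could pass its parallelism through scan options; the "minimum-parallelism" key is hypothetical and only stands in for whatever context option would actually be introduced:

import org.apache.iceberg.Table;
import org.apache.iceberg.TableScan;

class ScanParallelismHint {
  // Hypothetical: "minimum-parallelism" is not an existing Iceberg option; it
  // only illustrates passing an engine's parallelism through the scan context.
  static TableScan scanWithParallelismHint(Table table, int engineParallelism) {
    return table
        .newScan()
        .option("minimum-parallelism", String.valueOf(engineParallelism));
  }
}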

@Test
public void testAdaptiveSplitPlanningWithOffests() {
table.updateProperties().set(TableProperties.ADAPTIVE_SPLIT_PLANNING, "true").commit();

Reviewer (Member):
I think we should have some tests here that show that we are doing the correct thing even if not enough splits are available. For example if we have 1 file with a single offset.

@@ -218,6 +237,17 @@ public void testSplitPlanningWithOffsets() {
"We should get one task per row group", 32, Iterables.size(scan.planTasks()));
}

@Test
public void testAdaptiveSplitPlanningWithOffests() {
table.updateProperties().set(TableProperties.ADAPTIVE_SPLIT_PLANNING, "true").commit();
Reviewer (Member):
This pathway needs more tests imho. We mostly deal with files with offsets, so we should make sure we aren't doing anything weird in those cases. The adaptive function above also has a lot of early exits and behaviors, so we should make sure all of those exits are tested.

@RussellSpitzer (Member):
I think this is mentioned above, but it does feel like we are targeting this at the wrong place. If we have a min parallelism, I think the controls should probably be centered around task coalescing. Currently, for files with offsets, we always break them into the maximal number of offset tasks before recombining. The only real issue is for files without offsets, correct? That's the only reason we may want to control the split size, since those files are cut up based on that property rather than actual offsets.

I wonder if it might be clearer to just have an "offset" codepath that works during recombination and a special codepath for non-offset file types?
