Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Spark 3.4: Adaptive split size #7714

Merged
merged 1 commit into from Jul 28, 2023

Conversation

aokolnychyi
Copy link
Contributor

@aokolnychyi aokolnychyi commented May 26, 2023

This PR is an alternative to #7688 and what was initially envisioned by #7465.

@@ -232,6 +234,13 @@ protected synchronized List<ScanTaskGroup<T>> taskGroups() {
return taskGroups;
}

private long targetSplitSize() {
long scanSize = tasks().stream().mapToLong(ScanTask::sizeBytes).sum();
int parallelism = sparkContext().defaultParallelism();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This gives the number of cores in the cluster or spark.default.parallelism if set explicitly.

@@ -232,6 +234,13 @@ protected synchronized List<ScanTaskGroup<T>> taskGroups() {
return taskGroups;
}

private long targetSplitSize() {
long scanSize = tasks().stream().mapToLong(ScanTask::sizeBytes).sum();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This would even handle runtime filtering, so that the number of splits after runtime filtering may be different.

@aokolnychyi
Copy link
Contributor Author

aokolnychyi commented Jun 23, 2023

I gave it a bit of testing on a cluster. In some cases, I experienced quite some degradation when the split size was adjusted to a higher value. The shuffle write time increased quite dramatically when I was processing entire records. I think it is related to the fact that Spark needs to sort the records based on reducer ID during the map phase of a shuffle if the hash shuffle manager is not used (> 200 reducers). There were cases when it helped but it seems too risky to do by default.

I will rework this approach to only pick a smaller split size to utilize all cluster slots.

@aokolnychyi
Copy link
Contributor Author

@rdblue, I've updated the approach after testing it on the cluster. Could you take another look?

@aokolnychyi aokolnychyi reopened this Jun 27, 2023
private long targetSplitSize() {
if (readConf().adaptiveSplitSizeEnabled()) {
long scanSize = tasks().stream().mapToLong(ScanTask::sizeBytes).sum();
int parallelism = sparkContext().defaultParallelism();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why use the default parallelism instead of the shuffle partitions setting? Is this set correctly by default when using dynamic allocation? I've always used the shuffle partitions because that's more likely to be tuned correctly for a job.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The default parallelism is populated via TaskScheduler from SchedulerBackend:

  override def defaultParallelism(): Int = {
    conf.getInt("spark.default.parallelism", math.max(totalCoreCount.get(), 2))
  }

The core count is being updated each time an executor is added/dropped so dynamic allocation should work.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There may be issues if the parallelism is set by a config but I doubt people actually set that. We need to know the number of slots in the cluster and this seems to be the closest. What do you think?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I took another look and believe the current logic would perform better than the number of shuffle partitions.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@aokolnychyi We will use the spark.dynamicAllocation.initialExecutors * spark.executor.cores as the parallelism if dynamic resource allocation is enabled for a newly submitted application. The initial executors maybe is a small number (such as 2) when the application startup. Should this be a problem?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The core count is being updated each time an executor is added/dropped so dynamic allocation should work.

I don't think it would because the job may be planned before the initial stage is submitted and the cluster scales up. I think shuffle parallelism is the most reliable way to know how big to go.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@rdblue, I meant the core count would adjust once the cluster scales up. The initial job may not benefit from this. I wasn't sure whether that is a big deal given that acquiring new executors is generally slow.

I feel we should use the current core count if dynamic allocation is disabled (which we can check). When dynamic allocation is enabled, we can rely on the number of shuffle partitions or check the dynamic allocation config (e.g. we know the core count per each executor and the max number of executors). It seems the dynamic allocation config would give us a more precise estimate.

Thoughts, @rdblue @ConeyLiu?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel we should use the current core count if dynamic allocation is disabled (which we can check).

I agree with this. This should be easy to check and get the parallelism.

When dynamic allocation is enabled, we can rely on the number of shuffle partitions or check the dynamic allocation config (e.g. we know the core count per each executor and the max number of executors). It seems the dynamic allocation config would give us a more precise estimate.

From my option. I would be more likely to calculate the parallelism from the max number of executors. Because the number of shuffle partitions seems more like a parameter for the shuffle stage or reduce stage parallelism.

@@ -80,10 +80,18 @@ abstract class SparkScan implements Scan, SupportsReportStatistics {
this.branch = readConf.branch();
}

protected JavaSparkContext sparkContext() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we get the parallelism a different way? What about exposing just the parallelism and not actually the conf or context?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I moved the method to SparkScan and neither sparkContext nor readConf are exposed now.

@@ -232,6 +232,16 @@ protected synchronized List<ScanTaskGroup<T>> taskGroups() {
return taskGroups;
}

private long targetSplitSize() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't we need this in other places as well? Or does everything go through SparkPartiitoningAwareScan now?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll check but I think so.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The only place we miss is SparkChangelogScan as it plans tasks directly. We can update it later to plan files first or it will automatically inherit this functionality once we support adaptive split size in core.

Copy link
Contributor

@rdblue rdblue left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Overall, this is a good change. I like that it is simpler than the other approach, which is limited by passing parallelism through the scan anyway. That makes the alternative require a lot more changes for not a lot of benefit.

My only issue with this is how it's plugged in. Seems like we should be using shuffle parallelism, I don't think I'd add a Spark SQL property, and I'd prefer if it were a little cleaner (not exposing sparkContext()). We also need to make sure this is applied everywhere, but I think this was just for demonstration not really to commit yet?

@aokolnychyi aokolnychyi merged commit 869301b into apache:master Jul 28, 2023
41 checks passed
@puchengy
Copy link
Contributor

@aokolnychyi is there a plan to port this to spark 3.2 ? thanks

@aokolnychyi
Copy link
Contributor Author

@puchengy, we can, we have to discuss the best way to determine the parallelism, though.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

4 participants