
[SPARK-31949][SQL] Add spark.default.parallelism in SQLConf for isolated across session #28778

Closed
wants to merge 5 commits

Conversation

@ulysses-you (Contributor) commented Jun 10, 2020

What changes were proposed in this pull request?

Add a new config `spark.sql.default.parallelism`; its behavior is the same as `spark.default.parallelism`.
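For illustration only (not part of the diff): a session-level override would be set like any other SQL conf, so different sessions sharing one SparkContext can pick different values. The values 8 and 64 below are just examples.

```scala
// Hypothetical usage of the proposed config: each SparkSession sets its own value,
// while spark.default.parallelism stays shared by the whole SparkContext.
val spark = org.apache.spark.sql.SparkSession.builder().getOrCreate()
spark.conf.set("spark.sql.default.parallelism", 8)    // example value for this session
val other = spark.newSession()
other.conf.set("spark.sql.default.parallelism", 64)   // a different value in another session
```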

Why are the changes needed?

For session isolation. In concurrent scenarios we need to determine the parallelism session by session.

One case:
When multiple SQL queries run in the same SparkContext, we should split files into partitions more carefully so that one query does not use the total parallelism.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Add UT.

@ulysses-you (Contributor, Author)

@maropu @cloud-fan thanks for the review.

@ulysses-you (Contributor, Author)

If the config is needed, I will then move the existing defaultParallelism usages in the sql module over to it.

 * @since 3.1.0
 */
def defaultParallelism: Int = {
  sessionState.conf.defaultParallelism.getOrElse(sparkContext.defaultParallelism)
}
Contributor

so we add a config, whose only usage is to let users get the config value?

@ulysses-you (Contributor, Author)

As I said above, if we add this config, I will move the existing defaultParallelism usages in the sql module in a follow-up, e.g. FilePartition.maxSplitBytes().
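For context (an editor's sketch that closely follows the Spark 3.0 source, though details may differ), the current file-split logic reads the shared `sparkContext.defaultParallelism`, which is what this discussion is about:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.datasources.PartitionDirectory

// Sketch of FilePartition.maxSplitBytes (Spark-internal code): the split size is derived
// from the context-wide defaultParallelism, so all sessions share the same behavior.
def maxSplitBytes(
    sparkSession: SparkSession,
    selectedPartitions: Seq[PartitionDirectory]): Long = {
  val defaultMaxSplitBytes = sparkSession.sessionState.conf.filesMaxPartitionBytes
  val openCostInBytes = sparkSession.sessionState.conf.filesOpenCostInBytes
  val defaultParallelism = sparkSession.sparkContext.defaultParallelism
  val totalBytes = selectedPartitions.flatMap(_.files.map(_.getLen + openCostInBytes)).sum
  val bytesPerCore = totalBytes / defaultParallelism
  Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
}
```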

@ulysses-you (Contributor, Author)

just do this in this PR?

Contributor

please do, otherwise it's a useless config

@maropu (Member) commented Jun 10, 2020

Yea, having a session-local default parallelism param on the SQL side looks fine to me. But, as @cloud-fan said above, you need more work to apply the param to the existing SQL logic.

@ulysses-you (Contributor, Author)

@cloud-fan @maropu
I searched everywhere in the sql module; please tell me if I missed anything.

@maropu (Member) left a comment

Also, could you add some tests?

@@ -371,6 +371,14 @@ object SQLConf {
    .booleanConf
    .createWithDefault(true)

  val DEFAULT_PARALLELISM = buildConf("spark.sql.default.parallelism")
Member

spark.sql.default.parallelism -> spark.sql.sessionLocalDefaultParallelism?

@ulysses-you (Contributor, Author)

Em.. isn't it better to keep it similar to spark.default.parallelism, so that we can set this config easily? sessionLocalDefaultParallelism seems complex.

@@ -371,6 +371,14 @@ object SQLConf {
    .booleanConf
    .createWithDefault(true)

  val DEFAULT_PARALLELISM = buildConf("spark.sql.default.parallelism")
    .doc("This config behavior is same as spark.default.parallelism, and this value can be " +
      "isolated across sessions. Note: always use sc.defaultParallelism as default number.")
@maropu (Member) commented Jun 10, 2020

How about this?

The session-local default number of partitions; this value is widely used inside physical plans.
If not set, the physical plans refer to `spark.default.parallelism` instead.
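For illustration, the full SQLConf entry with that wording might read roughly like this. The builder chain below is an assumption (an optional int conf matches the `getOrElse` fallback shown earlier), not the exact diff in this PR:

```scala
// Sketch only, intended for SQLConf.scala; the name follows the PR, the chain is assumed.
val DEFAULT_PARALLELISM = buildConf("spark.sql.default.parallelism")
  .doc("The session-local default number of partitions; this value is widely used inside " +
    "physical plans. If not set, the physical plans refer to spark.default.parallelism instead.")
  .intConf
  .checkValue(_ > 0, "The default parallelism must be positive.")
  .createOptional
```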

@SparkQA commented Jun 10, 2020

Test build #123746 has finished for PR 28778 at commit 5c6d661.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Jun 10, 2020

Test build #123736 has finished for PR 28778 at commit 35b92c2.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Jun 10, 2020

Test build #123752 has finished for PR 28778 at commit 27e113b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Jun 10, 2020

Test build #123758 has finished for PR 28778 at commit 55d367f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

*
* @since 3.1.0
*/
def defaultParallelism: Int = {
Contributor

I'd like to not have this API, as SparkSession should provide high-level logical APIs, not physical ones.

@cloud-fan (Contributor)

After more thought, I'm wondering what the real use case for it is.

The default parallelism depends on the cluster resources, and it looks weird if different sessions can have different default parallelism.

Looking at the changes in this PR, I think most of them don't really need a per-session config to tune it. The only place looks reasonable is where we split file partitions. Maybe we can just add a new config to do fine-grained control of the file partition splitting?

@ulysses-you (Contributor, Author)

Actually, my first thought was the file partition split, and I tried to add another config to control it, like filesDefaultParallelism. After that I wondered whether it was too special a config, since it would only control file parallelism. Parallelism in Spark is everywhere, and in the SQL scene files are just the first-class citizens. Finally I changed to adding a common config, the same as spark.default.parallelism.

I also don't see a reasonable case beyond file partition splitting right now. If you think it's not needed, I'm OK with that.

@cloud-fan (Contributor)

The most confusing part is that default parallelism is more of a physical thing (related to cluster resources), and it's weird to have a per-session setting for it.

@ulysses-you (Contributor, Author) commented Jun 12, 2020

How about defaultSessionParallelism or sessionDefaultParallelism? That sounds more logical.

@ulysses-you (Contributor, Author)

Any new thoughts? @maropu @cloud-fan

Also cc @HyukjinKwon @dongjoon-hyun @viirya

@cloud-fan (Contributor)

Parallelism is a physical concept already. Can you explain more about how you are going to tune the file partition split? What are the problems you hit?

@ulysses-you (Contributor, Author)

Yeah, parallelism is a physical concept, but it is also shared among sessions.

I use a long-lived Spark application with enough cores and memory (which means defaultParallelism is big), so SQL queries can run in parallel and share the resources.

Some queries read Hive tables that contain small files, and as a result one query may hold all the task resources. I tried to increase the file size per partition to reduce the partition number so that other queries could be assigned more tasks, but all I can do is reduce defaultParallelism, and that change affects every query.

As said above, I think Spark needs to provide a way to control parallelism per query/session (in this case, file parallelism) so that a user can reduce the parallelism when one query reads small files.

@cloud-fan (Contributor)

After more thought, I think the file partition split logic itself is problematic. Its target is to make the number of partitions the same as the total number of cores, which doesn't make sense as the cluster may only have a few free cores.

I think a proper way is to set an expected size for each partition, like 64MB. This is also what we do when coalescing shuffle partitions in AQE. Can we add such a config?
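For reference (an editorial note, not part of the thread): the AQE analogue referred to here is, as of Spark 3.0, the advisory partition size config, which the suggestion would mirror for file splitting.

```scala
// Spark 3.0 AQE: advisory target size used when coalescing shuffle partitions.
// An analogous per-partition-size control for file splitting is what is proposed above.
val spark = org.apache.spark.sql.SparkSession.builder().getOrCreate()
spark.conf.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "64m")
```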

@ulysses-you (Contributor, Author)

Actually, AQE CoalesceShufflePartitions also uses defaultParallelism to decide the size. Here is the key code:

// CoalesceShufflePartitions: fall back to sparkContext.defaultParallelism when the
// min-partition-num config is not set.
val minPartitionNum = conf.getConf(SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM)
  .getOrElse(session.sparkContext.defaultParallelism)
...
// Derive the target partition size from the total input size and the minimum partition number.
val maxTargetSize = math.max(
  math.ceil(totalPostShuffleInputSize / minNumPartitions.toDouble).toLong, 16)
val targetSize = math.min(maxTargetSize, advisoryTargetSize)
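A quick worked example (made-up numbers) of how the minimum partition number, and therefore defaultParallelism, drives the target size:

```scala
// Illustrative values only.
val totalPostShuffleInputSize = 1024L * 1024 * 1024   // 1 GiB of shuffle data
val advisoryTargetSize = 64L * 1024 * 1024            // 64 MiB advisory size
def target(minNumPartitions: Int): Long = math.min(
  math.max(math.ceil(totalPostShuffleInputSize / minNumPartitions.toDouble).toLong, 16),
  advisoryTargetSize)
target(200)  // ≈ 5.4 MB  -> many small partitions when defaultParallelism is large
target(10)   // = 64 MiB  -> the advisory size caps it when defaultParallelism is small
```

A larger minimum partition number (i.e., a larger defaultParallelism) therefore yields smaller target partitions, which is the coupling being discussed.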

In other words, COALESCE_PARTITIONS_MIN_PARTITION_NUM provides a way to override defaultParallelism, and this behavior is just like the sessionDefaultParallelism we are discussing.

In the file split we already have the configs maxPartitionBytes and openCostInBytes. With the correct parallelism it would work well. There is no need to add another config to control partition size, and if we did that we would never use the total core resources of the Spark application.

BTW, the file partition split algorithm is similar between CoalesceShufflePartitions and FilePartition: advisoryTargetSize is just like maxPartitionBytes, and we can try to add an openCostInBytes to AQE CoalesceShufflePartitions in the future after a performance check.

I still think it is necessary to control parallelism per session. At the least, we should add a config to control file parallelism.

@cloud-fan (Contributor)

So it seems we just need to add a min-partition-num config for the file source?
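An editor's sketch of how such a config could slot into the file split; the conf accessor name `filesMinPartitionNum` is an assumption here, not the actual follow-up diff:

```scala
// Hypothetical fragment inside FilePartition.maxSplitBytes: prefer a session-level
// file-source min-partition-num config, falling back to the shared defaultParallelism.
// totalBytes, defaultMaxSplitBytes and openCostInBytes are as in the existing method.
val minPartitionNum = sparkSession.sessionState.conf.filesMinPartitionNum
  .getOrElse(sparkSession.sparkContext.defaultParallelism)
val bytesPerCore = totalBytes / minPartitionNum
Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
```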

@ulysses-you (Contributor, Author)

Yes, that's a way to do it.

@ulysses-you (Contributor, Author)

@cloud-fan
I will send another PR later to add a min-partition-num config for the file source.

dongjoon-hyun pushed a commit that referenced this pull request Jun 21, 2020
### What changes were proposed in this pull request?

Add a new config `spark.sql.files.minPartitionNum` to control file split partitions at the session level.

### Why are the changes needed?

Aims to control file split partitions at the session level.
For more details, see the discussion in [PR-28778](#28778).

### Does this PR introduce _any_ user-facing change?

Yes, a new config.

### How was this patch tested?

Add UT.

Closes #28853 from ulysses-you/SPARK-32019.

Authored-by: ulysses <youxiduo@weidian.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
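For completeness, a usage sketch of the config added by that follow-up; the table name and the value 20 are illustrative, not from the PR:

```scala
// Per-session tuning of file splitting via the SPARK-32019 config: when set, the
// file split uses it instead of the shared sparkContext.defaultParallelism.
val spark = org.apache.spark.sql.SparkSession.builder().getOrCreate()
spark.conf.set("spark.sql.files.minPartitionNum", 20L)           // example value
spark.read.table("small_files_table").rdd.getNumPartitions       // hypothetical table
```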
@github-actions
We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!
