[SPARK-31105][CORE] Respect sql execution id for FIFO scheduling mode #27871

Closed
liupc wants to merge 1 commit into apache:master from liupc:SPARK-31105

Conversation

@liupc

@liupc liupc commented Mar 11, 2020

What changes were proposed in this pull request?

Currently, Spark sorts taskSets by jobId and stageId and then schedules them in that order under the FIFO scheduling mode. In OLAP scenarios, especially under high concurrency, the taskSets usually come from different sql queries, and several jobs can be submitted for execution at one time for a single query, especially with adaptive execution. But we currently order those taskSets without considering the execution group, which may cause the query to be delayed.

So I propose to consider the sql execution id when scheduling jobs.
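As a rough illustration of the proposal (not the actual patch; `TaskSetInfo` and `ExecutionAwareFifo` are hypothetical names, not Spark's classes), a FIFO ordering that groups task sets by sql execution id before falling back to jobId and stageId could look like:

```scala
// Hypothetical sketch: compare task sets by SQL execution id first, then by
// jobId, then stageId, so all stages of an earlier query are scheduled
// before a later query's stages.
case class TaskSetInfo(executionId: Long, jobId: Int, stageId: Int)

object ExecutionAwareFifo {
  // Tuple ordering gives lexicographic comparison across the three fields.
  val ordering: Ordering[TaskSetInfo] =
    Ordering.by((ts: TaskSetInfo) => (ts.executionId, ts.jobId, ts.stageId))
}
```

With this ordering, a task set from execution 1 always sorts before one from execution 2, even if the latter has a smaller jobId.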

Why are the changes needed?

Improvements

Does this PR introduce any user-facing change?

No

How was this patch tested?

existing UT & added UT

@dongjoon-hyun
Member

ok to test

    var priority = taskSet.priority
    var stageId = taskSet.stageId
    val priority = if (taskSet.properties != null) {
      taskSet.properties.getProperty("spark.sql.execution.id", "0").toLong

Although we have a similar one inside AppStatusListener already, this means another implicit dependency on the sql module.


BTW, you may want to define private val SQL_EXECUTION_ID_KEY = "spark.sql.execution.id" like AppStatusListener does. Or, if we need this, we may need to define it at a higher, common place.
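The suggestion could look roughly like this (the `SqlExecutionId` object and its `from` helper are hypothetical; only the property key string itself comes from Spark):

```scala
import java.util.Properties

// Sketch: hoist the property key into a named constant instead of
// repeating the string literal at each use site.
object SqlExecutionId {
  private val SQL_EXECUTION_ID_KEY = "spark.sql.execution.id"

  // Default to 0 when the job was not submitted as part of a SQL query
  // (or the task set carries no properties at all).
  def from(props: Properties): Long =
    if (props == null) 0L
    else props.getProperty(SQL_EXECUTION_ID_KEY, "0").toLong
}
```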

    val taskScheduler = new TaskSchedulerImpl(sc)

    val rootPool = new Pool("", FIFO, 0, 0)
    val schedulableBuilder = new FairSchedulableBuilder(rootPool, conf)

FairSchedulableBuilder -> FIFOSchedulableBuilder.

@dongjoon-hyun
Member

How do you think about this, @gatorsmile and @cloud-fan ?

@SparkQA

SparkQA commented Mar 16, 2020

Test build #119893 has finished for PR 27871 at commit 001b234.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@jiangxb1987
Contributor

IIUC Spark doesn't optimize the workload toward minimizing query delay time. Actually, I think scheduling stages from the same sql execution together would make the few nodes holding the shuffle output files very hot, thus leading to worse performance for the whole cluster compared to the current approach.

@jiangxb1987
Contributor

Also, please consider the case where you submit a sql query that requires so many slots that it blocks all other queries from executing. With the current approach, other small queries still get a chance to run between two huge stages; after your change, every small query has to wait until the first big query finishes.

@dongjoon-hyun
Member

+1 for @jiangxb1987 's comments. BTW, I guess that's the main reason @liupc tried this on the FIFO scheduler.

@cloud-fan
Contributor

For small queries, usually they won't hit this problem. For big queries, the query latency shouldn't matter too much?

@liupc have you tried this in real-world workloads?

@dongjoon-hyun
Member

Ping, @liupc . If there is no other reason, shall we close this PR according to the review comments?

@liupc
Author

liupc commented Mar 20, 2020

For small queries, usually they won't hit this problem. For big queries, the query latency shouldn't matter too much?

@liupc have you tried this in real-world workloads?

Yes, we tried this in real workloads, and it performs better, especially when there are lots of taskSets to schedule in one scheduling round. This is obvious with adaptive execution. Also, I think this is what FIFO should do.
A query usually maps to several jobs, and if several jobs are delayed for this reason, the total delay is significant. Suppose each job takes 2 minutes; if there are 10 jobs ahead of ours and the cores are fully used, then our job waits 20 minutes to be scheduled. What's worse, with adaptive execution, when the next batch of jobs is submitted it may hit this issue again, which can greatly hurt the query duration. And since each query may have the same issue, they all slow down.
Also, users will see lots of running jobs for later-coming queries in the Spark UI, which is confusing.
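The back-of-the-envelope arithmetic above can be checked directly (all figures are the commenter's hypothetical numbers, not measurements):

```scala
// 10 jobs of ~2 minutes each queued ahead of ours, cluster fully utilized:
// under plain FIFO each queued job runs to completion before ours starts.
val jobDurationMin = 2
val jobsAhead = 10
val waitMin = jobsAhead * jobDurationMin  // minutes before our job is scheduled
```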

@liupc
Author

liupc commented Mar 20, 2020

IIUC Spark doesn't optimize the workload toward minimizing query delay time. Actually, I think scheduling stages from the same sql execution together would make the few nodes holding the shuffle output files very hot, thus leading to worse performance for the whole cluster compared to the current approach.

In real clusters, resources are more important than locality. And users expect FIFO to behave like this.

@dongjoon-hyun
Member

Hi, @liupc .

  • Currently, Core module's FIFO concept doesn't mean SQL level execution. The following is not Apache Spark's contract.

    And users expect the FIFO behave like this.

  • Also, this is a trade-off. And, in general, this hurts the global throughput (as @jiangxb1987 mentioned already). Apache Spark cannot accept that kind of general performance degradation for those rare use cases. Please note that not every user is a SQL user.

  • Lastly, I'm -1 for the current architectural design which makes a dependency from core module's TaskSetManager to the external sql module.

Given that, this PR will not be considered mergeable. If you want to proceed, please split the scopes. First, focus on adding a new option in the core module to respect job-level priority. The configuration should be false by default. After that suggestion is accepted into the master branch, you can make a second PR adding another option for the whole SQL query optimization.

Thanks!

@liupc
Author

liupc commented Mar 23, 2020

Thanks @dongjoon-hyun , let's split the scopes and add an option to respect jobGroup-level priority in the core module.
And I think even with the current approach the congestion issue is serious, so this PR is not meant to solve it; I proposed another PR for that: #27862
I really think this is helpful for OLAP scenarios, and we tested this on real workloads at Xiaomi.

@dongjoon-hyun
Member

Thank you, @liupc . I hope Apache Spark can improve your use cases in your environment in some way. Please move forward. I'll close this PR (AS-IS). You can reopen it later, after the core module part is ready independently.
