Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-37528][SQL][CORE] Schedule Tasks By Input Size #34791

Closed
wants to merge 9 commits into from

Conversation

ulysses-you
Copy link
Contributor

@ulysses-you ulysses-you commented Dec 3, 2021

What changes were proposed in this pull request?

In general, the larger input data size means longer running time. So ideally, we can let DAGScheduler submit bigger input size task first. It can reduce the whole stage running time.

design doc

this pr add two cases as initialization implementation:

  • datasourcev1 file scan for leaf node
  • coalesce partition spec for AQE

Why are the changes needed?

For example, we have one stage with 4 tasks and the defaultParallelism is 2 and the 4 tasks have different running time [1s, 3s, 2s, 4s].

  • in normal, the running time of the stage is: 7s
    • 1, 2
    • 3, 4
  • if big task first, the running time of the stage is: 5s
    • 4, 1
    • 3, 2

In worse, if we have a skewed task set [1s, 3s, 3s, 7s, 7s, 20s] with the 2 defaultParallelism:

  • in normal, the running time of the stage is: 31s
    • 1, 3, 7, 20
    • 3, 3, 7
  • if big task first, the running time of the stage is: 21s
    • 20, 1
    • 7, 7, 3, 3

Does this PR introduce any user-facing change?

yes, a new config spark.scheduler.sortTasksByInputSize.enabled at core module to decide if we allow to sort tasks.

How was this patch tested?

Add test in:

  • org.apache.spark.scheduler.DAGSchedulerSuite
  • org.apache.spark.sql.execution.ScheduleTasksByInputSizeSuiteBase

@SparkQA
Copy link

SparkQA commented Dec 3, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/50359/

@SparkQA
Copy link

SparkQA commented Dec 3, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/50359/

@SparkQA
Copy link

SparkQA commented Dec 3, 2021

Test build #145884 has finished for PR 34791 at commit 2ccebe0.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@ulysses-you
Copy link
Contributor Author

retest this please

@SparkQA
Copy link

SparkQA commented Dec 3, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/50372/

@SparkQA
Copy link

SparkQA commented Dec 3, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/50372/

@SparkQA
Copy link

SparkQA commented Dec 3, 2021

Test build #145897 has finished for PR 34791 at commit 2ccebe0.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@zhengruifeng
Copy link
Contributor

The idea looks reasonable.
If this functionality can only be used in SQL with AQE enabled, what about just making it a AQEShuffleReadRule which insert/update AQEShuffleReadExecs with reordered ShufflePartitionSpecs?

cc @cloud-fan

@cloud-fan
Copy link
Contributor

The general idea is to make tasks report more statistics so that the task scheduler can schedule them better. This is really a big feature and I'm a bit hesitant to merge any partial improvements without an overall design.

@mridulm
Copy link
Contributor

mridulm commented Feb 10, 2022

Let us hold off on this until #35185 has been merged - else users will have no way to identify partitions in a stage.

@mridulm
Copy link
Contributor

mridulm commented Feb 10, 2022

Btw, this is weak precedence assuming all tasks match the same locality (or no locality) - we should word the config documentation appropriately.

@ulysses-you
Copy link
Contributor Author

Btw, this is weak precedence assuming all tasks match the same locality (or no locality) - we should word the config documentation appropriately.

it's ture, It doest not affect if the tasks have different locality. It just try the best to make larger task run first without breaking the exists task scheduling.

@ulysses-you ulysses-you changed the title [SPARK-37528][SQL][CORE] Support reorder tasks during scheduling by shuffle partition size in AQE [SPARK-37528][SQL][CORE] Schedule Tasks By Input Size Apr 1, 2022
@github-actions
Copy link

We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!

@github-actions github-actions bot added the Stale label Sep 29, 2022
@github-actions github-actions bot closed this Sep 30, 2022
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
5 participants