[SPARK-47458][CORE] Fix the problem with calculating the maximum concurrent tasks for the barrier stage #45528
Conversation
Hi @tgravescs, @Ngone51, could you help review this PR? Thx

// Ensure all executors have been launched.
assert(sc.getExecutorIds().length == 1)
}
// Each executor can only launch one task since `spark.task.cpus` is 2.
Is the comment wrong? The code above sets CPUS_PER_TASK to 1.
Oops, I copied it from somewhere and forgot to delete it. My bad. Fixed in the new commit. Thx
core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala (review comments resolved)
core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala (review comments resolved)
+1
So I just noticed this was linked to the already-closed issue SPARK-45527. Personally I think this should be a separate issue, filed as a bug referencing that one. Can you please file a new issue and update the description, etc.?
On Friday, 15 March 2024 at 06:10, Bobby Wang ***@***.***> wrote:
What changes were proposed in this pull request?
This PR addresses the problem of calculating the maximum concurrent tasks
while evaluating the number of slots for barrier stages, specifically for
the case when the task resource amount is greater than 1.
Why are the changes needed?
test("problem of calculating the maximum concurrent task") {
withTempDir { dir =>
val discoveryScript = createTempScriptWithExpectedOutput(
dir, "gpuDiscoveryScript", """{"name": "gpu","addresses":["0", "1", "2", "3"]}""")
val conf = new SparkConf()
// Set up a local cluster with a single executor that has 6 CPUs and 4 GPUs.
.setMaster("local-cluster[1, 6, 1024]")
.setAppName("test-cluster")
.set(WORKER_GPU_ID.amountConf, "4")
.set(WORKER_GPU_ID.discoveryScriptConf, discoveryScript)
.set(EXECUTOR_GPU_ID.amountConf, "4")
.set(TASK_GPU_ID.amountConf, "2")
// disable barrier stage retry to fail the application as soon as possible
.set(BARRIER_MAX_CONCURRENT_TASKS_CHECK_MAX_FAILURES, 1)
sc = new SparkContext(conf)
TestUtils.waitUntilExecutorsUp(sc, 1, 60000)
// Set up a barrier stage with 2 tasks, each requiring 1 CPU and 2 GPUs.
// The total resource requirement (2 CPUs and 4 GPUs) of this barrier stage
// can be satisfied, since the executor has 6 CPUs and 4 GPUs in total.
assert(sc.parallelize(Range(1, 10), 2)
.barrier()
.mapPartitions { iter => iter }
.collect() sameElements Range(1, 10).toArray[Int])
}
}
In the described test scenario, the executor has 6 CPU cores and 4 GPUs, and each task requires 1 CPU core and 2 GPUs. Consequently, the maximum number of concurrent tasks should be 2, limited by the GPUs (4 GPUs / 2 GPUs per task). However, when attempting to launch the 2 barrier tasks, the `checkBarrierStageWithNumSlots` function gets an incorrect concurrent task limit of 1 instead of 2. This PR fixes that bug.
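The slot arithmetic at the heart of this scenario can be illustrated independently of Spark. The following is a minimal standalone sketch of the intended rule (not Spark's actual implementation): the per-executor concurrency limit is the minimum, across all resources, of how many whole task shares fit into the executor's amount.

```python
def max_concurrent_tasks(executor_resources: dict, task_requirements: dict) -> int:
    """Illustrative only: the per-executor concurrency limit is the
    minimum over all required resources of floor(available / required)."""
    return min(
        executor_resources[name] // amount
        for name, amount in task_requirements.items()
        if amount > 0
    )

# The scenario from the test: 6 CPUs and 4 GPUs per executor,
# each task needs 1 CPU and 2 GPUs -> limited by GPUs to 2 slots.
limit = max_concurrent_tasks({"cpus": 6, "gpu": 4}, {"cpus": 1, "gpu": 2})
```

With these numbers the rule yields min(6 // 1, 4 // 2) = 2, the limit the barrier check should have reported.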
Does this PR introduce *any* user-facing change?
No
How was this patch tested?
The existing and newly added unit tests should pass
Was this patch authored or co-authored using generative AI tooling?
No
Commit Summary
- c9cebda: fix bug to calculate max concurrent number
File Changes (5 files: https://github.com/apache/spark/pull/45528/files)
- M core/src/main/scala/org/apache/spark/scheduler/ExecutorResourceInfo.scala (15)
- M core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala (6)
- M core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala (3)
- M core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala (40)
- M core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala (51)
Patch Links:
- https://github.com/apache/spark/pull/45528.patch
- https://github.com/apache/spark/pull/45528.diff
Done. Thx

merged to master, thanks.
Can we add end-to-end tests? I.e., start a Spark job in local-cluster mode, and check the number of tasks and the GPUs allocated to each Spark task.
We can test the following typical cases:
spark worker cpus=4, spark worker gpus=4, task-cpus = 1, task-gpus=1
spark worker cpus=4, spark worker gpus=4, task-cpus = 1, task-gpus=2
spark worker cpus=4, spark worker gpus=4, task-cpus = 1, task-gpus=4
spark worker cpus=4, spark worker gpus=4, task-cpus = 2, task-gpus=1
spark worker cpus=4, spark worker gpus=4, task-cpus = 4, task-gpus=1
spark worker cpus=4, spark worker gpus=4, task-cpus = 4, task-gpus=2
spark worker cpus=4, spark worker gpus=4, task-cpus = 2, task-gpus=2
spark worker cpus=4, spark worker gpus=4, task-cpus = 4, task-gpus=4
spark worker cpus=4, spark worker gpus=4, task-cpus = 1, task-gpus=3
spark worker cpus=4, spark worker gpus=4, task-cpus = 3, task-gpus=1
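For reference, the expected concurrency for each case in this matrix can be worked out with the same floor-division rule. This is illustrative arithmetic only, not Spark code: with 4 CPUs and 4 GPUs per worker, the number of concurrent tasks is min(4 // task_cpus, 4 // task_gpus).

```python
# Expected concurrent tasks for each (task-cpus, task-gpus) case above,
# assuming a worker with 4 CPUs and 4 GPUs.
WORKER_CPUS, WORKER_GPUS = 4, 4

cases = [(1, 1), (1, 2), (1, 4), (2, 1), (4, 1),
         (4, 2), (2, 2), (4, 4), (1, 3), (3, 1)]

expected = {
    (tc, tg): min(WORKER_CPUS // tc, WORKER_GPUS // tg)
    for tc, tg in cases
}
# e.g. (1, 1) -> 4 concurrent tasks, (1, 2) -> 2, (1, 3) -> 1, (4, 4) -> 1
```

The (1, 3) and (3, 1) cases are the interesting ones for this bug class, since the resource amount does not divide the worker's total evenly.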
Closes #45528 from wbo4958/2-gpu. Authored-by: Bobby Wang <wbo4958@gmail.com>. Signed-off-by: Thomas Graves <tgraves@apache.org>.
…rding to different cpu and gpu configurations

What changes were proposed in this pull request?
Add an end-to-end unit test to ensure that the number of tasks is calculated correctly according to the different task CPU amount and task GPU amount.

Why are the changes needed?
To increase the test coverage. More details can be found at #45528 (comment).

Does this PR introduce any user-facing change?
No

How was this patch tested?
The CI can pass.

Was this patch authored or co-authored using generative AI tooling?
No

Closes #45794 from wbo4958/end2end-test. Authored-by: Bobby Wang <wbo4958@gmail.com>. Signed-off-by: Weichen Xu <weichen.xu@databricks.com>.