[SPARK-28487][k8s] More responsive dynamic allocation with K8S. #25236
Conversation
This change implements a few changes to the k8s pod allocator so that it behaves a little better when dynamic allocation is on.

(i) Allow the application to ramp up immediately when there's a change in the target number of executors. Without this change, scaling would only trigger when a change happened in the state of the cluster, e.g. an executor going down, or when the periodic snapshot was taken (default every 30s).

(ii) Get rid of pending pod requests, both acknowledged (i.e. Spark knows that a pod is pending resource allocation) and unacknowledged (i.e. Spark has requested the pod but the API server hasn't created it yet), when they're not needed anymore. This avoids starting those executors just to remove them after the idle timeout, wasting resources in the meantime.

(iii) Re-work some of the code to avoid unnecessary logging. While not bad without dynamic allocation, the existing logging was very chatty when dynamic allocation was on. With the changes, all the useful information is still there, but only when interesting changes happen.

(iv) Gracefully shut down executors when they become idle. Just deleting the pod causes a lot of ugly logs to show up, so it's better to ask pods to exit nicely. That also allows Spark to respect the "don't delete pods" option when dynamic allocation is on.

Tested on a small k8s cluster running different TPC-DS workloads.
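The graceful-shutdown-then-kill pattern in (iv) can be sketched as below. This is a minimal stand-in, not the PR's actual code: `GracefulKill`, `scheduleForcefulKill`, and the `kill` callback are invented for the example; in the PR the delay comes from `spark.kubernetes.dynamicAllocation.deleteGracePeriod`.

```scala
import java.util.concurrent.{Executors, TimeUnit}

// Sketch of the idle-executor teardown pattern described above: the
// executor is first asked to exit gracefully (elided here), and a
// forceful kill only fires after a configurable grace period.
object GracefulKill {
  // `kill` stands in for the actual pod deletion.
  def scheduleForcefulKill(gracePeriodMs: Long)(kill: () => Unit): Unit = {
    val scheduler = Executors.newSingleThreadScheduledExecutor()
    val killTask = new Runnable { override def run(): Unit = kill() }
    scheduler.schedule(killTask, gracePeriodMs, TimeUnit.MILLISECONDS)
    // shutdown() still lets the already-scheduled delayed task run.
    scheduler.shutdown()
  }
}
```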
Kubernetes integration test starting
Kubernetes integration test status failure
Test build #108062 has finished for PR 25236 at commit
Kubernetes integration test starting
Kubernetes integration test status failure
Test build #108066 has finished for PR 25236 at commit
retest this please
Kubernetes integration test starting
Kubernetes integration test status failure
Test build #108113 has finished for PR 25236 at commit
Retest this please.
Kubernetes integration test starting
Kubernetes integration test status success
Test build #108148 has finished for PR 25236 at commit
executorService.schedule(killTask, conf.get(KUBERNETES_DYN_ALLOC_KILL_GRACE_PERIOD),
  TimeUnit.MILLISECONDS)

// Return an immediate success, since we can't confirm or deny that executors have bee
bee -> been?
}
}

// Update the flag that helps the setTotalExpectedExecutors() callback avoid trigerring this
trigerring -> triggering
Kubernetes integration test starting
Kubernetes integration test status success
Test build #108415 has finished for PR 25236 at commit
lastSnapshot = snapshots.last
}

val currentRunningExecutors = lastSnapshot.executorPods.values.count {
minor: on a quick read, the naming of these variables makes it a bit unclear whether each one is a list of executors or just a count -- it would be nice to have the counts consistently use ...Count or num...
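The suggested convention can be illustrated with a small stand-in (the names and data below are invented for the example, not the PR's code): since `count` with a predicate returns an `Int`, a `...Count` suffix signals at the call site that the value is a number, not a collection.

```scala
// Illustrative only: a Count suffix (or num prefix) makes clear the
// value is a number of executors, not a collection of them.
object NamingDemo {
  case class Pod(phase: String)

  val executorPods: Map[Long, Pod] =
    Map(1L -> Pod("Running"), 2L -> Pod("Pending"), 3L -> Pod("Running"))

  // `count` with a predicate returns an Int, so name it as a count.
  val runningExecutorsCount: Int =
    executorPods.values.count(_.phase == "Running")
}
```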
.withField("status.phase", "Pending")
.withLabel(SPARK_APP_ID_LABEL, applicationId)
.withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
.withLabelIn(SPARK_EXECUTOR_ID_LABEL, toDelete.sorted.map(_.toString): _*)
does sorting matter here? don't see it mentioned on the k8s api, and you're not doing it above.
Normally it doesn't matter, but it matters when using mocks in the tests (since this is a varargs call, not a parameter that takes a Set).
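The point about mocks can be sketched with a small stand-in (the object and method names below are invented for the example, not Spark's or fabric8's): a varargs call passes its arguments as an ordered Seq, so verifying the call element-by-element only works if the argument order is deterministic -- hence the sort.

```scala
// Stand-in for a varargs call like withLabelIn(...); `recorded`
// captures what a mock verification would compare against.
object VarargsOrderingDemo {
  var recorded: Seq[String] = Nil

  def withLabelIn(key: String, values: String*): Unit = {
    recorded = values.toSeq
  }

  // Sorting first makes the varargs order deterministic even though
  // the executor ids come from an unordered Set.
  def deleteByIds(toDelete: Set[Long]): Unit = {
    withLabelIn("spark-exec-id", toDelete.toSeq.sorted.map(_.toString): _*)
  }
}
```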
// Return an immediate success, since we can't confirm or deny that executors have been
// actually shut down without waiting too long and blocking the allocation thread.
Future.successful(true)
this seems bad. If we get the response wrong, then the ExecutorAllocationManager will mistakenly update its internal state to think the executors have been removed, when they haven't been:
spark/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
Lines 448 to 455 in b29829e
val executorsRemoved = if (testing) {
  executorIdsToBeRemoved
} else {
  // We don't want to change our target number of executors, because we already did that
  // when the task backlog decreased.
  client.killExecutors(executorIdsToBeRemoved, adjustTargetNumExecutors = false,
    countFailures = false, force = false)
}
which means we're expecting that call to kubernetes to delete the pods to be foolproof.
Why is it so bad to wait here? Is it because we are holding locks when making this call in CoarseGrainedSchedulerBackend? could that be avoided?
I added a longer comment explaining this.
The gist is:
- it's bad to wait because it blocks the EAM thread (in this case for a really long time)
- it's ok to return "true" because these executors will all die eventually, whether because of the shutdown message or because of the explicit kill.
The return value, to the best of my understanding, is not meant to say "yes all executors have been killed", but rather "an attempt has been made to remove all of these executors, and they'll die eventually".
(Otherwise there would be no need for the EAM to track which executors are pending removal, since it would know immediately from this return value.)
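The contract described here -- the return value acknowledges that removal was initiated, not that it completed -- can be sketched like this. It is a simplified stand-in, not the actual backend method signature:

```scala
import scala.concurrent.Future

object KillAck {
  // Simplified stand-in for the backend's kill path: the pod
  // deletions are issued asynchronously (elided), and the returned
  // Future only acknowledges that removal was initiated, so the
  // caller (the EAM thread) is never blocked on the Kubernetes API.
  def killExecutors(ids: Seq[String]): Future[Boolean] = {
    // ... issue asynchronous pod deletions for `ids` here ...
    Future.successful(true)
  }
}
```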
ok, thanks, I buy that explanation
Kubernetes integration test starting
Kubernetes integration test status success
Test build #108678 has finished for PR 25236 at commit
lgtm, though I would really like somebody more familiar w/ k8s integration to take a look as well
Let's see if @mccheah has anything to add, otherwise I'll end up pushing before EOW.
retest this please
Kubernetes integration test starting
Kubernetes integration test status success
Test build #109050 has finished for PR 25236 at commit
retest this please
Kubernetes integration test starting
Kubernetes integration test status success
Test build #109062 has finished for PR 25236 at commit
Merging to master.
@@ -330,6 +330,12 @@ private[spark] object Config extends Logging {
    .booleanConf
    .createWithDefault(true)

  val KUBERNETES_DYN_ALLOC_KILL_GRACE_PERIOD =
    ConfigBuilder("spark.kubernetes.dynamicAllocation.deleteGracePeriod")
      .doc("How long to wait for executors to shut down gracefully before a forceful kill.")
@vanzin Does this only work in dynamic allocation mode? Is there any way to delete executors with a grace period in non-dynamic-allocation mode?
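For reference, setting the config added by this PR might look like the following. The master URL is a placeholder and the 30s value is just an example (value format follows Spark's usual time-string conventions):

```shell
# Placeholder invocation: give idle executors 30 seconds to exit
# gracefully before their pods are deleted outright.
spark-submit \
  --master k8s://https://<api-server-host>:443 \
  --conf spark.dynamicAllocation.enabled=true \
  --conf spark.kubernetes.dynamicAllocation.deleteGracePeriod=30s \
  ...
```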