
[SPARK-23365][CORE] Do not adjust num executors when killing idle executors. #20604

Closed
wants to merge 5 commits

Conversation

Contributor

@squito squito commented Feb 13, 2018

The ExecutorAllocationManager should not adjust the target number of
executors when killing idle executors, as it has already adjusted the
target number down based on the task backlog.

The name `replace` was misleading with DynamicAllocation on, as the target number
of executors is changed outside of the call to `killExecutors`, so I adjusted that name. Also separated out the logic of `countFailures`, as you don't always want that tied to `replace` (see the signature sketch below).

While I was there I made two changes that weren't directly related to this:

  1. Fixed `countFailures` in a couple of cases where it was getting an incorrect value, since it used to be tied to `replace`, e.g. when killing executors on a blacklisted node.
  2. Hard error if you call `sc.killExecutors` with dynamic allocation on, since that's another way the ExecutorAllocationManager and the CoarseGrainedSchedulerBackend could get out of sync.

Added a unit test case which verifies that the calls to ExecutorAllocationClient do not adjust the number of executors.
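
For reference, a rough sketch of what the reworked `ExecutorAllocationClient.killExecutors` described above could look like. The parameter names come from this discussion; the exact ordering, defaults, and return type are assumptions, not a quote of the final diff.

```scala
private[spark] trait ExecutorAllocationClient {

  /**
   * Request that the cluster manager kill the specified executors.
   *
   * @param executorIds identifiers of executors to kill
   * @param adjustTargetNumExecutors whether the target number of executors should be adjusted
   *                                 down after these executors are killed
   * @param countFailures whether tasks running on these executors when they are killed should
   *                      count toward task failure limits
   * @param force whether to kill busy executors (ones with running tasks) as well as idle ones
   * @return the ids of the executors that were actually killed
   */
  def killExecutors(
      executorIds: Seq[String],
      adjustTargetNumExecutors: Boolean,
      countFailures: Boolean,
      force: Boolean = false): Seq[String]
}
```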


SparkQA commented Feb 14, 2018

Test build #87429 has finished for PR 20604 at commit b5a39da.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@squito squito changed the title [WIP][SPARK-23365][CORE] Do not adjust num executors when killing idle executors. [SPARK-23365][CORE] Do not adjust num executors when killing idle executors. Feb 15, 2018

SparkQA commented Feb 16, 2018

Test build #87496 has finished for PR 20604 at commit 4d0b52e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Contributor Author

squito commented Feb 16, 2018

@tgravescs @vanzin @zsxwing could you take a look? thanks

@@ -334,6 +336,10 @@ private[spark] class ExecutorAllocationManager(

// If the new target has not changed, avoid sending a message to the cluster manager
if (numExecutorsTarget < oldNumExecutorsTarget) {
// We lower the target number of executors but don't actively kill any yet. We do this
Contributor

I'm not sure I follow this comment.

From my reading of it, it's saying that you don't want to kill executors because you don't want to immediately get a new one to replace it. But how can that happen, if you're also lowering the target number?

Contributor Author

I was trying to answer a different question -- if we don't kill the executor now, why even bother lowering the target number? That would be an alternative solution: don't adjust the target number here at all, and just wait until the executors are killed for being idle. (And really, I'm just guessing at the logic.)

lemme try to reword this some ...

client.killExecutors(executorIdsToBeRemoved)
// We don't want to change our target number of executors, because we already did that
// when the task backlog decreased. Normally there wouldn't be any tasks running on these
// executors, but maybe the scheduler *just* decided to run a task there -- in that case,
Contributor

Also don't follow this part. If force = false, doesn't that mean the executor won't be killed if tasks are running on it? So wouldn't countFailures be meaningless in that context?

Contributor Author

good point, I didn't look closely enough at the semantics of force
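
For context, this is the existing filter in CoarseGrainedSchedulerBackend.killExecutors that gives `force` its meaning (the same lines are quoted in the diff further down this thread): with `force = false`, busy executors are skipped entirely, so `countFailures` never applies to them.

```scala
val executorsToKill = knownExecutors
  .filter { id => !executorsPendingToRemove.contains(id) }
  .filter { id => force || !scheduler.isExecutorBusy(id) }
// Only executors that survive the filters are marked as pending removal.
executorsToKill.foreach { id => executorsPendingToRemove(id) = !countFailures }
```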

@@ -1643,7 +1646,10 @@ class SparkContext(config: SparkConf) extends Logging {
def killExecutors(executorIds: Seq[String]): Boolean = {
schedulerBackend match {
case b: ExecutorAllocationClient =>
b.killExecutors(executorIds, replace = false, force = true).nonEmpty
require(executorAllocationManager.isEmpty,
Contributor

This is a developer api, so probably ok, but this is a change in behavior. Is it just not possible to support this with dynamic allocation?

Contributor Author

What would calling this mean with dynamic allocation on? Note this API explicitly says it's meant to adjust resource usage downwards. If you've got just one executor and then you kill it, should your app sit with 0 executors? Or even if you've got 10 executors and you kill one -- when is dynamic allocation allowed to bump the total back up? I can't think of clear, useful semantics for this.

(though this is not necessary to fix the bug, I could pull this out and move to a discussion in a new jira)
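
A minimal sketch of the hard error being discussed, assuming the guard lives at the top of SparkContext.killExecutors and simply rejects the call when an ExecutorAllocationManager exists (the message text and the surrounding method body are assumptions):

```scala
def killExecutors(executorIds: Seq[String]): Boolean = {
  schedulerBackend match {
    case b: ExecutorAllocationClient =>
      // With dynamic allocation on, the ExecutorAllocationManager owns the executor target,
      // so letting the user kill executors here would put the two out of sync.
      require(executorAllocationManager.isEmpty,
        "killExecutors() unsupported with Dynamic Allocation turned on")
      b.killExecutors(executorIds, adjustTargetNumExecutors = true, countFailures = false,
        force = true).nonEmpty
    case _ =>
      logWarning("Killing executors is not supported by current scheduler.")
      false
  }
}
```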

Contributor

I'm not sure why you'd use this with dynamic allocation, but it's been possible in the past. It's probably ok to change this though.

Member

Hi @squito, I have some questions about these cases:

If you've got just one executor, and then you kill it, should your app sit with 0 executors?

If the app sits with 0 executors, then pending tasks will increase, which leads the ExecutorAllocationManager to increase the target number of executors. So the app will not sit with 0 executors forever.

Or even if you've got 10 executors, and you kill one -- when is dynamic allocation allowed to bump the total back up?

For this case, to be honest, I don't really get your point -- blame my poor English.

Also, what happens if we use this method without an ExecutorAllocationManager? And do we really need to adjust the target number of executors (i.e. set adjustTargetNumExecutors = true below) if we are not using an ExecutorAllocationManager?

See these lines in killExecutors():

if (adjustTargetNumExecutors) {
  requestedTotalExecutors = math.max(requestedTotalExecutors - executorsToKill.size, 0)
  ...
  doRequestTotalExecutors(requestedTotalExecutors)
}

Setting adjustTargetNumExecutors = true will change requestedTotalExecutors. And IIUC, requestedTotalExecutors is only used in dynamic allocation mode. So, if we are not using an ExecutorAllocationManager, the allocation client will request requestedTotalExecutors = 0 executors from the cluster manager (which is really terrible). But an app without an ExecutorAllocationManager does not have a limit on requesting executors (by default).

Actually, I think this series of methods, including killAndReplaceExecutor, requestExecutors, etc., was designed for dynamic allocation mode. And if we still want to use these methods when the app does not use an ExecutorAllocationManager, we should not change requestedTotalExecutors, or perhaps not request a specific number from the cluster manager at all.

WDYT?

Contributor Author

My point in general is that the semantics of combining SparkContext.killExecutors() (a publicly visible function that the end user can call) with dynamic allocation aren't well defined, and I have no idea what the behavior really should be. I was giving some examples of weird behavior.

If you've got just one executor, and then you kill it, should your app sit with 0 executors?

If the app sits with 0 executors, then pending tasks will increase, which leads the ExecutorAllocationManager to increase the target number of executors. So the app will not sit with 0 executors forever.

That's true -- but only when pending tasks increase. But if you've got 0 executors, how do you expect pending tasks to increase? That would only happen when another taskset gets submitted, but with no executors your Spark program will probably just be blocked.

In the other case, I'm just trying to point out strange interactions between user control and dynamic allocation control. Imagine this sequence:

Dynamic Allocation: 1000 tasks, so 1000 executors
User: I only want 10 executors, so let me tell spark to kill 990 of them
...
... another taskset is submitted to add 1 more task ...
Dynamic Allocation: 1001 tasks, so 1001 executors
User: ??? I set the target to 10 executors, what happened?

So, if we are not using an ExecutorAllocationManager, the allocation client will request requestedTotalExecutors = 0 executors from the cluster manager (which is really terrible)

hmm, from a quick look, I think you're right -- it seems that using sc.killExecutors() doesn't quite work correctly even with dynamic allocation off. I think CoarseGrainedSchedulerBackend should actually initialize requestedTotalExecutors with SchedulerBackendUtils.getInitialTargetExecutorNumber
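
A toy sketch of why that seeding matters, under the assumption squito describes (initialize the requested total from the configured executor count instead of 0). The class and names here are hypothetical; the real change would go in CoarseGrainedSchedulerBackend using SchedulerBackendUtils.getInitialTargetExecutorNumber.

```scala
object RequestedTotalDemo extends App {
  // Simplified bookkeeping mirroring the killExecutors() fragment quoted above.
  class ExecutorTargetBookkeeping(initialTargetExecutors: Int) {
    private var requestedTotalExecutors = initialTargetExecutors

    // Returns the total that would be sent to the cluster manager after the kill.
    def killExecutors(numToKill: Int): Int = {
      requestedTotalExecutors = math.max(requestedTotalExecutors - numToKill, 0)
      requestedTotalExecutors
    }
  }

  // Seeded with 0 (the behavior being questioned, without dynamic allocation): killing one
  // executor asks the cluster manager for a total of 0 executors.
  val current = new ExecutorTargetBookkeeping(0)
  assert(current.killExecutors(1) == 0)

  // Seeded from the configured executor count, the requested total stays meaningful.
  val proposed = new ExecutorTargetBookkeeping(10)
  assert(proposed.killExecutors(1) == 9)
}
```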

Member

Hi @squito , thanks for your reply.

but only when pending tasks increase.

The ExecutorAllocationManager checks pending (backlogged) tasks periodically, so we do not actually have to wait for the pending count to increase.

And for the Dynamic Allocation & User case, yeah, that's hard to define.

And I checked SchedulerBackendUtils.getInitialTargetExecutorNumber; it sets DEFAULT_NUMBER_EXECUTORS = 2. But this is not consistent with Master, which sets executorLimit to Int.MaxValue when we are not in dynamic allocation mode. Maybe we can just initialize requestedTotalExecutors with Int.MaxValue (only when we are not in dynamic allocation mode).
Or, we could avoid calling doRequestTotalExecutors from requestExecutors or killExecutors, and only call it from requestTotalExecutors (only when we are not in dynamic allocation mode).

Member

@squito any thoughts?


SparkQA commented Feb 21, 2018

Test build #87567 has finished for PR 20604 at commit e69851b.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Contributor Author

squito commented Feb 21, 2018

Jenkins, retest this please

Contributor Author

squito commented Feb 21, 2018

known flaky test: https://issues.apache.org/jira/browse/SPARK-23458


SparkQA commented Feb 21, 2018

Test build #87589 has finished for PR 20604 at commit e69851b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@@ -334,6 +336,11 @@ private[spark] class ExecutorAllocationManager(

// If the new target has not changed, avoid sending a message to the cluster manager
if (numExecutorsTarget < oldNumExecutorsTarget) {
// We lower the target number of executors but don't actively kill any yet. Killing is
// controlled separately by an idle timeout. Its still helpful to reduce the target number
Contributor

nit: it's

* @param adjustTargetNumExecutors whether the target number of executors will be adjusted down
* after these executors have been killed
* @param countFailures if there are tasks running on the executors when they are killed, whether
* those failures be counted to task failure limits?
Contributor

nit: "whether to count those failures toward task failure limits"

// controlled separately by an idle timeout. Its still helpful to reduce the target number
// in case an executor just happens to get lost (eg., bad hardware, or the cluster manager
// preempts it) -- in that case, there is no point in trying to immediately get a new
// executor, since we couldn't even use it yet.
Contributor

s/couldn't/wouldn't

* @param replace whether to replace the killed executors with new ones, default false
* @param adjustTargetNumExecutors whether the target number of executors will be adjusted down
* after these executors have been killed
* @param countFailures if there are tasks running on the executors when they are killed, whether
Contributor

I'm still a little confused about this parameter.

If force = false, it's a no op. And all call sites I've seen seem to set this parameter to false. So is there something I'm missing?

Contributor Author

whoops, I was supposed to set countFailures = true in sc.killAndReplaceExecutors, thanks for catching that.
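
A sketch of what that fix might look like in SparkContext.killAndReplaceExecutor, based on the discussion rather than the final diff: the target number is left unchanged so a replacement comes up, and failures of any tasks running on the executor are counted.

```scala
private[spark] def killAndReplaceExecutor(executorId: String): Boolean = {
  schedulerBackend match {
    case b: ExecutorAllocationClient =>
      // Keep the target as-is (so the cluster manager brings up a replacement) and count
      // task failures caused by the kill, since the executor may be running tasks.
      b.killExecutors(Seq(executorId), adjustTargetNumExecutors = false, countFailures = true,
        force = true).nonEmpty
    case _ =>
      logWarning("Killing executors is not supported by current scheduler.")
      false
  }
}
```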


SparkQA commented Feb 24, 2018

Test build #87635 has finished for PR 20604 at commit 35314cb.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Contributor

vanzin commented Feb 24, 2018

retest this please


SparkQA commented Feb 24, 2018

Test build #87639 has finished for PR 20604 at commit 35314cb.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Contributor

@vanzin vanzin left a comment

LGTM

@@ -610,20 +611,21 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
val executorsToKill = knownExecutors
.filter { id => !executorsPendingToRemove.contains(id) }
.filter { id => force || !scheduler.isExecutorBusy(id) }
executorsToKill.foreach { id => executorsPendingToRemove(id) = !replace }
executorsToKill.foreach { id => executorsPendingToRemove(id) = !countFailures}
Contributor

super nit: space before }


SparkQA commented Feb 27, 2018

Test build #87722 has finished for PR 20604 at commit cf7ac86.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Contributor

vanzin commented Feb 27, 2018

Merging to master / 2.3.

asfgit pushed a commit that referenced this pull request Feb 27, 2018
…cutors.


Author: Imran Rashid <irashid@cloudera.com>

Closes #20604 from squito/SPARK-23365.

(cherry picked from commit ecb8b38)
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
@asfgit asfgit closed this in ecb8b38 Feb 27, 2018
Contributor Author

squito commented Mar 22, 2018

@vanzin @sitalkedia @jiangxb1987 I was looking at this code again, and I'd appreciate your thoughts on how this relates to SPARK-21834 #19081

I actually think that SPARK-21834 probably solves the bug I was describing initially. I hit the bug on 2.2.0, and didn't properly understand the change in SPARK-21834 when proposing this change. Nonetheless, I still think this fix is a good one -- it improves code clarity in general and fixes a couple of other minor cases. I'd also link the issues in JIRA etc. so the relationship is clearer.

I'd go even further and suggest that with this fix in, we can actually remove SPARK-21834, as it's no longer necessary. It's not harmful, but it's just confusing.

thoughts?

Member

@Ngone51 Ngone51 left a comment

A late review comment with my individual understanding.


Member

Ngone51 commented Apr 17, 2018

I'd go even further and suggest that with this fix in, we can actually remove SPARK-21834, as its no longer necessary.

Yes, otherwise, this PR's work is meaningless.

Member

Ngone51 commented Apr 25, 2018

ping @squito

Member

Ngone51 commented Apr 27, 2018

ping @squito

peter-toth pushed a commit to peter-toth/spark that referenced this pull request Oct 6, 2018
…cutors.


Author: Imran Rashid <irashid@cloudera.com>

Closes apache#20604 from squito/SPARK-23365.

(cherry picked from commit ecb8b38)
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
otterc pushed a commit to linkedin/spark that referenced this pull request Mar 22, 2023
…cutors.

Ref: LIHADOOP-52212


Author: Imran Rashid <irashid@cloudera.com>

Closes apache#20604 from squito/SPARK-23365.

(cherry picked from commit ecb8b38)
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>

RB=2012562
A=