
[SPARK-9552] Add force control for killExecutors to avoid false killing for those busy executors #7888

Closed
wants to merge 22 commits into master from GraceH:forcekill

Conversation

GraceH
Contributor

@GraceH GraceH commented Aug 3, 2015

With dynamic allocation enabled, busy executors are sometimes killed by mistake: executors that have just been assigned tasks get killed for having been idle long enough (say 60 seconds). The root cause is that the task-launch listener event is asynchronous.

For example, an executor may be in the middle of receiving task assignments while the corresponding listener notification has not yet been sent. Meanwhile, the executor's idle timeout in dynamic allocation (e.g., 60 seconds) expires and triggers killExecutor at the same time.

  1. The idle timer expires before the listener event arrives.
  2. The task then runs on an executor that is being (or has been) killed, which ultimately leads to a task failure.

Here is the proposal to fix it: add a force flag to killExecutor. When force is not set (i.e., false), we first check whether the executor being killed is idle or busy. If it currently has tasks assigned, we do not kill it and return false (to indicate the kill failed). Dynamic allocation should turn force killing off (force = false), so that an attempt to kill a busy executor fails and the executor is kept alive; when the task-start event arrives later, the idle timer is cancelled accordingly. This way we avoid falsely killing busy executors under dynamic allocation.

For other callers, the user can decide whether to force the kill. With that option turned on, killExecutor acts without any status check.
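As a rough illustration of the intended semantics (a minimal sketch; isExecutorBusy and doKillExecutors are passed in as parameters only to keep the example self-contained, standing in for the real scheduler internals):

object KillExecutorsSketch {
  def killExecutors(
      executorIds: Seq[String],
      force: Boolean,
      isExecutorBusy: String => Boolean,
      doKillExecutors: Seq[String] => Boolean): Boolean = {
    // Without force, refuse to kill executors that already have tasks assigned.
    val killable =
      if (force) executorIds else executorIds.filterNot(isExecutorBusy)
    // Return false when nothing could be killed, so a caller such as dynamic
    // allocation knows the request did not take effect and keeps the executor.
    killable.nonEmpty && doKillExecutors(killable)
  }
}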

@GraceH GraceH changed the title Add force control for killExecutors to avoid false killing for those busy executors [SPARK-9552] Add force control for killExecutors to avoid false killing for those busy executors Aug 3, 2015
@SparkQA

SparkQA commented Aug 3, 2015

Test build #39524 has finished for PR 7888 at commit 4acbd79.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

initializing = false
removeExecutor(executorId)
expired = removeExecutor(executorId)
if (expired) initializing = false
Member

Without commenting on the validity of the change, you have some style problems, like this needing to be in a block with braces.

Contributor Author

I ran the style check before committing. Sorry for missing the if block here; I will fix that.

@CodingCat
Contributor

@GraceH, from the patch I don't see how the user passes the force flag when calling the API. killExecutors is a function in a private[spark] class, i.e., it is not supposed to be touched by the end user.

@GraceH
Contributor Author

GraceH commented Aug 4, 2015

@CodingCat What I mean is to add force control to the killExecutors API. Currently, dynamic allocation uses that API with force = false (I assume we should not kill working executors under dynamic allocation). Other callers are free to set that option to true or false; if they really want a forced kill, they can call the private API with force = true.

Regarding a public API for users, we'd better discuss whether to add a new one (that is a little out of this PR's scope). From my perspective, modifying the existing public API is not a good idea, since it may cause compatibility issues. What do you think?

final def killExecutors(executorIds: Seq[String], replace: Boolean): Boolean = synchronized {
final def killExecutors(executorIds: Seq[String],
                        replace: Boolean,
                        force: Boolean): Boolean = synchronized {
Contributor

nit: style for multiline method defs is each arg on its own line, double indented, so:

final def killExecutors(
    executorIds: Seq[String],
    replace: Boolean,
    force: Boolean): Boolean = synchronized {

Contributor Author

Thanks. I will fix that. :)

@CodingCat
Contributor

I see... I was just confused by

For other callers, the user can decide whether to force the kill. With that option turned on, killExecutor acts without any status check.

I thought "end users" meant Spark users, so that when they call sc.killExecutors they could indicate whether they want a force kill... I agree with leaving this flag within Spark.

@GraceH
Contributor Author

GraceH commented Aug 4, 2015

@CodingCat Sorry for the ambiguous wording in the description. In general, the patch aims to fix the false-killing bug in dynamic allocation while also leaving room for more options in killExecutors.

@SparkQA

SparkQA commented Aug 4, 2015

Test build #39625 has finished for PR 7888 at commit 1c7ae2b.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class RowOrdering(ordering: Seq[SortOrder]) extends Ordering[InternalRow]

@GraceH
Contributor Author

GraceH commented Aug 5, 2015

It seems the test failure is not related to this PR.

doKillExecutors(knownExecutors)
executorsPendingToRemove ++= idleExecutors
// return false: there has some busyExecutors or killing certain executor failed
doKillExecutors(idleExecutors) && idleExecutors.size == knownExecutors.size
Contributor

Note the semantics of the return value. All it says is whether the request is ack'ed by the cluster manager, not whether the kill will actually happen. We should keep the original return value.

Contributor Author

OK. will do.

Contributor Author

@andrewor14 The problems here are:

  • If there is no idle executor to kill, shall we still return an acknowledgement?
  • It is quite tricky to have force control in killExecutors. For example, suppose we have 3 executors to kill but only one of them is idle. Shall we return true to the caller?

@andrewor14
Contributor

Hi @GraceH thanks for fixing this problem. I agree with the problem statement and the root cause. However, there are two outstanding issues with the solution:

(1) ExecutorAllocationManager currently assumes that all remove requests will eventually be fulfilled. This assumption is no longer true as of this patch if the executor turns out to be busy. In this case we need to somehow remove it from executorsPendingToRemove. We can do this, for instance, when we get a TaskStart and/or TaskEnd event for that executor.

(2) Currently we never set force = true right? I think we should set it to true if the user explicitly calls sc.killExecutor. However, to distinguish between that case and dynamic allocation, we'll need to add more private[spark] methods to ExecutorAllocationClient that accept the force flag. Then we'll have the ExecutorAllocationManager call the internal methods and explicitly pass force = false.

What do you think?
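For reference, a hedged sketch of the layering suggested in (2); the trait and method names follow the discussion, but the exact signatures are simplified, not the real Spark ones:

package org.apache.spark  // needed so the private[spark] qualifier resolves

private[spark] trait ExecutorAllocationClient {
  // Reached by an explicit user call such as sc.killExecutors, so the
  // force semantics default to true.
  def killExecutors(executorIds: Seq[String]): Boolean =
    killExecutors(executorIds, force = true)

  // Internal variant; ExecutorAllocationManager calls this with force = false.
  private[spark] def killExecutors(executorIds: Seq[String], force: Boolean): Boolean
}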

@GraceH
Contributor Author

GraceH commented Sep 7, 2015

@andrewor14 Thanks for the feedback. I will take a look at your comments and revise the code accordingly. If I have any concerns, I will let you know.

@GraceH
Contributor Author

GraceH commented Sep 7, 2015

@andrewor14 Thanks for the comments.

Regarding #1, very good point. That is why I try to return false when a non-forced kill fails: it is the simplest way, and the executor ID then never gets added via executorsPendingToRemove.add(executorId). See https://github.com/GraceH/spark/blob/forcekill/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala#L410. The only concern is that it somewhat changes the semantics of the return value.

Regarding #2, nice suggestion. That was my thought too: the end user can force-kill any executor by setting force = true. I will add a private method for ExecutorAllocationManager.

@GraceH
Contributor Author

GraceH commented Sep 8, 2015

@andrewor14 I have pushed another proposal. Please let me know your comments.

  • SparkContext allows the end user to set the force flag when calling killExecutor(s). Dynamic allocation always uses force = false to avoid falsely killing a busy executor.
  • killExecutors logs that an executor is busy and cannot be killed when force == false.
  • killExecutors returns the acknowledgement regardless of whether it had any executor to kill.
  • onTaskStart (i.e., the executor becoming busy) rescues an executor from the pendingToRemove list if it was added there by misjudgment.
  • Add a HashMap over the active executors that records the number of running/launched tasks. If the task count is > 0, the executor is busy and isExecutorBusy returns true. (A short sketch of this bookkeeping follows below.)
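A minimal sketch of the bookkeeping in the last two bullets (the class is a simplified stand-in for the scheduler-side state, not the actual implementation):

import scala.collection.mutable.{HashMap, HashSet}

class BusyTrackerSketch {
  // executor id -> number of running or launched tasks
  private val taskCounts = new HashMap[String, Int]
  // executors that dynamic allocation has requested to remove
  private val executorsPendingToRemove = new HashSet[String]

  def isExecutorBusy(executorId: String): Boolean =
    taskCounts.getOrElse(executorId, 0) > 0

  def onTaskStart(executorId: String): Unit = {
    taskCounts(executorId) = taskCounts.getOrElse(executorId, 0) + 1
    // Rescue: the executor was added to the pending-remove list by
    // misjudgment while it was actually busy.
    executorsPendingToRemove -= executorId
  }

  def onTaskEnd(executorId: String): Unit = {
    taskCounts(executorId) = math.max(0, taskCounts.getOrElse(executorId, 0) - 1)
  }
}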

@SparkQA

SparkQA commented Sep 8, 2015

Test build #42135 has finished for PR 7888 at commit ebb13a3.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@GraceH
Contributor Author

GraceH commented Oct 28, 2015

@andrewor14 I have rebased the original proposal onto the latest master branch. Please let me know if you have further questions or concerns. Thanks a lot.

@SparkQA

SparkQA commented Oct 28, 2015

Test build #44534 has finished for PR 7888 at commit c23f887.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@andrewor14
Contributor

@vanzin can you have a look?

@GraceH
Contributor Author

GraceH commented Nov 6, 2015

Thanks @andrewor14.

Hi @vanzin, let me give you a quick brief of this patch and its goal.

There is a bug in dynamic allocation: busy executors can be killed by "mistake", and we have met this situation frequently in real-world deployments.

  1. An executor has been idle for 60 seconds and is marked to be killed per the dynamic allocation criteria.
  2. The scheduler assigns one or several tasks to that executor, but the listener event has not arrived by that time (the listener event is only posted asynchronously after the new tasks are assigned).
  3. The executor is killed as planned, even though it has just been assigned tasks. A busy executor is thus killed by "mistake".

One way to solve this would be to make task assignment and its notification synchronous, but that approach does not fit the current (listener-based) implementation.

Here I propose another way: add force control to killExecutor(). For dynamic allocation, we check whether the executor is busy before actually taking the kill action. That way, even if the listener event does not arrive in time, we can actively rescue a busy executor (one marked to be killed but just assigned new tasks). Dynamic allocation should never kill busy executors, so it disables force.

Meanwhile, we expose the force control to the end user (SparkContext public API), so the user can decide whether to force-kill certain executors.

Please let me know your thoughts. Thanks a lot.

@@ -88,7 +88,8 @@ private[spark] class TaskSchedulerImpl(
val nextTaskId = new AtomicLong(0)

// Which executor IDs we have executors on
val activeExecutorIds = new HashSet[String]
// each executor will record running or launched task number
val activeExecutorIdsWithLoads = new HashMap[String, Int]
Contributor

nit: instead of WithLoads, WithTasks or WithTaskCount?

@vanzin
Contributor

vanzin commented Nov 7, 2015

I'm not a huge fan of changing the public API, especially since it breaks binary compatibility (even if annotated with @DeveloperApi)... would it be possible to not change SparkContext and add the new parameter just to the internal classes?

@GraceH
Contributor Author

GraceH commented Nov 9, 2015

Thanks @vanzin for the comments. I will change things accordingly.

@SparkQA

SparkQA commented Nov 13, 2015

Test build #45816 has finished for PR 7888 at commit 342a59d.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@GraceH
Contributor Author

GraceH commented Nov 13, 2015

@vanzin and @andrewor14, please let me know your further inputs. Sorry for the several rounds of amendments.

@SparkQA

SparkQA commented Nov 13, 2015

Test build #45822 has finished for PR 7888 at commit 4ce0ec0.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Nov 13, 2015

Test build #45829 has finished for PR 7888 at commit 0000551.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@@ -87,8 +87,8 @@ private[spark] class TaskSchedulerImpl(
// Incrementing task IDs
val nextTaskId = new AtomicLong(0)

// Which executor IDs we have executors on
val activeExecutorIds = new HashSet[String]
// Number of tasks runing on each executor
Contributor

running

Contributor Author

oops. sorry for that.

@andrewor14
Contributor

@GraceH I think the new code in CoarseGrainedSchedulerBackend now looks kind of strange just to make the tests pass. I have suggested an alternative of fixing the test in a PR against your branch. Please have a look: GraceH#1

@GraceH
Contributor Author

GraceH commented Nov 17, 2015

@andrewor14 That is a really good way to mock the busy status. Thanks a lot; I really learned a lot from that.

@GraceH
Contributor Author

GraceH commented Nov 17, 2015

@vanzin Also thanks for helping me clarify my thinking on the acknowledgement part.

@vanzin
Contributor

vanzin commented Nov 17, 2015

retest this please

@SparkQA

SparkQA commented Nov 17, 2015

Test build #46039 has finished for PR 7888 at commit 0daeb5a.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@GraceH
Contributor Author

GraceH commented Nov 17, 2015

@andrewor14 My bad. Since val executors = getExecutorIds(sc) is fetched beforehand, we should not kill executors.head again and again (it would need to be executors.head and then executors(1)). Instead, I changed the sequence of the two kills, as sketched below:

  1. set force = false, so the first executor is spared;
  2. set force = true to force-kill that same first executor.

Now it should work.
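A rough sketch of that corrected ordering (the three-argument killExecutors shape follows the diff above, but backend and the surrounding test scaffolding are illustrative):

// Illustrative only: `backend` stands for the scheduler backend under test.
val executors = getExecutorIds(sc)  // snapshot fetched once, up front
// 1. force = false: the busy head executor is spared rather than killed
backend.killExecutors(Seq(executors.head), replace = false, force = false)
// 2. force = true: the same executor is now killed regardless of its load
backend.killExecutors(Seq(executors.head), replace = false, force = true)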

@SparkQA

SparkQA commented Nov 17, 2015

Test build #46073 has finished for PR 7888 at commit 1938e61.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@andrewor14
Contributor

retest this please

@SparkQA

SparkQA commented Nov 17, 2015

Test build #46101 has finished for PR 7888 at commit 1938e61.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@andrewor14
Contributor

Ok, LGTM. I'm merging this into master and 1.6.

We can fix the thing @vanzin pointed out (about not adding the executor to executorsPendingToRemove in the first place) later if necessary. It does simplify the logic a little so I'm slightly in favor of doing it too. @GraceH if you have time could you address that in a separate patch using the same JIRA?

@asfgit asfgit closed this in 965245d Nov 17, 2015
asfgit pushed a commit that referenced this pull request Nov 17, 2015
[SPARK-9552] Add force control for killExecutors to avoid false killing for those busy executors

Author: Grace <jie.huang@intel.com>
Author: Andrew Or <andrew@databricks.com>
Author: Jie Huang <jie.huang@intel.com>

Closes #7888 from GraceH/forcekill.

(cherry picked from commit 965245d)
Signed-off-by: Andrew Or <andrew@databricks.com>
@GraceH
Contributor Author

GraceH commented Nov 18, 2015

@andrewor14 @vanzin Thanks all. I will follow up by creating a new patch under SPARK-9552.
