
[SPARK-20286][core] Improve logic for timing out executors in dynamic allocation. #24704

Closed
wants to merge 16 commits into from

Conversation

vanzin
Contributor

@vanzin vanzin commented May 24, 2019

This change refactors the portions of the ExecutorAllocationManager class that
track executor state into a new class, to achieve a few goals:

  • make the code easier to understand
  • better separate concerns (task backlog vs. executor state)
  • less synchronization between event and allocation threads
  • less coupling between the allocation code and executor state tracking

The executor tracking code was moved to a new class (ExecutorMonitor) that
encapsulates all the logic of tracking what happens to executors and when
they can be timed out. The logic to actually remove the executors remains
in the EAM, since it still requires information that is not tracked by the
new executor monitor code.

Of particular interest within the executor monitor itself is a change in how
cached blocks are tracked; instead of polling the block manager, the monitor
now uses events to track which executors have cached blocks. It also detects
unpersist events and adjusts the time at which the executor should be removed
accordingly. (That's the bug mentioned in the PR title.)

Because of the refactoring, a few tests in the old EAM test suite were removed,
since they're now covered by the newly added test suite. The EAM suite was
also changed a little bit to not instantiate a SparkContext every time. This
allowed some cleanup, and the tests also run faster.

Tested with new and updated unit tests, and with multiple TPC-DS workloads
running with dynamic allocation on; also some manual tests for the caching
behavior.
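To illustrate the event-driven cached-block tracking described above, here is a minimal, hypothetical sketch. All names and the overall structure are illustrative assumptions for this write-up, not the actual ExecutorMonitor code in this PR:

// Hypothetical, simplified sketch of event-driven cached-block tracking.
// Names and structure are illustrative only; not the actual ExecutorMonitor.
import scala.collection.mutable

class CachedBlockSketch(idleTimeoutMs: Long, storageTimeoutMs: Long, clock: () => Long) {

  private case class ExecState(
      // rddId -> set of cached partition indices on this executor
      cachedBlocks: mutable.HashMap[Int, mutable.BitSet] = mutable.HashMap(),
      var idleStart: Long = -1L,
      var timeoutAt: Long = Long.MaxValue)

  private val executors = mutable.HashMap[String, ExecState]()

  def onExecutorIdle(execId: String): Unit = {
    val exec = executors.getOrElseUpdate(execId, ExecState())
    exec.idleStart = clock()
    updateTimeout(exec)
  }

  def onBlockCached(execId: String, rddId: Int, split: Int): Unit = {
    val exec = executors.getOrElseUpdate(execId, ExecState())
    exec.cachedBlocks.getOrElseUpdate(rddId, new mutable.BitSet()) += split
    updateTimeout(exec)
  }

  def onUnpersistRDD(rddId: Int): Unit = {
    executors.values.foreach { exec =>
      exec.cachedBlocks -= rddId
      updateTimeout(exec)
    }
  }

  // The deadline is anchored at the time the executor went idle, and uses the
  // longer "storage" timeout only while the executor still holds cached blocks.
  private def updateTimeout(exec: ExecState): Unit = {
    exec.timeoutAt = if (exec.idleStart >= 0) {
      val timeout = if (exec.cachedBlocks.nonEmpty) storageTimeoutMs else idleTimeoutMs
      exec.idleStart + timeout
    } else {
      Long.MaxValue
    }
  }

  def timedOutExecutors(now: Long): Seq[String] =
    executors.collect { case (id, exec) if exec.timeoutAt <= now => id }.toSeq
}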

@vanzin
Contributor Author

vanzin commented May 24, 2019

@dhruve I cleaned up the code I had lying around and made it work on top of the existing EAM. This PR is a replacement for #22015.

@attilapiros I removed some code you added recently as part of this patch.

@@ -70,10 +71,12 @@ class ExternalShuffleServiceDbSuite extends SparkFunSuite {
}
}

def shuffleServiceConf: SparkConf = sparkConf.clone().set(SHUFFLE_SERVICE_PORT, 0)
Contributor Author

This change isn't exactly related, but this test was failing because I had left a shuffle service running locally while running tests for the rest of this PR. So might as well fix it.

@SparkQA

SparkQA commented May 25, 2019

Test build #105774 has finished for PR 24704 at commit 05b6802.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@@ -142,30 +137,22 @@ private[spark] class ExecutorAllocationManager(
// Executors that have been requested to be removed but have not been killed yet
private val executorsPendingToRemove = new mutable.HashSet[String]
Member

Is it possible to also move this into ExecutorMonitor? That way ExecutorAllocationManager would be purer (onExecutorRemoved could go away) and ExecutorMonitor would know more about executor state, including pending removal.

@@ -744,10 +592,6 @@ private[spark] class ExecutorAllocationManager(
if (totalPendingTasks() == 0) {
allocationManager.onSchedulerQueueEmpty()
}

// Mark the executor on which this task is scheduled as busy
executorIdToTaskIds.getOrElseUpdate(executorId, new mutable.HashSet[Long]) += taskId
Member

Should executorId on line 719 also be removed?

// and if it changed, don't consider this executor for removal yet, delaying
// the decision until the next check.
if (exec.timeoutAt > deadline) {
recomputeTimedOutExecs = true
Member

I'm a little confused about recomputeTimedOutExecs. Why do we need to eagerly recompute timed out executors when exec.timeoutAt becomes larger? If exec.timeoutAt ends up larger than the final newNextTimeout, we can just rely on the now >= nextTimeout.get() condition to decide whether the check is needed. And instead of setting recomputeTimedOutExecs = true, maybe we should just update newNextTimeout based on the changed exec.timeoutAt. WDYT?

Contributor Author

we should just update newNextTimeout based on the changed exec.timeoutAt

I think that can work and would avoid a little bit of unnecessary work. Let me do that instead.
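(For context, a rough sketch of what that adjustment could look like inside the scan; the variable names are taken from the snippets quoted in this thread, and the surrounding code is assumed:)

// Sketch only: instead of flagging a full recompute, fold a deadline that
// moved forward (because of a concurrent event) into this scan's result.
val newDeadline = exec.timeoutAt
if (newDeadline > deadline) {
  // Not timed out after all; just make sure the next scan happens no later
  // than the new deadline.
  newNextTimeout = math.min(newNextTimeout, newDeadline)
  false
} else {
  true
}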

@SparkQA

SparkQA commented May 28, 2019

Test build #105879 has finished for PR 24704 at commit 7f72720.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented May 29, 2019

Test build #105918 has finished for PR 24704 at commit 7927239.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

// the executor to change after it was read above. Check the deadline again,
// and if it changed, don't consider this executor for removal yet.
val newDeadline = exec.timeoutAt
if (newDeadline > deadline) {
Contributor

why isn't this newDeadline > now? somewhat pathological case, but if this thread was really slow, and you set your timeouts super tiny, could happen. more importantly, I just think it would be more clear.

throw new SparkException(s"${DYN_ALLOCATION_EXECUTOR_IDLE_TIMEOUT.key} must be >= 0!")
}
if (storageTimeoutMs < 0) {
throw new SparkException(s"${DYN_ALLOCATION_CACHED_EXECUTOR_IDLE_TIMEOUT.key} must be >= 0!")
Contributor

these checks could now go on the configs themselves

exec.updateTimeout()
}
} else {
exec.cachedBlocks.get(blockId.rddId).foreach { blocks =>
Contributor

do you want to do this when storageLevel.isValid && fetchFromShuffleSvcEnabled && storageLevel.useDisk? If it's the first time the block is cached, then it's a no-op. If it has already been cached in memory, then this would no longer apply the cached timeout to this executor. I guess that is desired? Just want to check.

Contributor

ok I see this is explicitly checked in a test too.

Contributor Author

Yep, that's basically what SPARK-27677 does.

blocks -= blockId.splitIndex
if (blocks.isEmpty) {
exec.cachedBlocks -= blockId.rddId
exec.updateTimeout()
Contributor

could be done only if exec.cachedBlocks.isEmpty, though I guess there is nothing wrong this way.

private var runningTasks: Int = 0

// This should only be used in the event thread.
val cachedBlocks = new mutable.HashMap[Int, mutable.BitSet]()
Contributor

would help to add a comment that it's rdd_id -> partition_ids
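For example, something along these lines (exact wording is only a suggestion):

// Map of RDD id -> BitSet of cached partition indices on this executor.
// This should only be used in the event thread.
val cachedBlocks = new mutable.HashMap[Int, mutable.BitSet]()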

assert(monitor.timedOutExecutors(storageDeadline) === Seq("1"))

conf.set(SHUFFLE_SERVICE_ENABLED, true).set(SHUFFLE_SERVICE_FETCH_RDD_ENABLED, true)
monitor = new ExecutorMonitor(conf, client, clock)
Contributor

set the monitor back at the end of this test (will be pretty confusing if somebody tries to add another test after this one otherwise)

Contributor Author

Not sure what you mean? The monitor is re-initialized for each test.

Contributor

ooops my fault.

@SparkQA

SparkQA commented May 29, 2019

Test build #105930 has finished for PR 24704 at commit 50d9c91.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

// If the executor was thought to be timed out, but the new deadline is later than the
// old one, ask the EAM thread to update the list of timed out executors.
if (timedOut && newDeadline > oldDeadline) {
nextTimeout.set(Long.MinValue)
Member

Why set the current nextTimeout to MinValue? Do you mean we need to eagerly recompute timed out executors in the next round to update the list of timed out executors? But as far as I can tell from the code, each time the EAM thread invokes timedOutExecutors(), it gets the list of timed out executors and immediately asks the cluster manager to kill them via ExecutorAllocationClient. So if an executor has already been considered timed out and killed after the last round, will there be any difference for that executor if we eagerly update the list of timed out executors in the next round?

I guess you mean to handle the case where an event arrives and makes the executor non-timed-out again, right after it was considered timed out by the scan. But, as I mentioned above, I don't think that works. Instead of handling it after the scan, maybe we can optimize for the case where it happens during the scan, for example by wrapping the scan in a while loop that exits once all timed out executors' timedOut flags are true:

// this check guards against an event that updates a deadline during scanning
while (!checkTimedOut(timedOutExecs)) {
  ...
  timedOutExecs = scanning {
  }
  ...
}

WDYT ?

Contributor

it gets the list of timed out executors and immediately asks the cluster manager to kill them via ExecutorAllocationClient. So if an executor has already been considered timed out and killed after the last round, will there be any difference for that executor if we eagerly update the list of timed out executors in the next round?

only the CoarseGrainedSchedulerBackend has a consistent view of which executors are actually running tasks, and so it has ultimate say in whether or not to kill an executor. You might not have killed an executor earlier because it was actually busy.

So here is where you "catch up" with the view of the CGSB, so you know that executor shouldn't be considered timed out and requested to be killed next time around.

(at least I think so, marcelo can confirm ...)

Contributor Author

@vanzin vanzin May 30, 2019

What Imran said, with a complication.

The scenario here is decidedly rare. The EAM thread sees a timed out executor at the same time a new task is starting on it. So the EAM tries to kill it, but the CGSB says no.

While that's happening the task start event arrives, and the monitor now knows the executor is not idle anymore. But the executor is still there in the list of timed out executors, so on every cycle, the EAM would still try to kill it and the CGSB will keep saying no. Until it becomes idle again (but not timed out!), and then it will be killed, but at the wrong time.

So it needs to be removed from the timed out list to avoid that problem.

There's still a case where the task start event isn't processed until after the task has actually ended, and the EAM thread runs again and actually manages to kill an executor before it should. But that should be even rarer, and I'm not sure that can be fixed, since it involves synchronization with threads that this code has no control over.

Member

@Ngone51 Ngone51 May 31, 2019

Makes sense. @squito @vanzin

But the executor is still there in the list of timed out executors, so on every cycle, the EAM would still try to kill it and the CGSB will keep saying no.

I just realized that we have a cached timedOutExecs list in the monitor. But there's one last point we may have overlooked. Once an executor has been considered timed out in a previous scanning round, its pendingRemoval would be marked as true. So in the next scanning round, we'll filter it out before checking its deadline. Then it won't appear in timedOutExecs because of pendingRemoval rather than because of its new deadline. We should cover this.

But why do we need to cache timedOutExecs in the monitor? Could we always return the latest list of timed out execs to the EAM on every cycle (e.g. return empty when now < nextTimeout.get)?

Contributor

So in the next scanning round, we'll filter it out before checking its deadline. Then it won't appear in timedOutExecs because of pendingRemoval rather than because of its new deadline. We should cover this.

I'm not sure I understand this part. pendingRemoval is set after the CGSB has acknowledged that it will remove the executor. We shouldn't need to send any more messages about killing the executor after this.

But why do we need to cache timedOutExecs in the monitor? Could we always return the latest list of timed out execs to the EAM on every cycle (e.g. return empty when now < nextTimeout.get)?

I think you might be right about this, I would need to read it more carefully ... but in any case, I think that would make it significantly harder to follow. It's a lot easier to reason about if ExecutorMonitor.timedOutExecutors() consistently returns the right list.

Contributor Author

@vanzin vanzin May 31, 2019

Yeah, this doesn't work because of an edge case.

When calculating the timed out executors, the next timeout is set to when the next non-timed-out executor will time out. That is the important part.

Now imagine that you return 3 timed out executors and the EAM doesn't kill any of them (either because of limits, or because the CGSB doesn't do it). Now you will not get a call to executorsKilled, so the next timeout will not be reset.

Which means that next time you call timedOutExecutors() it will return an empty list, instead of the 3 timed out executors.

So you need that cache.

Contributor Author

BTW you could say "just call executorsKilled with an empty list" but then that means the optimization doesn't work, and you're basically calculating the timed out list on every EAM schedule call.

Member

Okay, I see the problem here. But what I care about is that, with a cached list, the EAM may perform unnecessary executor removals many times before nextTimeout changes. For example, say we have 3 executors in the monitor and the EAM gets those 3 timed out executors in a single scanning round. That means we cannot update nextTimeout during this scan, and nextTimeout would just stay at Long.MaxValue until new events (which update nextTimeout) arrive. So before nextTimeout is updated, the EAM always gets those 3 cached timed out executors every 100ms and performs unnecessary executor removal again and again. This case may be rare, or have little impact on the EAM, but I do worry about this kind of unnecessary behavior with the cached list.

I think removing that cached list can be achieved with a little more change:

if (deadline > now) {
 newNextTimeout = math.min(newNextTimeout, deadline)
 exec.timedOut = false
 false
} else {
 exec.timedOut = true
 true
}
  1. instead of only updating newNextTimeout for non timed out executors, we should update it for timed out executors, too

  2. when we get a call to executorsKilled(executorIds), update nextTimeout based on those killed executors' timeoutAt, rather than just setting it to Long.MinValue

Contributor Author

with a cached list, the EAM may perform unnecessary executor removals many times before nextTimeout changes

That's actually the point. If the EAM did not kill the executors, it most probably means that there's an event that will be delivered to the listener and cause the next timeout to be updated (e.g. a task start).

If such an event doesn't arrive, then the executors are still timed out, and will always be returned to the EAM. The EAM may then just decide not to kill them again (e.g. because the lower limit of executors has been reached).

All of that is fine, and computationally cheap.

instead of only updating newNextTimeout for non timed out executors, we should update it for timed out executors, too

That means that if no executors are killed, you'll be re-scanning the whole executor list on every EAM interval until something changes, which is the exact thing the cache is trying to avoid.

Member

Makes sense.

timedOutExecs = Nil
}

def timedOutExecutors(): Seq[String] = {
Contributor

I think it's worth a comment that this can only be called from the EAM scheduling thread.


// An event arriving while this scan is happening may cause the deadline for
// the executor to change after it was read above. Check the deadline again,
// and if it changed, don't consider this executor for removal yet.
Contributor

this is only important for block updates, right? Because if the executor is actually running a task, the CoarseGrainedSchedulerBackend should ignore the request to kill the executor (and it is the only thing which has a consistent view of that).

I'm not so sure this complexity is really necessary, as it will be imperfect anyway (the update could happen immediately after you do this). And it even exists for the other case -- you decided to not time out an executor, but then you just get an update to drop a block or unpersist an RDD, and now it should timeout.

Contributor Author

I think you're right. I was trying to close a very specific and narrow race here, but yes, different races still exist and there isn't a good way to avoid them. The best we can do is hope the CGSB doesn't kill the executor.

then you just get an update to drop a block or unpersist an RDD, and now it should timeout

That is fine, the EAM should pick that up in the next cycle 100ms later.


@SparkQA

SparkQA commented May 30, 2019

Test build #105976 has finished for PR 24704 at commit fe8eb1c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

exec.timedOut = false
false
} else {
exec.timedOut = true
Member

The change is good here. Previously I thought it overlapped with some behavior in the Tracker. Now it is quite simple, more readable, and easy to understand.

} else {
exec.timedOut = true
true
}
Member

if (deadline > now) {
 newNextTimeout = math.min(newNextTimeout, deadline)
 exec.timedOut = false
 false
} else {
 exec.timedOut = true
 true
}

Maybe ?

if (deadline > now) {
 newNextTimeout = math.min(newNextTimeout, deadline)
 exec.timedOut = false
} else {
 exec.timedOut = true
}
exec.timedOut

Contributor Author

I was trying to avoid another read of a volatile variable.
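(In other words, a hedged illustration assuming timedOut is the @volatile field from the diff above:)

// Ending the block with literal booleans avoids re-reading the volatile field:
if (deadline > now) {
  newNextTimeout = math.min(newNextTimeout, deadline)
  exec.timedOut = false  // volatile write
  false                  // local constant, no extra volatile read
} else {
  exec.timedOut = true
  true
}
// whereas ending the block with `exec.timedOut` would perform an additional volatile read.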


def updateTimeout(): Unit = {
val oldDeadline = timeoutAt
val newDeadline = if (idleStart >= 0) {
Contributor

idleStart seems to be redundant. We could just use runningTasks == 0

Contributor

I think you need it when updating the timeout, specifically when you drop cached blocks. You want to track the original time the executor went idle. (you could make the approximation to just restart the idle time when the blocks get dropped, but might as well do it right)

Contributor Author

Yeah, it's not redundant. When checking for idleness, this check is the same as runningTasks == 0, but we need both pieces of information for the timeout calculation to be correct.
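(A sketch of the point being made; the deadline computation here is an assumption beyond what the diff above shows:)

// Sketch: the deadline is anchored at the time the executor went idle,
// not at the time of the latest block or unpersist event, so idleStart is
// needed even though "idle" itself is equivalent to runningTasks == 0.
def updateTimeout(): Unit = {
  timeoutAt = if (idleStart >= 0) {
    val timeout = if (cachedBlocks.nonEmpty) storageTimeoutMs else idleTimeoutMs
    idleStart + timeout
  } else {
    Long.MaxValue
  }
}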

val cachedBlocks = new mutable.HashMap[Int, mutable.BitSet]()

// For testing.
def isIdle: Boolean = idleStart >= 0
Contributor

Can replace it with runningTasks == 0

assert(monitor.timedOutExecutors(idleDeadline).isEmpty)
assert(monitor.timedOutExecutors(storageDeadline) === Seq("1"))

monitor.onUnpersistRDD(SparkListenerUnpersistRDD(2))
Contributor

From @dhruve's comment above, I realized it might be good to add to this test case a bit:

    // if we get an unpersist event much later, which moves an executor from having cached blocks
    // to no longer having cached blocks, we still know the timeout from the original time the
    // executor went idle
    clock.setTime(idleDeadline)
...

Contributor Author

Actually the clock.setTime() is not needed, because the timedOutExecutors call here is a test-specific one that takes the time as a parameter.

But I'll add the comment.

Contributor

@squito squito left a comment

one small suggestion on the tests, otherwise lgtm.

one thing I was thinking is that we don't have any tests on the handling when the CGSB decides to not kill an executor. That test really would need to go in ExecutorAllocationManagerSuite, turn off the dynamic allocation testing config, and mock the cgsb

@squito
Contributor

squito commented May 31, 2019

one thing I was thinking is that we don't have any tests on the handling when the CGSB decides to not kill an executor. That test really would need to go in ExecutorAllocationManagerSuite, turn off the dynamic allocation testing config, and mock the cgsb

actually, that is totally wrong -- you can do this test in ExecutorMonitorSuite, just by interleaving calls to timedOutExecutors(), executorsKilled(), onTaskStart(), & executorsRemoved().
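A rough sketch of such a test (setup details are assumed; the method names come from the discussion and the test snippets quoted in this thread):

// Sketch: verify that executors the scheduler declines to kill remain in the
// timed-out list on the next cycle.
test("executors not killed by the scheduler stay timed out") {
  // ... register executors "1" and "2" and advance the clock past the idle timeout ...
  assert(monitor.timedOutExecutors().toSet === Set("1", "2"))

  // The cluster manager only actually kills "1".
  monitor.executorsKilled(Seq("1"))

  // "2" must still be reported on the next call.
  assert(monitor.timedOutExecutors() === Seq("2"))
}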

Marcelo Vanzin added 3 commits May 31, 2019 11:41
Now that the monitor is also keeping track of which executors are
pending for removal, that cache becomes unnecessary, since it knows
exactly when it should look at the executor list again to see which
ones can be killed.
Contributor

@squito squito left a comment

lgtm

updateNextTimeout(newNextTimeout)
timedOutExecs
} else {
Nil
Contributor

I actually thought it was more clear with the cached value, but fine with this too

Contributor Author

I actually found an issue with the current code after I removed another piece of code that seemed to be doing nothing useful. That might also be related to the unit test failure in the last run. Taking a look...

assert(monitor.timedOutExecutors().toSet === Set("1", "2", "3"))
assert(monitor.pendingRemovalCount === 0)

monitor.executorsKilled(Seq("1"))
Contributor

a brief comment here would be helpful, eg. "The scheduler only kills some of the executors we asked it to (eg. because another task just started there, but really for whatever reason the scheduler feels like). Make sure the timedOut list gets updated appropriately."
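For example (wording is only a suggestion):

// The scheduler may only kill some of the executors we asked it to kill
// (e.g. because a task just started on one of them, or for any other reason
// it sees fit). Make sure the timed-out list is updated accordingly.
monitor.executorsKilled(Seq("1"))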

@SparkQA

SparkQA commented May 31, 2019

Test build #106031 has finished for PR 24704 at commit 42e5e9e.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jun 1, 2019

Test build #106037 has finished for PR 24704 at commit 5eb29c4.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jun 1, 2019

Test build #106043 has finished for PR 24704 at commit 9bf4ece.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@Ngone51
Member

Ngone51 commented Jun 2, 2019

LGTM

Contributor

@attilapiros attilapiros left a comment

I read the changes, but this was just a superficial review. Besides that, I do not want my super tiny nit to be lost.

// "task start" event has just been posted to the listener bus and hasn't yet been delivered to
// this listener. There are safeguards in other parts of the code that would prevent that executor
// from being removed.
private var nextTimeout = new AtomicLong(Long.MaxValue)
Contributor

Nit: This can be a val.
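i.e. (assuming nothing else reassigns the reference):

private val nextTimeout = new AtomicLong(Long.MaxValue)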

// of timed out executors needs to be updated due to the executor's state changing.
@volatile var timedOut: Boolean = false

var pendingRemoval: Boolean = false
Contributor

I think this should be volatile too -- it's accessed by the EAM thread and the listener bus thread

Contributor Author

I don't think it needs to. I tried very hard to avoid volatile when not necessary; in this code, it's generally only needed when ordering is important when checking multiple variables. That is not the case for this one, since it doesn't really depend on any other variables.

At first I also didn't have timedOut and timeoutAt as volatiles, but I couldn't 100% convince myself that it was not needed. But in this case I'm pretty sure it doesn't do anything useful.

Contributor

ok, I guess the one access which is in the listener thread is in onExecutorRemoved(). If that sees the stale value, then it will incorrectly set the nextTimeout, telling the monitor to recompute the list. But while that is inefficient, it's never incorrect. And the hope is, it's rare enough that it's still better to avoid making it volatile.

is that right?

I'd say it at least deserves a comment in onExecutorRemoved.

Contributor Author

If that sees the stale value, then it will incorrectly set the nextTimeout

Right, but notice that isn't really related to the variable being volatile or not, it's just a race that can also occur if the variable is volatile.

Writing to that variable is an atomic operation. Volatile controls what happens to other reads / writes around that volatile write. That's why it's needed when the order of the reads and writes is important. But here it isn't.

@SparkQA

SparkQA commented Jun 3, 2019

Test build #106118 has finished for PR 24704 at commit 2e2432f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jun 5, 2019

Test build #106177 has finished for PR 24704 at commit 9ae62d9.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@squito
Contributor

squito commented Jun 5, 2019

merged to master

@asfgit asfgit closed this in b312033 Jun 5, 2019
@vanzin vanzin deleted the SPARK-20286 branch June 5, 2019 17:05
emanuelebardelli pushed a commit to emanuelebardelli/spark that referenced this pull request Jun 15, 2019

Closes apache#24704 from vanzin/SPARK-20286.

Authored-by: Marcelo Vanzin <vanzin@cloudera.com>
Signed-off-by: Imran Rashid <irashid@cloudera.com>
cfmcgrady pushed a commit to cfmcgrady/spark that referenced this pull request Aug 1, 2019