
[SPARK-16630][YARN] Blacklist a node if executors won't launch on it #21068

Closed
wants to merge 16 commits

Conversation

attilapiros
Contributor

@attilapiros commented Apr 13, 2018

What changes were proposed in this pull request?

This change extends YARN resource allocation handling with blacklisting functionality.
It handles cases where a node is messed up or misconfigured such that a container won't launch on it. Before this change, blacklisting focused only on task execution; this change introduces YarnAllocatorBlacklistTracker, which tracks allocation failures per host (when enabled via "spark.yarn.blacklist.executor.launch.blacklisting.enabled").
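For illustration only, here is a minimal, self-contained sketch of the per-host failure counting idea (the class and parameter names are invented for this sketch and are not the PR's actual API):

import scala.collection.mutable

class LaunchFailureTrackerSketch(maxFailuresPerHost: Int, validityIntervalMs: Long) {
  // one queue of failure timestamps per host
  private val failureTimestampsPerHost = mutable.Map[String, mutable.Queue[Long]]()

  // record a container launch failure and return true if the host should now be blacklisted
  def registerFailure(host: String, nowMs: Long): Boolean = {
    val timestamps = failureTimestampsPerHost.getOrElseUpdate(host, mutable.Queue.empty[Long])
    timestamps.enqueue(nowMs)
    // drop failures older than the validity interval
    while (timestamps.nonEmpty && timestamps.head < nowMs - validityIntervalMs) {
      timestamps.dequeue()
    }
    timestamps.size > maxFailuresPerHost
  }
}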

How was this patch tested?

With unit tests

Including a new suite: YarnAllocatorBlacklistTrackerSuite.

Manually

It was tested on a cluster by deleting the Spark jars on one of the nodes.

Behaviour before these changes

Starting Spark as:

spark2-shell --master yarn --deploy-mode client --num-executors 4  --conf spark.executor.memory=4g --conf "spark.yarn.max.executor.failures=6"

Log is:

18/04/12 06:49:36 INFO yarn.ApplicationMaster: Final app status: FAILED, exitCode: 11, (reason: Max number of executor failures (6) reached)
18/04/12 06:49:39 INFO yarn.ApplicationMaster: Unregistering ApplicationMaster with FAILED (diag message: Max number of executor failures (6) reached)
18/04/12 06:49:39 INFO impl.AMRMClientImpl: Waiting for application to be successfully unregistered.
18/04/12 06:49:39 INFO yarn.ApplicationMaster: Deleting staging directory hdfs://apiros-1.gce.test.com:8020/user/systest/.sparkStaging/application_1523459048274_0016
18/04/12 06:49:39 INFO util.ShutdownHookManager: Shutdown hook called

Behaviour after these changes

Starting Spark as:

spark2-shell --master yarn --deploy-mode client --num-executors 4  --conf spark.executor.memory=4g --conf "spark.yarn.max.executor.failures=6" --conf "spark.yarn.blacklist.executor.launch.blacklisting.enabled=true"

And the log is:

18/04/13 05:37:43 INFO yarn.YarnAllocator: Will request 1 executor container(s), each with 1 core(s) and 4505 MB memory (including 409 MB of overhead)
18/04/13 05:37:43 INFO yarn.YarnAllocator: Submitted 1 unlocalized container requests.
18/04/13 05:37:43 INFO yarn.YarnAllocator: Launching container container_1523459048274_0025_01_000008 on host apiros-4.gce.test.com for executor with ID 6
18/04/13 05:37:43 INFO yarn.YarnAllocator: Received 1 containers from YARN, launching executors on 1 of them.
18/04/13 05:37:43 INFO yarn.YarnAllocator: Completed container container_1523459048274_0025_01_000007 on host: apiros-4.gce.test.com (state: COMPLETE, exit status: 1)
18/04/13 05:37:43 INFO yarn.YarnAllocatorBlacklistTracker: blacklisting host as YARN allocation failed: apiros-4.gce.test.com
18/04/13 05:37:43 INFO yarn.YarnAllocatorBlacklistTracker: adding nodes to YARN application master's blacklist: List(apiros-4.gce.test.com)
18/04/13 05:37:43 WARN yarn.YarnAllocator: Container marked as failed: container_1523459048274_0025_01_000007 on host: apiros-4.gce.test.com. Exit status: 1. Diagnostics: Exception from container-launch.
Container id: container_1523459048274_0025_01_000007
Exit code: 1
Stack trace: ExitCodeException exitCode=1:
        at org.apache.hadoop.util.Shell.runCommand(Shell.java:604)
        at org.apache.hadoop.util.Shell.run(Shell.java:507)
        at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:789)
        at org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:213)
        at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:302)
        at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:82)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

Where the most important part is:

18/04/13 05:37:43 INFO yarn.YarnAllocatorBlacklistTracker: blacklisting host as YARN allocation failed: apiros-4.gce.test.com
18/04/13 05:37:43 INFO yarn.YarnAllocatorBlacklistTracker: adding nodes to YARN application master's blacklist: List(apiros-4.gce.test.com)

And execution continued (no shutdown was called).

Testing the blacklisting of the whole cluster

Starting Spark with YARN blacklisting enabled, then removing the Spark core jar from the cluster nodes one by one. A simple Spark job executed afterwards fails, and the YARN log contains the expected exit status:

18/06/15 01:07:10 INFO yarn.ApplicationMaster: Final app status: FAILED, exitCode: 11, (reason: Due to executor failures all available nodes are blacklisted)
18/06/15 01:07:13 INFO util.ShutdownHookManager: Shutdown hook called

@tgravescs
Contributor

Jenkins, test this please

@squito
Contributor

squito commented Apr 13, 2018

Jenkins, add to whitelist

@SparkQA

SparkQA commented Apr 13, 2018

Test build #89347 has finished for PR 21068 at commit fd1923e.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Apr 13, 2018

Test build #89348 has finished for PR 21068 at commit fd1923e.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Apr 13, 2018

Test build #89350 has finished for PR 21068 at commit fd1923e.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Contributor

@squito left a comment

just really minor comments from a first read, need to spend more time understanding it all better

// Queue to store the timestamp of failed executors for each host
private val failedExecutorsTimeStampsPerHost = mutable.Map[String, mutable.Queue[Long]]()

private val sumFailedExecutorsTimeStamps = new mutable.Queue[Long]()
Contributor

why is this called "sum"? I think the old name failedExecutorTimestamps is more appropriate, same for the other places you added "sum"

BLACKLIST_SIZE_LIMIT.getOrElse((numClusterNodes * BLACKLIST_SIZE_DEFAULT_WEIGHT).toInt)
val nodesToBlacklist =
if (schedulerBlacklistedNodesWithExpiry.size +
allocationBlacklistedNodesWithExpiry.size > limit) {
Contributor

nit: double-indent the continuation of the if condition. (we don't do this everywhere but we should, I find it helps)

Contributor

@squito left a comment

I think it's worth considering if we can make these changes less yarn-specific. Really we're only getting a bit of info from the cluster manager:

  1. the container failed during allocation
  2. how many nodes are on the cluster

and we only need to have the combined set of blacklisted nodes available to the cluster manager. The rest of the logic could live within BlacklistTracker (or some similar helper) which doesn't need to know about the cluster manager at all.

Other than just renaming, the significant change that would mean is that all the logic in YarnAllocatorBlacklistTracker would need to move to the driver, instead of on the AM, so it would change the messages somewhat. In particular I think you'd need to change the ExecutorExited message to include whether it was a failure to even allocate the container.

This way it would be easier to add this for mesos (there are already mesos changes that are sort of waiting on this) and kubernetes

@tgravescs thoughts?

@SparkQA

SparkQA commented Apr 13, 2018

Test build #89355 has finished for PR 21068 at commit e49bd0d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Apr 15, 2018

Test build #89373 has finished for PR 21068 at commit 57086bb.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Contributor

@henryr left a comment

Just took a quick look - I'd work on Imran's advice first and then see if any of my comments are still valid.

}

private def updateAllocationBlacklistedNodes(hostname: String): Unit = {
if (IS_YARN_ALLOCATION_BLACKLIST_ENABLED) {
Contributor

consider just:

if (!IS_YARN_ALLOCATION_BLACKLIST_ENABLED) return;

to save a level of indentation below.

Contributor Author

As far as I know, using return in Scala is mostly discouraged. Anyway, here we have only two levels of indentation, so I would keep these if conditions as they are.

if (IS_YARN_ALLOCATION_BLACKLIST_ENABLED) {
val failuresOnHost = failureWithinTimeIntervalTracker.getNumExecutorFailuresOnHost(hostname)
if (failuresOnHost > BLACKLIST_MAX_FAILED_EXEC_PER_NODE) {
logInfo("blacklisting host as YARN allocation failed: %s".format(hostname))
Contributor

log msg could include the number of failures

Contributor Author

thanks, I will add it


private var currentBlacklistedYarnNodes = Set.empty[String]

private var schedulerBlacklistedNodesWithExpiry = Map.empty[String, Long]
Contributor

Do you need to keep a separate data structure for the scheduler and allocator blacklisted nodes? Instead, could you add the scheduler ones into a shared map when setSchedulerBlacklistedNodes is called?

Contributor Author

We have to store them separately, as there are two sources of blacklisted nodes and they are updated separately via the two setters, where not the diffs but the complete state of the blacklisted sets is passed in (another reason is that only the expiry of allocator-blacklisted nodes is handled by YarnAllocatorBlacklistTracker).

while (executorFailuresValidityInterval > 0
&& failedExecutorsTimeStampsForHost.nonEmpty
&& failedExecutorsTimeStampsForHost.head < endTime - executorFailuresValidityInterval) {
failedExecutorsTimeStampsForHost.dequeue()
Contributor

It's counter-intuitive that this get* method mutates state. If I called

getNumFailuresWithinValidityInterval(foo, 0)
getNumFailuresWithinValidityInterval(foo, 10)
getNumFailuresWithinValidityInterval(foo, 0)

The last call can return something different from the first because all the failures that weren't within 10 - executorFailuresValidityInterval will have been dropped.

Contributor Author

OK, I will take your recommendation: drop endTime and rename the method to getRecentFailureCount.
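A rough sketch of what the renamed method could look like (field names follow the snippets quoted in this review and the clock is the tracker's own; this is illustrative, not the exact final code):

private def getRecentFailureCount(hostname: String): Int = {
  val failedExecutorsTimeStampsForHost =
    failedExecutorsTimeStampsPerHost.getOrElse(hostname, mutable.Queue.empty[Long])
  val now = clock.getTimeMillis()
  // drop failures that fell out of the validity interval before counting
  while (executorFailuresValidityInterval > 0 &&
      failedExecutorsTimeStampsForHost.nonEmpty &&
      failedExecutorsTimeStampsForHost.head < now - executorFailuresValidityInterval) {
    failedExecutorsTimeStampsForHost.dequeue()
  }
  failedExecutorsTimeStampsForHost.size
}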

endTime: Long): Int = {
while (executorFailuresValidityInterval > 0
&& failedExecutorsTimeStampsForHost.nonEmpty
&& failedExecutorsTimeStampsForHost.head < endTime - executorFailuresValidityInterval) {
Contributor

This relies on the fact that the clock is monotonic, but if it's a SystemClock it's based on System.currentTimeMillis(), which is not monotonic and can time-travel.

Contributor Author

This code comes from YarnAllocator.scala#L175.

As I see it, the common solution in Spark is to use Clock, SystemClock and ManualClock. And here the validity interval is much higher than the diff NTP can apply.


private val failedExecutorsTimeStamps = new mutable.Queue[Long]()

private def getNumFailuresWithinValidityInterval(
Contributor

It's not really clear what a 'validity interval' is. I think it means that only failures that have happened recently are considered valid? I think it would be clearer to call this getNumFailuresSince(), or getRecentFailureCount() or similar, and explicitly pass in the timestamp the caller wants to consider failures since.

If you do the latter, and drop the endTime argument, then you partly address the issue I raise below about how this mutates state, because getRecentFailureCount() suggests more clearly that it's expecting to take into account the current time.

Contributor Author

Thanks, then getRecentFailureCount will be the method name, without the endTime argument.

Contributor

@tgravescs left a comment

If we can move it to be common I think that would be good.
@squito do you know if mesos and/or kubernetes can provide this same information?

@@ -216,6 +216,10 @@ private[scheduler] class BlacklistTracker (
}
}

private def updateNodeBlacklist(): Unit = {
Contributor

this function seems unnecessary to me, I don't see it adding any value vs doing it inline.

Contributor Author

ok

.booleanConf
.createOptional

private[spark] val YARN_BLACKLIST_SIZE_LIMIT =
Contributor

why do we want both this and the spark.yarn.blacklist.size.default.weight?

Contributor Author

we can remove it

@@ -328,4 +328,26 @@ package object config {
CACHED_FILES_TYPES,
CACHED_CONF_ARCHIVE)

/* YARN allocator-level blacklisting related config entries. */
private[spark] val YARN_ALLOCATION_BLACKLIST_ENABLED =
ConfigBuilder("spark.yarn.allocation.blacklist.enabled")
Contributor

I would say either just call it spark.yarn.blacklist.enabled or we make it more specific that this is executor launch failure blacklisting

Contributor Author

First I named it "spark.yarn.blacklist.enabled" but then I was wondering whether a user would confuse it with YARN blacklisting, so I added the "allocation" part. So I would go for the second option: "spark.yarn.executor.launch.blacklist.enabled".


private[spark] val YARN_BLACKLIST_SIZE_DEFAULT_WEIGHT =
ConfigBuilder("spark.yarn.blacklist.size.default.weight")
.doc("If blacklist size limit is not specified then the default limit will be the number of " +
Contributor

perhaps rename to something like spark.yarn.blacklist.maxNodeBlacklistRatio. (Note we are talking about using Ratio in another config here: #19881.)

Contributor Author

ok
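In other words, the effective limit with a ratio-style config could be derived roughly as follows (a sketch with assumed names, not the final code):

// default blacklist size limit derived from the cluster size and the configured ratio
val maxBlacklistedNodes = (numClusterNodes * blacklistMaxNodeRatio).toInt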

@tgravescs
Contributor

actually the only other thing I need to make sure of is that there aren't any delays if we now send the information from the YARN allocator back to the scheduler, and then I assume it would need to get it back again from the scheduler. During that time the YARN allocator could be calling allocate() and updating things, so we need to make sure it gets the most up-to-date blacklist.

also I need to double check, but the blacklist information isn't being sent to the YARN allocator when dynamic allocation is off, right? We would want that to happen.

@squito
Contributor

squito commented Apr 17, 2018

actually the only other thing I need to make sure of is that there aren't any delays if we now send the information from the YARN allocator back to the scheduler, and then I assume it would need to get it back again from the scheduler. During that time the YARN allocator could be calling allocate() and updating things, so we need to make sure it gets the most up-to-date blacklist.

also I need to double check, but the blacklist information isn't being sent to the YARN allocator when dynamic allocation is off, right? We would want that to happen.

yeah both good points. actually, don't we want to update the general node blacklist on the yarn allocator even when dynamic allocation is off? I don't think it gets updated at all unless dynamic allocation is on, it seems all the updates originate in ExecutorAllocationManager, the blacklist never actively pushes updates to the yarn allocator. That seems like an existing shortcoming.

do you know if mesos and/or kubernetes can provide this same information?

I don't know about kubernetes at all. Mesos does provide info when a container fails. I don't think it lets you know the total cluster size, but that should be optional. Btw, node count is never going to be totally sufficient, as the remaining nodes might not actually be able to run your executors (smaller hardware, always taken up by higher priority applications, other constraints in a framework like mesos), so it's always going to be best effort.

@attilapiros and I discussed this briefly yesterday, an alternative to moving everything into the BlacklistTracker on the driver is to just have some abstract base class, which is changed slightly for each cluster manager. Then you could keep the flow like it is here, with the extra blacklisting living in YarnAllocator still.

@attilapiros
Contributor Author

Yes, we can create an abstract class from YarnAllocatorBlacklistTracker (like AbstractAllocatorBlacklistTracker) where the method synchronizeBlacklistedNodes can have different implementations. In this case the core and the messages can stay as they are. As I see it, this is the less risky and cheaper solution. On the other hand, having the complete blacklisting in the driver is a more centralized/clearer design.

We just have to make up our minds about where to go from here. Any help and suggestions are welcome for the decision.

@squito
Contributor

squito commented Apr 18, 2018

I think Tom makes a good case for why this should live in the YarnAllocator as you have it.

I also don't think you need to worry about creating an abstract class yet, that refactoring can be done when another cluster manager tries to share some code ... it would just be helpful to keep that use in mind.

also I filed https://issues.apache.org/jira/browse/SPARK-24016 for updating the task-based node blacklist even with static allocation

@tgravescs
Contributor

thanks for filing that jira @squito, I agree we should have blacklisting work with dynamic allocation disabled as well. (A bit of a tangent from this jira) I'm actually wondering now about the scheduler blacklisting and whether it should have a max blacklisted Ratio as well. I don't remember if we discussed this previously.

For this, I'm fine either way, if there are people interested in doing the mesos/kubernetes stuff now we could certainly coordinate with them to see if there is something common we could do now. I haven't had time to keep up with those jira to know though. Otherwise this isn't public facing so we can do that when they decide to implement it.

@SparkQA

SparkQA commented Apr 18, 2018

Test build #89514 has finished for PR 21068 at commit c92a090.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@squito
Contributor

squito commented Apr 19, 2018

@tgravescs on the blacklist ratio for task-based blacklisting -- there is nothing, but there are some related jiras: SPARK-22148 & SPARK-15815

to be honest I have doubts about the utility of the ratio ... if you really want to make sure blacklisting doesn't lead to starvation, you've got to have some other mechanism, as you could easily have the remaining nodes be occupied or have insufficient resources.

Kubernetes doesn't do anything with the node blacklisting currently: SPARK-23485

Mesos already has a notion of blacklisting nodes for failing to allocate containers, but it's currently at odds with the task-based blacklist. #20640 is somewhat stalled because blacklisting based on allocation failures is missing in a general sense.

In any case, I still think we shouldn't make the code more complex for something other clusters managers might use in the future, and that the current overall organization is fine.

@tgravescs
Contributor

ok sounds fine to me, so we should review as is then

@squito
Contributor

squito commented Apr 25, 2018

A couple more high-level thoughts:

  1. Do we want to have an event posted about the node getting blacklisted? I think it would be useful. But then there needs to be a msg from the YarnAllocator back to the driver about the blacklisting.

  2. I was thinking about how this interacts with SPARK-13669. at first I was thinking this makes that entirely unnecessary, but I guess that is not true -- that is still useful if the external shuffle service goes down after the executor is started.

Contributor

@vanzin left a comment

I did a first pass and mostly pointed out stylistic stuff... I need a second pass to take a closer look at the functionality. Didn't see any red flags though.

@@ -126,7 +126,7 @@ private[scheduler] class BlacklistTracker (
nodeIdToBlacklistExpiryTime.remove(node)
listenerBus.post(SparkListenerNodeUnblacklisted(now, node))
}
_nodeBlacklist.set(nodeIdToBlacklistExpiryTime.keySet.toSet)
_nodeBlacklist.set(collection.immutable.Map(nodeIdToBlacklistExpiryTime.toSeq: _*))
Contributor

Isn't this the same as calling nodeIdToBlacklistExpiryTime.toMap? (That returns an immutable map.)

At the very least, the collection.immutable. part looks unnecessary. Same thing happens below.
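i.e. the suggestion amounts to something like (sketch):

_nodeBlacklist.set(nodeIdToBlacklistExpiryTime.toMap)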

@@ -651,8 +651,8 @@ private[spark] class TaskSchedulerImpl(
* Get a snapshot of the currently blacklisted nodes for the entire application. This is
* thread-safe -- it can be called without a lock on the TaskScheduler.
*/
def nodeBlacklist(): scala.collection.immutable.Set[String] = {
blacklistTrackerOpt.map(_.nodeBlacklist()).getOrElse(scala.collection.immutable.Set())
def nodeBlacklistWithExpiryTimes(): scala.collection.immutable.Map[String, Long] = {
Contributor

Why not just Map[String, Long]?

I kinda find it odd when I see these types used this way, so unless there's a good reason...

@@ -170,8 +170,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
if (executorDataMap.contains(executorId)) {
executorRef.send(RegisterExecutorFailed("Duplicate executor ID: " + executorId))
context.reply(true)
} else if (scheduler.nodeBlacklist != null &&
scheduler.nodeBlacklist.contains(hostname)) {
} else if (scheduler.nodeBlacklistWithExpiryTimes != null &&
Contributor

nodeBlacklistWithExpiryTimes is never null right?

Also calling that method twice causes unnecessary computation...
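One way to avoid the repeated call would be to cache the result in a local value first, for example (a sketch; the surrounding condition structure is simplified here):

val blacklistedNodes = scheduler.nodeBlacklistWithExpiryTimes
if (blacklistedNodes.contains(hostname)) {
  // reject the executor registration on a blacklisted node
}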

import org.apache.spark.internal.Logging
import org.apache.spark.util.{Clock, SystemClock}

private[spark] class FailureWithinTimeIntervalTracker(sparkConf: SparkConf) extends Logging {
Contributor

Add scaladoc explaining what this does?


private[spark] class FailureWithinTimeIntervalTracker(sparkConf: SparkConf) extends Logging {

private var clock: Clock = new SystemClock
Contributor

This should be a constructor argument.
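That is, something along these lines (a sketch of the suggested signature; the default value is an assumption):

private[spark] class FailureWithinTimeIntervalTracker(
    sparkConf: SparkConf,
    clock: Clock = new SystemClock) extends Logging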

private def synchronizeBlacklistedNodeWithYarn(nodesToBlacklist: Set[String]): Unit = {
// Update blacklist information to YARN ResourceManager for this application,
// in order to avoid allocating new Containers on the problematic nodes.
val blacklistAdditions = (nodesToBlacklist -- currentBlacklistedYarnNodes).toList.sorted
Contributor

additions, removals are just as good names for these variables.


private def removeExpiredYarnBlacklistedNodes() = {
val now = clock.getTimeMillis()
allocationBlacklistedNodesWithExpiry.retain {
Contributor

Doesn't this work?

allocationBlacklistedNodesWithExpiry.retain { case (_, expiry) =>
  ...
}

.doubleConf
.checkValue(weight => weight >= 0 && weight <= 1, "The value of this ratio must be in [0, 1].")
.createWithDefault(0.75)

Contributor

Too many blank lines.

test("expiring its own blacklisted nodes") {
clock.setTime(0L)

1 to MAX_FAILED_EXEC_PER_NODE_VALUE foreach {
Contributor

(1 to blah).foreach { _ =>
  ...
}

}

test("not handling the expiry of scheduler blacklisted nodes") {
clock.setTime(0L)
Contributor

redundant?

@SparkQA

SparkQA commented Apr 26, 2018

Test build #89889 has finished for PR 21068 at commit 0ba8510.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented May 30, 2018

Test build #91307 has finished for PR 21068 at commit 0e78b38.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@squito
Contributor

squito commented May 31, 2018

hey sorry I have been meaning to respond to this but keep getting sidetracked. As Tom and I are going to meet in person next week anyway, I figure at this point it makes sense to just wait till we chat directly to make sure we're on the same page. It sounds like we're in agreement but at this point might as well wait a couple more days, as I haven't had a chance to do a final review anyway

@squito
Contributor

squito commented Jun 12, 2018

Tom and I had a chance to discuss this in person, and after some back and forth I think we decided that maybe it's best to remove the limit but have the application fail if the entire cluster is blacklisted. @tgravescs does that sound correct?

I mentioned this briefly to @attilapiros and he mentioned that might be hard, but instead you could stop allocation blacklisting, which would result in the usual yarn app failure from too many executors. He's going to look at this a little more closely and report back here. I'd be OK with that -- the main goal is just to make sure that an app doesn't hang if you've blacklisted the entire cluster. I'm pretty sure that's @tgravescs' main concern as well. (If the only reasonable way to do that is with the existing limit, I'm fine w/ that too.)

@SparkQA

SparkQA commented Jun 13, 2018

Test build #91764 has finished for PR 21068 at commit 61f3d17.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jun 13, 2018

Test build #91779 has finished for PR 21068 at commit 7fce4ee.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Contributor

@squito left a comment

a couple of minor things, but overall lgtm

@attilapiros can you please test this latest version on a cluster again?

@tgravescs this version will kill the app when the whole cluster is blacklisted, attila found out it was easy to do.


private val defaultTimeout = "1h"

private val blacklistTimeoutMillis =
Contributor

BlacklistTracker.getBlacklistTimeout(conf)

Contributor Author

OK, then I'll relax the visibility of BlacklistTracker a bit by changing it from private[scheduler] to private[spark].

val endTime = clock.getTimeMillis()
while (executorFailuresValidityInterval > 0 &&
failedExecutorsWithTimeStamps.nonEmpty &&
failedExecutorsWithTimeStamps.head < endTime - executorFailuresValidityInterval) {
Contributor

double indent the condition

failedExecutorsWithTimeStamps.dequeue()
failedExecutorsWithTimeStamps.nonEmpty &&
failedExecutorsWithTimeStamps.head < endTime - executorFailuresValidityInterval) {
failedExecutorsWithTimeStamps.dequeue()
Contributor

but only single indent the body of the while

@SparkQA

SparkQA commented Jun 14, 2018

Test build #91860 has finished for PR 21068 at commit aa52f6e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@attilapiros
Contributor Author

attilapiros commented Jun 15, 2018

Retested manually on a cluster. The PR's description is updated with the result.

@SparkQA

SparkQA commented Jun 15, 2018

Test build #91905 has finished for PR 21068 at commit 848d050.

  • This patch fails Spark unit tests.
  • This patch does not merge cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jun 15, 2018

Test build #91907 has finished for PR 21068 at commit a462ce0.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@squito
Contributor

squito commented Jun 15, 2018

Jenkins, retest this please

@squito
Contributor

squito commented Jun 15, 2018

lgtm

will leave open for a couple of days to let @tgravescs take a look

*
* <ul>
* <li> from the scheduler as task level blacklisted nodes
* <li> from this class (tracked here) as YARN resource allocation problems
Contributor

@skonto commented Jun 15, 2018

This source never touches the scheduler's blacklist right?

Contributor Author

Right. Just the other way around: the scheduler's blacklisted hosts will be sent here to be forwarded to YARN. This way they will be taken into account at resource allocation.

@SparkQA

SparkQA commented Jun 15, 2018

Test build #91920 has finished for PR 21068 at commit a462ce0.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Member

@felixcheung left a comment

👍 this is going to be very useful

private def updateAllocationBlacklistedNodes(hostname: String): Unit = {
val failuresOnHost = failureTracker.numFailuresOnHost(hostname)
if (failuresOnHost > maxFailuresPerHost) {
logInfo(s"blacklisting $hostname as YARN allocation failed $failuresOnHost times")
Member

maybe logWarn?

Member

would be great if there is a metric on failuresOnHost count...

Contributor Author

Thanks, I am happy you consider this change useful.

Regarding logInfo, I chose it to be consistent with the logging of the existing BlacklistTracker, where blacklisting itself is treated as part of the normal behaviour and logInfo is used. But if you have a strong feeling about logWarn I can make the change.

For the metrics, I've done a quick search in the yarn module and it seems that currently no metrics are coming from there, so the change is probably not just a few lines. What about me creating a new jira task for it? Is that fine with you?

Contributor

yes, exposing metrics is not a bad idea, but I'd like to leave it out of this change

Contributor Author

@felixcheung I have started to gain some experience with metrics (as I worked on SPARK-24594) and it seems to me the structure of the metrics (the metric names) should be known and registered before starting the metric system. So I can add a new metric for ALL the failures, but not for each host separately, like with the console sink:

-- Gauges ----------------------------------------------------------------------
yarn_cluster.executorFailures.ALL
             value = 3

Aggregated values would also be possible. Any idea what would be the most valuable for Spark users given this restriction?

Contributor Author

I was thinking a bit more about this problem and I have an idea: creating metrics for the number of hosts with a predetermined number of executor failures. Like yarn_cluster.numHostWithExecutorFailures.x where x is in [1, ... max(10, spark.blacklist.application.maxFailedExecutorsPerNode if blacklisting is enabled, spark.yarn.max.executor.failures if set)]. What is your opinion?
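For reference, a single aggregate gauge like the yarn_cluster.executorFailures.ALL example above could be registered roughly like this (a hypothetical sketch using Spark's metrics Source trait; the class name and the getNumExecutorsFailed accessor are assumptions, not part of this PR):

import com.codahale.metrics.{Gauge, MetricRegistry}
import org.apache.spark.metrics.source.Source

private[spark] class YarnAllocatorSourceSketch(allocator: YarnAllocator) extends Source {
  override val sourceName = "yarn_cluster"
  override val metricRegistry = new MetricRegistry()

  // the metric name is fixed up front; only its value is computed on each read
  metricRegistry.register(MetricRegistry.name("executorFailures", "ALL"),
    new Gauge[Int] {
      override def getValue: Int = allocator.getNumExecutorsFailed
    })
}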

@@ -328,4 +328,10 @@ package object config {
CACHED_FILES_TYPES,
CACHED_CONF_ARCHIVE)

/* YARN allocator-level blacklisting related config entries. */
private[spark] val YARN_EXECUTOR_LAUNCH_BLACKLIST_ENABLED =
Contributor

need to document this in docs/running-on-yarn.md

@tgravescs
Contributor

Looks like it was modified to kill the app if all nodes are blacklisted, so I'm good with this approach.

@SparkQA

SparkQA commented Jun 18, 2018

Test build #92031 has finished for PR 21068 at commit f71c7c5.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@squito
Contributor

squito commented Jun 18, 2018

retest this please

@SparkQA

SparkQA commented Jun 18, 2018

Test build #92040 has finished for PR 21068 at commit f71c7c5.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@attilapiros
Contributor Author

Here is the new task for the metrics: https://issues.apache.org/jira/browse/SPARK-24594.

@squito
Contributor

squito commented Jun 21, 2018

merged to master. Thanks @attilapiros!

@asfgit closed this in b56e9c6 on Jun 21, 2018