Skip to content

Commit

Permalink
[SPARK-17675][CORE] Expand Blacklist for TaskSets
Browse files Browse the repository at this point in the history
## What changes were proposed in this pull request?

This is a step along the way to SPARK-8425.

To enable incremental review, the first step proposed here is to expand the blacklisting within tasksets. In particular, this will enable blacklisting for
* (task, executor) pairs (this already exists via an undocumented config)
* (task, node)
* (taskset, executor)
* (taskset, node)

Adding (task, node) is critical to making spark fault-tolerant of one-bad disk in a cluster, without requiring careful tuning of "spark.task.maxFailures". The other additions are also important to avoid many misleading task failures and long scheduling delays when there is one bad node on a large cluster.

Note that some of the code changes here aren't really required for just this -- they put pieces in place for SPARK-8425 even though they are not used yet (eg. the `BlacklistTracker` helper is a little out of place, `TaskSetBlacklist` holds onto a little more info than it needs to for just this change, and `ExecutorFailuresInTaskSet` is more complex than it needs to be).

## How was this patch tested?

Added unit tests, run tests via jenkins.

Author: Imran Rashid <irashid@cloudera.com>
Author: mwws <wei.mao@intel.com>

Closes #15249 from squito/taskset_blacklist_only.
  • Loading branch information
squito committed Oct 12, 2016
1 parent 47776e7 commit 9ce7d3e
Show file tree
Hide file tree
Showing 17 changed files with 964 additions and 198 deletions.
4 changes: 3 additions & 1 deletion core/src/main/scala/org/apache/spark/SparkConf.scala
Original file line number Diff line number Diff line change
Expand Up @@ -636,7 +636,9 @@ private[spark] object SparkConf extends Logging {
"Please use spark.kryoserializer.buffer instead. The default value for " +
"spark.kryoserializer.buffer.mb was previously specified as '0.064'. Fractional values " +
"are no longer accepted. To specify the equivalent now, one may use '64k'."),
DeprecatedConfig("spark.rpc", "2.0", "Not used any more.")
DeprecatedConfig("spark.rpc", "2.0", "Not used any more."),
DeprecatedConfig("spark.scheduler.executorTaskBlacklistTime", "2.1.0",
"Please use the new blacklisting options, spark.blacklist.*")
)

Map(configs.map { cfg => (cfg.key -> cfg) } : _*)
Expand Down
11 changes: 11 additions & 0 deletions core/src/main/scala/org/apache/spark/TaskEndReason.scala
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,16 @@ case class FetchFailed(
s"FetchFailed($bmAddressString, shuffleId=$shuffleId, mapId=$mapId, reduceId=$reduceId, " +
s"message=\n$message\n)"
}

/**
* Fetch failures lead to a different failure handling path: (1) we don't abort the stage after
* 4 task failures, instead we immediately go back to the stage which generated the map output,
* and regenerate the missing data. (2) we don't count fetch failures for blacklisting, since
* presumably its not the fault of the executor where the task ran, but the executor which
* stored the data. This is especially important because we we might rack up a bunch of
* fetch-failures in rapid succession, on all nodes of the cluster, due to one bad node.
*/
override def countTowardsTaskFailures: Boolean = false
}

/**
Expand Down Expand Up @@ -204,6 +214,7 @@ case object TaskResultLost extends TaskFailedReason {
@DeveloperApi
case object TaskKilled extends TaskFailedReason {
override def toErrorString: String = "TaskKilled (killed intentionally)"
override def countTowardsTaskFailures: Boolean = false
}

/**
Expand Down
45 changes: 45 additions & 0 deletions core/src/main/scala/org/apache/spark/internal/config/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.spark.internal

import java.util.concurrent.TimeUnit

import org.apache.spark.launcher.SparkLauncher
import org.apache.spark.network.util.ByteUnit
import org.apache.spark.util.Utils
Expand Down Expand Up @@ -91,6 +93,49 @@ package object config {
.toSequence
.createWithDefault(Nil)

private[spark] val MAX_TASK_FAILURES =
ConfigBuilder("spark.task.maxFailures")
.intConf
.createWithDefault(4)

// Blacklist confs
private[spark] val BLACKLIST_ENABLED =
ConfigBuilder("spark.blacklist.enabled")
.booleanConf
.createOptional

private[spark] val MAX_TASK_ATTEMPTS_PER_EXECUTOR =
ConfigBuilder("spark.blacklist.task.maxTaskAttemptsPerExecutor")
.intConf
.createWithDefault(1)

private[spark] val MAX_TASK_ATTEMPTS_PER_NODE =
ConfigBuilder("spark.blacklist.task.maxTaskAttemptsPerNode")
.intConf
.createWithDefault(2)

private[spark] val MAX_FAILURES_PER_EXEC_STAGE =
ConfigBuilder("spark.blacklist.stage.maxFailedTasksPerExecutor")
.intConf
.createWithDefault(2)

private[spark] val MAX_FAILED_EXEC_PER_NODE_STAGE =
ConfigBuilder("spark.blacklist.stage.maxFailedExecutorsPerNode")
.intConf
.createWithDefault(2)

private[spark] val BLACKLIST_TIMEOUT_CONF =
ConfigBuilder("spark.blacklist.timeout")
.timeConf(TimeUnit.MILLISECONDS)
.createOptional

private[spark] val BLACKLIST_LEGACY_TIMEOUT_CONF =
ConfigBuilder("spark.scheduler.executorTaskBlacklistTime")
.internal()
.timeConf(TimeUnit.MILLISECONDS)
.createOptional
// End blacklist confs

private[spark] val LISTENER_BUS_EVENT_QUEUE_SIZE =
ConfigBuilder("spark.scheduler.listenerbus.eventqueue.size")
.intConf
Expand Down
114 changes: 114 additions & 0 deletions core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.scheduler

import org.apache.spark.SparkConf
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config
import org.apache.spark.util.Utils

private[scheduler] object BlacklistTracker extends Logging {

private val DEFAULT_TIMEOUT = "1h"

/**
* Returns true if the blacklist is enabled, based on checking the configuration in the following
* order:
* 1. Is it specifically enabled or disabled?
* 2. Is it enabled via the legacy timeout conf?
* 3. Default is off
*/
def isBlacklistEnabled(conf: SparkConf): Boolean = {
conf.get(config.BLACKLIST_ENABLED) match {
case Some(enabled) =>
enabled
case None =>
// if they've got a non-zero setting for the legacy conf, always enable the blacklist,
// otherwise, use the default.
val legacyKey = config.BLACKLIST_LEGACY_TIMEOUT_CONF.key
conf.get(config.BLACKLIST_LEGACY_TIMEOUT_CONF).exists { legacyTimeout =>
if (legacyTimeout == 0) {
logWarning(s"Turning off blacklisting due to legacy configuration: $legacyKey == 0")
false
} else {
logWarning(s"Turning on blacklisting due to legacy configuration: $legacyKey > 0")
true
}
}
}
}

def getBlacklistTimeout(conf: SparkConf): Long = {
conf.get(config.BLACKLIST_TIMEOUT_CONF).getOrElse {
conf.get(config.BLACKLIST_LEGACY_TIMEOUT_CONF).getOrElse {
Utils.timeStringAsMs(DEFAULT_TIMEOUT)
}
}
}

/**
* Verify that blacklist configurations are consistent; if not, throw an exception. Should only
* be called if blacklisting is enabled.
*
* The configuration for the blacklist is expected to adhere to a few invariants. Default
* values follow these rules of course, but users may unwittingly change one configuration
* without making the corresponding adjustment elsewhere. This ensures we fail-fast when
* there are such misconfigurations.
*/
def validateBlacklistConfs(conf: SparkConf): Unit = {

def mustBePos(k: String, v: String): Unit = {
throw new IllegalArgumentException(s"$k was $v, but must be > 0.")
}

Seq(
config.MAX_TASK_ATTEMPTS_PER_EXECUTOR,
config.MAX_TASK_ATTEMPTS_PER_NODE,
config.MAX_FAILURES_PER_EXEC_STAGE,
config.MAX_FAILED_EXEC_PER_NODE_STAGE
).foreach { config =>
val v = conf.get(config)
if (v <= 0) {
mustBePos(config.key, v.toString)
}
}

val timeout = getBlacklistTimeout(conf)
if (timeout <= 0) {
// first, figure out where the timeout came from, to include the right conf in the message.
conf.get(config.BLACKLIST_TIMEOUT_CONF) match {
case Some(t) =>
mustBePos(config.BLACKLIST_TIMEOUT_CONF.key, timeout.toString)
case None =>
mustBePos(config.BLACKLIST_LEGACY_TIMEOUT_CONF.key, timeout.toString)
}
}

val maxTaskFailures = conf.get(config.MAX_TASK_FAILURES)
val maxNodeAttempts = conf.get(config.MAX_TASK_ATTEMPTS_PER_NODE)

if (maxNodeAttempts >= maxTaskFailures) {
throw new IllegalArgumentException(s"${config.MAX_TASK_ATTEMPTS_PER_NODE.key} " +
s"( = ${maxNodeAttempts}) was >= ${config.MAX_TASK_FAILURES.key} " +
s"( = ${maxTaskFailures} ). Though blacklisting is enabled, with this configuration, " +
s"Spark will not be robust to one bad node. Decrease " +
s"${config.MAX_TASK_ATTEMPTS_PER_NODE.key}, increase ${config.MAX_TASK_FAILURES.key}, " +
s"or disable blacklisting with ${config.BLACKLIST_ENABLED.key}")
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.scheduler

import scala.collection.mutable.HashMap

/**
* Small helper for tracking failed tasks for blacklisting purposes. Info on all failures on one
* executor, within one task set.
*/
private[scheduler] class ExecutorFailuresInTaskSet(val node: String) {
/**
* Mapping from index of the tasks in the taskset, to the number of times it has failed on this
* executor.
*/
val taskToFailureCount = HashMap[Int, Int]()

def updateWithFailure(taskIndex: Int): Unit = {
val prevFailureCount = taskToFailureCount.getOrElse(taskIndex, 0)
taskToFailureCount(taskIndex) = prevFailureCount + 1
}

def numUniqueTasksWithFailures: Int = taskToFailureCount.size

/**
* Return the number of times this executor has failed on the given task index.
*/
def getNumTaskFailures(index: Int): Int = {
taskToFailureCount.getOrElse(index, 0)
}

override def toString(): String = {
s"numUniqueTasksWithFailures = $numUniqueTasksWithFailures; " +
s"tasksToFailureCount = $taskToFailureCount"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,14 @@ import java.util.{Timer, TimerTask}
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicLong

import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.HashMap
import scala.collection.mutable.HashSet
import scala.collection.Set
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
import scala.util.Random

import org.apache.spark._
import org.apache.spark.TaskState.TaskState
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config
import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
import org.apache.spark.scheduler.TaskLocality.TaskLocality
import org.apache.spark.scheduler.local.LocalSchedulerBackend
Expand Down Expand Up @@ -57,7 +57,7 @@ private[spark] class TaskSchedulerImpl(
isLocal: Boolean = false)
extends TaskScheduler with Logging
{
def this(sc: SparkContext) = this(sc, sc.conf.getInt("spark.task.maxFailures", 4))
def this(sc: SparkContext) = this(sc, sc.conf.get(config.MAX_TASK_FAILURES))

val conf = sc.conf

Expand Down Expand Up @@ -100,7 +100,7 @@ private[spark] class TaskSchedulerImpl(

// The set of executors we have on each host; this is used to compute hostsAlive, which
// in turn is used to decide when we can attain data locality on a given host
protected val executorsByHost = new HashMap[String, HashSet[String]]
protected val hostToExecutors = new HashMap[String, HashSet[String]]

protected val hostsByRack = new HashMap[String, HashSet[String]]

Expand Down Expand Up @@ -243,8 +243,8 @@ private[spark] class TaskSchedulerImpl(
}
}
manager.parent.removeSchedulable(manager)
logInfo("Removed TaskSet %s, whose tasks have all completed, from pool %s"
.format(manager.taskSet.id, manager.parent.name))
logInfo(s"Removed TaskSet ${manager.taskSet.id}, whose tasks have all completed, from pool" +
s" ${manager.parent.name}")
}

private def resourceOfferSingleTaskSet(
Expand Down Expand Up @@ -291,11 +291,11 @@ private[spark] class TaskSchedulerImpl(
// Also track if new executor is added
var newExecAvail = false
for (o <- offers) {
if (!executorsByHost.contains(o.host)) {
executorsByHost(o.host) = new HashSet[String]()
if (!hostToExecutors.contains(o.host)) {
hostToExecutors(o.host) = new HashSet[String]()
}
if (!executorIdToTaskCount.contains(o.executorId)) {
executorsByHost(o.host) += o.executorId
hostToExecutors(o.host) += o.executorId
executorAdded(o.executorId, o.host)
executorIdToHost(o.executorId) = o.host
executorIdToTaskCount(o.executorId) = 0
Expand Down Expand Up @@ -334,7 +334,7 @@ private[spark] class TaskSchedulerImpl(
} while (launchedTaskAtCurrentMaxLocality)
}
if (!launchedAnyTask) {
taskSet.abortIfCompletelyBlacklisted(executorIdToHost.keys)
taskSet.abortIfCompletelyBlacklisted(hostToExecutors)
}
}

Expand Down Expand Up @@ -542,10 +542,10 @@ private[spark] class TaskSchedulerImpl(
executorIdToTaskCount -= executorId

val host = executorIdToHost(executorId)
val execs = executorsByHost.getOrElse(host, new HashSet)
val execs = hostToExecutors.getOrElse(host, new HashSet)
execs -= executorId
if (execs.isEmpty) {
executorsByHost -= host
hostToExecutors -= host
for (rack <- getRackForHost(host); hosts <- hostsByRack.get(rack)) {
hosts -= host
if (hosts.isEmpty) {
Expand All @@ -565,11 +565,11 @@ private[spark] class TaskSchedulerImpl(
}

def getExecutorsAliveOnHost(host: String): Option[Set[String]] = synchronized {
executorsByHost.get(host).map(_.toSet)
hostToExecutors.get(host).map(_.toSet)
}

def hasExecutorsAliveOnHost(host: String): Boolean = synchronized {
executorsByHost.contains(host)
hostToExecutors.contains(host)
}

def hasHostAliveOnRack(rack: String): Boolean = synchronized {
Expand Down Expand Up @@ -662,5 +662,4 @@ private[spark] object TaskSchedulerImpl {

retval.toList
}

}

0 comments on commit 9ce7d3e

Please sign in to comment.