Commit

Merge remote-tracking branch 'upstream/master' into spark-30801
maryannxue committed Feb 13, 2020
2 parents 05a6b6f + 04604b9 commit 1adf99e
Showing 117 changed files with 4,134 additions and 2,265 deletions.
31 changes: 18 additions & 13 deletions core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala
@@ -37,24 +37,29 @@ private[spark] trait ExecutorAllocationClient {
/**
* Update the cluster manager on our scheduling needs. Three bits of information are included
* to help it make decisions.
* @param numExecutors The total number of executors we'd like to have. The cluster manager
* shouldn't kill any running executor to reach this number, but,
* if all existing executors were to die, this is the number of executors
* we'd want to be allocated.
* @param localityAwareTasks The number of tasks in all active stages that have a locality
* preferences. This includes running, pending, and completed tasks.
* @param hostToLocalTaskCount A map of hosts to the number of tasks from all active stages
* that would like to like to run on that host.
* This includes running, pending, and completed tasks.
*
* @param resourceProfileIdToNumExecutors The total number of executors we'd like to have per
* ResourceProfile id. The cluster manager shouldn't kill
* any running executor to reach this number, but, if all
* existing executors were to die, this is the number
* of executors we'd want to be allocated.
* @param numLocalityAwareTasksPerResourceProfileId The number of tasks in all active stages that
* have a locality preference per
* ResourceProfile id. This includes running,
* pending, and completed tasks.
* @param hostToLocalTaskCount A map of ResourceProfile id to a map of hosts to the number of
* tasks from all active stages that would like to run on
* that host. This includes running, pending, and completed tasks.
* @return whether the request is acknowledged by the cluster manager.
*/
private[spark] def requestTotalExecutors(
numExecutors: Int,
localityAwareTasks: Int,
hostToLocalTaskCount: Map[String, Int]): Boolean
resourceProfileIdToNumExecutors: Map[Int, Int],
numLocalityAwareTasksPerResourceProfileId: Map[Int, Int],
hostToLocalTaskCount: Map[Int, Map[String, Int]]): Boolean
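
A hedged sketch of calling the new per-ResourceProfile form of requestTotalExecutors; the client value, profile ids, host names, and counts below are illustrative assumptions, not part of this diff (the method is private[spark], so this is internal-style usage):

    // Sketch only: request 8 executors for the default profile (id 0) and 4 for a
    // hypothetical custom profile (id 1); locality information is keyed the same way.
    val client: ExecutorAllocationClient = ???
    val acknowledged = client.requestTotalExecutors(
      resourceProfileIdToNumExecutors = Map(0 -> 8, 1 -> 4),
      numLocalityAwareTasksPerResourceProfileId = Map(0 -> 6, 1 -> 0),
      hostToLocalTaskCount = Map(0 -> Map("host1" -> 4, "host2" -> 2), 1 -> Map.empty[String, Int]))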

/**
* Request an additional number of executors from the cluster manager.
* Request an additional number of executors from the cluster manager for the default
* ResourceProfile.
* @return whether the request is acknowledged by the cluster manager.
*/
def requestExecutors(numAdditionalExecutors: Int): Boolean
473 changes: 324 additions & 149 deletions core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala

Large diffs are not rendered by default.

150 changes: 42 additions & 108 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -25,6 +25,7 @@ import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger, AtomicReferenc

import scala.collection.JavaConverters._
import scala.collection.Map
import scala.collection.immutable
import scala.collection.mutable.HashMap
import scala.language.implicitConversions
import scala.reflect.{classTag, ClassTag}
@@ -53,7 +54,7 @@ import org.apache.spark.io.CompressionCodec
import org.apache.spark.metrics.source.JVMCPUSource
import org.apache.spark.partial.{ApproximateEvaluator, PartialResult}
import org.apache.spark.rdd._
import org.apache.spark.resource.{ResourceID, ResourceInformation}
import org.apache.spark.resource._
import org.apache.spark.resource.ResourceUtils._
import org.apache.spark.rpc.RpcEndpointRef
import org.apache.spark.scheduler._
@@ -219,9 +220,10 @@ class SparkContext(config: SparkConf) extends Logging {
private var _shutdownHookRef: AnyRef = _
private var _statusStore: AppStatusStore = _
private var _heartbeater: Heartbeater = _
private var _resources: scala.collection.immutable.Map[String, ResourceInformation] = _
private var _resources: immutable.Map[String, ResourceInformation] = _
private var _shuffleDriverComponents: ShuffleDriverComponents = _
private var _plugins: Option[PluginContainer] = None
private var _resourceProfileManager: ResourceProfileManager = _

/* ------------------------------------------------------------------------------------- *
| Accessors and public fields. These provide access to the internal state of the |
@@ -343,6 +345,8 @@ class SparkContext(config: SparkConf) extends Logging {
private[spark] def executorAllocationManager: Option[ExecutorAllocationManager] =
_executorAllocationManager

private[spark] def resourceProfileManager: ResourceProfileManager = _resourceProfileManager

private[spark] def cleaner: Option[ContextCleaner] = _cleaner

private[spark] var checkpointDir: Option[String] = None
@@ -451,6 +455,7 @@ class SparkContext(config: SparkConf) extends Logging {
}

_listenerBus = new LiveListenerBus(_conf)
_resourceProfileManager = new ResourceProfileManager(_conf)

// Initialize the app status store and listener before SparkEnv is created so that it gets
// all events.
@@ -611,7 +616,7 @@ class SparkContext(config: SparkConf) extends Logging {
case b: ExecutorAllocationClient =>
Some(new ExecutorAllocationManager(
schedulerBackend.asInstanceOf[ExecutorAllocationClient], listenerBus, _conf,
cleaner = cleaner))
cleaner = cleaner, resourceProfileManager = resourceProfileManager))
case _ =>
None
}
@@ -1622,7 +1627,7 @@ class SparkContext(config: SparkConf) extends Logging {

/**
* Update the cluster manager on our scheduling needs. Three bits of information are included
* to help it make decisions.
* to help it make decisions. This applies to the default ResourceProfile.
* @param numExecutors The total number of executors we'd like to have. The cluster manager
* shouldn't kill any running executor to reach this number, but,
* if all existing executors were to die, this is the number of executors
@@ -1638,11 +1643,16 @@
def requestTotalExecutors(
numExecutors: Int,
localityAwareTasks: Int,
hostToLocalTaskCount: scala.collection.immutable.Map[String, Int]
hostToLocalTaskCount: immutable.Map[String, Int]
): Boolean = {
schedulerBackend match {
case b: ExecutorAllocationClient =>
b.requestTotalExecutors(numExecutors, localityAwareTasks, hostToLocalTaskCount)
// This is applied to the default ResourceProfile; an API addition would be needed to
// support other profiles.
val defaultProfId = resourceProfileManager.defaultResourceProfile.id
b.requestTotalExecutors(immutable.Map(defaultProfId -> numExecutors),
immutable.Map(defaultProfId -> localityAwareTasks),
immutable.Map(defaultProfId -> hostToLocalTaskCount))
case _ =>
logWarning("Requesting executors is not supported by current scheduler.")
false
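
As a hedged illustration of how the unchanged public SparkContext signature now maps onto the per-profile call (all values below are made up):

    // Sketch only: the public call keeps its old, single-profile shape ...
    sc.requestTotalExecutors(numExecutors = 10, localityAwareTasks = 3,
      hostToLocalTaskCount = Map("host1" -> 2, "host2" -> 1))
    // ... and is forwarded to the allocation client as
    //   Map(defaultProfId -> 10), Map(defaultProfId -> 3),
    //   Map(defaultProfId -> Map("host1" -> 2, "host2" -> 1)).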
@@ -2036,6 +2046,7 @@ class SparkContext(config: SparkConf) extends Logging {
// Clear this `InheritableThreadLocal`, or it will still be inherited in child threads even this
// `SparkContext` is stopped.
localProperties.remove()
ResourceProfile.clearDefaultProfile()
// Unset YARN mode system env variable, to allow switching between cluster types.
SparkContext.clearActiveContext()
logInfo("Successfully stopped SparkContext")
@@ -2771,109 +2782,34 @@ object SparkContext extends Logging {
// When running locally, don't try to re-execute tasks on failure.
val MAX_LOCAL_TASK_FAILURES = 1

// Ensure that executor's resources satisfies one or more tasks requirement.
def checkResourcesPerTask(clusterMode: Boolean, executorCores: Option[Int]): Unit = {
// Ensure that the default executor's resources satisfy at least one task's requirements.
// This function is for cluster managers that don't set the executor cores config; for
// others it's checked in ResourceProfile.
def checkResourcesPerTask(executorCores: Int): Unit = {
val taskCores = sc.conf.get(CPUS_PER_TASK)
val execCores = if (clusterMode) {
executorCores.getOrElse(sc.conf.get(EXECUTOR_CORES))
} else {
executorCores.get
}
// some cluster managers don't set the EXECUTOR_CORES config by default (standalone
// and mesos coarse grained), so we can't rely on that config for those.
val shouldCheckExecCores = executorCores.isDefined || sc.conf.contains(EXECUTOR_CORES) ||
(master.equalsIgnoreCase("yarn") || master.startsWith("k8s"))

// Number of cores per executor must meet at least one task requirement.
if (shouldCheckExecCores && execCores < taskCores) {
throw new SparkException(s"The number of cores per executor (=$execCores) has to be >= " +
s"the task config: ${CPUS_PER_TASK.key} = $taskCores when run on $master.")
}

// Calculate the max slots each executor can provide based on resources available on each
// executor and resources required by each task.
val taskResourceRequirements = parseResourceRequirements(sc.conf, SPARK_TASK_PREFIX)
val executorResourcesAndAmounts = parseAllResourceRequests(sc.conf, SPARK_EXECUTOR_PREFIX)
.map(request => (request.id.resourceName, request.amount)).toMap

var (numSlots, limitingResourceName) = if (shouldCheckExecCores) {
(execCores / taskCores, "CPU")
} else {
(-1, "")
}

taskResourceRequirements.foreach { taskReq =>
// Make sure the executor resources were specified through config.
val execAmount = executorResourcesAndAmounts.getOrElse(taskReq.resourceName,
throw new SparkException("The executor resource config: " +
new ResourceID(SPARK_EXECUTOR_PREFIX, taskReq.resourceName).amountConf +
" needs to be specified since a task requirement config: " +
new ResourceID(SPARK_TASK_PREFIX, taskReq.resourceName).amountConf +
" was specified")
)
// Make sure the executor resources are large enough to launch at least one task.
if (execAmount < taskReq.amount) {
throw new SparkException("The executor resource config: " +
new ResourceID(SPARK_EXECUTOR_PREFIX, taskReq.resourceName).amountConf +
s" = $execAmount has to be >= the requested amount in task resource config: " +
new ResourceID(SPARK_TASK_PREFIX, taskReq.resourceName).amountConf +
s" = ${taskReq.amount}")
}
// Compare and update the max slots each executor can provide.
// If the configured amount per task was < 1.0, a task is subdividing
// executor resources. If the amount per task was > 1.0, the task wants
// multiple executor resources.
val resourceNumSlots = Math.floor(execAmount * taskReq.numParts / taskReq.amount).toInt
if (resourceNumSlots < numSlots) {
if (shouldCheckExecCores) {
throw new IllegalArgumentException("The number of slots on an executor has to be " +
"limited by the number of cores, otherwise you waste resources and " +
"dynamic allocation doesn't work properly. Your configuration has " +
s"core/task cpu slots = ${numSlots} and " +
s"${taskReq.resourceName} = ${resourceNumSlots}. " +
"Please adjust your configuration so that all resources require same number " +
"of executor slots.")
}
numSlots = resourceNumSlots
limitingResourceName = taskReq.resourceName
}
}
if(!shouldCheckExecCores && Utils.isDynamicAllocationEnabled(sc.conf)) {
// if we can't rely on the executor cores config throw a warning for user
logWarning("Please ensure that the number of slots available on your " +
"executors is limited by the number of cores to task cpus and not another " +
"custom resource. If cores is not the limiting resource then dynamic " +
"allocation will not work properly!")
}
// warn if we would waste any resources due to another resource limiting the number of
// slots on an executor
taskResourceRequirements.foreach { taskReq =>
val execAmount = executorResourcesAndAmounts(taskReq.resourceName)
if ((numSlots * taskReq.amount / taskReq.numParts) < execAmount) {
val taskReqStr = if (taskReq.numParts > 1) {
s"${taskReq.amount}/${taskReq.numParts}"
} else {
s"${taskReq.amount}"
}
val resourceNumSlots = Math.floor(execAmount * taskReq.numParts / taskReq.amount).toInt
val message = s"The configuration of resource: ${taskReq.resourceName} " +
s"(exec = ${execAmount}, task = ${taskReqStr}, " +
s"runnable tasks = ${resourceNumSlots}) will " +
s"result in wasted resources due to resource ${limitingResourceName} limiting the " +
s"number of runnable tasks per executor to: ${numSlots}. Please adjust " +
s"your configuration."
if (Utils.isTesting) {
throw new SparkException(message)
} else {
logWarning(message)
}
}
validateTaskCpusLargeEnough(executorCores, taskCores)
val defaultProf = sc.resourceProfileManager.defaultResourceProfile
// TODO: this is temporary until the full stage-level scheduling feature is integrated;
// fail if any resource other than CPU is limiting, since dynamic allocation and the
// scheduler currently compute slots from cores.
val cpuSlots = executorCores / taskCores
val limitingResource = defaultProf.limitingResource(sc.conf)
if (limitingResource.nonEmpty && !limitingResource.equals(ResourceProfile.CPUS) &&
defaultProf.maxTasksPerExecutor(sc.conf) < cpuSlots) {
throw new IllegalArgumentException("The number of slots on an executor has to be " +
"limited by the number of cores, otherwise you waste resources and " +
"dynamic allocation doesn't work properly. Your configuration has " +
s"core/task cpu slots = ${cpuSlots} and " +
s"${limitingResource} = " +
s"${defaultProf.maxTasksPerExecutor(sc.conf)}. Please adjust your configuration " +
"so that all resources require same number of executor slots.")
}
ResourceUtils.warnOnWastedResources(defaultProf, sc.conf, Some(executorCores))
}
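
To make the new check concrete, here is a hedged, illustrative configuration (the amounts and the gpu resource are assumptions, not from this diff) that the guard above would reject:

    import org.apache.spark.SparkConf

    // Sketch only: 4 cores with 1 CPU per task gives cpuSlots = 4, but gpu allows
    // only 2 tasks per executor, so per the guard above a non-CPU resource is the
    // limiting one and checkResourcesPerTask rejects the combination.
    val conf = new SparkConf()
      .set("spark.executor.cores", "4")
      .set("spark.task.cpus", "1")
      .set("spark.executor.resource.gpu.amount", "2")   // a discovery script would also be
      .set("spark.task.resource.gpu.amount", "1")       // needed in a real deployment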

master match {
case "local" =>
checkResourcesPerTask(clusterMode = false, Some(1))
checkResourcesPerTask(1)
val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
val backend = new LocalSchedulerBackend(sc.getConf, scheduler, 1)
scheduler.initialize(backend)
@@ -2886,7 +2822,7 @@
if (threadCount <= 0) {
throw new SparkException(s"Asked to run locally with $threadCount threads")
}
checkResourcesPerTask(clusterMode = false, Some(threadCount))
checkResourcesPerTask(threadCount)
val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
val backend = new LocalSchedulerBackend(sc.getConf, scheduler, threadCount)
scheduler.initialize(backend)
@@ -2897,22 +2833,21 @@
// local[*, M] means the number of cores on the computer with M failures
// local[N, M] means exactly N threads with M failures
val threadCount = if (threads == "*") localCpuCount else threads.toInt
checkResourcesPerTask(clusterMode = false, Some(threadCount))
checkResourcesPerTask(threadCount)
val scheduler = new TaskSchedulerImpl(sc, maxFailures.toInt, isLocal = true)
val backend = new LocalSchedulerBackend(sc.getConf, scheduler, threadCount)
scheduler.initialize(backend)
(backend, scheduler)

case SPARK_REGEX(sparkUrl) =>
checkResourcesPerTask(clusterMode = true, None)
val scheduler = new TaskSchedulerImpl(sc)
val masterUrls = sparkUrl.split(",").map("spark://" + _)
val backend = new StandaloneSchedulerBackend(scheduler, sc, masterUrls)
scheduler.initialize(backend)
(backend, scheduler)

case LOCAL_CLUSTER_REGEX(numSlaves, coresPerSlave, memoryPerSlave) =>
checkResourcesPerTask(clusterMode = true, Some(coresPerSlave.toInt))
checkResourcesPerTask(coresPerSlave.toInt)
// Check to make sure memory requested <= memoryPerSlave. Otherwise Spark will just hang.
val memoryPerSlaveInt = memoryPerSlave.toInt
if (sc.executorMemory > memoryPerSlaveInt) {
@@ -2941,7 +2876,6 @@ object SparkContext extends Logging {
(backend, scheduler)

case masterUrl =>
checkResourcesPerTask(clusterMode = true, None)
val cm = getClusterManager(masterUrl) match {
case Some(clusterMgr) => clusterMgr
case None => throw new SparkException("Could not parse Master URL: '" + master + "'")
core/src/main/scala/org/apache/spark/internal/config/Tests.scala
@@ -53,4 +53,13 @@ private[spark] object Tests {
val TEST_N_CORES_EXECUTOR = ConfigBuilder("spark.testing.nCoresPerExecutor")
.intConf
.createWithDefault(2)

val RESOURCES_WARNING_TESTING =
ConfigBuilder("spark.resources.warnings.testing").booleanConf.createWithDefault(false)

val RESOURCE_PROFILE_MANAGER_TESTING =
ConfigBuilder("spark.testing.resourceProfileManager")
.booleanConf
.createWithDefault(false)

}
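
A minimal sketch of how a test might enable these new test-only flags through the public string-keyed SparkConf setter; this usage is an assumption for illustration, not shown in the diff:

    import org.apache.spark.SparkConf

    // Sketch only: both configs added in this diff default to false.
    val conf = new SparkConf()
      .set("spark.resources.warnings.testing", "true")
      .set("spark.testing.resourceProfileManager", "true")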
core/src/main/scala/org/apache/spark/resource/ExecutorResourceRequests.scala
@@ -109,7 +109,7 @@ private[spark] class ExecutorResourceRequests() extends Serializable {
discoveryScript: String = "",
vendor: String = ""): this.type = {
// a bit weird but for Java api use empty string as meaning None because empty
// string is otherwise invalid for those paramters anyway
// string is otherwise invalid for those parameters anyway
val req = new ExecutorResourceRequest(resourceName, amount, discoveryScript, vendor)
_executorResources.put(resourceName, req)
this
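
For context, a hedged sketch of composing these chained executor resource requests; the amounts, the gpu name, and the discovery-script path are illustrative assumptions (the class is private[spark] at this point, so this is internal/test-style usage):

    import org.apache.spark.resource.ExecutorResourceRequests

    // Sketch only: each setter returns this.type, so the requests chain fluently.
    val execReqs = new ExecutorResourceRequests()
      .cores(4)
      .memory("8g")
      .resource("gpu", amount = 2,
        discoveryScript = "/opt/spark/scripts/getGpus.sh",  // illustrative path
        vendor = "nvidia.com")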
