Skip to content

Commit

Permalink
Make Cook scheduler respect minRegisteredResourcesRatio (apache#22)
Browse files Browse the repository at this point in the history
* Make Cook scheduler respect minRegisteredResourcesRatio

* More detail comments

* address comments

* address comments

* address comments

(cherry picked from commit 2b5b8cd)
(cherry picked from commit d39b204)
  • Loading branch information
WenboZhao authored and Curtis Howard committed Feb 15, 2018
1 parent 52518f3 commit 49a3460
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 10 deletions.
Expand Up @@ -151,9 +151,17 @@ class CoarseCookSchedulerBackend(

/**
* Note that the mapping between Cook job and executor is 1-1 and onto, thus
* the number of acquired executors equals the number of non-completed jobs.
* the number of requested executors equals the number of non-completed jobs.
*/
private def totalExecutorsAcquired: Int = nonCompletedJobUUIDs.size
private def totalExecutorsRequested: Int = nonCompletedJobUUIDs.size

/**
* Number of executors that have registered so far.
*
* `totalExecutorsRequested` cannot be used for this purpose because of the latency
* between requesting resources from Cook and the jobs actually being launched.
* `getExecutorIds()`, however, only returns the ids of registered executors, so its
* length is used as the number of registered executors.
*/
private def totalExecutorsRegistered: Int = getExecutorIds().length

/**
* The set of UUIDs for the jobs that are aborted intentionally, e.g.
Expand Down Expand Up @@ -198,8 +206,10 @@ class CoarseCookSchedulerBackend(
// Instance-private Mesos coarse-grained backend built from the same `scheduler` and `sc`
// plus the SparkContext's security manager.
// NOTE(review): the empty-string third argument is presumably the master URL — confirm
// against the MesosCoarseGrainedSchedulerBackend constructor.
private[this] val mesosSchedulerBackend =
new MesosCoarseGrainedSchedulerBackend(scheduler, sc, "", sc.env.securityManager)

// This is only used by TaskScheduler for checking if the scheduler backend is ready
// before sending the first batch of tasks.
override def sufficientResourcesRegistered(): Boolean =
totalExecutorsAcquired >= executorLimit * schedulerContext.minRegisteredResourceRatio
totalExecutorsRegistered >= executorLimit * schedulerContext.minRegisteredResourceRatio

// Returns the application id carried by the Cook scheduler context when one is set;
// otherwise falls back to the id produced by the parent implementation.
override def applicationId(): String =
schedulerContext.cookApplicationIdOption.getOrElse(super.applicationId())
Expand Down Expand Up @@ -336,8 +346,10 @@ class CoarseCookSchedulerBackend(
val cur = System.currentTimeMillis
if (!ret && cur - lastIsReadyLog > 5000) {
logInfo(
s"Backend is not yet ready. Total acquired executors [$totalExecutorsAcquired] " +
s"vs executor limit [$executorLimit]")
s"Scheduler backend is not yet ready: " +
s"number of requested executors [$totalExecutorsRequested] vs " +
s"number of registered executors [$totalExecutorsRegistered] vs " +
s"executor limit [$executorLimit].")
lastIsReadyLog = cur
}
ret
Expand Down Expand Up @@ -438,15 +450,15 @@ class CoarseCookSchedulerBackend(
}

private def shouldRequestExecutors(): Boolean =
totalExecutorsAcquired < executorLimit
totalExecutorsRequested < executorLimit

private def executorStatusMessage(): String =
s"Currently, the total acquired executors is [$totalExecutorsAcquired] " +
s"Currently, the total requested executors is [$totalExecutorsRequested] " +
s"and the executor limit is [$executorLimit]."

private def requestExecutorsIfNecessary(): Unit =
if (shouldRequestExecutors()) {
val requestedExecutors = executorLimit - totalExecutorsAcquired
val requestedExecutors = executorLimit - totalExecutorsRequested

if (requestedExecutors > 0) {
val executorIdAndJob = (1 to requestedExecutors).map { _ =>
Expand Down
Expand Up @@ -63,8 +63,8 @@ case class CookSchedulerContext(
// ==========================================================================

val minRegisteredResourceRatio: Double = math.min(
1,
conf.getDouble(SPARK_SCHEDULER_MIN_REGISTERED_RESOURCE_RATIO, 0))
1d,
conf.getDouble(SPARK_SCHEDULER_MIN_REGISTERED_RESOURCE_RATIO, 0d))

val isDynamicAllocationEnabled: Boolean =
conf.getBoolean(SPARK_DYNAMIC_ALLOCATION_ENABLED, defaultValue = false)
Expand Down

0 comments on commit 49a3460

Please sign in to comment.