@@ -35,6 +35,7 @@ private[spark] class ExecutorFailureTracker(

private val executorFailuresValidityInterval =
sparkConf.get(EXECUTOR_ATTEMPT_FAILURE_VALIDITY_INTERVAL_MS).getOrElse(-1L)
private[spark] val keepaliveOnMinExecutors: Boolean = sparkConf.get(KEEPALIVE_ON_MIN_EXECUTORS)

// Queue to store the timestamp of failed executors for each host
private val failedExecutorsTimeStampsPerHost = mutable.Map[String, mutable.Queue[Long]]()
core/src/main/scala/org/apache/spark/internal/config/package.scala (13 additions & 0 deletions)
@@ -938,6 +938,18 @@ package object config {
.intConf
.createOptional

private[spark] val KEEPALIVE_ON_MIN_EXECUTORS =
ConfigBuilder("spark.executor.failureTracker.keepaliveOnMinLiveExecutors.enabled")
.doc("When true, the executor failure tracker ignores `spark.executor.maxNumFailures` if " +
"the app still have minimum available executors registered. The app fails only if " +
"it exceeds `spark.executor.maxNumFailures` and the current live executors is less " +
"than the minimum which is determined by `spark.dynamicAllocation.minExecutors` when " +
"dynamic allocation is on, or by `spark.executor.instances` when dynamic allocation " +
"is off, multiplied by `spark.scheduler.minRegisteredResourcesRatio`.")
.version("4.0.0")
.booleanConf
.createWithDefault(false)

private[spark] val EXECUTOR_ATTEMPT_FAILURE_VALIDITY_INTERVAL_MS =
ConfigBuilder("spark.executor.failuresValidityInterval")
.doc("Interval after which Executor failures will be considered independent and not " +
@@ -2107,6 +2119,7 @@ package object config {
ConfigBuilder("spark.scheduler.minRegisteredResourcesRatio")
.version("1.1.1")
.doubleConf
.checkValue(v => v > 0 && v <= 1, "The value must be in range (0, 1].")
.createOptional

private[spark] val SCHEDULER_MAX_REGISTERED_RESOURCE_WAITING_TIME =
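Taken together, the new config's doc describes a two-part failure predicate. A minimal sketch of that logic, assuming hypothetical standalone values in place of the real tracker and backend state (only the config semantics come from this change):

```scala
// Illustrative sketch only; names and wiring are hypothetical, not Spark internals.
def shouldFailApp(
    numFailedExecutors: Int, // what ExecutorFailureTracker counts
    numLiveExecutors: Int, // executors currently registered with the backend
    maxNumFailures: Int, // spark.executor.maxNumFailures
    minExecutors: Int, // spark.dynamicAllocation.minExecutors or spark.executor.instances
    minRegisteredRatio: Double, // spark.scheduler.minRegisteredResourcesRatio
    keepaliveOnMinExecutors: Boolean): Boolean = {
  val tooManyFailures = numFailedExecutors > maxNumFailures
  if (!keepaliveOnMinExecutors) {
    tooManyFailures // legacy behavior: fail as soon as the cap is exceeded
  } else {
    // keep-alive: fail only if, in addition, too few executors remain registered
    tooManyFailures && numLiveExecutors < minExecutors * minRegisteredRatio
  }
}
```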
@@ -720,6 +720,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp

def sufficientResourcesRegistered(): Boolean = true

def getNumExecutorsRunning: Int = totalRegisteredExecutors.get()

override def isReady(): Boolean = {
if (sufficientResourcesRegistered()) {
logInfo("SchedulerBackend is ready for scheduling beginning after " +
@@ -17,7 +17,7 @@
package org.apache.spark.scheduler.cluster

import org.apache.spark.SparkConf
-import org.apache.spark.internal.config.{DYN_ALLOCATION_MAX_EXECUTORS, DYN_ALLOCATION_MIN_EXECUTORS, EXECUTOR_INSTANCES}
+import org.apache.spark.internal.config.{DYN_ALLOCATION_MAX_EXECUTORS, DYN_ALLOCATION_MIN_EXECUTORS, EXECUTOR_INSTANCES, KEEPALIVE_ON_MIN_EXECUTORS}
import org.apache.spark.util.Utils

private[spark] object SchedulerBackendUtils {
@@ -44,4 +44,20 @@ private[spark] object SchedulerBackendUtils {
conf.get(EXECUTOR_INSTANCES).getOrElse(numExecutors)
}
}

def formatExecutorFailureError(
maxNumExecutorFailures: Int,
numOfExecutorRunning: Int,
minExecutors: Int,
keepaliveOnMinExecutors: Boolean): String = {
s"Max number of executor failures ($maxNumExecutorFailures) reached, ${
if (keepaliveOnMinExecutors) {
s"and the running executors $numOfExecutorRunning is less than minimum($minExecutors) " +
"required"
} else {
s"the running executors is $numOfExecutorRunning. Consider turning on" +
s" ${KEEPALIVE_ON_MIN_EXECUTORS.key} if running executors is sufficient"
}
}"
}
}
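For reference, the two message shapes the helper above produces, with made-up numbers (the output paraphrases the interpolation, whitespace aside):

```scala
// Keep-alive on: failing because live executors dropped below the minimum.
SchedulerBackendUtils.formatExecutorFailureError(
  maxNumExecutorFailures = 3,
  numOfExecutorRunning = 1,
  minExecutors = 4,
  keepaliveOnMinExecutors = true)
// "Max number of executor failures (3) reached, and the number of running
//  executors (1) is less than the minimum (4) required"

// Keep-alive off: legacy fail-fast path, plus a hint to enable the new config.
SchedulerBackendUtils.formatExecutorFailureError(3, 1, 4, keepaliveOnMinExecutors = false)
// "Max number of executor failures (3) reached, the number of running executors
//  is 1. Consider turning on
//  spark.executor.failureTracker.keepaliveOnMinLiveExecutors.enabled if the
//  running executors are sufficient"
```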
@@ -36,6 +36,7 @@ import org.apache.spark.deploy.k8s.KubernetesUtils.addOwnerReference
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
import org.apache.spark.resource.ResourceProfile
import org.apache.spark.scheduler.cluster.SchedulerBackendUtils
import org.apache.spark.scheduler.cluster.SchedulerBackendUtils.DEFAULT_NUMBER_EXECUTORS
import org.apache.spark.util.{Clock, Utils}
import org.apache.spark.util.SparkExitCode.EXCEED_MAX_EXECUTOR_FAILURES
@@ -142,8 +143,15 @@ class ExecutorPodsAllocator(
}
snapshotsStore.addSubscriber(podAllocationDelay) { executorPodsSnapshot =>
onNewSnapshots(applicationId, schedulerBackend, executorPodsSnapshot)
-if (failureTracker.numFailedExecutors > maxNumExecutorFailures) {
-logError(s"Max number of executor failures ($maxNumExecutorFailures) reached")
+if (getNumExecutorsFailed > maxNumExecutorFailures &&
+(!failureTracker.keepaliveOnMinExecutors ||
+!schedulerBackend.sufficientResourcesRegistered())) {
+val errorMsg = SchedulerBackendUtils.formatExecutorFailureError(
+maxNumExecutorFailures,
+schedulerBackend.getNumExecutorsRunning,
+schedulerBackend.initialExecutors,
+failureTracker.keepaliveOnMinExecutors)
+logError(errorMsg)
stopApplication(EXCEED_MAX_EXECUTOR_FAILURES)
}
}
@@ -62,7 +62,7 @@ private[spark] class KubernetesClusterSchedulerBackend(
super.minRegisteredRatio
}

-private val initialExecutors = SchedulerBackendUtils.getInitialTargetExecutorNumber(conf)
+private[k8s] val initialExecutors = SchedulerBackendUtils.getInitialTargetExecutorNumber(conf)

private val shouldDeleteDriverService = conf.get(KUBERNETES_DRIVER_SERVICE_DELETE_ON_TERMINATION)

@@ -117,6 +117,8 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {

val appId = "testapp"

private val numRegisteredExecutors = new AtomicInteger(2)

before {
MockitoAnnotations.openMocks(this).close()
when(kubernetesClient.pods()).thenReturn(podOperations)
@@ -138,6 +140,14 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
podsAllocatorUnderTest = new ExecutorPodsAllocator(
conf, secMgr, executorBuilder, kubernetesClient, snapshotsStore, waitForExecutorPodsClock)
when(schedulerBackend.getExecutorIds()).thenReturn(Seq.empty)
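// Stub the private state behind the real sufficientResourcesRegistered() so tests
// can steer it: registered executors (initially 2) vs. 6 initial executors * 0.5
// ratio, i.e. 3 must be registered to count as sufficient.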
when(schedulerBackend.sufficientResourcesRegistered()).thenCallRealMethod()
val totalRegisteredExecutors =
PrivateMethod[AtomicInteger](Symbol("totalRegisteredExecutors"))()
val minRegisteredRatio = PrivateMethod[Double](Symbol("minRegisteredRatio"))()
when(schedulerBackend.invokePrivate[AtomicInteger](totalRegisteredExecutors))
.thenReturn(numRegisteredExecutors)
when(schedulerBackend.initialExecutors).thenReturn(6)
when(schedulerBackend.invokePrivate[Double](minRegisteredRatio)).thenReturn(0.5)
podsAllocatorUnderTest.start(TEST_SPARK_APP_ID, schedulerBackend)
when(kubernetesClient.persistentVolumeClaims()).thenReturn(persistentVolumeClaims)
when(persistentVolumeClaims.inNamespace("default")).thenReturn(pvcWithNamespace)
@@ -187,6 +197,76 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
assert(_exitCode === SparkExitCode.EXCEED_MAX_EXECUTOR_FAILURES)
}

test("SPARK-45873: Stop app directly when keepaliveOnMinExecutors off") {
var _exitCode = 0
val _conf = conf.clone
.set(MAX_EXECUTOR_FAILURES.key, "2")
.set(EXECUTOR_ATTEMPT_FAILURE_VALIDITY_INTERVAL_MS.key, "2s")
.set(KEEPALIVE_ON_MIN_EXECUTORS.key, "false")
podsAllocatorUnderTest = new ExecutorPodsAllocator(_conf, secMgr,
executorBuilder, kubernetesClient, snapshotsStore, waitForExecutorPodsClock) {
override private[spark] def stopApplication(exitCode: Int): Unit = {
_exitCode = exitCode
}
}
val originalLiveExecs = numRegisteredExecutors.get
try {
numRegisteredExecutors.incrementAndGet()
podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 3))
podsAllocatorUnderTest.start(TEST_SPARK_APP_ID, schedulerBackend)
(1 to 3).foreach(i => snapshotsStore.updatePod(failedExecutorWithoutDeletion(i)))
snapshotsStore.notifySubscribers()
assert(podsAllocatorUnderTest.getNumExecutorsFailed === 3)
assert(schedulerBackend.sufficientResourcesRegistered())
assert(_exitCode === 11)
} finally {
numRegisteredExecutors.set(originalLiveExecs)
}
}

test("SPARK-45873: Stops app depends on current executors when keepaliveOnMinExecutors on") {
var _exitCode = 0
val _conf = conf.clone
.set(MAX_EXECUTOR_FAILURES.key, "2")
.set(EXECUTOR_ATTEMPT_FAILURE_VALIDITY_INTERVAL_MS.key, "2s")
.set(KEEPALIVE_ON_MIN_EXECUTORS.key, "true")

podsAllocatorUnderTest = new ExecutorPodsAllocator(_conf, secMgr,
executorBuilder, kubernetesClient, snapshotsStore, waitForExecutorPodsClock) {
override private[spark] def stopApplication(exitCode: Int): Unit = {
_exitCode = exitCode
}
}
podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 3))
podsAllocatorUnderTest.start(TEST_SPARK_APP_ID, schedulerBackend)

val originalLiveExecs = numRegisteredExecutors.get

try {
numRegisteredExecutors.incrementAndGet()
(1 to 3).foreach(i => snapshotsStore.updatePod(failedExecutorWithoutDeletion(i)))
snapshotsStore.notifySubscribers()
assert(podsAllocatorUnderTest.getNumExecutorsFailed === 3)
assert(schedulerBackend.sufficientResourcesRegistered())
assert(_exitCode === 0,
"we hit max executor failures but still have sufficient executors registered")
} finally {
numRegisteredExecutors.set(originalLiveExecs)
}

try {
numRegisteredExecutors.decrementAndGet()
snapshotsStore.notifySubscribers()
assert(podsAllocatorUnderTest.getNumExecutorsFailed === 3)
assert(!schedulerBackend.sufficientResourcesRegistered())
assert(_exitCode === 11,
"we hit max executor failures and do not have enough executors registered")
} finally {
numRegisteredExecutors.set(originalLiveExecs)
}
}

test("SPARK-36052: test splitSlots") {
val seq1 = Seq("a")
assert(ExecutorPodsAllocator.splitSlots(seq1, 0) === Seq(("a", 0)))
@@ -50,7 +50,7 @@ import org.apache.spark.metrics.{MetricsSystem, MetricsSystemInstances}
import org.apache.spark.resource.ResourceProfile
import org.apache.spark.rpc._
import org.apache.spark.scheduler.MiscellaneousProcessDetails
-import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, YarnSchedulerBackend}
+import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, SchedulerBackendUtils, YarnSchedulerBackend}
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
import org.apache.spark.util._

@@ -562,13 +562,24 @@ private[spark] class ApplicationMaster(
private def allocationThreadImpl(): Unit = {
// The number of failures in a row until the allocation thread gives up.
val reporterMaxFailures = sparkConf.get(MAX_REPORTER_THREAD_FAILURES)
val minExecutors = SchedulerBackendUtils.getInitialTargetExecutorNumber(sparkConf)
val minRegisteredRatio = sparkConf.get(SCHEDULER_MIN_REGISTERED_RESOURCES_RATIO).getOrElse(0.8)
val keepaliveOnMinExecutors = allocator.failureTracker.keepaliveOnMinExecutors
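// With keep-alive off this is always true, preserving the old fail-fast behavior;
// with it on, the app fails only once running executors drop below
// minExecutors * minRegisteredRatio.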
def insufficientResources: Boolean = {
!keepaliveOnMinExecutors ||
allocator.getNumExecutorsRunning < minExecutors * minRegisteredRatio
}
var failureCount = 0
while (!finished) {
try {
-if (allocator.getNumExecutorsFailed >= maxNumExecutorFailures) {
+if (allocator.getNumExecutorsFailed >= maxNumExecutorFailures && insufficientResources) {
val errorMsg = SchedulerBackendUtils.formatExecutorFailureError(
maxNumExecutorFailures,
allocator.getNumExecutorsRunning,
minExecutors,
keepaliveOnMinExecutors)
finish(FinalApplicationStatus.FAILED,
-ApplicationMaster.EXIT_MAX_EXECUTOR_FAILURES,
-s"Max number of executor failures ($maxNumExecutorFailures) reached")
+ApplicationMaster.EXIT_MAX_EXECUTOR_FAILURES, errorMsg)
} else if (allocator.isAllNodeExcluded) {
finish(FinalApplicationStatus.FAILED,
ApplicationMaster.EXIT_MAX_EXECUTOR_FAILURES,
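For anyone trying the feature end to end, a configuration sketch (the values are illustrative; the keys are the ones introduced or referenced by this change):

```scala
import org.apache.spark.SparkConf

// Illustrative values only; tune for your cluster.
val conf = new SparkConf()
  // Opt in to the keep-alive behavior added by this change (default: false).
  .set("spark.executor.failureTracker.keepaliveOnMinLiveExecutors.enabled", "true")
  // Failure cap the tracker compares against.
  .set("spark.executor.maxNumFailures", "10")
  // Minimum executors when dynamic allocation is on...
  .set("spark.dynamicAllocation.minExecutors", "2")
  // ...or the instance count when it is off.
  .set("spark.executor.instances", "4")
  // Fraction of the minimum that must stay registered; must be in (0, 1]
  // per the checkValue added above.
  .set("spark.scheduler.minRegisteredResourcesRatio", "0.8")
```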