diff --git a/core/src/main/scala/org/apache/spark/deploy/ExecutorFailureTracker.scala b/core/src/main/scala/org/apache/spark/deploy/ExecutorFailureTracker.scala index 7c7b9c60b47e..aa3c43330832 100644 --- a/core/src/main/scala/org/apache/spark/deploy/ExecutorFailureTracker.scala +++ b/core/src/main/scala/org/apache/spark/deploy/ExecutorFailureTracker.scala @@ -35,6 +35,7 @@ private[spark] class ExecutorFailureTracker( private val executorFailuresValidityInterval = sparkConf.get(EXECUTOR_ATTEMPT_FAILURE_VALIDITY_INTERVAL_MS).getOrElse(-1L) + private[spark] val keepaliveOnMinExecutors: Boolean = sparkConf.get(KEEPALIVE_ON_MIN_EXECUTORS) // Queue to store the timestamp of failed executors for each host private val failedExecutorsTimeStampsPerHost = mutable.Map[String, mutable.Queue[Long]]() diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index bce7ee70c7b2..05d5bb6c890e 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -938,6 +938,18 @@ package object config { .intConf .createOptional + private[spark] val KEEPALIVE_ON_MIN_EXECUTORS = + ConfigBuilder("spark.executor.failureTracker.keepaliveOnMinLiveExecutors.enabled") + .doc("When true, the executor failure tracker ignores `spark.executor.maxNumFailures` if " + + "the app still have minimum available executors registered. The app fails only if " + + "it exceeds `spark.executor.maxNumFailures` and the current live executors is less " + + "than the minimum which is determined by `spark.dynamicAllocation.minExecutors` when " + + "dynamic allocation is on, or by `spark.executor.instances` when dynamic allocation " + + "is off, multiplied by `spark.scheduler.minRegisteredResourcesRatio`.") + .version("4.0.0") + .booleanConf + .createWithDefault(false) + private[spark] val EXECUTOR_ATTEMPT_FAILURE_VALIDITY_INTERVAL_MS = ConfigBuilder("spark.executor.failuresValidityInterval") .doc("Interval after which Executor failures will be considered independent and not " + @@ -2107,6 +2119,7 @@ package object config { ConfigBuilder("spark.scheduler.minRegisteredResourcesRatio") .version("1.1.1") .doubleConf + .checkValue(v => v > 0 && v <= 1, "The value must be in range (0, 1].") .createOptional private[spark] val SCHEDULER_MAX_REGISTERED_RESOURCE_WAITING_TIME = diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 831fbd45edd7..e6831a586d9f 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -720,6 +720,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp def sufficientResourcesRegistered(): Boolean = true + def getNumExecutorsRunning: Int = totalRegisteredExecutors.get() + override def isReady(): Boolean = { if (sufficientResourcesRegistered()) { logInfo("SchedulerBackend is ready for scheduling beginning after " + diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulerBackendUtils.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulerBackendUtils.scala index c166d030f2c8..88d134b452c9 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulerBackendUtils.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulerBackendUtils.scala @@ -17,7 +17,7 @@ package org.apache.spark.scheduler.cluster import org.apache.spark.SparkConf -import org.apache.spark.internal.config.{DYN_ALLOCATION_MAX_EXECUTORS, DYN_ALLOCATION_MIN_EXECUTORS, EXECUTOR_INSTANCES} +import org.apache.spark.internal.config.{DYN_ALLOCATION_MAX_EXECUTORS, DYN_ALLOCATION_MIN_EXECUTORS, EXECUTOR_INSTANCES, KEEPALIVE_ON_MIN_EXECUTORS} import org.apache.spark.util.Utils private[spark] object SchedulerBackendUtils { @@ -44,4 +44,20 @@ private[spark] object SchedulerBackendUtils { conf.get(EXECUTOR_INSTANCES).getOrElse(numExecutors) } } + + def formatExecutorFailureError( + maxNumExecutorFailures: Int, + numOfExecutorRunning: Int, + minExecutors: Int, + keepaliveOnMinExecutors: Boolean): String = { + s"Max number of executor failures ($maxNumExecutorFailures) reached, ${ + if (keepaliveOnMinExecutors) { + s"and the running executors $numOfExecutorRunning is less than minimum($minExecutors) " + + "required" + } else { + s"the running executors is $numOfExecutorRunning. Consider turning on" + + s" ${KEEPALIVE_ON_MIN_EXECUTORS.key} if running executors is sufficient" + } + }" + } } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala index 236dfff9ac11..202c36dc63a4 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala @@ -36,6 +36,7 @@ import org.apache.spark.deploy.k8s.KubernetesUtils.addOwnerReference import org.apache.spark.internal.Logging import org.apache.spark.internal.config._ import org.apache.spark.resource.ResourceProfile +import org.apache.spark.scheduler.cluster.SchedulerBackendUtils import org.apache.spark.scheduler.cluster.SchedulerBackendUtils.DEFAULT_NUMBER_EXECUTORS import org.apache.spark.util.{Clock, Utils} import org.apache.spark.util.SparkExitCode.EXCEED_MAX_EXECUTOR_FAILURES @@ -142,8 +143,15 @@ class ExecutorPodsAllocator( } snapshotsStore.addSubscriber(podAllocationDelay) { executorPodsSnapshot => onNewSnapshots(applicationId, schedulerBackend, executorPodsSnapshot) - if (failureTracker.numFailedExecutors > maxNumExecutorFailures) { - logError(s"Max number of executor failures ($maxNumExecutorFailures) reached") + if (getNumExecutorsFailed > maxNumExecutorFailures && + (!failureTracker.keepaliveOnMinExecutors || + !schedulerBackend.sufficientResourcesRegistered())) { + val errorMsg = SchedulerBackendUtils.formatExecutorFailureError( + maxNumExecutorFailures, + schedulerBackend.getNumExecutorsRunning, + schedulerBackend.initialExecutors, + failureTracker.keepaliveOnMinExecutors) + logError(errorMsg) stopApplication(EXCEED_MAX_EXECUTOR_FAILURES) } } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala index daf8d5e3f58a..aaa1b26c34a7 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala @@ -62,7 +62,7 @@ private[spark] class KubernetesClusterSchedulerBackend( super.minRegisteredRatio } - private val initialExecutors = SchedulerBackendUtils.getInitialTargetExecutorNumber(conf) + private[k8s] val initialExecutors = SchedulerBackendUtils.getInitialTargetExecutorNumber(conf) private val shouldDeleteDriverService = conf.get(KUBERNETES_DRIVER_SERVICE_DELETE_ON_TERMINATION) diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala index eb9246a2a5f2..b9905bc49d70 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala @@ -117,6 +117,8 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter { val appId = "testapp" + private val numRegisteredExecutors = new AtomicInteger(2) + before { MockitoAnnotations.openMocks(this).close() when(kubernetesClient.pods()).thenReturn(podOperations) @@ -138,6 +140,14 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter { podsAllocatorUnderTest = new ExecutorPodsAllocator( conf, secMgr, executorBuilder, kubernetesClient, snapshotsStore, waitForExecutorPodsClock) when(schedulerBackend.getExecutorIds()).thenReturn(Seq.empty) + when(schedulerBackend.sufficientResourcesRegistered()).thenCallRealMethod() + val totalRegisteredExecutors = + PrivateMethod[AtomicInteger](Symbol("totalRegisteredExecutors"))() + val minRegisteredRatio = PrivateMethod[Double](Symbol("minRegisteredRatio"))() + when(schedulerBackend.invokePrivate[AtomicInteger](totalRegisteredExecutors)) + .thenReturn(numRegisteredExecutors) + when(schedulerBackend.initialExecutors).thenReturn(6) + when(schedulerBackend.invokePrivate[Double](minRegisteredRatio)).thenReturn(0.5) podsAllocatorUnderTest.start(TEST_SPARK_APP_ID, schedulerBackend) when(kubernetesClient.persistentVolumeClaims()).thenReturn(persistentVolumeClaims) when(persistentVolumeClaims.inNamespace("default")).thenReturn(pvcWithNamespace) @@ -187,6 +197,76 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter { assert(_exitCode === SparkExitCode.EXCEED_MAX_EXECUTOR_FAILURES) } + test("SPARK-45873: Stop app directly when keepaliveOnMinExecutors off") { + var _exitCode = 0 + val _conf = conf.clone + .set(MAX_EXECUTOR_FAILURES.key, "2") + .set(EXECUTOR_ATTEMPT_FAILURE_VALIDITY_INTERVAL_MS.key, "2s") + .set(KEEPALIVE_ON_MIN_EXECUTORS.key, "false") + podsAllocatorUnderTest = new ExecutorPodsAllocator(_conf, secMgr, + executorBuilder, kubernetesClient, snapshotsStore, waitForExecutorPodsClock) { + override private[spark] def stopApplication(exitCode: Int): Unit = { + _exitCode = exitCode + } + } + val originalLiveExecs = numRegisteredExecutors.get + try { + numRegisteredExecutors.incrementAndGet() + podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 3)) + podsAllocatorUnderTest.start(TEST_SPARK_APP_ID, schedulerBackend) + (1 to 3).foreach(i => snapshotsStore.updatePod(failedExecutorWithoutDeletion(i))) + snapshotsStore.notifySubscribers() + assert(podsAllocatorUnderTest.getNumExecutorsFailed === 3) + assert(schedulerBackend.sufficientResourcesRegistered()) + assert(_exitCode === 11) + } finally { + numRegisteredExecutors.set(originalLiveExecs) + } + } + + test("SPARK-45873: Stops app depends on current executors when keepaliveOnMinExecutors on") { + var _exitCode = 0 + val _conf = conf.clone + .set(MAX_EXECUTOR_FAILURES.key, "2") + .set(EXECUTOR_ATTEMPT_FAILURE_VALIDITY_INTERVAL_MS.key, "2s") + .set(KEEPALIVE_ON_MIN_EXECUTORS.key, "true") + + podsAllocatorUnderTest = new ExecutorPodsAllocator(_conf, secMgr, + executorBuilder, kubernetesClient, snapshotsStore, waitForExecutorPodsClock) { + override private[spark] def stopApplication(exitCode: Int): Unit = { + _exitCode = exitCode + } + } + podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 3)) + podsAllocatorUnderTest.start(TEST_SPARK_APP_ID, schedulerBackend) + + val originalLiveExecs = numRegisteredExecutors.get + + try { + numRegisteredExecutors.incrementAndGet() + (1 to 3).foreach(i => snapshotsStore.updatePod(failedExecutorWithoutDeletion(i))) + snapshotsStore.notifySubscribers() + assert(podsAllocatorUnderTest.getNumExecutorsFailed === 3) + assert(schedulerBackend.sufficientResourcesRegistered()) + assert(_exitCode === 0, + "although we hit max executor failure, but we still have sufficient executors") + + } finally { + numRegisteredExecutors.set(originalLiveExecs) + } + + try { + numRegisteredExecutors.decrementAndGet() + snapshotsStore.notifySubscribers() + assert(podsAllocatorUnderTest.getNumExecutorsFailed === 3) + assert(!schedulerBackend.sufficientResourcesRegistered()) + assert(_exitCode === 11, + "we hit max executor failure and do not have enough executors") + } finally { + numRegisteredExecutors.set(originalLiveExecs) + } + } + test("SPARK-36052: test splitSlots") { val seq1 = Seq("a") assert(ExecutorPodsAllocator.splitSlots(seq1, 0) === Seq(("a", 0))) diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index 4f1ba3b9ed24..585553f31962 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -50,7 +50,7 @@ import org.apache.spark.metrics.{MetricsSystem, MetricsSystemInstances} import org.apache.spark.resource.ResourceProfile import org.apache.spark.rpc._ import org.apache.spark.scheduler.MiscellaneousProcessDetails -import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, YarnSchedulerBackend} +import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, SchedulerBackendUtils, YarnSchedulerBackend} import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._ import org.apache.spark.util._ @@ -562,13 +562,24 @@ private[spark] class ApplicationMaster( private def allocationThreadImpl(): Unit = { // The number of failures in a row until the allocation thread gives up. val reporterMaxFailures = sparkConf.get(MAX_REPORTER_THREAD_FAILURES) + val minExecutors = SchedulerBackendUtils.getInitialTargetExecutorNumber(sparkConf) + val minRegisteredRatio = sparkConf.get(SCHEDULER_MIN_REGISTERED_RESOURCES_RATIO).getOrElse(0.8) + val keepaliveOnMinExecutors = allocator.failureTracker.keepaliveOnMinExecutors + def insufficientResources: Boolean = { + !keepaliveOnMinExecutors || + allocator.getNumExecutorsRunning < minExecutors * minRegisteredRatio + } var failureCount = 0 while (!finished) { try { - if (allocator.getNumExecutorsFailed >= maxNumExecutorFailures) { + if (allocator.getNumExecutorsFailed >= maxNumExecutorFailures && insufficientResources) { + val errorMsg = SchedulerBackendUtils.formatExecutorFailureError( + maxNumExecutorFailures, + allocator.getNumExecutorsRunning, + minExecutors, + keepaliveOnMinExecutors) finish(FinalApplicationStatus.FAILED, - ApplicationMaster.EXIT_MAX_EXECUTOR_FAILURES, - s"Max number of executor failures ($maxNumExecutorFailures) reached") + ApplicationMaster.EXIT_MAX_EXECUTOR_FAILURES, errorMsg) } else if (allocator.isAllNodeExcluded) { finish(FinalApplicationStatus.FAILED, ApplicationMaster.EXIT_MAX_EXECUTOR_FAILURES,