From 7ed887491698f0e55280d58b0e86182d185eb5e5 Mon Sep 17 00:00:00 2001 From: Kent Yao Date: Fri, 10 Nov 2023 13:29:15 +0800 Subject: [PATCH 1/5] [SPARK-45873][CORE][YARN][K8S] Make ExecutorFailureTracker more tolerant when app remains sufficient resources --- .../org/apache/spark/internal/config/package.scala | 11 +++++++++++ .../cluster/CoarseGrainedSchedulerBackend.scala | 9 +++++++++ .../scheduler/cluster/SchedulerBackendUtils.scala | 8 ++++++++ .../scheduler/cluster/k8s/ExecutorPodsAllocator.scala | 3 ++- .../apache/spark/deploy/yarn/ApplicationMaster.scala | 9 +++++++-- .../scheduler/cluster/YarnSchedulerBackend.scala | 9 --------- 6 files changed, 37 insertions(+), 12 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index bbadf91fc41c..c7422f5dc052 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -2079,6 +2079,17 @@ package object config { .doubleConf .createOptional + private[spark] val SCHEDULER_MIN_RESOURCES_TO_SURVIVE_RATIO = + ConfigBuilder("spark.scheduler.minResourcesToSurviveRatio") + .doc("When encountering max number of executor failures, if the scheduler still has " + + "sufficient resources, which means live executors >= max number of executor * ratio. " + + "The application will not fail immediately. The smaller the ratio is, the more tolerant " + + "the application will be to executor failures.") + .version("4.0.0") + .doubleConf + .checkValue(x => x > 0, "must be positive") + .createWithDefault(0.5) + private[spark] val SCHEDULER_MAX_REGISTERED_RESOURCE_WAITING_TIME = ConfigBuilder("spark.scheduler.maxRegisteredResourcesWaitingTime") .version("1.1.1") diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index e02dd2793706..821a2f7d91d1 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -65,6 +65,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp // is equal to at least this value, that is double between 0 and 1. private val _minRegisteredRatio = math.min(1, conf.get(SCHEDULER_MIN_REGISTERED_RESOURCES_RATIO).getOrElse(0.0)) + protected val minSurviveRatio: Double = conf.get(SCHEDULER_MIN_RESOURCES_TO_SURVIVE_RATIO) + protected val maxExecutors: Int = SchedulerBackendUtils.getMaxTargetExecutorNumber(conf) // Submit tasks after maxRegisteredWaitingTime milliseconds // if minRegisteredRatio has not yet been reached private val maxRegisteredWaitingTimeNs = TimeUnit.MILLISECONDS.toNanos( @@ -717,6 +719,13 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp def sufficientResourcesRegistered(): Boolean = true + // When the executor failure tracker collects enough failures, if the current resources are + // insufficient for keep the app running, it will fail the application directly; otherwise, + // it survives this check round. + def insufficientResourcesRetained(): Boolean = { + totalRegisteredExecutors.get() < maxExecutors * minSurviveRatio + } + override def isReady(): Boolean = { if (sufficientResourcesRegistered()) { logInfo("SchedulerBackend is ready for scheduling beginning after " + diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulerBackendUtils.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulerBackendUtils.scala index c166d030f2c8..2abcc1a3f5a7 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulerBackendUtils.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulerBackendUtils.scala @@ -44,4 +44,12 @@ private[spark] object SchedulerBackendUtils { conf.get(EXECUTOR_INSTANCES).getOrElse(numExecutors) } } + + def getMaxTargetExecutorNumber(conf: SparkConf): Int = { + if (Utils.isDynamicAllocationEnabled(conf)) { + conf.get(DYN_ALLOCATION_MAX_EXECUTORS) + } else { + conf.get(EXECUTOR_INSTANCES).getOrElse(0) + } + } } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala index a4403fb96b21..bc48fbfd6c66 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala @@ -142,7 +142,8 @@ class ExecutorPodsAllocator( } snapshotsStore.addSubscriber(podAllocationDelay) { executorPodsSnapshot => onNewSnapshots(applicationId, schedulerBackend, executorPodsSnapshot) - if (failureTracker.numFailedExecutors > maxNumExecutorFailures) { + if (getNumExecutorsFailed > maxNumExecutorFailures && + schedulerBackend.insufficientResourcesRetained()) { logError(s"Max number of executor failures ($maxNumExecutorFailures) reached") stopApplication(EXCEED_MAX_EXECUTOR_FAILURES) } diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index 4f1ba3b9ed24..bdd71f063c98 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -50,7 +50,7 @@ import org.apache.spark.metrics.{MetricsSystem, MetricsSystemInstances} import org.apache.spark.resource.ResourceProfile import org.apache.spark.rpc._ import org.apache.spark.scheduler.MiscellaneousProcessDetails -import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, YarnSchedulerBackend} +import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, SchedulerBackendUtils, YarnSchedulerBackend} import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._ import org.apache.spark.util._ @@ -562,10 +562,15 @@ private[spark] class ApplicationMaster( private def allocationThreadImpl(): Unit = { // The number of failures in a row until the allocation thread gives up. val reporterMaxFailures = sparkConf.get(MAX_REPORTER_THREAD_FAILURES) + val maxExecutors = SchedulerBackendUtils.getMaxTargetExecutorNumber(sparkConf) + val minSurviveRatio: Double = sparkConf.get(SCHEDULER_MIN_RESOURCES_TO_SURVIVE_RATIO) + def insufficientResources: Boolean = { + allocator.getNumExecutorsRunning < maxExecutors * minSurviveRatio + } var failureCount = 0 while (!finished) { try { - if (allocator.getNumExecutorsFailed >= maxNumExecutorFailures) { + if (allocator.getNumExecutorsFailed >= maxNumExecutorFailures && insufficientResources) { finish(FinalApplicationStatus.FAILED, ApplicationMaster.EXIT_MAX_EXECUTOR_FAILURES, s"Max number of executor failures ($maxNumExecutorFailures) reached") diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala index 1cfb3d955704..96b9eeae5a58 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala @@ -88,10 +88,6 @@ private[spark] abstract class YarnSchedulerBackend( private val minMergersStaticThreshold = conf.get(config.SHUFFLE_MERGER_LOCATIONS_MIN_STATIC_THRESHOLD) - private val maxNumExecutors = conf.get(config.DYN_ALLOCATION_MAX_EXECUTORS) - - private val numExecutors = conf.get(config.EXECUTOR_INSTANCES).getOrElse(0) - /** * Bind to YARN. This *must* be done before calling [[start()]]. * @@ -178,11 +174,6 @@ private[spark] abstract class YarnSchedulerBackend( resourceProfileId: Int): Seq[BlockManagerId] = { // TODO (SPARK-33481) This is a naive way of calculating numMergersDesired for a stage, // TODO we can use better heuristics to calculate numMergersDesired for a stage. - val maxExecutors = if (Utils.isDynamicAllocationEnabled(sc.getConf)) { - maxNumExecutors - } else { - numExecutors - } val tasksPerExecutor = sc.resourceProfileManager .resourceProfileFromId(resourceProfileId).maxTasksPerExecutor(sc.conf) val numMergersDesired = math.min( From 344cc188a52e03f569fe63898d423fde3d659aed Mon Sep 17 00:00:00 2001 From: Kent Yao Date: Fri, 10 Nov 2023 16:01:31 +0800 Subject: [PATCH 2/5] import --- .../apache/spark/scheduler/cluster/YarnSchedulerBackend.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala index 96b9eeae5a58..68d77f889787 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala @@ -36,7 +36,7 @@ import org.apache.spark.rpc._ import org.apache.spark.scheduler._ import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._ import org.apache.spark.storage.{BlockManagerId, BlockManagerMaster} -import org.apache.spark.util.{RpcUtils, ThreadUtils, Utils} +import org.apache.spark.util.{RpcUtils, ThreadUtils} /** * Abstract Yarn scheduler backend that contains common logic From da43e25ad5b92a3261de76fb520fd6c2360368bc Mon Sep 17 00:00:00 2001 From: Kent Yao Date: Fri, 10 Nov 2023 21:08:55 +0800 Subject: [PATCH 3/5] test --- .../k8s/ExecutorPodsAllocatorSuite.scala | 40 +++++++++++++++++++ 1 file changed, 40 insertions(+) diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala index eb9246a2a5f2..b32502011e49 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala @@ -117,6 +117,8 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter { val appId = "testapp" + private val numRegisteredExecutors = new AtomicInteger(2) + before { MockitoAnnotations.openMocks(this).close() when(kubernetesClient.pods()).thenReturn(podOperations) @@ -138,6 +140,15 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter { podsAllocatorUnderTest = new ExecutorPodsAllocator( conf, secMgr, executorBuilder, kubernetesClient, snapshotsStore, waitForExecutorPodsClock) when(schedulerBackend.getExecutorIds()).thenReturn(Seq.empty) + when(schedulerBackend.insufficientResourcesRetained()).thenCallRealMethod() + val totalRegisteredExecutors = + PrivateMethod[AtomicInteger](Symbol("totalRegisteredExecutors"))() + val maxExecutors = PrivateMethod[Int](Symbol("maxExecutors"))() + val minSurviveRatio = PrivateMethod[Double](Symbol("minSurviveRatio"))() + when(schedulerBackend.invokePrivate[AtomicInteger](totalRegisteredExecutors)) + .thenReturn(numRegisteredExecutors) + when(schedulerBackend.invokePrivate(maxExecutors)).thenReturn(6) + when(schedulerBackend.invokePrivate(minSurviveRatio)).thenReturn(0.5) podsAllocatorUnderTest.start(TEST_SPARK_APP_ID, schedulerBackend) when(kubernetesClient.persistentVolumeClaims()).thenReturn(persistentVolumeClaims) when(persistentVolumeClaims.inNamespace("default")).thenReturn(pvcWithNamespace) @@ -187,6 +198,35 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter { assert(_exitCode === SparkExitCode.EXCEED_MAX_EXECUTOR_FAILURES) } + test("SPARK-45873: Make FailureTracker more tolerant when app remains sufficient resources") { + var _exitCode = 0 + val _conf = conf.clone + .set(MAX_EXECUTOR_FAILURES.key, "2") + .set(EXECUTOR_ATTEMPT_FAILURE_VALIDITY_INTERVAL_MS.key, "2s") + podsAllocatorUnderTest = new ExecutorPodsAllocator(_conf, secMgr, + executorBuilder, kubernetesClient, snapshotsStore, waitForExecutorPodsClock) { + override private[spark] def stopApplication(exitCode: Int): Unit = { + _exitCode = exitCode + } + } + podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 3)) + podsAllocatorUnderTest.start(TEST_SPARK_APP_ID, schedulerBackend) + numRegisteredExecutors.incrementAndGet() + (1 to 3).foreach(i => snapshotsStore.updatePod(failedExecutorWithoutDeletion(i))) + snapshotsStore.notifySubscribers() + assert(podsAllocatorUnderTest.getNumExecutorsFailed === 3) + assert(!schedulerBackend.insufficientResourcesRetained()) + assert(_exitCode === 0, + "although we hit max executor failure, but we still have sufficient executors") + + numRegisteredExecutors.decrementAndGet() + assert(schedulerBackend.insufficientResourcesRetained()) + snapshotsStore.notifySubscribers() + assert(podsAllocatorUnderTest.getNumExecutorsFailed === 3) + assert(_exitCode === 11, + "we hit max executor failure and do not have enough executors") + } + test("SPARK-36052: test splitSlots") { val seq1 = Seq("a") assert(ExecutorPodsAllocator.splitSlots(seq1, 0) === Seq(("a", 0))) From 4a0468d81715ee209e8911d62e0b5f187afa2e76 Mon Sep 17 00:00:00 2001 From: Kent Yao Date: Mon, 13 Nov 2023 15:49:37 +0800 Subject: [PATCH 4/5] address comments --- .../CoarseGrainedSchedulerBackend.scala | 4 +++- .../cluster/SchedulerBackendUtils.scala | 18 ++++++++++++++++-- .../cluster/k8s/ExecutorPodsAllocator.scala | 7 ++++++- .../k8s/ExecutorPodsAllocatorSuite.scala | 3 +-- .../spark/deploy/yarn/ApplicationMaster.scala | 7 +++++-- 5 files changed, 31 insertions(+), 8 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 821a2f7d91d1..54d968be1e71 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -66,7 +66,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp private val _minRegisteredRatio = math.min(1, conf.get(SCHEDULER_MIN_REGISTERED_RESOURCES_RATIO).getOrElse(0.0)) protected val minSurviveRatio: Double = conf.get(SCHEDULER_MIN_RESOURCES_TO_SURVIVE_RATIO) - protected val maxExecutors: Int = SchedulerBackendUtils.getMaxTargetExecutorNumber(conf) + private[cluster] val maxExecutors: Int = SchedulerBackendUtils.getMaxTargetExecutorNumber(conf) // Submit tasks after maxRegisteredWaitingTime milliseconds // if minRegisteredRatio has not yet been reached private val maxRegisteredWaitingTimeNs = TimeUnit.MILLISECONDS.toNanos( @@ -726,6 +726,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp totalRegisteredExecutors.get() < maxExecutors * minSurviveRatio } + def getNumExecutorsRunning: Int = totalRegisteredExecutors.get() + override def isReady(): Boolean = { if (sufficientResourcesRegistered()) { logInfo("SchedulerBackend is ready for scheduling beginning after " + diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulerBackendUtils.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulerBackendUtils.scala index 2abcc1a3f5a7..153aff7208a7 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulerBackendUtils.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulerBackendUtils.scala @@ -17,7 +17,8 @@ package org.apache.spark.scheduler.cluster import org.apache.spark.SparkConf -import org.apache.spark.internal.config.{DYN_ALLOCATION_MAX_EXECUTORS, DYN_ALLOCATION_MIN_EXECUTORS, EXECUTOR_INSTANCES} +import org.apache.spark.internal.config.{DYN_ALLOCATION_MAX_EXECUTORS, DYN_ALLOCATION_MIN_EXECUTORS, EXECUTOR_INSTANCES, MAX_EXECUTOR_FAILURES, SCHEDULER_MIN_RESOURCES_TO_SURVIVE_RATIO} +import org.apache.spark.internal.config.Streaming.STREAMING_DYN_ALLOCATION_MAX_EXECUTORS import org.apache.spark.util.Utils private[spark] object SchedulerBackendUtils { @@ -46,10 +47,23 @@ private[spark] object SchedulerBackendUtils { } def getMaxTargetExecutorNumber(conf: SparkConf): Int = { - if (Utils.isDynamicAllocationEnabled(conf)) { + if (Utils.isStreamingDynamicAllocationEnabled(conf)) { + conf.get(STREAMING_DYN_ALLOCATION_MAX_EXECUTORS) + } else if (Utils.isDynamicAllocationEnabled(conf)) { conf.get(DYN_ALLOCATION_MAX_EXECUTORS) } else { conf.get(EXECUTOR_INSTANCES).getOrElse(0) } } + + def formatExecutorFailureError( + maxNumExecutorFailures: Int, + numOfExecutorRunning: Int, + maxExecutors: Int): String = { + s"Max number of executor failures ($maxNumExecutorFailures) reached and the current running " + + s"executors ratio $numOfExecutorRunning/$maxExecutors is insufficient. Consider " + + s"increasing ${MAX_EXECUTOR_FAILURES.key} or " + + s"${SCHEDULER_MIN_RESOURCES_TO_SURVIVE_RATIO.key} for app being more tolerant to " + + s"executor failures" + } } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala index bc48fbfd6c66..e591f166e823 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala @@ -36,6 +36,7 @@ import org.apache.spark.deploy.k8s.KubernetesUtils.addOwnerReference import org.apache.spark.internal.Logging import org.apache.spark.internal.config._ import org.apache.spark.resource.ResourceProfile +import org.apache.spark.scheduler.cluster.SchedulerBackendUtils import org.apache.spark.scheduler.cluster.SchedulerBackendUtils.DEFAULT_NUMBER_EXECUTORS import org.apache.spark.util.{Clock, Utils} import org.apache.spark.util.SparkExitCode.EXCEED_MAX_EXECUTOR_FAILURES @@ -144,7 +145,11 @@ class ExecutorPodsAllocator( onNewSnapshots(applicationId, schedulerBackend, executorPodsSnapshot) if (getNumExecutorsFailed > maxNumExecutorFailures && schedulerBackend.insufficientResourcesRetained()) { - logError(s"Max number of executor failures ($maxNumExecutorFailures) reached") + val errorMsg = SchedulerBackendUtils.formatExecutorFailureError( + maxNumExecutorFailures, + schedulerBackend.getNumExecutorsRunning, + schedulerBackend.maxExecutors) + logError(errorMsg) stopApplication(EXCEED_MAX_EXECUTOR_FAILURES) } } diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala index b32502011e49..7a6f677e17b9 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala @@ -143,11 +143,10 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter { when(schedulerBackend.insufficientResourcesRetained()).thenCallRealMethod() val totalRegisteredExecutors = PrivateMethod[AtomicInteger](Symbol("totalRegisteredExecutors"))() - val maxExecutors = PrivateMethod[Int](Symbol("maxExecutors"))() val minSurviveRatio = PrivateMethod[Double](Symbol("minSurviveRatio"))() when(schedulerBackend.invokePrivate[AtomicInteger](totalRegisteredExecutors)) .thenReturn(numRegisteredExecutors) - when(schedulerBackend.invokePrivate(maxExecutors)).thenReturn(6) + when(schedulerBackend.maxExecutors).thenReturn(6) when(schedulerBackend.invokePrivate(minSurviveRatio)).thenReturn(0.5) podsAllocatorUnderTest.start(TEST_SPARK_APP_ID, schedulerBackend) when(kubernetesClient.persistentVolumeClaims()).thenReturn(persistentVolumeClaims) diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index bdd71f063c98..2603b668fc59 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -571,9 +571,12 @@ private[spark] class ApplicationMaster( while (!finished) { try { if (allocator.getNumExecutorsFailed >= maxNumExecutorFailures && insufficientResources) { + val errorMsg = SchedulerBackendUtils.formatExecutorFailureError( + maxNumExecutorFailures, + allocator.getNumExecutorsRunning, + maxExecutors) finish(FinalApplicationStatus.FAILED, - ApplicationMaster.EXIT_MAX_EXECUTOR_FAILURES, - s"Max number of executor failures ($maxNumExecutorFailures) reached") + ApplicationMaster.EXIT_MAX_EXECUTOR_FAILURES, errorMsg) } else if (allocator.isAllNodeExcluded) { finish(FinalApplicationStatus.FAILED, ApplicationMaster.EXIT_MAX_EXECUTOR_FAILURES, From 8b8691b53ff87a0c86a73ed44ea701460ff272da Mon Sep 17 00:00:00 2001 From: Kent Yao Date: Thu, 16 Nov 2023 14:19:56 +0800 Subject: [PATCH 5/5] use min executor --- .../spark/deploy/ExecutorFailureTracker.scala | 1 + .../spark/internal/config/package.scala | 24 +++--- .../CoarseGrainedSchedulerBackend.scala | 9 --- .../cluster/SchedulerBackendUtils.scala | 30 +++----- .../cluster/k8s/ExecutorPodsAllocator.scala | 6 +- .../KubernetesClusterSchedulerBackend.scala | 2 +- .../k8s/ExecutorPodsAllocatorSuite.scala | 77 ++++++++++++++----- .../spark/deploy/yarn/ApplicationMaster.scala | 11 ++- .../cluster/YarnSchedulerBackend.scala | 11 ++- 9 files changed, 107 insertions(+), 64 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/ExecutorFailureTracker.scala b/core/src/main/scala/org/apache/spark/deploy/ExecutorFailureTracker.scala index 7c7b9c60b47e..aa3c43330832 100644 --- a/core/src/main/scala/org/apache/spark/deploy/ExecutorFailureTracker.scala +++ b/core/src/main/scala/org/apache/spark/deploy/ExecutorFailureTracker.scala @@ -35,6 +35,7 @@ private[spark] class ExecutorFailureTracker( private val executorFailuresValidityInterval = sparkConf.get(EXECUTOR_ATTEMPT_FAILURE_VALIDITY_INTERVAL_MS).getOrElse(-1L) + private[spark] val keepaliveOnMinExecutors: Boolean = sparkConf.get(KEEPALIVE_ON_MIN_EXECUTORS) // Queue to store the timestamp of failed executors for each host private val failedExecutorsTimeStampsPerHost = mutable.Map[String, mutable.Queue[Long]]() diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index e6dab9db22d5..1f5cf186e0c1 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -938,6 +938,18 @@ package object config { .intConf .createOptional + private[spark] val KEEPALIVE_ON_MIN_EXECUTORS = + ConfigBuilder("spark.executor.failureTracker.keepaliveOnMinLiveExecutors.enabled") + .doc("When true, the executor failure tracker ignores `spark.executor.maxNumFailures` if " + + "the app still have minimum available executors registered. The app fails only if " + + "it exceeds `spark.executor.maxNumFailures` and the current live executors is less " + + "than the minimum which is determined by `spark.dynamicAllocation.minExecutors` when " + + "dynamic allocation is on, or by `spark.executor.instances` when dynamic allocation " + + "is off, multiplied by `spark.scheduler.minRegisteredResourcesRatio`.") + .version("4.0.0") + .booleanConf + .createWithDefault(false) + private[spark] val EXECUTOR_ATTEMPT_FAILURE_VALIDITY_INTERVAL_MS = ConfigBuilder("spark.executor.failuresValidityInterval") .doc("Interval after which Executor failures will be considered independent and not " + @@ -2085,19 +2097,9 @@ package object config { ConfigBuilder("spark.scheduler.minRegisteredResourcesRatio") .version("1.1.1") .doubleConf + .checkValue(v => v > 0 && v <= 1, "The value must be in range (0, 1].") .createOptional - private[spark] val SCHEDULER_MIN_RESOURCES_TO_SURVIVE_RATIO = - ConfigBuilder("spark.scheduler.minResourcesToSurviveRatio") - .doc("When encountering max number of executor failures, if the scheduler still has " + - "sufficient resources, which means live executors >= max number of executor * ratio. " + - "The application will not fail immediately. The smaller the ratio is, the more tolerant " + - "the application will be to executor failures.") - .version("4.0.0") - .doubleConf - .checkValue(x => x > 0, "must be positive") - .createWithDefault(0.5) - private[spark] val SCHEDULER_MAX_REGISTERED_RESOURCE_WAITING_TIME = ConfigBuilder("spark.scheduler.maxRegisteredResourcesWaitingTime") .version("1.1.1") diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 54d968be1e71..e14f60906c08 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -65,8 +65,6 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp // is equal to at least this value, that is double between 0 and 1. private val _minRegisteredRatio = math.min(1, conf.get(SCHEDULER_MIN_REGISTERED_RESOURCES_RATIO).getOrElse(0.0)) - protected val minSurviveRatio: Double = conf.get(SCHEDULER_MIN_RESOURCES_TO_SURVIVE_RATIO) - private[cluster] val maxExecutors: Int = SchedulerBackendUtils.getMaxTargetExecutorNumber(conf) // Submit tasks after maxRegisteredWaitingTime milliseconds // if minRegisteredRatio has not yet been reached private val maxRegisteredWaitingTimeNs = TimeUnit.MILLISECONDS.toNanos( @@ -719,13 +717,6 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp def sufficientResourcesRegistered(): Boolean = true - // When the executor failure tracker collects enough failures, if the current resources are - // insufficient for keep the app running, it will fail the application directly; otherwise, - // it survives this check round. - def insufficientResourcesRetained(): Boolean = { - totalRegisteredExecutors.get() < maxExecutors * minSurviveRatio - } - def getNumExecutorsRunning: Int = totalRegisteredExecutors.get() override def isReady(): Boolean = { diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulerBackendUtils.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulerBackendUtils.scala index 153aff7208a7..88d134b452c9 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulerBackendUtils.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulerBackendUtils.scala @@ -17,8 +17,7 @@ package org.apache.spark.scheduler.cluster import org.apache.spark.SparkConf -import org.apache.spark.internal.config.{DYN_ALLOCATION_MAX_EXECUTORS, DYN_ALLOCATION_MIN_EXECUTORS, EXECUTOR_INSTANCES, MAX_EXECUTOR_FAILURES, SCHEDULER_MIN_RESOURCES_TO_SURVIVE_RATIO} -import org.apache.spark.internal.config.Streaming.STREAMING_DYN_ALLOCATION_MAX_EXECUTORS +import org.apache.spark.internal.config.{DYN_ALLOCATION_MAX_EXECUTORS, DYN_ALLOCATION_MIN_EXECUTORS, EXECUTOR_INSTANCES, KEEPALIVE_ON_MIN_EXECUTORS} import org.apache.spark.util.Utils private[spark] object SchedulerBackendUtils { @@ -46,24 +45,19 @@ private[spark] object SchedulerBackendUtils { } } - def getMaxTargetExecutorNumber(conf: SparkConf): Int = { - if (Utils.isStreamingDynamicAllocationEnabled(conf)) { - conf.get(STREAMING_DYN_ALLOCATION_MAX_EXECUTORS) - } else if (Utils.isDynamicAllocationEnabled(conf)) { - conf.get(DYN_ALLOCATION_MAX_EXECUTORS) - } else { - conf.get(EXECUTOR_INSTANCES).getOrElse(0) - } - } - def formatExecutorFailureError( maxNumExecutorFailures: Int, numOfExecutorRunning: Int, - maxExecutors: Int): String = { - s"Max number of executor failures ($maxNumExecutorFailures) reached and the current running " + - s"executors ratio $numOfExecutorRunning/$maxExecutors is insufficient. Consider " + - s"increasing ${MAX_EXECUTOR_FAILURES.key} or " + - s"${SCHEDULER_MIN_RESOURCES_TO_SURVIVE_RATIO.key} for app being more tolerant to " + - s"executor failures" + minExecutors: Int, + keepaliveOnMinExecutors: Boolean): String = { + s"Max number of executor failures ($maxNumExecutorFailures) reached, ${ + if (keepaliveOnMinExecutors) { + s"and the running executors $numOfExecutorRunning is less than minimum($minExecutors) " + + "required" + } else { + s"the running executors is $numOfExecutorRunning. Consider turning on" + + s" ${KEEPALIVE_ON_MIN_EXECUTORS.key} if running executors is sufficient" + } + }" } } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala index e591f166e823..b3c6a0f57967 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala @@ -144,11 +144,13 @@ class ExecutorPodsAllocator( snapshotsStore.addSubscriber(podAllocationDelay) { executorPodsSnapshot => onNewSnapshots(applicationId, schedulerBackend, executorPodsSnapshot) if (getNumExecutorsFailed > maxNumExecutorFailures && - schedulerBackend.insufficientResourcesRetained()) { + (!failureTracker.keepaliveOnMinExecutors || + !schedulerBackend.sufficientResourcesRegistered())) { val errorMsg = SchedulerBackendUtils.formatExecutorFailureError( maxNumExecutorFailures, schedulerBackend.getNumExecutorsRunning, - schedulerBackend.maxExecutors) + schedulerBackend.initialExecutors, + failureTracker.keepaliveOnMinExecutors) logError(errorMsg) stopApplication(EXCEED_MAX_EXECUTOR_FAILURES) } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala index 93b6ca8adc36..a3ec2a17ff27 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala @@ -61,7 +61,7 @@ private[spark] class KubernetesClusterSchedulerBackend( super.minRegisteredRatio } - private val initialExecutors = SchedulerBackendUtils.getInitialTargetExecutorNumber(conf) + private[k8s] val initialExecutors = SchedulerBackendUtils.getInitialTargetExecutorNumber(conf) private val shouldDeleteDriverService = conf.get(KUBERNETES_DRIVER_SERVICE_DELETE_ON_TERMINATION) diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala index 7a6f677e17b9..b9905bc49d70 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala @@ -140,14 +140,14 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter { podsAllocatorUnderTest = new ExecutorPodsAllocator( conf, secMgr, executorBuilder, kubernetesClient, snapshotsStore, waitForExecutorPodsClock) when(schedulerBackend.getExecutorIds()).thenReturn(Seq.empty) - when(schedulerBackend.insufficientResourcesRetained()).thenCallRealMethod() + when(schedulerBackend.sufficientResourcesRegistered()).thenCallRealMethod() val totalRegisteredExecutors = PrivateMethod[AtomicInteger](Symbol("totalRegisteredExecutors"))() - val minSurviveRatio = PrivateMethod[Double](Symbol("minSurviveRatio"))() + val minRegisteredRatio = PrivateMethod[Double](Symbol("minRegisteredRatio"))() when(schedulerBackend.invokePrivate[AtomicInteger](totalRegisteredExecutors)) .thenReturn(numRegisteredExecutors) - when(schedulerBackend.maxExecutors).thenReturn(6) - when(schedulerBackend.invokePrivate(minSurviveRatio)).thenReturn(0.5) + when(schedulerBackend.initialExecutors).thenReturn(6) + when(schedulerBackend.invokePrivate[Double](minRegisteredRatio)).thenReturn(0.5) podsAllocatorUnderTest.start(TEST_SPARK_APP_ID, schedulerBackend) when(kubernetesClient.persistentVolumeClaims()).thenReturn(persistentVolumeClaims) when(persistentVolumeClaims.inNamespace("default")).thenReturn(pvcWithNamespace) @@ -197,11 +197,40 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter { assert(_exitCode === SparkExitCode.EXCEED_MAX_EXECUTOR_FAILURES) } - test("SPARK-45873: Make FailureTracker more tolerant when app remains sufficient resources") { + test("SPARK-45873: Stop app directly when keepaliveOnMinExecutors off") { var _exitCode = 0 val _conf = conf.clone .set(MAX_EXECUTOR_FAILURES.key, "2") .set(EXECUTOR_ATTEMPT_FAILURE_VALIDITY_INTERVAL_MS.key, "2s") + .set(KEEPALIVE_ON_MIN_EXECUTORS.key, "false") + podsAllocatorUnderTest = new ExecutorPodsAllocator(_conf, secMgr, + executorBuilder, kubernetesClient, snapshotsStore, waitForExecutorPodsClock) { + override private[spark] def stopApplication(exitCode: Int): Unit = { + _exitCode = exitCode + } + } + val originalLiveExecs = numRegisteredExecutors.get + try { + numRegisteredExecutors.incrementAndGet() + podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 3)) + podsAllocatorUnderTest.start(TEST_SPARK_APP_ID, schedulerBackend) + (1 to 3).foreach(i => snapshotsStore.updatePod(failedExecutorWithoutDeletion(i))) + snapshotsStore.notifySubscribers() + assert(podsAllocatorUnderTest.getNumExecutorsFailed === 3) + assert(schedulerBackend.sufficientResourcesRegistered()) + assert(_exitCode === 11) + } finally { + numRegisteredExecutors.set(originalLiveExecs) + } + } + + test("SPARK-45873: Stops app depends on current executors when keepaliveOnMinExecutors on") { + var _exitCode = 0 + val _conf = conf.clone + .set(MAX_EXECUTOR_FAILURES.key, "2") + .set(EXECUTOR_ATTEMPT_FAILURE_VALIDITY_INTERVAL_MS.key, "2s") + .set(KEEPALIVE_ON_MIN_EXECUTORS.key, "true") + podsAllocatorUnderTest = new ExecutorPodsAllocator(_conf, secMgr, executorBuilder, kubernetesClient, snapshotsStore, waitForExecutorPodsClock) { override private[spark] def stopApplication(exitCode: Int): Unit = { @@ -210,20 +239,32 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter { } podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 3)) podsAllocatorUnderTest.start(TEST_SPARK_APP_ID, schedulerBackend) - numRegisteredExecutors.incrementAndGet() - (1 to 3).foreach(i => snapshotsStore.updatePod(failedExecutorWithoutDeletion(i))) - snapshotsStore.notifySubscribers() - assert(podsAllocatorUnderTest.getNumExecutorsFailed === 3) - assert(!schedulerBackend.insufficientResourcesRetained()) - assert(_exitCode === 0, - "although we hit max executor failure, but we still have sufficient executors") - numRegisteredExecutors.decrementAndGet() - assert(schedulerBackend.insufficientResourcesRetained()) - snapshotsStore.notifySubscribers() - assert(podsAllocatorUnderTest.getNumExecutorsFailed === 3) - assert(_exitCode === 11, - "we hit max executor failure and do not have enough executors") + val originalLiveExecs = numRegisteredExecutors.get + + try { + numRegisteredExecutors.incrementAndGet() + (1 to 3).foreach(i => snapshotsStore.updatePod(failedExecutorWithoutDeletion(i))) + snapshotsStore.notifySubscribers() + assert(podsAllocatorUnderTest.getNumExecutorsFailed === 3) + assert(schedulerBackend.sufficientResourcesRegistered()) + assert(_exitCode === 0, + "although we hit max executor failure, but we still have sufficient executors") + + } finally { + numRegisteredExecutors.set(originalLiveExecs) + } + + try { + numRegisteredExecutors.decrementAndGet() + snapshotsStore.notifySubscribers() + assert(podsAllocatorUnderTest.getNumExecutorsFailed === 3) + assert(!schedulerBackend.sufficientResourcesRegistered()) + assert(_exitCode === 11, + "we hit max executor failure and do not have enough executors") + } finally { + numRegisteredExecutors.set(originalLiveExecs) + } } test("SPARK-36052: test splitSlots") { diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index 2603b668fc59..585553f31962 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -562,10 +562,12 @@ private[spark] class ApplicationMaster( private def allocationThreadImpl(): Unit = { // The number of failures in a row until the allocation thread gives up. val reporterMaxFailures = sparkConf.get(MAX_REPORTER_THREAD_FAILURES) - val maxExecutors = SchedulerBackendUtils.getMaxTargetExecutorNumber(sparkConf) - val minSurviveRatio: Double = sparkConf.get(SCHEDULER_MIN_RESOURCES_TO_SURVIVE_RATIO) + val minExecutors = SchedulerBackendUtils.getInitialTargetExecutorNumber(sparkConf) + val minRegisteredRatio = sparkConf.get(SCHEDULER_MIN_REGISTERED_RESOURCES_RATIO).getOrElse(0.8) + val keepaliveOnMinExecutors = allocator.failureTracker.keepaliveOnMinExecutors def insufficientResources: Boolean = { - allocator.getNumExecutorsRunning < maxExecutors * minSurviveRatio + !keepaliveOnMinExecutors || + allocator.getNumExecutorsRunning < minExecutors * minRegisteredRatio } var failureCount = 0 while (!finished) { @@ -574,7 +576,8 @@ private[spark] class ApplicationMaster( val errorMsg = SchedulerBackendUtils.formatExecutorFailureError( maxNumExecutorFailures, allocator.getNumExecutorsRunning, - maxExecutors) + minExecutors, + keepaliveOnMinExecutors) finish(FinalApplicationStatus.FAILED, ApplicationMaster.EXIT_MAX_EXECUTOR_FAILURES, errorMsg) } else if (allocator.isAllNodeExcluded) { diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala index 68d77f889787..1cfb3d955704 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala @@ -36,7 +36,7 @@ import org.apache.spark.rpc._ import org.apache.spark.scheduler._ import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._ import org.apache.spark.storage.{BlockManagerId, BlockManagerMaster} -import org.apache.spark.util.{RpcUtils, ThreadUtils} +import org.apache.spark.util.{RpcUtils, ThreadUtils, Utils} /** * Abstract Yarn scheduler backend that contains common logic @@ -88,6 +88,10 @@ private[spark] abstract class YarnSchedulerBackend( private val minMergersStaticThreshold = conf.get(config.SHUFFLE_MERGER_LOCATIONS_MIN_STATIC_THRESHOLD) + private val maxNumExecutors = conf.get(config.DYN_ALLOCATION_MAX_EXECUTORS) + + private val numExecutors = conf.get(config.EXECUTOR_INSTANCES).getOrElse(0) + /** * Bind to YARN. This *must* be done before calling [[start()]]. * @@ -174,6 +178,11 @@ private[spark] abstract class YarnSchedulerBackend( resourceProfileId: Int): Seq[BlockManagerId] = { // TODO (SPARK-33481) This is a naive way of calculating numMergersDesired for a stage, // TODO we can use better heuristics to calculate numMergersDesired for a stage. + val maxExecutors = if (Utils.isDynamicAllocationEnabled(sc.getConf)) { + maxNumExecutors + } else { + numExecutors + } val tasksPerExecutor = sc.resourceProfileManager .resourceProfileFromId(resourceProfileId).maxTasksPerExecutor(sc.conf) val numMergersDesired = math.min(