From 42e9469afe72d47dcd610a88027862ea1e816c34 Mon Sep 17 00:00:00 2001
From: Ryan Williams
Date: Thu, 15 Oct 2015 00:40:03 +0000
Subject: [PATCH 1/2] allow 2*maxExecutors failures when dyn-alloc'ing

---
 .../scala/org/apache/spark/SparkConf.scala    |  4 +++-
 .../spark/deploy/yarn/ApplicationMaster.scala | 19 +++++++++++++++----
 2 files changed, 18 insertions(+), 5 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index 1a0ac3d01759c..58d3b846fd80d 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -595,7 +595,9 @@ private[spark] object SparkConf extends Logging {
     "spark.rpc.lookupTimeout" -> Seq(
       AlternateConfig("spark.akka.lookupTimeout", "1.4")),
     "spark.streaming.fileStream.minRememberDuration" -> Seq(
-      AlternateConfig("spark.streaming.minRememberDuration", "1.5"))
+      AlternateConfig("spark.streaming.minRememberDuration", "1.5")),
+    "spark.yarn.max.executor.failures" -> Seq(
+      AlternateConfig("spark.yarn.max.worker.failures", "1.5"))
     )

   /**
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 3791eea5bf178..93f6fbd27e1a6 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -62,10 +62,21 @@ private[spark] class ApplicationMaster(
     .asInstanceOf[YarnConfiguration]
   private val isClusterMode = args.userClass != null

-  // Default to numExecutors * 2, with minimum of 3
-  private val maxNumExecutorFailures = sparkConf.getInt("spark.yarn.max.executor.failures",
-    sparkConf.getInt("spark.yarn.max.worker.failures",
-      math.max(sparkConf.getInt("spark.executor.instances", 0) * 2, 3)))
+  // Default to twice the number of executors (twice the maximum number of executors if dynamic
+  // allocation is enabled), with a minimum of 3.
+
+  private val maxNumExecutorFailures = {
+    val defaultKey =
+      if (Utils.isDynamicAllocationEnabled(sparkConf)) {
+        "spark.dynamicAllocation.maxExecutors"
+      } else {
+        "spark.executor.instances"
+      }
+    val effectiveNumExecutors = sparkConf.getInt(defaultKey, 0)
+    val defaultMaxNumExecutorFailures = math.max(3, 2 * effectiveNumExecutors)
+
+    sparkConf.getInt("spark.yarn.max.executor.failures", defaultMaxNumExecutorFailures)
+  }

   @volatile private var exitCode = 0
   @volatile private var unregistered = false

From aa5d3491f15828c2b9ae93a16ab7a4503c8a4341 Mon Sep 17 00:00:00 2001
From: Ryan Williams
Date: Thu, 15 Oct 2015 02:05:16 +0000
Subject: [PATCH 2/2] add host info to container-failure log msgs

---
 .../spark/deploy/yarn/YarnAllocator.scala | 19 +++++++++++--------
 1 file changed, 11 insertions(+), 8 deletions(-)

diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index 9e1ef1b3b4229..1deaa3743ddfa 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -430,17 +430,20 @@ private[yarn] class YarnAllocator(
     for (completedContainer <- completedContainers) {
       val containerId = completedContainer.getContainerId
       val alreadyReleased = releasedContainers.remove(containerId)
+      val hostOpt = allocatedContainerToHostMap.get(containerId)
+      val onHostStr = hostOpt.map(host => s" on host: $host").getOrElse("")
       val exitReason = if (!alreadyReleased) {
         // Decrement the number of executors running. The next iteration of
         // the ApplicationMaster's reporting thread will take care of allocating.
         numExecutorsRunning -= 1
-        logInfo("Completed container %s (state: %s, exit status: %s)".format(
+        logInfo("Completed container %s%s (state: %s, exit status: %s)".format(
           containerId,
+          onHostStr,
           completedContainer.getState,
           completedContainer.getExitStatus))
         // Hadoop 2.2.X added a ContainerExitStatus we should switch to use
         // there are some exit status' we shouldn't necessarily count against us, but for
-        // now I think its ok as none of the containers are expected to exit
+        // now I think its ok as none of the containers are expected to exit.
         val exitStatus = completedContainer.getExitStatus
         val (isNormalExit, containerExitReason) = exitStatus match {
           case ContainerExitStatus.SUCCESS =>
@@ -449,7 +452,7 @@
             // Preemption should count as a normal exit, since YARN preempts containers merely
             // to do resource sharing, and tasks that fail due to preempted executors could
             // just as easily finish on any other executor. See SPARK-8167.
-            (true, s"Container $containerId was preempted.")
+            (true, s"Container ${containerId}${onHostStr} was preempted.")
           // Should probably still count memory exceeded exit codes towards task failures
           case VMEM_EXCEEDED_EXIT_CODE =>
             (false, memLimitExceededLogMessage(
@@ -461,7 +464,7 @@
               PMEM_EXCEEDED_PATTERN))
           case unknown =>
             numExecutorsFailed += 1
-            (false, "Container marked as failed: " + containerId +
+            (false, "Container marked as failed: " + containerId + onHostStr +
              ". Exit status: " + completedContainer.getExitStatus +
Diagnostics: " + completedContainer.getDiagnostics) @@ -479,10 +482,10 @@ private[yarn] class YarnAllocator( s"Container $containerId exited from explicit termination request.") } - if (allocatedContainerToHostMap.contains(containerId)) { - val host = allocatedContainerToHostMap.get(containerId).get - val containerSet = allocatedHostToContainersMap.get(host).get - + for { + host <- hostOpt + containerSet <- allocatedHostToContainersMap.get(host) + } { containerSet.remove(containerId) if (containerSet.isEmpty) { allocatedHostToContainersMap.remove(host)