From d4408c931697dc03d7a357b28d8e2c8b5ad379d8 Mon Sep 17 00:00:00 2001 From: ehnalis Date: Tue, 12 May 2015 15:16:21 +0200 Subject: [PATCH 1/6] [SPARK-7533] [YARN] Decrease spacing between AM-RM heartbeats. Added faster RM-heartbeats on pending container allocations with multiplicative back-off. Also updated related documentations. --- docs/running-on-yarn.md | 17 +++++------ .../spark/deploy/yarn/ApplicationMaster.scala | 29 ++++++++++++++----- 2 files changed, 30 insertions(+), 16 deletions(-) diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md index 51c1339165024..ba0e5c34c4156 100644 --- a/docs/running-on-yarn.md +++ b/docs/running-on-yarn.md @@ -74,6 +74,14 @@ Most of the configs are the same for Spark on YARN as for other deployment modes 5000 The interval in ms in which the Spark application master heartbeats into the YARN ResourceManager. + To avoid the application master to be expired by late reporting, if a higher value is provided, the interval will be set to the half of the expiry interval in YARN's configuration (yarn.am.liveness-monitor.expiry-interval-ms / 2). + + + + spark.yarn.scheduler.allocation.interval-ms + 200 + + The interval in ms in which the Spark application master eagerly heartbeats to the YARN ResourceManager on pending container allocations. It should be no larger than spark.yarn.scheduler.heartbeat.interval-ms. The allocation interval will double on successive eager heartbeats if pending containers still exist, until spark.yarn.scheduler.heartbeat.interval-ms reached. @@ -220,15 +228,6 @@ Most of the configs are the same for Spark on YARN as for other deployment modes Otherwise, the client process will exit after submission. - - spark.yarn.executor.nodeLabelExpression - (none) - - A YARN node label expression that restricts the set of nodes executors will be scheduled on. 
- Only versions of YARN greater than or equal to 2.6 support node label expressions, so when - running against earlier versions, this property will be ignored. - - # Launching Spark on YARN diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index 29752969e6152..3abbb6099de49 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -300,11 +300,14 @@ private[spark] class ApplicationMaster( val expiryInterval = yarnConf.getInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 120000) // we want to be reasonably responsive without causing too many requests to RM. - val schedulerInterval = - sparkConf.getTimeAsMs("spark.yarn.scheduler.heartbeat.interval-ms", "5s") + val heartbeatInterval = math.max(0, math.min(expiryInterval / 2, + sparkConf.getTimeAsMs("spark.yarn.scheduler.heartbeat.interval-ms", "5s"))) - // must be <= expiryInterval / 2. 
- val interval = math.max(0, math.min(expiryInterval / 2, schedulerInterval)) + // we want to check more frequently for pending containers + val eagerAllocationInterval = math.min(heartbeatInterval, + sparkConf.getTimeAsMs("spark.yarn.scheduler.allocation.interval-ms", "200")) + + var currentAllocationInterval = eagerAllocationInterval // The number of failures in a row until Reporter thread give up val reporterMaxFailures = sparkConf.getInt("spark.yarn.scheduler.reporterThread.maxFailures", 5) @@ -331,14 +334,25 @@ private[spark] class ApplicationMaster( finish(FinalApplicationStatus.FAILED, ApplicationMaster.EXIT_REPORTER_FAILURE, "Exception was thrown " + s"${failureCount} time(s) from Reporter thread.") - } else { logWarning(s"Reporter thread fails ${failureCount} time(s) in a row.", e) } } } try { - Thread.sleep(interval) + val numPendingAllocate = allocator.getNumPendingAllocate + if (numPendingAllocate > 0) { + currentAllocationInterval = + math.min(heartbeatInterval,currentAllocationInterval * 2) + logDebug(s"Number of pending allocations is ${numPendingAllocate}. " + + "Sleeping for " + currentAllocationInterval) + Thread.sleep(currentAllocationInterval) + } else { + logDebug(s"Number of pending allocations is ${numPendingAllocate}. " + + "Sleeping for " + heartbeatInterval) + currentAllocationInterval = eagerAllocationInterval + Thread.sleep(heartbeatInterval) + } } catch { case e: InterruptedException => } @@ -349,7 +363,8 @@ private[spark] class ApplicationMaster( t.setDaemon(true) t.setName("Reporter") t.start() - logInfo("Started progress reporter thread - sleep time : " + interval) + logInfo("Started progress reporter thread with (heartbeat : " + heartbeatInterval + + ", eager allocation : " + eagerAllocationInterval + ") intervals") t } From 073d28319734a0465331898a6231fcc9faf19955 Mon Sep 17 00:00:00 2001 From: ehnalis Date: Tue, 12 May 2015 15:21:14 +0200 Subject: [PATCH 2/6] [SPARK-7533] [YARN] Decrease spacing between AM-RM heartbeats. 
Added faster RM-heartbeats on pending container allocations with multiplicative back-off. Also updated related documentations. --- docs/running-on-yarn.md | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md index ba0e5c34c4156..d1d77c7659a31 100644 --- a/docs/running-on-yarn.md +++ b/docs/running-on-yarn.md @@ -228,6 +228,15 @@ Most of the configs are the same for Spark on YARN as for other deployment modes Otherwise, the client process will exit after submission. + + spark.yarn.executor.nodeLabelExpression + (none) + + A YARN node label expression that restricts the set of nodes executors will be scheduled on. + Only versions of YARN greater than or equal to 2.6 support node label expressions, so when + running against earlier versions, this property will be ignored. + + # Launching Spark on YARN From 08bac63d70fd2888d3153ef52f49ec63c6dae98a Mon Sep 17 00:00:00 2001 From: ehnalis Date: Wed, 13 May 2015 09:21:07 +0200 Subject: [PATCH 3/6] Refined style, grammar, removed duplicated code. --- docs/running-on-yarn.md | 13 ++++--- .../spark/deploy/yarn/ApplicationMaster.scala | 34 +++++++++---------- 2 files changed, 26 insertions(+), 21 deletions(-) diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md index d1d77c7659a31..65a5c325bc322 100644 --- a/docs/running-on-yarn.md +++ b/docs/running-on-yarn.md @@ -74,14 +74,19 @@ Most of the configs are the same for Spark on YARN as for other deployment modes 5000 The interval in ms in which the Spark application master heartbeats into the YARN ResourceManager. - To avoid the application master to be expired by late reporting, if a higher value is provided, the interval will be set to the half of the expiry interval in YARN's configuration (yarn.am.liveness-monitor.expiry-interval-ms / 2). + The value is capped at half the value of YARN's configuration for the expiry interval + (yarn.am.liveness-monitor.expiry-interval-ms). 
- spark.yarn.scheduler.allocation.interval-ms - 200 + spark.yarn.scheduler.initial-allocation.interval + 200ms - The interval in ms in which the Spark application master eagerly heartbeats to the YARN ResourceManager on pending container allocations. It should be no larger than spark.yarn.scheduler.heartbeat.interval-ms. The allocation interval will double on successive eager heartbeats if pending containers still exist, until spark.yarn.scheduler.heartbeat.interval-ms reached. + The initial interval in which the Spark application master eagerly heartbeats to the YARN ResourceManager + when there are pending container allocation requests. It should be no larger than + spark.yarn.scheduler.heartbeat.interval-ms. The allocation interval will double on + successive eager heartbeats if pending containers still exist, until + spark.yarn.scheduler.heartbeat.interval-ms is reached. diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index 3abbb6099de49..9934e3f9e115b 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -304,10 +304,10 @@ private[spark] class ApplicationMaster( sparkConf.getTimeAsMs("spark.yarn.scheduler.heartbeat.interval-ms", "5s"))) // we want to check more frequently for pending containers - val eagerAllocationInterval = math.min(heartbeatInterval, - sparkConf.getTimeAsMs("spark.yarn.scheduler.allocation.interval-ms", "200")) + val initialAllocationInterval = math.min(heartbeatInterval, + sparkConf.getTimeAsMs("spark.yarn.scheduler.initial-allocation.interval", "200ms")) - var currentAllocationInterval = eagerAllocationInterval + var currentAllocationInterval = initialAllocationInterval // The number of failures in a row until Reporter thread give up val reporterMaxFailures = 
sparkConf.getInt("spark.yarn.scheduler.reporterThread.maxFailures", 5) @@ -341,18 +341,18 @@ private[spark] class ApplicationMaster( } try { val numPendingAllocate = allocator.getNumPendingAllocate - if (numPendingAllocate > 0) { - currentAllocationInterval = - math.min(heartbeatInterval,currentAllocationInterval * 2) - logDebug(s"Number of pending allocations is ${numPendingAllocate}. " + - "Sleeping for " + currentAllocationInterval) - Thread.sleep(currentAllocationInterval) - } else { - logDebug(s"Number of pending allocations is ${numPendingAllocate}. " + - "Sleeping for " + heartbeatInterval) - currentAllocationInterval = eagerAllocationInterval - Thread.sleep(heartbeatInterval) - } + val sleepInterval = + if (numPendingAllocate > 0) { + currentAllocationInterval = + math.min(heartbeatInterval, currentAllocationInterval * 2) + currentAllocationInterval + } else { + currentAllocationInterval = initialAllocationInterval + heartbeatInterval + } + logDebug(s"Number of pending allocations is ${numPendingAllocate}. " + + s"Sleeping for ${sleepInterval}.") + Thread.sleep(sleepInterval) } catch { case e: InterruptedException => } @@ -363,8 +363,8 @@ private[spark] class ApplicationMaster( t.setDaemon(true) t.setName("Reporter") t.start() - logInfo("Started progress reporter thread with (heartbeat : " + heartbeatInterval + - ", eager allocation : " + eagerAllocationInterval + ") intervals") + logInfo(s"Started progress reporter thread with (heartbeat : ${heartbeatInterval}, " + + s"initial allocation : ${initialAllocationInterval}) intervals") t } From 6120295268509d517357750df1c0b8588b1ee03d Mon Sep 17 00:00:00 2001 From: ehnalis Date: Thu, 14 May 2015 20:49:30 +0200 Subject: [PATCH 4/6] Removed the bug, when allocation heartbeat would not start from initial value. 
--- .../spark/deploy/yarn/ApplicationMaster.scala | 21 ++++++++++--------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index 9934e3f9e115b..c7356966f7eb7 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -307,7 +307,7 @@ private[spark] class ApplicationMaster( val initialAllocationInterval = math.min(heartbeatInterval, sparkConf.getTimeAsMs("spark.yarn.scheduler.initial-allocation.interval", "200ms")) - var currentAllocationInterval = initialAllocationInterval + var nextAllocationInterval = initialAllocationInterval // The number of failures in a row until Reporter thread give up val reporterMaxFailures = sparkConf.getInt("spark.yarn.scheduler.reporterThread.maxFailures", 5) @@ -333,9 +333,9 @@ private[spark] class ApplicationMaster( if (!NonFatal(e) || failureCount >= reporterMaxFailures) { finish(FinalApplicationStatus.FAILED, ApplicationMaster.EXIT_REPORTER_FAILURE, "Exception was thrown " + - s"${failureCount} time(s) from Reporter thread.") + s"$failureCount time(s) from Reporter thread.") } else { - logWarning(s"Reporter thread fails ${failureCount} time(s) in a row.", e) + logWarning(s"Reporter thread fails $failureCount time(s) in a row.", e) } } } @@ -343,15 +343,16 @@ private[spark] class ApplicationMaster( val numPendingAllocate = allocator.getNumPendingAllocate val sleepInterval = if (numPendingAllocate > 0) { - currentAllocationInterval = - math.min(heartbeatInterval, currentAllocationInterval * 2) + val currentAllocationInterval = + math.min(heartbeatInterval, nextAllocationInterval) + nextAllocationInterval *= 2 currentAllocationInterval } else { - currentAllocationInterval = initialAllocationInterval + nextAllocationInterval = initialAllocationInterval heartbeatInterval 
} - logDebug(s"Number of pending allocations is ${numPendingAllocate}. " + - s"Sleeping for ${sleepInterval}.") + logDebug(s"Number of pending allocations is $numPendingAllocate. " + + s"Sleeping for $sleepInterval.") Thread.sleep(sleepInterval) } catch { case e: InterruptedException => @@ -363,8 +364,8 @@ private[spark] class ApplicationMaster( t.setDaemon(true) t.setName("Reporter") t.start() - logInfo(s"Started progress reporter thread with (heartbeat : ${heartbeatInterval}, " + - s"initial allocation : ${initialAllocationInterval}) intervals") + logInfo(s"Started progress reporter thread with (heartbeat : $heartbeatInterval, " + + s"initial allocation : $initialAllocationInterval) intervals") t } From 90f8ba4bcde5b0eb050f109d3143fc003e521e6d Mon Sep 17 00:00:00 2001 From: ehnalis Date: Thu, 14 May 2015 20:52:22 +0200 Subject: [PATCH 5/6] Changed default HB values. --- docs/running-on-yarn.md | 2 +- .../scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md index 65a5c325bc322..ff38ce3dfecb2 100644 --- a/docs/running-on-yarn.md +++ b/docs/running-on-yarn.md @@ -71,7 +71,7 @@ Most of the configs are the same for Spark on YARN as for other deployment modes spark.yarn.scheduler.heartbeat.interval-ms - 5000 + 3000 The interval in ms in which the Spark application master heartbeats into the YARN ResourceManager. 
The value is capped at half the value of YARN's configuration for the expiry interval diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index c7356966f7eb7..63a6f2e9472c1 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -301,7 +301,7 @@ private[spark] class ApplicationMaster( // we want to be reasonably responsive without causing too many requests to RM. val heartbeatInterval = math.max(0, math.min(expiryInterval / 2, - sparkConf.getTimeAsMs("spark.yarn.scheduler.heartbeat.interval-ms", "5s"))) + sparkConf.getTimeAsMs("spark.yarn.scheduler.heartbeat.interval-ms", "3s"))) // we want to check more frequently for pending containers val initialAllocationInterval = math.min(heartbeatInterval, From a1d2101a33b7cb3d75e5bc93fbdc5db6689109bd Mon Sep 17 00:00:00 2001 From: ehnalis Date: Mon, 18 May 2015 20:30:06 +0200 Subject: [PATCH 6/6] Misspell fixed. --- docs/running-on-yarn.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md index ff38ce3dfecb2..9d55f435e80ad 100644 --- a/docs/running-on-yarn.md +++ b/docs/running-on-yarn.md @@ -84,7 +84,7 @@ Most of the configs are the same for Spark on YARN as for other deployment modes The initial interval in which the Spark application master eagerly heartbeats to the YARN ResourceManager when there are pending container allocation requests. It should be no larger than - spark.yarn.scheduler.heartbeat.interval-ms. The allocation interval will double on + spark.yarn.scheduler.heartbeat.interval-ms. The allocation interval will be doubled on successive eager heartbeats if pending containers still exist, until spark.yarn.scheduler.heartbeat.interval-ms is reached.