From 57fefc2a3effcb1127d23b8ca4ec1d97d40e3e49 Mon Sep 17 00:00:00 2001
From: Hari Shreedharan
Date: Fri, 29 May 2015 13:47:07 -0700
Subject: [PATCH 01/14] [SPARK-7955][Core] Ensure executors with cached RDD
 blocks are not removed if dynamic allocation is enabled.

This is a work in progress. This patch ensures that an executor that has
cached RDD blocks is not removed, but makes no attempt to find another
executor to remove. This is meant to get some feedback on the current
approach, and if it makes sense then I will look at choosing another
executor to remove.
---
 .../spark/ExecutorAllocationManager.scala     | 25 +++++++++++++++++--
 .../apache/spark/storage/BlockManager.scala   |  6 +++++
 .../spark/storage/BlockManagerMessages.scala  |  1 +
 .../storage/BlockManagerSlaveEndpoint.scala   |  4 +++
 4 files changed, 34 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index 951460475264..005139023bc5 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -23,8 +23,11 @@ import scala.collection.mutable
 
 import com.codahale.metrics.{Gauge, MetricRegistry}
 
+import org.apache.spark.executor.ExecutorEndpoint
+import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef}
 import org.apache.spark.scheduler._
 import org.apache.spark.metrics.source.Source
+import org.apache.spark.storage.BlockManagerMessages.HasCachedBlocks
 import org.apache.spark.util.{ThreadUtils, Clock, SystemClock, Utils}
 
 /**
@@ -150,6 +153,9 @@ private[spark] class ExecutorAllocationManager(
   // Metric source for ExecutorAllocationManager to expose internal status to MetricsSystem.
   val executorAllocationManagerSource = new ExecutorAllocationManagerSource
 
+  private lazy val sparkEnv = SparkEnv.get
+
+  private val executorEndpoints = new mutable.HashMap[String, RpcEndpointRef]()
   /**
    * Verify that the settings specified through the config are valid.
    * If not, throw an appropriate exception.
@@ -239,10 +245,13 @@ private[spark] class ExecutorAllocationManager(
 
     removeTimes.retain { case (executorId, expireTime) =>
       val expired = now >= expireTime
-      if (expired) {
+      var removed = false
+      val executorEndpoint = executorEndpoints.get(executorId)
+      if (expired && !executorEndpoint.exists(_.askWithRetry[Boolean](HasCachedBlocks))) {
         removeExecutor(executorId)
+        removed = true
       }
-      !expired
+      !removed
     }
   }
 
@@ -388,6 +397,17 @@ private[spark] class ExecutorAllocationManager(
       // however, we are no longer at the lower bound, and so we must mark executor X
       // as idle again so as not to forget that it is a candidate for removal. (see SPARK-4951)
       executorIds.filter(listener.isExecutorIdle).foreach(onExecutorIdle)
+      val hostAndPort =
+        sparkEnv.blockManager.master.getRpcHostPortForExecutor(executorId)
+      hostAndPort match {
+        case Some((host, port)) =>
+          executorEndpoints(executorId) =
+            sparkEnv.rpcEnv.setupEndpointRef(
+              SparkEnv.executorActorSystemName,
+              RpcAddress(host, port),
+              ExecutorEndpoint.EXECUTOR_ENDPOINT_NAME)
+        case None =>
+      }
       logInfo(s"New executor $executorId has registered (new total is ${executorIds.size})")
     } else {
       logWarning(s"Duplicate executor $executorId has registered")
@@ -407,6 +427,7 @@ private[spark] class ExecutorAllocationManager(
         logDebug(s"Executor $executorId is no longer pending to " +
           s"be removed (${executorsPendingToRemove.size} left)")
       }
+// executorEndpoints -= executorId
     } else {
       logWarning(s"Unknown executor $executorId has been removed!")
     }
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 5048c7dab240..0bca53eee956 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -1241,6 +1241,12 @@ private[spark] class BlockManager(
     futureExecutionContext.shutdownNow()
     logInfo("BlockManager stopped")
   }
+
+  private[storage] def hasCachedBlocks: Boolean = {
+    blockInfo.exists { kv =>
+      getCurrentBlockStatus(kv._1, kv._2).isCached
+    }
+  }
 }
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
index 1683576067fe..2a00f7bef695 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
@@ -42,6 +42,7 @@ private[spark] object BlockManagerMessages {
   case class RemoveBroadcast(broadcastId: Long, removeFromDriver: Boolean = true)
     extends ToBlockManagerSlave
 
+  case object HasCachedBlocks extends ToBlockManagerSlave
 
   //////////////////////////////////////////////////////////////////////////////////
   // Messages from slaves to the master.
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveEndpoint.scala
index 543df4e1350d..b50cc679c006 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveEndpoint.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveEndpoint.scala
@@ -70,6 +70,10 @@ class BlockManagerSlaveEndpoint(
 
     case GetMatchingBlockIds(filter, _) =>
       context.reply(blockManager.getMatchingBlockIds(filter))
+
+    case HasCachedBlocks =>
+      context.reply(blockManager.hasCachedBlocks)
+
   }
 
   private def doAsync[T](actionMessage: String, context: RpcCallContext)(body: => T) {
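The core of patch 01 is the changed expiry scan in schedule(): an executor whose idle timer has fired is now removed only if a synchronous cached-blocks probe comes back false. Below is a minimal, self-contained sketch of that loop; hasCachedBlocks stands in for the askWithRetry[Boolean](HasCachedBlocks) RPC, removeExecutor for the real kill request, and the map contents are made up for illustration:

    import scala.collection.mutable

    object ExpiryScanSketch {
      // Executor id -> time (ms) at which its idle timer fires.
      val removeTimes = mutable.Map("exec-1" -> 1000L, "exec-2" -> 1000L)

      def hasCachedBlocks(executorId: String): Boolean = executorId == "exec-2"
      def removeExecutor(executorId: String): Unit = println(s"removing $executorId")

      // retain is called filterInPlace in Scala 2.13+.
      def scan(now: Long): Unit = removeTimes.retain { case (executorId, expireTime) =>
        val expired = now >= expireTime
        var removed = false
        if (expired && !hasCachedBlocks(executorId)) {
          removeExecutor(executorId)
          removed = true
        }
        !removed // keep every entry that was not actually removed
      }

      def main(args: Array[String]): Unit = {
        scan(now = 2000L)           // prints "removing exec-1"; exec-2 survives its expiry
        println(removeTimes.keySet) // Set(exec-2): still tracked, re-probed on every scan
      }
    }

One consequence the sketch makes visible: an expired executor that holds cached blocks stays in removeTimes and is probed again on every scan, which is part of what the later patches in this series rework.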
From 5a1993fd9d5924a6b2630b323c05997f9098e861 Mon Sep 17 00:00:00 2001
From: Hari Shreedharan
Date: Fri, 29 May 2015 16:41:48 -0700
Subject: [PATCH 02/14] Add timeout for executors with cached blocks. Ignore
 broadcast blocks while checking if there are cached blocks.
---
 .../spark/ExecutorAllocationManager.scala   | 34 +++++++++++++++----
 .../apache/spark/storage/BlockManager.scala |  4 +--
 2 files changed, 29 insertions(+), 9 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index 005139023bc5..934ccf6709e2 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -104,6 +104,11 @@ private[spark] class ExecutorAllocationManager(
   private val executorIdleTimeoutS = conf.getTimeAsSeconds(
     "spark.dynamicAllocation.executorIdleTimeout", "60s")
 
+  private val cachedExecutorTimeoutS = conf.getTimeAsSeconds(
+    "spark.dynamicAllocation.executorIdleTimeout", s"${Long.MaxValue / 60}s")
+
+  val executorsWithCachedBlocks = new mutable.HashSet[String]()
+
   // During testing, the methods to actually kill and add executors are mocked out
   private val testing = conf.getBoolean("spark.dynamicAllocation.testing", false)
 
@@ -245,13 +250,10 @@ private[spark] class ExecutorAllocationManager(
 
     removeTimes.retain { case (executorId, expireTime) =>
       val expired = now >= expireTime
-      var removed = false
-      val executorEndpoint = executorEndpoints.get(executorId)
-      if (expired && !executorEndpoint.exists(_.askWithRetry[Boolean](HasCachedBlocks))) {
+      if (expired) {
         removeExecutor(executorId)
-        removed = true
       }
-      !removed
+      !expired
     }
   }
 
@@ -464,9 +466,27 @@ private[spark] class ExecutorAllocationManager(
   private def onExecutorIdle(executorId: String): Unit = synchronized {
     if (executorIds.contains(executorId)) {
       if (!removeTimes.contains(executorId) && !executorsPendingToRemove.contains(executorId)) {
+
+        val hasCachedBlocks =
+          executorsWithCachedBlocks.contains(executorId) ||
+          executorEndpoints.get(executorId).exists(_.askWithRetry[Boolean](HasCachedBlocks))
+
+        if (hasCachedBlocks) executorsWithCachedBlocks += executorId
+
+        val now = clock.getTimeMillis()
+        val timeout = {
+          if (hasCachedBlocks) {
+            val newExpiry = now + cachedExecutorTimeoutS * 1000
+            if (newExpiry < 0) Long.MaxValue // Overflow
+            else newExpiry
+          } else {
+            now + executorIdleTimeoutS * 1000
+          }
+        }
+
+        removeTimes(executorId) = timeout
         logDebug(s"Starting idle timer for $executorId because there are no more tasks " +
-          s"scheduled to run on the executor (to expire in $executorIdleTimeoutS seconds)")
-        removeTimes(executorId) = clock.getTimeMillis + executorIdleTimeoutS * 1000
+          s"scheduled to run on the executor")
       }
     } else {
       logWarning(s"Attempted to mark unknown executor $executorId idle")
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 0bca53eee956..e3dc6f41fa96 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -1243,8 +1243,8 @@ private[spark] class BlockManager(
   }
 
   private[storage] def hasCachedBlocks: Boolean = {
-    blockInfo.exists { kv =>
-      getCurrentBlockStatus(kv._1, kv._2).isCached
+    blockInfo.exists { case (blockId, info) =>
+      !blockId.isBroadcast && getCurrentBlockStatus(blockId, info).isCached
     }
   }
 }
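Patch 02's policy reduces to one pure decision: pick the expiry time from one of two timeouts depending on whether the executor holds cached blocks, clamping on overflow since the cached-executor default is effectively infinite. A standalone sketch with illustrative values, using the seconds-scale default that patch 03 settles on so the arithmetic is meaningful:

    object IdleTimeoutSketch {
      val executorIdleTimeoutS = 60L
      val cachedExecutorTimeoutS = Long.MaxValue / 1000 // effectively "never"

      def removalTime(now: Long, hasCachedBlocks: Boolean): Long =
        if (hasCachedBlocks) {
          val newExpiry = now + cachedExecutorTimeoutS * 1000
          if (newExpiry < 0) Long.MaxValue else newExpiry // overflow clamp
        } else {
          now + executorIdleTimeoutS * 1000
        }

      def main(args: Array[String]): Unit = {
        val now = 1700000000000L // any realistic epoch-millis value
        println(removalTime(now, hasCachedBlocks = false) - now) // 60000
        println(removalTime(now, hasCachedBlocks = true))        // Long.MaxValue (clamped)
      }
    }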
From 272969d833520aff34b18e30c4165d3f57e76574 Mon Sep 17 00:00:00 2001
From: Hari Shreedharan
Date: Fri, 29 May 2015 17:29:13 -0700
Subject: [PATCH 03/14] Fix seconds to millis bug.
---
 .../main/scala/org/apache/spark/ExecutorAllocationManager.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index 934ccf6709e2..cb6753392a04 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -105,7 +105,7 @@ private[spark] class ExecutorAllocationManager(
     "spark.dynamicAllocation.executorIdleTimeout", "60s")
 
   private val cachedExecutorTimeoutS = conf.getTimeAsSeconds(
-    "spark.dynamicAllocation.executorIdleTimeout", s"${Long.MaxValue / 60}s")
+    "spark.dynamicAllocation.executorIdleTimeout", s"${Long.MaxValue / 1000}s")
 
   val executorsWithCachedBlocks = new mutable.HashSet[String]()
 

From ae932ffa53d49b99ae6392843a36e5cbc0c8e9f5 Mon Sep 17 00:00:00 2001
From: Hari Shreedharan
Date: Fri, 29 May 2015 17:29:49 -0700
Subject: [PATCH 04/14] Fix config param name.
---
 .../main/scala/org/apache/spark/ExecutorAllocationManager.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index cb6753392a04..d6077e16554d 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -105,7 +105,7 @@ private[spark] class ExecutorAllocationManager(
     "spark.dynamicAllocation.executorIdleTimeout", "60s")
 
   private val cachedExecutorTimeoutS = conf.getTimeAsSeconds(
-    "spark.dynamicAllocation.executorIdleTimeout", s"${Long.MaxValue / 1000}s")
+    "spark.dynamicAllocation.cachedExecutorIdleTimeout", s"${Long.MaxValue / 1000}s")
 
   val executorsWithCachedBlocks = new mutable.HashSet[String]()
 
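The seconds-to-millis bug that patch 03 fixes is easy to see in isolation: the timeout is stored in seconds and later multiplied by 1000, so the largest safe default is Long.MaxValue / 1000. With Long.MaxValue / 60 the multiplication itself wraps around, and because the wrapped result happens to stay positive, the newExpiry < 0 guard above cannot catch it:

    object OverflowDemo {
      def main(args: Array[String]): Unit = {
        val bad  = (Long.MaxValue / 60) * 1000   // wraps to 6148914691236517072
        val good = (Long.MaxValue / 1000) * 1000 // stays valid: 9223372036854775000
        println(bad)
        println(good)
      }
    }

Patch 04 is orthogonal: the new setting was reading the old key, so spark.dynamicAllocation.cachedExecutorIdleTimeout could never actually be configured.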
From 193af4c00d8950bc1b37b355b81b6bb25327e0f1 Mon Sep 17 00:00:00 2001
From: Hari Shreedharan
Date: Wed, 3 Jun 2015 11:16:20 -0700
Subject: [PATCH 05/14] WIP. Use local state rather than via RPC.
---
 .../spark/ExecutorAllocationManager.scala     | 21 +++----------------
 .../spark/storage/BlockManagerMaster.scala    |  8 +++++++
 .../storage/BlockManagerMasterEndpoint.scala  |  2 ++
 .../spark/storage/BlockManagerMessages.scala  |  4 ++--
 .../storage/BlockManagerSlaveEndpoint.scala   |  3 ---
 5 files changed, 15 insertions(+), 23 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index d6077e16554d..bc55ba58ae1e 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -158,9 +158,8 @@ private[spark] class ExecutorAllocationManager(
   // Metric source for ExecutorAllocationManager to expose internal status to MetricsSystem.
   val executorAllocationManagerSource = new ExecutorAllocationManagerSource
 
-  private lazy val sparkEnv = SparkEnv.get
+  private val sparkEnv = SparkEnv.get
 
-  private val executorEndpoints = new mutable.HashMap[String, RpcEndpointRef]()
   /**
    * Verify that the settings specified through the config are valid.
    * If not, throw an appropriate exception.
@@ -399,17 +398,7 @@ private[spark] class ExecutorAllocationManager(
       // however, we are no longer at the lower bound, and so we must mark executor X
       // as idle again so as not to forget that it is a candidate for removal. (see SPARK-4951)
       executorIds.filter(listener.isExecutorIdle).foreach(onExecutorIdle)
-      val hostAndPort =
-        sparkEnv.blockManager.master.getRpcHostPortForExecutor(executorId)
-      hostAndPort match {
-        case Some((host, port)) =>
-          executorEndpoints(executorId) =
-            sparkEnv.rpcEnv.setupEndpointRef(
-              SparkEnv.executorActorSystemName,
-              RpcAddress(host, port),
-              ExecutorEndpoint.EXECUTOR_ENDPOINT_NAME)
-        case None =>
-      }
+
       logInfo(s"New executor $executorId has registered (new total is ${executorIds.size})")
     } else {
       logWarning(s"Duplicate executor $executorId has registered")
@@ -467,11 +456,7 @@ private[spark] class ExecutorAllocationManager(
     if (executorIds.contains(executorId)) {
       if (!removeTimes.contains(executorId) && !executorsPendingToRemove.contains(executorId)) {
 
-        val hasCachedBlocks =
-          executorsWithCachedBlocks.contains(executorId) ||
-          executorEndpoints.get(executorId).exists(_.askWithRetry[Boolean](HasCachedBlocks))
-
-        if (hasCachedBlocks) executorsWithCachedBlocks += executorId
+        val hasCachedBlocks = sparkEnv.blockManager.master.
 
         val now = clock.getTimeMillis()
         val timeout = {
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
index abcad9438bf2..12ea117477ce 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
@@ -202,6 +202,14 @@ class BlockManagerMaster(
     Await.result(future, timeout)
   }
 
+  /**
+   * Find out if the executor has cached blocks. This method checks only if the executor holds
+   * cached blocks, which are not broadcast blocks.
+   */
+  def hasCachedBlocks(executorId: String): Boolean = {
+    driverEndpoint.askWithRetry[Boolean](hasCachedBlocks(executorId))
+  }
+
   /** Stop the driver endpoint, called only on the Spark driver node */
   def stop() {
     if (driverEndpoint != null && isDriver) {
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
index 3afb4c3c02e2..93be3fa1f80e 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
@@ -48,6 +48,8 @@ class BlockManagerMasterEndpoint(
   // Mapping from executor ID to block manager ID.
   private val blockManagerIdByExecutor = new mutable.HashMap[String, BlockManagerId]
 
+  private val cachedBlocksByExecutor = new mutable.HashMap[String, mutable.Set[BlockId]]
+
   // Mapping from block id to the set of block managers that have the block.
   private val blockLocations = new JHashMap[BlockId, mutable.HashSet[BlockManagerId]]
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
index 2a00f7bef695..376e9eb48843 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
@@ -42,8 +42,6 @@ private[spark] object BlockManagerMessages {
   case class RemoveBroadcast(broadcastId: Long, removeFromDriver: Boolean = true)
     extends ToBlockManagerSlave
 
-  case object HasCachedBlocks extends ToBlockManagerSlave
-
   //////////////////////////////////////////////////////////////////////////////////
   // Messages from slaves to the master.
   //////////////////////////////////////////////////////////////////////////////////
@@ -109,4 +107,6 @@ private[spark] object BlockManagerMessages {
     extends ToBlockManagerMaster
 
   case class BlockManagerHeartbeat(blockManagerId: BlockManagerId) extends ToBlockManagerMaster
+
+  case class HasCachedBlocks(executorId: String) extends ToBlockManagerMaster
 }
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveEndpoint.scala
index b50cc679c006..dc8f376dcf1c 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveEndpoint.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveEndpoint.scala
@@ -71,9 +71,6 @@ class BlockManagerSlaveEndpoint(
 
     case GetMatchingBlockIds(filter, _) =>
       context.reply(blockManager.getMatchingBlockIds(filter))
-
-    case HasCachedBlocks =>
-      context.reply(blockManager.hasCachedBlocks)
-
   }
 
   private def doAsync[T](actionMessage: String, context: RpcCallContext)(body: => T) {
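Patch 05 changes who answers the question: HasCachedBlocks stops being a slave message answered by one RPC per executor and becomes a master message, carrying the executor id and answered from state the driver already has. A simplified model of the new shape; the trait and case class mirror BlockManagerMessages, while the map stands in for the master endpoint's bookkeeping:

    object MasterQuerySketch {
      sealed trait ToBlockManagerMaster
      case class HasCachedBlocks(executorId: String) extends ToBlockManagerMaster

      // Driver-side view of which cached blocks each executor reported.
      val cachedBlocksByExecutor: Map[String, Set[String]] =
        Map("exec-1" -> Set("rdd_0_1", "rdd_0_2"))

      def handle(msg: ToBlockManagerMaster): Boolean = msg match {
        case HasCachedBlocks(executorId) =>
          cachedBlocksByExecutor.get(executorId).exists(_.nonEmpty)
      }

      def main(args: Array[String]): Unit = {
        println(handle(HasCachedBlocks("exec-1"))) // true
        println(handle(HasCachedBlocks("exec-9"))) // false: unknown executor
      }
    }

From 5d10fad34511d7235ca6ce053991fe34bd06732d Mon Sep 17 00:00:00 2001
From: Hari Shreedharan
Date: Thu, 4 Jun 2015 00:15:13 -0700
Subject: [PATCH 06/14] Update cached blocks status using local info, rather
 than doing an RPC.
---
 .../storage/BlockManagerMasterEndpoint.scala | 27 ++++++++++++++-----
 1 file changed, 21 insertions(+), 6 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
index 93be3fa1f80e..160105d9f01c 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
@@ -48,8 +48,6 @@ class BlockManagerMasterEndpoint(
   // Mapping from executor ID to block manager ID.
   private val blockManagerIdByExecutor = new mutable.HashMap[String, BlockManagerId]
 
-  private val cachedBlocksByExecutor = new mutable.HashMap[String, mutable.Set[BlockId]]
-
   // Mapping from block id to the set of block managers that have the block.
   private val blockLocations = new JHashMap[BlockId, mutable.HashSet[BlockManagerId]]
@@ -114,6 +112,12 @@ class BlockManagerMasterEndpoint(
     case BlockManagerHeartbeat(blockManagerId) =>
       context.reply(heartbeatReceived(blockManagerId))
 
+    case HasCachedBlocks(executorId) =>
+      val blockManager = blockManagerIdByExecutor.get(executorId)
+      blockManager.foreach { bm =>
+        blockManagerInfo.get(bm).foreach { info => context.reply(info.cachedBlocks.nonEmpty) }
+      }
+
   }
 
   private def removeRdd(rddId: Int): Future[Seq[Int]] = {
@@ -303,7 +307,7 @@ class BlockManagerMasterEndpoint(
         id.hostPort, Utils.bytesToString(maxMemSize), id))
 
     blockManagerIdByExecutor(id.executorId) = id
-
+
     blockManagerInfo(id) = new BlockManagerInfo(
       id, System.currentTimeMillis(), maxMemSize, slaveEndpoint)
   }
@@ -420,6 +424,8 @@ private[spark] class BlockManagerInfo(
   // Mapping from block id to its status.
   private val _blocks = new JHashMap[BlockId, BlockStatus]
 
+  private val _cachedBlocks = new mutable.HashSet[BlockId]
+
   def getStatus(blockId: BlockId): Option[BlockStatus] = Option(_blocks.get(blockId))
 
   def updateLastSeenMs() {
@@ -453,23 +459,28 @@ private[spark] class BlockManagerInfo(
      * and the diskSize here indicates the data size in or dropped to disk.
      * They can be both larger than 0, when a block is dropped from memory to disk.
      * Therefore, a safe way to set BlockStatus is to set its info in accurate modes.
      */
+      var blockStatus: BlockStatus = null
       if (storageLevel.useMemory) {
-        _blocks.put(blockId, BlockStatus(storageLevel, memSize, 0, 0))
+        blockStatus = BlockStatus(storageLevel, memSize, 0, 0)
+        _blocks.put(blockId, blockStatus)
         _remainingMem -= memSize
         logInfo("Added %s in memory on %s (size: %s, free: %s)".format(
           blockId, blockManagerId.hostPort, Utils.bytesToString(memSize),
           Utils.bytesToString(_remainingMem)))
       }
       if (storageLevel.useDisk) {
-        _blocks.put(blockId, BlockStatus(storageLevel, 0, diskSize, 0))
+        blockStatus = BlockStatus(storageLevel, 0, diskSize, 0)
+        _blocks.put(blockId, blockStatus)
         logInfo("Added %s on disk on %s (size: %s)".format(
           blockId, blockManagerId.hostPort, Utils.bytesToString(diskSize)))
       }
       if (storageLevel.useOffHeap) {
-        _blocks.put(blockId, BlockStatus(storageLevel, 0, 0, externalBlockStoreSize))
+        blockStatus = BlockStatus(storageLevel, 0, 0, externalBlockStoreSize)
+        _blocks.put(blockId, blockStatus)
         logInfo("Added %s on ExternalBlockStore on %s (size: %s)".format(
           blockId, blockManagerId.hostPort, Utils.bytesToString(externalBlockStoreSize)))
       }
+      if (!blockId.isBroadcast && blockStatus.isCached) _cachedBlocks += blockId
     } else if (_blocks.containsKey(blockId)) {
       // If isValid is not true, drop the block.
       val blockStatus: BlockStatus = _blocks.get(blockId)
@@ -488,6 +499,7 @@ private[spark] class BlockManagerInfo(
           blockId, blockManagerId.hostPort,
           Utils.bytesToString(blockStatus.externalBlockStoreSize)))
       }
+      if (!blockId.isBroadcast && blockStatus.isCached) _cachedBlocks += blockId
     }
   }
 
@@ -496,6 +508,7 @@ private[spark] class BlockManagerInfo(
       _remainingMem += _blocks.get(blockId).memSize
       _blocks.remove(blockId)
     }
+    _cachedBlocks -= blockId
   }
 
   def remainingMem: Long = _remainingMem
@@ -504,6 +517,8 @@ private[spark] class BlockManagerInfo(
 
   def blocks: JHashMap[BlockId, BlockStatus] = _blocks
 
+  def cachedBlocks: mutable.HashSet[BlockId] = _cachedBlocks
+
   override def toString: String = "BlockManagerInfo " + timeMs + " " + _remainingMem
 
   def clear() {

Patch 06's incremental bookkeeping can be shown on its own: as block status updates arrive, the master maintains a per-block-manager set of cached, non-broadcast block ids, so answering HasCachedBlocks is a constant-time nonEmpty check. In this sketch, BlockId and BlockStatus are reduced to the fields this logic touches, and the add and drop paths are collapsed into a single update method for brevity:

    import scala.collection.mutable

    object CachedBlockBookkeeping {
      case class BlockId(name: String) {
        def isBroadcast: Boolean = name.startsWith("broadcast_")
      }
      case class BlockStatus(memSize: Long, diskSize: Long) {
        def isCached: Boolean = memSize + diskSize > 0
      }

      private val _cachedBlocks = new mutable.HashSet[BlockId]

      def updateBlockInfo(blockId: BlockId, status: BlockStatus): Unit =
        if (!blockId.isBroadcast && status.isCached) _cachedBlocks += blockId
        else _cachedBlocks -= blockId

      def hasCachedBlocks: Boolean = _cachedBlocks.nonEmpty

      def main(args: Array[String]): Unit = {
        updateBlockInfo(BlockId("rdd_0_0"), BlockStatus(1024, 0))
        updateBlockInfo(BlockId("broadcast_0"), BlockStatus(512, 0)) // never tracked
        println(hasCachedBlocks)                                     // true
        updateBlockInfo(BlockId("rdd_0_0"), BlockStatus(0, 0))       // block dropped
        println(hasCachedBlocks)                                     // false
      }
    }

Note the drop path: patch 06 itself only adds to _cachedBlocks when a block is removed from the map entirely; removal when a block is dropped but its entry updated arrives in patches 12 and 13.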
From ec2fd7e771dc09dca51ff98ebc0f44c2c108c374 Mon Sep 17 00:00:00 2001
From: Hari Shreedharan
Date: Thu, 4 Jun 2015 00:33:05 -0700
Subject: [PATCH 07/14] Add file missed in last commit
---
 .../main/scala/org/apache/spark/ExecutorAllocationManager.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index bc55ba58ae1e..9490768d0399 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -456,7 +456,7 @@ private[spark] class ExecutorAllocationManager(
     if (executorIds.contains(executorId)) {
       if (!removeTimes.contains(executorId) && !executorsPendingToRemove.contains(executorId)) {
 
-        val hasCachedBlocks = sparkEnv.blockManager.master.
+        val hasCachedBlocks = sparkEnv.blockManager.master.hasCachedBlocks(executorId)
 
         val now = clock.getTimeMillis()
         val timeout = {
From 063985c4e35c95759a0c970bf4c2b3bfb3b680e6 Mon Sep 17 00:00:00 2001
From: Hari Shreedharan
Date: Thu, 4 Jun 2015 12:01:23 -0700
Subject: [PATCH 08/14] Send correct message instead of recursively calling
 same method.
---
 .../scala/org/apache/spark/storage/BlockManagerMaster.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
index 12ea117477ce..9aacbb3a8f04 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
@@ -207,7 +207,7 @@ class BlockManagerMaster(
    * cached blocks, which are not broadcast blocks.
    */
   def hasCachedBlocks(executorId: String): Boolean = {
-    driverEndpoint.askWithRetry[Boolean](hasCachedBlocks(executorId))
+    driverEndpoint.askWithRetry[Boolean](HasCachedBlocks(executorId))
  }
 
   /** Stop the driver endpoint, called only on the Spark driver node */
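The bug patch 08 fixes is worth spelling out because the broken version compiled: inside BlockManagerMaster, the lowercase name hasCachedBlocks(executorId) resolved to the method itself rather than to the HasCachedBlocks message, so evaluating the argument recursed forever instead of building one RPC payload. A reduced reproduction, with ask standing in for driverEndpoint.askWithRetry:

    object RecursionBug {
      case class HasCachedBlocks(executorId: String)
      def ask(msg: Any): Boolean = true

      // Broken: producing ask's argument calls the method itself, recursing
      // until a StackOverflowError; no message is ever sent.
      def hasCachedBlocksBroken(executorId: String): Boolean =
        ask(hasCachedBlocksBroken(executorId))

      // Fixed: construct the message case class and send it once.
      def hasCachedBlocksFixed(executorId: String): Boolean =
        ask(HasCachedBlocks(executorId))

      def main(args: Array[String]): Unit =
        println(hasCachedBlocksFixed("exec-1")) // true
    }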
From 90ad7117e76ed1c71bb6dbe5fee6c63e925bcc31 Mon Sep 17 00:00:00 2001
From: Hari Shreedharan
Date: Thu, 4 Jun 2015 12:12:21 -0700
Subject: [PATCH 09/14] Remove unused imports and unused methods.
---
 .../apache/spark/ExecutorAllocationManager.scala | 16 +++++-----------
 .../org/apache/spark/storage/BlockManager.scala  |  6 ------
 .../storage/BlockManagerSlaveEndpoint.scala      |  1 -
 3 files changed, 5 insertions(+), 18 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index 9490768d0399..daf0f81725e3 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -23,11 +23,8 @@ import scala.collection.mutable
 
 import com.codahale.metrics.{Gauge, MetricRegistry}
 
-import org.apache.spark.executor.ExecutorEndpoint
-import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef}
 import org.apache.spark.scheduler._
 import org.apache.spark.metrics.source.Source
-import org.apache.spark.storage.BlockManagerMessages.HasCachedBlocks
 import org.apache.spark.util.{ThreadUtils, Clock, SystemClock, Utils}
 
 /**
@@ -107,8 +104,6 @@ private[spark] class ExecutorAllocationManager(
   private val cachedExecutorTimeoutS = conf.getTimeAsSeconds(
     "spark.dynamicAllocation.cachedExecutorIdleTimeout", s"${Long.MaxValue / 1000}s")
 
-  val executorsWithCachedBlocks = new mutable.HashSet[String]()
-
   // During testing, the methods to actually kill and add executors are mocked out
   private val testing = conf.getBoolean("spark.dynamicAllocation.testing", false)
@@ -418,7 +413,6 @@ private[spark] class ExecutorAllocationManager(
         logDebug(s"Executor $executorId is no longer pending to " +
           s"be removed (${executorsPendingToRemove.size} left)")
       }
-// executorEndpoints -= executorId
     } else {
       logWarning(s"Unknown executor $executorId has been removed!")
     }
@@ -461,17 +455,17 @@ private[spark] class ExecutorAllocationManager(
         val now = clock.getTimeMillis()
         val timeout = {
           if (hasCachedBlocks) {
-            val newExpiry = now + cachedExecutorTimeoutS * 1000
-            if (newExpiry < 0) Long.MaxValue // Overflow
-            else newExpiry
+            now + cachedExecutorTimeoutS * 1000
           } else {
             now + executorIdleTimeoutS * 1000
           }
         }
-        removeTimes(executorId) = timeout
+        val realTimeout = if (timeout <= 0) Long.MaxValue else timeout // overflow
+        removeTimes(executorId) = realTimeout
+
         logDebug(s"Starting idle timer for $executorId because there are no more tasks " +
-          s"scheduled to run on the executor")
+          s"scheduled to run on the executor (to expire in ${(realTimeout - now)/1000} seconds)")
       }
     } else {
       logWarning(s"Attempted to mark unknown executor $executorId idle")
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index e3dc6f41fa96..5048c7dab240 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -1241,12 +1241,6 @@ private[spark] class BlockManager(
     futureExecutionContext.shutdownNow()
     logInfo("BlockManager stopped")
   }
-
-  private[storage] def hasCachedBlocks: Boolean = {
-    blockInfo.exists { case (blockId, info) =>
-      !blockId.isBroadcast && getCurrentBlockStatus(blockId, info).isCached
-    }
-  }
 }
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveEndpoint.scala
index dc8f376dcf1c..543df4e1350d 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveEndpoint.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveEndpoint.scala
@@ -70,7 +70,6 @@ class BlockManagerSlaveEndpoint(
 
     case GetMatchingBlockIds(filter, _) =>
      context.reply(blockManager.getMatchingBlockIds(filter))
-
   }
 
   private def doAsync[T](actionMessage: String, context: RpcCallContext)(body: => T) {

From 39940cad66b703ae571db23a3c88b34371de1138 Mon Sep 17 00:00:00 2001
From: Hari Shreedharan
Date: Thu, 4 Jun 2015 14:52:33 -0700
Subject: [PATCH 10/14] Handle the case where the executor has not yet
 registered.
---
 .../spark/storage/BlockManagerMasterEndpoint.scala | 10 ++++++----
 1 file changed, 6 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
index 160105d9f01c..3fc2947ca5ef 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
@@ -113,11 +113,13 @@ class BlockManagerMasterEndpoint(
       context.reply(heartbeatReceived(blockManagerId))
 
     case HasCachedBlocks(executorId) =>
-      val blockManager = blockManagerIdByExecutor.get(executorId)
-      blockManager.foreach { bm =>
-        blockManagerInfo.get(bm).foreach { info => context.reply(info.cachedBlocks.nonEmpty) }
+      blockManagerIdByExecutor.get(executorId) match {
+        case Some(bm) =>
+          blockManagerInfo.get(bm).map {
+            info => context.reply(info.cachedBlocks.nonEmpty)
+          }.getOrElse(context.reply(false))
+        case None => context.reply(false)
       }
-
   }
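Patch 10 closes a subtle RPC hole left by patch 06: a handler that only calls context.reply inside a foreach sends nothing when the executor has not yet registered, so the caller's askWithRetry simply times out. The fix is exhaustive matching with an explicit reply in every branch, sketched here with reply standing in for RpcCallContext.reply and two illustrative lookup maps:

    object ReplyInAllBranches {
      val blockManagerIdByExecutor = Map("exec-1" -> "BM-1")
      val cachedBlockCounts = Map("BM-1" -> 2)

      def reply(value: Boolean): Unit = println(s"reply($value)")

      def handle(executorId: String): Unit =
        blockManagerIdByExecutor.get(executorId) match {
          case Some(bm) =>
            cachedBlockCounts.get(bm) match {
              case Some(n) => reply(n > 0)
              case None    => reply(false) // id known, but no info registered yet
            }
          case None => reply(false) // unknown executor: reply instead of going silent
        }

      def main(args: Array[String]): Unit = {
        handle("exec-1") // reply(true)
        handle("exec-9") // reply(false) rather than a caller-side timeout
      }
    }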
From 875916aaaaff3fe38e364ab7cad5703827786e4c Mon Sep 17 00:00:00 2001
From: Hari Shreedharan
Date: Thu, 4 Jun 2015 17:35:47 -0700
Subject: [PATCH 11/14] Make some code more readable.
---
 .../spark/storage/BlockManagerMasterEndpoint.scala | 9 ++++++---
 1 file changed, 6 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
index 3fc2947ca5ef..92853ef55e9e 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
@@ -115,9 +115,12 @@ class BlockManagerMasterEndpoint(
     case HasCachedBlocks(executorId) =>
       blockManagerIdByExecutor.get(executorId) match {
         case Some(bm) =>
-          blockManagerInfo.get(bm).map {
-            info => context.reply(info.cachedBlocks.nonEmpty)
-          }.getOrElse(context.reply(false))
+          if (blockManagerInfo.contains(bm)) {
+            val bmInfo = blockManagerInfo(bm)
+            context.reply(bmInfo.cachedBlocks.nonEmpty)
+          } else {
+            context.reply(false)
+          }
         case None => context.reply(false)
       }
   }
From 5417b53d9bf2a3c7cc946f92ad03e3e5c148973d Mon Sep 17 00:00:00 2001
From: Hari Shreedharan
Date: Fri, 5 Jun 2015 13:46:33 -0700
Subject: [PATCH 12/14] Add documentation for new config. Remove block from
 cachedBlocks when it is dropped.
---
 .../spark/ExecutorAllocationManager.scala     | 19 ++++++-----------
 .../spark/storage/BlockManagerMaster.scala    |  4 ++--
 .../storage/BlockManagerMasterEndpoint.scala  | 11 ++++++---
 docs/configuration.md                         | 10 ++++++++++
 4 files changed, 28 insertions(+), 16 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index daf0f81725e3..d038ab930b2d 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -101,8 +101,8 @@ private[spark] class ExecutorAllocationManager(
   private val executorIdleTimeoutS = conf.getTimeAsSeconds(
     "spark.dynamicAllocation.executorIdleTimeout", "60s")
 
-  private val cachedExecutorTimeoutS = conf.getTimeAsSeconds(
-    "spark.dynamicAllocation.cachedExecutorIdleTimeout", s"${Long.MaxValue / 1000}s")
+  private val cachedExecutorIdleTimeoutS = conf.getTimeAsSeconds(
+    "spark.dynamicAllocation.cachedExecutorIdleTimeout", s"${2 * executorIdleTimeoutS}s")
 
   // During testing, the methods to actually kill and add executors are mocked out
   private val testing = conf.getBoolean("spark.dynamicAllocation.testing", false)
@@ -153,8 +153,6 @@ private[spark] class ExecutorAllocationManager(
   // Metric source for ExecutorAllocationManager to expose internal status to MetricsSystem.
   val executorAllocationManagerSource = new ExecutorAllocationManagerSource
 
-  private val sparkEnv = SparkEnv.get
-
   /**
    * Verify that the settings specified through the config are valid.
    * If not, throw an appropriate exception.
@@ -393,7 +391,6 @@ private[spark] class ExecutorAllocationManager(
       // however, we are no longer at the lower bound, and so we must mark executor X
       // as idle again so as not to forget that it is a candidate for removal. (see SPARK-4951)
       executorIds.filter(listener.isExecutorIdle).foreach(onExecutorIdle)
-
       logInfo(s"New executor $executorId has registered (new total is ${executorIds.size})")
     } else {
       logWarning(s"Duplicate executor $executorId has registered")
@@ -449,21 +446,21 @@ private[spark] class ExecutorAllocationManager(
   private def onExecutorIdle(executorId: String): Unit = synchronized {
     if (executorIds.contains(executorId)) {
       if (!removeTimes.contains(executorId) && !executorsPendingToRemove.contains(executorId)) {
-
-        val hasCachedBlocks = sparkEnv.blockManager.master.hasCachedBlocks(executorId)
-
+        // Note that it is not necessary to query the executors since all the cached
+        // blocks we are concerned with are reported to the driver. Note that this
+        // does not include broadcast blocks.
+        val hasCachedBlocks = SparkEnv.get.blockManager.master.hasCachedBlocks(executorId)
         val now = clock.getTimeMillis()
         val timeout = {
           if (hasCachedBlocks) {
-            now + cachedExecutorTimeoutS * 1000
+            // Use a different timeout if the executor has cached blocks.
+            now + cachedExecutorIdleTimeoutS * 1000
           } else {
            now + executorIdleTimeoutS * 1000
           }
         }
         val realTimeout = if (timeout <= 0) Long.MaxValue else timeout // overflow
         removeTimes(executorId) = realTimeout
-
         logDebug(s"Starting idle timer for $executorId because there are no more tasks " +
           s"scheduled to run on the executor (to expire in ${(realTimeout - now)/1000} seconds)")
       }
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
index 9aacbb3a8f04..7cdae22b0e25 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
@@ -203,8 +203,8 @@ class BlockManagerMaster(
   }
 
   /**
-   * Find out if the executor has cached blocks. This method checks only if the executor holds
-   * cached blocks, which are not broadcast blocks.
+   * Find out if the executor has cached blocks. This method does not consider broadcast blocks,
+   * since they are not reported to the master.
    */
   def hasCachedBlocks(executorId: String): Boolean = {
     driverEndpoint.askWithRetry[Boolean](HasCachedBlocks(executorId))
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
index 92853ef55e9e..8a8b9c8cd439 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
@@ -19,6 +19,7 @@ package org.apache.spark.storage
 
 import java.util.{HashMap => JHashMap}
 
+import scala.collection.immutable.HashSet
 import scala.collection.mutable
 import scala.collection.JavaConversions._
 import scala.concurrent.{ExecutionContext, Future}
@@ -429,6 +430,7 @@ private[spark] class BlockManagerInfo(
   // Mapping from block id to its status.
   private val _blocks = new JHashMap[BlockId, BlockStatus]
 
+  // Cached blocks held by this BlockManager. This does not include broadcast blocks.
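With patch 12 the pair of settings becomes user-facing. Setting them would look like the following; the values are illustrative, and a spark-core dependency is assumed on the classpath. Left unset, cachedExecutorIdleTimeout defaults to twice executorIdleTimeout:

    import org.apache.spark.SparkConf

    object ConfExample {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
          .set("spark.dynamicAllocation.enabled", "true")
          .set("spark.dynamicAllocation.executorIdleTimeout", "60s")
          .set("spark.dynamicAllocation.cachedExecutorIdleTimeout", "30min")
        println(conf.get("spark.dynamicAllocation.cachedExecutorIdleTimeout")) // 30min
      }
    }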
   private val _cachedBlocks = new mutable.HashSet[BlockId]
 
   def getStatus(blockId: BlockId): Option[BlockStatus] = Option(_blocks.get(blockId))
@@ -485,11 +487,14 @@ private[spark] class BlockManagerInfo(
         logInfo("Added %s on ExternalBlockStore on %s (size: %s)".format(
           blockId, blockManagerId.hostPort, Utils.bytesToString(externalBlockStoreSize)))
       }
-      if (!blockId.isBroadcast && blockStatus.isCached) _cachedBlocks += blockId
+      if (!blockId.isBroadcast && blockStatus.isCached) {
+        _cachedBlocks += blockId
+      }
     } else if (_blocks.containsKey(blockId)) {
       // If isValid is not true, drop the block.
       val blockStatus: BlockStatus = _blocks.get(blockId)
       _blocks.remove(blockId)
+      cachedBlocks -= blockId
       if (blockStatus.storageLevel.useMemory) {
         logInfo("Removed %s on %s in memory (size: %s, free: %s)".format(
           blockId, blockManagerId.hostPort, Utils.bytesToString(blockStatus.memSize),
@@ -504,7 +509,6 @@ private[spark] class BlockManagerInfo(
           blockId, blockManagerId.hostPort,
           Utils.bytesToString(blockStatus.externalBlockStoreSize)))
       }
-      if (!blockId.isBroadcast && blockStatus.isCached) _cachedBlocks += blockId
     }
   }
 
@@ -522,7 +526,8 @@ private[spark] class BlockManagerInfo(
 
   def blocks: JHashMap[BlockId, BlockStatus] = _blocks
 
-  def cachedBlocks: mutable.HashSet[BlockId] = _cachedBlocks
+  // This does not include broadcast blocks.
+  def cachedBlocks: collection.Set[BlockId] = _cachedBlocks
 
   override def toString: String = "BlockManagerInfo " + timeMs + " " + _remainingMem
diff --git a/docs/configuration.md b/docs/configuration.md
index 30508a617fdd..8662db315cf2 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -1201,6 +1201,16 @@ Apart from these, the following properties are also available, and may be useful
     <a href="job-scheduling.html#resource-allocation-policy">description</a>.
   </td>
 </tr>
+<tr>
+  <td><code>spark.dynamicAllocation.cachedExecutorIdleTimeout</code></td>
+  <td>120s</td>
+  <td>
+    If dynamic allocation is enabled and an executor which has cached data blocks has been idle for more than this duration,
+    the executor will be removed. If this parameter is not specified, it defaults to twice the value specified by
+    <code>spark.dynamicAllocation.executorIdleTimeout</code>. For more details, see this
+    <a href="job-scheduling.html#resource-allocation-policy">description</a>.
+  </td>
+</tr>
 <tr>
   <td><code>spark.dynamicAllocation.initialExecutors</code></td>
   <td><code>spark.dynamicAllocation.minExecutors</code></td>
From 10130e238160ac2d9a9e9f27d273cb9736134fff Mon Sep 17 00:00:00 2001
From: Hari Shreedharan
Date: Fri, 5 Jun 2015 13:58:29 -0700
Subject: [PATCH 13/14] Fix compile issue.
---
 .../org/apache/spark/storage/BlockManagerMasterEndpoint.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
index 8a8b9c8cd439..d5f08d01e755 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
@@ -494,7 +494,7 @@ private[spark] class BlockManagerInfo(
       // If isValid is not true, drop the block.
       val blockStatus: BlockStatus = _blocks.get(blockId)
       _blocks.remove(blockId)
-      cachedBlocks -= blockId
+      _cachedBlocks -= blockId
       if (blockStatus.storageLevel.useMemory) {
         logInfo("Removed %s on %s in memory (size: %s, free: %s)".format(
           blockId, blockManagerId.hostPort, Utils.bytesToString(blockStatus.memSize),

From dddf1eb1cd94f3db83594148d3de65896bd27aa6 Mon Sep 17 00:00:00 2001
From: Hari Shreedharan
Date: Fri, 5 Jun 2015 15:02:17 -0700
Subject: [PATCH 14/14] Minor configuration description update.
---
 docs/configuration.md | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)

diff --git a/docs/configuration.md b/docs/configuration.md
index 8662db315cf2..9fb134ff8189 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -1203,11 +1203,10 @@ Apart from these, the following properties are also available, and may be useful
 <tr>
   <td><code>spark.dynamicAllocation.cachedExecutorIdleTimeout</code></td>
-  <td>120s</td>
+  <td>2 * executorIdleTimeout</td>
   <td>
     If dynamic allocation is enabled and an executor which has cached data blocks has been idle for more than this duration,
-    the executor will be removed. If this parameter is not specified, it defaults to twice the value specified by
-    <code>spark.dynamicAllocation.executorIdleTimeout</code>. For more details, see this
+    the executor will be removed. For more details, see this
     <a href="job-scheduling.html#resource-allocation-policy">description</a>.
   </td>
 </tr>
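Taken together, the series leaves onExecutorIdle with a single driver-side decision, which can be summarized as a pure function; hasCachedBlocks stands in for blockManager.master.hasCachedBlocks(executorId), and the timeouts use the documented defaults:

    object FinalIdlePolicy {
      val executorIdleTimeoutS = 60L
      val cachedExecutorIdleTimeoutS = 2 * executorIdleTimeoutS // documented default

      def removalTime(now: Long, hasCachedBlocks: Boolean): Long = {
        val timeout =
          if (hasCachedBlocks) now + cachedExecutorIdleTimeoutS * 1000
          else now + executorIdleTimeoutS * 1000
        if (timeout <= 0) Long.MaxValue else timeout // overflow clamp
      }

      def main(args: Array[String]): Unit = {
        val now = 1700000000000L
        println(removalTime(now, hasCachedBlocks = false) - now) // 60000 ms
        println(removalTime(now, hasCachedBlocks = true) - now)  // 120000 ms
      }
    }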