Skip to content

Commit

Permalink
adding bounded cache to executor side
Browse files Browse the repository at this point in the history
  • Loading branch information
attilapiros committed Oct 4, 2019
1 parent e64ceb9 commit 54542f9
Show file tree
Hide file tree
Showing 7 changed files with 86 additions and 22 deletions.
7 changes: 6 additions & 1 deletion core/src/main/scala/org/apache/spark/SparkContext.scala
Expand Up @@ -2849,7 +2849,12 @@ object SparkContext extends Logging {
memoryPerSlaveInt, sc.executorMemory))
}

// for local cluster mode the SHUFFLE_HOST_LOCAL_DISK_READING_ENABLED defaults to false
// For local-cluster mode the default of SHUFFLE_HOST_LOCAL_DISK_READING_ENABLED is set to
// false, because this mode is intended to be used for testing and in this case all the
// executors are running on the same host. If host-local reading was enabled here, then
// testing of the remote fetching would become secondary, as setting this config explicitly to
// false would be required in most of the unit tests (despite the fact that remote fetching
// is much more frequent in production).
sc.conf.setIfMissing(config.SHUFFLE_HOST_LOCAL_DISK_READING_ENABLED, false)

val scheduler = new TaskSchedulerImpl(sc)
Expand Down
Expand Up @@ -1036,9 +1036,11 @@ package object config {

private[spark] val STORAGE_LOCAL_DISK_BY_EXECUTORS_CACHE_SIZE =
ConfigBuilder("spark.storage.localDiskByExecutors.cacheSize")
.doc("The maximum size of the cache of the local dirs for the executors. This cache will " +
"be used to avoid the network in case of fetching disk persisted RDD blocks or shuffle " +
"blocks (when `spark.shuffle.readHostLocalDisk.enabled` is set) from the same host.")
.doc("The max number of executors for which the local dirs are stored. This size is " +
"both applied for the driver and both for the executors side to avoid having an " +
"unbounded store. This cache will be used to avoid the network in case of fetching disk " +
"persisted RDD blocks or shuffle blocks (when `spark.shuffle.readHostLocalDisk.enabled` " +
"is set) from the same host.")
.intConf
.createWithDefault(1000)

Expand Down
20 changes: 13 additions & 7 deletions core/src/main/scala/org/apache/spark/storage/BlockManager.scala
Expand Up @@ -33,6 +33,7 @@ import scala.util.Random
import scala.util.control.NonFatal

import com.codahale.metrics.{MetricRegistry, MetricSet}
import com.google.common.cache.CacheBuilder
import org.apache.commons.io.IOUtils

import org.apache.spark._
Expand Down Expand Up @@ -206,7 +207,11 @@ private[spark] class BlockManager(
new BlockManager.RemoteBlockDownloadFileManager(this)
private val maxRemoteBlockToMem = conf.get(config.MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM)

private val executorIdToLocalDirsCache = new mutable.HashMap[String, Array[String]]()
// Cache mapping an executor ID to the local directories of that executor. The number of
// entries is capped by STORAGE_LOCAL_DISK_BY_EXECUTORS_CACHE_SIZE, so the cache cannot grow
// without bound.
private val executorIdToLocalDirsCache =
CacheBuilder
.newBuilder()
.maximumSize(conf.get(config.STORAGE_LOCAL_DISK_BY_EXECUTORS_CACHE_SIZE))
.build[String, Array[String]]()

/**
* Abstraction for storing blocks from bytes, whether they start in memory or on disk.
Expand Down Expand Up @@ -1003,15 +1008,16 @@ private[spark] class BlockManager(
}

private[spark] def getHostLocalDirs(executorIds: Array[String])
: scala.collection.Map[String, Array[String]] = {
val cachedItems = executorIdToLocalDirsCache.filterKeys(executorIds.contains(_))
: scala.collection.Map[String, Array[String]] = synchronized {
import scala.collection.JavaConverters._
val cachedItems = executorIdToLocalDirsCache.getAllPresent(executorIds.toIterable.asJava)
if (cachedItems.size < executorIds.length) {
val notCachedItems = master
.getHostLocalDirs(executorIds.filterNot(executorIdToLocalDirsCache.contains))
executorIdToLocalDirsCache ++= notCachedItems
cachedItems ++ notCachedItems
.getHostLocalDirs(executorIds.filterNot(cachedItems.containsKey))
executorIdToLocalDirsCache.putAll(notCachedItems.asJava)
notCachedItems ++ cachedItems.asScala
} else {
cachedItems
cachedItems.asScala
}
}

Expand Down
Expand Up @@ -87,13 +87,11 @@ final class ShuffleBlockFetcherIterator(
// Make remote requests at most maxBytesInFlight / 5 in length; the reason to keep them
// smaller than maxBytesInFlight is to allow multiple, parallel fetches from up to 5
// nodes, rather than blocking on reading output from one node.
val targetRemoteRequestSize = math.max(maxBytesInFlight / 5, 1L)
private val targetRemoteRequestSize = math.max(maxBytesInFlight / 5, 1L)

/**
* Total number of blocks to fetch. This should be equal to the total number of blocks
* in [[blocksByAddress]] because we already filter out zero-sized blocks in [[blocksByAddress]].
*
* This should equal localBlocks.size + remoteBlocks.size + hostLocalBlocks.size
*/
private[this] var numBlocksToFetch = 0

Expand Down Expand Up @@ -300,6 +298,8 @@ final class ShuffleBlockFetcherIterator(
localBlocks ++= blockInfos.map(info => (info._1, info._3))
localBlockBytes += blockInfos.map(_._2).sum
} else if (enableHostLocalDiskReading && address.host == blockManager.blockManagerId.host) {
// Because of STORAGE_LOCAL_DISK_BY_EXECUTORS_CACHE_SIZE it can happen that no local dir is
// found for some of the blocks; those blocks will then be fetched remotely.
checkBlockSizes(blockInfos)
hostLocalBlocksByExecutor += address -> blockInfos
hostLocalBlocks ++= blockInfos.map(_._1)
Expand All @@ -312,9 +312,9 @@ final class ShuffleBlockFetcherIterator(
}
val totalBytes = localBlockBytes + remoteBlockBytes
logInfo(s"Getting $numBlocksToFetch (${Utils.bytesToString(totalBytes)}) non-empty blocks " +
s"including ${localBlocks.size} (${Utils.bytesToString(localBlockBytes)}) local blocks and " +
s"${hostLocalBlocks.size} (${Utils.bytesToString(hostLocalBlockBytes)}) host-local blocks " +
s"and $numRemoteBlocks (${Utils.bytesToString(remoteBlockBytes)}) remote blocks")
s"including ${localBlocks.size} (${Utils.bytesToString(localBlockBytes)}) local and " +
s"${hostLocalBlocks.size} (${Utils.bytesToString(hostLocalBlockBytes)}) potentially " +
s"host-local and $numRemoteBlocks (${Utils.bytesToString(remoteBlockBytes)}) remote blocks")
collectedRemoteRequests
}

Expand Down Expand Up @@ -399,7 +399,7 @@ final class ShuffleBlockFetcherIterator(
* `ManagedBuffer`'s memory is allocated lazily when we create the input stream, so all we
* track in-memory are the ManagedBuffer references themselves.
*/
private[this] def fetchHostLocalBlocks() {
private[this] def fetchHostLocalBlocks(): Unit = {
logDebug(s"Start fetching host-local blocks: ${hostLocalBlocks.mkString(", ")}")
val hostLocalExecutorIds = hostLocalBlocksByExecutor.keySet.map(_.executorId)
val readsWithoutLocalDir = LinkedHashMap[BlockManagerId, Seq[(BlockId, Long, Int)]]()
Expand Down Expand Up @@ -431,10 +431,10 @@ final class ShuffleBlockFetcherIterator(

if (readsWithoutLocalDir.nonEmpty) {
val collectedRemoteRequests = new ArrayBuffer[FetchRequest]
readsWithoutLocalDir.foreach( { case (bmId, blockInfos) =>
readsWithoutLocalDir.foreach { case (bmId, blockInfos) =>
hostLocalBlocks --= blockInfos.map(_._1)
collectFetchRequests(bmId, blockInfos, collectedRemoteRequests)
})
}
logInfo(s"Add ${collectedRemoteRequests.size} new remote fetches as local dirs " +
"have not been cached for some executor")
fetchRequests ++= Utils.randomize(collectedRemoteRequests)
Expand Down Expand Up @@ -468,7 +468,6 @@ final class ShuffleBlockFetcherIterator(
fetchLocalBlocks()
logDebug(s"Got local blocks in ${Utils.getUsedTimeNs(startTimeNs)}")

// Get Host-local Blocks
if (hostLocalBlocks.nonEmpty) {
val hostLocalStartTimeNs = System.nanoTime()
fetchHostLocalBlocks()
Expand Down
Expand Up @@ -105,7 +105,6 @@ class ExternalShuffleServiceSuite extends ShuffleSuite with BeforeAndAfterAll wi
sc = new SparkContext("local-cluster[2,1,1024]", "test", confWithHostLocalRead)
sc.getConf.get(config.SHUFFLE_HOST_LOCAL_DISK_READING_ENABLED) should equal(true)
sc.env.blockManager.externalShuffleServiceEnabled should equal(true)
sc.env.blockManager.externalShuffleServiceEnabled should equal(true)
sc.env.blockManager.blockStoreClient.getClass should equal(classOf[ExternalBlockStoreClient])

// In a slow machine, one slave may register hundreds of milliseconds ahead of the other one.
Expand Down
Expand Up @@ -1690,6 +1690,58 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
assert(locs(blockIds(0)) == expectedLocs)
}

test("caching of local disk directories for host local executors") {
  // Three fake host-local executors, each paired with two local directories.
  val testExecutors = (0 to 2).map { index =>
    s"hostLocal$index" -> Array(s"hostLocal${index}_dir1", s"hostLocal${index}_dir2")
  }.toArray
  val mockBlockManagerMaster = mock(classOf[BlockManagerMaster])
  // Limit the local dirs cache to a single executor so that the eviction is exercised.
  val confWithLimitedLocalDiskCache = conf.clone
  confWithLimitedLocalDiskCache.set(config.STORAGE_LOCAL_DISK_BY_EXECUTORS_CACHE_SIZE, 1)
  val store = makeBlockManager(
    8000,
    "executor1",
    mockBlockManagerMaster,
    testConf = Some(confWithLimitedLocalDiskCache))

  // Stubs the master so that asking for exactly the given executors returns their local dirs.
  def prepareHostLocalMockCall(indices: Int*): Unit = {
    when(mockBlockManagerMaster.getHostLocalDirs(indices.map(testExecutors(_)._1).toArray))
      .thenReturn(indices.map(testExecutors(_)).toMap)
  }

  // Renders the directories by content; an Array's own toString only shows its identity hash.
  def describe(dirs: Option[Array[String]]): String =
    dirs.fold("<missing>")(_.mkString("[", ", ", "]"))

  // Compares the two maps key by key, as arrays have no structural equality.
  def assertSameContents(
      expected: scala.collection.Map[String, Array[String]],
      actual: scala.collection.Map[String, Array[String]]): Unit = {
    // The union of the key sets makes sure every key is checked exactly once.
    (expected.keySet ++ actual.keySet).foreach { key =>
      (expected.get(key), actual.get(key)) match {
        case (Some(expectedValue), Some(actualValue)) =>
          assert(expectedValue.sameElements(actualValue),
            s"actual is different from expected for '$key' key")
        case (expectedValue, actualValue) =>
          fail(s"actual is different from expected for '$key' key: " +
            s"${describe(expectedValue)} != ${describe(actualValue)}")
      }
    }
  }

  // Asks the store for the local dirs of the given executors and checks the whole answer.
  def assertAnswer(indices: Int*): Unit =
    assertSameContents(
      expected = indices.map(testExecutors(_)).toMap,
      actual = store.getHostLocalDirs(indices.map(testExecutors(_)._1).toArray))

  prepareHostLocalMockCall(0)
  assertAnswer(0)

  // getHostLocalDirs call is expected to be called with only hostLocal1 as a parameter as
  // hostLocal0 is already cached
  prepareHostLocalMockCall(1)
  assertAnswer(0, 1)

  // getHostLocalDirs is expected to be called with the hostLocal0 and hostLocal2 as parameters
  // as the cache size is 1 and hostLocal0 was removed when hostLocal1 was cached
  prepareHostLocalMockCall(0, 2)
  assertAnswer(0, 1, 2)
}

class MockBlockTransferService(val maxFailures: Int) extends BlockTransferService {
var numCalls = 0
var tempFileManager: DownloadFileManager = null
Expand Down
Expand Up @@ -217,6 +217,7 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT

assert(iterator.hasNext, s"iterator should have 1 element")
val (_, inputStream) = iterator.next()
verify(transfer, times(1)).fetchBlocks(any(), any(), any(), any(), any(), any())
verifyBufferRelease(mockBuf, inputStream)
assert(!iterator.hasNext, s"iterator should have only 1 element")
}
Expand Down

0 comments on commit 54542f9

Please sign in to comment.