Skip to content

Commit

Permalink
Used a (configurable) thread pool for shuffle migrations to allow use…
Browse files Browse the repository at this point in the history
…rs to limit the number of concurrent migrations. We still limit to one runnable per peer executor so we don't overwhelm any particular target even at large values of the thread pool.
  • Loading branch information
holdenk committed Jun 16, 2020
1 parent 3c904a1 commit 7da5130
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -428,6 +428,15 @@ package object config {
.booleanConf
.createWithDefault(false)

private[spark] val STORAGE_SHUFFLE_DECOMMISSION_MAX_THREADS =
ConfigBuilder("spark.storage.decommission.shuffleBlocks.maxThreads")
.doc("Maximum number of threads to use in migrating shuffle files.")
.version("3.1.0")
.intConf
.checkValue(_ > 0, "The maximum number of threads should be positive")
.createWithDefault(10)


private[spark] val STORAGE_RDD_DECOMMISSION_ENABLED =
ConfigBuilder("spark.storage.decommission.rddBlocks.enabled")
.doc("Whether to transfer RDD blocks during block manager decommissioning.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ private[storage] class BlockManagerDecommissioner(
@volatile private var stopped = false

private val migrationPeers =
mutable.HashMap[BlockManagerId, (ShuffleMigrationRunnable, ExecutorService)]()
mutable.HashMap[BlockManagerId, ShuffleMigrationRunnable]()

private lazy val blockMigrationExecutor =
ThreadUtils.newDaemonSingleThreadExecutor("block-manager-decommission")
Expand Down Expand Up @@ -163,6 +163,9 @@ private[storage] class BlockManagerDecommissioner(
}
}

lazy val shuffleMigrationPool = ThreadUtils.newDaemonCachedThreadPool(
"migrate-shuffles",
conf.get(config.STORAGE_SHUFFLE_DECOMMISSION_MAX_THREADS))
/**
* Tries to offload all shuffle blocks that are registered with the shuffle service locally.
* Note: this does not delete the shuffle files in-case there is an in-progress fetch
Expand All @@ -186,14 +189,13 @@ private[storage] class BlockManagerDecommissioner(
val newPeers = livePeerSet.&~(currentPeerSet)
migrationPeers ++= newPeers.map { peer =>
logDebug(s"Starting thread to migrate shuffle blocks to ${peer}")
val executor = ThreadUtils.newDaemonSingleThreadExecutor(s"migrate-shuffle-to-${peer}")
val runnable = new ShuffleMigrationRunnable(peer)
executor.submit(runnable)
(peer, (runnable, executor))
shuffleMigrationPool.submit(runnable)
(peer, runnable)
}
// A peer may have entered a decommissioning state, don't transfer any new blocks
deadPeers.foreach { peer =>
migrationPeers.get(peer).foreach(_._1.running = false)
migrationPeers.get(peer).foreach(_.running = false)
}
}

Expand All @@ -203,12 +205,9 @@ private[storage] class BlockManagerDecommissioner(
private[storage] def stopOffloadingShuffleBlocks(): Unit = {
logInfo("Stopping offloading shuffle blocks.")
// Stop as gracefully as possible.
migrationPeers.values.foreach{case (runnable, service) =>
runnable.running = false}
migrationPeers.values.foreach{case (runnable, service) =>
service.shutdown()}
migrationPeers.values.foreach{case (runnable, service) =>
service.shutdownNow()}
migrationPeers.values.foreach{_.running = false}
shuffleMigrationPool.shutdown()
shuffleMigrationPool.shutdownNow()
}

/**
Expand Down

0 comments on commit 7da5130

Please sign in to comment.