Skip to content

Commit

Permalink
Always transfer shuffle blocks as put, take out the spark.network.max…
Browse files Browse the repository at this point in the history
…RemoteBlockSizeFetchToMem test that we don't need anymore, and add back the thread submission I accidentally took out while applying some CR feedback (a little fast on the ctrl-k)
  • Loading branch information
holdenk committed Jun 4, 2020
1 parent 6c1b364 commit a904030
Show file tree
Hide file tree
Showing 3 changed files with 5 additions and 11 deletions.
Expand Up @@ -168,7 +168,8 @@ private[spark] class NettyBlockTransferService(
// Everything else is encoded using our binary protocol.
val metadata = JavaUtils.bufferToArray(serializer.newInstance().serialize((level, classTag)))

val asStream = blockData.size() > conf.get(config.MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM)
val asStream = (blockData.size() > conf.get(config.MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM) ||
blockId.isInternalShuffle || blockId.isShuffle)
val callback = new RpcResponseCallback {
override def onSuccess(response: ByteBuffer): Unit = {
logTrace(s"Successfully uploaded block $blockId${if (asStream) " as stream" else ""}")
Expand Down
Expand Up @@ -664,7 +664,7 @@ private[spark] class BlockManager(
classTag: ClassTag[_]): StreamCallbackWithID = {

if (blockManagerDecommissioning) {
throw new BlockSavedOnDecommissionedBlockManagerException(blockId)
throw new BlockSavedOnDecommissionedBlockManagerException(blockId)
}

if (blockId.isShuffle || blockId.isInternalShuffle) {
Expand Down Expand Up @@ -1907,6 +1907,7 @@ private[spark] class BlockManager(
logDebug(s"Starting thread to migrate shuffle blocks to ${peer}")
val executor = ThreadUtils.newDaemonSingleThreadExecutor(s"migrate-shuffle-to-${peer}")
val runnable = new ShuffleMigrationRunnable(peer)
executor.submit(runnable)
(peer, runnable)
}
// A peer may have entered a decommissioning state; don't transfer any new blocks
Expand Down
Expand Up @@ -41,10 +41,6 @@ class BlockManagerDecommissionSuite extends SparkFunSuite with LocalSparkContext
runDecomTest(true, false, true)
}

test(s"verify that shuffle blocks are migrated with force to disk") {
runDecomTest(false, true, false, remoteBlockSize = "1")
}

test(s"verify that shuffle blocks are migrated") {
runDecomTest(false, true, false)
}
Expand All @@ -53,8 +49,7 @@ class BlockManagerDecommissionSuite extends SparkFunSuite with LocalSparkContext
runDecomTest(true, true, false)
}

private def runDecomTest(persist: Boolean, shuffle: Boolean, migrateDuring: Boolean,
remoteBlockSize: String = "100000") = {
private def runDecomTest(persist: Boolean, shuffle: Boolean, migrateDuring: Boolean) = {

val master = s"local-cluster[${numExecs}, 1, 1024]"
val conf = new SparkConf().setAppName("test").setMaster(master)
Expand All @@ -66,9 +61,6 @@ class BlockManagerDecommissionSuite extends SparkFunSuite with LocalSparkContext
// workload we need to worry about.
.set(config.STORAGE_DECOMMISSION_REPLICATION_REATTEMPT_INTERVAL, 1L)

// Allow force fetching to local disk
conf.set("spark.network.maxRemoteBlockSizeFetchToMem", remoteBlockSize)

sc = new SparkContext(master, "test", conf)

// Create input RDD with 10 partitions
Expand Down

0 comments on commit a904030

Please sign in to comment.