[SPARK-19659][CORE][FOLLOW-UP] Fetch big blocks to disk when shuffle-read #18117

Closed
@@ -214,11 +214,12 @@ final class ShuffleBlockFetcherIterator(
       }
     }

-    // Shuffle remote blocks to disk when the request is too large.
-    // TODO: Encryption and compression should be considered.
+    // Fetch remote shuffle blocks to disk when the request is too large. Since the shuffle data is
+    // already encrypted and compressed over the wire (w.r.t. the related configs), we can just
+    // fetch the data and write it to file directly.


I think this change is really good. Sorry for my ambiguous comment.

     if (req.size > maxReqSizeShuffleToMem) {
-      val shuffleFiles = blockIds.map {
-        bId => blockManager.diskBlockManager.createTempLocalBlock()._2
+      val shuffleFiles = blockIds.map { _ =>
+        blockManager.diskBlockManager.createTempLocalBlock()._2
       }.toArray
       shuffleFilesSet ++= shuffleFiles
       shuffleClient.fetchBlocks(address.host, address.port, address.executorId, blockIds.toArray,
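The hunk above changes the comment and the lambda, but the control flow it documents is worth spelling out: when a fetch request's total size exceeds `maxReqSizeShuffleToMem`, one temp file is created per block and the response is written straight to disk instead of being buffered in memory. Below is a minimal standalone sketch of that decision, assuming simplified stand-in types (a `FetchRequest` case class and JDK temp files in place of `DiskBlockManager`); it is an illustration, not the actual Spark implementation.

```scala
import java.io.File
import java.nio.file.Files

// Hypothetical, simplified stand-in for Spark's internal FetchRequest.
case class FetchRequest(blockIds: Seq[String], size: Long)

// Mirrors the idea in the hunk above: a request larger than
// maxReqSizeShuffleToMem gets one temp file per block so the data can be
// streamed to disk; smaller requests stay in memory (modeled as None here).
def tempFilesFor(req: FetchRequest, maxReqSizeShuffleToMem: Long): Option[Array[File]] =
  if (req.size > maxReqSizeShuffleToMem) {
    // The real code obtains each file via
    // blockManager.diskBlockManager.createTempLocalBlock()._2.
    Some(req.blockIds.map(_ => Files.createTempFile("shuffle", ".tmp").toFile).toArray)
  } else {
    None
  }
```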
@@ -36,6 +36,7 @@ import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer}
 import org.apache.spark.network.shuffle.BlockFetchingListener
 import org.apache.spark.network.util.LimitedInputStream
 import org.apache.spark.shuffle.FetchFailedException
+import org.apache.spark.util.Utils


class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodTester {
@@ -420,9 +421,10 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodTester {
     doReturn(localBmId).when(blockManager).blockManagerId

     val diskBlockManager = mock(classOf[DiskBlockManager])
+    val tmpDir = Utils.createTempDir()
     doReturn{
-      var blockId = new TempLocalBlockId(UUID.randomUUID())
-      (blockId, new File(blockId.name))
+      val blockId = TempLocalBlockId(UUID.randomUUID())
+      (blockId, new File(tmpDir, blockId.name))
     }.when(diskBlockManager).createTempLocalBlock()
     doReturn(diskBlockManager).when(blockManager).diskBlockManager
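A note on why `tmpDir` is introduced here: a `java.io.File` built from a bare relative name resolves against the JVM working directory, so with the old stub the fetched-to-disk data landed in whatever directory the test JVM was started from. Anchoring the name under `Utils.createTempDir()` keeps it in a managed temp location. A small illustration, with a made-up name and path:

```scala
import java.io.File

// Hypothetical name; the real one comes from TempLocalBlockId(UUID.randomUUID()).name.
val blockFileName = "temp_local_1234"

// Old stub: resolves against the JVM working directory, so the fetched
// shuffle data was written into cwd during the test.
val leaky = new File(blockFileName)

// New stub: the same name anchored under a dedicated temp dir (made-up
// path here; the test uses Utils.createTempDir()).
val contained = new File(new File("/tmp/spark-test"), blockFileName)
```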

@@ -443,34 +445,34 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodTester {
}
})

+    def fetchShuffleBlock(blocksByAddress: Seq[(BlockManagerId, Seq[(BlockId, Long)])]): Unit = {
+      // Set `maxBytesInFlight` and `maxReqsInFlight` to `Int.MaxValue`, so that during the
+      // construction of `ShuffleBlockFetcherIterator`, all requests to fetch remote shuffle blocks
+      // are issued. The `maxReqSizeShuffleToMem` is hard-coded as 200 here.


👍

+      new ShuffleBlockFetcherIterator(
+        TaskContext.empty(),
+        transfer,
+        blockManager,
+        blocksByAddress,
+        (_, in) => in,
+        maxBytesInFlight = Int.MaxValue,
+        maxReqsInFlight = Int.MaxValue,
+        maxReqSizeShuffleToMem = 200,
+        detectCorrupt = true)
+    }

     val blocksByAddress1 = Seq[(BlockManagerId, Seq[(BlockId, Long)])](
       (remoteBmId, remoteBlocks.keys.map(blockId => (blockId, 100L)).toSeq))
-    // Set maxReqSizeShuffleToMem to be 200.
-    val iterator1 = new ShuffleBlockFetcherIterator(
-      TaskContext.empty(),
-      transfer,
-      blockManager,
-      blocksByAddress1,
-      (_, in) => in,
-      Int.MaxValue,
-      Int.MaxValue,
-      200,
-      true)
+    fetchShuffleBlock(blocksByAddress1)
+    // `maxReqSizeShuffleToMem` is 200, which is greater than the block size 100, so don't fetch
+    // the shuffle blocks to disk.
     assert(shuffleFiles === null)

     val blocksByAddress2 = Seq[(BlockManagerId, Seq[(BlockId, Long)])](
       (remoteBmId, remoteBlocks.keys.map(blockId => (blockId, 300L)).toSeq))
-    // Set maxReqSizeShuffleToMem to be 200.
-    val iterator2 = new ShuffleBlockFetcherIterator(
-      TaskContext.empty(),
-      transfer,
-      blockManager,
-      blocksByAddress2,
-      (_, in) => in,
-      Int.MaxValue,
-      Int.MaxValue,
-      200,
-      true)
+    fetchShuffleBlock(blocksByAddress2)
+    // `maxReqSizeShuffleToMem` is 200, which is smaller than the block size 300, so fetch
+    // the shuffle blocks to disk.
     assert(shuffleFiles != null)
}
}
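To make the boundary the two assertions exercise explicit: with `maxReqSizeShuffleToMem = 200`, a 100-byte request stays in memory and a 300-byte request goes to disk. The test's expectation can be stated as a one-line predicate (`fetchesToDisk` is a hypothetical name; the sizes are the ones used above):

```scala
// Spill to disk only when the request size strictly exceeds the threshold.
def fetchesToDisk(reqSize: Long, maxReqSizeShuffleToMem: Long): Boolean =
  reqSize > maxReqSizeShuffleToMem

assert(!fetchesToDisk(reqSize = 100L, maxReqSizeShuffleToMem = 200L)) // shuffleFiles stays null
assert(fetchesToDisk(reqSize = 300L, maxReqSizeShuffleToMem = 200L))  // temp files are created
```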