Skip to content

Commit

Permalink
[SPARK-35910][CORE][SHUFFLE] Update remoteBlockBytes based on merged …
Browse files Browse the repository at this point in the history
…block info to reduce task time

### What changes were proposed in this pull request?

Currently, we calculate the `remoteBlockBytes` based on the original block info list. It's not efficient. Usually, it costs more ~25% time to be spent here.

If the original reducer size is big but the actual reducer size is small due to automatically partition coalescing of AQE, the reducer will take more time to calculate `remoteBlockBytes`.

We can reduce this cost via remote requests which contain merged block info lists.

### Why are the changes needed?

improve task performance

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

new unit tests and verified manually.

Closes #33109 from yaooqinn/SPARK-35910.

Authored-by: Kent Yao <yao@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
  • Loading branch information
yaooqinn authored and dongjoon-hyun committed Jun 28, 2021
1 parent c660650 commit 9c157a4
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,6 @@ final class ShuffleBlockFetcherIterator(
val collectedRemoteRequests = new ArrayBuffer[FetchRequest]
var localBlockBytes = 0L
var hostLocalBlockBytes = 0L
var remoteBlockBytes = 0L

val fallback = FallbackStorage.FALLBACK_BLOCK_MANAGER_ID.executorId
for ((address, blockInfos) <- blocksByAddress) {
Expand All @@ -379,14 +378,14 @@ final class ShuffleBlockFetcherIterator(
hostLocalBlocks ++= blocksForAddress.map(info => (info._1, info._3))
hostLocalBlockBytes += mergedBlockInfos.map(_.size).sum
} else {
remoteBlockBytes += blockInfos.map(_._2).sum
val (_, timeCost) = Utils.timeTakenMs[Unit] {
collectFetchRequests(address, blockInfos, collectedRemoteRequests)
}
logDebug(s"Collected remote fetch requests for $address in $timeCost ms")
}
}
val numRemoteBlocks = collectedRemoteRequests.map(_.blocks.size).sum
val (remoteBlockBytes, numRemoteBlocks) =
collectedRemoteRequests.foldLeft((0L, 0))((x, y) => (x._1 + y.size, x._2 + y.blocks.size))
val totalBytes = localBlockBytes + remoteBlockBytes + hostLocalBlockBytes
assert(numBlocksToFetch == localBlocks.size + hostLocalBlocks.size + numRemoteBlocks,
s"The number of non-empty blocks $numBlocksToFetch doesn't equal to the number of local " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future

import io.netty.util.internal.OutOfDirectMemoryError
import org.apache.log4j.Level
import org.mockito.ArgumentMatchers.{any, eq => meq}
import org.mockito.Mockito.{mock, times, verify, when}
import org.mockito.stubbing.Answer
Expand Down Expand Up @@ -467,6 +468,29 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT
assert(numResults == 6)
}

test("SPARK-35910: Update remoteBlockBytes based on merged fetch request") {
val blockManager = createMockBlockManager()
val remoteBmId = BlockManagerId("test-client-1", "test-client-1", 2)
val remoteBlocks = Seq[ShuffleBlockId](
ShuffleBlockId(0, 3, 0),
ShuffleBlockId(0, 3, 1),
ShuffleBlockId(0, 3, 2),
ShuffleBlockId(0, 4, 1))
val remoteBlockList = remoteBlocks.map(id => (id, id.mapId * 31L + id.reduceId, 1)).toSeq
val expectedSizeInBytes = Utils.bytesToString(remoteBlockList.map(_._2).sum)
val appender = new LogAppender(expectedSizeInBytes)
withLogAppender(appender, level = Some(Level.INFO)) {
createShuffleBlockIteratorWithDefaults(
Map(remoteBmId -> remoteBlockList),
blockManager = Some(blockManager),
doBatchFetch = true
)
}
assert(appender.loggingEvents.exists(
_.getRenderedMessage.contains(s"2 ($expectedSizeInBytes) remote blocks")),
"remote blocks should be merged to 2 blocks and kept the actual size")
}

test("fetch continuous blocks in batch should respect maxBlocksInFlightPerAddress") {
// Make sure remote blocks would return the merged block
val remoteBmId = BlockManagerId("test-client-1", "test-client-1", 1)
Expand Down

0 comments on commit 9c157a4

Please sign in to comment.