13 changes: 13 additions & 0 deletions core/benchmarks/MapStatusesConvertBenchmark-results.txt
@@ -0,0 +1,13 @@
================================================================================================
MapStatuses Convert Benchmark
================================================================================================

OpenJDK 64-Bit Server VM 1.8.0_332-b09 on Linux 5.13.0-1025-azure
Intel(R) Xeon(R) Platinum 8171M CPU @ 2.60GHz
MapStatuses Convert: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
Num Maps: 50000 Fetch partitions:500 1330 1359 26 0.0 1329827185.0 1.0X
Num Maps: 50000 Fetch partitions:1000 2648 2666 20 0.0 2647944453.0 0.5X
Num Maps: 50000 Fetch partitions:1500 4155 4436 383 0.0 4154563448.0 0.3X
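
Reading the table: Relative is the first case's best time divided by each case's best time, so conversion cost grows close to linearly with the number of fetched partitions. A quick arithmetic check in Scala (best times copied from the rows above; the Relative semantics are assumed from the usual Spark Benchmark output format):

    // Relative = bestTime(firstCase) / bestTime(thisCase), per row of the table above.
    val bestTimesMs = Seq(1330.0, 2648.0, 4155.0)
    val relative = bestTimesMs.map(bestTimesMs.head / _) // ≈ Seq(1.0, 0.50, 0.32) -> 1.0X, 0.5X, 0.3X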


50 changes: 25 additions & 25 deletions core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -1596,7 +1596,7 @@ private[spark] object MapOutputTracker extends Logging {
      mapStatuses: Array[MapStatus],
      startMapIndex: Int,
      endMapIndex: Int,
-      mergeStatuses: Option[Array[MergeStatus]] = None): MapSizesByExecutorId = {
+      mergeStatusesOpt: Option[Array[MergeStatus]] = None): MapSizesByExecutorId = {
    assert(mapStatuses != null)
    val splitsByAddress = new HashMap[BlockManagerId, ListBuffer[(BlockId, Long, Int)]]
    var enableBatchFetch = true
@@ -1608,39 +1608,39 @@
    // TODO: SPARK-35036: Instead of reading map blocks in case of AQE with Push based shuffle,
    // TODO: improve push based shuffle to read partial merged blocks satisfying the start/end
    // TODO: map indexes
-    if (mergeStatuses.exists(_.exists(_ != null)) && startMapIndex == 0
+    if (mergeStatusesOpt.exists(_.exists(_ != null)) && startMapIndex == 0
      && endMapIndex == mapStatuses.length) {
      enableBatchFetch = false
      logDebug(s"Disable shuffle batch fetch as Push based shuffle is enabled for $shuffleId.")
      // We have MergeStatus and full range of mapIds are requested so return a merged block.
      val numMaps = mapStatuses.length
-      mergeStatuses.get.zipWithIndex.slice(startPartition, endPartition).foreach {
-        case (mergeStatus, partId) =>
-          val remainingMapStatuses = if (mergeStatus != null && mergeStatus.totalSize > 0) {
-            // If MergeStatus is available for the given partition, add location of the
-            // pre-merged shuffle partition for this partition ID. Here we create a
-            // ShuffleMergedBlockId to indicate this is a merged shuffle block.
-            splitsByAddress.getOrElseUpdate(mergeStatus.location, ListBuffer()) +=
-              ((ShuffleMergedBlockId(shuffleId, mergeStatus.shuffleMergeId, partId),
-                mergeStatus.totalSize, SHUFFLE_PUSH_MAP_ID))
-            // For the "holes" in this pre-merged shuffle partition, i.e., unmerged mapper
-            // shuffle partition blocks, fetch the original map produced shuffle partition blocks
-            val mapStatusesWithIndex = mapStatuses.zipWithIndex
-            mergeStatus.getMissingMaps(numMaps).map(mapStatusesWithIndex)
-          } else {
-            // If MergeStatus is not available for the given partition, fall back to
-            // fetching all the original mapper shuffle partition blocks
-            mapStatuses.zipWithIndex.toSeq
-          }
-          // Add location for the mapper shuffle partition blocks
-          for ((mapStatus, mapIndex) <- remainingMapStatuses) {
-            validateStatus(mapStatus, shuffleId, partId)
+      val mergeStatuses = mergeStatusesOpt.get
+      for (partId <- startPartition until endPartition) {
+        val mergeStatus = mergeStatuses(partId)
+        if (mergeStatus != null && mergeStatus.totalSize > 0) {
+          // If MergeStatus is available for the given partition, add location of the
+          // pre-merged shuffle partition for this partition ID. Here we create a
+          // ShuffleMergedBlockId to indicate this is a merged shuffle block.
+          splitsByAddress.getOrElseUpdate(mergeStatus.location, ListBuffer()) +=
+            ((ShuffleMergedBlockId(shuffleId, mergeStatus.shuffleMergeId, partId),
+              mergeStatus.totalSize, SHUFFLE_PUSH_MAP_ID))
+        }
+      }
+
+      // Add location for the mapper shuffle partition blocks
+      for ((mapStatus, mapIndex) <- mapStatuses.iterator.zipWithIndex) {
+        validateStatus(mapStatus, shuffleId, startPartition)
+        for (partId <- startPartition until endPartition) {
+          // For the "holes" in this pre-merged shuffle partition, i.e., unmerged mapper
+          // shuffle partition blocks, fetch the original map produced shuffle partition blocks
+          val mergeStatus = mergeStatuses(partId)
+          if (mergeStatus == null || mergeStatus.totalSize == 0 ||
+            !mergeStatus.tracker.contains(mapIndex)) {
+            val size = mapStatus.getSizeForBlock(partId)
+            if (size != 0) {
+              splitsByAddress.getOrElseUpdate(mapStatus.location, ListBuffer()) +=
+                ((ShuffleBlockId(shuffleId, mapStatus.mapId, partId), size, mapIndex))
+            }
+          }
+        }
+      }
    } else {
      val iter = mapStatuses.iterator.zipWithIndex
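
The substance of this change is the loop restructuring: the old code walked every partition and, for each one, materialized a fresh missing-map list (getMissingMaps plus a new mapStatuses.zipWithIndex per partition), while the new code emits merged-block entries in one pass over the partitions and the "hole" entries in one pass over the map statuses, probing the merge bitmap per (mapIndex, partId). A standalone Scala sketch of the two loop shapes, with hypothetical sizes and simplified bookkeeping — not the Spark code itself:

    import org.roaringbitmap.RoaringBitmap

    object LoopShapeSketch {
      def main(args: Array[String]): Unit = {
        val numMaps = 10000                                // hypothetical, smaller than the benchmark's 50000
        val partitions = 0 until 500
        val merged = new RoaringBitmap()                   // stands in for MergeStatus.tracker
        (0 until numMaps by 2).foreach(i => merged.add(i)) // pretend every other map was merged

        // Old shape: one traversal and one fresh collection per partition.
        val oldHoles = partitions.iterator.map { _ =>
          (0 until numMaps).count(i => !merged.contains(i))
        }.sum

        // New shape: one pass over the maps, probing the bitmap per (mapIndex, partId);
        // no per-partition collection is ever built.
        var newHoles = 0
        for (mapIndex <- 0 until numMaps; _ <- partitions) {
          if (!merged.contains(mapIndex)) newHoles += 1
        }

        assert(oldHoles == newHoles) // the same set of "hole" blocks is selected either way
      }
    }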
9 changes: 0 additions & 9 deletions core/src/main/scala/org/apache/spark/scheduler/MergeStatus.scala
@@ -58,15 +58,6 @@ private[spark] class MergeStatus(

  def tracker: RoaringBitmap = mapTracker

-  /**
-   * Get the list of mapper IDs for missing mapper partition blocks that are not merged.
-   * The reducer will use this information to decide which shuffle partition blocks to
-   * fetch in the original way.
-   */
-  def getMissingMaps(numMaps: Int): Seq[Int] = {
-    (0 until numMaps).filter(i => !mapTracker.contains(i))
-  }
-
  /**
   * Get the number of missing map outputs for missing mapper partition blocks that are not merged.
   */
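
The removed helper's only caller was the MapOutputTracker loop rewritten above. Because the tracker accessor stays public, the same list remains derivable where needed; a minimal sketch, assuming a MergeStatus value ms and a map count numMaps in scope:

    // Sketch only: reproduces the removed getMissingMaps via the public tracker bitmap.
    val missingMaps: Seq[Int] = (0 until numMaps).filterNot(i => ms.tracker.contains(i))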
92 changes: 92 additions & 0 deletions core/src/test/scala/org/apache/spark/MapStatusesConvertBenchmark.scala
@@ -0,0 +1,92 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark

import org.roaringbitmap.RoaringBitmap

import org.apache.spark.benchmark.{Benchmark, BenchmarkBase}
import org.apache.spark.scheduler.{HighlyCompressedMapStatus, MapStatus, MergeStatus}
import org.apache.spark.storage.BlockManagerId

/**
 * Benchmark to measure performance for converting mapStatuses and mergeStatuses.
 * To run this benchmark:
 * {{{
 *   1. without sbt:
 *      bin/spark-submit --class <this class> --jars <spark core test jar>
 *   2. build/sbt "core/test:runMain <this class>"
 *   3. generate result:
 *      SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "core/test:runMain <this class>"
 *      Results will be written to "benchmarks/MapStatusesConvertBenchmark-results.txt".
 * }}}
 */
object MapStatusesConvertBenchmark extends BenchmarkBase {

  private def convertMapStatus(numIters: Int): Unit = {

    val benchmark = new Benchmark("MapStatuses Convert", 1, output = output)

    val blockManagerNumber = 1000
    val mapNumber = 50000
    val shufflePartitions = 10000

    val shuffleId: Int = 0
    // First reduce task will fetch map data from startPartition to endPartition
    val startPartition = 0
    val startMapIndex = 0
    val endMapIndex = mapNumber
    val blockManagers = Array.tabulate(blockManagerNumber) { i =>
      BlockManagerId("a", "host" + i, 7337)
    }
    val mapStatuses: Array[MapStatus] = Array.tabulate(mapNumber) { mapTaskId =>
      HighlyCompressedMapStatus(
        blockManagers(mapTaskId % blockManagerNumber),
        Array.tabulate(shufflePartitions)(i => if (i % 50 == 0) 1 else 0),
        mapTaskId)
    }
    val bitmap = new RoaringBitmap()
    Range(0, 4000).foreach(bitmap.add(_))
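    // Note: every MergeStatus below shares this single bitmap, which marks maps 0..3999
    // as merged; the remaining 46000 of the 50000 maps are "holes" that convertMapStatuses
    // must re-add for each fetched partition.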
    val mergeStatuses = Array.tabulate(shufflePartitions) { part =>
      MergeStatus(blockManagers(part % blockManagerNumber), shuffleId, bitmap, 100)
    }

    Array(499, 999, 1499).foreach { endPartition =>
      benchmark.addCase(
        s"Num Maps: $mapNumber Fetch partitions:${endPartition - startPartition + 1}",
        numIters) { _ =>
        MapOutputTracker.convertMapStatuses(
          shuffleId,
          startPartition,
          endPartition,
          mapStatuses,
          startMapIndex,
          endMapIndex,
          Some(mergeStatuses))
      }
    }

    benchmark.run()
  }

  override def runBenchmarkSuite(mainArgs: Array[String]): Unit = {
    val numIters = 3
    runBenchmark("MapStatuses Convert Benchmark") {
      convertMapStatus(numIters)
    }
  }
}