[SPARK-19355][SQL] Use map output statistics to improve global limit's parallelism

## What changes were proposed in this pull request?

A logical `Limit` is performed physically by two operations `LocalLimit` and `GlobalLimit`.

Most of the time, we gather all the data into a single partition in order to run `GlobalLimit`. If the limit is very large, shuffling the data causes a performance problem and also reduces parallelism.

We can avoid shuffling into a single partition if we don't care about data ordering. This patch implements that idea by running a map stage during the global limit that collects the number of rows in each partition. Each partition then retrieves its share of the limited data locally, without any shuffle, to finish the global limit.

For example, suppose we have three partitions with (100, 100, 50) rows respectively. For a global limit of 100 rows, we may take (34, 33, 33) rows from each partition locally. After the global limit we still have three partitions.
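
To make the even-distribution step concrete, here is a minimal Scala sketch (not the code added by this patch; `rowsPerPartition` and `takeCounts` are invented names for illustration) of how per-partition take counts could be derived from the per-partition row counts gathered by the map stage:

```scala
// Distribute a global limit across partitions as evenly as possible, capped by the
// number of rows each partition actually holds (e.g. MapOutputStatistics.recordsByPartitionId).
def takeCounts(rowsPerPartition: Array[Long], limit: Long): Array[Long] = {
  val takes = Array.fill(rowsPerPartition.length)(0L)
  var remaining = limit
  var open: Seq[Int] = rowsPerPartition.indices.filter(i => rowsPerPartition(i) > 0)
  while (remaining > 0 && open.nonEmpty) {
    // Each partition that still has rows left contributes an equal share of what is still needed.
    val share = math.max(1L, remaining / open.length)
    for (i <- open if remaining > 0) {
      val extra = math.min(share, math.min(remaining, rowsPerPartition(i) - takes(i)))
      takes(i) += extra
      remaining -= extra
    }
    open = open.filter(i => takes(i) < rowsPerPartition(i))
  }
  takes
}

// The example above: partitions with (100, 100, 50) rows and a limit of 100
// end up contributing (34, 33, 33) rows respectively.
takeCounts(Array(100L, 100L, 50L), 100L)   // Array(34, 33, 33)
```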

If the data has a required ordering, we can't distribute the needed rows evenly across the partitions, because doing so could change the data ordering. But we can still avoid the shuffle.
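
As a rough illustration of the ordered case (again just a sketch with made-up names, not necessarily what the patch implements), one way to respect the ordering while still skipping the shuffle is to consume partitions front to back until the limit is exhausted:

```scala
// When the output ordering must be preserved, take rows from the partitions in order:
// earlier partitions are consumed fully before any rows are taken from later ones.
def takeCountsOrdered(rowsPerPartition: Array[Long], limit: Long): Array[Long] = {
  var remaining = limit
  rowsPerPartition.map { rows =>
    val take = math.min(rows, remaining)
    remaining -= take
    take
  }
}

takeCountsOrdered(Array(100L, 100L, 50L), 100L)   // Array(100, 0, 0)
```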

## How was this patch tested?

Jenkins tests.

Author: Liang-Chi Hsieh <viirya@gmail.com>

Closes #16677 from viirya/improve-global-limit-parallelism.
viirya authored and hvanhovell committed Aug 10, 2018
1 parent 9abe09b commit 4f17585
Showing 26 changed files with 322 additions and 140 deletions.
@@ -125,7 +125,7 @@ public void write(Iterator<Product2<K, V>> records) throws IOException {
if (!records.hasNext()) {
partitionLengths = new long[numPartitions];
shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, null);
mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths);
mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths, 0);
return;
}
final SerializerInstance serInstance = serializer.newInstance();
@@ -167,7 +167,8 @@ public void write(Iterator<Product2<K, V>> records) throws IOException {
logger.error("Error while deleting temp file {}", tmp.getAbsolutePath());
}
}
mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths);
mapStatus = MapStatus$.MODULE$.apply(
blockManager.shuffleServerId(), partitionLengths, writeMetrics.recordsWritten());
}

@VisibleForTesting
@@ -248,7 +248,8 @@ void closeAndWriteOutput() throws IOException {
logger.error("Error while deleting temp file {}", tmp.getAbsolutePath());
}
}
mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths);
mapStatus = MapStatus$.MODULE$.apply(
blockManager.shuffleServerId(), partitionLengths, writeMetrics.recordsWritten());
}

@VisibleForTesting
@@ -23,5 +23,9 @@ package org.apache.spark
* @param shuffleId ID of the shuffle
* @param bytesByPartitionId approximate number of output bytes for each map output partition
* (may be inexact due to use of compressed map statuses)
* @param recordsByPartitionId number of output records for each map output partition
*/
private[spark] class MapOutputStatistics(val shuffleId: Int, val bytesByPartitionId: Array[Long])
private[spark] class MapOutputStatistics(
val shuffleId: Int,
val bytesByPartitionId: Array[Long],
val recordsByPartitionId: Array[Long])
10 changes: 8 additions & 2 deletions core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -522,16 +522,19 @@ private[spark] class MapOutputTrackerMaster(
def getStatistics(dep: ShuffleDependency[_, _, _]): MapOutputStatistics = {
shuffleStatuses(dep.shuffleId).withMapStatuses { statuses =>
val totalSizes = new Array[Long](dep.partitioner.numPartitions)
val recordsByMapTask = new Array[Long](statuses.length)

val parallelAggThreshold = conf.get(
SHUFFLE_MAP_OUTPUT_PARALLEL_AGGREGATION_THRESHOLD)
val parallelism = math.min(
Runtime.getRuntime.availableProcessors(),
statuses.length.toLong * totalSizes.length / parallelAggThreshold + 1).toInt
if (parallelism <= 1) {
for (s <- statuses) {
statuses.zipWithIndex.foreach { case (s, index) =>
for (i <- 0 until totalSizes.length) {
totalSizes(i) += s.getSizeForBlock(i)
}
recordsByMapTask(index) = s.numberOfOutput
}
} else {
val threadPool = ThreadUtils.newDaemonFixedThreadPool(parallelism, "map-output-aggregate")
@@ -548,8 +551,11 @@ private[spark] class MapOutputTrackerMaster(
} finally {
threadPool.shutdown()
}
statuses.zipWithIndex.foreach { case (s, index) =>
recordsByMapTask(index) = s.numberOfOutput
}
}
new MapOutputStatistics(dep.shuffleId, totalSizes)
new MapOutputStatistics(dep.shuffleId, totalSizes, recordsByMapTask)
}
}

43 changes: 31 additions & 12 deletions core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
@@ -31,7 +31,8 @@ import org.apache.spark.util.Utils

/**
* Result returned by a ShuffleMapTask to a scheduler. Includes the block manager address that the
* task ran on as well as the sizes of outputs for each reducer, for passing on to the reduce tasks.
* task ran on, the sizes of outputs for each reducer, and the number of outputs of the map task,
* for passing on to the reduce tasks.
*/
private[spark] sealed trait MapStatus {
/** Location where this task was run. */
@@ -44,18 +45,23 @@ private[spark] sealed trait MapStatus {
* necessary for correctness, since block fetchers are allowed to skip zero-size blocks.
*/
def getSizeForBlock(reduceId: Int): Long

/**
* The number of outputs for the map task.
*/
def numberOfOutput: Long
}


private[spark] object MapStatus {

def apply(loc: BlockManagerId, uncompressedSizes: Array[Long]): MapStatus = {
def apply(loc: BlockManagerId, uncompressedSizes: Array[Long], numOutput: Long): MapStatus = {
if (uncompressedSizes.length > Option(SparkEnv.get)
.map(_.conf.get(config.SHUFFLE_MIN_NUM_PARTS_TO_HIGHLY_COMPRESS))
.getOrElse(config.SHUFFLE_MIN_NUM_PARTS_TO_HIGHLY_COMPRESS.defaultValue.get)) {
HighlyCompressedMapStatus(loc, uncompressedSizes)
HighlyCompressedMapStatus(loc, uncompressedSizes, numOutput)
} else {
new CompressedMapStatus(loc, uncompressedSizes)
new CompressedMapStatus(loc, uncompressedSizes, numOutput)
}
}

@@ -98,29 +104,34 @@ private[spark] object MapStatus {
*/
private[spark] class CompressedMapStatus(
private[this] var loc: BlockManagerId,
private[this] var compressedSizes: Array[Byte])
private[this] var compressedSizes: Array[Byte],
private[this] var numOutput: Long)
extends MapStatus with Externalizable {

protected def this() = this(null, null.asInstanceOf[Array[Byte]]) // For deserialization only
protected def this() = this(null, null.asInstanceOf[Array[Byte]], -1) // For deserialization only

def this(loc: BlockManagerId, uncompressedSizes: Array[Long]) {
this(loc, uncompressedSizes.map(MapStatus.compressSize))
def this(loc: BlockManagerId, uncompressedSizes: Array[Long], numOutput: Long) {
this(loc, uncompressedSizes.map(MapStatus.compressSize), numOutput)
}

override def location: BlockManagerId = loc

override def numberOfOutput: Long = numOutput

override def getSizeForBlock(reduceId: Int): Long = {
MapStatus.decompressSize(compressedSizes(reduceId))
}

override def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException {
loc.writeExternal(out)
out.writeLong(numOutput)
out.writeInt(compressedSizes.length)
out.write(compressedSizes)
}

override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException {
loc = BlockManagerId(in)
numOutput = in.readLong()
val len = in.readInt()
compressedSizes = new Array[Byte](len)
in.readFully(compressedSizes)
@@ -143,17 +154,20 @@ private[spark] class HighlyCompressedMapStatus private (
private[this] var numNonEmptyBlocks: Int,
private[this] var emptyBlocks: RoaringBitmap,
private[this] var avgSize: Long,
private var hugeBlockSizes: Map[Int, Byte])
private var hugeBlockSizes: Map[Int, Byte],
private[this] var numOutput: Long)
extends MapStatus with Externalizable {

// loc could be null when the default constructor is called during deserialization
require(loc == null || avgSize > 0 || hugeBlockSizes.size > 0 || numNonEmptyBlocks == 0,
"Average size can only be zero for map stages that produced no output")

protected def this() = this(null, -1, null, -1, null) // For deserialization only
protected def this() = this(null, -1, null, -1, null, -1) // For deserialization only

override def location: BlockManagerId = loc

override def numberOfOutput: Long = numOutput

override def getSizeForBlock(reduceId: Int): Long = {
assert(hugeBlockSizes != null)
if (emptyBlocks.contains(reduceId)) {
@@ -168,6 +182,7 @@ private[spark] class HighlyCompressedMapStatus private (

override def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException {
loc.writeExternal(out)
out.writeLong(numOutput)
emptyBlocks.writeExternal(out)
out.writeLong(avgSize)
out.writeInt(hugeBlockSizes.size)
@@ -179,6 +194,7 @@ private[spark] class HighlyCompressedMapStatus private (

override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException {
loc = BlockManagerId(in)
numOutput = in.readLong()
emptyBlocks = new RoaringBitmap()
emptyBlocks.readExternal(in)
avgSize = in.readLong()
@@ -194,7 +210,10 @@ private[spark] class HighlyCompressedMapStatus private (
}

private[spark] object HighlyCompressedMapStatus {
def apply(loc: BlockManagerId, uncompressedSizes: Array[Long]): HighlyCompressedMapStatus = {
def apply(
loc: BlockManagerId,
uncompressedSizes: Array[Long],
numOutput: Long): HighlyCompressedMapStatus = {
// We must keep track of which blocks are empty so that we don't report a zero-sized
// block as being non-empty (or vice-versa) when using the average block size.
var i = 0
@@ -235,6 +254,6 @@ private[spark] object HighlyCompressedMapStatus {
emptyBlocks.trim()
emptyBlocks.runOptimize()
new HighlyCompressedMapStatus(loc, numNonEmptyBlocks, emptyBlocks, avgSize,
hugeBlockSizesArray.toMap)
hugeBlockSizesArray.toMap, numOutput)
}
}
@@ -70,7 +70,8 @@ private[spark] class SortShuffleWriter[K, V, C](
val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID)
val partitionLengths = sorter.writePartitionedFile(blockId, tmp)
shuffleBlockResolver.writeIndexFileAndCommit(dep.shuffleId, mapId, partitionLengths, tmp)
mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths)
mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths,
writeMetrics.recordsWritten)
} finally {
if (tmp.exists() && !tmp.delete()) {
logError(s"Error while deleting temp file ${tmp.getAbsolutePath}")
@@ -233,6 +233,7 @@ public void writeEmptyIterator() throws Exception {
writer.write(Iterators.emptyIterator());
final Option<MapStatus> mapStatus = writer.stop(true);
assertTrue(mapStatus.isDefined());
assertEquals(0, mapStatus.get().numberOfOutput());
assertTrue(mergedOutputFile.exists());
assertArrayEquals(new long[NUM_PARTITITONS], partitionSizesInMergedFile);
assertEquals(0, taskMetrics.shuffleWriteMetrics().recordsWritten());
@@ -252,6 +253,7 @@ public void writeWithoutSpilling() throws Exception {
writer.write(dataToWrite.iterator());
final Option<MapStatus> mapStatus = writer.stop(true);
assertTrue(mapStatus.isDefined());
assertEquals(NUM_PARTITITONS, mapStatus.get().numberOfOutput());
assertTrue(mergedOutputFile.exists());

long sumOfPartitionSizes = 0;
28 changes: 14 additions & 14 deletions core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
@@ -62,9 +62,9 @@ class MapOutputTrackerSuite extends SparkFunSuite {
val size1000 = MapStatus.decompressSize(MapStatus.compressSize(1000L))
val size10000 = MapStatus.decompressSize(MapStatus.compressSize(10000L))
tracker.registerMapOutput(10, 0, MapStatus(BlockManagerId("a", "hostA", 1000),
Array(1000L, 10000L)))
Array(1000L, 10000L), 10))
tracker.registerMapOutput(10, 1, MapStatus(BlockManagerId("b", "hostB", 1000),
Array(10000L, 1000L)))
Array(10000L, 1000L), 10))
val statuses = tracker.getMapSizesByExecutorId(10, 0)
assert(statuses.toSet ===
Seq((BlockManagerId("a", "hostA", 1000), ArrayBuffer((ShuffleBlockId(10, 0, 0), size1000))),
@@ -84,9 +84,9 @@ class MapOutputTrackerSuite extends SparkFunSuite {
val compressedSize1000 = MapStatus.compressSize(1000L)
val compressedSize10000 = MapStatus.compressSize(10000L)
tracker.registerMapOutput(10, 0, MapStatus(BlockManagerId("a", "hostA", 1000),
Array(compressedSize1000, compressedSize10000)))
Array(compressedSize1000, compressedSize10000), 10))
tracker.registerMapOutput(10, 1, MapStatus(BlockManagerId("b", "hostB", 1000),
Array(compressedSize10000, compressedSize1000)))
Array(compressedSize10000, compressedSize1000), 10))
assert(tracker.containsShuffle(10))
assert(tracker.getMapSizesByExecutorId(10, 0).nonEmpty)
assert(0 == tracker.getNumCachedSerializedBroadcast)
@@ -107,9 +107,9 @@ class MapOutputTrackerSuite extends SparkFunSuite {
val compressedSize1000 = MapStatus.compressSize(1000L)
val compressedSize10000 = MapStatus.compressSize(10000L)
tracker.registerMapOutput(10, 0, MapStatus(BlockManagerId("a", "hostA", 1000),
Array(compressedSize1000, compressedSize1000, compressedSize1000)))
Array(compressedSize1000, compressedSize1000, compressedSize1000), 10))
tracker.registerMapOutput(10, 1, MapStatus(BlockManagerId("b", "hostB", 1000),
Array(compressedSize10000, compressedSize1000, compressedSize1000)))
Array(compressedSize10000, compressedSize1000, compressedSize1000), 10))

assert(0 == tracker.getNumCachedSerializedBroadcast)
// As if we had two simultaneous fetch failures
@@ -145,7 +145,7 @@ class MapOutputTrackerSuite extends SparkFunSuite {

val size1000 = MapStatus.decompressSize(MapStatus.compressSize(1000L))
masterTracker.registerMapOutput(10, 0, MapStatus(
BlockManagerId("a", "hostA", 1000), Array(1000L)))
BlockManagerId("a", "hostA", 1000), Array(1000L), 10))
slaveTracker.updateEpoch(masterTracker.getEpoch)
assert(slaveTracker.getMapSizesByExecutorId(10, 0).toSeq ===
Seq((BlockManagerId("a", "hostA", 1000), ArrayBuffer((ShuffleBlockId(10, 0, 0), size1000)))))
@@ -182,7 +182,7 @@ class MapOutputTrackerSuite extends SparkFunSuite {
// Message size should be ~123B, and no exception should be thrown
masterTracker.registerShuffle(10, 1)
masterTracker.registerMapOutput(10, 0, MapStatus(
BlockManagerId("88", "mph", 1000), Array.fill[Long](10)(0)))
BlockManagerId("88", "mph", 1000), Array.fill[Long](10)(0), 0))
val senderAddress = RpcAddress("localhost", 12345)
val rpcCallContext = mock(classOf[RpcCallContext])
when(rpcCallContext.senderAddress).thenReturn(senderAddress)
@@ -216,11 +216,11 @@ class MapOutputTrackerSuite extends SparkFunSuite {
// on hostB with output size 3
tracker.registerShuffle(10, 3)
tracker.registerMapOutput(10, 0, MapStatus(BlockManagerId("a", "hostA", 1000),
Array(2L)))
Array(2L), 1))
tracker.registerMapOutput(10, 1, MapStatus(BlockManagerId("a", "hostA", 1000),
Array(2L)))
Array(2L), 1))
tracker.registerMapOutput(10, 2, MapStatus(BlockManagerId("b", "hostB", 1000),
Array(3L)))
Array(3L), 1))

// When the threshold is 50%, only host A should be returned as a preferred location
// as it has 4 out of 7 bytes of output.
@@ -260,7 +260,7 @@ class MapOutputTrackerSuite extends SparkFunSuite {
masterTracker.registerShuffle(20, 100)
(0 until 100).foreach { i =>
masterTracker.registerMapOutput(20, i, new CompressedMapStatus(
BlockManagerId("999", "mps", 1000), Array.fill[Long](4000000)(0)))
BlockManagerId("999", "mps", 1000), Array.fill[Long](4000000)(0), 0))
}
val senderAddress = RpcAddress("localhost", 12345)
val rpcCallContext = mock(classOf[RpcCallContext])
@@ -309,9 +309,9 @@ class MapOutputTrackerSuite extends SparkFunSuite {
val size1000 = MapStatus.decompressSize(MapStatus.compressSize(1000L))
val size10000 = MapStatus.decompressSize(MapStatus.compressSize(10000L))
tracker.registerMapOutput(10, 0, MapStatus(BlockManagerId("a", "hostA", 1000),
Array(size0, size1000, size0, size10000)))
Array(size0, size1000, size0, size10000), 1))
tracker.registerMapOutput(10, 1, MapStatus(BlockManagerId("b", "hostB", 1000),
Array(size10000, size0, size1000, size0)))
Array(size10000, size0, size1000, size0), 1))
assert(tracker.containsShuffle(10))
assert(tracker.getMapSizesByExecutorId(10, 0, 4).toSeq ===
Seq(
1 change: 1 addition & 0 deletions core/src/test/scala/org/apache/spark/ShuffleSuite.scala
@@ -391,6 +391,7 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkC
assert(mapOutput2.isDefined)
assert(mapOutput1.get.location === mapOutput2.get.location)
assert(mapOutput1.get.getSizeForBlock(0) === mapOutput1.get.getSizeForBlock(0))
assert(mapOutput1.get.numberOfOutput === mapOutput2.get.numberOfOutput)

// register one of the map outputs -- doesn't matter which one
mapOutput1.foreach { case mapStatus =>
@@ -423,17 +423,17 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
// map stage1 completes successfully, with one task on each executor
complete(taskSets(0), Seq(
(Success,
MapStatus(BlockManagerId("exec-hostA1", "hostA", 12345), Array.fill[Long](1)(2))),
MapStatus(BlockManagerId("exec-hostA1", "hostA", 12345), Array.fill[Long](1)(2), 1)),
(Success,
MapStatus(BlockManagerId("exec-hostA2", "hostA", 12345), Array.fill[Long](1)(2))),
MapStatus(BlockManagerId("exec-hostA2", "hostA", 12345), Array.fill[Long](1)(2), 1)),
(Success, makeMapStatus("hostB", 1))
))
// map stage2 completes successfully, with one task on each executor
complete(taskSets(1), Seq(
(Success,
MapStatus(BlockManagerId("exec-hostA1", "hostA", 12345), Array.fill[Long](1)(2))),
MapStatus(BlockManagerId("exec-hostA1", "hostA", 12345), Array.fill[Long](1)(2), 1)),
(Success,
MapStatus(BlockManagerId("exec-hostA2", "hostA", 12345), Array.fill[Long](1)(2))),
MapStatus(BlockManagerId("exec-hostA2", "hostA", 12345), Array.fill[Long](1)(2), 1)),
(Success, makeMapStatus("hostB", 1))
))
// make sure our test setup is correct
@@ -2576,7 +2576,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi

object DAGSchedulerSuite {
def makeMapStatus(host: String, reduces: Int, sizes: Byte = 2): MapStatus =
MapStatus(makeBlockManagerId(host), Array.fill[Long](reduces)(sizes))
MapStatus(makeBlockManagerId(host), Array.fill[Long](reduces)(sizes), 1)

def makeBlockManagerId(host: String): BlockManagerId =
BlockManagerId("exec-" + host, host, 12345)
