[SPARK-22537][core] Aggregation of map output statistics on driver faces single point bottleneck #19763

Closed
wants to merge 13 commits
38 changes: 35 additions & 3 deletions core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -23,11 +23,14 @@ import java.util.zip.{GZIPInputStream, GZIPOutputStream}

import scala.collection.JavaConverters._
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Map}
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration.Duration
import scala.reflect.ClassTag
import scala.util.control.NonFatal

import org.apache.spark.broadcast.{Broadcast, BroadcastManager}
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
import org.apache.spark.rpc.{RpcCallContext, RpcEndpoint, RpcEndpointRef, RpcEnv}
import org.apache.spark.scheduler.MapStatus
import org.apache.spark.shuffle.MetadataFetchFailedException
@@ -472,16 +475,45 @@ private[spark] class MapOutputTrackerMaster(
    shuffleStatuses.get(shuffleId).map(_.findMissingPartitions())
  }

  /**
   * Try to equally divide Range(0, num) into divisor slices.
   */
  def equallyDivide(num: Int, divisor: Int): Iterator[Seq[Int]] = {
    assert(divisor > 0, "Divisor should be positive")
    val (each, remain) = (num / divisor, num % divisor)
    val (smaller, bigger) = (0 until num).splitAt((divisor - remain) * each)
cloud-fan (Contributor), Nov 20, 2017:

can you add some comment to describe the algorithm? I'd expect something like:

to equally divide n elements to m buckets
each bucket should have n/m elements
for the remaining n%m elements
pick the first n%m buckets and add one more element
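
For example, with 17 elements and 5 buckets: 17 / 5 = 3 elements per bucket and 17 % 5 = 2 remaining, so the first two buckets get 4 elements each and the other three get 3.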

Author:

Sure : )

Contributor:

my proposal

def equallyDivide(numElements: Int, numBuckets: Int): Iterator[Seq[Int]] = {
  val elementsPerBucket = numElements / numBuckets
  val remaining = numElements % numBuckets
  if (remaining == 0) {
    0.until(numElements).grouped(elementsPerBucket)
  } else {
    val splitPoint = (elementsPerBucket + 1) * remaining
    0.until(splitPoint).grouped(elementsPerBucket + 1) ++
      splitPoint.until(numElements).grouped(elementsPerBucket)
  }
}

    if (each != 0) {
      smaller.grouped(each) ++ bigger.grouped(each + 1)
    } else {
      bigger.grouped(each + 1)
    }
  }
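
A quick illustration of the slicing above (not part of the diff; tracker is assumed to be a MapOutputTrackerMaster, as in the test suite at the bottom). Note that the larger slices go to the last remain buckets rather than the first ones:

// num = 10, divisor = 3: each = 3, remain = 1, split point = (3 - 1) * 3 = 6
tracker.equallyDivide(10, 3).toList
// => slices (0, 1, 2), (3, 4, 5), (6, 7, 8, 9), i.e. sizes 3, 3, 4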

  /**
   * Return statistics about all of the outputs for a given shuffle.
   */
  def getStatistics(dep: ShuffleDependency[_, _, _]): MapOutputStatistics = {
    shuffleStatuses(dep.shuffleId).withMapStatuses { statuses =>
      val totalSizes = new Array[Long](dep.partitioner.numPartitions)
      if (statuses.length * totalSizes.length <=
          conf.get(SHUFFLE_MAP_OUTPUT_STATISTICS_PARALLEL_AGGREGATION_THRESHOLD)) {
Contributor:

nit:

val parallelAggThreshold = ...
if (statuses.length * totalSizes.length < parallelAggThreshold)

        for (s <- statuses) {
          for (i <- 0 until totalSizes.length) {
            totalSizes(i) += s.getSizeForBlock(i)
          }
        }
      } else {
        val parallelism = conf.get(SHUFFLE_MAP_OUTPUT_STATISTICS_CORES)
        val threadPool = ThreadUtils.newDaemonFixedThreadPool(parallelism, "map-output-statistics")
Member:

please put threadPool.shutdown in finally to shut down the thread pool
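
A rough sketch of what this suggestion amounts to (illustrative only, not the code in this diff; all names are taken from the snippet below):

// Sketch only: wrap the parallel aggregation so the pool is always released.
val threadPool = ThreadUtils.newDaemonFixedThreadPool(parallelism, "map-output-statistics")
try {
  val executionContext = ExecutionContext.fromExecutor(threadPool)
  val mapStatusSubmitTasks = equallyDivide(totalSizes.length, parallelism).map {
    reduceIds => Future {
      for (s <- statuses; i <- reduceIds) {
        totalSizes(i) += s.getSizeForBlock(i)
      }
    } (executionContext)
  }
  ThreadUtils.awaitResult(Future.sequence(mapStatusSubmitTasks), Duration.Inf)
} finally {
  // Shut the pool down even if a task fails or awaitResult throws.
  threadPool.shutdown()
}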

        val executionContext = ExecutionContext.fromExecutor(threadPool)
        val mapStatusSubmitTasks = equallyDivide(totalSizes.length, parallelism).map {
          reduceIds => Future {
            for (s <- statuses; i <- reduceIds) {
              totalSizes(i) += s.getSizeForBlock(i)
            }
          } (executionContext)
        }
        ThreadUtils.awaitResult(Future.sequence(mapStatusSubmitTasks), Duration.Inf)
      }
      new MapOutputStatistics(dep.shuffleId, totalSizes)
    }
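
To put the threshold introduced below in perspective (illustrative numbers only): a shuffle with 5,000 map tasks and 1,000 reduce partitions needs 5,000,000 getSizeForBlock calls and stays on the single-threaded path, while one with 50,000 map tasks and 10,000 partitions needs 500,000,000 calls, exceeds the default threshold of 10,000,000, and is spread over the 8-thread aggregation pool.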
16 changes: 16 additions & 0 deletions core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -485,4 +485,20 @@ package object config {
        "array in the sorter.")
      .intConf
      .createWithDefault(Integer.MAX_VALUE)

  private[spark] val SHUFFLE_MAP_OUTPUT_STATISTICS_PARALLEL_AGGREGATION_THRESHOLD =
Contributor:

spark.adaptive.map.statistics.cores should also be a config entry like this

Author:

spark.sql.adaptive.xxx already exists, will this be a problem?

Contributor:

Really? I grepped the code base but can't find it.

Author:

I think that's not a big problem; adaptive execution needs both core and SQL code, so both confs are needed.

Contributor:

I don't get it. You showed me that spark.sql.adaptive.xxx has config entries, so why doesn't spark.adaptive.map.statistics.cores need a config entry?

Author:

spark.adaptive.map.statistics.cores needs a config entry, but I thought the adaptive.xxx items had already been put under spark.sql., so it might be inconsistent. Now I think it's no big deal.

Author:

There is also a spark.shuffle.mapOutput.dispatcher.numThreads in this file without a config entry; do I need to add one?

Contributor:

Yeah, let's add it. BTW, shall we also use mapOutput instead of mapOutputStatistics?
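
For reference, such an entry could look roughly like the following; the constant name, doc text, and default value are assumptions for illustration and are not part of this PR:

// Hypothetical sketch only -- name, doc and default are assumed, not taken from the PR.
private[spark] val SHUFFLE_MAP_OUTPUT_DISPATCHER_NUM_THREADS =
  ConfigBuilder("spark.shuffle.mapOutput.dispatcher.numThreads")
    .internal()
    .doc("Number of threads used to dispatch map output status requests on the driver.")
    .intConf
    .createWithDefault(8)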

Author:

Actually there are 3 confs like that... do we need to add entries for all of them?

ConfigBuilder("spark.shuffle.mapOutputStatistics.parallelAggregationThreshold")
.internal()
.doc("Multi-thread is used when the number of mappers * shuffle partitions exceeds this " +
"threshold.")
.intConf
.createWithDefault(10000000)

  private[spark] val SHUFFLE_MAP_OUTPUT_STATISTICS_CORES =
    ConfigBuilder("spark.shuffle.mapOutputStatistics.cores")
Contributor:

nit: cores -> parallelism

      .internal()
      .doc("The number of cores to use for parallel aggregation of map output statistics.")
      .intConf
      .createWithDefault(8)

}
23 changes: 23 additions & 0 deletions core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
@@ -275,4 +275,27 @@ class MapOutputTrackerSuite extends SparkFunSuite {
    }
  }

test("equally divide map statistics tasks") {
val func = newTrackerMaster().equallyDivide _
val cases = Seq((0, 5), (4, 5), (15, 5), (16, 5), (17, 5), (18, 5), (19, 5), (20, 5))
val expects = Seq(
Seq(0, 0, 0, 0, 0),
Seq(1, 1, 1, 1, 0),
Seq(3, 3, 3, 3, 3),
Seq(3, 3, 3, 3, 4),
Seq(3, 3, 3, 4, 4),
Seq(3, 3, 4, 4, 4),
Seq(3, 4, 4, 4, 4),
Seq(4, 4, 4, 4, 4))
cases.zip(expects).foreach { case ((num, divisor), expect) =>
val answer = func(num, divisor).toSeq
var wholeSplit = (0 until num)
answer.zip(expect).foreach { case (split, expectSplitLength) =>
val (currentSplit, rest) = wholeSplit.splitAt(expectSplitLength)
assert(currentSplit.toSet == split.toSet)
wholeSplit = rest
}
}
}

}