[SPARK-22537][core] Aggregation of map output statistics on driver faces single point bottleneck #19763
Conversation
}
mapStatusSubmitTasks.foreach(_.get())
This part can be simplified by using Scala's Future:
val futureArray = equallyDivide(totalSizes.length, taskSlices).map {
  reduceIds => Future {
    // whatever you want to do here
  }
}
Await.result(Future.sequence(futureArray), Duration.Inf) // or some timeout value you prefer
Good idea, thx!
Should I use the scala.concurrent.ExecutionContext.Implicits.global ExecutionContext?
Don't use scala.concurrent.ExecutionContext.Implicits.global. You need to create a thread pool.
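For illustration, one way to back the futures with a dedicated pool instead of the global context (a minimal sketch; the pool size is arbitrary):

import java.util.concurrent.Executors
import scala.concurrent.ExecutionContext

// A dedicated fixed-size pool; its ExecutionContext is what the Future { ... }
// blocks and Future.sequence in the snippet above would pick up implicitly.
val pool = Executors.newFixedThreadPool(8)
implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)

// ... build the futures and await them as in the snippet above ...

// Remember to shut the pool down when the aggregation is done.
pool.shutdown()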
My question is: how often have we actually seen this statistics-collection operation become the bottleneck?
Can you also show some benchmark numbers to demonstrate it is a bottleneck?
totalSizes(i) += s.getSizeForBlock(i)
}
val mapStatusSubmitTasks = ArrayBuffer[Future[_]]()
var taskSlices = parallelism
Why var?
Seems like not a big deal for the end-to-end performance?
It doesn't look like a significant difference.
Actually, the time gap is O(number of mappers * shuffle partitions). In this case the number of mappers is not very large, but users are more likely to get slowed down when they run on a big data set.
cc @zsxwing
totalSizes(i) += s.getSizeForBlock(i)
val parallelism = conf.getInt("spark.adaptive.map.statistics.cores", 8)

val mapStatusSubmitTasks = equallyDivide(totalSizes.length, parallelism).map {
Doing this is not cheap. I would add a config and only run this in multiple threads when #mapper * #shuffle_partitions is large.
@@ -485,4 +485,13 @@ package object config {
    "array in the sorter.")
  .intConf
  .createWithDefault(Integer.MAX_VALUE)

private[spark] val SHUFFLE_MAP_OUTPUT_STATISTICS_MULTITHREAD_THRESHOLD =
  ConfigBuilder("spark.shuffle.mapOutputStatisticsMultithreadThreshold")
spark.shuffle.mapOutputStatistics.parallelAggregationThreshold?
Yes, it's better!
@@ -485,4 +485,13 @@ package object config {
    "array in the sorter.")
  .intConf
  .createWithDefault(Integer.MAX_VALUE)

private[spark] val SHUFFLE_MAP_OUTPUT_STATISTICS_PARALLEL_AGGREGATION_THRESHOLD =
spark.adaptive.map.statistics.cores should also be a config entry like this.
spark.sql.adaptive.xxx already exists; will this be a problem?
Really? I grepped the code base but can't find it.
I think that's not a big problem; adaptive execution needs both core and sql code, so both confs are needed.
I don't get it. You showed me that spark.sql.adaptive.xxx has config entries, so why doesn't spark.adaptive.map.statistics.cores need a config entry?
spark.adaptive.map.statistics.cores needs a config entry, but I thought the adaptive.xxx items had already been put under spark.sql., so it might be inconsistent. Now I think it's no big deal.
There is also a spark.shuffle.mapOutput.dispatcher.numThreads in this file without a config entry; do I need to add one?
Yea, let's add it. BTW, shall we also use mapOutput instead of mapOutputStatistics?
Actually there are 3 confs like that... do we need all of them?
.doc("Multi-thread is used when the number of mappers * shuffle partitions exceeds this " + | ||
"threshold") | ||
.intConf | ||
.createWithDefault(100000000) |
Wow, 100 million is a really large threshold; how did you pick this number?
Now I also think it's a little bit large... In the case I mentioned, the 5s gap appears when mappers * shuffle partitions is around 10^8 (e.g. 10,000 mappers * 10,000 shuffle partitions). Maybe 10^7 or 2*10^7 would be good?
}
}
} else {
val parallelism = conf.getInt("spark.adaptive.map.statistics.cores", 8)
How is this related to adaptive?
I thought only adaptive execution code would call this. But actually it seems this will be called after all ShuffleMapTasks of a stage complete (which is common), right?
.createWithDefault(10000000)

private[spark] val SHUFFLE_MAP_OUTPUT_STATISTICS_CORES =
  ConfigBuilder("spark.shuffle.mapOutputStatistics.cores")
nit: cores -> parallelism
for (i <- 0 until totalSizes.length) {
totalSizes(i) += s.getSizeForBlock(i)
if (statuses.length * totalSizes.length <=
conf.get(SHUFFLE_MAP_OUTPUT_STATISTICS_PARALLEL_AGGREGATION_THRESHOLD)) {
nit:
val parallelAggThreshold = ...
if (statuses.length * totalSizes.length < parallelAggThreshold)
def equallyDivide(num: Int, divisor: Int): Iterator[Seq[Int]] = {
assert(divisor > 0, "Divisor should be positive")
val (each, remain) = (num / divisor, num % divisor)
val (smaller, bigger) = (0 until num).splitAt((divisor-remain) * each)
Can you add some comment to describe the algorithm? I'd expect something like:
to equally divide n elements into m buckets,
each bucket should have n/m elements;
for the remaining n%m elements,
pick the first n%m buckets and add one more element to each.
Sure : )
My proposal:
def equallyDivide(numElements: Int, numBuckets: Int): Iterator[Seq[Int]] = {
  val elementsPerBucket = numElements / numBuckets
  val remaining = numElements % numBuckets
  if (remaining == 0) {
    (0 until numElements).grouped(elementsPerBucket)
  } else {
    val splitPoint = (elementsPerBucket + 1) * remaining
    (0 until splitPoint).grouped(elementsPerBucket + 1) ++
      (splitPoint until numElements).grouped(elementsPerBucket)
  }
}
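For instance, with the sketch above, dividing 10 elements into 3 buckets yields one bucket of 4 and two buckets of 3:

equallyDivide(10, 3).toList
// -> List(Vector(0, 1, 2, 3), Vector(4, 5, 6), Vector(7, 8, 9))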
}
} else {
val parallelism = conf.get(SHUFFLE_MAP_OUTPUT_STATISTICS_PARALLELISM)
val threadPool = ThreadUtils.newDaemonFixedThreadPool(parallelism, "map-output-statistics")
Please put threadPool.shutdown in finally to shut down the thread pool.
}
}
} else {
val parallelism = conf.get(SHUFFLE_MAP_OUTPUT_STATISTICS_PARALLELISM)
How about setting parallelism = math.min(Runtime.getRuntime.availableProcessors(), statuses.length.toLong * totalSizes.length / parallelAggThreshold) rather than introducing a new config, such as:
val parallelism = math.min(
  Runtime.getRuntime.availableProcessors(),
  statuses.length.toLong * totalSizes.length / parallelAggThreshold + 1)
if (parallelism <= 1) {
  ...
} else {
  ...
}
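To illustrate with made-up numbers: with 20,000 mappers, 20,000 shuffle partitions, and a threshold of 10^7, the formula gives 20,000 * 20,000 / 10^7 + 1 = 41, so parallelism is capped at the number of available processors; with 1,000 mappers and 1,000 partitions it gives 10^6 / 10^7 + 1 = 1, and the single-threaded path is used.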
0.until(numElements).grouped(elementsPerBucket)
} else {
val splitPoint = (elementsPerBucket + 1) * remaining
0.to(splitPoint).grouped(elementsPerBucket + 1) ++
grouped is expensive here. I saw it generates Vector rather than Range:
scala> (1 to 100).grouped(10).foreach(g => println(g.getClass))
class scala.collection.immutable.Vector
class scala.collection.immutable.Vector
class scala.collection.immutable.Vector
class scala.collection.immutable.Vector
class scala.collection.immutable.Vector
class scala.collection.immutable.Vector
class scala.collection.immutable.Vector
class scala.collection.immutable.Vector
class scala.collection.immutable.Vector
class scala.collection.immutable.Vector
It means we need to generate all of the numbers between 0 and numElements. Could you implement a special grouped for Range instead?
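One possible shape for such a helper, sketched here for step-1 ranges (hypothetical, not the code that ended up being merged):

// Slice a Range into consecutive sub-Ranges of at most groupSize elements,
// so each group stays a lightweight Range instead of a materialized Vector.
def groupedRange(range: Range, groupSize: Int): Iterator[Range] = {
  require(groupSize > 0, "groupSize must be positive")
  Iterator.range(range.start, range.end, groupSize).map { start =>
    start until math.min(start + groupSize, range.end)
  }
}

groupedRange(0 until 100, 10).foreach(g => println(g.getClass))
// each group prints as a Range class rather than Vector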
}
ThreadUtils.awaitResult(Future.sequence(mapStatusSubmitTasks), Duration.Inf)
} finally {
threadpool.shutdown()
cc @zsxwing Do we really need to shut down the thread pool every time? This method may be called many times; would it be better to cache this thread pool, like the dispatcher thread pool?
I agree with you. By putting the thread pool in the class, the only loss is that the pool still exists even when the single-threaded path is used. The gain is that we avoid recreating the pool after every shuffle.
We could shut down the pool after a certain idle time, but I'm not sure it's worth the complexity.
I'm fine with creating a thread pool every time, since this code path doesn't seem to run very frequently, because:
- Using a shared cached thread pool is just like creating a new thread pool, since the idle timeout of a thread is pretty large and the thread is likely killed before the next call.
- Using a shared fixed thread pool is a total waste for most use cases.
- The cost of creating threads is trivial compared to the total time of a job.
@gczsjdy could you fix the compile error?
@zsxwing Actually I built using sbt/mvn, no errors...
@gczsjdy Oh, sorry. I didn't realize there is already a threadpool field in MapOutputTrackerMaster. That's why there is no error. Here you are shutting down the wrong thread pool.
ah good catch! I misread it...
My fault!
@cloud-fan "We can shut down the pool after some certain idle time, but not sure if it's worth the complexity" - I know we don't need to do this now, but if we did, how would we do it?
OK to test
Yeah, that's just what the cached thread pool does :)
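For what it's worth, that idle-timeout behaviour is exactly what a cached thread pool provides (an illustrative note, not part of this PR):

import java.util.concurrent.Executors

// Threads are created on demand, and any thread that has been idle for
// 60 seconds is terminated, so "shut down after some idle time" comes for free.
val pool = Executors.newCachedThreadPool()

Spark's ThreadUtils also offers daemon-thread variants of this kind of pool.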
SHUFFLE_MAP_OUTPUT_PARALLEL_AGGREGATION_THRESHOLD)
val parallelism = math.min(
  Runtime.getRuntime.availableProcessors(),
  statuses.length * totalSizes.length / parallelAggThreshold + 1)
Use statuses.length.toLong. It's easy to overflow here.
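As a concrete illustration: 100,000 mappers * 100,000 shuffle partitions is 10^10, well above Int.MaxValue (about 2.1 * 10^9), so the Int multiplication would silently wrap around before the division.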
@cloud-fan Seems Jenkins hasn't started?
retest this please
LGTM
Test build #84134 has finished for PR 19763 at commit
ConfigBuilder("spark.shuffle.mapOutput.parallelAggregationThreshold") | ||
.internal() | ||
.doc("Multi-thread is used when the number of mappers * shuffle partitions is greater than " + | ||
"or equal to this threshold.") |
Is this condition to enable parallel aggregation still true?
Sorry, I didn't get you.
Looks like this parallel aggregation is only enabled when parallelism >= 2. Is that equivalent to the number of mappers * shuffle partitions >= this threshold?
From the above statuses.length.toLong * totalSizes.length / parallelAggThreshold + 1, it looks like we need at least two times this threshold to enable parallel aggregation?
statuses.length.toLong * totalSizes.length / parallelAggThreshold + 1 >= 2 -> statuses.length.toLong * totalSizes.length >= parallelAggThreshold, so it doesn't need to be 2 times the threshold; anything not smaller than 1x is enough.
Do you think it's necessary to describe how the actual parallelism is calculated here?
It's ok. I misread the equation. Nvm.
I don't think we need to describe the calculation in the config description. The current one is enough.
After rethinking this, I think it is better to indicate that this threshold also determines the number of threads used for parallelism, so it should not be set to zero or a negative number.
Yeah, I will add some.
}
}
} else {
val threadPool = ThreadUtils.newDaemonFixedThreadPool(parallelism, "map-output-aggregate")
The value of parallelism seems to keep us from fully utilizing all processors at all times? E.g., if availableProcessors returns 8 but parallelism is 2, we pick 2 as the number of threads.
I think we don't need to fully utilize all available processors. parallelAggThreshold defaults to 10^7, which means each slice is a relatively small task, so the tasks don't need to be cut smaller in most cases.
For cases where each slice is a big task, parallelAggThreshold should be tuned. This is less direct because there is no xx.parallelism config to set, but the benefit is that we introduce fewer configs.
LGTM
for (i <- 0 until totalSizes.length) {
totalSizes(i) += s.getSizeForBlock(i)
val parallelAggThreshold = conf.get(
  SHUFFLE_MAP_OUTPUT_PARALLEL_AGGREGATION_THRESHOLD)
Maybe a little picky, but should we do:
val parallelAggThreshold = conf.get(
  SHUFFLE_MAP_OUTPUT_PARALLEL_AGGREGATION_THRESHOLD) + 1
...
val parallelism = math.min(
  Runtime.getRuntime.availableProcessors(),
  (statuses.length.toLong * totalSizes.length + 1) / parallelAggThreshold + 1).toInt
in case the threshold is set to zero?
For zero or negative threshold, see my above comment: #19763 (comment).
I think that code will confuse people, and we would need more comments to explain it, which seems unworthy.
In most cases the default value is enough, so just adding a value check and an explanation in the docs should be good?
Yeah, I left that comment before #19763 (comment). I think it is good enough to add more explanation to the config entry.
retest this please
Test build #84161 has finished for PR 19763 at commit
Thanks, merging to master!
What changes were proposed in this pull request?
In adaptive execution, the map output statistics of all mappers are aggregated after the previous stage has successfully executed. The driver performs this aggregation, and it gets slow when the number of mappers * shuffle partitions is large, since it uses only a single thread to compute. This PR uses multiple threads to deal with this single-point bottleneck.

How was this patch tested?
Test cases are in MapOutputTrackerSuite.scala.
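For reference, a simplified sketch of the approach discussed above. This is an illustrative reconstruction, not the exact merged code: MapStatus and the equallyDivide helper come from the PR diff quoted earlier, the threshold is passed in as a plain parameter, and aggregateSizes is a hypothetical wrapper name.

import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration.Duration
import org.apache.spark.scheduler.MapStatus
import org.apache.spark.util.ThreadUtils

def aggregateSizes(statuses: Array[MapStatus], numPartitions: Int, threshold: Long): Array[Long] = {
  val totalSizes = new Array[Long](numPartitions)
  if (statuses.length.toLong * numPartitions < threshold) {
    // Small input: aggregate on the calling thread, as before this PR.
    for (s <- statuses; i <- 0 until numPartitions) {
      totalSizes(i) += s.getSizeForBlock(i)
    }
  } else {
    // Large input: split the reduce ids into roughly equal slices and aggregate them in parallel.
    val parallelism = math.min(
      Runtime.getRuntime.availableProcessors(),
      statuses.length.toLong * numPartitions / threshold + 1).toInt
    val threadPool = ThreadUtils.newDaemonFixedThreadPool(parallelism, "map-output-aggregate")
    try {
      implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(threadPool)
      val futures = equallyDivide(numPartitions, parallelism).toSeq.map { reduceIds =>
        Future {
          // Each slice touches a disjoint set of indices, so no synchronization is needed.
          for (s <- statuses; i <- reduceIds) {
            totalSizes(i) += s.getSizeForBlock(i)
          }
        }
      }
      ThreadUtils.awaitResult(Future.sequence(futures), Duration.Inf)
    } finally {
      threadPool.shutdown()
    }
  }
  totalSizes
}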