[SPARK-22098][CORE] Add new method aggregateByKeyLocally in RDD #19317
ConeyLiu wants to merge 4 commits into apache:master
Conversation
|
cc @VinceShieh |
|
Can one of the admins verify this patch? |
|
cc @WeichenXu123, mind taking a look? |
WeichenXu123 left a comment:
The current implementation of treeAggregate already aggregates locally on each partition and then sends the results to the driver. So I don't understand the difference between your implementation and the current one, or why yours is faster. Can you verify the perf on more datasets and post the results?
|
And I have to point out that your impl has a high risk of causing OOM. The current impl will auto-spill when the local hashmap gets too large and can take advantage of Spark's unified memory management mechanism. |
|
Nice catch, thanks. The perf gain is indeed quite narrow. |
|
Yes. I guess the perf gain is because this PR uses a local hashmap that can use unlimited memory, whereas the current Spark aggregation impl will auto-spill the local hashmap when it exceeds a threshold. |
|
@jiangxb1987, @WeichenXu123, thanks for your review. This change is inspired by the following TODO in NaiveBayes:

// TODO: Calling aggregateByKey and collect creates two stages, we can implement something
// TODO: similar to reduceByKeyLocally to save one stage.
val aggregated = dataset.select(col($(labelCol)), w, col($(featuresCol))).rdd
  .map { row => (row.getDouble(0), (row.getDouble(1), row.getAs[Vector](2)))
  }.aggregateByKey[(Double, DenseVector)]((0.0, Vectors.zeros(numFeatures).toDense))(
  seqOp = {
    case ((weightSum: Double, featureSum: DenseVector), (weight, features)) =>
      requireValues(features)
      BLAS.axpy(weight, features, featureSum)
      (weightSum + weight, featureSum)
  },
  combOp = {
    case ((weightSum1, featureSum1), (weightSum2, featureSum2)) =>
      BLAS.axpy(1.0, featureSum2, featureSum1)
      (weightSum1 + weightSum2, featureSum1)
  }).collect().sortBy(_._1)
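
For context, here is a minimal sketch of the semantics such a helper would have, modeled on the existing reduceByKeyLocally: aggregate each partition into a local hashmap, then merge the per-partition maps. This is only an illustration written as a standalone function, not the PR's actual diff; the method name is taken from the PR, the body is an assumption:

import scala.collection.mutable
import org.apache.spark.rdd.RDD

// Sketch only: aggregate each partition into a local hashmap, then merge the
// per-partition maps (here via reduce) instead of doing a shuffle.
def aggregateByKeyLocally[K, V, U](rdd: RDD[(K, V)], zeroValue: => U)(
    seqOp: (U, V) => U, combOp: (U, U) => U): mutable.Map[K, U] = {
  rdd.mapPartitions { iter =>
    val map = mutable.HashMap.empty[K, U]
    iter.foreach { case (k, v) =>
      // zeroValue is by-name so every key starts from a fresh zero,
      // which matters when the zero is mutable (e.g. a DenseVector).
      map.update(k, seqOp(map.getOrElse(k, zeroValue), v))
    }
    Iterator(map)
  }.reduce { (m1, m2) =>
    m2.foreach { case (k, u) =>
      m1.update(k, m1.get(k).map(combOp(_, u)).getOrElse(u))
    }
    m1
  }
}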
|
|
Oh I get your point. This is different from |
|
Does not |
|
@ConeyLiu Yes, tree aggregate introduces an extra shuffle. But it can still improve perf when the total data size the driver collects from the executors is large and there are many partitions. |
|
OK, let's just keep it. Does this need more tests or further improvements? |
|
It is better to add more perf tests for |
|
Test case:

// imports assumed by the snippet
import scala.collection.mutable.HashSet
import scala.util.Random
import org.apache.spark.storage.StorageLevel

test("performance of aggregateByKeyLocally") {
  val random = new Random(1)
  val pairs = sc.parallelize(0 until 10000000, 20)
    .map(p => (random.nextInt(100), p))
    .persist(StorageLevel.MEMORY_ONLY)
  pairs.count()
  val start = System.currentTimeMillis()
  // val jHashMap = pairs.aggregateByKeyLocallyWithJHashMap(new HashSet[Int]())(_ += _, _ ++= _).toArray
  val openHashMap = pairs.aggregateByKeyLocally(new HashSet[Int]())(_ += _, _ ++= _).toArray
  println(System.currentTimeMillis() - start)
}

Test result:
Performance looks almost the same. |
|
Hi @WeichenXu123, any comments on this? |
|
|
Inline review on the scaladoc of the new method:

/**
 * Aggregate the values of each key, using given combine functions and a neutral "zero value".
 * This function can return a different result type, U, than the type of the values in this RDD,
|
doesn't aggregateByKey perform map side combine?
|
Yeah, it will. Here the 'difference' means it directly returns a map to the driver rather than an RDD.
|
what's the difference between aggregateByKeyLocally and aggregateByKey(...).toLocalIterator?
|
aggregateByKey(...).toLocalIterator needs a shuffle for aggregateByKey and then collects the RDD to the driver as an iterator. aggregateByKeyLocally looks like aggregateByKey, but there isn't a shuffle: it runs the combines in each task, collects all the maps directly to the driver, and does the final combine on the driver.
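
To make the comparison concrete, a hedged sketch of the two call patterns, assuming pairs is an RDD[(Int, Int)] and that the proposed aggregateByKeyLocally returns a Map on the driver (mirroring reduceByKeyLocally):

// Existing path: shuffle-based aggregation, then stream the result to the driver.
val viaShuffle: Iterator[(Int, Int)] =
  pairs.aggregateByKey(0)(_ + _, _ + _).toLocalIterator

// Proposed path: per-partition hashmaps merged on the driver, no shuffle stage.
val viaLocalAgg: scala.collection.Map[Int, Int] =
  pairs.aggregateByKeyLocally(0)(_ + _, _ + _)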
|
"collects all the maps directly to the driver"
Technically it's a shuffle too, and generally aggregateByKey is better for the following reasons:
- The final combine is executed on multiple reducers, which gives better parallelism than doing it on the driver.
- We should always prefer doing computation on executors instead of the driver, because the driver is responsible for scheduling and has a high cost of failure recovery.
- Using the Spark shuffle is better for fault tolerance: if one reducer fails, you don't need to rerun all the mappers.
So I'm -1 on this API. If there are special cases where we want to aggregate locally, just call RDD.map.collect and do the local aggregation manually.
|
BTW, collecting all the maps directly to the driver may easily OOM the driver, while shuffling to multiple reducers reduces the memory pressure. Even shuffling to only one reducer is still better, as executors usually have more memory than the driver.
|
Thanks for the advice, I'll close this and try mapPartitions(...).collect in NaiveBayes.
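
For reference, a hedged sketch of what that mapPartitions(...).collect rewrite could look like for the NaiveBayes aggregation quoted earlier, reusing the same BLAS/Vectors helpers as that snippet. The names points and numFeatures are assumptions for illustration, not the actual change made to NaiveBayes:

// Sketch only: pre-aggregate per partition, then merge the partial sums on the driver.
// points: RDD[(Double, (Double, Vector))] holding (label, (weight, features)) tuples.
val partials = points.mapPartitions { iter =>
  val agg = scala.collection.mutable.HashMap.empty[Double, (Double, DenseVector)]
  iter.foreach { case (label, (weight, features)) =>
    val (weightSum, featureSum) =
      agg.getOrElseUpdate(label, (0.0, Vectors.zeros(numFeatures).toDense))
    BLAS.axpy(weight, features, featureSum)
    agg.update(label, (weightSum + weight, featureSum))
  }
  agg.iterator
}.collect()

// Driver-side merge of the per-partition partial sums.
val aggregated = partials.groupBy(_._1).map { case (label, parts) =>
  val featureSum = Vectors.zeros(numFeatures).toDense
  var weightSum = 0.0
  parts.foreach { case (_, (w, fs)) =>
    BLAS.axpy(1.0, fs, featureSum)
    weightSum += w
  }
  (label, (weightSum, featureSum))
}.toArray.sortBy(_._1)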
What changes were proposed in this pull request?
https://issues.apache.org/jira/browse/SPARK-22096
NaiveBayes currently runs aggregateByKey followed by a collect to calculate the frequency for each feature/label. We can implement a new method aggregateByKeyLocally in RDD that merges locally on each mapper before sending the results to the driver, saving one stage.
We tested it on NaiveBayes and saw a ~20% performance gain with these changes.
This is a subtask of our improvement work.
How was this patch tested?
New UT.