[SPARK-31182][CORE][ML] PairRDD support aggregateByKeyWithinPartitions #27947
zhengruifeng wants to merge 2 commits into apache:master
Conversation
* @note V and C can be different -- for example, one might group an RDD of type
* (Int, Int) into an RDD of type (Int, Seq[Int]).
*/
def combineByKeyWithClassTagWithinPartitions[C](
this impl follows combineByKeyWithClassTag (treating it as if self.partitioner == Some(partitioner)):
def combineByKeyWithClassTag[C](
createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C,
partitioner: Partitioner,
mapSideCombine: Boolean = true,
serializer: Serializer = null)(implicit ct: ClassTag[C]): RDD[(K, C)] = self.withScope {
require(mergeCombiners != null, "mergeCombiners must be defined") // required as of Spark 0.9.0
if (keyClass.isArray) {
if (mapSideCombine) {
throw new SparkException("Cannot use map-side combining with array keys.")
}
if (partitioner.isInstanceOf[HashPartitioner]) {
throw new SparkException("HashPartitioner cannot partition array keys.")
}
}
val aggregator = new Aggregator[K, V, C](
self.context.clean(createCombiner),
self.context.clean(mergeValue),
self.context.clean(mergeCombiners))
if (self.partitioner == Some(partitioner)) {
self.mapPartitions(iter => {
val context = TaskContext.get()
new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
}, preservesPartitioning = true)
} else {
new ShuffledRDD[K, V, C](self, partitioner)
.setSerializer(serializer)
.setAggregator(aggregator)
.setMapSideCombine(mapSideCombine)
}
}

}.reduceByKey { case (s1, s2) => s1.compress.merge(s2.compress) }
).mapPartitionsWithIndex { case (pid, iter) =>
  val p = pid % scale
  iter.map { case (col, s) => ((p, col), s.compress) }
here we can trigger compression at the map side
).mapPartitionsWithIndex { case (pid, iter) =>
  val p = pid % scale
  iter.map { case (col, s) => ((p, col), s.compress) }
}.reduceByKey { case (s1, s2) => s1.merge(s2) }
then we no longer need to force compression in the reducer, as in s1.compress.merge(s2.compress)
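For readers following along, here is a minimal sketch of what such a within-partitions variant could look like, assuming it simply keeps the mapPartitions branch of the combineByKeyWithClassTag code quoted above unconditionally (a sketch under that assumption, not necessarily the exact patch):

// Sketch only: unconditionally applies the self.partitioner == Some(partitioner)
// branch of combineByKeyWithClassTag, so values are combined per key inside
// each partition and no shuffle is performed. Parameters tied to shuffling
// (partitioner, serializer, mapSideCombine) are dropped since nothing moves.
def combineByKeyWithClassTagWithinPartitions[C](
    createCombiner: V => C,
    mergeValue: (C, V) => C,
    mergeCombiners: (C, C) => C)(implicit ct: ClassTag[C]): RDD[(K, C)] = self.withScope {
  require(mergeCombiners != null, "mergeCombiners must be defined")
  val aggregator = new Aggregator[K, V, C](
    self.context.clean(createCombiner),
    self.context.clean(mergeValue),
    self.context.clean(mergeCombiners))
  self.mapPartitions(iter => {
    val context = TaskContext.get()
    new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
  }, preservesPartitioning = true)
}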
Test build #119982 has finished for PR 27947 at commit

friendly ping @srowen
Naive question from a newbie: you are introducing new methods for PairRDDs, so would it make sense to also expose them in the Java API (as methods of JavaPairRDD)?

Well, I think that's part of the issue here - if it's public you kind of need to support it everywhere and for a long time. I don't know if it's worth it, but I've lost the thread on this PR and would have to recall the motivation.

I tend to close it, since I can always work around it. Maybe it is not necessary.
What changes were proposed in this pull request?
1, impl aggregateByKeyWithinPartitions and reduceByKeyWithinPartitions (a signature sketch follows below)
2, use aggregateByKeyWithinPartitions in RobustScaler
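Plausible signatures for the two new methods, patterned on the existing aggregateByKey and reduceByKey (these exact signatures are an assumption, not necessarily what the PR merges):

// Combine values per key within each partition only; no shuffle is performed,
// so a key that appears in several partitions yields several output records.
def aggregateByKeyWithinPartitions[U: ClassTag](zeroValue: U)(
    seqOp: (U, V) => U,
    combOp: (U, U) => U): RDD[(K, U)]

def reduceByKeyWithinPartitions(func: (V, V) => V): RDD[(K, V)]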
Why are the changes needed?
When implementing RobustScaler, I was looking for a way to guarantee that the QuantileSummaries in aggregateByKey are compressed at the map side (before merge and query, the QuantileSummaries must be compressed). I only found a tricky method to work around this (not applied in the end), and there was no method for it.
previous discussions were here
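To make the motivation concrete, here is a minimal usage sketch. The aggregateByKeyWithinPartitions signature is an assumption patterned on aggregateByKey, and columnSummaries/relativeError are illustrative names, not the actual RobustScaler code; QuantileSummaries is the catalyst helper RobustScaler builds on.

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.util.QuantileSummaries

// Build one QuantileSummaries per column index, with guaranteed map-side compression.
def columnSummaries(
    values: RDD[(Int, Double)],  // (column index, value)
    relativeError: Double): RDD[(Int, QuantileSummaries)] = {
  values
    // assumed API: aggregate per key within each partition, no shuffle
    .aggregateByKeyWithinPartitions(
      new QuantileSummaries(QuantileSummaries.defaultCompressThreshold, relativeError))(
      (s, v) => s.insert(v),
      (s1, s2) => s1.compress.merge(s2.compress))
    // compress exactly once, at the map side, before the only shuffle
    .mapValues(_.compress)
    // compressed summaries can be merged directly: no s1.compress.merge(s2.compress)
    .reduceByKey((s1, s2) => s1.merge(s2))
}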
Does this PR introduce any user-facing change?
Yes, it adds new methods for PairRDD.
How was this patch tested?
added test suites and existing ones
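A hypothetical shape for such a test (assumed fixture and API, not necessarily the PR's actual suite), relying on the fact that a within-partitions reduction must not merge values across partition boundaries:

test("reduceByKeyWithinPartitions does not shuffle") {
  // with 2 slices, parallelize splits positionally:
  // partition 0 holds (1, 10); partition 1 holds (1, 20), (2, 5)
  val rdd = sc.parallelize(Seq((1, 10), (1, 20), (2, 5)), numSlices = 2)
  val reduced = rdd.reduceByKeyWithinPartitions(_ + _)
  // key 1 appears in both partitions, so it stays as two separate records
  assert(reduced.count() === 3)
  // a plain reduceByKey would shuffle and produce only 2 records
  assert(rdd.reduceByKey(_ + _).count() === 2)
}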