SPARK-3461. Support external groupByKey using repartitionAndSortWithinPartitions #3198

Closed · wants to merge 1 commit
core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala (97 additions, 3 deletions)
@@ -19,7 +19,7 @@ package org.apache.spark.rdd

import scala.reflect.ClassTag

-import org.apache.spark.{Logging, Partitioner, RangePartitioner}
+import org.apache.spark.{HashPartitioner, Logging, Partitioner, RangePartitioner}
import org.apache.spark.annotation.DeveloperApi

/**
@@ -58,8 +58,7 @@ class OrderedRDDFunctions[K : Ordering : ClassTag,
*/
// TODO: this currently doesn't work on P other than Tuple2!
def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.size)
-    : RDD[(K, V)] =
-{
+    : RDD[(K, V)] = {
val part = new RangePartitioner(numPartitions, self, ascending)
new ShuffledRDD[K, V, V](self, part)
.setKeyOrdering(if (ascending) ordering else ordering.reverse)
@@ -76,4 +75,99 @@ class OrderedRDDFunctions[K : Ordering : ClassTag,
new ShuffledRDD[K, V, V](self, partitioner).setKeyOrdering(ordering)
}

/**
* Group the values for each key in the RDD into a single sequence, and, within each partition,
* return the keys in sorted order. This replicates the semantics of the Hadoop MapReduce shuffle
* and avoids pulling all the records in a group into memory at once.
*
* The resulting key-value pair RDD is partitioned with the default partitioner (see
* Partitioner.defaultPartitioner).
*
* Note: When iterating over the resulting records, the iterator for a group is rendered invalid
* after moving on to the next group.
*/
def groupByKeyAndSortWithinPartitions(): RDD[(K, Iterable[V])] = {
groupByKeyAndSortWithinPartitions(Partitioner.defaultPartitioner(self))
}

/**
* Group the values for each key in the RDD into a single sequence, and, within each partition,
* return the keys in sorted order. This replicates the semantics of the Hadoop MapReduce shuffle
* and avoids pulling all the records in a group into memory at once.
*
* Allows controlling the number of partitions of the resulting key-value pair RDD; records are
* hash-partitioned into numPartitions partitions.
*
* Note: When iterating over the resulting records, the iterator for a group is rendered invalid
* after moving on to the next group.
*/
def groupByKeyAndSortWithinPartitions(numPartitions: Int): RDD[(K, Iterable[V])] = {
groupByKeyAndSortWithinPartitions(new HashPartitioner(numPartitions))
}

/**
* Group the values for each key in the RDD into a single sequence, and, within each partition,
* return the keys in sorted order. This replicates the semantics of the Hadoop MapReduce shuffle
* and avoids pulling all the records in a group into memory at once.
*
* Allows controlling the partitioning of the resulting key-value pair RDD by passing a
* Partitioner.
*
* Note: When iterating over the resulting records, the iterator for a group is rendered invalid
* after moving on to the next group.
*/
def groupByKeyAndSortWithinPartitions(partitioner: Partitioner): RDD[(K, Iterable[V])] = {
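// repartitionAndSortWithinPartitions makes all records with the same key contiguous
// within each partition, so groups can be streamed one key at a time without ever
// materializing a whole group's values in memory.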
val shuffled = repartitionAndSortWithinPartitions(partitioner)
shuffled.mapPartitions(iter => {
new Iterator[(K, Iterable[V])] with Serializable {
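// cur always holds the next unconsumed record, or null once the underlying
// iterator is exhausted; curIter is the value iterator for the group most
// recently handed out by next().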
var cur: (K, V) = if (iter.hasNext) iter.next() else null
var curIter: Iterator[V] = null

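// Iterates over a single key's values; becomes invalid as soon as the outer
// iterator moves past that key.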
private def groupIterator(key: K) = new Iterator[V] with Serializable {
def next(): V = {
if (cur == null) {
throw new NoSuchElementException("Next on empty iterator.")
}
if (cur._1 != key) {
throw new IllegalStateException(
"Can't use group iterator after moving to next key.")
}

val ret = cur
cur = if (iter.hasNext) iter.next() else null
ret._2
}

def hasNext(): Boolean = cur != null && cur._1 == key
}

/** Drain any remaining values of the current group so that cur lands on the next key. */
private def drainCur() {
if (curIter != null) {
while (curIter.hasNext) {
curIter.next()
}
}
}

def hasNext(): Boolean = {
drainCur()
cur != null
}

def next(): (K, Iterable[V]) = {
drainCur()
if (cur == null) {
throw new NoSuchElementException("Next on empty iterator.")
}
curIter = groupIterator(cur._1)
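// The returned Iterable is single-pass: it always hands back the same
// underlying iterator, which is tied to the current position in the shuffle.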
(cur._1, new Iterable[V] with Serializable {
val iter = curIter
def iterator = iter
})
}
}
})
}

}
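
For reference, a minimal usage sketch of the new API (not part of this PR; it assumes an existing SparkContext named sc, and the sample data and variable names are illustrative):

import org.apache.spark.HashPartitioner
import org.apache.spark.SparkContext._ // brings the OrderedRDDFunctions implicits into scope

// Hypothetical (word, count) records; occurrences of a key may start out anywhere.
val records = sc.parallelize(Seq(("b", 2), ("a", 1), ("b", 3), ("a", 4)))

// Within each partition, keys come back in sorted order, and each group's values
// are a single-pass Iterable backed directly by the sorted shuffle iterator.
val grouped = records.groupByKeyAndSortWithinPartitions(new HashPartitioner(2))

grouped.foreachPartition { part =>
  part.foreach { case (key, values) =>
    // Consume values before advancing: a group's iterator is invalidated once
    // the outer iterator moves on to the next key.
    println(s"$key -> ${values.mkString(", ")}")
  }
}

Unlike groupByKey, the values of a group are never materialized as an in-memory collection; they are streamed from the sorted shuffle output, which is what makes the grouping external.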