Skip to content

Commit

Permalink
Merge pull request #3 from danking/tp_partitioning_2
Browse files Browse the repository at this point in the history
slightly simplify KeySortIterators
  • Loading branch information
tpoterba committed Aug 12, 2016
2 parents 384b492 + fc52a8c commit f4051f7
Showing 1 changed file with 19 additions and 56 deletions.
75 changes: 19 additions & 56 deletions src/main/scala/org/broadinstitute/hail/Utils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -589,65 +589,28 @@ class RichPairTraversableOnce[K, V](val t: TraversableOnce[(K, V)]) extends AnyV
}
}


/**
 * Iterator adapter that K-sorts every run of T-equivalent elements of `it`.
 *
 * Elements are pulled from `it` one at a time and collected in `b` for as long as their keys map
 * (through `ev`) to the same `T`. When a run ends -- because a key with a different `T` arrives or
 * because `it` is exhausted -- the run is sorted by `K` and served from `takingSortedIterator`.
 * A run of exactly one element is returned directly, without sorting.
 *
 * @param it   underlying iterator; the `T` values of consecutive keys must be non-decreasing
 * @param tOrd ordering used to compare the `T` values of consecutive keys
 * @param kOrd ordering used to sort the keys inside one run
 * @param ev   maps a key to the `T` value that delimits runs
 */
class KeySortIterator[T, K, V](it: Iterator[(K, V)])(implicit tOrd: Ordering[T], kOrd: Ordering[K],
  ev: (K) => T) extends Iterator[(K, V)] {

  // Elements of the run that is currently being read; their keys are all T-equivalent.
  val b = mutable.ArrayBuffer[(K, V)]()

  // K-sorted elements of an already completed run that have not been returned yet.
  var takingSortedIterator: Iterator[(K, V)] = Iterator()

  override def hasNext: Boolean = takingSortedIterator.nonEmpty || b.nonEmpty || it.nonEmpty

  /**
   * Returns the next element in run-wise K-sorted order.
   *
   * Fails an assertion if called when no element is left, or if a key maps to a smaller `T` than
   * the key before it.
   */
  def next(): (K, V) = advance()

  // The state machine lives in a private method so that the compiler can turn the self-calls into
  // a loop; as self-calls of the overridable `next` they would use one stack frame per element of
  // a run and could overflow the stack on long runs.
  @scala.annotation.tailrec
  private def advance(): (K, V) = {
    if (takingSortedIterator.hasNext)
      takingSortedIterator.next()
    else if (it.isEmpty) {
      // Input exhausted: whatever is buffered is the final run.
      assert(b.nonEmpty)
      if (b.size == 1) {
        val toReturn = b.head
        b.clear()
        toReturn
      } else {
        // sortBy copies, so clearing the buffer does not affect the sorted run.
        takingSortedIterator = b.sortBy(_._1).iterator
        b.clear()
        advance()
      }
    } else if (b.isEmpty) {
      // Start a new run with the next input element.
      b += it.next()
      advance()
    } else {
      val kv = it.next()
      val t = ev(kv._1)
      val lastT = ev(b.last._1)
      // The check is on the T order of consecutive keys (the message text is kept as is).
      assert(tOrd.gteq(t, lastT), "iterator not K-sorted")
      if (tOrd.equiv(t, lastT)) {
        // Same run: keep collecting.
        b += kv
        advance()
      } else if (b.size == 1) {
        // common case: a run of one element needs no sorting; the new element starts the next run
        val last = b(0)
        b(0) = kv
        last
      } else {
        // The buffered run is complete: sort it and start the next run with the new element.
        takingSortedIterator = b.sortBy(_._1).iterator
        b.clear()
        b += kv
        advance()
      }
    }
  }
}


class RichPairIterator[K, V](val it: Iterator[(K, V)]) extends AnyVal {

/**
 * Returns the elements of `it` with every run of T-equivalent keys sorted by `K`, by wrapping
 * `it` in a [[KeySortIterator]].
 *
 * @param ord  ordering on the `T` values derived from the keys
 * @param kOrd ordering used to sort the keys inside one run
 * @param ev   maps a key to the `T` value that delimits runs
 */
def localKeySort[T](implicit ord: Ordering[T], kOrd: Ordering[K], ev: (K) => T): Iterator[(K, V)] = {
new KeySortIterator[T, K, V](it)
}
lazyKSorted(it)
}

/**
 * Lazily K-sorts each block of T-equivalent elements of `it`.
 *
 * Precondition: the iterator is T-sorted. Moreover, ev must be monotonic.
 *
 * A block is read from `it` and sorted only when the previous block has been fully consumed, so
 * at most one block is held in memory at a time. Nothing is read from `it` until the result is
 * first queried.
 *
 * @param it   iterator of key-value pairs, T-sorted under `ev`
 * @param tOrd ordering used to decide which consecutive keys are T-equivalent
 * @param kOrd ordering used to sort the keys inside one block
 * @param ev   maps a key to the `T` value that delimits blocks
 * @return every element of `it`, with each block of T-equivalent keys ordered by `kOrd`
 */
private def lazyKSorted[T](it: Iterator[(K, V)])
  (implicit tOrd: Ordering[T], kOrd: Ordering[K], ev: (K) => T): Iterator[(K, V)] = {
  // Peek at keys through a buffered iterator: calling `next` just to learn a block's T value
  // would consume the block's first element and drop it from the output.
  val source = it.buffered

  // One block per step, built iteratively so that neither the number of blocks nor their size
  // adds to the stack depth.
  Iterator.continually(source).takeWhile(_.hasNext).flatMap { src =>
    val t = ev(src.head._1)
    val block = Vector.newBuilder[(K, V)]
    while (src.hasNext && tOrd.equiv(t, ev(src.head._1)))
      block += src.next()
    block.result().sortBy(_._1).iterator
  }
}

def sortedLeftJoinDistinct[W](other: Iterator[(K, W)])
(implicit ordering: Ordering[K]): Iterator[(K, (V, Option[W]))] = {
Expand Down

0 comments on commit f4051f7

Please sign in to comment.