SPY-287 updated streaming iterable #6

Merged
merged 1 commit into clearstorydata:master-csd

3 participants

@markhamstra

Pulls in the current state of https://github.com/apache/incubator-spark/pull/421, allowing n-partition jobs instead of only one partition at a time.
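For context, a minimal sketch (not part of this PR) of how the batched iterator is meant to be driven from the driver side; it assumes a live SparkContext `sc`, and `println` stands in for real per-element processing:

```scala
import java.util.concurrent.TimeUnit
import scala.concurrent.duration.Duration

val rdd = sc.parallelize(1 to 1000000, 100)
// Each backing job now fetches partitionBatchSize partitions at once,
// and prefetchPartitions bounds how far ahead the driver buffers.
rdd.toIterator(prefetchPartitions = 4, partitionBatchSize = 2,
    timeOut = Duration(60, TimeUnit.SECONDS))
  .foreach(println)
```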

@tbfenet tbfenet updated streaming iterable
Conflicts:
	core/src/main/scala/org/apache/spark/rdd/RDD.scala
	core/src/main/scala/org/apache/spark/util/RDDiterable.scala
ede65ec
@jhartlaub jhartlaub merged commit 12280b5 into clearstorydata:master-csd
Commits on Feb 21, 2014
  1. @tbfenet @markhamstra

    updated streaming iterable

tbfenet authored and markhamstra committed
    Conflicts:
    	core/src/main/scala/org/apache/spark/rdd/RDD.scala
    	core/src/main/scala/org/apache/spark/util/RDDiterable.scala
16 core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -38,7 +38,7 @@ import org.apache.spark.partial.CountEvaluator
import org.apache.spark.partial.GroupedCountEvaluator
import org.apache.spark.partial.PartialResult
import org.apache.spark.storage.StorageLevel
-import org.apache.spark.util.{RDDiterable, Utils, BoundedPriorityQueue}
+import org.apache.spark.util.{Utils, BoundedPriorityQueue}
import org.apache.spark.SparkContext._
import org.apache.spark._
@@ -576,8 +576,6 @@ abstract class RDD[T: ClassManifest](
sc.runJob(this, (iter: Iterator[T]) => f(iter))
}
-
-
/**
* Return an array that contains all of the elements in this RDD.
*/
@@ -599,14 +597,16 @@ abstract class RDD[T: ClassManifest](
}
/**
- * Return iterable that lazily fetches partitions
- * @param prefetchPartitions How many partitions to prefetch. Larger value increases parallelism but also increases
- * driver memory requirement
+ * Return iterator that lazily fetches partitions
+ * @param prefetchPartitions How many partitions to prefetch. Larger value increases parallelism
+ * but also increases driver memory requirement.
+ * @param partitionBatchSize How many partitions to fetch per job
* @param timeOut how long to wait for each partition fetch
* @return Iterable of every element in this RDD
*/
- def toIterable(prefetchPartitions: Int = 1, timeOut: Duration = Duration(30, TimeUnit.SECONDS)) = {
- new RDDiterable[T](this, prefetchPartitions, timeOut)
+ def toIterator(prefetchPartitions: Int = 1, partitionBatchSize: Int = 10,
+ timeOut: Duration = Duration(30, TimeUnit.SECONDS)): Iterator[T] = {
+ new RDDiterator[T](this, prefetchPartitions, partitionBatchSize, timeOut)
}
/**
76 core/src/main/scala/org/apache/spark/rdd/RDDiterator.scala
@@ -0,0 +1,76 @@
+package org.apache.spark.rdd
+
+import scala.concurrent.{Await, Future}
+import scala.collection.mutable.ArrayBuffer
+import scala.concurrent.duration.Duration
+import scala.annotation.tailrec
+import scala.collection.mutable
+import org.apache.spark.rdd.RDDiterator._
+import org.apache.spark.FutureAction
+
+/**
+ * Iterator over all elements of an RDD that fetches partitions to the driver
+ * process lazily rather than all at once
+ *
+ * @param rdd RDD to iterate
+ * @param prefetchPartitions The number of partitions to prefetch.
+ * If < 1, nothing is prefetched; prefetchPartitions / partitionBatchSize
+ * batch jobs (integer division) are submitted ahead of consumption.
+ * @param partitionBatchSize How many partitions to fetch per job
+ * @param timeOut How long to wait for each partition before failing.
+ */
+class RDDiterator[T: ClassManifest](rdd: RDD[T], prefetchPartitions: Int, partitionBatchSize: Int,
+ timeOut: Duration)
+ extends Iterator[T] {
+
+ val batchSize = math.max(1, partitionBatchSize)
+ val partitionsBatches: Iterator[Seq[Int]] = Range(0, rdd.partitions.size).grouped(batchSize)
+ val pendingFetchesQueue = mutable.Queue.empty[Future[Seq[Seq[T]]]]
+ // Submit the initial prefetch jobs, one per batch of partitions
+ (0 until math.max(0, prefetchPartitions / batchSize)).foreach(_ => enqueueDataFetch())
+
+ var currentIterator: Iterator[T] = Iterator.empty
+ @tailrec
+ final def hasNext = {
+ if (currentIterator.hasNext) {
+ // Still values left in the current partition
+ true
+ } else {
+ // Move on to the next partition:
+ // queue a prefetch for the next batch of partitions
+ enqueueDataFetch()
+ if (pendingFetchesQueue.isEmpty) {
+ // No more partitions
+ currentIterator = Iterator.empty
+ false
+ } else {
+ val future = pendingFetchesQueue.dequeue()
+ currentIterator = Await.result(future, timeOut).flatMap(x => x).iterator
+ // The next partition might be empty, so check again.
+ this.hasNext
+ }
+ }
+ }
+ def next() = {
+ // hasNext advances currentIterator past any empty partitions
+ hasNext
+ currentIterator.next()
+ }
+
+ def enqueueDataFetch() = {
+ if (partitionsBatches.hasNext) {
+ pendingFetchesQueue.enqueue(fetchData(partitionsBatches.next(), rdd))
+ }
+ }
+}
+
+object RDDiterator {
+ private def fetchData[T: ClassManifest](partitionIndexes: Seq[Int],
+ rdd: RDD[T]): FutureAction[Seq[Seq[T]]] = {
+ val results = new ArrayBuffer[Seq[T]]()
+ rdd.context.submitJob[T, Array[T], Seq[Seq[T]]](rdd,
+ x => x.toArray,
+ partitionIndexes,
+ (index: Int, res: Array[T]) => results.append(res),
+ results.toSeq)
+ }
+}
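To make the batching concrete, here is a hedged sketch of the `SparkContext.submitJob` pattern that `fetchData` above relies on: a single job materializes a whole batch of partitions and delivers them through one `FutureAction`. Illustrative only; it assumes a live SparkContext `sc`:

```scala
import java.util.concurrent.TimeUnit
import scala.collection.mutable.ArrayBuffer
import scala.concurrent.Await
import scala.concurrent.duration.Duration

val rdd = sc.parallelize(1 to 1000, 100)
val results = new ArrayBuffer[Seq[Int]]()
val future = sc.submitJob[Int, Array[Int], Seq[Seq[Int]]](
  rdd,
  iter => iter.toArray,                   // runs on the executors
  Seq(0, 1, 2),                           // one job covers this whole batch of partitions
  (index, data) => results.append(data),  // driver-side result handler, per partition
  results.toSeq)                          // value the future resolves to
// Blocks until the whole batch has been fetched or the timeout elapses
val batch: Seq[Seq[Int]] = Await.result(future, Duration(30, TimeUnit.SECONDS))
```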
59 core/src/main/scala/org/apache/spark/util/RDDiterable.scala
@@ -1,59 +0,0 @@
-package org.apache.spark.util
-
-import scala.collection.immutable.Queue
-import scala.concurrent.{Await, Future}
-import scala.collection.mutable.ArrayBuffer
-import scala.concurrent.duration.Duration
-import scala.annotation.tailrec
-import org.apache.spark.rdd.RDD
-
-/**Iterable whose iterator iterates over all elements of an RDD without fetching all partitions to the driver process
- *
- * @param rdd RDD to iterate
- * @param prefetchPartitions The number of partitions to prefetch
- * @param timeOut How long to wait for each partition before failing.
- * @tparam T
- */
-class RDDiterable[T: ClassManifest](rdd: RDD[T], prefetchPartitions: Int, timeOut: Duration) extends Serializable with Iterable[T] {
-
- def iterator = new Iterator[T] {
- var partitions = Range(0, rdd.partitions.size)
- var pendingFetches = Queue.empty.enqueue(partitions.take(prefetchPartitions).map(par => fetchData(par)))
- partitions = partitions.drop(prefetchPartitions)
- var currentIterator: Iterator[T] = Iterator.empty
- @tailrec
- def hasNext() = {
- if (currentIterator.hasNext) {
- true
- } else {
- pendingFetches = partitions.headOption.map {
- partitionNo =>
- pendingFetches.enqueue(fetchData(partitionNo))
- }.getOrElse(pendingFetches)
- partitions = partitions.drop(1)
-
- if (pendingFetches.isEmpty) {
- currentIterator = Iterator.empty
- false
- } else {
- val (future, pendingFetchesN) = pendingFetches.dequeue
- pendingFetches = pendingFetchesN
- currentIterator = Await.result(future, timeOut).iterator
- this.hasNext()
- }
- }
- }
- def next() = {
- hasNext()
- currentIterator.next()
- }
- }
- private def fetchData(partitionIndex: Int): Future[Seq[T]] = {
- val results = new ArrayBuffer[T]()
- rdd.context.submitJob[T, Array[T], Seq[T]](rdd,
- x => x.toArray,
- List(partitionIndex),
- (inx: Int, res: Array[T]) => results.appendAll(res),
- results.toSeq)
- }
-}
28 core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -342,23 +342,33 @@ class RDDSuite extends FunSuite with SharedSparkContext {
test("toIterable") {
var nums = sc.makeRDD(Range(1, 1000), 100)
- assert(nums.toIterable(prefetchPartitions = 10).size === 999)
- assert(nums.toIterable().toArray === (1 to 999).toArray)
+ assert(nums.toIterator(prefetchPartitions = 10).size === 999)
+ assert(nums.toIterator().toArray === (1 to 999).toArray)
nums = sc.makeRDD(Range(1000, 1, -1), 100)
- assert(nums.toIterable(prefetchPartitions = 10).size === 999)
- assert(nums.toIterable(prefetchPartitions = 10).toArray === Range(1000, 1, -1).toArray)
+ assert(nums.toIterator(prefetchPartitions = 10).size === 999)
+ assert(nums.toIterator(prefetchPartitions = 10).toArray === Range(1000, 1, -1).toArray)
nums = sc.makeRDD(Range(1, 100), 1000)
- assert(nums.toIterable(prefetchPartitions = 10).size === 99)
- assert(nums.toIterable(prefetchPartitions = 10).toArray === Range(1, 100).toArray)
+ assert(nums.toIterator(prefetchPartitions = 10).size === 99)
+ assert(nums.toIterator(prefetchPartitions = 10).toArray === Range(1, 100).toArray)
nums = sc.makeRDD(Range(1, 1000), 100)
- assert(nums.toIterable(prefetchPartitions = -1).size === 999)
- assert(nums.toIterable().toArray === (1 to 999).toArray)
- }
+ assert(nums.toIterator(prefetchPartitions = -1).size === 999)
+ assert(nums.toIterator().toArray === (1 to 999).toArray)
+
+ nums = sc.makeRDD(Range(1, 1000), 100)
+ assert(nums.toIterator(prefetchPartitions = 3, partitionBatchSize = 10).size === 999)
+ assert(nums.toIterator().toArray === (1 to 999).toArray)
+ nums = sc.makeRDD(Range(1, 1000), 100)
+ assert(nums.toIterator(prefetchPartitions = -1, partitionBatchSize = 0).size === 999)
+ assert(nums.toIterator().toArray === (1 to 999).toArray)
+ nums = sc.makeRDD(Range(1, 1000), 100)
+ assert(nums.toIterator(prefetchPartitions = -1).size === 999)
+ assert(nums.toIterator().toArray === (1 to 999).toArray)
+ }
test("take") {
var nums = sc.makeRDD(Range(1, 1000), 1)