From 4e3f715941f94cb2467ca68b205a5fa3630130a3 Mon Sep 17 00:00:00 2001 From: surq Date: Wed, 10 Dec 2014 18:49:54 +0800 Subject: [PATCH 1/2] Print the specified number of data and handle all of the elements in RDD --- core/src/main/scala/org/apache/spark/rdd/RDD.scala | 12 ++++++++++++ .../apache/spark/streaming/dstream/DStream.scala | 14 ++++++++++++++ 2 files changed, 26 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index 214f22bc5b603..8a1771c6a5f38 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -787,6 +787,18 @@ abstract class RDD[T: ClassTag]( sc.runJob(this, (iter: Iterator[T]) => cleanF(iter)) } + /** + * Take all of the elements in this RDD, and print the first num elements of the RDD. + */ + def printTop (num: Int): Array[T] = { + val buf = new ArrayBuffer[T] + val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray) + for (partition <- results; data <- partition if buf.size <= num){ + buf += data + } + buf.toArray + } + /** * Return an array that contains all of the elements in this RDD. */ diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala index dbf1ebbaf653a..7c8f3a68014a6 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala @@ -618,6 +618,20 @@ abstract class DStream[T: ClassTag] ( new ForEachDStream(this, context.sparkContext.clean(foreachFunc)).register() } + /** + * Print the first specified number elements of each RDD in this DStream. + */ + def printTop(num: Int) { + def foreachFunc = (rdd: RDD[T], time: Time) => { + val first11 = rdd.printTop(num) + println ("-------------------------------------------") + println ("Time: " + time) + println ("-------------------------------------------") + first11.take(num).foreach(println) + } + new ForEachDStream(this, context.sparkContext.clean(foreachFunc)).register() + } + /** * Return a new DStream in which each RDD contains all the elements in seen in a * sliding window of time over this DStream. The new DStream generates RDDs with From 411b28709b55cfa94ebd04ced6d67df997ebf467 Mon Sep 17 00:00:00 2001 From: surq Date: Thu, 11 Dec 2014 16:18:38 +0800 Subject: [PATCH 2/2] change method's name --- core/src/main/scala/org/apache/spark/rdd/RDD.scala | 4 ++-- .../scala/org/apache/spark/streaming/dstream/DStream.scala | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index 8a1771c6a5f38..73092993ca531 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -788,9 +788,9 @@ abstract class RDD[T: ClassTag]( } /** - * Take all of the elements in this RDD, and print the first num elements of the RDD. + * Process all of the elements in this RDD, and take the first num elements of the RDD. */ - def printTop (num: Int): Array[T] = { + def processAllAndTake (num: Int): Array[T] = { val buf = new ArrayBuffer[T] val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray) for (partition <- results; data <- partition if buf.size <= num){ diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala index 7c8f3a68014a6..86e0de504de11 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala @@ -621,9 +621,9 @@ abstract class DStream[T: ClassTag] ( /** * Print the first specified number elements of each RDD in this DStream. */ - def printTop(num: Int) { + def processAllAndPrintFirst(num: Int) { def foreachFunc = (rdd: RDD[T], time: Time) => { - val first11 = rdd.printTop(num) + val first11 = rdd.processAllAndTake(num) println ("-------------------------------------------") println ("Time: " + time) println ("-------------------------------------------")