From a7a4cb9e8ca5053e9e82ed72ef02e56003ebe3c2 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Fri, 24 Apr 2015 15:03:57 -0700 Subject: [PATCH 1/2] Added extra method to BlockGenerator. --- .../spark/streaming/receiver/BlockGenerator.scala | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala index f4963a78e1d18..adf8f80b0336e 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala @@ -126,6 +126,20 @@ private[streaming] class BlockGenerator( listener.onAddData(data, metadata) } + /** + * Push multiple data items into the buffer. After buffering the data, the + * `BlockGeneratorListener.onAddData` callback will be called. All received data items + * will be periodically pushed into BlockManager. Note that all the data items is guaranteed + * to be present in a single block. + */ + def addMultipleDataWithCallback(dataIterator: Iterator[Any], metadata: Any) = synchronized { + dataIterator.foreach { data => + waitToPush() + currentBuffer += data + } + listener.onAddData(dataIterator, metadata) + } + /** Change the buffer to which single records are added to. */ private def updateCurrentBuffer(time: Long): Unit = synchronized { try { From a35cf7df9d028b5cc4f5b184c88ffea637ec2a39 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Tue, 28 Apr 2015 15:57:32 -0700 Subject: [PATCH 2/2] Fixed style. --- .../org/apache/spark/streaming/receiver/BlockGenerator.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala index adf8f80b0336e..4bebcc5aa7ca0 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala @@ -132,7 +132,7 @@ private[streaming] class BlockGenerator( * will be periodically pushed into BlockManager. Note that all the data items is guaranteed * to be present in a single block. */ - def addMultipleDataWithCallback(dataIterator: Iterator[Any], metadata: Any) = synchronized { + def addMultipleDataWithCallback(dataIterator: Iterator[Any], metadata: Any): Unit = synchronized { dataIterator.foreach { data => waitToPush() currentBuffer += data