
Commit 75bafe6

Replace SynchronizedQueue with synchronized access to a Queue

srowen committed Feb 9, 2016
1 parent d9ba4d2

Showing 4 changed files with 39 additions and 23 deletions.
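The change in one sentence: scala.collection.mutable.SynchronizedQueue is deprecated (since Scala 2.11), so the queue shared between producers and QueueInputDStream becomes a plain mutable.Queue, with every access wrapped in the queue's own monitor. A minimal, self-contained sketch of the pattern (the object name, thread count, and element counts below are illustrative, not part of the commit):

import scala.collection.mutable

object SynchronizedAccessSketch {
  def main(args: Array[String]): Unit = {
    // Plain, non-thread-safe queue: safety comes from locking its own
    // monitor around every access, not from the SynchronizedQueue mixin.
    val queue = new mutable.Queue[Int]()

    val producers = (1 to 4).map { _ =>
      new Thread(new Runnable {
        override def run(): Unit = {
          for (i <- 1 to 1000) {
            queue.synchronized { queue += i } // each enqueue holds the lock
          }
        }
      })
    }
    producers.foreach(_.start())
    producers.foreach(_.join())

    // With unsynchronized `+=` this count would be unreliable (or the queue's
    // internals corrupted); with the lock it is exactly 4000.
    println(queue.synchronized { queue.size })
  }
}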
QueueStream.scala

@@ -17,7 +17,7 @@

 package org.apache.spark.examples.streaming

-import scala.collection.mutable.SynchronizedQueue
+import scala.collection.mutable.Queue

 import org.apache.spark.SparkConf
 import org.apache.spark.rdd.RDD
@@ -34,7 +34,7 @@ object QueueStream {

     // Create the queue through which RDDs can be pushed to
     // a QueueInputDStream
-    val rddQueue = new SynchronizedQueue[RDD[Int]]()
+    val rddQueue = new Queue[RDD[Int]]()

     // Create the QueueInputDStream and use it to do some processing
     val inputStream = ssc.queueStream(rddQueue)
@@ -45,7 +45,9 @@ object QueueStream {

     // Create some RDDs and push them into the queue
     for (i <- 1 to 30) {
-      rddQueue += ssc.sparkContext.makeRDD(1 to 1000, 10)
+      rddQueue.synchronized {
+        rddQueue += ssc.sparkContext.makeRDD(1 to 1000, 10)
+      }
       Thread.sleep(1000)
     }
     ssc.stop()
StreamingContext.scala

@@ -459,7 +459,7 @@ class StreamingContext private[streaming] (
    * NOTE: Arbitrary RDDs can be added to `queueStream`; there is no way to recover data of
    * those RDDs, so `queueStream` doesn't support checkpointing.
    *
-   * @param queue Queue of RDDs
+   * @param queue Queue of RDDs. Modifications to this data structure must be synchronized.
    * @param oneAtATime Whether only one RDD should be consumed from the queue in every interval
    * @tparam T Type of objects in the RDD
    */
@@ -477,7 +477,7 @@ class StreamingContext private[streaming] (
    * NOTE: Arbitrary RDDs can be added to `queueStream`; there is no way to recover data of
    * those RDDs, so `queueStream` doesn't support checkpointing.
    *
-   * @param queue Queue of RDDs
+   * @param queue Queue of RDDs. Modifications to this data structure must be synchronized.
    * @param oneAtATime Whether only one RDD should be consumed from the queue in every interval
    * @param defaultRDD Default RDD is returned by the DStream when the queue is empty.
    *                   Set as null if no RDD should be returned when empty
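Caller-side view of the updated contract, as a hedged sketch (it assumes an already-constructed StreamingContext in `ssc`, as in the QueueStream example above): mutations of the queue handed to `queueStream` should hold the queue's monitor, because QueueInputDStream (next file) drains the queue under that same lock.

import scala.collection.mutable

import org.apache.spark.rdd.RDD

// Assumes: val ssc: StreamingContext = ... (not shown here)
val rddQueue = new mutable.Queue[RDD[String]]()
val stream = ssc.queueStream(rddQueue, oneAtATime = true)

// Producer side: lock the queue itself, the same monitor the DStream takes.
rddQueue.synchronized {
  rddQueue += ssc.sparkContext.makeRDD(Seq("a", "b", "c"))
}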
QueueInputDStream.scala

@@ -48,12 +48,15 @@ class QueueInputDStream[T: ClassTag](

   override def compute(validTime: Time): Option[RDD[T]] = {
     val buffer = new ArrayBuffer[RDD[T]]()
-    if (oneAtATime && queue.size > 0) {
-      buffer += queue.dequeue()
-    } else {
-      buffer ++= queue.dequeueAll(_ => true)
+    queue.synchronized {
+      if (oneAtATime && queue.nonEmpty) {
+        buffer += queue.dequeue()
+      } else {
+        buffer ++= queue
+        queue.clear()
+      }
     }
-    if (buffer.size > 0) {
+    if (buffer.nonEmpty) {
       if (oneAtATime) {
         Some(buffer.head)
       } else {
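The same drain logic as a standalone, runnable sketch (`DrainSketch` and `drain` are hypothetical names that only mirror the shape of the new `compute`): snapshotting with `++=` followed by `clear()` inside one `synchronized` block replaces `dequeueAll(_ => true)`, so taking one element and taking everything are equally atomic with respect to synchronized producers.

import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

object DrainSketch {
  // Hypothetical helper mirroring QueueInputDStream.compute: take one
  // element or everything, atomically with respect to synchronized producers.
  def drain[T](queue: mutable.Queue[T], oneAtATime: Boolean): Seq[T] = {
    val buffer = new ArrayBuffer[T]()
    queue.synchronized {
      if (oneAtATime && queue.nonEmpty) {
        buffer += queue.dequeue()
      } else {
        buffer ++= queue // snapshot the remaining elements...
        queue.clear()    // ...and empty the queue under the same lock
      }
    }
    buffer
  }

  def main(args: Array[String]): Unit = {
    val q = new mutable.Queue[Int]()
    q.synchronized { q ++= Seq(1, 2, 3) }
    println(drain(q, oneAtATime = true))  // ArrayBuffer(1)
    println(drain(q, oneAtATime = false)) // ArrayBuffer(2, 3)
  }
}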
InputStreamsSuite.scala

@@ -24,7 +24,7 @@ import java.util.concurrent._
 import java.util.concurrent.atomic.AtomicInteger

 import scala.collection.JavaConverters._
-import scala.collection.mutable.SynchronizedQueue
+import scala.collection.mutable
 import scala.language.postfixOps

 import com.google.common.io.Files
@@ -40,7 +40,6 @@ import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.dstream.{InputDStream, ReceiverInputDStream}
 import org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD
 import org.apache.spark.streaming.receiver.Receiver
-import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}
 import org.apache.spark.util.{ManualClock, Utils}

 class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
@@ -67,7 +66,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
       // Feed data to the server to send to the network receiver
       val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
       val expectedOutput = input.map(_.toString)
-      for (i <- 0 until input.size) {
+      for (i <- input.indices) {
         testServer.send(input(i).toString + "\n")
         Thread.sleep(500)
         clock.advance(batchDuration.milliseconds)
@@ -102,8 +101,8 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
      // Verify whether all the elements received are as expected
      // (whether the elements were received one in each interval is not verified)
      val output: Array[String] = outputQueue.asScala.flatMap(x => x).toArray
-     assert(output.size === expectedOutput.size)
-     for (i <- 0 until output.size) {
+     assert(output.length === expectedOutput.size)
+     for (i <- output.indices) {
        assert(output(i) === expectedOutput(i))
      }
    }
@@ -242,11 +241,11 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
     val input = Seq("1", "2", "3", "4", "5")
     val expectedOutput = input.map(Seq(_))
     val outputQueue = new ConcurrentLinkedQueue[Seq[String]]
-    def output: Iterable[Seq[String]] = outputQueue.asScala.filter(_.size > 0)
+    def output: Iterable[Seq[String]] = outputQueue.asScala.filter(_.nonEmpty)

     // Set up the streaming context and input streams
     withStreamingContext(new StreamingContext(conf, batchDuration)) { ssc =>
-      val queue = new SynchronizedQueue[RDD[String]]()
+      val queue = new mutable.Queue[RDD[String]]()
       val queueStream = ssc.queueStream(queue, oneAtATime = true)
       val outputStream = new TestOutputStream(queueStream, outputQueue)
       outputStream.register()
@@ -256,9 +255,13 @@
       val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]

       val inputIterator = input.toIterator
-      for (i <- 0 until input.size) {
+      for (i <- input.indices) {
         // Enqueue more than 1 item per tick, but they should dequeue one at a time
-        inputIterator.take(2).foreach(i => queue += ssc.sparkContext.makeRDD(Seq(i)))
+        inputIterator.take(2).foreach { i =>
+          queue.synchronized {
+            queue += ssc.sparkContext.makeRDD(Seq(i))
+          }
+        }
         clock.advance(batchDuration.milliseconds)
       }
       Thread.sleep(1000)
@@ -281,13 +284,13 @@

   test("queue input stream - oneAtATime = false") {
     val outputQueue = new ConcurrentLinkedQueue[Seq[String]]
-    def output: Iterable[Seq[String]] = outputQueue.asScala.filter(_.size > 0)
+    def output: Iterable[Seq[String]] = outputQueue.asScala.filter(_.nonEmpty)
     val input = Seq("1", "2", "3", "4", "5")
     val expectedOutput = Seq(Seq("1", "2", "3"), Seq("4", "5"))

     // Set up the streaming context and input streams
     withStreamingContext(new StreamingContext(conf, batchDuration)) { ssc =>
-      val queue = new SynchronizedQueue[RDD[String]]()
+      val queue = new mutable.Queue[RDD[String]]()
       val queueStream = ssc.queueStream(queue, oneAtATime = false)
       val outputStream = new TestOutputStream(queueStream, outputQueue)
       outputStream.register()
@@ -298,12 +301,20 @@

       // Enqueue the first 3 items (one by one); they should be merged in the next batch
       val inputIterator = input.toIterator
-      inputIterator.take(3).foreach(i => queue += ssc.sparkContext.makeRDD(Seq(i)))
+      inputIterator.take(3).foreach { i =>
+        queue.synchronized {
+          queue += ssc.sparkContext.makeRDD(Seq(i))
+        }
+      }
       clock.advance(batchDuration.milliseconds)
       Thread.sleep(1000)

       // Enqueue the remaining items (again one by one), merged in the final batch
-      inputIterator.foreach(i => queue += ssc.sparkContext.makeRDD(Seq(i)))
+      inputIterator.foreach { i =>
+        queue.synchronized {
+          queue += ssc.sparkContext.makeRDD(Seq(i))
+        }
+      }
       clock.advance(batchDuration.milliseconds)
       Thread.sleep(1000)
     }
