diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
index 655b161734d94..1b28336d5b55b 100644
--- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
+++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
@@ -18,8 +18,11 @@
 package org.apache.spark.streaming.kafka
 
 import java.io.File
+import java.util.Arrays
+import java.util.concurrent.ConcurrentLinkedQueue
 import java.util.concurrent.atomic.AtomicLong
 
+import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
 import scala.concurrent.duration._
@@ -101,8 +104,7 @@ class DirectKafkaStreamSuite
         ssc, kafkaParams, topics)
     }
 
-    val allReceived =
-      new ArrayBuffer[(String, String)] with mutable.SynchronizedBuffer[(String, String)]
+    val allReceived = new ConcurrentLinkedQueue[(String, String)]
 
     // hold a reference to the current offset ranges, so it can be used downstream
     var offsetRanges = Array[OffsetRange]()
@@ -131,11 +133,11 @@ class DirectKafkaStreamSuite
         assert(partSize === rangeSize, "offset ranges are wrong")
       }
     }
-    stream.foreachRDD { rdd => allReceived ++= rdd.collect() }
+    stream.foreachRDD { rdd => allReceived.addAll(Arrays.asList(rdd.collect(): _*)) }
     ssc.start()
     eventually(timeout(20000.milliseconds), interval(200.milliseconds)) {
       assert(allReceived.size === totalSent,
-        "didn't get expected number of messages, messages:\n" + allReceived.mkString("\n"))
+        "didn't get expected number of messages, messages:\n" + allReceived.asScala.mkString("\n"))
     }
     ssc.stop()
   }
@@ -173,8 +175,8 @@ class DirectKafkaStreamSuite
       "Start offset not from latest"
     )
 
-    val collectedData = new mutable.ArrayBuffer[String]() with mutable.SynchronizedBuffer[String]
-    stream.map { _._2 }.foreachRDD { rdd => collectedData ++= rdd.collect() }
+    val collectedData = new ConcurrentLinkedQueue[String]()
+    stream.map { _._2 }.foreachRDD { rdd => collectedData.addAll(Arrays.asList(rdd.collect(): _*)) }
    ssc.start()
     val newData = Map("b" -> 10)
     kafkaTestUtils.sendMessages(topic, newData)
@@ -219,8 +221,8 @@ class DirectKafkaStreamSuite
       "Start offset not from latest"
     )
 
-    val collectedData = new mutable.ArrayBuffer[String]() with mutable.SynchronizedBuffer[String]
-    stream.foreachRDD { rdd => collectedData ++= rdd.collect() }
+    val collectedData = new ConcurrentLinkedQueue[String]()
+    stream.foreachRDD { rdd => collectedData.addAll(Arrays.asList(rdd.collect(): _*)) }
     ssc.start()
     val newData = Map("b" -> 10)
     kafkaTestUtils.sendMessages(topic, newData)
@@ -265,7 +267,7 @@ class DirectKafkaStreamSuite
     // This is to collect the raw data received from Kafka
     kafkaStream.foreachRDD { (rdd: RDD[(String, String)], time: Time) =>
       val data = rdd.map { _._2 }.collect()
-      DirectKafkaStreamSuite.collectedData.appendAll(data)
+      DirectKafkaStreamSuite.collectedData.addAll(Arrays.asList(data: _*))
     }
 
     // This is ensure all the data is eventually receiving only once
@@ -335,14 +337,13 @@ class DirectKafkaStreamSuite
         ssc, kafkaParams, Set(topic))
     }
 
-    val allReceived =
-      new ArrayBuffer[(String, String)] with mutable.SynchronizedBuffer[(String, String)]
+    val allReceived = new ConcurrentLinkedQueue[(String, String)]
 
-    stream.foreachRDD { rdd => allReceived ++= rdd.collect() }
+    stream.foreachRDD { rdd => allReceived.addAll(Arrays.asList(rdd.collect(): _*)) }
     ssc.start()
     eventually(timeout(20000.milliseconds), interval(200.milliseconds)) {
       assert(allReceived.size === totalSent,
-        "didn't get expected number of messages, messages:\n" + allReceived.mkString("\n"))
+        "didn't get expected number of messages, messages:\n" + allReceived.asScala.mkString("\n"))
 
       // Calculate all the record number collected in the StreamingListener.
       assert(collector.numRecordsSubmitted.get() === totalSent)
@@ -389,17 +390,16 @@ class DirectKafkaStreamSuite
       }
     }
 
-    val collectedData =
-      new mutable.ArrayBuffer[Array[String]]() with mutable.SynchronizedBuffer[Array[String]]
+    val collectedData = new ConcurrentLinkedQueue[Array[String]]()
 
     // Used for assertion failure messages.
     def dataToString: String =
-      collectedData.map(_.mkString("[", ",", "]")).mkString("{", ", ", "}")
+      collectedData.asScala.map(_.mkString("[", ",", "]")).mkString("{", ", ", "}")
 
     // This is to collect the raw data received from Kafka
     kafkaStream.foreachRDD { (rdd: RDD[(String, String)], time: Time) =>
       val data = rdd.map { _._2 }.collect()
-      collectedData += data
+      collectedData.add(data)
     }
 
     ssc.start()
@@ -415,7 +415,7 @@ class DirectKafkaStreamSuite
       eventually(timeout(5.seconds), interval(batchIntervalMilliseconds.milliseconds)) {
         // Assert that rate estimator values are used to determine maxMessagesPerPartition.
        // Funky "-" in message makes the complete assertion message read better.
-        assert(collectedData.exists(_.size == expectedSize),
+        assert(collectedData.asScala.exists(_.size == expectedSize),
          s" - No arrays of size $expectedSize for rate $rate found in $dataToString")
       }
     }
@@ -433,7 +433,7 @@ class DirectKafkaStreamSuite
 }
 
 object DirectKafkaStreamSuite {
-  val collectedData = new mutable.ArrayBuffer[String]() with mutable.SynchronizedBuffer[String]
+  val collectedData = new ConcurrentLinkedQueue[String]()
   @volatile var total = -1L
 
   class InputInfoCollector extends StreamingListener {
diff --git a/scalastyle-config.xml b/scalastyle-config.xml
index 967a482ba4f9b..64619d2108999 100644
--- a/scalastyle-config.xml
+++ b/scalastyle-config.xml
@@ -169,6 +169,18 @@ This file is divided into 3 sections:
     ]]></customMessage>
   </check>
 
+  <check customId="mutablesynchronizedbuffer" level="error" class="org.scalastyle.file.RegexChecker" enabled="true">
+    <parameters><parameter name="regex">mutable\.SynchronizedBuffer</parameter></parameters>
+    <customMessage><![CDATA[
+      Are you sure that you want to use mutable.SynchronizedBuffer? In most cases, you should use
+      java.util.concurrent.ConcurrentLinkedQueue instead.
+      If you must use mutable.SynchronizedBuffer, wrap the code block with
+      // scalastyle:off mutablesynchronizedbuffer
+      mutable.SynchronizedBuffer[...]
+      // scalastyle:on mutablesynchronizedbuffer
+    ]]></customMessage>
+  </check>
+
   <check customId="classforname" level="error" class="org.scalastyle.file.RegexChecker" enabled="true">
     <parameters><parameter name="regex">Class\.forName</parameter></parameters>
     <customMessage><![CDATA[
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
         logInfo(s"Received a new rate limit: $eps.")
-        registeredBlockGenerators.foreach { bg =>
+        registeredBlockGenerators.asScala.foreach { bg =>
           bg.updateRate(eps)
         }
     }
@@ -92,8 +94,7 @@ private[streaming] class ReceiverSupervisorImpl(
   /** Unique block ids if one wants to add blocks directly */
   private val newBlockId = new AtomicLong(System.currentTimeMillis())
 
-  private val registeredBlockGenerators = new mutable.ArrayBuffer[BlockGenerator]
-    with mutable.SynchronizedBuffer[BlockGenerator]
+  private val registeredBlockGenerators = new ConcurrentLinkedQueue[BlockGenerator]
 
   /** Divides received data records into data blocks for pushing in BlockManager. */
   private val defaultBlockGeneratorListener = new BlockGeneratorListener {
@@ -170,11 +171,11 @@ private[streaming] class ReceiverSupervisorImpl(
   }
 
   override protected def onStart() {
-    registeredBlockGenerators.foreach { _.start() }
+    registeredBlockGenerators.asScala.foreach { _.start() }
   }
 
   override protected def onStop(message: String, error: Option[Throwable]) {
-    registeredBlockGenerators.foreach { _.stop() }
+    registeredBlockGenerators.asScala.foreach { _.stop() }
     env.rpcEnv.stop(endpoint)
   }
 
@@ -194,10 +195,11 @@ private[streaming] class ReceiverSupervisorImpl(
   override def createBlockGenerator(
       blockGeneratorListener: BlockGeneratorListener): BlockGenerator = {
     // Cleanup BlockGenerators that have already been stopped
-    registeredBlockGenerators --= registeredBlockGenerators.filter{ _.isStopped() }
+    val stoppedGenerators = registeredBlockGenerators.asScala.filter{ _.isStopped() }
+    stoppedGenerators.foreach(registeredBlockGenerators.remove(_))
 
     val newBlockGenerator = new BlockGenerator(blockGeneratorListener, streamId, env.conf)
-    registeredBlockGenerators += newBlockGenerator
+    registeredBlockGenerators.add(newBlockGenerator)
     newBlockGenerator
   }
 
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
index 25e7ae8262a5f..fe971b96519b5 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.streaming
 
+import java.util.concurrent.ConcurrentLinkedQueue
+
 import scala.collection.mutable
 import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer}
 import scala.language.existentials
@@ -645,8 +647,8 @@ class BasicOperationsSuite extends TestSuiteBase {
     val networkStream =
       ssc.socketTextStream("localhost", testServer.port, StorageLevel.MEMORY_AND_DISK)
     val mappedStream = networkStream.map(_ + ".").persist()
-    val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]]
-    val outputStream = new TestOutputStream(mappedStream, outputBuffer)
+    val outputQueue = new ConcurrentLinkedQueue[Seq[String]]
+    val outputStream = new TestOutputStream(mappedStream, outputQueue)
     outputStream.register()
     ssc.start()
 
@@ -685,7 +687,7 @@ class BasicOperationsSuite extends TestSuiteBase {
     testServer.stop()
 
     // verify data has been received
-    assert(outputBuffer.size > 0)
+    assert(!outputQueue.isEmpty)
     assert(blockRdds.size > 0)
     assert(persistentRddIds.size > 0)
 
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
index 786703eb9a84e..26ea0901ff4e9 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
@@ -18,7 +18,9 @@
 package org.apache.spark.streaming
 
 import java.io.{ByteArrayInputStream, ByteArrayOutputStream, File, ObjectOutputStream}
+import java.util.concurrent.ConcurrentLinkedQueue
 
+import scala.collection.JavaConverters._
 import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer}
 import scala.reflect.ClassTag
 
@@ -105,7 +107,7 @@ trait DStreamCheckpointTester { self: SparkFunSuite =>
     val operatedStream = operation(inputStream)
     operatedStream.print()
     val outputStream = new TestOutputStreamWithPartitions(operatedStream,
-      new ArrayBuffer[Seq[Seq[V]]] with SynchronizedBuffer[Seq[Seq[V]]])
+      new ConcurrentLinkedQueue[Seq[Seq[V]]])
     outputStream.register()
     ssc.checkpoint(checkpointDir)
@@ -166,7 +168,7 @@ trait DStreamCheckpointTester { self: SparkFunSuite =>
         // are written to make sure that both of them have been written.
         assert(checkpointFilesOfLatestTime.size === 2)
       }
-      outputStream.output.map(_.flatten)
+      outputStream.output.asScala.map(_.flatten).toSeq
 
     } finally {
       ssc.stop(stopSparkContext = stopSparkContext)
@@ -591,7 +593,7 @@ class CheckpointSuite extends TestSuiteBase with DStreamCheckpointTester
     // Set up the streaming context and input streams
     val batchDuration = Seconds(2)  // Due to 1-second resolution of setLastModified() on some OS's.
     val testDir = Utils.createTempDir()
-    val outputBuffer = new ArrayBuffer[Seq[Int]] with SynchronizedBuffer[Seq[Int]]
+    val outputBuffer = new ConcurrentLinkedQueue[Seq[Int]]
 
    /**
     * Writes a file named `i` (which contains the number `i`) to the test directory and sets its
@@ -671,7 +673,7 @@ class CheckpointSuite extends TestSuiteBase with DStreamCheckpointTester
         ssc.stop()
         // Check that we shut down while the third batch was being processed
         assert(batchCounter.getNumCompletedBatches === 2)
-        assert(outputStream.output.flatten === Seq(1, 3))
+        assert(outputStream.output.asScala.toSeq.flatten === Seq(1, 3))
       }
 
       // The original StreamingContext has now been stopped.
@@ -721,7 +723,7 @@ class CheckpointSuite extends TestSuiteBase with DStreamCheckpointTester
           assert(batchCounter.getNumCompletedBatches === index + numBatchesAfterRestart + 1)
         }
       }
-      logInfo("Output after restart = " + outputStream.output.mkString("[", ", ", "]"))
+      logInfo("Output after restart = " + outputStream.output.asScala.mkString("[", ", ", "]"))
      assert(outputStream.output.size > 0, "No files processed after restart")
       ssc.stop()
 
@@ -730,11 +732,11 @@ class CheckpointSuite extends TestSuiteBase with DStreamCheckpointTester
       assert(recordedFiles(ssc) === (1 to 9))
 
       // Append the new output to the old buffer
-      outputBuffer ++= outputStream.output
+      outputBuffer.addAll(outputStream.output)
 
       // Verify whether all the elements received are as expected
       val expectedOutput = Seq(1, 3, 6, 10, 15, 21, 28, 36, 45)
-      assert(outputBuffer.flatten.toSet === expectedOutput.toSet)
+      assert(outputBuffer.asScala.flatten.toSet === expectedOutput.toSet)
     }
   } finally {
     Utils.deleteRecursively(testDir)
@@ -894,7 +896,8 @@ class CheckpointSuite extends TestSuiteBase with DStreamCheckpointTester
   * Advances the manual clock on the streaming scheduler by given number of batches.
   * It also waits for the expected amount of time for each batch.
   */
-  def advanceTimeWithRealDelay[V: ClassTag](ssc: StreamingContext, numBatches: Long): Seq[Seq[V]] =
+  def advanceTimeWithRealDelay[V: ClassTag](ssc: StreamingContext, numBatches: Long):
+    Iterable[Seq[V]] =
   {
     val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
     logInfo("Manual clock before advancing = " + clock.getTimeMillis())
@@ -908,7 +911,7 @@ class CheckpointSuite extends TestSuiteBase with DStreamCheckpointTester
     val outputStream = ssc.graph.getOutputStreams().filter { dstream =>
       dstream.isInstanceOf[TestOutputStreamWithPartitions[V]]
     }.head.asInstanceOf[TestOutputStreamWithPartitions[V]]
-    outputStream.output.map(_.flatten)
+    outputStream.output.asScala.map(_.flatten)
   }
 }
 
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
index 75591f04ca00d..89cf7e83afbcd 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
@@ -20,10 +20,11 @@ package org.apache.spark.streaming
 import java.io.{BufferedWriter, File, OutputStreamWriter}
 import java.net.{ServerSocket, Socket, SocketException}
 import java.nio.charset.Charset
-import java.util.concurrent.{ArrayBlockingQueue, CountDownLatch, Executors, TimeUnit}
+import java.util.concurrent._
 import java.util.concurrent.atomic.AtomicInteger
 
-import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer, SynchronizedQueue}
+import scala.collection.JavaConverters._
+import scala.collection.mutable.SynchronizedQueue
 import scala.language.postfixOps
 
 import com.google.common.io.Files
@@ -58,8 +59,8 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
       val batchCounter = new BatchCounter(ssc)
       val networkStream = ssc.socketTextStream(
         "localhost", testServer.port, StorageLevel.MEMORY_AND_DISK)
-      val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]]
-      val outputStream = new TestOutputStream(networkStream, outputBuffer)
+      val outputQueue = new ConcurrentLinkedQueue[Seq[String]]
+      val outputStream = new TestOutputStream(networkStream, outputQueue)
       outputStream.register()
       ssc.start()
 
@@ -90,9 +91,9 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
       // Verify whether data received was as expected
       logInfo("--------------------------------")
-      logInfo("output.size = " + outputBuffer.size)
+      logInfo("output.size = " + outputQueue.size)
       logInfo("output")
-      outputBuffer.foreach(x => logInfo("[" + x.mkString(",") + "]"))
+      outputQueue.asScala.foreach(x => logInfo("[" + x.mkString(",") + "]"))
       logInfo("expected output.size = " + expectedOutput.size)
       logInfo("expected output")
       expectedOutput.foreach(x => logInfo("[" + x.mkString(",") + "]"))
@@ -100,7 +101,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
 
       // Verify whether all the elements received are as expected
       // (whether the elements were received one in each interval is not verified)
-      val output: ArrayBuffer[String] = outputBuffer.flatMap(x => x)
+      val output: Array[String] = outputQueue.asScala.flatMap(x => x).toArray
       assert(output.size === expectedOutput.size)
       for (i <- 0 until output.size) {
         assert(output(i) === expectedOutput(i))
@@ -119,8 +120,8 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
       val batchCounter = new BatchCounter(ssc)
       val networkStream = ssc.socketTextStream(
         "localhost", testServer.port, StorageLevel.MEMORY_AND_DISK)
-      val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]]
-      val outputStream = new TestOutputStream(networkStream, outputBuffer)
+      val outputQueue = new ConcurrentLinkedQueue[Seq[String]]
+      val outputStream = new TestOutputStream(networkStream, outputQueue)
       outputStream.register()
       ssc.start()
 
@@ -156,9 +157,8 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
         clock.setTime(existingFile.lastModified + batchDuration.milliseconds)
         val batchCounter = new BatchCounter(ssc)
         val fileStream = ssc.binaryRecordsStream(testDir.toString, 1)
-        val outputBuffer = new ArrayBuffer[Seq[Array[Byte]]]
-          with SynchronizedBuffer[Seq[Array[Byte]]]
-        val outputStream = new TestOutputStream(fileStream, outputBuffer)
+        val outputQueue = new ConcurrentLinkedQueue[Seq[Array[Byte]]]
+        val outputStream = new TestOutputStream(fileStream, outputQueue)
         outputStream.register()
         ssc.start()
 
@@ -183,8 +183,8 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
         }
 
         val expectedOutput = input.map(i => i.toByte)
-        val obtainedOutput = outputBuffer.flatten.toList.map(i => i(0).toByte)
-        assert(obtainedOutput === expectedOutput)
+        val obtainedOutput = outputQueue.asScala.flatten.toList.map(i => i(0).toByte)
+        assert(obtainedOutput.toSeq === expectedOutput)
       }
     } finally {
       if (testDir != null) Utils.deleteRecursively(testDir)
@@ -206,15 +206,15 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
     val numTotalRecords = numThreads * numRecordsPerThread
     val testReceiver = new MultiThreadTestReceiver(numThreads, numRecordsPerThread)
     MultiThreadTestReceiver.haveAllThreadsFinished = false
-    val outputBuffer = new ArrayBuffer[Seq[Long]] with SynchronizedBuffer[Seq[Long]]
-    def output: ArrayBuffer[Long] = outputBuffer.flatMap(x => x)
+    val outputQueue = new ConcurrentLinkedQueue[Seq[Long]]
+    def output: Iterable[Long] = outputQueue.asScala.flatMap(x => x)
 
     // set up the network stream using the test receiver
     withStreamingContext(new StreamingContext(conf, batchDuration)) { ssc =>
       val networkStream = ssc.receiverStream[Int](testReceiver)
       val countStream = networkStream.count
-      val outputStream = new TestOutputStream(countStream, outputBuffer)
+      val outputStream = new TestOutputStream(countStream, outputQueue)
       outputStream.register()
       ssc.start()
 
@@ -231,9 +231,9 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
     // Verify whether data received was as expected
     logInfo("--------------------------------")
-    logInfo("output.size = " + outputBuffer.size)
+    logInfo("output.size = " + outputQueue.size)
     logInfo("output")
-    outputBuffer.foreach(x => logInfo("[" + x.mkString(",") + "]"))
+    outputQueue.asScala.foreach(x => logInfo("[" + x.mkString(",") + "]"))
     logInfo("--------------------------------")
     assert(output.sum === numTotalRecords)
   }
@@ -241,14 +241,14 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
   test("queue input stream - oneAtATime = true") {
     val input = Seq("1", "2", "3", "4", "5")
     val expectedOutput = input.map(Seq(_))
-    val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]]
-    def output: ArrayBuffer[Seq[String]] = outputBuffer.filter(_.size > 0)
+    val outputQueue = new ConcurrentLinkedQueue[Seq[String]]
+    def output: Iterable[Seq[String]] = outputQueue.asScala.filter(_.size > 0)
 
     // Set up the streaming context and input streams
     withStreamingContext(new StreamingContext(conf, batchDuration)) { ssc =>
       val queue = new SynchronizedQueue[RDD[String]]()
       val queueStream = ssc.queueStream(queue, oneAtATime = true)
-      val outputStream = new TestOutputStream(queueStream, outputBuffer)
+      val outputStream = new TestOutputStream(queueStream, outputQueue)
       outputStream.register()
       ssc.start()
 
@@ -266,9 +266,9 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
 
     // Verify whether data received was as expected
     logInfo("--------------------------------")
-    logInfo("output.size = " + outputBuffer.size)
+    logInfo("output.size = " + outputQueue.size)
     logInfo("output")
-    outputBuffer.foreach(x => logInfo("[" + x.mkString(",") + "]"))
+    outputQueue.asScala.foreach(x => logInfo("[" + x.mkString(",") + "]"))
     logInfo("expected output.size = " + expectedOutput.size)
     logInfo("expected output")
     expectedOutput.foreach(x => logInfo("[" + x.mkString(",") + "]"))
@@ -276,14 +276,12 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
 
     // Verify whether all the elements received are as expected
     assert(output.size === expectedOutput.size)
-    for (i <- 0 until output.size) {
-      assert(output(i) === expectedOutput(i))
-    }
+    output.zipWithIndex.foreach{case (e, i) => assert(e == expectedOutput(i))}
   }
 
   test("queue input stream - oneAtATime = false") {
-    val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]]
-    def output: ArrayBuffer[Seq[String]] = outputBuffer.filter(_.size > 0)
+    val outputQueue = new ConcurrentLinkedQueue[Seq[String]]
+    def output: Iterable[Seq[String]] = outputQueue.asScala.filter(_.size > 0)
     val input = Seq("1", "2", "3", "4", "5")
     val expectedOutput = Seq(Seq("1", "2", "3"), Seq("4", "5"))
 
@@ -291,7 +289,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
     withStreamingContext(new StreamingContext(conf, batchDuration)) { ssc =>
       val queue = new SynchronizedQueue[RDD[String]]()
       val queueStream = ssc.queueStream(queue, oneAtATime = false)
-      val outputStream = new TestOutputStream(queueStream, outputBuffer)
+      val outputStream = new TestOutputStream(queueStream, outputQueue)
       outputStream.register()
       ssc.start()
 
@@ -312,9 +310,9 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
 
     // Verify whether data received was as expected
     logInfo("--------------------------------")
-    logInfo("output.size = " + outputBuffer.size)
+    logInfo("output.size = " + outputQueue.size)
     logInfo("output")
-    outputBuffer.foreach(x => logInfo("[" + x.mkString(",") + "]"))
+    outputQueue.asScala.foreach(x => logInfo("[" + x.mkString(",") + "]"))
     logInfo("expected output.size = " + expectedOutput.size)
     logInfo("expected output")
     expectedOutput.foreach(x => logInfo("[" + x.mkString(",") + "]"))
@@ -322,9 +320,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
 
     // Verify whether all the elements received are as expected
     assert(output.size === expectedOutput.size)
-    for (i <- 0 until output.size) {
-      assert(output(i) === expectedOutput(i))
-    }
+    output.zipWithIndex.foreach{case (e, i) => assert(e == expectedOutput(i))}
   }
 
   test("test track the number of input stream") {
@@ -373,8 +369,8 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
       val batchCounter = new BatchCounter(ssc)
       val fileStream = ssc.fileStream[LongWritable, Text, TextInputFormat](
         testDir.toString, (x: Path) => true, newFilesOnly = newFilesOnly).map(_._2.toString)
-      val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]]
-      val outputStream = new TestOutputStream(fileStream, outputBuffer)
+      val outputQueue = new ConcurrentLinkedQueue[Seq[String]]
+      val outputStream = new TestOutputStream(fileStream, outputQueue)
       outputStream.register()
       ssc.start()
 
@@ -404,7 +400,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
       } else {
         (Seq(0) ++ input).map(_.toString).toSet
       }
-      assert(outputBuffer.flatten.toSet === expectedOutput)
+      assert(outputQueue.asScala.flatten.toSet === expectedOutput)
     }
   } finally {
     if (testDir != null) Utils.deleteRecursively(testDir)
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/MapWithStateSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/MapWithStateSuite.scala
index 2984fd2b298dc..cdb29ef75293a 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/MapWithStateSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/MapWithStateSuite.scala
@@ -18,8 +18,9 @@
 package org.apache.spark.streaming
 
 import java.io.File
+import java.util.concurrent.ConcurrentLinkedQueue
 
-import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer}
+import scala.collection.JavaConverters._
 import scala.reflect.ClassTag
 
 import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}
@@ -550,9 +551,9 @@ class MapWithStateSuite extends SparkFunSuite
     val ssc = new StreamingContext(sc, Seconds(1))
     val inputStream = new TestInputStream(ssc, input, numPartitions = 2)
     val trackeStateStream = inputStream.map(x => (x, 1)).mapWithState(mapWithStateSpec)
-    val collectedOutputs = new ArrayBuffer[Seq[T]] with SynchronizedBuffer[Seq[T]]
+    val collectedOutputs = new ConcurrentLinkedQueue[Seq[T]]
     val outputStream = new TestOutputStream(trackeStateStream, collectedOutputs)
-    val collectedStateSnapshots = new ArrayBuffer[Seq[(K, S)]] with SynchronizedBuffer[Seq[(K, S)]]
+    val collectedStateSnapshots = new ConcurrentLinkedQueue[Seq[(K, S)]]
     val stateSnapshotStream = new TestOutputStream(
       trackeStateStream.stateSnapshots(), collectedStateSnapshots)
     outputStream.register()
@@ -567,7 +568,7 @@ class MapWithStateSuite extends SparkFunSuite
     batchCounter.waitUntilBatchesCompleted(numBatches, 10000)
     ssc.stop(stopSparkContext = false)
 
-    (collectedOutputs, collectedStateSnapshots)
+    (collectedOutputs.asScala.toSeq, collectedStateSnapshots.asScala.toSeq)
   }
 
   private def assert[U](expected: Seq[Seq[U]], collected: Seq[Seq[U]], typ: String) {
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
index 239b10894ad2c..8c29130fff67d 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
@@ -18,9 +18,9 @@
 package org.apache.spark.streaming
 
 import java.io.{IOException, ObjectInputStream}
+import java.util.concurrent.ConcurrentLinkedQueue
 
-import scala.collection.mutable.ArrayBuffer
-import scala.collection.mutable.SynchronizedBuffer
+import scala.collection.JavaConverters._
 import scala.language.implicitConversions
 import scala.reflect.ClassTag
 
@@ -93,8 +93,7 @@ class TestInputStream[T: ClassTag](_ssc: StreamingContext, input: Seq[Seq[T]], n
  */
 class TestOutputStream[T: ClassTag](
     parent: DStream[T],
-    val output: SynchronizedBuffer[Seq[T]] =
-      new ArrayBuffer[Seq[T]] with SynchronizedBuffer[Seq[T]]
+    val output: ConcurrentLinkedQueue[Seq[T]] = new ConcurrentLinkedQueue[Seq[T]]
   ) extends ForEachDStream[T](parent, (rdd: RDD[T], t: Time) => {
     val collected = rdd.collect()
-    output += collected
+    output.add(collected)
@@ -117,8 +116,8 @@ class TestOutputStream[T: ClassTag](
  */
 class TestOutputStreamWithPartitions[T: ClassTag](
     parent: DStream[T],
-    val output: SynchronizedBuffer[Seq[Seq[T]]] =
-      new ArrayBuffer[Seq[Seq[T]]] with SynchronizedBuffer[Seq[Seq[T]]])
+    val output: ConcurrentLinkedQueue[Seq[Seq[T]]] =
+      new ConcurrentLinkedQueue[Seq[Seq[T]]])
   extends ForEachDStream[T](parent, (rdd: RDD[T], t: Time) => {
     val collected = rdd.glom().collect().map(_.toSeq)
-    output += collected
+    output.add(collected)
@@ -322,7 +321,7 @@ trait TestSuiteBase extends SparkFunSuite with BeforeAndAfter with Logging {
     val inputStream = new TestInputStream(ssc, input, numPartitions)
     val operatedStream = operation(inputStream)
     val outputStream = new TestOutputStreamWithPartitions(operatedStream,
-      new ArrayBuffer[Seq[Seq[V]]] with SynchronizedBuffer[Seq[Seq[V]]])
+      new ConcurrentLinkedQueue[Seq[Seq[V]]])
     outputStream.register()
     ssc
   }
@@ -347,7 +346,7 @@ trait TestSuiteBase extends SparkFunSuite with BeforeAndAfter with Logging {
     val inputStream1 = new TestInputStream(ssc, input1, numInputPartitions)
     val inputStream2 = new TestInputStream(ssc, input2, numInputPartitions)
     val operatedStream = operation(inputStream1, inputStream2)
     val outputStream = new TestOutputStreamWithPartitions(operatedStream,
-      new ArrayBuffer[Seq[Seq[W]]] with SynchronizedBuffer[Seq[Seq[W]]])
+      new ConcurrentLinkedQueue[Seq[Seq[W]]])
     outputStream.register()
     ssc
   }
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/receiver/BlockGeneratorSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/receiver/BlockGeneratorSuite.scala
index f5ec0ff60aa27..76a83ee535b5a 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/receiver/BlockGeneratorSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/receiver/BlockGeneratorSuite.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.streaming.receiver
 
+import java.util.concurrent.ConcurrentLinkedQueue
+
 import scala.collection.mutable
 import scala.language.reflectiveCalls
 
@@ -231,9 +233,9 @@ class BlockGeneratorSuite extends SparkFunSuite with BeforeAndAfter {
 
   /** A listener for BlockGenerator that records the data in the callbacks */
   private class TestBlockGeneratorListener extends BlockGeneratorListener {
-    val pushedData = new mutable.ArrayBuffer[Any] with mutable.SynchronizedBuffer[Any]
-    val addedData = new mutable.ArrayBuffer[Any] with mutable.SynchronizedBuffer[Any]
-    val addedMetadata = new mutable.ArrayBuffer[Any] with mutable.SynchronizedBuffer[Any]
+    val pushedData = new ConcurrentLinkedQueue[Any]
+    val addedData = new ConcurrentLinkedQueue[Any]
+    val addedMetadata = new ConcurrentLinkedQueue[Any]
     @volatile var onGenerateBlockCalled = false
     @volatile var onAddDataCalled = false
     @volatile var onPushBlockCalled = false
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/util/RecurringTimerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/util/RecurringTimerSuite.scala
index 0544972d95c03..08632ec8112c6 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/util/RecurringTimerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/util/RecurringTimerSuite.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.streaming.util
 
+import java.util.concurrent.ConcurrentLinkedQueue
+
 import scala.collection.mutable
 import scala.concurrent.duration._
 
@@ -30,7 +32,7 @@ class RecurringTimerSuite extends SparkFunSuite with PrivateMethodTester {
   test("basic") {
     val clock = new ManualClock()
-    val results = new mutable.ArrayBuffer[Long]() with mutable.SynchronizedBuffer[Long]
+    val results = new ConcurrentLinkedQueue[Long]()
     val timer = new RecurringTimer(clock, 100, time => {
-      results += time
+      results.add(time)
     }, "RecurringTimerSuite-basic")
 
@@ -51,7 +53,7 @@ class RecurringTimerSuite extends SparkFunSuite with PrivateMethodTester {
   test("SPARK-10224: call 'callback' after stopping") {
     val clock = new ManualClock()
-    val results = new mutable.ArrayBuffer[Long]() with mutable.SynchronizedBuffer[Long]
+    val results = new ConcurrentLinkedQueue[Long]()
     val timer = new RecurringTimer(clock, 100, time => {
-      results += time
+      results.add(time)
     }, "RecurringTimerSuite-SPARK-10224")
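The whole patch applies one migration pattern: the deprecated `mutable.SynchronizedBuffer` mix-in is replaced by `java.util.concurrent.ConcurrentLinkedQueue`, writers call `add`/`addAll` instead of `+=`/`++=`, and readers convert back with `JavaConverters`' `asScala` before asserting. The sketch below is not part of the patch; it is a minimal, self-contained illustration of that pattern under assumed names (the object, thread counts, and record strings are invented for the example), showing a queue written from several threads, the way `foreachRDD` callbacks write in the tests above, and then read on the driving thread.

```scala
import java.util.concurrent.ConcurrentLinkedQueue

import scala.collection.JavaConverters._

// Illustrative only: names and sizes are made up, not taken from the patch.
object ConcurrentCollectionExample {
  def main(args: Array[String]): Unit = {
    // Before this patch the tests used the deprecated pattern:
    //   new mutable.ArrayBuffer[String] with mutable.SynchronizedBuffer[String]
    // The thread-safe replacement used throughout the patch:
    val received = new ConcurrentLinkedQueue[String]()

    // Several writer threads append concurrently, like foreachRDD callbacks do.
    val writers = (1 to 4).map { id =>
      new Thread(new Runnable {
        override def run(): Unit = {
          (1 to 100).foreach(i => received.add(s"writer-$id-record-$i"))
        }
      })
    }
    writers.foreach(_.start())
    writers.foreach(_.join())

    // Read back through the Java-to-Scala converter, mirroring the
    // `asScala` calls the patch adds in front of the assertions.
    val asSeq = received.asScala.toSeq
    assert(asSeq.size == 400, s"expected 400 records, got ${asSeq.size}")
    println("collected " + asSeq.size + " records")
  }
}
```

A usage note on the design choice, as far as the patch shows it: `ConcurrentLinkedQueue` only guarantees safe concurrent insertion and traversal, so order-sensitive assertions in the suites first materialize the contents with `asScala.toSeq`/`toArray` on the test thread rather than indexing into the live queue.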