From 4375f65982dea740dbba3bbe868ee0b7dd55eacf Mon Sep 17 00:00:00 2001 From: tedyu Date: Thu, 4 Feb 2016 13:32:07 -0800 Subject: [PATCH 01/11] Add scalastyle rule banning use of mutable.SynchronizedBuffer --- scalastyle-config.xml | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/scalastyle-config.xml b/scalastyle-config.xml index 967a482ba4f9b..64619d2108999 100644 --- a/scalastyle-config.xml +++ b/scalastyle-config.xml @@ -169,6 +169,18 @@ This file is divided into 3 sections: ]]> + + mutable\.SynchronizedBuffer + + + Class\.forName Date: Thu, 4 Feb 2016 14:05:02 -0800 Subject: [PATCH 02/11] Replace mutable.SynchronizedBuffer with ConcurrentLinkedQueue --- .../kafka/DirectKafkaStreamSuite.scala | 17 +++++++---------- .../receiver/ReceiverSupervisorImpl.scala | 5 ++--- .../apache/spark/streaming/TestSuiteBase.scala | 13 ++++++------- .../receiver/BlockGeneratorSuite.scala | 8 +++++--- .../streaming/util/RecurringTimerSuite.scala | 6 ++++-- 5 files changed, 24 insertions(+), 25 deletions(-) diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala index 655b161734d94..ce84ccb240deb 100644 --- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala +++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala @@ -18,7 +18,7 @@ package org.apache.spark.streaming.kafka import java.io.File -import java.util.concurrent.atomic.AtomicLong +import java.util.concurrent.{ConcurrentLinkedQueue, atomic.AtomicLong} import scala.collection.mutable import scala.collection.mutable.ArrayBuffer @@ -101,8 +101,7 @@ class DirectKafkaStreamSuite ssc, kafkaParams, topics) } - val allReceived = - new ArrayBuffer[(String, String)] with mutable.SynchronizedBuffer[(String, String)] + val allReceived = new ConcurrentLinkedQueue[(String, String)] // hold a reference to the current offset ranges, so it can be used downstream var offsetRanges = Array[OffsetRange]() @@ -173,7 +172,7 @@ class DirectKafkaStreamSuite "Start offset not from latest" ) - val collectedData = new mutable.ArrayBuffer[String]() with mutable.SynchronizedBuffer[String] + val collectedData = new ConcurrentLinkedQueue[String]() stream.map { _._2 }.foreachRDD { rdd => collectedData ++= rdd.collect() } ssc.start() val newData = Map("b" -> 10) @@ -219,7 +218,7 @@ class DirectKafkaStreamSuite "Start offset not from latest" ) - val collectedData = new mutable.ArrayBuffer[String]() with mutable.SynchronizedBuffer[String] + val collectedData = new ConcurrentLinkedQueue[String]() stream.foreachRDD { rdd => collectedData ++= rdd.collect() } ssc.start() val newData = Map("b" -> 10) @@ -335,8 +334,7 @@ class DirectKafkaStreamSuite ssc, kafkaParams, Set(topic)) } - val allReceived = - new ArrayBuffer[(String, String)] with mutable.SynchronizedBuffer[(String, String)] + val allReceived = new ConcurrentLinkedQueue[(String, String)] stream.foreachRDD { rdd => allReceived ++= rdd.collect() } ssc.start() @@ -389,8 +387,7 @@ class DirectKafkaStreamSuite } } - val collectedData = - new mutable.ArrayBuffer[Array[String]]() with mutable.SynchronizedBuffer[Array[String]] + val collectedData = new ConcurrentLinkedQueue[Array[String]]() // Used for assertion failure messages. 
def dataToString: String = @@ -433,7 +430,7 @@ class DirectKafkaStreamSuite } object DirectKafkaStreamSuite { - val collectedData = new mutable.ArrayBuffer[String]() with mutable.SynchronizedBuffer[String] + val collectedData = new ConcurrentLinkedQueue[String]() @volatile var total = -1L class InputInfoCollector extends StreamingListener { diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala index b774b6b9a55d1..fb63e23bb36e0 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala @@ -18,7 +18,7 @@ package org.apache.spark.streaming.receiver import java.nio.ByteBuffer -import java.util.concurrent.atomic.AtomicLong +import java.util.concurrent.{ConcurrentLinkedQueue, atomic.AtomicLong} import scala.collection.mutable import scala.collection.mutable.ArrayBuffer @@ -92,8 +92,7 @@ private[streaming] class ReceiverSupervisorImpl( /** Unique block ids if one wants to add blocks directly */ private val newBlockId = new AtomicLong(System.currentTimeMillis()) - private val registeredBlockGenerators = new mutable.ArrayBuffer[BlockGenerator] - with mutable.SynchronizedBuffer[BlockGenerator] + private val registeredBlockGenerators = new ConcurrentLinkedQueue[BlockGenerator] /** Divides received data records into data blocks for pushing in BlockManager. */ private val defaultBlockGeneratorListener = new BlockGeneratorListener { diff --git a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala index 239b10894ad2c..15ca13d69152c 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala @@ -18,9 +18,9 @@ package org.apache.spark.streaming import java.io.{IOException, ObjectInputStream} +import java.util.concurrent.ConcurrentLinkedQueue import scala.collection.mutable.ArrayBuffer -import scala.collection.mutable.SynchronizedBuffer import scala.language.implicitConversions import scala.reflect.ClassTag @@ -93,8 +93,7 @@ class TestInputStream[T: ClassTag](_ssc: StreamingContext, input: Seq[Seq[T]], n */ class TestOutputStream[T: ClassTag]( parent: DStream[T], - val output: SynchronizedBuffer[Seq[T]] = - new ArrayBuffer[Seq[T]] with SynchronizedBuffer[Seq[T]] + val output: ConcurrentLinkedQueue[Seq[T]] = new ConcurrentLinkedQueue[Seq[T]] ) extends ForEachDStream[T](parent, (rdd: RDD[T], t: Time) => { val collected = rdd.collect() output += collected @@ -117,8 +116,8 @@ class TestOutputStream[T: ClassTag]( */ class TestOutputStreamWithPartitions[T: ClassTag]( parent: DStream[T], - val output: SynchronizedBuffer[Seq[Seq[T]]] = - new ArrayBuffer[Seq[Seq[T]]] with SynchronizedBuffer[Seq[Seq[T]]]) + val output: ConcurrentLinkedQueue[Seq[Seq[T]]] = + new ConcurrentLinkedQueue[Seq[Seq[T]]]) extends ForEachDStream[T](parent, (rdd: RDD[T], t: Time) => { val collected = rdd.glom().collect().map(_.toSeq) output += collected @@ -322,7 +321,7 @@ trait TestSuiteBase extends SparkFunSuite with BeforeAndAfter with Logging { val inputStream = new TestInputStream(ssc, input, numPartitions) val operatedStream = operation(inputStream) val outputStream = new TestOutputStreamWithPartitions(operatedStream, - new ArrayBuffer[Seq[Seq[V]]] with 
SynchronizedBuffer[Seq[Seq[V]]]) + new ConcurrentLinkedQueue[Seq[Seq[V]]]) outputStream.register() ssc } @@ -347,7 +346,7 @@ trait TestSuiteBase extends SparkFunSuite with BeforeAndAfter with Logging { val inputStream2 = new TestInputStream(ssc, input2, numInputPartitions) val operatedStream = operation(inputStream1, inputStream2) val outputStream = new TestOutputStreamWithPartitions(operatedStream, - new ArrayBuffer[Seq[Seq[W]]] with SynchronizedBuffer[Seq[Seq[W]]]) + new ConcurrentLinkedQueue[Seq[Seq[W]]]) outputStream.register() ssc } diff --git a/streaming/src/test/scala/org/apache/spark/streaming/receiver/BlockGeneratorSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/receiver/BlockGeneratorSuite.scala index f5ec0ff60aa27..76a83ee535b5a 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/receiver/BlockGeneratorSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/receiver/BlockGeneratorSuite.scala @@ -17,6 +17,8 @@ package org.apache.spark.streaming.receiver +import java.util.concurrent.ConcurrentLinkedQueue + import scala.collection.mutable import scala.language.reflectiveCalls @@ -231,9 +233,9 @@ class BlockGeneratorSuite extends SparkFunSuite with BeforeAndAfter { /** A listener for BlockGenerator that records the data in the callbacks */ private class TestBlockGeneratorListener extends BlockGeneratorListener { - val pushedData = new mutable.ArrayBuffer[Any] with mutable.SynchronizedBuffer[Any] - val addedData = new mutable.ArrayBuffer[Any] with mutable.SynchronizedBuffer[Any] - val addedMetadata = new mutable.ArrayBuffer[Any] with mutable.SynchronizedBuffer[Any] + val pushedData = new ConcurrentLinkedQueue[Any] + val addedData = new ConcurrentLinkedQueue[Any] + val addedMetadata = new ConcurrentLinkedQueue[Any] @volatile var onGenerateBlockCalled = false @volatile var onAddDataCalled = false @volatile var onPushBlockCalled = false diff --git a/streaming/src/test/scala/org/apache/spark/streaming/util/RecurringTimerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/util/RecurringTimerSuite.scala index 0544972d95c03..08632ec8112c6 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/util/RecurringTimerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/util/RecurringTimerSuite.scala @@ -17,6 +17,8 @@ package org.apache.spark.streaming.util +import java.util.concurrent.ConcurrentLinkedQueue + import scala.collection.mutable import scala.concurrent.duration._ @@ -30,7 +32,7 @@ class RecurringTimerSuite extends SparkFunSuite with PrivateMethodTester { test("basic") { val clock = new ManualClock() - val results = new mutable.ArrayBuffer[Long]() with mutable.SynchronizedBuffer[Long] + val results = new ConcurrentLinkedQueue[Long]() val timer = new RecurringTimer(clock, 100, time => { results += time }, "RecurringTimerSuite-basic") @@ -51,7 +53,7 @@ class RecurringTimerSuite extends SparkFunSuite with PrivateMethodTester { test("SPARK-10224: call 'callback' after stopping") { val clock = new ManualClock() - val results = new mutable.ArrayBuffer[Long]() with mutable.SynchronizedBuffer[Long] + val results = new ConcurrentLinkedQueue[Long]() val timer = new RecurringTimer(clock, 100, time => { results += time }, "RecurringTimerSuite-SPARK-10224") From f5666a8e100b9d971fcb34ce131de8d10430f64e Mon Sep 17 00:00:00 2001 From: tedyu Date: Thu, 4 Feb 2016 14:38:34 -0800 Subject: [PATCH 03/11] Fix compilation --- .../apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala | 3 ++- 
.../spark/streaming/receiver/ReceiverSupervisorImpl.scala | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala index ce84ccb240deb..4fe30488c7b51 100644 --- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala +++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala @@ -18,7 +18,8 @@ package org.apache.spark.streaming.kafka import java.io.File -import java.util.concurrent.{ConcurrentLinkedQueue, atomic.AtomicLong} +import java.util.concurrent.ConcurrentLinkedQueue +import java.util.concurrent.atomic.AtomicLong import scala.collection.mutable import scala.collection.mutable.ArrayBuffer diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala index fb63e23bb36e0..3ddcf321ae8f3 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala @@ -18,7 +18,8 @@ package org.apache.spark.streaming.receiver import java.nio.ByteBuffer -import java.util.concurrent.{ConcurrentLinkedQueue, atomic.AtomicLong} +import java.util.concurrent.ConcurrentLinkedQueue +import java.util.concurrent.atomic.AtomicLong import scala.collection.mutable import scala.collection.mutable.ArrayBuffer From a566d911cad71a0b51ed2d080bd3502a2f6a39d8 Mon Sep 17 00:00:00 2001 From: tedyu Date: Thu, 4 Feb 2016 15:07:32 -0800 Subject: [PATCH 04/11] Fix compilation --- .../streaming/receiver/ReceiverSupervisorImpl.scala | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala index 3ddcf321ae8f3..53e210a1bdfe6 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala @@ -21,6 +21,7 @@ import java.nio.ByteBuffer import java.util.concurrent.ConcurrentLinkedQueue import java.util.concurrent.atomic.AtomicLong +import scala.collection.JavaConverters._ import scala.collection.mutable import scala.collection.mutable.ArrayBuffer @@ -84,7 +85,7 @@ private[streaming] class ReceiverSupervisorImpl( cleanupOldBlocks(threshTime) case UpdateRateLimit(eps) => logInfo(s"Received a new rate limit: $eps.") - registeredBlockGenerators.foreach { bg => + registeredBlockGenerators.asScala.foreach { bg => bg.updateRate(eps) } } @@ -170,11 +171,11 @@ private[streaming] class ReceiverSupervisorImpl( } override protected def onStart() { - registeredBlockGenerators.foreach { _.start() } + registeredBlockGenerators.asScala.foreach { _.start() } } override protected def onStop(message: String, error: Option[Throwable]) { - registeredBlockGenerators.foreach { _.stop() } + registeredBlockGenerators.asScala.foreach { _.stop() } env.rpcEnv.stop(endpoint) } @@ -194,10 +195,10 @@ private[streaming] class ReceiverSupervisorImpl( override def createBlockGenerator( blockGeneratorListener: BlockGeneratorListener): BlockGenerator = { // Cleanup BlockGenerators that have already been 
stopped - registeredBlockGenerators --= registeredBlockGenerators.filter{ _.isStopped() } + registeredBlockGenerators.removeAll(registeredBlockGenerators.asScala.filter{ _.isStopped() }) val newBlockGenerator = new BlockGenerator(blockGeneratorListener, streamId, env.conf) - registeredBlockGenerators += newBlockGenerator + registeredBlockGenerators.add(newBlockGenerator) newBlockGenerator } From b63c13663315ababf2fc6ee10e15a707c2eb936f Mon Sep 17 00:00:00 2001 From: tedyu Date: Thu, 4 Feb 2016 15:38:15 -0800 Subject: [PATCH 05/11] Fix compilation --- .../spark/streaming/receiver/ReceiverSupervisorImpl.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala index 53e210a1bdfe6..0cf1b72fd0124 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala @@ -195,7 +195,8 @@ private[streaming] class ReceiverSupervisorImpl( override def createBlockGenerator( blockGeneratorListener: BlockGeneratorListener): BlockGenerator = { // Cleanup BlockGenerators that have already been stopped - registeredBlockGenerators.removeAll(registeredBlockGenerators.asScala.filter{ _.isStopped() }) + val stoppedGenerators = registeredBlockGenerators.asScala.filter{ _.isStopped() } + stoppedGenerators.foreach(registeredBlockGenerators.remove(_)) val newBlockGenerator = new BlockGenerator(blockGeneratorListener, streamId, env.conf) registeredBlockGenerators.add(newBlockGenerator) From ebf3509d58c7d232b147c2ff94f8da226f492eca Mon Sep 17 00:00:00 2001 From: tedyu Date: Thu, 4 Feb 2016 18:25:02 -0800 Subject: [PATCH 06/11] Address compilation errors --- .../kafka/DirectKafkaStreamSuite.scala | 21 +++--- .../streaming/BasicOperationsSuite.scala | 8 ++- .../spark/streaming/CheckpointSuite.scala | 22 +++--- .../spark/streaming/InputStreamsSuite.scala | 72 +++++++++---------- .../spark/streaming/MapWithStateSuite.scala | 9 +-- .../spark/streaming/TestSuiteBase.scala | 3 +- 6 files changed, 70 insertions(+), 65 deletions(-) diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala index 4fe30488c7b51..224e275424ded 100644 --- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala +++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala @@ -21,6 +21,7 @@ import java.io.File import java.util.concurrent.ConcurrentLinkedQueue import java.util.concurrent.atomic.AtomicLong +import scala.collection.JavaConverters._ import scala.collection.mutable import scala.collection.mutable.ArrayBuffer import scala.concurrent.duration._ @@ -131,11 +132,11 @@ class DirectKafkaStreamSuite assert(partSize === rangeSize, "offset ranges are wrong") } } - stream.foreachRDD { rdd => allReceived ++= rdd.collect() } + stream.foreachRDD { rdd => allReceived.addAll(rdd.collect()) } ssc.start() eventually(timeout(20000.milliseconds), interval(200.milliseconds)) { assert(allReceived.size === totalSent, - "didn't get expected number of messages, messages:\n" + allReceived.mkString("\n")) + "didn't get expected number of messages, messages:\n" + allReceived.asScala.mkString("\n")) } ssc.stop() } @@ -174,7 +175,7 
@@ class DirectKafkaStreamSuite ) val collectedData = new ConcurrentLinkedQueue[String]() - stream.map { _._2 }.foreachRDD { rdd => collectedData ++= rdd.collect() } + stream.map { _._2 }.foreachRDD { rdd => collectedData.addAll(rdd.collect()) } ssc.start() val newData = Map("b" -> 10) kafkaTestUtils.sendMessages(topic, newData) @@ -220,7 +221,7 @@ class DirectKafkaStreamSuite ) val collectedData = new ConcurrentLinkedQueue[String]() - stream.foreachRDD { rdd => collectedData ++= rdd.collect() } + stream.foreachRDD { rdd => collectedData.addAll(rdd.collect()) } ssc.start() val newData = Map("b" -> 10) kafkaTestUtils.sendMessages(topic, newData) @@ -265,7 +266,7 @@ class DirectKafkaStreamSuite // This is to collect the raw data received from Kafka kafkaStream.foreachRDD { (rdd: RDD[(String, String)], time: Time) => val data = rdd.map { _._2 }.collect() - DirectKafkaStreamSuite.collectedData.appendAll(data) + DirectKafkaStreamSuite.collectedData.addAll(data) } // This is ensure all the data is eventually receiving only once @@ -337,11 +338,11 @@ class DirectKafkaStreamSuite val allReceived = new ConcurrentLinkedQueue[(String, String)] - stream.foreachRDD { rdd => allReceived ++= rdd.collect() } + stream.foreachRDD { rdd => allReceived.addAll(rdd.collect()) } ssc.start() eventually(timeout(20000.milliseconds), interval(200.milliseconds)) { assert(allReceived.size === totalSent, - "didn't get expected number of messages, messages:\n" + allReceived.mkString("\n")) + "didn't get expected number of messages, messages:\n" + allReceived.asScala.mkString("\n")) // Calculate all the record number collected in the StreamingListener. assert(collector.numRecordsSubmitted.get() === totalSent) @@ -392,12 +393,12 @@ class DirectKafkaStreamSuite // Used for assertion failure messages. def dataToString: String = - collectedData.map(_.mkString("[", ",", "]")).mkString("{", ", ", "}") + collectedData.asScala.map(_.mkString("[", ",", "]")).mkString("{", ", ", "}") // This is to collect the raw data received from Kafka kafkaStream.foreachRDD { (rdd: RDD[(String, String)], time: Time) => val data = rdd.map { _._2 }.collect() - collectedData += data + collectedData.add(data) } ssc.start() @@ -413,7 +414,7 @@ class DirectKafkaStreamSuite eventually(timeout(5.seconds), interval(batchIntervalMilliseconds.milliseconds)) { // Assert that rate estimator values are used to determine maxMessagesPerPartition. // Funky "-" in message makes the complete assertion message read better. 
- assert(collectedData.exists(_.size == expectedSize), + assert(collectedData.asScala.exists(_.size == expectedSize), s" - No arrays of size $expectedSize for rate $rate found in $dataToString") } } diff --git a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala index 25e7ae8262a5f..fe971b96519b5 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala @@ -17,6 +17,8 @@ package org.apache.spark.streaming +import java.util.concurrent.ConcurrentLinkedQueue + import scala.collection.mutable import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer} import scala.language.existentials @@ -645,8 +647,8 @@ class BasicOperationsSuite extends TestSuiteBase { val networkStream = ssc.socketTextStream("localhost", testServer.port, StorageLevel.MEMORY_AND_DISK) val mappedStream = networkStream.map(_ + ".").persist() - val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]] - val outputStream = new TestOutputStream(mappedStream, outputBuffer) + val outputQueue = new ConcurrentLinkedQueue[Seq[String]] + val outputStream = new TestOutputStream(mappedStream, outputQueue) outputStream.register() ssc.start() @@ -685,7 +687,7 @@ class BasicOperationsSuite extends TestSuiteBase { testServer.stop() // verify data has been received - assert(outputBuffer.size > 0) + assert(!outputQueue.isEmpty) assert(blockRdds.size > 0) assert(persistentRddIds.size > 0) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala index 786703eb9a84e..64ad35a0687e8 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala @@ -18,6 +18,9 @@ package org.apache.spark.streaming import java.io.{ByteArrayInputStream, ByteArrayOutputStream, File, ObjectOutputStream} +import java.util.concurrent.ConcurrentLinkedQueue + +import scala.collection.JavaConverters._ import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer} import scala.reflect.ClassTag @@ -105,7 +108,7 @@ trait DStreamCheckpointTester { self: SparkFunSuite => val operatedStream = operation(inputStream) operatedStream.print() val outputStream = new TestOutputStreamWithPartitions(operatedStream, - new ArrayBuffer[Seq[Seq[V]]] with SynchronizedBuffer[Seq[Seq[V]]]) + new ConcurrentLinkedQueue[Seq[Seq[V]]]) outputStream.register() ssc.checkpoint(checkpointDir) @@ -166,7 +169,7 @@ trait DStreamCheckpointTester { self: SparkFunSuite => // are written to make sure that both of them have been written. assert(checkpointFilesOfLatestTime.size === 2) } - outputStream.output.map(_.flatten) + outputStream.output.asScala.map(_.flatten).toSeq } finally { ssc.stop(stopSparkContext = stopSparkContext) @@ -591,7 +594,7 @@ class CheckpointSuite extends TestSuiteBase with DStreamCheckpointTester // Set up the streaming context and input streams val batchDuration = Seconds(2) // Due to 1-second resolution of setLastModified() on some OS's. 
val testDir = Utils.createTempDir() - val outputBuffer = new ArrayBuffer[Seq[Int]] with SynchronizedBuffer[Seq[Int]] + val outputBuffer = new ConcurrentLinkedQueue[Seq[Int]] /** * Writes a file named `i` (which contains the number `i`) to the test directory and sets its @@ -671,7 +674,7 @@ class CheckpointSuite extends TestSuiteBase with DStreamCheckpointTester ssc.stop() // Check that we shut down while the third batch was being processed assert(batchCounter.getNumCompletedBatches === 2) - assert(outputStream.output.flatten === Seq(1, 3)) + assert(outputStream.output.asScala.toSeq.flatten === Seq(1, 3)) } // The original StreamingContext has now been stopped. @@ -721,7 +724,7 @@ class CheckpointSuite extends TestSuiteBase with DStreamCheckpointTester assert(batchCounter.getNumCompletedBatches === index + numBatchesAfterRestart + 1) } } - logInfo("Output after restart = " + outputStream.output.mkString("[", ", ", "]")) + logInfo("Output after restart = " + outputStream.output.asScala.mkString("[", ", ", "]")) assert(outputStream.output.size > 0, "No files processed after restart") ssc.stop() @@ -730,11 +733,11 @@ class CheckpointSuite extends TestSuiteBase with DStreamCheckpointTester assert(recordedFiles(ssc) === (1 to 9)) // Append the new output to the old buffer - outputBuffer ++= outputStream.output + outputBuffer.addAll(outputStream.output) // Verify whether all the elements received are as expected val expectedOutput = Seq(1, 3, 6, 10, 15, 21, 28, 36, 45) - assert(outputBuffer.flatten.toSet === expectedOutput.toSet) + assert(outputBuffer.asScala.flatten.toSet === expectedOutput.toSet) } } finally { Utils.deleteRecursively(testDir) @@ -894,7 +897,8 @@ class CheckpointSuite extends TestSuiteBase with DStreamCheckpointTester * Advances the manual clock on the streaming scheduler by given number of batches. * It also waits for the expected amount of time for each batch. 
*/ - def advanceTimeWithRealDelay[V: ClassTag](ssc: StreamingContext, numBatches: Long): Seq[Seq[V]] = + def advanceTimeWithRealDelay[V: ClassTag](ssc: StreamingContext, numBatches: Long): + Iterable[Seq[V]] = { val clock = ssc.scheduler.clock.asInstanceOf[ManualClock] logInfo("Manual clock before advancing = " + clock.getTimeMillis()) @@ -908,7 +912,7 @@ class CheckpointSuite extends TestSuiteBase with DStreamCheckpointTester val outputStream = ssc.graph.getOutputStreams().filter { dstream => dstream.isInstanceOf[TestOutputStreamWithPartitions[V]] }.head.asInstanceOf[TestOutputStreamWithPartitions[V]] - outputStream.output.map(_.flatten) + outputStream.output.asScala.map(_.flatten) } } diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala index 75591f04ca00d..89cf7e83afbcd 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala @@ -20,10 +20,11 @@ package org.apache.spark.streaming import java.io.{BufferedWriter, File, OutputStreamWriter} import java.net.{ServerSocket, Socket, SocketException} import java.nio.charset.Charset -import java.util.concurrent.{ArrayBlockingQueue, CountDownLatch, Executors, TimeUnit} +import java.util.concurrent._ import java.util.concurrent.atomic.AtomicInteger -import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer, SynchronizedQueue} +import scala.collection.JavaConverters._ +import scala.collection.mutable.SynchronizedQueue import scala.language.postfixOps import com.google.common.io.Files @@ -58,8 +59,8 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { val batchCounter = new BatchCounter(ssc) val networkStream = ssc.socketTextStream( "localhost", testServer.port, StorageLevel.MEMORY_AND_DISK) - val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]] - val outputStream = new TestOutputStream(networkStream, outputBuffer) + val outputQueue = new ConcurrentLinkedQueue[Seq[String]] + val outputStream = new TestOutputStream(networkStream, outputQueue) outputStream.register() ssc.start() @@ -90,9 +91,9 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { // Verify whether data received was as expected logInfo("--------------------------------") - logInfo("output.size = " + outputBuffer.size) + logInfo("output.size = " + outputQueue.size) logInfo("output") - outputBuffer.foreach(x => logInfo("[" + x.mkString(",") + "]")) + outputQueue.asScala.foreach(x => logInfo("[" + x.mkString(",") + "]")) logInfo("expected output.size = " + expectedOutput.size) logInfo("expected output") expectedOutput.foreach(x => logInfo("[" + x.mkString(",") + "]")) @@ -100,7 +101,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { // Verify whether all the elements received are as expected // (whether the elements were received one in each interval is not verified) - val output: ArrayBuffer[String] = outputBuffer.flatMap(x => x) + val output: Array[String] = outputQueue.asScala.flatMap(x => x).toArray assert(output.size === expectedOutput.size) for (i <- 0 until output.size) { assert(output(i) === expectedOutput(i)) @@ -119,8 +120,8 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { val batchCounter = new BatchCounter(ssc) val networkStream = ssc.socketTextStream( "localhost", testServer.port, StorageLevel.MEMORY_AND_DISK) - val outputBuffer = new 
ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]] - val outputStream = new TestOutputStream(networkStream, outputBuffer) + val outputQueue = new ConcurrentLinkedQueue[Seq[String]] + val outputStream = new TestOutputStream(networkStream, outputQueue) outputStream.register() ssc.start() @@ -156,9 +157,8 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { clock.setTime(existingFile.lastModified + batchDuration.milliseconds) val batchCounter = new BatchCounter(ssc) val fileStream = ssc.binaryRecordsStream(testDir.toString, 1) - val outputBuffer = new ArrayBuffer[Seq[Array[Byte]]] - with SynchronizedBuffer[Seq[Array[Byte]]] - val outputStream = new TestOutputStream(fileStream, outputBuffer) + val outputQueue = new ConcurrentLinkedQueue[Seq[Array[Byte]]] + val outputStream = new TestOutputStream(fileStream, outputQueue) outputStream.register() ssc.start() @@ -183,8 +183,8 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { } val expectedOutput = input.map(i => i.toByte) - val obtainedOutput = outputBuffer.flatten.toList.map(i => i(0).toByte) - assert(obtainedOutput === expectedOutput) + val obtainedOutput = outputQueue.asScala.flatten.toList.map(i => i(0).toByte) + assert(obtainedOutput.toSeq === expectedOutput) } } finally { if (testDir != null) Utils.deleteRecursively(testDir) @@ -206,15 +206,15 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { val numTotalRecords = numThreads * numRecordsPerThread val testReceiver = new MultiThreadTestReceiver(numThreads, numRecordsPerThread) MultiThreadTestReceiver.haveAllThreadsFinished = false - val outputBuffer = new ArrayBuffer[Seq[Long]] with SynchronizedBuffer[Seq[Long]] - def output: ArrayBuffer[Long] = outputBuffer.flatMap(x => x) + val outputQueue = new ConcurrentLinkedQueue[Seq[Long]] + def output: Iterable[Long] = outputQueue.asScala.flatMap(x => x) // set up the network stream using the test receiver withStreamingContext(new StreamingContext(conf, batchDuration)) { ssc => val networkStream = ssc.receiverStream[Int](testReceiver) val countStream = networkStream.count - val outputStream = new TestOutputStream(countStream, outputBuffer) + val outputStream = new TestOutputStream(countStream, outputQueue) outputStream.register() ssc.start() @@ -231,9 +231,9 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { // Verify whether data received was as expected logInfo("--------------------------------") - logInfo("output.size = " + outputBuffer.size) + logInfo("output.size = " + outputQueue.size) logInfo("output") - outputBuffer.foreach(x => logInfo("[" + x.mkString(",") + "]")) + outputQueue.asScala.foreach(x => logInfo("[" + x.mkString(",") + "]")) logInfo("--------------------------------") assert(output.sum === numTotalRecords) } @@ -241,14 +241,14 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { test("queue input stream - oneAtATime = true") { val input = Seq("1", "2", "3", "4", "5") val expectedOutput = input.map(Seq(_)) - val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]] - def output: ArrayBuffer[Seq[String]] = outputBuffer.filter(_.size > 0) + val outputQueue = new ConcurrentLinkedQueue[Seq[String]] + def output: Iterable[Seq[String]] = outputQueue.asScala.filter(_.size > 0) // Set up the streaming context and input streams withStreamingContext(new StreamingContext(conf, batchDuration)) { ssc => val queue = new SynchronizedQueue[RDD[String]]() val queueStream = ssc.queueStream(queue, oneAtATime = true) - 
val outputStream = new TestOutputStream(queueStream, outputBuffer) + val outputStream = new TestOutputStream(queueStream, outputQueue) outputStream.register() ssc.start() @@ -266,9 +266,9 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { // Verify whether data received was as expected logInfo("--------------------------------") - logInfo("output.size = " + outputBuffer.size) + logInfo("output.size = " + outputQueue.size) logInfo("output") - outputBuffer.foreach(x => logInfo("[" + x.mkString(",") + "]")) + outputQueue.asScala.foreach(x => logInfo("[" + x.mkString(",") + "]")) logInfo("expected output.size = " + expectedOutput.size) logInfo("expected output") expectedOutput.foreach(x => logInfo("[" + x.mkString(",") + "]")) @@ -276,14 +276,12 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { // Verify whether all the elements received are as expected assert(output.size === expectedOutput.size) - for (i <- 0 until output.size) { - assert(output(i) === expectedOutput(i)) - } + output.zipWithIndex.foreach{case (e, i) => assert(e == expectedOutput(i))} } test("queue input stream - oneAtATime = false") { - val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]] - def output: ArrayBuffer[Seq[String]] = outputBuffer.filter(_.size > 0) + val outputQueue = new ConcurrentLinkedQueue[Seq[String]] + def output: ArrayBuffer[Seq[String]] = outputQueue.asScala.filter(_.size > 0) val input = Seq("1", "2", "3", "4", "5") val expectedOutput = Seq(Seq("1", "2", "3"), Seq("4", "5")) @@ -291,7 +289,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { withStreamingContext(new StreamingContext(conf, batchDuration)) { ssc => val queue = new SynchronizedQueue[RDD[String]]() val queueStream = ssc.queueStream(queue, oneAtATime = false) - val outputStream = new TestOutputStream(queueStream, outputBuffer) + val outputStream = new TestOutputStream(queueStream, outputQueue) outputStream.register() ssc.start() @@ -312,9 +310,9 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { // Verify whether data received was as expected logInfo("--------------------------------") - logInfo("output.size = " + outputBuffer.size) + logInfo("output.size = " + outputQueue.size) logInfo("output") - outputBuffer.foreach(x => logInfo("[" + x.mkString(",") + "]")) + outputQueue.asScala.foreach(x => logInfo("[" + x.mkString(",") + "]")) logInfo("expected output.size = " + expectedOutput.size) logInfo("expected output") expectedOutput.foreach(x => logInfo("[" + x.mkString(",") + "]")) @@ -322,9 +320,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { // Verify whether all the elements received are as expected assert(output.size === expectedOutput.size) - for (i <- 0 until output.size) { - assert(output(i) === expectedOutput(i)) - } + output.zipWithIndex.foreach{case (e, i) => assert(e == expectedOutput(i))} } test("test track the number of input stream") { @@ -373,8 +369,8 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { val batchCounter = new BatchCounter(ssc) val fileStream = ssc.fileStream[LongWritable, Text, TextInputFormat]( testDir.toString, (x: Path) => true, newFilesOnly = newFilesOnly).map(_._2.toString) - val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]] - val outputStream = new TestOutputStream(fileStream, outputBuffer) + val outputQueue = new ConcurrentLinkedQueue[Seq[String]] + val outputStream = new TestOutputStream(fileStream, outputQueue) 
outputStream.register() ssc.start() @@ -404,7 +400,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { } else { (Seq(0) ++ input).map(_.toString).toSet } - assert(outputBuffer.flatten.toSet === expectedOutput) + assert(outputQueue.asScala.flatten.toSet === expectedOutput) } } finally { if (testDir != null) Utils.deleteRecursively(testDir) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/MapWithStateSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/MapWithStateSuite.scala index 2984fd2b298dc..cdb29ef75293a 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/MapWithStateSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/MapWithStateSuite.scala @@ -18,8 +18,9 @@ package org.apache.spark.streaming import java.io.File +import java.util.concurrent.ConcurrentLinkedQueue -import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer} +import scala.collection.JavaConverters._ import scala.reflect.ClassTag import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll} @@ -550,9 +551,9 @@ class MapWithStateSuite extends SparkFunSuite val ssc = new StreamingContext(sc, Seconds(1)) val inputStream = new TestInputStream(ssc, input, numPartitions = 2) val trackeStateStream = inputStream.map(x => (x, 1)).mapWithState(mapWithStateSpec) - val collectedOutputs = new ArrayBuffer[Seq[T]] with SynchronizedBuffer[Seq[T]] + val collectedOutputs = new ConcurrentLinkedQueue[Seq[T]] val outputStream = new TestOutputStream(trackeStateStream, collectedOutputs) - val collectedStateSnapshots = new ArrayBuffer[Seq[(K, S)]] with SynchronizedBuffer[Seq[(K, S)]] + val collectedStateSnapshots = new ConcurrentLinkedQueue[Seq[(K, S)]] val stateSnapshotStream = new TestOutputStream( trackeStateStream.stateSnapshots(), collectedStateSnapshots) outputStream.register() @@ -567,7 +568,7 @@ class MapWithStateSuite extends SparkFunSuite batchCounter.waitUntilBatchesCompleted(numBatches, 10000) ssc.stop(stopSparkContext = false) - (collectedOutputs, collectedStateSnapshots) + (collectedOutputs.asScala.toSeq, collectedStateSnapshots.asScala.toSeq) } private def assert[U](expected: Seq[Seq[U]], collected: Seq[Seq[U]], typ: String) { diff --git a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala index 15ca13d69152c..b5088badab740 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala @@ -20,7 +20,8 @@ package org.apache.spark.streaming import java.io.{IOException, ObjectInputStream} import java.util.concurrent.ConcurrentLinkedQueue -import scala.collection.mutable.ArrayBuffer +import scala.collection.JavaConverters._ + import scala.language.implicitConversions import scala.reflect.ClassTag From 74a4c701cec27aab6d8684373a279c6597383f65 Mon Sep 17 00:00:00 2001 From: tedyu Date: Thu, 4 Feb 2016 19:53:41 -0800 Subject: [PATCH 07/11] Remove empty line between imports --- .../test/scala/org/apache/spark/streaming/CheckpointSuite.scala | 1 - .../test/scala/org/apache/spark/streaming/TestSuiteBase.scala | 1 - 2 files changed, 2 deletions(-) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala index 64ad35a0687e8..26ea0901ff4e9 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala +++ 
b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala @@ -21,7 +21,6 @@ import java.io.{ByteArrayInputStream, ByteArrayOutputStream, File, ObjectOutputS import java.util.concurrent.ConcurrentLinkedQueue import scala.collection.JavaConverters._ - import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer} import scala.reflect.ClassTag diff --git a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala index b5088badab740..8c29130fff67d 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala @@ -21,7 +21,6 @@ import java.io.{IOException, ObjectInputStream} import java.util.concurrent.ConcurrentLinkedQueue import scala.collection.JavaConverters._ - import scala.language.implicitConversions import scala.reflect.ClassTag From 879eebed57f40e47e5ae803e947657a723c26936 Mon Sep 17 00:00:00 2001 From: tedyu Date: Thu, 4 Feb 2016 21:48:56 -0800 Subject: [PATCH 08/11] Try to fix compilation for DirectKafkaStreamSuite: --- .../spark/streaming/kafka/DirectKafkaStreamSuite.scala | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala index 224e275424ded..6eac99093eb46 100644 --- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala +++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala @@ -132,7 +132,7 @@ class DirectKafkaStreamSuite assert(partSize === rangeSize, "offset ranges are wrong") } } - stream.foreachRDD { rdd => allReceived.addAll(rdd.collect()) } + stream.foreachRDD { rdd => allReceived.addAll(rdd.collect().asJava) } ssc.start() eventually(timeout(20000.milliseconds), interval(200.milliseconds)) { assert(allReceived.size === totalSent, @@ -175,7 +175,7 @@ class DirectKafkaStreamSuite ) val collectedData = new ConcurrentLinkedQueue[String]() - stream.map { _._2 }.foreachRDD { rdd => collectedData.addAll(rdd.collect()) } + stream.map { _._2 }.foreachRDD { rdd => collectedData.addAll(rdd.collect().asJava) } ssc.start() val newData = Map("b" -> 10) kafkaTestUtils.sendMessages(topic, newData) @@ -221,7 +221,7 @@ class DirectKafkaStreamSuite ) val collectedData = new ConcurrentLinkedQueue[String]() - stream.foreachRDD { rdd => collectedData.addAll(rdd.collect()) } + stream.foreachRDD { rdd => collectedData.addAll(rdd.collect().asJava) } ssc.start() val newData = Map("b" -> 10) kafkaTestUtils.sendMessages(topic, newData) @@ -266,7 +266,7 @@ class DirectKafkaStreamSuite // This is to collect the raw data received from Kafka kafkaStream.foreachRDD { (rdd: RDD[(String, String)], time: Time) => val data = rdd.map { _._2 }.collect() - DirectKafkaStreamSuite.collectedData.addAll(data) + DirectKafkaStreamSuite.collectedData.addAll(data.asJava) } // This is ensure all the data is eventually receiving only once @@ -338,7 +338,7 @@ class DirectKafkaStreamSuite val allReceived = new ConcurrentLinkedQueue[(String, String)] - stream.foreachRDD { rdd => allReceived.addAll(rdd.collect()) } + stream.foreachRDD { rdd => allReceived.addAll(rdd.collect().asJava) } ssc.start() eventually(timeout(20000.milliseconds), interval(200.milliseconds)) { assert(allReceived.size === totalSent, From 
7faca67b334f4aee11dba1e0333bc326d790acbc Mon Sep 17 00:00:00 2001 From: tedyu Date: Fri, 5 Feb 2016 03:51:42 -0800 Subject: [PATCH 09/11] Fix compilation for DirectKafkaStreamSuite --- .../streaming/kafka/DirectKafkaStreamSuite.scala | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala index 6eac99093eb46..249f36b0ccef2 100644 --- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala +++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala @@ -18,6 +18,7 @@ package org.apache.spark.streaming.kafka import java.io.File +import java.util.Collections import java.util.concurrent.ConcurrentLinkedQueue import java.util.concurrent.atomic.AtomicLong @@ -132,7 +133,7 @@ class DirectKafkaStreamSuite assert(partSize === rangeSize, "offset ranges are wrong") } } - stream.foreachRDD { rdd => allReceived.addAll(rdd.collect().asJava) } + stream.foreachRDD { rdd => Collections.addAll(allReceived, rdd.collect()) } ssc.start() eventually(timeout(20000.milliseconds), interval(200.milliseconds)) { assert(allReceived.size === totalSent, @@ -175,7 +176,7 @@ class DirectKafkaStreamSuite ) val collectedData = new ConcurrentLinkedQueue[String]() - stream.map { _._2 }.foreachRDD { rdd => collectedData.addAll(rdd.collect().asJava) } + stream.map { _._2 }.foreachRDD { rdd => Collections.addAll(collectedData, rdd.collect()) } ssc.start() val newData = Map("b" -> 10) kafkaTestUtils.sendMessages(topic, newData) @@ -221,7 +222,7 @@ class DirectKafkaStreamSuite ) val collectedData = new ConcurrentLinkedQueue[String]() - stream.foreachRDD { rdd => collectedData.addAll(rdd.collect().asJava) } + stream.foreachRDD { rdd => Collections.addAll(collectedData, rdd.collect()) } ssc.start() val newData = Map("b" -> 10) kafkaTestUtils.sendMessages(topic, newData) @@ -266,7 +267,7 @@ class DirectKafkaStreamSuite // This is to collect the raw data received from Kafka kafkaStream.foreachRDD { (rdd: RDD[(String, String)], time: Time) => val data = rdd.map { _._2 }.collect() - DirectKafkaStreamSuite.collectedData.addAll(data.asJava) + Collections.addAll(DirectKafkaStreamSuite.collectedData, data) } // This is ensure all the data is eventually receiving only once @@ -338,7 +339,7 @@ class DirectKafkaStreamSuite val allReceived = new ConcurrentLinkedQueue[(String, String)] - stream.foreachRDD { rdd => allReceived.addAll(rdd.collect().asJava) } + stream.foreachRDD { rdd => Collections.addAll(allReceived, rdd.collect()) } ssc.start() eventually(timeout(20000.milliseconds), interval(200.milliseconds)) { assert(allReceived.size === totalSent, From b84c8c78f3d25ac3a47bb2dcfdb72125ed8cf7f5 Mon Sep 17 00:00:00 2001 From: tedyu Date: Fri, 5 Feb 2016 12:05:44 -0800 Subject: [PATCH 10/11] Fix compilation for DirectKafkaStreamSuite --- .../spark/streaming/kafka/DirectKafkaStreamSuite.scala | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala index 249f36b0ccef2..80b3071a864ea 100644 --- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala +++ 
b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala @@ -133,7 +133,7 @@ class DirectKafkaStreamSuite assert(partSize === rangeSize, "offset ranges are wrong") } } - stream.foreachRDD { rdd => Collections.addAll(allReceived, rdd.collect()) } + stream.foreachRDD { rdd => allReceived.addAll(Arrays.asList(rdd.collect():_*)) } ssc.start() eventually(timeout(20000.milliseconds), interval(200.milliseconds)) { assert(allReceived.size === totalSent, @@ -176,7 +176,7 @@ class DirectKafkaStreamSuite ) val collectedData = new ConcurrentLinkedQueue[String]() - stream.map { _._2 }.foreachRDD { rdd => Collections.addAll(collectedData, rdd.collect()) } + stream.map { _._2 }.foreachRDD { rdd => collectedData.addAll(Arrays.asList(rdd.collect():_*)) } ssc.start() val newData = Map("b" -> 10) kafkaTestUtils.sendMessages(topic, newData) @@ -222,7 +222,7 @@ class DirectKafkaStreamSuite ) val collectedData = new ConcurrentLinkedQueue[String]() - stream.foreachRDD { rdd => Collections.addAll(collectedData, rdd.collect()) } + stream.foreachRDD { rdd => collectedData.addAll(Arrays.asList(rdd.collect():_*)) } ssc.start() val newData = Map("b" -> 10) kafkaTestUtils.sendMessages(topic, newData) @@ -267,7 +267,7 @@ class DirectKafkaStreamSuite // This is to collect the raw data received from Kafka kafkaStream.foreachRDD { (rdd: RDD[(String, String)], time: Time) => val data = rdd.map { _._2 }.collect() - Collections.addAll(DirectKafkaStreamSuite.collectedData, data) + DirectKafkaStreamSuite.collectedData.addAll(Arrays.asList(data:_*)) } // This is ensure all the data is eventually receiving only once @@ -339,7 +339,7 @@ class DirectKafkaStreamSuite val allReceived = new ConcurrentLinkedQueue[(String, String)] - stream.foreachRDD { rdd => Collections.addAll(allReceived, rdd.collect()) } + stream.foreachRDD { rdd => allReceived.addAll(Arrays.asList(rdd.collect():_*)) } ssc.start() eventually(timeout(20000.milliseconds), interval(200.milliseconds)) { assert(allReceived.size === totalSent, From 5e770b034f9cc4b18473c757a653edf3d326196a Mon Sep 17 00:00:00 2001 From: tedyu Date: Fri, 5 Feb 2016 12:37:04 -0800 Subject: [PATCH 11/11] Fix scalastyle warning --- .../spark/streaming/kafka/DirectKafkaStreamSuite.scala | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala index 80b3071a864ea..1b28336d5b55b 100644 --- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala +++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala @@ -133,7 +133,7 @@ class DirectKafkaStreamSuite assert(partSize === rangeSize, "offset ranges are wrong") } } - stream.foreachRDD { rdd => allReceived.addAll(Arrays.asList(rdd.collect():_*)) } + stream.foreachRDD { rdd => allReceived.addAll(Arrays.asList(rdd.collect(): _*)) } ssc.start() eventually(timeout(20000.milliseconds), interval(200.milliseconds)) { assert(allReceived.size === totalSent, @@ -176,7 +176,7 @@ class DirectKafkaStreamSuite ) val collectedData = new ConcurrentLinkedQueue[String]() - stream.map { _._2 }.foreachRDD { rdd => collectedData.addAll(Arrays.asList(rdd.collect():_*)) } + stream.map { _._2 }.foreachRDD { rdd => collectedData.addAll(Arrays.asList(rdd.collect(): _*)) } ssc.start() val newData = Map("b" -> 10) kafkaTestUtils.sendMessages(topic, newData) @@ 
-222,7 +222,7 @@ class DirectKafkaStreamSuite ) val collectedData = new ConcurrentLinkedQueue[String]() - stream.foreachRDD { rdd => collectedData.addAll(Arrays.asList(rdd.collect():_*)) } + stream.foreachRDD { rdd => collectedData.addAll(Arrays.asList(rdd.collect(): _*)) } ssc.start() val newData = Map("b" -> 10) kafkaTestUtils.sendMessages(topic, newData) @@ -267,7 +267,7 @@ class DirectKafkaStreamSuite // This is to collect the raw data received from Kafka kafkaStream.foreachRDD { (rdd: RDD[(String, String)], time: Time) => val data = rdd.map { _._2 }.collect() - DirectKafkaStreamSuite.collectedData.addAll(Arrays.asList(data:_*)) + DirectKafkaStreamSuite.collectedData.addAll(Arrays.asList(data: _*)) } // This is ensure all the data is eventually receiving only once @@ -339,7 +339,7 @@ class DirectKafkaStreamSuite val allReceived = new ConcurrentLinkedQueue[(String, String)] - stream.foreachRDD { rdd => allReceived.addAll(Arrays.asList(rdd.collect():_*)) } + stream.foreachRDD { rdd => allReceived.addAll(Arrays.asList(rdd.collect(): _*)) } ssc.start() eventually(timeout(20000.milliseconds), interval(200.milliseconds)) { assert(allReceived.size === totalSent,
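
The eleven patches above converge on one replacement pattern for the banned mutable.SynchronizedBuffer: collect into a java.util.concurrent.ConcurrentLinkedQueue, append arrays via java.util.Arrays.asList, and read back through JavaConverters. The standalone Scala sketch below condenses that pattern; it is illustrative only (the object name and sample data are not from the patches, and a plain Array stands in for the result of rdd.collect()).

import java.util.Arrays
import java.util.concurrent.ConcurrentLinkedQueue

import scala.collection.JavaConverters._

object SynchronizedBufferMigrationSketch {
  def main(args: Array[String]): Unit = {
    // Before: new mutable.ArrayBuffer[String] with mutable.SynchronizedBuffer[String],
    // appended to with ++=. SynchronizedBuffer has been deprecated since Scala 2.11.
    // After: a lock-free queue that is safe to append to from concurrent callbacks.
    val collected = new ConcurrentLinkedQueue[String]()

    // Stand-in for the Array[String] returned by rdd.collect() in the patches above.
    val batch: Array[String] = Array("a", "b", "c")

    // ConcurrentLinkedQueue has no ++=, so the array is spliced into a java.util.List
    // first. The space before ": _*" is what the final commit adds to satisfy scalastyle.
    collected.addAll(Arrays.asList(batch: _*))

    // Reads go through JavaConverters, mirroring the asScala calls in the patches.
    assert(collected.asScala.toSeq == Seq("a", "b", "c"))
    println(collected.asScala.mkString("[", ", ", "]"))
  }
}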
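
Patches 04 and 05 show the other half of the migration in ReceiverSupervisorImpl: iteration and bulk removal also have to cross the Java/Scala boundary. removeAll expects a java.util.Collection, so the filtered asScala view cannot be handed to it directly, and the final code removes stopped generators one at a time. A minimal sketch of that pattern follows, with a hypothetical Generator class standing in for BlockGenerator.

import java.util.concurrent.ConcurrentLinkedQueue

import scala.collection.JavaConverters._

object RemoveStoppedSketch {
  // Hypothetical stand-in for BlockGenerator; only isStopped() matters here.
  final class Generator(val name: String, stopped: Boolean) {
    def isStopped(): Boolean = stopped
  }

  def main(args: Array[String]): Unit = {
    val registered = new ConcurrentLinkedQueue[Generator]()
    registered.add(new Generator("a", stopped = true))
    registered.add(new Generator("b", stopped = false))

    // The SynchronizedBuffer version was: buffer --= buffer.filter(_.isStopped()).
    // filter on the asScala wrapper materializes a Scala collection, so removing
    // from the queue afterwards does not disturb the iteration.
    val stopped = registered.asScala.filter(_.isStopped())
    stopped.foreach(registered.remove(_))

    assert(registered.asScala.map(_.name).toSeq == Seq("b"))
  }
}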