diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceiverInputDStreamSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceiverInputDStreamSuite.scala index f919ad4e21dc9..6d388d9624d92 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/ReceiverInputDStreamSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceiverInputDStreamSuite.scala @@ -36,10 +36,10 @@ class ReceiverInputDStreamSuite extends TestSuiteBase with BeforeAndAfterAll { StreamingContext.getActive().map { _.stop() } } - testWithoutWAL("createBlockRDD creates empty BlockRDD when no block info") { receiverStream => val rdd = receiverStream.createBlockRDD(Time(0), Seq.empty) assert(rdd.isInstanceOf[BlockRDD[_]]) + assert(!rdd.isInstanceOf[WriteAheadLogBackedBlockRDD[_]]) assert(rdd.isEmpty()) } @@ -52,6 +52,7 @@ class ReceiverInputDStreamSuite extends TestSuiteBase with BeforeAndAfterAll { val rdd = receiverStream.createBlockRDD(Time(0), blockInfos) assert(rdd.isInstanceOf[BlockRDD[_]]) + assert(!rdd.isInstanceOf[WriteAheadLogBackedBlockRDD[_]]) val blockRDD = rdd.asInstanceOf[BlockRDD[_]] assert(blockRDD.blockIds.toSeq === blockIds) } @@ -111,7 +112,7 @@ class ReceiverInputDStreamSuite extends TestSuiteBase with BeforeAndAfterAll { } } - private def testWithWAL(msg: String)(body: ReceiverInputDStream[_] => Unit):Unit = { + private def testWithWAL(msg: String)(body: ReceiverInputDStream[_] => Unit): Unit = { test(s"With WAL enabled: $msg") { runTest(enableWAL = true, body) } @@ -131,6 +132,12 @@ class ReceiverInputDStreamSuite extends TestSuiteBase with BeforeAndAfterAll { } } + /** + * Create a block info for input to the ReceiverInputDStream.createBlockRDD + * @param withWALInfo Create block with WAL info in it + * @param createBlock Actually create the block in the BlockManager + * @return + */ private def createBlockInfo( withWALInfo: Boolean, createBlock: Boolean = true): ReceivedBlockInfo = {