diff --git a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala index 3767bdef3298b..a6e783861dbe6 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala @@ -32,6 +32,7 @@ class ReceiverTrackerSuite extends TestSuiteBase { val executors: List[String] = List("0", "1", "2", "3") test("receiver scheduling - all or none have preferred location") { + def parse(s: String): Array[Array[String]] = { val outerSplit = s.split("\\|") val loc = new Array[Array[String]](outerSplit.length) @@ -41,18 +42,20 @@ class ReceiverTrackerSuite extends TestSuiteBase { } loc } + def testScheduler(numReceivers: Int, preferredLocation: Boolean, allocation: String) { val receivers = if (preferredLocation) { - (0 until numReceivers).map(i => new DummyReceiver(host = + Array.tabulate(numReceivers)(i => new DummyReceiver(host = Some(((i + 1) % executors.length).toString))) } else { - (0 until numReceivers).map(i => new DummyReceiver) + Array.tabulate(numReceivers)(_ => new DummyReceiver) } val locations = launcher.scheduleReceivers(receivers, executors) val expectedLocations = parse(allocation) assert(locations.deep === expectedLocations.deep) } + testScheduler(numReceivers = 5, preferredLocation = false, allocation = "0|1|2|3|0") testScheduler(numReceivers = 3, preferredLocation = false, allocation = "0,3|1|2") testScheduler(numReceivers = 4, preferredLocation = true, allocation = "1|2|3|0")