diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala index 080480663200c..1d5f5a3b61e25 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala @@ -291,20 +291,26 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false // so as to distribute them evenly over executors in a round-robin fashion // If num_executors > num_receivers, distribute executors among receivers val locations = new Array[ArrayBuffer[String]](receivers.length) + var roundRobin = false if (!hasLocationPreferences && !ssc.sparkContext.isLocal) { val executors = getExecutors(ssc) var i = 0 - for (i <- 0 to (receivers.length - 1)) { - locations(i) = new ArrayBuffer[String]() - } - if (receivers.length >= executors.length) { + if (!executors.isEmpty) { for (i <- 0 to (receivers.length - 1)) { - locations(i) += executors(i % executors.length) + locations(i) = new ArrayBuffer[String]() } - } else { - for (i <- 0 to (executors.length - 1)) { - locations(i % receivers.length) += executors(i) + if (receivers.length >= executors.length) { + for (i <- 0 to (receivers.length - 1)) { + locations(i) += executors(i % executors.length) + System.out.println("RECEIVERS > executors " + executors(i % executors.length)) + } + } else { + for (i <- 0 to (executors.length - 1)) { + locations(i % receivers.length) += executors(i) + System.out.println("EXECUTORS > receivers " + executors(i)) + } } + roundRobin = true } } @@ -312,9 +318,12 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false if (hasLocationPreferences) { val receiversWithPreferences = receivers.map(r => (r, Seq(r.preferredLocation.get))) ssc.sc.makeRDD[Receiver[_]](receiversWithPreferences) - } else { + } else if (roundRobin) { + // ssc.sc.makeRDD(receivers, receivers.size) val roundRobinReceivers = (0 to (receivers.length - 1)).map(i => (receivers(i), locations(i))) ssc.sc.makeRDD[Receiver[_]](roundRobinReceivers) + } else { + ssc.sc.makeRDD(receivers, receivers.size) } tempRDD }