diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala index 4555cc1d9be28..5b5a3fe648602 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala @@ -256,9 +256,6 @@ abstract class Receiver[T](val storageLevel: StorageLevel) extends Serializable /** Identifier of the stream this receiver is associated with. */ private var id: Int = -1 - /** Location of the receiver for scheduling purposes. */ - private[streaming] var host: Option[String] = None - /** Handler object that runs the receiver. This is instantiated lazily in the worker. */ private[streaming] var executor_ : ReceiverSupervisor = null diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala index e64e59f2f0bfb..a6ba31e6dc3f7 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala @@ -17,8 +17,9 @@ package org.apache.spark.streaming.scheduler -import scala.collection.mutable.{HashMap, SynchronizedMap} +import scala.collection.mutable.{HashMap, SynchronizedMap, ArrayBuffer} import scala.language.existentials +import org.apache.spark.rdd._ import org.apache.spark.streaming.util.WriteAheadLogUtils import org.apache.spark.{Logging, SerializableWritable, SparkEnv, SparkException} @@ -279,6 +280,45 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false executors.diff(List(driver)) } + /* Schedule receivers using preferredLocation if specified + * and round-robin otherwise + */ + private def scheduleReceivers(receivers: Seq[Receiver[_]]): RDD[Receiver[_]] = { + // Location preferences are honored if all receivers have them + val hasLocationPreferences = receivers.map(_.preferredLocation.isDefined).reduce(_ && _) + + // If no location preferences are specified, set host location for each receiver + // so as to distribute them evenly over executors in a round-robin fashion + // If num_executors > num_receivers, distribute executors among receivers + val locations = new Array[ArrayBuffer[String]](receivers.length) + if (!hasLocationPreferences && !ssc.sparkContext.isLocal) { + val executors = getExecutors(ssc) + var i = 0 + for (i <- 0 to (receivers.length - 1)) { + locations(i) = new ArrayBuffer[String]() + } + if (receivers.length >= executors.length) { + for (i <- 0 to (receivers.length - 1)) { + locations(i) += executors(i % executors.length) + } + } else { + for (i <- 0 to (executors.length - 1)) { + locations(i % receivers.length) += executors(i) + } + } + } + + val tempRDD = + if (hasLocationPreferences) { + val receiversWithPreferences = receivers.map(r => (r, Seq(r.preferredLocation.get))) + ssc.sc.makeRDD[Receiver[_]](receiversWithPreferences) + } else { + val roundRobinReceivers = (0 to (receivers.length - 1)).map(i => (receivers(i), locations(i))) + ssc.sc.makeRDD[Receiver[_]](roundRobinReceivers) + } + tempRDD + } + /** * Get the receivers from the ReceiverInputDStreams, distributes them to the * worker nodes as a parallel collection, and runs them. @@ -312,33 +352,7 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false ssc.sparkContext.makeRDD(1 to 50, 50).map(x => (x, 1)).reduceByKey(_ + _, 20).collect() } - // Right now, we only honor preferences if all receivers have them - val hasLocationPreferences = receivers.map(_.preferredLocation.isDefined).reduce(_ && _) - - // If no location preferences are specified, set host location for each receiver - // so as to distribute them evenly over executors in a round-robin fashion - var roundRobin = false; - if (!hasLocationPreferences && !ssc.sparkContext.isLocal) { - val executors = getExecutors(ssc) - if (!executors.isEmpty) { - var i = 0; - for (i <- 0 to (receivers.length - 1)) { - receivers(i).host = Some(executors(i % executors.length)) - } - roundRobin = true - } - } - - val tempRDD = - if (hasLocationPreferences) { - val receiversWithPreferences = receivers.map(r => (r, Seq(r.preferredLocation.get))) - ssc.sc.makeRDD[Receiver[_]](receiversWithPreferences) - } else if(roundRobin) { - val roundRobinReceivers = receivers.map(r => (r, Seq(r.host.get))) - ssc.sc.makeRDD[Receiver[_]](roundRobinReceivers) - } else { - ssc.sc.makeRDD(receivers, receivers.size) - } + val tempRDD = scheduleReceivers(receivers) // Distribute the receivers and start them logInfo("Starting " + receivers.length + " receivers")