Skip to content

Commit

Permalink
Generalize the scheduling algorithm
Browse files Browse the repository at this point in the history
  • Loading branch information
nishkamravi2 committed Jun 5, 2015
1 parent bb5e09b commit 3cac21b
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -256,9 +256,6 @@ abstract class Receiver[T](val storageLevel: StorageLevel) extends Serializable
/** Identifier of the stream this receiver is associated with. */
private var id: Int = -1

/** Location of the receiver for scheduling purposes. */
private[streaming] var host: Option[String] = None

/** Handler object that runs the receiver. This is instantiated lazily in the worker. */
private[streaming] var executor_ : ReceiverSupervisor = null

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@

package org.apache.spark.streaming.scheduler

import scala.collection.mutable.{HashMap, SynchronizedMap}
import scala.collection.mutable.{HashMap, SynchronizedMap, ArrayBuffer}
import scala.language.existentials
import org.apache.spark.rdd._

import org.apache.spark.streaming.util.WriteAheadLogUtils
import org.apache.spark.{Logging, SerializableWritable, SparkEnv, SparkException}
Expand Down Expand Up @@ -279,6 +280,45 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
executors.diff(List(driver))
}

/**
 * Build the RDD used to launch the receivers, attaching preferred locations to each one.
 *
 * If every receiver declares a `preferredLocation`, those preferences are used as-is.
 * Otherwise the executors returned by `getExecutors` are spread over the receivers in a
 * round-robin fashion:
 *  - with at least as many receivers as executors, receiver `i` is assigned executor
 *    `i % numExecutors`;
 *  - with more executors than receivers, executor `j` is assigned to receiver
 *    `j % numReceivers`, so a receiver may end up with several candidate hosts.
 *
 * In local mode, or when no executors are known, receivers get an empty preference list.
 *
 * @param receivers the receivers to schedule
 * @return an RDD of the receivers built with `makeRDD` from (receiver, locations) pairs
 */
private def scheduleReceivers(receivers: Seq[Receiver[_]]): RDD[Receiver[_]] = {
  // Location preferences are honored only if all receivers have them. `forall` replaces
  // `map(...).reduce(_ && _)`, which throws on an empty collection.
  val hasLocationPreferences =
    receivers.nonEmpty && receivers.forall(_.preferredLocation.isDefined)

  val receiversWithLocations: Seq[(Receiver[_], Seq[String])] =
    if (hasLocationPreferences) {
      // Every preferredLocation is defined here, so toList yields exactly one host each.
      receivers.map(r => (r, r.preferredLocation.toList))
    } else {
      // Executors are only looked up outside local mode, as before.
      val executors: IndexedSeq[String] =
        if (ssc.sparkContext.isLocal) IndexedSeq.empty[String]
        else getExecutors(ssc).toIndexedSeq

      val numReceivers = receivers.length
      val numExecutors = executors.length

      val locations: IndexedSeq[Seq[String]] =
        if (numExecutors == 0) {
          // Nothing to distribute over: use empty (never null) preference lists and avoid
          // the division by zero that `i % numExecutors` would cause.
          IndexedSeq.fill(numReceivers)(Seq.empty[String])
        } else if (numReceivers >= numExecutors) {
          // One executor per receiver, wrapping around the executor list.
          (0 until numReceivers).map(i => Seq(executors(i % numExecutors)))
        } else {
          // More executors than receivers: receiver i gets executors i, i + numReceivers, ...
          // numReceivers > 0 is guaranteed here because numReceivers == 0 takes the branch above.
          (0 until numReceivers).map { i =>
            (i until numExecutors by numReceivers).map(j => executors(j))
          }
        }

      receivers.zip(locations)
    }

  ssc.sc.makeRDD[Receiver[_]](receiversWithLocations)
}

/**
* Get the receivers from the ReceiverInputDStreams, distributes them to the
* worker nodes as a parallel collection, and runs them.
Expand Down Expand Up @@ -312,33 +352,7 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
ssc.sparkContext.makeRDD(1 to 50, 50).map(x => (x, 1)).reduceByKey(_ + _, 20).collect()
}

// Right now, we only honor preferences if all receivers have them
val hasLocationPreferences = receivers.map(_.preferredLocation.isDefined).reduce(_ && _)

// If no location preferences are specified, set host location for each receiver
// so as to distribute them evenly over executors in a round-robin fashion
var roundRobin = false;
if (!hasLocationPreferences && !ssc.sparkContext.isLocal) {
val executors = getExecutors(ssc)
if (!executors.isEmpty) {
var i = 0;
for (i <- 0 to (receivers.length - 1)) {
receivers(i).host = Some(executors(i % executors.length))
}
roundRobin = true
}
}

val tempRDD =
if (hasLocationPreferences) {
val receiversWithPreferences = receivers.map(r => (r, Seq(r.preferredLocation.get)))
ssc.sc.makeRDD[Receiver[_]](receiversWithPreferences)
} else if(roundRobin) {
val roundRobinReceivers = receivers.map(r => (r, Seq(r.host.get)))
ssc.sc.makeRDD[Receiver[_]](roundRobinReceivers)
} else {
ssc.sc.makeRDD(receivers, receivers.size)
}
val tempRDD = scheduleReceivers(receivers)

// Distribute the receivers and start them
logInfo("Starting " + receivers.length + " receivers")
Expand Down

0 comments on commit 3cac21b

Please sign in to comment.