Round-robin scheduling of streaming receivers
nishkamravi2 committed Jun 3, 2015
1 parent ce320cb commit fff1b2e
Showing 3 changed files with 37 additions and 15 deletions.
@@ -186,7 +186,7 @@ class FlumeReceiver(
     logInfo("Flume receiver stopped")
   }
 
-  override def preferredLocation: Option[String] = Option(host)
+  preferredLocation = Some(host)
 
   /** A Netty Pipeline factory that will decompress incoming data from
    * and the Netty client and compress data going back to the client.
@@ -107,8 +107,8 @@ abstract class Receiver[T](val storageLevel: StorageLevel) extends Serializable
    */
   def onStop()
 
-  /** Override this to specify a preferred location (hostname). */
-  def preferredLocation : Option[String] = None
+  /** Specify a preferred location (hostname) */
+  var preferredLocation = None: Option[String]
 
   /**
    * Store a single item of received data to Spark's memory.
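With preferredLocation now a plain var on Receiver rather than an overridable def, a receiver implementation assigns it instead of overriding it, exactly as the FlumeReceiver hunk above does with preferredLocation = Some(host). As a hedged illustration only (the CustomReceiver class and its host/port parameters below are made up, not part of this commit), a user-defined receiver against this API could look like:

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// Hypothetical receiver, shown only to illustrate the new var-based API.
class CustomReceiver(host: String, port: Int)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

  // Old API (removed above): override def preferredLocation: Option[String] = Some(host)
  // New API: assign the inherited var, typically at construction time.
  preferredLocation = Some(host)

  def onStart(): Unit = {
    // Start the thread(s) that receive data and call store(...) here.
  }

  def onStop(): Unit = {
    // Stop the receiving thread(s) here.
  }
}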
@@ -270,6 +270,15 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
     }
   }
 
+  /**
+   * Get the list of executors excluding driver
+   */
+  private def getExecutors(ssc: StreamingContext): List[String] = {
+    val executors = ssc.sparkContext.getExecutorMemoryStatus.map(_._1.split(":")(0)).toList
+    val driver = ssc.sparkContext.getConf.get("spark.driver.host")
+    executors.diff(List(driver))
+  }
+
   /**
    * Get the receivers from the ReceiverInputDStreams, distributes them to the
    * worker nodes as a parallel collection, and runs them.
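One note on the getExecutors helper added above: getExecutorMemoryStatus is keyed by block-manager locations of the form "host:port" and includes the driver's own block manager, which is why the helper splits on ":" and then diffs out spark.driver.host. A small standalone sketch of that extraction, with made-up host names and no SparkContext (illustrative assumptions, not part of the commit):

// Stand-ins for the keys of getExecutorMemoryStatus ("host:port" per block manager,
// driver included) and for the value of spark.driver.host.
val memoryStatusKeys = List("driver-host:50321", "worker-1:43001", "worker-2:43002")
val driverHost = "driver-host"

val executors = memoryStatusKeys.map(_.split(":")(0)).diff(List(driverHost))
// executors == List("worker-1", "worker-2")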
@@ -281,18 +290,6 @@
       rcvr
     })
 
-    // Right now, we only honor preferences if all receivers have them
-    val hasLocationPreferences = receivers.map(_.preferredLocation.isDefined).reduce(_ && _)
-
-    // Create the parallel collection of receivers to distributed them on the worker nodes
-    val tempRDD =
-      if (hasLocationPreferences) {
-        val receiversWithPreferences = receivers.map(r => (r, Seq(r.preferredLocation.get)))
-        ssc.sc.makeRDD[Receiver[_]](receiversWithPreferences)
-      } else {
-        ssc.sc.makeRDD(receivers, receivers.size)
-      }
-
     val checkpointDirOption = Option(ssc.checkpointDir)
     val serializableHadoopConf = new SerializableWritable(ssc.sparkContext.hadoopConfiguration)
 
@@ -308,12 +305,37 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
       supervisor.start()
       supervisor.awaitTermination()
     }
+
     // Run the dummy Spark job to ensure that all slaves have registered.
     // This avoids all the receivers to be scheduled on the same node.
     if (!ssc.sparkContext.isLocal) {
       ssc.sparkContext.makeRDD(1 to 50, 50).map(x => (x, 1)).reduceByKey(_ + _, 20).collect()
     }
+
+    // Right now, we only honor preferences if all receivers have them
+    var hasLocationPreferences = receivers.map(_.preferredLocation.isDefined).reduce(_ && _)
+
+    // If no location preferences are specified, set preferredLocation for each receiver
+    // so as to distribute them evenly over executors in a round-robin fashion
+    if (!hasLocationPreferences && !ssc.sparkContext.isLocal) {
+      val executors = getExecutors(ssc)
+      if (!executors.isEmpty) {
+        var i = 0;
+        for (i <- 0 to (receivers.length - 1)) {
+          receivers(i).preferredLocation = Some(executors(i % executors.length))
+        }
+        hasLocationPreferences = true;
+      }
+    }
+
+    val tempRDD =
+      if (hasLocationPreferences) {
+        val receiversWithPreferences = receivers.map(r => (r, Seq(r.preferredLocation.get)))
+        ssc.sc.makeRDD[Receiver[_]](receiversWithPreferences)
+      } else {
+        ssc.sc.makeRDD(receivers, receivers.size)
+      }
 
     // Distribute the receivers and start them
     logInfo("Starting " + receivers.length + " receivers")
     running = true
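Seen end to end, the new scheduling path is: run the dummy job so executors register, derive the executor host list, give receiver i the preferred location executors(i % executors.length), and only then build tempRDD with those preferences. A minimal standalone sketch of just that assignment arithmetic, using made-up executor hosts and no SparkContext:

// Example inputs: roughly what getExecutors(ssc) might return, plus a receiver count.
val executors = List("worker-1", "worker-2", "worker-3")
val numReceivers = 5

// Round-robin: receiver i prefers executor (i % executors.length).
val preferredLocations = (0 until numReceivers).map(i => i -> executors(i % executors.length))

preferredLocations.foreach { case (i, host) => println(s"receiver $i -> $host") }
// receiver 0 -> worker-1
// receiver 1 -> worker-2
// receiver 2 -> worker-3
// receiver 3 -> worker-1
// receiver 4 -> worker-2

With three executors and five receivers, the wrap-around spreads receivers as evenly as possible instead of letting all of them land on whichever node registered first.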
