
[SPARK-7988][STREAMING] Round-robin scheduling of receivers by default #6607

Closed · wants to merge 27 commits

The diff shown below covers changes from 4 commits.

Commits (27)
fff1b2e
Round-robin scheduling of streaming receivers
nishkamravi2 Jun 3, 2015
41705de
Update ReceiverTracker.scala
nishkamravi2 Jun 3, 2015
bb5e09b
Add a new var in receiver to store location information for round-rob…
nishkamravi2 Jun 5, 2015
b05ee2f
Update ReceiverTracker.scala
nishkamravi2 Jun 5, 2015
3cac21b
Generalize the scheduling algorithm
nishkamravi2 Jun 5, 2015
975b8d8
Merge branch 'master_nravi' of https://github.com/nishkamravi2/spark …
nishkamravi2 Jun 5, 2015
6e3515c
Minor changes
nishkamravi2 Jun 5, 2015
7888257
Update ReceiverTracker.scala
nishkamravi2 Jun 5, 2015
6caeefe
Update ReceiverTracker.scala
nishkamravi2 Jun 5, 2015
07b9dfa
Update ReceiverTracker.scala
nishkamravi2 Jun 5, 2015
02dbdb8
Update ReceiverTracker.scala
nishkamravi2 Jun 5, 2015
45e3a99
Merge branch 'master_nravi' of https://github.com/nishkamravi2/spark …
nishkamravi2 Jun 5, 2015
16e84ec
Update ReceiverTracker.scala
nishkamravi2 Jun 5, 2015
4cf97b6
Update ReceiverTracker.scala
nishkamravi2 Jun 5, 2015
f8a3e05
Update ReceiverTracker.scala
nishkamravi2 Jun 5, 2015
7f3e028
Update ReceiverTracker.scala, add unit test cases in SchedulerSuite
nishkamravi2 Jun 8, 2015
242e677
Update SchedulerSuite.scala
nishkamravi2 Jun 8, 2015
179b90f
Update ReceiverTracker.scala
nishkamravi2 Jun 8, 2015
4604f28
Update SchedulerSuite.scala
nishkamravi2 Jun 8, 2015
68e8540
Update SchedulerSuite.scala
nishkamravi2 Jun 8, 2015
bc23907
Update ReceiverTracker.scala
nishkamravi2 Jun 12, 2015
48a4a97
Update ReceiverTracker.scala
nishkamravi2 Jun 12, 2015
ae29152
Update test suite with TD's suggestions
nishkamravi2 Jun 17, 2015
9f1abc2
Update ReceiverTrackerSuite.scala
nishkamravi2 Jun 17, 2015
6127e58
Update ReceiverTracker and ReceiverTrackerSuite
nishkamravi2 Jun 23, 2015
f747739
Update ReceiverTrackerSuite.scala
nishkamravi2 Jun 23, 2015
1918819
Update ReceiverTrackerSuite.scala
nishkamravi2 Jun 23, 2015
@@ -256,6 +256,9 @@ abstract class Receiver[T](val storageLevel: StorageLevel) extends Serializable
/** Identifier of the stream this receiver is associated with. */
private var id: Int = -1

/** Location of the receiver for scheduling purposes. */
private[streaming] var host: Option[String] = None

Contributor:
I am not sure why we really need a new API on Receiver just to attach a preferred location at the time of launching the receivers as tasks.
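
For context, a minimal sketch of the alternative the reviewer is pointing at (the helper name and wiring are assumptions, not code from this PR): SparkContext.makeRDD already accepts per-element preferred locations, so a scheduling hint can be attached when the receiver RDD is built, without adding a field to Receiver.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.receiver.Receiver

// Hypothetical helper: pair each receiver with its candidate hosts and let
// makeRDD's (element, preferred locations) overload carry the hint.
def receiverRDDWithHints(
    ssc: StreamingContext,
    receivers: Seq[Receiver[_]],
    hosts: Seq[Seq[String]]): RDD[Receiver[_]] = {
  // hosts(i) lists candidate executors for receivers(i); an empty list means no preference.
  ssc.sparkContext.makeRDD[Receiver[_]](receivers.zip(hosts))
}
```

This is essentially what the existing preferredLocation branch below already does; the open question is only where the round-robin hosts get computed.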


/** Handler object that runs the receiver. This is instantiated lazily in the worker. */
private[streaming] var executor_ : ReceiverSupervisor = null

@@ -270,6 +270,15 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
}
}

/**
* Get the list of executors, excluding the driver.
*/
private def getExecutors(ssc: StreamingContext): List[String] = {
val executors = ssc.sparkContext.getExecutorMemoryStatus.map(_._1.split(":")(0)).toList
val driver = ssc.sparkContext.getConf.get("spark.driver.host")
executors.diff(List(driver))
}

/**
* Get the receivers from the ReceiverInputDStreams, distributes them to the
* worker nodes as a parallel collection, and runs them.
@@ -281,18 +290,6 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
rcvr
})

// Right now, we only honor preferences if all receivers have them
val hasLocationPreferences = receivers.map(_.preferredLocation.isDefined).reduce(_ && _)

// Create the parallel collection of receivers to distributed them on the worker nodes
val tempRDD =
if (hasLocationPreferences) {
val receiversWithPreferences = receivers.map(r => (r, Seq(r.preferredLocation.get)))
ssc.sc.makeRDD[Receiver[_]](receiversWithPreferences)
} else {
ssc.sc.makeRDD(receivers, receivers.size)
}

val checkpointDirOption = Option(ssc.checkpointDir)
val serializableHadoopConf = new SerializableWritable(ssc.sparkContext.hadoopConfiguration)

@@ -308,12 +305,41 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
supervisor.start()
supervisor.awaitTermination()
}

// Run the dummy Spark job to ensure that all slaves have registered.
// This prevents all the receivers from being scheduled on the same node.
if (!ssc.sparkContext.isLocal) {
ssc.sparkContext.makeRDD(1 to 50, 50).map(x => (x, 1)).reduceByKey(_ + _, 20).collect()
}

// Right now, we only honor preferences if all receivers have them
val hasLocationPreferences = receivers.map(_.preferredLocation.isDefined).reduce(_ && _)

Contributor:
It would be nice if this new functionality were put into a function by itself, so that we can unit test it individually against different combinations of executors and, maybe, different policies in the future.
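
As a rough illustration of that suggestion (the function name and signature here are invented, not what the PR eventually adds), the host assignment could live in a small pure function that a test can call with arbitrary executor lists:

```scala
// Hypothetical standalone scheduler: map each receiver's optional preferred
// location, plus the executor list, to a host list per receiver.
def scheduleReceivers(
    preferredLocations: Seq[Option[String]],
    executors: Seq[String]): Seq[Seq[String]] = {
  preferredLocations.zipWithIndex.map {
    case (Some(host), _) => Seq(host)                    // a user preference wins
    case (None, i) if executors.nonEmpty =>
      Seq(executors(i % executors.size))                 // round-robin over executors
    case (None, _) => Seq.empty                          // no executors registered yet
  }
}

// A unit test can then check combinations directly, e.g.
// scheduleReceivers(Seq(None, None, None), Seq("host1", "host2"))
//   == Seq(Seq("host1"), Seq("host2"), Seq("host1"))
```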


// If no location preferences are specified, set host location for each receiver
// so as to distribute them evenly over executors in a round-robin fashion
var roundRobin = false;
if (!hasLocationPreferences && !ssc.sparkContext.isLocal) {
val executors = getExecutors(ssc)
if (!executors.isEmpty) {
var i = 0;
for (i <- 0 to (receivers.length - 1)) {
receivers(i).host = Some(executors(i % executors.length))

Contributor:
Why do you have to save the hosts with the receiver object? You can easily create an array (the same size as the number of receivers) to store the host for each receiver. The receiver class does not have to change.
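
A short sketch of what the reviewer describes (assuming the receivers and executors values from the surrounding diff are in scope): the chosen host lives in a local array indexed like receivers, and Receiver itself stays untouched.

```scala
// Hosts chosen for each receiver, kept outside the Receiver objects.
val hosts = new Array[String](receivers.length)
for (i <- receivers.indices) {
  hosts(i) = executors(i % executors.length)   // round-robin assignment
}
// Pair receivers with their hosts when building the RDD, just as the
// preferred-location branch below already does.
val roundRobinReceivers = receivers.zipWithIndex.map { case (r, i) => (r, Seq(hosts(i))) }
// ssc.sc.makeRDD[Receiver[_]](roundRobinReceivers)
```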

}
roundRobin = true
}
}

val tempRDD =
if (hasLocationPreferences) {

Contributor:
The right way to allocate hosts to each receiver, taking into account the preferred locations on some of them, is the following.

  1. Let's say there are 10 receivers, with 2 receivers, #5 and #9, having a preferred location set.
  2. Let's say there are 25 executors.
  3. Maintain an array of type Array[ArrayBuffer[String]] of size 10; let's call it locations. Each ArrayBuffer is empty at first.
  4. First allocate all the locations of receivers that already have them, so locations[5] ++= receiver[5].preferredLocation, and the same for receiver 9.
  5. Then distribute all the executors among all the other receivers. Yes, there can be multiple hosts per receiver. This helps ensure that if one of the executors fails, the task will use the other allocated hosts and won't end up on an executor that is already running a receiver.

Now you can use makeRDD(receivers, locations), no further conditions.
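
A rough sketch of the allocation scheme described above (illustrative only; allocateHosts, preferredLocations, and executors are assumed names, not code from this PR):

```scala
import scala.collection.mutable.ArrayBuffer

// Illustrative sketch of the allocation described above.
def allocateHosts(
    preferredLocations: Seq[Option[String]],   // receiver i's optional preferred host
    executors: Seq[String]): Array[ArrayBuffer[String]] = {
  val numReceivers = preferredLocations.size

  // Step 3: one ArrayBuffer of candidate hosts per receiver, empty at first.
  val locations = Array.fill(numReceivers)(new ArrayBuffer[String])

  // Step 4: receivers that already have a preferred location keep it.
  for (i <- 0 until numReceivers; host <- preferredLocations(i)) {
    locations(i) += host
  }

  // Step 5: distribute the executors among the remaining receivers. A receiver
  // may get several hosts, so if one executor fails its task can fall back to
  // another allocated host instead of landing on a node that already runs a receiver.
  val unscheduled = (0 until numReceivers).filter(preferredLocations(_).isEmpty)
  if (unscheduled.nonEmpty && executors.nonEmpty) {
    val slots = math.max(executors.size, unscheduled.size)
    for (j <- 0 until slots) {
      locations(unscheduled(j % unscheduled.size)) += executors(j % executors.size)
    }
  }
  locations
}
```

The result can then be zipped with the receivers and passed to makeRDD as (receiver, hosts) pairs, which is the "makeRDD(receivers, locations), no further conditions" step.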

Contributor (author):
Thanks for the review. PreferredLocation set for all receivers or none sounds like a good assumption (either the user decides how to schedule or we do). Here is a slightly modified version of the algorithm:

  1. Leave receiver class unmodified and use an external array for storing locations
  2. If location preferences are set (which should be for all receivers or none), goto 4
  3. If num_receivers > num_executors, distribute receivers among executors (as in current PR)
  4. If num_executors > num_receivers, distribute executors among receivers
  5. makeRDD
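
A compact sketch of this variant (hypothetical names; assumes executors is non-empty and that preferences, when present, are set for all receivers):

```scala
// Distribute whichever side is scarcer over the other (steps 3 and 4 above).
def roundRobinHosts(numReceivers: Int, executors: Seq[String]): Seq[Seq[String]] = {
  if (numReceivers >= executors.size) {
    // More receivers than executors: spread the receivers over the executors.
    (0 until numReceivers).map(i => Seq(executors(i % executors.size)))
  } else {
    // More executors than receivers: give each receiver its share of executors.
    (0 until numReceivers).map { i =>
      executors.zipWithIndex.collect { case (host, j) if j % numReceivers == i => host }
    }
  }
}

// Step 5: ssc.sc.makeRDD[Receiver[_]](receivers.zip(roundRobinHosts(receivers.size, executors)))
```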

Contributor:
Well, we want to make as few assumptions as possible. Users may want to mix Flume receivers (which may have preferred locations) with Kafka receivers (without any preferred location). So when we are fixing things, it's best to fix them in the right way.

Is it too hard to implement it without that assumption?

Contributor (author):
The implementation is trivial (simpler than what it is at the moment, in fact). I'm probably missing something here that is specific to Flume, but I'm wondering whether there is a reason for the user to want to set preferredLocation for Flume receivers and not for Kafka receivers in the same app. And if not, would "mixed" scheduling be a design decision we want to make consciously and encourage?

Contributor:
Because the non-polling version of the Flume receiver NEEDS a location (so that Flume can push events to that machine), whereas none of the other sources we support requires it. We don't need to encourage anything; it's just good to support it if anyone wants to do it. In fact, between the two Flume approaches, the push-based approach referred to here is not a good one, but it is still present because the other, pull-based polling approach is slightly complicated to set up and may not be feasible in some settings. So we HAVE to support both. For Kafka, we don't need to set a preferred location.

val receiversWithPreferences = receivers.map(r => (r, Seq(r.preferredLocation.get)))
ssc.sc.makeRDD[Receiver[_]](receiversWithPreferences)
} else if (roundRobin) {
val roundRobinReceivers = receivers.map(r => (r, Seq(r.host.get)))
ssc.sc.makeRDD[Receiver[_]](roundRobinReceivers)
} else {
ssc.sc.makeRDD(receivers, receivers.size)
}

// Distribute the receivers and start them
logInfo("Starting " + receivers.length + " receivers")
running = true