[SPARK-7988][STREAMING] Round-robin scheduling of receivers by default #6607
```diff
@@ -270,6 +270,15 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
     }
   }
 
+  /**
+   * Get the list of executors excluding driver
+   */
+  private def getExecutors(ssc: StreamingContext): List[String] = {
+    val executors = ssc.sparkContext.getExecutorMemoryStatus.map(_._1.split(":")(0)).toList
+    val driver = ssc.sparkContext.getConf.get("spark.driver.host")
+    executors.diff(List(driver))
+  }
+
   /**
    * Get the receivers from the ReceiverInputDStreams, distributes them to the
    * worker nodes as a parallel collection, and runs them.
```
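The keys of `getExecutorMemoryStatus` are `"host:port"` strings, with one entry for the driver and one per registered executor; the helper above keeps only the host part and subtracts the driver's host. A minimal sketch of that string handling, with made-up hostnames:

```scala
// Hypothetical snapshot of the keys of sc.getExecutorMemoryStatus.
val memoryStatusKeys = List("driver-host:43121", "worker-1:51262", "worker-2:51263")

val hosts = memoryStatusKeys.map(_.split(":")(0)) // keep the host, drop the port
// List.diff is a multiset difference: it removes one matching element per
// occurrence in its argument, so a single "driver-host" entry is filtered out.
val executorsOnly = hosts.diff(List("driver-host"))
// executorsOnly == List("worker-1", "worker-2")
```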
```diff
@@ -281,18 +290,6 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
         rcvr
       })
 
-      // Right now, we only honor preferences if all receivers have them
-      val hasLocationPreferences = receivers.map(_.preferredLocation.isDefined).reduce(_ && _)
-
-      // Create the parallel collection of receivers to distributed them on the worker nodes
-      val tempRDD =
-        if (hasLocationPreferences) {
-          val receiversWithPreferences = receivers.map(r => (r, Seq(r.preferredLocation.get)))
-          ssc.sc.makeRDD[Receiver[_]](receiversWithPreferences)
-        } else {
-          ssc.sc.makeRDD(receivers, receivers.size)
-        }
-
       val checkpointDirOption = Option(ssc.checkpointDir)
       val serializableHadoopConf = new SerializableWritable(ssc.sparkContext.hadoopConfiguration)
```
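The removed `tempRDD` logic (re-added further down, after the round-robin assignment) relies on the `SparkContext.makeRDD` overload that takes a `Seq[(T, Seq[String])]`: each element is paired with the hosts it prefers, and those become locality hints for the resulting one-element partitions. A minimal sketch of that overload, assuming an existing `SparkContext` called `sc` and made-up hostnames:

```scala
// Pair each element with its preferred hosts; makeRDD turns the pairs into an
// RDD whose partitions carry those hosts as scheduling hints.
val withPreferences: Seq[(String, Seq[String])] = Seq(
  ("receiver-0", Seq("worker-1")),
  ("receiver-1", Seq("worker-2"))
)
val rdd = sc.makeRDD(withPreferences)
// Tasks over rdd's partitions run on the preferred hosts when slots are free.
```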
```diff
@@ -308,12 +305,37 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
         supervisor.start()
         supervisor.awaitTermination()
       }
 
       // Run the dummy Spark job to ensure that all slaves have registered.
       // This avoids all the receivers to be scheduled on the same node.
       if (!ssc.sparkContext.isLocal) {
         ssc.sparkContext.makeRDD(1 to 50, 50).map(x => (x, 1)).reduceByKey(_ + _, 20).collect()
       }
 
+      // Right now, we only honor preferences if all receivers have them
+      var hasLocationPreferences = receivers.map(_.preferredLocation.isDefined).reduce(_ && _)
+
+      // If no location preferences are specified, set preferredLocation for each receiver
+      // so as to distribute them evenly over executors in a round-robin fashion
+      if (!hasLocationPreferences && !ssc.sparkContext.isLocal) {
+        val executors = getExecutors(ssc)
+        if (!executors.isEmpty) {
+          var i = 0;
+          for (i <- 0 to (receivers.length - 1)) {
+            receivers(i).preferredLocation = Some(executors(i % executors.length))
+          }
+          hasLocationPreferences = true
+        }
+      }
```
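The dummy shuffle job above is the pre-existing trick for making sure executors have registered before receivers are placed; without it, an application that starts receivers immediately may see a single registered executor and schedule every receiver there. A hedged alternative, not what the PR does, would be to poll the executor list until enough executors appear; `minExecutors` and `timeoutMs` below are hypothetical parameters:

```scala
import org.apache.spark.streaming.StreamingContext

// Illustrative sketch: block until at least `minExecutors` executors have
// registered, or until `timeoutMs` elapses. getExecutorMemoryStatus also
// holds an entry for the driver, hence the + 1.
def waitForExecutors(ssc: StreamingContext, minExecutors: Int, timeoutMs: Long): Unit = {
  val deadline = System.currentTimeMillis() + timeoutMs
  while (ssc.sparkContext.getExecutorMemoryStatus.size < minExecutors + 1 &&
      System.currentTimeMillis() < deadline) {
    Thread.sleep(100)
  }
}
```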
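The modulo assignment in the added block wraps around the executor list, spreading receivers as evenly as the counts allow. As a worked example with made-up hosts, five receivers over three executors come out as:

```scala
val executors = List("worker-1", "worker-2", "worker-3")
val assignments = (0 until 5).map(i => (i, executors(i % executors.length)))
// Vector((0,worker-1), (1,worker-2), (2,worker-3), (3,worker-1), (4,worker-2))
```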
The hunk continues with the relocated `tempRDD` construction, which is where the review thread below was attached:

```diff
+      val tempRDD =
+        if (hasLocationPreferences) {
+          val receiversWithPreferences = receivers.map(r => (r, Seq(r.preferredLocation.get)))
+          ssc.sc.makeRDD[Receiver[_]](receiversWithPreferences)
+        } else {
+          ssc.sc.makeRDD(receivers, receivers.size)
+        }
+
       // Distribute the receivers and start them
       logInfo("Starting " + receivers.length + " receivers")
       running = true
```

Review discussion attached to the `tempRDD` block:

Reviewer: The right way to allocate hosts to each receiver that takes into account the preferred locations on some of them is the following. […] Now you can use […]

Author: Thanks for the review. PreferredLocation set for all receivers or none sounds like a good assumption (either the user decides how to schedule or we do). Here is a slightly modified version of the algorithm: […]

Reviewer: Well, we want to make as few assumptions as possible. Users may want to mix Flume receivers (which may have preferred locations) with Kafka receivers (without any preferred location). So when we are fixing things, it is best to fix them in the right way. Is it too hard to implement without that assumption?

Author: The implementation is trivial (simpler than what is there at the moment, in fact). I am probably missing something specific to Flume, but is there a reason a user would want to set preferredLocation for Flume receivers and not for Kafka receivers in the same app? And if not, would "mixed" scheduling be a design decision we want to make consciously and encourage?

Reviewer: Because the non-polling version of the Flume receiver NEEDS a location (so that Flume can push events to that machine), whereas no other source we support requires one. We don't need to encourage anything; it is just good to support it if anyone wants to do it. In fact, of the two Flume approaches, the push-based approach referred to here is not a good one, but it is still present because the pull-based polling approach is slightly more complicated to set up and may not be feasible in some settings. So we HAVE to support both. For Kafka, we don't need to set a preferred location.
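The algorithm the reviewer proposed was not captured in this extract, but a sketch in the spirit of the discussion (honor an explicit `preferredLocation` where one exists, and spread only the remaining receivers over the executors) could look like the following. The helper name and the least-loaded tie-breaking are assumptions, not the reviewer's actual proposal:

```scala
// Hypothetical sketch: receivers with an explicit preference keep it; the
// rest are assigned greedily to the currently least-loaded executor, which
// approximates round-robin. Assumes `executors` is non-empty.
def allocate(preferred: Seq[Option[String]], executors: Seq[String]): Seq[String] = {
  // Start the load counts from the explicit preferences that name an executor.
  val load = scala.collection.mutable.Map(executors.map(_ -> 0): _*)
  preferred.flatten.foreach(host => if (load.contains(host)) load(host) += 1)
  preferred.map {
    case Some(host) => host // honor the user's explicit choice
    case None =>
      val host = executors.minBy(load) // least-loaded executor so far
      load(host) += 1
      host
  }
}
```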
Review discussion on making `preferredLocation` mutable:

Reviewer: I think this breaks binary compatibility. This is not feasible.

Author: Adding a new private var in `Receiver` feels like the simplest workaround for the compatibility issue.
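For context, `preferredLocation` on `Receiver` is public API, so changing it from a parameterless method into a `var` changes the compiled signatures that user-built receivers link against. A hedged sketch of the suggested workaround, with hypothetical member names, keeps the public member untouched and adds a Spark-internal mutable slot:

```scala
package org.apache.spark.streaming.receiver

// Sketch only: names and structure are illustrative, not the PR's final code.
abstract class Receiver[T] {
  // The existing public API is left exactly as before, so binary
  // compatibility with user-compiled receivers is preserved.
  def preferredLocation: Option[String] = None

  // New Spark-internal mutable slot the scheduler can fill in.
  private[streaming] var assignedLocation: Option[String] = None

  // The tracker would consult the user's explicit choice first, then
  // fall back to the scheduler's round-robin assignment.
  private[streaming] def effectivePreferredLocation: Option[String] =
    preferredLocation.orElse(assignedLocation)
}
```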