-
Notifications
You must be signed in to change notification settings - Fork 28.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-7988][STREAMING] Round-robin scheduling of receivers by default #6607
Changes from 22 commits
fff1b2e
41705de
bb5e09b
b05ee2f
3cac21b
975b8d8
6e3515c
7888257
6caeefe
07b9dfa
02dbdb8
45e3a99
16e84ec
4cf97b6
f8a3e05
7f3e028
242e677
179b90f
4604f28
68e8540
bc23907
48a4a97
ae29152
9f1abc2
6127e58
f747739
1918819
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -17,8 +17,10 @@ | |
|
||
package org.apache.spark.streaming.scheduler | ||
|
||
import scala.collection.mutable.{HashMap, SynchronizedMap} | ||
import scala.collection.mutable.{ArrayBuffer, HashMap, SynchronizedMap} | ||
import scala.language.existentials | ||
import scala.math.max | ||
import org.apache.spark.rdd._ | ||
|
||
import org.apache.spark.streaming.util.WriteAheadLogUtils | ||
import org.apache.spark.{Logging, SerializableWritable, SparkEnv, SparkException} | ||
|
@@ -270,6 +272,44 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false | |
} | ||
} | ||
|
||
/** | ||
* Get the list of executors excluding driver | ||
*/ | ||
/**
 * Get the list of executor hosts, excluding the driver host.
 * Hosts are derived from the "host:port" keys of the executor
 * memory status map reported by the SparkContext.
 */
private def getExecutors(ssc: StreamingContext): List[String] = {
  val driverHost = ssc.sparkContext.getConf.get("spark.driver.host")
  val executorHosts = ssc.sparkContext.getExecutorMemoryStatus.keys
    .map(_.split(":")(0))
    .toList
  executorHosts.diff(List(driverHost))
}
|
||
/** Set host location(s) for each receiver so as to distribute them over | ||
* executors in a round-robin fashion taking into account preferredLocation if set | ||
*/ | ||
/**
 * Set host location(s) for each receiver so as to distribute them over
 * executors in a round-robin fashion, taking into account each receiver's
 * preferredLocation if set.
 *
 * Returns an array parallel to `receivers`: entry i holds the candidate
 * host(s) for receivers(i). NOTE: if `executors` is empty, every entry is
 * left null — callers must check for this and fall back to unconstrained
 * placement.
 */
private[streaming] def scheduleReceivers(receivers: Seq[Receiver[_]],
    executors: List[String]): Array[ArrayBuffer[String]] = {
  val locations = new Array[ArrayBuffer[String]](receivers.length)
  if (executors.nonEmpty) {
    // Seed each receiver's location list with its preferred host, if any.
    for (i <- receivers.indices) {
      locations(i) = new ArrayBuffer[String]()
      receivers(i).preferredLocation.foreach(locations(i) += _)
    }

    // Hand out executors round-robin to receivers without a preference.
    // Iterating max(#receivers, #executors) times ensures every executor is
    // used when there are more executors than receivers (a receiver may then
    // get multiple candidate hosts).
    var nextExecutor = 0
    for (i <- 0 until max(receivers.length, executors.length)) {
      val r = i % receivers.length
      if (receivers(r).preferredLocation.isEmpty) {
        locations(r) += executors(nextExecutor)
        nextExecutor = (nextExecutor + 1) % executors.length
      }
    }
  }
  locations
}
|
||
/** | ||
* Get the receivers from the ReceiverInputDStreams, distributes them to the | ||
* worker nodes as a parallel collection, and runs them. | ||
|
@@ -281,18 +321,6 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false | |
rcvr | ||
}) | ||
|
||
// Right now, we only honor preferences if all receivers have them | ||
val hasLocationPreferences = receivers.map(_.preferredLocation.isDefined).reduce(_ && _) | ||
|
||
// Create the parallel collection of receivers to distributed them on the worker nodes | ||
val tempRDD = | ||
if (hasLocationPreferences) { | ||
val receiversWithPreferences = receivers.map(r => (r, Seq(r.preferredLocation.get))) | ||
ssc.sc.makeRDD[Receiver[_]](receiversWithPreferences) | ||
} else { | ||
ssc.sc.makeRDD(receivers, receivers.size) | ||
} | ||
|
||
val checkpointDirOption = Option(ssc.checkpointDir) | ||
val serializableHadoopConf = new SerializableWritable(ssc.sparkContext.hadoopConfiguration) | ||
|
||
|
@@ -308,12 +336,25 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false | |
supervisor.start() | ||
supervisor.awaitTermination() | ||
} | ||
|
||
// Run the dummy Spark job to ensure that all slaves have registered. | ||
// This avoids all the receivers to be scheduled on the same node. | ||
if (!ssc.sparkContext.isLocal) { | ||
ssc.sparkContext.makeRDD(1 to 50, 50).map(x => (x, 1)).reduceByKey(_ + _, 20).collect() | ||
} | ||
|
||
// Get the list of executors and schedule receivers | ||
val executors = getExecutors(ssc) | ||
val locations = scheduleReceivers(receivers, executors) | ||
val tempRDD = | ||
if (locations(0) != null) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Under what condition will There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. sparkContext.isLocal == true There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Then its more intuitive to check that directly. If !local, then schedule and makeRDD, otherwise, makeRDD There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. location(0) check is all-encompassing (no assumptions made about when it may be true). We can add a comment next to it to clarify that it can be null for local. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It may be so, but its hard to read and understand the condition (which is why I asked). Also checking for null at location(0) to detect whether an ArrayBuffer was assigned to the position to detect whether very very brittle check and ties the logic deep with the implementation of the We try to design the code as intuitive and modular as possible, so that others can easily contribute. That's the only way to manage a large open source project with so many contributors. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Generally speaking, what if there were numerous conditions for which locations(0) could be null, would you enlist them all? It's common practice to do: Obj x = f(); if (x) {do blah}. 
If we don't want a check on locations(0), the right way would be to return null (or some such) from scheduleReceiver when locations(0) is null. So we can check if(locations) instead of if(locations(0)). Better still, we can check for if(!executors.isEmpty) before invoking scheduleReceiver, so no further check is needed. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. First of all, in Scala, we try to not rely on null rather use Option and None. Here a suggestion, which I think is a cleaner design with clean semantics. The makeRDD is designed to take a So the
if there were no executors at that point in time, all the buffers will be empty. If you want to be extra careful, you can simply add a check that none of the returned locations are null. That still is just one line and easy-to-understand code, rather than introducing another level of conditions. How does this sound? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That's fine. Following, I think, is slightly cleaner: if(!executors.isEmpty){ There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. SGTM! |
||
val roundRobinReceivers = (0 until receivers.length).map(i => | ||
(receivers(i), locations(i))) | ||
ssc.sc.makeRDD[Receiver[_]](roundRobinReceivers) | ||
} else { | ||
ssc.sc.makeRDD(receivers, receivers.size) | ||
} | ||
|
||
// Distribute the receivers and start them | ||
logInfo("Starting " + receivers.length + " receivers") | ||
running = true | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,91 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.spark.streaming.scheduler | ||
|
||
import org.apache.spark.streaming._ | ||
import org.apache.spark.SparkConf | ||
import org.apache.spark.storage.StorageLevel | ||
import org.apache.spark.streaming.receiver._ | ||
import org.apache.spark.util.Utils | ||
|
||
/** Testsuite for receiver scheduling */ | ||
class SchedulerSuite extends TestSuiteBase { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Since this code is in ReceiverTracker, please make this the ReceiverTrackerSuite. |
||
val sparkConf = new SparkConf().setMaster("local[8]").setAppName("test") | ||
val ssc = new StreamingContext(sparkConf, Milliseconds(100)) | ||
val tracker = new ReceiverTracker(ssc) | ||
val launcher = new tracker.ReceiverLauncher() | ||
|
||
test("receiver scheduling - no preferredLocation") { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. preferredLocation --> preferred location |
||
val numReceivers = 10; | ||
val receivers = (1 to numReceivers).map(i => new DummyReceiver) | ||
val executors: List[String] = List("Host1", "Host2", "Host3", "Host4", "Host5") | ||
val locations = launcher.scheduleReceivers(receivers, executors) | ||
assert(locations(0)(0) === "Host1") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You don't test the sizes of the location(x) arrays. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Seems redundant There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What if the scheduling algorithm allocates it to extra nodes that it should not? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In any case, adding a couple more checks doesn't hurt, so will do. |
||
assert(locations(4)(0) === "Host5") | ||
assert(locations(5)(0) === "Host1") | ||
assert(locations(9)(0) === "Host5") | ||
} | ||
|
||
test("receiver scheduling - no preferredLocation, numExecutors > numReceivers") { | ||
val numReceivers = 3; | ||
val receivers = (1 to numReceivers).map(i => new DummyReceiver) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There is a lot of duplicate code here. You can put all of the duplicate code in a function, say,
In fact all of them can be one or two There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Makes structure brittle, takes away code readability There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What do you mean "structure brittle" ? What structure? The current approach definitely a lot more verbose than what it can be. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Structure of the four tests, which could evolve separately as opposed to being centralized. The way these tests are written is pretty consistent with the rest of spark test suites, which can be easily read and debugged independently at the cost of slight code duplication. Take a look at JobCancellationSuite for example. What you're suggesting is elegant but adds complexity and makes the code hard to read. If we had a dozen such tests, the verbosity would bother me too. Will try and reduce the number of asserts, I think some of them are redundant. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Well I am not sure how you claim that they are consistent with rest of test suites. As a counterexample, see RDDSuite. How about this for being easier to read?
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not sure if you looked at JobCancellationSuite as an example. Anyways, this representation is definitely cleaner, let's try and incorporate it in the PR. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Glad we reached a consensus! Looking forward to the update. |
||
val executors: List[String] = List("Host1", "Host2", "Host3", "Host4", "Host5") | ||
val locations = launcher.scheduleReceivers(receivers, executors) | ||
assert(locations(0)(0) === "Host1") | ||
assert(locations(2)(0) === "Host3") | ||
assert(locations(0)(1) === "Host4") | ||
assert(locations(1)(1) === "Host5") | ||
} | ||
|
||
test("receiver scheduling - all have preferredLocation") { | ||
val numReceivers = 5; | ||
val receivers = (1 to numReceivers).map(i => new DummyReceiver(host = Some("Host" + i))) | ||
val executors: List[String] = List("Host1", "Host5", "Host4", "Host3", "Host2") | ||
val locations = launcher.scheduleReceivers(receivers, executors) | ||
assert(locations(1)(0) === "Host2") | ||
assert(locations(4)(0) === "Host5") | ||
} | ||
|
||
test("receiver scheduling - some have preferredLocation") { | ||
val numReceivers = 3; | ||
val receivers: Seq[Receiver[_]] = Seq( | ||
new DummyReceiver(host = Some("Host2")), | ||
new DummyReceiver, | ||
new DummyReceiver) | ||
val executors: List[String] = List("Host1", "Host2", "Host3", "Host4", "Host5") | ||
val locations = launcher.scheduleReceivers(receivers, executors) | ||
assert(locations(0)(0) === "Host2") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In this case, Host2 is being used twice, while ideally Host2 should be used only once. There are enough hosts for 3 receivers that receiver3 does not need to be allocated to host2 again. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think the algorithm needs to be tweaked a little bit more There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That would require priority scheduling, mere tweaking will not do. Mixed receiver type in a single app is a corner case scenario, yet to be encountered in practice. Adding support for it (as we have done in this PR) is different from designing the algorithm around it in an attempt to make the corner case performant. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Alright, we can address this later. This logic is good for now. |
||
assert(locations(1)(0) === "Host1") | ||
assert(locations(2)(0) === "Host2") | ||
assert(locations(1)(1) === "Host3") | ||
} | ||
} | ||
|
||
/** | ||
* Dummy receiver implementation | ||
*/ | ||
class DummyReceiver(host: Option[String] = None) extends Receiver[Int](StorageLevel.MEMORY_ONLY) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Make this private class so that its not used outside this class. |
||
|
||
def onStart() { | ||
} | ||
|
||
def onStop() { | ||
} | ||
|
||
override def preferredLocation: Option[String] = host | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you really need this to be an
Array[ArrayBuffer]
? Couldn't this just beArray[Seq]
? You could just wrap the executor in aSeq
when adding it to the array. So you could dolocations(i) = Seq(executor(index))
instead of first allocating anArrayBuffer
first etc. Is there a case where a receiver could have more than 1 location specified? I don't really see one.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
TD suggested adding support for the case when num_executors > num_receivers-- assign multiple executors per receiver for fault tolerance, which is not a bad idea (though probably not common in practice).