[SPARK-7988][STREAMING] Round-robin scheduling of receivers by default #6607
Conversation
@@ -107,8 +107,8 @@ abstract class Receiver[T](val storageLevel: StorageLevel) extends Serializable
  */
 def onStop()

 /** Override this to specify a preferred location (hostname). */
 def preferredLocation : Option[String] = None
I think this breaks binary compatibility. This is not feasible.
Adding a new private var in the receiver feels like the simplest workaround for the compatibility issue.
ok to test
Test build #34180 has finished for PR 6607 at commit
Test build #34236 has finished for PR 6607 at commit
// Run the dummy Spark job to ensure that all slaves have registered.
// This avoids all the receivers to be scheduled on the same node.
if (!ssc.sparkContext.isLocal) {
  ssc.sparkContext.makeRDD(1 to 50, 50).map(x => (x, 1)).reduceByKey(_ + _, 20).collect()
}

// Right now, we only honor preferences if all receivers have them
val hasLocationPreferences = receivers.map(_.preferredLocation.isDefined).reduce(_ && _)
It would be nice if this new functionality were put into a function by itself, so that we can unit test it individually against different combinations of executors, and maybe different policies in the future.
I added some comments. At a high level, two things are needed.
…into master_nravi Conflicts: streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
Test build #34270 has finished for PR 6607 at commit
Test build #34271 has finished for PR 6607 at commit
Test build #34273 has finished for PR 6607 at commit
Test build #34274 has finished for PR 6607 at commit
@@ -17,8 +17,9 @@
 package org.apache.spark.streaming.scheduler

-import scala.collection.mutable.{HashMap, SynchronizedMap}
+import scala.collection.mutable.{HashMap, SynchronizedMap, ArrayBuffer}
nit: ordering
This looks pretty good. I left one comment that I felt can improve code readability and makes the logic easier to understand.
Thanks for the review. Will look into the failing test cases soon.
Test build #34300 has finished for PR 6607 at commit
val locations = new Array[ArrayBuffer[String]](receivers.length)
if (!executors.isEmpty) {
  var i = 0
  for (i <- 0 to (receivers.length - 1)) {
nit: cleaner to use 0 until X rather than 0 to X-1
val executors = getExecutors(ssc)
val locations = scheduleReceivers(receivers, executors)
val tempRDD =
  if (locations(0) != null) {
Under what condition will location(0) be null?
sparkContext.isLocal == true
Then it's more intuitive to check that directly: if !local, then schedule and makeRDD; otherwise, just makeRDD.
The location(0) check is all-encompassing (it makes no assumptions about when it may be true). We can add a comment next to it to clarify that it can be null in local mode.
It may be so, but the condition is hard to read and understand (which is why I asked). Also, checking for null at location(0) to detect whether an ArrayBuffer was assigned to that position is a very brittle check that ties the logic deep into the implementation of scheduleReceiver. If someone changes the implementation of the function to allocate receivers differently (say, always assigning an ArrayBuffer even if it is empty), this may totally break. So this condition makes non-intuitive assumptions about the implementation logic of scheduleReceiver. This is bad code design.
We try to design the code to be as intuitive and modular as possible, so that others can easily contribute. That's the only way to manage a large open source project with so many contributors.
Generally speaking, what if there were numerous conditions under which locations(0) could be null? Would you enlist them all? It's common practice to do: Obj x = f(); if (x) { do blah }. If we don't want a check on locations(0), the right way would be to return null (or some such) from scheduleReceiver when locations(0) is null, so we can check if (locations) instead of if (locations(0)). Better still, we can check if (!executors.isEmpty) before invoking scheduleReceiver, so no further check is needed.
First of all, in Scala we try not to rely on null; we use Option and None instead. Here is a suggestion which I think is a cleaner design with clean semantics. makeRDD is designed to take a sequence of (item, locations). If for an item the locations are empty (not null, just an empty seq), that automatically means there is no preferred location. That's intuitive.
So scheduleReceiver can be designed as follows:
- scheduleReceiver always returns an Array[ArrayBuffer[String]] where any of the buffers can be empty, but there are no nulls.
- the logic at this location becomes
if (sparkContext is local) {
// make RDD
} else {
// schedule receivers
// make RDD with returned result
}
If there were no executors at that point in time, all the buffers will be empty, which is perfectly okay to pass on to makeRDD. The code stays simple with only one condition, and it just works whether or not the executors are empty.
If you want to be extra careful, you can simply add a check that none of the returned locations are null. That is still just one line of easy-to-understand code rather than introducing another level of conditions.
How does this sound?
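A minimal sketch of the contract suggested above (the method name matches the discussion, but the signature and round-robin policy here are illustrative, not the exact PR code):

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical sketch: every receiver always gets a (possibly empty) buffer
// of candidate hosts, so the caller never has to check for null.
def scheduleReceivers(
    numReceivers: Int,
    executors: Seq[String]): Array[ArrayBuffer[String]] = {
  // Allocate a buffer per receiver even when no executors exist yet.
  val locations = Array.fill(numReceivers)(new ArrayBuffer[String])
  if (executors.nonEmpty) {
    // Assign executors to receivers round-robin.
    for (i <- 0 until numReceivers) {
      locations(i) += executors(i % executors.length)
    }
  }
  locations
}
```

With this shape, an empty buffer simply means "no preferred location", which is exactly what makeRDD expects.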
That's fine. The following, I think, is slightly cleaner:
if (!executors.isEmpty) {
  // schedule receivers
  // make RDD with returned result
} else {
  // make RDD
}
It avoids the redundant invocation of scheduleReceivers and the subsequent memory allocations, and it optimizes away the extra logic (the check on local) and the assumptions about how locations is formatted. If this sounds good, I think we can get the final iteration of this PR going.
SGTM!
Test build #35036 has finished for PR 6607 at commit
Test build #35037 has finished for PR 6607 at commit
This should contain all the changes we have discussed so far. ReceiverTrackerSuite now contains two instead of four tests (intentionally keeping the "some preferred location" test separate).
Test build #35516 has finished for PR 6607 at commit
Test build #35518 has finished for PR 6607 at commit
  }
  loc
}
def testScheduler(numReceivers: Int, preferredLocation: Boolean, allocation: String) {
Add empty line.
Few nits, otherwise good to go.
Test build #35537 has finished for PR 6607 at commit
LGTM! Merging this. Thanks!
  }
}
var count = 0
for (i <- 0 until max(receivers.length, executors.length)) {
Why is max used here? Isn't receivers.length enough?
Because we want to allocate more executors per receiver, so that receiver tasks can fail over to other executors without conflicting with other running receivers.
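A small sketch of why the loop bound is max(receivers.length, executors.length) rather than receivers.length (this is an illustrative reconstruction of the idea, not the exact PR code): when there are more executors than receivers, the extra iterations assign the surplus executors round-robin, so each receiver accumulates several candidate hosts for failover.

```scala
import scala.collection.mutable.ArrayBuffer
import scala.math.max

// Hypothetical sketch of the round-robin allocation discussed above.
def roundRobin(numReceivers: Int, executors: Seq[String]): Array[ArrayBuffer[String]] = {
  val locations = Array.fill(numReceivers)(new ArrayBuffer[String])
  if (executors.nonEmpty) {
    // Looping to max(...) assigns every executor, not just the first
    // numReceivers of them.
    for (i <- 0 until max(numReceivers, executors.length)) {
      locations(i % numReceivers) += executors(i % executors.length)
    }
  }
  locations
}
```

For example, with 2 receivers and 4 executors, each receiver ends up with 2 candidate hosts; looping only to receivers.length would leave 2 executors unused as failover targets.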
Thanks!
Minimal PR for round-robin scheduling of receivers. Dense scheduling can be enabled by setting preferredLocation, so a new config parameter isn't really needed. Tested this on a cluster of 6 nodes and saw a 20-25% gain in throughput compared to random scheduling.
@tdas @pwendell