
[SPARK-7988][STREAMING] Round-robin scheduling of receivers by default #6607

Closed · wants to merge 27 commits (changes shown from 20 commits)
fff1b2e Round-robin scheduling of streaming receivers (nishkamravi2, Jun 3, 2015)
41705de Update ReceiverTracker.scala (nishkamravi2, Jun 3, 2015)
bb5e09b Add a new var in receiver to store location information for round-rob… (nishkamravi2, Jun 5, 2015)
b05ee2f Update ReceiverTracker.scala (nishkamravi2, Jun 5, 2015)
3cac21b Generalize the scheduling algorithm (nishkamravi2, Jun 5, 2015)
975b8d8 Merge branch 'master_nravi' of https://github.com/nishkamravi2/spark … (nishkamravi2, Jun 5, 2015)
6e3515c Minor changes (nishkamravi2, Jun 5, 2015)
7888257 Update ReceiverTracker.scala (nishkamravi2, Jun 5, 2015)
6caeefe Update ReceiverTracker.scala (nishkamravi2, Jun 5, 2015)
07b9dfa Update ReceiverTracker.scala (nishkamravi2, Jun 5, 2015)
02dbdb8 Update ReceiverTracker.scala (nishkamravi2, Jun 5, 2015)
45e3a99 Merge branch 'master_nravi' of https://github.com/nishkamravi2/spark … (nishkamravi2, Jun 5, 2015)
16e84ec Update ReceiverTracker.scala (nishkamravi2, Jun 5, 2015)
4cf97b6 Update ReceiverTracker.scala (nishkamravi2, Jun 5, 2015)
f8a3e05 Update ReceiverTracker.scala (nishkamravi2, Jun 5, 2015)
7f3e028 Update ReceiverTracker.scala, add unit test cases in SchedulerSuite (nishkamravi2, Jun 8, 2015)
242e677 Update SchedulerSuite.scala (nishkamravi2, Jun 8, 2015)
179b90f Update ReceiverTracker.scala (nishkamravi2, Jun 8, 2015)
4604f28 Update SchedulerSuite.scala (nishkamravi2, Jun 8, 2015)
68e8540 Update SchedulerSuite.scala (nishkamravi2, Jun 8, 2015)
bc23907 Update ReceiverTracker.scala (nishkamravi2, Jun 12, 2015)
48a4a97 Update ReceiverTracker.scala (nishkamravi2, Jun 12, 2015)
ae29152 Update test suite with TD's suggestions (nishkamravi2, Jun 17, 2015)
9f1abc2 Update ReceiverTrackerSuite.scala (nishkamravi2, Jun 17, 2015)
6127e58 Update ReceiverTracker and ReceiverTrackerSuite (nishkamravi2, Jun 23, 2015)
f747739 Update ReceiverTrackerSuite.scala (nishkamravi2, Jun 23, 2015)
1918819 Update ReceiverTrackerSuite.scala (nishkamravi2, Jun 23, 2015)
ReceiverTracker.scala
@@ -17,8 +17,10 @@

package org.apache.spark.streaming.scheduler

-import scala.collection.mutable.{HashMap, SynchronizedMap}
+import scala.collection.mutable.{ArrayBuffer, HashMap, SynchronizedMap}
import scala.language.existentials
+import scala.math.max
+import org.apache.spark.rdd._

import org.apache.spark.streaming.util.WriteAheadLogUtils
import org.apache.spark.{Logging, SerializableWritable, SparkEnv, SparkException}
@@ -270,6 +272,44 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
}
}

/**
* Get the list of executors excluding driver
*/
private def getExecutors(ssc: StreamingContext): List[String] = {
val executors = ssc.sparkContext.getExecutorMemoryStatus.map(_._1.split(":")(0)).toList
val driver = ssc.sparkContext.getConf.get("spark.driver.host")
executors.diff(List(driver))
}
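
The host extraction above can be mirrored in a short Python sketch (the function name and sample keys are hypothetical, not part of the patch): each key from getExecutorMemoryStatus has the form host:port, the port is stripped, and one occurrence of the driver host is removed, matching the semantics of Scala's executors.diff(List(driver)):

```python
def executor_hosts(executor_keys, driver_host):
    # Keys look like "host:port"; keep only the host part.
    hosts = [key.split(":")[0] for key in executor_keys]
    # Scala's List.diff removes one matching occurrence per element of the
    # argument list, so drop a single occurrence of the driver host.
    remaining = list(hosts)
    if driver_host in remaining:
        remaining.remove(driver_host)
    return remaining
```

Note that diff removes only one occurrence, so if an executor shares the driver's host name, one copy of that host still survives in the result.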

/** Set host location(s) for each receiver so as to distribute them over
* executors in a round-robin fashion taking into account preferredLocation if set
*/
private[streaming] def scheduleReceivers(receivers: Seq[Receiver[_]],
executors: List[String]): Array[ArrayBuffer[String]] = {
val locations = new Array[ArrayBuffer[String]](receivers.length)
Contributor: Do you really need this to be an Array[ArrayBuffer]? Couldn't this just be Array[Seq]? You could wrap the executor in a Seq when adding it to the array, i.e. locations(i) = Seq(executors(index)), instead of allocating an ArrayBuffer first. Is there a case where a receiver could have more than one location specified? I don't really see one.

Contributor Author: TD suggested adding support for the case when num_executors > num_receivers: assign multiple executors per receiver for fault tolerance, which is not a bad idea (though probably not common in practice).

if (!executors.isEmpty) {
var i = 0
for (i <- 0 to (receivers.length - 1)) {
Contributor: nit: cleaner to use 0 until X rather than 0 to X-1

locations(i) = new ArrayBuffer[String]()
if (receivers(i).preferredLocation.isDefined) {
locations(i) += receivers(i).preferredLocation.get
}
}

var count = 0;
for (i <- 0 to (max(receivers.length, executors.length) - 1)) {
if (!receivers(i % receivers.length).preferredLocation.isDefined) {
locations(i % receivers.length) += executors(count)
count += 1;
if (count == executors.length) {
count = 0;
}
}
}
}
locations
}
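
To make the round-robin rule concrete, here is a self-contained Python sketch of the same algorithm (the function name and list encodings are mine, not part of the patch): each receiver is reduced to its optional preferred host, receivers with a preferred location keep it, and the remaining receivers are dealt executor hosts in a cycle over max(numReceivers, numExecutors) slots:

```python
def schedule_receivers(preferred, executors):
    """preferred: one entry per receiver, a host name or None.
    Returns one list of candidate hosts per receiver."""
    n = len(preferred)
    # A receiver with a preferred location keeps it; the rest start empty.
    locations = [[host] if host is not None else [] for host in preferred]
    if executors:
        count = 0
        # Iterate over enough slots that every executor is handed out at
        # least once when there are more executors than receivers.
        for i in range(max(n, len(executors))):
            if preferred[i % n] is None:
                locations[i % n].append(executors[count])
                count = (count + 1) % len(executors)
    return locations
```

Under this rule, ten receivers over five hosts wrap around (receiver 5 lands back on Host1), and a receiver pinned to Host2 does not stop Host2 from also being dealt out round-robin, which is the behavior exercised by the unit tests in this PR.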

/**
* Get the receivers from the ReceiverInputDStreams, distributes them to the
* worker nodes as a parallel collection, and runs them.
@@ -281,18 +321,6 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
rcvr
})

-// Right now, we only honor preferences if all receivers have them
-val hasLocationPreferences = receivers.map(_.preferredLocation.isDefined).reduce(_ && _)
-
-// Create the parallel collection of receivers to distribute them on the worker nodes
-val tempRDD =
-  if (hasLocationPreferences) {
-    val receiversWithPreferences = receivers.map(r => (r, Seq(r.preferredLocation.get)))
-    ssc.sc.makeRDD[Receiver[_]](receiversWithPreferences)
-  } else {
-    ssc.sc.makeRDD(receivers, receivers.size)
-  }

val checkpointDirOption = Option(ssc.checkpointDir)
val serializableHadoopConf = new SerializableWritable(ssc.sparkContext.hadoopConfiguration)

@@ -308,12 +336,25 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
supervisor.start()
supervisor.awaitTermination()
}

// Run a dummy Spark job to ensure that all slaves have registered.
// This avoids all of the receivers being scheduled on the same node.
if (!ssc.sparkContext.isLocal) {
ssc.sparkContext.makeRDD(1 to 50, 50).map(x => (x, 1)).reduceByKey(_ + _, 20).collect()
}

// Get the list of executors and schedule receivers
val executors = getExecutors(ssc)
val locations = scheduleReceivers(receivers, executors)
val tempRDD =
if (locations(0) != null) {
Contributor: Under what condition will location(0) be null?

Contributor Author: sparkContext.isLocal == true

Contributor: Then it's more intuitive to check that directly. If !local, then schedule and makeRDD; otherwise, makeRDD.

Contributor Author: The location(0) check is all-encompassing (no assumptions made about when it may be true). We can add a comment next to it to clarify that it can be null for local mode.

Contributor: It may be so, but it's hard to read and understand the condition (which is why I asked). Also, checking for null at locations(0) to detect whether an ArrayBuffer was assigned to that position is a very brittle check, and it ties the logic deep into the implementation of scheduleReceivers. If someone changes the implementation of the function to allocate receivers differently (say, always assigning an ArrayBuffer even if it is empty), this may totally break. So this condition makes non-intuitive assumptions about the implementation logic of scheduleReceivers. This is bad code design.

We try to design the code to be as intuitive and modular as possible, so that others can easily contribute. That's the only way to manage a large open source project with so many contributors.

Contributor Author: Generally speaking, what if there were numerous conditions for which locations(0) could be null: would you enlist them all? It's common practice to do: Obj x = f(); if (x) { do blah }. If we don't want a check on locations(0), the right way would be to return null (or some such) from scheduleReceivers when locations(0) is null, so we could check if (locations) instead of if (locations(0)). Better still, we can check if (!executors.isEmpty) before invoking scheduleReceivers, so no further check is needed.

Contributor: First of all, in Scala we try not to rely on null; rather, use Option and None. Here is a suggestion which I think is a cleaner design with clean semantics. makeRDD is designed to take a sequence of (item, locations). If for an item the location is empty (not null, just an empty seq), then that automatically means there is no preferred location. That's intuitive.

So scheduleReceivers can be designed as follows.

  1. scheduleReceivers always returns an Array[ArrayBuffer[String]] where any of the buffers can be empty, but there are no nulls.
  2. the logic at this location becomes
if (sparkContext is local) {
   // make RDD
} else {
   // schedule receivers
   // make RDD with returned result
}

If there were no executors at that point in time, all the buffers will be empty, which is perfectly okay to pass on to makeRDD. The code stays simple with only one condition, and whether or not the executor list is empty, it just works.

If you want to be extra careful, you can simply add a check that none of the returned locations is null. That is still just one line of easy-to-understand code, rather than introducing another level of conditions.

How does this sound?

Contributor Author: That's fine. The following, I think, is slightly cleaner:

if (!executors.isEmpty) {
  scheduleReceivers
  make RDD with returned result
} else {
  make RDD
}

This avoids the redundant invocation of scheduleReceivers and the subsequent memory allocations, and it optimizes away the extra logic (the check on local) and the assumptions about how locations is formatted. If this sounds good, I think we can get the final iteration of this PR going.

Contributor: SGTM!

val roundRobinReceivers = (0 to (receivers.length - 1)).map(i =>
(receivers(i), locations(i)))
ssc.sc.makeRDD[Receiver[_]](roundRobinReceivers)
} else {
ssc.sc.makeRDD(receivers, receivers.size)
}

// Distribute the receivers and start them
logInfo("Starting " + receivers.length + " receivers")
running = true
Expand Down
SchedulerSuite.scala (new file)
@@ -0,0 +1,91 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.streaming.scheduler

import org.apache.spark.streaming._
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver._
import org.apache.spark.util.Utils

/** Testsuite for receiver scheduling */
class SchedulerSuite extends TestSuiteBase {
Contributor: Since this code is in ReceiverTracker, please make this the ReceiverTrackerSuite.

val sparkConf = new SparkConf().setMaster("local[8]").setAppName("test")
val ssc = new StreamingContext(sparkConf, Milliseconds(100))
val tracker = new ReceiverTracker(ssc)
val launcher = new tracker.ReceiverLauncher()

test("receiver scheduling - no preferredLocation") {
Contributor: preferredLocation -> preferred location (why use camel case in prose, unless it refers to a class name?)

val numReceivers = 10;
val receivers = (1 to numReceivers).map(i => new DummyReceiver)
val executors: List[String] = List("Host1", "Host2", "Host3", "Host4", "Host5")
val locations = launcher.scheduleReceivers(receivers, executors)
assert(locations(0)(0) === "Host1")
Contributor: You don't test the sizes of the locations(x) arrays.

Contributor Author: Seems redundant.

Contributor: What if the scheduling algorithm allocates it to extra nodes that it should not?

Contributor Author: In any case, adding a couple more checks doesn't hurt, so will do.

assert(locations(4)(0) === "Host5")
assert(locations(5)(0) === "Host1")
assert(locations(9)(0) === "Host5")
}

test("receiver scheduling - no preferredLocation, numExecutors > numReceivers") {
val numReceivers = 3;
val receivers = (1 to numReceivers).map(i => new DummyReceiver)
Contributor: There is a lot of duplicate code here. You can put all of the duplicate code in a function, say,

def testScheduling(numHosts: Int, numReceivers: Int, receiverToPrefLocations: Map[Int, Int], expectedSchedule: Seq[Seq[Int]])

then you can write all of these unit tests as one-liners:

testScheduling(3, 3, Map.empty, Seq(Seq(1), Seq(2), Seq(3)))
testScheduling(5, 3, Map.empty, Seq(Seq(1, 2), Seq(3, 4), Seq(5)))
testScheduling(2, 3, Map.empty, Seq(Seq(1), Seq(2), Seq()))
testScheduling(3, 3, Map(1 -> 3), Seq(Seq(3), Seq(1), Seq(2)))

In fact, all of them can be one or two tests.

Contributor Author: It makes the structure brittle and takes away code readability.

Contributor: What do you mean by "structure brittle"? What structure? The current approach is definitely a lot more verbose than it could be.

Contributor Author: The structure of the four tests, which could evolve separately as opposed to being centralized. The way these tests are written is pretty consistent with the rest of the Spark test suites; they can be easily read and debugged independently, at the cost of slight code duplication. Take a look at JobCancellationSuite, for example. What you're suggesting is elegant but adds complexity and makes the code hard to read. If we had a dozen such tests, the verbosity would bother me too. Will try to reduce the number of asserts; I think some of them are redundant.

Contributor: Well, I am not sure how you can claim that they are consistent with the rest of the test suites. As a counterexample, see RDDSuite. How about this for being easier to read?

testScheduling(numHosts = 3, numReceivers = 3, allocation = " 1 | 2 | 3 ")
testScheduling(numHosts = 5, numReceivers = 3, allocation = " 1, 2 | 3, 4 | 5 ")

Contributor Author: Not sure if you looked at JobCancellationSuite as an example. Anyway, this representation is definitely cleaner; let's try to incorporate it in the PR.

Contributor: Glad we reached a consensus! Looking forward to the update.
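
The compact allocation notation suggested above could be decoded with a small helper along these lines (a hypothetical Python sketch, not code from the PR): '|' separates receivers, ',' separates the host indices assigned to each receiver, and an empty segment means the receiver got no host:

```python
def parse_allocation(allocation):
    # " 1, 2 | 3, 4 | 5 "  ->  [[1, 2], [3, 4], [5]]
    schedule = []
    for segment in allocation.split("|"):
        segment = segment.strip()
        schedule.append([int(tok) for tok in segment.split(",")] if segment else [])
    return schedule
```

A test helper could then compare the decoded expected schedule against the output of scheduleReceivers.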

val executors: List[String] = List("Host1", "Host2", "Host3", "Host4", "Host5")
val locations = launcher.scheduleReceivers(receivers, executors)
assert(locations(0)(0) === "Host1")
assert(locations(2)(0) === "Host3")
assert(locations(0)(1) === "Host4")
assert(locations(1)(1) === "Host5")
}

test("receiver scheduling - all have preferredLocation") {
val numReceivers = 5;
val receivers = (1 to numReceivers).map(i => new DummyReceiver(host = Some("Host" + i)))
val executors: List[String] = List("Host1", "Host5", "Host4", "Host3", "Host2")
val locations = launcher.scheduleReceivers(receivers, executors)
assert(locations(1)(0) === "Host2")
assert(locations(4)(0) === "Host5")
}

test("receiver scheduling - some have preferredLocation") {
val numReceivers = 3;
val receivers: Seq[Receiver[_]] = Seq(
new DummyReceiver(host = Some("Host2")),
new DummyReceiver,
new DummyReceiver)
val executors: List[String] = List("Host1", "Host2", "Host3", "Host4", "Host5")
val locations = launcher.scheduleReceivers(receivers, executors)
assert(locations(0)(0) === "Host2")
Contributor: In this case, Host2 is being used twice, while ideally Host2 should be used only once. There are enough hosts for three receivers that receiver 3 does not need to be allocated to Host2 again.

Contributor: I think the algorithm needs to be tweaked a little bit more.

Contributor Author: That would require priority scheduling; mere tweaking will not do. Mixed receiver types in a single app are a corner-case scenario, yet to be encountered in practice. Adding support for it (as we have done in this PR) is different from designing the algorithm around it in an attempt to make the corner case performant.

Contributor: Alright, we can address this later. This logic is good for now.

assert(locations(1)(0) === "Host1")
assert(locations(2)(0) === "Host2")
assert(locations(1)(1) === "Host3")
}
}

/**
* Dummy receiver implementation
*/
class DummyReceiver(host: Option[String] = None) extends Receiver[Int](StorageLevel.MEMORY_ONLY) {
Contributor: Make this a private class so that it's not used outside this class.


def onStart() {
}

def onStop() {
}

override def preferredLocation: Option[String] = host
}