[SPARK-7988][STREAMING] Round-robin scheduling of receivers by default #6607

Closed
wants to merge 27 commits into from

Commits (27)
fff1b2e
Round-robin scheduling of streaming receivers
nishkamravi2 Jun 3, 2015
41705de
Update ReceiverTracker.scala
nishkamravi2 Jun 3, 2015
bb5e09b
Add a new var in receiver to store location information for round-rob…
nishkamravi2 Jun 5, 2015
b05ee2f
Update ReceiverTracker.scala
nishkamravi2 Jun 5, 2015
3cac21b
Generalize the scheduling algorithm
nishkamravi2 Jun 5, 2015
975b8d8
Merge branch 'master_nravi' of https://github.com/nishkamravi2/spark …
nishkamravi2 Jun 5, 2015
6e3515c
Minor changes
nishkamravi2 Jun 5, 2015
7888257
Update ReceiverTracker.scala
nishkamravi2 Jun 5, 2015
6caeefe
Update ReceiverTracker.scala
nishkamravi2 Jun 5, 2015
07b9dfa
Update ReceiverTracker.scala
nishkamravi2 Jun 5, 2015
02dbdb8
Update ReceiverTracker.scala
nishkamravi2 Jun 5, 2015
45e3a99
Merge branch 'master_nravi' of https://github.com/nishkamravi2/spark …
nishkamravi2 Jun 5, 2015
16e84ec
Update ReceiverTracker.scala
nishkamravi2 Jun 5, 2015
4cf97b6
Update ReceiverTracker.scala
nishkamravi2 Jun 5, 2015
f8a3e05
Update ReceiverTracker.scala
nishkamravi2 Jun 5, 2015
7f3e028
Update ReceiverTracker.scala, add unit test cases in SchedulerSuite
nishkamravi2 Jun 8, 2015
242e677
Update SchedulerSuite.scala
nishkamravi2 Jun 8, 2015
179b90f
Update ReceiverTracker.scala
nishkamravi2 Jun 8, 2015
4604f28
Update SchedulerSuite.scala
nishkamravi2 Jun 8, 2015
68e8540
Update SchedulerSuite.scala
nishkamravi2 Jun 8, 2015
bc23907
Update ReceiverTracker.scala
nishkamravi2 Jun 12, 2015
48a4a97
Update ReceiverTracker.scala
nishkamravi2 Jun 12, 2015
ae29152
Update test suite with TD's suggestions
nishkamravi2 Jun 17, 2015
9f1abc2
Update ReceiverTrackerSuite.scala
nishkamravi2 Jun 17, 2015
6127e58
Update ReceiverTracker and ReceiverTrackerSuite
nishkamravi2 Jun 23, 2015
f747739
Update ReceiverTrackerSuite.scala
nishkamravi2 Jun 23, 2015
1918819
Update ReceiverTrackerSuite.scala
nishkamravi2 Jun 23, 2015
@@ -17,8 +17,10 @@

package org.apache.spark.streaming.scheduler

import scala.collection.mutable.{HashMap, SynchronizedMap}
import scala.collection.mutable.{ArrayBuffer, HashMap, SynchronizedMap}
import scala.language.existentials
import scala.math.max
import org.apache.spark.rdd._

import org.apache.spark.streaming.util.WriteAheadLogUtils
import org.apache.spark.{Logging, SerializableWritable, SparkEnv, SparkException}
@@ -270,6 +272,41 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
}
}

  /**
   * Get the list of executors, excluding the driver.
   */
  private def getExecutors(ssc: StreamingContext): List[String] = {
    val executors = ssc.sparkContext.getExecutorMemoryStatus.map(_._1.split(":")(0)).toList
    val driver = ssc.sparkContext.getConf.get("spark.driver.host")
    executors.diff(List(driver))
  }

  /**
   * Set host location(s) for each receiver so as to distribute them over
   * executors in a round-robin fashion, taking into account preferredLocation if set.
   */
  private[streaming] def scheduleReceivers(receivers: Seq[Receiver[_]],
      executors: List[String]): Array[ArrayBuffer[String]] = {
    val locations = new Array[ArrayBuffer[String]](receivers.length)
Contributor:
Do you really need this to be an Array[ArrayBuffer]? Couldn't this just be Array[Seq]? You could just wrap the executor in a Seq when adding it to the array, so you could do locations(i) = Seq(executors(index)) instead of allocating an ArrayBuffer first, etc. Is there a case where a receiver could have more than one location specified? I don't really see one.

Contributor Author:
TD suggested adding support for the case when num_executors > num_receivers: assign multiple executors per receiver for fault tolerance, which is not a bad idea (though probably not common in practice).

    var i = 0
    for (i <- 0 until receivers.length) {
      locations(i) = new ArrayBuffer[String]()
      if (receivers(i).preferredLocation.isDefined) {
        locations(i) += receivers(i).preferredLocation.get
      }
    }
    var count = 0
    for (i <- 0 until max(receivers.length, executors.length)) {
Contributor:
Why is max used here? Isn't receivers.length enough?

Contributor:
Because we want to allocate more executors per receiver so that receiver tasks can fail over to other executors without conflicting with other running receivers.

      if (!receivers(i % receivers.length).preferredLocation.isDefined) {
        locations(i % receivers.length) += executors(count)
        count += 1
        if (count == executors.length) {
          count = 0
        }
      }
    }
    locations
  }
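To make the round-robin behaviour concrete, here is a small self-contained sketch (hypothetical inputs that mirror the loop above; not part of the PR) for 3 receivers without preferred locations and 4 executors. Iterating up to max(receivers, executors) is what lets the extra executor be assigned as a second location, so the output matches the "0,3|1|2" expectation in the test suite below.

import scala.collection.mutable.ArrayBuffer
import scala.math.max

object RoundRobinSketch extends App {
  // Hypothetical inputs: 3 receivers with no preferred location, 4 executor hosts.
  val numReceivers = 3
  val preferred: Array[Option[String]] = Array.fill(numReceivers)(None)
  val executors = List("0", "1", "2", "3")

  // Same shape as scheduleReceivers: seed each receiver's locations with its
  // preferred location (none here), then deal executors out round-robin.
  val locations = Array.fill(numReceivers)(new ArrayBuffer[String]())
  for (i <- 0 until numReceivers if preferred(i).isDefined) {
    locations(i) += preferred(i).get
  }
  var count = 0
  for (i <- 0 until max(numReceivers, executors.length)) {
    if (!preferred(i % numReceivers).isDefined) {
      locations(i % numReceivers) += executors(count)
      count = (count + 1) % executors.length
    }
  }
  println(locations.map(_.mkString(",")).mkString("|")) // prints 0,3|1|2
}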

/**
* Get the receivers from the ReceiverInputDStreams, distributes them to the
* worker nodes as a parallel collection, and runs them.
@@ -281,18 +318,6 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
rcvr
})

    // Right now, we only honor preferences if all receivers have them
    val hasLocationPreferences = receivers.map(_.preferredLocation.isDefined).reduce(_ && _)

    // Create the parallel collection of receivers to distribute them on the worker nodes
    val tempRDD =
      if (hasLocationPreferences) {
        val receiversWithPreferences = receivers.map(r => (r, Seq(r.preferredLocation.get)))
        ssc.sc.makeRDD[Receiver[_]](receiversWithPreferences)
      } else {
        ssc.sc.makeRDD(receivers, receivers.size)
      }

    val checkpointDirOption = Option(ssc.checkpointDir)
    val serializableHadoopConf = new SerializableWritable(ssc.sparkContext.hadoopConfiguration)

@@ -308,12 +333,25 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
      supervisor.start()
      supervisor.awaitTermination()
    }

    // Run a dummy Spark job to ensure that all slaves have registered.
    // This avoids all the receivers being scheduled on the same node.
    if (!ssc.sparkContext.isLocal) {
      ssc.sparkContext.makeRDD(1 to 50, 50).map(x => (x, 1)).reduceByKey(_ + _, 20).collect()
    }

    // Get the list of executors and schedule receivers
    val executors = getExecutors(ssc)
    val tempRDD =
      if (!executors.isEmpty) {
        val locations = scheduleReceivers(receivers, executors)
        val roundRobinReceivers = (0 until receivers.length).map(i =>
          (receivers(i), locations(i)))
        ssc.sc.makeRDD[Receiver[_]](roundRobinReceivers)
      } else {
        ssc.sc.makeRDD(receivers, receivers.size)
      }
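For context, a minimal local sketch (hypothetical object, element, and host names; not part of the PR) of the SparkContext.makeRDD overload used above: pairing each element with a Seq of hosts produces an RDD with one partition per element, and the scheduler will prefer to run each partition's task on its listed hosts.

import org.apache.spark.{SparkConf, SparkContext}

object MakeRddPreferencesSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("sketch"))
    // One (element, preferred hosts) pair per receiver; the host names are made up here.
    val elemsWithHosts = Seq(("receiver-0", Seq("worker-1")), ("receiver-1", Seq("worker-2")))
    val rdd = sc.makeRDD(elemsWithHosts)
    println(rdd.partitions.length) // 2 -- one partition per (element, hosts) pair
    sc.stop()
  }
}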

    // Distribute the receivers and start them
    logInfo("Starting " + receivers.length + " receivers")
    running = true
@@ -0,0 +1,90 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.streaming.scheduler

import org.apache.spark.streaming._
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver._
import org.apache.spark.util.Utils

/** Test suite for receiver scheduling. */
class ReceiverTrackerSuite extends TestSuiteBase {
  val sparkConf = new SparkConf().setMaster("local[8]").setAppName("test")
  val ssc = new StreamingContext(sparkConf, Milliseconds(100))
  val tracker = new ReceiverTracker(ssc)
  val launcher = new tracker.ReceiverLauncher()
  val executors: List[String] = List("0", "1", "2", "3")

  test("receiver scheduling - all or none have preferred location") {

    def parse(s: String): Array[Array[String]] = {
Contributor:
Add empty line.

      val outerSplit = s.split("\\|")
      val loc = new Array[Array[String]](outerSplit.length)
      var i = 0
      for (i <- 0 until outerSplit.length) {
        loc(i) = outerSplit(i).split("\\,")
      }
      loc
    }

    def testScheduler(numReceivers: Int, preferredLocation: Boolean, allocation: String) {
Contributor:
Add empty line.

      val receivers =
        if (preferredLocation) {
          Array.tabulate(numReceivers)(i => new DummyReceiver(host =
            Some(((i + 1) % executors.length).toString)))
        } else {
          Array.tabulate(numReceivers)(_ => new DummyReceiver)
        }
      val locations = launcher.scheduleReceivers(receivers, executors)
      val expectedLocations = parse(allocation)
      assert(locations.deep === expectedLocations.deep)
    }

    testScheduler(numReceivers = 5, preferredLocation = false, allocation = "0|1|2|3|0")
    testScheduler(numReceivers = 3, preferredLocation = false, allocation = "0,3|1|2")
    testScheduler(numReceivers = 4, preferredLocation = true, allocation = "1|2|3|0")
  }
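As a side note on the expected-allocation strings passed to testScheduler above, here is a tiny sketch of the same encoding that parse handles (hypothetical object name; not part of the PR): receivers are separated by "|" and an individual receiver's executors by ",", so "0,3|1|2" means receiver 0 is assigned executors 0 and 3, receiver 1 gets executor 1, and receiver 2 gets executor 2.

object AllocationStringSketch extends App {
  val allocation = "0,3|1|2" // same encoding the tests above pass to parse()
  val parsed: Array[Array[String]] = allocation.split("\\|").map(_.split(","))
  parsed.zipWithIndex.foreach { case (hosts, i) =>
    println(s"receiver $i -> executors ${hosts.mkString(", ")}")
  }
}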

test("receiver scheduling - some have preferred location") {
val numReceivers = 4;
val receivers: Seq[Receiver[_]] = Seq(new DummyReceiver(host = Some("1")),
new DummyReceiver, new DummyReceiver, new DummyReceiver)
val locations = launcher.scheduleReceivers(receivers, executors)
assert(locations(0)(0) === "1")
assert(locations(1)(0) === "0")
assert(locations(2)(0) === "1")
assert(locations(0).length === 1)
assert(locations(3).length === 1)
}
}

/**
 * Dummy receiver implementation
 */
private class DummyReceiver(host: Option[String] = None)
  extends Receiver[Int](StorageLevel.MEMORY_ONLY) {

  def onStart() {
  }

  def onStop() {
  }

  override def preferredLocation: Option[String] = host
}