Skip to content

Commit

Permalink
[SPARK-7988] [STREAMING] Round-robin scheduling of receivers by default
Browse files Browse the repository at this point in the history
Minimal PR for round-robin scheduling of receivers. Dense scheduling can be enabled by setting preferredLocation, so a new config parameter isn't really needed. Tested this on a cluster of 6 nodes and noticed 20-25% gain in throughput compared to random scheduling.

tdas pwendell

Author: nishkamravi2 <nishkamravi@gmail.com>
Author: Nishkam Ravi <nravi@cloudera.com>

Closes #6607 from nishkamravi2/master_nravi and squashes the following commits:

1918819 [Nishkam Ravi] Update ReceiverTrackerSuite.scala
f747739 [Nishkam Ravi] Update ReceiverTrackerSuite.scala
6127e58 [Nishkam Ravi] Update ReceiverTracker and ReceiverTrackerSuite
9f1abc2 [nishkamravi2] Update ReceiverTrackerSuite.scala
ae29152 [Nishkam Ravi] Update test suite with TD's suggestions
48a4a97 [nishkamravi2] Update ReceiverTracker.scala
bc23907 [nishkamravi2] Update ReceiverTracker.scala
68e8540 [nishkamravi2] Update SchedulerSuite.scala
4604f28 [nishkamravi2] Update SchedulerSuite.scala
179b90f [nishkamravi2] Update ReceiverTracker.scala
242e677 [nishkamravi2] Update SchedulerSuite.scala
7f3e028 [Nishkam Ravi] Update ReceiverTracker.scala, add unit test cases in SchedulerSuite
f8a3e05 [nishkamravi2] Update ReceiverTracker.scala
4cf97b6 [nishkamravi2] Update ReceiverTracker.scala
16e84ec [Nishkam Ravi] Update ReceiverTracker.scala
45e3a99 [Nishkam Ravi] Merge branch 'master_nravi' of https://github.com/nishkamravi2/spark into master_nravi
02dbdb8 [Nishkam Ravi] Update ReceiverTracker.scala
07b9dfa [nishkamravi2] Update ReceiverTracker.scala
6caeefe [nishkamravi2] Update ReceiverTracker.scala
7888257 [nishkamravi2] Update ReceiverTracker.scala
6e3515c [Nishkam Ravi] Minor changes
975b8d8 [Nishkam Ravi] Merge branch 'master_nravi' of https://github.com/nishkamravi2/spark into master_nravi
3cac21b [Nishkam Ravi] Generalize the scheduling algorithm
b05ee2f [nishkamravi2] Update ReceiverTracker.scala
bb5e09b [Nishkam Ravi] Add a new var in receiver to store location information for round-robin scheduling
41705de [nishkamravi2] Update ReceiverTracker.scala
fff1b2e [Nishkam Ravi] Round-robin scheduling of streaming receivers
  • Loading branch information
nishkamravi2 authored and tdas committed Jun 30, 2015
1 parent 9213f73 commit ca7e460
Show file tree
Hide file tree
Showing 2 changed files with 141 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@

package org.apache.spark.streaming.scheduler

import scala.collection.mutable.{HashMap, SynchronizedMap}
import scala.collection.mutable.{ArrayBuffer, HashMap, SynchronizedMap}
import scala.language.existentials
import scala.math.max
import org.apache.spark.rdd._

import org.apache.spark.streaming.util.WriteAheadLogUtils
import org.apache.spark.{Logging, SparkEnv, SparkException}
Expand Down Expand Up @@ -272,6 +274,41 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
}
}

/**
* Get the list of executors excluding driver
*/
private def getExecutors(ssc: StreamingContext): List[String] = {
val executors = ssc.sparkContext.getExecutorMemoryStatus.map(_._1.split(":")(0)).toList
val driver = ssc.sparkContext.getConf.get("spark.driver.host")
executors.diff(List(driver))
}

/** Set host location(s) for each receiver so as to distribute them over
* executors in a round-robin fashion taking into account preferredLocation if set
*/
private[streaming] def scheduleReceivers(receivers: Seq[Receiver[_]],
executors: List[String]): Array[ArrayBuffer[String]] = {
val locations = new Array[ArrayBuffer[String]](receivers.length)
var i = 0
for (i <- 0 until receivers.length) {
locations(i) = new ArrayBuffer[String]()
if (receivers(i).preferredLocation.isDefined) {
locations(i) += receivers(i).preferredLocation.get
}
}
var count = 0
for (i <- 0 until max(receivers.length, executors.length)) {
if (!receivers(i % receivers.length).preferredLocation.isDefined) {
locations(i % receivers.length) += executors(count)
count += 1
if (count == executors.length) {
count = 0
}
}
}
locations
}

/**
* Get the receivers from the ReceiverInputDStreams, distributes them to the
* worker nodes as a parallel collection, and runs them.
Expand All @@ -283,18 +320,6 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
rcvr
})

// Right now, we only honor preferences if all receivers have them
val hasLocationPreferences = receivers.map(_.preferredLocation.isDefined).reduce(_ && _)

// Create the parallel collection of receivers to distributed them on the worker nodes
val tempRDD =
if (hasLocationPreferences) {
val receiversWithPreferences = receivers.map(r => (r, Seq(r.preferredLocation.get)))
ssc.sc.makeRDD[Receiver[_]](receiversWithPreferences)
} else {
ssc.sc.makeRDD(receivers, receivers.size)
}

val checkpointDirOption = Option(ssc.checkpointDir)
val serializableHadoopConf =
new SerializableConfiguration(ssc.sparkContext.hadoopConfiguration)
Expand All @@ -311,12 +336,25 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
supervisor.start()
supervisor.awaitTermination()
}

// Run the dummy Spark job to ensure that all slaves have registered.
// This avoids all the receivers to be scheduled on the same node.
if (!ssc.sparkContext.isLocal) {
ssc.sparkContext.makeRDD(1 to 50, 50).map(x => (x, 1)).reduceByKey(_ + _, 20).collect()
}

// Get the list of executors and schedule receivers
val executors = getExecutors(ssc)
val tempRDD =
if (!executors.isEmpty) {
val locations = scheduleReceivers(receivers, executors)
val roundRobinReceivers = (0 until receivers.length).map(i =>
(receivers(i), locations(i)))
ssc.sc.makeRDD[Receiver[_]](roundRobinReceivers)
} else {
ssc.sc.makeRDD(receivers, receivers.size)
}

// Distribute the receivers and start them
logInfo("Starting " + receivers.length + " receivers")
running = true
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.streaming.scheduler

import org.apache.spark.streaming._
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver._
import org.apache.spark.util.Utils

/** Testsuite for receiver scheduling */
class ReceiverTrackerSuite extends TestSuiteBase {
val sparkConf = new SparkConf().setMaster("local[8]").setAppName("test")
val ssc = new StreamingContext(sparkConf, Milliseconds(100))
val tracker = new ReceiverTracker(ssc)
val launcher = new tracker.ReceiverLauncher()
val executors: List[String] = List("0", "1", "2", "3")

test("receiver scheduling - all or none have preferred location") {

def parse(s: String): Array[Array[String]] = {
val outerSplit = s.split("\\|")
val loc = new Array[Array[String]](outerSplit.length)
var i = 0
for (i <- 0 until outerSplit.length) {
loc(i) = outerSplit(i).split("\\,")
}
loc
}

def testScheduler(numReceivers: Int, preferredLocation: Boolean, allocation: String) {
val receivers =
if (preferredLocation) {
Array.tabulate(numReceivers)(i => new DummyReceiver(host =
Some(((i + 1) % executors.length).toString)))
} else {
Array.tabulate(numReceivers)(_ => new DummyReceiver)
}
val locations = launcher.scheduleReceivers(receivers, executors)
val expectedLocations = parse(allocation)
assert(locations.deep === expectedLocations.deep)
}

testScheduler(numReceivers = 5, preferredLocation = false, allocation = "0|1|2|3|0")
testScheduler(numReceivers = 3, preferredLocation = false, allocation = "0,3|1|2")
testScheduler(numReceivers = 4, preferredLocation = true, allocation = "1|2|3|0")
}

test("receiver scheduling - some have preferred location") {
val numReceivers = 4;
val receivers: Seq[Receiver[_]] = Seq(new DummyReceiver(host = Some("1")),
new DummyReceiver, new DummyReceiver, new DummyReceiver)
val locations = launcher.scheduleReceivers(receivers, executors)
assert(locations(0)(0) === "1")
assert(locations(1)(0) === "0")
assert(locations(2)(0) === "1")
assert(locations(0).length === 1)
assert(locations(3).length === 1)
}
}

/**
* Dummy receiver implementation
*/
private class DummyReceiver(host: Option[String] = None)
extends Receiver[Int](StorageLevel.MEMORY_ONLY) {

def onStart() {
}

def onStop() {
}

override def preferredLocation: Option[String] = host
}

0 comments on commit ca7e460

Please sign in to comment.