Skip to content

Commit

Permalink
[SPARK-13803] restore the changes in SPARK-3411
Browse files Browse the repository at this point in the history
## What changes were proposed in this pull request?

This patch contains the functionality to balance the load of the cluster-mode drivers among workers

This patch restores the changes in #1106 which was erased due to the merging of #731

## How was this patch tested?

test with existing test cases

Author: CodingCat <zhunansjtu@gmail.com>

Closes #11702 from CodingCat/SPARK-13803.

(cherry picked from commit bd5365b)
Signed-off-by: Sean Owen <sowen@cloudera.com>
  • Loading branch information
CodingCat authored and srowen committed Mar 15, 2016
1 parent 7a24d94 commit 58fbdf6
Showing 1 changed file with 17 additions and 4 deletions.
21 changes: 17 additions & 4 deletions core/src/main/scala/org/apache/spark/deploy/master/Master.scala
Original file line number Diff line number Diff line change
Expand Up @@ -683,15 +683,28 @@ private[deploy] class Master(
* every time a new app joins or resource availability changes.
*/
private def schedule(): Unit = {
if (state != RecoveryState.ALIVE) { return }
if (state != RecoveryState.ALIVE) {
return
}
// Drivers take strict precedence over executors
val shuffledWorkers = Random.shuffle(workers) // Randomization helps balance drivers
for (worker <- shuffledWorkers if worker.state == WorkerState.ALIVE) {
for (driver <- waitingDrivers) {
val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))
val numWorkersAlive = shuffledAliveWorkers.size
var curPos = 0
for (driver <- waitingDrivers.toList) { // iterate over a copy of waitingDrivers
// We assign workers to each waiting driver in a round-robin fashion. For each driver, we
// start from the last worker that was assigned a driver, and continue onwards until we have
// explored all alive workers.
var launched = false
var numWorkersVisited = 0
while (numWorkersVisited < numWorkersAlive && !launched) {
val worker = shuffledAliveWorkers(curPos)
numWorkersVisited += 1
if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
launchDriver(worker, driver)
waitingDrivers -= driver
launched = true
}
curPos = (curPos + 1) % numWorkersAlive
}
}
startExecutorsOnWorkers()
Expand Down

0 comments on commit 58fbdf6

Please sign in to comment.