Skip to content

Commit

Permalink
Avoid shuffle every time we schedule the driver using round robin
Browse files Browse the repository at this point in the history
  • Loading branch information
WangTaoTheTonic committed Sep 6, 2014
1 parent bbc7087 commit bc91bb1
Showing 1 changed file with 9 additions and 5 deletions.
14 changes: 9 additions & 5 deletions core/src/main/scala/org/apache/spark/deploy/master/Master.scala
Original file line number Diff line number Diff line change
Expand Up @@ -481,17 +481,21 @@ private[spark] class Master(
if (state != RecoveryState.ALIVE) { return }

// First schedule drivers, they take strict precedence over applications
val shuffledAliveWorkers = Random.shuffle(workers.filter(_.state == WorkerState.ALIVE)) // Randomization helps balance drivers
val aliveWorkerNum = shuffledAliveWorkers.size
var curPos = aliveWorkerNum - 1
for (driver <- List(waitingDrivers: _*)) {
val shuffledWorkers = Random.shuffle(workers) // Randomization helps balance drivers
val shuffledWorkersIter = shuffledWorkers.iterator
val startFlag = curPos
curPos = (curPos + 1) % aliveWorkerNum
var launched = false
while(shuffledWorkersIter.hasNext && !launched) {
val worker = shuffledWorkersIter.next()
if (worker.state == WorkerState.ALIVE && worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
while (curPos != startFlag && !launched) {
val worker = shuffledAliveWorkers(curPos)
if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
launchDriver(worker, driver)
waitingDrivers -= driver
launched = true
}
curPos = (curPos + 1) % aliveWorkerNum
}
}

Expand Down

0 comments on commit bc91bb1

Please sign in to comment.