From 45d568c721bb5ea638d2903deec1c0777baf6a04 Mon Sep 17 00:00:00 2001 From: Zhan Zhang Date: Tue, 18 Oct 2016 18:20:48 -0700 Subject: [PATCH 1/9] TaskAssigner to support different scheduling algorithms --- .../spark/internal/config/package.scala | 5 + .../apache/spark/scheduler/TaskAssigner.scala | 233 ++++++++++++++++++ .../spark/scheduler/TaskSchedulerImpl.scala | 39 ++- .../scheduler/TaskSchedulerImplSuite.scala | 82 +++++- docs/configuration.md | 13 + 5 files changed, 350 insertions(+), 22 deletions(-) create mode 100644 core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index 2951bdc18bc57..c6aecae7fac8d 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -223,4 +223,9 @@ package object config { " bigger files.") .longConf .createWithDefault(4 * 1024 * 1024) + + private[spark] val SPARK_SCHEDULER_TASK_ASSIGNER = ConfigBuilder("spark.scheduler.taskAssigner") + .doc("The task assigner (roundrobin, packed, balanced) to schedule tasks on workers.") + .stringConf + .createWithDefault("roundrobin") } diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala new file mode 100644 index 0000000000000..d6184d318190b --- /dev/null +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala @@ -0,0 +1,233 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import scala.collection.mutable.ArrayBuffer +import scala.collection.mutable.PriorityQueue +import scala.util.Random + +import org.apache.spark.internal.{config, Logging} +import org.apache.spark.SparkConf +import org.apache.spark.util.Utils + +/** Tracking the current state of the workers with available cores and assigned task list. */ +class OfferState(val workOffer: WorkerOffer) { + // The current remaining cores that can be allocated to tasks. + var coresAvailable: Int = workOffer.cores + // The list of tasks that are assigned to this worker. + val tasks = new ArrayBuffer[TaskDescription](coresAvailable) +} + +/** + * TaskAssigner is the base class for all task assigner implementations, and can be + * extended to implement different task scheduling algorithms. + * Together with [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]], TaskAssigner + * is used to assign tasks to workers with available cores. Internally, TaskScheduler, requested + * to perform task assignment given available workers, first sorts the candidate tasksets, + * and then for each taskset, it takes a number of rounds to request TaskAssigner for task + * assignment with different the locality restrictions until there is either no qualified + * workers or no valid tasks to be assigned. + * + * TaskAssigner is responsible to maintain the worker availability state and task assignment + * information. The contract between [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]] + * and TaskAssigner is as follows. 
First, TaskScheduler invokes construct() of TaskAssigner to + * initialize the its internal worker states at the beginning of resource offering. Before each + * round of task assignment for a taskset, TaskScheduler invoke the init() of TaskAssigner to + * initialize the data structure for the round. When performing real task assignment, + * hasNext()/getNext() is used by TaskScheduler to check the worker availability and retrieve + * current offering from TaskAssigner. Then offerAccepted is used by TaskScheduler to notify + * the TaskAssigner so that TaskAssigner can decide whether the current offer is valid or not for + * the next request. After task assignment is done, TaskScheduler invokes the tasks() to + * retrieve all the task assignment information, and eventually, invokes reset() method so that + * TaskAssigner can cleanup its internal maintained resources. + */ + +private[scheduler] abstract class TaskAssigner { + var offer: Seq[OfferState] = _ + var CPUS_PER_TASK = 1 + + def withCpuPerTask(CPUS_PER_TASK: Int): Unit = { + this.CPUS_PER_TASK = CPUS_PER_TASK + } + + // The final assigned offer returned to TaskScheduler. + final def tasks: Seq[ArrayBuffer[TaskDescription]] = offer.map(_.tasks) + + // Invoked at the beginning of resource offering to construct the offer with the workoffers. + def construct(workOffer: Seq[WorkerOffer]): Unit = { + offer = workOffer.map(o => new OfferState(o)) + } + + // Invoked at each round of Taskset assignment to initialize the internal structure. + def init(): Unit + + // Whether there is offer available to be used inside of one round of Taskset assignment. + def hasNext: Boolean + + // Returned the next assigned offer based on the task assignment strategy. + def getNext(): OfferState + + // Invoked by the TaskScheduler to indicate whether the current offer is accepted or not so that + // the assigner can decide whether the current worker is valid for the next offering. 
+ def offerAccepted(assigned: Boolean): Unit + + // Invoked at the end of resource offering to release internally maintained resources. + // Subclass is responsible to release its own private resources. + def reset(): Unit = { + offer = null + } +} + +object TaskAssigner extends Logging { + private val roundrobin = classOf[RoundRobinAssigner].getCanonicalName + private val packed = classOf[PackedAssigner].getCanonicalName + private val balanced = classOf[BalancedAssigner].getCanonicalName + private val assignerMap: Map[String, String] = + Map("roundrobin" -> roundrobin, + "packed" -> packed, + "balanced" -> balanced) + + def init(conf: SparkConf): TaskAssigner = { + val assignerName = conf.get(config.SPARK_SCHEDULER_TASK_ASSIGNER.key, "roundrobin") + .toLowerCase() + val className = assignerMap.getOrElse(assignerName, roundrobin) + val CPUS_PER_TASK = conf.getInt("spark.task.cpus", 1) + val assigner = try { + logInfo(s"Constructing an assigner as $className") + Utils.classForName(className).getConstructor() + .newInstance().asInstanceOf[TaskAssigner] + } catch { + case _: Throwable => + logInfo(s"$assignerName cannot be constructed, fallback to default $roundrobin.") + new RoundRobinAssigner() + } + assigner.withCpuPerTask(CPUS_PER_TASK) + assigner + } +} + +/** + * Assign the task to workers with available cores in roundrobin manner. + */ +class RoundRobinAssigner extends TaskAssigner { + private var idx = 0 + + override def construct(workOffer: Seq[WorkerOffer]): Unit = { + offer = Random.shuffle(workOffer.map(o => new OfferState(o))) + } + + override def init(): Unit = { + idx = 0 + } + + override def hasNext: Boolean = idx < offer.size + + override def getNext(): OfferState = { + offer(idx) + } + + override def offerAccepted(assigned: Boolean): Unit = { + idx += 1 + } + + override def reset(): Unit = { + super.reset + idx = 0 + } +} + +/** + * Assign the task to workers with the most available cores. 
+ */ +class BalancedAssigner extends TaskAssigner { + private var maxHeap: PriorityQueue[OfferState] = _ + private var currentOffer: OfferState = _ + + override def construct(workOffer: Seq[WorkerOffer]): Unit = { + offer = Random.shuffle(workOffer.map(o => new OfferState(o))) + } + + implicit val ord: Ordering[OfferState] = new Ordering[OfferState] { + def compare(x: OfferState, y: OfferState): Int = { + return Ordering[Int].compare(x.coresAvailable, y.coresAvailable) + } + } + + override def init(): Unit = { + maxHeap = new PriorityQueue[OfferState]() + offer.filter(_.coresAvailable >= CPUS_PER_TASK).foreach(maxHeap.enqueue(_)) + } + + override def hasNext: Boolean = maxHeap.nonEmpty + + override def getNext(): OfferState = { + currentOffer = maxHeap.dequeue() + currentOffer + } + + override def offerAccepted(assigned: Boolean): Unit = { + if (currentOffer.coresAvailable >= CPUS_PER_TASK && assigned) { + maxHeap.enqueue(currentOffer) + } + } + + override def reset(): Unit = { + super.reset + maxHeap = null + currentOffer = null + } +} + +/** + * Assign the task to workers with the least available cores. In other words, PackedAssigner tries + * to schedule tasks to fewer workers. As a result, there will be idle workers without any tasks + * assigned if more than required workers are reserved. If the dynamic allocator is enabled, + * these idle workers will be released by driver. The released resources can then be allocated to + * other jobs by underling resource manager. This assigner can potentially reduce the resource + * reservation for a job. 
+ */ +class PackedAssigner extends TaskAssigner { + private var sorted: Seq[OfferState] = _ + private var idx = 0 + private var currentOffer: OfferState = _ + + override def init(): Unit = { + idx = 0 + sorted = offer.filter(_.coresAvailable >= CPUS_PER_TASK).sortBy(_.coresAvailable) + } + + override def hasNext: Boolean = idx < sorted.size + + override def getNext(): OfferState = { + currentOffer = sorted(idx) + currentOffer + } + + override def offerAccepted(assigned: Boolean): Unit = { + if (currentOffer.coresAvailable < CPUS_PER_TASK || !assigned) { + idx += 1 + } + } + + override def reset(): Unit = { + super.reset + sorted = null + currentOffer = null + idx = 0 + } +} diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index 3e3f1ad031e66..80a9e67ad4986 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -22,9 +22,7 @@ import java.util.{Timer, TimerTask} import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.AtomicLong -import scala.collection.Set import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet} -import scala.util.Random import org.apache.spark._ import org.apache.spark.TaskState.TaskState @@ -60,7 +58,7 @@ private[spark] class TaskSchedulerImpl( def this(sc: SparkContext) = this(sc, sc.conf.get(config.MAX_TASK_FAILURES)) val conf = sc.conf - + private val taskAssigner: TaskAssigner = TaskAssigner.init(conf) // How often to check for speculative tasks val SPECULATION_INTERVAL_MS = conf.getTimeAsMs("spark.speculation.interval", "100ms") @@ -250,24 +248,26 @@ private[spark] class TaskSchedulerImpl( private def resourceOfferSingleTaskSet( taskSet: TaskSetManager, maxLocality: TaskLocality, - shuffledOffers: Seq[WorkerOffer], - availableCpus: Array[Int], - tasks: IndexedSeq[ArrayBuffer[TaskDescription]]) : Boolean = { 
+ taskAssigner: TaskAssigner) : Boolean = { var launchedTask = false - for (i <- 0 until shuffledOffers.size) { - val execId = shuffledOffers(i).executorId - val host = shuffledOffers(i).host - if (availableCpus(i) >= CPUS_PER_TASK) { + taskAssigner.init() + while(taskAssigner.hasNext) { + var assigned = false + val currentOffer = taskAssigner.getNext() + val execId = currentOffer.workOffer.executorId + val host = currentOffer.workOffer.host + if (currentOffer.coresAvailable >= CPUS_PER_TASK) { try { for (task <- taskSet.resourceOffer(execId, host, maxLocality)) { - tasks(i) += task + currentOffer.tasks += task val tid = task.taskId taskIdToTaskSetManager(tid) = taskSet taskIdToExecutorId(tid) = execId executorIdToTaskCount(execId) += 1 - availableCpus(i) -= CPUS_PER_TASK - assert(availableCpus(i) >= 0) + currentOffer.coresAvailable -= CPUS_PER_TASK + assert(currentOffer.coresAvailable >= 0) launchedTask = true + assigned = true } } catch { case e: TaskNotSerializableException => @@ -277,6 +277,7 @@ private[spark] class TaskSchedulerImpl( return launchedTask } } + taskAssigner.offerAccepted(assigned) } return launchedTask } @@ -305,12 +306,8 @@ private[spark] class TaskSchedulerImpl( hostsByRack.getOrElseUpdate(rack, new HashSet[String]()) += o.host } } + taskAssigner.construct(offers) - // Randomly shuffle offers to avoid always placing tasks on the same set of workers. - val shuffledOffers = Random.shuffle(offers) - // Build a list of tasks to assign to each worker. 
- val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores)) - val availableCpus = shuffledOffers.map(o => o.cores).toArray val sortedTaskSets = rootPool.getSortedTaskSetQueue for (taskSet <- sortedTaskSets) { logDebug("parentName: %s, name: %s, runningTasks: %s".format( @@ -329,7 +326,7 @@ private[spark] class TaskSchedulerImpl( for (currentMaxLocality <- taskSet.myLocalityLevels) { do { launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet( - taskSet, currentMaxLocality, shuffledOffers, availableCpus, tasks) + taskSet, currentMaxLocality, taskAssigner) launchedAnyTask |= launchedTaskAtCurrentMaxLocality } while (launchedTaskAtCurrentMaxLocality) } @@ -337,10 +334,12 @@ private[spark] class TaskSchedulerImpl( taskSet.abortIfCompletelyBlacklisted(hostToExecutors) } } - + val tasks = taskAssigner.tasks + taskAssigner.reset() if (tasks.size > 0) { hasLaunchedTask = true } + return tasks } diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala index f5f1947661d9a..824c3851fc380 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala @@ -85,8 +85,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B taskScheduler } - test("Scheduler does not always schedule tasks on the same workers") { - val taskScheduler = setupScheduler() + private def roundrobin(taskScheduler: TaskSchedulerImpl): Unit = { val numFreeCores = 1 val workerOffers = IndexedSeq(new WorkerOffer("executor0", "host0", numFreeCores), new WorkerOffer("executor1", "host1", numFreeCores)) @@ -109,6 +108,85 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B assert(!failedTaskSet) } + test("Scheduler does not always schedule tasks on the same workers") { + val taskScheduler = setupScheduler() + 
roundrobin(taskScheduler) + } + + test("User can specify the roundrobin task assigner") { + val taskScheduler = setupScheduler(("spark.scheduler.taskAssigner", "RoUndrObin")) + roundrobin(taskScheduler) + } + + test("Fallback to roundrobin when the task assigner provided is not valid") { + val taskScheduler = setupScheduler("spark.scheduler.taskAssigner" -> "invalid") + roundrobin(taskScheduler) + } + + test("Scheduler balance the assignment to the worker with more free cores") { + val taskScheduler = setupScheduler("spark.scheduler.taskAssigner" -> "BaLanceD") + val workerOffers = IndexedSeq(new WorkerOffer("executor0", "host0", 2), + new WorkerOffer("executor1", "host1", 4)) + val selectedExecutorIds = { + val taskSet = FakeTask.createTaskSet(2) + taskScheduler.submitTasks(taskSet) + val taskDescriptions = taskScheduler.resourceOffers(workerOffers).flatten + assert(2 === taskDescriptions.length) + taskDescriptions.map(_.executorId) + } + val count = selectedExecutorIds.count(_ == workerOffers(1).executorId) + assert(count == 2) + assert(!failedTaskSet) + } + + test("Scheduler balance the assignment across workers with same free cores") { + val taskScheduler = setupScheduler("spark.scheduler.taskAssigner" -> "balanced") + val workerOffers = IndexedSeq(new WorkerOffer("executor0", "host0", 2), + new WorkerOffer("executor1", "host1", 2)) + val selectedExecutorIds = { + val taskSet = FakeTask.createTaskSet(2) + taskScheduler.submitTasks(taskSet) + val taskDescriptions = taskScheduler.resourceOffers(workerOffers).flatten + assert(2 === taskDescriptions.length) + taskDescriptions.map(_.executorId) + } + val count = selectedExecutorIds.count(_ == workerOffers(1).executorId) + assert(count == 1) + assert(!failedTaskSet) + } + + test("Scheduler packs the assignment to workers with less free cores") { + val taskScheduler = setupScheduler("spark.scheduler.taskAssigner" -> "paCkeD") + val workerOffers = IndexedSeq(new WorkerOffer("executor0", "host0", 2), + new 
WorkerOffer("executor1", "host1", 4)) + val selectedExecutorIds = { + val taskSet = FakeTask.createTaskSet(2) + taskScheduler.submitTasks(taskSet) + val taskDescriptions = taskScheduler.resourceOffers(workerOffers).flatten + assert(2 === taskDescriptions.length) + taskDescriptions.map(_.executorId) + } + val count = selectedExecutorIds.count(_ == workerOffers(0).executorId) + assert(count == 2) + assert(!failedTaskSet) + } + + test("Scheduler keeps packing the assignment to the same worker") { + val taskScheduler = setupScheduler("spark.scheduler.taskAssigner" -> "packed") + val workerOffers = IndexedSeq(new WorkerOffer("executor0", "host0", 4), + new WorkerOffer("executor1", "host1", 4)) + val selectedExecutorIds = { + val taskSet = FakeTask.createTaskSet(4) + taskScheduler.submitTasks(taskSet) + val taskDescriptions = taskScheduler.resourceOffers(workerOffers).flatten + assert(4 === taskDescriptions.length) + taskDescriptions.map(_.executorId) + } + val count = selectedExecutorIds.count(_ == workerOffers(0).executorId) + assert(count == 4) + assert(!failedTaskSet) + } + test("Scheduler correctly accounts for multiple CPUs per task") { val taskCpus = 2 val taskScheduler = setupScheduler("spark.task.cpus" -> taskCpus.toString) diff --git a/docs/configuration.md b/docs/configuration.md index a3b4ff01e6d92..798d8191d4f21 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -1374,6 +1374,19 @@ Apart from these, the following properties are also available, and may be useful Should be greater than or equal to 1. Number of allowed retries = this value - 1. + + spark.scheduler.taskAssigner + roundrobin + + The strategy of how to allocate tasks among workers with free cores. By default, roundrobin + with randomness is used, which tries to allocate task to workers with available cores in + roundrobin manner. In addition, packed and balanced is provided. 
The former tries to + allocate tasks to workers with the least free cores, resulting in tasks assigned to few + workers, which may help driver to release the reserved idle workers when dynamic + (spark.dynamicAllocation.enabled) is enabled. The latter tries to assign tasks across workers + in a balance way (allocating tasks to workers with most free cores). + + #### Dynamic Allocation From 7e5ec1e8a440f91cf4d45a205af818a361a3431d Mon Sep 17 00:00:00 2001 From: Zhan Zhang Date: Wed, 19 Oct 2016 14:19:27 -0700 Subject: [PATCH 2/9] solve review comments --- .../apache/spark/scheduler/TaskAssigner.scala | 177 +++++++++--------- .../spark/scheduler/TaskSchedulerImpl.scala | 10 +- .../scheduler/TaskSchedulerImplSuite.scala | 18 +- docs/configuration.md | 13 +- 4 files changed, 110 insertions(+), 108 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala index d6184d318190b..c2c46e644d387 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala @@ -25,11 +25,11 @@ import org.apache.spark.internal.{config, Logging} import org.apache.spark.SparkConf import org.apache.spark.util.Utils -/** Tracking the current state of the workers with available cores and assigned task list. */ +/** Tracks the current state of the workers with available cores and assigned task list. */ class OfferState(val workOffer: WorkerOffer) { - // The current remaining cores that can be allocated to tasks. + /** The current remaining cores that can be allocated to tasks. */ var coresAvailable: Int = workOffer.cores - // The list of tasks that are assigned to this worker. + /** The list of tasks that are assigned to this WorkerOffer. 
*/ val tasks = new ArrayBuffer[TaskDescription](coresAvailable) } @@ -37,60 +37,70 @@ class OfferState(val workOffer: WorkerOffer) { * TaskAssigner is the base class for all task assigner implementations, and can be * extended to implement different task scheduling algorithms. * Together with [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]], TaskAssigner - * is used to assign tasks to workers with available cores. Internally, TaskScheduler, requested - * to perform task assignment given available workers, first sorts the candidate tasksets, + * is used to assign tasks to workers with available cores. Internally, when TaskScheduler + * perform task assignment given available workers, it first sorts the candidate tasksets, * and then for each taskset, it takes a number of rounds to request TaskAssigner for task - * assignment with different the locality restrictions until there is either no qualified + * assignment with different locality restrictions until there is either no qualified * workers or no valid tasks to be assigned. * * TaskAssigner is responsible to maintain the worker availability state and task assignment * information. The contract between [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]] - * and TaskAssigner is as follows. First, TaskScheduler invokes construct() of TaskAssigner to - * initialize the its internal worker states at the beginning of resource offering. Before each - * round of task assignment for a taskset, TaskScheduler invoke the init() of TaskAssigner to - * initialize the data structure for the round. When performing real task assignment, - * hasNext()/getNext() is used by TaskScheduler to check the worker availability and retrieve - * current offering from TaskAssigner. Then offerAccepted is used by TaskScheduler to notify - * the TaskAssigner so that TaskAssigner can decide whether the current offer is valid or not for - * the next request. 
After task assignment is done, TaskScheduler invokes the tasks() to - * retrieve all the task assignment information, and eventually, invokes reset() method so that - * TaskAssigner can cleanup its internal maintained resources. + * and TaskAssigner is as follows. + * + * First, TaskScheduler invokes construct() of TaskAssigner to initialize the its internal + * worker states at the beginning of resource offering. + * + * Second, before each round of task assignment for a taskset, TaskScheduler invoke the init() + * of TaskAssigner to initialize the data structure for the round. + * + * Third, when performing real task assignment, hasNext()/getNext() is used by TaskScheduler + * to check the worker availability and retrieve current offering from TaskAssigner. + * + * Fourth, then offerAccepted is used by TaskScheduler to notify the TaskAssigner so that + * TaskAssigner can decide whether the current offer is valid or not for the next request. + * + * Fifth, After task assignment is done, TaskScheduler invokes the tasks() to + * retrieve all the task assignment information. */ private[scheduler] abstract class TaskAssigner { - var offer: Seq[OfferState] = _ - var CPUS_PER_TASK = 1 + protected var offer: Seq[OfferState] = _ + protected var cpuPerTask = 1 - def withCpuPerTask(CPUS_PER_TASK: Int): Unit = { - this.CPUS_PER_TASK = CPUS_PER_TASK + protected def withCpuPerTask(cpuPerTask: Int): Unit = { + this.cpuPerTask = cpuPerTask } - // The final assigned offer returned to TaskScheduler. + /** The final assigned offer returned to TaskScheduler. */ final def tasks: Seq[ArrayBuffer[TaskDescription]] = offer.map(_.tasks) - // Invoked at the beginning of resource offering to construct the offer with the workoffers. + /** Invoked at the beginning of resource offering to construct the offer with the workoffers. 
*/ def construct(workOffer: Seq[WorkerOffer]): Unit = { offer = workOffer.map(o => new OfferState(o)) } - // Invoked at each round of Taskset assignment to initialize the internal structure. + /** Invoked at each round of Taskset assignment to initialize the internal structure. */ def init(): Unit - // Whether there is offer available to be used inside of one round of Taskset assignment. + /** + * Tests Whether there is offer available to be used inside of one round of Taskset assignment. + * @return `true` if a subsequent call to `next` will yield an element, + * `false` otherwise. + */ def hasNext: Boolean - // Returned the next assigned offer based on the task assignment strategy. - def getNext(): OfferState - - // Invoked by the TaskScheduler to indicate whether the current offer is accepted or not so that - // the assigner can decide whether the current worker is valid for the next offering. + /** + * Produces next worker offer based on the task assignment strategy. + * @return the next available offer, if `hasNext` is `true`, + * undefined behavior otherwise. + */ + def next(): OfferState + + /** + * Invoked by the TaskScheduler to indicate whether the current offer is accepted or not so that + * the assigner can decide whether the current worker is valid for the next offering. + */ def offerAccepted(assigned: Boolean): Unit - - // Invoked at the end of resource offering to release internally maintained resources. - // Subclass is responsible to release its own private resources. 
- def reset(): Unit = { - offer = null - } } object TaskAssigner extends Logging { @@ -104,19 +114,18 @@ object TaskAssigner extends Logging { def init(conf: SparkConf): TaskAssigner = { val assignerName = conf.get(config.SPARK_SCHEDULER_TASK_ASSIGNER.key, "roundrobin") - .toLowerCase() - val className = assignerMap.getOrElse(assignerName, roundrobin) - val CPUS_PER_TASK = conf.getInt("spark.task.cpus", 1) - val assigner = try { - logInfo(s"Constructing an assigner as $className") - Utils.classForName(className).getConstructor() - .newInstance().asInstanceOf[TaskAssigner] - } catch { - case _: Throwable => - logInfo(s"$assignerName cannot be constructed, fallback to default $roundrobin.") - new RoundRobinAssigner() + val className = { + val name = assignerMap.get(assignerName.toLowerCase()) + name.getOrElse { + logWarning(s"$assignerName cannot be constructed, fallback to default $roundrobin.") + roundrobin + } } - assigner.withCpuPerTask(CPUS_PER_TASK) + // The className is valid. No need to catch exceptions. + logInfo(s"Constructing TaskAssigner as $className") + val assigner = Utils.classForName(className) + .getConstructor().newInstance().asInstanceOf[TaskAssigner] + assigner.withCpuPerTask(cpuPerTask = conf.getInt("spark.task.cpus", 1)) assigner } } @@ -125,72 +134,63 @@ object TaskAssigner extends Logging { * Assign the task to workers with available cores in roundrobin manner. 
*/ class RoundRobinAssigner extends TaskAssigner { - private var idx = 0 + private var currentOfferIndex = 0 override def construct(workOffer: Seq[WorkerOffer]): Unit = { offer = Random.shuffle(workOffer.map(o => new OfferState(o))) } override def init(): Unit = { - idx = 0 + currentOfferIndex = 0 } - override def hasNext: Boolean = idx < offer.size + override def hasNext: Boolean = currentOfferIndex < offer.size - override def getNext(): OfferState = { - offer(idx) + override def next(): OfferState = { + offer(currentOfferIndex) } override def offerAccepted(assigned: Boolean): Unit = { - idx += 1 - } - - override def reset(): Unit = { - super.reset - idx = 0 + currentOfferIndex += 1 } } /** - * Assign the task to workers with the most available cores. + * Assign the task to workers with the most available cores. It other words, BalancedAssigner tries + * to distribute the task across workers in a balanced way. Potentially, it may alleviate the + * workers' memory pressure as less tasks running on the same workers, which also indicates that + * the task itself can make use of more computation resources, e.g., hyper-thread, across clusters. 
*/ class BalancedAssigner extends TaskAssigner { - private var maxHeap: PriorityQueue[OfferState] = _ - private var currentOffer: OfferState = _ - - override def construct(workOffer: Seq[WorkerOffer]): Unit = { - offer = Random.shuffle(workOffer.map(o => new OfferState(o))) - } - implicit val ord: Ordering[OfferState] = new Ordering[OfferState] { def compare(x: OfferState, y: OfferState): Int = { return Ordering[Int].compare(x.coresAvailable, y.coresAvailable) } } + private val maxHeap: PriorityQueue[OfferState] = new PriorityQueue[OfferState]() + private var currentOffer: OfferState = _ + + override def construct(workOffer: Seq[WorkerOffer]): Unit = { + offer = Random.shuffle(workOffer.map(o => new OfferState(o))) + } override def init(): Unit = { - maxHeap = new PriorityQueue[OfferState]() - offer.filter(_.coresAvailable >= CPUS_PER_TASK).foreach(maxHeap.enqueue(_)) + maxHeap.clear() + offer.filter(_.coresAvailable >= cpuPerTask).foreach(maxHeap.enqueue(_)) } override def hasNext: Boolean = maxHeap.nonEmpty - override def getNext(): OfferState = { + override def next(): OfferState = { currentOffer = maxHeap.dequeue() currentOffer } override def offerAccepted(assigned: Boolean): Unit = { - if (currentOffer.coresAvailable >= CPUS_PER_TASK && assigned) { + if (currentOffer.coresAvailable >= cpuPerTask && assigned) { maxHeap.enqueue(currentOffer) } } - - override def reset(): Unit = { - super.reset - maxHeap = null - currentOffer = null - } } /** @@ -199,35 +199,28 @@ class BalancedAssigner extends TaskAssigner { * assigned if more than required workers are reserved. If the dynamic allocator is enabled, * these idle workers will be released by driver. The released resources can then be allocated to * other jobs by underling resource manager. This assigner can potentially reduce the resource - * reservation for a job. + * reservation for jobs, which over allocate resources than they need. 
*/ class PackedAssigner extends TaskAssigner { private var sorted: Seq[OfferState] = _ - private var idx = 0 + private var currentOfferIndex = 0 private var currentOffer: OfferState = _ override def init(): Unit = { - idx = 0 - sorted = offer.filter(_.coresAvailable >= CPUS_PER_TASK).sortBy(_.coresAvailable) + currentOfferIndex = 0 + sorted = offer.filter(_.coresAvailable >= cpuPerTask).sortBy(_.coresAvailable) } - override def hasNext: Boolean = idx < sorted.size + override def hasNext: Boolean = currentOfferIndex < sorted.size - override def getNext(): OfferState = { - currentOffer = sorted(idx) + override def next(): OfferState = { + currentOffer = sorted(currentOfferIndex) currentOffer } override def offerAccepted(assigned: Boolean): Unit = { - if (currentOffer.coresAvailable < CPUS_PER_TASK || !assigned) { - idx += 1 + if (currentOffer.coresAvailable < cpuPerTask || !assigned) { + currentOfferIndex += 1 } } - - override def reset(): Unit = { - super.reset - sorted = null - currentOffer = null - idx = 0 - } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index 80a9e67ad4986..289bfbbf7f5dc 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -58,7 +58,7 @@ private[spark] class TaskSchedulerImpl( def this(sc: SparkContext) = this(sc, sc.conf.get(config.MAX_TASK_FAILURES)) val conf = sc.conf - private val taskAssigner: TaskAssigner = TaskAssigner.init(conf) + private val taskAssigner: TaskAssigner = TaskAssigner.init(conf) // How often to check for speculative tasks val SPECULATION_INTERVAL_MS = conf.getTimeAsMs("spark.speculation.interval", "100ms") @@ -94,6 +94,9 @@ private[spark] class TaskSchedulerImpl( // Number of tasks running on each executor private val executorIdToTaskCount = new HashMap[String, Int] + // For testing to verify the right 
TaskAssigner is picked up. + def getTaskAssigner(): TaskAssigner = taskAssigner + def runningTasksByExecutors(): Map[String, Int] = executorIdToTaskCount.toMap // The set of executors we have on each host; this is used to compute hostsAlive, which @@ -251,9 +254,9 @@ private[spark] class TaskSchedulerImpl( taskAssigner: TaskAssigner) : Boolean = { var launchedTask = false taskAssigner.init() - while(taskAssigner.hasNext) { + while (taskAssigner.hasNext) { var assigned = false - val currentOffer = taskAssigner.getNext() + val currentOffer = taskAssigner.next() val execId = currentOffer.workOffer.executorId val host = currentOffer.workOffer.host if (currentOffer.coresAvailable >= CPUS_PER_TASK) { @@ -335,7 +338,6 @@ private[spark] class TaskSchedulerImpl( } } val tasks = taskAssigner.tasks - taskAssigner.reset() if (tasks.size > 0) { hasLaunchedTask = true } diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala index 824c3851fc380..8ff2a4a066432 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala @@ -86,6 +86,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B } private def roundrobin(taskScheduler: TaskSchedulerImpl): Unit = { + assert(taskScheduler.getTaskAssigner().isInstanceOf[RoundRobinAssigner]) val numFreeCores = 1 val workerOffers = IndexedSeq(new WorkerOffer("executor0", "host0", numFreeCores), new WorkerOffer("executor1", "host1", numFreeCores)) @@ -95,6 +96,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B // probability of that happening is 2^-1000 (so sufficiently small to be considered // negligible). 
val numTrials = 1000 + Seq(1, 2, 3).toIterator.next() val selectedExecutorIds = 1.to(numTrials).map { _ => val taskSet = FakeTask.createTaskSet(1) taskScheduler.submitTasks(taskSet) @@ -113,18 +115,19 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B roundrobin(taskScheduler) } - test("User can specify the roundrobin task assigner") { + test("Roundrobin - User can specify the roundrobin task assigner") { val taskScheduler = setupScheduler(("spark.scheduler.taskAssigner", "RoUndrObin")) roundrobin(taskScheduler) } - test("Fallback to roundrobin when the task assigner provided is not valid") { + test("Roundrobin Fallback - fallbacks to default if the provided is invalid") { val taskScheduler = setupScheduler("spark.scheduler.taskAssigner" -> "invalid") roundrobin(taskScheduler) } - test("Scheduler balance the assignment to the worker with more free cores") { + test("Balanced - Assigner balances the tasks to the worker with more free cores") { val taskScheduler = setupScheduler("spark.scheduler.taskAssigner" -> "BaLanceD") + assert(taskScheduler.getTaskAssigner().isInstanceOf[BalancedAssigner]) val workerOffers = IndexedSeq(new WorkerOffer("executor0", "host0", 2), new WorkerOffer("executor1", "host1", 4)) val selectedExecutorIds = { @@ -139,8 +142,9 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B assert(!failedTaskSet) } - test("Scheduler balance the assignment across workers with same free cores") { + test("Balanced - Assigner balances the tasks across workers with same free cores") { val taskScheduler = setupScheduler("spark.scheduler.taskAssigner" -> "balanced") + assert(taskScheduler.getTaskAssigner().isInstanceOf[BalancedAssigner]) val workerOffers = IndexedSeq(new WorkerOffer("executor0", "host0", 2), new WorkerOffer("executor1", "host1", 2)) val selectedExecutorIds = { @@ -155,8 +159,9 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B assert(!failedTaskSet) 
} - test("Scheduler packs the assignment to workers with less free cores") { + test("Packed - Assigner packs the tasks to workers with less free cores") { val taskScheduler = setupScheduler("spark.scheduler.taskAssigner" -> "paCkeD") + assert(taskScheduler.getTaskAssigner().isInstanceOf[PackedAssigner]) val workerOffers = IndexedSeq(new WorkerOffer("executor0", "host0", 2), new WorkerOffer("executor1", "host1", 4)) val selectedExecutorIds = { @@ -171,8 +176,9 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B assert(!failedTaskSet) } - test("Scheduler keeps packing the assignment to the same worker") { + test("Packed - Assigner keeps packing the assignment to the same worker") { val taskScheduler = setupScheduler("spark.scheduler.taskAssigner" -> "packed") + assert(taskScheduler.getTaskAssigner().isInstanceOf[PackedAssigner]) val workerOffers = IndexedSeq(new WorkerOffer("executor0", "host0", 4), new WorkerOffer("executor1", "host1", 4)) val selectedExecutorIds = { diff --git a/docs/configuration.md b/docs/configuration.md index 798d8191d4f21..976af3f4852d7 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -1378,13 +1378,14 @@ Apart from these, the following properties are also available, and may be useful spark.scheduler.taskAssigner roundrobin - The strategy of how to allocate tasks among workers with free cores. By default, roundrobin + The strategy of how to allocate tasks among workers with free cores. There are three task + assigners (roundrobin, packed, and balanced) are supported currently. By default, roundrobin with randomness is used, which tries to allocate task to workers with available cores in - roundrobin manner. In addition, packed and balanced is provided. The former tries to - allocate tasks to workers with the least free cores, resulting in tasks assigned to few - workers, which may help driver to release the reserved idle workers when dynamic - (spark.dynamicAllocation.enabled) is enabled. 
The latter tries to assign tasks across workers - in a balance way (allocating tasks to workers with most free cores). + roundrobin manner.The packed task assigner tries to allocate tasks to workers with the least + free cores, resulting in tasks assigned to few workers, which may help driver to release the + reserved idle workers when dynamic allocation(spark.dynamicAllocation.enabled) is enabled. + The balanced task assigner tries to assign tasks across workers in a balance way (allocating + tasks to workers with most free cores). From c1f2a9cb52b55a7a8f597f08379c308b7a62704a Mon Sep 17 00:00:00 2001 From: Zhan Zhang Date: Thu, 20 Oct 2016 13:57:36 -0700 Subject: [PATCH 3/9] shuffle all initial offers --- .../org/apache/spark/scheduler/TaskAssigner.scala | 10 +--------- .../spark/scheduler/TaskSchedulerImplSuite.scala | 2 +- 2 files changed, 2 insertions(+), 10 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala index c2c46e644d387..c7d21eff7a564 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala @@ -76,7 +76,7 @@ private[scheduler] abstract class TaskAssigner { /** Invoked at the beginning of resource offering to construct the offer with the workoffers. */ def construct(workOffer: Seq[WorkerOffer]): Unit = { - offer = workOffer.map(o => new OfferState(o)) + offer = Random.shuffle(workOffer.map(o => new OfferState(o))) } /** Invoked at each round of Taskset assignment to initialize the internal structure. 
*/ @@ -136,10 +136,6 @@ object TaskAssigner extends Logging { class RoundRobinAssigner extends TaskAssigner { private var currentOfferIndex = 0 - override def construct(workOffer: Seq[WorkerOffer]): Unit = { - offer = Random.shuffle(workOffer.map(o => new OfferState(o))) - } - override def init(): Unit = { currentOfferIndex = 0 } @@ -170,10 +166,6 @@ class BalancedAssigner extends TaskAssigner { private val maxHeap: PriorityQueue[OfferState] = new PriorityQueue[OfferState]() private var currentOffer: OfferState = _ - override def construct(workOffer: Seq[WorkerOffer]): Unit = { - offer = Random.shuffle(workOffer.map(o => new OfferState(o))) - } - override def init(): Unit = { maxHeap.clear() offer.filter(_.coresAvailable >= cpuPerTask).foreach(maxHeap.enqueue(_)) diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala index 8ff2a4a066432..eead70122d96a 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala @@ -189,7 +189,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B taskDescriptions.map(_.executorId) } val count = selectedExecutorIds.count(_ == workerOffers(0).executorId) - assert(count == 4) + assert(count == 4 || count == 0) assert(!failedTaskSet) } From bbd39e796084e48f9e400418c3c7980eb8e8f2b2 Mon Sep 17 00:00:00 2001 From: Zhan Zhang Date: Sat, 22 Oct 2016 13:07:41 -0700 Subject: [PATCH 4/9] solve review comments --- .../org/apache/spark/scheduler/TaskAssigner.scala | 14 +++++++------- docs/configuration.md | 2 +- .../spark/sql/DataFrameWindowFunctionsSuite.scala | 5 ++++- 3 files changed, 12 insertions(+), 9 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala index 
c7d21eff7a564..84596b24c2403 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala @@ -26,7 +26,7 @@ import org.apache.spark.SparkConf import org.apache.spark.util.Utils /** Tracks the current state of the workers with available cores and assigned task list. */ -class OfferState(val workOffer: WorkerOffer) { +private[scheduler] class OfferState(val workOffer: WorkerOffer) { /** The current remaining cores that can be allocated to tasks. */ var coresAvailable: Int = workOffer.cores /** The list of tasks that are assigned to this WorkerOffer. */ @@ -38,7 +38,7 @@ class OfferState(val workOffer: WorkerOffer) { * extended to implement different task scheduling algorithms. * Together with [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]], TaskAssigner * is used to assign tasks to workers with available cores. Internally, when TaskScheduler - * perform task assignment given available workers, it first sorts the candidate tasksets, + * performs task assignment given available workers, it first sorts the candidate tasksets, * and then for each taskset, it takes a number of rounds to request TaskAssigner for task * assignment with different locality restrictions until there is either no qualified * workers or no valid tasks to be assigned. @@ -50,7 +50,7 @@ class OfferState(val workOffer: WorkerOffer) { * First, TaskScheduler invokes construct() of TaskAssigner to initialize the its internal * worker states at the beginning of resource offering. * - * Second, before each round of task assignment for a taskset, TaskScheduler invoke the init() + * Second, before each round of task assignment for a taskset, TaskScheduler invokes the init() * of TaskAssigner to initialize the data structure for the round. 
* * Third, when performing real task assignment, hasNext()/getNext() is used by TaskScheduler @@ -83,9 +83,9 @@ private[scheduler] abstract class TaskAssigner { def init(): Unit /** - * Tests Whether there is offer available to be used inside of one round of Taskset assignment. - * @return `true` if a subsequent call to `next` will yield an element, - * `false` otherwise. + * Tests whether there is offer available to be used inside of one round of Taskset assignment. + * @return `true` if a subsequent call to `next` will yield an element, + * `false` otherwise. */ def hasNext: Boolean @@ -152,7 +152,7 @@ class RoundRobinAssigner extends TaskAssigner { } /** - * Assign the task to workers with the most available cores. It other words, BalancedAssigner tries + * Assign the task to workers with the most available cores. In other words, BalancedAssigner tries * to distribute the task across workers in a balanced way. Potentially, it may alleviate the * workers' memory pressure as less tasks running on the same workers, which also indicates that * the task itself can make use of more computation resources, e.g., hyper-thread, across clusters. diff --git a/docs/configuration.md b/docs/configuration.md index 976af3f4852d7..59690c1b735da 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -1381,7 +1381,7 @@ Apart from these, the following properties are also available, and may be useful The strategy of how to allocate tasks among workers with free cores. There are three task assigners (roundrobin, packed, and balanced) are supported currently. By default, roundrobin with randomness is used, which tries to allocate task to workers with available cores in - roundrobin manner.The packed task assigner tries to allocate tasks to workers with the least + roundrobin manner. 
The packed task assigner tries to allocate tasks to workers with the least free cores, resulting in tasks assigned to few workers, which may help driver to release the reserved idle workers when dynamic allocation(spark.dynamicAllocation.enabled) is enabled. The balanced task assigner tries to assign tasks across workers in a balance way (allocating diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala index 1255c49104718..66755291dc170 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala @@ -31,7 +31,10 @@ class DataFrameWindowFunctionsSuite extends QueryTest with SharedSQLContext { test("reuse window partitionBy") { val df = Seq((1, "1"), (2, "2"), (1, "1"), (2, "2")).toDF("key", "value") val w = Window.partitionBy("key").orderBy("value") - + val df1 = df.select( + lead("key", 1).over(w), + lead("value", 1).over(w)) + df1.explain(true) checkAnswer( df.select( lead("key", 1).over(w), From 4ebef6cb6c0c372b03d08f40cb5f6988d33238f3 Mon Sep 17 00:00:00 2001 From: Zhan Zhang Date: Sun, 23 Oct 2016 10:41:16 -0700 Subject: [PATCH 5/9] resolve review comments --- .../apache/spark/scheduler/TaskAssigner.scala | 59 +++++++++++-------- .../spark/scheduler/TaskSchedulerImpl.scala | 10 ++-- docs/configuration.md | 12 ++-- .../sql/DataFrameWindowFunctionsSuite.scala | 5 +- 4 files changed, 46 insertions(+), 40 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala index 84596b24c2403..c5cfe5a1b3f89 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala @@ -31,6 +31,12 @@ private[scheduler] class OfferState(val workOffer: WorkerOffer) 
{ var coresAvailable: Int = workOffer.cores /** The list of tasks that are assigned to this WorkerOffer. */ val tasks = new ArrayBuffer[TaskDescription](coresAvailable) + + def assignTask(task: TaskDescription, cpu: Int): Unit = { + tasks += task + coresAvailable -= cpu + assert(coresAvailable >= 0) + } } /** @@ -39,7 +45,7 @@ private[scheduler] class OfferState(val workOffer: WorkerOffer) { * Together with [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]], TaskAssigner * is used to assign tasks to workers with available cores. Internally, when TaskScheduler * performs task assignment given available workers, it first sorts the candidate tasksets, - * and then for each taskset, it takes a number of rounds to request TaskAssigner for task + * and then for each taskset, it takes multiple rounds to request TaskAssigner for task * assignment with different locality restrictions until there is either no qualified * workers or no valid tasks to be assigned. * @@ -53,28 +59,33 @@ private[scheduler] class OfferState(val workOffer: WorkerOffer) { * Second, before each round of task assignment for a taskset, TaskScheduler invokes the init() * of TaskAssigner to initialize the data structure for the round. * - * Third, when performing real task assignment, hasNext()/getNext() is used by TaskScheduler + * Third, when performing real task assignment, hasNext/next() is used by TaskScheduler * to check the worker availability and retrieve current offering from TaskAssigner. * - * Fourth, then offerAccepted is used by TaskScheduler to notify the TaskAssigner so that + * Fourth, TaskScheduler calls offerAccepted() to notify the TaskAssigner so that * TaskAssigner can decide whether the current offer is valid or not for the next request. * - * Fifth, After task assignment is done, TaskScheduler invokes the tasks() to + * Fifth, after task assignment is done, TaskScheduler invokes the function tasks to * retrieve all the task assignment information. 
*/ -private[scheduler] abstract class TaskAssigner { +private[scheduler] sealed abstract class TaskAssigner { protected var offer: Seq[OfferState] = _ protected var cpuPerTask = 1 - protected def withCpuPerTask(cpuPerTask: Int): Unit = { + protected def withCpuPerTask(cpuPerTask: Int): TaskAssigner = { this.cpuPerTask = cpuPerTask + this } - /** The final assigned offer returned to TaskScheduler. */ + /** The currently assigned offers. */ final def tasks: Seq[ArrayBuffer[TaskDescription]] = offer.map(_.tasks) - /** Invoked at the beginning of resource offering to construct the offer with the workoffers. */ + /** + * Invoked at the beginning of resource offering to construct the offer with the workoffers. + * By default, offers is randomly shuffled to avoid always placing tasks on the same set of + * workers. + */ def construct(workOffer: Seq[WorkerOffer]): Unit = { offer = Random.shuffle(workOffer.map(o => new OfferState(o))) } @@ -99,8 +110,10 @@ private[scheduler] abstract class TaskAssigner { /** * Invoked by the TaskScheduler to indicate whether the current offer is accepted or not so that * the assigner can decide whether the current worker is valid for the next offering. + * + * @param isAccepted whether TaskScheduler assigns a task to current offer. */ - def offerAccepted(assigned: Boolean): Unit + def offerAccepted(isAccepted: Boolean): Unit } object TaskAssigner extends Logging { @@ -123,15 +136,13 @@ object TaskAssigner extends Logging { } // The className is valid. No need to catch exceptions. 
logInfo(s"Constructing TaskAssigner as $className") - val assigner = Utils.classForName(className) - .getConstructor().newInstance().asInstanceOf[TaskAssigner] - assigner.withCpuPerTask(cpuPerTask = conf.getInt("spark.task.cpus", 1)) - assigner + Utils.classForName(className).getConstructor().newInstance().asInstanceOf[TaskAssigner] + .withCpuPerTask(cpuPerTask = conf.getInt("spark.task.cpus", 1)) } } /** - * Assign the task to workers with available cores in roundrobin manner. + * Assigns the task to workers with available cores in roundrobin manner. */ class RoundRobinAssigner extends TaskAssigner { private var currentOfferIndex = 0 @@ -146,13 +157,13 @@ class RoundRobinAssigner extends TaskAssigner { offer(currentOfferIndex) } - override def offerAccepted(assigned: Boolean): Unit = { + override def offerAccepted(isAccepted: Boolean): Unit = { currentOfferIndex += 1 } } /** - * Assign the task to workers with the most available cores. In other words, BalancedAssigner tries + * Assigns the task to workers with the most available cores. In other words, BalancedAssigner tries * to distribute the task across workers in a balanced way. Potentially, it may alleviate the * workers' memory pressure as less tasks running on the same workers, which also indicates that * the task itself can make use of more computation resources, e.g., hyper-thread, across clusters. @@ -178,8 +189,8 @@ class BalancedAssigner extends TaskAssigner { currentOffer } - override def offerAccepted(assigned: Boolean): Unit = { - if (currentOffer.coresAvailable >= cpuPerTask && assigned) { + override def offerAccepted(isAccepted: Boolean): Unit = { + if (currentOffer.coresAvailable >= cpuPerTask && isAccepted) { maxHeap.enqueue(currentOffer) } } @@ -194,24 +205,24 @@ class BalancedAssigner extends TaskAssigner { * reservation for jobs, which over allocate resources than they need. 
*/ class PackedAssigner extends TaskAssigner { - private var sorted: Seq[OfferState] = _ + private var sortedOffer: Seq[OfferState] = _ private var currentOfferIndex = 0 private var currentOffer: OfferState = _ override def init(): Unit = { currentOfferIndex = 0 - sorted = offer.filter(_.coresAvailable >= cpuPerTask).sortBy(_.coresAvailable) + sortedOffer = offer.filter(_.coresAvailable >= cpuPerTask).sortBy(_.coresAvailable) } - override def hasNext: Boolean = currentOfferIndex < sorted.size + override def hasNext: Boolean = currentOfferIndex < sortedOffer.size override def next(): OfferState = { - currentOffer = sorted(currentOfferIndex) + currentOffer = sortedOffer(currentOfferIndex) currentOffer } - override def offerAccepted(assigned: Boolean): Unit = { - if (currentOffer.coresAvailable < cpuPerTask || !assigned) { + override def offerAccepted(isAccepted: Boolean): Unit = { + if (currentOffer.coresAvailable < cpuPerTask || !isAccepted) { currentOfferIndex += 1 } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index 289bfbbf7f5dc..3d4d2786ab834 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -255,22 +255,20 @@ private[spark] class TaskSchedulerImpl( var launchedTask = false taskAssigner.init() while (taskAssigner.hasNext) { - var assigned = false + var isAccepted = false val currentOffer = taskAssigner.next() val execId = currentOffer.workOffer.executorId val host = currentOffer.workOffer.host if (currentOffer.coresAvailable >= CPUS_PER_TASK) { try { for (task <- taskSet.resourceOffer(execId, host, maxLocality)) { - currentOffer.tasks += task + currentOffer.assignTask(task, CPUS_PER_TASK) val tid = task.taskId taskIdToTaskSetManager(tid) = taskSet taskIdToExecutorId(tid) = execId executorIdToTaskCount(execId) += 1 - 
currentOffer.coresAvailable -= CPUS_PER_TASK - assert(currentOffer.coresAvailable >= 0) launchedTask = true - assigned = true + isAccepted = true } } catch { case e: TaskNotSerializableException => @@ -280,7 +278,7 @@ private[spark] class TaskSchedulerImpl( return launchedTask } } - taskAssigner.offerAccepted(assigned) + taskAssigner.offerAccepted(isAccepted) } return launchedTask } diff --git a/docs/configuration.md b/docs/configuration.md index 59690c1b735da..321f7c0e0a38d 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -1378,14 +1378,14 @@ Apart from these, the following properties are also available, and may be useful spark.scheduler.taskAssigner roundrobin - The strategy of how to allocate tasks among workers with free cores. There are three task + The strategy of how to allocate tasks among workers with free cores. Three task assigners (roundrobin, packed, and balanced) are supported currently. By default, roundrobin - with randomness is used, which tries to allocate task to workers with available cores in - roundrobin manner. The packed task assigner tries to allocate tasks to workers with the least - free cores, resulting in tasks assigned to few workers, which may help driver to release the + with randomness is used to allocate task to workers with available cores in a + roundrobin manner. The packed task assigner is used to allocate tasks to workers with the least + free cores, resulting in tasks assigned to fewer workers, which may help driver to release the reserved idle workers when dynamic allocation(spark.dynamicAllocation.enabled) is enabled. - The balanced task assigner tries to assign tasks across workers in a balance way (allocating - tasks to workers with most free cores). + The balanced task assigner is used to assign tasks across workers in a balanced way (allocating + tasks to workers with the most free cores). 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala index 66755291dc170..1255c49104718 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala @@ -31,10 +31,7 @@ class DataFrameWindowFunctionsSuite extends QueryTest with SharedSQLContext { test("reuse window partitionBy") { val df = Seq((1, "1"), (2, "2"), (1, "1"), (2, "2")).toDF("key", "value") val w = Window.partitionBy("key").orderBy("value") - val df1 = df.select( - lead("key", 1).over(w), - lead("value", 1).over(w)) - df1.explain(true) + checkAnswer( df.select( lead("key", 1).over(w), From a42325d90e85f1d972f5c84ba1a8f8ba73e9b16d Mon Sep 17 00:00:00 2001 From: Zhan Zhang Date: Sun, 23 Oct 2016 21:13:59 -0700 Subject: [PATCH 6/9] solve review comments --- .../org/apache/spark/scheduler/TaskAssigner.scala | 11 +++++++---- docs/configuration.md | 2 +- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala index c5cfe5a1b3f89..cb20012933a6d 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala @@ -21,8 +21,8 @@ import scala.collection.mutable.ArrayBuffer import scala.collection.mutable.PriorityQueue import scala.util.Random +import org.apache.spark.{SparkConf, SparkException} import org.apache.spark.internal.{config, Logging} -import org.apache.spark.SparkConf import org.apache.spark.util.Utils /** Tracks the current state of the workers with available cores and assigned task list. 
*/ @@ -33,9 +33,12 @@ private[scheduler] class OfferState(val workOffer: WorkerOffer) { val tasks = new ArrayBuffer[TaskDescription](coresAvailable) def assignTask(task: TaskDescription, cpu: Int): Unit = { + if (coresAvailable < cpu) { + throw new SparkException(s"Available cores are less than cpu per task" + + s" ($coresAvailable < $cpu)") + } tasks += task coresAvailable -= cpu - assert(coresAvailable >= 0) } } @@ -142,7 +145,7 @@ object TaskAssigner extends Logging { } /** - * Assigns the task to workers with available cores in roundrobin manner. + * Assigns the task to workers with available cores in a roundrobin manner. */ class RoundRobinAssigner extends TaskAssigner { private var currentOfferIndex = 0 @@ -197,7 +200,7 @@ class BalancedAssigner extends TaskAssigner { } /** - * Assign the task to workers with the least available cores. In other words, PackedAssigner tries + * Assigns the task to workers with the least available cores. In other words, PackedAssigner tries * to schedule tasks to fewer workers. As a result, there will be idle workers without any tasks * assigned if more than required workers are reserved. If the dynamic allocator is enabled, * these idle workers will be released by driver. The released resources can then be allocated to diff --git a/docs/configuration.md b/docs/configuration.md index 321f7c0e0a38d..8dbe5d923d38c 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -1380,7 +1380,7 @@ Apart from these, the following properties are also available, and may be useful The strategy of how to allocate tasks among workers with free cores. Three task assigners (roundrobin, packed, and balanced) are supported currently. By default, roundrobin - with randomness is used to allocate task to workers with available cores in a + with randomness is used to allocate tasks to workers with available cores in a roundrobin manner. 
The packed task assigner is used to allocate tasks to workers with the least free cores, resulting in tasks assigned to fewer workers, which may help driver to release the reserved idle workers when dynamic allocation(spark.dynamicAllocation.enabled) is enabled. From ce6f06aaa9ec90f118a359cb02b669a16b33abb2 Mon Sep 17 00:00:00 2001 From: Zhan Zhang Date: Tue, 1 Nov 2016 11:26:20 -0700 Subject: [PATCH 7/9] solve review comments --- .../org/apache/spark/scheduler/TaskAssigner.scala | 5 +++-- .../spark/scheduler/TaskSchedulerImpl.scala | 4 ++-- docs/configuration.md | 15 ++++++++------- 3 files changed, 13 insertions(+), 11 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala index cb20012933a6d..ae261a84b847a 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala @@ -133,8 +133,9 @@ object TaskAssigner extends Logging { val className = { val name = assignerMap.get(assignerName.toLowerCase()) name.getOrElse { - logWarning(s"$assignerName cannot be constructed, fallback to default $roundrobin.") - roundrobin + throw new SparkException(s"Task Assigner $assignerName is not available. " + + s"Please choose roundrobin, packed, or balanced. roundrobin is used by default") + } } // The className is valid. No need to catch exceptions. diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index 3d4d2786ab834..6ec37f831ff9e 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -285,8 +285,8 @@ private[spark] class TaskSchedulerImpl( /** * Called by cluster manager to offer resources on slaves. We respond by asking our active task - * sets for tasks in order of priority. 
We fill each node with tasks in a round-robin manner so - * that tasks are balanced across the cluster. + * sets for tasks in order of priority. We fill each node with tasks in a roundrobin, packed or + * balanced way based on the configured TaskAssigner */ def resourceOffers(offers: IndexedSeq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized { // Mark each slave as alive and remember its hostname diff --git a/docs/configuration.md b/docs/configuration.md index 8dbe5d923d38c..04f0ae007bc4c 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -1379,13 +1379,14 @@ Apart from these, the following properties are also available, and may be useful roundrobin The strategy of how to allocate tasks among workers with free cores. Three task - assigners (roundrobin, packed, and balanced) are supported currently. By default, roundrobin - with randomness is used to allocate tasks to workers with available cores in a - roundrobin manner. The packed task assigner is used to allocate tasks to workers with the least - free cores, resulting in tasks assigned to fewer workers, which may help driver to release the - reserved idle workers when dynamic allocation(spark.dynamicAllocation.enabled) is enabled. - The balanced task assigner is used to assign tasks across workers in a balanced way (allocating - tasks to workers with the most free cores). + assigners (roundrobin, packed, and balanced) are supported currently. By default, the + "roundrobin" task assigner is used to allocate tasks to workers with available cores in a + roundrobin manner with randomness. The "packed" task assigner is used to allocate tasks to + workers with the least free cores, resulting in tasks assigned to fewer workers, which may + help the driver to release the reserved idle workers when dynamic + allocation (spark.dynamicAllocation.enabled) is enabled.
The "balanced" task assigner is used + to assign tasks across workers in a balanced way (allocating tasks to workers with the most + free cores). From bfd4173b0b52389f3dfde9d7fb5fe6d8c110a840 Mon Sep 17 00:00:00 2001 From: Zhan Zhang Date: Tue, 1 Nov 2016 11:35:58 -0700 Subject: [PATCH 8/9] solve review comments --- .../org/apache/spark/scheduler/TaskSchedulerImplSuite.scala | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala index eead70122d96a..22bd9fa2603d4 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala @@ -120,9 +120,8 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B roundrobin(taskScheduler) } - test("Roundrobin Fallback - fallbacks to default if the provided is invalid") { - val taskScheduler = setupScheduler("spark.scheduler.taskAssigner" -> "invalid") - roundrobin(taskScheduler) + test("Invalid - SparkException is thrown") { + intercept[SparkException] { setupScheduler("spark.scheduler.taskAssigner" -> "invalid")} } test("Balanced - Assigner balances the tasks to the worker with more free cores") { From ada2a45b43b9722bae99cf578b544195e69a1625 Mon Sep 17 00:00:00 2001 From: Zhan Zhang Date: Tue, 1 Nov 2016 13:23:01 -0700 Subject: [PATCH 9/9] solve review comments --- .../main/scala/org/apache/spark/scheduler/TaskAssigner.scala | 4 ++-- .../scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala | 2 +- .../org/apache/spark/scheduler/TaskSchedulerImplSuite.scala | 4 ++-- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala index ae261a84b847a..61d4d31b6b7ca 100644 --- 
a/core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala @@ -133,8 +133,8 @@ object TaskAssigner extends Logging { val className = { val name = assignerMap.get(assignerName.toLowerCase()) name.getOrElse { - throw new SparkException(s"Task Assigner $assignerName is not available. " + - s"Please choose roundrobin, packed, or balanced. roundrobin is used by default") + throw new SparkException(s"Task Assigner $assignerName is invalid. Available assigners " + + s"are roundrobin, packed, and balanced. roundrobin is the default") } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index 6ec37f831ff9e..44b981a3e7de9 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -286,7 +286,7 @@ private[spark] class TaskSchedulerImpl( /** * Called by cluster manager to offer resources on slaves. We respond by asking our active task * sets for tasks in order of priority. We fill each node with tasks in a roundrobin, packed or - * balanced way based on the configured TaskAssigner + * balanced way based on the configured TaskAssigner. 
*/ def resourceOffers(offers: IndexedSeq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized { // Mark each slave as alive and remember its hostname diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala index 22bd9fa2603d4..7ea07630c92e8 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala @@ -120,8 +120,8 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B roundrobin(taskScheduler) } - test("Invalid - SparkException is thrown") { - intercept[SparkException] { setupScheduler("spark.scheduler.taskAssigner" -> "invalid")} + test("Invalid - Throws SparkException") { + intercept[SparkException] { setupScheduler("spark.scheduler.taskAssigner" -> "invalid") } } test("Balanced - Assigner balances the tasks to the worker with more free cores") {