From 824396c82977171c38ab5d7f6c0f84bc19eccaba Mon Sep 17 00:00:00 2001 From: Dhruve Ashar Date: Tue, 15 Aug 2017 09:18:21 -0500 Subject: [PATCH 1/5] [SPARK-20589] Allow limiting task concurrency per stage --- .../spark/ExecutorAllocationManager.scala | 21 ++- .../spark/scheduler/TaskSetManager.scala | 141 +++++++++++------- .../ExecutorAllocationManagerSuite.scala | 119 +++++++++++++++ .../spark/scheduler/TaskSetManagerSuite.scala | 95 ++++++++++++ docs/configuration.md | 10 ++ 5 files changed, 327 insertions(+), 59 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala index fcc72ff49276d..d78b08f92fa1e 100644 --- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala +++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala @@ -92,6 +92,7 @@ private[spark] class ExecutorAllocationManager( private val minNumExecutors = conf.get(DYN_ALLOCATION_MIN_EXECUTORS) private val maxNumExecutors = conf.get(DYN_ALLOCATION_MAX_EXECUTORS) private val initialNumExecutors = Utils.getDynamicAllocationInitialExecutors(conf) + private var maxConcurrentTasks = Int.MaxValue // How long there must be backlogged tasks for before an addition is triggered (seconds) private val schedulerBacklogTimeoutS = conf.getTimeAsSeconds( @@ -262,7 +263,8 @@ private[spark] class ExecutorAllocationManager( * and pending tasks, rounded up. */ private def maxNumExecutorsNeeded(): Int = { - val numRunningOrPendingTasks = listener.totalPendingTasks + listener.totalRunningTasks + val numRunningOrPendingTasks = Math.min(listener.totalPendingTasks + listener.totalRunningTasks, + maxConcurrentTasks) (numRunningOrPendingTasks + tasksPerExecutor - 1) / tasksPerExecutor } @@ -313,7 +315,7 @@ private[spark] class ExecutorAllocationManager( // Do not change our target while we are still initializing, // Otherwise the first job may have to ramp up unnecessarily 0 - } else if (maxNeeded < numExecutorsTarget) { + } else if (maxNeeded <= numExecutorsTarget) { // The target number exceeds the number we actually need, so stop adding new // executors and inform the cluster manager to cancel the extra pending requests val oldNumExecutorsTarget = numExecutorsTarget @@ -592,6 +594,21 @@ private[spark] class ExecutorAllocationManager( // place the executors. 
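A standalone sketch (not part of this patch) of the capped sizing arithmetic in maxNumExecutorsNeeded() above; the names are illustrative, not Spark's:

    object CappedSizingSketch {
      // Outstanding work is min(pending + running, cap); executors needed is the
      // ceiling of that count over tasksPerExecutor.
      def executorsNeeded(pending: Int, running: Int, cap: Int, tasksPerExecutor: Int): Int = {
        val outstanding = math.min(pending + running, cap)
        (outstanding + tasksPerExecutor - 1) / tasksPerExecutor
      }

      def main(args: Array[String]): Unit = {
        println(executorsNeeded(10, 0, 2, 1)) // 10 tasks capped at 2, 1 task/executor => 2
        println(executorsNeeded(10, 0, 2, 3)) // same cap on 3-core executors => ceil(2/3) = 1
      }
    }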
private val stageIdToExecutorPlacementHints = new mutable.HashMap[Int, (Int, Map[String, Int])] + override def onJobStart(jobStart: SparkListenerJobStart): Unit = { + val jobGroupId = if (jobStart.properties != null) { + jobStart.properties.getProperty(SparkContext.SPARK_JOB_GROUP_ID) + } else { + "" + } + val maxConcurrentTasks = conf.getInt(s"spark.job.$jobGroupId.maxConcurrentTasks", + Int.MaxValue) + + logInfo(s"Setting maximum concurrent tasks for group: ${jobGroupId} to $maxConcurrentTasks") + allocationManager.synchronized { + allocationManager.maxConcurrentTasks = maxConcurrentTasks + } + } + override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = { initializing = false val stageId = stageSubmitted.stageInfo.stageId diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index 589fe672ade71..6a1ccd85c884c 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -71,6 +71,29 @@ private[spark] class TaskSetManager( val tasks = taskSet.tasks val numTasks = tasks.length + + // The max no. of concurrent tasks that can run for a particular job group. + val maxConcurrentTasks = { + // This is set to null while running the unit tests + if (taskSet.properties != null) { + val jobGroupId = taskSet.properties.getProperty(SparkContext.SPARK_JOB_GROUP_ID) + if (jobGroupId != null && !jobGroupId.isEmpty) { + val maxTasks = conf.getInt(s"spark.job.${jobGroupId}.maxConcurrentTasks", Int.MaxValue) + if (maxTasks < 1) { + throw new IllegalArgumentException( + "Maximum Concurrent Tasks should be set greater than 0 for the job to progress." + ) + } else { + maxTasks + } + } else { + Int.MaxValue + } + } else { + Int.MaxValue + } + } + val copiesRunning = new Array[Int](numTasks) // For each task, tracks whether a copy of the task has succeeded. A task will also be @@ -437,7 +460,7 @@ private[spark] class TaskSetManager( blacklist.isNodeBlacklistedForTaskSet(host) || blacklist.isExecutorBlacklistedForTaskSet(execId) } - if (!isZombie && !offerBlacklisted) { + if (!isZombie && !offerBlacklisted && runningTasks < maxConcurrentTasks) { val curTime = clock.getTimeMillis() var allowedLocality = maxLocality @@ -450,64 +473,68 @@ private[spark] class TaskSetManager( } } - dequeueTask(execId, host, allowedLocality).map { case ((index, taskLocality, speculative)) => - // Found a task; do some bookkeeping and return a task description - val task = tasks(index) - val taskId = sched.newTaskId() - // Do various bookkeeping - copiesRunning(index) += 1 - val attemptNum = taskAttempts(index).size - val info = new TaskInfo(taskId, index, attemptNum, curTime, - execId, host, taskLocality, speculative) - taskInfos(taskId) = info - taskAttempts(index) = info :: taskAttempts(index) - // Update our locality level for delay scheduling - // NO_PREF will not affect the variables related to delay scheduling - if (maxLocality != TaskLocality.NO_PREF) { - currentLocalityIndex = getLocalityIndex(taskLocality) - lastLaunchTime = curTime - } - // Serialize and return the task - val serializedTask: ByteBuffer = try { - ser.serialize(task) - } catch { - // If the task cannot be serialized, then there's no point to re-attempt the task, - // as it will always fail. So just abort the whole task-set. - case NonFatal(e) => - val msg = s"Failed to serialize task $taskId, not attempting to retry it." 
- logError(msg, e) - abort(s"$msg Exception during serialization: $e") - throw new TaskNotSerializableException(e) - } - if (serializedTask.limit > TaskSetManager.TASK_SIZE_TO_WARN_KB * 1024 && - !emittedTaskSizeWarning) { - emittedTaskSizeWarning = true - logWarning(s"Stage ${task.stageId} contains a task of very large size " + - s"(${serializedTask.limit / 1024} KB). The maximum recommended task size is " + - s"${TaskSetManager.TASK_SIZE_TO_WARN_KB} KB.") - } - addRunningTask(taskId) - - // We used to log the time it takes to serialize the task, but task size is already - // a good proxy to task serialization time. - // val timeTaken = clock.getTime() - startTime - val taskName = s"task ${info.id} in stage ${taskSet.id}" - logInfo(s"Starting $taskName (TID $taskId, $host, executor ${info.executorId}, " + - s"partition ${task.partitionId}, $taskLocality, ${serializedTask.limit} bytes)") - - sched.dagScheduler.taskStarted(task, info) - new TaskDescription( - taskId, - attemptNum, - execId, - taskName, - index, - sched.sc.addedFiles, - sched.sc.addedJars, - task.localProperties, - serializedTask) + dequeueTask(execId, host, allowedLocality).map { + case ((index, taskLocality, speculative)) => + // Found a task; do some bookkeeping and return a task description + val task = tasks(index) + val taskId = sched.newTaskId() + // Do various bookkeeping + copiesRunning(index) += 1 + val attemptNum = taskAttempts(index).size + val info = new TaskInfo(taskId, index, attemptNum, curTime, + execId, host, taskLocality, speculative) + taskInfos(taskId) = info + taskAttempts(index) = info :: taskAttempts(index) + // Update our locality level for delay scheduling + // NO_PREF will not affect the variables related to delay scheduling + if (maxLocality != TaskLocality.NO_PREF) { + currentLocalityIndex = getLocalityIndex(taskLocality) + lastLaunchTime = curTime + } + // Serialize and return the task + val serializedTask: ByteBuffer = try { + ser.serialize(task) + } catch { + // If the task cannot be serialized, then there's no point to re-attempt the task, + // as it will always fail. So just abort the whole task-set. + case NonFatal(e) => + val msg = s"Failed to serialize task $taskId, not attempting to retry it." + logError(msg, e) + abort(s"$msg Exception during serialization: $e") + throw new TaskNotSerializableException(e) + } + if (serializedTask.limit > TaskSetManager.TASK_SIZE_TO_WARN_KB * 1024 && + !emittedTaskSizeWarning) { + emittedTaskSizeWarning = true + logWarning(s"Stage ${task.stageId} contains a task of very large size " + + s"(${serializedTask.limit / 1024} KB). The maximum recommended task size is " + + s"${TaskSetManager.TASK_SIZE_TO_WARN_KB} KB.") + } + addRunningTask(taskId) + + // We used to log the time it takes to serialize the task, but task size is already + // a good proxy to task serialization time. + // val timeTaken = clock.getTime() - startTime + val taskName = s"task ${info.id} in stage ${taskSet.id}" + logInfo(s"Starting $taskName (TID $taskId, $host, executor ${info.executorId}, " + + s"partition ${task.partitionId}, $taskLocality, ${serializedTask.limit} bytes)") + + sched.dagScheduler.taskStarted(task, info) + new TaskDescription( + taskId, + attemptNum, + execId, + taskName, + index, + sched.sc.addedFiles, + sched.sc.addedJars, + task.localProperties, + serializedTask) } } else { + if (runningTasks >= maxConcurrentTasks) { + logDebug("Already running max. no. 
of concurrent tasks.") + } None } } diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala index 4ea42fc7d5c22..9d79296116e6b 100644 --- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala @@ -188,6 +188,125 @@ class ExecutorAllocationManagerSuite assert(numExecutorsTarget(manager) === 10) } + test("add executors capped by max concurrent tasks for a job group with single core executors") { + val conf = new SparkConf() + .setMaster("myDummyLocalExternalClusterManager") + .setAppName("test-executor-allocation-manager") + .set("spark.dynamicAllocation.enabled", "true") + .set("spark.dynamicAllocation.testing", "true") + .set("spark.job.group1.maxConcurrentTasks", "2") + .set("spark.job.group2.maxConcurrentTasks", "5") + val sc = new SparkContext(conf) + contexts += sc + sc.setJobGroup("group1", "", false) + + val manager = sc.executorAllocationManager.get + val stage0 = createStageInfo(0, 10) + // Submit the job and stage start/submit events + sc.listenerBus.postToAll(SparkListenerJobStart(0, 0, Seq{stage0}, sc.getLocalProperties)) + sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage0)) + + // Verify that we're capped at number of max concurrent tasks in the stage + assert(maxNumExecutorsNeeded(manager) === 2) + + // Submit another stage in the same job + val stage1 = createStageInfo(1, 10) + sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage1)) + assert(maxNumExecutorsNeeded(manager) === 2) + + sc.listenerBus.postToAll(SparkListenerStageCompleted(stage0)) + sc.listenerBus.postToAll(SparkListenerStageCompleted(stage1)) + sc.listenerBus.postToAll(SparkListenerJobEnd(0, 10, JobSucceeded)) + + // Submit a new job in the same job group + val stage2 = createStageInfo(2, 20) + sc.listenerBus.postToAll(SparkListenerJobStart(1, 0, Seq{stage2}, sc.getLocalProperties)) + sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage2)) + assert(maxNumExecutorsNeeded(manager) === 2) + + sc.listenerBus.postToAll(SparkListenerStageCompleted(stage2)) + sc.listenerBus.postToAll(SparkListenerJobEnd(1, 10, JobSucceeded)) + + // Set another jobGroup + sc.setJobGroup("group2", "", false) + + val stage3 = createStageInfo(3, 20) + sc.listenerBus.postToAll(SparkListenerJobStart(2, 0, Seq{stage3}, sc.getLocalProperties)) + sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage3)) + assert(maxNumExecutorsNeeded(manager) === 5) + + sc.listenerBus.postToAll(SparkListenerStageCompleted(stage3)) + sc.listenerBus.postToAll(SparkListenerJobEnd(2, 10, JobSucceeded)) + + // Clear jobGroup + sc.clearJobGroup() + + val stage4 = createStageInfo(4, 50) + sc.listenerBus.postToAll(SparkListenerJobStart(2, 0, Seq{stage4}, sc.getLocalProperties)) + sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage4)) + assert(maxNumExecutorsNeeded(manager) === 50) + } + + test("add executors capped by max concurrent tasks for a job group with multi cores executors") { + val conf = new SparkConf() + .setMaster("myDummyLocalExternalClusterManager") + .setAppName("test-executor-allocation-manager") + .set("spark.dynamicAllocation.enabled", "true") + .set("spark.dynamicAllocation.testing", "true") + .set("spark.job.group1.maxConcurrentTasks", "2") + .set("spark.job.group2.maxConcurrentTasks", "5") + .set("spark.executor.cores", "3") + val sc = new SparkContext(conf) + contexts += sc + sc.setJobGroup("group1", "", false) 
+ + val manager = sc.executorAllocationManager.get + val stage0 = createStageInfo(0, 10) + // Submit the job and stage start/submit events + sc.listenerBus.postToAll(SparkListenerJobStart(0, 0, Seq{stage0}, sc.getLocalProperties)) + sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage0)) + + // Verify that we're capped at number of max concurrent tasks in the stage + assert(maxNumExecutorsNeeded(manager) === 1) + + // Submit another stage in the same job + val stage1 = createStageInfo(1, 10) + sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage1)) + assert(maxNumExecutorsNeeded(manager) === 1) + + sc.listenerBus.postToAll(SparkListenerStageCompleted(stage0)) + sc.listenerBus.postToAll(SparkListenerStageCompleted(stage1)) + sc.listenerBus.postToAll(SparkListenerJobEnd(0, 10, JobSucceeded)) + + // Submit a new job in the same job group + val stage2 = createStageInfo(2, 20) + sc.listenerBus.postToAll(SparkListenerJobStart(1, 0, Seq{stage2}, sc.getLocalProperties)) + sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage2)) + assert(maxNumExecutorsNeeded(manager) === 1) + + sc.listenerBus.postToAll(SparkListenerStageCompleted(stage2)) + sc.listenerBus.postToAll(SparkListenerJobEnd(1, 10, JobSucceeded)) + + // Set another jobGroup + sc.setJobGroup("group2", "", false) + + val stage3 = createStageInfo(3, 20) + sc.listenerBus.postToAll(SparkListenerJobStart(2, 0, Seq{stage3}, sc.getLocalProperties)) + sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage3)) + assert(maxNumExecutorsNeeded(manager) === 2) + + sc.listenerBus.postToAll(SparkListenerStageCompleted(stage3)) + sc.listenerBus.postToAll(SparkListenerJobEnd(2, 10, JobSucceeded)) + + // Clear jobGroup + sc.clearJobGroup() + + val stage4 = createStageInfo(4, 50) + sc.listenerBus.postToAll(SparkListenerJobStart(2, 0, Seq{stage4}, sc.getLocalProperties)) + sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage4)) + assert(maxNumExecutorsNeeded(manager) === 17) + } + test("cancel pending executors when no longer needed") { sc = createSparkContext(0, 10, 0) val manager = sc.executorAllocationManager.get diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala index e46900e4e5049..8e5f68755a49e 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala @@ -1214,6 +1214,101 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg verify(taskSetManagerSpy, times(1)).addPendingTask(anyInt()) } + test("limit max concurrent running tasks in a job group when configured ") { + val conf = new SparkConf(). + set(config.BLACKLIST_ENABLED, true). 
+ set("spark.job.testJobGroup.maxConcurrentTasks", "2") // Limit max concurrent tasks to 2 + + sc = new SparkContext("local", "test", conf) + sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2")) + val props = new Properties(); + props.setProperty(SparkContext.SPARK_JOB_GROUP_ID, "testJobGroup") // set the job group + + val tasks = Array.tabulate[Task[_]](10) { i => + new FakeTask(0, i, Nil) + } + val tsm = new TaskSetManager(sched, new TaskSet(tasks, 0, 0, 0, props), 2) + + // make some offers to our taskset + var taskDescs = Seq( + "exec1" -> "host1", + "exec2" -> "host1" + ).flatMap { case (exec, host) => + // offer each executor twice (simulating 2 cores per executor) + (0 until 2).flatMap{ _ => tsm.resourceOffer(exec, host, TaskLocality.ANY)} + } + assert(taskDescs.size === 2) // Out of the 4 offers, tsm can accept up to maxConcurrentTasks. + + // make 4 more offers + val taskDescs2 = Seq( + "exec1" -> "host1", + "exec2" -> "host1" + ).flatMap { case (exec, host) => + (0 until 2).flatMap{ _ => tsm.resourceOffer(exec, host, TaskLocality.ANY)} + } + assert(taskDescs2.size === 0) // tsm doesn't accept any as it is already running at max tasks + + // inform tsm that one task has completed + val directTaskResult = new DirectTaskResult[String](null, Seq()) { + override def value(resultSer: SerializerInstance): String = "" + } + tsm.handleSuccessfulTask(taskDescs(0).taskId, directTaskResult) + + // make 4 more offers after previous task completed + taskDescs = Seq( + "exec1" -> "host1", + "exec2" -> "host1" + ).flatMap { case (exec, host) => + (0 until 2).flatMap{ _ => tsm.resourceOffer(exec, host, TaskLocality.ANY)} + } + assert(taskDescs.size === 1) // tsm accepts one as it can run one more task + } + + test("do not limit max concurrent running tasks in a job group by default") { + val conf = new SparkConf(). + set(config.BLACKLIST_ENABLED, true) + + sc = new SparkContext("local", "test", conf) + sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2")) + + val tasks = Array.tabulate[Task[_]](10) { i => + new FakeTask(0, i, Nil) + } + val tsm = new TaskSetManager(sched, new TaskSet(tasks, 0, 0, 0, null), 2) + + // make 5 offers to our taskset + var taskDescs = Seq( + "exec1" -> "host1", + "exec2" -> "host2" + ).flatMap { case (exec, host) => + // offer each executor twice (simulating 2 cores per executor) + (0 until 3).flatMap{ _ => tsm.resourceOffer(exec, host, TaskLocality.ANY)} + } + // tsm accepts all offers as it has tasks to run & there is no constraint for maxConcurrentTasks + assert(taskDescs.size === 6) + + taskDescs = Seq( + "exec1" -> "host1", + "exec2" -> "host2" + ).flatMap { case (exec, host) => + (0 until 1).flatMap{ _ => tsm.resourceOffer(exec, host, TaskLocality.ANY)} + } + assert(taskDescs.size === 2) // tsm accepts more offers as it has tasks to run + + val directTaskResult = new DirectTaskResult[String](null, Seq()) { + override def value(resultSer: SerializerInstance): String = "" + } + tsm.handleSuccessfulTask(taskDescs(0).taskId, directTaskResult) + + taskDescs = Seq( + "exec1" -> "host1", + "exec2" -> "host2" + ).flatMap { case (exec, host) => + (0 until 2).flatMap{ _ => tsm.resourceOffer(exec, host, TaskLocality.ANY)} + } + assert(taskDescs.size === 2) // tsm accepts 2 offers as it had only 2 tasks to run. 
+ } + private def createTaskResult( id: Int, accumUpdates: Seq[AccumulatorV2[_, _]] = Seq.empty): DirectTaskResult[Int] = { diff --git a/docs/configuration.md b/docs/configuration.md index 500f980455b0e..98ca8cfa394b4 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -1435,6 +1435,16 @@ Apart from these, the following properties are also available, and may be useful driver using more memory. + + spark.job.[userJobGroup].maxConcurrentTasks + Int.MAX_VALUE + + Maximum number of tasks to run concurrently in a given job group. This is especially useful + to avoid overwhelming an external service (an unintentional denial of service), for example + a database that is queried from many executors simultaneously. You can set the job group by + calling the setJobGroup method on sc. + + spark.blacklist.enabled From 824621286ffb107010409c4d0d3442550628247d Mon Sep 17 00:00:00 2001 From: Dhruve Ashar Date: Mon, 21 Aug 2017 11:51:41 -0500 Subject: [PATCH 2/5] Allow limiting task concurrency per stage in concurrent job groups --- .../spark/ExecutorAllocationManager.scala | 113 ++++++++++- .../spark/scheduler/TaskSetManager.scala | 113 ++++++----- .../ExecutorAllocationManagerSuite.scala | 175 +++++++++++++----- .../spark/scheduler/TaskSetManagerSuite.scala | 8 +- 4 files changed, 292 insertions(+), 117 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala index 79cc9483004f6..1f1ee08c5322a 100644 --- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala +++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala @@ -92,7 +92,7 @@ private[spark] class ExecutorAllocationManager( private val minNumExecutors = conf.get(DYN_ALLOCATION_MIN_EXECUTORS) private val maxNumExecutors = conf.get(DYN_ALLOCATION_MAX_EXECUTORS) private val initialNumExecutors = Utils.getDynamicAllocationInitialExecutors(conf) - private var maxConcurrentTasks = Int.MaxValue + private var maxConcurrentTasks = 0 // How long there must be backlogged tasks for before an addition is triggered (seconds) private val schedulerBacklogTimeoutS = conf.getTimeAsSeconds( @@ -316,7 +316,7 @@ private[spark] class ExecutorAllocationManager( // Do not change our target while we are still initializing, // Otherwise the first job may have to ramp up unnecessarily 0 - } else if (maxNeeded <= numExecutorsTarget) { + } else if (maxNeeded < numExecutorsTarget) { // The target number exceeds the number we actually need, so stop adding new // executors and inform the cluster manager to cancel the extra pending requests val oldNumExecutorsTarget = numExecutorsTarget @@ -597,6 +597,10 @@ private[spark] class ExecutorAllocationManager( private val executorIdToTaskIds = new mutable.HashMap[String, mutable.HashSet[Long]] // Number of tasks currently running on the cluster. Should be 0 when no stages are active.
private var numRunningTasks: Int = _ + private val jobGroupToMaxConTasks = new mutable.HashMap[String, Int] + private val jobIdToJobGroup = new mutable.HashMap[Int, String] + private val stageIdToJobId = new mutable.HashMap[Int, Int] + private val stageIdToCompleteTaskCount = new mutable.HashMap[Int, Int] // stageId to tuple (the number of task with locality preferences, a map where each pair is a // node and the number of tasks that would like to be scheduled on that node) map, @@ -605,17 +609,44 @@ private[spark] class ExecutorAllocationManager( private val stageIdToExecutorPlacementHints = new mutable.HashMap[Int, (Int, Map[String, Int])] override def onJobStart(jobStart: SparkListenerJobStart): Unit = { - val jobGroupId = if (jobStart.properties != null) { + // limit the concurrent tasks if the job belongs to a jobGroup & a config is specified. + jobStart.stageInfos.foreach(stageInfo => stageIdToJobId(stageInfo.stageId) = jobStart.jobId) + + var jobGroupId = if (jobStart.properties != null) { jobStart.properties.getProperty(SparkContext.SPARK_JOB_GROUP_ID) } else { - "" + null } - val maxConcurrentTasks = conf.getInt(s"spark.job.$jobGroupId.maxConcurrentTasks", - Int.MaxValue) - logInfo(s"Setting maximum concurrent tasks for group: ${jobGroupId} to $maxConcurrentTasks") - allocationManager.synchronized { - allocationManager.maxConcurrentTasks = maxConcurrentTasks + val maxConTasks = if (jobGroupId != null && + conf.contains(s"spark.job.$jobGroupId.maxConcurrentTasks")) { + conf.get(s"spark.job.$jobGroupId.maxConcurrentTasks").toInt + } else { + Int.MaxValue + } + + if (maxConTasks <= 0) { + throw new IllegalArgumentException( + "Maximum Concurrent Tasks should be set greater than 0 for the job to progress.") + } + + if (jobGroupId == null || !conf.contains(s"spark.job.$jobGroupId.maxConcurrentTasks")) { + jobGroupId = "default-group-" + jobStart.jobId.hashCode + } + + jobIdToJobGroup(jobStart.jobId) = jobGroupId + if (!jobGroupToMaxConTasks.contains(jobGroupId)) { + jobGroupToMaxConTasks(jobGroupId) = maxConTasks + } + } + + override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = { + val jobGroupId = jobIdToJobGroup(jobEnd.jobId) + + jobIdToJobGroup -= jobEnd.jobId + // Remove jobGroup mapping if this is the only remaining job in the group. 
+ if (!jobIdToJobGroup.values.exists(_ == jobGroupId)) { + jobGroupToMaxConTasks -= jobGroupId } } @@ -644,7 +675,9 @@ private[spark] class ExecutorAllocationManager( // Update the executor placement hints updateExecutorPlacementHints() + maxConcurrentTasks = getMaxConTasks } + logInfo(s"Setting max concurrent tasks to $maxConcurrentTasks on stage submit.") } override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = { @@ -653,7 +686,7 @@ private[spark] class ExecutorAllocationManager( stageIdToNumTasks -= stageId stageIdToTaskIndices -= stageId stageIdToExecutorPlacementHints -= stageId - + stageIdToJobId -= stageId // Update the executor placement hints updateExecutorPlacementHints() @@ -666,7 +699,9 @@ private[spark] class ExecutorAllocationManager( numRunningTasks = 0 } } + maxConcurrentTasks = getMaxConTasks } + logInfo(s"Setting max concurrent tasks to $maxConcurrentTasks on stage complete.") } override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = { @@ -720,6 +755,8 @@ private[spark] class ExecutorAllocationManager( allocationManager.onSchedulerBacklogged() } stageIdToTaskIndices.get(stageId).foreach { _.remove(taskIndex) } + } else { + stageIdToCompleteTaskCount(stageId) = stageIdToCompleteTaskCount.getOrElse(stageId, 0) + 1 } } } @@ -740,6 +777,62 @@ private[spark] class ExecutorAllocationManager( allocationManager.onExecutorRemoved(executorRemoved.executorId) } + /** + * Calculate the maximum no. of concurrent tasks that can run currently. + */ + def getMaxConTasks(): Int = { + val stagesByJobGroup = stageIdToNumTasks.groupBy(x => jobIdToJobGroup(stageIdToJobId(x._1))) + + def getMaxConTasks(maxConTasks: Int, + stagesByJobGroupItr: Iterator[(String, mutable.HashMap[Int, Int])]): Int = { + if (stagesByJobGroupItr.hasNext) { + val (jobGroupId, stages) = stagesByJobGroupItr.next + // Get the total running and pending tasks for a job group. + val totalIncompleteTasksForJobGroup = getIncompleteTasksForJobGroup(0, stages.iterator) + val maxTasks = Math.min(jobGroupToMaxConTasks(jobGroupId), + totalIncompleteTasksForJobGroup) + if (doesSumOverflow(maxConTasks, maxTasks)) { + Int.MaxValue + } else { + getMaxConTasks(maxConTasks + maxTasks, stagesByJobGroupItr) + } + } else { + maxConTasks + } + } + + // Get the total running & pending tasks for all stages in a job group. + def getIncompleteTasksForJobGroup(totalTasks: Int, stagesItr: Iterator[(Int, Int)]): Int = { + if (stagesItr.hasNext) { + val (stageId, numTasks) = stagesItr.next + val activeTasks = getIncompleteTasksForStage(stageId, numTasks) + if (doesSumOverflow(totalTasks, activeTasks)) { + Int.MaxValue + } else { + getIncompleteTasksForJobGroup(totalTasks + activeTasks, stagesItr) + } + } else { + totalTasks + } + } + + // Get the total running & pending tasks for a single stage. + def getIncompleteTasksForStage(stageId: Int, numTasks: Int): Int = { + var pendingTasks = numTasks + if (stageIdToTaskIndices.contains(stageId)) { + pendingTasks -= stageIdToTaskIndices(stageId).size + } + var runningTasks = 0 + if (stageIdToCompleteTaskCount.contains(stageId)) { + runningTasks = stageIdToTaskIndices(stageId).size - stageIdToCompleteTaskCount(stageId) + } + pendingTasks + runningTasks + } + getMaxConTasks(0, stagesByJobGroup.iterator) + } + + private def doesSumOverflow(a: Int, b: Int): Boolean = b > (Int.MaxValue - a) + /** * An estimate of the total number of pending tasks remaining for currently running stages. Does * not account for tasks which may have failed and been resubmitted. 
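The per-group totals above are combined with a saturating sum, so a group left uncapped (Int.MaxValue) cannot wrap the overall count around to a negative number. A minimal, self-contained sketch of the doesSumOverflow guard used by getMaxConTasks:

    object SaturatingSumSketch {
      def doesSumOverflow(a: Int, b: Int): Boolean = b > Int.MaxValue - a

      // Saturate at Int.MaxValue instead of overflowing; assumes non-negative inputs,
      // which holds here since task counts and caps are never negative.
      def saturatingAdd(a: Int, b: Int): Int =
        if (doesSumOverflow(a, b)) Int.MaxValue else a + b

      def main(args: Array[String]): Unit = {
        assert(saturatingAdd(Int.MaxValue, 5) == Int.MaxValue) // saturates, no wrap-around
        assert(saturatingAdd(10, 20) == 30)
      }
    }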
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index d442bbf2452d6..bfc1b2d1b140b 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -477,63 +477,62 @@ private[spark] class TaskSetManager( } } - dequeueTask(execId, host, allowedLocality).map { - case ((index, taskLocality, speculative)) => - // Found a task; do some bookkeeping and return a task description - val task = tasks(index) - val taskId = sched.newTaskId() - // Do various bookkeeping - copiesRunning(index) += 1 - val attemptNum = taskAttempts(index).size - val info = new TaskInfo(taskId, index, attemptNum, curTime, - execId, host, taskLocality, speculative) - taskInfos(taskId) = info - taskAttempts(index) = info :: taskAttempts(index) - // Update our locality level for delay scheduling - // NO_PREF will not affect the variables related to delay scheduling - if (maxLocality != TaskLocality.NO_PREF) { - currentLocalityIndex = getLocalityIndex(taskLocality) - lastLaunchTime = curTime - } - // Serialize and return the task - val serializedTask: ByteBuffer = try { - ser.serialize(task) - } catch { - // If the task cannot be serialized, then there's no point to re-attempt the task, - // as it will always fail. So just abort the whole task-set. - case NonFatal(e) => - val msg = s"Failed to serialize task $taskId, not attempting to retry it." - logError(msg, e) - abort(s"$msg Exception during serialization: $e") - throw new TaskNotSerializableException(e) - } - if (serializedTask.limit > TaskSetManager.TASK_SIZE_TO_WARN_KB * 1024 && - !emittedTaskSizeWarning) { - emittedTaskSizeWarning = true - logWarning(s"Stage ${task.stageId} contains a task of very large size " + - s"(${serializedTask.limit / 1024} KB). The maximum recommended task size is " + - s"${TaskSetManager.TASK_SIZE_TO_WARN_KB} KB.") - } - addRunningTask(taskId) - - // We used to log the time it takes to serialize the task, but task size is already - // a good proxy to task serialization time. 
- // val timeTaken = clock.getTime() - startTime - val taskName = s"task ${info.id} in stage ${taskSet.id}" - logInfo(s"Starting $taskName (TID $taskId, $host, executor ${info.executorId}, " + - s"partition ${task.partitionId}, $taskLocality, ${serializedTask.limit} bytes)") - - sched.dagScheduler.taskStarted(task, info) - new TaskDescription( - taskId, - attemptNum, - execId, - taskName, - index, - addedFiles, - addedJars, - task.localProperties, - serializedTask) + dequeueTask(execId, host, allowedLocality).map { case ((index, taskLocality, speculative)) => + // Found a task; do some bookkeeping and return a task description + val task = tasks(index) + val taskId = sched.newTaskId() + // Do various bookkeeping + copiesRunning(index) += 1 + val attemptNum = taskAttempts(index).size + val info = new TaskInfo(taskId, index, attemptNum, curTime, + execId, host, taskLocality, speculative) + taskInfos(taskId) = info + taskAttempts(index) = info :: taskAttempts(index) + // Update our locality level for delay scheduling + // NO_PREF will not affect the variables related to delay scheduling + if (maxLocality != TaskLocality.NO_PREF) { + currentLocalityIndex = getLocalityIndex(taskLocality) + lastLaunchTime = curTime + } + // Serialize and return the task + val serializedTask: ByteBuffer = try { + ser.serialize(task) + } catch { + // If the task cannot be serialized, then there's no point to re-attempt the task, + // as it will always fail. So just abort the whole task-set. + case NonFatal(e) => + val msg = s"Failed to serialize task $taskId, not attempting to retry it." + logError(msg, e) + abort(s"$msg Exception during serialization: $e") + throw new TaskNotSerializableException(e) + } + if (serializedTask.limit > TaskSetManager.TASK_SIZE_TO_WARN_KB * 1024 && + !emittedTaskSizeWarning) { + emittedTaskSizeWarning = true + logWarning(s"Stage ${task.stageId} contains a task of very large size " + + s"(${serializedTask.limit / 1024} KB). The maximum recommended task size is " + + s"${TaskSetManager.TASK_SIZE_TO_WARN_KB} KB.") + } + addRunningTask(taskId) + + // We used to log the time it takes to serialize the task, but task size is already + // a good proxy to task serialization time. 
+ // val timeTaken = clock.getTime() - startTime + val taskName = s"task ${info.id} in stage ${taskSet.id}" + logInfo(s"Starting $taskName (TID $taskId, $host, executor ${info.executorId}, " + + s"partition ${task.partitionId}, $taskLocality, ${serializedTask.limit} bytes)") + + sched.dagScheduler.taskStarted(task, info) + new TaskDescription( + taskId, + attemptNum, + execId, + taskName, + index, + addedFiles, + addedJars, + task.localProperties, + serializedTask) } } else { if (runningTasks >= maxConcurrentTasks) { diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala index 9d79296116e6b..f4655d60063ab 100644 --- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala @@ -95,7 +95,9 @@ class ExecutorAllocationManagerSuite test("add executors") { sc = createSparkContext(1, 10, 1) val manager = sc.executorAllocationManager.get - sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 1000))) + val stage0 = createStageInfo(0, 1000) + sc.listenerBus.postToAll(SparkListenerJobStart(0, 0, Seq(stage0), sc.getLocalProperties)) + sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage0)) // Keep adding until the limit is reached assert(numExecutorsTarget(manager) === 1) @@ -140,7 +142,9 @@ class ExecutorAllocationManagerSuite test("add executors capped by num pending tasks") { sc = createSparkContext(0, 10, 0) val manager = sc.executorAllocationManager.get - sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 5))) + val stages = Seq(createStageInfo(0, 5), createStageInfo(1, 3), createStageInfo(2, 3)) + sc.listenerBus.postToAll(SparkListenerJobStart(0, 0, stages, sc.getLocalProperties)) + sc.listenerBus.postToAll(SparkListenerStageSubmitted(stages(0))) // Verify that we're capped at number of tasks in the stage assert(numExecutorsTarget(manager) === 0) @@ -156,7 +160,7 @@ class ExecutorAllocationManagerSuite assert(numExecutorsToAdd(manager) === 1) // Verify that running a task doesn't affect the target - sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(1, 3))) + sc.listenerBus.postToAll(SparkListenerStageSubmitted(stages(1))) sc.listenerBus.postToAll(SparkListenerExecutorAdded( 0L, "executor-1", new ExecutorInfo("host1", 1, Map.empty))) sc.listenerBus.postToAll(SparkListenerTaskStart(1, 0, createTaskInfo(0, 0, "executor-1"))) @@ -172,7 +176,7 @@ class ExecutorAllocationManagerSuite assert(numExecutorsToAdd(manager) === 1) // Verify that re-running a task doesn't blow things up - sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(2, 3))) + sc.listenerBus.postToAll(SparkListenerStageSubmitted(stages(2))) sc.listenerBus.postToAll(SparkListenerTaskStart(2, 0, createTaskInfo(0, 0, "executor-1"))) sc.listenerBus.postToAll(SparkListenerTaskStart(2, 0, createTaskInfo(1, 0, "executor-1"))) assert(addExecutors(manager) === 1) @@ -201,21 +205,20 @@ class ExecutorAllocationManagerSuite sc.setJobGroup("group1", "", false) val manager = sc.executorAllocationManager.get - val stage0 = createStageInfo(0, 10) + val stages = Seq(createStageInfo(0, 10), createStageInfo(1, 10)) // Submit the job and stage start/submit events - sc.listenerBus.postToAll(SparkListenerJobStart(0, 0, Seq{stage0}, sc.getLocalProperties)) - sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage0)) + sc.listenerBus.postToAll(SparkListenerJobStart(0, 0, 
stages, sc.getLocalProperties)) + sc.listenerBus.postToAll(SparkListenerStageSubmitted(stages(0))) // Verify that we're capped at number of max concurrent tasks in the stage assert(maxNumExecutorsNeeded(manager) === 2) // Submit another stage in the same job - val stage1 = createStageInfo(1, 10) - sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage1)) + sc.listenerBus.postToAll(SparkListenerStageSubmitted(stages(1))) assert(maxNumExecutorsNeeded(manager) === 2) - sc.listenerBus.postToAll(SparkListenerStageCompleted(stage0)) - sc.listenerBus.postToAll(SparkListenerStageCompleted(stage1)) + sc.listenerBus.postToAll(SparkListenerStageCompleted(stages(0))) + sc.listenerBus.postToAll(SparkListenerStageCompleted(stages(1))) sc.listenerBus.postToAll(SparkListenerJobEnd(0, 10, JobSucceeded)) // Submit a new job in the same job group @@ -261,26 +264,25 @@ class ExecutorAllocationManagerSuite sc.setJobGroup("group1", "", false) val manager = sc.executorAllocationManager.get - val stage0 = createStageInfo(0, 10) + val stages = Seq(createStageInfo(0, 10), createStageInfo(1, 10)) // Submit the job and stage start/submit events - sc.listenerBus.postToAll(SparkListenerJobStart(0, 0, Seq{stage0}, sc.getLocalProperties)) - sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage0)) + sc.listenerBus.postToAll(SparkListenerJobStart(0, 0, stages, sc.getLocalProperties)) + sc.listenerBus.postToAll(SparkListenerStageSubmitted(stages(0))) // Verify that we're capped at number of max concurrent tasks in the stage assert(maxNumExecutorsNeeded(manager) === 1) // Submit another stage in the same job - val stage1 = createStageInfo(1, 10) - sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage1)) + sc.listenerBus.postToAll(SparkListenerStageSubmitted(stages(1))) assert(maxNumExecutorsNeeded(manager) === 1) - sc.listenerBus.postToAll(SparkListenerStageCompleted(stage0)) - sc.listenerBus.postToAll(SparkListenerStageCompleted(stage1)) + sc.listenerBus.postToAll(SparkListenerStageCompleted(stages(0))) + sc.listenerBus.postToAll(SparkListenerStageCompleted(stages(1))) sc.listenerBus.postToAll(SparkListenerJobEnd(0, 10, JobSucceeded)) // Submit a new job in the same job group val stage2 = createStageInfo(2, 20) - sc.listenerBus.postToAll(SparkListenerJobStart(1, 0, Seq{stage2}, sc.getLocalProperties)) + sc.listenerBus.postToAll(SparkListenerJobStart(1, 0, Seq(stage2), sc.getLocalProperties)) sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage2)) assert(maxNumExecutorsNeeded(manager) === 1) @@ -291,7 +293,7 @@ class ExecutorAllocationManagerSuite sc.setJobGroup("group2", "", false) val stage3 = createStageInfo(3, 20) - sc.listenerBus.postToAll(SparkListenerJobStart(2, 0, Seq{stage3}, sc.getLocalProperties)) + sc.listenerBus.postToAll(SparkListenerJobStart(2, 0, Seq(stage3), sc.getLocalProperties)) sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage3)) assert(maxNumExecutorsNeeded(manager) === 2) @@ -302,15 +304,83 @@ class ExecutorAllocationManagerSuite sc.clearJobGroup() val stage4 = createStageInfo(4, 50) - sc.listenerBus.postToAll(SparkListenerJobStart(2, 0, Seq{stage4}, sc.getLocalProperties)) + sc.listenerBus.postToAll(SparkListenerJobStart(2, 0, Seq(stage4), sc.getLocalProperties)) sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage4)) assert(maxNumExecutorsNeeded(manager) === 17) } + test("add executors capped by max concurrent tasks for concurrent job groups") { + val conf = new SparkConf() + .setMaster("myDummyLocalExternalClusterManager") + 
.setAppName("test-executor-allocation-manager") + .set("spark.dynamicAllocation.enabled", "true") + .set("spark.dynamicAllocation.testing", "true") + .set("spark.job.group1.maxConcurrentTasks", "5") + .set("spark.job.group2.maxConcurrentTasks", "11") + .set("spark.job.group3.maxConcurrentTasks", "17") + val sc = new SparkContext(conf) + contexts += sc + + val manager = sc.executorAllocationManager.get + + // Submit a job in group1 + sc.setJobGroup("group1", "", false) + val stages = Seq(createStageInfo(0, 2), createStageInfo(1, 10)) + // Submit the job and stage start/submit events + sc.listenerBus.postToAll(SparkListenerJobStart(0, 0, stages, sc.getLocalProperties)) + sc.listenerBus.postToAll(SparkListenerStageSubmitted(stages(0))) + + // Verify that we're capped at number of max concurrent tasks in the job group + assert(maxNumExecutorsNeeded(manager) === 2) + + // Submit another stage in the same job + sc.listenerBus.postToAll(SparkListenerStageSubmitted(stages(1))) + assert(maxNumExecutorsNeeded(manager) === 5) + + // Submit a job in group 2 + sc.setJobGroup("group2", "", false) + val stage2 = createStageInfo(2, 20) + sc.listenerBus.postToAll(SparkListenerJobStart(1, 0, Seq(stage2), sc.getLocalProperties)) + sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage2)) + assert(maxNumExecutorsNeeded(manager) === 16) // 5 + 11 + + // Submit a job in group 3 + sc.setJobGroup("group3", "", false) + val stage3 = createStageInfo(3, 50) + sc.listenerBus.postToAll(SparkListenerJobStart(2, 0, Seq(stage3), sc.getLocalProperties)) + sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage3)) + assert(maxNumExecutorsNeeded(manager) === 33) // 5 + 11 + 17 + + // Mark job in group 2 as complete + sc.listenerBus.postToAll(SparkListenerStageCompleted(stage2)) + sc.listenerBus.postToAll(SparkListenerJobEnd(1, 20, JobSucceeded)) + assert(maxNumExecutorsNeeded(manager) === 22) // 33 - 11 + + // Mark job in group 1 as complete + sc.listenerBus.postToAll(SparkListenerStageCompleted(stages(0))) + sc.listenerBus.postToAll(SparkListenerStageCompleted(stages(1))) + sc.listenerBus.postToAll(SparkListenerJobEnd(0, 10, JobSucceeded)) + assert(maxNumExecutorsNeeded(manager) === 17) // 22 - 5 + + // Submit a job without any job group + sc.clearJobGroup() + val stage4 = createStageInfo(4, 333) + sc.listenerBus.postToAll(SparkListenerJobStart(4, 0, Seq(stage4), sc.getLocalProperties)) + sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage4)) + assert(maxNumExecutorsNeeded(manager) === 350) // 17 + 333 + + // Mark job without job group as complete + sc.listenerBus.postToAll(SparkListenerStageCompleted(stage4)) + sc.listenerBus.postToAll(SparkListenerJobEnd(4, 20, JobSucceeded)) + assert(maxNumExecutorsNeeded(manager) === 17) // 350 - 333 + } + test("cancel pending executors when no longer needed") { sc = createSparkContext(0, 10, 0) val manager = sc.executorAllocationManager.get - sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(2, 5))) + val stage0 = createStageInfo(2, 5) + sc.listenerBus.postToAll(SparkListenerJobStart(0, 0, Seq(stage0), sc.getLocalProperties)) + sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage0)) assert(numExecutorsTarget(manager) === 0) assert(numExecutorsToAdd(manager) === 1) @@ -436,7 +506,9 @@ class ExecutorAllocationManagerSuite test ("interleaving add and remove") { sc = createSparkContext(5, 10, 5) val manager = sc.executorAllocationManager.get - sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 1000))) + val stage0 = 
createStageInfo(0, 1000) + sc.listenerBus.postToAll(SparkListenerJobStart(0, 0, Seq(stage0), sc.getLocalProperties)) + sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage0)) // Add a few executors assert(addExecutors(manager) === 1) @@ -608,7 +680,9 @@ class ExecutorAllocationManagerSuite val clock = new ManualClock(2020L) val manager = sc.executorAllocationManager.get manager.setClock(clock) - sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 1000))) + val stage0 = createStageInfo(0, 1000) + sc.listenerBus.postToAll(SparkListenerJobStart(0, 0, Seq(stage0), sc.getLocalProperties)) + sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage0)) // Scheduler queue backlogged onSchedulerBacklogged(manager) @@ -721,7 +795,10 @@ class ExecutorAllocationManagerSuite // Starting a stage should start the add timer val numTasks = 10 - sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, numTasks))) + val stages = Seq(createStageInfo(0, numTasks), createStageInfo(1, numTasks), + createStageInfo(2, numTasks)) + sc.listenerBus.postToAll(SparkListenerJobStart(0, 0, stages, sc.getLocalProperties)) + sc.listenerBus.postToAll(SparkListenerStageSubmitted(stages(0))) assert(addTime(manager) !== NOT_SET) // Starting a subset of the tasks should not cancel the add timer @@ -735,8 +812,8 @@ class ExecutorAllocationManagerSuite // Start two different stages // The add timer should be canceled only if all tasks in both stages start running - sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(1, numTasks))) - sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(2, numTasks))) + sc.listenerBus.postToAll(SparkListenerStageSubmitted(stages(1))) + sc.listenerBus.postToAll(SparkListenerStageSubmitted(stages(2))) assert(addTime(manager) !== NOT_SET) taskInfos.foreach { info => sc.listenerBus.postToAll(SparkListenerTaskStart(1, 0, info)) } assert(addTime(manager) !== NOT_SET) @@ -847,8 +924,9 @@ class ExecutorAllocationManagerSuite test("avoid ramp up when target < running executors") { sc = createSparkContext(0, 100000, 0) val manager = sc.executorAllocationManager.get - val stage1 = createStageInfo(0, 1000) - sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage1)) + val stages = Seq(createStageInfo(0, 1000), createStageInfo(1, 1000)) + sc.listenerBus.postToAll(SparkListenerJobStart(0, 0, stages, sc.getLocalProperties)) + sc.listenerBus.postToAll(SparkListenerStageSubmitted(stages(0))) assert(addExecutors(manager) === 1) assert(addExecutors(manager) === 2) @@ -859,12 +937,12 @@ class ExecutorAllocationManagerSuite onExecutorAdded(manager, s"executor-$i") } assert(executorIds(manager).size === 15) - sc.listenerBus.postToAll(SparkListenerStageCompleted(stage1)) + sc.listenerBus.postToAll(SparkListenerStageCompleted(stages(0))) adjustRequestedExecutors(manager) assert(numExecutorsTarget(manager) === 0) - sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(1, 1000))) + sc.listenerBus.postToAll(SparkListenerStageSubmitted(stages(1))) addExecutors(manager) assert(numExecutorsTarget(manager) === 16) } @@ -881,7 +959,9 @@ class ExecutorAllocationManagerSuite // Verify whether the initial number of executors is kept with no pending tasks assert(numExecutorsTarget(manager) === 3) - sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(1, 2))) + val stage0 = createStageInfo(1, 2) + sc.listenerBus.postToAll(SparkListenerJobStart(0, 0, Seq(stage0), sc.getLocalProperties)) + 
sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage0)) clock.advance(100L) assert(maxNumExecutorsNeeded(manager) === 2) @@ -930,26 +1010,29 @@ class ExecutorAllocationManagerSuite Seq.empty, Seq.empty ) - val stageInfo1 = createStageInfo(1, 5, localityPreferences1) - sc.listenerBus.postToAll(SparkListenerStageSubmitted(stageInfo1)) - - assert(localityAwareTasks(manager) === 3) - assert(hostToLocalTaskCount(manager) === - Map("host1" -> 2, "host2" -> 3, "host3" -> 2, "host4" -> 2)) - val localityPreferences2 = Seq( Seq(TaskLocation("host2"), TaskLocation("host3"), TaskLocation("host5")), Seq(TaskLocation("host3"), TaskLocation("host4"), TaskLocation("host5")), Seq.empty ) - val stageInfo2 = createStageInfo(2, 3, localityPreferences2) - sc.listenerBus.postToAll(SparkListenerStageSubmitted(stageInfo2)) + val stageInfos = Seq(createStageInfo(1, 5, localityPreferences1), + createStageInfo(2, 3, localityPreferences2)) + + sc.listenerBus.postToAll(SparkListenerJobStart(0, 0, stageInfos, sc.getLocalProperties)) + sc.listenerBus.postToAll(SparkListenerStageSubmitted(stageInfos(0))) + + assert(localityAwareTasks(manager) === 3) + assert(hostToLocalTaskCount(manager) === + Map("host1" -> 2, "host2" -> 3, "host3" -> 2, "host4" -> 2)) + + + sc.listenerBus.postToAll(SparkListenerStageSubmitted(stageInfos(1))) assert(localityAwareTasks(manager) === 5) assert(hostToLocalTaskCount(manager) === Map("host1" -> 2, "host2" -> 4, "host3" -> 4, "host4" -> 3, "host5" -> 2)) - sc.listenerBus.postToAll(SparkListenerStageCompleted(stageInfo1)) + sc.listenerBus.postToAll(SparkListenerStageCompleted(stageInfos(0))) assert(localityAwareTasks(manager) === 2) assert(hostToLocalTaskCount(manager) === Map("host2" -> 1, "host3" -> 2, "host4" -> 1, "host5" -> 2)) @@ -960,7 +1043,9 @@ class ExecutorAllocationManagerSuite val manager = sc.executorAllocationManager.get assert(maxNumExecutorsNeeded(manager) === 0) - sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 1))) + val stage0 = createStageInfo(0, 1) + sc.listenerBus.postToAll(SparkListenerJobStart(0, 0, Seq(stage0), sc.getLocalProperties)) + sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage0)) assert(maxNumExecutorsNeeded(manager) === 1) val taskInfo = createTaskInfo(1, 1, "executor-1") @@ -981,7 +1066,9 @@ class ExecutorAllocationManagerSuite // Allocation manager is reset when adding executor requests are sent without reporting back // executor added. 
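A pattern repeated throughout these test updates: because the listener now resolves a stage's job group through stageIdToJobId, every SparkListenerStageSubmitted must be preceded by a SparkListenerJobStart carrying the stage infos. The minimal event ordering, sketched with the suite's existing helpers:

    val stage = createStageInfo(0, 10)
    sc.listenerBus.postToAll(SparkListenerJobStart(0, 0, Seq(stage), sc.getLocalProperties))
    sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage))
    // ... exercise the allocation manager ...
    sc.listenerBus.postToAll(SparkListenerStageCompleted(stage))
    sc.listenerBus.postToAll(SparkListenerJobEnd(0, 10, JobSucceeded))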
- sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 10))) + val stage0 = createStageInfo(0, 10) + sc.listenerBus.postToAll(SparkListenerJobStart(0, 0, Seq(stage0), sc.getLocalProperties)) + sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage0)) assert(addExecutors(manager) === 1) assert(numExecutorsTarget(manager) === 2) diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala index 4727d56a4fbe7..d9f9b254bc1bd 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala @@ -1249,9 +1249,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg assert(taskDescs2.size === 0) // tsm doesn't accept any as it is already running at max tasks // inform tsm that one task has completed - val directTaskResult = new DirectTaskResult[String](null, Seq()) { - override def value(resultSer: SerializerInstance): String = "" - } + val directTaskResult = createTaskResult(0) tsm.handleSuccessfulTask(taskDescs(0).taskId, directTaskResult) // make 4 more offers after previous task completed @@ -1295,9 +1293,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg } assert(taskDescs.size === 2) // tsm accepts more offers as it has tasks to run - val directTaskResult = new DirectTaskResult[String](null, Seq()) { - override def value(resultSer: SerializerInstance): String = "" - } + val directTaskResult = createTaskResult(0) tsm.handleSuccessfulTask(taskDescs(0).taskId, directTaskResult) taskDescs = Seq( From 517acb490ae5938a22c4175347f6bbc24b47781f Mon Sep 17 00:00:00 2001 From: Dhruve Ashar Date: Mon, 21 Aug 2017 14:30:17 -0500 Subject: [PATCH 3/5] Remove comment --- .../main/scala/org/apache/spark/ExecutorAllocationManager.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala index 1f1ee08c5322a..ee59bb394af01 100644 --- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala +++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala @@ -609,7 +609,6 @@ private[spark] class ExecutorAllocationManager( private val stageIdToExecutorPlacementHints = new mutable.HashMap[Int, (Int, Map[String, Int])] override def onJobStart(jobStart: SparkListenerJobStart): Unit = { - // limit the concurrent tasks if the job belongs to a jobGroup & a config is specified. jobStart.stageInfos.foreach(stageInfo => stageIdToJobId(stageInfo.stageId) = jobStart.jobId) var jobGroupId = if (jobStart.properties != null) { From 65941f7884551e84a13a6cc2e7488a01e7d8beec Mon Sep 17 00:00:00 2001 From: Dhruve Ashar Date: Mon, 21 Aug 2017 14:42:05 -0500 Subject: [PATCH 4/5] Fix comment style --- .../scala/org/apache/spark/ExecutorAllocationManager.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala index ee59bb394af01..327ad8d90e639 100644 --- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala +++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala @@ -777,8 +777,8 @@ private[spark] class ExecutorAllocationManager( } /** - * Calculate the maximum no. 
of concurrent tasks that can run currently. - */ + * Calculate the maximum no. of concurrent tasks that can run currently. + */ def getMaxConTasks(): Int = { val stagesByJobGroup = stageIdToNumTasks.groupBy(x => jobIdToJobGroup(stageIdToJobId(x._1))) From 0e518f00ce97fd5d17fe89792c2503d2514b0473 Mon Sep 17 00:00:00 2001 From: Dhruve Ashar Date: Tue, 22 Aug 2017 10:38:01 -0500 Subject: [PATCH 5/5] Fix new unit test and add comments --- .../org/apache/spark/ExecutorAllocationManager.scala | 10 ++++++++-- .../apache/spark/ExecutorAllocationManagerSuite.scala | 10 ++++++---- 2 files changed, 14 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala index 8e4c4e0a1d4f3..ee7ff2253d7a1 100644 --- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala +++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala @@ -783,10 +783,16 @@ private[spark] class ExecutorAllocationManager( * Calculate the maximum no. of concurrent tasks that can run currently. */ def getMaxConTasks(): Int = { + // Task concurrency can be capped per job group, and multiple jobs, each with multiple + // stages, may be active at once. So we group the active stages by job group, total the + // pending and running tasks for each group, and take the minimum of that total and the + // group's configured concurrency limit (if any) to decide the maximum number of + // executors needed to serve the outstanding tasks. val stagesByJobGroup = stageIdToNumTasks.groupBy(x => jobIdToJobGroup(stageIdToJobId(x._1))) - def getMaxConTasks(maxConTasks: Int, - stagesByJobGroupItr: Iterator[(String, mutable.HashMap[Int, Int])]): Int = { + def getMaxConTasks( + maxConTasks: Int, + stagesByJobGroupItr: Iterator[(String, mutable.HashMap[Int, Int])]): Int = { if (stagesByJobGroupItr.hasNext) { val (jobGroupId, stages) = stagesByJobGroupItr.next // Get the total running and pending tasks for a job group.
diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala index 4749a2e7a7250..da9c0f853b6b8 100644 --- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala @@ -223,7 +223,7 @@ class ExecutorAllocationManagerSuite // Submit a new job in the same job group val stage2 = createStageInfo(2, 20) - sc.listenerBus.postToAll(SparkListenerJobStart(1, 0, Seq{stage2}, sc.getLocalProperties)) + sc.listenerBus.postToAll(SparkListenerJobStart(1, 0, Seq(stage2), sc.getLocalProperties)) sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage2)) assert(maxNumExecutorsNeeded(manager) === 2) @@ -234,7 +234,7 @@ class ExecutorAllocationManagerSuite sc.setJobGroup("group2", "", false) val stage3 = createStageInfo(3, 20) - sc.listenerBus.postToAll(SparkListenerJobStart(2, 0, Seq{stage3}, sc.getLocalProperties)) + sc.listenerBus.postToAll(SparkListenerJobStart(2, 0, Seq(stage3), sc.getLocalProperties)) sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage3)) assert(maxNumExecutorsNeeded(manager) === 5) @@ -245,7 +245,7 @@ class ExecutorAllocationManagerSuite sc.clearJobGroup() val stage4 = createStageInfo(4, 50) - sc.listenerBus.postToAll(SparkListenerJobStart(2, 0, Seq{stage4}, sc.getLocalProperties)) + sc.listenerBus.postToAll(SparkListenerJobStart(2, 0, Seq(stage4), sc.getLocalProperties)) sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage4)) assert(maxNumExecutorsNeeded(manager) === 50) } @@ -507,7 +507,9 @@ class ExecutorAllocationManagerSuite sc = createSparkContext(5, 12, 5) val manager = sc.executorAllocationManager.get - sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 8))) + val stages = Seq(createStageInfo(0, 8)) + sc.listenerBus.postToAll(SparkListenerJobStart(0, 0, stages, sc.getLocalProperties)) + sc.listenerBus.postToAll(SparkListenerStageSubmitted(stages(0))) // Remove when numExecutorsTarget is the same as the current number of executors assert(addExecutors(manager) === 1)
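Taken together, a user of this feature caps a job group in the configuration and then tags jobs with that group before running them. A sketch of the intended end-to-end usage (the group name dbWriters and the workload are illustrative, not from this patch):

    import org.apache.spark.{SparkConf, SparkContext}

    object ThrottledJobGroupExample {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
          .setAppName("throttled-example")
          // At most 8 tasks from jobs in group "dbWriters" run concurrently.
          .set("spark.job.dbWriters.maxConcurrentTasks", "8")
        val sc = new SparkContext(conf)

        // Jobs submitted after this call belong to the "dbWriters" group.
        sc.setJobGroup("dbWriters", "writes to a shared external service", false)
        sc.parallelize(1 to 1000, numSlices = 100)
          .foreachPartition(_ => ()) // stand-in for the real per-partition work

        sc.clearJobGroup() // later jobs run without the cap
        sc.stop()
      }
    }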