From 4281151df9010b4e9fe91e588c07e872b8e0dd69 Mon Sep 17 00:00:00 2001 From: Dhruve Ashar Date: Mon, 11 Sep 2017 11:45:49 -0500 Subject: [PATCH 1/4] [SPARK-20589] Allow limiting task concurrency per stage --- .../spark/ExecutorAllocationManager.scala | 129 ++++++- .../spark/scheduler/TaskSetManager.scala | 28 +- .../ExecutorAllocationManagerSuite.scala | 354 ++++++++++++++++-- .../spark/scheduler/TaskSetManagerSuite.scala | 91 +++++ docs/configuration.md | 10 + 5 files changed, 573 insertions(+), 39 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala index 7a5fb9a802354..317e6686ddabe 100644 --- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala +++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala @@ -92,6 +92,7 @@ private[spark] class ExecutorAllocationManager( private val minNumExecutors = conf.get(DYN_ALLOCATION_MIN_EXECUTORS) private val maxNumExecutors = conf.get(DYN_ALLOCATION_MAX_EXECUTORS) private val initialNumExecutors = Utils.getDynamicAllocationInitialExecutors(conf) + private var maxConcurrentTasks = 0 // How long there must be backlogged tasks for before an addition is triggered (seconds) private val schedulerBacklogTimeoutS = conf.getTimeAsSeconds( @@ -263,7 +264,8 @@ private[spark] class ExecutorAllocationManager( * and pending tasks, rounded up. 
*/ private def maxNumExecutorsNeeded(): Int = { - val numRunningOrPendingTasks = listener.totalPendingTasks + listener.totalRunningTasks + val numRunningOrPendingTasks = Math.min(listener.totalPendingTasks + listener.totalRunningTasks, + maxConcurrentTasks) (numRunningOrPendingTasks + tasksPerExecutor - 1) / tasksPerExecutor } @@ -369,7 +371,6 @@ private[spark] class ExecutorAllocationManager( numExecutorsTarget = math.max(math.min(numExecutorsTarget, maxNumExecutors), minNumExecutors) val delta = numExecutorsTarget - oldNumExecutorsTarget - // If our target has not changed, do not send a message // to the cluster manager and reset our exponential growth if (delta == 0) { @@ -607,6 +608,10 @@ private[spark] class ExecutorAllocationManager( // Number of tasks currently running on the cluster including speculative tasks. // Should be 0 when no stages are active. private var numRunningTasks: Int = _ + private val jobGroupToMaxConTasks = new mutable.HashMap[String, Int] + private val jobIdToJobGroup = new mutable.HashMap[Int, String] + private val stageIdToJobId = new mutable.HashMap[Int, Int] + private val stageIdToCompleteTaskCount = new mutable.HashMap[Int, Int] // Number of speculative tasks to be scheduled in each stage private val stageIdToNumSpeculativeTasks = new mutable.HashMap[Int, Int] @@ -619,6 +624,47 @@ private[spark] class ExecutorAllocationManager( // place the executors. 
private val stageIdToExecutorPlacementHints = new mutable.HashMap[Int, (Int, Map[String, Int])] + override def onJobStart(jobStart: SparkListenerJobStart): Unit = { + jobStart.stageInfos.foreach(stageInfo => stageIdToJobId(stageInfo.stageId) = jobStart.jobId) + + var jobGroupId = if (jobStart.properties != null) { + jobStart.properties.getProperty(SparkContext.SPARK_JOB_GROUP_ID) + } else { + null + } + + val maxConTasks = if (jobGroupId != null && + conf.contains(s"spark.job.$jobGroupId.maxConcurrentTasks")) { + conf.get(s"spark.job.$jobGroupId.maxConcurrentTasks").toInt + } else { + Int.MaxValue + } + + if (maxConTasks <= 0) { + throw new IllegalArgumentException( + "Maximum Concurrent Tasks should be set greater than 0 for the job to progress.") + } + + if (jobGroupId == null || !conf.contains(s"spark.job.$jobGroupId.maxConcurrentTasks")) { + jobGroupId = DEFAULT_JOB_GROUP + } + + jobIdToJobGroup(jobStart.jobId) = jobGroupId + if (!jobGroupToMaxConTasks.contains(jobGroupId)) { + jobGroupToMaxConTasks(jobGroupId) = maxConTasks + } + } + + override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = { + val jobGroupId = jobIdToJobGroup(jobEnd.jobId) + + jobIdToJobGroup -= jobEnd.jobId + // Remove jobGroup mapping if this is the only remaining job in the group. 
+ if (!jobIdToJobGroup.values.exists(_ == jobGroupId)) { + jobGroupToMaxConTasks -= jobGroupId + } + } + override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = { initializing = false val stageId = stageSubmitted.stageInfo.stageId @@ -644,7 +690,9 @@ private[spark] class ExecutorAllocationManager( // Update the executor placement hints updateExecutorPlacementHints() + maxConcurrentTasks = getMaxConTasks } + logInfo(s"Setting max concurrent tasks to $maxConcurrentTasks on stage submit.") } override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = { @@ -655,7 +703,7 @@ private[spark] class ExecutorAllocationManager( stageIdToTaskIndices -= stageId stageIdToSpeculativeTaskIndices -= stageId stageIdToExecutorPlacementHints -= stageId - + stageIdToJobId -= stageId // Update the executor placement hints updateExecutorPlacementHints() @@ -668,7 +716,9 @@ private[spark] class ExecutorAllocationManager( numRunningTasks = 0 } } + maxConcurrentTasks = getMaxConTasks } + logInfo(s"Setting max concurrent tasks to $maxConcurrentTasks on stage complete.") } override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = { @@ -728,9 +778,13 @@ private[spark] class ExecutorAllocationManager( } if (taskEnd.taskInfo.speculative) { stageIdToSpeculativeTaskIndices.get(stageId).foreach {_.remove(taskIndex)} + maxConcurrentTasks = getMaxConTasks + logDebug(s"Setting max concurrent tasks to $maxConcurrentTasks on spec. task end.") } else { stageIdToTaskIndices.get(stageId).foreach {_.remove(taskIndex)} } + } else { + stageIdToCompleteTaskCount(stageId) = stageIdToCompleteTaskCount.getOrElse(stageId, 0) + 1 } } } @@ -758,10 +812,57 @@ private[spark] class ExecutorAllocationManager( allocationManager.synchronized { stageIdToNumSpeculativeTasks(stageId) = stageIdToNumSpeculativeTasks.getOrElse(stageId, 0) + 1 + maxConcurrentTasks = getMaxConTasks + logDebug(s"Setting max concurrent tasks to $maxConcurrentTasks on spec. 
task submitted.") allocationManager.onSchedulerBacklogged() } } + /** + * Calculate the maximum no. of concurrent tasks that can run currently. + */ + def getMaxConTasks(): Int = { + // We can limit the no. of concurrent tasks by a job group. A job group can have multiple jobs + // with multiple stages. We need to get all the active stages belonging to a job group to + // calculate the total no. of pending + running tasks to decide the maximum no. of executors + // we need at that time to serve the outstanding tasks. This is capped by the minimum of no. + // of outstanding tasks and the max concurrent limit specified for the job group if any. + + def getIncompleteTasksForStage(stageId: Int, numTasks: Int): Int = { + var runningTasks = 0 + if (stageIdToTaskIndices.contains(stageId)) { + runningTasks = + stageIdToTaskIndices(stageId).size - stageIdToCompleteTaskCount.getOrElse(stageId, 0) + } + + totalPendingTasks(stageId) + runningTasks + } + + def sumIncompleteTasksForStages: (Int, (Int, Int)) => Int = (a, b) => { + val activeTasks = getIncompleteTasksForStage(b._1, b._2) + sumOrMax(a, activeTasks) + } + // Get the total running & pending tasks for all stages in a job group. 
+ def getIncompleteTasksForJobGroup(stagesItr: mutable.HashMap[Int, Int]): Int = { + stagesItr.foldLeft(0)(sumIncompleteTasksForStages) + } + + def sumIncompleteTasksForJobGroup: (Int, (String, mutable.HashMap[Int, Int])) => Int = { + (maxConTasks, x) => { + val totalIncompleteTasksForJobGroup = getIncompleteTasksForJobGroup(x._2) + val maxTasks = Math.min(jobGroupToMaxConTasks(x._1), totalIncompleteTasksForJobGroup) + sumOrMax(maxConTasks, maxTasks) + } + } + + def sumOrMax(a: Int, b: Int): Int = if (doesSumOverflow(a, b)) Int.MaxValue else (a + b) + + def doesSumOverflow(a: Int, b: Int): Boolean = b > (Int.MaxValue - a) + + val stagesByJobGroup = stageIdToNumTasks.groupBy(x => jobIdToJobGroup(stageIdToJobId(x._1))) + stagesByJobGroup.foldLeft(0)(sumIncompleteTasksForJobGroup) + } + /** * An estimate of the total number of pending tasks remaining for currently running stages. Does * not account for tasks which may have failed and been resubmitted. @@ -780,10 +881,31 @@ private[spark] class ExecutorAllocationManager( }.sum } + def pendingTasks(stageId: Int): Int = { + if (stageIdToNumTasks.contains(stageId)) { + stageIdToNumTasks(stageId) - stageIdToTaskIndices.get(stageId).map(_.size).getOrElse(0) + } else { + 0 + } + } + + def pendingSpeculativeTasks(stageId: Int): Int = { + if (stageIdToNumSpeculativeTasks.contains(stageId)) { + stageIdToNumSpeculativeTasks(stageId) - + stageIdToSpeculativeTaskIndices.get(stageId).map(_.size).getOrElse(0) + } else { + 0 + } + } + def totalPendingTasks(): Int = { pendingTasks + pendingSpeculativeTasks } + def totalPendingTasks(stageId: Int): Int = { + pendingTasks(stageId) + pendingSpeculativeTasks(stageId) + } + /** * The number of tasks currently running across all stages. 
*/ @@ -849,4 +971,5 @@ private[spark] class ExecutorAllocationManager( private object ExecutorAllocationManager { val NOT_SET = Long.MaxValue + val DEFAULT_JOB_GROUP = "__default_job_group" } diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index 3804ea863b4f9..988a8dab329b5 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -75,6 +75,29 @@ private[spark] class TaskSetManager( val tasks = taskSet.tasks val numTasks = tasks.length + + // The max no. of concurrent tasks that can run for a particular job group. + val maxConcurrentTasks = { + // This is set to null while running the unit tests + if (taskSet.properties != null) { + val jobGroupId = taskSet.properties.getProperty(SparkContext.SPARK_JOB_GROUP_ID) + if (jobGroupId != null && !jobGroupId.isEmpty) { + val maxTasks = conf.getInt(s"spark.job.${jobGroupId}.maxConcurrentTasks", Int.MaxValue) + if (maxTasks < 1) { + throw new IllegalArgumentException( + "Maximum Concurrent Tasks should be set greater than 0 for the job to progress." + ) + } else { + maxTasks + } + } else { + Int.MaxValue + } + } else { + Int.MaxValue + } + } + val copiesRunning = new Array[Int](numTasks) // For each task, tracks whether a copy of the task has succeeded. A task will also be @@ -441,7 +464,7 @@ private[spark] class TaskSetManager( blacklist.isNodeBlacklistedForTaskSet(host) || blacklist.isExecutorBlacklistedForTaskSet(execId) } - if (!isZombie && !offerBlacklisted) { + if (!isZombie && !offerBlacklisted && runningTasks < maxConcurrentTasks) { val curTime = clock.getTimeMillis() var allowedLocality = maxLocality @@ -512,6 +535,9 @@ private[spark] class TaskSetManager( serializedTask) } } else { + if (runningTasks >= maxConcurrentTasks) { + logDebug("Already running max. no. 
of concurrent tasks.") + } None } } diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala index 7da4bae0ab7eb..93623af626293 100644 --- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala @@ -95,7 +95,9 @@ class ExecutorAllocationManagerSuite test("add executors") { sc = createSparkContext(1, 10, 1) val manager = sc.executorAllocationManager.get - sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 1000))) + val stage0 = createStageInfo(0, 1000) + sc.listenerBus.postToAll(SparkListenerJobStart(0, 0, Seq(stage0), sc.getLocalProperties)) + sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage0)) // Keep adding until the limit is reached assert(numExecutorsTarget(manager) === 1) @@ -140,7 +142,9 @@ class ExecutorAllocationManagerSuite test("add executors capped by num pending tasks") { sc = createSparkContext(0, 10, 0) val manager = sc.executorAllocationManager.get - sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 5))) + val stages = Seq(createStageInfo(0, 5), createStageInfo(1, 3), createStageInfo(2, 3)) + sc.listenerBus.postToAll(SparkListenerJobStart(0, 0, stages, sc.getLocalProperties)) + sc.listenerBus.postToAll(SparkListenerStageSubmitted(stages(0))) // Verify that we're capped at number of tasks in the stage assert(numExecutorsTarget(manager) === 0) @@ -156,7 +160,7 @@ class ExecutorAllocationManagerSuite assert(numExecutorsToAdd(manager) === 1) // Verify that running a task doesn't affect the target - sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(1, 3))) + sc.listenerBus.postToAll(SparkListenerStageSubmitted(stages(1))) sc.listenerBus.postToAll(SparkListenerExecutorAdded( 0L, "executor-1", new ExecutorInfo("host1", 1, Map.empty))) sc.listenerBus.postToAll(SparkListenerTaskStart(1, 
0, createTaskInfo(0, 0, "executor-1"))) @@ -172,7 +176,7 @@ class ExecutorAllocationManagerSuite assert(numExecutorsToAdd(manager) === 1) // Verify that re-running a task doesn't blow things up - sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(2, 3))) + sc.listenerBus.postToAll(SparkListenerStageSubmitted(stages(2))) sc.listenerBus.postToAll(SparkListenerTaskStart(2, 0, createTaskInfo(0, 0, "executor-1"))) sc.listenerBus.postToAll(SparkListenerTaskStart(2, 0, createTaskInfo(1, 0, "executor-1"))) assert(addExecutors(manager) === 1) @@ -188,18 +192,20 @@ class ExecutorAllocationManagerSuite assert(numExecutorsTarget(manager) === 10) } - test("add executors when speculative tasks added") { + test("add executors when speculative tasks added - original") { sc = createSparkContext(0, 10, 0) val manager = sc.executorAllocationManager.get + val stages = Seq(createStageInfo(0, 2)) + sc.listenerBus.postToAll(SparkListenerJobStart(0, 0, stages, sc.getLocalProperties)) + sc.listenerBus.postToAll(SparkListenerStageSubmitted(stages(0))) // Verify that we're capped at number of tasks including the speculative ones in the stage - sc.listenerBus.postToAll(SparkListenerSpeculativeTaskSubmitted(1)) + sc.listenerBus.postToAll(SparkListenerSpeculativeTaskSubmitted(0)) assert(numExecutorsTarget(manager) === 0) assert(numExecutorsToAdd(manager) === 1) assert(addExecutors(manager) === 1) - sc.listenerBus.postToAll(SparkListenerSpeculativeTaskSubmitted(1)) - sc.listenerBus.postToAll(SparkListenerSpeculativeTaskSubmitted(1)) - sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(1, 2))) + sc.listenerBus.postToAll(SparkListenerSpeculativeTaskSubmitted(0)) + sc.listenerBus.postToAll(SparkListenerSpeculativeTaskSubmitted(0)) assert(numExecutorsTarget(manager) === 1) assert(numExecutorsToAdd(manager) === 2) assert(addExecutors(manager) === 2) @@ -210,22 +216,282 @@ class ExecutorAllocationManagerSuite assert(numExecutorsToAdd(manager) === 1) // Verify that 
running a task doesn't affect the target - sc.listenerBus.postToAll(SparkListenerTaskStart(1, 0, createTaskInfo(0, 0, "executor-1"))) + sc.listenerBus.postToAll(SparkListenerTaskStart(0, 0, createTaskInfo(0, 0, "executor-1"))) assert(numExecutorsTarget(manager) === 5) assert(addExecutors(manager) === 0) assert(numExecutorsToAdd(manager) === 1) // Verify that running a speculative task doesn't affect the target - sc.listenerBus.postToAll(SparkListenerTaskStart(1, 0, createTaskInfo(0, 0, "executor-2", true))) + sc.listenerBus.postToAll(SparkListenerTaskStart(0, 0, createTaskInfo(0, 0, "executor-2", true))) assert(numExecutorsTarget(manager) === 5) assert(addExecutors(manager) === 0) assert(numExecutorsToAdd(manager) === 1) } + test("add executors capped by max concurrent tasks for a job group with single core executors") { + val conf = new SparkConf() + .setMaster("myDummyLocalExternalClusterManager") + .setAppName("test-executor-allocation-manager") + .set("spark.dynamicAllocation.enabled", "true") + .set("spark.dynamicAllocation.testing", "true") + .set("spark.job.group1.maxConcurrentTasks", "2") + .set("spark.job.group2.maxConcurrentTasks", "5") + val sc = new SparkContext(conf) + contexts += sc + sc.setJobGroup("group1", "", false) + + val manager = sc.executorAllocationManager.get + val stages = Seq(createStageInfo(0, 10), createStageInfo(1, 10)) + // Submit the job and stage start/submit events + sc.listenerBus.postToAll(SparkListenerJobStart(0, 0, stages, sc.getLocalProperties)) + sc.listenerBus.postToAll(SparkListenerStageSubmitted(stages(0))) + + // Verify that we're capped at number of max concurrent tasks in the stage + assert(maxNumExecutorsNeeded(manager) === 2) + + // Submit another stage in the same job + sc.listenerBus.postToAll(SparkListenerStageSubmitted(stages(1))) + assert(maxNumExecutorsNeeded(manager) === 2) + + sc.listenerBus.postToAll(SparkListenerStageCompleted(stages(0))) + sc.listenerBus.postToAll(SparkListenerStageCompleted(stages(1))) + 
sc.listenerBus.postToAll(SparkListenerJobEnd(0, 10, JobSucceeded)) + + // Submit a new job in the same job group + val stage2 = createStageInfo(2, 20) + sc.listenerBus.postToAll(SparkListenerJobStart(1, 0, Seq(stage2), sc.getLocalProperties)) + sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage2)) + assert(maxNumExecutorsNeeded(manager) === 2) + + sc.listenerBus.postToAll(SparkListenerStageCompleted(stage2)) + sc.listenerBus.postToAll(SparkListenerJobEnd(1, 10, JobSucceeded)) + + // Set another jobGroup + sc.setJobGroup("group2", "", false) + + val stage3 = createStageInfo(3, 20) + sc.listenerBus.postToAll(SparkListenerJobStart(2, 0, Seq(stage3), sc.getLocalProperties)) + sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage3)) + assert(maxNumExecutorsNeeded(manager) === 5) + + sc.listenerBus.postToAll(SparkListenerStageCompleted(stage3)) + sc.listenerBus.postToAll(SparkListenerJobEnd(2, 10, JobSucceeded)) + + // Clear jobGroup + sc.clearJobGroup() + + val stage4 = createStageInfo(4, 50) + sc.listenerBus.postToAll(SparkListenerJobStart(2, 0, Seq(stage4), sc.getLocalProperties)) + sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage4)) + assert(maxNumExecutorsNeeded(manager) === 50) + } + + test("add executors capped by max concurrent tasks for a job group with multi cores executors") { + val conf = new SparkConf() + .setMaster("myDummyLocalExternalClusterManager") + .setAppName("test-executor-allocation-manager") + .set("spark.dynamicAllocation.enabled", "true") + .set("spark.dynamicAllocation.testing", "true") + .set("spark.job.group1.maxConcurrentTasks", "2") + .set("spark.job.group2.maxConcurrentTasks", "5") + .set("spark.executor.cores", "3") + val sc = new SparkContext(conf) + contexts += sc + sc.setJobGroup("group1", "", false) + + val manager = sc.executorAllocationManager.get + val stages = Seq(createStageInfo(0, 10), createStageInfo(1, 10)) + // Submit the job and stage start/submit events + 
sc.listenerBus.postToAll(SparkListenerJobStart(0, 0, stages, sc.getLocalProperties)) + sc.listenerBus.postToAll(SparkListenerStageSubmitted(stages(0))) + + // Verify that we're capped at number of max concurrent tasks in the stage + assert(maxNumExecutorsNeeded(manager) === 1) + + // Submit another stage in the same job + sc.listenerBus.postToAll(SparkListenerStageSubmitted(stages(1))) + assert(maxNumExecutorsNeeded(manager) === 1) + + sc.listenerBus.postToAll(SparkListenerStageCompleted(stages(0))) + sc.listenerBus.postToAll(SparkListenerStageCompleted(stages(1))) + sc.listenerBus.postToAll(SparkListenerJobEnd(0, 10, JobSucceeded)) + + // Submit a new job in the same job group + val stage2 = createStageInfo(2, 20) + sc.listenerBus.postToAll(SparkListenerJobStart(1, 0, Seq(stage2), sc.getLocalProperties)) + sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage2)) + assert(maxNumExecutorsNeeded(manager) === 1) + + sc.listenerBus.postToAll(SparkListenerStageCompleted(stage2)) + sc.listenerBus.postToAll(SparkListenerJobEnd(1, 10, JobSucceeded)) + + // Set another jobGroup + sc.setJobGroup("group2", "", false) + + val stage3 = createStageInfo(3, 20) + sc.listenerBus.postToAll(SparkListenerJobStart(2, 0, Seq(stage3), sc.getLocalProperties)) + sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage3)) + assert(maxNumExecutorsNeeded(manager) === 2) + + sc.listenerBus.postToAll(SparkListenerStageCompleted(stage3)) + sc.listenerBus.postToAll(SparkListenerJobEnd(2, 10, JobSucceeded)) + + // Clear jobGroup + sc.clearJobGroup() + + val stage4 = createStageInfo(4, 50) + sc.listenerBus.postToAll(SparkListenerJobStart(2, 0, Seq(stage4), sc.getLocalProperties)) + sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage4)) + assert(maxNumExecutorsNeeded(manager) === 17) + } + + test("add executors capped by max concurrent tasks for concurrent job groups") { + val conf = new SparkConf() + .setMaster("myDummyLocalExternalClusterManager") + 
.setAppName("test-executor-allocation-manager") + .set("spark.dynamicAllocation.enabled", "true") + .set("spark.dynamicAllocation.testing", "true") + .set("spark.job.group1.maxConcurrentTasks", "5") + .set("spark.job.group2.maxConcurrentTasks", "11") + .set("spark.job.group3.maxConcurrentTasks", "17") + val sc = new SparkContext(conf) + contexts += sc + + val manager = sc.executorAllocationManager.get + + // Submit a job in group1 + sc.setJobGroup("group1", "", false) + val stages = Seq(createStageInfo(0, 2), createStageInfo(1, 10)) + // Submit the job and stage start/submit events + sc.listenerBus.postToAll(SparkListenerJobStart(0, 0, stages, sc.getLocalProperties)) + sc.listenerBus.postToAll(SparkListenerStageSubmitted(stages(0))) + + // Verify that we're capped at number of max concurrent tasks in the job group + assert(maxNumExecutorsNeeded(manager) === 2) + + // Submit another stage in the same job + sc.listenerBus.postToAll(SparkListenerStageSubmitted(stages(1))) + assert(maxNumExecutorsNeeded(manager) === 5) + + // Submit a job in group 2 + sc.setJobGroup("group2", "", false) + val stage2 = createStageInfo(2, 20) + sc.listenerBus.postToAll(SparkListenerJobStart(1, 0, Seq(stage2), sc.getLocalProperties)) + sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage2)) + assert(maxNumExecutorsNeeded(manager) === 16) // 5 + 11 + + // Submit a job in group 3 + sc.setJobGroup("group3", "", false) + val stage3 = createStageInfo(3, 50) + sc.listenerBus.postToAll(SparkListenerJobStart(2, 0, Seq(stage3), sc.getLocalProperties)) + sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage3)) + assert(maxNumExecutorsNeeded(manager) === 33) // 5 + 11 + 17 + + // Mark job in group 2 as complete + sc.listenerBus.postToAll(SparkListenerStageCompleted(stage2)) + sc.listenerBus.postToAll(SparkListenerJobEnd(1, 20, JobSucceeded)) + assert(maxNumExecutorsNeeded(manager) === 22) // 33 - 11 + + // Mark job in group 1 as complete + 
sc.listenerBus.postToAll(SparkListenerStageCompleted(stages(0))) + sc.listenerBus.postToAll(SparkListenerStageCompleted(stages(1))) + sc.listenerBus.postToAll(SparkListenerJobEnd(0, 10, JobSucceeded)) + assert(maxNumExecutorsNeeded(manager) === 17) // 22 - 5 + + // Submit a job without any job group + sc.clearJobGroup() + val stage4 = createStageInfo(4, 333) + sc.listenerBus.postToAll(SparkListenerJobStart(4, 0, Seq(stage4), sc.getLocalProperties)) + sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage4)) + assert(maxNumExecutorsNeeded(manager) === 350) // 17 + 333 + + // Mark job without job group as complete + sc.listenerBus.postToAll(SparkListenerStageCompleted(stage4)) + sc.listenerBus.postToAll(SparkListenerJobEnd(4, 20, JobSucceeded)) + assert(maxNumExecutorsNeeded(manager) === 17) // 350 - 333 + } + + test("add executors capped by max concurrent tasks for concurrent job groups with speculation") { + val conf = new SparkConf() + .setMaster("myDummyLocalExternalClusterManager") + .setAppName("test-executor-allocation-manager") + .set("spark.dynamicAllocation.enabled", "true") + .set("spark.dynamicAllocation.testing", "true") + .set("spark.job.group1.maxConcurrentTasks", "5") + .set("spark.job.group2.maxConcurrentTasks", "11") + .set("spark.job.group3.maxConcurrentTasks", "17") + val sc = new SparkContext(conf) + contexts += sc + + val manager = sc.executorAllocationManager.get + + // Submit a job in group1 + sc.setJobGroup("group1", "", false) + val stages = Seq(createStageInfo(0, 2), createStageInfo(1, 10)) + // Submit the job and stage start/submit events + sc.listenerBus.postToAll(SparkListenerJobStart(0, 0, stages, sc.getLocalProperties)) + sc.listenerBus.postToAll(SparkListenerStageSubmitted(stages(0))) + + // Verify that we're capped at number of max concurrent tasks in the job group + assert(maxNumExecutorsNeeded(manager) === 2) + + // submit a speculative task + sc.listenerBus.postToAll(SparkListenerSpeculativeTaskSubmitted(0)) + 
assert(maxNumExecutorsNeeded(manager) === 3) // should increase the no. + + // Submit another stage in the same job + sc.listenerBus.postToAll(SparkListenerStageSubmitted(stages(1))) + assert(maxNumExecutorsNeeded(manager) === 5) + + // Submit a job in group 2 + sc.setJobGroup("group2", "", false) + val stage2 = createStageInfo(2, 20) + sc.listenerBus.postToAll(SparkListenerJobStart(1, 0, Seq(stage2), sc.getLocalProperties)) + sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage2)) + assert(maxNumExecutorsNeeded(manager) === 16) // 5 + 11 + + // Submit a job in group 3 + sc.setJobGroup("group3", "", false) + val stage3 = createStageInfo(3, 50) + sc.listenerBus.postToAll(SparkListenerJobStart(2, 0, Seq(stage3), sc.getLocalProperties)) + sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage3)) + assert(maxNumExecutorsNeeded(manager) === 33) // 5 + 11 + 17 + + // Mark job in group 2 as complete + sc.listenerBus.postToAll(SparkListenerStageCompleted(stage2)) + sc.listenerBus.postToAll(SparkListenerJobEnd(1, 20, JobSucceeded)) + assert(maxNumExecutorsNeeded(manager) === 22) // 33 - 11 + + // Mark job in group 1 as complete + sc.listenerBus.postToAll(SparkListenerStageCompleted(stages(0))) + sc.listenerBus.postToAll(SparkListenerStageCompleted(stages(1))) + sc.listenerBus.postToAll(SparkListenerJobEnd(0, 10, JobSucceeded)) + assert(maxNumExecutorsNeeded(manager) === 17) // 22 - 5 + + // Submit a job without any job group + sc.clearJobGroup() + val stage4 = createStageInfo(4, 333) + sc.listenerBus.postToAll(SparkListenerJobStart(4, 0, Seq(stage4), sc.getLocalProperties)) + sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage4)) + assert(maxNumExecutorsNeeded(manager) === 350) // 17 + 333 + + // Submit a speculative task in unbounded job group + sc.listenerBus.postToAll(SparkListenerSpeculativeTaskSubmitted(4)) + sc.listenerBus.postToAll(SparkListenerSpeculativeTaskSubmitted(4)) + assert(maxNumExecutorsNeeded(manager) === 352) // should increase the no. 
+ + // Mark job without job group as complete + sc.listenerBus.postToAll(SparkListenerStageCompleted(stage4)) + sc.listenerBus.postToAll(SparkListenerJobEnd(4, 20, JobSucceeded)) + assert(maxNumExecutorsNeeded(manager) === 17) // 350 - 333 + } + test("cancel pending executors when no longer needed") { sc = createSparkContext(0, 10, 0) val manager = sc.executorAllocationManager.get - sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(2, 5))) + val stage0 = createStageInfo(2, 5) + sc.listenerBus.postToAll(SparkListenerJobStart(0, 0, Seq(stage0), sc.getLocalProperties)) + sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage0)) assert(numExecutorsTarget(manager) === 0) assert(numExecutorsToAdd(manager) === 1) @@ -352,7 +618,9 @@ class ExecutorAllocationManagerSuite sc = createSparkContext(5, 12, 5) val manager = sc.executorAllocationManager.get - sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 8))) + val stages = Seq(createStageInfo(0, 8)) + sc.listenerBus.postToAll(SparkListenerJobStart(0, 0, stages, sc.getLocalProperties)) + sc.listenerBus.postToAll(SparkListenerStageSubmitted(stages(0))) // Remove when numExecutorsTarget is the same as the current number of executors assert(addExecutors(manager) === 1) @@ -390,7 +658,9 @@ class ExecutorAllocationManagerSuite test ("interleaving add and remove") { sc = createSparkContext(5, 12, 5) val manager = sc.executorAllocationManager.get - sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 1000))) + val stage0 = createStageInfo(0, 1000) + sc.listenerBus.postToAll(SparkListenerJobStart(0, 0, Seq(stage0), sc.getLocalProperties)) + sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage0)) // Add a few executors assert(addExecutors(manager) === 1) @@ -569,7 +839,9 @@ class ExecutorAllocationManagerSuite val clock = new ManualClock(2020L) val manager = sc.executorAllocationManager.get manager.setClock(clock) - 
sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 1000))) + val stage0 = createStageInfo(0, 1000) + sc.listenerBus.postToAll(SparkListenerJobStart(0, 0, Seq(stage0), sc.getLocalProperties)) + sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage0)) // Scheduler queue backlogged onSchedulerBacklogged(manager) @@ -682,7 +954,10 @@ class ExecutorAllocationManagerSuite // Starting a stage should start the add timer val numTasks = 10 - sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, numTasks))) + val stages = Seq(createStageInfo(0, numTasks), createStageInfo(1, numTasks), + createStageInfo(2, numTasks)) + sc.listenerBus.postToAll(SparkListenerJobStart(0, 0, stages, sc.getLocalProperties)) + sc.listenerBus.postToAll(SparkListenerStageSubmitted(stages(0))) assert(addTime(manager) !== NOT_SET) // Starting a subset of the tasks should not cancel the add timer @@ -696,8 +971,8 @@ class ExecutorAllocationManagerSuite // Start two different stages // The add timer should be canceled only if all tasks in both stages start running - sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(1, numTasks))) - sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(2, numTasks))) + sc.listenerBus.postToAll(SparkListenerStageSubmitted(stages(1))) + sc.listenerBus.postToAll(SparkListenerStageSubmitted(stages(2))) assert(addTime(manager) !== NOT_SET) taskInfos.foreach { info => sc.listenerBus.postToAll(SparkListenerTaskStart(1, 0, info)) } assert(addTime(manager) !== NOT_SET) @@ -808,8 +1083,9 @@ class ExecutorAllocationManagerSuite test("avoid ramp up when target < running executors") { sc = createSparkContext(0, 100000, 0) val manager = sc.executorAllocationManager.get - val stage1 = createStageInfo(0, 1000) - sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage1)) + val stages = Seq(createStageInfo(0, 1000), createStageInfo(1, 1000)) + sc.listenerBus.postToAll(SparkListenerJobStart(0, 0, stages, 
sc.getLocalProperties)) + sc.listenerBus.postToAll(SparkListenerStageSubmitted(stages(0))) assert(addExecutors(manager) === 1) assert(addExecutors(manager) === 2) @@ -820,12 +1096,12 @@ class ExecutorAllocationManagerSuite onExecutorAdded(manager, s"executor-$i") } assert(executorIds(manager).size === 15) - sc.listenerBus.postToAll(SparkListenerStageCompleted(stage1)) + sc.listenerBus.postToAll(SparkListenerStageCompleted(stages(0))) adjustRequestedExecutors(manager) assert(numExecutorsTarget(manager) === 0) - sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(1, 1000))) + sc.listenerBus.postToAll(SparkListenerStageSubmitted(stages(1))) addExecutors(manager) assert(numExecutorsTarget(manager) === 16) } @@ -842,7 +1118,9 @@ class ExecutorAllocationManagerSuite // Verify whether the initial number of executors is kept with no pending tasks assert(numExecutorsTarget(manager) === 3) - sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(1, 2))) + val stage0 = createStageInfo(1, 2) + sc.listenerBus.postToAll(SparkListenerJobStart(0, 0, Seq(stage0), sc.getLocalProperties)) + sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage0)) clock.advance(100L) assert(maxNumExecutorsNeeded(manager) === 2) @@ -891,26 +1169,28 @@ class ExecutorAllocationManagerSuite Seq.empty, Seq.empty ) - val stageInfo1 = createStageInfo(1, 5, localityPreferences1) - sc.listenerBus.postToAll(SparkListenerStageSubmitted(stageInfo1)) - - assert(localityAwareTasks(manager) === 3) - assert(hostToLocalTaskCount(manager) === - Map("host1" -> 2, "host2" -> 3, "host3" -> 2, "host4" -> 2)) - val localityPreferences2 = Seq( Seq(TaskLocation("host2"), TaskLocation("host3"), TaskLocation("host5")), Seq(TaskLocation("host3"), TaskLocation("host4"), TaskLocation("host5")), Seq.empty ) - val stageInfo2 = createStageInfo(2, 3, localityPreferences2) - sc.listenerBus.postToAll(SparkListenerStageSubmitted(stageInfo2)) + val stageInfos = Seq(createStageInfo(1, 5, 
localityPreferences1), + createStageInfo(2, 3, localityPreferences2)) + + sc.listenerBus.postToAll(SparkListenerJobStart(0, 0, stageInfos, sc.getLocalProperties)) + sc.listenerBus.postToAll(SparkListenerStageSubmitted(stageInfos(0))) + + assert(localityAwareTasks(manager) === 3) + assert(hostToLocalTaskCount(manager) === + Map("host1" -> 2, "host2" -> 3, "host3" -> 2, "host4" -> 2)) + + sc.listenerBus.postToAll(SparkListenerStageSubmitted(stageInfos(1))) assert(localityAwareTasks(manager) === 5) assert(hostToLocalTaskCount(manager) === Map("host1" -> 2, "host2" -> 4, "host3" -> 4, "host4" -> 3, "host5" -> 2)) - sc.listenerBus.postToAll(SparkListenerStageCompleted(stageInfo1)) + sc.listenerBus.postToAll(SparkListenerStageCompleted(stageInfos(0))) assert(localityAwareTasks(manager) === 2) assert(hostToLocalTaskCount(manager) === Map("host2" -> 1, "host3" -> 2, "host4" -> 1, "host5" -> 2)) @@ -921,7 +1201,9 @@ class ExecutorAllocationManagerSuite val manager = sc.executorAllocationManager.get assert(maxNumExecutorsNeeded(manager) === 0) - sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 1))) + val stage0 = createStageInfo(0, 1) + sc.listenerBus.postToAll(SparkListenerJobStart(0, 0, Seq(stage0), sc.getLocalProperties)) + sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage0)) assert(maxNumExecutorsNeeded(manager) === 1) val taskInfo = createTaskInfo(1, 1, "executor-1") @@ -942,7 +1224,9 @@ class ExecutorAllocationManagerSuite // Allocation manager is reset when adding executor requests are sent without reporting back // executor added. 
- sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 10))) + val stage0 = createStageInfo(0, 10) + sc.listenerBus.postToAll(SparkListenerJobStart(0, 0, Seq(stage0), sc.getLocalProperties)) + sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage0)) assert(addExecutors(manager) === 1) assert(numExecutorsTarget(manager) === 2) diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala index ae43f4cadc037..d0be4f863a7b9 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala @@ -1255,6 +1255,97 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg assert(taskOption4.get.addedJars === addedJarsMidTaskSet) } + test("limit max concurrent running tasks in a job group when configured ") { + val conf = new SparkConf(). + set(config.BLACKLIST_ENABLED, true). + set("spark.job.testJobGroup.maxConcurrentTasks", "2") // Limit max concurrent tasks to 2 + + sc = new SparkContext("local", "test", conf) + sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2")) + val props = new Properties(); + props.setProperty(SparkContext.SPARK_JOB_GROUP_ID, "testJobGroup") // set the job group + + val tasks = Array.tabulate[Task[_]](10) { i => + new FakeTask(0, i, Nil) + } + val tsm = new TaskSetManager(sched, new TaskSet(tasks, 0, 0, 0, props), 2) + + // make some offers to our taskset + var taskDescs = Seq( + "exec1" -> "host1", + "exec2" -> "host1" + ).flatMap { case (exec, host) => + // offer each executor twice (simulating 2 cores per executor) + (0 until 2).flatMap{ _ => tsm.resourceOffer(exec, host, TaskLocality.ANY)} + } + assert(taskDescs.size === 2) // Out of the 4 offers, tsm can accept up to maxConcurrentTasks. 
+ + // make 4 more offers + val taskDescs2 = Seq( + "exec1" -> "host1", + "exec2" -> "host1" + ).flatMap { case (exec, host) => + (0 until 2).flatMap{ _ => tsm.resourceOffer(exec, host, TaskLocality.ANY)} + } + assert(taskDescs2.size === 0) // tsm doesn't accept any as it is already running at max tasks + + // inform tsm that one task has completed + val directTaskResult = createTaskResult(0) + tsm.handleSuccessfulTask(taskDescs(0).taskId, directTaskResult) + + // make 4 more offers after previous task completed + taskDescs = Seq( + "exec1" -> "host1", + "exec2" -> "host1" + ).flatMap { case (exec, host) => + (0 until 2).flatMap{ _ => tsm.resourceOffer(exec, host, TaskLocality.ANY)} + } + assert(taskDescs.size === 1) // tsm accepts one as it can run one more task + } + + test("do not limit max concurrent running tasks in a job group by default") { + val conf = new SparkConf(). + set(config.BLACKLIST_ENABLED, true) + + sc = new SparkContext("local", "test", conf) + sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2")) + + val tasks = Array.tabulate[Task[_]](10) { i => + new FakeTask(0, i, Nil) + } + val tsm = new TaskSetManager(sched, new TaskSet(tasks, 0, 0, 0, null), 2) + + // make 5 offers to our taskset + var taskDescs = Seq( + "exec1" -> "host1", + "exec2" -> "host2" + ).flatMap { case (exec, host) => + // offer each executor twice (simulating 2 cores per executor) + (0 until 3).flatMap{ _ => tsm.resourceOffer(exec, host, TaskLocality.ANY)} + } + // tsm accepts all offers as it has tasks to run & there is no constraint for maxConcurrentTasks + assert(taskDescs.size === 6) + + taskDescs = Seq( + "exec1" -> "host1", + "exec2" -> "host2" + ).flatMap { case (exec, host) => + (0 until 1).flatMap{ _ => tsm.resourceOffer(exec, host, TaskLocality.ANY)} + } + assert(taskDescs.size === 2) // tsm accepts more offers as it has tasks to run + + val directTaskResult = createTaskResult(0) + tsm.handleSuccessfulTask(taskDescs(0).taskId, directTaskResult) + 
+ taskDescs = Seq( + "exec1" -> "host1", + "exec2" -> "host2" + ).flatMap { case (exec, host) => + (0 until 2).flatMap{ _ => tsm.resourceOffer(exec, host, TaskLocality.ANY)} + } + assert(taskDescs.size === 2) // tsm accepts 2 offers as it had only 2 tasks to run. + } + private def createTaskResult( id: Int, accumUpdates: Seq[AccumulatorV2[_, _]] = Seq.empty): DirectTaskResult[Int] = { diff --git a/docs/configuration.md b/docs/configuration.md index 6e9fe591b70a3..936550a14bc8f 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -1437,6 +1437,16 @@ Apart from these, the following properties are also available, and may be useful driver using more memory. + + spark.job.[userJobGroup].maxConcurrentTasks + Int.MAX_VALUE + + Maximum number of tasks to run concurrently in a given job group. This is especially useful + to avoid a DoS on an external service (e.g. a database) that is accessed from multiple + executors simultaneously. You can set the job group by calling the setJobGroup method on + sc.
+ + spark.blacklist.enabled From 04956cc0ab84ca4291cbf119927f0ead74db4b31 Mon Sep 17 00:00:00 2001 From: Dhruve Ashar Date: Mon, 18 Sep 2017 14:55:46 -0500 Subject: [PATCH 2/4] Account for running speculative tasks --- .../spark/ExecutorAllocationManager.scala | 53 ++++++++++++++----- .../ExecutorAllocationManagerSuite.scala | 46 ++++++++++++---- .../spark/scheduler/TaskSetManagerSuite.scala | 2 +- 3 files changed, 79 insertions(+), 22 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala index 317e6686ddabe..0693bb052acb7 100644 --- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala +++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala @@ -612,6 +612,7 @@ private[spark] class ExecutorAllocationManager( private val jobIdToJobGroup = new mutable.HashMap[Int, String] private val stageIdToJobId = new mutable.HashMap[Int, Int] private val stageIdToCompleteTaskCount = new mutable.HashMap[Int, Int] + private val stageIdToCompleteSpeculativeTaskCount = new mutable.HashMap[Int, Int] // Number of speculative tasks to be scheduled in each stage private val stageIdToNumSpeculativeTasks = new mutable.HashMap[Int, Int] @@ -778,6 +779,12 @@ private[spark] class ExecutorAllocationManager( } if (taskEnd.taskInfo.speculative) { stageIdToSpeculativeTaskIndices.get(stageId).foreach {_.remove(taskIndex)} + val numSpeculativeTasks = stageIdToNumSpeculativeTasks(stageId) - 1 + if (numSpeculativeTasks > 0) { + stageIdToNumSpeculativeTasks(stageId) = numSpeculativeTasks + } else { + stageIdToNumSpeculativeTasks.remove(stageId) + } maxConcurrentTasks = getMaxConTasks logDebug(s"Setting max concurrent tasks to $maxConcurrentTasks on spec. 
task end.") } else { @@ -785,6 +792,12 @@ private[spark] class ExecutorAllocationManager( } } else { stageIdToCompleteTaskCount(stageId) = stageIdToCompleteTaskCount.getOrElse(stageId, 0) + 1 + if (taskEnd.taskInfo.speculative) { + stageIdToCompleteSpeculativeTaskCount(stageId) = + stageIdToCompleteSpeculativeTaskCount.getOrElse(stageId, 0) + 1 + maxConcurrentTasks = getMaxConTasks + logDebug(s"Setting max concurrent tasks to $maxConcurrentTasks on spec. task end.") + } } } } @@ -825,22 +838,16 @@ private[spark] class ExecutorAllocationManager( // We can limit the no. of concurrent tasks by a job group. A job group can have multiple jobs // with multiple stages. We need to get all the active stages belonging to a job group to // calculate the total no. of pending + running tasks to decide the maximum no. of executors - // we need at that time to serve the outstanding tasks. This is capped by the minimum of no. - // of outstanding tasks and the max concurrent limit specified for the job group if any. + // we need at that time to serve the outstanding tasks. This is capped by the minimum no. of + // outstanding tasks and the max concurrent limit specified for the job group if any. 
def getIncompleteTasksForStage(stageId: Int, numTasks: Int): Int = { - var runningTasks = 0 - if (stageIdToTaskIndices.contains(stageId)) { - runningTasks = - stageIdToTaskIndices(stageId).size - stageIdToCompleteTaskCount.getOrElse(stageId, 0) - } - - totalPendingTasks(stageId) + runningTasks + totalPendingTasks(stageId) + totalRunningTasks(stageId) } - def sumIncompleteTasksForStages: (Int, (Int, Int)) => Int = (a, b) => { - val activeTasks = getIncompleteTasksForStage(b._1, b._2) - sumOrMax(a, activeTasks) + def sumIncompleteTasksForStages: (Int, (Int, Int)) => Int = (totalTasks, stageToNumTasks) => { + val activeTasks = getIncompleteTasksForStage(stageToNumTasks._1, stageToNumTasks._2) + sumOrMax(totalTasks, activeTasks) } // Get the total running & pending tasks for all stages in a job group. def getIncompleteTasksForJobGroup(stagesItr: mutable.HashMap[Int, Int]): Int = { @@ -911,6 +918,28 @@ private[spark] class ExecutorAllocationManager( */ def totalRunningTasks(): Int = numRunningTasks + def totalRunningTasks(stageId: Int): Int = { + runningTasks(stageId) + runningSpeculativeTasks(stageId) + } + + def runningTasks(stageId: Int): Int = { + if (stageIdToTaskIndices.contains(stageId)) { + stageIdToTaskIndices(stageId).size - stageIdToCompleteTaskCount.getOrElse(stageId, 0) + } else { + 0 + } + } + + def runningSpeculativeTasks(stageId: Int): Int = { + if (stageIdToNumSpeculativeTasks.contains(stageId) && + stageIdToSpeculativeTaskIndices.contains(stageId)) { + stageIdToSpeculativeTaskIndices(stageId).size - + stageIdToCompleteSpeculativeTaskCount.getOrElse(stageId, 0) + } else { + 0 + } + } + /** * Return true if an executor is not currently running a task, and false otherwise. 
* diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala index 93623af626293..0459025ecef20 100644 --- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala @@ -192,7 +192,7 @@ class ExecutorAllocationManagerSuite assert(numExecutorsTarget(manager) === 10) } - test("add executors when speculative tasks added - original") { + test("add executors when speculative tasks added") { sc = createSparkContext(0, 10, 0) val manager = sc.executorAllocationManager.get @@ -417,6 +417,7 @@ class ExecutorAllocationManagerSuite .setAppName("test-executor-allocation-manager") .set("spark.dynamicAllocation.enabled", "true") .set("spark.dynamicAllocation.testing", "true") + .set("spark.speculation", "true") .set("spark.job.group1.maxConcurrentTasks", "5") .set("spark.job.group2.maxConcurrentTasks", "11") .set("spark.job.group3.maxConcurrentTasks", "17") @@ -432,16 +433,28 @@ class ExecutorAllocationManagerSuite sc.listenerBus.postToAll(SparkListenerJobStart(0, 0, stages, sc.getLocalProperties)) sc.listenerBus.postToAll(SparkListenerStageSubmitted(stages(0))) - // Verify that we're capped at number of max concurrent tasks in the job group + // Verify that we're capped at number of max concurrent tasks in the job group. 
+ assert(maxNumExecutorsNeeded(manager) === 2) // There are only 2 tasks in stage 0 + + sc.listenerBus.postToAll(SparkListenerTaskStart(0, 0, createTaskInfo(0, 0, "executor-1"))) + sc.listenerBus.postToAll(SparkListenerTaskStart(0, 0, createTaskInfo(1, 2, "executor-1"))) assert(maxNumExecutorsNeeded(manager) === 2) - // submit a speculative task + // submit speculative tasks + sc.listenerBus.postToAll(SparkListenerSpeculativeTaskSubmitted(0)) + assert(maxNumExecutorsNeeded(manager) === 3) sc.listenerBus.postToAll(SparkListenerSpeculativeTaskSubmitted(0)) - assert(maxNumExecutorsNeeded(manager) === 3) // should increase the no. + assert(maxNumExecutorsNeeded(manager) === 4) + + val speculativeTask1 = createTaskInfo(0, 1, "executor-2", true) + sc.listenerBus.postToAll(SparkListenerTaskStart(0, 0, speculativeTask1)) + sc.listenerBus.postToAll(SparkListenerTaskStart(0, 0, createTaskInfo(1, 3, "executor-2", true))) + + assert(maxNumExecutorsNeeded(manager) === 4) // no. of executors should remain the same // Submit another stage in the same job sc.listenerBus.postToAll(SparkListenerStageSubmitted(stages(1))) - assert(maxNumExecutorsNeeded(manager) === 5) + assert(maxNumExecutorsNeeded(manager) === 5) // Limited by conf to 5 or else would have been 14. // Submit a job in group 2 sc.setJobGroup("group2", "", false) @@ -462,11 +475,20 @@ class ExecutorAllocationManagerSuite sc.listenerBus.postToAll(SparkListenerJobEnd(1, 20, JobSucceeded)) assert(maxNumExecutorsNeeded(manager) === 22) // 33 - 11 + // Complete a stage in job group1 + sc.listenerBus.postToAll(SparkListenerStageCompleted(stages(1))) + // Stage1 in job0 finished 10 tasks, we have 4 outstanding (2 regular + 2 speculative). The max + // no. of tasks is configured as 5 for group1, so now 1 executor is reduced (5-4). + assert(maxNumExecutorsNeeded(manager) === 21) + // Kill a speculative task in stage0. This will reduce the no. of executors by 1. 
+ sc.listenerBus.postToAll(SparkListenerTaskEnd(0, 0, null, + TaskKilled("For testing"), speculativeTask1, null)) + assert(maxNumExecutorsNeeded(manager) === 20) + // Mark job in group 1 as complete sc.listenerBus.postToAll(SparkListenerStageCompleted(stages(0))) - sc.listenerBus.postToAll(SparkListenerStageCompleted(stages(1))) sc.listenerBus.postToAll(SparkListenerJobEnd(0, 10, JobSucceeded)) - assert(maxNumExecutorsNeeded(manager) === 17) // 22 - 5 + assert(maxNumExecutorsNeeded(manager) === 17) // 20 - 3 (2 regular tasks + 1 speculative task) // Submit a job without any job group sc.clearJobGroup() @@ -478,12 +500,18 @@ class ExecutorAllocationManagerSuite // Submit a speculative task in unbounded job group sc.listenerBus.postToAll(SparkListenerSpeculativeTaskSubmitted(4)) sc.listenerBus.postToAll(SparkListenerSpeculativeTaskSubmitted(4)) - assert(maxNumExecutorsNeeded(manager) === 352) // should increase the no. + assert(maxNumExecutorsNeeded(manager) === 352) // 2 executors added for speculative execution. + + val speculativeTask2 = createTaskInfo(10, 30, "executor-2", true) + sc.listenerBus.postToAll(SparkListenerTaskStart(4, 0, speculativeTask2)) + sc.listenerBus.postToAll(SparkListenerTaskEnd(4, 0, null, Success, speculativeTask2, null)) + + assert(maxNumExecutorsNeeded(manager) === 351) // Reduce count as speculative task completed. 
// Mark job without job group as complete sc.listenerBus.postToAll(SparkListenerStageCompleted(stage4)) sc.listenerBus.postToAll(SparkListenerJobEnd(4, 20, JobSucceeded)) - assert(maxNumExecutorsNeeded(manager) === 17) // 350 - 333 + assert(maxNumExecutorsNeeded(manager) === 17) // 350 (351 with speculative) - 333 } test("cancel pending executors when no longer needed") { diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala index d0be4f863a7b9..ec7d234cdee71 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala @@ -1320,7 +1320,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg "exec1" -> "host1", "exec2" -> "host2" ).flatMap { case (exec, host) => - // offer each executor twice (simulating 2 cores per executor) + // offer each executor twice (simulating 3 cores per executor) (0 until 3).flatMap{ _ => tsm.resourceOffer(exec, host, TaskLocality.ANY)} } // tsm accepts all offers as it has tasks to run & there is no constraint for maxConcurrentTasks From c41ccf9f3be7da91e7140b010344bdab4aa6c615 Mon Sep 17 00:00:00 2001 From: Dhruve Ashar Date: Thu, 21 Sep 2017 14:57:17 -0500 Subject: [PATCH 3/4] Map the jobgroupId to stageId instead of jobId --- .../spark/ExecutorAllocationManager.scala | 42 +++++++----------- .../spark/scheduler/TaskSetManagerSuite.scala | 43 ------------------- 2 files changed, 16 insertions(+), 69 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala index 0693bb052acb7..91df1e3df4edb 100644 --- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala +++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala @@ -609,8 +609,7 @@ private[spark] class 
ExecutorAllocationManager( // Should be 0 when no stages are active. private var numRunningTasks: Int = _ private val jobGroupToMaxConTasks = new mutable.HashMap[String, Int] - private val jobIdToJobGroup = new mutable.HashMap[Int, String] - private val stageIdToJobId = new mutable.HashMap[Int, Int] + private val stageIdToJobGroupId = new mutable.HashMap[Int, String] private val stageIdToCompleteTaskCount = new mutable.HashMap[Int, Int] private val stageIdToCompleteSpeculativeTaskCount = new mutable.HashMap[Int, Int] @@ -626,8 +625,6 @@ private[spark] class ExecutorAllocationManager( private val stageIdToExecutorPlacementHints = new mutable.HashMap[Int, (Int, Map[String, Int])] override def onJobStart(jobStart: SparkListenerJobStart): Unit = { - jobStart.stageInfos.foreach(stageInfo => stageIdToJobId(stageInfo.stageId) = jobStart.jobId) - var jobGroupId = if (jobStart.properties != null) { jobStart.properties.getProperty(SparkContext.SPARK_JOB_GROUP_ID) } else { @@ -650,22 +647,12 @@ private[spark] class ExecutorAllocationManager( jobGroupId = DEFAULT_JOB_GROUP } - jobIdToJobGroup(jobStart.jobId) = jobGroupId + jobStart.stageInfos.foreach(stageInfo => stageIdToJobGroupId(stageInfo.stageId) = jobGroupId) if (!jobGroupToMaxConTasks.contains(jobGroupId)) { jobGroupToMaxConTasks(jobGroupId) = maxConTasks } } - override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = { - val jobGroupId = jobIdToJobGroup(jobEnd.jobId) - - jobIdToJobGroup -= jobEnd.jobId - // Remove jobGroup mapping if this is the only remaining job in the group. 
- if (!jobIdToJobGroup.values.exists(_ == jobGroupId)) { - jobGroupToMaxConTasks -= jobGroupId - } - } - override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = { initializing = false val stageId = stageSubmitted.stageInfo.stageId @@ -704,7 +691,14 @@ private[spark] class ExecutorAllocationManager( stageIdToTaskIndices -= stageId stageIdToSpeculativeTaskIndices -= stageId stageIdToExecutorPlacementHints -= stageId - stageIdToJobId -= stageId + + val jobGroupId = stageIdToJobGroupId(stageId) + stageIdToJobGroupId -= stageId + // Remove jobGroup mapping if this is the only remaining stage in the group. + if (!stageIdToJobGroupId.values.exists(_ == jobGroupId)) { + jobGroupToMaxConTasks -= jobGroupId + } + // Update the executor placement hints updateExecutorPlacementHints() @@ -841,22 +835,18 @@ private[spark] class ExecutorAllocationManager( // we need at that time to serve the outstanding tasks. This is capped by the minimum no. of // outstanding tasks and the max concurrent limit specified for the job group if any. - def getIncompleteTasksForStage(stageId: Int, numTasks: Int): Int = { - totalPendingTasks(stageId) + totalRunningTasks(stageId) - } - - def sumIncompleteTasksForStages: (Int, (Int, Int)) => Int = (totalTasks, stageToNumTasks) => { - val activeTasks = getIncompleteTasksForStage(stageToNumTasks._1, stageToNumTasks._2) + def sumIncompleteTasksForStages: (Int, Int) => Int = (totalTasks, stageId) => { + val activeTasks = totalPendingTasks(stageId) + totalRunningTasks(stageId) sumOrMax(totalTasks, activeTasks) } // Get the total running & pending tasks for all stages in a job group. 
- def getIncompleteTasksForJobGroup(stagesItr: mutable.HashMap[Int, Int]): Int = { + def getIncompleteTasksForJobGroup(stagesItr: Set[Int]): Int = { stagesItr.foldLeft(0)(sumIncompleteTasksForStages) } - def sumIncompleteTasksForJobGroup: (Int, (String, mutable.HashMap[Int, Int])) => Int = { + def sumIncompleteTasksForJobGroup: (Int, (String, mutable.HashMap[Int, String])) => Int = { (maxConTasks, x) => { - val totalIncompleteTasksForJobGroup = getIncompleteTasksForJobGroup(x._2) + val totalIncompleteTasksForJobGroup = getIncompleteTasksForJobGroup(x._2.keySet.toSet) val maxTasks = Math.min(jobGroupToMaxConTasks(x._1), totalIncompleteTasksForJobGroup) sumOrMax(maxConTasks, maxTasks) } @@ -866,7 +856,7 @@ private[spark] class ExecutorAllocationManager( def doesSumOverflow(a: Int, b: Int): Boolean = b > (Int.MaxValue - a) - val stagesByJobGroup = stageIdToNumTasks.groupBy(x => jobIdToJobGroup(stageIdToJobId(x._1))) + val stagesByJobGroup = stageIdToJobGroupId.groupBy(_._2) stagesByJobGroup.foldLeft(0)(sumIncompleteTasksForJobGroup) } diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala index ec7d234cdee71..3f67fdb86e245 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala @@ -1303,49 +1303,6 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg assert(taskDescs.size === 1) // tsm accepts one as it can run one more task } - test("do not limit max concurrent running tasks in a job group by default") { - val conf = new SparkConf(). 
- set(config.BLACKLIST_ENABLED, true) - - sc = new SparkContext("local", "test", conf) - sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2")) - - val tasks = Array.tabulate[Task[_]](10) { i => - new FakeTask(0, i, Nil) - } - val tsm = new TaskSetManager(sched, new TaskSet(tasks, 0, 0, 0, null), 2) - - // make 5 offers to our taskset - var taskDescs = Seq( - "exec1" -> "host1", - "exec2" -> "host2" - ).flatMap { case (exec, host) => - // offer each executor twice (simulating 3 cores per executor) - (0 until 3).flatMap{ _ => tsm.resourceOffer(exec, host, TaskLocality.ANY)} - } - // tsm accepts all offers as it has tasks to run & there is no constraint for maxConcurrentTasks - assert(taskDescs.size === 6) - - taskDescs = Seq( - "exec1" -> "host1", - "exec2" -> "host2" - ).flatMap { case (exec, host) => - (0 until 1).flatMap{ _ => tsm.resourceOffer(exec, host, TaskLocality.ANY)} - } - assert(taskDescs.size === 2) // tsm accepts more offers as it has tasks to run - - val directTaskResult = createTaskResult(0) - tsm.handleSuccessfulTask(taskDescs(0).taskId, directTaskResult) - - taskDescs = Seq( - "exec1" -> "host1", - "exec2" -> "host2" - ).flatMap { case (exec, host) => - (0 until 2).flatMap{ _ => tsm.resourceOffer(exec, host, TaskLocality.ANY)} - } - assert(taskDescs.size === 2) // tsm accepts 2 offers as it had only 2 tasks to run. 
- } - private def createTaskResult( id: Int, accumUpdates: Seq[AccumulatorV2[_, _]] = Seq.empty): DirectTaskResult[Int] = { From b6499066612a025b88765e8d84b7ca79198c6a47 Mon Sep 17 00:00:00 2001 From: Dhruve Ashar Date: Tue, 17 Jul 2018 12:59:30 -0500 Subject: [PATCH 4/4] Update log message --- .../apache/spark/scheduler/TaskSetManager.scala | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index c7fc82c49086d..02bcea793b7ec 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -123,6 +123,7 @@ private[spark] class TaskSetManager( var parent: Pool = null private var totalResultSize = 0L private var calculatedTasks = 0 + private var loggedMaxConTasks = false private[scheduler] val taskSetBlacklistHelperOpt: Option[TaskSetBlacklist] = { blacklistTracker.map { _ => @@ -541,8 +542,18 @@ private[spark] class TaskSetManager( serializedTask) } } else { - if (runningTasks >= maxConcurrentTasks) { - logDebug("Already running max. no. of concurrent tasks.") + if (runningTasks >= maxConcurrentTasks && !loggedMaxConTasks) { + val jobGroupId = taskSet.properties.getProperty(SparkContext.SPARK_JOB_GROUP_ID) + val message = s"Already running max. no. of concurrent tasks $maxConcurrentTasks" + if (jobGroupId != null && !jobGroupId.isEmpty) { + logInfo(s"$message for jobGroup " + + s"${taskSet.properties.getProperty(SparkContext.SPARK_JOB_GROUP_ID)} specified by " + + s"spark.job.${jobGroupId}.maxConcurrentTasks" + ) + } else { + logInfo(message) + } + loggedMaxConTasks = true } None }