From 4154849d98bd4519669717a5acb7583df2502297 Mon Sep 17 00:00:00 2001 From: KaiXinXiaoLei Date: Mon, 21 Sep 2015 20:31:17 +0800 Subject: [PATCH 1/7] add a function --- .../spark/ExecutorAllocationManager.scala | 23 ++++++++++++------- 1 file changed, 15 insertions(+), 8 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala index b93536e6536e2..caff738d337ce 100644 --- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala +++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala @@ -240,6 +240,21 @@ private[spark] class ExecutorAllocationManager( executor.awaitTermination(10, TimeUnit.SECONDS) } + def setExecutorBusy(info: TaskInfo): Unit = { + val executorId = info.executorId + allocationManager.synchronized { + // This guards against the race condition in which the `SparkListenerTaskStart` + // event is posted before the `SparkListenerBlockManagerAdded` event, which is + // possible because these events are posted in different threads. (see SPARK-4951) + if (!allocationManager.executorIds.contains(executorId)) { + allocationManager.onExecutorAdded(executorId) + } + + // Mark the executor on which this task is scheduled as busy + allocationManager.onExecutorBusy(executorId) + } + } + /** * The maximum number of executors we would need under the current load to satisfy all running * and pending tasks, rounded up. @@ -590,12 +605,6 @@ private[spark] class ExecutorAllocationManager( allocationManager.synchronized { numRunningTasks += 1 - // This guards against the race condition in which the `SparkListenerTaskStart` - // event is posted before the `SparkListenerBlockManagerAdded` event, which is - // possible because these events are posted in different threads. (see SPARK-4951) - if (!allocationManager.executorIds.contains(executorId)) { - allocationManager.onExecutorAdded(executorId) - } // If this is the last pending task, mark the scheduler queue as empty stageIdToTaskIndices.getOrElseUpdate(stageId, new mutable.HashSet[Int]) += taskIndex @@ -603,9 +612,7 @@ private[spark] class ExecutorAllocationManager( allocationManager.onSchedulerQueueEmpty() } - // Mark the executor on which this task is scheduled as busy executorIdToTaskIds.getOrElseUpdate(executorId, new mutable.HashSet[Long]) += taskId - allocationManager.onExecutorBusy(executorId) } } From 679f676ffa5d21b7dcfe1dc21ca3bfbfa390359d Mon Sep 17 00:00:00 2001 From: KaiXinXiaoLei Date: Mon, 21 Sep 2015 20:32:26 +0800 Subject: [PATCH 2/7] set executor busy --- .../main/scala/org/apache/spark/scheduler/TaskSetManager.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index 62af9031b9f8b..87b0da50736f4 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -491,6 +491,7 @@ private[spark] class TaskSetManager( taskName, taskId, host, taskLocality, serializedTask.limit)) sched.dagScheduler.taskStarted(task, info) + sched.sc.executorAllocationManager.foreach(_.setExecutorBusy(info)) return Some(new TaskDescription(taskId = taskId, attemptNumber = attemptNum, execId, taskName, index, serializedTask)) } From 566cfc9a84a066fccbd94876fbe6f5e89b63cd86 Mon Sep 17 00:00:00 2001 From: KaiXinXiaoLei Date: Mon, 21 Sep 2015 21:52:42 +0800 Subject: [PATCH 3/7] scala style --- .../main/scala/org/apache/spark/ExecutorAllocationManager.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala index caff738d337ce..6673d3ba49344 100644 --- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala +++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala @@ -249,7 +249,7 @@ private[spark] class ExecutorAllocationManager( if (!allocationManager.executorIds.contains(executorId)) { allocationManager.onExecutorAdded(executorId) } - + // Mark the executor on which this task is scheduled as busy allocationManager.onExecutorBusy(executorId) } From cfc50720c3ec54c85acf28f065d2a203bc025087 Mon Sep 17 00:00:00 2001 From: huleilei Date: Tue, 22 Sep 2015 16:51:12 +0800 Subject: [PATCH 4/7] unit test --- .../spark/ExecutorAllocationManagerSuite.scala | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala index 116f027a0f987..b6a41cb0feae9 100644 --- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala @@ -586,9 +586,12 @@ class ExecutorAllocationManagerSuite assert(removeTimes(manager).size === 5) // Starting a task cancel the remove timer for that executor - sc.listenerBus.postToAll(SparkListenerTaskStart(0, 0, createTaskInfo(0, 0, "executor-1"))) - sc.listenerBus.postToAll(SparkListenerTaskStart(0, 0, createTaskInfo(1, 1, "executor-1"))) - sc.listenerBus.postToAll(SparkListenerTaskStart(0, 0, createTaskInfo(2, 2, "executor-2"))) +// sc.listenerBus.postToAll(SparkListenerTaskStart(0, 0, createTaskInfo(0, 0, "executor-1"))) +// sc.listenerBus.postToAll(SparkListenerTaskStart(0, 0, createTaskInfo(1, 1, "executor-1"))) +// sc.listenerBus.postToAll(SparkListenerTaskStart(0, 0, createTaskInfo(2, 2, "executor-2"))) + setExecutorBusy(manager, createTaskInfo(0, 0, "executor-2")) + setExecutorBusy(manager, createTaskInfo(1, 1, "executor-2")) + setExecutorBusy(manager, createTaskInfo(2, 2, "executor-2")) assert(removeTimes(manager).size === 3) assert(!removeTimes(manager).contains("executor-1")) assert(!removeTimes(manager).contains("executor-2")) @@ -661,7 +664,8 @@ class ExecutorAllocationManagerSuite assert(removeTimes(manager).isEmpty) sc.listenerBus.postToAll(SparkListenerExecutorAdded( 0L, "executor-1", new ExecutorInfo("host1", 1, Map.empty))) - sc.listenerBus.postToAll(SparkListenerTaskStart(0, 0, createTaskInfo(0, 0, "executor-1"))) + // sc.listenerBus.postToAll(SparkListenerTaskStart(0, 0, createTaskInfo(0, 0, "executor-1"))) + setExecutorBusy(manager, createTaskInfo(0, 0, "executor-1")) assert(executorIds(manager).size === 1) assert(executorIds(manager).contains("executor-1")) @@ -876,6 +880,7 @@ private object ExecutorAllocationManagerSuite extends PrivateMethodTester { private val _onExecutorBusy = PrivateMethod[Unit]('onExecutorBusy) private val _localityAwareTasks = PrivateMethod[Int]('localityAwareTasks) private val _hostToLocalTaskCount = PrivateMethod[Map[String, Int]]('hostToLocalTaskCount) + private val _setExecutorBusy = PrivateMethod[Unit]('setExecutorBusy) private def numExecutorsToAdd(manager: ExecutorAllocationManager): Int = { manager invokePrivate _numExecutorsToAdd() @@ -954,4 +959,8 @@ private object ExecutorAllocationManagerSuite extends PrivateMethodTester { private def hostToLocalTaskCount(manager: ExecutorAllocationManager): Map[String, Int] = { manager invokePrivate _hostToLocalTaskCount() } + + private def setExecutorBusy(manager: ExecutorAllocationManager, info: TaskInfo): Map[String, Int] = { + manager invokePrivate _setExecutorBusy(info) + } } From 7cfd36937fccb6c1374863791ff80eecb031a4cc Mon Sep 17 00:00:00 2001 From: huleilei Date: Tue, 22 Sep 2015 17:11:09 +0800 Subject: [PATCH 5/7] type match --- .../scala/org/apache/spark/ExecutorAllocationManagerSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala index b6a41cb0feae9..d68eb571f8860 100644 --- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala @@ -880,7 +880,7 @@ private object ExecutorAllocationManagerSuite extends PrivateMethodTester { private val _onExecutorBusy = PrivateMethod[Unit]('onExecutorBusy) private val _localityAwareTasks = PrivateMethod[Int]('localityAwareTasks) private val _hostToLocalTaskCount = PrivateMethod[Map[String, Int]]('hostToLocalTaskCount) - private val _setExecutorBusy = PrivateMethod[Unit]('setExecutorBusy) + private val _setExecutorBusy = PrivateMethod[Map[String,Int]]('setExecutorBusy) private def numExecutorsToAdd(manager: ExecutorAllocationManager): Int = { manager invokePrivate _numExecutorsToAdd() From 0372fbdd7372822cb99330ef56fa495dcdf8109a Mon Sep 17 00:00:00 2001 From: huleilei Date: Tue, 22 Sep 2015 19:06:50 +0800 Subject: [PATCH 6/7] unit test --- .../org/apache/spark/ExecutorAllocationManagerSuite.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala index d68eb571f8860..a4bddad628f04 100644 --- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala @@ -586,12 +586,12 @@ class ExecutorAllocationManagerSuite assert(removeTimes(manager).size === 5) // Starting a task cancel the remove timer for that executor + setExecutorBusy(manager, createTaskInfo(0, 0, "executor-1")) + setExecutorBusy(manager, createTaskInfo(1, 1, "executor-1")) + setExecutorBusy(manager, createTaskInfo(2, 2, "executor-2")) // sc.listenerBus.postToAll(SparkListenerTaskStart(0, 0, createTaskInfo(0, 0, "executor-1"))) // sc.listenerBus.postToAll(SparkListenerTaskStart(0, 0, createTaskInfo(1, 1, "executor-1"))) // sc.listenerBus.postToAll(SparkListenerTaskStart(0, 0, createTaskInfo(2, 2, "executor-2"))) - setExecutorBusy(manager, createTaskInfo(0, 0, "executor-2")) - setExecutorBusy(manager, createTaskInfo(1, 1, "executor-2")) - setExecutorBusy(manager, createTaskInfo(2, 2, "executor-2")) assert(removeTimes(manager).size === 3) assert(!removeTimes(manager).contains("executor-1")) assert(!removeTimes(manager).contains("executor-2")) From f1ab8947744072eaac20691950e77436ad05e745 Mon Sep 17 00:00:00 2001 From: huleilei Date: Tue, 22 Sep 2015 19:09:07 +0800 Subject: [PATCH 7/7] unit test --- .../org/apache/spark/ExecutorAllocationManagerSuite.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala index a4bddad628f04..0908a6947e16d 100644 --- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala @@ -589,9 +589,9 @@ class ExecutorAllocationManagerSuite setExecutorBusy(manager, createTaskInfo(0, 0, "executor-1")) setExecutorBusy(manager, createTaskInfo(1, 1, "executor-1")) setExecutorBusy(manager, createTaskInfo(2, 2, "executor-2")) -// sc.listenerBus.postToAll(SparkListenerTaskStart(0, 0, createTaskInfo(0, 0, "executor-1"))) -// sc.listenerBus.postToAll(SparkListenerTaskStart(0, 0, createTaskInfo(1, 1, "executor-1"))) -// sc.listenerBus.postToAll(SparkListenerTaskStart(0, 0, createTaskInfo(2, 2, "executor-2"))) + sc.listenerBus.postToAll(SparkListenerTaskStart(0, 0, createTaskInfo(0, 0, "executor-1"))) + sc.listenerBus.postToAll(SparkListenerTaskStart(0, 0, createTaskInfo(1, 1, "executor-1"))) + sc.listenerBus.postToAll(SparkListenerTaskStart(0, 0, createTaskInfo(2, 2, "executor-2"))) assert(removeTimes(manager).size === 3) assert(!removeTimes(manager).contains("executor-1")) assert(!removeTimes(manager).contains("executor-2")) @@ -664,8 +664,8 @@ class ExecutorAllocationManagerSuite assert(removeTimes(manager).isEmpty) sc.listenerBus.postToAll(SparkListenerExecutorAdded( 0L, "executor-1", new ExecutorInfo("host1", 1, Map.empty))) - // sc.listenerBus.postToAll(SparkListenerTaskStart(0, 0, createTaskInfo(0, 0, "executor-1"))) setExecutorBusy(manager, createTaskInfo(0, 0, "executor-1")) + sc.listenerBus.postToAll(SparkListenerTaskStart(0, 0, createTaskInfo(0, 0, "executor-1"))) assert(executorIds(manager).size === 1) assert(executorIds(manager).contains("executor-1"))