From 0d5b02547087c485337667501393d59fbbeb1e3e Mon Sep 17 00:00:00 2001 From: erenavsarogullari Date: Sun, 23 Oct 2016 21:37:26 +0100 Subject: [PATCH 01/10] Add Pool usage policies test coverage for FIFO & FAIR Schedulers --- .../apache/spark/scheduler/PoolSuite.scala | 94 ++++++++++++++++++- 1 file changed, 93 insertions(+), 1 deletion(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala index 520736ab64270..5606b6c66b073 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala @@ -31,6 +31,7 @@ class PoolSuite extends SparkFunSuite with LocalSparkContext { val LOCAL = "local" val APP_NAME = "PoolSuite" val SCHEDULER_ALLOCATION_FILE_PROPERTY = "spark.scheduler.allocation.file" + val TEST_POOL = "testPool" def createTaskSetManager(stageId: Int, numTasks: Int, taskScheduler: TaskSchedulerImpl) : TaskSetManager = { @@ -201,6 +202,97 @@ class PoolSuite extends SparkFunSuite with LocalSparkContext { verifyPool(rootPool, "pool_with_surrounded_whitespace", 3, 2, FAIR) } + test("FIFO Scheduler just uses root pool") { + sc = new SparkContext("local", "PoolSuite") + val taskScheduler = new TaskSchedulerImpl(sc) + + val rootPool = new Pool("", SchedulingMode.FIFO, initMinShare = 0, initWeight = 0) + val schedulableBuilder = new FIFOSchedulableBuilder(rootPool) + + val taskSetManager0 = createTaskSetManager(stageId = 0, numTasks = 1, taskScheduler) + val taskSetManager1 = createTaskSetManager(stageId = 1, numTasks = 1, taskScheduler) + + val properties = new Properties() + properties.setProperty("spark.scheduler.pool", TEST_POOL) + + // FIFOSchedulableBuilder just uses rootPool so even if properties are set, related pool + // (testPool) is not created and TaskSetManagers are added to rootPool + schedulableBuilder.addTaskSetManager(taskSetManager0, properties) + schedulableBuilder.addTaskSetManager(taskSetManager1, properties) + + assert(rootPool.getSchedulableByName(TEST_POOL) == null) + assert(rootPool.schedulableQueue.size == 2) + assert(rootPool.getSchedulableByName(taskSetManager0.name) eq taskSetManager0) + assert(rootPool.getSchedulableByName(taskSetManager1.name) eq taskSetManager1) + } + + test("FAIR Scheduler uses default pool when spark.scheduler.pool property is not set") { + sc = new SparkContext("local", "PoolSuite") + val taskScheduler = new TaskSchedulerImpl(sc) + + val rootPool = new Pool("", SchedulingMode.FAIR, initMinShare = 0, initWeight = 0) + val schedulableBuilder = new FairSchedulableBuilder(rootPool, sc.conf) + schedulableBuilder.buildPools() + + // FAIR Scheduler uses default pool when pool properties are null + val taskSetManager0 = createTaskSetManager(stageId = 0, numTasks = 1, taskScheduler) + + schedulableBuilder.addTaskSetManager(taskSetManager0, null) + + val defaultPool = rootPool.getSchedulableByName(schedulableBuilder.DEFAULT_POOL_NAME) + assert(defaultPool != null) + assert(defaultPool.schedulableQueue.size == 1) + assert(defaultPool.getSchedulableByName(taskSetManager0.name) eq taskSetManager0) + + // FAIR Scheduler uses default pool when spark.scheduler.pool property is not set + val taskSetManager1 = createTaskSetManager(stageId = 1, numTasks = 1, taskScheduler) + + schedulableBuilder.addTaskSetManager(taskSetManager1, new Properties()) + + assert(defaultPool.schedulableQueue.size == 2) + assert(defaultPool.getSchedulableByName(taskSetManager1.name) eq taskSetManager1) + + // FAIR Scheduler uses default pool when spark.scheduler.pool property is set as default pool + val taskSetManager2 = createTaskSetManager(stageId = 2, numTasks = 1, taskScheduler) + + val properties = new Properties() + properties.setProperty(schedulableBuilder.FAIR_SCHEDULER_PROPERTIES, schedulableBuilder + .DEFAULT_POOL_NAME) + + schedulableBuilder.addTaskSetManager(taskSetManager2, properties) + + assert(defaultPool.schedulableQueue.size == 3) + assert(defaultPool.getSchedulableByName(taskSetManager2.name) eq taskSetManager2) + } + + test("FAIR Scheduler creates a new pool when spark.scheduler.pool property points non-existent") { + sc = new SparkContext("local", "PoolSuite") + val taskScheduler = new TaskSchedulerImpl(sc) + + val rootPool = new Pool("", SchedulingMode.FAIR, initMinShare = 0, initWeight = 0) + val schedulableBuilder = new FairSchedulableBuilder(rootPool, sc.conf) + schedulableBuilder.buildPools() + + assert(rootPool.getSchedulableByName(TEST_POOL) == null) + + val taskSetManager = createTaskSetManager(stageId = 0, numTasks = 1, taskScheduler) + + val properties = new Properties() + properties.setProperty(schedulableBuilder.FAIR_SCHEDULER_PROPERTIES, TEST_POOL) + + // FAIR Scheduler creates a new pool with default values when spark.scheduler.pool property + // points non-existent pool. This can be happened when scheduler allocation file is not set or + // it does not contain related pool + schedulableBuilder.addTaskSetManager(taskSetManager, properties) + + val testPool = rootPool.getSchedulableByName(TEST_POOL) + assert(testPool != null) + assert(testPool.schedulingMode == schedulableBuilder.DEFAULT_SCHEDULING_MODE) + assert(testPool.minShare == schedulableBuilder.DEFAULT_MINIMUM_SHARE) + assert(testPool.weight == schedulableBuilder.DEFAULT_WEIGHT) + assert(testPool.getSchedulableByName(taskSetManager.name) eq taskSetManager) + } + private def verifyPool(rootPool: Pool, poolName: String, expectedInitMinShare: Int, expectedInitWeight: Int, expectedSchedulingMode: SchedulingMode): Unit = { assert(rootPool.getSchedulableByName(poolName) != null) @@ -208,5 +300,5 @@ class PoolSuite extends SparkFunSuite with LocalSparkContext { assert(rootPool.getSchedulableByName(poolName).weight === expectedInitWeight) assert(rootPool.getSchedulableByName(poolName).schedulingMode === expectedSchedulingMode) } - + } From fd9e3f337d8830f74159913e949f3fdbe12da947 Mon Sep 17 00:00:00 2001 From: erenavsarogullari Date: Sat, 24 Dec 2016 20:35:51 +0000 Subject: [PATCH 02/10] Minor refactoring for previous commit --- .../apache/spark/scheduler/PoolSuite.scala | 20 ++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala index 5606b6c66b073..21548334b639d 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala @@ -202,7 +202,7 @@ class PoolSuite extends SparkFunSuite with LocalSparkContext { verifyPool(rootPool, "pool_with_surrounded_whitespace", 3, 2, FAIR) } - test("FIFO Scheduler just uses root pool") { + test("SPARK-18066: FIFO Scheduler just uses root pool") { sc = new SparkContext("local", "PoolSuite") val taskScheduler = new TaskSchedulerImpl(sc) @@ -222,11 +222,12 @@ class PoolSuite extends SparkFunSuite with LocalSparkContext { assert(rootPool.getSchedulableByName(TEST_POOL) == null) assert(rootPool.schedulableQueue.size == 2) - assert(rootPool.getSchedulableByName(taskSetManager0.name) eq taskSetManager0) - assert(rootPool.getSchedulableByName(taskSetManager1.name) eq taskSetManager1) + assert(rootPool.getSchedulableByName(taskSetManager0.name) === taskSetManager0) + assert(rootPool.getSchedulableByName(taskSetManager1.name) === taskSetManager1) } - test("FAIR Scheduler uses default pool when spark.scheduler.pool property is not set") { + test("SPARK-18066: FAIR Scheduler uses default pool when spark.scheduler.pool property is not " + + "set") { sc = new SparkContext("local", "PoolSuite") val taskScheduler = new TaskSchedulerImpl(sc) @@ -242,7 +243,7 @@ class PoolSuite extends SparkFunSuite with LocalSparkContext { val defaultPool = rootPool.getSchedulableByName(schedulableBuilder.DEFAULT_POOL_NAME) assert(defaultPool != null) assert(defaultPool.schedulableQueue.size == 1) - assert(defaultPool.getSchedulableByName(taskSetManager0.name) eq taskSetManager0) + assert(defaultPool.getSchedulableByName(taskSetManager0.name) === taskSetManager0) // FAIR Scheduler uses default pool when spark.scheduler.pool property is not set val taskSetManager1 = createTaskSetManager(stageId = 1, numTasks = 1, taskScheduler) @@ -250,7 +251,7 @@ class PoolSuite extends SparkFunSuite with LocalSparkContext { schedulableBuilder.addTaskSetManager(taskSetManager1, new Properties()) assert(defaultPool.schedulableQueue.size == 2) - assert(defaultPool.getSchedulableByName(taskSetManager1.name) eq taskSetManager1) + assert(defaultPool.getSchedulableByName(taskSetManager1.name) === taskSetManager1) // FAIR Scheduler uses default pool when spark.scheduler.pool property is set as default pool val taskSetManager2 = createTaskSetManager(stageId = 2, numTasks = 1, taskScheduler) @@ -262,10 +263,11 @@ class PoolSuite extends SparkFunSuite with LocalSparkContext { schedulableBuilder.addTaskSetManager(taskSetManager2, properties) assert(defaultPool.schedulableQueue.size == 3) - assert(defaultPool.getSchedulableByName(taskSetManager2.name) eq taskSetManager2) + assert(defaultPool.getSchedulableByName(taskSetManager2.name) === taskSetManager2) } - test("FAIR Scheduler creates a new pool when spark.scheduler.pool property points non-existent") { + test("SPARK-18066: FAIR Scheduler creates a new pool when spark.scheduler.pool property points " + + "non-existent") { sc = new SparkContext("local", "PoolSuite") val taskScheduler = new TaskSchedulerImpl(sc) @@ -290,7 +292,7 @@ class PoolSuite extends SparkFunSuite with LocalSparkContext { assert(testPool.schedulingMode == schedulableBuilder.DEFAULT_SCHEDULING_MODE) assert(testPool.minShare == schedulableBuilder.DEFAULT_MINIMUM_SHARE) assert(testPool.weight == schedulableBuilder.DEFAULT_WEIGHT) - assert(testPool.getSchedulableByName(taskSetManager.name) eq taskSetManager) + assert(testPool.getSchedulableByName(taskSetManager.name) === taskSetManager) } private def verifyPool(rootPool: Pool, poolName: String, expectedInitMinShare: Int, From be133c1d1888cab5302a4bd04c72303051091b69 Mon Sep 17 00:00:00 2001 From: erenavsarogullari Date: Fri, 10 Feb 2017 23:18:40 +0000 Subject: [PATCH 03/10] Review comments are addressed. --- .../org/apache/spark/scheduler/Pool.scala | 4 +- .../apache/spark/scheduler/PoolSuite.scala | 78 +++++++++---------- 2 files changed, 39 insertions(+), 43 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/Pool.scala b/core/src/main/scala/org/apache/spark/scheduler/Pool.scala index 2a69a6c5e8790..a2063de68b24c 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/Pool.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/Pool.scala @@ -37,8 +37,8 @@ private[spark] class Pool( val schedulableQueue = new ConcurrentLinkedQueue[Schedulable] val schedulableNameToSchedulable = new ConcurrentHashMap[String, Schedulable] - var weight = initWeight - var minShare = initMinShare + val weight = initWeight + val minShare = initMinShare var runningTasks = 0 var priority = 0 diff --git a/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala index 21548334b639d..45795db5685a1 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala @@ -33,7 +33,7 @@ class PoolSuite extends SparkFunSuite with LocalSparkContext { val SCHEDULER_ALLOCATION_FILE_PROPERTY = "spark.scheduler.allocation.file" val TEST_POOL = "testPool" - def createTaskSetManager(stageId: Int, numTasks: Int, taskScheduler: TaskSchedulerImpl) + private def createTaskSetManager(stageId: Int, numTasks: Int, taskScheduler: TaskSchedulerImpl) : TaskSetManager = { val tasks = Array.tabulate[Task[_]](numTasks) { i => new FakeTask(stageId, i, Nil) @@ -41,7 +41,7 @@ class PoolSuite extends SparkFunSuite with LocalSparkContext { new TaskSetManager(taskScheduler, new TaskSet(tasks, stageId, 0, 0, null), 0) } - def scheduleTaskAndVerifyId(taskId: Int, rootPool: Pool, expectedStageId: Int) { + private def scheduleTaskAndVerifyId(taskId: Int, rootPool: Pool, expectedStageId: Int): Unit = { val taskSetQueue = rootPool.getSortedTaskSetQueue val nextTaskSetToSchedule = taskSetQueue.find(t => (t.runningTasks + t.tasksSuccessful) < t.numTasks) @@ -202,7 +202,11 @@ class PoolSuite extends SparkFunSuite with LocalSparkContext { verifyPool(rootPool, "pool_with_surrounded_whitespace", 3, 2, FAIR) } - test("SPARK-18066: FIFO Scheduler just uses root pool") { + /** + * spark.scheduler.pool property should be ignored for the FIFO scheduler, + * because pools are only needed for fair scheduling. + */ + test("FIFO scheduler uses root pool and not spark.scheduler.pool property") { sc = new SparkContext("local", "PoolSuite") val taskScheduler = new TaskSchedulerImpl(sc) @@ -215,19 +219,19 @@ class PoolSuite extends SparkFunSuite with LocalSparkContext { val properties = new Properties() properties.setProperty("spark.scheduler.pool", TEST_POOL) - // FIFOSchedulableBuilder just uses rootPool so even if properties are set, related pool - // (testPool) is not created and TaskSetManagers are added to rootPool + // When FIFO Scheduler is used and task sets are submitted, they should be added to + // the root pool, and no additional pools should be created + // (even though there's a configured default pool). schedulableBuilder.addTaskSetManager(taskSetManager0, properties) schedulableBuilder.addTaskSetManager(taskSetManager1, properties) - assert(rootPool.getSchedulableByName(TEST_POOL) == null) - assert(rootPool.schedulableQueue.size == 2) + assert(rootPool.getSchedulableByName(TEST_POOL) === null) + assert(rootPool.schedulableQueue.size === 2) assert(rootPool.getSchedulableByName(taskSetManager0.name) === taskSetManager0) assert(rootPool.getSchedulableByName(taskSetManager1.name) === taskSetManager1) } - test("SPARK-18066: FAIR Scheduler uses default pool when spark.scheduler.pool property is not " + - "set") { + test("FAIR Scheduler uses default pool when spark.scheduler.pool property is not set") { sc = new SparkContext("local", "PoolSuite") val taskScheduler = new TaskSchedulerImpl(sc) @@ -235,39 +239,27 @@ class PoolSuite extends SparkFunSuite with LocalSparkContext { val schedulableBuilder = new FairSchedulableBuilder(rootPool, sc.conf) schedulableBuilder.buildPools() - // FAIR Scheduler uses default pool when pool properties are null + // Submit a new task set manager with pool properties set to null. This should result + // in the task set manager getting added to the default pool. val taskSetManager0 = createTaskSetManager(stageId = 0, numTasks = 1, taskScheduler) - schedulableBuilder.addTaskSetManager(taskSetManager0, null) val defaultPool = rootPool.getSchedulableByName(schedulableBuilder.DEFAULT_POOL_NAME) assert(defaultPool != null) - assert(defaultPool.schedulableQueue.size == 1) + assert(defaultPool.schedulableQueue.size === 1) assert(defaultPool.getSchedulableByName(taskSetManager0.name) === taskSetManager0) - // FAIR Scheduler uses default pool when spark.scheduler.pool property is not set + // When a task set manager is submitted with spark.scheduler.pool unset, it should be added to + // the default pool (as above). val taskSetManager1 = createTaskSetManager(stageId = 1, numTasks = 1, taskScheduler) - schedulableBuilder.addTaskSetManager(taskSetManager1, new Properties()) - assert(defaultPool.schedulableQueue.size == 2) + assert(defaultPool.schedulableQueue.size === 2) assert(defaultPool.getSchedulableByName(taskSetManager1.name) === taskSetManager1) - - // FAIR Scheduler uses default pool when spark.scheduler.pool property is set as default pool - val taskSetManager2 = createTaskSetManager(stageId = 2, numTasks = 1, taskScheduler) - - val properties = new Properties() - properties.setProperty(schedulableBuilder.FAIR_SCHEDULER_PROPERTIES, schedulableBuilder - .DEFAULT_POOL_NAME) - - schedulableBuilder.addTaskSetManager(taskSetManager2, properties) - - assert(defaultPool.schedulableQueue.size == 3) - assert(defaultPool.getSchedulableByName(taskSetManager2.name) === taskSetManager2) } - test("SPARK-18066: FAIR Scheduler creates a new pool when spark.scheduler.pool property points " + - "non-existent") { + test("FAIR Scheduler creates a new pool when spark.scheduler.pool property points to " + + "a non-existent pool") { sc = new SparkContext("local", "PoolSuite") val taskScheduler = new TaskSchedulerImpl(sc) @@ -275,32 +267,36 @@ class PoolSuite extends SparkFunSuite with LocalSparkContext { val schedulableBuilder = new FairSchedulableBuilder(rootPool, sc.conf) schedulableBuilder.buildPools() - assert(rootPool.getSchedulableByName(TEST_POOL) == null) + assert(rootPool.getSchedulableByName(TEST_POOL) === null) val taskSetManager = createTaskSetManager(stageId = 0, numTasks = 1, taskScheduler) val properties = new Properties() properties.setProperty(schedulableBuilder.FAIR_SCHEDULER_PROPERTIES, TEST_POOL) - // FAIR Scheduler creates a new pool with default values when spark.scheduler.pool property - // points non-existent pool. This can be happened when scheduler allocation file is not set or - // it does not contain related pool + // The fair scheduler should create a new pool with default values when spark.scheduler.pool + // points to a pool that doesn't exist yet (this can happen when the file that pools are read + // from isn't set, or when that file doesn't contain the pool name specified + // by spark.scheduler.pool). schedulableBuilder.addTaskSetManager(taskSetManager, properties) val testPool = rootPool.getSchedulableByName(TEST_POOL) assert(testPool != null) - assert(testPool.schedulingMode == schedulableBuilder.DEFAULT_SCHEDULING_MODE) - assert(testPool.minShare == schedulableBuilder.DEFAULT_MINIMUM_SHARE) - assert(testPool.weight == schedulableBuilder.DEFAULT_WEIGHT) + assert(testPool.schedulingMode === schedulableBuilder.DEFAULT_SCHEDULING_MODE) + assert(testPool.minShare === schedulableBuilder.DEFAULT_MINIMUM_SHARE) + assert(testPool.weight === schedulableBuilder.DEFAULT_WEIGHT) + + verifyPool(rootPool, TEST_POOL, schedulableBuilder.DEFAULT_MINIMUM_SHARE, + schedulableBuilder.DEFAULT_WEIGHT, schedulableBuilder.DEFAULT_SCHEDULING_MODE) assert(testPool.getSchedulableByName(taskSetManager.name) === taskSetManager) } private def verifyPool(rootPool: Pool, poolName: String, expectedInitMinShare: Int, expectedInitWeight: Int, expectedSchedulingMode: SchedulingMode): Unit = { - assert(rootPool.getSchedulableByName(poolName) != null) - assert(rootPool.getSchedulableByName(poolName).minShare === expectedInitMinShare) - assert(rootPool.getSchedulableByName(poolName).weight === expectedInitWeight) - assert(rootPool.getSchedulableByName(poolName).schedulingMode === expectedSchedulingMode) + val selectedPool = rootPool.getSchedulableByName(poolName) + assert(selectedPool != null) + assert(selectedPool.minShare === expectedInitMinShare) + assert(selectedPool.weight === expectedInitWeight) + assert(selectedPool.schedulingMode === expectedSchedulingMode) } - } From ad16e53a26f1e986f1ac65263a8d40c70d2a9a15 Mon Sep 17 00:00:00 2001 From: erenavsarogullari Date: Fri, 10 Feb 2017 23:33:14 +0000 Subject: [PATCH 04/10] Minor style is fixed. --- .../test/scala/org/apache/spark/scheduler/PoolSuite.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala index 45795db5685a1..027540c837231 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala @@ -203,9 +203,9 @@ class PoolSuite extends SparkFunSuite with LocalSparkContext { } /** - * spark.scheduler.pool property should be ignored for the FIFO scheduler, - * because pools are only needed for fair scheduling. - */ + * spark.scheduler.pool property should be ignored for the FIFO scheduler, + * because pools are only needed for fair scheduling. + */ test("FIFO scheduler uses root pool and not spark.scheduler.pool property") { sc = new SparkContext("local", "PoolSuite") val taskScheduler = new TaskSchedulerImpl(sc) From e292923a29b64717b96d9d72ced2c2b7968ab54f Mon Sep 17 00:00:00 2001 From: erenavsarogullari Date: Sat, 11 Feb 2017 11:11:39 +0000 Subject: [PATCH 05/10] Minor redundant check is removed. --- .../scala/org/apache/spark/scheduler/PoolSuite.scala | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala index 027540c837231..54c5dae7ce64c 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala @@ -245,7 +245,7 @@ class PoolSuite extends SparkFunSuite with LocalSparkContext { schedulableBuilder.addTaskSetManager(taskSetManager0, null) val defaultPool = rootPool.getSchedulableByName(schedulableBuilder.DEFAULT_POOL_NAME) - assert(defaultPool != null) + assert(defaultPool !== null) assert(defaultPool.schedulableQueue.size === 1) assert(defaultPool.getSchedulableByName(taskSetManager0.name) === taskSetManager0) @@ -280,21 +280,16 @@ class PoolSuite extends SparkFunSuite with LocalSparkContext { // by spark.scheduler.pool). schedulableBuilder.addTaskSetManager(taskSetManager, properties) - val testPool = rootPool.getSchedulableByName(TEST_POOL) - assert(testPool != null) - assert(testPool.schedulingMode === schedulableBuilder.DEFAULT_SCHEDULING_MODE) - assert(testPool.minShare === schedulableBuilder.DEFAULT_MINIMUM_SHARE) - assert(testPool.weight === schedulableBuilder.DEFAULT_WEIGHT) - verifyPool(rootPool, TEST_POOL, schedulableBuilder.DEFAULT_MINIMUM_SHARE, schedulableBuilder.DEFAULT_WEIGHT, schedulableBuilder.DEFAULT_SCHEDULING_MODE) + val testPool = rootPool.getSchedulableByName(TEST_POOL) assert(testPool.getSchedulableByName(taskSetManager.name) === taskSetManager) } private def verifyPool(rootPool: Pool, poolName: String, expectedInitMinShare: Int, expectedInitWeight: Int, expectedSchedulingMode: SchedulingMode): Unit = { val selectedPool = rootPool.getSchedulableByName(poolName) - assert(selectedPool != null) + assert(selectedPool !== null) assert(selectedPool.minShare === expectedInitMinShare) assert(selectedPool.weight === expectedInitWeight) assert(selectedPool.schedulingMode === expectedSchedulingMode) From fc3deeab536e4d977033f03e773e6f1383d8f979 Mon Sep 17 00:00:00 2001 From: erenavsarogullari Date: Sun, 12 Feb 2017 22:35:55 +0000 Subject: [PATCH 06/10] Pool variables are addressed via separated PR --- core/src/main/scala/org/apache/spark/scheduler/Pool.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/Pool.scala b/core/src/main/scala/org/apache/spark/scheduler/Pool.scala index a2063de68b24c..2a69a6c5e8790 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/Pool.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/Pool.scala @@ -37,8 +37,8 @@ private[spark] class Pool( val schedulableQueue = new ConcurrentLinkedQueue[Schedulable] val schedulableNameToSchedulable = new ConcurrentHashMap[String, Schedulable] - val weight = initWeight - val minShare = initMinShare + var weight = initWeight + var minShare = initMinShare var runningTasks = 0 var priority = 0 From e2d2e59e7556d35f9f56eec7db2674f6f969a812 Mon Sep 17 00:00:00 2001 From: erenavsarogullari Date: Sun, 12 Mar 2017 18:53:52 +0000 Subject: [PATCH 07/10] Review comment is addressed. --- .../src/test/scala/org/apache/spark/scheduler/PoolSuite.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala index 54c5dae7ce64c..cddff3dd35861 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala @@ -33,7 +33,7 @@ class PoolSuite extends SparkFunSuite with LocalSparkContext { val SCHEDULER_ALLOCATION_FILE_PROPERTY = "spark.scheduler.allocation.file" val TEST_POOL = "testPool" - private def createTaskSetManager(stageId: Int, numTasks: Int, taskScheduler: TaskSchedulerImpl) + def createTaskSetManager(stageId: Int, numTasks: Int, taskScheduler: TaskSchedulerImpl) : TaskSetManager = { val tasks = Array.tabulate[Task[_]](numTasks) { i => new FakeTask(stageId, i, Nil) @@ -41,7 +41,7 @@ class PoolSuite extends SparkFunSuite with LocalSparkContext { new TaskSetManager(taskScheduler, new TaskSet(tasks, stageId, 0, 0, null), 0) } - private def scheduleTaskAndVerifyId(taskId: Int, rootPool: Pool, expectedStageId: Int): Unit = { + def scheduleTaskAndVerifyId(taskId: Int, rootPool: Pool, expectedStageId: Int): Unit = { val taskSetQueue = rootPool.getSortedTaskSetQueue val nextTaskSetToSchedule = taskSetQueue.find(t => (t.runningTasks + t.tasksSuccessful) < t.numTasks) From 63ee0c868fef95d2f991b024a723dca212dee759 Mon Sep 17 00:00:00 2001 From: erenavsarogullari Date: Sun, 12 Mar 2017 20:07:18 +0000 Subject: [PATCH 08/10] Add additional logs to highlight pool creation to the user. --- .../org/apache/spark/scheduler/SchedulableBuilder.scala | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala b/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala index e53c4fb5b4778..524b1a178c375 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala @@ -191,7 +191,10 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool, conf: SparkConf) parentPool = new Pool(poolName, DEFAULT_SCHEDULING_MODE, DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT) rootPool.addSchedulable(parentPool) - logInfo("Created pool: %s, schedulingMode: %s, minShare: %d, weight: %d".format( + logInfo(("An Unconfigured pool is found. It is built with default configuration. " + + "This can happen when the file that pools are read from isn't set, or when that file " + + "doesn't contain the pool name specified by spark.scheduler.pool. " + + "Created pool: %s, schedulingMode: %s, minShare: %d, weight: %d").format( poolName, DEFAULT_SCHEDULING_MODE, DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT)) } } From f13ad3de7521ec6e65a9283aa6c6f37a903cead5 Mon Sep 17 00:00:00 2001 From: erenavsarogullari Date: Sun, 12 Mar 2017 20:14:42 +0000 Subject: [PATCH 09/10] Logging level is updated from info to warning. --- .../scala/org/apache/spark/scheduler/SchedulableBuilder.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala b/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala index 524b1a178c375..b9e451fcbade9 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala @@ -191,7 +191,7 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool, conf: SparkConf) parentPool = new Pool(poolName, DEFAULT_SCHEDULING_MODE, DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT) rootPool.addSchedulable(parentPool) - logInfo(("An Unconfigured pool is found. It is built with default configuration. " + + logWarning(("An Unconfigured pool is found. It is built with default configuration. " + "This can happen when the file that pools are read from isn't set, or when that file " + "doesn't contain the pool name specified by spark.scheduler.pool. " + "Created pool: %s, schedulingMode: %s, minShare: %d, weight: %d").format( From b7f2629b554116c0a6c3d0674e717dc8c2bf77e3 Mon Sep 17 00:00:00 2001 From: erenavsarogullari Date: Wed, 15 Mar 2017 20:50:51 +0000 Subject: [PATCH 10/10] Warning message is updated. --- .../apache/spark/scheduler/SchedulableBuilder.scala | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala b/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala index b9e451fcbade9..20cedaf060420 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala @@ -191,11 +191,11 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool, conf: SparkConf) parentPool = new Pool(poolName, DEFAULT_SCHEDULING_MODE, DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT) rootPool.addSchedulable(parentPool) - logWarning(("An Unconfigured pool is found. It is built with default configuration. " + - "This can happen when the file that pools are read from isn't set, or when that file " + - "doesn't contain the pool name specified by spark.scheduler.pool. " + - "Created pool: %s, schedulingMode: %s, minShare: %d, weight: %d").format( - poolName, DEFAULT_SCHEDULING_MODE, DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT)) + logWarning(s"A job was submitted with scheduler pool $poolName, which has not been " + + "configured. This can happen when the file that pools are read from isn't set, or " + + s"when that file doesn't contain $poolName. Created $poolName with default " + + s"configuration (schedulingMode: $DEFAULT_SCHEDULING_MODE, " + + s"minShare: $DEFAULT_MINIMUM_SHARE, weight: $DEFAULT_WEIGHT)") } } parentPool.addSchedulable(manager)