From f2b597022b4fc4023c238e5b5a9824946f84f84e Mon Sep 17 00:00:00 2001
From: lianhuiwang
Date: Fri, 23 May 2014 22:02:57 +0800
Subject: [PATCH 01/11] bugfix worker DriverStateChanged state should match
 DriverState.FAILED

---
 core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index fb9cc116cd08b..e7ff855010488 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -311,6 +311,8 @@ private[spark] class Worker(
     state match {
       case DriverState.ERROR =>
         logWarning(s"Driver $driverId failed with unrecoverable exception: ${exception.get}")
+      case DriverState.FAILED =>
+        logWarning(s"Driver $driverId failed with state $state")
       case DriverState.FINISHED =>
         logInfo(s"Driver $driverId exited successfully")
       case DriverState.KILLED =>

From 480ce949a83c0d854078b38f5665f3369cf759eb Mon Sep 17 00:00:00 2001
From: lianhuiwang
Date: Sat, 24 May 2014 23:24:37 +0800
Subject: [PATCH 02/11] address aarondav comments

---
 .../main/scala/org/apache/spark/deploy/worker/Worker.scala | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index e7ff855010488..e3533bdf8e03e 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -312,11 +312,13 @@ private[spark] class Worker(
       case DriverState.ERROR =>
         logWarning(s"Driver $driverId failed with unrecoverable exception: ${exception.get}")
       case DriverState.FAILED =>
-        logWarning(s"Driver $driverId failed with state $state")
+        logWarning(s"Driver $driverId exited with failure")
       case DriverState.FINISHED =>
         logInfo(s"Driver $driverId exited successfully")
      case DriverState.KILLED =>
         logInfo(s"Driver $driverId was killed by user")
+      case _ =>
+        logDebug(s"Driver $driverId changed state to $state")
     }
     masterLock.synchronized {
       master ! DriverStateChanged(driverId, state, exception)
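
Note: the missing FAILED case in patches 01-02 mattered because a Scala match with no applicable case throws scala.MatchError at runtime, and the compiler cannot check exhaustiveness for Enumeration values such as DriverState. A minimal, self-contained sketch of that failure mode, using a hypothetical enumeration rather than Spark's DriverState:

    object State extends Enumeration {
      val FINISHED, FAILED = Value
    }

    // No case for FAILED and no catch-all: the compiler accepts this,
    // but the call below fails at runtime.
    def describe(s: State.Value): String = s match {
      case State.FINISHED => "exited successfully"
    }

    describe(State.FAILED)  // throws scala.MatchError: FAILED

The catch-all `case _` added in patch 02 closes the same hole for any state added in the future.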
From 1e1e30eebe6eb97563dd50dfded25bea24ecda95 Mon Sep 17 00:00:00 2001
From: lianhuiwang
Date: Tue, 22 Jul 2014 20:57:15 +0800
Subject: [PATCH 03/11] use config spark.scheduler.priority for specifying
 TaskSet's priority on DAGScheduler

---
 .../apache/spark/scheduler/DAGScheduler.scala      |  3 ++-
 .../scala/org/apache/spark/scheduler/Pool.scala    |  1 +
 .../apache/spark/scheduler/Schedulable.scala       |  1 +
 .../spark/scheduler/SchedulingAlgorithm.scala      | 17 ++++++++++++++---
 .../org/apache/spark/scheduler/TaskSet.scala       |  1 +
 .../apache/spark/scheduler/TaskSetManager.scala    |  1 +
 .../org/apache/spark/scheduler/FakeTask.scala      |  2 +-
 .../spark/scheduler/TaskSetManagerSuite.scala      |  2 +-
 8 files changed, 22 insertions(+), 6 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index ede3c7d9f01ae..bcc391b7682d1 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -778,8 +778,9 @@ class DAGScheduler(
       logInfo("Submitting " + tasks.size + " missing tasks from " + stage + " (" + stage.rdd + ")")
       myPending ++= tasks
       logDebug("New pending tasks: " + myPending)
+      val priority:String = properties.getProperty("spark.scheduler.priority", "0")
       taskScheduler.submitTasks(
-        new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.jobId, properties))
+        new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.jobId, priority.toInt, properties))
       stageToInfos(stage).submissionTime = Some(clock.getTime())
     } else {
       logDebug("Stage " + stage + " is actually done; %b %d %d".format(
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Pool.scala b/core/src/main/scala/org/apache/spark/scheduler/Pool.scala
index 174b73221afc0..b62310b0a7568 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Pool.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Pool.scala
@@ -46,6 +46,7 @@ private[spark] class Pool(
 
   // A pool's stage id is used to break the tie in scheduling.
   var stageId = -1
+  var jobId = -1
   var name = poolName
   var parent: Pool = null
 
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Schedulable.scala b/core/src/main/scala/org/apache/spark/scheduler/Schedulable.scala
index a87ef030e69c2..6dc74352895a3 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Schedulable.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Schedulable.scala
@@ -37,6 +37,7 @@ private[spark] trait Schedulable {
   def runningTasks: Int
   def priority: Int
   def stageId: Int
+  def jobId: Int
   def name: String
 
   def addSchedulable(schedulable: Schedulable): Unit
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SchedulingAlgorithm.scala b/core/src/main/scala/org/apache/spark/scheduler/SchedulingAlgorithm.scala
index 5e62c8468f007..5ba76ce3866b7 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SchedulingAlgorithm.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SchedulingAlgorithm.scala
@@ -32,15 +32,26 @@ private[spark] class FIFOSchedulingAlgorithm extends SchedulingAlgorithm {
     val priority2 = s2.priority
     var res = math.signum(priority1 - priority2)
     if (res == 0) {
-      val stageId1 = s1.stageId
-      val stageId2 = s2.stageId
-      res = math.signum(stageId1 - stageId2)
+      val jobId1 = s1.jobId
+      val jobId2 = s2.jobId
+      res = math.signum(jobId1 - jobId2)
+      if (res == 0) {
+        val stageId1 = s1.stageId
+        val stageId2 = s2.stageId
+        res = math.signum(stageId1 - stageId2)
+      }
+      if (res < 0) {
+        true
+      } else {
+        false
+      }
     }
     if (res < 0) {
       true
     } else {
       false
     }
+
   }
 }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSet.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSet.scala
index 613fa7850bb25..aff7fd77e1320 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSet.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSet.scala
@@ -27,6 +27,7 @@ private[spark] class TaskSet(
     val tasks: Array[Task[_]],
     val stageId: Int,
     val attempt: Int,
+    val jobId: Int,
     val priority: Int,
     val properties: Properties) {
   val id: String = stageId + "." + attempt
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index 8b5e8cb802a45..2a925fca5df32 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -86,6 +86,7 @@ private[spark] class TaskSetManager(
   var minShare = 0
   var priority = taskSet.priority
   var stageId = taskSet.stageId
+  var jobId = taskSet.jobId
   var name = "TaskSet_" + taskSet.stageId.toString
   var parent: Pool = null
 
diff --git a/core/src/test/scala/org/apache/spark/scheduler/FakeTask.scala b/core/src/test/scala/org/apache/spark/scheduler/FakeTask.scala
index 0a7cb69416a08..4905103f8cf7d 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/FakeTask.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/FakeTask.scala
@@ -37,6 +37,6 @@ object FakeTask {
     val tasks = Array.tabulate[Task[_]](numTasks) { i =>
       new FakeTask(i, if (prefLocs.size != 0) prefLocs(i) else Nil)
     }
-    new TaskSet(tasks, 0, 0, 0, null)
+    new TaskSet(tasks, 0, 0, 0, 0, null)
   }
 }
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
index 86b443b18f2a6..569edd280ab64 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -524,7 +524,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
     sc = new SparkContext("local", "test")
     val sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
 
-    val taskSet = new TaskSet(Array(new LargeTask(0)), 0, 0, 0, null)
+    val taskSet = new TaskSet(Array(new LargeTask(0)), 0, 0, 0, 0, null)
     val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES)
 
     assert(!manager.emittedTaskSizeWarning)

From 69da641ed1af96aafde0f8f20e8344d67c035af8 Mon Sep 17 00:00:00 2001
From: lianhuiwang
Date: Tue, 22 Jul 2014 21:23:41 +0800
Subject: [PATCH 04/11] fix file line length exceeds 100

---
 .../main/scala/org/apache/spark/scheduler/DAGScheduler.scala | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index bcc391b7682d1..40ed4e20b92cb 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -780,7 +780,8 @@ class DAGScheduler(
       logDebug("New pending tasks: " + myPending)
       val priority:String = properties.getProperty("spark.scheduler.priority", "0")
       taskScheduler.submitTasks(
-        new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.jobId, priority.toInt, properties))
+        new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.jobId, priority.toInt,
+          properties))
       stageToInfos(stage).submissionTime = Some(clock.getTime())
     } else {
       logDebug("Stage " + stage + " is actually done; %b %d %d".format(
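
Note: as of patch 04 the priority still comes from the job's properties via properties.getProperty("spark.scheduler.priority", "0"), a call that assumes properties is non-null; later patches in the series move the lookup into TaskSet behind a null check. A hypothetical usage sketch (assuming, as in stock Spark, that a job's local properties are the `properties` the DAGScheduler sees; the RDD and the values are made up):

    sc.setLocalProperty("spark.scheduler.priority", "10") // higher value = scheduled first
    sc.parallelize(1 to 1000).map(_ * 2).count()          // stages of this job get priority 10
    sc.setLocalProperty("spark.scheduler.priority", null) // revert to the default of 0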
From 21a9bcd63cb0d6722a0d5e9616c4a6865af26bfa Mon Sep 17 00:00:00 2001
From: Wang Lianhui
Date: Tue, 22 Jul 2014 23:32:52 +0800
Subject: [PATCH 05/11] Fix bug

---
 .../scala/org/apache/spark/scheduler/SchedulingAlgorithm.scala | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/SchedulingAlgorithm.scala b/core/src/main/scala/org/apache/spark/scheduler/SchedulingAlgorithm.scala
index 5ba76ce3866b7..21c3b5fe55379 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SchedulingAlgorithm.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SchedulingAlgorithm.scala
@@ -46,12 +46,11 @@ private[spark] class FIFOSchedulingAlgorithm extends SchedulingAlgorithm {
         false
       }
     }
-    if (res < 0) {
+    if (res > 0) {
       true
     } else {
       false
     }
-
   }
 }
From 79b30eea676ec099a144064bf74a51b3b8fa2829 Mon Sep 17 00:00:00 2001
From: lianhuiwang
Date: Wed, 23 Jul 2014 11:23:53 +0800
Subject: [PATCH 06/11] address markhamstra comments

---
 .../spark/scheduler/SchedulingAlgorithm.scala | 27 +++----------------
 .../org/apache/spark/scheduler/TaskSet.scala  | 10 ++++++-
 .../org/apache/spark/scheduler/FakeTask.scala |  2 +-
 .../spark/scheduler/TaskSetManagerSuite.scala |  2 +-
 4 files changed, 14 insertions(+), 27 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/SchedulingAlgorithm.scala b/core/src/main/scala/org/apache/spark/scheduler/SchedulingAlgorithm.scala
index 5ba76ce3866b7..8ca018b70ce31 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SchedulingAlgorithm.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SchedulingAlgorithm.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.scheduler
 
+import scala.math.Ordering.Implicits._
+
 /**
  * An interface for sort algorithm
  * FIFO: FIFO algorithm between TaskSetManagers
@@ -28,30 +30,7 @@ private[spark] trait SchedulingAlgorithm {
 
 private[spark] class FIFOSchedulingAlgorithm extends SchedulingAlgorithm {
   override def comparator(s1: Schedulable, s2: Schedulable): Boolean = {
-    val priority1 = s1.priority
-    val priority2 = s2.priority
-    var res = math.signum(priority1 - priority2)
-    if (res == 0) {
-      val jobId1 = s1.jobId
-      val jobId2 = s2.jobId
-      res = math.signum(jobId1 - jobId2)
-      if (res == 0) {
-        val stageId1 = s1.stageId
-        val stageId2 = s2.stageId
-        res = math.signum(stageId1 - stageId2)
-      }
-      if (res < 0) {
-        true
-      } else {
-        false
-      }
-    }
-    if (res < 0) {
-      true
-    } else {
-      false
-    }
-
+    (s1.priority, s2.jobId, s2.stageId) > (s2.priority, s1.jobId, s1.stageId)
   }
 }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSet.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSet.scala
index aff7fd77e1320..aa612ecbb85b4 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSet.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSet.scala
@@ -28,9 +28,17 @@ private[spark] class TaskSet(
     val stageId: Int,
     val attempt: Int,
     val jobId: Int,
-    val priority: Int,
     val properties: Properties) {
   val id: String = stageId + "." + attempt
+  val DEFAULT_PRIORITY: Int = 0
+
+  def priority:Int = {
+    if(properties != null){
+      properties.getProperty("spark.scheduler.priority", "0").toInt
+    }else{
+      DEFAULT_PRIORITY
+    }
+  }
 
   def kill(interruptThread: Boolean) {
     tasks.foreach(_.kill(interruptThread))
diff --git a/core/src/test/scala/org/apache/spark/scheduler/FakeTask.scala b/core/src/test/scala/org/apache/spark/scheduler/FakeTask.scala
index 4905103f8cf7d..0a7cb69416a08 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/FakeTask.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/FakeTask.scala
@@ -37,6 +37,6 @@ object FakeTask {
     val tasks = Array.tabulate[Task[_]](numTasks) { i =>
       new FakeTask(i, if (prefLocs.size != 0) prefLocs(i) else Nil)
     }
-    new TaskSet(tasks, 0, 0, 0, 0, null)
+    new TaskSet(tasks, 0, 0, 0, null)
   }
 }
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
index 569edd280ab64..86b443b18f2a6 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -524,7 +524,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
     sc = new SparkContext("local", "test")
     val sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
 
-    val taskSet = new TaskSet(Array(new LargeTask(0)), 0, 0, 0, 0, null)
+    val taskSet = new TaskSet(Array(new LargeTask(0)), 0, 0, 0, null)
     val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES)
 
     assert(!manager.emittedTaskSizeWarning)
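
Note: the one-line comparator introduced in patch 06 relies on the lexicographic tuple ordering that scala.math.Ordering.Implicits brings into scope. Cross-pairing the operands reverses the sense of the tie-breakers: a numerically higher priority wins, but on a priority tie the lower jobId (and then the lower stageId) is scheduled first, preserving FIFO between jobs. A small sketch of that behavior, with made-up ids:

    import scala.math.Ordering.Implicits._

    // s1 = (priority 5, jobId 7, stageId 9), s2 = (priority 5, jobId 2, stageId 3)
    // comparator: (s1.priority, s2.jobId, s2.stageId) > (s2.priority, s1.jobId, s1.stageId)
    println((5, 2, 3) > (5, 7, 9)) // false: priorities tie, s2's lower jobId schedules first
    println((8, 2, 3) > (5, 7, 9)) // true: s1's higher priority wins outright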
From 94bc6e91a89f62238bbf5ffb99b80007e92a4b68 Mon Sep 17 00:00:00 2001
From: lianhuiwang
Date: Wed, 23 Jul 2014 11:45:59 +0800
Subject: [PATCH 07/11] DAGScheduler donot pass priority to TaskSet

---
 .../main/scala/org/apache/spark/scheduler/DAGScheduler.scala | 4 +---
 1 file changed, 1 insertion(+), 3 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 40ed4e20b92cb..ede3c7d9f01ae 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -778,10 +778,8 @@ class DAGScheduler(
       logInfo("Submitting " + tasks.size + " missing tasks from " + stage + " (" + stage.rdd + ")")
       myPending ++= tasks
       logDebug("New pending tasks: " + myPending)
-      val priority:String = properties.getProperty("spark.scheduler.priority", "0")
       taskScheduler.submitTasks(
-        new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.jobId, priority.toInt,
-          properties))
+        new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.jobId, properties))
       stageToInfos(stage).submissionTime = Some(clock.getTime())
     } else {
       logDebug("Stage " + stage + " is actually done; %b %d %d".format(

From d2b087871b314d43656d7d02acc35d93a8b210cd Mon Sep 17 00:00:00 2001
From: lianhuiwang
Date: Wed, 23 Jul 2014 12:51:56 +0800
Subject: [PATCH 08/11] address markhamstra comments

---
 .../scala/org/apache/spark/scheduler/SchedulingAlgorithm.scala | 3 +--
 core/src/main/scala/org/apache/spark/scheduler/TaskSet.scala   | 2 +-
 2 files changed, 2 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/SchedulingAlgorithm.scala b/core/src/main/scala/org/apache/spark/scheduler/SchedulingAlgorithm.scala
index 8ca018b70ce31..944023ccea00f 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SchedulingAlgorithm.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SchedulingAlgorithm.scala
@@ -17,8 +17,6 @@
 
 package org.apache.spark.scheduler
 
-import scala.math.Ordering.Implicits._
-
 /**
  * An interface for sort algorithm
  * FIFO: FIFO algorithm between TaskSetManagers
@@ -30,6 +28,7 @@ private[spark] trait SchedulingAlgorithm {
 
 private[spark] class FIFOSchedulingAlgorithm extends SchedulingAlgorithm {
   override def comparator(s1: Schedulable, s2: Schedulable): Boolean = {
+    import scala.math.Ordering.Implicits._
     (s1.priority, s2.jobId, s2.stageId) > (s2.priority, s1.jobId, s1.stageId)
   }
 }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSet.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSet.scala
index aa612ecbb85b4..d7563584cc065 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSet.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSet.scala
@@ -32,7 +32,7 @@ private[spark] class TaskSet(
   val id: String = stageId + "." + attempt
   val DEFAULT_PRIORITY: Int = 0
 
-  def priority:Int = {
+  val priority:Int = {
     if(properties != null){
       properties.getProperty("spark.scheduler.priority", "0").toInt
     }else{

From c44df006443c55441c7b4a11192700207ba29075 Mon Sep 17 00:00:00 2001
From: lianhuiwang
Date: Wed, 23 Jul 2014 16:28:50 +0800
Subject: [PATCH 09/11] address markhamstra comments with droping extra code

---
 core/src/main/scala/org/apache/spark/scheduler/TaskSet.scala | 11 +++--------
 1 file changed, 3 insertions(+), 8 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSet.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSet.scala
index d7563584cc065..ffbe357c6d08b 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSet.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSet.scala
@@ -30,15 +30,10 @@ private[spark] class TaskSet(
     val jobId: Int,
     val properties: Properties) {
   val id: String = stageId + "." + attempt
-  val DEFAULT_PRIORITY: Int = 0
 
-  val priority:Int = {
-    if(properties != null){
-      properties.getProperty("spark.scheduler.priority", "0").toInt
-    }else{
-      DEFAULT_PRIORITY
-    }
-  }
+  val priority = if(properties != null){
+    properties.getProperty("spark.scheduler.priority", "0").toInt
+  } else 0
 
   def kill(interruptThread: Boolean) {
     tasks.foreach(_.kill(interruptThread))
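
Note: patches 08-09 turn priority from a `def` into a `val`. The likely motivation (an inference, not stated in the series): the FIFO comparator runs on every scheduling pass, and a `def` would re-read and re-parse the property string each time, while a `val` is evaluated once when the TaskSet is constructed. A standalone sketch of the cached form, with a hypothetical holder class:

    import java.util.Properties

    class PriorityHolder(properties: Properties) {
      // `val`: the property is parsed once, at construction time,
      // not on every comparator call.
      val priority: Int = if (properties != null) {
        properties.getProperty("spark.scheduler.priority", "0").toInt
      } else 0
    }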
+ attempt - val priority = if(properties != null){ + val priority = if(properties != null) { properties.getProperty("spark.scheduler.priority", "0").toInt } else 0 From d1eae883a448ca4ff0d347b4b925a80240e8ec78 Mon Sep 17 00:00:00 2001 From: lianhuiwang Date: Wed, 23 Jul 2014 16:32:28 +0800 Subject: [PATCH 11/11] add space for code style --- core/src/main/scala/org/apache/spark/scheduler/TaskSet.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSet.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSet.scala index b13cd85b2165a..7bd41a2d21aec 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSet.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSet.scala @@ -31,7 +31,7 @@ private[spark] class TaskSet( val properties: Properties) { val id: String = stageId + "." + attempt - val priority = if(properties != null) { + val priority = if (properties != null) { properties.getProperty("spark.scheduler.priority", "0").toInt } else 0