From a33ad583e92a40cb3b4b59d7fd57584a5e8678e3 Mon Sep 17 00:00:00 2001 From: trueyao <501663994@qq.com> Date: Tue, 15 Mar 2016 16:06:53 +0800 Subject: [PATCH 1/4] correct the logDebug information when jump to the next locality level --- .../main/scala/org/apache/spark/scheduler/TaskSetManager.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index f1339d530ad49..1f86fb2662c5c 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -557,7 +557,7 @@ private[spark] class TaskSetManager( lastLaunchTime += localityWaits(currentLocalityIndex) currentLocalityIndex += 1 logDebug(s"Moving to ${myLocalityLevels(currentLocalityIndex)} after waiting for " + - s"${localityWaits(currentLocalityIndex)}ms") + s"${localityWaits(currentLocalityIndex - 1)}ms") } else { return myLocalityLevels(currentLocalityIndex) } From 0670a3a00ae4ec1d71726597a3c734bf0088cec3 Mon Sep 17 00:00:00 2001 From: trueyao <501663994@qq.com> Date: Tue, 15 Mar 2016 19:37:57 +0800 Subject: [PATCH 2/4] use previousLocalityIndex instead of currentLocalityIndex - 1 --- .../org/apache/spark/scheduler/TaskSetManager.scala | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index 1f86fb2662c5c..4137f9d358f8f 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -543,21 +543,22 @@ private[spark] class TaskSetManager( case TaskLocality.NO_PREF => pendingTasksWithNoPrefs.nonEmpty case TaskLocality.RACK_LOCAL => moreTasksToRunIn(pendingTasksForRack) } + val previousLocalityIndex = currentLocalityIndex if (!moreTasks) { // This is a performance optimization: if there are no more tasks that can // be scheduled at a particular locality level, there is no point in waiting // for the locality wait timeout (SPARK-4939). lastLaunchTime = curTime - logDebug(s"No tasks for locality level ${myLocalityLevels(currentLocalityIndex)}, " + - s"so moving to locality level ${myLocalityLevels(currentLocalityIndex + 1)}") - currentLocalityIndex += 1 + currentLocalityIndex += 1 + logDebug(s"No tasks for locality level ${myLocalityLevels(previousLocalityIndex)}, " + + s"so moving to locality level ${myLocalityLevels(currentLocalityIndex)}") } else if (curTime - lastLaunchTime >= localityWaits(currentLocalityIndex)) { // Jump to the next locality level, and reset lastLaunchTime so that the next locality // wait timer doesn't immediately expire lastLaunchTime += localityWaits(currentLocalityIndex) currentLocalityIndex += 1 logDebug(s"Moving to ${myLocalityLevels(currentLocalityIndex)} after waiting for " + - s"${localityWaits(currentLocalityIndex - 1)}ms") + s"${localityWaits(previousLocalityIndex)}ms") } else { return myLocalityLevels(currentLocalityIndex) } From 9e86bed457dd1ef5ae7a39a12341f134559dc313 Mon Sep 17 00:00:00 2001 From: trueyao <501663994@qq.com> Date: Tue, 15 Mar 2016 19:43:19 +0800 Subject: [PATCH 3/4] remove extra whitespace --- .../main/scala/org/apache/spark/scheduler/TaskSetManager.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index 4137f9d358f8f..15800b20486e8 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -549,7 +549,7 @@ private[spark] class TaskSetManager( // be scheduled at a particular locality level, there is no point in waiting // for the locality wait timeout (SPARK-4939). lastLaunchTime = curTime - currentLocalityIndex += 1 + currentLocalityIndex += 1 logDebug(s"No tasks for locality level ${myLocalityLevels(previousLocalityIndex)}, " + s"so moving to locality level ${myLocalityLevels(currentLocalityIndex)}") } else if (curTime - lastLaunchTime >= localityWaits(currentLocalityIndex)) { From 2eb75cbf07106120e1119a45dbb32f00525ad03d Mon Sep 17 00:00:00 2001 From: trueyao <501663994@qq.com> Date: Tue, 15 Mar 2016 20:08:14 +0800 Subject: [PATCH 4/4] use currentLocalityIndex + 1 above --- .../org/apache/spark/scheduler/TaskSetManager.scala | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index 15800b20486e8..ffd5f5d20ae96 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -543,22 +543,21 @@ private[spark] class TaskSetManager( case TaskLocality.NO_PREF => pendingTasksWithNoPrefs.nonEmpty case TaskLocality.RACK_LOCAL => moreTasksToRunIn(pendingTasksForRack) } - val previousLocalityIndex = currentLocalityIndex if (!moreTasks) { // This is a performance optimization: if there are no more tasks that can // be scheduled at a particular locality level, there is no point in waiting // for the locality wait timeout (SPARK-4939). lastLaunchTime = curTime + logDebug(s"No tasks for locality level ${myLocalityLevels(currentLocalityIndex)}, " + + s"so moving to locality level ${myLocalityLevels(currentLocalityIndex + 1)}") currentLocalityIndex += 1 - logDebug(s"No tasks for locality level ${myLocalityLevels(previousLocalityIndex)}, " + - s"so moving to locality level ${myLocalityLevels(currentLocalityIndex)}") } else if (curTime - lastLaunchTime >= localityWaits(currentLocalityIndex)) { // Jump to the next locality level, and reset lastLaunchTime so that the next locality // wait timer doesn't immediately expire lastLaunchTime += localityWaits(currentLocalityIndex) + logDebug(s"Moving to ${myLocalityLevels(currentLocalityIndex + 1)} after waiting for " + + s"${localityWaits(currentLocalityIndex)}ms") currentLocalityIndex += 1 - logDebug(s"Moving to ${myLocalityLevels(currentLocalityIndex)} after waiting for " + - s"${localityWaits(previousLocalityIndex)}ms") } else { return myLocalityLevels(currentLocalityIndex) }