From 09719a2b0e065f40d0dfe47c6b4342f0ad3a235c Mon Sep 17 00:00:00 2001 From: jinxing Date: Wed, 1 Mar 2017 12:09:40 +0800 Subject: [PATCH 01/10] [SPARK-16929] Improve performance when check speculatable tasks. --- .../spark/scheduler/TaskSetManager.scala | 21 ++- .../spark/util/collection/MedianHeap.scala | 130 ++++++++++++++++++ .../spark/scheduler/TaskSetManagerSuite.scala | 2 + .../util/collection/MedianHeapSuite.scala | 103 ++++++++++++++ 4 files changed, 250 insertions(+), 6 deletions(-) create mode 100644 core/src/main/scala/org/apache/spark/util/collection/MedianHeap.scala create mode 100644 core/src/test/scala/org/apache/spark/util/collection/MedianHeapSuite.scala diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index 11633bef3cfc7..e9280b61b1762 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -19,11 +19,10 @@ package org.apache.spark.scheduler import java.io.NotSerializableException import java.nio.ByteBuffer -import java.util.Arrays import java.util.concurrent.ConcurrentLinkedQueue import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet} -import scala.math.{max, min} +import scala.math.max import scala.util.control.NonFatal import org.apache.spark._ @@ -31,6 +30,7 @@ import org.apache.spark.internal.Logging import org.apache.spark.scheduler.SchedulingMode._ import org.apache.spark.TaskState.TaskState import org.apache.spark.util.{AccumulatorV2, Clock, SystemClock, Utils} +import org.apache.spark.util.collection.MedianHeap /** * Schedules the tasks within a single TaskSet in the TaskSchedulerImpl. This class keeps track of @@ -63,6 +63,8 @@ private[spark] class TaskSetManager( // Limit of bytes for total size of results (default is 1GB) val maxResultSize = Utils.getMaxResultSize(conf) + val speculationEnabled = conf.getBoolean("spark.speculation", false) + // Serializer for closures and tasks. val env = SparkEnv.get val ser = env.closureSerializer.newInstance() @@ -141,6 +143,9 @@ private[spark] class TaskSetManager( // Task index, start and finish time for each task attempt (indexed by task ID) val taskInfos = new HashMap[Long, TaskInfo] + // Use a MedianHeap to record durations of successful tasks when speculation is enabled. + val successfulTaskDurations = new MedianHeap() + // How frequently to reprint duplicate exceptions in full, in milliseconds val EXCEPTION_PRINT_INTERVAL = conf.getLong("spark.logging.exceptionPrintInterval", 10000) @@ -696,6 +701,9 @@ private[spark] class TaskSetManager( val info = taskInfos(tid) val index = info.index info.markFinished(TaskState.FINISHED, clock.getTimeMillis()) + if (speculationEnabled) { + successfulTaskDurations.insert(info.duration) + } removeRunningTask(tid) // This method is called by "TaskSchedulerImpl.handleSuccessfulTask" which holds the // "TaskSchedulerImpl" lock until exiting. To avoid the SPARK-7655 issue, we should not @@ -909,19 +917,20 @@ private[spark] class TaskSetManager( * */ override def checkSpeculatableTasks(minTimeToSpeculation: Int): Boolean = { + // Can't speculate if we only have one task, and no need to speculate if the task set is a // zombie. - if (isZombie || numTasks == 1) { + + if (isZombie || numTasks == 1 || !speculationEnabled) { return false } var foundTasks = false val minFinishedForSpeculation = (SPECULATION_QUANTILE * numTasks).floor.toInt logDebug("Checking for speculative tasks: minFinished = " + minFinishedForSpeculation) + if (tasksSuccessful >= minFinishedForSpeculation && tasksSuccessful > 0) { val time = clock.getTimeMillis() - val durations = taskInfos.values.filter(_.successful).map(_.duration).toArray - Arrays.sort(durations) - val medianDuration = durations(min((0.5 * tasksSuccessful).round.toInt, durations.length - 1)) + var medianDuration = successfulTaskDurations.findMedian() val threshold = max(SPECULATION_MULTIPLIER * medianDuration, minTimeToSpeculation) // TODO: Threshold should also look at standard deviation of task durations and have a lower // bound based on that. diff --git a/core/src/main/scala/org/apache/spark/util/collection/MedianHeap.scala b/core/src/main/scala/org/apache/spark/util/collection/MedianHeap.scala new file mode 100644 index 0000000000000..95614111e5146 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/util/collection/MedianHeap.scala @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.util.collection + +import scala.collection.mutable + +/** + * MedianHeap stores numbers and returns the median by O(1) time complexity. + * The basic idea is to maintain two heaps: a maxHeap and a minHeap. The maxHeap stores + * the smaller half of all numbers while the minHeap stores the larger half. The sizes + * of two heaps need to be balanced each time when a new number is inserted so that their + * sizes will not be different by more than 1. Therefore each time when findMedian() is + * called we check if two heaps have the same size. If they do, we should return the + * average of the two top values of heaps. Otherwise we return the top of the heap which + * has one more element. + */ + +private[spark] +class MedianHeap(implicit val ord: Ordering[Double]) { + + // Stores all the numbers less than the current median in a maxHeap, + // i.e median is the maximum, at the root + private[this] var maxHeap = mutable.PriorityQueue.empty[Double](implicitly[Ordering[Double]]) + + // Stores all the numbers greater than the current median in a minHeap, + // i.e median is the minimum, at the root + private[this] var minHeap = mutable.PriorityQueue.empty[Double]( + implicitly[Ordering[Double]].reverse) + + // Returns if there is no element in MedianHeap. + def isEmpty(): Boolean = { + maxHeap.isEmpty && minHeap.isEmpty + } + + // Size of MedianHeap. + def size(): Int = { + maxHeap.size + minHeap.size + } + + // Insert a new number into MedianHeap. + def insert(x: Double): Unit = { + // If both heaps are empty, we arbitrarily insert it into a heap, let's say, the minHeap. + if (isEmpty) { + minHeap.enqueue(x) + } else { + // If the number is larger than current median, it should be inserted into minHeap, + // otherwise maxHeap. + if (x > findMedian) { + minHeap.enqueue(x) + } else { + maxHeap.enqueue(x) + } + } + rebalance() + } + + // Remove an number from MedianHeap, return if the number exists. + def remove(x: Double): Boolean = { + if (maxHeap.size != 0 && minHeap.size == 0 || + maxHeap.size != 0 && minHeap.size != 0 && maxHeap.head >= x) { + var dropped = false + maxHeap = maxHeap.filterNot { + case num => + if (!dropped && num == x) { + dropped = true + true + } else { + false + } + } + rebalance() + dropped + } else if (maxHeap.size == 0 && minHeap.size != 0 || + maxHeap.size != 0 && minHeap.size != 0 && minHeap.head <= x) { + var dropped = false + minHeap = minHeap.filterNot { + case num => + if (!dropped && num == x) { + dropped = true + true + } else { + false + } + } + rebalance() + dropped + } else { + false + } + } + + // Re-balance the heaps. + private[this] def rebalance(): Unit = { + if (minHeap.size - maxHeap.size > 1) { + maxHeap.enqueue(minHeap.dequeue()) + } + if (maxHeap.size - minHeap.size > 1) { + minHeap.enqueue(maxHeap.dequeue) + } + } + + // Returns the median of the numbers. + def findMedian(): Double = { + if (isEmpty) { + throw new NoSuchElementException("MedianHeap is empty.") + } + if (minHeap.size == maxHeap.size) { + (minHeap.head + maxHeap.head) / 2.0 + } else if (minHeap.size > maxHeap.size) { + minHeap.head + } else { + maxHeap.head + } + } +} diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala index f36bcd8504b05..064af381a76d2 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala @@ -893,6 +893,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg val taskSet = FakeTask.createTaskSet(4) // Set the speculation multiplier to be 0 so speculative tasks are launched immediately sc.conf.set("spark.speculation.multiplier", "0.0") + sc.conf.set("spark.speculation", "true") val clock = new ManualClock() val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock) val accumUpdatesByTask: Array[Seq[AccumulatorV2[_, _]]] = taskSet.tasks.map { task => @@ -948,6 +949,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg // Set the speculation multiplier to be 0 so speculative tasks are launched immediately sc.conf.set("spark.speculation.multiplier", "0.0") sc.conf.set("spark.speculation.quantile", "0.6") + sc.conf.set("spark.speculation", "true") val clock = new ManualClock() val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock) val accumUpdatesByTask: Array[Seq[AccumulatorV2[_, _]]] = taskSet.tasks.map { task => diff --git a/core/src/test/scala/org/apache/spark/util/collection/MedianHeapSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/MedianHeapSuite.scala new file mode 100644 index 0000000000000..82aff29b9b0b6 --- /dev/null +++ b/core/src/test/scala/org/apache/spark/util/collection/MedianHeapSuite.scala @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.util.collection + +import java.util +import java.util.NoSuchElementException + +import scala.collection.mutable.ArrayBuffer +import scala.util.Random + +import org.apache.spark.SparkFunSuite + +class MedianHeapSuite extends SparkFunSuite { + + test("If no numbers in MedianHeap, NoSuchElementException is thrown.") { + val medianHeap = new MedianHeap() + var valid = false + try { + medianHeap.findMedian() + } catch { + case e: NoSuchElementException => + valid = true + } + + assert(valid) + } + + test("Median should be correct when size of MedianHeap is even or odd") { + val random = new Random() + val medianHeap1 = new MedianHeap() + val array1 = new Array[Int](100) + (0 until 100).foreach { + case i => + val randomNumber = random.nextInt(1000) + medianHeap1.insert(randomNumber) + array1(i) += randomNumber + } + util.Arrays.sort(array1) + assert(medianHeap1.findMedian() === ((array1(49) + array1(50)) / 2.0)) + + val medianHeap2 = new MedianHeap() + val array2 = new Array[Int](101) + (0 until 101).foreach { + case i => + val randomNumber = random.nextInt(1000) + medianHeap2.insert(randomNumber) + array2(i) += randomNumber + } + util.Arrays.sort(array2) + assert(medianHeap2.findMedian() === array2(50)) + } + + test("Size of Median should be correct though there are duplicated numbers inside.") { + val random = new Random() + val medianHeap = new MedianHeap() + val array = new Array[Int](1000) + (0 until 1000).foreach { + case i => + val randomNumber = random.nextInt(100) + medianHeap.insert(randomNumber) + array(i) += randomNumber + } + util.Arrays.sort(array) + assert(medianHeap.size === 1000) + assert(medianHeap.findMedian() === ((array(499) + array(500)) / 2.0)) + } + + test("Remove a number from MedianHeap.") { + val random = new Random() + val medianHeap = new MedianHeap() + val array = new Array[Int](99) + var lastNumber = 0 + (0 until 100).foreach { + case i => + val randomNumber = random.nextInt(100) + medianHeap.insert(randomNumber) + if (i != 99) { + array(i) += randomNumber + } else { + lastNumber = randomNumber + } + } + util.Arrays.sort(array) + assert(medianHeap.remove(lastNumber) === true) + assert(medianHeap.findMedian() === array(49)) + + } +} From 318a172130bd84c0f36494f839a87b86c6750f66 Mon Sep 17 00:00:00 2001 From: jinxing Date: Tue, 14 Mar 2017 09:02:29 +0800 Subject: [PATCH 02/10] Get rid of 'remove' and fix doc in MedianHeap --- .../spark/scheduler/TaskSetManager.scala | 2 - .../spark/util/collection/MedianHeap.scala | 45 ++----------------- .../util/collection/MedianHeapSuite.scala | 21 --------- 3 files changed, 4 insertions(+), 64 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index e9280b61b1762..3042715accc21 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -917,10 +917,8 @@ private[spark] class TaskSetManager( * */ override def checkSpeculatableTasks(minTimeToSpeculation: Int): Boolean = { - // Can't speculate if we only have one task, and no need to speculate if the task set is a // zombie. - if (isZombie || numTasks == 1 || !speculationEnabled) { return false } diff --git a/core/src/main/scala/org/apache/spark/util/collection/MedianHeap.scala b/core/src/main/scala/org/apache/spark/util/collection/MedianHeap.scala index 95614111e5146..565d67b6cde0c 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/MedianHeap.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/MedianHeap.scala @@ -20,7 +20,7 @@ package org.apache.spark.util.collection import scala.collection.mutable /** - * MedianHeap stores numbers and returns the median by O(1) time complexity. + * MedianHeap inserts number by O(log n) and returns the median by O(1) time complexity. * The basic idea is to maintain two heaps: a maxHeap and a minHeap. The maxHeap stores * the smaller half of all numbers while the minHeap stores the larger half. The sizes * of two heaps need to be balanced each time when a new number is inserted so that their @@ -30,17 +30,15 @@ import scala.collection.mutable * has one more element. */ -private[spark] -class MedianHeap(implicit val ord: Ordering[Double]) { +private[spark] class MedianHeap(implicit val ord: Ordering[Double]) { // Stores all the numbers less than the current median in a maxHeap, // i.e median is the maximum, at the root - private[this] var maxHeap = mutable.PriorityQueue.empty[Double](implicitly[Ordering[Double]]) + private[this] var maxHeap = mutable.PriorityQueue.empty[Double](ord) // Stores all the numbers greater than the current median in a minHeap, // i.e median is the minimum, at the root - private[this] var minHeap = mutable.PriorityQueue.empty[Double]( - implicitly[Ordering[Double]].reverse) + private[this] var minHeap = mutable.PriorityQueue.empty[Double](ord.reverse) // Returns if there is no element in MedianHeap. def isEmpty(): Boolean = { @@ -69,41 +67,6 @@ class MedianHeap(implicit val ord: Ordering[Double]) { rebalance() } - // Remove an number from MedianHeap, return if the number exists. - def remove(x: Double): Boolean = { - if (maxHeap.size != 0 && minHeap.size == 0 || - maxHeap.size != 0 && minHeap.size != 0 && maxHeap.head >= x) { - var dropped = false - maxHeap = maxHeap.filterNot { - case num => - if (!dropped && num == x) { - dropped = true - true - } else { - false - } - } - rebalance() - dropped - } else if (maxHeap.size == 0 && minHeap.size != 0 || - maxHeap.size != 0 && minHeap.size != 0 && minHeap.head <= x) { - var dropped = false - minHeap = minHeap.filterNot { - case num => - if (!dropped && num == x) { - dropped = true - true - } else { - false - } - } - rebalance() - dropped - } else { - false - } - } - // Re-balance the heaps. private[this] def rebalance(): Unit = { if (minHeap.size - maxHeap.size > 1) { diff --git a/core/src/test/scala/org/apache/spark/util/collection/MedianHeapSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/MedianHeapSuite.scala index 82aff29b9b0b6..c1ec29159b321 100644 --- a/core/src/test/scala/org/apache/spark/util/collection/MedianHeapSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/collection/MedianHeapSuite.scala @@ -79,25 +79,4 @@ class MedianHeapSuite extends SparkFunSuite { assert(medianHeap.size === 1000) assert(medianHeap.findMedian() === ((array(499) + array(500)) / 2.0)) } - - test("Remove a number from MedianHeap.") { - val random = new Random() - val medianHeap = new MedianHeap() - val array = new Array[Int](99) - var lastNumber = 0 - (0 until 100).foreach { - case i => - val randomNumber = random.nextInt(100) - medianHeap.insert(randomNumber) - if (i != 99) { - array(i) += randomNumber - } else { - lastNumber = randomNumber - } - } - util.Arrays.sort(array) - assert(medianHeap.remove(lastNumber) === true) - assert(medianHeap.findMedian() === array(49)) - - } } From 5aa2fcf8c244e4503302053a98ef12c7d5c80878 Mon Sep 17 00:00:00 2001 From: jinxing Date: Tue, 14 Mar 2017 09:08:05 +0800 Subject: [PATCH 03/10] scheduleAtFixedRate -> scheduleWithFixedDelay --- .../scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index bfbcfa1aa386f..70519d3f647bd 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -172,7 +172,7 @@ private[spark] class TaskSchedulerImpl private[scheduler]( if (!isLocal && conf.getBoolean("spark.speculation", false)) { logInfo("Starting speculative execution thread") - speculationScheduler.scheduleAtFixedRate(new Runnable { + speculationScheduler.scheduleWithFixedDelay(new Runnable { override def run(): Unit = Utils.tryOrStopSparkContext(sc) { checkSpeculatableTasks() } From 17288957da6ca0887b6f0b02d6bb257144d92990 Mon Sep 17 00:00:00 2001 From: jinxing Date: Thu, 16 Mar 2017 11:20:14 +0800 Subject: [PATCH 04/10] Change some comment and unit tests. --- .../spark/scheduler/TaskSetManager.scala | 8 ++- .../spark/util/collection/MedianHeap.scala | 66 ++++++++++--------- .../util/collection/MedianHeapSuite.scala | 55 ++++++---------- 3 files changed, 59 insertions(+), 70 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index 3042715accc21..90fe076ae36af 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -143,7 +143,9 @@ private[spark] class TaskSetManager( // Task index, start and finish time for each task attempt (indexed by task ID) val taskInfos = new HashMap[Long, TaskInfo] - // Use a MedianHeap to record durations of successful tasks when speculation is enabled. + // Use a MedianHeap to record durations of successful tasks so we know when to launch + // speculative tasks. This is only used when speculation is enabled, to avoid the overhead + // of inserting into the heap when the heap won't be used. val successfulTaskDurations = new MedianHeap() // How frequently to reprint duplicate exceptions in full, in milliseconds @@ -919,7 +921,7 @@ private[spark] class TaskSetManager( override def checkSpeculatableTasks(minTimeToSpeculation: Int): Boolean = { // Can't speculate if we only have one task, and no need to speculate if the task set is a // zombie. - if (isZombie || numTasks == 1 || !speculationEnabled) { + if (isZombie || numTasks == 1) { return false } var foundTasks = false @@ -928,7 +930,7 @@ private[spark] class TaskSetManager( if (tasksSuccessful >= minFinishedForSpeculation && tasksSuccessful > 0) { val time = clock.getTimeMillis() - var medianDuration = successfulTaskDurations.findMedian() + var medianDuration = successfulTaskDurations.median val threshold = max(SPECULATION_MULTIPLIER * medianDuration, minTimeToSpeculation) // TODO: Threshold should also look at standard deviation of task durations and have a lower // bound based on that. diff --git a/core/src/main/scala/org/apache/spark/util/collection/MedianHeap.scala b/core/src/main/scala/org/apache/spark/util/collection/MedianHeap.scala index 565d67b6cde0c..44a3e76fbce5d 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/MedianHeap.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/MedianHeap.scala @@ -17,51 +17,53 @@ package org.apache.spark.util.collection -import scala.collection.mutable +import scala.collection.mutable.PriorityQueue /** - * MedianHeap inserts number by O(log n) and returns the median by O(1) time complexity. - * The basic idea is to maintain two heaps: a maxHeap and a minHeap. The maxHeap stores - * the smaller half of all numbers while the minHeap stores the larger half. The sizes - * of two heaps need to be balanced each time when a new number is inserted so that their - * sizes will not be different by more than 1. Therefore each time when findMedian() is - * called we check if two heaps have the same size. If they do, we should return the - * average of the two top values of heaps. Otherwise we return the top of the heap which - * has one more element. + * MedianHeap is designed to be used to quickly track the median of a group of numbers + * that may contain duplicates. Inserting a new number has O(log n) time complexity and + * determining the median has O(1) time complexity. + * The basic idea is to maintain two heaps: a smallerHalf and a largerHalf. The smallerHalf + * stores the smaller half of all numbers while the largerHalf stores the larger half. + * The sizes of two heaps need to be balanced each time when a new number is inserted so + * that their sizes will not be different by more than 1. Therefore each time when + * findMedian() is called we check if two heaps have the same size. If they do, we should + * return the average of the two top values of heaps. Otherwise we return the top of the + * heap which has one more element. */ private[spark] class MedianHeap(implicit val ord: Ordering[Double]) { - // Stores all the numbers less than the current median in a maxHeap, + // Stores all the numbers less than the current median in a smallerHalf, // i.e median is the maximum, at the root - private[this] var maxHeap = mutable.PriorityQueue.empty[Double](ord) + private[this] var smallerHalf = PriorityQueue.empty[Double](ord) - // Stores all the numbers greater than the current median in a minHeap, + // Stores all the numbers greater than the current median in a largerHalf, // i.e median is the minimum, at the root - private[this] var minHeap = mutable.PriorityQueue.empty[Double](ord.reverse) + private[this] var largerHalf = PriorityQueue.empty[Double](ord.reverse) // Returns if there is no element in MedianHeap. def isEmpty(): Boolean = { - maxHeap.isEmpty && minHeap.isEmpty + smallerHalf.isEmpty && largerHalf.isEmpty } // Size of MedianHeap. def size(): Int = { - maxHeap.size + minHeap.size + smallerHalf.size + largerHalf.size } // Insert a new number into MedianHeap. def insert(x: Double): Unit = { - // If both heaps are empty, we arbitrarily insert it into a heap, let's say, the minHeap. + // If both heaps are empty, we arbitrarily insert it into a heap, let's say, the largerHalf. if (isEmpty) { - minHeap.enqueue(x) + largerHalf.enqueue(x) } else { - // If the number is larger than current median, it should be inserted into minHeap, - // otherwise maxHeap. - if (x > findMedian) { - minHeap.enqueue(x) + // If the number is larger than current median, it should be inserted into largerHalf, + // otherwise smallerHalf. + if (x > median) { + largerHalf.enqueue(x) } else { - maxHeap.enqueue(x) + smallerHalf.enqueue(x) } } rebalance() @@ -69,25 +71,25 @@ private[spark] class MedianHeap(implicit val ord: Ordering[Double]) { // Re-balance the heaps. private[this] def rebalance(): Unit = { - if (minHeap.size - maxHeap.size > 1) { - maxHeap.enqueue(minHeap.dequeue()) + if (largerHalf.size - smallerHalf.size > 1) { + smallerHalf.enqueue(largerHalf.dequeue()) } - if (maxHeap.size - minHeap.size > 1) { - minHeap.enqueue(maxHeap.dequeue) + if (smallerHalf.size - largerHalf.size > 1) { + largerHalf.enqueue(smallerHalf.dequeue) } } // Returns the median of the numbers. - def findMedian(): Double = { + def median: Double = { if (isEmpty) { throw new NoSuchElementException("MedianHeap is empty.") } - if (minHeap.size == maxHeap.size) { - (minHeap.head + maxHeap.head) / 2.0 - } else if (minHeap.size > maxHeap.size) { - minHeap.head + if (largerHalf.size == smallerHalf.size) { + (largerHalf.head + smallerHalf.head) / 2.0 + } else if (largerHalf.size > smallerHalf.size) { + largerHalf.head } else { - maxHeap.head + smallerHalf.head } } } diff --git a/core/src/test/scala/org/apache/spark/util/collection/MedianHeapSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/MedianHeapSuite.scala index c1ec29159b321..de13c575db979 100644 --- a/core/src/test/scala/org/apache/spark/util/collection/MedianHeapSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/collection/MedianHeapSuite.scala @@ -17,7 +17,7 @@ package org.apache.spark.util.collection -import java.util +import java.util.Arrays import java.util.NoSuchElementException import scala.collection.mutable.ArrayBuffer @@ -31,7 +31,7 @@ class MedianHeapSuite extends SparkFunSuite { val medianHeap = new MedianHeap() var valid = false try { - medianHeap.findMedian() + medianHeap.median } catch { case e: NoSuchElementException => valid = true @@ -40,43 +40,28 @@ class MedianHeapSuite extends SparkFunSuite { assert(valid) } - test("Median should be correct when size of MedianHeap is even or odd") { - val random = new Random() - val medianHeap1 = new MedianHeap() - val array1 = new Array[Int](100) - (0 until 100).foreach { - case i => - val randomNumber = random.nextInt(1000) - medianHeap1.insert(randomNumber) - array1(i) += randomNumber - } - util.Arrays.sort(array1) - assert(medianHeap1.findMedian() === ((array1(49) + array1(50)) / 2.0)) + test("Median should be correct when size of MedianHeap is even") { + val array = Array(0, 1, 2, 3, 4, 5, 6, 7, 8, 9) + val medianHeap = new MedianHeap() + array.foreach(medianHeap.insert(_)) + assert(medianHeap.size() === 10) + assert(medianHeap.median === ((array(4) + array(5)) / 2.0)) + } - val medianHeap2 = new MedianHeap() - val array2 = new Array[Int](101) - (0 until 101).foreach { - case i => - val randomNumber = random.nextInt(1000) - medianHeap2.insert(randomNumber) - array2(i) += randomNumber - } - util.Arrays.sort(array2) - assert(medianHeap2.findMedian() === array2(50)) + test("Median should be correct when size of MedianHeap is odd") { + val array = Array(0, 1, 2, 3, 4, 5, 6, 7, 8) + val medianHeap = new MedianHeap() + array.foreach(medianHeap.insert(_)) + assert(medianHeap.size() === 9) + assert(medianHeap.median === (array(4))) } test("Size of Median should be correct though there are duplicated numbers inside.") { - val random = new Random() + val array = Array(0, 0, 1, 1, 2, 2, 3, 3, 4, 4) val medianHeap = new MedianHeap() - val array = new Array[Int](1000) - (0 until 1000).foreach { - case i => - val randomNumber = random.nextInt(100) - medianHeap.insert(randomNumber) - array(i) += randomNumber - } - util.Arrays.sort(array) - assert(medianHeap.size === 1000) - assert(medianHeap.findMedian() === ((array(499) + array(500)) / 2.0)) + array.foreach(medianHeap.insert(_)) + Arrays.sort(array) + assert(medianHeap.size === 10) + assert(medianHeap.median === ((array(4) + array(5)) / 2.0)) } } From 2518a9553961d0ddfd950ab9e2218753f79fda4e Mon Sep 17 00:00:00 2001 From: jinxing Date: Thu, 16 Mar 2017 13:01:12 +0800 Subject: [PATCH 05/10] Change back to scheduleAtFixedRate --- .../scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index 70519d3f647bd..bfbcfa1aa386f 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -172,7 +172,7 @@ private[spark] class TaskSchedulerImpl private[scheduler]( if (!isLocal && conf.getBoolean("spark.speculation", false)) { logInfo("Starting speculative execution thread") - speculationScheduler.scheduleWithFixedDelay(new Runnable { + speculationScheduler.scheduleAtFixedRate(new Runnable { override def run(): Unit = Utils.tryOrStopSparkContext(sc) { checkSpeculatableTasks() } From 7740d7762b58e9dcd7c19d8eda7c6c58d7da1378 Mon Sep 17 00:00:00 2001 From: jinxing Date: Thu, 16 Mar 2017 22:47:31 +0800 Subject: [PATCH 06/10] scheduleAtFixedRate -> scheduleWithFixedDelay --- .../scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index bfbcfa1aa386f..70519d3f647bd 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -172,7 +172,7 @@ private[spark] class TaskSchedulerImpl private[scheduler]( if (!isLocal && conf.getBoolean("spark.speculation", false)) { logInfo("Starting speculative execution thread") - speculationScheduler.scheduleAtFixedRate(new Runnable { + speculationScheduler.scheduleWithFixedDelay(new Runnable { override def run(): Unit = Utils.tryOrStopSparkContext(sc) { checkSpeculatableTasks() } From c13a198549fc7742b08a62b4e0e9769bf7bebd3c Mon Sep 17 00:00:00 2001 From: jinxing Date: Thu, 16 Mar 2017 23:39:42 +0800 Subject: [PATCH 07/10] Refine test. --- .../util/collection/MedianHeapSuite.scala | 20 +++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/util/collection/MedianHeapSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/MedianHeapSuite.scala index de13c575db979..11f0e58ea39f0 100644 --- a/core/src/test/scala/org/apache/spark/util/collection/MedianHeapSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/collection/MedianHeapSuite.scala @@ -29,15 +29,9 @@ class MedianHeapSuite extends SparkFunSuite { test("If no numbers in MedianHeap, NoSuchElementException is thrown.") { val medianHeap = new MedianHeap() - var valid = false - try { + intercept[NoSuchElementException] { medianHeap.median - } catch { - case e: NoSuchElementException => - valid = true } - - assert(valid) } test("Median should be correct when size of MedianHeap is even") { @@ -56,7 +50,7 @@ class MedianHeapSuite extends SparkFunSuite { assert(medianHeap.median === (array(4))) } - test("Size of Median should be correct though there are duplicated numbers inside.") { + test("Median should be correct though there are duplicated numbers inside.") { val array = Array(0, 0, 1, 1, 2, 2, 3, 3, 4, 4) val medianHeap = new MedianHeap() array.foreach(medianHeap.insert(_)) @@ -64,4 +58,14 @@ class MedianHeapSuite extends SparkFunSuite { assert(medianHeap.size === 10) assert(medianHeap.median === ((array(4) + array(5)) / 2.0)) } + + test("Median should be correct when skew situations.") { + val medianHeap = new MedianHeap() + (0 until 10).foreach(_ => medianHeap.insert(5)) + assert(medianHeap.median === 5) + (0 until 100).foreach(_ => medianHeap.insert(10)) + assert(medianHeap.median === 10) + (0 until 1000).foreach(_ => medianHeap.insert(0)) + assert(medianHeap.median === 0) + } } From 617d5aafc5e6273edb9e5545cfd0dbd5d404f884 Mon Sep 17 00:00:00 2001 From: jinxing Date: Thu, 16 Mar 2017 23:48:12 +0800 Subject: [PATCH 08/10] mod --- .../scala/org/apache/spark/util/collection/MedianHeapSuite.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/core/src/test/scala/org/apache/spark/util/collection/MedianHeapSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/MedianHeapSuite.scala index 11f0e58ea39f0..9d1759da3bea7 100644 --- a/core/src/test/scala/org/apache/spark/util/collection/MedianHeapSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/collection/MedianHeapSuite.scala @@ -54,7 +54,6 @@ class MedianHeapSuite extends SparkFunSuite { val array = Array(0, 0, 1, 1, 2, 2, 3, 3, 4, 4) val medianHeap = new MedianHeap() array.foreach(medianHeap.insert(_)) - Arrays.sort(array) assert(medianHeap.size === 10) assert(medianHeap.median === ((array(4) + array(5)) / 2.0)) } From 9d627c4be8c3b45628de141d394521cc274840c4 Mon Sep 17 00:00:00 2001 From: jinxing Date: Wed, 1 Mar 2017 10:50:19 +0800 Subject: [PATCH 09/10] Measurement for SPARK-16929. --- .../spark/scheduler/TaskSetManager.scala | 54 ++++++++++++++++--- .../spark/scheduler/TaskSetManagerSuite.scala | 28 +++++++++- 2 files changed, 74 insertions(+), 8 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index 3b25513bea057..b749575f09b18 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -21,6 +21,7 @@ import java.io.NotSerializableException import java.nio.ByteBuffer import java.util.Arrays import java.util.concurrent.ConcurrentLinkedQueue +import java.util.concurrent.atomic.AtomicLong import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet} import scala.math.{max, min} @@ -52,7 +53,8 @@ private[spark] class TaskSetManager( val taskSet: TaskSet, val maxTaskFailures: Int, blacklistTracker: Option[BlacklistTracker] = None, - clock: Clock = new SystemClock()) extends Schedulable with Logging { + clock: Clock = new SystemClock(), + newAlgorithm: Boolean = false) extends Schedulable with Logging { private val conf = sched.sc.conf @@ -141,6 +143,19 @@ private[spark] class TaskSetManager( // Task index, start and finish time for each task attempt (indexed by task ID) val taskInfos = new HashMap[Long, TaskInfo] + val successfulTaskIdsSet = new scala.collection.mutable.TreeSet[Long] { + override val ordering: Ordering[Long] = new Ordering[Long] { + override def compare(tid0: Long, tid1: Long): Int = { + ((taskInfos(tid0).duration - taskInfos(tid1).duration).toInt, tid0 - tid1) match { + case (0, 0) => 0 + case (diffDuration, diffTid) + if diffDuration > 0 || (diffDuration == 0 && diffTid > 0) => 1 + case _ => -1 + } + } + } + }.empty + // How frequently to reprint duplicate exceptions in full, in milliseconds val EXCEPTION_PRINT_INTERVAL = conf.getLong("spark.logging.exceptionPrintInterval", 10000) @@ -692,10 +707,21 @@ private[spark] class TaskSetManager( /** * Marks a task as successful and notifies the DAGScheduler that the task has ended. */ - def handleSuccessfulTask(tid: Long, result: DirectTaskResult[_]): Unit = { + val handleSuccessfulTasksCost = new AtomicLong(0L) + + def handleSuccessfulTask(tid: Long, result: DirectTaskResult[_], finishTime: Long = 0L): Unit = { val info = taskInfos(tid) val index = info.index - info.markFinished(TaskState.FINISHED) + if (finishTime != 0L) { + info.markFinished(TaskState.FINISHED, finishTime) + } else { + info.markFinished(TaskState.FINISHED) + } + val start = System.currentTimeMillis() + successfulTaskIdsSet += tid + val end = System.currentTimeMillis() + handleSuccessfulTasksCost.addAndGet(end - start) + removeRunningTask(tid) // This method is called by "TaskSchedulerImpl.handleSuccessfulTask" which holds the // "TaskSchedulerImpl" lock until exiting. To avoid the SPARK-7655 issue, we should not @@ -740,6 +766,7 @@ private[spark] class TaskSetManager( } removeRunningTask(tid) info.markFinished(state) + successfulTaskIdsSet -= tid val index = info.index copiesRunning(index) -= 1 var accumUpdates: Seq[AccumulatorV2[_, _]] = Seq.empty @@ -918,11 +945,24 @@ private[spark] class TaskSetManager( var foundTasks = false val minFinishedForSpeculation = (SPECULATION_QUANTILE * numTasks).floor.toInt logDebug("Checking for speculative tasks: minFinished = " + minFinishedForSpeculation) - if (tasksSuccessful >= minFinishedForSpeculation && tasksSuccessful > 0) { + val successfulTaskIdsSize = successfulTaskIdsSet.size + if (successfulTaskIdsSize >= minFinishedForSpeculation && successfulTaskIdsSize > 0) { val time = clock.getTimeMillis() - val durations = taskInfos.values.filter(_.successful).map(_.duration).toArray - Arrays.sort(durations) - val medianDuration = durations(min((0.5 * tasksSuccessful).round.toInt, durations.length - 1)) + var medianDuration: Long = 0L + + val startTime = System.currentTimeMillis() + if (newAlgorithm) { + medianDuration = taskInfos(successfulTaskIdsSet.slice( + successfulTaskIdsSize / 2, successfulTaskIdsSize / 2 + 1).head).duration + } else { + val durations = taskInfos.values.filter(_.successful).map(_.duration).toArray + Arrays.sort(durations) + val medianDuration = + durations(min((0.5 * tasksSuccessful).round.toInt, durations.length - 1)) + } + val endTime = System.currentTimeMillis() + println(s"Time cost: ${endTime - startTime}") + val threshold = max(SPECULATION_MULTIPLIER * medianDuration, minTimeToSpeculation) // TODO: Threshold should also look at standard deviation of task durations and have a lower // bound based on that. diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala index d03a0c990a02b..ae6dcfc02063f 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala @@ -177,7 +177,6 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg } } - test("TaskSet with no preferences") { sc = new SparkContext("local", "test") sched = new FakeTaskScheduler(sc, ("exec1", "host1")) @@ -872,6 +871,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg assert(sched.endedTasks(3) === Success) } + test("Killing speculative tasks does not count towards aborting the taskset") { sc = new SparkContext("local", "test") sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2")) @@ -1039,6 +1039,32 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg .updateBlacklistForFailedTask(anyString(), anyString(), anyInt()) } + test("Measurement for SPARK-16929.") { + val tasksNum = 100000 + sc = new SparkContext("local", "test") + sched = new FakeTaskScheduler(sc) + val taskSet = FakeTask.createTaskSet(tasksNum) + val clock = new ManualClock() + val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock, + newAlgorithm = true) + + for(i <- 0 until tasksNum) { + manager.resourceOffer("exec", "host", NO_PREF) + } + + val accumUpdatesByTask: Array[Seq[AccumulatorV2[_, _]]] = taskSet.tasks.map { task => + task.metrics.internalAccums + } + val random = new Random() + for (id <- 0 until tasksNum - 1) { + manager.handleSuccessfulTask(id, createTaskResult(id, accumUpdatesByTask(id)), + random.nextInt(3600000)) + assert(sched.endedTasks(id) === Success) + } + println("handleSuccessfulTasks cost: " + manager.handleSuccessfulTasksCost.get()) + manager.checkSpeculatableTasks(600000) + } + private def createTaskResult( id: Int, accumUpdates: Seq[AccumulatorV2[_, _]] = Seq.empty): DirectTaskResult[Int] = { From cfc7e331356b2296a66273e8d059635ba1c7991b Mon Sep 17 00:00:00 2001 From: jinxing Date: Sat, 18 Mar 2017 10:08:15 +0800 Subject: [PATCH 10/10] update --- .../scala/org/apache/spark/scheduler/TaskSetManager.scala | 2 +- .../org/apache/spark/scheduler/TaskSetManagerSuite.scala | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index 533dae6433b08..7589da0c3242d 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -703,7 +703,7 @@ private[spark] class TaskSetManager( */ val handleSuccessfulTasksCost = new AtomicLong(0L) - def handleSuccessfulTask(tid: Long, result: DirectTaskResult[_], finishTime: Long = 0L): Unit = { + def handleSuccessfulTask(tid: Long, result: DirectTaskResult[_]): Unit = { val info = taskInfos(tid) val index = info.index diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala index f9525716ce358..44bcbd50e4301 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala @@ -1130,8 +1130,8 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg } val random = new Random() for (id <- 0 until tasksNum - 1) { - manager.handleSuccessfulTask(id, createTaskResult(id, accumUpdatesByTask(id)), - random.nextInt(3600000)) + clock.setTime(random.nextInt(360000) + 1) + manager.handleSuccessfulTask(id, createTaskResult(id, accumUpdatesByTask(id))) assert(sched.endedTasks(id) === Success) } // scalastyle:off