From 8be6c863097d4eef0ac1b03b94165b2e61f1df7d Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Wed, 27 Jan 2016 14:31:09 -0800 Subject: [PATCH 1/6] Fix indentations, visibility, deprecation etc. --- .../scala/org/apache/spark/Accumulable.scala | 2 +- .../scala/org/apache/spark/Accumulator.scala | 8 ++++---- .../scala/org/apache/spark/TaskEndReason.scala | 2 +- .../org/apache/spark/executor/Executor.scala | 16 ++++++++-------- .../apache/spark/executor/TaskMetrics.scala | 18 ++++++++++++++++-- .../org/apache/spark/ui/jobs/StagePage.scala | 6 +++--- .../org/apache/spark/util/JsonProtocol.scala | 2 +- .../apache/spark/util/JsonProtocolSuite.scala | 4 ++-- 8 files changed, 36 insertions(+), 22 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/Accumulable.scala b/core/src/main/scala/org/apache/spark/Accumulable.scala index bde136141f40d..dbec12b60b902 100644 --- a/core/src/main/scala/org/apache/spark/Accumulable.scala +++ b/core/src/main/scala/org/apache/spark/Accumulable.scala @@ -57,7 +57,7 @@ import org.apache.spark.util.Utils */ class Accumulable[R, T] private ( val id: Long, - @transient initialValue: R, + @transient private val initialValue: R, param: AccumulableParam[R, T], val name: Option[String], internal: Boolean, diff --git a/core/src/main/scala/org/apache/spark/Accumulator.scala b/core/src/main/scala/org/apache/spark/Accumulator.scala index 558bd447e22c5..725fb702abf4e 100644 --- a/core/src/main/scala/org/apache/spark/Accumulator.scala +++ b/core/src/main/scala/org/apache/spark/Accumulator.scala @@ -60,19 +60,19 @@ import org.apache.spark.storage.{BlockId, BlockStatus} * @tparam T result type */ class Accumulator[T] private[spark] ( - @transient private[spark] val initialValue: T, + @transient private val initialValue: T, param: AccumulatorParam[T], name: Option[String], internal: Boolean, - override val countFailedValues: Boolean = false) + private[spark] override val countFailedValues: Boolean = false) extends Accumulable[T, T](initialValue, param, name, internal, countFailedValues) { def this(initialValue: T, param: AccumulatorParam[T], name: Option[String]) = { - this(initialValue, param, name, false) + this(initialValue, param, name, false /* internal */) } def this(initialValue: T, param: AccumulatorParam[T]) = { - this(initialValue, param, None, false) + this(initialValue, param, None, false /* internal */) } } diff --git a/core/src/main/scala/org/apache/spark/TaskEndReason.scala b/core/src/main/scala/org/apache/spark/TaskEndReason.scala index 68340cc704dae..c8f201ea9e4d5 100644 --- a/core/src/main/scala/org/apache/spark/TaskEndReason.scala +++ b/core/src/main/scala/org/apache/spark/TaskEndReason.scala @@ -118,7 +118,7 @@ case class ExceptionFailure( description: String, stackTrace: Array[StackTraceElement], fullStackTrace: String, - exceptionWrapper: Option[ThrowableSerializationWrapper], + private val exceptionWrapper: Option[ThrowableSerializationWrapper], accumUpdates: Seq[AccumulableInfo] = Seq.empty[AccumulableInfo]) extends TaskFailedReason { diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index 51c000ea5c574..00be3a240dbac 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -300,15 +300,15 @@ private[spark] class Executor( // Collect latest accumulator values to report back to the driver val accumulatorUpdates: Seq[AccumulableInfo] = - if (task != null) { - task.metrics.foreach 
{ m => - m.setExecutorRunTime(System.currentTimeMillis() - taskStart) - m.setJvmGCTime(computeTotalGcTime() - startGCTime) + if (task != null) { + task.metrics.foreach { m => + m.setExecutorRunTime(System.currentTimeMillis() - taskStart) + m.setJvmGCTime(computeTotalGcTime() - startGCTime) + } + task.collectAccumulatorUpdates(taskFailed = true) + } else { + Seq.empty[AccumulableInfo] } - task.collectAccumulatorUpdates(taskFailed = true) - } else { - Seq.empty[AccumulableInfo] - } val serializedTaskEndReason = { try { diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala index 8d10bf588ef1f..b15ef9c9a9cfb 100644 --- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala +++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala @@ -45,8 +45,7 @@ import org.apache.spark.storage.{BlockId, BlockStatus} * these requirements. */ @DeveloperApi -class TaskMetrics(initialAccums: Seq[Accumulator[_]]) extends Serializable { - +class TaskMetrics private[spark] (initialAccums: Seq[Accumulator[_]]) extends Serializable { import InternalAccumulator._ // Needed for Java tests @@ -144,6 +143,11 @@ class TaskMetrics(initialAccums: Seq[Accumulator[_]]) extends Serializable { if (updatedBlockStatuses.nonEmpty) Some(updatedBlockStatuses) else None } + @deprecated("setting updated blocks is not allowed", "2.0.0") + def updatedBlocks_=(blocks: Option[Seq[(BlockId, BlockStatus)]]): Unit = { + blocks.foreach(setUpdatedBlockStatuses) + } + // Setters and increment-ers private[spark] def setExecutorDeserializeTime(v: Long): Unit = _executorDeserializeTime.setValue(v) @@ -220,6 +224,11 @@ class TaskMetrics(initialAccums: Seq[Accumulator[_]]) extends Serializable { */ def outputMetrics: Option[OutputMetrics] = _outputMetrics + @deprecated("setting OutputMetrics is for internal use only", "2.0.0") + def outputMetrics_=(om: Option[OutputMetrics]): Unit = { + _outputMetrics = om + } + /** * Get or create a new [[OutputMetrics]] associated with this task. */ @@ -296,6 +305,11 @@ class TaskMetrics(initialAccums: Seq[Accumulator[_]]) extends Serializable { */ def shuffleWriteMetrics: Option[ShuffleWriteMetrics] = _shuffleWriteMetrics + @deprecated("setting ShuffleWriteMetrics is for internal use only", "2.0.0") + def shuffleWriteMetrics_=(swm: Option[ShuffleWriteMetrics]): Unit = { + _shuffleWriteMetrics = swm + } + /** * Get or create a new [[ShuffleWriteMetrics]] associated with this task. 
*/ diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala index 29c5ff0b5cf0b..0b68b88566b70 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala @@ -408,9 +408,9 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") { +: getFormattedTimeQuantiles(gettingResultTimes) - val peakExecutionMemory = validTasks.map { case TaskUIData(_, metrics, _) => - metrics.get.peakExecutionMemory.toDouble - } + val peakExecutionMemory = validTasks.map { case TaskUIData(_, metrics, _) => + metrics.get.peakExecutionMemory.toDouble + } val peakExecutionMemoryQuantiles = { Utils.getFormattedClassName(metricsUpdate)) ~ ("Executor ID" -> execId) ~ - ("Metrics Updated" -> accumUpdates.map { case (taskId, stageId, stageAttemptId, updates) => + ("Metrics Updated" -> accumUpdates.map { case (taskId, stageId, stageAttemptId, updates) => ("Task ID" -> taskId) ~ ("Stage ID" -> stageId) ~ ("Stage Attempt ID" -> stageAttemptId) ~ diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala index 57021d1d3d528..1345881a2aea3 100644 --- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala @@ -505,7 +505,7 @@ private[spark] object JsonProtocolSuite extends Assertions { /** -------------------------------- * | Util methods for comparing events | - * --------------------------------- */ + * --------------------------------- */ private[spark] def assertEquals(event1: SparkListenerEvent, event2: SparkListenerEvent) { (event1, event2) match { @@ -770,7 +770,7 @@ private[spark] object JsonProtocolSuite extends Assertions { /** ----------------------------------- * | Util methods for constructing events | - * ------------------------------------ */ + * ------------------------------------ */ private val properties = { val p = new Properties From 540425450ea0e5376d99f6ccb43857b74f34204e Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Wed, 27 Jan 2016 14:37:23 -0800 Subject: [PATCH 2/6] create -> createAll --- .../apache/spark/InternalAccumulator.scala | 4 +-- .../org/apache/spark/TaskContextImpl.scala | 2 +- .../apache/spark/executor/TaskMetrics.scala | 2 +- .../apache/spark/scheduler/ResultTask.scala | 2 +- .../org/apache/spark/AccumulatorSuite.scala | 2 +- .../spark/InternalAccumulatorSuite.scala | 6 ++-- .../spark/executor/TaskMetricsSuite.scala | 30 +++++++++---------- .../spark/scheduler/TaskContextSuite.scala | 2 +- .../ui/jobs/JobProgressListenerSuite.scala | 2 +- 9 files changed, 26 insertions(+), 26 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/InternalAccumulator.scala b/core/src/main/scala/org/apache/spark/InternalAccumulator.scala index c191122c0630a..7aa9057858a04 100644 --- a/core/src/main/scala/org/apache/spark/InternalAccumulator.scala +++ b/core/src/main/scala/org/apache/spark/InternalAccumulator.scala @@ -119,7 +119,7 @@ private[spark] object InternalAccumulator { /** * Accumulators for tracking internal metrics. */ - def create(): Seq[Accumulator[_]] = { + def createAll(): Seq[Accumulator[_]] = { Seq[String]( EXECUTOR_DESERIALIZE_TIME, EXECUTOR_RUN_TIME, @@ -188,7 +188,7 @@ private[spark] object InternalAccumulator { * values across all tasks within each stage. 
*/ def create(sc: SparkContext): Seq[Accumulator[_]] = { - val accums = create() + val accums = createAll() accums.foreach { accum => Accumulators.register(accum) sc.cleaner.foreach(_.registerAccumulatorForCleanup(accum)) diff --git a/core/src/main/scala/org/apache/spark/TaskContextImpl.scala b/core/src/main/scala/org/apache/spark/TaskContextImpl.scala index 27ca46f73d8ca..1d228b6b86c55 100644 --- a/core/src/main/scala/org/apache/spark/TaskContextImpl.scala +++ b/core/src/main/scala/org/apache/spark/TaskContextImpl.scala @@ -32,7 +32,7 @@ private[spark] class TaskContextImpl( override val attemptNumber: Int, override val taskMemoryManager: TaskMemoryManager, @transient private val metricsSystem: MetricsSystem, - initialAccumulators: Seq[Accumulator[_]] = InternalAccumulator.create()) + initialAccumulators: Seq[Accumulator[_]] = InternalAccumulator.createAll()) extends TaskContext with Logging { diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala index b15ef9c9a9cfb..822bad9f8f52f 100644 --- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala +++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala @@ -50,7 +50,7 @@ class TaskMetrics private[spark] (initialAccums: Seq[Accumulator[_]]) extends Se // Needed for Java tests def this() { - this(InternalAccumulator.create()) + this(InternalAccumulator.createAll()) } /** diff --git a/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala index 885f70e89fbf5..cd2736e1960c2 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala @@ -49,7 +49,7 @@ private[spark] class ResultTask[T, U]( partition: Partition, locs: Seq[TaskLocation], val outputId: Int, - _initialAccums: Seq[Accumulator[_]] = InternalAccumulator.create()) + _initialAccums: Seq[Accumulator[_]] = InternalAccumulator.createAll()) extends Task[U](stageId, stageAttemptId, partition.index, _initialAccums) with Serializable { diff --git a/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala b/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala index 11c97d7d9a447..193c0a2479da6 100644 --- a/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala +++ b/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala @@ -268,7 +268,7 @@ class AccumulatorSuite extends SparkFunSuite with Matchers with LocalSparkContex val acc1 = new Accumulator(0, IntAccumulatorParam, Some("thing"), internal = false) val acc2 = new Accumulator(0L, LongAccumulatorParam, Some("thing2"), internal = false) val externalAccums = Seq(acc1, acc2) - val internalAccums = InternalAccumulator.create() + val internalAccums = InternalAccumulator.createAll() // Set some values; these should not be observed later on the "executors" acc1.setValue(10) acc2.setValue(20L) diff --git a/core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala b/core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala index 630b46f828df7..fb779af87afb7 100644 --- a/core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala +++ b/core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala @@ -86,7 +86,7 @@ class InternalAccumulatorSuite extends SparkFunSuite with LocalSparkContext { } test("create") { - val accums = create() + val accums = createAll() val shuffleReadAccums = createShuffleReadAccums() val shuffleWriteAccums 
= createShuffleWriteAccums() val inputAccums = createInputAccums() @@ -122,7 +122,7 @@ class InternalAccumulatorSuite extends SparkFunSuite with LocalSparkContext { } test("naming") { - val accums = create() + val accums = createAll() val shuffleReadAccums = createShuffleReadAccums() val shuffleWriteAccums = createShuffleWriteAccums() val inputAccums = createInputAccums() @@ -236,7 +236,7 @@ class InternalAccumulatorSuite extends SparkFunSuite with LocalSparkContext { } assert(Accumulators.originals.isEmpty) sc.parallelize(1 to 100).map { i => (i, i) }.reduceByKey { _ + _ }.count() - val internalAccums = InternalAccumulator.create() + val internalAccums = InternalAccumulator.createAll() // We ran 2 stages, so we should have 2 sets of internal accumulators, 1 for each stage assert(Accumulators.originals.size === internalAccums.size * 2) val accumsRegistered = sc.cleaner match { diff --git a/core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala b/core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala index 15be0b194ed8e..6ee426e1c9a5f 100644 --- a/core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala +++ b/core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala @@ -31,7 +31,7 @@ class TaskMetricsSuite extends SparkFunSuite { import TaskMetricsSuite._ test("create") { - val internalAccums = InternalAccumulator.create() + val internalAccums = InternalAccumulator.createAll() val tm1 = new TaskMetrics val tm2 = new TaskMetrics(internalAccums) assert(tm1.accumulatorUpdates().size === internalAccums.size) @@ -51,7 +51,7 @@ class TaskMetricsSuite extends SparkFunSuite { test("create with unnamed accum") { intercept[IllegalArgumentException] { new TaskMetrics( - InternalAccumulator.create() ++ Seq( + InternalAccumulator.createAll() ++ Seq( new Accumulator(0, IntAccumulatorParam, None, internal = true))) } } @@ -59,7 +59,7 @@ class TaskMetricsSuite extends SparkFunSuite { test("create with duplicate name accum") { intercept[IllegalArgumentException] { new TaskMetrics( - InternalAccumulator.create() ++ Seq( + InternalAccumulator.createAll() ++ Seq( new Accumulator(0, IntAccumulatorParam, Some(RESULT_SIZE), internal = true))) } } @@ -67,7 +67,7 @@ class TaskMetricsSuite extends SparkFunSuite { test("create with external accum") { intercept[IllegalArgumentException] { new TaskMetrics( - InternalAccumulator.create() ++ Seq( + InternalAccumulator.createAll() ++ Seq( new Accumulator(0, IntAccumulatorParam, Some("x")))) } } @@ -131,7 +131,7 @@ class TaskMetricsSuite extends SparkFunSuite { } test("mutating values") { - val accums = InternalAccumulator.create() + val accums = InternalAccumulator.createAll() val tm = new TaskMetrics(accums) // initial values assertValueEquals(tm, _.executorDeserializeTime, accums, EXECUTOR_DESERIALIZE_TIME, 0L) @@ -180,7 +180,7 @@ class TaskMetricsSuite extends SparkFunSuite { test("mutating shuffle read metrics values") { import shuffleRead._ - val accums = InternalAccumulator.create() + val accums = InternalAccumulator.createAll() val tm = new TaskMetrics(accums) def assertValEquals[T](tmValue: ShuffleReadMetrics => T, name: String, value: T): Unit = { assertValueEquals(tm, tm => tmValue(tm.shuffleReadMetrics.get), accums, name, value) @@ -234,7 +234,7 @@ class TaskMetricsSuite extends SparkFunSuite { test("mutating shuffle write metrics values") { import shuffleWrite._ - val accums = InternalAccumulator.create() + val accums = InternalAccumulator.createAll() val tm = new TaskMetrics(accums) def 
assertValEquals[T](tmValue: ShuffleWriteMetrics => T, name: String, value: T): Unit = { assertValueEquals(tm, tm => tmValue(tm.shuffleWriteMetrics.get), accums, name, value) @@ -267,7 +267,7 @@ class TaskMetricsSuite extends SparkFunSuite { test("mutating input metrics values") { import input._ - val accums = InternalAccumulator.create() + val accums = InternalAccumulator.createAll() val tm = new TaskMetrics(accums) def assertValEquals(tmValue: InputMetrics => Any, name: String, value: Any): Unit = { assertValueEquals(tm, tm => tmValue(tm.inputMetrics.get), accums, name, value, @@ -296,7 +296,7 @@ class TaskMetricsSuite extends SparkFunSuite { test("mutating output metrics values") { import output._ - val accums = InternalAccumulator.create() + val accums = InternalAccumulator.createAll() val tm = new TaskMetrics(accums) def assertValEquals(tmValue: OutputMetrics => Any, name: String, value: Any): Unit = { assertValueEquals(tm, tm => tmValue(tm.outputMetrics.get), accums, name, value, @@ -381,7 +381,7 @@ class TaskMetricsSuite extends SparkFunSuite { } test("additional accumulables") { - val internalAccums = InternalAccumulator.create() + val internalAccums = InternalAccumulator.createAll() val tm = new TaskMetrics(internalAccums) assert(tm.accumulatorUpdates().size === internalAccums.size) val acc1 = new Accumulator(0, IntAccumulatorParam, Some("a")) @@ -419,7 +419,7 @@ class TaskMetricsSuite extends SparkFunSuite { test("existing values in shuffle read accums") { // set shuffle read accum before passing it into TaskMetrics - val accums = InternalAccumulator.create() + val accums = InternalAccumulator.createAll() val srAccum = accums.find(_.name === Some(shuffleRead.FETCH_WAIT_TIME)) assert(srAccum.isDefined) srAccum.get.asInstanceOf[Accumulator[Long]] += 10L @@ -432,7 +432,7 @@ class TaskMetricsSuite extends SparkFunSuite { test("existing values in shuffle write accums") { // set shuffle write accum before passing it into TaskMetrics - val accums = InternalAccumulator.create() + val accums = InternalAccumulator.createAll() val swAccum = accums.find(_.name === Some(shuffleWrite.RECORDS_WRITTEN)) assert(swAccum.isDefined) swAccum.get.asInstanceOf[Accumulator[Long]] += 10L @@ -445,7 +445,7 @@ class TaskMetricsSuite extends SparkFunSuite { test("existing values in input accums") { // set input accum before passing it into TaskMetrics - val accums = InternalAccumulator.create() + val accums = InternalAccumulator.createAll() val inAccum = accums.find(_.name === Some(input.RECORDS_READ)) assert(inAccum.isDefined) inAccum.get.asInstanceOf[Accumulator[Long]] += 10L @@ -458,7 +458,7 @@ class TaskMetricsSuite extends SparkFunSuite { test("existing values in output accums") { // set output accum before passing it into TaskMetrics - val accums = InternalAccumulator.create() + val accums = InternalAccumulator.createAll() val outAccum = accums.find(_.name === Some(output.RECORDS_WRITTEN)) assert(outAccum.isDefined) outAccum.get.asInstanceOf[Accumulator[Long]] += 10L @@ -470,7 +470,7 @@ class TaskMetricsSuite extends SparkFunSuite { } test("from accumulator updates") { - val accumUpdates1 = InternalAccumulator.create().map { a => + val accumUpdates1 = InternalAccumulator.createAll().map { a => AccumulableInfo(a.id, a.name, Some(3L), None, a.isInternal, a.countFailedValues) } val metrics1 = TaskMetrics.fromAccumulatorUpdates(accumUpdates1) diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala index 
b3bb86db10a32..850e470ca14d6 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala @@ -127,7 +127,7 @@ class TaskContextSuite extends SparkFunSuite with BeforeAndAfter with LocalSpark val param = AccumulatorParam.LongAccumulatorParam val acc1 = new Accumulator(0L, param, Some("x"), internal = false, countFailedValues = true) val acc2 = new Accumulator(0L, param, Some("y"), internal = false, countFailedValues = false) - val initialAccums = InternalAccumulator.create() + val initialAccums = InternalAccumulator.createAll() // Create a dummy task. We won't end up running this; we just want to collect // accumulator updates from it. val task = new Task[Int](0, 0, 0, Seq.empty[Accumulator[_]]) { diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala index 18a16a25bfac5..9876bded33a08 100644 --- a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala @@ -269,7 +269,7 @@ class JobProgressListenerSuite extends SparkFunSuite with LocalSparkContext with val execId = "exe-1" def makeTaskMetrics(base: Int): TaskMetrics = { - val accums = InternalAccumulator.create() + val accums = InternalAccumulator.createAll() accums.foreach(Accumulators.register) val taskMetrics = new TaskMetrics(accums) val shuffleReadMetrics = taskMetrics.registerTempShuffleReadMetrics() From 822899df41d071b681f66e0eabc566fa3a25975c Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Wed, 27 Jan 2016 14:47:59 -0800 Subject: [PATCH 3/6] Add comments --- core/src/main/scala/org/apache/spark/Accumulator.scala | 2 +- .../main/scala/org/apache/spark/scheduler/SparkListener.scala | 1 + .../scala/org/apache/spark/InternalAccumulatorSuite.scala | 4 ++-- 3 files changed, 4 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/Accumulator.scala b/core/src/main/scala/org/apache/spark/Accumulator.scala index b4649e0820a8d..5e8f1d4a705c3 100644 --- a/core/src/main/scala/org/apache/spark/Accumulator.scala +++ b/core/src/main/scala/org/apache/spark/Accumulator.scala @@ -85,7 +85,7 @@ private[spark] object Accumulators extends Logging { * This global map holds the original accumulator objects that are created on the driver. * It keeps weak references to these objects so that accumulators can be garbage-collected * once the RDDs and user-code that reference them are cleaned up. - * TODO: Don't use a global map; these should be tied to a SparkContext at the very least. + * TODO: Don't use a global map; these should be tied to a SparkContext (SPARK-13051). 
*/ @GuardedBy("Accumulators") val originals = mutable.Map[Long, WeakReference[Accumulable[_, _]]]() diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala index ed3adbd81c28e..28974740e91d5 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala @@ -61,6 +61,7 @@ case class SparkListenerTaskEnd( taskType: String, reason: TaskEndReason, taskInfo: TaskInfo, + // may be null if the task has failed @Nullable taskMetrics: TaskMetrics) extends SparkListenerEvent diff --git a/core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala b/core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala index fb779af87afb7..b69f70cab3d3f 100644 --- a/core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala +++ b/core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala @@ -220,7 +220,7 @@ class InternalAccumulatorSuite extends SparkFunSuite with LocalSparkContext { rdd.count() } - // TODO: these two tests are incorrect; they don't actually trigger stage retries. + // TODO: these two tests are incorrect; they don't actually trigger stage retries (SPARK-13053). ignore("internal accumulators in fully resubmitted stages") { testInternalAccumulatorsWithFailedTasks((i: Int) => true) // fail all tasks } @@ -259,7 +259,7 @@ class InternalAccumulatorSuite extends SparkFunSuite with LocalSparkContext { /** * Test whether internal accumulators are merged properly if some tasks fail. - * TODO: make this actually retry the stage. + * TODO: make this actually retry the stage (SPARK-13053). */ private def testInternalAccumulatorsWithFailedTasks(failCondition: (Int => Boolean)): Unit = { val listener = new SaveInfoListener From ba5f5558b4004e16fe7c82fa90b623e54a285388 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Wed, 27 Jan 2016 15:03:20 -0800 Subject: [PATCH 4/6] Simplify unnecessary branching in DAGScheduler With this change, we ALWAYS post task end events in canceled stages for both tasks that succeeded and tasks that failed. --- .../org/apache/spark/scheduler/DAGScheduler.scala | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 897479b50010d..48d691be939a4 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -1144,13 +1144,12 @@ class DAGScheduler( null } - // The success case is dealt with separately below. - // TODO: Why post it only for failed tasks in cancelled stages? Clarify semantics here. - if (event.reason != Success) { - val attemptId = task.stageAttemptId - listenerBus.post(SparkListenerTaskEnd( - stageId, attemptId, taskType, event.reason, event.taskInfo, taskMetrics)) - } + // Note: this stage may already have been canceled, in which case this task end event + // maybe posted after the stage completed event. There's not much we can do here without + // introducing additional complexity in the scheduler to wait for all the task end events + // before posting the stage completed event. + listenerBus.post(SparkListenerTaskEnd( + stageId, task.stageAttemptId, taskType, event.reason, event.taskInfo, taskMetrics)) if (!stageIdToStage.contains(task.stageId)) { // Skip all the actions if the stage has been cancelled. 
@@ -1160,8 +1159,6 @@ class DAGScheduler( val stage = stageIdToStage(task.stageId) event.reason match { case Success => - listenerBus.post(SparkListenerTaskEnd(stageId, stage.latestInfo.attemptId, taskType, - event.reason, event.taskInfo, taskMetrics)) stage.pendingPartitions -= task.partitionId task match { case rt: ResultTask[_, _] => From 6e4859d0aff3dbbd1c59e88101b7112610eb7d3c Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Wed, 27 Jan 2016 15:07:25 -0800 Subject: [PATCH 5/6] Fix MiMa --- project/MimaExcludes.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index 968a2903f3010..0b5a2e4ede6e0 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -183,7 +183,8 @@ object MimaExcludes { ) ++ Seq( // SPARK-12896 Send only accumulator updates to driver, not TaskMetrics ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.Accumulable.this"), - ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.Accumulator.this") + ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.Accumulator.this"), + ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.Accumulator.initialValue") ) ++ Seq( // SPARK-12692 Scala style: Fix the style violation (Space before "," or ":") ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.streaming.flume.sink.SparkSink.org$apache$spark$streaming$flume$sink$Logging$$log_"), From 3b1e41470be1852760a35951dfb6b33c0be83c70 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Mon, 8 Feb 2016 13:51:21 -0800 Subject: [PATCH 6/6] Revert "Simplify unnecessary branching in DAGScheduler" This reverts commit ba5f5558b4004e16fe7c82fa90b623e54a285388. --- .../org/apache/spark/scheduler/DAGScheduler.scala | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 48d691be939a4..897479b50010d 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -1144,12 +1144,13 @@ class DAGScheduler( null } - // Note: this stage may already have been canceled, in which case this task end event - // maybe posted after the stage completed event. There's not much we can do here without - // introducing additional complexity in the scheduler to wait for all the task end events - // before posting the stage completed event. - listenerBus.post(SparkListenerTaskEnd( - stageId, task.stageAttemptId, taskType, event.reason, event.taskInfo, taskMetrics)) + // The success case is dealt with separately below. + // TODO: Why post it only for failed tasks in cancelled stages? Clarify semantics here. + if (event.reason != Success) { + val attemptId = task.stageAttemptId + listenerBus.post(SparkListenerTaskEnd( + stageId, attemptId, taskType, event.reason, event.taskInfo, taskMetrics)) + } if (!stageIdToStage.contains(task.stageId)) { // Skip all the actions if the stage has been cancelled. @@ -1159,6 +1160,8 @@ class DAGScheduler( val stage = stageIdToStage(task.stageId) event.reason match { case Success => + listenerBus.post(SparkListenerTaskEnd(stageId, stage.latestInfo.attemptId, taskType, + event.reason, event.taskInfo, taskMetrics)) stage.pendingPartitions -= task.partitionId task match { case rt: ResultTask[_, _] =>
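
A note on the deprecated `updatedBlocks_=`, `outputMetrics_=`, and `shuffleWriteMetrics_=` methods added in patch 1: they rely on Scala's assignment-operator convention, where a method named `foo_=` paired with a getter `foo` lets existing callers keep writing `obj.foo = value` after the backing field becomes private, so the old assignment syntax still compiles and merely emits a deprecation warning. The following is a minimal standalone sketch of that convention; the `Metrics` class and the `records` field are illustrative names, not Spark code.

    class Metrics {
      private var _records: Long = 0L

      // Getter: callers read `m.records`.
      def records: Long = _records

      // Assignment-style setter: because this method is named `records_=` and a
      // getter `records` exists, Scala desugars `m.records = 42L` into
      // `m.records_=(42L)`. Deprecating it warns callers without breaking the
      // old assignment syntax, which is the pattern the patch uses.
      @deprecated("setting records directly is for internal use only", "2.0.0")
      def records_=(v: Long): Unit = { _records = v }
    }

    object SetterSyntaxDemo {
      def main(args: Array[String]): Unit = {
        val m = new Metrics
        m.records = 42L        // compiles, with a deprecation warning
        println(m.records)     // prints 42
      }
    }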
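
Patch 3 also documents that `SparkListenerTaskEnd.taskMetrics` may be null when a task has failed, and patches 4 and 6 change (and then revert) when that event is posted for tasks in cancelled stages. For code consuming these events, the practical takeaway is that `onTaskEnd` handlers should guard against null metrics. Below is a minimal sketch of such a listener; the class name, the running-maximum logic, and the usage comment are illustrative assumptions, while `SparkListener`, `SparkListenerTaskEnd`, and `peakExecutionMemory` appear in the patches above.

    import java.util.concurrent.atomic.AtomicLong

    import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

    class PeakMemoryListener extends SparkListener {
      val maxPeakExecutionMemory = new AtomicLong(0L)

      override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
        // taskMetrics may be null when the task failed (patch 3 adds a comment
        // documenting this), so wrap it in Option before reading any metric.
        Option(taskEnd.taskMetrics).foreach { metrics =>
          val peak = metrics.peakExecutionMemory
          var current = maxPeakExecutionMemory.get()
          while (peak > current && !maxPeakExecutionMemory.compareAndSet(current, peak)) {
            current = maxPeakExecutionMemory.get()
          }
        }
      }
    }

    // Usage (assuming an existing SparkContext `sc`):
    //   val listener = new PeakMemoryListener
    //   sc.addSparkListener(listener)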