diff --git a/core/src/main/scala/org/apache/spark/InternalAccumulator.scala b/core/src/main/scala/org/apache/spark/InternalAccumulator.scala
index c191122c0630a..7aa9057858a04 100644
--- a/core/src/main/scala/org/apache/spark/InternalAccumulator.scala
+++ b/core/src/main/scala/org/apache/spark/InternalAccumulator.scala
@@ -119,7 +119,7 @@ private[spark] object InternalAccumulator {
   /**
    * Accumulators for tracking internal metrics.
    */
-  def create(): Seq[Accumulator[_]] = {
+  def createAll(): Seq[Accumulator[_]] = {
     Seq[String](
       EXECUTOR_DESERIALIZE_TIME,
       EXECUTOR_RUN_TIME,
@@ -188,7 +188,7 @@ private[spark] object InternalAccumulator {
    * values across all tasks within each stage.
    */
   def create(sc: SparkContext): Seq[Accumulator[_]] = {
-    val accums = create()
+    val accums = createAll()
     accums.foreach { accum =>
       Accumulators.register(accum)
       sc.cleaner.foreach(_.registerAccumulatorForCleanup(accum))
diff --git a/core/src/main/scala/org/apache/spark/TaskContextImpl.scala b/core/src/main/scala/org/apache/spark/TaskContextImpl.scala
index 27ca46f73d8ca..1d228b6b86c55 100644
--- a/core/src/main/scala/org/apache/spark/TaskContextImpl.scala
+++ b/core/src/main/scala/org/apache/spark/TaskContextImpl.scala
@@ -32,7 +32,7 @@ private[spark] class TaskContextImpl(
     override val attemptNumber: Int,
     override val taskMemoryManager: TaskMemoryManager,
     @transient private val metricsSystem: MetricsSystem,
-    initialAccumulators: Seq[Accumulator[_]] = InternalAccumulator.create())
+    initialAccumulators: Seq[Accumulator[_]] = InternalAccumulator.createAll())
   extends TaskContext
   with Logging {
 
diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
index b15ef9c9a9cfb..822bad9f8f52f 100644
--- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -50,7 +50,7 @@ class TaskMetrics private[spark] (initialAccums: Seq[Accumulator[_]]) extends Se
 
   // Needed for Java tests
   def this() {
-    this(InternalAccumulator.create())
+    this(InternalAccumulator.createAll())
   }
 
   /**
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
index 885f70e89fbf5..cd2736e1960c2 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
@@ -49,7 +49,7 @@ private[spark] class ResultTask[T, U](
     partition: Partition,
     locs: Seq[TaskLocation],
     val outputId: Int,
-    _initialAccums: Seq[Accumulator[_]] = InternalAccumulator.create())
+    _initialAccums: Seq[Accumulator[_]] = InternalAccumulator.createAll())
   extends Task[U](stageId, stageAttemptId, partition.index, _initialAccums)
   with Serializable {
 
diff --git a/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala b/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala
index 11c97d7d9a447..193c0a2479da6 100644
--- a/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala
@@ -268,7 +268,7 @@ class AccumulatorSuite extends SparkFunSuite with Matchers with LocalSparkContex
     val acc1 = new Accumulator(0, IntAccumulatorParam, Some("thing"), internal = false)
     val acc2 = new Accumulator(0L, LongAccumulatorParam, Some("thing2"), internal = false)
     val externalAccums = Seq(acc1, acc2)
-    val internalAccums = InternalAccumulator.create()
+    val internalAccums = InternalAccumulator.createAll()
     // Set some values; these should not be observed later on the "executors"
     acc1.setValue(10)
     acc2.setValue(20L)
diff --git a/core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala b/core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala
index 630b46f828df7..fb779af87afb7 100644
--- a/core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala
@@ -86,7 +86,7 @@ class InternalAccumulatorSuite extends SparkFunSuite with LocalSparkContext {
   }
 
   test("create") {
-    val accums = create()
+    val accums = createAll()
     val shuffleReadAccums = createShuffleReadAccums()
     val shuffleWriteAccums = createShuffleWriteAccums()
     val inputAccums = createInputAccums()
@@ -122,7 +122,7 @@ class InternalAccumulatorSuite extends SparkFunSuite with LocalSparkContext {
   }
 
   test("naming") {
-    val accums = create()
+    val accums = createAll()
     val shuffleReadAccums = createShuffleReadAccums()
     val shuffleWriteAccums = createShuffleWriteAccums()
     val inputAccums = createInputAccums()
@@ -236,7 +236,7 @@ class InternalAccumulatorSuite extends SparkFunSuite with LocalSparkContext {
     }
     assert(Accumulators.originals.isEmpty)
     sc.parallelize(1 to 100).map { i => (i, i) }.reduceByKey { _ + _ }.count()
-    val internalAccums = InternalAccumulator.create()
+    val internalAccums = InternalAccumulator.createAll()
     // We ran 2 stages, so we should have 2 sets of internal accumulators, 1 for each stage
     assert(Accumulators.originals.size === internalAccums.size * 2)
     val accumsRegistered = sc.cleaner match {
diff --git a/core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala b/core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala
index 15be0b194ed8e..6ee426e1c9a5f 100644
--- a/core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala
@@ -31,7 +31,7 @@ class TaskMetricsSuite extends SparkFunSuite {
   import TaskMetricsSuite._
 
   test("create") {
-    val internalAccums = InternalAccumulator.create()
+    val internalAccums = InternalAccumulator.createAll()
     val tm1 = new TaskMetrics
     val tm2 = new TaskMetrics(internalAccums)
     assert(tm1.accumulatorUpdates().size === internalAccums.size)
@@ -51,7 +51,7 @@
   test("create with unnamed accum") {
     intercept[IllegalArgumentException] {
       new TaskMetrics(
-        InternalAccumulator.create() ++ Seq(
+        InternalAccumulator.createAll() ++ Seq(
           new Accumulator(0, IntAccumulatorParam, None, internal = true)))
     }
   }
@@ -59,7 +59,7 @@
   test("create with duplicate name accum") {
     intercept[IllegalArgumentException] {
       new TaskMetrics(
-        InternalAccumulator.create() ++ Seq(
+        InternalAccumulator.createAll() ++ Seq(
          new Accumulator(0, IntAccumulatorParam, Some(RESULT_SIZE), internal = true)))
     }
  }
@@ -67,7 +67,7 @@
   test("create with external accum") {
     intercept[IllegalArgumentException] {
       new TaskMetrics(
-        InternalAccumulator.create() ++ Seq(
+        InternalAccumulator.createAll() ++ Seq(
          new Accumulator(0, IntAccumulatorParam, Some("x"))))
     }
   }
@@ -131,7 +131,7 @@
   }
 
   test("mutating values") {
-    val accums = InternalAccumulator.create()
+    val accums = InternalAccumulator.createAll()
     val tm = new TaskMetrics(accums)
     // initial values
     assertValueEquals(tm, _.executorDeserializeTime, accums, EXECUTOR_DESERIALIZE_TIME, 0L)
@@ -180,7 +180,7 @@ class TaskMetricsSuite extends SparkFunSuite {
   test("mutating shuffle read metrics values") {
     import shuffleRead._
-    val accums = InternalAccumulator.create()
+    val accums = InternalAccumulator.createAll()
     val tm = new TaskMetrics(accums)
     def assertValEquals[T](tmValue: ShuffleReadMetrics => T, name: String, value: T): Unit = {
       assertValueEquals(tm, tm => tmValue(tm.shuffleReadMetrics.get), accums, name, value)
     }
@@ -234,7 +234,7 @@ class TaskMetricsSuite extends SparkFunSuite {
   test("mutating shuffle write metrics values") {
     import shuffleWrite._
-    val accums = InternalAccumulator.create()
+    val accums = InternalAccumulator.createAll()
     val tm = new TaskMetrics(accums)
     def assertValEquals[T](tmValue: ShuffleWriteMetrics => T, name: String, value: T): Unit = {
       assertValueEquals(tm, tm => tmValue(tm.shuffleWriteMetrics.get), accums, name, value)
     }
@@ -267,7 +267,7 @@ class TaskMetricsSuite extends SparkFunSuite {
 
   test("mutating input metrics values") {
     import input._
-    val accums = InternalAccumulator.create()
+    val accums = InternalAccumulator.createAll()
     val tm = new TaskMetrics(accums)
     def assertValEquals(tmValue: InputMetrics => Any, name: String, value: Any): Unit = {
       assertValueEquals(tm, tm => tmValue(tm.inputMetrics.get), accums, name, value,
@@ -296,7 +296,7 @@ class TaskMetricsSuite extends SparkFunSuite {
 
   test("mutating output metrics values") {
     import output._
-    val accums = InternalAccumulator.create()
+    val accums = InternalAccumulator.createAll()
     val tm = new TaskMetrics(accums)
     def assertValEquals(tmValue: OutputMetrics => Any, name: String, value: Any): Unit = {
       assertValueEquals(tm, tm => tmValue(tm.outputMetrics.get), accums, name, value,
@@ -381,7 +381,7 @@ class TaskMetricsSuite extends SparkFunSuite {
   }
 
   test("additional accumulables") {
-    val internalAccums = InternalAccumulator.create()
+    val internalAccums = InternalAccumulator.createAll()
     val tm = new TaskMetrics(internalAccums)
     assert(tm.accumulatorUpdates().size === internalAccums.size)
     val acc1 = new Accumulator(0, IntAccumulatorParam, Some("a"))
@@ -419,7 +419,7 @@ class TaskMetricsSuite extends SparkFunSuite {
 
   test("existing values in shuffle read accums") {
     // set shuffle read accum before passing it into TaskMetrics
-    val accums = InternalAccumulator.create()
+    val accums = InternalAccumulator.createAll()
     val srAccum = accums.find(_.name === Some(shuffleRead.FETCH_WAIT_TIME))
     assert(srAccum.isDefined)
     srAccum.get.asInstanceOf[Accumulator[Long]] += 10L
@@ -432,7 +432,7 @@ class TaskMetricsSuite extends SparkFunSuite {
 
   test("existing values in shuffle write accums") {
     // set shuffle write accum before passing it into TaskMetrics
-    val accums = InternalAccumulator.create()
+    val accums = InternalAccumulator.createAll()
     val swAccum = accums.find(_.name === Some(shuffleWrite.RECORDS_WRITTEN))
     assert(swAccum.isDefined)
     swAccum.get.asInstanceOf[Accumulator[Long]] += 10L
@@ -445,7 +445,7 @@ class TaskMetricsSuite extends SparkFunSuite {
 
   test("existing values in input accums") {
     // set input accum before passing it into TaskMetrics
-    val accums = InternalAccumulator.create()
+    val accums = InternalAccumulator.createAll()
     val inAccum = accums.find(_.name === Some(input.RECORDS_READ))
     assert(inAccum.isDefined)
     inAccum.get.asInstanceOf[Accumulator[Long]] += 10L
@@ -458,7 +458,7 @@ class TaskMetricsSuite extends SparkFunSuite {
 
   test("existing values in output accums") {
     // set output accum before passing it into TaskMetrics
-    val accums = InternalAccumulator.create()
+    val accums = InternalAccumulator.createAll()
     val outAccum = accums.find(_.name === Some(output.RECORDS_WRITTEN))
     assert(outAccum.isDefined)
     outAccum.get.asInstanceOf[Accumulator[Long]] += 10L
@@ -470,7 +470,7 @@ class TaskMetricsSuite extends SparkFunSuite {
   }
 
   test("from accumulator updates") {
-    val accumUpdates1 = InternalAccumulator.create().map { a =>
+    val accumUpdates1 = InternalAccumulator.createAll().map { a =>
       AccumulableInfo(a.id, a.name, Some(3L), None, a.isInternal, a.countFailedValues)
     }
     val metrics1 = TaskMetrics.fromAccumulatorUpdates(accumUpdates1)
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala
index b3bb86db10a32..850e470ca14d6 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala
@@ -127,7 +127,7 @@ class TaskContextSuite extends SparkFunSuite with BeforeAndAfter with LocalSpark
     val param = AccumulatorParam.LongAccumulatorParam
     val acc1 = new Accumulator(0L, param, Some("x"), internal = false, countFailedValues = true)
     val acc2 = new Accumulator(0L, param, Some("y"), internal = false, countFailedValues = false)
-    val initialAccums = InternalAccumulator.create()
+    val initialAccums = InternalAccumulator.createAll()
     // Create a dummy task. We won't end up running this; we just want to collect
     // accumulator updates from it.
     val task = new Task[Int](0, 0, 0, Seq.empty[Accumulator[_]]) {
diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
index 18a16a25bfac5..9876bded33a08 100644
--- a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
@@ -269,7 +269,7 @@ class JobProgressListenerSuite extends SparkFunSuite with LocalSparkContext with
     val execId = "exe-1"
 
     def makeTaskMetrics(base: Int): TaskMetrics = {
-      val accums = InternalAccumulator.create()
+      val accums = InternalAccumulator.createAll()
      accums.foreach(Accumulators.register)
       val taskMetrics = new TaskMetrics(accums)
       val shuffleReadMetrics = taskMetrics.registerTempShuffleReadMetrics()
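
Note on the resulting API: the rename splits what used to be an overloaded pair. The renamed createAll() builds a fresh, unregistered set of internal accumulators (the default used by TaskContextImpl, TaskMetrics, and ResultTask above), while create(sc) keeps its name and, per the first hunk, delegates to createAll() and then registers each accumulator with Accumulators and the ContextCleaner. A minimal sketch of the two call paths; the wrapper object and method names are illustrative only, and it assumes compilation inside the org.apache.spark package, since both entry points are private[spark]:

package org.apache.spark

import org.apache.spark.executor.TaskMetrics

// Illustrative sketch, not part of this patch.
object InternalAccumulatorUsageSketch {

  // Executor/test side: createAll() returns fresh, unregistered internal
  // accumulators, which is all a TaskMetrics needs to be constructed.
  def freshTaskMetrics(): TaskMetrics = {
    val accums: Seq[Accumulator[_]] = InternalAccumulator.createAll()
    new TaskMetrics(accums)
  }

  // Driver side: create(sc) calls createAll() and then registers each
  // accumulator (Accumulators.register plus ContextCleaner cleanup),
  // yielding one registered set per stage.
  def registeredAccumsForStage(sc: SparkContext): Seq[Accumulator[_]] = {
    InternalAccumulator.create(sc)
  }
}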