create -> createAll
Andrew Or committed Jan 27, 2016
1 parent 9de795b commit 5404254
Showing 9 changed files with 26 additions and 26 deletions.
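The rename disambiguates two overloads of InternalAccumulator's factory: the zero-arg create(), which builds the full set of internal metric accumulators, becomes createAll(), while create(sc: SparkContext), which delegates to it and registers the accumulators for cleanup, keeps its name. A minimal sketch of the resulting shape, using simplified, hypothetical types rather than Spark's real ones:

// Sketch only: Accumulator and the registry below are stand-ins, not Spark APIs.
object InternalAccumulatorSketch {
  final case class Accumulator(name: String)

  // Formerly create(): builds the full set of internal metric accumulators.
  def createAll(): Seq[Accumulator] =
    Seq(Accumulator("executorDeserializeTime"), Accumulator("executorRunTime"))

  // The one-arg overload keeps its name: it delegates to createAll()
  // and then registers each accumulator, mirroring the diff below.
  def create(registry: scala.collection.mutable.Set[Accumulator]): Seq[Accumulator] = {
    val accums = createAll()
    accums.foreach(a => registry += a)
    accums
  }
}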
@@ -119,7 +119,7 @@ private[spark] object InternalAccumulator {
/**
* Accumulators for tracking internal metrics.
*/
-  def create(): Seq[Accumulator[_]] = {
+  def createAll(): Seq[Accumulator[_]] = {
Seq[String](
EXECUTOR_DESERIALIZE_TIME,
EXECUTOR_RUN_TIME,
@@ -188,7 +188,7 @@ private[spark] object InternalAccumulator {
* values across all tasks within each stage.
*/
def create(sc: SparkContext): Seq[Accumulator[_]] = {
-    val accums = create()
+    val accums = createAll()
accums.foreach { accum =>
Accumulators.register(accum)
sc.cleaner.foreach(_.registerAccumulatorForCleanup(accum))
core/src/main/scala/org/apache/spark/TaskContextImpl.scala (2 changes: 1 addition & 1 deletion)
@@ -32,7 +32,7 @@ private[spark] class TaskContextImpl(
override val attemptNumber: Int,
override val taskMemoryManager: TaskMemoryManager,
@transient private val metricsSystem: MetricsSystem,
-    initialAccumulators: Seq[Accumulator[_]] = InternalAccumulator.create())
+    initialAccumulators: Seq[Accumulator[_]] = InternalAccumulator.createAll())
extends TaskContext
with Logging {

@@ -50,7 +50,7 @@ class TaskMetrics private[spark] (initialAccums: Seq[Accumulator[_]]) extends Se

// Needed for Java tests
def this() {
-    this(InternalAccumulator.create())
+    this(InternalAccumulator.createAll())
}

/**
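An aside on the "Needed for Java tests" constructor above: Scala default parameter values (used for the initialAccumulators/initialAccums parameters in TaskContextImpl above and ResultTask below) compile to synthetic methods that are awkward to call from Java, so TaskMetrics instead exposes an explicit no-arg auxiliary constructor. A hedged sketch of the pattern, with hypothetical names:

// Sketch only: MetricsSketch is illustrative, not Spark's actual TaskMetrics.
class MetricsSketch(initialAccums: Seq[String]) {
  // Java callers cannot conveniently supply a Scala default argument,
  // so the no-arg constructor forwards the default explicitly.
  def this() = this(Seq("executorRunTime"))
}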
@@ -49,7 +49,7 @@ private[spark] class ResultTask[T, U](
partition: Partition,
locs: Seq[TaskLocation],
val outputId: Int,
-    _initialAccums: Seq[Accumulator[_]] = InternalAccumulator.create())
+    _initialAccums: Seq[Accumulator[_]] = InternalAccumulator.createAll())
extends Task[U](stageId, stageAttemptId, partition.index, _initialAccums)
with Serializable {

@@ -268,7 +268,7 @@ class AccumulatorSuite extends SparkFunSuite with Matchers with LocalSparkContex
val acc1 = new Accumulator(0, IntAccumulatorParam, Some("thing"), internal = false)
val acc2 = new Accumulator(0L, LongAccumulatorParam, Some("thing2"), internal = false)
val externalAccums = Seq(acc1, acc2)
-    val internalAccums = InternalAccumulator.create()
+    val internalAccums = InternalAccumulator.createAll()
// Set some values; these should not be observed later on the "executors"
acc1.setValue(10)
acc2.setValue(20L)
@@ -86,7 +86,7 @@ class InternalAccumulatorSuite extends SparkFunSuite with LocalSparkContext {
}

test("create") {
-    val accums = create()
+    val accums = createAll()
val shuffleReadAccums = createShuffleReadAccums()
val shuffleWriteAccums = createShuffleWriteAccums()
val inputAccums = createInputAccums()
@@ -122,7 +122,7 @@ class InternalAccumulatorSuite extends SparkFunSuite with LocalSparkContext {
}

test("naming") {
-    val accums = create()
+    val accums = createAll()
val shuffleReadAccums = createShuffleReadAccums()
val shuffleWriteAccums = createShuffleWriteAccums()
val inputAccums = createInputAccums()
@@ -236,7 +236,7 @@ class InternalAccumulatorSuite extends SparkFunSuite with LocalSparkContext {
}
assert(Accumulators.originals.isEmpty)
sc.parallelize(1 to 100).map { i => (i, i) }.reduceByKey { _ + _ }.count()
-    val internalAccums = InternalAccumulator.create()
+    val internalAccums = InternalAccumulator.createAll()
// We ran 2 stages, so we should have 2 sets of internal accumulators, 1 for each stage
assert(Accumulators.originals.size === internalAccums.size * 2)
val accumsRegistered = sc.cleaner match {
@@ -31,7 +31,7 @@ class TaskMetricsSuite extends SparkFunSuite {
import TaskMetricsSuite._

test("create") {
-    val internalAccums = InternalAccumulator.create()
+    val internalAccums = InternalAccumulator.createAll()
val tm1 = new TaskMetrics
val tm2 = new TaskMetrics(internalAccums)
assert(tm1.accumulatorUpdates().size === internalAccums.size)
@@ -51,23 +51,23 @@ class TaskMetricsSuite extends SparkFunSuite {
test("create with unnamed accum") {
intercept[IllegalArgumentException] {
new TaskMetrics(
-        InternalAccumulator.create() ++ Seq(
+        InternalAccumulator.createAll() ++ Seq(
new Accumulator(0, IntAccumulatorParam, None, internal = true)))
}
}

test("create with duplicate name accum") {
intercept[IllegalArgumentException] {
new TaskMetrics(
-        InternalAccumulator.create() ++ Seq(
+        InternalAccumulator.createAll() ++ Seq(
new Accumulator(0, IntAccumulatorParam, Some(RESULT_SIZE), internal = true)))
}
}

test("create with external accum") {
intercept[IllegalArgumentException] {
new TaskMetrics(
-        InternalAccumulator.create() ++ Seq(
+        InternalAccumulator.createAll() ++ Seq(
new Accumulator(0, IntAccumulatorParam, Some("x"))))
}
}
@@ -131,7 +131,7 @@ class TaskMetricsSuite extends SparkFunSuite {
}

test("mutating values") {
-    val accums = InternalAccumulator.create()
+    val accums = InternalAccumulator.createAll()
val tm = new TaskMetrics(accums)
// initial values
assertValueEquals(tm, _.executorDeserializeTime, accums, EXECUTOR_DESERIALIZE_TIME, 0L)
@@ -180,7 +180,7 @@ class TaskMetricsSuite extends SparkFunSuite {

test("mutating shuffle read metrics values") {
import shuffleRead._
-    val accums = InternalAccumulator.create()
+    val accums = InternalAccumulator.createAll()
val tm = new TaskMetrics(accums)
def assertValEquals[T](tmValue: ShuffleReadMetrics => T, name: String, value: T): Unit = {
assertValueEquals(tm, tm => tmValue(tm.shuffleReadMetrics.get), accums, name, value)
@@ -234,7 +234,7 @@ class TaskMetricsSuite extends SparkFunSuite {

test("mutating shuffle write metrics values") {
import shuffleWrite._
-    val accums = InternalAccumulator.create()
+    val accums = InternalAccumulator.createAll()
val tm = new TaskMetrics(accums)
def assertValEquals[T](tmValue: ShuffleWriteMetrics => T, name: String, value: T): Unit = {
assertValueEquals(tm, tm => tmValue(tm.shuffleWriteMetrics.get), accums, name, value)
@@ -267,7 +267,7 @@ class TaskMetricsSuite extends SparkFunSuite {

test("mutating input metrics values") {
import input._
-    val accums = InternalAccumulator.create()
+    val accums = InternalAccumulator.createAll()
val tm = new TaskMetrics(accums)
def assertValEquals(tmValue: InputMetrics => Any, name: String, value: Any): Unit = {
assertValueEquals(tm, tm => tmValue(tm.inputMetrics.get), accums, name, value,
@@ -296,7 +296,7 @@ class TaskMetricsSuite extends SparkFunSuite {

test("mutating output metrics values") {
import output._
-    val accums = InternalAccumulator.create()
+    val accums = InternalAccumulator.createAll()
val tm = new TaskMetrics(accums)
def assertValEquals(tmValue: OutputMetrics => Any, name: String, value: Any): Unit = {
assertValueEquals(tm, tm => tmValue(tm.outputMetrics.get), accums, name, value,
@@ -381,7 +381,7 @@ class TaskMetricsSuite extends SparkFunSuite {
}

test("additional accumulables") {
-    val internalAccums = InternalAccumulator.create()
+    val internalAccums = InternalAccumulator.createAll()
val tm = new TaskMetrics(internalAccums)
assert(tm.accumulatorUpdates().size === internalAccums.size)
val acc1 = new Accumulator(0, IntAccumulatorParam, Some("a"))
@@ -419,7 +419,7 @@ class TaskMetricsSuite extends SparkFunSuite {

test("existing values in shuffle read accums") {
// set shuffle read accum before passing it into TaskMetrics
-    val accums = InternalAccumulator.create()
+    val accums = InternalAccumulator.createAll()
val srAccum = accums.find(_.name === Some(shuffleRead.FETCH_WAIT_TIME))
assert(srAccum.isDefined)
srAccum.get.asInstanceOf[Accumulator[Long]] += 10L
@@ -432,7 +432,7 @@ class TaskMetricsSuite extends SparkFunSuite {

test("existing values in shuffle write accums") {
// set shuffle write accum before passing it into TaskMetrics
-    val accums = InternalAccumulator.create()
+    val accums = InternalAccumulator.createAll()
val swAccum = accums.find(_.name === Some(shuffleWrite.RECORDS_WRITTEN))
assert(swAccum.isDefined)
swAccum.get.asInstanceOf[Accumulator[Long]] += 10L
@@ -445,7 +445,7 @@ class TaskMetricsSuite extends SparkFunSuite {

test("existing values in input accums") {
// set input accum before passing it into TaskMetrics
-    val accums = InternalAccumulator.create()
+    val accums = InternalAccumulator.createAll()
val inAccum = accums.find(_.name === Some(input.RECORDS_READ))
assert(inAccum.isDefined)
inAccum.get.asInstanceOf[Accumulator[Long]] += 10L
@@ -458,7 +458,7 @@ class TaskMetricsSuite extends SparkFunSuite {

test("existing values in output accums") {
// set output accum before passing it into TaskMetrics
-    val accums = InternalAccumulator.create()
+    val accums = InternalAccumulator.createAll()
val outAccum = accums.find(_.name === Some(output.RECORDS_WRITTEN))
assert(outAccum.isDefined)
outAccum.get.asInstanceOf[Accumulator[Long]] += 10L
@@ -470,7 +470,7 @@ class TaskMetricsSuite extends SparkFunSuite {
}

test("from accumulator updates") {
-    val accumUpdates1 = InternalAccumulator.create().map { a =>
+    val accumUpdates1 = InternalAccumulator.createAll().map { a =>
AccumulableInfo(a.id, a.name, Some(3L), None, a.isInternal, a.countFailedValues)
}
val metrics1 = TaskMetrics.fromAccumulatorUpdates(accumUpdates1)
@@ -127,7 +127,7 @@ class TaskContextSuite extends SparkFunSuite with BeforeAndAfter with LocalSpark
val param = AccumulatorParam.LongAccumulatorParam
val acc1 = new Accumulator(0L, param, Some("x"), internal = false, countFailedValues = true)
val acc2 = new Accumulator(0L, param, Some("y"), internal = false, countFailedValues = false)
-    val initialAccums = InternalAccumulator.create()
+    val initialAccums = InternalAccumulator.createAll()
// Create a dummy task. We won't end up running this; we just want to collect
// accumulator updates from it.
val task = new Task[Int](0, 0, 0, Seq.empty[Accumulator[_]]) {
@@ -269,7 +269,7 @@ class JobProgressListenerSuite extends SparkFunSuite with LocalSparkContext with
val execId = "exe-1"

def makeTaskMetrics(base: Int): TaskMetrics = {
-      val accums = InternalAccumulator.create()
+      val accums = InternalAccumulator.createAll()
accums.foreach(Accumulators.register)
val taskMetrics = new TaskMetrics(accums)
val shuffleReadMetrics = taskMetrics.registerTempShuffleReadMetrics()
