Add fine-grained test for collecting accumulators during failures
Andrew Or committed Jan 16, 2016
1 parent 28346e5 commit 00a12a4
Showing 4 changed files with 100 additions and 71 deletions.
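For context, the behavior under test is the countFailedValues flag on Accumulator: when a task attempt fails, only accumulators created with countFailedValues = true keep reporting their partial updates back to the driver. The sketch below condenses the end-to-end test added to TaskContextSuite in this commit; the accumulator names are placeholders, and the Accumulator constructor used here is private[spark], so it only compiles inside the org.apache.spark package.

import org.apache.spark.{Accumulator, AccumulatorParam, Accumulators, SparkContext, TaskContext}

// One core, up to 4 attempts per task, mirroring the new test below.
val sc = new SparkContext("local[1,4]", "count-failed-values-demo")
val param = AccumulatorParam.LongAccumulatorParam
// One accumulator keeps updates from failed attempts, the other does not.
val keepsFailed = new Accumulator(0L, param, Some("keepsFailed"), internal = false, countFailedValues = true)
val dropsFailed = new Accumulator(0L, param, Some("dropsFailed"), internal = false, countFailedValues = false)

sc.parallelize(1 to 10, 10).map { i =>
  keepsFailed += 1
  dropsFailed += 1
  // Fail the first 3 attempts of every task, so each task runs 4 times in total.
  if (TaskContext.get.attemptNumber() <= 2) throw new Exception("simulated failure")
  i
}.count()

// keepsFailed saw every attempt (4 per task); dropsFailed only the successful ones.
assert(Accumulators.get(keepsFailed.id).get.value == 40L)
assert(Accumulators.get(dropsFailed.id).get.value == 10L)
sc.stop()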
@@ -252,7 +252,6 @@ private[spark] class Executor(
}

// Note: accumulator updates must be collected after TaskMetrics is updated
// TODO: add a test
val accumUpdates = task.collectAccumulatorUpdates()
val directResult = new DirectTaskResult(valueBytes, accumUpdates)
val serializedDirectResult = ser.serialize(directResult)
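The only change in this hunk is the removal of the TODO: the test now exists in TaskContextSuite, the last file in this commit. The note that stays captures the real constraint: at this point in the refactor TaskMetrics values are backed by accumulators (see the InternalAccumulator.create() calls in TaskMetricsSuite below), so collectAccumulatorUpdates() has to run after the metrics setters, or the snapshot shipped in the DirectTaskResult would still hold the initial values. A hypothetical miniature of that ordering, with a plain var standing in for an accumulator-backed metric rather than the real Executor code:

var executorRunTime = 0L                                   // stand-in for the backing accumulator
def setExecutorRunTime(ms: Long): Unit = { executorRunTime = ms }
def collectAccumulatorUpdates(): Long = executorRunTime    // stand-in for the snapshot sent to the driver

// Collecting first would report the initial 0; updating first reports the real value.
setExecutorRunTime(1234L)
val reported = collectAccumulatorUpdates()                 // 1234L, not 0L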
@@ -22,10 +22,11 @@ import org.apache.spark.scheduler.AccumulableInfo
import org.apache.spark.storage.{BlockId, BlockStatus, StorageLevel, TestBlockId}


class TaskMetricsSuite extends SparkFunSuite { self =>
class TaskMetricsSuite extends SparkFunSuite {
import AccumulatorParam._
import InternalAccumulator._
import StorageLevel._
import TaskMetricsSuite._

test("create") {
val internalAccums = InternalAccumulator.create()
@@ -101,8 +102,8 @@ class TaskMetricsSuite extends SparkFunSuite { self =>
import shuffleRead._
val accums = InternalAccumulator.create()
val tm = new TaskMetrics(accums)
def assertValueEquals[T](tmValue: ShuffleReadMetrics => T, name: String, value: T): Unit = {
self.assertValueEquals(tm, tm => tmValue(tm.shuffleReadMetrics.get), accums, name, value)
def assertValEquals[T](tmValue: ShuffleReadMetrics => T, name: String, value: T): Unit = {
assertValueEquals(tm, tm => tmValue(tm.shuffleReadMetrics.get), accums, name, value)
}
// create shuffle read metrics
assert(tm.shuffleReadMetrics.isEmpty)
@@ -111,12 +112,12 @@ class TaskMetricsSuite extends SparkFunSuite { self =>
assert(tm.shuffleReadMetrics.isDefined)
val sr = tm.shuffleReadMetrics.get
// initial values
assertValueEquals(_.remoteBlocksFetched, REMOTE_BLOCKS_FETCHED, 0)
assertValueEquals(_.localBlocksFetched, LOCAL_BLOCKS_FETCHED, 0)
assertValueEquals(_.remoteBytesRead, REMOTE_BYTES_READ, 0L)
assertValueEquals(_.localBytesRead, LOCAL_BYTES_READ, 0L)
assertValueEquals(_.fetchWaitTime, FETCH_WAIT_TIME, 0L)
assertValueEquals(_.recordsRead, RECORDS_READ, 0L)
assertValEquals(_.remoteBlocksFetched, REMOTE_BLOCKS_FETCHED, 0)
assertValEquals(_.localBlocksFetched, LOCAL_BLOCKS_FETCHED, 0)
assertValEquals(_.remoteBytesRead, REMOTE_BYTES_READ, 0L)
assertValEquals(_.localBytesRead, LOCAL_BYTES_READ, 0L)
assertValEquals(_.fetchWaitTime, FETCH_WAIT_TIME, 0L)
assertValEquals(_.recordsRead, RECORDS_READ, 0L)
// set and increment values
sr.setRemoteBlocksFetched(100)
sr.setRemoteBlocksFetched(10)
@@ -143,30 +144,30 @@ class TaskMetricsSuite extends SparkFunSuite { self =>
sr.incRecordsRead(6L)
sr.incRecordsRead(6L)
// assert new values exist
assertValueEquals(_.remoteBlocksFetched, REMOTE_BLOCKS_FETCHED, 12)
assertValueEquals(_.localBlocksFetched, LOCAL_BLOCKS_FETCHED, 24)
assertValueEquals(_.remoteBytesRead, REMOTE_BYTES_READ, 36L)
assertValueEquals(_.localBytesRead, LOCAL_BYTES_READ, 48L)
assertValueEquals(_.fetchWaitTime, FETCH_WAIT_TIME, 60L)
assertValueEquals(_.recordsRead, RECORDS_READ, 72L)
assertValEquals(_.remoteBlocksFetched, REMOTE_BLOCKS_FETCHED, 12)
assertValEquals(_.localBlocksFetched, LOCAL_BLOCKS_FETCHED, 24)
assertValEquals(_.remoteBytesRead, REMOTE_BYTES_READ, 36L)
assertValEquals(_.localBytesRead, LOCAL_BYTES_READ, 48L)
assertValEquals(_.fetchWaitTime, FETCH_WAIT_TIME, 60L)
assertValEquals(_.recordsRead, RECORDS_READ, 72L)
}

test("mutating shuffle write metrics values") {
import shuffleWrite._
val accums = InternalAccumulator.create()
val tm = new TaskMetrics(accums)
def assertValueEquals[T](tmValue: ShuffleWriteMetrics => T, name: String, value: T): Unit = {
self.assertValueEquals(tm, tm => tmValue(tm.shuffleWriteMetrics.get), accums, name, value)
def assertValEquals[T](tmValue: ShuffleWriteMetrics => T, name: String, value: T): Unit = {
assertValueEquals(tm, tm => tmValue(tm.shuffleWriteMetrics.get), accums, name, value)
}
// create shuffle write metrics
assert(tm.shuffleWriteMetrics.isEmpty)
tm.registerShuffleWriteMetrics()
assert(tm.shuffleWriteMetrics.isDefined)
val sw = tm.shuffleWriteMetrics.get
// initial values
assertValueEquals(_.bytesWritten, BYTES_WRITTEN, 0L)
assertValueEquals(_.recordsWritten, RECORDS_WRITTEN, 0L)
assertValueEquals(_.writeTime, WRITE_TIME, 0L)
assertValEquals(_.bytesWritten, BYTES_WRITTEN, 0L)
assertValEquals(_.recordsWritten, RECORDS_WRITTEN, 0L)
assertValEquals(_.writeTime, WRITE_TIME, 0L)
// increment and decrement values
sw.incBytesWritten(100L)
sw.incBytesWritten(10L) // 100 + 10
@@ -179,17 +180,17 @@ class TaskMetricsSuite extends SparkFunSuite { self =>
sw.incWriteTime(300L)
sw.incWriteTime(30L)
// assert new values exist
assertValueEquals(_.bytesWritten, BYTES_WRITTEN, 108L)
assertValueEquals(_.recordsWritten, RECORDS_WRITTEN, 216L)
assertValueEquals(_.writeTime, WRITE_TIME, 330L)
assertValEquals(_.bytesWritten, BYTES_WRITTEN, 108L)
assertValEquals(_.recordsWritten, RECORDS_WRITTEN, 216L)
assertValEquals(_.writeTime, WRITE_TIME, 330L)
}

test("mutating input metrics values") {
import input._
val accums = InternalAccumulator.create()
val tm = new TaskMetrics(accums)
def assertValueEquals(tmValue: InputMetrics => Any, name: String, value: Any): Unit = {
self.assertValueEquals(tm, tm => tmValue(tm.inputMetrics.get), accums, name, value,
def assertValEquals(tmValue: InputMetrics => Any, name: String, value: Any): Unit = {
assertValueEquals(tm, tm => tmValue(tm.inputMetrics.get), accums, name, value,
(x: Any, y: Any) => assert(x.toString === y.toString))
}
// create input metrics
@@ -198,27 +199,27 @@ class TaskMetricsSuite extends SparkFunSuite { self =>
assert(tm.inputMetrics.isDefined)
val in = tm.inputMetrics.get
// initial values
assertValueEquals(_.bytesRead, BYTES_READ, 0L)
assertValueEquals(_.recordsRead, RECORDS_READ, 0L)
assertValueEquals(_.readMethod, READ_METHOD, DataReadMethod.Memory)
assertValEquals(_.bytesRead, BYTES_READ, 0L)
assertValEquals(_.recordsRead, RECORDS_READ, 0L)
assertValEquals(_.readMethod, READ_METHOD, DataReadMethod.Memory)
// set and increment values
in.setBytesRead(1L)
in.setBytesRead(2L)
in.incRecordsRead(1L)
in.incRecordsRead(2L)
in.setReadMethod(DataReadMethod.Disk)
// assert new values exist
assertValueEquals(_.bytesRead, BYTES_READ, 2L)
assertValueEquals(_.recordsRead, RECORDS_READ, 3L)
assertValueEquals(_.readMethod, READ_METHOD, DataReadMethod.Disk)
assertValEquals(_.bytesRead, BYTES_READ, 2L)
assertValEquals(_.recordsRead, RECORDS_READ, 3L)
assertValEquals(_.readMethod, READ_METHOD, DataReadMethod.Disk)
}

test("mutating output metrics values") {
import output._
val accums = InternalAccumulator.create()
val tm = new TaskMetrics(accums)
def assertValueEquals(tmValue: OutputMetrics => Any, name: String, value: Any): Unit = {
self.assertValueEquals(tm, tm => tmValue(tm.outputMetrics.get), accums, name, value,
def assertValEquals(tmValue: OutputMetrics => Any, name: String, value: Any): Unit = {
assertValueEquals(tm, tm => tmValue(tm.outputMetrics.get), accums, name, value,
(x: Any, y: Any) => assert(x.toString === y.toString))
}
// create input metrics
@@ -227,21 +228,21 @@ class TaskMetricsSuite extends SparkFunSuite { self =>
assert(tm.outputMetrics.isDefined)
val out = tm.outputMetrics.get
// initial values
assertValueEquals(_.bytesWritten, BYTES_WRITTEN, 0L)
assertValueEquals(_.recordsWritten, RECORDS_WRITTEN, 0L)
assertValueEquals(_.writeMethod, WRITE_METHOD, DataWriteMethod.Hadoop)
assertValEquals(_.bytesWritten, BYTES_WRITTEN, 0L)
assertValEquals(_.recordsWritten, RECORDS_WRITTEN, 0L)
assertValEquals(_.writeMethod, WRITE_METHOD, DataWriteMethod.Hadoop)
// set values
out.setBytesWritten(1L)
out.setBytesWritten(2L)
out.setRecordsWritten(3L)
out.setRecordsWritten(4L)
out.setWriteMethod(DataWriteMethod.Hadoop)
// assert new values exist
assertValueEquals(_.bytesWritten, BYTES_WRITTEN, 2L)
assertValueEquals(_.recordsWritten, RECORDS_WRITTEN, 4L)
assertValEquals(_.bytesWritten, BYTES_WRITTEN, 2L)
assertValEquals(_.recordsWritten, RECORDS_WRITTEN, 4L)
// Note: this doesn't actually test anything, but there's only one DataWriteMethod
// so we can't set it to anything else
assertValueEquals(_.writeMethod, WRITE_METHOD, DataWriteMethod.Hadoop)
assertValEquals(_.writeMethod, WRITE_METHOD, DataWriteMethod.Hadoop)
}

test("merging multiple shuffle read metrics") {
@@ -420,19 +421,19 @@ class TaskMetricsSuite extends SparkFunSuite { self =>
// accumulators that were not registered with `Accumulators` will not show up
assertUpdatesEquals(metrics2.accumulatorUpdates(), accumUpdates1 ++ registeredAccumInfos)
}
}


/* --------------------- *
| Helper test methods |
* --------------------- */
// This extends SparkFunSuite only because we want its `assert` method.
private[spark] object TaskMetricsSuite extends SparkFunSuite {

/**
* Assert that the following three things are equal to `value`:
* (1) TaskMetrics value
* (2) TaskMetrics accumulator update value
* (3) Original accumulator value
*/
private def assertValueEquals(
def assertValueEquals(
tm: TaskMetrics,
tmValue: TaskMetrics => Any,
accums: Seq[Accumulator[_]],
@@ -453,7 +454,7 @@ class TaskMetricsSuite extends SparkFunSuite { self =>
* Assert that two lists of accumulator updates are equal.
* Note: this does NOT check accumulator ID equality.
*/
private def assertUpdatesEquals(
def assertUpdatesEquals(
updates1: Seq[AccumulableInfo],
updates2: Seq[AccumulableInfo]): Unit = {
assert(updates1.size === updates2.size)
@@ -471,7 +472,7 @@ class TaskMetricsSuite extends SparkFunSuite { self =>
* Make an [[AccumulableInfo]] out of an [[Accumulable]] with the intent to use the
* info as an accumulator update.
*/
private def makeInfo(a: Accumulable[_, _]): AccumulableInfo = {
def makeInfo(a: Accumulable[_, _]): AccumulableInfo = {
new AccumulableInfo(
a.id, a.name.orNull, Some(a.value), None, a.isInternal, a.countFailedValues)
}
@@ -190,8 +190,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou

override def beforeEach(): Unit = {
super.beforeEach()
// This means use 1 core and allow up to 4 failed tasks
sc = new SparkContext("local[1, 4]", "DAGSchedulerSuite")
sc = new SparkContext("local", "DAGSchedulerSuite")
sparkListener.submittedStageInfos.clear()
sparkListener.successfulStages.clear()
sparkListener.failedStages.clear()
@@ -1593,29 +1592,6 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
assert(Accumulators.get(acc3.id).get.value === 18L)
}

test("accumulators are updated on exception failures (end-to-end)") {
import AccumulatorParam._
// Create 2 accumulators, one that counts failed values and another that doesn't
val acc1 = new Accumulator(
0L, LongAccumulatorParam, Some("ingenieur"), internal = false, countFailedValues = true)
val acc2 = new Accumulator(
0L, LongAccumulatorParam, Some("boulangere"), internal = false, countFailedValues = false)
// Fail first 3 attempts of every task. This means each task should be run 4 times.
sc.parallelize(1 to 10, 10).map { i =>
acc1 += 1
acc2 += 1
if (TaskContext.get.attemptNumber() <= 2) {
throw new Exception("you did something wrong")
} else {
0
}
}.count()
// The one that counts failed values should be 4x the one that didn't,
// since we ran each task 4 times
assert(Accumulators.get(acc1.id).get.value === 40L)
assert(Accumulators.get(acc2.id).get.value === 10L)
}

test("reduce tasks should be placed locally with map output") {
// Create an shuffleMapRdd with 1 partition
val shuffleMapRdd = new MyRDD(sc, 1, Nil)
@@ -22,7 +22,9 @@ import org.mockito.Mockito._
import org.scalatest.BeforeAndAfter

import org.apache.spark._
import org.apache.spark.executor.TaskMetricsSuite
import org.apache.spark.metrics.source.JvmSource
import org.apache.spark.memory.TaskMemoryManager
import org.apache.spark.network.util.JavaUtils
import org.apache.spark.rdd.RDD
import org.apache.spark.util.{TaskCompletionListener, TaskCompletionListenerException}
@@ -97,6 +99,57 @@ class TaskContextSuite extends SparkFunSuite with BeforeAndAfter with LocalSpark
}.collect()
assert(attemptIdsWithFailedTask.toSet === Set(0, 1))
}

test("accumulators are updated on exception failures") {
// This means use 1 core and 4 max task failures
sc = new SparkContext("local[1,4]", "test")
val param = AccumulatorParam.LongAccumulatorParam
// Create 2 accumulators, one that counts failed values and another that doesn't
val acc1 = new Accumulator(0L, param, Some("x"), internal = false, countFailedValues = true)
val acc2 = new Accumulator(0L, param, Some("y"), internal = false, countFailedValues = false)
// Fail first 3 attempts of every task. This means each task should be run 4 times.
sc.parallelize(1 to 10, 10).map { i =>
acc1 += 1
acc2 += 1
if (TaskContext.get.attemptNumber() <= 2) {
throw new Exception("you did something wrong")
} else {
0
}
}.count()
// The one that counts failed values should be 4x the one that didn't,
// since we ran each task 4 times
assert(Accumulators.get(acc1.id).get.value === 40L)
assert(Accumulators.get(acc2.id).get.value === 10L)
}

test("failed tasks collect only accumulators whose values count during failures") {
sc = new SparkContext("local", "test")
val param = AccumulatorParam.LongAccumulatorParam
val acc1 = new Accumulator(0L, param, Some("x"), internal = false, countFailedValues = true)
val acc2 = new Accumulator(0L, param, Some("y"), internal = false, countFailedValues = false)
val initialAccums = InternalAccumulator.create()
// Create a dummy task. We won't end up running this; we just want to collect
// accumulator updates from it.
val task = new Task[Int](0, 0, 0, Seq.empty[Accumulator[_]]) {
context = new TaskContextImpl(0, 0, 0L, 0,
new TaskMemoryManager(SparkEnv.get.memoryManager, 0L),
SparkEnv.get.metricsSystem,
initialAccums)
context.taskMetrics.registerAccumulator(acc1)
context.taskMetrics.registerAccumulator(acc2)
override def runTask(tc: TaskContext): Int = 0
}
// First, simulate task success. This should give us all the accumulators.
val accumUpdates1 = task.collectAccumulatorUpdates(taskFailed = false)
val accumUpdates2 = (initialAccums ++ Seq(acc1, acc2)).map(TaskMetricsSuite.makeInfo)
TaskMetricsSuite.assertUpdatesEquals(accumUpdates1, accumUpdates2)
// Now, simulate task failures. This should give us only the accums that count failed values.
val accumUpdates3 = task.collectAccumulatorUpdates(taskFailed = true)
val accumUpdates4 = (initialAccums ++ Seq(acc1)).map(TaskMetricsSuite.makeInfo)
TaskMetricsSuite.assertUpdatesEquals(accumUpdates3, accumUpdates4)
}

}

private object TaskContextSuite {
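Taken together, the two new TaskContextSuite tests fix the collection rule: on success every registered accumulator is reported, while on failure only the accumulators whose values count during failures are kept (the expected updates show that all of the internal accumulators fall into that group). The helper below is an assumption about the shape of that rule, not a copy of Task.collectAccumulatorUpdates, and it reuses the AccumulableInfo conversion from TaskMetricsSuite.makeInfo above:

import org.apache.spark.Accumulator
import org.apache.spark.scheduler.AccumulableInfo

// Hypothetical helper: keep every accumulator on success, only failure-counting ones on failure.
def collectUpdates(accums: Seq[Accumulator[_]], taskFailed: Boolean): Seq[AccumulableInfo] = {
  accums
    .filter { a => !taskFailed || a.countFailedValues }
    .map { a =>
      // Same conversion as TaskMetricsSuite.makeInfo in this commit.
      new AccumulableInfo(a.id, a.name.orNull, Some(a.value), None, a.isInternal, a.countFailedValues)
    }
}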
