[SPARK-10620] [SPARK-13054] Minor addendum to #10835 #10958

Closed
11 changes: 6 additions & 5 deletions core/src/main/scala/org/apache/spark/Accumulator.scala
@@ -60,19 +60,20 @@ import org.apache.spark.storage.{BlockId, BlockStatus}
* @tparam T result type
*/
class Accumulator[T] private[spark] (
@transient private[spark] val initialValue: T,
// SI-8813: This must explicitly be a private val, or else scala 2.11 doesn't compile
@transient private val initialValue: T,

Contributor: This change was already added in another patch, causing a merge conflict here.

param: AccumulatorParam[T],
name: Option[String],
internal: Boolean,
override val countFailedValues: Boolean = false)
private[spark] override val countFailedValues: Boolean = false)
extends Accumulable[T, T](initialValue, param, name, internal, countFailedValues) {

def this(initialValue: T, param: AccumulatorParam[T], name: Option[String]) = {
this(initialValue, param, name, false)
this(initialValue, param, name, false /* internal */)
}

def this(initialValue: T, param: AccumulatorParam[T]) = {
this(initialValue, param, None, false)
this(initialValue, param, None, false /* internal */)
}
}
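
For context on the SI-8813 note above: under Scala 2.11, a `@transient` annotation on a constructor parameter of a serializable class only compiles cleanly when the parameter is declared as an explicit `val`, which is why the parameter is spelled `private val` here. A minimal, hypothetical sketch of the pattern (illustrative names, not Spark code):

```scala
// Hypothetical sketch of the SI-8813 pattern referenced above: the initial
// value is @transient so it is not shipped with the serialized object, and it
// must be an explicit (here private) val for Scala 2.11 to accept the annotation.
class Box[T](
    @transient private val initialValue: T,
    val name: Option[String]) extends Serializable {

  // The mutable, serialized state; the transient initial value only seeds it
  // on the driver side.
  private var current: T = initialValue

  def value: T = current
  def setValue(v: T): Unit = { current = v }
}

object BoxDemo {
  def main(args: Array[String]): Unit = {
    val b = new Box(0, Some("counter"))
    b.setValue(42)
    println(s"${b.name.getOrElse("unnamed")} = ${b.value}") // counter = 42
  }
}
```
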

@@ -84,7 +85,7 @@ private[spark] object Accumulators extends Logging {
* This global map holds the original accumulator objects that are created on the driver.
* It keeps weak references to these objects so that accumulators can be garbage-collected
* once the RDDs and user-code that reference them are cleaned up.
* TODO: Don't use a global map; these should be tied to a SparkContext at the very least.
* TODO: Don't use a global map; these should be tied to a SparkContext (SPARK-13051).
*/
@GuardedBy("Accumulators")
val originals = mutable.Map[Long, WeakReference[Accumulable[_, _]]]()
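
For context on the `originals` map above: it holds only `WeakReference`s so driver-side accumulators can be garbage-collected once nothing else refers to them, and all access is synchronized on the registry object itself, matching the `@GuardedBy("Accumulators")` annotation. A simplified, hypothetical sketch of that idiom (the names are illustrative, not Spark's API):

```scala
import java.lang.ref.WeakReference
import scala.collection.mutable

// Minimal id-keyed registry holding weak references so entries do not keep
// their targets alive. Every access is synchronized on the registry object.
object WeakRegistry {
  private val originals = mutable.Map[Long, WeakReference[AnyRef]]()

  def register(id: Long, obj: AnyRef): Unit = synchronized {
    originals(id) = new WeakReference(obj)
  }

  // None if the entry was never registered or has already been collected.
  def get(id: Long): Option[AnyRef] = synchronized {
    originals.get(id).flatMap(ref => Option(ref.get()))
  }

  def remove(id: Long): Unit = synchronized { originals.remove(id) }
}
```
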
core/src/main/scala/org/apache/spark/InternalAccumulator.scala
@@ -119,7 +119,7 @@ private[spark] object InternalAccumulator {
/**
* Accumulators for tracking internal metrics.
*/
def create(): Seq[Accumulator[_]] = {
def createAll(): Seq[Accumulator[_]] = {
Seq[String](
EXECUTOR_DESERIALIZE_TIME,
EXECUTOR_RUN_TIME,
@@ -188,7 +188,7 @@ private[spark] object InternalAccumulator {
* values across all tasks within each stage.
*/
def create(sc: SparkContext): Seq[Accumulator[_]] = {
val accums = create()
val accums = createAll()
accums.foreach { accum =>
Accumulators.register(accum)
sc.cleaner.foreach(_.registerAccumulatorForCleanup(accum))
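
The rename from `create()` to `createAll()` above separates the factory that merely builds the per-task accumulators from the `create(sc)` overload, which also registers each one with the driver and the context cleaner. A hedged sketch of that two-step shape, using stand-in types rather than Spark's real API:

```scala
// Illustrative stand-ins for the real Spark types.
case class Accum(name: String)

object Registry {
  private var registered = Vector.empty[Accum]
  def register(a: Accum): Unit = synchronized { registered :+= a }
  def size: Int = synchronized { registered.size }
}

object InternalAccums {
  // Build one accumulator per internal metric, with no side effects.
  def createAll(): Seq[Accum] =
    Seq("executorDeserializeTime", "executorRunTime", "jvmGCTime").map(Accum(_))

  // Build them and register each with the (mock) driver-side registry,
  // mirroring the create(sc) overload above.
  def createAndRegister(): Seq[Accum] = {
    val accums = createAll()
    accums.foreach(Registry.register)
    accums
  }
}

object CreateAllDemo extends App {
  val accums = InternalAccums.createAndRegister()
  assert(accums.size == 3 && Registry.size == 3)
}
```
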
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/TaskContextImpl.scala
@@ -32,7 +32,7 @@ private[spark] class TaskContextImpl(
override val attemptNumber: Int,
override val taskMemoryManager: TaskMemoryManager,
@transient private val metricsSystem: MetricsSystem,
initialAccumulators: Seq[Accumulator[_]] = InternalAccumulator.create())
initialAccumulators: Seq[Accumulator[_]] = InternalAccumulator.createAll())
extends TaskContext
with Logging {

2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/TaskEndReason.scala
@@ -118,7 +118,7 @@ case class ExceptionFailure(
description: String,
stackTrace: Array[StackTraceElement],
fullStackTrace: String,
exceptionWrapper: Option[ThrowableSerializationWrapper],
private val exceptionWrapper: Option[ThrowableSerializationWrapper],

Contributor: Looks like this was private in 1.6, so this looks good.

accumUpdates: Seq[AccumulableInfo] = Seq.empty[AccumulableInfo])

Contributor: Should this be private?

Contributor (Author): I left it public because TaskMetrics was public, and this is basically task metrics.

extends TaskFailedReason {
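
Relating to the review exchange above: marking a case-class constructor parameter `private val` hides its accessor from outside callers while the other fields stay public. A tiny, hypothetical illustration of the difference (not the actual ExceptionFailure definition):

```scala
// Hypothetical failure description: `wrapper` gets a private accessor,
// `description` and `updates` remain part of the public surface.
case class Failure(
    description: String,
    private val wrapper: Option[Throwable],
    updates: Seq[Long] = Seq.empty)

object FailureDemo extends App {
  val f = Failure("oops", Some(new RuntimeException("boom")))
  println(f.description) // accessible
  println(f.updates)     // accessible (empty by default)
  // println(f.wrapper)  // does not compile: value wrapper is private
}
```
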

16 changes: 8 additions & 8 deletions core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -300,15 +300,15 @@ private[spark] class Executor(

// Collect latest accumulator values to report back to the driver
val accumulatorUpdates: Seq[AccumulableInfo] =
if (task != null) {
task.metrics.foreach { m =>
m.setExecutorRunTime(System.currentTimeMillis() - taskStart)
m.setJvmGCTime(computeTotalGcTime() - startGCTime)
if (task != null) {
task.metrics.foreach { m =>
m.setExecutorRunTime(System.currentTimeMillis() - taskStart)
m.setJvmGCTime(computeTotalGcTime() - startGCTime)
}
task.collectAccumulatorUpdates(taskFailed = true)
} else {
Seq.empty[AccumulableInfo]
}
task.collectAccumulatorUpdates(taskFailed = true)
} else {
Seq.empty[AccumulableInfo]
}

val serializedTaskEndReason = {
try {
20 changes: 17 additions & 3 deletions core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -45,13 +45,12 @@ import org.apache.spark.storage.{BlockId, BlockStatus}
* these requirements.
*/
@DeveloperApi
class TaskMetrics(initialAccums: Seq[Accumulator[_]]) extends Serializable {

class TaskMetrics private[spark] (initialAccums: Seq[Accumulator[_]]) extends Serializable {
import InternalAccumulator._

// Needed for Java tests
def this() {
this(InternalAccumulator.create())
this(InternalAccumulator.createAll())
}

/**
@@ -144,6 +143,11 @@ class TaskMetrics(initialAccums: Seq[Accumulator[_]]) extends Serializable {
if (updatedBlockStatuses.nonEmpty) Some(updatedBlockStatuses) else None
}

@deprecated("setting updated blocks is not allowed", "2.0.0")
def updatedBlocks_=(blocks: Option[Seq[(BlockId, BlockStatus)]]): Unit = {
blocks.foreach(setUpdatedBlockStatuses)
}

// Setters and increment-ers
private[spark] def setExecutorDeserializeTime(v: Long): Unit =
_executorDeserializeTime.setValue(v)
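
The deprecated setters added in this file use Scala's assignment-operator convention (`def foo_=`), so old call sites written as `metrics.updatedBlocks = ...` keep compiling while emitting a deprecation warning. A small sketch of that pattern under assumed, simplified types:

```scala
// Simplified stand-in for a metrics holder that keeps the legacy assignment
// syntax alive while steering callers toward internal setters.
class Metrics {
  private var _outputBytes: Option[Long] = None

  def outputBytes: Option[Long] = _outputBytes

  // New, preferred internal setter.
  private def setOutputBytes(v: Long): Unit = { _outputBytes = Some(v) }

  // Legacy setter kept only for source compatibility; `m.outputBytes = Some(3L)`
  // still compiles but warns at the call site.
  @deprecated("setting outputBytes is for internal use only", "2.0.0")
  def outputBytes_=(v: Option[Long]): Unit = { v.foreach(setOutputBytes) }
}

object MetricsDemo extends App {
  val m = new Metrics
  m.outputBytes = Some(42L) // goes through the deprecated assignment setter
  println(m.outputBytes)    // Some(42)
}
```
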
@@ -220,6 +224,11 @@ class TaskMetrics(initialAccums: Seq[Accumulator[_]]) extends Serializable {
*/
def outputMetrics: Option[OutputMetrics] = _outputMetrics

@deprecated("setting OutputMetrics is for internal use only", "2.0.0")
def outputMetrics_=(om: Option[OutputMetrics]): Unit = {
_outputMetrics = om
}

/**
* Get or create a new [[OutputMetrics]] associated with this task.
*/
@@ -296,6 +305,11 @@ class TaskMetrics(initialAccums: Seq[Accumulator[_]]) extends Serializable {
*/
def shuffleWriteMetrics: Option[ShuffleWriteMetrics] = _shuffleWriteMetrics

@deprecated("setting ShuffleWriteMetrics is for internal use only", "2.0.0")
def shuffleWriteMetrics_=(swm: Option[ShuffleWriteMetrics]): Unit = {
_shuffleWriteMetrics = swm
}

/**
* Get or create a new [[ShuffleWriteMetrics]] associated with this task.
*/
core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
@@ -49,7 +49,7 @@ private[spark] class ResultTask[T, U](
partition: Partition,
locs: Seq[TaskLocation],
val outputId: Int,
_initialAccums: Seq[Accumulator[_]] = InternalAccumulator.create())
_initialAccums: Seq[Accumulator[_]] = InternalAccumulator.createAll())
extends Task[U](stageId, stageAttemptId, partition.index, _initialAccums)
with Serializable {

core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
@@ -61,6 +61,7 @@ case class SparkListenerTaskEnd(
taskType: String,
reason: TaskEndReason,
taskInfo: TaskInfo,
// may be null if the task has failed
@Nullable taskMetrics: TaskMetrics)
extends SparkListenerEvent
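
Since `taskMetrics` may be null for failed tasks, as the comment added above notes, listener implementations need to guard before dereferencing it. A minimal sketch of a defensive listener; the event and listener classes here are simplified stand-ins rather than the real SparkListener API:

```scala
// Stand-ins: a task-end event whose metrics may be null, and a listener that
// wraps them in Option before use.
final case class TaskMetricsStub(peakExecutionMemory: Long)
final case class TaskEndStub(taskType: String, metrics: TaskMetricsStub /* may be null */)

class PeakMemoryListener {
  private var maxPeak: Long = 0L

  def onTaskEnd(event: TaskEndStub): Unit = {
    // Option(...) turns a possible null into None instead of throwing an NPE.
    Option(event.metrics).foreach { m =>
      maxPeak = math.max(maxPeak, m.peakExecutionMemory)
    }
  }

  def currentMax: Long = maxPeak
}

object ListenerDemo extends App {
  val l = new PeakMemoryListener
  l.onTaskEnd(TaskEndStub("ResultTask", TaskMetricsStub(512L)))
  l.onTaskEnd(TaskEndStub("ResultTask", null)) // failed task: no metrics
  println(l.currentMax) // 512
}
```
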

6 changes: 3 additions & 3 deletions core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -408,9 +408,9 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
</td> +:
getFormattedTimeQuantiles(gettingResultTimes)

val peakExecutionMemory = validTasks.map { case TaskUIData(_, metrics, _) =>
metrics.get.peakExecutionMemory.toDouble
}
val peakExecutionMemory = validTasks.map { case TaskUIData(_, metrics, _) =>
metrics.get.peakExecutionMemory.toDouble
}
val peakExecutionMemoryQuantiles = {
<td>
<span data-toggle="tooltip"
core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -236,7 +236,7 @@ private[spark] object JsonProtocol {
val accumUpdates = metricsUpdate.accumUpdates
("Event" -> Utils.getFormattedClassName(metricsUpdate)) ~
("Executor ID" -> execId) ~
("Metrics Updated" -> accumUpdates.map { case (taskId, stageId, stageAttemptId, updates) =>
("Metrics Updated" -> accumUpdates.map { case (taskId, stageId, stageAttemptId, updates) =>
("Task ID" -> taskId) ~
("Stage ID" -> stageId) ~
("Stage Attempt ID" -> stageAttemptId) ~
core/src/test/scala/org/apache/spark/AccumulatorSuite.scala
@@ -268,7 +268,7 @@ class AccumulatorSuite extends SparkFunSuite with Matchers with LocalSparkContex
val acc1 = new Accumulator(0, IntAccumulatorParam, Some("thing"), internal = false)
val acc2 = new Accumulator(0L, LongAccumulatorParam, Some("thing2"), internal = false)
val externalAccums = Seq(acc1, acc2)
val internalAccums = InternalAccumulator.create()
val internalAccums = InternalAccumulator.createAll()
// Set some values; these should not be observed later on the "executors"
acc1.setValue(10)
acc2.setValue(20L)
core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala
@@ -87,7 +87,7 @@ class InternalAccumulatorSuite extends SparkFunSuite with LocalSparkContext {
}

test("create") {
val accums = create()
val accums = createAll()
val shuffleReadAccums = createShuffleReadAccums()
val shuffleWriteAccums = createShuffleWriteAccums()
val inputAccums = createInputAccums()
@@ -123,7 +123,7 @@ class InternalAccumulatorSuite extends SparkFunSuite with LocalSparkContext {
}

test("naming") {
val accums = create()
val accums = createAll()
val shuffleReadAccums = createShuffleReadAccums()
val shuffleWriteAccums = createShuffleWriteAccums()
val inputAccums = createInputAccums()
@@ -291,7 +291,7 @@ class InternalAccumulatorSuite extends SparkFunSuite with LocalSparkContext {
}
assert(Accumulators.originals.isEmpty)
sc.parallelize(1 to 100).map { i => (i, i) }.reduceByKey { _ + _ }.count()
val internalAccums = InternalAccumulator.create()
val internalAccums = InternalAccumulator.createAll()
// We ran 2 stages, so we should have 2 sets of internal accumulators, 1 for each stage
assert(Accumulators.originals.size === internalAccums.size * 2)
val accumsRegistered = sc.cleaner match {
core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala
@@ -31,7 +31,7 @@ class TaskMetricsSuite extends SparkFunSuite {
import TaskMetricsSuite._

test("create") {
val internalAccums = InternalAccumulator.create()
val internalAccums = InternalAccumulator.createAll()
val tm1 = new TaskMetrics
val tm2 = new TaskMetrics(internalAccums)
assert(tm1.accumulatorUpdates().size === internalAccums.size)
@@ -51,23 +51,23 @@ class TaskMetricsSuite extends SparkFunSuite {
test("create with unnamed accum") {
intercept[IllegalArgumentException] {
new TaskMetrics(
InternalAccumulator.create() ++ Seq(
InternalAccumulator.createAll() ++ Seq(
new Accumulator(0, IntAccumulatorParam, None, internal = true)))
}
}

test("create with duplicate name accum") {
intercept[IllegalArgumentException] {
new TaskMetrics(
InternalAccumulator.create() ++ Seq(
InternalAccumulator.createAll() ++ Seq(
new Accumulator(0, IntAccumulatorParam, Some(RESULT_SIZE), internal = true)))
}
}

test("create with external accum") {
intercept[IllegalArgumentException] {
new TaskMetrics(
InternalAccumulator.create() ++ Seq(
InternalAccumulator.createAll() ++ Seq(
new Accumulator(0, IntAccumulatorParam, Some("x"))))
}
}
@@ -131,7 +131,7 @@ class TaskMetricsSuite extends SparkFunSuite {
}

test("mutating values") {
val accums = InternalAccumulator.create()
val accums = InternalAccumulator.createAll()
val tm = new TaskMetrics(accums)
// initial values
assertValueEquals(tm, _.executorDeserializeTime, accums, EXECUTOR_DESERIALIZE_TIME, 0L)
@@ -180,7 +180,7 @@ class TaskMetricsSuite extends SparkFunSuite {

test("mutating shuffle read metrics values") {
import shuffleRead._
val accums = InternalAccumulator.create()
val accums = InternalAccumulator.createAll()
val tm = new TaskMetrics(accums)
def assertValEquals[T](tmValue: ShuffleReadMetrics => T, name: String, value: T): Unit = {
assertValueEquals(tm, tm => tmValue(tm.shuffleReadMetrics.get), accums, name, value)
@@ -234,7 +234,7 @@ class TaskMetricsSuite extends SparkFunSuite {

test("mutating shuffle write metrics values") {
import shuffleWrite._
val accums = InternalAccumulator.create()
val accums = InternalAccumulator.createAll()
val tm = new TaskMetrics(accums)
def assertValEquals[T](tmValue: ShuffleWriteMetrics => T, name: String, value: T): Unit = {
assertValueEquals(tm, tm => tmValue(tm.shuffleWriteMetrics.get), accums, name, value)
@@ -267,7 +267,7 @@ class TaskMetricsSuite extends SparkFunSuite {

test("mutating input metrics values") {
import input._
val accums = InternalAccumulator.create()
val accums = InternalAccumulator.createAll()
val tm = new TaskMetrics(accums)
def assertValEquals(tmValue: InputMetrics => Any, name: String, value: Any): Unit = {
assertValueEquals(tm, tm => tmValue(tm.inputMetrics.get), accums, name, value,
@@ -296,7 +296,7 @@ class TaskMetricsSuite extends SparkFunSuite {

test("mutating output metrics values") {
import output._
val accums = InternalAccumulator.create()
val accums = InternalAccumulator.createAll()
val tm = new TaskMetrics(accums)
def assertValEquals(tmValue: OutputMetrics => Any, name: String, value: Any): Unit = {
assertValueEquals(tm, tm => tmValue(tm.outputMetrics.get), accums, name, value,
@@ -381,7 +381,7 @@ class TaskMetricsSuite extends SparkFunSuite {
}

test("additional accumulables") {
val internalAccums = InternalAccumulator.create()
val internalAccums = InternalAccumulator.createAll()
val tm = new TaskMetrics(internalAccums)
assert(tm.accumulatorUpdates().size === internalAccums.size)
val acc1 = new Accumulator(0, IntAccumulatorParam, Some("a"))
@@ -419,7 +419,7 @@ class TaskMetricsSuite extends SparkFunSuite {

test("existing values in shuffle read accums") {
// set shuffle read accum before passing it into TaskMetrics
val accums = InternalAccumulator.create()
val accums = InternalAccumulator.createAll()
val srAccum = accums.find(_.name === Some(shuffleRead.FETCH_WAIT_TIME))
assert(srAccum.isDefined)
srAccum.get.asInstanceOf[Accumulator[Long]] += 10L
@@ -432,7 +432,7 @@ class TaskMetricsSuite extends SparkFunSuite {

test("existing values in shuffle write accums") {
// set shuffle write accum before passing it into TaskMetrics
val accums = InternalAccumulator.create()
val accums = InternalAccumulator.createAll()
val swAccum = accums.find(_.name === Some(shuffleWrite.RECORDS_WRITTEN))
assert(swAccum.isDefined)
swAccum.get.asInstanceOf[Accumulator[Long]] += 10L
@@ -445,7 +445,7 @@ class TaskMetricsSuite extends SparkFunSuite {

test("existing values in input accums") {
// set input accum before passing it into TaskMetrics
val accums = InternalAccumulator.create()
val accums = InternalAccumulator.createAll()
val inAccum = accums.find(_.name === Some(input.RECORDS_READ))
assert(inAccum.isDefined)
inAccum.get.asInstanceOf[Accumulator[Long]] += 10L
@@ -458,7 +458,7 @@ class TaskMetricsSuite extends SparkFunSuite {

test("existing values in output accums") {
// set output accum before passing it into TaskMetrics
val accums = InternalAccumulator.create()
val accums = InternalAccumulator.createAll()
val outAccum = accums.find(_.name === Some(output.RECORDS_WRITTEN))
assert(outAccum.isDefined)
outAccum.get.asInstanceOf[Accumulator[Long]] += 10L
Expand All @@ -470,7 +470,7 @@ class TaskMetricsSuite extends SparkFunSuite {
}

test("from accumulator updates") {
val accumUpdates1 = InternalAccumulator.create().map { a =>
val accumUpdates1 = InternalAccumulator.createAll().map { a =>
AccumulableInfo(a.id, a.name, Some(3L), None, a.isInternal, a.countFailedValues)
}
val metrics1 = TaskMetrics.fromAccumulatorUpdates(accumUpdates1)
core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala
@@ -127,7 +127,7 @@ class TaskContextSuite extends SparkFunSuite with BeforeAndAfter with LocalSpark
val param = AccumulatorParam.LongAccumulatorParam
val acc1 = new Accumulator(0L, param, Some("x"), internal = false, countFailedValues = true)
val acc2 = new Accumulator(0L, param, Some("y"), internal = false, countFailedValues = false)
val initialAccums = InternalAccumulator.create()
val initialAccums = InternalAccumulator.createAll()
// Create a dummy task. We won't end up running this; we just want to collect
// accumulator updates from it.
val task = new Task[Int](0, 0, 0, Seq.empty[Accumulator[_]]) {
core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
@@ -269,7 +269,7 @@ class JobProgressListenerSuite extends SparkFunSuite with LocalSparkContext with
val execId = "exe-1"

def makeTaskMetrics(base: Int): TaskMetrics = {
val accums = InternalAccumulator.create()
val accums = InternalAccumulator.createAll()
accums.foreach(Accumulators.register)
val taskMetrics = new TaskMetrics(accums)
val shuffleReadMetrics = taskMetrics.registerTempShuffleReadMetrics()
core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -508,7 +508,7 @@ private[spark] object JsonProtocolSuite extends Assertions {

/** -------------------------------- *
| Util methods for comparing events |
* --------------------------------- */
* --------------------------------- */

private[spark] def assertEquals(event1: SparkListenerEvent, event2: SparkListenerEvent) {
(event1, event2) match {
@@ -773,7 +773,7 @@ private[spark] object JsonProtocolSuite extends Assertions {

/** ----------------------------------- *
| Util methods for constructing events |
* ------------------------------------ */
* ------------------------------------ */

private val properties = {
val p = new Properties