[SPARK-10620] [SPARK-13054] Minor addendum to #10835 (#10958)

Closed
Changes from 6 commits
11 changes: 6 additions & 5 deletions core/src/main/scala/org/apache/spark/Accumulator.scala
@@ -60,19 +60,20 @@ import org.apache.spark.storage.{BlockId, BlockStatus}
* @tparam T result type
*/
class Accumulator[T] private[spark] (
@transient private[spark] val initialValue: T,
// SI-8813: This must explicitly be a private val, or else scala 2.11 doesn't compile
@transient private val initialValue: T,
Contributor: This change was already added in another patch, causing a merge conflict here.

param: AccumulatorParam[T],
name: Option[String],
internal: Boolean,
override val countFailedValues: Boolean = false)
private[spark] override val countFailedValues: Boolean = false)
extends Accumulable[T, T](initialValue, param, name, internal, countFailedValues) {

def this(initialValue: T, param: AccumulatorParam[T], name: Option[String]) = {
this(initialValue, param, name, false)
this(initialValue, param, name, false /* internal */)
}

def this(initialValue: T, param: AccumulatorParam[T]) = {
this(initialValue, param, None, false)
this(initialValue, param, None, false /* internal */)
}
}
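For context, here is a minimal sketch of how this class is typically exercised from user code through the public `SparkContext.accumulator` API; the constructor changes above only touch the internal surface. The app name, master, and `errorCount` name are placeholders.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object AccumulatorExample {
  def main(args: Array[String]): Unit = {
    // Illustrative driver program; configuration values are placeholders.
    val sc = new SparkContext(new SparkConf().setAppName("acc-example").setMaster("local[2]"))
    val errorCount = sc.accumulator(0, "errorCount")
    sc.parallelize(1 to 1000).foreach { i =>
      if (i % 100 == 0) errorCount += 1  // adds are applied on the executors
    }
    // Only the driver can read the merged value.
    println(s"errors: ${errorCount.value}")
    sc.stop()
  }
}
```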

@@ -84,7 +85,7 @@ private[spark] object Accumulators extends Logging {
* This global map holds the original accumulator objects that are created on the driver.
* It keeps weak references to these objects so that accumulators can be garbage-collected
* once the RDDs and user-code that reference them are cleaned up.
* TODO: Don't use a global map; these should be tied to a SparkContext at the very least.
* TODO: Don't use a global map; these should be tied to a SparkContext (SPARK-13051).
*/
@GuardedBy("Accumulators")
val originals = mutable.Map[Long, WeakReference[Accumulable[_, _]]]()
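To illustrate the weak-reference idea described in the comment above, here is a small standalone sketch, not Spark's actual registry (`WeakRegistry`, `register`, `lookup`, and `prune` are made-up names), of a map that holds only weak references so its entries do not keep the registered objects alive.

```scala
import scala.collection.mutable
import scala.ref.WeakReference

// Standalone illustration only: a registry keyed by ID that holds weak
// references, so registered objects can still be garbage-collected once the
// caller drops its own strong references to them.
object WeakRegistry {
  private val entries = mutable.Map[Long, WeakReference[AnyRef]]()

  def register(id: Long, obj: AnyRef): Unit = entries.synchronized {
    entries(id) = new WeakReference[AnyRef](obj)
  }

  // WeakReference.get returns None once the referent has been collected.
  def lookup(id: Long): Option[AnyRef] = entries.synchronized {
    entries.get(id).flatMap(_.get)
  }

  // Drop map entries whose referents are already gone.
  def prune(): Unit = entries.synchronized {
    val dead = entries.collect { case (id, ref) if ref.get.isEmpty => id }
    entries --= dead
  }
}
```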
@@ -119,7 +119,7 @@ private[spark] object InternalAccumulator {
/**
* Accumulators for tracking internal metrics.
*/
def create(): Seq[Accumulator[_]] = {
def createAll(): Seq[Accumulator[_]] = {
Seq[String](
EXECUTOR_DESERIALIZE_TIME,
EXECUTOR_RUN_TIME,
@@ -188,7 +188,7 @@ private[spark] object InternalAccumulator {
* values across all tasks within each stage.
*/
def create(sc: SparkContext): Seq[Accumulator[_]] = {
val accums = create()
val accums = createAll()
accums.foreach { accum =>
Accumulators.register(accum)
sc.cleaner.foreach(_.registerAccumulatorForCleanup(accum))
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/TaskContextImpl.scala
@@ -32,7 +32,7 @@ private[spark] class TaskContextImpl(
override val attemptNumber: Int,
override val taskMemoryManager: TaskMemoryManager,
@transient private val metricsSystem: MetricsSystem,
initialAccumulators: Seq[Accumulator[_]] = InternalAccumulator.create())
initialAccumulators: Seq[Accumulator[_]] = InternalAccumulator.createAll())
extends TaskContext
with Logging {

2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/TaskEndReason.scala
@@ -118,7 +118,7 @@ case class ExceptionFailure(
description: String,
stackTrace: Array[StackTraceElement],
fullStackTrace: String,
exceptionWrapper: Option[ThrowableSerializationWrapper],
private val exceptionWrapper: Option[ThrowableSerializationWrapper],
Contributor: Looks like this was private in 1.6, so this looks good.

accumUpdates: Seq[AccumulableInfo] = Seq.empty[AccumulableInfo])
Contributor: Should this be private?

Contributor Author: I left it public because TaskMetrics was public, and this is basically task metrics.

extends TaskFailedReason {

16 changes: 8 additions & 8 deletions core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -300,15 +300,15 @@ private[spark] class Executor(

// Collect latest accumulator values to report back to the driver
val accumulatorUpdates: Seq[AccumulableInfo] =
if (task != null) {
task.metrics.foreach { m =>
m.setExecutorRunTime(System.currentTimeMillis() - taskStart)
m.setJvmGCTime(computeTotalGcTime() - startGCTime)
if (task != null) {
task.metrics.foreach { m =>
m.setExecutorRunTime(System.currentTimeMillis() - taskStart)
m.setJvmGCTime(computeTotalGcTime() - startGCTime)
}
task.collectAccumulatorUpdates(taskFailed = true)
} else {
Seq.empty[AccumulableInfo]
}
task.collectAccumulatorUpdates(taskFailed = true)
} else {
Seq.empty[AccumulableInfo]
}

val serializedTaskEndReason = {
try {
20 changes: 17 additions & 3 deletions core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -45,13 +45,12 @@ import org.apache.spark.storage.{BlockId, BlockStatus}
* these requirements.
*/
@DeveloperApi
class TaskMetrics(initialAccums: Seq[Accumulator[_]]) extends Serializable {

class TaskMetrics private[spark] (initialAccums: Seq[Accumulator[_]]) extends Serializable {
import InternalAccumulator._

// Needed for Java tests
def this() {
this(InternalAccumulator.create())
this(InternalAccumulator.createAll())
}

/**
@@ -144,6 +143,11 @@ class TaskMetrics(initialAccums: Seq[Accumulator[_]]) extends Serializable {
if (updatedBlockStatuses.nonEmpty) Some(updatedBlockStatuses) else None
}

@deprecated("setting updated blocks is not allowed", "2.0.0")
def updatedBlocks_=(blocks: Option[Seq[(BlockId, BlockStatus)]]): Unit = {
blocks.foreach(setUpdatedBlockStatuses)
}

// Setters and increment-ers
private[spark] def setExecutorDeserializeTime(v: Long): Unit =
_executorDeserializeTime.setValue(v)
@@ -220,6 +224,11 @@ class TaskMetrics(initialAccums: Seq[Accumulator[_]]) extends Serializable {
*/
def outputMetrics: Option[OutputMetrics] = _outputMetrics

@deprecated("setting OutputMetrics is for internal use only", "2.0.0")
def outputMetrics_=(om: Option[OutputMetrics]): Unit = {
_outputMetrics = om
}

/**
* Get or create a new [[OutputMetrics]] associated with this task.
*/
@@ -296,6 +305,11 @@ class TaskMetrics(initialAccums: Seq[Accumulator[_]]) extends Serializable {
*/
def shuffleWriteMetrics: Option[ShuffleWriteMetrics] = _shuffleWriteMetrics

@deprecated("setting ShuffleWriteMetrics is for internal use only", "2.0.0")
def shuffleWriteMetrics_=(swm: Option[ShuffleWriteMetrics]): Unit = {
_shuffleWriteMetrics = swm
}
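The deprecated setters added above rely on Scala's `name_=` convention: a method named `x_=` paired with a getter `x` lets callers keep writing `obj.x = value`. A self-contained sketch of that pattern (the `Metrics` class and its field are invented for illustration, not Spark code):

```scala
class Metrics {
  private var _bytesWritten: Long = 0L

  // Getter: required for the `m.bytesWritten = ...` assignment syntax to resolve.
  def bytesWritten: Long = _bytesWritten

  // Setter kept only for source compatibility; new code should not call it.
  @deprecated("setting bytesWritten is for internal use only", "2.0.0")
  def bytesWritten_=(v: Long): Unit = {
    _bytesWritten = v
  }
}

object MetricsDemo extends App {
  val m = new Metrics
  m.bytesWritten = 42L     // still compiles, but now emits a deprecation warning
  println(m.bytesWritten)  // prints 42
}
```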

/**
* Get or create a new [[ShuffleWriteMetrics]] associated with this task.
*/
@@ -1144,13 +1144,12 @@ class DAGScheduler(
null
}

// The success case is dealt with separately below.
// TODO: Why post it only for failed tasks in cancelled stages? Clarify semantics here.
if (event.reason != Success) {
val attemptId = task.stageAttemptId
listenerBus.post(SparkListenerTaskEnd(
stageId, attemptId, taskType, event.reason, event.taskInfo, taskMetrics))
}
// Note: this stage may already have been canceled, in which case this task end event
may be posted after the stage completed event. There's not much we can do here without
// introducing additional complexity in the scheduler to wait for all the task end events
// before posting the stage completed event.
listenerBus.post(SparkListenerTaskEnd(
stageId, task.stageAttemptId, taskType, event.reason, event.taskInfo, taskMetrics))
Contributor Author: (comment for linking)

Contributor: This should probably have a test like #10951. We should combine our efforts on this. It might make more sense to have this in a separate PR from the other style things, but it doesn't matter too much.

Contributor Author: OK, I separated it.

Contributor: Are you going to file a separate PR, or should I just update mine?

Contributor Author: You can just update yours.
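A rough sketch of the kind of check being discussed (in the spirit of #10951, but not that PR's actual test): register a listener, run a job, and assert that one SparkListenerTaskEnd event arrives per task. The app name, master, and the sleep-based wait are placeholders; Spark's own suites wait on the listener bus instead.

```scala
import java.util.concurrent.atomic.AtomicInteger

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

object TaskEndEventCheck {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("task-end-check").setMaster("local[2]"))
    val taskEnds = new AtomicInteger(0)
    sc.addSparkListener(new SparkListener {
      override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
        taskEnds.incrementAndGet()
      }
    })

    val numPartitions = 4
    sc.parallelize(1 to 100, numPartitions).count()

    // Listener events are delivered asynchronously; this crude wait stands in
    // for draining the listener bus, which is what the real test suites do.
    Thread.sleep(2000)
    assert(taskEnds.get() == numPartitions,
      s"expected $numPartitions task end events, got ${taskEnds.get()}")
    sc.stop()
  }
}
```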


if (!stageIdToStage.contains(task.stageId)) {
// Skip all the actions if the stage has been cancelled.
@@ -1160,8 +1159,6 @@ class DAGScheduler(
val stage = stageIdToStage(task.stageId)
event.reason match {
case Success =>
listenerBus.post(SparkListenerTaskEnd(stageId, stage.latestInfo.attemptId, taskType,
event.reason, event.taskInfo, taskMetrics))
stage.pendingPartitions -= task.partitionId
task match {
case rt: ResultTask[_, _] =>
@@ -49,7 +49,7 @@ private[spark] class ResultTask[T, U](
partition: Partition,
locs: Seq[TaskLocation],
val outputId: Int,
_initialAccums: Seq[Accumulator[_]] = InternalAccumulator.create())
_initialAccums: Seq[Accumulator[_]] = InternalAccumulator.createAll())
extends Task[U](stageId, stageAttemptId, partition.index, _initialAccums)
with Serializable {

@@ -61,6 +61,7 @@ case class SparkListenerTaskEnd(
taskType: String,
reason: TaskEndReason,
taskInfo: TaskInfo,
// may be null if the task has failed
@Nullable taskMetrics: TaskMetrics)
extends SparkListenerEvent
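Since `taskMetrics` may be null when the task failed, listener implementations should guard against it. A minimal illustration (the `PeakMemoryListener` name is invented):

```scala
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

class PeakMemoryListener extends SparkListener {
  @volatile private var maxPeak = 0L

  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    val metrics = taskEnd.taskMetrics
    if (metrics != null) {  // null for failed tasks, per the annotation above
      maxPeak = math.max(maxPeak, metrics.peakExecutionMemory)
    }
  }

  def peak: Long = maxPeak
}
```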

6 changes: 3 additions & 3 deletions core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -408,9 +408,9 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
</td> +:
getFormattedTimeQuantiles(gettingResultTimes)

val peakExecutionMemory = validTasks.map { case TaskUIData(_, metrics, _) =>
metrics.get.peakExecutionMemory.toDouble
}
val peakExecutionMemory = validTasks.map { case TaskUIData(_, metrics, _) =>
metrics.get.peakExecutionMemory.toDouble
}
val peakExecutionMemoryQuantiles = {
<td>
<span data-toggle="tooltip"
@@ -236,7 +236,7 @@ private[spark] object JsonProtocol {
val accumUpdates = metricsUpdate.accumUpdates
("Event" -> Utils.getFormattedClassName(metricsUpdate)) ~
("Executor ID" -> execId) ~
("Metrics Updated" -> accumUpdates.map { case (taskId, stageId, stageAttemptId, updates) =>
("Metrics Updated" -> accumUpdates.map { case (taskId, stageId, stageAttemptId, updates) =>
("Task ID" -> taskId) ~
("Stage ID" -> stageId) ~
("Stage Attempt ID" -> stageAttemptId) ~
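For readers unfamiliar with the json4s DSL used in this hunk, here is a tiny standalone example of how `->` builds fields and `~` joins them into an object; the values are made up.

```scala
import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods.{compact, render}

object JsonDslDemo extends App {
  // Fake (taskId, stageId, stageAttemptId) triples, standing in for real updates.
  val updates = Seq((1L, 0, 0), (2L, 0, 0))
  val json =
    ("Event" -> "SparkListenerExecutorMetricsUpdate") ~
    ("Executor ID" -> "driver") ~
    ("Metrics Updated" -> updates.map { case (taskId, stageId, stageAttemptId) =>
      ("Task ID" -> taskId) ~
      ("Stage ID" -> stageId) ~
      ("Stage Attempt ID" -> stageAttemptId)
    })
  // Prints the nested structure as a single-line JSON string.
  println(compact(render(json)))
}
```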
@@ -268,7 +268,7 @@ class AccumulatorSuite extends SparkFunSuite with Matchers with LocalSparkContex
val acc1 = new Accumulator(0, IntAccumulatorParam, Some("thing"), internal = false)
val acc2 = new Accumulator(0L, LongAccumulatorParam, Some("thing2"), internal = false)
val externalAccums = Seq(acc1, acc2)
val internalAccums = InternalAccumulator.create()
val internalAccums = InternalAccumulator.createAll()
// Set some values; these should not be observed later on the "executors"
acc1.setValue(10)
acc2.setValue(20L)
@@ -86,7 +86,7 @@ class InternalAccumulatorSuite extends SparkFunSuite with LocalSparkContext {
}

test("create") {
val accums = create()
val accums = createAll()
val shuffleReadAccums = createShuffleReadAccums()
val shuffleWriteAccums = createShuffleWriteAccums()
val inputAccums = createInputAccums()
@@ -122,7 +122,7 @@ class InternalAccumulatorSuite extends SparkFunSuite with LocalSparkContext {
}

test("naming") {
val accums = create()
val accums = createAll()
val shuffleReadAccums = createShuffleReadAccums()
val shuffleWriteAccums = createShuffleWriteAccums()
val inputAccums = createInputAccums()
@@ -220,7 +220,7 @@ class InternalAccumulatorSuite extends SparkFunSuite with LocalSparkContext {
rdd.count()
}

// TODO: these two tests are incorrect; they don't actually trigger stage retries.
// TODO: these two tests are incorrect; they don't actually trigger stage retries (SPARK-13053).

ignore("internal accumulators in fully resubmitted stages") {
testInternalAccumulatorsWithFailedTasks((i: Int) => true) // fail all tasks
}
@@ -236,7 +236,7 @@ class InternalAccumulatorSuite extends SparkFunSuite with LocalSparkContext {
}
assert(Accumulators.originals.isEmpty)
sc.parallelize(1 to 100).map { i => (i, i) }.reduceByKey { _ + _ }.count()
val internalAccums = InternalAccumulator.create()
val internalAccums = InternalAccumulator.createAll()
// We ran 2 stages, so we should have 2 sets of internal accumulators, 1 for each stage
assert(Accumulators.originals.size === internalAccums.size * 2)
val accumsRegistered = sc.cleaner match {
@@ -259,7 +259,7 @@

/**
* Test whether internal accumulators are merged properly if some tasks fail.
* TODO: make this actually retry the stage.
* TODO: make this actually retry the stage (SPARK-13053).
*/
private def testInternalAccumulatorsWithFailedTasks(failCondition: (Int => Boolean)): Unit = {
val listener = new SaveInfoListener