try change to use CopyOnWriteArrayList
LuciferYang authored and eejbyfeldt committed Apr 6, 2023
1 parent b8a400c commit d5b6620
Showing 4 changed files with 12 additions and 8 deletions.
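The diff replaces the ArrayBuffer backing TaskMetrics.externalAccums with a java.util.concurrent.CopyOnWriteArrayList and routes callers through a new externalAccums() accessor that exposes the Java list as a Scala collection via asScala. The apparent goal is to let one thread register accumulators while another thread walks the list (for example when task metrics are collected and reported), without the traversal observing a buffer that is being mutated underneath it.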
14 changes: 9 additions & 5 deletions core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -17,6 +17,8 @@

package org.apache.spark.executor

+import java.util.concurrent.CopyOnWriteArrayList
+
import scala.collection.JavaConverters._
import scala.collection.mutable.{ArrayBuffer, LinkedHashMap}

@@ -206,7 +208,6 @@ class TaskMetrics private[spark] () extends Serializable {
// Only used for test
private[spark] val testAccum = sys.props.get(IS_TESTING.key).map(_ => new LongAccumulator)


import InternalAccumulator._
@transient private[spark] lazy val nameToAccums = LinkedHashMap(
EXECUTOR_DESERIALIZE_TIME -> _executorDeserializeTime,
@@ -262,13 +263,16 @@
/**
* External accumulators registered with this task.
*/
-@transient private[spark] lazy val externalAccums = new ArrayBuffer[AccumulatorV2[_, _]]
+@transient private[spark] lazy val _externalAccums = new CopyOnWriteArrayList[AccumulatorV2[_, _]]

+private[spark] def externalAccums() = _externalAccums.asScala
+
private[spark] def registerAccumulator(a: AccumulatorV2[_, _]): Unit = {
-externalAccums += a
+_externalAccums.add(a)
}

-private[spark] def accumulators(): Seq[AccumulatorV2[_, _]] = internalAccums ++ externalAccums
+private[spark] def accumulators(): Seq[AccumulatorV2[_, _]] =
+  internalAccums ++ _externalAccums.asScala

private[spark] def nonZeroInternalAccums(): Seq[AccumulatorV2[_, _]] = {
// RESULT_SIZE accumulator is always zero at executor, we need to send it back as its
@@ -331,7 +335,7 @@ private[spark] object TaskMetrics extends Logging {
tmAcc.metadata = acc.metadata
tmAcc.merge(acc.asInstanceOf[AccumulatorV2[Any, Any]])
} else {
-tm.externalAccums += acc
+tm._externalAccums.add(acc)
}
}
tm
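
As a rough sketch of the pattern introduced above (the registry class and its names are hypothetical, not part of Spark), a CopyOnWriteArrayList makes each write copy the backing array so that readers always iterate over an immutable snapshot:

```scala
import java.util.concurrent.CopyOnWriteArrayList

import scala.collection.JavaConverters._

// Minimal sketch, assuming a list that is appended to rarely but may be
// traversed by another thread at any time.
class AccumRegistry[A] {
  // Each add() copies the underlying array, so iterators work on a stable
  // snapshot and never throw ConcurrentModificationException.
  private val accums = new CopyOnWriteArrayList[A]

  def register(a: A): Unit = accums.add(a)

  // Mirrors the externalAccums() accessor above: asScala wraps the Java list
  // as a Scala Buffer view without copying it again.
  def all: scala.collection.Seq[A] = accums.asScala
}
```

The trade-off is that every add pays for an array copy, which suits accumulator registration (infrequent writes, potentially concurrent reads) far better than it would a hot write path.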
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/scheduler/Task.scala
@@ -208,7 +208,7 @@ private[spark] abstract class Task[T](
context.taskMetrics.nonZeroInternalAccums() ++
// zero value external accumulators may still be useful, e.g. SQLMetrics, we should not
// filter them out.
-context.taskMetrics.externalAccums.filter(a => !taskFailed || a.countFailedValues)
+context.taskMetrics.externalAccums().filter(a => !taskFailed || a.countFailedValues)
} else {
Seq.empty
}
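
Call sites like the one above only gain a pair of parentheses: externalAccums() hands back the asScala view of the list, which still behaves like a Scala Buffer, so the surrounding filter, flatMap, and for-comprehension logic in the remaining files is otherwise unchanged.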
@@ -178,7 +178,7 @@ class SQLAppStatusListener(
// work around a race in the DAGScheduler. The metrics info does not contain accumulator info
// when reading event logs in the SHS, so we have to rely on the accumulator in that case.
val accums = if (live && event.taskMetrics != null) {
-event.taskMetrics.externalAccums.flatMap { a =>
+event.taskMetrics.externalAccums().flatMap { a =>
// This call may fail if the accumulator is gc'ed, so account for that.
try {
Some(a.toInfo(Some(a.value), None))
@@ -311,7 +311,7 @@ object InputOutputMetricsHelper {
res.shuffleRecordsRead += taskEnd.taskMetrics.shuffleReadMetrics.recordsRead

var maxOutputRows = 0L
-for (accum <- taskEnd.taskMetrics.externalAccums) {
+for (accum <- taskEnd.taskMetrics.externalAccums()) {
val info = accum.toInfo(Some(accum.value), None)
if (info.name.toString.contains("number of output rows")) {
info.update match {
