
Commit

PR comments
And remove unneeded config in spec
eejbyfeldt committed Apr 6, 2023
1 parent d5b6620 commit a0c7646
Showing 5 changed files with 5 additions and 8 deletions.

@@ -265,14 +265,13 @@ class TaskMetrics private[spark] () extends Serializable {
    */
   @transient private[spark] lazy val _externalAccums = new CopyOnWriteArrayList[AccumulatorV2[_, _]]

-  private[spark] def externalAccums() = _externalAccums.asScala
+  private[spark] def externalAccums = _externalAccums.asScala

   private[spark] def registerAccumulator(a: AccumulatorV2[_, _]): Unit = {
     _externalAccums.add(a)
   }

-  private[spark] def accumulators(): Seq[AccumulatorV2[_, _]] =
-    internalAccums ++ _externalAccums.asScala
+  private[spark] def accumulators(): Seq[AccumulatorV2[_, _]] = internalAccums ++ externalAccums

   private[spark] def nonZeroInternalAccums(): Seq[AccumulatorV2[_, _]] = {
     // RESULT_SIZE accumulator is always zero at executor, we need to send it back as its
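
A minimal, standalone sketch (not the Spark class itself) of the pattern the new parameterless accessor relies on: asScala wraps the CopyOnWriteArrayList in a live Scala view, so each call to externalAccums is cheap and stays in sync with later registrations, which is why accumulators() can simply reuse it instead of converting _externalAccums again. The object name ExternalAccumsSketch and the String element type are illustrative only; the import assumes Scala 2.13's scala.jdk.CollectionConverters (Scala 2.12 would use scala.collection.JavaConverters).

import java.util.concurrent.CopyOnWriteArrayList

import scala.collection.mutable
import scala.jdk.CollectionConverters._

object ExternalAccumsSketch {
  // Thread-safe backing store, analogous to _externalAccums in TaskMetrics.
  private val _externalAccums = new CopyOnWriteArrayList[String]()

  // Parameterless def: each access wraps the Java list in a lightweight view
  // rather than copying it, so the result reflects later additions.
  def externalAccums: mutable.Buffer[String] = _externalAccums.asScala

  def main(args: Array[String]): Unit = {
    _externalAccums.add("metric-1")
    val view = externalAccums
    _externalAccums.add("metric-2")
    // The view picks up the element added after it was obtained.
    println(view.mkString(", ")) // metric-1, metric-2
  }
}
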
core/src/main/scala/org/apache/spark/scheduler/Task.scala (2 changes: 1 addition & 1 deletion)

@@ -208,7 +208,7 @@ private[spark] abstract class Task[T](
       context.taskMetrics.nonZeroInternalAccums() ++
         // zero value external accumulators may still be useful, e.g. SQLMetrics, we should not
         // filter them out.
-        context.taskMetrics.externalAccums().filter(a => !taskFailed || a.countFailedValues)
+        context.taskMetrics.externalAccums.filter(a => !taskFailed || a.countFailedValues)
     } else {
       Seq.empty
     }

@@ -275,8 +275,6 @@ class ExecutorSuite extends SparkFunSuite
   test("SPARK-39696: Using accumulators should not cause heartbeat to fail") {
     val conf = new SparkConf().setMaster("local").setAppName("executor suite test")
     conf.set(EXECUTOR_HEARTBEAT_INTERVAL.key, "1ms")
-    conf.set(STORAGE_BLOCKMANAGER_HEARTBEAT_TIMEOUT.key, "500ms")
-    conf.set(Network.NETWORK_TIMEOUT_INTERVAL.key, "400ms")
     sc = new SparkContext(conf)

     val accums = (1 to 10).map(i => sc.longAccumulator(s"mapperRunAccumulator${i}"))

@@ -178,7 +178,7 @@ class SQLAppStatusListener(
     // work around a race in the DAGScheduler. The metrics info does not contain accumulator info
     // when reading event logs in the SHS, so we have to rely on the accumulator in that case.
     val accums = if (live && event.taskMetrics != null) {
-      event.taskMetrics.externalAccums().flatMap { a =>
+      event.taskMetrics.externalAccums.flatMap { a =>
         // This call may fail if the accumulator is gc'ed, so account for that.
         try {
           Some(a.toInfo(Some(a.value), None))

@@ -311,7 +311,7 @@ object InputOutputMetricsHelper {
         res.shuffleRecordsRead += taskEnd.taskMetrics.shuffleReadMetrics.recordsRead

         var maxOutputRows = 0L
-        for (accum <- taskEnd.taskMetrics.externalAccums()) {
+        for (accum <- taskEnd.taskMetrics.externalAccums) {
           val info = accum.toInfo(Some(accum.value), None)
           if (info.name.toString.contains("number of output rows")) {
             info.update match {
