From 002156591cdfba646e5868c4f5aaf3f3d1d80bb4 Mon Sep 17 00:00:00 2001
From: Josh Rosen
Date: Mon, 3 Mar 2025 17:02:54 -0800
Subject: [PATCH] Also apply accumulableExcludeList to other places where we
 log accumulables in event logs

---
 .../org/apache/spark/util/JsonProtocol.scala  | 14 ++++-----
 .../apache/spark/util/JsonProtocolSuite.scala | 30 ++++---------------
 2 files changed, 11 insertions(+), 33 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index e30380f41566a..df809f4fad745 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -376,9 +376,8 @@ private[spark] object JsonProtocol extends JsonUtils {
       g.writeNumberField("Task ID", taskId)
       g.writeNumberField("Stage ID", stageId)
       g.writeNumberField("Stage Attempt ID", stageAttemptId)
-      g.writeArrayFieldStart("Accumulator Updates")
-      updates.foreach(accumulableInfoToJson(_, g))
-      g.writeEndArray()
+      g.writeFieldName("Accumulator Updates")
+      accumulablesToJson(updates, g)
       g.writeEndObject()
     }
     g.writeEndArray()
@@ -496,7 +495,7 @@ private[spark] object JsonProtocol extends JsonUtils {
   def accumulablesToJson(
       accumulables: Iterable[AccumulableInfo],
       g: JsonGenerator,
-      includeTaskMetricsAccumulators: Boolean = true): Unit = {
+      includeTaskMetricsAccumulators: Boolean = true): Unit = {
     g.writeStartArray()
     accumulables
       .filterNot { acc =>
@@ -714,11 +713,8 @@ private[spark] object JsonProtocol extends JsonUtils {
         reason.foreach(g.writeStringField("Loss Reason", _))
       case taskKilled: TaskKilled =>
         g.writeStringField("Kill Reason", taskKilled.reason)
-        g.writeArrayFieldStart("Accumulator Updates")
-        taskKilled.accumUpdates.foreach { info =>
-          accumulableInfoToJson(info, g)
-        }
-        g.writeEndArray()
+        g.writeFieldName("Accumulator Updates")
+        accumulablesToJson(taskKilled.accumUpdates, g)
       case _ =>
         // no extra fields to write
     }
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index 89e3d8371be4c..a9399edeb9ad7 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -1166,7 +1166,9 @@ private[spark] object JsonProtocolSuite extends Assertions {
           assert(taskId1 === taskId2)
           assert(stageId1 === stageId2)
           assert(stageAttemptId1 === stageAttemptId2)
-          assertSeqEquals[AccumulableInfo](updates1, updates2, (a, b) => a.equals(b))
+          val filteredUpdates = updates1
+            .filterNot { acc => acc.name.exists(accumulableExcludeList.contains) }
+          assertSeqEquals[AccumulableInfo](filteredUpdates, updates2, (a, b) => a.equals(b))
         })
       assertSeqEquals[((Int, Int), ExecutorMetrics)](
         e1.executorUpdates.toSeq.sortBy(_._1),
@@ -1299,7 +1301,9 @@ private[spark] object JsonProtocolSuite extends Assertions {
         assert(r1.description === r2.description)
         assertSeqEquals(r1.stackTrace, r2.stackTrace, assertStackTraceElementEquals)
         assert(r1.fullStackTrace === r2.fullStackTrace)
-        assertSeqEquals[AccumulableInfo](r1.accumUpdates, r2.accumUpdates, (a, b) => a.equals(b))
+        val filteredUpdates = r1.accumUpdates
+          .filterNot { acc => acc.name.exists(accumulableExcludeList.contains) }
+        assertSeqEquals[AccumulableInfo](filteredUpdates, r2.accumUpdates, (a, b) => a.equals(b))
       case (TaskResultLost, TaskResultLost) =>
       case (r1: TaskKilled, r2: TaskKilled) =>
         assert(r1.reason == r2.reason)
@@ -2774,28 +2778,6 @@ private[spark] object JsonProtocolSuite extends Assertions {
      |          "Count Failed Values": true
      |        },
      |        {
-     |          "ID": 12,
-     |          "Name": "$UPDATED_BLOCK_STATUSES",
-     |          "Update": [
-     |            {
-     |              "Block ID": "rdd_0_0",
-     |              "Status": {
-     |                "Storage Level": {
-     |                  "Use Disk": true,
-     |                  "Use Memory": true,
-     |                  "Use Off Heap": false,
-     |                  "Deserialized": false,
-     |                  "Replication": 2
-     |                },
-     |                "Memory Size": 0,
-     |                "Disk Size": 0
-     |              }
-     |            }
-     |          ],
-     |          "Internal": true,
-     |          "Count Failed Values": true
-     |        },
-     |        {
      |          "ID": 13,
      |          "Name": "${shuffleRead.REMOTE_BLOCKS_FETCHED}",
      |          "Update": 0,
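
Context for the change above (illustration only, not part of the patch): JsonProtocol drops accumulators whose names appear on an exclude list when writing event-log JSON, and the test updates mirror that by filtering the expected updates before comparing. The standalone Scala sketch below shows the same name-based filterNot pattern used throughout the diff. AccInfo, excludeList, and filterAccumulables are hypothetical stand-ins for Spark's AccumulableInfo, JsonProtocol.accumulableExcludeList, and the in-line filtering; the single entry in excludeList is inferred from the "$UPDATED_BLOCK_STATUSES" expectation the patch removes.

// Standalone sketch: AccInfo, excludeList, and filterAccumulables are
// illustrative stand-ins, not Spark's real AccumulableInfo / JsonProtocol API.
object AccumulableExcludeListSketch {

  // Minimal stand-in for org.apache.spark.scheduler.AccumulableInfo.
  final case class AccInfo(id: Long, name: Option[String], update: Option[Any])

  // Assumed exclude-list contents, inferred from the expected-JSON entry the
  // patch removes ("$UPDATED_BLOCK_STATUSES").
  val excludeList: Set[String] = Set("internal.metrics.updatedBlockStatuses")

  // The filtering shape used in the diff: keep an accumulable only if its
  // (optional) name is absent from the exclude list.
  def filterAccumulables(accums: Seq[AccInfo]): Seq[AccInfo] =
    accums.filterNot { acc => acc.name.exists(excludeList.contains) }

  def main(args: Array[String]): Unit = {
    val updates = Seq(
      AccInfo(12L, Some("internal.metrics.updatedBlockStatuses"), Some(Seq.empty)),
      AccInfo(13L, Some("internal.metrics.shuffle.read.remoteBlocksFetched"), Some(0L)))
    // Prints only ID 13: the excluded accumulator never reaches the event log,
    // which is why the expected JSON in the test no longer lists ID 12.
    filterAccumulables(updates).foreach(a => println(s"${a.id} ${a.name.getOrElse("<unnamed>")}"))
  }
}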