14 changes: 5 additions & 9 deletions core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -376,9 +376,8 @@ private[spark] object JsonProtocol extends JsonUtils {
       g.writeNumberField("Task ID", taskId)
       g.writeNumberField("Stage ID", stageId)
       g.writeNumberField("Stage Attempt ID", stageAttemptId)
-      g.writeArrayFieldStart("Accumulator Updates")
-      updates.foreach(accumulableInfoToJson(_, g))
-      g.writeEndArray()
+      g.writeFieldName("Accumulator Updates")
+      accumulablesToJson(updates, g)
       g.writeEndObject()
     }
     g.writeEndArray()
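For context on this hunk: the wire format does not change. `accumulablesToJson` writes the array start and end itself, so the call site only needs `writeFieldName` instead of the `writeArrayFieldStart`/`writeEndArray` pair. A minimal sketch of that equivalence with Jackson's `JsonGenerator` (the field name is real; the helper and values here are made up):

```scala
import java.io.StringWriter
import com.fasterxml.jackson.core.{JsonFactory, JsonGenerator}

object ArrayFieldSketch extends App {
  // Stand-in for accumulablesToJson: the helper owns the array markers.
  def writeUpdates(g: JsonGenerator, names: Seq[String]): Unit = {
    g.writeStartArray() // was writeArrayFieldStart(...) at the call site
    names.foreach(n => g.writeString(n))
    g.writeEndArray()
  }

  val out = new StringWriter()
  val g = new JsonFactory().createGenerator(out)
  g.writeStartObject()
  g.writeFieldName("Accumulator Updates") // the caller's only responsibility now
  writeUpdates(g, Seq("a", "b"))
  g.writeEndObject()
  g.close()
  assert(out.toString == """{"Accumulator Updates":["a","b"]}""")
}
```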
@@ -496,7 +495,7 @@ private[spark] object JsonProtocol extends JsonUtils {
   def accumulablesToJson(
       accumulables: Iterable[AccumulableInfo],
       g: JsonGenerator,
-      includeTaskMetricsAccumulators: Boolean = true): Unit = {
+      includeTaskMetricsAccumulators: Boolean = true): Unit = {
     g.writeStartArray()
     accumulables
       .filterNot { acc =>
@@ -714,11 +713,8 @@ private[spark] object JsonProtocol extends JsonUtils {
         reason.foreach(g.writeStringField("Loss Reason", _))
       case taskKilled: TaskKilled =>
         g.writeStringField("Kill Reason", taskKilled.reason)
-        g.writeArrayFieldStart("Accumulator Updates")
-        taskKilled.accumUpdates.foreach { info =>
-          accumulableInfoToJson(info, g)
-        }
-        g.writeEndArray()
+        g.writeFieldName("Accumulator Updates")
+        accumulablesToJson(taskKilled.accumUpdates, g)
       case _ =>
         // no extra fields to write
     }
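The substantive effect of routing both call sites through `accumulablesToJson` is that executor metrics updates and `TaskKilled` reasons now get the same accumulator filtering as other events. The predicate itself is collapsed above; a sketch of its assumed shape, inferred from the visible `.filterNot` and the `accumulableExcludeList` used in the test changes below (the exclusion-list contents and the serialization body are placeholders):

```scala
import com.fasterxml.jackson.core.JsonGenerator
import org.apache.spark.scheduler.AccumulableInfo

// Sketch only; the real body is truncated in this diff, and the
// includeTaskMetricsAccumulators branch is elided entirely.
def accumulablesToJsonSketch(
    accumulables: Iterable[AccumulableInfo],
    g: JsonGenerator,
    includeTaskMetricsAccumulators: Boolean = true): Unit = {
  // Assumed single entry; the real list lives in JsonProtocol.
  val excludeList = Set("internal.metrics.updatedBlockStatuses")
  g.writeStartArray()
  accumulables
    .filterNot(acc => acc.name.exists(excludeList.contains))
    .foreach { acc =>
      // Stand-in for the accumulableInfoToJson(_, g) calls replaced above.
      g.writeStartObject()
      g.writeNumberField("ID", acc.id)
      acc.name.foreach(g.writeStringField("Name", _))
      g.writeEndObject()
    }
  g.writeEndArray()
}
```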
30 changes: 6 additions & 24 deletions core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -1166,7 +1166,9 @@ private[spark] object JsonProtocolSuite extends Assertions {
         assert(taskId1 === taskId2)
         assert(stageId1 === stageId2)
         assert(stageAttemptId1 === stageAttemptId2)
-        assertSeqEquals[AccumulableInfo](updates1, updates2, (a, b) => a.equals(b))
+        val filteredUpdates = updates1
+          .filterNot { acc => acc.name.exists(accumulableExcludeList.contains) }
+        assertSeqEquals[AccumulableInfo](filteredUpdates, updates2, (a, b) => a.equals(b))
       })
       assertSeqEquals[((Int, Int), ExecutorMetrics)](
         e1.executorUpdates.toSeq.sortBy(_._1),
@@ -1299,7 +1301,9 @@ private[spark] object JsonProtocolSuite extends Assertions {
         assert(r1.description === r2.description)
         assertSeqEquals(r1.stackTrace, r2.stackTrace, assertStackTraceElementEquals)
         assert(r1.fullStackTrace === r2.fullStackTrace)
-        assertSeqEquals[AccumulableInfo](r1.accumUpdates, r2.accumUpdates, (a, b) => a.equals(b))
+        val filteredUpdates = r1.accumUpdates
+          .filterNot { acc => acc.name.exists(accumulableExcludeList.contains) }
+        assertSeqEquals[AccumulableInfo](filteredUpdates, r2.accumUpdates, (a, b) => a.equals(b))
       case (TaskResultLost, TaskResultLost) =>
       case (r1: TaskKilled, r2: TaskKilled) =>
         assert(r1.reason == r2.reason)
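Both comparison sites above filter the expected side for the same reason: the writer now drops excluded accumulators, so a round-tripped event legitimately carries fewer `AccumulableInfo` entries than the in-memory original, and comparing against the unfiltered sequence would fail. A toy illustration (constructing `AccumulableInfo` needs `private[spark]` access, so this only compiles inside Spark's own packages; the exclusion-list contents are assumed):

```scala
import org.apache.spark.scheduler.AccumulableInfo

// Assumed contents; mirrors the accumulableExcludeList referenced above.
val excludeList = Set("internal.metrics.updatedBlockStatuses")

val original = Seq(
  AccumulableInfo(1L, Some("my.counter"), Some(1L), Some(10L),
    internal = false, countFailedValues = false),
  AccumulableInfo(12L, Some("internal.metrics.updatedBlockStatuses"), None, None,
    internal = true, countFailedValues = true))

// What a write-then-parse round trip now yields: the excluded entry is gone.
val roundTripped = original.filterNot(_.name.exists(excludeList.contains))
assert(roundTripped.map(_.id) == Seq(1L))
```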
@@ -2774,28 +2778,6 @@ private[spark] object JsonProtocolSuite extends Assertions {
       |          "Count Failed Values": true
       |        },
       |        {
-      |          "ID": 12,
-      |          "Name": "$UPDATED_BLOCK_STATUSES",
-      |          "Update": [
-      |            {
-      |              "Block ID": "rdd_0_0",
-      |              "Status": {
-      |                "Storage Level": {
-      |                  "Use Disk": true,
-      |                  "Use Memory": true,
-      |                  "Use Off Heap": false,
-      |                  "Deserialized": false,
-      |                  "Replication": 2
-      |                },
-      |                "Memory Size": 0,
-      |                "Disk Size": 0
-      |              }
-      |            }
-      |          ],
-      |          "Internal": true,
-      |          "Count Failed Values": true
-      |        },
-      |        {
       |          "ID": 13,
       |          "Name": "${shuffleRead.REMOTE_BLOCKS_FETCHED}",
       |          "Update": 0,
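The deleted block is the golden-JSON counterpart of the code change: the `$UPDATED_BLOCK_STATUSES` accumulator (ID 12 in this fixture) is no longer serialized, while the neighboring entries (ID 13 onward) are untouched. A hypothetical spot check for the same property on a real event log line, using plain Jackson and assuming the constant expands to `internal.metrics.updatedBlockStatuses` (Scala 2.13 for `CollectionConverters`):

```scala
import com.fasterxml.jackson.databind.ObjectMapper
import scala.jdk.CollectionConverters._

val mapper = new ObjectMapper()

// True if any "Name" field anywhere in the event carries the
// updated-block-statuses accumulator, which this change stops logging.
def mentionsUpdatedBlockStatuses(eventLogLine: String): Boolean =
  mapper.readTree(eventLogLine)
    .findValuesAsText("Name").asScala
    .contains("internal.metrics.updatedBlockStatuses")
```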