Skip to content

Commit

Permalink
[SPARK-20084][CORE] Remove internal.metrics.updatedBlockStatuses from…
Browse files Browse the repository at this point in the history
… history files.

## What changes were proposed in this pull request?

Remove accumulator updates for internal.metrics.updatedBlockStatuses from SparkListenerTaskEnd entries in the history file. These can cause history files to grow to hundreds of GB because the value of the accumulator contains all tracked blocks.

## How was this patch tested?

Current History UI tests cover use of the history file.

Author: Ryan Blue <blue@apache.org>

Closes #17412 from rdblue/SPARK-20084-remove-block-accumulator-info.

(cherry picked from commit c4c03ee)
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
  • Loading branch information
rdblue authored and Marcelo Vanzin committed Mar 31, 2017
1 parent 6a1b2eb commit e3cec18
Showing 1 changed file with 11 additions and 4 deletions.
15 changes: 11 additions & 4 deletions core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
Original file line number Diff line number Diff line change
Expand Up @@ -264,8 +264,7 @@ private[spark] object JsonProtocol {
("Submission Time" -> submissionTime) ~
("Completion Time" -> completionTime) ~
("Failure Reason" -> failureReason) ~
("Accumulables" -> JArray(
stageInfo.accumulables.values.map(accumulableInfoToJson).toList))
("Accumulables" -> accumulablesToJson(stageInfo.accumulables.values))
}

def taskInfoToJson(taskInfo: TaskInfo): JValue = {
Expand All @@ -281,7 +280,15 @@ private[spark] object JsonProtocol {
("Finish Time" -> taskInfo.finishTime) ~
("Failed" -> taskInfo.failed) ~
("Killed" -> taskInfo.killed) ~
("Accumulables" -> JArray(taskInfo.accumulables.toList.map(accumulableInfoToJson)))
("Accumulables" -> accumulablesToJson(taskInfo.accumulables))
}

private lazy val accumulableBlacklist = Set("internal.metrics.updatedBlockStatuses")

def accumulablesToJson(accumulables: Traversable[AccumulableInfo]): JArray = {
JArray(accumulables
.filterNot(_.name.exists(accumulableBlacklist.contains))
.toList.map(accumulableInfoToJson))
}

def accumulableInfoToJson(accumulableInfo: AccumulableInfo): JValue = {
Expand Down Expand Up @@ -376,7 +383,7 @@ private[spark] object JsonProtocol {
("Message" -> fetchFailed.message)
case exceptionFailure: ExceptionFailure =>
val stackTrace = stackTraceToJson(exceptionFailure.stackTrace)
val accumUpdates = JArray(exceptionFailure.accumUpdates.map(accumulableInfoToJson).toList)
val accumUpdates = accumulablesToJson(exceptionFailure.accumUpdates)
("Class Name" -> exceptionFailure.className) ~
("Description" -> exceptionFailure.description) ~
("Stack Trace" -> stackTrace) ~
Expand Down

0 comments on commit e3cec18

Please sign in to comment.