Skip to content

Commit

Permalink
[SPARK-43340][CORE] Handle missing stack-trace field in eventlogs
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?

This PR fixes a regression introduced by #36885 which broke JsonProtocol's ability to handle missing fields from exception field. old eventlogs missing a `Stack Trace` will raise a NPE.
As a result, SHS misinterprets  failed-jobs/SQLs as `Active/Incomplete`

This PR solves this problem by checking the JsonNode for null. If it is null, an empty array of `StackTraceElements`

### Why are the changes needed?

Fix a case which prevents the history server from identifying failed jobs if the stacktrace was not set.

Example eventlog

```
{
   "Event":"SparkListenerJobEnd",
   "Job ID":31,
   "Completion Time":1616171909785,
   "Job Result":{
      "Result":"JobFailed",
      "Exception":{
         "Message":"Job aborted"
      }
   }
}
```

**Original behavior:**

The job is marked as `incomplete`

Error from the SHS logs:

```
23/05/01 21:57:16 INFO FsHistoryProvider: Parsing file:/tmp/nds_q86_fail_test to re-build UI...
23/05/01 21:57:17 ERROR ReplayListenerBus: Exception parsing Spark event log: file:/tmp/nds_q86_fail_test
java.lang.NullPointerException
    at org.apache.spark.util.JsonProtocol$JsonNodeImplicits.extractElements(JsonProtocol.scala:1589)
    at org.apache.spark.util.JsonProtocol$.stackTraceFromJson(JsonProtocol.scala:1558)
    at org.apache.spark.util.JsonProtocol$.exceptionFromJson(JsonProtocol.scala:1569)
    at org.apache.spark.util.JsonProtocol$.jobResultFromJson(JsonProtocol.scala:1423)
    at org.apache.spark.util.JsonProtocol$.jobEndFromJson(JsonProtocol.scala:967)
    at org.apache.spark.util.JsonProtocol$.sparkEventFromJson(JsonProtocol.scala:878)
    at org.apache.spark.util.JsonProtocol$.sparkEventFromJson(JsonProtocol.scala:865)
....
23/05/01 21:57:17 ERROR ReplayListenerBus: Malformed line #24368: {"Event":"SparkListenerJobEnd","Job ID":31,"Completion Time":1616171909785,"Job Result":{"Result":"JobFailed","Exception":
{"Message":"Job aborted"}
}}
```

**After the fix:**

Job 31 is marked as `failedJob`

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Added new unit test in JsonProtocolSuite.

Closes #41050 from amahussein/aspark-43340-b.

Authored-by: Ahmed Hussein <ahussein@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
  • Loading branch information
amahussein authored and dongjoon-hyun committed May 5, 2023
1 parent 1b54b01 commit dcd710d
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 2 deletions.
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1555,13 +1555,13 @@ private[spark] object JsonProtocol {
}

def stackTraceFromJson(json: JsonNode): Array[StackTraceElement] = {
json.extractElements.map { line =>
jsonOption(json).map(_.extractElements.map { line =>
val declaringClass = line.get("Declaring Class").extractString
val methodName = line.get("Method Name").extractString
val fileName = jsonOption(line.get("File Name")).map(_.extractString).orNull
val lineNumber = line.get("Line Number").extractInt
new StackTraceElement(declaringClass, methodName, fileName, lineNumber)
}.toArray
}.toArray).getOrElse(Array[StackTraceElement]())
}

def exceptionFromJson(json: JsonNode): Exception = {
Expand Down
54 changes: 54 additions & 0 deletions core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -820,6 +820,60 @@ class JsonProtocolSuite extends SparkFunSuite {
ex.setStackTrace(Array(new StackTraceElement("class", "method", null, -1)))
testException(ex)
}

test("SPARK-43340: Handle missing Stack Trace in event log") {
val exNoStackJson =
"""
|{
| "Message": "Job aborted"
|}
|""".stripMargin
val exNoStack = JsonProtocol.exceptionFromJson(exNoStackJson)
assert(exNoStack.getStackTrace.isEmpty)

val exEmptyStackJson =
"""
|{
| "Message": "Job aborted",
| "Stack Trace": []
|}
|""".stripMargin
val exEmptyStack = JsonProtocol.exceptionFromJson(exEmptyStackJson)
assert(exEmptyStack.getStackTrace.isEmpty)

// test entire job failure event is equivalent
val exJobFailureNoStackJson =
"""
|{
| "Event": "SparkListenerJobEnd",
| "Job ID": 31,
| "Completion Time": 1616171909785,
| "Job Result":{
| "Result": "JobFailed",
| "Exception": {
| "Message": "Job aborted"
| }
| }
|}
|""".stripMargin
val exJobFailureExpectedJson =
"""
|{
| "Event": "SparkListenerJobEnd",
| "Job ID": 31,
| "Completion Time": 1616171909785,
| "Job Result": {
| "Result": "JobFailed",
| "Exception": {
| "Message": "Job aborted",
| "Stack Trace": []
| }
| }
|}
|""".stripMargin
val jobFailedEvent = JsonProtocol.sparkEventFromJson(exJobFailureNoStackJson)
testEvent(jobFailedEvent, exJobFailureExpectedJson)
}
}


Expand Down

0 comments on commit dcd710d

Please sign in to comment.