Skip to content

Commit

Permalink
[SPARK-22763][CORE] SHS: Ignore unknown events and parse through the file
Browse files Browse the repository at this point in the history

## What changes were proposed in this pull request?

As Spark evolves, new event types are added to the event log: #19649
Previously, we maintained a whitelist of removed classes to avoid exceptions: #15663
Currently, the Spark history server stops parsing when it hits an unknown event or an unrecognized property, so only part of the UI data may be shown.
For better compatibility, we can ignore unknown events and parse through the log file.

## How was this patch tested?
Unit test

Author: Wang Gengliang <ltnwgl@gmail.com>

Closes #19953 from gengliangwang/ReplayListenerBus.
  • Loading branch information
gengliangwang authored and gatorsmile committed Dec 13, 2017
1 parent c5a4701 commit 1abcbed
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 19 deletions.
Expand Up @@ -69,6 +69,8 @@ private[spark] class ReplayListenerBus extends SparkListenerBus with Logging {
eventsFilter: ReplayEventsFilter): Unit = {
var currentLine: String = null
var lineNumber: Int = 0
val unrecognizedEvents = new scala.collection.mutable.HashSet[String]
val unrecognizedProperties = new scala.collection.mutable.HashSet[String]

try {
val lineEntries = lines
Expand All @@ -84,16 +86,22 @@ private[spark] class ReplayListenerBus extends SparkListenerBus with Logging {

postToAll(JsonProtocol.sparkEventFromJson(parse(currentLine)))
} catch {
case e: ClassNotFoundException if KNOWN_REMOVED_CLASSES.contains(e.getMessage) =>
// Ignore events generated by Structured Streaming in Spark 2.0.0 and 2.0.1.
// It's safe since no place uses them.
logWarning(s"Dropped incompatible Structured Streaming log: $currentLine")
case e: UnrecognizedPropertyException if e.getMessage != null && e.getMessage.startsWith(
"Unrecognized field \"queryStatus\" " +
"(class org.apache.spark.sql.streaming.StreamingQueryListener$") =>
// Ignore events generated by Structured Streaming in Spark 2.0.2
// It's safe since no place uses them.
logWarning(s"Dropped incompatible Structured Streaming log: $currentLine")
case e: ClassNotFoundException =>
// Ignore unknown events, parse through the event log file.
// To avoid spamming, warnings are only displayed once for each unknown event.
if (!unrecognizedEvents.contains(e.getMessage)) {
logWarning(s"Drop unrecognized event: ${e.getMessage}")
unrecognizedEvents.add(e.getMessage)
}
logDebug(s"Drop incompatible event log: $currentLine")
case e: UnrecognizedPropertyException =>
// Ignore unrecognized properties, parse through the event log file.
// To avoid spamming, warnings are only displayed once for each unrecognized property.
if (!unrecognizedProperties.contains(e.getMessage)) {
logWarning(s"Drop unrecognized property: ${e.getMessage}")
unrecognizedProperties.add(e.getMessage)
}
logDebug(s"Drop incompatible event log: $currentLine")
case jpe: JsonParseException =>
// We can only ignore exception from last line of the file that might be truncated
// the last entry may not be the very last line in the event log, but we treat it
Expand Down Expand Up @@ -125,13 +133,4 @@ private[spark] object ReplayListenerBus {

// utility filter that selects all event logs during replay
val SELECT_ALL_FILTER: ReplayEventsFilter = { (eventString: String) => true }

/**
* Classes that were removed. Structured Streaming doesn't use them any more. However, parsing
* old json may fail and we can just ignore these failures.
*/
val KNOWN_REMOVED_CLASSES = Set(
"org.apache.spark.sql.streaming.StreamingQueryListener$QueryProgress",
"org.apache.spark.sql.streaming.StreamingQueryListener$QueryTerminated"
)
}
Expand Up @@ -128,6 +128,35 @@ class ReplayListenerSuite extends SparkFunSuite with BeforeAndAfter with LocalSp
}
}

test("Replay incompatible event log") {
  // Build the two well-formed events that bracket the unrecognized one.
  val applicationStart = SparkListenerApplicationStart("Incompatible App", None,
    125L, "UserUsingIncompatibleVersion", None)
  val applicationEnd = SparkListenerApplicationEnd(1000L)

  // Write an event log containing an event type that JsonProtocol does not
  // recognize, sandwiched between two valid events.
  val logFilePath = Utils.getFilePath(testDir, "incompatible.txt")
  val outputStream = fileSystem.create(logFilePath)
  val printWriter = new PrintWriter(outputStream)
  // scalastyle:off println
  printWriter.println(compact(render(JsonProtocol.sparkEventToJson(applicationStart))))
  printWriter.println("""{"Event":"UnrecognizedEventOnlyForTest","Timestamp":1477593059313}""")
  printWriter.println(compact(render(JsonProtocol.sparkEventToJson(applicationEnd))))
  // scalastyle:on println
  printWriter.close()

  // Replay the log; the unknown event should be dropped rather than aborting
  // the whole replay.
  val conf = EventLoggingListenerSuite.getLoggingConf(logFilePath)
  val logData = fileSystem.open(logFilePath)
  val eventMonster = new EventMonster(conf)
  try {
    val replayer = new ReplayListenerBus()
    replayer.addListener(eventMonster)
    replayer.replay(logData, logFilePath.toString)
  } finally {
    logData.close()
  }

  // Only the two recognized events should have been posted, in order.
  val replayed = eventMonster.loggedEvents
  assert(replayed.size === 2)
  assert(replayed(0) === JsonProtocol.sparkEventToJson(applicationStart))
  assert(replayed(1) === JsonProtocol.sparkEventToJson(applicationEnd))
}

// This assumes the correctness of EventLoggingListener
test("End-to-end replay") {
testApplicationReplay()
Expand Down

0 comments on commit 1abcbed

Please sign in to comment.