Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-22763][Core]SHS: Ignore unknown events and parse through the file #19953

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ private[spark] class ReplayListenerBus extends SparkListenerBus with Logging {
eventsFilter: ReplayEventsFilter): Unit = {
var currentLine: String = null
var lineNumber: Int = 0
val unrecognizedEvents = new scala.collection.mutable.HashSet[String]
val unrecognizedProperties = new scala.collection.mutable.HashSet[String]

try {
val lineEntries = lines
Expand All @@ -84,16 +86,22 @@ private[spark] class ReplayListenerBus extends SparkListenerBus with Logging {

postToAll(JsonProtocol.sparkEventFromJson(parse(currentLine)))
} catch {
case e: ClassNotFoundException if KNOWN_REMOVED_CLASSES.contains(e.getMessage) =>
// Ignore events generated by Structured Streaming in Spark 2.0.0 and 2.0.1.
// It's safe since no place uses them.
logWarning(s"Dropped incompatible Structured Streaming log: $currentLine")
case e: UnrecognizedPropertyException if e.getMessage != null && e.getMessage.startsWith(
"Unrecognized field \"queryStatus\" " +
"(class org.apache.spark.sql.streaming.StreamingQueryListener$") =>
// Ignore events generated by Structured Streaming in Spark 2.0.2
// It's safe since no place uses them.
logWarning(s"Dropped incompatible Structured Streaming log: $currentLine")
case e: ClassNotFoundException =>
// Ignore unknown events, parse through the event log file.
// To avoid spamming, warnings are only displayed once for each unknown event.
if (!unrecognizedEvents.contains(e.getMessage)) {
logWarning(s"Drop unrecognized event: ${e.getMessage}")
unrecognizedEvents.add(e.getMessage)
}
logDebug(s"Drop incompatible event log: $currentLine")
case e: UnrecognizedPropertyException =>
// Ignore unrecognized properties, parse through the event log file.
// To avoid spamming, warnings are only displayed once for each unrecognized property.
if (!unrecognizedProperties.contains(e.getMessage)) {
logWarning(s"Drop unrecognized property: ${e.getMessage}")
unrecognizedProperties.add(e.getMessage)
}
logDebug(s"Drop incompatible event log: $currentLine")
case jpe: JsonParseException =>
// We can only ignore exception from last line of the file that might be truncated
// the last entry may not be the very last line in the event log, but we treat it
Expand Down Expand Up @@ -125,13 +133,4 @@ private[spark] object ReplayListenerBus {

// utility filter that selects all event logs during replay
val SELECT_ALL_FILTER: ReplayEventsFilter = { (eventString: String) => true }

/**
* Classes that were removed. Structured Streaming doesn't use them any more. However, parsing
* old json may fail and we can just ignore these failures.
*/
val KNOWN_REMOVED_CLASSES = Set(
"org.apache.spark.sql.streaming.StreamingQueryListener$QueryProgress",
"org.apache.spark.sql.streaming.StreamingQueryListener$QueryTerminated"
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,35 @@ class ReplayListenerSuite extends SparkFunSuite with BeforeAndAfter with LocalSp
}
}

test("Replay incompatible event log") {
  // Write an event log containing an event type unknown to JsonProtocol.
  // Replay must drop that line and still deliver the well-formed events
  // around it (SPARK-22763).
  val logPath = Utils.getFilePath(testDir, "incompatible.txt")
  val appStart = SparkListenerApplicationStart("Incompatible App", None,
    125L, "UserUsingIncompatibleVersion", None)
  val appEnd = SparkListenerApplicationEnd(1000L)

  val out = new PrintWriter(fileSystem.create(logPath))
  // scalastyle:off println
  out.println(compact(render(JsonProtocol.sparkEventToJson(appStart))))
  out.println("""{"Event":"UnrecognizedEventOnlyForTest","Timestamp":1477593059313}""")
  out.println(compact(render(JsonProtocol.sparkEventToJson(appEnd))))
  // scalastyle:on println
  out.close()

  val monster = new EventMonster(EventLoggingListenerSuite.getLoggingConf(logPath))
  val stream = fileSystem.open(logPath)
  try {
    val bus = new ReplayListenerBus()
    bus.addListener(monster)
    bus.replay(stream, logPath.toString)
  } finally {
    stream.close()
  }

  // The unrecognized middle event is skipped; only the two known events
  // reach the listener, in their original order.
  assert(monster.loggedEvents.size === 2)
  assert(monster.loggedEvents(0) === JsonProtocol.sparkEventToJson(appStart))
  assert(monster.loggedEvents(1) === JsonProtocol.sparkEventToJson(appEnd))
}

// This assumes the correctness of EventLoggingListener
test("End-to-end replay") {
testApplicationReplay()
Expand Down