[SPARK-33146][CORE] Check for non-fatal errors when loading new applications in SHS

### What changes were proposed in this pull request?

Adds an additional check for non-fatal errors when attempting to add a new entry to the history server application listing.

### Why are the changes needed?

A bad rolling event log folder (one with a missing appstatus file, or with no log files at all) would cause the Spark history server to load no applications whatsoever. Figuring out why invalid event log folders are created in the first place will be addressed in separate issues; this change simply lets the history server skip the invalid folder and successfully load all the valid applications.
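The fix follows the usual Scala pattern of guarding a per-entry operation with the `scala.util.control.NonFatal` extractor, so that one corrupt entry is logged and skipped while truly fatal errors (e.g. `OutOfMemoryError`) still propagate. A minimal sketch of that shape, with a hypothetical `sizeOf` callback standing in for the reader lookup used in the real code:

```scala
import java.io.FileNotFoundException
import scala.util.control.NonFatal

// Sketch only, not the actual Spark code: decide whether a log entry is
// usable, treating missing or unreadable entries as "skip" rather than
// letting the exception abort the whole listing scan.
def isValidEntry(path: String, sizeOf: String => Long): Boolean = {
  try {
    sizeOf(path) > 0
  } catch {
    case _: FileNotFoundException =>
      false // entry vanished between listing and reading; skip quietly
    case NonFatal(e) =>
      // corrupt or incomplete entry; log and skip, keep scanning the rest
      println(s"Error while reading new log $path: $e")
      false
    // fatal JVM errors are deliberately not caught and still propagate
  }
}
```

`NonFatal` matches ordinary exceptions but not `VirtualMachineError`, `InterruptedException`, and similar control-flow or fatal throwables, which is why it is preferred over a bare `case e: Throwable` here.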

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

A new unit test in `FsHistoryProviderSuite` (included in this commit).

Closes #30037 from Kimahriman/bug/rolling-log-crashing-history.

Authored-by: Adam Binford <adam.binford@radiantsolutions.com>
Signed-off-by: Jungtaek Lim (HeartSaVioR) <kabhwan.opensource@gmail.com>
Adam Binford authored and HeartSaVioR committed Oct 15, 2020
1 parent f3ad32f commit 9ab0ec4
Showing 2 changed files with 52 additions and 0 deletions.
@@ -538,6 +538,9 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
reader.fileSizeForLastIndex > 0
} catch {
case _: FileNotFoundException => false
case NonFatal(e) =>
logWarning(s"Error while reading new log ${reader.rootPath}", e)
false
}

case NonFatal(e) =>
@@ -1475,6 +1475,55 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging {
}
}

test("SPARK-33146: don't let one bad rolling log folder prevent loading other applications") {
withTempDir { dir =>
val conf = createTestConf(true)
conf.set(HISTORY_LOG_DIR, dir.getAbsolutePath)
val hadoopConf = SparkHadoopUtil.newConfiguration(conf)
val fs = new Path(dir.getAbsolutePath).getFileSystem(hadoopConf)

val provider = new FsHistoryProvider(conf)

val writer = new RollingEventLogFilesWriter("app", None, dir.toURI, conf, hadoopConf)
writer.start()

writeEventsToRollingWriter(writer, Seq(
SparkListenerApplicationStart("app", Some("app"), 0, "user", None),
SparkListenerJobStart(1, 0, Seq.empty)), rollFile = false)
provider.checkForLogs()
provider.cleanLogs()
assert(dir.listFiles().size === 1)
assert(provider.getListing.length === 1)

// Manually delete the appstatus file to make an invalid rolling event log
val appStatusPath = RollingEventLogFilesWriter.getAppStatusFilePath(new Path(writer.logPath),
"app", None, true)
fs.delete(appStatusPath, false)
provider.checkForLogs()
provider.cleanLogs()
assert(provider.getListing.length === 0)

// Create a new application
val writer2 = new RollingEventLogFilesWriter("app2", None, dir.toURI, conf, hadoopConf)
writer2.start()
writeEventsToRollingWriter(writer2, Seq(
SparkListenerApplicationStart("app2", Some("app2"), 0, "user", None),
SparkListenerJobStart(1, 0, Seq.empty)), rollFile = false)

// Both folders exist but only one application found
provider.checkForLogs()
provider.cleanLogs()
assert(provider.getListing.length === 1)
assert(dir.listFiles().size === 2)

// Make sure a new provider sees the valid application
provider.stop()
val newProvider = new FsHistoryProvider(conf)
newProvider.checkForLogs()
assert(newProvider.getListing.length === 1)
}
}

/**
* Asks the provider to check for logs and calls a function to perform checks on the updated
* app list. Example:
