From 5af1f8a4abc4ee48e862c052a406407f184e7161 Mon Sep 17 00:00:00 2001 From: Parag Chaudhari Date: Mon, 14 Mar 2016 10:54:17 -0700 Subject: [PATCH] Added functionality to optionally configure a backup path for the Spark event log. --- .../scheduler/EventLoggingListener.scala | 81 ++++++++- .../scheduler/EventLoggingListenerSuite.scala | 164 +++++++++++++++--- docs/configuration.md | 17 ++ docs/monitoring.md | 3 +- 4 files changed, 230 insertions(+), 35 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala index 2d76d08af6cdd..4d7f1a4fbc78f 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala @@ -61,9 +61,24 @@ private[spark] class EventLoggingListener( private val shouldCompress = sparkConf.getBoolean("spark.eventLog.compress", false) private val shouldOverwrite = sparkConf.getBoolean("spark.eventLog.overwrite", false) + private val shouldBackup = sparkConf.getBoolean("spark.eventLog.backup.enabled", false) private val testing = sparkConf.getBoolean("spark.eventLog.testing", false) private val outputBufferSize = sparkConf.getInt("spark.eventLog.buffer.kb", 100) * 1024 private val fileSystem = Utils.getHadoopFileSystem(logBaseDir, hadoopConf) + private val logBackupBaseDir: Option[URI] = + if (shouldBackup) { + val unresolvedDir = sparkConf.get("spark.eventLog.backup.dir", + EventLoggingListener.DEFAULT_LOG_BACKUP_DIR).stripSuffix("/") + Some(Utils.resolveURI(unresolvedDir)) + } else { + None + } + private val backupFileSystem: Option[FileSystem] = + if (shouldBackup) { + Some(Utils.getHadoopFileSystem(logBackupBaseDir.get, hadoopConf)) + } else { + None + } private val compressionCodec = if (shouldCompress) { Some(CompressionCodec.createCodec(sparkConf)) @@ -75,9 +90,9 @@ private[spark] class EventLoggingListener( } // Only defined if the file system scheme is not local - private var hadoopDataStream: Option[FSDataOutputStream] = None + private var hadoopDataStreams: Option[ArrayBuffer[FSDataOutputStream]] = None - private var writer: Option[PrintWriter] = None + private var writers: Option[ArrayBuffer[PrintWriter]] = None // For testing. Keep track of all JSON serialized events that have been logged. private[scheduler] val loggedEvents = new ArrayBuffer[JValue] @@ -85,10 +100,25 @@ private[spark] class EventLoggingListener( // Visible for tests only. private[scheduler] val logPath = getLogPath(logBaseDir, appId, appAttemptId, compressionCodecName) + // Visible for tests only. + private[scheduler] val backupLogPath: Option[String] = + if (shouldBackup) { + Some(getLogPath(logBackupBaseDir.get, appId, appAttemptId, compressionCodecName)) + } else { + None + } + /** * Creates the log file in the configured log directory. 
 */
  def start() {
+    createLogFile(fileSystem, logBaseDir, logPath)
+    if (shouldBackup) {
+      createLogFile(backupFileSystem.get, logBackupBaseDir.get, backupLogPath.get)
+    }
+  }
+
+  private def createLogFile(fileSystem: FileSystem, logBaseDir: URI, logPath: String): Unit = {
     if (!fileSystem.getFileStatus(new Path(logBaseDir)).isDirectory) {
       throw new IllegalArgumentException(s"Log directory $logBaseDir does not exist.")
     }
@@ -112,8 +142,11 @@ private[spark] class EventLoggingListener(
     if ((isDefaultLocal && uri.getScheme == null) || uri.getScheme == "file") {
       new FileOutputStream(uri.getPath)
     } else {
-      hadoopDataStream = Some(fileSystem.create(path))
-      hadoopDataStream.get
+      if (hadoopDataStreams.isEmpty) {
+        hadoopDataStreams = Some(new ArrayBuffer[FSDataOutputStream]())
+      }
+      hadoopDataStreams.get += fileSystem.create(path)
+      hadoopDataStreams.get.last
     }
 
     try {
@@ -122,7 +155,10 @@ private[spark] class EventLoggingListener(
       EventLoggingListener.initEventLog(bstream)
       fileSystem.setPermission(path, LOG_FILE_PERMISSIONS)
-      writer = Some(new PrintWriter(bstream))
+      if (writers.isEmpty) {
+        writers = Some(new ArrayBuffer[PrintWriter]())
+      }
+      writers.get += new PrintWriter(bstream)
       logInfo("Logging events to %s".format(logPath))
     } catch {
       case e: Exception =>
@@ -135,11 +171,11 @@ private[spark] class EventLoggingListener(
   private def logEvent(event: SparkListenerEvent, flushLogger: Boolean = false) {
     val eventJson = JsonProtocol.sparkEventToJson(event)
     // scalastyle:off println
-    writer.foreach(_.println(compact(render(eventJson))))
+    writers.foreach(_.foreach(_.println(compact(render(eventJson)))))
     // scalastyle:on println
     if (flushLogger) {
-      writer.foreach(_.flush())
-      hadoopDataStream.foreach(_.hflush())
+      writers.foreach(_.foreach(_.flush()))
+      hadoopDataStreams.foreach(_.foreach(_.hflush()))
     }
     if (testing) {
       loggedEvents += eventJson
@@ -210,7 +246,33 @@ private[spark] class EventLoggingListener(
    * ".inprogress" suffix.
    */
   def stop(): Unit = {
-    writer.foreach(_.close())
+    writers.foreach { writerArray =>
+      var e: Option[Exception] = None
+      writerArray.foreach { writer =>
+        try {
+          writer.close()
+        } catch {
+          case ex: Exception =>
+            if (e.isEmpty) {
+              e = Some(ex)
+            }
+        }
+      }
+      if (e.isDefined) {
+        throw e.get
+      }
+    }
+
+    try {
+      completeLogFile(fileSystem, logPath)
+    } finally {
+      if (shouldBackup) {
+        completeLogFile(backupFileSystem.get, backupLogPath.get)
+      }
+    }
+  }
+
+  private def completeLogFile(fileSystem: FileSystem, logPath: String): Unit = {
     val target = new Path(logPath)
     if (fileSystem.exists(target)) {
@@ -239,6 +301,7 @@ private[spark] object EventLoggingListener extends Logging {
   // Suffix applied to the names of files still being written by applications.
   val IN_PROGRESS = ".inprogress"
   val DEFAULT_LOG_DIR = "/tmp/spark-events"
+  val DEFAULT_LOG_BACKUP_DIR = "/tmp/spark-events-backup"
 
   private val LOG_FILE_PERMISSIONS = new FsPermission(Integer.parseInt("770", 8).toShort)
 
diff --git a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
index 43da6fc5b5474..a7c6a4e44fb8d 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
@@ -47,40 +47,48 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit
     SparkHadoopUtil.get.newConfiguration(new SparkConf()))
   private var testDir: File = _
   private var testDirPath: Path = _
+  private var backupTestDir: File = _
+  private var backupTestDirPath: Path = _
 
   before {
     testDir = Utils.createTempDir()
+    backupTestDir = Utils.createTempDir(namePrefix = "backup")
     testDir.deleteOnExit()
+    backupTestDir.deleteOnExit()
     testDirPath = new Path(testDir.getAbsolutePath())
+    backupTestDirPath = new Path(backupTestDir.getAbsolutePath())
   }
 
   after {
     Utils.deleteRecursively(testDir)
+    Utils.deleteRecursively(backupTestDir)
   }
 
   test("Verify log file exist") {
-    // Verify logging directory exists
-    val conf = getLoggingConf(testDirPath)
-    val eventLogger = new EventLoggingListener("test", None, testDirPath.toUri(), conf)
-    eventLogger.start()
-
-    val logPath = new Path(eventLogger.logPath + EventLoggingListener.IN_PROGRESS)
-    assert(fileSystem.exists(logPath))
-    val logStatus = fileSystem.getFileStatus(logPath)
-    assert(!logStatus.isDirectory)
+    testLogFile()
+  }
 
-    // Verify log is renamed after stop()
-    eventLogger.stop()
-    assert(!fileSystem.getFileStatus(new Path(eventLogger.logPath)).isDirectory)
+  test("Verify backup log file exist") {
+    testLogFile(true)
   }
 
   test("Basic event logging") {
     testEventLogging()
   }
 
+  test("Basic event logging with backup") {
+    testEventLogging(true)
+  }
+
   test("Basic event logging with compression") {
     CompressionCodec.ALL_COMPRESSION_CODECS.foreach { codec =>
-      testEventLogging(compressionCodec = Some(CompressionCodec.getShortName(codec)))
+      testEventLogging(false, compressionCodec = Some(CompressionCodec.getShortName(codec)))
+    }
+  }
+
+  test("Basic event logging with backup and compression") {
+    CompressionCodec.ALL_COMPRESSION_CODECS.foreach { codec =>
+      testEventLogging(true, compressionCodec = Some(CompressionCodec.getShortName(codec)))
     }
   }
 
@@ -88,9 +96,21 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit
     testApplicationEventLogging()
   }
 
+  test("End-to-end event logging with backup") {
+    testApplicationEventLogging(true)
+  }
+
   test("End-to-end event logging with compression") {
     CompressionCodec.ALL_COMPRESSION_CODECS.foreach { codec =>
-      testApplicationEventLogging(compressionCodec = Some(CompressionCodec.getShortName(codec)))
+      testApplicationEventLogging(false,
+        compressionCodec = Some(CompressionCodec.getShortName(codec)))
+    }
+  }
+
+  test("End-to-end event logging with backup and compression") {
+    CompressionCodec.ALL_COMPRESSION_CODECS.foreach { codec =>
+      testApplicationEventLogging(true,
+        compressionCodec = Some(CompressionCodec.getShortName(codec)))
     }
   }
 
@@ -128,6 +148,51 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit
 
   import EventLoggingListenerSuite._
 
+  /**
+   * Verifies that the log file (and the backup log file, if enabled) exists and
+   * that the log file is renamed after stop().
+   */
+  private def testLogFile(testBackup: Boolean = false): Unit = {
+    // Verify logging directory exists
+    val conf = getLoggingConf(testDirPath)
+    if (testBackup) {
+      enableLoggingBackup(conf)
+    }
+    val eventLogger = new EventLoggingListener("test", None, testDirPath.toUri(), conf)
+    eventLogger.start()
+
+    val logPath = new Path(eventLogger.logPath + EventLoggingListener.IN_PROGRESS)
+    assert(fileSystem.exists(logPath))
+    val logStatus = fileSystem.getFileStatus(logPath)
+    assert(!logStatus.isDirectory)
+
+    if (testBackup) {
+      // Verify backup log file is created
+      val backupLogPath = new Path(eventLogger.backupLogPath.get + EventLoggingListener.IN_PROGRESS)
+      assert(fileSystem.exists(backupLogPath))
+      val backupLogStatus = fileSystem.getFileStatus(backupLogPath)
+      assert(!backupLogStatus.isDirectory)
+    } else {
+      // Verify backup log file is *not* created
+      val unexpectedBackupLogPath = EventLoggingListener.getLogPath(
+        backupTestDirPath.toUri(), "test", None, None)
+      assert(!fileSystem.exists(
+        new Path(unexpectedBackupLogPath + EventLoggingListener.IN_PROGRESS)))
+    }
+
+    // Verify log is renamed after stop()
+    eventLogger.stop()
+    assert(!fileSystem.getFileStatus(new Path(eventLogger.logPath)).isDirectory)
+    if (testBackup) {
+      assert(!fileSystem.getFileStatus(new Path(eventLogger.backupLogPath.get)).isDirectory)
+    } else {
+      // Verify backup log file still does not exist after stop()
+      val unexpectedBackupLogPath = EventLoggingListener.getLogPath(
+        backupTestDirPath.toUri(), "test", None, None)
+      assert(!fileSystem.exists(new Path(unexpectedBackupLogPath)))
+    }
+  }
+
   /**
    * Test basic event logging functionality.
    *
@@ -135,10 +200,14 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit
    * exactly these two events are logged in the expected file.
    */
   private def testEventLogging(
+      testBackup: Boolean = false,
       compressionCodec: Option[String] = None,
       extraConf: Map[String, String] = Map()) {
     val conf = getLoggingConf(testDirPath, compressionCodec)
     extraConf.foreach { case (k, v) => conf.set(k, v) }
+    if (testBackup) {
+      enableLoggingBackup(conf)
+    }
     val logName = compressionCodec.map("test-" + _).getOrElse("test")
     val eventLogger = new EventLoggingListener(logName, None, testDirPath.toUri(), conf)
     val listenerBus = new LiveListenerBus
@@ -157,29 +226,52 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit
     // Verify file contains exactly the two events logged
     val logData = EventLoggingListener.openEventLog(new Path(eventLogger.logPath), fileSystem)
     try {
-      val lines = readLines(logData)
-      val logStart = SparkListenerLogStart(SPARK_VERSION)
-      assert(lines.size === 3)
-      assert(lines(0).contains("SparkListenerLogStart"))
-      assert(lines(1).contains("SparkListenerApplicationStart"))
-      assert(lines(2).contains("SparkListenerApplicationEnd"))
-      assert(JsonProtocol.sparkEventFromJson(parse(lines(0))) === logStart)
-      assert(JsonProtocol.sparkEventFromJson(parse(lines(1))) === applicationStart)
-      assert(JsonProtocol.sparkEventFromJson(parse(lines(2))) === applicationEnd)
+      verifyStartAndEndEvents(logData, applicationStart, applicationEnd)
     } finally {
       logData.close()
     }
+
+    if (testBackup) {
+      // Verify backup file contains exactly the two events logged
+      val backupLogData = EventLoggingListener.openEventLog(
+        new Path(eventLogger.backupLogPath.get), fileSystem)
+      try {
+        verifyStartAndEndEvents(backupLogData, applicationStart, applicationEnd)
+      } finally {
+        backupLogData.close()
+      }
+    }
+  }
+
+  private def verifyStartAndEndEvents(
+      logData: InputStream,
+      applicationStart: SparkListenerApplicationStart,
+      applicationEnd: SparkListenerApplicationEnd): Unit = {
+    val lines = readLines(logData)
+    val logStart = SparkListenerLogStart(SPARK_VERSION)
+    assert(lines.size === 3)
+    assert(lines(0).contains("SparkListenerLogStart"))
+    assert(lines(1).contains("SparkListenerApplicationStart"))
+    assert(lines(2).contains("SparkListenerApplicationEnd"))
+    assert(JsonProtocol.sparkEventFromJson(parse(lines(0))) === logStart)
+    assert(JsonProtocol.sparkEventFromJson(parse(lines(1))) === applicationStart)
+    assert(JsonProtocol.sparkEventFromJson(parse(lines(2))) === applicationEnd)
   }
 
   /**
    * Test end-to-end event logging functionality in an application.
    * This runs a simple Spark job and asserts that the expected events are logged when expected.
    */
-  private def testApplicationEventLogging(compressionCodec: Option[String] = None) {
+  private def testApplicationEventLogging(
+      testBackup: Boolean = false,
+      compressionCodec: Option[String] = None) {
     // Set defaultFS to something that would cause an exception, to make sure we don't run
     // into SPARK-6688.
     val conf = getLoggingConf(testDirPath, compressionCodec)
       .set("spark.hadoop.fs.defaultFS", "unsupported://example.com")
+    if (testBackup) {
+      enableLoggingBackup(conf)
+    }
     val sc = new SparkContext("local-cluster[2,2,1024]", "test", conf)
     assert(sc.eventLogger.isDefined)
     val eventLogger = sc.eventLogger.get
@@ -187,6 +279,12 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit
     val expectedLogDir = testDir.toURI()
     assert(eventLogPath === EventLoggingListener.getLogPath(
       expectedLogDir, sc.applicationId, None, compressionCodec.map(CompressionCodec.getShortName)))
+    if (testBackup) {
+      val eventLogBackupPath = eventLogger.backupLogPath.get
+      val expectedBackupLogDir = backupTestDir.toURI
+      assert(eventLogBackupPath === EventLoggingListener.getLogPath(expectedBackupLogDir,
+        sc.applicationId, None, compressionCodec.map(CompressionCodec.getShortName)))
+    }
 
     // Begin listening for events that trigger asserts
     val eventExistenceListener = new EventExistenceListener(eventLogger)
@@ -201,8 +299,19 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit
     // Make sure expected events exist in the log file.
     val logData = EventLoggingListener.openEventLog(new Path(eventLogger.logPath), fileSystem)
-    val logStart = SparkListenerLogStart(SPARK_VERSION)
+    verifyExpectedEvents(logData)
+
+    if (testBackup) {
+      // Make sure expected events exist in the backup log file.
+      val backupLogData = EventLoggingListener.openEventLog(
+        new Path(eventLogger.backupLogPath.get), fileSystem)
+      verifyExpectedEvents(backupLogData)
+    }
+  }
+
+  private def verifyExpectedEvents(logData: InputStream): Unit = {
     val lines = readLines(logData)
+    val logStart = SparkListenerLogStart(SPARK_VERSION)
     val eventSet = mutable.Set(
       SparkListenerApplicationStart,
       SparkListenerBlockManagerAdded,
@@ -234,6 +343,11 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit
     Source.fromInputStream(in).getLines().toSeq
   }
 
+  private def enableLoggingBackup(conf: SparkConf): Unit = {
+    conf.set("spark.eventLog.backup.enabled", "true")
+    conf.set("spark.eventLog.backup.dir", backupTestDir.getAbsolutePath)
+  }
+
   /**
    * A listener that asserts certain events are logged by the given EventLoggingListener.
    * This is necessary because events are posted asynchronously in a different thread.
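For reviewers, here is a small self-contained sketch (plain Scala, not the Spark code in this patch; all names and the in-memory destinations are illustrative) of the fan-out pattern the listener now follows: one `PrintWriter` per destination, with every event and every flush applied to all of them, so the primary and backup logs stay identical.

```scala
import java.io.{PrintWriter, StringWriter}
import scala.collection.mutable.ArrayBuffer

object DualWriteSketch {
  def main(args: Array[String]): Unit = {
    // Two in-memory destinations stand in for the primary and backup event log files.
    val primary = new StringWriter()
    val backup = new StringWriter()
    val writers = ArrayBuffer(new PrintWriter(primary), new PrintWriter(backup))

    // Analogue of logEvent in the patched listener: write to every destination,
    // and optionally flush all of them (the real code also calls hflush() on HDFS streams).
    def logEvent(eventJson: String, flushLogger: Boolean = false): Unit = {
      writers.foreach(_.println(eventJson))
      if (flushLogger) {
        writers.foreach(_.flush())
      }
    }

    logEvent("""{"Event":"SparkListenerApplicationStart"}""", flushLogger = true)
    logEvent("""{"Event":"SparkListenerApplicationEnd"}""", flushLogger = true)

    // Both destinations received exactly the same events.
    assert(primary.toString == backup.toString)
  }
}
```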
diff --git a/docs/configuration.md b/docs/configuration.md
index 937852ffdecda..e81d9840ca3fd 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -501,6 +501,23 @@ Apart from these, the following properties are also available, and may be useful
 #### Spark UI
 
 <table class="table">
 <tr><th>Property Name</th><th>Default</th><th>Meaning</th></tr>
+<tr>
+  <td><code>spark.eventLog.backup.dir</code></td>
+  <td>file:///tmp/spark-events-backup</td>
+  <td>
+    Base directory in which Spark events are copied in real time, if
+    <code>spark.eventLog.backup.enabled</code> is true. In cloud environments, users may want to
+    set this to a cloud storage location (for example, an S3 path) so that event logs remain
+    available for issue investigation and performance analysis after the cluster is terminated.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.eventLog.backup.enabled</code></td>
+  <td>false</td>
+  <td>
+    Whether to back up Spark events, if <code>spark.eventLog.enabled</code> is true.
+  </td>
+</tr>
 <tr>
   <td><code>spark.eventLog.compress</code></td>
   <td>false</td>
diff --git a/docs/monitoring.md b/docs/monitoring.md
index c139e1cb5ac5f..3b255cfe7f111 100644
--- a/docs/monitoring.md
+++ b/docs/monitoring.md
@@ -23,7 +23,8 @@ beginning with 4040 (4041, 4042, etc).
 Note that this information is only available for the duration of the application by default.
 To view the web UI after the fact, set `spark.eventLog.enabled` to true before starting the
 application. This configures Spark to log Spark events that encode the information displayed
-in the UI to persisted storage.
+in the UI to persisted storage. You can also set `spark.eventLog.backup.enabled` to true to
+copy Spark events to an additional backup location in real time.
 
 ## Viewing After the Fact
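For anyone trying the change out, a minimal sketch of how an application could opt in to the backup event log once this patch is applied. The application name, the HDFS path, and the S3 bucket below are hypothetical placeholders, not values mandated by the patch.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object EventLogBackupExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("event-log-backup-example")
      // Standard event logging (unchanged behavior).
      .set("spark.eventLog.enabled", "true")
      .set("spark.eventLog.dir", "hdfs:///spark-events") // placeholder path
      // New settings introduced by this patch.
      .set("spark.eventLog.backup.enabled", "true")
      .set("spark.eventLog.backup.dir", "s3a://my-bucket/spark-events-backup") // placeholder bucket

    val sc = new SparkContext(conf)
    sc.parallelize(1 to 100).count() // any job; with the patch applied, its events go to both locations
    sc.stop()
  }
}
```

The same settings can of course be supplied through `spark-defaults.conf` or `--conf` on `spark-submit` instead of in application code.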