diff --git a/core/src/main/scala/org/apache/spark/util/logging/FileAppender.scala b/core/src/main/scala/org/apache/spark/util/logging/FileAppender.scala
index 2f9ad4c8cc3e1..892cff483d0ea 100644
--- a/core/src/main/scala/org/apache/spark/util/logging/FileAppender.scala
+++ b/core/src/main/scala/org/apache/spark/util/logging/FileAppender.scala
@@ -26,10 +26,12 @@ import org.apache.spark.util.{IntParam, Utils}
 /**
  * Continuously appends the data from an input stream into the given file.
  */
-private[spark] class FileAppender(inputStream: InputStream, file: File, bufferSize: Int = 8192)
+private[spark] class FileAppender(inputStream: InputStream, file: File, conf: SparkConf,
+    bufferSize: Int = 8192)
   extends Logging {
   @volatile private var outputStream: FileOutputStream = null
   @volatile private var markedForStop = false     // has the appender been asked to stopped
+  val fileOutEnabled: Boolean = conf.getBoolean("spark.executorLog.enabled", true)
 
   // Thread that reads the input stream and writes to file
   private val writingThread = new Thread("File appending thread for " + file) {
@@ -71,7 +73,7 @@ private[spark] class FileAppender(inputStream: InputStream, file: File, bufferSi
             // asynchronously, so once appender has been flagged to stop these will be ignored
             case _: IOException if markedForStop => // do nothing and proceed to stop appending
           }
-          if (n > 0) {
+          if (n > 0 && fileOutEnabled) {
             appendToFile(buf, n)
           }
         }
@@ -145,7 +147,7 @@ private[spark] object FileAppender extends Logging {
           new RollingFileAppender(
             inputStream, file, new TimeBasedRollingPolicy(interval, pattern), conf)
       }.getOrElse {
-        new FileAppender(inputStream, file)
+        new FileAppender(inputStream, file, conf)
       }
     }
 
@@ -157,13 +159,13 @@ private[spark] object FileAppender extends Logging {
         case _ =>
           logWarning(
             s"Illegal size [$rollingSizeBytes] for rolling executor logs, rolling logs not enabled")
-          new FileAppender(inputStream, file)
+          new FileAppender(inputStream, file, conf)
       }
     }
 
     rollingStrategy match {
       case "" =>
-        new FileAppender(inputStream, file)
+        new FileAppender(inputStream, file, conf)
       case "time" =>
         createTimeBasedAppender()
       case "size" =>
@@ -172,7 +174,7 @@ private[spark] object FileAppender extends Logging {
         logWarning(
           s"Illegal strategy [$rollingStrategy] for rolling executor logs, " +
             s"rolling logs not enabled")
-        new FileAppender(inputStream, file)
+        new FileAppender(inputStream, file, conf)
     }
   }
 }
diff --git a/core/src/main/scala/org/apache/spark/util/logging/RollingFileAppender.scala b/core/src/main/scala/org/apache/spark/util/logging/RollingFileAppender.scala
index 5d8cec8447b53..bfb03e4ea4485 100644
--- a/core/src/main/scala/org/apache/spark/util/logging/RollingFileAppender.scala
+++ b/core/src/main/scala/org/apache/spark/util/logging/RollingFileAppender.scala
@@ -42,7 +42,7 @@ private[spark] class RollingFileAppender(
     val rollingPolicy: RollingPolicy,
     conf: SparkConf,
     bufferSize: Int = RollingFileAppender.DEFAULT_BUFFER_SIZE
-  ) extends FileAppender(inputStream, activeFile, bufferSize) {
+  ) extends FileAppender(inputStream, activeFile, conf, bufferSize) {
 
   import RollingFileAppender._
 
diff --git a/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala b/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala
index cd0ed5b036bf9..17a8e7db3e5ac 100644
--- a/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala
@@ -55,12 +55,23 @@ class FileAppenderSuite extends SparkFunSuite with BeforeAndAfter with Logging {
     // The `header` should not be covered
     val header = "Add header"
     Files.write(header, testFile, StandardCharsets.UTF_8)
-    val appender = new FileAppender(inputStream, testFile)
+    val appender = new FileAppender(inputStream, testFile, new SparkConf())
     inputStream.close()
     appender.awaitTermination()
     assert(Files.toString(testFile, StandardCharsets.UTF_8) === header + testString)
   }
 
+  test("basic file appender disable") {
+    val testString = (1 to 1000).mkString(", ")
+    val inputStream = new ByteArrayInputStream(testString.getBytes(StandardCharsets.UTF_8))
+    val sparkConf = new SparkConf()
+    sparkConf.set("spark.executorLog.enabled", "false")
+    val appender = new FileAppender(inputStream, testFile, sparkConf)
+    inputStream.close()
+    appender.awaitTermination()
+    assert(Files.toString(testFile, StandardCharsets.UTF_8) === "")
+  }
+
   test("rolling file appender - time-based rolling") {
     // setup input stream and appender
     val testOutputStream = new PipedOutputStream()
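Note: the flag added by this patch is read through the standard SparkConf API, so with the patch applied, turning off executor log capture is a one-line configuration change. A minimal usage sketch, assuming only the setting introduced in the diff above:

    import org.apache.spark.SparkConf

    // Sketch only: spark.executorLog.enabled is introduced by this patch and is
    // not an upstream Spark setting. When set to "false", FileAppender still
    // drains the input stream (so the child process never blocks on a full
    // pipe) but skips appendToFile, leaving the log file empty.
    val conf = new SparkConf()
      .set("spark.executorLog.enabled", "false")

The check is placed on the write (`n > 0 && fileOutEnabled`) rather than on the read loop for exactly that reason: the appender must keep consuming the stream even when output is disabled.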