FileAppender.scala
@@ -26,10 +26,12 @@ import org.apache.spark.util.{IntParam, Utils}
/**
* Continuously appends the data from an input stream into the given file.
*/
-private[spark] class FileAppender(inputStream: InputStream, file: File, bufferSize: Int = 8192)
+private[spark] class FileAppender(inputStream: InputStream, file: File, conf: SparkConf,
+    bufferSize: Int = 8192)
extends Logging {
@volatile private var outputStream: FileOutputStream = null
@volatile private var markedForStop = false // has the appender been asked to stop
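+// when false, the appender still drains the input stream but writes nothing to the file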
+val fileOutEnabled: Boolean = conf.getBoolean("spark.executorLog.enabled", true)

// Thread that reads the input stream and writes to file
private val writingThread = new Thread("File appending thread for " + file) {
@@ -71,7 +73,7 @@ private[spark] class FileAppender(inputStream: InputStream, file: File, bufferSi
// asynchronously, so once appender has been flagged to stop these will be ignored
case _: IOException if markedForStop => // do nothing and proceed to stop appending
}
-if (n > 0) {
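+// the stream is still read even when output is disabled, so the producer never blocks on a full pipe; the bytes are simply dropped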
+if (n > 0 && fileOutEnabled) {
appendToFile(buf, n)
}
}
@@ -145,7 +147,7 @@ private[spark] object FileAppender extends Logging {
new RollingFileAppender(
inputStream, file, new TimeBasedRollingPolicy(interval, pattern), conf)
}.getOrElse {
-new FileAppender(inputStream, file)
+new FileAppender(inputStream, file, conf)
}
}

@@ -157,13 +159,13 @@ private[spark] object FileAppender extends Logging {
case _ =>
logWarning(
s"Illegal size [$rollingSizeBytes] for rolling executor logs, rolling logs not enabled")
-new FileAppender(inputStream, file)
+new FileAppender(inputStream, file, conf)
}
}

rollingStrategy match {
case "" =>
-new FileAppender(inputStream, file)
+new FileAppender(inputStream, file, conf)
case "time" =>
createTimeBasedAppender()
case "size" =>
@@ -172,7 +174,7 @@
logWarning(
s"Illegal strategy [$rollingStrategy] for rolling executor logs, " +
s"rolling logs not enabled")
-new FileAppender(inputStream, file)
+new FileAppender(inputStream, file, conf)
}
}
}
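With this change the SparkConf reaches every FileAppender construction path, so the flag is honored for plain and rolling appenders alike; it defaults to true, so behavior is unchanged unless it is explicitly disabled. A minimal usage sketch, mirroring the new suite test below (assuming the existing org.apache.spark.util.logging package; FileAppender is private[spark], so this only compiles inside Spark itself, and the file name is illustrative):

  import java.io.{ByteArrayInputStream, File}
  import java.nio.charset.StandardCharsets
  import org.apache.spark.SparkConf
  import org.apache.spark.util.logging.FileAppender

  val conf = new SparkConf().set("spark.executorLog.enabled", "false")
  val in = new ByteArrayInputStream("executor output".getBytes(StandardCharsets.UTF_8))
  val appender = new FileAppender(in, new File("stdout"), conf) // starts the writing thread
  in.close()
  appender.awaitTermination() // the stream was drained, but no file output was produced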
RollingFileAppender.scala
@@ -42,7 +42,7 @@ private[spark] class RollingFileAppender(
val rollingPolicy: RollingPolicy,
conf: SparkConf,
bufferSize: Int = RollingFileAppender.DEFAULT_BUFFER_SIZE
-) extends FileAppender(inputStream, activeFile, bufferSize) {
+) extends FileAppender(inputStream, activeFile, conf, bufferSize) {

import RollingFileAppender._

FileAppenderSuite.scala
@@ -55,12 +55,23 @@ class FileAppenderSuite extends SparkFunSuite with BeforeAndAfter with Logging {
// The `header` should not be overwritten
val header = "Add header"
Files.write(header, testFile, StandardCharsets.UTF_8)
-val appender = new FileAppender(inputStream, testFile)
+val appender = new FileAppender(inputStream, testFile, new SparkConf())
inputStream.close()
appender.awaitTermination()
assert(Files.toString(testFile, StandardCharsets.UTF_8) === header + testString)
}

test("basic file appender disable") {
val testString = (1 to 1000).mkString(", ")
val inputStream = new ByteArrayInputStream(testString.getBytes(StandardCharsets.UTF_8))
val sparkConf = new SparkConf()
sparkConf.set("spark.executorLog.enabled", "false")
val appender = new FileAppender(inputStream, testFile, sparkConf)
inputStream.close()
appender.awaitTermination()
assert(Files.toString(testFile, StandardCharsets.UTF_8) === "")
}
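Because RollingFileAppender now forwards conf to its FileAppender superclass (see the RollingFileAppender.scala hunk above), the flag silences rolling appenders as well. A hypothetical companion test, sketched here rather than taken from the PR: the two-argument TimeBasedRollingPolicy call is copied from the factory hunk above, while the interval and suffix pattern are illustrative, and the file is pre-created empty so the final read cannot fail.

  test("rolling file appender disable") {
    val testString = (1 to 1000).mkString(", ")
    val inputStream = new ByteArrayInputStream(testString.getBytes(StandardCharsets.UTF_8))
    val sparkConf = new SparkConf().set("spark.executorLog.enabled", "false")
    Files.write("", testFile, StandardCharsets.UTF_8) // ensure the file exists for the assertion below
    val appender = new RollingFileAppender(
      inputStream, testFile, new TimeBasedRollingPolicy(60 * 1000, "--HH-mm-ss-SSSS"), sparkConf)
    inputStream.close()
    appender.awaitTermination()
    // nothing was appended and no rollover happened, so the file is still empty
    assert(Files.toString(testFile, StandardCharsets.UTF_8) === "")
  }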

test("rolling file appender - time-based rolling") {
// setup input stream and appender
val testOutputStream = new PipedOutputStream()