[SPARK-35027][CORE] Close the inputStream in FileAppender when writing the logs fails

### What changes were proposed in this pull request?

1. Add an option to close the input stream in FileAppender.
2. Set `closeStreams` to `true` in the FileAppender used by ExecutorRunner (see the sketch below).
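
A minimal, self-contained sketch of both points (`DemoAppender` is a stand-in defined here for illustration, not Spark's `FileAppender`; the real call sites are in the ExecutorRunner hunk below):

```scala
import java.io.{File, InputStream}

// Stand-in for FileAppender: the new closeStreams option defaults to false,
// so existing callers keep the old behaviour.
class DemoAppender(in: InputStream, file: File, closeStreams: Boolean = false) {
  def describe(): String = s"appending to ${file.getName}, closeStreams=$closeStreams"
}

object DemoAppender {
  // Old factory signature, kept for source compatibility.
  def apply(in: InputStream, file: File): DemoAppender =
    apply(in, file, closeStreams = false)

  // New factory that threads the flag through, mirroring FileAppender.apply in the diff.
  def apply(in: InputStream, file: File, closeStreams: Boolean): DemoAppender =
    new DemoAppender(in, file, closeStreams)
}

object ExecutorRunnerSketch {
  def main(args: Array[String]): Unit = {
    // Mirrors the ExecutorRunner change: the appender now owns (and will close)
    // the process's stdout stream because closeStreams = true.
    val process = new ProcessBuilder("echo", "hello").start()
    val stdoutAppender = DemoAppender(process.getInputStream, new File("stdout"), closeStreams = true)
    println(stdoutAppender.describe())
  }
}
```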

### Why are the changes needed?

The `inputStream` passed to FileAppender is not closed when an error occurs while writing to the `outputStream`, so the stream leaks.
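
A minimal standalone sketch of the close-on-failure pattern the fix introduces (simplified for illustration; the actual change is in the FileAppender hunk below):

```scala
import java.io.{FileOutputStream, InputStream}

object CloseOnFailureSketch {
  // Copy everything from `in` to `out`. With closeStreams = true the input
  // stream is released even if a write throws, and the nested try/finally keeps
  // an exception from in.close() from skipping the output stream's cleanup.
  def appendAndClose(in: InputStream, out: FileOutputStream, closeStreams: Boolean): Unit = {
    try {
      val buf = new Array[Byte](8192)
      var n = in.read(buf)
      while (n != -1) {
        out.write(buf, 0, n) // before this patch, a failure here leaked `in`
        n = in.read(buf)
      }
    } finally {
      try {
        if (closeStreams) in.close()
      } finally {
        out.close()
      }
    }
  }
}
```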

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Add new tests for FileAppender
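
The new tests (shown in full in FileAppenderSuite below) mock the input stream and verify that `close()` is invoked. A self-contained sketch of that verification style, assuming `mockito-core` is on the classpath:

```scala
import java.io.{ByteArrayOutputStream, InputStream}
import org.mockito.Mockito.{mock, verify, when}

object CloseVerificationSketch {
  // Toy stand-in for the code under test: copies the stream and always closes it.
  def copyAndClose(in: InputStream, out: ByteArrayOutputStream): Unit = {
    try {
      var b = in.read()
      while (b != -1) { out.write(b); b = in.read() }
    } finally {
      in.close()
    }
  }

  def main(args: Array[String]): Unit = {
    val in = mock(classOf[InputStream])
    when(in.read()).thenReturn(-1) // pretend the stream is already exhausted
    copyAndClose(in, new ByteArrayOutputStream())
    verify(in).close() // same assertion style as the new FileAppenderSuite tests
    println("close() was verified on the mocked stream")
  }
}
```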
jhu-chang committed Jul 8, 2021
1 parent 23943e5 commit 8b2b540
Showing 4 changed files with 73 additions and 14 deletions.
@@ -185,11 +185,11 @@ private[deploy] class ExecutorRunner(

// Redirect its stdout and stderr to files
val stdout = new File(executorDir, "stdout")
-      stdoutAppender = FileAppender(process.getInputStream, stdout, conf)
+      stdoutAppender = FileAppender(process.getInputStream, stdout, conf, true)

val stderr = new File(executorDir, "stderr")
Files.write(header, stderr, StandardCharsets.UTF_8)
-      stderrAppender = FileAppender(process.getErrorStream, stderr, conf)
+      stderrAppender = FileAppender(process.getErrorStream, stderr, conf, true)

state = ExecutorState.RUNNING
worker.send(ExecutorStateChanged(appId, execId, state, None, None))
@@ -26,8 +26,12 @@ import org.apache.spark.util.{IntParam, Utils}
/**
* Continuously appends the data from an input stream into the given file.
*/
-private[spark] class FileAppender(inputStream: InputStream, file: File, bufferSize: Int = 8192)
-  extends Logging {
+private[spark] class FileAppender(
+    inputStream: InputStream,
+    file: File,
+    bufferSize: Int = 8192,
+    closeStreams: Boolean = false
+  ) extends Logging {
@volatile private var outputStream: FileOutputStream = null
@volatile private var markedForStop = false // has the appender been asked to stopped

@@ -76,7 +80,13 @@ private[spark] class FileAppender(inputStream: InputStream, file: File, bufferSi
}
}
} {
-      closeFile()
+      try {
+        if (closeStreams) {
+          inputStream.close()
+        }
+      } finally {
+        closeFile()
+      }
}
} catch {
case e: Exception =>
@@ -113,7 +123,18 @@ private[spark] class FileAppender(inputStream: InputStream, file: File, bufferSi
private[spark] object FileAppender extends Logging {

/** Create the right appender based on Spark configuration */
-  def apply(inputStream: InputStream, file: File, conf: SparkConf): FileAppender = {
+  def apply(
+      inputStream: InputStream,
+      file: File,
+      conf: SparkConf) : FileAppender = {
+    apply(inputStream, file, conf, false)
+  }
+
+  def apply(
+      inputStream: InputStream,
+      file: File,
+      conf: SparkConf,
+      closeStreams: Boolean): FileAppender = {

val rollingStrategy = conf.get(config.EXECUTOR_LOGS_ROLLING_STRATEGY)
val rollingSizeBytes = conf.get(config.EXECUTOR_LOGS_ROLLING_MAX_SIZE)
@@ -141,27 +162,29 @@ private[spark] object FileAppender extends Logging {
validatedParams.map {
case (interval, pattern) =>
new RollingFileAppender(
-            inputStream, file, new TimeBasedRollingPolicy(interval, pattern), conf)
+            inputStream, file, new TimeBasedRollingPolicy(interval, pattern), conf,
+            closeStreams = closeStreams)
}.getOrElse {
-        new FileAppender(inputStream, file)
+        new FileAppender(inputStream, file, closeStreams = closeStreams)
}
}

def createSizeBasedAppender(): FileAppender = {
rollingSizeBytes match {
case IntParam(bytes) =>
logInfo(s"Rolling executor logs enabled for $file with rolling every $bytes bytes")
-          new RollingFileAppender(inputStream, file, new SizeBasedRollingPolicy(bytes), conf)
+          new RollingFileAppender(
+            inputStream, file, new SizeBasedRollingPolicy(bytes), conf, closeStreams = closeStreams)
case _ =>
logWarning(
s"Illegal size [$rollingSizeBytes] for rolling executor logs, rolling logs not enabled")
-          new FileAppender(inputStream, file)
+          new FileAppender(inputStream, file, closeStreams = closeStreams)
}
}

rollingStrategy match {
case "" =>
-        new FileAppender(inputStream, file)
+        new FileAppender(inputStream, file, closeStreams = closeStreams)
case "time" =>
createTimeBasedAppender()
case "size" =>
@@ -170,7 +193,7 @@ private[spark] object FileAppender extends Logging {
logWarning(
s"Illegal strategy [$rollingStrategy] for rolling executor logs, " +
s"rolling logs not enabled")
-        new FileAppender(inputStream, file)
+        new FileAppender(inputStream, file, closeStreams = closeStreams)
}
}
}
@@ -42,8 +42,9 @@ private[spark] class RollingFileAppender(
activeFile: File,
val rollingPolicy: RollingPolicy,
conf: SparkConf,
-    bufferSize: Int = RollingFileAppender.DEFAULT_BUFFER_SIZE
-  ) extends FileAppender(inputStream, activeFile, bufferSize) {
+    bufferSize: Int = RollingFileAppender.DEFAULT_BUFFER_SIZE,
+    closeStreams: Boolean = false
+  ) extends FileAppender(inputStream, activeFile, bufferSize, closeStreams) {

private val maxRetainedFiles = conf.get(config.EXECUTOR_LOGS_ROLLING_MAX_RETAINED_FILES)
private val enableCompression = conf.get(config.EXECUTOR_LOGS_ROLLING_ENABLE_COMPRESSION)
35 changes: 35 additions & 0 deletions core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala
@@ -61,6 +61,15 @@ class FileAppenderSuite extends SparkFunSuite with BeforeAndAfter with Logging {
assert(Files.toString(testFile, StandardCharsets.UTF_8) === header + testString)
}

test("basic file appender - close stream") {
val inputStream = mock(classOf[InputStream])
val appender = new FileAppender(inputStream, testFile, closeStreams = true)
Thread.sleep(10)
appender.stop()
appender.awaitTermination()
verify(inputStream).close()
}

test("rolling file appender - time-based rolling") {
// setup input stream and appender
val testOutputStream = new PipedOutputStream()
@@ -96,6 +105,32 @@ class FileAppenderSuite extends SparkFunSuite with BeforeAndAfter with Logging {
appender, testOutputStream, textToAppend, rolloverIntervalMillis, isCompressed = true)
}

test("rolling file appender - time-based rolling close stream") {
val inputStream = mock(classOf[InputStream])
val sparkConf = new SparkConf()
sparkConf.set(config.EXECUTOR_LOGS_ROLLING_STRATEGY.key, "time")
val appender = FileAppender(inputStream, testFile, sparkConf, closeStreams = true)
assert(
appender.asInstanceOf[RollingFileAppender].rollingPolicy.isInstanceOf[TimeBasedRollingPolicy])
Thread.sleep(10)
appender.stop()
appender.awaitTermination()
verify(inputStream).close()
}

test("rolling file appender - size-based rolling close stream") {
val inputStream = mock(classOf[InputStream])
val sparkConf = new SparkConf()
sparkConf.set(config.EXECUTOR_LOGS_ROLLING_STRATEGY.key, "size")
val appender = FileAppender(inputStream, testFile, sparkConf, closeStreams = true)
assert(
appender.asInstanceOf[RollingFileAppender].rollingPolicy.isInstanceOf[SizeBasedRollingPolicy])
Thread.sleep(10)
appender.stop()
appender.awaitTermination()
verify(inputStream).close()
}

test("rolling file appender - size-based rolling") {
// setup input stream and appender
val testOutputStream = new PipedOutputStream()
