[SPARK-35027][CORE] Close the inputStream in FileAppender when writin… #33263

Status: Closed. Wants to merge 3 commits; the diff below shows changes from 1 commit.
core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
@@ -185,11 +185,11 @@ private[deploy] class ExecutorRunner(

// Redirect its stdout and stderr to files
val stdout = new File(executorDir, "stdout")
-  stdoutAppender = FileAppender(process.getInputStream, stdout, conf)
+  stdoutAppender = FileAppender(process.getInputStream, stdout, conf, true)

val stderr = new File(executorDir, "stderr")
Files.write(header, stderr, StandardCharsets.UTF_8)
-  stderrAppender = FileAppender(process.getErrorStream, stderr, conf)
+  stderrAppender = FileAppender(process.getErrorStream, stderr, conf, true)

state = ExecutorState.RUNNING
worker.send(ExecutorStateChanged(appId, execId, state, None, None))
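This call-site change is the heart of the fix: ExecutorRunner hands the child process's stdout/stderr pipes to the appenders, and the new fourth argument tells them to close those pipes when appending stops. A minimal sketch of the resulting behavior (the process, file, and conf here are hypothetical; only the four-argument FileAppender call is from this PR):

import java.io.File
import org.apache.spark.SparkConf
import org.apache.spark.util.logging.FileAppender

val conf = new SparkConf()
val process = new ProcessBuilder("sleep", "1").start()  // stand-in for the executor process

// With closeStreams = true the appender closes process.getInputStream when it
// terminates, so a failed executor no longer leaks the pipe's file descriptor.
val appender = FileAppender(process.getInputStream, new File("/tmp/stdout"), conf, true)

appender.stop()             // ask the appending thread to stop
appender.awaitTermination() // the thread closes the stream on its way out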
core/src/main/scala/org/apache/spark/util/logging/FileAppender.scala
@@ -26,8 +26,12 @@ import org.apache.spark.util.{IntParam, Utils}
/**
* Continuously appends the data from an input stream into the given file.
*/
-private[spark] class FileAppender(inputStream: InputStream, file: File, bufferSize: Int = 8192)
-  extends Logging {
+private[spark] class FileAppender(
+    inputStream: InputStream,
+    file: File,
+    bufferSize: Int = 8192,
+    closeStreams: Boolean = false
[Review comment, Member] shall we name it closeInputStream?

[Review comment, Member] Look at this again. When will we explicitly set closeStreams to false? cc @jhu-chang

+) extends Logging {
@volatile private var outputStream: FileOutputStream = null
@volatile private var markedForStop = false // has the appender been asked to stopped
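On the reviewer's question above (when would closeStreams stay false): one plausible reading, strictly an assumption here and not the authors' stated answer, is that the default covers callers who own the stream's lifecycle, as in the suite's existing pipe-based tests:

import java.io.{PipedInputStream, PipedOutputStream}

// The test owns both ends of the pipe and closes them itself, so the appender
// must not close the input stream out from under it.
val testOutputStream = new PipedOutputStream()
val testInputStream = new PipedInputStream(testOutputStream)
val appender = new FileAppender(testInputStream, testFile) // closeStreams defaults to false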

@@ -76,7 +80,13 @@ private[spark] class FileAppender(inputStream: InputStream, file: File, bufferSize: Int = 8192)
}
}
} {
-  closeFile()
+  try {
+    if (closeStreams) {
+      inputStream.close()
+    }
+  } finally {
+    closeFile()
+  }
}
} catch {
case e: Exception =>
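The nested try/finally above carries the ordering guarantee: even if closing the input stream throws, the finally block still closes the output file. A standalone sketch of that shape (the failing stream is hypothetical, not from the PR):

import java.io.{IOException, InputStream}

// Hypothetical stream whose close() always fails.
val failing: InputStream = new InputStream {
  override def read(): Int = -1
  override def close(): Unit = throw new IOException("close failed")
}

try {
  try {
    failing.close()              // throws IOException
  } finally {
    println("closeFile() runs")  // the finally block still executes
  }
} catch {
  case _: IOException => // the failure surfaces only after the file is closed
}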
@@ -113,7 +123,18 @@ private[spark] class FileAppender(inputStream: InputStream, file: File, bufferSize: Int = 8192)
private[spark] object FileAppender extends Logging {

/** Create the right appender based on Spark configuration */
-  def apply(inputStream: InputStream, file: File, conf: SparkConf): FileAppender = {
+  def apply(
+      inputStream: InputStream,
+      file: File,
+      conf: SparkConf): FileAppender = {
+    apply(inputStream, file, conf, false)
+  }
+
+  def apply(
+      inputStream: InputStream,
+      file: File,
+      conf: SparkConf,
+      closeStreams: Boolean): FileAppender = {

val rollingStrategy = conf.get(config.EXECUTOR_LOGS_ROLLING_STRATEGY)
val rollingSizeBytes = conf.get(config.EXECUTOR_LOGS_ROLLING_MAX_SIZE)
@@ -141,27 +162,29 @@ private[spark] object FileAppender extends Logging {
validatedParams.map {
case (interval, pattern) =>
new RollingFileAppender(
-          inputStream, file, new TimeBasedRollingPolicy(interval, pattern), conf)
+          inputStream, file, new TimeBasedRollingPolicy(interval, pattern), conf,
+          closeStreams = closeStreams)
}.getOrElse {
-        new FileAppender(inputStream, file)
+        new FileAppender(inputStream, file, closeStreams = closeStreams)
}
}

def createSizeBasedAppender(): FileAppender = {
rollingSizeBytes match {
case IntParam(bytes) =>
logInfo(s"Rolling executor logs enabled for $file with rolling every $bytes bytes")
-          new RollingFileAppender(inputStream, file, new SizeBasedRollingPolicy(bytes), conf)
+          new RollingFileAppender(
+            inputStream, file, new SizeBasedRollingPolicy(bytes), conf, closeStreams = closeStreams)
case _ =>
logWarning(
s"Illegal size [$rollingSizeBytes] for rolling executor logs, rolling logs not enabled")
-          new FileAppender(inputStream, file)
+          new FileAppender(inputStream, file, closeStreams = closeStreams)
}
}

rollingStrategy match {
case "" =>
-        new FileAppender(inputStream, file)
+        new FileAppender(inputStream, file, closeStreams = closeStreams)
case "time" =>
createTimeBasedAppender()
case "size" =>
@@ -170,7 +193,7 @@ private[spark] object FileAppender extends Logging {
logWarning(
s"Illegal strategy [$rollingStrategy] for rolling executor logs, " +
s"rolling logs not enabled")
-        new FileAppender(inputStream, file)
+        new FileAppender(inputStream, file, closeStreams = closeStreams)
}
}
}
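Taken together, the factory now threads closeStreams through every branch, so the flag survives whichever appender the configuration selects. A usage sketch (assumed: `in` is some InputStream and `logDir` a directory; the two config keys are the standard executor-log rolling settings):

import java.io.{File, InputStream}
import org.apache.spark.SparkConf
import org.apache.spark.util.logging.FileAppender

val sparkConf = new SparkConf()
  .set("spark.executor.logs.rolling.strategy", "size")   // size-based rolling
  .set("spark.executor.logs.rolling.maxSize", "131072")  // roll every 128 KiB

// Returns a RollingFileAppender here; with strategy "" it would be a plain
// FileAppender. Either way, closeStreams = true is honored on termination.
def build(in: InputStream, logDir: File): FileAppender =
  FileAppender(in, new File(logDir, "stderr"), sparkConf, closeStreams = true)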
core/src/main/scala/org/apache/spark/util/logging/RollingFileAppender.scala
@@ -42,8 +42,9 @@ private[spark] class RollingFileAppender(
activeFile: File,
val rollingPolicy: RollingPolicy,
conf: SparkConf,
-    bufferSize: Int = RollingFileAppender.DEFAULT_BUFFER_SIZE
-  ) extends FileAppender(inputStream, activeFile, bufferSize) {
+    bufferSize: Int = RollingFileAppender.DEFAULT_BUFFER_SIZE,
+    closeStreams: Boolean = false
+  ) extends FileAppender(inputStream, activeFile, bufferSize, closeStreams) {

private val maxRetainedFiles = conf.get(config.EXECUTOR_LOGS_ROLLING_MAX_RETAINED_FILES)
private val enableCompression = conf.get(config.EXECUTOR_LOGS_ROLLING_ENABLE_COMPRESSION)
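Because the new parameter defaults to false, every existing direct construction keeps compiling unchanged; opting in looks like this (a sketch using the constructor above, with an arbitrary 1 MiB policy):

// Roll the log every mebibyte and close the input stream on termination.
// (inputStream, activeFile, and conf as in the surrounding class.)
val appender = new RollingFileAppender(
  inputStream, activeFile, new SizeBasedRollingPolicy(1024 * 1024), conf,
  closeStreams = true)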
core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala (35 additions, 0 deletions)
@@ -61,6 +61,15 @@ class FileAppenderSuite extends SparkFunSuite with BeforeAndAfter with Logging {
assert(Files.toString(testFile, StandardCharsets.UTF_8) === header + testString)
}

test("basic file appender - close stream") {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's add a JIRA prefix here e.g.) "SPARK-35027: basic file ..."

val inputStream = mock(classOf[InputStream])
val appender = new FileAppender(inputStream, testFile, closeStreams = true)
Thread.sleep(10)
appender.stop()
appender.awaitTermination()
verify(inputStream).close()
}
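This test (and the two below) relies on Mockito's static helpers, presumably imported at the top of the suite roughly as:

import java.io.InputStream
import org.mockito.Mockito.{mock, verify}

verify(inputStream).close() then asserts that the appender's worker thread called close() exactly once before terminating.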

test("rolling file appender - time-based rolling") {
// setup input stream and appender
val testOutputStream = new PipedOutputStream()
@@ -96,6 +105,32 @@ class FileAppenderSuite extends SparkFunSuite with BeforeAndAfter with Logging {
appender, testOutputStream, textToAppend, rolloverIntervalMillis, isCompressed = true)
}

test("rolling file appender - time-based rolling close stream") {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto for jira prefix

val inputStream = mock(classOf[InputStream])
val sparkConf = new SparkConf()
sparkConf.set(config.EXECUTOR_LOGS_ROLLING_STRATEGY.key, "time")
val appender = FileAppender(inputStream, testFile, sparkConf, closeStreams = true)
assert(
appender.asInstanceOf[RollingFileAppender].rollingPolicy.isInstanceOf[TimeBasedRollingPolicy])
Thread.sleep(10)
appender.stop()
appender.awaitTermination()
verify(inputStream).close()
}

test("rolling file appender - size-based rolling close stream") {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto.

val inputStream = mock(classOf[InputStream])
val sparkConf = new SparkConf()
sparkConf.set(config.EXECUTOR_LOGS_ROLLING_STRATEGY.key, "size")
val appender = FileAppender(inputStream, testFile, sparkConf, closeStreams = true)
assert(
appender.asInstanceOf[RollingFileAppender].rollingPolicy.isInstanceOf[SizeBasedRollingPolicy])
Thread.sleep(10)
appender.stop()
appender.awaitTermination()
verify(inputStream).close()
}

test("rolling file appender - size-based rolling") {
// setup input stream and appender
val testOutputStream = new PipedOutputStream()