
[SPARK-22058][CORE]the BufferedInputStream will not be closed if an exception occurs. #19277

Closed
wants to merge 4 commits

Conversation

zuotingbing

@zuotingbing commented Sep 19, 2017

What changes were proposed in this pull request?

EventLoggingListener uses val in = new BufferedInputStream(fs.open(log)) and will close it if codec.map(_.compressedInputStream(in)).getOrElse(in) throws an exception.
But if CompressionCodec.createCodec(new SparkConf, c) throws an exception, the BufferedInputStream in will never be closed.
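A minimal, self-contained sketch of the leak being described. The open and createCodec stand-ins here are hypothetical stubs, not the actual Spark code; they only make the leak observable:

```scala
import java.io.{BufferedInputStream, ByteArrayInputStream, InputStream}

var closed = false
// Stand-in for fs.open(log); records whether close() is ever called.
def open(): InputStream = new ByteArrayInputStream(Array[Byte](1, 2, 3)) {
  override def close(): Unit = { closed = true; super.close() }
}
// Stand-in for CompressionCodec.createCodec; fails like an unknown codec would.
def createCodec(name: String): Nothing =
  throw new IllegalArgumentException(s"Codec [$name] is not available")

val in = new BufferedInputStream(open())
try {
  createCodec("unknown") // throws before anything takes ownership of `in`
} catch {
  case _: IllegalArgumentException => () // nothing here closes `in`
}
assert(!closed) // the underlying stream was leaked
```

Because the codec lookup throws before any handler owns in, the underlying stream is never closed — exactly the situation the patch addresses.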

How was this patch tested?

Existing tests.

// Compression codec is encoded as an extension, e.g. app_123.lzf
// Since we sanitize the app ID to not include periods, it is safe to split on it
val logName = log.getName.stripSuffix(IN_PROGRESS)
val codecName: Option[String] = logName.split("\\.").tail.lastOption
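The extension parsing above can be checked in isolation. The file name below is just an example, and ".inprogress" stands in for Spark's IN_PROGRESS suffix:

```scala
// Example log file name; ".inprogress" stands in for Spark's IN_PROGRESS suffix.
val logName = "app_123.lzf.inprogress".stripSuffix(".inprogress")
// tail skips the app ID itself; lastOption yields the codec extension, if any.
val codecName: Option[String] = logName.split("\\.").tail.lastOption
assert(codecName == Some("lzf"))
// A log with no codec extension yields None rather than throwing.
assert("app_456".split("\\.").tail.lastOption.isEmpty)
```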
Member
These two lines don't need to be in the try block. Best to keep it small.

Author

Yes, will fix it. Thanks srowen.

@zuotingbing zuotingbing changed the title [SPARK-22058][COREthe BufferedInputStream will not be closed if an exception occurs. [SPARK-22058][CORE]the BufferedInputStream will not be closed if an exception occurs. Sep 19, 2017
try {
  val codec = codecName.map { c =>
Contributor

Why would this line throw an exception?

Author

Because the CompressionCodec.createCodec function can throw an exception with the message "Codec [$codecName] is not available".

Member

While we're tightening, this whole body could be

      codecName.map { c =>
        val codec = codecMap.getOrElseUpdate(c, CompressionCodec.createCodec(new SparkConf, c))
        codec.compressedInputStream(in)
      }.getOrElse(in)
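The getOrElseUpdate in the suggestion above caches codecs by name, so each codec is created at most once. A minimal illustration with a plain mutable.Map and a hypothetical factory (the real code would call CompressionCodec.createCodec):

```scala
import scala.collection.mutable

// Hypothetical codec factory; counts invocations so the caching is observable.
var creations = 0
def createCodec(name: String): String = { creations += 1; s"codec:$name" }

val codecMap = mutable.Map.empty[String, String]
codecMap.getOrElseUpdate("lzf", createCodec("lzf"))
codecMap.getOrElseUpdate("lzf", createCodec("lzf")) // cached; factory not called again
assert(creations == 1)
assert(codecMap("lzf") == "codec:lzf")
```

getOrElseUpdate evaluates its second argument by name, so the factory runs only when the key is absent.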

try {
  val codec = codecName.map { c =>
    codecMap.getOrElseUpdate(c, CompressionCodec.createCodec(new SparkConf, c))
  }
  codec.map(_.compressedInputStream(in)).getOrElse(in)
Contributor

Would it be better to move the line val in = new BufferedInputStream(fs.open(log)) here to solve your problem?

@zuotingbing
Author

I am not sure. If we move the line val in = new BufferedInputStream(fs.open(log)) into the try/catch, we would have to declare var in: BufferedInputStream = null beforehand, and in the catch block write case e: Exception => if (in != null) { in.close(); in = null }; throw e to close the BufferedInputStream when an exception occurs. That is more complex, so I prefer the code as it was before.
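The alternative the author is describing would look roughly like this. It is a sketch only, with a hypothetical openLog stub in place of fs.open(log):

```scala
import java.io.{BufferedInputStream, ByteArrayInputStream, InputStream}

// Hypothetical stand-in for fs.open(log).
def openLog(): InputStream = new ByteArrayInputStream(Array[Byte](1))

// Constructing `in` inside the try forces a mutable var plus a null check
// before closing -- noticeably more ceremony than the original code.
var in: BufferedInputStream = null
try {
  in = new BufferedInputStream(openLog())
  // ... codec lookup and stream wrapping would go here ...
} catch {
  case e: Throwable =>
    if (in != null) {
      in.close()
      in = null
    }
    throw e
}
assert(in != null) // nothing threw here, so the stream is open and usable
```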

@jerryshao
Contributor

Strictly speaking, this line new BufferedInputStream(fs.open(log)) could also throw an exception; shouldn't you try-catch it as well?


@@ -351,11 +351,11 @@ private[spark] object EventLoggingListener extends Logging {
// Since we sanitize the app ID to not include periods, it is safe to split on it
val logName = log.getName.stripSuffix(IN_PROGRESS)
Member

Maybe move the declaration of in to just before the try.

try {
  val codec = codecName.map { c =>
    codecMap.getOrElseUpdate(c, CompressionCodec.createCodec(new SparkConf, c))
  }
  codec.map(_.compressedInputStream(in)).getOrElse(in)
} catch {
  case e: Exception =>
Member

I also note this doesn't handle Throwable, nor does a similar block earlier in the file. In case of, say, an OutOfMemoryError, the stream wouldn't be closed.

Author

Yes, but if an Error occurs, closing the BufferedInputStream is of little significance.
Will fix it as you said.

@zuotingbing
Author

@jerryshao if the line new BufferedInputStream(fs.open(log)) throws an exception, it means we do not need to close the BufferedInputStream object, because its construction failed.

@srowen
Member

srowen commented Sep 20, 2017

@zuotingbing no, fs.open could succeed, but new BufferedInputStream could fail, leaving the underlying stream open. In practice, it can't actually fail because this constructor does no I/O or operations that can fail. You could get away with leaving it as is.
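For wrapper constructors that genuinely can fail (unlike BufferedInputStream's, which as noted does no I/O), the usual defensive shape is to close the raw stream when wrapping throws. A generic sketch, where wrapSafely and wrap are placeholders and not Spark APIs:

```scala
import java.io.{ByteArrayInputStream, InputStream}

// Generic defensive shape: if wrapping a raw stream fails, close the raw
// stream before propagating the error, so it cannot leak. `wrap` is a placeholder.
def wrapSafely(raw: InputStream)(wrap: InputStream => InputStream): InputStream =
  try wrap(raw) catch {
    case e: Throwable =>
      raw.close()
      throw e
  }

var closed = false
val raw = new ByteArrayInputStream(Array[Byte](1)) {
  override def close(): Unit = { closed = true; super.close() }
}
try wrapSafely(raw)(_ => throw new RuntimeException("wrapper failed"))
catch { case _: RuntimeException => () }
assert(closed) // the raw stream was closed when wrapping failed
```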

  codec.map(_.compressedInputStream(in)).getOrElse(in)
} catch {
-  case e: Exception =>
+  case e: Throwable =>
Contributor

What specific case do we want to catch here?

Author

srowen (Member, 19 hours ago):
I also note this doesn't handle Throwable, nor does a similar block earlier in the file. In case of say OutOfMemoryError it wouldn't be closed

With srowen's suggestion, we can handle everything, including both exceptions and errors.
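Putting the thread's conclusions together, the fixed shape is roughly the following. This is reconstructed from the diff fragments above, not the verbatim merged code, and the Codec stub is hypothetical:

```scala
import java.io.{BufferedInputStream, ByteArrayInputStream, InputStream}

var closed = false
def open(): InputStream = new ByteArrayInputStream(Array[Byte](1)) {
  override def close(): Unit = { closed = true; super.close() }
}
// Stub codec whose compressedInputStream always fails, to exercise the handler.
trait Codec { def compressedInputStream(in: InputStream): InputStream }
val codec: Option[Codec] = Some(new Codec {
  def compressedInputStream(in: InputStream): InputStream =
    throw new RuntimeException("codec failure")
})

val in = new BufferedInputStream(open())
try {
  // The fixed shape: catch Throwable, close `in`, then rethrow.
  try codec.map(_.compressedInputStream(in)).getOrElse(in)
  catch {
    case e: Throwable =>
      in.close()
      throw e
  }
} catch {
  case _: RuntimeException => () // outer catch only so this sketch runs to the end
}
assert(closed) // `in` (and the underlying stream) was closed despite the failure
```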

@srowen
Member

srowen commented Sep 21, 2017

What other instances of try-catch might need improvement like this?

@SparkQA

SparkQA commented Sep 22, 2017

Test build #3929 has finished for PR 19277 at commit e3f8e0d.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Sep 22, 2017

Test build #3932 has started for PR 19277 at commit 2e5f21a.

@SparkQA

SparkQA commented Sep 23, 2017

Test build #3933 has finished for PR 19277 at commit 2e5f21a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@asfgit asfgit closed this in 4943ea5 Sep 24, 2017
@zuotingbing zuotingbing deleted the SPARK-22058 branch September 25, 2017 01:18