[SPARK-7660] Wrap SnappyOutputStream to work around snappy-java bug #6176
Changes from all commits:
```diff
@@ -17,7 +17,7 @@
 package org.apache.spark.io

-import java.io.{InputStream, OutputStream}
+import java.io.{IOException, InputStream, OutputStream}

 import com.ning.compress.lzf.{LZFInputStream, LZFOutputStream}
 import net.jpountz.lz4.{LZ4BlockInputStream, LZ4BlockOutputStream}
@@ -154,8 +154,53 @@ class SnappyCompressionCodec(conf: SparkConf) extends CompressionCodec {

   override def compressedOutputStream(s: OutputStream): OutputStream = {
     val blockSize = conf.getSizeAsBytes("spark.io.compression.snappy.blockSize", "32k").toInt
-    new SnappyOutputStream(s, blockSize)
+    new SnappyOutputStreamWrapper(new SnappyOutputStream(s, blockSize))
   }

   override def compressedInputStream(s: InputStream): InputStream = new SnappyInputStream(s)
 }
+
+/**
+ * Wrapper over [[SnappyOutputStream]] which guards against write-after-close and double-close
+ * issues. See SPARK-7660 for more details. This wrapping can be removed if we upgrade to a version
+ * of snappy-java that contains the fix for https://github.com/xerial/snappy-java/issues/107.
+ */
+private final class SnappyOutputStreamWrapper(os: SnappyOutputStream) extends OutputStream {
+
+  private[this] var closed: Boolean = false
+
+  override def write(b: Int): Unit = {
+    if (closed) {
+      throw new IOException("Stream is closed")
+    }
+    os.write(b)
+  }
+
+  override def write(b: Array[Byte]): Unit = {
+    if (closed) {
+      throw new IOException("Stream is closed")
+    }
+    os.write(b)
+  }
+
+  override def write(b: Array[Byte], off: Int, len: Int): Unit = {
+    if (closed) {
+      throw new IOException("Stream is closed")
+    }
+    os.write(b, off, len)
+  }
+
+  override def flush(): Unit = {
+    if (closed) {
+      throw new IOException("Stream is closed")
+    }
+    os.flush()
+  }
+
+  override def close(): Unit = {
+    if (!closed) {
+      closed = true
+      os.close()
+    }
+  }
+}
```

Review comments on the `closed` checks in the `write` methods:

Reviewer: What are these guarding against? I understand you'd need it in `close`, but here?

Reply: In the unlikely event that we wrote to a closed stream, this would end up mutating a buffer that might be shared.

Reply: Note that the upstream fix also incorporates this: xerial/snappy-java#108 (comment)
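The close-guard pattern in this patch can be sketched in isolation. This is a minimal, runnable version, with a plain `ByteArrayOutputStream` standing in for `SnappyOutputStream` (an assumption made so the example does not need snappy-java on the classpath); the class and object names are hypothetical, not from the patch:

```scala
import java.io.{ByteArrayOutputStream, IOException, OutputStream}

// Sketch of the close-guard pattern: reject writes after close, and make
// a second close() a no-op instead of touching possibly-shared state.
class CloseGuardedStream(os: OutputStream) extends OutputStream {
  private[this] var closed: Boolean = false

  private def ensureOpen(): Unit = {
    if (closed) throw new IOException("Stream is closed")
  }

  override def write(b: Int): Unit = { ensureOpen(); os.write(b) }
  override def flush(): Unit = { ensureOpen(); os.flush() }

  override def close(): Unit = {
    if (!closed) {
      closed = true
      os.close()
    }
  }
}

object CloseGuardDemo {
  def main(args: Array[String]): Unit = {
    val underlying = new ByteArrayOutputStream()
    val guarded = new CloseGuardedStream(underlying)
    guarded.write(42)
    guarded.close()
    guarded.close() // double-close is safely ignored
    val threw =
      try { guarded.write(1); false } catch { case _: IOException => true }
    println(s"bytes=${underlying.size()} writeAfterCloseThrew=$threw")
  }
}
```

The same structure as the patch's `SnappyOutputStreamWrapper`, minus the array-based `write` overloads, which follow the identical guard-then-delegate shape.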
Review comments on the `closed` flag:

Reviewer: Just double-checking - this doesn't need to be `volatile`?

Reply: Ah, good point. I guess this needs to be volatile in case we're performing cleanup in another thread. @rxin, if this is volatile, won't that make the `write()` checks way more expensive?

Reply (by email, in response to Josh Rosen, Fri, May 15, 2015, 4:40 PM): I think we already do per-record checking of volatile vars.
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/TaskContextImpl.scala#L44

Resolution: We discussed this offline: as far as we know, this is only ever executed in a single-threaded context (since tasks are single-threaded and their stop / cleanup logic is performed in that same thread), so the volatile should not be necessary.
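For reference, the `@volatile` variant discussed above would look roughly like this. This is a sketch of the alternative the reviewers decided against, not code from the patch; `VolatileCloseGuard` is a hypothetical name, and a plain `OutputStream` stands in for `SnappyOutputStream`:

```scala
import java.io.{IOException, OutputStream}

// Sketch of the @volatile alternative: if close() could run on a different
// thread than write(), the flag would need a memory barrier so the writing
// thread observes the close. The patch omits @volatile because a task's
// writes and its stop/cleanup logic run on the same thread.
class VolatileCloseGuard(os: OutputStream) extends OutputStream {
  @volatile private[this] var closed: Boolean = false

  override def write(b: Int): Unit = {
    // This is a volatile read on every byte written, which is the cost
    // concern raised in the review thread.
    if (closed) throw new IOException("Stream is closed")
    os.write(b)
  }

  override def close(): Unit = {
    if (!closed) {
      closed = true // volatile write, visible to other threads
      os.close()
    }
  }
}
```

The linked `TaskContextImpl` example shows Spark already doing a comparable per-record volatile check, which is why the cost alone was not considered disqualifying; the deciding factor was that the guard is only ever exercised single-threaded.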