Skip to content

Commit

Permalink
[SPARK-7660] Wrap SnappyOutputStream to work around snappy-java bug
Browse files Browse the repository at this point in the history
This patch wraps `SnappyOutputStream` to ensure that `close()` is idempotent and to guard against write-after-`close()` bugs. This is a workaround for xerial/snappy-java#107, a bug where a non-idempotent `close()` method can lead to stream corruption. We can remove this workaround if we upgrade to a snappy-java version that contains my fix for this bug, but in the meantime this patch offers a backportable Spark fix.

Author: Josh Rosen <joshrosen@databricks.com>

Closes #6176 from JoshRosen/SPARK-7660-wrap-snappy and squashes the following commits:

8b77aae [Josh Rosen] Wrap SnappyOutputStream to fix SPARK-7660

(cherry picked from commit f2cc6b5)
Signed-off-by: Josh Rosen <joshrosen@databricks.com>

Conflicts:
	core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
	core/src/test/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriterSuite.java
  • Loading branch information
JoshRosen committed May 17, 2015
1 parent 91442fd commit 0a63103
Showing 1 changed file with 47 additions and 2 deletions.
49 changes: 47 additions & 2 deletions core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.spark.io

import java.io.{InputStream, OutputStream}
import java.io.{IOException, InputStream, OutputStream}

import com.ning.compress.lzf.{LZFInputStream, LZFOutputStream}
import net.jpountz.lz4.{LZ4BlockInputStream, LZ4BlockOutputStream}
Expand Down Expand Up @@ -154,8 +154,53 @@ class SnappyCompressionCodec(conf: SparkConf) extends CompressionCodec {

override def compressedOutputStream(s: OutputStream): OutputStream = {
val blockSize = conf.getInt("spark.io.compression.snappy.block.size", 32768)
new SnappyOutputStream(s, blockSize)
new SnappyOutputStreamWrapper(new SnappyOutputStream(s, blockSize))
}

override def compressedInputStream(s: InputStream): InputStream = new SnappyInputStream(s)
}

/**
* Wrapper over [[SnappyOutputStream]] which guards against write-after-close and double-close
* issues. See SPARK-7660 for more details. This wrapping can be removed if we upgrade to a version
* of snappy-java that contains the fix for https://github.com/xerial/snappy-java/issues/107.
*/
private final class SnappyOutputStreamWrapper(os: SnappyOutputStream) extends OutputStream {

private[this] var closed: Boolean = false

override def write(b: Int): Unit = {
if (closed) {
throw new IOException("Stream is closed")
}
os.write(b)
}

override def write(b: Array[Byte]): Unit = {
if (closed) {
throw new IOException("Stream is closed")
}
os.write(b)
}

override def write(b: Array[Byte], off: Int, len: Int): Unit = {
if (closed) {
throw new IOException("Stream is closed")
}
os.write(b, off, len)
}

override def flush(): Unit = {
if (closed) {
throw new IOException("Stream is closed")
}
os.flush()
}

override def close(): Unit = {
if (!closed) {
closed = true
os.close()
}
}
}

0 comments on commit 0a63103

Please sign in to comment.