[SPARK-7660] Wrap SnappyOutputStream to work around snappy-java bug #6176

Status: Closed · wants to merge 1 commit
49 changes: 47 additions & 2 deletions core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
@@ -17,7 +17,7 @@

 package org.apache.spark.io

-import java.io.{InputStream, OutputStream}
+import java.io.{IOException, InputStream, OutputStream}

 import com.ning.compress.lzf.{LZFInputStream, LZFOutputStream}
 import net.jpountz.lz4.{LZ4BlockInputStream, LZ4BlockOutputStream}
@@ -154,8 +154,53 @@ class SnappyCompressionCodec(conf: SparkConf) extends CompressionCodec {

   override def compressedOutputStream(s: OutputStream): OutputStream = {
     val blockSize = conf.getSizeAsBytes("spark.io.compression.snappy.blockSize", "32k").toInt
-    new SnappyOutputStream(s, blockSize)
+    new SnappyOutputStreamWrapper(new SnappyOutputStream(s, blockSize))
   }

   override def compressedInputStream(s: InputStream): InputStream = new SnappyInputStream(s)
 }

/**
* Wrapper over [[SnappyOutputStream]] which guards against write-after-close and double-close
* issues. See SPARK-7660 for more details. This wrapping can be removed if we upgrade to a version
* of snappy-java that contains the fix for https://github.com/xerial/snappy-java/issues/107.
*/
private final class SnappyOutputStreamWrapper(os: SnappyOutputStream) extends OutputStream {

  private[this] var closed: Boolean = false
Contributor: Just double checking - this doesn't need to be volatile?

Author: Ah, good point. I guess this needs to be volatile in case we're performing cleanup in another thread. @rxin, if this is volatile, won't that make the write() checks way more expensive?

Contributor: I think we already do per-record checking of volatile vars: https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/TaskContextImpl.scala#L44

Author: We discussed this offline: as far as we know, this is only ever executed in a single-threaded context (since tasks are single-threaded and their stop / cleanup logic is performed in that same thread), so the volatile should not be necessary.
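
For context, here is a minimal sketch of the volatile alternative weighed in this thread (the class name is hypothetical, and this is not what the PR merged):

```scala
import java.io.{IOException, OutputStream}
import org.xerial.snappy.SnappyOutputStream

// Hypothetical variant discussed above: marking the flag @volatile would make a
// close() performed on one thread visible to write() calls on another, at the
// cost of a volatile read on every write. The merged code keeps a plain var
// because each stream is only touched by a single task thread.
private final class VolatileSnappyOutputStreamWrapper(os: SnappyOutputStream) extends OutputStream {
  @volatile private[this] var closed: Boolean = false

  override def write(b: Int): Unit = {
    if (closed) {
      throw new IOException("Stream is closed")
    }
    os.write(b)
  }

  override def close(): Unit = {
    if (!closed) {
      closed = true
      os.close()
    }
  }
  // The remaining write/flush overrides would mirror SnappyOutputStreamWrapper.
}
```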


  override def write(b: Int): Unit = {
    if (closed) {
Contributor: what are these guarding against? I understand you'd need it in close, but here?

Author: In the unlikely event that we wrote to a closed stream, this would end up mutating a buffer that might be shared.

Author: Note that the upstream fix also incorporates this: xerial/snappy-java#108 (comment)
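
To make the hazard concrete, here is a hypothetical sketch of the failure mode these guards defend against (invented names; behavior as described in xerial/snappy-java#107, where close() returns the stream's internal buffer to a shared pool):

```scala
import java.io.ByteArrayOutputStream
import org.xerial.snappy.SnappyOutputStream

// Sketch, assuming a snappy-java version affected by issue #107: close()
// recycles the stream's internal buffer into a shared pool keyed by block
// size, but the closed stream keeps its reference to that array.
object BufferCorruptionSketch {
  def main(args: Array[String]): Unit = {
    val a = new SnappyOutputStream(new ByteArrayOutputStream(), 32 * 1024)
    a.close() // a's internal buffer is returned to the shared pool

    // A second stream with the same block size may be handed that same array.
    val b = new SnappyOutputStream(new ByteArrayOutputStream(), 32 * 1024)
    b.write(Array.fill[Byte](100)(1.toByte))

    // Without the wrapper, this write-after-close is not rejected and can
    // silently scribble over data that b has buffered but not yet flushed.
    a.write(0)
  }
}
```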

      throw new IOException("Stream is closed")
    }
    os.write(b)
  }

  override def write(b: Array[Byte]): Unit = {
    if (closed) {
      throw new IOException("Stream is closed")
    }
    os.write(b)
  }

  override def write(b: Array[Byte], off: Int, len: Int): Unit = {
    if (closed) {
      throw new IOException("Stream is closed")
    }
    os.write(b, off, len)
  }

  override def flush(): Unit = {
    if (closed) {
      throw new IOException("Stream is closed")
    }
    os.flush()
  }

  override def close(): Unit = {
    if (!closed) {
      closed = true
      os.close()
    }
  }
}
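
As an illustration of the resulting semantics (a hypothetical snippet, not part of the diff): the wrapper turns a write-after-close into a loud IOException and makes close() idempotent.

```scala
import java.io.ByteArrayOutputStream
import org.apache.spark.SparkConf
import org.apache.spark.io.SnappyCompressionCodec

object CloseSemanticsSketch {
  def main(args: Array[String]): Unit = {
    val codec = new SnappyCompressionCodec(new SparkConf())
    val out = codec.compressedOutputStream(new ByteArrayOutputStream())
    out.write(Array[Byte](1, 2, 3))
    out.close()
    out.close() // second close is now a no-op instead of double-freeing the buffer
    // out.write(4) would now throw java.io.IOException: "Stream is closed"
  }
}
```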
(second file in the diff)
@@ -35,7 +35,6 @@
import org.mockito.MockitoAnnotations;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
-import org.xerial.snappy.buffer.CachedBufferAllocator;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.lessThan;
@@ -97,13 +96,6 @@ public OutputStream apply(OutputStream stream) {
   @After
   public void tearDown() {
     Utils.deleteRecursively(tempDir);
-    // This call is a workaround for SPARK-7660, a snappy-java bug which is exposed by this test
-    // suite. Clearing the cached buffer allocator's pool of reusable buffers masks this bug,
-    // preventing a test failure in JavaAPISuite that would otherwise occur. The underlying bug
-    // needs to be fixed, but in the meantime this workaround avoids spurious Jenkins failures.
-    synchronized (CachedBufferAllocator.class) {
-      CachedBufferAllocator.queueTable.clear();
-    }
     final long leakedMemory = taskMemoryManager.cleanUpAllAllocatedMemory();
     if (leakedMemory != 0) {
       fail("Test leaked " + leakedMemory + " bytes of managed memory");