[SPARK-28102][CORE] Avoid performance problems when lz4-java JNI libraries fail to initialize

## What changes were proposed in this pull request?

This PR fixes a performance problem in environments where `lz4-java`'s native JNI libraries fail to initialize.

Spark uses `lz4-java` for LZ4 compression. Under the hood, the `LZ4BlockInputStream` and `LZ4BlockOutputStream` constructors call `LZ4Factory.fastestInstance()`, which attempts to load JNI libraries and falls back on Java implementations in case the JNI library cannot be loaded or initialized.

If the LZ4 JNI libraries are present on the library load path (`Native.isLoaded()`) but cannot be initialized (e.g. due to breakage caused by shading) then an exception will be thrown and caught, triggering fallback to `fastestJavaInstance()` (a non-JNI implementation).

Unfortunately, the LZ4 library does not cache the fact that the JNI library failed to initialize, so every call to `LZ4Factory.fastestInstance()` re-attempts (and fails) to initialize the native code. These initialization attempts are performed in a `static synchronized` method, so the failure exceptions are thrown while holding shared monitors, causing monitor-contention performance problems. Here's an example stack trace showing the problem:

```java
java.lang.Throwable.fillInStackTrace(Native Method)
java.lang.Throwable.fillInStackTrace(Throwable.java:783) => holding Monitor(java.lang.NoClassDefFoundError@441628568)
java.lang.Throwable.<init>(Throwable.java:265)
java.lang.Error.<init>(Error.java:70)
java.lang.LinkageError.<init>(LinkageError.java:55)
java.lang.NoClassDefFoundError.<init>(NoClassDefFoundError.java:59)
shaded.net.jpountz.lz4.LZ4JNICompressor.compress(LZ4JNICompressor.java:36)
shaded.net.jpountz.lz4.LZ4Factory.<init>(LZ4Factory.java:200)
shaded.net.jpountz.lz4.LZ4Factory.instance(LZ4Factory.java:51)
shaded.net.jpountz.lz4.LZ4Factory.nativeInstance(LZ4Factory.java:84) => holding Monitor(java.lang.Class@1475983836)
shaded.net.jpountz.lz4.LZ4Factory.fastestInstance(LZ4Factory.java:157)
shaded.net.jpountz.lz4.LZ4BlockOutputStream.<init>(LZ4BlockOutputStream.java:135)
org.apache.spark.io.LZ4CompressionCodec.compressedOutputStream(CompressionCodec.scala:122)
org.apache.spark.serializer.SerializerManager.wrapForCompression(SerializerManager.scala:156)
org.apache.spark.serializer.SerializerManager.wrapStream(SerializerManager.scala:131)
org.apache.spark.storage.DiskBlockObjectWriter.open(DiskBlockObjectWriter.scala:120)
org.apache.spark.storage.DiskBlockObjectWriter.write(DiskBlockObjectWriter.scala:249)
org.apache.spark.shuffle.sort.ShuffleExternalSorter.writeSortedFile(ShuffleExternalSorter.java:211)
[...]
```
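The pathology above can be sketched with plain JDK code (all class and method names here are hypothetical stand-ins, not lz4-java's actual internals): a `static synchronized` factory method that re-attempts a failing initialization on every call forces all callers to queue on the class monitor just to rediscover the same failure.

```java
// Hypothetical sketch of the uncached-failure pattern described above.
// Every call re-enters a static synchronized method, re-attempts the
// failing "native" initialization, and throws while holding the class
// monitor, so concurrent callers serialize on that monitor.
public class UncachedInitDemo {
    static int attempts = 0;

    // Analogue of a synchronized native-instance lookup: the failure is
    // never remembered, so every caller pays the full cost again.
    static synchronized Object nativeInstance() {
        attempts++;
        throw new UnsupportedOperationException("JNI init failed");
    }

    static Object fastestInstance() {
        try {
            return nativeInstance(); // contended path, always fails
        } catch (Throwable t) {
            return new Object();     // fallback "pure Java" implementation
        }
    }

    public static void main(String[] args) {
        for (int i = 0; i < 1000; i++) {
            fastestInstance();
        }
        // Initialization was re-attempted on every single call.
        System.out.println(attempts); // prints 1000 when run standalone
    }
}
```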

To avoid this problem, this PR modifies Spark's `LZ4CompressionCodec` to call `fastestInstance()` itself and cache the result (which is safe because these factories [are thread-safe](lz4/lz4-java#82)).
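A JDK-only sketch of that caching idea (class names here are hypothetical; the actual fix uses Scala `lazy val`s in `LZ4CompressionCodec`): resolve the factory once, store the result, and let every subsequent call bypass the synchronized initialization path entirely.

```java
// Hypothetical sketch: cache the result of the expensive factory lookup
// so the failing JNI initialization is attempted exactly once.
public class CachedInitDemo {
    static int attempts = 0;

    static synchronized Object nativeInstance() {
        attempts++;
        throw new UnsupportedOperationException("JNI init failed");
    }

    static Object fastestInstance() {
        try {
            return nativeInstance();
        } catch (Throwable t) {
            return new Object(); // fallback implementation
        }
    }

    // Resolved once at class initialization; safe to share because the
    // underlying factories are thread-safe.
    private static final Object CACHED = fastestInstance();

    static Object cachedInstance() {
        return CACHED; // no monitor, no re-attempted JNI load
    }

    public static void main(String[] args) {
        for (int i = 0; i < 1000; i++) {
            cachedInstance();
        }
        System.out.println(attempts); // prints 1: init attempted once
    }
}
```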

## How was this patch tested?

Existing unit tests.

Closes #24905 from JoshRosen/lz4-factory-flags.

Lead-authored-by: Josh Rosen <rosenville@gmail.com>
Co-authored-by: Josh Rosen <joshrosen@stripe.com>
Signed-off-by: Josh Rosen <rosenville@gmail.com>
JoshRosen and joshrosen-stripe committed Jun 19, 2019
1 parent fc65e0f commit 6b27ad5
Showing 1 changed file with 25 additions and 3 deletions: `core/src/main/scala/org/apache/spark/io/CompressionCodec.scala`
```diff
@@ -22,7 +22,8 @@ import java.util.Locale

 import com.github.luben.zstd.{ZstdInputStream, ZstdOutputStream}
 import com.ning.compress.lzf.{LZFInputStream, LZFOutputStream}
-import net.jpountz.lz4.{LZ4BlockInputStream, LZ4BlockOutputStream}
+import net.jpountz.lz4.{LZ4BlockInputStream, LZ4BlockOutputStream, LZ4Factory}
+import net.jpountz.xxhash.XXHashFactory
 import org.xerial.snappy.{Snappy, SnappyInputStream, SnappyOutputStream}

 import org.apache.spark.SparkConf
@@ -118,14 +119,35 @@ private[spark] object CompressionCodec {
 @DeveloperApi
 class LZ4CompressionCodec(conf: SparkConf) extends CompressionCodec {

+  // SPARK-28102: if the LZ4 JNI libraries fail to initialize then `fastestInstance()` calls fall
+  // back to non-JNI implementations but do not remember the fact that JNI failed to load, so
+  // repeated calls to `fastestInstance()` will cause performance problems because the JNI load
+  // will be repeatedly re-attempted and that path is slow because it throws exceptions from a
+  // static synchronized method (causing lock contention). To avoid this problem, we cache the
+  // result of the `fastestInstance()` calls ourselves (both factories are thread-safe).
+  @transient private[this] lazy val lz4Factory: LZ4Factory = LZ4Factory.fastestInstance()
+  @transient private[this] lazy val xxHashFactory: XXHashFactory = XXHashFactory.fastestInstance()
+
+  private[this] val defaultSeed: Int = 0x9747b28c // LZ4BlockOutputStream.DEFAULT_SEED
+
   override def compressedOutputStream(s: OutputStream): OutputStream = {
     val blockSize = conf.get(IO_COMPRESSION_LZ4_BLOCKSIZE).toInt
-    new LZ4BlockOutputStream(s, blockSize)
+    val syncFlush = false
+    new LZ4BlockOutputStream(
+      s,
+      blockSize,
+      lz4Factory.fastCompressor(),
+      xxHashFactory.newStreamingHash32(defaultSeed).asChecksum,
+      syncFlush)
   }

   override def compressedInputStream(s: InputStream): InputStream = {
     val disableConcatenationOfByteStream = false
-    new LZ4BlockInputStream(s, disableConcatenationOfByteStream)
+    new LZ4BlockInputStream(
+      s,
+      lz4Factory.fastDecompressor(),
+      xxHashFactory.newStreamingHash32(defaultSeed).asChecksum,
+      disableConcatenationOfByteStream)
   }
 }

```
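The patched code relies on Scala's `lazy val` semantics: the initializer runs at most once, under synchronization, and the cached value is reused on every later access. A rough JDK analogue of that behavior (simplified; the real Scala codegen uses bitmap fields and differs in detail) is double-checked locking:

```java
import java.util.function.Supplier;

// Rough, simplified analogue of a Scala `lazy val`: thread-safe,
// compute-at-most-once initialization via double-checked locking.
public class LazyVal<T> {
    private volatile boolean initialized = false;
    private T value;
    private final Supplier<T> compute;

    public LazyVal(Supplier<T> compute) {
        this.compute = compute;
    }

    public T get() {
        if (!initialized) {                // fast path once initialized
            synchronized (this) {
                if (!initialized) {
                    value = compute.get(); // runs exactly once
                    initialized = true;
                }
            }
        }
        return value;
    }

    public static void main(String[] args) {
        LazyVal<String> factory = new LazyVal<>(() -> "expensive-factory");
        System.out.println(factory.get()); // computed on first access
        System.out.println(factory.get()); // cached thereafter
    }
}
```

The `volatile` flag is written only after `value`, so a reader that sees `initialized == true` on the fast path is guaranteed to see the fully computed value.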
