From 8a14d38523e2b35b7f38503bb70bb9934e229cf3 Mon Sep 17 00:00:00 2001
From: Reynold Xin
Date: Mon, 14 Jul 2014 23:38:49 -0700
Subject: [PATCH 1/2] [SPARK-2399] Add support for LZ4 compression.

---
 core/pom.xml                                   |  4 ++++
 .../apache/spark/io/CompressionCodec.scala     | 22 +++++++++++++++++++
 .../spark/io/CompressionCodecSuite.scala       |  6 +++++
 docs/configuration.md                          | 10 ++++++++-
 pom.xml                                        |  5 +++++
 5 files changed, 46 insertions(+), 1 deletion(-)

diff --git a/core/pom.xml b/core/pom.xml
index 4ed920a750fff..1054cec4d77bb 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -114,6 +114,10 @@
       <groupId>org.xerial.snappy</groupId>
       <artifactId>snappy-java</artifactId>
     </dependency>
+    <dependency>
+      <groupId>net.jpountz.lz4</groupId>
+      <artifactId>lz4</artifactId>
+    </dependency>
     <dependency>
       <groupId>com.twitter</groupId>
       <artifactId>chill_${scala.binary.version}</artifactId>
diff --git a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
index 4b0fe1ab82999..33402c927c732 100644
--- a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
+++ b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
@@ -20,6 +20,7 @@ package org.apache.spark.io
 import java.io.{InputStream, OutputStream}
 
 import com.ning.compress.lzf.{LZFInputStream, LZFOutputStream}
+import net.jpountz.lz4.{LZ4BlockInputStream, LZ4BlockOutputStream}
 import org.xerial.snappy.{SnappyInputStream, SnappyOutputStream}
 
 import org.apache.spark.SparkConf
@@ -59,6 +60,27 @@ private[spark] object CompressionCodec {
 }
 
 
+/**
+ * :: DeveloperApi ::
+ * LZ4 implementation of [[org.apache.spark.io.CompressionCodec]].
+ * Block size can be configured by `spark.io.compression.lz4.block.size`.
+ *
+ * Note: The wire protocol for this codec is not guaranteed to be compatible across versions
+ * of Spark. This is intended for use as an internal compression utility within a single Spark
+ * application.
+ */
+@DeveloperApi
+class LZ4CompressionCodec(conf: SparkConf) extends CompressionCodec {
+
+  override def compressedOutputStream(s: OutputStream): OutputStream = {
+    val blockSize = conf.getInt("spark.io.compression.lz4.block.size", 32768)
+    new LZ4BlockOutputStream(s, blockSize)
+  }
+
+  override def compressedInputStream(s: InputStream): InputStream = new LZ4BlockInputStream(s)
+}
+
+
 /**
  * :: DeveloperApi ::
  * LZF implementation of [[org.apache.spark.io.CompressionCodec]].
diff --git a/core/src/test/scala/org/apache/spark/io/CompressionCodecSuite.scala b/core/src/test/scala/org/apache/spark/io/CompressionCodecSuite.scala
index 68a0ea36aa545..42fc395fa698d 100644
--- a/core/src/test/scala/org/apache/spark/io/CompressionCodecSuite.scala
+++ b/core/src/test/scala/org/apache/spark/io/CompressionCodecSuite.scala
@@ -50,6 +50,12 @@ class CompressionCodecSuite extends FunSuite {
     testCodec(codec)
   }
 
+  test("lz4 compression codec") {
+    val codec = CompressionCodec.createCodec(conf, classOf[LZ4CompressionCodec].getName)
+    assert(codec.getClass === classOf[LZ4CompressionCodec])
+    testCodec(codec)
+  }
+
   test("lzf compression codec") {
     val codec = CompressionCodec.createCodec(conf, classOf[LZFCompressionCodec].getName)
     assert(codec.getClass === classOf[LZFCompressionCodec])
diff --git a/docs/configuration.md b/docs/configuration.md
index 07aa4c035446b..bbf24f45ca2b7 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -350,7 +350,15 @@ Apart from these, the following properties are also available, and may be useful
   <td>32768</td>
   <td>
     Block size (in bytes) used in Snappy compression, in the case when Snappy compression codec
-    is used.
+    is used. Lowering this block size will also lower shuffle memory usage when Snappy is used.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.io.compression.lz4.block.size</code></td>
+  <td>32768</td>
+  <td>
+    Block size (in bytes) used in Snappy compression, in the case when LZ4 compression codec
+    is used. Lowering this block size will also lower shuffle memory usage when LZ4 is used.
   </td>
 </tr>
 <tr>
diff --git a/pom.xml b/pom.xml
index fa80707d0929c..d570f3e6b9321 100644
--- a/pom.xml
+++ b/pom.xml
@@ -297,6 +297,11 @@
         <artifactId>snappy-java</artifactId>
         <version>1.0.5</version>
       </dependency>
+      <dependency>
+        <groupId>net.jpountz.lz4</groupId>
+        <artifactId>lz4</artifactId>
+        <version>1.2.0</version>
+      </dependency>
       <dependency>
         <groupId>com.clearspring.analytics</groupId>
         <artifactId>stream</artifactId>

From 6c8fefeb2fff882aad7e464f2005b180af0be92c Mon Sep 17 00:00:00 2001
From: Reynold Xin
Date: Mon, 14 Jul 2014 23:41:30 -0700
Subject: [PATCH 2/2] Fixed typo.

---
 docs/configuration.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/docs/configuration.md b/docs/configuration.md
index bbf24f45ca2b7..19fd980e6088f 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -357,7 +357,7 @@ Apart from these, the following properties are also available, and may be useful
   <td><code>spark.io.compression.lz4.block.size</code></td>
   <td>32768</td>
   <td>
-    Block size (in bytes) used in Snappy compression, in the case when LZ4 compression codec
+    Block size (in bytes) used in LZ4 compression, in the case when LZ4 compression codec
     is used. Lowering this block size will also lower shuffle memory usage when LZ4 is used.
   </td>
 </tr>
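Usage note (not part of the patch): a minimal sketch of how an application could opt into the
new codec once this change is applied. The configuration keys and the 32768-byte default are
taken from the patch itself; the application name and master URL below are placeholders.

    import org.apache.spark.{SparkConf, SparkContext}

    // Select the LZ4 codec by its fully qualified class name, following the same
    // spark.io.compression.codec convention used for the existing LZF and Snappy codecs.
    val conf = new SparkConf()
      .setAppName("lz4-codec-demo")   // placeholder application name
      .setMaster("local[2]")          // placeholder master URL
      .set("spark.io.compression.codec", "org.apache.spark.io.LZ4CompressionCodec")
      // Optional: tune the LZ4 block size in bytes; 32768 is the default added by this patch.
      .set("spark.io.compression.lz4.block.size", "32768")

    val sc = new SparkContext(conf)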