Skip to content

Commit

Permalink
[SPARK-36669][SQL] Add Lz4 wrappers for Hadoop Lz4 codec
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?

This patch proposes to add a few LZ4 wrapper classes for Parquet Lz4 compression output that uses Hadoop Lz4 codec.

### Why are the changes needed?

Currently we use Hadop 3.3.1's shaded client libraries. Lz4 is a provided dependency in Hadoop Common 3.3.1 for Lz4Codec. But it isn't excluded from relocation in these libraries. So to use lz4 as Parquet codec, we will hit the exception even we include lz4 as dependency.

```
[info]   Cause: java.lang.NoClassDefFoundError: org/apache/hadoop/shaded/net/jpountz/lz4/LZ4Factory
[info]   at org.apache.hadoop.io.compress.lz4.Lz4Compressor.<init>(Lz4Compressor.java:66)
[info]   at org.apache.hadoop.io.compress.Lz4Codec.createCompressor(Lz4Codec.java:119)
[info]   at org.apache.hadoop.io.compress.CodecPool.getCompressor(CodecPool.java:152)
[info]   at org.apache.hadoop.io.compress.CodecPool.getCompressor(CodecPool.java:168)
```

Before the issue is fixed at Hadoop new release, we can add a few wrapper classes for Lz4 codec.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Modified test.

Closes #33940 from viirya/lz4-wrappers.

Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: Liang-Chi Hsieh <viirya@gmail.com>
(cherry picked from commit 6bcf330)
Signed-off-by: Liang-Chi Hsieh <viirya@gmail.com>
  • Loading branch information
viirya committed Sep 9, 2021
1 parent d22182e commit b52fbee
Show file tree
Hide file tree
Showing 5 changed files with 128 additions and 7 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.shaded.net.jpountz.lz4;

/**
* TODO(SPARK-36679): A temporary workaround for SPARK-36669. We should remove this after
* Hadoop 3.3.2 release which fixes the LZ4 relocation in shaded Hadoop client libraries.
* This does not need implement all net.jpountz.lz4.LZ4Compressor API, just the ones used
* by Hadoop Lz4Compressor.
*/
public final class LZ4Compressor {

private net.jpountz.lz4.LZ4Compressor lz4Compressor;

public LZ4Compressor(net.jpountz.lz4.LZ4Compressor lz4Compressor) {
this.lz4Compressor = lz4Compressor;
}

public void compress(java.nio.ByteBuffer src, java.nio.ByteBuffer dest) {
lz4Compressor.compress(src, dest);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.shaded.net.jpountz.lz4;

/**
* TODO(SPARK-36679): A temporary workaround for SPARK-36669. We should remove this after
* Hadoop 3.3.2 release which fixes the LZ4 relocation in shaded Hadoop client libraries.
* This does not need implement all net.jpountz.lz4.LZ4Factory API, just the ones used by
* Hadoop Lz4Compressor.
*/
public final class LZ4Factory {

private net.jpountz.lz4.LZ4Factory lz4Factory;

public LZ4Factory(net.jpountz.lz4.LZ4Factory lz4Factory) {
this.lz4Factory = lz4Factory;
}

public static LZ4Factory fastestInstance() {
return new LZ4Factory(net.jpountz.lz4.LZ4Factory.fastestInstance());
}

public LZ4Compressor highCompressor() {
return new LZ4Compressor(lz4Factory.highCompressor());
}

public LZ4Compressor fastCompressor() {
return new LZ4Compressor(lz4Factory.fastCompressor());
}

public LZ4SafeDecompressor safeDecompressor() {
return new LZ4SafeDecompressor(lz4Factory.safeDecompressor());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.shaded.net.jpountz.lz4;

/**
* TODO(SPARK-36679): A temporary workaround for SPARK-36669. We should remove this after
* Hadoop 3.3.2 release which fixes the LZ4 relocation in shaded Hadoop client libraries.
* This does not need implement all net.jpountz.lz4.LZ4SafeDecompressor API, just the ones
* used by Hadoop Lz4Decompressor.
*/
public final class LZ4SafeDecompressor {
private net.jpountz.lz4.LZ4SafeDecompressor lz4Decompressor;

public LZ4SafeDecompressor(net.jpountz.lz4.LZ4SafeDecompressor lz4Decompressor) {
this.lz4Decompressor = lz4Decompressor;
}

public void decompress(java.nio.ByteBuffer src, java.nio.ByteBuffer dest) {
lz4Decompressor.decompress(src, dest);
}
}
8 changes: 4 additions & 4 deletions core/src/test/scala/org/apache/spark/FileSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import com.google.common.io.Files
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io._
import org.apache.hadoop.io.compress.{BZip2Codec, CompressionCodec, DefaultCodec}
import org.apache.hadoop.io.compress.{BZip2Codec, CompressionCodec, DefaultCodec, Lz4Codec}
import org.apache.hadoop.mapred.{FileAlreadyExistsException, FileSplit, JobConf, TextInputFormat, TextOutputFormat}
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.input.{FileSplit => NewFileSplit, TextInputFormat => NewTextInputFormat}
Expand Down Expand Up @@ -136,9 +136,9 @@ class FileSuite extends SparkFunSuite with LocalSparkContext {
}

// Hadoop "gzip" and "zstd" codecs require native library installed for sequence files
// "snappy" and "lz4" codecs do not work due to SPARK-36669 and SPARK-36681.
Seq((new DefaultCodec(), "default"), (new BZip2Codec(), "bzip2")).foreach {
case (codec, codecName) =>
// "snappy" codec does not work due to SPARK-36681.
Seq((new DefaultCodec(), "default"), (new BZip2Codec(), "bzip2"), (new Lz4Codec(), "lz4"))
.foreach { case (codec, codecName) =>
runSequenceFileCodecTest(codec, codecName)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,12 @@ class ParquetCodecSuite extends FileSourceCodecSuite {
override def format: String = "parquet"
override val codecConfigName: String = SQLConf.PARQUET_COMPRESSION.key
// Exclude "lzo" because it is GPL-licenced so not included in Hadoop.
// TODO(SPARK-36669): "lz4" codec fails due to HADOOP-17891.
override protected def availableCodecs: Seq[String] =
if (System.getProperty("os.arch") == "aarch64") {
// Exclude "brotli" due to PARQUET-1975.
Seq("none", "uncompressed", "snappy", "gzip", "zstd")
Seq("none", "uncompressed", "snappy", "lz4", "gzip", "zstd")
} else {
Seq("none", "uncompressed", "snappy", "gzip", "brotli", "zstd")
Seq("none", "uncompressed", "snappy", "lz4", "gzip", "brotli", "zstd")
}
}

Expand Down

0 comments on commit b52fbee

Please sign in to comment.