diff --git a/core/src/main/java/org/apache/hadoop/shaded/net/jpountz/lz4/LZ4Compressor.java b/core/src/main/java/org/apache/hadoop/shaded/net/jpountz/lz4/LZ4Compressor.java
new file mode 100644
index 0000000000000..092ed59c6bb14
--- /dev/null
+++ b/core/src/main/java/org/apache/hadoop/shaded/net/jpountz/lz4/LZ4Compressor.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.shaded.net.jpountz.lz4;
+
+/**
+ * TODO(SPARK-36679): A temporary workaround for SPARK-36669. We should remove this after
+ * the Hadoop 3.3.2 release, which fixes the LZ4 relocation in the shaded Hadoop client
+ * libraries. This does not need to implement the full net.jpountz.lz4.LZ4Compressor API,
+ * just the methods used by Hadoop's Lz4Compressor.
+ */
+public final class LZ4Compressor {
+
+  private final net.jpountz.lz4.LZ4Compressor lz4Compressor;
+
+  public LZ4Compressor(net.jpountz.lz4.LZ4Compressor lz4Compressor) {
+    this.lz4Compressor = lz4Compressor;
+  }
+
+  public void compress(java.nio.ByteBuffer src, java.nio.ByteBuffer dest) {
+    lz4Compressor.compress(src, dest);
+  }
+}
diff --git a/core/src/main/java/org/apache/hadoop/shaded/net/jpountz/lz4/LZ4Factory.java b/core/src/main/java/org/apache/hadoop/shaded/net/jpountz/lz4/LZ4Factory.java
new file mode 100644
index 0000000000000..61829b2728bce
--- /dev/null
+++ b/core/src/main/java/org/apache/hadoop/shaded/net/jpountz/lz4/LZ4Factory.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.shaded.net.jpountz.lz4;
+
+/**
+ * TODO(SPARK-36679): A temporary workaround for SPARK-36669. We should remove this after
+ * the Hadoop 3.3.2 release, which fixes the LZ4 relocation in the shaded Hadoop client
+ * libraries. This does not need to implement the full net.jpountz.lz4.LZ4Factory API,
+ * just the methods used by Hadoop's Lz4Compressor.
+ */
+public final class LZ4Factory {
+
+  private final net.jpountz.lz4.LZ4Factory lz4Factory;
+
+  public LZ4Factory(net.jpountz.lz4.LZ4Factory lz4Factory) {
+    this.lz4Factory = lz4Factory;
+  }
+
+  public static LZ4Factory fastestInstance() {
+    return new LZ4Factory(net.jpountz.lz4.LZ4Factory.fastestInstance());
+  }
+
+  public LZ4Compressor highCompressor() {
+    return new LZ4Compressor(lz4Factory.highCompressor());
+  }
+
+  public LZ4Compressor fastCompressor() {
+    return new LZ4Compressor(lz4Factory.fastCompressor());
+  }
+
+  public LZ4SafeDecompressor safeDecompressor() {
+    return new LZ4SafeDecompressor(lz4Factory.safeDecompressor());
+  }
+}
diff --git a/core/src/main/java/org/apache/hadoop/shaded/net/jpountz/lz4/LZ4SafeDecompressor.java b/core/src/main/java/org/apache/hadoop/shaded/net/jpountz/lz4/LZ4SafeDecompressor.java
new file mode 100644
index 0000000000000..cd3dd6f060f52
--- /dev/null
+++ b/core/src/main/java/org/apache/hadoop/shaded/net/jpountz/lz4/LZ4SafeDecompressor.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.shaded.net.jpountz.lz4;
+
+/**
+ * TODO(SPARK-36679): A temporary workaround for SPARK-36669. We should remove this after
+ * the Hadoop 3.3.2 release, which fixes the LZ4 relocation in the shaded Hadoop client
+ * libraries. This does not need to implement the full net.jpountz.lz4.LZ4SafeDecompressor
+ * API, just the methods used by Hadoop's Lz4Decompressor.
+ */
+public final class LZ4SafeDecompressor {
+  private final net.jpountz.lz4.LZ4SafeDecompressor lz4Decompressor;
+
+  public LZ4SafeDecompressor(net.jpountz.lz4.LZ4SafeDecompressor lz4Decompressor) {
+    this.lz4Decompressor = lz4Decompressor;
+  }
+
+  public void decompress(java.nio.ByteBuffer src, java.nio.ByteBuffer dest) {
+    lz4Decompressor.decompress(src, dest);
+  }
+}
diff --git a/core/src/test/scala/org/apache/spark/FileSuite.scala b/core/src/test/scala/org/apache/spark/FileSuite.scala
index 67a9764ee63a9..12022846fe1b0 100644
--- a/core/src/test/scala/org/apache/spark/FileSuite.scala
+++ b/core/src/test/scala/org/apache/spark/FileSuite.scala
@@ -28,7 +28,7 @@ import com.google.common.io.Files
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.io._
-import org.apache.hadoop.io.compress.{BZip2Codec, CompressionCodec, DefaultCodec}
+import org.apache.hadoop.io.compress.{BZip2Codec, CompressionCodec, DefaultCodec, Lz4Codec}
 import org.apache.hadoop.mapred.{FileAlreadyExistsException, FileSplit, JobConf, TextInputFormat, TextOutputFormat}
 import org.apache.hadoop.mapreduce.Job
 import org.apache.hadoop.mapreduce.lib.input.{FileSplit => NewFileSplit, TextInputFormat => NewTextInputFormat}
@@ -136,9 +136,9 @@ class FileSuite extends SparkFunSuite with LocalSparkContext {
   }
 
   // Hadoop "gzip" and "zstd" codecs require native library installed for sequence files
-  // "snappy" and "lz4" codecs do not work due to SPARK-36669 and SPARK-36681.
-  Seq((new DefaultCodec(), "default"), (new BZip2Codec(), "bzip2")).foreach {
-    case (codec, codecName) =>
+  // "snappy" codec does not work due to SPARK-36681.
+  Seq((new DefaultCodec(), "default"), (new BZip2Codec(), "bzip2"), (new Lz4Codec(), "lz4"))
+    .foreach { case (codec, codecName) =>
       runSequenceFileCodecTest(codec, codecName)
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceCodecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceCodecSuite.scala
index 92b887e948da9..3c226d64b78c6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceCodecSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceCodecSuite.scala
@@ -56,13 +56,12 @@ class ParquetCodecSuite extends FileSourceCodecSuite {
   override def format: String = "parquet"
   override val codecConfigName: String = SQLConf.PARQUET_COMPRESSION.key
   // Exclude "lzo" because it is GPL-licenced so not included in Hadoop.
-  // TODO(SPARK-36669): "lz4" codec fails due to HADOOP-17891.
   override protected def availableCodecs: Seq[String] =
     if (System.getProperty("os.arch") == "aarch64") {
       // Exclude "brotli" due to PARQUET-1975.
-      Seq("none", "uncompressed", "snappy", "gzip", "zstd")
+      Seq("none", "uncompressed", "snappy", "lz4", "gzip", "zstd")
     } else {
-      Seq("none", "uncompressed", "snappy", "gzip", "brotli", "zstd")
+      Seq("none", "uncompressed", "snappy", "lz4", "gzip", "brotli", "zstd")
     }
 }
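For reference, the following is a minimal round-trip sketch of the path these shim classes cover: Hadoop's shaded Lz4Codec references the relocated org.apache.hadoop.shaded.net.jpountz.lz4 package, and each shim simply delegates to the real lz4-java classes on Spark's classpath. The sketch is not part of the patch, and the class name Lz4ShimRoundTrip is hypothetical; it only exercises the three entry points the shims expose.

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

import org.apache.hadoop.shaded.net.jpountz.lz4.LZ4Compressor;
import org.apache.hadoop.shaded.net.jpountz.lz4.LZ4Factory;
import org.apache.hadoop.shaded.net.jpountz.lz4.LZ4SafeDecompressor;

public class Lz4ShimRoundTrip {
  public static void main(String[] args) {
    byte[] input = "spark spark spark spark spark".getBytes(StandardCharsets.UTF_8);

    // Resolve the factory through the relocated package name, as the shaded
    // Hadoop client libraries do; the shim hands back wrappers around lz4-java.
    LZ4Factory factory = LZ4Factory.fastestInstance();
    LZ4Compressor compressor = factory.fastCompressor();
    LZ4SafeDecompressor decompressor = factory.safeDecompressor();

    // Compress src.remaining() bytes; lz4-java advances both buffer positions.
    ByteBuffer src = ByteBuffer.wrap(input);
    ByteBuffer compressed = ByteBuffer.allocate(input.length + 64);  // headroom for worst case
    compressor.compress(src, compressed);
    compressed.flip();

    // Decompress back into a buffer sized for the original payload.
    ByteBuffer restored = ByteBuffer.allocate(input.length);
    decompressor.decompress(compressed, restored);
    restored.flip();

    byte[] out = new byte[restored.remaining()];
    restored.get(out);
    System.out.println(new String(out, StandardCharsets.UTF_8));  // prints the original text
  }
}

Note that the wrappers expose only the ByteBuffer overloads of compress and decompress, since those are the only calls Hadoop's Lz4Compressor and Lz4Decompressor make; the rest of the net.jpountz.lz4 API is intentionally left out.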