Skip to content

Commit

Permalink
[SPARK-34479][SQL] Add zstandard codec to Avro compression codec list
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?

Avro has supported the zstandard codec since AVRO-2195. This PR adds the zstandard codec to the Avro compression codec list.

### Why are the changes needed?

To make the Avro data source support the zstandard codec.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Unit test.

Closes #31673 from wangyum/SPARK-34479.

Authored-by: Yuming Wang <yumwang@ebay.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
  • Loading branch information
wangyum authored and dongjoon-hyun committed Feb 27, 2021
1 parent 397b843 commit 54c053a
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 4 deletions.
Expand Up @@ -22,7 +22,7 @@ import scala.collection.JavaConverters._

import org.apache.avro.Schema
import org.apache.avro.file.{DataFileReader, FileReader}
import org.apache.avro.file.DataFileConstants.{BZIP2_CODEC, DEFLATE_CODEC, SNAPPY_CODEC, XZ_CODEC}
import org.apache.avro.file.DataFileConstants.{BZIP2_CODEC, DEFLATE_CODEC, SNAPPY_CODEC, XZ_CODEC, ZSTANDARD_CODEC}
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
import org.apache.avro.mapred.{AvroOutputFormat, FsInput}
import org.apache.avro.mapreduce.AvroJob
Expand Down Expand Up @@ -109,7 +109,7 @@ private[sql] object AvroUtils extends Logging {
logInfo(s"Avro compression level $deflateLevel will be used for $DEFLATE_CODEC codec.")
job.getConfiguration.setInt(AvroOutputFormat.DEFLATE_LEVEL_KEY, deflateLevel)
DEFLATE_CODEC
case codec @ (SNAPPY_CODEC | BZIP2_CODEC | XZ_CODEC) => codec
case codec @ (SNAPPY_CODEC | BZIP2_CODEC | XZ_CODEC | ZSTANDARD_CODEC) => codec
case unknown => throw new IllegalArgumentException(s"Invalid compression codec: $unknown")
}
job.getConfiguration.set(AvroJob.CONF_OUTPUT_CODEC, codec)
Expand Down
Expand Up @@ -479,6 +479,7 @@ abstract class AvroSuite
val xzDir = s"$dir/xz"
val deflateDir = s"$dir/deflate"
val snappyDir = s"$dir/snappy"
val zstandardDir = s"$dir/zstandard"

val df = spark.read.format("avro").load(testAvro)
spark.conf.set(SQLConf.AVRO_COMPRESSION_CODEC.key, "uncompressed")
Expand All @@ -492,17 +493,21 @@ abstract class AvroSuite
df.write.format("avro").save(deflateDir)
spark.conf.set(SQLConf.AVRO_COMPRESSION_CODEC.key, "snappy")
df.write.format("avro").save(snappyDir)
spark.conf.set(SQLConf.AVRO_COMPRESSION_CODEC.key, "zstandard")
df.write.format("avro").save(zstandardDir)

val uncompressSize = FileUtils.sizeOfDirectory(new File(uncompressDir))
val bzip2Size = FileUtils.sizeOfDirectory(new File(bzip2Dir))
val xzSize = FileUtils.sizeOfDirectory(new File(xzDir))
val deflateSize = FileUtils.sizeOfDirectory(new File(deflateDir))
val snappySize = FileUtils.sizeOfDirectory(new File(snappyDir))
val zstandardSize = FileUtils.sizeOfDirectory(new File(zstandardDir))

assert(uncompressSize > deflateSize)
assert(snappySize > deflateSize)
assert(snappySize > bzip2Size)
assert(bzip2Size > xzSize)
assert(uncompressSize > zstandardSize)
}
}

Expand Down
Expand Up @@ -2470,10 +2470,10 @@ object SQLConf {

val AVRO_COMPRESSION_CODEC = buildConf("spark.sql.avro.compression.codec")
.doc("Compression codec used in writing of AVRO files. Supported codecs: " +
"uncompressed, deflate, snappy, bzip2 and xz. Default codec is snappy.")
"uncompressed, deflate, snappy, bzip2, xz and zstandard. Default codec is snappy.")
.version("2.4.0")
.stringConf
.checkValues(Set("uncompressed", "deflate", "snappy", "bzip2", "xz"))
.checkValues(Set("uncompressed", "deflate", "snappy", "bzip2", "xz", "zstandard"))
.createWithDefault("snappy")

val AVRO_DEFLATE_LEVEL = buildConf("spark.sql.avro.deflate.level")
Expand Down

0 comments on commit 54c053a

Please sign in to comment.