diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock index 3bcf9535488b..6f9c7b9dc2d2 100644 --- a/datafusion-cli/Cargo.lock +++ b/datafusion-cli/Cargo.lock @@ -287,6 +287,8 @@ dependencies = [ "pin-project-lite", "tokio", "xz2", + "zstd 0.11.2+zstd.1.5.2", + "zstd-safe 5.0.2+zstd.1.5.2", ] [[package]] @@ -709,6 +711,7 @@ dependencies = [ "url", "uuid", "xz2", + "zstd 0.11.2+zstd.1.5.2", ] [[package]] @@ -1783,7 +1786,7 @@ dependencies = [ "thrift", "tokio", "twox-hash", - "zstd", + "zstd 0.12.3+zstd.1.5.2", ] [[package]] @@ -2323,9 +2326,9 @@ checksum = "6bdef32e8150c2a081110b42772ffe7d7c9032b606bc226c8260fd97e0976601" [[package]] name = "syn" -version = "1.0.108" +version = "1.0.109" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d56e159d99e6c2b93995d171050271edb50ecc5288fbc7cc17de8fdce4e58c14" +checksum = "72b64191b275b66ffe2469e8af2c1cfe3bafa67b529ead792a6d0160888b4237" dependencies = [ "proc-macro2", "quote", @@ -2878,13 +2881,32 @@ dependencies = [ "lzma-sys", ] +[[package]] +name = "zstd" +version = "0.11.2+zstd.1.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "20cc960326ece64f010d2d2107537f26dc589a6573a316bd5b1dba685fa5fde4" +dependencies = [ + "zstd-safe 5.0.2+zstd.1.5.2", +] + [[package]] name = "zstd" version = "0.12.3+zstd.1.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "76eea132fb024e0e13fd9c2f5d5d595d8a967aa72382ac2f9d39fcc95afd0806" dependencies = [ - "zstd-safe", + "zstd-safe 6.0.4+zstd.1.5.4", +] + +[[package]] +name = "zstd-safe" +version = "5.0.2+zstd.1.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d2a5585e04f9eea4b2a3d1eca508c4dee9592a89ef6f450c11719da0726f4db" +dependencies = [ + "libc", + "zstd-sys", ] [[package]] diff --git a/datafusion/common/src/parsers.rs b/datafusion/common/src/parsers.rs index 1c31d61d143c..4aff7c7eb477 100644 --- a/datafusion/common/src/parsers.rs +++ 
b/datafusion/common/src/parsers.rs @@ -34,6 +34,8 @@ pub enum CompressionTypeVariant { BZIP2, /// Xz-ed file (liblzma) XZ, + /// Zstd-ed file, + ZSTD, /// Uncompressed file UNCOMPRESSED, } @@ -47,6 +49,7 @@ impl FromStr for CompressionTypeVariant { "GZIP" | "GZ" => Ok(Self::GZIP), "BZIP2" | "BZ2" => Ok(Self::BZIP2), "XZ" => Ok(Self::XZ), + "ZST" | "ZSTD" => Ok(Self::ZSTD), "" => Ok(Self::UNCOMPRESSED), _ => Err(ParserError::ParserError(format!( "Unsupported file compression type {s}" @@ -61,6 +64,7 @@ impl ToString for CompressionTypeVariant { Self::GZIP => "GZIP", Self::BZIP2 => "BZIP2", Self::XZ => "XZ", + Self::ZSTD => "ZSTD", Self::UNCOMPRESSED => "", } .to_string() diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml index 925d3d9d750b..50c64efbeae9 100644 --- a/datafusion/core/Cargo.toml +++ b/datafusion/core/Cargo.toml @@ -40,7 +40,7 @@ path = "src/lib.rs" [features] # Used to enable the avro format avro = ["apache-avro", "num-traits", "datafusion-common/avro"] -compression = ["xz2", "bzip2", "flate2", "async-compression"] +compression = ["xz2", "bzip2", "flate2", "zstd", "async-compression"] crypto_expressions = ["datafusion-physical-expr/crypto_expressions"] default = ["crypto_expressions", "regex_expressions", "unicode_expressions", "compression"] # Enables support for non-scalar, binary operations on dictionaries @@ -61,7 +61,7 @@ unicode_expressions = ["datafusion-physical-expr/regex_expressions", "datafusion ahash = { version = "0.8", default-features = false, features = ["runtime-rng"] } apache-avro = { version = "0.14", optional = true } arrow = { version = "33.0.0", features = ["prettyprint"] } -async-compression = { version = "0.3.14", features = ["bzip2", "gzip", "xz", "futures-io", "tokio"], optional = true } +async-compression = { version = "0.3.14", features = ["bzip2", "gzip", "xz", "zstd", "futures-io", "tokio"], optional = true } async-trait = "0.1.41" bytes = "1.1" bzip2 = { version = "0.4.3", optional = true } @@ -101,6 
+101,7 @@ tokio-util = { version = "0.7.4", features = ["io"] } url = "2.2" uuid = { version = "1.0", features = ["v4"] } xz2 = { version = "0.1", optional = true } +zstd = { version = "0.11", optional = true, default-features = false } [dev-dependencies] diff --git a/datafusion/core/src/datasource/file_format/file_type.rs b/datafusion/core/src/datasource/file_format/file_type.rs index 1a9973c68a64..59c95962a992 100644 --- a/datafusion/core/src/datasource/file_format/file_type.rs +++ b/datafusion/core/src/datasource/file_format/file_type.rs @@ -26,7 +26,7 @@ use crate::datasource::file_format::parquet::DEFAULT_PARQUET_EXTENSION; #[cfg(feature = "compression")] use async_compression::tokio::bufread::{ BzDecoder as AsyncBzDecoder, GzipDecoder as AsyncGzDecoder, - XzDecoder as AsyncXzDecoder, + XzDecoder as AsyncXzDecoder, ZstdDecoder as AsyncZstdDecoder, }; use bytes::Bytes; #[cfg(feature = "compression")] @@ -42,6 +42,8 @@ use std::str::FromStr; use tokio_util::io::{ReaderStream, StreamReader}; #[cfg(feature = "compression")] use xz2::read::XzDecoder; +#[cfg(feature = "compression")] +use zstd::Decoder as ZstdDecoder; use CompressionTypeVariant::*; /// Define each `FileType`/`FileCompressionType`'s extension @@ -62,6 +64,7 @@ impl GetExt for FileCompressionType { GZIP => ".gz".to_owned(), BZIP2 => ".bz2".to_owned(), XZ => ".xz".to_owned(), + ZSTD => ".zst".to_owned(), UNCOMPRESSED => "".to_owned(), } } @@ -95,6 +98,9 @@ impl FileCompressionType { /// Xz-ed file (liblzma) pub const XZ: Self = Self { variant: XZ }; + /// Zstd-ed file + pub const ZSTD: Self = Self { variant: ZSTD }; + /// Uncompressed file pub const UNCOMPRESSED: Self = Self { variant: UNCOMPRESSED, @@ -140,8 +146,13 @@ impl FileCompressionType { ReaderStream::new(AsyncXzDecoder::new(StreamReader::new(s))) .map_err(err_converter), ), + #[cfg(feature = "compression")] + ZSTD => Box::new( + ReaderStream::new(AsyncZstdDecoder::new(StreamReader::new(s))) + .map_err(err_converter), + ), #[cfg(not(feature = 
"compression"))] - GZIP | BZIP2 | XZ => { + GZIP | BZIP2 | XZ | ZSTD => { return Err(DataFusionError::NotImplemented( "Compression feature is not enabled".to_owned(), )) @@ -162,8 +173,13 @@ impl FileCompressionType { BZIP2 => Box::new(BzDecoder::new(r)), #[cfg(feature = "compression")] XZ => Box::new(XzDecoder::new(r)), + #[cfg(feature = "compression")] + ZSTD => match ZstdDecoder::new(r) { + Ok(decoder) => Box::new(decoder), + Err(e) => return Err(DataFusionError::External(Box::new(e))), + }, #[cfg(not(feature = "compression"))] - GZIP | BZIP2 | XZ => { + GZIP | BZIP2 | XZ | ZSTD => { return Err(DataFusionError::NotImplemented( "Compression feature is not enabled".to_owned(), )) @@ -239,155 +255,90 @@ mod tests { #[test] fn get_ext_with_compression() { - let file_type = FileType::CSV; - assert_eq!( - file_type - .get_ext_with_compression(FileCompressionType::UNCOMPRESSED) - .unwrap(), - ".csv" - ); - assert_eq!( - file_type - .get_ext_with_compression(FileCompressionType::GZIP) - .unwrap(), - ".csv.gz" - ); - assert_eq!( - file_type - .get_ext_with_compression(FileCompressionType::XZ) - .unwrap(), - ".csv.xz" - ); - assert_eq!( - file_type - .get_ext_with_compression(FileCompressionType::BZIP2) - .unwrap(), - ".csv.bz2" - ); - - let file_type = FileType::JSON; - assert_eq!( - file_type - .get_ext_with_compression(FileCompressionType::UNCOMPRESSED) - .unwrap(), - ".json" - ); - assert_eq!( - file_type - .get_ext_with_compression(FileCompressionType::GZIP) - .unwrap(), - ".json.gz" - ); - assert_eq!( - file_type - .get_ext_with_compression(FileCompressionType::XZ) - .unwrap(), - ".json.xz" - ); - assert_eq!( - file_type - .get_ext_with_compression(FileCompressionType::BZIP2) - .unwrap(), - ".json.bz2" - ); - - let file_type = FileType::AVRO; - assert_eq!( - file_type - .get_ext_with_compression(FileCompressionType::UNCOMPRESSED) - .unwrap(), - ".avro" - ); - assert!(matches!( - file_type.get_ext_with_compression(FileCompressionType::GZIP), - 
Err(DataFusionError::Internal(_)) - )); - assert!(matches!( - file_type.get_ext_with_compression(FileCompressionType::BZIP2), - Err(DataFusionError::Internal(_)) - )); + for (file_type, compression, extension) in [ + (FileType::CSV, FileCompressionType::UNCOMPRESSED, ".csv"), + (FileType::CSV, FileCompressionType::GZIP, ".csv.gz"), + (FileType::CSV, FileCompressionType::XZ, ".csv.xz"), + (FileType::CSV, FileCompressionType::BZIP2, ".csv.bz2"), + (FileType::CSV, FileCompressionType::ZSTD, ".csv.zst"), + (FileType::JSON, FileCompressionType::UNCOMPRESSED, ".json"), + (FileType::JSON, FileCompressionType::GZIP, ".json.gz"), + (FileType::JSON, FileCompressionType::XZ, ".json.xz"), + (FileType::JSON, FileCompressionType::BZIP2, ".json.bz2"), + (FileType::JSON, FileCompressionType::ZSTD, ".json.zst"), + ] { + assert_eq!( + file_type.get_ext_with_compression(compression).unwrap(), + extension + ); + } - let file_type = FileType::PARQUET; - assert_eq!( - file_type - .get_ext_with_compression(FileCompressionType::UNCOMPRESSED) - .unwrap(), - ".parquet" - ); - assert!(matches!( - file_type.get_ext_with_compression(FileCompressionType::GZIP), - Err(DataFusionError::Internal(_)) - )); - assert!(matches!( - file_type.get_ext_with_compression(FileCompressionType::BZIP2), - Err(DataFusionError::Internal(_)) - )); + // Cannot specify compression for these file types + for (file_type, extension) in + [(FileType::AVRO, ".avro"), (FileType::PARQUET, ".parquet")] + { + assert_eq!( + file_type + .get_ext_with_compression(FileCompressionType::UNCOMPRESSED) + .unwrap(), + extension + ); + for compression in [ + FileCompressionType::GZIP, + FileCompressionType::XZ, + FileCompressionType::BZIP2, + FileCompressionType::ZSTD, + ] { + assert!(matches!( + file_type.get_ext_with_compression(compression), + Err(DataFusionError::Internal(_)) + )); + } + } } #[test] fn from_str() { - assert_eq!(FileType::from_str("csv").unwrap(), FileType::CSV); - assert_eq!(FileType::from_str("CSV").unwrap(), 
FileType::CSV); - - assert_eq!(FileType::from_str("json").unwrap(), FileType::JSON); - assert_eq!(FileType::from_str("JSON").unwrap(), FileType::JSON); - - assert_eq!(FileType::from_str("avro").unwrap(), FileType::AVRO); - assert_eq!(FileType::from_str("AVRO").unwrap(), FileType::AVRO); - - assert_eq!(FileType::from_str("parquet").unwrap(), FileType::PARQUET); - assert_eq!(FileType::from_str("PARQUET").unwrap(), FileType::PARQUET); + for (ext, file_type) in [ + ("csv", FileType::CSV), + ("CSV", FileType::CSV), + ("json", FileType::JSON), + ("JSON", FileType::JSON), + ("avro", FileType::AVRO), + ("AVRO", FileType::AVRO), + ("parquet", FileType::PARQUET), + ("PARQUET", FileType::PARQUET), + ] { + assert_eq!(FileType::from_str(ext).unwrap(), file_type); + } assert!(matches!( FileType::from_str("Unknown"), Err(DataFusionError::NotImplemented(_)) )); - assert_eq!( - FileCompressionType::from_str("gz").unwrap(), - FileCompressionType::GZIP - ); - assert_eq!( - FileCompressionType::from_str("GZ").unwrap(), - FileCompressionType::GZIP - ); - assert_eq!( - FileCompressionType::from_str("gzip").unwrap(), - FileCompressionType::GZIP - ); - assert_eq!( - FileCompressionType::from_str("GZIP").unwrap(), - FileCompressionType::GZIP - ); - assert_eq!( - FileCompressionType::from_str("xz").unwrap(), - FileCompressionType::XZ - ); - assert_eq!( - FileCompressionType::from_str("XZ").unwrap(), - FileCompressionType::XZ - ); - assert_eq!( - FileCompressionType::from_str("bz2").unwrap(), - FileCompressionType::BZIP2 - ); - assert_eq!( - FileCompressionType::from_str("BZ2").unwrap(), - FileCompressionType::BZIP2 - ); - assert_eq!( - FileCompressionType::from_str("bzip2").unwrap(), - FileCompressionType::BZIP2 - ); - assert_eq!( - FileCompressionType::from_str("BZIP2").unwrap(), - FileCompressionType::BZIP2 - ); - - assert_eq!( - FileCompressionType::from_str("").unwrap(), - FileCompressionType::UNCOMPRESSED - ); + for (ext, compression_type) in [ + ("gz", FileCompressionType::GZIP), + 
("GZ", FileCompressionType::GZIP), + ("gzip", FileCompressionType::GZIP), + ("GZIP", FileCompressionType::GZIP), + ("xz", FileCompressionType::XZ), + ("XZ", FileCompressionType::XZ), + ("bz2", FileCompressionType::BZIP2), + ("BZ2", FileCompressionType::BZIP2), + ("bzip2", FileCompressionType::BZIP2), + ("BZIP2", FileCompressionType::BZIP2), + ("zst", FileCompressionType::ZSTD), + ("ZST", FileCompressionType::ZSTD), + ("zstd", FileCompressionType::ZSTD), + ("ZSTD", FileCompressionType::ZSTD), + ("", FileCompressionType::UNCOMPRESSED), + ] { + assert_eq!( + FileCompressionType::from_str(ext).unwrap(), + compression_type + ); + } assert!(matches!( FileCompressionType::from_str("Unknown"), diff --git a/datafusion/core/src/physical_plan/file_format/csv.rs b/datafusion/core/src/physical_plan/file_format/csv.rs index 337a54f42ef9..9197d8f3babf 100644 --- a/datafusion/core/src/physical_plan/file_format/csv.rs +++ b/datafusion/core/src/physical_plan/file_format/csv.rs @@ -345,7 +345,8 @@ mod tests { case(FileCompressionType::UNCOMPRESSED), case(FileCompressionType::GZIP), case(FileCompressionType::BZIP2), - case(FileCompressionType::XZ) + case(FileCompressionType::XZ), + case(FileCompressionType::ZSTD) )] #[tokio::test] async fn csv_exec_with_projection( @@ -400,7 +401,8 @@ mod tests { case(FileCompressionType::UNCOMPRESSED), case(FileCompressionType::GZIP), case(FileCompressionType::BZIP2), - case(FileCompressionType::XZ) + case(FileCompressionType::XZ), + case(FileCompressionType::ZSTD) )] #[tokio::test] async fn csv_exec_with_mixed_order_projection( @@ -455,7 +457,8 @@ mod tests { case(FileCompressionType::UNCOMPRESSED), case(FileCompressionType::GZIP), case(FileCompressionType::BZIP2), - case(FileCompressionType::XZ) + case(FileCompressionType::XZ), + case(FileCompressionType::ZSTD) )] #[tokio::test] async fn csv_exec_with_limit( @@ -510,7 +513,8 @@ mod tests { case(FileCompressionType::UNCOMPRESSED), case(FileCompressionType::GZIP), case(FileCompressionType::BZIP2), - 
case(FileCompressionType::XZ) + case(FileCompressionType::XZ), + case(FileCompressionType::ZSTD) )] #[tokio::test] async fn csv_exec_with_missing_column( @@ -553,7 +557,8 @@ mod tests { case(FileCompressionType::UNCOMPRESSED), case(FileCompressionType::GZIP), case(FileCompressionType::BZIP2), - case(FileCompressionType::XZ) + case(FileCompressionType::XZ), + case(FileCompressionType::ZSTD) )] #[tokio::test] async fn csv_exec_with_partition( @@ -688,7 +693,8 @@ mod tests { case(FileCompressionType::UNCOMPRESSED), case(FileCompressionType::GZIP), case(FileCompressionType::BZIP2), - case(FileCompressionType::XZ) + case(FileCompressionType::XZ), + case(FileCompressionType::ZSTD) )] #[tokio::test] async fn test_chunked_csv( diff --git a/datafusion/core/src/physical_plan/file_format/json.rs b/datafusion/core/src/physical_plan/file_format/json.rs index 73cbe24d4402..3556774a8002 100644 --- a/datafusion/core/src/physical_plan/file_format/json.rs +++ b/datafusion/core/src/physical_plan/file_format/json.rs @@ -382,7 +382,8 @@ mod tests { case(FileCompressionType::UNCOMPRESSED), case(FileCompressionType::GZIP), case(FileCompressionType::BZIP2), - case(FileCompressionType::XZ) + case(FileCompressionType::XZ), + case(FileCompressionType::ZSTD) )] #[tokio::test] async fn nd_json_exec_file_without_projection( @@ -452,7 +453,8 @@ mod tests { case(FileCompressionType::UNCOMPRESSED), case(FileCompressionType::GZIP), case(FileCompressionType::BZIP2), - case(FileCompressionType::XZ) + case(FileCompressionType::XZ), + case(FileCompressionType::ZSTD) )] #[tokio::test] async fn nd_json_exec_file_with_missing_column( @@ -504,7 +506,8 @@ mod tests { case(FileCompressionType::UNCOMPRESSED), case(FileCompressionType::GZIP), case(FileCompressionType::BZIP2), - case(FileCompressionType::XZ) + case(FileCompressionType::XZ), + case(FileCompressionType::ZSTD) )] #[tokio::test] async fn nd_json_exec_file_projection( @@ -554,7 +557,8 @@ mod tests { case(FileCompressionType::UNCOMPRESSED), 
case(FileCompressionType::GZIP), case(FileCompressionType::BZIP2), - case(FileCompressionType::XZ) + case(FileCompressionType::XZ), + case(FileCompressionType::ZSTD) )] #[tokio::test] async fn nd_json_exec_file_mixed_order_projection( @@ -658,7 +662,8 @@ mod tests { case(FileCompressionType::UNCOMPRESSED), case(FileCompressionType::GZIP), case(FileCompressionType::BZIP2), - case(FileCompressionType::XZ) + case(FileCompressionType::XZ), + case(FileCompressionType::ZSTD) )] #[tokio::test] async fn test_chunked_json( diff --git a/datafusion/core/src/test/mod.rs b/datafusion/core/src/test/mod.rs index 737893f51845..9dccde8a7345 100644 --- a/datafusion/core/src/test/mod.rs +++ b/datafusion/core/src/test/mod.rs @@ -36,6 +36,7 @@ use arrow::record_batch::RecordBatch; use bzip2::write::BzEncoder; #[cfg(feature = "compression")] use bzip2::Compression as BzCompression; +use datafusion_common::DataFusionError; #[cfg(feature = "compression")] use flate2::write::GzEncoder; #[cfg(feature = "compression")] @@ -49,6 +50,8 @@ use std::sync::Arc; use tempfile::TempDir; #[cfg(feature = "compression")] use xz2::write::XzEncoder; +#[cfg(feature = "compression")] +use zstd::Encoder as ZstdEncoder; pub fn create_table_dual() -> Arc { let dual_schema = Arc::new(Schema::new(vec![ @@ -124,14 +127,22 @@ pub fn partitioned_file_groups( #[cfg(feature = "compression")] FileCompressionType::XZ => Box::new(XzEncoder::new(file, 9)), #[cfg(feature = "compression")] + FileCompressionType::ZSTD => { + let encoder = ZstdEncoder::new(file, 0) + .map_err(|e| DataFusionError::External(Box::new(e)))? 
+ .auto_finish(); + Box::new(encoder) + } + #[cfg(feature = "compression")] FileCompressionType::BZIP2 => { Box::new(BzEncoder::new(file, BzCompression::default())) } #[cfg(not(feature = "compression"))] FileCompressionType::GZIP | FileCompressionType::BZIP2 - | FileCompressionType::XZ => { - panic!("GZIP compression is not supported in this build") + | FileCompressionType::XZ + | FileCompressionType::ZSTD => { + panic!("Compression is not supported in this build") } }; diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index 9521ae6020f3..c3ef861eb3b4 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -1591,7 +1591,7 @@ pub struct CreateExternalTable { pub if_not_exists: bool, /// SQL used to create the table, if available pub definition: Option, - /// File compression type (GZIP, BZIP2, XZ) + /// File compression type (GZIP, BZIP2, XZ, ZSTD) pub file_compression_type: CompressionTypeVariant, /// Table(provider) specific options pub options: HashMap, diff --git a/datafusion/sql/src/parser.rs b/datafusion/sql/src/parser.rs index d25268c87eb0..521466214ff8 100644 --- a/datafusion/sql/src/parser.rs +++ b/datafusion/sql/src/parser.rs @@ -399,7 +399,7 @@ impl<'a> DFParser<'a> { let token = self.parser.next_token(); match &token.token { Token::Word(w) => CompressionTypeVariant::from_str(&w.value), - _ => self.expected("one of GZIP, BZIP2, XZ", token), + _ => self.expected("one of GZIP, BZIP2, XZ, ZSTD", token), } } @@ -586,6 +586,7 @@ mod tests { ("CREATE EXTERNAL TABLE t(c1 int) STORED AS CSV COMPRESSION TYPE GZIP LOCATION 'foo.csv'", "GZIP"), ("CREATE EXTERNAL TABLE t(c1 int) STORED AS CSV COMPRESSION TYPE BZIP2 LOCATION 'foo.csv'", "BZIP2"), ("CREATE EXTERNAL TABLE t(c1 int) STORED AS CSV COMPRESSION TYPE XZ LOCATION 'foo.csv'", "XZ"), + ("CREATE EXTERNAL TABLE t(c1 int) STORED AS CSV COMPRESSION TYPE ZSTD LOCATION 'foo.csv'", "ZSTD"), ]; for (sql, 
file_compression_type) in sqls { let expected = Statement::CreateExternalTable(CreateExternalTable {