diff --git a/datafusion/common/Cargo.toml b/datafusion/common/Cargo.toml index 99a171d72e98..9a7605d885c8 100644 --- a/datafusion/common/Cargo.toml +++ b/datafusion/common/Cargo.toml @@ -35,7 +35,7 @@ path = "src/lib.rs" [features] avro = ["apache-avro"] compression = ["xz2", "bzip2", "flate2", "zstd", "async-compression"] -default = ["compression"] +default = ["compression", "parquet"] pyarrow = ["pyo3", "arrow/pyarrow"] [dependencies] diff --git a/datafusion/common/src/error.rs b/datafusion/common/src/error.rs index 5fad7433a57f..140522d6603b 100644 --- a/datafusion/common/src/error.rs +++ b/datafusion/common/src/error.rs @@ -73,6 +73,9 @@ pub enum DataFusionError { /// This error happens whenever a plan is not valid. Examples include /// impossible casts. Plan(String), + /// This error happens when an invalid or unsupported option is passed + /// in a SQL statement + Configuration(String), /// This error happens with schema-related errors, such as schema inference not possible /// and non-unique column names. SchemaError(SchemaError), @@ -288,6 +291,9 @@ impl Display for DataFusionError { DataFusionError::SQL(ref desc) => { write!(f, "SQL error: {desc:?}") } + DataFusionError::Configuration(ref desc) => { + write!(f, "Invalid or Unsupported Configuration: {desc}") + } DataFusionError::NotImplemented(ref desc) => { write!(f, "This feature is not implemented: {desc}") } @@ -338,6 +344,7 @@ impl Error for DataFusionError { DataFusionError::SQL(e) => Some(e), DataFusionError::NotImplemented(_) => None, DataFusionError::Internal(_) => None, + DataFusionError::Configuration(_) => None, DataFusionError::Plan(_) => None, DataFusionError::SchemaError(e) => Some(e), DataFusionError::Execution(_) => None, diff --git a/datafusion/common/src/file_options/arrow_writer.rs b/datafusion/common/src/file_options/arrow_writer.rs new file mode 100644 index 000000000000..a30e6d800e20 --- /dev/null +++ b/datafusion/common/src/file_options/arrow_writer.rs @@ -0,0 +1,36 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! 
Options related to how Arrow files should be written + +use crate::{ + config::ConfigOptions, + error::{DataFusionError, Result}, +}; + +use super::StatementOptions; + +#[derive(Clone, Debug)] +pub struct ArrowWriterOptions {} + +impl TryFrom<(&ConfigOptions, &StatementOptions)> for ArrowWriterOptions { + type Error = DataFusionError; + + fn try_from(_value: (&ConfigOptions, &StatementOptions)) -> Result { + Ok(ArrowWriterOptions {}) + } +} diff --git a/datafusion/common/src/file_options/avro_writer.rs b/datafusion/common/src/file_options/avro_writer.rs new file mode 100644 index 000000000000..2e3a64705842 --- /dev/null +++ b/datafusion/common/src/file_options/avro_writer.rs @@ -0,0 +1,36 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Options related to how avro files should be written + +use crate::{ + config::ConfigOptions, + error::{DataFusionError, Result}, +}; + +use super::StatementOptions; + +#[derive(Clone, Debug)] +pub struct AvroWriterOptions {} + +impl TryFrom<(&ConfigOptions, &StatementOptions)> for AvroWriterOptions { + type Error = DataFusionError; + + fn try_from(_value: (&ConfigOptions, &StatementOptions)) -> Result { + Ok(AvroWriterOptions {}) + } +} diff --git a/datafusion/common/src/file_options/csv_writer.rs b/datafusion/common/src/file_options/csv_writer.rs new file mode 100644 index 000000000000..ebf177fdce98 --- /dev/null +++ b/datafusion/common/src/file_options/csv_writer.rs @@ -0,0 +1,107 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! 
Options related to how csv files should be written + +use std::str::FromStr; + +use arrow::csv::WriterBuilder; + +use crate::{ + config::ConfigOptions, + error::{DataFusionError, Result}, + parsers::CompressionTypeVariant, +}; + +use super::StatementOptions; + +/// Options for writing CSV files +#[derive(Clone, Debug)] +pub struct CsvWriterOptions { + /// Struct from the arrow crate which contains all csv writing related settings + pub writer_options: WriterBuilder, + /// Compression to apply after ArrowWriter serializes RecordBatches. + /// This compression is applied by DataFusion not the ArrowWriter itself. + pub compression: CompressionTypeVariant, + /// Indicates whether WriterBuilder.has_header() is set to true. + /// This is duplicative as WriterBuilder also stores this information. + /// However, WriterBuilder does not allow public read access to the + /// has_header parameter. + pub has_header: bool, + // TODO: expose a way to read has_header in arrow create + // https://github.com/apache/arrow-rs/issues/4735 +} + +impl TryFrom<(&ConfigOptions, &StatementOptions)> for CsvWriterOptions { + type Error = DataFusionError; + + fn try_from(value: (&ConfigOptions, &StatementOptions)) -> Result { + let _configs = value.0; + let statement_options = value.1; + let mut has_header = true; + let mut builder = WriterBuilder::default(); + let mut compression = CompressionTypeVariant::UNCOMPRESSED; + for (option, value) in &statement_options.options { + builder = match option.to_lowercase().as_str(){ + "header" => { + has_header = value.parse() + .map_err(|_| DataFusionError::Configuration(format!("Unable to parse {value} as bool as required for {option}!")))?; + builder.has_headers(has_header) + }, + "date_format" => builder.with_date_format(value.to_owned()), + "datetime_format" => builder.with_datetime_format(value.to_owned()), + "timestamp_format" => builder.with_timestamp_format(value.to_owned()), + "time_format" => builder.with_time_format(value.to_owned()), + "rfc3339" => { + let value_bool = value.parse() + .map_err(|_| DataFusionError::Configuration(format!("Unable to parse {value} as bool as required for {option}!")))?; + if value_bool{ + builder.with_rfc3339() + } else{ + builder + } + }, + "null_value" => builder.with_null(value.to_owned()), + "compression" => { + compression = CompressionTypeVariant::from_str(value.replace('\'', "").as_str())?; + builder + }, + "delimeter" => { + // Ignore string literal single quotes passed from sql parsing + let value = value.replace('\'', ""); + let chars: Vec = value.chars().collect(); + if chars.len()>1{ + return Err(DataFusionError::Configuration(format!( + "CSV Delimeter Option must be a single char, got: {}", value + ))) + } + builder.with_delimiter(chars[0].try_into().map_err(|_| { + DataFusionError::Internal( + "Unable to convert CSV delimiter into u8".into(), + ) + })?) + }, + _ => return Err(DataFusionError::Configuration(format!("Found unsupported option {option} with value {value} for CSV format!"))) + } + } + Ok(CsvWriterOptions { + has_header, + writer_options: builder, + compression, + }) + } +} diff --git a/datafusion/common/src/file_options/file_type.rs b/datafusion/common/src/file_options/file_type.rs new file mode 100644 index 000000000000..7d12a267cacb --- /dev/null +++ b/datafusion/common/src/file_options/file_type.rs @@ -0,0 +1,420 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! File type abstraction + +use crate::error::{DataFusionError, Result}; +#[cfg(feature = "compression")] +use async_compression::tokio::bufread::{ + BzDecoder as AsyncBzDecoder, BzEncoder as AsyncBzEncoder, + GzipDecoder as AsyncGzDecoder, GzipEncoder as AsyncGzEncoder, + XzDecoder as AsyncXzDecoder, XzEncoder as AsyncXzEncoder, + ZstdDecoder as AsyncZstdDecoer, ZstdEncoder as AsyncZstdEncoder, +}; + +use crate::parsers::CompressionTypeVariant; +#[cfg(feature = "compression")] +use async_compression::tokio::write::{BzEncoder, GzipEncoder, XzEncoder, ZstdEncoder}; +use bytes::Bytes; +#[cfg(feature = "compression")] +use bzip2::read::MultiBzDecoder; +#[cfg(feature = "compression")] +use flate2::read::MultiGzDecoder; + +use core::fmt; +use futures::stream::BoxStream; +use futures::StreamExt; +#[cfg(feature = "compression")] +use futures::TryStreamExt; +use std::fmt::Display; +use std::str::FromStr; +use tokio::io::AsyncWrite; +#[cfg(feature = "compression")] +use tokio_util::io::{ReaderStream, StreamReader}; +#[cfg(feature = "compression")] +use xz2::read::XzDecoder; +#[cfg(feature = "compression")] +use zstd::Decoder as ZstdDecoder; +use CompressionTypeVariant::*; + +/// The default file extension of arrow files +pub const DEFAULT_ARROW_EXTENSION: &str = ".arrow"; +/// The default file extension of avro files +pub const DEFAULT_AVRO_EXTENSION: &str = ".avro"; +/// The default file extension of csv files +pub const DEFAULT_CSV_EXTENSION: &str = ".csv"; +/// The default file extension of json files +pub const DEFAULT_JSON_EXTENSION: &str = ".json"; +/// The default file extension of parquet files +pub const DEFAULT_PARQUET_EXTENSION: &str = ".parquet"; + +/// Define each `FileType`/`FileCompressionType`'s extension +pub trait GetExt { + /// File extension getter + fn get_ext(&self) -> String; +} + +/// Readable file compression type +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub struct FileCompressionType { + variant: CompressionTypeVariant, +} + +impl GetExt for FileCompressionType { + fn get_ext(&self) -> String { + match self.variant { + GZIP => ".gz".to_owned(), + BZIP2 => ".bz2".to_owned(), + XZ => ".xz".to_owned(), + ZSTD => ".zst".to_owned(), + UNCOMPRESSED => "".to_owned(), + } + } +} + +impl From for FileCompressionType { + fn from(t: CompressionTypeVariant) -> Self { + Self { variant: t } + } +} + +impl FromStr for FileCompressionType { + type Err = DataFusionError; + + fn from_str(s: &str) -> Result { + let variant = CompressionTypeVariant::from_str(s).map_err(|_| { + DataFusionError::NotImplemented(format!("Unknown FileCompressionType: {s}")) + })?; + Ok(Self { variant }) + } +} + +/// `FileCompressionType` implementation +impl FileCompressionType { + /// Gzip-ed file + pub const GZIP: Self = Self { variant: GZIP }; + + /// Bzip2-ed file + pub const BZIP2: Self = Self { variant: 
BZIP2 }; + + /// Xz-ed file (liblzma) + pub const XZ: Self = Self { variant: XZ }; + + /// Zstd-ed file + pub const ZSTD: Self = Self { variant: ZSTD }; + + /// Uncompressed file + pub const UNCOMPRESSED: Self = Self { + variant: UNCOMPRESSED, + }; + + /// The file is compressed or not + pub const fn is_compressed(&self) -> bool { + self.variant.is_compressed() + } + + /// Given a `Stream`, create a `Stream` which data are compressed with `FileCompressionType`. + pub fn convert_to_compress_stream( + &self, + s: BoxStream<'static, Result>, + ) -> Result>> { + Ok(match self.variant { + #[cfg(feature = "compression")] + GZIP => ReaderStream::new(AsyncGzEncoder::new(StreamReader::new(s))) + .map_err(DataFusionError::from) + .boxed(), + #[cfg(feature = "compression")] + BZIP2 => ReaderStream::new(AsyncBzEncoder::new(StreamReader::new(s))) + .map_err(DataFusionError::from) + .boxed(), + #[cfg(feature = "compression")] + XZ => ReaderStream::new(AsyncXzEncoder::new(StreamReader::new(s))) + .map_err(DataFusionError::from) + .boxed(), + #[cfg(feature = "compression")] + ZSTD => ReaderStream::new(AsyncZstdEncoder::new(StreamReader::new(s))) + .map_err(DataFusionError::from) + .boxed(), + #[cfg(not(feature = "compression"))] + GZIP | BZIP2 | XZ | ZSTD => { + return Err(DataFusionError::NotImplemented( + "Compression feature is not enabled".to_owned(), + )) + } + UNCOMPRESSED => s.boxed(), + }) + } + + /// Wrap the given `AsyncWrite` so that it performs compressed writes + /// according to this `FileCompressionType`. + pub fn convert_async_writer( + &self, + w: Box, + ) -> Result> { + Ok(match self.variant { + #[cfg(feature = "compression")] + GZIP => Box::new(GzipEncoder::new(w)), + #[cfg(feature = "compression")] + BZIP2 => Box::new(BzEncoder::new(w)), + #[cfg(feature = "compression")] + XZ => Box::new(XzEncoder::new(w)), + #[cfg(feature = "compression")] + ZSTD => Box::new(ZstdEncoder::new(w)), + #[cfg(not(feature = "compression"))] + GZIP | BZIP2 | XZ | ZSTD => { + return Err(DataFusionError::NotImplemented( + "Compression feature is not enabled".to_owned(), + )) + } + UNCOMPRESSED => w, + }) + } + + /// Given a `Stream`, create a `Stream` which data are decompressed with `FileCompressionType`. + pub fn convert_stream( + &self, + s: BoxStream<'static, Result>, + ) -> Result>> { + Ok(match self.variant { + #[cfg(feature = "compression")] + GZIP => ReaderStream::new(AsyncGzDecoder::new(StreamReader::new(s))) + .map_err(DataFusionError::from) + .boxed(), + #[cfg(feature = "compression")] + BZIP2 => ReaderStream::new(AsyncBzDecoder::new(StreamReader::new(s))) + .map_err(DataFusionError::from) + .boxed(), + #[cfg(feature = "compression")] + XZ => ReaderStream::new(AsyncXzDecoder::new(StreamReader::new(s))) + .map_err(DataFusionError::from) + .boxed(), + #[cfg(feature = "compression")] + ZSTD => ReaderStream::new(AsyncZstdDecoer::new(StreamReader::new(s))) + .map_err(DataFusionError::from) + .boxed(), + #[cfg(not(feature = "compression"))] + GZIP | BZIP2 | XZ | ZSTD => { + return Err(DataFusionError::NotImplemented( + "Compression feature is not enabled".to_owned(), + )) + } + UNCOMPRESSED => s.boxed(), + }) + } + + /// Given a `Read`, create a `Read` which data are decompressed with `FileCompressionType`. 
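A minimal usage sketch of the `FileCompressionType` API above together with `FileType::get_ext_with_compression` (defined further down in this file); the function name is illustrative, the types come from the `datafusion_common` re-exports:

```rust
use std::str::FromStr;

use datafusion_common::{FileCompressionType, FileType, GetExt, Result};

// Sketch: parse a compression variant from a string, inspect it, and combine it
// with a FileType extension. Only CSV/JSON accept a compression suffix.
fn compression_extensions() -> Result<()> {
    let gzip = FileCompressionType::from_str("gzip")?;
    assert!(gzip.is_compressed());
    assert_eq!(gzip.get_ext(), ".gz");
    assert_eq!(FileType::CSV.get_ext_with_compression(gzip)?, ".csv.gz");
    // Parquet/Avro/Arrow reject a compression suffix (Internal error).
    assert!(FileType::PARQUET.get_ext_with_compression(gzip).is_err());
    Ok(())
}
```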
+ pub fn convert_read( + &self, + r: T, + ) -> Result> { + Ok(match self.variant { + #[cfg(feature = "compression")] + GZIP => Box::new(MultiGzDecoder::new(r)), + #[cfg(feature = "compression")] + BZIP2 => Box::new(MultiBzDecoder::new(r)), + #[cfg(feature = "compression")] + XZ => Box::new(XzDecoder::new_multi_decoder(r)), + #[cfg(feature = "compression")] + ZSTD => match ZstdDecoder::new(r) { + Ok(decoder) => Box::new(decoder), + Err(e) => return Err(DataFusionError::External(Box::new(e))), + }, + #[cfg(not(feature = "compression"))] + GZIP | BZIP2 | XZ | ZSTD => { + return Err(DataFusionError::NotImplemented( + "Compression feature is not enabled".to_owned(), + )) + } + UNCOMPRESSED => Box::new(r), + }) + } +} + +/// Readable file type +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub enum FileType { + /// Apache Arrow file + ARROW, + /// Apache Avro file + AVRO, + /// Apache Parquet file + PARQUET, + /// CSV file + CSV, + /// JSON file + JSON, +} + +impl GetExt for FileType { + fn get_ext(&self) -> String { + match self { + FileType::ARROW => DEFAULT_ARROW_EXTENSION.to_owned(), + FileType::AVRO => DEFAULT_AVRO_EXTENSION.to_owned(), + FileType::PARQUET => DEFAULT_PARQUET_EXTENSION.to_owned(), + FileType::CSV => DEFAULT_CSV_EXTENSION.to_owned(), + FileType::JSON => DEFAULT_JSON_EXTENSION.to_owned(), + } + } +} + +impl Display for FileType { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let out = match self { + FileType::CSV => "csv", + FileType::JSON => "json", + FileType::PARQUET => "parquet", + FileType::AVRO => "avro", + FileType::ARROW => "arrow", + }; + write!(f, "{}", out) + } +} + +impl FromStr for FileType { + type Err = DataFusionError; + + fn from_str(s: &str) -> Result { + let s = s.to_uppercase(); + match s.as_str() { + "ARROW" => Ok(FileType::ARROW), + "AVRO" => Ok(FileType::AVRO), + "PARQUET" => Ok(FileType::PARQUET), + "CSV" => Ok(FileType::CSV), + "JSON" | "NDJSON" => Ok(FileType::JSON), + _ => Err(DataFusionError::NotImplemented(format!( + "Unknown FileType: {s}" + ))), + } + } +} + +impl FileType { + /// Given a `FileCompressionType`, return the `FileType`'s extension with compression suffix + pub fn get_ext_with_compression(&self, c: FileCompressionType) -> Result { + let ext = self.get_ext(); + + match self { + FileType::JSON | FileType::CSV => Ok(format!("{}{}", ext, c.get_ext())), + FileType::PARQUET | FileType::AVRO | FileType::ARROW => match c.variant { + UNCOMPRESSED => Ok(ext), + _ => Err(DataFusionError::Internal( + "FileCompressionType can be specified for CSV/JSON FileType.".into(), + )), + }, + } + } +} + +#[cfg(test)] +mod tests { + use crate::error::DataFusionError; + use crate::file_options::file_type::{FileCompressionType, FileType}; + use std::str::FromStr; + + #[test] + fn get_ext_with_compression() { + for (file_type, compression, extension) in [ + (FileType::CSV, FileCompressionType::UNCOMPRESSED, ".csv"), + (FileType::CSV, FileCompressionType::GZIP, ".csv.gz"), + (FileType::CSV, FileCompressionType::XZ, ".csv.xz"), + (FileType::CSV, FileCompressionType::BZIP2, ".csv.bz2"), + (FileType::CSV, FileCompressionType::ZSTD, ".csv.zst"), + (FileType::JSON, FileCompressionType::UNCOMPRESSED, ".json"), + (FileType::JSON, FileCompressionType::GZIP, ".json.gz"), + (FileType::JSON, FileCompressionType::XZ, ".json.xz"), + (FileType::JSON, FileCompressionType::BZIP2, ".json.bz2"), + (FileType::JSON, FileCompressionType::ZSTD, ".json.zst"), + ] { + assert_eq!( + file_type.get_ext_with_compression(compression).unwrap(), + extension + ); + } + + // 
Cannot specify compression for these file types + for (file_type, extension) in + [(FileType::AVRO, ".avro"), (FileType::PARQUET, ".parquet")] + { + assert_eq!( + file_type + .get_ext_with_compression(FileCompressionType::UNCOMPRESSED) + .unwrap(), + extension + ); + for compression in [ + FileCompressionType::GZIP, + FileCompressionType::XZ, + FileCompressionType::BZIP2, + FileCompressionType::ZSTD, + ] { + assert!(matches!( + file_type.get_ext_with_compression(compression), + Err(DataFusionError::Internal(_)) + )); + } + } + } + + #[test] + fn from_str() { + for (ext, file_type) in [ + ("csv", FileType::CSV), + ("CSV", FileType::CSV), + ("json", FileType::JSON), + ("JSON", FileType::JSON), + ("avro", FileType::AVRO), + ("AVRO", FileType::AVRO), + ("parquet", FileType::PARQUET), + ("PARQUET", FileType::PARQUET), + ] { + assert_eq!(FileType::from_str(ext).unwrap(), file_type); + } + + assert!(matches!( + FileType::from_str("Unknown"), + Err(DataFusionError::NotImplemented(_)) + )); + + for (ext, compression_type) in [ + ("gz", FileCompressionType::GZIP), + ("GZ", FileCompressionType::GZIP), + ("gzip", FileCompressionType::GZIP), + ("GZIP", FileCompressionType::GZIP), + ("xz", FileCompressionType::XZ), + ("XZ", FileCompressionType::XZ), + ("bz2", FileCompressionType::BZIP2), + ("BZ2", FileCompressionType::BZIP2), + ("bzip2", FileCompressionType::BZIP2), + ("BZIP2", FileCompressionType::BZIP2), + ("zst", FileCompressionType::ZSTD), + ("ZST", FileCompressionType::ZSTD), + ("zstd", FileCompressionType::ZSTD), + ("ZSTD", FileCompressionType::ZSTD), + ("", FileCompressionType::UNCOMPRESSED), + ] { + assert_eq!( + FileCompressionType::from_str(ext).unwrap(), + compression_type + ); + } + + assert!(matches!( + FileCompressionType::from_str("Unknown"), + Err(DataFusionError::NotImplemented(_)) + )); + } +} diff --git a/datafusion/common/src/file_options/json_writer.rs b/datafusion/common/src/file_options/json_writer.rs new file mode 100644 index 000000000000..b3ea76b6510a --- /dev/null +++ b/datafusion/common/src/file_options/json_writer.rs @@ -0,0 +1,53 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! 
Options related to how json files should be written + +use std::str::FromStr; + +use crate::{ + config::ConfigOptions, + error::{DataFusionError, Result}, + parsers::CompressionTypeVariant, +}; + +use super::StatementOptions; + +/// Options for writing JSON files +#[derive(Clone, Debug)] +pub struct JsonWriterOptions { + pub compression: CompressionTypeVariant, +} + +impl TryFrom<(&ConfigOptions, &StatementOptions)> for JsonWriterOptions { + type Error = DataFusionError; + + fn try_from(value: (&ConfigOptions, &StatementOptions)) -> Result { + let _configs = value.0; + let statement_options = value.1; + let mut compression = CompressionTypeVariant::UNCOMPRESSED; + for (option, value) in &statement_options.options { + match option.to_lowercase().as_str(){ + "compression" => { + compression = CompressionTypeVariant::from_str(value.replace('\'', "").as_str())?; + }, + _ => return Err(DataFusionError::Configuration(format!("Found unsupported option {option} with value {value} for JSON format!"))) + } + } + Ok(JsonWriterOptions { compression }) + } +} diff --git a/datafusion/common/src/file_options/mod.rs b/datafusion/common/src/file_options/mod.rs new file mode 100644 index 000000000000..29ee73f80fc6 --- /dev/null +++ b/datafusion/common/src/file_options/mod.rs @@ -0,0 +1,420 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Options related to how files should be written + +pub mod arrow_writer; +pub mod avro_writer; +pub mod csv_writer; +pub mod file_type; +pub mod json_writer; +pub mod parquet_writer; +pub(crate) mod parse_utils; + +use std::{ + collections::HashMap, + fmt::{self, Display}, + path::Path, + str::FromStr, +}; + +use crate::{ + config::ConfigOptions, file_options::parse_utils::parse_boolean_string, + DataFusionError, FileType, Result, +}; + +use self::{ + arrow_writer::ArrowWriterOptions, avro_writer::AvroWriterOptions, + csv_writer::CsvWriterOptions, json_writer::JsonWriterOptions, + parquet_writer::ParquetWriterOptions, +}; + +/// Represents a single arbitrary setting in a +/// [StatementOptions] where OptionTuple.0 determines +/// the specific setting to be modified and OptionTuple.1 +/// determines the value which should be applied +pub type OptionTuple = (String, String); + +/// Represents arbitrary tuples of options passed as String +/// tuples from SQL statements. As in the following statement: +/// COPY ... TO ... (setting1 value1, setting2 value2, ...) 
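A minimal sketch of how the `StatementOptions` type below is meant to be consumed; the target path, option values, and function name are illustrative, the API is the one added in this file:

```rust
use datafusion_common::file_options::StatementOptions;
use datafusion_common::{FileType, Result};

// Sketch: options as they would arrive from `COPY ... TO ... (format csv, header true)`.
fn infer_and_consume_options() -> Result<()> {
    let mut opts = StatementOptions::new(vec![
        ("format".to_owned(), "csv".to_owned()),
        ("header".to_owned(), "true".to_owned()),
    ]);
    // An explicit "format" option wins over the target's file extension.
    let file_type = opts.try_infer_file_type("s3://bucket/output")?;
    assert_eq!(file_type, FileType::CSV);
    // Remaining options can be pulled out and parsed one at a time.
    assert_eq!(opts.take_bool_option("header")?, Some(true));
    // Once taken, the option is gone.
    assert!(opts.take_str_option("header").is_none());
    Ok(())
}
```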
+#[derive(Clone, PartialEq, Eq, Hash, Debug)] +pub struct StatementOptions { + options: Vec, +} + +/// Useful for conversion from external tables which use Hashmap +impl From<&HashMap> for StatementOptions { + fn from(value: &HashMap) -> Self { + Self { + options: value + .iter() + .map(|(k, v)| (k.to_owned(), v.to_owned())) + .collect::>(), + } + } +} + +impl StatementOptions { + pub fn new(options: Vec) -> Self { + Self { options } + } + + pub fn into_inner(self) -> Vec { + self.options + } + + /// Scans for option and if it exists removes it and attempts to parse as a boolean + /// Returns none if it does not exist. + pub fn take_bool_option(&mut self, find: &str) -> Result> { + let maybe_option = self.scan_and_remove_option(find); + maybe_option + .map(|(_, v)| parse_boolean_string(find, v)) + .transpose() + } + + /// Scans for option and if it exists removes it and returns it + /// Returns none if it does not exist + pub fn take_str_option(&mut self, find: &str) -> Option { + let maybe_option = self.scan_and_remove_option(find); + maybe_option.map(|(_, v)| v) + } + + /// Infers the file_type given a target and arbitrary options. + /// If the options contain an explicit "format" option, that will be used. + /// Otherwise, attempt to infer file_type from the extension of target. + /// Finally, return an error if unable to determine the file_type + /// If found, format is removed from the options list. + pub fn try_infer_file_type(&mut self, target: &str) -> Result { + let explicit_format = self.scan_and_remove_option("format"); + let format = match explicit_format { + Some(s) => FileType::from_str(s.1.as_str()), + None => { + // try to infer file format from file extension + let extension: &str = &Path::new(target) + .extension() + .ok_or(DataFusionError::Configuration( + "Format not explicitly set and unable to get file extension!" + .to_string(), + ))? + .to_str() + .ok_or(DataFusionError::Configuration( + "Format not explicitly set and failed to parse file extension!" + .to_string(), + ))? + .to_lowercase(); + + FileType::from_str(extension) + } + }?; + + Ok(format) + } + + /// Finds an option in StatementOptions if exists, removes and returns it + /// along with the vec of remaining options. + fn scan_and_remove_option(&mut self, find: &str) -> Option { + let idx = self + .options + .iter() + .position(|(k, _)| k.to_lowercase() == find.to_lowercase()); + match idx { + Some(i) => Some(self.options.swap_remove(i)), + None => None, + } + } +} + +/// This type contains all options needed to initialize a particular +/// RecordBatchWriter type. Each element in the enum contains a thin wrapper +/// around a "writer builder" type (e.g. arrow::csv::WriterBuilder) +/// plus any DataFusion specific writing options (e.g. CSV compression) +#[derive(Clone, Debug)] +pub enum FileTypeWriterOptions { + Parquet(ParquetWriterOptions), + CSV(CsvWriterOptions), + JSON(JsonWriterOptions), + Avro(AvroWriterOptions), + Arrow(ArrowWriterOptions), +} + +impl FileTypeWriterOptions { + /// Constructs a FileTypeWriterOptions given a FileType to be written + /// and arbitrary String tuple options. May return an error if any + /// string setting is unrecognized or unsupported. + pub fn build( + file_type: &FileType, + config_defaults: &ConfigOptions, + statement_options: &StatementOptions, + ) -> Result { + let options = (config_defaults, statement_options); + + let file_type_write_options = match file_type { + FileType::PARQUET => { + FileTypeWriterOptions::Parquet(ParquetWriterOptions::try_from(options)?) 
+ } + FileType::CSV => { + FileTypeWriterOptions::CSV(CsvWriterOptions::try_from(options)?) + } + FileType::JSON => { + FileTypeWriterOptions::JSON(JsonWriterOptions::try_from(options)?) + } + FileType::AVRO => { + FileTypeWriterOptions::Avro(AvroWriterOptions::try_from(options)?) + } + FileType::ARROW => { + FileTypeWriterOptions::Arrow(ArrowWriterOptions::try_from(options)?) + } + }; + + Ok(file_type_write_options) + } + + /// Constructs a FileTypeWriterOptions from session defaults only. + pub fn build_default( + file_type: &FileType, + config_defaults: &ConfigOptions, + ) -> Result { + let empty_statement = StatementOptions::new(vec![]); + let options = (config_defaults, &empty_statement); + + let file_type_write_options = match file_type { + FileType::PARQUET => { + FileTypeWriterOptions::Parquet(ParquetWriterOptions::try_from(options)?) + } + FileType::CSV => { + FileTypeWriterOptions::CSV(CsvWriterOptions::try_from(options)?) + } + FileType::JSON => { + FileTypeWriterOptions::JSON(JsonWriterOptions::try_from(options)?) + } + FileType::AVRO => { + FileTypeWriterOptions::Avro(AvroWriterOptions::try_from(options)?) + } + FileType::ARROW => { + FileTypeWriterOptions::Arrow(ArrowWriterOptions::try_from(options)?) + } + }; + + Ok(file_type_write_options) + } + + /// Tries to extract ParquetWriterOptions from this FileTypeWriterOptions enum. + /// Returns an error if a different type from parquet is set. + pub fn try_into_parquet(&self) -> Result<&ParquetWriterOptions> { + match self { + FileTypeWriterOptions::Parquet(opt) => Ok(opt), + _ => Err(DataFusionError::Internal(format!( + "Expected parquet options but found options for: {}", + self + ))), + } + } + + /// Tries to extract CsvWriterOptions from this FileTypeWriterOptions enum. + /// Returns an error if a different type from csv is set. + pub fn try_into_csv(&self) -> Result<&CsvWriterOptions> { + match self { + FileTypeWriterOptions::CSV(opt) => Ok(opt), + _ => Err(DataFusionError::Internal(format!( + "Expected csv options but found options for {}", + self + ))), + } + } + + /// Tries to extract JsonWriterOptions from this FileTypeWriterOptions enum. + /// Returns an error if a different type from json is set. + pub fn try_into_json(&self) -> Result<&JsonWriterOptions> { + match self { + FileTypeWriterOptions::JSON(opt) => Ok(opt), + _ => Err(DataFusionError::Internal(format!( + "Expected json options but found options for {}", + self, + ))), + } + } + + /// Tries to extract AvroWriterOptions from this FileTypeWriterOptions enum. + /// Returns an error if a different type from avro is set. + pub fn try_into_avro(&self) -> Result<&AvroWriterOptions> { + match self { + FileTypeWriterOptions::Avro(opt) => Ok(opt), + _ => Err(DataFusionError::Internal(format!( + "Expected avro options but found options for {}!", + self + ))), + } + } + + /// Tries to extract ArrowWriterOptions from this FileTypeWriterOptions enum. + /// Returns an error if a different type from arrow is set. 
+ pub fn try_into_arrow(&self) -> Result<&ArrowWriterOptions> { + match self { + FileTypeWriterOptions::Arrow(opt) => Ok(opt), + _ => Err(DataFusionError::Internal(format!( + "Expected arrow options but found options for {}", + self + ))), + } + } +} + +impl Display for FileTypeWriterOptions { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let name = match self { + FileTypeWriterOptions::Arrow(_) => "ArrowWriterOptions", + FileTypeWriterOptions::Avro(_) => "AvroWriterOptions", + FileTypeWriterOptions::CSV(_) => "CsvWriterOptions", + FileTypeWriterOptions::JSON(_) => "JsonWriterOptions", + FileTypeWriterOptions::Parquet(_) => "ParquetWriterOptions", + }; + write!(f, "{}", name) + } +} + +#[cfg(test)] +mod tests { + use std::collections::HashMap; + + use parquet::{ + basic::{Compression, Encoding, ZstdLevel}, + file::properties::{EnabledStatistics, WriterVersion}, + schema::types::ColumnPath, + }; + + use crate::{ + config::ConfigOptions, + file_options::{csv_writer::CsvWriterOptions, json_writer::JsonWriterOptions}, + parsers::CompressionTypeVariant, + }; + + use crate::Result; + + use super::{parquet_writer::ParquetWriterOptions, StatementOptions}; + + #[test] + fn test_writeroptions_parquet_from_statement_options() -> Result<()> { + let mut option_map: HashMap = HashMap::new(); + option_map.insert("max_row_group_size".to_owned(), "123".to_owned()); + option_map.insert("data_pagesize_limit".to_owned(), "123".to_owned()); + option_map.insert("write_batch_size".to_owned(), "123".to_owned()); + option_map.insert("writer_version".to_owned(), "2.0".to_owned()); + option_map.insert("dictionary_page_size_limit".to_owned(), "123".to_owned()); + option_map.insert("created_by".to_owned(), "df write unit test".to_owned()); + option_map.insert("column_index_truncate_length".to_owned(), "123".to_owned()); + option_map.insert("data_page_row_count_limit".to_owned(), "123".to_owned()); + option_map.insert("bloom_filter_enabled".to_owned(), "true".to_owned()); + option_map.insert("encoding".to_owned(), "plain".to_owned()); + option_map.insert("dictionary_enabled".to_owned(), "true".to_owned()); + option_map.insert("compression".to_owned(), "zstd(4)".to_owned()); + option_map.insert("statistics_enabled".to_owned(), "page".to_owned()); + option_map.insert("bloom_filter_fpp".to_owned(), "0.123".to_owned()); + option_map.insert("bloom_filter_ndv".to_owned(), "123".to_owned()); + + let options = StatementOptions::from(&option_map); + let config = ConfigOptions::new(); + + let parquet_options = ParquetWriterOptions::try_from((&config, &options))?; + let properties = parquet_options.writer_options(); + + // Verify the expected options propagated down to parquet crate WriterProperties struct + assert_eq!(properties.max_row_group_size(), 123); + assert_eq!(properties.data_page_size_limit(), 123); + assert_eq!(properties.write_batch_size(), 123); + assert_eq!(properties.writer_version(), WriterVersion::PARQUET_2_0); + assert_eq!(properties.dictionary_page_size_limit(), 123); + assert_eq!(properties.created_by(), "df write unit test"); + assert_eq!(properties.column_index_truncate_length(), Some(123)); + assert_eq!(properties.data_page_row_count_limit(), 123); + properties + .bloom_filter_properties(&ColumnPath::from("")) + .expect("expected bloom filter enabled"); + assert_eq!( + properties + .encoding(&ColumnPath::from("")) + .expect("expected default encoding"), + Encoding::PLAIN + ); + assert!(properties.dictionary_enabled(&ColumnPath::from(""))); + assert_eq!( + 
properties.compression(&ColumnPath::from("")), + Compression::ZSTD(ZstdLevel::try_new(4_i32)?) + ); + assert_eq!( + properties.statistics_enabled(&ColumnPath::from("")), + EnabledStatistics::Page + ); + assert_eq!( + properties + .bloom_filter_properties(&ColumnPath::from("")) + .expect("expected bloom properties!") + .fpp, + 0.123 + ); + assert_eq!( + properties + .bloom_filter_properties(&ColumnPath::from("")) + .expect("expected bloom properties!") + .ndv, + 123 + ); + + Ok(()) + } + + #[test] + fn test_writeroptions_csv_from_statement_options() -> Result<()> { + let mut option_map: HashMap = HashMap::new(); + option_map.insert("header".to_owned(), "true".to_owned()); + option_map.insert("date_format".to_owned(), "123".to_owned()); + option_map.insert("datetime_format".to_owned(), "123".to_owned()); + option_map.insert("timestamp_format".to_owned(), "2.0".to_owned()); + option_map.insert("time_format".to_owned(), "123".to_owned()); + option_map.insert("rfc3339".to_owned(), "true".to_owned()); + option_map.insert("null_value".to_owned(), "123".to_owned()); + option_map.insert("compression".to_owned(), "gzip".to_owned()); + option_map.insert("delimeter".to_owned(), ";".to_owned()); + + let options = StatementOptions::from(&option_map); + let config = ConfigOptions::new(); + + let csv_options = CsvWriterOptions::try_from((&config, &options))?; + let builder = csv_options.writer_options; + let buff = Vec::new(); + let _properties = builder.build(buff); + assert!(csv_options.has_header); + assert_eq!(csv_options.compression, CompressionTypeVariant::GZIP); + // TODO expand unit test if csv::WriterBuilder allows public read access to properties + + Ok(()) + } + + #[test] + fn test_writeroptions_json_from_statement_options() -> Result<()> { + let mut option_map: HashMap = HashMap::new(); + option_map.insert("compression".to_owned(), "gzip".to_owned()); + + let options = StatementOptions::from(&option_map); + let config = ConfigOptions::new(); + + let json_options = JsonWriterOptions::try_from((&config, &options))?; + assert_eq!(json_options.compression, CompressionTypeVariant::GZIP); + + Ok(()) + } +} diff --git a/datafusion/common/src/file_options/parquet_writer.rs b/datafusion/common/src/file_options/parquet_writer.rs new file mode 100644 index 000000000000..ea3276b062ac --- /dev/null +++ b/datafusion/common/src/file_options/parquet_writer.rs @@ -0,0 +1,169 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! 
Options related to how parquet files should be written + +use parquet::file::properties::{WriterProperties, WriterPropertiesBuilder}; + +use crate::{ + config::ConfigOptions, + file_options::parse_utils::{ + parse_compression_string, parse_encoding_string, parse_statistics_string, + parse_version_string, + }, + DataFusionError, Result, +}; + +use super::StatementOptions; + +/// Options for writing parquet files +#[derive(Clone, Debug)] +pub struct ParquetWriterOptions { + pub writer_options: WriterProperties, +} + +impl ParquetWriterOptions { + pub fn writer_options(&self) -> &WriterProperties { + &self.writer_options + } +} + +/// Constructs a default Parquet WriterPropertiesBuilder using +/// Session level ConfigOptions to initialize settings +fn default_builder(options: &ConfigOptions) -> Result { + let parquet_session_options = &options.execution.parquet; + let mut builder = WriterProperties::builder() + .set_data_page_size_limit(parquet_session_options.data_pagesize_limit) + .set_write_batch_size(parquet_session_options.write_batch_size) + .set_writer_version(parse_version_string( + &parquet_session_options.writer_version, + )?) + .set_dictionary_page_size_limit( + parquet_session_options.dictionary_page_size_limit, + ) + .set_max_row_group_size(parquet_session_options.max_row_group_size) + .set_created_by(parquet_session_options.created_by.clone()) + .set_column_index_truncate_length( + parquet_session_options.column_index_truncate_length, + ) + .set_data_page_row_count_limit(parquet_session_options.data_page_row_count_limit) + .set_bloom_filter_enabled(parquet_session_options.bloom_filter_enabled); + + builder = match &parquet_session_options.encoding { + Some(encoding) => builder.set_encoding(parse_encoding_string(encoding)?), + None => builder, + }; + + builder = match &parquet_session_options.dictionary_enabled { + Some(enabled) => builder.set_dictionary_enabled(*enabled), + None => builder, + }; + + builder = match &parquet_session_options.compression { + Some(compression) => { + builder.set_compression(parse_compression_string(compression)?) + } + None => builder, + }; + + builder = match &parquet_session_options.statistics_enabled { + Some(statistics) => { + builder.set_statistics_enabled(parse_statistics_string(statistics)?) 
+ } + None => builder, + }; + + builder = match &parquet_session_options.max_statistics_size { + Some(size) => builder.set_max_statistics_size(*size), + None => builder, + }; + + builder = match &parquet_session_options.bloom_filter_fpp { + Some(fpp) => builder.set_bloom_filter_fpp(*fpp), + None => builder, + }; + + builder = match &parquet_session_options.bloom_filter_ndv { + Some(ndv) => builder.set_bloom_filter_ndv(*ndv), + None => builder, + }; + + Ok(builder) +} + +impl TryFrom<(&ConfigOptions, &StatementOptions)> for ParquetWriterOptions { + type Error = DataFusionError; + + fn try_from( + configs_and_statement_options: (&ConfigOptions, &StatementOptions), + ) -> Result { + let configs = configs_and_statement_options.0; + let statement_options = configs_and_statement_options.1; + let mut builder = default_builder(configs)?; + for (option, value) in &statement_options.options { + builder = match option.to_lowercase().as_str(){ + "max_row_group_size" => builder + .set_max_row_group_size(value.parse() + .map_err(|_| DataFusionError::Configuration(format!("Unable to parse {value} as u64 as required for {option}!")))?), + "data_pagesize_limit" => builder + .set_data_page_size_limit(value.parse() + .map_err(|_| DataFusionError::Configuration(format!("Unable to parse {value} as usize as required for {option}!")))?), + "write_batch_size" => builder + .set_write_batch_size(value.parse() + .map_err(|_| DataFusionError::Configuration(format!("Unable to parse {value} as usize as required for {option}!")))?), + "writer_version" => builder + .set_writer_version(parse_version_string(value)?), + "dictionary_page_size_limit" => builder + .set_dictionary_page_size_limit(value.parse() + .map_err(|_| DataFusionError::Configuration(format!("Unable to parse {value} as usize as required for {option}!")))?), + "created_by" => builder + .set_created_by(value.to_owned()), + "column_index_truncate_length" => builder + .set_column_index_truncate_length(Some(value.parse() + .map_err(|_| DataFusionError::Configuration(format!("Unable to parse {value} as usize as required for {option}!")))?)), + "data_page_row_count_limit" => builder + .set_data_page_row_count_limit(value.parse() + .map_err(|_| DataFusionError::Configuration(format!("Unable to parse {value} as usize as required for {option}!")))?), + "bloom_filter_enabled" => builder + .set_bloom_filter_enabled(value.parse() + .map_err(|_| DataFusionError::Configuration(format!("Unable to parse {value} as bool as required for {option}!")))?), + "encoding" => builder + .set_encoding(parse_encoding_string(value)?), + "dictionary_enabled" => builder + .set_dictionary_enabled(value.parse() + .map_err(|_| DataFusionError::Configuration(format!("Unable to parse {value} as bool as required for {option}!")))?), + "compression" => builder + .set_compression(parse_compression_string(value)?), + "statistics_enabled" => builder + .set_statistics_enabled(parse_statistics_string(value)?), + "max_statistics_size" => builder + .set_max_statistics_size(value.parse() + .map_err(|_| DataFusionError::Configuration(format!("Unable to parse {value} as usize as required for {option}!")))?), + "bloom_filter_fpp" => builder + .set_bloom_filter_fpp(value.parse() + .map_err(|_| DataFusionError::Configuration(format!("Unable to parse {value} as f64 as required for {option}!")))?), + "bloom_filter_ndv" => builder + .set_bloom_filter_ndv(value.parse() + .map_err(|_| DataFusionError::Configuration(format!("Unable to parse {value} as u64 as required for {option}!")))?), + _ => return 
Err(DataFusionError::Configuration(format!("Found unsupported option {option} with value {value} for Parquet format!"))) + } + } + Ok(ParquetWriterOptions { + writer_options: builder.build(), + }) + } +} diff --git a/datafusion/common/src/file_options/parse_utils.rs b/datafusion/common/src/file_options/parse_utils.rs new file mode 100644 index 000000000000..da8d31436b9e --- /dev/null +++ b/datafusion/common/src/file_options/parse_utils.rs @@ -0,0 +1,183 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Functions for parsing arbitrary passed strings to valid file_option settings + +use parquet::{ + basic::{BrotliLevel, GzipLevel, ZstdLevel}, + file::properties::{EnabledStatistics, WriterVersion}, +}; + +use crate::{DataFusionError, Result}; + +/// Converts a String option to a bool, or returns an error if not a valid bool string. +pub(crate) fn parse_boolean_string(option: &str, value: String) -> Result { + match value.to_lowercase().as_str() { + "true" => Ok(true), + "false" => Ok(false), + _ => Err(DataFusionError::Configuration(format!( + "Unsupported value {value} for option {option}! \ + Valid values are true or false!" + ))), + } +} + +/// Parses datafusion.execution.parquet.encoding String to a parquet::basic::Encoding +pub(crate) fn parse_encoding_string( + str_setting: &str, +) -> Result { + let str_setting_lower: &str = &str_setting.to_lowercase(); + match str_setting_lower { + "plain" => Ok(parquet::basic::Encoding::PLAIN), + "plain_dictionary" => Ok(parquet::basic::Encoding::PLAIN_DICTIONARY), + "rle" => Ok(parquet::basic::Encoding::RLE), + "bit_packed" => Ok(parquet::basic::Encoding::BIT_PACKED), + "delta_binary_packed" => Ok(parquet::basic::Encoding::DELTA_BINARY_PACKED), + "delta_length_byte_array" => { + Ok(parquet::basic::Encoding::DELTA_LENGTH_BYTE_ARRAY) + } + "delta_byte_array" => Ok(parquet::basic::Encoding::DELTA_BYTE_ARRAY), + "rle_dictionary" => Ok(parquet::basic::Encoding::RLE_DICTIONARY), + "byte_stream_split" => Ok(parquet::basic::Encoding::BYTE_STREAM_SPLIT), + _ => Err(DataFusionError::Configuration(format!( + "Unknown or unsupported parquet encoding: \ + {str_setting}. Valid values are: plain, plain_dictionary, rle, \ + /// bit_packed, delta_binary_packed, delta_length_byte_array, \ + /// delta_byte_array, rle_dictionary, and byte_stream_split." + ))), + } +} + +/// Splits compression string into compression codec and optional compression_level +/// I.e. gzip(2) -> gzip, 2 +fn split_compression_string(str_setting: &str) -> Result<(String, Option)> { + // ignore string literal chars passed from sqlparser i.e. 
remove single quotes + let str_setting = str_setting.replace('\'', ""); + let split_setting = str_setting.split_once('('); + + match split_setting { + Some((codec, rh)) => { + let level = &rh[..rh.len() - 1].parse::().map_err(|_| { + DataFusionError::Configuration(format!( + "Could not parse compression string. \ + Got codec: {} and unknown level from {}", + codec, str_setting + )) + })?; + Ok((codec.to_owned(), Some(*level))) + } + None => Ok((str_setting.to_owned(), None)), + } +} + +/// Helper to ensure compression codecs which don't support levels +/// don't have one set. E.g. snappy(2) is invalid. +fn check_level_is_none(codec: &str, level: &Option) -> Result<()> { + if level.is_some() { + return Err(DataFusionError::Configuration(format!( + "Compression {codec} does not support specifying a level" + ))); + } + Ok(()) +} + +/// Helper to ensure compression codecs which require a level +/// do have one set. E.g. zstd is invalid, zstd(3) is valid +fn require_level(codec: &str, level: Option) -> Result { + level.ok_or(DataFusionError::Configuration(format!( + "{codec} compression requires specifying a level such as {codec}(4)" + ))) +} + +/// Parses datafusion.execution.parquet.compression String to a parquet::basic::Compression +pub(crate) fn parse_compression_string( + str_setting: &str, +) -> Result { + let str_setting_lower: &str = &str_setting.to_lowercase(); + let (codec, level) = split_compression_string(str_setting_lower)?; + let codec = codec.as_str(); + match codec { + "uncompressed" => { + check_level_is_none(codec, &level)?; + Ok(parquet::basic::Compression::UNCOMPRESSED) + } + "snappy" => { + check_level_is_none(codec, &level)?; + Ok(parquet::basic::Compression::SNAPPY) + } + "gzip" => { + let level = require_level(codec, level)?; + Ok(parquet::basic::Compression::GZIP(GzipLevel::try_new( + level, + )?)) + } + "lzo" => { + check_level_is_none(codec, &level)?; + Ok(parquet::basic::Compression::LZO) + } + "brotli" => { + let level = require_level(codec, level)?; + Ok(parquet::basic::Compression::BROTLI(BrotliLevel::try_new( + level, + )?)) + } + "lz4" => { + check_level_is_none(codec, &level)?; + Ok(parquet::basic::Compression::LZ4) + } + "zstd" => { + let level = require_level(codec, level)?; + Ok(parquet::basic::Compression::ZSTD(ZstdLevel::try_new( + level as i32, + )?)) + } + "lz4_raw" => { + check_level_is_none(codec, &level)?; + Ok(parquet::basic::Compression::LZ4_RAW) + } + _ => Err(DataFusionError::Configuration(format!( + "Unknown or unsupported parquet compression: \ + {str_setting}. Valid values are: uncompressed, snappy, gzip(level), \ + lzo, brotli(level), lz4, zstd(level), and lz4_raw." 
+ ))), + } +} + +pub(crate) fn parse_version_string(str_setting: &str) -> Result { + let str_setting_lower: &str = &str_setting.to_lowercase(); + match str_setting_lower { + "1.0" => Ok(WriterVersion::PARQUET_1_0), + "2.0" => Ok(WriterVersion::PARQUET_2_0), + _ => Err(DataFusionError::Configuration(format!( + "Unknown or unsupported parquet writer version {str_setting} \ + valid options are '1.0' and '2.0'" + ))), + } +} + +pub(crate) fn parse_statistics_string(str_setting: &str) -> Result { + let str_setting_lower: &str = &str_setting.to_lowercase(); + match str_setting_lower { + "none" => Ok(EnabledStatistics::None), + "chunk" => Ok(EnabledStatistics::Chunk), + "page" => Ok(EnabledStatistics::Page), + _ => Err(DataFusionError::Configuration(format!( + "Unknown or unsupported parquet statistics setting {str_setting} \ + valid options are 'none', 'page', and 'chunk'" + ))), + } +} diff --git a/datafusion/common/src/lib.rs b/datafusion/common/src/lib.rs index a64b283ed365..5f02d92e50c9 100644 --- a/datafusion/common/src/lib.rs +++ b/datafusion/common/src/lib.rs @@ -22,7 +22,7 @@ pub mod config; mod dfschema; pub mod display; mod error; -pub mod file_type; +pub mod file_options; pub mod format; mod functional_dependencies; mod join_type; @@ -44,11 +44,13 @@ pub use error::{ field_not_found, unqualified_field_not_found, DataFusionError, Result, SchemaError, SharedResult, }; -pub use file_type::{ + +pub use file_options::file_type::{ FileCompressionType, FileType, GetExt, DEFAULT_ARROW_EXTENSION, DEFAULT_AVRO_EXTENSION, DEFAULT_CSV_EXTENSION, DEFAULT_JSON_EXTENSION, DEFAULT_PARQUET_EXTENSION, }; +pub use file_options::FileTypeWriterOptions; pub use functional_dependencies::{ aggregate_functional_dependencies, get_target_functional_dependencies, Constraints, Dependency, FunctionalDependence, FunctionalDependencies, diff --git a/datafusion/common/src/parsers.rs b/datafusion/common/src/parsers.rs index 58f4db751c4c..ea2508f8c455 100644 --- a/datafusion/common/src/parsers.rs +++ b/datafusion/common/src/parsers.rs @@ -46,7 +46,7 @@ impl FromStr for CompressionTypeVariant { "BZIP2" | "BZ2" => Ok(Self::BZIP2), "XZ" => Ok(Self::XZ), "ZST" | "ZSTD" => Ok(Self::ZSTD), - "" => Ok(Self::UNCOMPRESSED), + "" | "UNCOMPRESSED" => Ok(Self::UNCOMPRESSED), _ => Err(ParserError::ParserError(format!( "Unsupported file compression type {s}" ))), diff --git a/datafusion/core/src/dataframe.rs b/datafusion/core/src/dataframe.rs index 92a061ce596a..fbd78ec9d7d8 100644 --- a/datafusion/core/src/dataframe.rs +++ b/datafusion/core/src/dataframe.rs @@ -24,6 +24,7 @@ use arrow::array::{Array, ArrayRef, Int64Array, StringArray}; use arrow::compute::{cast, concat}; use arrow::datatypes::{DataType, Field}; use async_trait::async_trait; +use datafusion_common::file_options::StatementOptions; use datafusion_common::{DataFusionError, FileType, SchemaError, UnnestOptions}; use parquet::file::properties::WriterProperties; @@ -999,9 +1000,9 @@ impl DataFrame { self.plan, path.into(), FileType::CSV, - true, + false, // TODO implement options - vec![], + StatementOptions::new(vec![]), )? .build()?; DataFrame::new(self.session_state, plan).collect().await @@ -1017,9 +1018,9 @@ impl DataFrame { self.plan, path.into(), FileType::PARQUET, - true, + false, // TODO implement options - vec![], + StatementOptions::new(vec![]), )? 
.build()?; DataFrame::new(self.session_state, plan).collect().await @@ -1034,9 +1035,9 @@ impl DataFrame { self.plan, path.into(), FileType::JSON, - true, + false, // TODO implement options - vec![], + StatementOptions::new(vec![]), )? .build()?; DataFrame::new(self.session_state, plan).collect().await diff --git a/datafusion/core/src/datasource/file_format/arrow.rs b/datafusion/core/src/datasource/file_format/arrow.rs index 903c2ce2a208..9dc4c043b4ce 100644 --- a/datafusion/core/src/datasource/file_format/arrow.rs +++ b/datafusion/core/src/datasource/file_format/arrow.rs @@ -27,7 +27,7 @@ use crate::physical_plan::ExecutionPlan; use arrow::ipc::reader::FileReader; use arrow_schema::{Schema, SchemaRef}; use async_trait::async_trait; -use datafusion_common::Statistics; +use datafusion_common::{FileType, Statistics}; use datafusion_physical_expr::PhysicalExpr; use object_store::{GetResult, ObjectMeta, ObjectStore}; use std::any::Any; @@ -86,6 +86,10 @@ impl FileFormat for ArrowFormat { let exec = ArrowExec::new(conf); Ok(Arc::new(exec)) } + + fn file_type(&self) -> FileType { + FileType::ARROW + } } fn read_arrow_schema_from_reader(reader: R) -> Result { diff --git a/datafusion/core/src/datasource/file_format/avro.rs b/datafusion/core/src/datasource/file_format/avro.rs index 296fb1e66851..bde2d107ef7d 100644 --- a/datafusion/core/src/datasource/file_format/avro.rs +++ b/datafusion/core/src/datasource/file_format/avro.rs @@ -23,6 +23,7 @@ use std::sync::Arc; use arrow::datatypes::Schema; use arrow::{self, datatypes::SchemaRef}; use async_trait::async_trait; +use datafusion_common::FileType; use datafusion_physical_expr::PhysicalExpr; use object_store::{GetResult, ObjectMeta, ObjectStore}; @@ -85,6 +86,10 @@ impl FileFormat for AvroFormat { let exec = AvroExec::new(conf); Ok(Arc::new(exec)) } + + fn file_type(&self) -> FileType { + FileType::AVRO + } } #[cfg(test)] diff --git a/datafusion/core/src/datasource/file_format/csv.rs b/datafusion/core/src/datasource/file_format/csv.rs index 1290625afe2c..4d2caccacd0d 100644 --- a/datafusion/core/src/datasource/file_format/csv.rs +++ b/datafusion/core/src/datasource/file_format/csv.rs @@ -27,7 +27,7 @@ use arrow::csv::WriterBuilder; use arrow::datatypes::{DataType, Field, Fields, Schema}; use arrow::{self, datatypes::SchemaRef}; use arrow_array::RecordBatch; -use datafusion_common::{exec_err, not_impl_err, DataFusionError}; +use datafusion_common::{exec_err, not_impl_err, DataFusionError, FileType}; use datafusion_execution::TaskContext; use datafusion_physical_expr::PhysicalExpr; @@ -273,15 +273,14 @@ impl FileFormat for CsvFormat { } let sink_schema = conf.output_schema().clone(); - let sink = Arc::new(CsvSink::new( - conf, - self.has_header, - self.delimiter, - self.file_compression_type, - )); + let sink = Arc::new(CsvSink::new(conf)); Ok(Arc::new(FileSinkExec::new(input, sink, sink_schema)) as _) } + + fn file_type(&self) -> FileType { + FileType::CSV + } } impl CsvFormat { @@ -439,18 +438,11 @@ impl BatchSerializer for CsvSerializer { struct CsvSink { /// Config options for writing data config: FileSinkConfig, - has_header: bool, - delimiter: u8, - file_compression_type: FileCompressionType, } impl Debug for CsvSink { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("CsvSink") - .field("has_header", &self.has_header) - .field("delimiter", &self.delimiter) - .field("file_compression_type", &self.file_compression_type) - .finish() + f.debug_struct("CsvSink").finish() } } @@ -471,18 +463,8 @@ impl DisplayAs for CsvSink { 
} impl CsvSink { - fn new( - config: FileSinkConfig, - has_header: bool, - delimiter: u8, - file_compression_type: FileCompressionType, - ) -> Self { - Self { - config, - has_header, - delimiter, - file_compression_type, - } + fn new(config: FileSinkConfig) -> Self { + Self { config } } } @@ -494,6 +476,11 @@ impl DataSink for CsvSink { context: &Arc, ) -> Result { let num_partitions = data.len(); + let writer_options = self.config.file_type_writer_options.try_into_csv()?; + let (builder, compression) = + (&writer_options.writer_options, &writer_options.compression); + let mut has_header = writer_options.has_header; + let compression = FileCompressionType::from(*compression); let object_store = context .runtime_env() @@ -503,25 +490,23 @@ impl DataSink for CsvSink { let mut writers = vec![]; match self.config.writer_mode { FileWriterMode::Append => { - if !self.config.per_thread_output { - return not_impl_err!("per_thread_output=false is not implemented for CsvSink in Append mode"); - } for file_group in &self.config.file_groups { + let mut append_builder = builder.clone(); // In append mode, consider has_header flag only when file is empty (at the start). // For other modes, use has_header flag as is. - let header = self.has_header - && (!matches!(&self.config.writer_mode, FileWriterMode::Append) - || file_group.object_meta.size == 0); - let builder = WriterBuilder::new().with_delimiter(self.delimiter); + if file_group.object_meta.size != 0 { + has_header = false; + append_builder = append_builder.has_headers(false); + } let serializer = CsvSerializer::new() - .with_builder(builder) - .with_header(header); + .with_builder(append_builder) + .with_header(has_header); serializers.push(Box::new(serializer)); let file = file_group.clone(); let writer = create_writer( self.config.writer_mode, - self.file_compression_type, + compression, file.object_meta.clone().into(), object_store.clone(), ) @@ -535,18 +520,15 @@ impl DataSink for CsvSink { FileWriterMode::PutMultipart => { // Currently assuming only 1 partition path (i.e. 
not hive-style partitioning on a column) let base_path = &self.config.table_paths[0]; - match self.config.per_thread_output { - true => { + match self.config.single_file_output { + false => { // Uniquely identify this batch of files with a random string, to prevent collisions overwriting files let write_id = Alphanumeric.sample_string(&mut rand::thread_rng(), 16); for part_idx in 0..num_partitions { - let header = self.has_header; - let builder = - WriterBuilder::new().with_delimiter(self.delimiter); let serializer = CsvSerializer::new() - .with_builder(builder) - .with_header(header); + .with_builder(builder.clone()) + .with_header(has_header); serializers.push(Box::new(serializer)); let file_path = base_path .prefix() @@ -559,7 +541,7 @@ impl DataSink for CsvSink { }; let writer = create_writer( self.config.writer_mode, - self.file_compression_type, + compression, object_meta.into(), object_store.clone(), ) @@ -567,12 +549,10 @@ impl DataSink for CsvSink { writers.push(writer); } } - false => { - let header = self.has_header; - let builder = WriterBuilder::new().with_delimiter(self.delimiter); + true => { let serializer = CsvSerializer::new() - .with_builder(builder) - .with_header(header); + .with_builder(builder.clone()) + .with_header(has_header); serializers.push(Box::new(serializer)); let file_path = base_path.prefix(); let object_meta = ObjectMeta { @@ -583,7 +563,7 @@ impl DataSink for CsvSink { }; let writer = create_writer( self.config.writer_mode, - self.file_compression_type, + compression, object_meta.into(), object_store.clone(), ) @@ -598,7 +578,7 @@ impl DataSink for CsvSink { data, serializers, writers, - self.config.per_thread_output, + self.config.single_file_output, ) .await } diff --git a/datafusion/core/src/datasource/file_format/json.rs b/datafusion/core/src/datasource/file_format/json.rs index e2ddda0fa511..30075c05d57a 100644 --- a/datafusion/core/src/datasource/file_format/json.rs +++ b/datafusion/core/src/datasource/file_format/json.rs @@ -22,6 +22,7 @@ use std::any::Any; use bytes::Bytes; use datafusion_common::not_impl_err; use datafusion_common::DataFusionError; +use datafusion_common::FileType; use datafusion_execution::TaskContext; use rand::distributions::Alphanumeric; use rand::distributions::DistString; @@ -184,6 +185,10 @@ impl FileFormat for JsonFormat { Ok(Arc::new(FileSinkExec::new(input, sink, sink_schema)) as _) } + + fn file_type(&self) -> FileType { + FileType::JSON + } } impl Default for JsonSerializer { @@ -270,13 +275,17 @@ impl DataSink for JsonSink { .runtime_env() .object_store(&self.config.object_store_url)?; + let writer_options = self.config.file_type_writer_options.try_into_json()?; + + let compression = FileCompressionType::from(writer_options.compression); + // Construct serializer and writer for each file group let mut serializers: Vec> = vec![]; let mut writers = vec![]; match self.config.writer_mode { FileWriterMode::Append => { - if !self.config.per_thread_output { - return not_impl_err!("per_thread_output=false is not implemented for JsonSink in Append mode"); + if self.config.single_file_output { + return Err(DataFusionError::NotImplemented("single_file_output=true is not implemented for JsonSink in Append mode".into())); } for file_group in &self.config.file_groups { let serializer = JsonSerializer::new(); @@ -285,7 +294,7 @@ impl DataSink for JsonSink { let file = file_group.clone(); let writer = create_writer( self.config.writer_mode, - self.file_compression_type, + compression, file.object_meta.clone().into(), 
object_store.clone(), ) @@ -299,8 +308,8 @@ impl DataSink for JsonSink { FileWriterMode::PutMultipart => { // Currently assuming only 1 partition path (i.e. not hive-style partitioning on a column) let base_path = &self.config.table_paths[0]; - match self.config.per_thread_output { - true => { + match self.config.single_file_output { + false => { // Uniquely identify this batch of files with a random string, to prevent collisions overwriting files let write_id = Alphanumeric.sample_string(&mut rand::thread_rng(), 16); @@ -318,7 +327,7 @@ impl DataSink for JsonSink { }; let writer = create_writer( self.config.writer_mode, - self.file_compression_type, + compression, object_meta.into(), object_store.clone(), ) @@ -326,7 +335,7 @@ impl DataSink for JsonSink { writers.push(writer); } } - false => { + true => { let serializer = JsonSerializer::new(); serializers.push(Box::new(serializer)); let file_path = base_path.prefix(); @@ -338,7 +347,7 @@ impl DataSink for JsonSink { }; let writer = create_writer( self.config.writer_mode, - self.file_compression_type, + compression, object_meta.into(), object_store.clone(), ) @@ -353,7 +362,7 @@ impl DataSink for JsonSink { data, serializers, writers, - self.config.per_thread_output, + self.config.single_file_output, ) .await } diff --git a/datafusion/core/src/datasource/file_format/mod.rs b/datafusion/core/src/datasource/file_format/mod.rs index 9dd88cc03808..f15d1cea3132 100644 --- a/datafusion/core/src/datasource/file_format/mod.rs +++ b/datafusion/core/src/datasource/file_format/mod.rs @@ -39,7 +39,7 @@ use crate::error::Result; use crate::execution::context::SessionState; use crate::physical_plan::{ExecutionPlan, Statistics}; -use datafusion_common::{not_impl_err, DataFusionError}; +use datafusion_common::{not_impl_err, DataFusionError, FileType}; use datafusion_physical_expr::PhysicalExpr; use async_trait::async_trait; @@ -101,6 +101,9 @@ pub trait FileFormat: Send + Sync + fmt::Debug { ) -> Result> { not_impl_err!("Writer not implemented for this format") } + + /// Returns the FileType corresponding to this FileFormat + fn file_type(&self) -> FileType; } #[cfg(test)] diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs index e5bbe7c82ab7..79eda8109ca8 100644 --- a/datafusion/core/src/datasource/file_format/parquet.rs +++ b/datafusion/core/src/datasource/file_format/parquet.rs @@ -17,7 +17,6 @@ //! 
Parquet format abstractions -use parquet::basic::{BrotliLevel, GzipLevel, ZstdLevel}; use rand::distributions::DistString; use std::any::Any; use std::fmt; @@ -28,7 +27,7 @@ use arrow::datatypes::SchemaRef; use arrow::datatypes::{Fields, Schema}; use async_trait::async_trait; use bytes::{BufMut, BytesMut}; -use datafusion_common::{exec_err, not_impl_err, plan_err, DataFusionError}; +use datafusion_common::{exec_err, not_impl_err, plan_err, DataFusionError, FileType}; use datafusion_execution::TaskContext; use datafusion_physical_expr::PhysicalExpr; use futures::{StreamExt, TryStreamExt}; @@ -37,7 +36,7 @@ use object_store::{ObjectMeta, ObjectStore}; use parquet::arrow::{parquet_to_arrow_schema, AsyncArrowWriter}; use parquet::file::footer::{decode_footer, decode_metadata}; use parquet::file::metadata::ParquetMetaData; -use parquet::file::properties::{EnabledStatistics, WriterProperties, WriterVersion}; +use parquet::file::properties::WriterProperties; use parquet::file::statistics::Statistics as ParquetStatistics; use rand::distributions::Alphanumeric; @@ -236,6 +235,10 @@ impl FileFormat for ParquetFormat { Ok(Arc::new(FileSinkExec::new(input, sink, sink_schema)) as _) } + + fn file_type(&self) -> FileType { + FileType::PARQUET + } } fn summarize_min_max( @@ -596,212 +599,11 @@ impl DisplayAs for ParquetSink { } } -/// Parses datafusion.execution.parquet.encoding String to a parquet::basic::Encoding -fn parse_encoding_string(str_setting: &str) -> Result { - let str_setting_lower: &str = &str_setting.to_lowercase(); - match str_setting_lower { - "plain" => Ok(parquet::basic::Encoding::PLAIN), - "plain_dictionary" => Ok(parquet::basic::Encoding::PLAIN_DICTIONARY), - "rle" => Ok(parquet::basic::Encoding::RLE), - "bit_packed" => Ok(parquet::basic::Encoding::BIT_PACKED), - "delta_binary_packed" => Ok(parquet::basic::Encoding::DELTA_BINARY_PACKED), - "delta_length_byte_array" => { - Ok(parquet::basic::Encoding::DELTA_LENGTH_BYTE_ARRAY) - } - "delta_byte_array" => Ok(parquet::basic::Encoding::DELTA_BYTE_ARRAY), - "rle_dictionary" => Ok(parquet::basic::Encoding::RLE_DICTIONARY), - "byte_stream_split" => Ok(parquet::basic::Encoding::BYTE_STREAM_SPLIT), - _ => Err(DataFusionError::Plan(format!( - "Unknown or unsupported parquet encoding: \ - {str_setting}. Valid values are: plain, plain_dictionary, rle, \ - /// bit_packed, delta_binary_packed, delta_length_byte_array, \ - /// delta_byte_array, rle_dictionary, and byte_stream_split." - ))), - } -} - -/// Splits compression string into compression codec and optional compression_level -/// I.e. gzip(2) -> gzip, 2 -fn split_compression_string(str_setting: &str) -> Result<(&str, Option)> { - let split_setting = str_setting.split_once('('); - - match split_setting { - Some((codec, rh)) => { - let level = &rh[..rh.len() - 1].parse::().map_err(|_| { - DataFusionError::Plan(format!( - "Could not parse compression string. \ - Got codec: {} and unknown level from {}", - codec, str_setting - )) - })?; - Ok((codec, Some(*level))) - } - None => Ok((str_setting, None)), - } -} - -/// Helper to ensure compression codecs which don't support levels -/// don't have one set. E.g. snappy(2) is invalid. -fn check_level_is_none(codec: &str, level: &Option) -> Result<()> { - if level.is_some() { - return Err(DataFusionError::Plan(format!( - "Compression {codec} does not support specifying a level" - ))); - } - Ok(()) -} - -/// Helper to ensure compression codecs which require a level -/// do have one set. E.g. 
zstd is invalid, zstd(3) is valid -fn require_level(codec: &str, level: Option) -> Result { - level.ok_or(DataFusionError::Plan(format!( - "{codec} compression requires specifying a level such as {codec}(4)" - ))) -} - -/// Parses datafusion.execution.parquet.compression String to a parquet::basic::Compression -fn parse_compression_string(str_setting: &str) -> Result { - let str_setting_lower: &str = &str_setting.to_lowercase(); - let (codec, level) = split_compression_string(str_setting_lower)?; - match codec { - "uncompressed" => { - check_level_is_none(codec, &level)?; - Ok(parquet::basic::Compression::UNCOMPRESSED) - } - "snappy" => { - check_level_is_none(codec, &level)?; - Ok(parquet::basic::Compression::SNAPPY) - } - "gzip" => { - let level = require_level(codec, level)?; - Ok(parquet::basic::Compression::GZIP(GzipLevel::try_new( - level, - )?)) - } - "lzo" => { - check_level_is_none(codec, &level)?; - Ok(parquet::basic::Compression::LZO) - } - "brotli" => { - let level = require_level(codec, level)?; - Ok(parquet::basic::Compression::BROTLI(BrotliLevel::try_new( - level, - )?)) - } - "lz4" => { - check_level_is_none(codec, &level)?; - Ok(parquet::basic::Compression::LZ4) - } - "zstd" => { - let level = require_level(codec, level)?; - Ok(parquet::basic::Compression::ZSTD(ZstdLevel::try_new( - level as i32, - )?)) - } - "lz4_raw" => { - check_level_is_none(codec, &level)?; - Ok(parquet::basic::Compression::LZ4_RAW) - } - _ => Err(DataFusionError::Plan(format!( - "Unknown or unsupported parquet compression: \ - {str_setting}. Valid values are: uncompressed, snappy, gzip(level), \ - lzo, brotli(level), lz4, zstd(level), and lz4_raw." - ))), - } -} - -fn parse_version_string(str_setting: &str) -> Result { - let str_setting_lower: &str = &str_setting.to_lowercase(); - match str_setting_lower { - "1.0" => Ok(WriterVersion::PARQUET_1_0), - "2.0" => Ok(WriterVersion::PARQUET_2_0), - _ => Err(DataFusionError::Plan(format!( - "Unknown or unsupported parquet writer version {str_setting} \ - valid options are '1.0' and '2.0'" - ))), - } -} - -fn parse_statistics_string(str_setting: &str) -> Result { - let str_setting_lower: &str = &str_setting.to_lowercase(); - match str_setting_lower { - "none" => Ok(EnabledStatistics::None), - "chunk" => Ok(EnabledStatistics::Chunk), - "page" => Ok(EnabledStatistics::Page), - _ => Err(DataFusionError::Plan(format!( - "Unknown or unsupported parquet statistics setting {str_setting} \ - valid options are 'none', 'page', and 'chunk'" - ))), - } -} - impl ParquetSink { fn new(config: FileSinkConfig) -> Self { Self { config } } - /// Builds a parquet WriterProperties struct, setting options as appropriate from TaskContext options. - /// May return error if SessionContext contains invalid or unsupported options - fn parquet_writer_props_from_context( - &self, - context: &Arc, - ) -> Result { - let parquet_context = &context.session_config().options().execution.parquet; - let mut builder = WriterProperties::builder() - .set_data_page_size_limit(parquet_context.data_pagesize_limit) - .set_write_batch_size(parquet_context.write_batch_size) - .set_writer_version(parse_version_string(&parquet_context.writer_version)?) 
- .set_dictionary_page_size_limit(parquet_context.dictionary_page_size_limit) - .set_max_row_group_size(parquet_context.max_row_group_size) - .set_created_by(parquet_context.created_by.clone()) - .set_column_index_truncate_length( - parquet_context.column_index_truncate_length, - ) - .set_data_page_row_count_limit(parquet_context.data_page_row_count_limit) - .set_bloom_filter_enabled(parquet_context.bloom_filter_enabled); - - builder = match &parquet_context.encoding { - Some(encoding) => builder.set_encoding(parse_encoding_string(encoding)?), - None => builder, - }; - - builder = match &parquet_context.dictionary_enabled { - Some(enabled) => builder.set_dictionary_enabled(*enabled), - None => builder, - }; - - builder = match &parquet_context.compression { - Some(compression) => { - builder.set_compression(parse_compression_string(compression)?) - } - None => builder, - }; - - builder = match &parquet_context.statistics_enabled { - Some(statistics) => { - builder.set_statistics_enabled(parse_statistics_string(statistics)?) - } - None => builder, - }; - - builder = match &parquet_context.max_statistics_size { - Some(size) => builder.set_max_statistics_size(*size), - None => builder, - }; - - builder = match &parquet_context.bloom_filter_fpp { - Some(fpp) => builder.set_bloom_filter_fpp(*fpp), - None => builder, - }; - - builder = match &parquet_context.bloom_filter_ndv { - Some(ndv) => builder.set_bloom_filter_ndv(*ndv), - None => builder, - }; - - Ok(builder.build()) - } - // Create a write for parquet files async fn create_writer( &self, @@ -846,7 +648,11 @@ impl DataSink for ParquetSink { context: &Arc, ) -> Result { let num_partitions = data.len(); - let parquet_props = self.parquet_writer_props_from_context(context)?; + let parquet_props = self + .config + .file_type_writer_options + .try_into_parquet()? + .writer_options(); let object_store = context .runtime_env() @@ -866,8 +672,8 @@ impl DataSink for ParquetSink { FileWriterMode::PutMultipart => { // Currently assuming only 1 partition path (i.e. not hive-style partitioning on a column) let base_path = &self.config.table_paths[0]; - match self.config.per_thread_output { - true => { + match self.config.single_file_output { + false => { // Uniquely identify this batch of files with a random string, to prevent collisions overwriting files let write_id = Alphanumeric.sample_string(&mut rand::thread_rng(), 16); @@ -891,7 +697,7 @@ impl DataSink for ParquetSink { writers.push(writer); } } - false => { + true => { let file_path = base_path.prefix(); let object_meta = ObjectMeta { location: file_path.clone(), @@ -916,9 +722,9 @@ impl DataSink for ParquetSink { // TODO parallelize serialization accross partitions and batches within partitions // see: https://github.com/apache/arrow-datafusion/issues/7079 for (part_idx, data_stream) in data.iter_mut().enumerate().take(num_partitions) { - let idx = match self.config.per_thread_output { - true => part_idx, - false => 0, + let idx = match self.config.single_file_output { + false => part_idx, + true => 0, }; while let Some(batch) = data_stream.next().await.transpose()? 
{ row_count += batch.num_rows(); diff --git a/datafusion/core/src/datasource/file_format/write.rs b/datafusion/core/src/datasource/file_format/write.rs index 901ccd4043de..272eee1fbcc3 100644 --- a/datafusion/core/src/datasource/file_format/write.rs +++ b/datafusion/core/src/datasource/file_format/write.rs @@ -324,14 +324,14 @@ pub(crate) async fn stateless_serialize_and_write_files( mut data: Vec, mut serializers: Vec>, mut writers: Vec>>, - per_thread_output: bool, + single_file_output: bool, ) -> Result { - if !per_thread_output && (serializers.len() != 1 || writers.len() != 1) { - return internal_err!("per_thread_output is false, but got more than 1 writer!"); + if single_file_output && (serializers.len() != 1 || writers.len() != 1) { + return internal_err!("single_file_output is true, but got more than 1 writer!"); } let num_partitions = data.len(); - if per_thread_output && (num_partitions != writers.len()) { - return internal_err!("per_thread_output is true, but did not get 1 writer for each output partition!"); + if !single_file_output && (num_partitions != writers.len()) { + return internal_err!("single_file_output is false, but did not get 1 writer for each output partition!"); } let mut row_count = 0; // Map errors to DatafusionError. @@ -340,9 +340,9 @@ pub(crate) async fn stateless_serialize_and_write_files( // TODO parallelize serialization accross partitions and batches within partitions // see: https://github.com/apache/arrow-datafusion/issues/7079 for (part_idx, data_stream) in data.iter_mut().enumerate().take(num_partitions) { - let idx = match per_thread_output { - true => part_idx, - false => 0, + let idx = match single_file_output { + false => part_idx, + true => 0, }; while let Some(maybe_batch) = data_stream.next().await { // Write data to files in a round robin fashion: diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index c95fa8d77687..7740bff2109b 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -25,6 +25,7 @@ use arrow::datatypes::{DataType, Field, SchemaBuilder, SchemaRef}; use arrow_schema::Schema; use async_trait::async_trait; use dashmap::DashMap; +use datafusion_common::FileTypeWriterOptions; use datafusion_common::{internal_err, plan_err, project_schema, SchemaExt, ToDFSchema}; use datafusion_expr::expr::Sort; use datafusion_optimizer::utils::conjunction; @@ -269,8 +270,14 @@ pub struct ListingOptions { /// In order to support infinite inputs, DataFusion may adjust query /// plans (e.g. joins) to run the given query in full pipelining mode. pub infinite_source: bool, - ///This setting controls how inserts to this table should be handled + /// This setting controls how inserts to this table should be handled pub insert_mode: ListingTableInsertMode, + /// This setting when true indicates that the table is backed by a single file. + /// Any inserts to the table may only append to this existing file. + pub single_file: bool, + /// This setting holds file format specific options which should be used + /// when inserting into this table.
+ pub file_type_write_options: Option, } impl ListingOptions { @@ -290,6 +297,8 @@ impl ListingOptions { file_sort_order: vec![], infinite_source: false, insert_mode: ListingTableInsertMode::AppendToFile, + single_file: false, + file_type_write_options: None, } } @@ -464,6 +473,21 @@ impl ListingOptions { self } + /// Configure if this table is backed by a single file + pub fn with_single_file(mut self, single_file: bool) -> Self { + self.single_file = single_file; + self + } + + /// Configure file format specific writing options. + pub fn with_write_options( + mut self, + file_type_write_options: FileTypeWriterOptions, + ) -> Self { + self.file_type_write_options = Some(file_type_write_options); + self + } + /// Infer the schema of the files at the given path on the provided object store. /// The inferred schema does not include the partitioning columns. /// @@ -873,6 +897,16 @@ impl TableProvider for ListingTable { } } + let file_format = self.options().format.as_ref(); + + let file_type_writer_options = match &self.options().file_type_write_options { + Some(opt) => opt.clone(), + None => FileTypeWriterOptions::build_default( + &file_format.file_type(), + state.config_options(), + )?, + }; + // Sink related option, apart from format let config = FileSinkConfig { object_store_url: self.table_paths()[0].object_store(), @@ -881,9 +915,9 @@ impl TableProvider for ListingTable { output_schema: self.schema(), table_partition_cols: self.options.table_partition_cols.clone(), writer_mode, - // TODO: when listing table is known to be backed by a single file, this should be false - per_thread_output: true, + single_file_output: self.options.single_file, overwrite, + file_type_writer_options, }; self.options() @@ -1828,7 +1862,7 @@ mod tests { ) .await .expect_err("Example should fail!"); - assert_eq!("Error during planning: zstd compression requires specifying a level such as zstd(4)", format!("{e}")); + assert_eq!("Invalid or Unsupported Configuration: zstd compression requires specifying a level such as zstd(4)", format!("{e}")); Ok(()) } diff --git a/datafusion/core/src/datasource/listing_table_factory.rs b/datafusion/core/src/datasource/listing_table_factory.rs index 2529c11de5a5..a788baf80fee 100644 --- a/datafusion/core/src/datasource/listing_table_factory.rs +++ b/datafusion/core/src/datasource/listing_table_factory.rs @@ -23,6 +23,7 @@ use std::sync::Arc; use arrow::datatypes::{DataType, SchemaRef}; use async_trait::async_trait; +use datafusion_common::file_options::{FileTypeWriterOptions, StatementOptions}; use datafusion_common::DataFusionError; use datafusion_expr::CreateExternalTable; @@ -133,9 +134,21 @@ impl TableProviderFactory for ListingTableFactory { // look for 'infinite' as an option let infinite_source = cmd.unbounded; - let explicit_insert_mode = cmd.options.get("insert_mode"); + let mut statement_options = StatementOptions::from(&cmd.options); + + // Extract ListingTable specific options if present or set default + // Discard unbounded option if present + statement_options.take_str_option("unbounded"); + let create_local_path = statement_options + .take_bool_option("create_local_path")? + .unwrap_or(false); + let single_file = statement_options + .take_bool_option("single_file")?
+ .unwrap_or(false); + + let explicit_insert_mode = statement_options.take_str_option("insert_mode"); let insert_mode = match explicit_insert_mode { - Some(mode) => ListingTableInsertMode::from_str(mode), + Some(mode) => ListingTableInsertMode::from_str(mode.as_str()), None => match file_type { FileType::CSV => Ok(ListingTableInsertMode::AppendToFile), FileType::PARQUET => Ok(ListingTableInsertMode::AppendNewFiles), @@ -145,34 +158,50 @@ impl TableProviderFactory for ListingTableFactory { }, }?; - let create_local_path_mode = cmd - .options - .get("create_local_path") - .map(|s| s.as_str()) - .unwrap_or("false"); - let single_file = cmd - .options - .get("single_file") - .map(|s| s.as_str()) - .unwrap_or("false"); - - let single_file = match single_file { - "true" => Ok(true), - "false" => Ok(false), - _ => Err(DataFusionError::Plan( - "Invalid option single_file, must be 'true' or 'false'".into(), - )), - }?; + let file_type = file_format.file_type(); + + // Use remaining options and session state to build FileTypeWriterOptions + let file_type_writer_options = FileTypeWriterOptions::build( + &file_type, + state.config_options(), + &statement_options, + )?; + + // Some options have special syntax which takes precedence + // e.g. "WITH HEADER ROW" overrides (header false, ...) + let file_type_writer_options = match file_type { + FileType::CSV => { + let mut csv_writer_options = + file_type_writer_options.try_into_csv()?.clone(); + csv_writer_options.has_header = cmd.has_header; + csv_writer_options.writer_options = csv_writer_options + .writer_options + .has_headers(cmd.has_header) + .with_delimiter(cmd.delimiter.try_into().map_err(|_| { + DataFusionError::Internal( + "Unable to convert CSV delimiter into u8".into(), + ) + })?); + csv_writer_options.compression = cmd.file_compression_type; + FileTypeWriterOptions::CSV(csv_writer_options) + } + FileType::JSON => { + let mut json_writer_options = + file_type_writer_options.try_into_json()?.clone(); + json_writer_options.compression = cmd.file_compression_type; + FileTypeWriterOptions::JSON(json_writer_options) + } + FileType::PARQUET => file_type_writer_options, + FileType::ARROW => file_type_writer_options, + FileType::AVRO => file_type_writer_options, + }; - let table_path = match create_local_path_mode { - "true" => ListingTableUrl::parse_create_local_if_not_exists( + let table_path = match create_local_path { + true => ListingTableUrl::parse_create_local_if_not_exists( &cmd.location, !single_file, ), - "false" => ListingTableUrl::parse(&cmd.location), - _ => Err(DataFusionError::Plan( - "Invalid option create_local_path, must be 'true' or 'false'".into(), - )), + false => ListingTableUrl::parse(&cmd.location), }?; let options = ListingOptions::new(file_format) @@ -182,7 +211,9 @@ impl TableProviderFactory for ListingTableFactory { .with_table_partition_cols(table_partition_cols) .with_infinite_source(infinite_source) .with_file_sort_order(cmd.order_exprs.clone()) - .with_insert_mode(insert_mode); + .with_insert_mode(insert_mode) + .with_single_file(single_file) + .with_write_options(file_type_writer_options); let resolved_schema = match provided_schema { None => options.infer_schema(state, &table_path).await?, diff --git a/datafusion/core/src/datasource/physical_plan/mod.rs b/datafusion/core/src/datasource/physical_plan/mod.rs index ecaf71ff541f..0fc0c23a2fda 100644 --- a/datafusion/core/src/datasource/physical_plan/mod.rs +++ b/datafusion/core/src/datasource/physical_plan/mod.rs @@ -62,7 +62,7 @@ use crate::{ 
physical_plan::display::{OutputOrderingDisplay, ProjectSchemaDisplay}, }; -use datafusion_common::plan_err; +use datafusion_common::{file_options::FileTypeWriterOptions, plan_err}; use datafusion_physical_expr::expressions::Column; use arrow::compute::cast; @@ -79,7 +79,6 @@ use super::listing::ListingTableUrl; /// The base configurations to provide when creating a physical plan for /// writing to any given file format. -#[derive(Debug, Clone)] pub struct FileSinkConfig { /// Object store URL, used to get an ObjectStore instance pub object_store_url: ObjectStoreUrl, @@ -94,12 +93,14 @@ pub struct FileSinkConfig { pub table_partition_cols: Vec<(String, DataType)>, /// A writer mode that determines how data is written to the file pub writer_mode: FileWriterMode, - /// If false, it is assumed there is a single table_path which is a file to which all data should be written + /// If true, it is assumed there is a single table_path which is a file to which all data should be written /// regardless of input partitioning. Otherwise, each table path is assumed to be a directory /// to which each output partition is written to its own output file. - pub per_thread_output: bool, + pub single_file_output: bool, /// Controls whether existing data should be overwritten by this sink pub overwrite: bool, + /// Contains settings specific to writing a given FileType, e.g. parquet max_row_group_size + pub file_type_writer_options: FileTypeWriterOptions, } impl FileSinkConfig { diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index ffb9039b2a64..25b11b1f973f 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -38,6 +38,7 @@ use crate::logical_expr::{ Repartition, Union, UserDefinedLogicalNode, }; use datafusion_common::display::ToStringifiedPlan; +use datafusion_common::file_options::FileTypeWriterOptions; use datafusion_common::FileType; use datafusion_expr::dml::CopyTo; @@ -555,19 +556,24 @@ impl DefaultPhysicalPlanner { input, output_url, file_format, - per_thread_output, - options: _, + single_file_output, + statement_options, }) => { let input_exec = self.create_initial_plan(input, session_state).await?; // TODO: make this behavior configurable via options (should copy to create path/file as needed?) // TODO: add additional configurable options for if existing files should be overwritten or // appended to - let parsed_url = ListingTableUrl::parse_create_local_if_not_exists(output_url, *per_thread_output)?; + let parsed_url = ListingTableUrl::parse_create_local_if_not_exists(output_url, !*single_file_output)?; let object_store_url = parsed_url.object_store(); let schema: Schema = (**input.schema()).clone().into(); + let file_type_writer_options = FileTypeWriterOptions::build( + file_format, + session_state.config_options(), + statement_options)?; + // Set file sink related options let config = FileSinkConfig { object_store_url, @@ -576,12 +582,11 @@ impl DefaultPhysicalPlanner { output_schema: Arc::new(schema), table_partition_cols: vec![], writer_mode: FileWriterMode::PutMultipart, - per_thread_output: *per_thread_output, + single_file_output: *single_file_output, overwrite: false, + file_type_writer_options }; - // TODO: implement statement level overrides for each file type - // E.g. 
CsvFormat::from_options(options) let sink_format: Arc = match file_format { FileType::CSV => Arc::new(CsvFormat::default()), FileType::PARQUET => Arc::new(ParquetFormat::default()), diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs index fb91e1bc9dba..d3cc42afbe66 100644 --- a/datafusion/expr/src/logical_plan/builder.rs +++ b/datafusion/expr/src/logical_plan/builder.rs @@ -41,6 +41,7 @@ use crate::{ Expr, ExprSchemable, TableSource, }; use arrow::datatypes::{DataType, Schema, SchemaRef}; +use datafusion_common::file_options::StatementOptions; use datafusion_common::plan_err; use datafusion_common::UnnestOptions; use datafusion_common::{ @@ -238,15 +239,15 @@ impl LogicalPlanBuilder { input: LogicalPlan, output_url: String, file_format: FileType, - per_thread_output: bool, - options: Vec<(String, String)>, + single_file_output: bool, + statement_options: StatementOptions, ) -> Result { Ok(Self::from(LogicalPlan::Copy(CopyTo { input: Arc::new(input), output_url, file_format, - per_thread_output, - options, + single_file_output, + statement_options, }))) } diff --git a/datafusion/expr/src/logical_plan/dml.rs b/datafusion/expr/src/logical_plan/dml.rs index c5243d830565..501f2eeba146 100644 --- a/datafusion/expr/src/logical_plan/dml.rs +++ b/datafusion/expr/src/logical_plan/dml.rs @@ -20,7 +20,9 @@ use std::{ sync::Arc, }; -use datafusion_common::{DFSchemaRef, FileType, OwnedTableReference}; +use datafusion_common::{ + file_options::StatementOptions, DFSchemaRef, FileType, OwnedTableReference, +}; use crate::LogicalPlan; @@ -36,9 +38,9 @@ pub struct CopyTo { /// If false, it is assumed output_url is a file to which all data should be written /// regardless of input partitioning. Otherwise, output_url is assumed to be a directory /// to which each output partition is written to its own output file - pub per_thread_output: bool, + pub single_file_output: bool, /// Arbitrary options as tuples - pub options: Vec<(String, String)>, + pub statement_options: StatementOptions, } /// The operator that modifies the content of a database (adapted from diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index ab691c1366a4..b17db245e6b1 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -1117,16 +1117,18 @@ impl LogicalPlan { input: _, output_url, file_format, - per_thread_output, - options, + single_file_output, + statement_options, }) => { - let op_str = options + let op_str = statement_options + .clone() + .into_inner() .iter() .map(|(k, v)| format!("{k} {v}")) .collect::>() .join(", "); - write!(f, "CopyTo: format={file_format} output_url={output_url} per_thread_output={per_thread_output} options: ({op_str})") + write!(f, "CopyTo: format={file_format} output_url={output_url} single_file_output={single_file_output} options: ({op_str})") } LogicalPlan::Ddl(ddl) => { write!(f, "{}", ddl.display()) diff --git a/datafusion/expr/src/utils.rs b/datafusion/expr/src/utils.rs index aae0b3a1b018..fd6a65b312c6 100644 --- a/datafusion/expr/src/utils.rs +++ b/datafusion/expr/src/utils.rs @@ -755,14 +755,14 @@ pub fn from_plan( input: _, output_url, file_format, - per_thread_output, - options, + single_file_output, + statement_options, }) => Ok(LogicalPlan::Copy(CopyTo { input: Arc::new(inputs[0].clone()), output_url: output_url.clone(), file_format: file_format.clone(), - per_thread_output: *per_thread_output, - options: options.clone(), + single_file_output: 
*single_file_output, + statement_options: statement_options.clone(), })), LogicalPlan::Values(Values { schema, .. }) => Ok(LogicalPlan::Values(Values { schema: schema.clone(), diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs index ff7d74de1c09..d3b18fbfadbf 100644 --- a/datafusion/proto/src/physical_plan/mod.rs +++ b/datafusion/proto/src/physical_plan/mod.rs @@ -46,9 +46,8 @@ use datafusion::physical_plan::windows::{create_window_expr, WindowAggExec}; use datafusion::physical_plan::{ udaf, AggregateExpr, ExecutionPlan, Partitioning, PhysicalExpr, WindowExpr, }; -use datafusion_common::{ - internal_err, not_impl_err, DataFusionError, FileCompressionType, Result, -}; +use datafusion_common::FileCompressionType; +use datafusion_common::{internal_err, not_impl_err, DataFusionError, Result}; use prost::bytes::BufMut; use prost::Message; diff --git a/datafusion/sql/src/statement.rs b/datafusion/sql/src/statement.rs index c160388acffb..655442d7e353 100644 --- a/datafusion/sql/src/statement.rs +++ b/datafusion/sql/src/statement.rs @@ -25,10 +25,11 @@ use crate::planner::{ use crate::utils::normalize_ident; use arrow_schema::DataType; +use datafusion_common::file_options::StatementOptions; use datafusion_common::parsers::CompressionTypeVariant; use datafusion_common::{ not_impl_err, unqualified_field_not_found, Column, Constraints, DFField, DFSchema, - DFSchemaRef, DataFusionError, ExprSchema, FileType, OwnedTableReference, Result, + DFSchemaRef, DataFusionError, ExprSchema, OwnedTableReference, Result, SchemaReference, TableReference, ToDFSchema, }; use datafusion_expr::dml::CopyTo; @@ -56,8 +57,6 @@ use sqlparser::parser::ParserError::ParserError; use datafusion_common::plan_err; use std::collections::{BTreeMap, HashMap, HashSet}; -use std::path::Path; -use std::str::FromStr; use std::sync::Arc; fn ident_to_string(ident: &Ident) -> String { @@ -603,50 +602,27 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { } }; - // convert options to lowercase strings, check for explicitly set "format" option - let mut options = vec![]; - let mut explicit_format = None; - // default behavior is to assume the user is specifying a single file to which - // we should output all data regardless of input partitioning. - let mut per_thread_output: bool = false; - for (key, value) in statement.options { - let (k, v) = (key.to_lowercase(), value.to_string().to_lowercase()); - // check for options important to planning - if k == "format" { - explicit_format = Some(FileType::from_str(&v)?); - } - if k == "per_thread_output" { - per_thread_output = match v.as_str(){ - "true" => true, - "false" => false, - _ => return Err(DataFusionError::Plan( - format!("Copy to option 'per_thread_output' must be true or false, got {value}") - )) - } - } - options.push((k, v)); - } - let format = match explicit_format { - Some(file_format) => file_format, - None => { - // try to infer file format from file extension - let extension: &str = &Path::new(&statement.target) - .extension() - .ok_or( - DataFusionError::Plan("Copy To format not explicitly set and unable to get file extension!".to_string()))? - .to_str() - .ok_or(DataFusionError::Plan("Copy to format not explicitly set and failed to parse file extension!".to_string()))? - .to_lowercase(); - - FileType::from_str(extension)? 
- } - }; + // TODO, parse options as Vec<(String, String)> to avoid this conversion + let options = statement + .options + .iter() + .map(|(s, v)| (s.to_owned(), v.to_string())) + .collect::>(); + + let mut statement_options = StatementOptions::new(options); + let file_format = statement_options.try_infer_file_type(&statement.target)?; + let single_file_output = + statement_options.take_bool_option("single_file_output")?; + + // COPY defaults to outputting a single file if not otherwise specified + let single_file_output = single_file_output.unwrap_or(true); + Ok(LogicalPlan::Copy(CopyTo { input: Arc::new(input), output_url: statement.target, - file_format: format, - per_thread_output, - options, + file_format, + single_file_output, + statement_options, })) } diff --git a/datafusion/sql/tests/sql_integration.rs b/datafusion/sql/tests/sql_integration.rs index 8b4d9686a4f5..07112184bf59 100644 --- a/datafusion/sql/tests/sql_integration.rs +++ b/datafusion/sql/tests/sql_integration.rs @@ -372,7 +372,7 @@ fn plan_rollback_transaction_chained() { fn plan_copy_to() { let sql = "COPY test_decimal to 'output.csv'"; let plan = r#" -CopyTo: format=csv output_url=output.csv per_thread_output=false options: () +CopyTo: format=csv output_url=output.csv single_file_output=true options: () TableScan: test_decimal "# .trim(); @@ -384,7 +384,7 @@ fn plan_explain_copy_to() { let sql = "EXPLAIN COPY test_decimal to 'output.csv'"; let plan = r#" Explain - CopyTo: format=csv output_url=output.csv per_thread_output=false options: () + CopyTo: format=csv output_url=output.csv single_file_output=true options: () TableScan: test_decimal "# .trim(); @@ -395,7 +395,7 @@ Explain fn plan_copy_to_query() { let sql = "COPY (select * from test_decimal limit 10) to 'output.csv'"; let plan = r#" -CopyTo: format=csv output_url=output.csv per_thread_output=false options: () +CopyTo: format=csv output_url=output.csv single_file_output=true options: () Limit: skip=0, fetch=10 Projection: test_decimal.id, test_decimal.price TableScan: test_decimal diff --git a/datafusion/sqllogictest/test_files/copy.slt b/datafusion/sqllogictest/test_files/copy.slt index d13caa47c17d..f095552dadf2 100644 --- a/datafusion/sqllogictest/test_files/copy.slt +++ b/datafusion/sqllogictest/test_files/copy.slt @@ -21,27 +21,32 @@ create table source_table(col1 integer, col2 varchar) as values (1, 'Foo'), (2, # Copy to directory as multiple files query IT -COPY source_table TO 'test_files/scratch/copy/table' (format parquet, per_thread_output true); +COPY source_table TO 'test_files/scratch/copy/table' (format parquet, single_file_output false, compression 'zstd(10)'); ---- 2 -# Error case -query error DataFusion error: Error during planning: Copy To format not explicitly set and unable to get file extension! 
-EXPLAIN COPY source_table to 'test_files/scratch/copy/table' - query TT -EXPLAIN COPY source_table to 'test_files/scratch/copy/table' (format parquet, per_thread_output true) +EXPLAIN COPY source_table TO 'test_files/scratch/copy/table' (format parquet, single_file_output false, compression 'zstd(10)'); ---- logical_plan -CopyTo: format=parquet output_url=test_files/scratch/copy/table per_thread_output=true options: (format parquet, per_thread_output true) +CopyTo: format=parquet output_url=test_files/scratch/copy/table single_file_output=false options: (compression 'zstd(10)') --TableScan: source_table projection=[col1, col2] physical_plan InsertExec: sink=ParquetSink(writer_mode=PutMultipart, file_groups=[]) --MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0] +# Error case +query error DataFusion error: Invalid or Unsupported Configuration: Format not explicitly set and unable to get file extension! +EXPLAIN COPY source_table to 'test_files/scratch/copy/table' + +query error DataFusion error: SQL error: ParserError\("Expected end of statement, found: query"\) +EXPLAIN COPY source_table to 'test_files/scratch/copy/table' (format parquet, single_file_output false) +query TT +EXPLAIN COPY source_table to 'test_files/scratch/copy/table' (format parquet, per_thread_output true) + # Copy more files to directory via query query IT -COPY (select * from source_table UNION ALL select * from source_table) to 'test_files/scratch/copy/table' (format parquet, per_thread_output true); +COPY (select * from source_table UNION ALL select * from source_table) to 'test_files/scratch/copy/table' (format parquet, single_file_output false); ---- 4 @@ -59,6 +64,42 @@ select * from validate_parquet; 1 Foo 2 Bar +# Copy parquet with all supported statement overrides +query IT +COPY source_table +TO 'test_files/scratch/copy/table_with_options' +(format parquet, +single_file_output false, +compression 'snappy', +max_row_group_size 12345, +data_pagesize_limit 1234, +write_batch_size 1234, +writer_version 2.0, +dictionary_page_size_limit 123, +created_by 'DF copy.slt', +column_index_truncate_length 123, +data_page_row_count_limit 1234, +bloom_filter_enabled true, +encoding plain, +dictionary_enabled false, +statistics_enabled page, +max_statistics_size 123, +bloom_filter_fpp 0.001, +bloom_filter_ndv 100 +) +---- +2 + +# validate multiple parquet file output with all options set +statement ok +CREATE EXTERNAL TABLE validate_parquet_with_options STORED AS PARQUET LOCATION 'test_files/scratch/copy/table_with_options/'; + +query IT +select * from validate_parquet_with_options; +---- +1 Foo +2 Bar + # Copy from table to single file query IT COPY source_table to 'test_files/scratch/copy/table.parquet'; @@ -75,15 +116,31 @@ select * from validate_parquet_single; 1 Foo 2 Bar -# copy from table to folder of csv files +# copy from table to folder of compressed json files +query IT +COPY source_table to 'test_files/scratch/copy/table_json_gz' (format json, single_file_output false, compression 'gzip'); +---- +2 + +# validate folder of json files +statement ok +CREATE EXTERNAL TABLE validate_json_gz STORED AS json COMPRESSION TYPE gzip LOCATION 'test_files/scratch/copy/table_json_gz'; + +query IT +select * from validate_json_gz; +---- +1 Foo +2 Bar + +# copy from table to folder of compressed csv files query IT -COPY source_table to 'test_files/scratch/copy/table_csv' (format csv, per_thread_output true); +COPY source_table to 'test_files/scratch/copy/table_csv' (format csv, single_file_output false, header false,
compression 'gzip'); ---- 2 # validate folder of csv files statement ok -CREATE EXTERNAL TABLE validate_csv STORED AS csv WITH HEADER ROW LOCATION 'test_files/scratch/copy/table_csv'; +CREATE EXTERNAL TABLE validate_csv STORED AS csv COMPRESSION TYPE gzip LOCATION 'test_files/scratch/copy/table_csv'; query IT select * from validate_csv; @@ -109,7 +166,7 @@ select * from validate_single_csv; # Copy from table to folder of json query IT -COPY source_table to 'test_files/scratch/copy/table_json' (format json, per_thread_output true); +COPY source_table to 'test_files/scratch/copy/table_json' (format json, single_file_output false); ---- 2 @@ -139,21 +196,38 @@ select * from validate_single_json; 1 Foo 2 Bar -# Copy from table with options +# COPY csv files with all options set query IT -COPY source_table to 'test_files/scratch/copy/table.json' (row_group_size 55); +COPY source_table +to 'test_files/scratch/copy/table_csv_with_options' +(format csv, +single_file_output false, +header false, +compression 'uncompressed', +datetime_format '%FT%H:%M:%S.%9f', +delimeter ';', +null_value 'NULLVAL'); ---- 2 -# Copy from table with options (and trailing comma) -query IT -COPY source_table to 'test_files/scratch/copy/table.json' (row_group_size 55, row_group_limit_bytes 9,); ----- -2 +# Validate single csv output +statement ok +CREATE EXTERNAL TABLE validate_csv_with_options +STORED AS csv +LOCATION 'test_files/scratch/copy/table_csv_with_options'; +query T +select * from validate_csv_with_options; +---- +1;Foo +2;Bar # Error cases: +# Copy from table with options +query error DataFusion error: Invalid or Unsupported Configuration: Found unsupported option row_group_size with value 55 for JSON format! +COPY source_table to 'test_files/scratch/copy/table.json' (row_group_size 55); + # Incomplete statement query error DataFusion error: SQL error: ParserError\("Expected \), found: EOF"\) COPY (select col2, sum(col1) from source_table