From 29c96b44c95e2b8e6e740d1b8d28ae5944573ccd Mon Sep 17 00:00:00 2001 From: Devin D'Angelo Date: Wed, 23 Aug 2023 15:58:38 -0400 Subject: [PATCH 1/7] squash, merge main --- datafusion/common/src/error.rs | 7 + .../common/src/file_options/arrow_writer.rs | 36 ++ .../common/src/file_options/avro_writer.rs | 36 ++ .../common/src/file_options/csv_writer.rs | 91 ++++ .../common/src/file_options/file_type.rs | 420 ++++++++++++++++++ .../common/src/file_options/json_writer.rs | 53 +++ datafusion/common/src/file_options/mod.rs | 270 +++++++++++ .../common/src/file_options/parquet_writer.rs | 163 +++++++ .../common/src/file_options/parse_utils.rs | 183 ++++++++ datafusion/common/src/lib.rs | 4 +- datafusion/core/src/dataframe.rs | 13 +- .../core/src/datasource/file_format/arrow.rs | 6 +- .../core/src/datasource/file_format/avro.rs | 5 + .../core/src/datasource/file_format/csv.rs | 84 ++-- .../core/src/datasource/file_format/json.rs | 27 +- .../core/src/datasource/file_format/mod.rs | 5 +- .../src/datasource/file_format/parquet.rs | 228 +--------- .../core/src/datasource/file_format/write.rs | 16 +- .../core/src/datasource/listing/table.rs | 42 +- .../src/datasource/listing_table_factory.rs | 85 ++-- .../core/src/datasource/physical_plan/csv.rs | 2 + .../core/src/datasource/physical_plan/mod.rs | 9 +- datafusion/core/src/physical_planner.rs | 17 +- datafusion/expr/src/logical_plan/builder.rs | 9 +- datafusion/expr/src/logical_plan/dml.rs | 8 +- datafusion/expr/src/logical_plan/plan.rs | 10 +- datafusion/expr/src/utils.rs | 8 +- datafusion/proto/src/physical_plan/mod.rs | 5 +- datafusion/sql/src/statement.rs | 64 +-- datafusion/sql/tests/sql_integration.rs | 6 +- datafusion/sqllogictest/test_files/copy.slt | 60 ++- 31 files changed, 1552 insertions(+), 420 deletions(-) create mode 100644 datafusion/common/src/file_options/arrow_writer.rs create mode 100644 datafusion/common/src/file_options/avro_writer.rs create mode 100644 datafusion/common/src/file_options/csv_writer.rs create mode 100644 datafusion/common/src/file_options/file_type.rs create mode 100644 datafusion/common/src/file_options/json_writer.rs create mode 100644 datafusion/common/src/file_options/mod.rs create mode 100644 datafusion/common/src/file_options/parquet_writer.rs create mode 100644 datafusion/common/src/file_options/parse_utils.rs diff --git a/datafusion/common/src/error.rs b/datafusion/common/src/error.rs index f186abc1d2f9..4535d5a4e58b 100644 --- a/datafusion/common/src/error.rs +++ b/datafusion/common/src/error.rs @@ -73,6 +73,9 @@ pub enum DataFusionError { /// This error happens whenever a plan is not valid. Examples include /// impossible casts. Plan(String), + /// This error happens when an invalid or unsupported option is passed + /// in a SQL statement + InvalidOption(String), /// This error happens with schema-related errors, such as schema inference not possible /// and non-unique column names. 
     SchemaError(SchemaError),
@@ -288,6 +291,9 @@ impl Display for DataFusionError {
             DataFusionError::SQL(ref desc) => {
                 write!(f, "SQL error: {desc:?}")
             }
+            DataFusionError::InvalidOption(ref desc) => {
+                write!(f, "Invalid Option: {desc}")
+            }
             DataFusionError::NotImplemented(ref desc) => {
                 write!(f, "This feature is not implemented: {desc}")
             }
@@ -338,6 +344,7 @@ impl Error for DataFusionError {
             DataFusionError::SQL(e) => Some(e),
             DataFusionError::NotImplemented(_) => None,
             DataFusionError::Internal(_) => None,
+            DataFusionError::InvalidOption(_) => None,
             DataFusionError::Plan(_) => None,
             DataFusionError::SchemaError(e) => Some(e),
             DataFusionError::Execution(_) => None,
diff --git a/datafusion/common/src/file_options/arrow_writer.rs b/datafusion/common/src/file_options/arrow_writer.rs
new file mode 100644
index 000000000000..a7a5ad51d0d5
--- /dev/null
+++ b/datafusion/common/src/file_options/arrow_writer.rs
@@ -0,0 +1,36 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Options related to how arrow files should be written
+
+use crate::{
+    config::ConfigOptions,
+    error::{DataFusionError, Result},
+};
+
+use super::StatementOptions;
+
+#[derive(Clone, Debug)]
+pub struct ArrowWriterOptions {}
+
+impl TryFrom<(&ConfigOptions, &StatementOptions)> for ArrowWriterOptions {
+    type Error = DataFusionError;
+
+    fn try_from(_value: (&ConfigOptions, &StatementOptions)) -> Result<Self> {
+        Ok(ArrowWriterOptions {})
+    }
+}
diff --git a/datafusion/common/src/file_options/avro_writer.rs b/datafusion/common/src/file_options/avro_writer.rs
new file mode 100644
index 000000000000..2e3a64705842
--- /dev/null
+++ b/datafusion/common/src/file_options/avro_writer.rs
@@ -0,0 +1,36 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Options related to how avro files should be written
+
+use crate::{
+    config::ConfigOptions,
+    error::{DataFusionError, Result},
+};
+
+use super::StatementOptions;
+
+#[derive(Clone, Debug)]
+pub struct AvroWriterOptions {}
+
+impl TryFrom<(&ConfigOptions, &StatementOptions)> for AvroWriterOptions {
+    type Error = DataFusionError;
+
+    fn try_from(_value: (&ConfigOptions, &StatementOptions)) -> Result<Self> {
+        Ok(AvroWriterOptions {})
+    }
+}
diff --git a/datafusion/common/src/file_options/csv_writer.rs b/datafusion/common/src/file_options/csv_writer.rs
new file mode 100644
index 000000000000..8e1077978cb8
--- /dev/null
+++ b/datafusion/common/src/file_options/csv_writer.rs
@@ -0,0 +1,91 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Options related to how csv files should be written
+
+use std::str::FromStr;
+
+use arrow::csv::WriterBuilder;
+
+use crate::{
+    config::ConfigOptions,
+    error::{DataFusionError, Result},
+    parsers::CompressionTypeVariant,
+};
+
+use super::StatementOptions;
+
+/// Options for writing CSV files
+#[derive(Clone, Debug)]
+pub struct CsvWriterOptions {
+    /// Struct from the arrow crate which contains all csv writing related settings
+    pub writer_options: WriterBuilder,
+    /// Compression to apply after ArrowWriter serializes RecordBatches.
+    /// This compression is applied by DataFusion, not the ArrowWriter itself.
+    pub compression: CompressionTypeVariant,
+    /// Indicates whether WriterBuilder.has_header() is set to true.
+    /// This is duplicative as WriterBuilder also stores this information.
+    /// However, WriterBuilder does not allow public read access to the
+    /// has_header parameter.
+    pub has_header: bool,
+    // TODO: expose a way to read has_header in arrow crate
+}
+
+impl TryFrom<(&ConfigOptions, &StatementOptions)> for CsvWriterOptions {
+    type Error = DataFusionError;
+
+    fn try_from(value: (&ConfigOptions, &StatementOptions)) -> Result<Self> {
+        let _configs = value.0;
+        let statement_options = value.1;
+        let mut has_header = true;
+        let mut builder = WriterBuilder::default();
+        let mut compression = CompressionTypeVariant::UNCOMPRESSED;
+        for (option, value) in &statement_options.options {
+            builder = match option.to_lowercase().as_str(){
+                "header" => {
+                    has_header = value.parse()
+                        .map_err(|_| DataFusionError::InvalidOption(format!("Unable to parse {value} as bool as required for {option}!")))?;
+                    builder.has_headers(has_header)
+                },
+                "date_format" => builder.with_date_format(value.to_owned()),
+                "datetime_format" => builder.with_datetime_format(value.to_owned()),
+                "timestamp_format" => builder.with_timestamp_format(value.to_owned()),
+                "time_format" => builder.with_time_format(value.to_owned()),
+                "rfc3339" => {
+                    let value_bool = value.parse()
+                        .map_err(|_| DataFusionError::InvalidOption(format!("Unable to parse {value} as bool as required for {option}!")))?;
+                    if value_bool{
+                        builder.with_rfc3339()
+                    } else{
+                        builder
+                    }
+                },
+                "null_value" => builder.with_null(value.to_owned()),
+                "compression" => {
+                    compression = CompressionTypeVariant::from_str(value.replace('\'', "").as_str())?;
+                    builder
+                },
+                _ => return Err(DataFusionError::InvalidOption(format!("Found unsupported option {option} with value {value} for CSV format!")))
+            }
+        }
+        Ok(CsvWriterOptions {
+            has_header,
+            writer_options: builder,
+            compression,
+        })
+    }
+}
diff --git a/datafusion/common/src/file_options/file_type.rs b/datafusion/common/src/file_options/file_type.rs
new file mode 100644
index 000000000000..7d12a267cacb
--- /dev/null
+++ b/datafusion/common/src/file_options/file_type.rs
@@ -0,0 +1,420 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//!
File type abstraction + +use crate::error::{DataFusionError, Result}; +#[cfg(feature = "compression")] +use async_compression::tokio::bufread::{ + BzDecoder as AsyncBzDecoder, BzEncoder as AsyncBzEncoder, + GzipDecoder as AsyncGzDecoder, GzipEncoder as AsyncGzEncoder, + XzDecoder as AsyncXzDecoder, XzEncoder as AsyncXzEncoder, + ZstdDecoder as AsyncZstdDecoer, ZstdEncoder as AsyncZstdEncoder, +}; + +use crate::parsers::CompressionTypeVariant; +#[cfg(feature = "compression")] +use async_compression::tokio::write::{BzEncoder, GzipEncoder, XzEncoder, ZstdEncoder}; +use bytes::Bytes; +#[cfg(feature = "compression")] +use bzip2::read::MultiBzDecoder; +#[cfg(feature = "compression")] +use flate2::read::MultiGzDecoder; + +use core::fmt; +use futures::stream::BoxStream; +use futures::StreamExt; +#[cfg(feature = "compression")] +use futures::TryStreamExt; +use std::fmt::Display; +use std::str::FromStr; +use tokio::io::AsyncWrite; +#[cfg(feature = "compression")] +use tokio_util::io::{ReaderStream, StreamReader}; +#[cfg(feature = "compression")] +use xz2::read::XzDecoder; +#[cfg(feature = "compression")] +use zstd::Decoder as ZstdDecoder; +use CompressionTypeVariant::*; + +/// The default file extension of arrow files +pub const DEFAULT_ARROW_EXTENSION: &str = ".arrow"; +/// The default file extension of avro files +pub const DEFAULT_AVRO_EXTENSION: &str = ".avro"; +/// The default file extension of csv files +pub const DEFAULT_CSV_EXTENSION: &str = ".csv"; +/// The default file extension of json files +pub const DEFAULT_JSON_EXTENSION: &str = ".json"; +/// The default file extension of parquet files +pub const DEFAULT_PARQUET_EXTENSION: &str = ".parquet"; + +/// Define each `FileType`/`FileCompressionType`'s extension +pub trait GetExt { + /// File extension getter + fn get_ext(&self) -> String; +} + +/// Readable file compression type +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub struct FileCompressionType { + variant: CompressionTypeVariant, +} + +impl GetExt for FileCompressionType { + fn get_ext(&self) -> String { + match self.variant { + GZIP => ".gz".to_owned(), + BZIP2 => ".bz2".to_owned(), + XZ => ".xz".to_owned(), + ZSTD => ".zst".to_owned(), + UNCOMPRESSED => "".to_owned(), + } + } +} + +impl From for FileCompressionType { + fn from(t: CompressionTypeVariant) -> Self { + Self { variant: t } + } +} + +impl FromStr for FileCompressionType { + type Err = DataFusionError; + + fn from_str(s: &str) -> Result { + let variant = CompressionTypeVariant::from_str(s).map_err(|_| { + DataFusionError::NotImplemented(format!("Unknown FileCompressionType: {s}")) + })?; + Ok(Self { variant }) + } +} + +/// `FileCompressionType` implementation +impl FileCompressionType { + /// Gzip-ed file + pub const GZIP: Self = Self { variant: GZIP }; + + /// Bzip2-ed file + pub const BZIP2: Self = Self { variant: BZIP2 }; + + /// Xz-ed file (liblzma) + pub const XZ: Self = Self { variant: XZ }; + + /// Zstd-ed file + pub const ZSTD: Self = Self { variant: ZSTD }; + + /// Uncompressed file + pub const UNCOMPRESSED: Self = Self { + variant: UNCOMPRESSED, + }; + + /// The file is compressed or not + pub const fn is_compressed(&self) -> bool { + self.variant.is_compressed() + } + + /// Given a `Stream`, create a `Stream` which data are compressed with `FileCompressionType`. 
+ pub fn convert_to_compress_stream( + &self, + s: BoxStream<'static, Result>, + ) -> Result>> { + Ok(match self.variant { + #[cfg(feature = "compression")] + GZIP => ReaderStream::new(AsyncGzEncoder::new(StreamReader::new(s))) + .map_err(DataFusionError::from) + .boxed(), + #[cfg(feature = "compression")] + BZIP2 => ReaderStream::new(AsyncBzEncoder::new(StreamReader::new(s))) + .map_err(DataFusionError::from) + .boxed(), + #[cfg(feature = "compression")] + XZ => ReaderStream::new(AsyncXzEncoder::new(StreamReader::new(s))) + .map_err(DataFusionError::from) + .boxed(), + #[cfg(feature = "compression")] + ZSTD => ReaderStream::new(AsyncZstdEncoder::new(StreamReader::new(s))) + .map_err(DataFusionError::from) + .boxed(), + #[cfg(not(feature = "compression"))] + GZIP | BZIP2 | XZ | ZSTD => { + return Err(DataFusionError::NotImplemented( + "Compression feature is not enabled".to_owned(), + )) + } + UNCOMPRESSED => s.boxed(), + }) + } + + /// Wrap the given `AsyncWrite` so that it performs compressed writes + /// according to this `FileCompressionType`. + pub fn convert_async_writer( + &self, + w: Box, + ) -> Result> { + Ok(match self.variant { + #[cfg(feature = "compression")] + GZIP => Box::new(GzipEncoder::new(w)), + #[cfg(feature = "compression")] + BZIP2 => Box::new(BzEncoder::new(w)), + #[cfg(feature = "compression")] + XZ => Box::new(XzEncoder::new(w)), + #[cfg(feature = "compression")] + ZSTD => Box::new(ZstdEncoder::new(w)), + #[cfg(not(feature = "compression"))] + GZIP | BZIP2 | XZ | ZSTD => { + return Err(DataFusionError::NotImplemented( + "Compression feature is not enabled".to_owned(), + )) + } + UNCOMPRESSED => w, + }) + } + + /// Given a `Stream`, create a `Stream` which data are decompressed with `FileCompressionType`. + pub fn convert_stream( + &self, + s: BoxStream<'static, Result>, + ) -> Result>> { + Ok(match self.variant { + #[cfg(feature = "compression")] + GZIP => ReaderStream::new(AsyncGzDecoder::new(StreamReader::new(s))) + .map_err(DataFusionError::from) + .boxed(), + #[cfg(feature = "compression")] + BZIP2 => ReaderStream::new(AsyncBzDecoder::new(StreamReader::new(s))) + .map_err(DataFusionError::from) + .boxed(), + #[cfg(feature = "compression")] + XZ => ReaderStream::new(AsyncXzDecoder::new(StreamReader::new(s))) + .map_err(DataFusionError::from) + .boxed(), + #[cfg(feature = "compression")] + ZSTD => ReaderStream::new(AsyncZstdDecoer::new(StreamReader::new(s))) + .map_err(DataFusionError::from) + .boxed(), + #[cfg(not(feature = "compression"))] + GZIP | BZIP2 | XZ | ZSTD => { + return Err(DataFusionError::NotImplemented( + "Compression feature is not enabled".to_owned(), + )) + } + UNCOMPRESSED => s.boxed(), + }) + } + + /// Given a `Read`, create a `Read` which data are decompressed with `FileCompressionType`. 
+ pub fn convert_read( + &self, + r: T, + ) -> Result> { + Ok(match self.variant { + #[cfg(feature = "compression")] + GZIP => Box::new(MultiGzDecoder::new(r)), + #[cfg(feature = "compression")] + BZIP2 => Box::new(MultiBzDecoder::new(r)), + #[cfg(feature = "compression")] + XZ => Box::new(XzDecoder::new_multi_decoder(r)), + #[cfg(feature = "compression")] + ZSTD => match ZstdDecoder::new(r) { + Ok(decoder) => Box::new(decoder), + Err(e) => return Err(DataFusionError::External(Box::new(e))), + }, + #[cfg(not(feature = "compression"))] + GZIP | BZIP2 | XZ | ZSTD => { + return Err(DataFusionError::NotImplemented( + "Compression feature is not enabled".to_owned(), + )) + } + UNCOMPRESSED => Box::new(r), + }) + } +} + +/// Readable file type +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub enum FileType { + /// Apache Arrow file + ARROW, + /// Apache Avro file + AVRO, + /// Apache Parquet file + PARQUET, + /// CSV file + CSV, + /// JSON file + JSON, +} + +impl GetExt for FileType { + fn get_ext(&self) -> String { + match self { + FileType::ARROW => DEFAULT_ARROW_EXTENSION.to_owned(), + FileType::AVRO => DEFAULT_AVRO_EXTENSION.to_owned(), + FileType::PARQUET => DEFAULT_PARQUET_EXTENSION.to_owned(), + FileType::CSV => DEFAULT_CSV_EXTENSION.to_owned(), + FileType::JSON => DEFAULT_JSON_EXTENSION.to_owned(), + } + } +} + +impl Display for FileType { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let out = match self { + FileType::CSV => "csv", + FileType::JSON => "json", + FileType::PARQUET => "parquet", + FileType::AVRO => "avro", + FileType::ARROW => "arrow", + }; + write!(f, "{}", out) + } +} + +impl FromStr for FileType { + type Err = DataFusionError; + + fn from_str(s: &str) -> Result { + let s = s.to_uppercase(); + match s.as_str() { + "ARROW" => Ok(FileType::ARROW), + "AVRO" => Ok(FileType::AVRO), + "PARQUET" => Ok(FileType::PARQUET), + "CSV" => Ok(FileType::CSV), + "JSON" | "NDJSON" => Ok(FileType::JSON), + _ => Err(DataFusionError::NotImplemented(format!( + "Unknown FileType: {s}" + ))), + } + } +} + +impl FileType { + /// Given a `FileCompressionType`, return the `FileType`'s extension with compression suffix + pub fn get_ext_with_compression(&self, c: FileCompressionType) -> Result { + let ext = self.get_ext(); + + match self { + FileType::JSON | FileType::CSV => Ok(format!("{}{}", ext, c.get_ext())), + FileType::PARQUET | FileType::AVRO | FileType::ARROW => match c.variant { + UNCOMPRESSED => Ok(ext), + _ => Err(DataFusionError::Internal( + "FileCompressionType can be specified for CSV/JSON FileType.".into(), + )), + }, + } + } +} + +#[cfg(test)] +mod tests { + use crate::error::DataFusionError; + use crate::file_options::file_type::{FileCompressionType, FileType}; + use std::str::FromStr; + + #[test] + fn get_ext_with_compression() { + for (file_type, compression, extension) in [ + (FileType::CSV, FileCompressionType::UNCOMPRESSED, ".csv"), + (FileType::CSV, FileCompressionType::GZIP, ".csv.gz"), + (FileType::CSV, FileCompressionType::XZ, ".csv.xz"), + (FileType::CSV, FileCompressionType::BZIP2, ".csv.bz2"), + (FileType::CSV, FileCompressionType::ZSTD, ".csv.zst"), + (FileType::JSON, FileCompressionType::UNCOMPRESSED, ".json"), + (FileType::JSON, FileCompressionType::GZIP, ".json.gz"), + (FileType::JSON, FileCompressionType::XZ, ".json.xz"), + (FileType::JSON, FileCompressionType::BZIP2, ".json.bz2"), + (FileType::JSON, FileCompressionType::ZSTD, ".json.zst"), + ] { + assert_eq!( + file_type.get_ext_with_compression(compression).unwrap(), + extension + ); + } + + // 
Cannot specify compression for these file types + for (file_type, extension) in + [(FileType::AVRO, ".avro"), (FileType::PARQUET, ".parquet")] + { + assert_eq!( + file_type + .get_ext_with_compression(FileCompressionType::UNCOMPRESSED) + .unwrap(), + extension + ); + for compression in [ + FileCompressionType::GZIP, + FileCompressionType::XZ, + FileCompressionType::BZIP2, + FileCompressionType::ZSTD, + ] { + assert!(matches!( + file_type.get_ext_with_compression(compression), + Err(DataFusionError::Internal(_)) + )); + } + } + } + + #[test] + fn from_str() { + for (ext, file_type) in [ + ("csv", FileType::CSV), + ("CSV", FileType::CSV), + ("json", FileType::JSON), + ("JSON", FileType::JSON), + ("avro", FileType::AVRO), + ("AVRO", FileType::AVRO), + ("parquet", FileType::PARQUET), + ("PARQUET", FileType::PARQUET), + ] { + assert_eq!(FileType::from_str(ext).unwrap(), file_type); + } + + assert!(matches!( + FileType::from_str("Unknown"), + Err(DataFusionError::NotImplemented(_)) + )); + + for (ext, compression_type) in [ + ("gz", FileCompressionType::GZIP), + ("GZ", FileCompressionType::GZIP), + ("gzip", FileCompressionType::GZIP), + ("GZIP", FileCompressionType::GZIP), + ("xz", FileCompressionType::XZ), + ("XZ", FileCompressionType::XZ), + ("bz2", FileCompressionType::BZIP2), + ("BZ2", FileCompressionType::BZIP2), + ("bzip2", FileCompressionType::BZIP2), + ("BZIP2", FileCompressionType::BZIP2), + ("zst", FileCompressionType::ZSTD), + ("ZST", FileCompressionType::ZSTD), + ("zstd", FileCompressionType::ZSTD), + ("ZSTD", FileCompressionType::ZSTD), + ("", FileCompressionType::UNCOMPRESSED), + ] { + assert_eq!( + FileCompressionType::from_str(ext).unwrap(), + compression_type + ); + } + + assert!(matches!( + FileCompressionType::from_str("Unknown"), + Err(DataFusionError::NotImplemented(_)) + )); + } +} diff --git a/datafusion/common/src/file_options/json_writer.rs b/datafusion/common/src/file_options/json_writer.rs new file mode 100644 index 000000000000..a788ad26d0a3 --- /dev/null +++ b/datafusion/common/src/file_options/json_writer.rs @@ -0,0 +1,53 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! 
Options related to how json files should be written + +use std::str::FromStr; + +use crate::{ + config::ConfigOptions, + error::{DataFusionError, Result}, + parsers::CompressionTypeVariant, +}; + +use super::StatementOptions; + +/// Options for writing JSON files +#[derive(Clone, Debug)] +pub struct JsonWriterOptions { + pub compression: CompressionTypeVariant, +} + +impl TryFrom<(&ConfigOptions, &StatementOptions)> for JsonWriterOptions { + type Error = DataFusionError; + + fn try_from(value: (&ConfigOptions, &StatementOptions)) -> Result { + let _configs = value.0; + let statement_options = value.1; + let mut compression = CompressionTypeVariant::UNCOMPRESSED; + for (option, value) in &statement_options.options { + match option.to_lowercase().as_str(){ + "compression" => { + compression = CompressionTypeVariant::from_str(value.replace('\'', "").as_str())?; + }, + _ => return Err(DataFusionError::InvalidOption(format!("Found unsupported option {option} with value {value} for JSON format!"))) + } + } + Ok(JsonWriterOptions { compression }) + } +} diff --git a/datafusion/common/src/file_options/mod.rs b/datafusion/common/src/file_options/mod.rs new file mode 100644 index 000000000000..016522afe7ff --- /dev/null +++ b/datafusion/common/src/file_options/mod.rs @@ -0,0 +1,270 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Options related to how files should be written + +pub mod arrow_writer; +pub mod avro_writer; +pub mod csv_writer; +pub mod file_type; +pub mod json_writer; +pub mod parquet_writer; +pub(crate) mod parse_utils; + +use std::{collections::HashMap, path::Path, str::FromStr}; + +use crate::{ + config::ConfigOptions, file_options::parse_utils::parse_boolean_string, + DataFusionError, FileType, Result, +}; + +use self::{ + arrow_writer::ArrowWriterOptions, avro_writer::AvroWriterOptions, + csv_writer::CsvWriterOptions, json_writer::JsonWriterOptions, + parquet_writer::ParquetWriterOptions, +}; + +/// Represents a single arbitrary setting in a +/// [StatementOptions] where OptionTuple.0 determines +/// the specific setting to be modified and OptionTuple.1 +/// determines the value which should be applied +pub type OptionTuple = (String, String); + +/// Represents arbitrary tuples of options passed as String +/// tuples from SQL statements. As in the following statement: +/// COPY ... TO ... (setting1 value1, setting2 value2, ...) 
+#[derive(Clone, PartialEq, Eq, Hash, Debug)] +pub struct StatementOptions { + options: Vec, +} + +/// Useful for conversion from external tables which use Hashmap +impl From<&HashMap> for StatementOptions { + fn from(value: &HashMap) -> Self { + Self { + options: value + .iter() + .map(|(k, v)| (k.to_owned(), v.to_owned())) + .collect::>(), + } + } +} + +impl StatementOptions { + pub fn new(options: Vec) -> Self { + Self { options } + } + + pub fn into_inner(self) -> Vec { + self.options + } + + /// Scans for option and if it exists removes it and attempts to parse as a boolean + /// Returns none if it does not exist. + pub fn get_bool_option(&mut self, find: &str) -> Result> { + let maybe_option = self.scan_and_remove_option(find); + maybe_option + .map(|(_, v)| parse_boolean_string(find, v)) + .transpose() + } + + /// Scans for option and if it exists removes it and returns it + /// Returns none if it does not exist + pub fn get_str_option(&mut self, find: &str) -> Option { + let maybe_option = self.scan_and_remove_option(find); + maybe_option.map(|(_, v)| v) + } + + /// Infers the file_type given a target and arbitrary options. + /// If the options contain an explicit "format" option, that will be used. + /// Otherwise, attempt to infer file_type from the extension of target. + /// Finally, return an error if unable to determine the file_type + /// If found, format is removed from the options list. + pub fn try_infer_file_type(&mut self, target: &str) -> Result { + let explicit_format = self.scan_and_remove_option("format"); + let format = match explicit_format { + Some(s) => FileType::from_str(s.1.as_str()), + None => { + // try to infer file format from file extension + let extension: &str = &Path::new(target) + .extension() + .ok_or(DataFusionError::InvalidOption( + "Format not explicitly set and unable to get file extension!" + .to_string(), + ))? + .to_str() + .ok_or(DataFusionError::InvalidOption( + "Format not explicitly set and failed to parse file extension!" + .to_string(), + ))? + .to_lowercase(); + + FileType::from_str(extension) + } + }?; + + Ok(format) + } + + /// Finds an option in StatementOptions if exists, removes and returns it + /// along with the vec of remaining options. + fn scan_and_remove_option(&mut self, find: &str) -> Option { + let idx = self + .options + .iter() + .position(|(k, _)| k.to_lowercase() == find.to_lowercase()); + match idx { + Some(i) => Some(self.options.swap_remove(i)), + None => None, + } + } +} + + +/// This type contains all options needed to initialize a particular +/// RecordBatchWriter type. Each element in the enum contains a thin wrapper +/// around a "writer builder" type (e.g. arrow::csv::WriterBuilder) +/// plus any DataFusion specific writing options (e.g. CSV compression) +#[derive(Clone, Debug)] +pub enum FileTypeWriterOptions { + Parquet(ParquetWriterOptions), + CSV(CsvWriterOptions), + JSON(JsonWriterOptions), + Avro(AvroWriterOptions), + Arrow(ArrowWriterOptions), +} + +impl FileTypeWriterOptions { + /// Constructs a FileTypeWriterOptions given a FileType to be written + /// and arbitrary String tuple options. May return an error if any + /// string setting is unrecognized or unsupported. + pub fn build( + file_type: &FileType, + config_defaults: &ConfigOptions, + statement_options: &StatementOptions, + ) -> Result { + let options = (config_defaults, statement_options); + + let file_type_write_options = match file_type { + FileType::PARQUET => { + FileTypeWriterOptions::Parquet(ParquetWriterOptions::try_from(options)?) 
+ } + FileType::CSV => { + FileTypeWriterOptions::CSV(CsvWriterOptions::try_from(options)?) + } + FileType::JSON => { + FileTypeWriterOptions::JSON(JsonWriterOptions::try_from(options)?) + } + FileType::AVRO => { + FileTypeWriterOptions::Avro(AvroWriterOptions::try_from(options)?) + } + FileType::ARROW => { + FileTypeWriterOptions::Arrow(ArrowWriterOptions::try_from(options)?) + } + }; + + Ok(file_type_write_options) + } + + /// Constructs a FileTypeWriterOptions from session defaults only. + pub fn build_default( + file_type: &FileType, + config_defaults: &ConfigOptions, + ) -> Result { + let empty_statement = StatementOptions::new(vec![]); + let options = (config_defaults, &empty_statement); + + let file_type_write_options = match file_type { + FileType::PARQUET => { + FileTypeWriterOptions::Parquet(ParquetWriterOptions::try_from(options)?) + } + FileType::CSV => { + FileTypeWriterOptions::CSV(CsvWriterOptions::try_from(options)?) + } + FileType::JSON => { + FileTypeWriterOptions::JSON(JsonWriterOptions::try_from(options)?) + } + FileType::AVRO => { + FileTypeWriterOptions::Avro(AvroWriterOptions::try_from(options)?) + } + FileType::ARROW => { + FileTypeWriterOptions::Arrow(ArrowWriterOptions::try_from(options)?) + } + }; + + Ok(file_type_write_options) + } + + /// Tries to extract ParquetWriterOptions from this FileTypeWriterOptions enum. + /// Returns an error if a different type from parquet is set. + pub fn try_into_parquet(&self) -> Result<&ParquetWriterOptions> { + match self { + FileTypeWriterOptions::Parquet(opt) => Ok(opt), + _ => Err(DataFusionError::Internal( + "Expected parquet options but found options for a different FileType!" + .into(), + )), + } + } + + /// Tries to extract CsvWriterOptions from this FileTypeWriterOptions enum. + /// Returns an error if a different type from csv is set. + pub fn try_into_csv(&self) -> Result<&CsvWriterOptions> { + match self { + FileTypeWriterOptions::CSV(opt) => Ok(opt), + _ => Err(DataFusionError::Internal( + "Expected csv options but found options for a different FileType!".into(), + )), + } + } + + /// Tries to extract JsonWriterOptions from this FileTypeWriterOptions enum. + /// Returns an error if a different type from json is set. + pub fn try_into_json(&self) -> Result<&JsonWriterOptions> { + match self { + FileTypeWriterOptions::JSON(opt) => Ok(opt), + _ => Err(DataFusionError::Internal( + "Expected json options but found options for a different FileType!" + .into(), + )), + } + } + + /// Tries to extract AvroWriterOptions from this FileTypeWriterOptions enum. + /// Returns an error if a different type from avro is set. + pub fn try_into_avro(&self) -> Result<&AvroWriterOptions> { + match self { + FileTypeWriterOptions::Avro(opt) => Ok(opt), + _ => Err(DataFusionError::Internal( + "Expected avro options but found options for a different FileType!" + .into(), + )), + } + } + + /// Tries to extract ArrowWriterOptions from this FileTypeWriterOptions enum. + /// Returns an error if a different type from arrow is set. + pub fn try_into_arrow(&self) -> Result<&ArrowWriterOptions> { + match self { + FileTypeWriterOptions::Arrow(opt) => Ok(opt), + _ => Err(DataFusionError::Internal( + "Expected arrow options but found options for a different FileType!" 
+ .into(), + )), + } + } +} diff --git a/datafusion/common/src/file_options/parquet_writer.rs b/datafusion/common/src/file_options/parquet_writer.rs new file mode 100644 index 000000000000..a97e218ba1c2 --- /dev/null +++ b/datafusion/common/src/file_options/parquet_writer.rs @@ -0,0 +1,163 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Options related to how parquet files should be written + +use parquet::file::properties::{WriterProperties, WriterPropertiesBuilder}; + +use crate::{ + config::ConfigOptions, + file_options::parse_utils::{ + parse_compression_string, parse_encoding_string, parse_statistics_string, + parse_version_string, + }, + DataFusionError, Result, +}; + +use super::StatementOptions; + +/// Options for writing parquet files +#[derive(Clone, Debug)] +pub struct ParquetWriterOptions { + pub writer_options: WriterProperties, +} + +impl ParquetWriterOptions { + pub fn writer_options(&self) -> &WriterProperties { + &self.writer_options + } +} + +/// Constructs a default Parquet WriterPropertiesBuilder using +/// Session level ConfigOptions to initialize settings +fn default_builder(options: &ConfigOptions) -> Result { + let parquet_context = &options.execution.parquet; + let mut builder = WriterProperties::builder() + .set_data_page_size_limit(parquet_context.data_pagesize_limit) + .set_write_batch_size(parquet_context.write_batch_size) + .set_writer_version(parse_version_string(&parquet_context.writer_version)?) + .set_dictionary_page_size_limit(parquet_context.dictionary_page_size_limit) + .set_max_row_group_size(parquet_context.max_row_group_size) + .set_created_by(parquet_context.created_by.clone()) + .set_column_index_truncate_length(parquet_context.column_index_truncate_length) + .set_data_page_row_count_limit(parquet_context.data_page_row_count_limit) + .set_bloom_filter_enabled(parquet_context.bloom_filter_enabled); + + builder = match &parquet_context.encoding { + Some(encoding) => builder.set_encoding(parse_encoding_string(encoding)?), + None => builder, + }; + + builder = match &parquet_context.dictionary_enabled { + Some(enabled) => builder.set_dictionary_enabled(*enabled), + None => builder, + }; + + builder = match &parquet_context.compression { + Some(compression) => { + builder.set_compression(parse_compression_string(compression)?) + } + None => builder, + }; + + builder = match &parquet_context.statistics_enabled { + Some(statistics) => { + builder.set_statistics_enabled(parse_statistics_string(statistics)?) 
+ } + None => builder, + }; + + builder = match &parquet_context.max_statistics_size { + Some(size) => builder.set_max_statistics_size(*size), + None => builder, + }; + + builder = match &parquet_context.bloom_filter_fpp { + Some(fpp) => builder.set_bloom_filter_fpp(*fpp), + None => builder, + }; + + builder = match &parquet_context.bloom_filter_ndv { + Some(ndv) => builder.set_bloom_filter_ndv(*ndv), + None => builder, + }; + + Ok(builder) +} + +impl TryFrom<(&ConfigOptions, &StatementOptions)> for ParquetWriterOptions { + type Error = DataFusionError; + + fn try_from( + configs_and_statement_options: (&ConfigOptions, &StatementOptions), + ) -> Result { + let configs = configs_and_statement_options.0; + let statement_options = configs_and_statement_options.1; + let mut builder = default_builder(configs)?; + for (option, value) in &statement_options.options { + builder = match option.to_lowercase().as_str(){ + "max_row_group_size" => builder + .set_max_row_group_size(value.parse() + .map_err(|_| DataFusionError::InvalidOption(format!("Unable to parse {value} as u64 as required for {option}!")))?), + "data_pagesize_limit" => builder + .set_data_page_size_limit(value.parse() + .map_err(|_| DataFusionError::InvalidOption(format!("Unable to parse {value} as usize as required for {option}!")))?), + "write_batch_size" => builder + .set_write_batch_size(value.parse() + .map_err(|_| DataFusionError::InvalidOption(format!("Unable to parse {value} as usize as required for {option}!")))?), + "writer_version" => builder + .set_writer_version(parse_version_string(value)?), + "dictionary_page_size_limit" => builder + .set_dictionary_page_size_limit(value.parse() + .map_err(|_| DataFusionError::InvalidOption(format!("Unable to parse {value} as usize as required for {option}!")))?), + "created_by" => builder + .set_created_by(value.to_owned()), + "column_index_truncate_length" => builder + .set_column_index_truncate_length(Some(value.parse() + .map_err(|_| DataFusionError::InvalidOption(format!("Unable to parse {value} as usize as required for {option}!")))?)), + "data_page_row_count_limit" => builder + .set_data_page_row_count_limit(value.parse() + .map_err(|_| DataFusionError::InvalidOption(format!("Unable to parse {value} as usize as required for {option}!")))?), + "bloom_filter_enabled" => builder + .set_bloom_filter_enabled(value.parse() + .map_err(|_| DataFusionError::InvalidOption(format!("Unable to parse {value} as bool as required for {option}!")))?), + "encoding" => builder + .set_encoding(parse_encoding_string(value)?), + "dictionary_enabled" => builder + .set_dictionary_enabled(value.parse() + .map_err(|_| DataFusionError::InvalidOption(format!("Unable to parse {value} as bool as required for {option}!")))?), + "compression" => builder + .set_compression(parse_compression_string(value)?), + "statistics_enabled" => builder + .set_statistics_enabled(parse_statistics_string(value)?), + "max_statistics_size" => builder + .set_max_statistics_size(value.parse() + .map_err(|_| DataFusionError::InvalidOption(format!("Unable to parse {value} as usize as required for {option}!")))?), + "bloom_filter_fpp" => builder + .set_bloom_filter_fpp(value.parse() + .map_err(|_| DataFusionError::InvalidOption(format!("Unable to parse {value} as f64 as required for {option}!")))?), + "bloom_filter_ndv" => builder + .set_bloom_filter_ndv(value.parse() + .map_err(|_| DataFusionError::InvalidOption(format!("Unable to parse {value} as u64 as required for {option}!")))?), + _ => return 
Err(DataFusionError::InvalidOption(format!("Found unsupported option {option} with value {value} for Parquet format!")))
+            }
+        }
+        Ok(ParquetWriterOptions {
+            writer_options: builder.build(),
+        })
+    }
+}
diff --git a/datafusion/common/src/file_options/parse_utils.rs b/datafusion/common/src/file_options/parse_utils.rs
new file mode 100644
index 000000000000..6af679be08e1
--- /dev/null
+++ b/datafusion/common/src/file_options/parse_utils.rs
@@ -0,0 +1,183 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Functions for parsing arbitrary passed strings to valid file_option settings
+
+use parquet::{
+    basic::{BrotliLevel, GzipLevel, ZstdLevel},
+    file::properties::{EnabledStatistics, WriterVersion},
+};
+
+use crate::{DataFusionError, Result};
+
+/// Converts a String option to a bool, or returns an error if not a valid bool string.
+pub(crate) fn parse_boolean_string(option: &str, value: String) -> Result<bool> {
+    match value.to_lowercase().as_str() {
+        "true" => Ok(true),
+        "false" => Ok(false),
+        _ => Err(DataFusionError::InvalidOption(format!(
+            "Unsupported value {value} for option {option}! \
+            Valid values are true or false!"
+        ))),
+    }
+}
+
+/// Parses datafusion.execution.parquet.encoding String to a parquet::basic::Encoding
+pub(crate) fn parse_encoding_string(
+    str_setting: &str,
+) -> Result<parquet::basic::Encoding> {
+    let str_setting_lower: &str = &str_setting.to_lowercase();
+    match str_setting_lower {
+        "plain" => Ok(parquet::basic::Encoding::PLAIN),
+        "plain_dictionary" => Ok(parquet::basic::Encoding::PLAIN_DICTIONARY),
+        "rle" => Ok(parquet::basic::Encoding::RLE),
+        "bit_packed" => Ok(parquet::basic::Encoding::BIT_PACKED),
+        "delta_binary_packed" => Ok(parquet::basic::Encoding::DELTA_BINARY_PACKED),
+        "delta_length_byte_array" => {
+            Ok(parquet::basic::Encoding::DELTA_LENGTH_BYTE_ARRAY)
+        }
+        "delta_byte_array" => Ok(parquet::basic::Encoding::DELTA_BYTE_ARRAY),
+        "rle_dictionary" => Ok(parquet::basic::Encoding::RLE_DICTIONARY),
+        "byte_stream_split" => Ok(parquet::basic::Encoding::BYTE_STREAM_SPLIT),
+        _ => Err(DataFusionError::InvalidOption(format!(
+            "Unknown or unsupported parquet encoding: \
+            {str_setting}. Valid values are: plain, plain_dictionary, rle, \
+            bit_packed, delta_binary_packed, delta_length_byte_array, \
+            delta_byte_array, rle_dictionary, and byte_stream_split."
+        ))),
+    }
+}
+
+/// Splits compression string into compression codec and optional compression_level
+/// I.e. gzip(2) -> gzip, 2
+fn split_compression_string(str_setting: &str) -> Result<(String, Option<u32>)> {
+    // ignore string literal chars passed from sqlparser i.e.
remove single quotes + let str_setting = str_setting.replace('\'', ""); + let split_setting = str_setting.split_once('('); + + match split_setting { + Some((codec, rh)) => { + let level = &rh[..rh.len() - 1].parse::().map_err(|_| { + DataFusionError::InvalidOption(format!( + "Could not parse compression string. \ + Got codec: {} and unknown level from {}", + codec, str_setting + )) + })?; + Ok((codec.to_owned(), Some(*level))) + } + None => Ok((str_setting.to_owned(), None)), + } +} + +/// Helper to ensure compression codecs which don't support levels +/// don't have one set. E.g. snappy(2) is invalid. +fn check_level_is_none(codec: &str, level: &Option) -> Result<()> { + if level.is_some() { + return Err(DataFusionError::InvalidOption(format!( + "Compression {codec} does not support specifying a level" + ))); + } + Ok(()) +} + +/// Helper to ensure compression codecs which require a level +/// do have one set. E.g. zstd is invalid, zstd(3) is valid +fn require_level(codec: &str, level: Option) -> Result { + level.ok_or(DataFusionError::InvalidOption(format!( + "{codec} compression requires specifying a level such as {codec}(4)" + ))) +} + +/// Parses datafusion.execution.parquet.compression String to a parquet::basic::Compression +pub(crate) fn parse_compression_string( + str_setting: &str, +) -> Result { + let str_setting_lower: &str = &str_setting.to_lowercase(); + let (codec, level) = split_compression_string(str_setting_lower)?; + let codec = codec.as_str(); + match codec { + "uncompressed" => { + check_level_is_none(codec, &level)?; + Ok(parquet::basic::Compression::UNCOMPRESSED) + } + "snappy" => { + check_level_is_none(codec, &level)?; + Ok(parquet::basic::Compression::SNAPPY) + } + "gzip" => { + let level = require_level(codec, level)?; + Ok(parquet::basic::Compression::GZIP(GzipLevel::try_new( + level, + )?)) + } + "lzo" => { + check_level_is_none(codec, &level)?; + Ok(parquet::basic::Compression::LZO) + } + "brotli" => { + let level = require_level(codec, level)?; + Ok(parquet::basic::Compression::BROTLI(BrotliLevel::try_new( + level, + )?)) + } + "lz4" => { + check_level_is_none(codec, &level)?; + Ok(parquet::basic::Compression::LZ4) + } + "zstd" => { + let level = require_level(codec, level)?; + Ok(parquet::basic::Compression::ZSTD(ZstdLevel::try_new( + level as i32, + )?)) + } + "lz4_raw" => { + check_level_is_none(codec, &level)?; + Ok(parquet::basic::Compression::LZ4_RAW) + } + _ => Err(DataFusionError::InvalidOption(format!( + "Unknown or unsupported parquet compression: \ + {str_setting}. Valid values are: uncompressed, snappy, gzip(level), \ + lzo, brotli(level), lz4, zstd(level), and lz4_raw." 
+ ))), + } +} + +pub(crate) fn parse_version_string(str_setting: &str) -> Result { + let str_setting_lower: &str = &str_setting.to_lowercase(); + match str_setting_lower { + "1.0" => Ok(WriterVersion::PARQUET_1_0), + "2.0" => Ok(WriterVersion::PARQUET_2_0), + _ => Err(DataFusionError::InvalidOption(format!( + "Unknown or unsupported parquet writer version {str_setting} \ + valid options are '1.0' and '2.0'" + ))), + } +} + +pub(crate) fn parse_statistics_string(str_setting: &str) -> Result { + let str_setting_lower: &str = &str_setting.to_lowercase(); + match str_setting_lower { + "none" => Ok(EnabledStatistics::None), + "chunk" => Ok(EnabledStatistics::Chunk), + "page" => Ok(EnabledStatistics::Page), + _ => Err(DataFusionError::InvalidOption(format!( + "Unknown or unsupported parquet statistics setting {str_setting} \ + valid options are 'none', 'page', and 'chunk'" + ))), + } +} diff --git a/datafusion/common/src/lib.rs b/datafusion/common/src/lib.rs index d946da6f4f33..3391f3492f8a 100644 --- a/datafusion/common/src/lib.rs +++ b/datafusion/common/src/lib.rs @@ -23,7 +23,7 @@ pub mod delta; mod dfschema; pub mod display; mod error; -pub mod file_type; +pub mod file_options; pub mod format; mod functional_dependencies; mod join_type; @@ -45,7 +45,7 @@ pub use error::{ field_not_found, unqualified_field_not_found, DataFusionError, Result, SchemaError, SharedResult, }; -pub use file_type::{ +pub use file_options::file_type::{ FileCompressionType, FileType, GetExt, DEFAULT_ARROW_EXTENSION, DEFAULT_AVRO_EXTENSION, DEFAULT_CSV_EXTENSION, DEFAULT_JSON_EXTENSION, DEFAULT_PARQUET_EXTENSION, diff --git a/datafusion/core/src/dataframe.rs b/datafusion/core/src/dataframe.rs index 29b3caff89b6..f8aa558f1c36 100644 --- a/datafusion/core/src/dataframe.rs +++ b/datafusion/core/src/dataframe.rs @@ -24,6 +24,7 @@ use arrow::array::{Array, ArrayRef, Int64Array, StringArray}; use arrow::compute::{cast, concat}; use arrow::datatypes::{DataType, Field}; use async_trait::async_trait; +use datafusion_common::file_options::StatementOptions; use datafusion_common::{DataFusionError, FileType, SchemaError, UnnestOptions}; use parquet::file::properties::WriterProperties; @@ -999,9 +1000,9 @@ impl DataFrame { self.plan, path.into(), FileType::CSV, - true, + false, // TODO implement options - vec![], + StatementOptions::new(vec![]), )? .build()?; DataFrame::new(self.session_state, plan).collect().await @@ -1017,9 +1018,9 @@ impl DataFrame { self.plan, path.into(), FileType::PARQUET, - true, + false, // TODO implement options - vec![], + StatementOptions::new(vec![]), )? .build()?; DataFrame::new(self.session_state, plan).collect().await @@ -1034,9 +1035,9 @@ impl DataFrame { self.plan, path.into(), FileType::JSON, - true, + false, // TODO implement options - vec![], + StatementOptions::new(vec![]), )? 
.build()?; DataFrame::new(self.session_state, plan).collect().await diff --git a/datafusion/core/src/datasource/file_format/arrow.rs b/datafusion/core/src/datasource/file_format/arrow.rs index 903c2ce2a208..9dc4c043b4ce 100644 --- a/datafusion/core/src/datasource/file_format/arrow.rs +++ b/datafusion/core/src/datasource/file_format/arrow.rs @@ -27,7 +27,7 @@ use crate::physical_plan::ExecutionPlan; use arrow::ipc::reader::FileReader; use arrow_schema::{Schema, SchemaRef}; use async_trait::async_trait; -use datafusion_common::Statistics; +use datafusion_common::{FileType, Statistics}; use datafusion_physical_expr::PhysicalExpr; use object_store::{GetResult, ObjectMeta, ObjectStore}; use std::any::Any; @@ -86,6 +86,10 @@ impl FileFormat for ArrowFormat { let exec = ArrowExec::new(conf); Ok(Arc::new(exec)) } + + fn file_type(&self) -> FileType { + FileType::ARROW + } } fn read_arrow_schema_from_reader(reader: R) -> Result { diff --git a/datafusion/core/src/datasource/file_format/avro.rs b/datafusion/core/src/datasource/file_format/avro.rs index 41f192acb090..7ecea32e87f8 100644 --- a/datafusion/core/src/datasource/file_format/avro.rs +++ b/datafusion/core/src/datasource/file_format/avro.rs @@ -23,6 +23,7 @@ use std::sync::Arc; use arrow::datatypes::Schema; use arrow::{self, datatypes::SchemaRef}; use async_trait::async_trait; +use datafusion_common::FileType; use datafusion_physical_expr::PhysicalExpr; use object_store::{GetResult, ObjectMeta, ObjectStore}; @@ -85,6 +86,10 @@ impl FileFormat for AvroFormat { let exec = AvroExec::new(conf); Ok(Arc::new(exec)) } + + fn file_type(&self) -> FileType { + FileType::AVRO + } } #[cfg(test)] diff --git a/datafusion/core/src/datasource/file_format/csv.rs b/datafusion/core/src/datasource/file_format/csv.rs index c7b6852df72f..890bda03ec91 100644 --- a/datafusion/core/src/datasource/file_format/csv.rs +++ b/datafusion/core/src/datasource/file_format/csv.rs @@ -27,7 +27,7 @@ use arrow::csv::WriterBuilder; use arrow::datatypes::{DataType, Field, Fields, Schema}; use arrow::{self, datatypes::SchemaRef}; use arrow_array::RecordBatch; -use datafusion_common::{not_impl_err, DataFusionError}; +use datafusion_common::{not_impl_err, DataFusionError, FileType}; use datafusion_execution::TaskContext; use datafusion_physical_expr::PhysicalExpr; @@ -273,15 +273,14 @@ impl FileFormat for CsvFormat { } let sink_schema = conf.output_schema().clone(); - let sink = Arc::new(CsvSink::new( - conf, - self.has_header, - self.delimiter, - self.file_compression_type, - )); + let sink = Arc::new(CsvSink::new(conf)); Ok(Arc::new(FileSinkExec::new(input, sink, sink_schema)) as _) } + + fn file_type(&self) -> FileType { + FileType::CSV + } } impl CsvFormat { @@ -441,18 +440,11 @@ impl BatchSerializer for CsvSerializer { struct CsvSink { /// Config options for writing data config: FileSinkConfig, - has_header: bool, - delimiter: u8, - file_compression_type: FileCompressionType, } impl Debug for CsvSink { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("CsvSink") - .field("has_header", &self.has_header) - .field("delimiter", &self.delimiter) - .field("file_compression_type", &self.file_compression_type) - .finish() + f.debug_struct("CsvSink").finish() } } @@ -473,18 +465,8 @@ impl DisplayAs for CsvSink { } impl CsvSink { - fn new( - config: FileSinkConfig, - has_header: bool, - delimiter: u8, - file_compression_type: FileCompressionType, - ) -> Self { - Self { - config, - has_header, - delimiter, - file_compression_type, - } + fn new(config: 
FileSinkConfig) -> Self { + Self { config } } } @@ -496,6 +478,11 @@ impl DataSink for CsvSink { context: &Arc, ) -> Result { let num_partitions = data.len(); + let writer_options = self.config.file_type_writer_options.try_into_csv()?; + let (builder, compression) = + (&writer_options.writer_options, &writer_options.compression); + let mut has_header = writer_options.has_header; + let compression = FileCompressionType::from(*compression); let object_store = context .runtime_env() @@ -505,25 +492,23 @@ impl DataSink for CsvSink { let mut writers = vec![]; match self.config.writer_mode { FileWriterMode::Append => { - if !self.config.per_thread_output { - return not_impl_err!("per_thread_output=false is not implemented for CsvSink in Append mode"); - } for file_group in &self.config.file_groups { + let mut append_builder = builder.clone(); // In append mode, consider has_header flag only when file is empty (at the start). // For other modes, use has_header flag as is. - let header = self.has_header - && (!matches!(&self.config.writer_mode, FileWriterMode::Append) - || file_group.object_meta.size == 0); - let builder = WriterBuilder::new().with_delimiter(self.delimiter); + if file_group.object_meta.size != 0 { + has_header = false; + append_builder = append_builder.has_headers(false); + } let serializer = CsvSerializer::new() - .with_builder(builder) - .with_header(header); + .with_builder(append_builder) + .with_header(has_header); serializers.push(Box::new(serializer)); let file = file_group.clone(); let writer = create_writer( self.config.writer_mode, - self.file_compression_type, + compression, file.object_meta.clone().into(), object_store.clone(), ) @@ -537,18 +522,15 @@ impl DataSink for CsvSink { FileWriterMode::PutMultipart => { // Currently assuming only 1 partition path (i.e. 
not hive-style partitioning on a column) let base_path = &self.config.table_paths[0]; - match self.config.per_thread_output { - true => { + match self.config.single_file_output { + false => { // Uniquely identify this batch of files with a random string, to prevent collisions overwriting files let write_id = Alphanumeric.sample_string(&mut rand::thread_rng(), 16); for part_idx in 0..num_partitions { - let header = self.has_header; - let builder = - WriterBuilder::new().with_delimiter(self.delimiter); let serializer = CsvSerializer::new() - .with_builder(builder) - .with_header(header); + .with_builder(builder.clone()) + .with_header(has_header); serializers.push(Box::new(serializer)); let file_path = base_path .prefix() @@ -561,7 +543,7 @@ impl DataSink for CsvSink { }; let writer = create_writer( self.config.writer_mode, - self.file_compression_type, + compression, object_meta.into(), object_store.clone(), ) @@ -569,12 +551,10 @@ impl DataSink for CsvSink { writers.push(writer); } } - false => { - let header = self.has_header; - let builder = WriterBuilder::new().with_delimiter(self.delimiter); + true => { let serializer = CsvSerializer::new() - .with_builder(builder) - .with_header(header); + .with_builder(builder.clone()) + .with_header(has_header); serializers.push(Box::new(serializer)); let file_path = base_path.prefix(); let object_meta = ObjectMeta { @@ -585,7 +565,7 @@ impl DataSink for CsvSink { }; let writer = create_writer( self.config.writer_mode, - self.file_compression_type, + compression, object_meta.into(), object_store.clone(), ) @@ -600,7 +580,7 @@ impl DataSink for CsvSink { data, serializers, writers, - self.config.per_thread_output, + self.config.single_file_output, ) .await } diff --git a/datafusion/core/src/datasource/file_format/json.rs b/datafusion/core/src/datasource/file_format/json.rs index e2ddda0fa511..30075c05d57a 100644 --- a/datafusion/core/src/datasource/file_format/json.rs +++ b/datafusion/core/src/datasource/file_format/json.rs @@ -22,6 +22,7 @@ use std::any::Any; use bytes::Bytes; use datafusion_common::not_impl_err; use datafusion_common::DataFusionError; +use datafusion_common::FileType; use datafusion_execution::TaskContext; use rand::distributions::Alphanumeric; use rand::distributions::DistString; @@ -184,6 +185,10 @@ impl FileFormat for JsonFormat { Ok(Arc::new(FileSinkExec::new(input, sink, sink_schema)) as _) } + + fn file_type(&self) -> FileType { + FileType::JSON + } } impl Default for JsonSerializer { @@ -270,13 +275,17 @@ impl DataSink for JsonSink { .runtime_env() .object_store(&self.config.object_store_url)?; + let writer_options = self.config.file_type_writer_options.try_into_json()?; + + let compression = FileCompressionType::from(writer_options.compression); + // Construct serializer and writer for each file group let mut serializers: Vec> = vec![]; let mut writers = vec![]; match self.config.writer_mode { FileWriterMode::Append => { - if !self.config.per_thread_output { - return not_impl_err!("per_thread_output=false is not implemented for JsonSink in Append mode"); + if self.config.single_file_output { + return Err(DataFusionError::NotImplemented("single_file_output=true is not implemented for JsonSink in Append mode".into())); } for file_group in &self.config.file_groups { let serializer = JsonSerializer::new(); @@ -285,7 +294,7 @@ impl DataSink for JsonSink { let file = file_group.clone(); let writer = create_writer( self.config.writer_mode, - self.file_compression_type, + compression, file.object_meta.clone().into(), 
object_store.clone(), ) @@ -299,8 +308,8 @@ impl DataSink for JsonSink { FileWriterMode::PutMultipart => { // Currently assuming only 1 partition path (i.e. not hive-style partitioning on a column) let base_path = &self.config.table_paths[0]; - match self.config.per_thread_output { - true => { + match self.config.single_file_output { + false => { // Uniquely identify this batch of files with a random string, to prevent collisions overwriting files let write_id = Alphanumeric.sample_string(&mut rand::thread_rng(), 16); @@ -318,7 +327,7 @@ impl DataSink for JsonSink { }; let writer = create_writer( self.config.writer_mode, - self.file_compression_type, + compression, object_meta.into(), object_store.clone(), ) @@ -326,7 +335,7 @@ impl DataSink for JsonSink { writers.push(writer); } } - false => { + true => { let serializer = JsonSerializer::new(); serializers.push(Box::new(serializer)); let file_path = base_path.prefix(); @@ -338,7 +347,7 @@ impl DataSink for JsonSink { }; let writer = create_writer( self.config.writer_mode, - self.file_compression_type, + compression, object_meta.into(), object_store.clone(), ) @@ -353,7 +362,7 @@ impl DataSink for JsonSink { data, serializers, writers, - self.config.per_thread_output, + self.config.single_file_output, ) .await } diff --git a/datafusion/core/src/datasource/file_format/mod.rs b/datafusion/core/src/datasource/file_format/mod.rs index 9dd88cc03808..f15d1cea3132 100644 --- a/datafusion/core/src/datasource/file_format/mod.rs +++ b/datafusion/core/src/datasource/file_format/mod.rs @@ -39,7 +39,7 @@ use crate::error::Result; use crate::execution::context::SessionState; use crate::physical_plan::{ExecutionPlan, Statistics}; -use datafusion_common::{not_impl_err, DataFusionError}; +use datafusion_common::{not_impl_err, DataFusionError, FileType}; use datafusion_physical_expr::PhysicalExpr; use async_trait::async_trait; @@ -101,6 +101,9 @@ pub trait FileFormat: Send + Sync + fmt::Debug { ) -> Result> { not_impl_err!("Writer not implemented for this format") } + + /// Returns the FileType corresponding to this FileFormat + fn file_type(&self) -> FileType; } #[cfg(test)] diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs index 73928ce56c54..f291b7711fb1 100644 --- a/datafusion/core/src/datasource/file_format/parquet.rs +++ b/datafusion/core/src/datasource/file_format/parquet.rs @@ -17,7 +17,6 @@ //! 
Parquet format abstractions -use parquet::basic::{BrotliLevel, GzipLevel, ZstdLevel}; use rand::distributions::DistString; use std::any::Any; use std::fmt; @@ -28,7 +27,7 @@ use arrow::datatypes::SchemaRef; use arrow::datatypes::{Fields, Schema}; use async_trait::async_trait; use bytes::{BufMut, BytesMut}; -use datafusion_common::{not_impl_err, plan_err, DataFusionError}; +use datafusion_common::{not_impl_err, plan_err, DataFusionError, FileType}; use datafusion_execution::TaskContext; use datafusion_physical_expr::PhysicalExpr; use futures::{StreamExt, TryStreamExt}; @@ -37,7 +36,7 @@ use object_store::{ObjectMeta, ObjectStore}; use parquet::arrow::{parquet_to_arrow_schema, AsyncArrowWriter}; use parquet::file::footer::{decode_footer, decode_metadata}; use parquet::file::metadata::ParquetMetaData; -use parquet::file::properties::{EnabledStatistics, WriterProperties, WriterVersion}; +use parquet::file::properties::WriterProperties; use parquet::file::statistics::Statistics as ParquetStatistics; use rand::distributions::Alphanumeric; @@ -236,6 +235,10 @@ impl FileFormat for ParquetFormat { Ok(Arc::new(FileSinkExec::new(input, sink, sink_schema)) as _) } + + fn file_type(&self) -> FileType { + FileType::PARQUET + } } fn summarize_min_max( @@ -599,212 +602,11 @@ impl DisplayAs for ParquetSink { } } -/// Parses datafusion.execution.parquet.encoding String to a parquet::basic::Encoding -fn parse_encoding_string(str_setting: &str) -> Result { - let str_setting_lower: &str = &str_setting.to_lowercase(); - match str_setting_lower { - "plain" => Ok(parquet::basic::Encoding::PLAIN), - "plain_dictionary" => Ok(parquet::basic::Encoding::PLAIN_DICTIONARY), - "rle" => Ok(parquet::basic::Encoding::RLE), - "bit_packed" => Ok(parquet::basic::Encoding::BIT_PACKED), - "delta_binary_packed" => Ok(parquet::basic::Encoding::DELTA_BINARY_PACKED), - "delta_length_byte_array" => { - Ok(parquet::basic::Encoding::DELTA_LENGTH_BYTE_ARRAY) - } - "delta_byte_array" => Ok(parquet::basic::Encoding::DELTA_BYTE_ARRAY), - "rle_dictionary" => Ok(parquet::basic::Encoding::RLE_DICTIONARY), - "byte_stream_split" => Ok(parquet::basic::Encoding::BYTE_STREAM_SPLIT), - _ => Err(DataFusionError::Plan(format!( - "Unknown or unsupported parquet encoding: \ - {str_setting}. Valid values are: plain, plain_dictionary, rle, \ - /// bit_packed, delta_binary_packed, delta_length_byte_array, \ - /// delta_byte_array, rle_dictionary, and byte_stream_split." - ))), - } -} - -/// Splits compression string into compression codec and optional compression_level -/// I.e. gzip(2) -> gzip, 2 -fn split_compression_string(str_setting: &str) -> Result<(&str, Option)> { - let split_setting = str_setting.split_once('('); - - match split_setting { - Some((codec, rh)) => { - let level = &rh[..rh.len() - 1].parse::().map_err(|_| { - DataFusionError::Plan(format!( - "Could not parse compression string. \ - Got codec: {} and unknown level from {}", - codec, str_setting - )) - })?; - Ok((codec, Some(*level))) - } - None => Ok((str_setting, None)), - } -} - -/// Helper to ensure compression codecs which don't support levels -/// don't have one set. E.g. snappy(2) is invalid. -fn check_level_is_none(codec: &str, level: &Option) -> Result<()> { - if level.is_some() { - return Err(DataFusionError::Plan(format!( - "Compression {codec} does not support specifying a level" - ))); - } - Ok(()) -} - -/// Helper to ensure compression codecs which require a level -/// do have one set. E.g. 
zstd is invalid, zstd(3) is valid -fn require_level(codec: &str, level: Option) -> Result { - level.ok_or(DataFusionError::Plan(format!( - "{codec} compression requires specifying a level such as {codec}(4)" - ))) -} - -/// Parses datafusion.execution.parquet.compression String to a parquet::basic::Compression -fn parse_compression_string(str_setting: &str) -> Result { - let str_setting_lower: &str = &str_setting.to_lowercase(); - let (codec, level) = split_compression_string(str_setting_lower)?; - match codec { - "uncompressed" => { - check_level_is_none(codec, &level)?; - Ok(parquet::basic::Compression::UNCOMPRESSED) - } - "snappy" => { - check_level_is_none(codec, &level)?; - Ok(parquet::basic::Compression::SNAPPY) - } - "gzip" => { - let level = require_level(codec, level)?; - Ok(parquet::basic::Compression::GZIP(GzipLevel::try_new( - level, - )?)) - } - "lzo" => { - check_level_is_none(codec, &level)?; - Ok(parquet::basic::Compression::LZO) - } - "brotli" => { - let level = require_level(codec, level)?; - Ok(parquet::basic::Compression::BROTLI(BrotliLevel::try_new( - level, - )?)) - } - "lz4" => { - check_level_is_none(codec, &level)?; - Ok(parquet::basic::Compression::LZ4) - } - "zstd" => { - let level = require_level(codec, level)?; - Ok(parquet::basic::Compression::ZSTD(ZstdLevel::try_new( - level as i32, - )?)) - } - "lz4_raw" => { - check_level_is_none(codec, &level)?; - Ok(parquet::basic::Compression::LZ4_RAW) - } - _ => Err(DataFusionError::Plan(format!( - "Unknown or unsupported parquet compression: \ - {str_setting}. Valid values are: uncompressed, snappy, gzip(level), \ - lzo, brotli(level), lz4, zstd(level), and lz4_raw." - ))), - } -} - -fn parse_version_string(str_setting: &str) -> Result { - let str_setting_lower: &str = &str_setting.to_lowercase(); - match str_setting_lower { - "1.0" => Ok(WriterVersion::PARQUET_1_0), - "2.0" => Ok(WriterVersion::PARQUET_2_0), - _ => Err(DataFusionError::Plan(format!( - "Unknown or unsupported parquet writer version {str_setting} \ - valid options are '1.0' and '2.0'" - ))), - } -} - -fn parse_statistics_string(str_setting: &str) -> Result { - let str_setting_lower: &str = &str_setting.to_lowercase(); - match str_setting_lower { - "none" => Ok(EnabledStatistics::None), - "chunk" => Ok(EnabledStatistics::Chunk), - "page" => Ok(EnabledStatistics::Page), - _ => Err(DataFusionError::Plan(format!( - "Unknown or unsupported parquet statistics setting {str_setting} \ - valid options are 'none', 'page', and 'chunk'" - ))), - } -} - impl ParquetSink { fn new(config: FileSinkConfig) -> Self { Self { config } } - /// Builds a parquet WriterProperties struct, setting options as appropriate from TaskContext options. - /// May return error if SessionContext contains invalid or unsupported options - fn parquet_writer_props_from_context( - &self, - context: &Arc, - ) -> Result { - let parquet_context = &context.session_config().options().execution.parquet; - let mut builder = WriterProperties::builder() - .set_data_page_size_limit(parquet_context.data_pagesize_limit) - .set_write_batch_size(parquet_context.write_batch_size) - .set_writer_version(parse_version_string(&parquet_context.writer_version)?) 
- .set_dictionary_page_size_limit(parquet_context.dictionary_page_size_limit) - .set_max_row_group_size(parquet_context.max_row_group_size) - .set_created_by(parquet_context.created_by.clone()) - .set_column_index_truncate_length( - parquet_context.column_index_truncate_length, - ) - .set_data_page_row_count_limit(parquet_context.data_page_row_count_limit) - .set_bloom_filter_enabled(parquet_context.bloom_filter_enabled); - - builder = match &parquet_context.encoding { - Some(encoding) => builder.set_encoding(parse_encoding_string(encoding)?), - None => builder, - }; - - builder = match &parquet_context.dictionary_enabled { - Some(enabled) => builder.set_dictionary_enabled(*enabled), - None => builder, - }; - - builder = match &parquet_context.compression { - Some(compression) => { - builder.set_compression(parse_compression_string(compression)?) - } - None => builder, - }; - - builder = match &parquet_context.statistics_enabled { - Some(statistics) => { - builder.set_statistics_enabled(parse_statistics_string(statistics)?) - } - None => builder, - }; - - builder = match &parquet_context.max_statistics_size { - Some(size) => builder.set_max_statistics_size(*size), - None => builder, - }; - - builder = match &parquet_context.bloom_filter_fpp { - Some(fpp) => builder.set_bloom_filter_fpp(*fpp), - None => builder, - }; - - builder = match &parquet_context.bloom_filter_ndv { - Some(ndv) => builder.set_bloom_filter_ndv(*ndv), - None => builder, - }; - - Ok(builder.build()) - } - // Create a write for parquet files async fn create_writer( &self, @@ -849,7 +651,11 @@ impl DataSink for ParquetSink { context: &Arc, ) -> Result { let num_partitions = data.len(); - let parquet_props = self.parquet_writer_props_from_context(context)?; + let parquet_props = self + .config + .file_type_writer_options + .try_into_parquet()? + .writer_options(); let object_store = context .runtime_env() @@ -869,8 +675,8 @@ impl DataSink for ParquetSink { FileWriterMode::PutMultipart => { // Currently assuming only 1 partition path (i.e. not hive-style partitioning on a column) let base_path = &self.config.table_paths[0]; - match self.config.per_thread_output { - true => { + match self.config.single_file_output { + false => { // Uniquely identify this batch of files with a random string, to prevent collisions overwriting files let write_id = Alphanumeric.sample_string(&mut rand::thread_rng(), 16); @@ -894,7 +700,7 @@ impl DataSink for ParquetSink { writers.push(writer); } } - false => { + true => { let file_path = base_path.prefix(); let object_meta = ObjectMeta { location: file_path.clone(), @@ -919,9 +725,9 @@ impl DataSink for ParquetSink { // TODO parallelize serialization accross partitions and batches within partitions // see: https://github.com/apache/arrow-datafusion/issues/7079 for (part_idx, data_stream) in data.iter_mut().enumerate().take(num_partitions) { - let idx = match self.config.per_thread_output { - true => part_idx, - false => 0, + let idx = match self.config.single_file_output { + false => part_idx, + true => 0, }; while let Some(batch) = data_stream.next().await.transpose()? 
{
+                row_count += batch.num_rows();
diff --git a/datafusion/core/src/datasource/file_format/write.rs b/datafusion/core/src/datasource/file_format/write.rs
index 40060148123a..58e83443e605 100644
--- a/datafusion/core/src/datasource/file_format/write.rs
+++ b/datafusion/core/src/datasource/file_format/write.rs
@@ -327,14 +327,14 @@ pub(crate) async fn stateless_serialize_and_write_files(
     mut data: Vec,
     mut serializers: Vec>,
     mut writers: Vec>>,
-    per_thread_output: bool,
+    single_file_output: bool,
 ) -> Result {
-    if !per_thread_output && (serializers.len() != 1 || writers.len() != 1) {
-        return internal_err!("per_thread_output is false, but got more than 1 writer!");
+    if single_file_output && (serializers.len() != 1 || writers.len() != 1) {
+        return internal_err!("single_file_output is true, but got more than 1 writer!");
     }
     let num_partitions = data.len();
-    if per_thread_output && (num_partitions != writers.len()) {
-        return internal_err!("per_thread_output is true, but did not get 1 writer for each output partition!");
+    if !single_file_output && (num_partitions != writers.len()) {
+        return internal_err!("single_file_output is false, but did not get 1 writer for each output partition!");
     }
     let mut row_count = 0;
     // Map errors to DatafusionError.
@@ -343,9 +343,9 @@ pub(crate) async fn stateless_serialize_and_write_files(
     // TODO parallelize serialization accross partitions and batches within partitions
     // see: https://github.com/apache/arrow-datafusion/issues/7079
     for (part_idx, data_stream) in data.iter_mut().enumerate().take(num_partitions) {
-        let idx = match per_thread_output {
-            true => part_idx,
-            false => 0,
+        let idx = match single_file_output {
+            false => part_idx,
+            true => 0,
         };
         while let Some(maybe_batch) = data_stream.next().await {
             // Write data to files in a round robin fashion:
diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs
index e224ad2d6b58..65aac01218bc 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -25,6 +25,7 @@ use arrow::datatypes::{DataType, Field, SchemaBuilder, SchemaRef};
 use arrow_schema::Schema;
 use async_trait::async_trait;
 use dashmap::DashMap;
+use datafusion_common::file_options::FileTypeWriterOptions;
 use datafusion_common::{internal_err, plan_err, project_schema, SchemaExt, ToDFSchema};
 use datafusion_expr::expr::Sort;
 use datafusion_optimizer::utils::conjunction;
@@ -269,8 +270,14 @@ pub struct ListingOptions {
     /// In order to support infinite inputs, DataFusion may adjust query
     /// plans (e.g. joins) to run the given query in full pipelining mode.
     pub infinite_source: bool,
-    ///This setting controls how inserts to this table should be handled
+    /// This setting controls how inserts to this table should be handled
     pub insert_mode: ListingTableInsertMode,
+    /// This setting when true indicates that the table is backed by a single file.
+    /// Any inserts to the table may only append to this existing file.
+    pub single_file: bool,
+    /// This setting holds file format specific options which should be used
+    /// when inserting into this table.
+ pub file_type_write_options: Option, } impl ListingOptions { @@ -290,6 +297,8 @@ impl ListingOptions { file_sort_order: vec![], infinite_source: false, insert_mode: ListingTableInsertMode::AppendToFile, + single_file: false, + file_type_write_options: None, } } @@ -464,6 +473,21 @@ impl ListingOptions { self } + /// Configure if this table is backed by a sigle file + pub fn with_single_file(mut self, single_file: bool) -> Self { + self.single_file = single_file; + self + } + + /// Configure file format specific writing options. + pub fn with_write_options( + mut self, + file_type_write_options: FileTypeWriterOptions, + ) -> Self { + self.file_type_write_options = Some(file_type_write_options); + self + } + /// Infer the schema of the files at the given path on the provided object store. /// The inferred schema does not include the partitioning columns. /// @@ -873,6 +897,16 @@ impl TableProvider for ListingTable { } } + let file_format = self.options().format.as_ref(); + + let file_type_writer_options = match &self.options().file_type_write_options { + Some(opt) => opt.clone(), + None => FileTypeWriterOptions::build_default( + &file_format.file_type(), + state.config_options(), + )?, + }; + // Sink related option, apart from format let config = FileSinkConfig { object_store_url: self.table_paths()[0].object_store(), @@ -881,9 +915,9 @@ impl TableProvider for ListingTable { output_schema: self.schema(), table_partition_cols: self.options.table_partition_cols.clone(), writer_mode, - // TODO: when listing table is known to be backed by a single file, this should be false - per_thread_output: true, + single_file_output: self.options.single_file, overwrite, + file_type_writer_options, }; self.options() @@ -1828,7 +1862,7 @@ mod tests { ) .await .expect_err("Example should fail!"); - assert_eq!("Error during planning: zstd compression requires specifying a level such as zstd(4)", format!("{e}")); + assert_eq!("Invalid Option: zstd compression requires specifying a level such as zstd(4)", format!("{e}")); Ok(()) } diff --git a/datafusion/core/src/datasource/listing_table_factory.rs b/datafusion/core/src/datasource/listing_table_factory.rs index 2529c11de5a5..bd57537dd0ed 100644 --- a/datafusion/core/src/datasource/listing_table_factory.rs +++ b/datafusion/core/src/datasource/listing_table_factory.rs @@ -23,6 +23,7 @@ use std::sync::Arc; use arrow::datatypes::{DataType, SchemaRef}; use async_trait::async_trait; +use datafusion_common::file_options::{FileTypeWriterOptions, StatementOptions}; use datafusion_common::DataFusionError; use datafusion_expr::CreateExternalTable; @@ -133,9 +134,21 @@ impl TableProviderFactory for ListingTableFactory { // look for 'infinite' as an option let infinite_source = cmd.unbounded; - let explicit_insert_mode = cmd.options.get("insert_mode"); + let mut statement_options = StatementOptions::from(&cmd.options); + + // Extract ListingTable specific options if present or set default + // Discard unbounded option if present + statement_options.get_str_option("unbounded"); + let create_local_path = statement_options + .get_bool_option("create_local_path")? + .unwrap_or(false); + let single_file = statement_options + .get_bool_option("single_file")? 
+ .unwrap_or(false); + + let explicit_insert_mode = statement_options.get_str_option("insert_mode"); let insert_mode = match explicit_insert_mode { - Some(mode) => ListingTableInsertMode::from_str(mode), + Some(mode) => ListingTableInsertMode::from_str(mode.as_str()), None => match file_type { FileType::CSV => Ok(ListingTableInsertMode::AppendToFile), FileType::PARQUET => Ok(ListingTableInsertMode::AppendNewFiles), @@ -145,34 +158,50 @@ impl TableProviderFactory for ListingTableFactory { }, }?; - let create_local_path_mode = cmd - .options - .get("create_local_path") - .map(|s| s.as_str()) - .unwrap_or("false"); - let single_file = cmd - .options - .get("single_file") - .map(|s| s.as_str()) - .unwrap_or("false"); - - let single_file = match single_file { - "true" => Ok(true), - "false" => Ok(false), - _ => Err(DataFusionError::Plan( - "Invalid option single_file, must be 'true' or 'false'".into(), - )), - }?; + let file_type = file_format.file_type(); + + // Use remaining options and session state to build FileTypeWriterOptions + let file_type_writer_options = FileTypeWriterOptions::build( + &file_type, + state.config_options(), + &statement_options, + )?; + + // Some options have special syntax which takes precedence + // e.g. "WITH HEADER ROW" overrides (header false, ...) + let file_type_writer_options = match file_type { + FileType::CSV => { + let mut csv_writer_options = + file_type_writer_options.try_into_csv()?.clone(); + csv_writer_options.has_header = cmd.has_header; + csv_writer_options.writer_options = csv_writer_options + .writer_options + .has_headers(cmd.has_header) + .with_delimiter(cmd.delimiter.try_into().map_err(|_| { + DataFusionError::Internal( + "Unable to convert CSV delimiter into u8".into(), + ) + })?); + csv_writer_options.compression = cmd.file_compression_type; + FileTypeWriterOptions::CSV(csv_writer_options) + } + FileType::JSON => { + let mut json_writer_options = + file_type_writer_options.try_into_json()?.clone(); + json_writer_options.compression = cmd.file_compression_type; + FileTypeWriterOptions::JSON(json_writer_options) + } + FileType::PARQUET => file_type_writer_options, + FileType::ARROW => file_type_writer_options, + FileType::AVRO => file_type_writer_options, + }; - let table_path = match create_local_path_mode { - "true" => ListingTableUrl::parse_create_local_if_not_exists( + let table_path = match create_local_path { + true => ListingTableUrl::parse_create_local_if_not_exists( &cmd.location, !single_file, ), - "false" => ListingTableUrl::parse(&cmd.location), - _ => Err(DataFusionError::Plan( - "Invalid option create_local_path, must be 'true' or 'false'".into(), - )), + false => ListingTableUrl::parse(&cmd.location), }?; let options = ListingOptions::new(file_format) @@ -182,7 +211,9 @@ impl TableProviderFactory for ListingTableFactory { .with_table_partition_cols(table_partition_cols) .with_infinite_source(infinite_source) .with_file_sort_order(cmd.order_exprs.clone()) - .with_insert_mode(insert_mode); + .with_insert_mode(insert_mode) + .with_single_file(single_file) + .with_write_options(file_type_writer_options); let resolved_schema = match provided_schema { None => options.infer_schema(state, &table_path).await?, diff --git a/datafusion/core/src/datasource/physical_plan/csv.rs b/datafusion/core/src/datasource/physical_plan/csv.rs index 127714a15d1b..39f91f8265b3 100644 --- a/datafusion/core/src/datasource/physical_plan/csv.rs +++ b/datafusion/core/src/datasource/physical_plan/csv.rs @@ -1191,6 +1191,8 @@ mod tests { Field::new("c2", 
DataType::UInt64, false), ])); + println!("{out_dir}"); + // get name of first part let paths = fs::read_dir(&out_dir).unwrap(); let mut part_0_name: String = "".to_owned(); diff --git a/datafusion/core/src/datasource/physical_plan/mod.rs b/datafusion/core/src/datasource/physical_plan/mod.rs index ecaf71ff541f..0fc0c23a2fda 100644 --- a/datafusion/core/src/datasource/physical_plan/mod.rs +++ b/datafusion/core/src/datasource/physical_plan/mod.rs @@ -62,7 +62,7 @@ use crate::{ physical_plan::display::{OutputOrderingDisplay, ProjectSchemaDisplay}, }; -use datafusion_common::plan_err; +use datafusion_common::{file_options::FileTypeWriterOptions, plan_err}; use datafusion_physical_expr::expressions::Column; use arrow::compute::cast; @@ -79,7 +79,6 @@ use super::listing::ListingTableUrl; /// The base configurations to provide when creating a physical plan for /// writing to any given file format. -#[derive(Debug, Clone)] pub struct FileSinkConfig { /// Object store URL, used to get an ObjectStore instance pub object_store_url: ObjectStoreUrl, @@ -94,12 +93,14 @@ pub struct FileSinkConfig { pub table_partition_cols: Vec<(String, DataType)>, /// A writer mode that determines how data is written to the file pub writer_mode: FileWriterMode, - /// If false, it is assumed there is a single table_path which is a file to which all data should be written + /// If true, it is assumed there is a single table_path which is a file to which all data should be written /// regardless of input partitioning. Otherwise, each table path is assumed to be a directory /// to which each output partition is written to its own output file. - pub per_thread_output: bool, + pub single_file_output: bool, /// Controls whether existing data should be overwritten by this sink pub overwrite: bool, + /// Contains settings specific to writing a given FileType, e.g. parquet max_row_group_size + pub file_type_writer_options: FileTypeWriterOptions, } impl FileSinkConfig { diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index 599817a73828..952ce4096a25 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -38,6 +38,7 @@ use crate::logical_expr::{ Repartition, Union, UserDefinedLogicalNode, }; use datafusion_common::display::ToStringifiedPlan; +use datafusion_common::file_options::FileTypeWriterOptions; use datafusion_common::FileType; use datafusion_expr::dml::CopyTo; @@ -557,19 +558,24 @@ impl DefaultPhysicalPlanner { input, output_url, file_format, - per_thread_output, - options: _, + single_file_output, + statement_options, }) => { let input_exec = self.create_initial_plan(input, session_state).await?; // TODO: make this behavior configurable via options (should copy to create path/file as needed?) 
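
As a rough illustration of how these planner-side pieces are meant to compose, the sketch below builds writer options from COPY statement options using only the APIs introduced in this patch; the helper name, option values, and target path are hypothetical, not the planner's literal code.

    use datafusion_common::config::ConfigOptions;
    use datafusion_common::file_options::{FileTypeWriterOptions, StatementOptions};

    // Hypothetical helper; the option values and target path are illustrative only.
    fn writer_options_for_copy() -> datafusion_common::Result<FileTypeWriterOptions> {
        // Options roughly as they arrive from
        // `COPY ... TO 'output.parquet' (format parquet, max_row_group_size 1000)`.
        let mut statement_options = StatementOptions::new(vec![
            ("format".to_owned(), "parquet".to_owned()),
            ("max_row_group_size".to_owned(), "1000".to_owned()),
        ]);

        // Resolve the output FileType from the explicit format option,
        // or fall back to the target's file extension.
        let target = String::from("output.parquet");
        let file_type = statement_options.try_infer_file_type(&target)?;

        // Session defaults seed the writer settings; remaining statement options
        // override them, and unsupported options produce an error.
        let session_defaults = ConfigOptions::new();
        FileTypeWriterOptions::build(&file_type, &session_defaults, &statement_options)
    }
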
// TODO: add additional configurable options for if existing files should be overwritten or // appended to - let parsed_url = ListingTableUrl::parse_create_local_if_not_exists(output_url, *per_thread_output)?; + let parsed_url = ListingTableUrl::parse_create_local_if_not_exists(output_url, !*single_file_output)?; let object_store_url = parsed_url.object_store(); let schema: Schema = (**input.schema()).clone().into(); + let file_type_writer_options = FileTypeWriterOptions::build( + file_format, + session_state.config_options(), + statement_options)?; + // Set file sink related options let config = FileSinkConfig { object_store_url, @@ -578,12 +584,11 @@ impl DefaultPhysicalPlanner { output_schema: Arc::new(schema), table_partition_cols: vec![], writer_mode: FileWriterMode::PutMultipart, - per_thread_output: *per_thread_output, + single_file_output: *single_file_output, overwrite: false, + file_type_writer_options }; - // TODO: implement statement level overrides for each file type - // E.g. CsvFormat::from_options(options) let sink_format: Arc = match file_format { FileType::CSV => Arc::new(CsvFormat::default()), FileType::PARQUET => Arc::new(ParquetFormat::default()), diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs index ccbf1af5e70e..244a957979bc 100644 --- a/datafusion/expr/src/logical_plan/builder.rs +++ b/datafusion/expr/src/logical_plan/builder.rs @@ -41,6 +41,7 @@ use crate::{ Expr, ExprSchemable, TableSource, }; use arrow::datatypes::{DataType, Schema, SchemaRef}; +use datafusion_common::file_options::StatementOptions; use datafusion_common::plan_err; use datafusion_common::UnnestOptions; use datafusion_common::{ @@ -238,15 +239,15 @@ impl LogicalPlanBuilder { input: LogicalPlan, output_url: String, file_format: FileType, - per_thread_output: bool, - options: Vec<(String, String)>, + single_file_output: bool, + statement_options: StatementOptions, ) -> Result { Ok(Self::from(LogicalPlan::Copy(CopyTo { input: Arc::new(input), output_url, file_format, - per_thread_output, - options, + single_file_output, + statement_options, }))) } diff --git a/datafusion/expr/src/logical_plan/dml.rs b/datafusion/expr/src/logical_plan/dml.rs index c5243d830565..501f2eeba146 100644 --- a/datafusion/expr/src/logical_plan/dml.rs +++ b/datafusion/expr/src/logical_plan/dml.rs @@ -20,7 +20,9 @@ use std::{ sync::Arc, }; -use datafusion_common::{DFSchemaRef, FileType, OwnedTableReference}; +use datafusion_common::{ + file_options::StatementOptions, DFSchemaRef, FileType, OwnedTableReference, +}; use crate::LogicalPlan; @@ -36,9 +38,9 @@ pub struct CopyTo { /// If false, it is assumed output_url is a file to which all data should be written /// regardless of input partitioning. 
Otherwise, output_url is assumed to be a directory /// to which each output partition is written to its own output file - pub per_thread_output: bool, + pub single_file_output: bool, /// Arbitrary options as tuples - pub options: Vec<(String, String)>, + pub statement_options: StatementOptions, } /// The operator that modifies the content of a database (adapted from diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index bfb4f662d7e7..26a15012dd35 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -1117,16 +1117,18 @@ impl LogicalPlan { input: _, output_url, file_format, - per_thread_output, - options, + single_file_output, + statement_options, }) => { - let op_str = options + let op_str = statement_options + .clone() + .into_inner() .iter() .map(|(k, v)| format!("{k} {v}")) .collect::>() .join(", "); - write!(f, "CopyTo: format={file_format} output_url={output_url} per_thread_output={per_thread_output} options: ({op_str})") + write!(f, "CopyTo: format={file_format} output_url={output_url} single_file_output={single_file_output} options: ({op_str})") } LogicalPlan::Ddl(ddl) => { write!(f, "{}", ddl.display()) diff --git a/datafusion/expr/src/utils.rs b/datafusion/expr/src/utils.rs index 88fd9fa6b7ef..28b0d2985345 100644 --- a/datafusion/expr/src/utils.rs +++ b/datafusion/expr/src/utils.rs @@ -750,14 +750,14 @@ pub fn from_plan( input: _, output_url, file_format, - per_thread_output, - options, + single_file_output, + statement_options, }) => Ok(LogicalPlan::Copy(CopyTo { input: Arc::new(inputs[0].clone()), output_url: output_url.clone(), file_format: file_format.clone(), - per_thread_output: *per_thread_output, - options: options.clone(), + single_file_output: *single_file_output, + statement_options: statement_options.clone(), })), LogicalPlan::Values(Values { schema, .. 
}) => Ok(LogicalPlan::Values(Values { schema: schema.clone(), diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs index ff7d74de1c09..d3b18fbfadbf 100644 --- a/datafusion/proto/src/physical_plan/mod.rs +++ b/datafusion/proto/src/physical_plan/mod.rs @@ -46,9 +46,8 @@ use datafusion::physical_plan::windows::{create_window_expr, WindowAggExec}; use datafusion::physical_plan::{ udaf, AggregateExpr, ExecutionPlan, Partitioning, PhysicalExpr, WindowExpr, }; -use datafusion_common::{ - internal_err, not_impl_err, DataFusionError, FileCompressionType, Result, -}; +use datafusion_common::FileCompressionType; +use datafusion_common::{internal_err, not_impl_err, DataFusionError, Result}; use prost::bytes::BufMut; use prost::Message; diff --git a/datafusion/sql/src/statement.rs b/datafusion/sql/src/statement.rs index 867825ec9ae4..a86dea9dab65 100644 --- a/datafusion/sql/src/statement.rs +++ b/datafusion/sql/src/statement.rs @@ -25,10 +25,11 @@ use crate::planner::{ use crate::utils::normalize_ident; use arrow_schema::DataType; +use datafusion_common::file_options::StatementOptions; use datafusion_common::parsers::CompressionTypeVariant; use datafusion_common::{ not_impl_err, unqualified_field_not_found, Column, Constraints, DFField, DFSchema, - DFSchemaRef, DataFusionError, ExprSchema, FileType, OwnedTableReference, Result, + DFSchemaRef, DataFusionError, ExprSchema, OwnedTableReference, Result, SchemaReference, TableReference, ToDFSchema, }; use datafusion_expr::dml::CopyTo; @@ -56,8 +57,6 @@ use sqlparser::parser::ParserError::ParserError; use datafusion_common::plan_err; use std::collections::{BTreeMap, HashMap, HashSet}; -use std::path::Path; -use std::str::FromStr; use std::sync::Arc; fn ident_to_string(ident: &Ident) -> String { @@ -582,50 +581,27 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { } }; - // convert options to lowercase strings, check for explicitly set "format" option - let mut options = vec![]; - let mut explicit_format = None; - // default behavior is to assume the user is specifying a single file to which - // we should output all data regardless of input partitioning. - let mut per_thread_output: bool = false; - for (key, value) in statement.options { - let (k, v) = (key.to_lowercase(), value.to_string().to_lowercase()); - // check for options important to planning - if k == "format" { - explicit_format = Some(FileType::from_str(&v)?); - } - if k == "per_thread_output" { - per_thread_output = match v.as_str(){ - "true" => true, - "false" => false, - _ => return Err(DataFusionError::Plan( - format!("Copy to option 'per_thread_output' must be true or false, got {value}") - )) - } - } - options.push((k, v)); - } - let format = match explicit_format { - Some(file_format) => file_format, - None => { - // try to infer file format from file extension - let extension: &str = &Path::new(&statement.target) - .extension() - .ok_or( - DataFusionError::Plan("Copy To format not explicitly set and unable to get file extension!".to_string()))? - .to_str() - .ok_or(DataFusionError::Plan("Copy to format not explicitly set and failed to parse file extension!".to_string()))? - .to_lowercase(); - - FileType::from_str(extension)? 
- } - }; + // TODO, parse options as Vec<(String, String)> to avoid this conversion + let options = statement + .options + .iter() + .map(|(s, v)| (s.to_owned(), v.to_string())) + .collect::>(); + + let mut statement_options = StatementOptions::new(options); + let file_format = statement_options.try_infer_file_type(&statement.target)?; + let single_file_output = + statement_options.get_bool_option("single_file_output")?; + + // COPY defaults to outputting a single file if not otherwise specified + let single_file_output = single_file_output.unwrap_or(true); + Ok(LogicalPlan::Copy(CopyTo { input: Arc::new(input), output_url: statement.target, - file_format: format, - per_thread_output, - options, + file_format, + single_file_output, + statement_options, })) } diff --git a/datafusion/sql/tests/sql_integration.rs b/datafusion/sql/tests/sql_integration.rs index c315266b3cd8..8817ed9533a7 100644 --- a/datafusion/sql/tests/sql_integration.rs +++ b/datafusion/sql/tests/sql_integration.rs @@ -329,7 +329,7 @@ fn plan_rollback_transaction_chained() { fn plan_copy_to() { let sql = "COPY test_decimal to 'output.csv'"; let plan = r#" -CopyTo: format=csv output_url=output.csv per_thread_output=false options: () +CopyTo: format=csv output_url=output.csv single_file_output=true options: () TableScan: test_decimal "# .trim(); @@ -341,7 +341,7 @@ fn plan_explain_copy_to() { let sql = "EXPLAIN COPY test_decimal to 'output.csv'"; let plan = r#" Explain - CopyTo: format=csv output_url=output.csv per_thread_output=false options: () + CopyTo: format=csv output_url=output.csv single_file_output=true options: () TableScan: test_decimal "# .trim(); @@ -352,7 +352,7 @@ Explain fn plan_copy_to_query() { let sql = "COPY (select * from test_decimal limit 10) to 'output.csv'"; let plan = r#" -CopyTo: format=csv output_url=output.csv per_thread_output=false options: () +CopyTo: format=csv output_url=output.csv single_file_output=true options: () Limit: skip=0, fetch=10 Projection: test_decimal.id, test_decimal.price TableScan: test_decimal diff --git a/datafusion/sqllogictest/test_files/copy.slt b/datafusion/sqllogictest/test_files/copy.slt index a44a662ada23..996de3a386b6 100644 --- a/datafusion/sqllogictest/test_files/copy.slt +++ b/datafusion/sqllogictest/test_files/copy.slt @@ -21,27 +21,32 @@ create table source_table(col1 integer, col2 varchar) as values (1, 'Foo'), (2, # Copy to directory as multiple files query IT -COPY source_table TO 'test_files/scratch/table' (format parquet, per_thread_output true); +COPY source_table TO 'test_files/scratch/table' (format parquet, single_file_output false, compression 'zstd(10)'); ---- 2 -# Error case -query error DataFusion error: Error during planning: Copy To format not explicitly set and unable to get file extension! 
-EXPLAIN COPY source_table to 'test_files/scratch/table' - query TT -EXPLAIN COPY source_table to 'test_files/scratch/table' (format parquet, per_thread_output true) +EXPLAIN COPY source_table TO 'test_files/scratch/table' (format parquet, single_file_output false, compression 'zstd(10)'); ---- logical_plan -CopyTo: format=parquet output_url=test_files/scratch/table per_thread_output=true options: (format parquet, per_thread_output true) +CopyTo: format=parquet output_url=test_files/scratch/table single_file_output=false options: (compression 'zstd(10)') --TableScan: source_table projection=[col1, col2] physical_plan InsertExec: sink=ParquetSink(writer_mode=PutMultipart, file_groups=[]) --MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0] +# Error case +query error DataFusion error: Invalid Option: Format not explicitly set and unable to get file extension! +EXPLAIN COPY source_table to 'test_files/scratch/table' + +query error DataFusion error: SQL error: ParserError\("Expected end of statement, found: query"\) +EXPLAIN COPY source_table to 'test_files/scratch/table' (format parquet, single_file_output false) +query TT +EXPLAIN COPY source_table to 'test_files/scratch/table' (format parquet, per_thread_output true) + # Copy more files to directory via query query IT -COPY (select * from source_table UNION ALL select * from source_table) to 'test_files/scratch/table' (format parquet, per_thread_output true); +COPY (select * from source_table UNION ALL select * from source_table) to 'test_files/scratch/table' (format parquet, single_file_output false); ---- 4 @@ -75,15 +80,31 @@ select * from validate_parquet_single; 1 Foo 2 Bar -# copy from table to folder of csv files +# copy from table to folder of compressed json files +query IT +COPY source_table to 'test_files/scratch/table_json_gz' (format json, single_file_output false, compression 'gzip'); +---- +2 + +# validate folder of csv files +statement ok +CREATE EXTERNAL TABLE validate_json_gz STORED AS json COMPRESSION TYPE gzip LOCATION 'test_files/scratch/table_json_gz'; + +query IT +select * from validate_json_gz; +---- +1 Foo +2 Bar + +# copy from table to folder of compressed csv files query IT -COPY source_table to 'test_files/scratch/table_csv' (format csv, per_thread_output true); +COPY source_table to 'test_files/scratch/table_csv' (format csv, single_file_output false, header false, compression 'gzip'); ---- 2 # validate folder of csv files statement ok -CREATE EXTERNAL TABLE validate_csv STORED AS csv WITH HEADER ROW LOCATION 'test_files/scratch/table_csv'; +CREATE EXTERNAL TABLE validate_csv STORED AS csv COMPRESSION TYPE gzip LOCATION 'test_files/scratch/table_csv'; query IT select * from validate_csv; @@ -109,7 +130,7 @@ select * from validate_single_csv; # Copy from table to folder of json query IT -COPY source_table to 'test_files/scratch/table_json' (format json, per_thread_output true); +COPY source_table to 'test_files/scratch/table_json' (format json, single_file_output false); ---- 2 @@ -139,20 +160,11 @@ select * from validate_single_json; 1 Foo 2 Bar +# Error cases: + # Copy from table with options -query IT +query error DataFusion error: Invalid Option: Found unsupported option row_group_size with value 55 for JSON format! 
COPY source_table to 'test_files/scratch/table.json' (row_group_size 55); ----- -2 - -# Copy from table with options (and trailing comma) -query IT -COPY source_table to 'test_files/scratch/table.json' (row_group_size 55, row_group_limit_bytes 9,); ----- -2 - - -# Error cases: # Incomplete statement query error DataFusion error: SQL error: ParserError\("Expected \), found: EOF"\) From 53dbf1d6072d33fe406b72b185b70fdb99b2556a Mon Sep 17 00:00:00 2001 From: Devin D'Angelo Date: Wed, 23 Aug 2023 16:09:50 -0400 Subject: [PATCH 2/7] cargo fmt --- datafusion/common/src/file_options/mod.rs | 1 - datafusion/common/src/lib.rs | 4 ++-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/datafusion/common/src/file_options/mod.rs b/datafusion/common/src/file_options/mod.rs index 016522afe7ff..7105deabd76b 100644 --- a/datafusion/common/src/file_options/mod.rs +++ b/datafusion/common/src/file_options/mod.rs @@ -134,7 +134,6 @@ impl StatementOptions { } } - /// This type contains all options needed to initialize a particular /// RecordBatchWriter type. Each element in the enum contains a thin wrapper /// around a "writer builder" type (e.g. arrow::csv::WriterBuilder) diff --git a/datafusion/common/src/lib.rs b/datafusion/common/src/lib.rs index b449fbe6b285..a5c79b449734 100644 --- a/datafusion/common/src/lib.rs +++ b/datafusion/common/src/lib.rs @@ -46,12 +46,12 @@ pub use error::{ SharedResult, }; -pub use file_options::FileTypeWriterOptions; pub use file_options::file_type::{ FileCompressionType, FileType, GetExt, DEFAULT_ARROW_EXTENSION, DEFAULT_AVRO_EXTENSION, DEFAULT_CSV_EXTENSION, DEFAULT_JSON_EXTENSION, - DEFAULT_PARQUET_EXTENSION + DEFAULT_PARQUET_EXTENSION, }; +pub use file_options::FileTypeWriterOptions; pub use functional_dependencies::{ aggregate_functional_dependencies, get_target_functional_dependencies, Constraints, Dependency, FunctionalDependence, FunctionalDependencies, From 309182f618ff815fbf9059338f9bfcc617871c79 Mon Sep 17 00:00:00 2001 From: Devin D'Angelo Date: Wed, 23 Aug 2023 16:43:40 -0400 Subject: [PATCH 3/7] try to fix pyarrow compile failure --- datafusion/common/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/common/Cargo.toml b/datafusion/common/Cargo.toml index 99a171d72e98..9a7605d885c8 100644 --- a/datafusion/common/Cargo.toml +++ b/datafusion/common/Cargo.toml @@ -35,7 +35,7 @@ path = "src/lib.rs" [features] avro = ["apache-avro"] compression = ["xz2", "bzip2", "flate2", "zstd", "async-compression"] -default = ["compression"] +default = ["compression", "parquet"] pyarrow = ["pyo3", "arrow/pyarrow"] [dependencies] From c0b358e188f759cc2ea705b49fbfbf1efecacec8 Mon Sep 17 00:00:00 2001 From: Devin D'Angelo Date: Sat, 26 Aug 2023 06:30:39 -0400 Subject: [PATCH 4/7] Apply suggestions from code review Co-authored-by: Andrew Lamb --- datafusion/common/src/file_options/arrow_writer.rs | 2 +- datafusion/common/src/file_options/csv_writer.rs | 3 ++- datafusion/common/src/file_options/mod.rs | 4 ++-- 3 files changed, 5 insertions(+), 4 deletions(-) diff --git a/datafusion/common/src/file_options/arrow_writer.rs b/datafusion/common/src/file_options/arrow_writer.rs index a7a5ad51d0d5..a30e6d800e20 100644 --- a/datafusion/common/src/file_options/arrow_writer.rs +++ b/datafusion/common/src/file_options/arrow_writer.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -//! Options related to how json files should be written +//! 
Options related to how Arrow files should be written use crate::{ config::ConfigOptions, diff --git a/datafusion/common/src/file_options/csv_writer.rs b/datafusion/common/src/file_options/csv_writer.rs index 8e1077978cb8..aaa3e8191ea9 100644 --- a/datafusion/common/src/file_options/csv_writer.rs +++ b/datafusion/common/src/file_options/csv_writer.rs @@ -37,12 +37,13 @@ pub struct CsvWriterOptions { /// Compression to apply after ArrowWriter serializes RecordBatches. /// This compression is applied by DataFusion not the ArrowWriter itself. pub compression: CompressionTypeVariant, - /// Indicates weather WriterBuilder.has_header() is set to true. + /// Indicates whether WriterBuilder.has_header() is set to true. /// This is duplicative as WriterBuilder also stores this information. /// However, WriterBuilder does not allow public read access to the /// has_header parameter. pub has_header: bool, // TODO: expose a way to read has_header in arrow create + // https://github.com/apache/arrow-rs/issues/4735 } impl TryFrom<(&ConfigOptions, &StatementOptions)> for CsvWriterOptions { diff --git a/datafusion/common/src/file_options/mod.rs b/datafusion/common/src/file_options/mod.rs index 7105deabd76b..63622e41db4a 100644 --- a/datafusion/common/src/file_options/mod.rs +++ b/datafusion/common/src/file_options/mod.rs @@ -75,7 +75,7 @@ impl StatementOptions { /// Scans for option and if it exists removes it and attempts to parse as a boolean /// Returns none if it does not exist. - pub fn get_bool_option(&mut self, find: &str) -> Result> { + pub fn take_bool_option(&mut self, find: &str) -> Result> { let maybe_option = self.scan_and_remove_option(find); maybe_option .map(|(_, v)| parse_boolean_string(find, v)) @@ -214,7 +214,7 @@ impl FileTypeWriterOptions { match self { FileTypeWriterOptions::Parquet(opt) => Ok(opt), _ => Err(DataFusionError::Internal( - "Expected parquet options but found options for a different FileType!" + "Expected parquet options but found options for: {}", self.name() .into(), )), } From 7be48144b3cbf897744fe5bae1a8c2f2169e63ac Mon Sep 17 00:00:00 2001 From: Devin D'Angelo Date: Mon, 28 Aug 2023 08:46:50 -0400 Subject: [PATCH 5/7] review comments + add test cases --- datafusion/common/src/error.rs | 8 +-- .../common/src/file_options/csv_writer.rs | 21 +++++- .../common/src/file_options/json_writer.rs | 2 +- datafusion/common/src/file_options/mod.rs | 65 +++++++++++------- .../common/src/file_options/parquet_writer.rs | 66 ++++++++++--------- .../common/src/file_options/parse_utils.rs | 16 ++--- datafusion/common/src/parsers.rs | 2 +- .../core/src/datasource/listing/table.rs | 2 +- .../src/datasource/listing_table_factory.rs | 8 +-- .../core/src/datasource/physical_plan/csv.rs | 2 - datafusion/sql/src/statement.rs | 2 +- datafusion/sqllogictest/test_files/copy.slt | 66 ++++++++++++++++++- 12 files changed, 180 insertions(+), 80 deletions(-) diff --git a/datafusion/common/src/error.rs b/datafusion/common/src/error.rs index 73f469739af9..140522d6603b 100644 --- a/datafusion/common/src/error.rs +++ b/datafusion/common/src/error.rs @@ -75,7 +75,7 @@ pub enum DataFusionError { Plan(String), /// This error happens when an invalid or unsupported option is passed /// in a SQL statement - InvalidOption(String), + Configuration(String), /// This error happens with schema-related errors, such as schema inference not possible /// and non-unique column names. 
SchemaError(SchemaError), @@ -291,8 +291,8 @@ impl Display for DataFusionError { DataFusionError::SQL(ref desc) => { write!(f, "SQL error: {desc:?}") } - DataFusionError::InvalidOption(ref desc) => { - write!(f, "Invalid Option: {desc}") + DataFusionError::Configuration(ref desc) => { + write!(f, "Invalid or Unsupported Configuration: {desc}") } DataFusionError::NotImplemented(ref desc) => { write!(f, "This feature is not implemented: {desc}") @@ -344,7 +344,7 @@ impl Error for DataFusionError { DataFusionError::SQL(e) => Some(e), DataFusionError::NotImplemented(_) => None, DataFusionError::Internal(_) => None, - DataFusionError::InvalidOption(_) => None, + DataFusionError::Configuration(_) => None, DataFusionError::Plan(_) => None, DataFusionError::SchemaError(e) => Some(e), DataFusionError::Execution(_) => None, diff --git a/datafusion/common/src/file_options/csv_writer.rs b/datafusion/common/src/file_options/csv_writer.rs index aaa3e8191ea9..ebf177fdce98 100644 --- a/datafusion/common/src/file_options/csv_writer.rs +++ b/datafusion/common/src/file_options/csv_writer.rs @@ -59,7 +59,7 @@ impl TryFrom<(&ConfigOptions, &StatementOptions)> for CsvWriterOptions { builder = match option.to_lowercase().as_str(){ "header" => { has_header = value.parse() - .map_err(|_| DataFusionError::InvalidOption(format!("Unable to parse {value} as bool as required for {option}!")))?; + .map_err(|_| DataFusionError::Configuration(format!("Unable to parse {value} as bool as required for {option}!")))?; builder.has_headers(has_header) }, "date_format" => builder.with_date_format(value.to_owned()), @@ -68,7 +68,7 @@ impl TryFrom<(&ConfigOptions, &StatementOptions)> for CsvWriterOptions { "time_format" => builder.with_time_format(value.to_owned()), "rfc3339" => { let value_bool = value.parse() - .map_err(|_| DataFusionError::InvalidOption(format!("Unable to parse {value} as bool as required for {option}!")))?; + .map_err(|_| DataFusionError::Configuration(format!("Unable to parse {value} as bool as required for {option}!")))?; if value_bool{ builder.with_rfc3339() } else{ @@ -80,7 +80,22 @@ impl TryFrom<(&ConfigOptions, &StatementOptions)> for CsvWriterOptions { compression = CompressionTypeVariant::from_str(value.replace('\'', "").as_str())?; builder }, - _ => return Err(DataFusionError::InvalidOption(format!("Found unsupported option {option} with value {value} for CSV format!"))) + "delimeter" => { + // Ignore string literal single quotes passed from sql parsing + let value = value.replace('\'', ""); + let chars: Vec = value.chars().collect(); + if chars.len()>1{ + return Err(DataFusionError::Configuration(format!( + "CSV Delimeter Option must be a single char, got: {}", value + ))) + } + builder.with_delimiter(chars[0].try_into().map_err(|_| { + DataFusionError::Internal( + "Unable to convert CSV delimiter into u8".into(), + ) + })?) 
+ }, + _ => return Err(DataFusionError::Configuration(format!("Found unsupported option {option} with value {value} for CSV format!"))) } } Ok(CsvWriterOptions { diff --git a/datafusion/common/src/file_options/json_writer.rs b/datafusion/common/src/file_options/json_writer.rs index a788ad26d0a3..b3ea76b6510a 100644 --- a/datafusion/common/src/file_options/json_writer.rs +++ b/datafusion/common/src/file_options/json_writer.rs @@ -45,7 +45,7 @@ impl TryFrom<(&ConfigOptions, &StatementOptions)> for JsonWriterOptions { "compression" => { compression = CompressionTypeVariant::from_str(value.replace('\'', "").as_str())?; }, - _ => return Err(DataFusionError::InvalidOption(format!("Found unsupported option {option} with value {value} for JSON format!"))) + _ => return Err(DataFusionError::Configuration(format!("Found unsupported option {option} with value {value} for JSON format!"))) } } Ok(JsonWriterOptions { compression }) diff --git a/datafusion/common/src/file_options/mod.rs b/datafusion/common/src/file_options/mod.rs index 63622e41db4a..f02129a05795 100644 --- a/datafusion/common/src/file_options/mod.rs +++ b/datafusion/common/src/file_options/mod.rs @@ -25,7 +25,12 @@ pub mod json_writer; pub mod parquet_writer; pub(crate) mod parse_utils; -use std::{collections::HashMap, path::Path, str::FromStr}; +use std::{ + collections::HashMap, + fmt::{self, Display}, + path::Path, + str::FromStr, +}; use crate::{ config::ConfigOptions, file_options::parse_utils::parse_boolean_string, @@ -84,7 +89,7 @@ impl StatementOptions { /// Scans for option and if it exists removes it and returns it /// Returns none if it does not exist - pub fn get_str_option(&mut self, find: &str) -> Option { + pub fn take_str_option(&mut self, find: &str) -> Option { let maybe_option = self.scan_and_remove_option(find); maybe_option.map(|(_, v)| v) } @@ -102,12 +107,12 @@ impl StatementOptions { // try to infer file format from file extension let extension: &str = &Path::new(target) .extension() - .ok_or(DataFusionError::InvalidOption( + .ok_or(DataFusionError::Configuration( "Format not explicitly set and unable to get file extension!" .to_string(), ))? .to_str() - .ok_or(DataFusionError::InvalidOption( + .ok_or(DataFusionError::Configuration( "Format not explicitly set and failed to parse file extension!" .to_string(), ))? @@ -213,10 +218,10 @@ impl FileTypeWriterOptions { pub fn try_into_parquet(&self) -> Result<&ParquetWriterOptions> { match self { FileTypeWriterOptions::Parquet(opt) => Ok(opt), - _ => Err(DataFusionError::Internal( - "Expected parquet options but found options for: {}", self.name() - .into(), - )), + _ => Err(DataFusionError::Internal(format!( + "Expected parquet options but found options for: {}", + self + ))), } } @@ -225,9 +230,10 @@ impl FileTypeWriterOptions { pub fn try_into_csv(&self) -> Result<&CsvWriterOptions> { match self { FileTypeWriterOptions::CSV(opt) => Ok(opt), - _ => Err(DataFusionError::Internal( - "Expected csv options but found options for a different FileType!".into(), - )), + _ => Err(DataFusionError::Internal(format!( + "Expected csv options but found options for {}", + self + ))), } } @@ -236,10 +242,10 @@ impl FileTypeWriterOptions { pub fn try_into_json(&self) -> Result<&JsonWriterOptions> { match self { FileTypeWriterOptions::JSON(opt) => Ok(opt), - _ => Err(DataFusionError::Internal( - "Expected json options but found options for a different FileType!" 
- .into(), - )), + _ => Err(DataFusionError::Internal(format!( + "Expected json options but found options for {}", + self, + ))), } } @@ -248,10 +254,10 @@ impl FileTypeWriterOptions { pub fn try_into_avro(&self) -> Result<&AvroWriterOptions> { match self { FileTypeWriterOptions::Avro(opt) => Ok(opt), - _ => Err(DataFusionError::Internal( - "Expected avro options but found options for a different FileType!" - .into(), - )), + _ => Err(DataFusionError::Internal(format!( + "Expected avro options but found options for {}!", + self + ))), } } @@ -260,10 +266,23 @@ impl FileTypeWriterOptions { pub fn try_into_arrow(&self) -> Result<&ArrowWriterOptions> { match self { FileTypeWriterOptions::Arrow(opt) => Ok(opt), - _ => Err(DataFusionError::Internal( - "Expected arrow options but found options for a different FileType!" - .into(), - )), + _ => Err(DataFusionError::Internal(format!( + "Expected arrow options but found options for {}", + self + ))), } } } + +impl Display for FileTypeWriterOptions { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let name = match self { + FileTypeWriterOptions::Arrow(_) => "Arrow", + FileTypeWriterOptions::Avro(_) => "Avro", + FileTypeWriterOptions::CSV(_) => "CSV", + FileTypeWriterOptions::JSON(_) => "JSON", + FileTypeWriterOptions::Parquet(_) => "Parquet", + }; + write!(f, "{}", name) + } +} diff --git a/datafusion/common/src/file_options/parquet_writer.rs b/datafusion/common/src/file_options/parquet_writer.rs index a97e218ba1c2..ea3276b062ac 100644 --- a/datafusion/common/src/file_options/parquet_writer.rs +++ b/datafusion/common/src/file_options/parquet_writer.rs @@ -45,53 +45,59 @@ impl ParquetWriterOptions { /// Constructs a default Parquet WriterPropertiesBuilder using /// Session level ConfigOptions to initialize settings fn default_builder(options: &ConfigOptions) -> Result { - let parquet_context = &options.execution.parquet; + let parquet_session_options = &options.execution.parquet; let mut builder = WriterProperties::builder() - .set_data_page_size_limit(parquet_context.data_pagesize_limit) - .set_write_batch_size(parquet_context.write_batch_size) - .set_writer_version(parse_version_string(&parquet_context.writer_version)?) - .set_dictionary_page_size_limit(parquet_context.dictionary_page_size_limit) - .set_max_row_group_size(parquet_context.max_row_group_size) - .set_created_by(parquet_context.created_by.clone()) - .set_column_index_truncate_length(parquet_context.column_index_truncate_length) - .set_data_page_row_count_limit(parquet_context.data_page_row_count_limit) - .set_bloom_filter_enabled(parquet_context.bloom_filter_enabled); - - builder = match &parquet_context.encoding { + .set_data_page_size_limit(parquet_session_options.data_pagesize_limit) + .set_write_batch_size(parquet_session_options.write_batch_size) + .set_writer_version(parse_version_string( + &parquet_session_options.writer_version, + )?) 
+            .set_dictionary_page_size_limit(
+                parquet_session_options.dictionary_page_size_limit,
+            )
+            .set_max_row_group_size(parquet_session_options.max_row_group_size)
+            .set_created_by(parquet_session_options.created_by.clone())
+            .set_column_index_truncate_length(
+                parquet_session_options.column_index_truncate_length,
+            )
+            .set_data_page_row_count_limit(parquet_session_options.data_page_row_count_limit)
+            .set_bloom_filter_enabled(parquet_session_options.bloom_filter_enabled);
+
+        builder = match &parquet_session_options.encoding {
             Some(encoding) => builder.set_encoding(parse_encoding_string(encoding)?),
             None => builder,
         };
 
-        builder = match &parquet_context.dictionary_enabled {
+        builder = match &parquet_session_options.dictionary_enabled {
             Some(enabled) => builder.set_dictionary_enabled(*enabled),
             None => builder,
         };
 
-        builder = match &parquet_context.compression {
+        builder = match &parquet_session_options.compression {
             Some(compression) => {
                 builder.set_compression(parse_compression_string(compression)?)
             }
             None => builder,
         };
 
-        builder = match &parquet_context.statistics_enabled {
+        builder = match &parquet_session_options.statistics_enabled {
             Some(statistics) => {
                 builder.set_statistics_enabled(parse_statistics_string(statistics)?)
             }
             None => builder,
         };
 
-        builder = match &parquet_context.max_statistics_size {
+        builder = match &parquet_session_options.max_statistics_size {
             Some(size) => builder.set_max_statistics_size(*size),
             None => builder,
         };
 
-        builder = match &parquet_context.bloom_filter_fpp {
+        builder = match &parquet_session_options.bloom_filter_fpp {
             Some(fpp) => builder.set_bloom_filter_fpp(*fpp),
             None => builder,
         };
 
-        builder = match &parquet_context.bloom_filter_ndv {
+        builder = match &parquet_session_options.bloom_filter_ndv {
             Some(ndv) => builder.set_bloom_filter_ndv(*ndv),
             None => builder,
         };
@@ -112,48 +118,48 @@ impl TryFrom<(&ConfigOptions, &StatementOptions)> for ParquetWriterOptions {
             builder = match option.to_lowercase().as_str(){
                 "max_row_group_size" => builder
                     .set_max_row_group_size(value.parse()
-                    .map_err(|_| DataFusionError::InvalidOption(format!("Unable to parse {value} as u64 as required for {option}!")))?),
+                    .map_err(|_| DataFusionError::Configuration(format!("Unable to parse {value} as u64 as required for {option}!")))?),
                 "data_pagesize_limit" => builder
                     .set_data_page_size_limit(value.parse()
-                    .map_err(|_| DataFusionError::InvalidOption(format!("Unable to parse {value} as usize as required for {option}!")))?),
+                    .map_err(|_| DataFusionError::Configuration(format!("Unable to parse {value} as usize as required for {option}!")))?),
                 "write_batch_size" => builder
                     .set_write_batch_size(value.parse()
-                    .map_err(|_| DataFusionError::InvalidOption(format!("Unable to parse {value} as usize as required for {option}!")))?),
+                    .map_err(|_| DataFusionError::Configuration(format!("Unable to parse {value} as usize as required for {option}!")))?),
                 "writer_version" => builder
                     .set_writer_version(parse_version_string(value)?),
                 "dictionary_page_size_limit" => builder
                     .set_dictionary_page_size_limit(value.parse()
-                    .map_err(|_| DataFusionError::InvalidOption(format!("Unable to parse {value} as usize as required for {option}!")))?),
+                    .map_err(|_| DataFusionError::Configuration(format!("Unable to parse {value} as usize as required for {option}!")))?),
                 "created_by" => builder
                     .set_created_by(value.to_owned()),
                 "column_index_truncate_length" => builder
                     .set_column_index_truncate_length(Some(value.parse()
-                    .map_err(|_| DataFusionError::InvalidOption(format!("Unable to parse {value} as usize as required for {option}!")))?)),
+                    .map_err(|_| DataFusionError::Configuration(format!("Unable to parse {value} as usize as required for {option}!")))?)),
                 "data_page_row_count_limit" => builder
                     .set_data_page_row_count_limit(value.parse()
-                    .map_err(|_| DataFusionError::InvalidOption(format!("Unable to parse {value} as usize as required for {option}!")))?),
+                    .map_err(|_| DataFusionError::Configuration(format!("Unable to parse {value} as usize as required for {option}!")))?),
                 "bloom_filter_enabled" => builder
                     .set_bloom_filter_enabled(value.parse()
-                    .map_err(|_| DataFusionError::InvalidOption(format!("Unable to parse {value} as bool as required for {option}!")))?),
+                    .map_err(|_| DataFusionError::Configuration(format!("Unable to parse {value} as bool as required for {option}!")))?),
                 "encoding" => builder
                     .set_encoding(parse_encoding_string(value)?),
                 "dictionary_enabled" => builder
                     .set_dictionary_enabled(value.parse()
-                    .map_err(|_| DataFusionError::InvalidOption(format!("Unable to parse {value} as bool as required for {option}!")))?),
+                    .map_err(|_| DataFusionError::Configuration(format!("Unable to parse {value} as bool as required for {option}!")))?),
                 "compression" => builder
                     .set_compression(parse_compression_string(value)?),
                 "statistics_enabled" => builder
                     .set_statistics_enabled(parse_statistics_string(value)?),
                 "max_statistics_size" => builder
                     .set_max_statistics_size(value.parse()
-                    .map_err(|_| DataFusionError::InvalidOption(format!("Unable to parse {value} as usize as required for {option}!")))?),
+                    .map_err(|_| DataFusionError::Configuration(format!("Unable to parse {value} as usize as required for {option}!")))?),
                 "bloom_filter_fpp" => builder
                     .set_bloom_filter_fpp(value.parse()
-                    .map_err(|_| DataFusionError::InvalidOption(format!("Unable to parse {value} as f64 as required for {option}!")))?),
+                    .map_err(|_| DataFusionError::Configuration(format!("Unable to parse {value} as f64 as required for {option}!")))?),
                 "bloom_filter_ndv" => builder
                     .set_bloom_filter_ndv(value.parse()
-                    .map_err(|_| DataFusionError::InvalidOption(format!("Unable to parse {value} as u64 as required for {option}!")))?),
-                _ => return Err(DataFusionError::InvalidOption(format!("Found unsupported option {option} with value {value} for Parquet format!")))
+                    .map_err(|_| DataFusionError::Configuration(format!("Unable to parse {value} as u64 as required for {option}!")))?),
+                _ => return Err(DataFusionError::Configuration(format!("Found unsupported option {option} with value {value} for Parquet format!")))
             }
         }
         Ok(ParquetWriterOptions {
diff --git a/datafusion/common/src/file_options/parse_utils.rs b/datafusion/common/src/file_options/parse_utils.rs
index 6af679be08e1..da8d31436b9e 100644
--- a/datafusion/common/src/file_options/parse_utils.rs
+++ b/datafusion/common/src/file_options/parse_utils.rs
@@ -29,7 +29,7 @@ pub(crate) fn parse_boolean_string(option: &str, value: String) -> Result<bool>
     match value.to_lowercase().as_str() {
         "true" => Ok(true),
         "false" => Ok(false),
-        _ => Err(DataFusionError::InvalidOption(format!(
+        _ => Err(DataFusionError::Configuration(format!(
             "Unsupported value {value} for option {option}! \
             Valid values are true or false!"
         ))),
@@ -53,7 +53,7 @@ pub(crate) fn parse_encoding_string(
         "delta_byte_array" => Ok(parquet::basic::Encoding::DELTA_BYTE_ARRAY),
         "rle_dictionary" => Ok(parquet::basic::Encoding::RLE_DICTIONARY),
         "byte_stream_split" => Ok(parquet::basic::Encoding::BYTE_STREAM_SPLIT),
-        _ => Err(DataFusionError::InvalidOption(format!(
+        _ => Err(DataFusionError::Configuration(format!(
             "Unknown or unsupported parquet encoding: \
             {str_setting}. Valid values are: plain, plain_dictionary, rle, \
             /// bit_packed, delta_binary_packed, delta_length_byte_array, \
@@ -72,7 +72,7 @@ fn split_compression_string(str_setting: &str) -> Result<(String, Option<u32>)>
     match split_setting {
         Some((codec, rh)) => {
             let level = &rh[..rh.len() - 1].parse::<u32>().map_err(|_| {
-                DataFusionError::InvalidOption(format!(
+                DataFusionError::Configuration(format!(
                     "Could not parse compression string. \
                     Got codec: {} and unknown level from {}",
                     codec, str_setting
@@ -88,7 +88,7 @@
 /// don't have one set. E.g. snappy(2) is invalid.
 fn check_level_is_none(codec: &str, level: &Option<u32>) -> Result<()> {
     if level.is_some() {
-        return Err(DataFusionError::InvalidOption(format!(
+        return Err(DataFusionError::Configuration(format!(
             "Compression {codec} does not support specifying a level"
         )));
     }
@@ -98,7 +98,7 @@ fn check_level_is_none(codec: &str, level: &Option<u32>) -> Result<()> {
 /// Helper to ensure compression codecs which require a level
 /// do have one set. E.g. zstd is invalid, zstd(3) is valid
 fn require_level(codec: &str, level: Option<u32>) -> Result<u32> {
-    level.ok_or(DataFusionError::InvalidOption(format!(
+    level.ok_or(DataFusionError::Configuration(format!(
         "{codec} compression requires specifying a level such as {codec}(4)"
     )))
 }
@@ -149,7 +149,7 @@ pub(crate) fn parse_compression_string(
             check_level_is_none(codec, &level)?;
             Ok(parquet::basic::Compression::LZ4_RAW)
         }
-        _ => Err(DataFusionError::InvalidOption(format!(
+        _ => Err(DataFusionError::Configuration(format!(
             "Unknown or unsupported parquet compression: \
             {str_setting}. Valid values are: uncompressed, snappy, gzip(level), \
             lzo, brotli(level), lz4, zstd(level), and lz4_raw."
@@ -162,7 +162,7 @@ pub(crate) fn parse_version_string(str_setting: &str) -> Result<WriterVersion>
     match str_setting_lower {
         "1.0" => Ok(WriterVersion::PARQUET_1_0),
         "2.0" => Ok(WriterVersion::PARQUET_2_0),
-        _ => Err(DataFusionError::InvalidOption(format!(
+        _ => Err(DataFusionError::Configuration(format!(
             "Unknown or unsupported parquet writer version {str_setting} \
             valid options are '1.0' and '2.0'"
         ))),
@@ -175,7 +175,7 @@ pub(crate) fn parse_statistics_string(str_setting: &str) -> Result<EnabledStatistics>
         "none" => Ok(EnabledStatistics::None),
         "chunk" => Ok(EnabledStatistics::Chunk),
         "page" => Ok(EnabledStatistics::Page),
-        _ => Err(DataFusionError::InvalidOption(format!(
+        _ => Err(DataFusionError::Configuration(format!(
             "Unknown or unsupported parquet statistics setting {str_setting} \
             valid options are 'none', 'page', and 'chunk'"
         ))),
diff --git a/datafusion/common/src/parsers.rs b/datafusion/common/src/parsers.rs
index 58f4db751c4c..ea2508f8c455 100644
--- a/datafusion/common/src/parsers.rs
+++ b/datafusion/common/src/parsers.rs
@@ -46,7 +46,7 @@ impl FromStr for CompressionTypeVariant {
             "BZIP2" | "BZ2" => Ok(Self::BZIP2),
             "XZ" => Ok(Self::XZ),
             "ZST" | "ZSTD" => Ok(Self::ZSTD),
-            "" => Ok(Self::UNCOMPRESSED),
+            "" | "UNCOMPRESSED" => Ok(Self::UNCOMPRESSED),
             _ => Err(ParserError::ParserError(format!(
                 "Unsupported file compression type {s}"
             ))),
diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs
index e46882600015..7740bff2109b 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -1862,7 +1862,7 @@ mod tests {
         )
         .await
         .expect_err("Example should fail!");
-        assert_eq!("Invalid Option: zstd compression requires specifying a level such as zstd(4)", format!("{e}"));
+        assert_eq!("Invalid or Unsupported Configuration: zstd compression requires specifying a level such as zstd(4)", format!("{e}"));
 
         Ok(())
     }
diff --git a/datafusion/core/src/datasource/listing_table_factory.rs b/datafusion/core/src/datasource/listing_table_factory.rs
index bd57537dd0ed..a788baf80fee 100644
--- a/datafusion/core/src/datasource/listing_table_factory.rs
+++ b/datafusion/core/src/datasource/listing_table_factory.rs
@@ -138,15 +138,15 @@ impl TableProviderFactory for ListingTableFactory {
 
         // Extract ListingTable specific options if present or set default
         // Discard unbounded option if present
-        statement_options.get_str_option("unbounded");
+        statement_options.take_str_option("unbounded");
         let create_local_path = statement_options
-            .get_bool_option("create_local_path")?
+            .take_bool_option("create_local_path")?
             .unwrap_or(false);
         let single_file = statement_options
-            .get_bool_option("single_file")?
+            .take_bool_option("single_file")?
             .unwrap_or(false);
-        let explicit_insert_mode = statement_options.get_str_option("insert_mode");
+        let explicit_insert_mode = statement_options.take_str_option("insert_mode");
         let insert_mode = match explicit_insert_mode {
             Some(mode) => ListingTableInsertMode::from_str(mode.as_str()),
             None => match file_type {
diff --git a/datafusion/core/src/datasource/physical_plan/csv.rs b/datafusion/core/src/datasource/physical_plan/csv.rs
index 14af4bc63682..8a9822c8ca97 100644
--- a/datafusion/core/src/datasource/physical_plan/csv.rs
+++ b/datafusion/core/src/datasource/physical_plan/csv.rs
@@ -1189,8 +1189,6 @@ mod tests {
             Field::new("c2", DataType::UInt64, false),
         ]));
 
-        println!("{out_dir}");
-
         // get name of first part
         let paths = fs::read_dir(&out_dir).unwrap();
         let mut part_0_name: String = "".to_owned();
diff --git a/datafusion/sql/src/statement.rs b/datafusion/sql/src/statement.rs
index d1e0154ef73f..655442d7e353 100644
--- a/datafusion/sql/src/statement.rs
+++ b/datafusion/sql/src/statement.rs
@@ -612,7 +612,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
         let mut statement_options = StatementOptions::new(options);
         let file_format = statement_options.try_infer_file_type(&statement.target)?;
         let single_file_output =
-            statement_options.get_bool_option("single_file_output")?;
+            statement_options.take_bool_option("single_file_output")?;
         // COPY defaults to outputting a single file if not otherwise specified
         let single_file_output = single_file_output.unwrap_or(true);
 
diff --git a/datafusion/sqllogictest/test_files/copy.slt b/datafusion/sqllogictest/test_files/copy.slt
index 6db30445e3e2..f095552dadf2 100644
--- a/datafusion/sqllogictest/test_files/copy.slt
+++ b/datafusion/sqllogictest/test_files/copy.slt
@@ -36,7 +36,7 @@ InsertExec: sink=ParquetSink(writer_mode=PutMultipart, file_groups=[])
 --MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
 
 # Error case
-query error DataFusion error: Invalid Option: Format not explicitly set and unable to get file extension!
+query error DataFusion error: Invalid or Unsupported Configuration: Format not explicitly set and unable to get file extension!
 EXPLAIN COPY source_table to 'test_files/scratch/copy/table'
 
 query error DataFusion error: SQL error: ParserError\("Expected end of statement, found: query"\)
@@ -64,6 +64,42 @@ select * from validate_parquet;
 1 Foo
 2 Bar
 
+# Copy parquet with all supported statement overrides
+query IT
+COPY source_table
+TO 'test_files/scratch/copy/table_with_options'
+(format parquet,
+single_file_output false,
+compression 'snappy',
+max_row_group_size 12345,
+data_pagesize_limit 1234,
+write_batch_size 1234,
+writer_version 2.0,
+dictionary_page_size_limit 123,
+created_by 'DF copy.slt',
+column_index_truncate_length 123,
+data_page_row_count_limit 1234,
+bloom_filter_enabled true,
+encoding plain,
+dictionary_enabled false,
+statistics_enabled page,
+max_statistics_size 123,
+bloom_filter_fpp 0.001,
+bloom_filter_ndv 100
+)
+----
+2
+
+# validate multiple parquet file output with all options set
+statement ok
+CREATE EXTERNAL TABLE validate_parquet_with_options STORED AS PARQUET LOCATION 'test_files/scratch/copy/table_with_options/';
+
+query IT
+select * from validate_parquet_with_options;
+----
+1 Foo
+2 Bar
+
 # Copy from table to single file
 query IT
 COPY source_table to 'test_files/scratch/copy/table.parquet';
@@ -160,10 +196,36 @@ select * from validate_single_json;
 1 Foo
 2 Bar
 
+# COPY csv files with all options set
+query IT
+COPY source_table
+to 'test_files/scratch/copy/table_csv_with_options'
+(format csv,
+single_file_output false,
+header false,
+compression 'uncompressed',
+datetime_format '%FT%H:%M:%S.%9f',
+delimeter ';',
+null_value 'NULLVAL');
+----
+2
+
+# Validate csv output with options
+statement ok
+CREATE EXTERNAL TABLE validate_csv_with_options
+STORED AS csv
+LOCATION 'test_files/scratch/copy/table_csv_with_options';
+
+query T
+select * from validate_csv_with_options;
+----
+1;Foo
+2;Bar
+
 # Error cases:
 
 # Copy from table with options
-query error DataFusion error: Invalid Option: Found unsupported option row_group_size with value 55 for JSON format!
+query error DataFusion error: Invalid or Unsupported Configuration: Found unsupported option row_group_size with value 55 for JSON format!
 COPY source_table to 'test_files/scratch/copy/table.json' (row_group_size 55);
 
 # Incomplete statement

From fcdb3d43f3c2fe96caec7401dfd8ce62148986f2 Mon Sep 17 00:00:00 2001
From: Devin D'Angelo
Date: Mon, 28 Aug 2023 08:57:11 -0400
Subject: [PATCH 6/7] update Display for FileTypeWriterOptions

---
 datafusion/common/src/file_options/mod.rs | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/datafusion/common/src/file_options/mod.rs b/datafusion/common/src/file_options/mod.rs
index f02129a05795..6181aaade594 100644
--- a/datafusion/common/src/file_options/mod.rs
+++ b/datafusion/common/src/file_options/mod.rs
@@ -277,11 +277,11 @@ impl FileTypeWriterOptions {
 impl Display for FileTypeWriterOptions {
     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
         let name = match self {
-            FileTypeWriterOptions::Arrow(_) => "Arrow",
-            FileTypeWriterOptions::Avro(_) => "Avro",
-            FileTypeWriterOptions::CSV(_) => "CSV",
-            FileTypeWriterOptions::JSON(_) => "JSON",
-            FileTypeWriterOptions::Parquet(_) => "Parquet",
+            FileTypeWriterOptions::Arrow(_) => "ArrowWriterOptions",
+            FileTypeWriterOptions::Avro(_) => "AvroWriterOptions",
+            FileTypeWriterOptions::CSV(_) => "CsvWriterOptions",
+            FileTypeWriterOptions::JSON(_) => "JsonWriterOptions",
+            FileTypeWriterOptions::Parquet(_) => "ParquetWriterOptions",
         };
         write!(f, "{}", name)
     }

From e2581aaa5a07b41476ec091f6ebfa3a94f28dbed Mon Sep 17 00:00:00 2001
From: Devin D'Angelo
Date: Mon, 28 Aug 2023 17:00:47 -0400
Subject: [PATCH 7/7] add unit tests

---
 datafusion/common/src/file_options/mod.rs | 132 ++++++++++++++++++++++
 1 file changed, 132 insertions(+)

diff --git a/datafusion/common/src/file_options/mod.rs b/datafusion/common/src/file_options/mod.rs
index 6181aaade594..29ee73f80fc6 100644
--- a/datafusion/common/src/file_options/mod.rs
+++ b/datafusion/common/src/file_options/mod.rs
@@ -286,3 +286,135 @@ impl Display for FileTypeWriterOptions {
         write!(f, "{}", name)
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use std::collections::HashMap;
+
+    use parquet::{
+        basic::{Compression, Encoding, ZstdLevel},
+        file::properties::{EnabledStatistics, WriterVersion},
+        schema::types::ColumnPath,
+    };
+
+    use crate::{
+        config::ConfigOptions,
+        file_options::{csv_writer::CsvWriterOptions, json_writer::JsonWriterOptions},
+        parsers::CompressionTypeVariant,
+    };
+
+    use crate::Result;
+
+    use super::{parquet_writer::ParquetWriterOptions, StatementOptions};
+
+    #[test]
+    fn test_writeroptions_parquet_from_statement_options() -> Result<()> {
+        let mut option_map: HashMap<String, String> = HashMap::new();
+        option_map.insert("max_row_group_size".to_owned(), "123".to_owned());
+        option_map.insert("data_pagesize_limit".to_owned(), "123".to_owned());
+        option_map.insert("write_batch_size".to_owned(), "123".to_owned());
+        option_map.insert("writer_version".to_owned(), "2.0".to_owned());
+        option_map.insert("dictionary_page_size_limit".to_owned(), "123".to_owned());
+        option_map.insert("created_by".to_owned(), "df write unit test".to_owned());
+        option_map.insert("column_index_truncate_length".to_owned(), "123".to_owned());
+        option_map.insert("data_page_row_count_limit".to_owned(), "123".to_owned());
+        option_map.insert("bloom_filter_enabled".to_owned(), "true".to_owned());
+        option_map.insert("encoding".to_owned(), "plain".to_owned());
+        option_map.insert("dictionary_enabled".to_owned(), "true".to_owned());
+        option_map.insert("compression".to_owned(), "zstd(4)".to_owned());
+        option_map.insert("statistics_enabled".to_owned(), "page".to_owned());
+        option_map.insert("bloom_filter_fpp".to_owned(), "0.123".to_owned());
+        option_map.insert("bloom_filter_ndv".to_owned(), "123".to_owned());
+
+        let options = StatementOptions::from(&option_map);
+        let config = ConfigOptions::new();
+
+        let parquet_options = ParquetWriterOptions::try_from((&config, &options))?;
+        let properties = parquet_options.writer_options();
+
+        // Verify the expected options propagated down to parquet crate WriterProperties struct
+        assert_eq!(properties.max_row_group_size(), 123);
+        assert_eq!(properties.data_page_size_limit(), 123);
+        assert_eq!(properties.write_batch_size(), 123);
+        assert_eq!(properties.writer_version(), WriterVersion::PARQUET_2_0);
+        assert_eq!(properties.dictionary_page_size_limit(), 123);
+        assert_eq!(properties.created_by(), "df write unit test");
+        assert_eq!(properties.column_index_truncate_length(), Some(123));
+        assert_eq!(properties.data_page_row_count_limit(), 123);
+        properties
+            .bloom_filter_properties(&ColumnPath::from(""))
+            .expect("expected bloom filter enabled");
+        assert_eq!(
+            properties
+                .encoding(&ColumnPath::from(""))
+                .expect("expected default encoding"),
+            Encoding::PLAIN
+        );
+        assert!(properties.dictionary_enabled(&ColumnPath::from("")));
+        assert_eq!(
+            properties.compression(&ColumnPath::from("")),
+            Compression::ZSTD(ZstdLevel::try_new(4_i32)?)
+        );
+        assert_eq!(
+            properties.statistics_enabled(&ColumnPath::from("")),
+            EnabledStatistics::Page
+        );
+        assert_eq!(
+            properties
+                .bloom_filter_properties(&ColumnPath::from(""))
+                .expect("expected bloom properties!")
+                .fpp,
+            0.123
+        );
+        assert_eq!(
+            properties
+                .bloom_filter_properties(&ColumnPath::from(""))
+                .expect("expected bloom properties!")
+                .ndv,
+            123
+        );
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_writeroptions_csv_from_statement_options() -> Result<()> {
+        let mut option_map: HashMap<String, String> = HashMap::new();
+        option_map.insert("header".to_owned(), "true".to_owned());
+        option_map.insert("date_format".to_owned(), "123".to_owned());
+        option_map.insert("datetime_format".to_owned(), "123".to_owned());
+        option_map.insert("timestamp_format".to_owned(), "2.0".to_owned());
+        option_map.insert("time_format".to_owned(), "123".to_owned());
+        option_map.insert("rfc3339".to_owned(), "true".to_owned());
+        option_map.insert("null_value".to_owned(), "123".to_owned());
+        option_map.insert("compression".to_owned(), "gzip".to_owned());
+        option_map.insert("delimeter".to_owned(), ";".to_owned());
+
+        let options = StatementOptions::from(&option_map);
+        let config = ConfigOptions::new();
+
+        let csv_options = CsvWriterOptions::try_from((&config, &options))?;
+        let builder = csv_options.writer_options;
+        let buff = Vec::new();
+        let _properties = builder.build(buff);
+        assert!(csv_options.has_header);
+        assert_eq!(csv_options.compression, CompressionTypeVariant::GZIP);
+        // TODO expand unit test if csv::WriterBuilder allows public read access to properties
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_writeroptions_json_from_statement_options() -> Result<()> {
+        let mut option_map: HashMap<String, String> = HashMap::new();
+        option_map.insert("compression".to_owned(), "gzip".to_owned());
+
+        let options = StatementOptions::from(&option_map);
+        let config = ConfigOptions::new();
+
+        let json_options = JsonWriterOptions::try_from((&config, &options))?;
+        assert_eq!(json_options.compression, CompressionTypeVariant::GZIP);
+
+        Ok(())
+    }
+}