From 8c8e00f6950b2d7f034b4c25513a3f6a4b446a7a Mon Sep 17 00:00:00 2001 From: Devin D'Angelo Date: Sat, 2 Sep 2023 20:07:57 -0400 Subject: [PATCH 1/3] alt syntax for compression implemented --- .../common/src/file_options/parquet_writer.rs | 13 ++++++++++--- datafusion/common/src/file_options/parse_utils.rs | 13 +++++++++++++ datafusion/sqllogictest/test_files/copy.slt | 5 ++++- 3 files changed, 27 insertions(+), 4 deletions(-) diff --git a/datafusion/common/src/file_options/parquet_writer.rs b/datafusion/common/src/file_options/parquet_writer.rs index ea3276b062ac..b8d82d228906 100644 --- a/datafusion/common/src/file_options/parquet_writer.rs +++ b/datafusion/common/src/file_options/parquet_writer.rs @@ -28,7 +28,7 @@ use crate::{ DataFusionError, Result, }; -use super::StatementOptions; +use super::{parse_utils::split_option_and_column_path, StatementOptions}; /// Options for writing parquet files #[derive(Clone, Debug)] @@ -115,6 +115,7 @@ impl TryFrom<(&ConfigOptions, &StatementOptions)> for ParquetWriterOptions { let statement_options = configs_and_statement_options.1; let mut builder = default_builder(configs)?; for (option, value) in &statement_options.options { + let (option, col_path) = split_option_and_column_path(option); builder = match option.to_lowercase().as_str(){ "max_row_group_size" => builder .set_max_row_group_size(value.parse() @@ -146,8 +147,14 @@ impl TryFrom<(&ConfigOptions, &StatementOptions)> for ParquetWriterOptions { "dictionary_enabled" => builder .set_dictionary_enabled(value.parse() .map_err(|_| DataFusionError::Configuration(format!("Unable to parse {value} as bool as required for {option}!")))?), - "compression" => builder - .set_compression(parse_compression_string(value)?), + "compression" => { + println!("Got {value} for {col_path:?}"); + let parsed_compression = parse_compression_string(value)?; + match col_path{ + Some(path) => builder.set_column_compression(path, parsed_compression), + None => builder.set_compression(parsed_compression) + } + }, "statistics_enabled" => builder .set_statistics_enabled(parse_statistics_string(value)?), "max_statistics_size" => builder diff --git a/datafusion/common/src/file_options/parse_utils.rs b/datafusion/common/src/file_options/parse_utils.rs index da8d31436b9e..5e47d1781772 100644 --- a/datafusion/common/src/file_options/parse_utils.rs +++ b/datafusion/common/src/file_options/parse_utils.rs @@ -20,6 +20,7 @@ use parquet::{ basic::{BrotliLevel, GzipLevel, ZstdLevel}, file::properties::{EnabledStatistics, WriterVersion}, + schema::types::ColumnPath, }; use crate::{DataFusionError, Result}; @@ -181,3 +182,15 @@ pub(crate) fn parse_statistics_string(str_setting: &str) -> Result (String, Option) { + match str_setting.replace('\'', "").split_once("::") { + Some((s1, s2)) => { + let col_path = ColumnPath::new(s2.split('.').map(|s| s.to_owned()).collect()); + (s1.to_owned(), Some(col_path)) + } + None => (str_setting.to_owned(), None), + } +} diff --git a/datafusion/sqllogictest/test_files/copy.slt b/datafusion/sqllogictest/test_files/copy.slt index f095552dadf2..cb4e58040bdd 100644 --- a/datafusion/sqllogictest/test_files/copy.slt +++ b/datafusion/sqllogictest/test_files/copy.slt @@ -70,7 +70,10 @@ COPY source_table TO 'test_files/scratch/copy/table_with_options' (format parquet, single_file_output false, -compression 'snappy', +compression snappy, +'compression::col1' 'zstd(5)', +'compression::col2' 'zstd(10)', +'compression::col3.nested' 'zstd(12)', max_row_group_size 12345, data_pagesize_limit 1234, write_batch_size 1234, From dec3137a843afef73013fa1ed6517114df8d8260 Mon Sep 17 00:00:00 2001 From: Devin D'Angelo Date: Sat, 2 Sep 2023 20:45:52 -0400 Subject: [PATCH 2/3] add unit test --- datafusion/common/src/file_options/mod.rs | 120 ++++++++++++++++++ .../common/src/file_options/parquet_writer.rs | 74 ++++++++--- 2 files changed, 174 insertions(+), 20 deletions(-) diff --git a/datafusion/common/src/file_options/mod.rs b/datafusion/common/src/file_options/mod.rs index 29ee73f80fc6..ab792bdca7f4 100644 --- a/datafusion/common/src/file_options/mod.rs +++ b/datafusion/common/src/file_options/mod.rs @@ -377,6 +377,126 @@ mod tests { Ok(()) } + #[test] + fn test_writeroptions_parquet_column_specific() -> Result<()> { + let mut option_map: HashMap = HashMap::new(); + + option_map.insert("bloom_filter_enabled::col1".to_owned(), "true".to_owned()); + option_map.insert( + "bloom_filter_enabled::col2.nested".to_owned(), + "true".to_owned(), + ); + option_map.insert("encoding::col1".to_owned(), "plain".to_owned()); + option_map.insert("encoding::col2.nested".to_owned(), "rle".to_owned()); + option_map.insert("dictionary_enabled::col1".to_owned(), "true".to_owned()); + option_map.insert( + "dictionary_enabled::col2.nested".to_owned(), + "true".to_owned(), + ); + option_map.insert("compression::col1".to_owned(), "zstd(4)".to_owned()); + option_map.insert("compression::col2.nested".to_owned(), "zstd(10)".to_owned()); + option_map.insert("statistics_enabled::col1".to_owned(), "page".to_owned()); + option_map.insert( + "statistics_enabled::col2.nested".to_owned(), + "none".to_owned(), + ); + option_map.insert("bloom_filter_fpp::col1".to_owned(), "0.123".to_owned()); + option_map.insert( + "bloom_filter_fpp::col2.nested".to_owned(), + "0.456".to_owned(), + ); + option_map.insert("bloom_filter_ndv::col1".to_owned(), "123".to_owned()); + option_map.insert("bloom_filter_ndv::col2.nested".to_owned(), "456".to_owned()); + + let options = StatementOptions::from(&option_map); + let config = ConfigOptions::new(); + + let parquet_options = ParquetWriterOptions::try_from((&config, &options))?; + let properties = parquet_options.writer_options(); + + let col1 = ColumnPath::from(vec!["col1".to_owned()]); + let col2_nested = ColumnPath::from(vec!["col2".to_owned(), "nested".to_owned()]); + + // Verify the expected options propagated down to parquet crate WriterProperties struct + + properties + .bloom_filter_properties(&col1) + .expect("expected bloom filter enabled for col1"); + + properties + .bloom_filter_properties(&col2_nested) + .expect("expected bloom filter enabled cor col2_nested"); + + assert_eq!( + properties.encoding(&col1).expect("expected encoding"), + Encoding::PLAIN + ); + + assert_eq!( + properties + .encoding(&col2_nested) + .expect("expected encoding"), + Encoding::RLE + ); + + assert!(properties.dictionary_enabled(&col1)); + assert!(properties.dictionary_enabled(&col2_nested)); + + assert_eq!( + properties.compression(&col1), + Compression::ZSTD(ZstdLevel::try_new(4_i32)?) + ); + + assert_eq!( + properties.compression(&col2_nested), + Compression::ZSTD(ZstdLevel::try_new(10_i32)?) + ); + + assert_eq!( + properties.statistics_enabled(&col1), + EnabledStatistics::Page + ); + + assert_eq!( + properties.statistics_enabled(&col2_nested), + EnabledStatistics::None + ); + + assert_eq!( + properties + .bloom_filter_properties(&col1) + .expect("expected bloom properties!") + .fpp, + 0.123 + ); + + assert_eq!( + properties + .bloom_filter_properties(&col2_nested) + .expect("expected bloom properties!") + .fpp, + 0.456 + ); + + assert_eq!( + properties + .bloom_filter_properties(&col1) + .expect("expected bloom properties!") + .ndv, + 123 + ); + + assert_eq!( + properties + .bloom_filter_properties(&col2_nested) + .expect("expected bloom properties!") + .ndv, + 456 + ); + + Ok(()) + } + #[test] fn test_writeroptions_csv_from_statement_options() -> Result<()> { let mut option_map: HashMap = HashMap::new(); diff --git a/datafusion/common/src/file_options/parquet_writer.rs b/datafusion/common/src/file_options/parquet_writer.rs index b8d82d228906..79443e6d28c9 100644 --- a/datafusion/common/src/file_options/parquet_writer.rs +++ b/datafusion/common/src/file_options/parquet_writer.rs @@ -139,33 +139,67 @@ impl TryFrom<(&ConfigOptions, &StatementOptions)> for ParquetWriterOptions { "data_page_row_count_limit" => builder .set_data_page_row_count_limit(value.parse() .map_err(|_| DataFusionError::Configuration(format!("Unable to parse {value} as usize as required for {option}!")))?), - "bloom_filter_enabled" => builder - .set_bloom_filter_enabled(value.parse() - .map_err(|_| DataFusionError::Configuration(format!("Unable to parse {value} as bool as required for {option}!")))?), - "encoding" => builder - .set_encoding(parse_encoding_string(value)?), - "dictionary_enabled" => builder - .set_dictionary_enabled(value.parse() - .map_err(|_| DataFusionError::Configuration(format!("Unable to parse {value} as bool as required for {option}!")))?), + "bloom_filter_enabled" => { + let parsed_value = value.parse() + .map_err(|_| DataFusionError::Configuration(format!("Unable to parse {value} as bool as required for {option}!")))?; + match col_path{ + Some(path) => builder.set_column_bloom_filter_enabled(path, parsed_value), + None => builder.set_bloom_filter_enabled(parsed_value) + } + }, + "encoding" => { + let parsed_encoding = parse_encoding_string(value)?; + match col_path{ + Some(path) => builder.set_column_encoding(path, parsed_encoding), + None => builder.set_encoding(parsed_encoding) + } + }, + "dictionary_enabled" => { + let parsed_value = value.parse() + .map_err(|_| DataFusionError::Configuration(format!("Unable to parse {value} as bool as required for {option}!")))?; + match col_path{ + Some(path) => builder.set_column_dictionary_enabled(path, parsed_value), + None => builder.set_dictionary_enabled(parsed_value) + } + }, "compression" => { - println!("Got {value} for {col_path:?}"); let parsed_compression = parse_compression_string(value)?; match col_path{ Some(path) => builder.set_column_compression(path, parsed_compression), None => builder.set_compression(parsed_compression) } }, - "statistics_enabled" => builder - .set_statistics_enabled(parse_statistics_string(value)?), - "max_statistics_size" => builder - .set_max_statistics_size(value.parse() - .map_err(|_| DataFusionError::Configuration(format!("Unable to parse {value} as usize as required for {option}!")))?), - "bloom_filter_fpp" => builder - .set_bloom_filter_fpp(value.parse() - .map_err(|_| DataFusionError::Configuration(format!("Unable to parse {value} as f64 as required for {option}!")))?), - "bloom_filter_ndv" => builder - .set_bloom_filter_ndv(value.parse() - .map_err(|_| DataFusionError::Configuration(format!("Unable to parse {value} as u64 as required for {option}!")))?), + "statistics_enabled" => { + let parsed_value = parse_statistics_string(value)?; + match col_path{ + Some(path) => builder.set_column_statistics_enabled(path, parsed_value), + None => builder.set_statistics_enabled(parsed_value) + } + }, + "max_statistics_size" => { + let parsed_value = value.parse() + .map_err(|_| DataFusionError::Configuration(format!("Unable to parse {value} as usize as required for {option}!")))?; + match col_path{ + Some(path) => builder.set_column_max_statistics_size(path, parsed_value), + None => builder.set_max_statistics_size(parsed_value) + } + }, + "bloom_filter_fpp" => { + let parsed_value = value.parse() + .map_err(|_| DataFusionError::Configuration(format!("Unable to parse {value} as f64 as required for {option}!")))?; + match col_path{ + Some(path) => builder.set_column_bloom_filter_fpp(path, parsed_value), + None => builder.set_bloom_filter_fpp(parsed_value) + } + }, + "bloom_filter_ndv" => { + let parsed_value = value.parse() + .map_err(|_| DataFusionError::Configuration(format!("Unable to parse {value} as u64 as required for {option}!")))?; + match col_path{ + Some(path) => builder.set_column_bloom_filter_ndv(path, parsed_value), + None => builder.set_bloom_filter_ndv(parsed_value) + } + }, _ => return Err(DataFusionError::Configuration(format!("Found unsupported option {option} with value {value} for Parquet format!"))) } } From 6f3084b08d238901a193e11cb5604d665250ff5d Mon Sep 17 00:00:00 2001 From: Devin D'Angelo Date: Wed, 6 Sep 2023 18:16:37 -0400 Subject: [PATCH 3/3] exxpand copy.slt test --- datafusion/sqllogictest/test_files/copy.slt | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/datafusion/sqllogictest/test_files/copy.slt b/datafusion/sqllogictest/test_files/copy.slt index cb4e58040bdd..3ade43b4e879 100644 --- a/datafusion/sqllogictest/test_files/copy.slt +++ b/datafusion/sqllogictest/test_files/copy.slt @@ -72,8 +72,7 @@ TO 'test_files/scratch/copy/table_with_options' single_file_output false, compression snappy, 'compression::col1' 'zstd(5)', -'compression::col2' 'zstd(10)', -'compression::col3.nested' 'zstd(12)', +'compression::col2' snappy, max_row_group_size 12345, data_pagesize_limit 1234, write_batch_size 1234, @@ -83,9 +82,15 @@ created_by 'DF copy.slt', column_index_truncate_length 123, data_page_row_count_limit 1234, bloom_filter_enabled true, +'bloom_filter_enabled::col1' false, +'bloom_filter_fpp::col2' 0.456, +'bloom_filter_ndv::col2' 456, encoding plain, +'encoding::col1' DELTA_BINARY_PACKED, +'dictionary_enabled::col2' true, dictionary_enabled false, statistics_enabled page, +'statistics_enabled::col2' none, max_statistics_size 123, bloom_filter_fpp 0.001, bloom_filter_ndv 100