Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
120 changes: 120 additions & 0 deletions datafusion/common/src/file_options/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,126 @@ mod tests {
Ok(())
}

#[test]
fn test_writeroptions_parquet_column_specific() -> Result<()> {
    // Verify that column-specific writer options, spelled as
    // `<option>::<column path>` in the statement option map, propagate down
    // to the parquet crate's WriterProperties for both a top-level column
    // (`col1`) and a nested one (`col2.nested`).
    let mut option_map: HashMap<String, String> = HashMap::new();

    option_map.insert("bloom_filter_enabled::col1".to_owned(), "true".to_owned());
    option_map.insert(
        "bloom_filter_enabled::col2.nested".to_owned(),
        "true".to_owned(),
    );
    option_map.insert("encoding::col1".to_owned(), "plain".to_owned());
    option_map.insert("encoding::col2.nested".to_owned(), "rle".to_owned());
    option_map.insert("dictionary_enabled::col1".to_owned(), "true".to_owned());
    option_map.insert(
        "dictionary_enabled::col2.nested".to_owned(),
        "true".to_owned(),
    );
    option_map.insert("compression::col1".to_owned(), "zstd(4)".to_owned());
    option_map.insert("compression::col2.nested".to_owned(), "zstd(10)".to_owned());
    option_map.insert("statistics_enabled::col1".to_owned(), "page".to_owned());
    option_map.insert(
        "statistics_enabled::col2.nested".to_owned(),
        "none".to_owned(),
    );
    option_map.insert("bloom_filter_fpp::col1".to_owned(), "0.123".to_owned());
    option_map.insert(
        "bloom_filter_fpp::col2.nested".to_owned(),
        "0.456".to_owned(),
    );
    option_map.insert("bloom_filter_ndv::col1".to_owned(), "123".to_owned());
    option_map.insert("bloom_filter_ndv::col2.nested".to_owned(), "456".to_owned());

    let options = StatementOptions::from(&option_map);
    let config = ConfigOptions::new();

    let parquet_options = ParquetWriterOptions::try_from((&config, &options))?;
    let properties = parquet_options.writer_options();

    let col1 = ColumnPath::from(vec!["col1".to_owned()]);
    let col2_nested = ColumnPath::from(vec!["col2".to_owned(), "nested".to_owned()]);

    // Verify the expected options propagated down to parquet crate
    // WriterProperties struct.

    // Bloom filter properties: fetch once per column, then check that the
    // filter is enabled and that fpp/ndv match what was configured.
    let col1_bloom = properties
        .bloom_filter_properties(&col1)
        .expect("expected bloom filter enabled for col1");
    assert_eq!(col1_bloom.fpp, 0.123);
    assert_eq!(col1_bloom.ndv, 123);

    let col2_bloom = properties
        .bloom_filter_properties(&col2_nested)
        .expect("expected bloom filter enabled for col2_nested");
    assert_eq!(col2_bloom.fpp, 0.456);
    assert_eq!(col2_bloom.ndv, 456);

    assert_eq!(
        properties.encoding(&col1).expect("expected encoding"),
        Encoding::PLAIN
    );
    assert_eq!(
        properties
            .encoding(&col2_nested)
            .expect("expected encoding"),
        Encoding::RLE
    );

    assert!(properties.dictionary_enabled(&col1));
    assert!(properties.dictionary_enabled(&col2_nested));

    assert_eq!(
        properties.compression(&col1),
        Compression::ZSTD(ZstdLevel::try_new(4_i32)?)
    );
    assert_eq!(
        properties.compression(&col2_nested),
        Compression::ZSTD(ZstdLevel::try_new(10_i32)?)
    );

    assert_eq!(
        properties.statistics_enabled(&col1),
        EnabledStatistics::Page
    );
    assert_eq!(
        properties.statistics_enabled(&col2_nested),
        EnabledStatistics::None
    );

    Ok(())
}

#[test]
fn test_writeroptions_csv_from_statement_options() -> Result<()> {
let mut option_map: HashMap<String, String> = HashMap::new();
Expand Down
85 changes: 63 additions & 22 deletions datafusion/common/src/file_options/parquet_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use crate::{
DataFusionError, Result,
};

use super::StatementOptions;
use super::{parse_utils::split_option_and_column_path, StatementOptions};

/// Options for writing parquet files
#[derive(Clone, Debug)]
Expand Down Expand Up @@ -115,6 +115,7 @@ impl TryFrom<(&ConfigOptions, &StatementOptions)> for ParquetWriterOptions {
let statement_options = configs_and_statement_options.1;
let mut builder = default_builder(configs)?;
for (option, value) in &statement_options.options {
let (option, col_path) = split_option_and_column_path(option);
builder = match option.to_lowercase().as_str(){
"max_row_group_size" => builder
.set_max_row_group_size(value.parse()
Expand All @@ -138,27 +139,67 @@ impl TryFrom<(&ConfigOptions, &StatementOptions)> for ParquetWriterOptions {
"data_page_row_count_limit" => builder
.set_data_page_row_count_limit(value.parse()
.map_err(|_| DataFusionError::Configuration(format!("Unable to parse {value} as usize as required for {option}!")))?),
"bloom_filter_enabled" => builder
.set_bloom_filter_enabled(value.parse()
.map_err(|_| DataFusionError::Configuration(format!("Unable to parse {value} as bool as required for {option}!")))?),
"encoding" => builder
.set_encoding(parse_encoding_string(value)?),
"dictionary_enabled" => builder
.set_dictionary_enabled(value.parse()
.map_err(|_| DataFusionError::Configuration(format!("Unable to parse {value} as bool as required for {option}!")))?),
"compression" => builder
.set_compression(parse_compression_string(value)?),
"statistics_enabled" => builder
.set_statistics_enabled(parse_statistics_string(value)?),
"max_statistics_size" => builder
.set_max_statistics_size(value.parse()
.map_err(|_| DataFusionError::Configuration(format!("Unable to parse {value} as usize as required for {option}!")))?),
"bloom_filter_fpp" => builder
.set_bloom_filter_fpp(value.parse()
.map_err(|_| DataFusionError::Configuration(format!("Unable to parse {value} as f64 as required for {option}!")))?),
"bloom_filter_ndv" => builder
.set_bloom_filter_ndv(value.parse()
.map_err(|_| DataFusionError::Configuration(format!("Unable to parse {value} as u64 as required for {option}!")))?),
"bloom_filter_enabled" => {
let parsed_value = value.parse()
.map_err(|_| DataFusionError::Configuration(format!("Unable to parse {value} as bool as required for {option}!")))?;
match col_path{
Some(path) => builder.set_column_bloom_filter_enabled(path, parsed_value),
None => builder.set_bloom_filter_enabled(parsed_value)
}
},
"encoding" => {
let parsed_encoding = parse_encoding_string(value)?;
match col_path{
Some(path) => builder.set_column_encoding(path, parsed_encoding),
None => builder.set_encoding(parsed_encoding)
}
},
"dictionary_enabled" => {
let parsed_value = value.parse()
.map_err(|_| DataFusionError::Configuration(format!("Unable to parse {value} as bool as required for {option}!")))?;
match col_path{
Some(path) => builder.set_column_dictionary_enabled(path, parsed_value),
None => builder.set_dictionary_enabled(parsed_value)
}
},
"compression" => {
let parsed_compression = parse_compression_string(value)?;
match col_path{
Some(path) => builder.set_column_compression(path, parsed_compression),
None => builder.set_compression(parsed_compression)
}
},
"statistics_enabled" => {
let parsed_value = parse_statistics_string(value)?;
match col_path{
Some(path) => builder.set_column_statistics_enabled(path, parsed_value),
None => builder.set_statistics_enabled(parsed_value)
}
},
"max_statistics_size" => {
let parsed_value = value.parse()
.map_err(|_| DataFusionError::Configuration(format!("Unable to parse {value} as usize as required for {option}!")))?;
match col_path{
Some(path) => builder.set_column_max_statistics_size(path, parsed_value),
None => builder.set_max_statistics_size(parsed_value)
}
},
"bloom_filter_fpp" => {
let parsed_value = value.parse()
.map_err(|_| DataFusionError::Configuration(format!("Unable to parse {value} as f64 as required for {option}!")))?;
match col_path{
Some(path) => builder.set_column_bloom_filter_fpp(path, parsed_value),
None => builder.set_bloom_filter_fpp(parsed_value)
}
},
"bloom_filter_ndv" => {
let parsed_value = value.parse()
.map_err(|_| DataFusionError::Configuration(format!("Unable to parse {value} as u64 as required for {option}!")))?;
match col_path{
Some(path) => builder.set_column_bloom_filter_ndv(path, parsed_value),
None => builder.set_bloom_filter_ndv(parsed_value)
}
},
_ => return Err(DataFusionError::Configuration(format!("Found unsupported option {option} with value {value} for Parquet format!")))
}
}
Expand Down
13 changes: 13 additions & 0 deletions datafusion/common/src/file_options/parse_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
use parquet::{
basic::{BrotliLevel, GzipLevel, ZstdLevel},
file::properties::{EnabledStatistics, WriterVersion},
schema::types::ColumnPath,
};

use crate::{DataFusionError, Result};
Expand Down Expand Up @@ -181,3 +182,15 @@ pub(crate) fn parse_statistics_string(str_setting: &str) -> Result<EnabledStatis
))),
}
}

/// Splits a statement option key into the option name and an optional
/// parquet [`ColumnPath`].
///
/// A key of the form `option::a.b.c` (single quotes stripped first) yields
/// `("option", Some(ColumnPath from ["a", "b", "c"]))`. A key without the
/// `::` separator is returned unchanged with no column path.
pub(crate) fn split_option_and_column_path(
    str_setting: &str,
) -> (String, Option<ColumnPath>) {
    let unquoted = str_setting.replace('\'', "");
    if let Some((option, path)) = unquoted.split_once("::") {
        let parts: Vec<String> = path.split('.').map(String::from).collect();
        (option.to_owned(), Some(ColumnPath::new(parts)))
    } else {
        // No column qualifier: hand back the original key untouched.
        (str_setting.to_owned(), None)
    }
}
10 changes: 9 additions & 1 deletion datafusion/sqllogictest/test_files/copy.slt
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,9 @@ COPY source_table
TO 'test_files/scratch/copy/table_with_options'
(format parquet,
single_file_output false,
compression 'snappy',
compression snappy,
'compression::col1' 'zstd(5)',
'compression::col2' snappy,
max_row_group_size 12345,
data_pagesize_limit 1234,
write_batch_size 1234,
Expand All @@ -80,9 +82,15 @@ created_by 'DF copy.slt',
column_index_truncate_length 123,
data_page_row_count_limit 1234,
bloom_filter_enabled true,
'bloom_filter_enabled::col1' false,
'bloom_filter_fpp::col2' 0.456,
'bloom_filter_ndv::col2' 456,
encoding plain,
'encoding::col1' DELTA_BINARY_PACKED,
'dictionary_enabled::col2' true,
dictionary_enabled false,
statistics_enabled page,
'statistics_enabled::col2' none,
max_statistics_size 123,
bloom_filter_fpp 0.001,
bloom_filter_ndv 100
Expand Down