Commit

Split parquet bloom filter config and enable bloom filter on read by default (#10306)

* Split bloom filter config

* Fix proto

* Set bloom_filter on write as false

* Fix tests

* fmt md

* Fix test

* Fix slt tests

* clippy

* Update datafusion/common/src/config.rs

Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>

* Update datafusion/common/src/config.rs

Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>

* Remove enabled suffix

* Regen proto and fix tests

* Update configs.md

* Improve bloom filter test

---------

Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
lewiszlw and alamb committed May 2, 2024
1 parent 2c0afce commit 6d77748
Showing 16 changed files with 124 additions and 46 deletions.
8 changes: 6 additions & 2 deletions datafusion/common/src/config.rs
@@ -400,8 +400,11 @@ config_namespace! {
/// default parquet writer setting
pub encoding: Option<String>, default = None

/// Sets if bloom filter is enabled for any column
pub bloom_filter_enabled: bool, default = false
/// Use any available bloom filters when reading parquet files
pub bloom_filter_on_read: bool, default = true

/// Write bloom filters for all columns when creating parquet files
pub bloom_filter_on_write: bool, default = false

/// Sets bloom filter false positive probability. If NULL, uses
/// default parquet writer setting
@@ -1662,6 +1665,7 @@ config_namespace! {
}

#[derive(Debug, Clone, PartialEq)]
#[allow(clippy::large_enum_variant)]
pub enum FormatOptions {
CSV(CsvOptions),
JSON(JsonOptions),
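
The read and write paths are now controlled independently: scans consult existing bloom filters by default, while writers only produce them on request. A minimal sketch of toggling the two options programmatically, assuming the post-PR field names on `ConfigOptions` (the surrounding function is illustrative):

use datafusion_common::config::ConfigOptions;

fn bloom_filter_options() {
    let mut options = ConfigOptions::new();
    // Post-PR defaults: read-side on, write-side off.
    assert!(options.execution.parquet.bloom_filter_on_read);
    assert!(!options.execution.parquet.bloom_filter_on_write);
    // Opt in to writing bloom filters for newly created parquet files.
    options.execution.parquet.bloom_filter_on_write = true;
}
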
2 changes: 1 addition & 1 deletion datafusion/common/src/file_options/mod.rs
@@ -67,7 +67,7 @@ mod tests {
"format.data_page_row_count_limit".to_owned(),
"123".to_owned(),
);
option_map.insert("format.bloom_filter_enabled".to_owned(), "true".to_owned());
option_map.insert("format.bloom_filter_on_write".to_owned(), "true".to_owned());
option_map.insert("format.encoding".to_owned(), "plain".to_owned());
option_map.insert("format.dictionary_enabled".to_owned(), "true".to_owned());
option_map.insert("format.compression".to_owned(), "zstd(4)".to_owned());
5 changes: 3 additions & 2 deletions datafusion/common/src/file_options/parquet_writer.rs
@@ -62,7 +62,7 @@ impl TryFrom<&TableParquetOptions> for ParquetWriterOptions {
created_by,
column_index_truncate_length,
data_page_row_count_limit,
bloom_filter_enabled,
bloom_filter_on_write,
encoding,
dictionary_enabled,
compression,
@@ -80,6 +80,7 @@ impl TryFrom<&TableParquetOptions> for ParquetWriterOptions {
allow_single_file_parallelism: _,
maximum_parallel_row_group_writers: _,
maximum_buffered_record_batches_per_stream: _,
bloom_filter_on_read: _,
} = &parquet_options.global;

let key_value_metadata = if !parquet_options.key_value_metadata.is_empty() {
@@ -104,7 +105,7 @@ impl TryFrom<&TableParquetOptions> for ParquetWriterOptions {
.set_created_by(created_by.clone())
.set_column_index_truncate_length(*column_index_truncate_length)
.set_data_page_row_count_limit(*data_page_row_count_limit)
.set_bloom_filter_enabled(*bloom_filter_enabled)
.set_bloom_filter_enabled(*bloom_filter_on_write)
.set_key_value_metadata(key_value_metadata);

if let Some(encoding) = &encoding {
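
On the write path the renamed flag ultimately feeds the parquet crate's `WriterProperties`, via the `set_bloom_filter_enabled` call above; the read-side flag is deliberately ignored here (`bloom_filter_on_read: _`). A hedged sketch of just that mapping (the helper function is illustrative):

use parquet::file::properties::WriterProperties;

fn writer_properties(bloom_filter_on_write: bool) -> WriterProperties {
    // Only the write-side option reaches the parquet writer.
    WriterProperties::builder()
        .set_bloom_filter_enabled(bloom_filter_on_write)
        .build()
}
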
4 changes: 2 additions & 2 deletions datafusion/core/src/datasource/listing/table.rs
@@ -1632,7 +1632,7 @@ mod tests {
"50".into(),
);
config_map.insert(
"datafusion.execution.parquet.bloom_filter_enabled".into(),
"datafusion.execution.parquet.bloom_filter_on_write".into(),
"true".into(),
);
config_map.insert(
@@ -1710,7 +1710,7 @@ mod tests {
"delta_binary_packed".into(),
);
config_map.insert(
"datafusion.execution.parquet.bloom_filter_enabled".into(),
"datafusion.execution.parquet.bloom_filter_on_write".into(),
"true".into(),
);
config_map.insert(
22 changes: 16 additions & 6 deletions datafusion/core/src/datasource/physical_plan/parquet/mod.rs
@@ -243,14 +243,24 @@ impl ParquetExec {
}

/// If enabled, the reader will use bloom filters to skip row groups
pub fn with_enable_bloom_filter(mut self, enable_bloom_filter: bool) -> Self {
self.table_parquet_options.global.bloom_filter_enabled = enable_bloom_filter;
pub fn with_bloom_filter_on_read(mut self, bloom_filter_on_read: bool) -> Self {
self.table_parquet_options.global.bloom_filter_on_read = bloom_filter_on_read;
self
}

/// Return the value described in [`Self::with_enable_bloom_filter`]
fn enable_bloom_filter(&self) -> bool {
self.table_parquet_options.global.bloom_filter_enabled
/// If enabled, the writer will write bloom filters when creating parquet files
pub fn with_bloom_filter_on_write(
mut self,
enable_bloom_filter_on_write: bool,
) -> Self {
self.table_parquet_options.global.bloom_filter_on_write =
enable_bloom_filter_on_write;
self
}

/// Return the value described in [`Self::with_bloom_filter_on_read`]
fn bloom_filter_on_read(&self) -> bool {
self.table_parquet_options.global.bloom_filter_on_read
}

fn output_partitioning_helper(file_config: &FileScanConfig) -> Partitioning {
@@ -407,7 +417,7 @@ impl ExecutionPlan for ParquetExec {
pushdown_filters: self.pushdown_filters(),
reorder_filters: self.reorder_filters(),
enable_page_index: self.enable_page_index(),
enable_bloom_filter: self.enable_bloom_filter(),
enable_bloom_filter: self.bloom_filter_on_read(),
};

let stream =
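
Both new builder methods follow the existing `ParquetExec` builder conventions; a sketch under the assumption that a `ParquetExec` has already been constructed (construction is elided):

use datafusion::datasource::physical_plan::ParquetExec;

fn configure_bloom_filters(exec: ParquetExec) -> ParquetExec {
    exec
        // Skip bloom-filter pruning for this scan.
        .with_bloom_filter_on_read(false)
        // Carry the write-side flag through the exec's table options.
        .with_bloom_filter_on_write(true)
}
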
4 changes: 2 additions & 2 deletions datafusion/execution/src/config.rs
@@ -352,12 +352,12 @@ impl SessionConfig {

/// Returns true if bloom filter should be used to skip parquet row groups
pub fn parquet_bloom_filter_pruning(&self) -> bool {
self.options.execution.parquet.bloom_filter_enabled
self.options.execution.parquet.bloom_filter_on_read
}

/// Enables or disables the use of bloom filter for parquet readers to skip row groups
pub fn with_parquet_bloom_filter_pruning(mut self, enabled: bool) -> Self {
self.options.execution.parquet.bloom_filter_enabled = enabled;
self.options.execution.parquet.bloom_filter_on_read = enabled;
self
}

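
The `SessionConfig` helpers keep their old names; only the backing field changes to the read-side option. A small sketch using the API shown above:

use datafusion_execution::config::SessionConfig;

fn pruning_disabled() -> SessionConfig {
    // `with_parquet_bloom_filter_pruning` now sets `bloom_filter_on_read`.
    let config = SessionConfig::new().with_parquet_bloom_filter_pruning(false);
    assert!(!config.parquet_bloom_filter_pruning());
    config
}
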
4 changes: 3 additions & 1 deletion datafusion/proto/proto/datafusion.proto
@@ -1215,10 +1215,12 @@ message ParquetOptions {
uint64 data_pagesize_limit = 7; // default = 1024 * 1024
uint64 write_batch_size = 8; // default = 1024
string writer_version = 9; // default = "1.0"
bool bloom_filter_enabled = 20; // default = false
// bool bloom_filter_enabled = 20; // default = false
bool allow_single_file_parallelism = 23; // default = true
uint64 maximum_parallel_row_group_writers = 24; // default = 1
uint64 maximum_buffered_record_batches_per_stream = 25; // default = 2
bool bloom_filter_on_read = 26; // default = true
bool bloom_filter_on_write = 27; // default = false

oneof metadata_size_hint_opt {
uint64 metadata_size_hint = 4;
54 changes: 36 additions & 18 deletions datafusion/proto/src/generated/pbjson.rs

Some generated files are not rendered by default.

11 changes: 8 additions & 3 deletions datafusion/proto/src/generated/prost.rs

Some generated files are not rendered by default.

3 changes: 2 additions & 1 deletion datafusion/proto/src/physical_plan/from_proto.rs
@@ -880,7 +880,8 @@ impl TryFrom<&protobuf::ParquetOptions> for ParquetOptions {
protobuf::parquet_options::EncodingOpt::Encoding(v) => Some(v),
})
.unwrap_or(None),
bloom_filter_enabled: value.bloom_filter_enabled,
bloom_filter_on_read: value.bloom_filter_on_read,
bloom_filter_on_write: value.bloom_filter_on_write,
bloom_filter_fpp: value.clone()
.bloom_filter_fpp_opt
.map(|opt| match opt {
3 changes: 2 additions & 1 deletion datafusion/proto/src/physical_plan/to_proto.rs
@@ -907,7 +907,8 @@ impl TryFrom<&ParquetOptions> for protobuf::ParquetOptions {
column_index_truncate_length_opt: value.column_index_truncate_length.map(|v| protobuf::parquet_options::ColumnIndexTruncateLengthOpt::ColumnIndexTruncateLength(v as u64)),
data_page_row_count_limit: value.data_page_row_count_limit as u64,
encoding_opt: value.encoding.clone().map(protobuf::parquet_options::EncodingOpt::Encoding),
bloom_filter_enabled: value.bloom_filter_enabled,
bloom_filter_on_read: value.bloom_filter_on_read,
bloom_filter_on_write: value.bloom_filter_on_write,
bloom_filter_fpp_opt: value.bloom_filter_fpp.map(protobuf::parquet_options::BloomFilterFppOpt::BloomFilterFpp),
bloom_filter_ndv_opt: value.bloom_filter_ndv.map(protobuf::parquet_options::BloomFilterNdvOpt::BloomFilterNdv),
allow_single_file_parallelism: value.allow_single_file_parallelism,
2 changes: 1 addition & 1 deletion datafusion/proto/tests/cases/roundtrip_logical_plan.rs
@@ -344,7 +344,7 @@ async fn roundtrip_logical_plan_copy_to_writer_options() -> Result<()> {
TableOptions::default_from_session_config(ctx.state().config_options());
let mut parquet_format = table_options.parquet;

parquet_format.global.bloom_filter_enabled = true;
parquet_format.global.bloom_filter_on_read = true;
parquet_format.global.created_by = "DataFusion Test".to_string();
parquet_format.global.writer_version = "PARQUET_2_0".to_string();
parquet_format.global.write_batch_size = 111;
2 changes: 1 addition & 1 deletion datafusion/sqllogictest/test_files/copy.slt
@@ -271,7 +271,7 @@ OPTIONS (
'format.created_by' 'DF copy.slt',
'format.column_index_truncate_length' 123,
'format.data_page_row_count_limit' 1234,
'format.bloom_filter_enabled' true,
'format.bloom_filter_on_read' true,
'format.bloom_filter_enabled::col1' false,
'format.bloom_filter_fpp::col2' 0.456,
'format.bloom_filter_ndv::col2' 456,
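
The same string keys are usable from Rust through `SessionContext::sql`; a hedged sketch (output path and values are illustrative, and column-level overrides keep the `bloom_filter_enabled::<column>` spelling seen above):

use datafusion::error::Result;
use datafusion::prelude::SessionContext;

async fn copy_with_bloom_filters(ctx: &SessionContext) -> Result<()> {
    // Write-side bloom filters are opt-in under the new default.
    ctx.sql(
        "COPY (VALUES (1, 'a')) TO 'output.parquet' STORED AS PARQUET \
         OPTIONS ('format.bloom_filter_on_write' 'true')",
    )
    .await?
    .collect()
    .await?;
    Ok(())
}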