diff --git a/Cargo.lock b/Cargo.lock index 5186c6d6d598..eebb802394b9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -240,9 +240,9 @@ checksum = "7c02d123df017efcdfbd739ef81735b36c5ba83ec3c59c80a9d7ecc718f92e50" [[package]] name = "arrow" -version = "56.0.0" +version = "56.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fd798aea3553913a5986813e9c6ad31a2d2b04e931fe8ea4a37155eb541cebb5" +checksum = "c26b57282a08ae92f727497805122fec964c6245cfa0e13f0e75452eaf3bc41f" dependencies = [ "arrow-arith", "arrow-array", @@ -264,9 +264,9 @@ dependencies = [ [[package]] name = "arrow-arith" -version = "56.0.0" +version = "56.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "508dafb53e5804a238cab7fd97a59ddcbfab20cc4d9814b1ab5465b9fa147f2e" +checksum = "cebf38ca279120ff522f4954b81a39527425b6e9f615e6b72842f4de1ffe02b8" dependencies = [ "arrow-array", "arrow-buffer", @@ -278,9 +278,9 @@ dependencies = [ [[package]] name = "arrow-array" -version = "56.0.0" +version = "56.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e2730bc045d62bb2e53ef8395b7d4242f5c8102f41ceac15e8395b9ac3d08461" +checksum = "744109142cdf8e7b02795e240e20756c2a782ac9180d4992802954a8f871c0de" dependencies = [ "ahash 0.8.12", "arrow-buffer", @@ -295,9 +295,9 @@ dependencies = [ [[package]] name = "arrow-buffer" -version = "56.0.0" +version = "56.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "54295b93beb702ee9a6f6fbced08ad7f4d76ec1c297952d4b83cf68755421d1d" +checksum = "601bb103c4c374bcd1f62c66bcea67b42a2ee91a690486c37d4c180236f11ccc" dependencies = [ "bytes", "half", @@ -306,9 +306,9 @@ dependencies = [ [[package]] name = "arrow-cast" -version = "56.0.0" +version = "56.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "67e8bcb7dc971d779a7280593a1bf0c2743533b8028909073e804552e85e75b5" +checksum = "eed61d9d73eda8df9e3014843def37af3050b5080a9acbe108f045a316d5a0be" dependencies = [ "arrow-array", "arrow-buffer", @@ -327,9 +327,9 @@ dependencies = [ [[package]] name = "arrow-csv" -version = "56.0.0" +version = "56.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "673fd2b5fb57a1754fdbfac425efd7cf54c947ac9950c1cce86b14e248f1c458" +checksum = "fa95b96ce0c06b4d33ac958370db8c0d31e88e54f9d6e08b0353d18374d9f991" dependencies = [ "arrow-array", "arrow-cast", @@ -342,9 +342,9 @@ dependencies = [ [[package]] name = "arrow-data" -version = "56.0.0" +version = "56.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "97c22fe3da840039c69e9f61f81e78092ea36d57037b4900151f063615a2f6b4" +checksum = "43407f2c6ba2367f64d85d4603d6fb9c4b92ed79d2ffd21021b37efa96523e12" dependencies = [ "arrow-buffer", "arrow-schema", @@ -354,9 +354,9 @@ dependencies = [ [[package]] name = "arrow-flight" -version = "56.0.0" +version = "56.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6808d235786b721e49e228c44dd94242f2e8b46b7e95b233b0733c46e758bfee" +checksum = "d7c66c5e4a7aedc2bfebffeabc2116d76adb22e08d230b968b995da97f8b11ca" dependencies = [ "arrow-arith", "arrow-array", @@ -381,14 +381,15 @@ dependencies = [ [[package]] name = "arrow-ipc" -version = "56.0.0" +version = "56.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "778de14c5a69aedb27359e3dd06dd5f9c481d5f6ee9fbae912dba332fd64636b" +checksum = "e4b0487c4d2ad121cbc42c4db204f1509f8618e589bc77e635e9c40b502e3b90" dependencies = [ 
"arrow-array", "arrow-buffer", "arrow-data", "arrow-schema", + "arrow-select", "flatbuffers", "lz4_flex", "zstd", @@ -396,9 +397,9 @@ dependencies = [ [[package]] name = "arrow-json" -version = "56.0.0" +version = "56.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3860db334fe7b19fcf81f6b56f8d9d95053f3839ffe443d56b5436f7a29a1794" +checksum = "26d747573390905905a2dc4c5a61a96163fe2750457f90a04ee2a88680758c79" dependencies = [ "arrow-array", "arrow-buffer", @@ -418,9 +419,9 @@ dependencies = [ [[package]] name = "arrow-ord" -version = "56.0.0" +version = "56.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "425fa0b42a39d3ff55160832e7c25553e7f012c3f187def3d70313e7a29ba5d9" +checksum = "c142a147dceb59d057bad82400f1693847c80dca870d008bf7b91caf902810ae" dependencies = [ "arrow-array", "arrow-buffer", @@ -431,9 +432,9 @@ dependencies = [ [[package]] name = "arrow-pyarrow" -version = "56.0.0" +version = "56.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d944d8ae9b77230124e6570865b570416c33a5809f32c4136c679bbe774e45c9" +checksum = "5b9038de599df1b81f63b42220e2b6cd6fd4f09af333858cd320db9bef5ac757" dependencies = [ "arrow-array", "arrow-data", @@ -443,9 +444,9 @@ dependencies = [ [[package]] name = "arrow-row" -version = "56.0.0" +version = "56.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "df9c9423c9e71abd1b08a7f788fcd203ba2698ac8e72a1f236f1faa1a06a7414" +checksum = "dac6620667fccdab4204689ca173bd84a15de6bb6b756c3a8764d4d7d0c2fc04" dependencies = [ "arrow-array", "arrow-buffer", @@ -456,9 +457,9 @@ dependencies = [ [[package]] name = "arrow-schema" -version = "56.0.0" +version = "56.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "85fa1babc4a45fdc64a92175ef51ff00eba5ebbc0007962fecf8022ac1c6ce28" +checksum = "dfa93af9ff2bb80de539e6eb2c1c8764abd0f4b73ffb0d7c82bf1f9868785e66" dependencies = [ "bitflags 2.9.1", "serde", @@ -467,9 +468,9 @@ dependencies = [ [[package]] name = "arrow-select" -version = "56.0.0" +version = "56.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d8854d15f1cf5005b4b358abeb60adea17091ff5bdd094dca5d3f73787d81170" +checksum = "be8b2e0052cd20d36d64f32640b68a5ab54d805d24a473baee5d52017c85536c" dependencies = [ "ahash 0.8.12", "arrow-array", @@ -481,9 +482,9 @@ dependencies = [ [[package]] name = "arrow-string" -version = "56.0.0" +version = "56.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2c477e8b89e1213d5927a2a84a72c384a9bf4dd0dbf15f9fd66d821aafd9e95e" +checksum = "c2155e26e17f053c8975c546fc70cf19c00542f9abf43c23a88a46ef7204204f" dependencies = [ "arrow-array", "arrow-buffer", @@ -4529,9 +4530,9 @@ dependencies = [ [[package]] name = "parquet" -version = "56.0.0" +version = "56.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c7288a07ed5d25939a90f9cb1ca5afa6855faa08ec7700613511ae64bdb0620c" +checksum = "89b56b41d1bd36aae415e42f91cae70ee75cf6cba74416b14dce3e958d5990ec" dependencies = [ "ahash 0.8.12", "arrow-array", diff --git a/Cargo.toml b/Cargo.toml index 53c35ed35f0d..78e437250627 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -90,19 +90,19 @@ ahash = { version = "0.8", default-features = false, features = [ "runtime-rng", ] } apache-avro = { version = "0.20", default-features = false } -arrow = { version = "56.0.0", features = [ +arrow = { version = "56.1.0", features = [ "prettyprint", "chrono-tz", ] } 
-arrow-buffer = { version = "56.0.0", default-features = false } -arrow-flight = { version = "56.0.0", features = [ +arrow-buffer = { version = "56.1.0", default-features = false } +arrow-flight = { version = "56.1.0", features = [ "flight-sql-experimental", ] } -arrow-ipc = { version = "56.0.0", default-features = false, features = [ +arrow-ipc = { version = "56.1.0", default-features = false, features = [ "lz4", ] } -arrow-ord = { version = "56.0.0", default-features = false } -arrow-schema = { version = "56.0.0", default-features = false } +arrow-ord = { version = "56.1.0", default-features = false } +arrow-schema = { version = "56.1.0", default-features = false } async-trait = "0.1.89" bigdecimal = "0.4.8" bytes = "1.10" @@ -157,7 +157,7 @@ itertools = "0.14" log = "^0.4" object_store = { version = "0.12.3", default-features = false } parking_lot = "0.12" -parquet = { version = "56.0.0", default-features = false, features = [ +parquet = { version = "56.1.0", default-features = false, features = [ "arrow", "async", "object_store", diff --git a/datafusion-cli/src/main.rs b/datafusion-cli/src/main.rs index a28e97a9f88e..7b73ea08cc27 100644 --- a/datafusion-cli/src/main.rs +++ b/datafusion-cli/src/main.rs @@ -570,15 +570,15 @@ mod tests { let df = ctx.sql(sql).await?; let rbs = df.collect().await?; - assert_snapshot!(batches_to_string(&rbs),@r#" + assert_snapshot!(batches_to_string(&rbs),@r" +-----------------------------------+-----------------+---------------------+------+------------------+ | filename | file_size_bytes | metadata_size_bytes | hits | extra | +-----------------------------------+-----------------+---------------------+------+------------------+ | alltypes_plain.parquet | 1851 | 10181 | 2 | page_index=false | - | alltypes_tiny_pages.parquet | 454233 | 881634 | 2 | page_index=true | + | alltypes_tiny_pages.parquet | 454233 | 881418 | 2 | page_index=true | | lz4_raw_compressed_larger.parquet | 380836 | 2939 | 2 | page_index=false | +-----------------------------------+-----------------+---------------------+------+------------------+ - "#); + "); // increase the number of hits ctx.sql("select * from alltypes_plain") @@ -601,15 +601,15 @@ mod tests { let df = ctx.sql(sql).await?; let rbs = df.collect().await?; - assert_snapshot!(batches_to_string(&rbs),@r#" + assert_snapshot!(batches_to_string(&rbs),@r" +-----------------------------------+-----------------+---------------------+------+------------------+ | filename | file_size_bytes | metadata_size_bytes | hits | extra | +-----------------------------------+-----------------+---------------------+------+------------------+ | alltypes_plain.parquet | 1851 | 10181 | 5 | page_index=false | - | alltypes_tiny_pages.parquet | 454233 | 881634 | 2 | page_index=true | + | alltypes_tiny_pages.parquet | 454233 | 881418 | 2 | page_index=true | | lz4_raw_compressed_larger.parquet | 380836 | 2939 | 3 | page_index=false | +-----------------------------------+-----------------+---------------------+------+------------------+ - "#); + "); Ok(()) } diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index cdd8e72a06cc..51fc95466394 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -560,6 +560,14 @@ config_namespace! { /// (reading) Use any available bloom filters when reading parquet files pub bloom_filter_on_read: bool, default = true + /// (reading) The maximum predicate cache size, in bytes. 
When + /// `pushdown_filters` is enabled, sets the maximum memory used to cache + /// the results of predicate evaluation between filter evaluation and + /// output generation. Decreasing this value will reduce memory usage, + /// but may increase IO and CPU usage. None means use the default + /// parquet reader setting. 0 means no caching. + pub max_predicate_cache_size: Option, default = None + // The following options affect writing to parquet files // and map to parquet::file::properties::WriterProperties diff --git a/datafusion/common/src/file_options/parquet_writer.rs b/datafusion/common/src/file_options/parquet_writer.rs index 185826aef47d..d052e6bb5948 100644 --- a/datafusion/common/src/file_options/parquet_writer.rs +++ b/datafusion/common/src/file_options/parquet_writer.rs @@ -233,6 +233,7 @@ impl ParquetOptions { binary_as_string: _, // not used for writer props coerce_int96: _, // not used for writer props skip_arrow_metadata: _, + max_predicate_cache_size: _, } = self; let mut builder = WriterProperties::builder() @@ -425,6 +426,10 @@ pub(crate) fn parse_statistics_string(str_setting: &str) -> Result usize { } } } + +#[tokio::test] +async fn predicate_cache_default() -> datafusion_common::Result<()> { + let ctx = SessionContext::new(); + // The cache is on by default, but not used unless filter pushdown is enabled + PredicateCacheTest { + expected_inner_records: 0, + expected_records: 0, + } + .run(&ctx) + .await +} + +#[tokio::test] +async fn predicate_cache_pushdown_default() -> datafusion_common::Result<()> { + let mut config = SessionConfig::new(); + config.options_mut().execution.parquet.pushdown_filters = true; + let ctx = SessionContext::new_with_config(config); + // The cache is on by default, and used when filter pushdown is enabled + PredicateCacheTest { + expected_inner_records: 8, + expected_records: 4, + } + .run(&ctx) + .await +} + +#[tokio::test] +async fn predicate_cache_pushdown_disable() -> datafusion_common::Result<()> { + // Can disable the cache even with filter pushdown by setting the size to 0. In this case we + // expect the inner records are reported but no records are read from the cache + let mut config = SessionConfig::new(); + config.options_mut().execution.parquet.pushdown_filters = true; + config + .options_mut() + .execution + .parquet + .max_predicate_cache_size = Some(0); + let ctx = SessionContext::new_with_config(config); + PredicateCacheTest { + // file has 8 rows, which need to be read twice, one for filter, one for + // final output + expected_inner_records: 16, + // Expect this to 0 records read as the cache is disabled. 
However, it is + // non zero due to https://github.com/apache/arrow-rs/issues/8307 + expected_records: 3, + } + .run(&ctx) + .await +} + +/// Runs the query "SELECT * FROM alltypes_plain WHERE double_col != 0.0" +/// with a given SessionContext and asserts that the predicate cache metrics +/// are as expected +#[derive(Debug)] +struct PredicateCacheTest { + /// Expected records read from the underlying reader (to evaluate filters) + /// -- this is the total number of records in the file + expected_inner_records: usize, + /// Expected records to be read from the cache (after filtering) + expected_records: usize, +} + +impl PredicateCacheTest { + async fn run(self, ctx: &SessionContext) -> datafusion_common::Result<()> { + let Self { + expected_inner_records, + expected_records, + } = self; + // Create a dataframe that scans the "alltypes_plain.parquet" file with + // a filter on `double_col != 0.0` + let path = parquet_test_data() + "/alltypes_plain.parquet"; + let exec = ctx + .read_parquet(path, ParquetReadOptions::default()) + .await? + .filter(col("double_col").not_eq(lit(0.0)))? + .create_physical_plan() + .await?; + + // run the plan to completion + let _ = collect(exec.clone(), ctx.task_ctx()).await?; // run plan + let metrics = + TestParquetFile::parquet_metrics(&exec).expect("found parquet metrics"); + + // verify the predicate cache metrics + assert_eq!( + get_value(&metrics, "predicate_cache_inner_records"), + expected_inner_records + ); + assert_eq!( + get_value(&metrics, "predicate_cache_records"), + expected_records + ); + Ok(()) + } +} diff --git a/datafusion/datasource-parquet/src/metadata.rs b/datafusion/datasource-parquet/src/metadata.rs index 71c81a25001b..8e3de5bbf2c0 100644 --- a/datafusion/datasource-parquet/src/metadata.rs +++ b/datafusion/datasource-parquet/src/metadata.rs @@ -39,7 +39,9 @@ use object_store::path::Path; use object_store::{ObjectMeta, ObjectStore}; use parquet::arrow::arrow_reader::statistics::StatisticsConverter; use parquet::arrow::parquet_to_arrow_schema; -use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataReader, RowGroupMetaData}; +use parquet::file::metadata::{ + PageIndexPolicy, ParquetMetaData, ParquetMetaDataReader, RowGroupMetaData, +}; use std::any::Any; use std::collections::HashMap; use std::sync::Arc; @@ -148,7 +150,7 @@ impl<'a> DFParquetMetadata<'a> { if cache_metadata && file_metadata_cache.is_some() { // Need to retrieve the entire metadata for the caching to be effective. - reader = reader.with_page_indexes(true); + reader = reader.with_page_index_policy(PageIndexPolicy::Optional); } let metadata = Arc::new( diff --git a/datafusion/datasource-parquet/src/metrics.rs b/datafusion/datasource-parquet/src/metrics.rs index 574fe2a040ea..d75a979d4cad 100644 --- a/datafusion/datasource-parquet/src/metrics.rs +++ b/datafusion/datasource-parquet/src/metrics.rs @@ -72,6 +72,13 @@ pub struct ParquetFileMetrics { pub page_index_eval_time: Time, /// Total time spent reading and parsing metadata from the footer pub metadata_load_time: Time, + /// Predicate Cache: number of records read directly from the inner reader. + /// This is the number of rows decoded while evaluating predicates + pub predicate_cache_inner_records: Count, + /// Predicate Cache: number of records read from the cache. This is the + /// number of rows that were stored in the cache after evaluating predicates + /// reused for the output. 
+ pub predicate_cache_records: Count, } impl ParquetFileMetrics { @@ -140,6 +147,14 @@ impl ParquetFileMetrics { let files_ranges_pruned_statistics = MetricBuilder::new(metrics) .counter("files_ranges_pruned_statistics", partition); + let predicate_cache_inner_records = MetricBuilder::new(metrics) + .with_new_label("filename", filename.to_string()) + .counter("predicate_cache_inner_records", partition); + + let predicate_cache_records = MetricBuilder::new(metrics) + .with_new_label("filename", filename.to_string()) + .counter("predicate_cache_records", partition); + Self { files_ranges_pruned_statistics, predicate_evaluation_errors, @@ -157,6 +172,8 @@ impl ParquetFileMetrics { bloom_filter_eval_time, page_index_eval_time, metadata_load_time, + predicate_cache_inner_records, + predicate_cache_records, } } } diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs index 93a3d4af5432..def53706c34a 100644 --- a/datafusion/datasource-parquet/src/opener.rs +++ b/datafusion/datasource-parquet/src/opener.rs @@ -51,10 +51,11 @@ use datafusion_execution::parquet_encryption::EncryptionFactory; use futures::{ready, Stream, StreamExt, TryStreamExt}; use itertools::Itertools; use log::debug; +use parquet::arrow::arrow_reader::metrics::ArrowReaderMetrics; use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions}; use parquet::arrow::async_reader::AsyncFileReader; use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask}; -use parquet::file::metadata::ParquetMetaDataReader; +use parquet::file::metadata::{PageIndexPolicy, ParquetMetaDataReader}; /// Implements [`FileOpener`] for a parquet file pub(super) struct ParquetOpener { @@ -105,6 +106,9 @@ pub(super) struct ParquetOpener { #[cfg(feature = "parquet_encryption")] pub encryption_factory: Option<(Arc, EncryptionFactoryOptions)>, + /// Maximum size of the predicate cache, in bytes. If none, uses + /// the arrow-rs default. 
+ pub max_predicate_cache_size: Option, } impl FileOpener for ParquetOpener { @@ -152,6 +156,7 @@ impl FileOpener for ParquetOpener { let enable_page_index = self.enable_page_index; let encryption_context = self.get_encryption_context(); + let max_predicate_cache_size = self.max_predicate_cache_size; Ok(Box::pin(async move { let file_decryption_properties = encryption_context @@ -401,21 +406,42 @@ impl FileOpener for ParquetOpener { builder = builder.with_limit(limit) } + if let Some(max_predicate_cache_size) = max_predicate_cache_size { + builder = builder.with_max_predicate_cache_size(max_predicate_cache_size); + } + + // metrics from the arrow reader itself + let arrow_reader_metrics = ArrowReaderMetrics::enabled(); + let stream = builder .with_projection(mask) .with_batch_size(batch_size) .with_row_groups(row_group_indexes) + .with_metrics(arrow_reader_metrics.clone()) .build()?; - let stream = stream - .map_err(DataFusionError::from) - .map(move |b| b.and_then(|b| schema_mapping.map_batch(b))); + let files_ranges_pruned_statistics = + file_metrics.files_ranges_pruned_statistics.clone(); + let predicate_cache_inner_records = + file_metrics.predicate_cache_inner_records.clone(); + let predicate_cache_records = file_metrics.predicate_cache_records.clone(); + + let stream = stream.map_err(DataFusionError::from).map(move |b| { + b.and_then(|b| { + copy_arrow_reader_metrics( + &arrow_reader_metrics, + &predicate_cache_inner_records, + &predicate_cache_records, + ); + schema_mapping.map_batch(b) + }) + }); if let Some(file_pruner) = file_pruner { Ok(EarlyStoppingStream::new( stream, file_pruner, - file_metrics.files_ranges_pruned_statistics.clone(), + files_ranges_pruned_statistics, ) .boxed()) } else { @@ -425,6 +451,22 @@ impl FileOpener for ParquetOpener { } } +/// Copies metrics from ArrowReaderMetrics (the metrics collected by the +/// arrow-rs parquet reader) to the parquet file metrics for DataFusion +fn copy_arrow_reader_metrics( + arrow_reader_metrics: &ArrowReaderMetrics, + predicate_cache_inner_records: &Count, + predicate_cache_records: &Count, +) { + if let Some(v) = arrow_reader_metrics.records_read_from_inner() { + predicate_cache_inner_records.add(v); + } + + if let Some(v) = arrow_reader_metrics.records_read_from_cache() { + predicate_cache_records.add(v); + } +} + /// Wraps an inner RecordBatchStream and a [`FilePruner`] /// /// This can terminate the scan early when some dynamic filters is updated after @@ -652,8 +694,8 @@ async fn load_page_index( if missing_column_index || missing_offset_index { let m = Arc::try_unwrap(Arc::clone(parquet_metadata)) .unwrap_or_else(|e| e.as_ref().clone()); - let mut reader = - ParquetMetaDataReader::new_with_metadata(m).with_page_indexes(true); + let mut reader = ParquetMetaDataReader::new_with_metadata(m) + .with_page_index_policy(PageIndexPolicy::Optional); reader.load_page_index(input).await?; let new_parquet_metadata = reader.finish()?; let new_arrow_reader = @@ -823,6 +865,7 @@ mod test { expr_adapter_factory: Some(Arc::new(DefaultPhysicalExprAdapterFactory)), #[cfg(feature = "parquet_encryption")] encryption_factory: None, + max_predicate_cache_size: None, } }; @@ -911,6 +954,7 @@ mod test { expr_adapter_factory: Some(Arc::new(DefaultPhysicalExprAdapterFactory)), #[cfg(feature = "parquet_encryption")] encryption_factory: None, + max_predicate_cache_size: None, } }; @@ -1015,6 +1059,7 @@ mod test { expr_adapter_factory: Some(Arc::new(DefaultPhysicalExprAdapterFactory)), #[cfg(feature = "parquet_encryption")] encryption_factory: None, 
+ max_predicate_cache_size: None, } }; let make_meta = || FileMeta { @@ -1129,6 +1174,7 @@ mod test { expr_adapter_factory: Some(Arc::new(DefaultPhysicalExprAdapterFactory)), #[cfg(feature = "parquet_encryption")] encryption_factory: None, + max_predicate_cache_size: None, } }; @@ -1244,6 +1290,7 @@ mod test { expr_adapter_factory: Some(Arc::new(DefaultPhysicalExprAdapterFactory)), #[cfg(feature = "parquet_encryption")] encryption_factory: None, + max_predicate_cache_size: None, } }; @@ -1426,6 +1473,7 @@ mod test { expr_adapter_factory: None, #[cfg(feature = "parquet_encryption")] encryption_factory: None, + max_predicate_cache_size: None, }; let predicate = logical2physical(&col("a").eq(lit(1u64)), &table_schema); diff --git a/datafusion/datasource-parquet/src/source.rs b/datafusion/datasource-parquet/src/source.rs index 007c239ef492..4fff23413dd0 100644 --- a/datafusion/datasource-parquet/src/source.rs +++ b/datafusion/datasource-parquet/src/source.rs @@ -426,6 +426,12 @@ impl ParquetSource { self.table_parquet_options.global.bloom_filter_on_read } + /// Return the maximum predicate cache size, in bytes, used when + /// `pushdown_filters` + pub fn max_predicate_cache_size(&self) -> Option { + self.table_parquet_options.global.max_predicate_cache_size + } + /// Applies schema adapter factory from the FileScanConfig if present. /// /// # Arguments @@ -580,6 +586,7 @@ impl FileSource for ParquetSource { expr_adapter_factory, #[cfg(feature = "parquet_encryption")] encryption_factory: self.get_encryption_factory_with_config(), + max_predicate_cache_size: self.max_predicate_cache_size(), }) } diff --git a/datafusion/physical-plan/src/spill/mod.rs b/datafusion/physical-plan/src/spill/mod.rs index fab62bff840f..fbea6ae85a39 100644 --- a/datafusion/physical-plan/src/spill/mod.rs +++ b/datafusion/physical-plan/src/spill/mod.rs @@ -752,7 +752,7 @@ mod tests { .unwrap(); let size = get_record_batch_memory_size(&batch); - assert_eq!(size, 8320); + assert_eq!(size, 8208); } // ==== Spill manager tests ==== diff --git a/datafusion/proto-common/proto/datafusion_common.proto b/datafusion/proto-common/proto/datafusion_common.proto index f5c79cf3d9a4..f56c7318609a 100644 --- a/datafusion/proto-common/proto/datafusion_common.proto +++ b/datafusion/proto-common/proto/datafusion_common.proto @@ -580,6 +580,10 @@ message ParquetOptions { oneof coerce_int96_opt { string coerce_int96 = 32; } + + oneof max_predicate_cache_size_opt { + uint64 max_predicate_cache_size = 33; + } } enum JoinSide { diff --git a/datafusion/proto-common/src/from_proto/mod.rs b/datafusion/proto-common/src/from_proto/mod.rs index c5242d0176e6..103e52dfec90 100644 --- a/datafusion/proto-common/src/from_proto/mod.rs +++ b/datafusion/proto-common/src/from_proto/mod.rs @@ -999,6 +999,9 @@ impl TryFrom<&protobuf::ParquetOptions> for ParquetOptions { protobuf::parquet_options::CoerceInt96Opt::CoerceInt96(v) => Some(v), }).unwrap_or(None), skip_arrow_metadata: value.skip_arrow_metadata, + max_predicate_cache_size: value.max_predicate_cache_size_opt.map(|opt| match opt { + protobuf::parquet_options::MaxPredicateCacheSizeOpt::MaxPredicateCacheSize(v) => Some(v as usize), + }).unwrap_or(None), }) } } diff --git a/datafusion/proto-common/src/generated/pbjson.rs b/datafusion/proto-common/src/generated/pbjson.rs index 48782ff1d93a..f4b0022722c9 100644 --- a/datafusion/proto-common/src/generated/pbjson.rs +++ b/datafusion/proto-common/src/generated/pbjson.rs @@ -5610,6 +5610,9 @@ impl serde::Serialize for ParquetOptions { if 
self.coerce_int96_opt.is_some() { len += 1; } + if self.max_predicate_cache_size_opt.is_some() { + len += 1; + } let mut struct_ser = serializer.serialize_struct("datafusion_common.ParquetOptions", len)?; if self.enable_page_index { struct_ser.serialize_field("enablePageIndex", &self.enable_page_index)?; @@ -5763,6 +5766,15 @@ impl serde::Serialize for ParquetOptions { } } } + if let Some(v) = self.max_predicate_cache_size_opt.as_ref() { + match v { + parquet_options::MaxPredicateCacheSizeOpt::MaxPredicateCacheSize(v) => { + #[allow(clippy::needless_borrow)] + #[allow(clippy::needless_borrows_for_generic_args)] + struct_ser.serialize_field("maxPredicateCacheSize", ToString::to_string(&v).as_str())?; + } + } + } struct_ser.end() } } @@ -5830,6 +5842,8 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { "bloomFilterNdv", "coerce_int96", "coerceInt96", + "max_predicate_cache_size", + "maxPredicateCacheSize", ]; #[allow(clippy::enum_variant_names)] @@ -5864,6 +5878,7 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { BloomFilterFpp, BloomFilterNdv, CoerceInt96, + MaxPredicateCacheSize, } impl<'de> serde::Deserialize<'de> for GeneratedField { fn deserialize(deserializer: D) -> std::result::Result @@ -5915,6 +5930,7 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { "bloomFilterFpp" | "bloom_filter_fpp" => Ok(GeneratedField::BloomFilterFpp), "bloomFilterNdv" | "bloom_filter_ndv" => Ok(GeneratedField::BloomFilterNdv), "coerceInt96" | "coerce_int96" => Ok(GeneratedField::CoerceInt96), + "maxPredicateCacheSize" | "max_predicate_cache_size" => Ok(GeneratedField::MaxPredicateCacheSize), _ => Err(serde::de::Error::unknown_field(value, FIELDS)), } } @@ -5964,6 +5980,7 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { let mut bloom_filter_fpp_opt__ = None; let mut bloom_filter_ndv_opt__ = None; let mut coerce_int96_opt__ = None; + let mut max_predicate_cache_size_opt__ = None; while let Some(k) = map_.next_key()? 
{ match k { GeneratedField::EnablePageIndex => { @@ -6160,6 +6177,12 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { } coerce_int96_opt__ = map_.next_value::<::std::option::Option<_>>()?.map(parquet_options::CoerceInt96Opt::CoerceInt96); } + GeneratedField::MaxPredicateCacheSize => { + if max_predicate_cache_size_opt__.is_some() { + return Err(serde::de::Error::duplicate_field("maxPredicateCacheSize")); + } + max_predicate_cache_size_opt__ = map_.next_value::<::std::option::Option<::pbjson::private::NumberDeserialize<_>>>()?.map(|x| parquet_options::MaxPredicateCacheSizeOpt::MaxPredicateCacheSize(x.0)); + } } } Ok(ParquetOptions { @@ -6193,6 +6216,7 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { bloom_filter_fpp_opt: bloom_filter_fpp_opt__, bloom_filter_ndv_opt: bloom_filter_ndv_opt__, coerce_int96_opt: coerce_int96_opt__, + max_predicate_cache_size_opt: max_predicate_cache_size_opt__, }) } } diff --git a/datafusion/proto-common/src/generated/prost.rs b/datafusion/proto-common/src/generated/prost.rs index aa23cea57470..aa94c7937661 100644 --- a/datafusion/proto-common/src/generated/prost.rs +++ b/datafusion/proto-common/src/generated/prost.rs @@ -833,6 +833,10 @@ pub struct ParquetOptions { pub bloom_filter_ndv_opt: ::core::option::Option, #[prost(oneof = "parquet_options::CoerceInt96Opt", tags = "32")] pub coerce_int96_opt: ::core::option::Option, + #[prost(oneof = "parquet_options::MaxPredicateCacheSizeOpt", tags = "33")] + pub max_predicate_cache_size_opt: ::core::option::Option< + parquet_options::MaxPredicateCacheSizeOpt, + >, } /// Nested message and enum types in `ParquetOptions`. pub mod parquet_options { @@ -886,6 +890,11 @@ pub mod parquet_options { #[prost(string, tag = "32")] CoerceInt96(::prost::alloc::string::String), } + #[derive(Clone, Copy, PartialEq, ::prost::Oneof)] + pub enum MaxPredicateCacheSizeOpt { + #[prost(uint64, tag = "33")] + MaxPredicateCacheSize(u64), + } } #[derive(Clone, PartialEq, ::prost::Message)] pub struct Precision { diff --git a/datafusion/proto-common/src/to_proto/mod.rs b/datafusion/proto-common/src/to_proto/mod.rs index c06427065733..610531a5bf91 100644 --- a/datafusion/proto-common/src/to_proto/mod.rs +++ b/datafusion/proto-common/src/to_proto/mod.rs @@ -842,6 +842,7 @@ impl TryFrom<&ParquetOptions> for protobuf::ParquetOptions { binary_as_string: value.binary_as_string, skip_arrow_metadata: value.skip_arrow_metadata, coerce_int96_opt: value.coerce_int96.clone().map(protobuf::parquet_options::CoerceInt96Opt::CoerceInt96), + max_predicate_cache_size_opt: value.max_predicate_cache_size.map(|v| protobuf::parquet_options::MaxPredicateCacheSizeOpt::MaxPredicateCacheSize(v as u64)), }) } } diff --git a/datafusion/proto/src/generated/datafusion_proto_common.rs b/datafusion/proto/src/generated/datafusion_proto_common.rs index aa23cea57470..aa94c7937661 100644 --- a/datafusion/proto/src/generated/datafusion_proto_common.rs +++ b/datafusion/proto/src/generated/datafusion_proto_common.rs @@ -833,6 +833,10 @@ pub struct ParquetOptions { pub bloom_filter_ndv_opt: ::core::option::Option, #[prost(oneof = "parquet_options::CoerceInt96Opt", tags = "32")] pub coerce_int96_opt: ::core::option::Option, + #[prost(oneof = "parquet_options::MaxPredicateCacheSizeOpt", tags = "33")] + pub max_predicate_cache_size_opt: ::core::option::Option< + parquet_options::MaxPredicateCacheSizeOpt, + >, } /// Nested message and enum types in `ParquetOptions`. 
pub mod parquet_options { @@ -886,6 +890,11 @@ pub mod parquet_options { #[prost(string, tag = "32")] CoerceInt96(::prost::alloc::string::String), } + #[derive(Clone, Copy, PartialEq, ::prost::Oneof)] + pub enum MaxPredicateCacheSizeOpt { + #[prost(uint64, tag = "33")] + MaxPredicateCacheSize(u64), + } } #[derive(Clone, PartialEq, ::prost::Message)] pub struct Precision { diff --git a/datafusion/proto/src/logical_plan/file_formats.rs b/datafusion/proto/src/logical_plan/file_formats.rs index 654607bd733d..09e4a6aa2a89 100644 --- a/datafusion/proto/src/logical_plan/file_formats.rs +++ b/datafusion/proto/src/logical_plan/file_formats.rs @@ -411,6 +411,9 @@ impl TableParquetOptionsProto { coerce_int96_opt: global_options.global.coerce_int96.map(|compression| { parquet_options::CoerceInt96Opt::CoerceInt96(compression) }), + max_predicate_cache_size_opt: global_options.global.max_predicate_cache_size.map(|size| { + parquet_options::MaxPredicateCacheSizeOpt::MaxPredicateCacheSize(size as u64) + }), }), column_specific_options: column_specific_options.into_iter().map(|(column_name, options)| { ParquetColumnSpecificOptions { @@ -504,6 +507,9 @@ impl From<&ParquetOptionsProto> for ParquetOptions { coerce_int96: proto.coerce_int96_opt.as_ref().map(|opt| match opt { parquet_options::CoerceInt96Opt::CoerceInt96(coerce_int96) => coerce_int96.clone(), }), + max_predicate_cache_size: proto.max_predicate_cache_size_opt.as_ref().map(|opt| match opt { + parquet_options::MaxPredicateCacheSizeOpt::MaxPredicateCacheSize(size) => *size as usize, + }), } } } diff --git a/datafusion/sqllogictest/test_files/explain_tree.slt b/datafusion/sqllogictest/test_files/explain_tree.slt index 0df361a75bae..b4916cb6e6ec 100644 --- a/datafusion/sqllogictest/test_files/explain_tree.slt +++ b/datafusion/sqllogictest/test_files/explain_tree.slt @@ -1314,7 +1314,7 @@ physical_plan 11)┌─────────────┴─────────────┐┌─────────────┴─────────────┐ 12)│ DataSourceExec ││ DataSourceExec │ 13)│ -------------------- ││ -------------------- │ -14)│ bytes: 6040 ││ bytes: 6040 │ +14)│ bytes: 5932 ││ bytes: 5932 │ 15)│ format: memory ││ format: memory │ 16)│ rows: 1 ││ rows: 1 │ 17)└───────────────────────────┘└───────────────────────────┘ @@ -1798,7 +1798,7 @@ physical_plan 11)┌─────────────┴─────────────┐ 12)│ DataSourceExec │ 13)│ -------------------- │ -14)│ bytes: 2672 │ +14)│ bytes: 2576 │ 15)│ format: memory │ 16)│ rows: 1 │ 17)└───────────────────────────┘ @@ -1821,7 +1821,7 @@ physical_plan 11)┌─────────────┴─────────────┐ 12)│ DataSourceExec │ 13)│ -------------------- │ -14)│ bytes: 2672 │ +14)│ bytes: 2576 │ 15)│ format: memory │ 16)│ rows: 1 │ 17)└───────────────────────────┘ @@ -1844,7 +1844,7 @@ physical_plan 11)┌─────────────┴─────────────┐ 12)│ DataSourceExec │ 13)│ -------------------- │ -14)│ bytes: 2672 │ +14)│ bytes: 2576 │ 15)│ format: memory │ 16)│ rows: 1 │ 17)└───────────────────────────┘ diff --git a/datafusion/sqllogictest/test_files/information_schema.slt b/datafusion/sqllogictest/test_files/information_schema.slt index fb2c89020112..0c3c1a320234 100644 --- a/datafusion/sqllogictest/test_files/information_schema.slt +++ b/datafusion/sqllogictest/test_files/information_schema.slt @@ -241,6 +241,7 @@ datafusion.execution.parquet.dictionary_enabled true datafusion.execution.parquet.dictionary_page_size_limit 1048576 datafusion.execution.parquet.enable_page_index true datafusion.execution.parquet.encoding NULL +datafusion.execution.parquet.max_predicate_cache_size NULL datafusion.execution.parquet.max_row_group_size 
1048576 datafusion.execution.parquet.maximum_buffered_record_batches_per_stream 2 datafusion.execution.parquet.maximum_parallel_row_group_writers 1 @@ -355,6 +356,7 @@ datafusion.execution.parquet.dictionary_enabled true (writing) Sets if dictionar datafusion.execution.parquet.dictionary_page_size_limit 1048576 (writing) Sets best effort maximum dictionary page size, in bytes datafusion.execution.parquet.enable_page_index true (reading) If true, reads the Parquet data page level metadata (the Page Index), if present, to reduce the I/O and number of rows decoded. datafusion.execution.parquet.encoding NULL (writing) Sets default encoding for any column. Valid values are: plain, plain_dictionary, rle, bit_packed, delta_binary_packed, delta_length_byte_array, delta_byte_array, rle_dictionary, and byte_stream_split. These values are not case sensitive. If NULL, uses default parquet writer setting +datafusion.execution.parquet.max_predicate_cache_size NULL (reading) The maximum predicate cache size, in bytes. When `pushdown_filters` is enabled, sets the maximum memory used to cache the results of predicate evaluation between filter evaluation and output generation. Decreasing this value will reduce memory usage, but may increase IO and CPU usage. None means use the default parquet reader setting. 0 means no caching. datafusion.execution.parquet.max_row_group_size 1048576 (writing) Target maximum number of rows in each row group (defaults to 1M rows). Writing larger row groups requires more memory to write, but can get better compression and be faster to read. datafusion.execution.parquet.maximum_buffered_record_batches_per_stream 2 (writing) By default parallel parquet writer is tuned for minimum memory usage in a streaming execution plan. You may see a performance benefit when writing large parquet files by increasing maximum_parallel_row_group_writers and maximum_buffered_record_batches_per_stream if your system has idle cores and can tolerate additional memory usage. Boosting these values is likely worthwhile when writing out already in-memory data, such as from a cached data frame. datafusion.execution.parquet.maximum_parallel_row_group_writers 1 (writing) By default parallel parquet writer is tuned for minimum memory usage in a streaming execution plan. You may see a performance benefit when writing large parquet files by increasing maximum_parallel_row_group_writers and maximum_buffered_record_batches_per_stream if your system has idle cores and can tolerate additional memory usage. Boosting these values is likely worthwhile when writing out already in-memory data, such as from a cached data frame. diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index 3d4730958fb3..c3fe5ffb1134 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -88,6 +88,7 @@ The following configuration settings are available: | datafusion.execution.parquet.binary_as_string | false | (reading) If true, parquet reader will read columns of `Binary/LargeBinary` with `Utf8`, and `BinaryView` with `Utf8View`. Parquet files generated by some legacy writers do not correctly set the UTF8 flag for strings, causing string columns to be loaded as BLOB instead. | | datafusion.execution.parquet.coerce_int96 | NULL | (reading) If true, parquet reader will read columns of physical type int96 as originating from a different resolution than nanosecond. 
This is useful for reading data from systems like Spark which stores microsecond resolution timestamps in an int96 allowing it to write values with a larger date range than 64-bit timestamps with nanosecond resolution. | | datafusion.execution.parquet.bloom_filter_on_read | true | (reading) Use any available bloom filters when reading parquet files | +| datafusion.execution.parquet.max_predicate_cache_size | NULL | (reading) The maximum predicate cache size, in bytes. When `pushdown_filters` is enabled, sets the maximum memory used to cache the results of predicate evaluation between filter evaluation and output generation. Decreasing this value will reduce memory usage, but may increase IO and CPU usage. None means use the default parquet reader setting. 0 means no caching. | | datafusion.execution.parquet.data_pagesize_limit | 1048576 | (writing) Sets best effort maximum size of data page in bytes | | datafusion.execution.parquet.write_batch_size | 1024 | (writing) Sets write_batch_size in bytes | | datafusion.execution.parquet.writer_version | 1.0 | (writing) Sets parquet writer version valid values are "1.0" and "2.0" |
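
The new `datafusion.execution.parquet.max_predicate_cache_size` option only takes effect when `pushdown_filters` is enabled, mirroring the `predicate_cache_*` tests added in this PR. Below is a minimal sketch of configuring it programmatically; the file path and the 1 MiB cap are illustrative, `Some(0)` disables the cache, and `None` keeps the parquet reader's default:

    use datafusion::error::Result;
    use datafusion::prelude::*;

    #[tokio::main]
    async fn main() -> Result<()> {
        let mut config = SessionConfig::new();
        // The predicate cache is only exercised when filter pushdown is enabled
        config.options_mut().execution.parquet.pushdown_filters = true;
        // Cap the predicate cache at 1 MiB (illustrative value);
        // Some(0) disables caching, None keeps the parquet crate default
        config
            .options_mut()
            .execution
            .parquet
            .max_predicate_cache_size = Some(1024 * 1024);

        let ctx = SessionContext::new_with_config(config);
        let df = ctx
            .read_parquet("alltypes_plain.parquet", ParquetReadOptions::default())
            .await?
            .filter(col("double_col").not_eq(lit(0.0)))?;
        df.show().await?;
        Ok(())
    }

The option can also be changed per session, for example in datafusion-cli, with `SET datafusion.execution.parquet.max_predicate_cache_size = 0;`. Cache effectiveness can then be observed through the new `predicate_cache_inner_records` and `predicate_cache_records` metrics reported by the parquet `DataSourceExec`.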