diff --git a/Cargo.lock b/Cargo.lock index e0f811f16f1b..cd0e0bdce232 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -234,9 +234,8 @@ checksum = "7c02d123df017efcdfbd739ef81735b36c5ba83ec3c59c80a9d7ecc718f92e50" [[package]] name = "arrow" -version = "57.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4df8bb5b0bd64c0b9bc61317fcc480bad0f00e56d3bc32c69a4c8dada4786bae" +version = "57.1.0" +source = "git+https://github.com/alamb/arrow-rs.git?branch=alamb%2Fprepare_57.1.0#15679c02153d9056529fef3613c6da749665bfab" dependencies = [ "arrow-arith", "arrow-array", @@ -258,9 +257,8 @@ dependencies = [ [[package]] name = "arrow-arith" -version = "57.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a1a640186d3bd30a24cb42264c2dafb30e236a6f50d510e56d40b708c9582491" +version = "57.1.0" +source = "git+https://github.com/alamb/arrow-rs.git?branch=alamb%2Fprepare_57.1.0#15679c02153d9056529fef3613c6da749665bfab" dependencies = [ "arrow-array", "arrow-buffer", @@ -272,9 +270,8 @@ dependencies = [ [[package]] name = "arrow-array" -version = "57.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "219fe420e6800979744c8393b687afb0252b3f8a89b91027d27887b72aa36d31" +version = "57.1.0" +source = "git+https://github.com/alamb/arrow-rs.git?branch=alamb%2Fprepare_57.1.0#15679c02153d9056529fef3613c6da749665bfab" dependencies = [ "ahash 0.8.12", "arrow-buffer", @@ -291,9 +288,8 @@ dependencies = [ [[package]] name = "arrow-buffer" -version = "57.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "76885a2697a7edf6b59577f568b456afc94ce0e2edc15b784ce3685b6c3c5c27" +version = "57.1.0" +source = "git+https://github.com/alamb/arrow-rs.git?branch=alamb%2Fprepare_57.1.0#15679c02153d9056529fef3613c6da749665bfab" dependencies = [ "bytes", "half", @@ -303,13 +299,13 @@ dependencies = [ [[package]] name = "arrow-cast" -version = "57.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9c9ebb4c987e6b3b236fb4a14b20b34835abfdd80acead3ccf1f9bf399e1f168" +version = "57.1.0" +source = "git+https://github.com/alamb/arrow-rs.git?branch=alamb%2Fprepare_57.1.0#15679c02153d9056529fef3613c6da749665bfab" dependencies = [ "arrow-array", "arrow-buffer", "arrow-data", + "arrow-ord", "arrow-schema", "arrow-select", "atoi", @@ -324,9 +320,8 @@ dependencies = [ [[package]] name = "arrow-csv" -version = "57.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "92386159c8d4bce96f8bd396b0642a0d544d471bdc2ef34d631aec80db40a09c" +version = "57.1.0" +source = "git+https://github.com/alamb/arrow-rs.git?branch=alamb%2Fprepare_57.1.0#15679c02153d9056529fef3613c6da749665bfab" dependencies = [ "arrow-array", "arrow-cast", @@ -339,9 +334,8 @@ dependencies = [ [[package]] name = "arrow-data" -version = "57.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "727681b95de313b600eddc2a37e736dcb21980a40f640314dcf360e2f36bc89b" +version = "57.1.0" +source = "git+https://github.com/alamb/arrow-rs.git?branch=alamb%2Fprepare_57.1.0#15679c02153d9056529fef3613c6da749665bfab" dependencies = [ "arrow-buffer", "arrow-schema", @@ -352,9 +346,8 @@ dependencies = [ [[package]] name = "arrow-flight" -version = "57.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f70bb56412a007b0cfc116d15f24dda6adeed9611a213852a004cda20085a3b9" +version = "57.1.0" +source = 
"git+https://github.com/alamb/arrow-rs.git?branch=alamb%2Fprepare_57.1.0#15679c02153d9056529fef3613c6da749665bfab" dependencies = [ "arrow-arith", "arrow-array", @@ -380,9 +373,8 @@ dependencies = [ [[package]] name = "arrow-ipc" -version = "57.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "da9ba92e3de170295c98a84e5af22e2b037f0c7b32449445e6c493b5fca27f27" +version = "57.1.0" +source = "git+https://github.com/alamb/arrow-rs.git?branch=alamb%2Fprepare_57.1.0#15679c02153d9056529fef3613c6da749665bfab" dependencies = [ "arrow-array", "arrow-buffer", @@ -396,9 +388,8 @@ dependencies = [ [[package]] name = "arrow-json" -version = "57.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b969b4a421ae83828591c6bf5450bd52e6d489584142845ad6a861f42fe35df8" +version = "57.1.0" +source = "git+https://github.com/alamb/arrow-rs.git?branch=alamb%2Fprepare_57.1.0#15679c02153d9056529fef3613c6da749665bfab" dependencies = [ "arrow-array", "arrow-buffer", @@ -420,9 +411,8 @@ dependencies = [ [[package]] name = "arrow-ord" -version = "57.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "141c05298b21d03e88062317a1f1a73f5ba7b6eb041b350015b1cd6aabc0519b" +version = "57.1.0" +source = "git+https://github.com/alamb/arrow-rs.git?branch=alamb%2Fprepare_57.1.0#15679c02153d9056529fef3613c6da749665bfab" dependencies = [ "arrow-array", "arrow-buffer", @@ -433,9 +423,8 @@ dependencies = [ [[package]] name = "arrow-pyarrow" -version = "57.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cfcfb2be2e9096236f449c11f425cddde18c4cc540f516d90f066f10a29ed515" +version = "57.1.0" +source = "git+https://github.com/alamb/arrow-rs.git?branch=alamb%2Fprepare_57.1.0#15679c02153d9056529fef3613c6da749665bfab" dependencies = [ "arrow-array", "arrow-data", @@ -445,9 +434,8 @@ dependencies = [ [[package]] name = "arrow-row" -version = "57.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c5f3c06a6abad6164508ed283c7a02151515cef3de4b4ff2cebbcaeb85533db2" +version = "57.1.0" +source = "git+https://github.com/alamb/arrow-rs.git?branch=alamb%2Fprepare_57.1.0#15679c02153d9056529fef3613c6da749665bfab" dependencies = [ "arrow-array", "arrow-buffer", @@ -458,9 +446,8 @@ dependencies = [ [[package]] name = "arrow-schema" -version = "57.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9cfa7a03d1eee2a4d061476e1840ad5c9867a544ca6c4c59256496af5d0a8be5" +version = "57.1.0" +source = "git+https://github.com/alamb/arrow-rs.git?branch=alamb%2Fprepare_57.1.0#15679c02153d9056529fef3613c6da749665bfab" dependencies = [ "bitflags 2.10.0", "serde", @@ -470,9 +457,8 @@ dependencies = [ [[package]] name = "arrow-select" -version = "57.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bafa595babaad59f2455f4957d0f26448fb472722c186739f4fac0823a1bdb47" +version = "57.1.0" +source = "git+https://github.com/alamb/arrow-rs.git?branch=alamb%2Fprepare_57.1.0#15679c02153d9056529fef3613c6da749665bfab" dependencies = [ "ahash 0.8.12", "arrow-array", @@ -484,9 +470,8 @@ dependencies = [ [[package]] name = "arrow-string" -version = "57.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "32f46457dbbb99f2650ff3ac23e46a929e0ab81db809b02aa5511c258348bef2" +version = "57.1.0" +source = "git+https://github.com/alamb/arrow-rs.git?branch=alamb%2Fprepare_57.1.0#15679c02153d9056529fef3613c6da749665bfab" dependencies = [ 
"arrow-array", "arrow-buffer", @@ -2824,7 +2809,7 @@ dependencies = [ "libc", "option-ext", "redox_users", - "windows-sys 0.61.2", + "windows-sys 0.59.0", ] [[package]] @@ -2968,7 +2953,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "39cab71617ae0d63f51a36d69f866391735b51691dbda63cf6f96d042b63efeb" dependencies = [ "libc", - "windows-sys 0.61.2", + "windows-sys 0.59.0", ] [[package]] @@ -3618,7 +3603,7 @@ dependencies = [ "js-sys", "log", "wasm-bindgen", - "windows-core 0.62.2", + "windows-core", ] [[package]] @@ -4104,9 +4089,9 @@ checksum = "112b39cec0b298b6c1999fee3e31427f74f676e4cb9879ed1a121b43661a4154" [[package]] name = "lz4_flex" -version = "0.11.5" +version = "0.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "08ab2867e3eeeca90e844d1940eab391c9dc5228783db2ed999acbc0a9ed375a" +checksum = "ab6473172471198271ff72e9379150e9dfd70d8e533e0752a27e515b48dd375e" dependencies = [ "twox-hash", ] @@ -4246,7 +4231,7 @@ version = "0.50.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7957b9740744892f114936ab4a57b3f487491bbeafaf8083688b16841a4240e5" dependencies = [ - "windows-sys 0.61.2", + "windows-sys 0.59.0", ] [[package]] @@ -4471,9 +4456,8 @@ dependencies = [ [[package]] name = "parquet" -version = "57.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7a0f31027ef1af7549f7cec603a9a21dce706d3f8d7c2060a68f43c1773be95a" +version = "57.1.0" +source = "git+https://github.com/alamb/arrow-rs.git?branch=alamb%2Fprepare_57.1.0#15679c02153d9056529fef3613c6da749665bfab" dependencies = [ "ahash 0.8.12", "arrow-array", @@ -5070,7 +5054,7 @@ dependencies = [ "once_cell", "socket2", "tracing", - "windows-sys 0.60.2", + "windows-sys 0.59.0", ] [[package]] @@ -5489,7 +5473,7 @@ dependencies = [ "errno", "libc", "linux-raw-sys", - "windows-sys 0.61.2", + "windows-sys 0.59.0", ] [[package]] @@ -6229,7 +6213,7 @@ dependencies = [ "getrandom 0.3.4", "once_cell", "rustix", - "windows-sys 0.61.2", + "windows-sys 0.59.0", ] [[package]] @@ -7131,7 +7115,7 @@ version = "0.1.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c2a7b1c03c876122aa43f3020e6c3c3ee5c05081c9a00739faf7503aeba10d22" dependencies = [ - "windows-sys 0.61.2", + "windows-sys 0.59.0", ] [[package]] @@ -7147,7 +7131,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9babd3a767a4c1aef6900409f85f5d53ce2544ccdfaa86dad48c91782c6d6893" dependencies = [ "windows-collections", - "windows-core 0.61.2", + "windows-core", "windows-future", "windows-link 0.1.3", "windows-numerics", @@ -7159,7 +7143,7 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3beeceb5e5cfd9eb1d76b381630e82c4241ccd0d27f1a39ed41b2760b255c5e8" dependencies = [ - "windows-core 0.61.2", + "windows-core", ] [[package]] @@ -7171,21 +7155,8 @@ dependencies = [ "windows-implement", "windows-interface", "windows-link 0.1.3", - "windows-result 0.3.4", - "windows-strings 0.4.2", -] - -[[package]] -name = "windows-core" -version = "0.62.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b8e83a14d34d0623b51dce9581199302a221863196a1dde71a7663a4c2be9deb" -dependencies = [ - "windows-implement", - "windows-interface", - "windows-link 0.2.1", - "windows-result 0.4.1", - "windows-strings 0.5.1", + "windows-result", + "windows-strings", ] [[package]] @@ -7194,7 +7165,7 @@ version = "0.2.1" source = 
"registry+https://github.com/rust-lang/crates.io-index" checksum = "fc6a41e98427b19fe4b73c550f060b59fa592d7d686537eebf9385621bfbad8e" dependencies = [ - "windows-core 0.61.2", + "windows-core", "windows-link 0.1.3", "windows-threading", ] @@ -7239,7 +7210,7 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9150af68066c4c5c07ddc0ce30421554771e528bde427614c61038bc2c92c2b1" dependencies = [ - "windows-core 0.61.2", + "windows-core", "windows-link 0.1.3", ] @@ -7252,15 +7223,6 @@ dependencies = [ "windows-link 0.1.3", ] -[[package]] -name = "windows-result" -version = "0.4.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7781fa89eaf60850ac3d2da7af8e5242a5ea78d1a11c49bf2910bb5a73853eb5" -dependencies = [ - "windows-link 0.2.1", -] - [[package]] name = "windows-strings" version = "0.4.2" @@ -7270,15 +7232,6 @@ dependencies = [ "windows-link 0.1.3", ] -[[package]] -name = "windows-strings" -version = "0.5.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7837d08f69c77cf6b07689544538e017c1bfcf57e34b4c0ff58e6c2cd3b37091" -dependencies = [ - "windows-link 0.2.1", -] - [[package]] name = "windows-sys" version = "0.52.0" diff --git a/Cargo.toml b/Cargo.toml index e5acbd20224a..8b9da4e8eb16 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -263,3 +263,20 @@ incremental = false inherits = "release" debug = true strip = false + + +## Temporary arrow-rs patch until 57.1.0 is released + +[patch.crates-io] +arrow = { git = "https://github.com/alamb/arrow-rs.git", branch = "alamb/prepare_57.1.0" } +arrow-array = { git = "https://github.com/alamb/arrow-rs.git", branch = "alamb/prepare_57.1.0" } +arrow-buffer = { git = "https://github.com/alamb/arrow-rs.git", branch = "alamb/prepare_57.1.0" } +arrow-cast = { git = "https://github.com/alamb/arrow-rs.git", branch = "alamb/prepare_57.1.0" } +arrow-data = { git = "https://github.com/alamb/arrow-rs.git", branch = "alamb/prepare_57.1.0" } +arrow-ipc = { git = "https://github.com/alamb/arrow-rs.git", branch = "alamb/prepare_57.1.0" } +arrow-schema = { git = "https://github.com/alamb/arrow-rs.git", branch = "alamb/prepare_57.1.0" } +arrow-select = { git = "https://github.com/alamb/arrow-rs.git", branch = "alamb/prepare_57.1.0" } +arrow-string = { git = "https://github.com/alamb/arrow-rs.git", branch = "alamb/prepare_57.1.0" } +arrow-ord = { git = "https://github.com/alamb/arrow-rs.git", branch = "alamb/prepare_57.1.0" } +arrow-flight = { git = "https://github.com/alamb/arrow-rs.git", branch = "alamb/prepare_57.1.0" } +parquet = { git = "https://github.com/alamb/arrow-rs.git", branch = "alamb/prepare_57.1.0" } diff --git a/datafusion-cli/src/main.rs b/datafusion-cli/src/main.rs index 09fa8ef15af8..de666fced7e6 100644 --- a/datafusion-cli/src/main.rs +++ b/datafusion-cli/src/main.rs @@ -592,9 +592,9 @@ mod tests { +-----------------------------------+-----------------+---------------------+------+------------------+ | filename | file_size_bytes | metadata_size_bytes | hits | extra | +-----------------------------------+-----------------+---------------------+------+------------------+ - | alltypes_plain.parquet | 1851 | 6957 | 2 | page_index=false | - | alltypes_tiny_pages.parquet | 454233 | 267014 | 2 | page_index=true | - | lz4_raw_compressed_larger.parquet | 380836 | 996 | 2 | page_index=false | + | alltypes_plain.parquet | 1851 | 8882 | 2 | page_index=false | + | alltypes_tiny_pages.parquet | 454233 | 269266 | 2 | page_index=true | + | 
lz4_raw_compressed_larger.parquet | 380836 | 1347 | 2 | page_index=false | +-----------------------------------+-----------------+---------------------+------+------------------+ "); @@ -623,9 +623,9 @@ mod tests { +-----------------------------------+-----------------+---------------------+------+------------------+ | filename | file_size_bytes | metadata_size_bytes | hits | extra | +-----------------------------------+-----------------+---------------------+------+------------------+ - | alltypes_plain.parquet | 1851 | 6957 | 5 | page_index=false | - | alltypes_tiny_pages.parquet | 454233 | 267014 | 2 | page_index=true | - | lz4_raw_compressed_larger.parquet | 380836 | 996 | 3 | page_index=false | + | alltypes_plain.parquet | 1851 | 8882 | 5 | page_index=false | + | alltypes_tiny_pages.parquet | 454233 | 269266 | 2 | page_index=true | + | lz4_raw_compressed_larger.parquet | 380836 | 1347 | 3 | page_index=false | +-----------------------------------+-----------------+---------------------+------+------------------+ "); diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index 212db653f713..80b5a80cff78 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -692,13 +692,19 @@ config_namespace! { /// (reading) If true, filter expressions are applied during the parquet decoding operation to /// reduce the number of rows decoded. This optimization is sometimes called "late materialization". - pub pushdown_filters: bool, default = false + pub pushdown_filters: bool, default = true /// (reading) If true, filter expressions evaluated during the parquet decoding operation /// will be reordered heuristically to minimize the cost of evaluation. If false, /// the filters are applied in the same order as written in the query pub reorder_filters: bool, default = false + /// (reading) Force the use of RowSelections for filter results, when + /// pushdown_filters is enabled. If false, the reader will automatically + /// choose between a RowSelection and a Bitmap based on the number and + /// pattern of selected rows. + pub force_filter_selections: bool, default = false + /// (reading) If true, parquet reader will read columns of `Utf8/Utf8Large` with `Utf8View`, /// and `Binary/BinaryLarge` with `BinaryView`.
pub schema_force_view_types: bool, default = true diff --git a/datafusion/common/src/file_options/parquet_writer.rs b/datafusion/common/src/file_options/parquet_writer.rs index 564929c61bab..96dff928d8b3 100644 --- a/datafusion/common/src/file_options/parquet_writer.rs +++ b/datafusion/common/src/file_options/parquet_writer.rs @@ -200,6 +200,7 @@ impl ParquetOptions { metadata_size_hint: _, pushdown_filters: _, reorder_filters: _, + force_filter_selections: _, // not used for writer props allow_single_file_parallelism: _, maximum_parallel_row_group_writers: _, maximum_buffered_record_batches_per_stream: _, @@ -464,6 +465,7 @@ mod tests { metadata_size_hint: defaults.metadata_size_hint, pushdown_filters: defaults.pushdown_filters, reorder_filters: defaults.reorder_filters, + force_filter_selections: defaults.force_filter_selections, allow_single_file_parallelism: defaults.allow_single_file_parallelism, maximum_parallel_row_group_writers: defaults .maximum_parallel_row_group_writers, @@ -577,6 +579,7 @@ mod tests { metadata_size_hint: global_options_defaults.metadata_size_hint, pushdown_filters: global_options_defaults.pushdown_filters, reorder_filters: global_options_defaults.reorder_filters, + force_filter_selections: global_options_defaults.force_filter_selections, allow_single_file_parallelism: global_options_defaults .allow_single_file_parallelism, maximum_parallel_row_group_writers: global_options_defaults diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index 2ae5aed30df9..264738b7d57b 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -3106,7 +3106,7 @@ mod tests { assert_contains!( &e, - r#"Error during planning: Can not find compatible types to compare Boolean with [Struct("foo": Boolean), Utf8]"# + r#"Error during planning: Can not find compatible types to compare Boolean with [Struct("foo": non-null Boolean), Utf8]"# ); Ok(()) diff --git a/datafusion/core/tests/dataframe/dataframe_functions.rs b/datafusion/core/tests/dataframe/dataframe_functions.rs index 265862ff9af8..56cdd78d7051 100644 --- a/datafusion/core/tests/dataframe/dataframe_functions.rs +++ b/datafusion/core/tests/dataframe/dataframe_functions.rs @@ -313,10 +313,10 @@ async fn test_fn_arrow_typeof() -> Result<()> { +----------------------+ | arrow_typeof(test.l) | +----------------------+ - | List(nullable Int32) | - | List(nullable Int32) | - | List(nullable Int32) | - | List(nullable Int32) | + | List(Int32) | + | List(Int32) | + | List(Int32) | + | List(Int32) | +----------------------+ "); diff --git a/datafusion/core/tests/parquet/filter_pushdown.rs b/datafusion/core/tests/parquet/filter_pushdown.rs index 966f25161397..1d64669fadd9 100644 --- a/datafusion/core/tests/parquet/filter_pushdown.rs +++ b/datafusion/core/tests/parquet/filter_pushdown.rs @@ -636,6 +636,27 @@ async fn predicate_cache_pushdown_default() -> datafusion_common::Result<()> { config.options_mut().execution.parquet.pushdown_filters = true; let ctx = SessionContext::new_with_config(config); // The cache is on by default, and used when filter pushdown is enabled + PredicateCacheTest { + expected_inner_records: 8, + expected_records: 7, // reads more than necessary from the cache, as another bitmap is applied afterwards + } + .run(&ctx) + .await +} + +#[tokio::test] +async fn predicate_cache_pushdown_default_selections_only( +) -> datafusion_common::Result<()> { + let mut config = SessionConfig::new(); + config.options_mut().execution.parquet.pushdown_filters
= true; + // forcing filter selections minimizes the number of rows read from the cache + config + .options_mut() + .execution + .parquet + .force_filter_selections = true; + let ctx = SessionContext::new_with_config(config); + // The cache is on by default, and used when filter pushdown is enabled PredicateCacheTest { expected_inner_records: 8, expected_records: 4, diff --git a/datafusion/core/tests/sql/select.rs b/datafusion/core/tests/sql/select.rs index 84899137e50a..5a51451461ed 100644 --- a/datafusion/core/tests/sql/select.rs +++ b/datafusion/core/tests/sql/select.rs @@ -222,10 +222,10 @@ async fn test_parameter_invalid_types() -> Result<()> { .await; assert_snapshot!(results.unwrap_err().strip_backtrace(), @r" - type_coercion - caused by - Error during planning: Cannot infer common argument type for comparison operation List(nullable Int32) = Int32 - "); + type_coercion + caused by + Error during planning: Cannot infer common argument type for comparison operation List(Int32) = Int32 + "); Ok(()) } diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs index 3c905d950a96..83235dafdaf8 100644 --- a/datafusion/datasource-parquet/src/opener.rs +++ b/datafusion/datasource-parquet/src/opener.rs @@ -53,7 +53,9 @@ use futures::{ready, Stream, StreamExt, TryStreamExt}; use itertools::Itertools; use log::debug; use parquet::arrow::arrow_reader::metrics::ArrowReaderMetrics; -use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions}; +use parquet::arrow::arrow_reader::{ + ArrowReaderMetadata, ArrowReaderOptions, RowSelectionPolicy, +}; use parquet::arrow::async_reader::AsyncFileReader; use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask}; use parquet::file::metadata::{PageIndexPolicy, ParquetMetaDataReader}; @@ -87,6 +89,8 @@ pub(super) struct ParquetOpener { pub pushdown_filters: bool, /// Should the filters be reordered to optimize the scan? pub reorder_filters: bool, + /// Should we force the reader to use RowSelections for filtering + pub force_filter_selections: bool, /// Should the page index be read from parquet files, if present, to skip /// data pages pub enable_page_index: bool, @@ -147,6 +151,7 @@ impl FileOpener for ParquetOpener { let partition_fields = self.partition_fields.clone(); let reorder_predicates = self.reorder_filters; let pushdown_filters = self.pushdown_filters; + let force_filter_selections = self.force_filter_selections; let coerce_int96 = self.coerce_int96; let enable_bloom_filter = self.enable_bloom_filter; let enable_row_group_stats_pruning = self.enable_row_group_stats_pruning; @@ -347,6 +352,10 @@ impl FileOpener for ParquetOpener { } }; }; + if force_filter_selections { + builder = + builder.with_row_selection_policy(RowSelectionPolicy::Selectors); + } // Determine which row groups to actually read. The idea is to skip // as many row groups as possible based on the metadata and query @@ -887,6 +896,7 @@ mod test { partition_fields: vec![], pushdown_filters: false, // note that this is false! reorder_filters: false, + force_filter_selections: false, enable_page_index: false, enable_bloom_filter: false, schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory), @@ -960,6 +970,7 @@ mod test { ))], pushdown_filters: false, // note that this is false! 
reorder_filters: false, + force_filter_selections: false, enable_page_index: false, enable_bloom_filter: false, schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory), @@ -1049,6 +1060,7 @@ mod test { ))], pushdown_filters: false, // note that this is false! reorder_filters: false, + force_filter_selections: false, enable_page_index: false, enable_bloom_filter: false, schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory), @@ -1141,6 +1153,7 @@ mod test { ))], pushdown_filters: true, // note that this is true! reorder_filters: true, + force_filter_selections: false, enable_page_index: false, enable_bloom_filter: false, schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory), @@ -1233,6 +1246,7 @@ mod test { ))], pushdown_filters: false, // note that this is false! reorder_filters: false, + force_filter_selections: false, enable_page_index: false, enable_bloom_filter: false, schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory), @@ -1383,6 +1397,7 @@ mod test { partition_fields: vec![], pushdown_filters: true, reorder_filters: false, + force_filter_selections: false, enable_page_index: false, enable_bloom_filter: false, schema_adapter_factory: Arc::new(CustomSchemaAdapterFactory), diff --git a/datafusion/datasource-parquet/src/source.rs b/datafusion/datasource-parquet/src/source.rs index da7bc125d2f6..30c90f02d077 100644 --- a/datafusion/datasource-parquet/src/source.rs +++ b/datafusion/datasource-parquet/src/source.rs @@ -404,6 +404,11 @@ impl ParquetSource { self.table_parquet_options.global.reorder_filters } + /// Return the value of [`datafusion_common::config::ParquetOptions::force_filter_selections`] + fn force_filter_selections(&self) -> bool { + self.table_parquet_options.global.force_filter_selections + } + /// If enabled, the reader will read the page index /// This is used to optimize filter pushdown /// via `RowSelector` and `RowFilter` by @@ -591,6 +596,7 @@ impl FileSource for ParquetSource { parquet_file_reader_factory, pushdown_filters: self.pushdown_filters(), reorder_filters: self.reorder_filters(), + force_filter_selections: self.force_filter_selections(), enable_page_index: self.enable_page_index(), enable_bloom_filter: self.bloom_filter_on_read(), enable_row_group_stats_pruning: self.table_parquet_options.global.pruning, diff --git a/datafusion/optimizer/src/analyzer/type_coercion.rs b/datafusion/optimizer/src/analyzer/type_coercion.rs index 4fb0f8553b4b..a557d3356dba 100644 --- a/datafusion/optimizer/src/analyzer/type_coercion.rs +++ b/datafusion/optimizer/src/analyzer/type_coercion.rs @@ -2465,7 +2465,7 @@ mod test { assert_analyzed_plan_eq!( plan, @r#" - Projection: a = CAST(CAST(a AS Map("key_value": Struct("key": Utf8, "value": nullable Float64), unsorted)) AS Map("entries": Struct("key": Utf8, "value": nullable Float64), unsorted)) + Projection: a = CAST(CAST(a AS Map("key_value": non-null Struct("key": non-null Utf8, "value": Float64), unsorted)) AS Map("entries": non-null Struct("key": non-null Utf8, "value": Float64), unsorted)) EmptyRelation: rows=0 "# ) diff --git a/datafusion/proto-common/proto/datafusion_common.proto b/datafusion/proto-common/proto/datafusion_common.proto index 267953556b16..15c82e948c95 100644 --- a/datafusion/proto-common/proto/datafusion_common.proto +++ b/datafusion/proto-common/proto/datafusion_common.proto @@ -519,6 +519,7 @@ message ParquetOptions { bool skip_metadata = 3; // default = true bool pushdown_filters = 5; // default = false bool reorder_filters = 6; // default = false + bool 
force_filter_selections = 34; // default = false uint64 data_pagesize_limit = 7; // default = 1024 * 1024 uint64 write_batch_size = 8; // default = 1024 string writer_version = 9; // default = "1.0" diff --git a/datafusion/proto-common/src/from_proto/mod.rs b/datafusion/proto-common/src/from_proto/mod.rs index 4ede5b970eae..8187f956813e 100644 --- a/datafusion/proto-common/src/from_proto/mod.rs +++ b/datafusion/proto-common/src/from_proto/mod.rs @@ -943,6 +943,7 @@ impl TryFrom<&protobuf::ParquetOptions> for ParquetOptions { .unwrap_or(None), pushdown_filters: value.pushdown_filters, reorder_filters: value.reorder_filters, + force_filter_selections: value.force_filter_selections, data_pagesize_limit: value.data_pagesize_limit as usize, write_batch_size: value.write_batch_size as usize, writer_version: value.writer_version.clone(), diff --git a/datafusion/proto-common/src/generated/pbjson.rs b/datafusion/proto-common/src/generated/pbjson.rs index e63f345459b8..66659ad14cbb 100644 --- a/datafusion/proto-common/src/generated/pbjson.rs +++ b/datafusion/proto-common/src/generated/pbjson.rs @@ -5557,6 +5557,9 @@ impl serde::Serialize for ParquetOptions { if self.reorder_filters { len += 1; } + if self.force_filter_selections { + len += 1; + } if self.data_pagesize_limit != 0 { len += 1; } @@ -5651,6 +5654,9 @@ impl serde::Serialize for ParquetOptions { if self.reorder_filters { struct_ser.serialize_field("reorderFilters", &self.reorder_filters)?; } + if self.force_filter_selections { + struct_ser.serialize_field("forceFilterSelections", &self.force_filter_selections)?; + } if self.data_pagesize_limit != 0 { #[allow(clippy::needless_borrow)] #[allow(clippy::needless_borrows_for_generic_args)] @@ -5816,6 +5822,8 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { "pushdownFilters", "reorder_filters", "reorderFilters", + "force_filter_selections", + "forceFilterSelections", "data_pagesize_limit", "dataPagesizeLimit", "write_batch_size", @@ -5875,6 +5883,7 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { SkipMetadata, PushdownFilters, ReorderFilters, + ForceFilterSelections, DataPagesizeLimit, WriteBatchSize, WriterVersion, @@ -5927,6 +5936,7 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { "skipMetadata" | "skip_metadata" => Ok(GeneratedField::SkipMetadata), "pushdownFilters" | "pushdown_filters" => Ok(GeneratedField::PushdownFilters), "reorderFilters" | "reorder_filters" => Ok(GeneratedField::ReorderFilters), + "forceFilterSelections" | "force_filter_selections" => Ok(GeneratedField::ForceFilterSelections), "dataPagesizeLimit" | "data_pagesize_limit" => Ok(GeneratedField::DataPagesizeLimit), "writeBatchSize" | "write_batch_size" => Ok(GeneratedField::WriteBatchSize), "writerVersion" | "writer_version" => Ok(GeneratedField::WriterVersion), @@ -5977,6 +5987,7 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { let mut skip_metadata__ = None; let mut pushdown_filters__ = None; let mut reorder_filters__ = None; + let mut force_filter_selections__ = None; let mut data_pagesize_limit__ = None; let mut write_batch_size__ = None; let mut writer_version__ = None; @@ -6035,6 +6046,12 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { } reorder_filters__ = Some(map_.next_value()?); } + GeneratedField::ForceFilterSelections => { + if force_filter_selections__.is_some() { + return Err(serde::de::Error::duplicate_field("forceFilterSelections")); + } + force_filter_selections__ = Some(map_.next_value()?); + } GeneratedField::DataPagesizeLimit => { if 
data_pagesize_limit__.is_some() { return Err(serde::de::Error::duplicate_field("dataPagesizeLimit")); @@ -6213,6 +6230,7 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { skip_metadata: skip_metadata__.unwrap_or_default(), pushdown_filters: pushdown_filters__.unwrap_or_default(), reorder_filters: reorder_filters__.unwrap_or_default(), + force_filter_selections: force_filter_selections__.unwrap_or_default(), data_pagesize_limit: data_pagesize_limit__.unwrap_or_default(), write_batch_size: write_batch_size__.unwrap_or_default(), writer_version: writer_version__.unwrap_or_default(), diff --git a/datafusion/proto-common/src/generated/prost.rs b/datafusion/proto-common/src/generated/prost.rs index aa7c3d51a9d6..eaeed5276b24 100644 --- a/datafusion/proto-common/src/generated/prost.rs +++ b/datafusion/proto-common/src/generated/prost.rs @@ -763,6 +763,9 @@ pub struct ParquetOptions { /// default = false #[prost(bool, tag = "6")] pub reorder_filters: bool, + /// default = false + #[prost(bool, tag = "34")] + pub force_filter_selections: bool, /// default = 1024 * 1024 #[prost(uint64, tag = "7")] pub data_pagesize_limit: u64, diff --git a/datafusion/proto-common/src/to_proto/mod.rs b/datafusion/proto-common/src/to_proto/mod.rs index e9de1d9e9a9e..93bca2bcb6b2 100644 --- a/datafusion/proto-common/src/to_proto/mod.rs +++ b/datafusion/proto-common/src/to_proto/mod.rs @@ -856,6 +856,7 @@ impl TryFrom<&ParquetOptions> for protobuf::ParquetOptions { metadata_size_hint_opt: value.metadata_size_hint.map(|v| protobuf::parquet_options::MetadataSizeHintOpt::MetadataSizeHint(v as u64)), pushdown_filters: value.pushdown_filters, reorder_filters: value.reorder_filters, + force_filter_selections: value.force_filter_selections, data_pagesize_limit: value.data_pagesize_limit as u64, write_batch_size: value.write_batch_size as u64, writer_version: value.writer_version.clone(), diff --git a/datafusion/proto/src/generated/datafusion_proto_common.rs b/datafusion/proto/src/generated/datafusion_proto_common.rs index aa7c3d51a9d6..eaeed5276b24 100644 --- a/datafusion/proto/src/generated/datafusion_proto_common.rs +++ b/datafusion/proto/src/generated/datafusion_proto_common.rs @@ -763,6 +763,9 @@ pub struct ParquetOptions { /// default = false #[prost(bool, tag = "6")] pub reorder_filters: bool, + /// default = false + #[prost(bool, tag = "34")] + pub force_filter_selections: bool, /// default = 1024 * 1024 #[prost(uint64, tag = "7")] pub data_pagesize_limit: u64, diff --git a/datafusion/proto/src/logical_plan/file_formats.rs b/datafusion/proto/src/logical_plan/file_formats.rs index d32bfb22ffdd..20b3c6bb7aef 100644 --- a/datafusion/proto/src/logical_plan/file_formats.rs +++ b/datafusion/proto/src/logical_plan/file_formats.rs @@ -375,6 +375,7 @@ mod parquet { }), pushdown_filters: global_options.global.pushdown_filters, reorder_filters: global_options.global.reorder_filters, + force_filter_selections: global_options.global.force_filter_selections, data_pagesize_limit: global_options.global.data_pagesize_limit as u64, write_batch_size: global_options.global.write_batch_size as u64, writer_version: global_options.global.writer_version.clone(), @@ -471,6 +472,7 @@ mod parquet { }), pushdown_filters: proto.pushdown_filters, reorder_filters: proto.reorder_filters, + force_filter_selections: proto.force_filter_selections, data_pagesize_limit: proto.data_pagesize_limit as usize, write_batch_size: proto.write_batch_size as usize, writer_version: proto.writer_version.clone(), diff --git 
a/datafusion/sqllogictest/test_files/array.slt b/datafusion/sqllogictest/test_files/array.slt index 77197721e1f1..a8f4bb85a1ed 100644 --- a/datafusion/sqllogictest/test_files/array.slt +++ b/datafusion/sqllogictest/test_files/array.slt @@ -710,13 +710,13 @@ select query TTT select arrow_typeof(column1), arrow_typeof(column2), arrow_typeof(column3) from arrays; ---- -List(nullable List(nullable Int64)) List(nullable Float64) List(nullable Utf8) -List(nullable List(nullable Int64)) List(nullable Float64) List(nullable Utf8) -List(nullable List(nullable Int64)) List(nullable Float64) List(nullable Utf8) -List(nullable List(nullable Int64)) List(nullable Float64) List(nullable Utf8) -List(nullable List(nullable Int64)) List(nullable Float64) List(nullable Utf8) -List(nullable List(nullable Int64)) List(nullable Float64) List(nullable Utf8) -List(nullable List(nullable Int64)) List(nullable Float64) List(nullable Utf8) +List(List(Int64)) List(Float64) List(Utf8) +List(List(Int64)) List(Float64) List(Utf8) +List(List(Int64)) List(Float64) List(Utf8) +List(List(Int64)) List(Float64) List(Utf8) +List(List(Int64)) List(Float64) List(Utf8) +List(List(Int64)) List(Float64) List(Utf8) +List(List(Int64)) List(Float64) List(Utf8) # arrays table query ??? @@ -1182,7 +1182,7 @@ select make_array(make_array(1), arrow_cast(make_array(-1), 'LargeList(Int8)')) query T select arrow_typeof(make_array(make_array(1), arrow_cast(make_array(-1), 'LargeList(Int8)'))); ---- -List(nullable LargeList(nullable Int64)) +List(LargeList(Int64)) query ??? @@ -1978,11 +1978,11 @@ select array_slice(arrow_cast(make_array(1, 2, 3, 4, 5), 'LargeList(Int64)'), 0, ---- [1, 2, 3, 4, 5] [h, e, l, l, o] -# TODO: Enable once arrow_cast supports ListView types. +# TODO: Enable once array_slice supports ListView types. # Expected output (once supported): # ---- # [1, 2, 3, 4, 5] [h, e, l, l, o] -query error DataFusion error: Execution error: Unsupported type 'ListView\(Int64\)'. Must be a supported arrow type name such as 'Int32' or 'Timestamp\(ns\)'. Error unknown token: ListView +query error Failed to coerce arguments to satisfy a call to 'array_slice' function: select array_slice(arrow_cast(make_array(1, 2, 3, 4, 5), 'ListView(Int64)'), 0, 6), array_slice(arrow_cast(make_array('h', 'e', 'l', 'l', 'o'), 'ListView(Utf8)'), 0, 5); @@ -2025,14 +2025,15 @@ select array_slice(arrow_cast(make_array(1, 2, 3, 4, 5), 'LargeList(Int64)'), 2, ---- [2, 3, 4, 5] [l, l, o] -# TODO: Enable once arrow_cast supports LargeListView types. +# TODO: Enable once array_slice supports LargeListView types. # Expected output (once supported): # ---- # [2, 3, 4, 5] [l, l, o] -query error DataFusion error: Execution error: Unsupported type 'LargeListView\(Int64\)'. Must be a supported arrow type name such as 'Int32' or 'Timestamp\(ns\)'. Error unknown token: LargeListView +query error Failed to coerce arguments to satisfy a call to 'array_slice' function: select array_slice(arrow_cast(make_array(1, 2, 3, 4, 5), 'LargeListView(Int64)'), 2, 6), array_slice(arrow_cast(make_array('h', 'e', 'l', 'l', 'o'), 'LargeListView(Utf8)'), 3, 7); + # array_slice scalar function #6 (with positive indexes; nested array) query ?
select array_slice(make_array(make_array(1, 2, 3, 4, 5), make_array(6, 7, 8, 9, 10)), 1, 1); @@ -3321,7 +3322,7 @@ select array_concat([arrow_cast('1', 'Utf8'), arrow_cast('2', 'Utf8')], [arrow_cast('3', 'Utf8View')]), arrow_typeof(array_concat([arrow_cast('1', 'Utf8'), arrow_cast('2', 'Utf8')], [arrow_cast('3', 'Utf8View')])); ---- -[1, 2, 3] List(nullable Utf8View) +[1, 2, 3] List(Utf8View) # array_concat error query error DataFusion error: Error during planning: Execution error: Function 'array_concat' user-defined coercion failed with "Error during planning: array_concat does not support type Int64" @@ -4614,7 +4615,7 @@ NULL [baz] baz query T SELECT arrow_typeof(make_array(arrow_cast('a', 'Utf8View'), 'b', 'c', 'd')); ---- -List(nullable Utf8View) +List(Utf8View) # expect a,b,c,d. make_array forces all types to be of a common type (see above) query T @@ -7708,8 +7709,8 @@ CREATE EXTERNAL TABLE fixed_size_list_array STORED AS PARQUET LOCATION '../core/ query T select arrow_typeof(f0) from fixed_size_list_array; ---- -FixedSizeList(2 x nullable Int64) -FixedSizeList(2 x nullable Int64) +FixedSizeList(2 x Int64) +FixedSizeList(2 x Int64) query ? select * from fixed_size_list_array; @@ -7738,8 +7739,8 @@ select make_array(arrow_cast(f0, 'List(Int64)')) from fixed_size_list_array query T select arrow_typeof(make_array(arrow_cast(f0, 'List(Int64)'))) from fixed_size_list_array ---- -List(nullable List(nullable Int64)) -List(nullable List(nullable Int64)) +List(List(Int64)) +List(List(Int64)) query ? select make_array(f0) from fixed_size_list_array @@ -7750,8 +7751,8 @@ select make_array(f0) from fixed_size_list_array query T select arrow_typeof(make_array(f0)) from fixed_size_list_array ---- -List(nullable FixedSizeList(2 x nullable Int64)) -List(nullable FixedSizeList(2 x nullable Int64)) +List(FixedSizeList(2 x Int64)) +List(FixedSizeList(2 x Int64)) query ? select array_concat(column1, [7]) from arrays_values_v2; @@ -7798,7 +7799,7 @@ select flatten(arrow_cast(make_array([1], [2, 3], [null], make_array(4, null, 5) arrow_typeof(flatten(arrow_cast(make_array([1], [2, 3], [null], make_array(4, null, 5)), 'FixedSizeList(4, LargeList(Int64))'))), arrow_typeof(flatten(arrow_cast(make_array([[1.1], [2.2]], [[3.3], [4.4]]), 'List(LargeList(FixedSizeList(1, Float64)))'))); ---- -[1, 2, 3, NULL, 4, NULL, 5] [[1.1], [2.2], [3.3], [4.4]] LargeList(nullable Int64) LargeList(nullable FixedSizeList(1 x nullable Float64)) +[1, 2, 3, NULL, 4, NULL, 5] [[1.1], [2.2], [3.3], [4.4]] LargeList(Int64) LargeList(FixedSizeList(1 x Float64)) # flatten with column values query ???? @@ -8338,19 +8339,19 @@ select * from test_create_array_table; query T select arrow_typeof(a) from test_create_array_table; ---- -List(nullable Int32) +List(Int32) query T select arrow_typeof(c) from test_create_array_table; ---- -List(nullable List(nullable Int32)) +List(List(Int32)) # Test casting to array types # issue: https://github.com/apache/datafusion/issues/9440 query ??T select [1,2,3]::int[], [['1']]::int[][], arrow_typeof([]::text[]); ---- -[1, 2, 3] [[1]] List(nullable Utf8View) +[1, 2, 3] [[1]] List(Utf8View) # test empty arrays return length # issue: https://github.com/apache/datafusion/pull/12459 @@ -8370,8 +8371,8 @@ create table fixed_size_col_table (a int[3]) as values ([1,2,3]), ([4,5,6]); query T select arrow_typeof(a) from fixed_size_col_table; ---- -FixedSizeList(3 x nullable Int32) -FixedSizeList(3 x nullable Int32) +FixedSizeList(3 x Int32) +FixedSizeList(3 x Int32) query ? 
rowsort SELECT DISTINCT a FROM fixed_size_col_table diff --git a/datafusion/sqllogictest/test_files/arrow_typeof.slt b/datafusion/sqllogictest/test_files/arrow_typeof.slt index 5ba62be6873c..c213f2abf719 100644 --- a/datafusion/sqllogictest/test_files/arrow_typeof.slt +++ b/datafusion/sqllogictest/test_files/arrow_typeof.slt @@ -357,12 +357,12 @@ select arrow_cast(make_array(1, 2, 3), 'List(Int64)'); query T select arrow_typeof(arrow_cast(make_array(1, 2, 3), 'List(Int64)')); ---- -List(nullable Int64) +List(Int64) query T select arrow_typeof(arrow_cast(arrow_cast(make_array([1, 2, 3]), 'LargeList(LargeList(Int64))'), 'List(List(Int64))')); ---- -List(nullable List(nullable Int64)) +List(List(Int64)) ## LargeList @@ -380,12 +380,12 @@ select arrow_cast(make_array(1, 2, 3), 'LargeList(Int64)'); query T select arrow_typeof(arrow_cast(make_array(1, 2, 3), 'LargeList(Int64)')); ---- -LargeList(nullable Int64) +LargeList(Int64) query T select arrow_typeof(arrow_cast(make_array([1, 2, 3]), 'LargeList(LargeList(Int64))')); ---- -LargeList(nullable LargeList(nullable Int64)) +LargeList(LargeList(Int64)) ## FixedSizeList @@ -417,7 +417,7 @@ select arrow_cast(make_array(1, 2, 3), 'FixedSizeList(3, Int64)'); query T select arrow_typeof(arrow_cast(arrow_cast(make_array(1, 2, 3), 'LargeList(Int64)'), 'FixedSizeList(3, Int64)')); ---- -FixedSizeList(3 x nullable Int64) +FixedSizeList(3 x Int64) query ? select arrow_cast([1, 2, 3], 'FixedSizeList(3, Int64)'); diff --git a/datafusion/sqllogictest/test_files/coalesce.slt b/datafusion/sqllogictest/test_files/coalesce.slt index e34a601851d7..9e5b71b87129 100644 --- a/datafusion/sqllogictest/test_files/coalesce.slt +++ b/datafusion/sqllogictest/test_files/coalesce.slt @@ -199,14 +199,14 @@ select coalesce(array[1, 2], array[3, 4]), arrow_typeof(coalesce(array[1, 2], array[3, 4])); ---- -[1, 2] List(nullable Int64) +[1, 2] List(Int64) query ?T select coalesce(null, array[3, 4]), arrow_typeof(coalesce(array[1, 2], array[3, 4])); ---- -[3, 4] List(nullable Int64) +[3, 4] List(Int64) # coalesce with array query ?T @@ -214,7 +214,7 @@ select coalesce(array[1, 2], array[arrow_cast(3, 'Int32'), arrow_cast(4, 'Int32')]), arrow_typeof(coalesce(array[1, 2], array[arrow_cast(3, 'Int32'), arrow_cast(4, 'Int32')])); ---- -[1, 2] List(nullable Int64) +[1, 2] List(Int64) # test dict(int32, utf8) statement ok diff --git a/datafusion/sqllogictest/test_files/information_schema.slt b/datafusion/sqllogictest/test_files/information_schema.slt index eba527ed2b21..61c319192668 100644 --- a/datafusion/sqllogictest/test_files/information_schema.slt +++ b/datafusion/sqllogictest/test_files/information_schema.slt @@ -244,6 +244,7 @@ datafusion.execution.parquet.dictionary_enabled true datafusion.execution.parquet.dictionary_page_size_limit 1048576 datafusion.execution.parquet.enable_page_index true datafusion.execution.parquet.encoding NULL +datafusion.execution.parquet.force_filter_selections false datafusion.execution.parquet.max_predicate_cache_size NULL datafusion.execution.parquet.max_row_group_size 1048576 datafusion.execution.parquet.maximum_buffered_record_batches_per_stream 2 @@ -366,6 +367,7 @@ datafusion.execution.parquet.dictionary_enabled true (writing) Sets if dictionar datafusion.execution.parquet.dictionary_page_size_limit 1048576 (writing) Sets best effort maximum dictionary page size, in bytes datafusion.execution.parquet.enable_page_index true (reading) If true, reads the Parquet data page level metadata (the Page Index), if present, to reduce the I/O and number of 
rows decoded. datafusion.execution.parquet.encoding NULL (writing) Sets default encoding for any column. Valid values are: plain, plain_dictionary, rle, bit_packed, delta_binary_packed, delta_length_byte_array, delta_byte_array, rle_dictionary, and byte_stream_split. These values are not case sensitive. If NULL, uses default parquet writer setting +datafusion.execution.parquet.force_filter_selections false (reading) Force the use of RowSelections for filter results, when pushdown_filters is enabled. If false, the reader will automatically choose between a RowSelection and a Bitmap based on the number and pattern of selected rows. datafusion.execution.parquet.max_predicate_cache_size NULL (reading) The maximum predicate cache size, in bytes. When `pushdown_filters` is enabled, sets the maximum memory used to cache the results of predicate evaluation between filter evaluation and output generation. Decreasing this value will reduce memory usage, but may increase IO and CPU usage. None means use the default parquet reader setting. 0 means no caching. datafusion.execution.parquet.max_row_group_size 1048576 (writing) Target maximum number of rows in each row group (defaults to 1M rows). Writing larger row groups requires more memory to write, but can get better compression and be faster to read. datafusion.execution.parquet.maximum_buffered_record_batches_per_stream 2 (writing) By default parallel parquet writer is tuned for minimum memory usage in a streaming execution plan. You may see a performance benefit when writing large parquet files by increasing maximum_parallel_row_group_writers and maximum_buffered_record_batches_per_stream if your system has idle cores and can tolerate additional memory usage. Boosting these values is likely worthwhile when writing out already in-memory data, such as from a cached data frame. 
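For quick experimentation, the settings documented above can be inspected and flipped entirely from SQL through the `information_schema.df_settings` view that this test file exercises. A minimal sketch (any DataFusion session, e.g. `datafusion-cli`):

```sql
-- Show the current value of the new reader option
SELECT name, value
FROM information_schema.df_settings
WHERE name = 'datafusion.execution.parquet.force_filter_selections';

-- Filter pushdown must be on for the selection policy to matter
SET datafusion.execution.parquet.pushdown_filters = true;
-- Opt out of the adaptive selection/bitmap choice and force RowSelections
SET datafusion.execution.parquet.force_filter_selections = true;
```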
diff --git a/datafusion/sqllogictest/test_files/map.slt b/datafusion/sqllogictest/test_files/map.slt index fa8e9ad3c537..7563fa2a8708 100644 --- a/datafusion/sqllogictest/test_files/map.slt +++ b/datafusion/sqllogictest/test_files/map.slt @@ -43,8 +43,8 @@ LOCATION '../core/tests/data/parquet_map.parquet'; query TTT describe data; ---- -ints Map("entries": Struct("key": Utf8, "value": Int64), unsorted) NO -strings Map("entries": Struct("key": Utf8, "value": Utf8), unsorted) NO +ints Map("entries": non-null Struct("key": non-null Utf8, "value": non-null Int64), unsorted) NO +strings Map("entries": non-null Struct("key": non-null Utf8, "value": non-null Utf8), unsorted) NO timestamp Utf8View NO query ??T diff --git a/datafusion/sqllogictest/test_files/sort_merge_join.slt b/datafusion/sqllogictest/test_files/sort_merge_join.slt index aa87026c5cf3..5f9276bdb78e 100644 --- a/datafusion/sqllogictest/test_files/sort_merge_join.slt +++ b/datafusion/sqllogictest/test_files/sort_merge_join.slt @@ -939,4 +939,4 @@ SELECT t2.a, t2.b, t2.c FROM t2 WHERE t2.a > 3 OR t2.a IN (SELECT t3.x FROM t3 WHERE t2.b < 150) ---- -4 101 1001 \ No newline at end of file +4 101 1001 diff --git a/datafusion/sqllogictest/test_files/spark/math/csc.slt b/datafusion/sqllogictest/test_files/spark/math/csc.slt index 5eb9f4447280..837704113da4 100644 --- a/datafusion/sqllogictest/test_files/spark/math/csc.slt +++ b/datafusion/sqllogictest/test_files/spark/math/csc.slt @@ -43,4 +43,4 @@ SELECT csc(a) FROM (VALUES (pi()), (-pi()), (pi()/2) , (arrow_cast('NAN','Float3 8165619676597685 -8165619676597685 1 -NaN \ No newline at end of file +NaN diff --git a/datafusion/sqllogictest/test_files/struct.slt b/datafusion/sqllogictest/test_files/struct.slt index dce5fe036b4e..0989dd382d51 100644 --- a/datafusion/sqllogictest/test_files/struct.slt +++ b/datafusion/sqllogictest/test_files/struct.slt @@ -53,9 +53,9 @@ select * from struct_values; query TT select arrow_typeof(s1), arrow_typeof(s2) from struct_values; ---- -Struct("c0": nullable Int32) Struct("a": nullable Int32, "b": nullable Utf8View) -Struct("c0": nullable Int32) Struct("a": nullable Int32, "b": nullable Utf8View) -Struct("c0": nullable Int32) Struct("a": nullable Int32, "b": nullable Utf8View) +Struct("c0": Int32) Struct("a": Int32, "b": Utf8View) +Struct("c0": Int32) Struct("a": Int32, "b": Utf8View) +Struct("c0": Int32) Struct("a": Int32, "b": Utf8View) # struct[i] @@ -229,12 +229,12 @@ select named_struct('field_a', 1, 'field_b', 2); query T select arrow_typeof(named_struct('first', 1, 'second', 2, 'third', 3)); ---- -Struct("first": nullable Int64, "second": nullable Int64, "third": nullable Int64) +Struct("first": Int64, "second": Int64, "third": Int64) query T select arrow_typeof({'first': 1, 'second': 2, 'third': 3}); ---- -Struct("first": nullable Int64, "second": nullable Int64, "third": nullable Int64) +Struct("first": Int64, "second": Int64, "third": Int64) # test nested struct literal query ? @@ -413,7 +413,7 @@ create table t(a struct, b struct) as valu query T select arrow_typeof([a, b]) from t; ---- -List(nullable Struct("r": nullable Utf8View, "c": nullable Float32)) +List(Struct("r": Utf8View, "c": Float32)) query ? 
select [a, b] from t; @@ -464,12 +464,12 @@ select * from t; query T select arrow_typeof(c1) from t; ---- -Struct("r": nullable Utf8View, "b": nullable Int32) +Struct("r": Utf8View, "b": Int32) query T select arrow_typeof(c2) from t; ---- -Struct("r": nullable Utf8View, "b": nullable Float32) +Struct("r": Utf8View, "b": Float32) statement ok drop table t; @@ -486,8 +486,8 @@ select * from t; query T select arrow_typeof(column1) from t; ---- -Struct("r": nullable Utf8, "c": nullable Float64) -Struct("r": nullable Utf8, "c": nullable Float64) +Struct("r": Utf8, "c": Float64) +Struct("r": Utf8, "c": Float64) statement ok drop table t; @@ -519,9 +519,9 @@ select coalesce(s1) from t; query T select arrow_typeof(coalesce(s1, s2)) from t; ---- -Struct("a": nullable Float32, "b": nullable Utf8View) -Struct("a": nullable Float32, "b": nullable Utf8View) -Struct("a": nullable Float32, "b": nullable Utf8View) +Struct("a": Float32, "b": Utf8View) +Struct("a": Float32, "b": Utf8View) +Struct("a": Float32, "b": Utf8View) statement ok drop table t; @@ -546,9 +546,9 @@ select coalesce(s1, s2) from t; query T select arrow_typeof(coalesce(s1, s2)) from t; ---- -Struct("a": nullable Float32, "b": nullable Utf8View) -Struct("a": nullable Float32, "b": nullable Utf8View) -Struct("a": nullable Float32, "b": nullable Utf8View) +Struct("a": Float32, "b": Utf8View) +Struct("a": Float32, "b": Utf8View) +Struct("a": Float32, "b": Utf8View) statement ok drop table t; @@ -583,7 +583,7 @@ create table t(a struct(r varchar, c int), b struct(r varchar, c float)) as valu query T select arrow_typeof([a, b]) from t; ---- -List(nullable Struct("r": nullable Utf8View, "c": nullable Float32)) +List(Struct("r": Utf8View, "c": Float32)) statement ok drop table t; @@ -606,13 +606,13 @@ create table t(a struct(r varchar, c int, g float), b struct(r varchar, c float, query T select arrow_typeof(a) from t; ---- -Struct("r": nullable Utf8View, "c": nullable Int32, "g": nullable Float32) +Struct("r": Utf8View, "c": Int32, "g": Float32) # type of each column should not coerced but perserve as it is query T select arrow_typeof(b) from t; ---- -Struct("r": nullable Utf8View, "c": nullable Float32, "g": nullable Int32) +Struct("r": Utf8View, "c": Float32, "g": Int32) statement ok drop table t; diff --git a/datafusion/sqllogictest/test_files/window.slt b/datafusion/sqllogictest/test_files/window.slt index 2e228472d68c..5144e7ddcb20 100644 --- a/datafusion/sqllogictest/test_files/window.slt +++ b/datafusion/sqllogictest/test_files/window.slt @@ -5910,7 +5910,7 @@ LIMIT 5 ---- DataFusion error: type_coercion caused by -Error during planning: Cannot infer common argument type for comparison operation Int64 >= List(nullable Null) +Error during planning: Cannot infer common argument type for comparison operation Int64 >= List(Null) @@ -5938,7 +5938,7 @@ logical_plan physical_plan 01)ProjectionExec: expr=[c1@2 as c1, c2@3 as c2, sum(test.c2) FILTER (WHERE test.c2 >= Int64(2)) ORDER BY [test.c1 ASC NULLS LAST, test.c2 ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@4 as sum1, sum(test.c2) FILTER (WHERE test.c2 >= Int64(2) AND test.c2 < Int64(4) AND test.c1 > Int64(0)) ORDER BY [test.c1 ASC NULLS LAST, test.c2 ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@5 as sum2, count(test.c2) FILTER (WHERE test.c2 >= Int64(2)) ORDER BY [test.c1 ASC NULLS LAST, test.c2 ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@6 as count1, array_agg(test.c2) FILTER (WHERE test.c2 >= Int64(2)) ORDER BY [test.c1 ASC 
NULLS LAST, test.c2 ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@7 as array_agg1, array_agg(test.c2) FILTER (WHERE test.c2 >= Int64(2) AND test.c2 < Int64(4) AND test.c1 > Int64(0)) ORDER BY [test.c1 ASC NULLS LAST, test.c2 ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@8 as array_agg2] 02)--GlobalLimitExec: skip=0, fetch=5 -03)----BoundedWindowAggExec: wdw=[sum(test.c2) FILTER (WHERE test.c2 >= Int64(2)) ORDER BY [test.c1 ASC NULLS LAST, test.c2 ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { "sum(test.c2) FILTER (WHERE test.c2 >= Int64(2)) ORDER BY [test.c1 ASC NULLS LAST, test.c2 ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW": nullable Int64 }, frame: ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, sum(test.c2) FILTER (WHERE test.c2 >= Int64(2) AND test.c2 < Int64(4) AND test.c1 > Int64(0)) ORDER BY [test.c1 ASC NULLS LAST, test.c2 ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { "sum(test.c2) FILTER (WHERE test.c2 >= Int64(2) AND test.c2 < Int64(4) AND test.c1 > Int64(0)) ORDER BY [test.c1 ASC NULLS LAST, test.c2 ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW": nullable Int64 }, frame: ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, count(test.c2) FILTER (WHERE test.c2 >= Int64(2)) ORDER BY [test.c1 ASC NULLS LAST, test.c2 ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { "count(test.c2) FILTER (WHERE test.c2 >= Int64(2)) ORDER BY [test.c1 ASC NULLS LAST, test.c2 ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW": Int64 }, frame: ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, array_agg(test.c2) FILTER (WHERE test.c2 >= Int64(2)) ORDER BY [test.c1 ASC NULLS LAST, test.c2 ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { "array_agg(test.c2) FILTER (WHERE test.c2 >= Int64(2)) ORDER BY [test.c1 ASC NULLS LAST, test.c2 ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW": nullable List(nullable Int64) }, frame: ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, array_agg(test.c2) FILTER (WHERE test.c2 >= Int64(2) AND test.c2 < Int64(4) AND test.c1 > Int64(0)) ORDER BY [test.c1 ASC NULLS LAST, test.c2 ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { "array_agg(test.c2) FILTER (WHERE test.c2 >= Int64(2) AND test.c2 < Int64(4) AND test.c1 > Int64(0)) ORDER BY [test.c1 ASC NULLS LAST, test.c2 ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW": nullable List(nullable Int64) }, frame: ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted] +03)----BoundedWindowAggExec: wdw=[sum(test.c2) FILTER (WHERE test.c2 >= Int64(2)) ORDER BY [test.c1 ASC NULLS LAST, test.c2 ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { "sum(test.c2) FILTER (WHERE test.c2 >= Int64(2)) ORDER BY [test.c1 ASC NULLS LAST, test.c2 ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW": nullable Int64 }, frame: ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, sum(test.c2) FILTER (WHERE test.c2 >= Int64(2) AND test.c2 < Int64(4) AND test.c1 > Int64(0)) ORDER BY [test.c1 ASC NULLS LAST, test.c2 ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { "sum(test.c2) FILTER (WHERE test.c2 >= Int64(2) AND test.c2 < Int64(4) AND test.c1 > Int64(0)) ORDER BY [test.c1 ASC NULLS LAST, test.c2 ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW": nullable Int64 }, frame: ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, 
count(test.c2) FILTER (WHERE test.c2 >= Int64(2)) ORDER BY [test.c1 ASC NULLS LAST, test.c2 ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { "count(test.c2) FILTER (WHERE test.c2 >= Int64(2)) ORDER BY [test.c1 ASC NULLS LAST, test.c2 ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW": Int64 }, frame: ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, array_agg(test.c2) FILTER (WHERE test.c2 >= Int64(2)) ORDER BY [test.c1 ASC NULLS LAST, test.c2 ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { "array_agg(test.c2) FILTER (WHERE test.c2 >= Int64(2)) ORDER BY [test.c1 ASC NULLS LAST, test.c2 ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW": nullable List(Int64) }, frame: ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, array_agg(test.c2) FILTER (WHERE test.c2 >= Int64(2) AND test.c2 < Int64(4) AND test.c1 > Int64(0)) ORDER BY [test.c1 ASC NULLS LAST, test.c2 ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { "array_agg(test.c2) FILTER (WHERE test.c2 >= Int64(2) AND test.c2 < Int64(4) AND test.c1 > Int64(0)) ORDER BY [test.c1 ASC NULLS LAST, test.c2 ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW": nullable List(Int64) }, frame: ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted] 04)------SortPreservingMergeExec: [c1@2 ASC NULLS LAST, c2@3 ASC NULLS LAST], fetch=5 05)--------SortExec: TopK(fetch=5), expr=[c1@2 ASC NULLS LAST, c2@3 ASC NULLS LAST], preserve_partitioning=[true] 06)----------ProjectionExec: expr=[__common_expr_3@0 as __common_expr_1, __common_expr_3@0 AND c2@2 < 4 AND c1@1 > 0 as __common_expr_2, c1@1 as c1, c2@2 as c2] diff --git a/docs/source/library-user-guide/upgrading.md b/docs/source/library-user-guide/upgrading.md index e116bfffeda6..d2042511589c 100644 --- a/docs/source/library-user-guide/upgrading.md +++ b/docs/source/library-user-guide/upgrading.md @@ -25,6 +25,28 @@ You can see the current [status of the `52.0.0` release here](https://github.com/apache/datafusion/issues/18566) +### Adaptive filter representation in Parquet filter pushdown + +As of Arrow 57.1.0, DataFusion uses a new adaptive filter strategy when +evaluating pushed down filters for Parquet files. This new strategy improves +performance for certain types of queries where the results of filtering are +more efficiently represented with a bitmask rather than a selection. +See [arrow-rs #5523] for more details. + +This change only applies to the built-in Parquet data source with filter-pushdown enabled ( +which is [not yet the default behavior]). + +You can disable the new behavior by setting the +`datafusion.execution.parquet.force_filter_selections` [configuration setting] to true. + +```sql +> set datafusion.execution.parquet.force_filter_selections = true; +``` + +[arrow-rs #5523]: https://github.com/apache/arrow-rs/issues/5523 +[configuration setting]: https://datafusion.apache.org/user-guide/configs.html +[not yet the default behavior]: https://github.com/apache/datafusion/issues/3463 + ### Statistics handling moved from `FileSource` to `FileScanConfig` Statistics are now managed directly by `FileScanConfig` instead of being delegated to `FileSource` implementations. This simplifies the `FileSource` trait and provides more consistent statistics handling across all file formats. 
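Returning to the adaptive filter section above: the same toggle is available programmatically. A minimal sketch assembled from the `predicate_cache_pushdown_default_selections_only` test added in this PR; the parquet path is a placeholder, not a file shipped with DataFusion:

```rust
use datafusion::error::Result;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    let mut config = SessionConfig::new();
    // The selection policy only matters when filter pushdown is enabled
    config.options_mut().execution.parquet.pushdown_filters = true;
    // Opt out of the adaptive selection/bitmap choice, equivalent to
    // `SET datafusion.execution.parquet.force_filter_selections = true`
    config.options_mut().execution.parquet.force_filter_selections = true;

    let ctx = SessionContext::new_with_config(config);
    // "example.parquet" is a placeholder path
    let df = ctx
        .read_parquet("example.parquet", ParquetReadOptions::default())
        .await?;
    df.filter(col("a").gt(lit(10)))?.show().await?;
    Ok(())
}
```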
diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index c3eda544a1de..55708de7c140 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -84,6 +84,7 @@ The following configuration settings are available: | datafusion.execution.parquet.metadata_size_hint | 524288 | (reading) If specified, the parquet reader will try and fetch the last `size_hint` bytes of the parquet file optimistically. If not specified, two reads are required: One read to fetch the 8-byte parquet footer and another to fetch the metadata length encoded in the footer. Default setting to 512 KiB, which should be sufficient for most parquet files, it can reduce one I/O operation per parquet file. If the metadata is larger than the hint, two reads will still be performed. | | datafusion.execution.parquet.pushdown_filters | false | (reading) If true, filter expressions are applied during the parquet decoding operation to reduce the number of rows decoded. This optimization is sometimes called "late materialization". | | datafusion.execution.parquet.reorder_filters | false | (reading) If true, filter expressions evaluated during the parquet decoding operation will be reordered heuristically to minimize the cost of evaluation. If false, the filters are applied in the same order as written in the query | +| datafusion.execution.parquet.force_filter_selections | false | (reading) Force the use of RowSelections for filter results, when pushdown_filters is enabled. If false, the reader will automatically choose between a RowSelection and a Bitmap based on the number and pattern of selected rows. | | datafusion.execution.parquet.schema_force_view_types | true | (reading) If true, parquet reader will read columns of `Utf8/Utf8Large` with `Utf8View`, and `Binary/BinaryLarge` with `BinaryView`. | | datafusion.execution.parquet.binary_as_string | false | (reading) If true, parquet reader will read columns of `Binary/LargeBinary` with `Utf8`, and `BinaryView` with `Utf8View`. Parquet files generated by some legacy writers do not correctly set the UTF8 flag for strings, causing string columns to be loaded as BLOB instead. | | datafusion.execution.parquet.coerce_int96 | NULL | (reading) If true, parquet reader will read columns of physical type int96 as originating from a different resolution than nanosecond. This is useful for reading data from systems like Spark which stores microsecond resolution timestamps in an int96 allowing it to write values with a larger date range than 64-bit timestamps with nanosecond resolution. |
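For readers using the parquet crate directly, `force_filter_selections` maps onto the `RowSelectionPolicy` knob that `opener.rs` sets on the Arrow reader builder. A hedged sketch, assuming arrow-rs 57.1.0 and that `with_row_selection_policy` (shown on the async builder in `opener.rs`) is also available on the synchronous builder; `data.parquet` is a placeholder path:

```rust
use std::fs::File;

use parquet::arrow::arrow_reader::{ParquetRecordBatchReaderBuilder, RowSelectionPolicy};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let file = File::open("data.parquet")?; // placeholder input file

    // `Selectors` forces filter results to be represented as RowSelections,
    // instead of letting the reader adaptively pick between selectors and a
    // bitmap (the new 57.1.0 behavior this PR exposes a switch for)
    let reader = ParquetRecordBatchReaderBuilder::try_new(file)?
        .with_row_selection_policy(RowSelectionPolicy::Selectors)
        .build()?;

    for batch in reader {
        println!("decoded {} rows", batch?.num_rows());
    }
    Ok(())
}
```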