From 692a7cb677e2fce8d01a96d1360e24610ef0421e Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Wed, 25 Feb 2026 18:50:00 +0800 Subject: [PATCH 1/4] Add SQL logic tests for filter pushdown scenarios - Implement tests for push down filters in outer joins, ensuring filters are applied correctly based on join conditions. - Introduce tests for push down filters with Parquet files, including scenarios with limits and dynamic filters. - Add regression tests to address specific issues related to filter pushdown, ensuring stability and correctness. - Include tests for unnest operations with filters, verifying that filters are pushed down appropriately based on the context. --- .../test_files/push_down_filter.slt | 745 ------------------ .../push_down_filter_outer_joins.slt | 264 +++++++ .../test_files/push_down_filter_parquet.slt | 188 +++++ .../push_down_filter_regression.slt | 200 +++++ .../test_files/push_down_filter_unnest.slt | 148 ++++ 5 files changed, 800 insertions(+), 745 deletions(-) delete mode 100644 datafusion/sqllogictest/test_files/push_down_filter.slt create mode 100644 datafusion/sqllogictest/test_files/push_down_filter_outer_joins.slt create mode 100644 datafusion/sqllogictest/test_files/push_down_filter_parquet.slt create mode 100644 datafusion/sqllogictest/test_files/push_down_filter_regression.slt create mode 100644 datafusion/sqllogictest/test_files/push_down_filter_unnest.slt diff --git a/datafusion/sqllogictest/test_files/push_down_filter.slt b/datafusion/sqllogictest/test_files/push_down_filter.slt deleted file mode 100644 index edafcfaa543f2..0000000000000 --- a/datafusion/sqllogictest/test_files/push_down_filter.slt +++ /dev/null @@ -1,745 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at - -# http://www.apache.org/licenses/LICENSE-2.0 - -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -# Test push down filter - -statement ok -set datafusion.explain.physical_plan_only = true; - -statement ok -CREATE TABLE IF NOT EXISTS v AS VALUES(1,[1,2,3]),(2,[3,4,5]); - -query I -select uc2 from (select unnest(column2) as uc2, column1 from v) where column1 = 2; ----- -3 -4 -5 - -# test push down filter for unnest with filter on non-unnest column -# filter plan is pushed down into projection plan -query TT -explain select uc2 from (select unnest(column2) as uc2, column1 from v) where column1 = 2; ----- -physical_plan -01)ProjectionExec: expr=[__unnest_placeholder(v.column2,depth=1)@0 as uc2] -02)--UnnestExec -03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 -04)------ProjectionExec: expr=[column2@0 as __unnest_placeholder(v.column2)] -05)--------FilterExec: column1@0 = 2, projection=[column2@1] -06)----------DataSourceExec: partitions=1, partition_sizes=[1] - -query I -select uc2 from (select unnest(column2) as uc2, column1 from v) where uc2 > 3; ----- -4 -5 - -# test push down filter for unnest with filter on unnest column -query TT -explain select uc2 from (select unnest(column2) as uc2, column1 from v) where uc2 > 3; ----- -physical_plan -01)ProjectionExec: expr=[__unnest_placeholder(v.column2,depth=1)@0 as uc2] -02)--FilterExec: __unnest_placeholder(v.column2,depth=1)@0 > 3 -03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 -04)------UnnestExec -05)--------ProjectionExec: expr=[column2@0 as __unnest_placeholder(v.column2)] -06)----------DataSourceExec: partitions=1, partition_sizes=[1] - -query II -select uc2, column1 from (select unnest(column2) as uc2, column1 from v) where uc2 > 3 AND column1 = 2; ----- -4 2 -5 2 - -# Could push the filter (column1 = 2) down below unnest -query TT -explain select uc2, column1 from (select unnest(column2) as uc2, column1 from v) where uc2 > 3 AND column1 = 2; ----- -physical_plan -01)ProjectionExec: expr=[__unnest_placeholder(v.column2,depth=1)@0 as uc2, column1@1 as column1] -02)--FilterExec: __unnest_placeholder(v.column2,depth=1)@0 > 3 -03)----UnnestExec -04)------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 -05)--------ProjectionExec: expr=[column2@1 as __unnest_placeholder(v.column2), column1@0 as column1] -06)----------FilterExec: column1@0 = 2 -07)------------DataSourceExec: partitions=1, partition_sizes=[1] - -query II -select uc2, column1 from (select unnest(column2) as uc2, column1 from v) where uc2 > 3 OR column1 = 2; ----- -3 2 -4 2 -5 2 - -# only non-unnest filter in AND clause could be pushed down -query TT -explain select uc2, column1 from (select unnest(column2) as uc2, column1 from v) where uc2 > 3 OR column1 = 2; ----- -physical_plan -01)ProjectionExec: expr=[__unnest_placeholder(v.column2,depth=1)@0 as uc2, column1@1 as column1] -02)--FilterExec: __unnest_placeholder(v.column2,depth=1)@0 > 3 OR column1@1 = 2 -03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 -04)------UnnestExec -05)--------ProjectionExec: expr=[column2@1 as __unnest_placeholder(v.column2), column1@0 as column1] -06)----------DataSourceExec: partitions=1, partition_sizes=[1] - -statement ok -drop table v; - -# test with unnest struct, should not push down filter -statement ok -CREATE TABLE d AS VALUES(1,[named_struct('a', 1, 'b', 2)]),(2,[named_struct('a', 3, 'b', 4), named_struct('a', 5, 'b', 6)]); - -query I? -select * from (select column1, unnest(column2) as o from d) where o['a'] = 1; ----- -1 {a: 1, b: 2} - -query TT -explain select * from (select column1, unnest(column2) as o from d) where o['a'] = 1; ----- -physical_plan -01)ProjectionExec: expr=[column1@0 as column1, __unnest_placeholder(d.column2,depth=1)@1 as o] -02)--FilterExec: __datafusion_extracted_1@0 = 1, projection=[column1@1, __unnest_placeholder(d.column2,depth=1)@2] -03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 -04)------ProjectionExec: expr=[get_field(__unnest_placeholder(d.column2,depth=1)@1, a) as __datafusion_extracted_1, column1@0 as column1, __unnest_placeholder(d.column2,depth=1)@1 as __unnest_placeholder(d.column2,depth=1)] -05)--------UnnestExec -06)----------ProjectionExec: expr=[column1@0 as column1, column2@1 as __unnest_placeholder(d.column2)] -07)------------DataSourceExec: partitions=1, partition_sizes=[1] - -statement ok -drop table d; - -statement ok -CREATE TABLE d AS VALUES (named_struct('a', 1, 'b', 2)), (named_struct('a', 3, 'b', 4)), (named_struct('a', 5, 'b', 6)); - -query II -select * from (select unnest(column1) from d) where "__unnest_placeholder(d.column1).b" > 5; ----- -5 6 - -query TT -explain select * from (select unnest(column1) from d) where "__unnest_placeholder(d.column1).b" > 5; ----- -physical_plan -01)FilterExec: __unnest_placeholder(d.column1).b@1 > 5 -02)--RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 -03)----UnnestExec -04)------ProjectionExec: expr=[column1@0 as __unnest_placeholder(d.column1)] -05)--------DataSourceExec: partitions=1, partition_sizes=[1] - -statement ok -drop table d; - -# Test push down filter with limit for parquet -statement ok -set datafusion.execution.parquet.pushdown_filters = true; - -# this one is also required to make DF skip second file due to "sufficient" amount of rows -statement ok -set datafusion.execution.collect_statistics = true; - -# Create a table as a data source -statement ok -CREATE TABLE src_table ( - part_key INT, - value INT -) AS VALUES(1, 0), (1, 1), (1, 100), (2, 0), (2, 2), (2, 2), (2, 100), (3, 4), (3, 5), (3, 6); - - -# There will be more than 2 records filtered from the table to check that `limit 1` actually applied. -# Setup 3 files, i.e., as many as there are partitions: - -# File 1: -query I -COPY (SELECT * FROM src_table where part_key = 1) -TO 'test_files/scratch/push_down_filter/test_filter_with_limit/part-0.parquet' -STORED AS PARQUET; ----- -3 - -# File 2: -query I -COPY (SELECT * FROM src_table where part_key = 2) -TO 'test_files/scratch/push_down_filter/test_filter_with_limit/part-1.parquet' -STORED AS PARQUET; ----- -4 - -# File 3: -query I -COPY (SELECT * FROM src_table where part_key = 3) -TO 'test_files/scratch/push_down_filter/test_filter_with_limit/part-2.parquet' -STORED AS PARQUET; ----- -3 - -statement ok -CREATE EXTERNAL TABLE test_filter_with_limit -( - part_key INT, - value INT -) -STORED AS PARQUET -LOCATION 'test_files/scratch/push_down_filter/test_filter_with_limit/'; - -query TT -explain select * from test_filter_with_limit where value = 2 limit 1; ----- -physical_plan -01)CoalescePartitionsExec: fetch=1 -02)--DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter/test_filter_with_limit/part-0.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter/test_filter_with_limit/part-1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter/test_filter_with_limit/part-2.parquet]]}, projection=[part_key, value], limit=1, file_type=parquet, predicate=value@1 = 2, pruning_predicate=value_null_count@2 != row_count@3 AND value_min@0 <= 2 AND 2 <= value_max@1, required_guarantees=[value in (2)] - -query II -select * from test_filter_with_limit where value = 2 limit 1; ----- -2 2 - - -# Tear down test_filter_with_limit table: -statement ok -DROP TABLE test_filter_with_limit; - -# Tear down src_table table: -statement ok -DROP TABLE src_table; - - -query I -COPY (VALUES (1), (2), (3), (4), (5), (6), (7), (8), (9), (10)) -TO 'test_files/scratch/push_down_filter/t.parquet' -STORED AS PARQUET; ----- -10 - -statement ok -CREATE EXTERNAL TABLE t -( - a INT -) -STORED AS PARQUET -LOCATION 'test_files/scratch/push_down_filter/t.parquet'; - - -# The predicate should not have a column cast when the value is a valid i32 -query TT -explain select a from t where a = '100'; ----- -physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter/t.parquet]]}, projection=[a], file_type=parquet, predicate=a@0 = 100, pruning_predicate=a_null_count@2 != row_count@3 AND a_min@0 <= 100 AND 100 <= a_max@1, required_guarantees=[a in (100)] - -# The predicate should not have a column cast when the value is a valid i32 -query TT -explain select a from t where a != '100'; ----- -physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter/t.parquet]]}, projection=[a], file_type=parquet, predicate=a@0 != 100, pruning_predicate=a_null_count@2 != row_count@3 AND (a_min@0 != 100 OR 100 != a_max@1), required_guarantees=[a not in (100)] - -# The predicate should still have the column cast when the value is a NOT valid i32 -query TT -explain select a from t where a = '99999999999'; ----- -physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter/t.parquet]]}, projection=[a], file_type=parquet, predicate=CAST(a@0 AS Utf8) = 99999999999 - -# The predicate should still have the column cast when the value is a NOT valid i32 -query TT -explain select a from t where a = '99.99'; ----- -physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter/t.parquet]]}, projection=[a], file_type=parquet, predicate=CAST(a@0 AS Utf8) = 99.99 - -# The predicate should still have the column cast when the value is a NOT valid i32 -query TT -explain select a from t where a = ''; ----- -physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter/t.parquet]]}, projection=[a], file_type=parquet, predicate=CAST(a@0 AS Utf8) = - -# The predicate should not have a column cast when the operator is = or != and the literal can be round-trip casted without losing information. -query TT -explain select a from t where cast(a as string) = '100'; ----- -physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter/t.parquet]]}, projection=[a], file_type=parquet, predicate=a@0 = 100, pruning_predicate=a_null_count@2 != row_count@3 AND a_min@0 <= 100 AND 100 <= a_max@1, required_guarantees=[a in (100)] - -# The predicate should still have the column cast when the literal alters its string representation after round-trip casting (leading zero lost). -query TT -explain select a from t where CAST(a AS string) = '0123'; ----- -physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter/t.parquet]]}, projection=[a], file_type=parquet, predicate=CAST(a@0 AS Utf8View) = 0123 - - -# Test dynamic filter pushdown with swapped join inputs (issue #17196) -# Create tables with different sizes to force join input swapping -statement ok -copy (select i as k from generate_series(1, 100) t(i)) to 'test_files/scratch/push_down_filter/small_table.parquet'; - -statement ok -copy (select i as k, i as v from generate_series(1, 1000) t(i)) to 'test_files/scratch/push_down_filter/large_table.parquet'; - -statement ok -create external table small_table stored as parquet location 'test_files/scratch/push_down_filter/small_table.parquet'; - -statement ok -create external table large_table stored as parquet location 'test_files/scratch/push_down_filter/large_table.parquet'; - -# Test that dynamic filter is applied to the correct table after join input swapping -# The small_table should be the build side, large_table should be the probe side with dynamic filter -query TT -explain select * from small_table join large_table on small_table.k = large_table.k where large_table.v >= 50; ----- -physical_plan -01)HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(k@0, k@0)] -02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter/small_table.parquet]]}, projection=[k], file_type=parquet -03)--RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 -04)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter/large_table.parquet]]}, projection=[k, v], file_type=parquet, predicate=v@1 >= 50 AND DynamicFilter [ empty ], pruning_predicate=v_null_count@1 != row_count@2 AND v_max@0 >= 50, required_guarantees=[] - -statement ok -drop table small_table; - -statement ok -drop table large_table; - -statement ok -drop table t; - -# Regression test for https://github.com/apache/datafusion/issues/17188 -query I -COPY (select i as k from generate_series(1, 10000000) as t(i)) -TO 'test_files/scratch/push_down_filter/t1.parquet' -STORED AS PARQUET; ----- -10000000 - -query I -COPY (select i as k, i as v from generate_series(1, 10000000) as t(i)) -TO 'test_files/scratch/push_down_filter/t2.parquet' -STORED AS PARQUET; ----- -10000000 - -statement ok -create external table t1 stored as parquet location 'test_files/scratch/push_down_filter/t1.parquet'; - -statement ok -create external table t2 stored as parquet location 'test_files/scratch/push_down_filter/t2.parquet'; - -# The failure before https://github.com/apache/datafusion/pull/17197 was non-deterministic and random -# So we'll run the same query a couple of times just to have more certainty it's fixed -# Sorry about the spam in this slt test... - -query III rowsort -select * -from t1 -join t2 on t1.k = t2.k -where v = 1 or v = 10000000 -order by t1.k, t2.v; ----- -1 1 1 -10000000 10000000 10000000 - -query III rowsort -select * -from t1 -join t2 on t1.k = t2.k -where v = 1 or v = 10000000 -order by t1.k, t2.v; ----- -1 1 1 -10000000 10000000 10000000 - -query III rowsort -select * -from t1 -join t2 on t1.k = t2.k -where v = 1 or v = 10000000 -order by t1.k, t2.v; ----- -1 1 1 -10000000 10000000 10000000 - -query III rowsort -select * -from t1 -join t2 on t1.k = t2.k -where v = 1 or v = 10000000 -order by t1.k, t2.v; ----- -1 1 1 -10000000 10000000 10000000 - -query III rowsort -select * -from t1 -join t2 on t1.k = t2.k -where v = 1 or v = 10000000 -order by t1.k, t2.v; ----- -1 1 1 -10000000 10000000 10000000 - -# Regression test for https://github.com/apache/datafusion/issues/17512 - -query I -COPY ( - SELECT arrow_cast('2025-01-01T00:00:00Z'::timestamptz, 'Timestamp(Microsecond, Some("UTC"))') AS start_timestamp -) -TO 'test_files/scratch/push_down_filter/17512.parquet' -STORED AS PARQUET; ----- -1 - -statement ok -CREATE EXTERNAL TABLE records STORED AS PARQUET LOCATION 'test_files/scratch/push_down_filter/17512.parquet'; - -query I -SELECT 1 -FROM ( - SELECT start_timestamp - FROM records - WHERE start_timestamp <= '2025-01-01T00:00:00Z'::timestamptz -) AS t -WHERE t.start_timestamp::time < '00:00:01'::time; ----- -1 - -# Test aggregate dynamic filter pushdown -# Note: most of the test coverage lives in `datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs` -# , to compare dynamic filter content easier. Here the tests are simple end-to-end -# exercises. - -statement ok -set datafusion.explain.format = 'indent'; - -statement ok -set datafusion.explain.physical_plan_only = true; - -statement ok -set datafusion.execution.target_partitions = 2; - -statement ok -set datafusion.execution.parquet.pushdown_filters = true; - -statement ok -set datafusion.optimizer.enable_dynamic_filter_pushdown = true; - -statement ok -set datafusion.optimizer.enable_aggregate_dynamic_filter_pushdown = true; - -statement ok -create external table agg_dyn_test stored as parquet location '../core/tests/data/test_statistics_per_partition'; - -# Expect dynamic filter available inside data source -query TT -explain select max(id) from agg_dyn_test where id > 1; ----- -physical_plan -01)AggregateExec: mode=Final, gby=[], aggr=[max(agg_dyn_test.id)] -02)--CoalescePartitionsExec -03)----AggregateExec: mode=Partial, gby=[], aggr=[max(agg_dyn_test.id)] -04)------DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-01/j5fUeSDQo22oPyPU.parquet, WORKSPACE_ROOT/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-02/j5fUeSDQo22oPyPU.parquet], [WORKSPACE_ROOT/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-03/j5fUeSDQo22oPyPU.parquet, WORKSPACE_ROOT/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-04/j5fUeSDQo22oPyPU.parquet]]}, projection=[id], file_type=parquet, predicate=id@0 > 1 AND DynamicFilter [ empty ], pruning_predicate=id_null_count@1 != row_count@2 AND id_max@0 > 1, required_guarantees=[] - -query I -select max(id) from agg_dyn_test where id > 1; ----- -4 - -# Expect dynamic filter available inside data source -query TT -explain select max(id) from agg_dyn_test where (id+1) > 1; ----- -physical_plan -01)AggregateExec: mode=Final, gby=[], aggr=[max(agg_dyn_test.id)] -02)--CoalescePartitionsExec -03)----AggregateExec: mode=Partial, gby=[], aggr=[max(agg_dyn_test.id)] -04)------DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-01/j5fUeSDQo22oPyPU.parquet, WORKSPACE_ROOT/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-02/j5fUeSDQo22oPyPU.parquet], [WORKSPACE_ROOT/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-03/j5fUeSDQo22oPyPU.parquet, WORKSPACE_ROOT/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-04/j5fUeSDQo22oPyPU.parquet]]}, projection=[id], file_type=parquet, predicate=CAST(id@0 AS Int64) + 1 > 1 AND DynamicFilter [ empty ] - -# Expect dynamic filter available inside data source -query TT -explain select max(id), min(id) from agg_dyn_test where id < 10; ----- -physical_plan -01)AggregateExec: mode=Final, gby=[], aggr=[max(agg_dyn_test.id), min(agg_dyn_test.id)] -02)--CoalescePartitionsExec -03)----AggregateExec: mode=Partial, gby=[], aggr=[max(agg_dyn_test.id), min(agg_dyn_test.id)] -04)------DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-01/j5fUeSDQo22oPyPU.parquet, WORKSPACE_ROOT/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-02/j5fUeSDQo22oPyPU.parquet], [WORKSPACE_ROOT/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-03/j5fUeSDQo22oPyPU.parquet, WORKSPACE_ROOT/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-04/j5fUeSDQo22oPyPU.parquet]]}, projection=[id], file_type=parquet, predicate=id@0 < 10 AND DynamicFilter [ empty ], pruning_predicate=id_null_count@1 != row_count@2 AND id_min@0 < 10, required_guarantees=[] - -# Dynamic filter should not be available for grouping sets -query TT -explain select max(id) from agg_dyn_test where id < 10 -group by grouping sets ((), (id)) ----- -physical_plan -01)ProjectionExec: expr=[max(agg_dyn_test.id)@2 as max(agg_dyn_test.id)] -02)--AggregateExec: mode=FinalPartitioned, gby=[id@0 as id, __grouping_id@1 as __grouping_id], aggr=[max(agg_dyn_test.id)] -03)----RepartitionExec: partitioning=Hash([id@0, __grouping_id@1], 2), input_partitions=2 -04)------AggregateExec: mode=Partial, gby=[(NULL as id), (id@0 as id)], aggr=[max(agg_dyn_test.id)] -05)--------DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-01/j5fUeSDQo22oPyPU.parquet, WORKSPACE_ROOT/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-02/j5fUeSDQo22oPyPU.parquet], [WORKSPACE_ROOT/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-03/j5fUeSDQo22oPyPU.parquet, WORKSPACE_ROOT/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-04/j5fUeSDQo22oPyPU.parquet]]}, projection=[id], file_type=parquet, predicate=id@0 < 10, pruning_predicate=id_null_count@1 != row_count@2 AND id_min@0 < 10, required_guarantees=[] - -statement ok -drop table agg_dyn_test; - -statement ok -drop table t1; - -statement ok -drop table t2; - - - -# check LEFT/RIGHT joins with filter pushdown to both relations (when possible) - -statement ok -create table t1(k int, v int); - -statement ok -create table t2(k int, v int); - -statement ok -insert into t1 values - (1, 10), - (2, 20), - (3, 30), - (null, 40), - (50, null), - (null, null); - -statement ok -insert into t2 values - (1, 11), - (2, 21), - (2, 22), - (null, 41), - (51, null), - (null, null); - -statement ok -set datafusion.explain.physical_plan_only = false; - -statement ok -set datafusion.explain.logical_plan_only = true; - - -# left join + filter on join key -> pushed -query TT -explain select * from t1 left join t2 on t1.k = t2.k where t1.k > 1; ----- -logical_plan -01)Left Join: t1.k = t2.k -02)--Filter: t1.k > Int32(1) -03)----TableScan: t1 projection=[k, v] -04)--Filter: t2.k > Int32(1) -05)----TableScan: t2 projection=[k, v] - -query IIII rowsort -select * from t1 left join t2 on t1.k = t2.k where t1.k > 1; ----- -2 20 2 21 -2 20 2 22 -3 30 NULL NULL -50 NULL NULL NULL - -# left join + filter on another column -> not pushed -query TT -explain select * from t1 left join t2 on t1.k = t2.k where t1.v > 1; ----- -logical_plan -01)Left Join: t1.k = t2.k -02)--Filter: t1.v > Int32(1) -03)----TableScan: t1 projection=[k, v] -04)--TableScan: t2 projection=[k, v] - -query IIII rowsort -select * from t1 left join t2 on t1.k = t2.k where t1.v > 1; ----- -1 10 1 11 -2 20 2 21 -2 20 2 22 -3 30 NULL NULL -NULL 40 NULL NULL - -# left join + or + filter on another column -> not pushed -query TT -explain select * from t1 left join t2 on t1.k = t2.k where t1.k > 3 or t1.v > 20; ----- -logical_plan -01)Left Join: t1.k = t2.k -02)--Filter: t1.k > Int32(3) OR t1.v > Int32(20) -03)----TableScan: t1 projection=[k, v] -04)--TableScan: t2 projection=[k, v] - -query IIII rowsort -select * from t1 left join t2 on t1.k = t2.k where t1.k > 3 or t1.v > 20; ----- -3 30 NULL NULL -50 NULL NULL NULL -NULL 40 NULL NULL - - -# right join + filter on join key -> pushed -query TT -explain select * from t1 right join t2 on t1.k = t2.k where t1.k > 1; ----- -logical_plan -01)Inner Join: t1.k = t2.k -02)--Filter: t1.k > Int32(1) -03)----TableScan: t1 projection=[k, v] -04)--Filter: t2.k > Int32(1) -05)----TableScan: t2 projection=[k, v] - -query IIII rowsort -select * from t1 right join t2 on t1.k = t2.k where t1.k > 1; ----- -2 20 2 21 -2 20 2 22 - -# right join + filter on another column -> not pushed -query TT -explain select * from t1 right join t2 on t1.k = t2.k where t1.v > 1; ----- -logical_plan -01)Inner Join: t1.k = t2.k -02)--Filter: t1.v > Int32(1) -03)----TableScan: t1 projection=[k, v] -04)--TableScan: t2 projection=[k, v] - -query IIII rowsort -select * from t1 right join t2 on t1.k = t2.k where t1.v > 1; ----- -1 10 1 11 -2 20 2 21 -2 20 2 22 - -# right join + or + filter on another column -> not pushed -query TT -explain select * from t1 right join t2 on t1.k = t2.k where t1.k > 3 or t1.v > 20; ----- -logical_plan -01)Inner Join: t1.k = t2.k -02)--Filter: t1.k > Int32(3) OR t1.v > Int32(20) -03)----TableScan: t1 projection=[k, v] -04)--TableScan: t2 projection=[k, v] - -query IIII rowsort -select * from t1 right join t2 on t1.k = t2.k where t1.k > 3 or t1.v > 20; ----- - - -# left anti join + filter on join key -> pushed -query TT -explain select * from t1 left anti join t2 on t1.k = t2.k where t1.k > 1; ----- -logical_plan -01)LeftAnti Join: t1.k = t2.k -02)--Filter: t1.k > Int32(1) -03)----TableScan: t1 projection=[k, v] -04)--Filter: t2.k > Int32(1) -05)----TableScan: t2 projection=[k] - -query II rowsort -select * from t1 left anti join t2 on t1.k = t2.k where t1.k > 1; ----- -3 30 -50 NULL - -# left anti join + filter on another column -> not pushed -query TT -explain select * from t1 left anti join t2 on t1.k = t2.k where t1.v > 1; ----- -logical_plan -01)LeftAnti Join: t1.k = t2.k -02)--Filter: t1.v > Int32(1) -03)----TableScan: t1 projection=[k, v] -04)--TableScan: t2 projection=[k] - -query II rowsort -select * from t1 left anti join t2 on t1.k = t2.k where t1.v > 1; ----- -3 30 -NULL 40 - -# left anti join + or + filter on another column -> not pushed -query TT -explain select * from t1 left anti join t2 on t1.k = t2.k where t1.k > 3 or t1.v > 20; ----- -logical_plan -01)LeftAnti Join: t1.k = t2.k -02)--Filter: t1.k > Int32(3) OR t1.v > Int32(20) -03)----TableScan: t1 projection=[k, v] -04)--TableScan: t2 projection=[k] - -query II rowsort -select * from t1 left anti join t2 on t1.k = t2.k where t1.k > 3 or t1.v > 20; ----- -3 30 -50 NULL -NULL 40 - - -# right anti join + filter on join key -> pushed -query TT -explain select * from t1 right anti join t2 on t1.k = t2.k where t2.k > 1; ----- -logical_plan -01)RightAnti Join: t1.k = t2.k -02)--Filter: t1.k > Int32(1) -03)----TableScan: t1 projection=[k] -04)--Filter: t2.k > Int32(1) -05)----TableScan: t2 projection=[k, v] - -query II rowsort -select * from t1 right anti join t2 on t1.k = t2.k where t2.k > 1; ----- -51 NULL - -# right anti join + filter on another column -> not pushed -query TT -explain select * from t1 right anti join t2 on t1.k = t2.k where t2.v > 1; ----- -logical_plan -01)RightAnti Join: t1.k = t2.k -02)--TableScan: t1 projection=[k] -03)--Filter: t2.v > Int32(1) -04)----TableScan: t2 projection=[k, v] - -query II rowsort -select * from t1 right anti join t2 on t1.k = t2.k where t2.v > 1; ----- -NULL 41 - -# right anti join + or + filter on another column -> not pushed -query TT -explain select * from t1 right anti join t2 on t1.k = t2.k where t2.k > 3 or t2.v > 20; ----- -logical_plan -01)RightAnti Join: t1.k = t2.k -02)--TableScan: t1 projection=[k] -03)--Filter: t2.k > Int32(3) OR t2.v > Int32(20) -04)----TableScan: t2 projection=[k, v] - -query II rowsort -select * from t1 right anti join t2 on t1.k = t2.k where t2.k > 3 or t2.v > 20; ----- -51 NULL -NULL 41 - - -statement ok -set datafusion.explain.logical_plan_only = false; - -statement ok -drop table t1; - -statement ok -drop table t2; diff --git a/datafusion/sqllogictest/test_files/push_down_filter_outer_joins.slt b/datafusion/sqllogictest/test_files/push_down_filter_outer_joins.slt new file mode 100644 index 0000000000000..2e5f7c317fd43 --- /dev/null +++ b/datafusion/sqllogictest/test_files/push_down_filter_outer_joins.slt @@ -0,0 +1,264 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# Test push down filter + +# check LEFT/RIGHT joins with filter pushdown to both relations (when possible) + +statement ok +create table t1(k int, v int); + +statement ok +create table t2(k int, v int); + +statement ok +insert into t1 values + (1, 10), + (2, 20), + (3, 30), + (null, 40), + (50, null), + (null, null); + +statement ok +insert into t2 values + (1, 11), + (2, 21), + (2, 22), + (null, 41), + (51, null), + (null, null); + +statement ok +set datafusion.explain.physical_plan_only = false; + +statement ok +set datafusion.explain.logical_plan_only = true; + + +# left join + filter on join key -> pushed +query TT +explain select * from t1 left join t2 on t1.k = t2.k where t1.k > 1; +---- +logical_plan +01)Left Join: t1.k = t2.k +02)--Filter: t1.k > Int32(1) +03)----TableScan: t1 projection=[k, v] +04)--Filter: t2.k > Int32(1) +05)----TableScan: t2 projection=[k, v] + +query IIII rowsort +select * from t1 left join t2 on t1.k = t2.k where t1.k > 1; +---- +2 20 2 21 +2 20 2 22 +3 30 NULL NULL +50 NULL NULL NULL + +# left join + filter on another column -> not pushed +query TT +explain select * from t1 left join t2 on t1.k = t2.k where t1.v > 1; +---- +logical_plan +01)Left Join: t1.k = t2.k +02)--Filter: t1.v > Int32(1) +03)----TableScan: t1 projection=[k, v] +04)--TableScan: t2 projection=[k, v] + +query IIII rowsort +select * from t1 left join t2 on t1.k = t2.k where t1.v > 1; +---- +1 10 1 11 +2 20 2 21 +2 20 2 22 +3 30 NULL NULL +NULL 40 NULL NULL + +# left join + or + filter on another column -> not pushed +query TT +explain select * from t1 left join t2 on t1.k = t2.k where t1.k > 3 or t1.v > 20; +---- +logical_plan +01)Left Join: t1.k = t2.k +02)--Filter: t1.k > Int32(3) OR t1.v > Int32(20) +03)----TableScan: t1 projection=[k, v] +04)--TableScan: t2 projection=[k, v] + +query IIII rowsort +select * from t1 left join t2 on t1.k = t2.k where t1.k > 3 or t1.v > 20; +---- +3 30 NULL NULL +50 NULL NULL NULL +NULL 40 NULL NULL + + +# right join + filter on join key -> pushed +query TT +explain select * from t1 right join t2 on t1.k = t2.k where t1.k > 1; +---- +logical_plan +01)Inner Join: t1.k = t2.k +02)--Filter: t1.k > Int32(1) +03)----TableScan: t1 projection=[k, v] +04)--Filter: t2.k > Int32(1) +05)----TableScan: t2 projection=[k, v] + +query IIII rowsort +select * from t1 right join t2 on t1.k = t2.k where t1.k > 1; +---- +2 20 2 21 +2 20 2 22 + +# right join + filter on another column -> not pushed +query TT +explain select * from t1 right join t2 on t1.k = t2.k where t1.v > 1; +---- +logical_plan +01)Inner Join: t1.k = t2.k +02)--Filter: t1.v > Int32(1) +03)----TableScan: t1 projection=[k, v] +04)--TableScan: t2 projection=[k, v] + +query IIII rowsort +select * from t1 right join t2 on t1.k = t2.k where t1.v > 1; +---- +1 10 1 11 +2 20 2 21 +2 20 2 22 + +# right join + or + filter on another column -> not pushed +query TT +explain select * from t1 right join t2 on t1.k = t2.k where t1.k > 3 or t1.v > 20; +---- +logical_plan +01)Inner Join: t1.k = t2.k +02)--Filter: t1.k > Int32(3) OR t1.v > Int32(20) +03)----TableScan: t1 projection=[k, v] +04)--TableScan: t2 projection=[k, v] + +query IIII rowsort +select * from t1 right join t2 on t1.k = t2.k where t1.k > 3 or t1.v > 20; +---- + + +# left anti join + filter on join key -> pushed +query TT +explain select * from t1 left anti join t2 on t1.k = t2.k where t1.k > 1; +---- +logical_plan +01)LeftAnti Join: t1.k = t2.k +02)--Filter: t1.k > Int32(1) +03)----TableScan: t1 projection=[k, v] +04)--Filter: t2.k > Int32(1) +05)----TableScan: t2 projection=[k] + +query II rowsort +select * from t1 left anti join t2 on t1.k = t2.k where t1.k > 1; +---- +3 30 +50 NULL + +# left anti join + filter on another column -> not pushed +query TT +explain select * from t1 left anti join t2 on t1.k = t2.k where t1.v > 1; +---- +logical_plan +01)LeftAnti Join: t1.k = t2.k +02)--Filter: t1.v > Int32(1) +03)----TableScan: t1 projection=[k, v] +04)--TableScan: t2 projection=[k] + +query II rowsort +select * from t1 left anti join t2 on t1.k = t2.k where t1.v > 1; +---- +3 30 +NULL 40 + +# left anti join + or + filter on another column -> not pushed +query TT +explain select * from t1 left anti join t2 on t1.k = t2.k where t1.k > 3 or t1.v > 20; +---- +logical_plan +01)LeftAnti Join: t1.k = t2.k +02)--Filter: t1.k > Int32(3) OR t1.v > Int32(20) +03)----TableScan: t1 projection=[k, v] +04)--TableScan: t2 projection=[k] + +query II rowsort +select * from t1 left anti join t2 on t1.k = t2.k where t1.k > 3 or t1.v > 20; +---- +3 30 +50 NULL +NULL 40 + + +# right anti join + filter on join key -> pushed +query TT +explain select * from t1 right anti join t2 on t1.k = t2.k where t2.k > 1; +---- +logical_plan +01)RightAnti Join: t1.k = t2.k +02)--Filter: t1.k > Int32(1) +03)----TableScan: t1 projection=[k] +04)--Filter: t2.k > Int32(1) +05)----TableScan: t2 projection=[k, v] + +query II rowsort +select * from t1 right anti join t2 on t1.k = t2.k where t2.k > 1; +---- +51 NULL + +# right anti join + filter on another column -> not pushed +query TT +explain select * from t1 right anti join t2 on t1.k = t2.k where t2.v > 1; +---- +logical_plan +01)RightAnti Join: t1.k = t2.k +02)--TableScan: t1 projection=[k] +03)--Filter: t2.v > Int32(1) +04)----TableScan: t2 projection=[k, v] + +query II rowsort +select * from t1 right anti join t2 on t1.k = t2.k where t2.v > 1; +---- +NULL 41 + +# right anti join + or + filter on another column -> not pushed +query TT +explain select * from t1 right anti join t2 on t1.k = t2.k where t2.k > 3 or t2.v > 20; +---- +logical_plan +01)RightAnti Join: t1.k = t2.k +02)--TableScan: t1 projection=[k] +03)--Filter: t2.k > Int32(3) OR t2.v > Int32(20) +04)----TableScan: t2 projection=[k, v] + +query II rowsort +select * from t1 right anti join t2 on t1.k = t2.k where t2.k > 3 or t2.v > 20; +---- +51 NULL +NULL 41 + + +statement ok +set datafusion.explain.logical_plan_only = false; + +statement ok +drop table t1; + +statement ok +drop table t2; diff --git a/datafusion/sqllogictest/test_files/push_down_filter_parquet.slt b/datafusion/sqllogictest/test_files/push_down_filter_parquet.slt new file mode 100644 index 0000000000000..e1c83c8c330d8 --- /dev/null +++ b/datafusion/sqllogictest/test_files/push_down_filter_parquet.slt @@ -0,0 +1,188 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# Test push down filter + +statement ok +set datafusion.explain.physical_plan_only = true; + +# Test push down filter with limit for parquet +statement ok +set datafusion.execution.parquet.pushdown_filters = true; + +# this one is also required to make DF skip second file due to "sufficient" amount of rows +statement ok +set datafusion.execution.collect_statistics = true; + +# Create a table as a data source +statement ok +CREATE TABLE src_table ( + part_key INT, + value INT +) AS VALUES(1, 0), (1, 1), (1, 100), (2, 0), (2, 2), (2, 2), (2, 100), (3, 4), (3, 5), (3, 6); + + +# There will be more than 2 records filtered from the table to check that `limit 1` actually applied. +# Setup 3 files, i.e., as many as there are partitions: + +# File 1: +query I +COPY (SELECT * FROM src_table where part_key = 1) +TO 'test_files/scratch/push_down_filter_parquet/test_filter_with_limit/part-0.parquet' +STORED AS PARQUET; +---- +3 + +# File 2: +query I +COPY (SELECT * FROM src_table where part_key = 2) +TO 'test_files/scratch/push_down_filter_parquet/test_filter_with_limit/part-1.parquet' +STORED AS PARQUET; +---- +4 + +# File 3: +query I +COPY (SELECT * FROM src_table where part_key = 3) +TO 'test_files/scratch/push_down_filter_parquet/test_filter_with_limit/part-2.parquet' +STORED AS PARQUET; +---- +3 + +statement ok +CREATE EXTERNAL TABLE test_filter_with_limit +( + part_key INT, + value INT +) +STORED AS PARQUET +LOCATION 'test_files/scratch/push_down_filter_parquet/test_filter_with_limit/'; + +query TT +explain select * from test_filter_with_limit where value = 2 limit 1; +---- +physical_plan +01)CoalescePartitionsExec: fetch=1 +02)--DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/test_filter_with_limit/part-0.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/test_filter_with_limit/part-1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/test_filter_with_limit/part-2.parquet]]}, projection=[part_key, value], limit=1, file_type=parquet, predicate=value@1 = 2, pruning_predicate=value_null_count@2 != row_count@3 AND value_min@0 <= 2 AND 2 <= value_max@1, required_guarantees=[value in (2)] + +query II +select * from test_filter_with_limit where value = 2 limit 1; +---- +2 2 + + +# Tear down test_filter_with_limit table: +statement ok +DROP TABLE test_filter_with_limit; + +# Tear down src_table table: +statement ok +DROP TABLE src_table; + + +query I +COPY (VALUES (1), (2), (3), (4), (5), (6), (7), (8), (9), (10)) +TO 'test_files/scratch/push_down_filter_parquet/t.parquet' +STORED AS PARQUET; +---- +10 + +statement ok +CREATE EXTERNAL TABLE t +( + a INT +) +STORED AS PARQUET +LOCATION 'test_files/scratch/push_down_filter_parquet/t.parquet'; + + +# The predicate should not have a column cast when the value is a valid i32 +query TT +explain select a from t where a = '100'; +---- +physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/t.parquet]]}, projection=[a], file_type=parquet, predicate=a@0 = 100, pruning_predicate=a_null_count@2 != row_count@3 AND a_min@0 <= 100 AND 100 <= a_max@1, required_guarantees=[a in (100)] + +# The predicate should not have a column cast when the value is a valid i32 +query TT +explain select a from t where a != '100'; +---- +physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/t.parquet]]}, projection=[a], file_type=parquet, predicate=a@0 != 100, pruning_predicate=a_null_count@2 != row_count@3 AND (a_min@0 != 100 OR 100 != a_max@1), required_guarantees=[a not in (100)] + +# The predicate should still have the column cast when the value is a NOT valid i32 +query TT +explain select a from t where a = '99999999999'; +---- +physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/t.parquet]]}, projection=[a], file_type=parquet, predicate=CAST(a@0 AS Utf8) = 99999999999 + +# The predicate should still have the column cast when the value is a NOT valid i32 +query TT +explain select a from t where a = '99.99'; +---- +physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/t.parquet]]}, projection=[a], file_type=parquet, predicate=CAST(a@0 AS Utf8) = 99.99 + +# The predicate should still have the column cast when the value is a NOT valid i32 +query TT +explain select a from t where a = ''; +---- +physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/t.parquet]]}, projection=[a], file_type=parquet, predicate=CAST(a@0 AS Utf8) = + +# The predicate should not have a column cast when the operator is = or != and the literal can be round-trip casted without losing information. +query TT +explain select a from t where cast(a as string) = '100'; +---- +physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/t.parquet]]}, projection=[a], file_type=parquet, predicate=a@0 = 100, pruning_predicate=a_null_count@2 != row_count@3 AND a_min@0 <= 100 AND 100 <= a_max@1, required_guarantees=[a in (100)] + +# The predicate should still have the column cast when the literal alters its string representation after round-trip casting (leading zero lost). +query TT +explain select a from t where CAST(a AS string) = '0123'; +---- +physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/t.parquet]]}, projection=[a], file_type=parquet, predicate=CAST(a@0 AS Utf8View) = 0123 + + +# Test dynamic filter pushdown with swapped join inputs (issue #17196) +# Create tables with different sizes to force join input swapping +statement ok +copy (select i as k from generate_series(1, 100) t(i)) to 'test_files/scratch/push_down_filter_parquet/small_table.parquet'; + +statement ok +copy (select i as k, i as v from generate_series(1, 1000) t(i)) to 'test_files/scratch/push_down_filter_parquet/large_table.parquet'; + +statement ok +create external table small_table stored as parquet location 'test_files/scratch/push_down_filter_parquet/small_table.parquet'; + +statement ok +create external table large_table stored as parquet location 'test_files/scratch/push_down_filter_parquet/large_table.parquet'; + +# Test that dynamic filter is applied to the correct table after join input swapping +# The small_table should be the build side, large_table should be the probe side with dynamic filter +query TT +explain select * from small_table join large_table on small_table.k = large_table.k where large_table.v >= 50; +---- +physical_plan +01)HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(k@0, k@0)] +02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/small_table.parquet]]}, projection=[k], file_type=parquet +03)--RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +04)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/large_table.parquet]]}, projection=[k, v], file_type=parquet, predicate=v@1 >= 50 AND DynamicFilter [ empty ], pruning_predicate=v_null_count@1 != row_count@2 AND v_max@0 >= 50, required_guarantees=[] + +statement ok +drop table small_table; + +statement ok +drop table large_table; + +statement ok +drop table t; diff --git a/datafusion/sqllogictest/test_files/push_down_filter_regression.slt b/datafusion/sqllogictest/test_files/push_down_filter_regression.slt new file mode 100644 index 0000000000000..ca4a30fa96c35 --- /dev/null +++ b/datafusion/sqllogictest/test_files/push_down_filter_regression.slt @@ -0,0 +1,200 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# Test push down filter + +# Regression test for https://github.com/apache/datafusion/issues/17188 +query I +COPY (select i as k from generate_series(1, 10000000) as t(i)) +TO 'test_files/scratch/push_down_filter_regression/t1.parquet' +STORED AS PARQUET; +---- +10000000 + +query I +COPY (select i as k, i as v from generate_series(1, 10000000) as t(i)) +TO 'test_files/scratch/push_down_filter_regression/t2.parquet' +STORED AS PARQUET; +---- +10000000 + +statement ok +create external table t1 stored as parquet location 'test_files/scratch/push_down_filter_regression/t1.parquet'; + +statement ok +create external table t2 stored as parquet location 'test_files/scratch/push_down_filter_regression/t2.parquet'; + +# The failure before https://github.com/apache/datafusion/pull/17197 was non-deterministic and random +# So we'll run the same query a couple of times just to have more certainty it's fixed +# Sorry about the spam in this slt test... + +query III rowsort +select * +from t1 +join t2 on t1.k = t2.k +where v = 1 or v = 10000000 +order by t1.k, t2.v; +---- +1 1 1 +10000000 10000000 10000000 + +query III rowsort +select * +from t1 +join t2 on t1.k = t2.k +where v = 1 or v = 10000000 +order by t1.k, t2.v; +---- +1 1 1 +10000000 10000000 10000000 + +query III rowsort +select * +from t1 +join t2 on t1.k = t2.k +where v = 1 or v = 10000000 +order by t1.k, t2.v; +---- +1 1 1 +10000000 10000000 10000000 + +query III rowsort +select * +from t1 +join t2 on t1.k = t2.k +where v = 1 or v = 10000000 +order by t1.k, t2.v; +---- +1 1 1 +10000000 10000000 10000000 + +query III rowsort +select * +from t1 +join t2 on t1.k = t2.k +where v = 1 or v = 10000000 +order by t1.k, t2.v; +---- +1 1 1 +10000000 10000000 10000000 + +# Regression test for https://github.com/apache/datafusion/issues/17512 + +query I +COPY ( + SELECT arrow_cast('2025-01-01T00:00:00Z'::timestamptz, 'Timestamp(Microsecond, Some("UTC"))') AS start_timestamp +) +TO 'test_files/scratch/push_down_filter_regression/17512.parquet' +STORED AS PARQUET; +---- +1 + +statement ok +CREATE EXTERNAL TABLE records STORED AS PARQUET LOCATION 'test_files/scratch/push_down_filter_regression/17512.parquet'; + +query I +SELECT 1 +FROM ( + SELECT start_timestamp + FROM records + WHERE start_timestamp <= '2025-01-01T00:00:00Z'::timestamptz +) AS t +WHERE t.start_timestamp::time < '00:00:01'::time; +---- +1 + +# Test aggregate dynamic filter pushdown +# Note: most of the test coverage lives in `datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs` +# , to compare dynamic filter content easier. Here the tests are simple end-to-end +# exercises. + +statement ok +set datafusion.explain.format = 'indent'; + +statement ok +set datafusion.explain.physical_plan_only = true; + +statement ok +set datafusion.execution.target_partitions = 2; + +statement ok +set datafusion.execution.parquet.pushdown_filters = true; + +statement ok +set datafusion.optimizer.enable_dynamic_filter_pushdown = true; + +statement ok +set datafusion.optimizer.enable_aggregate_dynamic_filter_pushdown = true; + +statement ok +create external table agg_dyn_test stored as parquet location '../core/tests/data/test_statistics_per_partition'; + +# Expect dynamic filter available inside data source +query TT +explain select max(id) from agg_dyn_test where id > 1; +---- +physical_plan +01)AggregateExec: mode=Final, gby=[], aggr=[max(agg_dyn_test.id)] +02)--CoalescePartitionsExec +03)----AggregateExec: mode=Partial, gby=[], aggr=[max(agg_dyn_test.id)] +04)------DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-01/j5fUeSDQo22oPyPU.parquet, WORKSPACE_ROOT/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-02/j5fUeSDQo22oPyPU.parquet], [WORKSPACE_ROOT/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-03/j5fUeSDQo22oPyPU.parquet, WORKSPACE_ROOT/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-04/j5fUeSDQo22oPyPU.parquet]]}, projection=[id], file_type=parquet, predicate=id@0 > 1 AND DynamicFilter [ empty ], pruning_predicate=id_null_count@1 != row_count@2 AND id_max@0 > 1, required_guarantees=[] + +query I +select max(id) from agg_dyn_test where id > 1; +---- +4 + +# Expect dynamic filter available inside data source +query TT +explain select max(id) from agg_dyn_test where (id+1) > 1; +---- +physical_plan +01)AggregateExec: mode=Final, gby=[], aggr=[max(agg_dyn_test.id)] +02)--CoalescePartitionsExec +03)----AggregateExec: mode=Partial, gby=[], aggr=[max(agg_dyn_test.id)] +04)------DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-01/j5fUeSDQo22oPyPU.parquet, WORKSPACE_ROOT/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-02/j5fUeSDQo22oPyPU.parquet], [WORKSPACE_ROOT/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-03/j5fUeSDQo22oPyPU.parquet, WORKSPACE_ROOT/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-04/j5fUeSDQo22oPyPU.parquet]]}, projection=[id], file_type=parquet, predicate=CAST(id@0 AS Int64) + 1 > 1 AND DynamicFilter [ empty ] + +# Expect dynamic filter available inside data source +query TT +explain select max(id), min(id) from agg_dyn_test where id < 10; +---- +physical_plan +01)AggregateExec: mode=Final, gby=[], aggr=[max(agg_dyn_test.id), min(agg_dyn_test.id)] +02)--CoalescePartitionsExec +03)----AggregateExec: mode=Partial, gby=[], aggr=[max(agg_dyn_test.id), min(agg_dyn_test.id)] +04)------DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-01/j5fUeSDQo22oPyPU.parquet, WORKSPACE_ROOT/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-02/j5fUeSDQo22oPyPU.parquet], [WORKSPACE_ROOT/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-03/j5fUeSDQo22oPyPU.parquet, WORKSPACE_ROOT/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-04/j5fUeSDQo22oPyPU.parquet]]}, projection=[id], file_type=parquet, predicate=id@0 < 10 AND DynamicFilter [ empty ], pruning_predicate=id_null_count@1 != row_count@2 AND id_min@0 < 10, required_guarantees=[] + +# Dynamic filter should not be available for grouping sets +query TT +explain select max(id) from agg_dyn_test where id < 10 +group by grouping sets ((), (id)) +---- +physical_plan +01)ProjectionExec: expr=[max(agg_dyn_test.id)@2 as max(agg_dyn_test.id)] +02)--AggregateExec: mode=FinalPartitioned, gby=[id@0 as id, __grouping_id@1 as __grouping_id], aggr=[max(agg_dyn_test.id)] +03)----RepartitionExec: partitioning=Hash([id@0, __grouping_id@1], 2), input_partitions=2 +04)------AggregateExec: mode=Partial, gby=[(NULL as id), (id@0 as id)], aggr=[max(agg_dyn_test.id)] +05)--------DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-01/j5fUeSDQo22oPyPU.parquet, WORKSPACE_ROOT/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-02/j5fUeSDQo22oPyPU.parquet], [WORKSPACE_ROOT/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-03/j5fUeSDQo22oPyPU.parquet, WORKSPACE_ROOT/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-04/j5fUeSDQo22oPyPU.parquet]]}, projection=[id], file_type=parquet, predicate=id@0 < 10, pruning_predicate=id_null_count@1 != row_count@2 AND id_min@0 < 10, required_guarantees=[] + +statement ok +drop table agg_dyn_test; + +statement ok +drop table t1; + +statement ok +drop table t2; diff --git a/datafusion/sqllogictest/test_files/push_down_filter_unnest.slt b/datafusion/sqllogictest/test_files/push_down_filter_unnest.slt new file mode 100644 index 0000000000000..58fe24e2e2ccd --- /dev/null +++ b/datafusion/sqllogictest/test_files/push_down_filter_unnest.slt @@ -0,0 +1,148 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# Test push down filter + +statement ok +set datafusion.explain.physical_plan_only = true; + +statement ok +CREATE TABLE IF NOT EXISTS v AS VALUES(1,[1,2,3]),(2,[3,4,5]); + +query I +select uc2 from (select unnest(column2) as uc2, column1 from v) where column1 = 2; +---- +3 +4 +5 + +# test push down filter for unnest with filter on non-unnest column +# filter plan is pushed down into projection plan +query TT +explain select uc2 from (select unnest(column2) as uc2, column1 from v) where column1 = 2; +---- +physical_plan +01)ProjectionExec: expr=[__unnest_placeholder(v.column2,depth=1)@0 as uc2] +02)--UnnestExec +03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +04)------ProjectionExec: expr=[column2@0 as __unnest_placeholder(v.column2)] +05)--------FilterExec: column1@0 = 2, projection=[column2@1] +06)----------DataSourceExec: partitions=1, partition_sizes=[1] + +query I +select uc2 from (select unnest(column2) as uc2, column1 from v) where uc2 > 3; +---- +4 +5 + +# test push down filter for unnest with filter on unnest column +query TT +explain select uc2 from (select unnest(column2) as uc2, column1 from v) where uc2 > 3; +---- +physical_plan +01)ProjectionExec: expr=[__unnest_placeholder(v.column2,depth=1)@0 as uc2] +02)--FilterExec: __unnest_placeholder(v.column2,depth=1)@0 > 3 +03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +04)------UnnestExec +05)--------ProjectionExec: expr=[column2@0 as __unnest_placeholder(v.column2)] +06)----------DataSourceExec: partitions=1, partition_sizes=[1] + +query II +select uc2, column1 from (select unnest(column2) as uc2, column1 from v) where uc2 > 3 AND column1 = 2; +---- +4 2 +5 2 + +# Could push the filter (column1 = 2) down below unnest +query TT +explain select uc2, column1 from (select unnest(column2) as uc2, column1 from v) where uc2 > 3 AND column1 = 2; +---- +physical_plan +01)ProjectionExec: expr=[__unnest_placeholder(v.column2,depth=1)@0 as uc2, column1@1 as column1] +02)--FilterExec: __unnest_placeholder(v.column2,depth=1)@0 > 3 +03)----UnnestExec +04)------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +05)--------ProjectionExec: expr=[column2@1 as __unnest_placeholder(v.column2), column1@0 as column1] +06)----------FilterExec: column1@0 = 2 +07)------------DataSourceExec: partitions=1, partition_sizes=[1] + +query II +select uc2, column1 from (select unnest(column2) as uc2, column1 from v) where uc2 > 3 OR column1 = 2; +---- +3 2 +4 2 +5 2 + +# only non-unnest filter in AND clause could be pushed down +query TT +explain select uc2, column1 from (select unnest(column2) as uc2, column1 from v) where uc2 > 3 OR column1 = 2; +---- +physical_plan +01)ProjectionExec: expr=[__unnest_placeholder(v.column2,depth=1)@0 as uc2, column1@1 as column1] +02)--FilterExec: __unnest_placeholder(v.column2,depth=1)@0 > 3 OR column1@1 = 2 +03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +04)------UnnestExec +05)--------ProjectionExec: expr=[column2@1 as __unnest_placeholder(v.column2), column1@0 as column1] +06)----------DataSourceExec: partitions=1, partition_sizes=[1] + +statement ok +drop table v; + +# test with unnest struct, should not push down filter +statement ok +CREATE TABLE d AS VALUES(1,[named_struct('a', 1, 'b', 2)]),(2,[named_struct('a', 3, 'b', 4), named_struct('a', 5, 'b', 6)]); + +query I? +select * from (select column1, unnest(column2) as o from d) where o['a'] = 1; +---- +1 {a: 1, b: 2} + +query TT +explain select * from (select column1, unnest(column2) as o from d) where o['a'] = 1; +---- +physical_plan +01)ProjectionExec: expr=[column1@0 as column1, __unnest_placeholder(d.column2,depth=1)@1 as o] +02)--FilterExec: __datafusion_extracted_1@0 = 1, projection=[column1@1, __unnest_placeholder(d.column2,depth=1)@2] +03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +04)------ProjectionExec: expr=[get_field(__unnest_placeholder(d.column2,depth=1)@1, a) as __datafusion_extracted_1, column1@0 as column1, __unnest_placeholder(d.column2,depth=1)@1 as __unnest_placeholder(d.column2,depth=1)] +05)--------UnnestExec +06)----------ProjectionExec: expr=[column1@0 as column1, column2@1 as __unnest_placeholder(d.column2)] +07)------------DataSourceExec: partitions=1, partition_sizes=[1] + +statement ok +drop table d; + +statement ok +CREATE TABLE d AS VALUES (named_struct('a', 1, 'b', 2)), (named_struct('a', 3, 'b', 4)), (named_struct('a', 5, 'b', 6)); + +query II +select * from (select unnest(column1) from d) where "__unnest_placeholder(d.column1).b" > 5; +---- +5 6 + +query TT +explain select * from (select unnest(column1) from d) where "__unnest_placeholder(d.column1).b" > 5; +---- +physical_plan +01)FilterExec: __unnest_placeholder(d.column1).b@1 > 5 +02)--RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +03)----UnnestExec +04)------ProjectionExec: expr=[column1@0 as __unnest_placeholder(d.column1)] +05)--------DataSourceExec: partitions=1, partition_sizes=[1] + +statement ok +drop table d; From e8369bb38ffca0e4835293b4874391f09269fe5d Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Thu, 26 Feb 2026 14:59:56 +0800 Subject: [PATCH 2/4] trigger sqllogictest --- .../optimizer/src/simplify_expressions/expr_simplifier.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs b/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs index c6644e008645a..f9cf4f61f120d 100644 --- a/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs +++ b/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs @@ -2313,6 +2313,8 @@ fn simplify_right_is_one_case( } } +// trigger ci test + #[cfg(test)] mod tests { use super::*; From b2d26352c996289204946a4139874248ee05c510 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Thu, 26 Feb 2026 08:55:45 -0500 Subject: [PATCH 3/4] Run log running .slt tests first --- datafusion/sqllogictest/bin/sqllogictests.rs | 47 ++++++++++++++++++-- 1 file changed, 44 insertions(+), 3 deletions(-) diff --git a/datafusion/sqllogictest/bin/sqllogictests.rs b/datafusion/sqllogictest/bin/sqllogictests.rs index 463b7b03a760c..01e8d3301e802 100644 --- a/datafusion/sqllogictest/bin/sqllogictests.rs +++ b/datafusion/sqllogictest/bin/sqllogictests.rs @@ -18,7 +18,9 @@ use clap::{ColorChoice, Parser}; use datafusion::common::instant::Instant; use datafusion::common::utils::get_available_parallelism; -use datafusion::common::{DataFusionError, Result, exec_datafusion_err, exec_err}; +use datafusion::common::{ + DataFusionError, HashMap, Result, exec_datafusion_err, exec_err, +}; use datafusion_sqllogictest::{ CurrentlyExecutingSqlTracker, DataFusion, DataFusionSubstraitRoundTrip, Filter, TestContext, df_value_validator, read_dir_recursive, setup_scratch_dir, @@ -47,8 +49,8 @@ use std::fs; use std::io::{IsTerminal, stderr, stdout}; use std::path::{Path, PathBuf}; use std::str::FromStr; -use std::sync::Arc; use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::{Arc, LazyLock}; #[cfg(feature = "postgres")] mod postgres_container; @@ -59,6 +61,31 @@ const PG_COMPAT_FILE_PREFIX: &str = "pg_compat_"; const SQLITE_PREFIX: &str = "sqlite"; const ERRS_PER_FILE_LIMIT: usize = 10; +/// TEST PRIORITY +/// +/// Heuristically prioritize some test to run earlier. +/// +/// Prioritizes test to run earlier if they are known to be long running (as +/// each test file itself is run sequentially, but multiple test files are run +/// in parallel. +/// +/// Tests not listed here will run after the listed tests in an arbitrary order. +static TEST_PRIORITY: LazyLock> = LazyLock::new(|| { + [ + (PathBuf::from("push_down_filter_regression.slt"), 0), // longest running, so run first. + (PathBuf::from(" aggregate.slt"), 1), + (PathBuf::from(" joins.slt"), 2), + (PathBuf::from("imdb.slt"), 3), + (PathBuf::from("array.slt"), 4), + ] + .into_iter() + .collect() +}); + +/// Default priority for tests not in the TEST_PRIORITY map. Tests with lower +/// priority values run first. +static DEFAULT_PRIORITY: usize = 100; + pub fn main() -> Result<()> { tokio::runtime::Builder::new_multi_thread() .enable_all() @@ -739,7 +766,21 @@ fn read_test_files(options: &Options) -> Result> { paths.append(&mut sqlite_paths) } - Ok(paths) + Ok(sort_tests(paths)) +} + +/// Sort the tests heuristically by order of "priority" +/// +/// Prioritizes test to run earlier if they are known to be long running (as +/// each test file itself is run sequentially, but multiple test files are run +/// in parallel. +fn sort_tests(mut tests: Vec) -> Vec { + tests.sort_by_key(|f| { + TEST_PRIORITY + .get(&f.relative_path) + .unwrap_or(&DEFAULT_PRIORITY) + }); + tests } /// Parsed command line options From acce9a4c7e373643bfb5570648e23eed895a4cd6 Mon Sep 17 00:00:00 2001 From: Tim-53 <82676248+Tim-53@users.noreply.github.com> Date: Thu, 26 Feb 2026 23:51:48 +0100 Subject: [PATCH 4/4] reuse parquet file in push_down_filter_regression test --- .../test_files/push_down_filter_regression.slt | 11 ++--------- 1 file changed, 2 insertions(+), 9 deletions(-) diff --git a/datafusion/sqllogictest/test_files/push_down_filter_regression.slt b/datafusion/sqllogictest/test_files/push_down_filter_regression.slt index ca4a30fa96c35..8459fcc682485 100644 --- a/datafusion/sqllogictest/test_files/push_down_filter_regression.slt +++ b/datafusion/sqllogictest/test_files/push_down_filter_regression.slt @@ -18,13 +18,6 @@ # Test push down filter # Regression test for https://github.com/apache/datafusion/issues/17188 -query I -COPY (select i as k from generate_series(1, 10000000) as t(i)) -TO 'test_files/scratch/push_down_filter_regression/t1.parquet' -STORED AS PARQUET; ----- -10000000 - query I COPY (select i as k, i as v from generate_series(1, 10000000) as t(i)) TO 'test_files/scratch/push_down_filter_regression/t2.parquet' @@ -33,10 +26,10 @@ STORED AS PARQUET; 10000000 statement ok -create external table t1 stored as parquet location 'test_files/scratch/push_down_filter_regression/t1.parquet'; +create external table t2 stored as parquet location 'test_files/scratch/push_down_filter_regression/t2.parquet'; statement ok -create external table t2 stored as parquet location 'test_files/scratch/push_down_filter_regression/t2.parquet'; +create external table t1 (k bigint not null) stored as parquet location 'test_files/scratch/push_down_filter_regression/t2.parquet'; # The failure before https://github.com/apache/datafusion/pull/17197 was non-deterministic and random # So we'll run the same query a couple of times just to have more certainty it's fixed