Description
Describe the bug
When pushing down a filter into a FileSource (more specifically, when FileSource::try_pushdown_filters returns an updated node, even if there is no filter to push down), minor changes can lead to very different behavior in how the scan interacts with output ordering. In some cases even very small files are repartitioned into many partitions.
| | Ordering | No Ordering |
|---|---|---|
| Filter | DataSourceExec is repartitioned to target_partitions | DataSourceExec isn't repartitioned, but gets a RepartitionExec on top |
| No Filter | Not repartitioned, one partition per file | No repartitioning at all, just DataSourceExec |
I ran into this while working on the Vortex FileSource. It had a code path that returned an updated node even when nothing had actually changed (for example, when the filters array was empty), and that no-op update still produced a different physical plan. The Parquet source has no such code path, and as far as I can tell that is the main difference between the two.
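To illustrate the difference described above, here is a minimal sketch of a pushdown hook that reports "no change" when there is nothing to push down, so the caller can keep the original node as-is instead of treating a copy as an update. The types `ScanConfig` and `PushdownOutcome` and the free function `try_pushdown_filters` are hypothetical simplifications for illustration only; they are not DataFusion's actual `FileSource` API, whose method has a different signature and return type.

```rust
// Hypothetical, simplified model of a file scan's pushdown state.
// NOT the DataFusion API; it only illustrates the "no-op means no update" pattern.
#[derive(Debug, Clone, PartialEq)]
struct ScanConfig {
    predicate: Option<String>,
}

/// Hypothetical result of a pushdown attempt. `updated` is `None` when the
/// source did not change, so the caller can keep the original node untouched.
#[derive(Debug, PartialEq)]
struct PushdownOutcome {
    updated: Option<ScanConfig>,
}

fn try_pushdown_filters(current: &ScanConfig, filters: &[String]) -> PushdownOutcome {
    // Nothing to push down: signal "no change" instead of returning a copy,
    // which a caller could otherwise mistake for a real plan change.
    if filters.is_empty() {
        return PushdownOutcome { updated: None };
    }
    // Combine the new filters with any predicate the scan already has.
    let combined = filters.join(" AND ");
    let predicate = match &current.predicate {
        Some(existing) => format!("{existing} AND {combined}"),
        None => combined,
    };
    PushdownOutcome {
        updated: Some(ScanConfig { predicate: Some(predicate) }),
    }
}

fn main() {
    let scan = ScanConfig { predicate: None };

    // Empty filter list: no updated node is produced.
    let noop = try_pushdown_filters(&scan, &[]);
    assert_eq!(noop.updated, None);

    // Non-empty filter list: an updated node carrying the predicate.
    let pushed = try_pushdown_filters(&scan, &["c2 > 10".to_string()]);
    println!("{:?}", pushed.updated);
}
```

Under this model, only a genuine change produces an updated node, so an empty filter list cannot alter the plan on its own.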
I'm not 100% sure this is a bug, but repartitioning small files seems potentially wasteful. It also seems somewhat related to #4967, which is why I'm filing it.
To Reproduce
Ordering and filter
-- create table
CREATE EXTERNAL TABLE my_tbl
(c1 VARCHAR NOT NULL, c2 INT NOT NULL)
STORED AS PARQUET
WITH ORDER (c1 ASC)
LOCATION './my_tbl/';
-- insert some data
INSERT INTO my_tbl VALUES
('air', 10), ('alabama', 20), ('balloon', 30),
('kangaroo', 11), ('zebra', 21);
--- explain query
EXPLAIN SELECT * FROM my_tbl WHERE c2 > 10;
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | TableScan: my_tbl projection=[c1, c2] |
| physical_plan | DataSourceExec: file_groups={1 group: [[Users/adamgs/code/repartition-repro/my_tbl/VSb9EmxEdPYbb6EJ_0.parquet]]}, projection=[c1, c2], output_ordering=[c1@0 ASC NULLS LAST], file_type=parquet |
| | |
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
Ordering, no filter
-- create table
CREATE EXTERNAL TABLE my_tbl
(c1 VARCHAR NOT NULL, c2 INT NOT NULL)
WITH ORDER (c1 ASC)
STORED AS PARQUET
LOCATION './my_tbl/';
-- insert some data
INSERT INTO my_tbl VALUES
('air', 10), ('alabama', 20), ('balloon', 30),
('kangaroo', 11), ('zebra', 21);
--- explain query
EXPLAIN SELECT * FROM my_tbl;
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | TableScan: my_tbl projection=[c1, c2] |
| physical_plan | DataSourceExec: file_groups={1 group: [[Users/adamgs/code/repartition-repro/my_tbl/9rqWvTvx7fWJYwEY_0.parquet]]}, projection=[c1, c2], output_ordering=[c1@0 ASC NULLS LAST], file_type=parquet |
| | |
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
No ordering, with filter
-- create table
CREATE EXTERNAL TABLE my_tbl
(c1 VARCHAR NOT NULL, c2 INT NOT NULL)
STORED AS PARQUET
LOCATION './my_tbl/';
-- insert some data
INSERT INTO my_tbl VALUES
('air', 10), ('alabama', 20), ('balloon', 30),
('kangaroo', 11), ('zebra', 21);
--- explain query
EXPLAIN SELECT * FROM my_tbl WHERE c2 > 10;
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | Filter: my_tbl.c2 > Int32(10) |
| | TableScan: my_tbl projection=[c1, c2], partial_filters=[my_tbl.c2 > Int32(10)] |
| physical_plan | CoalesceBatchesExec: target_batch_size=8192 |
| | FilterExec: c2@1 > 10 |
| | RepartitionExec: partitioning=RoundRobinBatch(14), input_partitions=1 |
| | DataSourceExec: file_groups={1 group: [[Users/adamgs/code/repartition-repro/my_tbl/uhkDHvDvl1Q3yo5v_0.parquet]]}, projection=[c1, c2], file_type=parquet, predicate=c2@1 > 10, pruning_predicate=c2_null_count@1 != row_count@2 AND c2_max@0 > 10, required_guarantees=[] |
| | |
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
No ordering, no filter
-- create table
CREATE EXTERNAL TABLE my_tbl
(c1 VARCHAR NOT NULL, c2 INT NOT NULL)
STORED AS PARQUET
LOCATION './my_tbl/';
-- insert some data
INSERT INTO my_tbl VALUES
('air', 10), ('alabama', 20), ('balloon', 30),
('kangaroo', 11), ('zebra', 21);
--- explain query
EXPLAIN SELECT * FROM my_tbl;
+---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | TableScan: my_tbl projection=[c1, c2] |
| physical_plan | DataSourceExec: file_groups={1 group: [[Users/adamgs/code/repartition-repro/my_tbl/sQrcFZmANMuthBwu_0.parquet]]}, projection=[c1, c2], file_type=parquet |
| | |
+---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------+
Expected behavior
No response
Additional context
No response