Description
Describe the bug
I have a Hive-partitioned TPC-DS dataset and a custom table provider that does pre-scan partition pruning using PartitionPruningStatistics.
A query with a single value in the filter expression works:
select ss_list_price
from store_sales
where ss_sold_date_sk = 2451529 limit 10;
But when the filter expression contains multiple values, it fails:
select ss_list_price
from store_sales
where ss_sold_date_sk in (2451529, 2452570, 2452596) limit 10;
This part of the code seems to be the problem:
datafusion/datafusion/common/src/pruning.rs
Lines 240 to 250 in d24eb4a
let index = self.partition_schema.index_of(column.name()).ok()?;
let array = self.partition_values.get(index)?;
let boolean_array = values.iter().try_fold(None, |acc, v| {
    let arrow_value = v.to_scalar().ok()?;
    let eq_result = arrow::compute::kernels::cmp::eq(array, &arrow_value).ok()?;
    match acc {
        None => Some(Some(eq_result)),
        Some(acc_array) => {
            arrow::compute::kernels::boolean::and(&acc_array, &eq_result)
                .map(Some)
                .ok()
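I think arrow::compute::kernels::boolean::or should be used here instead of arrow::compute::kernels::boolean::and. An IN list matches a partition when its value equals any one of the listed values, so AND-ing the per-value equality results yields false for every partition as soon as the list contains two distinct values.
The same query works from datafusion-cli; I suspect that is because the file statistics prevent the accidental pruning there.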
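To make the difference concrete, here is a minimal standard-library sketch of the fold. It is not the DataFusion code: contained_model, and_op and or_op are hypothetical names, and plain Vec<bool> stands in for the Arrow BooleanArray. It only models how the per-value equality masks are combined.

```rust
/// Hypothetical model of the fold in `contained`: build one equality mask per
/// IN-list value and merge the masks element-wise with `combine`.
/// Returns None for an empty IN list, mirroring the `None` accumulator seed.
fn contained_model(
    partition_values: &[i32],
    in_list: &[i32],
    combine: fn(bool, bool) -> bool,
) -> Option<Vec<bool>> {
    let mut acc: Option<Vec<bool>> = None;
    for v in in_list {
        // Stand-in for `cmp::eq(array, &arrow_value)`: one bool per partition.
        let eq_mask: Vec<bool> = partition_values.iter().map(|p| p == v).collect();
        acc = Some(match acc {
            None => eq_mask,
            Some(prev) => prev
                .iter()
                .zip(&eq_mask)
                .map(|(a, b)| combine(*a, *b))
                .collect(),
        });
    }
    acc
}

/// Stand-in for `boolean::and` (the combination the snippet above uses).
fn and_op(a: bool, b: bool) -> bool {
    a && b
}

/// Stand-in for `boolean::or` (the combination proposed in this issue).
fn or_op(a: bool, b: bool) -> bool {
    a || b
}
```

With partition values [1, 3] for column a and the IN list (1, 3), the AND variant produces [false, false], while the OR variant produces [true, true], which is what the unit test in this issue expects.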
CREATE EXTERNAL TABLE store_sales
STORED AS PARQUET
LOCATION '/path/to/tpcds_1_delta/store_sales/';
select ss_list_price
from store_sales
where ss_sold_date_sk in (2451529, 2452570, 2452596) limit 10;
To Reproduce
The following unit test, added to pruning.rs, fails:
#[test]
fn test_partition_pruning_statistics_multiple_values() {
    let partition_values = vec![
        vec![ScalarValue::from(1i32), ScalarValue::from(2i32)],
        vec![ScalarValue::from(3i32), ScalarValue::from(4i32)],
    ];
    let partition_fields = vec![
        Arc::new(Field::new("a", DataType::Int32, false)),
        Arc::new(Field::new("b", DataType::Int32, false)),
    ];
    let partition_stats =
        PartitionPruningStatistics::try_new(partition_values, partition_fields)
            .unwrap();
    let column_a = Column::new_unqualified("a");
    let column_b = Column::new_unqualified("b");
    // Corresponds to
    // select * from table where a in (1, 3);
    let values = HashSet::from([ScalarValue::from(1i32), ScalarValue::from(3i32)]);
    let contained_a = partition_stats.contained(&column_a, &values).unwrap();
    let expected_contained_a = BooleanArray::from(vec![true, true]);
    assert_eq!(contained_a, expected_contained_a);
}
Expected behavior
The unit test above should pass.
Additional context
I can raise a PR for this. Let me know whether my analysis is correct, and whether there is any background context for why the condition in the contained function is an AND.