From d03f31af6b7f26a8fa117d0c485a1defb371e90f Mon Sep 17 00:00:00 2001 From: Nimalan Date: Tue, 25 Nov 2025 10:47:09 +0530 Subject: [PATCH 1/2] fix: partition pruning stats incorrect result when multiple values are present Signed-off-by: Nimalan --- datafusion/common/src/pruning.rs | 24 +++++++++++++++++++++++- 1 file changed, 23 insertions(+), 1 deletion(-) diff --git a/datafusion/common/src/pruning.rs b/datafusion/common/src/pruning.rs index 48750e3c995c..503204a7086f 100644 --- a/datafusion/common/src/pruning.rs +++ b/datafusion/common/src/pruning.rs @@ -245,7 +245,7 @@ impl PruningStatistics for PartitionPruningStatistics { match acc { None => Some(Some(eq_result)), Some(acc_array) => { - arrow::compute::kernels::boolean::and(&acc_array, &eq_result) + arrow::compute::kernels::boolean::or(&acc_array, &eq_result) .map(Some) .ok() } @@ -560,6 +560,28 @@ mod tests { assert_eq!(partition_stats.num_containers(), 2); } + #[test] + fn test_partition_pruning_statistics_multiple_values() { + let partition_values = vec![ + vec![ScalarValue::from(1i32), ScalarValue::from(2i32)], + vec![ScalarValue::from(3i32), ScalarValue::from(4i32)], + ]; + let partition_fields = vec![ + Arc::new(Field::new("a", DataType::Int32, false)), + Arc::new(Field::new("b", DataType::Int32, false)), + ]; + let partition_stats = + PartitionPruningStatistics::try_new(partition_values, partition_fields) + .unwrap(); + + let column_a = Column::new_unqualified("a"); + + let values = HashSet::from([ScalarValue::from(1i32), ScalarValue::from(3i32)]); + let contained_a = partition_stats.contained(&column_a, &values).unwrap(); + let expected_contained_a = BooleanArray::from(vec![true, true]); + assert_eq!(contained_a, expected_contained_a); + } + #[test] fn test_partition_pruning_statistics_empty() { let partition_values = vec![]; From 07773511f653f343fdebd3044d260b16b9a8407b Mon Sep 17 00:00:00 2001 From: Nimalan Date: Wed, 26 Nov 2025 11:14:44 +0530 Subject: [PATCH 2/2] fix: Use or_kleene, add test case 
when value is null --- datafusion/common/src/pruning.rs | 55 ++++++++++++++++++++++++++++++-- 1 file changed, 53 insertions(+), 2 deletions(-) diff --git a/datafusion/common/src/pruning.rs b/datafusion/common/src/pruning.rs index 503204a7086f..f75efaf490d1 100644 --- a/datafusion/common/src/pruning.rs +++ b/datafusion/common/src/pruning.rs @@ -245,7 +245,7 @@ impl PruningStatistics for PartitionPruningStatistics { match acc { None => Some(Some(eq_result)), Some(acc_array) => { - arrow::compute::kernels::boolean::or(&acc_array, &eq_result) + arrow::compute::kernels::boolean::or_kleene(&acc_array, &eq_result) .map(Some) .ok() } @@ -561,7 +561,7 @@ mod tests { } #[test] - fn test_partition_pruning_statistics_multiple_values() { + fn test_partition_pruning_statistics_multiple_positive_values() { let partition_values = vec![ vec![ScalarValue::from(1i32), ScalarValue::from(2i32)], vec![ScalarValue::from(3i32), ScalarValue::from(4i32)], @@ -582,6 +582,57 @@ mod tests { assert_eq!(contained_a, expected_contained_a); } + #[test] + fn test_partition_pruning_statistics_null_in_values() { + let partition_values = vec![ + vec![ + ScalarValue::from(1i32), + ScalarValue::from(2i32), + ScalarValue::from(3i32), + ], + vec![ + ScalarValue::from(4i32), + ScalarValue::from(5i32), + ScalarValue::from(6i32), + ], + ]; + let partition_fields = vec![ + Arc::new(Field::new("a", DataType::Int32, false)), + Arc::new(Field::new("b", DataType::Int32, false)), + Arc::new(Field::new("c", DataType::Int32, false)), + ]; + let partition_stats = + PartitionPruningStatistics::try_new(partition_values, partition_fields) + .unwrap(); + + let column_a = Column::new_unqualified("a"); + let column_b = Column::new_unqualified("b"); + let column_c = Column::new_unqualified("c"); + + let values_a = HashSet::from([ScalarValue::from(1i32), ScalarValue::Int32(None)]); + let contained_a = partition_stats.contained(&column_a, &values_a).unwrap(); + let mut builder = BooleanArray::builder(2); + 
builder.append_value(true); + builder.append_null(); + let expected_contained_a = builder.finish(); + assert_eq!(contained_a, expected_contained_a); + + // Comparing against the NULL value yields a NULL boolean array; or_kleene must + // still report true for the partition matching 5, whatever the HashSet iteration order + let values_b = HashSet::from([ScalarValue::Int32(None), ScalarValue::from(5i32)]); + let contained_b = partition_stats.contained(&column_b, &values_b).unwrap(); + let mut builder = BooleanArray::builder(2); + builder.append_null(); + builder.append_value(true); + let expected_contained_b = builder.finish(); + assert_eq!(contained_b, expected_contained_b); + + // All matches are null, contained should return None + let values_c = HashSet::from([ScalarValue::Int32(None)]); + let contained_c = partition_stats.contained(&column_c, &values_c); + assert!(contained_c.is_none()); + } + #[test] fn test_partition_pruning_statistics_empty() { let partition_values = vec![];