diff --git a/datafusion/common/src/pruning.rs b/datafusion/common/src/pruning.rs index 48750e3c995c..f75efaf490d1 100644 --- a/datafusion/common/src/pruning.rs +++ b/datafusion/common/src/pruning.rs @@ -245,7 +245,7 @@ impl PruningStatistics for PartitionPruningStatistics { match acc { None => Some(Some(eq_result)), Some(acc_array) => { - arrow::compute::kernels::boolean::and(&acc_array, &eq_result) + arrow::compute::kernels::boolean::or_kleene(&acc_array, &eq_result) .map(Some) .ok() } @@ -560,6 +560,79 @@ mod tests { assert_eq!(partition_stats.num_containers(), 2); } + #[test] + fn test_partition_pruning_statistics_multiple_positive_values() { + let partition_values = vec![ + vec![ScalarValue::from(1i32), ScalarValue::from(2i32)], + vec![ScalarValue::from(3i32), ScalarValue::from(4i32)], + ]; + let partition_fields = vec![ + Arc::new(Field::new("a", DataType::Int32, false)), + Arc::new(Field::new("b", DataType::Int32, false)), + ]; + let partition_stats = + PartitionPruningStatistics::try_new(partition_values, partition_fields) + .unwrap(); + + let column_a = Column::new_unqualified("a"); + + let values = HashSet::from([ScalarValue::from(1i32), ScalarValue::from(3i32)]); + let contained_a = partition_stats.contained(&column_a, &values).unwrap(); + let expected_contained_a = BooleanArray::from(vec![true, true]); + assert_eq!(contained_a, expected_contained_a); + } + + #[test] + fn test_partition_pruning_statistics_null_in_values() { + let partition_values = vec![ + vec![ + ScalarValue::from(1i32), + ScalarValue::from(2i32), + ScalarValue::from(3i32), + ], + vec![ + ScalarValue::from(4i32), + ScalarValue::from(5i32), + ScalarValue::from(6i32), + ], + ]; + let partition_fields = vec![ + Arc::new(Field::new("a", DataType::Int32, false)), + Arc::new(Field::new("b", DataType::Int32, false)), + Arc::new(Field::new("c", DataType::Int32, false)), + ]; + let partition_stats = + PartitionPruningStatistics::try_new(partition_values, partition_fields) + .unwrap(); + + let 
column_a = Column::new_unqualified("a"); + let column_b = Column::new_unqualified("b"); + let column_c = Column::new_unqualified("c"); + + let values_a = HashSet::from([ScalarValue::from(1i32), ScalarValue::Int32(None)]); + let contained_a = partition_stats.contained(&column_a, &values_a).unwrap(); + let mut builder = BooleanArray::builder(2); + builder.append_value(true); + builder.append_null(); + let expected_contained_a = builder.finish(); + assert_eq!(contained_a, expected_contained_a); + + // Matching against the NULL value creates a NULL boolean array; HashSet order is unspecified, so it may be visited first or last + // Either way the or_kleene accumulator should report true for the second container (b = 5) + let values_b = HashSet::from([ScalarValue::Int32(None), ScalarValue::from(5i32)]); + let contained_b = partition_stats.contained(&column_b, &values_b).unwrap(); + let mut builder = BooleanArray::builder(2); + builder.append_null(); + builder.append_value(true); + let expected_contained_b = builder.finish(); + assert_eq!(contained_b, expected_contained_b); + + // The only searched value is NULL, so all matches are null and contained should return None + let values_c = HashSet::from([ScalarValue::Int32(None)]); + let contained_c = partition_stats.contained(&column_c, &values_c); + assert!(contained_c.is_none()); + } + #[test] fn test_partition_pruning_statistics_empty() { let partition_values = vec![];