Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 74 additions & 1 deletion datafusion/common/src/pruning.rs
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ impl PruningStatistics for PartitionPruningStatistics {
match acc {
None => Some(Some(eq_result)),
Some(acc_array) => {
arrow::compute::kernels::boolean::and(&acc_array, &eq_result)
arrow::compute::kernels::boolean::or_kleene(&acc_array, &eq_result)
.map(Some)
.ok()
}
Expand Down Expand Up @@ -560,6 +560,79 @@ mod tests {
assert_eq!(partition_stats.num_containers(), 2);
}

#[test]
fn test_partition_pruning_statistics_multiple_positive_values() {
let partition_values = vec![
vec![ScalarValue::from(1i32), ScalarValue::from(2i32)],
vec![ScalarValue::from(3i32), ScalarValue::from(4i32)],
];
let partition_fields = vec![
Arc::new(Field::new("a", DataType::Int32, false)),
Arc::new(Field::new("b", DataType::Int32, false)),
];
let partition_stats =
PartitionPruningStatistics::try_new(partition_values, partition_fields)
.unwrap();

let column_a = Column::new_unqualified("a");

let values = HashSet::from([ScalarValue::from(1i32), ScalarValue::from(3i32)]);
let contained_a = partition_stats.contained(&column_a, &values).unwrap();
let expected_contained_a = BooleanArray::from(vec![true, true]);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The stats:

a b
1 2
3 4

Running contained on a with values = [1, 3] returns [true, true].

I wonder if we need some cases that return false, or None as well.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The earlier case covers false. I've added a test cases for the null scenario, changed the or to a or_kleene so it works for null

assert_eq!(contained_a, expected_contained_a);
}

#[test]
fn test_partition_pruning_statistics_null_in_values() {
let partition_values = vec![
vec![
ScalarValue::from(1i32),
ScalarValue::from(2i32),
ScalarValue::from(3i32),
],
vec![
ScalarValue::from(4i32),
ScalarValue::from(5i32),
ScalarValue::from(6i32),
],
];
let partition_fields = vec![
Arc::new(Field::new("a", DataType::Int32, false)),
Arc::new(Field::new("b", DataType::Int32, false)),
Arc::new(Field::new("c", DataType::Int32, false)),
];
let partition_stats =
PartitionPruningStatistics::try_new(partition_values, partition_fields)
.unwrap();

let column_a = Column::new_unqualified("a");
let column_b = Column::new_unqualified("b");
let column_c = Column::new_unqualified("c");

let values_a = HashSet::from([ScalarValue::from(1i32), ScalarValue::Int32(None)]);
let contained_a = partition_stats.contained(&column_a, &values_a).unwrap();
let mut builder = BooleanArray::builder(2);
builder.append_value(true);
builder.append_null();
let expected_contained_a = builder.finish();
assert_eq!(contained_a, expected_contained_a);

// First match creates a NULL boolean array
// The accumulator should update the value to true for the second value
let values_b = HashSet::from([ScalarValue::Int32(None), ScalarValue::from(5i32)]);
let contained_b = partition_stats.contained(&column_b, &values_b).unwrap();
let mut builder = BooleanArray::builder(2);
builder.append_null();
builder.append_value(true);
let expected_contained_b = builder.finish();
assert_eq!(contained_b, expected_contained_b);

// All matches are null, contained should return None
let values_c = HashSet::from([ScalarValue::Int32(None)]);
let contained_c = partition_stats.contained(&column_c, &values_c);
assert!(contained_c.is_none());
}

#[test]
fn test_partition_pruning_statistics_empty() {
let partition_values = vec![];
Expand Down