Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 54 additions & 5 deletions datafusion/physical-plan/src/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -739,6 +739,21 @@ impl EmbeddedProjection for FilterExec {
}
}

/// Wraps one endpoint of an [`Interval`] in the matching [`Precision`] variant.
///
/// [`Interval`] encodes an unbounded side as a NULL endpoint, so a NULL
/// `bound` carries no usable statistic and becomes [`Precision::Absent`],
/// regardless of `is_exact`. Any non-NULL `bound` is returned as
/// [`Precision::Exact`] when `is_exact` is `true` and as
/// [`Precision::Inexact`] otherwise.
fn interval_bound_to_precision(
    bound: ScalarValue,
    is_exact: bool,
) -> Precision<ScalarValue> {
    match (bound.is_null(), is_exact) {
        // Unbounded side: nothing meaningful to report.
        (true, _) => Precision::Absent,
        (false, true) => Precision::Exact(bound),
        (false, false) => Precision::Inexact(bound),
    }
}

/// This function ensures that all bounds in the `ExprBoundaries` vector are
/// converted to closed bounds. If a lower/upper bound is initially open, it
/// is adjusted by using the next/previous value for its data type to convert
Expand Down Expand Up @@ -771,11 +786,9 @@ fn collect_new_statistics(
};
};
let (lower, upper) = interval.into_bounds();
let (min_value, max_value) = if lower.eq(&upper) {
(Precision::Exact(lower), Precision::Exact(upper))
} else {
(Precision::Inexact(lower), Precision::Inexact(upper))
};
let is_exact = !lower.is_null() && !upper.is_null() && lower == upper;
let min_value = interval_bound_to_precision(lower, is_exact);
let max_value = interval_bound_to_precision(upper, is_exact);
ColumnStatistics {
null_count: input_column_stats[idx].null_count.to_inexact(),
max_value,
Expand Down Expand Up @@ -2053,4 +2066,40 @@ mod tests {

Ok(())
}

/// A column whose min/max statistics are [`Precision::Absent`] in the input
/// must still report them as absent after passing through a [`FilterExec`].
///
/// The predicate here is `a = 42`, and the assertions target column `b`,
/// which the predicate never references.
#[tokio::test]
async fn test_filter_statistics_absent_columns_stay_absent() -> Result<()> {
    // Two non-nullable Int32 columns.
    let fields = vec![
        Field::new("a", DataType::Int32, false),
        Field::new("b", DataType::Int32, false),
    ];
    let schema = Schema::new(fields);

    // Only an inexact row count is known; both columns use default
    // (i.e. no) column-level statistics.
    let input_statistics = Statistics {
        num_rows: Precision::Inexact(1000),
        total_byte_size: Precision::Absent,
        column_statistics: vec![
            ColumnStatistics::default(),
            ColumnStatistics::default(),
        ],
    };
    let input = Arc::new(StatisticsExec::new(input_statistics, schema));

    // Predicate: a = 42
    let column_a = Arc::new(Column::new("a", 0));
    let forty_two = Arc::new(Literal::new(ScalarValue::Int32(Some(42))));
    let predicate = Arc::new(BinaryExpr::new(column_a, Operator::Eq, forty_two));

    let filter: Arc<dyn ExecutionPlan> =
        Arc::new(FilterExec::try_new(predicate, input)?);
    let filtered_statistics = filter.partition_statistics(None)?;

    // Index 1 is column `b`.
    let b_stats = &filtered_statistics.column_statistics[1];
    assert_eq!(b_stats.min_value, Precision::Absent);
    assert_eq!(b_stats.max_value, Precision::Absent);

    Ok(())
}
}