Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 23 additions & 24 deletions datafusion/physical-plan/src/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use super::{
};
use crate::{
metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet},
Column, DisplayFormatType, ExecutionPlan,
DisplayFormatType, ExecutionPlan,
};

use arrow::compute::filter_record_batch;
Expand Down Expand Up @@ -192,9 +192,7 @@ impl FilterExec {
let mut eq_properties = input.equivalence_properties().clone();
let (equal_pairs, _) = collect_columns_from_predicate(predicate);
for (lhs, rhs) in equal_pairs {
let lhs_expr = Arc::new(lhs.clone()) as _;
let rhs_expr = Arc::new(rhs.clone()) as _;
eq_properties.add_equal_conditions(&lhs_expr, &rhs_expr)
eq_properties.add_equal_conditions(lhs, rhs)
}
// Add the columns that have only one viable value (singleton) after
// filtering to constants.
Expand Down Expand Up @@ -401,34 +399,33 @@ impl RecordBatchStream for FilterExecStream {

/// Returns the expression pairs compared with `=` and the expression pairs compared with `!=` among the conjuncts of `predicate`.
fn collect_columns_from_predicate(predicate: &Arc<dyn PhysicalExpr>) -> EqualAndNonEqual {
let mut eq_predicate_columns = Vec::<(&Column, &Column)>::new();
let mut ne_predicate_columns = Vec::<(&Column, &Column)>::new();
let mut eq_predicate_columns = Vec::<PhysicalExprPairRef>::new();
let mut ne_predicate_columns = Vec::<PhysicalExprPairRef>::new();

let predicates = split_conjunction(predicate);
predicates.into_iter().for_each(|p| {
if let Some(binary) = p.as_any().downcast_ref::<BinaryExpr>() {
if let (Some(left_column), Some(right_column)) = (
binary.left().as_any().downcast_ref::<Column>(),
binary.right().as_any().downcast_ref::<Column>(),
) {
match binary.op() {
Operator::Eq => {
eq_predicate_columns.push((left_column, right_column))
}
Operator::NotEq => {
ne_predicate_columns.push((left_column, right_column))
}
_ => {}
match binary.op() {
Operator::Eq => {
eq_predicate_columns.push((binary.left(), binary.right()))
}
Operator::NotEq => {
ne_predicate_columns.push((binary.left(), binary.right()))
}
_ => {}
}
}
});

(eq_predicate_columns, ne_predicate_columns)
}

/// Pair of borrowed `Arc<dyn PhysicalExpr>`s, e.g. the two operands of a
/// binary expression. Both references share the lifetime `'a`.
pub type PhysicalExprPairRef<'a> = (&'a Arc<dyn PhysicalExpr>, &'a Arc<dyn PhysicalExpr>);

/// The equal expression pairs and the non-equal expression pairs found in a predicate
pub type EqualAndNonEqual<'a> =
(Vec<(&'a Column, &'a Column)>, Vec<(&'a Column, &'a Column)>);
(Vec<PhysicalExprPairRef<'a>>, Vec<PhysicalExprPairRef<'a>>);

#[cfg(test)]
mod tests {
Expand Down Expand Up @@ -478,14 +475,16 @@ mod tests {
)?;

let (equal_pairs, ne_pairs) = collect_columns_from_predicate(&predicate);
assert_eq!(2, equal_pairs.len());
assert!(equal_pairs[0].0.eq(&col("c2", &schema)?));
assert!(equal_pairs[0].1.eq(&lit(4u32)));

assert_eq!(1, equal_pairs.len());
assert_eq!(equal_pairs[0].0.name(), "c2");
assert_eq!(equal_pairs[0].1.name(), "c9");
assert!(equal_pairs[1].0.eq(&col("c2", &schema)?));
assert!(equal_pairs[1].1.eq(&col("c9", &schema)?));

assert_eq!(1, ne_pairs.len());
assert_eq!(ne_pairs[0].0.name(), "c1");
assert_eq!(ne_pairs[0].1.name(), "c13");
assert!(ne_pairs[0].0.eq(&col("c1", &schema)?));
assert!(ne_pairs[0].1.eq(&col("c13", &schema)?));

Ok(())
}
Expand Down
1 change: 0 additions & 1 deletion datafusion/physical-plan/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ use datafusion_common::config::ConfigOptions;
use datafusion_common::utils::DataPtr;
use datafusion_common::Result;
use datafusion_execution::TaskContext;
use datafusion_physical_expr::expressions::Column;
use datafusion_physical_expr::{
EquivalenceProperties, LexOrdering, PhysicalSortExpr, PhysicalSortRequirement,
};
Expand Down
21 changes: 21 additions & 0 deletions datafusion/sqllogictest/test_files/select.slt
Original file line number Diff line number Diff line change
Expand Up @@ -1364,6 +1364,27 @@ AggregateExec: mode=FinalPartitioned, gby=[c2@0 as c2], aggr=[COUNT(*)]
--------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
----------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c2], has_header=true

# FilterExec can track equality of non-column expressions.
# The plan below shouldn't have a SortExec: column 'a' is ordered, so
# 'CAST(ROUND(b) as INT)' is also ordered after the filter is applied.
query TT
EXPLAIN SELECT *
FROM annotated_data_finite2
WHERE CAST(ROUND(b) as INT) = a
ORDER BY CAST(ROUND(b) as INT);
----
logical_plan
Sort: CAST(round(CAST(annotated_data_finite2.b AS Float64)) AS Int32) ASC NULLS LAST
--Filter: CAST(round(CAST(annotated_data_finite2.b AS Float64)) AS Int32) = annotated_data_finite2.a
----TableScan: annotated_data_finite2 projection=[a0, a, b, c, d], partial_filters=[CAST(round(CAST(annotated_data_finite2.b AS Float64)) AS Int32) = annotated_data_finite2.a]
physical_plan
SortPreservingMergeExec: [CAST(round(CAST(b@2 AS Float64)) AS Int32) ASC NULLS LAST]
--CoalesceBatchesExec: target_batch_size=8192
----FilterExec: CAST(round(CAST(b@2 AS Float64)) AS Int32) = a@1
------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
--------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], output_ordering=[a@1 ASC NULLS LAST, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST], has_header=true


statement ok
drop table annotated_data_finite2;

Expand Down