fix(planner): fix push down filter through eval scalar (datafuselabs#13232)

fix push down filter through eval scalar
leiysky authored and andylokandy committed Nov 27, 2023
1 parent e4f39b7 commit 14e3b1e
Showing 8 changed files with 52 additions and 67 deletions.
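What the commit changes: the old rule was all-or-nothing. It rewrote the predicates (when they referenced the EvalScalar's derived columns, or when a view was involved) and pushed the whole Filter only if, after rewriting, every used column was available from the EvalScalar's child; otherwise it left the plan untouched. The new rule handles each conjunct separately: a predicate whose columns are all produced by the EvalScalar is rewritten with replace_predicate and pushed beneath it, while the rest stay in a filter above. The TODO comment removed from the old code gives the motivating shape of query (table t is hypothetical and quoted verbatim from that comment):

select a from (select a, a+1 as b from t) where a = 1 and b = 2
-- roughly what the partial push-down aims for:
select a from (select a, a+1 as b from t where a = 1) where b = 2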
@@ -19,7 +19,6 @@ use common_exception::Result;
use crate::optimizer::rule::Rule;
use crate::optimizer::rule::RuleID;
use crate::optimizer::rule::TransformResult;
-use crate::optimizer::ColumnSet;
use crate::optimizer::RelExpr;
use crate::optimizer::SExpr;
use crate::plans::AggregateFunction;
@@ -43,7 +42,7 @@ use crate::MetadataRef;
pub struct RulePushDownFilterEvalScalar {
id: RuleID,
patterns: Vec<SExpr>,
-metadata: MetadataRef,
+_metadata: MetadataRef,
}

impl RulePushDownFilterEvalScalar {
@@ -77,7 +76,7 @@ impl RulePushDownFilterEvalScalar {
))),
)),
)],
-metadata,
+_metadata: metadata,
}
}

@@ -249,61 +248,47 @@ impl Rule for RulePushDownFilterEvalScalar {
}

fn apply(&self, s_expr: &SExpr, state: &mut TransformResult) -> Result<()> {
-let mut filter: Filter = s_expr.plan().clone().try_into()?;
-
-let mut used_columns = ColumnSet::new();
-for pred in filter.predicates.iter() {
-used_columns = used_columns.union(&pred.used_columns()).cloned().collect();
-}
-
-let input = s_expr.child(0)?;
+let filter: Filter = s_expr.plan().clone().try_into()?;
let eval_scalar: EvalScalar = s_expr.child(0)?.plan().clone().try_into()?;

-let rel_expr = RelExpr::with_s_expr(input);
-let eval_scalar_child_prop = rel_expr.derive_relational_prop_child(0)?;
-
let scalar_rel_expr = RelExpr::with_s_expr(s_expr);
let eval_scalar_prop = scalar_rel_expr.derive_relational_prop_child(0)?;

-let metadata = self.metadata.read();
-let table_entries = metadata.tables();
-let is_source_of_view = table_entries.iter().any(|t| t.is_source_of_view());
+let mut remaining_predicates = vec![];
+let mut pushed_down_predicates = vec![];

-// Replacing `DerivedColumn` in `Filter` with the column expression defined in the view.
-// This allows us to eliminate the `EvalScalar` and push the filter down to the `Scan`.
-if (used_columns.is_subset(&eval_scalar_prop.output_columns)
-&& !used_columns.is_subset(&eval_scalar_child_prop.output_columns))
-|| is_source_of_view
-{
-let new_predicates = &filter
-.predicates
-.iter()
-.map(|predicate| Self::replace_predicate(predicate, &eval_scalar.items))
-.collect::<Result<Vec<ScalarExpr>>>()?;
+for pred in filter.predicates.iter() {
+if pred
+.used_columns()
+.is_subset(&eval_scalar_prop.output_columns)
+{
+// Replace `BoundColumnRef` with the column expression introduced in `EvalScalar`.
+let rewritten_predicate = Self::replace_predicate(pred, &eval_scalar.items)?;
+pushed_down_predicates.push(rewritten_predicate);
+} else {
+remaining_predicates.push(pred.clone());
+}
+}

-filter.predicates = new_predicates.to_vec();
+let mut result = s_expr.child(0)?.child(0)?.clone();

-used_columns.clear();
-for pred in filter.predicates.iter() {
-used_columns = used_columns.union(&pred.used_columns()).cloned().collect();
-}
+if !pushed_down_predicates.is_empty() {
+let pushed_down_filter = Filter {
+predicates: pushed_down_predicates,
+};
+result = SExpr::create_unary(Arc::new(pushed_down_filter.into()), Arc::new(result));
}

-// Check if `Filter` can be satisfied by children of `EvalScalar`
-if used_columns.is_subset(&eval_scalar_child_prop.output_columns) {
-// TODO(leiysky): partial push down conjunctions
-// For example, `select a from (select a, a+1 as b from t) where a = 1 and b = 2`
-// can be optimized as `select a from (select a, a+1 as b from t where a = 1) where b = 2`
-let new_expr = SExpr::create_unary(
-Arc::new(eval_scalar.into()),
-Arc::new(SExpr::create_unary(
-Arc::new(filter.into()),
-Arc::new(input.child(0)?.clone()),
-)),
-);
-state.add_result(new_expr);
+result = SExpr::create_unary(Arc::new(eval_scalar.into()), Arc::new(result));
+
+if !remaining_predicates.is_empty() {
+let remaining_filter = Filter {
+predicates: remaining_predicates,
+};
+result = SExpr::create_unary(Arc::new(remaining_filter.into()), Arc::new(result));
+result.set_applied_rule(&self.id);
}
+
+state.add_result(result);
Ok(())
}
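Taken together, the new apply() partitions the filter's conjuncts: predicates whose used columns are a subset of the EvalScalar's output become pushed_down_predicates (after replace_predicate rewrites derived columns into their defining expressions), the rest become remaining_predicates, and the plan is rebuilt as remaining Filter -> EvalScalar -> pushed-down Filter -> child. A minimal, self-contained sketch of just the partition step, using stand-in types (Pred and the HashSet column sets are illustrative, not the planner's ScalarExpr/ColumnSet API):

// Illustrative sketch only: Pred and the HashSet column sets are stand-ins,
// not the planner's real ScalarExpr / ColumnSet types.
use std::collections::HashSet;

#[derive(Clone, Debug)]
struct Pred {
    used_columns: HashSet<u32>,
}

// Predicates whose columns are all covered by the EvalScalar's output are
// push-down candidates (the real rule rewrites them with replace_predicate
// first); everything else stays in a filter above the EvalScalar.
fn partition(preds: &[Pred], eval_scalar_output: &HashSet<u32>) -> (Vec<Pred>, Vec<Pred>) {
    let mut pushed_down = Vec::new();
    let mut remaining = Vec::new();
    for pred in preds {
        if pred.used_columns.is_subset(eval_scalar_output) {
            pushed_down.push(pred.clone());
        } else {
            remaining.push(pred.clone());
        }
    }
    (pushed_down, remaining)
}

fn main() {
    let output: HashSet<u32> = [0, 1].into_iter().collect();
    let preds = vec![
        Pred { used_columns: [0].into_iter().collect() }, // e.g. a column the EvalScalar produces
        Pred { used_columns: [2].into_iter().collect() }, // uses a column the EvalScalar does not produce
    ];
    let (pushed_down, remaining) = partition(&preds, &output);
    println!("pushed down: {:?}, remaining: {:?}", pushed_down, remaining);
}

In the real rule only the push-down branch calls Self::replace_predicate, so the pushed predicates reference the EvalScalar's input columns rather than its derived aliases, which is what the updated EXPLAIN expectations below reflect (e.g. numbers.number instead of t1.a).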

@@ -97,7 +97,7 @@ RowFetch
├── estimated rows: 0.00
└── Filter
├── output columns: [t_11831.uid (#0), t_11831.time (#3), t_11831._row_id (#4)]
-├── filters: [is_true(t1.uid (#0) = 11), is_true(t_11831.time (#3) >= 1686672000000), is_true(t_11831.time (#3) <= 1686758399000)]
+├── filters: [is_true(t_11831.uid (#0) = 11), is_true(t_11831.time (#3) >= 1686672000000), is_true(t_11831.time (#3) <= 1686758399000)]
├── estimated rows: 0.00
└── TableScan
├── table: default.default.t_11831
@@ -106,7 +106,7 @@ RowFetch
├── read bytes: 0
├── partitions total: 0
├── partitions scanned: 0
-├── push downs: [filters: [and_filters(and_filters(and_filters(and_filters(t_11831.time (#3) >= 1686672000000, t_11831.time (#3) <= 1686758399000), t1.uid (#0) = 11), t_11831.time (#3) >= 1686672000000), t_11831.time (#3) <= 1686758399000)], limit: NONE]
+├── push downs: [filters: [and_filters(and_filters(and_filters(and_filters(t_11831.time (#3) >= 1686672000000, t_11831.time (#3) <= 1686758399000), t_11831.uid (#0) = 11), t_11831.time (#3) >= 1686672000000), t_11831.time (#3) <= 1686758399000)], limit: NONE]
└── estimated rows: 0.00

statement ok
@@ -58,7 +58,7 @@ Limit
├── estimated rows: 0.20
└── Filter
├── output columns: [numbers.number (#0)]
-├── filters: [numbers.b (#0) > 1]
+├── filters: [numbers.number (#0) > 1]
├── estimated rows: 0.20
└── TableScan
├── table: default.system.numbers
@@ -67,7 +67,7 @@ Limit
├── read bytes: 8
├── partitions total: 1
├── partitions scanned: 1
-├── push downs: [filters: [numbers.b (#0) > 1], limit: NONE]
+├── push downs: [filters: [numbers.number (#0) > 1], limit: NONE]
└── estimated rows: 1.00

query T
@@ -16,7 +16,7 @@ explain select * from (select * from numbers(1)) as t1 where number = 1
----
Filter
├── output columns: [numbers.number (#0)]
-├── filters: [t1.number (#0) = 1]
+├── filters: [numbers.number (#0) = 1]
├── estimated rows: 0.20
└── TableScan
├── table: default.system.numbers
@@ -25,7 +25,7 @@ Filter
├── read bytes: 8
├── partitions total: 1
├── partitions scanned: 1
-├── push downs: [filters: [t1.number (#0) = 1], limit: NONE]
+├── push downs: [filters: [numbers.number (#0) = 1], limit: NONE]
└── estimated rows: 1.00

query T
@@ -58,7 +58,7 @@ EvalScalar
├── estimated rows: 0.20
└── Filter
├── output columns: [numbers.number (#0)]
-├── filters: [t1.a (#0) = 1]
+├── filters: [numbers.number (#0) = 1]
├── estimated rows: 0.20
└── TableScan
├── table: default.system.numbers
@@ -67,7 +67,7 @@ EvalScalar
├── read bytes: 8
├── partitions total: 1
├── partitions scanned: 1
-├── push downs: [filters: [t1.a (#0) = 1], limit: NONE]
+├── push downs: [filters: [numbers.number (#0) = 1], limit: NONE]
└── estimated rows: 1.00

query T
8 changes: 4 additions & 4 deletions tests/sqllogictests/suites/mode/standalone/explain/sort.test
@@ -10,7 +10,7 @@ Sort
├── estimated rows: 0.00
└── Filter
├── output columns: [t1.a (#0)]
-├── filters: [is_true(t2.a (#0) > 1)]
+├── filters: [is_true(t1.a (#0) > 1)]
├── estimated rows: 0.00
└── TableScan
├── table: default.default.t1
@@ -19,7 +19,7 @@ Sort
├── read bytes: 0
├── partitions total: 0
├── partitions scanned: 0
-├── push downs: [filters: [is_true(t2.a (#0) > 1)], limit: NONE]
+├── push downs: [filters: [is_true(t1.a (#0) > 1)], limit: NONE]
└── estimated rows: 0.00

query T
@@ -52,7 +52,7 @@ Sort
├── estimated rows: 0.00
└── Filter
├── output columns: [t1.a (#0)]
-├── filters: [is_true(t2.a (#0) > 1)]
+├── filters: [is_true(t1.a (#0) > 1)]
├── estimated rows: 0.00
└── TableScan
├── table: default.default.t1
@@ -61,7 +61,7 @@ Sort
├── read bytes: 0
├── partitions total: 0
├── partitions scanned: 0
-├── push downs: [filters: [is_true(t2.a (#0) > 1)], limit: NONE]
+├── push downs: [filters: [is_true(t1.a (#0) > 1)], limit: NONE]
└── estimated rows: 0.00

statement ok
@@ -58,7 +58,7 @@ Limit
├── estimated rows: 0.20
└── Filter
├── output columns: [numbers.number (#0)]
-├── filters: [numbers.b (#0) > 1]
+├── filters: [numbers.number (#0) > 1]
├── estimated rows: 0.20
└── TableScan
├── table: default.system.numbers
@@ -67,7 +67,7 @@ Limit
├── read bytes: 8
├── partitions total: 1
├── partitions scanned: 1
-├── push downs: [filters: [numbers.b (#0) > 1], limit: NONE]
+├── push downs: [filters: [numbers.number (#0) > 1], limit: NONE]
└── estimated rows: 1.00

query T
@@ -16,7 +16,7 @@ explain select * from (select * from numbers(1)) as t1 where number = 1
----
Filter
├── output columns: [numbers.number (#0)]
-├── filters: [t1.number (#0) = 1]
+├── filters: [numbers.number (#0) = 1]
├── estimated rows: 0.20
└── TableScan
├── table: default.system.numbers
@@ -25,7 +25,7 @@ Filter
├── read bytes: 8
├── partitions total: 1
├── partitions scanned: 1
-├── push downs: [filters: [t1.number (#0) = 1], limit: NONE]
+├── push downs: [filters: [numbers.number (#0) = 1], limit: NONE]
└── estimated rows: 1.00

query T
@@ -58,7 +58,7 @@ EvalScalar
├── estimated rows: 0.20
└── Filter
├── output columns: [numbers.number (#0)]
-├── filters: [t1.a (#0) = 1]
+├── filters: [numbers.number (#0) = 1]
├── estimated rows: 0.20
└── TableScan
├── table: default.system.numbers
@@ -67,7 +67,7 @@ EvalScalar
├── read bytes: 8
├── partitions total: 1
├── partitions scanned: 1
-├── push downs: [filters: [t1.a (#0) = 1], limit: NONE]
+├── push downs: [filters: [numbers.number (#0) = 1], limit: NONE]
└── estimated rows: 1.00

query T
@@ -15,7 +15,7 @@ Sort
├── read bytes: 0
├── partitions total: 0
├── partitions scanned: 0
-├── push downs: [filters: [is_true(t2.a (#0) > 1)], limit: NONE]
+├── push downs: [filters: [is_true(t1.a (#0) > 1)], limit: NONE]
└── estimated rows: 0.00

query T
@@ -49,7 +49,7 @@ Sort
├── read bytes: 0
├── partitions total: 0
├── partitions scanned: 0
-├── push downs: [filters: [is_true(t2.a (#0) > 1)], limit: NONE]
+├── push downs: [filters: [is_true(t1.a (#0) > 1)], limit: NONE]
└── estimated rows: 0.00

statement ok