Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 48 additions & 0 deletions datafusion/src/execution/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ use crate::execution::dataframe_impl::DataFrameImpl;
use crate::logical_plan::{
FunctionRegistry, LogicalPlan, LogicalPlanBuilder, UNNAMED_TABLE,
};
use crate::optimizer::common_subexpr_eliminate::CommonSubexprEliminate;
use crate::optimizer::constant_folding::ConstantFolding;
use crate::optimizer::filter_push_down::FilterPushDown;
use crate::optimizer::limit_push_down::LimitPushDown;
Expand Down Expand Up @@ -709,6 +710,7 @@ impl Default for ExecutionConfig {
batch_size: 8192,
optimizers: vec![
Arc::new(ConstantFolding::new()),
Arc::new(CommonSubexprEliminate::new()),
Arc::new(EliminateLimit::new()),
Arc::new(ProjectionPushDown::new()),
Arc::new(FilterPushDown::new()),
Expand Down Expand Up @@ -988,6 +990,7 @@ impl FunctionRegistry for ExecutionContextState {
mod tests {

use super::*;
use crate::logical_plan::{binary_expr, lit, Operator};
use crate::physical_plan::functions::make_scalar_function;
use crate::physical_plan::{collect, collect_partitioned};
use crate::test;
Expand Down Expand Up @@ -1966,6 +1969,27 @@ mod tests {
Ok(())
}

#[tokio::test]
async fn aggregate_avg_add() -> Result<()> {
let results = execute(
"SELECT AVG(c1), AVG(c1) + 1, AVG(c1) + 2, 1 + AVG(c1) FROM test",
4,
)
.await?;
assert_eq!(results.len(), 1);

let expected = vec![
"+--------------+----------------------------+----------------------------+----------------------------+",
"| AVG(test.c1) | AVG(test.c1) Plus Int64(1) | AVG(test.c1) Plus Int64(2) | Int64(1) Plus AVG(test.c1) |",
"+--------------+----------------------------+----------------------------+----------------------------+",
"| 1.5 | 2.5 | 3.5 | 2.5 |",
"+--------------+----------------------------+----------------------------+----------------------------+",
];
assert_batches_sorted_eq!(expected, &results);

Ok(())
}

#[tokio::test]
async fn join_partitioned() -> Result<()> {
// self join on partition id (workaround for duplicate column name)
Expand Down Expand Up @@ -2134,6 +2158,30 @@ mod tests {
}
}

#[tokio::test]
async fn unprojected_filter() {
let mut ctx = ExecutionContext::new();
let df = ctx
.read_table(test::table_with_sequence(1, 3).unwrap())
.unwrap();

let df = df
.select(vec![binary_expr(col("i"), Operator::Plus, col("i"))])
.unwrap()
.filter(col("i").gt(lit(2)))
.unwrap();
let results = df.collect().await.unwrap();

let expected = vec![
"+--------------------------+",
"| ?table?.i Plus ?table?.i |",
"+--------------------------+",
"| 6 |",
"+--------------------------+",
];
assert_batches_sorted_eq!(expected, &results);
}

#[tokio::test]
async fn group_by_dictionary() {
async fn run_test_case<K: ArrowDictionaryKeyType>() {
Expand Down
47 changes: 35 additions & 12 deletions datafusion/src/logical_plan/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -804,8 +804,11 @@ impl Expr {
where
R: ExprRewriter,
{
if !rewriter.pre_visit(&self)? {
return Ok(self);
let need_mutate = match rewriter.pre_visit(&self)? {
RewriteRecursion::Mutate => return rewriter.mutate(self),
RewriteRecursion::Stop => return Ok(self),
RewriteRecursion::Continue => true,
RewriteRecursion::Skip => false,
};

// recurse into all sub expressions(and cover all expression types)
Expand Down Expand Up @@ -915,14 +918,18 @@ impl Expr {
negated,
} => Expr::InList {
expr: rewrite_boxed(expr, rewriter)?,
list,
list: rewrite_vec(list, rewriter)?,
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is an accident.

Since this PR now contains two changes about "expr" (this one and RewriteRecursion below), perhaps it would be clearer to move these changes to another PR?

cc @alamb

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree two PRs is almost always clearer :)

please do let me know when you would like another look at this PR

negated,
},
Expr::Wildcard => Expr::Wildcard,
};

// now rewrite this expression itself
rewriter.mutate(expr)
if need_mutate {
rewriter.mutate(expr)
} else {
Ok(expr)
}
}
}

Expand Down Expand Up @@ -990,15 +997,27 @@ pub trait ExpressionVisitor: Sized {
}
}

/// Controls how the [ExprRewriter] recursion should proceed.
pub enum RewriteRecursion {
/// Continue rewrite / visit this expression.
Continue,
/// Call [mutate()] immediately and return.
Mutate,
/// Do not rewrite / visit the children of this expression.
Stop,
/// Keep recursive but skip mutate on this expression
Skip,
}

/// Trait for potentially recursively rewriting an [`Expr`] expression
/// tree. When passed to `Expr::rewrite`, `ExpressionVisitor::mutate` is
/// invoked recursively on all nodes of an expression tree. See the
/// comments on `Expr::rewrite` for details on its use
pub trait ExprRewriter: Sized {
/// Invoked before any children of `expr` are rewritten /
/// visited. Default implementation returns `Ok(true)`
fn pre_visit(&mut self, _expr: &Expr) -> Result<bool> {
Ok(true)
/// visited. Default implementation returns `Ok(RewriteRecursion::Continue)`
fn pre_visit(&mut self, _expr: &Expr) -> Result<RewriteRecursion> {
Ok(RewriteRecursion::Continue)
}

/// Invoked after all children of `expr` have been mutated and
Expand Down Expand Up @@ -1721,13 +1740,17 @@ fn create_name(e: &Expr, input_schema: &DFSchema) -> Result<String> {
} => {
let mut name = "CASE ".to_string();
if let Some(e) = expr {
name += &format!("{:?} ", e);
let e = create_name(e, input_schema)?;
name += &format!("{} ", e);
}
for (w, t) in when_then_expr {
name += &format!("WHEN {:?} THEN {:?} ", w, t);
let when = create_name(w, input_schema)?;
let then = create_name(t, input_schema)?;
name += &format!("WHEN {} THEN {} ", when, then);
}
if let Some(e) = else_expr {
name += &format!("ELSE {:?} ", e);
let e = create_name(e, input_schema)?;
name += &format!("ELSE {} ", e);
}
name += "END";
Ok(name)
Expand Down Expand Up @@ -1887,9 +1910,9 @@ mod tests {
Ok(expr)
}

fn pre_visit(&mut self, expr: &Expr) -> Result<bool> {
fn pre_visit(&mut self, expr: &Expr) -> Result<RewriteRecursion> {
self.v.push(format!("Previsited {:?}", expr));
Ok(true)
Ok(RewriteRecursion::Continue)
}
}

Expand Down
2 changes: 1 addition & 1 deletion datafusion/src/logical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ pub use expr::{
right, round, rpad, rtrim, sha224, sha256, sha384, sha512, signum, sin, split_part,
sqrt, starts_with, strpos, substr, sum, tan, to_hex, translate, trim, trunc,
unnormalize_col, unnormalize_cols, upper, when, Column, Expr, ExprRewriter,
ExpressionVisitor, Literal, Recursion,
ExpressionVisitor, Literal, Recursion, RewriteRecursion,
};
pub use extension::UserDefinedLogicalNode;
pub use operators::Operator;
Expand Down
Loading