Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions datafusion/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ rand = "0.8"
avro-rs = { version = "0.13", features = ["snappy"], optional = true }
num-traits = { version = "0.2", optional = true }
pyo3 = { version = "0.14", optional = true }
stacker = { version = "0.1.14"}

[dev-dependencies]
criterion = "0.3"
Expand Down
5 changes: 5 additions & 0 deletions datafusion/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ pub enum DataFusionError {
/// Error returned during execution of the query.
/// Examples include files not found, errors in parsing certain types.
Execution(String),
/// Error returned if recursion exceeded limit
RecursionLimitErr(usize),
}

impl DataFusionError {
Expand Down Expand Up @@ -129,6 +131,9 @@ impl Display for DataFusionError {
DataFusionError::Execution(ref desc) => {
write!(f, "Execution error: {}", desc)
}
DataFusionError::RecursionLimitErr(ref desc) => {
write!(f, "Recursion exceeded limit: {}", desc)
}
}
}
}
Expand Down
1 change: 1 addition & 0 deletions datafusion/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,7 @@ pub use arrow;
pub use parquet;

pub(crate) mod field_util;
pub(crate) mod safe_stack;

#[cfg(feature = "pyarrow")]
mod pyarrow;
Expand Down
245 changes: 125 additions & 120 deletions datafusion/src/logical_plan/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use crate::physical_plan::{
aggregates, expressions::binary_operator_data_type, functions, udf::ScalarUDF,
window_functions,
};
use crate::safe_stack::maybe_grow;
use crate::{physical_plan::udaf::AggregateUDF, scalar::ScalarValue};
use aggregates::{AccumulatorFunctionImplementation, StateTypeFunction};
use arrow::{compute::can_cast_types, datatypes::DataType};
Expand Down Expand Up @@ -379,7 +380,7 @@ impl Expr {
/// This happens when e.g. the expression refers to a column that does not exist in the schema, or when
/// the expression is incorrectly typed (e.g. `[utf8] + [bool]`).
pub fn get_type(&self, schema: &DFSchema) -> Result<DataType> {
match self {
maybe_grow(|| match self {
Expr::Alias(expr, _) => expr.get_type(schema),
Expr::Column(c) => Ok(schema.field_from_column(c)?.data_type().clone()),
Expr::ScalarVariable(_) => Ok(DataType::Utf8),
Expand Down Expand Up @@ -446,7 +447,7 @@ impl Expr {

get_indexed_field(&data_type, key).map(|x| x.data_type().clone())
}
}
})
}

/// Returns the nullability of the expression based on [arrow::datatypes::Schema].
Expand All @@ -456,7 +457,7 @@ impl Expr {
/// This function errors when it is not possible to compute its nullability.
/// This happens when the expression refers to a column that does not exist in the schema.
pub fn nullable(&self, input_schema: &DFSchema) -> Result<bool> {
match self {
maybe_grow(|| match self {
Expr::Alias(expr, _) => expr.nullable(input_schema),
Expr::Column(c) => Ok(input_schema.field_from_column(c)?.is_nullable()),
Expr::Literal(value) => Ok(value.is_null()),
Expand Down Expand Up @@ -505,7 +506,7 @@ impl Expr {
let data_type = expr.get_type(input_schema)?;
get_indexed_field(&data_type, key).map(|x| x.is_nullable())
}
}
})
}

/// Returns the name of this expression based on [crate::logical_plan::DFSchema].
Expand Down Expand Up @@ -702,7 +703,7 @@ impl Expr {
};

// recurse (and cover all expression types)
let visitor = match self {
let visitor = maybe_grow(|| match self {
Expr::Alias(expr, _) => expr.accept(visitor),
Expr::Column(_) => Ok(visitor),
Expr::ScalarVariable(..) => Ok(visitor),
Expand Down Expand Up @@ -784,7 +785,7 @@ impl Expr {
}
Expr::Wildcard => Ok(visitor),
Expr::GetIndexedField { ref expr, .. } => expr.accept(visitor),
}?;
})?;

visitor.post_visit(self)
}
Expand Down Expand Up @@ -834,121 +835,125 @@ impl Expr {
};

// recurse into all sub expressions(and cover all expression types)
let expr = match self {
Expr::Alias(expr, name) => Expr::Alias(rewrite_boxed(expr, rewriter)?, name),
Expr::Column(_) => self.clone(),
Expr::ScalarVariable(names) => Expr::ScalarVariable(names),
Expr::Literal(value) => Expr::Literal(value),
Expr::BinaryExpr { left, op, right } => Expr::BinaryExpr {
left: rewrite_boxed(left, rewriter)?,
op,
right: rewrite_boxed(right, rewriter)?,
},
Expr::Not(expr) => Expr::Not(rewrite_boxed(expr, rewriter)?),
Expr::IsNotNull(expr) => Expr::IsNotNull(rewrite_boxed(expr, rewriter)?),
Expr::IsNull(expr) => Expr::IsNull(rewrite_boxed(expr, rewriter)?),
Expr::Negative(expr) => Expr::Negative(rewrite_boxed(expr, rewriter)?),
Expr::Between {
expr,
low,
high,
negated,
} => Expr::Between {
expr: rewrite_boxed(expr, rewriter)?,
low: rewrite_boxed(low, rewriter)?,
high: rewrite_boxed(high, rewriter)?,
negated,
},
Expr::Case {
expr,
when_then_expr,
else_expr,
} => {
let expr = rewrite_option_box(expr, rewriter)?;
let when_then_expr = when_then_expr
.into_iter()
.map(|(when, then)| {
Ok((
rewrite_boxed(when, rewriter)?,
rewrite_boxed(then, rewriter)?,
))
})
.collect::<Result<Vec<_>>>()?;

let else_expr = rewrite_option_box(else_expr, rewriter)?;

let expr = maybe_grow(|| {
Ok::<_, DataFusionError>(match self {
Expr::Alias(expr, name) => {
Expr::Alias(rewrite_boxed(expr, rewriter)?, name)
}
Expr::Column(_) => self.clone(),
Expr::ScalarVariable(names) => Expr::ScalarVariable(names),
Expr::Literal(value) => Expr::Literal(value),
Expr::BinaryExpr { left, op, right } => Expr::BinaryExpr {
left: rewrite_boxed(left, rewriter)?,
op,
right: rewrite_boxed(right, rewriter)?,
},
Expr::Not(expr) => Expr::Not(rewrite_boxed(expr, rewriter)?),
Expr::IsNotNull(expr) => Expr::IsNotNull(rewrite_boxed(expr, rewriter)?),
Expr::IsNull(expr) => Expr::IsNull(rewrite_boxed(expr, rewriter)?),
Expr::Negative(expr) => Expr::Negative(rewrite_boxed(expr, rewriter)?),
Expr::Between {
expr,
low,
high,
negated,
} => Expr::Between {
expr: rewrite_boxed(expr, rewriter)?,
low: rewrite_boxed(low, rewriter)?,
high: rewrite_boxed(high, rewriter)?,
negated,
},
Expr::Case {
expr,
when_then_expr,
else_expr,
} => {
let expr = rewrite_option_box(expr, rewriter)?;
let when_then_expr = when_then_expr
.into_iter()
.map(|(when, then)| {
Ok((
rewrite_boxed(when, rewriter)?,
rewrite_boxed(then, rewriter)?,
))
})
.collect::<Result<Vec<_>>>()?;

let else_expr = rewrite_option_box(else_expr, rewriter)?;

Expr::Case {
expr,
when_then_expr,
else_expr,
}
}
}
Expr::Cast { expr, data_type } => Expr::Cast {
expr: rewrite_boxed(expr, rewriter)?,
data_type,
},
Expr::TryCast { expr, data_type } => Expr::TryCast {
expr: rewrite_boxed(expr, rewriter)?,
data_type,
},
Expr::Sort {
expr,
asc,
nulls_first,
} => Expr::Sort {
expr: rewrite_boxed(expr, rewriter)?,
asc,
nulls_first,
},
Expr::ScalarFunction { args, fun } => Expr::ScalarFunction {
args: rewrite_vec(args, rewriter)?,
fun,
},
Expr::ScalarUDF { args, fun } => Expr::ScalarUDF {
args: rewrite_vec(args, rewriter)?,
fun,
},
Expr::WindowFunction {
args,
fun,
partition_by,
order_by,
window_frame,
} => Expr::WindowFunction {
args: rewrite_vec(args, rewriter)?,
fun,
partition_by: rewrite_vec(partition_by, rewriter)?,
order_by: rewrite_vec(order_by, rewriter)?,
window_frame,
},
Expr::AggregateFunction {
args,
fun,
distinct,
} => Expr::AggregateFunction {
args: rewrite_vec(args, rewriter)?,
fun,
distinct,
},
Expr::AggregateUDF { args, fun } => Expr::AggregateUDF {
args: rewrite_vec(args, rewriter)?,
fun,
},
Expr::InList {
expr,
list,
negated,
} => Expr::InList {
expr: rewrite_boxed(expr, rewriter)?,
list: rewrite_vec(list, rewriter)?,
negated,
},
Expr::Wildcard => Expr::Wildcard,
Expr::GetIndexedField { expr, key } => Expr::GetIndexedField {
expr: rewrite_boxed(expr, rewriter)?,
key,
},
};
Expr::Cast { expr, data_type } => Expr::Cast {
expr: rewrite_boxed(expr, rewriter)?,
data_type,
},
Expr::TryCast { expr, data_type } => Expr::TryCast {
expr: rewrite_boxed(expr, rewriter)?,
data_type,
},
Expr::Sort {
expr,
asc,
nulls_first,
} => Expr::Sort {
expr: rewrite_boxed(expr, rewriter)?,
asc,
nulls_first,
},
Expr::ScalarFunction { args, fun } => Expr::ScalarFunction {
args: rewrite_vec(args, rewriter)?,
fun,
},
Expr::ScalarUDF { args, fun } => Expr::ScalarUDF {
args: rewrite_vec(args, rewriter)?,
fun,
},
Expr::WindowFunction {
args,
fun,
partition_by,
order_by,
window_frame,
} => Expr::WindowFunction {
args: rewrite_vec(args, rewriter)?,
fun,
partition_by: rewrite_vec(partition_by, rewriter)?,
order_by: rewrite_vec(order_by, rewriter)?,
window_frame,
},
Expr::AggregateFunction {
args,
fun,
distinct,
} => Expr::AggregateFunction {
args: rewrite_vec(args, rewriter)?,
fun,
distinct,
},
Expr::AggregateUDF { args, fun } => Expr::AggregateUDF {
args: rewrite_vec(args, rewriter)?,
fun,
},
Expr::InList {
expr,
list,
negated,
} => Expr::InList {
expr: rewrite_boxed(expr, rewriter)?,
list: rewrite_vec(list, rewriter)?,
negated,
},
Expr::Wildcard => Expr::Wildcard,
Expr::GetIndexedField { expr, key } => Expr::GetIndexedField {
expr: rewrite_boxed(expr, rewriter)?,
key,
},
})
})?;

// now rewrite this expression itself
if need_mutate {
Expand Down Expand Up @@ -1722,7 +1727,7 @@ fn fmt_function(

impl fmt::Debug for Expr {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
maybe_grow(|| match self {
Expr::Alias(expr, alias) => write!(f, "{:?} AS {}", expr, alias),
Expr::Column(c) => write!(f, "{}", c),
Expr::ScalarVariable(var_names) => write!(f, "{}", var_names.join(".")),
Expand Down Expand Up @@ -1841,7 +1846,7 @@ impl fmt::Debug for Expr {
Expr::GetIndexedField { ref expr, key } => {
write!(f, "({:?})[{}]", expr, key)
}
}
})
}
}

Expand All @@ -1865,7 +1870,7 @@ fn create_function_name(
/// Returns a readable name of an expression based on the input schema.
/// This function recursively transverses the expression for names such as "CAST(a > 2)".
fn create_name(e: &Expr, input_schema: &DFSchema) -> Result<String> {
match e {
maybe_grow(|| match e {
Expr::Alias(_, name) => Ok(name.clone()),
Expr::Column(c) => Ok(c.flat_name()),
Expr::ScalarVariable(variable_names) => Ok(variable_names.join(".")),
Expand Down Expand Up @@ -2002,7 +2007,7 @@ fn create_name(e: &Expr, input_schema: &DFSchema) -> Result<String> {
Expr::Wildcard => Err(DataFusionError::Internal(
"Create name does not support wildcard".to_string(),
)),
}
})
}

/// Create field meta-data from an expression, for use in a result set schema
Expand Down
Loading