From 4d828756b57000f32289a07b4b9b0f80d103a5f8 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Sat, 31 Dec 2022 08:57:26 -0500 Subject: [PATCH 1/5] fix: stackoverflow in planning tpcds queries --- datafusion/core/tests/tpcds_planning.rs | 1 - datafusion/sql/src/planner.rs | 722 +++++++++++++----------- 2 files changed, 404 insertions(+), 319 deletions(-) diff --git a/datafusion/core/tests/tpcds_planning.rs b/datafusion/core/tests/tpcds_planning.rs index ddd8898e4d26..424ecea8c9de 100644 --- a/datafusion/core/tests/tpcds_planning.rs +++ b/datafusion/core/tests/tpcds_planning.rs @@ -43,7 +43,6 @@ async fn tpcds_logical_q3() -> Result<()> { create_logical_plan(3).await } -#[ignore] // thread 'q4' has overflowed its stack #[tokio::test] async fn tpcds_logical_q4() -> Result<()> { create_logical_plan(4).await diff --git a/datafusion/sql/src/planner.rs b/datafusion/sql/src/planner.rs index 0ee5913cb64b..09af12160f30 100644 --- a/datafusion/sql/src/planner.rs +++ b/datafusion/sql/src/planner.rs @@ -25,11 +25,11 @@ use std::{convert::TryInto, vec}; use arrow_schema::*; use sqlparser::ast::{ArrayAgg, ExactNumberInfo, SetQuantifier}; use sqlparser::ast::{ - BinaryOperator, DataType as SQLDataType, DateTimeField, Expr as SQLExpr, FunctionArg, - FunctionArgExpr, Ident, Join, JoinConstraint, JoinOperator, ObjectName, - Offset as SQLOffset, Query, Select, SelectItem, SetExpr, SetOperator, - ShowCreateObject, ShowStatementFilter, TableAlias, TableFactor, TableWithJoins, - TrimWhereField, UnaryOperator, Value, Values as SQLValues, + BinaryOperator, DataType as SQLDataType, DateTimeField, Expr as SQLExpr, + Function as SQLFunction, FunctionArg, FunctionArgExpr, Ident, Join, JoinConstraint, + JoinOperator, ObjectName, Offset as SQLOffset, Query, Select, SelectItem, SetExpr, + SetOperator, ShowCreateObject, ShowStatementFilter, TableAlias, TableFactor, + TableWithJoins, TrimWhereField, UnaryOperator, Value, Values as SQLValues, }; use sqlparser::ast::{ColumnDef as SQLColumnDef, 
ColumnOption}; use sqlparser::ast::{ObjectType, OrderByExpr, Statement}; @@ -1866,31 +1866,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { last_field, fractional_seconds_precision, ), - SQLExpr::Identifier(id) => { - if id.value.starts_with('@') { - // TODO: figure out if ScalarVariables should be insensitive. - let var_names = vec![id.value]; - let ty = self - .schema_provider - .get_variable_type(&var_names) - .ok_or_else(|| { - DataFusionError::Execution(format!( - "variable {var_names:?} has no type information" - )) - })?; - Ok(Expr::ScalarVariable(ty, var_names)) - } else { - // Don't use `col()` here because it will try to - // interpret names with '.' as if they were - // compound identifiers, but this is not a compound - // identifier. (e.g. it is "foo.bar" not foo.bar) - - Ok(Expr::Column(Column { - relation: None, - name: normalize_ident(id), - })) - } - } + SQLExpr::Identifier(id) => self.sql_identifier_to_expr(id), SQLExpr::MapAccess { column, keys } => { if let SQLExpr::Identifier(id) = *column { @@ -1907,93 +1883,14 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { plan_indexed(expr, indexes) } - SQLExpr::CompoundIdentifier(ids) => { - if ids[0].value.starts_with('@') { - let var_names: Vec<_> = ids.into_iter().map(normalize_ident).collect(); - let ty = self - .schema_provider - .get_variable_type(&var_names) - .ok_or_else(|| { - DataFusionError::Execution(format!( - "variable {var_names:?} has no type information" - )) - })?; - Ok(Expr::ScalarVariable(ty, var_names)) - } else { - // only support "schema.table" type identifiers here - let (name, relation) = match idents_to_table_reference(ids)? { - OwnedTableReference::Partial { schema, table } => (table, schema), - r @ OwnedTableReference::Bare { .. } | - r @ OwnedTableReference::Full { .. 
} => { - return Err(DataFusionError::Plan(format!( - "Unsupported compound identifier '{r:?}'", - ))); - } - }; - - // Try and find the reference in schema - match schema.field_with_qualified_name(&relation, &name) { - Ok(_) => { - // found an exact match on a qualified name so this is a table.column identifier - Ok(Expr::Column(Column { - relation: Some(relation), - name, - })) - } - Err(_) => { - if let Some(field) = schema.fields().iter().find(|f| f.name().eq(&relation)) { - // Access to a field of a column which is a structure, example: SELECT my_struct.key - Ok(Expr::GetIndexedField(GetIndexedField::new( - Box::new(Expr::Column(field.qualified_column())), - ScalarValue::Utf8(Some(name)), - ))) - } else { - // table.column identifier - Ok(Expr::Column(Column { - relation: Some(relation), - name, - })) - } - } - } - } - } + SQLExpr::CompoundIdentifier(ids) => self.sql_compound_identifier_to_expr(ids, schema), SQLExpr::Case { operand, conditions, results, else_result, - } => { - let expr = if let Some(e) = operand { - Some(Box::new(self.sql_expr_to_logical_expr(*e, schema, planner_context)?)) - } else { - None - }; - let when_expr = conditions - .into_iter() - .map(|e| self.sql_expr_to_logical_expr(e, schema, planner_context)) - .collect::>>()?; - let then_expr = results - .into_iter() - .map(|e| self.sql_expr_to_logical_expr(e, schema, planner_context)) - .collect::>>()?; - let else_expr = if let Some(e) = else_result { - Some(Box::new(self.sql_expr_to_logical_expr(*e, schema, planner_context)?)) - } else { - None - }; - - Ok(Expr::Case(Case::new( - expr, - when_expr - .iter() - .zip(then_expr.iter()) - .map(|(w, t)| (Box::new(w.to_owned()), Box::new(t.to_owned()))) - .collect(), - else_expr, - ))) - } + } => self.sql_case_identifier_to_expr(operand, conditions, results, else_result, schema, planner_context), SQLExpr::Cast { expr, @@ -2069,66 +1966,14 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { expr, list, negated, - } => { - let list_expr = list - 
.into_iter() - .map(|e| self.sql_expr_to_logical_expr(e, schema, planner_context)) - .collect::>>()?; + } => self.sql_in_list_to_expr(expr, list, negated, schema, planner_context), - Ok(Expr::InList { - expr: Box::new(self.sql_expr_to_logical_expr(*expr, schema, planner_context)?), - list: list_expr, - negated, - }) - } + SQLExpr::Like { negated, expr, pattern, escape_char } => self.sql_like_to_expr(negated, expr, pattern, escape_char, schema, planner_context), - SQLExpr::Like { negated, expr, pattern, escape_char } => { - let pattern = self.sql_expr_to_logical_expr(*pattern, schema, planner_context)?; - let pattern_type = pattern.get_type(schema)?; - if pattern_type != DataType::Utf8 && pattern_type != DataType::Null { - return Err(DataFusionError::Plan( - "Invalid pattern in LIKE expression".to_string(), - )); - } - Ok(Expr::Like(Like::new( - negated, - Box::new(self.sql_expr_to_logical_expr(*expr, schema, planner_context)?), - Box::new(pattern), - escape_char, - ))) - } + SQLExpr::ILike { negated, expr, pattern, escape_char } => self.sql_ilike_to_expr(negated, expr, pattern, escape_char, schema, planner_context), - SQLExpr::ILike { negated, expr, pattern, escape_char } => { - let pattern = self.sql_expr_to_logical_expr(*pattern, schema, planner_context)?; - let pattern_type = pattern.get_type(schema)?; - if pattern_type != DataType::Utf8 && pattern_type != DataType::Null { - return Err(DataFusionError::Plan( - "Invalid pattern in ILIKE expression".to_string(), - )); - } - Ok(Expr::ILike(Like::new( - negated, - Box::new(self.sql_expr_to_logical_expr(*expr, schema, planner_context)?), - Box::new(pattern), - escape_char, - ))) - } + SQLExpr::SimilarTo { negated, expr, pattern, escape_char } => self.sql_similarto_to_expr(negated, expr, pattern, escape_char, schema, planner_context), - SQLExpr::SimilarTo { negated, expr, pattern, escape_char } => { - let pattern = self.sql_expr_to_logical_expr(*pattern, schema, planner_context)?; - let pattern_type = 
pattern.get_type(schema)?; - if pattern_type != DataType::Utf8 && pattern_type != DataType::Null { - return Err(DataFusionError::Plan( - "Invalid pattern in SIMILAR TO expression".to_string(), - )); - } - Ok(Expr::SimilarTo(Like::new( - negated, - Box::new(self.sql_expr_to_logical_expr(*expr, schema, planner_context)?), - Box::new(pattern), - escape_char, - ))) - } SQLExpr::BinaryOp { left, @@ -2193,159 +2038,11 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { )) } - SQLExpr::Trim { expr, trim_where, trim_what } => { - let fun = match trim_where { - Some(TrimWhereField::Leading) => { - BuiltinScalarFunction::Ltrim - } - Some(TrimWhereField::Trailing) => { - BuiltinScalarFunction::Rtrim - } - Some(TrimWhereField::Both) => { - BuiltinScalarFunction::Btrim - } - None => BuiltinScalarFunction::Trim - }; - let arg = self.sql_expr_to_logical_expr(*expr, schema, planner_context)?; - let args = match trim_what { - Some(to_trim) => { - let to_trim = self.sql_expr_to_logical_expr(*to_trim, schema, planner_context)?; - vec![arg, to_trim] - } - None => vec![arg], - }; - Ok(Expr::ScalarFunction { fun, args }) - } + SQLExpr::Trim { expr, trim_where, trim_what } => self.sql_trim_to_expr(expr, trim_where, trim_what, schema, planner_context), - SQLExpr::AggregateExpressionWithFilter { expr, filter } => { - match self.sql_expr_to_logical_expr(*expr, schema, planner_context)? { - Expr::AggregateFunction(expr::AggregateFunction { - fun, args, distinct, .. 
- }) => Ok(Expr::AggregateFunction(expr::AggregateFunction::new( fun, args, distinct, Some(Box::new(self.sql_expr_to_logical_expr(*filter, schema, planner_context)?)) ))), - _ => Err(DataFusionError::Internal("AggregateExpressionWithFilter expression was not an AggregateFunction".to_string())) - } - } + SQLExpr::AggregateExpressionWithFilter { expr, filter } => self.sql_agg_with_filter_to_expr(expr, filter, schema, planner_context), - SQLExpr::Function(mut function) => { - let name = if function.name.0.len() > 1 { - // DF doesn't handle compound identifiers - // (e.g. "foo.bar") for function names yet - function.name.to_string() - } else { - normalize_ident(function.name.0[0].clone()) - }; - - // first, check SQL reserved words - if name == "rollup" { - let args = self.function_args_to_expr(function.args, schema)?; - return Ok(Expr::GroupingSet(GroupingSet::Rollup(args))); - } else if name == "cube" { - let args = self.function_args_to_expr(function.args, schema)?; - return Ok(Expr::GroupingSet(GroupingSet::Cube(args))); - } - - // next, scalar built-in - if let Ok(fun) = BuiltinScalarFunction::from_str(&name) { - let args = self.function_args_to_expr(function.args, schema)?; - return Ok(Expr::ScalarFunction { fun, args }); - }; - - // then, window function - if let Some(window) = function.over.take() { - let partition_by = window - .partition_by - .into_iter() - .map(|e| self.sql_expr_to_logical_expr(e, schema, planner_context)) - .collect::>>()?; - let order_by = window - .order_by - .into_iter() - .map(|e| self.order_by_to_sort_expr(e, schema)) - .collect::>>()?; - let window_frame = window - .window_frame - .as_ref() - .map(|window_frame| { - let window_frame: WindowFrame = window_frame.clone().try_into()?; - if WindowFrameUnits::Range == window_frame.units - && order_by.len() != 1 - { - Err(DataFusionError::Plan(format!( - "With window frame of type RANGE, the order by expression must be of length 1, got {}", order_by.len()))) - } else { - Ok(window_frame) - } 
- }) - .transpose()?; - let window_frame = if let Some(window_frame) = window_frame { - window_frame - } else { - WindowFrame::new(!order_by.is_empty()) - }; - let fun = self.find_window_func(&name)?; - let expr = match fun { - WindowFunction::AggregateFunction( - aggregate_fun, - ) => { - let (aggregate_fun, args) = self.aggregate_fn_to_expr( - aggregate_fun, - function.args, - schema, - )?; - - Expr::WindowFunction(expr::WindowFunction::new( - WindowFunction::AggregateFunction( - aggregate_fun, - ), - args, - partition_by, - order_by, - window_frame, - )) - } - _ => { - Expr::WindowFunction(expr::WindowFunction::new( - fun, - self.function_args_to_expr(function.args, schema)?, - partition_by, - order_by, - window_frame, - )) - } - }; - return Ok(expr); - } - - // next, aggregate built-ins - if let Ok(fun) = AggregateFunction::from_str(&name) { - let distinct = function.distinct; - let (fun, args) = self.aggregate_fn_to_expr(fun, function.args, schema)?; - return Ok(Expr::AggregateFunction(expr::AggregateFunction::new( - fun, - args, - distinct, - None, - ))); - }; - - // finally, user-defined functions (UDF) and UDAF - match self.schema_provider.get_function_meta(&name) { - Some(fm) => { - let args = self.function_args_to_expr(function.args, schema)?; - - Ok(Expr::ScalarUDF { fun: fm, args }) - } - None => match self.schema_provider.get_aggregate_meta(&name) { - Some(fm) => { - let args = self.function_args_to_expr(function.args, schema)?; - Ok(Expr::AggregateUDF { fun: fm, args, filter: None }) - } - _ => Err(DataFusionError::Plan(format!( - "Invalid function '{name}'" - ))), - }, - } - } + SQLExpr::Function(function) => self.sql_function_to_expr(function, schema, planner_context), SQLExpr::Floor { expr, field: _field } => { let fun = BuiltinScalarFunction::Floor; @@ -2375,6 +2072,395 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { } } + fn sql_function_to_expr( + &self, + mut function: SQLFunction, + schema: &DFSchema, + planner_context: &mut PlannerContext, 
+ ) -> Result { + let name = if function.name.0.len() > 1 { + // DF doesn't handle compound identifiers + // (e.g. "foo.bar") for function names yet + function.name.to_string() + } else { + normalize_ident(function.name.0[0].clone()) + }; + + // first, check SQL reserved words + if name == "rollup" { + let args = self.function_args_to_expr(function.args, schema)?; + return Ok(Expr::GroupingSet(GroupingSet::Rollup(args))); + } else if name == "cube" { + let args = self.function_args_to_expr(function.args, schema)?; + return Ok(Expr::GroupingSet(GroupingSet::Cube(args))); + } + + // next, scalar built-in + if let Ok(fun) = BuiltinScalarFunction::from_str(&name) { + let args = self.function_args_to_expr(function.args, schema)?; + return Ok(Expr::ScalarFunction { fun, args }); + }; + + // then, window function + if let Some(window) = function.over.take() { + let partition_by = window + .partition_by + .into_iter() + .map(|e| self.sql_expr_to_logical_expr(e, schema, planner_context)) + .collect::>>()?; + let order_by = window + .order_by + .into_iter() + .map(|e| self.order_by_to_sort_expr(e, schema)) + .collect::>>()?; + let window_frame = window + .window_frame + .as_ref() + .map(|window_frame| { + let window_frame: WindowFrame = window_frame.clone().try_into()?; + if WindowFrameUnits::Range == window_frame.units + && order_by.len() != 1 + { + Err(DataFusionError::Plan(format!( + "With window frame of type RANGE, the order by expression must be of length 1, got {}", order_by.len()))) + } else { + Ok(window_frame) + } + }) + .transpose()?; + let window_frame = if let Some(window_frame) = window_frame { + window_frame + } else { + WindowFrame::new(!order_by.is_empty()) + }; + let fun = self.find_window_func(&name)?; + let expr = match fun { + WindowFunction::AggregateFunction(aggregate_fun) => { + let (aggregate_fun, args) = + self.aggregate_fn_to_expr(aggregate_fun, function.args, schema)?; + + Expr::WindowFunction(expr::WindowFunction::new( + 
WindowFunction::AggregateFunction(aggregate_fun), + args, + partition_by, + order_by, + window_frame, + )) + } + _ => Expr::WindowFunction(expr::WindowFunction::new( + fun, + self.function_args_to_expr(function.args, schema)?, + partition_by, + order_by, + window_frame, + )), + }; + return Ok(expr); + } + + // next, aggregate built-ins + if let Ok(fun) = AggregateFunction::from_str(&name) { + let distinct = function.distinct; + let (fun, args) = self.aggregate_fn_to_expr(fun, function.args, schema)?; + return Ok(Expr::AggregateFunction(expr::AggregateFunction::new( + fun, args, distinct, None, + ))); + }; + + // finally, user-defined functions (UDF) and UDAF + match self.schema_provider.get_function_meta(&name) { + Some(fm) => { + let args = self.function_args_to_expr(function.args, schema)?; + + Ok(Expr::ScalarUDF { fun: fm, args }) + } + None => match self.schema_provider.get_aggregate_meta(&name) { + Some(fm) => { + let args = self.function_args_to_expr(function.args, schema)?; + Ok(Expr::AggregateUDF { + fun: fm, + args, + filter: None, + }) + } + _ => Err(DataFusionError::Plan(format!("Invalid function '{name}'"))), + }, + } + } + + fn sql_in_list_to_expr( + &self, + expr: Box, + list: Vec, + negated: bool, + schema: &DFSchema, + planner_context: &mut PlannerContext, + ) -> Result { + let list_expr = list + .into_iter() + .map(|e| self.sql_expr_to_logical_expr(e, schema, planner_context)) + .collect::>>()?; + + Ok(Expr::InList { + expr: Box::new(self.sql_expr_to_logical_expr( + *expr, + schema, + planner_context, + )?), + list: list_expr, + negated, + }) + } + + fn sql_like_to_expr( + &self, + negated: bool, + expr: Box, + pattern: Box, + escape_char: Option, + schema: &DFSchema, + planner_context: &mut PlannerContext, + ) -> Result { + let pattern = self.sql_expr_to_logical_expr(*pattern, schema, planner_context)?; + let pattern_type = pattern.get_type(schema)?; + if pattern_type != DataType::Utf8 && pattern_type != DataType::Null { + return 
Err(DataFusionError::Plan( + "Invalid pattern in LIKE expression".to_string(), + )); + } + Ok(Expr::Like(Like::new( + negated, + Box::new(self.sql_expr_to_logical_expr(*expr, schema, planner_context)?), + Box::new(pattern), + escape_char, + ))) + } + + fn sql_ilike_to_expr( + &self, + negated: bool, + expr: Box, + pattern: Box, + escape_char: Option, + schema: &DFSchema, + planner_context: &mut PlannerContext, + ) -> Result { + let pattern = self.sql_expr_to_logical_expr(*pattern, schema, planner_context)?; + let pattern_type = pattern.get_type(schema)?; + if pattern_type != DataType::Utf8 && pattern_type != DataType::Null { + return Err(DataFusionError::Plan( + "Invalid pattern in ILIKE expression".to_string(), + )); + } + Ok(Expr::ILike(Like::new( + negated, + Box::new(self.sql_expr_to_logical_expr(*expr, schema, planner_context)?), + Box::new(pattern), + escape_char, + ))) + } + + fn sql_similarto_to_expr( + &self, + negated: bool, + expr: Box, + pattern: Box, + escape_char: Option, + schema: &DFSchema, + planner_context: &mut PlannerContext, + ) -> Result { + let pattern = self.sql_expr_to_logical_expr(*pattern, schema, planner_context)?; + let pattern_type = pattern.get_type(schema)?; + if pattern_type != DataType::Utf8 && pattern_type != DataType::Null { + return Err(DataFusionError::Plan( + "Invalid pattern in SIMILAR TO expression".to_string(), + )); + } + Ok(Expr::SimilarTo(Like::new( + negated, + Box::new(self.sql_expr_to_logical_expr(*expr, schema, planner_context)?), + Box::new(pattern), + escape_char, + ))) + } + + fn sql_trim_to_expr( + &self, + expr: Box, + trim_where: Option, + trim_what: Option>, + schema: &DFSchema, + planner_context: &mut PlannerContext, + ) -> Result { + let fun = match trim_where { + Some(TrimWhereField::Leading) => { + BuiltinScalarFunction::Ltrim + } + Some(TrimWhereField::Trailing) => { + BuiltinScalarFunction::Rtrim + } + Some(TrimWhereField::Both) => { + BuiltinScalarFunction::Btrim + } + None => 
BuiltinScalarFunction::Trim + }; + let arg = self.sql_expr_to_logical_expr(*expr, schema, planner_context)?; + let args = match trim_what { + Some(to_trim) => { + let to_trim = self.sql_expr_to_logical_expr(*to_trim, schema, planner_context)?; + vec![arg, to_trim] + } + None => vec![arg], + }; + Ok(Expr::ScalarFunction { fun, args }) + } + + fn sql_agg_with_filter_to_expr( + &self, + expr: Box, + filter: Box, + schema: &DFSchema, + planner_context: &mut PlannerContext, + ) -> Result { + match self.sql_expr_to_logical_expr(*expr, schema, planner_context)? { + Expr::AggregateFunction(expr::AggregateFunction { + fun, args, distinct, .. + }) => Ok(Expr::AggregateFunction(expr::AggregateFunction::new( fun, args, distinct, Some(Box::new(self.sql_expr_to_logical_expr(*filter, schema, planner_context)?)) ))), + _ => Err(DataFusionError::Internal("AggregateExpressionWithFilter expression was not an AggregateFunction".to_string())) + } + } + + fn sql_identifier_to_expr(&self, id: Ident) -> Result { + if id.value.starts_with('@') { + // TODO: figure out if ScalarVariables should be insensitive. + let var_names = vec![id.value]; + let ty = self + .schema_provider + .get_variable_type(&var_names) + .ok_or_else(|| { + DataFusionError::Execution(format!( + "variable {var_names:?} has no type information" + )) + })?; + Ok(Expr::ScalarVariable(ty, var_names)) + } else { + // Don't use `col()` here because it will try to + // interpret names with '.' as if they were + // compound identifiers, but this is not a compound + // identifier. (e.g. 
it is "foo.bar" not foo.bar) + + Ok(Expr::Column(Column { + relation: None, + name: normalize_ident(id), + })) + } + } + + fn sql_compound_identifier_to_expr( + &self, + ids: Vec, + schema: &DFSchema, + ) -> Result { + if ids[0].value.starts_with('@') { + let var_names: Vec<_> = ids.into_iter().map(normalize_ident).collect(); + let ty = self + .schema_provider + .get_variable_type(&var_names) + .ok_or_else(|| { + DataFusionError::Execution(format!( + "variable {var_names:?} has no type information" + )) + })?; + Ok(Expr::ScalarVariable(ty, var_names)) + } else { + // only support "schema.table" type identifiers here + let (name, relation) = match idents_to_table_reference(ids)? { + OwnedTableReference::Partial { schema, table } => (table, schema), + r @ OwnedTableReference::Bare { .. } + | r @ OwnedTableReference::Full { .. } => { + return Err(DataFusionError::Plan(format!( + "Unsupported compound identifier '{r:?}'", + ))); + } + }; + + // Try and find the reference in schema + match schema.field_with_qualified_name(&relation, &name) { + Ok(_) => { + // found an exact match on a qualified name so this is a table.column identifier + Ok(Expr::Column(Column { + relation: Some(relation), + name, + })) + } + Err(_) => { + if let Some(field) = + schema.fields().iter().find(|f| f.name().eq(&relation)) + { + // Access to a field of a column which is a structure, example: SELECT my_struct.key + Ok(Expr::GetIndexedField(GetIndexedField::new( + Box::new(Expr::Column(field.qualified_column())), + ScalarValue::Utf8(Some(name)), + ))) + } else { + // table.column identifier + Ok(Expr::Column(Column { + relation: Some(relation), + name, + })) + } + } + } + } + } + + fn sql_case_identifier_to_expr( + &self, + operand: Option>, + conditions: Vec, + results: Vec, + else_result: Option>, + schema: &DFSchema, + planner_context: &mut PlannerContext, + ) -> Result { + let expr = if let Some(e) = operand { + Some(Box::new(self.sql_expr_to_logical_expr( + *e, + schema, + planner_context, 
+ )?)) + } else { + None + }; + let when_expr = conditions + .into_iter() + .map(|e| self.sql_expr_to_logical_expr(e, schema, planner_context)) + .collect::>>()?; + let then_expr = results + .into_iter() + .map(|e| self.sql_expr_to_logical_expr(e, schema, planner_context)) + .collect::>>()?; + let else_expr = if let Some(e) = else_result { + Some(Box::new(self.sql_expr_to_logical_expr( + *e, + schema, + planner_context, + )?)) + } else { + None + }; + + Ok(Expr::Case(Case::new( + expr, + when_expr + .iter() + .zip(then_expr.iter()) + .map(|(w, t)| (Box::new(w.to_owned()), Box::new(t.to_owned()))) + .collect(), + else_expr, + ))) + } + fn find_window_func(&self, name: &str) -> Result { window_function::find_df_window_func(name) .or_else(|| { From 998eee48761c298b23ea75b6fb3ac9aab0134a61 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Sat, 31 Dec 2022 09:40:08 -0500 Subject: [PATCH 2/5] uncomment some other tests --- datafusion/core/tests/tpcds_planning.rs | 5 ----- 1 file changed, 5 deletions(-) diff --git a/datafusion/core/tests/tpcds_planning.rs b/datafusion/core/tests/tpcds_planning.rs index 424ecea8c9de..1e3cea8beecd 100644 --- a/datafusion/core/tests/tpcds_planning.rs +++ b/datafusion/core/tests/tpcds_planning.rs @@ -178,7 +178,6 @@ async fn tpcds_logical_q30() -> Result<()> { create_logical_plan(30).await } -#[ignore] // thread 'q31' has overflowed its stack #[tokio::test] async fn tpcds_logical_q31() -> Result<()> { create_logical_plan(31).await @@ -344,7 +343,6 @@ async fn tpcds_logical_q63() -> Result<()> { create_logical_plan(63).await } -#[ignore] // thread 'q64' has overflowed its stack #[tokio::test] async fn tpcds_logical_q64() -> Result<()> { create_logical_plan(64).await @@ -540,7 +538,6 @@ async fn tpcds_physical_q3() -> Result<()> { create_physical_plan(3).await } -#[ignore] // thread 'q4' has overflowed its stack #[tokio::test] async fn tpcds_physical_q4() -> Result<()> { create_physical_plan(4).await @@ -682,7 +679,6 @@ async fn 
tpcds_physical_q30() -> Result<()> { create_physical_plan(30).await } -#[ignore] // thread 'q31' has overflowed its stack #[tokio::test] async fn tpcds_physical_q31() -> Result<()> { create_physical_plan(31).await @@ -854,7 +850,6 @@ async fn tpcds_physical_q63() -> Result<()> { create_physical_plan(63).await } -#[ignore] // thread 'q64' has overflowed its stack #[tokio::test] async fn tpcds_physical_q64() -> Result<()> { create_physical_plan(64).await From cefbbfc2da9ea181075c288b163e59f4d94eda18 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Sat, 31 Dec 2022 09:49:11 -0500 Subject: [PATCH 3/5] fix: fmt --- datafusion/sql/src/planner.rs | 44 +++++++++++++++++++++-------------- 1 file changed, 27 insertions(+), 17 deletions(-) diff --git a/datafusion/sql/src/planner.rs b/datafusion/sql/src/planner.rs index 09af12160f30..7c9913239748 100644 --- a/datafusion/sql/src/planner.rs +++ b/datafusion/sql/src/planner.rs @@ -2293,21 +2293,16 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { planner_context: &mut PlannerContext, ) -> Result { let fun = match trim_where { - Some(TrimWhereField::Leading) => { - BuiltinScalarFunction::Ltrim - } - Some(TrimWhereField::Trailing) => { - BuiltinScalarFunction::Rtrim - } - Some(TrimWhereField::Both) => { - BuiltinScalarFunction::Btrim - } - None => BuiltinScalarFunction::Trim + Some(TrimWhereField::Leading) => BuiltinScalarFunction::Ltrim, + Some(TrimWhereField::Trailing) => BuiltinScalarFunction::Rtrim, + Some(TrimWhereField::Both) => BuiltinScalarFunction::Btrim, + None => BuiltinScalarFunction::Trim, }; let arg = self.sql_expr_to_logical_expr(*expr, schema, planner_context)?; let args = match trim_what { Some(to_trim) => { - let to_trim = self.sql_expr_to_logical_expr(*to_trim, schema, planner_context)?; + let to_trim = + self.sql_expr_to_logical_expr(*to_trim, schema, planner_context)?; vec![arg, to_trim] } None => vec![arg], @@ -2322,12 +2317,27 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { schema: &DFSchema, 
planner_context: &mut PlannerContext, ) -> Result { - match self.sql_expr_to_logical_expr(*expr, schema, planner_context)? { - Expr::AggregateFunction(expr::AggregateFunction { - fun, args, distinct, .. - }) => Ok(Expr::AggregateFunction(expr::AggregateFunction::new( fun, args, distinct, Some(Box::new(self.sql_expr_to_logical_expr(*filter, schema, planner_context)?)) ))), - _ => Err(DataFusionError::Internal("AggregateExpressionWithFilter expression was not an AggregateFunction".to_string())) - } + match self.sql_expr_to_logical_expr(*expr, schema, planner_context)? { + Expr::AggregateFunction(expr::AggregateFunction { + fun, + args, + distinct, + .. + }) => Ok(Expr::AggregateFunction(expr::AggregateFunction::new( + fun, + args, + distinct, + Some(Box::new(self.sql_expr_to_logical_expr( + *filter, + schema, + planner_context, + )?)), + ))), + _ => Err(DataFusionError::Internal( + "AggregateExpressionWithFilter expression was not an AggregateFunction" + .to_string(), + )), + } } fn sql_identifier_to_expr(&self, id: Ident) -> Result { From bbdcfe53105d49601d214b39d5984a04b65f1588 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Sat, 31 Dec 2022 10:18:59 -0500 Subject: [PATCH 4/5] Apply suggestions from code review --- datafusion/core/tests/tpcds_planning.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/datafusion/core/tests/tpcds_planning.rs b/datafusion/core/tests/tpcds_planning.rs index 1e3cea8beecd..e55a03e4fc29 100644 --- a/datafusion/core/tests/tpcds_planning.rs +++ b/datafusion/core/tests/tpcds_planning.rs @@ -343,7 +343,7 @@ async fn tpcds_logical_q63() -> Result<()> { create_logical_plan(63).await } -#[tokio::test] +#[ignore] // thread 'q64' has overflowed its stack async fn tpcds_logical_q64() -> Result<()> { create_logical_plan(64).await } @@ -850,6 +850,7 @@ async fn tpcds_physical_q63() -> Result<()> { create_physical_plan(63).await } +#[ignore] // thread 'q64' has overflowed its stack #[tokio::test] async fn tpcds_physical_q64() -> 
Result<()> { create_physical_plan(64).await From e45065eecfd7857d599faa4c14f6fd61658501a8 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Sat, 31 Dec 2022 10:19:45 -0500 Subject: [PATCH 5/5] Update datafusion/core/tests/tpcds_planning.rs --- datafusion/core/tests/tpcds_planning.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/datafusion/core/tests/tpcds_planning.rs b/datafusion/core/tests/tpcds_planning.rs index e55a03e4fc29..7359f3906e2b 100644 --- a/datafusion/core/tests/tpcds_planning.rs +++ b/datafusion/core/tests/tpcds_planning.rs @@ -343,7 +343,8 @@ async fn tpcds_logical_q63() -> Result<()> { create_logical_plan(63).await } -#[ignore] // thread 'q64' has overflowed its stack +#[ignore] // thread 'q64' has overflowed its stack +#[tokio::test] async fn tpcds_logical_q64() -> Result<()> { create_logical_plan(64).await }