From 306e85b153a70a817f7954f8fbb3590db44ab369 Mon Sep 17 00:00:00 2001 From: Jefffrey Date: Sun, 8 Sep 2024 20:53:40 +1000 Subject: [PATCH 1/5] Refactor sql_expr_to_logical_expr_internal to reduce stack size --- datafusion/sql/src/expr/mod.rs | 232 ++++++++++--------- datafusion/sqllogictest/bin/sqllogictests.rs | 2 - 2 files changed, 120 insertions(+), 114 deletions(-) diff --git a/datafusion/sql/src/expr/mod.rs b/datafusion/sql/src/expr/mod.rs index 035fd3816c6c..4a331bc6d37a 100644 --- a/datafusion/sql/src/expr/mod.rs +++ b/datafusion/sql/src/expr/mod.rs @@ -21,8 +21,8 @@ use datafusion_expr::planner::{ PlannerResult, RawBinaryExpr, RawDictionaryExpr, RawFieldAccessExpr, }; use sqlparser::ast::{ - BinaryOperator, CastKind, DictionaryField, Expr as SQLExpr, MapEntry, StructField, - Subscript, TrimWhereField, Value, + BinaryOperator, CastFormat, CastKind, DataType as SQLDataType, DictionaryField, + Expr as SQLExpr, MapEntry, StructField, Subscript, TrimWhereField, Value, }; use datafusion_common::{ @@ -174,6 +174,10 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { schema: &DFSchema, planner_context: &mut PlannerContext, ) -> Result { + // NOTE: This function is called recusively, so each match arm body should be as + // small as possible to avoid stack overflows in debug builds. Follow the + // common pattern of extracting into a separate function for non-trivial + // arms. match sql { SQLExpr::Value(value) => { self.parse_value(value, planner_context.prepare_param_data_types()) @@ -210,91 +214,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { // ["foo"], [4] or [4:5] SQLExpr::Subscript { expr, subscript } => { - let expr = - self.sql_expr_to_logical_expr(*expr, schema, planner_context)?; - - let field_access = match *subscript { - Subscript::Index { index } => { - // index can be a name, in which case it is a named field access - match index { - SQLExpr::Value( - Value::SingleQuotedString(s) - | Value::DoubleQuotedString(s), - ) => GetFieldAccess::NamedStructField { - name: ScalarValue::from(s), - }, - SQLExpr::JsonAccess { .. } => { - return not_impl_err!("JsonAccess"); - } - // otherwise treat like a list index - _ => GetFieldAccess::ListIndex { - key: Box::new(self.sql_expr_to_logical_expr( - index, - schema, - planner_context, - )?), - }, - } - } - Subscript::Slice { - lower_bound, - upper_bound, - stride, - } => { - // Means access like [:2] - let lower_bound = if let Some(lower_bound) = lower_bound { - self.sql_expr_to_logical_expr( - lower_bound, - schema, - planner_context, - ) - } else { - not_impl_err!("Slice subscript requires a lower bound") - }?; - - // means access like [2:] - let upper_bound = if let Some(upper_bound) = upper_bound { - self.sql_expr_to_logical_expr( - upper_bound, - schema, - planner_context, - ) - } else { - not_impl_err!("Slice subscript requires an upper bound") - }?; - - // stride, default to 1 - let stride = if let Some(stride) = stride { - self.sql_expr_to_logical_expr( - stride, - schema, - planner_context, - )? - } else { - lit(1i64) - }; - - GetFieldAccess::ListRange { - start: Box::new(lower_bound), - stop: Box::new(upper_bound), - stride: Box::new(stride), - } - } - }; - - let mut field_access_expr = RawFieldAccessExpr { expr, field_access }; - for planner in self.context_provider.get_expr_planners() { - match planner.plan_field_access(field_access_expr, schema)? { - PlannerResult::Planned(expr) => return Ok(expr), - PlannerResult::Original(expr) => { - field_access_expr = expr; - } - } - } - - not_impl_err!( - "GetFieldAccess not supported by ExprPlanner: {field_access_expr:?}" - ) + self.sql_subscript_to_expr(*expr, subscript, schema, planner_context) } SQLExpr::CompoundIdentifier(ids) => { @@ -320,31 +240,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { expr, data_type, format, - } => { - if let Some(format) = format { - return not_impl_err!("CAST with format is not supported: {format}"); - } - - let dt = self.convert_data_type(&data_type)?; - let expr = - self.sql_expr_to_logical_expr(*expr, schema, planner_context)?; - - // numeric constants are treated as seconds (rather as nanoseconds) - // to align with postgres / duckdb semantics - let expr = match &dt { - DataType::Timestamp(TimeUnit::Nanosecond, tz) - if expr.get_type(schema)? == DataType::Int64 => - { - Expr::Cast(Cast::new( - Box::new(expr), - DataType::Timestamp(TimeUnit::Second, tz.clone()), - )) - } - _ => expr, - }; - - Ok(Expr::Cast(Cast::new(Box::new(expr), dt))) - } + } => self.sql_cast_to_expr(expr, data_type, format, schema, planner_context), SQLExpr::Cast { kind: CastKind::TryCast | CastKind::SafeCast, @@ -1016,6 +912,118 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { } not_impl_err!("Overlay not supported by ExprPlanner: {overlay_args:?}") } + + fn sql_cast_to_expr( + &self, + expr: Box, + data_type: SQLDataType, + format: Option, + schema: &DFSchema, + planner_context: &mut PlannerContext, + ) -> Result { + if let Some(format) = format { + return not_impl_err!("CAST with format is not supported: {format}"); + } + + let dt = self.convert_data_type(&data_type)?; + let expr = self.sql_expr_to_logical_expr(*expr, schema, planner_context)?; + + // numeric constants are treated as seconds (rather as nanoseconds) + // to align with postgres / duckdb semantics + let expr = match &dt { + DataType::Timestamp(TimeUnit::Nanosecond, tz) + if expr.get_type(schema)? == DataType::Int64 => + { + Expr::Cast(Cast::new( + Box::new(expr), + DataType::Timestamp(TimeUnit::Second, tz.clone()), + )) + } + _ => expr, + }; + + Ok(Expr::Cast(Cast::new(Box::new(expr), dt))) + } + + fn sql_subscript_to_expr( + &self, + expr: SQLExpr, + subscript: Box, + schema: &DFSchema, + planner_context: &mut PlannerContext, + ) -> Result { + let expr = self.sql_expr_to_logical_expr(expr, schema, planner_context)?; + + let field_access = match *subscript { + Subscript::Index { index } => { + // index can be a name, in which case it is a named field access + match index { + SQLExpr::Value( + Value::SingleQuotedString(s) | Value::DoubleQuotedString(s), + ) => GetFieldAccess::NamedStructField { + name: ScalarValue::from(s), + }, + SQLExpr::JsonAccess { .. } => { + return not_impl_err!("JsonAccess"); + } + // otherwise treat like a list index + _ => GetFieldAccess::ListIndex { + key: Box::new(self.sql_expr_to_logical_expr( + index, + schema, + planner_context, + )?), + }, + } + } + Subscript::Slice { + lower_bound, + upper_bound, + stride, + } => { + // Means access like [:2] + let lower_bound = if let Some(lower_bound) = lower_bound { + self.sql_expr_to_logical_expr(lower_bound, schema, planner_context) + } else { + not_impl_err!("Slice subscript requires a lower bound") + }?; + + // means access like [2:] + let upper_bound = if let Some(upper_bound) = upper_bound { + self.sql_expr_to_logical_expr(upper_bound, schema, planner_context) + } else { + not_impl_err!("Slice subscript requires an upper bound") + }?; + + // stride, default to 1 + let stride = if let Some(stride) = stride { + self.sql_expr_to_logical_expr(stride, schema, planner_context)? + } else { + lit(1i64) + }; + + GetFieldAccess::ListRange { + start: Box::new(lower_bound), + stop: Box::new(upper_bound), + stride: Box::new(stride), + } + } + }; + + let mut field_access_expr = RawFieldAccessExpr { expr, field_access }; + for planner in self.context_provider.get_expr_planners() { + match planner.plan_field_access(field_access_expr, schema)? { + PlannerResult::Planned(expr) => return Ok(expr), + PlannerResult::Original(expr) => { + field_access_expr = expr; + } + } + } + + not_impl_err!( + "GetFieldAccess not supported by ExprPlanner: {field_access_expr:?}" + ) + } } #[cfg(test)] diff --git a/datafusion/sqllogictest/bin/sqllogictests.rs b/datafusion/sqllogictest/bin/sqllogictests.rs index 8c8ed2e58743..baa49057e1b9 100644 --- a/datafusion/sqllogictest/bin/sqllogictests.rs +++ b/datafusion/sqllogictest/bin/sqllogictests.rs @@ -30,11 +30,9 @@ use datafusion_common_runtime::SpawnedTask; const TEST_DIRECTORY: &str = "test_files/"; const PG_COMPAT_FILE_PREFIX: &str = "pg_compat_"; -const STACK_SIZE: usize = 2 * 1024 * 1024 + 512 * 1024; // 2.5 MBs, the default 2 MBs is currently too small pub fn main() -> Result<()> { tokio::runtime::Builder::new_multi_thread() - .thread_stack_size(STACK_SIZE) .enable_all() .build() .unwrap() From 130e6918ea98036df0534f48bf0a3bf23269b3ca Mon Sep 17 00:00:00 2001 From: Jefffrey Date: Sun, 8 Sep 2024 21:03:08 +1000 Subject: [PATCH 2/5] Pass Expr by value instead of via Box --- datafusion/sql/src/expr/mod.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/datafusion/sql/src/expr/mod.rs b/datafusion/sql/src/expr/mod.rs index 4a331bc6d37a..1b49fad0b8e6 100644 --- a/datafusion/sql/src/expr/mod.rs +++ b/datafusion/sql/src/expr/mod.rs @@ -240,7 +240,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { expr, data_type, format, - } => self.sql_cast_to_expr(expr, data_type, format, schema, planner_context), + } => self.sql_cast_to_expr(*expr, data_type, format, schema, planner_context), SQLExpr::Cast { kind: CastKind::TryCast | CastKind::SafeCast, @@ -915,7 +915,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { fn sql_cast_to_expr( &self, - expr: Box, + expr: SQLExpr, data_type: SQLDataType, format: Option, schema: &DFSchema, @@ -926,7 +926,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { } let dt = self.convert_data_type(&data_type)?; - let expr = self.sql_expr_to_logical_expr(*expr, schema, planner_context)?; + let expr = self.sql_expr_to_logical_expr(expr, schema, planner_context)?; // numeric constants are treated as seconds (rather as nanoseconds) // to align with postgres / duckdb semantics From 7b091ea790d48d11757293f7e435c2b9cfe44822 Mon Sep 17 00:00:00 2001 From: Jeffrey Vo Date: Mon, 9 Sep 2024 07:37:05 +1000 Subject: [PATCH 3/5] Update datafusion/sql/src/expr/mod.rs Co-authored-by: Andrew Lamb --- datafusion/sql/src/expr/mod.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/datafusion/sql/src/expr/mod.rs b/datafusion/sql/src/expr/mod.rs index 1b49fad0b8e6..02176ee4812a 100644 --- a/datafusion/sql/src/expr/mod.rs +++ b/datafusion/sql/src/expr/mod.rs @@ -178,6 +178,8 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { // small as possible to avoid stack overflows in debug builds. Follow the // common pattern of extracting into a separate function for non-trivial // arms. + // + // See https://github.com/apache/datafusion/pull/12384 for more context match sql { SQLExpr::Value(value) => { self.parse_value(value, planner_context.prepare_param_data_types()) From fd0b8193a2db4d775b5203d090642c5656444d40 Mon Sep 17 00:00:00 2001 From: Jefffrey Date: Mon, 9 Sep 2024 07:51:08 +1000 Subject: [PATCH 4/5] Cargo fmt --- datafusion/sql/src/expr/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/sql/src/expr/mod.rs b/datafusion/sql/src/expr/mod.rs index 02176ee4812a..80ab60807903 100644 --- a/datafusion/sql/src/expr/mod.rs +++ b/datafusion/sql/src/expr/mod.rs @@ -178,7 +178,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { // small as possible to avoid stack overflows in debug builds. Follow the // common pattern of extracting into a separate function for non-trivial // arms. - // + // // See https://github.com/apache/datafusion/pull/12384 for more context match sql { SQLExpr::Value(value) => { From 7a3602ae0d9b7552a79f8b128b555c9311a33319 Mon Sep 17 00:00:00 2001 From: Jefffrey Date: Mon, 9 Sep 2024 07:51:49 +1000 Subject: [PATCH 5/5] Formatting --- datafusion/sql/src/expr/mod.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/datafusion/sql/src/expr/mod.rs b/datafusion/sql/src/expr/mod.rs index 80ab60807903..6e975d8557dc 100644 --- a/datafusion/sql/src/expr/mod.rs +++ b/datafusion/sql/src/expr/mod.rs @@ -177,9 +177,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { // NOTE: This function is called recusively, so each match arm body should be as // small as possible to avoid stack overflows in debug builds. Follow the // common pattern of extracting into a separate function for non-trivial - // arms. - // - // See https://github.com/apache/datafusion/pull/12384 for more context + // arms. See https://github.com/apache/datafusion/pull/12384 for more context. match sql { SQLExpr::Value(value) => { self.parse_value(value, planner_context.prepare_param_data_types())