From 0557ce45613797f324796eb5c6f40951b671a423 Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Mon, 18 May 2026 10:48:57 -0400 Subject: [PATCH] refactor: thread SubqueryContext explicitly through physical planning MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Removes the SessionState.clone() + execution_props_mut() trick used by DefaultPhysicalPlanner to register scalar-subquery state for expression lowering. That side channel relied on stashing per-plan state in ExecutionProps, which forced physical planning to hold a mutable SessionState and blocked moving QueryPlanner / PhysicalPlanner to &dyn Session. Introduces SubqueryContext in datafusion-expr and threads it explicitly through create_initial_plan_inner, task_helper, map_logical_node_to_physical, and the standalone planning helpers. Adds *_with_subquery_context dual entry points for create_physical_expr, create_physical_exprs, create_physical_sort_expr(s), create_window_expr(_with_name), and LoweredAggregateBuilder.with_subquery_context — original public signatures are preserved and delegate with SubqueryContext::default(), so downstream callers are unaffected. Drops the unreleased subquery_indexes and subquery_results fields from ExecutionProps. Co-Authored-By: Claude Opus 4.7 (1M context) --- datafusion/core/src/physical_planner.rs | 255 ++++++++++++--- datafusion/expr/src/execution_props.rs | 61 +++- datafusion/physical-expr/src/aggregate.rs | 52 ++- datafusion/physical-expr/src/lib.rs | 11 +- datafusion/physical-expr/src/physical_expr.rs | 53 +++- datafusion/physical-expr/src/planner.rs | 299 ++++++++++++++---- 6 files changed, 602 insertions(+), 129 deletions(-) diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index d225cff1deafc..d11b6482dc18b 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -35,7 +35,10 @@ use crate::logical_expr::{ Expr, LogicalPlan, Partitioning as LogicalPartitioning, PlanType, Repartition, UserDefinedLogicalNode, }; -use crate::physical_expr::{create_physical_expr, create_physical_exprs}; +use crate::physical_expr::{ + create_physical_expr, create_physical_expr_with_subquery_context, + create_physical_exprs_with_subquery_context, +}; use crate::physical_plan::aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy}; use crate::physical_plan::analyze::AnalyzeExec; use crate::physical_plan::explain::ExplainExec; @@ -79,7 +82,9 @@ use datafusion_common::{ use datafusion_datasource::file_groups::FileGroup; use datafusion_datasource::memory::MemorySourceConfig; use datafusion_expr::dml::{CopyTo, InsertOp}; -use datafusion_expr::execution_props::{ScalarSubqueryResults, SubqueryIndex}; +use datafusion_expr::execution_props::{ + ScalarSubqueryResults, SubqueryContext, SubqueryIndex, +}; use datafusion_expr::expr::{ Alias, GroupingSet, NullTreatment, WindowFunction, WindowFunctionParams, physical_name, @@ -98,7 +103,7 @@ use datafusion_physical_expr::aggregate::{ }; use datafusion_physical_expr::expressions::Literal; use datafusion_physical_expr::{ - LexOrdering, PhysicalSortExpr, create_physical_sort_exprs, + LexOrdering, PhysicalSortExpr, create_physical_sort_exprs_with_subquery_context, }; use datafusion_physical_optimizer::PhysicalOptimizerRule; use datafusion_physical_plan::empty::EmptyExec; @@ -444,27 +449,25 @@ impl DefaultPhysicalPlanner { if links.is_empty() { return self - .create_initial_plan_inner(logical_plan, session_state) + .create_initial_plan_inner( + logical_plan, + session_state, + &SubqueryContext::default(), + ) .await; } - // Create the shared `ScalarSubqueryResults` container and register - // it in `ExecutionProps` so that `create_physical_expr` can resolve - // `Expr::ScalarSubquery` into `ScalarSubqueryExpr` nodes. We clone - // the `SessionState` so these are available throughout physical - // planning without mutating the caller's state. - // - // Ideally, the subquery state would live in a dedicated planning - // context rather than in `ExecutionProps`. It's here because - // `create_physical_expr` only receives `&ExecutionProps`. + // Build a `SubqueryContext` that carries the index map and shared + // results container into expression lowering. The context is + // threaded explicitly through physical planning rather than being + // stashed in `ExecutionProps`, so that the planner does not need + // a mutable `SessionState` and external callers of + // `create_physical_expr` are unaffected. let results = ScalarSubqueryResults::new(links.len()); - let mut owned = session_state.clone(); - owned.execution_props_mut().subquery_indexes = index_map; - owned.execution_props_mut().subquery_results = results.clone(); - let session_state = Cow::Owned(owned); + let subquery_ctx = SubqueryContext::new(index_map, results.clone()); let plan = self - .create_initial_plan_inner(logical_plan, &session_state) + .create_initial_plan_inner(logical_plan, session_state, &subquery_ctx) .await?; Ok(Arc::new(ScalarSubqueryExec::new(plan, links, results))) }) @@ -476,6 +479,7 @@ impl DefaultPhysicalPlanner { &self, logical_plan: &LogicalPlan, session_state: &SessionState, + subquery_ctx: &SubqueryContext, ) -> Result> { // DFS the tree to flatten it into a Vec. // This will allow us to build the Physical Plan from the leaves up @@ -526,9 +530,9 @@ impl DefaultPhysicalPlanner { let max_concurrency = planning_concurrency.min(flat_tree_leaf_indices.len()); // Spawning tasks which will traverse leaf up to the root. - let tasks = flat_tree_leaf_indices - .into_iter() - .map(|index| self.task_helper(index, Arc::clone(&flat_tree), session_state)); + let tasks = flat_tree_leaf_indices.into_iter().map(|index| { + self.task_helper(index, Arc::clone(&flat_tree), session_state, subquery_ctx) + }); let mut outputs = futures::stream::iter(tasks) .buffer_unordered(max_concurrency) .try_collect::>() @@ -556,6 +560,7 @@ impl DefaultPhysicalPlanner { leaf_starter_index: usize, flat_tree: Arc>>, session_state: &'a SessionState, + subquery_ctx: &'a SubqueryContext, ) -> Result>> { // We always start with a leaf, so can ignore status and pass empty children let mut node = flat_tree.get(leaf_starter_index).ok_or_else(|| { @@ -567,6 +572,7 @@ impl DefaultPhysicalPlanner { .map_logical_node_to_physical( node.node, session_state, + subquery_ctx, ChildrenContainer::None, ) .await?; @@ -584,6 +590,7 @@ impl DefaultPhysicalPlanner { .map_logical_node_to_physical( node.node, session_state, + subquery_ctx, ChildrenContainer::One(plan), ) .await?; @@ -620,7 +627,12 @@ impl DefaultPhysicalPlanner { let children = children.into_iter().map(|epc| epc.plan).collect(); let children = ChildrenContainer::Multiple(children); plan = self - .map_logical_node_to_physical(node.node, session_state, children) + .map_logical_node_to_physical( + node.node, + session_state, + subquery_ctx, + children, + ) .await?; } } @@ -635,6 +647,7 @@ impl DefaultPhysicalPlanner { &self, node: &LogicalPlan, session_state: &SessionState, + subquery_ctx: &SubqueryContext, children: ChildrenContainer, ) -> Result> { let execution_props = session_state.execution_props(); @@ -694,7 +707,12 @@ impl DefaultPhysicalPlanner { .map(|row| { row.iter() .map(|expr| { - create_physical_expr(expr, schema, execution_props) + create_physical_expr_with_subquery_context( + expr, + schema, + execution_props, + subquery_ctx, + ) }) .collect::>>>() }) @@ -953,7 +971,14 @@ impl DefaultPhysicalPlanner { let logical_schema = node.schema(); let window_expr = window_expr .iter() - .map(|e| create_window_expr(e, logical_schema, execution_props)) + .map(|e| { + create_window_expr_with_subquery_context( + e, + logical_schema, + execution_props, + subquery_ctx, + ) + }) .collect::>>()?; let can_repartition = session_state.config().target_partitions() > 1 @@ -1059,6 +1084,7 @@ impl DefaultPhysicalPlanner { logical_input_schema, &physical_input_schema, execution_props, + subquery_ctx, )?; let agg_filter = aggr_expr @@ -1070,6 +1096,7 @@ impl DefaultPhysicalPlanner { &physical_input_schema, execution_props, ) + .with_subquery_context(subquery_ctx) .build() .map(lowered_aggregate_to_tuple) }) @@ -1166,6 +1193,7 @@ impl DefaultPhysicalPlanner { LogicalPlan::Projection(Projection { input, expr, .. }) => self .create_project_physical_exec_with_props( execution_props, + subquery_ctx, children.one()?, input, expr, @@ -1175,8 +1203,12 @@ impl DefaultPhysicalPlanner { }) => { let physical_input = children.one()?; let input_dfschema = input.schema(); - let runtime_expr = - create_physical_expr(predicate, input_dfschema, execution_props)?; + let runtime_expr = create_physical_expr_with_subquery_context( + predicate, + input_dfschema, + execution_props, + subquery_ctx, + )?; let input_schema = input.schema(); let filter = match self.try_plan_async_exprs( @@ -1242,7 +1274,12 @@ impl DefaultPhysicalPlanner { let runtime_expr = expr .iter() .map(|e| { - create_physical_expr(e, input_dfschema, execution_props) + create_physical_expr_with_subquery_context( + e, + input_dfschema, + execution_props, + subquery_ctx, + ) }) .collect::>>()?; Partitioning::Hash(runtime_expr, *n) @@ -1263,8 +1300,12 @@ impl DefaultPhysicalPlanner { }) => { let physical_input = children.one()?; let input_dfschema = input.as_ref().schema(); - let sort_exprs = - create_physical_sort_exprs(expr, input_dfschema, execution_props)?; + let sort_exprs = create_physical_sort_exprs_with_subquery_context( + expr, + input_dfschema, + execution_props, + subquery_ctx, + )?; let Some(ordering) = LexOrdering::new(sort_exprs) else { return internal_err!( "SortExec requires at least one sort expression" @@ -1393,6 +1434,7 @@ impl DefaultPhysicalPlanner { LogicalPlan::Projection(Projection { input, expr, .. }), ) => self.create_project_physical_exec_with_props( execution_props, + subquery_ctx, physical_left, input, expr, @@ -1406,6 +1448,7 @@ impl DefaultPhysicalPlanner { LogicalPlan::Projection(Projection { input, expr, .. }), ) => self.create_project_physical_exec_with_props( execution_props, + subquery_ctx, physical_right, input, expr, @@ -1473,9 +1516,18 @@ impl DefaultPhysicalPlanner { let join_on = keys .iter() .map(|(l, r)| { - let l = create_physical_expr(l, left_df_schema, execution_props)?; - let r = - create_physical_expr(r, right_df_schema, execution_props)?; + let l = create_physical_expr_with_subquery_context( + l, + left_df_schema, + execution_props, + subquery_ctx, + )?; + let r = create_physical_expr_with_subquery_context( + r, + right_df_schema, + execution_props, + subquery_ctx, + )?; Ok((l, r)) }) .collect::>()?; @@ -1572,10 +1624,11 @@ impl DefaultPhysicalPlanner { let filter_schema = Schema::new_with_metadata(filter_fields, metadata); - let filter_expr = create_physical_expr( + let filter_expr = create_physical_expr_with_subquery_context( expr, &filter_df_schema, execution_props, + subquery_ctx, )?; let column_indices = join_utils::JoinFilter::build_column_indices( left_field_indices, @@ -1692,15 +1745,17 @@ impl DefaultPhysicalPlanner { ); } - let on_left = create_physical_expr( + let on_left = create_physical_expr_with_subquery_context( lhs_logical, left_df_schema, execution_props, + subquery_ctx, )?; - let on_right = create_physical_expr( + let on_right = create_physical_expr_with_subquery_context( rhs_logical, right_df_schema, execution_props, + subquery_ctx, )?; Arc::new(PiecewiseMergeJoinExec::try_new( @@ -1772,6 +1827,7 @@ impl DefaultPhysicalPlanner { if let Some((input, expr)) = new_project { self.create_project_physical_exec_with_props( execution_props, + subquery_ctx, join, input, expr, @@ -1870,6 +1926,7 @@ impl DefaultPhysicalPlanner { input_dfschema: &DFSchema, input_schema: &Schema, execution_props: &ExecutionProps, + subquery_ctx: &SubqueryContext, ) -> Result { if group_expr.len() == 1 { match &group_expr[0] { @@ -1879,6 +1936,7 @@ impl DefaultPhysicalPlanner { input_dfschema, input_schema, execution_props, + subquery_ctx, ) } Expr::GroupingSet(GroupingSet::Cube(exprs)) => create_cube_physical_expr( @@ -1886,6 +1944,7 @@ impl DefaultPhysicalPlanner { input_dfschema, input_schema, execution_props, + subquery_ctx, ), Expr::GroupingSet(GroupingSet::Rollup(exprs)) => { create_rollup_physical_expr( @@ -1893,10 +1952,16 @@ impl DefaultPhysicalPlanner { input_dfschema, input_schema, execution_props, + subquery_ctx, ) } expr => Ok(PhysicalGroupBy::new_single(vec![tuple_err(( - create_physical_expr(expr, input_dfschema, execution_props), + create_physical_expr_with_subquery_context( + expr, + input_dfschema, + execution_props, + subquery_ctx, + ), physical_name(expr), ))?])), } @@ -1910,7 +1975,12 @@ impl DefaultPhysicalPlanner { .iter() .map(|e| { tuple_err(( - create_physical_expr(e, input_dfschema, execution_props), + create_physical_expr_with_subquery_context( + e, + input_dfschema, + execution_props, + subquery_ctx, + ), physical_name(e), )) }) @@ -1935,6 +2005,7 @@ fn merge_grouping_set_physical_expr( input_dfschema: &DFSchema, input_schema: &Schema, execution_props: &ExecutionProps, + subquery_ctx: &SubqueryContext, ) -> Result { let num_groups = grouping_sets.len(); let mut all_exprs: Vec = vec![]; @@ -1949,6 +2020,7 @@ fn merge_grouping_set_physical_expr( expr, input_dfschema, execution_props, + subquery_ctx, )?); null_exprs.push(get_null_physical_expr_pair( @@ -1956,6 +2028,7 @@ fn merge_grouping_set_physical_expr( input_dfschema, input_schema, execution_props, + subquery_ctx, )?); } } @@ -1986,6 +2059,7 @@ fn create_cube_physical_expr( input_dfschema: &DFSchema, input_schema: &Schema, execution_props: &ExecutionProps, + subquery_ctx: &SubqueryContext, ) -> Result { let num_of_exprs = exprs.len(); let num_groups = num_of_exprs * num_of_exprs; @@ -2001,12 +2075,14 @@ fn create_cube_physical_expr( input_dfschema, input_schema, execution_props, + subquery_ctx, )?); all_exprs.push(get_physical_expr_pair( expr, input_dfschema, execution_props, + subquery_ctx, )?) } @@ -2032,6 +2108,7 @@ fn create_rollup_physical_expr( input_dfschema: &DFSchema, input_schema: &Schema, execution_props: &ExecutionProps, + subquery_ctx: &SubqueryContext, ) -> Result { let num_of_exprs = exprs.len(); @@ -2048,12 +2125,14 @@ fn create_rollup_physical_expr( input_dfschema, input_schema, execution_props, + subquery_ctx, )?); all_exprs.push(get_physical_expr_pair( expr, input_dfschema, execution_props, + subquery_ctx, )?) } @@ -2080,8 +2159,14 @@ fn get_null_physical_expr_pair( input_dfschema: &DFSchema, input_schema: &Schema, execution_props: &ExecutionProps, + subquery_ctx: &SubqueryContext, ) -> Result<(Arc, String)> { - let physical_expr = create_physical_expr(expr, input_dfschema, execution_props)?; + let physical_expr = create_physical_expr_with_subquery_context( + expr, + input_dfschema, + execution_props, + subquery_ctx, + )?; let physical_name = physical_name(&expr.clone())?; let data_type = physical_expr.data_type(input_schema)?; @@ -2151,8 +2236,14 @@ fn get_physical_expr_pair( expr: &Expr, input_dfschema: &DFSchema, execution_props: &ExecutionProps, + subquery_ctx: &SubqueryContext, ) -> Result<(Arc, String)> { - let physical_expr = create_physical_expr(expr, input_dfschema, execution_props)?; + let physical_expr = create_physical_expr_with_subquery_context( + expr, + input_dfschema, + execution_props, + subquery_ctx, + )?; let physical_name = physical_name(expr)?; Ok((physical_expr, physical_name)) } @@ -2410,6 +2501,24 @@ pub fn create_window_expr_with_name( name: impl Into, logical_schema: &DFSchema, execution_props: &ExecutionProps, +) -> Result> { + create_window_expr_with_name_and_subquery_context( + e, + name, + logical_schema, + execution_props, + &SubqueryContext::default(), + ) +} + +/// Create a window expression with a name from a logical expression, threading +/// an explicit [`SubqueryContext`] for scalar-subquery lowering. +pub fn create_window_expr_with_name_and_subquery_context( + e: &Expr, + name: impl Into, + logical_schema: &DFSchema, + execution_props: &ExecutionProps, + subquery_ctx: &SubqueryContext, ) -> Result> { let name = name.into(); let physical_schema = Arc::clone(logical_schema.inner()); @@ -2428,12 +2537,24 @@ pub fn create_window_expr_with_name( filter, }, } = window_fun.as_ref(); - let physical_args = - create_physical_exprs(args, logical_schema, execution_props)?; - let partition_by = - create_physical_exprs(partition_by, logical_schema, execution_props)?; - let order_by = - create_physical_sort_exprs(order_by, logical_schema, execution_props)?; + let physical_args = create_physical_exprs_with_subquery_context( + args, + logical_schema, + execution_props, + subquery_ctx, + )?; + let partition_by = create_physical_exprs_with_subquery_context( + partition_by, + logical_schema, + execution_props, + subquery_ctx, + )?; + let order_by = create_physical_sort_exprs_with_subquery_context( + order_by, + logical_schema, + execution_props, + subquery_ctx, + )?; if !is_window_frame_bound_valid(window_frame) { return plan_err!( @@ -2448,7 +2569,14 @@ pub fn create_window_expr_with_name( == NullTreatment::IgnoreNulls; let physical_filter = filter .as_ref() - .map(|f| create_physical_expr(f, logical_schema, execution_props)) + .map(|f| { + create_physical_expr_with_subquery_context( + f, + logical_schema, + execution_props, + subquery_ctx, + ) + }) .transpose()?; windows::create_window_expr( @@ -2473,6 +2601,22 @@ pub fn create_window_expr( e: &Expr, logical_schema: &DFSchema, execution_props: &ExecutionProps, +) -> Result> { + create_window_expr_with_subquery_context( + e, + logical_schema, + execution_props, + &SubqueryContext::default(), + ) +} + +/// Like [`create_window_expr`] but with an explicit [`SubqueryContext`] for +/// lowering scalar subqueries inside the window expression. +pub fn create_window_expr_with_subquery_context( + e: &Expr, + logical_schema: &DFSchema, + execution_props: &ExecutionProps, + subquery_ctx: &SubqueryContext, ) -> Result> { // unpack aliased logical expressions, e.g. "sum(col) over () as total" let (name, e) = match e { @@ -2482,7 +2626,13 @@ pub fn create_window_expr( ), _ => (e.schema_name().to_string(), e.clone()), }; - create_window_expr_with_name(&e, name, logical_schema, execution_props) + create_window_expr_with_name_and_subquery_context( + &e, + name, + logical_schema, + execution_props, + subquery_ctx, + ) } type AggregateExprWithOptionalArgs = ( @@ -2935,6 +3085,7 @@ impl DefaultPhysicalPlanner { fn create_project_physical_exec_with_props( &self, execution_props: &ExecutionProps, + subquery_ctx: &SubqueryContext, input_exec: Arc, input: &Arc, expr: &[Expr], @@ -2972,8 +3123,12 @@ impl DefaultPhysicalPlanner { physical_name(e) }; - let physical_expr = - create_physical_expr(e, input_logical_schema, execution_props); + let physical_expr = create_physical_expr_with_subquery_context( + e, + input_logical_schema, + execution_props, + subquery_ctx, + ); tuple_err((physical_expr, physical_name)) }) @@ -3555,6 +3710,7 @@ mod tests { logical_input_schema, physical_input_schema, session_state.execution_props(), + &SubqueryContext::default(), ); insta::assert_debug_snapshot!(cube, @r#" @@ -3686,6 +3842,7 @@ mod tests { logical_input_schema, physical_input_schema, session_state.execution_props(), + &SubqueryContext::default(), ); insta::assert_debug_snapshot!(rollup, @r#" diff --git a/datafusion/expr/src/execution_props.rs b/datafusion/expr/src/execution_props.rs index 649f74ed3997c..b6a5d88630da5 100644 --- a/datafusion/expr/src/execution_props.rs +++ b/datafusion/expr/src/execution_props.rs @@ -64,12 +64,6 @@ pub struct ExecutionProps { pub config_options: Option>, /// Providers for scalar variables pub var_providers: Option>>, - /// Maps each logical `Subquery` to its index in `subquery_results`. - /// Populated by the physical planner before calling `create_physical_expr`. - pub subquery_indexes: HashMap, - /// Shared results container for uncorrelated scalar subquery values. - /// Populated at execution time by `ScalarSubqueryExec`. - pub subquery_results: ScalarSubqueryResults, /// Maps each lambda variable name to its lambda qualifier generated /// during physical planning. Populated by the physical planner for /// each lambda before calling `create_physical_expr`. @@ -90,8 +84,6 @@ impl ExecutionProps { alias_generator: Arc::new(AliasGenerator::new()), config_options: None, var_providers: None, - subquery_indexes: HashMap::new(), - subquery_results: ScalarSubqueryResults::default(), lambda_variable_qualifier: HashMap::new(), } } @@ -169,6 +161,57 @@ impl ExecutionProps { } } +/// Per-plan context used by the physical planner to lower +/// [`Expr::ScalarSubquery`] expressions into physical expressions that +/// read from a shared [`ScalarSubqueryResults`] container. +/// +/// The physical planner builds this from the set of uncorrelated scalar +/// subqueries it has scheduled for execution. It is then passed +/// explicitly through `create_physical_expr_with_subquery_context` so +/// the lowering can find the slot index for each [`Subquery`]. +/// +/// An empty [`SubqueryContext`] (the [`Default`]) is what every +/// non-physical-planner caller passes; if such a caller encounters a +/// scalar subquery the lowering returns a `not_impl_err`. +/// +/// [`Expr::ScalarSubquery`]: crate::Expr::ScalarSubquery +/// [`Subquery`]: crate::logical_plan::Subquery +#[derive(Clone, Debug, Default)] +pub struct SubqueryContext { + indexes: HashMap, + results: ScalarSubqueryResults, +} + +impl SubqueryContext { + /// Create a [`SubqueryContext`] from an index map and a shared results + /// container. The index map must use the same indices as slots in + /// `results`. + pub fn new( + indexes: HashMap, + results: ScalarSubqueryResults, + ) -> Self { + Self { indexes, results } + } + + /// Returns the slot index assigned to `subquery`, if any. + pub fn index_of( + &self, + subquery: &crate::logical_plan::Subquery, + ) -> Option { + self.indexes.get(subquery).copied() + } + + /// Returns the shared results container. + pub fn results(&self) -> &ScalarSubqueryResults { + &self.results + } + + /// Returns true if no subqueries are registered. + pub fn is_empty(&self) -> bool { + self.indexes.is_empty() + } +} + /// Index of a scalar subquery within a [`ScalarSubqueryResults`] container. #[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)] pub struct SubqueryIndex(usize); @@ -274,7 +317,7 @@ mod test { fn debug() { let props = ExecutionProps::new(); assert_eq!( - "ExecutionProps { query_execution_start_time: None, alias_generator: AliasGenerator { next_id: 1 }, config_options: None, var_providers: None, subquery_indexes: {}, subquery_results: [], lambda_variable_qualifier: {} }", + "ExecutionProps { query_execution_start_time: None, alias_generator: AliasGenerator { next_id: 1 }, config_options: None, var_providers: None, lambda_variable_qualifier: {} }", format!("{props:?}") ); } diff --git a/datafusion/physical-expr/src/aggregate.rs b/datafusion/physical-expr/src/aggregate.rs index e5d55aba4f51c..562ff36fc9160 100644 --- a/datafusion/physical-expr/src/aggregate.rs +++ b/datafusion/physical-expr/src/aggregate.rs @@ -38,8 +38,11 @@ use std::fmt::Debug; use std::sync::Arc; use crate::expressions::Column; -use crate::physical_expr::create_physical_sort_exprs; -use crate::planner::{create_physical_expr, create_physical_exprs}; +use crate::physical_expr::create_physical_sort_exprs_with_subquery_context; +use crate::planner::{ + create_physical_expr_with_subquery_context, + create_physical_exprs_with_subquery_context, +}; use arrow::compute::SortOptions; use arrow::datatypes::{DataType, FieldRef, Schema, SchemaRef}; @@ -47,7 +50,7 @@ use datafusion_common::metadata::FieldMetadata; use datafusion_common::{ DFSchema, Result, ScalarValue, assert_or_internal_err, internal_err, not_impl_err, }; -use datafusion_expr::execution_props::ExecutionProps; +use datafusion_expr::execution_props::{ExecutionProps, SubqueryContext}; use datafusion_expr::expr::{ AggregateFunction, AggregateFunctionParams, NullTreatment, physical_name, }; @@ -423,6 +426,7 @@ pub struct LoweredAggregateBuilder<'a> { logical_input_schema: &'a DFSchema, physical_input_schema: &'a Schema, execution_props: &'a ExecutionProps, + subquery_ctx: Option<&'a SubqueryContext>, } impl<'a> LoweredAggregateBuilder<'a> { @@ -446,9 +450,18 @@ impl<'a> LoweredAggregateBuilder<'a> { logical_input_schema, physical_input_schema, execution_props, + subquery_ctx: None, } } + /// Provide a [`SubqueryContext`] to be used when lowering expressions that + /// reference uncorrelated scalar subqueries. If unset, scalar-subquery + /// expressions will fail to lower. + pub fn with_subquery_context(mut self, subquery_ctx: &'a SubqueryContext) -> Self { + self.subquery_ctx = Some(subquery_ctx); + self + } + /// Override the output column name for the aggregate. /// /// If this is not set, the builder uses the alias from `expr` when present, @@ -484,8 +497,18 @@ impl<'a> LoweredAggregateBuilder<'a> { logical_input_schema, physical_input_schema, execution_props, + subquery_ctx, } = self; + let default_ctx; + let subquery_ctx = match subquery_ctx { + Some(ctx) => ctx, + None => { + default_ctx = SubqueryContext::default(); + &default_ctx + } + }; + let (name, human_display, output_metadata, expr) = lower_aggregate_display( expr, name, @@ -515,16 +538,29 @@ impl<'a> LoweredAggregateBuilder<'a> { physical_name(&expr)? }; - let physical_args = - create_physical_exprs(args, logical_input_schema, execution_props)?; + let physical_args = create_physical_exprs_with_subquery_context( + args, + logical_input_schema, + execution_props, + subquery_ctx, + )?; let filter = filter .as_ref() .map(|filter| { - create_physical_expr(filter, logical_input_schema, execution_props) + create_physical_expr_with_subquery_context( + filter, + logical_input_schema, + execution_props, + subquery_ctx, + ) }) .transpose()?; - let order_bys = - create_physical_sort_exprs(order_by, logical_input_schema, execution_props)?; + let order_bys = create_physical_sort_exprs_with_subquery_context( + order_by, + logical_input_schema, + execution_props, + subquery_ctx, + )?; let ignore_nulls = null_treatment.unwrap_or(NullTreatment::RespectNulls) == NullTreatment::IgnoreNulls; diff --git a/datafusion/physical-expr/src/lib.rs b/datafusion/physical-expr/src/lib.rs index 848bf81d15979..563b51ffd5a0d 100644 --- a/datafusion/physical-expr/src/lib.rs +++ b/datafusion/physical-expr/src/lib.rs @@ -61,8 +61,10 @@ pub use equivalence::{ pub use partitioning::{Distribution, Partitioning}; pub use physical_expr::{ add_offset_to_expr, add_offset_to_physical_sort_exprs, create_lex_ordering, - create_ordering, create_physical_sort_expr, create_physical_sort_exprs, - physical_exprs_bag_equal, physical_exprs_contains, physical_exprs_equal, + create_ordering, create_physical_sort_expr, + create_physical_sort_expr_with_subquery_context, create_physical_sort_exprs, + create_physical_sort_exprs_with_subquery_context, physical_exprs_bag_equal, + physical_exprs_contains, physical_exprs_equal, }; pub use datafusion_physical_expr_common::physical_expr::{PhysicalExpr, PhysicalExprRef}; @@ -72,7 +74,10 @@ pub use datafusion_physical_expr_common::sort_expr::{ }; pub use higher_order_function::HigherOrderFunctionExpr; -pub use planner::{create_physical_expr, create_physical_exprs}; +pub use planner::{ + create_physical_expr, create_physical_expr_with_subquery_context, + create_physical_exprs, create_physical_exprs_with_subquery_context, +}; pub use scalar_function::ScalarFunctionExpr; pub use simplifier::PhysicalExprSimplifier; pub use utils::{conjunction, conjunction_opt, split_conjunction}; diff --git a/datafusion/physical-expr/src/physical_expr.rs b/datafusion/physical-expr/src/physical_expr.rs index 77ede76e1daa8..46ca57e720b53 100644 --- a/datafusion/physical-expr/src/physical_expr.rs +++ b/datafusion/physical-expr/src/physical_expr.rs @@ -18,14 +18,14 @@ use std::sync::Arc; use crate::expressions::{self, Column}; -use crate::{LexOrdering, PhysicalSortExpr, create_physical_expr}; +use crate::{LexOrdering, PhysicalSortExpr, create_physical_expr_with_subquery_context}; use arrow::compute::SortOptions; use arrow::datatypes::{Schema, SchemaRef}; use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; use datafusion_common::{DFSchema, HashMap}; use datafusion_common::{Result, plan_err}; -use datafusion_expr::execution_props::ExecutionProps; +use datafusion_expr::execution_props::{ExecutionProps, SubqueryContext}; use datafusion_expr::{Expr, SortExpr}; use itertools::izip; @@ -198,7 +198,29 @@ pub fn create_physical_sort_expr( input_dfschema: &DFSchema, execution_props: &ExecutionProps, ) -> Result { - create_physical_expr(&e.expr, input_dfschema, execution_props).map(|expr| { + create_physical_sort_expr_with_subquery_context( + e, + input_dfschema, + execution_props, + &SubqueryContext::default(), + ) +} + +/// Create a physical sort expression from a logical expression, threading an +/// explicit [`SubqueryContext`] for scalar-subquery lowering. +pub fn create_physical_sort_expr_with_subquery_context( + e: &SortExpr, + input_dfschema: &DFSchema, + execution_props: &ExecutionProps, + subquery_ctx: &SubqueryContext, +) -> Result { + create_physical_expr_with_subquery_context( + &e.expr, + input_dfschema, + execution_props, + subquery_ctx, + ) + .map(|expr| { let options = SortOptions::new(!e.asc, e.nulls_first); PhysicalSortExpr::new(expr, options) }) @@ -209,10 +231,33 @@ pub fn create_physical_sort_exprs( exprs: &[SortExpr], input_dfschema: &DFSchema, execution_props: &ExecutionProps, +) -> Result> { + create_physical_sort_exprs_with_subquery_context( + exprs, + input_dfschema, + execution_props, + &SubqueryContext::default(), + ) +} + +/// Create vector of physical sort expressions from a vector of logical +/// expressions with an explicit [`SubqueryContext`]. +pub fn create_physical_sort_exprs_with_subquery_context( + exprs: &[SortExpr], + input_dfschema: &DFSchema, + execution_props: &ExecutionProps, + subquery_ctx: &SubqueryContext, ) -> Result> { exprs .iter() - .map(|e| create_physical_sort_expr(e, input_dfschema, execution_props)) + .map(|e| { + create_physical_sort_expr_with_subquery_context( + e, + input_dfschema, + execution_props, + subquery_ctx, + ) + }) .collect() } diff --git a/datafusion/physical-expr/src/planner.rs b/datafusion/physical-expr/src/planner.rs index d0d0508a106a5..cc735244ede52 100644 --- a/datafusion/physical-expr/src/planner.rs +++ b/datafusion/physical-expr/src/planner.rs @@ -32,7 +32,7 @@ use datafusion_common::{ DFSchema, Result, ScalarValue, TableReference, ToDFSchema, exec_err, internal_datafusion_err, not_impl_err, plan_datafusion_err, plan_err, }; -use datafusion_expr::execution_props::ExecutionProps; +use datafusion_expr::execution_props::{ExecutionProps, SubqueryContext}; use datafusion_expr::expr::{ Alias, Cast, HigherOrderFunction, InList, Lambda, LambdaVariable, Placeholder, ScalarFunction, @@ -111,11 +111,32 @@ use datafusion_expr::{ /// * `e` - The logical expression /// * `input_dfschema` - The DataFusion schema for the input, used to resolve `Column` references /// to qualified or unqualified fields by name. -#[cfg_attr(feature = "recursive_protection", recursive::recursive)] pub fn create_physical_expr( e: &Expr, input_dfschema: &DFSchema, execution_props: &ExecutionProps, +) -> Result> { + create_physical_expr_with_subquery_context( + e, + input_dfschema, + execution_props, + &SubqueryContext::default(), + ) +} + +/// Create a physical expression from a logical expression with an explicit +/// [`SubqueryContext`] used to resolve `Expr::ScalarSubquery` nodes. +/// +/// Most callers should use [`create_physical_expr`], which is equivalent to +/// passing an empty context. The physical planner uses this variant to thread +/// the subquery index map and shared results container from its +/// `ScalarSubqueryExec` construction down into expression lowering. +#[cfg_attr(feature = "recursive_protection", recursive::recursive)] +pub fn create_physical_expr_with_subquery_context( + e: &Expr, + input_dfschema: &DFSchema, + execution_props: &ExecutionProps, + subquery_ctx: &SubqueryContext, ) -> Result> { let input_schema = input_dfschema.as_arrow(); @@ -131,7 +152,12 @@ pub fn create_physical_expr( new_metadata, ))) } else { - Ok(create_physical_expr(expr, input_dfschema, execution_props)?) + Ok(create_physical_expr_with_subquery_context( + expr, + input_dfschema, + execution_props, + subquery_ctx, + )?) } } Expr::Column(c) => { @@ -167,12 +193,22 @@ pub fn create_physical_expr( Operator::IsNotDistinctFrom, lit(true), ); - create_physical_expr(&binary_op, input_dfschema, execution_props) + create_physical_expr_with_subquery_context( + &binary_op, + input_dfschema, + execution_props, + subquery_ctx, + ) } Expr::IsNotTrue(expr) => { let binary_op = binary_expr(expr.as_ref().clone(), Operator::IsDistinctFrom, lit(true)); - create_physical_expr(&binary_op, input_dfschema, execution_props) + create_physical_expr_with_subquery_context( + &binary_op, + input_dfschema, + execution_props, + subquery_ctx, + ) } Expr::IsFalse(expr) => { let binary_op = binary_expr( @@ -180,12 +216,22 @@ pub fn create_physical_expr( Operator::IsNotDistinctFrom, lit(false), ); - create_physical_expr(&binary_op, input_dfschema, execution_props) + create_physical_expr_with_subquery_context( + &binary_op, + input_dfschema, + execution_props, + subquery_ctx, + ) } Expr::IsNotFalse(expr) => { let binary_op = binary_expr(expr.as_ref().clone(), Operator::IsDistinctFrom, lit(false)); - create_physical_expr(&binary_op, input_dfschema, execution_props) + create_physical_expr_with_subquery_context( + &binary_op, + input_dfschema, + execution_props, + subquery_ctx, + ) } Expr::IsUnknown(expr) => { let binary_op = binary_expr( @@ -193,7 +239,12 @@ pub fn create_physical_expr( Operator::IsNotDistinctFrom, Expr::Literal(ScalarValue::Boolean(None), None), ); - create_physical_expr(&binary_op, input_dfschema, execution_props) + create_physical_expr_with_subquery_context( + &binary_op, + input_dfschema, + execution_props, + subquery_ctx, + ) } Expr::IsNotUnknown(expr) => { let binary_op = binary_expr( @@ -201,12 +252,27 @@ pub fn create_physical_expr( Operator::IsDistinctFrom, Expr::Literal(ScalarValue::Boolean(None), None), ); - create_physical_expr(&binary_op, input_dfschema, execution_props) + create_physical_expr_with_subquery_context( + &binary_op, + input_dfschema, + execution_props, + subquery_ctx, + ) } Expr::BinaryExpr(BinaryExpr { left, op, right }) => { // Create physical expressions for left and right operands - let lhs = create_physical_expr(left, input_dfschema, execution_props)?; - let rhs = create_physical_expr(right, input_dfschema, execution_props)?; + let lhs = create_physical_expr_with_subquery_context( + left, + input_dfschema, + execution_props, + subquery_ctx, + )?; + let rhs = create_physical_expr_with_subquery_context( + right, + input_dfschema, + execution_props, + subquery_ctx, + )?; // Note that the logical planner is responsible // for type coercion on the arguments (e.g. if one // argument was originally Int32 and one was @@ -229,10 +295,18 @@ pub fn create_physical_expr( "LIKE does not support escape_char other than the backslash (\\)" ); } - let physical_expr = - create_physical_expr(expr, input_dfschema, execution_props)?; - let physical_pattern = - create_physical_expr(pattern, input_dfschema, execution_props)?; + let physical_expr = create_physical_expr_with_subquery_context( + expr, + input_dfschema, + execution_props, + subquery_ctx, + )?; + let physical_pattern = create_physical_expr_with_subquery_context( + pattern, + input_dfschema, + execution_props, + subquery_ctx, + )?; like( *negated, *case_insensitive, @@ -251,18 +325,27 @@ pub fn create_physical_expr( if escape_char.is_some() { return exec_err!("SIMILAR TO does not support escape_char yet"); } - let physical_expr = - create_physical_expr(expr, input_dfschema, execution_props)?; - let physical_pattern = - create_physical_expr(pattern, input_dfschema, execution_props)?; + let physical_expr = create_physical_expr_with_subquery_context( + expr, + input_dfschema, + execution_props, + subquery_ctx, + )?; + let physical_pattern = create_physical_expr_with_subquery_context( + pattern, + input_dfschema, + execution_props, + subquery_ctx, + )?; similar_to(*negated, *case_insensitive, physical_expr, physical_pattern) } Expr::Case(case) => { let expr: Option> = if let Some(e) = &case.expr { - Some(create_physical_expr( + Some(create_physical_expr_with_subquery_context( e.as_ref(), input_dfschema, execution_props, + subquery_ctx, )?) } else { None @@ -272,10 +355,18 @@ pub fn create_physical_expr( .iter() .map(|(w, t)| (w.as_ref(), t.as_ref())) .unzip(); - let when_expr = - create_physical_exprs(when_expr, input_dfschema, execution_props)?; - let then_expr = - create_physical_exprs(then_expr, input_dfschema, execution_props)?; + let when_expr = create_physical_exprs_with_subquery_context( + when_expr, + input_dfschema, + execution_props, + subquery_ctx, + )?; + let then_expr = create_physical_exprs_with_subquery_context( + then_expr, + input_dfschema, + execution_props, + subquery_ctx, + )?; let when_then_expr: Vec<(Arc, Arc)> = when_expr .iter() @@ -284,10 +375,11 @@ pub fn create_physical_expr( .collect(); let else_expr: Option> = if let Some(e) = &case.else_expr { - Some(create_physical_expr( + Some(create_physical_expr_with_subquery_context( e.as_ref(), input_dfschema, execution_props, + subquery_ctx, )?) } else { None @@ -295,7 +387,12 @@ pub fn create_physical_expr( Ok(expressions::case(expr, when_then_expr, else_expr)?) } Expr::Cast(Cast { expr, field }) => expressions::cast_with_target_field( - create_physical_expr(expr, input_dfschema, execution_props)?, + create_physical_expr_with_subquery_context( + expr, + input_dfschema, + execution_props, + subquery_ctx, + )?, input_schema, Arc::clone(field), None, @@ -314,31 +411,54 @@ pub fn create_physical_expr( } expressions::try_cast( - create_physical_expr(expr, input_dfschema, execution_props)?, + create_physical_expr_with_subquery_context( + expr, + input_dfschema, + execution_props, + subquery_ctx, + )?, input_schema, field.data_type().clone(), ) } - Expr::Not(expr) => { - expressions::not(create_physical_expr(expr, input_dfschema, execution_props)?) - } - Expr::Negative(expr) => expressions::negative( - create_physical_expr(expr, input_dfschema, execution_props)?, - input_schema, - ), - Expr::IsNull(expr) => expressions::is_null(create_physical_expr( - expr, - input_dfschema, - execution_props, - )?), - Expr::IsNotNull(expr) => expressions::is_not_null(create_physical_expr( + Expr::Not(expr) => expressions::not(create_physical_expr_with_subquery_context( expr, input_dfschema, execution_props, + subquery_ctx, )?), + Expr::Negative(expr) => expressions::negative( + create_physical_expr_with_subquery_context( + expr, + input_dfschema, + execution_props, + subquery_ctx, + )?, + input_schema, + ), + Expr::IsNull(expr) => { + expressions::is_null(create_physical_expr_with_subquery_context( + expr, + input_dfschema, + execution_props, + subquery_ctx, + )?) + } + Expr::IsNotNull(expr) => { + expressions::is_not_null(create_physical_expr_with_subquery_context( + expr, + input_dfschema, + execution_props, + subquery_ctx, + )?) + } Expr::ScalarFunction(ScalarFunction { func, args }) => { - let physical_args = - create_physical_exprs(args, input_dfschema, execution_props)?; + let physical_args = create_physical_exprs_with_subquery_context( + args, + input_dfschema, + execution_props, + subquery_ctx, + )?; let config_options = match execution_props.config_options.as_ref() { Some(config_options) => Arc::clone(config_options), None => Arc::new(ConfigOptions::default()), @@ -357,9 +477,24 @@ pub fn create_physical_expr( low, high, }) => { - let value_expr = create_physical_expr(expr, input_dfschema, execution_props)?; - let low_expr = create_physical_expr(low, input_dfschema, execution_props)?; - let high_expr = create_physical_expr(high, input_dfschema, execution_props)?; + let value_expr = create_physical_expr_with_subquery_context( + expr, + input_dfschema, + execution_props, + subquery_ctx, + )?; + let low_expr = create_physical_expr_with_subquery_context( + low, + input_dfschema, + execution_props, + subquery_ctx, + )?; + let high_expr = create_physical_expr_with_subquery_context( + high, + input_dfschema, + execution_props, + subquery_ctx, + )?; // rewrite the between into the two binary operators let binary_expr = binary( @@ -394,17 +529,25 @@ pub fn create_physical_expr( Ok(expressions::lit(ScalarValue::Boolean(None))) } _ => { - let value_expr = - create_physical_expr(expr, input_dfschema, execution_props)?; + let value_expr = create_physical_expr_with_subquery_context( + expr, + input_dfschema, + execution_props, + subquery_ctx, + )?; - let list_exprs = - create_physical_exprs(list, input_dfschema, execution_props)?; + let list_exprs = create_physical_exprs_with_subquery_context( + list, + input_dfschema, + execution_props, + subquery_ctx, + )?; expressions::in_list(value_expr, list_exprs, negated, input_schema) } }, Expr::ScalarSubquery(sq) => { - match execution_props.subquery_indexes.get(sq) { - Some(&index) => { + match subquery_ctx.index_of(sq) { + Some(index) => { let schema = sq.subquery.schema(); if schema.fields().len() != 1 { return plan_err!( @@ -418,7 +561,7 @@ pub fn create_physical_expr( dt, nullable, index, - execution_props.subquery_results.clone(), + subquery_ctx.results().clone(), ))) } None => { @@ -495,9 +638,19 @@ pub fn create_physical_expr( .clone() .with_qualified_lambda_variables(&qualifier, &lambda.params); - create_physical_expr(arg, &lambda_schema, &execution_props) + create_physical_expr_with_subquery_context( + arg, + &lambda_schema, + &execution_props, + subquery_ctx, + ) } - _ => create_physical_expr(arg, input_dfschema, execution_props), + _ => create_physical_expr_with_subquery_context( + arg, + input_dfschema, + execution_props, + subquery_ctx, + ), }) .collect::>()?; @@ -515,7 +668,12 @@ pub fn create_physical_expr( } Expr::Lambda(Lambda { params, body }) => expressions::lambda( params, - create_physical_expr(body, input_dfschema, execution_props)?, + create_physical_expr_with_subquery_context( + body, + input_dfschema, + execution_props, + subquery_ctx, + )?, ), Expr::LambdaVariable(LambdaVariable { name, @@ -577,12 +735,41 @@ pub fn create_physical_exprs<'a, I>( input_dfschema: &DFSchema, execution_props: &ExecutionProps, ) -> Result>> +where + I: IntoIterator, +{ + create_physical_exprs_with_subquery_context( + exprs, + input_dfschema, + execution_props, + &SubqueryContext::default(), + ) +} + +/// Create vector of Physical Expression from a vector of logical expression +/// with an explicit [`SubqueryContext`]. +/// +/// See [`create_physical_expr_with_subquery_context`] for details on when to +/// use this variant rather than [`create_physical_exprs`]. +pub fn create_physical_exprs_with_subquery_context<'a, I>( + exprs: I, + input_dfschema: &DFSchema, + execution_props: &ExecutionProps, + subquery_ctx: &SubqueryContext, +) -> Result>> where I: IntoIterator, { exprs .into_iter() - .map(|expr| create_physical_expr(expr, input_dfschema, execution_props)) + .map(|expr| { + create_physical_expr_with_subquery_context( + expr, + input_dfschema, + execution_props, + subquery_ctx, + ) + }) .collect() }