From 363ee1ffa8f0faa4c775cdd5c39a963e04a9758c Mon Sep 17 00:00:00 2001 From: Alex Qyoun-ae <4062971+MazterQyou@users.noreply.github.com> Date: Tue, 8 Jul 2025 23:58:51 +0400 Subject: [PATCH] fix: Fix adding missing columns for `ORDER BY` clause Signed-off-by: Alex Qyoun-ae <4062971+MazterQyou@users.noreply.github.com> --- datafusion/core/src/logical_plan/builder.rs | 61 +++++++++++++------ .../core/src/logical_plan/expr_rewriter.rs | 17 +++--- datafusion/core/src/sql/utils.rs | 8 +++ 3 files changed, 58 insertions(+), 28 deletions(-) diff --git a/datafusion/core/src/logical_plan/builder.rs b/datafusion/core/src/logical_plan/builder.rs index c378a9428e06..557472abb62c 100644 --- a/datafusion/core/src/logical_plan/builder.rs +++ b/datafusion/core/src/logical_plan/builder.rs @@ -51,7 +51,7 @@ use crate::logical_plan::{ CrossJoin, DFField, DFSchema, DFSchemaRef, Limit, Partitioning, Repartition, SubqueryType, Values, }; -use crate::sql::utils::group_window_expr_by_sort_keys; +use crate::sql::utils::{group_window_expr_by_sort_keys, resolve_exprs_to_aliases}; /// Default table name for unnamed table pub const UNNAMED_TABLE: &str = "?table?"; @@ -549,12 +549,13 @@ impl LogicalPlanBuilder { &self, curr_plan: LogicalPlan, missing_cols: &[Column], + alias_map: &mut HashMap, ) -> Result { match curr_plan { LogicalPlan::Projection(Projection { input, mut expr, - schema: _, + schema, alias, }) if missing_cols .iter() @@ -562,10 +563,22 @@ impl LogicalPlanBuilder { { let input_schema = input.schema(); - let missing_exprs = missing_cols - .iter() - .map(|c| normalize_col(Expr::Column(c.clone()), &input)) - .collect::>>()?; + let mut missing_exprs = Vec::with_capacity(missing_cols.len()); + for missing_col in missing_cols { + let mut normalized_col = + normalize_col(Expr::Column(missing_col.clone()), &input)?; + if let Ok(old_field) = + schema.field_with_unqualified_name(&missing_col.name) + { + if old_field.qualifier().is_none() { + let expr_name = normalized_col.name(input_schema)?; + let alias = missing_col.flat_name(); + normalized_col = normalized_col.alias(&alias); + alias_map.insert(expr_name, alias); + } + } + missing_exprs.push(normalized_col); + } expr.extend(missing_exprs); @@ -586,7 +599,11 @@ impl LogicalPlanBuilder { .inputs() .into_iter() .map(|input_plan| { - self.add_missing_columns((*input_plan).clone(), missing_cols) + self.add_missing_columns( + (*input_plan).clone(), + missing_cols, + alias_map, + ) }) .collect::>>()?; @@ -607,21 +624,18 @@ impl LogicalPlanBuilder { // Collect sort columns that are missing in the input plan's schema let mut missing_cols: Vec = vec![]; + let mut columns: HashSet = HashSet::new(); exprs .clone() .into_iter() .try_for_each::<_, Result<()>>(|expr| { - let mut columns: HashSet = HashSet::new(); - utils::expr_to_columns(&expr, &mut columns)?; - - columns.into_iter().for_each(|c| { - if schema.field_from_column(&c).is_err() { - missing_cols.push(c); - } - }); - - Ok(()) + utils::expr_to_columns(&expr, &mut columns) })?; + columns.into_iter().for_each(|c| { + if schema.field_from_column(&c).is_err() { + missing_cols.push(c); + } + }); if missing_cols.is_empty() { return Ok(Self::from(LogicalPlan::Sort(Sort { @@ -630,7 +644,18 @@ impl LogicalPlanBuilder { }))); } - let plan = self.add_missing_columns(self.plan.clone(), &missing_cols)?; + let mut alias_map = HashMap::new(); + let plan = + self.add_missing_columns(self.plan.clone(), &missing_cols, &mut alias_map)?; + let exprs = if alias_map.is_empty() { + exprs + } else { + exprs + .into_iter() + .map(|expr| resolve_exprs_to_aliases(&expr, &alias_map, plan.schema())) + .collect::>>()? + }; + let sort_plan = LogicalPlan::Sort(Sort { expr: normalize_cols(exprs, &plan)?, input: Arc::new(plan.clone()), diff --git a/datafusion/core/src/logical_plan/expr_rewriter.rs b/datafusion/core/src/logical_plan/expr_rewriter.rs index e7801ec702e7..b2c8138083a7 100644 --- a/datafusion/core/src/logical_plan/expr_rewriter.rs +++ b/datafusion/core/src/logical_plan/expr_rewriter.rs @@ -399,19 +399,16 @@ fn rewrite_sort_col_by_aggs(expr: Expr, plan: &LogicalPlan) -> Result { LogicalPlan::Projection(Projection { input, expr: projection_expr, + alias, .. }) => { - let alias_map = - extract_aliased_expr_names(projection_expr, input.schema()); + let alias_map = extract_aliased_expr_names( + projection_expr, + input.schema(), + alias.is_some(), + ); let res = resolve_exprs_to_aliases(&expr, &alias_map, input.schema())?; - let res = normalize_col( - unnormalize_col(rebase_expr( - &res, - projection_expr.as_slice(), - input, - )?), - plan, - )?; + let res = rebase_expr(&res, projection_expr.as_slice(), input)?; Ok(if let LogicalPlan::Aggregate(_) = **input { rewrite_sort_col(res, input)? diff --git a/datafusion/core/src/sql/utils.rs b/datafusion/core/src/sql/utils.rs index 651869d38dd7..033b7ff8729c 100644 --- a/datafusion/core/src/sql/utils.rs +++ b/datafusion/core/src/sql/utils.rs @@ -573,6 +573,7 @@ pub(crate) fn extract_aliases(exprs: &[Expr]) -> HashMap { pub(crate) fn extract_aliased_expr_names( exprs: &[Expr], input_schema: &DFSchema, + aliased_projection: bool, ) -> HashMap { exprs .iter() @@ -584,6 +585,13 @@ pub(crate) fn extract_aliased_expr_names( None } } + Expr::Column(column) if aliased_projection => { + if let Ok(expr_name) = expr.name(input_schema) { + Some((expr_name, column.name.clone())) + } else { + None + } + } _ => None, }) .collect::>()