From ba485f439370f1821c5132e93e3c5940a6c86e81 Mon Sep 17 00:00:00 2001 From: Nathan Bezualem Date: Thu, 23 Apr 2026 00:25:48 -0400 Subject: [PATCH 1/2] fix missing outer projection reconstruction for window expressions --- datafusion/sql/src/unparser/plan.rs | 43 +++++++- datafusion/sql/tests/cases/plan_to_sql.rs | 123 ++++++++++++++++++++++ 2 files changed, 164 insertions(+), 2 deletions(-) diff --git a/datafusion/sql/src/unparser/plan.rs b/datafusion/sql/src/unparser/plan.rs index 9f770f9f45e1..ac2a83a7f8c6 100644 --- a/datafusion/sql/src/unparser/plan.rs +++ b/datafusion/sql/src/unparser/plan.rs @@ -912,13 +912,52 @@ impl Unparser<'_> { Ok(()) } LogicalPlan::Window(window) => { - // Window nodes are handled simultaneously with Projection nodes + // Window nodes are usually handled simultaneously with Projection + // nodes, where projected columns are unprojected back into their + // corresponding window expressions. Manually built plans can have + // Window nodes without an enclosing Projection, so in that case + // the Window node itself must contribute its output expressions. + let project_window_output = !select.already_projected(); + let agg = if project_window_output { + find_agg_node_within_select(plan, false) + } else { + None + }; + self.select_to_sql_recursively( window.input.as_ref(), query, select, relation, - ) + )?; + + if project_window_output { + let mut items = if select.already_projected() { + select.pop_projections() + } else { + vec![ast::SelectItem::Wildcard( + ast::WildcardAdditionalOptions::default(), + )] + }; + + items.extend( + window + .window_expr + .iter() + .map(|expr| { + let expr = if let Some(agg) = agg { + unproject_agg_exprs(expr.clone(), agg, None)? + } else { + expr.clone() + }; + self.select_item_to_sql(&expr) + }) + .collect::>>()?, + ); + select.projection(items); + } + + Ok(()) } LogicalPlan::EmptyRelation(_) => { // An EmptyRelation could be behind an UNNEST node. If the dialect supports UNNEST as a table factor, diff --git a/datafusion/sql/tests/cases/plan_to_sql.rs b/datafusion/sql/tests/cases/plan_to_sql.rs index 4717b843abb5..4b49b64db027 100644 --- a/datafusion/sql/tests/cases/plan_to_sql.rs +++ b/datafusion/sql/tests/cases/plan_to_sql.rs @@ -35,6 +35,7 @@ use datafusion_functions_aggregate::grouping::grouping_udaf; use datafusion_functions_nested::make_array::make_array_udf; use datafusion_functions_nested::map::map_udf; use datafusion_functions_window::rank::rank_udwf; +use datafusion_functions_window::row_number::row_number_udwf; use datafusion_sql::planner::{ContextProvider, PlannerContext, SqlToRel}; use datafusion_sql::unparser::dialect::{ BigQueryDialect, CustomDialectBuilder, DefaultDialect as UnparserDefaultDialect, @@ -2681,6 +2682,128 @@ fn test_unparse_window() -> Result<()> { Ok(()) } +#[test] +fn test_unparse_window_over_aggregate_without_projection() -> Result<()> { + let schema = Schema::new(vec![ + Field::new("time", DataType::Int64, false), + Field::new("value", DataType::Float64, true), + ]); + let window_expr = Expr::WindowFunction(Box::new(WindowFunction { + fun: WindowFunctionDefinition::WindowUDF(row_number_udwf()), + params: WindowFunctionParams { + args: vec![], + partition_by: vec![], + order_by: vec![col("time").sort(true, true)], + window_frame: WindowFrame::new(None), + null_treatment: None, + distinct: false, + filter: None, + }, + })) + .alias("row_idx"); + let plan = table_scan(Some("gas"), &schema, None)? + .aggregate(vec![col("time")], vec![sum(col("value")).alias("sum_n")])? + .window(vec![window_expr])? + .build()?; + + let sql = Unparser::default().plan_to_sql(&plan)?; + assert_snapshot!( + sql, + @r#"SELECT sum(gas."value") AS sum_n, gas."time", row_number() OVER (ORDER BY gas."time" ASC NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS row_idx FROM gas GROUP BY gas."time""# + ); + + Ok(()) +} + +#[test] +fn test_unparse_window_over_table_scan_without_projection() -> Result<()> { + let schema = Schema::new(vec![ + Field::new("k", DataType::Int32, false), + Field::new("v", DataType::Int32, false), + ]); + let window_expr = Expr::WindowFunction(Box::new(WindowFunction { + fun: WindowFunctionDefinition::WindowUDF(row_number_udwf()), + params: WindowFunctionParams { + args: vec![], + partition_by: vec![col("k")], + order_by: vec![col("v").sort(true, true)], + window_frame: WindowFrame::new(None), + null_treatment: None, + distinct: false, + filter: None, + }, + })) + .alias("row_idx"); + let plan = table_scan(Some("test"), &schema, None)? + .window(vec![window_expr])? + .build()?; + + let sql = Unparser::default().plan_to_sql(&plan)?; + assert_snapshot!( + sql, + @"SELECT *, row_number() OVER (PARTITION BY test.k ORDER BY test.v ASC NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS row_idx FROM test" + ); + + Ok(()) +} + +#[test] +fn test_unparse_stacked_windows_without_projection() -> Result<()> { + let schema = Schema::new(vec![ + Field::new("k", DataType::Int32, false), + Field::new("v", DataType::Int32, false), + ]); + let row_number_expr = Expr::WindowFunction(Box::new(WindowFunction { + fun: WindowFunctionDefinition::WindowUDF(row_number_udwf()), + params: WindowFunctionParams { + args: vec![], + partition_by: vec![col("k")], + order_by: vec![col("v").sort(true, true)], + window_frame: WindowFrame::new(None), + null_treatment: None, + distinct: false, + filter: None, + }, + })) + .alias("row_idx"); + let rank_expr = Expr::WindowFunction(Box::new(WindowFunction { + fun: WindowFunctionDefinition::WindowUDF(rank_udwf()), + params: WindowFunctionParams { + args: vec![], + partition_by: vec![], + order_by: vec![col("v").sort(false, false)], + window_frame: WindowFrame::new(None), + null_treatment: None, + distinct: false, + filter: None, + }, + })) + .alias("rank_idx"); + let plan = table_scan(Some("test"), &schema, None)? + .window(vec![row_number_expr])? + .window(vec![rank_expr])? + .build()?; + + let sql = Unparser::default().plan_to_sql(&plan)?; + assert_snapshot!( + sql, + @"SELECT *, row_number() OVER (PARTITION BY test.k ORDER BY test.v ASC NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS row_idx, rank() OVER (ORDER BY test.v DESC NULLS LAST ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS rank_idx FROM test" + ); + + Ok(()) +} + +#[test] +fn test_array_to_sql_postgres() -> Result<(), DataFusionError> { + roundtrip_statement_with_dialect_helper!( + sql: "SELECT [1, 2, 3, 4, 5]", + parser_dialect: GenericDialect {}, + unparser_dialect: UnparserPostgreSqlDialect {}, + expected: @"SELECT ARRAY[1, 2, 3, 4, 5]", + ); + Ok(()) +} + #[test] fn test_like_filter() { let statement = generate_round_trip_statement( From f521835341ba57cf19771c81ed1076639369ec6f Mon Sep 17 00:00:00 2001 From: Nathan Bezualem Date: Fri, 24 Apr 2026 16:29:32 -0400 Subject: [PATCH 2/2] fix: preserve window input boundaries in plan_to_sql --- datafusion/sql/src/unparser/plan.rs | 109 +++++++++++++---- datafusion/sql/src/unparser/utils.rs | 7 +- datafusion/sql/tests/cases/plan_to_sql.rs | 136 ++++++++++++++++++++++ 3 files changed, 226 insertions(+), 26 deletions(-) diff --git a/datafusion/sql/src/unparser/plan.rs b/datafusion/sql/src/unparser/plan.rs index dfaeb7cb71bd..f53ebf3aacbd 100644 --- a/datafusion/sql/src/unparser/plan.rs +++ b/datafusion/sql/src/unparser/plan.rs @@ -48,9 +48,9 @@ use datafusion_common::{ }; use datafusion_expr::expr::{OUTER_REFERENCE_COLUMN_PREFIX, UNNEST_COLUMN_PREFIX}; use datafusion_expr::{ - BinaryExpr, Distinct, Expr, JoinConstraint, JoinType, LogicalPlan, + Aggregate, BinaryExpr, Distinct, Expr, JoinConstraint, JoinType, LogicalPlan, LogicalPlanBuilder, Operator, Projection, SortExpr, TableScan, Unnest, - UserDefinedLogicalNode, expr::Alias, + UserDefinedLogicalNode, Window, expr::Alias, }; use sqlparser::ast::{self, Ident, OrderByKind, SetExpr, TableAliasColumnDef}; use std::{sync::Arc, vec}; @@ -477,6 +477,80 @@ impl Unparser<'_> { Ok(false) } + fn project_window_output( + &self, + window_expr: &[Expr], + select: &mut SelectBuilder, + agg: Option<&Aggregate>, + ) -> Result<()> { + let mut items = if select.already_projected() { + select.pop_projections() + } else { + vec![ast::SelectItem::Wildcard( + ast::WildcardAdditionalOptions::default(), + )] + }; + + items.extend( + window_expr + .iter() + .map(|expr| { + let expr = if let Some(agg) = agg { + unproject_agg_exprs(expr.clone(), agg, None)? + } else { + expr.clone() + }; + self.select_item_to_sql(&expr) + }) + .collect::>>()?, + ); + select.projection(items); + + Ok(()) + } + + fn window_input_requires_derived_subquery(plan: &LogicalPlan) -> bool { + // These operators either produce a SELECT list or apply SQL clauses + // that are evaluated after window functions in a single SELECT block. + // Keep them below the Window node by emitting a derived table. + matches!( + plan, + LogicalPlan::Projection(_) + | LogicalPlan::Distinct(_) + | LogicalPlan::Limit(_) + | LogicalPlan::Sort(_) + | LogicalPlan::Union(_) + ) + } + + fn window_to_sql_with_derived_input( + &self, + window: &Window, + select: &mut SelectBuilder, + relation: &mut RelationBuilder, + ) -> Result<()> { + let input_alias = "derived_window_input"; + self.derive( + window.input.as_ref(), + relation, + Some(self.new_table_alias(input_alias.to_string(), vec![])), + false, + )?; + + let input_schema = window.input.schema(); + let mut alias_rewriter = TableAliasRewriter { + table_schema: input_schema.as_arrow(), + alias_name: TableReference::bare(input_alias), + }; + let window_expr = window + .window_expr + .iter() + .map(|expr| expr.clone().rewrite(&mut alias_rewriter).data()) + .collect::>>()?; + + self.project_window_output(&window_expr, select, None) + } + #[cfg_attr(feature = "recursive_protection", recursive::recursive)] fn select_to_sql_recursively( &self, @@ -1165,6 +1239,13 @@ impl Unparser<'_> { // Window nodes without an enclosing Projection, so in that case // the Window node itself must contribute its output expressions. let project_window_output = !select.already_projected(); + if project_window_output + && Self::window_input_requires_derived_subquery(window.input.as_ref()) + { + return self + .window_to_sql_with_derived_input(window, select, relation); + } + let agg = if project_window_output { find_agg_node_within_select(plan, false) } else { @@ -1179,29 +1260,7 @@ impl Unparser<'_> { )?; if project_window_output { - let mut items = if select.already_projected() { - select.pop_projections() - } else { - vec![ast::SelectItem::Wildcard( - ast::WildcardAdditionalOptions::default(), - )] - }; - - items.extend( - window - .window_expr - .iter() - .map(|expr| { - let expr = if let Some(agg) = agg { - unproject_agg_exprs(expr.clone(), agg, None)? - } else { - expr.clone() - }; - self.select_item_to_sql(&expr) - }) - .collect::>>()?, - ); - select.projection(items); + self.project_window_output(&window.window_expr, select, agg)?; } Ok(()) diff --git a/datafusion/sql/src/unparser/utils.rs b/datafusion/sql/src/unparser/utils.rs index 3657516d534a..732e030b335d 100644 --- a/datafusion/sql/src/unparser/utils.rs +++ b/datafusion/sql/src/unparser/utils.rs @@ -54,7 +54,12 @@ pub(crate) fn find_agg_node_within_select( // Agg nodes explicitly return immediately with a single node if let LogicalPlan::Aggregate(agg) = input { Some(agg) - } else if let LogicalPlan::TableScan(_) = input { + } else if matches!( + input, + LogicalPlan::TableScan(_) + | LogicalPlan::Subquery(_) + | LogicalPlan::SubqueryAlias(_) + ) { None } else if let LogicalPlan::Projection(_) = input { if already_projected { diff --git a/datafusion/sql/tests/cases/plan_to_sql.rs b/datafusion/sql/tests/cases/plan_to_sql.rs index 49c953d71878..db23b21cf900 100644 --- a/datafusion/sql/tests/cases/plan_to_sql.rs +++ b/datafusion/sql/tests/cases/plan_to_sql.rs @@ -2863,6 +2863,142 @@ fn test_unparse_stacked_windows_without_projection() -> Result<()> { Ok(()) } +#[test] +fn test_unparse_window_over_distinct_without_projection() -> Result<()> { + let schema = Schema::new(vec![ + Field::new("k", DataType::Int32, false), + Field::new("v", DataType::Int32, false), + ]); + let window_expr = Expr::WindowFunction(Box::new(WindowFunction { + fun: WindowFunctionDefinition::WindowUDF(row_number_udwf()), + params: WindowFunctionParams { + args: vec![], + partition_by: vec![], + order_by: vec![col("v").sort(true, true)], + window_frame: WindowFrame::new(None), + null_treatment: None, + distinct: false, + filter: None, + }, + })) + .alias("row_idx"); + let plan = table_scan(Some("test"), &schema, None)? + .distinct()? + .window(vec![window_expr])? + .build()?; + + let sql = Unparser::default().plan_to_sql(&plan)?; + assert_snapshot!( + sql, + @"SELECT *, row_number() OVER (ORDER BY derived_window_input.v ASC NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS row_idx FROM (SELECT DISTINCT * FROM test) AS derived_window_input" + ); + + Ok(()) +} + +#[test] +fn test_unparse_window_over_limit_without_projection() -> Result<()> { + let schema = Schema::new(vec![ + Field::new("k", DataType::Int32, false), + Field::new("v", DataType::Int32, false), + ]); + let window_expr = Expr::WindowFunction(Box::new(WindowFunction { + fun: WindowFunctionDefinition::WindowUDF(row_number_udwf()), + params: WindowFunctionParams { + args: vec![], + partition_by: vec![], + order_by: vec![col("v").sort(true, true)], + window_frame: WindowFrame::new(None), + null_treatment: None, + distinct: false, + filter: None, + }, + })) + .alias("row_idx"); + let plan = table_scan(Some("test"), &schema, None)? + .limit(0, Some(10))? + .window(vec![window_expr])? + .build()?; + + let sql = Unparser::default().plan_to_sql(&plan)?; + assert_snapshot!( + sql, + @"SELECT *, row_number() OVER (ORDER BY derived_window_input.v ASC NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS row_idx FROM (SELECT * FROM test LIMIT 10) AS derived_window_input" + ); + + Ok(()) +} + +#[test] +fn test_unparse_window_over_projection_without_projection() -> Result<()> { + let schema = Schema::new(vec![ + Field::new("k", DataType::Int32, false), + Field::new("v", DataType::Int32, false), + ]); + let window_expr = Expr::WindowFunction(Box::new(WindowFunction { + fun: WindowFunctionDefinition::WindowUDF(row_number_udwf()), + params: WindowFunctionParams { + args: vec![], + partition_by: vec![], + order_by: vec![col("v_alias").sort(true, true)], + window_frame: WindowFrame::new(None), + null_treatment: None, + distinct: false, + filter: None, + }, + })) + .alias("row_idx"); + let plan = table_scan(Some("test"), &schema, None)? + .project(vec![col("k"), col("v").alias("v_alias")])? + .window(vec![window_expr])? + .build()?; + + let sql = Unparser::default().plan_to_sql(&plan)?; + assert_snapshot!( + sql, + @"SELECT *, row_number() OVER (ORDER BY derived_window_input.v_alias ASC NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS row_idx FROM (SELECT test.k, test.v AS v_alias FROM test) AS derived_window_input" + ); + + Ok(()) +} + +#[test] +fn test_unparse_window_over_derived_aggregate_without_projection() -> Result<()> { + let schema = Schema::new(vec![ + Field::new("time", DataType::Int64, false), + Field::new("value", DataType::Float64, true), + ]); + let window_expr = Expr::WindowFunction(Box::new(WindowFunction { + fun: WindowFunctionDefinition::WindowUDF(row_number_udwf()), + params: WindowFunctionParams { + args: vec![], + partition_by: vec![], + order_by: vec![ + Expr::Column(Column::new(Some(TableReference::bare("agg")), "sum_n")) + .sort(true, true), + ], + window_frame: WindowFrame::new(None), + null_treatment: None, + distinct: false, + filter: None, + }, + })) + .alias("row_idx"); + let plan = table_scan(Some("gas"), &schema, None)? + .aggregate(vec![col("time")], vec![sum(col("value")).alias("sum_n")])? + .alias("agg")? + .window(vec![window_expr])? + .build()?; + + let sql = Unparser::default().plan_to_sql(&plan)?; + assert_snapshot!( + sql, + @r#"SELECT *, row_number() OVER (ORDER BY agg.sum_n ASC NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS row_idx FROM (SELECT sum(gas."value") AS sum_n, gas."time" FROM gas GROUP BY gas."time") AS agg"# + ); + + Ok(()) +} + #[test] fn test_array_to_sql_postgres() -> Result<(), DataFusionError> { roundtrip_statement_with_dialect_helper!(