From f3062f2f41baa058438b3ac2215132f50704c539 Mon Sep 17 00:00:00 2001 From: renato2099 Date: Thu, 18 Sep 2025 09:16:54 +0200 Subject: [PATCH 1/2] fix: Not adding generated window expr resulting column twice (#17630) --- datafusion/core/src/dataframe/mod.rs | 33 +++++++++++-------- datafusion/core/tests/dataframe/mod.rs | 44 +++++++++++++++++++++++++- 2 files changed, 63 insertions(+), 14 deletions(-) diff --git a/datafusion/core/src/dataframe/mod.rs b/datafusion/core/src/dataframe/mod.rs index 9832c0e9db1e..1c023eaab89f 100644 --- a/datafusion/core/src/dataframe/mod.rs +++ b/datafusion/core/src/dataframe/mod.rs @@ -43,7 +43,7 @@ use crate::physical_plan::{ use crate::prelude::SessionContext; use std::any::Any; use std::borrow::Cow; -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::sync::Arc; use arrow::array::{Array, ArrayRef, Int64Array, StringArray}; @@ -2023,31 +2023,38 @@ impl DataFrame { pub fn with_column(self, name: &str, expr: Expr) -> Result<DataFrame> { let window_func_exprs = find_window_exprs([&expr]); - let (window_fn_str, plan) = if window_func_exprs.is_empty() { - (None, self.plan) + let original_names: HashSet<String> = self + .plan + .schema() + .iter() + .map(|(_, f)| f.name().clone()) + .collect(); + + // Maybe build window plan + let plan = if window_func_exprs.is_empty() { + self.plan } else { - ( - Some(window_func_exprs[0].to_string()), - LogicalPlanBuilder::window_plan(self.plan, window_func_exprs)?, - ) + LogicalPlanBuilder::window_plan(self.plan, window_func_exprs)? 
}; - let mut col_exists = false; let new_column = expr.alias(name); + let mut col_exists = false; + let mut fields: Vec<(Expr, bool)> = plan .schema() .iter() .filter_map(|(qualifier, field)| { + // Skip new fields introduced by window_plan + if !original_names.contains(field.name()) { + return None; + } + if field.name() == name { col_exists = true; Some((new_column.clone(), true)) } else { let e = col(Column::from((qualifier, field))); - window_fn_str - .as_ref() - .filter(|s| *s == &e.to_string()) - .is_none() - .then_some((e, self.projection_requires_validation)) + Some((e, self.projection_requires_validation)) } }) .collect(); diff --git a/datafusion/core/tests/dataframe/mod.rs b/datafusion/core/tests/dataframe/mod.rs index aa984775e457..d964637eaaca 100644 --- a/datafusion/core/tests/dataframe/mod.rs +++ b/datafusion/core/tests/dataframe/mod.rs @@ -38,7 +38,7 @@ use datafusion_functions_aggregate::expr_fn::{ array_agg, avg, count, count_distinct, max, median, min, sum, }; use datafusion_functions_nested::make_array::make_array_udf; -use datafusion_functions_window::expr_fn::{first_value, row_number}; +use datafusion_functions_window::expr_fn::{first_value, lead, row_number}; use insta::assert_snapshot; use object_store::local::LocalFileSystem; use std::collections::HashMap; @@ -85,6 +85,9 @@ use datafusion_physical_expr::Partitioning; use datafusion_physical_expr_common::physical_expr::PhysicalExpr; use datafusion_physical_plan::{displayable, ExecutionPlanProperties}; +use datafusion::error::Result as DataFusionResult; +use datafusion_functions_window::expr_fn::lag; + // Get string representation of the plan async fn physical_plan_to_string(df: &DataFrame) -> String { let physical_plan = df @@ -152,6 +155,45 @@ async fn test_array_agg_ord_schema() -> Result<()> { Ok(()) } +#[tokio::test] +async fn with_column_window_functions() -> DataFusionResult<()> { + let schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]); + + let batch = 
RecordBatch::try_new( + Arc::new(schema.clone()), + vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5]))], + )?; + + let ctx = SessionContext::new(); + + let provider = MemTable::try_new(Arc::new(schema), vec![vec![batch]])?; + ctx.register_table("t", Arc::new(provider))?; + + // Define test cases: (expr builder, alias name) + let test_cases: Vec<(Box<dyn Fn() -> Expr>, &str)> = vec![ + (Box::new(|| lag(col("a"), Some(1), None)), "lag_val"), + (Box::new(|| lead(col("a"), Some(1), None)), "lead_val"), + (Box::new(|| row_number()), "row_num"), + ]; + + for (make_expr, alias) in test_cases { + let df = ctx.table("t").await?; + let expr = make_expr(); + let df_with = df.with_column(alias, expr)?; + let df_schema = df_with.schema().clone(); + + assert!( + df_schema.has_column_with_unqualified_name(alias), + "Schema does not contain expected column {}", + alias + ); + + assert_eq!(2, df_schema.columns().len()); + } + + Ok(()) +} + #[tokio::test] async fn test_coalesce_schema() -> Result<()> { let ctx = SessionContext::new(); From b497c1132fb6ae4457e579b3aeb10c81f5cccafa Mon Sep 17 00:00:00 2001 From: renato2099 Date: Thu, 18 Sep 2025 20:32:24 +0200 Subject: [PATCH 2/2] Making clippy happier --- datafusion/core/tests/dataframe/mod.rs | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/datafusion/core/tests/dataframe/mod.rs b/datafusion/core/tests/dataframe/mod.rs index d964637eaaca..b0f794db892d 100644 --- a/datafusion/core/tests/dataframe/mod.rs +++ b/datafusion/core/tests/dataframe/mod.rs @@ -155,6 +155,8 @@ async fn test_array_agg_ord_schema() -> Result<()> { Ok(()) } +type WindowFnCase = (fn() -> Expr, &'static str); + #[tokio::test] async fn with_column_window_functions() -> DataFusionResult<()> { let schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]); @@ -170,10 +172,10 @@ async fn with_column_window_functions() -> DataFusionResult<()> { ctx.register_table("t", Arc::new(provider))?; // Define test cases: (expr builder, alias name) - 
let test_cases: Vec<(Box<dyn Fn() -> Expr>, &str)> = vec![ - (Box::new(|| lag(col("a"), Some(1), None)), "lag_val"), - (Box::new(|| lead(col("a"), Some(1), None)), "lead_val"), - (Box::new(|| row_number()), "row_num"), + let test_cases: Vec<WindowFnCase> = vec![ + (|| lag(col("a"), Some(1), None), "lag_val"), + (|| lead(col("a"), Some(1), None), "lead_val"), + (row_number, "row_num"), ]; for (make_expr, alias) in test_cases { @@ -184,8 +186,7 @@ async fn with_column_window_functions() -> DataFusionResult<()> { assert!( df_schema.has_column_with_unqualified_name(alias), - "Schema does not contain expected column {}", - alias + "Schema does not contain expected column {alias}", ); assert_eq!(2, df_schema.columns().len());