From 91484b65c838ad177c2c28168a131fbfd34a0b24 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Thu, 2 Oct 2025 14:35:46 +0800 Subject: [PATCH 1/9] Add recursive CTE handling for aliased self-references in optimizer --- .../optimizer/src/optimize_projections/mod.rs | 21 ++++++++++++++++- .../optimizer/tests/optimizer_integration.rs | 23 +++++++++++++++++++ 2 files changed, 43 insertions(+), 1 deletion(-) diff --git a/datafusion/optimizer/src/optimize_projections/mod.rs b/datafusion/optimizer/src/optimize_projections/mod.rs index 312e788db7be..5db71417bc8f 100644 --- a/datafusion/optimizer/src/optimize_projections/mod.rs +++ b/datafusion/optimizer/src/optimize_projections/mod.rs @@ -880,7 +880,9 @@ pub fn is_projection_unnecessary( /// pushdown for now because we cannot safely reason about their column usage. fn plan_contains_other_subqueries(plan: &LogicalPlan, cte_name: &str) -> bool { if let LogicalPlan::SubqueryAlias(alias) = plan { - if alias.alias.table() != cte_name { + if alias.alias.table() != cte_name + && !subquery_alias_targets_recursive_cte(alias.input.as_ref(), cte_name) + { return true; } } @@ -913,6 +915,23 @@ fn expr_contains_subquery(expr: &Expr) -> bool { .unwrap() } +fn subquery_alias_targets_recursive_cte(plan: &LogicalPlan, cte_name: &str) -> bool { + match plan { + LogicalPlan::TableScan(scan) => scan.table_name.table() == cte_name, + LogicalPlan::SubqueryAlias(alias) => { + subquery_alias_targets_recursive_cte(alias.input.as_ref(), cte_name) + } + _ => { + let inputs = plan.inputs(); + if inputs.len() == 1 { + subquery_alias_targets_recursive_cte(inputs[0], cte_name) + } else { + false + } + } + } +} + #[cfg(test)] mod tests { use std::cmp::Ordering; diff --git a/datafusion/optimizer/tests/optimizer_integration.rs b/datafusion/optimizer/tests/optimizer_integration.rs index b17375ac01b7..c0f48b8ebfc4 100644 --- a/datafusion/optimizer/tests/optimizer_integration.rs +++ b/datafusion/optimizer/tests/optimizer_integration.rs @@ -548,6 +548,29 @@ fn recursive_cte_projection_pushdown() -> Result<()> { Ok(()) } +#[test] +fn recursive_cte_with_aliased_self_reference() -> Result<()> { + let sql = "WITH RECURSIVE nodes AS (\ + SELECT col_int32 AS id, col_utf8 AS name FROM test \ + UNION ALL \ + SELECT child.id + 1, child.name FROM nodes AS child WHERE child.id < 3\ + ) SELECT id FROM nodes"; + let plan = test_sql(sql)?; + + assert_snapshot!( + format!("{plan}"), + @r#"SubqueryAlias: nodes + RecursiveQuery: is_distinct=false + Projection: test.col_int32 AS id + TableScan: test projection=[col_int32] + Projection: CAST(CAST(child.id AS Int64) + Int64(1) AS Int32) + SubqueryAlias: child + Filter: nodes.id < Int32(3) + TableScan: nodes projection=[id]"#, + ); + Ok(()) +} + #[test] fn recursive_cte_with_unused_columns() -> Result<()> { // Test projection pushdown with a recursive CTE where the base case From bab227b472cd8d97f31e3a1cb48d3804e9d65c35 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Thu, 2 Oct 2025 16:21:56 +0800 Subject: [PATCH 2/9] Add recursive projection pushdown test for Parquet --- datafusion/core/tests/sql/explain_analyze.rs | 87 ++++++++++++++++++++ 1 file changed, 87 insertions(+) diff --git a/datafusion/core/tests/sql/explain_analyze.rs b/datafusion/core/tests/sql/explain_analyze.rs index 852b350b27df..1d3b0aad44e0 100644 --- a/datafusion/core/tests/sql/explain_analyze.rs +++ b/datafusion/core/tests/sql/explain_analyze.rs @@ -727,6 +727,93 @@ async fn parquet_explain_analyze() { assert_contains!(&formatted, "row_groups_pruned_statistics=0"); } +#[tokio::test] +#[cfg_attr(tarpaulin, ignore)] +async fn parquet_recursive_projection_pushdown() { + use parquet::arrow::arrow_writer::ArrowWriter; + use parquet::file::properties::WriterProperties; + + let temp_dir = TempDir::new().unwrap(); + let parquet_path = temp_dir.path().join("hierarchy.parquet"); + + let ids = Int64Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]); + let parent_ids = Int64Array::from(vec![ + Some(0), + Some(1), + Some(1), + Some(2), + Some(2), + Some(3), + Some(4), + Some(5), + Some(6), + Some(7), + ]); + let values = Int64Array::from(vec![10, 20, 30, 40, 50, 60, 70, 80, 90, 100]); + + let schema = Arc::new(Schema::new(vec![ + Field::new("id", DataType::Int64, false), + Field::new("parent_id", DataType::Int64, true), + Field::new("value", DataType::Int64, false), + ])); + + let batch = RecordBatch::try_new( + schema.clone(), + vec![Arc::new(ids), Arc::new(parent_ids), Arc::new(values)], + ) + .unwrap(); + + let file = File::create(&parquet_path).unwrap(); + let props = WriterProperties::builder().build(); + let mut writer = ArrowWriter::try_new(file, schema, Some(props)).unwrap(); + writer.write(&batch).unwrap(); + writer.close().unwrap(); + + let ctx = SessionContext::new(); + ctx.register_parquet( + "hierarchy", + parquet_path.to_str().unwrap(), + ParquetReadOptions::default(), + ) + .await + .unwrap(); + + let sql = r#" + EXPLAIN ANALYZE + WITH RECURSIVE number_series AS ( + SELECT id, 1 as level + FROM hierarchy + WHERE id = 1 + + UNION ALL + + SELECT ns.id + 1, ns.level + 1 + FROM number_series ns + WHERE ns.id < 10 + ) + SELECT * FROM number_series ORDER BY id + "#; + + let actual = execute_to_batches(&ctx, sql).await; + let formatted = arrow::util::pretty::pretty_format_batches(&actual) + .unwrap() + .to_string(); + + let scan_line = formatted + .lines() + .find(|line| line.contains("DataSourceExec")) + .expect("DataSourceExec not found"); + + assert!( + scan_line.contains("projection=[id]"), + "expected scan to only project id column, found: {scan_line}" + ); + assert!( + !scan_line.contains("parent_id") && !scan_line.contains("value"), + "unexpected columns projected in scan: {scan_line}" + ); +} + #[tokio::test] #[cfg_attr(tarpaulin, ignore)] async fn parquet_explain_analyze_verbose() { From 118c390bd115b92ad9b98be2435ffc626d4957ba Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Thu, 2 Oct 2025 17:58:10 +0800 Subject: [PATCH 3/9] Add comment for parquet recursive projection pushdown --- datafusion/core/tests/sql/explain_analyze.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/datafusion/core/tests/sql/explain_analyze.rs b/datafusion/core/tests/sql/explain_analyze.rs index 1d3b0aad44e0..b31d4644edf9 100644 --- a/datafusion/core/tests/sql/explain_analyze.rs +++ b/datafusion/core/tests/sql/explain_analyze.rs @@ -727,6 +727,11 @@ async fn parquet_explain_analyze() { assert_contains!(&formatted, "row_groups_pruned_statistics=0"); } +// This test reproduces the behavior described in +// https://github.com/apache/datafusion/issues/16684 where projection +// pushdown with recursive CTEs could fail to remove unused columns +// (e.g. nested/recursive expansion causing full schema to be scanned). +// Keeping this test ensures we don't regress that behavior. #[tokio::test] #[cfg_attr(tarpaulin, ignore)] async fn parquet_recursive_projection_pushdown() { From cb08f1785fabd4aeb33f12b4f8052c399e765493 Mon Sep 17 00:00:00 2001 From: kosiew Date: Fri, 3 Oct 2025 15:44:12 +0800 Subject: [PATCH 4/9] Update datafusion/core/tests/sql/explain_analyze.rs Co-authored-by: Jeffrey Vo --- datafusion/core/tests/sql/explain_analyze.rs | 13 +------------ 1 file changed, 1 insertion(+), 12 deletions(-) diff --git a/datafusion/core/tests/sql/explain_analyze.rs b/datafusion/core/tests/sql/explain_analyze.rs index b31d4644edf9..fabbf3c1c846 100644 --- a/datafusion/core/tests/sql/explain_analyze.rs +++ b/datafusion/core/tests/sql/explain_analyze.rs @@ -742,18 +742,7 @@ async fn parquet_recursive_projection_pushdown() { let parquet_path = temp_dir.path().join("hierarchy.parquet"); let ids = Int64Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]); - let parent_ids = Int64Array::from(vec![ - Some(0), - Some(1), - Some(1), - Some(2), - Some(2), - Some(3), - Some(4), - Some(5), - Some(6), - Some(7), - ]); + let parent_ids = Int64Array::from(vec![0, 1, 1, 2, 2, 3, 4, 5, 6, 7]); let values = Int64Array::from(vec![10, 20, 30, 40, 50, 60, 70, 80, 90, 100]); let schema = Arc::new(Schema::new(vec![ From 6a97be7279593309f3fb8e5d9d8c0267d0f3d31a Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Fri, 3 Oct 2025 17:25:45 +0800 Subject: [PATCH 5/9] Refactor parquet_recursive_projection_pushdown test to return Result and improve SQL formatting --- datafusion/core/tests/sql/explain_analyze.rs | 78 ++++++++++++-------- 1 file changed, 48 insertions(+), 30 deletions(-) diff --git a/datafusion/core/tests/sql/explain_analyze.rs b/datafusion/core/tests/sql/explain_analyze.rs index fabbf3c1c846..4db020e5a301 100644 --- a/datafusion/core/tests/sql/explain_analyze.rs +++ b/datafusion/core/tests/sql/explain_analyze.rs @@ -734,7 +734,7 @@ async fn parquet_explain_analyze() { // Keeping this test ensures we don't regress that behavior. #[tokio::test] #[cfg_attr(tarpaulin, ignore)] -async fn parquet_recursive_projection_pushdown() { +async fn parquet_recursive_projection_pushdown() -> Result<()> { use parquet::arrow::arrow_writer::ArrowWriter; use parquet::file::properties::WriterProperties; @@ -769,43 +769,61 @@ async fn parquet_recursive_projection_pushdown() { parquet_path.to_str().unwrap(), ParquetReadOptions::default(), ) - .await - .unwrap(); + .await?; let sql = r#" - EXPLAIN ANALYZE - WITH RECURSIVE number_series AS ( - SELECT id, 1 as level - FROM hierarchy - WHERE id = 1 - - UNION ALL - - SELECT ns.id + 1, ns.level + 1 - FROM number_series ns - WHERE ns.id < 10 - ) - SELECT * FROM number_series ORDER BY id + WITH RECURSIVE number_series AS ( + SELECT id, 1 as level + FROM hierarchy + WHERE id = 1 + + UNION ALL + + SELECT ns.id + 1, ns.level + 1 + FROM number_series ns + WHERE ns.id < 10 + ) + SELECT * FROM number_series ORDER BY id "#; - let actual = execute_to_batches(&ctx, sql).await; - let formatted = arrow::util::pretty::pretty_format_batches(&actual) - .unwrap() - .to_string(); + let dataframe = ctx.sql(sql).await?; + let physical_plan = dataframe.create_physical_plan().await?; - let scan_line = formatted + let normalizer = ExplainNormalizer::new(); + let actual = format!("{}", displayable(physical_plan.as_ref()).indent(true)) + .trim() .lines() - .find(|line| line.contains("DataSourceExec")) - .expect("DataSourceExec not found"); + .map(|line| normalizer.normalize(line)) + .collect::>() + .join("\n"); + let actual = temp_dir + .path() + .file_name() + .map(|name| name.to_string_lossy().to_string()) + .map(|temp_name| actual.replace(&temp_name, "TMP_DIR")) + .unwrap_or(actual); - assert!( - scan_line.contains("projection=[id]"), - "expected scan to only project id column, found: {scan_line}" - ); - assert!( - !scan_line.contains("parent_id") && !scan_line.contains("value"), - "unexpected columns projected in scan: {scan_line}" + assert_snapshot!( + actual, + @r###" + SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[false] + RecursiveQueryExec: name=number_series, is_distinct=false + CoalescePartitionsExec + ProjectionExec: expr=[id@0 as id, 1 as level] + CoalesceBatchesExec: target_batch_size=8192 + FilterExec: id@0 = 1 + RepartitionExec: partitioning=RoundRobinBatch(NUM_CORES), input_partitions=1 + DataSourceExec: file_groups={1 group: [[tmp/TMP_DIR/hierarchy.parquet]]}, projection=[id], file_type=parquet, predicate=id@0 = 1, pruning_predicate=id_null_count@2 != row_count@3 AND id_min@0 <= 1 AND 1 <= id_max@1, required_guarantees=[id in (1)] + CoalescePartitionsExec + ProjectionExec: expr=[id@0 + 1 as ns.id + Int64(1), level@1 + 1 as ns.level + Int64(1)] + CoalesceBatchesExec: target_batch_size=8192 + FilterExec: id@0 < 10 + RepartitionExec: partitioning=RoundRobinBatch(NUM_CORES), input_partitions=1 + WorkTableExec: name=number_series + "### ); + + Ok(()) } #[tokio::test] From d42b9f7245a7091f3dc79044ad2751b1f0871e38 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Fri, 3 Oct 2025 18:16:00 +0800 Subject: [PATCH 6/9] Fix snapshot assertion in parquet_recursive_projection_pushdown test to use correct temporary file path --- datafusion/core/tests/sql/explain_analyze.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/datafusion/core/tests/sql/explain_analyze.rs b/datafusion/core/tests/sql/explain_analyze.rs index 4db020e5a301..f80a9689fccf 100644 --- a/datafusion/core/tests/sql/explain_analyze.rs +++ b/datafusion/core/tests/sql/explain_analyze.rs @@ -805,7 +805,7 @@ async fn parquet_recursive_projection_pushdown() -> Result<()> { assert_snapshot!( actual, - @r###" + @r" SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[false] RecursiveQueryExec: name=number_series, is_distinct=false CoalescePartitionsExec @@ -813,14 +813,14 @@ async fn parquet_recursive_projection_pushdown() -> Result<()> { CoalesceBatchesExec: target_batch_size=8192 FilterExec: id@0 = 1 RepartitionExec: partitioning=RoundRobinBatch(NUM_CORES), input_partitions=1 - DataSourceExec: file_groups={1 group: [[tmp/TMP_DIR/hierarchy.parquet]]}, projection=[id], file_type=parquet, predicate=id@0 = 1, pruning_predicate=id_null_count@2 != row_count@3 AND id_min@0 <= 1 AND 1 <= id_max@1, required_guarantees=[id in (1)] + DataSourceExec: file_groups={1 group: [[var/folders/_x/tlhn6w_12c3dxxj062jf79900000gn/T/TMP_DIR/hierarchy.parquet]]}, projection=[id], file_type=parquet, predicate=id@0 = 1, pruning_predicate=id_null_count@2 != row_count@3 AND id_min@0 <= 1 AND 1 <= id_max@1, required_guarantees=[id in (1)] CoalescePartitionsExec ProjectionExec: expr=[id@0 + 1 as ns.id + Int64(1), level@1 + 1 as ns.level + Int64(1)] CoalesceBatchesExec: target_batch_size=8192 FilterExec: id@0 < 10 RepartitionExec: partitioning=RoundRobinBatch(NUM_CORES), input_partitions=1 WorkTableExec: name=number_series - "### + " ); Ok(()) From 1b1b0510e4454b6531c2f2b2d092630fe985a32f Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Fri, 3 Oct 2025 21:33:05 +0800 Subject: [PATCH 7/9] Enhance parquet_recursive_projection_pushdown test to normalize temporary directory paths in snapshots --- datafusion/core/tests/sql/explain_analyze.rs | 42 ++++++++++++++++---- 1 file changed, 34 insertions(+), 8 deletions(-) diff --git a/datafusion/core/tests/sql/explain_analyze.rs b/datafusion/core/tests/sql/explain_analyze.rs index f80a9689fccf..0969bbda593c 100644 --- a/datafusion/core/tests/sql/explain_analyze.rs +++ b/datafusion/core/tests/sql/explain_analyze.rs @@ -20,6 +20,7 @@ use insta::assert_snapshot; use rstest::rstest; use datafusion::config::ConfigOptions; +use object_store::path::Path; use datafusion::physical_plan::display::DisplayableExecutionPlan; use datafusion::physical_plan::metrics::Timestamp; @@ -790,18 +791,43 @@ async fn parquet_recursive_projection_pushdown() -> Result<()> { let physical_plan = dataframe.create_physical_plan().await?; let normalizer = ExplainNormalizer::new(); - let actual = format!("{}", displayable(physical_plan.as_ref()).indent(true)) + let mut actual = format!("{}", displayable(physical_plan.as_ref()).indent(true)) .trim() .lines() .map(|line| normalizer.normalize(line)) .collect::>() .join("\n"); - let actual = temp_dir - .path() - .file_name() - .map(|name| name.to_string_lossy().to_string()) - .map(|temp_name| actual.replace(&temp_name, "TMP_DIR")) - .unwrap_or(actual); + + fn replace_path_variants(actual: &mut String, path: &str) { + let mut candidates = vec![path.to_string()]; + + let trimmed = path.trim_start_matches(std::path::MAIN_SEPARATOR); + if trimmed != path { + candidates.push(trimmed.to_string()); + } + + let forward_slash = path.replace('\\', "/"); + if forward_slash != path { + candidates.push(forward_slash.clone()); + + let trimmed_forward = forward_slash.trim_start_matches('/'); + if trimmed_forward != forward_slash { + candidates.push(trimmed_forward.to_string()); + } + } + + for candidate in candidates { + *actual = actual.replace(&candidate, "TMP_DIR"); + } + } + + let temp_dir_path = temp_dir.path(); + let fs_path = temp_dir_path.to_string_lossy().to_string(); + replace_path_variants(&mut actual, &fs_path); + + if let Ok(url_path) = Path::from_filesystem_path(temp_dir_path) { + replace_path_variants(&mut actual, &url_path.to_string()); + } assert_snapshot!( actual, @@ -813,7 +839,7 @@ async fn parquet_recursive_projection_pushdown() -> Result<()> { CoalesceBatchesExec: target_batch_size=8192 FilterExec: id@0 = 1 RepartitionExec: partitioning=RoundRobinBatch(NUM_CORES), input_partitions=1 - DataSourceExec: file_groups={1 group: [[var/folders/_x/tlhn6w_12c3dxxj062jf79900000gn/T/TMP_DIR/hierarchy.parquet]]}, projection=[id], file_type=parquet, predicate=id@0 = 1, pruning_predicate=id_null_count@2 != row_count@3 AND id_min@0 <= 1 AND 1 <= id_max@1, required_guarantees=[id in (1)] + DataSourceExec: file_groups={1 group: [[TMP_DIR/hierarchy.parquet]]}, projection=[id], file_type=parquet, predicate=id@0 = 1, pruning_predicate=id_null_count@2 != row_count@3 AND id_min@0 <= 1 AND 1 <= id_max@1, required_guarantees=[id in (1)] CoalescePartitionsExec ProjectionExec: expr=[id@0 + 1 as ns.id + Int64(1), level@1 + 1 as ns.level + Int64(1)] CoalesceBatchesExec: target_batch_size=8192 From ba5234ac6e5a0577fc4220b01c29e9f5af0e20fa Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Fri, 3 Oct 2025 22:16:27 +0800 Subject: [PATCH 8/9] Fix fmt errors --- datafusion/core/tests/sql/explain_analyze.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/core/tests/sql/explain_analyze.rs b/datafusion/core/tests/sql/explain_analyze.rs index 0969bbda593c..3c9b41c70afb 100644 --- a/datafusion/core/tests/sql/explain_analyze.rs +++ b/datafusion/core/tests/sql/explain_analyze.rs @@ -20,9 +20,9 @@ use insta::assert_snapshot; use rstest::rstest; use datafusion::config::ConfigOptions; -use object_store::path::Path; use datafusion::physical_plan::display::DisplayableExecutionPlan; use datafusion::physical_plan::metrics::Timestamp; +use object_store::path::Path; #[tokio::test] async fn explain_analyze_baseline_metrics() { From bf79a56e99ec281d14f2e86657fc24ce6fc20563 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Fri, 3 Oct 2025 22:31:26 +0800 Subject: [PATCH 9/9] Fix clippy error --- datafusion/core/tests/sql/explain_analyze.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/core/tests/sql/explain_analyze.rs b/datafusion/core/tests/sql/explain_analyze.rs index 3c9b41c70afb..e082cabaadaf 100644 --- a/datafusion/core/tests/sql/explain_analyze.rs +++ b/datafusion/core/tests/sql/explain_analyze.rs @@ -826,7 +826,7 @@ async fn parquet_recursive_projection_pushdown() -> Result<()> { replace_path_variants(&mut actual, &fs_path); if let Ok(url_path) = Path::from_filesystem_path(temp_dir_path) { - replace_path_variants(&mut actual, &url_path.to_string()); + replace_path_variants(&mut actual, url_path.as_ref()); } assert_snapshot!(