From 8da167d5676f6569639e98ff0f7b6ae6147cdf70 Mon Sep 17 00:00:00 2001 From: Yongting You <2010youy01@gmail.com> Date: Sat, 16 May 2026 11:33:26 +0800 Subject: [PATCH 1/3] elimiate SortExec on generate_series() --- Cargo.lock | 1 + datafusion-cli/tests/cli_integration.rs | 4 +- .../cli_top_memory_consumers@no_track.snap | 2 +- .../cli_top_memory_consumers@top2.snap | 2 +- ...cli_top_memory_consumers@top3_default.snap | 2 +- ...consumers_with_mem_pool_type@no_track.snap | 2 +- ...ory_consumers_with_mem_pool_type@top2.snap | 2 +- datafusion/core/tests/memory_limit/mod.rs | 12 +++--- datafusion/core/tests/sql/runtime_config.rs | 8 ++-- datafusion/functions-table/Cargo.toml | 1 + .../functions-table/src/generate_series.rs | 37 +++++++++++++++++-- .../sqllogictest/test_files/set_variable.slt | 4 +- .../test_files/table_functions.slt | 14 +++++++ 13 files changed, 68 insertions(+), 23 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d2ce889675b1d..010dd315c44ca 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2267,6 +2267,7 @@ dependencies = [ "datafusion-catalog", "datafusion-common", "datafusion-expr", + "datafusion-physical-expr", "datafusion-physical-plan", "parking_lot", ] diff --git a/datafusion-cli/tests/cli_integration.rs b/datafusion-cli/tests/cli_integration.rs index 5609591268539..4849ac9e9a5e2 100644 --- a/datafusion-cli/tests/cli_integration.rs +++ b/datafusion-cli/tests/cli_integration.rs @@ -219,7 +219,7 @@ fn test_cli_top_memory_consumers<'a>( let _bound = bind_to_settings(snapshot_name); let mut cmd = cli(); - let sql = "select * from generate_series(1,500000) as t1(v1) order by v1;"; + let sql = "select * from generate_series(1,500000) as t1(v1) order by v1 desc;"; cmd.args(["--memory-limit", "10M", "--command", sql]); cmd.args(top_memory_consumers); @@ -237,7 +237,7 @@ fn test_cli_top_memory_consumers_with_mem_pool_type<'a>( let _bound = bind_to_settings(snapshot_name); let mut cmd = cli(); - let sql = "select * from generate_series(1,500000) as t1(v1) order by v1;"; + let sql = "select * from generate_series(1,500000) as t1(v1) order by v1 desc;"; cmd.args([ "--memory-limit", "10M", diff --git a/datafusion-cli/tests/snapshots/cli_top_memory_consumers@no_track.snap b/datafusion-cli/tests/snapshots/cli_top_memory_consumers@no_track.snap index c34e1202f55da..fb8a265c0e7d1 100644 --- a/datafusion-cli/tests/snapshots/cli_top_memory_consumers@no_track.snap +++ b/datafusion-cli/tests/snapshots/cli_top_memory_consumers@no_track.snap @@ -6,7 +6,7 @@ info: - "--memory-limit" - 10M - "--command" - - "select * from generate_series(1,500000) as t1(v1) order by v1;" + - "select * from generate_series(1,500000) as t1(v1) order by v1 desc;" - "--top-memory-consumers" - "0" --- diff --git a/datafusion-cli/tests/snapshots/cli_top_memory_consumers@top2.snap b/datafusion-cli/tests/snapshots/cli_top_memory_consumers@top2.snap index ebf7a540d8d44..f260110869999 100644 --- a/datafusion-cli/tests/snapshots/cli_top_memory_consumers@top2.snap +++ b/datafusion-cli/tests/snapshots/cli_top_memory_consumers@top2.snap @@ -6,7 +6,7 @@ info: - "--memory-limit" - 10M - "--command" - - "select * from generate_series(1,500000) as t1(v1) order by v1;" + - "select * from generate_series(1,500000) as t1(v1) order by v1 desc;" - "--top-memory-consumers" - "2" --- diff --git a/datafusion-cli/tests/snapshots/cli_top_memory_consumers@top3_default.snap b/datafusion-cli/tests/snapshots/cli_top_memory_consumers@top3_default.snap index 9e279ca93ddcd..227e83658451f 100644 --- a/datafusion-cli/tests/snapshots/cli_top_memory_consumers@top3_default.snap +++ b/datafusion-cli/tests/snapshots/cli_top_memory_consumers@top3_default.snap @@ -6,7 +6,7 @@ info: - "--memory-limit" - 10M - "--command" - - "select * from generate_series(1,500000) as t1(v1) order by v1;" + - "select * from generate_series(1,500000) as t1(v1) order by v1 desc;" --- success: false exit_code: 1 diff --git a/datafusion-cli/tests/snapshots/cli_top_memory_consumers_with_mem_pool_type@no_track.snap b/datafusion-cli/tests/snapshots/cli_top_memory_consumers_with_mem_pool_type@no_track.snap index 9a228fcfb6e93..95284482409cb 100644 --- a/datafusion-cli/tests/snapshots/cli_top_memory_consumers_with_mem_pool_type@no_track.snap +++ b/datafusion-cli/tests/snapshots/cli_top_memory_consumers_with_mem_pool_type@no_track.snap @@ -8,7 +8,7 @@ info: - "--mem-pool-type" - fair - "--command" - - "select * from generate_series(1,500000) as t1(v1) order by v1;" + - "select * from generate_series(1,500000) as t1(v1) order by v1 desc;" - "--top-memory-consumers" - "0" --- diff --git a/datafusion-cli/tests/snapshots/cli_top_memory_consumers_with_mem_pool_type@top2.snap b/datafusion-cli/tests/snapshots/cli_top_memory_consumers_with_mem_pool_type@top2.snap index d7f964a339313..6b757ba8e97d2 100644 --- a/datafusion-cli/tests/snapshots/cli_top_memory_consumers_with_mem_pool_type@top2.snap +++ b/datafusion-cli/tests/snapshots/cli_top_memory_consumers_with_mem_pool_type@top2.snap @@ -8,7 +8,7 @@ info: - "--mem-pool-type" - fair - "--command" - - "select * from generate_series(1,500000) as t1(v1) order by v1;" + - "select * from generate_series(1,500000) as t1(v1) order by v1 desc;" - "--top-memory-consumers" - "2" --- diff --git a/datafusion/core/tests/memory_limit/mod.rs b/datafusion/core/tests/memory_limit/mod.rs index 90df245718db7..64861f237074e 100644 --- a/datafusion/core/tests/memory_limit/mod.rs +++ b/datafusion/core/tests/memory_limit/mod.rs @@ -514,7 +514,7 @@ async fn test_in_mem_buffer_almost_full() { let ctx = SessionContext::new_with_config_rt(config, runtime); - let query = "select * from generate_series(1,9000000) as t1(v1) order by v1;"; + let query = "select * from generate_series(1,9000000) as t1(v1) order by v1 desc;"; let df = ctx.sql(query).await.unwrap(); // Check not fail @@ -535,7 +535,7 @@ async fn test_external_sort_zero_merge_reservation() { let ctx = SessionContext::new_with_config_rt(config, runtime); - let query = "select * from generate_series(1,10000000) as t1(v1) order by v1;"; + let query = "select * from generate_series(1,10000000) as t1(v1) order by v1 desc;"; let df = ctx.sql(query).await.unwrap(); let physical_plan = df.create_physical_plan().await.unwrap(); @@ -599,7 +599,7 @@ async fn test_disk_spill_limit_reached() -> Result<()> { let ctx = setup_context(1024 * 1024, 1024 * 1024, spill_compression).await?; // 1MB disk limit, 1MB memory limit let df = ctx - .sql("select * from generate_series(1, 1000000000000) as t1(v1) order by v1") + .sql("select * from generate_series(1, 1000000000000) as t1(v1) order by v1 desc") .await .unwrap(); @@ -627,7 +627,7 @@ async fn test_disk_spill_limit_not_reached() -> Result<()> { let ctx = setup_context(disk_spill_limit, 128 * 1024, spill_compression).await?; // 1MB disk limit, 128KB memory limit let df = ctx - .sql("select * from generate_series(1, 10000) as t1(v1) order by v1") + .sql("select * from generate_series(1, 10000) as t1(v1) order by v1 desc") .await .unwrap(); let plan = df.create_physical_plan().await.unwrap(); @@ -663,7 +663,7 @@ async fn test_spill_file_compressed_with_zstd() -> Result<()> { let ctx = setup_context(disk_spill_limit, 128 * 1024, spill_compression).await?; // 1MB disk limit, 128KB memory limit, zstd let df = ctx - .sql("select * from generate_series(1, 100000) as t1(v1) order by v1") + .sql("select * from generate_series(1, 100000) as t1(v1) order by v1 desc") .await .unwrap(); let plan = df.create_physical_plan().await.unwrap(); @@ -699,7 +699,7 @@ async fn test_spill_file_compressed_with_lz4_frame() -> Result<()> { let ctx = setup_context(disk_spill_limit, 128 * 1024, spill_compression).await?; // 1MB disk limit, 128KB memory limit, lz4_frame let df = ctx - .sql("select * from generate_series(1, 100000) as t1(v1) order by v1") + .sql("select * from generate_series(1, 100000) as t1(v1) order by v1 desc") .await .unwrap(); let plan = df.create_physical_plan().await.unwrap(); diff --git a/datafusion/core/tests/sql/runtime_config.rs b/datafusion/core/tests/sql/runtime_config.rs index cf5237d725805..ccc11afbff347 100644 --- a/datafusion/core/tests/sql/runtime_config.rs +++ b/datafusion/core/tests/sql/runtime_config.rs @@ -46,7 +46,7 @@ async fn test_memory_limit_with_spill() { .await .unwrap(); - let query = "select * from generate_series(1,10000000) as t1(v1) order by v1;"; + let query = "select * from generate_series(1,10000000) as t1(v1) order by v1 desc;"; let df = ctx.sql(query).await.unwrap(); let plan = df.create_physical_plan().await.unwrap(); @@ -76,7 +76,7 @@ async fn test_no_spill_with_adequate_memory() { .await .unwrap(); - let query = "select * from generate_series(1,100000) as t1(v1) order by v1;"; + let query = "select * from generate_series(1,100000) as t1(v1) order by v1 desc;"; let df = ctx.sql(query).await.unwrap(); let plan = df.create_physical_plan().await.unwrap(); @@ -127,7 +127,7 @@ async fn test_memory_limit_enforcement() { .await .unwrap(); - let query = "select * from generate_series(1,100000) as t1(v1) order by v1;"; + let query = "select * from generate_series(1,100000) as t1(v1) order by v1 desc;"; let result = ctx.sql(query).await.unwrap().collect().await; assert!(result.is_err(), "Should fail due to memory limit"); @@ -201,7 +201,7 @@ async fn test_max_temp_directory_size_enforcement() { .await .unwrap(); - let query = "select * from generate_series(1,100000) as t1(v1) order by v1;"; + let query = "select * from generate_series(1,100000) as t1(v1) order by v1 desc;"; let result = ctx.sql(query).await.unwrap().collect().await; assert!( diff --git a/datafusion/functions-table/Cargo.toml b/datafusion/functions-table/Cargo.toml index 4edb640cb2cf2..fb02c2c5e2cb3 100644 --- a/datafusion/functions-table/Cargo.toml +++ b/datafusion/functions-table/Cargo.toml @@ -46,6 +46,7 @@ async-trait = { workspace = true } datafusion-catalog = { workspace = true } datafusion-common = { workspace = true } datafusion-expr = { workspace = true } +datafusion-physical-expr = { workspace = true } datafusion-physical-plan = { workspace = true } parking_lot = { workspace = true } diff --git a/datafusion/functions-table/src/generate_series.rs b/datafusion/functions-table/src/generate_series.rs index 175a6b3bff06c..fa4c32a24ac16 100644 --- a/datafusion/functions-table/src/generate_series.rs +++ b/datafusion/functions-table/src/generate_series.rs @@ -18,6 +18,7 @@ use arrow::array::timezone::Tz; use arrow::array::types::TimestampNanosecondType; use arrow::array::{ArrayRef, Int64Array, TimestampNanosecondArray}; +use arrow::compute::SortOptions; use arrow::datatypes::{ DataType, Field, IntervalMonthDayNano, Schema, SchemaRef, TimeUnit, }; @@ -28,6 +29,8 @@ use datafusion_catalog::TableProvider; use datafusion_catalog::{Session, TableFunctionArgs}; use datafusion_common::{Result, ScalarValue, plan_err}; use datafusion_expr::{Expr, TableType}; +use datafusion_physical_expr::PhysicalSortExpr; +use datafusion_physical_expr::expressions::Column; use datafusion_physical_plan::ExecutionPlan; use datafusion_physical_plan::memory::{LazyBatchGenerator, LazyMemoryExec}; use parking_lot::RwLock; @@ -333,6 +336,29 @@ impl GenerateSeriesTable { Ok(generator) } + + /// Detects output sort order to potentially remove `SortExec` + /// Now only `Int64` argument type is supported + fn output_ordering(&self, schema: &Schema) -> Option { + let step = match &self.args { + GenSeriesArgs::Int64Args { step, .. } => *step, + _ => return None, + }; + + if schema.fields().is_empty() { + return None; + } + + let descending = step < 0; + Some(PhysicalSortExpr::new( + Arc::new(Column::new(schema.field(0).name(), 0)), + SortOptions { + descending, + // this table function won't output nulls, so either is fine + nulls_first: false, + }, + )) + } } #[derive(Debug, Clone)] @@ -460,11 +486,14 @@ impl TableProvider for GenerateSeriesTable { ) -> Result> { let batch_size = state.config_options().execution.batch_size; let generator = self.as_generator(batch_size)?; + let mut exec = LazyMemoryExec::try_new(self.schema(), vec![generator])? + .with_projection(projection.cloned()); - Ok(Arc::new( - LazyMemoryExec::try_new(self.schema(), vec![generator])? - .with_projection(projection.cloned()), - )) + if let Some(ordering) = self.output_ordering(exec.schema().as_ref()) { + exec.add_ordering([ordering]); + } + + Ok(Arc::new(exec)) } } diff --git a/datafusion/sqllogictest/test_files/set_variable.slt b/datafusion/sqllogictest/test_files/set_variable.slt index 378a9c83dbc70..508b301fe3c09 100644 --- a/datafusion/sqllogictest/test_files/set_variable.slt +++ b/datafusion/sqllogictest/test_files/set_variable.slt @@ -342,14 +342,14 @@ SET datafusion.runtime.memory_limit = '1K' # This query should fail with low memory statement error Not enough memory to continue external sort -EXPLAIN ANALYZE SELECT * FROM generate_series(1, 1000) AS t1(v1) ORDER BY v1 +EXPLAIN ANALYZE SELECT * FROM generate_series(1, 1000) AS t1(v1) ORDER BY v1 DESC statement ok RESET datafusion.runtime.memory_limit # This query should succeed after resetting memory limit statement ok -EXPLAIN ANALYZE SELECT * FROM generate_series(1, 1000) AS t1(v1) ORDER BY v1 +EXPLAIN ANALYZE SELECT * FROM generate_series(1, 1000) AS t1(v1) ORDER BY v1 DESC statement ok SET datafusion.runtime.list_files_cache_limit = '1K' diff --git a/datafusion/sqllogictest/test_files/table_functions.slt b/datafusion/sqllogictest/test_files/table_functions.slt index 3d654c4195feb..b562a81eb1e72 100644 --- a/datafusion/sqllogictest/test_files/table_functions.slt +++ b/datafusion/sqllogictest/test_files/table_functions.slt @@ -100,6 +100,20 @@ SELECT * FROM generate_series(1, 5) t1(v1) ORDER BY v1 DESC 2 1 +# Ensures the physical plan doesn't include `SortExec` +# Tests that generate_series() on Int64 auto-detects output sort order +query TT +EXPLAIN FORMAT INDENT SELECT * FROM generate_series(100000) AS t1(v1) ORDER BY v1 +---- +logical_plan +01)Sort: t1.v1 ASC NULLS LAST +02)--SubqueryAlias: t1 +03)----Projection: generate_series().value AS v1 +04)------TableScan: generate_series() projection=[value] +physical_plan +01)ProjectionExec: expr=[value@0 as v1] +02)--LazyMemoryExec: partitions=1, batch_generators=[generate_series: start=0, end=100000, batch_size=8192] + # Test generate_series with LIMIT query I rowsort SELECT * FROM generate_series(1, 100) t1(v1) LIMIT 5 From 59c1c271f6452116ec0107926ba21ff95b2313b0 Mon Sep 17 00:00:00 2001 From: Yongting You <2010youy01@gmail.com> Date: Sat, 16 May 2026 12:24:57 +0800 Subject: [PATCH 2/3] typo fix --- datafusion/functions-table/src/generate_series.rs | 4 ++-- datafusion/sqllogictest/test_files/table_functions.slt | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/datafusion/functions-table/src/generate_series.rs b/datafusion/functions-table/src/generate_series.rs index fa4c32a24ac16..52baa7e6cf8ef 100644 --- a/datafusion/functions-table/src/generate_series.rs +++ b/datafusion/functions-table/src/generate_series.rs @@ -337,8 +337,8 @@ impl GenerateSeriesTable { Ok(generator) } - /// Detects output sort order to potentially remove `SortExec` - /// Now only `Int64` argument type is supported + /// Detects output sort order to potentially remove `SortExec`. + /// Only the `Int64` argument type is currently supported. fn output_ordering(&self, schema: &Schema) -> Option { let step = match &self.args { GenSeriesArgs::Int64Args { step, .. } => *step, diff --git a/datafusion/sqllogictest/test_files/table_functions.slt b/datafusion/sqllogictest/test_files/table_functions.slt index b562a81eb1e72..e1ab444d81044 100644 --- a/datafusion/sqllogictest/test_files/table_functions.slt +++ b/datafusion/sqllogictest/test_files/table_functions.slt @@ -100,8 +100,8 @@ SELECT * FROM generate_series(1, 5) t1(v1) ORDER BY v1 DESC 2 1 -# Ensures the physical plan doesn't include `SortExec` -# Tests that generate_series() on Int64 auto-detects output sort order +# Ensures the physical plan doesn't include `SortExec`. +# Tests that generate_series() on Int64 auto-detects output sort order. query TT EXPLAIN FORMAT INDENT SELECT * FROM generate_series(100000) AS t1(v1) ORDER BY v1 ---- From 8641016f963ec96cbd816d7009382836459fbc3e Mon Sep 17 00:00:00 2001 From: Yongting You <2010youy01@gmail.com> Date: Sat, 16 May 2026 12:29:44 +0800 Subject: [PATCH 3/3] lint fix --- .../sqllogictest/test_files/explain_tree.slt | 21 +++++++------------ .../test_files/push_down_filter_parquet.slt | 6 +++--- .../test_files/statistics_registry.slt | 20 +++++++++--------- .../sqllogictest/test_files/subquery.slt | 21 +++++++++---------- 4 files changed, 31 insertions(+), 37 deletions(-) diff --git a/datafusion/sqllogictest/test_files/explain_tree.slt b/datafusion/sqllogictest/test_files/explain_tree.slt index a7d3bead0e8e5..5bb4817be9644 100644 --- a/datafusion/sqllogictest/test_files/explain_tree.slt +++ b/datafusion/sqllogictest/test_files/explain_tree.slt @@ -691,23 +691,18 @@ physical_plan 21)│ AND CURRENT ROW │ 22)└─────────────┬─────────────┘ 23)┌─────────────┴─────────────┐ -24)│ SortExec │ +24)│ ProjectionExec │ 25)│ -------------------- │ -26)│ v1@0 ASC NULLS LAST │ +26)│ v1: value │ 27)└─────────────┬─────────────┘ 28)┌─────────────┴─────────────┐ -29)│ ProjectionExec │ +29)│ LazyMemoryExec │ 30)│ -------------------- │ -31)│ v1: value │ -32)└─────────────┬─────────────┘ -33)┌─────────────┴─────────────┐ -34)│ LazyMemoryExec │ -35)│ -------------------- │ -36)│ batch_generators: │ -37)│ generate_series: start=1, │ -38)│ end=1000, batch_size │ -39)│ =8192 │ -40)└───────────────────────────┘ +31)│ batch_generators: │ +32)│ generate_series: start=1, │ +33)│ end=1000, batch_size │ +34)│ =8192 │ +35)└───────────────────────────┘ query TT explain select diff --git a/datafusion/sqllogictest/test_files/push_down_filter_parquet.slt b/datafusion/sqllogictest/test_files/push_down_filter_parquet.slt index 8469c32a17033..06f7e4b48b36f 100644 --- a/datafusion/sqllogictest/test_files/push_down_filter_parquet.slt +++ b/datafusion/sqllogictest/test_files/push_down_filter_parquet.slt @@ -156,9 +156,9 @@ explain select * from small_table join large_table on small_table.k = large_tabl ---- physical_plan 01)HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(k@0, k@0)] -02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/small_table.parquet]]}, projection=[k], file_type=parquet -03)--RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 -04)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/large_table.parquet]]}, projection=[k, v], file_type=parquet, predicate=v@1 >= 50 AND DynamicFilter [ empty ], pruning_predicate=v_null_count@1 != row_count@2 AND v_max@0 >= 50, required_guarantees=[] +02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/small_table.parquet]]}, projection=[k], output_ordering=[k@0 ASC NULLS LAST], file_type=parquet +03)--RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1, maintains_sort_order=true +04)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/large_table.parquet]]}, projection=[k, v], output_ordering=[k@0 ASC NULLS LAST], file_type=parquet, predicate=v@1 >= 50 AND DynamicFilter [ empty ], pruning_predicate=v_null_count@1 != row_count@2 AND v_max@0 >= 50, required_guarantees=[] statement ok drop table small_table; diff --git a/datafusion/sqllogictest/test_files/statistics_registry.slt b/datafusion/sqllogictest/test_files/statistics_registry.slt index 6baa4e218ed20..c856e779a0877 100644 --- a/datafusion/sqllogictest/test_files/statistics_registry.slt +++ b/datafusion/sqllogictest/test_files/statistics_registry.slt @@ -101,12 +101,12 @@ JOIN dim_small d ON o.small_id = d.small_id; ---- physical_plan 01)HashJoinExec: mode=Partitioned, join_type=Inner, on=[(small_id@2, small_id@0)], projection=[order_id@1, region_id@0, label@4] -02)--RepartitionExec: partitioning=Hash([small_id@2], 4), input_partitions=1 +02)--RepartitionExec: partitioning=Hash([small_id@2], 4), input_partitions=1, maintains_sort_order=true 03)----HashJoinExec: mode=Partitioned, join_type=Inner, on=[(customer_id@0, customer_id@1)], projection=[region_id@1, order_id@2, small_id@4] -04)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/statistics_registry/customers.parquet]]}, projection=[customer_id, region_id], file_type=parquet -05)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/statistics_registry/orders.parquet]]}, projection=[order_id, customer_id, small_id], file_type=parquet, predicate=DynamicFilter [ empty ] -06)--RepartitionExec: partitioning=Hash([small_id@0], 4), input_partitions=1 -07)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/statistics_registry/dim_small.parquet]]}, projection=[small_id, label], file_type=parquet, predicate=DynamicFilter [ empty ] +04)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/statistics_registry/customers.parquet]]}, projection=[customer_id, region_id], output_ordering=[region_id@1 ASC NULLS LAST], file_type=parquet +05)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/statistics_registry/orders.parquet]]}, projection=[order_id, customer_id, small_id], output_ordering=[order_id@0 ASC NULLS LAST], file_type=parquet, predicate=DynamicFilter [ empty ] +06)--RepartitionExec: partitioning=Hash([small_id@0], 4), input_partitions=1, maintains_sort_order=true +07)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/statistics_registry/dim_small.parquet]]}, projection=[small_id, label], output_ordering=[small_id@0 ASC NULLS LAST], file_type=parquet, predicate=DynamicFilter [ empty ] # -- With registry ----------------------------------------------------------- # Conservative estimate 100 > 50: dim_small correctly swapped to build side @@ -122,12 +122,12 @@ JOIN dim_small d ON o.small_id = d.small_id; ---- physical_plan 01)HashJoinExec: mode=Partitioned, join_type=Inner, on=[(small_id@0, small_id@2)], projection=[order_id@3, region_id@2, label@1] -02)--RepartitionExec: partitioning=Hash([small_id@0], 4), input_partitions=1 -03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/statistics_registry/dim_small.parquet]]}, projection=[small_id, label], file_type=parquet -04)--RepartitionExec: partitioning=Hash([small_id@2], 4), input_partitions=1 +02)--RepartitionExec: partitioning=Hash([small_id@0], 4), input_partitions=1, maintains_sort_order=true +03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/statistics_registry/dim_small.parquet]]}, projection=[small_id, label], output_ordering=[small_id@0 ASC NULLS LAST], file_type=parquet +04)--RepartitionExec: partitioning=Hash([small_id@2], 4), input_partitions=1, maintains_sort_order=true 05)----HashJoinExec: mode=Partitioned, join_type=Inner, on=[(customer_id@0, customer_id@1)], projection=[region_id@1, order_id@2, small_id@4] -06)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/statistics_registry/customers.parquet]]}, projection=[customer_id, region_id], file_type=parquet -07)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/statistics_registry/orders.parquet]]}, projection=[order_id, customer_id, small_id], file_type=parquet, predicate=DynamicFilter [ empty ] AND DynamicFilter [ empty ] +06)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/statistics_registry/customers.parquet]]}, projection=[customer_id, region_id], output_ordering=[region_id@1 ASC NULLS LAST], file_type=parquet +07)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/statistics_registry/orders.parquet]]}, projection=[order_id, customer_id, small_id], output_ordering=[order_id@0 ASC NULLS LAST], file_type=parquet, predicate=DynamicFilter [ empty ] AND DynamicFilter [ empty ] # -- Verify results are identical regardless of join order -------------------- diff --git a/datafusion/sqllogictest/test_files/subquery.slt b/datafusion/sqllogictest/test_files/subquery.slt index 1ee92fc75a365..35c80744bebbb 100644 --- a/datafusion/sqllogictest/test_files/subquery.slt +++ b/datafusion/sqllogictest/test_files/subquery.slt @@ -1848,17 +1848,16 @@ logical_plan physical_plan 01)ScalarSubqueryExec: subqueries=1 02)--SortPreservingMergeExec: [i@0 ASC NULLS LAST] -03)----SortExec: expr=[i@0 ASC NULLS LAST], preserve_partitioning=[true] -04)------ProjectionExec: expr=[value@0 as i] -05)--------FilterExec: CAST(value@0 AS Float64) > scalar_subquery() -06)----------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 -07)------------LazyMemoryExec: partitions=1, batch_generators=[generate_series: start=1, end=10, batch_size=8192] -08)--AggregateExec: mode=Final, gby=[], aggr=[avg(u.j)] -09)----CoalescePartitionsExec -10)------AggregateExec: mode=Partial, gby=[], aggr=[avg(u.j)] -11)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 -12)----------ProjectionExec: expr=[value@0 as j] -13)------------LazyMemoryExec: partitions=1, batch_generators=[generate_series: start=1, end=10, batch_size=8192] +03)----ProjectionExec: expr=[value@0 as i] +04)------FilterExec: CAST(value@0 AS Float64) > scalar_subquery() +05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +06)----------LazyMemoryExec: partitions=1, batch_generators=[generate_series: start=1, end=10, batch_size=8192] +07)--AggregateExec: mode=Final, gby=[], aggr=[avg(u.j)] +08)----CoalescePartitionsExec +09)------AggregateExec: mode=Partial, gby=[], aggr=[avg(u.j)] +10)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1, maintains_sort_order=true +11)----------ProjectionExec: expr=[value@0 as j] +12)------------LazyMemoryExec: partitions=1, batch_generators=[generate_series: start=1, end=10, batch_size=8192] query I SELECT i