Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions datafusion-cli/tests/cli_integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ fn test_cli_top_memory_consumers<'a>(
let _bound = bind_to_settings(snapshot_name);

let mut cmd = cli();
let sql = "select * from generate_series(1,500000) as t1(v1) order by v1;";
let sql = "select * from generate_series(1,500000) as t1(v1) order by v1 desc;";
cmd.args(["--memory-limit", "10M", "--command", sql]);
cmd.args(top_memory_consumers);

Expand All @@ -237,7 +237,7 @@ fn test_cli_top_memory_consumers_with_mem_pool_type<'a>(
let _bound = bind_to_settings(snapshot_name);

let mut cmd = cli();
let sql = "select * from generate_series(1,500000) as t1(v1) order by v1;";
let sql = "select * from generate_series(1,500000) as t1(v1) order by v1 desc;";
cmd.args([
"--memory-limit",
"10M",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ info:
- "--memory-limit"
- 10M
- "--command"
- "select * from generate_series(1,500000) as t1(v1) order by v1;"
- "select * from generate_series(1,500000) as t1(v1) order by v1 desc;"
- "--top-memory-consumers"
- "0"
---
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ info:
- "--memory-limit"
- 10M
- "--command"
- "select * from generate_series(1,500000) as t1(v1) order by v1;"
- "select * from generate_series(1,500000) as t1(v1) order by v1 desc;"
- "--top-memory-consumers"
- "2"
---
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ info:
- "--memory-limit"
- 10M
- "--command"
- "select * from generate_series(1,500000) as t1(v1) order by v1;"
- "select * from generate_series(1,500000) as t1(v1) order by v1 desc;"
---
success: false
exit_code: 1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ info:
- "--mem-pool-type"
- fair
- "--command"
- "select * from generate_series(1,500000) as t1(v1) order by v1;"
- "select * from generate_series(1,500000) as t1(v1) order by v1 desc;"
- "--top-memory-consumers"
- "0"
---
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ info:
- "--mem-pool-type"
- fair
- "--command"
- "select * from generate_series(1,500000) as t1(v1) order by v1;"
- "select * from generate_series(1,500000) as t1(v1) order by v1 desc;"
- "--top-memory-consumers"
- "2"
---
Expand Down
12 changes: 6 additions & 6 deletions datafusion/core/tests/memory_limit/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -514,7 +514,7 @@ async fn test_in_mem_buffer_almost_full() {

let ctx = SessionContext::new_with_config_rt(config, runtime);

let query = "select * from generate_series(1,9000000) as t1(v1) order by v1;";
let query = "select * from generate_series(1,9000000) as t1(v1) order by v1 desc;";
let df = ctx.sql(query).await.unwrap();

// Check not fail
Expand All @@ -535,7 +535,7 @@ async fn test_external_sort_zero_merge_reservation() {

let ctx = SessionContext::new_with_config_rt(config, runtime);

let query = "select * from generate_series(1,10000000) as t1(v1) order by v1;";
let query = "select * from generate_series(1,10000000) as t1(v1) order by v1 desc;";
let df = ctx.sql(query).await.unwrap();

let physical_plan = df.create_physical_plan().await.unwrap();
Expand Down Expand Up @@ -599,7 +599,7 @@ async fn test_disk_spill_limit_reached() -> Result<()> {
let ctx = setup_context(1024 * 1024, 1024 * 1024, spill_compression).await?; // 1MB disk limit, 1MB memory limit

let df = ctx
.sql("select * from generate_series(1, 1000000000000) as t1(v1) order by v1")
.sql("select * from generate_series(1, 1000000000000) as t1(v1) order by v1 desc")
.await
.unwrap();

Expand Down Expand Up @@ -627,7 +627,7 @@ async fn test_disk_spill_limit_not_reached() -> Result<()> {
let ctx = setup_context(disk_spill_limit, 128 * 1024, spill_compression).await?; // 1MB disk limit, 128KB memory limit

let df = ctx
.sql("select * from generate_series(1, 10000) as t1(v1) order by v1")
.sql("select * from generate_series(1, 10000) as t1(v1) order by v1 desc")
.await
.unwrap();
let plan = df.create_physical_plan().await.unwrap();
Expand Down Expand Up @@ -663,7 +663,7 @@ async fn test_spill_file_compressed_with_zstd() -> Result<()> {
let ctx = setup_context(disk_spill_limit, 128 * 1024, spill_compression).await?; // 1MB disk limit, 128KB memory limit, zstd

let df = ctx
.sql("select * from generate_series(1, 100000) as t1(v1) order by v1")
.sql("select * from generate_series(1, 100000) as t1(v1) order by v1 desc")
.await
.unwrap();
let plan = df.create_physical_plan().await.unwrap();
Expand Down Expand Up @@ -699,7 +699,7 @@ async fn test_spill_file_compressed_with_lz4_frame() -> Result<()> {
let ctx = setup_context(disk_spill_limit, 128 * 1024, spill_compression).await?; // 1MB disk limit, 128KB memory limit, lz4_frame

let df = ctx
.sql("select * from generate_series(1, 100000) as t1(v1) order by v1")
.sql("select * from generate_series(1, 100000) as t1(v1) order by v1 desc")
.await
.unwrap();
let plan = df.create_physical_plan().await.unwrap();
Expand Down
8 changes: 4 additions & 4 deletions datafusion/core/tests/sql/runtime_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ async fn test_memory_limit_with_spill() {
.await
.unwrap();

let query = "select * from generate_series(1,10000000) as t1(v1) order by v1;";
let query = "select * from generate_series(1,10000000) as t1(v1) order by v1 desc;";
let df = ctx.sql(query).await.unwrap();

let plan = df.create_physical_plan().await.unwrap();
Expand Down Expand Up @@ -76,7 +76,7 @@ async fn test_no_spill_with_adequate_memory() {
.await
.unwrap();

let query = "select * from generate_series(1,100000) as t1(v1) order by v1;";
let query = "select * from generate_series(1,100000) as t1(v1) order by v1 desc;";
let df = ctx.sql(query).await.unwrap();

let plan = df.create_physical_plan().await.unwrap();
Expand Down Expand Up @@ -127,7 +127,7 @@ async fn test_memory_limit_enforcement() {
.await
.unwrap();

let query = "select * from generate_series(1,100000) as t1(v1) order by v1;";
let query = "select * from generate_series(1,100000) as t1(v1) order by v1 desc;";
let result = ctx.sql(query).await.unwrap().collect().await;

assert!(result.is_err(), "Should fail due to memory limit");
Expand Down Expand Up @@ -201,7 +201,7 @@ async fn test_max_temp_directory_size_enforcement() {
.await
.unwrap();

let query = "select * from generate_series(1,100000) as t1(v1) order by v1;";
let query = "select * from generate_series(1,100000) as t1(v1) order by v1 desc;";
let result = ctx.sql(query).await.unwrap().collect().await;

assert!(
Expand Down
1 change: 1 addition & 0 deletions datafusion/functions-table/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ async-trait = { workspace = true }
datafusion-catalog = { workspace = true }
datafusion-common = { workspace = true }
datafusion-expr = { workspace = true }
datafusion-physical-expr = { workspace = true }
datafusion-physical-plan = { workspace = true }
parking_lot = { workspace = true }

Expand Down
37 changes: 33 additions & 4 deletions datafusion/functions-table/src/generate_series.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
use arrow::array::timezone::Tz;
use arrow::array::types::TimestampNanosecondType;
use arrow::array::{ArrayRef, Int64Array, TimestampNanosecondArray};
use arrow::compute::SortOptions;
use arrow::datatypes::{
DataType, Field, IntervalMonthDayNano, Schema, SchemaRef, TimeUnit,
};
Expand All @@ -28,6 +29,8 @@ use datafusion_catalog::TableProvider;
use datafusion_catalog::{Session, TableFunctionArgs};
use datafusion_common::{Result, ScalarValue, plan_err};
use datafusion_expr::{Expr, TableType};
use datafusion_physical_expr::PhysicalSortExpr;
use datafusion_physical_expr::expressions::Column;
use datafusion_physical_plan::ExecutionPlan;
use datafusion_physical_plan::memory::{LazyBatchGenerator, LazyMemoryExec};
use parking_lot::RwLock;
Expand Down Expand Up @@ -333,6 +336,29 @@ impl GenerateSeriesTable {

Ok(generator)
}

/// Detects output sort order to potentially remove `SortExec`.
/// Only the `Int64` argument type is currently supported.
///
/// Returns `Some(sort_expr)` describing the ordering of the first output
/// column when the series is generated with an `Int64` step, or `None`
/// when the argument type is unsupported or the schema has no fields.
fn output_ordering(&self, schema: &Schema) -> Option<PhysicalSortExpr> {
    // NOTE(review): timestamp arguments are deliberately excluded — it is
    // unclear whether interval steps such as "1 year -13 months" preserve a
    // monotonic ordering; confirm before extending this to time types.
    let step = match &self.args {
        GenSeriesArgs::Int64Args { step, .. } => *step,
        _ => return None,
    };

    // Defensive: projection may have removed all columns; no column to order by.
    if schema.fields().is_empty() {
        return None;
    }

    // A negative step produces a strictly descending series.
    let descending = step < 0;
    Some(PhysicalSortExpr::new(
        Arc::new(Column::new(schema.field(0).name(), 0)),
        SortOptions {
            descending,
            // this table function won't output nulls, so either is fine
            nulls_first: false,
        },
    ))
}
}

#[derive(Debug, Clone)]
Expand Down Expand Up @@ -460,11 +486,14 @@ impl TableProvider for GenerateSeriesTable {
) -> Result<Arc<dyn ExecutionPlan>> {
let batch_size = state.config_options().execution.batch_size;
let generator = self.as_generator(batch_size)?;
let mut exec = LazyMemoryExec::try_new(self.schema(), vec![generator])?
.with_projection(projection.cloned());

Ok(Arc::new(
LazyMemoryExec::try_new(self.schema(), vec![generator])?
.with_projection(projection.cloned()),
))
if let Some(ordering) = self.output_ordering(exec.schema().as_ref()) {
exec.add_ordering([ordering]);
}

Ok(Arc::new(exec))
}
}

Expand Down
21 changes: 8 additions & 13 deletions datafusion/sqllogictest/test_files/explain_tree.slt
Original file line number Diff line number Diff line change
Expand Up @@ -691,23 +691,18 @@ physical_plan
21)│ AND CURRENT ROW │
22)└─────────────┬─────────────┘
23)┌─────────────┴─────────────┐
24)│ SortExec
24)│ ProjectionExec
25)│ -------------------- │
26)│ v1@0 ASC NULLS LAST
26)│ v1: value
27)└─────────────┬─────────────┘
28)┌─────────────┴─────────────┐
29)│ ProjectionExec
29)│ LazyMemoryExec
30)│ -------------------- │
31)│ v1: value │
32)└─────────────┬─────────────┘
33)┌─────────────┴─────────────┐
34)│ LazyMemoryExec │
35)│ -------------------- │
36)│ batch_generators: │
37)│ generate_series: start=1, │
38)│ end=1000, batch_size │
39)│ =8192 │
40)└───────────────────────────┘
31)│ batch_generators: │
32)│ generate_series: start=1, │
33)│ end=1000, batch_size │
34)│ =8192 │
35)└───────────────────────────┘

query TT
explain select
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,9 +156,9 @@ explain select * from small_table join large_table on small_table.k = large_tabl
----
physical_plan
01)HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(k@0, k@0)]
02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/small_table.parquet]]}, projection=[k], file_type=parquet
03)--RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
04)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/large_table.parquet]]}, projection=[k, v], file_type=parquet, predicate=v@1 >= 50 AND DynamicFilter [ empty ], pruning_predicate=v_null_count@1 != row_count@2 AND v_max@0 >= 50, required_guarantees=[]
02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/small_table.parquet]]}, projection=[k], output_ordering=[k@0 ASC NULLS LAST], file_type=parquet
03)--RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1, maintains_sort_order=true
04)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/large_table.parquet]]}, projection=[k, v], output_ordering=[k@0 ASC NULLS LAST], file_type=parquet, predicate=v@1 >= 50 AND DynamicFilter [ empty ], pruning_predicate=v_null_count@1 != row_count@2 AND v_max@0 >= 50, required_guarantees=[]

statement ok
drop table small_table;
Expand Down
4 changes: 2 additions & 2 deletions datafusion/sqllogictest/test_files/set_variable.slt
Original file line number Diff line number Diff line change
Expand Up @@ -342,14 +342,14 @@ SET datafusion.runtime.memory_limit = '1K'

# This query should fail with low memory
statement error Not enough memory to continue external sort
EXPLAIN ANALYZE SELECT * FROM generate_series(1, 1000) AS t1(v1) ORDER BY v1
EXPLAIN ANALYZE SELECT * FROM generate_series(1, 1000) AS t1(v1) ORDER BY v1 DESC

statement ok
RESET datafusion.runtime.memory_limit

# This query should succeed after resetting memory limit
statement ok
EXPLAIN ANALYZE SELECT * FROM generate_series(1, 1000) AS t1(v1) ORDER BY v1
EXPLAIN ANALYZE SELECT * FROM generate_series(1, 1000) AS t1(v1) ORDER BY v1 DESC

statement ok
SET datafusion.runtime.list_files_cache_limit = '1K'
Expand Down
20 changes: 10 additions & 10 deletions datafusion/sqllogictest/test_files/statistics_registry.slt
Original file line number Diff line number Diff line change
Expand Up @@ -101,12 +101,12 @@ JOIN dim_small d ON o.small_id = d.small_id;
----
physical_plan
01)HashJoinExec: mode=Partitioned, join_type=Inner, on=[(small_id@2, small_id@0)], projection=[order_id@1, region_id@0, label@4]
02)--RepartitionExec: partitioning=Hash([small_id@2], 4), input_partitions=1
02)--RepartitionExec: partitioning=Hash([small_id@2], 4), input_partitions=1, maintains_sort_order=true
03)----HashJoinExec: mode=Partitioned, join_type=Inner, on=[(customer_id@0, customer_id@1)], projection=[region_id@1, order_id@2, small_id@4]
04)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/statistics_registry/customers.parquet]]}, projection=[customer_id, region_id], file_type=parquet
05)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/statistics_registry/orders.parquet]]}, projection=[order_id, customer_id, small_id], file_type=parquet, predicate=DynamicFilter [ empty ]
06)--RepartitionExec: partitioning=Hash([small_id@0], 4), input_partitions=1
07)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/statistics_registry/dim_small.parquet]]}, projection=[small_id, label], file_type=parquet, predicate=DynamicFilter [ empty ]
04)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/statistics_registry/customers.parquet]]}, projection=[customer_id, region_id], output_ordering=[region_id@1 ASC NULLS LAST], file_type=parquet
05)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/statistics_registry/orders.parquet]]}, projection=[order_id, customer_id, small_id], output_ordering=[order_id@0 ASC NULLS LAST], file_type=parquet, predicate=DynamicFilter [ empty ]
06)--RepartitionExec: partitioning=Hash([small_id@0], 4), input_partitions=1, maintains_sort_order=true
07)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/statistics_registry/dim_small.parquet]]}, projection=[small_id, label], output_ordering=[small_id@0 ASC NULLS LAST], file_type=parquet, predicate=DynamicFilter [ empty ]

# -- With registry -----------------------------------------------------------
# Conservative estimate 100 > 50: dim_small correctly swapped to build side
Expand All @@ -122,12 +122,12 @@ JOIN dim_small d ON o.small_id = d.small_id;
----
physical_plan
01)HashJoinExec: mode=Partitioned, join_type=Inner, on=[(small_id@0, small_id@2)], projection=[order_id@3, region_id@2, label@1]
02)--RepartitionExec: partitioning=Hash([small_id@0], 4), input_partitions=1
03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/statistics_registry/dim_small.parquet]]}, projection=[small_id, label], file_type=parquet
04)--RepartitionExec: partitioning=Hash([small_id@2], 4), input_partitions=1
02)--RepartitionExec: partitioning=Hash([small_id@0], 4), input_partitions=1, maintains_sort_order=true
03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/statistics_registry/dim_small.parquet]]}, projection=[small_id, label], output_ordering=[small_id@0 ASC NULLS LAST], file_type=parquet
04)--RepartitionExec: partitioning=Hash([small_id@2], 4), input_partitions=1, maintains_sort_order=true
05)----HashJoinExec: mode=Partitioned, join_type=Inner, on=[(customer_id@0, customer_id@1)], projection=[region_id@1, order_id@2, small_id@4]
06)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/statistics_registry/customers.parquet]]}, projection=[customer_id, region_id], file_type=parquet
07)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/statistics_registry/orders.parquet]]}, projection=[order_id, customer_id, small_id], file_type=parquet, predicate=DynamicFilter [ empty ] AND DynamicFilter [ empty ]
06)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/statistics_registry/customers.parquet]]}, projection=[customer_id, region_id], output_ordering=[region_id@1 ASC NULLS LAST], file_type=parquet
07)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/statistics_registry/orders.parquet]]}, projection=[order_id, customer_id, small_id], output_ordering=[order_id@0 ASC NULLS LAST], file_type=parquet, predicate=DynamicFilter [ empty ] AND DynamicFilter [ empty ]

# -- Verify results are identical regardless of join order --------------------

Expand Down
21 changes: 10 additions & 11 deletions datafusion/sqllogictest/test_files/subquery.slt
Original file line number Diff line number Diff line change
Expand Up @@ -1848,17 +1848,16 @@ logical_plan
physical_plan
01)ScalarSubqueryExec: subqueries=1
02)--SortPreservingMergeExec: [i@0 ASC NULLS LAST]
03)----SortExec: expr=[i@0 ASC NULLS LAST], preserve_partitioning=[true]
04)------ProjectionExec: expr=[value@0 as i]
05)--------FilterExec: CAST(value@0 AS Float64) > scalar_subquery(<pending>)
06)----------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
07)------------LazyMemoryExec: partitions=1, batch_generators=[generate_series: start=1, end=10, batch_size=8192]
08)--AggregateExec: mode=Final, gby=[], aggr=[avg(u.j)]
09)----CoalescePartitionsExec
10)------AggregateExec: mode=Partial, gby=[], aggr=[avg(u.j)]
11)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
12)----------ProjectionExec: expr=[value@0 as j]
13)------------LazyMemoryExec: partitions=1, batch_generators=[generate_series: start=1, end=10, batch_size=8192]
03)----ProjectionExec: expr=[value@0 as i]
04)------FilterExec: CAST(value@0 AS Float64) > scalar_subquery(<pending>)
05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
06)----------LazyMemoryExec: partitions=1, batch_generators=[generate_series: start=1, end=10, batch_size=8192]
07)--AggregateExec: mode=Final, gby=[], aggr=[avg(u.j)]
08)----CoalescePartitionsExec
09)------AggregateExec: mode=Partial, gby=[], aggr=[avg(u.j)]
10)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1, maintains_sort_order=true
11)----------ProjectionExec: expr=[value@0 as j]
12)------------LazyMemoryExec: partitions=1, batch_generators=[generate_series: start=1, end=10, batch_size=8192]

query I
SELECT i
Expand Down
14 changes: 14 additions & 0 deletions datafusion/sqllogictest/test_files/table_functions.slt
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,20 @@ SELECT * FROM generate_series(1, 5) t1(v1) ORDER BY v1 DESC
2
1

# Ensures the physical plan doesn't include `SortExec`.
# Tests that generate_series() on Int64 auto-detects output sort order.
query TT
EXPLAIN FORMAT INDENT SELECT * FROM generate_series(100000) AS t1(v1) ORDER BY v1
----
logical_plan
01)Sort: t1.v1 ASC NULLS LAST
02)--SubqueryAlias: t1
03)----Projection: generate_series().value AS v1
04)------TableScan: generate_series() projection=[value]
physical_plan
01)ProjectionExec: expr=[value@0 as v1]
02)--LazyMemoryExec: partitions=1, batch_generators=[generate_series: start=0, end=100000, batch_size=8192]

# Test generate_series with LIMIT
query I rowsort
SELECT * FROM generate_series(1, 100) t1(v1) LIMIT 5
Expand Down
Loading