Closed
Labels: enhancement (New feature or request)
Description
Is your feature request related to a problem or challenge?
For some aggregate functions, the order of the input rows changes the result. In these cases, it is helpful to be able to require a specific ordering of the input. Consider the query below:
SELECT (ARRAY_AGG(s.amount ORDER BY s.sn DESC)) AS amounts
FROM sales_global AS s
GROUP BY s.ts
This query runs successfully in PostgreSQL.
However, DataFusion returns the following error:
NotImplemented("ORDER BY not supported in ARRAY_AGG: s.sn DESC")
Describe the solution you'd like
I would like DataFusion to support ORDER BY inside aggregate functions such as ARRAY_AGG. With this feature in place, we could add new aggregate functions that only make sense with an ordering requirement, such as FIRST / FIRST_VALUE and LAST / LAST_VALUE.
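To make the requested semantics concrete, here is a minimal sketch in plain Rust using only the standard library. It is not DataFusion code and says nothing about how the feature would be implemented. The `Sale` struct, the `day` grouping key, the sample rows, and the function names are all assumptions made for illustration. It shows that the aggregated array depends on the per-group ordering, and that FIRST_VALUE / LAST_VALUE could be read off as the first and last elements of that ordered array.

```rust
use std::collections::BTreeMap;

/// Hypothetical row type, loosely mirroring `sales_global`.
/// `day` stands in for the grouping key so that one group has two rows.
#[derive(Debug, Clone)]
struct Sale {
    sn: i32,
    day: &'static str,
    amount: i32,
}

/// Illustrative sample data (not taken from a real run).
fn sample_rows() -> Vec<Sale> {
    vec![
        Sale { sn: 1, day: "2022-01-01", amount: 50 },
        Sale { sn: 2, day: "2022-01-01", amount: 75 },
        Sale { sn: 3, day: "2022-01-02", amount: 200 },
        Sale { sn: 4, day: "2022-01-03", amount: 100 },
    ]
}

/// Emulates `ARRAY_AGG(amount ORDER BY sn DESC) ... GROUP BY day`:
/// group the rows, sort each group by `sn` descending, then collect `amount`.
fn array_agg_ordered_desc(rows: &[Sale]) -> BTreeMap<&'static str, Vec<i32>> {
    let mut groups: BTreeMap<&'static str, Vec<&Sale>> = BTreeMap::new();
    for row in rows {
        groups.entry(row.day).or_default().push(row);
    }
    groups
        .into_iter()
        .map(|(day, mut members)| {
            // The ORDER BY requirement is applied per group, before aggregating.
            members.sort_by(|a, b| b.sn.cmp(&a.sn));
            let amounts: Vec<i32> = members.into_iter().map(|r| r.amount).collect();
            (day, amounts)
        })
        .collect()
}

fn main() {
    let result = array_agg_ordered_desc(&sample_rows());
    for (day, amounts) in &result {
        // With an ordering in place, FIRST_VALUE / LAST_VALUE are well defined.
        println!(
            "{day}: amounts={amounts:?} first={:?} last={:?}",
            amounts.first(),
            amounts.last()
        );
    }
}
```

Without the sort step, the contents of each array would depend on whatever order the rows happened to arrive in, which is why the ordering needs to be expressible in the query itself.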
Describe alternatives you've considered
No response
Additional context
To reproduce the problem, you can use the test below.
use datafusion::arrow::util::pretty::print_batches;
use datafusion::error::Result;
use datafusion::physical_plan::collect;
use datafusion::prelude::*;

#[tokio::test]
async fn test_ordered_aggregation() -> Result<()> {
    // Single target partition, as in the original reproduction.
    let config = SessionConfig::new().with_target_partitions(1);
    let ctx = SessionContext::with_config(config);

    // Create and populate the table used by the query.
    ctx.sql(
        "CREATE TABLE sales_global (
          sn INT PRIMARY KEY,
          ts TIMESTAMP,
          currency VARCHAR(3),
          amount INT
        ) as VALUES
          (1, '2022-01-01 08:00:00'::timestamp, 'EUR', 50.00),
          (2, '2022-01-01 11:30:00'::timestamp, 'EUR', 75.00),
          (3, '2022-01-02 12:00:00'::timestamp, 'EUR', 200.00),
          (4, '2022-01-03 10:00:00'::timestamp, 'EUR', 100.00)",
    )
    .await?;

    // Aggregate with an ORDER BY inside ARRAY_AGG; this is the query that
    // produces the NotImplemented error quoted above.
    let sql = "SELECT (ARRAY_AGG(s.amount ORDER BY s.sn DESC)) AS amounts
        FROM sales_global AS s
        GROUP BY s.sn";
    let msg = format!("Creating logical plan for '{sql}'");
    let dataframe: DataFrame = ctx.sql(sql).await.expect(&msg);
    let physical_plan = dataframe.create_physical_plan().await?;
    let batches = collect(physical_plan, ctx.task_ctx()).await?;
    print_batches(&batches)?;
    Ok(())
}