Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 49 additions & 14 deletions datafusion/sql/src/select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use crate::utils::{

use datafusion_common::error::DataFusionErrorBuilder;
use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion};
use datafusion_common::{not_impl_err, plan_err, Result};
use datafusion_common::{not_impl_err, plan_err, Column, Result};
use datafusion_common::{RecursionUnnestOption, UnnestOptions};
use datafusion_expr::expr::{Alias, PlannedReplaceSelectItem, WildcardOptions};
use datafusion_expr::expr_rewriter::{
Expand Down Expand Up @@ -329,20 +329,55 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
|| !group_by_exprs.is_empty()
|| !window_func_exprs.is_empty()
{
return not_impl_err!("DISTINCT ON expressions with GROUP BY, aggregation or window functions are not supported ");
// With aggregation/window functions, the DISTINCT ON expressions and ORDER BY
// should reference columns from the aggregated/windowed result
let on_expr = on_expr
.into_iter()
.map(|e| {
self.sql_expr_to_logical_expr(
e,
plan.schema(),
planner_context,
)
})
.collect::<Result<Vec<_>>>()?;

let on_expr = on_expr
.into_iter()
.map(|expr| normalize_col(expr, &plan))
.collect::<Result<Vec<_>>>()?;

// Get select expressions from the current plan's output
let select_exprs_from_plan = plan
.schema()
.iter()
.map(|(qualifier, field)| {
Expr::Column(Column::from((qualifier, field.as_ref())))
})
.collect::<Vec<_>>();

// Build the final plan with DISTINCT ON
LogicalPlanBuilder::from(plan)
.distinct_on(on_expr, select_exprs_from_plan, None)?
.build()
} else {
// No GROUP BY, aggregation or window functions: apply DISTINCT ON directly to base_plan
let on_expr = on_expr
.into_iter()
.map(|e| {
self.sql_expr_to_logical_expr(
e,
plan.schema(),
planner_context,
)
})
.collect::<Result<Vec<_>>>()?;

// Build the final plan
LogicalPlanBuilder::from(base_plan)
.distinct_on(on_expr, select_exprs, None)?
.build()
}

let on_expr = on_expr
.into_iter()
.map(|e| {
self.sql_expr_to_logical_expr(e, plan.schema(), planner_context)
})
.collect::<Result<Vec<_>>>()?;

// Build the final plan
LogicalPlanBuilder::from(base_plan)
.distinct_on(on_expr, select_exprs, None)?
.build()
}
}?;

Expand Down
20 changes: 20 additions & 0 deletions datafusion/sqllogictest/test_files/distinct_on.slt
Original file line number Diff line number Diff line change
Expand Up @@ -192,3 +192,23 @@ logical_plan

statement ok
drop table t;

# Test DISTINCT ON combined with GROUP BY and an aggregate (max).
# The query groups by (c1, c3) and then keeps one row per distinct c1;
# the expected output below has exactly one row for each of a..e.
query TII
SELECT DISTINCT ON (c1) c1, c3, max(c4) as agg2 FROM aggregate_test_100 GROUP BY c1, c3 ORDER BY c1, agg2;
----
a 65 -28462
b -60 -21739
c 3 -30508
d 102 -24558
e -56 -31500

# Same query, but DISTINCT ON and ORDER BY refer to the SELECT alias c0
# (an alias for c1) instead of the underlying column; the expected rows
# are identical to the query above.
query TII
SELECT DISTINCT ON (c0) c1 as c0, c3, max(c4) as agg2 FROM aggregate_test_100 GROUP BY c1, c3 ORDER BY c0, agg2;
----
a 65 -28462
b -60 -21739
c 3 -30508
d 102 -24558
e -56 -31500