Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move min and max to user defined aggregate function #11013

Draft
wants to merge 5 commits into
base: main
Choose a base branch
from

Conversation

edmondop
Copy link
Contributor

@edmondop edmondop commented Jun 19, 2024

Which issue does this PR close?

Closes #10943 .

Rationale for this change

What changes are included in this PR?

Are these changes tested?

Are there any user-facing changes?

@github-actions github-actions bot added sql SQL Planner logical-expr Logical plan and expressions physical-expr Physical Expressions optimizer Optimizer rules core Core datafusion crate labels Jun 19, 2024
@github-actions github-actions bot removed the sql SQL Planner label Jun 20, 2024
@edmondop
Copy link
Contributor Author

I do have something that's starting to look reasonable, but some tests on the optimizer now are failing for some reasons I can't understand

running 1 test
test custom_sources_cases::optimizers_catch_all_statistics ... FAILED

successes:

successes:

failures:

---- custom_sources_cases::optimizers_catch_all_statistics stdout ----
thread 'custom_sources_cases::optimizers_catch_all_statistics' panicked at datafusion/core/tests/custom_sources_cases/mod.rs:274:5:
Expected aggregate_statistics optimizations missing: AggregateExec { mode: Single, group_by: PhysicalGroupBy { expr: [], null_expr: [], groups: [[]] }, aggr_expr: [AggregateFunctionExpr { fun: AggregateUDF { inner: Count { name: "COUNT", signature: Signature { type_signature: VariadicAny, volatility: Immutable } } }, args: [Literal { value: Int64(1) }], logical_args: [Literal(Int64(1))], data_type: Int64, name: "COUNT(*)", schema: Schema { fields: [Field { name: "c1", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {} }, sort_exprs: [], ordering_req: [], ignore_nulls: false, ordering_fields: [], is_distinct: false, input_type: Int64 }, AggregateFunctionExpr { fun: AggregateUDF { inner: Min { signature: Signature { type_signature: Numeric(1), volatility: Immutable }, aliases: ["min"] } }, args: [Column { name: "c1", index: 0 }], logical_args: [Column(Column { relation: Some(Bare { table: "test" }), name: "c1" })], data_type: Int32, name: "MIN(test.c1)", schema: Schema { fields: [Field { name: "c1", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {} }, sort_exprs: [], ordering_req: [], ignore_nulls: false, ordering_fields: [], is_distinct: false, input_type: Int32 }, AggregateFunctionExpr { fun: AggregateUDF { inner: Max { signature: Signature { type_signature: Numeric(1), volatility: Immutable }, aliases: ["max"] } }, args: [Column { name: "c1", index: 0 }], logical_args: [Column(Column { relation: Some(Bare { table: "test" }), name: "c1" })], data_type: Int32, name: "MAX(test.c1)", schema: Schema { fields: [Field { name: "c1", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {} }, sort_exprs: [], ordering_req: [], ignore_nulls: false, ordering_fields: [], is_distinct: false, input_type: Int32 }], filter_expr: [None, None, None], limit: None, input: CustomExecutionPlan { projection: Some([0]), cache: PlanProperties { eq_properties: EquivalenceProperties { eq_group: EquivalenceGroup { classes: [] }, oeq_class: OrderingEquivalenceClass { orderings: [] }, constants: [], schema: Schema { fields: [Field { name: "c1", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {} } }, partitioning: UnknownPartitioning(1), execution_mode: Bounded, output_ordering: None } }, schema: Schema { fields: [Field { name: "COUNT(*)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "MIN(test.c1)", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "MAX(test.c1)", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {} }, input_schema: Schema { fields: [Field { name: "c1", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {} }, metrics: ExecutionPlanMetricsSet { inner: Mutex { data: MetricsSet { metrics: [] } } }, required_input_ordering: None, input_order_mode: Linear, cache: PlanProperties { eq_properties: EquivalenceProperties { eq_group: EquivalenceGroup { classes: [] }, oeq_class: OrderingEquivalenceClass { orderings: [] }, constants: [], schema: Schema { fields: [Field { name: "COUNT(*)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "MIN(test.c1)", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "MAX(test.c1)", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {} } }, partitioning: UnknownPartitioning(1), execution_mode: Bounded, output_ordering: None } }

@jayzhan211
Copy link
Contributor

I do have something that's starting to look reasonable, but some tests on the optimizer now are failing for some reasons I can't understand

running 1 test
test custom_sources_cases::optimizers_catch_all_statistics ... FAILED

successes:

successes:

failures:

---- custom_sources_cases::optimizers_catch_all_statistics stdout ----
thread 'custom_sources_cases::optimizers_catch_all_statistics' panicked at datafusion/core/tests/custom_sources_cases/mod.rs:274:5:
Expected aggregate_statistics optimizations missing: AggregateExec { mode: Single, group_by: PhysicalGroupBy { expr: [], null_expr: [], groups: [[]] }, aggr_expr: [AggregateFunctionExpr { fun: AggregateUDF { inner: Count { name: "COUNT", signature: Signature { type_signature: VariadicAny, volatility: Immutable } } }, args: [Literal { value: Int64(1) }], logical_args: [Literal(Int64(1))], data_type: Int64, name: "COUNT(*)", schema: Schema { fields: [Field { name: "c1", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {} }, sort_exprs: [], ordering_req: [], ignore_nulls: false, ordering_fields: [], is_distinct: false, input_type: Int64 }, AggregateFunctionExpr { fun: AggregateUDF { inner: Min { signature: Signature { type_signature: Numeric(1), volatility: Immutable }, aliases: ["min"] } }, args: [Column { name: "c1", index: 0 }], logical_args: [Column(Column { relation: Some(Bare { table: "test" }), name: "c1" })], data_type: Int32, name: "MIN(test.c1)", schema: Schema { fields: [Field { name: "c1", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {} }, sort_exprs: [], ordering_req: [], ignore_nulls: false, ordering_fields: [], is_distinct: false, input_type: Int32 }, AggregateFunctionExpr { fun: AggregateUDF { inner: Max { signature: Signature { type_signature: Numeric(1), volatility: Immutable }, aliases: ["max"] } }, args: [Column { name: "c1", index: 0 }], logical_args: [Column(Column { relation: Some(Bare { table: "test" }), name: "c1" })], data_type: Int32, name: "MAX(test.c1)", schema: Schema { fields: [Field { name: "c1", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {} }, sort_exprs: [], ordering_req: [], ignore_nulls: false, ordering_fields: [], is_distinct: false, input_type: Int32 }], filter_expr: [None, None, None], limit: None, input: CustomExecutionPlan { projection: Some([0]), cache: PlanProperties { eq_properties: EquivalenceProperties { eq_group: EquivalenceGroup { classes: [] }, oeq_class: OrderingEquivalenceClass { orderings: [] }, constants: [], schema: Schema { fields: [Field { name: "c1", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {} } }, partitioning: UnknownPartitioning(1), execution_mode: Bounded, output_ordering: None } }, schema: Schema { fields: [Field { name: "COUNT(*)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "MIN(test.c1)", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "MAX(test.c1)", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {} }, input_schema: Schema { fields: [Field { name: "c1", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {} }, metrics: ExecutionPlanMetricsSet { inner: Mutex { data: MetricsSet { metrics: [] } } }, required_input_ordering: None, input_order_mode: Linear, cache: PlanProperties { eq_properties: EquivalenceProperties { eq_group: EquivalenceGroup { classes: [] }, oeq_class: OrderingEquivalenceClass { orderings: [] }, constants: [], schema: Schema { fields: [Field { name: "COUNT(*)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "MIN(test.c1)", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "MAX(test.c1)", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {} } }, partitioning: UnknownPartitioning(1), execution_mode: Bounded, output_ordering: None } }

I guess you skip the aggregate statistic optimization for min/max

fn take_optimizable_min(
agg_expr: &dyn AggregateExpr,
stats: &Statistics,
) -> Option<(ScalarValue, String)> {
if let Precision::Exact(num_rows) = &stats.num_rows {
match *num_rows {
0 => {
// MIN/MAX with 0 rows is always null
if let Some(casted_expr) =
agg_expr.as_any().downcast_ref::<expressions::Min>()
{
if let Ok(min_data_type) =
ScalarValue::try_from(casted_expr.field().unwrap().data_type())
{
return Some((min_data_type, casted_expr.name().to_string()));
}
}
}
value if value > 0 => {
let col_stats = &stats.column_statistics;
if let Some(casted_expr) =
agg_expr.as_any().downcast_ref::<expressions::Min>()
{
if casted_expr.expressions().len() == 1 {
// TODO optimize with exprs other than Column
if let Some(col_expr) = casted_expr.expressions()[0]
.as_any()
.downcast_ref::<expressions::Column>()
{
if let Precision::Exact(val) =
&col_stats[col_expr.index()].min_value
{
if !val.is_null() {
return Some((
val.clone(),
casted_expr.name().to_string(),
));
}
}
}
}
}
}
_ => {}
}
}
None
}

You might need to check if the AggregateExpr is min/max in take_optimizable_min and take_optimizable_max

@edmondop
Copy link
Contributor Author

I fixed this but now I have a test that doesn't pass on the optimizer (there are two actually)

---- single_distinct_to_groupby::tests::two_distinct_and_one_common stdout ----
thread 'single_distinct_to_groupby::tests::two_distinct_and_one_common' panicked at datafusion/optimizer/src/test/mod.rs:200:5:
assertion `left == right` failed
  left: "Projection: test.a, sum(alias2) AS sum(test.c), COUNT(alias1) AS COUNT(DISTINCT test.b), MAX(alias3) AS MAX(test.b) [a:UInt32, sum(test.c):UInt64;N, COUNT(DISTINCT test.b):Int64;N, MAX(test.b):UInt32;N]\n  Aggregate: groupBy=[[test.a]], aggr=[[sum(alias2), COUNT(alias1), MAX(alias3)]] [a:UInt32, sum(alias2):UInt64;N, COUNT(alias1):Int64;N, MAX(alias3):UInt32;N]\n    Aggregate: groupBy=[[test.a, test.b AS alias1]], aggr=[[sum(test.c) AS alias2, MAX(test.b) AS alias3]] [a:UInt32, alias1:UInt32, alias2:UInt64;N, alias3:UInt32;N]\n      TableScan: test [a:UInt32, b:UInt32, c:UInt32]"
 right: "Projection: test.a, sum(alias2) AS sum(test.c), COUNT(alias1) AS COUNT(DISTINCT test.b), MAX(alias1) AS MAX(DISTINCT test.b) [a:UInt32, sum(test.c):UInt64;N, COUNT(DISTINCT test.b):Int64;N, MAX(DISTINCT test.b):UInt32;N]\n  Aggregate: groupBy=[[test.a]], aggr=[[sum(alias2), COUNT(alias1), MAX(alias1)]] [a:UInt32, sum(alias2):UInt64;N, COUNT(alias1):Int64;N, MAX(alias1):UInt32;N]\n    Aggregate: groupBy=[[test.a, test.b AS alias1]], aggr=[[sum(test.c) AS alias2]] [a:UInt32, alias1:UInt32, alias2:UInt64;N]\n      TableScan: test [a:UInt32, b:UInt32, c:UInt32]"

That suggests that the optimizer cannot use the existing aliases / doesn't understand the existing aliases that provide DISTINCT test.b . Looking, any tip would be highly appreciated

@jayzhan211
Copy link
Contributor

jayzhan211 commented Jun 22, 2024

I fixed this but now I have a test that doesn't pass on the optimizer (there are two actually)

---- single_distinct_to_groupby::tests::two_distinct_and_one_common stdout ----
thread 'single_distinct_to_groupby::tests::two_distinct_and_one_common' panicked at datafusion/optimizer/src/test/mod.rs:200:5:
assertion `left == right` failed
  left: "Projection: test.a, sum(alias2) AS sum(test.c), COUNT(alias1) AS COUNT(DISTINCT test.b), MAX(alias3) AS MAX(test.b) [a:UInt32, sum(test.c):UInt64;N, COUNT(DISTINCT test.b):Int64;N, MAX(test.b):UInt32;N]\n  Aggregate: groupBy=[[test.a]], aggr=[[sum(alias2), COUNT(alias1), MAX(alias3)]] [a:UInt32, sum(alias2):UInt64;N, COUNT(alias1):Int64;N, MAX(alias3):UInt32;N]\n    Aggregate: groupBy=[[test.a, test.b AS alias1]], aggr=[[sum(test.c) AS alias2, MAX(test.b) AS alias3]] [a:UInt32, alias1:UInt32, alias2:UInt64;N, alias3:UInt32;N]\n      TableScan: test [a:UInt32, b:UInt32, c:UInt32]"
 right: "Projection: test.a, sum(alias2) AS sum(test.c), COUNT(alias1) AS COUNT(DISTINCT test.b), MAX(alias1) AS MAX(DISTINCT test.b) [a:UInt32, sum(test.c):UInt64;N, COUNT(DISTINCT test.b):Int64;N, MAX(DISTINCT test.b):UInt32;N]\n  Aggregate: groupBy=[[test.a]], aggr=[[sum(alias2), COUNT(alias1), MAX(alias1)]] [a:UInt32, sum(alias2):UInt64;N, COUNT(alias1):Int64;N, MAX(alias1):UInt32;N]\n    Aggregate: groupBy=[[test.a, test.b AS alias1]], aggr=[[sum(test.c) AS alias2]] [a:UInt32, alias1:UInt32, alias2:UInt64;N]\n      TableScan: test [a:UInt32, b:UInt32, c:UInt32]"

That suggests that the optimizer cannot use the existing aliases / doesn't understand the existing aliases that provide DISTINCT test.b . Looking, any tip would be highly appreciated

I think we should add distinct for MIN/MAX so we can get the distinct after group by is converted to distinct function

But I think there is no difference between MIN and Distinct Min, maybe we could remove distinct for MIN/MAX beforehand? Introduce EliminateDistinct optimize rule for MIN/MAX.

@edmondop
Copy link
Contributor Author

I fixed this but now I have a test that doesn't pass on the optimizer (there are two actually)

---- single_distinct_to_groupby::tests::two_distinct_and_one_common stdout ----
thread 'single_distinct_to_groupby::tests::two_distinct_and_one_common' panicked at datafusion/optimizer/src/test/mod.rs:200:5:
assertion `left == right` failed
  left: "Projection: test.a, sum(alias2) AS sum(test.c), COUNT(alias1) AS COUNT(DISTINCT test.b), MAX(alias3) AS MAX(test.b) [a:UInt32, sum(test.c):UInt64;N, COUNT(DISTINCT test.b):Int64;N, MAX(test.b):UInt32;N]\n  Aggregate: groupBy=[[test.a]], aggr=[[sum(alias2), COUNT(alias1), MAX(alias3)]] [a:UInt32, sum(alias2):UInt64;N, COUNT(alias1):Int64;N, MAX(alias3):UInt32;N]\n    Aggregate: groupBy=[[test.a, test.b AS alias1]], aggr=[[sum(test.c) AS alias2, MAX(test.b) AS alias3]] [a:UInt32, alias1:UInt32, alias2:UInt64;N, alias3:UInt32;N]\n      TableScan: test [a:UInt32, b:UInt32, c:UInt32]"
 right: "Projection: test.a, sum(alias2) AS sum(test.c), COUNT(alias1) AS COUNT(DISTINCT test.b), MAX(alias1) AS MAX(DISTINCT test.b) [a:UInt32, sum(test.c):UInt64;N, COUNT(DISTINCT test.b):Int64;N, MAX(DISTINCT test.b):UInt32;N]\n  Aggregate: groupBy=[[test.a]], aggr=[[sum(alias2), COUNT(alias1), MAX(alias1)]] [a:UInt32, sum(alias2):UInt64;N, COUNT(alias1):Int64;N, MAX(alias1):UInt32;N]\n    Aggregate: groupBy=[[test.a, test.b AS alias1]], aggr=[[sum(test.c) AS alias2]] [a:UInt32, alias1:UInt32, alias2:UInt64;N]\n      TableScan: test [a:UInt32, b:UInt32, c:UInt32]"

That suggests that the optimizer cannot use the existing aliases / doesn't understand the existing aliases that provide DISTINCT test.b . Looking, any tip would be highly appreciated

I think we should add distinct for MIN/MAX so we can get the distinct after group by is converted to distinct function

But I think there is no difference between MIN and Distinct Min, maybe we could remove distinct for MIN/MAX beforehand? Introduce EliminateDistinct optimize rule for MIN/MAX.

Is this a part of the optimizer i.e. https://github.com/edmondop/arrow-datafusion/blob/main/datafusion/optimizer/src/replace_distinct_aggregate.rs ? Thank your for your help btw

@jayzhan211
Copy link
Contributor

I fixed this but now I have a test that doesn't pass on the optimizer (there are two actually)

---- single_distinct_to_groupby::tests::two_distinct_and_one_common stdout ----
thread 'single_distinct_to_groupby::tests::two_distinct_and_one_common' panicked at datafusion/optimizer/src/test/mod.rs:200:5:
assertion `left == right` failed
  left: "Projection: test.a, sum(alias2) AS sum(test.c), COUNT(alias1) AS COUNT(DISTINCT test.b), MAX(alias3) AS MAX(test.b) [a:UInt32, sum(test.c):UInt64;N, COUNT(DISTINCT test.b):Int64;N, MAX(test.b):UInt32;N]\n  Aggregate: groupBy=[[test.a]], aggr=[[sum(alias2), COUNT(alias1), MAX(alias3)]] [a:UInt32, sum(alias2):UInt64;N, COUNT(alias1):Int64;N, MAX(alias3):UInt32;N]\n    Aggregate: groupBy=[[test.a, test.b AS alias1]], aggr=[[sum(test.c) AS alias2, MAX(test.b) AS alias3]] [a:UInt32, alias1:UInt32, alias2:UInt64;N, alias3:UInt32;N]\n      TableScan: test [a:UInt32, b:UInt32, c:UInt32]"
 right: "Projection: test.a, sum(alias2) AS sum(test.c), COUNT(alias1) AS COUNT(DISTINCT test.b), MAX(alias1) AS MAX(DISTINCT test.b) [a:UInt32, sum(test.c):UInt64;N, COUNT(DISTINCT test.b):Int64;N, MAX(DISTINCT test.b):UInt32;N]\n  Aggregate: groupBy=[[test.a]], aggr=[[sum(alias2), COUNT(alias1), MAX(alias1)]] [a:UInt32, sum(alias2):UInt64;N, COUNT(alias1):Int64;N, MAX(alias1):UInt32;N]\n    Aggregate: groupBy=[[test.a, test.b AS alias1]], aggr=[[sum(test.c) AS alias2]] [a:UInt32, alias1:UInt32, alias2:UInt64;N]\n      TableScan: test [a:UInt32, b:UInt32, c:UInt32]"

That suggests that the optimizer cannot use the existing aliases / doesn't understand the existing aliases that provide DISTINCT test.b . Looking, any tip would be highly appreciated

I think we should add distinct for MIN/MAX so we can get the distinct after group by is converted to distinct function
But I think there is no difference between MIN and Distinct Min, maybe we could remove distinct for MIN/MAX beforehand? Introduce EliminateDistinct optimize rule for MIN/MAX.

Is this a part of the optimizer i.e. https://github.com/edmondop/arrow-datafusion/blob/main/datafusion/optimizer/src/replace_distinct_aggregate.rs ? Thank your for your help btw

I don't think so, Distinct/Distinct On is different from distinct in the function.

@edmondop
Copy link
Contributor Author

@jayzhan211 I have started experimenting with an optimizer rule, but removing the distinct result in such an error:

running 2 tests
test eliminate_distinct::tests::eliminate_distinct_from_min_expr ... FAILED
test eliminate_nested_union::tests::eliminate_distinct_nothing ... ok

failures:

---- eliminate_distinct::tests::eliminate_distinct_from_min_expr stdout ----
Transformed yes true
Error: Context("Optimizer rule 'eliminate_distinct' failed", Context("eliminate_distinct", Internal("Failed due to a difference in schemas, original schema: DFSchema { inner: Schema { fields: [Field { name: \"a\", data_type: UInt32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: \"MIN(DISTINCT test.b)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {} }, field_qualifiers: [Some(Bare { table: \"test\" }), None], functional_dependencies: FunctionalDependencies { deps: [FunctionalDependence { source_indices: [0], target_indices: [0, 1], nullable: false, mode: Single }] } }, new schema: DFSchema { inner: Schema { fields: [Field { name: \"a\", data_type: UInt32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: \"MIN(test.b)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {} }, field_qualifiers: [Some(Bare { table: \"test\" }), None], functional_dependencies: FunctionalDependencies { deps: [FunctionalDependence { source_indices: [0], target_indices: [0, 1], nullable: false, mode: Single }] } }")))

Do I need to change also the equivalence rules?

@edmondop edmondop requested a review from jayzhan211 June 23, 2024 20:29
@edmondop edmondop marked this pull request as ready for review June 23, 2024 20:29
@jayzhan211
Copy link
Contributor

eliminate_distinct_from_min_expr

You can take single_distinct_groupby as reference, there is alias to remain schema equivalence.
Also, I suggest we introduce this rule in another PR, not mixing this with MIN/MAX UDAF.

@edmondop
Copy link
Contributor Author

Thanks. I guess I wasn't clear in my comment here #11013 (comment) . How should that test failure be addressed? It seems that min/max udaf uses other aliases and is not reusing the intermediate results already available

@jayzhan211
Copy link
Contributor

Thanks. I guess I wasn't clear in my comment here #11013 (comment) . How should that test failure be addressed? It seems that min/max udaf uses other aliases and is not reusing the intermediate results already available

If we eliminate distinct of min/max prior to single_distinct_to_group_by, we don't expect to get distinct min/max at this point, we should rewrite the test to other function like sum.

@edmondop
Copy link
Contributor Author

---- single_distinct_to_groupby::tests::two_distinct_and_one_common

Wouldn't eliminating it require the optimizer rule? Or do you suggest I update the test case? Or the expected value?

@jayzhan211
Copy link
Contributor

---- single_distinct_to_groupby::tests::two_distinct_and_one_common

Wouldn't eliminating it require the optimizer rule? Or do you suggest I update the test case? Or the expected value?

Yes, I suggest we update the test like

    #[test]
    fn one_distinct_and_two_common() -> Result<()> {
        let table_scan = test_table_scan()?;

        let plan = LogicalPlanBuilder::from(table_scan)
            .aggregate(
                vec![col("a")],
                vec![sum(col("c")), count_distinct(col("b")), max(col("b"))],
            )?
            .build()?;
        // Should work
        let expected = "Projection: test.a, sum(alias2) AS sum(test.c), COUNT(alias1) AS COUNT(DISTINCT test.b), MAX(alias3) AS MAX(test.b) [a:UInt32, sum(test.c):UInt64;N, COUNT(DISTINCT test.b):Int64;N, MAX(test.b):UInt32;N]\n  Aggregate: groupBy=[[test.a]], aggr=[[sum(alias2), COUNT(alias1), MAX(alias3)]] [a:UInt32, sum(alias2):UInt64;N, COUNT(alias1):Int64;N, MAX(alias3):UInt32;N]\n    Aggregate: groupBy=[[test.a, test.b AS alias1]], aggr=[[sum(test.c) AS alias2, MAX(test.b) AS alias3]] [a:UInt32, alias1:UInt32, alias2:UInt64;N, alias3:UInt32;N]\n      TableScan: test [a:UInt32, b:UInt32, c:UInt32]";

        assert_optimized_plan_equal(plan, expected)
    }

@edmondop
Copy link
Contributor Author

There seems to be a column added to the Aggregate node in the logical plan, can that affect performance and/or memory footprint? This was the reason why I didn't update the test in the first place

This is a subset of the new plan

aggr=[[sum(test.c) AS alias2, MAX(test.b) AS alias3]] [a:UInt32, alias1:UInt32, alias2:UInt64;N, alias3:UInt32;N]\n      TableScan: test [a:UInt32, b:UInt32, c:UInt32]"

while this is the subset from the previous plan

Aggregate: groupBy=[[test.a, test.b AS alias1]], aggr=[[sum(test.c) AS alias2]] [a:UInt32, alias1:UInt32, alias2:UInt64;N]\n      TableScan: test [a:UInt32, b:UInt32, c:UInt32]"

there is an alias3:UInt64 that gets added

@jayzhan211
Copy link
Contributor

There seems to be a column added to the Aggregate node in the logical plan, can that affect performance and/or memory footprint? This was the reason why I didn't update the test in the first place

This is a subset of the new plan

aggr=[[sum(test.c) AS alias2, MAX(test.b) AS alias3]] [a:UInt32, alias1:UInt32, alias2:UInt64;N, alias3:UInt32;N]\n      TableScan: test [a:UInt32, b:UInt32, c:UInt32]"

while this is the subset from the previous plan

Aggregate: groupBy=[[test.a, test.b AS alias1]], aggr=[[sum(test.c) AS alias2]] [a:UInt32, alias1:UInt32, alias2:UInt64;N]\n      TableScan: test [a:UInt32, b:UInt32, c:UInt32]"

there is an alias3:UInt64 that gets added

Remove the Min/Max matching in is_single_distinct_agg and the alias is removed

alamb
alamb previously approved these changes Jun 27, 2024
Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you so much @edmondop -- I took a look at this PR and I think in general it is quite close.

It needs:

  1. to remove the old min/max implementation in https://github.com/apache/datafusion/blob/5bb6b356277ea1c6f1d7af64e2d66f005d7e1ed4/datafusion/physical-expr/src/aggregate/min_max.rs
  2. resolve some merge conflicts

There is also a follow on issue / PR I would like to make regarding the optimizer check

Given this PR has hung out for a while and has some merge conflicts now I am going to try and help polish it up

datafusion/expr/src/test/function_stub.rs Outdated Show resolved Hide resolved
datafusion/expr/src/test/function_stub.rs Outdated Show resolved Hide resolved
@alamb alamb changed the title Moving min and max to new API and removing from protobuf Moving min and max to user defined aggregate function Jun 27, 2024
@edmondop
Copy link
Contributor Author

I think as long as you can explain me how to resolve the current test failure I should be fine. Agree using names for min and max unwrapping is not very robust

@alamb alamb dismissed their stale review June 27, 2024 20:58

Working out the duffs

@edmondop edmondop force-pushed the issue-10943 branch 3 times, most recently from 49f7a7f to 4440a8d Compare July 22, 2024 12:12
@edmondop
Copy link
Contributor Author

I think this is now blocked by #11595

@alamb
Copy link
Contributor

alamb commented Jul 22, 2024

I think this is now blocked by #11595

Thanks @edmondop -- good 🕵️ work

pub(crate) mod prim_op {
pub use datafusion_physical_expr_common::aggregate::groups_accumulator::prim_op::PrimitiveGroupsAccumulator;
}
pub(crate) mod prim_op {}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can delete it

@alamb
Copy link
Contributor

alamb commented Jul 24, 2024

As I understand it this PR is still a work in progress, so marking it as a draft (I am trying to make sure it is clear what PRs are waiting for review and what are not)

@alamb alamb marked this pull request as draft July 24, 2024 18:39
@github-actions github-actions bot added sql SQL Planner substrait labels Jul 24, 2024
@edmondop edmondop force-pushed the issue-10943 branch 3 times, most recently from 356a8e1 to ab26352 Compare July 25, 2024 12:32
@edmondop
Copy link
Contributor Author

@jayzhan211 the change is dropping the limit in the physical plan node, I wasn't able to find out the source of it. Do you have any hint ?

@jayzhan211
Copy link
Contributor

@edmondop
You need to add get_minmax_desc method to AggregateFunctionExpr

fn transform_agg(
aggr: &AggregateExec,
order: &PhysicalSortExpr,
limit: usize,
) -> Option<Arc<dyn ExecutionPlan>> {
// ensure the sort direction matches aggregate function
let (field, desc) = aggr.get_minmax_desc()?;
if desc != order.options.descending {
return None;
}
let group_key = aggr.group_expr().expr().iter().exactly_one().ok()?;
let kt = group_key.0.data_type(&aggr.input().schema()).ok()?;
if !kt.is_primitive() && kt != DataType::Utf8 {
return None;
}
if aggr.filter_expr().iter().any(|e| e.is_some()) {
return None;
}
// ensure the sort is on the same field as the aggregate output
let col = order.expr.as_any().downcast_ref::<Column>()?;
if col.name() != field.name() {
return None;
}
// We found what we want: clone, copy the limit down, and return modified node
let new_aggr = AggregateExec::try_new(
*aggr.mode(),
aggr.group_expr().clone(),
aggr.aggr_expr().to_vec(),
aggr.filter_expr().to_vec(),
aggr.input().clone(),
aggr.input_schema(),
)
.expect("Unable to copy Aggregate!")
.with_limit(Some(limit));
Some(Arc::new(new_aggr))
}

@edmondop
Copy link
Contributor Author

@jayzhan211 I am a little confused about the test case here

Interval(MonthDayNano) 0 years -2 mons 0 days 0 hours 0 mins 0.000000000 secs 0 years 2 mons 15 days 0 hours 0 mins 0.000000000 secs

I have noticed that in the signature for min/max as an AggregateFunction, TimeInterval is not added

.

If I don't add it, the SQL test fails because arrow_type(min(c1)) returns Utf8 for me, but if I add it then the query fails with error

External error: query failed: DataFusion error: Internal error: Min/Max accumulator not implemented for type Interval(MonthDayNano).

@edmondop edmondop requested a review from jayzhan211 July 27, 2024 21:04
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
core Core datafusion crate logical-expr Logical plan and expressions optimizer Optimizer rules physical-expr Physical Expressions sql SQL Planner sqllogictest substrait
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Convert Min and Max to UDAF
3 participants