Move min and max to user defined aggregate function #11013

Draft: wants to merge 18 commits into base: main

Contributor

@edmondop edmondop commented Jun 19, 2024

Which issue does this PR close?

Closes #10943.

  • Sliding window accumulator for min and max

Rationale for this change

What changes are included in this PR?

Are these changes tested?

Are there any user-facing changes?

@github-actions github-actions bot added sql logical-expr Logical plan and expressions physical-expr Physical Expressions optimizer Optimizer rules core Core datafusion crate labels Jun 19, 2024
@@ -232,54 +222,6 @@ mod tests {
        Ok(())
    }

    #[test]
    fn test_min_max_expr() -> Result<()> {
Contributor

We can move this test to slt (sqllogictest).

@github-actions github-actions bot removed the sql label Jun 20, 2024
@edmondop
Contributor Author

I have something that's starting to look reasonable, but some optimizer tests are now failing for reasons I can't understand:

running 1 test
test custom_sources_cases::optimizers_catch_all_statistics ... FAILED

successes:

successes:

failures:

---- custom_sources_cases::optimizers_catch_all_statistics stdout ----
thread 'custom_sources_cases::optimizers_catch_all_statistics' panicked at datafusion/core/tests/custom_sources_cases/mod.rs:274:5:
Expected aggregate_statistics optimizations missing: AggregateExec { mode: Single, group_by: PhysicalGroupBy { expr: [], null_expr: [], groups: [[]] }, aggr_expr: [AggregateFunctionExpr { fun: AggregateUDF { inner: Count { name: "COUNT", signature: Signature { type_signature: VariadicAny, volatility: Immutable } } }, args: [Literal { value: Int64(1) }], logical_args: [Literal(Int64(1))], data_type: Int64, name: "COUNT(*)", schema: Schema { fields: [Field { name: "c1", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {} }, sort_exprs: [], ordering_req: [], ignore_nulls: false, ordering_fields: [], is_distinct: false, input_type: Int64 }, AggregateFunctionExpr { fun: AggregateUDF { inner: Min { signature: Signature { type_signature: Numeric(1), volatility: Immutable }, aliases: ["min"] } }, args: [Column { name: "c1", index: 0 }], logical_args: [Column(Column { relation: Some(Bare { table: "test" }), name: "c1" })], data_type: Int32, name: "MIN(test.c1)", schema: Schema { fields: [Field { name: "c1", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {} }, sort_exprs: [], ordering_req: [], ignore_nulls: false, ordering_fields: [], is_distinct: false, input_type: Int32 }, AggregateFunctionExpr { fun: AggregateUDF { inner: Max { signature: Signature { type_signature: Numeric(1), volatility: Immutable }, aliases: ["max"] } }, args: [Column { name: "c1", index: 0 }], logical_args: [Column(Column { relation: Some(Bare { table: "test" }), name: "c1" })], data_type: Int32, name: "MAX(test.c1)", schema: Schema { fields: [Field { name: "c1", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {} }, sort_exprs: [], ordering_req: [], ignore_nulls: false, ordering_fields: [], is_distinct: false, input_type: Int32 }], filter_expr: [None, None, None], limit: None, input: CustomExecutionPlan { projection: Some([0]), cache: PlanProperties { 
eq_properties: EquivalenceProperties { eq_group: EquivalenceGroup { classes: [] }, oeq_class: OrderingEquivalenceClass { orderings: [] }, constants: [], schema: Schema { fields: [Field { name: "c1", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {} } }, partitioning: UnknownPartitioning(1), execution_mode: Bounded, output_ordering: None } }, schema: Schema { fields: [Field { name: "COUNT(*)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "MIN(test.c1)", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "MAX(test.c1)", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {} }, input_schema: Schema { fields: [Field { name: "c1", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {} }, metrics: ExecutionPlanMetricsSet { inner: Mutex { data: MetricsSet { metrics: [] } } }, required_input_ordering: None, input_order_mode: Linear, cache: PlanProperties { eq_properties: EquivalenceProperties { eq_group: EquivalenceGroup { classes: [] }, oeq_class: OrderingEquivalenceClass { orderings: [] }, constants: [], schema: Schema { fields: [Field { name: "COUNT(*)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "MIN(test.c1)", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "MAX(test.c1)", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {} } }, partitioning: UnknownPartitioning(1), execution_mode: Bounded, output_ordering: None } }

@jayzhan211
Contributor

> I have something that's starting to look reasonable, but some optimizer tests are now failing for reasons I can't understand
> (output of custom_sources_cases::optimizers_catch_all_statistics, quoted in full above)

I guess your change skips the aggregate statistics optimization for min/max:

fn take_optimizable_min(
    agg_expr: &dyn AggregateExpr,
    stats: &Statistics,
) -> Option<(ScalarValue, String)> {
    if let Precision::Exact(num_rows) = &stats.num_rows {
        match *num_rows {
            0 => {
                // MIN/MAX with 0 rows is always null
                if let Some(casted_expr) =
                    agg_expr.as_any().downcast_ref::<expressions::Min>()
                {
                    if let Ok(min_data_type) =
                        ScalarValue::try_from(casted_expr.field().unwrap().data_type())
                    {
                        return Some((min_data_type, casted_expr.name().to_string()));
                    }
                }
            }
            value if value > 0 => {
                let col_stats = &stats.column_statistics;
                if let Some(casted_expr) =
                    agg_expr.as_any().downcast_ref::<expressions::Min>()
                {
                    if casted_expr.expressions().len() == 1 {
                        // TODO optimize with exprs other than Column
                        if let Some(col_expr) = casted_expr.expressions()[0]
                            .as_any()
                            .downcast_ref::<expressions::Column>()
                        {
                            if let Precision::Exact(val) =
                                &col_stats[col_expr.index()].min_value
                            {
                                if !val.is_null() {
                                    return Some((
                                        val.clone(),
                                        casted_expr.name().to_string(),
                                    ));
                                }
                            }
                        }
                    }
                }
            }
            _ => {}
        }
    }
    None
}

You might need to check whether the AggregateExpr is min/max in take_optimizable_min and take_optimizable_max, since the downcast to expressions::Min above no longer matches once min/max are UDAFs.
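To illustrate why the downcast stops matching, here is a minimal standalone sketch in std-only Rust. It is not DataFusion code: `BuiltinMin`, the simplified `AggregateExpr` trait, and the `fun_name` field are stand-ins invented for this example. It shows a check on the concrete type failing for a UDAF wrapper, and a check on the wrapped function's name succeeding.

```rust
use std::any::Any;

/// Simplified stand-in for the physical `AggregateExpr` trait (assumption).
trait AggregateExpr {
    fn as_any(&self) -> &dyn Any;
    fn name(&self) -> &str;
}

/// Stand-in for the old built-in `expressions::Min` (assumption).
struct BuiltinMin;

/// Stand-in for `AggregateFunctionExpr`, which wraps any UDAF (assumption).
struct AggregateFunctionExpr {
    fun_name: String,     // name of the wrapped function, e.g. "min"
    display_name: String, // output column name, e.g. "MIN(test.c1)"
}

impl AggregateExpr for BuiltinMin {
    fn as_any(&self) -> &dyn Any {
        self
    }
    fn name(&self) -> &str {
        "MIN(test.c1)"
    }
}

impl AggregateExpr for AggregateFunctionExpr {
    fn as_any(&self) -> &dyn Any {
        self
    }
    fn name(&self) -> &str {
        &self.display_name
    }
}

/// Old-style check: only matches the concrete built-in type.
fn is_builtin_min(expr: &dyn AggregateExpr) -> bool {
    expr.as_any().downcast_ref::<BuiltinMin>().is_some()
}

/// UDAF-aware check: downcast to the wrapper, then inspect the function name.
fn unwrap_udaf_min(expr: &dyn AggregateExpr) -> Option<&AggregateFunctionExpr> {
    expr.as_any()
        .downcast_ref::<AggregateFunctionExpr>()
        .filter(|f| f.fun_name.eq_ignore_ascii_case("min"))
}
```

With these stand-ins, a wrapper whose `fun_name` is "min" is rejected by `is_builtin_min` but accepted by `unwrap_udaf_min`, which is the situation the failing test points at.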

@edmondop
Contributor Author

I fixed this, but now there is an optimizer test that doesn't pass (two, actually):

---- single_distinct_to_groupby::tests::two_distinct_and_one_common stdout ----
thread 'single_distinct_to_groupby::tests::two_distinct_and_one_common' panicked at datafusion/optimizer/src/test/mod.rs:200:5:
assertion `left == right` failed
  left: "Projection: test.a, sum(alias2) AS sum(test.c), COUNT(alias1) AS COUNT(DISTINCT test.b), MAX(alias3) AS MAX(test.b) [a:UInt32, sum(test.c):UInt64;N, COUNT(DISTINCT test.b):Int64;N, MAX(test.b):UInt32;N]\n  Aggregate: groupBy=[[test.a]], aggr=[[sum(alias2), COUNT(alias1), MAX(alias3)]] [a:UInt32, sum(alias2):UInt64;N, COUNT(alias1):Int64;N, MAX(alias3):UInt32;N]\n    Aggregate: groupBy=[[test.a, test.b AS alias1]], aggr=[[sum(test.c) AS alias2, MAX(test.b) AS alias3]] [a:UInt32, alias1:UInt32, alias2:UInt64;N, alias3:UInt32;N]\n      TableScan: test [a:UInt32, b:UInt32, c:UInt32]"
 right: "Projection: test.a, sum(alias2) AS sum(test.c), COUNT(alias1) AS COUNT(DISTINCT test.b), MAX(alias1) AS MAX(DISTINCT test.b) [a:UInt32, sum(test.c):UInt64;N, COUNT(DISTINCT test.b):Int64;N, MAX(DISTINCT test.b):UInt32;N]\n  Aggregate: groupBy=[[test.a]], aggr=[[sum(alias2), COUNT(alias1), MAX(alias1)]] [a:UInt32, sum(alias2):UInt64;N, COUNT(alias1):Int64;N, MAX(alias1):UInt32;N]\n    Aggregate: groupBy=[[test.a, test.b AS alias1]], aggr=[[sum(test.c) AS alias2]] [a:UInt32, alias1:UInt32, alias2:UInt64;N]\n      TableScan: test [a:UInt32, b:UInt32, c:UInt32]"

That suggests that the optimizer cannot use, or doesn't understand, the existing aliases that provide DISTINCT test.b. I'm looking into it; any tip would be highly appreciated.

@jayzhan211
Contributor

jayzhan211 commented Jun 22, 2024

> I fixed this, but now there is an optimizer test that doesn't pass (two, actually)
> (output of single_distinct_to_groupby::tests::two_distinct_and_one_common, quoted in full above)

I think we should add distinct support for MIN/MAX so that we can still get the distinct after the group by is converted to a distinct function.

However, I think there is no difference between MIN and MIN(DISTINCT), so maybe we could remove the distinct for MIN/MAX beforehand by introducing an EliminateDistinct optimizer rule for MIN/MAX.
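The claim that DISTINCT makes no difference for MIN/MAX can be checked with a small standalone sketch (std-only Rust, independent of DataFusion; all function names here are made up for the example). Deduplicating the input cannot change its smallest or largest element, whereas it does change a count.

```rust
use std::collections::BTreeSet;

/// Deduplicate the input, as a DISTINCT aggregate would before aggregating.
fn distinct(values: &[u32]) -> BTreeSet<u32> {
    values.iter().copied().collect()
}

/// MIN over every row, duplicates included.
fn min_all(values: &[u32]) -> Option<u32> {
    values.iter().copied().min()
}

/// MIN(DISTINCT ...): deduplicate first, then take the minimum.
fn min_distinct(values: &[u32]) -> Option<u32> {
    distinct(values).into_iter().min()
}

/// MAX over every row, duplicates included.
fn max_all(values: &[u32]) -> Option<u32> {
    values.iter().copied().max()
}

/// MAX(DISTINCT ...): deduplicate first, then take the maximum.
fn max_distinct(values: &[u32]) -> Option<u32> {
    distinct(values).into_iter().max()
}

/// COUNT, for contrast: here DISTINCT does change the answer.
fn count_all(values: &[u32]) -> usize {
    values.len()
}

/// COUNT(DISTINCT ...).
fn count_distinct(values: &[u32]) -> usize {
    distinct(values).len()
}
```

This is why dropping DISTINCT is safe for MIN/MAX but not for aggregates such as COUNT or SUM.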

@edmondop
Contributor Author

> However, I think there is no difference between MIN and MIN(DISTINCT), so maybe we could remove the distinct for MIN/MAX beforehand by introducing an EliminateDistinct optimizer rule for MIN/MAX.

Is this part of the optimizer, i.e. https://github.com/edmondop/arrow-datafusion/blob/main/datafusion/optimizer/src/replace_distinct_aggregate.rs? Thank you for your help, btw.

@jayzhan211
Contributor

> Is this part of the optimizer, i.e. https://github.com/edmondop/arrow-datafusion/blob/main/datafusion/optimizer/src/replace_distinct_aggregate.rs?

I don't think so. Distinct / Distinct On is different from the distinct inside an aggregate function.

@edmondop
Contributor Author

@jayzhan211 I have started experimenting with an optimizer rule, but removing the distinct results in the following error:

running 2 tests
test eliminate_distinct::tests::eliminate_distinct_from_min_expr ... FAILED
test eliminate_nested_union::tests::eliminate_distinct_nothing ... ok

failures:

---- eliminate_distinct::tests::eliminate_distinct_from_min_expr stdout ----
Transformed yes true
Error: Context("Optimizer rule 'eliminate_distinct' failed", Context("eliminate_distinct", Internal("Failed due to a difference in schemas, original schema: DFSchema { inner: Schema { fields: [Field { name: \"a\", data_type: UInt32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: \"MIN(DISTINCT test.b)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {} }, field_qualifiers: [Some(Bare { table: \"test\" }), None], functional_dependencies: FunctionalDependencies { deps: [FunctionalDependence { source_indices: [0], target_indices: [0, 1], nullable: false, mode: Single }] } }, new schema: DFSchema { inner: Schema { fields: [Field { name: \"a\", data_type: UInt32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: \"MIN(test.b)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {} }, field_qualifiers: [Some(Bare { table: \"test\" }), None], functional_dependencies: FunctionalDependencies { deps: [FunctionalDependence { source_indices: [0], target_indices: [0, 1], nullable: false, mode: Single }] } }")))

Do I also need to change the equivalence rules?

@edmondop edmondop requested a review from jayzhan211 June 23, 2024 20:29
@edmondop edmondop marked this pull request as ready for review June 23, 2024 20:29
@jayzhan211
Contributor

> eliminate_distinct_from_min_expr

You can take single_distinct_to_groupby as a reference: it uses an alias to keep the schema equivalent.
Also, I suggest we introduce this rule in a separate PR rather than mixing it with the MIN/MAX UDAF work.
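As a rough illustration of the alias idea, the sketch below uses a toy `Expr` enum (a hypothetical stand-in, not DataFusion's `Expr`) to strip DISTINCT from min/max and then alias the rewritten expression back to its original output name. Because the field name is unchanged, a schema comparison like the one in the error above would no longer see a difference.

```rust
/// Toy logical expression; a hypothetical stand-in for DataFusion's `Expr`.
#[derive(Debug, Clone, PartialEq)]
enum Expr {
    Aggregate { func: String, arg: String, distinct: bool },
    Alias { expr: Box<Expr>, name: String },
}

impl Expr {
    /// Column name this expression contributes to the output schema.
    fn output_name(&self) -> String {
        match self {
            Expr::Aggregate { func, arg, distinct } => {
                let d = if *distinct { "DISTINCT " } else { "" };
                format!("{}({}{})", func.to_uppercase(), d, arg)
            }
            Expr::Alias { name, .. } => name.clone(),
        }
    }
}

/// Drop DISTINCT from min/max and alias the result back to the original
/// name, so the rewritten plan exposes the same field name as before.
fn eliminate_distinct(expr: Expr) -> Expr {
    let original_name = expr.output_name();
    match expr {
        Expr::Aggregate { func, arg, distinct: true }
            if func == "min" || func == "max" =>
        {
            Expr::Alias {
                expr: Box::new(Expr::Aggregate { func, arg, distinct: false }),
                name: original_name,
            }
        }
        // Every other expression is returned untouched.
        other => other,
    }
}
```

Without the `Alias` wrapper the output name would change from MIN(DISTINCT test.b) to MIN(test.b), which is the kind of schema difference reported in the failing test.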

@edmondop
Contributor Author

Thanks. I guess I wasn't clear in my comment here: #11013 (comment). How should that test failure be addressed? It seems that the min/max UDAF uses different aliases and does not reuse the intermediate results that are already available.

@jayzhan211
Contributor

> How should that test failure be addressed? It seems that the min/max UDAF uses different aliases and does not reuse the intermediate results that are already available.

If we eliminate the distinct from min/max before single_distinct_to_groupby runs, we no longer expect to see a distinct min/max at this point, so we should rewrite the test to use another function, such as sum.

@edmondop
Contributor Author

> ---- single_distinct_to_groupby::tests::two_distinct_and_one_common

Wouldn't eliminating it require the optimizer rule? Or do you suggest I update the test case? Or the expected value?

@jayzhan211
Contributor

> Wouldn't eliminating it require the optimizer rule? Or do you suggest I update the test case? Or the expected value?

Yes, I suggest we update the test like this:

    #[test]
    fn one_distinct_and_two_common() -> Result<()> {
        let table_scan = test_table_scan()?;

        let plan = LogicalPlanBuilder::from(table_scan)
            .aggregate(
                vec![col("a")],
                vec![sum(col("c")), count_distinct(col("b")), max(col("b"))],
            )?
            .build()?;
        // Should work
        let expected = "Projection: test.a, sum(alias2) AS sum(test.c), COUNT(alias1) AS COUNT(DISTINCT test.b), MAX(alias3) AS MAX(test.b) [a:UInt32, sum(test.c):UInt64;N, COUNT(DISTINCT test.b):Int64;N, MAX(test.b):UInt32;N]\n  Aggregate: groupBy=[[test.a]], aggr=[[sum(alias2), COUNT(alias1), MAX(alias3)]] [a:UInt32, sum(alias2):UInt64;N, COUNT(alias1):Int64;N, MAX(alias3):UInt32;N]\n    Aggregate: groupBy=[[test.a, test.b AS alias1]], aggr=[[sum(test.c) AS alias2, MAX(test.b) AS alias3]] [a:UInt32, alias1:UInt32, alias2:UInt64;N, alias3:UInt32;N]\n      TableScan: test [a:UInt32, b:UInt32, c:UInt32]";

        assert_optimized_plan_equal(plan, expected)
    }

@edmondop
Contributor Author

There seems to be a column added to the Aggregate node in the logical plan. Can that affect performance and/or memory footprint? This is why I didn't update the test in the first place.

This is a subset of the new plan:

aggr=[[sum(test.c) AS alias2, MAX(test.b) AS alias3]] [a:UInt32, alias1:UInt32, alias2:UInt64;N, alias3:UInt32;N]\n      TableScan: test [a:UInt32, b:UInt32, c:UInt32]"

while this is the subset from the previous plan:

Aggregate: groupBy=[[test.a, test.b AS alias1]], aggr=[[sum(test.c) AS alias2]] [a:UInt32, alias1:UInt32, alias2:UInt64;N]\n      TableScan: test [a:UInt32, b:UInt32, c:UInt32]"

An extra alias3 column (alias3:UInt32;N in the new plan) gets added.

@jayzhan211
Contributor

> There seems to be a column added to the Aggregate node in the logical plan. Can that affect performance and/or memory footprint?

If you remove the Min/Max matching in is_single_distinct_agg, the alias is removed.

match expr {
    Expr::AggregateFunction(ref fun) => {
        let fn_name = fun.func_def.name().to_lowercase();
        if fun.distinct && WORKSPACE_ROOT_LOCK.get_or_init(|| vec!["min".to_string(), "max".to_string()]).contains(&fn_name) {
Contributor

Do we need a OnceLock here? I think a plain or (||) is enough 🤔
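A minimal sketch of the simpler check, assuming the only goal is to test whether a function name is min or max (the helper names below are made up for illustration and are not part of the PR):

```rust
/// True when `fn_name` (in any ASCII case) names MIN or MAX.
/// No lazily initialised static is needed for a two-way comparison.
fn is_min_or_max(fn_name: &str) -> bool {
    fn_name.eq_ignore_ascii_case("min") || fn_name.eq_ignore_ascii_case("max")
}

/// DISTINCT is redundant only for a distinct call to one of those functions.
fn has_redundant_distinct(fn_name: &str, distinct: bool) -> bool {
    distinct && is_min_or_max(fn_name)
}
```

Compared with the OnceLock version, this avoids allocating a Vec of Strings and lowercasing the name on every call.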

Contributor Author

Should I move this to a different PR anyway, so we can discuss it separately?

Contributor

@jayzhan211 jayzhan211 Jun 25, 2024

Yes, I suggest we move eliminate distinct to another PR; maybe other reviewers will have a better idea.

alamb previously approved these changes Jun 27, 2024
Contributor

@alamb alamb left a comment

Thank you so much @edmondop -- I took a look at this PR and I think in general it is quite close.

It needs:

  1. the old min/max implementation in https://github.com/apache/datafusion/blob/5bb6b356277ea1c6f1d7af64e2d66f005d7e1ed4/datafusion/physical-expr/src/aggregate/min_max.rs to be removed
  2. some merge conflicts to be resolved

There is also a follow on issue / PR I would like to make regarding the optimizer check

Given this PR has hung out for a while and has some merge conflicts now I am going to try and help polish it up

impl Min {
    pub fn new() -> Self {
        Self {
            aliases: vec!["count".to_string()],
Contributor

I am not sure min should have a "count" alias 🤔

impl Max {
    pub fn new() -> Self {
        Self {
            aliases: vec!["count".to_string()],
Contributor

Likewise here, "count" doesn't seem right.

@@ -173,6 +173,23 @@ fn take_optimizable_column_and_table_count(
    None
}

fn unwrap_min(agg_expr: &dyn AggregateExpr) -> Option<&AggregateFunctionExpr> {
Contributor

As part of a follow-on PR, I think we should change this optimizer to use a method on AggregateFunctionExpr rather than matching on the name.

So something like

impl AggregateFunctionExpr {

    /// Return the value of the aggregate function if there are no rows in
    /// the input.
    ///
    /// If the optimizer knows the input to an aggregation operation has
    /// no rows then it will replace the aggregation with a constant.
    ///
    /// If the value for 0 input rows is not known, returns None (the default)
    fn zero_row_value(&self) -> Option<ScalarValue> { None }
    ...
}

We can do this as a follow on PR
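To make the proposal concrete, here is a standalone sketch of how such a hook might behave. Everything in it is an assumption made for illustration: the tiny `ScalarValue` enum, the `AggregateFunction` trait, and the `fold_if_empty` helper are stand-ins, not DataFusion APIs. The only facts taken from the thread are that MIN/MAX over zero rows is null and that the default should be "not known".

```rust
/// Minimal stand-in for DataFusion's `ScalarValue` (assumption).
#[derive(Debug, Clone, PartialEq)]
enum ScalarValue {
    Int32(Option<i32>),
    Int64(Option<i64>),
}

/// Hypothetical trait carrying the proposed hook.
trait AggregateFunction {
    fn name(&self) -> &str;

    /// Result for zero input rows, or `None` if it is not known (the default).
    fn zero_row_value(&self) -> Option<ScalarValue> {
        None
    }
}

struct Count;
struct MinInt32;
struct Opaque;

impl AggregateFunction for Count {
    fn name(&self) -> &str {
        "count"
    }
    // COUNT over zero rows is 0.
    fn zero_row_value(&self) -> Option<ScalarValue> {
        Some(ScalarValue::Int64(Some(0)))
    }
}

impl AggregateFunction for MinInt32 {
    fn name(&self) -> &str {
        "min"
    }
    // MIN over zero rows is NULL.
    fn zero_row_value(&self) -> Option<ScalarValue> {
        Some(ScalarValue::Int32(None))
    }
}

impl AggregateFunction for Opaque {
    // Keeps the default: the optimizer must not fold this aggregate.
    fn name(&self) -> &str {
        "opaque"
    }
}

/// Optimizer side: fold to a constant only when the input is known to be empty.
fn fold_if_empty(
    agg: &dyn AggregateFunction,
    exact_num_rows: Option<usize>,
) -> Option<ScalarValue> {
    match exact_num_rows {
        Some(0) => agg.zero_row_value(),
        _ => None,
    }
}
```

The design point is that the optimizer asks the function itself rather than comparing names, so a user-defined aggregate can opt in by overriding one method.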

@alamb alamb changed the title Moving min and max to new API and removing from protobuf Moving min and max to user defined aggregate function Jun 27, 2024
@edmondop
Contributor Author

I think as long as you can explain to me how to resolve the current test failure, I should be fine. I agree that matching on names to unwrap min and max is not very robust.

@alamb alamb dismissed their stale review June 27, 2024 20:58

Working out the duffs

@alamb
Contributor

alamb commented Jun 27, 2024

Now that I have spent some more time working with this PR I see it still needs some additional work -- sorry for the noise

> I think as long as you can explain to me how to resolve the current test failure, I should be fine.

I started with merging up from main and resolving the conflicts: edmondop#1

Once that is merged / ready I think we could keep hacking at this PR together

Alternately, we could potentially make some smaller PRs to remove the barriers / unblock this one -- for example we could remove the direct use of the Min/Max PhysicalExprs

For example in #11013 (comment)

As well as here:

pub fn get_minmax_desc(&self) -> Option<(Field, bool)> {
    let agg_expr = self.aggr_expr.iter().exactly_one().ok()?;
    if let Some(max) = agg_expr.as_any().downcast_ref::<Max>() {
        Some((max.field().ok()?, true))
    } else if let Some(min) = agg_expr.as_any().downcast_ref::<Min>() {
        Some((min.field().ok()?, false))
    } else {
        None
    }
}

If you are interested, I can file tickets explaining those smaller tasks.

@alamb alamb changed the title Moving min and max to user defined aggregate function Move min and max to user defined aggregate function Jun 27, 2024
@edmondop
Contributor Author

> If you are interested, I can file tickets explaining those smaller tasks.

Yes 🙏

@alamb
Contributor

alamb commented Jun 28, 2024

> Yes 🙏

OK, I filed #11153, along with some starter tasks like this:

Task List

Hopefully that helps

Labels
core Core datafusion crate logical-expr Logical plan and expressions optimizer Optimizer rules physical-expr Physical Expressions
Development

Successfully merging this pull request may close these issues.

Convert Min and Max to UDAF
3 participants