Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Deduplicate reduce aggregations #3881

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/transform/src/nonnullable.rs
Expand Up @@ -125,7 +125,7 @@ fn aggregate_nonnullable(expr: &mut AggregateExpr, metadata: &RelationType) {
if let (AggregateFunc::Count, ScalarExpr::Column(c)) = (&expr.func, &expr.expr) {
if !metadata.column_types[*c].nullable && !expr.distinct {
expr.expr =
ScalarExpr::literal_ok(Datum::True, ColumnType::new(ScalarType::Bool, true));
ScalarExpr::literal_ok(Datum::True, ColumnType::new(ScalarType::Bool, false));
}
}
}
31 changes: 30 additions & 1 deletion src/transform/src/projection_extraction.rs
Expand Up @@ -11,7 +11,8 @@

use crate::{RelationExpr, ScalarExpr, TransformArgs};

/// Transform column references in a `Map` into a `Project`.
/// Transform column references in a `Map` into a `Project`, or repeated
/// aggregations in a `Reduce` into a `Project`.
#[derive(Debug)]
pub struct ProjectionExtraction;

Expand Down Expand Up @@ -61,6 +62,34 @@ impl ProjectionExtraction {
*relation = relation.take_dangerous().project(outputs);
}
}
} else if let RelationExpr::Reduce {
input: _,
group_key,
aggregates,
} = relation
{
// If any entry of aggregates exists earlier in aggregates, we can remove it
// and replace it with a projection that points to the first instance of it.
let mut projection = Vec::new();
projection.extend(0..group_key.len());
let mut finger = 0;
let mut must_project = false;
while finger < aggregates.len() {
if let Some(position) = aggregates[..finger]
.iter()
.position(|x| x == &aggregates[finger])
{
projection.push(group_key.len() + position);
aggregates.remove(finger);
must_project = true;
} else {
projection.push(group_key.len() + finger);
finger += 1;
}
}
if must_project {
*relation = relation.take_dangerous().project(projection);
}
}
}
}
6 changes: 3 additions & 3 deletions test/sqllogictest/tpch.slt
Expand Up @@ -161,9 +161,9 @@ ORDER BY
%0 =
| Get materialize.public.lineitem (u21)
| Filter (#10 <= 1998-12-01)
| Reduce group=(#8, #9) sum(#4) sum(#5) sum((#5 * (100dec - #6))) sum(((#5 * (100dec - #6)) * (100dec + #7))) count(true) count(true) sum(#6) count(true) count(true)
| Map (((#2 * 10000000dec) / i64todec(if (#6 = 0) then {null} else {#6})) / 10dec), (((#3 * 10000000dec) / i64todec(if (#7 = 0) then {null} else {#7})) / 10dec), (((#8 * 10000000dec) / i64todec(if (#9 = 0) then {null} else {#9})) / 10dec)
| Project (#0..#5, #11..#13, #10)
| Reduce group=(#8, #9) sum(#4) sum(#5) sum((#5 * (100dec - #6))) sum(((#5 * (100dec - #6)) * (100dec + #7))) count(true) sum(#6)
| Map i64todec(if (#6 = 0) then {null} else {#6}), (((#2 * 10000000dec) / #8) / 10dec), (((#3 * 10000000dec) / #8) / 10dec), (((#7 * 10000000dec) / #8) / 10dec)
| Project (#0..#5, #9..#11, #6)

Finish order_by=(#0 asc, #1 asc) limit=none offset=0 project=(#0..#9)

Expand Down