Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,29 @@

package org.apache.doris.nereids.rules.exploration.mv;

import org.apache.doris.common.Pair;
import org.apache.doris.nereids.CascadesContext;
import org.apache.doris.nereids.rules.exploration.mv.mapping.SlotMapping;
import org.apache.doris.nereids.rules.exploration.mv.rollup.AggFunctionRollUpHandler;
import org.apache.doris.nereids.trees.expressions.Alias;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.NamedExpression;
import org.apache.doris.nereids.trees.expressions.Slot;
import org.apache.doris.nereids.trees.expressions.SlotReference;
import org.apache.doris.nereids.trees.expressions.WindowExpression;
import org.apache.doris.nereids.trees.expressions.functions.Function;
import org.apache.doris.nereids.trees.expressions.functions.agg.AggregateFunction;
import org.apache.doris.nereids.trees.expressions.visitor.DefaultExpressionRewriter;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.logical.LogicalProject;
import org.apache.doris.nereids.trees.plans.logical.LogicalWindow;
import org.apache.doris.nereids.util.ExpressionUtils;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;

import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -96,6 +106,9 @@ protected Plan rewriteQueryByView(MatchMode matchMode, StructInfo queryStructInf
viewToQuerySlotMapping, tempRewrittenPlan.treeString()));
return null;
}
Map<Expression, Expression> mvExprToMvScanExprQueryBased =
materializationContext.getShuttledExprToScanExprMapping().keyPermute(viewToQuerySlotMapping)
.flattenMap().get(0);
// Rewrite top projects, represent the query projects by view
List<Expression> expressionsRewritten = rewriteExpression(
queryStructInfo.getExpressions(),
Expand All @@ -104,20 +117,141 @@ protected Plan rewriteQueryByView(MatchMode matchMode, StructInfo queryStructInf
viewToQuerySlotMapping,
ImmutableMap.of(), cascadesContext
);
// Can not rewrite, bail out
// If generic rewrite fails, try roll up from query expressions.
if (expressionsRewritten.isEmpty()) {
materializationContext.recordFailReason(queryStructInfo,
"Rewrite expressions by view in window scan fail",
() -> String.format("expressionToRewritten is %s,\n mvExprToMvScanExprMapping is %s,\n"
+ "targetToSourceMapping = %s", queryStructInfo.getExpressions(),
materializationContext.getShuttledExprToScanExprMapping(),
viewToQuerySlotMapping));
return null;
expressionsRewritten = rollupWindowAggregateFunctions(queryStructInfo.getExpressions(),
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this fallback actually be reached for the documented case? Before rewriteQueryByView(), the MV rewrite flow calls StructInfo.isGraphLogicalEquals(), and window nodes appear to be compared as full WindowExpression objects. A query window using xx_merge(...) and an MV window using xx_union(...) may therefore be rejected before this fallback runs. Please add a regression test proving the PR example is rewritten, or handle this equivalence during graph comparison.

queryStructInfo.getTopPlan(), mvExprToMvScanExprQueryBased, true, false);
if (expressionsRewritten.isEmpty()) {
materializationContext.recordFailReason(queryStructInfo,
"Rewrite expressions by view in window scan fail",
() -> String.format("expressionToRewritten is %s,\n mvExprToMvScanExprMapping is %s,\n"
+ "targetToSourceMapping = %s", queryStructInfo.getExpressions(),
materializationContext.getShuttledExprToScanExprMapping(),
viewToQuerySlotMapping));
return null;
}
}
return new LogicalProject<>(
expressionsRewritten.stream()
.map(expression -> expression instanceof NamedExpression ? expression : new Alias(expression))
.map(NamedExpression.class::cast)
.collect(Collectors.toList()), tempRewrittenPlan);
}

/**
 * Rewrites the given query expressions with {@link WindowAggregateRollupRewriter}, using the
 * query-based mapping from materialized view expressions to materialized view scan expressions.
 *
 * @param expressions the query expressions to rewrite
 * @param queryTopPlan the query plan handed to the shuttle step and to the rewrite context
 * @param mvExprToMvScanExprQueryBased mapping consulted by the rewriter for slots and window expressions
 * @param needShuttle when true, the expressions are first passed through
 *        {@code ExpressionUtils.shuttleExpressionWithLineage} against {@code queryTopPlan}
 * @param strictSlotRewrite when true, a slot without an entry in the mapping invalidates the rewrite
 * @return the rewritten expressions, or an empty list if the context was marked invalid while rewriting
 */
private static List<Expression> rollupWindowAggregateFunctions(List<? extends Expression> expressions,
        Plan queryTopPlan, Map<Expression, Expression> mvExprToMvScanExprQueryBased,
        boolean needShuttle, boolean strictSlotRewrite) {
    // Optionally shuttle the expressions against the query plan before rewriting them.
    List<? extends Expression> candidates = expressions;
    if (needShuttle) {
        candidates = ExpressionUtils.shuttleExpressionWithLineage(expressions, queryTopPlan);
    }

    WindowAggregateRollupContext rollupContext = new WindowAggregateRollupContext(queryTopPlan,
            mvExprToMvScanExprQueryBased, strictSlotRewrite);
    List<Expression> rolledUp = candidates.stream()
            .map(candidate -> candidate.accept(WindowAggregateRollupRewriter.INSTANCE, rollupContext))
            .collect(Collectors.toList());

    // The rewriter flags the context as invalid when any expression could not be rewritten;
    // in that case the partial result is discarded.
    if (!rollupContext.isValid()) {
        return ImmutableList.of();
    }
    return rolledUp;
}

private static Function rollupWindowAggregateFunction(AggregateFunction queryAggregateFunction,
Expression queryAggregateFunctionShuttled, Map<Expression, Expression> mvExprToMvScanExprQueryBased) {
for (Map.Entry<Expression, Expression> expressionEntry : mvExprToMvScanExprQueryBased.entrySet()) {
Expression viewExpression = expressionEntry.getKey();
// Window mapping keys may be full WindowExpression while rollup handlers match aggregate functions.
if (viewExpression instanceof WindowExpression) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Dropping the WindowExpression wrapper here removes the partition/order/frame information from the rollup check. The aggregate rollup handlers only validate function compatibility, so a view expression with the same aggregate function but a different window spec could be selected incorrectly if graph comparison is relaxed for this feature. Please explicitly verify that the query and MV window specs match before using the MV window column.

viewExpression = ((WindowExpression) viewExpression).getFunction();
}
Pair<Expression, Expression> mvExprToMvScanExprQueryBasedPair = Pair.of(viewExpression,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Blocking: this strips the candidate MV output from a full WindowExpression down to just its aggregate function before choosing expressionEntry.getValue(). If the MV has two window outputs like xx_union(...) OVER (PARTITION BY a ORDER BY dt) and xx_union(...) OVER (PARTITION BY b ORDER BY dt), both candidates collapse to the same xx_union(...) here, so the rewriter can bind the query to the wrong MV column. The window spec needs to stay part of the match, or you need an explicit equality check for partition keys, order keys, and frame before using the candidate slot.

expressionEntry.getValue());
for (AggFunctionRollUpHandler rollUpHandler : AbstractMaterializedViewAggregateRule.ROLL_UP_HANDLERS) {
if (!rollUpHandler.canRollup(queryAggregateFunction, queryAggregateFunctionShuttled,
mvExprToMvScanExprQueryBasedPair, mvExprToMvScanExprQueryBased)) {
continue;
}
Function rollupFunction = rollUpHandler.doRollup(queryAggregateFunction,
queryAggregateFunctionShuttled, mvExprToMvScanExprQueryBasedPair,
mvExprToMvScanExprQueryBased);
if (rollupFunction != null) {
return rollupFunction;
}
}
}
return null;
}

private static class WindowAggregateRollupRewriter
extends DefaultExpressionRewriter<WindowAggregateRollupContext> {

private static final WindowAggregateRollupRewriter INSTANCE = new WindowAggregateRollupRewriter();

@Override
public Expression visitWindow(WindowExpression windowExpression, WindowAggregateRollupContext context) {
if (!context.isValid()) {
return windowExpression;
}
Expression rewrittenWindowExpr = context.getMvExprToMvScanExprQueryBased().get(windowExpression);
if (rewrittenWindowExpr != null) {
return rewrittenWindowExpr;
}
Expression function = windowExpression.getFunction();
if (!(function instanceof AggregateFunction)) {
return super.visitWindow(windowExpression, context);
}
Expression queryFunctionShuttled = ExpressionUtils.shuttleExpressionWithLineage(function,
context.getQueryTopPlan());
Function rewrittenFunction = rollupWindowAggregateFunction((AggregateFunction) function,
queryFunctionShuttled, context.getMvExprToMvScanExprQueryBased());
if (rewrittenFunction == null) {
context.setValid(false);
return windowExpression;
}
return super.visitWindow(windowExpression.withFunction(rewrittenFunction), context);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems to keep the expression as a WindowExpression after rollup. The PR description says the rewrite result should be xx_merge(mv_col), but this code builds xx_merge(mv_col) OVER (...). That can compute a new window over the already-windowed MV column instead of using the precomputed MV result directly, which may double-aggregate state values. Should this return the rolled-up scalar function instead of windowExpression.withFunction(...)?

}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Blocking: rewrittenFunction is built on top of expressionEntry.getValue(), which in this new path is the MV's window-result slot. Wrapping it back into windowExpression.withFunction(rewrittenFunction) therefore re-applies the same OVER (...) to an already-windowed value. That only happens to be harmless for idempotent set-style states such as HLL; for non-idempotent combinators like sum_union / sum_merge it changes the result. The rewrite described in the PR body should collapse to the scalar xx_merge(mv_col) once you've selected the MV window column, not xx_merge(mv_col) OVER (...).

/**
 * Replaces a slot with its entry from the context's expression mapping.
 *
 * <p>If the context is already invalid the slot is returned untouched. When the mapping has no
 * entry for the slot, the slot is returned as is; in strict slot rewrite mode the context is
 * additionally marked invalid.
 */
@Override
public Expression visitSlot(Slot slot, WindowAggregateRollupContext context) {
    if (!context.isValid()) {
        return slot;
    }
    Expression mapped = context.getMvExprToMvScanExprQueryBased().get(slot);
    if (mapped != null) {
        return mapped;
    }
    // No mapping for this slot: strict mode treats that as a failed rewrite.
    if (context.isStrictSlotRewrite()) {
        context.setValid(false);
    }
    return slot;
}
}

/**
 * State carried through a window aggregate rollup rewrite: the query plan, the query-based
 * mapping from materialized view expressions to materialized view scan expressions, the strict
 * slot rewrite switch, and a validity flag that can be cleared while rewriting.
 */
private static class WindowAggregateRollupContext {
    private final Plan queryTopPlan;
    private final Map<Expression, Expression> mvExprToMvScanExprQueryBased;
    private final boolean strictSlotRewrite;
    // Starts as true and can be changed through setValid.
    private boolean valid = true;

    private WindowAggregateRollupContext(Plan topPlan,
            Map<Expression, Expression> exprMapping, boolean strict) {
        queryTopPlan = topPlan;
        mvExprToMvScanExprQueryBased = exprMapping;
        strictSlotRewrite = strict;
    }

    /** Returns the query plan supplied at construction. */
    public Plan getQueryTopPlan() {
        return queryTopPlan;
    }

    /** Returns the expression mapping supplied at construction. */
    public Map<Expression, Expression> getMvExprToMvScanExprQueryBased() {
        return mvExprToMvScanExprQueryBased;
    }

    /** Returns the strict slot rewrite switch supplied at construction. */
    public boolean isStrictSlotRewrite() {
        return strictSlotRewrite;
    }

    /** Returns the current validity flag. */
    public boolean isValid() {
        return valid;
    }

    /** Sets the validity flag. */
    public void setValid(boolean valid) {
        this.valid = valid;
    }
}
}
Loading