Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,14 @@

package org.apache.doris.mtmv;

import org.apache.doris.analysis.CastExpr;
import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.FunctionCallExpr;
import org.apache.doris.analysis.TimestampArithmeticExpr;
import org.apache.doris.common.AnalysisException;

import java.util.List;

/**
* MTMV Partition Expr Factory
*/
Expand All @@ -32,6 +36,14 @@ public static MTMVPartitionExprService getExprService(Expr expr) throws Analysis
FunctionCallExpr functionCallExpr = (FunctionCallExpr) expr;
String fnName = functionCallExpr.getFnName().getFunction().toLowerCase();
if ("date_trunc".equals(fnName)) {
List<Expr> paramsExprs = functionCallExpr.getParams().exprs();
Expr dateTruncArg = paramsExprs.size() >= 1 ? paramsExprs.get(0) : null;
while (dateTruncArg instanceof CastExpr) {
dateTruncArg = dateTruncArg.getChild(0);
}
if (paramsExprs.size() == 2 && dateTruncArg instanceof TimestampArithmeticExpr) {
return new MTMVPartitionExprDateTruncDateAddSub(functionCallExpr);
}
return new MTMVPartitionExprDateTrunc(functionCallExpr);
}
throw new AnalysisException("async materialized view partition not support function name: " + fnName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import org.apache.doris.analysis.PartitionKeyDesc;
import org.apache.doris.common.AnalysisException;

import java.util.Collections;
import java.util.List;
import java.util.Map;

/**
Expand Down Expand Up @@ -50,6 +52,19 @@ PartitionKeyDesc generateRollUpPartitionKeyDesc(
PartitionKeyDesc partitionKeyDesc, MTMVPartitionInfo mvPartitionInfo, MTMVRelatedTableIf pctTable)
throws AnalysisException;

/**
* For range partition, a single base range partition may overlap multiple roll-up buckets
* (e.g. {@code date_trunc(date_add(col, INTERVAL 3 HOUR), 'day')} on UTC-midnight base partitions),
* so return all roll-up PartitionKeyDesc values that should be associated with the input range.
*
* <p>Default implementation keeps the historical 1-to-1 behavior.
*/
default List<PartitionKeyDesc> generateRollUpPartitionKeyDescs(
PartitionKeyDesc partitionKeyDesc, MTMVPartitionInfo mvPartitionInfo, MTMVRelatedTableIf pctTable)
throws AnalysisException {
return Collections.singletonList(generateRollUpPartitionKeyDesc(partitionKeyDesc, mvPartitionInfo, pctTable));
}

/**
* Check if user input is legal
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,9 +143,11 @@ public Map<PartitionKeyDesc, Set<String>> rollUpRange(Map<PartitionKeyDesc, Set<
Map<PartitionKeyDesc, Set<String>> result = Maps.newHashMap();
MTMVPartitionExprService exprSerice = MTMVPartitionExprFactory.getExprService(mvPartitionInfo.getExpr());
for (Entry<PartitionKeyDesc, Set<String>> entry : relatedPartitionDescs.entrySet()) {
PartitionKeyDesc rollUpDesc = exprSerice.generateRollUpPartitionKeyDesc(entry.getKey(), mvPartitionInfo,
pctTable);
result.computeIfAbsent(rollUpDesc, k -> Sets.newHashSet()).addAll(entry.getValue());
List<PartitionKeyDesc> rollUpDescs = exprSerice.generateRollUpPartitionKeyDescs(entry.getKey(),
mvPartitionInfo, pctTable);
for (PartitionKeyDesc rollUpDesc : rollUpDescs) {
result.computeIfAbsent(rollUpDesc, k -> Sets.newHashSet()).addAll(entry.getValue());
}
}
return result;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,15 @@
import org.apache.doris.nereids.rules.expression.ExpressionNormalization;
import org.apache.doris.nereids.rules.expression.ExpressionRewriteContext;
import org.apache.doris.nereids.trees.expressions.CTEId;
import org.apache.doris.nereids.trees.expressions.Cast;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.NamedExpression;
import org.apache.doris.nereids.trees.expressions.Slot;
import org.apache.doris.nereids.trees.expressions.SlotReference;
import org.apache.doris.nereids.trees.expressions.WindowExpression;
import org.apache.doris.nereids.trees.expressions.functions.scalar.DateTrunc;
import org.apache.doris.nereids.trees.expressions.functions.scalar.HoursAdd;
import org.apache.doris.nereids.trees.expressions.functions.scalar.HoursSub;
import org.apache.doris.nereids.trees.expressions.literal.Literal;
import org.apache.doris.nereids.trees.expressions.visitor.DefaultExpressionRewriter;
import org.apache.doris.nereids.trees.plans.JoinType;
Expand Down Expand Up @@ -91,7 +94,8 @@ public static class PartitionIncrementChecker extends
DefaultPlanVisitor<Void, PartitionIncrementCheckContext> {
public static final PartitionIncrementChecker INSTANCE = new PartitionIncrementChecker();
public static final Set<Class<? extends Expression>> SUPPORT_EXPRESSION_TYPES =
ImmutableSet.of(DateTrunc.class, SlotReference.class, Literal.class);
ImmutableSet.of(DateTrunc.class, SlotReference.class, Literal.class, Cast.class,
HoursAdd.class, HoursSub.class);

@Override
public Void visitLogicalProject(LogicalProject<? extends Plan> project,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
import org.apache.doris.datasource.mvcc.MvccUtil;
import org.apache.doris.mtmv.BaseColInfo;
import org.apache.doris.mtmv.BaseTableInfo;
import org.apache.doris.mtmv.MTMVPartitionExprDateTruncDateAddSub;
import org.apache.doris.mtmv.MTMVPartitionExprFactory;
import org.apache.doris.mtmv.MTMVRelatedTableIf;
import org.apache.doris.nereids.StatementContext;
import org.apache.doris.nereids.analyzer.UnboundRelation;
Expand All @@ -44,7 +46,10 @@
import org.apache.doris.nereids.trees.expressions.IsNull;
import org.apache.doris.nereids.trees.expressions.LessThan;
import org.apache.doris.nereids.trees.expressions.Slot;
import org.apache.doris.nereids.trees.expressions.functions.scalar.HoursAdd;
import org.apache.doris.nereids.trees.expressions.functions.scalar.HoursSub;
import org.apache.doris.nereids.trees.expressions.literal.BooleanLiteral;
import org.apache.doris.nereids.trees.expressions.literal.IntegerLiteral;
import org.apache.doris.nereids.trees.expressions.literal.Literal;
import org.apache.doris.nereids.trees.expressions.literal.NullLiteral;
import org.apache.doris.nereids.trees.plans.Plan;
Expand Down Expand Up @@ -136,10 +141,36 @@ private static Map<TableIf, Set<Expression>> constructTableWithPredicates(MTMV m
PartitionItem partitionItem = mv.getPartitionItemOrAnalysisException(partitionName);
items.add(partitionItem);
}
// Detect hour-offset partition expression (date_trunc(date_add/sub(col, N), unit)).
// For these expressions, the predicate on the base column must be built against
// date_add/sub(col, N) rather than the raw column, so that the WHERE clause correctly
// maps the MV partition boundary back to the base table range.
long hourOffset = 0;
try {
if (mv.getMvPartitionInfo() != null && mv.getMvPartitionInfo().getExpr() != null) {
MTMVPartitionExprFactory.getExprService(mv.getMvPartitionInfo().getExpr());
Object service = MTMVPartitionExprFactory.getExprService(mv.getMvPartitionInfo().getExpr());
if (service instanceof MTMVPartitionExprDateTruncDateAddSub) {
hourOffset = ((MTMVPartitionExprDateTruncDateAddSub) service).getOffsetHours();
}
}
} catch (AnalysisException ignored) {
// fall back to no-offset behaviour
}
final long finalHourOffset = hourOffset;
ImmutableMap.Builder<TableIf, Set<Expression>> builder = new ImmutableMap.Builder<>();
tableWithPartKey.forEach((table, colName) ->
builder.put(table, constructPredicates(items, colName))
);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

tableWithPartKey only preserves the raw base-column name (BaseColInfo.colName). That is enough for date_trunc(date_add(k2, ...), ...), but not for the casted form this PR explicitly supports: date_trunc(date_add(cast(k2 as date), INTERVAL 3 HOUR), 'day').

In that case this rebuilds the refresh predicate as hours_add(k2, 3) instead of hours_add(cast(k2 as date), 3), so a partial refresh reads a different base-table range than the MV partition definition. The create-time/unit tests cover expression preservation, but refresh is still wrong for that supported shape.

tableWithPartKey.forEach((table, colName) -> {
UnboundSlot slot = new UnboundSlot(colName);
if (finalHourOffset != 0) {
// Build predicate on date_add/sub(col, N) so partition pruning reflects the offset.
Expression adjustedExpr = finalHourOffset > 0
? new HoursAdd(slot, new IntegerLiteral((int) finalHourOffset))
: new HoursSub(slot, new IntegerLiteral((int) -finalHourOffset));
builder.put(table, constructPredicates(items, adjustedExpr));
} else {
builder.put(table, constructPredicates(items, slot));
}
});
return builder.build();
}

Expand All @@ -159,17 +190,27 @@ public static Set<Expression> constructPredicates(Set<PartitionItem> partitions,
*/
@VisibleForTesting
public static Set<Expression> constructPredicates(Set<PartitionItem> partitions, Slot colSlot) {
return constructPredicates(partitions, (Expression) colSlot);
}

/**
* construct predicates for partition items using an arbitrary expression as the predicate target.
* This overload supports hour-offset partition expressions where the predicate must be built
* against e.g. {@code hours_add(col, N)} rather than the raw column slot.
*/
@VisibleForTesting
public static Set<Expression> constructPredicates(Set<PartitionItem> partitions, Expression colExpr) {
Set<Expression> predicates = new HashSet<>();
if (partitions.isEmpty()) {
return Sets.newHashSet(BooleanLiteral.TRUE);
}
if (partitions.iterator().next() instanceof ListPartitionItem) {
for (PartitionItem item : partitions) {
predicates.add(convertListPartitionToIn(item, colSlot));
predicates.add(convertListPartitionToIn(item, colExpr));
}
} else {
for (PartitionItem item : partitions) {
predicates.add(convertRangePartitionToCompare(item, colSlot));
predicates.add(convertRangePartitionToCompare(item, colExpr));
}
}
return predicates;
Expand All @@ -180,7 +221,7 @@ private static Expression convertPartitionKeyToLiteral(PartitionKey key) {
Type.fromPrimitiveType(key.getTypes().get(0)));
}

private static Expression convertListPartitionToIn(PartitionItem item, Slot col) {
private static Expression convertListPartitionToIn(PartitionItem item, Expression col) {
List<Expression> inValues = ((ListPartitionItem) item).getItems().stream()
.map(UpdateMvByPartitionCommand::convertPartitionKeyToLiteral)
.collect(ImmutableList.toImmutableList());
Expand All @@ -201,7 +242,7 @@ private static Expression convertListPartitionToIn(PartitionItem item, Slot col)
return ExpressionUtils.or(predicates);
}

private static Expression convertRangePartitionToCompare(PartitionItem item, Slot col) {
private static Expression convertRangePartitionToCompare(PartitionItem item, Expression col) {
Range<PartitionKey> range = item.getItems();
List<Expression> expressions = new ArrayList<>();
if (range.hasLowerBound() && !range.lowerEndpoint().isMinValue()) {
Expand Down
Loading
Loading