Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.doris.nereids.rules.exploration.mv;

import org.apache.doris.catalog.MTMV;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Id;
import org.apache.doris.common.Pair;
Expand All @@ -44,7 +45,9 @@
import org.apache.doris.nereids.rules.rewrite.MergeProjectable;
import org.apache.doris.nereids.trees.expressions.ComparisonPredicate;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.IsNull;
import org.apache.doris.nereids.trees.expressions.NamedExpression;
import org.apache.doris.nereids.trees.expressions.Not;
import org.apache.doris.nereids.trees.expressions.Slot;
import org.apache.doris.nereids.trees.expressions.SlotReference;
import org.apache.doris.nereids.trees.expressions.functions.scalar.DateTrunc;
Expand Down Expand Up @@ -821,21 +824,28 @@ protected SplitPredicate predicatesCompensate(
Set<Set<Slot>> requireNoNullableViewSlot = comparisonResult.getViewNoNullableSlot();
// check query is use the null reject slot which view comparison need
if (!requireNoNullableViewSlot.isEmpty()) {
// Required null-reject slots are recorded on the view side. Map query slots to view slots
// before checking whether query predicates or INNER JoinEdges can reject those null rows.
SlotMapping queryToViewMapping = viewToQuerySlotMapping.inverse();
// try to use
boolean valid = containsNullRejectSlot(requireNoNullableViewSlot,
queryStructInfo.getPredicates().getPulledUpPredicates(), queryToViewMapping, queryStructInfo,
viewStructInfo, cascadesContext);
if (!valid) {
Optional<Set<Expression>> queryBasedNullRejectCompensationPredicates =
getQueryBasedNullRejectCompensationPredicates(
requireNoNullableViewSlot,
queryStructInfo.getPredicates().getPulledUpPredicates(), queryToViewMapping,
queryStructInfo, viewStructInfo, viewToQuerySlotMapping, cascadesContext);
if (!queryBasedNullRejectCompensationPredicates.isPresent()) {
queryStructInfo = queryStructInfo.withPredicates(queryStructInfo.getPredicates()
.mergePulledUpPredicates(comparisonResult.getQueryAllPulledUpExpressions()));
valid = containsNullRejectSlot(requireNoNullableViewSlot,
queryStructInfo.getPredicates().getPulledUpPredicates(), queryToViewMapping,
queryStructInfo, viewStructInfo, cascadesContext);
queryBasedNullRejectCompensationPredicates = getQueryBasedNullRejectCompensationPredicates(
requireNoNullableViewSlot, queryStructInfo.getPredicates().getPulledUpPredicates(),
queryToViewMapping, queryStructInfo, viewStructInfo, viewToQuerySlotMapping, cascadesContext);
}
if (!valid) {
if (!queryBasedNullRejectCompensationPredicates.isPresent()) {
return SplitPredicate.INVALID_INSTANCE;
}
if (!queryBasedNullRejectCompensationPredicates.get().isEmpty()) {
queryStructInfo = queryStructInfo.withPredicates(queryStructInfo.getPredicates()
.mergePulledUpPredicates(queryBasedNullRejectCompensationPredicates.get()));
}
}
// compensate couldNot PulledUp Conjunctions
Map<Expression, ExpressionInfo> couldNotPulledUpCompensateConjunctions =
Expand Down Expand Up @@ -863,55 +873,148 @@ protected SplitPredicate predicatesCompensate(
}

/**
* Check the queryPredicates contains the required nullable slot
* Check whether query-side null-reject evidence covers each required view-side slot set.
*
* <p>The check is view-based because the required null-reject slots come from the MV join graph.
* The returned compensation predicates are query-based because they will be merged into queryStructInfo.
*
* <p>Return meanings:
* Optional.empty(): no valid proof, or no safe output slot can carry the compensation predicate.
* Optional.of(emptySet()): existing query predicates already provide the required null-reject.
* Optional.of(nonEmptySet): INNER JoinEdge proof must be materialized as these IS NOT NULL predicates.
*/
private boolean containsNullRejectSlot(Set<Set<Slot>> requireNoNullableViewSlot,
private Optional<Set<Expression>> getQueryBasedNullRejectCompensationPredicates(
Set<Set<Slot>> requireNoNullableViewSlot,
Set<Expression> queryPredicates,
SlotMapping queryToViewMapping,
StructInfo queryStructInfo,
StructInfo viewStructInfo,
SlotMapping viewToQueryMapping,
CascadesContext cascadesContext) {
Set<Slot> queryNullRejectSlots = new HashSet<>();
Set<Slot> predicateNullRejectViewSlots = getViewBasedNullRejectSlots(
getPredicateNullRejectSlots(queryPredicates, cascadesContext), queryToViewMapping, queryStructInfo);
Set<Slot> innerJoinNullRejectViewSlots = getViewBasedNullRejectSlots(
getInnerJoinNullRejectSlots(queryStructInfo, cascadesContext), queryToViewMapping, queryStructInfo);
Set<Slot> allNullRejectViewSlots = new HashSet<>(predicateNullRejectViewSlots);
allNullRejectViewSlots.addAll(innerJoinNullRejectViewSlots);
if (allNullRejectViewSlots.isEmpty()) {
return Optional.empty();
}
Set<Slot> viewOutputSlots = viewStructInfo.getPlanOutputShuttledExpressions().stream()
.filter(Slot.class::isInstance)
.map(Slot.class::cast)
.collect(Collectors.toSet());
Map<SlotReference, SlotReference> viewToQuerySlotReferenceMap = viewToQueryMapping.toSlotReferenceMap();
Set<Expression> compensationPredicates = new HashSet<>();
for (Set<Slot> requiredViewSlots : getShuttledRequireNoNullableViewSlots(
requireNoNullableViewSlot, viewStructInfo)) {
if (Sets.intersection(requiredViewSlots, allNullRejectViewSlots).isEmpty()) {
return Optional.empty();
}
if (!Sets.intersection(requiredViewSlots, predicateNullRejectViewSlots).isEmpty()) {
continue;
}
Optional<Slot> compensationViewSlot = findCompensationViewSlot(
requiredViewSlots, viewOutputSlots, innerJoinNullRejectViewSlots);
if (!compensationViewSlot.isPresent()) {
return Optional.empty();
}
Slot querySlot = viewToQuerySlotReferenceMap.get(compensationViewSlot.get());
if (querySlot == null) {
return Optional.empty();
}
compensationPredicates.add(new Not(new IsNull(querySlot), false));
}
return Optional.of(compensationPredicates);
}

private Set<Slot> getPredicateNullRejectSlots(Set<Expression> queryPredicates, CascadesContext cascadesContext) {
Set<Slot> nullRejectSlots = new HashSet<>();
for (Expression queryPredicate : queryPredicates) {
Optional<Slot> explicitNotNullSlot = TypeUtils.isNotNull(queryPredicate);
explicitNotNullSlot.ifPresent(queryNullRejectSlots::add);
TypeUtils.isNotNull(queryPredicate).ifPresent(nullRejectSlots::add);
}
Set<Expression> queryNullRejectPredicates = ExpressionUtils.inferNotNull(queryPredicates, cascadesContext);
for (Expression queryNullRejectPredicate : queryNullRejectPredicates) {
Optional<Slot> notNullSlot = TypeUtils.isNotNull(queryNullRejectPredicate);
notNullSlot.ifPresent(queryNullRejectSlots::add);
for (Expression inferredNotNull : ExpressionUtils.inferNotNull(queryPredicates, cascadesContext)) {
TypeUtils.isNotNull(inferredNotNull).ifPresent(nullRejectSlots::add);
}
return nullRejectSlots;
}

private Set<Slot> getInnerJoinNullRejectSlots(StructInfo queryStructInfo, CascadesContext cascadesContext) {
Set<Slot> nullRejectSlots = new HashSet<>();
// INNER JOIN conditions guarantee NOT NULL on join-key slots.
// After EliminateOuterJoin converts LEFTINNER, the JoinEdge objects in the HyperGraph
// After EliminateOuterJoin converts LEFT to INNER, the JoinEdge objects in the HyperGraph
// retain the INNER type even though EliminateNotNull removes filter-level NOT NULL predicates.
for (JoinEdge joinEdge : queryStructInfo.getHyperGraph().getJoinEdges()) {
if (joinEdge.getJoinType().isInnerJoin()) {
queryNullRejectSlots.addAll(ExpressionUtils.inferNotNullSlots(
nullRejectSlots.addAll(ExpressionUtils.inferNotNullSlots(
ImmutableSet.copyOf(joinEdge.getExpressions()), cascadesContext));
}
}
if (queryNullRejectSlots.isEmpty()) {
return false;
return nullRejectSlots;
}

private Set<Slot> getViewBasedNullRejectSlots(Set<Slot> queryNullRejectSlots,
SlotMapping queryToViewMapping, StructInfo queryStructInfo) {
Set<Slot> viewBasedSlots = new HashSet<>();
for (Slot queryNullRejectSlot : queryNullRejectSlots) {
Expression shuttledQuerySlot = ExpressionUtils.shuttleExpressionWithLineage(
queryNullRejectSlot, queryStructInfo.getTopPlan());
if (!(shuttledQuerySlot instanceof Slot)) {
continue;
}
Expression viewSlot = ExpressionUtils.replace(shuttledQuerySlot,
queryToViewMapping.toSlotReferenceMap());
if (viewSlot instanceof Slot) {
viewBasedSlots.add((Slot) viewSlot);
}
}
Set<Slot> queryUsedNeedRejectNullSlotsViewBased = ExpressionUtils.shuttleExpressionWithLineage(
new ArrayList<>(queryNullRejectSlots), queryStructInfo.getTopPlan()).stream()
.filter(Slot.class::isInstance)
.map(Slot.class::cast)
.map(slot -> ExpressionUtils.replace(slot, queryToViewMapping.toSlotReferenceMap()))
.filter(Slot.class::isInstance)
.map(Slot.class::cast)
.collect(Collectors.toSet());
// view slot need shuttle to use table slot, avoid alias influence
return viewBasedSlots;
}

private Set<Set<Slot>> getShuttledRequireNoNullableViewSlots(Set<Set<Slot>> requireNoNullableViewSlot,
StructInfo viewStructInfo) {
Set<Set<Slot>> shuttledRequireNoNullableViewSlot = new HashSet<>();
for (Set<Slot> requireNullableSlots : requireNoNullableViewSlot) {
shuttledRequireNoNullableViewSlot.add(
ExpressionUtils.shuttleExpressionWithLineage(new ArrayList<>(requireNullableSlots),
viewStructInfo.getTopPlan()).stream().map(Slot.class::cast)
.collect(Collectors.toSet()));
}
// query pulledUp predicates should have null reject predicates and contains any require noNullable slot
return shuttledRequireNoNullableViewSlot.stream().noneMatch(viewRequiredNullSlotSet ->
Sets.intersection(viewRequiredNullSlotSet, queryUsedNeedRejectNullSlotsViewBased).isEmpty());
return shuttledRequireNoNullableViewSlot;
}

private Optional<Slot> findCompensationViewSlot(Set<Slot> requiredViewSlots, Set<Slot> viewOutputSlots,
Set<Slot> innerJoinNullRejectViewSlots) {
Set<Slot> outputRequiredSlots = Sets.intersection(requiredViewSlots, viewOutputSlots);
Optional<Slot> compensationViewSlot = outputRequiredSlots.stream()
.filter(innerJoinNullRejectViewSlots::contains)
.findFirst();
if (compensationViewSlot.isPresent()) {
return compensationViewSlot;
}
return outputRequiredSlots.stream()
.filter(slot -> isOriginalNonNullableSlotOnInnerJoinProofTable(slot, innerJoinNullRejectViewSlots))
.findFirst();
}

private boolean isOriginalNonNullableSlotOnInnerJoinProofTable(Slot slot, Set<Slot> innerJoinNullRejectViewSlots) {
if (!(slot instanceof SlotReference)) {
return false;
}
SlotReference slotReference = (SlotReference) slot;
if (!slotReference.getOriginalColumn().map(column -> !column.isAllowNull()).orElse(!slot.nullable())) {
return false;
}
Optional<TableIf> originalTable = slotReference.getOriginalTable();
if (!originalTable.isPresent()) {
return false;
}
return innerJoinNullRejectViewSlots.stream()
.filter(SlotReference.class::isInstance)
.map(SlotReference.class::cast)
.map(SlotReference::getOriginalTable)
.anyMatch(referenceTable -> referenceTable.isPresent()
&& referenceTable.get().equals(originalTable.get()));
}

/**
Expand Down
Loading
Loading