Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import org.apache.doris.nereids.trees.plans.RelationId;
import org.apache.doris.nereids.trees.plans.TableId;
import org.apache.doris.nereids.trees.plans.logical.LogicalCTEConsumer;
import org.apache.doris.nereids.trees.plans.logical.LogicalCTEProducer;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.nereids.util.RelationUtil;
import org.apache.doris.qe.ConnectContext;
Expand Down Expand Up @@ -156,7 +157,7 @@ public enum TableFrom {
private final Map<CTEId, Set<LogicalCTEConsumer>> cteIdToConsumers = new HashMap<>();
private final Map<CTEId, Set<Slot>> cteIdToOutputIds = new HashMap<>();

private final Map<CTEId, Statistics> cteIdToProducerStats = new HashMap<>();
private final Map<CTEId, LogicalCTEProducer<? extends Plan>> cteIdToProducer = new HashMap<>();

private final Map<RelationId, Set<Expression>> consumerIdToFilters = new HashMap<>();
// Used to update consumer's stats
Expand Down Expand Up @@ -638,7 +639,7 @@ public CteEnvironmentSnapshot cacheCteEnvironment() {
return new CteEnvironmentSnapshot(
copyMapOfSets(cteIdToConsumers),
copyMapOfSets(cteIdToOutputIds),
new HashMap<>(cteIdToProducerStats),
new HashMap<>(cteIdToProducer),
copyMapOfSets(consumerIdToFilters),
copyMapOfLists(cteIdToConsumerGroup),
new HashMap<>(rewrittenCteProducer),
Expand All @@ -653,8 +654,8 @@ public void restoreCteEnvironment(CteEnvironmentSnapshot snapshot) {
cteIdToOutputIds.clear();
cteIdToOutputIds.putAll(snapshot.cteIdToOutputIds);

cteIdToProducerStats.clear();
cteIdToProducerStats.putAll(snapshot.cteIdToProducerStats);
cteIdToProducer.clear();
cteIdToProducer.putAll(snapshot.cteIdToProducer);

consumerIdToFilters.clear();
consumerIdToFilters.putAll(snapshot.consumerIdToFilters);
Expand Down Expand Up @@ -689,7 +690,7 @@ private static <K, V> Map<K, List<V>> copyMapOfLists(Map<K, List<V>> source) {
public static class CteEnvironmentSnapshot {
private final Map<CTEId, Set<LogicalCTEConsumer>> cteIdToConsumers;
private final Map<CTEId, Set<Slot>> cteIdToOutputIds;
private final Map<CTEId, Statistics> cteIdToProducerStats;
private final Map<CTEId, LogicalCTEProducer<? extends Plan>> cteIdToProducer;
private final Map<RelationId, Set<Expression>> consumerIdToFilters;
private final Map<CTEId, List<Pair<Multimap<Slot, Slot>, Group>>> cteIdToConsumerGroup;
private final Map<CTEId, LogicalPlan> rewrittenCteProducer;
Expand All @@ -701,14 +702,14 @@ public static class CteEnvironmentSnapshot {
public CteEnvironmentSnapshot(
Map<CTEId, Set<LogicalCTEConsumer>> cteIdToConsumers,
Map<CTEId, Set<Slot>> cteIdToOutputIds,
Map<CTEId, Statistics> cteIdToProducerStats,
Map<CTEId, LogicalCTEProducer<? extends Plan>> cteIdToProducer,
Map<RelationId, Set<Expression>> consumerIdToFilters,
Map<CTEId, List<Pair<Multimap<Slot, Slot>, Group>>> cteIdToConsumerGroup,
Map<CTEId, LogicalPlan> rewrittenCteProducer,
Map<CTEId, LogicalPlan> rewrittenCteConsumer) {
this.cteIdToConsumers = cteIdToConsumers;
this.cteIdToOutputIds = cteIdToOutputIds;
this.cteIdToProducerStats = cteIdToProducerStats;
this.cteIdToProducer = cteIdToProducer;
this.consumerIdToFilters = consumerIdToFilters;
this.cteIdToConsumerGroup = cteIdToConsumerGroup;
this.rewrittenCteProducer = rewrittenCteProducer;
Expand Down Expand Up @@ -1111,12 +1112,12 @@ public boolean isPrepareStage() {
return prepareStage;
}

public Statistics getProducerStatsByCteId(CTEId id) {
return cteIdToProducerStats.get(id);
public LogicalCTEProducer<? extends Plan> getCteProducerByCteId(CTEId id) {
return cteIdToProducer.get(id);
}

public void setProducerStats(CTEId id, Statistics stats) {
cteIdToProducerStats.put(id, stats);
public void setCteProducer(CTEId id, LogicalCTEProducer<? extends Plan> producer) {
cteIdToProducer.put(id, producer);
}

public void setIsInsert(boolean isInsert) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,9 @@ private Pair<CTEContext, List<LogicalCTEProducer<Plan>>> analyzeCte(
aliasQuery.withChildren(ImmutableList.of(analyzedCtePlan));
outerCteCtx = new CTEContext(cteId, logicalSubQueryAlias, outerCteCtx);
outerCteCtx.setAnalyzedPlan(logicalSubQueryAlias);
cteProducerPlans.add(new LogicalCTEProducer<>(cteId, logicalSubQueryAlias));
LogicalCTEProducer<Plan> cteProducer = new LogicalCTEProducer<>(cteId, logicalSubQueryAlias);
cascadesContext.getStatementContext().setCteProducer(cteId, cteProducer);
cteProducerPlans.add(cteProducer);
}
return Pair.of(outerCteCtx, cteProducerPlans);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,10 +141,12 @@ public Plan visitLogicalJoin(LogicalJoin<? extends Plan, ? extends Plan> join, O
.deepCopy((LogicalPlan) join.left(), new DeepCopierContext());
LogicalCTEProducer<? extends Plan> leftProducer = new LogicalCTEProducer<>(
ctx.statementContext.getNextCTEId(), leftClone);
ctx.statementContext.setCteProducer(leftProducer.getCteId(), leftProducer);
LogicalPlan rightClone = LogicalPlanDeepCopier.INSTANCE
.deepCopy((LogicalPlan) join.right(), new DeepCopierContext());
LogicalCTEProducer<? extends Plan> rightProducer = new LogicalCTEProducer<>(
ctx.statementContext.getNextCTEId(), rightClone);
ctx.statementContext.setCteProducer(rightProducer.getCteId(), rightProducer);
Map<Slot, Slot> leftCloneToLeft = new HashMap<>();
for (int i = 0; i < leftClone.getOutput().size(); i++) {
leftCloneToLeft.put(leftClone.getOutput().get(i), (join.left()).getOutput().get(i));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,10 @@
import java.util.stream.Collectors;

/**
* rewrite CteAnchor consumer side and producer side recursively, all CteAnchor must be at the top of the plan
* rewrite CteAnchor consumer side and producer side recursively, all CteAnchor
* must be at the top of the plan
*/
@DependsRules({PullUpCteAnchor.class, CTEInline.class})
@DependsRules({ PullUpCteAnchor.class, CTEInline.class })
public class RewriteCteChildren extends DefaultPlanRewriter<CascadesContext> implements CustomRewriter {

private final List<RewriteJob> jobs;
Expand Down Expand Up @@ -127,8 +128,8 @@ public Plan visitLogicalCTEProducer(LogicalCTEProducer<? extends Plan> cteProduc
Set<Slot> producerOutputs = cascadesContext.getStatementContext()
.getCteIdToOutputIds().get(cteProducer.getCteId());
if (producerOutputs.size() < child.getOutput().size()) {
ImmutableList.Builder<NamedExpression> projectsBuilder
= ImmutableList.builderWithExpectedSize(producerOutputs.size());
ImmutableList.Builder<NamedExpression> projectsBuilder = ImmutableList
.builderWithExpectedSize(producerOutputs.size());
for (Slot slot : child.getOutput()) {
if (producerOutputs.contains(slot)) {
projectsBuilder.add(slot);
Expand All @@ -147,7 +148,10 @@ public Plan visitLogicalCTEProducer(LogicalCTEProducer<? extends Plan> cteProduc
cascadesContext.addPlanProcesses(rewrittenCtx.getPlanProcesses());
cascadesContext.getStatementContext().getRewrittenCteProducer().put(cteProducer.getCteId(), child);
}
return cteProducer.withChildren(child);
LogicalCTEProducer<? extends Plan> rewrittenProducer = (LogicalCTEProducer<? extends Plan>) cteProducer
.withChildren(child);
cascadesContext.getStatementContext().setCteProducer(rewrittenProducer.getCteId(), rewrittenProducer);
return rewrittenProducer;
}

private LogicalPlan pushPlanUnderAnchor(LogicalPlan plan) {
Expand All @@ -160,13 +164,16 @@ private LogicalPlan pushPlanUnderAnchor(LogicalPlan plan) {
}

/*
* An expression can only be pushed down if it has filter expressions on all consumers that reference the slot.
* For example, let's assume a producer has two consumers, consumer1 and consumer2:
* An expression can only be pushed down if it has filter expressions on all
* consumers that reference the slot.
* For example, let's assume a producer has two consumers, consumer1 and
* consumer2:
*
* filter(a > 5 and b < 1) -> consumer1
* filter(a < 8) -> consumer2
*
* In this case, the only expression that can be pushed down to the producer is filter(a > 5 or a < 8).
* In this case, the only expression that can be pushed down to the producer is
* filter(a > 5 or a < 8).
*/
private LogicalPlan tryToConstructFilter(CascadesContext cascadesContext, CTEId cteId, LogicalPlan child) {
Set<RelationId> consumerIds = cascadesContext.getCteIdToConsumers().get(cteId).stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package org.apache.doris.nereids.rules.rewrite;

import org.apache.doris.nereids.rules.rewrite.DistinctAggStrategySelector.DistinctSelectorContext;
import org.apache.doris.nereids.rules.rewrite.StatsDerive.DeriveContext;
import org.apache.doris.nereids.trees.copier.DeepCopierContext;
import org.apache.doris.nereids.trees.copier.LogicalPlanDeepCopier;
import org.apache.doris.nereids.trees.expressions.Alias;
Expand Down Expand Up @@ -61,8 +60,7 @@ public static Plan rewrite(LogicalAggregate<? extends Plan> agg, DistinctSelecto
LogicalCTEProducer<Plan> producer = new LogicalCTEProducer<>(ctx.statementContext.getNextCTEId(),
cloneAgg.child());
ctx.cteProducerList.add(producer);
StatsDerive derive = new StatsDerive(false);
producer.accept(derive, new DeriveContext());
ctx.statementContext.setCteProducer(producer.getCteId(), producer);

Map<Slot, Slot> originToProducerSlot = new HashMap<>();
for (int i = 0; i < agg.child().getOutput().size(); ++i) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,8 @@ public class StatsDerive extends PlanVisitor<Statistics, StatsDerive.DeriveConte
private final boolean deepDerive;

/**
* context
*/
* context
*/
public static class DeriveContext {
// Calculator carried by the context that is handed to every visit method.
// It is constructed with a null argument here; NOTE(review): confirm what the
// null stands for in StatsCalculator's constructor.
StatsCalculator calculator = new StatsCalculator(null);
}
Expand Down Expand Up @@ -351,20 +351,28 @@ public Statistics visitLogicalWindow(LogicalWindow<? extends Plan> window, Deriv

/**
 * Derives statistics for a CTE producer by visiting its child with this visitor.
 *
 * @param cteProducer the producer node whose child plan is visited
 * @param context derive context, forwarded unchanged to the child visit
 * @return statistics built from the child's statistics via StatisticsBuilder,
 *         with widthInJoinCluster set to 1
 */
@Override
public Statistics visitLogicalCTEProducer(LogicalCTEProducer<? extends Plan> cteProducer, DeriveContext context) {
// Fallback registration: ensure cteId -> producer mapping is always available
// even if some upstream rewrite path misses explicit registration.
ConnectContext.get().getStatementContext().setCteProducer(cteProducer.getCteId(), cteProducer);
// Visit the child first; the producer's statistics are built from the child's result.
Statistics prodStats = cteProducer.child().accept(this, context);
StatisticsBuilder builder = new StatisticsBuilder(prodStats);
builder.setWidthInJoinCluster(1);
Statistics stats = builder.build();
// Store the result on the producer node itself in addition to returning it.
cteProducer.setStatistics(stats);
// NOTE(review): this also records the stats in the statement context keyed by CTE id;
// confirm this call is still intended alongside the setCteProducer registration above.
ConnectContext.get().getStatementContext().setProducerStats(cteProducer.getCteId(), stats);
return stats;
}

@Override
public Statistics visitLogicalCTEConsumer(LogicalCTEConsumer cteConsumer, DeriveContext context) {
CTEId cteId = cteConsumer.getCteId();
Statistics prodStats = ConnectContext.get().getStatementContext().getProducerStatsByCteId(cteId);
Preconditions.checkArgument(prodStats != null, String.format("Stats for CTE: %s not found", cteId));
LogicalCTEProducer<? extends Plan> cteProducer = ConnectContext.get().getStatementContext()
.getCteProducerByCteId(cteId);
Preconditions.checkState(cteProducer != null,
String.format("CTE producer for CTE: %s not found", cteId));
Statistics prodStats = cteProducer.getStats();
if (prodStats == null || deepDerive) {
prodStats = cteProducer.accept(this, context);
}
Statistics consumerStats = new Statistics(prodStats.getRowCount(), 1, new HashMap<>());
for (Slot slot : cteConsumer.getOutput()) {
Slot prodSlot = cteConsumer.getProducerSlot(slot);
Expand Down Expand Up @@ -404,8 +412,3 @@ public Statistics visitLogicalRelation(LogicalRelation relation, DeriveContext c
return stats;
}
}





Loading