Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,9 @@ private Expression getSingleNumericSlotOrExpressionCoveredByCast(Expression expr
public Boolean visit(Plan plan, PushDownContext ctx) {
boolean pushed = false;
for (Plan child : plan.children()) {
pushed |= child.accept(this, ctx);
if (child.getOutputSet().containsAll(ctx.probeExpr.getInputSlots())) {
pushed |= child.accept(this, ctx);
}
}
return pushed;
}
Expand Down Expand Up @@ -240,6 +242,10 @@ public Boolean visitPhysicalRelation(PhysicalRelation scan, PushDownContext ctx)
@Override
public Boolean visitPhysicalHashJoin(PhysicalHashJoin<? extends Plan, ? extends Plan> join,
PushDownContext ctx) {
if (!ctx.builderNode.equals(join)
&& !join.getOutputSet().containsAll(ctx.probeExpr.getInputSlots())) {
return false;
}
boolean pushed = false;

if (ctx.builderNode instanceof PhysicalHashJoin) {
Expand Down Expand Up @@ -314,6 +320,9 @@ public Boolean visitPhysicalHashJoin(PhysicalHashJoin<? extends Plan, ? extends
@Override
public Boolean visitPhysicalNestedLoopJoin(PhysicalNestedLoopJoin<? extends Plan, ? extends Plan> join,
PushDownContext ctx) {
if (!join.getOutputSet().containsAll(ctx.probeExpr.getInputSlots())) {
return false;
}
if (ctx.builderNode instanceof PhysicalHashJoin) {
/*
hashJoin( t1.A <=> t2.A )
Expand Down Expand Up @@ -342,6 +351,9 @@ public Boolean visitPhysicalNestedLoopJoin(PhysicalNestedLoopJoin<? extends Plan

@Override
public Boolean visitPhysicalProject(PhysicalProject<? extends Plan> project, PushDownContext ctx) {
if (!project.getOutputSet().containsAll(ctx.probeExpr.getInputSlots())) {
return false;
}
// project ( A+1 as x)
// probeExpr: abs(x) => abs(A+1)
PushDownContext ctxProjectProbeExpr = ctx;
Expand Down Expand Up @@ -384,6 +396,9 @@ public Boolean visitPhysicalProject(PhysicalProject<? extends Plan> project, Pus

@Override
public Boolean visitPhysicalSetOperation(PhysicalSetOperation setOperation, PushDownContext ctx) {
if (!setOperation.getOutputSet().containsAll(ctx.probeExpr.getInputSlots())) {
return false;
}
boolean pushedDown = false;
int projIndex = -1;
Slot probeSlot = RuntimeFilterGenerator.checkTargetChild(ctx.probeExpr);
Expand Down Expand Up @@ -429,6 +444,10 @@ public Boolean visitPhysicalTopN(PhysicalTopN<? extends Plan> topN, PushDownCont

@Override
public Boolean visitPhysicalWindow(PhysicalWindow<? extends Plan> window, PushDownContext ctx) {
if (!window.getOutputSet().containsAll(ctx.probeExpr.getInputSlots())) {
return false;
}

Set<SlotReference> commonPartitionKeys = window.getCommonPartitionKeyFromWindowExpressions();
if (commonPartitionKeys.containsAll(ctx.probeExpr.getInputSlots())) {
return window.child().accept(this, ctx);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,23 @@
import org.apache.doris.nereids.datasets.ssb.SSBUtils;
import org.apache.doris.nereids.glue.translator.PhysicalPlanTranslator;
import org.apache.doris.nereids.glue.translator.PlanTranslatorContext;
import org.apache.doris.nereids.hint.DistributeHint;
import org.apache.doris.nereids.processor.post.PlanPostProcessors;
import org.apache.doris.nereids.processor.post.RuntimeFilterContext;
import org.apache.doris.nereids.trees.expressions.Alias;
import org.apache.doris.nereids.trees.expressions.EqualTo;
import org.apache.doris.nereids.trees.expressions.ExprId;
import org.apache.doris.nereids.trees.expressions.NamedExpression;
import org.apache.doris.nereids.trees.expressions.SlotReference;
import org.apache.doris.nereids.trees.expressions.literal.NullLiteral;
import org.apache.doris.nereids.trees.plans.DistributeType;
import org.apache.doris.nereids.trees.plans.JoinType;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.physical.AbstractPhysicalPlan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalHashJoin;
import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapScan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalProject;
import org.apache.doris.nereids.trees.plans.physical.RuntimeFilter;
import org.apache.doris.nereids.util.PlanChecker;

Expand All @@ -33,6 +47,7 @@
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Set;
Expand Down Expand Up @@ -344,4 +359,65 @@ public void testRuntimeFilterBlockByTopN() {
Assertions.assertEquals(0, filters.size());
}

@Test
public void testNotGenerateRfOnDanglingSlot() {
String sql = "select lo_custkey from lineorder union all select c_custkey from customer union all select p_partkey from part;";
PlanChecker checker = PlanChecker.from(connectContext)
.analyze(sql)
.rewrite()
.implement();
PhysicalPlan plan = checker.getPhysicalPlan();

/* construct plan for
join (#18=p_partkey)
-->join()
-->project(null as #18, ...)
-->lineorder
-->project(c_custkey#17)
-->customer(output: c_custkey#17, c_name#18, ...)
-->project(p_partkey#25)
-->part

test purpose:
do not generate RF by "#18=p_partkey" and apply this rf on customer
*/
PhysicalProject<Plan> projectCustomer = (PhysicalProject<Plan>) plan.child(0).child(1);
SlotReference cCustkey = (SlotReference) projectCustomer.getProjects().get(0);
PhysicalProject<Plan> projectPart = (PhysicalProject<Plan>) plan.child(0).child(2);
SlotReference pPartkey = (SlotReference) projectPart.getProjects().get(0);

PhysicalOlapScan lo = (PhysicalOlapScan) plan.child(0).child(0).child(0);
SlotReference loCustkey = (SlotReference) lo.getBaseOutputs().get(2);
SlotReference loPartkey = (SlotReference) lo.getBaseOutputs().get(3);
Alias nullAlias = new Alias(new ExprId(18), new NullLiteral(), ""); // expr#18 is used by c_name
List<NamedExpression> projList = new ArrayList<>();
projList.add(loCustkey);
projList.add(loPartkey);
projList.add(nullAlias);
PhysicalProject projLo = new PhysicalProject(projList, null, lo);

PhysicalHashJoin joinLoC = new PhysicalHashJoin(JoinType.INNER_JOIN,
ImmutableList.of(new EqualTo(loCustkey, cCustkey)),
ImmutableList.of(),
new DistributeHint(DistributeType.NONE),
Optional.empty(),
null,
projLo,
projectCustomer
);
PhysicalHashJoin joinLoCP = new PhysicalHashJoin(JoinType.INNER_JOIN,
ImmutableList.of(new EqualTo(nullAlias.toSlot(), pPartkey)),
ImmutableList.of(),
new DistributeHint(DistributeType.NONE),
Optional.empty(),
null,
joinLoC,
projectPart
);
checker.getCascadesContext().getConnectContext().getSessionVariable().enableRuntimeFilterPrune = false;
plan = new PlanPostProcessors(checker.getCascadesContext()).process(joinLoCP);
System.out.println(plan.treeString());
Assertions.assertEquals(0, ((AbstractPhysicalPlan) plan.child(0).child(1).child(0))
.getAppliedRuntimeFilters().size());
}
}