Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.doris.nereids.memo.Memo;
import org.apache.doris.nereids.processor.post.RuntimeFilterContext;
import org.apache.doris.nereids.processor.post.TopnFilterContext;
import org.apache.doris.nereids.processor.post.runtimefilterv2.RuntimeFilterContextV2;
import org.apache.doris.nereids.properties.PhysicalProperties;
import org.apache.doris.nereids.rules.RuleFactory;
import org.apache.doris.nereids.rules.RuleSet;
Expand Down Expand Up @@ -103,6 +104,7 @@ public class CascadesContext implements ScheduleContext {
// subqueryExprIsAnalyzed: whether the subquery has been analyzed.
private final Map<SubqueryExpr, Boolean> subqueryExprIsAnalyzed;
private final RuntimeFilterContext runtimeFilterContext;
private final RuntimeFilterContextV2 runtimeFilterV2Context;
private final TopnFilterContext topnFilterContext = new TopnFilterContext();
private Optional<Scope> outerScope = Optional.empty();

Expand Down Expand Up @@ -157,6 +159,7 @@ private CascadesContext(Optional<CascadesContext> parent, Optional<CTEId> curren
IdGenerator<RuntimeFilterId> runtimeFilterIdGen = RuntimeFilterId.createGenerator();
this.runtimeFilterContext = new RuntimeFilterContext(getConnectContext().getSessionVariable(),
runtimeFilterIdGen);
this.runtimeFilterV2Context = new RuntimeFilterContextV2(runtimeFilterIdGen);
this.materializationContexts = new HashMap<>();
if (statementContext.getConnectContext() != null) {
ConnectContext connectContext = statementContext.getConnectContext();
Expand Down Expand Up @@ -606,4 +609,8 @@ public boolean isEnableExprTrace() {
public boolean rewritePlanContainsTypes(Class<?>... types) {
return getRewritePlan().containsType(types);
}

public RuntimeFilterContextV2 getRuntimeFilterV2Context() {
return runtimeFilterV2Context;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1108,7 +1108,10 @@ public void appendTupleInfo(StringBuilder str) {

@Override
public List<RuntimeFilter> getRuntimeFilters() {
return new ArrayList<>(cascadesContext.getRuntimeFilterContext().getLegacyFilters());
ArrayList<RuntimeFilter> runtimeFilters = new ArrayList<>();
runtimeFilters.addAll(cascadesContext.getRuntimeFilterContext().getLegacyFilters());
runtimeFilters.addAll(cascadesContext.getRuntimeFilterV2Context().getLegacyFilters());
return runtimeFilters;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
import org.apache.doris.fs.FileSystemDirectoryLister;
import org.apache.doris.fs.TransactionScopeCachingDirectoryListerFactory;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.nereids.processor.post.runtimefilterv2.RuntimeFilterV2;
import org.apache.doris.nereids.properties.DistributionSpec;
import org.apache.doris.nereids.properties.DistributionSpecAllSingleton;
import org.apache.doris.nereids.properties.DistributionSpecAny;
Expand Down Expand Up @@ -1007,6 +1008,14 @@ private void translateRuntimeFilter(PhysicalRelation physicalRelation, ScanNode
}
}

// translate rf v2 target
List<RuntimeFilterV2> rfV2s = context.getRuntimeFilterV2Context()
.getRuntimeFilterV2ByTargetPlan(physicalRelation);
for (RuntimeFilterV2 rfV2 : rfV2s) {
Expr targetExpr = rfV2.getTargetExpression().accept(ExpressionTranslator.INSTANCE, context);
rfV2.setLegacyTargetNode(scanNode);
rfV2.setLegacyTargetExpr(targetExpr);
}
context.getTopnFilterContext().translateTarget(physicalRelation, scanNode, context);
}

Expand Down Expand Up @@ -1774,9 +1783,8 @@ public PlanFragment visitPhysicalHashJoin(
.collect(Collectors.toMap(Slot::getExprId, s -> s, (existing, replacement) -> existing));

// translate runtime filter
context.getRuntimeTranslator().ifPresent(runtimeFilterTranslator ->
runtimeFilterTranslator.createLegacyRuntimeFilters(
physicalHashJoin.getRuntimeFilters(), hashJoinNode, context));
context.getRuntimeTranslator().ifPresent(runtimeFilterTranslator -> physicalHashJoin.getRuntimeFilters()
.forEach(filter -> runtimeFilterTranslator.createLegacyRuntimeFilter(filter, hashJoinNode, context)));

// make intermediate tuple
List<SlotDescriptor> leftIntermediateSlotDescriptor = Lists.newArrayList();
Expand Down Expand Up @@ -1954,7 +1962,8 @@ public PlanFragment visitPhysicalNestedLoopJoin(
// translate runtime filter
context.getRuntimeTranslator().ifPresent(runtimeFilterTranslator -> {
List<RuntimeFilter> filters = nestedLoopJoin.getRuntimeFilters();
runtimeFilterTranslator.createLegacyRuntimeFilters(filters, nestedLoopJoinNode, context);
filters.forEach(filter -> runtimeFilterTranslator
.createLegacyRuntimeFilter(filter, nestedLoopJoinNode, context));
if (filters.stream().anyMatch(filter -> filter.getType() == TRuntimeFilterType.BITMAP)) {
nestedLoopJoinNode.setOutputLeftSideOnly(true);
}
Expand Down Expand Up @@ -2489,18 +2498,22 @@ public PlanFragment visitPhysicalSetOperation(
@Override
public PlanFragment visitPhysicalIntersect(PhysicalIntersect intersect, PlanTranslatorContext context) {
PlanFragment fragment = visitPhysicalSetOperation(intersect, context);
context.getRuntimeTranslator().ifPresent(runtimeFilterTranslator ->
runtimeFilterTranslator.createLegacyRuntimeFilters(
intersect.getRuntimeFilters(), fragment.getPlanRoot(), context));
RunTimeFilterTranslatorV2.INSTANCE.createLegacyRuntimeFilters(
fragment.getPlanRoot(),
intersect.getRuntimeFiltersV2(),
context);

return fragment;
}

@Override
public PlanFragment visitPhysicalExcept(PhysicalExcept except, PlanTranslatorContext context) {
PlanFragment fragment = visitPhysicalSetOperation(except, context);
context.getRuntimeTranslator().ifPresent(runtimeFilterTranslator ->
runtimeFilterTranslator.createLegacyRuntimeFilters(
except.getRuntimeFilters(), fragment.getPlanRoot(), context));
RunTimeFilterTranslatorV2.INSTANCE.createLegacyRuntimeFilters(
fragment.getPlanRoot(),
except.getRuntimeFiltersV2(),
context);

return fragment;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.doris.nereids.CascadesContext;
import org.apache.doris.nereids.StatementContext;
import org.apache.doris.nereids.processor.post.TopnFilterContext;
import org.apache.doris.nereids.processor.post.runtimefilterv2.RuntimeFilterContextV2;
import org.apache.doris.nereids.trees.expressions.CTEId;
import org.apache.doris.nereids.trees.expressions.ExprId;
import org.apache.doris.nereids.trees.expressions.SlotReference;
Expand All @@ -41,6 +42,7 @@
import org.apache.doris.planner.PlanFragmentId;
import org.apache.doris.planner.PlanNode;
import org.apache.doris.planner.PlanNodeId;
import org.apache.doris.planner.RuntimeFilterId;
import org.apache.doris.planner.ScanContext;
import org.apache.doris.planner.ScanNode;
import org.apache.doris.qe.ConnectContext;
Expand Down Expand Up @@ -108,6 +110,7 @@ public class PlanTranslatorContext {
private final Map<RelationId, TPushAggOp> tablePushAggOp = Maps.newHashMap();

private final Map<ScanNode, Set<SlotId>> statsUnknownColumnsMap = Maps.newHashMap();
private final RuntimeFilterContextV2 runtimeFilterV2Context;

private boolean isTopMaterializeNode = true;

Expand All @@ -124,6 +127,7 @@ public PlanTranslatorContext(CascadesContext ctx) {
.build();
this.translator = new RuntimeFilterTranslator(ctx.getRuntimeFilterContext());
this.topnFilterContext = ctx.getTopnFilterContext();
this.runtimeFilterV2Context = ctx.getRuntimeFilterV2Context();
this.descTable = new DescriptorTable();
}

Expand All @@ -138,6 +142,7 @@ public PlanTranslatorContext(CascadesContext ctx, DescriptorTable descTable) {
.build();
this.translator = new RuntimeFilterTranslator(ctx.getRuntimeFilterContext());
this.topnFilterContext = ctx.getTopnFilterContext();
this.runtimeFilterV2Context = ctx.getRuntimeFilterV2Context();
this.descTable = descTable;
}

Expand All @@ -151,6 +156,8 @@ public PlanTranslatorContext() {
this.scanContext = ScanContext.EMPTY;
this.translator = null;
this.topnFilterContext = new TopnFilterContext();
IdGenerator<RuntimeFilterId> runtimeFilterIdGen = RuntimeFilterId.createGenerator();
this.runtimeFilterV2Context = new RuntimeFilterContextV2(runtimeFilterIdGen);
this.descTable = new DescriptorTable();
}

Expand Down Expand Up @@ -370,6 +377,10 @@ public void setTopMaterializeNode(boolean topMaterializeNode) {
isTopMaterializeNode = topMaterializeNode;
}

public RuntimeFilterContextV2 getRuntimeFilterV2Context() {
return runtimeFilterV2Context;
}

public Set<SlotId> getVirtualColumnIds() {
return virtualColumnIds;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.nereids.glue.translator;

import org.apache.doris.analysis.CastExpr;
import org.apache.doris.analysis.Expr;
import org.apache.doris.nereids.processor.post.runtimefilterv2.RuntimeFilterV2;
import org.apache.doris.nereids.trees.expressions.Cast;
import org.apache.doris.nereids.types.DataType;
import org.apache.doris.planner.PlanNode;
import org.apache.doris.planner.RuntimeFilter;
import org.apache.doris.planner.RuntimeFilter.RuntimeFilterTarget;
import org.apache.doris.qe.ConnectContext;

import com.google.common.collect.Lists;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/**
* RunTimeFilterTranslatorV2
*/
public class RunTimeFilterTranslatorV2 {
public static RunTimeFilterTranslatorV2 INSTANCE = new RunTimeFilterTranslatorV2();

/**
* createLegacyRuntimeFilters
*/
public void createLegacyRuntimeFilters(PlanNode sourceNode,
List<RuntimeFilterV2> filters, PlanTranslatorContext ctx) {
List<RuntimeFilterV2> filtersToTranslate = Lists.newArrayList(filters);
Set<Integer> ignoreRuntimeFilterIds = ConnectContext.get() != null
? ConnectContext.get().getSessionVariable().getIgnoredRuntimeFilterIds()
: new HashSet<>();
while (!filtersToTranslate.isEmpty()) {
List<RuntimeFilterV2> translateRound = Lists.newArrayListWithCapacity(filtersToTranslate.size());
List<RuntimeFilterV2> otherRound = Lists.newArrayListWithCapacity(filtersToTranslate.size());
RuntimeFilterV2 head = filtersToTranslate.get(0);
if (!ignoreRuntimeFilterIds.contains(head.getRuntimeFilterId().asInt())) {
translateRound.add(head);
}
for (int i = 1; i < filtersToTranslate.size(); i++) {
if (!ignoreRuntimeFilterIds.contains(filtersToTranslate.get(i).getRuntimeFilterId().asInt())) {
if (head.getSourceExpression().equals(filtersToTranslate.get(i).getSourceExpression())
&& head.getType() == filtersToTranslate.get(i).getType()) {
translateRound.add(filtersToTranslate.get(i));
} else {
otherRound.add(filtersToTranslate.get(i));
}
}
}
if (!translateRound.isEmpty()) {
translateRuntimeFilterGroup(sourceNode, translateRound, ctx);
}
filtersToTranslate = otherRound;
}

}

/**
* a group of RFs if their source and type are the same, but their targets are
* different.
* example:
* rf1[bloom](a->T1.b) rf2[bloom](a->T2.c) rf3[min_max](a->t3.d)
* rf1 and rf2 are in one group, but rf3 is not
*
*/
private void translateRuntimeFilterGroup(PlanNode sourceNode,
List<RuntimeFilterV2> filters, PlanTranslatorContext ctx) {
if (filters.isEmpty()) {
return;
}

RuntimeFilterV2 head = filters.get(0);

Expr srcExpr = ExpressionTranslator.translate(head.getSourceExpression(), ctx);

List<RuntimeFilterTarget> targets = new ArrayList<>();
for (RuntimeFilterV2 filter : filters) {
Expr targetExpr = filter.getLegacyTargetExpr();
if (!srcExpr.getType().equals(targetExpr.getType())) {
targetExpr = new CastExpr(srcExpr.getType(), targetExpr, Cast.castNullable(srcExpr.isNullable(),
DataType.fromCatalogType(srcExpr.getType()), DataType.fromCatalogType(targetExpr.getType())));
}
targets.add(new RuntimeFilterTarget(filter.getLegacyTargetNode(), targetExpr));
}

RuntimeFilter legacyFilter = new RuntimeFilter(
head.getId(),
sourceNode,
srcExpr,
head.getExprOrder(),
targets,
head.getType(),
head.getBuildNdvOrRowCount(),
head.getTMinMaxRuntimeFilterType());

ctx.getRuntimeFilterV2Context().addLegacyRuntimeFilter(legacyFilter);

// finalize
legacyFilter.assignToPlanNodes();
legacyFilter.extractTargetsPosition();
legacyFilter.markFinalized();
}

}
Loading
Loading