Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
125 changes: 106 additions & 19 deletions core/src/main/java/org/apache/calcite/plan/SubstitutionVisitor.java
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@
import java.util.Objects;
import java.util.Set;
import java.util.TreeMap;
import java.util.stream.Collectors;

import static org.apache.calcite.rex.RexUtil.andNot;
import static org.apache.calcite.rex.RexUtil.removeAll;
Expand Down Expand Up @@ -192,8 +193,18 @@ public SubstitutionVisitor(RelNode target_, RelNode query_,
this.simplify =
new RexSimplify(cluster.getRexBuilder(), predicates, executor);
this.rules = rules;
this.query = Holder.of(MutableRels.toMutable(query_));
MutableRel mutableQuery = MutableRels.toMutable(query_);
this.target = MutableRels.toMutable(target_);
if (mutableQuery instanceof MutableCalc) {
this.query = Holder.of(mutableQuery);
} else {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here, we always try to compensate with a Calc on the original RelNode. Why can't we do this through custom normalization? Otherwise, there will be more and more such patterns to handle in materialized view recognition in the future. There should be a unified way to implement it.
What do you think?

List<? extends RexNode> rexNodes =
cluster.getRexBuilder().identityProjects(mutableQuery.rowType);
RexProgram program = RexProgram.create(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a PR [1] that enhances the normalization step before materialized view recognition by allowing normalization rules to be defined. In its unit test, materialized view recognition is improved by compensating with a Calc operator. This approach achieves better versatility.
[1] Add an interface in RelOptMaterializations to allow registering normalization rules

mutableQuery.rowType, rexNodes, null, mutableQuery.rowType, cluster.getRexBuilder());
MutableCalc mutableCalc = MutableCalc.of(mutableQuery, program);
this.query = Holder.of(mutableCalc);
}
this.relBuilder = relBuilderFactory.create(cluster, null);
final Set<MutableRel> parents = Sets.newIdentityHashSet();
final List<MutableRel> allNodes = new ArrayList<>();
Expand Down Expand Up @@ -1034,7 +1045,8 @@ protected static Operand target(int ordinal) {
}

/** Implementation of {@link UnifyRule} that matches if the query is already
* equal to the target.
* equal to the target, or the query input is equal to target input and target
* is a {@link MutableCalc} and target just does some projections.
*
* <p>Matches scans to the same table, because these will be
* {@link MutableScan}s with the same
Expand All @@ -1051,6 +1063,35 @@ private TrivialRule() {
if (call.query.equals(call.target)) {
return call.result(call.target);
}
if (call.query.getInputs().size() == 1
&& !(call.query instanceof MutableCalc)
&& call.target instanceof MutableCalc
&& call.query.getInputs().get(0).equals(call.target.getInputs().get(0))) {
MutableRel queryInput = call.query.getInputs().get(0);
MutableCalc target = (MutableCalc) call.target;
Pair<RexNode, List<RexNode>> explainTarget = explainCalc(target);
// If target is a Calc and projects all the columns.
if (explainTarget.getKey() == null || explainTarget.getKey().isAlwaysTrue()) {
boolean onlyProjInTarget = explainTarget.getValue()
.stream().allMatch(rex -> rex instanceof RexInputRef);
if (onlyProjInTarget && explainTarget.getValue().size() == fieldCnt(queryInput)) {
final Mappings.TargetMapping mapping =
Project.getMapping(fieldCnt(queryInput), explainTarget.getValue()).inverse();
final RexBuilder rexBuilder = call.getCluster().getRexBuilder();
final List<? extends RexNode> compenProjs =
Util.transform(queryInput.rowType.getFieldList(),
input -> new RexInputRef(mapping.getTarget(input.getIndex()), input.getType()));
RexProgram compenRexProgram = RexProgram.create(
target.rowType, compenProjs,
null, queryInput.rowType, rexBuilder);
MutableCalc compenCalc =
MutableCalc.of(target, compenRexProgram);
MutableRel result = call.query.clone();
result.getInputs().get(0).replaceInParent(compenCalc);
return call.result(result);
}
}
}
return null;
}
}
Expand Down Expand Up @@ -1778,7 +1819,8 @@ private static UnifyResult tryMergeParentCalcAndGenResult(
return call.create(parent).result(mergedCalc, false);
}
}
return call.result(child);
boolean stopTrying = child.getInputs().get(0).equals(call.target) || child.equals(call.target);
return call.result(child, stopTrying);
}

/** Merge two MutableCalc together. */
Expand Down Expand Up @@ -1921,12 +1963,6 @@ public static MutableRel unifyAggregates(MutableAggregate query,
return null;
}
}
final ImmutableBitSet groupSet = query.groupSet.permute(map);
ImmutableList<ImmutableBitSet> groupSets = null;
if (query.getGroupType() != Aggregate.Group.SIMPLE) {
groupSets = ImmutableBitSet.ORDERING.immutableSortedCopy(
ImmutableBitSet.permute(query.groupSets, map));
}
final List<AggregateCall> aggregateCalls = new ArrayList<>();
for (AggregateCall aggregateCall : query.aggCalls) {
if (aggregateCall.isDistinct() && aggregateCall.getArgList().size() == 1) {
Expand Down Expand Up @@ -1962,18 +1998,69 @@ public static MutableRel unifyAggregates(MutableAggregate query,
aggregateCall.collation, aggregateCall.type,
aggregateCall.name));
}
if (targetCond != null && !targetCond.isAlwaysTrue()) {
RexProgram compenRexProgram = RexProgram.create(
target.rowType, rexBuilder.identityProjects(target.rowType),
targetCond, target.rowType, rexBuilder);
// Always add a Calc between target aggregate and the rolled up aggregate,
// but the calc just projects columns used by the rolled up aggregate.
List<Integer> projIndexs = new ArrayList<>();
// Mapping for column used by outside agg to index in added calc.
Map<Integer, Integer> columnMap = new HashMap<>();
final ImmutableBitSet groupSet = query.groupSet.permute(map);
List<Integer> groupList = groupSet.asList();
// Check for query group list
for (int index = 0; index < groupList.size(); index++) {
int inputIndexValue = groupList.get(index);
projIndexs.add(inputIndexValue);
columnMap.put(inputIndexValue, projIndexs.size() - 1);

}
// check for query agg calls
for (AggregateCall aggregateCall : aggregateCalls) {
for (Integer inputIndexValue : aggregateCall.getArgList()) {
if (columnMap.containsKey(inputIndexValue)) {
continue;
}
projIndexs.add(inputIndexValue);
columnMap.put(inputIndexValue, projIndexs.size() - 1);
}
}

result = MutableAggregate.of(
MutableCalc.of(target, compenRexProgram),
groupSet, groupSets, aggregateCalls);
} else {
result = MutableAggregate.of(
target, groupSet, groupSets, aggregateCalls);
// Update the index of aggs.
final List<AggregateCall> updatedAggregates = new ArrayList<>();
for (AggregateCall aggregateCall : aggregateCalls) {
List<Integer> newArgIndex = aggregateCall.getArgList()
.stream()
.map(i -> columnMap.get(i))
.collect(Collectors.toList());
updatedAggregates.add(
AggregateCall.create(aggregateCall.getAggregation(),
aggregateCall.isDistinct(), aggregateCall.isApproximate(),
aggregateCall.ignoreNulls(),
newArgIndex, -1,
aggregateCall.collation, aggregateCall.type,
aggregateCall.name));
}

List<String> fieldNames = new ArrayList<>();
List<RexNode> newProjs = new ArrayList<>();
for (int index : projIndexs) {
RelDataType fieldType = target.rowType.getFieldList().get(index).getType();
newProjs.add(rexBuilder.makeInputRef(fieldType, index));
fieldNames.add(target.rowType.getFieldNames().get(index));
}

RexProgram compenRexProgram = RexProgram.create(
target.rowType, newProjs,
targetCond, fieldNames, rexBuilder);

MutableCalc calc = MutableCalc.of(target, compenRexProgram);
ImmutableBitSet updatedGroupSet = groupSet.permute(columnMap);
ImmutableList<ImmutableBitSet> updatedGroupSets = null;
if (query.getGroupType() != Aggregate.Group.SIMPLE) {
updatedGroupSets = ImmutableBitSet.ORDERING.immutableSortedCopy(
ImmutableBitSet.permute(query.groupSets, map));
updatedGroupSets = ImmutableBitSet.ORDERING.immutableSortedCopy(
ImmutableBitSet.permute(updatedGroupSets, columnMap));
}
result = MutableAggregate.of(calc, updatedGroupSet, updatedGroupSets, updatedAggregates);
} else {
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,9 @@ public static Frameworks.ConfigBuilder config() {
ImmutableList.of(relOptMaterialization));

final String optimized = ""
+ "LogicalProject(deptno=[CAST($0):TINYINT], count_sal=[$1])\n"
+ " LogicalTableScan(table=[[mv0]])\n";
+ "LogicalCalc(expr#0..1=[{inputs}], proj#0..1=[{exprs}])\n"
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The LogicalCalc seems unnecessary.

+ " LogicalProject(deptno=[CAST($0):TINYINT], count_sal=[$1])\n"
+ " LogicalTableScan(table=[[mv0]])\n";
final String relOptimizedStr = RelOptUtil.toString(relOptimized.get(0).getKey());
assertThat(isLinux(optimized).matches(relOptimizedStr), is(true));
}
Expand Down
Loading