Skip to content
Permalink
Browse files
IGNITE-16482 Adoption of a bunch of tickets from Ignite-2 (#635)
IGNITE-16107 Project and filters are not merged into scan node correctly
IGNITE-16333 IgniteProject need to be initialized with single distribution
IGNITE-15980 Flaky test SetOpPlannerTest.testSetOpRandom[SetOp = INTERSECT]
  • Loading branch information
zstan committed Feb 24, 2022
1 parent 6e0aff3 commit f1a932faa76dd51cfc38bdffc4cfc36c1f1f5469
Showing 11 changed files with 376 additions and 164 deletions.
@@ -20,9 +20,11 @@
import static org.apache.calcite.util.NumberUtil.multiply;
import static org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty;

import org.apache.calcite.rel.core.Intersect;
import org.apache.calcite.rel.core.Join;
import org.apache.calcite.rel.core.JoinInfo;
import org.apache.calcite.rel.core.JoinRelType;
import org.apache.calcite.rel.core.Minus;
import org.apache.calcite.rel.metadata.ReflectiveRelMetadataProvider;
import org.apache.calcite.rel.metadata.RelMdRowCount;
import org.apache.calcite.rel.metadata.RelMdUtil;
@@ -36,7 +38,6 @@
import org.apache.ignite.internal.sql.engine.rel.IgniteAggregate;
import org.apache.ignite.internal.sql.engine.rel.IgniteLimit;
import org.apache.ignite.internal.sql.engine.rel.IgniteSortedIndexSpool;
import org.apache.ignite.internal.sql.engine.rel.set.IgniteSetOp;
import org.jetbrains.annotations.Nullable;

/**
@@ -64,9 +65,16 @@ public double getRowCount(IgniteSortedIndexSpool rel, RelMetadataQuery mq) {
}

/**
* Estimation of row count for set op (MINUS, INTERSECT).
* Estimation of row count for Intersect operator.
*/
public double getRowCount(IgniteSetOp rel, RelMetadataQuery mq) {
@Override public Double getRowCount(Intersect rel, RelMetadataQuery mq) {
return rel.estimateRowCount(mq);
}

/**
* Estimation of row count for Minus operator.
*/
@Override public Double getRowCount(Minus rel, RelMetadataQuery mq) {
return rel.estimateRowCount(mq);
}

@@ -87,6 +87,8 @@ public Program getProgram(PlanningContext ctx) {

HEP_FILTER_PUSH_DOWN(
"Heuristic phase to push down filters",
FilterScanMergeRule.TABLE_SCAN_SKIP_CORRELATED,

CoreRules.FILTER_MERGE,
CoreRules.FILTER_AGGREGATE_TRANSPOSE,
CoreRules.FILTER_SET_OP_TRANSPOSE,
@@ -104,7 +106,6 @@ public Program getProgram(PlanningContext ctx) {
HEP_PROJECT_PUSH_DOWN(
"Heuristic phase to push down and merge projects",
ProjectScanMergeRule.TABLE_SCAN_SKIP_CORRELATED,
ProjectScanMergeRule.INDEX_SCAN_SKIP_CORRELATED,

CoreRules.JOIN_PUSH_EXPRESSIONS,
CoreRules.PROJECT_MERGE,
@@ -198,10 +199,11 @@ public Program getProgram(PlanningContext ctx) {

LogicalOrToUnionRule.INSTANCE,

// TODO: https://issues.apache.org/jira/browse/IGNITE-16334 join rules ordering is significant here.
MergeJoinConverterRule.INSTANCE,
CorrelatedNestedLoopJoinRule.INSTANCE,
CorrelateToNestedLoopRule.INSTANCE,
NestedLoopJoinConverterRule.INSTANCE,
MergeJoinConverterRule.INSTANCE,

ValuesConverterRule.INSTANCE,
LogicalScanConverterRule.INDEX_SCAN,
@@ -22,13 +22,11 @@
import org.apache.calcite.plan.RelOptTable;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelCollation;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.util.ImmutableBitSet;
import org.apache.calcite.util.mapping.Mappings;
import org.apache.ignite.internal.sql.engine.rel.AbstractIndexScan;
import org.apache.ignite.internal.sql.engine.schema.InternalIgniteTable;
import org.apache.ignite.internal.sql.engine.trait.TraitUtils;
import org.apache.ignite.internal.sql.engine.type.IgniteTypeFactory;
import org.apache.ignite.internal.sql.engine.util.Commons;
import org.apache.ignite.internal.sql.engine.util.IndexConditions;
@@ -52,16 +50,12 @@ public static IgniteLogicalIndexScan create(
) {
InternalIgniteTable tbl = table.unwrap(InternalIgniteTable.class);
IgniteTypeFactory typeFactory = Commons.typeFactory(cluster);
RelDataType rowType = tbl.getRowType(typeFactory, requiredColumns);
RelCollation collation = tbl.getIndex(idxName).collation();

if (requiredColumns != null) {
Mappings.TargetMapping targetMapping = Commons.mapping(requiredColumns,
tbl.getRowType(typeFactory).getFieldCount());
collation = collation.apply(targetMapping);
if (proj != null) {
collation = TraitUtils.projectCollation(collation, proj, rowType);
}
}

IndexConditions idxCond = new IndexConditions();
@@ -29,6 +29,8 @@
import org.apache.ignite.internal.sql.engine.rel.IgniteConvention;
import org.apache.ignite.internal.sql.engine.rel.IgniteFilter;
import org.apache.ignite.internal.sql.engine.trait.CorrelationTrait;
import org.apache.ignite.internal.sql.engine.trait.IgniteDistributions;
import org.apache.ignite.internal.sql.engine.trait.RewindabilityTrait;
import org.apache.ignite.internal.sql.engine.util.RexUtils;

/**
@@ -50,14 +52,24 @@ public FilterConverterRule() {
@Override
protected PhysicalNode convert(RelOptPlanner planner, RelMetadataQuery mq, LogicalFilter rel) {
RelOptCluster cluster = rel.getCluster();
RelTraitSet traits = rel.getTraitSet().replace(IgniteConvention.INSTANCE);

RelTraitSet traits = cluster
.traitSetOf(IgniteConvention.INSTANCE)
.replace(IgniteDistributions.single());

Set<CorrelationId> corrIds = RexUtils.extractCorrelationIds(rel.getCondition());

if (!corrIds.isEmpty()) {
traits = traits.replace(CorrelationTrait.correlations(corrIds));
traits = traits
.replace(CorrelationTrait.correlations(corrIds))
.replace(RewindabilityTrait.REWINDABLE);
}

return new IgniteFilter(cluster, traits, rel.getInput(), rel.getCondition());
return new IgniteFilter(
cluster,
traits,
convert(rel.getInput(), traits.replace(CorrelationTrait.UNCORRELATED)),
rel.getCondition()
);
}
}
@@ -17,16 +17,22 @@

package org.apache.ignite.internal.sql.engine.rule;

import java.util.Collection;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelOptPlanner;
import org.apache.calcite.plan.RelOptRule;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.PhysicalNode;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.CorrelationId;
import org.apache.calcite.rel.logical.LogicalProject;
import org.apache.calcite.rel.metadata.RelMetadataQuery;
import org.apache.ignite.internal.sql.engine.rel.IgniteConvention;
import org.apache.ignite.internal.sql.engine.rel.IgniteProject;
import org.apache.ignite.internal.sql.engine.trait.CorrelationTrait;
import org.apache.ignite.internal.sql.engine.trait.IgniteDistributions;
import org.apache.ignite.internal.sql.engine.trait.RewindabilityTrait;
import org.apache.ignite.internal.sql.engine.util.RexUtils;

/**
* ProjectConverterRule.
@@ -47,7 +53,19 @@ public ProjectConverterRule() {
@Override
protected PhysicalNode convert(RelOptPlanner planner, RelMetadataQuery mq, LogicalProject rel) {
RelOptCluster cluster = rel.getCluster();
RelTraitSet traits = cluster.traitSetOf(IgniteConvention.INSTANCE);

RelTraitSet traits = cluster
.traitSetOf(IgniteConvention.INSTANCE)
.replace(IgniteDistributions.single());

Collection<CorrelationId> corrIds = RexUtils.extractCorrelationIds(rel.getProjects());

if (!corrIds.isEmpty()) {
traits = traits
.replace(CorrelationTrait.correlations(corrIds))
.replace(RewindabilityTrait.REWINDABLE);
}

RelNode input = convert(rel.getInput(), traits);

return new IgniteProject(cluster, traits, input, rel.getProjects(), rel.getRowType());
@@ -40,7 +40,7 @@
*/
@Value.Enclosing
public class ExposeIndexRule extends RelRule<ExposeIndexRule.Config> {
public static final RelOptRule INSTANCE = Config.DEFAULT.toRule();
public static final RelOptRule INSTANCE = Config.DEFAULT.withDescription("ExposeIndexRule").toRule();

public ExposeIndexRule(Config config) {
super(config);
@@ -17,34 +17,22 @@

package org.apache.ignite.internal.sql.engine.rule.logical;

import static java.util.Arrays.asList;

import java.util.ArrayList;
import java.util.List;
import java.util.Arrays;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelOptRule;
import org.apache.calcite.plan.RelOptRuleCall;
import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.plan.RelRule;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.logical.LogicalFilter;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.rex.RexInputRef;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.rex.RexShuttle;
import org.apache.calcite.rex.RexUtil;
import org.apache.calcite.util.ControlFlowException;
import org.apache.calcite.util.mapping.Mappings;
import org.apache.ignite.internal.sql.engine.rel.ProjectableFilterableTableScan;
import org.apache.ignite.internal.sql.engine.rel.logical.IgniteLogicalIndexScan;
import org.apache.ignite.internal.sql.engine.rel.logical.IgniteLogicalTableScan;
import org.apache.ignite.internal.sql.engine.schema.IgniteTable;
import org.apache.ignite.internal.sql.engine.trait.CorrelationTrait;
import org.apache.ignite.internal.sql.engine.trait.TraitUtils;
import org.apache.ignite.internal.sql.engine.type.IgniteTypeFactory;
import org.apache.ignite.internal.sql.engine.util.Commons;
import org.apache.ignite.internal.sql.engine.util.RexUtils;
import org.immutables.value.Value;

@@ -59,6 +47,8 @@

public static final RelOptRule TABLE_SCAN = Config.TABLE_SCAN.toRule();

public static final RelOptRule TABLE_SCAN_SKIP_CORRELATED = Config.TABLE_SCAN_SKIP_CORRELATED.toRule();

/**
* Constructor.
*
@@ -78,69 +68,31 @@ public void onMatch(RelOptRuleCall call) {
RexBuilder builder = RexUtils.builder(cluster);

RexNode condition = filter.getCondition();
RexNode remaining = null;

if (scan.condition() != null) {
condition = RexUtil.composeConjunction(builder, asList(scan.condition(), condition));
}

if (scan.projects() != null) {
IgniteTypeFactory typeFactory = Commons.typeFactory(scan);

IgniteTable tbl = scan.getTable().unwrap(IgniteTable.class);

RelDataType cols = tbl.getRowType(typeFactory, scan.requiredColumns());

Mappings.TargetMapping permutation = RexUtils.permutation(scan.projects(), cols, true);

List<RexNode> conjunctions = RelOptUtil.conjunctions(condition);

List<RexNode> condition0 = new ArrayList<>(conjunctions.size());
List<RexNode> remaining0 = new ArrayList<>(conjunctions.size());

RexShuttle shuttle = new RexShuttle() {
@Override
public RexNode visitInputRef(RexInputRef ref) {
int targetRef = permutation.getTargetOpt(ref.getIndex());
if (targetRef == -1) {
throw new ControlFlowException();
}
return new RexInputRef(targetRef, ref.getType());
return scan.projects().get(ref.getIndex());
}
};

for (RexNode cond0 : conjunctions) {
try {
condition0.add(shuttle.apply(cond0));
} catch (ControlFlowException e) {
remaining0.add(cond0);
}
}
condition = shuttle.apply(condition);
}

condition = RexUtil.composeConjunction(builder, condition0, false);
remaining = RexUtil.composeConjunction(builder, remaining0, true);
if (scan.condition() != null) {
condition = RexUtil.composeConjunction(builder, Arrays.asList(scan.condition(), condition));
}

// We need to replace RexInputRef with RexLocalRef because TableScan doesn't have inputs.
// TODO SEARCH support
condition = RexUtils.replaceInputRefs(RexUtil.expandSearch(builder, null, condition));

RelTraitSet trait = scan.getTraitSet();
CorrelationTrait filterCorr = TraitUtils.correlation(filter);

if (filterCorr.correlated()) {
trait = trait.replace(filterCorr);
}
// Set default traits, real traits will be calculated for physical node.
RelTraitSet trait = cluster.traitSet();

RelNode res = createNode(cluster, scan, trait, condition);

if (remaining != null) {
res = relBuilderFactory.create(cluster, null)
.push(res)
.filter(remaining)
.build();
}

call.transformTo(res);
}

@@ -190,19 +142,28 @@ public interface Config extends RuleFactoryConfig<Config> {
.withRuleFactory(FilterTableScanMergeRule::new)
.build();

Config TABLE_SCAN = DEFAULT.withScanRuleConfig(IgniteLogicalTableScan.class, "FilterTableScanMergeRule");
Config TABLE_SCAN = DEFAULT
.withScanRuleConfig(IgniteLogicalTableScan.class, "FilterTableScanMergeRule", false);

Config TABLE_SCAN_SKIP_CORRELATED = DEFAULT
.withScanRuleConfig(IgniteLogicalTableScan.class, "FilterTableScanMergeSkipCorrelatedRule", true);

Config INDEX_SCAN = DEFAULT
.withRuleFactory(FilterIndexScanMergeRule::new)
.withScanRuleConfig(IgniteLogicalIndexScan.class, "FilterIndexScanMergeRule");
.withScanRuleConfig(IgniteLogicalIndexScan.class, "FilterIndexScanMergeRule", false);

/**
* Create configuration for specified scan.
*/
default Config withScanRuleConfig(Class<? extends ProjectableFilterableTableScan> scanCls, String desc) {
default Config withScanRuleConfig(
Class<? extends ProjectableFilterableTableScan> scanCls,
String desc,
boolean skipCorrelated
) {
return withDescription(desc)
.withOperandSupplier(b ->
b.operand(LogicalFilter.class).oneInput(b1 -> b1.operand(scanCls).noInputs()))
.withOperandSupplier(b -> b.operand(LogicalFilter.class)
.predicate(p -> !skipCorrelated || !RexUtils.hasCorrelation(p.getCondition()))
.oneInput(b1 -> b1.operand(scanCls).noInputs()))
.as(Config.class);
}
}

0 comments on commit f1a932f

Please sign in to comment.